首先,在 initServer 中创建 eventloop,为 IO 多路复用做准备:
server.c
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
接着,listenToPort 创建基于 TCP 的监听 socket,用于监听客户端连接,并将该 socket 设置为非阻塞:
server.c
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
然后为每个监听 fd(server.ipfd)注册可读事件的回调 acceptTcpHandler:
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}
下面来看 aeCreateFileEvent 的实现:
ae.c
/* Per-descriptor registration record. aeCreateFileEvent() picks the entry at
 * eventLoop->events[fd] and fills it in. */
typedef struct aeFileEvent {
int mask; /* bitmask of AE_(READABLE|WRITABLE|BARRIER); aeCreateFileEvent() ORs new bits in */
/* Callback stored when the registered mask includes AE_READABLE. */
aeFileProc *rfileProc;
/* Callback stored when the registered mask includes AE_WRITABLE. */
aeFileProc *wfileProc;
/* Caller-supplied opaque pointer saved at registration time. */
void *clientData;
} aeFileEvent;
/* Register `proc` as the handler for the events selected by `mask` on `fd`.
 *
 * eventLoop  - loop whose events[] table and maxfd are updated.
 * fd         - descriptor to watch; must be below eventLoop->setsize.
 * mask       - AE_READABLE and/or AE_WRITABLE bits choosing which callback
 *              slot(s) receive `proc`.
 * proc       - callback to store.
 * clientData - opaque pointer stored next to the callbacks (overwrites any
 *              previously stored value for this fd).
 *
 * Returns AE_OK on success. Returns AE_ERR with errno set to ERANGE when
 * fd >= eventLoop->setsize, and AE_ERR when aeApiAddEvent() returns -1. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    /* Entry of the per-fd table that describes this descriptor. */
    aeFileEvent *fe = &eventLoop->events[fd];

    /* Pass fd and mask to aeApiAddEvent() (defined elsewhere; presumably the
     * polling backend). This runs before the entry is modified, so a failure
     * leaves the entry untouched. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    /* Merge the requested event bits into the entry. */
    fe->mask |= mask;
    /* Install proc as the read and/or write callback, depending on mask. */
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    /* Keep the caller's opaque pointer. */
    fe->clientData = clientData;
    /* Track the highest descriptor registered so far. */
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
再来看看acceptTcpHandler
networking.c
20void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
对于每个server的fd,accept一个客户端fd,封装了普通的accept
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}acceptCommonHandler
networking.c
55static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
UNUSED(ip);
/* Admission control will happen before a client is created and connAccept()
* called, because we don't want to even start transport-level negotiation
* if rejected.
*/
if (listLength(server.clients) >= server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors.
* Note that for TLS connections, no handshake was done yet so nothing is written
* and the connection will just drop.
*/
if (connWrite(conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
connClose(conn);
return;
}
/* Create connection and client */
// 创建客户端,设置该fd非阻塞,并且设置读函数readQueryFromClient
if ((c = createClient(conn)) == NULL) {
char conninfo[100];
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
/* Last chance to keep flags */
c->flags |= flags;
/* Initiate accept.
*
* Note that connAccept() is free to do two things here:
* 1. Call clientAcceptHandler() immediately;
* 2. Schedule a future call to clientAcceptHandler().
*
* Because of that, we must do nothing else afterwards.
*/
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
char conninfo[100];
serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
freeClient(connGetPrivateData(conn));
return;
}
}redis的eventloop在哪 -> aeMain
`