redis-server

  1. 首先在initServer中,通过aeCreateEventLoop创建eventloop,为IO多路复用做准备

    1
    2
    server.c
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
  2. listenToPort创建基于TCP的监听socket,将其设置为非阻塞,并开始监听客户端连接

    1
    2
    3
    4
    5
    server.c
    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
    exit(1);
  3. 为每个server的fd设置可读事件回调

    1
    2
    3
    4
    5
    for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) {
    serverPanic("Unrecoverable error creating server.ipfd file event.");
    }
    }
  1. aeCreateFileEvent

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    ae.c
    /* Per-descriptor registration record. aeCreateFileEvent() looks one up
     * with eventLoop->events[fd], i.e. the table is indexed by fd. */
    typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc; /* callback stored when registering with AE_READABLE */
    aeFileProc *wfileProc; /* callback stored when registering with AE_WRITABLE */
    void *clientData; /* opaque pointer saved as-is by aeCreateFileEvent() */
    } aeFileEvent;
    /* Register `proc` as the handler for the events in `mask` on `fd`.
     *
     * Returns AE_ERR with errno set to ERANGE when `fd` does not fit in the
     * event table (fd >= eventLoop->setsize), AE_ERR when aeApiAddEvent()
     * reports -1, and AE_OK otherwise. */
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                          aeFileProc *proc, void *clientData)
    {
        aeFileEvent *slot;

        /* Reject descriptors that fall outside the event table. */
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }

        /* Event record that belongs to this fd. */
        slot = &eventLoop->events[fd];

        /* Hand fd and mask to aeApiAddEvent() (presumably the polling
         * backend -- its body is not shown here); give up if it fails. */
        if (aeApiAddEvent(eventLoop, fd, mask) == -1) {
            return AE_ERR;
        }

        /* Remember which event kinds are now registered for this fd. */
        slot->mask |= mask;

        /* Install the callback for each requested direction. */
        if (mask & AE_READABLE) {
            slot->rfileProc = proc;
        }
        if (mask & AE_WRITABLE) {
            slot->wfileProc = proc;
        }

        /* The caller's opaque pointer is overwritten unconditionally. */
        slot->clientData = clientData;

        /* Keep track of the highest fd registered so far. */
        if (fd > eventLoop->maxfd) {
            eventLoop->maxfd = fd;
        }
        return AE_OK;
    }
  2. 再来看看acceptTcpHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /* Readable-event callback for a listening TCP socket `fd`.
     *
     * Accepts at most MAX_ACCEPTS_PER_CALL connections per invocation and
     * passes each accepted descriptor to acceptCommonHandler(). The loop stops
     * at the first accept failure; the failure is logged unless errno is
     * EWOULDBLOCK. `el`, `privdata` and `mask` are unused. */
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
        char cip[NET_IP_STR_LEN];
        UNUSED(el);
        UNUSED(mask);
        UNUSED(privdata);

        while(max--) {
            /* Accept one client fd from this server fd; per the article,
             * anetTcpAccept() is a wrapper around the plain accept(). */
            cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
            if (cfd == ANET_ERR) {
                /* EWOULDBLOCK is not reported as a warning; anything else is. */
                if (errno != EWOULDBLOCK)
                    serverLog(LL_WARNING,
                        "Accepting client connection: %s", server.neterr);
                return;
            }
            serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
            acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
        }
    }
  3. acceptCommonHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    /* Common tail of the accept path: enforce the client limit, create the
     * client object for `conn`, OR `flags` into its flags, then start the
     * connection-level accept. `ip` is unused. */
    static void acceptCommonHandler(connection *conn, int flags, char *ip) {
        char conninfo[100];
        client *newcomer;
        UNUSED(ip);

        /* Admission control runs before a client is created and before
         * connAccept() is called, so that a rejected peer never gets as far
         * as transport-level negotiation. */
        if (listLength(server.clients) >= server.maxclients) {
            char *reject_msg = "-ERR max number of clients reached\r\n";

            /* Best-effort error reply: write failures are deliberately not
             * acted upon. For TLS connections no handshake has happened yet,
             * so nothing is written and the connection simply drops. */
            if (connWrite(conn,reject_msg,strlen(reject_msg)) == -1) {
                /* Intentionally empty; the test only silences a warning. */
            }
            server.stat_rejected_conn++;
            connClose(conn);
            return;
        }

        /* Create the client for this connection. NOTE(review): the article
         * says createClient() also makes the fd non-blocking and installs
         * readQueryFromClient as the read handler -- not visible here. */
        newcomer = createClient(conn);
        if (newcomer == NULL) {
            serverLog(LL_WARNING,
                "Error registering fd event for the new client: %s (conn: %s)",
                connGetLastError(conn),
                connGetInfo(conn, conninfo, sizeof(conninfo)));
            connClose(conn); /* May be already closed, just ignore errors */
            return;
        }

        /* Last chance to keep flags */
        newcomer->flags |= flags;

        /* Start the accept. connAccept() may either call
         * clientAcceptHandler() right away or schedule it for later, so
         * nothing else may be done after a successful call. */
        if (connAccept(conn, clientAcceptHandler) != C_ERR) {
            return;
        }

        serverLog(LL_WARNING,
            "Error accepting a client connection: %s (conn: %s)",
            connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
    }
  4. redis的eventloop在哪 -> aeMain

IO复用之epoll Redis cli处理服务端的返回逻辑
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×