Redis cli处理服务端的返回逻辑

日本镰仓

redisGetReply方法

redisGetReply 中的 redisBufferRead 的作用是将数据读取到 redis context 的 reader 中,每个 reader 都包含一个读 buf,pos 和 len 记录了 buf 的长度和读取位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct redisReader {
int err; /* Error flags, 0 when there is no error */
char errstr[128]; /* String representation of error when applicable */

char *buf; /* Read buffer */
size_t pos; /* Buffer cursor */
size_t len; /* Buffer length */
size_t maxbuf; /* Max length of unused buffer */

redisReadTask rstack[9]; // 遍历使用的栈
int ridx; /* Index of current read task */ 标识遍历到哪一层
void *reply; /* Temporary reply pointer */

redisReplyObjectFunctions *fn;
void *privdata;
} redisReader;
  • buf:读缓冲区
  • pos:当前读取到的位置
  • len:buff 的长度

    redisReaderGetReply

    redisReaderGetReply是将reader中保存的数据生成一个reply

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
先来看用到的数据结构
typedef struct redisReadTask {
int type;
int elements; /* number of elements in multibulk container */
int idx; /* index in parent (array) object */
void *obj; /* holds user-generated value for a read task */
struct redisReadTask *parent; /* parent task */
void *privdata; /* user-settable arbitrary field */
} redisReadTask;

/* This is the reply object returned by redisCommand() */
typedef struct redisReply {
int type; /* REDIS_REPLY_* */
long long integer; /* The integer when type is REDIS_REPLY_INTEGER */
double dval; /* The double when type is REDIS_REPLY_DOUBLE */
size_t len; /* Length of string */
char *str; /* Used for REDIS_REPLY_ERROR, REDIS_REPLY_STRING
and REDIS_REPLY_DOUBLE (in additionl to dval). */
char vtype[4]; /* Used for REDIS_REPLY_VERB, contains the null
terminated 3 character content type, such as "txt". */
size_t elements; /* number of elements, for REDIS_REPLY_ARRAY */
struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
}

处理流程

  1. redisReaderGetReply中初始化栈,生成根节点,如下所示
1
2
3
4
5
6
7
8
9
if (r->ridx == -1) {
r->rstack[0].type = -1;
r->rstack[0].elements = -1;
r->rstack[0].idx = -1;
r->rstack[0].obj = NULL;
r->rstack[0].parent = NULL;
r->rstack[0].privdata = r->privdata;
r->ridx = 0;
}
  1. 深度优先遍历所有节点
1
2
3
while (r->ridx >= 0)
if (processItem(r) != REDIS_OK)
break;
  1. 返回reply给上层方法
1
2
3
4
5
6
7
8
9
/* Emit a reply when there is one. */
if (r->ridx == -1) {
if (reply != NULL) {
*reply = r->reply;
} else if (r->reply != NULL && r->fn && r->fn->freeObject) {
r->fn->freeObject(r->reply);
}
r->reply = NULL;
}

关键的processItem

processItem中对resp协议的每一种元素进行了识别,同时将这些类型划分为三类,分别是:processLineItem、processBulkItem、processAggregateItem。通过以下代码可方便的看出这三种分别对应的数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
switch(cur->type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_INTEGER:
case REDIS_REPLY_DOUBLE:
case REDIS_REPLY_NIL:
case REDIS_REPLY_BOOL:
return processLineItem(r);
case REDIS_REPLY_STRING:
case REDIS_REPLY_VERB:
return processBulkItem(r);
case REDIS_REPLY_ARRAY:
case REDIS_REPLY_MAP:
case REDIS_REPLY_SET:
return processAggregateItem(r);

processAggregateItem

processAggregateItem也算直观,如果elements > 0,则将该task节点信息完善,完成该节点对应的reply的obj赋值。同时创建一个新的task,为了进一步的遍历后续节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
static int processAggregateItem(redisReader *r) {
redisReadTask *cur = &(r->rstack[r->ridx]);
void *obj;
char *p;
long long elements;
int root = 0, len;

/* Set error for nested multi bulks with depth > 7 */
if (r->ridx == 8) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"No support for nested multi bulk replies with depth > 7");
return REDIS_ERR;
}

if ((p = readLine(r,&len)) != NULL) {
if (string2ll(p, len, &elements) == REDIS_ERR) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"Bad multi-bulk length");
return REDIS_ERR;
}

root = (r->ridx == 0);

if (elements < -1 || (LLONG_MAX > SIZE_MAX && elements > SIZE_MAX)) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"Multi-bulk length out of range");
return REDIS_ERR;
}

if (elements == -1) {
if (r->fn && r->fn->createNil)
obj = r->fn->createNil(cur);
else
obj = (void*)REDIS_REPLY_NIL;

if (obj == NULL) {
__redisReaderSetErrorOOM(r);
return REDIS_ERR;
}

moveToNextTask(r);
} else {
if (cur->type == REDIS_REPLY_MAP) elements *= 2;

if (r->fn && r->fn->createArray)
obj = r->fn->createArray(cur,elements);
else
obj = (void*)(long)cur->type;

if (obj == NULL) {
__redisReaderSetErrorOOM(r);
return REDIS_ERR;
}

/* Modify task stack when there are more than 0 elements.*/
if (elements > 0) {
cur->elements = elements;
cur->obj = obj;
r->ridx++;
r->rstack[r->ridx].type = -1;
r->rstack[r->ridx].elements = -1;
r->rstack[r->ridx].idx = 0;
r->rstack[r->ridx].obj = NULL;
r->rstack[r->ridx].parent = cur;
r->rstack[r->ridx].privdata = r->privdata;
} else {
moveToNextTask(r);
}
}

/* Set reply if this is the root object. */
if (root) r->reply = obj;
return REDIS_OK;
}

return REDIS_ERR;
}

processLineItem

processLineItem比较奇怪,因为创建了obj后一直没有使用它。但是有个细节,我们看下:obj = r->fn->createInteger(cur,v)。这说明基于v值创建的对象其实已经绑定到了cur任务中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
static int processLineItem(redisReader *r) {
redisReadTask *cur = &(r->rstack[r->ridx]);
void *obj;
char *p;
int len;

if ((p = readLine(r,&len)) != NULL) {
if (cur->type == REDIS_REPLY_INTEGER) {
if (r->fn && r->fn->createInteger) {
long long v;
if (string2ll(p, len, &v) == REDIS_ERR) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"Bad integer value");
return REDIS_ERR;
}
obj = r->fn->createInteger(cur,v);
} else {
obj = (void*)REDIS_REPLY_INTEGER;
}
} else if (cur->type == REDIS_REPLY_DOUBLE) {
if (r->fn && r->fn->createDouble) {
char buf[326], *eptr;
double d;

if ((size_t)len >= sizeof(buf)) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"Double value is too large");
return REDIS_ERR;
}

memcpy(buf,p,len);
buf[len] = '\0';

if (strcasecmp(buf,",inf") == 0) {
d = INFINITY; /* Positive infinite. */
} else if (strcasecmp(buf,",-inf") == 0) {
d = -INFINITY; /* Nevative infinite. */
} else {
d = strtod((char*)buf,&eptr);
if (buf[0] == '\0' || eptr[0] != '\0' || isnan(d)) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"Bad double value");
return REDIS_ERR;
}
}
obj = r->fn->createDouble(cur,d,buf,len);
} else {
obj = (void*)REDIS_REPLY_DOUBLE;
}
} else if (cur->type == REDIS_REPLY_NIL) {
if (r->fn && r->fn->createNil)
obj = r->fn->createNil(cur);
else
obj = (void*)REDIS_REPLY_NIL;
} else if (cur->type == REDIS_REPLY_BOOL) {
int bval = p[0] == 't' || p[0] == 'T';
if (r->fn && r->fn->createBool)
obj = r->fn->createBool(cur,bval);
else
obj = (void*)REDIS_REPLY_BOOL;
} else {
/* Type will be error or status. */
if (r->fn && r->fn->createString)
obj = r->fn->createString(cur,p,len);
else
obj = (void*)(size_t)(cur->type);
}

if (obj == NULL) {
__redisReaderSetErrorOOM(r);
return REDIS_ERR;
}

/* Set reply if this is the root object. */
if (r->ridx == 0) r->reply = obj;
moveToNextTask(r);
return REDIS_OK;
}

return REDIS_ERR;
}

具体来看下createInteger,发现如果父节点是map、array、set,则将数据放入父节点对象的elements中,如果非结构化对象,则直接返回整数对象,其余line类型的数据也类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void *createIntegerObject(const redisReadTask *task, long long value) {
redisReply *r, *parent;

r = createReplyObject(REDIS_REPLY_INTEGER);
if (r == NULL)
return NULL;

r->integer = value;

if (task->parent) { // 如果父节点是map、array、set,则将数据放入父节点对象的elements中
parent = task->parent->obj;
assert(parent->type == REDIS_REPLY_ARRAY ||
parent->type == REDIS_REPLY_MAP ||
parent->type == REDIS_REPLY_SET);
parent->element[task->idx] = r;
}
return r;
}
redis-server redis-cli的实现原理
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×