博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Redis源代码分析(二十二)--- networking网络协议传输
阅读量:6257 次
发布时间:2019-06-22

本文共 12072 字,大约阅读时间需要 40 分钟。

          上次我仅仅分析了Redis网络部分的代码一部分,今天我把networking的代码实现部分也学习了一遍,netWorking的代码很多其它偏重的是Clientclient的操作。里面addReply()系列的方法操作是基本的部分。

光光这个系列的方法,应该占领了一半的API的数量。我把API分成了3个部分:

/* ------------ API ---------------------- */void *dupClientReplyValue(void *o)	/* 复制value一份 */int listMatchObjects(void *a, void *b) /* 比价2个obj是否相等 */robj *dupLastObjectIfNeeded(list *reply) /* 返回回复列表中最后一个元素对象 */void copyClientOutputBuffer(redisClient *dst, redisClient *src) /* 将源Client的输出buffer复制给目标Client */static void acceptCommonHandler(int fd, int flags) /* 网络连接后的调用方法 */void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)void disconnectSlaves(void) /* 使server的slave失去连接 */void replicationHandleMasterDisconnection(void)void flushSlavesOutputBuffers(void) /* 从方法将会在freeMemoryIfNeeded()。释放内存空间函数,将存在内存中数据操作结果刷新到磁盘中 */int processEventsWhileBlocked(void)/* ------------- addReply API -----------------   */int _addReplyToBuffer(redisClient *c, char *s, size_t len) /* 往client缓冲区中加入内容 */void _addReplyObjectToList(redisClient *c, robj *o) /* robj加入到reply的列表中 */void _addReplySdsToList(redisClient *c, sds s) /* 在回复列表中加入Sds字符串对象 */void _addReplyStringToList(redisClient *c, char *s, size_t len) /* 在回复列表中加入字符串对象,參数中已经给定字符的长度 */void addReply(redisClient *c, robj *obj) /* 在redisClient的buffer中写入数据,数据存在obj->ptr的指针中 */void addReplySds(redisClient *c, sds s) /* 在回复中加入Sds字符串,以下的额addReply()系列方法原理基本相似 */void addReplyString(redisClient *c, char *s, size_t len)void addReplyErrorLength(redisClient *c, char *s, size_t len)void addReplyError(redisClient *c, char *err) /* 往Reply中加入error类的信息 */void addReplyErrorFormat(redisClient *c, const char *fmt, ...)void addReplyStatusLength(redisClient *c, char *s, size_t len)void addReplyStatus(redisClient *c, char *status)void addReplyStatusFormat(redisClient *c, const char *fmt, ...)void *addDeferredMultiBulkLength(redisClient *c) /* 在reply list 中加入一个空的obj对象 */void setDeferredMultiBulkLength(redisClient *c, void *node, long length)void addReplyDouble(redisClient *c, double d) /* 在bulk reply中加入一个double类型值。bulk的意思为大块的。bulk reply的意思为大数据量的回复 */void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix)void addReplyLongLong(redisClient *c, long long ll)void addReplyMultiBulkLen(redisClient *c, long length)void addReplyBulkLen(redisClient *c, robj *obj) /* 加入bulk 大块的数据的长度 */void addReplyBulk(redisClient *c, robj *obj) /* 将一个obj的数据。拆分成大块数据的加入 */void addReplyBulkCBuffer(redisClient *c, void *p, size_t len)void addReplyBulkCString(redisClient *c, char *s)void addReplyBulkLongLong(redisClient *c, long long ll)/* ------------- Client API -----------------   */	redisClient *createClient(int fd) /* 创建redisClientclient,1.建立连接,2.设置数据库。3.属性设置 */int prepareClientToWrite(redisClient *c) /* 此方法将会被调用于Client准备接受新数据之前调用。在fileEvent为client设定writer的handler处理事件 */static void freeClientArgv(redisClient *c)void freeClient(redisClient *c) /* 释放freeClient,要分为Master和Slave2种情况作不同的处理 */void freeClientAsync(redisClient *c)void freeClientsInAsyncFreeQueue(void) /* 异步的freeclient */void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 将Client中的reply数据存入文件里 */void resetClient(redisClient *c)int processInlineBuffer(redisClient *c) /* 处理redis Client的内链的buffer。就是c->querybuf */static void setProtocolError(redisClient *c, int pos)int processMultibulkBuffer(redisClient *c) /* 处理大块的buffer */void processInputBuffer(redisClient *c) /* 处理redisClient的查询buffer */void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 从Client获取查询query语句 */void getClientsMaxBuffers(unsigned long *longest_output_list,                          unsigned long *biggest_input_buffer) /* 获取Client中输入buffer和输出buffer的最大长度值 */void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) /* 格式化ip,port端口号的输出,ip:port */int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) /* 获取Clientclient的ip,port地址信息 */char *getClientPeerId(redisClient *c) /* 获取c->peeridclient的地址信息 */sds catClientInfoString(sds s, redisClient *client) /* 格式化的输出client的属性信息。直接返回一个拼接好的字符串 */sds getAllClientsInfoString(void) /* 获取全部Clientclient的属性信息。并连接成一个总的字符串并输出 */void clientCommand(redisClient *c) /* 运行client的命令的作法 */void rewriteClientCommandVector(redisClient *c, int argc, ...) /* 重写client的命令集合,旧的命令集合的应用计数减1,新的Command  Vector的命令集合增1 */void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) /* 重写Client中的第i个參数 */unsigned long getClientOutputBufferMemoryUsage(redisClient *c) /* 获取Client中已经用去的输出buffer的大小 */int getClientType(redisClient *c)int getClientTypeByName(char *name) /* Client中的名字的3种类型,normal,slave。pubsub */char *getClientTypeName(int class)int checkClientOutputBufferLimits(redisClient *c) /* 推断Clint的输出缓冲区的已经占用大小是否超过软限制或是硬限制 */void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) /* 异步的关闭Client。假设缓冲区中的软限制或是硬限制已经到达的时候,缓冲区超出限制的结果会导致释放不安全, */
我们从最简单的_addReplyToBuffer在缓冲区中加入回复数据開始说起,由于后面的各种addReply的方法都或多或少的调用了和这个歌方法。

/* ----------------------------------------------------------------------------- * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- *//* 往client缓冲区中加入内容 */int _addReplyToBuffer(redisClient *c, char *s, size_t len) {    size_t available = sizeof(c->buf)-c->bufpos;    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;    /* If there already are entries in the reply list, we cannot     * add anything more to the static buffer. */     //假设当前的reply已经存在内容,则操作出错    if (listLength(c->reply) > 0) return REDIS_ERR;    /* Check that the buffer has enough space available for this string. */    if (len > available) return REDIS_ERR;    memcpy(c->buf+c->bufpos,s,len);    c->bufpos+=len;    return REDIS_OK;}
最直接影响的一句话,就是memcpy(c->buf+c->bufpos,s,len);所以内容是加到c->buf中的,这也就是client的输出buffer。加入操作还有第二种形式是加入对象类型:

/* robj加入到reply的列表中 */void _addReplyObjectToList(redisClient *c, robj *o) {    robj *tail;    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;    if (listLength(c->reply) == 0) {        incrRefCount(o);        //在回复列表汇总加入robj内容        listAddNodeTail(c->reply,o);        c->reply_bytes += zmalloc_size_sds(o->ptr);    } else {        tail = listNodeValue(listLast(c->reply));        /* Append to this object when possible. */        if (tail->ptr != NULL &&            sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)        {            c->reply_bytes -= zmalloc_size_sds(tail->ptr);            tail = dupLastObjectIfNeeded(c->reply);            tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));            c->reply_bytes += zmalloc_size_sds(tail->ptr);        } else {            incrRefCount(o);            listAddNodeTail(c->reply,o);            c->reply_bytes += zmalloc_size_sds(o->ptr);        }    }    asyncCloseClientOnOutputBufferLimitReached(c);}
把robj对象载入reply列表中,而且改变reply的byte大小,最后还调用了一个asyncCloseClientOnOutputBufferLimitReached(c);方法,这种方法我是在这个文件的最底部找到的,一開始还真不知道什么意思,作用就是当加入完数据后,当client的输出缓冲的大小超出限制时,会被异步关闭:

/* Asynchronously close a client if soft or hard limit is reached on the * output buffer size. The caller can check if the client will be closed * checking if the client REDIS_CLOSE_ASAP flag is set. * * Note: we need to close the client asynchronously because this function is * called from contexts where the client can't be freed safely, i.e. from the * lower level functions pushing data inside the client output buffers. *//* 异步的关闭Client。假设缓冲区中的软限制或是硬限制已经到达的时候。缓冲区超出限制的结果会导致释放不安全, */void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {    redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));    if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;    if (checkClientOutputBufferLimits(c)) {        sds client = catClientInfoString(sdsempty(),c);        freeClientAsync(c);        redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);        sdsfree(client);    }}
在addReply方法调用的时候,有时是须要一个前提的,我说的是在写数据事件发生的时候,你得先对写的文件创建一个监听事件:

/* 在回复中加入Sds字符串 */void addReplySds(redisClient *c, sds s) {	//在调用加入操作之前,都要先运行prepareClientToWrite(c),设置文件事件的写事件    if (prepareClientToWrite(c) != REDIS_OK) {        /* The caller expects the sds to be free'd. */        sdsfree(s);        return;    }    if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {        sdsfree(s);    } else {        /* This method free's the sds when it is no longer needed. */        _addReplySdsToList(c,s);    }}
在这个prepareClientToWrite()里面是干嘛的呢?

/* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * * If the client should receive new data (normal clients will) the function * returns REDIS_OK, and make sure to install the write handler in our event * loop so that when the socket is writable new data gets written. * * If the client should not receive new data, because it is a fake client, * a master, a slave not yet online, or because the setup of the write handler * failed, the function returns REDIS_ERR. * * Typically gets called every time a reply is built, before adding more * data to the clients output buffers. If the function returns REDIS_ERR no * data should be appended to the output buffers. *//* 此方法将会被调用于Client准备接受新数据之前调用,在fileEvent为客户端设定writer的handler处理事件 */int prepareClientToWrite(redisClient *c) {    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;    if ((c->flags & REDIS_MASTER) &&        !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;    if (c->fd <= 0) return REDIS_ERR; /* Fake client */    if (c->bufpos == 0 && listLength(c->reply) == 0 &&        (c->replstate == REDIS_REPL_NONE ||         c->replstate == REDIS_REPL_ONLINE) &&        //在这里创建写的文件事件        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;    return REDIS_OK;}
在addReply的方法里提到了一个addReplyBulk类型方法,Bulk的中文意思为大块的,说明addReplyBulk加入的都是一些比較大块的数据,找一个方法看看:

/* Add a Redis Object as a bulk reply *//* 将一个obj的数据。拆分成大块数据的加入 */void addReplyBulk(redisClient *c, robj *obj) {	//reply加入长度    addReplyBulkLen(c,obj);    //reply加入对象    addReply(c,obj);    addReply(c,shared.crlf);}
将原本一个robj的数据拆分成可3个普通的addReply的方法调用。

就变成了数据量变大了的数据。大数据的回复一个比較不好的地方是到时解析的时候或者是Data的复制的时候会比較耗时。在networking的方法里还提供了freeClient()的操作:

/* 释放freeClient,要分为Master和Slave2种情况作不同的处理 */void freeClient(redisClient *c) {    listNode *ln;    /* If this is marked as current client unset it */    if (server.current_client == c) server.current_client = NULL;    /* If it is our master that's beging disconnected we should make sure     * to cache the state to try a partial resynchronization later.     *     * Note that before doing this we make sure that the client is not in     * some unexpected state, by checking its flags. */    if (server.master && c->flags & REDIS_MASTER) {        redisLog(REDIS_WARNING,"Connection with master lost.");        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|                          REDIS_CLOSE_ASAP|                          REDIS_BLOCKED|                          REDIS_UNBLOCKED)))        {        	//假设是Master客户端,须要做缓存Client的处理,能够迅速又一次启用            replicationCacheMaster(c);            return;        }    }
...后面代码略去了

   当Client中的输出buffer数据渐渐变多了的时候就要准备持久化到磁盘文件了。要调用以下这种方法了,

/* Helper function used by freeMemoryIfNeeded() in order to flush slave * output buffers without returning control to the event loop. *//* 从方法将会在freeMemoryIfNeeded(),释放内存空间函数,将存在内存中数据操作结果刷新到磁盘中 */void flushSlavesOutputBuffers(void) {    listIter li;    listNode *ln;    listRewind(server.slaves,&li);    while((ln = listNext(&li))) {        redisClient *slave = listNodeValue(ln);        int events;        events = aeGetFileEvents(server.el,slave->fd);        if (events & AE_WRITABLE &&            slave->replstate == REDIS_REPL_ONLINE &&            listLength(slave->reply))        {        	//在这里调用了write的方法            sendReplyToClient(server.el,slave->fd,slave,0);        }    }}
这种方法的核心调用又在sendReplyToClient()方法,就是把Client的reply内容和buf内容存入文件。以上就是我的理解了,代码量有点大,的确看的我头有点大。

转载于:https://www.cnblogs.com/gavanwanggw/p/6904536.html

你可能感兴趣的文章
Poj 2388 Who's in the Middle
查看>>
springboot与redis
查看>>
读《Cassandra权威指南》
查看>>
Xmanager连接linux
查看>>
Android开发教程 --- 数据存储 SQLite
查看>>
北大acm1006
查看>>
大数据环境下的数据质量管理策略
查看>>
vue中使用monaco-editor打包文件混乱的问题
查看>>
下载PhantomJS
查看>>
iOS自定义字体及类目
查看>>
lvs
查看>>
BeautifulSoup学习心得(一)
查看>>
20159208《网络攻防实践》第三周学习总结
查看>>
统计信号处理-简单看看克拉美罗界
查看>>
201621123048《java程序设计》第一周学习总结
查看>>
(转)C#中 特性(attribute)的用法
查看>>
IEnumerable.GetEnumerator Method
查看>>
android上的.9.png图片文件
查看>>
最大连续子序列和的问题
查看>>
【转】程序员中"5%神话";刘未鹏:为什么你应该写博客
查看>>