注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

和申的个人主页

专注于java开发,1985wanggang

 
 
 

日志

 
 

分析淘宝网络框架tbnet  

2016-05-26 09:59:57|  分类: 阿里巴巴开源代码 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
http://blog.csdn.net/lizhitao/article/details/17011053

TBNET主要类之间的联系如下:



Transport类是用户使用的接口,服务器通过listen开始监听,需要传递实现IPacketStreamer和IServerAdaptor的类,其中IPacketStreamer用于数据缓冲区与packet(数据包)的相互转换,IServerAdaptor是服务器处理packet的接口,包含handlePacket和handleBatchPacket两个接口,用于处理单个数据包和批量处理数据包。监听之后,服务器就可以开始接受请求并处理,通过start来启动读写线程和超时线程,读写线程处理数据包请求,超时线程用于将超时的请求移除。

 

读写线程执行eventLoop,eventLoop循环的通过EpollSocketEvent来获取准备好的描述符,准备好的事件有两种情况,分别是监听描述符上的事件(由TcpAcceptor处理)和普通请求建立的描述符(由TcpComponent处理)。TcpAcceptor将接到的请求添加到epoll集合中,TcpComponent则会调用Connect的handPacket接口,该接口会根据服务器的参数,调用iServerAdaptor的handlePacket或handleBatchPacket接口。

客户端通过Transport::connect连接服务器,返回一个TcpConnect类,通过postPacket可向服务器发送数据包,需要指定处理回复的接口IPacketHandler,这里还可以指定额外的参数。TBNET的请求是异步处理的,其通过Channel实现,每个请求对应一个channel id,ChannelPool包含channel id到IPacketHandler的映射,postPacket时,会将packet对应的channel id与IPacketHandler及额外参数添加到ChannelPool中,当收到数据包时,根据包的channel id从ChannelPool中取出对应的handler来处理请求。ConnectionManager提供对请求连接的同一管理,提供控制连接参数,以及connect、disconnec、sendPacket等接口,客户端使用该接口发包更简单方便。


通读源代码Tbnet源代码,Tbnet模型的问题在于多个工作线程从任务队列获取任务需要加锁互斥,此过程会产生大量的上下切换,线程等待,当队列IO量非常大时就成了瓶颈。一般像nginx这种模型架构(异步回调),socket读写处理,超时检查都在一个线程中解决。每次从epoll轮询获取的大量事件,首先放到自己的队列中,接着轮询队列队列依次处理业务逻辑,如果处理过程中遇到EAGAIN不能立即处理,则把该事件放入红黑树(rbtreee)中.当定时器触发时,依据时间判断超时事件资源释放或结束掉。

Tbnet超时线程每隔100ms会对IO队列加锁轮询,检查是否有相关IO处理超时或长时间没有做任何操作,需要剔除出队列。

Transport.cpp

[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /* 
  2.  * 起动运输层,创建两个线程,一个用于读写,一个用超时处理,他们共用一个io队列。 
  3.  * @return 是否成功, true - 成功, false - 失败。 
  4.  */  
  5. bool Transport::start() {  
  6.     signal(SIGPIPE, SIG_IGN);  
  7.     _readWriteThread.start(this, &_socketEvent);  
  8.     _timeoutThread.start(this, NULL);  
  9.     return true;  
  10. }  
[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /*  
  2.  * socket event(事件循环) 的检测, 被run函数调用  
[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1.  * _readWriteThread读写线程,专门负责accept新建连接和处理socket读写操作  
  2.  */  
  3. void Transport::eventLoop(SocketEvent *socketEvent) {  
  4.     IOEvent events[MAX_SOCKET_EVENTS];  
  5.   
  6.     while (!_stop) {  
  7.         // 检查是否有事件发生  
  8.         int cnt = socketEvent->getEvents(1000, events, MAX_SOCKET_EVENTS);  
  9.         if (cnt < 0) {  
  10.             TBSYS_LOG(INFO, "得到events出错了: %s(%d)\n", strerror(errno), errno);  
  11.         }  
  12.   
  13.         for (int i = 0; i < cnt; i++) {  
  14.             IOComponent *ioc = events[i]._ioc;  
  15.             if (ioc == NULL) {  
  16.                 continue;  
  17.             }  
  18.             if (events[i]._errorOccurred) { // 错误发生了  
  19.                 removeComponent(ioc);  
  20.                 continue;  
  21.             }  
  22.   
  23.             ioc->addRef();  
  24.             // 读写  
  25.             bool rc = true;  
  26.             if (events[i]._readOccurred) {  
  27.                 rc = ioc->handleReadEvent();  
  28.             }  
  29.             if (rc && events[i]._writeOccurred) {  
  30.                 rc = ioc->handleWriteEvent();  
  31.             }  
  32.             ioc->subRef();  
  33.   
  34.             if (!rc) {  
  35.                 removeComponent(ioc);  
  36.             }  
  37.         }  
  38.     }  
  39. }  
  40.   
  41. /*  
  42.  * 超时检查, 被run函数调用  
[cpp] view plain copy
 在CODE上查看代码片派生到我的代码片
  1.  * 超时轮询  
  2.  */  
  3. void Transport::timeoutLoop() {  
  4.     IOComponent *mydelHead = NULL;  
  5.     IOComponent *mydelTail = NULL;  
  6.     std::vector<IOComponent*> mylist;  
  7.     while (!_stop) {  
  8.         // 先写复制到list中  
  9.         _iocsMutex.lock();  
  10.         if (_iocListChanged) {  
  11.             mylist.clear();  
  12.             IOComponent *iocList = _iocListHead;  
  13.             while (iocList) {  
  14.                 mylist.push_back(iocList);  
  15.                 iocList = iocList->_next;  
  16.             }  
  17.             _iocListChanged = false;  
  18.         }  
  19.         // 加入到mydel中  
  20.         if (_delListHead != NULL && _delListTail != NULL) {  
  21.             if (mydelTail == NULL) {  
  22.                 mydelHead = _delListHead;  
  23.             } else {  
  24.                 mydelTail->_next = _delListHead;  
  25.                 _delListHead->_prev = mydelTail;  
  26.             }  
  27.             mydelTail = _delListTail;  
  28.             // 清空delList  
  29.             _delListHead = _delListTail = NULL;  
  30.         }  
  31.         _iocsMutex.unlock();  
  32.   
  33.         // 对每个iocomponent进行检查  
  34.         for (int i=0; i<(int)mylist.size(); i++) {  
  35.             IOComponent *ioc = mylist[i];  
  36.             ioc->checkTimeout(tbsys::CTimeUtil::getTime());  
  37.         }  
  38.   
  39.         // 删除掉  
  40.         IOComponent *tmpList = mydelHead;  
  41.         int64_t nowTime = tbsys::CTimeUtil::getTime() - static_cast<int64_t>(900000000); // 15min  
  42.         while (tmpList) {  
  43.             if (tmpList->getRef() <= 0) {  
  44.                 tmpList->subRef();  
  45.             }  
  46.             if (tmpList->getRef() <= -10 || tmpList->getLastUseTime() < nowTime) {  
  47.                 // 从链中删除  
  48.                 if (tmpList == mydelHead) { // head  
  49.                     mydelHead = tmpList->_next;  
  50.                 }  
  51.                 if (tmpList == mydelTail) { // tail  
  52.                     mydelTail = tmpList->_prev;  
  53.                 }  
  54.                 if (tmpList->_prev != NULL)  
  55.                     tmpList->_prev->_next = tmpList->_next;  
  56.                 if (tmpList->_next != NULL)  
  57.                     tmpList->_next->_prev = tmpList->_prev;  
  58.   
  59.                 IOComponent *ioc = tmpList;  
  60.                 tmpList = tmpList->_next;  
  61.                 TBSYS_LOG(INFO, "DELIOC, %s, IOCount:%d, IOC:%p\n",  
  62.                           ioc->getSocket()->getAddr().c_str(), _iocListCount, ioc);  
  63.                 delete ioc;  
  64.             } else {  
  65.                 tmpList = tmpList->_next;  
  66.             }  
  67.         }  
  68.   
  69.         usleep(500000);  // 最小间隔100ms  
  70.     }  
  71.   
  72.     // 写回到_delList上,让destroy销毁  
  73.     _iocsMutex.lock();  
  74.     if (mydelHead != NULL) {  
  75.         if (_delListTail == NULL) {  
  76.             _delListHead = mydelHead;  
  77.         } else {  
  78.             _delListTail->_next = mydelHead;  
  79.             mydelHead->_prev = _delListTail;  
  80.         }  
  81.         _delListTail = mydelTail;  
  82.     }  
  83.     _iocsMutex.unlock();  
  84. }  
  85.   
  86. /* 
  87.  * 线程的运行函数,实现Runnable接口中的函数 
  88.  * 
  89.  * @param arg: 运行时传入参数 
  90.  */  
  91. void Transport::run(tbsys::CThread *threadvoid *arg) {  
  92.     if (thread == &_timeoutThread) {  
  93.         timeoutLoop();  
  94.     } else {  
  95.         eventLoop((SocketEvent*)arg);  
  96.     }  
  97. }  
  98.   
  99. /* 
  100.  * 把[upd|tcp]:ip:port分开放在args中 
  101.  * 
  102.  * @param src: 源格式 
  103.  * @param args: 目标数组 
  104.  * @param   cnt: 数组中最大个数 
  105.  * @return  返回的数组中个数 
  106.  */  
  107. int Transport::parseAddr(char *src, char **args, int cnt) {  
  108.     int index = 0;  
  109.     char *prev = src;  
  110.   
  111.     while (*src) {  
  112.         if (*src == ':') {  
  113.             *src = '\0';  
  114.             args[index++] = prev;  
  115.   
  116.             if (index >= cnt) { // 数组满了,返回  
  117.                 return index;  
  118.             }  
  119.   
  120.             prev = src + 1;  
  121.         }  
  122.   
  123.         src ++;  
  124.     }  
  125.   
  126.     args[index++] = prev;  
  127.   
  128.     return index;  
  129. }  
  130.   
  131. /* 
  132.  * 起一个监听端口。 
  133.  * 
  134.  * @param spec: 格式 [upd|tcp]:ip:port 
  135.  * @param streamer: 数据包的双向流,用packet创建,解包,组包。 
  136.  * @param serverAdapter: 用在服务器端,当Connection初始化及Channel创建时回调时用 
  137.  * @return IO组件一个对象的指针 
  138.  */  
  139. IOComponent *Transport::listen(const char *spec, IPacketStreamer *streamer, IServerAdapter *serverAdapter) {  
  140.     char tmp[1024];  
  141.     char *args[32];  
  142.     strncpy(tmp, spec, 1024);  
  143.     tmp[1023] = '\0';  
  144.   
  145.     if (parseAddr(tmp, args, 32) != 3) {  
  146.         return NULL;  
  147.     }  
  148.   
  149.     if (strcasecmp(args[0], "tcp") == 0) {  
  150.         char *host = args[1];  
  151.         int port = atoi(args[2]);  
  152.   
  153.         // Server Socket  
  154.         ServerSocket *socket = new ServerSocket();  
  155.   
  156.         if (!socket->setAddress(host, port)) {  
  157.             delete socket;  
  158.             return NULL;  
  159.         }  
  160.   
  161.         // TCPAcceptor  
  162.         TCPAcceptor *acceptor = new TCPAcceptor(this, socket, streamer, serverAdapter);  
  163.   
  164.         if (!acceptor->init()) {  
  165.             delete acceptor;  
  166.             return NULL;  
  167.         }  
  168.   
  169.         // 加入到iocomponents中,及注册可读到socketevent中  
  170.         addComponent(acceptor, truefalse);  
  171.   
  172.         // 返回  
  173.         return acceptor;  
  174.     } else if (strcasecmp(args[0], "udp") == 0) {}  
  175.   
  176.     return NULL;  
  177. }  
  178.   
  179. /* 
  180.  * 创建一个Connection,连接到指定的地址,并加入到Socket的监听事件中。 
  181.  * 
  182.  * @param spec: 格式 [upd|tcp]:ip:port 
  183.  * @param streamer: 数据包的双向流,用packet创建,解包,组包。 
  184.  * @return  返回一个Connectoion对象指针 
  185.  */  
  186. Connection *Transport::connect(const char *spec, IPacketStreamer *streamer, bool autoReconn) {  
  187.     char tmp[1024];  
  188.     char *args[32];  
  189.     strncpy(tmp, spec, 1024);  
  190.     tmp[1023] = '\0';  
  191.   
  192.     if (parseAddr(tmp, args, 32) != 3) {  
  193.         return NULL;  
  194.     }  
  195.   
  196.     if (strcasecmp(args[0], "tcp") == 0) {  
  197.         char *host = args[1];  
  198.         int port = atoi(args[2]);  
  199.   
  200.         // Socket  
  201.         Socket *socket = new Socket();  
  202.   
  203.         if (!socket->setAddress(host, port)) {  
  204.             delete socket;  
  205.             TBSYS_LOG(ERROR, "设置setAddress错误: %s:%d, %s", host, port, spec);  
  206.             return NULL;  
  207.         }  
  208.   
  209.         // TCPComponent  
  210.         TCPComponent *component = new TCPComponent(this, socket, streamer, NULL);  
  211.         // 设置是否自动重连  
  212.         component->setAutoReconn(autoReconn);  
  213.         if (!component->init()) {  
  214.             delete component;  
  215.             TBSYS_LOG(ERROR, "初始化失败TCPComponent: %s:%d", host, port);  
  216.             return NULL;  
  217.         }  
  218.   
  219.         // 加入到iocomponents中,及注册可写到socketevent中  
  220.         addComponent(component, truetrue);  
  221.         component->addRef();  
  222.   
  223.         return component->getConnection();  
  224.     } else if (strcasecmp(args[0], "udp") == 0) {}  
  225.   
  226.     return NULL;  
  227. }  
  228.   
  229. /** 
  230.  * 主动断开 
  231.  */  
  232. bool Transport::disconnect(Connection *conn) {  
  233.     IOComponent *ioc = NULL;  
  234.     if (conn == NULL || (ioc = conn->getIOComponent()) == NULL) {  
  235.         return false;  
  236.     }  
  237.     ioc->setAutoReconn(false);  
  238.     ioc->subRef();  
  239.     if (ioc->_socket) {  
  240.         ioc->_socket->shutdown();  
  241.     }  
  242.     return true;  
  243. }  
  244.   
  245. /* 
  246.  * 加入到iocomponents中 
  247.  * @param  ioc: IO组件 
  248.  * @param  isWrite: 加入读或写事件到socketEvent中 
  249.  */  
  250. void Transport::addComponent(IOComponent *ioc, bool readOn, bool writeOn) {  
  251.     assert(ioc != NULL);  
  252.   
  253.     _iocsMutex.lock();  
  254.     if (ioc->isUsed()) {  
  255.         TBSYS_LOG(ERROR, "已给加过addComponent: %p", ioc);  
  256.         _iocsMutex.unlock();  
  257.         return;  
  258.     }  
  259.     // 加入iocList上  
  260.     ioc->_prev = _iocListTail;  
  261.     ioc->_next = NULL;  
  262.     if (_iocListTail == NULL) {  
  263.         _iocListHead = ioc;  
  264.     } else {  
  265.         _iocListTail->_next = ioc;  
  266.     }  
  267.     _iocListTail = ioc;  
  268.     // 设置在用  
  269.     ioc->setUsed(true);  
  270.     _iocListChanged = true;  
  271.     _iocListCount ++;  
  272.     _iocsMutex.unlock();  
  273.   
  274.     // 设置socketevent  
  275.     Socket *socket = ioc->getSocket();  
  276.     ioc->setSocketEvent(&_socketEvent);  
  277.     _socketEvent.addEvent(socket, readOn, writeOn);  
  278.     TBSYS_LOG(INFO, "ADDIOC, SOCK: %d, %s, RON: %d, WON: %d, IOCount:%d, IOC:%p\n",  
  279.               socket->getSocketHandle(), ioc->getSocket()->getAddr().c_str(),  
  280.               readOn, writeOn, _iocListCount, ioc);  
  281. }  
  282.   
  283. /* 
  284.  * 删除iocomponet 
  285.  * @param ioc: IO组件 
  286.  */  
  287. void Transport::removeComponent(IOComponent *ioc) {  
  288.     assert(ioc != NULL);  
  289.     tbsys::CThreadGuard guard(&_iocsMutex);  
  290.     ioc->close();  
  291.     if (ioc->isAutoReconn()) { // 需要重连, 不从iocomponents去掉  
  292.         return;  
  293.     }  
  294.     if (ioc->isUsed() == false) { // 不在iocList中  
  295.         return;  
  296.     }  
  297.   
  298.     // 从_iocList删除  
  299.     if (ioc == _iocListHead) { // head  
  300.         _iocListHead = ioc->_next;  
  301.     }  
  302.     if (ioc == _iocListTail) { // tail  
  303.         _iocListTail = ioc->_prev;  
  304.     }  
  305.     if (ioc->_prev != NULL)  
  306.         ioc->_prev->_next = ioc->_next;  
  307.     if (ioc->_next != NULL)  
  308.         ioc->_next->_prev = ioc->_prev;  
  309.   
  310.     // 加入到_delList  
  311.     ioc->_prev = _delListTail;  
  312.     ioc->_next = NULL;  
  313.     if (_delListTail == NULL) {  
  314.         _delListHead = ioc;  
  315.     } else {  
  316.         _delListTail->_next = ioc;  
  317.     }  
  318.     _delListTail = ioc;  
  319.   
  320.     // 引用计数减一  
  321.     ioc->setUsed(false);  
  322.     _iocListChanged = true;  
  323.     _iocListCount --;  
  324.   
  325.     TBSYS_LOG(INFO, "RMIOC, %s IOCount:%d, IOC:%p\n",  
  326.               ioc->getSocket()->getAddr().c_str(),  
  327.               _iocListCount, ioc);  
  328. }  
  329.   
  330. /* 
  331.  * 释放变量 
  332.  */  
  333. void Transport::destroy() {  
  334.     tbsys::CThreadGuard guard(&_iocsMutex);  
  335.   
  336.     IOComponent *list, *ioc;  
  337.     // 删除iocList  
  338.     list = _iocListHead;  
  339.     while (list) {  
  340.         ioc = list;  
  341.         list = list->_next;  
  342.         _iocListCount --;  
  343.         TBSYS_LOG(INFO, "DELIOC, IOCount:%d, IOC:%p\n",  
  344.                   _iocListCount, ioc);  
  345.         delete ioc;  
  346.     }  
  347.     _iocListHead = _iocListTail = NULL;  
  348.     _iocListCount = 0;  
  349.     // 删除delList  
  350.     list = _delListHead;  
  351.     while (list) {  
  352.         ioc = list;  
  353.         assert(ioc != NULL);  
  354.         list = list->_next;  
  355.         TBSYS_LOG(INFO, "DELIOC, IOCount:%d, IOC:%p\n",  
  356.                   _iocListCount, ioc);  
  357.         delete ioc;  
  358.     }  
  359.     _delListHead = _delListTail = NULL;  
  360. }  
  361.   
  362. /** 
  363.  * 是否为stop 
  364.  */  
  365. bool* Transport::getStop()  
  366. {  
  367.     return &_stop;  
  368. }  




部分引用自 http://blog.chinaunix.net/uid-20196318-id-3142050.html

  评论这张
 
阅读(126)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2016