{{ v.name }}
{{ v.cls }}类
{{ v.price }} ¥{{ v.price }}
在分析msg_server的源码之前,我们先简单地回顾一下msg_server在整个服务器系统中的位置和作用:
各个服务程序的作用描述如下:
从上面的介绍中,我们可以看出teamtalk是支持分布式部署的一套聊天服务器程序,通过分布式部署可以实现分流和支持高数量的用户同时在线。msg_server是整个服务体系的核心系统,可以部署多个,不同的用户可以登录不同的msg_server。这套体系有如下几大亮点:
1.login_server可以根据当前各个msg_server上在线用户数量,来决定一个新用户登录到哪个msg_server,从而实现了负载平衡;
2.route_server可以将登录在不同的msg_server上的用户的聊天消息发给目标用户;
3.通过单独的一个数据库操作服务器db_proxy_server,避免了msg_server直接操作数据库,将数据库操作的入口封装起来。
在前一篇文章《teamtalk源码分析(四)——服务器端db_proxy_server源码分析》中,我介绍了每个服务如何接收连接、读取数据并解包、以及组装数据包发包的操作,这篇文章我将介绍作为客户端,一个服务如何连接另外一个服务。这里msg_server在启动时会同时连接db_proxy_server,login_server,file_server,route_server,push_server。在msg_server服务main函数里面有如下初始化调用:
//连接file_serverinit_file_serv_conn(file_server_list,file_server_count);//连接db_proxy_serverinit_db_serv_conn(db_server_list2,db_server_count2,concurrent_db_conn_cnt);//连接login_serverinit_login_serv_conn(login_server_list,login_server_count,ip_addr1,ip_addr2,listen_port,max_conn_cnt);//连接push_serverinit_route_serv_conn(route_server_list,route_server_count);//连接push_serverinit_push_serv_conn(push_server_list,push_server_count);其中每个连接服务的流程都是一样的。我们这里以第一个连接file_server为例:
voidinit_file_serv_conn(serv_info_t*server_list,uint32_tserver_count){g_file_server_list=server_list;g_file_server_count=server_count;serv_init
template
voidcfileservconn::connect(constchar*server_ip,uint16_tserver_port,uint32_tidx){log("connectingtofileserver%s:%d",server_ip,server_port);m_serv_idx=idx;m_handle=netlib_connect(server_ip,server_port,imconn_callback,(void*)&g_file_server_conn_map);if(m_handle!=netlib_invalid_handle){g_file_server_conn_map.insert(make_pair(m_handle,this));}}
net_handle_tnetlib_connect(constchar*server_ip,uint16_tport,callback_tcallback,void*callback_data){cbasesocket*psocket=newcbasesocket();if(!psocket)returnnetlib_invalid_handle;net_handle_thandle=psocket->connect(server_ip,port,callback,callback_data);if(handle==netlib_invalid_handle)deletepsocket;returnhandle;}
net_handle_tcbasesocket::connect(constchar*server_ip,uint16_tport,callback_tcallback,void*callback_data){log("cbasesocket::connect,server_ip=%s,port=%d",server_ip,port);m_remote_ip=server_ip;m_remote_port=port;m_callback=callback;m_callback_data=callback_data;m_socket=socket(af_inet,sock_stream,0);if(m_socket==invalid_socket){log("socketfailed,err_code=%d",_geterrorcode());returnnetlib_invalid_handle;}_setnonblock(m_socket);_setnodelay(m_socket);sockaddr_inserv_addr;_setaddr(server_ip,port,&serv_addr);intret=connect(m_socket,(sockaddr*)&serv_addr,sizeof(serv_addr));if((ret==socket_error)&&(!_isblock(_geterrorcode()))){log("connectfailed,err_code=%d",_geterrorcode());closesocket(m_socket);returnnetlib_invalid_handle;}m_state=socket_state_connecting;addbasesocket(this);ceventdispatch::instance()->addevent(m_socket,socket_all);return(net_handle_t)m_socket;}注意这里有以下几点:
1.将socket设置成非阻塞的。这样如果底层连接函数connect()不能立马完成,connect会立刻返回。
2.将socket的状态设置成socket_state_connecting。
3.addbasesocket(this)将该socket加入一个全局map中。
4.关注该socket的所有事件(socket_all)。
enum{socket_read=0x1,socket_write=0x2,socket_excep=0x4,socket_all=0x7};
因为socket是非阻塞,所以connect可能没连接成功,也会立即返回。那连接成功以后,我们如何得知呢?还记得上一篇文章中介绍的主线程的消息泵吗?teamtalk每个服务的主线程都有一个这样的消息泵:
while(退出条件){//1.遍历定时器队列,检测是否有定时器事件到期,有则执行定时器的回调函数//2.遍历其他任务队列,检测是否有其他任务需要执行,有,执行之//3.检测socket集合,分离可读、可写和异常事件//4.处理socket可读事件//5.处理socket可写事件//6.处理socket异常事件}当socket连接成功以后,该socket立马会变的可写。此时会触发第5步中的可写事件:
voidcbasesocket::onwrite(){#if((defined_win32)||(defined__apple__))ceventdispatch::instance()->removeevent(m_socket,socket_write);#endifif(m_state==socket_state_connecting){interror=0;socklen_tlen=sizeof(error);#ifdef_win32getsockopt(m_socket,sol_socket,so_error,(char*)&error,&len);#elsegetsockopt(m_socket,sol_socket,so_error,(void*)&error,&len);#endifif(error){m_callback(m_callback_data,netlib_msg_close,(net_handle_t)m_socket,null);}else{m_state=socket_state_connected;m_callback(m_callback_data,netlib_msg_confirm,(net_handle_t)m_socket,null);}}else{m_callback(m_callback_data,netlib_msg_write,(net_handle_t)m_socket,null);}}由于该socket的状态是socket_state_connecting,会走第一个if分支。在不出错的情况下,以参数netlib_msg_confirm调用之前设置的回调函数imconn_callback。
voidimconn_callback(void*callback_data,uint8_tmsg,uint32_thandle,void*pparam){notused_arg(handle);notused_arg(pparam);if(!callback_data)return;connmap_t*conn_map=(connmap_t*)callback_data;cimconn*pconn=findimconn(conn_map,handle);if(!pconn)return;//log("msg=%d,handle=%d",msg,handle);switch(msg){casenetlib_msg_confirm:pconn->onconfirm();break;casenetlib_msg_read:pconn->onread();break;casenetlib_msg_write:pconn->onwrite();break;casenetlib_msg_close:pconn->onclose();break;default:log("!!!imconn_callbackerrormsg:%d",msg);break;}pconn->releaseref();}这次走pconn->onconfirm();分支,由于pconn实际是cimconn的子类对象,根据c++多态性,会调用cfileservconn的onconfirm()函数:
voidcfileservconn::onconfirm(){log("connecttofileserversuccess");m_bopen=true;m_connect_time=get_tick_count();g_file_server_list[m_serv_idx].reconnect_cnt=min_reconnect_cnt/2;//连上file_server以后,给file_server发送获取ip地址的数据包im::server::imfileserveripreqmsg;cimpdupdu;pdu.setpbmsg(&msg);pdu.setserviceid(sid_other);pdu.setcommandid(cid_other_file_server_ip_req);sendpdu(&pdu);}连接上file_server后,msg_server会立即给file_server发一个数据包,以获得file_server的ip地址等信息。
这就是msg_server作为客户端连接其他服务的流程。与这些服务之间的连接都对应一个连接对象:
file_servercfileservconn
db_proxy_servercdbservconn
login_servercloginservconn
route_servercrouteservconn
push_servercpushservconn
而且,和连接file_server一样,msg_server在连接这些服务成功以后,可能会需要将自己的一些状态信息告诉对方:
voidcloginservconn::onconfirm(){log("connecttologinserversuccess");m_bopen=true;g_login_server_list[m_serv_idx].reconnect_cnt=min_reconnect_cnt/2;uint32_tcur_conn_cnt=0;uint32_tshop_user_cnt=0;//连接login_server成功以后,告诉login_server自己的ip地址、端口号//和当前登录的用户数量和可容纳的最大用户数量list
voidcrouteservconn::onconfirm(){log("connecttorouteserversuccess");m_bopen=true;m_connect_time=get_tick_count();g_route_server_list[m_serv_idx].reconnect_cnt=min_reconnect_cnt/2;if(g_master_rs_conn==null){update_master_route_serv_conn();}//连接route_server成功以后,给route_server发包告诉当前登录在本msg_server上有哪些//用户(用户id、用户状态、用户客户端类型)list
voidinit_file_serv_conn(serv_info_t*server_list,uint32_tserver_count){g_file_server_list=server_list;g_file_server_count=server_count;serv_init
voidfile_server_conn_timer_callback(void*callback_data,uint8_tmsg,uint32_thandle,void*pparam){connmap_t::iteratorit_old;cfileservconn*pconn=null;uint64_tcur_time=get_tick_count();for(connmap_t::iteratorit=g_file_server_conn_map.begin();it!=g_file_server_conn_map.end();){it_old=it;it++;pconn=(cfileservconn*)it_old->second;pconn->ontimer(cur_time);}//reconnectfileserverserv_check_reconnect
voidcfileservconn::ontimer(uint64_tcurr_tick){if(curr_tick>m_last_send_tick+server_heartbeat_interval){im::other::imheartbeatmsg;cimpdupdu;pdu.setpbmsg(&msg);pdu.setserviceid(sid_other);pdu.setcommandid(cid_other_heartbeat);sendpdu(&pdu);}if(curr_tick>m_last_recv_tick+server_timeout){log("conntofileservertimeout");close();}}
voidcfileservconn::handlepdu(cimpdu*ppdu){switch(ppdu->getcommandid()){casecid_other_heartbeat:break;casecid_other_file_transfer_rsp:_handlefilemsgtransrsp(ppdu);break;casecid_other_file_server_ip_rsp:_handlefileserveriprsp(ppdu);break;default:log("unknowncmdid=%d",ppdu->getcommandid());break;}}