TeamTalk源码分析(五) —— 服务器端msg_server源码分析

阅读:922 2019-03-20 16:45:58 来源:新网

在分析msg_server的源码之前,我们先简单地回顾一下msg_server在整个服务器系统中的位置和作用:

各个服务程序的作用描述如下:

从上面的介绍中,我们可以看出teamtalk是支持分布式部署的一套聊天服务器程序,通过分布式部署可以实现分流和支持高数量的用户同时在线。msg_server是整个服务体系的核心系统,可以部署多个,不同的用户可以登录不同的msg_server。这套体系有如下几大亮点:

1.login_server可以根据当前各个msg_server上在线用户数量,来决定一个新用户登录到哪个msg_server,从而实现了负载平衡;

2.route_server可以将登录在不同的msg_server上的用户的聊天消息发给目标用户;

3.通过单独的一个数据库操作服务器db_proxy_server,避免了msg_server直接操作数据库,将数据库操作的入口封装起来。

在前一篇文章《teamtalk源码分析(四)——服务器端db_proxy_server源码分析》中,我介绍了每个服务如何接收连接、读取数据并解包、以及组装数据包发包的操作,这篇文章我将介绍作为客户端,一个服务如何连接另外一个服务。这里msg_server在启动时会同时连接db_proxy_server,login_server,file_server,route_server,push_server。在msg_server服务main函数里面有如下初始化调用:

//连接file_serverinit_file_serv_conn(file_server_list,file_server_count);//连接db_proxy_serverinit_db_serv_conn(db_server_list2,db_server_count2,concurrent_db_conn_cnt);//连接login_serverinit_login_serv_conn(login_server_list,login_server_count,ip_addr1,ip_addr2,listen_port,max_conn_cnt);//连接push_serverinit_route_serv_conn(route_server_list,route_server_count);//连接push_serverinit_push_serv_conn(push_server_list,push_server_count);其中每个连接服务的流程都是一样的。我们这里以第一个连接file_server为例:

voidinit_file_serv_conn(serv_info_t*server_list,uint32_tserver_count){g_file_server_list=server_list;g_file_server_count=server_count;serv_init(g_file_server_list,g_file_server_count);netlib_register_timer(file_server_conn_timer_callback,null,1000);s_file_handler=cfilehandler::getinstance();}

templatevoidserv_init(serv_info_t*server_list,uint32_tserver_count){for(uint32_ti=0;iconnect(server_list[i].server_ip.c_str(),server_list[i].server_port,i);server_list[i].serv_conn=pconn;server_list[i].idle_cnt=0;server_list[i].reconnect_cnt=min_reconnect_cnt/2;}}模板函数serv_init展开参数后实际上是调用cfileservconn->connect(),我们看这个函数的调用:

voidcfileservconn::connect(constchar*server_ip,uint16_tserver_port,uint32_tidx){log("connectingtofileserver%s:%d",server_ip,server_port);m_serv_idx=idx;m_handle=netlib_connect(server_ip,server_port,imconn_callback,(void*)&g_file_server_conn_map);if(m_handle!=netlib_invalid_handle){g_file_server_conn_map.insert(make_pair(m_handle,this));}}

net_handle_tnetlib_connect(constchar*server_ip,uint16_tport,callback_tcallback,void*callback_data){cbasesocket*psocket=newcbasesocket();if(!psocket)returnnetlib_invalid_handle;net_handle_thandle=psocket->connect(server_ip,port,callback,callback_data);if(handle==netlib_invalid_handle)deletepsocket;returnhandle;}

net_handle_tcbasesocket::connect(constchar*server_ip,uint16_tport,callback_tcallback,void*callback_data){log("cbasesocket::connect,server_ip=%s,port=%d",server_ip,port);m_remote_ip=server_ip;m_remote_port=port;m_callback=callback;m_callback_data=callback_data;m_socket=socket(af_inet,sock_stream,0);if(m_socket==invalid_socket){log("socketfailed,err_code=%d",_geterrorcode());returnnetlib_invalid_handle;}_setnonblock(m_socket);_setnodelay(m_socket);sockaddr_inserv_addr;_setaddr(server_ip,port,&serv_addr);intret=connect(m_socket,(sockaddr*)&serv_addr,sizeof(serv_addr));if((ret==socket_error)&&(!_isblock(_geterrorcode()))){log("connectfailed,err_code=%d",_geterrorcode());closesocket(m_socket);returnnetlib_invalid_handle;}m_state=socket_state_connecting;addbasesocket(this);ceventdispatch::instance()->addevent(m_socket,socket_all);return(net_handle_t)m_socket;}注意这里有以下几点:

1.将socket设置成非阻塞的。这样如果底层连接函数connect()不能立马完成,connect会立刻返回。

2.将socket的状态设置成socket_state_connecting。

3.addbasesocket(this)将该socket加入一个全局map中。

4.关注该socket的所有事件(socket_all)。

enum{socket_read=0x1,socket_write=0x2,socket_excep=0x4,socket_all=0x7};

因为socket是非阻塞,所以connect可能没连接成功,也会立即返回。那连接成功以后,我们如何得知呢?还记得上一篇文章中介绍的主线程的消息泵吗?teamtalk每个服务的主线程都有一个这样的消息泵:

while(退出条件){//1.遍历定时器队列,检测是否有定时器事件到期,有则执行定时器的回调函数//2.遍历其他任务队列,检测是否有其他任务需要执行,有,执行之//3.检测socket集合,分离可读、可写和异常事件//4.处理socket可读事件//5.处理socket可写事件//6.处理socket异常事件}当socket连接成功以后,该socket立马会变的可写。此时会触发第5步中的可写事件:

voidcbasesocket::onwrite(){#if((defined_win32)||(defined__apple__))ceventdispatch::instance()->removeevent(m_socket,socket_write);#endifif(m_state==socket_state_connecting){interror=0;socklen_tlen=sizeof(error);#ifdef_win32getsockopt(m_socket,sol_socket,so_error,(char*)&error,&len);#elsegetsockopt(m_socket,sol_socket,so_error,(void*)&error,&len);#endifif(error){m_callback(m_callback_data,netlib_msg_close,(net_handle_t)m_socket,null);}else{m_state=socket_state_connected;m_callback(m_callback_data,netlib_msg_confirm,(net_handle_t)m_socket,null);}}else{m_callback(m_callback_data,netlib_msg_write,(net_handle_t)m_socket,null);}}由于该socket的状态是socket_state_connecting,会走第一个if分支。在不出错的情况下,以参数netlib_msg_confirm调用之前设置的回调函数imconn_callback。

voidimconn_callback(void*callback_data,uint8_tmsg,uint32_thandle,void*pparam){notused_arg(handle);notused_arg(pparam);if(!callback_data)return;connmap_t*conn_map=(connmap_t*)callback_data;cimconn*pconn=findimconn(conn_map,handle);if(!pconn)return;//log("msg=%d,handle=%d",msg,handle);switch(msg){casenetlib_msg_confirm:pconn->onconfirm();break;casenetlib_msg_read:pconn->onread();break;casenetlib_msg_write:pconn->onwrite();break;casenetlib_msg_close:pconn->onclose();break;default:log("!!!imconn_callbackerrormsg:%d",msg);break;}pconn->releaseref();}这次走pconn->onconfirm();分支,由于pconn实际是cimconn的子类对象,根据c++多态性,会调用cfileservconn的onconfirm()函数:

voidcfileservconn::onconfirm(){log("connecttofileserversuccess");m_bopen=true;m_connect_time=get_tick_count();g_file_server_list[m_serv_idx].reconnect_cnt=min_reconnect_cnt/2;//连上file_server以后,给file_server发送获取ip地址的数据包im::server::imfileserveripreqmsg;cimpdupdu;pdu.setpbmsg(&msg);pdu.setserviceid(sid_other);pdu.setcommandid(cid_other_file_server_ip_req);sendpdu(&pdu);}连接上file_server后,msg_server会立即给file_server发一个数据包,以获得file_server的ip地址等信息。

这就是msg_server作为客户端连接其他服务的流程。与这些服务之间的连接都对应一个连接对象:

file_servercfileservconn

db_proxy_servercdbservconn

login_servercloginservconn

route_servercrouteservconn

push_servercpushservconn

而且,和连接file_server一样,msg_server在连接这些服务成功以后,可能会需要将自己的一些状态信息告诉对方:

voidcloginservconn::onconfirm(){log("connecttologinserversuccess");m_bopen=true;g_login_server_list[m_serv_idx].reconnect_cnt=min_reconnect_cnt/2;uint32_tcur_conn_cnt=0;uint32_tshop_user_cnt=0;//连接login_server成功以后,告诉login_server自己的ip地址、端口号//和当前登录的用户数量和可容纳的最大用户数量listuser_conn_list;cimusermanager::getinstance()->getuserconncnt(&user_conn_list,cur_conn_cnt);charhostname[256]={0};gethostname(hostname,256);im::server::immsgservinfomsg;msg.set_ip1(g_msg_server_ip_addr1);msg.set_ip2(g_msg_server_ip_addr2);msg.set_port(g_msg_server_port);msg.set_max_conn_cnt(g_max_conn_cnt);msg.set_cur_conn_cnt(cur_conn_cnt);msg.set_host_name(hostname);cimpdupdu;pdu.setpbmsg(&msg);pdu.setserviceid(sid_other);pdu.setcommandid(cid_other_msg_serv_info);sendpdu(&pdu);}连接route_server成功以后,给route_server发包告诉当前登录在本msg_server上有哪些用户(用户id、用户状态、用户客户端类型)。这样将来a用户给b发聊天消息,msg_server将该聊天消息转给route_server,route_server就知道用户b在哪个msg_server上了,以便将该聊天消息发给b所在的msg_server。

voidcrouteservconn::onconfirm(){log("connecttorouteserversuccess");m_bopen=true;m_connect_time=get_tick_count();g_route_server_list[m_serv_idx].reconnect_cnt=min_reconnect_cnt/2;if(g_master_rs_conn==null){update_master_route_serv_conn();}//连接route_server成功以后,给route_server发包告诉当前登录在本msg_server上有哪些//用户(用户id、用户状态、用户客户端类型)listonline_user_list;cimusermanager::getinstance()->getonlineuserinfo(&online_user_list);im::server::imonlineuserinfomsg;for(list::iteratorit=online_user_list.begin();it!=online_user_list.end();it++){user_stat_tuser_stat=*it;im::basedefine::serveruserstat*server_user_stat=msg.add_user_stat_list();server_user_stat->set_user_id(user_stat.user_id);server_user_stat->set_status((::im::basedefine::userstattype)user_stat.status);server_user_stat->set_client_type((::im::basedefine::clienttype)user_stat.client_type);}cimpdupdu;pdu.setpbmsg(&msg);pdu.setserviceid(sid_other);pdu.setcommandid(cid_other_online_user_info);sendpdu(&pdu);}再来提一下,心跳包机制,和上一篇文章中介绍个与db_proxy_server一样,都是在定时器里面做的,这里不再赘述了,简单地贴出与file_server的心跳包代码吧:

voidinit_file_serv_conn(serv_info_t*server_list,uint32_tserver_count){g_file_server_list=server_list;g_file_server_count=server_count;serv_init(g_file_server_list,g_file_server_count);netlib_register_timer(file_server_conn_timer_callback,null,1000);s_file_handler=cfilehandler::getinstance();}

voidfile_server_conn_timer_callback(void*callback_data,uint8_tmsg,uint32_thandle,void*pparam){connmap_t::iteratorit_old;cfileservconn*pconn=null;uint64_tcur_time=get_tick_count();for(connmap_t::iteratorit=g_file_server_conn_map.begin();it!=g_file_server_conn_map.end();){it_old=it;it++;pconn=(cfileservconn*)it_old->second;pconn->ontimer(cur_time);}//reconnectfileserverserv_check_reconnect(g_file_server_list,g_file_server_count);}在注册的定时器回调函数里面调用cfileservconn::ontimer函数:

voidcfileservconn::ontimer(uint64_tcurr_tick){if(curr_tick>m_last_send_tick+server_heartbeat_interval){im::other::imheartbeatmsg;cimpdupdu;pdu.setpbmsg(&msg);pdu.setserviceid(sid_other);pdu.setcommandid(cid_other_heartbeat);sendpdu(&pdu);}if(curr_tick>m_last_recv_tick+server_timeout){log("conntofileservertimeout");close();}}

voidcfileservconn::handlepdu(cimpdu*ppdu){switch(ppdu->getcommandid()){casecid_other_heartbeat:break;casecid_other_file_transfer_rsp:_handlefilemsgtransrsp(ppdu);break;casecid_other_file_server_ip_rsp:_handlefileserveriprsp(ppdu);break;default:log("unknowncmdid=%d",ppdu->getcommandid());break;}}

相关文章
{{ v.title }}
{{ v.description||(cleanHtml(v.content)).substr(0,100)+'···' }}
你可能感兴趣
推荐阅读 更多>
推荐商标

{{ v.name }}

{{ v.cls }}类

立即购买 联系客服