login_server从严格意义上来说,是一个登录分流器,所以名字起的有点名不符实。该服务根据已知的msg_server上的在线用户数量来返回告诉一个即将登录的用户登录哪个msg_server比较合适。关于其程序框架的非业务代码我们已经在前面的两篇文章《teamtalk源码分析(四)——服务器端db_proxy_server源码分析》和《teamtalk源码分析(五)——服务器端msg_server源码分析》中介绍过了。这篇文章主要介绍下其业务代码。
首先,程序初始化的时候,会初始化如下功能:
//1.在8080端口监听客户端连接//2.在8100端口上监听msg_server的连接//3.在8080端口上监听客户端http连接其中连接对象cloginconn代表着login_server与msg_server之间的连接;而chttpconn代表着与客户端的http连接。我们先来看cloginconn对象,上一篇文章中也介绍了其业务代码主要在其handlepdu()函数中,可以看到这路连接主要处理哪些数据包:
voidcloginconn::handlepdu(cimpdu*ppdu){switch(ppdu->getcommandid()){casecid_other_heartbeat:break;casecid_other_msg_serv_info:_handlemsgservinfo(ppdu);break;casecid_other_user_cnt_update:_handleusercntupdate(ppdu);break;casecid_login_req_msgserver:_handlemsgservrequest(ppdu);break;default:log("wrongmsg,cmdid=%d",ppdu->getcommandid());break;}}命令号cid_other_heartbeat是与msg_server的心跳包。上一篇文章《teamtalk源码分析(五)——服务器端msg_server源码分析》中介绍过,msg_server连上login_server后会立刻给login_server发一个数据包,该数据包里面含有该msg_server上的用户数量、最大可容纳的用户数量、自己的ip地址和端口号。
listuser_conn_list;cimusermanager::getinstance()->getuserconncnt(&user_conn_list,cur_conn_cnt);charhostname[256]={0};gethostname(hostname,256);im::server::immsgservinfomsg;msg.set_ip1(g_msg_server_ip_addr1);msg.set_ip2(g_msg_server_ip_addr2);msg.set_port(g_msg_server_port);msg.set_max_conn_cnt(g_max_conn_cnt);msg.set_cur_conn_cnt(cur_conn_cnt);msg.set_host_name(hostname);cimpdupdu;pdu.setpbmsg(&msg);pdu.setserviceid(sid_other);pdu.setcommandid(cid_other_msg_serv_info);sendpdu(&pdu);命令号是cid_other_msg_serv_info。我们来看下login_server如何处理这个命令的:
voidcloginconn::_handlemsgservinfo(cimpdu*ppdu){msg_serv_info_t*pmsgservinfo=newmsg_serv_info_t;im::server::immsgservinfomsg;msg.parsefromarray(ppdu->getbodydata(),ppdu->getbodylength());pmsgservinfo->ip_addr1=msg.ip1();pmsgservinfo->ip_addr2=msg.ip2();pmsgservinfo->port=msg.port();pmsgservinfo->max_conn_cnt=msg.max_conn_cnt();pmsgservinfo->cur_conn_cnt=msg.cur_conn_cnt();pmsgservinfo->hostname=msg.host_name();g_msg_serv_info.insert(make_pair(m_handle,pmsgservinfo));g_total_online_user_cnt+=pmsgservinfo->cur_conn_cnt;log("msgservinfo,ip_addr1=%s,ip_addr2=%s,port=%d,max_conn_cnt=%d,cur_conn_cnt=%d,""hostname:%s.",pmsgservinfo->ip_addr1.c_str(),pmsgservinfo->ip_addr2.c_str(),pmsgservinfo->port,pmsgservinfo->max_conn_cnt,pmsgservinfo->cur_conn_cnt,pmsgservinfo->hostname.c_str());}其实所做的工作无非就是记录下的该msg_server上的ip、端口号、在线用户数量和最大可容纳用户数量等信息而已。存在一个全局map里面:
mapg_msg_serv_info;
typedefstruct{stringip_addr1;//电信ipstringip_addr2;//网通ipuint16_tport;uint32_tmax_conn_cnt;uint32_tcur_conn_cnt;stringhostname;//消息服务器的主机名}msg_serv_info_t;另外一个命令号cid_other_user_cnt_update,是当msg_server上的用户上线或下线时,msg_server给login_server发该类型的命令号,让login_server更新保存的msg_server的上的在线用户数量:
voidcloginconn::_handleusercntupdate(cimpdu*ppdu){map::iteratorit=g_msg_serv_info.find(m_handle);if(it!=g_msg_serv_info.end()){msg_serv_info_t*pmsgservinfo=it->second;im::server::imusercntupdatemsg;msg.parsefromarray(ppdu->getbodydata(),ppdu->getbodylength());uint32_taction=msg.user_action();if(action==user_cnt_inc){pmsgservinfo->cur_conn_cnt++;g_total_online_user_cnt++;}else{pmsgservinfo->cur_conn_cnt--;g_total_online_user_cnt--;}log("%s:%d,cur_cnt=%u,total_cnt=%u",pmsgservinfo->hostname.c_str(),pmsgservinfo->port,pmsgservinfo->cur_conn_cnt,g_total_online_user_cnt);}}命令号cid_login_req_msgserver没用到。
接着说login_server与客户端的http连接处理,这个连接收取数据和解包是直接在chttpconn的onread函数里面处理的:
voidchttpconn::onread(){for(;;){uint32_tfree_buf_len=m_in_buf.getallocsize()-m_in_buf.getwriteoffset();if(free_buf_len1024){log("gettoomuchdata:%s",in_buf);close();return;}//log("onread,buf_len=%u,conn_handle=%un",buf_len,m_conn_handle);//fordebugm_chttpparser.parsehttpcontent(in_buf,buf_len);if(m_chttpparser.isreadall()){stringurl=m_chttpparser.geturl();if(strncmp(url.c_str(),"/msg_server",11)==0){stringcontent=m_chttpparser.getbodycontent();_handlemsgservrequest(url,content);}else{log("urlunknown,url=%s",url.c_str());close();}}}
voidchttpconn::_handlemsgservrequest(string&url,string&post_data){msg_serv_info_t*pmsgservinfo;uint32_tmin_user_cnt=(uint32_t)-1;map::iteratorit_min_conn=g_msg_serv_info.end();map::iteratorit;if(g_msg_serv_info.size()<=0){json::valuevalue;value["code"]=1;value["msg"]="没有msg_server";stringstrcontent=value.tostyledstring();char*szcontent=newchar[http_response_html_max];snprintf(szcontent,http_response_html_max,http_response_html,strcontent.length(),strcontent.c_str());send((void*)szcontent,strlen(szcontent));delete[]szcontent;return;}for(it=g_msg_serv_info.begin();it!=g_msg_serv_info.end();it++){pmsgservinfo=it->second;if((pmsgservinfo->cur_conn_cntmax_conn_cnt)&&(pmsgservinfo->cur_conn_cntcur_conn_cnt;}}if(it_min_conn==g_msg_serv_info.end()){log("alltcpmsgserverarefull");json::valuevalue;value["code"]=2;value["msg"]="负载过高";stringstrcontent=value.tostyledstring();char*szcontent=newchar[http_response_html_max];snprintf(szcontent,http_response_html_max,http_response_html,strcontent.length(),strcontent.c_str());send((void*)szcontent,strlen(szcontent));delete[]szcontent;return;}else{json::valuevalue;value["code"]=0;value["msg"]="";if(pipparser->istelcome(getpeerip())){value["priorip"]=string(it_min_conn->second->ip_addr1);value["backupip"]=string(it_min_conn->second->ip_addr2);value["msfsprior"]=strmsfsurl;value["msfsbackup"]=strmsfsurl;}else{value["priorip"]=string(it_min_conn->second->ip_addr2);value["backupip"]=string(it_min_conn->second->ip_addr1);value["msfsprior"]=strmsfsurl;value["msfsbackup"]=strmsfsurl;}value["discovery"]=strdiscovery;value["port"]=int2string(it_min_conn->second->port);stringstrcontent=value.tostyledstring();char*szcontent=newchar[http_response_html_max];uint32_tnlen=strcontent.length();snprintf(szcontent,http_response_html_max,http_response_html,nlen,strcontent.c_str());send((void*)szcontent,strlen(szcontent));delete[]szcontent;return;}}其实就是根据记录的msg_server的负载情况,返回一个可用的msg_serverip和端口给客户端,这是一个json格式:
{"backupip":"localhost","code":0,"discovery":"http://192.168.226.128/api/discovery","msfsbackup":"http://127.0.0.1:8700/","msfsprior":"http://127.0.0.1:8700/","msg":"","port":"8000","priorip":"localhost"}
发出去这个json之后会调用onwritecomplete()函数,这个函数立刻关闭该http连接,也就是说这个与客户端的http连接是短连接:
voidchttpconn::onwritecomlete(){log("writecomplete");close();}
login_server就这么多内容了。