标签:
转载请声明出处:http://blog.csdn.net/u012377333
本文简介:yrtpchan是yate自定义的一个处理rtp协议的模块(关于rtp协议的知识,大家可以在网上通过百度、谷歌查阅)。本文主要对yrtpchan处理chan.rtp消息的过程做简要的分析和理解。
分析yate的模块时,由于模块里没有main函数,也没有其他函数直接调用该模块的函数,按常规顺着调用关系阅读代码的方式并不适用,所以首先要从继承自Module类的那个类入手。对于yrtpchan模块,就是从这里开始查看:
class YRTPPlugin : public Module { public: YRTPPlugin(); virtual ~YRTPPlugin(); virtual void initialize(); virtual bool received(Message& msg, int id); virtual void statusParams(String& str); virtual void statusDetail(String& str); virtual void genUpdate(Message& msg); private: bool reflectSetup(Message& msg, const char* id, RTPTransport& rtp, const char* rHost, const char* leg); bool reflectStart(Message& msg, const char* id, RTPTransport& rtp, SocketAddr& rAddr); void reflectDrop(YRTPReflector*& refl, Lock& mylock); void reflectExecute(Message& msg); void reflectAnswer(Message& msg, bool ignore); void reflectHangup(Message& msg); bool m_first; };还定义了一个静态的全局变量
static YRTPPlugin splugin;
<pre name="code" class="cpp">void YRTPPlugin::initialize() { Output("Initializing module YRTP"); Configuration cfg(Engine::configFile("yrtpchan")); s_ipv6 = SocketAddr::supports(SocketAddr::IPv6) && cfg.getBoolValue("general","ipv6_support",false); s_minport = cfg.getIntValue("general","minport",MIN_PORT); s_maxport = cfg.getIntValue("general","maxport",MAX_PORT); s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE); s_minJitter = cfg.getIntValue("general","minjitter",50); s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0); s_tos = cfg.getIntValue("general","tos",Socket::tosValues()); s_udpbuf = cfg.getIntValue("general","udpbuf",0); s_localip = cfg.getValue("general","localip"); s_autoaddr = cfg.getBoolValue("general","autoaddr",true); s_anyssrc = cfg.getBoolValue("general","anyssrc",true); s_padding = cfg.getIntValue("general","padding",0); s_rtcp = cfg.getBoolValue("general","rtcp",true); s_interval = cfg.getIntValue("general","rtcp_interval",4500); s_drill = cfg.getBoolValue("general","drillhole",Engine::clientMode()); s_monitor = cfg.getBoolValue("general","monitoring",false); s_sleep = cfg.getIntValue("general","defsleep",5); RTPGroup::setMinSleep(cfg.getIntValue("general","minsleep")); s_priority = Thread::priority(cfg.getValue("general","thread")); s_rtpWarnSeq = cfg.getBoolValue("general","rtp_warn_seq",true); s_timeout = cfg.getIntValue("timeouts","timeout",3000); s_udptlTimeout = cfg.getIntValue("timeouts","udptl_timeout",25000); s_notifyMsg = cfg.getValue("timeouts","notifymsg"); s_warnFirst = cfg.getBoolValue("timeouts","warnfirst",true); s_warnLater = cfg.getBoolValue("timeouts","warnlater",false); setup(); if (m_first) { m_first = false; installRelay(Execute,50); installRelay(Ringing,50); installRelay(Progress,50); installRelay(Answered,50); installRelay(Private,"chan.hangup",50); Engine::install(new AttachHandler); Engine::install(new RtpHandler); Engine::install(new DTMFHandler); } 
}上面这段代码首先读取配置文件yrtpchan里面的参数;如果没有配置文件,或者没有配置对应的项,使用的都是代码中给出的默认参数:
Configuration cfg(Engine::configFile("yrtpchan")); s_ipv6 = SocketAddr::supports(SocketAddr::IPv6) && cfg.getBoolValue("general","ipv6_support",false); s_minport = cfg.getIntValue("general","minport",MIN_PORT); s_maxport = cfg.getIntValue("general","maxport",MAX_PORT); s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE); s_minJitter = cfg.getIntValue("general","minjitter",50); s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0); s_tos = cfg.getIntValue("general","tos",Socket::tosValues()); s_udpbuf = cfg.getIntValue("general","udpbuf",0); s_localip = cfg.getValue("general","localip"); s_autoaddr = cfg.getBoolValue("general","autoaddr",true); s_anyssrc = cfg.getBoolValue("general","anyssrc",true); s_padding = cfg.getIntValue("general","padding",0); s_rtcp = cfg.getBoolValue("general","rtcp",true); s_interval = cfg.getIntValue("general","rtcp_interval",4500); s_drill = cfg.getBoolValue("general","drillhole",Engine::clientMode()); s_monitor = cfg.getBoolValue("general","monitoring",false); s_sleep = cfg.getIntValue("general","defsleep",5); RTPGroup::setMinSleep(cfg.getIntValue("general","minsleep")); s_priority = Thread::priority(cfg.getValue("general","thread")); s_rtpWarnSeq = cfg.getBoolValue("general","rtp_warn_seq",true); s_timeout = cfg.getIntValue("timeouts","timeout",3000); s_udptlTimeout = cfg.getIntValue("timeouts","udptl_timeout",25000); s_notifyMsg = cfg.getValue("timeouts","notifymsg"); s_warnFirst = cfg.getBoolValue("timeouts","warnfirst",true); s_warnLater = cfg.getBoolValue("timeouts","warnlater",false);
installRelay(Execute,50); installRelay(Ringing,50); installRelay(Progress,50); installRelay(Answered,50); installRelay(Private,"chan.hangup",50); Engine::install(new AttachHandler); Engine::install(new RtpHandler); Engine::install(new DTMFHandler);这里我们重点查看处理RTP消息的类
class RtpHandler : public MessageHandler { public: RtpHandler() : MessageHandler("chan.rtp",100,splugin.name()) { } virtual bool received(Message &msg); };每当有模块发布消息"chan.rtp"的时候,都会进入received函数
bool RtpHandler::received(Message &msg) { bool udptl = false; const String& trans = msg[YSTRING("transport")]; if (trans && !trans.startsWith("RTP/")) { if (trans &= "udptl") udptl = true; else return false; } Debug(&splugin,DebugAll,"%s message received",(trans ? trans.c_str() : "No-transport")); bool terminate = msg.getBoolValue(YSTRING("terminate"),false); const String& dir = msg[YSTRING("direction")]; RTPSession::Direction direction = terminate ? RTPSession::FullStop : RTPSession::SendRecv; bool d_recv = false; bool d_send = false; if (dir == YSTRING("bidir")) { d_recv = true; d_send = true; } else if (dir == YSTRING("receive")) { d_recv = true; direction = RTPSession::RecvOnly; } else if (dir == YSTRING("send")) { d_send = true; direction = RTPSession::SendOnly; } CallEndpoint* ch = YOBJECT(CallEndpoint,msg.userData()); DataEndpoint* de = YOBJECT(DataEndpoint,msg.userData()); const char* media = udptl ? "image" : "audio"; media = msg.getValue(YSTRING("media"),(de ? de->name().c_str() : media)); RefPointer<YRTPWrapper> w = YRTPWrapper::find(ch,media); if (w) Debug(&splugin,DebugAll,"Wrapper %p found by CallEndpoint %p",(YRTPWrapper*)w,ch); else { const String& rid = msg[YSTRING("rtpid")]; w = YRTPWrapper::find(rid); if (w) Debug(&splugin,DebugAll,"Wrapper %p found by ID '%s'",(YRTPWrapper*)w,rid.c_str()); } if (w) w->deref(); if (terminate) { if (w) { if (w->host()) msg.setParam("localip",w->host()); if (w->port()) msg.setParam("localport",String(w->port())); w->terminate(msg); msg.setParam("status","terminated"); return true; } return false; } if (!(ch || de || w)) { Debug(&splugin,DebugWarn,"Neither call channel nor RTP wrapper found!"); return false; } const String& rip = msg[YSTRING("remoteip")]; const char* status = "updated"; if (!w) { // it would be pointless to create an unreferenced wrapper if (!(d_recv || d_send)) return false; String lip(msg.getValue(YSTRING("localip"))); bool ipv6 = msg.getBoolValue(YSTRING("ipv6_support"),s_ipv6); if (lip.null()) 
YRTPWrapper::guessLocal(rip,lip,ipv6); if (lip.null()) { Debug(&splugin,DebugWarn,"RTP request with no local address!"); return false; } status = "created"; w = new YRTPWrapper(lip,ch,media,direction,msg,udptl,ipv6); w->setMaster(msg.getValue(YSTRING("id"))); w->deref(); } else if (w->valid()) w->addDirection(direction); else return false; if (d_recv) { if (ch && !ch->getSource(media)) { DataSource* s = w->getSource(); ch->setSource(s,media); s->deref(); } else if (de && !de->getSource()) { DataSource* s = w->getSource(); de->setSource(s); s->deref(); } } if (d_send) { if (ch && !ch->getConsumer(media)) { DataConsumer* c = w->getConsumer(); ch->setConsumer(c,media); c->deref(); } else if (de && !de->getConsumer()) { DataConsumer* c = w->getConsumer(); de->setConsumer(c); c->deref(); } } if (w->refcount() <= 1) return false; w->setParams(rip,msg); w->setFaxDivert(msg); msg.setParam("localip",w->host()); msg.setParam("localport",String(w->port())); msg.setParam("rtpid",w->id()); msg.setParam("status",status); if (msg.getBoolValue(YSTRING("getsession"),!msg.userData())) msg.userData(w); return true; }这个里面的内容比较多,我感觉可以分成两个部分来查看:
一个是查找已经存在的YRTPWrapper(先按CallEndpoint和媒体类型查找,找不到时再按消息中的rtpid查找):
RefPointer<YRTPWrapper> w = YRTPWrapper::find(ch,media);
w = YRTPWrapper::find(rid);
另外一个是在没有找到已有对象时,新建一个YRTPWrapper:
w = new YRTPWrapper(lip,ch,media,direction,msg,udptl,ipv6);
不知道大家有没有注意过,yate在转发两个yate client之间的rtp包的时候,会创建两组ip、port,分别对应两个客户端的接收和发送。这里举一个比较简单的例子:
S是服务器,A和C是客户端,假如是A呼叫C,从S上面抓包(rtp)的格式大致是这样的:
a.ip:port-------------------------------------------->s.ip2:port2
s.ip1:port1----------------------------------------->c.ip:port
s.ip2:port2----------------------------------------->a.ip:port
c.ip:port--------------------------------------------->s.ip1:port1
思考:
也就是说,服务器创建了两个rtp的套接字,分别与两个客户端发送和接收媒体数据,为什么会这样?为什么不是用一个套接字来处理A到C的媒体数据,用另外一个套接字来处理C到A的媒体数据?
未完,待续......
版权声明:本文为博主原创文章,未经博主允许不得转载。
标签:
原文地址:http://blog.csdn.net/u012377333/article/details/46730467