Learning Yate: the yrtpchan module

Posted: 2015-07-02 22:42:09

Please credit the source when reposting: http://blog.csdn.net/u012377333

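Summary: yrtpchan is Yate's own module for handling the RTP protocol (background on RTP itself is easy to find with Baidu or Google). This post gives a brief analysis of how yrtpchan handles the chan.rtp message.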


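How do you go about reading a Yate module? There is no main function, and no other code calls the module's functions directly, so the usual way of reading code does not work well. The place to start is the class that inherits from Module. For yrtpchan, that is: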

class YRTPPlugin : public Module
{
public:
    YRTPPlugin();
    virtual ~YRTPPlugin();
    virtual void initialize();
    virtual bool received(Message& msg, int id);
    virtual void statusParams(String& str);
    virtual void statusDetail(String& str);
    virtual void genUpdate(Message& msg);

private:
    bool reflectSetup(Message& msg, const char* id, RTPTransport& rtp, const char* rHost, const char* leg);
    bool reflectStart(Message& msg, const char* id, RTPTransport& rtp, SocketAddr& rAddr);
    void reflectDrop(YRTPReflector*& refl, Lock& mylock);
    void reflectExecute(Message& msg);
    void reflectAnswer(Message& msg, bool ignore);
    void reflectHangup(Message& msg);
    bool m_first;
};
The module also defines a static global instance:

static YRTPPlugin splugin;
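
To make the idea concrete, here is a minimal sketch of how a static instance can act as a module's only entry point. It does not use Yate's real classes: `registry()`, `Plugin`, `DemoPlugin` and `initializeAll()` are hypothetical names invented for this illustration, and the assumption is simply that the base-class constructor hands the object over to the host. The `m_first` flag mirrors the one declared in YRTPPlugin, so repeated initialize() calls install handlers only once.

```cpp
#include <cassert>
#include <string>
#include <vector>

class Plugin;

// Hypothetical stand-in for the host's plugin list; not Yate's real API.
std::vector<Plugin*>& registry()
{
    static std::vector<Plugin*> plugins;   // built on first use, so safe during static init
    return plugins;
}

class Plugin
{
public:
    explicit Plugin(const std::string& name) : m_name(name)
    {
        assert(!name.empty());
        registry().push_back(this);        // constructing the object is the registration
    }
    virtual ~Plugin() { }
    virtual void initialize() = 0;
    const std::string& name() const { return m_name; }
private:
    std::string m_name;
};

class DemoPlugin : public Plugin
{
public:
    DemoPlugin() : Plugin("demo"), m_first(true), m_inits(0), m_installs(0) { }
    virtual void initialize()
    {
        m_inits++;                         // settings would be re-read on every call
        if (m_first) {                     // handlers are installed only the first time
            m_first = false;
            m_installs++;
        }
    }
    int inits() const { return m_inits; }
    int installs() const { return m_installs; }
private:
    bool m_first;
    int m_inits;
    int m_installs;
};

// No main() calls into the module: this object is constructed when the code is loaded.
static DemoPlugin s_demo;

// What a host might do at startup and again when asked to reload its configuration.
void initializeAll()
{
    for (Plugin* p : registry())
        p->initialize();
}
```

Calling `initializeAll()` twice runs `DemoPlugin::initialize()` twice, but the guarded branch runs once, which is why the handler installation in a module like this can sit behind a first-run flag.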

When the module is loaded, its initialize() method is called:

void YRTPPlugin::initialize()
{
    Output("Initializing module YRTP");
    Configuration cfg(Engine::configFile("yrtpchan"));
    s_ipv6 = SocketAddr::supports(SocketAddr::IPv6) &&
	cfg.getBoolValue("general","ipv6_support",false);
    s_minport = cfg.getIntValue("general","minport",MIN_PORT);
    s_maxport = cfg.getIntValue("general","maxport",MAX_PORT);
    s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE);
    s_minJitter = cfg.getIntValue("general","minjitter",50);
    s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0);
    s_tos = cfg.getIntValue("general","tos",Socket::tosValues());
    s_udpbuf = cfg.getIntValue("general","udpbuf",0);
    s_localip = cfg.getValue("general","localip");
    s_autoaddr = cfg.getBoolValue("general","autoaddr",true);
    s_anyssrc = cfg.getBoolValue("general","anyssrc",true);
    s_padding = cfg.getIntValue("general","padding",0);
    s_rtcp = cfg.getBoolValue("general","rtcp",true);
    s_interval = cfg.getIntValue("general","rtcp_interval",4500);
    s_drill = cfg.getBoolValue("general","drillhole",Engine::clientMode());
    s_monitor = cfg.getBoolValue("general","monitoring",false);
    s_sleep = cfg.getIntValue("general","defsleep",5);
    RTPGroup::setMinSleep(cfg.getIntValue("general","minsleep"));
    s_priority = Thread::priority(cfg.getValue("general","thread"));
    s_rtpWarnSeq = cfg.getBoolValue("general","rtp_warn_seq",true);
    s_timeout = cfg.getIntValue("timeouts","timeout",3000);
    s_udptlTimeout = cfg.getIntValue("timeouts","udptl_timeout",25000);
    s_notifyMsg = cfg.getValue("timeouts","notifymsg");
    s_warnFirst = cfg.getBoolValue("timeouts","warnfirst",true);
    s_warnLater = cfg.getBoolValue("timeouts","warnlater",false);
    setup();
    if (m_first) {
	m_first = false;
	installRelay(Execute,50);
	installRelay(Ringing,50);
	installRelay(Progress,50);
	installRelay(Answered,50);
	installRelay(Private,"chan.hangup",50);
	Engine::install(new AttachHandler);
	Engine::install(new RtpHandler);
	Engine::install(new DTMFHandler);
    }
}

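The first part reads the parameters from the configuration file. If there is no configuration file, every setting falls back to its default value: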

    Configuration cfg(Engine::configFile("yrtpchan"));
    s_ipv6 = SocketAddr::supports(SocketAddr::IPv6) &&
	cfg.getBoolValue("general","ipv6_support",false);
    s_minport = cfg.getIntValue("general","minport",MIN_PORT);
    s_maxport = cfg.getIntValue("general","maxport",MAX_PORT);
    s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE);
    s_minJitter = cfg.getIntValue("general","minjitter",50);
    s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0);
    s_tos = cfg.getIntValue("general","tos",Socket::tosValues());
    s_udpbuf = cfg.getIntValue("general","udpbuf",0);
    s_localip = cfg.getValue("general","localip");
    s_autoaddr = cfg.getBoolValue("general","autoaddr",true);
    s_anyssrc = cfg.getBoolValue("general","anyssrc",true);
    s_padding = cfg.getIntValue("general","padding",0);
    s_rtcp = cfg.getBoolValue("general","rtcp",true);
    s_interval = cfg.getIntValue("general","rtcp_interval",4500);
    s_drill = cfg.getBoolValue("general","drillhole",Engine::clientMode());
    s_monitor = cfg.getBoolValue("general","monitoring",false);
    s_sleep = cfg.getIntValue("general","defsleep",5);
    RTPGroup::setMinSleep(cfg.getIntValue("general","minsleep"));
    s_priority = Thread::priority(cfg.getValue("general","thread"));
    s_rtpWarnSeq = cfg.getBoolValue("general","rtp_warn_seq",true);
    s_timeout = cfg.getIntValue("timeouts","timeout",3000);
    s_udptlTimeout = cfg.getIntValue("timeouts","udptl_timeout",25000);
    s_notifyMsg = cfg.getValue("timeouts","notifymsg");
    s_warnFirst = cfg.getBoolValue("timeouts","warnfirst",true);
    s_warnLater = cfg.getBoolValue("timeouts","warnlater",false);
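
The fallback behaviour can be illustrated with a small stand-alone sketch. `MiniConfig` below is a hypothetical miniature, not Yate's Configuration class; only the method names `getIntValue` and `getBoolValue` are borrowed from the listing above. The set of boolean spellings it accepts is an assumption of this sketch, not something taken from the Yate source.

```cpp
#include <cassert>
#include <cstdlib>
#include <map>
#include <string>

// Hypothetical miniature of a sectioned configuration with per-call defaults.
class MiniConfig
{
public:
    void set(const std::string& sect, const std::string& key, const std::string& value)
    {
        assert(!sect.empty() && !key.empty());
        m_values[sect][key] = value;
    }

    // Return the stored integer, or defvalue when the key is missing or not numeric.
    int getIntValue(const std::string& sect, const std::string& key, int defvalue = 0) const
    {
        const std::string* s = lookup(sect,key);
        if (!s)
            return defvalue;
        char* end = 0;
        long v = std::strtol(s->c_str(),&end,10);
        bool numeric = (end != s->c_str()) && (*end == '\0');
        return numeric ? static_cast<int>(v) : defvalue;
    }

    // Return the stored boolean, or defvalue when the key is missing or unrecognized.
    bool getBoolValue(const std::string& sect, const std::string& key, bool defvalue = false) const
    {
        const std::string* s = lookup(sect,key);
        if (!s)
            return defvalue;
        if (*s == "yes" || *s == "true" || *s == "on" || *s == "enable")
            return true;
        if (*s == "no" || *s == "false" || *s == "off" || *s == "disable")
            return false;
        return defvalue;
    }

private:
    typedef std::map<std::string,std::string> Section;

    // Find a value by section and key; a null pointer means "not configured".
    const std::string* lookup(const std::string& sect, const std::string& key) const
    {
        std::map<std::string,Section>::const_iterator si = m_values.find(sect);
        if (si == m_values.end())
            return 0;
        Section::const_iterator ki = si->second.find(key);
        return (ki == si->second.end()) ? 0 : &ki->second;
    }

    std::map<std::string,Section> m_values;
};
```

With an empty `MiniConfig`, a call such as `getIntValue("timeouts","timeout",3000)` returns 3000; once the key is set, the stored value wins. Keeping the default at the call site, as the module does, means the code still works when the file or the key is absent.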

The second part declares which messages the module handles:

	installRelay(Execute,50);
	installRelay(Ringing,50);
	installRelay(Progress,50);
	installRelay(Answered,50);
	installRelay(Private,"chan.hangup",50);
	Engine::install(new AttachHandler);
	Engine::install(new RtpHandler);
	Engine::install(new DTMFHandler);
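
The numeric argument in these calls is a priority. As a rough illustration of what priority-ordered dispatch means, here is a minimal sketch in which handlers for the same message name are tried in ascending priority order until one reports that it handled the message. `MiniHandler` and `MiniDispatcher` are hypothetical names, not Yate's API, and the sketch assumes that a lower number means an earlier call and that returning true stops the dispatch.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical miniature of a message handler; not Yate's MessageHandler.
struct MiniHandler
{
    std::string name;       // message name this handler wants, e.g. "chan.rtp"
    unsigned priority;      // lower value = offered the message earlier
    std::function<bool(const std::string&)> received;  // true = handled, stop here
};

class MiniDispatcher
{
public:
    // Insert the handler so the list stays sorted by priority.
    void install(const MiniHandler& h)
    {
        assert(static_cast<bool>(h.received));
        std::vector<MiniHandler>::iterator pos = std::upper_bound(
            m_handlers.begin(),m_handlers.end(),h,
            [](const MiniHandler& a, const MiniHandler& b) { return a.priority < b.priority; });
        m_handlers.insert(pos,h);
    }

    // Offer the message to each matching handler in order until one handles it.
    bool dispatch(const std::string& name) const
    {
        for (const MiniHandler& h : m_handlers)
            if (h.name == name && h.received(name))
                return true;
        return false;
    }

private:
    std::vector<MiniHandler> m_handlers;
};
```

In this model a handler installed at priority 50 sees a message before one installed at 100 for the same name, and a message with no matching handler simply comes back unhandled.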
Here we focus on the class that handles the RTP message:

class RtpHandler : public MessageHandler
{
public:
    RtpHandler() : MessageHandler("chan.rtp",100,splugin.name()) { }
    virtual bool received(Message &msg);
};
Whenever a module sends a "chan.rtp" message, the received function below is invoked:

bool RtpHandler::received(Message &msg)
{
    bool udptl = false;
    const String& trans = msg[YSTRING("transport")];
    if (trans && !trans.startsWith("RTP/")) {
	if (trans &= "udptl")
	    udptl = true;
	else
	    return false;
    }
    Debug(&splugin,DebugAll,"%s message received",(trans ? trans.c_str() : "No-transport"));
    bool terminate = msg.getBoolValue(YSTRING("terminate"),false);
    const String& dir = msg[YSTRING("direction")];
    RTPSession::Direction direction = terminate ? RTPSession::FullStop : RTPSession::SendRecv;
    bool d_recv = false;
    bool d_send = false;
    if (dir == YSTRING("bidir")) {
	d_recv = true;
	d_send = true;
    }
    else if (dir == YSTRING("receive")) {
	d_recv = true;
	direction = RTPSession::RecvOnly;
    }
    else if (dir == YSTRING("send")) {
	d_send = true;
	direction = RTPSession::SendOnly;
    }

    CallEndpoint* ch = YOBJECT(CallEndpoint,msg.userData());
    DataEndpoint* de = YOBJECT(DataEndpoint,msg.userData());
    const char* media = udptl ? "image" : "audio";
    media = msg.getValue(YSTRING("media"),(de ? de->name().c_str() : media));
    RefPointer<YRTPWrapper> w = YRTPWrapper::find(ch,media);
    if (w)
	Debug(&splugin,DebugAll,"Wrapper %p found by CallEndpoint %p",(YRTPWrapper*)w,ch);
    else {
	const String& rid = msg[YSTRING("rtpid")];
	w = YRTPWrapper::find(rid);
	if (w)
	    Debug(&splugin,DebugAll,"Wrapper %p found by ID '%s'",(YRTPWrapper*)w,rid.c_str());
    }
    if (w)
	w->deref();
    if (terminate) {
	if (w) {
	    if (w->host())
		msg.setParam("localip",w->host());
	    if (w->port())
		msg.setParam("localport",String(w->port()));
	    w->terminate(msg);
	    msg.setParam("status","terminated");
	    return true;
	}
	return false;
    }
    if (!(ch || de || w)) {
	Debug(&splugin,DebugWarn,"Neither call channel nor RTP wrapper found!");
	return false;
    }

    const String& rip = msg[YSTRING("remoteip")];
    const char* status = "updated";

    if (!w) {
	// it would be pointless to create an unreferenced wrapper
	if (!(d_recv || d_send))
	    return false;
	String lip(msg.getValue(YSTRING("localip")));
	bool ipv6 = msg.getBoolValue(YSTRING("ipv6_support"),s_ipv6);
	if (lip.null())
	    YRTPWrapper::guessLocal(rip,lip,ipv6);
	if (lip.null()) {
	    Debug(&splugin,DebugWarn,"RTP request with no local address!");
	    return false;
	}

	status = "created";
	w = new YRTPWrapper(lip,ch,media,direction,msg,udptl,ipv6);
	w->setMaster(msg.getValue(YSTRING("id")));

	w->deref();
    }
    else if (w->valid())
	w->addDirection(direction);
    else
	return false;

    if (d_recv) {
	if (ch && !ch->getSource(media)) {
	    DataSource* s = w->getSource();
	    ch->setSource(s,media);
	    s->deref();
	}
	else if (de && !de->getSource()) {
	    DataSource* s = w->getSource();
	    de->setSource(s);
	    s->deref();
	}
    }

    if (d_send) {
	if (ch && !ch->getConsumer(media)) {
	    DataConsumer* c = w->getConsumer();
	    ch->setConsumer(c,media);
	    c->deref();
	}
	else if (de && !de->getConsumer()) {
	    DataConsumer* c = w->getConsumer();
	    de->setConsumer(c);
	    c->deref();
	}
    }

    if (w->refcount() <= 1)
	return false;

    w->setParams(rip,msg);
    w->setFaxDivert(msg);
    msg.setParam("localip",w->host());
    msg.setParam("localport",String(w->port()));
    msg.setParam("rtpid",w->id());
    msg.setParam("status",status);

    if (msg.getBoolValue(YSTRING("getsession"),!msg.userData()))
	msg.userData(w);
    return true;
}
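There is a lot going on in this function. I find it easiest to read in two parts.

The first part looks up an existing wrapper, first by call endpoint and media type and then by the rtpid parameter:

RefPointer<YRTPWrapper> w = YRTPWrapper::find(ch,media);
w = YRTPWrapper::find(rid);

The second part creates a new wrapper when none was found: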

w = new YRTPWrapper(lip,ch,media,direction,msg,udptl,ipv6);
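
The two parts together form a find-or-create pattern, which also explains the "updated" and "created" values of the status parameter. The sketch below shows only that pattern. `MiniWrapper`, `MiniWrappers` and the "mini/N" id format are hypothetical; the real handler additionally checks the direction, the local address and the reference count, all of which are left out here.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical miniature of the wrapper bookkeeping; not the real YRTPWrapper.
struct MiniWrapper
{
    std::string id;         // what a caller would later send back as "rtpid"
    const void* owner;      // stands in for the CallEndpoint pointer
    std::string media;      // "audio", "image", ...
};

class MiniWrappers
{
public:
    MiniWrappers() : m_next(0) { }

    // Lookup by owning endpoint and media type.
    MiniWrapper* find(const void* owner, const std::string& media) const
    {
        if (!owner)
            return 0;
        for (const auto& w : m_list)
            if (w->owner == owner && w->media == media)
                return w.get();
        return 0;
    }

    // Lookup by wrapper id.
    MiniWrapper* find(const std::string& id) const
    {
        if (id.empty())
            return 0;
        for (const auto& w : m_list)
            if (w->id == id)
                return w.get();
        return 0;
    }

    // Reuse an existing wrapper when possible, otherwise create one for the owner.
    MiniWrapper* request(const void* owner, const std::string& media,
        const std::string& rtpid, std::string& status)
    {
        assert(!media.empty());
        MiniWrapper* w = find(owner,media);
        if (!w)
            w = find(rtpid);
        if (w) {
            status = "updated";
            return w;
        }
        if (!owner) {                      // nothing found and nobody to attach to
            status.clear();
            return 0;
        }
        status = "created";
        std::unique_ptr<MiniWrapper> fresh(new MiniWrapper);
        fresh->id = "mini/" + std::to_string(++m_next);   // made-up id format
        fresh->owner = owner;
        fresh->media = media;
        m_list.push_back(std::move(fresh));
        return m_list.back().get();
    }

private:
    std::vector<std::unique_ptr<MiniWrapper> > m_list;
    unsigned m_next;
};
```

A first request for a given owner and media type reports "created"; a second request for the same pair, or one that carries the returned id, reports "updated" and yields the same object.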

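You may have noticed that when Yate relays RTP packets between two Yate clients, it creates two ip:port pairs, one for receiving from and sending to each client. Here is a simple example: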

S is the server, and A and C are clients. If A calls C, an RTP capture taken on S looks roughly like this:

a.ip:port-------------------------------------------->s.ip2:port2

s.ip1:port1----------------------------------------->c.ip:port

s.ip2:port2----------------------------------------->a.ip:port

c.ip:port--------------------------------------------->s.ip1:port1

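Something to think about:

In other words, the server creates two RTP sockets to send and receive the media of the two clients. Why does it work this way? Why not use one socket for the media going from A to C and another for the media going from C to A?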
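
One way to read the capture, offered as a sketch rather than a definitive answer: since YRTPWrapper::find(ch,media) looks a wrapper up by call endpoint, each call leg appears to get its own wrapper and therefore its own local address, which talks to exactly one client in both directions. The model below reproduces the four flows from that assumption. `Leg` and `captureFlows` are hypothetical names, and the addresses are the placeholders used in the capture above.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model of one call leg: a server-side socket paired with one client.
struct Leg
{
    std::string local;   // address the server binds for this leg
    std::string remote;  // address of the client on this leg
};

// List every packet flow a capture on the server would show, as "src -> dst".
std::vector<std::string> captureFlows(const std::vector<Leg>& legs)
{
    std::vector<std::string> flows;
    for (const Leg& l : legs) {
        assert(!l.local.empty() && !l.remote.empty());
        flows.push_back(l.remote + " -> " + l.local);   // client to server on this leg
        flows.push_back(l.local + " -> " + l.remote);   // server to client on this leg
    }
    return flows;
}
```

Feeding it the leg toward A (s.ip2:port2) and the leg toward C (s.ip1:port1) yields the same four flows as the capture. Under this reading, each client only ever sends to and receives from a single server address, and the media crosses from one leg to the other inside the server.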


To be continued...

Copyright notice: this is an original article by the blog author and may not be reposted without the author's permission.

Original article: http://blog.csdn.net/u012377333/article/details/46730467
