标签:应用 free 5.4 9.png exist 综述 int core ali
使用 obs 向 nginx 推送一个直播流,该直播流经 nginx-rtmp 的 ngx_rtmp_live_module 模块转发给 application live 应用,
然后使用 vlc 连接 live,播放该直播流。
# Number of worker processes to create
worker_processes 1;
# Log at debug level to stderr
error_log stderr debug;
# Daemon mode off
daemon off;
# Master process off
master_process off;
events {
worker_connections 1024;
}
rtmp {
server {
listen 1935; # RTMP listening port
chunk_size 4096; # RTMP chunk size used for data transfer
application live { # live-streaming application
live on;
}
# OBS publishes its stream to this "push" application, which in turn
# re-publishes (relays) the stream to the "live" application above
application push {
live on;
push rtmp://192.168.1.82:1935/live; # relay the stream to the live application above
}
}
}
点击 "+" 选择一个媒体源,确定,然后设置该媒体源,如下图:
点击 "设置" 选择 "流",设置推流地址,如下图,确定后即可进行推流:
首先开始分析从 obs 推送 rtmp 流到 nginx 服务器的整个流程。
nginx 启动后,就会一直在 ngx_process_events(使用 epoll 时即 ngx_epoll_process_events)函数中的 epoll_wait 处休眠,监听客户端的连接:
/*
 * Excerpt of the epoll event loop body ("..." marks code elided by the
 * article).  Waits in epoll_wait() and, for every returned event whose read
 * side is readable and active, either queues the read event or invokes its
 * handler directly.
 */
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
...
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
/* Per the article: when nginx has just started, timer is -1, so this
 * call blocks until a client connects. */
events = epoll_wait(ep, event_list, (int) nevents, timer);
...
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
/* The low bit of the stored pointer is extracted as "instance" and then
 * masked off to recover the connection pointer. */
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
/* Read event of the monitored connection */
rev = c->read;
/* Event flags reported by epoll_wait for this entry */
revents = event_list[i].events;
...
/* The descriptor is readable; per the article, on a listening socket this
 * means a new connection has arrived. */
if ((revents & EPOLLIN) && rev->active) {
...
rev->ready = 1;
/* When NGX_POST_EVENTS is set in flags, defer the event: accept events
 * go to ngx_posted_accept_events, all others to ngx_posted_events.
 * (The article associates this flag with accept load balancing.) */
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
/* Otherwise call the read event's handler right away; per the
 * article, for a new connection this is ngx_event_accept. */
rev->handler(rev);
}
}
...
}
return NGX_OK;
}
ngx_event_accept 函数中主要也就是接受客户端的连接,并调用该监听端口对应的回调函数:
/*
 * Excerpt of the accept handler ("..." marks code elided by the article).
 * Accepts pending client connections in a loop and passes each one to the
 * handler registered on the listening socket.
 */
void
ngx_event_accept(ngx_event_t *ev)
{
...
do {
...
s = accept(lc->fd, &sa.sockaddr, &socklen);
...
/* Invoke the handler registered for this listening port; per the article,
 * for the rtmp module this is always ngx_rtmp_init_connection. */
ls->handler(c);
...
} while (ev->available);
}
在 ngx_rtmp_init_connection 函数中先经过一系列的初始化后,开始与客户端进行 rtmp 的 handshake 过程。
下面从 handshake 到 handshake 成功后接收到第一个 rtmp 包之间仅以图片说明,就不再分析源码了。
该 handshake 阶段即为等待接收客户端发送的 C0 和 C1 阶段。
接收到客户端发送的 C0 和 C1 后,服务器进入 NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE(2)阶段,即为
发送S0 和 S1 阶段。
该 SERVER_SEND_CHALLENGE 阶段即为服务器向客户端发送 S0 和 S1 的阶段。但是实际上,服务器在发送完 S0 和
S1 后,进入到 SERVER_SEND_RESPONSE(3) 阶段后又立刻发送 S2,因此,抓到的包如下:
该阶段为等待接收客户端发送的 C2 阶段。
至此,服务器和客户端的 rtmp handshake 过程完整,开始正常的信息交互阶段。
如下代码,接收到 C2 后,服务器即进入循环处理客户端的请求阶段:ngx_rtmp_cycle
/*
 * Called once the RTMP handshake has completed for session s.
 *
 * Releases the handshake buffers, fires NGX_RTMP_HANDSHAKE_DONE, and then
 * either enters the regular message cycle (handlers returned NGX_OK) or
 * finalizes the session (any other result).
 */
static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
    ngx_rtmp_free_handshake_buffers(s);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "handshake: done");

    if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE, NULL, NULL)
        == NGX_OK)
    {
        /* handshake accepted by all listeners: start normal processing */
        ngx_rtmp_cycle(s);
        return;
    }

    ngx_rtmp_finalize_session(s);
}
ngx_rtmp_cycle 函数中,重新设置了当前 rtmp 连接的读、写事件的回调函数,当监听到客户端发送的数据时,将调用
ngx_rtmp_recv 函数进行处理。
/*
 * Switch the session's connection to regular RTMP message processing:
 * install ngx_rtmp_recv / ngx_rtmp_send as the read / write handlers,
 * set up the ping event, reset the ping state and immediately run the
 * receive handler once on the read event.
 */
void
ngx_rtmp_cycle(ngx_rtmp_session_t *s)
{
    ngx_connection_t  *conn = s->connection;

    /* ping event bound to this connection */
    s->ping_evt.handler = ngx_rtmp_ping;
    s->ping_evt.log = conn->log;
    s->ping_evt.data = conn;

    /* regular I/O handlers */
    conn->write->handler = ngx_rtmp_send;
    conn->read->handler = ngx_rtmp_recv;

    ngx_rtmp_reset_ping(s);

    ngx_rtmp_recv(conn->read);
}
在 ngx_rtmp_recv 函数中,会循环接收客户端发来的 rtmp 包数据,接收到完整的一个 rtmp message 后,会根据该消息
的 rtmp message type,调用相应的函数进行处理,如,若为 20,即为 amf0 类型的命令消息,就会调用
ngx_rtmp_amf_message_handler 函数进行处理。
handshake 成功后,接收到客户端发来的第一个 rtmp 包为连接 nginx.conf 中 rtmp{} 下的 application push{}
应用,如下图:
从该图可知,该消息类型为 20,即为 AMF0 Command,因此会调用 ngx_rtmp_amf_message_handler 对该消息进行解析,
然后对其中的命令 connect 调用预先设置好的 ngx_rtmp_cmd_connect_init 回调函数。在 ngx_rtmp_cmd_connect_init
函数中,继续解析该 connect 余下的消息后,开始调用 ngx_rtmp_connect 构建的 connect 函数链表,该链表中存放着各个
rtmp 模块对该 connect 命令所要做的操作(注:仅有部分 rtmp 模块会对该 connect 命令设置有回调函数,并且就算
设置了回调函数,也需要在配置文件中启用相应的模块才会真正执行该模块对 connect 的处理)。因此,对于 connect
命令,这里仅会真正处理 ngx_rtmp_cmd_module 模块设置的 ngx_rtmp_cmd_connect 回调函数。
/*
 * Handler for the RTMP "connect" command.
 *
 * Copies the connect parameters from v into the session, resolves the
 * requested application among the server's configured applications and
 * replies with: ack window size, peer bandwidth, chunk size and the AMF
 * "_result" response.
 *
 * Returns NGX_OK on success.  Returns NGX_ERROR when the session is already
 * connected, when a parameter copy cannot be allocated, when the
 * application is not found, or when any of the replies fails to be sent.
 */
static ngx_int_t
ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
{
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_core_app_conf_t  **cacfp;
    ngx_uint_t                  n;
    ngx_rtmp_header_t           h;
    u_char                     *p;

    static double               trans;
    static double               capabilities = NGX_RTMP_CAPABILITIES;
    static double               object_encoding = 0;

    /* AMF payload of the server's response to the client's connect */
    static ngx_rtmp_amf_elt_t  out_obj[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("fmsVer"),
          NGX_RTMP_FMS_VERSION, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("capabilities"),
          &capabilities, 0 },
    };

    static ngx_rtmp_amf_elt_t  out_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          "status", 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("code"),
          "NetConnection.Connect.Success", 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("description"),
          "Connection succeeded.", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("objectEncoding"),
          &object_encoding, 0 }
    };

    static ngx_rtmp_amf_elt_t  out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "_result", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_obj, sizeof(out_obj) },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_inf, sizeof(out_inf) },
    };

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "rtmp cmd: connect");

    if (s->connected) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "connect: duplicate connection");
        return NGX_ERROR;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    trans = v->trans;

    /* fill session parameters */
    s->connected = 1;

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    /*
     * Copy one NUL-terminated connect parameter into a pool-allocated
     * ngx_str_t of the session.  The allocation is checked so that a failed
     * ngx_palloc() is reported instead of being passed to ngx_memcpy().
     */
#define NGX_RTMP_SET_STRPAR(name)                                             \
    do {                                                                      \
        s->name.len = ngx_strlen(v->name);                                    \
        s->name.data = ngx_palloc(s->connection->pool, s->name.len);          \
        if (s->name.data == NULL) {                                           \
            return NGX_ERROR;                                                 \
        }                                                                     \
        ngx_memcpy(s->name.data, v->name, s->name.len);                       \
    } while (0)

    NGX_RTMP_SET_STRPAR(app);
    NGX_RTMP_SET_STRPAR(args);
    NGX_RTMP_SET_STRPAR(flashver);
    NGX_RTMP_SET_STRPAR(swf_url);
    NGX_RTMP_SET_STRPAR(tc_url);
    NGX_RTMP_SET_STRPAR(page_url);

#undef NGX_RTMP_SET_STRPAR

    /* strip anything from the first '?' on: only the bare app name is kept */
    p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?');
    if (p) {
        s->app.len = (p - s->app.data);
    }

    s->acodecs = (uint32_t) v->acodecs;
    s->vcodecs = (uint32_t) v->vcodecs;

    /* find the application the client connects to & set app_conf */
    cacfp = cscf->applications.elts;
    for (n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
        if ((*cacfp)->name.len == s->app.len &&
            ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
        {
            /* found app! */
            s->app_conf = (*cacfp)->app_conf;
            break;
        }
    }

    if (s->app_conf == NULL) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "connect: application not found: ‘%V‘", &s->app);
        return NGX_ERROR;
    }

    object_encoding = v->object_encoding;

    /*
     * Window Acknowledgement Size: tells the peer how many bytes may be
     * sent before an acknowledgement has to be received.
     */
    if (ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK) {
        return NGX_ERROR;
    }

    /*
     * Set Peer Bandwidth: limits the peer's outgoing bandwidth, i.e. the
     * amount of sent-but-unacknowledged data.
     */
    if (ngx_rtmp_send_bandwidth(s, cscf->ack_window,
                                NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK)
    {
        return NGX_ERROR;
    }

    /* Set Chunk Size: announces the new maximum chunk size to the peer. */
    if (ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK) {
        return NGX_ERROR;
    }

    /* finally the AMF "_result" reply built above */
    if (ngx_rtmp_send_amf(s, &h, out_elts,
                          sizeof(out_elts) / sizeof(out_elts[0]))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    return NGX_OK;
}
服务器响应客户端 connect 命令消息后,客户端接着发送 releaseStream 命令消息给服务器,但是 nginx-rtmp 中没有
任何一个 rtmp 模块对该命令设置有回调函数,因此,不进行处理,接着等待接收下一个消息。
接着服务器接收到客户端发来的 createStream 命令消息。
从以前的分析可知,此时,会调用 ngx_rtmp_cmd_create_stream_init 函数。
/*
 * AMF entry point for the "createStream" command.
 *
 * Parses the transaction number from the incoming chain into a static
 * ngx_rtmp_create_stream_t and passes it to the create_stream handler
 * chain.  Returns NGX_ERROR when the AMF payload cannot be parsed,
 * otherwise whatever ngx_rtmp_create_stream() returns.
 */
static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    static ngx_rtmp_create_stream_t  v;

    static ngx_rtmp_amf_elt_t        in_elts[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.trans, sizeof(v.trans) },
    };

    /* extract v.trans from the createStream message */
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0]))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");

    return ngx_rtmp_create_stream(s, &v);
}
接着,从该函数中开始调用 ngx_rtmp_create_stream 构建的函数链表。这里调用到的是 ngx_rtmp_cmd_create_stream
函数。
/*
 * Final handler for "createStream": sends the "_result" reply carrying the
 * client's transaction number and the stream id NGX_RTMP_MSID.
 *
 * Returns NGX_DONE when the reply was sent, NGX_ERROR otherwise.
 */
static ngx_int_t
ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
{
    /* support one message stream per connection */
    static double               stream;
    static double               trans;

    static ngx_rtmp_amf_elt_t   out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "_result", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &stream, sizeof(stream) },
    };

    ngx_rtmp_header_t           h;

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream");

    stream = NGX_RTMP_MSID;
    trans = v->trans;

    ngx_memzero(&h, sizeof(h));
    h.type = NGX_RTMP_MSG_AMF_CMD;
    h.csid = NGX_RTMP_CSID_AMF_INI;

    if (ngx_rtmp_send_amf(s, &h, out_elts,
                          sizeof(out_elts) / sizeof(out_elts[0]))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    return NGX_DONE;
}
该函数主要是发送服务器对 createStream 的响应。
接着,客户端发送 publish 给服务器,用来发布一个有名字的流到服务器,其他客户端可以使用此流名来播放流,接收
发布的音频,视频,以及其他数据消息。
从图中可知,publish type 为 'live',即服务器不会保存客户端发布的流到文件中。
/*
 * AMF entry point for the "publish" command.
 *
 * Clears the static ngx_rtmp_publish_t, parses the stream name and the
 * optional publish type from the incoming chain, lets
 * ngx_rtmp_cmd_fill_args() process name/args, logs the request and passes
 * it to the publish handler chain.  Returns NGX_ERROR when the AMF payload
 * cannot be parsed, otherwise the result of ngx_rtmp_publish().
 */
static ngx_int_t
ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    static ngx_rtmp_publish_t   v;

    static ngx_rtmp_amf_elt_t   in_elts[] = {

        /* transaction is always 0 */
        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.name, sizeof(v.name) },

        { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.type, sizeof(v.type) },
    };

    ngx_memzero(&v, sizeof(v));

    /* pull the values described by in_elts out of the publish message */
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0]))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    ngx_rtmp_cmd_fill_args(v.name, v.args);

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "publish: name=‘%s‘ args=‘%s‘ type=%s silent=%d",
                  v.name, v.args, v.type, v.silent);

    return ngx_rtmp_publish(s, &v);
}
接着,该函数开始调用 ngx_rtmp_publish 构建的函数链表。从 nginx-rtmp 的源码和 nginx.conf 的配置可知,主要调用
ngx_rtmp_relay_publish 和 ngx_rtmp_live_publish 两个函数。
由 rtmp 模块的排序,首先调用 ngx_rtmp_relay_publish。
/*
 * Publish-chain handler of the relay module.
 *
 * For a locally published stream, starts a push relay to every configured
 * push target whose name is empty or equal to the published stream name.
 * When a push cannot be started, the reconnect timer of the session's relay
 * context is armed (if not already set).  The handler always continues with
 * next_publish().
 *
 * Nothing is pushed when the session is auto-pushed, when it is itself a
 * relay session that has a relay context, or when no push targets are
 * configured.
 */
static ngx_int_t
ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_relay_app_conf_t      *racf;
    ngx_rtmp_relay_target_t        *target, **t;
    ngx_str_t                       name;
    size_t                          n;
    ngx_rtmp_relay_ctx_t           *ctx;

    if (s->auto_pushed) {
        goto next;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx && s->relay) {
        goto next;
    }

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (racf == NULL || racf->pushes.nelts == 0) {
        goto next;
    }

    /* v->name holds the stream name taken from the client's publish command */
    name.len = ngx_strlen(v->name);
    name.data = v->name;

    /* walk the pushes array from its first element */
    t = racf->pushes.elts;
    for (n = 0; n < racf->pushes.nelts; ++n, ++t) {
        target = *t;

        /* a target configured with an explicit stream name only applies to
         * a publish with exactly that name */
        if (target->name.len && (name.len != target->name.len ||
            ngx_memcmp(name.data, target->name.data, name.len)))
        {
            continue;
        }

        if (ngx_rtmp_relay_push(s, &name, target) == NGX_OK) {
            continue;
        }

        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                      "relay: push failed name=‘%V‘ app=‘%V‘ "
                      "playpath=‘%V‘ url=‘%V‘",
                      &name, &target->app, &target->play_path,
                      &target->url.url);

        /*
         * The context read before the loop may be NULL (no relay context
         * existed yet) and the push attempt may or may not have created
         * one.  Re-read it and skip the reconnect timer when there is still
         * none, instead of dereferencing a NULL pointer.
         */
        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL) {
            continue;
        }

        if (!ctx->push_evt.timer_set) {
            ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
        }
    }

next:
    return next_publish(s, v);
}
/*
 * Start a push relay of stream `name` from session s to `target`.
 *
 * Logs the request and delegates to ngx_rtmp_relay_create(), using the
 * local-context constructor for the publishing side and the remote-context
 * constructor for the playing side.  Returns that call's result.
 */
ngx_int_t
ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_int_t  rc;

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "relay: create push name=‘%V‘ app=‘%V‘ playpath=‘%V‘ url=‘%V‘",
                  name, &target->app, &target->play_path, &target->url.url);

    rc = ngx_rtmp_relay_create(s, name, target,
                               ngx_rtmp_relay_create_local_ctx,
                               ngx_rtmp_relay_create_remote_ctx);

    return rc;
}
/*
 * Create a relay pair for stream `name`.
 *
 * The play-side context is created first via create_play_ctx.  The relay
 * hash table of the application is then searched for an existing entry with
 * the same name:
 *   - found:     the new play context is linked in front of that entry's
 *                play list and shares its publish context;
 *   - not found: a publish-side context is created via create_publish_ctx,
 *                paired with the play context and stored in the table.
 *
 * Returns NGX_OK on success, NGX_ERROR when the relay configuration is
 * missing or a context cannot be created (a play context that was already
 * created has its session finalized in the latter case).
 */
static ngx_int_t
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target,
    ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
    ngx_rtmp_relay_create_ctx_pt create_play_ctx)
{
    ngx_rtmp_relay_app_conf_t  *conf;
    ngx_rtmp_relay_ctx_t       *local, *remote, **slot;
    ngx_uint_t                  key;

    conf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (conf == NULL) {
        return NGX_ERROR;
    }

    /* per the article: for a push this opens a new connection to the
     * address given in the push url (the upstream server) */
    remote = create_play_ctx(s, name, target);
    if (remote == NULL) {
        return NGX_ERROR;
    }

    /* look up an existing entry for this stream name */
    key = ngx_hash_key(name->data, name->len);
    slot = &conf->ctx[key % conf->nbuckets];

    while (*slot != NULL) {
        if ((*slot)->name.len == name->len
            && ngx_memcmp(name->data, (*slot)->name.data, name->len) == 0)
        {
            break;
        }

        slot = &(*slot)->next;
    }

    if (*slot == NULL) {
        /* no entry yet: create the local context and register the pair */
        local = create_publish_ctx(s, name, target);
        if (local == NULL) {
            ngx_rtmp_finalize_session(remote->session);
            return NGX_ERROR;
        }

        local->publish = local;
        local->play = remote;
        remote->publish = local;
        *slot = local;

        return NGX_OK;
    }

    /* entry exists: prepend the new play context to its play list */
    remote->publish = (*slot)->publish;
    remote->next = (*slot)->play;
    (*slot)->play = remote;

    return NGX_OK;
}
/*
 * Build a configuration context from the session's main/srv/app
 * configuration and create the remote relay connection with it.
 * Returns the result of ngx_rtmp_relay_create_connection().
 */
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_conf_ctx_t  conf_ctx;

    conf_ctx.main_conf = s->main_conf;
    conf_ctx.srv_conf = s->srv_conf;
    conf_ctx.app_conf = s->app_conf;

    return ngx_rtmp_relay_create_connection(&conf_ctx, name, target);
}
/*
 * Create the outgoing side of a relay: a new memory pool, a relay context
 * filled from `target`, a peer connection to one of the target's addresses,
 * and an RTMP session on that connection on which the client handshake is
 * started asynchronously.
 *
 * cctx   - main/srv/app configuration the new session inherits
 * name   - stream name to store in the context (may be NULL)
 * target - relay target (url, app, play path, addresses, ...)
 *
 * Returns the new relay context, or NULL on failure.  On every failure
 * path handled here the pool created by this function is destroyed; when
 * ngx_rtmp_init_session() fails the pool is left alone, as in the original
 * code ("no need to destroy pool").
 *
 * All pool allocations are done before ngx_event_connect_peer() so that no
 * allocation failure can happen between a successful connect and session
 * creation, which previously destroyed the pool while leaving the opened
 * connection behind.
 */
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_relay_app_conf_t      *racf;
    ngx_rtmp_relay_ctx_t           *rctx;
    ngx_rtmp_addr_conf_t           *addr_conf;
    ngx_rtmp_conf_ctx_t            *addr_ctx;
    ngx_rtmp_session_t             *rs;
    ngx_peer_connection_t          *pc;
    ngx_connection_t               *c;
    ngx_addr_t                     *addr;
    ngx_pool_t                     *pool;
    ngx_int_t                       rc;
    ngx_str_t                       v, *uri;
    u_char                         *first, *last, *p;

    racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                   "relay: create remote context");

    /* dedicated pool for the relay context, connection and session */
    pool = ngx_create_pool(4096, racf->log);
    if (pool == NULL) {
        return NULL;
    }

    rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
    if (rctx == NULL) {
        goto clear;
    }

    /* copy the published stream name into the new context */
    if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) {
        goto clear;
    }

    /* copy the configured relay url into the new context */
    if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) {
        goto clear;
    }

    rctx->tag = target->tag;
    rctx->data = target->data;

    /* copy one string member of target into the same-purpose member of rctx */
#define NGX_RTMP_RELAY_STR_COPY(to, from)                                     \
    do {                                                                      \
        if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from)           \
            != NGX_OK)                                                        \
        {                                                                     \
            goto clear;                                                       \
        }                                                                     \
    } while (0)

    NGX_RTMP_RELAY_STR_COPY(app,        app);
    NGX_RTMP_RELAY_STR_COPY(tc_url,     tc_url);
    NGX_RTMP_RELAY_STR_COPY(page_url,   page_url);
    NGX_RTMP_RELAY_STR_COPY(swf_url,    swf_url);
    NGX_RTMP_RELAY_STR_COPY(flash_ver,  flash_ver);
    NGX_RTMP_RELAY_STR_COPY(play_path,  play_path);

    rctx->live  = target->live;
    rctx->start = target->start;
    rctx->stop  = target->stop;

#undef NGX_RTMP_RELAY_STR_COPY

    /* app and/or play path not given explicitly: deduce them from the uri */
    if (rctx->app.len == 0 || rctx->play_path.len == 0) {

        /*
         * parse uri; e.g. for "push rtmp://192.168.1.82:1935/live;" the
         * first path component "live" becomes rctx->app
         */
        uri = &target->url.uri;
        first = uri->data;
        last  = uri->data + uri->len;

        if (first != last && *first == '/') {
            ++first;
        }

        if (first != last) {

            /* deduce app: everything up to the next '/' (or the end) */
            p = ngx_strlchr(first, last, '/');
            if (p == NULL) {
                p = last;
            }

            if (rctx->app.len == 0 && first != p) {
                v.data = first;
                v.len = p - first;

                if (ngx_rtmp_relay_copy_str(pool, &rctx->app, &v) != NGX_OK) {
                    goto clear;
                }
            }

            /* deduce play_path: whatever follows the '/' after the app */
            if (p != last) {
                ++p;
            }

            /* p == last means nothing follows the app component, as in the
             * example url above */
            if (rctx->play_path.len == 0 && p != last) {
                v.data = p;
                v.len = last - p;

                if (ngx_rtmp_relay_copy_str(pool, &rctx->play_path, &v)
                    != NGX_OK)
                {
                    goto clear;
                }
            }
        }
    }

    /* peer (outgoing) connection descriptor */
    pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
    if (pc == NULL) {
        goto clear;
    }

    if (target->url.naddrs == 0) {
        ngx_log_error(NGX_LOG_ERR, racf->log, 0,
                      "relay: no address");
        goto clear;
    }

    /*
     * get address: pick one of the target's addresses in turn, e.g.
     * "192.168.1.82:1935" for "push rtmp://192.168.1.82:1935/live;"
     */
    addr = &target->url.addrs[target->counter % target->url.naddrs];
    target->counter++;

    /* copy log to keep shared log unchanged */
    rctx->log = *racf->log;
    pc->log = &rctx->log;
    pc->get = ngx_rtmp_relay_get_peer;
    pc->free = ngx_rtmp_relay_free_peer;
    /* name of the remote server (the address text) */
    pc->name = &addr->name;
    pc->socklen = addr->socklen;
    pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen);
    if (pc->sockaddr == NULL) {
        goto clear;
    }

    ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen);

    /*
     * Allocate the address configuration before connecting: once the
     * connection exists, nothing below may fail into `clear`.
     */
    addr_conf = ngx_pcalloc(pool, sizeof(ngx_rtmp_addr_conf_t));
    if (addr_conf == NULL) {
        goto clear;
    }

    addr_ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_conf_ctx_t));
    if (addr_ctx == NULL) {
        goto clear;
    }

    addr_conf->ctx = addr_ctx;
    addr_ctx->main_conf = cctx->main_conf;
    addr_ctx->srv_conf  = cctx->srv_conf;
    ngx_str_set(&addr_conf->addr_text, "ngx-relay");

    /*
     * Start connecting to the upstream server.  The socket is non-blocking,
     * so an in-progress connect is reported as NGX_AGAIN and is not an
     * error.
     */
    rc = ngx_event_connect_peer(pc);
    if (rc != NGX_OK && rc != NGX_AGAIN ) {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                       "relay: connection failed");
        goto clear;
    }

    c = pc->connection;
    c->pool = pool;
    /* relay url */
    c->addr_text = rctx->url;

    /* create a session for this outgoing connection */
    rs = ngx_rtmp_init_session(c, addr_conf);
    if (rs == NULL) {
        /* no need to destroy pool */
        return NULL;
    }

    rs->app_conf = cctx->app_conf;
    /* mark the session as a relay session */
    rs->relay = 1;
    rctx->session = rs;
    ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
    ngx_str_set(&rs->flashver, "ngx-local-relay");

#if (NGX_STAT_STUB)
    (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
#endif

    /* acting as a client now: start the handshake (C0 + C1) asynchronously */
    ngx_rtmp_client_handshake(rs, 1);

    return rctx;

clear:
    ngx_destroy_pool(pool);

    return NULL;
}
/*
 * Open a non-blocking outgoing connection described by pc.
 *
 * Creates the socket, obtains a connection object for it, applies optional
 * socket options / local bind, installs the I/O callbacks, registers the
 * connection with the event mechanism and calls connect().
 *
 * Returns (as visible in this function):
 *   NGX_OK        - connect() completed immediately
 *   NGX_AGAIN     - connect() is in progress (connect() returned -1 with an
 *                   in-progress errno)
 *   NGX_DECLINED  - connect() failed; the connection has been closed
 *   NGX_ERROR     - socket/connection setup failed
 *   any non-NGX_OK value returned by pc->get() is passed through unchanged
 */
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
int rc, type;
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
in_port_t port;
#endif
ngx_int_t event;
ngx_err_t err;
ngx_uint_t level;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_connection_t *c;
/* Per the article: for the relay case this get callback does no real work
 * (its implementation is not shown here). */
rc = pc->get(pc, pc->data);
if (rc != NGX_OK) {
return rc;
}
/* default to a stream socket when pc->type is not set */
type = (pc->type ? pc->type : SOCK_STREAM);
/* create the socket */
s = ngx_socket(pc->sockaddr->sa_family, type, 0);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d",
(type == SOCK_STREAM) ? "stream" : "dgram", s);
if (s == (ngx_socket_t) -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_socket_n " failed");
return NGX_ERROR;
}
/* obtain a connection object for the socket; close the socket if none
 * is available */
c = ngx_get_connection(s, pc->log);
if (c == NULL) {
if (ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_close_socket_n "failed");
}
return NGX_ERROR;
}
/* socket type of this connection (STREAM or DGRAM); STREAM in the relay
 * case discussed by the article */
c->type = type;
/* apply a receive buffer size only when one was requested (the article
 * notes the relay code does not set it) */
if (pc->rcvbuf) {
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
(const void *) &pc->rcvbuf, sizeof(int)) == -1)
{
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
"setsockopt(SO_RCVBUF) failed");
goto failed;
}
}
/* put the socket into non-blocking mode */
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_nonblocking_n " failed");
goto failed;
}
/* pc->local holds an optional local address to bind to (the article notes
 * the relay code does not set it) */
if (pc->local) {
#if (NGX_HAVE_TRANSPARENT_PROXY)
if (pc->transparent) {
if (ngx_event_connect_set_transparent(pc, s) != NGX_OK) {
goto failed;
}
}
#endif
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
port = ngx_inet_get_port(pc->local->sockaddr);
#endif
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT)
if (pc->sockaddr->sa_family != AF_UNIX && port == 0) {
/* static: once the option is found to be unsupported it is not
 * attempted again */
static int bind_address_no_port = 1;
if (bind_address_no_port) {
if (setsockopt(s, IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT,
(const void *) &bind_address_no_port,
sizeof(int)) == -1)
{
err = ngx_socket_errno;
if (err != NGX_EOPNOTSUPP && err != NGX_ENOPROTOOPT) {
ngx_log_error(NGX_LOG_ALERT, pc->log, err,
"setsockopt(IP_BIND_ADDRESS_NO_PORT) "
"failed, ignored");
} else {
bind_address_no_port = 0;
}
}
}
}
#endif
#if (NGX_LINUX)
if (pc->type == SOCK_DGRAM && port != 0) {
int reuse_addr = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuse_addr, sizeof(int))
== -1)
{
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
"setsockopt(SO_REUSEADDR) failed");
goto failed;
}
}
#endif
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
"bind(%V) failed", &pc->local->name);
goto failed;
}
}
if (type == SOCK_STREAM) {
/* install the I/O callbacks of this connection */
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
/* allow sendfile on this connection */
c->sendfile = 1;
if (pc->sockaddr->sa_family == AF_UNIX) {
c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
/* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
c->sendfile = 0;
#endif
}
} else { /* type == SOCK_DGRAM */
c->recv = ngx_udp_recv;
c->send = ngx_send;
c->send_chain = ngx_udp_send_chain;
}
c->log_error = pc->log_error;
/* read/write events of the outgoing connection share the peer's log */
rev = c->read;
wev = c->write;
rev->log = pc->log;
wev->log = pc->log;
pc->connection = c;
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
/* when the event module provides ngx_add_conn, register the connection's
 * events with it (epoll in the article's setup) */
if (ngx_add_conn) {
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"connect to %V, fd:%d #%uA", pc->name, s, c->number);
/* Connect to the upstream server.  Per the article, because the socket is
 * non-blocking this first connect() returns -1 (in progress). */
rc = connect(s, pc->sockaddr, pc->socklen);
if (rc == -1) {
err = ngx_socket_errno;
/* anything other than "in progress" is a real failure */
if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
/* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
&& err != NGX_EAGAIN
#endif
)
{
/* expected network failures are logged at ERR, everything else at
 * CRIT */
if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
/*
* Linux returns EAGAIN instead of ECONNREFUSED
* for unix sockets if listen queue is full
*/
|| err == NGX_EAGAIN
#endif
|| err == NGX_ECONNRESET
|| err == NGX_ENETDOWN
|| err == NGX_ENETUNREACH
|| err == NGX_EHOSTDOWN
|| err == NGX_EHOSTUNREACH)
{
level = NGX_LOG_ERR;
} else {
level = NGX_LOG_CRIT;
}
ngx_log_error(level, c->log, err, "connect() to %V failed",
pc->name);
ngx_close_connection(c);
pc->connection = NULL;
return NGX_DECLINED;
}
}
/* In the article's scenario (ngx_add_conn available, connect in progress)
 * the function returns NGX_AGAIN from here. */
if (ngx_add_conn) {
if (rc == -1) {
/* NGX_EINPROGRESS */
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
}
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno,
"connect(): %d", rc);
if (ngx_blocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_blocking_n " failed");
goto failed;
}
/*
* FreeBSD's aio allows to post an operation on non-connected socket.
* NT does not support it.
*
* TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT
*/
rev->ready = 1;
wev->ready = 1;
return NGX_OK;
}
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
/* kqueue */
event = NGX_CLEAR_EVENT;
} else {
/* select, poll, /dev/poll */
event = NGX_LEVEL_EVENT;
}
if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
goto failed;
}
if (rc == -1) {
/* NGX_EINPROGRESS */
if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
goto failed;
}
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
failed:
/* common failure path: close the connection and detach it from pc */
ngx_close_connection(c);
pc->connection = NULL;
return NGX_ERROR;
}
/*
 * Start the client side of the RTMP handshake on session s.
 *
 * Installs the handshake read/write handlers, allocates the handshake
 * buffer, enters the CLIENT_SEND_CHALLENGE stage and builds the challenge
 * packet (C0 + C1).  The session is finalized if the challenge cannot be
 * created.
 *
 * async != 0: nothing is sent now; the write event gets a timer of
 *             s->timeout and is registered via ngx_handle_write_event(),
 *             so the packet goes out once the socket becomes writable
 *             (the session is finalized if registration fails).
 * async == 0: the handshake send handler is invoked immediately.
 */
void
ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async)
{
    ngx_connection_t  *conn = s->connection;

    /* handshake-specific event handlers */
    conn->read->handler = ngx_rtmp_handshake_recv;
    conn->write->handler = ngx_rtmp_handshake_send;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "handshake: start client handshake");

    /* buffer for the handshake packets sent / received */
    s->hs_buf = ngx_rtmp_alloc_handshake_buffer(s);
    s->hs_stage = NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE;

    /* build C0 + C1 */
    if (ngx_rtmp_handshake_create_challenge(s,
                                            ngx_rtmp_client_version,
                                            &ngx_rtmp_client_partial_key)
        != NGX_OK)
    {
        ngx_rtmp_finalize_session(s);
        return;
    }

    if (!async) {
        ngx_rtmp_handshake_send(conn->write);
        return;
    }

    /* deferred send: wait (with timeout) for the socket to become writable */
    ngx_add_timer(conn->write, s->timeout);

    if (ngx_handle_write_event(conn->write, 0) != NGX_OK) {
        ngx_rtmp_finalize_session(s);
    }
}
/*
 * Get or create the relay context of the local session s and prepare it
 * as the local end of a relay for stream `name`.
 *
 * An existing module context is reused; otherwise one is allocated from
 * the connection pool and attached to the session.  The context's session
 * pointer and push_evt (data, log, handler) are (re)initialised every
 * time.
 *
 * Returns the context, or NULL when allocation fails, when the context
 * already has a publish link, or when the name cannot be copied.
 */
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_relay_ctx_t  *rctx;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "relay: create local context");

    rctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);

    if (rctx == NULL) {
        rctx = ngx_pcalloc(s->connection->pool,
                           sizeof(ngx_rtmp_relay_ctx_t));
        if (rctx == NULL) {
            return NULL;
        }

        ngx_rtmp_set_ctx(s, rctx, ngx_rtmp_relay_module);
    }

    rctx->session = s;

    /* reconnect event used when a push has to be retried */
    rctx->push_evt.handler = ngx_rtmp_relay_push_reconnect;
    rctx->push_evt.log = s->connection->log;
    rctx->push_evt.data = s;

    if (rctx->publish) {
        return NULL;
    }

    if (ngx_rtmp_relay_copy_str(s->connection->pool, &rctx->name, name)
        == NGX_OK)
    {
        return rctx;
    }

    return NULL;
}
从 ngx_rtmp_relay_create_local_ctx 函数返回后,就一直返回到 ngx_rtmp_relay_publish 函数中,接着执行 next_publish 的下
一个函数。这里为 ngx_rtmp_live_publish。
/*
 * Publish-chain handler of the live module.
 *
 * When live mode is enabled for the application, joins the stream v->name
 * as publisher, copies the silent flag into the live context and, unless
 * silent, sends the "NetStream.Publish.Start" status to the client.
 * The handler always continues with next_publish().
 */
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_live_app_conf_t  *conf;
    ngx_rtmp_live_ctx_t       *lctx;

    conf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    if (conf != NULL && conf->live) {

        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: publish: name=‘%s‘ type=‘%s‘",
                       v->name, v->type);

        /* join stream as publisher */
        ngx_rtmp_live_join(s, v->name, 1);

        /* the live context is read back after the join */
        lctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);

        if (lctx != NULL && lctx->publishing) {
            lctx->silent = v->silent;

            if (!lctx->silent) {
                /* answer the client's publish command */
                ngx_rtmp_send_status(s, "NetStream.Publish.Start",
                                     "status", "Start publishing");
            }
        }
    }

    return next_publish(s, v);
}
之后又回到 epoll_wait 处,等待监听的事件触发。接下来的分析先看 nginx 的一段打印:
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59761
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0004 d:088F6950
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705070
ngx_send.c:ngx_unix_send:37 send: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:3 ev:00002001
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 7
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 9: 60000:958705071
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:5 ev:0001 d:088F67E8
ngx_event_accept.c:ngx_event_accept:58 accept on 0.0.0.0:1935, ready: 0
ngx_alloc.c:ngx_memalign:66 posix_memalign: 08930870:4096 @16
ngx_event_accept.c:ngx_event_accept:293 *3 accept: 192.168.1.82:39334 fd:10
ngx_rtmp_init.c:ngx_rtmp_init_connection:124 *3 client connected ‘192.168.1.82‘
ngx_rtmp_handler.c:ngx_rtmp_set_chunk_size:823 setting chunk_size=128
ngx_alloc.c:ngx_memalign:66 posix_memalign: 089318A0:4096 @16
ngx_rtmp_limit_module.c:ngx_rtmp_limit_connect:87 rtmp limit: connect
ngx_rtmp_handshake.c:ngx_rtmp_handshake:589 handshake: start server handshake
ngx_rtmp_handshake.c:ngx_rtmp_alloc_handshake_buffer:208 handshake: allocating buffer
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 1
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:10 ev:0001 d:088F69C8
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 10: 958705071
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:10 op:2 ev:00000000
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 2
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=14.13.0.12 epoch=958645070
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=638
ngx_send.c:ngx_unix_send:37 send: fd:10 1537 of 1537
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 3
ngx_send.c:ngx_unix_send:37 send: fd:10 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 4
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 -1 of 1536
ngx_recv.c:ngx_unix_recv:150 recv() not ready (11: Resource temporarily unavailable)
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 0
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0001 d:088F6950
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705071
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:2 ev:00000000
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 8
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=13.10.14.13 epoch=958645071
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=557
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 9
ngx_send.c:ngx_unix_send:37 send: fd:9 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 10
ngx_rtmp_handshake.c:ngx_rtmp_handshake_done:362 handshake: done
ngx_rtmp_relay_module.c:ngx_rtmp_relay_handshake_done:1319 rtmp relay module: handhshake done
首先 fd = 9 为连接上游服务器(192.168.1.82:1935) 时创建的作为客户端的 STREAM 类型的 socket 套接字,而 fd = 5 为 nginx
启动时创建的 STREAM 类型的 socket 监听套接字。因此,从打印中可以看出,上面的打印是这么一个流程:
客户端发送 C2 后,会进入 NGX_RTMP_HANDSHAKE_CLIENT_DONE(10) 阶段,接着会调用该函数 ngx_rtmp_handshake_done:
/*
 * Handshake completion hook for session s: frees the handshake buffers,
 * notifies the NGX_RTMP_HANDSHAKE_DONE listeners and, when they all return
 * NGX_OK, starts the regular RTMP message cycle.  Any other result
 * finalizes the session.
 */
static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
    ngx_rtmp_free_handshake_buffers(s);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "handshake: done");

    if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE, NULL, NULL)
        == NGX_OK)
    {
        ngx_rtmp_cycle(s);

    } else {
        ngx_rtmp_finalize_session(s);
    }
}
该函数接着会调用到 ngx_rtmp_relay_handshake_done 函数:
/*
 * NGX_RTMP_HANDSHAKE_DONE handler of the relay module.
 *
 * For a relay session that has a relay context, sends the "connect"
 * command to the remote server and returns that call's result.  For any
 * other session it does nothing and returns NGX_OK.
 */
static ngx_int_t
ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t  *rctx;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: handhshake done");

    rctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);

    if (rctx != NULL && s->relay) {
        /* relay session: issue the connect command to the server */
        return ngx_rtmp_relay_send_connect(s);
    }

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: return");

    return NGX_OK;
}
客户端(192.168.1.82:39334, fd = 9) handshake 成功后会向服务器发送 connect 连接命令。
/*
 * Builds and sends the AMF "connect" command for a relay session.
 *
 * The command object carries app, tcUrl, pageUrl, swfUrl, flashVer,
 * audioCodecs and videoCodecs.  Values come from the relay context; when
 * the context leaves one empty a fallback is used:
 *   - app:      the name of the session's core application config;
 *   - tcUrl:    "rtmp://" + ctx->url up to its first '/' + "/" + ctx->app,
 *               allocated from the connection pool;
 *   - flashVer: NGX_RTMP_RELAY_FLASHVER.
 *
 * Before the command itself, a chunk-size message and an ack-size message
 * are sent using the server configuration values.
 *
 * Returns NGX_OK when all three messages were sent, NGX_ERROR if the
 * application config or relay context is missing, if the tcUrl allocation
 * fails, or if any send fails.
 *
 * Fix relative to the pasted text: the two character literals for '/'
 * had been mangled into typographic quotes, which is not valid C.
 */
static ngx_int_t
ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
{
    static double               trans = NGX_RTMP_RELAY_CONNECT_TRANS;
    static double               acodecs = 3575;
    static double               vcodecs = 252;

    static ngx_rtmp_amf_elt_t   out_cmd[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("app"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("tcUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("pageUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("swfUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("flashVer"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("audioCodecs"),
          &acodecs, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("videoCodecs"),
          &vcodecs, 0 }
    };

    static ngx_rtmp_amf_elt_t   out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "connect", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_cmd, sizeof(out_cmd) }
    };

    ngx_rtmp_core_app_conf_t   *cacf;
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_rtmp_header_t           h;
    size_t                      len, url_len;
    u_char                     *p, *url_end;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: send connect");

    cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (cacf == NULL || ctx == NULL) {
        return NGX_ERROR;
    }

    /* app */
    if (ctx->app.len) {
        out_cmd[0].data = ctx->app.data;
        out_cmd[0].len  = ctx->app.len;
    } else {
        out_cmd[0].data = cacf->name.data;
        out_cmd[0].len  = cacf->name.len;
    }

    /* tcUrl */
    if (ctx->tc_url.len) {
        out_cmd[1].data = ctx->tc_url.data;
        out_cmd[1].len  = ctx->tc_url.len;
    } else {
        /* upper bound: the whole url is counted even though only the
         * part before its first '/' is copied below */
        len = sizeof("rtmp://") - 1 + ctx->url.len +
            sizeof("/") - 1 + ctx->app.len;
        p = ngx_palloc(s->connection->pool, len);
        if (p == NULL) {
            return NGX_ERROR;
        }
        out_cmd[1].data = p;
        p = ngx_cpymem(p, "rtmp://", sizeof("rtmp://") - 1);

        /* keep only the part of the url that precedes its first '/' */
        url_len = ctx->url.len;
        url_end = ngx_strlchr(ctx->url.data, ctx->url.data + ctx->url.len, '/');
        if (url_end) {
            url_len = (size_t) (url_end - ctx->url.data);
        }

        p = ngx_cpymem(p, ctx->url.data, url_len);
        *p++ = '/';
        p = ngx_cpymem(p, ctx->app.data, ctx->app.len);
        out_cmd[1].len = p - (u_char *)out_cmd[1].data;
    }

    /* pageUrl */
    out_cmd[2].data = ctx->page_url.data;
    out_cmd[2].len  = ctx->page_url.len;

    /* swfUrl */
    out_cmd[3].data = ctx->swf_url.data;
    out_cmd[3].len  = ctx->swf_url.len;

    /* flashVer */
    if (ctx->flash_ver.len) {
        out_cmd[4].data = ctx->flash_ver.data;
        out_cmd[4].len  = ctx->flash_ver.len;
    } else {
        out_cmd[4].data = NGX_RTMP_RELAY_FLASHVER;
        out_cmd[4].len  = sizeof(NGX_RTMP_RELAY_FLASHVER) - 1;
    }

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    /* chunk size, then ack window, then the connect command itself;
     * the first failure short-circuits the rest */
    return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
        || ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
        || ngx_rtmp_send_amf(s, &h, out_elts,
            sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
        ? NGX_ERROR
        : NGX_OK;
}
发送完这几个 RTMP 包后,又回到 epoll_wait 中进行监听。
下面的分析区分一个服务器,两个客户端:
此时,监听到 obs 客户端发送的类型为 amf_meta(18) 的 rtmp 消息。
对于 "@setDataFrame",仅有 ngx_rtmp_codec_module 模块对其设置了回调函数,为 ngx_rtmp_codec_meta_data 函数:
/*
 * Handler for the metadata data frame ("@setDataFrame" / "onMetaData").
 *
 * Parses the AMF payload in `in` into a static scratch structure, copies
 * the parsed values into the session's codec context (creating the
 * context on first use), and then, depending on the application's `meta`
 * setting, either rebuilds the metadata message
 * (NGX_RTMP_CODEC_META_ON), copies it (NGX_RTMP_CODEC_META_COPY) or does
 * nothing further.
 *
 * Returns:
 *   NGX_ERROR  if the codec context cannot be allocated;
 *   NGX_OK     if the payload cannot be parsed (an error is logged and
 *              the frame is ignored) or if `meta` is neither ON nor COPY;
 *   otherwise the result of the reconstruct/copy helper.
 *
 * Fix relative to the original: the ngx_pcalloc() result was used without
 * a NULL check, so an allocation failure led to a NULL dereference when
 * the parsed values were stored.
 */
static ngx_int_t
ngx_rtmp_codec_meta_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_codec_app_conf_t      *cacf;
    ngx_rtmp_codec_ctx_t           *ctx;
    ngx_uint_t                      skip;

    /* scratch area the AMF tables below point into */
    static struct {
        double                      width;
        double                      height;
        double                      duration;
        double                      frame_rate;
        double                      video_data_rate;
        double                      video_codec_id_n;
        u_char                      video_codec_id_s[32];
        double                      audio_data_rate;
        double                      audio_codec_id_n;
        u_char                      audio_codec_id_s[32];
        u_char                      profile[32];
        u_char                      level[32];
    }                               v;

    /* videocodecid may arrive either as a number or as a string */
    static ngx_rtmp_amf_elt_t       in_video_codec_id[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.video_codec_id_n, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.video_codec_id_s, sizeof(v.video_codec_id_s) },
    };

    /* audiocodecid may arrive either as a number or as a string */
    static ngx_rtmp_amf_elt_t       in_audio_codec_id[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.audio_codec_id_n, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.audio_codec_id_s, sizeof(v.audio_codec_id_s) },
    };

    static ngx_rtmp_amf_elt_t       in_inf[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("width"),
          &v.width, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("height"),
          &v.height, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("duration"),
          &v.duration, 0 },

        /* "framerate" and "fps" both land in frame_rate */
        { NGX_RTMP_AMF_NUMBER,
          ngx_string("framerate"),
          &v.frame_rate, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("fps"),
          &v.frame_rate, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("videodatarate"),
          &v.video_data_rate, 0 },

        { NGX_RTMP_AMF_VARIANT,
          ngx_string("videocodecid"),
          in_video_codec_id, sizeof(in_video_codec_id) },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("audiodatarate"),
          &v.audio_data_rate, 0 },

        { NGX_RTMP_AMF_VARIANT,
          ngx_string("audiocodecid"),
          in_audio_codec_id, sizeof(in_audio_codec_id) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("profile"),
          &v.profile, sizeof(v.profile) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          &v.level, sizeof(v.level) },
    };

    static ngx_rtmp_amf_elt_t       in_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          in_inf, sizeof(in_inf) },
    };

    cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_codec_module);

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
        if (ctx == NULL) {
            /* out of memory: do not register or dereference a NULL ctx */
            return NGX_ERROR;
        }
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
    }

    ngx_memzero(&v, sizeof(v));

    /* use -1 as a sign of unchanged data;
     * 0 is a valid value for uncompressed audio */
    v.audio_codec_id_n = -1;

    /* FFmpeg sends a string in front of actual metadata; ignore it.
     * skip is 0 when the payload starts with an AMF string (consumed by
     * in_elts[0]) and 1 otherwise (parsing starts at the object). */
    skip = !(in->buf->last > in->buf->pos
            && *in->buf->pos == NGX_RTMP_AMF_STRING);
    if (ngx_rtmp_receive_amf(s, in, in_elts + skip,
                sizeof(in_elts) / sizeof(in_elts[0]) - skip))
    {
        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                "codec: error parsing data frame");
        return NGX_OK;
    }

    ctx->width = (ngx_uint_t) v.width;
    ctx->height = (ngx_uint_t) v.height;
    ctx->duration = (ngx_uint_t) v.duration;
    ctx->frame_rate = (ngx_uint_t) v.frame_rate;
    ctx->video_data_rate = (ngx_uint_t) v.video_data_rate;
    ctx->video_codec_id = (ngx_uint_t) v.video_codec_id_n;
    ctx->audio_data_rate = (ngx_uint_t) v.audio_data_rate;

    /* -1 (field absent) -> 0; 0 -> NGX_RTMP_AUDIO_UNCOMPRESSED;
     * anything else is taken as is */
    ctx->audio_codec_id = (v.audio_codec_id_n == -1
            ? 0 : v.audio_codec_id_n == 0
            ? NGX_RTMP_AUDIO_UNCOMPRESSED : (ngx_uint_t) v.audio_codec_id_n);

    ngx_memcpy(ctx->profile, v.profile, sizeof(v.profile));
    ngx_memcpy(ctx->level, v.level, sizeof(v.level));

    ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "codec: data frame: "
            "width=%ui height=%ui duration=%ui frame_rate=%ui "
            "video=%s (%ui) audio=%s (%ui)",
            ctx->width, ctx->height, ctx->duration, ctx->frame_rate,
            ngx_rtmp_get_video_codec_name(ctx->video_codec_id),
            ctx->video_codec_id,
            ngx_rtmp_get_audio_codec_name(ctx->audio_codec_id),
            ctx->audio_codec_id);

    switch (cacf->meta) {
        case NGX_RTMP_CODEC_META_ON: /* the setting's initial value, per the article author */
            return ngx_rtmp_codec_reconstruct_meta(s);
        case NGX_RTMP_CODEC_META_COPY:
            return ngx_rtmp_codec_copy_meta(s, h, in);
    }

    /* NGX_RTMP_CODEC_META_OFF */
    return NGX_OK;
}
该函数主要是解析 setDataFrame 的数据,然后调用 ngx_rtmp_codec_reconstruct_meta 函数。
/*
 * Rebuilds the "onMetaData" message from the values stored in the
 * session's codec context.
 *
 * Any previously built metadata chain is released first.  The context
 * values are copied into a static scratch structure referenced by the
 * AMF output tables, the AMF payload is appended to ctx->meta, and
 * ngx_rtmp_codec_prepare_meta() is called with timestamp 0.
 *
 * Returns NGX_OK when there is no codec context, NGX_ERROR when the AMF
 * payload could not be built, otherwise the result of
 * ngx_rtmp_codec_prepare_meta().
 */
static ngx_int_t
ngx_rtmp_codec_reconstruct_meta(ngx_rtmp_session_t *s)
{
    ngx_rtmp_codec_ctx_t           *ctx;
    ngx_rtmp_core_srv_conf_t       *cscf;

    /* scratch values the output tables point at */
    static struct {
        double                      width;
        double                      height;
        double                      duration;
        double                      frame_rate;
        double                      video_data_rate;
        double                      video_codec_id;
        double                      audio_data_rate;
        double                      audio_codec_id;
        u_char                      profile[32];
        u_char                      level[32];
    }                               out;

    static ngx_rtmp_amf_elt_t       out_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("Server"),
          "NGINX RTMP (github.com/arut/nginx-rtmp-module)", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("width"),
          &out.width, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("height"),
          &out.height, 0 },

        /* display dimensions reuse the coded width/height */
        { NGX_RTMP_AMF_NUMBER,
          ngx_string("displayWidth"),
          &out.width, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("displayHeight"),
          &out.height, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("duration"),
          &out.duration, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("framerate"),
          &out.frame_rate, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("fps"),
          &out.frame_rate, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("videodatarate"),
          &out.video_data_rate, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("videocodecid"),
          &out.video_codec_id, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("audiodatarate"),
          &out.audio_data_rate, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("audiocodecid"),
          &out.audio_codec_id, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("profile"),
          &out.profile, sizeof(out.profile) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          &out.level, sizeof(out.level) },
    };

    static ngx_rtmp_amf_elt_t       out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "onMetaData", 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_inf, sizeof(out_inf) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        return NGX_OK;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    /* drop the previously built metadata chain, if any */
    if (ctx->meta) {
        ngx_rtmp_free_shared_chain(cscf, ctx->meta);
        ctx->meta = NULL;
    }

    out.width           = ctx->width;
    out.height          = ctx->height;
    out.duration        = ctx->duration;
    out.frame_rate      = ctx->frame_rate;
    out.video_data_rate = ctx->video_data_rate;
    out.video_codec_id  = ctx->video_codec_id;
    out.audio_data_rate = ctx->audio_data_rate;
    out.audio_codec_id  = ctx->audio_codec_id;
    ngx_memcpy(out.profile, ctx->profile, sizeof(ctx->profile));
    ngx_memcpy(out.level, ctx->level, sizeof(ctx->level));

    if (ngx_rtmp_append_amf(s, &ctx->meta, NULL, out_elts,
                            sizeof(out_elts) / sizeof(out_elts[0]))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    if (ctx->meta == NULL) {
        return NGX_ERROR;
    }

    return ngx_rtmp_codec_prepare_meta(s, 0);
}
/*
 * Turns the AMF payload stored in the codec context's `meta` chain into
 * a complete RTMP message and bumps the context's metadata version.
 *
 * The message header uses chunk stream NGX_RTMP_CSID_AMF, message stream
 * NGX_RTMP_MSID, type NGX_RTMP_MSG_AMF_META and the given timestamp.
 * Always returns NGX_OK.
 */
static ngx_int_t
ngx_rtmp_codec_prepare_meta(ngx_rtmp_session_t *s, uint32_t timestamp)
{
    ngx_rtmp_codec_ctx_t  *codec;
    ngx_rtmp_header_t      hdr;

    ngx_memzero(&hdr, sizeof(hdr));
    hdr.type = NGX_RTMP_MSG_AMF_META;
    hdr.timestamp = timestamp;
    hdr.msid = NGX_RTMP_MSID;
    hdr.csid = NGX_RTMP_CSID_AMF;

    codec = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

    /* build the complete rtmp message around the stored payload */
    ngx_rtmp_prepare_message(s, &hdr, NULL, codec->meta);

    codec->meta_version = ngx_rtmp_codec_get_next_version();

    return NGX_OK;
}
服务器接收到客户端发送的设置块大小消息。此时服务器会调用到 ngx_rtmp_set_chunk_size 函数进行块大小的设置。
/*
 * Applies a new incoming chunk size to session s.
 *
 * Rejects sizes above NGX_RTMP_MAX_CHUNK_SIZE.  Otherwise the current
 * input pool becomes s->in_old_pool, a fresh 4096-byte pool is created as
 * s->in_pool, and, if there was a previous pool, the partially received
 * data of every chunk stream except stream 0 is re-packed into newly
 * allocated input buffers.
 *
 * Returns NGX_OK on success, NGX_ERROR if the size is too big or an input
 * buffer cannot be allocated.
 *
 * NOTE(review): the ngx_create_pool() result is not checked here; confirm
 * whether ngx_rtmp_alloc_in_buf() tolerates a NULL s->in_pool.
 */
ngx_int_t
ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_chain_t *li, *fli, *lo, *flo;
ngx_buf_t *bi, *bo;
ngx_int_t n;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"setting chunk_size=%ui", size);
if (size > NGX_RTMP_MAX_CHUNK_SIZE) {
ngx_log_error(NGX_LOG_ALERT, s->connection->log, 0,
"too big RTMP chunk size:%ui", size);
return NGX_ERROR;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
/* keep the previous pool reachable while its data is being copied */
s->in_old_pool = s->in_pool;
s->in_chunk_size = size;
s->in_pool = ngx_create_pool(4096, s->connection->log);
/* copy existing chunk data */
if (s->in_old_pool) {
s->in_chunk_size_changing = 1;
s->in_streams[0].in = NULL;
for(n = 1; n < cscf->max_streams; ++n) {
/* stream buffer is circular
* for all streams except for the current one
* (which caused this chunk size change);
* we can simply ignore it */
li = s->in_streams[n].in;
if (li == NULL || li->next == NULL) {
s->in_streams[n].in = NULL;
continue;
}
/* move from last to the first */
li = li->next;
/* fli / flo remember the first input / output links so the walk can
 * detect a full lap and close the new chain into a ring */
fli = li;
lo = ngx_rtmp_alloc_in_buf(s);
if (lo == NULL) {
return NGX_ERROR;
}
flo = lo;
for ( ;; ) {
bi = li->buf;
bo = lo->buf;
/* the rest of the input buffer fits into the current output buffer */
if (bo->end - bo->last >= bi->last - bi->pos) {
bo->last = ngx_cpymem(bo->last, bi->pos,
bi->last - bi->pos);
li = li->next;
if (li == fli) {
/* back at the first input link: close the ring and publish it */
lo->next = flo;
s->in_streams[n].in = lo;
break;
}
continue;
}
/* output buffer is too small: fill what is left of it, advance the
 * input position by the amount copied, then chain a new output buffer */
bi->pos += (ngx_cpymem(bo->last, bi->pos,
bo->end - bo->last) - bo->last);
lo->next = ngx_rtmp_alloc_in_buf(s);
lo = lo->next;
if (lo == NULL) {
return NGX_ERROR;
}
}
}
}
return NGX_OK;
}
服务器接收到客户端发送的设置应答窗口大小的消息。
/*
 * Excerpt of the protocol message handler: only the
 * NGX_RTMP_MSG_ACK_SIZE case is shown, the "..." lines stand for code
 * elided by the article.  `val` is set in the elided part.
 */
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
...
switch(h->type) {
...
case NGX_RTMP_MSG_ACK_SIZE:
/* receive window size =val */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive ack_size=%uD", val);
/* store the acknowledgement window size on the session as is */
s->ack_size = val;
break;
...
}
}
服务器接收到客户端发送的 connect 连接命令。该客户端要连接的 app 为 live。
抓不到包,只能看打印:
AMF read (1) 00 ‘?‘
AMF read (8) 3F F0 00 00 00 00 00 00 ‘????????‘
AMF read (1) 03 ‘?‘
AMF read (2) 00 03 ‘??‘
AMF read (3) 61 70 70 ‘app‘
AMF read (1) 02 ‘?‘
AMF read (2) 00 04 ‘??‘
AMF read (4) 6C 69 76 65 ‘live‘
AMF read (2) 00 05 ‘??‘
AMF read (5) 74 63 55 72 6C ‘tcUrl‘
AMF read (1) 02 ‘?‘
AMF read (2) 00 1D ‘??‘
AMF read (29) 72 74 6D 70 3A 2F 2F 31 39 32 2E 31 36 38 2E 31 ‘rtmp://192.168.1‘
AMF read (2) 00 07 ‘??‘
AMF read (7) 70 61 67 65 55 72 6C ‘pageUrl‘
AMF read (1) 02 ‘?‘
AMF read (2) 00 00 ‘??‘
AMF read (2) 00 06 ‘??‘
AMF read (6) 73 77 66 55 72 6C ‘swfUrl‘
AMF read (1) 02 ‘?‘
AMF read (2) 00 00 ‘??‘
AMF read (2) 00 08 ‘??‘
AMF read (8) 66 6C 61 73 68 56 65 72 ‘flashVer‘
AMF read (1) 02 ‘?‘
AMF read (2) 00 0F ‘??‘
AMF read (15) 4C 4E 58 2E 31 31 2C 31 2C 31 30 32 2C 35 35 ‘LNX.11,1,102,55‘
AMF read (2) 00 0B ‘??‘
AMF read (11) 61 75 64 69 6F 43 6F 64 65 63 73 ‘audioCodecs‘
AMF read (1) 00 ‘?‘
AMF read (8) 40 AB EE 00 00 00 00 00 ‘@???????‘
AMF read (2) 00 0B ‘??‘
AMF read (11) 76 69 64 65 6F 43 6F 64 65 63 73 ‘videoCodecs‘
AMF read (1) 00 ‘?‘
AMF read (8) 40 6F 80 00 00 00 00 00 ‘@o??????‘
AMF read (2) 00 00 ‘??‘
AMF read (1) 09 ‘?‘
/*
 * Handles the client's "connect" command.
 *
 * Copies the connection parameters from `v` into the session, strips any
 * "?..." suffix from the application name, looks the application up among
 * the server's configured applications and, when found, replies with
 * ack-size, bandwidth, chunk-size and the AMF "_result" message.
 *
 * Returns NGX_OK when all replies were sent; NGX_ERROR if the session is
 * already connected, a parameter copy cannot be allocated, the
 * application is not configured, or any send fails.
 *
 * Fixes relative to the pasted text:
 *   - the character literal for '?' had been mangled into typographic
 *     quotes, which is not valid C;
 *   - NGX_RTMP_SET_STRPAR copied into the ngx_palloc() result without
 *     checking it; it now returns NGX_ERROR on allocation failure and is
 *     wrapped in do { } while (0) so it behaves as a single statement.
 */
static ngx_int_t
ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
{
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_core_app_conf_t  **cacfp;
    ngx_uint_t                  n;
    ngx_rtmp_header_t           h;
    u_char                     *p;

    static double               trans;
    static double               capabilities = NGX_RTMP_CAPABILITIES;
    static double               object_encoding = 0;

    /* AMF response the server returns for the client's connect command */
    static ngx_rtmp_amf_elt_t  out_obj[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("fmsVer"),
          NGX_RTMP_FMS_VERSION, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("capabilities"),
          &capabilities, 0 },
    };

    static ngx_rtmp_amf_elt_t  out_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          "status", 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("code"),
          "NetConnection.Connect.Success", 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("description"),
          "Connection succeeded.", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("objectEncoding"),
          &object_encoding, 0 }
    };

    static ngx_rtmp_amf_elt_t  out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "_result", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_obj, sizeof(out_obj) },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_inf, sizeof(out_inf) },
    };

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
            "rtmp cmd: connect");

    if (s->connected) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                "connect: duplicate connection");
        return NGX_ERROR;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    /* echo the client's transaction id in the _result reply */
    trans = v->trans;

    /* fill session parameters */
    s->connected = 1;

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    /* copy the NUL-terminated v->name into a pool-allocated s->name */
#define NGX_RTMP_SET_STRPAR(name)                                             \
    do {                                                                      \
        s->name.len = ngx_strlen(v->name);                                    \
        s->name.data = ngx_palloc(s->connection->pool, s->name.len);          \
        if (s->name.data == NULL) {                                           \
            return NGX_ERROR;                                                 \
        }                                                                     \
        ngx_memcpy(s->name.data, v->name, s->name.len);                       \
    } while (0)

    NGX_RTMP_SET_STRPAR(app);
    NGX_RTMP_SET_STRPAR(args);
    NGX_RTMP_SET_STRPAR(flashver);
    NGX_RTMP_SET_STRPAR(swf_url);
    NGX_RTMP_SET_STRPAR(tc_url);
    NGX_RTMP_SET_STRPAR(page_url);

#undef NGX_RTMP_SET_STRPAR

    /* the application name ends at the first '?' */
    p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?');
    if (p) {
        s->app.len = (p - s->app.data);
    }

    s->acodecs = (uint32_t) v->acodecs;
    s->vcodecs = (uint32_t) v->vcodecs;

    /* find application & set app_conf */
    cacfp = cscf->applications.elts;
    for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
        if ((*cacfp)->name.len == s->app.len &&
            ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
        {
            /* found app! */
            s->app_conf = (*cacfp)->app_conf;
            break;
        }
    }

    if (s->app_conf == NULL) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                "connect: application not found: ‘%V‘", &s->app);
        return NGX_ERROR;
    }

    object_encoding = v->object_encoding;

           /* Window Acknowledgement Size: tells the peer how much data
            * may be sent before an acknowledgement is expected. */
    return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||
           /* Set Peer Bandwidth: announces the output bandwidth limit
            * the peer should apply, i.e. a cap on unacknowledged data. */
           ngx_rtmp_send_bandwidth(s, cscf->ack_window,
                                   NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||
           /* Set Chunk Size: announces the new maximum chunk size. */
           ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||
           /* finally the "_result" reply to connect itself */
           ngx_rtmp_send_amf(s, &h, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0]))
           != NGX_OK ? NGX_ERROR : NGX_OK;
}
这里,服务器向客户端(192.168.1.82:xxxx)发送了 ack_size、bandwidth、chunk_size 和 对 connect 的响应的包。
客户端接收到服务器发来的设置应答窗口大小的消息。
/*
 * Excerpt of the protocol message handler, shown here for the relay
 * (client) side: only the NGX_RTMP_MSG_ACK_SIZE case is reproduced, the
 * "..." lines stand for code elided by the article.  `val` is set in the
 * elided part.
 */
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
...
switch(h->type) {
...
case NGX_RTMP_MSG_ACK_SIZE:
/* receive window size =val */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive ack_size=%uD", val);
/* store the acknowledgement window size on the session as is */
s->ack_size = val;
break;
...
}
}
客户端接收到服务器发来的设置流带宽的消息。
/*
 * Excerpt of the protocol message handler: only the
 * NGX_RTMP_MSG_BANDWIDTH case is shown, the "..." lines stand for code
 * elided by the article.  `b`, `val` and `limit` are set up in the
 * elided part.  In the visible code the values are only logged; nothing
 * is stored on the session.
 */
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
...
switch(h->type) {
...
case NGX_RTMP_MSG_BANDWIDTH:
/* the payload must hold at least 5 bytes; byte 4 is read as the limit */
if (b->last - b->pos >= 5) {
limit = *(uint8_t*)&b->pos[4];
/* NOTE(review): presumably silences unused-variable warnings when the
 * debug log below is compiled out - confirm */
(void)val;
(void)limit;
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive bandwidth=%uD limit=%d",
val, (int)limit);
/* receive window size =val
* && limit */
}
break;
...
}
}
客户端接收到服务器发来的设置块大小的消息。因此调用 ngx_rtmp_set_chunk_size 函数进行设置。
/*
 * Relay-side handler for the "_result" AMF reply.
 *
 * Non-relay sessions are ignored (NGX_OK).  The reply is parsed into a
 * static scratch structure and dispatched on its transaction id:
 *   - NGX_RTMP_RELAY_CONNECT_TRANS:       send createStream;
 *   - NGX_RTMP_RELAY_CREATE_STREAM_TRANS: send publish and start local
 *     play, or send play and start local publish, depending on the relay
 *     context and the static_relay flag;
 *   - anything else:                      NGX_OK.
 *
 * Returns NGX_ERROR if the reply cannot be parsed or a send fails.
 */
static ngx_int_t
ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_int_t                   trans_id;

    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    }                           v;

    static ngx_rtmp_amf_elt_t   in_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          &v.level, sizeof(v.level) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("code"),
          &v.code, sizeof(v.code) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("description"),
          &v.desc, sizeof(v.desc) },
    };

    static ngx_rtmp_amf_elt_t   in_elts[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          in_inf, sizeof(in_inf) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        return NGX_OK;
    }

    ngx_memzero(&v, sizeof(v));

    if (ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: _result: level=‘%s‘ code=‘%s‘ description=‘%s‘",
            v.level, v.code, v.desc);

    trans_id = (ngx_int_t)v.trans;

    if (trans_id == NGX_RTMP_RELAY_CONNECT_TRANS) {
        /* connect acknowledged: send the createStream command */
        return ngx_rtmp_relay_send_create_stream(s);
    }

    if (trans_id != NGX_RTMP_RELAY_CREATE_STREAM_TRANS) {
        return NGX_OK;
    }

    if (ctx->publish != ctx && !s->static_relay) {
        if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
            return NGX_ERROR;
        }
        return ngx_rtmp_relay_play_local(s);
    }

    if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
        return NGX_ERROR;
    }
    return ngx_rtmp_relay_publish_local(s);
}
该函数中首先解析接收到的响应数据,然后根据 v.trans 调用相应的函数进行处理,这里调用的是 ngx_rtmp_relay_send_create_stream。
/*
 * Sends the AMF "createStream" command on a relay session, using
 * transaction id NGX_RTMP_RELAY_CREATE_STREAM_TRANS.
 *
 * Returns whatever ngx_rtmp_send_amf() returns.
 */
static ngx_int_t
ngx_rtmp_relay_send_create_stream(ngx_rtmp_session_t *s)
{
    ngx_rtmp_header_t           hdr;

    static double               trans = NGX_RTMP_RELAY_CREATE_STREAM_TRANS;

    static ngx_rtmp_amf_elt_t   out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "createStream", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 }
    };

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp relay: send create stream");

    ngx_memzero(&hdr, sizeof(hdr));
    hdr.type = NGX_RTMP_MSG_AMF_CMD;
    hdr.csid = NGX_RTMP_RELAY_CSID_AMF_INI;

    return ngx_rtmp_send_amf(s, &hdr, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0]));
}
该函数主要是构建 createStream 包,然后发送给服务器。
服务器接收到客户端发来的 createStream 命令消息。
/*
 * Entry point for an incoming "createStream" command.
 *
 * Parses the transaction id from the AMF payload into a static
 * ngx_rtmp_create_stream_t and passes it on to ngx_rtmp_create_stream().
 * Returns NGX_ERROR when the payload cannot be parsed, otherwise the
 * result of ngx_rtmp_create_stream().
 */
static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_int_t                        rc;

    static ngx_rtmp_create_stream_t  v;

    static ngx_rtmp_amf_elt_t        in_elts[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.trans, sizeof(v.trans) },
    };

    /* parse the createStream command to obtain v.trans */
    rc = ngx_rtmp_receive_amf(s, in, in_elts,
                              sizeof(in_elts) / sizeof(in_elts[0]));
    if (rc != 0) {
        return NGX_ERROR;
    }

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");

    return ngx_rtmp_create_stream(s, &v);
}
接着,从该函数中开始调用 ngx_rtmp_create_stream 构建的函数链表。这里调用到的是 ngx_rtmp_cmd_create_stream
函数。
/*
 * Replies to a "createStream" command with an AMF "_result" message that
 * echoes the client's transaction id and carries the stream id
 * NGX_RTMP_MSID.
 *
 * Returns NGX_DONE when the reply was sent, NGX_ERROR otherwise.
 */
static ngx_int_t
ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
{
    ngx_rtmp_header_t           hdr;

    /* support one message stream per connection */
    static double               stream;
    static double               trans;

    static ngx_rtmp_amf_elt_t   out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "_result", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &stream, sizeof(stream) },
    };

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream");

    stream = NGX_RTMP_MSID;
    trans = v->trans;

    ngx_memzero(&hdr, sizeof(hdr));
    hdr.type = NGX_RTMP_MSG_AMF_CMD;
    hdr.csid = NGX_RTMP_CSID_AMF_INI;

    if (ngx_rtmp_send_amf(s, &hdr, out_elts,
                          sizeof(out_elts) / sizeof(out_elts[0]))
        == NGX_OK)
    {
        return NGX_DONE;
    }

    return NGX_ERROR;
}
该函数是构建对 createStream 的响应。
/*
 * Relay-side handler for the "_result" AMF reply (shown a second time in
 * the article, now for the createStream reply).
 *
 * Non-relay sessions are ignored (NGX_OK).  The reply is parsed into a
 * static scratch structure and dispatched on its transaction id:
 *   - NGX_RTMP_RELAY_CONNECT_TRANS:       send createStream;
 *   - NGX_RTMP_RELAY_CREATE_STREAM_TRANS: send publish and start local
 *     play, or send play and start local publish, depending on the relay
 *     context and the static_relay flag;
 *   - anything else:                      NGX_OK.
 *
 * Returns NGX_ERROR if the reply cannot be parsed or a send fails.
 */
static ngx_int_t
ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_int_t                   trans_id;

    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    }                           v;

    static ngx_rtmp_amf_elt_t   in_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          &v.level, sizeof(v.level) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("code"),
          &v.code, sizeof(v.code) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("description"),
          &v.desc, sizeof(v.desc) },
    };

    static ngx_rtmp_amf_elt_t   in_elts[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          in_inf, sizeof(in_inf) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        return NGX_OK;
    }

    ngx_memzero(&v, sizeof(v));

    if (ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: _result: level=‘%s‘ code=‘%s‘ description=‘%s‘",
            v.level, v.code, v.desc);

    trans_id = (ngx_int_t)v.trans;

    if (trans_id == NGX_RTMP_RELAY_CONNECT_TRANS) {
        return ngx_rtmp_relay_send_create_stream(s);
    }

    if (trans_id != NGX_RTMP_RELAY_CREATE_STREAM_TRANS) {
        return NGX_OK;
    }

    if (ctx->publish != ctx && !s->static_relay) {
        /* createStream acknowledged: send the publish command */
        if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
            return NGX_ERROR;
        }
        return ngx_rtmp_relay_play_local(s);
    }

    if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
        return NGX_ERROR;
    }
    return ngx_rtmp_relay_publish_local(s);
}
该函数中首先解析接收到的响应数据,然后根据 v.trans 调用相应的函数进行处理,这里调用的是 ngx_rtmp_relay_send_publish。
/*
 * Sends the AMF "publish" command on a relay session.
 *
 * The stream name is the relay context's play_path when it is non-empty
 * and the context's name otherwise; the publishing type is "live".
 *
 * Returns NGX_ERROR when the session has no relay context, otherwise the
 * result of ngx_rtmp_send_amf().
 */
static ngx_int_t
ngx_rtmp_relay_send_publish(ngx_rtmp_session_t *s)
{
    ngx_rtmp_header_t           hdr;
    ngx_rtmp_relay_ctx_t       *ctx;

    static double               trans;

    static ngx_rtmp_amf_elt_t   out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "publish", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          NULL, 0 }, /* <- to fill */

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "live", 0 }
    };

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp relay: send publish");

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    /* stream name: play_path wins over name when it is set */
    if (ctx->play_path.len == 0) {
        out_elts[3].data = ctx->name.data;
        out_elts[3].len  = ctx->name.len;

    } else {
        out_elts[3].data = ctx->play_path.data;
        out_elts[3].len  = ctx->play_path.len;
    }

    ngx_memzero(&hdr, sizeof(hdr));
    hdr.type = NGX_RTMP_MSG_AMF_CMD;
    hdr.msid = NGX_RTMP_RELAY_MSID;
    hdr.csid = NGX_RTMP_RELAY_CSID_AMF;

    return ngx_rtmp_send_amf(s, &hdr, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0]));
}
/*
 * Starts a silent local "play" on session s for the stream named in the
 * relay context.
 *
 * The name is copied into a zeroed ngx_rtmp_play_t, truncated to fit and
 * NUL-terminated, and the request is passed to ngx_rtmp_play().
 * Returns NGX_ERROR when the session has no relay context.
 */
static ngx_int_t
ngx_rtmp_relay_play_local(ngx_rtmp_session_t *s)
{
    ngx_rtmp_relay_ctx_t  *relay;
    ngx_rtmp_play_t        play;
    size_t                 n;

    relay = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (relay == NULL) {
        return NGX_ERROR;
    }

    ngx_memzero(&play, sizeof(ngx_rtmp_play_t));
    play.silent = 1;

    /* leave room for the terminating NUL */
    n = ngx_min(sizeof(play.name) - 1, relay->name.len);
    *(ngx_cpymem(play.name, relay->name.data, n)) = 0;

    return ngx_rtmp_play(s, &play);
}
在该函数中又调用 ngx_rtmp_play 构建的函数链表,这里主要调用了 ngx_rtmp_live_play 函数。
/*
 * "play" handler of the live module.
 *
 * When live is enabled for the application, the session joins the named
 * stream as a subscriber.  Unless the request is silent or play_restart
 * is set, a NetStream.Play.Start status and the sample-access message
 * are sent.  The next handler in the play chain is always called and its
 * result returned.
 */
static ngx_int_t
ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
    ngx_rtmp_live_app_conf_t  *lacf;
    ngx_rtmp_live_ctx_t       *ctx;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    if (lacf != NULL && lacf->live) {

        ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: play: name=‘%s‘ start=%uD duration=%uD reset=%d",
                       v->name, (uint32_t) v->start,
                       (uint32_t) v->duration, (uint32_t) v->reset);

        /* join stream as subscriber */
        ngx_rtmp_live_join(s, v->name, 0);

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);

        if (ctx != NULL) {
            ctx->silent = v->silent;

            if (!ctx->silent && !lacf->play_restart) {
                ngx_rtmp_send_status(s, "NetStream.Play.Start",
                                     "status", "Start live");
                ngx_rtmp_send_sample_access(s);
            }
        }
    }

    return next_play(s, v);
}
服务器接收到客户端的 publish 命令。
/*
 * Entry point for an incoming "publish" command.
 *
 * Parses the stream name and the optional publishing type from the AMF
 * payload into a static ngx_rtmp_publish_t, fills in the args field via
 * ngx_rtmp_cmd_fill_args(), logs the request and passes it to
 * ngx_rtmp_publish().
 *
 * Returns NGX_ERROR when the payload cannot be parsed, otherwise the
 * result of ngx_rtmp_publish().
 */
static ngx_int_t
ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_int_t                   rc;

    static ngx_rtmp_publish_t   v;

    static ngx_rtmp_amf_elt_t   in_elts[] = {

        /* transaction is always 0 */
        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.name, sizeof(v.name) },

        { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.type, sizeof(v.type) },
    };

    ngx_memzero(&v, sizeof(v));

    rc = ngx_rtmp_receive_amf(s, in, in_elts,
                              sizeof(in_elts) / sizeof(in_elts[0]));
    if (rc != 0) {
        return NGX_ERROR;
    }

    ngx_rtmp_cmd_fill_args(v.name, v.args);

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "publish: name=‘%s‘ args=‘%s‘ type=%s silent=%d",
                  v.name, v.args, v.type, v.silent);

    return ngx_rtmp_publish(s, &v);
}
当前客户端连接的 application 为 live,而该 application{} 下没有 push,因此这里主要调用 ngx_rtmp_live_publish。
/*
 * "publish" handler of the live module.
 *
 * When live is enabled for the application, the session joins the named
 * stream as a publisher.  If the live context then reports the session
 * as publishing and the request is not silent, a NetStream.Publish.Start
 * status is sent.  The next handler in the publish chain is always
 * called and its result returned.
 */
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_live_app_conf_t  *lacf;
    ngx_rtmp_live_ctx_t       *ctx;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    if (lacf != NULL && lacf->live) {

        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: publish: name=‘%s‘ type=‘%s‘",
                       v->name, v->type);

        /* join stream as publisher */
        ngx_rtmp_live_join(s, v->name, 1);

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);

        if (ctx != NULL && ctx->publishing) {
            ctx->silent = v->silent;

            if (!ctx->silent) {
                /* send the response to the publish command */
                ngx_rtmp_send_status(s, "NetStream.Publish.Start",
                                     "status", "Start publishing");
            }
        }
    }

    return next_publish(s, v);
}
客户端接收到服务器发送的对 publish 的响应。表示客户端可以向服务器发布流了。
/*
 * Relay-side handler for "onStatus".
 *
 * Non-relay sessions are ignored.  The payload is parsed into a static
 * scratch structure (as a bare info object when the message type is
 * NGX_RTMP_MSG_AMF_META, otherwise as transaction id, null and info
 * object) and the level/code/description are written to the debug log.
 * The parse result is deliberately not checked; the function always
 * returns NGX_OK.
 */
static ngx_int_t
ngx_rtmp_relay_on_status(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *relay;

    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    }                           v;

    static ngx_rtmp_amf_elt_t   in_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          &v.level, sizeof(v.level) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("code"),
          &v.code, sizeof(v.code) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("description"),
          &v.desc, sizeof(v.desc) },
    };

    static ngx_rtmp_amf_elt_t   in_elts[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          in_inf, sizeof(in_inf) },
    };

    static ngx_rtmp_amf_elt_t   in_elts_meta[] = {

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          in_inf, sizeof(in_inf) },
    };

    relay = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (relay == NULL || !s->relay) {
        return NGX_OK;
    }

    ngx_memzero(&v, sizeof(v));

    if (h->type != NGX_RTMP_MSG_AMF_META) {
        ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0]));

    } else {
        ngx_rtmp_receive_amf(s, in, in_elts_meta,
                             sizeof(in_elts_meta) / sizeof(in_elts_meta[0]));
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: onStatus: level=‘%s‘ code=‘%s‘ description=‘%s‘",
            v.level, v.code, v.desc);

    return NGX_OK;
}
服务器接收到客户端 obs 发送的音频包。
对于 NGX_RTMP_MSG_AUDIO(8),主要有以下几个 rtmp 模块设置了回调函数:
这里主要调用 codec 和 live 模块设置的回调函数,首先调用 ngx_rtmp_codec_module 模块设置的回调函数 ngx_rtmp_codec_av。
/*
 * Audio/video message handler of the codec module.
 *
 * Records codec parameters from the first payload byte in the session's
 * codec context (created on first use) and, when the packet is a codec
 * header (per ngx_rtmp_is_codec_header()) for AAC audio or H264 video,
 * parses it and stores a shared copy of the packet in ctx->aac_header or
 * ctx->avc_header, replacing any previous copy.
 *
 * Returns NGX_ERROR if the codec context cannot be allocated, NGX_OK in
 * every other case (including messages that are neither audio nor video
 * and packets too short to inspect).
 *
 * Fix relative to the original: the ngx_pcalloc() result was used without
 * a NULL check, so an allocation failure led to a NULL dereference.
 */
static ngx_int_t
ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_core_srv_conf_t           *cscf;
    ngx_rtmp_codec_ctx_t               *ctx;
    ngx_chain_t                       **header;
    uint8_t                             fmt;
    static ngx_uint_t                   sample_rates[] =
                                        { 5512, 11025, 22050, 44100 };

    if (h->type != NGX_RTMP_MSG_AUDIO && h->type != NGX_RTMP_MSG_VIDEO) {
        return NGX_OK;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
        if (ctx == NULL) {
            /* out of memory: do not register or dereference a NULL ctx */
            return NGX_ERROR;
        }
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
    }

    /* save codec */
    if (in->buf->last - in->buf->pos < 1) {
        return NGX_OK;
    }

    fmt = in->buf->pos[0];
    if (h->type == NGX_RTMP_MSG_AUDIO) {
        /* high nibble: codec id; bit 0: channels - 1; bit 1: sample
         * size flag; bits 2-3: index into sample_rates[] */
        ctx->audio_codec_id = (fmt & 0xf0) >> 4;
        ctx->audio_channels = (fmt & 0x01) + 1;
        ctx->sample_size = (fmt & 0x02) ? 2 : 1;

        /* only set when no sample rate has been recorded yet */
        if (ctx->sample_rate == 0) {
            ctx->sample_rate = sample_rates[(fmt & 0x0c) >> 2];
        }
    } else {
        /* low nibble: video codec id */
        ctx->video_codec_id = (fmt & 0x0f);
    }

    /* save AVC/AAC header */
    if (in->buf->last - in->buf->pos < 3) {
        return NGX_OK;
    }

    /* no conf */
    if (!ngx_rtmp_is_codec_header(in)) {
        return NGX_OK;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    header = NULL;

    if (h->type == NGX_RTMP_MSG_AUDIO) {
        if (ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC) {
            header = &ctx->aac_header;
            ngx_rtmp_codec_parse_aac_header(s, in);
        }
    } else {
        if (ctx->video_codec_id == NGX_RTMP_VIDEO_H264) {
            header = &ctx->avc_header;
            ngx_rtmp_codec_parse_avc_header(s, in);
        }
    }

    /* other codecs: nothing to keep */
    if (header == NULL) {
        return NGX_OK;
    }

    /* replace the previously stored header copy */
    if (*header) {
        ngx_rtmp_free_shared_chain(cscf, *header);
    }

    *header = ngx_rtmp_append_shared_bufs(cscf, NULL, in);

    return NGX_OK;
}
/*
 * Parses the AAC sequence header in `in` and stores the profile (object
 * type), sample rate, channel configuration and the SBR/PS flags in the
 * session's codec context.
 *
 * Layout consumed after the first 16 bits, which are skipped:
 *
 *  MPEG-4 Audio Specific Config
 *
 *  5 bits: object type
 *  if (object type == 31)
 *    6 bits + 32: object type
 *  4 bits: frequency index
 *  if (frequency index == 15)
 *    24 bits: frequency
 *  4 bits: channel configuration
 *
 *  if (object_type == 5)
 *    4 bits: frequency index
 *    if (frequency index == 15)
 *      24 bits: frequency
 *    5 bits: object type
 *    if (object type == 31)
 *      6 bits + 32: object type
 *
 *  var bits: AOT Specific Config
 */
static void
ngx_rtmp_codec_parse_aac_header(ngx_rtmp_session_t *s, ngx_chain_t *in)
{
    ngx_uint_t              freq_idx;
    ngx_rtmp_codec_ctx_t   *ctx;
    ngx_rtmp_bit_reader_t   reader;

    static ngx_uint_t       aac_sample_rates[] =
        { 96000, 88200, 64000, 48000,
          44100, 32000, 24000, 22050,
          16000, 12000, 11025,  8000,
           7350,     0,     0,     0 };

#if (NGX_DEBUG)
    ngx_rtmp_codec_dump_header(s, "aac", in);
#endif

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

    ngx_rtmp_bit_init_reader(&reader, in->buf->pos, in->buf->last);

    /* read 16 bits and discard them, i.e. skip the first two bytes */
    ngx_rtmp_bit_read(&reader, 16);

    /* 5-bit profile, with the 6-bit escape form when it equals 31 */
    ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&reader, 5);
    if (ctx->aac_profile == 31) {
        ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&reader, 6) + 32;
    }

    /* index 15 means an explicit 24-bit rate follows */
    freq_idx = (ngx_uint_t) ngx_rtmp_bit_read(&reader, 4);
    ctx->sample_rate = (freq_idx == 15)
                       ? (ngx_uint_t) ngx_rtmp_bit_read(&reader, 24)
                       : aac_sample_rates[freq_idx];

    ctx->aac_chan_conf = (ngx_uint_t) ngx_rtmp_bit_read(&reader, 4);

    if (ctx->aac_profile == 5 || ctx->aac_profile == 29) {

        ctx->aac_sbr = 1;

        if (ctx->aac_profile == 29) {
            ctx->aac_ps = 1;
        }

        /* extension: second sample rate and second object type */
        freq_idx = (ngx_uint_t) ngx_rtmp_bit_read(&reader, 4);
        ctx->sample_rate = (freq_idx == 15)
                           ? (ngx_uint_t) ngx_rtmp_bit_read(&reader, 24)
                           : aac_sample_rates[freq_idx];

        ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&reader, 5);
        if (ctx->aac_profile == 31) {
            ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&reader, 6)
                               + 32;
        }
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "codec: aac header profile=%ui, "
                   "sample_rate=%ui, chan_conf=%ui",
                   ctx->aac_profile, ctx->sample_rate, ctx->aac_chan_conf);
}
/*
 * Resets the bit reader `br` to all zeroes and points it at the byte
 * range [pos, last).
 */
void
ngx_rtmp_bit_init_reader(ngx_rtmp_bit_reader_t *br, u_char *pos, u_char *last)
{
    ngx_memzero(br, sizeof(ngx_rtmp_bit_reader_t));

    br->last = last;
    br->pos = pos;
}
该函数初始化一个 bit reader。
/*
 * Reads the next n bits from the bit reader, most significant bit first,
 * and returns them as an unsigned value.
 *
 * If the reader runs out of input before n bits were read, br->err is
 * set and 0 is returned; bits consumed up to that point stay consumed.
 */
uint64_t
ngx_rtmp_bit_read(ngx_rtmp_bit_reader_t *br, ngx_uint_t n)
{
    uint64_t    acc;
    ngx_uint_t  take;

    acc = 0;

    for ( ; n != 0; n -= take) {

        /* reached the end of the data: flag the error */
        if (br->pos >= br->last) {
            br->err = 1;
            return 0;
        }

        /* never take more than what is left in the current byte */
        take = n;
        if (br->offs + n > 8) {
            take = (ngx_uint_t) (8 - br->offs);
        }

        /* make room, then append the bits taken from the current byte */
        acc <<= take;
        acc += (*br->pos >> (8 - br->offs - take)) & ((u_char) 0xff >> (8 - take));

        /* advance the bit offset; move to the next byte once it hits 8 */
        br->offs += take;

        if (br->offs == 8) {
            br->pos++;
            br->offs = 0;
        }
    }

    return acc;
}
/*
 * Audio/video message handler of the live module.
 *
 * Takes one audio or video message (header h, payload chain in) received
 * on session s and, when s is the publisher of a live stream, sends it to
 * every other non-paused context attached to the same stream.
 *
 * A subscriber whose chunk stream is already active receives the
 * "relative" packet; a subscriber whose chunk stream is not active yet
 * first receives the cached codec header(s) or an "absolute" packet,
 * subject to the wait_video / wait_key settings.
 *
 * Returns NGX_ERROR when the live application configuration is missing,
 * NGX_OK in every other case (including all cases where nothing is sent).
 */
static ngx_int_t
ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_live_ctx_t *ctx, *pctx;
ngx_rtmp_codec_ctx_t *codec_ctx;
ngx_chain_t *header, *coheader, *meta,
*apkt, *aapkt, *acopkt, *rpkt;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_session_t *ss;
ngx_rtmp_header_t ch, lh, clh;
ngx_int_t rc, mandatory, dummy_audio;
ngx_uint_t prio;
ngx_uint_t peers;
ngx_uint_t meta_version;
ngx_uint_t csidx;
uint32_t delta;
ngx_rtmp_live_chunk_stream_t *cs;
/* human-readable message type, only referenced by the debug log calls */
#ifdef NGX_DEBUG
const char *type_s;
type_s = (h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio");
#endif
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
return NGX_ERROR;
}
/* live is off for this application, or there is no payload to forward */
if (!lacf->live || in == NULL || in->buf == NULL) {
return NGX_OK;
}
/* the session has no live context or is not attached to a stream */
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || ctx->stream == NULL) {
return NGX_OK;
}
/* only the publishing session may feed data into the stream */
if (ctx->publishing == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s from non-publisher", type_s);
return NGX_OK;
}
/* the stream is not active yet: start it */
if (!ctx->stream->active) {
ngx_rtmp_live_start(s);
}
/* data arrived: re-arm the idle timer if it is currently set */
if (ctx->idle_evt.timer_set) {
ngx_add_timer(&ctx->idle_evt, lacf->idle_timeout);
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s packet timestamp=%uD",
type_s, h->timestamp);
s->current_time = h->timestamp;
peers = 0;
/* packets below are built lazily, at most once per call, and shared
 * between all subscribers that need them */
apkt = NULL;
aapkt = NULL;
acopkt = NULL;
header = NULL;
coheader = NULL;
meta = NULL;
meta_version = 0;
mandatory = 0;
/* for video, prio is the frame type of the packet; audio uses 0 */
prio = (h->type == NGX_RTMP_MSG_VIDEO ?
ngx_rtmp_get_video_frame_type(in) : 0);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
/* chunk stream index: 0 for video or when interleaving is on,
 * 1 for audio when interleaving is off */
csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);
cs = &ctx->cs[csidx];
/* ch: header describing the current message */
ngx_memzero(&ch, sizeof(ch));
ch.timestamp = h->timestamp;
ch.msid = NGX_RTMP_MSID;
ch.csid = cs->csid;
ch.type = h->type;
/* lh: same header, but carrying the timestamp previously recorded for
 * this chunk stream when the chunk stream is already active */
lh = ch;
if (cs->active) {
lh.timestamp = cs->timestamp;
}
/* clh: copy of lh with the opposite message type (audio <-> video) */
clh = lh;
clh.type = (h->type == NGX_RTMP_MSG_AUDIO ? NGX_RTMP_MSG_VIDEO :
NGX_RTMP_MSG_AUDIO);
/* record the current timestamp on the publisher's chunk stream */
cs->active = 1;
cs->timestamp = ch.timestamp;
/* timestamp distance between this message and the previous one */
delta = ch.timestamp - lh.timestamp;
/*
if (delta >> 31) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: clipping non-monotonical timestamp %uD->%uD",
lh.timestamp, ch.timestamp);
delta = 0;
ch.timestamp = lh.timestamp;
}
*/
/* rpkt: copy of the payload prepared with ch against lh; this is the
 * "relative" packet sent to subscribers that are already active */
rpkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &ch, &lh, rpkt);
/* pick up the cached codec headers and metadata when a codec context
 * exists for the publisher */
codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
if (codec_ctx) {
if (h->type == NGX_RTMP_MSG_AUDIO) {
header = codec_ctx->aac_header;
/* when interleaving, the other media type's header is sent as well */
if (lacf->interleave) {
coheader = codec_ctx->avc_header;
}
/* this packet is itself an AAC codec header: mark it mandatory and
 * reset its priority to 0 */
if (codec_ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC &&
ngx_rtmp_is_codec_header(in))
{
prio = 0;
mandatory = 1;
}
} else {
header = codec_ctx->avc_header;
if (lacf->interleave) {
coheader = codec_ctx->aac_header;
}
/* this packet is itself an H264 codec header: mark it mandatory and
 * reset its priority to 0 */
if (codec_ctx->video_codec_id == NGX_RTMP_VIDEO_H264 &&
ngx_rtmp_is_codec_header(in))
{
prio = 0;
mandatory = 1;
}
}
if (codec_ctx->meta) {
meta = codec_ctx->meta;
meta_version = codec_ctx->meta_version;
}
}
/* broadcast to all subscribers */
for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
/* skip the publisher itself and paused subscribers */
if (pctx == ctx || pctx->paused) {
continue;
}
ss = pctx->session;
/* from here on cs refers to the subscriber's chunk stream */
cs = &pctx->cs[csidx];
/* send metadata */
/* only when the subscriber has not seen this metadata version yet */
if (meta && meta_version != pctx->meta_version) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: meta");
if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) {
pctx->meta_version = meta_version;
}
}
/* sync stream */
/* more than lacf->sync was dropped for this subscriber: deactivate its
 * chunk stream so that the inactive-stream path below is taken */
if (cs->active && (lacf->sync && cs->dropped > lacf->sync)) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: sync %s dropped=%uD", type_s, cs->dropped);
cs->active = 0;
cs->dropped = 0;
}
/* absolute packet */
if (!cs->active) {
/* a codec header packet is not forwarded to an inactive chunk stream */
if (mandatory) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: skipping header");
continue;
}
/* wait_video: hold audio back until the subscriber's cs[0] is active */
if (lacf->wait_video && h->type == NGX_RTMP_MSG_AUDIO &&
!pctx->cs[0].active)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: waiting for video");
continue;
}
/* wait_key: do not start the subscriber on anything but a key frame */
if (lacf->wait_key && prio != NGX_RTMP_VIDEO_KEY_FRAME &&
(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO))
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: skip non-key");
continue;
}
/* wait_video with video arriving while the subscriber's cs[1] is still
 * inactive: build (once) a freshly allocated packet prepared with the
 * opposite-type header clh, to be sent alongside the video */
dummy_audio = 0;
if (lacf->wait_video && h->type == NGX_RTMP_MSG_VIDEO &&
!pctx->cs[1].active)
{
dummy_audio = 1;
if (aapkt == NULL) {
aapkt = ngx_rtmp_alloc_shared_buf(cscf);
ngx_rtmp_prepare_message(s, &clh, NULL, aapkt);
}
}
if (header || coheader) {
/* send absolute codec header */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: abs %s header timestamp=%uD",
type_s, lh.timestamp);
if (header) {
if (apkt == NULL) {
apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
ngx_rtmp_prepare_message(s, &lh, NULL, apkt);
}
rc = ngx_rtmp_send_message(ss, apkt, 0);
if (rc != NGX_OK) {
continue;
}
}
if (coheader) {
if (acopkt == NULL) {
acopkt = ngx_rtmp_append_shared_bufs(cscf, NULL, coheader);
ngx_rtmp_prepare_message(s, &clh, NULL, acopkt);
}
rc = ngx_rtmp_send_message(ss, acopkt, 0);
if (rc != NGX_OK) {
continue;
}
} else if (dummy_audio) {
ngx_rtmp_send_message(ss, aapkt, 0);
}
/* the subscriber's chunk stream is now active at the previous timestamp;
 * there is no continue here, so the relative packet below follows */
cs->timestamp = lh.timestamp;
cs->active = 1;
ss->current_time = cs->timestamp;
} else {
/* send absolute packet */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: abs %s packet timestamp=%uD",
type_s, ch.timestamp);
if (apkt == NULL) {
apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &ch, NULL, apkt);
}
rc = ngx_rtmp_send_message(ss, apkt, prio);
if (rc != NGX_OK) {
continue;
}
cs->timestamp = ch.timestamp;
cs->active = 1;
ss->current_time = cs->timestamp;
++peers;
if (dummy_audio) {
ngx_rtmp_send_message(ss, aapkt, 0);
}
/* the payload was delivered as an absolute packet: done with this peer */
continue;
}
}
/* send relative packet */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: rel %s packet delta=%uD",
type_s, delta);
if (ngx_rtmp_send_message(ss, rpkt, prio) != NGX_OK) {
/* account the loss; cs->dropped feeds the sync check above */
++pctx->ndropped;
cs->dropped += delta;
/* a subscriber that failed to receive a mandatory packet is closed */
if (mandatory) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: mandatory packet failed");
ngx_rtmp_finalize_session(ss);
}
continue;
}
cs->timestamp += delta;
++peers;
ss->current_time = cs->timestamp;
}
/* release every packet that was built for this message */
if (rpkt) {
ngx_rtmp_free_shared_chain(cscf, rpkt);
}
if (apkt) {
ngx_rtmp_free_shared_chain(cscf, apkt);
}
if (aapkt) {
ngx_rtmp_free_shared_chain(cscf, aapkt);
}
if (acopkt) {
ngx_rtmp_free_shared_chain(cscf, acopkt);
}
/* bandwidth accounting: h->mlen once for input, once per served peer for
 * output, and once more on the per-media-type input counter */
ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);
ngx_rtmp_update_bandwidth(h->type == NGX_RTMP_MSG_AUDIO ?
&ctx->stream->bw_in_audio :
&ctx->stream->bw_in_video,
h->mlen);
return NGX_OK;
}
该函数的主要作用是将接收到的、由客户端 obs 发送来的音视频数据转发给该流的订阅者,即 application live。
/*
 * Announce the start of the live stream that session s belongs to.
 *
 * Builds a stream-begin control packet plus, depending on configuration,
 * up to three status packets:
 *   - play_restart:   "NetStream.Play.Start" and a sample-access packet;
 *   - publish_notify: "NetStream.Play.PublishNotify".
 * They are handed to ngx_rtmp_live_set_status() with the active flag set
 * to 1, and every packet created here is released afterwards.
 */
static void
ngx_rtmp_live_start(ngx_rtmp_session_t *s)
{
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_live_app_conf_t   *lacf;
    ngx_chain_t                *control;
    ngx_chain_t                *status[3];
    size_t                      n, nstatus;

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    /* build the stream_begin rtmp packet */
    control = ngx_rtmp_create_stream_begin(s, NGX_RTMP_MSID);

    nstatus = 0;

    if (lacf->play_restart) {
        status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.Start",
                                                   "status", "Start live");
        status[nstatus++] = ngx_rtmp_create_sample_access(s);
    }

    if (lacf->publish_notify) {
        status[nstatus++] = ngx_rtmp_create_status(s,
                                                 "NetStream.Play.PublishNotify",
                                                 "status", "Start publishing");
    }

    ngx_rtmp_live_set_status(s, control, status, nstatus, 1);

    if (control) {
        ngx_rtmp_free_shared_chain(cscf, control);
    }

    /*
     * Release the status packets with the same NULL guard that is applied
     * to control above.
     * NOTE(review): this assumes the create helpers return NULL when they
     * fail, as the existing check on control suggests - confirm.
     */
    for (n = 0; n < nstatus; ++n) {
        if (status[n]) {
            ngx_rtmp_free_shared_chain(cscf, status[n]);
        }
    }
}
/*
 * Audio/video message handler of the codec module.
 *
 * Records basic codec parameters from the first payload byte of every
 * audio/video message and, when the message is an AAC or H264 codec
 * header, parses it and keeps a private copy in the codec context
 * (aac_header / avc_header), replacing any previously stored copy.
 *
 * The codec context is created on first use from the connection pool.
 *
 * Always returns NGX_OK; messages that are not audio/video, are empty or
 * too short are ignored.
 */
static ngx_int_t
ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
    ngx_chain_t *in)
{
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_codec_ctx_t       *ctx;
    ngx_chain_t               **header;
    uint8_t                     fmt;

    /* sample rates selected by the 2-bit rate field of the audio tag byte */
    static ngx_uint_t           sample_rates[] =
        { 5512, 11025, 22050, 44100 };

    if (h->type != NGX_RTMP_MSG_AUDIO && h->type != NGX_RTMP_MSG_VIDEO) {
        return NGX_OK;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
        if (ctx == NULL) {
            /*
             * Out of memory: skip codec tracking for this message instead
             * of dereferencing NULL below.  Nothing is registered on the
             * session, so the next message retries the allocation.
             */
            return NGX_OK;
        }

        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
    }

    /* save codec */

    /* nothing to inspect without at least one payload byte */
    if (in == NULL || in->buf == NULL
        || in->buf->last - in->buf->pos < 1)
    {
        return NGX_OK;
    }

    fmt = in->buf->pos[0];

    if (h->type == NGX_RTMP_MSG_AUDIO) {
        ctx->audio_codec_id = (fmt & 0xf0) >> 4;
        ctx->audio_channels = (fmt & 0x01) + 1;
        ctx->sample_size = (fmt & 0x02) ? 2 : 1;

        /* keep a sample rate that was already set earlier */
        if (ctx->sample_rate == 0) {
            ctx->sample_rate = sample_rates[(fmt & 0x0c) >> 2];
        }

    } else {
        ctx->video_codec_id = (fmt & 0x0f);
    }

    /* save AVC/AAC header */

    if (in->buf->last - in->buf->pos < 3) {
        return NGX_OK;
    }

    /* no conf */
    if (!ngx_rtmp_is_codec_header(in)) {
        return NGX_OK;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    /* choose the slot to store the header in; other codecs have none */
    header = NULL;

    if (h->type == NGX_RTMP_MSG_AUDIO) {
        if (ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC) {
            header = &ctx->aac_header;
            ngx_rtmp_codec_parse_aac_header(s, in);
        }

    } else {
        if (ctx->video_codec_id == NGX_RTMP_VIDEO_H264) {
            header = &ctx->avc_header;
            ngx_rtmp_codec_parse_avc_header(s, in);
        }
    }

    if (header == NULL) {
        return NGX_OK;
    }

    /* drop the previously stored header before keeping the new copy */
    if (*header) {
        ngx_rtmp_free_shared_chain(cscf, *header);
    }

    *header = ngx_rtmp_append_shared_bufs(cscf, NULL, in);

    return NGX_OK;
}
static void
ngx_rtmp_codec_parse_avc_header(ngx_rtmp_session_t *s, ngx_chain_t *in)
{
ngx_uint_t profile_idc, width, height, crop_left, crop_right,
crop_top, crop_bottom, frame_mbs_only, n, cf_idc,
num_ref_frames;
ngx_rtmp_codec_ctx_t *ctx;
ngx_rtmp_bit_reader_t br;
#if (NGX_DEBUG)
ngx_rtmp_codec_dump_header(s, "avc", in);
#endif
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
ngx_rtmp_bit_init_reader(&br, in->buf->pos, in->buf->last);
ngx_rtmp_bit_read(&br, 48);
ctx->avc_profile = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
ctx->avc_compat = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
ctx->avc_level = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
/* nal bytes */
ctx->avc_nal_bytes = (ngx_uint_t) ((ngx_rtmp_bit_read_8(&br) & 0x03) + 1);
/* nnals */
if ((ngx_rtmp_bit_read_8(&br) & 0x1f) == 0) {
return;
}
/* nal size */
ngx_rtmp_bit_read(&br, 16);
/* nal type */
if (ngx_rtmp_bit_read_8(&br) != 0x67) {
return;
}
/* SPS */
/* profile idc */
profile_idc = (ngx_uint_t) ngx_rtmp_bit_read(&br, 8);
/* flags */
ngx_rtmp_bit_read(&br, 8);
/* level idc */
ngx_rtmp_bit_read(&br, 8);
/* SPS id */
ngx_rtmp_bit_read_golomb(&br);
if (profile_idc == 100 || profile_idc == 110 ||
profile_idc == 122 || profile_idc == 244 || profile_idc == 44 ||
profile_idc == 83 || profile_idc == 86 || profile_idc == 118)
{
/* chroma format idc */
cf_idc = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
if (cf_idc == 3) {
/* separate color plane */
ngx_rtmp_bit_read(&br, 1);
}
/* bit depth luma - 8 */
ngx_rtmp_bit_read_golomb(&br);
/* bit depth chroma - 8 */
ngx_rtmp_bit_read_golomb(&br);
/* qpprime y zero transform bypass */
ngx_rtmp_bit_read(&br, 1);
/* seq scaling matrix present */
if (ngx_rtmp_bit_read(&br, 1)) {
for (n = 0; n < (cf_idc != 3 ? 8u : 12u); n++) {
/* seq scaling list present */
if (ngx_rtmp_bit_read(&br, 1)) {
/* TODO: scaling_list()
if (n < 6) {
} else {
}
*/
}
}
}
}
/* log2 max frame num */
ngx_rtmp_bit_read_golomb(&br);
/* pic order cnt type */
switch (ngx_rtmp_bit_read_golomb(&br)) {
case 0:
/* max pic order cnt */
ngx_rtmp_bit_read_golomb(&br);
break;
case 1:
/* delta pic order alwys zero */
ngx_rtmp_bit_read(&br, 1);
/* offset for non-ref pic */
ngx_rtmp_bit_read_golomb(&br);
/* offset for top to bottom field */
ngx_rtmp_bit_read_golomb(&br);
/* num ref frames in pic order */
num_ref_frames = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
for (n = 0; n < num_ref_frames; n++) {
/* offset for ref frame */
ngx_rtmp_bit_read_golomb(&br);
}
}
/* num ref frames */
ctx->avc_ref_frames = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
/* gaps in frame num allowed */
ngx_rtmp_bit_read(&br, 1);
/* pic width in mbs - 1 */
width = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
/* pic height in map units - 1 */
height = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
/* frame mbs only flag */
frame_mbs_only = (ngx_uint_t) ngx_rtmp_bit_read(&br, 1);
if (!frame_mbs_only) {
/* mbs adaprive frame field */
ngx_rtmp_bit_read(&br, 1);
}
/* direct 8x8 inference flag */
ngx_rtmp_bit_read(&br, 1);
/* frame cropping */
if (ngx_rtmp_bit_read(&br, 1)) {
crop_left = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
crop_right = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
crop_top = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
crop_bottom = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
} else {
crop_left = 0;
crop_right = 0;
crop_top = 0;
crop_bottom = 0;
}
ctx->width = (width + 1) * 16 - (crop_left + crop_right) * 2;
ctx->height = (2 - frame_mbs_only) * (height + 1) * 16 -
(crop_top + crop_bottom) * 2;
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"codec: avc header "
"profile=%ui, compat=%ui, level=%ui, "
"nal_bytes=%ui, ref_frames=%ui, width=%ui, height=%ui",
ctx->avc_profile, ctx->avc_compat, ctx->avc_level,
ctx->avc_nal_bytes, ctx->avc_ref_frames,
ctx->width, ctx->height);
}
标签:应用 free 5.4 9.png exist 综述 int core ali
原文地址:https://www.cnblogs.com/jimodetiantang/p/8994061.html