码迷,mamicode.com
首页 > 编程语言 > 详细

SRS之接收推流线程:recv

时间:2018-05-29 00:23:00      阅读:1072      评论:0      收藏:0      [点我收藏+]

标签:share   nat   _id   OLE   dsr   memory   please   int   before   

SrsPublishRecvThread、SrsRecvThread、SrsReusableThread2、SrsThread 之间的关系图

技术分享图片

1. recv 线程函数:SrsThread::thread_fun

void *SrsThread::thread_fun(void *arg)
{
    SrsThread* obj = (SrsThread*)arg;
    srs_assert(obj);
    
    /* 进入线程循环 */
    obj->thread_cycle();
    
    // for valgrind to detect.
    SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
    if (ctx) {
        ctx->clear_cid();
    }
    
    st_thread_exit(NULL);
    
    return NULL;
}

1.1 SrsThread::thread_cycle

/**
 * Body of the thread: generates a context id, runs the handler's start hook,
 * waits until `can_run` is set, then repeatedly drives the handler's
 * on_before_cycle() / cycle() / on_end_cycle() hooks while `loop` is true,
 * and finally runs the handler's stop hook.
 * A failing hook does not end the thread: control jumps to the `failed`
 * label and the loop retries unless `loop` has been cleared.
 */
void SrsThread::thread_cycle()
{
    int ret = ERROR_SUCCESS;
    
    /* Generate a context id for this thread. */
    _srs_context->generate_id();
    srs_info("thread %s cycle start", _name);
    
    /* Store the generated context id in _cid. According to the original
     * author's note, the parent thread waits for this id and then sets
     * can_run to true (that code is not visible in this function). */
    _cid = _srs_context->get_id();
    
    srs_assert(handler);
    /* Start hook; for the recv thread this is the on_thread_start
     * implemented by SrsReusableThread2. */
    handler->on_thread_start();
    
    // thread is running now.
    really_terminated = false;
    
    /* Sleep in 10ms steps until can_run becomes true or loop is cleared.
     * Sleeping yields to other threads, which gives the parent the chance
     * to pick up _cid and set can_run. */
    // wait for cid to ready, for parent thread to get the cid.
    while (!can_run && loop) {
        st_usleep(10 * 1000);
    }
    
    while (loop) {
        /* Pre-cycle hook; the original author notes it does nothing for the
         * recv thread. */
        if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
            srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", 
                     _name, ret);
            goto failed;
        }
        srs_info("thread %s on before cycle success", _name);
        
        /* Main work; for the recv thread this is the cycle implemented by
         * SrsReusableThread2. Graceful client close and system control
         * errors are not logged as warnings. */
        if ((ret = handler->cycle()) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) 
            {
                srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
            }
            goto failed;
        }
        srs_info("thread %s cycle success", _name);
        
        if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
            srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", 
                     _name, ret);
            goto failed;
        }
        srs_info("thread %s on end cycle success", _name);
        
    /* Common tail of every iteration, reached on success (fall-through) and
     * on any hook failure (goto). */
    failed:
        if (!loop) {
            break;
        }
        
        // to improve performance, donot sleep when interval is zero.
        // @see: https://github.com/ossrs/srs/issues/237
        if (cycle_interval_us != 0) {
            st_usleep(cycle_interval_us);
        }
    }
    
    // readly terminated now.
    really_terminated = true;
    
    handler->on_thread_stop();
    srs_info("thread %s cycle finished", _name);
}

1.2 SrsReusableThread2::on_thread_start

/**
 * Thread-start hook: forwards the notification unchanged to this object's
 * own handler.
 */
void SrsReusableThread2::on_thread_start()
{
    handler->on_thread_start();
}

该函数中接着调用 SrsRecvThread 实现的 on_thread_start 函数。

1.2.1 SrsRecvThread::on_thread_start

/**
 * Thread-start hook of the recv thread: switches the RTMP connection to
 * receive without a timeout, then forwards the notification to the handler.
 */
void SrsRecvThread::on_thread_start()
{
    // the multiple messages writev improve performance large,
    // but the timeout recv will cause 33% sys call performance,
    // to use isolate thread to recv, can improve about 33% performance.
    // @see https://github.com/ossrs/srs/issues/194
    // @see: https://github.com/ossrs/srs/issues/217
    rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT);
    
    handler->on_thread_start();
}

函数先设置 recv 的超时时间为 -1,然后接着调用 SrsPublishRecvThread 实现的 on_thread_start 函数。

1.2.2 SrsPublishRecvThread::on_thread_start

/**
 * Thread-start hook of the publish recv thread.
 * When built with SRS_PERF_MERGED_READ and `mr` is set, sizes the socket
 * buffer from `mr_sleep` and turns merged read on with this object as the
 * callback. Otherwise it does nothing.
 */
void SrsPublishRecvThread::on_thread_start()
{
    // we donot set the auto response to false,
    // for the main thread never send message.

#ifdef SRS_PERF_MERGED_READ
    // Merged read not requested: nothing to set up.
    if (!mr) {
        return;
    }

    // set underlayer buffer size
    set_socket_buffer(mr_sleep);

    // Turn merged read on and register this object as its handler.
    // @see https://github.com/ossrs/srs/issues/241
    rtmp->set_merge_read(true, this);
#endif
}

1.3 SrsReusableThread2::cycle

/**
 * One thread cycle: delegates to the handler's cycle().
 * @return whatever the handler's cycle() returns.
 */
int SrsReusableThread2::cycle()
{
    return handler->cycle();
}

接着调用 SrsRecvThread 实现的 cycle 函数,该函数才开始真正接收客户端推流的数据。

2. 接收推流数据:SrsRecvThread::cycle

/**
 * Receive loop of the recv thread.
 * While the thread is not interrupted: wait until the handler can accept a
 * message, receive one RTMP message and pass it to the handler. On any
 * receive/handle error the thread is interrupted, the handler is notified
 * through on_recv_error() and the error is returned.
 * @return ERROR_SUCCESS when the loop ended because the thread was
 *         interrupted, otherwise the error code of the failed operation.
 */
int SrsRecvThread::cycle()
{
    int ret = ERROR_SUCCESS;
    
    /* Keep receiving client messages until the thread is interrupted. */
    while (!trd->interrupted()) {
        /* Ask the handler whether it can take a message right now; when it
         * cannot, sleep for `timeout` milliseconds and ask again.
         * The handler interface method is can_handle(). */
        if (!handler->can_handle()) {
            st_usleep(timeout * 1000);
            continue;
        }
        
        SrsCommonMessage* msg = NULL;
        
        // recv and handle message
        ret = rtmp->recv_message(&msg);
        if (ret == ERROR_SUCCESS) {
            /* Hand the received message to the handler. */
            ret = handler->handle(msg);
        }
        
        /* On error, interrupt this recv thread and report the error. */
        if (ret != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) 
            {
                srs_error("thread process message failed. ret=%d", ret);
            }
            
            // we use no timeout to recv, should never got any error.
            trd->interrupt();
            
            // notice the handler got a recv error
            handler->on_recv_error(ret);
            
            return ret;
        }
        srs_verbose("thread loop recv message. ret=%d", ret);
    }
    
    return ret;
}

3. 接收推流数据:SrsRtmpServer::recv_message

/**
 * Receive one RTMP message by delegating to the underlying protocol object.
 * @param pmsg output pointer, passed through to the protocol unchanged.
 * @return the protocol's return code.
 */
int SrsRtmpServer::recv_message(SrsCommonMessage** pmsg)
{
    return protocol->recv_message(pmsg);
}

该函数接着调用 SrsProtocol 实现的 recv_message 函数。

3.1 SrsProtocol::recv_message

/**
 * Receive the next non-empty RTMP message.
 * Loops over recv_interlaced_message() until a complete message with a
 * payload is available, runs the on_recv_message() hook on it and returns
 * it through pmsg.
 * @param pmsg set to NULL on entry; on success receives the message.
 * @return ERROR_SUCCESS on success, otherwise the error code of the failed
 *         step (pmsg stays NULL and the message is freed).
 */
int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
    *pmsg = NULL;
    
    int ret = ERROR_SUCCESS;
    
    while (true) {
        SrsCommonMessage* msg = NULL;
        
        if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("recv interlaced message failed. ret=%d", ret);
            }
            srs_freep(msg);
            return ret;
        }
        srs_verbose("entire msg received");
        
        /* No complete message yet (only a partial chunk was read):
         * keep reading. */
        if (!msg) {
            srs_info("got empty message without error.");
            continue;
        }
        
        /* Drop messages that carry no payload. */
        if (msg->size <= 0 || msg->header.payload_length <= 0) {
            srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).",
                msg->header.message_type, msg->header.payload_length,
                msg->header.timestamp, msg->header.stream_id);
            srs_freep(msg);
            continue;
        }
        
        /* Protocol-level hook. Per the original author's note it acknowledges
         * the peer once the received byte count reaches the window size, and
         * decodes window-ack-size(5), set-chunk-size(1) and user-control(4)
         * messages to update the protocol context. */
        if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
            srs_error("hook the received msg failed. ret=%d", ret);
            srs_freep(msg);
            return ret;
        }
        
        srs_verbose("got a msg, cid=%d, type=%d, size=%d, time=%"PRId64, 
            msg->header.perfer_cid, msg->header.message_type, msg->header.payload_length, 
            msg->header.timestamp);
        *pmsg = msg;
        break;
    }
    
    // The function is declared to return int: report success explicitly
    // instead of falling off the end.
    return ret;
}

3.2 SrsProtocol::recv_interlaced_message

/**
 * Read one chunk from the peer and, when that chunk completes a message,
 * return the message.
 * Steps: read the chunk basic header, locate (or create) the chunk stream
 * for its cid, read the message header, then read the payload.
 * @param pmsg receives the completed message; left untouched when the chunk
 *        only delivered part of a message.
 * @return ERROR_SUCCESS both for a completed and for a partial message,
 *         otherwise the error code of the failed read.
 */
int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
    int ret = ERROR_SUCCESS;

    // chunk stream basic header.
    char fmt = 0;
    int cid = 0;
    ret = read_basic_header(fmt, cid);
    if (ret != ERROR_SUCCESS) {
        if (!(ret == ERROR_SOCKET_TIMEOUT || srs_is_client_gracefully_close(ret))) {
            srs_error("read basic header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);

    // the cid must not negative.
    srs_assert(cid >= 0);

    // A message may arrive split over several chunks, so the per-cid state
    // (header and partial payload) is kept between calls.
    // Small cids live in the array cache, the rest in the map.
    // @see https://github.com/ossrs/srs/issues/249
    SrsChunkStream* cs = NULL;
    if (cid >= SRS_PERF_CHUNK_STREAM_CACHE) {
        // chunk stream cache miss, use map.
        if (chunk_streams.find(cid) != chunk_streams.end()) {
            cs = chunk_streams[cid];
            srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                        "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                cs->fmt, cs->cid, (cs->msg? cs->msg->size : 0),
                cs->header.message_type, cs->header.payload_length,
                cs->header.timestamp, cs->header.stream_id);
        } else {
            cs = new SrsChunkStream(cid);
            chunk_streams[cid] = cs;
            // set the perfer cid of chunk,
            // which will copy to the message received.
            cs->header.perfer_cid = cid;
            srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
        }
    } else {
        // chunk stream cache hit: already initialized, use it directly.
        srs_verbose("cs-cache hit, cid=%d", cid);
        cs = cs_cache[cid];
        srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                    "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
            cs->fmt, cs->cid, (cs->msg? cs->msg->size : 0),
            cs->header.message_type, cs->header.payload_length,
            cs->header.timestamp, cs->header.stream_id);
    }

    // chunk stream message header
    ret = read_message_header(cs, fmt);
    if (ret != ERROR_SUCCESS) {
        if (!(ret == ERROR_SOCKET_TIMEOUT || srs_is_client_gracefully_close(ret))) {
            srs_error("read message header failed. ret=%d", ret);
        }
        return ret;
    }
    srs_verbose("read message header success. fmt=%d, ext_time=%d, size=%d, "
            "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
            fmt, cs->extended_timestamp, (cs->msg? cs->msg->size : 0),
            cs->header.message_type, cs->header.payload_length,
            cs->header.timestamp, cs->header.stream_id);

    // read msg payload from chunk stream.
    SrsCommonMessage* completed = NULL;
    ret = read_message_payload(cs, &completed);
    if (ret != ERROR_SUCCESS) {
        if (!(ret == ERROR_SOCKET_TIMEOUT || srs_is_client_gracefully_close(ret))) {
            srs_error("read message payload failed. ret=%d", ret);
        }
        return ret;
    }

    if (completed) {
        // Got the entire RTMP message: hand it to the caller.
        *pmsg = completed;
        srs_info("get entire message success. size=%d, "
                 "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                (completed? completed->size : (cs->msg? cs->msg->size : 0)),
                cs->header.message_type, cs->header.payload_length,
                cs->header.timestamp, cs->header.stream_id);
        return ret;
    }

    // not got an entire RTMP message, try next chunk.
    srs_verbose("get partial message success. size=%d, "
                "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
            (completed? completed->size : (cs->msg? cs->msg->size : 0)),
            cs->header.message_type, cs->header.payload_length,
            cs->header.timestamp, cs->header.stream_id);
    return ret;
}

4. 处理推流消息:SrsPublishRecvThread::handle

/**
 * Handle one message received from the publisher.
 * Syncs the context id when it changed, updates the message and video-frame
 * counters, forwards the message to the connection and frees it afterwards.
 * @param msg the received message; always freed before returning.
 * @return the result of the connection's handle_publish_message().
 */
int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
{
    // when cid changed, change it.
    if (ncid != cid) {
        _srs_context->set_id(ncid);
        cid = ncid;
    }

    // Count every message, and separately every video message.
    ++_nb_msgs;
    if (msg->header.is_video()) {
        ++video_frames;
    }

    // log to show the time of recv thread.
    srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",
        srs_update_system_time_ms(), msg->header.timestamp, msg->size);

    // the rtmp connection will handle this message.
    const int ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);

    // must always free it,
    // the source will copy it if need to use.
    srs_freep(msg);

    return ret;
}

该函数接着主要调用 SrsRtmpConn 实现的 handle_publish_message 函数。

4.1 SrsRtmpConn::handle_publish_message

/**
 * Dispatch one message received while publishing.
 * Non-command messages (video, audio, data) go to process_publish_message().
 * AMF0/AMF3 command messages are decoded: for non-FMLE clients any command
 * ends the publish (ERROR_CONTROL_REPUBLISH); for FMLE only the FMLE start
 * packet does, after the unpublish response is sent; other commands are
 * ignored.
 * @return ERROR_SUCCESS, ERROR_CONTROL_REPUBLISH, or the error code of the
 *         failed step.
 */
int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, 
    bool is_fmle, bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;

    const bool is_command = msg->header.is_amf0_command() || msg->header.is_amf3_command();

    // video, audio, data message
    if (!is_command) {
        ret = process_publish_message(source, msg, vhost_is_edge);
        if (ret != ERROR_SUCCESS) {
            srs_error("fmle process publish message failed. ret=%d", ret);
        }
        return ret;
    }

    // process publish event.
    SrsPacket* pkt = NULL;
    ret = rtmp->decode_message(msg, &pkt);
    if (ret != ERROR_SUCCESS) {
        srs_error("fmle decode unpublish message failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsPacket, pkt);

    // for flash, any packet is republish.
    if (!is_fmle) {
        // flash unpublish.
        // TODO: maybe need to support republish.
        srs_trace("flash flash publish finished.");
        return ERROR_CONTROL_REPUBLISH;
    }

    // for fmle, drop others except the fmle start packet.
    SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
    if (unpublish) {
        ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id);
        if (ret != ERROR_SUCCESS) {
            return ret;
        }
        return ERROR_CONTROL_REPUBLISH;
    }

    srs_trace("fmle ignore AMF0/AMF3 command message.");
    return ret;
}

这里暂先不分析接收到 unpublish 的情况,而对于接收到 video、audio 和 data message 等消息情况下,直接调用 SrsRtmpConn 实现的 process_publish_message 进行处理。

5. 媒体数据的处理:SrsRtmpConn::process_publish_message

/**
 * Route one media/data message of a publishing stream to the source.
 * Edge vhosts proxy the message as-is. Otherwise audio, video and aggregate
 * messages go to the matching source callback, and AMF0/AMF3 data messages
 * are decoded and passed to on_meta_data() when they are onMetaData packets.
 * Any other message type is ignored.
 * @return ERROR_SUCCESS or the error code of the failed step.
 */
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, 
    bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;

    // for edge, directly proxy message to origin.
    if (vhost_is_edge) {
        ret = source->on_edge_proxy_publish(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("edge publish proxy msg failed. ret=%d", ret);
        }
        return ret;
    }

    // process audio packet
    if (msg->header.is_audio()) {
        ret = source->on_audio(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
        }
        return ret;
    }

    // process video packet
    if (msg->header.is_video()) {
        ret = source->on_video(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
        }
        return ret;
    }

    // process aggregate packet
    if (msg->header.is_aggregate()) {
        ret = source->on_aggregate(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
        }
        return ret;
    }

    // Everything that is not an AMF0/AMF3 data message is ignored.
    if (!(msg->header.is_amf0_data() || msg->header.is_amf3_data())) {
        return ret;
    }

    // process onMetadata: decode the data message into a packet first.
    SrsPacket* pkt = NULL;
    ret = rtmp->decode_message(msg, &pkt);
    if (ret != ERROR_SUCCESS) {
        srs_error("decode onMetaData message failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsPacket, pkt);

    SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
    if (!metadata) {
        srs_info("ignore AMF0/AMF3 data message.");
        return ret;
    }

    ret = source->on_meta_data(msg, metadata);
    if (ret != ERROR_SUCCESS) {
        srs_error("source process onMetaData message failed. ret=%d", ret);
        return ret;
    }
    srs_info("process onMetaData message success.");
    return ret;
}

5.1 onMetaData

接收到的第一个媒体数据包通常为 onMetaData,抓包如下图所示。
技术分享图片
接收到 onMetaData 数据包后,需要调用 SrsRtmpServer 实现的 decode_message 函数对该包进行解码。

5.1.1 SrsRtmpServer::decode_message

/**
 * Decode a received message into a packet by delegating to the underlying
 * protocol object.
 * @param msg the message to decode, passed through unchanged.
 * @param ppacket output pointer, passed through unchanged.
 * @return the protocol's return code.
 */
int SrsRtmpServer::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
    return protocol->decode_message(msg, ppacket);
}

该函数接着调用 SrsProtocol 实现的 decode_message 函数。

5.1.2 SrsProtocol::decode_message

int SrsProtocol::decode_message(SrsCommonMessage* msg, SrsPacket** packet)
{
    *ppacket = NULL;
    
    int ret = ERROR_SUCCESS;
    
    srs_assert(msg != NULL);
    srs_assert(msg->payload != NULL);
    srs_assert(msg->size > 0);
    
    SrsStream stream;
    
    // initialize the decode stream for all message,
    // it‘s ok for the initialize if fast and without memory copy.
    if ((ret = stream.initialize(msg->payload, msg->size)) != ERROR_SUCCESS) {
        srs_error("initialize stream failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("decode stream initialized success");
    
    // decode the packet.
    SrsPacket* packet = NULL;
    if ((ret = do_decode_message(msg->header, &stream, &packet)) != ERROR_SUCCESS) {
        srs_freep(packet);
        return ret;
    }
    
    // set to output ppacket only when success.
    *ppacket = packet;
    
    return ret;
}

该函数将消息的负载转化为一个字节流,便于调用 SrsProtocol 实现的 do_decode_message 函数对负载数据进行解码。

5.1.3 SrsProtocol::do_decode_message

/**
 * Build the packet object matching a message and decode it from the stream.
 * NOTE: this is an abridged excerpt; branches not relevant to decoding
 * onMetaData are elided and marked with "elided" comments.
 * @param header header of the message being decoded (selects the packet type).
 * @param stream payload bytes positioned at the start of the message body.
 * @param ppacket receives the newly allocated packet.
 * @return ERROR_SUCCESS on success, otherwise the decode error code.
 */
int SrsProtocol::do_decode_message(SrsMessageHeader& header, 
    SrsStream* stream, SrsPacket** ppacket)
{
    int ret = ERROR_SUCCESS;
    
    SrsPacket* packet = NULL;
    
    // decode specified packet type
    if (header.is_amf0_command() || header.is_amf3_command() || 
        header.is_amf0_data()    || header.is_amf3_data())
    {
        srs_verbose("start to decode AMF0/AMF3 command message.");
        
        // skip 1bytes to decode the amf3 command.
        if (header.is_amf3_command() && stream->require(1)) {
            srs_verbose("skip 1bytes to decode AMF3 command");
            stream->skip(1);
        }
        
        // amf0 command message.
        // need to read the command name.
        std::string command;
        if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
            srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());
        
        // result/error packet
        if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {
            /* elided: not relevant here, only amf0_data decoding is covered */
        }
        
        // reset to zero(amf3 to 1) to restart decode.
        stream->skip(-1 * stream->pos());
        if (header.is_amf3_command()) {
            stream->skip(1);
        }
        
        // decode command object.
        if (command == RTMP_AMF0_COMMAND_CONNECT)
        {
            /* elided */
        }
        /* elided: other command names */
        /* "@setDataFrame" or "onMetaData" */
        else if (command == SRS_CONSTS_RTMP_SET_DATAFRAME || 
                 command == SRS_CONSTS_RTMP_ON_METADATA) {
            srs_info("decode the AMF0/AMF3 data(onMetaData message).");
            *ppacket = packet = new SrsOnMetaDataPacket();
            /* Decoding is done by SrsOnMetaDataPacket::decode. */
            return packet->decode(stream);
        }
        /* elided: other command names */
        
        // default packet to drop message.
        srs_info("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
        *ppacket = packet = new SrsPacket();
        return ret;
    } else if (header.is_user_control_message()) {
        /* elided */
    }
    /* elided: remaining message types */
    
    return ret;
}

对于命令名为 "@setDataFrame" 或 "onMetaData" 的 amf0/amf3 data 消息,会构造一个 SrsOnMetaDataPacket 对象,然后调用该类实现的 decode 函数进行解码;其他未识别的命令则构造一个默认的 SrsPacket。

5.1.4 SrsOnMetaDataPacket 构造函数

/**
 * the stream metadata.
 * FMLE: @setDataFrame
 * others: onMetaData
 *
 * The constructor presets the packet name to the onMetaData constant and
 * creates an empty AMF0 object to hold the metadata properties.
 */
SrsOnMetaDataPacket::SrsOnMetaDataPacket()
{
    name = SRS_CONSTS_RTMP_ON_METADATA;
    /**
     * Metadata of stream.
     * @remark, never be NULL, an AMF0 object instance.
     */
    metadata = SrsAmf0Any::object();
}

若为 FMLE(Flash Media Live Encoder) 软件,则发送的 amf0_data 消息名为 "@setDataFrame",其他的则为 "onMetaData"。

5.1.5 SrsOnMetaDataPacket::decode

/**
 * Decode an onMetaData packet from the stream.
 * Reads the packet name (skipping a leading "@setDataFrame" so that the
 * following name is kept), then reads the metadata value. An AMF0 object
 * replaces `metadata` directly; an ECMA array has each of its properties
 * copied into the existing `metadata` object.
 * @param stream the payload to decode from.
 * @return ERROR_SUCCESS on success, otherwise the AMF0 decode error code.
 */
int SrsOnMetaDataPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = srs_amf0_read_string(stream, name)) != ERROR_SUCCESS) {
        srs_error("decode metadata name failed. ret=%d", ret);
        return ret;
    }
    
    // ignore the @setDataFrame
    if (name == SRS_CONSTS_RTMP_SET_DATAFRAME) {
        /* The name that follows "@setDataFrame" is the one that is kept. */
        if ((ret = srs_amf0_read_string(stream, name)) != ERROR_SUCCESS) {
            srs_error("decode metadata name failed. ret=%d", ret);
            return ret;
        }
    }
    
    srs_verbose("decode metadata name success. name=%s", name.c_str());
    
    // the metadata mayby object or ecma array
    SrsAmf0Any* any = NULL;
    if ((ret = srs_amf0_read_any(stream, &any)) != ERROR_SUCCESS) {
        srs_error("decode metadata metadata failed. ret=%d", ret);
        return ret;
    }
    
    srs_assert(any);
    /* Object: take ownership of the decoded value as the new metadata, so it
     * must not be auto-freed below. */
    if (any->is_object()) {
        srs_freep(metadata);
        metadata = any->to_object();
        srs_info("decode metadata object success");
        return ret;
    }
    
    /* Any other type is only read from, so free it when leaving the scope. */
    SrsAutoFree(SrsAmf0Any, any);
    
    if (any->is_ecma_array()) {
        SrsAmf0EcmaArray* arr = any->to_ecma_array();
    
        // if ecma array, copy to object.
        for (int i = 0; i < arr->count(); i++) {
            /* Copy each decoded property into the metadata object. */
            metadata->set(arr->key_at(i), arr->value_at(i)->copy());
        }
        
        srs_info("decode metadata array success");
    }
    
    return ret;
}

该函数主要是解析 metadata 数据,然后将其保存在 SrsOnMetaDataPacket 类的成员 metadata 中。

5.1.6 srs_amf0_read_any

/**
 * Read one AMF0 value of any type from the stream.
 * First lets SrsAmf0Any::discovery() create an instance for the value found
 * in the stream, then asks that instance to read itself.
 * @param stream the bytes to decode from.
 * @param ppvalue receives the created value; freed again when reading fails.
 * @return ERROR_SUCCESS on success, otherwise the error code of the failed
 *         step.
 */
int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any** ppvalue)
{
    // Create the value object matching what is in the stream.
    int ret = SrsAmf0Any::discovery(stream, ppvalue);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 discovery any elem failed. ret=%d", ret);
        return ret;
    }

    srs_assert(*ppvalue);

    // Let the created object read its own content.
    ret = (*ppvalue)->read(stream);
    if (ret == ERROR_SUCCESS) {
        return ret;
    }

    srs_error("amf0 parse elem failed. ret=%d", ret);
    srs_freep(*ppvalue);
    return ret;
}

5.1.7 SrsAmf0EcmaArray::read

/**
 * Read an AMF0 ECMA array from the stream.
 * Layout consumed: 1-byte marker (must equal RTMP_AMF0_EcmaArray), 4-byte
 * count, then name/value properties until the object-EOF marker or the end
 * of the stream. Each property is stored through set().
 * @param stream the bytes to decode from.
 * @return ERROR_SUCCESS on success, ERROR_RTMP_AMF0_DECODE when the marker
 *         or count cannot be read or the marker is wrong, otherwise the
 *         error code of the failed property read.
 */
int SrsAmf0EcmaArray::read(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    // marker
    if (!stream->require(1)) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 read ecma_array marker failed. ret=%d", ret);
        return ret;
    }

    // The type byte must be the ECMA array marker.
    char marker = stream->read_1bytes();
    if (marker != RTMP_AMF0_EcmaArray) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 check ecma_array marker failed. "
            "marker=%#x, required=%#x, ret=%d", marker, RTMP_AMF0_EcmaArray, ret);
        return ret;
    }
    srs_verbose("amf0 read ecma_array marker success");

    // count
    if (!stream->require(4)) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 read ecma_array count failed. ret=%d", ret);
        return ret;
    }

    // Number of properties announced by the array header.
    int32_t count = stream->read_4bytes();
    srs_verbose("amf0 read ecma_array count success. count=%d", count);

    // value
    this->_count = count;

    while (!stream->empty()) {
        // detect whether is eof: the array ends with the same object-EOF
        // marker as an AMF0 object.
        if (srs_amf0_is_object_eof(stream)) {
            SrsAmf0ObjectEOF end_marker;
            ret = end_marker.read(stream);
            if (ret != ERROR_SUCCESS) {
                srs_error("amf0 ecma_array read eof failed. ret=%d", ret);
                return ret;
            }
            srs_info("amf0 read ecma_array EOF.");
            break;
        }

        // property-name: utf8 string
        std::string prop_name;
        ret = srs_amf0_read_utf8(stream, prop_name);
        if (ret != ERROR_SUCCESS) {
            srs_error("amf0 ecma_array read property name failed. ret=%d", ret);
            return ret;
        }

        // property-value: any
        SrsAmf0Any* prop_value = NULL;
        ret = srs_amf0_read_any(stream, &prop_value);
        if (ret != ERROR_SUCCESS) {
            srs_error("amf0 ecma_array read property_value failed. "
                "name=%s, ret=%d", prop_name.c_str(), ret);
            return ret;
        }

        // add property: stored under its name in this array's properties.
        this->set(prop_name, prop_value);
    }

    return ret;
}

解析 metadata 数据成功后,接下来是调用 SrsSource 实现的 on_meta_data 函数对解析后的 metadata 做进一步的处理。

5.1.8 SrsSource::on_meta_data

/**
 * Process an onMetaData packet of the published stream.
 * Notifies HLS/DVR (when compiled in), strips "duration", logs the main
 * properties, adds the server signature properties, updates `atc`,
 * re-encodes the metadata into a shared message cached in `cache_metadata`,
 * and dispatches that message to all consumers and forwarders.
 * @param msg the original message; its header is copied into the cached one.
 * @param metadata the decoded metadata packet; modified in place.
 * @return ERROR_SUCCESS on success, otherwise the error code of the failed
 *         step.
 */
int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
    int ret = ERROR_SUCCESS;
    
    /* HLS and DVR handling is not covered by this walkthrough. */
#ifdef SRS_AUTO_HLS
    if (metadata && (ret = hls->on_meta_data(metadata->metadata)) != ERROR_SUCCESS) {
        srs_error("hls process onMetaData message failed. ret=%d", ret);
        return ret;
    }
#endif
    
#ifdef SRS_AUTO_DVR
    if (metadata && (ret = dvr->on_meta_data(metadata)) != ERROR_SUCCESS) {
        srs_error("dvr process onMetaData message failed. ret=%d", ret);
        return ret;
    }
#endif

    SrsAmf0Any* prop = NULL;
    
    // when exists the duration, remove it to make ExoPlayer happy.
    if (metadata->metadata->get_property("duration") != NULL) {
        metadata->metadata->remove("duration");
    }
    
    // generate metadata info to print
    std::stringstream ss;
    if ((prop = metadata->metadata->ensure_property_number("width")) != NULL) {
        ss << ", width=" << (int)prop->to_number();
    }
    if ((prop = metadata->metadata->ensure_property_number("height")) != NULL) {
        ss << ", height=" << (int)prop->to_number();
    }
    if ((prop = metadata->metadata->ensure_property_number("videocodecid")) != NULL) {
        ss << ", vcodec=" << (int)prop->to_number();
    }
    if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) {
        ss << ", acodec=" << (int)prop->to_number();
    }
    srs_trace("got metadata%s", ss.str().c_str());
    
    // add server info to metadata.
    metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
    metadata->metadata->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
    metadata->metadata->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));
    
    // version, for example, 1.0.0
    // add version to metadata, please donot remove it, for debug.
    metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
    
    // if allow atc_auto and bravo-atc detected, open atc for vhost.
    atc = _srs_config->get_atc(_req->vhost);
    if (_srs_config->get_atc_auto(_req->vhost)) {
        if ((prop = metadata->metadata->get_property("bravo_atc")) != NULL) {
            if (prop->is_string() && prop->to_str() == "true") {
                atc = true;
            }
        }
    }
    
    // encode the metadata to payload
    int size = 0;
    char* payload = NULL;
    /* encode() serializes the (now modified) metadata packet into a newly
     * allocated payload buffer. */
    if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
        srs_error("encode metadata error. ret=%d", ret);
        srs_freep(payload);
        return ret;
    }
    srs_verbose("encode metadata success.");
    
    if (size <= 0) {
        srs_warn("ignore the invalid metadata. size=%d", size);
        return ret;
    }
    
    // when already got metadata, drop when reduce sequence header.
    bool drop_for_reduce = false;
    if (cache_metadata && _srs_config->get_reduce_sequence_header(_req->vhost)) {
        drop_for_reduce = true;
        srs_warn("drop for reduce sh metadata, size=%d", msg->size);
    }
    
    // create a shared ptr message.
    srs_freep(cache_metadata);
    cache_metadata = new SrsSharedPtrMessage();
    
    // dump message to shared ptr message.
    // the payload/size managed by cache_metadata, user should not free it.
    if ((ret = cache_metadata->create(&msg->header, payload, size)) != ERROR_SUCCESS) {
        srs_error("initialize the cache metadata failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("initialize shared ptr metadata success.");
    
    // copy to all consumer
    if (!drop_for_reduce) {
        /* Deliver the metadata to every consumer attached to this source. */
        std::vector<SrsConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsConsumer* consumer = *it;
            if ((ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) 
                != ERROR_SUCCESS) {
                srs_error("dispatch the metadata failed. ret=%d", ret);
                return ret;
            }
        }
    }
    
    // copy to all forwarders
    if (true) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) {
                srs_error("forwarder process onMetaData message failed. ret=%d", ret);
                return ret;
            }
        }
    }
    
    return ret;
}

5.1.9 SrsPacket::encode

/*
 * the subpacket can override this encode,
 * for example, video and audio will directly set the payload without memory copy,
 * other packet which need to serialize/encode to bytes by override the
 * get_size and encode_packet.
 *
 * Allocates get_size() bytes, lets encode_packet() serialize the packet into
 * them and returns the buffer to the caller.
 * @param psize receives the encoded size; written only on success.
 * @param ppayload receives the allocated buffer (NULL when the size is not
 *        positive); written only on success. The caller takes over the buffer.
 * @return ERROR_SUCCESS on success, otherwise the error code of the failed
 *         step (the buffer is freed).
 */
int SrsPacket::encode(int& psize, char*& ppayload) 
{
    int ret = ERROR_SUCCESS;
    
    int size = get_size();
    char* payload = NULL;
    
    SrsStream stream;
    
    if (size > 0) {
        payload = new char[size];
        
        if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) {
            srs_error("initialize the stream failed. ret=%d", ret);
            srs_freepa(payload);
            return ret;
        }
    }
    
    /* Virtual call: for metadata this is SrsOnMetaDataPacket::encode_packet. */
    if ((ret = encode_packet(&stream)) != ERROR_SUCCESS) {
        srs_error("encode the packet failed. ret=%d", ret);
        srs_freepa(payload);
        return ret;
    }
    
    psize = size;
    ppayload = payload;
    srs_verbose("encode the packet success. size=%d", size);
    
    return ret;
}

5.1.10 SrsOnMetaDataPacket::encode_packet

/**
 * Serialize this onMetaData packet into the stream: first the packet name
 * as an AMF0 string, then the metadata value.
 * @param stream the destination stream.
 * @return ERROR_SUCCESS on success, otherwise the error code of the failed
 *         write.
 */
int SrsOnMetaDataPacket::encode_packet(SrsStream* stream)
{
    int ret = srs_amf0_write_string(stream, name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode name success.");

    // The metadata value writes itself into the stream.
    ret = metadata->write(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode metadata failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode metadata success.");

    srs_info("encode onMetaData packet success.");
    return ret;
}

5.1.11 SrsAmf0Object::write

/**
 * Write this AMF0 object to the stream: the one-byte object marker,
 * every property as name then value, and finally the eof.
 * @param stream the stream to write to.
 * @return ERROR_SUCCESS; ERROR_RTMP_AMF0_ENCODE when the stream has no
 *       room for the marker; otherwise the error code of the failed write.
 */
int SrsAmf0Object::write(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    // marker: one byte holding the amf0 type.
    if (!stream->require(1)) {
        ret = ERROR_RTMP_AMF0_ENCODE;
        srs_error("amf0 write object marker failed. ret=%d", ret);
        return ret;
    }
    stream->write_1bytes(RTMP_AMF0_Object);
    srs_verbose("amf0 write object marker success");
    
    // properties: each one is the utf8 name followed by the value.
    for (int idx = 0; idx < properties->count(); ++idx) {
        std::string key = this->key_at(idx);
        SrsAmf0Any* value = this->value_at(idx);
        
        ret = srs_amf0_write_utf8(stream, key);
        if (ret != ERROR_SUCCESS) {
            srs_error("write object property name failed. ret=%d", ret);
            return ret;
        }
        
        ret = srs_amf0_write_any(stream, value);
        if (ret != ERROR_SUCCESS) {
            srs_error("write object property value failed. ret=%d", ret);
            return ret;
        }
        
        srs_verbose("write amf0 property success. name=%s", key.c_str());
    }
    
    // the end-of-object flag (0x00 0x00 0x09, per the original note).
    ret = eof->write(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("write object eof failed. ret=%d", ret);
        return ret;
    }
    
    srs_verbose("write amf0 object success.");
    
    return ret;
}

5.1.12 SrsSharedPtrMessage 构造函数

/**
 * The shared ptr message,
 * for audio/video/data messages that need less memory copy,
 * and only for output.
 *
 * Create the first object by this constructor and create(),
 * use copy() when a reference counted message is needed.
 */
SrsSharedPtrMessage::SrsSharedPtrMessage()
    : ptr(NULL) // no shared payload until create() attaches one.
{
}

5.1.13 SrsSharedPtrMessage::create

/**
 * Create the shared ptr message from the header and payload.
 * @remark user should never free the payload.
 * @param pheader the header to copy to the message. NULL to ignore.
 * @param payload the payload bytes, attached directly without copy.
 * @param size the size of payload in bytes.
 * @return ERROR_SUCCESS; ERROR_SYSTEM_ASSERT_FAILED when a payload
 *       was already set on this message.
 */
int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int size)
{
    int ret = ERROR_SUCCESS;
    
    // a message can only be created once.
    if (ptr != NULL) {
        ret = ERROR_SYSTEM_ASSERT_FAILED;
        srs_error("should not set the payload twice. ret=%d", ret);
        srs_assert(false);
        
        return ret;
    }
    
    // the payload object which the message refers to.
    ptr = new SrsSharedPtrPayload();
    
    // copy the fields of the parsed header (e.g. the one of the metadata
    // message), when the caller provides one.
    if (pheader != NULL) {
        // the message type, for example amf0_data: 0x12
        ptr->header.message_type = pheader->message_type;
        // the payload length is the size passed in.
        ptr->header.payload_length = size;
        ptr->header.perfer_cid = pheader->perfer_cid;
        
        this->timestamp = pheader->timestamp;
        this->stream_id = pheader->stream_id;
    }
    
    // direct attach the data, without copy.
    ptr->payload = payload;
    ptr->size = size;
    
    // mirror the payload and size on the message itself, so the message
    // can access them.
    // @remark not all message payload can be decoded to packet, for
    //       example, video/audio packet use raw bytes.
    // @remark size is the current parsed size, size <= header.payload_length,
    //       for the payload maybe sent in multiple chunks.
    this->payload = ptr->payload;
    this->size = ptr->size;
    
    return ret;
}

5.1.14 SrsSharedPtrPayload 构造函数

/**
 * Construct an empty shared payload:
 * no payload bytes, zero size and zero reference count.
 */
SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload()
    : payload(NULL)     // actual shared payload.
    , size(0)           // size of payload.
    , shared_count(0)   // the reference count.
{
}

5.1.15 通知消费者:SrsConsumer::enqueue

/**
 * Enqueue a shared ptr message for this consumer.
 * @param shared_msg directly ptr, it is copied here; the copy is what gets
 *       queued.
 * @param atc whether atc, donot use jitter correct if true.
 * @param ag the algorithm of time jitter.
 * @return ERROR_SUCCESS, or the error code of the jitter correction
 *       or of the queue.
 */
int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, 
    SrsRtmpJitterAlgorithm ag)
{
    int ret = ERROR_SUCCESS;
    
    // work on a copy of the message.
    SrsSharedPtrMessage* msg = shared_msg->copy();
    
    // when atc is false, correct the timestamp by the jitter.
    if (!atc) {
        if ((ret = jitter->correct(msg, ag)) != ERROR_SUCCESS) {
            srs_freep(msg);
            return ret;
        }
    }
    
    // NOTE(review): msg is not freed when the queue fails; presumably the
    // queue takes over the message whatever the return code - confirm
    // against SrsMessageQueue::enqueue.
    if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) {
        return ret;
    }
    
#ifdef SRS_PERF_QUEUE_COND_WAIT
    srs_verbose("enqueue msg, time=%"PRId64", size=%d, "
                "duration=%d, waiting=%d, min_msg=%d", 
        msg->timestamp, msg->size, queue->duration(), mw_waiting, mw_min_msgs);
        
    // fire the mw when msgs is enough.
    // only needed when a consumer (the play client) is waiting for
    // messages to get ready.
    if (mw_waiting) {
        int duration_ms = queue->duration();
        bool match_min_msgs = queue->size() > mw_min_msgs;
        
        // For ATC, maybe the SH timestamp bigger than A/V packet,
        // when encoder republish or overflow.
        // @see https://github.com/ossrs/srs/pull/749
        if (atc && duration_ms < 0) {
            st_cond_signal(mw_wait);
            mw_waiting = false;
            return ret;
        }
        
        // when duration ok, signal to flush.
        if (match_min_msgs && duration_ms > mw_duration) {
            st_cond_signal(mw_wait);
            // clear the same flag tested above, so the waiter is
            // signaled only once.
            mw_waiting = false;
            return ret;
        }
    }
#endif

    return ret;
}

5.1.16 SrsRtmpJitter::correct

/**
 * Detect the time jitter and correct the timestamp of msg in place.
 * @param msg the message whose timestamp may be rewritten.
 * @param ag the jitter algorithm:
 *       OFF, leave the timestamp untouched;
 *       ZERO, make the timestamps start at zero, but donot ensure
 *           monotonically increasing;
 *       FULL, the full jitter detect/correct algorithm below;
 *       any other value is ignored.
 * @return always ERROR_SUCCESS in the code shown here.
 */
int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag)
{
    int ret = ERROR_SUCCESS;
    
    // for performance issue
    if (ag != SrsRtmpJitterAlgorithmFULL) {
        // all jitter correct features is disabled, ignore.
        if (ag == SrsRtmpJitterAlgorithmOFF) {
            return ret;
        }
        
        // start at zero, but donot ensure monotonically increasing.
        if (ag == SrsRtmpJitterAlgorithmZERO) {
            // for the first time, last_pkt_correct_time is -1,
            // so the first timestamp becomes the base.
            if (last_pkt_correct_time == -1) {
                last_pkt_correct_time = msg->timestamp;
            }
            msg->timestamp -= last_pkt_correct_time;
            return ret;
        }
        
        // other algorithm, ignore.
        return ret;
    }
    
    // full jitter algorithm, do jitter correct.
    // set to 0 for metadata.
    if (!msg->is_av()) {
        msg->timestamp = 0;
        return ret;
    }
    
    /**
     * we use a very simple time jitter detect/correct algorithm:
     * 1. delta: ensure the delta is positive and valid,
     *     we set the delta to DEFAULT_FRAME_TIME_MS,
     *     if the delta of time is less than CONST_MAX_JITTER_MS_NEG
     *     or greater than CONST_MAX_JITTER_MS.
     * 2. last_pkt_time: specifies the original packet time,
     *     is used to detect next jitter.
     * 3. last_pkt_correct_time: simply add the positive delta,
     *     and enforce the time monotonically.
     */
    int64_t time = msg->timestamp;
    int64_t delta = time - last_pkt_time;
    
    // if jitter detected, reset the delta.
    if (delta < CONST_MAX_JITTER_MS_NEG || delta > CONST_MAX_JITTER_MS) {
        // use the default frame time (10ms, per the original comment)
        // to notice the problem of stream.
        // @see https://github.com/ossrs/srs/issues/425
        delta = DEFAULT_FRAME_TIME_MS;
        
        srs_info("jitter detected, last_pts=%"PRId64", pts=%"PRId64", "
                 "diff=%"PRId64", last_time=%"PRId64", time=%"PRId64", diff=%"PRId64"",
            last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, 
            last_pkt_correct_time + delta, delta);
    } else {
        srs_verbose("timestamp no jitter. time=%"PRId64", "
                    "last_pkt=%"PRId64", correct_to=%"PRId64"", 
            time, last_pkt_time, last_pkt_correct_time + delta);
    }
    
    // never let the corrected time go below zero.
    last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);
    
    msg->timestamp = last_pkt_correct_time;
    // remember the original (uncorrected) time for the next detection.
    last_pkt_time = time;
    
    return ret;
}

若传入的第二个参数为 SrsRtmpJitterAlgorithmOFF,则禁止所有的 jitter 校正,构造 SrsSource 的时候默认初始化为 SrsRtmpJitterAlgorithmOFF。

5.1.17 SrsMessageQueue::enqueue

/**
 * Enqueue the message, the timestamp always monotonically.
 * @param msg the msg to enqueue, user never free it whatever the return code.
 * @param is_overflow set to true when the queue overflowed and was shrunk;
 *       it is never reset to false here. NULL to ignore.
 * @return always ERROR_SUCCESS in the code shown here.
 */
int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
{
    int ret = ERROR_SUCCESS;
    
    // only audio/video messages move the time window of the queue.
    if (msg->is_av()) {
        if (av_start_time == -1) {
            av_start_time = msg->timestamp;
        }
        
        av_end_time = msg->timestamp;
    }
    
    // when the SRS_PERF_QUEUE_FAST_VECTOR macro is defined, this is the
    // push_back implemented by SrsFastVector (per the original note).
    msgs.push_back(msg);
    
    // shrink while the queued duration exceeds the limit.
    while (av_end_time - av_start_time > queue_size_ms) {
        // notice the caller queue already overflow and shrinked.
        if (is_overflow) {
            *is_overflow = true;
        }
        
        // on overflow, remove a gop from the front.
        shrink();
    }
    
    return ret;
}

5.1.18 SrsFastVector::push_back

/**
 * Append msg to the vector, doubling the capacity when it is full.
 * The message pointer is stored directly, nothing is copied.
 * @param msg the message to append.
 */
void SrsFastVector::push_back(SrsSharedPtrMessage* msg)
{
    // increase vector.
    if (count >= nb_msgs) {
        int size = nb_msgs * 2;
        SrsSharedPtrMessage** buf = new SrsSharedPtrMessage*[size];
        for (int i = 0; i < nb_msgs; i++) {
            buf[i] = msgs[i];
        }
        srs_warn("fast vector incrase %d=>%d", nb_msgs, size);
        
        // use new array.
        // the old array is released with the array form, because it is
        // allocated by new[] above; the messages it pointed to now
        // live in buf.
        srs_freepa(msgs);
        msgs = buf;
        nb_msgs = size;
    }
    
    // msgs is an array whose elements are SrsSharedPtrMessage*.
    msgs[count++] = msg;
} 

该函数是直接将 msg 放入到 SrsFastVector 类的成员 msgs 数组中(若该数组大小足够的话)。

5.1.19 SrsMessageQueue::shrink

/*
 * remove a gop from the front.
 * if no iframe found, clear it.
 */
void SrsMessageQueue::shrink()
{
    SrsSharedPtrMessage* video_sh = NULL;
    SrsSharedPtrMessage* audio_sh = NULL;
    int msgs_size = (int)msgs.size();
    
    // remove all msg
    // ignore the sequence header
    for (int i = 0; i < (int)msgs.size(); i++) {
        SrsSharedPtrMessage* msg = msgs.at(i);
        
        if (msg->is_video() && 
            SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size)) 
        {
            srs_freep(video_sh);
            video_sh = msg;
            continue;
        }
        else if (msg->is_audio() && 
                 SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size)) 
        {
            srs_freep(audio_sh);
            audio_sh = msg;
            continue;
        }
        
        srs_freep(msg);
    }
    msgs.clear();
    
    // update av_start_time
    av_start_time = av_end_time;
    // push_back sequence header and update timestamp
    if (video_sh) {
        video_sh->timestamp = av_end_time;
        msgs.push_back(video_sh);
    }
    if (audio_sh) {
        audio_sh->timestamp = av_end_time;
        msgs.push_back(audio_sh);
    }
    
    if (_ignore_shrink) {
        srs_info("shrink the cache queue, size=%d, removed=%d, max=%.2f", 
            (int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0);
    } else {
        srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f", 
            (int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0);
    }
}

5.1.20 SrsMessageQueue::duration

/**
 * Get the duration of queue.
 * @return av_end_time minus av_start_time, converted to int.
 */
int SrsMessageQueue::duration()
{
    int span = static_cast<int>(av_end_time - av_start_time);
    return span;
}

计算当前消息队列中所有消息的总 duration。

5.1.21 st_cond_signal

/*
 * Signal the condition variable: forwards to _st_cond_signal()
 * with the broadcast flag off, and returns its result.
 */
int st_cond_signal(_st_cond_t *cvar)
{
  const int broadcast = 0;

  return _st_cond_signal(cvar, broadcast);
}

5.1.22 _st_cond_signal

/*
 * Walk the wait queue of cvar and make the threads which are in the
 * _ST_ST_COND_WAIT state runnable.
 * When broadcast is zero, stop after the first such thread;
 * otherwise handle every one found on the queue.
 * Always returns 0.
 */
static int _st_cond_signal(_st_cond_t *cvar, int broadcast)
{
    _st_thread_t *thread;
    _st_clist_t *q;
    
    for (q = cvar->wait_q.next; q != &cvar->wait_q; q = q->next) {
        thread = _ST_THREAD_WAITQ_PTR(q);
        if (thread->state == _ST_ST_COND_WAIT) {
            /* take the thread off the sleep queue when it is flagged
             * as being on it */
            if (thread->flags & _ST_ST_ON_SLEEPQ) 
                _ST_DEL_SLEEPQ(thread);
            
            /* Make thread runnable */
            thread->state = _ST_ST_RUNNABLE;
            _ST_ADD_RUNQ(thread);
            if (!broadcast)
                break;
        }
    }
    
    return 0;
}

5.2 Audio

假设接收到的第一个音频包如下图。
技术分享图片

SRS之接收推流线程:recv

标签:share   nat   _id   OLE   dsr   memory   please   int   before   

原文地址:https://www.cnblogs.com/jimodetiantang/p/9098003.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!