RTP transmission begins with the function MediaSink::startPlaying(). That makes sense when you think about it: it is the sink that asks the source for data, so startPlaying() is called on the sink (heh — essentially DirectShow's pull model).
Let's look at this function:
Boolean MediaSink::startPlaying(MediaSource& source,
    afterPlayingFunc* afterFunc, void* afterClientData)
{
  // Make sure we're not already being played:
  if (fSource != NULL) {
    envir().setResultMsg("This sink is already being played");
    return False;
  }

  // Make sure our source is compatible:
  if (!sourceIsCompatibleWithUs(source)) {
    envir().setResultMsg(
        "MediaSink::startPlaying(): source is not compatible!");
    return False;
  }

  fSource = (FramedSource*) &source;

  // Remember the callback to invoke when the source is exhausted:
  fAfterFunc = afterFunc;
  fAfterClientData = afterClientData;
  return continuePlaying();
}
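As an aside, here is a minimal sketch of the calling side, modeled on live555's testProgs (the names play, afterPlaying, videoSink and videoSource are illustrative, not taken from the code above) — a single call on the sink kicks off the whole pull loop described in this article:

#include "liveMedia.hh"

void afterPlaying(void* clientData) {
  // Invoked through the afterFunc registered below, once the source
  // has no more frames to deliver:
  RTPSink* sink = (RTPSink*) clientData;
  sink->stopPlaying();
}

void play(RTPSink* videoSink, FramedSource* videoSource) {
  // The sink drives everything: this one call starts the pull loop.
  videoSink->startPlaying(*videoSource, afterPlaying, videoSink);
}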
For further encapsulation (so that derived classes can write less code), a virtual function continuePlaying() was introduced. Let's take a look:
Boolean MultiFramedRTPSink::continuePlaying() {
  // Send the first packet.
  // (This will also schedule any future sends.)
  buildAndSendPacket(True);
  return True;
}
MultiFramedRTPSink is a frame-oriented class: it requires getting one complete frame of data from the source on each read, hence the name. As you can see, continuePlaying() simply delegates everything to buildAndSendPacket(). Let's look at buildAndSendPacket():
void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket)
{
  fIsFirstPacket = isFirstPacket;

  // Set up the RTP header:
  unsigned rtpHdr = 0x80000000; // RTP version 2
  rtpHdr |= (fRTPPayloadType << 16);
  rtpHdr |= fSeqNo; // sequence number
  fOutBuf->enqueueWord(rtpHdr);

  // Note where the RTP timestamp will go.
  // (We can't fill this in until we start packing payload frames.)
  fTimestampPosition = fOutBuf->curPacketSize();
  fOutBuf->skipBytes(4); // leave a 4-byte hole for the timestamp

  fOutBuf->enqueueWord(SSRC());

  // Allow for a special, payload-format-specific header following
  // the RTP header:
  fSpecialHeaderPosition = fOutBuf->curPacketSize();
  fSpecialHeaderSize = specialHeaderSize();
  fOutBuf->skipBytes(fSpecialHeaderSize);

  // Begin packing as many (complete) frames into the packet as we can:
  fTotalFrameSpecificHeaderSizes = 0;
  fNoFramesLeft = False;
  fNumFramesUsedSoFar = 0;

  packFrame();
}
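As a quick aside, the single 32-bit word assembled above is the first word of the RTP header defined in RFC 3550. Here is a stand-alone sanity check of that bit layout (illustrative code, not part of live555):

#include <cassert>
#include <cstdio>

int main() {
  unsigned fRTPPayloadType = 96, fSeqNo = 1234;
  unsigned rtpHdr = 0x80000000;       // version = 2 in the top two bits
  rtpHdr |= (fRTPPayloadType << 16);  // 7-bit payload type (bits 22..16)
  rtpHdr |= fSeqNo;                   // 16-bit sequence number (bits 15..0)

  assert((rtpHdr >> 30) == 2);            // version
  assert(((rtpHdr >> 16) & 0x7F) == 96);  // payload type
  assert((rtpHdr & 0xFFFF) == 1234);      // sequence number
  printf("header word = 0x%08X\n", rtpHdr); // -> 0x806004D2
}

Note that the marker (M) bit, bit 23, is left as 0 here; payload-format subclasses can set it later via the special-header hooks.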
Continuing with packFrame():
void MultiFramedRTPSink::packFrame()
{
  // First, check whether we have overflow data left over from the
  // previous packet:
  if (fOutBuf->haveOverflowData()) {
    // Use this frame before reading a new one from the source:
    unsigned frameSize = fOutBuf->overflowDataSize();
    struct timeval presentationTime = fOutBuf->overflowPresentationTime();
    unsigned durationInMicroseconds = fOutBuf->overflowDurationInMicroseconds();
    fOutBuf->useOverflowData();

    // No need to go back to the source; "deliver" the frame directly:
    afterGettingFrame1(frameSize, 0, presentationTime, durationInMicroseconds);
  } else {
    // Normal case: we need to read a new frame from the source.
    if (fSource == NULL)
      return;

    // Leave room for a frame-specific header (empty for most
    // payload formats):
    fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
    fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
    fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
    fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

    // Ask the source for the next frame; afterGettingFrame() will be
    // called back once it has been read into our buffer:
    fSource->getNextFrame(fOutBuf->curPtr(),
        fOutBuf->totalBytesAvailable(),
        afterGettingFrame,
        this,
        ourHandleClosure,
        this);
  }
}
You can imagine what happens next: the source reads one frame of data from a file (or some device) and hands it back to the sink — not through a function return, of course, but by invoking the afterGettingFrame callback.
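For the curious, here is a minimal sketch (hypothetical, not from live555 itself) of what the source side of that handshake typically looks like. getNextFrame() stores the destination pointer, the available space, and the callbacks in protected members, then calls the virtual doGetNextFrame():

#include "FramedSource.hh"
#include <cstring>
#include <sys/time.h>

class DummySource: public FramedSource {
public:
  DummySource(UsageEnvironment& env): FramedSource(env) {}
protected:
  virtual void doGetNextFrame() {
    // Pretend we read one frame from a file or capture device:
    static unsigned char const frame[] = "fake frame payload";
    fFrameSize = sizeof frame;
    if (fFrameSize > fMaxSize) { // report truncation rather than overrun
      fNumTruncatedBytes = fFrameSize - fMaxSize;
      fFrameSize = fMaxSize;
    }
    memcpy(fTo, frame, fFrameSize); // fTo points into the sink's fOutBuf
    gettimeofday(&fPresentationTime, NULL);
    fDurationInMicroseconds = 40000; // e.g. one frame of 25fps video
    // "Return" the frame: this invokes the afterGettingFunc that the
    // sink registered, i.e. MultiFramedRTPSink::afterGettingFrame().
    FramedSource::afterGetting(this);
  }
};

With that in mind, let's look at afterGettingFrame():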
void MultiFramedRTPSink::afterGettingFrame(void* clientData,
    unsigned numBytesRead, unsigned numTruncatedBytes,
    struct timeval presentationTime, unsigned durationInMicroseconds)
{
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*) clientData;
  sink->afterGettingFrame1(numBytesRead, numTruncatedBytes, presentationTime,
      durationInMicroseconds);
}
Nothing much to see here — it merely transitions to a member-function call, so afterGettingFrame1() is where the real work is:
void MultiFramedRTPSink::afterGettingFrame1(
    unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds)
{
  if (fIsFirstPacket) {
    // Record the fact that we're starting to play now:
    gettimeofday(&fNextSendTime, NULL);
  }

  // If the source had to truncate the frame because our buffer was
  // too small, just print a warning:
  if (numTruncatedBytes > 0) {
    unsigned const bufferSize = fOutBuf->totalBytesAvailable();
    envir()
        << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
        << bufferSize
        << "). "
        << numTruncatedBytes
        << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
        << OutPacketBuffer::maxSize + numTruncatedBytes
        << ", *before* creating this \"RTPSink\". (Current value is "
        << OutPacketBuffer::maxSize << ".)\n";
  }

  unsigned curFragmentationOffset = fCurFragmentationOffset;
  unsigned numFrameBytesToUse = frameSize;
  unsigned overflowBytes = 0;

  // If we have already packed one or more frames into this packet,
  // check whether this new frame is eligible to be packed after them.
  // (This is independent of whether the packet has enough room for
  // this new frame; that check comes later.)
  if (fNumFramesUsedSoFar > 0) {
    if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize))
    {
      // Save away this frame for next time:
      numFrameBytesToUse = 0;
      fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
          presentationTime, durationInMicroseconds);
    }
  }

  fPreviousFrameEndedFragmentation = False;

  if (numFrameBytesToUse > 0) {
    // Check whether this frame overflows the packet:
    if (fOutBuf->wouldOverflow(frameSize)) {
      // Don't use the whole frame now; save the remainder as overflow
      // data and send the packet we already have. If the frame is too
      // big to fit in a packet by itself, we must fragment it and use
      // the first part of it now:
      if (isTooBigForAPacket(frameSize)
          && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
        // Fragment the frame, using part of it in this packet:
        overflowBytes = computeOverflowForNewFrame(frameSize);
        numFrameBytesToUse -= overflowBytes;
        fCurFragmentationOffset += numFrameBytesToUse;
      } else {
        // We don't use any of this frame now:
        overflowBytes = frameSize;
        numFrameBytesToUse = 0;
      }
      fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
          overflowBytes, presentationTime, durationInMicroseconds);
    } else if (fCurFragmentationOffset > 0) {
      // This is the last fragment of a frame that was fragmented over
      // more than one packet:
      fCurFragmentationOffset = 0;
      fPreviousFrameEndedFragmentation = True;
    }
  }

  if (numFrameBytesToUse == 0 && frameSize > 0) {
    // Send our packet now, because we have filled it up:
    sendPacketIfNecessary();
  } else {
    // Use this frame in our outgoing packet:
    unsigned char* frameStart = fOutBuf->curPtr();
    fOutBuf->increment(numFrameBytesToUse);
    // do this now, in case "doSpecialFrameHandling()" calls
    // "setFramePadding()" to append padding bytes

    // Here's where any payload-format-specific processing gets done:
    doSpecialFrameHandling(curFragmentationOffset, frameStart,
        numFrameBytesToUse, presentationTime, overflowBytes);

    ++fNumFramesUsedSoFar;

    // Update the time at which the next packet should be sent, based
    // on the duration of the frame that we just packed into it.
    // However, if this frame has overflow data remaining, don't count
    // its duration yet:
    if (overflowBytes == 0) {
      fNextSendTime.tv_usec += durationInMicroseconds;
      fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
      fNextSendTime.tv_usec %= 1000000;
    }

    // Send our packet now if (i) it's already at our preferred size,
    // or (ii) another frame of the same size would overflow it, or
    // (iii) it contains the last fragment of a fragmented frame and we
    // don't allow anything else to follow it, or (iv) only one frame
    // per packet is allowed:
    if (fOutBuf->isPreferredSize()
        || fOutBuf->wouldOverflow(numFrameBytesToUse)
        || (fPreviousFrameEndedFragmentation
            && !allowOtherFramesAfterLastFragment())
        || !frameCanAppearAfterPacketStart(
            fOutBuf->curPtr() - frameSize, frameSize)) {
      // The packet is ready to be sent now:
      sendPacketIfNecessary();
    } else {
      // There's room for more frames; try getting another:
      packFrame();
    }
  }
}
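Before moving on, a toy trace of the fragmentation bookkeeping may help. The 1448-byte payload budget and the simplified split below are assumptions for illustration; live555's actual split lives in computeOverflowForNewFrame():

#include <cstdio>

int main() {
  unsigned frameSize = 4000, budget = 1448;
  unsigned fCurFragmentationOffset = 0;
  int pkt = 1;
  while (frameSize > 0) {
    // In effect, each round splits the remaining frame into a part
    // used now and an overflow part saved for the next packet:
    unsigned numFrameBytesToUse = frameSize < budget ? frameSize : budget;
    unsigned overflowBytes = frameSize - numFrameBytesToUse;
    printf("packet %d: fragment offset %u, %u bytes used, %u overflow\n",
           pkt++, fCurFragmentationOffset, numFrameBytesToUse, overflowBytes);
    fCurFragmentationOffset += numFrameBytesToUse; // as in afterGettingFrame1()
    frameSize = overflowBytes; // overflow becomes the next packet's input
  }
  // -> packet 1: offset 0, 1448 used; packet 2: offset 1448, 1448 used;
  //    packet 3: offset 2896, 1104 used, 0 overflow. On that last round
  //    the real code resets fCurFragmentationOffset to 0 and sets
  //    fPreviousFrameEndedFragmentation.
}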
Now let's look at the function that actually sends the data:
void MultiFramedRTPSink::sendPacketIfNecessary()
{
  // Send the packet if it actually contains any frames:
  if (fNumFramesUsedSoFar > 0) {
#ifdef TEST_LOSS
    if ((our_random() % 10) != 0) // simulate 10% packet loss
#endif
      if (!fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize())) {
        // if a send-error handler has been specified, call it:
        if (fOnSendErrorFunc != NULL)
          (*fOnSendErrorFunc)(fOnSendErrorData);
      }
    ++fPacketCount;
    fTotalOctetCount += fOutBuf->curPacketSize();
    fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize
        - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;

    // Advance the RTP sequence number for next time:
    ++fSeqNo;
  }

  // Reset the buffer for the next packet:
  if (fOutBuf->haveOverflowData()
      && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) {
    // Efficiency hack: reset the packet start pointer to just in
    // front of the overflow data (allowing for the RTP header and
    // special headers), so that we probably won't have to memmove()
    // the overflow data into place when building the next packet:
    unsigned newPacketStart = fOutBuf->curPacketSize()
        - (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
    fOutBuf->adjustPacketStart(newPacketStart);
  } else {
    // Normal case: reset the packet start pointer back to the start:
    fOutBuf->resetPacketStart();
  }
  fOutBuf->resetOffset();
  fNumFramesUsedSoFar = 0;

  if (fNoFramesLeft) {
    // We're done; notify the "after playing" handler via
    // onSourceClosure():
    onSourceClosure(this);
  } else {
    // We have more frames to send. Figure out when the next frame is
    // due to start playing, and make sure we wait that long before
    // sending the next packet:
    struct timeval timeNow;
    gettimeofday(&timeNow, NULL);
    int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
    int64_t uSecondsToGo = secsDiff * 1000000
        + (fNextSendTime.tv_usec - timeNow.tv_usec);
    if (uSecondsToGo < 0 || secsDiff < 0) {
      // sanity check: the time to delay must be non-negative
      uSecondsToGo = 0;
    }

    // Delay this amount of time, then call sendNext():
    nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
        (TaskFunc*) sendNext, this);
  }
}
As you can see, a delayed task is used to pace packet transmission: the next build-and-send round is scheduled rather than run immediately.
sendNext() in turn calls buildAndSendPacket() — heh, and round the loop we go.
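For completeness, sendNext() is just a tiny static trampoline (from MultiFramedRTPSink.cpp):

void MultiFramedRTPSink::sendNext(void* firstArg) {
  MultiFramedRTPSink* sink = (MultiFramedRTPSink*) firstArg;
  sink->buildAndSendPacket(False); // not the first packet this time
}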
To summarize the call chain: startPlaying() -> continuePlaying() -> buildAndSendPacket() -> packFrame() -> fSource->getNextFrame() -> afterGettingFrame() -> afterGettingFrame1() -> sendPacketIfNecessary() -> scheduleDelayedTask(sendNext) -> buildAndSendPacket() -> ..., and so on until the source runs out of data.
Finally, a note on how the packet buffer is used:
In MultiFramedRTPSink, frame data and the outgoing packet share a single buffer; a few extra member variables mark which part of the buffer belongs to the packet being built and which part holds frame data beyond it (the data beyond the packet is called overflow data). Sometimes the overflow data is memmove()d to the packet's start position; sometimes the packet start position is simply adjusted to where the overflow data begins. So how is the size of this buffer determined? It is computed from the caller-specified maximum packet size together with the static OutPacketBuffer::maxSize, which defaults to 60000 (a sketch of this computation appears at the end of this article). This is the part that confused me: if we fetch one whole frame from the source each time, the buffer ought to be sized no smaller than the largest frame — why size it in terms of packets? As you can see, when the buffer is too small, all it does is print a warning:
if (numTruncatedBytes > 0) {
  unsigned const bufferSize = fOutBuf->totalBytesAvailable();
  envir()
      << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
      << bufferSize
      << "). "
      << numTruncatedBytes
      << " bytes of trailing data was dropped! Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
      << OutPacketBuffer::maxSize + numTruncatedBytes
      << ", *before* creating this \"RTPSink\". (Current value is "
      << OutPacketBuffer::maxSize << ".)\n";
}
Of course nothing actually fails at that point, but it can make the timestamp calculation inaccurate, or add complexity to the timestamp calculation and to the handling on the source side (with exactly one frame per read, timestamps are easy to compute).
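For reference, the buffer-size computation mentioned above looks roughly like this (paraphrased from OutPacketBuffer's constructor in MediaSink.cpp; exact details vary across live555 versions):

// A static member that callers can raise before creating the RTPSink:
unsigned OutPacketBuffer::maxSize = 60000;

OutPacketBuffer::OutPacketBuffer(unsigned preferredPacketSize,
                                 unsigned maxPacketSize)
  : fPreferred(preferredPacketSize), fMax(maxPacketSize),
    fOverflowDataSize(0) {
  // Round maxSize up to a whole number of maximum-size packets, so the
  // buffer ends up being at least 60000 bytes and at most one packet
  // larger -- roughly the "max packet size + 60000" noted above:
  unsigned maxNumPackets = (maxSize + (maxPacketSize - 1)) / maxPacketSize;
  fLimit = maxNumPackets * maxPacketSize;
  fBuf = new unsigned char[fLimit];
  resetPacketStart();
  resetOffset();
  resetOverflowData();
}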