00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "MultiFramedRTPSource.hh"
00023 #include "GroupsockHelper.hh"
00024 #include <string.h>
00025
00027
00028 class ReorderingPacketBuffer {
00029 public:
00030 ReorderingPacketBuffer(BufferedPacketFactory* packetFactory);
00031 virtual ~ReorderingPacketBuffer();
00032 void reset();
00033
00034 BufferedPacket* getFreePacket(MultiFramedRTPSource* ourSource);
00035 Boolean storePacket(BufferedPacket* bPacket);
00036 BufferedPacket* getNextCompletedPacket(Boolean& packetLossPreceded);
00037 void releaseUsedPacket(BufferedPacket* packet);
00038 void freePacket(BufferedPacket* packet) {
00039 if (packet != fSavedPacket) {
00040 delete packet;
00041 } else {
00042 fSavedPacketFree = True;
00043 }
00044 }
00045 Boolean isEmpty() const { return fHeadPacket == NULL; }
00046
00047 void setThresholdTime(unsigned uSeconds) { fThresholdTime = uSeconds; }
00048
00049 private:
00050 BufferedPacketFactory* fPacketFactory;
00051 unsigned fThresholdTime;
00052 Boolean fHaveSeenFirstPacket;
00053 unsigned short fNextExpectedSeqNo;
00054 BufferedPacket* fHeadPacket;
00055 BufferedPacket* fSavedPacket;
00056
00057 Boolean fSavedPacketFree;
00058 };
00059
00060
00062
00063 MultiFramedRTPSource
00064 ::MultiFramedRTPSource(UsageEnvironment& env, Groupsock* RTPgs,
00065 unsigned char rtpPayloadFormat,
00066 unsigned rtpTimestampFrequency,
00067 BufferedPacketFactory* packetFactory)
00068 : RTPSource(env, RTPgs, rtpPayloadFormat, rtpTimestampFrequency) {
00069 reset();
00070 fReorderingBuffer = new ReorderingPacketBuffer(packetFactory);
00071
00072
00073 increaseReceiveBufferTo(env, RTPgs->socketNum(), 50*1024);
00074 }
00075
00076 void MultiFramedRTPSource::reset() {
00077 fCurrentPacketBeginsFrame = True;
00078 fCurrentPacketCompletesFrame = True;
00079 fAreDoingNetworkReads = False;
00080 fPacketReadInProgress = NULL;
00081 fNeedDelivery = False;
00082 fPacketLossInFragmentedFrame = False;
00083 }
00084
00085 MultiFramedRTPSource::~MultiFramedRTPSource() {
00086 fRTPInterface.stopNetworkReading();
00087 delete fReorderingBuffer;
00088 }
00089
00090 Boolean MultiFramedRTPSource
00091 ::processSpecialHeader(BufferedPacket* ,
00092 unsigned& resultSpecialHeaderSize) {
00093
00094 resultSpecialHeaderSize = 0;
00095 return True;
00096 }
00097
00098 Boolean MultiFramedRTPSource
00099 ::packetIsUsableInJitterCalculation(unsigned char* ,
00100 unsigned ) {
00101
00102 return True;
00103 }
00104
00105 void MultiFramedRTPSource::doStopGettingFrames() {
00106 fRTPInterface.stopNetworkReading();
00107 fReorderingBuffer->reset();
00108 reset();
00109 }
00110
00111 void MultiFramedRTPSource::doGetNextFrame() {
00112 if (!fAreDoingNetworkReads) {
00113
00114 fAreDoingNetworkReads = True;
00115 TaskScheduler::BackgroundHandlerProc* handler
00116 = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
00117 fRTPInterface.startNetworkReading(handler);
00118 }
00119
00120 fSavedTo = fTo;
00121 fSavedMaxSize = fMaxSize;
00122 fFrameSize = 0;
00123 fNeedDelivery = True;
00124 doGetNextFrame1();
00125 }
00126
00127 void MultiFramedRTPSource::doGetNextFrame1() {
00128 while (fNeedDelivery) {
00129
00130 Boolean packetLossPrecededThis;
00131 BufferedPacket* nextPacket
00132 = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
00133 if (nextPacket == NULL) break;
00134
00135 fNeedDelivery = False;
00136
00137 if (nextPacket->useCount() == 0) {
00138
00139
00140 unsigned specialHeaderSize;
00141 if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
00142
00143 fReorderingBuffer->releaseUsedPacket(nextPacket);
00144 fNeedDelivery = True;
00145 break;
00146 }
00147 nextPacket->skip(specialHeaderSize);
00148 }
00149
00150
00151
00152 if (fCurrentPacketBeginsFrame) {
00153 if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
00154
00155
00156 fTo = fSavedTo; fMaxSize = fSavedMaxSize;
00157 fFrameSize = 0;
00158 }
00159 fPacketLossInFragmentedFrame = False;
00160 } else if (packetLossPrecededThis) {
00161
00162 fPacketLossInFragmentedFrame = True;
00163 }
00164 if (fPacketLossInFragmentedFrame) {
00165
00166 fReorderingBuffer->releaseUsedPacket(nextPacket);
00167 fNeedDelivery = True;
00168 break;
00169 }
00170
00171
00172 unsigned frameSize;
00173 nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
00174 fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
00175 fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
00176 fCurPacketMarkerBit);
00177 fFrameSize += frameSize;
00178
00179 if (!nextPacket->hasUsableData()) {
00180
00181 fReorderingBuffer->releaseUsedPacket(nextPacket);
00182 }
00183
00184 if (fCurrentPacketCompletesFrame || fNumTruncatedBytes > 0) {
00185
00186 if (fNumTruncatedBytes > 0) {
00187 envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
00188 << fSavedMaxSize << "). "
00189 << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
00190 }
00191
00192 if (fReorderingBuffer->isEmpty()) {
00193
00194
00195
00196 afterGetting(this);
00197 } else {
00198
00199 nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
00200 (TaskFunc*)FramedSource::afterGetting, this);
00201 }
00202 } else {
00203
00204
00205 fTo += frameSize; fMaxSize -= frameSize;
00206 fNeedDelivery = True;
00207 }
00208 }
00209 }
00210
00211 void MultiFramedRTPSource
00212 ::setPacketReorderingThresholdTime(unsigned uSeconds) {
00213 fReorderingBuffer->setThresholdTime(uSeconds);
00214 }
00215
00216 #define ADVANCE(n) do { bPacket->skip(n); } while (0)
00217
00218 void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int ) {
00219 source->networkReadHandler1();
00220 }
00221
00222 void MultiFramedRTPSource::networkReadHandler1() {
00223 BufferedPacket* bPacket = fPacketReadInProgress;
00224 if (bPacket == NULL) {
00225
00226 bPacket = fReorderingBuffer->getFreePacket(this);
00227 }
00228
00229
00230 Boolean readSuccess = False;
00231 do {
00232 Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;
00233 if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete)) break;
00234 if (packetReadWasIncomplete) {
00235
00236 fPacketReadInProgress = bPacket;
00237 return;
00238 } else {
00239 fPacketReadInProgress = NULL;
00240 }
00241 #ifdef TEST_LOSS
00242 setPacketReorderingThresholdTime(0);
00243
00244 if ((our_random()%10) == 0) break;
00245 #endif
00246
00247
00248 if (bPacket->dataSize() < 12) break;
00249 unsigned rtpHdr = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00250 Boolean rtpMarkerBit = (rtpHdr&0x00800000) >> 23;
00251 unsigned rtpTimestamp = ntohl(*(unsigned*)(bPacket->data()));ADVANCE(4);
00252 unsigned rtpSSRC = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00253
00254
00255 if ((rtpHdr&0xC0000000) != 0x80000000) break;
00256
00257
00258 unsigned cc = (rtpHdr>>24)&0xF;
00259 if (bPacket->dataSize() < cc) break;
00260 ADVANCE(cc*4);
00261
00262
00263 if (rtpHdr&0x10000000) {
00264 if (bPacket->dataSize() < 4) break;
00265 unsigned extHdr = ntohl(*(unsigned*)(bPacket->data())); ADVANCE(4);
00266 unsigned remExtSize = 4*(extHdr&0xFFFF);
00267 if (bPacket->dataSize() < remExtSize) break;
00268 ADVANCE(remExtSize);
00269 }
00270
00271
00272 if (rtpHdr&0x20000000) {
00273 if (bPacket->dataSize() == 0) break;
00274 unsigned numPaddingBytes
00275 = (unsigned)(bPacket->data())[bPacket->dataSize()-1];
00276 if (bPacket->dataSize() < numPaddingBytes) break;
00277 bPacket->removePadding(numPaddingBytes);
00278 }
00279
00280 if ((unsigned char)((rtpHdr&0x007F0000)>>16)
00281 != rtpPayloadFormat()) {
00282 break;
00283 }
00284
00285
00286 fLastReceivedSSRC = rtpSSRC;
00287 unsigned short rtpSeqNo = (unsigned short)(rtpHdr&0xFFFF);
00288 Boolean usableInJitterCalculation
00289 = packetIsUsableInJitterCalculation((bPacket->data()),
00290 bPacket->dataSize());
00291 struct timeval presentationTime;
00292 Boolean hasBeenSyncedUsingRTCP;
00293 receptionStatsDB()
00294 .noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp,
00295 timestampFrequency(),
00296 usableInJitterCalculation, presentationTime,
00297 hasBeenSyncedUsingRTCP, bPacket->dataSize());
00298
00299
00300 struct timeval timeNow;
00301 gettimeofday(&timeNow, NULL);
00302 bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime,
00303 hasBeenSyncedUsingRTCP, rtpMarkerBit,
00304 timeNow);
00305 if (!fReorderingBuffer->storePacket(bPacket)) break;
00306
00307 readSuccess = True;
00308 } while (0);
00309 if (!readSuccess) fReorderingBuffer->freePacket(bPacket);
00310
00311 doGetNextFrame1();
00312
00313 }
00314
00315
00317
00318 #define MAX_PACKET_SIZE 10000
00319
00320 BufferedPacket::BufferedPacket()
00321 : fPacketSize(MAX_PACKET_SIZE),
00322 fBuf(new unsigned char[MAX_PACKET_SIZE]),
00323 fNextPacket(NULL) {
00324 }
00325
00326 BufferedPacket::~BufferedPacket() {
00327 delete fNextPacket;
00328 delete[] fBuf;
00329 }
00330
00331 void BufferedPacket::reset() {
00332 fHead = fTail = 0;
00333 fUseCount = 0;
00334 fIsFirstPacket = False;
00335 }
00336
00337
00338 unsigned BufferedPacket
00339 ::nextEnclosedFrameSize(unsigned char*& , unsigned dataSize) {
00340
00341
00342
00343 return dataSize;
00344 }
00345
00346 void BufferedPacket
00347 ::getNextEnclosedFrameParameters(unsigned char*& framePtr, unsigned dataSize,
00348 unsigned& frameSize,
00349 unsigned& frameDurationInMicroseconds) {
00350
00351
00352
00353
00354
00355
00356 frameSize = nextEnclosedFrameSize(framePtr, dataSize);
00357
00358 frameDurationInMicroseconds = 0;
00359 }
00360
00361 Boolean BufferedPacket::fillInData(RTPInterface& rtpInterface, Boolean& packetReadWasIncomplete) {
00362 if (!packetReadWasIncomplete) reset();
00363
00364 unsigned numBytesRead;
00365 struct sockaddr_in fromAddress;
00366 if (!rtpInterface.handleRead(&fBuf[fTail], fPacketSize-fTail, numBytesRead, fromAddress, packetReadWasIncomplete)) {
00367 return False;
00368 }
00369 fTail += numBytesRead;
00370 return True;
00371 }
00372
00373 void BufferedPacket
00374 ::assignMiscParams(unsigned short rtpSeqNo, unsigned rtpTimestamp,
00375 struct timeval presentationTime,
00376 Boolean hasBeenSyncedUsingRTCP, Boolean rtpMarkerBit,
00377 struct timeval timeReceived) {
00378 fRTPSeqNo = rtpSeqNo;
00379 fRTPTimestamp = rtpTimestamp;
00380 fPresentationTime = presentationTime;
00381 fHasBeenSyncedUsingRTCP = hasBeenSyncedUsingRTCP;
00382 fRTPMarkerBit = rtpMarkerBit;
00383 fTimeReceived = timeReceived;
00384 }
00385
00386 void BufferedPacket::skip(unsigned numBytes) {
00387 fHead += numBytes;
00388 if (fHead > fTail) fHead = fTail;
00389 }
00390
00391 void BufferedPacket::removePadding(unsigned numBytes) {
00392 if (numBytes > fTail-fHead) numBytes = fTail-fHead;
00393 fTail -= numBytes;
00394 }
00395
00396 void BufferedPacket::appendData(unsigned char* newData, unsigned numBytes) {
00397 if (numBytes > fPacketSize-fTail) numBytes = fPacketSize - fTail;
00398 memmove(&fBuf[fTail], newData, numBytes);
00399 fTail += numBytes;
00400 }
00401
00402 void BufferedPacket::use(unsigned char* to, unsigned toSize,
00403 unsigned& bytesUsed, unsigned& bytesTruncated,
00404 unsigned short& rtpSeqNo, unsigned& rtpTimestamp,
00405 struct timeval& presentationTime,
00406 Boolean& hasBeenSyncedUsingRTCP,
00407 Boolean& rtpMarkerBit) {
00408 unsigned char* origFramePtr = &fBuf[fHead];
00409 unsigned char* newFramePtr = origFramePtr;
00410 unsigned frameSize, frameDurationInMicroseconds;
00411 getNextEnclosedFrameParameters(newFramePtr, fTail - fHead,
00412 frameSize, frameDurationInMicroseconds);
00413 if (frameSize > toSize) {
00414 bytesTruncated = frameSize - toSize;
00415 bytesUsed = toSize;
00416 } else {
00417 bytesTruncated = 0;
00418 bytesUsed = frameSize;
00419 }
00420
00421 memmove(to, newFramePtr, bytesUsed);
00422 fHead += (newFramePtr - origFramePtr) + frameSize;
00423 ++fUseCount;
00424
00425 rtpSeqNo = fRTPSeqNo;
00426 rtpTimestamp = fRTPTimestamp;
00427 presentationTime = fPresentationTime;
00428 hasBeenSyncedUsingRTCP = fHasBeenSyncedUsingRTCP;
00429 rtpMarkerBit = fRTPMarkerBit;
00430
00431
00432 fPresentationTime.tv_usec += frameDurationInMicroseconds;
00433 if (fPresentationTime.tv_usec >= 1000000) {
00434 fPresentationTime.tv_sec += fPresentationTime.tv_usec/1000000;
00435 fPresentationTime.tv_usec = fPresentationTime.tv_usec%1000000;
00436 }
00437 }
00438
00439 BufferedPacketFactory::BufferedPacketFactory() {
00440 }
00441
00442 BufferedPacketFactory::~BufferedPacketFactory() {
00443 }
00444
00445 BufferedPacket* BufferedPacketFactory
00446 ::createNewPacket(MultiFramedRTPSource* ) {
00447 return new BufferedPacket;
00448 }
00449
00450
00452
00453 ReorderingPacketBuffer
00454 ::ReorderingPacketBuffer(BufferedPacketFactory* packetFactory)
00455 : fThresholdTime(100000) ,
00456 fHaveSeenFirstPacket(False), fHeadPacket(NULL), fSavedPacket(NULL), fSavedPacketFree(True) {
00457 fPacketFactory = (packetFactory == NULL)
00458 ? (new BufferedPacketFactory)
00459 : packetFactory;
00460 }
00461
00462 ReorderingPacketBuffer::~ReorderingPacketBuffer() {
00463 reset();
00464 delete fPacketFactory;
00465 }
00466
00467 void ReorderingPacketBuffer::reset() {
00468 if (fSavedPacketFree) delete fSavedPacket;
00469 delete fHeadPacket;
00470 fHaveSeenFirstPacket = False;
00471 fHeadPacket = NULL;
00472 fSavedPacket = NULL;
00473 }
00474
00475 BufferedPacket* ReorderingPacketBuffer::getFreePacket(MultiFramedRTPSource* ourSource) {
00476 if (fSavedPacket == NULL) {
00477 fSavedPacket = fPacketFactory->createNewPacket(ourSource);
00478 fSavedPacketFree = True;
00479 }
00480
00481 if (fSavedPacketFree == True) {
00482 fSavedPacketFree = False;
00483 return fSavedPacket;
00484 } else {
00485 return fPacketFactory->createNewPacket(ourSource);
00486 }
00487 }
00488
00489 Boolean ReorderingPacketBuffer::storePacket(BufferedPacket* bPacket) {
00490 unsigned short rtpSeqNo = bPacket->rtpSeqNo();
00491
00492 if (!fHaveSeenFirstPacket) {
00493 fNextExpectedSeqNo = rtpSeqNo;
00494 bPacket->isFirstPacket() = True;
00495 fHaveSeenFirstPacket = True;
00496 }
00497
00498
00499
00500 if (seqNumLT(rtpSeqNo, fNextExpectedSeqNo)) return False;
00501
00502
00503 BufferedPacket* beforePtr = NULL;
00504 BufferedPacket* afterPtr = fHeadPacket;
00505 while (afterPtr != NULL) {
00506 if (seqNumLT(rtpSeqNo, afterPtr->rtpSeqNo())) break;
00507 if (rtpSeqNo == afterPtr->rtpSeqNo()) {
00508
00509 return False;
00510 }
00511
00512 beforePtr = afterPtr;
00513 afterPtr = afterPtr->nextPacket();
00514 }
00515
00516
00517 bPacket->nextPacket() = afterPtr;
00518 if (beforePtr == NULL) {
00519 fHeadPacket = bPacket;
00520 } else {
00521 beforePtr->nextPacket() = bPacket;
00522 }
00523
00524 return True;
00525 }
00526
00527 void ReorderingPacketBuffer::releaseUsedPacket(BufferedPacket* packet) {
00528
00529
00530 ++fNextExpectedSeqNo;
00531
00532 fHeadPacket = fHeadPacket->nextPacket();
00533 packet->nextPacket() = NULL;
00534
00535 freePacket(packet);
00536 }
00537
00538 BufferedPacket* ReorderingPacketBuffer
00539 ::getNextCompletedPacket(Boolean& packetLossPreceded) {
00540 if (fHeadPacket == NULL) return NULL;
00541
00542
00543
00544
00545 if (fHeadPacket->rtpSeqNo() == fNextExpectedSeqNo) {
00546 packetLossPreceded = fHeadPacket->isFirstPacket();
00547
00548 return fHeadPacket;
00549 }
00550
00551
00552
00553
00554 struct timeval timeNow;
00555 gettimeofday(&timeNow, NULL);
00556 unsigned uSecondsSinceReceived
00557 = (timeNow.tv_sec - fHeadPacket->timeReceived().tv_sec)*1000000
00558 + (timeNow.tv_usec - fHeadPacket->timeReceived().tv_usec);
00559 if (uSecondsSinceReceived > fThresholdTime) {
00560 fNextExpectedSeqNo = fHeadPacket->rtpSeqNo();
00561
00562 packetLossPreceded = True;
00563 return fHeadPacket;
00564 }
00565
00566
00567 return NULL;
00568 }