00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "RTCP.hh"
00022 #include "GroupsockHelper.hh"
00023 #include "rtcp_from_spec.h"
00024
00026
00027 class RTCPMemberDatabase {
00028 public:
00029 RTCPMemberDatabase(RTCPInstance& ourRTCPInstance)
00030 : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 ),
00031 fTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00032 }
00033
00034 virtual ~RTCPMemberDatabase() {
00035 delete fTable;
00036 }
00037
00038 Boolean isMember(unsigned ssrc) const {
00039 return fTable->Lookup((char*)(long)ssrc) != NULL;
00040 }
00041
00042 Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) {
00043 Boolean isNew = !isMember(ssrc);
00044
00045 if (isNew) {
00046 ++fNumMembers;
00047 }
00048
00049
00050 fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount);
00051
00052 return isNew;
00053 }
00054
00055 Boolean remove(unsigned ssrc) {
00056 Boolean wasPresent = fTable->Remove((char*)(long)ssrc);
00057 if (wasPresent) {
00058 --fNumMembers;
00059 }
00060 return wasPresent;
00061 }
00062
00063 unsigned numMembers() const {
00064 return fNumMembers;
00065 }
00066
00067 void reapOldMembers(unsigned threshold);
00068
00069 private:
00070 RTCPInstance& fOurRTCPInstance;
00071 unsigned fNumMembers;
00072 HashTable* fTable;
00073 };
00074
00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) {
00076 Boolean foundOldMember;
00077 unsigned oldSSRC = 0;
00078
00079 do {
00080 foundOldMember = False;
00081
00082 HashTable::Iterator* iter
00083 = HashTable::Iterator::create(*fTable);
00084 unsigned long timeCount;
00085 char const* key;
00086 while ((timeCount = (unsigned long)(iter->next(key))) != 0) {
00087 #ifdef DEBUG
00088 fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold);
00089 #endif
00090 if (timeCount < (unsigned long)threshold) {
00091 unsigned long ssrc = (unsigned long)key;
00092 oldSSRC = (unsigned)ssrc;
00093 foundOldMember = True;
00094 }
00095 }
00096 delete iter;
00097
00098 if (foundOldMember) {
00099 #ifdef DEBUG
00100 fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC);
00101 #endif
00102 fOurRTCPInstance.removeSSRC(oldSSRC, True);
00103 }
00104 } while (foundOldMember);
00105 }
00106
00107
00109
// Returns the current wall-clock time (from "gettimeofday()") as a single
// floating-point number of seconds.
static double dTimeNow() {
  struct timeval now;
  gettimeofday(&now, NULL);

  double const fractionOfSecond = now.tv_usec/1000000.0;
  return (double)(now.tv_sec + fractionOfSecond);
}
00115
// Size (in bytes) of the buffer allocated for incoming RTCP packets; also
// passed as the maximum size of the outgoing packet buffer.
static const unsigned maxPacketSize = 1450;

// 'Preferred' size (in bytes) passed to the outgoing packet buffer.
static const unsigned preferredPacketSize = 1000;
00119
00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
00121 unsigned totSessionBW,
00122 unsigned char const* cname,
00123 RTPSink* sink, RTPSource const* source,
00124 Boolean isSSMSource)
00125 : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
00126 fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
00127 fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
00128 fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
00129 fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
00130 fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
00131 fHaveJustSentPacket(False), fLastPacketSentSize(0),
00132 fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
00133 fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
00134 fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
00135 fSpecificRRHandlerTable(NULL) {
00136 #ifdef DEBUG
00137 fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
00138 #endif
00139 if (fTotSessionBW == 0) {
00140 env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";
00141 fTotSessionBW = 1;
00142 }
00143
00144 if (isSSMSource) RTCPgs->multicastSendOnly();
00145
00146 double timeNow = dTimeNow();
00147 fPrevReportTime = fNextReportTime = timeNow;
00148
00149 fKnownMembers = new RTCPMemberDatabase(*this);
00150 fInBuf = new unsigned char[maxPacketSize];
00151 if (fKnownMembers == NULL || fInBuf == NULL) return;
00152 fNumBytesAlreadyRead = 0;
00153
00154
00155 unsigned savedMaxSize = OutPacketBuffer::maxSize;
00156 OutPacketBuffer::maxSize = maxPacketSize;
00157 fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);
00158 OutPacketBuffer::maxSize = savedMaxSize;
00159 if (fOutBuf == NULL) return;
00160
00161
00162 TaskScheduler::BackgroundHandlerProc* handler
00163 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00164 fRTCPInterface.startNetworkReading(handler);
00165
00166
00167 fTypeOfEvent = EVENT_REPORT;
00168 onExpire(this);
00169 }
00170
00171 struct RRHandlerRecord {
00172 TaskFunc* rrHandlerTask;
00173 void* rrHandlerClientData;
00174 };
00175
00176 RTCPInstance::~RTCPInstance() {
00177 #ifdef DEBUG
00178 fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this);
00179 #endif
00180
00181 fRTCPInterface.stopNetworkReading();
00182
00183
00184
00185 fTypeOfEvent = EVENT_BYE;
00186 sendBYE();
00187
00188 if (fSpecificRRHandlerTable != NULL) {
00189 AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable);
00190 RRHandlerRecord* rrHandler;
00191 while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) {
00192 delete rrHandler;
00193 }
00194 delete fSpecificRRHandlerTable;
00195 }
00196
00197 delete fKnownMembers;
00198 delete fOutBuf;
00199 delete[] fInBuf;
00200 }
00201
00202 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
00203 unsigned totSessionBW,
00204 unsigned char const* cname,
00205 RTPSink* sink, RTPSource const* source,
00206 Boolean isSSMSource) {
00207 return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
00208 isSSMSource);
00209 }
00210
00211 Boolean RTCPInstance::lookupByName(UsageEnvironment& env,
00212 char const* instanceName,
00213 RTCPInstance*& resultInstance) {
00214 resultInstance = NULL;
00215
00216 Medium* medium;
00217 if (!Medium::lookupByName(env, instanceName, medium)) return False;
00218
00219 if (!medium->isRTCPInstance()) {
00220 env.setResultMsg(instanceName, " is not a RTCP instance");
00221 return False;
00222 }
00223
00224 resultInstance = (RTCPInstance*)medium;
00225 return True;
00226 }
00227
00228 Boolean RTCPInstance::isRTCPInstance() const {
00229 return True;
00230 }
00231
00232 unsigned RTCPInstance::numMembers() const {
00233 if (fKnownMembers == NULL) return 0;
00234
00235 return fKnownMembers->numMembers();
00236 }
00237
00238 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData,
00239 Boolean handleActiveParticipantsOnly) {
00240 fByeHandlerTask = handlerTask;
00241 fByeHandlerClientData = clientData;
00242 fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly;
00243 }
00244
00245 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) {
00246 fSRHandlerTask = handlerTask;
00247 fSRHandlerClientData = clientData;
00248 }
00249
00250 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) {
00251 fRRHandlerTask = handlerTask;
00252 fRRHandlerClientData = clientData;
00253 }
00254
00255 void RTCPInstance
00256 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
00257 TaskFunc* handlerTask, void* clientData) {
00258 if (handlerTask == NULL && clientData == NULL) {
00259 unsetSpecificRRHandler(fromAddress, fromPort);
00260 return;
00261 }
00262
00263 RRHandlerRecord* rrHandler = new RRHandlerRecord;
00264 rrHandler->rrHandlerTask = handlerTask;
00265 rrHandler->rrHandlerClientData = clientData;
00266 if (fSpecificRRHandlerTable == NULL) {
00267 fSpecificRRHandlerTable = new AddressPortLookupTable;
00268 }
00269 fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler);
00270 }
00271
00272 void RTCPInstance
00273 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) {
00274 if (fSpecificRRHandlerTable == NULL) return;
00275
00276 RRHandlerRecord* rrHandler
00277 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort));
00278 if (rrHandler != NULL) {
00279 fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort);
00280 delete rrHandler;
00281 }
00282 }
00283
00284 void RTCPInstance::setStreamSocket(int sockNum,
00285 unsigned char streamChannelId) {
00286
00287 fRTCPInterface.stopNetworkReading();
00288
00289
00290 fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00291
00292
00293 TaskScheduler::BackgroundHandlerProc* handler
00294 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00295 fRTCPInterface.startNetworkReading(handler);
00296 }
00297
00298 void RTCPInstance::addStreamSocket(int sockNum,
00299 unsigned char streamChannelId) {
00300
00301 envir().taskScheduler().turnOffBackgroundReadHandling(fRTCPInterface.gs()->socketNum());
00302
00303
00304 fRTCPInterface.addStreamSocket(sockNum, streamChannelId);
00305
00306
00307 TaskScheduler::BackgroundHandlerProc* handler
00308 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00309 fRTCPInterface.startNetworkReading(handler);
00310 }
00311
// Added to a packet's RTCP payload size to get the total size that is
// recorded for it (presumably a 20-byte IPv4 header plus an 8-byte UDP
// header).
static unsigned const IP_UDP_HDR_SIZE = 28;

// Consumes "n" bytes of the packet being parsed: moves the local cursor
// "pkt" forward and shrinks the local "packetSize" by the same amount.
// (Both names must be in scope wherever the macro is used.)
//
// The body is wrapped in "do { } while (0)" so that the macro expands to a
// single statement, and therefore remains correct when used as the body of
// an unbraced "if"/"else"/loop.  It must be followed by a semicolon.
#define ADVANCE(n) do { pkt += (n); packetSize -= (n); } while (0)
00316
00317 void RTCPInstance::incomingReportHandler(RTCPInstance* instance,
00318 int ) {
00319 instance->incomingReportHandler1();
00320 }
00321
00322 void RTCPInstance::incomingReportHandler1() {
00323 do {
00324 int tcpReadStreamSocketNum = fRTCPInterface.nextTCPReadStreamSocketNum();
00325 unsigned char tcpReadStreamChannelId = fRTCPInterface.nextTCPReadStreamChannelId();
00326 unsigned packetSize = 0;
00327 unsigned numBytesRead;
00328 struct sockaddr_in fromAddress;
00329 Boolean packetReadWasIncomplete;
00330 Boolean readResult
00331 = fRTCPInterface.handleRead(&fInBuf[fNumBytesAlreadyRead], maxPacketSize - fNumBytesAlreadyRead,
00332 numBytesRead, fromAddress, packetReadWasIncomplete);
00333 if (packetReadWasIncomplete) {
00334 fNumBytesAlreadyRead += numBytesRead;
00335 return;
00336 } else {
00337 packetSize = fNumBytesAlreadyRead + numBytesRead;
00338 fNumBytesAlreadyRead = 0;
00339 }
00340 if (!readResult) break;
00341
00342
00343 if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) {
00344
00345
00346
00347
00348
00349 if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) {
00350
00351 fHaveJustSentPacket = False;
00352 break;
00353 }
00354 }
00355
00356 unsigned char* pkt = fInBuf;
00357 if (fIsSSMSource) {
00358
00359
00360
00361
00362
00363
00364 fRTCPInterface.sendPacket(pkt, packetSize);
00365 fHaveJustSentPacket = True;
00366 fLastPacketSentSize = packetSize;
00367 }
00368
00369 #ifdef DEBUG
00370 fprintf(stderr, "[%p]saw incoming RTCP packet (from address %s, port %d)\n", this, our_inet_ntoa(fromAddress.sin_addr), ntohs(fromAddress.sin_port));
00371 for (unsigned i = 0; i < packetSize; ++i) {
00372 if (i%4 == 0) fprintf(stderr, " ");
00373 fprintf(stderr, "%02x", pkt[i]);
00374 }
00375 fprintf(stderr, "\n");
00376 #endif
00377 int totPacketSize = IP_UDP_HDR_SIZE + packetSize;
00378
00379
00380
00381
00382
00383 if (packetSize < 4) break;
00384 unsigned rtcpHdr = ntohl(*(unsigned*)pkt);
00385 if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) {
00386 #ifdef DEBUG
00387 fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr);
00388 #endif
00389 break;
00390 }
00391
00392
00393
00394 int typeOfPacket = PACKET_UNKNOWN_TYPE;
00395 unsigned reportSenderSSRC = 0;
00396 Boolean packetOK = False;
00397 while (1) {
00398 unsigned rc = (rtcpHdr>>24)&0x1F;
00399 unsigned pt = (rtcpHdr>>16)&0xFF;
00400 unsigned length = 4*(rtcpHdr&0xFFFF);
00401 ADVANCE(4);
00402 if (length > packetSize) break;
00403
00404
00405 if (length < 4) break; length -= 4;
00406 reportSenderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4);
00407
00408 Boolean subPacketOK = False;
00409 switch (pt) {
00410 case RTCP_PT_SR: {
00411 #ifdef DEBUG
00412 fprintf(stderr, "SR\n");
00413 #endif
00414 if (length < 20) break; length -= 20;
00415
00416
00417 unsigned NTPmsw = ntohl(*(unsigned*)pkt); ADVANCE(4);
00418 unsigned NTPlsw = ntohl(*(unsigned*)pkt); ADVANCE(4);
00419 unsigned rtpTimestamp = ntohl(*(unsigned*)pkt); ADVANCE(4);
00420 if (fSource != NULL) {
00421 RTPReceptionStatsDB& receptionStats
00422 = fSource->receptionStatsDB();
00423 receptionStats.noteIncomingSR(reportSenderSSRC,
00424 NTPmsw, NTPlsw, rtpTimestamp);
00425 }
00426 ADVANCE(8);
00427
00428
00429 if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData);
00430
00431
00432 }
00433 case RTCP_PT_RR: {
00434 #ifdef DEBUG
00435 fprintf(stderr, "RR\n");
00436 #endif
00437 unsigned reportBlocksSize = rc*(6*4);
00438 if (length < reportBlocksSize) break;
00439 length -= reportBlocksSize;
00440
00441 if (fSink != NULL) {
00442
00443 RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB();
00444 for (unsigned i = 0; i < rc; ++i) {
00445 unsigned senderSSRC = ntohl(*(unsigned*)pkt); ADVANCE(4);
00446
00447 if (senderSSRC == fSink->SSRC()) {
00448 unsigned lossStats = ntohl(*(unsigned*)pkt); ADVANCE(4);
00449 unsigned highestReceived = ntohl(*(unsigned*)pkt); ADVANCE(4);
00450 unsigned jitter = ntohl(*(unsigned*)pkt); ADVANCE(4);
00451 unsigned timeLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4);
00452 unsigned timeSinceLastSR = ntohl(*(unsigned*)pkt); ADVANCE(4);
00453 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress,
00454 lossStats,
00455 highestReceived, jitter,
00456 timeLastSR, timeSinceLastSR);
00457 } else {
00458 ADVANCE(4*5);
00459 }
00460 }
00461 } else {
00462 ADVANCE(reportBlocksSize);
00463 }
00464
00465 if (pt == RTCP_PT_RR) {
00466
00467
00468
00469 if (fSpecificRRHandlerTable != NULL) {
00470 netAddressBits fromAddr;
00471 portNumBits fromPortNum;
00472 if (tcpReadStreamSocketNum < 0) {
00473
00474 fromAddr = fromAddress.sin_addr.s_addr;
00475 fromPortNum = ntohs(fromAddress.sin_port);
00476 } else {
00477
00478
00479 fromAddr = tcpReadStreamSocketNum;
00480 fromPortNum = tcpReadStreamChannelId;
00481 }
00482 Port fromPort(fromPortNum);
00483 RRHandlerRecord* rrHandler
00484 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
00485 if (rrHandler != NULL) {
00486 if (rrHandler->rrHandlerTask != NULL) {
00487 (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
00488 }
00489 }
00490 }
00491
00492
00493 if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData);
00494 }
00495
00496 subPacketOK = True;
00497 typeOfPacket = PACKET_RTCP_REPORT;
00498 break;
00499 }
00500 case RTCP_PT_BYE: {
00501 #ifdef DEBUG
00502 fprintf(stderr, "BYE\n");
00503 #endif
00504
00505 TaskFunc* byeHandler = fByeHandlerTask;
00506 if (byeHandler != NULL
00507 && (!fByeHandleActiveParticipantsOnly
00508 || (fSource != NULL
00509 && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL)
00510 || (fSink != NULL
00511 && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) {
00512 fByeHandlerTask = NULL;
00513
00514 (*byeHandler)(fByeHandlerClientData);
00515 }
00516
00517
00518
00519 subPacketOK = True;
00520 typeOfPacket = PACKET_BYE;
00521 break;
00522 }
00523
00524 default:
00525 #ifdef DEBUG
00526 fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt);
00527 #endif
00528 subPacketOK = True;
00529 break;
00530 }
00531 if (!subPacketOK) break;
00532
00533
00534
00535 #ifdef DEBUG
00536 fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC);
00537 #endif
00538
00539
00540 ADVANCE(length);
00541
00542
00543 if (packetSize == 0) {
00544 packetOK = True;
00545 break;
00546 } else if (packetSize < 4) {
00547 #ifdef DEBUG
00548 fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize);
00549 #endif
00550 break;
00551 }
00552 rtcpHdr = ntohl(*(unsigned*)pkt);
00553 if ((rtcpHdr & 0xC0000000) != 0x80000000) {
00554 #ifdef DEBUG
00555 fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00556 #endif
00557 break;
00558 }
00559 }
00560
00561 if (!packetOK) {
00562 #ifdef DEBUG
00563 fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00564 #endif
00565 break;
00566 } else {
00567 #ifdef DEBUG
00568 fprintf(stderr, "validated entire RTCP packet\n");
00569 #endif
00570 }
00571
00572 onReceive(typeOfPacket, totPacketSize, reportSenderSSRC);
00573 } while (0);
00574 }
00575
00576 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize,
00577 unsigned ssrc) {
00578 fTypeOfPacket = typeOfPacket;
00579 fLastReceivedSize = totPacketSize;
00580 fLastReceivedSSRC = ssrc;
00581
00582 int members = (int)numMembers();
00583 int senders = (fSink != NULL) ? 1 : 0;
00584
00585 OnReceive(this,
00586 this,
00587 &members,
00588 &fPrevNumMembers,
00589 &senders,
00590 &fAveRTCPSize,
00591 &fPrevReportTime,
00592 dTimeNow(),
00593 fNextReportTime);
00594 }
00595
00596 void RTCPInstance::sendReport() {
00597
00598
00599
00600 if (fSink != NULL && fSink->nextTimestampHasBeenPreset()) return;
00601
00602 #ifdef DEBUG
00603 fprintf(stderr, "sending REPORT\n");
00604 #endif
00605
00606 addReport();
00607
00608
00609 addSDES();
00610
00611
00612 sendBuiltPacket();
00613
00614
00615 const unsigned membershipReapPeriod = 5;
00616 if ((++fOutgoingReportCount) % membershipReapPeriod == 0) {
00617 unsigned threshold = fOutgoingReportCount - membershipReapPeriod;
00618 fKnownMembers->reapOldMembers(threshold);
00619 }
00620 }
00621
00622 void RTCPInstance::sendBYE() {
00623 #ifdef DEBUG
00624 fprintf(stderr, "sending BYE\n");
00625 #endif
00626
00627 addReport();
00628
00629 addBYE();
00630 sendBuiltPacket();
00631 }
00632
00633 void RTCPInstance::sendBuiltPacket() {
00634 #ifdef DEBUG
00635 fprintf(stderr, "sending RTCP packet\n");
00636 unsigned char* p = fOutBuf->packet();
00637 for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) {
00638 if (i%4 == 0) fprintf(stderr," ");
00639 fprintf(stderr, "%02x", p[i]);
00640 }
00641 fprintf(stderr, "\n");
00642 #endif
00643 unsigned reportSize = fOutBuf->curPacketSize();
00644 fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize);
00645 fOutBuf->resetOffset();
00646
00647 fLastSentSize = IP_UDP_HDR_SIZE + reportSize;
00648 fHaveJustSentPacket = True;
00649 fLastPacketSentSize = reportSize;
00650 }
00651
00652 int RTCPInstance::checkNewSSRC() {
00653 return fKnownMembers->noteMembership(fLastReceivedSSRC,
00654 fOutgoingReportCount);
00655 }
00656
00657 void RTCPInstance::removeLastReceivedSSRC() {
00658 removeSSRC(fLastReceivedSSRC, False);
00659 }
00660
00661 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) {
00662 fKnownMembers->remove(ssrc);
00663
00664 if (alsoRemoveStats) {
00665
00666 if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc);
00667 if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc);
00668 }
00669 }
00670
00671 void RTCPInstance::onExpire(RTCPInstance* instance) {
00672 instance->onExpire1();
00673 }
00674
00675
00676
00677 void RTCPInstance::addReport() {
00678
00679
00680 if (fSink != NULL) {
00681 addSR();
00682 } else if (fSource != NULL) {
00683 addRR();
00684 }
00685 }
00686
00687 void RTCPInstance::addSR() {
00688
00689
00690 enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(),
00691 5 );
00692
00693
00694
00695
00696 struct timeval timeNow;
00697 gettimeofday(&timeNow, NULL);
00698 fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80);
00699
00700 double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000;
00701 fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5));
00702
00703 unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow);
00704 fOutBuf->enqueueWord(rtpTimestamp);
00705
00706
00707 fOutBuf->enqueueWord(fSink->packetCount());
00708 fOutBuf->enqueueWord(fSink->octetCount());
00709
00710 enqueueCommonReportSuffix();
00711 }
00712
00713 void RTCPInstance::addRR() {
00714
00715
00716 enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC());
00717 enqueueCommonReportSuffix();
00718 }
00719
00720 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType,
00721 unsigned SSRC,
00722 unsigned numExtraWords) {
00723 unsigned numReportingSources;
00724 if (fSource == NULL) {
00725 numReportingSources = 0;
00726 } else {
00727 RTPReceptionStatsDB& allReceptionStats
00728 = fSource->receptionStatsDB();
00729 numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset();
00730
00731 if (numReportingSources >= 32) { numReportingSources = 32; }
00732
00733 }
00734
00735 unsigned rtcpHdr = 0x80000000;
00736 rtcpHdr |= (numReportingSources<<24);
00737 rtcpHdr |= (packetType<<16);
00738 rtcpHdr |= (1 + numExtraWords + 6*numReportingSources);
00739
00740 fOutBuf->enqueueWord(rtcpHdr);
00741
00742 fOutBuf->enqueueWord(SSRC);
00743 }
00744
00745 void RTCPInstance::enqueueCommonReportSuffix() {
00746
00747 if (fSource != NULL) {
00748 RTPReceptionStatsDB& allReceptionStats
00749 = fSource->receptionStatsDB();
00750
00751 RTPReceptionStatsDB::Iterator iterator(allReceptionStats);
00752 while (1) {
00753 RTPReceptionStats* receptionStats = iterator.next();
00754 if (receptionStats == NULL) break;
00755 enqueueReportBlock(receptionStats);
00756 }
00757
00758 allReceptionStats.reset();
00759 }
00760 }
00761
00762 void
00763 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) {
00764 fOutBuf->enqueueWord(stats->SSRC());
00765
00766 unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived();
00767
00768 unsigned totNumExpected
00769 = highestExtSeqNumReceived - stats->baseExtSeqNumReceived();
00770 int totNumLost = totNumExpected - stats->totNumPacketsReceived();
00771
00772 if (totNumLost > 0x007FFFFF) {
00773 totNumLost = 0x007FFFFF;
00774 } else if (totNumLost < 0) {
00775 if (totNumLost < -0x00800000) totNumLost = 0x00800000;
00776 totNumLost &= 0x00FFFFFF;
00777 }
00778
00779 unsigned numExpectedSinceLastReset
00780 = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived();
00781 int numLostSinceLastReset
00782 = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset();
00783 unsigned char lossFraction;
00784 if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) {
00785 lossFraction = 0;
00786 } else {
00787 lossFraction = (unsigned char)
00788 ((numLostSinceLastReset << 8) / numExpectedSinceLastReset);
00789 }
00790
00791 fOutBuf->enqueueWord((lossFraction<<24) | totNumLost);
00792 fOutBuf->enqueueWord(highestExtSeqNumReceived);
00793
00794 fOutBuf->enqueueWord(stats->jitter());
00795
00796 unsigned NTPmsw = stats->lastReceivedSR_NTPmsw();
00797 unsigned NTPlsw = stats->lastReceivedSR_NTPlsw();
00798 unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16);
00799 fOutBuf->enqueueWord(LSR);
00800
00801
00802 struct timeval const& LSRtime = stats->lastReceivedSR_time();
00803 struct timeval timeNow, timeSinceLSR;
00804 gettimeofday(&timeNow, NULL);
00805 if (timeNow.tv_usec < LSRtime.tv_usec) {
00806 timeNow.tv_usec += 1000000;
00807 timeNow.tv_sec -= 1;
00808 }
00809 timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec;
00810 timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec;
00811
00812
00813 unsigned DLSR;
00814 if (LSR == 0) {
00815 DLSR = 0;
00816 } else {
00817 DLSR = (timeSinceLSR.tv_sec<<16)
00818 | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF);
00819 }
00820 fOutBuf->enqueueWord(DLSR);
00821 }
00822
00823 void RTCPInstance::addSDES() {
00824
00825
00826
00827 unsigned numBytes = 4;
00828
00829 numBytes += fCNAME.totalSize();
00830 numBytes += 1;
00831
00832 unsigned num4ByteWords = (numBytes + 3)/4;
00833
00834 unsigned rtcpHdr = 0x81000000;
00835 rtcpHdr |= (RTCP_PT_SDES<<16);
00836 rtcpHdr |= num4ByteWords;
00837 fOutBuf->enqueueWord(rtcpHdr);
00838
00839 if (fSource != NULL) {
00840 fOutBuf->enqueueWord(fSource->SSRC());
00841 } else if (fSink != NULL) {
00842 fOutBuf->enqueueWord(fSink->SSRC());
00843 }
00844
00845
00846 fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize());
00847
00848
00849 unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4);
00850 unsigned char const zero = '\0';
00851 while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1);
00852 }
00853
00854 void RTCPInstance::addBYE() {
00855 unsigned rtcpHdr = 0x81000000;
00856 rtcpHdr |= (RTCP_PT_BYE<<16);
00857 rtcpHdr |= 1;
00858 fOutBuf->enqueueWord(rtcpHdr);
00859
00860 if (fSource != NULL) {
00861 fOutBuf->enqueueWord(fSource->SSRC());
00862 } else if (fSink != NULL) {
00863 fOutBuf->enqueueWord(fSink->SSRC());
00864 }
00865 }
00866
00867 void RTCPInstance::schedule(double nextTime) {
00868 fNextReportTime = nextTime;
00869
00870 double secondsToDelay = nextTime - dTimeNow();
00871 if (secondsToDelay < 0) secondsToDelay = 0;
00872 #ifdef DEBUG
00873 fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime);
00874 #endif
00875 int64_t usToGo = (int64_t)(secondsToDelay * 1000000);
00876 nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo,
00877 (TaskFunc*)RTCPInstance::onExpire, this);
00878 }
00879
00880 void RTCPInstance::reschedule(double nextTime) {
00881 envir().taskScheduler().unscheduleDelayedTask(nextTask());
00882 schedule(nextTime);
00883 }
00884
00885 void RTCPInstance::onExpire1() {
00886
00887 double rtcpBW = 0.05*fTotSessionBW*1024/8;
00888
00889 OnExpire(this,
00890 numMembers(),
00891 (fSink != NULL) ? 1 : 0,
00892 rtcpBW,
00893 (fSink != NULL) ? 1 : 0,
00894 &fAveRTCPSize,
00895 &fIsInitial,
00896 dTimeNow(),
00897 &fPrevReportTime,
00898 &fPrevNumMembers
00899 );
00900 }
00901
00903
00904 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) {
00905 unsigned length = strlen((char const*)value);
00906 if (length > 0xFF) length = 0xFF;
00907
00908 fData[0] = tag;
00909 fData[1] = (unsigned char)length;
00910 memmove(&fData[2], value, length);
00911 }
00912
00913 unsigned SDESItem::totalSize() const {
00914 return 2 + (unsigned)fData[1];
00915 }
00916
00917
00919
00920 extern "C" void Schedule(double nextTime, event e) {
00921 RTCPInstance* instance = (RTCPInstance*)e;
00922 if (instance == NULL) return;
00923
00924 instance->schedule(nextTime);
00925 }
00926
00927 extern "C" void Reschedule(double nextTime, event e) {
00928 RTCPInstance* instance = (RTCPInstance*)e;
00929 if (instance == NULL) return;
00930
00931 instance->reschedule(nextTime);
00932 }
00933
00934 extern "C" void SendRTCPReport(event e) {
00935 RTCPInstance* instance = (RTCPInstance*)e;
00936 if (instance == NULL) return;
00937
00938 instance->sendReport();
00939 }
00940
00941 extern "C" void SendBYEPacket(event e) {
00942 RTCPInstance* instance = (RTCPInstance*)e;
00943 if (instance == NULL) return;
00944
00945 instance->sendBYE();
00946 }
00947
00948 extern "C" int TypeOfEvent(event e) {
00949 RTCPInstance* instance = (RTCPInstance*)e;
00950 if (instance == NULL) return EVENT_UNKNOWN;
00951
00952 return instance->typeOfEvent();
00953 }
00954
00955 extern "C" int SentPacketSize(event e) {
00956 RTCPInstance* instance = (RTCPInstance*)e;
00957 if (instance == NULL) return 0;
00958
00959 return instance->sentPacketSize();
00960 }
00961
00962 extern "C" int PacketType(packet p) {
00963 RTCPInstance* instance = (RTCPInstance*)p;
00964 if (instance == NULL) return PACKET_UNKNOWN_TYPE;
00965
00966 return instance->packetType();
00967 }
00968
00969 extern "C" int ReceivedPacketSize(packet p) {
00970 RTCPInstance* instance = (RTCPInstance*)p;
00971 if (instance == NULL) return 0;
00972
00973 return instance->receivedPacketSize();
00974 }
00975
00976 extern "C" int NewMember(packet p) {
00977 RTCPInstance* instance = (RTCPInstance*)p;
00978 if (instance == NULL) return 0;
00979
00980 return instance->checkNewSSRC();
00981 }
00982
00983 extern "C" int NewSender(packet ) {
00984 return 0;
00985 }
00986
00987 extern "C" void AddMember(packet ) {
00988
00989 }
00990
00991 extern "C" void AddSender(packet ) {
00992
00993 }
00994
00995 extern "C" void RemoveMember(packet p) {
00996 RTCPInstance* instance = (RTCPInstance*)p;
00997 if (instance == NULL) return;
00998
00999 instance->removeLastReceivedSSRC();
01000 }
01001
01002 extern "C" void RemoveSender(packet ) {
01003
01004 }
01005
01006 extern "C" double drand30() {
01007 unsigned tmp = our_random()&0x3FFFFFFF;
01008 return tmp/(double)(1024*1024*1024);
01009 }