00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "RTCP.hh"
00022 #include "GroupsockHelper.hh"
00023 #include "rtcp_from_spec.h"
00024
00026
00027 class RTCPMemberDatabase {
00028 public:
00029 RTCPMemberDatabase(RTCPInstance& ourRTCPInstance)
00030 : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 ),
00031 fTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00032 }
00033
00034 virtual ~RTCPMemberDatabase() {
00035 delete fTable;
00036 }
00037
00038 Boolean isMember(unsigned ssrc) const {
00039 return fTable->Lookup((char*)(long)ssrc) != NULL;
00040 }
00041
00042 Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) {
00043 Boolean isNew = !isMember(ssrc);
00044
00045 if (isNew) {
00046 ++fNumMembers;
00047 }
00048
00049
00050 fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount);
00051
00052 return isNew;
00053 }
00054
00055 Boolean remove(unsigned ssrc) {
00056 Boolean wasPresent = fTable->Remove((char*)(long)ssrc);
00057 if (wasPresent) {
00058 --fNumMembers;
00059 }
00060 return wasPresent;
00061 }
00062
00063 unsigned numMembers() const {
00064 return fNumMembers;
00065 }
00066
00067 void reapOldMembers(unsigned threshold);
00068
00069 private:
00070 RTCPInstance& fOurRTCPInstance;
00071 unsigned fNumMembers;
00072 HashTable* fTable;
00073 };
00074
00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) {
00076 Boolean foundOldMember;
00077 u_int32_t oldSSRC = 0;
00078
00079 do {
00080 foundOldMember = False;
00081
00082 HashTable::Iterator* iter
00083 = HashTable::Iterator::create(*fTable);
00084 uintptr_t timeCount;
00085 char const* key;
00086 while ((timeCount = (uintptr_t)(iter->next(key))) != 0) {
00087 #ifdef DEBUG
00088 fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold);
00089 #endif
00090 if (timeCount < (uintptr_t)threshold) {
00091 uintptr_t ssrc = (uintptr_t)key;
00092 oldSSRC = (u_int32_t)ssrc;
00093 foundOldMember = True;
00094 }
00095 }
00096 delete iter;
00097
00098 if (foundOldMember) {
00099 #ifdef DEBUG
00100 fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC);
00101 #endif
00102 fOurRTCPInstance.removeSSRC(oldSSRC, True);
00103 }
00104 } while (foundOldMember);
00105 }
00106
00107
00109
00110 static double dTimeNow() {
00111 struct timeval timeNow;
00112 gettimeofday(&timeNow, NULL);
00113 return (double) (timeNow.tv_sec + timeNow.tv_usec/1000000.0);
00114 }
00115
00116 static unsigned const maxPacketSize = 1450;
00117
00118 static unsigned const preferredPacketSize = 1000;
00119
00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
00121 unsigned totSessionBW,
00122 unsigned char const* cname,
00123 RTPSink* sink, RTPSource const* source,
00124 Boolean isSSMSource)
00125 : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
00126 fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
00127 fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
00128 fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
00129 fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
00130 fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
00131 fHaveJustSentPacket(False), fLastPacketSentSize(0),
00132 fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
00133 fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
00134 fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
00135 fSpecificRRHandlerTable(NULL) {
00136 #ifdef DEBUG
00137 fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
00138 #endif
00139 if (fTotSessionBW == 0) {
00140 env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";
00141 fTotSessionBW = 1;
00142 }
00143
00144 if (isSSMSource) RTCPgs->multicastSendOnly();
00145
00146 double timeNow = dTimeNow();
00147 fPrevReportTime = fNextReportTime = timeNow;
00148
00149 fKnownMembers = new RTCPMemberDatabase(*this);
00150 fInBuf = new unsigned char[maxPacketSize];
00151 if (fKnownMembers == NULL || fInBuf == NULL) return;
00152 fNumBytesAlreadyRead = 0;
00153
00154
00155 unsigned savedMaxSize = OutPacketBuffer::maxSize;
00156 OutPacketBuffer::maxSize = maxPacketSize;
00157 fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);
00158 OutPacketBuffer::maxSize = savedMaxSize;
00159 if (fOutBuf == NULL) return;
00160
00161
00162 TaskScheduler::BackgroundHandlerProc* handler
00163 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00164 fRTCPInterface.startNetworkReading(handler);
00165
00166
00167 fTypeOfEvent = EVENT_REPORT;
00168 onExpire(this);
00169 }
00170
00171 struct RRHandlerRecord {
00172 TaskFunc* rrHandlerTask;
00173 void* rrHandlerClientData;
00174 };
00175
00176 RTCPInstance::~RTCPInstance() {
00177 #ifdef DEBUG
00178 fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this);
00179 #endif
00180
00181 fRTCPInterface.stopNetworkReading();
00182
00183
00184
00185 fTypeOfEvent = EVENT_BYE;
00186 sendBYE();
00187
00188 if (fSpecificRRHandlerTable != NULL) {
00189 AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable);
00190 RRHandlerRecord* rrHandler;
00191 while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) {
00192 delete rrHandler;
00193 }
00194 delete fSpecificRRHandlerTable;
00195 }
00196
00197 delete fKnownMembers;
00198 delete fOutBuf;
00199 delete[] fInBuf;
00200 }
00201
00202 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
00203 unsigned totSessionBW,
00204 unsigned char const* cname,
00205 RTPSink* sink, RTPSource const* source,
00206 Boolean isSSMSource) {
00207 return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
00208 isSSMSource);
00209 }
00210
00211 Boolean RTCPInstance::lookupByName(UsageEnvironment& env,
00212 char const* instanceName,
00213 RTCPInstance*& resultInstance) {
00214 resultInstance = NULL;
00215
00216 Medium* medium;
00217 if (!Medium::lookupByName(env, instanceName, medium)) return False;
00218
00219 if (!medium->isRTCPInstance()) {
00220 env.setResultMsg(instanceName, " is not a RTCP instance");
00221 return False;
00222 }
00223
00224 resultInstance = (RTCPInstance*)medium;
00225 return True;
00226 }
00227
00228 Boolean RTCPInstance::isRTCPInstance() const {
00229 return True;
00230 }
00231
00232 unsigned RTCPInstance::numMembers() const {
00233 if (fKnownMembers == NULL) return 0;
00234
00235 return fKnownMembers->numMembers();
00236 }
00237
00238 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData,
00239 Boolean handleActiveParticipantsOnly) {
00240 fByeHandlerTask = handlerTask;
00241 fByeHandlerClientData = clientData;
00242 fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly;
00243 }
00244
00245 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) {
00246 fSRHandlerTask = handlerTask;
00247 fSRHandlerClientData = clientData;
00248 }
00249
00250 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) {
00251 fRRHandlerTask = handlerTask;
00252 fRRHandlerClientData = clientData;
00253 }
00254
00255 void RTCPInstance
00256 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
00257 TaskFunc* handlerTask, void* clientData) {
00258 if (handlerTask == NULL && clientData == NULL) {
00259 unsetSpecificRRHandler(fromAddress, fromPort);
00260 return;
00261 }
00262
00263 RRHandlerRecord* rrHandler = new RRHandlerRecord;
00264 rrHandler->rrHandlerTask = handlerTask;
00265 rrHandler->rrHandlerClientData = clientData;
00266 if (fSpecificRRHandlerTable == NULL) {
00267 fSpecificRRHandlerTable = new AddressPortLookupTable;
00268 }
00269 fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler);
00270 }
00271
00272 void RTCPInstance
00273 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) {
00274 if (fSpecificRRHandlerTable == NULL) return;
00275
00276 RRHandlerRecord* rrHandler
00277 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort));
00278 if (rrHandler != NULL) {
00279 fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort);
00280 delete rrHandler;
00281 }
00282 }
00283
00284 void RTCPInstance::setStreamSocket(int sockNum,
00285 unsigned char streamChannelId) {
00286
00287 fRTCPInterface.stopNetworkReading();
00288
00289
00290 fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00291
00292
00293 TaskScheduler::BackgroundHandlerProc* handler
00294 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00295 fRTCPInterface.startNetworkReading(handler);
00296 }
00297
00298 void RTCPInstance::addStreamSocket(int sockNum,
00299 unsigned char streamChannelId) {
00300
00301 envir().taskScheduler().turnOffBackgroundReadHandling(fRTCPInterface.gs()->socketNum());
00302
00303
00304 fRTCPInterface.addStreamSocket(sockNum, streamChannelId);
00305
00306
00307 TaskScheduler::BackgroundHandlerProc* handler
00308 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00309 fRTCPInterface.startNetworkReading(handler);
00310 }
00311
00312 static unsigned const IP_UDP_HDR_SIZE = 28;
00313
00314
00315 #define ADVANCE(n) pkt += (n); packetSize -= (n)
00316
00317 void RTCPInstance::incomingReportHandler(RTCPInstance* instance,
00318 int ) {
00319 instance->incomingReportHandler1();
00320 }
00321
00322 void RTCPInstance::incomingReportHandler1() {
00323 do {
00324 Boolean callByeHandler = False;
00325 int tcpReadStreamSocketNum = fRTCPInterface.nextTCPReadStreamSocketNum();
00326 unsigned char tcpReadStreamChannelId = fRTCPInterface.nextTCPReadStreamChannelId();
00327 unsigned packetSize = 0;
00328 unsigned numBytesRead;
00329 struct sockaddr_in fromAddress;
00330 Boolean packetReadWasIncomplete;
00331 Boolean readResult
00332 = fRTCPInterface.handleRead(&fInBuf[fNumBytesAlreadyRead], maxPacketSize - fNumBytesAlreadyRead,
00333 numBytesRead, fromAddress, packetReadWasIncomplete);
00334 if (packetReadWasIncomplete) {
00335 fNumBytesAlreadyRead += numBytesRead;
00336 return;
00337 } else {
00338 packetSize = fNumBytesAlreadyRead + numBytesRead;
00339 fNumBytesAlreadyRead = 0;
00340 }
00341 if (!readResult) break;
00342
00343
00344 Boolean packetWasFromOurHost = False;
00345 if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) {
00346 packetWasFromOurHost = True;
00347
00348
00349
00350
00351
00352 if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) {
00353
00354 fHaveJustSentPacket = False;
00355 break;
00356 }
00357 }
00358
00359 unsigned char* pkt = fInBuf;
00360 if (fIsSSMSource && !packetWasFromOurHost) {
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376 fRTCPInterface.sendPacket(pkt, packetSize);
00377 fHaveJustSentPacket = True;
00378 fLastPacketSentSize = packetSize;
00379 }
00380
00381 #ifdef DEBUG
00382 fprintf(stderr, "[%p]saw incoming RTCP packet (from address %s, port %d)\n", this, AddressString(fromAddress).val(), ntohs(fromAddress.sin_port));
00383 for (unsigned i = 0; i < packetSize; ++i) {
00384 if (i%4 == 0) fprintf(stderr, " ");
00385 fprintf(stderr, "%02x", pkt[i]);
00386 }
00387 fprintf(stderr, "\n");
00388 #endif
00389 int totPacketSize = IP_UDP_HDR_SIZE + packetSize;
00390
00391
00392
00393
00394
00395 if (packetSize < 4) break;
00396 unsigned rtcpHdr = ntohl(*(u_int32_t*)pkt);
00397 if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) {
00398 #ifdef DEBUG
00399 fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr);
00400 #endif
00401 break;
00402 }
00403
00404
00405
00406 int typeOfPacket = PACKET_UNKNOWN_TYPE;
00407 unsigned reportSenderSSRC = 0;
00408 Boolean packetOK = False;
00409 while (1) {
00410 unsigned rc = (rtcpHdr>>24)&0x1F;
00411 unsigned pt = (rtcpHdr>>16)&0xFF;
00412 unsigned length = 4*(rtcpHdr&0xFFFF);
00413 ADVANCE(4);
00414 if (length > packetSize) break;
00415
00416
00417 if (length < 4) break; length -= 4;
00418 reportSenderSSRC = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00419
00420 Boolean subPacketOK = False;
00421 switch (pt) {
00422 case RTCP_PT_SR: {
00423 #ifdef DEBUG
00424 fprintf(stderr, "SR\n");
00425 #endif
00426 if (length < 20) break; length -= 20;
00427
00428
00429 unsigned NTPmsw = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00430 unsigned NTPlsw = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00431 unsigned rtpTimestamp = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00432 if (fSource != NULL) {
00433 RTPReceptionStatsDB& receptionStats
00434 = fSource->receptionStatsDB();
00435 receptionStats.noteIncomingSR(reportSenderSSRC,
00436 NTPmsw, NTPlsw, rtpTimestamp);
00437 }
00438 ADVANCE(8);
00439
00440
00441 if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData);
00442
00443
00444 }
00445 case RTCP_PT_RR: {
00446 #ifdef DEBUG
00447 fprintf(stderr, "RR\n");
00448 #endif
00449 unsigned reportBlocksSize = rc*(6*4);
00450 if (length < reportBlocksSize) break;
00451 length -= reportBlocksSize;
00452
00453 if (fSink != NULL) {
00454
00455 RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB();
00456 for (unsigned i = 0; i < rc; ++i) {
00457 unsigned senderSSRC = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00458
00459 if (senderSSRC == fSink->SSRC()) {
00460 unsigned lossStats = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00461 unsigned highestReceived = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00462 unsigned jitter = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00463 unsigned timeLastSR = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00464 unsigned timeSinceLastSR = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00465 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress,
00466 lossStats,
00467 highestReceived, jitter,
00468 timeLastSR, timeSinceLastSR);
00469 } else {
00470 ADVANCE(4*5);
00471 }
00472 }
00473 } else {
00474 ADVANCE(reportBlocksSize);
00475 }
00476
00477 if (pt == RTCP_PT_RR) {
00478
00479
00480
00481 if (fSpecificRRHandlerTable != NULL) {
00482 netAddressBits fromAddr;
00483 portNumBits fromPortNum;
00484 if (tcpReadStreamSocketNum < 0) {
00485
00486 fromAddr = fromAddress.sin_addr.s_addr;
00487 fromPortNum = ntohs(fromAddress.sin_port);
00488 } else {
00489
00490
00491 fromAddr = tcpReadStreamSocketNum;
00492 fromPortNum = tcpReadStreamChannelId;
00493 }
00494 Port fromPort(fromPortNum);
00495 RRHandlerRecord* rrHandler
00496 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
00497 if (rrHandler != NULL) {
00498 if (rrHandler->rrHandlerTask != NULL) {
00499 (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
00500 }
00501 }
00502 }
00503
00504
00505 if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData);
00506 }
00507
00508 subPacketOK = True;
00509 typeOfPacket = PACKET_RTCP_REPORT;
00510 break;
00511 }
00512 case RTCP_PT_BYE: {
00513 #ifdef DEBUG
00514 fprintf(stderr, "BYE\n");
00515 #endif
00516
00517
00518 if (fByeHandlerTask != NULL
00519 && (!fByeHandleActiveParticipantsOnly
00520 || (fSource != NULL
00521 && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL)
00522 || (fSink != NULL
00523 && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) {
00524 callByeHandler = True;
00525 }
00526
00527
00528
00529 subPacketOK = True;
00530 typeOfPacket = PACKET_BYE;
00531 break;
00532 }
00533
00534 default:
00535 #ifdef DEBUG
00536 fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt);
00537 #endif
00538 subPacketOK = True;
00539 break;
00540 }
00541 if (!subPacketOK) break;
00542
00543
00544
00545 #ifdef DEBUG
00546 fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC);
00547 #endif
00548
00549
00550 ADVANCE(length);
00551
00552
00553 if (packetSize == 0) {
00554 packetOK = True;
00555 break;
00556 } else if (packetSize < 4) {
00557 #ifdef DEBUG
00558 fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize);
00559 #endif
00560 break;
00561 }
00562 rtcpHdr = ntohl(*(u_int32_t*)pkt);
00563 if ((rtcpHdr & 0xC0000000) != 0x80000000) {
00564 #ifdef DEBUG
00565 fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00566 #endif
00567 break;
00568 }
00569 }
00570
00571 if (!packetOK) {
00572 #ifdef DEBUG
00573 fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00574 #endif
00575 break;
00576 } else {
00577 #ifdef DEBUG
00578 fprintf(stderr, "validated entire RTCP packet\n");
00579 #endif
00580 }
00581
00582 onReceive(typeOfPacket, totPacketSize, reportSenderSSRC);
00583
00584
00585 if (callByeHandler && fByeHandlerTask != NULL) {
00586 TaskFunc* byeHandler = fByeHandlerTask;
00587 fByeHandlerTask = NULL;
00588 (*byeHandler)(fByeHandlerClientData);
00589 }
00590 } while (0);
00591 }
00592
00593 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize,
00594 unsigned ssrc) {
00595 fTypeOfPacket = typeOfPacket;
00596 fLastReceivedSize = totPacketSize;
00597 fLastReceivedSSRC = ssrc;
00598
00599 int members = (int)numMembers();
00600 int senders = (fSink != NULL) ? 1 : 0;
00601
00602 OnReceive(this,
00603 this,
00604 &members,
00605 &fPrevNumMembers,
00606 &senders,
00607 &fAveRTCPSize,
00608 &fPrevReportTime,
00609 dTimeNow(),
00610 fNextReportTime);
00611 }
00612
00613 void RTCPInstance::sendReport() {
00614
00615
00616
00617 if (fSink != NULL && fSink->nextTimestampHasBeenPreset()) return;
00618
00619 #ifdef DEBUG
00620 fprintf(stderr, "sending REPORT\n");
00621 #endif
00622
00623 addReport();
00624
00625
00626 addSDES();
00627
00628
00629 sendBuiltPacket();
00630
00631
00632 const unsigned membershipReapPeriod = 5;
00633 if ((++fOutgoingReportCount) % membershipReapPeriod == 0) {
00634 unsigned threshold = fOutgoingReportCount - membershipReapPeriod;
00635 fKnownMembers->reapOldMembers(threshold);
00636 }
00637 }
00638
00639 void RTCPInstance::sendBYE() {
00640 #ifdef DEBUG
00641 fprintf(stderr, "sending BYE\n");
00642 #endif
00643
00644 addReport();
00645
00646 addBYE();
00647 sendBuiltPacket();
00648 }
00649
00650 void RTCPInstance::sendBuiltPacket() {
00651 #ifdef DEBUG
00652 fprintf(stderr, "sending RTCP packet\n");
00653 unsigned char* p = fOutBuf->packet();
00654 for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) {
00655 if (i%4 == 0) fprintf(stderr," ");
00656 fprintf(stderr, "%02x", p[i]);
00657 }
00658 fprintf(stderr, "\n");
00659 #endif
00660 unsigned reportSize = fOutBuf->curPacketSize();
00661 fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize);
00662 fOutBuf->resetOffset();
00663
00664 fLastSentSize = IP_UDP_HDR_SIZE + reportSize;
00665 fHaveJustSentPacket = True;
00666 fLastPacketSentSize = reportSize;
00667 }
00668
00669 int RTCPInstance::checkNewSSRC() {
00670 return fKnownMembers->noteMembership(fLastReceivedSSRC,
00671 fOutgoingReportCount);
00672 }
00673
00674 void RTCPInstance::removeLastReceivedSSRC() {
00675 removeSSRC(fLastReceivedSSRC, False);
00676 }
00677
00678 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) {
00679 fKnownMembers->remove(ssrc);
00680
00681 if (alsoRemoveStats) {
00682
00683 if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc);
00684 if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc);
00685 }
00686 }
00687
00688 void RTCPInstance::onExpire(RTCPInstance* instance) {
00689 instance->onExpire1();
00690 }
00691
00692
00693
00694 void RTCPInstance::addReport() {
00695
00696
00697 if (fSink != NULL) {
00698 addSR();
00699 } else if (fSource != NULL) {
00700 addRR();
00701 }
00702 }
00703
00704 void RTCPInstance::addSR() {
00705
00706
00707 enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(),
00708 5 );
00709
00710
00711
00712
00713 struct timeval timeNow;
00714 gettimeofday(&timeNow, NULL);
00715 fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80);
00716
00717 double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000;
00718 fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5));
00719
00720 unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow);
00721 fOutBuf->enqueueWord(rtpTimestamp);
00722
00723
00724 fOutBuf->enqueueWord(fSink->packetCount());
00725 fOutBuf->enqueueWord(fSink->octetCount());
00726
00727 enqueueCommonReportSuffix();
00728 }
00729
00730 void RTCPInstance::addRR() {
00731
00732
00733 enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC());
00734 enqueueCommonReportSuffix();
00735 }
00736
00737 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType,
00738 unsigned SSRC,
00739 unsigned numExtraWords) {
00740 unsigned numReportingSources;
00741 if (fSource == NULL) {
00742 numReportingSources = 0;
00743 } else {
00744 RTPReceptionStatsDB& allReceptionStats
00745 = fSource->receptionStatsDB();
00746 numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset();
00747
00748 if (numReportingSources >= 32) { numReportingSources = 32; }
00749
00750 }
00751
00752 unsigned rtcpHdr = 0x80000000;
00753 rtcpHdr |= (numReportingSources<<24);
00754 rtcpHdr |= (packetType<<16);
00755 rtcpHdr |= (1 + numExtraWords + 6*numReportingSources);
00756
00757 fOutBuf->enqueueWord(rtcpHdr);
00758
00759 fOutBuf->enqueueWord(SSRC);
00760 }
00761
00762 void RTCPInstance::enqueueCommonReportSuffix() {
00763
00764 if (fSource != NULL) {
00765 RTPReceptionStatsDB& allReceptionStats
00766 = fSource->receptionStatsDB();
00767
00768 RTPReceptionStatsDB::Iterator iterator(allReceptionStats);
00769 while (1) {
00770 RTPReceptionStats* receptionStats = iterator.next();
00771 if (receptionStats == NULL) break;
00772 enqueueReportBlock(receptionStats);
00773 }
00774
00775 allReceptionStats.reset();
00776 }
00777 }
00778
00779 void
00780 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) {
00781 fOutBuf->enqueueWord(stats->SSRC());
00782
00783 unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived();
00784
00785 unsigned totNumExpected
00786 = highestExtSeqNumReceived - stats->baseExtSeqNumReceived();
00787 int totNumLost = totNumExpected - stats->totNumPacketsReceived();
00788
00789 if (totNumLost > 0x007FFFFF) {
00790 totNumLost = 0x007FFFFF;
00791 } else if (totNumLost < 0) {
00792 if (totNumLost < -0x00800000) totNumLost = 0x00800000;
00793 totNumLost &= 0x00FFFFFF;
00794 }
00795
00796 unsigned numExpectedSinceLastReset
00797 = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived();
00798 int numLostSinceLastReset
00799 = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset();
00800 unsigned char lossFraction;
00801 if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) {
00802 lossFraction = 0;
00803 } else {
00804 lossFraction = (unsigned char)
00805 ((numLostSinceLastReset << 8) / numExpectedSinceLastReset);
00806 }
00807
00808 fOutBuf->enqueueWord((lossFraction<<24) | totNumLost);
00809 fOutBuf->enqueueWord(highestExtSeqNumReceived);
00810
00811 fOutBuf->enqueueWord(stats->jitter());
00812
00813 unsigned NTPmsw = stats->lastReceivedSR_NTPmsw();
00814 unsigned NTPlsw = stats->lastReceivedSR_NTPlsw();
00815 unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16);
00816 fOutBuf->enqueueWord(LSR);
00817
00818
00819 struct timeval const& LSRtime = stats->lastReceivedSR_time();
00820 struct timeval timeNow, timeSinceLSR;
00821 gettimeofday(&timeNow, NULL);
00822 if (timeNow.tv_usec < LSRtime.tv_usec) {
00823 timeNow.tv_usec += 1000000;
00824 timeNow.tv_sec -= 1;
00825 }
00826 timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec;
00827 timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec;
00828
00829
00830 unsigned DLSR;
00831 if (LSR == 0) {
00832 DLSR = 0;
00833 } else {
00834 DLSR = (timeSinceLSR.tv_sec<<16)
00835 | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF);
00836 }
00837 fOutBuf->enqueueWord(DLSR);
00838 }
00839
00840 void RTCPInstance::addSDES() {
00841
00842
00843
00844 unsigned numBytes = 4;
00845
00846 numBytes += fCNAME.totalSize();
00847 numBytes += 1;
00848
00849 unsigned num4ByteWords = (numBytes + 3)/4;
00850
00851 unsigned rtcpHdr = 0x81000000;
00852 rtcpHdr |= (RTCP_PT_SDES<<16);
00853 rtcpHdr |= num4ByteWords;
00854 fOutBuf->enqueueWord(rtcpHdr);
00855
00856 if (fSource != NULL) {
00857 fOutBuf->enqueueWord(fSource->SSRC());
00858 } else if (fSink != NULL) {
00859 fOutBuf->enqueueWord(fSink->SSRC());
00860 }
00861
00862
00863 fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize());
00864
00865
00866 unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4);
00867 unsigned char const zero = '\0';
00868 while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1);
00869 }
00870
00871 void RTCPInstance::addBYE() {
00872 unsigned rtcpHdr = 0x81000000;
00873 rtcpHdr |= (RTCP_PT_BYE<<16);
00874 rtcpHdr |= 1;
00875 fOutBuf->enqueueWord(rtcpHdr);
00876
00877 if (fSource != NULL) {
00878 fOutBuf->enqueueWord(fSource->SSRC());
00879 } else if (fSink != NULL) {
00880 fOutBuf->enqueueWord(fSink->SSRC());
00881 }
00882 }
00883
00884 void RTCPInstance::schedule(double nextTime) {
00885 fNextReportTime = nextTime;
00886
00887 double secondsToDelay = nextTime - dTimeNow();
00888 if (secondsToDelay < 0) secondsToDelay = 0;
00889 #ifdef DEBUG
00890 fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime);
00891 #endif
00892 int64_t usToGo = (int64_t)(secondsToDelay * 1000000);
00893 nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo,
00894 (TaskFunc*)RTCPInstance::onExpire, this);
00895 }
00896
00897 void RTCPInstance::reschedule(double nextTime) {
00898 envir().taskScheduler().unscheduleDelayedTask(nextTask());
00899 schedule(nextTime);
00900 }
00901
00902 void RTCPInstance::onExpire1() {
00903
00904 double rtcpBW = 0.05*fTotSessionBW*1024/8;
00905
00906 OnExpire(this,
00907 numMembers(),
00908 (fSink != NULL) ? 1 : 0,
00909 rtcpBW,
00910 (fSink != NULL) ? 1 : 0,
00911 &fAveRTCPSize,
00912 &fIsInitial,
00913 dTimeNow(),
00914 &fPrevReportTime,
00915 &fPrevNumMembers
00916 );
00917 }
00918
00920
00921 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) {
00922 unsigned length = strlen((char const*)value);
00923 if (length > 0xFF) length = 0xFF;
00924
00925 fData[0] = tag;
00926 fData[1] = (unsigned char)length;
00927 memmove(&fData[2], value, length);
00928 }
00929
00930 unsigned SDESItem::totalSize() const {
00931 return 2 + (unsigned)fData[1];
00932 }
00933
00934
00936
00937 extern "C" void Schedule(double nextTime, event e) {
00938 RTCPInstance* instance = (RTCPInstance*)e;
00939 if (instance == NULL) return;
00940
00941 instance->schedule(nextTime);
00942 }
00943
00944 extern "C" void Reschedule(double nextTime, event e) {
00945 RTCPInstance* instance = (RTCPInstance*)e;
00946 if (instance == NULL) return;
00947
00948 instance->reschedule(nextTime);
00949 }
00950
00951 extern "C" void SendRTCPReport(event e) {
00952 RTCPInstance* instance = (RTCPInstance*)e;
00953 if (instance == NULL) return;
00954
00955 instance->sendReport();
00956 }
00957
00958 extern "C" void SendBYEPacket(event e) {
00959 RTCPInstance* instance = (RTCPInstance*)e;
00960 if (instance == NULL) return;
00961
00962 instance->sendBYE();
00963 }
00964
00965 extern "C" int TypeOfEvent(event e) {
00966 RTCPInstance* instance = (RTCPInstance*)e;
00967 if (instance == NULL) return EVENT_UNKNOWN;
00968
00969 return instance->typeOfEvent();
00970 }
00971
00972 extern "C" int SentPacketSize(event e) {
00973 RTCPInstance* instance = (RTCPInstance*)e;
00974 if (instance == NULL) return 0;
00975
00976 return instance->sentPacketSize();
00977 }
00978
00979 extern "C" int PacketType(packet p) {
00980 RTCPInstance* instance = (RTCPInstance*)p;
00981 if (instance == NULL) return PACKET_UNKNOWN_TYPE;
00982
00983 return instance->packetType();
00984 }
00985
00986 extern "C" int ReceivedPacketSize(packet p) {
00987 RTCPInstance* instance = (RTCPInstance*)p;
00988 if (instance == NULL) return 0;
00989
00990 return instance->receivedPacketSize();
00991 }
00992
00993 extern "C" int NewMember(packet p) {
00994 RTCPInstance* instance = (RTCPInstance*)p;
00995 if (instance == NULL) return 0;
00996
00997 return instance->checkNewSSRC();
00998 }
00999
01000 extern "C" int NewSender(packet ) {
01001 return 0;
01002 }
01003
01004 extern "C" void AddMember(packet ) {
01005
01006 }
01007
01008 extern "C" void AddSender(packet ) {
01009
01010 }
01011
01012 extern "C" void RemoveMember(packet p) {
01013 RTCPInstance* instance = (RTCPInstance*)p;
01014 if (instance == NULL) return;
01015
01016 instance->removeLastReceivedSSRC();
01017 }
01018
01019 extern "C" void RemoveSender(packet ) {
01020
01021 }
01022
01023 extern "C" double drand30() {
01024 unsigned tmp = our_random()&0x3FFFFFFF;
01025 return tmp/(double)(1024*1024*1024);
01026 }