liveMedia/RTCP.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2012 Live Networks, Inc.  All rights reserved.
00018 // RTCP
00019 // Implementation
00020 
00021 #include "RTCP.hh"
00022 #include "GroupsockHelper.hh"
00023 #include "rtcp_from_spec.h"
00024 
00026 
00027 class RTCPMemberDatabase {
00028 public:
00029   RTCPMemberDatabase(RTCPInstance& ourRTCPInstance)
00030     : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 /*ourself*/),
00031       fTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00032   }
00033 
00034   virtual ~RTCPMemberDatabase() {
00035         delete fTable;
00036   }
00037 
00038   Boolean isMember(unsigned ssrc) const {
00039     return fTable->Lookup((char*)(long)ssrc) != NULL;
00040   }
00041 
00042   Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) {
00043     Boolean isNew = !isMember(ssrc);
00044 
00045     if (isNew) {
00046       ++fNumMembers;
00047     }
00048 
00049     // Record the current time, so we can age stale members
00050     fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount);
00051 
00052     return isNew;
00053   }
00054 
00055   Boolean remove(unsigned ssrc) {
00056     Boolean wasPresent = fTable->Remove((char*)(long)ssrc);
00057     if (wasPresent) {
00058       --fNumMembers;
00059     }
00060     return wasPresent;
00061   }
00062 
00063   unsigned numMembers() const {
00064     return fNumMembers;
00065   }
00066 
00067   void reapOldMembers(unsigned threshold);
00068 
00069 private:
00070   RTCPInstance& fOurRTCPInstance;
00071   unsigned fNumMembers;
00072   HashTable* fTable;
00073 };
00074 
00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) {
00076   Boolean foundOldMember;
00077   u_int32_t oldSSRC = 0;
00078 
00079   do {
00080     foundOldMember = False;
00081 
00082     HashTable::Iterator* iter
00083       = HashTable::Iterator::create(*fTable);
00084     uintptr_t timeCount;
00085     char const* key;
00086     while ((timeCount = (uintptr_t)(iter->next(key))) != 0) {
00087 #ifdef DEBUG
00088       fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold);
00089 #endif
00090       if (timeCount < (uintptr_t)threshold) { // this SSRC is old
00091         uintptr_t ssrc = (uintptr_t)key;
00092         oldSSRC = (u_int32_t)ssrc;
00093         foundOldMember = True;
00094       }
00095     }
00096     delete iter;
00097 
00098     if (foundOldMember) {
00099 #ifdef DEBUG
00100         fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC);
00101 #endif
00102       fOurRTCPInstance.removeSSRC(oldSSRC, True);
00103     }
00104   } while (foundOldMember);
00105 }
00106 
00107 
00109 
00110 static double dTimeNow() {
00111     struct timeval timeNow;
00112     gettimeofday(&timeNow, NULL);
00113     return (double) (timeNow.tv_sec + timeNow.tv_usec/1000000.0);
00114 }
00115 
00116 static unsigned const maxPacketSize = 1450;
00117         // bytes (1500, minus some allowance for IP, UDP, UMTP headers)
00118 static unsigned const preferredPacketSize = 1000; // bytes
00119 
00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
00121                            unsigned totSessionBW,
00122                            unsigned char const* cname,
00123                            RTPSink* sink, RTPSource const* source,
00124                            Boolean isSSMSource)
00125   : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
00126     fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
00127     fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
00128     fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
00129     fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
00130     fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
00131     fHaveJustSentPacket(False), fLastPacketSentSize(0),
00132     fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
00133     fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
00134     fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
00135     fSpecificRRHandlerTable(NULL) {
00136 #ifdef DEBUG
00137   fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
00138 #endif
00139   if (fTotSessionBW == 0) { // not allowed!
00140     env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";
00141     fTotSessionBW = 1;
00142   }
00143 
00144   if (isSSMSource) RTCPgs->multicastSendOnly(); // don't receive multicast
00145 
00146   double timeNow = dTimeNow();
00147   fPrevReportTime = fNextReportTime = timeNow;
00148 
00149   fKnownMembers = new RTCPMemberDatabase(*this);
00150   fInBuf = new unsigned char[maxPacketSize];
00151   if (fKnownMembers == NULL || fInBuf == NULL) return;
00152   fNumBytesAlreadyRead = 0;
00153 
00154   // A hack to save buffer space, because RTCP packets are always small:
00155   unsigned savedMaxSize = OutPacketBuffer::maxSize;
00156   OutPacketBuffer::maxSize = maxPacketSize;
00157   fOutBuf = new OutPacketBuffer(preferredPacketSize, maxPacketSize);
00158   OutPacketBuffer::maxSize = savedMaxSize;
00159   if (fOutBuf == NULL) return;
00160 
00161   // Arrange to handle incoming reports from others:
00162   TaskScheduler::BackgroundHandlerProc* handler
00163     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00164   fRTCPInterface.startNetworkReading(handler);
00165 
00166   // Send our first report.
00167   fTypeOfEvent = EVENT_REPORT;
00168   onExpire(this);
00169 }
00170 
00171 struct RRHandlerRecord {
00172   TaskFunc* rrHandlerTask;
00173   void* rrHandlerClientData;
00174 };
00175 
00176 RTCPInstance::~RTCPInstance() {
00177 #ifdef DEBUG
00178   fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this);
00179 #endif
00180   // Turn off background read handling:
00181   fRTCPInterface.stopNetworkReading();
00182 
00183   // Begin by sending a BYE.  We have to do this immediately, without
00184   // 'reconsideration', because "this" is going away.
00185   fTypeOfEvent = EVENT_BYE; // not used, but...
00186   sendBYE();
00187 
00188   if (fSpecificRRHandlerTable != NULL) {
00189     AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable);
00190     RRHandlerRecord* rrHandler;
00191     while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) {
00192       delete rrHandler;
00193     }
00194     delete fSpecificRRHandlerTable;
00195   }
00196 
00197   delete fKnownMembers;
00198   delete fOutBuf;
00199   delete[] fInBuf;
00200 }
00201 
00202 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
00203                                       unsigned totSessionBW,
00204                                       unsigned char const* cname,
00205                                       RTPSink* sink, RTPSource const* source,
00206                                       Boolean isSSMSource) {
00207   return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
00208                           isSSMSource);
00209 }
00210 
00211 Boolean RTCPInstance::lookupByName(UsageEnvironment& env,
00212                                    char const* instanceName,
00213                                    RTCPInstance*& resultInstance) {
00214   resultInstance = NULL; // unless we succeed
00215 
00216   Medium* medium;
00217   if (!Medium::lookupByName(env, instanceName, medium)) return False;
00218 
00219   if (!medium->isRTCPInstance()) {
00220     env.setResultMsg(instanceName, " is not a RTCP instance");
00221     return False;
00222   }
00223 
00224   resultInstance = (RTCPInstance*)medium;
00225   return True;
00226 }
00227 
00228 Boolean RTCPInstance::isRTCPInstance() const {
00229   return True;
00230 }
00231 
00232 unsigned RTCPInstance::numMembers() const {
00233   if (fKnownMembers == NULL) return 0;
00234 
00235   return fKnownMembers->numMembers();
00236 }
00237 
00238 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData,
00239                                  Boolean handleActiveParticipantsOnly) {
00240   fByeHandlerTask = handlerTask;
00241   fByeHandlerClientData = clientData;
00242   fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly;
00243 }
00244 
00245 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) {
00246   fSRHandlerTask = handlerTask;
00247   fSRHandlerClientData = clientData;
00248 }
00249 
00250 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) {
00251   fRRHandlerTask = handlerTask;
00252   fRRHandlerClientData = clientData;
00253 }
00254 
00255 void RTCPInstance
00256 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
00257                        TaskFunc* handlerTask, void* clientData) {
00258   if (handlerTask == NULL && clientData == NULL) {
00259     unsetSpecificRRHandler(fromAddress, fromPort);
00260     return;
00261   }
00262 
00263   RRHandlerRecord* rrHandler = new RRHandlerRecord;
00264   rrHandler->rrHandlerTask = handlerTask;
00265   rrHandler->rrHandlerClientData = clientData;
00266   if (fSpecificRRHandlerTable == NULL) {
00267     fSpecificRRHandlerTable = new AddressPortLookupTable;
00268   }
00269   fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler);
00270 }
00271 
00272 void RTCPInstance
00273 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) {
00274   if (fSpecificRRHandlerTable == NULL) return;
00275 
00276   RRHandlerRecord* rrHandler
00277     = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort));
00278   if (rrHandler != NULL) {
00279     fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort);
00280     delete rrHandler;
00281   }
00282 }
00283 
00284 void RTCPInstance::setStreamSocket(int sockNum,
00285                                    unsigned char streamChannelId) {
00286   // Turn off background read handling:
00287   fRTCPInterface.stopNetworkReading();
00288 
00289   // Switch to RTCP-over-TCP:
00290   fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00291 
00292   // Turn background reading back on:
00293   TaskScheduler::BackgroundHandlerProc* handler
00294     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00295   fRTCPInterface.startNetworkReading(handler);
00296 }
00297 
00298 void RTCPInstance::addStreamSocket(int sockNum,
00299                                    unsigned char streamChannelId) {
00300   // First, turn off background read handling for the default (UDP) socket:
00301   envir().taskScheduler().turnOffBackgroundReadHandling(fRTCPInterface.gs()->socketNum());
00302 
00303   // Add the RTCP-over-TCP interface:
00304   fRTCPInterface.addStreamSocket(sockNum, streamChannelId);
00305 
00306   // Turn on background reading for this socket (in case it's not on already):
00307   TaskScheduler::BackgroundHandlerProc* handler
00308     = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00309   fRTCPInterface.startNetworkReading(handler);
00310 }
00311 
00312 static unsigned const IP_UDP_HDR_SIZE = 28;
00313     // overhead (bytes) of IP and UDP hdrs
00314 
00315 #define ADVANCE(n) pkt += (n); packetSize -= (n)
00316 
00317 void RTCPInstance::incomingReportHandler(RTCPInstance* instance,
00318                                          int /*mask*/) {
00319   instance->incomingReportHandler1();
00320 }
00321 
00322 void RTCPInstance::incomingReportHandler1() {
00323   do {
00324     Boolean callByeHandler = False;
00325     int tcpReadStreamSocketNum = fRTCPInterface.nextTCPReadStreamSocketNum();
00326     unsigned char tcpReadStreamChannelId = fRTCPInterface.nextTCPReadStreamChannelId();
00327     unsigned packetSize = 0;
00328     unsigned numBytesRead;
00329     struct sockaddr_in fromAddress;
00330     Boolean packetReadWasIncomplete;
00331     Boolean readResult
00332       = fRTCPInterface.handleRead(&fInBuf[fNumBytesAlreadyRead], maxPacketSize - fNumBytesAlreadyRead,
00333                                   numBytesRead, fromAddress, packetReadWasIncomplete);
00334     if (packetReadWasIncomplete) {
00335       fNumBytesAlreadyRead += numBytesRead;
00336       return; // more reads are needed to get the entire packet
00337     } else { // normal case: We've read the entire packet 
00338       packetSize = fNumBytesAlreadyRead + numBytesRead;
00339       fNumBytesAlreadyRead = 0; // for next time
00340     }
00341     if (!readResult) break;
00342 
00343     // Ignore the packet if it was looped-back from ourself:
00344     Boolean packetWasFromOurHost = False;
00345     if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) {
00346       packetWasFromOurHost = True;
00347       // However, we still want to handle incoming RTCP packets from
00348       // *other processes* on the same machine.  To distinguish this
00349       // case from a true loop-back, check whether we've just sent a
00350       // packet of the same size.  (This check isn't perfect, but it seems
00351       // to be the best we can do.)
00352       if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) {
00353         // This is a true loop-back:
00354         fHaveJustSentPacket = False;
00355         break; // ignore this packet
00356       }
00357     }
00358 
00359     unsigned char* pkt = fInBuf;
00360     if (fIsSSMSource && !packetWasFromOurHost) {
00361       // This packet is assumed to have been received via unicast (because we're a SSM source, and SSM receivers send back RTCP "RR"
00362       // packets via unicast).  'Reflect' the packet by resending it to the multicast group, so that any other receivers can also
00363       // get to see it.
00364 
00365       // NOTE: Denial-of-service attacks are possible here.
00366       // Users of this software may wish to add their own,
00367       // application-specific mechanism for 'authenticating' the
00368       // validity of this packet before reflecting it.
00369 
00370       // NOTE: The test for "!packetWasFromOurHost" means that we won't reflect RTCP packets that come from other processes on
00371       // the same host as us.  The reason for this is that the 'packet size' test above is not 100% reliable; some packets
00372       // that were truly looped back from us might not be detected as such, and this might lead to infinite forwarding/receiving
00373       // of some packets.  To avoid this possibility, we only reflect RTCP packets that we know for sure originated elsewhere.
00374       // (Note, though, that if we ever re-enable the code in "Groupsock::multicastSendOnly()", then we could remove the test for
00375       // "!packetWasFromOurHost".)
00376       fRTCPInterface.sendPacket(pkt, packetSize);
00377       fHaveJustSentPacket = True;
00378       fLastPacketSentSize = packetSize;
00379     }
00380 
00381 #ifdef DEBUG
00382     fprintf(stderr, "[%p]saw incoming RTCP packet (from address %s, port %d)\n", this, AddressString(fromAddress).val(), ntohs(fromAddress.sin_port));
00383     for (unsigned i = 0; i < packetSize; ++i) {
00384       if (i%4 == 0) fprintf(stderr, " ");
00385       fprintf(stderr, "%02x", pkt[i]);
00386     }
00387     fprintf(stderr, "\n");
00388 #endif
00389     int totPacketSize = IP_UDP_HDR_SIZE + packetSize;
00390 
00391     // Check the RTCP packet for validity:
00392     // It must at least contain a header (4 bytes), and this header
00393     // must be version=2, with no padding bit, and a payload type of
00394     // SR (200) or RR (201):
00395     if (packetSize < 4) break;
00396     unsigned rtcpHdr = ntohl(*(u_int32_t*)pkt);
00397     if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) {
00398 #ifdef DEBUG
00399       fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr);
00400 #endif
00401       break;
00402     }
00403 
00404     // Process each of the individual RTCP 'subpackets' in (what may be)
00405     // a compound RTCP packet.
00406     int typeOfPacket = PACKET_UNKNOWN_TYPE;
00407     unsigned reportSenderSSRC = 0;
00408     Boolean packetOK = False;
00409     while (1) {
00410       unsigned rc = (rtcpHdr>>24)&0x1F;
00411       unsigned pt = (rtcpHdr>>16)&0xFF;
00412       unsigned length = 4*(rtcpHdr&0xFFFF); // doesn't count hdr
00413       ADVANCE(4); // skip over the header
00414       if (length > packetSize) break;
00415 
00416       // Assume that each RTCP subpacket begins with a 4-byte SSRC:
00417       if (length < 4) break; length -= 4;
00418       reportSenderSSRC = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00419 
00420       Boolean subPacketOK = False;
00421       switch (pt) {
00422         case RTCP_PT_SR: {
00423 #ifdef DEBUG
00424           fprintf(stderr, "SR\n");
00425 #endif
00426           if (length < 20) break; length -= 20;
00427 
00428           // Extract the NTP timestamp, and note this:
00429           unsigned NTPmsw = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00430           unsigned NTPlsw = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00431           unsigned rtpTimestamp = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00432           if (fSource != NULL) {
00433             RTPReceptionStatsDB& receptionStats
00434               = fSource->receptionStatsDB();
00435             receptionStats.noteIncomingSR(reportSenderSSRC,
00436                                           NTPmsw, NTPlsw, rtpTimestamp);
00437           }
00438           ADVANCE(8); // skip over packet count, octet count
00439 
00440           // If a 'SR handler' was set, call it now:
00441           if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData);
00442 
00443           // The rest of the SR is handled like a RR (so, no "break;" here)
00444         }
00445         case RTCP_PT_RR: {
00446 #ifdef DEBUG
00447           fprintf(stderr, "RR\n");
00448 #endif
00449           unsigned reportBlocksSize = rc*(6*4);
00450           if (length < reportBlocksSize) break;
00451           length -= reportBlocksSize;
00452 
00453           if (fSink != NULL) {
00454             // Use this information to update stats about our transmissions:
00455             RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB();
00456             for (unsigned i = 0; i < rc; ++i) {
00457               unsigned senderSSRC = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00458               // We care only about reports about our own transmission, not others'
00459               if (senderSSRC == fSink->SSRC()) {
00460                 unsigned lossStats = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00461                 unsigned highestReceived = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00462                 unsigned jitter = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00463                 unsigned timeLastSR = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00464                 unsigned timeSinceLastSR = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00465                 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress,
00466                                                  lossStats,
00467                                                  highestReceived, jitter,
00468                                                  timeLastSR, timeSinceLastSR);
00469               } else {
00470                 ADVANCE(4*5);
00471               }
00472             }
00473           } else {
00474             ADVANCE(reportBlocksSize);
00475           }
00476 
00477           if (pt == RTCP_PT_RR) { // i.e., we didn't fall through from 'SR'
00478             // If a 'RR handler' was set, call it now:
00479 
00480             // Specific RR handler:
00481             if (fSpecificRRHandlerTable != NULL) {
00482               netAddressBits fromAddr;
00483               portNumBits fromPortNum;
00484               if (tcpReadStreamSocketNum < 0) {
00485                 // Normal case: We read the RTCP packet over UDP
00486                 fromAddr = fromAddress.sin_addr.s_addr;
00487                 fromPortNum = ntohs(fromAddress.sin_port);
00488               } else {
00489                 // Special case: We read the RTCP packet over TCP (interleaved)
00490                 // Hack: Use the TCP socket and channel id to look up the handler
00491                 fromAddr = tcpReadStreamSocketNum;
00492                 fromPortNum = tcpReadStreamChannelId;
00493               }
00494               Port fromPort(fromPortNum);
00495               RRHandlerRecord* rrHandler
00496                 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
00497               if (rrHandler != NULL) {
00498                 if (rrHandler->rrHandlerTask != NULL) {
00499                   (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
00500                 }
00501               }
00502             }
00503 
00504             // General RR handler:
00505             if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData);
00506           }
00507 
00508           subPacketOK = True;
00509           typeOfPacket = PACKET_RTCP_REPORT;
00510           break;
00511         }
00512         case RTCP_PT_BYE: {
00513 #ifdef DEBUG
00514           fprintf(stderr, "BYE\n");
00515 #endif
00516           // If a 'BYE handler' was set, arrange for it to be called at the end of this routine.
00517           // (Note: We don't call it immediately, in case it happens to cause "this" to be deleted.)
00518           if (fByeHandlerTask != NULL
00519               && (!fByeHandleActiveParticipantsOnly
00520                   || (fSource != NULL
00521                       && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL)
00522                   || (fSink != NULL
00523                       && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) {
00524             callByeHandler = True;
00525           }
00526 
00527           // We should really check for & handle >1 SSRCs being present #####
00528 
00529           subPacketOK = True;
00530           typeOfPacket = PACKET_BYE;
00531           break;
00532         }
00533         // Later handle SDES, APP, and compound RTCP packets #####
00534         default:
00535 #ifdef DEBUG
00536           fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt);
00537 #endif
00538           subPacketOK = True;
00539           break;
00540       }
00541       if (!subPacketOK) break;
00542 
00543       // need to check for (& handle) SSRC collision! #####
00544 
00545 #ifdef DEBUG
00546       fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC);
00547 #endif
00548 
00549       // Skip over any remaining bytes in this subpacket:
00550       ADVANCE(length);
00551 
00552       // Check whether another RTCP 'subpacket' follows:
00553       if (packetSize == 0) {
00554         packetOK = True;
00555         break;
00556       } else if (packetSize < 4) {
00557 #ifdef DEBUG
00558         fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize);
00559 #endif
00560         break;
00561       }
00562       rtcpHdr = ntohl(*(u_int32_t*)pkt);
00563       if ((rtcpHdr & 0xC0000000) != 0x80000000) {
00564 #ifdef DEBUG
00565         fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00566 #endif
00567         break;
00568       }
00569     }
00570 
00571     if (!packetOK) {
00572 #ifdef DEBUG
00573       fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00574 #endif
00575       break;
00576     } else {
00577 #ifdef DEBUG
00578       fprintf(stderr, "validated entire RTCP packet\n");
00579 #endif
00580     }
00581 
00582     onReceive(typeOfPacket, totPacketSize, reportSenderSSRC);
00583 
00584     // Finally, if we need to call a "BYE" handler, do so now (in case it causes "this" to get deleted):
00585     if (callByeHandler && fByeHandlerTask != NULL/*sanity check*/) {
00586       TaskFunc* byeHandler = fByeHandlerTask;
00587       fByeHandlerTask = NULL; // because we call the handler only once, by default
00588       (*byeHandler)(fByeHandlerClientData);
00589     }
00590   } while (0);
00591 }
00592 
00593 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize,
00594                              unsigned ssrc) {
00595   fTypeOfPacket = typeOfPacket;
00596   fLastReceivedSize = totPacketSize;
00597   fLastReceivedSSRC = ssrc;
00598 
00599   int members = (int)numMembers();
00600   int senders = (fSink != NULL) ? 1 : 0;
00601 
00602   OnReceive(this, // p
00603             this, // e
00604             &members, // members
00605             &fPrevNumMembers, // pmembers
00606             &senders, // senders
00607             &fAveRTCPSize, // avg_rtcp_size
00608             &fPrevReportTime, // tp
00609             dTimeNow(), // tc
00610             fNextReportTime);
00611 }
00612 
00613 void RTCPInstance::sendReport() {
00614   // Hack: Don't send a SR during those (brief) times when the timestamp of the
00615   // next outgoing RTP packet has been preset, to ensure that that timestamp gets
00616   // used for that outgoing packet. (David Bertrand, 2006.07.18)
00617   if (fSink != NULL && fSink->nextTimestampHasBeenPreset()) return;
00618 
00619 #ifdef DEBUG
00620   fprintf(stderr, "sending REPORT\n");
00621 #endif
00622   // Begin by including a SR and/or RR report:
00623   addReport();
00624 
00625   // Then, include a SDES:
00626   addSDES();
00627 
00628   // Send the report:
00629   sendBuiltPacket();
00630 
00631   // Periodically clean out old members from our SSRC membership database:
00632   const unsigned membershipReapPeriod = 5;
00633   if ((++fOutgoingReportCount) % membershipReapPeriod == 0) {
00634     unsigned threshold = fOutgoingReportCount - membershipReapPeriod;
00635     fKnownMembers->reapOldMembers(threshold);
00636   }
00637 }
00638 
00639 void RTCPInstance::sendBYE() {
00640 #ifdef DEBUG
00641   fprintf(stderr, "sending BYE\n");
00642 #endif
00643   // The packet must begin with a SR and/or RR report:
00644   addReport();
00645 
00646   addBYE();
00647   sendBuiltPacket();
00648 }
00649 
00650 void RTCPInstance::sendBuiltPacket() {
00651 #ifdef DEBUG
00652   fprintf(stderr, "sending RTCP packet\n");
00653   unsigned char* p = fOutBuf->packet();
00654   for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) {
00655     if (i%4 == 0) fprintf(stderr," ");
00656     fprintf(stderr, "%02x", p[i]);
00657   }
00658   fprintf(stderr, "\n");
00659 #endif
00660   unsigned reportSize = fOutBuf->curPacketSize();
00661   fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize);
00662   fOutBuf->resetOffset();
00663 
00664   fLastSentSize = IP_UDP_HDR_SIZE + reportSize;
00665   fHaveJustSentPacket = True;
00666   fLastPacketSentSize = reportSize;
00667 }
00668 
00669 int RTCPInstance::checkNewSSRC() {
00670   return fKnownMembers->noteMembership(fLastReceivedSSRC,
00671                                        fOutgoingReportCount);
00672 }
00673 
00674 void RTCPInstance::removeLastReceivedSSRC() {
00675   removeSSRC(fLastReceivedSSRC, False/*keep stats around*/);
00676 }
00677 
00678 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) {
00679   fKnownMembers->remove(ssrc);
00680 
00681   if (alsoRemoveStats) {
00682     // Also, remove records of this SSRC from any reception or transmission stats
00683     if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc);
00684     if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc);
00685   }
00686 }
00687 
00688 void RTCPInstance::onExpire(RTCPInstance* instance) {
00689   instance->onExpire1();
00690 }
00691 
00692 // Member functions to build specific kinds of report:
00693 
00694 void RTCPInstance::addReport() {
00695   // Include a SR or a RR, depending on whether we
00696   // have an associated sink or source:
00697   if (fSink != NULL) {
00698     addSR();
00699   } else if (fSource != NULL) {
00700     addRR();
00701   }
00702 }
00703 
00704 void RTCPInstance::addSR() {
00705   // ASSERT: fSink != NULL
00706 
00707   enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(),
00708                             5 /* extra words in a SR */);
00709 
00710   // Now, add the 'sender info' for our sink
00711 
00712   // Insert the NTP and RTP timestamps for the 'wallclock time':
00713   struct timeval timeNow;
00714   gettimeofday(&timeNow, NULL);
00715   fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80);
00716       // NTP timestamp most-significant word (1970 epoch -> 1900 epoch)
00717   double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000; // 2^32/10^6
00718   fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5));
00719       // NTP timestamp least-significant word
00720   unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow);
00721   fOutBuf->enqueueWord(rtpTimestamp); // RTP ts
00722 
00723   // Insert the packet and byte counts:
00724   fOutBuf->enqueueWord(fSink->packetCount());
00725   fOutBuf->enqueueWord(fSink->octetCount());
00726 
00727   enqueueCommonReportSuffix();
00728 }
00729 
00730 void RTCPInstance::addRR() {
00731   // ASSERT: fSource != NULL
00732 
00733   enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC());
00734   enqueueCommonReportSuffix();
00735 }
00736 
00737 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType,
00738                                              unsigned SSRC,
00739                                              unsigned numExtraWords) {
00740   unsigned numReportingSources;
00741   if (fSource == NULL) {
00742     numReportingSources = 0; // we don't receive anything
00743   } else {
00744     RTPReceptionStatsDB& allReceptionStats
00745       = fSource->receptionStatsDB();
00746     numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset();
00747     // This must be <32, to fit in 5 bits:
00748     if (numReportingSources >= 32) { numReportingSources = 32; }
00749     // Later: support adding more reports to handle >32 sources (unlikely)#####
00750   }
00751 
00752   unsigned rtcpHdr = 0x80000000; // version 2, no padding
00753   rtcpHdr |= (numReportingSources<<24);
00754   rtcpHdr |= (packetType<<16);
00755   rtcpHdr |= (1 + numExtraWords + 6*numReportingSources);
00756       // each report block is 6 32-bit words long
00757   fOutBuf->enqueueWord(rtcpHdr);
00758 
00759   fOutBuf->enqueueWord(SSRC);
00760 }
00761 
00762 void RTCPInstance::enqueueCommonReportSuffix() {
00763   // Output the report blocks for each source:
00764   if (fSource != NULL) {
00765     RTPReceptionStatsDB& allReceptionStats
00766       = fSource->receptionStatsDB();
00767 
00768     RTPReceptionStatsDB::Iterator iterator(allReceptionStats);
00769     while (1) {
00770       RTPReceptionStats* receptionStats = iterator.next();
00771       if (receptionStats == NULL) break;
00772       enqueueReportBlock(receptionStats);
00773     }
00774 
00775     allReceptionStats.reset(); // because we have just generated a report
00776   }
00777 }
00778 
00779 void
00780 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) {
00781   fOutBuf->enqueueWord(stats->SSRC());
00782 
00783   unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived();
00784 
00785   unsigned totNumExpected
00786     = highestExtSeqNumReceived - stats->baseExtSeqNumReceived();
00787   int totNumLost = totNumExpected - stats->totNumPacketsReceived();
00788   // 'Clamp' this loss number to a 24-bit signed value:
00789   if (totNumLost > 0x007FFFFF) {
00790     totNumLost = 0x007FFFFF;
00791   } else if (totNumLost < 0) {
00792     if (totNumLost < -0x00800000) totNumLost = 0x00800000; // unlikely, but...
00793     totNumLost &= 0x00FFFFFF;
00794   }
00795 
00796   unsigned numExpectedSinceLastReset
00797     = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived();
00798   int numLostSinceLastReset
00799     = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset();
00800   unsigned char lossFraction;
00801   if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) {
00802     lossFraction = 0;
00803   } else {
00804     lossFraction = (unsigned char)
00805       ((numLostSinceLastReset << 8) / numExpectedSinceLastReset);
00806   }
00807 
00808   fOutBuf->enqueueWord((lossFraction<<24) | totNumLost);
00809   fOutBuf->enqueueWord(highestExtSeqNumReceived);
00810 
00811   fOutBuf->enqueueWord(stats->jitter());
00812 
00813   unsigned NTPmsw = stats->lastReceivedSR_NTPmsw();
00814   unsigned NTPlsw = stats->lastReceivedSR_NTPlsw();
00815   unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16); // middle 32 bits
00816   fOutBuf->enqueueWord(LSR);
00817 
00818   // Figure out how long has elapsed since the last SR rcvd from this src:
00819   struct timeval const& LSRtime = stats->lastReceivedSR_time(); // "last SR"
00820   struct timeval timeNow, timeSinceLSR;
00821   gettimeofday(&timeNow, NULL);
00822   if (timeNow.tv_usec < LSRtime.tv_usec) {
00823     timeNow.tv_usec += 1000000;
00824     timeNow.tv_sec -= 1;
00825   }
00826   timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec;
00827   timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec;
00828   // The enqueued time is in units of 1/65536 seconds.
00829   // (Note that 65536/1000000 == 1024/15625)
00830   unsigned DLSR;
00831   if (LSR == 0) {
00832     DLSR = 0;
00833   } else {
00834     DLSR = (timeSinceLSR.tv_sec<<16)
00835          | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF);
00836   }
00837   fOutBuf->enqueueWord(DLSR);
00838 }
00839 
00840 void RTCPInstance::addSDES() {
00841   // For now we support only the CNAME item; later support more #####
00842 
00843   // Begin by figuring out the size of the entire SDES report:
00844   unsigned numBytes = 4;
00845       // counts the SSRC, but not the header; it'll get subtracted out
00846   numBytes += fCNAME.totalSize(); // includes id and length
00847   numBytes += 1; // the special END item
00848 
00849   unsigned num4ByteWords = (numBytes + 3)/4;
00850 
00851   unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC chunk
00852   rtcpHdr |= (RTCP_PT_SDES<<16);
00853   rtcpHdr |= num4ByteWords;
00854   fOutBuf->enqueueWord(rtcpHdr);
00855 
00856   if (fSource != NULL) {
00857     fOutBuf->enqueueWord(fSource->SSRC());
00858   } else if (fSink != NULL) {
00859     fOutBuf->enqueueWord(fSink->SSRC());
00860   }
00861 
00862   // Add the CNAME:
00863   fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize());
00864 
00865   // Add the 'END' item (i.e., a zero byte), plus any more needed to pad:
00866   unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4);
00867   unsigned char const zero = '\0';
00868   while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1);
00869 }
00870 
00871 void RTCPInstance::addBYE() {
00872   unsigned rtcpHdr = 0x81000000; // version 2, no padding, 1 SSRC
00873   rtcpHdr |= (RTCP_PT_BYE<<16);
00874   rtcpHdr |= 1; // 2 32-bit words total (i.e., with 1 SSRC)
00875   fOutBuf->enqueueWord(rtcpHdr);
00876 
00877   if (fSource != NULL) {
00878     fOutBuf->enqueueWord(fSource->SSRC());
00879   } else if (fSink != NULL) {
00880     fOutBuf->enqueueWord(fSink->SSRC());
00881   }
00882 }
00883 
00884 void RTCPInstance::schedule(double nextTime) {
00885   fNextReportTime = nextTime;
00886 
00887   double secondsToDelay = nextTime - dTimeNow();
00888   if (secondsToDelay < 0) secondsToDelay = 0;
00889 #ifdef DEBUG
00890   fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime);
00891 #endif
00892   int64_t usToGo = (int64_t)(secondsToDelay * 1000000);
00893   nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo,
00894                                 (TaskFunc*)RTCPInstance::onExpire, this);
00895 }
00896 
00897 void RTCPInstance::reschedule(double nextTime) {
00898   envir().taskScheduler().unscheduleDelayedTask(nextTask());
00899   schedule(nextTime);
00900 }
00901 
00902 void RTCPInstance::onExpire1() {
00903   // Note: fTotSessionBW is kbits per second
00904   double rtcpBW = 0.05*fTotSessionBW*1024/8; // -> bytes per second
00905 
00906   OnExpire(this, // event
00907            numMembers(), // members
00908            (fSink != NULL) ? 1 : 0, // senders
00909            rtcpBW, // rtcp_bw
00910            (fSink != NULL) ? 1 : 0, // we_sent
00911            &fAveRTCPSize, // ave_rtcp_size
00912            &fIsInitial, // initial
00913            dTimeNow(), // tc
00914            &fPrevReportTime, // tp
00915            &fPrevNumMembers // pmembers
00916            );
00917 }
00918 
00920 
00921 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) {
00922   unsigned length = strlen((char const*)value);
00923   if (length > 0xFF) length = 0xFF; // maximum data length for a SDES item
00924 
00925   fData[0] = tag;
00926   fData[1] = (unsigned char)length;
00927   memmove(&fData[2], value, length);
00928 }
00929 
00930 unsigned SDESItem::totalSize() const {
00931   return 2 + (unsigned)fData[1];
00932 }
00933 
00934 
00936 
00937 extern "C" void Schedule(double nextTime, event e) {
00938   RTCPInstance* instance = (RTCPInstance*)e;
00939   if (instance == NULL) return;
00940 
00941   instance->schedule(nextTime);
00942 }
00943 
00944 extern "C" void Reschedule(double nextTime, event e) {
00945   RTCPInstance* instance = (RTCPInstance*)e;
00946   if (instance == NULL) return;
00947 
00948   instance->reschedule(nextTime);
00949 }
00950 
00951 extern "C" void SendRTCPReport(event e) {
00952   RTCPInstance* instance = (RTCPInstance*)e;
00953   if (instance == NULL) return;
00954 
00955   instance->sendReport();
00956 }
00957 
00958 extern "C" void SendBYEPacket(event e) {
00959   RTCPInstance* instance = (RTCPInstance*)e;
00960   if (instance == NULL) return;
00961 
00962   instance->sendBYE();
00963 }
00964 
00965 extern "C" int TypeOfEvent(event e) {
00966   RTCPInstance* instance = (RTCPInstance*)e;
00967   if (instance == NULL) return EVENT_UNKNOWN;
00968 
00969   return instance->typeOfEvent();
00970 }
00971 
00972 extern "C" int SentPacketSize(event e) {
00973   RTCPInstance* instance = (RTCPInstance*)e;
00974   if (instance == NULL) return 0;
00975 
00976   return instance->sentPacketSize();
00977 }
00978 
00979 extern "C" int PacketType(packet p) {
00980   RTCPInstance* instance = (RTCPInstance*)p;
00981   if (instance == NULL) return PACKET_UNKNOWN_TYPE;
00982 
00983   return instance->packetType();
00984 }
00985 
00986 extern "C" int ReceivedPacketSize(packet p) {
00987   RTCPInstance* instance = (RTCPInstance*)p;
00988   if (instance == NULL) return 0;
00989 
00990   return instance->receivedPacketSize();
00991 }
00992 
00993 extern "C" int NewMember(packet p) {
00994   RTCPInstance* instance = (RTCPInstance*)p;
00995   if (instance == NULL) return 0;
00996 
00997   return instance->checkNewSSRC();
00998 }
00999 
01000 extern "C" int NewSender(packet /*p*/) {
01001   return 0; // we don't yet recognize senders other than ourselves #####
01002 }
01003 
01004 extern "C" void AddMember(packet /*p*/) {
01005   // Do nothing; all of the real work was done when NewMember() was called
01006 }
01007 
01008 extern "C" void AddSender(packet /*p*/) {
01009   // we don't yet recognize senders other than ourselves #####
01010 }
01011 
01012 extern "C" void RemoveMember(packet p) {
01013   RTCPInstance* instance = (RTCPInstance*)p;
01014   if (instance == NULL) return;
01015 
01016   instance->removeLastReceivedSSRC();
01017 }
01018 
01019 extern "C" void RemoveSender(packet /*p*/) {
01020   // we don't yet recognize senders other than ourselves #####
01021 }
01022 
01023 extern "C" double drand30() {
01024   unsigned tmp = our_random()&0x3FFFFFFF; // a random 30-bit integer
01025   return tmp/(double)(1024*1024*1024);
01026 }

Generated on Thu Feb 2 23:51:31 2012 for live by  doxygen 1.5.2