liveMedia/RTPInterface.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2012 Live Networks, Inc.  All rights reserved.
00018 // An abstraction of a network interface used for RTP (or RTCP).
00019 // (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
00020 // be implemented transparently.)
00021 // Implementation
00022 
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026 
00028 
00029 // Helper routines and data structures, used to implement
00030 // sending/receiving RTP/RTCP over a TCP socket:
00031 
00032 static Boolean sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00033                               int socketNum, unsigned char streamChannelId);
00034 
00035 // Reading RTP-over-TCP is implemented using two levels of hash tables.
00036 // The top-level hash table maps TCP socket numbers to a
00037 // "SocketDescriptor" that contains a hash table for each of the
00038 // sub-channels that are reading from this socket.
00039 
00040 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
00041   _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
00042   if (ourTables == NULL) return NULL;
00043 
00044   if (ourTables->socketTable == NULL) {
00045     // Create a new socket number -> SocketDescriptor mapping table:
00046     ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00047   }
00048   return (HashTable*)(ourTables->socketTable);
00049 }
00050 
00051 class SocketDescriptor {
00052 public:
00053   SocketDescriptor(UsageEnvironment& env, int socketNum);
00054   virtual ~SocketDescriptor();
00055 
00056   void registerRTPInterface(unsigned char streamChannelId,
00057                             RTPInterface* rtpInterface);
00058   RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00059   void deregisterRTPInterface(unsigned char streamChannelId);
00060 
00061   void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
00062     fServerRequestAlternativeByteHandler = handler;
00063     fServerRequestAlternativeByteHandlerClientData = clientData;
00064   }
00065 
00066 private:
00067   static void tcpReadHandler(SocketDescriptor*, int mask);
00068   void tcpReadHandler1(int mask);
00069 
00070 private:
00071   UsageEnvironment& fEnv;
00072   int fOurSocketNum;
00073   HashTable* fSubChannelHashTable;
00074   ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
00075   void* fServerRequestAlternativeByteHandlerClientData;
00076   u_int8_t fStreamChannelId, fSizeByte1;
00077   enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
00078 };
00079 
00080 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
00081   HashTable* table = socketHashTable(env, createIfNotFound);
00082   if (table == NULL) return NULL;
00083 
00084   char const* key = (char const*)(long)sockNum;
00085   SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
00086   if (socketDescriptor == NULL && createIfNotFound) {
00087     socketDescriptor = new SocketDescriptor(env, sockNum);
00088     table->Add((char const*)(long)(sockNum), socketDescriptor);
00089   }
00090 
00091   return socketDescriptor;
00092 }
00093 
00094 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00095   char const* key = (char const*)(long)sockNum;
00096   HashTable* table = socketHashTable(env);
00097   table->Remove(key);
00098 
00099   if (table->IsEmpty()) {
00100     // We can also delete the table (to reclaim space):
00101     _Tables* ourTables = _Tables::getOurTables(env);
00102     delete table;
00103     ourTables->socketTable = NULL;
00104     ourTables->reclaimIfPossible();
00105   }
00106 }
00107 
00108 
00110 
00111 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00112   : fOwner(owner), fGS(gs),
00113     fTCPStreams(NULL),
00114     fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00115     fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00116     fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00117   // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive.
00118   // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block,
00119   // even if the socket was previously reported (e.g., by "select()") as having data available.
00120   // (This can supposedly happen if the UDP checksum fails, for example.)
00121   makeSocketNonBlocking(fGS->socketNum());
00122   increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00123 }
00124 
00125 RTPInterface::~RTPInterface() {
00126   delete fTCPStreams;
00127 }
00128 
00129 void RTPInterface::setStreamSocket(int sockNum,
00130                                    unsigned char streamChannelId) {
00131   fGS->removeAllDestinations();
00132   addStreamSocket(sockNum, streamChannelId);
00133 }
00134 
00135 void RTPInterface::addStreamSocket(int sockNum,
00136                                    unsigned char streamChannelId) {
00137   if (sockNum < 0) return;
00138 
00139   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00140        streams = streams->fNext) {
00141     if (streams->fStreamSocketNum == sockNum
00142         && streams->fStreamChannelId == streamChannelId) {
00143       return; // we already have it
00144     }
00145   }
00146 
00147   fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00148 }
00149 
00150 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
00151   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
00152   if (socketDescriptor != NULL) {
00153     socketDescriptor->deregisterRTPInterface(streamChannelId);
00154         // Note: This may delete "socketDescriptor",
00155         // if no more interfaces are using this socket
00156   }
00157 }
00158 
00159 void RTPInterface::removeStreamSocket(int sockNum,
00160                                       unsigned char streamChannelId) {
00161   for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00162        streamsPtr = &((*streamsPtr)->fNext)) {
00163     if ((*streamsPtr)->fStreamSocketNum == sockNum
00164         && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00165       deregisterSocket(envir(), sockNum, streamChannelId);
00166 
00167       // Then remove the record pointed to by *streamsPtr :
00168       tcpStreamRecord* next = (*streamsPtr)->fNext;
00169       (*streamsPtr)->fNext = NULL;
00170       delete (*streamsPtr);
00171       *streamsPtr = next;
00172       return;
00173     }
00174   }
00175 }
00176 
00177 void RTPInterface
00178 ::setServerRequestAlternativeByteHandler(int socketNum, ServerRequestAlternativeByteHandler* handler, void* clientData) {
00179   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), socketNum);
00180 
00181   if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
00182 }
00183 
00184 
00185 Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00186   Boolean success = True; // we'll return False instead if any of the sends fail
00187 
00188   // Normal case: Send as a UDP packet:
00189   if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False;
00190 
00191   // Also, send over each of our TCP sockets:
00192   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00193        streams = streams->fNext) {
00194     if (!sendRTPOverTCP(packet, packetSize,
00195                         streams->fStreamSocketNum, streams->fStreamChannelId)) {
00196       success = False;
00197     }
00198   }
00199 
00200   return success;
00201 }
00202 
00203 void RTPInterface
00204 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00205   // Normal case: Arrange to read UDP packets:
00206   envir().taskScheduler().
00207     turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00208 
00209   // Also, receive RTP over TCP, on each of our TCP connections:
00210   fReadHandlerProc = handlerProc;
00211   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00212        streams = streams->fNext) {
00213     // Get a socket descriptor for "streams->fStreamSocketNum":
00214     SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00215 
00216     // Tell it about our subChannel:
00217     socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00218   }
00219 }
00220 
00221 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00222                                  unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {
00223   packetReadWasIncomplete = False; // by default
00224   Boolean readSuccess;
00225   if (fNextTCPReadStreamSocketNum < 0) {
00226     // Normal case: read from the (datagram) 'groupsock':
00227     readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00228   } else {
00229     // Read from the TCP connection:
00230     bytesRead = 0;
00231     unsigned totBytesToRead = fNextTCPReadSize;
00232     if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00233     unsigned curBytesToRead = totBytesToRead;
00234     int curBytesRead;
00235     while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00236                                       &buffer[bytesRead], curBytesToRead,
00237                                       fromAddress)) > 0) {
00238       bytesRead += curBytesRead;
00239       if (bytesRead >= totBytesToRead) break;
00240       curBytesToRead -= curBytesRead;
00241     }
00242     fNextTCPReadSize -= bytesRead;
00243     if (curBytesRead == 0 && curBytesToRead > 0) {
00244       packetReadWasIncomplete = True;
00245       return True;
00246     } else if (curBytesRead < 0) {
00247       bytesRead = 0;
00248       readSuccess = False;
00249     } else {
00250       readSuccess = True;
00251     }
00252     fNextTCPReadStreamSocketNum = -1; // default, for next time
00253   }
00254 
00255   if (readSuccess && fAuxReadHandlerFunc != NULL) {
00256     // Also pass the newly-read packet data to our auxilliary handler:
00257     (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00258   }
00259   return readSuccess;
00260 }
00261 
00262 void RTPInterface::stopNetworkReading() {
00263   // Normal case
00264   envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00265 
00266   // Also turn off read handling on each of our TCP connections:
00267   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00268        streams = streams->fNext) {
00269     deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
00270   }
00271 }
00272 
00273 
00275 
00276 Boolean sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00277                     int socketNum, unsigned char streamChannelId) {
00278 #ifdef DEBUG
00279   fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
00280           packetSize, streamChannelId, socketNum); fflush(stderr);
00281 #endif
00282   // Send RTP over TCP, using the encoding defined in
00283   // RFC 2326, section 10.12:
00284   do {
00285     char const dollar = '$';
00286     if (send(socketNum, &dollar, 1, 0) != 1) break;
00287     if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;
00288 
00289     char netPacketSize[2];
00290     netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);
00291     netPacketSize[1] = (char) (packetSize&0xFF);
00292     if (send(socketNum, netPacketSize, 2, 0) != 2) break;
00293 
00294     if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;
00295 
00296 #ifdef DEBUG
00297     fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
00298 #endif
00299 
00300     return True;
00301   } while (0);
00302 
00303 #ifdef DEBUG
00304   fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
00305 #endif
00306   return False;
00307 }
00308 
00309 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00310   :fEnv(env), fOurSocketNum(socketNum),
00311     fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
00312    fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
00313    fTCPReadingState(AWAITING_DOLLAR) {
00314 }
00315 
00316 SocketDescriptor::~SocketDescriptor() {
00317   delete fSubChannelHashTable;
00318 }
00319 
00320 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00321                                             RTPInterface* rtpInterface) {
00322   Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00323   fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00324                             rtpInterface);
00325 
00326   if (isFirstRegistration) {
00327     // Arrange to handle reads on this TCP socket:
00328     TaskScheduler::BackgroundHandlerProc* handler
00329       = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00330     fEnv.taskScheduler().
00331       turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00332   }
00333 }
00334 
00335 RTPInterface* SocketDescriptor
00336 ::lookupRTPInterface(unsigned char streamChannelId) {
00337   char const* lookupArg = (char const*)(long)streamChannelId;
00338   return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00339 }
00340 
00341 void SocketDescriptor
00342 ::deregisterRTPInterface(unsigned char streamChannelId) {
00343   fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00344 
00345   if (fSubChannelHashTable->IsEmpty()) {
00346     // No more interfaces are using us, so it's curtains for us now
00347     fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00348     removeSocketDescription(fEnv, fOurSocketNum);
00349     delete this;
00350   }
00351 }
00352 
00353 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
00354   socketDescriptor->tcpReadHandler1(mask);
00355 }
00356 
00357 void SocketDescriptor::tcpReadHandler1(int mask) {
00358   // We expect the following data over the TCP channel:
00359   //   optional RTSP command or response bytes (before the first '$' character)
00360   //   a '$' character
00361   //   a 1-byte channel id
00362   //   a 2-byte packet size (in network byte order)
00363   //   the packet data.
00364   // However, because the socket is being read asynchronously, this data might arrive in pieces.
00365   
00366   u_int8_t c;
00367   struct sockaddr_in fromAddress;
00368   if (fTCPReadingState != AWAITING_PACKET_DATA) {
00369     int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00370     if (result != 1) { // error reading TCP socket, or no more data available
00371       if (result < 0) { // error
00372         fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum); // stops further calls to us
00373       }
00374       return;
00375     }
00376   }
00377   
00378   switch (fTCPReadingState) {
00379     case AWAITING_DOLLAR: {
00380       if (c == '$') {
00381         fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
00382       } else {
00383         // This character is part of a RTSP request or command, which is handled separately:
00384         if (fServerRequestAlternativeByteHandler != NULL) {
00385           (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
00386         }
00387       }
00388       break;
00389     }
00390     case AWAITING_STREAM_CHANNEL_ID: {
00391       // The byte that we read is the stream channel id.
00392       if (lookupRTPInterface(c) != NULL) { // sanity check
00393         fStreamChannelId = c;
00394         fTCPReadingState = AWAITING_SIZE1;
00395       } else {
00396         // This wasn't a stream channel id that we expected.  We're (somehow) in a strange state.  Try to recover:
00397         fTCPReadingState = AWAITING_DOLLAR;
00398       }
00399       break;
00400     }
00401     case AWAITING_SIZE1: {
00402       // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'.
00403       fSizeByte1 = c;
00404       fTCPReadingState = AWAITING_SIZE2;
00405       break;
00406     }
00407     case AWAITING_SIZE2: {
00408       // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'.
00409       unsigned short size = (fSizeByte1<<8)|c;
00410       
00411       // Record the information about the packet data that will be read next:
00412       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00413       if (rtpInterface != NULL) {
00414         rtpInterface->fNextTCPReadSize = size;
00415         rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
00416         rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
00417       }
00418       fTCPReadingState = AWAITING_PACKET_DATA;
00419       break;
00420     }
00421     case AWAITING_PACKET_DATA: {
00422       // Call the appropriate read handler to get the packet data from the TCP stream:
00423       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00424       if (rtpInterface != NULL) {
00425         if (rtpInterface->fNextTCPReadSize == 0) {
00426           // We've already read all the data for this packet.
00427           fTCPReadingState = AWAITING_DOLLAR;
00428           break;
00429         }
00430         if (rtpInterface->fReadHandlerProc != NULL) {
00431 #ifdef DEBUG
00432           fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
00433 #endif
00434           rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00435         }
00436       }
00437       return;
00438     }
00439   }
00440 }
00441 
00442 
00444 
00445 tcpStreamRecord
00446 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00447                   tcpStreamRecord* next)
00448   : fNext(next),
00449     fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00450 }
00451 
00452 tcpStreamRecord::~tcpStreamRecord() {
00453   delete fNext;
00454 }
00455 

Generated on Thu Feb 2 23:51:31 2012 for live by  doxygen 1.5.2