liveMedia/RTPInterface.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2010 Live Networks, Inc.  All rights reserved.
00018 // An abstraction of a network interface used for RTP (or RTCP).
00019 // (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
00020 // be implemented transparently.)
00021 // Implementation
00022 
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026 
00028 
00029 // Helper routines and data structures, used to implement
00030 // sending/receiving RTP/RTCP over a TCP socket:
00031 
// Forward declaration (the definition appears later in this file):
// writes one packet to "socketNum", tagged with "streamChannelId".
static void sendRTPOverTCP(unsigned char* packet, unsigned packetSize, int socketNum, unsigned char streamChannelId);
00034 
00035 // Reading RTP-over-TCP is implemented using two levels of hash tables.
00036 // The top-level hash table maps TCP socket numbers to a
00037 // "SocketDescriptor" that contains a hash table for each of the
00038 // sub-channels that are reading from this socket.
00039 
00040 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
00041   _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
00042   if (ourTables == NULL) return NULL;
00043 
00044   if (ourTables->socketTable == NULL) {
00045     // Create a new socket number -> SocketDescriptor mapping table:
00046     ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00047   }
00048   return (HashTable*)(ourTables->socketTable);
00049 }
00050 
00051 class SocketDescriptor {
00052 public:
00053   SocketDescriptor(UsageEnvironment& env, int socketNum);
00054   virtual ~SocketDescriptor();
00055 
00056   void registerRTPInterface(unsigned char streamChannelId,
00057                             RTPInterface* rtpInterface);
00058   RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00059   void deregisterRTPInterface(unsigned char streamChannelId);
00060 
00061   void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
00062     fServerRequestAlternativeByteHandler = handler;
00063     fServerRequestAlternativeByteHandlerClientData = clientData;
00064   }
00065 
00066 private:
00067   static void tcpReadHandler(SocketDescriptor*, int mask);
00068   void tcpReadHandler1(int mask);
00069 
00070 private:
00071   UsageEnvironment& fEnv;
00072   int fOurSocketNum;
00073   HashTable* fSubChannelHashTable;
00074   ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
00075   void* fServerRequestAlternativeByteHandlerClientData;
00076   u_int8_t fStreamChannelId, fSizeByte1;
00077   enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
00078 };
00079 
00080 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
00081   HashTable* table = socketHashTable(env, createIfNotFound);
00082   if (table == NULL) return NULL;
00083 
00084   char const* key = (char const*)(long)sockNum;
00085   SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
00086   if (socketDescriptor == NULL && createIfNotFound) {
00087     socketDescriptor = new SocketDescriptor(env, sockNum);
00088     table->Add((char const*)(long)(sockNum), socketDescriptor);
00089   }
00090 
00091   return socketDescriptor;
00092 }
00093 
00094 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00095   char const* key = (char const*)(long)sockNum;
00096   HashTable* table = socketHashTable(env);
00097   table->Remove(key);
00098 
00099   if (table->IsEmpty()) {
00100     // We can also delete the table (to reclaim space):
00101     _Tables* ourTables = _Tables::getOurTables(env);
00102     delete table;
00103     ourTables->socketTable = NULL;
00104     ourTables->reclaimIfPossible();
00105   }
00106 }
00107 
00108 
00110 
00111 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00112   : fOwner(owner), fGS(gs),
00113     fTCPStreams(NULL),
00114     fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00115     fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00116     fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00117   // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive.
00118   // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block,
00119   // even if the socket was previously reported (e.g., by "select()") as having data available.
00120   // (This can supposedly happen if the UDP checksum fails, for example.)
00121   makeSocketNonBlocking(fGS->socketNum());
00122   increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00123 }
00124 
00125 RTPInterface::~RTPInterface() {
00126   delete fTCPStreams;
00127 }
00128 
00129 void RTPInterface::setStreamSocket(int sockNum,
00130                                    unsigned char streamChannelId) {
00131   fGS->removeAllDestinations();
00132   addStreamSocket(sockNum, streamChannelId);
00133 }
00134 
00135 void RTPInterface::addStreamSocket(int sockNum,
00136                                    unsigned char streamChannelId) {
00137   if (sockNum < 0) return;
00138 
00139   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00140        streams = streams->fNext) {
00141     if (streams->fStreamSocketNum == sockNum
00142         && streams->fStreamChannelId == streamChannelId) {
00143       return; // we already have it
00144     }
00145   }
00146 
00147   fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00148 }
00149 
00150 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
00151   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
00152   if (socketDescriptor != NULL) {
00153     socketDescriptor->deregisterRTPInterface(streamChannelId);
00154         // Note: This may delete "socketDescriptor",
00155         // if no more interfaces are using this socket
00156   }
00157 }
00158 
00159 void RTPInterface::removeStreamSocket(int sockNum,
00160                                       unsigned char streamChannelId) {
00161   for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00162        streamsPtr = &((*streamsPtr)->fNext)) {
00163     if ((*streamsPtr)->fStreamSocketNum == sockNum
00164         && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00165       deregisterSocket(envir(), sockNum, streamChannelId);
00166 
00167       // Then remove the record pointed to by *streamsPtr :
00168       tcpStreamRecord* next = (*streamsPtr)->fNext;
00169       (*streamsPtr)->fNext = NULL;
00170       delete (*streamsPtr);
00171       *streamsPtr = next;
00172       return;
00173     }
00174   }
00175 }
00176 
00177 void RTPInterface
00178 ::setServerRequestAlternativeByteHandler(int socketNum, ServerRequestAlternativeByteHandler* handler, void* clientData) {
00179   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), socketNum);
00180 
00181   if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
00182 }
00183 
00184 
00185 void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00186   // Normal case: Send as a UDP packet:
00187   fGS->output(envir(), fGS->ttl(), packet, packetSize);
00188 
00189   // Also, send over each of our TCP sockets:
00190   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00191        streams = streams->fNext) {
00192     sendRTPOverTCP(packet, packetSize,
00193                    streams->fStreamSocketNum, streams->fStreamChannelId);
00194   }
00195 }
00196 
00197 void RTPInterface
00198 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00199   // Normal case: Arrange to read UDP packets:
00200   envir().taskScheduler().
00201     turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00202 
00203   // Also, receive RTP over TCP, on each of our TCP connections:
00204   fReadHandlerProc = handlerProc;
00205   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00206        streams = streams->fNext) {
00207     // Get a socket descriptor for "streams->fStreamSocketNum":
00208     SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00209 
00210     // Tell it about our subChannel:
00211     socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00212   }
00213 }
00214 
00215 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00216                                  unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {
00217   packetReadWasIncomplete = False; // by default
00218   Boolean readSuccess;
00219   if (fNextTCPReadStreamSocketNum < 0) {
00220     // Normal case: read from the (datagram) 'groupsock':
00221     readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00222   } else {
00223     // Read from the TCP connection:
00224     bytesRead = 0;
00225     unsigned totBytesToRead = fNextTCPReadSize;
00226     if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00227     unsigned curBytesToRead = totBytesToRead;
00228     int curBytesRead;
00229     while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00230                                       &buffer[bytesRead], curBytesToRead,
00231                                       fromAddress)) > 0) {
00232       bytesRead += curBytesRead;
00233       if (bytesRead >= totBytesToRead) break;
00234       curBytesToRead -= curBytesRead;
00235     }
00236     fNextTCPReadSize -= bytesRead;
00237     if (curBytesRead == 0 && curBytesToRead > 0) {
00238       packetReadWasIncomplete = True;
00239       return True;
00240     } else if (curBytesRead < 0) {
00241       bytesRead = 0;
00242       readSuccess = False;
00243     } else {
00244       readSuccess = True;
00245     }
00246     fNextTCPReadStreamSocketNum = -1; // default, for next time
00247   }
00248 
00249   if (readSuccess && fAuxReadHandlerFunc != NULL) {
00250     // Also pass the newly-read packet data to our auxilliary handler:
00251     (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00252   }
00253   return readSuccess;
00254 }
00255 
00256 void RTPInterface::stopNetworkReading() {
00257   // Normal case
00258   envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00259 
00260   // Also turn off read handling on each of our TCP connections:
00261   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00262        streams = streams->fNext) {
00263     deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
00264   }
00265 }
00266 
00267 
00269 
// Sends one RTP (or RTCP) packet over the TCP socket "socketNum", using the
// framing defined in RFC 2326, section 10.12:
//   a '$' character
//   the 1-byte channel id ("streamChannelId")
//   the 2-byte packet size (in network byte order)
//   the packet data.
// A packet larger than 65535 bytes cannot be described by the 2-byte size
// field.  Such a packet is not sent at all, because sending it with a
// truncated size would make the receiver misparse everything that follows.
// Failure is not reported to the caller (it is only logged if DEBUG is defined).
void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
                    int socketNum, unsigned char streamChannelId) {
#ifdef DEBUG
  fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
          packetSize, streamChannelId, socketNum); fflush(stderr);
#endif
  do {
    if (packetSize > 0xFFFF) break; // doesn't fit in the 16-bit size field

    // Build the complete 4-byte framing header, and send it with a single
    // call, so that a failure can't leave just part of a header written
    // because an earlier, separate send() had already succeeded:
    char framingHeader[4];
    framingHeader[0] = '$';
    framingHeader[1] = (char)streamChannelId;
    framingHeader[2] = (char) ((packetSize&0xFF00)>>8);
    framingHeader[3] = (char) (packetSize&0xFF);
    if (send(socketNum, framingHeader, 4, 0) != 4) break;

    if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;

#ifdef DEBUG
    fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
#endif

    return;
  } while (0);

#ifdef DEBUG
  fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
#endif
}
00301 
00302 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00303   :fEnv(env), fOurSocketNum(socketNum),
00304     fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
00305    fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
00306    fTCPReadingState(AWAITING_DOLLAR) {
00307 }
00308 
00309 SocketDescriptor::~SocketDescriptor() {
00310   delete fSubChannelHashTable;
00311 }
00312 
00313 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00314                                             RTPInterface* rtpInterface) {
00315   Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00316   fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00317                             rtpInterface);
00318 
00319   if (isFirstRegistration) {
00320     // Arrange to handle reads on this TCP socket:
00321     TaskScheduler::BackgroundHandlerProc* handler
00322       = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00323     fEnv.taskScheduler().
00324       turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00325   }
00326 }
00327 
00328 RTPInterface* SocketDescriptor
00329 ::lookupRTPInterface(unsigned char streamChannelId) {
00330   char const* lookupArg = (char const*)(long)streamChannelId;
00331   return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00332 }
00333 
00334 void SocketDescriptor
00335 ::deregisterRTPInterface(unsigned char streamChannelId) {
00336   fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00337 
00338   if (fSubChannelHashTable->IsEmpty()) {
00339     // No more interfaces are using us, so it's curtains for us now
00340     fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00341     removeSocketDescription(fEnv, fOurSocketNum);
00342     delete this;
00343   }
00344 }
00345 
00346 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
00347   socketDescriptor->tcpReadHandler1(mask);
00348 }
00349 
00350 void SocketDescriptor::tcpReadHandler1(int mask) {
00351   // We expect the following data over the TCP channel:
00352   //   optional RTSP command or response bytes (before the first '$' character)
00353   //   a '$' character
00354   //   a 1-byte channel id
00355   //   a 2-byte packet size (in network byte order)
00356   //   the packet data.
00357   // However, because the socket is being read asynchronously, this data might arrive in pieces.
00358   
00359   u_int8_t c;
00360   struct sockaddr_in fromAddress;
00361   if (fTCPReadingState != AWAITING_PACKET_DATA) {
00362     int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00363     if (result != 1) { // error reading TCP socket, or no more data available
00364       if (result < 0) { // error
00365         fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum); // stops further calls to us
00366       }
00367       return;
00368     }
00369   }
00370   
00371   switch (fTCPReadingState) {
00372     case AWAITING_DOLLAR: {
00373       if (c == '$') {
00374         fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
00375       } else {
00376         // This character is part of a RTSP request or command, which is handled separately:
00377         if (fServerRequestAlternativeByteHandler != NULL) {
00378           (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
00379         }
00380       }
00381       break;
00382     }
00383     case AWAITING_STREAM_CHANNEL_ID: {
00384       // The byte that we read is the stream channel id.
00385       fStreamChannelId = c;
00386       fTCPReadingState = AWAITING_SIZE1;
00387       break;
00388     }
00389     case AWAITING_SIZE1: {
00390       // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'.
00391       fSizeByte1 = c;
00392       fTCPReadingState = AWAITING_SIZE2;
00393       break;
00394     }
00395     case AWAITING_SIZE2: {
00396       // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'.
00397       unsigned short size = (fSizeByte1<<8)|c;
00398       
00399       // Record the information about the packet data that will be read next:
00400       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00401       if (rtpInterface != NULL) {
00402         rtpInterface->fNextTCPReadSize = size;
00403         rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
00404         rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
00405       }
00406       fTCPReadingState = AWAITING_PACKET_DATA;
00407       break;
00408     }
00409     case AWAITING_PACKET_DATA: {
00410       // Call the appropriate read handler to get the packet data from the TCP stream:
00411       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00412       if (rtpInterface != NULL) {
00413         if (rtpInterface->fNextTCPReadSize == 0) {
00414           // We've already read all the data for this packet.
00415           fTCPReadingState = AWAITING_DOLLAR;
00416           break;
00417         }
00418         if (rtpInterface->fReadHandlerProc != NULL) {
00419 #ifdef DEBUG
00420           fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
00421 #endif
00422           rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00423         }
00424       }
00425       return;
00426     }
00427   }
00428 }
00429 
00430 
00432 
00433 tcpStreamRecord
00434 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00435                   tcpStreamRecord* next)
00436   : fNext(next),
00437     fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00438 }
00439 
00440 tcpStreamRecord::~tcpStreamRecord() {
00441   delete fNext;
00442 }
00443 

Generated on Fri Sep 3 02:35:41 2010 for live by  doxygen 1.5.2