liveMedia/OnDemandServerMediaSubsession.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2012 Live Networks, Inc.  All rights reserved.
00018 // A 'ServerMediaSubsession' object that creates new, unicast, "RTPSink"s
00019 // on demand.
00020 // Implementation
00021 
00022 #include "OnDemandServerMediaSubsession.hh"
00023 #include <GroupsockHelper.hh>
00024 
00025 OnDemandServerMediaSubsession
00026 ::OnDemandServerMediaSubsession(UsageEnvironment& env,
00027                                 Boolean reuseFirstSource,
00028                                 portNumBits initialPortNum)
00029   : ServerMediaSubsession(env),
00030     fSDPLines(NULL), fReuseFirstSource(reuseFirstSource), fInitialPortNum(initialPortNum), fLastStreamToken(NULL) {
00031   fDestinationsHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
00032   gethostname(fCNAME, sizeof fCNAME);
00033   fCNAME[sizeof fCNAME-1] = '\0'; // just in case
00034 }
00035 
00036 OnDemandServerMediaSubsession::~OnDemandServerMediaSubsession() {
00037   delete[] fSDPLines;
00038 
00039   // Clean out the destinations hash table:
00040   while (1) {
00041     Destinations* destinations
00042       = (Destinations*)(fDestinationsHashTable->RemoveNext());
00043     if (destinations == NULL) break;
00044     delete destinations;
00045   }
00046   delete fDestinationsHashTable;
00047 }
00048 
00049 char const*
00050 OnDemandServerMediaSubsession::sdpLines() {
00051   if (fSDPLines == NULL) {
00052     // We need to construct a set of SDP lines that describe this
00053     // subsession (as a unicast stream).  To do so, we first create
00054     // dummy (unused) source and "RTPSink" objects,
00055     // whose parameters we use for the SDP lines:
00056     unsigned estBitrate;
00057     FramedSource* inputSource = createNewStreamSource(0, estBitrate);
00058     if (inputSource == NULL) return NULL; // file not found
00059 
00060     struct in_addr dummyAddr;
00061     dummyAddr.s_addr = 0;
00062     Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
00063     unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
00064     RTPSink* dummyRTPSink
00065       = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
00066 
00067     setSDPLinesFromRTPSink(dummyRTPSink, inputSource, estBitrate);
00068     Medium::close(dummyRTPSink);
00069     closeStreamSource(inputSource);
00070   }
00071 
00072   return fSDPLines;
00073 }
00074 
00075 void OnDemandServerMediaSubsession
00076 ::getStreamParameters(unsigned clientSessionId,
00077                       netAddressBits clientAddress,
00078                       Port const& clientRTPPort,
00079                       Port const& clientRTCPPort,
00080                       int tcpSocketNum,
00081                       unsigned char rtpChannelId,
00082                       unsigned char rtcpChannelId,
00083                       netAddressBits& destinationAddress,
00084                       u_int8_t& /*destinationTTL*/,
00085                       Boolean& isMulticast,
00086                       Port& serverRTPPort,
00087                       Port& serverRTCPPort,
00088                       void*& streamToken) {
00089   if (destinationAddress == 0) destinationAddress = clientAddress;
00090   struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
00091   isMulticast = False;
00092 
00093   if (fLastStreamToken != NULL && fReuseFirstSource) {
00094     // Special case: Rather than creating a new 'StreamState',
00095     // we reuse the one that we've already created:
00096     serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
00097     serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
00098     ++((StreamState*)fLastStreamToken)->referenceCount();
00099     streamToken = fLastStreamToken;
00100   } else {
00101     // Normal case: Create a new media source:
00102     unsigned streamBitrate;
00103     FramedSource* mediaSource
00104       = createNewStreamSource(clientSessionId, streamBitrate);
00105 
00106     // Create 'groupsock' and 'sink' objects for the destination,
00107     // using previously unused server port numbers:
00108     RTPSink* rtpSink;
00109     BasicUDPSink* udpSink;
00110     Groupsock* rtpGroupsock;
00111     Groupsock* rtcpGroupsock;
00112     portNumBits serverPortNum;
00113     if (clientRTCPPort.num() == 0) {
00114       // We're streaming raw UDP (not RTP). Create a single groupsock:
00115       NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
00116       for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
00117         struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00118 
00119         serverRTPPort = serverPortNum;
00120         rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00121         if (rtpGroupsock->socketNum() >= 0) break; // success
00122       }
00123 
00124       rtcpGroupsock = NULL;
00125       rtpSink = NULL;
00126       udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
00127     } else {
00128       // Normal case: We're streaming RTP (over UDP or TCP).  Create a pair of
00129       // groupsocks (RTP and RTCP), with adjacent port numbers (RTP port number even):
00130       NoReuse dummy(envir()); // ensures that we skip over ports that are already in use
00131       for (portNumBits serverPortNum = fInitialPortNum; ; serverPortNum += 2) {
00132         struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00133 
00134         serverRTPPort = serverPortNum;
00135         rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00136         if (rtpGroupsock->socketNum() < 0) {
00137           delete rtpGroupsock;
00138           continue; // try again
00139         }
00140 
00141         serverRTCPPort = serverPortNum+1;
00142         rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
00143         if (rtcpGroupsock->socketNum() < 0) {
00144           delete rtpGroupsock;
00145           delete rtcpGroupsock;
00146           continue; // try again
00147         }
00148 
00149         break; // success
00150       }
00151 
00152       unsigned char rtpPayloadType = 96 + trackNumber()-1; // if dynamic
00153       rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
00154       udpSink = NULL;
00155     }
00156 
00157     // Turn off the destinations for each groupsock.  They'll get set later
00158     // (unless TCP is used instead):
00159     if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
00160     if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();
00161 
00162     if (rtpGroupsock != NULL) {
00163       // Try to use a big send buffer for RTP -  at least 0.1 second of
00164       // specified bandwidth and at least 50 KB
00165       unsigned rtpBufSize = streamBitrate * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
00166       if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
00167       increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
00168     }
00169 
00170     // Set up the state of the stream.  The stream will get started later:
00171     streamToken = fLastStreamToken
00172       = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
00173                         streamBitrate, mediaSource,
00174                         rtpGroupsock, rtcpGroupsock);
00175   }
00176 
00177   // Record these destinations as being for this client session id:
00178   Destinations* destinations;
00179   if (tcpSocketNum < 0) { // UDP
00180     destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
00181   } else { // TCP
00182     destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
00183   }
00184   fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
00185 }
00186 
00187 void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
00188                                                 void* streamToken,
00189                                                 TaskFunc* rtcpRRHandler,
00190                                                 void* rtcpRRHandlerClientData,
00191                                                 unsigned short& rtpSeqNum,
00192                                                 unsigned& rtpTimestamp,
00193                                                 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00194                                                 void* serverRequestAlternativeByteHandlerClientData) {
00195   StreamState* streamState = (StreamState*)streamToken;
00196   Destinations* destinations
00197     = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00198   if (streamState != NULL) {
00199     streamState->startPlaying(destinations,
00200                               rtcpRRHandler, rtcpRRHandlerClientData,
00201                               serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
00202     if (streamState->rtpSink() != NULL) {
00203       rtpSeqNum = streamState->rtpSink()->currentSeqNo();
00204       rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
00205     }
00206   }
00207 }
00208 
00209 void OnDemandServerMediaSubsession::pauseStream(unsigned /*clientSessionId*/,
00210                                                 void* streamToken) {
00211   // Pausing isn't allowed if multiple clients are receiving data from
00212   // the same source:
00213   if (fReuseFirstSource) return;
00214 
00215   StreamState* streamState = (StreamState*)streamToken;
00216   if (streamState != NULL) streamState->pause();
00217 }
00218 
00219 void OnDemandServerMediaSubsession::seekStream(unsigned /*clientSessionId*/,
00220                                                void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes) {
00221   numBytes = 0; // by default: unknown
00222 
00223   // Seeking isn't allowed if multiple clients are receiving data from
00224   // the same source:
00225   if (fReuseFirstSource) return;
00226 
00227   StreamState* streamState = (StreamState*)streamToken;
00228   if (streamState != NULL && streamState->mediaSource() != NULL) {
00229     seekStreamSource(streamState->mediaSource(), seekNPT, streamDuration, numBytes);
00230   }
00231 }
00232 
00233 void OnDemandServerMediaSubsession::setStreamScale(unsigned /*clientSessionId*/,
00234                                                    void* streamToken, float scale) {
00235   // Changing the scale factor isn't allowed if multiple clients are receiving data
00236   // from the same source:
00237   if (fReuseFirstSource) return;
00238 
00239   StreamState* streamState = (StreamState*)streamToken;
00240   if (streamState != NULL && streamState->mediaSource() != NULL) {
00241     setStreamSourceScale(streamState->mediaSource(), scale);
00242   }
00243 }
00244 
00245 FramedSource* OnDemandServerMediaSubsession::getStreamSource(void* streamToken) {
00246   if (streamToken == NULL) return NULL;
00247 
00248   StreamState* streamState = (StreamState*)streamToken;
00249   return streamState->mediaSource();
00250 }
00251 
00252 void OnDemandServerMediaSubsession::deleteStream(unsigned clientSessionId,
00253                                                  void*& streamToken) {
00254   StreamState* streamState = (StreamState*)streamToken;
00255 
00256   // Look up (and remove) the destinations for this client session:
00257   Destinations* destinations
00258     = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00259   if (destinations != NULL) {
00260     fDestinationsHashTable->Remove((char const*)clientSessionId);
00261 
00262     // Stop streaming to these destinations:
00263     if (streamState != NULL) streamState->endPlaying(destinations);
00264   }
00265 
00266   // Delete the "StreamState" structure if it's no longer being used:
00267   if (streamState != NULL) {
00268     if (streamState->referenceCount() > 0) --streamState->referenceCount();
00269     if (streamState->referenceCount() == 0) {
00270       delete streamState;
00271       streamToken = NULL;
00272     }
00273   }
00274 
00275   // Finally, delete the destinations themselves:
00276   delete destinations;
00277 }
00278 
00279 char const* OnDemandServerMediaSubsession
00280 ::getAuxSDPLine(RTPSink* rtpSink, FramedSource* /*inputSource*/) {
00281   // Default implementation:
00282   return rtpSink == NULL ? NULL : rtpSink->auxSDPLine();
00283 }
00284 
00285 void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* /*inputSource*/,
00286                                                      double& /*seekNPT*/, double /*streamDuration*/, u_int64_t& numBytes) {
00287   // Default implementation: Do nothing
00288 }
00289 
00290 void OnDemandServerMediaSubsession
00291 ::setStreamSourceScale(FramedSource* /*inputSource*/, float /*scale*/) {
00292   // Default implementation: Do nothing
00293 }
00294 
00295 void OnDemandServerMediaSubsession::closeStreamSource(FramedSource *inputSource) {
00296   Medium::close(inputSource);
00297 }
00298 
00299 void OnDemandServerMediaSubsession
00300 ::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource, unsigned estBitrate) {
00301   if (rtpSink == NULL) return;
00302 
00303   char const* mediaType = rtpSink->sdpMediaType();
00304   unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
00305   AddressString ipAddressStr(fServerAddressForSDP);
00306   char* rtpmapLine = rtpSink->rtpmapLine();
00307   char const* rangeLine = rangeSDPLine();
00308   char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
00309   if (auxSDPLine == NULL) auxSDPLine = "";
00310 
00311   char const* const sdpFmt =
00312     "m=%s %u RTP/AVP %d\r\n"
00313     "c=IN IP4 %s\r\n"
00314     "b=AS:%u\r\n"
00315     "%s"
00316     "%s"
00317     "%s"
00318     "a=control:%s\r\n";
00319   unsigned sdpFmtSize = strlen(sdpFmt)
00320     + strlen(mediaType) + 5 /* max short len */ + 3 /* max char len */
00321     + strlen(ipAddressStr.val())
00322     + 20 /* max int len */
00323     + strlen(rtpmapLine)
00324     + strlen(rangeLine)
00325     + strlen(auxSDPLine)
00326     + strlen(trackId());
00327   char* sdpLines = new char[sdpFmtSize];
00328   sprintf(sdpLines, sdpFmt,
00329           mediaType, // m= <media>
00330           fPortNumForSDP, // m= <port>
00331           rtpPayloadType, // m= <fmt list>
00332           ipAddressStr.val(), // c= address
00333           estBitrate, // b=AS:<bandwidth>
00334           rtpmapLine, // a=rtpmap:... (if present)
00335           rangeLine, // a=range:... (if present)
00336           auxSDPLine, // optional extra SDP line
00337           trackId()); // a=control:<track-id>
00338   delete[] (char*)rangeLine; delete[] rtpmapLine;
00339 
00340   fSDPLines = strDup(sdpLines);
00341   delete[] sdpLines;
00342 }
00343 
00344 
00346 
00347 static void afterPlayingStreamState(void* clientData) {
00348   StreamState* streamState = (StreamState*)clientData;
00349   if (streamState->streamDuration() == 0.0) {
00350     // When the input stream ends, tear it down.  This will cause a RTCP "BYE"
00351     // to be sent to each client, teling it that the stream has ended.
00352     // (Because the stream didn't have a known duration, there was no other
00353     //  way for clients to know when the stream ended.)
00354     streamState->reclaim();
00355   }
00356   // Otherwise, keep the stream alive, in case a client wants to
00357   // subsequently re-play the stream starting from somewhere other than the end.
00358   // (This can be done only on streams that have a known duration.)
00359 }
00360 
00361 StreamState::StreamState(OnDemandServerMediaSubsession& master,
00362                          Port const& serverRTPPort, Port const& serverRTCPPort,
00363                          RTPSink* rtpSink, BasicUDPSink* udpSink,
00364                          unsigned totalBW, FramedSource* mediaSource,
00365                          Groupsock* rtpGS, Groupsock* rtcpGS)
00366   : fMaster(master), fAreCurrentlyPlaying(False), fReferenceCount(1),
00367     fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
00368     fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
00369     fTotalBW(totalBW), fRTCPInstance(NULL) /* created later */,
00370     fMediaSource(mediaSource), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
00371 }
00372 
00373 StreamState::~StreamState() {
00374   reclaim();
00375 }
00376 
00377 void StreamState
00378 ::startPlaying(Destinations* dests,
00379                TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
00380                ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00381                void* serverRequestAlternativeByteHandlerClientData) {
00382   if (dests == NULL) return;
00383 
00384   if (fRTCPInstance == NULL && fRTPSink != NULL) {
00385     // Create (and start) a 'RTCP instance' for this RTP sink:
00386     fRTCPInstance
00387       = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
00388                                 fTotalBW, (unsigned char*)fMaster.fCNAME,
00389                                 fRTPSink, NULL /* we're a server */);
00390         // Note: This starts RTCP running automatically
00391   }
00392 
00393   if (dests->isTCP) {
00394     // Change RTP and RTCP to use the TCP socket instead of UDP:
00395     if (fRTPSink != NULL) {
00396       fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00397       fRTPSink->setServerRequestAlternativeByteHandler(dests->tcpSocketNum, serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
00398     }
00399     if (fRTCPInstance != NULL) {
00400       fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00401       fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00402                                           rtcpRRHandler, rtcpRRHandlerClientData);
00403     }
00404   } else {
00405     // Tell the RTP and RTCP 'groupsocks' about this destination
00406     // (in case they don't already have it):
00407     if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
00408     if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
00409     if (fRTCPInstance != NULL) {
00410       fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00411                                           rtcpRRHandler, rtcpRRHandlerClientData);
00412     }
00413   }
00414 
00415   if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
00416     if (fRTPSink != NULL) {
00417       fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00418       fAreCurrentlyPlaying = True;
00419     } else if (fUDPSink != NULL) {
00420       fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00421       fAreCurrentlyPlaying = True;
00422     }
00423   }
00424 }
00425 
00426 void StreamState::pause() {
00427   if (fRTPSink != NULL) fRTPSink->stopPlaying();
00428   if (fUDPSink != NULL) fUDPSink->stopPlaying();
00429   fAreCurrentlyPlaying = False;
00430 }
00431 
00432 void StreamState::endPlaying(Destinations* dests) {
00433   if (dests->isTCP) {
00434     if (fRTPSink != NULL) {
00435       fRTPSink->removeStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00436     }
00437     if (fRTCPInstance != NULL) {
00438       fRTCPInstance->removeStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00439       fRTCPInstance->unsetSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId);
00440     }
00441   } else {
00442     // Tell the RTP and RTCP 'groupsocks' to stop using these destinations:
00443     if (fRTPgs != NULL) fRTPgs->removeDestination(dests->addr, dests->rtpPort);
00444     if (fRTCPgs != NULL) fRTCPgs->removeDestination(dests->addr, dests->rtcpPort);
00445     if (fRTCPInstance != NULL) {
00446       fRTCPInstance->unsetSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort);
00447     }
00448   }
00449 }
00450 
00451 void StreamState::reclaim() {
00452   // Delete allocated media objects
00453   Medium::close(fRTCPInstance) /* will send a RTCP BYE */; fRTCPInstance = NULL;
00454   Medium::close(fRTPSink); fRTPSink = NULL;
00455   Medium::close(fUDPSink); fUDPSink = NULL;
00456 
00457   fMaster.closeStreamSource(fMediaSource); fMediaSource = NULL;
00458   if (fMaster.fLastStreamToken == this) fMaster.fLastStreamToken = NULL;
00459 
00460   delete fRTPgs; fRTPgs = NULL;
00461   delete fRTCPgs; fRTCPgs = NULL;
00462 }

Generated on Thu Feb 2 23:51:31 2012 for live by  doxygen 1.5.2