testProgs/playCommon.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // Copyright (c) 1996-2010, Live Networks, Inc.  All rights reserved
00017 // A common framework, used for the "openRTSP" and "playSIP" applications
00018 // Implementation
00019 
00020 #include "playCommon.hh"
00021 #include "BasicUsageEnvironment.hh"
00022 #include "GroupsockHelper.hh"
00023 
00024 #if defined(__WIN32__) || defined(_WIN32)
00025 #define snprintf _snprintf
00026 #else
00027 #include <signal.h>
00028 #define USE_SIGNALS 1
00029 #endif
00030 
00031 // Forward function definitions:
00032 void continueAfterOPTIONS(RTSPClient* client, int resultCode, char* resultString);
00033 void continueAfterDESCRIBE(RTSPClient* client, int resultCode, char* resultString);
00034 void continueAfterSETUP(RTSPClient* client, int resultCode, char* resultString);
00035 void continueAfterPLAY(RTSPClient* client, int resultCode, char* resultString);
00036 void continueAfterTEARDOWN(RTSPClient* client, int resultCode, char* resultString);
00037 
00038 void setupStreams();
00039 void closeMediaSinks();
00040 void subsessionAfterPlaying(void* clientData);
00041 void subsessionByeHandler(void* clientData);
00042 void sessionAfterPlaying(void* clientData = NULL);
00043 void sessionTimerHandler(void* clientData);
00044 void shutdown(int exitCode = 1);
00045 void signalHandlerShutdown(int sig);
00046 void checkForPacketArrival(void* clientData);
00047 void checkInterPacketGaps(void* clientData);
00048 void beginQOSMeasurement();
00049 
00050 char const* progName;
00051 UsageEnvironment* env;
00052 Medium* ourClient = NULL;
00053 Authenticator* ourAuthenticator = NULL;
00054 char const* streamURL = NULL;
00055 MediaSession* session = NULL;
00056 TaskToken sessionTimerTask = NULL;
00057 TaskToken arrivalCheckTimerTask = NULL;
00058 TaskToken interPacketGapCheckTimerTask = NULL;
00059 TaskToken qosMeasurementTimerTask = NULL;
00060 Boolean createReceivers = True;
00061 Boolean outputQuickTimeFile = False;
00062 Boolean generateMP4Format = False;
00063 QuickTimeFileSink* qtOut = NULL;
00064 Boolean outputAVIFile = False;
00065 AVIFileSink* aviOut = NULL;
00066 Boolean audioOnly = False;
00067 Boolean videoOnly = False;
00068 char const* singleMedium = NULL;
00069 int verbosityLevel = 1; // by default, print verbose output
00070 double duration = 0;
00071 double durationSlop = -1.0; // extra seconds to play at the end
00072 double initialSeekTime = 0.0f;
00073 float scale = 1.0f;
00074 double endTime;
00075 unsigned interPacketGapMaxTime = 0;
00076 unsigned totNumPacketsReceived = ~0; // used if checking inter-packet gaps
00077 Boolean playContinuously = False;
00078 int simpleRTPoffsetArg = -1;
00079 Boolean sendOptionsRequest = True;
00080 Boolean sendOptionsRequestOnly = False;
00081 Boolean oneFilePerFrame = False;
00082 Boolean notifyOnPacketArrival = False;
00083 Boolean streamUsingTCP = False;
00084 unsigned short desiredPortNum = 0;
00085 portNumBits tunnelOverHTTPPortNum = 0;
00086 char* username = NULL;
00087 char* password = NULL;
00088 char* proxyServerName = NULL;
00089 unsigned short proxyServerPortNum = 0;
00090 unsigned char desiredAudioRTPPayloadFormat = 0;
00091 char* mimeSubtype = NULL;
00092 unsigned short movieWidth = 240; // default
00093 Boolean movieWidthOptionSet = False;
00094 unsigned short movieHeight = 180; // default
00095 Boolean movieHeightOptionSet = False;
00096 unsigned movieFPS = 15; // default
00097 Boolean movieFPSOptionSet = False;
00098 char const* fileNamePrefix = "";
00099 unsigned fileSinkBufferSize = 100000;
00100 unsigned socketInputBufferSize = 0;
00101 Boolean packetLossCompensate = False;
00102 Boolean syncStreams = False;
00103 Boolean generateHintTracks = False;
00104 unsigned qosMeasurementIntervalMS = 0; // 0 means: Don't output QOS data
00105 
00106 struct timeval startTime;
00107 
00108 void usage() {
00109   *env << "Usage: " << progName
00110        << " [-p <startPortNum>] [-r|-q|-4|-i] [-a|-v] [-V] [-d <duration>] [-D <max-inter-packet-gap-time> [-c] [-S <offset>] [-n] [-O]"
00111            << (controlConnectionUsesTCP ? " [-t|-T <http-port>]" : "")
00112        << " [-u <username> <password>"
00113            << (allowProxyServers ? " [<proxy-server> [<proxy-server-port>]]" : "")
00114        << "]" << (supportCodecSelection ? " [-A <audio-codec-rtp-payload-format-code>|-M <mime-subtype-name>]" : "")
00115        << " [-s <initial-seek-time>] [-z <scale>]"
00116        << " [-w <width> -h <height>] [-f <frames-per-second>] [-y] [-H] [-Q [<measurement-interval>]] [-F <filename-prefix>] [-b <file-sink-buffer-size>] [-B <input-socket-buffer-size>] [-I <input-interface-ip-address>] [-m] <url> (or " << progName << " -o [-V] <url>)\n";
00117   shutdown();
00118 }
00119 
00120 int main(int argc, char** argv) {
00121   // Begin by setting up our usage environment:
00122   TaskScheduler* scheduler = BasicTaskScheduler::createNew();
00123   env = BasicUsageEnvironment::createNew(*scheduler);
00124 
00125   progName = argv[0];
00126 
00127   gettimeofday(&startTime, NULL);
00128 
00129 #ifdef USE_SIGNALS
00130   // Allow ourselves to be shut down gracefully by a SIGHUP or a SIGUSR1:
00131   signal(SIGHUP, signalHandlerShutdown);
00132   signal(SIGUSR1, signalHandlerShutdown);
00133 #endif
00134 
00135   // unfortunately we can't use getopt() here, as Windoze doesn't have it
00136   while (argc > 2) {
00137     char* const opt = argv[1];
00138     if (opt[0] != '-') usage();
00139     switch (opt[1]) {
00140 
00141     case 'p': { // specify start port number
00142       int portArg;
00143       if (sscanf(argv[2], "%d", &portArg) != 1) {
00144         usage();
00145       }
00146       if (portArg <= 0 || portArg >= 65536 || portArg&1) {
00147         *env << "bad port number: " << portArg
00148                 << " (must be even, and in the range (0,65536))\n";
00149         usage();
00150       }
00151       desiredPortNum = (unsigned short)portArg;
00152       ++argv; --argc;
00153       break;
00154     }
00155 
00156     case 'r': { // do not receive data (instead, just 'play' the stream(s))
00157       createReceivers = False;
00158       break;
00159     }
00160 
00161     case 'q': { // output a QuickTime file (to stdout)
00162       outputQuickTimeFile = True;
00163       break;
00164     }
00165 
00166     case '4': { // output a 'mp4'-format file (to stdout)
00167       outputQuickTimeFile = True;
00168       generateMP4Format = True;
00169       break;
00170     }
00171 
00172     case 'i': { // output an AVI file (to stdout)
00173       outputAVIFile = True;
00174       break;
00175     }
00176 
00177     case 'I': { // specify input interface...
00178       NetAddressList addresses(argv[2]);
00179       if (addresses.numAddresses() == 0) {
00180         *env << "Failed to find network address for \"" << argv[2] << "\"";
00181         break;
00182       }
00183       ReceivingInterfaceAddr = *(unsigned*)(addresses.firstAddress()->data());
00184       ++argv; --argc;
00185       break;
00186     }
00187 
00188     case 'a': { // receive/record an audio stream only
00189       audioOnly = True;
00190       singleMedium = "audio";
00191       break;
00192     }
00193 
00194     case 'v': { // receive/record a video stream only
00195       videoOnly = True;
00196       singleMedium = "video";
00197       break;
00198     }
00199 
00200     case 'V': { // disable verbose output
00201       verbosityLevel = 0;
00202       break;
00203     }
00204 
00205     case 'd': { // specify duration, or how much to delay after end time
00206       float arg;
00207       if (sscanf(argv[2], "%g", &arg) != 1) {
00208         usage();
00209       }
00210       if (argv[2][0] == '-') { // not "arg<0", in case argv[2] was "-0"
00211         // a 'negative' argument was specified; use this for "durationSlop":
00212         duration = 0; // use whatever's in the SDP
00213         durationSlop = -arg;
00214       } else {
00215         duration = arg;
00216         durationSlop = 0;
00217       }
00218       ++argv; --argc;
00219       break;
00220     }
00221 
00222     case 'D': { // specify maximum number of seconds to wait for packets:
00223       if (sscanf(argv[2], "%u", &interPacketGapMaxTime) != 1) {
00224         usage();
00225       }
00226       ++argv; --argc;
00227       break;
00228     }
00229 
00230     case 'c': { // play continuously
00231       playContinuously = True;
00232       break;
00233     }
00234 
00235     case 'S': { // specify an offset to use with "SimpleRTPSource"s
00236       if (sscanf(argv[2], "%d", &simpleRTPoffsetArg) != 1) {
00237         usage();
00238       }
00239       if (simpleRTPoffsetArg < 0) {
00240         *env << "offset argument to \"-S\" must be >= 0\n";
00241         usage();
00242       }
00243       ++argv; --argc;
00244       break;
00245     }
00246 
00247     case 'O': { // Don't send an "OPTIONS" request before "DESCRIBE"
00248       sendOptionsRequest = False;
00249       break;
00250     }
00251 
00252     case 'o': { // Send only the "OPTIONS" request to the server
00253       sendOptionsRequestOnly = True;
00254       break;
00255     }
00256 
00257     case 'm': { // output multiple files - one for each frame
00258       oneFilePerFrame = True;
00259       break;
00260     }
00261 
00262     case 'n': { // notify the user when the first data packet arrives
00263       notifyOnPacketArrival = True;
00264       break;
00265     }
00266 
00267     case 't': {
00268       // stream RTP and RTCP over the TCP 'control' connection
00269       if (controlConnectionUsesTCP) {
00270         streamUsingTCP = True;
00271       } else {
00272         usage();
00273       }
00274       break;
00275     }
00276 
00277     case 'T': {
00278       // stream RTP and RTCP over a HTTP connection
00279       if (controlConnectionUsesTCP) {
00280         if (argc > 3 && argv[2][0] != '-') {
00281           // The next argument is the HTTP server port number:
00282           if (sscanf(argv[2], "%hu", &tunnelOverHTTPPortNum) == 1
00283               && tunnelOverHTTPPortNum > 0) {
00284             ++argv; --argc;
00285             break;
00286           }
00287         }
00288       }
00289 
00290       // If we get here, the option was specified incorrectly:
00291       usage();
00292       break;
00293     }
00294 
00295     case 'u': { // specify a username and password
00296       username = argv[2];
00297       password = argv[3];
00298       argv+=2; argc-=2;
00299       if (allowProxyServers && argc > 3 && argv[2][0] != '-') {
00300         // The next argument is the name of a proxy server:
00301         proxyServerName = argv[2];
00302         ++argv; --argc;
00303 
00304         if (argc > 3 && argv[2][0] != '-') {
00305           // The next argument is the proxy server port number:
00306           if (sscanf(argv[2], "%hu", &proxyServerPortNum) != 1) {
00307             usage();
00308           }
00309           ++argv; --argc;
00310         }
00311       }
00312 
00313       ourAuthenticator = new Authenticator;
00314       ourAuthenticator->setUsernameAndPassword(username, password);
00315       break;
00316     }
00317 
00318     case 'A': { // specify a desired audio RTP payload format
00319       unsigned formatArg;
00320       if (sscanf(argv[2], "%u", &formatArg) != 1
00321           || formatArg >= 96) {
00322         usage();
00323       }
00324       desiredAudioRTPPayloadFormat = (unsigned char)formatArg;
00325       ++argv; --argc;
00326       break;
00327     }
00328 
00329     case 'M': { // specify a MIME subtype for a dynamic RTP payload type
00330       mimeSubtype = argv[2];
00331       if (desiredAudioRTPPayloadFormat==0) desiredAudioRTPPayloadFormat =96;
00332       ++argv; --argc;
00333       break;
00334     }
00335 
00336     case 'w': { // specify a width (pixels) for an output QuickTime or AVI movie
00337       if (sscanf(argv[2], "%hu", &movieWidth) != 1) {
00338         usage();
00339       }
00340       movieWidthOptionSet = True;
00341       ++argv; --argc;
00342       break;
00343     }
00344 
00345     case 'h': { // specify a height (pixels) for an output QuickTime or AVI movie
00346       if (sscanf(argv[2], "%hu", &movieHeight) != 1) {
00347         usage();
00348       }
00349       movieHeightOptionSet = True;
00350       ++argv; --argc;
00351       break;
00352     }
00353 
00354     case 'f': { // specify a frame rate (per second) for an output QT or AVI movie
00355       if (sscanf(argv[2], "%u", &movieFPS) != 1) {
00356         usage();
00357       }
00358       movieFPSOptionSet = True;
00359       ++argv; --argc;
00360       break;
00361     }
00362 
00363     case 'F': { // specify a prefix for the audio and video output files
00364       fileNamePrefix = argv[2];
00365       ++argv; --argc;
00366       break;
00367     }
00368 
00369     case 'b': { // specify the size of buffers for "FileSink"s
00370       if (sscanf(argv[2], "%u", &fileSinkBufferSize) != 1) {
00371         usage();
00372       }
00373       ++argv; --argc;
00374       break;
00375     }
00376 
00377     case 'B': { // specify the size of input socket buffers
00378       if (sscanf(argv[2], "%u", &socketInputBufferSize) != 1) {
00379         usage();
00380       }
00381       ++argv; --argc;
00382       break;
00383     }
00384 
00385     // Note: The following option is deprecated, and may someday be removed:
00386     case 'l': { // try to compensate for packet loss by repeating frames
00387       packetLossCompensate = True;
00388       break;
00389     }
00390 
00391     case 'y': { // synchronize audio and video streams
00392       syncStreams = True;
00393       break;
00394     }
00395 
00396     case 'H': { // generate hint tracks (as well as the regular data tracks)
00397       generateHintTracks = True;
00398       break;
00399     }
00400 
00401     case 'Q': { // output QOS measurements
00402       qosMeasurementIntervalMS = 1000; // default: 1 second
00403 
00404       if (argc > 3 && argv[2][0] != '-') {
00405         // The next argument is the measurement interval,
00406         // in multiples of 100 ms
00407         if (sscanf(argv[2], "%u", &qosMeasurementIntervalMS) != 1) {
00408           usage();
00409         }
00410         qosMeasurementIntervalMS *= 100;
00411         ++argv; --argc;
00412       }
00413       break;
00414     }
00415 
00416     case 's': { // specify initial seek time (trick play)
00417       double arg;
00418       if (sscanf(argv[2], "%lg", &arg) != 1 || arg < 0) {
00419         usage();
00420       }
00421       initialSeekTime = arg;
00422       ++argv; --argc;
00423       break;
00424     }
00425 
00426     case 'z': { // scale (trick play)
00427       float arg;
00428       if (sscanf(argv[2], "%g", &arg) != 1 || arg == 0.0f) {
00429         usage();
00430       }
00431       scale = arg;
00432       ++argv; --argc;
00433       break;
00434     }
00435 
00436     default: {
00437       usage();
00438       break;
00439     }
00440     }
00441 
00442     ++argv; --argc;
00443   }
00444   if (argc != 2) usage();
00445   if (outputQuickTimeFile && outputAVIFile) {
00446     *env << "The -i and -q (or -4) flags cannot both be used!\n";
00447     usage();
00448   }
00449   Boolean outputCompositeFile = outputQuickTimeFile || outputAVIFile;
00450   if (!createReceivers && outputCompositeFile) {
00451     *env << "The -r and -q (or -4 or -i) flags cannot both be used!\n";
00452     usage();
00453   }
00454   if (outputCompositeFile && !movieWidthOptionSet) {
00455     *env << "Warning: The -q, -4 or -i option was used, but not -w.  Assuming a video width of "
00456          << movieWidth << " pixels\n";
00457   }
00458   if (outputCompositeFile && !movieHeightOptionSet) {
00459     *env << "Warning: The -q, -4 or -i option was used, but not -h.  Assuming a video height of "
00460          << movieHeight << " pixels\n";
00461   }
00462   if (outputCompositeFile && !movieFPSOptionSet) {
00463     *env << "Warning: The -q, -4 or -i option was used, but not -f.  Assuming a video frame rate of "
00464          << movieFPS << " frames-per-second\n";
00465   }
00466   if (audioOnly && videoOnly) {
00467     *env << "The -a and -v flags cannot both be used!\n";
00468     usage();
00469   }
00470   if (sendOptionsRequestOnly && !sendOptionsRequest) {
00471     *env << "The -o and -O flags cannot both be used!\n";
00472     usage();
00473   }
00474   if (tunnelOverHTTPPortNum > 0) {
00475     if (streamUsingTCP) {
00476       *env << "The -t and -T flags cannot both be used!\n";
00477       usage();
00478     } else {
00479       streamUsingTCP = True;
00480     }
00481   }
00482   if (!createReceivers && notifyOnPacketArrival) {
00483     *env << "Warning: Because we're not receiving stream data, the -n flag has no effect\n";
00484   }
00485   if (durationSlop < 0) {
00486     // This parameter wasn't set, so use a default value.
00487     // If we're measuring QOS stats, then don't add any slop, to avoid
00488     // having 'empty' measurement intervals at the end.
00489     durationSlop = qosMeasurementIntervalMS > 0 ? 0.0 : 5.0;
00490   }
00491 
00492   streamURL = argv[1];
00493 
00494   // Create our client object:
00495   ourClient = createClient(*env, streamURL, verbosityLevel, progName);
00496   if (ourClient == NULL) {
00497     *env << "Failed to create " << clientProtocolName
00498                 << " client: " << env->getResultMsg() << "\n";
00499     shutdown();
00500   }
00501 
00502   if (sendOptionsRequest) {
00503     // Begin by sending an "OPTIONS" command:
00504     getOptions(continueAfterOPTIONS);
00505   } else {
00506     continueAfterOPTIONS(NULL, 0, NULL);
00507   }
00508 
00509   // All subsequent activity takes place within the event loop:
00510   env->taskScheduler().doEventLoop(); // does not return
00511 
00512   return 0; // only to prevent compiler warning
00513 }
00514 
00515 void continueAfterOPTIONS(RTSPClient*, int resultCode, char* resultString) {
00516   if (sendOptionsRequestOnly) {
00517     if (resultCode != 0) {
00518       *env << clientProtocolName << " \"OPTIONS\" request failed: " << resultString << "\n";
00519     } else {
00520       *env << clientProtocolName << " \"OPTIONS\" request returned: " << resultString << "\n";
00521     }
00522     shutdown();
00523   }
00524   delete[] resultString;
00525 
00526   // Next, get a SDP description for the stream:
00527   getSDPDescription(continueAfterDESCRIBE);
00528 }
00529 
00530 void continueAfterDESCRIBE(RTSPClient*, int resultCode, char* resultString) {
00531   if (resultCode != 0) {
00532     *env << "Failed to get a SDP description from URL \"" << streamURL << "\": " << resultString << "\n";
00533     shutdown();
00534   }
00535 
00536   char* sdpDescription = resultString;
00537   *env << "Opened URL \"" << streamURL << "\", returning a SDP description:\n" << sdpDescription << "\n";
00538 
00539   // Create a media session object from this SDP description:
00540   session = MediaSession::createNew(*env, sdpDescription);
00541   delete[] sdpDescription;
00542   if (session == NULL) {
00543     *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
00544     shutdown();
00545   } else if (!session->hasSubsessions()) {
00546     *env << "This session has no media subsessions (i.e., \"m=\" lines)\n";
00547     shutdown();
00548   }
00549 
00550   // Then, setup the "RTPSource"s for the session:
00551   MediaSubsessionIterator iter(*session);
00552   MediaSubsession *subsession;
00553   Boolean madeProgress = False;
00554   char const* singleMediumToTest = singleMedium;
00555   while ((subsession = iter.next()) != NULL) {
00556     // If we've asked to receive only a single medium, then check this now:
00557     if (singleMediumToTest != NULL) {
00558       if (strcmp(subsession->mediumName(), singleMediumToTest) != 0) {
00559                   *env << "Ignoring \"" << subsession->mediumName()
00560                           << "/" << subsession->codecName()
00561                           << "\" subsession, because we've asked to receive a single " << singleMedium
00562                           << " session only\n";
00563         continue;
00564       } else {
00565         // Receive this subsession only
00566         singleMediumToTest = "xxxxx";
00567             // this hack ensures that we get only 1 subsession of this type
00568       }
00569     }
00570 
00571     if (desiredPortNum != 0) {
00572       subsession->setClientPortNum(desiredPortNum);
00573       desiredPortNum += 2;
00574     }
00575 
00576     if (createReceivers) {
00577       if (!subsession->initiate(simpleRTPoffsetArg)) {
00578         *env << "Unable to create receiver for \"" << subsession->mediumName()
00579              << "/" << subsession->codecName()
00580              << "\" subsession: " << env->getResultMsg() << "\n";
00581       } else {
00582         *env << "Created receiver for \"" << subsession->mediumName()
00583              << "/" << subsession->codecName()
00584              << "\" subsession (client ports " << subsession->clientPortNum()
00585              << "-" << subsession->clientPortNum()+1 << ")\n";
00586         madeProgress = True;
00587         
00588         if (subsession->rtpSource() != NULL) {
00589           // Because we're saving the incoming data, rather than playing
00590           // it in real time, allow an especially large time threshold
00591           // (1 second) for reordering misordered incoming packets:
00592           unsigned const thresh = 1000000; // 1 second
00593           subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
00594           
00595           // Set the RTP source's OS socket buffer size as appropriate - either if we were explicitly asked (using -B),
00596           // or if the desired FileSink buffer size happens to be larger than the current OS socket buffer size.
00597           // (The latter case is a heuristic, on the assumption that if the user asked for a large FileSink buffer size,
00598           // then the input data rate may be large enough to justify increasing the OS socket buffer size also.)
00599           int socketNum = subsession->rtpSource()->RTPgs()->socketNum();
00600           unsigned curBufferSize = getReceiveBufferSize(*env, socketNum);
00601           if (socketInputBufferSize > 0 || fileSinkBufferSize > curBufferSize) {
00602             unsigned newBufferSize = socketInputBufferSize > 0 ? socketInputBufferSize : fileSinkBufferSize;
00603             newBufferSize = setReceiveBufferTo(*env, socketNum, newBufferSize);
00604             if (socketInputBufferSize > 0) { // The user explicitly asked for the new socket buffer size; announce it:
00605               *env << "Changed socket receive buffer size for the \""
00606                    << subsession->mediumName()
00607                    << "/" << subsession->codecName()
00608                    << "\" subsession from "
00609                    << curBufferSize << " to "
00610                    << newBufferSize << " bytes\n";
00611             }
00612           }
00613         }
00614       }
00615     } else {
00616       if (subsession->clientPortNum() == 0) {
00617         *env << "No client port was specified for the \""
00618              << subsession->mediumName()
00619              << "/" << subsession->codecName()
00620              << "\" subsession.  (Try adding the \"-p <portNum>\" option.)\n";
00621       } else {
00622                 madeProgress = True;
00623       }
00624     }
00625   }
00626   if (!madeProgress) shutdown();
00627 
00628   // Perform additional 'setup' on each subsession, before playing them:
00629   setupStreams();
00630 }
00631 
00632 MediaSubsession *subsession;
00633 Boolean madeProgress = False;
00634 void continueAfterSETUP(RTSPClient*, int resultCode, char* resultString) {
00635   if (resultCode == 0) {
00636       *env << "Setup \"" << subsession->mediumName()
00637            << "/" << subsession->codecName()
00638            << "\" subsession (client ports " << subsession->clientPortNum()
00639            << "-" << subsession->clientPortNum()+1 << ")\n";
00640       madeProgress = True;
00641   } else {
00642     *env << "Failed to setup \"" << subsession->mediumName()
00643          << "/" << subsession->codecName()
00644          << "\" subsession: " << env->getResultMsg() << "\n";
00645   }
00646 
00647   // Set up the next subsession, if any:
00648   setupStreams();
00649 }
00650 
00651 void setupStreams() {
00652   static MediaSubsessionIterator* setupIter = NULL;
00653   if (setupIter == NULL) setupIter = new MediaSubsessionIterator(*session);
00654   while ((subsession = setupIter->next()) != NULL) {
00655     // We have another subsession left to set up:
00656     if (subsession->clientPortNum() == 0) continue; // port # was not set
00657 
00658     setupSubsession(subsession, streamUsingTCP, continueAfterSETUP);
00659     return;
00660   }
00661 
00662   // We're done setting up subsessions.
00663   delete setupIter;
00664   if (!madeProgress) shutdown();
00665 
00666   // Create output files:
00667   if (createReceivers) {
00668     if (outputQuickTimeFile) {
00669       // Create a "QuickTimeFileSink", to write to 'stdout':
00670       qtOut = QuickTimeFileSink::createNew(*env, *session, "stdout",
00671                                            fileSinkBufferSize,
00672                                            movieWidth, movieHeight,
00673                                            movieFPS,
00674                                            packetLossCompensate,
00675                                            syncStreams,
00676                                            generateHintTracks,
00677                                            generateMP4Format);
00678       if (qtOut == NULL) {
00679         *env << "Failed to create QuickTime file sink for stdout: " << env->getResultMsg();
00680         shutdown();
00681       }
00682 
00683       qtOut->startPlaying(sessionAfterPlaying, NULL);
00684     } else if (outputAVIFile) {
00685       // Create an "AVIFileSink", to write to 'stdout':
00686       aviOut = AVIFileSink::createNew(*env, *session, "stdout",
00687                                       fileSinkBufferSize,
00688                                       movieWidth, movieHeight,
00689                                       movieFPS,
00690                                       packetLossCompensate);
00691       if (aviOut == NULL) {
00692         *env << "Failed to create AVI file sink for stdout: " << env->getResultMsg();
00693         shutdown();
00694       }
00695 
00696       aviOut->startPlaying(sessionAfterPlaying, NULL);
00697     } else {
00698       // Create and start "FileSink"s for each subsession:
00699       madeProgress = False;
00700       MediaSubsessionIterator iter(*session);
00701       while ((subsession = iter.next()) != NULL) {
00702         if (subsession->readSource() == NULL) continue; // was not initiated
00703 
00704         // Create an output file for each desired stream:
00705         char outFileName[1000];
00706         if (singleMedium == NULL) {
00707           // Output file name is
00708           //     "<filename-prefix><medium_name>-<codec_name>-<counter>"
00709           static unsigned streamCounter = 0;
00710           snprintf(outFileName, sizeof outFileName, "%s%s-%s-%d",
00711                    fileNamePrefix, subsession->mediumName(),
00712                    subsession->codecName(), ++streamCounter);
00713         } else {
00714           sprintf(outFileName, "stdout");
00715         }
00716         FileSink* fileSink;
00717         if (strcmp(subsession->mediumName(), "audio") == 0 &&
00718             (strcmp(subsession->codecName(), "AMR") == 0 ||
00719              strcmp(subsession->codecName(), "AMR-WB") == 0)) {
00720           // For AMR audio streams, we use a special sink that inserts AMR frame hdrs:
00721           fileSink = AMRAudioFileSink::createNew(*env, outFileName,
00722                                                  fileSinkBufferSize, oneFilePerFrame);
00723         } else if (strcmp(subsession->mediumName(), "video") == 0 &&
00724             (strcmp(subsession->codecName(), "H264") == 0)) {
00725           // For H.264 video stream, we use a special sink that insert start_codes:
00726           fileSink = H264VideoFileSink::createNew(*env, outFileName,
00727                                                  fileSinkBufferSize, oneFilePerFrame);
00728         } else {
00729           // Normal case:
00730           fileSink = FileSink::createNew(*env, outFileName,
00731                                          fileSinkBufferSize, oneFilePerFrame);
00732         }
00733         subsession->sink = fileSink;
00734         if (subsession->sink == NULL) {
00735           *env << "Failed to create FileSink for \"" << outFileName
00736                   << "\": " << env->getResultMsg() << "\n";
00737         } else {
00738           if (singleMedium == NULL) {
00739             *env << "Created output file: \"" << outFileName << "\"\n";
00740           } else {
00741             *env << "Outputting data from the \"" << subsession->mediumName()
00742                         << "/" << subsession->codecName()
00743                         << "\" subsession to 'stdout'\n";
00744           }
00745 
00746           if (strcmp(subsession->mediumName(), "video") == 0 &&
00747               strcmp(subsession->codecName(), "MP4V-ES") == 0 &&
00748               subsession->fmtp_config() != NULL) {
00749             // For MPEG-4 video RTP streams, the 'config' information
00750             // from the SDP description contains useful VOL etc. headers.
00751             // Insert this data at the front of the output file:
00752             unsigned configLen;
00753             unsigned char* configData
00754               = parseGeneralConfigStr(subsession->fmtp_config(), configLen);
00755             struct timeval timeNow;
00756             gettimeofday(&timeNow, NULL);
00757             fileSink->addData(configData, configLen, timeNow);
00758             delete[] configData;
00759           }
00760 
00761           subsession->sink->startPlaying(*(subsession->readSource()),
00762                                          subsessionAfterPlaying,
00763                                          subsession);
00764 
00765           // Also set a handler to be called if a RTCP "BYE" arrives
00766           // for this subsession:
00767           if (subsession->rtcpInstance() != NULL) {
00768             subsession->rtcpInstance()->setByeHandler(subsessionByeHandler,
00769                                                       subsession);
00770           }
00771 
00772           madeProgress = True;
00773         }
00774       }
00775       if (!madeProgress) shutdown();
00776     }
00777   }
00778 
00779   // Finally, start playing each subsession, to start the data flow:
00780   if (duration == 0) {
00781     if (scale > 0) duration = session->playEndTime() - initialSeekTime; // use SDP end time
00782     else if (scale < 0) duration = initialSeekTime;
00783   }
00784   if (duration < 0) duration = 0.0;
00785 
00786   endTime = initialSeekTime;
00787   if (scale > 0) {
00788     if (duration <= 0) endTime = -1.0f;
00789     else endTime = initialSeekTime + duration;
00790   } else {
00791     endTime = initialSeekTime - duration;
00792     if (endTime < 0) endTime = 0.0f;
00793   }
00794 
00795   startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00796 }
00797 
00798 void continueAfterPLAY(RTSPClient*, int resultCode, char* resultString) {
00799   if (resultCode != 0) {
00800     *env << "Failed to start playing session: " << resultString << "\n";
00801     shutdown();
00802   } else {
00803     *env << "Started playing session\n";
00804   }
00805 
00806   if (qosMeasurementIntervalMS > 0) {
00807     // Begin periodic QOS measurements:
00808     beginQOSMeasurement();
00809   }
00810 
00811   // Figure out how long to delay (if at all) before shutting down, or
00812   // repeating the playing
00813   Boolean timerIsBeingUsed = False;
00814   double secondsToDelay = duration;
00815   if (duration > 0) {
00816     timerIsBeingUsed = True;
00817     double absScale = scale > 0 ? scale : -scale; // ASSERT: scale != 0
00818     secondsToDelay = duration/absScale + durationSlop;
00819 
00820     int64_t uSecsToDelay = (int64_t)(secondsToDelay*1000000.0);
00821     sessionTimerTask = env->taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)sessionTimerHandler, (void*)NULL);
00822   }
00823 
00824   char const* actionString
00825     = createReceivers? "Receiving streamed data":"Data is being streamed";
00826   if (timerIsBeingUsed) {
00827     *env << actionString
00828                 << " (for up to " << secondsToDelay
00829                 << " seconds)...\n";
00830   } else {
00831 #ifdef USE_SIGNALS
00832     pid_t ourPid = getpid();
00833     *env << actionString
00834                 << " (signal with \"kill -HUP " << (int)ourPid
00835                 << "\" or \"kill -USR1 " << (int)ourPid
00836                 << "\" to terminate)...\n";
00837 #else
00838     *env << actionString << "...\n";
00839 #endif
00840   }
00841 
00842   // Watch for incoming packets (if desired):
00843   checkForPacketArrival(NULL);
00844   checkInterPacketGaps(NULL);
00845 }
00846 
00847 void closeMediaSinks() {
00848   Medium::close(qtOut);
00849   Medium::close(aviOut);
00850 
00851   if (session == NULL) return;
00852   MediaSubsessionIterator iter(*session);
00853   MediaSubsession* subsession;
00854   while ((subsession = iter.next()) != NULL) {
00855     Medium::close(subsession->sink);
00856     subsession->sink = NULL;
00857   }
00858 }
00859 
00860 void subsessionAfterPlaying(void* clientData) {
00861   // Begin by closing this media subsession's stream:
00862   MediaSubsession* subsession = (MediaSubsession*)clientData;
00863   Medium::close(subsession->sink);
00864   subsession->sink = NULL;
00865 
00866   // Next, check whether *all* subsessions' streams have now been closed:
00867   MediaSession& session = subsession->parentSession();
00868   MediaSubsessionIterator iter(session);
00869   while ((subsession = iter.next()) != NULL) {
00870     if (subsession->sink != NULL) return; // this subsession is still active
00871   }
00872 
00873   // All subsessions' streams have now been closed
00874   sessionAfterPlaying();
00875 }
00876 
00877 void subsessionByeHandler(void* clientData) {
00878   struct timeval timeNow;
00879   gettimeofday(&timeNow, NULL);
00880   unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec;
00881 
00882   MediaSubsession* subsession = (MediaSubsession*)clientData;
00883   *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName()
00884         << "/" << subsession->codecName()
00885         << "\" subsession (after " << secsDiff
00886         << " seconds)\n";
00887 
00888   // Act now as if the subsession had closed:
00889   subsessionAfterPlaying(subsession);
00890 }
00891 
00892 void sessionAfterPlaying(void* /*clientData*/) {
00893   if (!playContinuously) {
00894     shutdown(0);
00895   } else {
00896     // We've been asked to play the stream(s) over again:
00897     startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00898   }
00899 }
00900 
00901 void sessionTimerHandler(void* /*clientData*/) {
00902   sessionTimerTask = NULL;
00903 
00904   sessionAfterPlaying();
00905 }
00906 
00907 class qosMeasurementRecord {
00908 public:
00909   qosMeasurementRecord(struct timeval const& startTime, RTPSource* src)
00910     : fSource(src), fNext(NULL),
00911       kbits_per_second_min(1e20), kbits_per_second_max(0),
00912       kBytesTotal(0.0),
00913       packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0),
00914       totNumPacketsReceived(0), totNumPacketsExpected(0) {
00915     measurementEndTime = measurementStartTime = startTime;
00916 
00917     RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
00918     // Assume that there's only one SSRC source (usually the case):
00919     RTPReceptionStats* stats = statsIter.next(True);
00920     if (stats != NULL) {
00921       kBytesTotal = stats->totNumKBytesReceived();
00922       totNumPacketsReceived = stats->totNumPacketsReceived();
00923       totNumPacketsExpected = stats->totNumPacketsExpected();
00924     }
00925   }
00926   virtual ~qosMeasurementRecord() { delete fNext; }
00927 
00928   void periodicQOSMeasurement(struct timeval const& timeNow);
00929 
00930 public:
00931   RTPSource* fSource;
00932   qosMeasurementRecord* fNext;
00933 
00934 public:
00935   struct timeval measurementStartTime, measurementEndTime;
00936   double kbits_per_second_min, kbits_per_second_max;
00937   double kBytesTotal;
00938   double packet_loss_fraction_min, packet_loss_fraction_max;
00939   unsigned totNumPacketsReceived, totNumPacketsExpected;
00940 };
00941 
00942 static qosMeasurementRecord* qosRecordHead = NULL;
00943 
00944 static void periodicQOSMeasurement(void* clientData); // forward
00945 
00946 static unsigned nextQOSMeasurementUSecs;
00947 
00948 static void scheduleNextQOSMeasurement() {
00949   nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;
00950   struct timeval timeNow;
00951   gettimeofday(&timeNow, NULL);
00952   unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;
00953   unsigned usecsToDelay = nextQOSMeasurementUSecs - timeNowUSecs;
00954      // Note: This works even when nextQOSMeasurementUSecs wraps around
00955 
00956   qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(
00957      usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL);
00958 }
00959 
00960 static void periodicQOSMeasurement(void* /*clientData*/) {
00961   struct timeval timeNow;
00962   gettimeofday(&timeNow, NULL);
00963 
00964   for (qosMeasurementRecord* qosRecord = qosRecordHead;
00965        qosRecord != NULL; qosRecord = qosRecord->fNext) {
00966     qosRecord->periodicQOSMeasurement(timeNow);
00967   }
00968 
00969   // Do this again later:
00970   scheduleNextQOSMeasurement();
00971 }
00972 
00973 void qosMeasurementRecord
00974 ::periodicQOSMeasurement(struct timeval const& timeNow) {
00975   unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;
00976   int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;
00977   double timeDiff = secsDiff + usecsDiff/1000000.0;
00978   measurementEndTime = timeNow;
00979 
00980   RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());
00981   // Assume that there's only one SSRC source (usually the case):
00982   RTPReceptionStats* stats = statsIter.next(True);
00983   if (stats != NULL) {
00984     double kBytesTotalNow = stats->totNumKBytesReceived();
00985     double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;
00986     kBytesTotal = kBytesTotalNow;
00987 
00988     double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;
00989     if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error
00990     if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;
00991     if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;
00992 
00993     unsigned totReceivedNow = stats->totNumPacketsReceived();
00994     unsigned totExpectedNow = stats->totNumPacketsExpected();
00995     unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;
00996     unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;
00997     totNumPacketsReceived = totReceivedNow;
00998     totNumPacketsExpected = totExpectedNow;
00999 
01000     double lossFractionNow = deltaExpectedNow == 0 ? 0.0
01001       : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;
01002     //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause
01003     if (lossFractionNow < packet_loss_fraction_min) {
01004       packet_loss_fraction_min = lossFractionNow;
01005     }
01006     if (lossFractionNow > packet_loss_fraction_max) {
01007       packet_loss_fraction_max = lossFractionNow;
01008     }
01009   }
01010 }
01011 
01012 void beginQOSMeasurement() {
01013   // Set up a measurement record for each active subsession:
01014   struct timeval startTime;
01015   gettimeofday(&startTime, NULL);
01016   nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;
01017   qosMeasurementRecord* qosRecordTail = NULL;
01018   MediaSubsessionIterator iter(*session);
01019   MediaSubsession* subsession;
01020   while ((subsession = iter.next()) != NULL) {
01021     RTPSource* src = subsession->rtpSource();
01022     if (src == NULL) continue;
01023 
01024     qosMeasurementRecord* qosRecord
01025       = new qosMeasurementRecord(startTime, src);
01026     if (qosRecordHead == NULL) qosRecordHead = qosRecord;
01027     if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;
01028     qosRecordTail  = qosRecord;
01029   }
01030 
01031   // Then schedule the first of the periodic measurements:
01032   scheduleNextQOSMeasurement();
01033 }
01034 
01035 void printQOSData(int exitCode) {
01036   *env << "begin_QOS_statistics\n";
01037   
01038   // Print out stats for each active subsession:
01039   qosMeasurementRecord* curQOSRecord = qosRecordHead;
01040   if (session != NULL) {
01041     MediaSubsessionIterator iter(*session);
01042     MediaSubsession* subsession;
01043     while ((subsession = iter.next()) != NULL) {
01044       RTPSource* src = subsession->rtpSource();
01045       if (src == NULL) continue;
01046       
01047       *env << "subsession\t" << subsession->mediumName()
01048            << "/" << subsession->codecName() << "\n";
01049       
01050       unsigned numPacketsReceived = 0, numPacketsExpected = 0;
01051       
01052       if (curQOSRecord != NULL) {
01053         numPacketsReceived = curQOSRecord->totNumPacketsReceived;
01054         numPacketsExpected = curQOSRecord->totNumPacketsExpected;
01055       }
01056       *env << "num_packets_received\t" << numPacketsReceived << "\n";
01057       *env << "num_packets_lost\t" << int(numPacketsExpected - numPacketsReceived) << "\n";
01058       
01059       if (curQOSRecord != NULL) {
01060         unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec
01061           - curQOSRecord->measurementStartTime.tv_sec;
01062         int usecsDiff = curQOSRecord->measurementEndTime.tv_usec
01063           - curQOSRecord->measurementStartTime.tv_usec;
01064         double measurementTime = secsDiff + usecsDiff/1000000.0;
01065         *env << "elapsed_measurement_time\t" << measurementTime << "\n";
01066         
01067         *env << "kBytes_received_total\t" << curQOSRecord->kBytesTotal << "\n";
01068         
01069         *env << "measurement_sampling_interval_ms\t" << qosMeasurementIntervalMS << "\n";
01070         
01071         if (curQOSRecord->kbits_per_second_max == 0) {
01072           // special case: we didn't receive any data:
01073           *env <<
01074             "kbits_per_second_min\tunavailable\n"
01075             "kbits_per_second_ave\tunavailable\n"
01076             "kbits_per_second_max\tunavailable\n";
01077         } else {
01078           *env << "kbits_per_second_min\t" << curQOSRecord->kbits_per_second_min << "\n";
01079           *env << "kbits_per_second_ave\t"
01080                << (measurementTime == 0.0 ? 0.0 : 8*curQOSRecord->kBytesTotal/measurementTime) << "\n";
01081           *env << "kbits_per_second_max\t" << curQOSRecord->kbits_per_second_max << "\n";
01082         }
01083         
01084         *env << "packet_loss_percentage_min\t" << 100*curQOSRecord->packet_loss_fraction_min << "\n";
01085         double packetLossFraction = numPacketsExpected == 0 ? 1.0
01086           : 1.0 - numPacketsReceived/(double)numPacketsExpected;
01087         if (packetLossFraction < 0.0) packetLossFraction = 0.0;
01088         *env << "packet_loss_percentage_ave\t" << 100*packetLossFraction << "\n";
01089         *env << "packet_loss_percentage_max\t"
01090              << (packetLossFraction == 1.0 ? 100.0 : 100*curQOSRecord->packet_loss_fraction_max) << "\n";
01091         
01092         RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
01093         // Assume that there's only one SSRC source (usually the case):
01094         RTPReceptionStats* stats = statsIter.next(True);
01095         if (stats != NULL) {
01096           *env << "inter_packet_gap_ms_min\t" << stats->minInterPacketGapUS()/1000.0 << "\n";
01097           struct timeval totalGaps = stats->totalInterPacketGaps();
01098           double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0;
01099           unsigned totNumPacketsReceived = stats->totNumPacketsReceived();
01100           *env << "inter_packet_gap_ms_ave\t"
01101                << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n";
01102           *env << "inter_packet_gap_ms_max\t" << stats->maxInterPacketGapUS()/1000.0 << "\n";
01103         }
01104         
01105         curQOSRecord = curQOSRecord->fNext;
01106       }
01107     }
01108   }
01109 
01110   *env << "end_QOS_statistics\n";
01111   delete qosRecordHead;
01112 }
01113 
01114 int shutdownExitCode;
01115 void shutdown(int exitCode) {
01116   shutdownExitCode = exitCode;
01117   if (env != NULL) {
01118     env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
01119     env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
01120     env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
01121     env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
01122   }
01123 
01124   if (qosMeasurementIntervalMS > 0) {
01125     printQOSData(exitCode);
01126   }
01127 
01128   // Teardown, then shutdown, any outstanding RTP/RTCP subsessions
01129   if (session != NULL) {
01130     tearDownSession(session, continueAfterTEARDOWN);
01131   } else {
01132     continueAfterTEARDOWN(NULL, 0, NULL);
01133   }
01134 }
01135 
01136 void continueAfterTEARDOWN(RTSPClient*, int /*resultCode*/, char* /*resultString*/) {
01137   // Now that we've stopped any more incoming data from arriving, close our output files:
01138   closeMediaSinks();
01139   Medium::close(session);
01140 
01141   // Finally, shut down our client:
01142   delete ourAuthenticator;
01143   Medium::close(ourClient);
01144 
01145   // Adios...
01146   exit(shutdownExitCode);
01147 }
01148 
01149 void signalHandlerShutdown(int /*sig*/) {
01150   *env << "Got shutdown signal\n";
01151   shutdown(0);
01152 }
01153 
01154 void checkForPacketArrival(void* /*clientData*/) {
01155   if (!notifyOnPacketArrival) return; // we're not checking
01156 
01157   // Check each subsession, to see whether it has received data packets:
01158   unsigned numSubsessionsChecked = 0;
01159   unsigned numSubsessionsWithReceivedData = 0;
01160   unsigned numSubsessionsThatHaveBeenSynced = 0;
01161 
01162   MediaSubsessionIterator iter(*session);
01163   MediaSubsession* subsession;
01164   while ((subsession = iter.next()) != NULL) {
01165     RTPSource* src = subsession->rtpSource();
01166     if (src == NULL) continue;
01167     ++numSubsessionsChecked;
01168 
01169     if (src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0) {
01170       // At least one data packet has arrived
01171       ++numSubsessionsWithReceivedData;
01172     }
01173     if (src->hasBeenSynchronizedUsingRTCP()) {
01174       ++numSubsessionsThatHaveBeenSynced;
01175     }
01176   }
01177 
01178   unsigned numSubsessionsToCheck = numSubsessionsChecked;
01179   // Special case for "QuickTimeFileSink"s and "AVIFileSink"s:
01180   // They might not use all of the input sources:
01181   if (qtOut != NULL) {
01182     numSubsessionsToCheck = qtOut->numActiveSubsessions();
01183   } else if (aviOut != NULL) {
01184     numSubsessionsToCheck = aviOut->numActiveSubsessions();
01185   }
01186 
01187   Boolean notifyTheUser;
01188   if (!syncStreams) {
01189     notifyTheUser = numSubsessionsWithReceivedData > 0; // easy case
01190   } else {
01191     notifyTheUser = numSubsessionsWithReceivedData >= numSubsessionsToCheck
01192       && numSubsessionsThatHaveBeenSynced == numSubsessionsChecked;
01193     // Note: A subsession with no active sources is considered to be synced
01194   }
01195   if (notifyTheUser) {
01196     struct timeval timeNow;
01197     gettimeofday(&timeNow, NULL);
01198         char timestampStr[100];
01199         sprintf(timestampStr, "%ld%03ld", timeNow.tv_sec, (long)(timeNow.tv_usec/1000));
01200     *env << (syncStreams ? "Synchronized d" : "D")
01201                 << "ata packets have begun arriving [" << timestampStr << "]\007\n";
01202     return;
01203   }
01204 
01205   // No luck, so reschedule this check again, after a delay:
01206   int uSecsToDelay = 100000; // 100 ms
01207   arrivalCheckTimerTask
01208     = env->taskScheduler().scheduleDelayedTask(uSecsToDelay,
01209                                (TaskFunc*)checkForPacketArrival, NULL);
01210 }
01211 
01212 void checkInterPacketGaps(void* /*clientData*/) {
01213   if (interPacketGapMaxTime == 0) return; // we're not checking
01214 
01215   // Check each subsession, counting up how many packets have been received:
01216   unsigned newTotNumPacketsReceived = 0;
01217 
01218   MediaSubsessionIterator iter(*session);
01219   MediaSubsession* subsession;
01220   while ((subsession = iter.next()) != NULL) {
01221     RTPSource* src = subsession->rtpSource();
01222     if (src == NULL) continue;
01223     newTotNumPacketsReceived += src->receptionStatsDB().totNumPacketsReceived();
01224   }
01225 
01226   if (newTotNumPacketsReceived == totNumPacketsReceived) {
01227     // No additional packets have been received since the last time we
01228     // checked, so end this stream:
01229     *env << "Closing session, because we stopped receiving packets.\n";
01230     interPacketGapCheckTimerTask = NULL;
01231     sessionAfterPlaying();
01232   } else {
01233     totNumPacketsReceived = newTotNumPacketsReceived;
01234     // Check again, after the specified delay:
01235     interPacketGapCheckTimerTask
01236       = env->taskScheduler().scheduleDelayedTask(interPacketGapMaxTime*1000000,
01237                                  (TaskFunc*)checkInterPacketGaps, NULL);
01238   }
01239 }

Generated on Fri Sep 3 02:35:42 2010 for live by  doxygen 1.5.2