00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "playCommon.hh"
00021 #include "BasicUsageEnvironment.hh"
00022 #include "GroupsockHelper.hh"
00023
00024 #if defined(__WIN32__) || defined(_WIN32)
00025 #define snprintf _snprintf
00026 #else
00027 #include <signal.h>
00028 #define USE_SIGNALS 1
00029 #endif
00030
00031
00032 void continueAfterOPTIONS(RTSPClient* client, int resultCode, char* resultString);
00033 void continueAfterDESCRIBE(RTSPClient* client, int resultCode, char* resultString);
00034 void continueAfterSETUP(RTSPClient* client, int resultCode, char* resultString);
00035 void continueAfterPLAY(RTSPClient* client, int resultCode, char* resultString);
00036 void continueAfterTEARDOWN(RTSPClient* client, int resultCode, char* resultString);
00037
00038 void setupStreams();
00039 void closeMediaSinks();
00040 void subsessionAfterPlaying(void* clientData);
00041 void subsessionByeHandler(void* clientData);
00042 void sessionAfterPlaying(void* clientData = NULL);
00043 void sessionTimerHandler(void* clientData);
00044 void shutdown(int exitCode = 1);
00045 void signalHandlerShutdown(int sig);
00046 void checkForPacketArrival(void* clientData);
00047 void checkInterPacketGaps(void* clientData);
00048 void beginQOSMeasurement();
00049
00050 char const* progName;
00051 UsageEnvironment* env;
00052 Medium* ourClient = NULL;
00053 Authenticator* ourAuthenticator = NULL;
00054 char const* streamURL = NULL;
00055 MediaSession* session = NULL;
00056 TaskToken sessionTimerTask = NULL;
00057 TaskToken arrivalCheckTimerTask = NULL;
00058 TaskToken interPacketGapCheckTimerTask = NULL;
00059 TaskToken qosMeasurementTimerTask = NULL;
00060 Boolean createReceivers = True;
00061 Boolean outputQuickTimeFile = False;
00062 Boolean generateMP4Format = False;
00063 QuickTimeFileSink* qtOut = NULL;
00064 Boolean outputAVIFile = False;
00065 AVIFileSink* aviOut = NULL;
00066 Boolean audioOnly = False;
00067 Boolean videoOnly = False;
00068 char const* singleMedium = NULL;
00069 int verbosityLevel = 1;
00070 double duration = 0;
00071 double durationSlop = -1.0;
00072 double initialSeekTime = 0.0f;
00073 float scale = 1.0f;
00074 double endTime;
00075 unsigned interPacketGapMaxTime = 0;
00076 unsigned totNumPacketsReceived = ~0;
00077 Boolean playContinuously = False;
00078 int simpleRTPoffsetArg = -1;
00079 Boolean sendOptionsRequest = True;
00080 Boolean sendOptionsRequestOnly = False;
00081 Boolean oneFilePerFrame = False;
00082 Boolean notifyOnPacketArrival = False;
00083 Boolean streamUsingTCP = False;
00084 unsigned short desiredPortNum = 0;
00085 portNumBits tunnelOverHTTPPortNum = 0;
00086 char* username = NULL;
00087 char* password = NULL;
00088 char* proxyServerName = NULL;
00089 unsigned short proxyServerPortNum = 0;
00090 unsigned char desiredAudioRTPPayloadFormat = 0;
00091 char* mimeSubtype = NULL;
00092 unsigned short movieWidth = 240;
00093 Boolean movieWidthOptionSet = False;
00094 unsigned short movieHeight = 180;
00095 Boolean movieHeightOptionSet = False;
00096 unsigned movieFPS = 15;
00097 Boolean movieFPSOptionSet = False;
00098 char const* fileNamePrefix = "";
00099 unsigned fileSinkBufferSize = 100000;
00100 unsigned socketInputBufferSize = 0;
00101 Boolean packetLossCompensate = False;
00102 Boolean syncStreams = False;
00103 Boolean generateHintTracks = False;
00104 unsigned qosMeasurementIntervalMS = 0;
00105
00106 struct timeval startTime;
00107
00108 void usage() {
00109 *env << "Usage: " << progName
00110 << " [-p <startPortNum>] [-r|-q|-4|-i] [-a|-v] [-V] [-d <duration>] [-D <max-inter-packet-gap-time> [-c] [-S <offset>] [-n] [-O]"
00111 << (controlConnectionUsesTCP ? " [-t|-T <http-port>]" : "")
00112 << " [-u <username> <password>"
00113 << (allowProxyServers ? " [<proxy-server> [<proxy-server-port>]]" : "")
00114 << "]" << (supportCodecSelection ? " [-A <audio-codec-rtp-payload-format-code>|-M <mime-subtype-name>]" : "")
00115 << " [-s <initial-seek-time>] [-z <scale>]"
00116 << " [-w <width> -h <height>] [-f <frames-per-second>] [-y] [-H] [-Q [<measurement-interval>]] [-F <filename-prefix>] [-b <file-sink-buffer-size>] [-B <input-socket-buffer-size>] [-I <input-interface-ip-address>] [-m] <url> (or " << progName << " -o [-V] <url>)\n";
00117 shutdown();
00118 }
00119
00120 int main(int argc, char** argv) {
00121
00122 TaskScheduler* scheduler = BasicTaskScheduler::createNew();
00123 env = BasicUsageEnvironment::createNew(*scheduler);
00124
00125 progName = argv[0];
00126
00127 gettimeofday(&startTime, NULL);
00128
00129 #ifdef USE_SIGNALS
00130
00131 signal(SIGHUP, signalHandlerShutdown);
00132 signal(SIGUSR1, signalHandlerShutdown);
00133 #endif
00134
00135
00136 while (argc > 2) {
00137 char* const opt = argv[1];
00138 if (opt[0] != '-') usage();
00139 switch (opt[1]) {
00140
00141 case 'p': {
00142 int portArg;
00143 if (sscanf(argv[2], "%d", &portArg) != 1) {
00144 usage();
00145 }
00146 if (portArg <= 0 || portArg >= 65536 || portArg&1) {
00147 *env << "bad port number: " << portArg
00148 << " (must be even, and in the range (0,65536))\n";
00149 usage();
00150 }
00151 desiredPortNum = (unsigned short)portArg;
00152 ++argv; --argc;
00153 break;
00154 }
00155
00156 case 'r': {
00157 createReceivers = False;
00158 break;
00159 }
00160
00161 case 'q': {
00162 outputQuickTimeFile = True;
00163 break;
00164 }
00165
00166 case '4': {
00167 outputQuickTimeFile = True;
00168 generateMP4Format = True;
00169 break;
00170 }
00171
00172 case 'i': {
00173 outputAVIFile = True;
00174 break;
00175 }
00176
00177 case 'I': {
00178 NetAddressList addresses(argv[2]);
00179 if (addresses.numAddresses() == 0) {
00180 *env << "Failed to find network address for \"" << argv[2] << "\"";
00181 break;
00182 }
00183 ReceivingInterfaceAddr = *(unsigned*)(addresses.firstAddress()->data());
00184 ++argv; --argc;
00185 break;
00186 }
00187
00188 case 'a': {
00189 audioOnly = True;
00190 singleMedium = "audio";
00191 break;
00192 }
00193
00194 case 'v': {
00195 videoOnly = True;
00196 singleMedium = "video";
00197 break;
00198 }
00199
00200 case 'V': {
00201 verbosityLevel = 0;
00202 break;
00203 }
00204
00205 case 'd': {
00206 float arg;
00207 if (sscanf(argv[2], "%g", &arg) != 1) {
00208 usage();
00209 }
00210 if (argv[2][0] == '-') {
00211
00212 duration = 0;
00213 durationSlop = -arg;
00214 } else {
00215 duration = arg;
00216 durationSlop = 0;
00217 }
00218 ++argv; --argc;
00219 break;
00220 }
00221
00222 case 'D': {
00223 if (sscanf(argv[2], "%u", &interPacketGapMaxTime) != 1) {
00224 usage();
00225 }
00226 ++argv; --argc;
00227 break;
00228 }
00229
00230 case 'c': {
00231 playContinuously = True;
00232 break;
00233 }
00234
00235 case 'S': {
00236 if (sscanf(argv[2], "%d", &simpleRTPoffsetArg) != 1) {
00237 usage();
00238 }
00239 if (simpleRTPoffsetArg < 0) {
00240 *env << "offset argument to \"-S\" must be >= 0\n";
00241 usage();
00242 }
00243 ++argv; --argc;
00244 break;
00245 }
00246
00247 case 'O': {
00248 sendOptionsRequest = False;
00249 break;
00250 }
00251
00252 case 'o': {
00253 sendOptionsRequestOnly = True;
00254 break;
00255 }
00256
00257 case 'm': {
00258 oneFilePerFrame = True;
00259 break;
00260 }
00261
00262 case 'n': {
00263 notifyOnPacketArrival = True;
00264 break;
00265 }
00266
00267 case 't': {
00268
00269 if (controlConnectionUsesTCP) {
00270 streamUsingTCP = True;
00271 } else {
00272 usage();
00273 }
00274 break;
00275 }
00276
00277 case 'T': {
00278
00279 if (controlConnectionUsesTCP) {
00280 if (argc > 3 && argv[2][0] != '-') {
00281
00282 if (sscanf(argv[2], "%hu", &tunnelOverHTTPPortNum) == 1
00283 && tunnelOverHTTPPortNum > 0) {
00284 ++argv; --argc;
00285 break;
00286 }
00287 }
00288 }
00289
00290
00291 usage();
00292 break;
00293 }
00294
00295 case 'u': {
00296 username = argv[2];
00297 password = argv[3];
00298 argv+=2; argc-=2;
00299 if (allowProxyServers && argc > 3 && argv[2][0] != '-') {
00300
00301 proxyServerName = argv[2];
00302 ++argv; --argc;
00303
00304 if (argc > 3 && argv[2][0] != '-') {
00305
00306 if (sscanf(argv[2], "%hu", &proxyServerPortNum) != 1) {
00307 usage();
00308 }
00309 ++argv; --argc;
00310 }
00311 }
00312
00313 ourAuthenticator = new Authenticator;
00314 ourAuthenticator->setUsernameAndPassword(username, password);
00315 break;
00316 }
00317
00318 case 'A': {
00319 unsigned formatArg;
00320 if (sscanf(argv[2], "%u", &formatArg) != 1
00321 || formatArg >= 96) {
00322 usage();
00323 }
00324 desiredAudioRTPPayloadFormat = (unsigned char)formatArg;
00325 ++argv; --argc;
00326 break;
00327 }
00328
00329 case 'M': {
00330 mimeSubtype = argv[2];
00331 if (desiredAudioRTPPayloadFormat==0) desiredAudioRTPPayloadFormat =96;
00332 ++argv; --argc;
00333 break;
00334 }
00335
00336 case 'w': {
00337 if (sscanf(argv[2], "%hu", &movieWidth) != 1) {
00338 usage();
00339 }
00340 movieWidthOptionSet = True;
00341 ++argv; --argc;
00342 break;
00343 }
00344
00345 case 'h': {
00346 if (sscanf(argv[2], "%hu", &movieHeight) != 1) {
00347 usage();
00348 }
00349 movieHeightOptionSet = True;
00350 ++argv; --argc;
00351 break;
00352 }
00353
00354 case 'f': {
00355 if (sscanf(argv[2], "%u", &movieFPS) != 1) {
00356 usage();
00357 }
00358 movieFPSOptionSet = True;
00359 ++argv; --argc;
00360 break;
00361 }
00362
00363 case 'F': {
00364 fileNamePrefix = argv[2];
00365 ++argv; --argc;
00366 break;
00367 }
00368
00369 case 'b': {
00370 if (sscanf(argv[2], "%u", &fileSinkBufferSize) != 1) {
00371 usage();
00372 }
00373 ++argv; --argc;
00374 break;
00375 }
00376
00377 case 'B': {
00378 if (sscanf(argv[2], "%u", &socketInputBufferSize) != 1) {
00379 usage();
00380 }
00381 ++argv; --argc;
00382 break;
00383 }
00384
00385
00386 case 'l': {
00387 packetLossCompensate = True;
00388 break;
00389 }
00390
00391 case 'y': {
00392 syncStreams = True;
00393 break;
00394 }
00395
00396 case 'H': {
00397 generateHintTracks = True;
00398 break;
00399 }
00400
00401 case 'Q': {
00402 qosMeasurementIntervalMS = 1000;
00403
00404 if (argc > 3 && argv[2][0] != '-') {
00405
00406
00407 if (sscanf(argv[2], "%u", &qosMeasurementIntervalMS) != 1) {
00408 usage();
00409 }
00410 qosMeasurementIntervalMS *= 100;
00411 ++argv; --argc;
00412 }
00413 break;
00414 }
00415
00416 case 's': {
00417 double arg;
00418 if (sscanf(argv[2], "%lg", &arg) != 1 || arg < 0) {
00419 usage();
00420 }
00421 initialSeekTime = arg;
00422 ++argv; --argc;
00423 break;
00424 }
00425
00426 case 'z': {
00427 float arg;
00428 if (sscanf(argv[2], "%g", &arg) != 1 || arg == 0.0f) {
00429 usage();
00430 }
00431 scale = arg;
00432 ++argv; --argc;
00433 break;
00434 }
00435
00436 default: {
00437 usage();
00438 break;
00439 }
00440 }
00441
00442 ++argv; --argc;
00443 }
00444 if (argc != 2) usage();
00445 if (outputQuickTimeFile && outputAVIFile) {
00446 *env << "The -i and -q (or -4) flags cannot both be used!\n";
00447 usage();
00448 }
00449 Boolean outputCompositeFile = outputQuickTimeFile || outputAVIFile;
00450 if (!createReceivers && outputCompositeFile) {
00451 *env << "The -r and -q (or -4 or -i) flags cannot both be used!\n";
00452 usage();
00453 }
00454 if (outputCompositeFile && !movieWidthOptionSet) {
00455 *env << "Warning: The -q, -4 or -i option was used, but not -w. Assuming a video width of "
00456 << movieWidth << " pixels\n";
00457 }
00458 if (outputCompositeFile && !movieHeightOptionSet) {
00459 *env << "Warning: The -q, -4 or -i option was used, but not -h. Assuming a video height of "
00460 << movieHeight << " pixels\n";
00461 }
00462 if (outputCompositeFile && !movieFPSOptionSet) {
00463 *env << "Warning: The -q, -4 or -i option was used, but not -f. Assuming a video frame rate of "
00464 << movieFPS << " frames-per-second\n";
00465 }
00466 if (audioOnly && videoOnly) {
00467 *env << "The -a and -v flags cannot both be used!\n";
00468 usage();
00469 }
00470 if (sendOptionsRequestOnly && !sendOptionsRequest) {
00471 *env << "The -o and -O flags cannot both be used!\n";
00472 usage();
00473 }
00474 if (tunnelOverHTTPPortNum > 0) {
00475 if (streamUsingTCP) {
00476 *env << "The -t and -T flags cannot both be used!\n";
00477 usage();
00478 } else {
00479 streamUsingTCP = True;
00480 }
00481 }
00482 if (!createReceivers && notifyOnPacketArrival) {
00483 *env << "Warning: Because we're not receiving stream data, the -n flag has no effect\n";
00484 }
00485 if (durationSlop < 0) {
00486
00487
00488
00489 durationSlop = qosMeasurementIntervalMS > 0 ? 0.0 : 5.0;
00490 }
00491
00492 streamURL = argv[1];
00493
00494
00495 ourClient = createClient(*env, streamURL, verbosityLevel, progName);
00496 if (ourClient == NULL) {
00497 *env << "Failed to create " << clientProtocolName
00498 << " client: " << env->getResultMsg() << "\n";
00499 shutdown();
00500 }
00501
00502 if (sendOptionsRequest) {
00503
00504 getOptions(continueAfterOPTIONS);
00505 } else {
00506 continueAfterOPTIONS(NULL, 0, NULL);
00507 }
00508
00509
00510 env->taskScheduler().doEventLoop();
00511
00512 return 0;
00513 }
00514
00515 void continueAfterOPTIONS(RTSPClient*, int resultCode, char* resultString) {
00516 if (sendOptionsRequestOnly) {
00517 if (resultCode != 0) {
00518 *env << clientProtocolName << " \"OPTIONS\" request failed: " << resultString << "\n";
00519 } else {
00520 *env << clientProtocolName << " \"OPTIONS\" request returned: " << resultString << "\n";
00521 }
00522 shutdown();
00523 }
00524 delete[] resultString;
00525
00526
00527 getSDPDescription(continueAfterDESCRIBE);
00528 }
00529
00530 void continueAfterDESCRIBE(RTSPClient*, int resultCode, char* resultString) {
00531 if (resultCode != 0) {
00532 *env << "Failed to get a SDP description from URL \"" << streamURL << "\": " << resultString << "\n";
00533 shutdown();
00534 }
00535
00536 char* sdpDescription = resultString;
00537 *env << "Opened URL \"" << streamURL << "\", returning a SDP description:\n" << sdpDescription << "\n";
00538
00539
00540 session = MediaSession::createNew(*env, sdpDescription);
00541 delete[] sdpDescription;
00542 if (session == NULL) {
00543 *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
00544 shutdown();
00545 } else if (!session->hasSubsessions()) {
00546 *env << "This session has no media subsessions (i.e., \"m=\" lines)\n";
00547 shutdown();
00548 }
00549
00550
00551 MediaSubsessionIterator iter(*session);
00552 MediaSubsession *subsession;
00553 Boolean madeProgress = False;
00554 char const* singleMediumToTest = singleMedium;
00555 while ((subsession = iter.next()) != NULL) {
00556
00557 if (singleMediumToTest != NULL) {
00558 if (strcmp(subsession->mediumName(), singleMediumToTest) != 0) {
00559 *env << "Ignoring \"" << subsession->mediumName()
00560 << "/" << subsession->codecName()
00561 << "\" subsession, because we've asked to receive a single " << singleMedium
00562 << " session only\n";
00563 continue;
00564 } else {
00565
00566 singleMediumToTest = "xxxxx";
00567
00568 }
00569 }
00570
00571 if (desiredPortNum != 0) {
00572 subsession->setClientPortNum(desiredPortNum);
00573 desiredPortNum += 2;
00574 }
00575
00576 if (createReceivers) {
00577 if (!subsession->initiate(simpleRTPoffsetArg)) {
00578 *env << "Unable to create receiver for \"" << subsession->mediumName()
00579 << "/" << subsession->codecName()
00580 << "\" subsession: " << env->getResultMsg() << "\n";
00581 } else {
00582 *env << "Created receiver for \"" << subsession->mediumName()
00583 << "/" << subsession->codecName()
00584 << "\" subsession (client ports " << subsession->clientPortNum()
00585 << "-" << subsession->clientPortNum()+1 << ")\n";
00586 madeProgress = True;
00587
00588 if (subsession->rtpSource() != NULL) {
00589
00590
00591
00592 unsigned const thresh = 1000000;
00593 subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
00594
00595
00596
00597
00598
00599 int socketNum = subsession->rtpSource()->RTPgs()->socketNum();
00600 unsigned curBufferSize = getReceiveBufferSize(*env, socketNum);
00601 if (socketInputBufferSize > 0 || fileSinkBufferSize > curBufferSize) {
00602 unsigned newBufferSize = socketInputBufferSize > 0 ? socketInputBufferSize : fileSinkBufferSize;
00603 newBufferSize = setReceiveBufferTo(*env, socketNum, newBufferSize);
00604 if (socketInputBufferSize > 0) {
00605 *env << "Changed socket receive buffer size for the \""
00606 << subsession->mediumName()
00607 << "/" << subsession->codecName()
00608 << "\" subsession from "
00609 << curBufferSize << " to "
00610 << newBufferSize << " bytes\n";
00611 }
00612 }
00613 }
00614 }
00615 } else {
00616 if (subsession->clientPortNum() == 0) {
00617 *env << "No client port was specified for the \""
00618 << subsession->mediumName()
00619 << "/" << subsession->codecName()
00620 << "\" subsession. (Try adding the \"-p <portNum>\" option.)\n";
00621 } else {
00622 madeProgress = True;
00623 }
00624 }
00625 }
00626 if (!madeProgress) shutdown();
00627
00628
00629 setupStreams();
00630 }
00631
00632 MediaSubsession *subsession;
00633 Boolean madeProgress = False;
00634 void continueAfterSETUP(RTSPClient*, int resultCode, char* resultString) {
00635 if (resultCode == 0) {
00636 *env << "Setup \"" << subsession->mediumName()
00637 << "/" << subsession->codecName()
00638 << "\" subsession (client ports " << subsession->clientPortNum()
00639 << "-" << subsession->clientPortNum()+1 << ")\n";
00640 madeProgress = True;
00641 } else {
00642 *env << "Failed to setup \"" << subsession->mediumName()
00643 << "/" << subsession->codecName()
00644 << "\" subsession: " << env->getResultMsg() << "\n";
00645 }
00646
00647
00648 setupStreams();
00649 }
00650
00651 void setupStreams() {
00652 static MediaSubsessionIterator* setupIter = NULL;
00653 if (setupIter == NULL) setupIter = new MediaSubsessionIterator(*session);
00654 while ((subsession = setupIter->next()) != NULL) {
00655
00656 if (subsession->clientPortNum() == 0) continue;
00657
00658 setupSubsession(subsession, streamUsingTCP, continueAfterSETUP);
00659 return;
00660 }
00661
00662
00663 delete setupIter;
00664 if (!madeProgress) shutdown();
00665
00666
00667 if (createReceivers) {
00668 if (outputQuickTimeFile) {
00669
00670 qtOut = QuickTimeFileSink::createNew(*env, *session, "stdout",
00671 fileSinkBufferSize,
00672 movieWidth, movieHeight,
00673 movieFPS,
00674 packetLossCompensate,
00675 syncStreams,
00676 generateHintTracks,
00677 generateMP4Format);
00678 if (qtOut == NULL) {
00679 *env << "Failed to create QuickTime file sink for stdout: " << env->getResultMsg();
00680 shutdown();
00681 }
00682
00683 qtOut->startPlaying(sessionAfterPlaying, NULL);
00684 } else if (outputAVIFile) {
00685
00686 aviOut = AVIFileSink::createNew(*env, *session, "stdout",
00687 fileSinkBufferSize,
00688 movieWidth, movieHeight,
00689 movieFPS,
00690 packetLossCompensate);
00691 if (aviOut == NULL) {
00692 *env << "Failed to create AVI file sink for stdout: " << env->getResultMsg();
00693 shutdown();
00694 }
00695
00696 aviOut->startPlaying(sessionAfterPlaying, NULL);
00697 } else {
00698
00699 madeProgress = False;
00700 MediaSubsessionIterator iter(*session);
00701 while ((subsession = iter.next()) != NULL) {
00702 if (subsession->readSource() == NULL) continue;
00703
00704
00705 char outFileName[1000];
00706 if (singleMedium == NULL) {
00707
00708
00709 static unsigned streamCounter = 0;
00710 snprintf(outFileName, sizeof outFileName, "%s%s-%s-%d",
00711 fileNamePrefix, subsession->mediumName(),
00712 subsession->codecName(), ++streamCounter);
00713 } else {
00714 sprintf(outFileName, "stdout");
00715 }
00716 FileSink* fileSink;
00717 if (strcmp(subsession->mediumName(), "audio") == 0 &&
00718 (strcmp(subsession->codecName(), "AMR") == 0 ||
00719 strcmp(subsession->codecName(), "AMR-WB") == 0)) {
00720
00721 fileSink = AMRAudioFileSink::createNew(*env, outFileName,
00722 fileSinkBufferSize, oneFilePerFrame);
00723 } else if (strcmp(subsession->mediumName(), "video") == 0 &&
00724 (strcmp(subsession->codecName(), "H264") == 0)) {
00725
00726 fileSink = H264VideoFileSink::createNew(*env, outFileName,
00727 fileSinkBufferSize, oneFilePerFrame);
00728 } else {
00729
00730 fileSink = FileSink::createNew(*env, outFileName,
00731 fileSinkBufferSize, oneFilePerFrame);
00732 }
00733 subsession->sink = fileSink;
00734 if (subsession->sink == NULL) {
00735 *env << "Failed to create FileSink for \"" << outFileName
00736 << "\": " << env->getResultMsg() << "\n";
00737 } else {
00738 if (singleMedium == NULL) {
00739 *env << "Created output file: \"" << outFileName << "\"\n";
00740 } else {
00741 *env << "Outputting data from the \"" << subsession->mediumName()
00742 << "/" << subsession->codecName()
00743 << "\" subsession to 'stdout'\n";
00744 }
00745
00746 if (strcmp(subsession->mediumName(), "video") == 0 &&
00747 strcmp(subsession->codecName(), "MP4V-ES") == 0 &&
00748 subsession->fmtp_config() != NULL) {
00749
00750
00751
00752 unsigned configLen;
00753 unsigned char* configData
00754 = parseGeneralConfigStr(subsession->fmtp_config(), configLen);
00755 struct timeval timeNow;
00756 gettimeofday(&timeNow, NULL);
00757 fileSink->addData(configData, configLen, timeNow);
00758 delete[] configData;
00759 }
00760
00761 subsession->sink->startPlaying(*(subsession->readSource()),
00762 subsessionAfterPlaying,
00763 subsession);
00764
00765
00766
00767 if (subsession->rtcpInstance() != NULL) {
00768 subsession->rtcpInstance()->setByeHandler(subsessionByeHandler,
00769 subsession);
00770 }
00771
00772 madeProgress = True;
00773 }
00774 }
00775 if (!madeProgress) shutdown();
00776 }
00777 }
00778
00779
00780 if (duration == 0) {
00781 if (scale > 0) duration = session->playEndTime() - initialSeekTime;
00782 else if (scale < 0) duration = initialSeekTime;
00783 }
00784 if (duration < 0) duration = 0.0;
00785
00786 endTime = initialSeekTime;
00787 if (scale > 0) {
00788 if (duration <= 0) endTime = -1.0f;
00789 else endTime = initialSeekTime + duration;
00790 } else {
00791 endTime = initialSeekTime - duration;
00792 if (endTime < 0) endTime = 0.0f;
00793 }
00794
00795 startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00796 }
00797
00798 void continueAfterPLAY(RTSPClient*, int resultCode, char* resultString) {
00799 if (resultCode != 0) {
00800 *env << "Failed to start playing session: " << resultString << "\n";
00801 shutdown();
00802 } else {
00803 *env << "Started playing session\n";
00804 }
00805
00806 if (qosMeasurementIntervalMS > 0) {
00807
00808 beginQOSMeasurement();
00809 }
00810
00811
00812
00813 Boolean timerIsBeingUsed = False;
00814 double secondsToDelay = duration;
00815 if (duration > 0) {
00816 timerIsBeingUsed = True;
00817 double absScale = scale > 0 ? scale : -scale;
00818 secondsToDelay = duration/absScale + durationSlop;
00819
00820 int64_t uSecsToDelay = (int64_t)(secondsToDelay*1000000.0);
00821 sessionTimerTask = env->taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)sessionTimerHandler, (void*)NULL);
00822 }
00823
00824 char const* actionString
00825 = createReceivers? "Receiving streamed data":"Data is being streamed";
00826 if (timerIsBeingUsed) {
00827 *env << actionString
00828 << " (for up to " << secondsToDelay
00829 << " seconds)...\n";
00830 } else {
00831 #ifdef USE_SIGNALS
00832 pid_t ourPid = getpid();
00833 *env << actionString
00834 << " (signal with \"kill -HUP " << (int)ourPid
00835 << "\" or \"kill -USR1 " << (int)ourPid
00836 << "\" to terminate)...\n";
00837 #else
00838 *env << actionString << "...\n";
00839 #endif
00840 }
00841
00842
00843 checkForPacketArrival(NULL);
00844 checkInterPacketGaps(NULL);
00845 }
00846
00847 void closeMediaSinks() {
00848 Medium::close(qtOut);
00849 Medium::close(aviOut);
00850
00851 if (session == NULL) return;
00852 MediaSubsessionIterator iter(*session);
00853 MediaSubsession* subsession;
00854 while ((subsession = iter.next()) != NULL) {
00855 Medium::close(subsession->sink);
00856 subsession->sink = NULL;
00857 }
00858 }
00859
00860 void subsessionAfterPlaying(void* clientData) {
00861
00862 MediaSubsession* subsession = (MediaSubsession*)clientData;
00863 Medium::close(subsession->sink);
00864 subsession->sink = NULL;
00865
00866
00867 MediaSession& session = subsession->parentSession();
00868 MediaSubsessionIterator iter(session);
00869 while ((subsession = iter.next()) != NULL) {
00870 if (subsession->sink != NULL) return;
00871 }
00872
00873
00874 sessionAfterPlaying();
00875 }
00876
00877 void subsessionByeHandler(void* clientData) {
00878 struct timeval timeNow;
00879 gettimeofday(&timeNow, NULL);
00880 unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec;
00881
00882 MediaSubsession* subsession = (MediaSubsession*)clientData;
00883 *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName()
00884 << "/" << subsession->codecName()
00885 << "\" subsession (after " << secsDiff
00886 << " seconds)\n";
00887
00888
00889 subsessionAfterPlaying(subsession);
00890 }
00891
00892 void sessionAfterPlaying(void* ) {
00893 if (!playContinuously) {
00894 shutdown(0);
00895 } else {
00896
00897 startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00898 }
00899 }
00900
00901 void sessionTimerHandler(void* ) {
00902 sessionTimerTask = NULL;
00903
00904 sessionAfterPlaying();
00905 }
00906
00907 class qosMeasurementRecord {
00908 public:
00909 qosMeasurementRecord(struct timeval const& startTime, RTPSource* src)
00910 : fSource(src), fNext(NULL),
00911 kbits_per_second_min(1e20), kbits_per_second_max(0),
00912 kBytesTotal(0.0),
00913 packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0),
00914 totNumPacketsReceived(0), totNumPacketsExpected(0) {
00915 measurementEndTime = measurementStartTime = startTime;
00916
00917 RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
00918
00919 RTPReceptionStats* stats = statsIter.next(True);
00920 if (stats != NULL) {
00921 kBytesTotal = stats->totNumKBytesReceived();
00922 totNumPacketsReceived = stats->totNumPacketsReceived();
00923 totNumPacketsExpected = stats->totNumPacketsExpected();
00924 }
00925 }
00926 virtual ~qosMeasurementRecord() { delete fNext; }
00927
00928 void periodicQOSMeasurement(struct timeval const& timeNow);
00929
00930 public:
00931 RTPSource* fSource;
00932 qosMeasurementRecord* fNext;
00933
00934 public:
00935 struct timeval measurementStartTime, measurementEndTime;
00936 double kbits_per_second_min, kbits_per_second_max;
00937 double kBytesTotal;
00938 double packet_loss_fraction_min, packet_loss_fraction_max;
00939 unsigned totNumPacketsReceived, totNumPacketsExpected;
00940 };
00941
00942 static qosMeasurementRecord* qosRecordHead = NULL;
00943
00944 static void periodicQOSMeasurement(void* clientData);
00945
00946 static unsigned nextQOSMeasurementUSecs;
00947
00948 static void scheduleNextQOSMeasurement() {
00949 nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;
00950 struct timeval timeNow;
00951 gettimeofday(&timeNow, NULL);
00952 unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;
00953 unsigned usecsToDelay = nextQOSMeasurementUSecs - timeNowUSecs;
00954
00955
00956 qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(
00957 usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL);
00958 }
00959
00960 static void periodicQOSMeasurement(void* ) {
00961 struct timeval timeNow;
00962 gettimeofday(&timeNow, NULL);
00963
00964 for (qosMeasurementRecord* qosRecord = qosRecordHead;
00965 qosRecord != NULL; qosRecord = qosRecord->fNext) {
00966 qosRecord->periodicQOSMeasurement(timeNow);
00967 }
00968
00969
00970 scheduleNextQOSMeasurement();
00971 }
00972
00973 void qosMeasurementRecord
00974 ::periodicQOSMeasurement(struct timeval const& timeNow) {
00975 unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;
00976 int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;
00977 double timeDiff = secsDiff + usecsDiff/1000000.0;
00978 measurementEndTime = timeNow;
00979
00980 RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());
00981
00982 RTPReceptionStats* stats = statsIter.next(True);
00983 if (stats != NULL) {
00984 double kBytesTotalNow = stats->totNumKBytesReceived();
00985 double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;
00986 kBytesTotal = kBytesTotalNow;
00987
00988 double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;
00989 if (kbpsNow < 0.0) kbpsNow = 0.0;
00990 if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;
00991 if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;
00992
00993 unsigned totReceivedNow = stats->totNumPacketsReceived();
00994 unsigned totExpectedNow = stats->totNumPacketsExpected();
00995 unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;
00996 unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;
00997 totNumPacketsReceived = totReceivedNow;
00998 totNumPacketsExpected = totExpectedNow;
00999
01000 double lossFractionNow = deltaExpectedNow == 0 ? 0.0
01001 : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;
01002
01003 if (lossFractionNow < packet_loss_fraction_min) {
01004 packet_loss_fraction_min = lossFractionNow;
01005 }
01006 if (lossFractionNow > packet_loss_fraction_max) {
01007 packet_loss_fraction_max = lossFractionNow;
01008 }
01009 }
01010 }
01011
01012 void beginQOSMeasurement() {
01013
01014 struct timeval startTime;
01015 gettimeofday(&startTime, NULL);
01016 nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;
01017 qosMeasurementRecord* qosRecordTail = NULL;
01018 MediaSubsessionIterator iter(*session);
01019 MediaSubsession* subsession;
01020 while ((subsession = iter.next()) != NULL) {
01021 RTPSource* src = subsession->rtpSource();
01022 if (src == NULL) continue;
01023
01024 qosMeasurementRecord* qosRecord
01025 = new qosMeasurementRecord(startTime, src);
01026 if (qosRecordHead == NULL) qosRecordHead = qosRecord;
01027 if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;
01028 qosRecordTail = qosRecord;
01029 }
01030
01031
01032 scheduleNextQOSMeasurement();
01033 }
01034
01035 void printQOSData(int exitCode) {
01036 *env << "begin_QOS_statistics\n";
01037
01038
01039 qosMeasurementRecord* curQOSRecord = qosRecordHead;
01040 if (session != NULL) {
01041 MediaSubsessionIterator iter(*session);
01042 MediaSubsession* subsession;
01043 while ((subsession = iter.next()) != NULL) {
01044 RTPSource* src = subsession->rtpSource();
01045 if (src == NULL) continue;
01046
01047 *env << "subsession\t" << subsession->mediumName()
01048 << "/" << subsession->codecName() << "\n";
01049
01050 unsigned numPacketsReceived = 0, numPacketsExpected = 0;
01051
01052 if (curQOSRecord != NULL) {
01053 numPacketsReceived = curQOSRecord->totNumPacketsReceived;
01054 numPacketsExpected = curQOSRecord->totNumPacketsExpected;
01055 }
01056 *env << "num_packets_received\t" << numPacketsReceived << "\n";
01057 *env << "num_packets_lost\t" << int(numPacketsExpected - numPacketsReceived) << "\n";
01058
01059 if (curQOSRecord != NULL) {
01060 unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec
01061 - curQOSRecord->measurementStartTime.tv_sec;
01062 int usecsDiff = curQOSRecord->measurementEndTime.tv_usec
01063 - curQOSRecord->measurementStartTime.tv_usec;
01064 double measurementTime = secsDiff + usecsDiff/1000000.0;
01065 *env << "elapsed_measurement_time\t" << measurementTime << "\n";
01066
01067 *env << "kBytes_received_total\t" << curQOSRecord->kBytesTotal << "\n";
01068
01069 *env << "measurement_sampling_interval_ms\t" << qosMeasurementIntervalMS << "\n";
01070
01071 if (curQOSRecord->kbits_per_second_max == 0) {
01072
01073 *env <<
01074 "kbits_per_second_min\tunavailable\n"
01075 "kbits_per_second_ave\tunavailable\n"
01076 "kbits_per_second_max\tunavailable\n";
01077 } else {
01078 *env << "kbits_per_second_min\t" << curQOSRecord->kbits_per_second_min << "\n";
01079 *env << "kbits_per_second_ave\t"
01080 << (measurementTime == 0.0 ? 0.0 : 8*curQOSRecord->kBytesTotal/measurementTime) << "\n";
01081 *env << "kbits_per_second_max\t" << curQOSRecord->kbits_per_second_max << "\n";
01082 }
01083
01084 *env << "packet_loss_percentage_min\t" << 100*curQOSRecord->packet_loss_fraction_min << "\n";
01085 double packetLossFraction = numPacketsExpected == 0 ? 1.0
01086 : 1.0 - numPacketsReceived/(double)numPacketsExpected;
01087 if (packetLossFraction < 0.0) packetLossFraction = 0.0;
01088 *env << "packet_loss_percentage_ave\t" << 100*packetLossFraction << "\n";
01089 *env << "packet_loss_percentage_max\t"
01090 << (packetLossFraction == 1.0 ? 100.0 : 100*curQOSRecord->packet_loss_fraction_max) << "\n";
01091
01092 RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
01093
01094 RTPReceptionStats* stats = statsIter.next(True);
01095 if (stats != NULL) {
01096 *env << "inter_packet_gap_ms_min\t" << stats->minInterPacketGapUS()/1000.0 << "\n";
01097 struct timeval totalGaps = stats->totalInterPacketGaps();
01098 double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0;
01099 unsigned totNumPacketsReceived = stats->totNumPacketsReceived();
01100 *env << "inter_packet_gap_ms_ave\t"
01101 << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n";
01102 *env << "inter_packet_gap_ms_max\t" << stats->maxInterPacketGapUS()/1000.0 << "\n";
01103 }
01104
01105 curQOSRecord = curQOSRecord->fNext;
01106 }
01107 }
01108 }
01109
01110 *env << "end_QOS_statistics\n";
01111 delete qosRecordHead;
01112 }
01113
01114 int shutdownExitCode;
01115 void shutdown(int exitCode) {
01116 shutdownExitCode = exitCode;
01117 if (env != NULL) {
01118 env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
01119 env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
01120 env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
01121 env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
01122 }
01123
01124 if (qosMeasurementIntervalMS > 0) {
01125 printQOSData(exitCode);
01126 }
01127
01128
01129 if (session != NULL) {
01130 tearDownSession(session, continueAfterTEARDOWN);
01131 } else {
01132 continueAfterTEARDOWN(NULL, 0, NULL);
01133 }
01134 }
01135
01136 void continueAfterTEARDOWN(RTSPClient*, int , char* ) {
01137
01138 closeMediaSinks();
01139 Medium::close(session);
01140
01141
01142 delete ourAuthenticator;
01143 Medium::close(ourClient);
01144
01145
01146 exit(shutdownExitCode);
01147 }
01148
01149 void signalHandlerShutdown(int ) {
01150 *env << "Got shutdown signal\n";
01151 shutdown(0);
01152 }
01153
01154 void checkForPacketArrival(void* ) {
01155 if (!notifyOnPacketArrival) return;
01156
01157
01158 unsigned numSubsessionsChecked = 0;
01159 unsigned numSubsessionsWithReceivedData = 0;
01160 unsigned numSubsessionsThatHaveBeenSynced = 0;
01161
01162 MediaSubsessionIterator iter(*session);
01163 MediaSubsession* subsession;
01164 while ((subsession = iter.next()) != NULL) {
01165 RTPSource* src = subsession->rtpSource();
01166 if (src == NULL) continue;
01167 ++numSubsessionsChecked;
01168
01169 if (src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0) {
01170
01171 ++numSubsessionsWithReceivedData;
01172 }
01173 if (src->hasBeenSynchronizedUsingRTCP()) {
01174 ++numSubsessionsThatHaveBeenSynced;
01175 }
01176 }
01177
01178 unsigned numSubsessionsToCheck = numSubsessionsChecked;
01179
01180
01181 if (qtOut != NULL) {
01182 numSubsessionsToCheck = qtOut->numActiveSubsessions();
01183 } else if (aviOut != NULL) {
01184 numSubsessionsToCheck = aviOut->numActiveSubsessions();
01185 }
01186
01187 Boolean notifyTheUser;
01188 if (!syncStreams) {
01189 notifyTheUser = numSubsessionsWithReceivedData > 0;
01190 } else {
01191 notifyTheUser = numSubsessionsWithReceivedData >= numSubsessionsToCheck
01192 && numSubsessionsThatHaveBeenSynced == numSubsessionsChecked;
01193
01194 }
01195 if (notifyTheUser) {
01196 struct timeval timeNow;
01197 gettimeofday(&timeNow, NULL);
01198 char timestampStr[100];
01199 sprintf(timestampStr, "%ld%03ld", timeNow.tv_sec, (long)(timeNow.tv_usec/1000));
01200 *env << (syncStreams ? "Synchronized d" : "D")
01201 << "ata packets have begun arriving [" << timestampStr << "]\007\n";
01202 return;
01203 }
01204
01205
01206 int uSecsToDelay = 100000;
01207 arrivalCheckTimerTask
01208 = env->taskScheduler().scheduleDelayedTask(uSecsToDelay,
01209 (TaskFunc*)checkForPacketArrival, NULL);
01210 }
01211
01212 void checkInterPacketGaps(void* ) {
01213 if (interPacketGapMaxTime == 0) return;
01214
01215
01216 unsigned newTotNumPacketsReceived = 0;
01217
01218 MediaSubsessionIterator iter(*session);
01219 MediaSubsession* subsession;
01220 while ((subsession = iter.next()) != NULL) {
01221 RTPSource* src = subsession->rtpSource();
01222 if (src == NULL) continue;
01223 newTotNumPacketsReceived += src->receptionStatsDB().totNumPacketsReceived();
01224 }
01225
01226 if (newTotNumPacketsReceived == totNumPacketsReceived) {
01227
01228
01229 *env << "Closing session, because we stopped receiving packets.\n";
01230 interPacketGapCheckTimerTask = NULL;
01231 sessionAfterPlaying();
01232 } else {
01233 totNumPacketsReceived = newTotNumPacketsReceived;
01234
01235 interPacketGapCheckTimerTask
01236 = env->taskScheduler().scheduleDelayedTask(interPacketGapMaxTime*1000000,
01237 (TaskFunc*)checkInterPacketGaps, NULL);
01238 }
01239 }