00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "OnDemandServerMediaSubsession.hh"
00023 #include "RTCP.hh"
00024 #include "BasicUDPSink.hh"
00025 #include <GroupsockHelper.hh>
00026
00027 OnDemandServerMediaSubsession
00028 ::OnDemandServerMediaSubsession(UsageEnvironment& env,
00029 Boolean reuseFirstSource,
00030 portNumBits initialPortNum)
00031 : ServerMediaSubsession(env),
00032 fSDPLines(NULL), fReuseFirstSource(reuseFirstSource), fInitialPortNum(initialPortNum), fLastStreamToken(NULL) {
00033 fDestinationsHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
00034 gethostname(fCNAME, sizeof fCNAME);
00035 fCNAME[sizeof fCNAME-1] = '\0';
00036 }
00037
00038 class Destinations {
00039 public:
00040 Destinations(struct in_addr const& destAddr,
00041 Port const& rtpDestPort,
00042 Port const& rtcpDestPort)
00043 : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort) {
00044 }
00045 Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId)
00046 : isTCP(True), rtpPort(0) , rtcpPort(0) ,
00047 tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), rtcpChannelId(rtcpChanId) {
00048 }
00049
00050 public:
00051 Boolean isTCP;
00052 struct in_addr addr;
00053 Port rtpPort;
00054 Port rtcpPort;
00055 int tcpSocketNum;
00056 unsigned char rtpChannelId, rtcpChannelId;
00057 };
00058
00059 OnDemandServerMediaSubsession::~OnDemandServerMediaSubsession() {
00060 delete[] fSDPLines;
00061
00062
00063 while (1) {
00064 Destinations* destinations
00065 = (Destinations*)(fDestinationsHashTable->RemoveNext());
00066 if (destinations == NULL) break;
00067 delete destinations;
00068 }
00069 delete fDestinationsHashTable;
00070 }
00071
00072 char const*
00073 OnDemandServerMediaSubsession::sdpLines() {
00074 if (fSDPLines == NULL) {
00075
00076
00077
00078
00079 unsigned estBitrate;
00080 FramedSource* inputSource = createNewStreamSource(0, estBitrate);
00081 if (inputSource == NULL) return NULL;
00082
00083 struct in_addr dummyAddr;
00084 dummyAddr.s_addr = 0;
00085 Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
00086 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00087 RTPSink* dummyRTPSink
00088 = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
00089
00090 setSDPLinesFromRTPSink(dummyRTPSink, inputSource, estBitrate);
00091 Medium::close(dummyRTPSink);
00092 closeStreamSource(inputSource);
00093 }
00094
00095 return fSDPLines;
00096 }
00097
00098
00099 class StreamState {
00100 public:
00101 StreamState(OnDemandServerMediaSubsession& master,
00102 Port const& serverRTPPort, Port const& serverRTCPPort,
00103 RTPSink* rtpSink, BasicUDPSink* udpSink,
00104 unsigned totalBW, FramedSource* mediaSource,
00105 Groupsock* rtpGS, Groupsock* rtcpGS);
00106 virtual ~StreamState();
00107
00108 void startPlaying(Destinations* destinations,
00109 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
00110 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00111 void* serverRequestAlternativeByteHandlerClientData);
00112 void pause();
00113 void endPlaying(Destinations* destinations);
00114 void reclaim();
00115
00116 unsigned& referenceCount() { return fReferenceCount; }
00117
00118 Port const& serverRTPPort() const { return fServerRTPPort; }
00119 Port const& serverRTCPPort() const { return fServerRTCPPort; }
00120
00121 RTPSink* rtpSink() const { return fRTPSink; }
00122
00123 float streamDuration() const { return fStreamDuration; }
00124
00125 FramedSource* mediaSource() const { return fMediaSource; }
00126
00127 private:
00128 OnDemandServerMediaSubsession& fMaster;
00129 Boolean fAreCurrentlyPlaying;
00130 unsigned fReferenceCount;
00131
00132 Port fServerRTPPort, fServerRTCPPort;
00133
00134 RTPSink* fRTPSink;
00135 BasicUDPSink* fUDPSink;
00136
00137 float fStreamDuration;
00138 unsigned fTotalBW; RTCPInstance* fRTCPInstance;
00139
00140 FramedSource* fMediaSource;
00141
00142 Groupsock* fRTPgs; Groupsock* fRTCPgs;
00143 };
00144
00145 void OnDemandServerMediaSubsession
00146 ::getStreamParameters(unsigned clientSessionId,
00147 netAddressBits clientAddress,
00148 Port const& clientRTPPort,
00149 Port const& clientRTCPPort,
00150 int tcpSocketNum,
00151 unsigned char rtpChannelId,
00152 unsigned char rtcpChannelId,
00153 netAddressBits& destinationAddress,
00154 u_int8_t& ,
00155 Boolean& isMulticast,
00156 Port& serverRTPPort,
00157 Port& serverRTCPPort,
00158 void*& streamToken) {
00159 if (destinationAddress == 0) destinationAddress = clientAddress;
00160 struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
00161 isMulticast = False;
00162
00163 if (fLastStreamToken != NULL && fReuseFirstSource) {
00164
00165
00166 serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
00167 serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
00168 ++((StreamState*)fLastStreamToken)->referenceCount();
00169 streamToken = fLastStreamToken;
00170 } else {
00171
00172 unsigned streamBitrate;
00173 FramedSource* mediaSource
00174 = createNewStreamSource(clientSessionId, streamBitrate);
00175
00176
00177
00178 RTPSink* rtpSink;
00179 BasicUDPSink* udpSink;
00180 Groupsock* rtpGroupsock;
00181 Groupsock* rtcpGroupsock;
00182 portNumBits serverPortNum;
00183 if (clientRTCPPort.num() == 0) {
00184
00185 NoReuse dummy;
00186 for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
00187 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00188
00189 serverRTPPort = serverPortNum;
00190 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00191 if (rtpGroupsock->socketNum() >= 0) break;
00192 }
00193
00194 rtcpGroupsock = NULL;
00195 rtpSink = NULL;
00196 udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
00197 } else {
00198
00199
00200 NoReuse dummy;
00201 for (portNumBits serverPortNum = fInitialPortNum; ; serverPortNum += 2) {
00202 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00203
00204 serverRTPPort = serverPortNum;
00205 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00206 if (rtpGroupsock->socketNum() < 0) {
00207 delete rtpGroupsock;
00208 continue;
00209 }
00210
00211 serverRTCPPort = serverPortNum+1;
00212 rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
00213 if (rtcpGroupsock->socketNum() < 0) {
00214 delete rtpGroupsock;
00215 delete rtcpGroupsock;
00216 continue;
00217 }
00218
00219 break;
00220 }
00221
00222 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00223 rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
00224 udpSink = NULL;
00225 }
00226
00227
00228
00229 if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
00230 if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();
00231
00232 if (rtpGroupsock != NULL) {
00233
00234
00235 unsigned rtpBufSize = streamBitrate * 25 / 2;
00236 if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
00237 increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
00238 }
00239
00240
00241 streamToken = fLastStreamToken
00242 = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
00243 streamBitrate, mediaSource,
00244 rtpGroupsock, rtcpGroupsock);
00245 }
00246
00247
00248 Destinations* destinations;
00249 if (tcpSocketNum < 0) {
00250 destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
00251 } else {
00252 destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
00253 }
00254 fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
00255 }
00256
00257 void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
00258 void* streamToken,
00259 TaskFunc* rtcpRRHandler,
00260 void* rtcpRRHandlerClientData,
00261 unsigned short& rtpSeqNum,
00262 unsigned& rtpTimestamp,
00263 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00264 void* serverRequestAlternativeByteHandlerClientData) {
00265 StreamState* streamState = (StreamState*)streamToken;
00266 Destinations* destinations
00267 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00268 if (streamState != NULL) {
00269 streamState->startPlaying(destinations,
00270 rtcpRRHandler, rtcpRRHandlerClientData,
00271 serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
00272 if (streamState->rtpSink() != NULL) {
00273 rtpSeqNum = streamState->rtpSink()->currentSeqNo();
00274 rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
00275 }
00276 }
00277 }
00278
00279 void OnDemandServerMediaSubsession::pauseStream(unsigned ,
00280 void* streamToken) {
00281
00282
00283 if (fReuseFirstSource) return;
00284
00285 StreamState* streamState = (StreamState*)streamToken;
00286 if (streamState != NULL) streamState->pause();
00287 }
00288
00289 void OnDemandServerMediaSubsession::seekStream(unsigned ,
00290 void* streamToken, double seekNPT) {
00291
00292
00293 if (fReuseFirstSource) return;
00294
00295 StreamState* streamState = (StreamState*)streamToken;
00296 if (streamState != NULL && streamState->mediaSource() != NULL) {
00297 seekStreamSource(streamState->mediaSource(), seekNPT);
00298 }
00299 }
00300
00301 void OnDemandServerMediaSubsession::setStreamScale(unsigned ,
00302 void* streamToken, float scale) {
00303
00304
00305 if (fReuseFirstSource) return;
00306
00307 StreamState* streamState = (StreamState*)streamToken;
00308 if (streamState != NULL && streamState->mediaSource() != NULL) {
00309 setStreamSourceScale(streamState->mediaSource(), scale);
00310 }
00311 }
00312
00313 void OnDemandServerMediaSubsession::deleteStream(unsigned clientSessionId,
00314 void*& streamToken) {
00315 StreamState* streamState = (StreamState*)streamToken;
00316
00317
00318 Destinations* destinations
00319 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00320 if (destinations != NULL) {
00321 fDestinationsHashTable->Remove((char const*)clientSessionId);
00322
00323
00324 if (streamState != NULL) streamState->endPlaying(destinations);
00325 }
00326
00327
00328 if (streamState != NULL) {
00329 if (streamState->referenceCount() > 0) --streamState->referenceCount();
00330 if (streamState->referenceCount() == 0) {
00331 delete streamState;
00332 if (fLastStreamToken == streamToken) fLastStreamToken = NULL;
00333 streamToken = NULL;
00334 }
00335 }
00336
00337
00338 delete destinations;
00339 }
00340
00341 char const* OnDemandServerMediaSubsession
00342 ::getAuxSDPLine(RTPSink* rtpSink, FramedSource* ) {
00343
00344 return rtpSink == NULL ? NULL : rtpSink->auxSDPLine();
00345 }
00346
00347 void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* ,
00348 double ) {
00349
00350 }
00351
00352 void OnDemandServerMediaSubsession
00353 ::setStreamSourceScale(FramedSource* , float ) {
00354
00355 }
00356
00357 void OnDemandServerMediaSubsession::closeStreamSource(FramedSource *inputSource) {
00358 Medium::close(inputSource);
00359 }
00360
00361 void OnDemandServerMediaSubsession
00362 ::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource, unsigned estBitrate) {
00363 if (rtpSink == NULL) return;
00364
00365 char const* mediaType = rtpSink->sdpMediaType();
00366 unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
00367 struct in_addr serverAddrForSDP; serverAddrForSDP.s_addr = fServerAddressForSDP;
00368 char* const ipAddressStr = strDup(our_inet_ntoa(serverAddrForSDP));
00369 char* rtpmapLine = rtpSink->rtpmapLine();
00370 char const* rangeLine = rangeSDPLine();
00371 char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
00372 if (auxSDPLine == NULL) auxSDPLine = "";
00373
00374 char const* const sdpFmt =
00375 "m=%s %u RTP/AVP %d\r\n"
00376 "c=IN IP4 %s\r\n"
00377 "b=AS:%u\r\n"
00378 "%s"
00379 "%s"
00380 "%s"
00381 "a=control:%s\r\n";
00382 unsigned sdpFmtSize = strlen(sdpFmt)
00383 + strlen(mediaType) + 5 + 3
00384 + strlen(ipAddressStr)
00385 + 20
00386 + strlen(rtpmapLine)
00387 + strlen(rangeLine)
00388 + strlen(auxSDPLine)
00389 + strlen(trackId());
00390 char* sdpLines = new char[sdpFmtSize];
00391 sprintf(sdpLines, sdpFmt,
00392 mediaType,
00393 fPortNumForSDP,
00394 rtpPayloadType,
00395 ipAddressStr,
00396 estBitrate,
00397 rtpmapLine,
00398 rangeLine,
00399 auxSDPLine,
00400 trackId());
00401 delete[] (char*)rangeLine; delete[] rtpmapLine; delete[] ipAddressStr;
00402
00403 fSDPLines = strDup(sdpLines);
00404 delete[] sdpLines;
00405 }
00406
00407
00409
00410 static void afterPlayingStreamState(void* clientData) {
00411 StreamState* streamState = (StreamState*)clientData;
00412 if (streamState->streamDuration() == 0.0) {
00413
00414
00415
00416
00417 streamState->reclaim();
00418 }
00419
00420
00421
00422 }
00423
00424 StreamState::StreamState(OnDemandServerMediaSubsession& master,
00425 Port const& serverRTPPort, Port const& serverRTCPPort,
00426 RTPSink* rtpSink, BasicUDPSink* udpSink,
00427 unsigned totalBW, FramedSource* mediaSource,
00428 Groupsock* rtpGS, Groupsock* rtcpGS)
00429 : fMaster(master), fAreCurrentlyPlaying(False), fReferenceCount(1),
00430 fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
00431 fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
00432 fTotalBW(totalBW), fRTCPInstance(NULL) ,
00433 fMediaSource(mediaSource), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
00434 }
00435
00436 StreamState::~StreamState() {
00437 reclaim();
00438 }
00439
00440 void StreamState
00441 ::startPlaying(Destinations* dests,
00442 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
00443 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00444 void* serverRequestAlternativeByteHandlerClientData) {
00445 if (dests == NULL) return;
00446 if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
00447 if (fRTPSink != NULL) {
00448 fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00449 fAreCurrentlyPlaying = True;
00450 } else if (fUDPSink != NULL) {
00451 fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00452 fAreCurrentlyPlaying = True;
00453 }
00454 }
00455
00456 if (fRTCPInstance == NULL && fRTPSink != NULL) {
00457
00458 fRTCPInstance
00459 = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
00460 fTotalBW, (unsigned char*)fMaster.fCNAME,
00461 fRTPSink, NULL );
00462
00463 }
00464
00465 if (dests->isTCP) {
00466
00467 if (fRTPSink != NULL) {
00468 fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00469 fRTPSink->setServerRequestAlternativeByteHandler(dests->tcpSocketNum, serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
00470 }
00471 if (fRTCPInstance != NULL) {
00472 fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00473 fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00474 rtcpRRHandler, rtcpRRHandlerClientData);
00475 }
00476 } else {
00477
00478
00479 if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
00480 if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
00481 if (fRTCPInstance != NULL) {
00482 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00483 rtcpRRHandler, rtcpRRHandlerClientData);
00484 }
00485 }
00486 }
00487
00488 void StreamState::pause() {
00489 if (fRTPSink != NULL) fRTPSink->stopPlaying();
00490 if (fUDPSink != NULL) fUDPSink->stopPlaying();
00491 fAreCurrentlyPlaying = False;
00492 }
00493
00494 void StreamState::endPlaying(Destinations* dests) {
00495 if (dests->isTCP) {
00496 if (fRTPSink != NULL) {
00497 fRTPSink->removeStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00498 }
00499 if (fRTCPInstance != NULL) {
00500 fRTCPInstance->removeStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00501 fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00502 NULL, NULL);
00503 }
00504 } else {
00505
00506 if (fRTPgs != NULL) fRTPgs->removeDestination(dests->addr, dests->rtpPort);
00507 if (fRTCPgs != NULL) fRTCPgs->removeDestination(dests->addr, dests->rtcpPort);
00508 if (fRTCPInstance != NULL) {
00509 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00510 NULL, NULL);
00511 }
00512 }
00513 }
00514
00515 void StreamState::reclaim() {
00516
00517 Medium::close(fRTCPInstance) ; fRTCPInstance = NULL;
00518 Medium::close(fRTPSink); fRTPSink = NULL;
00519 Medium::close(fUDPSink); fUDPSink = NULL;
00520
00521 fMaster.closeStreamSource(fMediaSource); fMediaSource = NULL;
00522
00523 delete fRTPgs; fRTPgs = NULL;
00524 delete fRTCPgs; fRTCPgs = NULL;
00525 }