00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026
00028
00029
00030
00031
// Forward declaration of the file-local helper that writes one packet to a
// TCP socket as interleaved data: a '$' byte, the stream channel id, a
// 16-bit big-endian size, then the packet bytes.  Defined later in this file.
static void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
                           int socketNum, unsigned char streamChannelId);
00034
00035
00036
00037
00038
00039
00040 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
00041 _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
00042 if (ourTables == NULL) return NULL;
00043
00044 if (ourTables->socketTable == NULL) {
00045
00046 ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00047 }
00048 return (HashTable*)(ourTables->socketTable);
00049 }
00050
00051 class SocketDescriptor {
00052 public:
00053 SocketDescriptor(UsageEnvironment& env, int socketNum);
00054 virtual ~SocketDescriptor();
00055
00056 void registerRTPInterface(unsigned char streamChannelId,
00057 RTPInterface* rtpInterface);
00058 RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00059 void deregisterRTPInterface(unsigned char streamChannelId);
00060
00061 void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
00062 fServerRequestAlternativeByteHandler = handler;
00063 fServerRequestAlternativeByteHandlerClientData = clientData;
00064 }
00065
00066 private:
00067 static void tcpReadHandler(SocketDescriptor*, int mask);
00068 void tcpReadHandler1(int mask);
00069
00070 private:
00071 UsageEnvironment& fEnv;
00072 int fOurSocketNum;
00073 HashTable* fSubChannelHashTable;
00074 ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
00075 void* fServerRequestAlternativeByteHandlerClientData;
00076 u_int8_t fStreamChannelId, fSizeByte1;
00077 enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
00078 };
00079
00080 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
00081 HashTable* table = socketHashTable(env, createIfNotFound);
00082 if (table == NULL) return NULL;
00083
00084 char const* key = (char const*)(long)sockNum;
00085 SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
00086 if (socketDescriptor == NULL && createIfNotFound) {
00087 socketDescriptor = new SocketDescriptor(env, sockNum);
00088 table->Add((char const*)(long)(sockNum), socketDescriptor);
00089 }
00090
00091 return socketDescriptor;
00092 }
00093
00094 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00095 char const* key = (char const*)(long)sockNum;
00096 HashTable* table = socketHashTable(env);
00097 table->Remove(key);
00098
00099 if (table->IsEmpty()) {
00100
00101 _Tables* ourTables = _Tables::getOurTables(env);
00102 delete table;
00103 ourTables->socketTable = NULL;
00104 ourTables->reclaimIfPossible();
00105 }
00106 }
00107
00108
00110
00111 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00112 : fOwner(owner), fGS(gs),
00113 fTCPStreams(NULL),
00114 fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00115 fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00116 fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00117
00118
00119
00120
00121 makeSocketNonBlocking(fGS->socketNum());
00122 increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00123 }
00124
00125 RTPInterface::~RTPInterface() {
00126 delete fTCPStreams;
00127 }
00128
00129 void RTPInterface::setStreamSocket(int sockNum,
00130 unsigned char streamChannelId) {
00131 fGS->removeAllDestinations();
00132 addStreamSocket(sockNum, streamChannelId);
00133 }
00134
00135 void RTPInterface::addStreamSocket(int sockNum,
00136 unsigned char streamChannelId) {
00137 if (sockNum < 0) return;
00138
00139 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00140 streams = streams->fNext) {
00141 if (streams->fStreamSocketNum == sockNum
00142 && streams->fStreamChannelId == streamChannelId) {
00143 return;
00144 }
00145 }
00146
00147 fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00148 }
00149
00150 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
00151 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
00152 if (socketDescriptor != NULL) {
00153 socketDescriptor->deregisterRTPInterface(streamChannelId);
00154
00155
00156 }
00157 }
00158
00159 void RTPInterface::removeStreamSocket(int sockNum,
00160 unsigned char streamChannelId) {
00161 for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00162 streamsPtr = &((*streamsPtr)->fNext)) {
00163 if ((*streamsPtr)->fStreamSocketNum == sockNum
00164 && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00165 deregisterSocket(envir(), sockNum, streamChannelId);
00166
00167
00168 tcpStreamRecord* next = (*streamsPtr)->fNext;
00169 (*streamsPtr)->fNext = NULL;
00170 delete (*streamsPtr);
00171 *streamsPtr = next;
00172 return;
00173 }
00174 }
00175 }
00176
00177 void RTPInterface
00178 ::setServerRequestAlternativeByteHandler(int socketNum, ServerRequestAlternativeByteHandler* handler, void* clientData) {
00179 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), socketNum);
00180
00181 if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
00182 }
00183
00184
00185 void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00186
00187 fGS->output(envir(), fGS->ttl(), packet, packetSize);
00188
00189
00190 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00191 streams = streams->fNext) {
00192 sendRTPOverTCP(packet, packetSize,
00193 streams->fStreamSocketNum, streams->fStreamChannelId);
00194 }
00195 }
00196
00197 void RTPInterface
00198 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00199
00200 envir().taskScheduler().
00201 turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00202
00203
00204 fReadHandlerProc = handlerProc;
00205 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00206 streams = streams->fNext) {
00207
00208 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00209
00210
00211 socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00212 }
00213 }
00214
00215 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00216 unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {
00217 packetReadWasIncomplete = False;
00218 Boolean readSuccess;
00219 if (fNextTCPReadStreamSocketNum < 0) {
00220
00221 readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00222 } else {
00223
00224 bytesRead = 0;
00225 unsigned totBytesToRead = fNextTCPReadSize;
00226 if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00227 unsigned curBytesToRead = totBytesToRead;
00228 int curBytesRead;
00229 while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00230 &buffer[bytesRead], curBytesToRead,
00231 fromAddress)) > 0) {
00232 bytesRead += curBytesRead;
00233 if (bytesRead >= totBytesToRead) break;
00234 curBytesToRead -= curBytesRead;
00235 }
00236 fNextTCPReadSize -= bytesRead;
00237 if (curBytesRead == 0 && curBytesToRead > 0) {
00238 packetReadWasIncomplete = True;
00239 return True;
00240 } else if (curBytesRead < 0) {
00241 bytesRead = 0;
00242 readSuccess = False;
00243 } else {
00244 readSuccess = True;
00245 }
00246 fNextTCPReadStreamSocketNum = -1;
00247 }
00248
00249 if (readSuccess && fAuxReadHandlerFunc != NULL) {
00250
00251 (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00252 }
00253 return readSuccess;
00254 }
00255
00256 void RTPInterface::stopNetworkReading() {
00257
00258 envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00259
00260
00261 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00262 streams = streams->fNext) {
00263 deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
00264 }
00265 }
00266
00267
00269
// Sends one packet over a TCP socket as interleaved data:
//
//     '$' | channel id (1 byte) | packet size (2 bytes, big-endian) | packet
//
// Parameters:
//   packet, packetSize  - the data to send
//   socketNum           - the TCP socket to write to
//   streamChannelId     - the channel id placed in the framing header
//
// The size field is only 16 bits wide, so a packet larger than 65535 bytes
// cannot be represented; such a packet is rejected rather than being sent
// with a truncated size (which would desynchronize the receiver's parsing).
// The 4-byte framing header is written with a single "send()" call, so a
// failure cannot leave part of a header on the stream.
//
// Failures are not reported to the caller (they are only logged when DEBUG
// is defined).  A short write counts as a failure.
void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
                    int socketNum, unsigned char streamChannelId) {
#ifdef DEBUG
  fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
          packetSize, streamChannelId, socketNum); fflush(stderr);
#endif

  do {
    // The framing header can't describe more than 0xFFFF bytes:
    if (packetSize > 0xFFFF) break;

    char framingHeader[4];
    framingHeader[0] = '$';
    framingHeader[1] = (char)streamChannelId;
    framingHeader[2] = (char)((packetSize&0xFF00)>>8);
    framingHeader[3] = (char)(packetSize&0xFF);
    if (send(socketNum, framingHeader, 4, 0) != 4) break;

    if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;

#ifdef DEBUG
    fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
#endif

    return;
  } while (0);

#ifdef DEBUG
  fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
#endif
}
00301
00302 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00303 :fEnv(env), fOurSocketNum(socketNum),
00304 fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
00305 fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
00306 fTCPReadingState(AWAITING_DOLLAR) {
00307 }
00308
00309 SocketDescriptor::~SocketDescriptor() {
00310 delete fSubChannelHashTable;
00311 }
00312
00313 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00314 RTPInterface* rtpInterface) {
00315 Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00316 fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00317 rtpInterface);
00318
00319 if (isFirstRegistration) {
00320
00321 TaskScheduler::BackgroundHandlerProc* handler
00322 = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00323 fEnv.taskScheduler().
00324 turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00325 }
00326 }
00327
00328 RTPInterface* SocketDescriptor
00329 ::lookupRTPInterface(unsigned char streamChannelId) {
00330 char const* lookupArg = (char const*)(long)streamChannelId;
00331 return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00332 }
00333
00334 void SocketDescriptor
00335 ::deregisterRTPInterface(unsigned char streamChannelId) {
00336 fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00337
00338 if (fSubChannelHashTable->IsEmpty()) {
00339
00340 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00341 removeSocketDescription(fEnv, fOurSocketNum);
00342 delete this;
00343 }
00344 }
00345
00346 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
00347 socketDescriptor->tcpReadHandler1(mask);
00348 }
00349
00350 void SocketDescriptor::tcpReadHandler1(int mask) {
00351
00352
00353
00354
00355
00356
00357
00358
00359 u_int8_t c;
00360 struct sockaddr_in fromAddress;
00361 if (fTCPReadingState != AWAITING_PACKET_DATA) {
00362 int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00363 if (result != 1) {
00364 if (result < 0) {
00365 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00366 }
00367 return;
00368 }
00369 }
00370
00371 switch (fTCPReadingState) {
00372 case AWAITING_DOLLAR: {
00373 if (c == '$') {
00374 fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
00375 } else {
00376
00377 if (fServerRequestAlternativeByteHandler != NULL) {
00378 (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
00379 }
00380 }
00381 break;
00382 }
00383 case AWAITING_STREAM_CHANNEL_ID: {
00384
00385 fStreamChannelId = c;
00386 fTCPReadingState = AWAITING_SIZE1;
00387 break;
00388 }
00389 case AWAITING_SIZE1: {
00390
00391 fSizeByte1 = c;
00392 fTCPReadingState = AWAITING_SIZE2;
00393 break;
00394 }
00395 case AWAITING_SIZE2: {
00396
00397 unsigned short size = (fSizeByte1<<8)|c;
00398
00399
00400 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00401 if (rtpInterface != NULL) {
00402 rtpInterface->fNextTCPReadSize = size;
00403 rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
00404 rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
00405 }
00406 fTCPReadingState = AWAITING_PACKET_DATA;
00407 break;
00408 }
00409 case AWAITING_PACKET_DATA: {
00410
00411 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00412 if (rtpInterface != NULL) {
00413 if (rtpInterface->fNextTCPReadSize == 0) {
00414
00415 fTCPReadingState = AWAITING_DOLLAR;
00416 break;
00417 }
00418 if (rtpInterface->fReadHandlerProc != NULL) {
00419 #ifdef DEBUG
00420 fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
00421 #endif
00422 rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00423 }
00424 }
00425 return;
00426 }
00427 }
00428 }
00429
00430
00432
00433 tcpStreamRecord
00434 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00435 tcpStreamRecord* next)
00436 : fNext(next),
00437 fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00438 }
00439
00440 tcpStreamRecord::~tcpStreamRecord() {
00441 delete fNext;
00442 }
00443