00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026
00028
00029
00030
00031
00032 static Boolean sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00033 int socketNum, unsigned char streamChannelId);
00034
00035
00036
00037
00038
00039
00040 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
00041 _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
00042 if (ourTables == NULL) return NULL;
00043
00044 if (ourTables->socketTable == NULL) {
00045
00046 ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00047 }
00048 return (HashTable*)(ourTables->socketTable);
00049 }
00050
00051 class SocketDescriptor {
00052 public:
00053 SocketDescriptor(UsageEnvironment& env, int socketNum);
00054 virtual ~SocketDescriptor();
00055
00056 void registerRTPInterface(unsigned char streamChannelId,
00057 RTPInterface* rtpInterface);
00058 RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00059 void deregisterRTPInterface(unsigned char streamChannelId);
00060
00061 void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
00062 fServerRequestAlternativeByteHandler = handler;
00063 fServerRequestAlternativeByteHandlerClientData = clientData;
00064 }
00065
00066 private:
00067 static void tcpReadHandler(SocketDescriptor*, int mask);
00068 void tcpReadHandler1(int mask);
00069
00070 private:
00071 UsageEnvironment& fEnv;
00072 int fOurSocketNum;
00073 HashTable* fSubChannelHashTable;
00074 ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
00075 void* fServerRequestAlternativeByteHandlerClientData;
00076 u_int8_t fStreamChannelId, fSizeByte1;
00077 enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
00078 };
00079
00080 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
00081 HashTable* table = socketHashTable(env, createIfNotFound);
00082 if (table == NULL) return NULL;
00083
00084 char const* key = (char const*)(long)sockNum;
00085 SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
00086 if (socketDescriptor == NULL && createIfNotFound) {
00087 socketDescriptor = new SocketDescriptor(env, sockNum);
00088 table->Add((char const*)(long)(sockNum), socketDescriptor);
00089 }
00090
00091 return socketDescriptor;
00092 }
00093
00094 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00095 char const* key = (char const*)(long)sockNum;
00096 HashTable* table = socketHashTable(env);
00097 table->Remove(key);
00098
00099 if (table->IsEmpty()) {
00100
00101 _Tables* ourTables = _Tables::getOurTables(env);
00102 delete table;
00103 ourTables->socketTable = NULL;
00104 ourTables->reclaimIfPossible();
00105 }
00106 }
00107
00108
00110
00111 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00112 : fOwner(owner), fGS(gs),
00113 fTCPStreams(NULL),
00114 fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00115 fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00116 fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00117
00118
00119
00120
00121 makeSocketNonBlocking(fGS->socketNum());
00122 increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00123 }
00124
00125 RTPInterface::~RTPInterface() {
00126 delete fTCPStreams;
00127 }
00128
00129 void RTPInterface::setStreamSocket(int sockNum,
00130 unsigned char streamChannelId) {
00131 fGS->removeAllDestinations();
00132 addStreamSocket(sockNum, streamChannelId);
00133 }
00134
00135 void RTPInterface::addStreamSocket(int sockNum,
00136 unsigned char streamChannelId) {
00137 if (sockNum < 0) return;
00138
00139 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00140 streams = streams->fNext) {
00141 if (streams->fStreamSocketNum == sockNum
00142 && streams->fStreamChannelId == streamChannelId) {
00143 return;
00144 }
00145 }
00146
00147 fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00148 }
00149
00150 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
00151 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
00152 if (socketDescriptor != NULL) {
00153 socketDescriptor->deregisterRTPInterface(streamChannelId);
00154
00155
00156 }
00157 }
00158
00159 void RTPInterface::removeStreamSocket(int sockNum,
00160 unsigned char streamChannelId) {
00161 for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00162 streamsPtr = &((*streamsPtr)->fNext)) {
00163 if ((*streamsPtr)->fStreamSocketNum == sockNum
00164 && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00165 deregisterSocket(envir(), sockNum, streamChannelId);
00166
00167
00168 tcpStreamRecord* next = (*streamsPtr)->fNext;
00169 (*streamsPtr)->fNext = NULL;
00170 delete (*streamsPtr);
00171 *streamsPtr = next;
00172 return;
00173 }
00174 }
00175 }
00176
00177 void RTPInterface
00178 ::setServerRequestAlternativeByteHandler(int socketNum, ServerRequestAlternativeByteHandler* handler, void* clientData) {
00179 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), socketNum);
00180
00181 if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
00182 }
00183
00184
00185 Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00186 Boolean success = True;
00187
00188
00189 if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False;
00190
00191
00192 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00193 streams = streams->fNext) {
00194 if (!sendRTPOverTCP(packet, packetSize,
00195 streams->fStreamSocketNum, streams->fStreamChannelId)) {
00196 success = False;
00197 }
00198 }
00199
00200 return success;
00201 }
00202
00203 void RTPInterface
00204 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00205
00206 envir().taskScheduler().
00207 turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00208
00209
00210 fReadHandlerProc = handlerProc;
00211 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00212 streams = streams->fNext) {
00213
00214 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00215
00216
00217 socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00218 }
00219 }
00220
00221 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00222 unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {
00223 packetReadWasIncomplete = False;
00224 Boolean readSuccess;
00225 if (fNextTCPReadStreamSocketNum < 0) {
00226
00227 readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00228 } else {
00229
00230 bytesRead = 0;
00231 unsigned totBytesToRead = fNextTCPReadSize;
00232 if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00233 unsigned curBytesToRead = totBytesToRead;
00234 int curBytesRead;
00235 while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00236 &buffer[bytesRead], curBytesToRead,
00237 fromAddress)) > 0) {
00238 bytesRead += curBytesRead;
00239 if (bytesRead >= totBytesToRead) break;
00240 curBytesToRead -= curBytesRead;
00241 }
00242 fNextTCPReadSize -= bytesRead;
00243 if (curBytesRead == 0 && curBytesToRead > 0) {
00244 packetReadWasIncomplete = True;
00245 return True;
00246 } else if (curBytesRead < 0) {
00247 bytesRead = 0;
00248 readSuccess = False;
00249 } else {
00250 readSuccess = True;
00251 }
00252 fNextTCPReadStreamSocketNum = -1;
00253 }
00254
00255 if (readSuccess && fAuxReadHandlerFunc != NULL) {
00256
00257 (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00258 }
00259 return readSuccess;
00260 }
00261
00262 void RTPInterface::stopNetworkReading() {
00263
00264 envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00265
00266
00267 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00268 streams = streams->fNext) {
00269 deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
00270 }
00271 }
00272
00273
00275
00276 Boolean sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
00277 int socketNum, unsigned char streamChannelId) {
00278 #ifdef DEBUG
00279 fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
00280 packetSize, streamChannelId, socketNum); fflush(stderr);
00281 #endif
00282
00283
00284 do {
00285 char const dollar = '$';
00286 if (send(socketNum, &dollar, 1, 0) != 1) break;
00287 if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;
00288
00289 char netPacketSize[2];
00290 netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);
00291 netPacketSize[1] = (char) (packetSize&0xFF);
00292 if (send(socketNum, netPacketSize, 2, 0) != 2) break;
00293
00294 if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;
00295
00296 #ifdef DEBUG
00297 fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
00298 #endif
00299
00300 return True;
00301 } while (0);
00302
00303 #ifdef DEBUG
00304 fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
00305 #endif
00306 return False;
00307 }
00308
00309 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00310 :fEnv(env), fOurSocketNum(socketNum),
00311 fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
00312 fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
00313 fTCPReadingState(AWAITING_DOLLAR) {
00314 }
00315
00316 SocketDescriptor::~SocketDescriptor() {
00317 delete fSubChannelHashTable;
00318 }
00319
00320 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00321 RTPInterface* rtpInterface) {
00322 Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00323 fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00324 rtpInterface);
00325
00326 if (isFirstRegistration) {
00327
00328 TaskScheduler::BackgroundHandlerProc* handler
00329 = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00330 fEnv.taskScheduler().
00331 turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
00332 }
00333 }
00334
00335 RTPInterface* SocketDescriptor
00336 ::lookupRTPInterface(unsigned char streamChannelId) {
00337 char const* lookupArg = (char const*)(long)streamChannelId;
00338 return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00339 }
00340
00341 void SocketDescriptor
00342 ::deregisterRTPInterface(unsigned char streamChannelId) {
00343 fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00344
00345 if (fSubChannelHashTable->IsEmpty()) {
00346
00347 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00348 removeSocketDescription(fEnv, fOurSocketNum);
00349 delete this;
00350 }
00351 }
00352
00353 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
00354 socketDescriptor->tcpReadHandler1(mask);
00355 }
00356
00357 void SocketDescriptor::tcpReadHandler1(int mask) {
00358
00359
00360
00361
00362
00363
00364
00365
00366 u_int8_t c;
00367 struct sockaddr_in fromAddress;
00368 if (fTCPReadingState != AWAITING_PACKET_DATA) {
00369 int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00370 if (result != 1) {
00371 if (result < 0) {
00372 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00373 }
00374 return;
00375 }
00376 }
00377
00378 switch (fTCPReadingState) {
00379 case AWAITING_DOLLAR: {
00380 if (c == '$') {
00381 fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
00382 } else {
00383
00384 if (fServerRequestAlternativeByteHandler != NULL) {
00385 (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
00386 }
00387 }
00388 break;
00389 }
00390 case AWAITING_STREAM_CHANNEL_ID: {
00391
00392 if (lookupRTPInterface(c) != NULL) {
00393 fStreamChannelId = c;
00394 fTCPReadingState = AWAITING_SIZE1;
00395 } else {
00396
00397 fTCPReadingState = AWAITING_DOLLAR;
00398 }
00399 break;
00400 }
00401 case AWAITING_SIZE1: {
00402
00403 fSizeByte1 = c;
00404 fTCPReadingState = AWAITING_SIZE2;
00405 break;
00406 }
00407 case AWAITING_SIZE2: {
00408
00409 unsigned short size = (fSizeByte1<<8)|c;
00410
00411
00412 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00413 if (rtpInterface != NULL) {
00414 rtpInterface->fNextTCPReadSize = size;
00415 rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
00416 rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
00417 }
00418 fTCPReadingState = AWAITING_PACKET_DATA;
00419 break;
00420 }
00421 case AWAITING_PACKET_DATA: {
00422
00423 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00424 if (rtpInterface != NULL) {
00425 if (rtpInterface->fNextTCPReadSize == 0) {
00426
00427 fTCPReadingState = AWAITING_DOLLAR;
00428 break;
00429 }
00430 if (rtpInterface->fReadHandlerProc != NULL) {
00431 #ifdef DEBUG
00432 fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
00433 #endif
00434 rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00435 }
00436 }
00437 return;
00438 }
00439 }
00440 }
00441
00442
00444
00445 tcpStreamRecord
00446 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00447 tcpStreamRecord* next)
00448 : fNext(next),
00449 fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00450 }
00451
00452 tcpStreamRecord::~tcpStreamRecord() {
00453 delete fNext;
00454 }
00455