00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "Groupsock.hh"
00021 #include "GroupsockHelper.hh"
00022
00023 #include "TunnelEncaps.hh"
00024
00025 #ifndef NO_SSTREAM
00026 #include <sstream>
00027 #endif
00028 #include <stdio.h>
00029
00031
00032 OutputSocket::OutputSocket(UsageEnvironment& env)
00033 : Socket(env, 0 ),
00034 fSourcePort(0), fLastSentTTL(0) {
00035 }
00036
00037 OutputSocket::OutputSocket(UsageEnvironment& env, Port port)
00038 : Socket(env, port),
00039 fSourcePort(0), fLastSentTTL(0) {
00040 }
00041
00042 OutputSocket::~OutputSocket() {
00043 }
00044
00045 Boolean OutputSocket::write(netAddressBits address, Port port, u_int8_t ttl,
00046 unsigned char* buffer, unsigned bufferSize) {
00047 if (ttl == fLastSentTTL) {
00048
00049 ttl = 0;
00050 } else {
00051 fLastSentTTL = ttl;
00052 }
00053 struct in_addr destAddr; destAddr.s_addr = address;
00054 if (!writeSocket(env(), socketNum(), destAddr, port, ttl,
00055 buffer, bufferSize))
00056 return False;
00057
00058 if (sourcePortNum() == 0) {
00059
00060
00061 if (!getSourcePort(env(), socketNum(), fSourcePort)) {
00062 if (DebugLevel >= 1)
00063 env() << *this
00064 << ": failed to get source port: "
00065 << env().getResultMsg() << "\n";
00066 return False;
00067 }
00068 }
00069
00070 return True;
00071 }
00072
00073
00074 Boolean OutputSocket
00075 ::handleRead(unsigned char* , unsigned ,
00076 unsigned& , struct sockaddr_in& ) {
00077 return True;
00078 }
00079
00080
00082
00083 destRecord
00084 ::destRecord(struct in_addr const& addr, Port const& port, u_int8_t ttl,
00085 destRecord* next)
00086 : fNext(next), fGroupEId(addr, port.num(), ttl), fPort(port) {
00087 }
00088
00089 destRecord::~destRecord() {
00090 delete fNext;
00091 }
00092
00093
00095
00096 NetInterfaceTrafficStats Groupsock::statsIncoming;
00097 NetInterfaceTrafficStats Groupsock::statsOutgoing;
00098 NetInterfaceTrafficStats Groupsock::statsRelayedIncoming;
00099 NetInterfaceTrafficStats Groupsock::statsRelayedOutgoing;
00100
00101
00102 Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
00103 Port port, u_int8_t ttl)
00104 : OutputSocket(env, port),
00105 deleteIfNoMembers(False), isSlave(False),
00106 fIncomingGroupEId(groupAddr, port.num(), ttl), fDests(NULL), fTTL(ttl) {
00107 addDestination(groupAddr, port);
00108
00109 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {
00110 if (DebugLevel >= 1) {
00111 env << *this << ": failed to join group: "
00112 << env.getResultMsg() << "\n";
00113 }
00114 }
00115
00116
00117 if (ourIPAddress(env) == 0) {
00118 if (DebugLevel >= 0) {
00119 env << "Unable to determine our source address: "
00120 << env.getResultMsg() << "\n";
00121 }
00122 }
00123
00124 if (DebugLevel >= 2) env << *this << ": created\n";
00125 }
00126
00127
00128 Groupsock::Groupsock(UsageEnvironment& env, struct in_addr const& groupAddr,
00129 struct in_addr const& sourceFilterAddr,
00130 Port port)
00131 : OutputSocket(env, port),
00132 deleteIfNoMembers(False), isSlave(False),
00133 fIncomingGroupEId(groupAddr, sourceFilterAddr, port.num()),
00134 fDests(NULL), fTTL(255) {
00135 addDestination(groupAddr, port);
00136
00137
00138 if (!socketJoinGroupSSM(env, socketNum(), groupAddr.s_addr,
00139 sourceFilterAddr.s_addr)) {
00140 if (DebugLevel >= 3) {
00141 env << *this << ": SSM join failed: "
00142 << env.getResultMsg();
00143 env << " - trying regular join instead\n";
00144 }
00145 if (!socketJoinGroup(env, socketNum(), groupAddr.s_addr)) {
00146 if (DebugLevel >= 1) {
00147 env << *this << ": failed to join group: "
00148 << env.getResultMsg() << "\n";
00149 }
00150 }
00151 }
00152
00153 if (DebugLevel >= 2) env << *this << ": created\n";
00154 }
00155
00156 Groupsock::~Groupsock() {
00157 if (isSSM()) {
00158 if (!socketLeaveGroupSSM(env(), socketNum(), groupAddress().s_addr,
00159 sourceFilterAddress().s_addr)) {
00160 socketLeaveGroup(env(), socketNum(), groupAddress().s_addr);
00161 }
00162 } else {
00163 socketLeaveGroup(env(), socketNum(), groupAddress().s_addr);
00164 }
00165
00166 delete fDests;
00167
00168 if (DebugLevel >= 2) env() << *this << ": deleting\n";
00169 }
00170
00171 void
00172 Groupsock::changeDestinationParameters(struct in_addr const& newDestAddr,
00173 Port newDestPort, int newDestTTL) {
00174 if (fDests == NULL) return;
00175
00176 struct in_addr destAddr = fDests->fGroupEId.groupAddress();
00177 if (newDestAddr.s_addr != 0) {
00178 if (newDestAddr.s_addr != destAddr.s_addr
00179 && IsMulticastAddress(newDestAddr.s_addr)) {
00180
00181
00182
00183 socketLeaveGroup(env(), socketNum(), destAddr.s_addr);
00184 socketJoinGroup(env(), socketNum(), newDestAddr.s_addr);
00185 }
00186 destAddr.s_addr = newDestAddr.s_addr;
00187 }
00188
00189 portNumBits destPortNum = fDests->fGroupEId.portNum();
00190 if (newDestPort.num() != 0) {
00191 if (newDestPort.num() != destPortNum
00192 && IsMulticastAddress(destAddr.s_addr)) {
00193
00194 changePort(newDestPort);
00195
00196 socketJoinGroup(env(), socketNum(), destAddr.s_addr);
00197 }
00198 destPortNum = newDestPort.num();
00199 fDests->fPort = newDestPort;
00200 }
00201
00202 u_int8_t destTTL = ttl();
00203 if (newDestTTL != ~0) destTTL = (u_int8_t)newDestTTL;
00204
00205 fDests->fGroupEId = GroupEId(destAddr, destPortNum, destTTL);
00206 }
00207
00208 void Groupsock::addDestination(struct in_addr const& addr, Port const& port) {
00209
00210 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00211 if (addr.s_addr == dests->fGroupEId.groupAddress().s_addr
00212 && port.num() == dests->fPort.num()) {
00213 return;
00214 }
00215 }
00216
00217 fDests = new destRecord(addr, port, ttl(), fDests);
00218 }
00219
00220 void Groupsock::removeDestination(struct in_addr const& addr, Port const& port) {
00221 for (destRecord** destsPtr = &fDests; *destsPtr != NULL;
00222 destsPtr = &((*destsPtr)->fNext)) {
00223 if (addr.s_addr == (*destsPtr)->fGroupEId.groupAddress().s_addr
00224 && port.num() == (*destsPtr)->fPort.num()) {
00225
00226 destRecord* next = (*destsPtr)->fNext;
00227 (*destsPtr)->fNext = NULL;
00228 delete (*destsPtr);
00229 *destsPtr = next;
00230 return;
00231 }
00232 }
00233 }
00234
00235 void Groupsock::removeAllDestinations() {
00236 delete fDests; fDests = NULL;
00237 }
00238
00239 void Groupsock::multicastSendOnly() {
00240 socketLeaveGroup(env(), socketNum(), fIncomingGroupEId.groupAddress().s_addr);
00241 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00242 socketLeaveGroup(env(), socketNum(), dests->fGroupEId.groupAddress().s_addr);
00243 }
00244 }
00245
00246 Boolean Groupsock::output(UsageEnvironment& env, u_int8_t ttlToSend,
00247 unsigned char* buffer, unsigned bufferSize,
00248 DirectedNetInterface* interfaceNotToFwdBackTo) {
00249 do {
00250
00251 Boolean writeSuccess = True;
00252 for (destRecord* dests = fDests; dests != NULL; dests = dests->fNext) {
00253 if (!write(dests->fGroupEId.groupAddress().s_addr, dests->fPort, ttlToSend,
00254 buffer, bufferSize)) {
00255 writeSuccess = False;
00256 break;
00257 }
00258 }
00259 if (!writeSuccess) break;
00260 statsOutgoing.countPacket(bufferSize);
00261 statsGroupOutgoing.countPacket(bufferSize);
00262
00263
00264 int numMembers = 0;
00265 if (!members().IsEmpty()) {
00266 numMembers =
00267 outputToAllMembersExcept(interfaceNotToFwdBackTo,
00268 ttlToSend, buffer, bufferSize,
00269 ourIPAddress(env));
00270 if (numMembers < 0) break;
00271 }
00272
00273 if (DebugLevel >= 3) {
00274 env << *this << ": wrote " << bufferSize << " bytes, ttl "
00275 << (unsigned)ttlToSend;
00276 if (numMembers > 0) {
00277 env << "; relayed to " << numMembers << " members";
00278 }
00279 env << "\n";
00280 }
00281 return True;
00282 } while (0);
00283
00284 if (DebugLevel >= 0) {
00285 env.setResultMsg("Groupsock write failed: ", env.getResultMsg());
00286 }
00287 return False;
00288 }
00289
00290 Boolean Groupsock::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00291 unsigned& bytesRead,
00292 struct sockaddr_in& fromAddress) {
00293
00294
00295
00296 bytesRead = 0;
00297
00298 int maxBytesToRead = bufferMaxSize - TunnelEncapsulationTrailerMaxSize;
00299 int numBytes = readSocket(env(), socketNum(),
00300 buffer, maxBytesToRead, fromAddress);
00301 if (numBytes < 0) {
00302 if (DebugLevel >= 0) {
00303 env().setResultMsg("Groupsock read failed: ",
00304 env().getResultMsg());
00305 }
00306 return False;
00307 }
00308
00309
00310 if (isSSM()
00311 && fromAddress.sin_addr.s_addr != sourceFilterAddress().s_addr) {
00312 return True;
00313 }
00314
00315
00316
00317
00318 bytesRead = numBytes;
00319
00320 int numMembers = 0;
00321 if (!wasLoopedBackFromUs(env(), fromAddress)) {
00322 statsIncoming.countPacket(numBytes);
00323 statsGroupIncoming.countPacket(numBytes);
00324 numMembers =
00325 outputToAllMembersExcept(NULL, ttl(),
00326 buffer, bytesRead,
00327 fromAddress.sin_addr.s_addr);
00328 if (numMembers > 0) {
00329 statsRelayedIncoming.countPacket(numBytes);
00330 statsGroupRelayedIncoming.countPacket(numBytes);
00331 }
00332 }
00333 if (DebugLevel >= 3) {
00334 env() << *this << ": read " << bytesRead << " bytes from ";
00335 env() << our_inet_ntoa(fromAddress.sin_addr);
00336 if (numMembers > 0) {
00337 env() << "; relayed to " << numMembers << " members";
00338 }
00339 env() << "\n";
00340 }
00341
00342 return True;
00343 }
00344
00345 Boolean Groupsock::wasLoopedBackFromUs(UsageEnvironment& env,
00346 struct sockaddr_in& fromAddress) {
00347 if (fromAddress.sin_addr.s_addr
00348 == ourIPAddress(env)) {
00349 if (fromAddress.sin_port == sourcePortNum()) {
00350 #ifdef DEBUG_LOOPBACK_CHECKING
00351 if (DebugLevel >= 3) {
00352 env() << *this << ": got looped-back packet\n";
00353 }
00354 #endif
00355 return True;
00356 }
00357 }
00358
00359 return False;
00360 }
00361
00362 int Groupsock::outputToAllMembersExcept(DirectedNetInterface* exceptInterface,
00363 u_int8_t ttlToFwd,
00364 unsigned char* data, unsigned size,
00365 netAddressBits sourceAddr) {
00366
00367 if (ttlToFwd == 0) return 0;
00368
00369 DirectedNetInterfaceSet::Iterator iter(members());
00370 unsigned numMembers = 0;
00371 DirectedNetInterface* interf;
00372 while ((interf = iter.next()) != NULL) {
00373
00374 if (interf == exceptInterface)
00375 continue;
00376
00377
00378
00379 UsageEnvironment& saveEnv = env();
00380
00381 if (!interf->SourceAddrOKForRelaying(saveEnv, sourceAddr)) {
00382 if (strcmp(saveEnv.getResultMsg(), "") != 0) {
00383
00384 return -1;
00385 } else {
00386 continue;
00387 }
00388 }
00389
00390 if (numMembers == 0) {
00391
00392
00393
00394 TunnelEncapsulationTrailer* trailerInPacket
00395 = (TunnelEncapsulationTrailer*)&data[size];
00396 TunnelEncapsulationTrailer* trailer;
00397
00398 Boolean misaligned = ((unsigned long)trailerInPacket & 3) != 0;
00399 unsigned trailerOffset;
00400 u_int8_t tunnelCmd;
00401 if (isSSM()) {
00402
00403 trailerOffset = TunnelEncapsulationTrailerAuxSize;
00404 tunnelCmd = TunnelDataAuxCmd;
00405 } else {
00406 trailerOffset = 0;
00407 tunnelCmd = TunnelDataCmd;
00408 }
00409 unsigned trailerSize = TunnelEncapsulationTrailerSize + trailerOffset;
00410 unsigned tmpTr[TunnelEncapsulationTrailerMaxSize];
00411 if (misaligned) {
00412 trailer = (TunnelEncapsulationTrailer*)&tmpTr;
00413 } else {
00414 trailer = trailerInPacket;
00415 }
00416 trailer += trailerOffset;
00417
00418 if (fDests != NULL) {
00419 trailer->address() = fDests->fGroupEId.groupAddress().s_addr;
00420 trailer->port() = fDests->fPort;
00421 }
00422 trailer->ttl() = ttlToFwd;
00423 trailer->command() = tunnelCmd;
00424
00425 if (isSSM()) {
00426 trailer->auxAddress() = sourceFilterAddress().s_addr;
00427 }
00428
00429 if (misaligned) {
00430 memmove(trailerInPacket, trailer-trailerOffset, trailerSize);
00431 }
00432
00433 size += trailerSize;
00434 }
00435
00436 interf->write(data, size);
00437 ++numMembers;
00438 }
00439
00440 return numMembers;
00441 }
00442
00443 UsageEnvironment& operator<<(UsageEnvironment& s, const Groupsock& g) {
00444 UsageEnvironment& s1 = s << timestampString() << " Groupsock("
00445 << g.socketNum() << ": "
00446 << our_inet_ntoa(g.groupAddress())
00447 << ", " << g.port() << ", ";
00448 if (g.isSSM()) {
00449 return s1 << "SSM source: "
00450 << our_inet_ntoa(g.sourceFilterAddress()) << ")";
00451 } else {
00452 return s1 << (unsigned)(g.ttl()) << ")";
00453 }
00454 }
00455
00456
00458
00459
00460
00461
00462 static HashTable* getSocketTable(UsageEnvironment& env) {
00463 if (env.groupsockPriv == NULL) {
00464 env.groupsockPriv = HashTable::create(ONE_WORD_HASH_KEYS);
00465 }
00466 return (HashTable*)(env.groupsockPriv);
00467 }
00468
00469 static Boolean unsetGroupsockBySocket(Groupsock const* groupsock) {
00470 do {
00471 if (groupsock == NULL) break;
00472
00473 int sock = groupsock->socketNum();
00474
00475 if (sock < 0) break;
00476
00477 HashTable* sockets = getSocketTable(groupsock->env());
00478 if (sockets == NULL) break;
00479
00480 Groupsock* gs = (Groupsock*)sockets->Lookup((char*)(long)sock);
00481 if (gs == NULL || gs != groupsock) break;
00482 sockets->Remove((char*)(long)sock);
00483
00484 if (sockets->IsEmpty()) {
00485
00486 delete sockets;
00487 (gs->env()).groupsockPriv = NULL;
00488 }
00489
00490 return True;
00491 } while (0);
00492
00493 return False;
00494 }
00495
00496 static Boolean setGroupsockBySocket(UsageEnvironment& env, int sock,
00497 Groupsock* groupsock) {
00498 do {
00499
00500 if (sock < 0) {
00501 char buf[100];
00502 sprintf(buf, "trying to use bad socket (%d)", sock);
00503 env.setResultMsg(buf);
00504 break;
00505 }
00506
00507 HashTable* sockets = getSocketTable(env);
00508 if (sockets == NULL) break;
00509
00510
00511
00512 Boolean alreadyExists
00513 = (sockets->Lookup((char*)(long)sock) != 0);
00514 if (alreadyExists) {
00515 char buf[100];
00516 sprintf(buf,
00517 "Attempting to replace an existing socket (%d",
00518 sock);
00519 env.setResultMsg(buf);
00520 break;
00521 }
00522
00523 sockets->Add((char*)(long)sock, groupsock);
00524 return True;
00525 } while (0);
00526
00527 return False;
00528 }
00529
00530 static Groupsock* getGroupsockBySocket(UsageEnvironment& env, int sock) {
00531 do {
00532
00533 if (sock < 0) break;
00534
00535 HashTable* sockets = getSocketTable(env);
00536 if (sockets == NULL) break;
00537
00538 return (Groupsock*)sockets->Lookup((char*)(long)sock);
00539 } while (0);
00540
00541 return NULL;
00542 }
00543
00544 Groupsock*
00545 GroupsockLookupTable::Fetch(UsageEnvironment& env,
00546 netAddressBits groupAddress,
00547 Port port, u_int8_t ttl,
00548 Boolean& isNew) {
00549 isNew = False;
00550 Groupsock* groupsock;
00551 do {
00552 groupsock = (Groupsock*) fTable.Lookup(groupAddress, (~0), port);
00553 if (groupsock == NULL) {
00554 groupsock = AddNew(env, groupAddress, (~0), port, ttl);
00555 if (groupsock == NULL) break;
00556 isNew = True;
00557 }
00558 } while (0);
00559
00560 return groupsock;
00561 }
00562
00563 Groupsock*
00564 GroupsockLookupTable::Fetch(UsageEnvironment& env,
00565 netAddressBits groupAddress,
00566 netAddressBits sourceFilterAddr, Port port,
00567 Boolean& isNew) {
00568 isNew = False;
00569 Groupsock* groupsock;
00570 do {
00571 groupsock
00572 = (Groupsock*) fTable.Lookup(groupAddress, sourceFilterAddr, port);
00573 if (groupsock == NULL) {
00574 groupsock = AddNew(env, groupAddress, sourceFilterAddr, port, 0);
00575 if (groupsock == NULL) break;
00576 isNew = True;
00577 }
00578 } while (0);
00579
00580 return groupsock;
00581 }
00582
00583 Groupsock*
00584 GroupsockLookupTable::Lookup(netAddressBits groupAddress, Port port) {
00585 return (Groupsock*) fTable.Lookup(groupAddress, (~0), port);
00586 }
00587
00588 Groupsock*
00589 GroupsockLookupTable::Lookup(netAddressBits groupAddress,
00590 netAddressBits sourceFilterAddr, Port port) {
00591 return (Groupsock*) fTable.Lookup(groupAddress, sourceFilterAddr, port);
00592 }
00593
00594 Groupsock* GroupsockLookupTable::Lookup(UsageEnvironment& env, int sock) {
00595 return getGroupsockBySocket(env, sock);
00596 }
00597
00598 Boolean GroupsockLookupTable::Remove(Groupsock const* groupsock) {
00599 unsetGroupsockBySocket(groupsock);
00600 return fTable.Remove(groupsock->groupAddress().s_addr,
00601 groupsock->sourceFilterAddress().s_addr,
00602 groupsock->port());
00603 }
00604
00605 Groupsock* GroupsockLookupTable::AddNew(UsageEnvironment& env,
00606 netAddressBits groupAddress,
00607 netAddressBits sourceFilterAddress,
00608 Port port, u_int8_t ttl) {
00609 Groupsock* groupsock;
00610 do {
00611 struct in_addr groupAddr; groupAddr.s_addr = groupAddress;
00612 if (sourceFilterAddress == netAddressBits(~0)) {
00613
00614 groupsock = new Groupsock(env, groupAddr, port, ttl);
00615 } else {
00616
00617 struct in_addr sourceFilterAddr;
00618 sourceFilterAddr.s_addr = sourceFilterAddress;
00619 groupsock = new Groupsock(env, groupAddr, sourceFilterAddr, port);
00620 }
00621
00622 if (groupsock == NULL || groupsock->socketNum() < 0) break;
00623
00624 if (!setGroupsockBySocket(env, groupsock->socketNum(), groupsock)) break;
00625
00626 fTable.Add(groupAddress, sourceFilterAddress, port, (void*)groupsock);
00627 } while (0);
00628
00629 return groupsock;
00630 }
00631
00632 GroupsockLookupTable::Iterator::Iterator(GroupsockLookupTable& groupsocks)
00633 : fIter(AddressPortLookupTable::Iterator(groupsocks.fTable)) {
00634 }
00635
00636 Groupsock* GroupsockLookupTable::Iterator::next() {
00637 return (Groupsock*) fIter.next();
00638 };