00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "fasp/Transport.h"
00016
00017 #include "base/commons.h"
00018
00019 #include "dabc/timing.h"
00020 #include "dabc/Port.h"
00021 #include "dabc/version.h"
00022 #include "dabc/Manager.h"
00023 #include "mbs/MbsTypeDefs.h"
00024 #include "mbs/Iterator.h"
00025
00026
00027 #include <errno.h>
00028
00029 #include <math.h>
00030
00031 fasp::Transport::Transport(dabc::Reference port, dabc::Command cmd) :
00032 dabc::SocketWorker(), dabc::Transport(port.Ref()),
00033 dabc::MemoryPoolRequester(),
00034 fQueueMutex(),
00035 fQueue(
00036 ((dabc::Port*) port())->InputQueueCapacity()),
00037 fTgtBuf()
00038 {
00039
00040 fFlushTimeout = 0.3;
00041 fBufferSize = 32576;
00042
00043 rcvCounterNew = 0;
00044 rcvCounterDubl = 0;
00045
00046 fSockfd = -1;
00047
00048 fIfName = "eth0";
00049
00050 memset(fSourceMAC, 0, sizeof(fSourceMAC));
00051 memset(fDestMAC, 0, sizeof(fDestMAC));
00052 fSendDelay = 0.01;
00053 fPlainData = false;
00054
00055 ConfigureFor((dabc::Port*) port(), cmd);
00056
00057 }
00058
00059 fasp::Transport::~Transport()
00060 {
00061
00062
00063 fRawSync = 0;
00064 fDataPtr.reset();
00065 fHdrPtr.reset();
00066 fTgtBuf.Release();
00067 fQueue.Cleanup();
00068 fPool.Release();
00069 }
00070
00071 void fasp::Transport::ConfigureFor(dabc::Port* port, dabc::Command cmd)
00072 {
00073
00074 if ((fSockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL))) == -1) {
00075 EOUT(("socket NOT created"));
00076 return;
00077 } else {
00078 DOUT0(("RAW socket created"));
00079 }
00080
00081 fBufferSize = port->Cfg(dabc::xmlBufferSize, cmd).AsInt(fBufferSize);
00082 fFlushTimeout = port->Cfg(dabc::xmlFlushTimeout, cmd).AsDouble(fFlushTimeout);
00083 fIfName = port->Cfg("if", cmd).AsStdStr(fIfName);
00084 fSendDelay = port->Cfg("BurstDelay", cmd).AsDouble(fSendDelay);
00085 fPlainData = port->Cfg("PlainData", cmd).AsBool(fPlainData);
00086
00087 std::string src_mac = port->Cfg("SourceMAC", cmd).AsStdStr();
00088 std::string dest_mac = port->Cfg("DestMAC", cmd).AsStdStr();
00089 int if_index = port->Cfg("if_index", cmd).AsInt(-1);
00090
00091 if (dest_mac.length() == 2*6 + 5) {
00092 unsigned ddd[6];
00093 sscanf(dest_mac.c_str(),"%x:%x:%x:%x:%x:%x", ddd, ddd+1, ddd+2, ddd+3, ddd+4,ddd+5);
00094 for (int n=0;n<6;n++) fDestMAC[n] = ddd[n];
00095 } else {
00096 fDestMAC[0] = 0xB8;
00097 fDestMAC[1] = 0xAC;
00098 fDestMAC[2] = 0x6F;
00099 fDestMAC[3] = 0x92;
00100 fDestMAC[4] = 0x2B;
00101 fDestMAC[5] = 0x34;
00102 }
00103
00104
00105 DOUT0(("FASP transport buffer size %u flush timeout %3.2f", (unsigned) fBufferSize, fFlushTimeout));
00106
00107 fPool = port->GetMemoryPool();
00108
00109
00110
00111
00112 struct ifreq ifr;
00113 memset(&ifr, 0, sizeof(ifr));
00114 strcpy(ifr.ifr_name, fIfName.c_str());
00115 if (setsockopt(fSockfd, SOL_SOCKET, SO_BINDTODEVICE, (void *)&ifr, sizeof(ifr)) < 0) {
00116 EOUT(("FAIL to bind socket to interface %s", fIfName.c_str()));
00117 } else {
00118 DOUT0(("Bind socket to interface %s", fIfName.c_str()));
00119 }
00120
00121
00122 if (if_index < 0) {
00123 struct ifreq if_idx;
00124
00125 memset (&if_idx, 0, sizeof(struct ifreq));
00126 strncpy (if_idx.ifr_name, fIfName.c_str(), sizeof(if_idx.ifr_name));
00127 if (ioctl(fSockfd, SIOCGIFINDEX, &if_idx) < 0) {
00128 EOUT(("Cannot get index for interface %s, use 2", fIfName.c_str()));
00129 if_index = 2;
00130 } else {
00131 if_index = if_idx.ifr_ifindex;
00132 }
00133 }
00134
00135 if (src_mac.length() == 2*6 + 5) {
00136 unsigned ddd[6];
00137 sscanf(src_mac.c_str(),"%x:%x:%x:%x:%x:%x", ddd, ddd+1, ddd+2, ddd+3, ddd+4,ddd+5);
00138 for (int n=0;n<6;n++) fSourceMAC[n] = ddd[n];
00139 } else {
00140
00141 struct ifreq if_mac;
00142 memset (&if_mac, 0, sizeof(struct ifreq));
00143 strncpy (if_mac.ifr_name, fIfName.c_str(), sizeof(if_mac.ifr_name));
00144 if (ioctl(fSockfd, SIOCGIFHWADDR, &if_mac) < 0) {
00145 EOUT(("Cannot get MAC address for interface %s", fIfName.c_str()));
00146 } else {
00147 memcpy(fSourceMAC, &if_mac.ifr_hwaddr.sa_data, 6);
00148 }
00149 }
00150
00151
00152
00153 memset (fSendbuf, 0, BUF_SIZ);
00154
00155 DOUT0(("Source MAC: %02X:%02X:%02X:%02X:%02X:%02X",
00156 fSourceMAC[0], fSourceMAC[1], fSourceMAC[2],
00157 fSourceMAC[3], fSourceMAC[4], fSourceMAC[5]));
00158
00159 DOUT0(("Destination MAC: %02X:%02X:%02X:%02X:%02X:%02X",
00160 fDestMAC[0], fDestMAC[1], fDestMAC[2],
00161 fDestMAC[3], fDestMAC[4], fDestMAC[5]));
00162
00163 memcpy(fSendbuf, fDestMAC, 6);
00164 memcpy(fSendbuf+6, fSourceMAC, 6);
00165
00166 fSendbuf[12] = 0xff;
00167 fSendbuf[13] = 0xff;
00168
00169
00170
00171
00172
00173 for (int jj = 14; jj<60; jj++) fSendbuf[jj] = 0x66;
00174
00175
00176
00177 fSockAddr.sll_family = PF_PACKET;
00178 fSockAddr.sll_protocol = htons(ETH_P_IP);
00180 fSockAddr.sll_ifindex = if_index;
00181
00182 fSockAddr.sll_hatype = ARPHRD_ETHER;
00183 fSockAddr.sll_pkttype = PACKET_OTHERHOST;
00184 fSockAddr.sll_halen = ETH_ALEN;
00185
00186
00187 memcpy (fSockAddr.sll_addr, fDestMAC, 6);
00188
00189 DOUT0(("Socket initialized. ifindex=%d", if_index));
00190
00191 SetSocket(fSockfd);
00192 }
00193
00194
00195 bool fasp::Transport::ReadFromSocket()
00196 {
00197 uint8_t recvbuf[BUF_SIZ];
00198
00199 size_t res = recvfrom (fSockfd, recvbuf, BUF_SIZ, 0, 0, 0);
00200
00201 struct ether_header * recvH = (struct ether_header*) recvbuf;
00202
00203 if (res != 60) return false;
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219 for (int i=0; i<6; i++)
00220 if (recvH->ether_shost[i] != (uint8_t) fDestMAC[i])
00221
00222 return false;
00223
00224
00225
00226 uint8_t* rawbuf = ((uint8_t*) recvbuf) + 14;
00227
00228
00229 bool dummyBlock = (rawbuf[0] == 0x66) && (rawbuf[1] == 0x66) && (rawbuf[2] == 0x66);
00230 bool newBlockRecieved = false;
00231 if (!dummyBlock)
00232 for (int i=0; i<46; i++) {
00233 if ((unsigned) recvbuf[14+i] != (unsigned) fLastRecvedBlock[i])
00234 newBlockRecieved = true;
00235 }
00236
00237
00238 if (dummyBlock || !newBlockRecieved) {
00239 rcvCounterDubl++;
00240
00241 return true;
00242 }
00243
00244 static unsigned last_id = 0;
00245
00246 unsigned id = rawbuf[FaspSyncPos] * 0x10000 + rawbuf[FaspSyncPos+1] * 0x100 + rawbuf[FaspSyncPos+2];
00247
00248 if (id == 0x666666) {
00249 printf("find dummy id - ignore\n");
00250 return true;
00251 }
00252
00253 bool strange = false;
00254 if (last_id!=0)
00255 if (id - last_id > 10000) strange = true;
00256
00257
00258
00259
00260
00261 if (strange) {
00262 printf("Ignore strange ID %u %06x previous is %u %06x \n", id, id, last_id, last_id);
00263
00264 printf("packet %6u of size %4d dest:%02X:%02X:%02X:%02X:%02X:%02X src:%02X:%02X:%02X:%02X:%02X:%02X %02X %02X %02X %02X %02X %02X \n", id, (int) res,
00265 (unsigned) recvbuf[0], (unsigned) recvbuf[1], (unsigned) recvbuf[2], (unsigned) recvbuf[3], (unsigned) recvbuf[4], (unsigned) recvbuf[5],
00266 (unsigned) recvbuf[6], (unsigned) recvbuf[7], (unsigned) recvbuf[8], (unsigned) recvbuf[9], (unsigned) recvbuf[10], (unsigned) recvbuf[11],
00267
00268 (unsigned) recvbuf[48], (unsigned) recvbuf[49], (unsigned) recvbuf[50], (unsigned) recvbuf[51], (unsigned) recvbuf[52], (unsigned) recvbuf[53]
00269 );
00270
00271 for (int n=0;n<44;n++)
00272 printf("%02x ", rawbuf[n]);
00273 printf("\n");
00274
00275 return true;
00276 }
00277 last_id = id;
00278
00279
00280 memcpy(fLastRecvedBlock, recvbuf + 14, 46);
00281
00282
00283
00284
00285
00286
00287
00288
00289 if ((fRawSync != DummySync) && (fRawSync != id))
00290 CloseCurrentEvent();
00291
00292 if (fDataPtr.null() || (fDataPtr.fullsize() < FaspBlockSize)) {
00293 EOUT(("No buffer when data is received"));
00294 return true;
00295 }
00296
00297 if (fRawSync == DummySync) {
00298 fRawSync = id;
00299 }
00300
00301 if (fRawSync != id) {
00302 EOUT(("Something went wrong"));
00303 return true;
00304 }
00305
00306 fDataPtr.copyfrom(fLastRecvedBlock, FaspBlockSize);
00307
00308 fDataPtr.shift(FaspBlockSize);
00309
00310 if (fPlainData) CloseCurrentEvent();
00311
00312 if (fDataPtr.fullsize() < 2* FaspBlockSize)
00313 FlushBuffer();
00314
00315 return true;
00316 }
00317
00318 void fasp::Transport::WriteToSocket()
00319 {
00320
00321 if (sendto(fSockfd, fSendbuf, 60, 0, (struct sockaddr*)&fSockAddr, sizeof(struct sockaddr_ll)) < 0) {
00322
00323 EOUT(("Package send failed"));
00324 } else {
00325
00326 fLastSendTime.GetNow();
00327 }
00328 }
00329
00330 void fasp::Transport::ProcessEvent(const dabc::EventId& evnt)
00331 {
00332 switch (evnt.GetCode()) {
00333 case evntSocketRead: {
00334 if (ReadFromSocket() || (fLastSendTime.SpentTillNow() > fSendDelay))
00335 WriteToSocket();
00336 SetDoingInput(true);
00337 break;
00338 }
00339
00340 default:
00341 dabc::SocketWorker::ProcessEvent(evnt);
00342 break;
00343 }
00344 }
00345
00346 double fasp::Transport::ProcessTimeout(double lastdiff)
00347 {
00348 if (fFlushTimeout > 0) {
00349 double spent = fLastFlushTime.SpentTillNow();
00350 if (spent > fFlushTimeout)
00351 FlushBuffer(spent > fFlushTimeout*10);
00352 }
00353
00354 if (fLastSendTime.SpentTillNow() > fSendDelay)
00355 WriteToSocket();
00356
00357 double min = 0.1;
00358 if ((fSendDelay>0) && (fSendDelay < min)) min = fSendDelay;
00359 return min;
00360 }
00361
00362 void fasp::Transport::CloseCurrentEvent()
00363 {
00364 unsigned fullsize = fHdrPtr.distance_to(fDataPtr);
00365 mbs::EventHeader ev;
00366 ev.Init(fRawSync);
00367 ev.SetFullSize(fullsize);
00368
00369
00370
00371 mbs::SubeventHeader sub;
00372 sub.Init(0, roc::proc_FASP, 0);
00373 sub.SetFullSize(fullsize - sizeof(ev));
00374
00375 if (fHdrPtr.fullsize() < 30) {
00376 EOUT(("Something went wrong evid = %u hdrptr size = %u", fRawSync, fHdrPtr.fullsize()));
00377 }
00378
00379 fHdrPtr.copyfrom(&ev, sizeof(ev));
00380 fHdrPtr.shift(sizeof(ev));
00381 fHdrPtr.copyfrom(&sub, sizeof(sub));
00382 fHdrPtr.shift(fullsize - sizeof(ev));
00383 fRawSync = DummySync;
00384
00385
00386
00387 fDataPtr = fHdrPtr;
00388 fDataPtr.shift(sizeof(mbs::EventHeader) + sizeof(mbs::SubeventHeader));
00389
00390
00391 if (fDataPtr.fullsize() < FaspBlockSize) {
00392 EOUT(("No data for new block"));
00393 }
00394
00395 }
00396
00397
00398 void fasp::Transport::FlushBuffer(bool force)
00399 {
00400 dabc::Buffer newbuf;
00401 unsigned newbufused(0);
00402 unsigned newsyncid(DummySync);
00403
00404 if (fRawSync != DummySync) {
00405
00406 if (fHdrPtr.fullsize() == fTgtBuf.GetTotalSize()) {
00407
00408 if (!force) return;
00409 DOUT0(("Very special case - closing event when events starts from buffer begin and must be flushed"));
00410 CloseCurrentEvent();
00411 } else {
00412
00413
00414
00415
00416 newbuf = fPool.TakeBufferReq(this, fBufferSize);
00417 dabc::Pointer new_ptr = newbuf.GetPointer();
00418 newbufused = fHdrPtr.distance_to(fDataPtr);
00419 new_ptr.copyfrom(fHdrPtr, newbufused);
00420
00421 newsyncid = fRawSync;
00422
00423 fRawSync = DummySync;
00424 }
00425 }
00426
00427 if (fRawSync != DummySync) {
00428 EOUT(("Something went wrong"));
00429 return;
00430 }
00431
00432 if (!fTgtBuf.null() && (fHdrPtr.fullsize() < fTgtBuf.GetTotalSize())) {
00433 fTgtBuf.SetTotalSize(fTgtBuf.GetTotalSize() - fHdrPtr.fullsize());
00434 fTgtBuf.SetTypeId(mbs::mbt_MbsEvents);
00435
00436
00437
00438
00439
00440
00441 bool dropbuffer = false;
00442
00443 dabc::Buffer temp = fTgtBuf.HandOver();
00444 {
00445 dabc::LockGuard lock(fQueueMutex);
00446 if (fQueue.Full()) {
00447 DOUT0(("Output queue is full - drop data"));
00448 dropbuffer = true;
00449 } else {
00450 fQueue.Push(temp);
00451 }
00452 }
00453
00454 if (dropbuffer) temp.Release();
00455 else FirePortInput();
00456 }
00457
00458 fLastFlushTime.GetNow();
00459
00460 if (newbuf.null())
00461 fTgtBuf = fPool.TakeBufferReq(this, fBufferSize);
00462 else
00463 fTgtBuf = newbuf.HandOver();
00464
00465 fHdrPtr = fTgtBuf.GetPointer();
00466 fDataPtr = fHdrPtr;
00467
00468 if ((newbufused>0) && (newsyncid!=DummySync)) {
00469 fDataPtr.shift(newbufused);
00470 fRawSync = newsyncid;
00471 } else {
00472 fDataPtr.shift(sizeof(mbs::EventHeader) + sizeof(mbs::SubeventHeader));
00473 }
00474 }
00475
00476 bool fasp::Transport::Recv(dabc::Buffer& buf)
00477 {
00478 {
00479 dabc::LockGuard lock(fQueueMutex);
00480 if (fQueue.Size() <= 0) return false;
00481
00482 fQueue.PopBuffer(buf);
00483 }
00484 return !buf.null();
00485 }
00486
00487 unsigned fasp::Transport::RecvQueueSize() const
00488 {
00489 dabc::LockGuard guard(fQueueMutex);
00490 DOUT5(("%s Transport::RecvQueueSize()=%d",GetName(),fQueue.Size()));
00491 return fQueue.Size();
00492 }
00493
00494 dabc::Buffer& fasp::Transport::RecvBuffer(unsigned indx) const
00495 {
00496 dabc::LockGuard lock(fQueueMutex);
00497 DOUT5(("%s Transport::RecvBuffer %d ",GetName(),indx));
00498 return fQueue.ItemRef(indx);
00499 }
00500
00501 bool fasp::Transport::ProcessPoolRequest()
00502 {
00503 return true;
00504 }
00505
00506 void fasp::Transport::StartTransport()
00507 {
00508 DOUT3(("Starting FASP transport"));
00509
00510 if (!IsRawSocket()) return;
00511
00512 SetDoingInput(true);
00513
00514 fLastFlushTime.GetNow();
00515
00516 fTgtBuf = fPool.TakeBufferReq(this, fBufferSize);
00517 fHdrPtr = fTgtBuf.GetPointer();
00518 fDataPtr = fHdrPtr; fDataPtr.shift(sizeof(mbs::EventHeader) + sizeof(mbs::SubeventHeader));
00519
00520 fRawSync = DummySync;
00521
00522 DOUT0(("Starting transport - we have hdrptr size = %u", fHdrPtr.fullsize()));
00523
00524
00525 if(!ActivateTimeout(fFlushTimeout > 0. ? fFlushTimeout : 3.))
00526 EOUT(("could not activate timeout in FASP transport"));
00527 }
00528
00529 void fasp::Transport::StopTransport()
00530 {
00531 DOUT0(("Stopping FASP transport. New packages: %d, Duplicating: %d", rcvCounterNew, rcvCounterDubl));
00532 }
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552 extern "C" void RunFaspTest()
00553 {
00554 dabc::mgr.CreateMemoryPool("Pool", 50000, 50);
00555
00556 dabc::mgr.CreateModule("mbs::TransmitterModule", "FaspTest", "ModuleThrd");
00557
00558 dabc::mgr.CreateTransport("FaspTest/Input", "fasp::Transport", "TransportThrd");
00559
00560
00561
00562 dabc::mgr.CreateTransport("FaspTest/Output", mbs::typeServerTransport, "MbsTransport");
00563
00564
00565 }