• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

plugin/src/UdpTransport.cxx (r4864/r4135)

Go to the documentation of this file.
00001 /********************************************************************
00002  * The Data Acquisition Backbone Core (DABC)
00003  ********************************************************************
00004  * Copyright (C) 2009-
00005  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
00006  * Planckstr. 1
00007  * 64291 Darmstadt
00008  * Germany
00009  * Contact:  http://dabc.gsi.de
00010  ********************************************************************
00011  * This software can be used under the GPL license agreements as stated
00012  * in LICENSE.txt file which is part of the distribution.
00013  ********************************************************************/
00014 
00015 #include "roc/UdpTransport.h"
00016 
00017 #include "roc/UdpDevice.h"
00018 #include "roc/Commands.h"
00019 #include "roc/defines_roc.h"
00020 #include "roc/defines_udp.h"
00021 #include "roc/Message.h"
00022 #include "roc/Iterator.h"
00023 
00024 #include "dabc/timing.h"
00025 #include "dabc/Port.h"
00026 #include "dabc/version.h"
00027 
00028 #include "mbs/MbsTypeDefs.h"
00029 
00030 #include <math.h>
00031 
00032 const unsigned UDP_DATA_SIZE = ((sizeof(roc::UdpDataPacketFull) - sizeof(roc::UdpDataPacket)) / 6) * 6;
00033 
00034 roc::UdpDataSocket::UdpDataSocket(UdpDevice* dev, dabc::Reference portref, dabc::Command cmd, int fd, int fmt) :
00035    dabc::SocketWorker(fd),
00036    dabc::Transport(portref.Ref()),
00037    dabc::MemoryPoolRequester(),
00038    fDev(dev),
00039    fQueueMutex(),
00040    fQueue(((dabc::Port*) portref())->InputQueueCapacity()),
00041    fReadyBuffers(0),
00042    fTransportBuffers(0),
00043    fTgtBuf(0),
00044    fResend(),
00045    fFormat(fmt)
00046 {
00047    // we will react on all input packets
00048    SetDoingInput(true);
00049 
00050    fTgtBufIndx = 0;
00051    fTgtShift = 0;
00052    fTgtZeroShift = 0;
00053    fTgtPtr = 0;
00054    fTgtCheckGap = false;
00055 
00056    fTgtNextId = 0;
00057    fTgtTailId = 0;
00058 
00059    fBufferSize = 0;
00060    fTransferWindow = 40;
00061 
00062    rocNumber = 0;
00063    daqState = daqInit;
00064    daqCheckStop = false;
00065 
00066    fTotalRecvPacket = 0;
00067    fTotalResubmPacket = 0;
00068 
00069    lastRequestTm.Reset();
00070 
00071    fFlushTimeout = .1;
00072    fLastDelivery.Reset();
00073 
00074    fMbsHeader = false;
00075 
00076    dabc::Port* port = (dabc::Port*) portref();
00077 
00078    fBufferSize = port->Cfg(dabc::xmlBufferSize, cmd).AsInt(16384);
00079    fTransferWindow = port->Cfg(roc::xmlTransferWindow, cmd).AsInt(fTransferWindow);
00080    fFlushTimeout = port->Cfg(dabc::xmlFlushTimeout, cmd).AsDouble(fFlushTimeout);
00081    fMbsHeader = port->Cfg("WithMbsHeader", cmd).AsBool(false);
00082    if (fMbsHeader) fTgtZeroShift = sizeof(mbs::EventHeader) + sizeof(mbs::SubeventHeader);
00083 
00084    fBufCounter = 0;
00085 
00086    // DOUT0(("fFlushTimeout = %5.1f %s", fFlushTimeout, dabc::xmlFlushTimeout));
00087 
00088    fPool = port->GetMemoryPool();
00089    fReadyBuffers = 0;
00090 
00091    fTgtCheckGap = false;
00092 
00093    // one cannot have too much resend requests
00094    fResend.Allocate(port->InputQueueCapacity() * fBufferSize / UDP_DATA_SIZE);
00095 
00096    DOUT2(("roc::UdpDataSocket:: Pool = %p buffer size = %u", fPool(), fBufferSize));
00097 
00098 }
00099 
00100 roc::UdpDataSocket::~UdpDataSocket()
00101 {
00102    ResetDaq();
00103 
00104    if (fDev) fDev->fDataCh = 0;
00105    fDev = 0;
00106 }
00107 
00108 void roc::UdpDataSocket::ProcessEvent(const dabc::EventId& evnt)
00109 {
00110    switch (evnt.GetCode()) {
00111       case evntSocketRead: {
00112 
00113          // this is required for DABC 2.0 to again enable read event generation
00114          SetDoingInput(true);
00115 
00116          void *tgt = fTgtPtr;
00117          if (tgt==0) tgt = fTempBuf;
00118 
00119          ssize_t len = DoRecvBufferHdr(&fTgtHdr, sizeof(UdpDataPacket),
00120                                         tgt, UDP_DATA_SIZE);
00121          if (len>0) {
00122             fTotalRecvPacket++;
00123 //            DOUT0(("READ Packet %d len %d", ntohl(fTgtHdr.pktid), len));
00124             AddDataPacket(len, tgt);
00125          }
00126 
00127          break;
00128       }
00129 
00130       case evntStartTransport: {
00131 
00132          // no need to do anything when daq is already running
00133          if (daqActive()) {
00134             DOUT1(("Daq active - ignore start transport?"));
00135             return;
00136          }
00137 
00138          ResetDaq();
00139 
00140          daqState = daqStarting;
00141 
00142          fStartList.clear();
00143 
00144          fStartList.addPut(ROC_NX_FIFO_RESET, 1);
00145          fStartList.addPut(ROC_NX_FIFO_RESET, 0);
00146          fStartList.addPut(ROC_ETH_START_DAQ, 1);
00147 
00148          CmdNOper cmd(&fStartList);
00149 
00150          if (fDev) fDev->Submit(Assign(cmd));
00151          break;
00152       }
00153 
00154       case evntStopTransport: {
00155          // this is situation, when normal module stops its transports
00156          // here we do put(stop_daq) ourself and just waiting confirmation that command completed
00157          // there is no yet way so suspend daq from module
00158 
00159          if (!daqActive()) {
00160             DOUT1(("Daq is not active - no need to stop transport"));
00161             return;
00162          }
00163 
00164          daqState = daqStopping;
00165 
00166          fTgtBuf = 0; // forget about buffer, it is owned by the queue
00167          fTgtBufIndx = 0;
00168          fTgtShift = 0;
00169          fTgtPtr = 0;
00170 
00171          fLastDelivery.Reset();
00172 
00173          if (fDev) fDev->Submit(Assign(CmdPut(ROC_ETH_STOP_DAQ, 1)));
00174          break;
00175       }
00176 
00177       case evntConfirmCmd: {
00178          if (evnt.GetArg() == 0) {
00179             ResetDaq();
00180             daqState = daqFails;
00181             ActivateTimeout(-1.);
00182          } else
00183          if (daqState == daqStarting) {
00184             daqState = daqRuns;
00185             fLastDelivery.GetNow();
00186             AddBuffersToQueue();
00187             ActivateTimeout(0.0001);
00188          } else
00189          if (daqState == daqStopping)  {
00190             DOUT3(("STOP DAQ command is ready - go in normal state"));
00191             ResetDaq();
00192          }
00193          break;
00194       }
00195 
00196       case evntFillBuffer:
00197          AddBuffersToQueue();
00198          break;
00199 
00200       default:
00201          dabc::SocketWorker::ProcessEvent(evnt);
00202          break;
00203    }
00204 }
00205 
00206 bool roc::UdpDataSocket::ReplyCommand(dabc::Command cmd)
00207 {
00208    int res = cmd.GetResult();
00209 
00210    DOUT3(("roc::UdpDataSocket::ReplyCommand %s res = %s ", cmd.GetName(), DBOOL(res)));
00211 
00212    FireEvent(evntConfirmCmd, res==dabc::cmd_true ? 1 : 0);
00213 
00214    return true;
00215 }
00216 
00217 bool roc::UdpDataSocket::Recv(dabc::Buffer& buf)
00218 {
00219    {
00220       dabc::LockGuard lock(fQueueMutex);
00221       if (fReadyBuffers==0) return false;
00222       if (fQueue.Size()<=0) return false;
00223 
00224    #if DABC_VERSION_CODE >= DABC_VERSION(1,9,2)
00225       fQueue.PopBuffer(buf);
00226     #else
00227       buf << fQueue.Pop();
00228     #endif
00229 
00230       fReadyBuffers--;
00231    }
00232    FireEvent(evntFillBuffer);
00233 
00234    return !buf.null();
00235 }
00236 
00237 unsigned  roc::UdpDataSocket::RecvQueueSize() const
00238 {
00239    dabc::LockGuard guard(fQueueMutex);
00240 
00241    return fReadyBuffers;
00242 }
00243 
00244 dabc::Buffer& roc::UdpDataSocket::RecvBuffer(unsigned indx) const
00245 {
00246    dabc::LockGuard lock(fQueueMutex);
00247 
00248    if (indx>=fReadyBuffers)
00249       throw dabc::Exception(dabc::format("Wrong index %u ready %u in roc::UdpDataSocket::RecvBuffer", indx, fReadyBuffers).c_str());
00250 
00251    return fQueue.ItemRef(indx);
00252 }
00253 
00254 bool roc::UdpDataSocket::ProcessPoolRequest()
00255 {
00256    FireEvent(evntFillBuffer);
00257    return true;
00258 }
00259 
00260 void roc::UdpDataSocket::StartTransport()
00261 {
00262    DOUT3(("Starting UDP transport "));
00263 
00264    FireEvent(evntStartTransport);
00265 }
00266 
00267 void roc::UdpDataSocket::StopTransport()
00268 {
00269    DOUT3(("Stopping udp transport %ld", fTotalRecvPacket));
00270 
00271    FireEvent(evntStopTransport);
00272 }
00273 
00274 void roc::UdpDataSocket::AddBuffersToQueue(bool checkanyway)
00275 {
00276    unsigned cnt = 0;
00277 
00278    {
00279       dabc::LockGuard lock(fQueueMutex);
00280       cnt = fQueue.Capacity() - fQueue.Size();
00281    }
00282 
00283    bool isanynew = false;
00284 
00285    while (cnt) {
00286       dabc::Buffer buf = fPool.TakeBufferReq(this, fBufferSize);
00287       if (buf.null()) break;
00288 
00289       fTransportBuffers++;
00290 
00291       isanynew = true;
00292       cnt--;
00293       buf.SetTypeId(fMbsHeader ? mbs::mbt_MbsEvents : roc::rbt_RawRocData + fFormat);
00294       dabc::LockGuard lock(fQueueMutex);
00295       fQueue.Push(buf);
00296 
00297       if (fTgtBuf==0) {
00298          fTgtBuf = &fQueue.Last();
00299          fTgtShift = fTgtZeroShift;
00300          fTgtPtr = (char*) fTgtBuf->SegmentPtr() + fTgtShift;
00301       }
00302 
00303    }
00304 
00305    if (isanynew || checkanyway) CheckNextRequest();
00306 }
00307 
00308 bool roc::UdpDataSocket::CheckNextRequest(bool check_retrans)
00309 {
00310    if (!daqActive()) return false;
00311 
00312    UdpDataRequestFull req;
00313    dabc::TimeStamp curr_tm = dabc::Now();
00314 
00315    // send request each 0.2 sec,
00316    // if there is no replies on last request send it much faster - every 0.01 sec.
00317    bool dosend = lastRequestTm.Expired(curr_tm, lastRequestSeen ? 0.2 : 0.01);
00318 
00319    int can_send = 0;
00320    if (fTgtBuf) {
00321       can_send += (fBufferSize - fTgtShift) / UDP_DATA_SIZE;
00322       can_send += (fTransportBuffers - fTgtBufIndx - 1) * (fBufferSize / UDP_DATA_SIZE);
00323    }
00324 
00325    if (can_send > (int) fTransferWindow) can_send = fTransferWindow;
00326 
00327    if (fResend.Size() >= fTransferWindow) can_send = 0; else
00328    if (can_send + fResend.Size() > fTransferWindow)
00329       can_send = fTransferWindow - fResend.Size();
00330 
00331    req.frontpktid = fTgtNextId + can_send;
00332 
00333    // if newly calculated front id bigger than last
00334    if ((req.frontpktid - lastSendFrontId) < 0x80000000) {
00335 
00336      if ((req.frontpktid - lastSendFrontId) >= fTransferWindow / 3) dosend = true;
00337 
00338    } else
00339       req.frontpktid = lastSendFrontId;
00340 
00341    req.tailpktid = fTgtTailId;
00342 
00343    req.numresend = 0;
00344 
00345    if (can_send==0) dosend = false;
00346 
00347    if (!check_retrans && !dosend) return false;
00348 
00349    for (unsigned n=0; n<fResend.Size(); n++) {
00350       ResendInfo* entry = fResend.ItemPtr(n);
00351 
00352       if ((entry->numtry>0) && !entry->lasttm.Expired(curr_tm, 0.1)) continue;
00353 
00354       entry->lasttm = curr_tm;
00355       entry->numtry++;
00356       if (entry->numtry < 8) {
00357          req.resend[req.numresend++] = entry->pktid;
00358 
00359          dosend = true;
00360 
00361          if (req.numresend >= sizeof(req.resend) / 4) {
00362             EOUT(("Number of resends more than one can pack in the retransmit packet"));
00363             break;
00364          }
00365 
00366       } else {
00367          EOUT(("Roc:%u Drop pkt %u\n", rocNumber, entry->pktid));
00368 
00369          fTgtCheckGap = true;
00370 
00371          memset(entry->ptr, 0, UDP_DATA_SIZE);
00372 
00373          roc::Message msg;
00374 
00375          msg.setRocNumber(rocNumber);
00376          msg.setMessageType(roc::MSG_SYS);
00377          msg.setSysMesType(roc::SYSMSG_PACKETLOST);
00378          msg.copyto(entry->ptr, fFormat);
00379 
00380          fResend.RemoveItem(n);
00381          n--;
00382       }
00383 
00384    }
00385 
00386    if (!dosend) return false;
00387 
00388    uint32_t pkt_size = sizeof(UdpDataRequest) + req.numresend * sizeof(uint32_t);
00389 
00390    // make request always 4 byte aligned
00391    while ((pkt_size < MAX_UDP_PAYLOAD) &&
00392           (pkt_size + UDP_PAYLOAD_OFFSET) % 4) pkt_size++;
00393 
00394    lastRequestTm = curr_tm;
00395    lastRequestSeen = false;
00396    lastSendFrontId = req.frontpktid;
00397    lastRequestId++;
00398 
00399 //   DOUT0(("Send request id:%u  Range: 0x%04x - 0x%04x nresend:%d resend[0] = 0x%04x tgtbuf %p ptr %p tgtsize %u",
00400 //         lastRequestId, req.tailpktid, req.frontpktid, req.numresend,
00401 //         req.numresend > 0 ? req.resend[0] : 0, fTgtBuf, fTgtPtr, fTransportBuffers));
00402 
00403    req.password = htonl(ROC_PASSWORD);
00404    req.reqpktid = htonl(lastRequestId);
00405    req.frontpktid = htonl(req.frontpktid);
00406    req.tailpktid = htonl(req.tailpktid);
00407    for (uint32_t n=0; n < req.numresend; n++)
00408       req.resend[n] = htonl(req.resend[n]);
00409    req.numresend = htonl(req.numresend);
00410 
00411    DoSendBuffer(&req, pkt_size);
00412 
00413    if (fDev && fDev->fCtrlCh) fDev->fCtrlCh->SetLastSendTime();
00414 
00415    return true;
00416 }
00417 
00418 double roc::UdpDataSocket::ProcessTimeout(double)
00419 {
00420    if (!daqActive()) return -1;
00421 
00422    if (fTgtBuf == 0)
00423       AddBuffersToQueue(true);
00424    else
00425       CheckNextRequest();
00426 
00427    // check if we should flush current buffer
00428    if (!fLastDelivery.null() && fLastDelivery.Expired(fFlushTimeout)) {
00429 //          DOUT0(("Doing flush timeout = %3.1f dist = %5.1f last = %8.6f", fFlushTimeout, fLastDelivery.SpentTillNow(), fLastDelivery.AsDouble()));
00430           CheckReadyBuffers(true);
00431    }
00432 
00433 //   DOUT0(("CALIBR tm1:%f tm2:%f %s", fLastCalibrAction, fLastCalibrStart, DBOOL(dabc::IsNullTime(fLastCalibrStart))));
00434 
00435    return 0.01;
00436 }
00437 
00438 
00439 void roc::UdpDataSocket::ResetDaq()
00440 {
00441    daqCheckStop = false;
00442    daqState = daqInit;
00443 
00444    fTransportBuffers = 0;
00445 
00446    fTgtBuf = 0;
00447    fTgtBufIndx = 0;
00448    fTgtShift = 0;
00449    fTgtPtr = 0;
00450 
00451    fTgtNextId = 0;
00452    fTgtTailId = 0;
00453    fTgtCheckGap = false;
00454 
00455    lastRequestId = 0;
00456    lastSendFrontId = 0;
00457    lastRequestTm.Reset();
00458    lastRequestSeen = true;
00459 
00460    fResend.Reset();
00461 
00462    dabc::LockGuard lock(fQueueMutex);
00463    fQueue.Cleanup();
00464    fReadyBuffers = 0;
00465 
00466    fLastDelivery.Reset();
00467 }
00468 
00469 void roc::UdpDataSocket::AddDataPacket(int len, void* tgt)
00470 {
00471    uint32_t src_pktid = ntohl(fTgtHdr.pktid);
00472 
00473    if (tgt==0) {
00474       DOUT0(("Packet 0x%04x has no place buf %p bufindx %u queue %u ready %u", src_pktid, fTgtBuf, fTgtBufIndx, fQueue.Size(), fReadyBuffers));
00475       for (unsigned n=0;n < fResend.Size(); n++)
00476          DOUT0(("   Need resend 0x%04x retry %d", fResend.ItemPtr(n)->pktid, fResend.ItemPtr(n)->numtry));
00477 
00478       CheckNextRequest();
00479 
00480       return;
00481    }
00482 
00483    if (len <= (int) sizeof(UdpDataPacket)) {
00484       EOUT(("Too few data received %d", len));
00485       return;
00486    }
00487 
00488    if (ntohl(fTgtHdr.lastreqid) == lastRequestId) lastRequestSeen = true;
00489 
00490    int nummsgs = ntohl(fTgtHdr.nummsg);
00491 
00492    uint32_t gap = src_pktid - fTgtNextId;
00493 
00494    int data_len = nummsgs * 6;
00495 
00496 //   if (gap!=0)
00497 //      DOUT0(("Packet id:0x%04x Head:0x%04x NumMsgs:%d gap:%u", src_pktid, fTgtNextId, nummsgs, gap));
00498 
00499    bool packetaccepted = false;
00500    bool doflush = false;
00501 
00502    if ((fTgtPtr==tgt) && (gap < fBufferSize / UDP_DATA_SIZE * fTransportBuffers)) {
00503 
00504       if (gap>0) {
00505          // some packets are lost on the way, move pointer forward and
00506          // remember packets which should be resubmit
00507          void* src = fTgtPtr;
00508 
00509          while (fTgtNextId != src_pktid) {
00510 
00511             ResendInfo* info = fResend.PushEmpty();
00512 
00513 //            DOUT0(("!!!! Lost packet 0x%04x", fTgtNextId));
00514 
00515             info->pktid = fTgtNextId;
00516             info->lasttm.Reset();
00517             info->numtry = 0;
00518             info->buf = fTgtBuf;
00519             info->bufindx = fTgtBufIndx;
00520             info->ptr = fTgtPtr;
00521 
00522             fTgtNextId++;
00523             fTgtShift += UDP_DATA_SIZE;
00524             fTgtPtr += UDP_DATA_SIZE;
00525 
00526             if (fTgtShift + UDP_DATA_SIZE > fTgtBuf->GetTotalSize()) {
00527 
00528                FinishTgtBuffer();
00529 
00530                if (fTgtBufIndx >= fTransportBuffers) {
00531                   EOUT(("One get packet out of the available buffer spaces gap = %u indx %u numbufs %u !!!!", gap, fTgtBufIndx, fTransportBuffers));
00532                   return;
00533                }
00534 
00535                {
00536                   dabc::LockGuard lock(fQueueMutex);
00537                   fTgtBuf = &fQueue.ItemRef(fReadyBuffers + fTgtBufIndx);
00538                }
00539 
00540                fTgtShift = fTgtZeroShift;
00541                fTgtPtr = (char*) fTgtBuf->SegmentPtr() + fTgtShift;
00542             }
00543          }
00544 
00545          // copy data which was received into the wrong place of the buffers
00546          memcpy(fTgtPtr, src, data_len);
00547 
00548 //         DOUT1(("Copy pkt 0x%04x to buffer %p shift %u", src_pktid, fTgtBuf, fTgtShift));
00549       }
00550 
00551       // from here just normal situation when next packet is arrived
00552 
00553       if (fResend.Size()==0) fTgtTailId = fTgtNextId;
00554 
00555       fTgtNextId++;
00556 
00557       fTgtShift += data_len;
00558       fTgtPtr += data_len;
00559 
00560       if (fTgtBuf->GetTotalSize() < fTgtShift + UDP_DATA_SIZE) {
00561          FinishTgtBuffer();
00562       }
00563 
00564       packetaccepted = true;
00565 
00566    } else {
00567       // this is retransmitted packet, may be received in temporary place
00568       for (unsigned n=0; n<fResend.Size(); n++) {
00569          ResendInfo* entry = fResend.ItemPtr(n);
00570          if (entry->pktid != src_pktid) continue;
00571 
00572          DOUT3(("Get retransmitted packet 0x%04x", src_pktid));
00573 
00574          fTotalResubmPacket++;
00575 
00576          memcpy(entry->ptr, tgt, data_len);
00577          if (data_len < (int) UDP_DATA_SIZE) {
00578             void* restptr = (char*) entry->ptr + data_len;
00579             memset(restptr, 0, UDP_DATA_SIZE - data_len);
00580             fTgtCheckGap = true;
00581          }
00582 
00583          fResend.RemoveItem(n);
00584 
00585          // if all packets retransmitted, one can allow to skip buffers on roc,
00586          // beside next packet, which is required
00587          if (fResend.Size()==0) fTgtTailId = fTgtNextId - 1;
00588 
00589          packetaccepted = true;
00590 
00591          break;
00592       }
00593 
00594    }
00595 
00596    if (!packetaccepted) {
00597       DOUT3(("ROC:%u Packet 0x%04x was not accepted - FLUSH???  ready = %u transport = %u tgtindx = %u buf %p", fDev->rocNumber(), src_pktid, fReadyBuffers, fTransportBuffers, fTgtBufIndx, fTgtBuf));
00598 //      dabc::SetDebugLevel(1);
00599    }
00600 
00601    // check incoming data for stop/start messages
00602    if (packetaccepted && (data_len>0) && (tgt!=0) && daqCheckStop) {
00603 //      DOUT0(("Search special kind of message !!!"));
00604 
00605       Iterator iter(fFormat);
00606       iter.assign(tgt, data_len);
00607 
00608       while (iter.next())
00609          if (iter.msg().isStopDaqMsg()) {
00610             DOUT2(("Find STOP_DAQ message"));
00611             doflush = true;
00612          }
00613    }
00614 
00615    CheckReadyBuffers(doflush);
00616 }
00617 
00618 void roc::UdpDataSocket::CompressBuffer(dabc::Buffer& buf)
00619 {
00620    if (buf.null()) return;
00621 
00622    Iterator iter(fFormat);
00623    if (!iter.assign(buf.SegmentPtr(), buf.SegmentSize())) return;
00624 
00625    uint8_t* tgt = (uint8_t*) buf.SegmentPtr();
00626    uint8_t* src = tgt;
00627 
00628    uint32_t rawsize = roc::Message::RawSize(fFormat);
00629    dabc::BufferSize_t tgtsize = 0;
00630 
00631    while (iter.next()) {
00632       if (iter.msg().isNopMsg()) {
00633          src += rawsize;
00634       } else {
00635          if (tgt!=src) memcpy(tgt, src, rawsize);
00636          src += rawsize;
00637          tgt += rawsize;
00638          tgtsize += rawsize;
00639       }
00640    }
00641 
00642    if (tgtsize==0)
00643       EOUT(("Zero size after compress !!!"));
00644 
00645    buf.SetTotalSize(tgtsize);
00646 }
00647 
00648 void roc::UdpDataSocket::FinishTgtBuffer()
00649 {
00650    if (fTgtBuf==0) {
00651       EOUT(("Internal failure!!!"));
00652       exit(765);
00653    }
00654 
00655    fTgtBuf->SetTotalSize(fTgtShift);
00656 
00657    if (fMbsHeader) {
00658 
00659       // add MBS header when specified
00660 
00661       mbs::EventHeader* evhdr = (mbs::EventHeader*) fTgtBuf->SegmentPtr();
00662       evhdr->Init(fBufCounter++);
00663       evhdr->SetFullSize(fTgtShift);
00664 
00665       mbs::SubeventHeader* subhdr = evhdr->SubEvents();
00666       subhdr->Init(rocNumber, roc::proc_RawData, fFormat);
00667       subhdr->SetFullSize(fTgtShift - sizeof(mbs::EventHeader));
00668    }
00669 
00670    fTgtPtr = 0;
00671    fTgtShift = 0;
00672    fTgtBuf = 0;
00673    fTgtBufIndx++;
00674 }
00675 
00676 
00677 void roc::UdpDataSocket::CheckReadyBuffers(bool doflush)
00678 {
00679 //   if (doflush) DOUT0(("doing flush %d", rocNumber));
00680 
00681    if (doflush && (fTgtBuf!=0) && (fTgtShift>fTgtZeroShift) && (fResend.Size()==0)) {
00682       DOUT2(("Flush buffer when recv of size %u", fTgtShift));
00683       FinishTgtBuffer();
00684    }
00685 
00686    if (fTgtBufIndx>0) {
00687       unsigned minindx = fTgtBufIndx;
00688 
00689       for (unsigned n=0; n<fResend.Size(); n++) {
00690          unsigned indx = fResend.ItemPtr(n)->bufindx;
00691          if (indx < minindx) minindx = indx;
00692       }
00693 
00694 //      DOUT0(("CheckReadyBuffers minindx = %u resend = %u", minindx, fResend.Size()));
00695 
00696       if (minindx>0) {
00697 
00698          fTransportBuffers -= minindx;
00699          fTgtBufIndx -= minindx;
00700          for (unsigned n=0; n<fResend.Size(); n++)
00701             fResend.ItemPtr(n)->bufindx -= minindx;
00702 
00703          {
00704             dabc::LockGuard lock(fQueueMutex);
00705 
00706             // check all buffers on gaps, if necessary
00707             if (fTgtCheckGap)
00708                for (unsigned n=0;n<minindx;n++)
00709                   CompressBuffer(fQueue.ItemRef(fReadyBuffers + n));
00710 
00711             fReadyBuffers += minindx;
00712          }
00713 
00714          while (minindx--) FirePortInput();
00715 
00716 //         DOUT0(("!!!!!!!!!!!!!!!! Do outptut !!!!!!!!!!!!!!!"));
00717 
00718          fLastDelivery.GetNow();
00719       }
00720    }
00721 
00722    if ((fTgtBuf==0) && (fTgtBufIndx<fTransportBuffers)) {
00723       dabc::LockGuard lock(fQueueMutex);
00724       fTgtBuf = &fQueue.ItemRef(fReadyBuffers + fTgtBufIndx);
00725       fTgtShift = fTgtZeroShift;
00726       fTgtPtr = (char*) fTgtBuf->SegmentPtr() + fTgtShift;
00727       // one can disable checks once we have no data in queues at all
00728       if ((fTgtBufIndx==0) && (fResend.Size()==0)) {
00729 //         if (fTgtCheckGap) DOUT0(("!!! DISABLE COMPRESS !!!"));
00730          fTgtCheckGap = false;
00731       }
00732    }
00733 
00734    if (fTgtBuf == 0)
00735       AddBuffersToQueue();
00736    else
00737       CheckNextRequest();
00738 }
00739 
00740 bool roc::UdpDataSocket::prepareForSuspend()
00741 {
00742    if (!daqActive()) return false;
00743    daqCheckStop = true;
00744    return true;
00745 }
00746 
00747 int roc::UdpDataSocket::GetParameter(const char* name)
00748 {
00749    if ((strcmp(name, roc::xmlRocNumber)==0) && fDev) return fDev->rocNumber();
00750    if (strcmp(name, roc::xmlMsgFormat)==0) return fFormat;
00751    if (strcmp(name, roc::xmlTransportKind)==0) return roc::kind_UDP;
00752 
00753    return dabc::Transport::GetParameter(name);
00754 }

Generated on Tue Dec 10 2013 04:52:23 for ROCsoft by  doxygen 1.7.1