• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

plugin/roc/UdpTransport.h (r4864/r4135)

Go to the documentation of this file.
00001 /********************************************************************
00002  * The Data Acquisition Backbone Core (DABC)
00003  ********************************************************************
00004  * Copyright (C) 2009-
00005  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
00006  * Planckstr. 1
00007  * 64291 Darmstadt
00008  * Germany
00009  * Contact:  http://dabc.gsi.de
00010  ********************************************************************
00011  * This software can be used under the GPL license agreements as stated
00012  * in LICENSE.txt file which is part of the distribution.
00013  ********************************************************************/
00014 
00015 #ifndef ROC_UDPTRANSPORT_H
00016 #define ROC_UDPTRANSPORT_H
00017 
00018 #ifndef DABC_SocketThread
00019 #include "dabc/SocketThread.h"
00020 #endif
00021 
00022 #ifndef DABC_Transport
00023 #include "dabc/Transport.h"
00024 #endif
00025 
00026 #ifndef DABC_BuffersQueue
00027 #include "dabc/BuffersQueue.h"
00028 #endif
00029 
00030 #ifndef DABC_MemoryPool
00031 #include "dabc/MemoryPool.h"
00032 #endif
00033 
00034 #ifndef ROC_UdpBoard
00035 #include "roc/UdpBoard.h"
00036 #endif
00037 
00038 #include <list>
00039 
00040 namespace roc {
00041 
00042    class UdpDevice;
00043 
00044    // TODO: in DABC2 data socket should be inherited from dabc::SocketWorker
00045    //       or one should reorganize event loop to inherit from dabc::SocketIOWorker
00046 
00047    class UdpDataSocket : public dabc::SocketWorker,
00048                          public dabc::Transport,
00049                          protected dabc::MemoryPoolRequester {
00050       friend class UdpDevice;
00051 
00052       DABC_TRANSPORT(dabc::SocketWorker)
00053 
00054       protected:
00055          enum EUdpEvents { evntStartTransport = evntSocketLast + 1,
00056                            evntStopTransport,
00057                            evntConfirmCmd,
00058                            evntFillBuffer };
00059 
00060          enum EDaqState { daqInit,        // initial state
00061                           daqStarting,    // daq runs, but no any start daq message was seen
00062                           daqRuns,        // normal working mode
00063                           daqStopping,    // we did poke(stop_daq) and just waiting that command is performed
00064                           daqFails        // error state
00065                         };
00066 
00067          struct ResendInfo {
00068 
00069             uint32_t        pktid;    // id of packet
00070             dabc::TimeStamp lasttm;   // when request was send last time
00071             int             numtry;   // how many requests was send
00072             dabc::Buffer*   buf;      // buffer pointer
00073             unsigned        bufindx;  // index of buffer in the queue
00074             char*           ptr;      // target location
00075 
00076             ResendInfo(unsigned id = 0) :
00077                pktid(id),
00078                lasttm(),
00079                numtry(0),
00080                buf(0),
00081                bufindx(0),
00082                ptr(0) {}
00083 
00084             ResendInfo(const ResendInfo& src) :
00085                pktid(src.pktid),
00086                lasttm(src.lasttm),
00087                numtry(src.numtry),
00088                buf(src.buf),
00089                bufindx(src.bufindx),
00090                ptr(src.ptr) {}
00091 
00092          };
00093 
00094 
00095          // FIXME: one should not use pointers, only while device and transport are in same thread, one can do this
00096          UdpDevice*         fDev;
00097          dabc::Mutex        fQueueMutex;
00098          dabc::BuffersQueue fQueue;
00099 
00100          unsigned           fReadyBuffers; // how many buffers in queue can be provided to user
00101          unsigned           fTransportBuffers; // how many buffers can be used for transport
00102 
00103          dabc::Buffer*      fTgtBuf;   // pointer on buffer, where next portion can be received, use pointer while it is buffer from the queue
00104          unsigned           fTgtBufIndx; // queue index of target buffer - use fQueue.Item(fReadyBuffers + fTgtBufIndx)
00105          unsigned           fTgtShift;     // current shift in the buffer
00106          unsigned           fTgtZeroShift; // initial shift in the buffer
00107          UdpDataPacket      fTgtHdr;   // place where data header should be received
00108          char*              fTgtPtr;   // location where next data should be received
00109          uint32_t           fTgtNextId; // expected id of next packet
00110          uint32_t           fTgtTailId; // first id of packet which can not be dropped on ROC side
00111          bool               fTgtCheckGap;  // true if one should check gaps
00112 
00113          char               fTempBuf[2000]; // buffer to recv packet when no regular place is available
00114 
00115          dabc::Queue<ResendInfo>  fResend;
00116 
00117          EDaqState          daqState;
00118          bool               daqCheckStop;
00119 
00120          UdpDataPacketFull  fRecvBuf;
00121 
00122          unsigned           fBufferSize;
00123          unsigned           fTransferWindow;
00124 
00125 
00126          dabc::MemoryPoolRef fPool;  // reference on the pool, reference should help us to preserve pool as long as we are using it
00127 
00128          dabc::TimeStamp    lastRequestTm;   // time when last request was send
00129          bool               lastRequestSeen; // indicate if we seen already reply on last request
00130          uint32_t           lastRequestId;   // last request id
00131          uint32_t           lastSendFrontId; // last send id of front packet
00132 
00133          unsigned           rocNumber;
00134 
00135          uint64_t           fTotalRecvPacket;
00136          uint64_t           fTotalResubmPacket;
00137 
00138          double             fFlushTimeout;  // after such timeout partially filled packed will be delivered
00139          dabc::TimeStamp    fLastDelivery;  // time of last delivery
00140 
00141          base::OperList     fStartList;
00142 
00143          int                fFormat;
00144 
00145          bool               fMbsHeader;    // inserts dummy MBS header in the beginning
00146          int                fBufCounter;   // counter used in MBS header
00147 
00148          inline bool daqActive() const { return (fDev!=0) && (daqState == daqRuns); }
00149 
00150          virtual bool ReplyCommand(dabc::Command cmd);
00151 
00152          virtual void ProcessPoolChanged(dabc::MemoryPool* pool) {}
00153 
00154          virtual bool ProcessPoolRequest();
00155 
00156          virtual double ProcessTimeout(double last_diff);
00157 
00158          void AddBuffersToQueue(bool checkanyway = true);
00159          bool CheckNextRequest(bool check_retrans = true);
00160 
00161          void AddDataPacket(int len, void* tgt);
00162          void CompressBuffer(dabc::Buffer& buf);
00163          void CheckReadyBuffers(bool doflush = false);
00164 
00165          void FinishTgtBuffer();
00166 
00167          void ResetDaq();
00168 
00169          bool prepareForSuspend();
00170 
00171          virtual int GetParameter(const char* name);
00172 
00173          void setFlushTimeout(double tmout) { fFlushTimeout = tmout; }
00174          double getFlushTimeout() const {  return fFlushTimeout; }
00175 
00176       public:
00177          UdpDataSocket(UdpDevice* dev, dabc::Reference port, dabc::Command cmd, int fd, int fmt);
00178          virtual ~UdpDataSocket();
00179 
00180          virtual void ProcessEvent(const dabc::EventId&);
00181 
00182          virtual bool ProvidesInput() { return true; }
00183          virtual bool ProvidesOutput() { return false; }
00184 
00185          virtual bool Recv(dabc::Buffer& buf);
00186          virtual unsigned  RecvQueueSize() const;
00187          virtual dabc::Buffer& RecvBuffer(unsigned indx) const;
00188          virtual bool Send(const dabc::Buffer& buf) { return false; }
00189          virtual unsigned SendQueueSize() { return 0; }
00190          virtual unsigned MaxSendSegments() { return 0; }
00191 
00192          virtual void StartTransport();
00193          virtual void StopTransport();
00194    };
00195 
00196 }
00197 
00198 #endif

Generated on Tue Dec 10 2013 04:52:23 for ROCsoft by  doxygen 1.7.1