Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #ifndef ROC_UDPTRANSPORT_H
00016 #define ROC_UDPTRANSPORT_H
00017
00018 #ifndef DABC_SocketThread
00019 #include "dabc/SocketThread.h"
00020 #endif
00021
00022 #ifndef DABC_Transport
00023 #include "dabc/Transport.h"
00024 #endif
00025
00026 #ifndef DABC_BuffersQueue
00027 #include "dabc/BuffersQueue.h"
00028 #endif
00029
00030 #ifndef DABC_MemoryPool
00031 #include "dabc/MemoryPool.h"
00032 #endif
00033
00034 #ifndef ROC_UdpBoard
00035 #include "roc/UdpBoard.h"
00036 #endif
00037
00038 #include <list>
00039
00040 namespace roc {
00041
00042 class UdpDevice;
00043
00044
00045
00046
00047 class UdpDataSocket : public dabc::SocketWorker, // Socket worker that also implements dabc::Transport as a receive-only endpoint (ProvidesInput() is true, ProvidesOutput() is false, Send() always fails).
00048 public dabc::Transport,
00049 protected dabc::MemoryPoolRequester { // inherited as protected, so the pool-request callbacks are not part of the public interface
00050 friend class UdpDevice; // UdpDevice gets access to the protected state and helpers declared below
00051
00052 DABC_TRANSPORT(dabc::SocketWorker) // macro defined outside this file; presumably glues the Transport interface to the SocketWorker base -- TODO confirm
00053
00054 protected:
00055 enum EUdpEvents { evntStartTransport = evntSocketLast + 1, // ids are numbered upward from evntSocketLast + 1 (evntSocketLast is declared outside this file)
00056 evntStopTransport, // NOTE(review): these ids are presumably dispatched in ProcessEvent() -- verify in the implementation
00057 evntConfirmCmd,
00058 evntFillBuffer };
00059 // DAQ life-cycle states held in daqState; daqRuns is the only state daqActive() accepts.
00060 enum EDaqState { daqInit,
00061 daqStarting,
00062 daqRuns,
00063 daqStopping,
00064 daqFails
00065 };
00066 // Element type of the fResend queue. Field meanings below are inferred from names only -- verify against the implementation.
00067 struct ResendInfo {
00068
00069 uint32_t pktid; // packet id, taken from the constructor argument
00070 dabc::TimeStamp lasttm; // presumably time of the most recent resend request -- TODO confirm
00071 int numtry; // starts at 0; presumably number of attempts made so far -- TODO confirm
00072 dabc::Buffer* buf; // raw pointer, null by default; ownership is not visible from this header
00073 unsigned bufindx; // starts at 0; presumably index associated with buf -- TODO confirm
00074 char* ptr; // raw pointer, null by default; presumably write position for the packet -- TODO confirm
00075 // Default constructor (also usable with a single id): stores id in pktid and zero/default-initializes every other field.
00076 ResendInfo(unsigned id = 0) :
00077 pktid(id),
00078 lasttm(),
00079 numtry(0),
00080 buf(0),
00081 bufindx(0),
00082 ptr(0) {}
00083 // Copy constructor: member-wise copy; buf and ptr are copied as plain pointers (shallow).
00084 ResendInfo(const ResendInfo& src) :
00085 pktid(src.pktid),
00086 lasttm(src.lasttm),
00087 numtry(src.numtry),
00088 buf(src.buf),
00089 bufindx(src.bufindx),
00090 ptr(src.ptr) {}
00091
00092 };
00093
00094 // NOTE(review): the data members below are initialized in the constructor, which is not visible here; their declaration order must be kept.
00095
00096 UdpDevice* fDev; // device pointer; daqActive() reports inactive while this is null
00097 dabc::Mutex fQueueMutex; // presumably guards fQueue -- TODO confirm
00098 dabc::BuffersQueue fQueue; // queue of buffers; exact role is defined in the implementation
00099
00100 unsigned fReadyBuffers; // NOTE(review): looks like a count of filled buffers -- confirm
00101 unsigned fTransportBuffers; // NOTE(review): looks like a count of buffers handed to the transport -- confirm
00102 // The fTgt* members appear to describe the buffer currently being filled (inferred from the shared prefix) -- verify.
00103 dabc::Buffer* fTgtBuf;
00104 unsigned fTgtBufIndx;
00105 unsigned fTgtShift;
00106 unsigned fTgtZeroShift;
00107 UdpDataPacket fTgtHdr;
00108 char* fTgtPtr;
00109 uint32_t fTgtNextId;
00110 uint32_t fTgtTailId;
00111 bool fTgtCheckGap;
00112
00113 char fTempBuf[2000]; // fixed 2000-byte scratch array; its use is not visible in this header
00114
00115 dabc::Queue<ResendInfo> fResend; // queue of ResendInfo records
00116
00117 EDaqState daqState; // current state; compared against daqRuns by daqActive()
00118 bool daqCheckStop;
00119
00120 UdpDataPacketFull fRecvBuf; // presumably the receive area for one incoming packet -- TODO confirm
00121
00122 unsigned fBufferSize;
00123 unsigned fTransferWindow;
00124
00125
00126 dabc::MemoryPoolRef fPool; // reference to a dabc memory pool
00127 // Bookkeeping for the last request; exact protocol meaning is defined in the implementation.
00128 dabc::TimeStamp lastRequestTm;
00129 bool lastRequestSeen;
00130 uint32_t lastRequestId;
00131 uint32_t lastSendFrontId;
00132
00133 unsigned rocNumber;
00134 // 64-bit counters; presumably running statistics of received / re-requested packets -- TODO confirm
00135 uint64_t fTotalRecvPacket;
00136 uint64_t fTotalResubmPacket;
00137
00138 double fFlushTimeout; // read/written through getFlushTimeout()/setFlushTimeout(); unit is not stated in this header
00139 dabc::TimeStamp fLastDelivery;
00140
00141 base::OperList fStartList;
00142
00143 int fFormat; // NOTE(review): presumably set from the constructor's fmt argument -- confirm
00144
00145 bool fMbsHeader;
00146 int fBufCounter;
00147 // True only when a device is attached (fDev != 0) and the DAQ state is daqRuns.
00148 inline bool daqActive() const { return (fDev!=0) && (daqState == daqRuns); }
00149
00150 virtual bool ReplyCommand(dabc::Command cmd);
00151 // Empty body: nothing is done when a pool change is reported.
00152 virtual void ProcessPoolChanged(dabc::MemoryPool* pool) {}
00153
00154 virtual bool ProcessPoolRequest();
00155
00156 virtual double ProcessTimeout(double last_diff);
00157 // Internal helpers; defined in the implementation file.
00158 void AddBuffersToQueue(bool checkanyway = true);
00159 bool CheckNextRequest(bool check_retrans = true);
00160
00161 void AddDataPacket(int len, void* tgt);
00162 void CompressBuffer(dabc::Buffer& buf);
00163 void CheckReadyBuffers(bool doflush = false);
00164
00165 void FinishTgtBuffer();
00166
00167 void ResetDaq();
00168
00169 bool prepareForSuspend();
00170
00171 virtual int GetParameter(const char* name);
00172 // Plain accessors for fFlushTimeout.
00173 void setFlushTimeout(double tmout) { fFlushTimeout = tmout; }
00174 double getFlushTimeout() const { return fFlushTimeout; }
00175
00176 public:
00177 UdpDataSocket(UdpDevice* dev, dabc::Reference port, dabc::Command cmd, int fd, int fmt); // fd is presumably the socket descriptor and fmt the data format -- TODO confirm in the implementation
00178 virtual ~UdpDataSocket();
00179
00180 virtual void ProcessEvent(const dabc::EventId&);
00181 // This transport delivers input only.
00182 virtual bool ProvidesInput() { return true; }
00183 virtual bool ProvidesOutput() { return false; }
00184
00185 virtual bool Recv(dabc::Buffer& buf);
00186 virtual unsigned RecvQueueSize() const;
00187 virtual dabc::Buffer& RecvBuffer(unsigned indx) const;
00188 virtual bool Send(const dabc::Buffer& buf) { return false; } // sending is not supported: always returns false
00189 virtual unsigned SendQueueSize() { return 0; } // no send queue: always 0
00190 virtual unsigned MaxSendSegments() { return 0; } // always 0
00191
00192 virtual void StartTransport();
00193 virtual void StopTransport();
00194 };
00195
00196 }
00197
00198 #endif