00001 #include "roc/SplitterModule.h"
00002
00003 #include "dabc/Buffer.h"
00004 #include "dabc/Pointer.h"
00005 #include "dabc/Port.h"
00006 #include "dabc/Parameter.h"
00007 #include "dabc/Manager.h"
00008 #include "dabc/Command.h"
00009 #include "dabc/PoolHandle.h"
00010
00011 #include "roc/Board.h"
00012 #include "roc/Iterator.h"
00013 #include "roc/Commands.h"
00014
00015
00016 roc::SplitterModule::SplitterModule(const char* name, dabc::Command cmd) :
00017 dabc::ModuleAsync(name, cmd)
00018 {
00019 int numoutputs = Cfg(dabc::xmlNumOutputs, cmd).AsInt(2);
00020 std::string poolname = Cfg(dabc::xmlPoolName, cmd).AsStdStr(roc::xmlRocPool);
00021
00022 fFlushTime = Cfg(dabc::xmlFlushTimeout, cmd).AsDouble(0.3);
00023 bool sdebug = cmd.GetBool("SplitterDebug", false);
00024
00025 CreatePoolHandle(poolname.c_str());
00026
00027 CreatePar("InpData").SetRatemeter(false, 3.).SetDebugOutput(sdebug);
00028 CreatePar("OutData").SetRatemeter(false, 3.).SetDebugOutput(sdebug);
00029
00030 CreateInput("Input", Pool(), Cfg(dabc::xmlInputQueueSize, cmd).AsInt(10));
00031
00032 Input()->SetInpRateMeter(Par("InpData"));
00033
00034 for(int n=0; n<numoutputs; n++) {
00035 CreateOutput(FORMAT(("Output%d", n)), Pool(), Cfg(dabc::xmlOutputQueueSize, cmd).AsInt(10));
00036 Output(n)->SetOutRateMeter(Par("OutData"));
00037 }
00038
00039 CreateTimer("FlushTimer", (fFlushTime>0 && fFlushTime<0.1) ? fFlushTime : 0.1);
00040
00041 DOUT0(("Splitter module created: name:%s numout:%d flush:%4.2fs", GetName(), numoutputs, fFlushTime));
00042 }
00043
00044 roc::SplitterModule::~SplitterModule()
00045 {
00046 }
00047
00048 void roc::SplitterModule::BeforeModuleStart()
00049 {
00050 }
00051
00052 void roc::SplitterModule::ProcessInputEvent(dabc::Port* port)
00053 {
00054 while (FlushNextBuffer());
00055 }
00056
00057 void roc::SplitterModule::ProcessOutputEvent(dabc::Port* port)
00058 {
00059 while (FlushNextBuffer());
00060 }
00061
00062 void roc::SplitterModule::ProcessTimerEvent(dabc::Timer* timer)
00063 {
00064
00065
00066
00067
00068 CheckBuffersFlush();
00069 }
00070
00071 int roc::SplitterModule::ExecuteCommand(dabc::Command cmd)
00072 {
00073 if (cmd.IsName("AddROC")) {
00074
00075 int rocid = cmd.GetInt("ROCID", -1);
00076
00077 if ((rocid<0) || (fMap.size()>=NumOutputs())) return dabc::cmd_false;
00078
00079 OutputRec rec;
00080 rec.nout = fMap.size();
00081
00082 DOUT0(("Splitter output %u assigned for ROC%d", rec.nout, rocid));
00083
00084 fMap[rocid] = rec;
00085 return dabc::cmd_true;
00086 }
00087
00088 return dabc::ModuleAsync::ExecuteCommand(cmd);
00089 }
00090
00091 bool roc::SplitterModule::FlushNextBuffer()
00092 {
00093 if (!CanSendToAllOutputs()) return false;
00094
00095 if (!Input()->CanRecv()) return false;
00096
00097 dabc::Buffer buf = Input()->Recv();
00098
00099 if (buf.null()) return false;
00100
00101 if ((buf.GetTypeId() < roc::rbt_RawRocData) ||
00102 (buf.GetTypeId() > roc::rbt_RawRocData + roc::formatNormal)) {
00103 buf.Release();
00104 EOUT(("Something wrong with buffer type"));
00105 return false;
00106 }
00107
00108 int fmt = (buf.GetTypeId() - roc::rbt_RawRocData);
00109
00110 if (fmt!=roc::formatOptic2) {
00111 EOUT(("Non-optic format not supported!"));
00112 buf.Release();
00113 return false;
00114 }
00115
00116 OutputMap::iterator miter;
00117 for (miter=fMap.begin(); miter != fMap.end(); miter++) {
00118 miter->second.wassent = false;
00119 }
00120 bool isanysent = false;
00121
00122
00123 uint16_t* src = (uint16_t*) buf.SegmentPtr();
00124 unsigned srclen = buf.GetTotalSize() / 8;
00125
00126
00127
00128 while (srclen>0) {
00129 miter = fMap.find(*src);
00130 if (miter == fMap.end()) {
00131 EOUT(("Non existing board number %u", *src));
00132 src+=4;
00133 srclen--;
00134 continue;
00135 }
00136
00137 if (miter->second.buf.null()) {
00138 dabc::Buffer mbuf = Pool()->TakeBuffer();
00139 if (mbuf.null()) {
00140 EOUT(("No buffer for data from %u - skip data", *src));
00141 src+=4;
00142 srclen--;
00143 continue;
00144 }
00145 mbuf.SetTypeId(buf.GetTypeId());
00146
00147 miter->second.ptr = mbuf.GetPointer();
00148 miter->second.buf << mbuf;
00149 miter->second.len = 0;
00150 }
00151
00152 miter->second.ptr.copyfrom(src, 8);
00153 miter->second.len+=8;
00154 miter->second.ptr.shift(8);
00155
00156 if (miter->second.len > miter->second.buf.GetTotalSize() - 8) {
00157 miter->second.buf.SetTotalSize(miter->second.len);
00158 Output(miter->second.nout)->Send(miter->second.buf.HandOver());
00159
00160
00161 miter->second.buf.Release();
00162 miter->second.len = 0;
00163 miter->second.ptr.reset();
00164 miter->second.lasttm = dabc::Now();
00165 miter->second.wassent = true;
00166 isanysent = true;
00167 }
00168
00169 src+=4;
00170 srclen--;
00171 }
00172
00173
00174 buf.Release();
00175
00176 CheckBuffersFlush(isanysent);
00177
00178 return true;
00179 }
00180
00181 void roc::SplitterModule::CheckBuffersFlush(bool forceunsent)
00182 {
00183 OutputMap::iterator miter;
00184
00185 dabc::TimeStamp now = dabc::Now();
00186
00187 for (miter=fMap.begin(); miter != fMap.end(); miter++) {
00188
00189
00190 bool sendanyway = forceunsent && !miter->second.wassent;
00191
00192 miter->second.wassent = false;
00193
00194
00195 if ((miter->second.len == 0) || miter->second.buf.null()) continue;
00196
00197
00198 if (!sendanyway && (miter->second.len <= miter->second.buf.GetTotalSize() - 8) &&
00199 ((fFlushTime<=0.) || ((now - miter->second.lasttm) < fFlushTime))) continue;
00200
00201
00202 if (!Output(miter->second.nout)->CanSend()) continue;
00203
00204 miter->second.buf.SetTotalSize(miter->second.len);
00205 Output(miter->second.nout)->Send(miter->second.buf.HandOver());
00206
00207
00208 miter->second.buf.Release();
00209 miter->second.len = 0;
00210 miter->second.ptr.reset();
00211 miter->second.lasttm = now;
00212 }
00213 }
00214
00215
00216 void roc::SplitterModule::AfterModuleStop()
00217 {
00218 OutputMap::iterator miter;
00219
00220 for (miter=fMap.begin(); miter != fMap.end(); miter++) {
00221 miter->second.buf.Release();
00222 miter->second.len = 0;
00223 }
00224 }
00225
00226 extern "C" void InitSplitter()
00227 {
00228 if (dabc::mgr.null()) {
00229 EOUT(("Manager is not created"));
00230 exit(1);
00231 }
00232
00233 dabc::CmdCreateDevice cmd1(roc::typeAbbDevice, "AbbDev", "AbbThrd");
00234 cmd1.SetStr(roc::xmlBoardAddr, "abb");
00235 cmd1.SetStr(roc::xmlRole, base::roleToString(base::roleDAQ));
00236
00237 if (!dabc::mgr.Execute(cmd1)) {
00238 EOUT(("Cannot create ABB device"));
00239 exit(1);
00240 }
00241
00242
00243 dabc::CmdCreateModule cmd2("roc::SplitterModule", "Splitter");
00244 cmd2.SetInt(dabc::xmlBufferSize, 128*1024);
00245 cmd2.SetInt(dabc::xmlNumOutputs, 2);
00246 cmd2.SetBool("SplitterDebug", true);
00247 if (!dabc::mgr.Execute(cmd2)) {
00248 EOUT(("Cannot create Splitter module"));
00249 exit(1);
00250 }
00251 dabc::ModuleRef m = dabc::mgr.FindModule("Splitter");
00252 if (m.null()) {
00253 EOUT(("No Splitter module"));
00254 exit(1);
00255 }
00256
00257
00258 dabc::CmdCreateTransport cmd3("Splitter/Input", "AbbDev", "AbbThrd");
00259 cmd3.SetStr(roc::xmlBoardAddr, "abb");
00260
00261 if (!dabc::mgr.Execute(cmd3)) {
00262 EOUT(("Cannot connect splitter module to ABB device"));
00263 exit(1);
00264 }
00265
00266 dabc::Command cmd4("AddROC");
00267 cmd4.SetInt("ROCID", 0);
00268 if (!m.Execute(cmd4)) {
00269 EOUT(("Cannot ADD ROCID 0 to splitter"));
00270 exit(1);
00271 }
00272
00273 dabc::Command cmd5("AddROC");
00274 cmd5.SetInt("ROCID", 1);
00275 if (!m.Execute(cmd5)) {
00276 EOUT(("Cannot ADD ROCID 1 to splitter"));
00277 exit(1);
00278 }
00279
00280 }
00281
00282
00283