• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

abbplugin/src/SplitterModule.cxx (r4864/r3193)

Go to the documentation of this file.
00001 #include "roc/SplitterModule.h"
00002 
00003 #include "dabc/Buffer.h"
00004 #include "dabc/Pointer.h"
00005 #include "dabc/Port.h"
00006 #include "dabc/Parameter.h"
00007 #include "dabc/Manager.h"
00008 #include "dabc/Command.h"
00009 #include "dabc/PoolHandle.h"
00010 
00011 #include "roc/Board.h"
00012 #include "roc/Iterator.h"
00013 #include "roc/Commands.h"
00014 
00015 
00016 roc::SplitterModule::SplitterModule(const char* name, dabc::Command cmd) :
00017    dabc::ModuleAsync(name, cmd)
00018 {
00019    int numoutputs = Cfg(dabc::xmlNumOutputs, cmd).AsInt(2);
00020    std::string poolname = Cfg(dabc::xmlPoolName, cmd).AsStdStr(roc::xmlRocPool);
00021 
00022    fFlushTime = Cfg(dabc::xmlFlushTimeout, cmd).AsDouble(0.3);
00023    bool sdebug = cmd.GetBool("SplitterDebug", false);
00024 
00025    CreatePoolHandle(poolname.c_str());
00026 
00027    CreatePar("InpData").SetRatemeter(false, 3.).SetDebugOutput(sdebug);
00028    CreatePar("OutData").SetRatemeter(false, 3.).SetDebugOutput(sdebug);
00029 
00030    CreateInput("Input", Pool(), Cfg(dabc::xmlInputQueueSize, cmd).AsInt(10));
00031 
00032    Input()->SetInpRateMeter(Par("InpData"));
00033 
00034    for(int n=0; n<numoutputs; n++) {
00035       CreateOutput(FORMAT(("Output%d", n)), Pool(), Cfg(dabc::xmlOutputQueueSize, cmd).AsInt(10));
00036       Output(n)->SetOutRateMeter(Par("OutData"));
00037    }
00038 
00039    CreateTimer("FlushTimer", (fFlushTime>0 && fFlushTime<0.1) ? fFlushTime : 0.1);
00040 
00041    DOUT0(("Splitter module created: name:%s numout:%d flush:%4.2fs", GetName(), numoutputs, fFlushTime));
00042 }
00043 
00044 roc::SplitterModule::~SplitterModule()
00045 {
00046 }
00047 
00048 void roc::SplitterModule::BeforeModuleStart()
00049 {
00050 }
00051 
00052 void roc::SplitterModule::ProcessInputEvent(dabc::Port* port)
00053 {
00054    while (FlushNextBuffer());
00055 }
00056 
00057 void roc::SplitterModule::ProcessOutputEvent(dabc::Port* port)
00058 {
00059    while (FlushNextBuffer());
00060 }
00061 
00062 void roc::SplitterModule::ProcessTimerEvent(dabc::Timer* timer)
00063 {
00064 //   bool isany = false;
00065 //   while (FlushNextBuffer()) isany = true;
00066 //   if (!isany) CheckBuffersFlush();
00067 
00068    CheckBuffersFlush();
00069 }
00070 
00071 int roc::SplitterModule::ExecuteCommand(dabc::Command cmd)
00072 {
00073    if (cmd.IsName("AddROC")) {
00074 
00075       int rocid = cmd.GetInt("ROCID", -1);
00076 
00077       if ((rocid<0) || (fMap.size()>=NumOutputs())) return dabc::cmd_false;
00078 
00079       OutputRec rec;
00080       rec.nout = fMap.size();
00081 
00082       DOUT0(("Splitter output %u assigned for ROC%d", rec.nout, rocid));
00083 
00084       fMap[rocid] = rec;
00085       return dabc::cmd_true;
00086    }
00087 
00088    return dabc::ModuleAsync::ExecuteCommand(cmd);
00089 }
00090 
00091 bool roc::SplitterModule::FlushNextBuffer()
00092 {
00093    if (!CanSendToAllOutputs()) return false;
00094 
00095    if (!Input()->CanRecv()) return false;
00096 
00097    dabc::Buffer buf = Input()->Recv();
00098 
00099    if (buf.null()) return false;
00100 
00101    if ((buf.GetTypeId() < roc::rbt_RawRocData) ||
00102          (buf.GetTypeId() > roc::rbt_RawRocData + roc::formatNormal)) {
00103       buf.Release(); // not necessary, but keep it
00104       EOUT(("Something wrong with buffer type"));
00105       return false;
00106    }
00107 
00108    int fmt = (buf.GetTypeId() - roc::rbt_RawRocData);
00109 
00110    if (fmt!=roc::formatOptic2) {
00111       EOUT(("Non-optic format not supported!"));
00112       buf.Release(); // not necessary, but keep it
00113       return false;
00114    }
00115 
00116    OutputMap::iterator miter;
00117    for (miter=fMap.begin(); miter != fMap.end(); miter++) {
00118       miter->second.wassent = false;
00119    }
00120    bool isanysent = false;
00121 
00122    // FIXME: use Pointer class to access data from the buffer
00123    uint16_t* src = (uint16_t*) buf.SegmentPtr();
00124    unsigned srclen = buf.GetTotalSize() / 8;
00125 
00126 //   DOUT0(("Receive buffer nummsg:%u", srclen));
00127 
00128    while (srclen>0) {
00129       miter = fMap.find(*src);
00130       if (miter == fMap.end()) {
00131          EOUT(("Non existing board number %u", *src));
00132          src+=4;
00133          srclen--;
00134          continue;
00135       }
00136 
00137       if (miter->second.buf.null()) {
00138          dabc::Buffer mbuf = Pool()->TakeBuffer();
00139          if (mbuf.null()) {
00140             EOUT(("No buffer for data from %u - skip data", *src));
00141             src+=4;
00142             srclen--;
00143             continue;
00144          }
00145          mbuf.SetTypeId(buf.GetTypeId());
00146 
00147          miter->second.ptr = mbuf.GetPointer();
00148          miter->second.buf << mbuf;
00149          miter->second.len = 0;
00150       }
00151 
00152       miter->second.ptr.copyfrom(src, 8);
00153       miter->second.len+=8;
00154       miter->second.ptr.shift(8);
00155 
00156       if (miter->second.len > miter->second.buf.GetTotalSize() - 8) {
00157          miter->second.buf.SetTotalSize(miter->second.len);
00158          Output(miter->second.nout)->Send(miter->second.buf.HandOver());
00159          //         fOutRate->AccountValue(miter->second.len / 1024./1024.);
00160          //         DOUT0(("Send to Output%u buffer len %u", miter->second.nout, miter->second.len));
00161          miter->second.buf.Release();
00162          miter->second.len = 0;
00163          miter->second.ptr.reset();
00164          miter->second.lasttm = dabc::Now();
00165          miter->second.wassent = true;
00166          isanysent = true;
00167       }
00168 
00169       src+=4; // !!!!! we are using uint16_t, therefore +4 !!!!!!!!
00170       srclen--;
00171    }
00172 
00173 
00174    buf.Release(); // not necessary, but keep it
00175 
00176    CheckBuffersFlush(isanysent);
00177 
00178    return true;
00179 }
00180 
00181 void roc::SplitterModule::CheckBuffersFlush(bool forceunsent)
00182 {
00183    OutputMap::iterator miter;
00184 
00185    dabc::TimeStamp now = dabc::Now();
00186 
00187    for (miter=fMap.begin(); miter != fMap.end(); miter++) {
00188 
00189       // variable indicate that we should send buffer while some other data was flushed
00190       bool sendanyway = forceunsent && !miter->second.wassent;
00191 
00192       miter->second.wassent = false;
00193 
00194       // if no data was filled keep buffer for next data
00195       if ((miter->second.len == 0) || miter->second.buf.null()) continue;
00196 
00197       // if there is too few data or flush timeout not yet happen keep buffers
00198       if (!sendanyway && (miter->second.len <= miter->second.buf.GetTotalSize() - 8) &&
00199          ((fFlushTime<=0.) || ((now - miter->second.lasttm) < fFlushTime))) continue;
00200 
00201       // if output queue is full, do not try
00202       if (!Output(miter->second.nout)->CanSend()) continue;
00203 
00204       miter->second.buf.SetTotalSize(miter->second.len);
00205       Output(miter->second.nout)->Send(miter->second.buf.HandOver());
00206 //      fOutRate->AccountValue(miter->second.len / 1024./1024.);
00207 //      DOUT0(("Send to Output%u buffer len %u", miter->second.nout, miter->second.len));
00208       miter->second.buf.Release(); // no need but keep it
00209       miter->second.len = 0;
00210       miter->second.ptr.reset();
00211       miter->second.lasttm = now;
00212    }
00213 }
00214 
00215 
00216 void roc::SplitterModule::AfterModuleStop()
00217 {
00218    OutputMap::iterator miter;
00219 
00220    for (miter=fMap.begin(); miter != fMap.end(); miter++) {
00221       miter->second.buf.Release();
00222       miter->second.len = 0;
00223    }
00224 }
00225 
00226 extern "C" void InitSplitter()
00227 {
00228    if (dabc::mgr.null()) {
00229       EOUT(("Manager is not created"));
00230       exit(1);
00231    }
00232 
00233    dabc::CmdCreateDevice cmd1(roc::typeAbbDevice, "AbbDev", "AbbThrd");
00234    cmd1.SetStr(roc::xmlBoardAddr, "abb");
00235    cmd1.SetStr(roc::xmlRole, base::roleToString(base::roleDAQ));
00236 
00237    if (!dabc::mgr.Execute(cmd1)) {
00238       EOUT(("Cannot create ABB device"));
00239       exit(1);
00240    }
00241 
00242 
00243    dabc::CmdCreateModule cmd2("roc::SplitterModule", "Splitter");
00244    cmd2.SetInt(dabc::xmlBufferSize, 128*1024);
00245    cmd2.SetInt(dabc::xmlNumOutputs, 2);
00246    cmd2.SetBool("SplitterDebug", true);
00247    if (!dabc::mgr.Execute(cmd2)) {
00248       EOUT(("Cannot create Splitter module"));
00249       exit(1);
00250    }
00251    dabc::ModuleRef m = dabc::mgr.FindModule("Splitter");
00252    if (m.null()) {
00253       EOUT(("No Splitter module"));
00254       exit(1);
00255    }
00256 
00257 
00258    dabc::CmdCreateTransport cmd3("Splitter/Input", "AbbDev", "AbbThrd");
00259    cmd3.SetStr(roc::xmlBoardAddr, "abb");
00260 
00261    if (!dabc::mgr.Execute(cmd3)) {
00262       EOUT(("Cannot connect splitter module to ABB device"));
00263       exit(1);
00264    }
00265 
00266    dabc::Command cmd4("AddROC");
00267    cmd4.SetInt("ROCID", 0);
00268    if (!m.Execute(cmd4)) {
00269       EOUT(("Cannot ADD ROCID 0 to splitter"));
00270       exit(1);
00271    }
00272 
00273    dabc::Command cmd5("AddROC");
00274    cmd5.SetInt("ROCID", 1);
00275    if (!m.Execute(cmd5)) {
00276       EOUT(("Cannot ADD ROCID 1 to splitter"));
00277       exit(1);
00278    }
00279 
00280 }
00281 
00282 
00283 

Generated on Tue Dec 10 2013 04:52:16 for ROCsoft by  doxygen 1.7.1