• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

plugin/src/CombinerModule.cxx (r4864/r3193)

Go to the documentation of this file.
00001 /********************************************************************
00002  * The Data Acquisition Backbone Core (DABC)
00003  ********************************************************************
00004  * Copyright (C) 2009-
00005  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
00006  * Planckstr. 1
00007  * 64291 Darmstadt
00008  * Germany
00009  * Contact:  http://dabc.gsi.de
00010  ********************************************************************
00011  * This software can be used under the GPL license agreements as stated
00012  * in LICENSE.txt file which is part of the distribution.
00013  ********************************************************************/
00014 
00015 #include "roc/CombinerModule.h"
00016 
00017 #include "roc/Board.h"
00018 #include "roc/Iterator.h"
00019 #include "roc/Commands.h"
00020 
00021 #include "dabc/logging.h"
00022 #include "dabc/Pointer.h"
00023 #include "dabc/PoolHandle.h"
00024 #include "dabc/MemoryPool.h"
00025 #include "dabc/Command.h"
00026 #include "dabc/Port.h"
00027 #include "dabc/Timer.h"
00028 #include "dabc/Pointer.h"
00029 #include "dabc/Manager.h"
00030 #include "dabc/Application.h"
00031 
00032 #include "mbs/LmdTypeDefs.h"
00033 #include "mbs/MbsTypeDefs.h"
00034 
00035 //#include "bnet/common.h"
00036 
00037 const char* roc::xmlBnetMode          = "BnetMode";
00038 const char* roc::xmlSkipErrorData     = "SkipErrorData";
00039 const char* roc::xmlIgnoreMissingEpoch= "IgnoreMissingEpoch";
00040 const char* roc::xmlSyncNumber        = "SyncNumber";
00041 const char* roc::xmlSyncScaleDown     = "SyncScaleDown";
00042 const char* roc::xmlSpillRoc          = "SpillRoc";
00043 const char* roc::xmlSpillAux          = "SpillAux";
00044 const char* roc::xmlCalibrationPeriod = "CalibrationPeriod";
00045 const char* roc::xmlCalibrationLength = "CalibrationLength";
00046 const char* roc::xmlThrottleAux       = "ThrottleAux";
00047 const char* roc::xmlGet4ResetPeriod   = "Get4ResetPeriod";
00048 const char* roc::xmlGet4ResetLimit    = "Get4ResetLimit";
00049 
00050 
00051 bool InitIterator(roc::Iterator& iter, dabc::Buffer& buf, unsigned shift = 0, bool isudp = false, unsigned rocid = 0)
00052 {
00053    if (buf.null()) return false;
00054 
00055    if ((buf.GetTypeId() < roc::rbt_RawRocData) ||
00056        (buf.GetTypeId() > roc::rbt_RawRocData + roc::formatNormal)) return false;
00057 
00058    int fmt = buf.GetTypeId() - roc::rbt_RawRocData;
00059 
00060    iter.setFormat(fmt);
00061    if (isudp) iter.setRocNumber(rocid);
00062 
00063    void* ptr = buf.SegmentPtr();
00064    unsigned size = buf.SegmentSize();
00065 
00066    if (shift>=size) return false;
00067 
00068    iter.assign((uint8_t*) ptr + shift, size - shift);
00069 
00070    return true;
00071 }
00072 
00073 
00074 roc::CombinerModule::CombinerModule(const char* name, dabc::Command cmd) :
00075    dabc::ModuleAsync(name, cmd),
00076    fInpPool(0),
00077    fOutPool(0),
00078    fBufferSize(0),
00079    fSimpleMode(false),
00080    fBnetMode(false),
00081    fOutBuf(),
00082    f_outptr(),
00083    fSpillRoc(-1),
00084    fSpillAux(-1),
00085    fSpillState(true),
00086    fCalibrationPeriod(-1.),
00087    fCalibrationLength(1.),
00088    fLastCalibrationTime(),
00089    fExtraMessages()
00090 {
00091 //   dabc::SetDebugLevel(5);
00092 
00093    fBnetMode = Cfg(roc::xmlBnetMode, cmd).AsBool(false);
00094 
00095    fSkipErrorData = Cfg(roc::xmlSkipErrorData, cmd).AsBool(false);
00096 
00097    fIgnoreMissingEpoch = Cfg(roc::xmlIgnoreMissingEpoch, cmd).AsBool(false);
00098    fSyncScaleDown = Cfg(roc::xmlSyncScaleDown, cmd).AsInt(-1);
00099    fSyncNumber = Cfg(roc::xmlSyncNumber, cmd).AsInt(0);
00100    if ((fSyncNumber<0) || (fSyncNumber>1)) fSyncNumber = 0;
00101 
00102    if ((fSyncScaleDown>=0) && (fSyncScaleDown<32))
00103       fSyncScaleDown = 1 << fSyncScaleDown;
00104    else
00105    if (fSyncScaleDown == 77) {
00106       DOUT0(("Special mode for FEET readout - take data as is, pack into MBS buffer and store"));
00107       fSimpleMode = true;
00108       fSyncScaleDown = -1;
00109    } else {
00110       EOUT(("Sync scale down factor not specified. Use 1 as default"));
00111       fSyncScaleDown = -1;
00112    }
00113 
00114    int numinputs = Cfg(dabc::xmlNumInputs,cmd).AsInt(1);
00115    fBufferSize = Cfg(dabc::xmlBufferSize, cmd).AsInt(16384);
00116    int numoutputs = Cfg(dabc::xmlNumOutputs, cmd).AsInt(2);
00117    fSpillRoc = Cfg(roc::xmlSpillRoc, cmd).AsInt(-1);
00118    fSpillAux = Cfg(roc::xmlSpillAux, cmd).AsInt(-1);
00119    fCalibrationPeriod = Cfg(roc::xmlCalibrationPeriod, cmd).AsDouble(-1.);
00120    fCalibrationLength = Cfg(roc::xmlCalibrationLength, cmd).AsDouble(0.5);
00121 
00122    fGet4ResetPeriod = Cfg(roc::xmlGet4ResetPeriod, cmd).AsDouble(-1.);
00123 
00124    // limit used no for periodic, but for intended reset
00125    fGet4ResetLimit = Cfg(roc::xmlGet4ResetLimit, cmd).AsDouble(-1);
00126    fLastGet4ResetTm = dabc::Now();
00127    fDetectGet4Problem = false;
00128 
00129    if (fGet4ResetLimit>0)
00130       DOUT0(("Enable GET4 reset at maximum every %5.1f s", fGet4ResetLimit));
00131 
00132    double flushTime = Cfg(dabc::xmlFlushTimeout, cmd).AsDouble(-1.);
00133 
00134    if (flushTime>0.)
00135       CreateTimer("FlushTimer", flushTime, false);
00136    fFlushFlag = false;
00137 
00138    DOUT1(("CombinerModule name:%s numinp = %d SyncScaleDown = %d BnetMode = %s flushtime = %5.1f", GetName(), numinputs, fSyncScaleDown, DBOOL(fBnetMode), flushTime));
00139 
00140    fLastCalibrationTime.Reset();
00141 
00142    fLastSpillTime = 0;
00143 
00144    CreatePar("SpillState").SetInt(-1);
00145 
00146    if (fSpillAux>=0) {
00147       if (fCalibrationPeriod < 5.) fCalibrationPeriod = 5.;
00148       if (fCalibrationLength < 0.1) fCalibrationLength = 0.1;
00149 
00150       DOUT1(("CombinerModule spill detection ROC:%d aux:%d period:%4.1f length:%3.1f", fSpillRoc, fSpillAux, fCalibrationPeriod, fCalibrationLength));
00151 
00152       CreatePar("SpillRate").SetRatemeter(false, 10).SetUnits("spill");
00153 
00154       // if (!fBnetMode) fSpillRate->SetDebugOutput(true);
00155       CreateTimer("CalibrTimer", -1., false);
00156    }
00157 
00158    if (fGet4ResetPeriod>0) {
00159       CreateTimer("Get4Timer", fGet4ResetPeriod, false);
00160    }
00161 
00162    CreatePar("RocData").SetRatemeter(false, 3.);
00163 
00164    CreatePar("RocEvents").SetRatemeter(false, 3.).SetUnits("ev");
00165 
00166    CreatePar("RocErrors").SetRatemeter(false, 3.).SetUnits("ev");
00167 
00168    if (Par("RocData").GetDebugLevel()<0) Par("RocData").SetDebugLevel(1);
00169    if (Par("RocEvents").GetDebugLevel()<0) Par("RocEvents").SetDebugLevel(1);
00170 
00171 
00172    fThrottleAux = Cfg(roc::xmlThrottleAux, cmd).AsInt(-1);
00173 
00174    std::string pname = Cfg(dabc::xmlInputPoolName, cmd).AsStdStr(roc::xmlRocPool);
00175 
00176    fInpPool = CreatePoolHandle(pname.c_str());
00177 
00178    for(unsigned num=0; num < (unsigned) numinputs; num++) {
00179       CreateInput(FORMAT(("Input%u", num)), fInpPool, Cfg(dabc::xmlInputQueueSize, cmd).AsInt(10));
00180       Input(num)->SetInpRateMeter(Par("RocData"));
00181 
00182       DOUT2(("Create input%u queue size = %u capacity %u", num, Input(num)->InputQueueSize(), Input(num)->InputQueueCapacity()));
00183 
00184       fInp.push_back(InputRec());
00185       fInp[num].use = false;
00186       if (fThrottleAux>=0)
00187          CreatePar(FORMAT(("Throttle%u", num))).SetRatemeter(false, 3.).SetLimits(0., 100.);
00188    }
00189 
00190    fOutPool = CreatePoolHandle(Cfg(dabc::xmlOutputPoolName, cmd).AsStdStr(roc::xmlRocPool).c_str());
00191 
00192    for(int n=0; n<numoutputs; n++)
00193       CreateOutput(FORMAT(("Output%d", n)), fOutPool, Cfg(dabc::xmlOutputQueueSize, cmd).AsInt(10));
00194 
00195    CreatePar("RocInfo", "info").SetSynchron(true, 2., false);
00196    SetInfo(dabc::format("ROC combiner module ready. NumInputs = %d" , numinputs), true);
00197 }
00198 
00199 roc::CombinerModule::~CombinerModule()
00200 {
00201    DOUT3(("roc::CombinerModule::~CombinerModule"));
00202 
00203 }
00204 
00205 void roc::CombinerModule::ModuleCleanup()
00206 {
00207    DOUT3(("roc::CombinerModule::ModuleCleanup()"));
00208    while (fExtraMessages.size()>0) {
00209       delete fExtraMessages.front();
00210       fExtraMessages.pop_front();
00211    }
00212 
00213    fOutBuf.Release();
00214 }
00215 
00216 
00217 void roc::CombinerModule::SetInfo(const std::string& info, bool forceinfo)
00218 {
00219    Par("RocInfo").SetStr(info);
00220 
00221    if (forceinfo) Par("RocInfo").FireModified();
00222 }
00223 
00224 
00225 void roc::CombinerModule::ProcessInputEvent(dabc::Port* inport)
00226 {
00227    unsigned inpid = InputNumber(inport);
00228 
00229    if (!inport->CanRecv()) {
00230       EOUT(("Something wrong with input %u %s", inpid, inport->GetName()));
00231       return;
00232    }
00233 
00234    if (fSimpleMode) {
00235 
00236       FillSimpleBuffer();
00237 
00238 /*      dabc::Buffer* buf = inport->Recv();
00239 
00240       if (Output(0)->CanSend())
00241          Output(0)->Send(buf);
00242       else
00243          dabc::Buffer::Release(buf);
00244 */
00245       return;
00246    }
00247 
00248 //   DOUT3(("Get new buffer in input %u ready %s !!!", inpid, DBOOL(fInp[inpid].isready)));
00249 
00250    // check events in the buffers queues
00251    FindNextEvent(inpid);
00252 
00253    FillBuffer();
00254 }
00255 
00256 
00257 void roc::CombinerModule::BeforeModuleStart()
00258 {
00259    if (fSpillAux==77) {
00260       DOUT0(("Shoot timer for the first time"));
00261       ShootTimer("CalibrTimer", fCalibrationPeriod);
00262    }
00263 }
00264 
00265 void roc::CombinerModule::PoolHandleCleaned(dabc::PoolHandle* pool)
00266 {
00267    DOUT3(("Cleanup data from pool %p name %s isout %s", pool, pool->GetName(), DBOOL(pool==fOutPool) ));
00268 
00269    if (pool==fOutPool) {
00270       f_outptr.reset();
00271       fOutBuf.Release();
00272    }
00273 }
00274 
00275 void roc::CombinerModule::ProcessTimerEvent(dabc::Timer* timer)
00276 {
00277    if (timer->IsName("CalibrTimer")) {
00278 
00279       if (fSpillAux==77) {
00280 
00281          fSpillState = !fSpillState;
00282 
00283          dabc::mgr.GetApp().Submit(roc::CmdCalibration(fSpillState));
00284 
00285          ShootTimer("CalibrTimer", fSpillState ? fCalibrationLength : fCalibrationPeriod);
00286       } else {
00287          dabc::mgr.GetApp().Submit(roc::CmdCalibration(false));
00288       }
00289    } else
00290 
00291    if (timer->IsName("FlushTimer")) {
00292       if (fFlushFlag) FlushOutputBuffer();
00293       fFlushFlag = true;
00294    } else
00295 
00296    if (timer->IsName("Get4Timer")) {
00297       InvokeAllGet4Reset();
00298    }
00299 }
00300 
00301 bool roc::CombinerModule::FindNextEvent(unsigned recid)
00302 {
00303    if (recid>=fInp.size()) return false;
00304 
00305    InputRec* rec = &(fInp[recid]);
00306 
00307    dabc::Port* port = Input(recid);
00308 
00309    if (!rec->isrocid()) {
00310       rec->isudp = (port->GetTransportParameter(roc::xmlTransportKind) == kind_UDP);
00311       rec->rocid = port->GetTransportParameter(roc::xmlRocNumber);
00312       rec->format = port->GetTransportParameter(roc::xmlMsgFormat);
00313 
00314       SetInfo(dabc::format("Detect on input %u ROC:%d Kind:%s", recid, rec->rocid, (rec->isudp ? "UDP" : "Optic")), true);
00315    }
00316 
00317 //   DOUT5(("FindNextEvent REC:%p ROCID:%d KIND:%d", rec, rec->rocid, rec->isudp));
00318 
00319    // if one already found events for specified roc, return
00320    if (rec->isready) return true;
00321 
00322    while (port->HasInputBuffer(rec->curr_nbuf)) {
00323 
00324       dabc::Buffer& buf = port->InputBuffer(rec->curr_nbuf);
00325 
00326       if (rec->curr_indx >= buf.GetTotalSize()) {
00327          if (rec->isprev || rec->isnext) {
00328             rec->curr_nbuf++;
00329             if (rec->curr_nbuf == port->InputQueueCapacity()) {
00330                SetInfo(dabc::format("Skip all data while we cannot find two events in complete queue for input %u nbuf %u isprev %s isnext %s prevnbuf %u", recid, rec->curr_nbuf, DBOOL(rec->isprev), DBOOL(rec->isnext), rec->prev_nbuf));
00331                DOUT0(("Skip all data while we cannot find two events in complete queue for input %u nbuf %u isprev %s isnext %s prevnbuf %u prevevnt %u buftotalsize %u", recid, rec->curr_nbuf, DBOOL(rec->isprev), DBOOL(rec->isnext), rec->prev_nbuf, rec->prev_evnt, buf.GetTotalSize()));
00332 
00333                rec->isprev = false;
00334                rec->isnext = false;
00335                port->SkipInputBuffers(rec->curr_nbuf);
00336                rec->curr_nbuf = 0;
00337             }
00338          } else {
00339             // no need to keep this buffer in place if no epoch was found
00340             port->SkipInputBuffers(rec->curr_nbuf+1);
00341             rec->curr_nbuf = 0;
00342          }
00343          rec->curr_indx = 0;
00344          continue;
00345       }
00346 
00347 //      bool dodump = (rec->curr_indx == 0);
00348 
00349       roc::Iterator iter;
00350 
00351       InitIterator(iter, buf, rec->curr_indx, rec->isudp, rec->rocid);
00352 
00353       bool iserr = false;
00354       roc::Message* data = 0;
00355 
00356       while (iter.next()) {
00357 
00358          data = & iter.msg();
00359 
00360          if (rec->IsDifferentRocId(data->getRocNumber(), iserr)) {
00361             if (iserr)
00362                EOUT(("Input:%u Kind:%s Mismatch in ROC numbers %u %u", recid, (rec->isudp ? "UDP" : "Optic"), rec->rocid, data->getRocNumber()));
00363             rec->curr_indx += iter.getMsgSize();
00364             continue;
00365          }
00366 
00367          switch (data->getMessageType()) {
00368 
00369          case roc::MSG_HIT: { // nXYTER message, not interesting here
00370             break;
00371          }
00372 
00373          case roc::MSG_EPOCH: { // epoch marker
00374             rec->curr_epoch = data->getEpochNumber();
00375             rec->iscurrepoch = true;
00376             break;
00377          }
00378 
00379          case roc::MSG_SYNC: { // SYCN message
00380 
00381             if (data->getSyncChNum()==fSyncNumber) {
00382                bool isepoch = rec->iscurrepoch;
00383 
00384                if (!isepoch && fIgnoreMissingEpoch) {
00385                   // workaround for the corrupted FEET readout,
00386                   // where no epoch created for SYNC markers
00387 
00388                   // increment faked epoch number for each next sync marker
00389                   rec->curr_epoch++;
00390                   isepoch = true;
00391                }
00392 
00393                if (!isepoch) {
00394                   SetInfo(dabc::format("Found SYNC marker %6x without epoch", data->getSyncData()));
00395                } else
00396                if ((fSyncScaleDown>0) && (data->getSyncData() % fSyncScaleDown != 0)) {
00397                   SetInfo(dabc::format("Roc%u SYNC marker %06x not in expected sync step %02x",
00398                         rec->rocid, data->getSyncData(), fSyncScaleDown));
00399                } else {
00400                   if (!rec->isprev) {
00401                      rec->prev_epoch = rec->curr_epoch;
00402                      rec->isprev = true;
00403                      rec->prev_nbuf = rec->curr_nbuf;
00404                      rec->prev_indx = rec->curr_indx;
00405                      rec->prev_evnt = data->getSyncData();
00406                      rec->prev_stamp = data->getSyncTs();
00407                      rec->data_length = 0;
00408                   } else {
00409                      rec->next_epoch = rec->curr_epoch;
00410                      rec->isnext = true;
00411                      rec->next_nbuf = rec->curr_nbuf;
00412                      rec->next_indx = rec->curr_indx;
00413                      rec->next_evnt = data->getSyncData();
00414                      rec->next_stamp = data->getSyncTs();
00415                   }
00416                }
00417             }
00418             break;
00419          }
00420 
00421          case roc::MSG_AUX: { // AUX message
00422 
00423             if ((fThrottleAux>=0) &&  (data->getAuxChNum()==(unsigned)fThrottleAux)) {
00424                uint64_t tm = data->getMsgFullTime(rec->curr_epoch);
00425                bool state = data->getAuxFalling() == 0;
00426 
00427                if ((rec->last_thottle_tm != 0) && (rec->last_throttle_state != state)) {
00428                   uint64_t dist = roc::Message::CalcDistance(rec->last_thottle_tm, tm);
00429 
00430                   if (rec->last_throttle_state)
00431                      Par(FORMAT(("Throttle%u", data->getRocNumber()))).SetDouble(dist * 1e-7); // maximum is 100 % can be
00432                }
00433 
00434                rec->last_thottle_tm = tm;
00435                rec->last_throttle_state = state;
00436             } else
00437             if ((fSpillRoc>=0) && ((unsigned)fSpillRoc==data->getRocNumber())
00438                  && ((unsigned)fSpillAux == data->getAuxChNum())) {
00439 
00440                uint64_t tm = data->getMsgFullTime(rec->curr_epoch);
00441 
00442                bool faraway = (roc::Message::CalcDistance(fLastSpillTime, tm) > 10000000);
00443                if (tm < fLastSpillTime) faraway = true;
00444 
00445                bool changed = false;
00446 
00447                if ((data->getAuxFalling()!=0) && fSpillState && faraway) {
00448                   DOUT1(("DETECT SPILL OFF"));
00449                   fSpillState = false;
00450                   fLastSpillTime = tm;
00451                   changed = true;
00452                } else
00453                if ((data->getAuxFalling()==0) && !fSpillState && faraway) {
00454                   DOUT1(("DETECT SPILL ON"));
00455                   fSpillState = true;
00456                   fLastSpillTime = tm;
00457                   changed = true;
00458                   Par("SpillRate").SetInt(1);
00459                }
00460 
00461                if (changed) Par("SpillState").SetInt(fSpillState ? 1 : 0);
00462 
00463                // DOUT0(("Period %f changed %d state %d", fCalibrationPeriod, changed, fSpillState));
00464 
00465                // if spill is off and calibration period is specified, try to start calibration
00466                if ((fCalibrationPeriod>0) && changed && !fSpillState) {
00467                   dabc::TimeStamp now = dabc::Now();
00468                   double dist = fCalibrationPeriod + 1000;
00469 
00470                   if (!fLastCalibrationTime.null())
00471                      dist = now - fLastCalibrationTime;
00472 
00473                   // DOUT0(("Distane %6.1f", dist));
00474 
00475                   if (dist > fCalibrationPeriod) {
00476                      fLastCalibrationTime = now;
00477                      DOUT0(("Invoke autocalibr mode after %5.1f s", dist));
00478 
00479                      dabc::mgr.GetApp().Submit(roc::CmdCalibration(true));
00480                      ShootTimer("CalibrTimer", fCalibrationLength);
00481                   }
00482                }
00483 
00484             }
00485             break;
00486          }
00487 
00488          case roc::MSG_EPOCH2:  { // EPOCH2 message
00489             uint32_t get4 = data->getEpoch2ChipNumber();
00490             uint32_t epoch2 = data->getEpoch2Number();
00491 
00492             if (get4<MaxGet4 && rec->canCheckAnyGet4) {
00493                if ((rec->lastEpoch2[get4]!=0) && (epoch2!=0) && (epoch2 > rec->lastEpoch2[get4])) {
00494                   uint32_t diff = epoch2 - rec->lastEpoch2[get4];
00495                   // try to exclude very far lost of epochs - most probably, transport failure
00496                   if ((diff!=1) && (diff<5) && rec->canCheckGet4[get4]) {
00497                      DOUT0(("Detect error epoch2 %u  shift = %u on ROC:%u Get4:%u", epoch2, diff, rec->rocid, get4));
00498                      fDetectGet4Problem = true;
00499                   }
00500                }
00501 
00502                rec->lastEpoch2[get4] = epoch2;
00503                if (data->getEpoch2Sync()) {
00504 
00505                   if (epoch2 > 0xff000000) {
00506                      DOUT0(("GET4 epoch2 %u number on ROC:%u Get4:%u too large", epoch2, rec->rocid, get4));
00507                      if (epoch2>0xfffff000) dabc::mgr.StopApplication();
00508                   }
00509 
00510                   if (rec->iscurrepoch && (rec->curr_epoch > 0xf0000000)) {
00511                      DOUT0(("250 MHz epoch = %u on ROC:%u with GET4 readout closer to overflow ", rec->curr_epoch, rec->rocid));
00512                      if (rec->curr_epoch > 0xffff0000) dabc::mgr.StopApplication();
00513                   }
00514 
00515                   if (!rec->canCheckGet4[get4]) {
00516                      DOUT0(("Enable checking of GET4:%u on ROC:%u", get4, rec->rocid));
00517                      rec->canCheckGet4[get4] = true;
00518                   }
00519 
00520                   unsigned mod = epoch2 % 25;
00521                   if (mod!=0) {
00522                      fDetectGet4Problem = true;
00523 
00524                      if (rec->lastEpoch2SyncErr[get4]!=mod)
00525                         DOUT0(("Detect wrong epoch2 : %u (mod=%u) value when sync=1 ROC:%u Get4:%u", epoch2, mod, rec->rocid, get4));
00526 
00527                      rec->lastEpoch2SyncErr[get4] = mod;
00528                   }
00529 
00530                   // check about every 0.625 s
00531                   if (epoch2 % 25000 == 0) {
00532                       // check if during such long period significant part of the distance between edges approx equal to epoch length
00533                      for (int nch=0;nch<MaxGet4Ch;nch++) {
00534                         if ((rec->get4AllCnt[get4][nch] > 10) &&
00535                             (rec->get4ErrCnt[get4][nch] > 0.2 * rec->get4AllCnt[get4][nch])) {
00536                                DOUT0(("Suspicious distance on ROC:%d Get4:%u Channel:%u fullcnt:%d errcnt:%d\n", rec->rocid, get4, nch, rec->get4AllCnt[get4][nch], rec->get4ErrCnt[get4][nch]));
00537                                fDetectGet4Problem = true;
00538                             }
00539                         rec->get4AllCnt[get4][nch]=0;
00540                         rec->get4ErrCnt[get4][nch]=0;
00541 
00542                         rec->get4EdgeErrs[get4][nch]=0;
00543                      }
00544                      rec->lastEpoch2SyncErr[get4]=0;
00545                   }
00546                }
00547             } else
00548             if (!rec->canCheckAnyGet4 && (dabc::Now() > fLastGet4ResetTm + 1.)) {
00549                DOUT0(("Enable GET4 checking again for ROC%u", rec->rocid));
00550                rec->canCheckAnyGet4 = true;
00551             }
00552 
00553             if (fDetectGet4Problem && (fGet4ResetLimit>0.)) {
00554                if (dabc::Now() > fLastGet4ResetTm + fGet4ResetLimit)
00555                   InvokeAllGet4Reset();
00556             }
00557 
00558             break;
00559          }
00560 
00561          case roc::MSG_GET4: {  // GET4 message
00562             unsigned g4id = data->getGet4Number();
00563 
00564             if (rec->canCheckGet4[g4id] && rec->canCheckAnyGet4) {
00565 
00566                unsigned g4ch = data->getGet4ChNum();
00567                unsigned g4fl = data->getGet4Edge();
00568 
00569                uint64_t fulltm = data->getMsgFullTime(rec->lastEpoch2[g4id]);
00570 
00571                bool change_edge = ((rec->get4EdgeCnt[g4id][g4ch]>0) ^ g4fl);
00572 
00573                if (change_edge) {
00574                   rec->get4AllCnt[g4id][g4ch]++;
00575                   uint64_t diff2 = (fulltm - rec->get4LastTm[g4id][g4ch]);
00576 
00577                   // check if difference between two edges close to epoch length - 26214.4 ns
00578                   for (unsigned k=1;k<4;k++)
00579                      if ((diff2 > (k*26214 - 500)) && (diff2 < (k*26214 + 500))) {
00580                         rec->get4ErrCnt[g4id][g4ch]++;
00581                         break;
00582                      }
00583 
00584                   // check if there are many same edges one after another
00585                   if (abs(rec->get4EdgeCnt[g4id][g4ch])>3) {
00586                      // if there are few errors, just inform about them, otherwise invoke failure
00587                      if (rec->get4EdgeErrs[g4id][g4ch]++<3)
00588                         DOUT0(("EDGE failure ROC:%u Get4:%u Channel:%u EdgeCnt:%d Last epoch2:%u", rec->rocid, g4id, g4ch, rec->get4EdgeCnt[g4id][g4ch], rec->lastEpoch2[g4id]));
00589                      else
00590                         fDetectGet4Problem = true;
00591                   }
00592 
00593                   // reset counter when changing sign
00594                   rec->get4EdgeCnt[g4id][g4ch] = 0;
00595                }
00596 
00597                rec->get4EdgeCnt[g4id][g4ch] += (g4fl ? +1 : -1);
00598 
00599                rec->get4LastTm[g4id][g4ch] = fulltm;
00600 
00601             }
00602 
00603             break;
00604          }
00605 
00606          case roc::MSG_SYS:  { // SYS message
00607             switch (data->getSysMesType()) {
00608                case roc::SYSMSG_SYNC_PARITY:
00609                   SetInfo(dabc::format("Roc%u Sync parity", rec->rocid));
00610                   break;
00611                case SYSMSG_USER:
00612                   SetInfo(dabc::format("Roc%u user sys message %u", rec->rocid, data->getSysMesData()));
00613                   if (data->getSysMesData()==roc::SYSMSG_USER_RECONFIGURE) {
00614                      // SetInfo("One could start checking of GET4 messages again");
00615                      DOUT2(("One could start checking of GET4 messages again"));
00616                      rec->canCheckAnyGet4 = true;
00617                   }
00618                   break;
00619                default:
00620                   SetInfo(dabc::format("Roc%u SysMsg type = %u", rec->rocid, data->getSysMesType()));
00621             }
00622             break;
00623          }
00624 
00625          } // end of the switch
00626 
00627          rec->curr_indx += iter.getMsgSize();
00628 
00629          if (rec->isprev) rec->data_length += iter.getMsgSize();
00630 
00631          if (rec->isprev && rec->isnext) {
00632              DOUT5(("ROCID:%u Find sync events %u - %u fmt:%u", rec->rocid, rec->prev_evnt, rec->next_evnt, rec->format));
00633 
00634 //            DOUT1(("Tm: %7.5f Rocid:%u Find sync events %u - %u between %u:%u and %u:%u",
00635 //                    TimeStamp()*1e-6, rec->rocid, rec->prev_evnt, rec->next_evnt,
00636 //                    rec->prev_nbuf, rec->prev_indx, rec->next_nbuf, rec->next_indx));
00637 
00638             rec->isready = true;
00639             return true;
00640          }
00641       }
00642    }
00643 
00644    return false;
00645 }
00646 
00647 
00648 void roc::CombinerModule::InvokeAllGet4Reset()
00649 {
00650    for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00651       fInp[ninp].canCheckAnyGet4 = false;
00652       for (int n=0;n<MaxGet4;n++) {
00653          fInp[ninp].canCheckGet4[n] = false;
00654          fInp[ninp].lastEpoch2SyncErr[n] = 0;
00655          fInp[ninp].lastEpoch2[n] = 0;
00656          for(int ch=0;ch<MaxGet4Ch;ch++) {
00657             fInp[ninp].get4EdgeCnt[n][ch] = 0;
00658             fInp[ninp].get4EdgeErrs[n][ch] = 0;
00659             fInp[ninp].get4AllCnt[n][ch] = 0;
00660             fInp[ninp].get4ErrCnt[n][ch] = 0;
00661             fInp[ninp].get4LastTm[n][ch] = 0;
00662          }
00663       }
00664    }
00665 
00666    fLastGet4ResetTm = dabc::Now();
00667    fDetectGet4Problem = false;
00668    DOUT0(("Submit GET4 reset command"));
00669    dabc::mgr.GetApp().Submit(dabc::Command("ResetAllGet4"));
00670 }
00671 
00672 
00673 void roc::CombinerModule::ProcessOutputEvent(dabc::Port* inport)
00674 {
00675    if (fSimpleMode) {
00676       FillSimpleBuffer();
00677       return;
00678    }
00679 
00680    FillBuffer();
00681 }
00682 
00683 
00684 void roc::CombinerModule::FillSimpleBuffer()
00685 {
00686 
00687    dabc::Buffer inpbuf;
00688    unsigned ninp = 0;
00689    dabc::Pointer inpptr;
00690 
00691    while (1) {
00692 
00693       for (unsigned n=0; (n < NumInputs()) && inpbuf.null(); n++)
00694          if (Input(n)->CanRecv()) {
00695             ninp = n;
00696             inpbuf = Input(ninp)->Recv();
00697             inpptr = inpbuf.GetPointer();
00698             break;
00699          }
00700 
00701       if (inpbuf.null()) return;
00702 
00703       if (!CanSendToAllOutputs()) return;
00704 
00705       unsigned rawdata_sz = inpptr.fullsize();
00706 
00707       unsigned needsize = rawdata_sz + sizeof(mbs::EventHeader) + sizeof(mbs::SubeventHeader);
00708       if (needsize < fBufferSize) needsize = fBufferSize;
00709 
00710       dabc::Buffer outbuf = fOutPool->TakeBuffer(needsize);
00711       if (outbuf.null()) {
00712          EOUT(("Cannot get buffer %u from pool", needsize));
00713          return;
00714       }
00715       // outbuf.SetTotalSize(fBufferSize);
00716 
00717       dabc::Pointer outptr = outbuf.GetPointer();
00718 
00719       mbs::EventHeader* evhdr = (mbs::EventHeader*) outptr();
00720 
00721       static int32_t evnt_num(1);
00722 
00723       evhdr->Init(evnt_num++);
00724       outbuf.Shift(outptr, sizeof(mbs::EventHeader));
00725 
00726       InputRec* rec = &(fInp[ninp]);
00727 
00728       if (!rec->isrocid()) {
00729          rec->isudp = (Input(ninp)->GetTransportParameter(roc::xmlTransportKind) == kind_UDP);
00730          rec->rocid = Input(ninp)->GetTransportParameter(roc::xmlRocNumber);
00731          rec->format = Input(ninp)->GetTransportParameter(roc::xmlMsgFormat);
00732 
00733          DOUT0(("Detect on input %u ROC:%d Kind:%s", ninp, rec->rocid, (rec->isudp ? "UDP" : "Optic")));
00734       }
00735 
00736       mbs::SubeventHeader* subhdr = (mbs::SubeventHeader*) outptr();
00737       subhdr->Init();
00738       subhdr->iProcId = roc::proc_RawData;
00739       subhdr->iSubcrate = rec->rocid;
00740       subhdr->iControl = rec->format;
00741 
00742       outbuf.Shift(outptr, sizeof(mbs::SubeventHeader));
00743 
00744       unsigned msg_size = roc::Message::RawSize(rec->format);
00745 
00746       // take as much as possible
00747       if (outptr.fullsize()<rawdata_sz) {
00748          rawdata_sz = outptr.fullsize() / msg_size * msg_size;
00749          DOUT1(("Too many raw data %u, take only %u", outptr.fullsize(), rawdata_sz));
00750       }
00751 
00752       outptr.copyfrom(inpptr, rawdata_sz);
00753       outbuf.Shift(outptr, rawdata_sz);
00754 
00755       if (rawdata_sz == inpptr.fullsize()) {
00756          inpptr.reset();
00757          inpbuf.Release();
00758       } else {
00759          inpbuf.Shift(inpptr, rawdata_sz);
00760       }
00761 
00762       subhdr->SetRawDataSize(rawdata_sz);
00763 
00764       evhdr->SetSubEventsSize(sizeof(mbs::SubeventHeader) + rawdata_sz);
00765 
00766       dabc::BufferSize_t usedsize = outbuf.GetTotalSize() - outptr.fullsize();
00767       if (usedsize==0) {
00768          outbuf.Release();
00769          return;
00770       }
00771 
00772       outbuf.SetTotalSize(usedsize);
00773 
00774       outbuf.SetTypeId(mbs::mbt_MbsEvents);
00775 
00776       SendToAllOutputs(outbuf);
00777    }
00778 }
00779 
00780 
00781 
00782 bool roc::CombinerModule::SkipEvent(unsigned recid)
00783 {
00784    if (recid>=fInp.size()) return false;
00785 
00786    InputRec* rec = &(fInp[recid]);
00787 
00788    if ((rec==0) || !rec->isprev || !rec->isnext) return false;
00789 
00790    rec->isready     = false;
00791    rec->isprev      = true;
00792    rec->prev_epoch  = rec->next_epoch;
00793    rec->prev_nbuf   = rec->next_nbuf;
00794    rec->prev_indx   = rec->next_indx;
00795    rec->prev_evnt   = rec->next_evnt;
00796    rec->prev_stamp  = rec->next_stamp;
00797    rec->data_length = 0;
00798 
00799    rec->isnext     = false;
00800 
00801    rec->nummbssync = 0;
00802    rec->firstmbssync = 0;
00803 
00804    unsigned can_skip = rec->can_skip_buf();
00805 
00806    if (can_skip>0) {
00807 //      DOUT0(("On input %u Skip buffers %u", recid, master_skip));
00808 
00809       Input(recid)->SkipInputBuffers(can_skip);
00810 
00811       rec->did_skip_buf(can_skip);
00812    }
00813 
00814 //   DOUT4(("Skip event done, search next from %u:%u",
00815 //            rec->curr_nbuf, rec->curr_indx));
00816 
00817    return FindNextEvent(recid);
00818 }
00819 
/** Forward distance between two 24-bit event numbers.
 *
 * Returns how many steps a 24-bit counter needs to advance from \a prev_ev
 * to \a next_ev, taking the wrap at 0x1000000 into account. The subtraction
 * is done modulo 2^24, so the result always lies in [0, 0xffffff] even when
 * an argument carries bits above bit 23.
 *
 * \param prev_ev  earlier event number
 * \param next_ev  later event number
 * \return (next_ev - prev_ev) modulo 0x1000000
 */
uint32_t CalcEventDistanceNew(uint32_t prev_ev, uint32_t next_ev)
{
   // unsigned subtraction wraps modulo 2^32; masking reduces it to modulo 2^24
   return (next_ev - prev_ev) & 0xffffff;
}
00824 
/** Shortest distance between two 24-bit event numbers, in either direction.
 *
 * The difference is taken modulo 2^24 and then folded, so the result always
 * lies in [0, 0x800000] and does not depend on the argument order. Bits above
 * bit 23 in the arguments are ignored.
 *
 * \param ev1  first event number
 * \param ev2  second event number
 * \return minimum of the forward and backward distance on the 24-bit ring
 */
uint32_t CalcAbsEventDistanceNew(uint32_t ev1, uint32_t ev2)
{
   // distance in one direction, reduced to the 24-bit range
   uint32_t diff = (ev1 - ev2) & 0xffffff;

   // more than half of the ring - the opposite direction is shorter
   if (diff > 0x800000) diff = 0x1000000 - diff;

   return diff;
}
00831 
00832 
00833 void roc::CombinerModule::FillBuffer()
00834 {
00835    // method fill output buffer with complete sync event from all available sources
00836 
00837    bool doagain = true;
00838    uint32_t evnt_cut = 0;
00839 
00840    while (doagain) {
00841 
00842       doagain = false;
00843 
00844       bool is_data_selected(false);
00845 
00846       // first check that all inputs has at least first event
00847       for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00848          InputRec* inp = &(fInp[ninp]);
00849          if (!inp->isprev) return;
00850          inp->use = false;
00851          inp->data_err = false;
00852       }
00853 
00854       // than check that if two event are defined, not too big difference between them
00855       for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00856          InputRec* inp = &(fInp[ninp]);
00857 
00858          if (!inp->isready) continue;
00859 
00860          uint32_t diff = CalcEventDistanceNew(inp->prev_evnt, inp->next_evnt);
00861          if (fSyncScaleDown>0) {
00862             if (diff != (unsigned) fSyncScaleDown) {
00863                DOUT0(("ROC%u: Events shift %06x -> %06x = %02x, expected %02x, ignore", inp->rocid, inp->prev_evnt, inp->next_evnt, diff, fSyncScaleDown));
00864                inp->data_err = true;
00865             }
00866          } else {
00867             if (diff > 0x100000) {
00868                inp->data_err = true;
00869                EOUT(("Too large event shift %06x -> %06x = %06x on ROC%u", inp->prev_evnt, inp->next_evnt, diff, inp->rocid));
00870             }
00871          }
00872 
00873 /*         if ((!inp->data_err) && (ninp==0)) {
00874             if  (inp->prev_evnt != (inp->prev_epoch & 0xffffff)) {
00875                 EOUT(("Mismatch in epoch %8x and event number %6x for ROC%u", inp->prev_epoch, inp->prev_evnt, inp->rocid));
00876                 inp->data_err = true;
00877             }
00878 
00879             if  (inp->next_evnt != (inp->next_epoch & 0xffffff)) {
00880                 EOUT(("Mismatch in epoch %8x and event number %6x for ROC%u", inp->next_epoch, inp->next_evnt, inp->rocid));
00881                 inp->data_err = true;
00882             }
00883          }
00884 */
00885          if (inp->data_err) {
00886             inp->use = true;
00887             is_data_selected = true;
00888             break;
00889          }
00890       }
00891 
00892       uint32_t min_evnt(0xffffff), max_evnt(0); // 24 bits
00893 
00894       if (!is_data_selected) {
00895 
00896          // try select data on the basis of the event number
00897 
00898          for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00899             // one should have at least epoch mark define
00900             InputRec* inp = &(fInp[ninp]);
00901 
00902             if (!inp->isprev) return;
00903 
00904             if (inp->prev_evnt < evnt_cut) continue;
00905 
00906             if (inp->prev_evnt < min_evnt) min_evnt = inp->prev_evnt;
00907             if (inp->prev_evnt > max_evnt) max_evnt = inp->prev_evnt;
00908          }
00909 
00910          if (min_evnt > max_evnt) {
00911             EOUT(("Nothing found!!!"));
00912             return;
00913          }
00914 
00915          // try to detect case of upper border crossing
00916          if (min_evnt != max_evnt)
00917             if ((min_evnt < 0x10000) && (max_evnt > 0xff0000L)) {
00918                evnt_cut = 0x800000;
00919                doagain = true;
00920                continue;
00921             }
00922 
00923          evnt_cut = 0;
00924 
00925          for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00926 
00927             InputRec* inp = &(fInp[ninp]);
00928 
00929             inp->use = (inp->prev_evnt == min_evnt);
00930             inp->data_err = false;
00931 
00932             if (!inp->use) continue;
00933 
00934             // one should have next event completed to use it for combining
00935             if (!inp->isready) return;
00936 
00937             is_data_selected = true;
00938 
00939             if (ninp==0) continue;
00940 
00941             if (!fInp[0].use) // if first roc is not used, anyway error data
00942                inp->data_err = true;
00943             else
00944             if (inp->next_evnt != fInp[0].next_evnt) {
00945                inp->data_err = true;
00946                uint32_t diff = CalcAbsEventDistanceNew(inp->next_evnt , fInp[0].next_evnt);
00947 
00948                if (diff > 0x1000)
00949                   EOUT(("Next event mismatch between ROC%u:%06x and ROC%u:%06x  diff = %06x",
00950                          fInp[0].rocid, fInp[0].next_evnt, inp->rocid, inp->next_evnt, diff));
00951             }
00952          }
00953 
00954       }
00955 
00956       if (!is_data_selected) {
00957          EOUT(("Data not selected here - why"));
00958          return;
00959       }
00960 
00961       dabc::BufferSize_t totalsize = sizeof(mbs::EventHeader);
00962       unsigned numused(0);
00963       bool skip_evnt(false);
00964 
00965 //      DOUT3(("Try to prepare for event %u", min_evnt));
00966 
00967       for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00968 
00969          InputRec* inp = &(fInp[ninp]);
00970 
00971          if (!inp->use) continue;
00972 
00973          if (inp->data_err && fSkipErrorData) skip_evnt = true;
00974 
00975          totalsize += sizeof(mbs::SubeventHeader);
00976 
00977          // one need always epoch for previous event
00978          totalsize += 2*roc::Message::RawSize(inp->format);
00979 
00980          totalsize += inp->data_length;
00981 
00982          numused++;
00983       }
00984 
00985       unsigned extra_size(0);
00986 
00987       unsigned grand_totalsize = totalsize + extra_size;
00988 
00989       // check that event id corresponds with sync scale down factor
00990       if (fBnetMode && (min_evnt<=max_evnt) && (fSyncScaleDown > 0) && (min_evnt % fSyncScaleDown != 0)) {
00991          EOUT(("Event number %u not in agreement with scale down factor %u - discard event", min_evnt, fSyncScaleDown));
00992          skip_evnt = true;
00993       }
00994 
00995       // if not all inputs are selected in bnet mode, skip data while we need complete events
00996       if (fBnetMode && (numused<fInp.size())) {
00997          EOUT(("Skip while too few events"));
00998          skip_evnt = true;
00999       }
01000 
01001       // if we producing normal event, try to add extra messages if we have them
01002       if (!skip_evnt && (numused==fInp.size()) && fExtraMessages.size()>0)
01003          extra_size = fExtraMessages.front()->size() * roc::Message::RawSize(roc::formatNormal);
01004 
01005       // check if we should send previously filled data - reason that new event does not pass into rest of buffer
01006       if (fBnetMode || (!skip_evnt && (grand_totalsize > f_outptr.fullsize())))
01007          if (!FlushOutputBuffer()) return;
01008 
01009       // take new buffer if required
01010       if (!skip_evnt && (fOutBuf.null() || (grand_totalsize > f_outptr.fullsize()))) {
01011          if (fOutBuf.null())
01012             fOutBuf = fOutPool->TakeBuffer(fBufferSize);
01013          if (fOutBuf.null()) {
01014             EOUT(("Cannot get buffer from pool"));
01015             return;
01016          }
01017 
01018          // if we cannot put extra messages, try to skip them at this event
01019          if (fOutBuf.GetTotalSize() < grand_totalsize)
01020             extra_size = 0;
01021 
01022          // if anyway size too big, skip event totally
01023          if (fOutBuf.GetTotalSize() < totalsize) {
01024             SetInfo(dabc::format("Cannot put event (sz:%u) in buffer (sz:%u) - skip event", totalsize, fOutBuf.GetTotalSize()));
01025             skip_evnt = true;
01026          }
01027          f_outptr = fOutBuf.GetPointer();
01028       }
01029 
01030       if (!skip_evnt) {
01031          dabc::Pointer old(f_outptr);
01032 
01033 //         DOUT5(("Start MBS event at pos %u", fOutBuf.Distance(fOutBuf.GetPointer(), f_outptr)));
01034 
01035 //         DOUT0(("Building event %u", min_evnt));
01036 
01037          mbs::EventHeader* evhdr = (mbs::EventHeader*) f_outptr();
01038          evhdr->Init(min_evnt);
01039          fOutBuf.Shift(f_outptr, sizeof(mbs::EventHeader));
01040 
01041 //         DOUT1(("Fill event %7u  ~size %6u inp0 %ld outsize:%u",
01042 //            min_evnt, totalsize, Input(0)->InputPending(), f_outptr.fullsize()));
01043 
01044 //         DOUT1(("Distance to data %u", fOutBuf.Distance(fOutBuf.GetPointer(), f_dataptr)));
01045 
01046          MessagesVector* extra = 0;
01047 
01048          if (extra_size>0) {
01049             extra = fExtraMessages.front();
01050             fExtraMessages.pop_front();
01051          }
01052 
01053          unsigned filled_sz = FillRawSubeventsBuffer(fOutBuf, f_outptr, extra);
01054 
01055          SetInfo(dabc::format("Fill event %7u sz %u", min_evnt, filled_sz));
01056 
01057          //DOUT1(("Build event %u", filled_sz));
01058 
01059          if (filled_sz == 0) {
01060              EOUT(("Event data not filled - event skipped!!!"));
01061              f_outptr = old;
01062          } else {
01063 
01064 //            DOUT0(("Calculated size = %u  filled size = %u", totalsize, filled_sz + sizeof(mbs::EventHeader)));
01065 
01066             evhdr->SetSubEventsSize(filled_sz);
01067 
01068             DOUT5(("Produce event %u of size %u", (unsigned) evhdr->EventNumber()  ,(unsigned) evhdr->FullSize()));
01069 
01070             if (fBnetMode) {
01071                // FIXME: implement bnet
01072                EOUT(("BNet mode not yet implemented !!!"));
01073 
01074                throw dabc::Exception("BNET mode not yet implemented");
01075 
01076 //               fOutBuf->SetHeaderSize(sizeof(bnet::EventId));
01077 //               uint32_t ev = min_evnt;
01078 //               if (fSyncScaleDown>0) ev = ev / fSyncScaleDown;
01079 //               *((bnet::EventId*) fOutBuf->GetHeader()) =  ev;
01080 //               // DOUT0(("Produce event %u", ev));
01081                FlushOutputBuffer();
01082             }
01083          }
01084       } else {
01085          SetInfo(dabc::format("Skip event %7u", min_evnt));
01086       }
01087 
01088 
01089       doagain = true;
01090       for (unsigned ninp = 0; ninp<fInp.size(); ninp++)
01091          if (fInp[ninp].use)
01092             if (!SkipEvent(ninp)) doagain = false;
01093    }
01094 }
01095 
01099 bool roc::CombinerModule::FlushOutputBuffer()
01100 {
01101    // if no buffer, nothing to flush - exit normally
01102    if (fOutBuf.null()) return true;
01103 
01104    dabc::BufferSize_t usedsize = fOutBuf.GetTotalSize() - f_outptr.fullsize();
01105    if (usedsize==0) return true;
01106 
01107    // all outputs must be able to get buffer for sending
01108    if (!CanSendToAllOutputs()) {
01109 //      for(unsigned n=0;n<NumOutputs();n++)
01110 //         if (!Output(n)->CanSend())
01111 //            DOUT0(("Cannot send buffer to output %d from %d conn:%s  pend:%d capacity:%d ",
01112 //                  n, NumOutputs(), DBOOL(Output(n)->IsConnected()), Output(n)->OutputPending(), Output(n)->OutputQueueCapacity()));
01113 //      DOUT0(("Cannot send buffer to all output!!!"));
01114       return false;
01115    }
01116 
01117 //   DOUT1(("Filled size = %u", usedsize));
01118 
01119    f_outptr.reset();
01120 
01121    fOutBuf.SetTotalSize(usedsize);
01122 
01123    fOutBuf.SetTypeId(mbs::mbt_MbsEvents);
01124 
01125 /*
01126    DOUT0(("Flush output buffer %u of size %u", fOutBuf.SegmentId(), (unsigned) fOutBuf.GetTotalSize()));
01127    unsigned numevents=0;
01128    mbs::ReadIterator iter(fOutBuf);
01129    while (iter.NextEvent()) {
01130       DOUT0(("   Flush event %u size %u", iter.evnt()->EventNumber(), iter.evnt()->FullSize()));
01131       numevents++;
01132    }
01133    DOUT0(("Flush output buffer numevents %u", numevents));
01134 */
01135 
01136    SendToAllOutputs(fOutBuf.HandOver());
01137 
01138    fFlushFlag = false;
01139 
01140    return true;
01141 }
01142 
01143 unsigned roc::CombinerModule::FillRawSubeventsBuffer(dabc::Buffer& outbuf, dabc::Pointer& outptr, roc::MessagesVector* extra)
01144 {
01145    unsigned filled_size = 0;
01146 
01147    bool iserr = false;
01148 
01149    for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
01150       InputRec* rec = &(fInp[ninp]);
01151       if (!rec->use) continue;
01152 
01153       mbs::SubeventHeader* subhdr = (mbs::SubeventHeader*) outptr();
01154       subhdr->Init();
01155       subhdr->iProcId = rec->data_err ? roc::proc_ErrEvent : roc::proc_RocEvent;
01156       subhdr->iSubcrate = rec->rocid;
01157       subhdr->iControl = rec->format; // roc::formatEth1 == 0, compatible with older lmd files
01158 
01159       if (rec->data_err) iserr = true;
01160 
01161       outbuf.Shift(outptr, sizeof(mbs::SubeventHeader));
01162       filled_size+=sizeof(mbs::SubeventHeader);
01163 
01164       unsigned msg_size = roc::Message::RawSize(rec->format);
01165 
01166       unsigned subeventsize = msg_size;
01167 
01168       roc::Message msg;
01169       msg.setMessageType(roc::MSG_EPOCH);
01170       msg.setRocNumber(rec->rocid);
01171       msg.setEpochNumber(rec->prev_epoch);
01172       msg.setEpochMissed(0);
01173       msg.copyto(outptr(), rec->format);
01174 
01175       outbuf.Shift(outptr, msg_size);
01176 
01177       unsigned nbuf = rec->prev_nbuf;
01178 
01179       bool firstmsg = true;
01180 
01181       while (nbuf<=rec->next_nbuf) {
01182          dabc::Buffer& inbuf = Input(ninp)->InputBuffer(nbuf);
01183          if (inbuf.null()) {
01184             EOUT(("Internal error"));
01185             return 0;
01186          }
01187 
01188          // if next epoch in the same buffer, limit its size by next_indx + 6 (with next event)
01189          dabc::Pointer ptr = inbuf.GetPointer();
01190          if (nbuf == rec->next_nbuf) ptr.setfullsize(rec->next_indx + msg_size);
01191          if (nbuf == rec->prev_nbuf) inbuf.Shift(ptr, rec->prev_indx);
01192 
01193          // DOUT0(("Copy to output %u has %u", ptr.fullsize(), outptr.fullsize()));
01194          if (firstmsg && extra) {
01195             // if we have extra messages, put them right after first sync message
01196             outptr.copyfrom(ptr, msg_size);
01197             outbuf.Shift(outptr, msg_size);
01198             subeventsize += msg_size;
01199             inbuf.Shift(ptr, msg_size);
01200             AddExtraMessagesToSubevent(outbuf, extra, outptr, subeventsize, rec);
01201          }
01202 
01203          // copy rest of the data
01204          outptr.copyfrom(ptr, ptr.fullsize());
01205          outbuf.Shift(outptr, ptr.fullsize());
01206          subeventsize += ptr.fullsize();
01207          firstmsg = false;
01208 
01209          nbuf++;
01210       }
01211 
01212       filled_size += subeventsize;
01213       subhdr->SetRawDataSize(subeventsize);
01214    }
01215 
01216    Par("RocEvents").SetInt(1);
01217 
01218    if (iserr) Par("RocErrors").SetInt(1);
01219 
01220    if (extra!=0) delete extra;
01221 
01222    return filled_size;
01223 }
01224 
01225 void roc::CombinerModule::AddExtraMessagesToSubevent(dabc::Buffer& buf, roc::MessagesVector* extra, dabc::Pointer& outptr, unsigned& subeventsize, InputRec* rec)
01226 {
01227    unsigned msg_size = roc::Message::RawSize(rec->format);
01228 
01229 //   int cnt = 0;
01230 
01231    for (unsigned n=0;n<extra->size();n++)
01232       if (extra->at(n).getRocNumber() == rec->rocid) {
01233          extra->at(n).copyto(outptr(), rec->format);
01234          buf.Shift(outptr, msg_size);
01235          subeventsize += msg_size;
01236 //         cnt++;
01237       }
01238 
01239 //   if (cnt>0) DOUT0(("Add %d extra messages for ROC%u", cnt, rec->rocid));
01240 }
01241 
01242 void roc::CombinerModule::DumpData(dabc::Buffer& buf)
01243 {
01244    roc::Iterator iter;
01245    if (!InitIterator(iter, buf)) return;
01246 
01247    while (iter.next())
01248       iter.msg().printData(3);
01249 }
01250 
01251 int roc::CombinerModule::ExecuteCommand(dabc::Command cmd)
01252 {
01253    if (cmd.IsName("ConfigureInput")) {
01254       int recid = cmd.GetInt("Input", 0);
01255 
01256       if ((recid<0) || (recid>=(int)fInp.size())) {
01257          EOUT(("Something wrong with input configurations"));
01258          return dabc::cmd_false;
01259       }
01260       InputRec* rec = &(fInp[recid]);
01261 
01262       if (!rec->isrocid()) {
01263          rec->isudp = cmd.GetBool("IsUdp", true);
01264          rec->rocid = cmd.GetInt("ROCID", 0);
01265          rec->format = cmd.GetInt("Format", 0);
01266 
01267          SetInfo(dabc::format("Configure input %u with ROC:%d Kind:%s", recid, rec->rocid, (rec->isudp ? "UDP" : "Optic")), true);
01268       }
01269 
01270       return dabc::cmd_true;
01271 
01272    } else
01273 
01274    if (cmd.IsName(CmdMessagesVector::CmdName())) {
01275       MessagesVector* vect = (MessagesVector*) cmd.GetPtr(CmdMessagesVector::Vector());
01276 
01277       if (vect!=0)
01278          fExtraMessages.push_back(vect);
01279       else
01280          EOUT(("Zero vector with extra messages"));
01281 
01282       if (fExtraMessages.size()>10) {
01283          EOUT(("Too many extra messages, remove part of them"));
01284          delete fExtraMessages.front();
01285          fExtraMessages.pop_front();
01286       }
01287 
01288       return dabc::cmd_true;
01289    }
01290 
01291    return dabc::ModuleAsync::ExecuteCommand(cmd);
01292 }
01293 
01294 

Generated on Tue Dec 10 2013 04:52:23 for ROCsoft by  doxygen 1.7.1