00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "roc/CombinerModule.h"
00016
00017 #include "roc/Board.h"
00018 #include "roc/Iterator.h"
00019 #include "roc/Commands.h"
00020
00021 #include "dabc/logging.h"
00022 #include "dabc/Pointer.h"
00023 #include "dabc/PoolHandle.h"
00024 #include "dabc/MemoryPool.h"
00025 #include "dabc/Command.h"
00026 #include "dabc/Port.h"
00027 #include "dabc/Timer.h"
00028 #include "dabc/Pointer.h"
00029 #include "dabc/Manager.h"
00030 #include "dabc/Application.h"
00031
00032 #include "mbs/LmdTypeDefs.h"
00033 #include "mbs/MbsTypeDefs.h"
00034
00035
00036
00037 const char* roc::xmlBnetMode = "BnetMode";
00038 const char* roc::xmlSkipErrorData = "SkipErrorData";
00039 const char* roc::xmlIgnoreMissingEpoch= "IgnoreMissingEpoch";
00040 const char* roc::xmlSyncNumber = "SyncNumber";
00041 const char* roc::xmlSyncScaleDown = "SyncScaleDown";
00042 const char* roc::xmlSpillRoc = "SpillRoc";
00043 const char* roc::xmlSpillAux = "SpillAux";
00044 const char* roc::xmlCalibrationPeriod = "CalibrationPeriod";
00045 const char* roc::xmlCalibrationLength = "CalibrationLength";
00046 const char* roc::xmlThrottleAux = "ThrottleAux";
00047 const char* roc::xmlGet4ResetPeriod = "Get4ResetPeriod";
00048 const char* roc::xmlGet4ResetLimit = "Get4ResetLimit";
00049
00050
00051 bool InitIterator(roc::Iterator& iter, dabc::Buffer& buf, unsigned shift = 0, bool isudp = false, unsigned rocid = 0)
00052 {
00053 if (buf.null()) return false;
00054
00055 if ((buf.GetTypeId() < roc::rbt_RawRocData) ||
00056 (buf.GetTypeId() > roc::rbt_RawRocData + roc::formatNormal)) return false;
00057
00058 int fmt = buf.GetTypeId() - roc::rbt_RawRocData;
00059
00060 iter.setFormat(fmt);
00061 if (isudp) iter.setRocNumber(rocid);
00062
00063 void* ptr = buf.SegmentPtr();
00064 unsigned size = buf.SegmentSize();
00065
00066 if (shift>=size) return false;
00067
00068 iter.assign((uint8_t*) ptr + shift, size - shift);
00069
00070 return true;
00071 }
00072
00073
00074 roc::CombinerModule::CombinerModule(const char* name, dabc::Command cmd) :
00075 dabc::ModuleAsync(name, cmd),
00076 fInpPool(0),
00077 fOutPool(0),
00078 fBufferSize(0),
00079 fSimpleMode(false),
00080 fBnetMode(false),
00081 fOutBuf(),
00082 f_outptr(),
00083 fSpillRoc(-1),
00084 fSpillAux(-1),
00085 fSpillState(true),
00086 fCalibrationPeriod(-1.),
00087 fCalibrationLength(1.),
00088 fLastCalibrationTime(),
00089 fExtraMessages()
00090 {
00091
00092
00093 fBnetMode = Cfg(roc::xmlBnetMode, cmd).AsBool(false);
00094
00095 fSkipErrorData = Cfg(roc::xmlSkipErrorData, cmd).AsBool(false);
00096
00097 fIgnoreMissingEpoch = Cfg(roc::xmlIgnoreMissingEpoch, cmd).AsBool(false);
00098 fSyncScaleDown = Cfg(roc::xmlSyncScaleDown, cmd).AsInt(-1);
00099 fSyncNumber = Cfg(roc::xmlSyncNumber, cmd).AsInt(0);
00100 if ((fSyncNumber<0) || (fSyncNumber>1)) fSyncNumber = 0;
00101
00102 if ((fSyncScaleDown>=0) && (fSyncScaleDown<32))
00103 fSyncScaleDown = 1 << fSyncScaleDown;
00104 else
00105 if (fSyncScaleDown == 77) {
00106 DOUT0(("Special mode for FEET readout - take data as is, pack into MBS buffer and store"));
00107 fSimpleMode = true;
00108 fSyncScaleDown = -1;
00109 } else {
00110 EOUT(("Sync scale down factor not specified. Use 1 as default"));
00111 fSyncScaleDown = -1;
00112 }
00113
00114 int numinputs = Cfg(dabc::xmlNumInputs,cmd).AsInt(1);
00115 fBufferSize = Cfg(dabc::xmlBufferSize, cmd).AsInt(16384);
00116 int numoutputs = Cfg(dabc::xmlNumOutputs, cmd).AsInt(2);
00117 fSpillRoc = Cfg(roc::xmlSpillRoc, cmd).AsInt(-1);
00118 fSpillAux = Cfg(roc::xmlSpillAux, cmd).AsInt(-1);
00119 fCalibrationPeriod = Cfg(roc::xmlCalibrationPeriod, cmd).AsDouble(-1.);
00120 fCalibrationLength = Cfg(roc::xmlCalibrationLength, cmd).AsDouble(0.5);
00121
00122 fGet4ResetPeriod = Cfg(roc::xmlGet4ResetPeriod, cmd).AsDouble(-1.);
00123
00124
00125 fGet4ResetLimit = Cfg(roc::xmlGet4ResetLimit, cmd).AsDouble(-1);
00126 fLastGet4ResetTm = dabc::Now();
00127 fDetectGet4Problem = false;
00128
00129 if (fGet4ResetLimit>0)
00130 DOUT0(("Enable GET4 reset at maximum every %5.1f s", fGet4ResetLimit));
00131
00132 double flushTime = Cfg(dabc::xmlFlushTimeout, cmd).AsDouble(-1.);
00133
00134 if (flushTime>0.)
00135 CreateTimer("FlushTimer", flushTime, false);
00136 fFlushFlag = false;
00137
00138 DOUT1(("CombinerModule name:%s numinp = %d SyncScaleDown = %d BnetMode = %s flushtime = %5.1f", GetName(), numinputs, fSyncScaleDown, DBOOL(fBnetMode), flushTime));
00139
00140 fLastCalibrationTime.Reset();
00141
00142 fLastSpillTime = 0;
00143
00144 CreatePar("SpillState").SetInt(-1);
00145
00146 if (fSpillAux>=0) {
00147 if (fCalibrationPeriod < 5.) fCalibrationPeriod = 5.;
00148 if (fCalibrationLength < 0.1) fCalibrationLength = 0.1;
00149
00150 DOUT1(("CombinerModule spill detection ROC:%d aux:%d period:%4.1f length:%3.1f", fSpillRoc, fSpillAux, fCalibrationPeriod, fCalibrationLength));
00151
00152 CreatePar("SpillRate").SetRatemeter(false, 10).SetUnits("spill");
00153
00154
00155 CreateTimer("CalibrTimer", -1., false);
00156 }
00157
00158 if (fGet4ResetPeriod>0) {
00159 CreateTimer("Get4Timer", fGet4ResetPeriod, false);
00160 }
00161
00162 CreatePar("RocData").SetRatemeter(false, 3.);
00163
00164 CreatePar("RocEvents").SetRatemeter(false, 3.).SetUnits("ev");
00165
00166 CreatePar("RocErrors").SetRatemeter(false, 3.).SetUnits("ev");
00167
00168 if (Par("RocData").GetDebugLevel()<0) Par("RocData").SetDebugLevel(1);
00169 if (Par("RocEvents").GetDebugLevel()<0) Par("RocEvents").SetDebugLevel(1);
00170
00171
00172 fThrottleAux = Cfg(roc::xmlThrottleAux, cmd).AsInt(-1);
00173
00174 std::string pname = Cfg(dabc::xmlInputPoolName, cmd).AsStdStr(roc::xmlRocPool);
00175
00176 fInpPool = CreatePoolHandle(pname.c_str());
00177
00178 for(unsigned num=0; num < (unsigned) numinputs; num++) {
00179 CreateInput(FORMAT(("Input%u", num)), fInpPool, Cfg(dabc::xmlInputQueueSize, cmd).AsInt(10));
00180 Input(num)->SetInpRateMeter(Par("RocData"));
00181
00182 DOUT2(("Create input%u queue size = %u capacity %u", num, Input(num)->InputQueueSize(), Input(num)->InputQueueCapacity()));
00183
00184 fInp.push_back(InputRec());
00185 fInp[num].use = false;
00186 if (fThrottleAux>=0)
00187 CreatePar(FORMAT(("Throttle%u", num))).SetRatemeter(false, 3.).SetLimits(0., 100.);
00188 }
00189
00190 fOutPool = CreatePoolHandle(Cfg(dabc::xmlOutputPoolName, cmd).AsStdStr(roc::xmlRocPool).c_str());
00191
00192 for(int n=0; n<numoutputs; n++)
00193 CreateOutput(FORMAT(("Output%d", n)), fOutPool, Cfg(dabc::xmlOutputQueueSize, cmd).AsInt(10));
00194
00195 CreatePar("RocInfo", "info").SetSynchron(true, 2., false);
00196 SetInfo(dabc::format("ROC combiner module ready. NumInputs = %d" , numinputs), true);
00197 }
00198
00199 roc::CombinerModule::~CombinerModule()
00200 {
00201 DOUT3(("roc::CombinerModule::~CombinerModule"));
00202
00203 }
00204
00205 void roc::CombinerModule::ModuleCleanup()
00206 {
00207 DOUT3(("roc::CombinerModule::ModuleCleanup()"));
00208 while (fExtraMessages.size()>0) {
00209 delete fExtraMessages.front();
00210 fExtraMessages.pop_front();
00211 }
00212
00213 fOutBuf.Release();
00214 }
00215
00216
00217 void roc::CombinerModule::SetInfo(const std::string& info, bool forceinfo)
00218 {
00219 Par("RocInfo").SetStr(info);
00220
00221 if (forceinfo) Par("RocInfo").FireModified();
00222 }
00223
00224
00225 void roc::CombinerModule::ProcessInputEvent(dabc::Port* inport)
00226 {
00227 unsigned inpid = InputNumber(inport);
00228
00229 if (!inport->CanRecv()) {
00230 EOUT(("Something wrong with input %u %s", inpid, inport->GetName()));
00231 return;
00232 }
00233
00234 if (fSimpleMode) {
00235
00236 FillSimpleBuffer();
00237
00238
00239
00240
00241
00242
00243
00244
00245 return;
00246 }
00247
00248
00249
00250
00251 FindNextEvent(inpid);
00252
00253 FillBuffer();
00254 }
00255
00256
00257 void roc::CombinerModule::BeforeModuleStart()
00258 {
00259 if (fSpillAux==77) {
00260 DOUT0(("Shoot timer for the first time"));
00261 ShootTimer("CalibrTimer", fCalibrationPeriod);
00262 }
00263 }
00264
00265 void roc::CombinerModule::PoolHandleCleaned(dabc::PoolHandle* pool)
00266 {
00267 DOUT3(("Cleanup data from pool %p name %s isout %s", pool, pool->GetName(), DBOOL(pool==fOutPool) ));
00268
00269 if (pool==fOutPool) {
00270 f_outptr.reset();
00271 fOutBuf.Release();
00272 }
00273 }
00274
00275 void roc::CombinerModule::ProcessTimerEvent(dabc::Timer* timer)
00276 {
00277 if (timer->IsName("CalibrTimer")) {
00278
00279 if (fSpillAux==77) {
00280
00281 fSpillState = !fSpillState;
00282
00283 dabc::mgr.GetApp().Submit(roc::CmdCalibration(fSpillState));
00284
00285 ShootTimer("CalibrTimer", fSpillState ? fCalibrationLength : fCalibrationPeriod);
00286 } else {
00287 dabc::mgr.GetApp().Submit(roc::CmdCalibration(false));
00288 }
00289 } else
00290
00291 if (timer->IsName("FlushTimer")) {
00292 if (fFlushFlag) FlushOutputBuffer();
00293 fFlushFlag = true;
00294 } else
00295
00296 if (timer->IsName("Get4Timer")) {
00297 InvokeAllGet4Reset();
00298 }
00299 }
00300
00301 bool roc::CombinerModule::FindNextEvent(unsigned recid)
00302 {
00303 if (recid>=fInp.size()) return false;
00304
00305 InputRec* rec = &(fInp[recid]);
00306
00307 dabc::Port* port = Input(recid);
00308
00309 if (!rec->isrocid()) {
00310 rec->isudp = (port->GetTransportParameter(roc::xmlTransportKind) == kind_UDP);
00311 rec->rocid = port->GetTransportParameter(roc::xmlRocNumber);
00312 rec->format = port->GetTransportParameter(roc::xmlMsgFormat);
00313
00314 SetInfo(dabc::format("Detect on input %u ROC:%d Kind:%s", recid, rec->rocid, (rec->isudp ? "UDP" : "Optic")), true);
00315 }
00316
00317
00318
00319
00320 if (rec->isready) return true;
00321
00322 while (port->HasInputBuffer(rec->curr_nbuf)) {
00323
00324 dabc::Buffer& buf = port->InputBuffer(rec->curr_nbuf);
00325
00326 if (rec->curr_indx >= buf.GetTotalSize()) {
00327 if (rec->isprev || rec->isnext) {
00328 rec->curr_nbuf++;
00329 if (rec->curr_nbuf == port->InputQueueCapacity()) {
00330 SetInfo(dabc::format("Skip all data while we cannot find two events in complete queue for input %u nbuf %u isprev %s isnext %s prevnbuf %u", recid, rec->curr_nbuf, DBOOL(rec->isprev), DBOOL(rec->isnext), rec->prev_nbuf));
00331 DOUT0(("Skip all data while we cannot find two events in complete queue for input %u nbuf %u isprev %s isnext %s prevnbuf %u prevevnt %u buftotalsize %u", recid, rec->curr_nbuf, DBOOL(rec->isprev), DBOOL(rec->isnext), rec->prev_nbuf, rec->prev_evnt, buf.GetTotalSize()));
00332
00333 rec->isprev = false;
00334 rec->isnext = false;
00335 port->SkipInputBuffers(rec->curr_nbuf);
00336 rec->curr_nbuf = 0;
00337 }
00338 } else {
00339
00340 port->SkipInputBuffers(rec->curr_nbuf+1);
00341 rec->curr_nbuf = 0;
00342 }
00343 rec->curr_indx = 0;
00344 continue;
00345 }
00346
00347
00348
00349 roc::Iterator iter;
00350
00351 InitIterator(iter, buf, rec->curr_indx, rec->isudp, rec->rocid);
00352
00353 bool iserr = false;
00354 roc::Message* data = 0;
00355
00356 while (iter.next()) {
00357
00358 data = & iter.msg();
00359
00360 if (rec->IsDifferentRocId(data->getRocNumber(), iserr)) {
00361 if (iserr)
00362 EOUT(("Input:%u Kind:%s Mismatch in ROC numbers %u %u", recid, (rec->isudp ? "UDP" : "Optic"), rec->rocid, data->getRocNumber()));
00363 rec->curr_indx += iter.getMsgSize();
00364 continue;
00365 }
00366
00367 switch (data->getMessageType()) {
00368
00369 case roc::MSG_HIT: {
00370 break;
00371 }
00372
00373 case roc::MSG_EPOCH: {
00374 rec->curr_epoch = data->getEpochNumber();
00375 rec->iscurrepoch = true;
00376 break;
00377 }
00378
00379 case roc::MSG_SYNC: {
00380
00381 if (data->getSyncChNum()==fSyncNumber) {
00382 bool isepoch = rec->iscurrepoch;
00383
00384 if (!isepoch && fIgnoreMissingEpoch) {
00385
00386
00387
00388
00389 rec->curr_epoch++;
00390 isepoch = true;
00391 }
00392
00393 if (!isepoch) {
00394 SetInfo(dabc::format("Found SYNC marker %6x without epoch", data->getSyncData()));
00395 } else
00396 if ((fSyncScaleDown>0) && (data->getSyncData() % fSyncScaleDown != 0)) {
00397 SetInfo(dabc::format("Roc%u SYNC marker %06x not in expected sync step %02x",
00398 rec->rocid, data->getSyncData(), fSyncScaleDown));
00399 } else {
00400 if (!rec->isprev) {
00401 rec->prev_epoch = rec->curr_epoch;
00402 rec->isprev = true;
00403 rec->prev_nbuf = rec->curr_nbuf;
00404 rec->prev_indx = rec->curr_indx;
00405 rec->prev_evnt = data->getSyncData();
00406 rec->prev_stamp = data->getSyncTs();
00407 rec->data_length = 0;
00408 } else {
00409 rec->next_epoch = rec->curr_epoch;
00410 rec->isnext = true;
00411 rec->next_nbuf = rec->curr_nbuf;
00412 rec->next_indx = rec->curr_indx;
00413 rec->next_evnt = data->getSyncData();
00414 rec->next_stamp = data->getSyncTs();
00415 }
00416 }
00417 }
00418 break;
00419 }
00420
00421 case roc::MSG_AUX: {
00422
00423 if ((fThrottleAux>=0) && (data->getAuxChNum()==(unsigned)fThrottleAux)) {
00424 uint64_t tm = data->getMsgFullTime(rec->curr_epoch);
00425 bool state = data->getAuxFalling() == 0;
00426
00427 if ((rec->last_thottle_tm != 0) && (rec->last_throttle_state != state)) {
00428 uint64_t dist = roc::Message::CalcDistance(rec->last_thottle_tm, tm);
00429
00430 if (rec->last_throttle_state)
00431 Par(FORMAT(("Throttle%u", data->getRocNumber()))).SetDouble(dist * 1e-7);
00432 }
00433
00434 rec->last_thottle_tm = tm;
00435 rec->last_throttle_state = state;
00436 } else
00437 if ((fSpillRoc>=0) && ((unsigned)fSpillRoc==data->getRocNumber())
00438 && ((unsigned)fSpillAux == data->getAuxChNum())) {
00439
00440 uint64_t tm = data->getMsgFullTime(rec->curr_epoch);
00441
00442 bool faraway = (roc::Message::CalcDistance(fLastSpillTime, tm) > 10000000);
00443 if (tm < fLastSpillTime) faraway = true;
00444
00445 bool changed = false;
00446
00447 if ((data->getAuxFalling()!=0) && fSpillState && faraway) {
00448 DOUT1(("DETECT SPILL OFF"));
00449 fSpillState = false;
00450 fLastSpillTime = tm;
00451 changed = true;
00452 } else
00453 if ((data->getAuxFalling()==0) && !fSpillState && faraway) {
00454 DOUT1(("DETECT SPILL ON"));
00455 fSpillState = true;
00456 fLastSpillTime = tm;
00457 changed = true;
00458 Par("SpillRate").SetInt(1);
00459 }
00460
00461 if (changed) Par("SpillState").SetInt(fSpillState ? 1 : 0);
00462
00463
00464
00465
00466 if ((fCalibrationPeriod>0) && changed && !fSpillState) {
00467 dabc::TimeStamp now = dabc::Now();
00468 double dist = fCalibrationPeriod + 1000;
00469
00470 if (!fLastCalibrationTime.null())
00471 dist = now - fLastCalibrationTime;
00472
00473
00474
00475 if (dist > fCalibrationPeriod) {
00476 fLastCalibrationTime = now;
00477 DOUT0(("Invoke autocalibr mode after %5.1f s", dist));
00478
00479 dabc::mgr.GetApp().Submit(roc::CmdCalibration(true));
00480 ShootTimer("CalibrTimer", fCalibrationLength);
00481 }
00482 }
00483
00484 }
00485 break;
00486 }
00487
00488 case roc::MSG_EPOCH2: {
00489 uint32_t get4 = data->getEpoch2ChipNumber();
00490 uint32_t epoch2 = data->getEpoch2Number();
00491
00492 if (get4<MaxGet4 && rec->canCheckAnyGet4) {
00493 if ((rec->lastEpoch2[get4]!=0) && (epoch2!=0) && (epoch2 > rec->lastEpoch2[get4])) {
00494 uint32_t diff = epoch2 - rec->lastEpoch2[get4];
00495
00496 if ((diff!=1) && (diff<5) && rec->canCheckGet4[get4]) {
00497 DOUT0(("Detect error epoch2 %u shift = %u on ROC:%u Get4:%u", epoch2, diff, rec->rocid, get4));
00498 fDetectGet4Problem = true;
00499 }
00500 }
00501
00502 rec->lastEpoch2[get4] = epoch2;
00503 if (data->getEpoch2Sync()) {
00504
00505 if (epoch2 > 0xff000000) {
00506 DOUT0(("GET4 epoch2 %u number on ROC:%u Get4:%u too large", epoch2, rec->rocid, get4));
00507 if (epoch2>0xfffff000) dabc::mgr.StopApplication();
00508 }
00509
00510 if (rec->iscurrepoch && (rec->curr_epoch > 0xf0000000)) {
00511 DOUT0(("250 MHz epoch = %u on ROC:%u with GET4 readout closer to overflow ", rec->curr_epoch, rec->rocid));
00512 if (rec->curr_epoch > 0xffff0000) dabc::mgr.StopApplication();
00513 }
00514
00515 if (!rec->canCheckGet4[get4]) {
00516 DOUT0(("Enable checking of GET4:%u on ROC:%u", get4, rec->rocid));
00517 rec->canCheckGet4[get4] = true;
00518 }
00519
00520 unsigned mod = epoch2 % 25;
00521 if (mod!=0) {
00522 fDetectGet4Problem = true;
00523
00524 if (rec->lastEpoch2SyncErr[get4]!=mod)
00525 DOUT0(("Detect wrong epoch2 : %u (mod=%u) value when sync=1 ROC:%u Get4:%u", epoch2, mod, rec->rocid, get4));
00526
00527 rec->lastEpoch2SyncErr[get4] = mod;
00528 }
00529
00530
00531 if (epoch2 % 25000 == 0) {
00532
00533 for (int nch=0;nch<MaxGet4Ch;nch++) {
00534 if ((rec->get4AllCnt[get4][nch] > 10) &&
00535 (rec->get4ErrCnt[get4][nch] > 0.2 * rec->get4AllCnt[get4][nch])) {
00536 DOUT0(("Suspicious distance on ROC:%d Get4:%u Channel:%u fullcnt:%d errcnt:%d\n", rec->rocid, get4, nch, rec->get4AllCnt[get4][nch], rec->get4ErrCnt[get4][nch]));
00537 fDetectGet4Problem = true;
00538 }
00539 rec->get4AllCnt[get4][nch]=0;
00540 rec->get4ErrCnt[get4][nch]=0;
00541
00542 rec->get4EdgeErrs[get4][nch]=0;
00543 }
00544 rec->lastEpoch2SyncErr[get4]=0;
00545 }
00546 }
00547 } else
00548 if (!rec->canCheckAnyGet4 && (dabc::Now() > fLastGet4ResetTm + 1.)) {
00549 DOUT0(("Enable GET4 checking again for ROC%u", rec->rocid));
00550 rec->canCheckAnyGet4 = true;
00551 }
00552
00553 if (fDetectGet4Problem && (fGet4ResetLimit>0.)) {
00554 if (dabc::Now() > fLastGet4ResetTm + fGet4ResetLimit)
00555 InvokeAllGet4Reset();
00556 }
00557
00558 break;
00559 }
00560
00561 case roc::MSG_GET4: {
00562 unsigned g4id = data->getGet4Number();
00563
00564 if (rec->canCheckGet4[g4id] && rec->canCheckAnyGet4) {
00565
00566 unsigned g4ch = data->getGet4ChNum();
00567 unsigned g4fl = data->getGet4Edge();
00568
00569 uint64_t fulltm = data->getMsgFullTime(rec->lastEpoch2[g4id]);
00570
00571 bool change_edge = ((rec->get4EdgeCnt[g4id][g4ch]>0) ^ g4fl);
00572
00573 if (change_edge) {
00574 rec->get4AllCnt[g4id][g4ch]++;
00575 uint64_t diff2 = (fulltm - rec->get4LastTm[g4id][g4ch]);
00576
00577
00578 for (unsigned k=1;k<4;k++)
00579 if ((diff2 > (k*26214 - 500)) && (diff2 < (k*26214 + 500))) {
00580 rec->get4ErrCnt[g4id][g4ch]++;
00581 break;
00582 }
00583
00584
00585 if (abs(rec->get4EdgeCnt[g4id][g4ch])>3) {
00586
00587 if (rec->get4EdgeErrs[g4id][g4ch]++<3)
00588 DOUT0(("EDGE failure ROC:%u Get4:%u Channel:%u EdgeCnt:%d Last epoch2:%u", rec->rocid, g4id, g4ch, rec->get4EdgeCnt[g4id][g4ch], rec->lastEpoch2[g4id]));
00589 else
00590 fDetectGet4Problem = true;
00591 }
00592
00593
00594 rec->get4EdgeCnt[g4id][g4ch] = 0;
00595 }
00596
00597 rec->get4EdgeCnt[g4id][g4ch] += (g4fl ? +1 : -1);
00598
00599 rec->get4LastTm[g4id][g4ch] = fulltm;
00600
00601 }
00602
00603 break;
00604 }
00605
00606 case roc::MSG_SYS: {
00607 switch (data->getSysMesType()) {
00608 case roc::SYSMSG_SYNC_PARITY:
00609 SetInfo(dabc::format("Roc%u Sync parity", rec->rocid));
00610 break;
00611 case SYSMSG_USER:
00612 SetInfo(dabc::format("Roc%u user sys message %u", rec->rocid, data->getSysMesData()));
00613 if (data->getSysMesData()==roc::SYSMSG_USER_RECONFIGURE) {
00614
00615 DOUT2(("One could start checking of GET4 messages again"));
00616 rec->canCheckAnyGet4 = true;
00617 }
00618 break;
00619 default:
00620 SetInfo(dabc::format("Roc%u SysMsg type = %u", rec->rocid, data->getSysMesType()));
00621 }
00622 break;
00623 }
00624
00625 }
00626
00627 rec->curr_indx += iter.getMsgSize();
00628
00629 if (rec->isprev) rec->data_length += iter.getMsgSize();
00630
00631 if (rec->isprev && rec->isnext) {
00632 DOUT5(("ROCID:%u Find sync events %u - %u fmt:%u", rec->rocid, rec->prev_evnt, rec->next_evnt, rec->format));
00633
00634
00635
00636
00637
00638 rec->isready = true;
00639 return true;
00640 }
00641 }
00642 }
00643
00644 return false;
00645 }
00646
00647
00648 void roc::CombinerModule::InvokeAllGet4Reset()
00649 {
00650 for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00651 fInp[ninp].canCheckAnyGet4 = false;
00652 for (int n=0;n<MaxGet4;n++) {
00653 fInp[ninp].canCheckGet4[n] = false;
00654 fInp[ninp].lastEpoch2SyncErr[n] = 0;
00655 fInp[ninp].lastEpoch2[n] = 0;
00656 for(int ch=0;ch<MaxGet4Ch;ch++) {
00657 fInp[ninp].get4EdgeCnt[n][ch] = 0;
00658 fInp[ninp].get4EdgeErrs[n][ch] = 0;
00659 fInp[ninp].get4AllCnt[n][ch] = 0;
00660 fInp[ninp].get4ErrCnt[n][ch] = 0;
00661 fInp[ninp].get4LastTm[n][ch] = 0;
00662 }
00663 }
00664 }
00665
00666 fLastGet4ResetTm = dabc::Now();
00667 fDetectGet4Problem = false;
00668 DOUT0(("Submit GET4 reset command"));
00669 dabc::mgr.GetApp().Submit(dabc::Command("ResetAllGet4"));
00670 }
00671
00672
00673 void roc::CombinerModule::ProcessOutputEvent(dabc::Port* inport)
00674 {
00675 if (fSimpleMode) {
00676 FillSimpleBuffer();
00677 return;
00678 }
00679
00680 FillBuffer();
00681 }
00682
00683
00684 void roc::CombinerModule::FillSimpleBuffer()
00685 {
00686
00687 dabc::Buffer inpbuf;
00688 unsigned ninp = 0;
00689 dabc::Pointer inpptr;
00690
00691 while (1) {
00692
00693 for (unsigned n=0; (n < NumInputs()) && inpbuf.null(); n++)
00694 if (Input(n)->CanRecv()) {
00695 ninp = n;
00696 inpbuf = Input(ninp)->Recv();
00697 inpptr = inpbuf.GetPointer();
00698 break;
00699 }
00700
00701 if (inpbuf.null()) return;
00702
00703 if (!CanSendToAllOutputs()) return;
00704
00705 unsigned rawdata_sz = inpptr.fullsize();
00706
00707 unsigned needsize = rawdata_sz + sizeof(mbs::EventHeader) + sizeof(mbs::SubeventHeader);
00708 if (needsize < fBufferSize) needsize = fBufferSize;
00709
00710 dabc::Buffer outbuf = fOutPool->TakeBuffer(needsize);
00711 if (outbuf.null()) {
00712 EOUT(("Cannot get buffer %u from pool", needsize));
00713 return;
00714 }
00715
00716
00717 dabc::Pointer outptr = outbuf.GetPointer();
00718
00719 mbs::EventHeader* evhdr = (mbs::EventHeader*) outptr();
00720
00721 static int32_t evnt_num(1);
00722
00723 evhdr->Init(evnt_num++);
00724 outbuf.Shift(outptr, sizeof(mbs::EventHeader));
00725
00726 InputRec* rec = &(fInp[ninp]);
00727
00728 if (!rec->isrocid()) {
00729 rec->isudp = (Input(ninp)->GetTransportParameter(roc::xmlTransportKind) == kind_UDP);
00730 rec->rocid = Input(ninp)->GetTransportParameter(roc::xmlRocNumber);
00731 rec->format = Input(ninp)->GetTransportParameter(roc::xmlMsgFormat);
00732
00733 DOUT0(("Detect on input %u ROC:%d Kind:%s", ninp, rec->rocid, (rec->isudp ? "UDP" : "Optic")));
00734 }
00735
00736 mbs::SubeventHeader* subhdr = (mbs::SubeventHeader*) outptr();
00737 subhdr->Init();
00738 subhdr->iProcId = roc::proc_RawData;
00739 subhdr->iSubcrate = rec->rocid;
00740 subhdr->iControl = rec->format;
00741
00742 outbuf.Shift(outptr, sizeof(mbs::SubeventHeader));
00743
00744 unsigned msg_size = roc::Message::RawSize(rec->format);
00745
00746
00747 if (outptr.fullsize()<rawdata_sz) {
00748 rawdata_sz = outptr.fullsize() / msg_size * msg_size;
00749 DOUT1(("Too many raw data %u, take only %u", outptr.fullsize(), rawdata_sz));
00750 }
00751
00752 outptr.copyfrom(inpptr, rawdata_sz);
00753 outbuf.Shift(outptr, rawdata_sz);
00754
00755 if (rawdata_sz == inpptr.fullsize()) {
00756 inpptr.reset();
00757 inpbuf.Release();
00758 } else {
00759 inpbuf.Shift(inpptr, rawdata_sz);
00760 }
00761
00762 subhdr->SetRawDataSize(rawdata_sz);
00763
00764 evhdr->SetSubEventsSize(sizeof(mbs::SubeventHeader) + rawdata_sz);
00765
00766 dabc::BufferSize_t usedsize = outbuf.GetTotalSize() - outptr.fullsize();
00767 if (usedsize==0) {
00768 outbuf.Release();
00769 return;
00770 }
00771
00772 outbuf.SetTotalSize(usedsize);
00773
00774 outbuf.SetTypeId(mbs::mbt_MbsEvents);
00775
00776 SendToAllOutputs(outbuf);
00777 }
00778 }
00779
00780
00781
00782 bool roc::CombinerModule::SkipEvent(unsigned recid)
00783 {
00784 if (recid>=fInp.size()) return false;
00785
00786 InputRec* rec = &(fInp[recid]);
00787
00788 if ((rec==0) || !rec->isprev || !rec->isnext) return false;
00789
00790 rec->isready = false;
00791 rec->isprev = true;
00792 rec->prev_epoch = rec->next_epoch;
00793 rec->prev_nbuf = rec->next_nbuf;
00794 rec->prev_indx = rec->next_indx;
00795 rec->prev_evnt = rec->next_evnt;
00796 rec->prev_stamp = rec->next_stamp;
00797 rec->data_length = 0;
00798
00799 rec->isnext = false;
00800
00801 rec->nummbssync = 0;
00802 rec->firstmbssync = 0;
00803
00804 unsigned can_skip = rec->can_skip_buf();
00805
00806 if (can_skip>0) {
00807
00808
00809 Input(recid)->SkipInputBuffers(can_skip);
00810
00811 rec->did_skip_buf(can_skip);
00812 }
00813
00814
00815
00816
00817 return FindNextEvent(recid);
00818 }
00819
/** Forward distance from \a prev_ev to \a next_ev.
 *  When \a next_ev is smaller than \a prev_ev, one wrap of 0x1000000 is added. */
uint32_t CalcEventDistanceNew(uint32_t prev_ev, uint32_t next_ev)
{
   if (next_ev >= prev_ev)
      return next_ev - prev_ev;

   // counter wrapped around between the two values
   return next_ev + 0x1000000 - prev_ev;
}
00824
/** Absolute distance between two event numbers, independent of their order.
 *  A difference above 0x800000 is replaced by 0x1000000 minus that difference. */
uint32_t CalcAbsEventDistanceNew(uint32_t ev1, uint32_t ev2)
{
   uint32_t dist = (ev1 > ev2) ? (ev1 - ev2) : (ev2 - ev1);

   // the shorter way goes over the wrap-around point
   return (dist > 0x800000) ? (0x1000000 - dist) : dist;
}
00831
00832
00833 void roc::CombinerModule::FillBuffer()
00834 {
00835
00836
00837 bool doagain = true;
00838 uint32_t evnt_cut = 0;
00839
00840 while (doagain) {
00841
00842 doagain = false;
00843
00844 bool is_data_selected(false);
00845
00846
00847 for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00848 InputRec* inp = &(fInp[ninp]);
00849 if (!inp->isprev) return;
00850 inp->use = false;
00851 inp->data_err = false;
00852 }
00853
00854
00855 for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00856 InputRec* inp = &(fInp[ninp]);
00857
00858 if (!inp->isready) continue;
00859
00860 uint32_t diff = CalcEventDistanceNew(inp->prev_evnt, inp->next_evnt);
00861 if (fSyncScaleDown>0) {
00862 if (diff != (unsigned) fSyncScaleDown) {
00863 DOUT0(("ROC%u: Events shift %06x -> %06x = %02x, expected %02x, ignore", inp->rocid, inp->prev_evnt, inp->next_evnt, diff, fSyncScaleDown));
00864 inp->data_err = true;
00865 }
00866 } else {
00867 if (diff > 0x100000) {
00868 inp->data_err = true;
00869 EOUT(("Too large event shift %06x -> %06x = %06x on ROC%u", inp->prev_evnt, inp->next_evnt, diff, inp->rocid));
00870 }
00871 }
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885 if (inp->data_err) {
00886 inp->use = true;
00887 is_data_selected = true;
00888 break;
00889 }
00890 }
00891
00892 uint32_t min_evnt(0xffffff), max_evnt(0);
00893
00894 if (!is_data_selected) {
00895
00896
00897
00898 for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00899
00900 InputRec* inp = &(fInp[ninp]);
00901
00902 if (!inp->isprev) return;
00903
00904 if (inp->prev_evnt < evnt_cut) continue;
00905
00906 if (inp->prev_evnt < min_evnt) min_evnt = inp->prev_evnt;
00907 if (inp->prev_evnt > max_evnt) max_evnt = inp->prev_evnt;
00908 }
00909
00910 if (min_evnt > max_evnt) {
00911 EOUT(("Nothing found!!!"));
00912 return;
00913 }
00914
00915
00916 if (min_evnt != max_evnt)
00917 if ((min_evnt < 0x10000) && (max_evnt > 0xff0000L)) {
00918 evnt_cut = 0x800000;
00919 doagain = true;
00920 continue;
00921 }
00922
00923 evnt_cut = 0;
00924
00925 for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00926
00927 InputRec* inp = &(fInp[ninp]);
00928
00929 inp->use = (inp->prev_evnt == min_evnt);
00930 inp->data_err = false;
00931
00932 if (!inp->use) continue;
00933
00934
00935 if (!inp->isready) return;
00936
00937 is_data_selected = true;
00938
00939 if (ninp==0) continue;
00940
00941 if (!fInp[0].use)
00942 inp->data_err = true;
00943 else
00944 if (inp->next_evnt != fInp[0].next_evnt) {
00945 inp->data_err = true;
00946 uint32_t diff = CalcAbsEventDistanceNew(inp->next_evnt , fInp[0].next_evnt);
00947
00948 if (diff > 0x1000)
00949 EOUT(("Next event mismatch between ROC%u:%06x and ROC%u:%06x diff = %06x",
00950 fInp[0].rocid, fInp[0].next_evnt, inp->rocid, inp->next_evnt, diff));
00951 }
00952 }
00953
00954 }
00955
00956 if (!is_data_selected) {
00957 EOUT(("Data not selected here - why"));
00958 return;
00959 }
00960
00961 dabc::BufferSize_t totalsize = sizeof(mbs::EventHeader);
00962 unsigned numused(0);
00963 bool skip_evnt(false);
00964
00965
00966
00967 for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
00968
00969 InputRec* inp = &(fInp[ninp]);
00970
00971 if (!inp->use) continue;
00972
00973 if (inp->data_err && fSkipErrorData) skip_evnt = true;
00974
00975 totalsize += sizeof(mbs::SubeventHeader);
00976
00977
00978 totalsize += 2*roc::Message::RawSize(inp->format);
00979
00980 totalsize += inp->data_length;
00981
00982 numused++;
00983 }
00984
00985 unsigned extra_size(0);
00986
00987 unsigned grand_totalsize = totalsize + extra_size;
00988
00989
00990 if (fBnetMode && (min_evnt<=max_evnt) && (fSyncScaleDown > 0) && (min_evnt % fSyncScaleDown != 0)) {
00991 EOUT(("Event number %u not in agreement with scale down factor %u - discard event", min_evnt, fSyncScaleDown));
00992 skip_evnt = true;
00993 }
00994
00995
00996 if (fBnetMode && (numused<fInp.size())) {
00997 EOUT(("Skip while too few events"));
00998 skip_evnt = true;
00999 }
01000
01001
01002 if (!skip_evnt && (numused==fInp.size()) && fExtraMessages.size()>0)
01003 extra_size = fExtraMessages.front()->size() * roc::Message::RawSize(roc::formatNormal);
01004
01005
01006 if (fBnetMode || (!skip_evnt && (grand_totalsize > f_outptr.fullsize())))
01007 if (!FlushOutputBuffer()) return;
01008
01009
01010 if (!skip_evnt && (fOutBuf.null() || (grand_totalsize > f_outptr.fullsize()))) {
01011 if (fOutBuf.null())
01012 fOutBuf = fOutPool->TakeBuffer(fBufferSize);
01013 if (fOutBuf.null()) {
01014 EOUT(("Cannot get buffer from pool"));
01015 return;
01016 }
01017
01018
01019 if (fOutBuf.GetTotalSize() < grand_totalsize)
01020 extra_size = 0;
01021
01022
01023 if (fOutBuf.GetTotalSize() < totalsize) {
01024 SetInfo(dabc::format("Cannot put event (sz:%u) in buffer (sz:%u) - skip event", totalsize, fOutBuf.GetTotalSize()));
01025 skip_evnt = true;
01026 }
01027 f_outptr = fOutBuf.GetPointer();
01028 }
01029
01030 if (!skip_evnt) {
01031 dabc::Pointer old(f_outptr);
01032
01033
01034
01035
01036
01037 mbs::EventHeader* evhdr = (mbs::EventHeader*) f_outptr();
01038 evhdr->Init(min_evnt);
01039 fOutBuf.Shift(f_outptr, sizeof(mbs::EventHeader));
01040
01041
01042
01043
01044
01045
01046 MessagesVector* extra = 0;
01047
01048 if (extra_size>0) {
01049 extra = fExtraMessages.front();
01050 fExtraMessages.pop_front();
01051 }
01052
01053 unsigned filled_sz = FillRawSubeventsBuffer(fOutBuf, f_outptr, extra);
01054
01055 SetInfo(dabc::format("Fill event %7u sz %u", min_evnt, filled_sz));
01056
01057
01058
01059 if (filled_sz == 0) {
01060 EOUT(("Event data not filled - event skipped!!!"));
01061 f_outptr = old;
01062 } else {
01063
01064
01065
01066 evhdr->SetSubEventsSize(filled_sz);
01067
01068 DOUT5(("Produce event %u of size %u", (unsigned) evhdr->EventNumber() ,(unsigned) evhdr->FullSize()));
01069
01070 if (fBnetMode) {
01071
01072 EOUT(("BNet mode not yet implemented !!!"));
01073
01074 throw dabc::Exception("BNET mode not yet implemented");
01075
01076
01077
01078
01079
01080
01081 FlushOutputBuffer();
01082 }
01083 }
01084 } else {
01085 SetInfo(dabc::format("Skip event %7u", min_evnt));
01086 }
01087
01088
01089 doagain = true;
01090 for (unsigned ninp = 0; ninp<fInp.size(); ninp++)
01091 if (fInp[ninp].use)
01092 if (!SkipEvent(ninp)) doagain = false;
01093 }
01094 }
01095
01099 bool roc::CombinerModule::FlushOutputBuffer()
01100 {
01101
01102 if (fOutBuf.null()) return true;
01103
01104 dabc::BufferSize_t usedsize = fOutBuf.GetTotalSize() - f_outptr.fullsize();
01105 if (usedsize==0) return true;
01106
01107
01108 if (!CanSendToAllOutputs()) {
01109
01110
01111
01112
01113
01114 return false;
01115 }
01116
01117
01118
01119 f_outptr.reset();
01120
01121 fOutBuf.SetTotalSize(usedsize);
01122
01123 fOutBuf.SetTypeId(mbs::mbt_MbsEvents);
01124
01125
01126
01127
01128
01129
01130
01131
01132
01133
01134
01135
01136 SendToAllOutputs(fOutBuf.HandOver());
01137
01138 fFlushFlag = false;
01139
01140 return true;
01141 }
01142
01143 unsigned roc::CombinerModule::FillRawSubeventsBuffer(dabc::Buffer& outbuf, dabc::Pointer& outptr, roc::MessagesVector* extra)
01144 {
01145 unsigned filled_size = 0;
01146
01147 bool iserr = false;
01148
01149 for (unsigned ninp = 0; ninp < fInp.size(); ninp++) {
01150 InputRec* rec = &(fInp[ninp]);
01151 if (!rec->use) continue;
01152
01153 mbs::SubeventHeader* subhdr = (mbs::SubeventHeader*) outptr();
01154 subhdr->Init();
01155 subhdr->iProcId = rec->data_err ? roc::proc_ErrEvent : roc::proc_RocEvent;
01156 subhdr->iSubcrate = rec->rocid;
01157 subhdr->iControl = rec->format;
01158
01159 if (rec->data_err) iserr = true;
01160
01161 outbuf.Shift(outptr, sizeof(mbs::SubeventHeader));
01162 filled_size+=sizeof(mbs::SubeventHeader);
01163
01164 unsigned msg_size = roc::Message::RawSize(rec->format);
01165
01166 unsigned subeventsize = msg_size;
01167
01168 roc::Message msg;
01169 msg.setMessageType(roc::MSG_EPOCH);
01170 msg.setRocNumber(rec->rocid);
01171 msg.setEpochNumber(rec->prev_epoch);
01172 msg.setEpochMissed(0);
01173 msg.copyto(outptr(), rec->format);
01174
01175 outbuf.Shift(outptr, msg_size);
01176
01177 unsigned nbuf = rec->prev_nbuf;
01178
01179 bool firstmsg = true;
01180
01181 while (nbuf<=rec->next_nbuf) {
01182 dabc::Buffer& inbuf = Input(ninp)->InputBuffer(nbuf);
01183 if (inbuf.null()) {
01184 EOUT(("Internal error"));
01185 return 0;
01186 }
01187
01188
01189 dabc::Pointer ptr = inbuf.GetPointer();
01190 if (nbuf == rec->next_nbuf) ptr.setfullsize(rec->next_indx + msg_size);
01191 if (nbuf == rec->prev_nbuf) inbuf.Shift(ptr, rec->prev_indx);
01192
01193
01194 if (firstmsg && extra) {
01195
01196 outptr.copyfrom(ptr, msg_size);
01197 outbuf.Shift(outptr, msg_size);
01198 subeventsize += msg_size;
01199 inbuf.Shift(ptr, msg_size);
01200 AddExtraMessagesToSubevent(outbuf, extra, outptr, subeventsize, rec);
01201 }
01202
01203
01204 outptr.copyfrom(ptr, ptr.fullsize());
01205 outbuf.Shift(outptr, ptr.fullsize());
01206 subeventsize += ptr.fullsize();
01207 firstmsg = false;
01208
01209 nbuf++;
01210 }
01211
01212 filled_size += subeventsize;
01213 subhdr->SetRawDataSize(subeventsize);
01214 }
01215
01216 Par("RocEvents").SetInt(1);
01217
01218 if (iserr) Par("RocErrors").SetInt(1);
01219
01220 if (extra!=0) delete extra;
01221
01222 return filled_size;
01223 }
01224
01225 void roc::CombinerModule::AddExtraMessagesToSubevent(dabc::Buffer& buf, roc::MessagesVector* extra, dabc::Pointer& outptr, unsigned& subeventsize, InputRec* rec)
01226 {
01227 unsigned msg_size = roc::Message::RawSize(rec->format);
01228
01229
01230
01231 for (unsigned n=0;n<extra->size();n++)
01232 if (extra->at(n).getRocNumber() == rec->rocid) {
01233 extra->at(n).copyto(outptr(), rec->format);
01234 buf.Shift(outptr, msg_size);
01235 subeventsize += msg_size;
01236
01237 }
01238
01239
01240 }
01241
01242 void roc::CombinerModule::DumpData(dabc::Buffer& buf)
01243 {
01244 roc::Iterator iter;
01245 if (!InitIterator(iter, buf)) return;
01246
01247 while (iter.next())
01248 iter.msg().printData(3);
01249 }
01250
01251 int roc::CombinerModule::ExecuteCommand(dabc::Command cmd)
01252 {
01253 if (cmd.IsName("ConfigureInput")) {
01254 int recid = cmd.GetInt("Input", 0);
01255
01256 if ((recid<0) || (recid>=(int)fInp.size())) {
01257 EOUT(("Something wrong with input configurations"));
01258 return dabc::cmd_false;
01259 }
01260 InputRec* rec = &(fInp[recid]);
01261
01262 if (!rec->isrocid()) {
01263 rec->isudp = cmd.GetBool("IsUdp", true);
01264 rec->rocid = cmd.GetInt("ROCID", 0);
01265 rec->format = cmd.GetInt("Format", 0);
01266
01267 SetInfo(dabc::format("Configure input %u with ROC:%d Kind:%s", recid, rec->rocid, (rec->isudp ? "UDP" : "Optic")), true);
01268 }
01269
01270 return dabc::cmd_true;
01271
01272 } else
01273
01274 if (cmd.IsName(CmdMessagesVector::CmdName())) {
01275 MessagesVector* vect = (MessagesVector*) cmd.GetPtr(CmdMessagesVector::Vector());
01276
01277 if (vect!=0)
01278 fExtraMessages.push_back(vect);
01279 else
01280 EOUT(("Zero vector with extra messages"));
01281
01282 if (fExtraMessages.size()>10) {
01283 EOUT(("Too many extra messages, remove part of them"));
01284 delete fExtraMessages.front();
01285 fExtraMessages.pop_front();
01286 }
01287
01288 return dabc::cmd_true;
01289 }
01290
01291 return dabc::ModuleAsync::ExecuteCommand(cmd);
01292 }
01293
01294