00001
00002
00003 #include "dabc/logging.h"
00004 #include "dabc/timing.h"
00005 #include "dabc/ModuleSync.h"
00006 #include "dabc/Port.h"
00007 #include "dabc/Timer.h"
00008 #include "dabc/Command.h"
00009 #include "dabc/Manager.h"
00010 #include "dabc/MemoryPool.h"
00011 #include "dabc/PoolHandle.h"
00012 #include "dabc/threads.h"
00013 #include "dabc/Application.h"
00014 #include "dabc/SocketDevice.h"
00015 #include "dabc/statistic.h"
00016 #include "dabc/Factory.h"
00017 #include "dabc/Configuration.h"
00018
00019 #include "mbs/MbsTypeDefs.h"
00020 #include "mbs/Iterator.h"
00021
00022 #include <string>
00023 #include <vector>
00024
00025 enum ERocMbsTypes {
00026 proc_RocEvent = 1,
00027 proc_ErrEvent = 2,
00028 proc_MergedEvent = 3,
00029 proc_RawData = 4,
00030 proc_Triglog = 5,
00031 proc_TRD_MADC = 6,
00032 proc_TRD_Spadic = 7,
00033 proc_CERN_Oct11 = 8,
00034 proc_SlaveMbs = 9,
00035 proc_EPICS = 10,
00036 proc_COSY_Nov11 = 11,
00037 proc_SpadicV10Raw = 12,
00038 proc_SpadicV10Event = 13,
00039 proc_CERN_Oct12 = 14,
00040 proc_FASP = 15,
00041 proc_TRBEvent = 31
00042 };
00043
00044 std::string GetHeaderInfo(uint32_t id)
00045 {
00046 switch (id & 0xff) {
00047 case proc_RocEvent : return dabc::format("ROC%02d", (id >> 16) & 0xff);
00048 case proc_ErrEvent : return "ErrEvent";
00049 case proc_MergedEvent : return "Merged";
00050 case proc_RawData : return "RawData";
00051 case proc_Triglog : return "Triglog-M";
00052 case proc_TRD_MADC : return "Cern-nov10";
00053 case proc_TRD_Spadic : return dabc::format("SPADIC%02d", (id >> 16) & 0xff);
00054 case proc_CERN_Oct11 : return dabc::format("CERN-%s", (id >> 24) == 0x9 ? "M" : "S");
00055 case proc_SlaveMbs : return "Triglog-S";
00056 case proc_EPICS : return "EPICS";
00057 case proc_CERN_Oct12 : return dabc::format("CERN-%s", (id >> 24) == 0x9 ? "M" : "S");
00058 case proc_FASP : return "FASP";
00059 case proc_TRBEvent : return "TRB";
00060 }
00061 return "undef";
00062 }
00063
00064 int last_master(0), last_slave(0), last_mastertm(0), mbseventid(0);
00065
00066 int GetEventNumber(uint32_t id, void* dataptr, int datalen)
00067 {
00068 uint8_t* data = (uint8_t*) dataptr;
00069 switch (id & 0xff) {
00070 case proc_RocEvent: {
00071 int ownid = (*((uint32_t*) (data+8)) >> 6) & 0xffffff;
00072 int ownid2 = (*((uint32_t*) (data+datalen - 4)) >> 6) & 0xffffff;
00073 if (ownid+1 != ownid2) EOUT(("ROC SYNC PANICK"));
00074 return ownid;
00075 }
00076 case proc_TRD_Spadic: {
00077 int ownid = data[360] << 16 | data[361] << 8 | data[362];
00078 return ownid;
00079 }
00080 case proc_Triglog: {
00081 int ownid = *((uint32_t*)dataptr);
00082 last_master = ownid;
00083 last_mastertm = *((uint32_t*) (data + 8));
00084 return ownid;
00085 }
00086 case proc_SlaveMbs: {
00087 int ownid = *((uint32_t*)dataptr);
00088 last_slave = ownid;
00089 return ownid;
00090 }
00091 case proc_CERN_Oct12 :
00092 return (id >> 24) == 0x9 ? last_master : last_slave;
00093
00094 case proc_EPICS:
00095 return last_master;
00096
00097 case proc_FASP:
00098 return last_master;
00099
00100 case proc_TRBEvent:
00101 return last_master;
00102 }
00103
00104 EOUT(("Cannot define event ID"));
00105
00106 return 0;
00107 }
00108
00109
00110 #include <map>
00111
00112 typedef std::map<uint32_t,int> SubeventsMap;
00113
00114 class CernOct12SorterModule : public dabc::ModuleSync {
00115 protected:
00116
00117 int fBufferSize;
00118 bool fDoFASP;
00119 bool fDoReading;
00120 bool fDoWriting;
00121 bool fOnlyFull;
00122
00123
00124 public:
00125 CernOct12SorterModule(const char* name, dabc::Command cmd) :
00126 dabc::ModuleSync(name, cmd)
00127 {
00128 fBufferSize = CreatePar(dabc::xmlBufferSize).AsInt(16*1024);
00129
00130 fDoFASP = CreatePar("DoFASP").AsBool(false);
00131 fDoReading = CreatePar("DoReading").AsBool(false);
00132 fDoWriting = CreatePar("DoWriting").AsBool(false);
00133 fOnlyFull = CreatePar("OnlyFull").AsBool(false);
00134
00135 CreatePar("RunName");
00136
00137 CreatePoolHandle("Pool");
00138
00139 if (fDoReading) CreateInput("Input", Pool());
00140 if (fDoFASP) CreateInput("FASP", Pool());
00141 if (fDoWriting) CreateOutput("Output", Pool());
00142 }
00143
00144 virtual void MainLoop()
00145 {
00146 if (!fDoReading && fDoWriting && fDoFASP) {
00147 FaspLoop();
00148 return;
00149 }
00150
00151
00152 if (!fDoWriting)
00153 if (fDoFASP ^ fDoReading) {
00154 NormalLoop();
00155 return;
00156 }
00157
00158 if (fDoFASP && fDoReading)
00159 CombinerLoop();
00160
00161 DOUT0(("Nothing to do"));
00162 }
00163
00164 void FaspLoop()
00165 {
00166
00167
00168 dabc::Buffer buf;
00169
00170 int fullsz(0), fullcnt(0);
00171
00172 bool iseof(false);
00173
00174 while (!iseof) {
00175 buf = Recv(Input(), 5.);
00176
00177 fullsz += buf.GetTotalSize();
00178
00179 if (buf.GetTypeId() == mbs::mbt_MbsEvents)
00180 fullcnt += mbs::ReadIterator::NumEvents(buf);
00181
00182 iseof = (buf.GetTypeId() == dabc::mbt_EOF);
00183
00184 if (fDoWriting) {
00185
00186 Send(Output(), buf.HandOver());
00187 }
00188 }
00189
00190 if (iseof)
00191 DOUT0(("END-OF-FILE detected!!"));
00192
00193 DOUT0(("Totally received bytes %d events %d", fullsz, fullcnt));
00194
00195 if (fDoWriting)
00196 while (Output()->OutputPending() > 0) {
00197 DOUT0(("Waiting until data is written %d", Output()->OutputPending()));
00198 WorkerSleep(1.);
00199 }
00200 }
00201
00202
00203 void NormalLoop()
00204 {
00205
00206
00207 dabc::Buffer buf;
00208
00209
00210
00211 int bufcnt(0), evntcnt(0), errcnt(0);
00212 int64_t totalsz(0);
00213
00214 SubeventsMap fMap;
00215 std::vector<int> triggers(16);
00216
00217 int first_tm(0), last_tm(0);
00218
00219 while (true) {
00220
00221 buf = Recv(Input(), 5.);
00222
00223 if (buf.GetTypeId() == dabc::mbt_EOF) {
00224 DOUT3(("Found EOF buffer len = %d", buf.GetTotalSize()));
00225 break;
00226 }
00227
00228 if (buf.null()) {
00229 EOUT(("Get empty buffer id = %d", buf.GetTypeId()));
00230 break;
00231 }
00232
00233 totalsz+=buf.GetTotalSize();
00234
00235 mbs::ReadIterator iter(buf);
00236 while (iter.NextEvent()) {
00237 evntcnt++;
00238
00239
00240
00241 if (iter.evnt()->iTrigger < 16)
00242 triggers[iter.evnt()->iTrigger]++;
00243
00244 mbseventid = iter.evnt()->EventNumber();
00245
00246 int maxid(-1), minid(-1);
00247
00248 while (iter.NextSubEvent()) {
00249 mbs::SubeventHeader* sub = iter.subevnt();
00250
00251 uint32_t fullid = sub->fFullId;
00252
00253 fMap[fullid]++;
00254
00255 int evid = GetEventNumber(fullid, sub->RawData(), sub->RawDataSize());
00256
00257
00258
00259
00260
00261 if (maxid<0) { maxid = evid; } else if (evid>maxid) maxid=evid;
00262 if (minid<0) minid = evid; else if (evid<minid) minid=evid;
00263 }
00264
00265 last_tm = last_mastertm;
00266 if (first_tm==0) first_tm = last_tm;
00267
00268 if ((maxid!=minid) && !fDoWriting) {
00269 errcnt++;
00270 if (errcnt<100) EOUT(("EventID missmatch %d %d master:%d slave:%d", minid, maxid, last_master, last_slave));
00271 }
00272 }
00273
00274 bufcnt++;
00275 }
00276
00277 DOUT0(("Found %d events in %d buffers totalsz %d", evntcnt, bufcnt, totalsz));
00278 for (unsigned n=0;n<triggers.size();n++)
00279 if (triggers[n]>0)
00280 DOUT1((" Trigger %2u found %6d times", n, triggers[n]));
00281
00282 DOUT1(("Found %u different subevents", fMap.size()));
00283
00284 SubeventsMap:: iterator it = fMap.begin();
00285 while (it != fMap.end()) {
00286
00287 std::string info = GetHeaderInfo(it->first);
00288
00289 DOUT1((" Subevent 0x%08x %10s found %6d times", it->first, info.c_str(), it->second));
00290
00291 it++;
00292 }
00293
00294 if (errcnt>0) EOUT(("TOTAL ERRORS COUNT = %d", errcnt));
00295
00296 int tm = last_tm-first_tm;
00297
00298 DOUT0(("HEAD: | *Name* | *Time,s* | *Size,MB* | *Events* | *TR1* | *TR2* | *TR8* | *EPICS* |"));
00299 DOUT0(("STAT: | %17s | %5d | %5.1f | %6d | %6d | %6d | %6d | %4d |",
00300 Par("RunName").AsStr("---"), tm , totalsz/1e6, evntcnt, triggers[1], triggers[2], triggers[8], fMap[0xa]));
00301
00302 WorkerSleep(1.);
00303 }
00304
00305
00306 void AccountLoop(int kind, int eventid)
00307 {
00308 static int lastkind = kind;
00309 static int left = eventid;
00310 static int right = eventid;
00311
00312 if (kind == lastkind) {
00313 right = eventid;
00314 return;
00315 }
00316
00317 std::string info, sinfo;
00318 if (left==right) dabc::formats(info, "event %d", left);
00319 else dabc::formats(info, "events from %d to %d", left, right);
00320
00321 switch (lastkind) {
00322 case 0: sinfo = "MERGE"; break;
00323 case 1: sinfo = "KEEP MAIN"; break;
00324 case 2: sinfo = "SKIP FASP"; break;
00325 case 3: sinfo = "SKIP MAIN"; break;
00326 default: sinfo = "?? WHAT ??"; break;
00327 }
00328
00329 DOUT1(("%s %s", sinfo.c_str(), info.c_str()));
00330
00331 lastkind = kind;
00332 left = eventid;
00333 right = eventid;
00334 }
00335
00336 void VerifyPlace(mbs::WriteIterator& iter, unsigned size)
00337 {
00338 if ((size>0) && iter.IsPlaceForEvent(size-sizeof(mbs::EventHeader))) return;
00339
00340 dabc::Buffer buf = iter.Close();
00341
00342 if (fDoWriting) Send(Output(), buf);
00343 else buf.Release();
00344
00345 if (size>0)
00346 iter.Reset(Pool()->TakeBuffer(fBufferSize));
00347 }
00348
00349 void CombinerLoop()
00350 {
00351
00352
00353
00354
00355
00356 mbs::ReadIterator iter1, iter2;
00357
00358 int currid1(-1), currid2(-1);
00359
00360 mbs::WriteIterator out_iter;
00361
00362
00363
00364 int bufcnt(0), evntcnt(0), errcnt(0);
00365 int64_t totalsz(0);
00366
00367 SubeventsMap fMap;
00368 std::vector<int> triggers(16);
00369
00370 int first_tm(0), last_tm(0);
00371
00372 bool end_of_fasp_file(false), end_of_lmd_file(false);
00373
00374 while (!end_of_lmd_file || !end_of_fasp_file) {
00375
00376 if ((currid1<0) && !end_of_lmd_file) {
00377 iter1.NextEvent();
00378
00379 if (!iter1.IsData()) {
00380 dabc::Buffer buf1 = Recv(Input(0), 5.);
00381 bufcnt++;
00382 if (buf1.GetTypeId() == dabc::mbt_EOF) {
00383 end_of_lmd_file = true;
00384 } else
00385 if (buf1.null()) {
00386 EOUT(("Get empty buffer id = %d", buf1.GetTypeId()));
00387 break;
00388 } else
00389 if (!iter1.Reset(buf1.HandOver())) {
00390 EOUT(("Problem to reset iter1"));
00391 break;
00392 }
00393 if (!end_of_lmd_file)
00394 iter1.NextEvent();
00395 }
00396
00397 if (iter1.IsData()) {
00398 int maxid(-1), minid(-1);
00399
00400 while (iter1.NextSubEvent()) {
00401 mbs::SubeventHeader* sub = iter1.subevnt();
00402 uint32_t fullid = sub->fFullId;
00403 fMap[fullid]++;
00404
00405 int evid = GetEventNumber(fullid, sub->RawData(), sub->RawDataSize());
00406
00407 if (maxid<0) { maxid = evid; } else if (evid>maxid) maxid=evid;
00408 if (minid<0) minid = evid; else if (evid<minid) minid=evid;
00409 }
00410
00411 if (maxid!=minid) EOUT(("ID missmatch"));
00412 currid1 = minid;
00413 }
00414 }
00415
00416 if ((currid2<0) && !end_of_fasp_file) {
00417 iter2.NextEvent();
00418
00419 if (!iter2.IsData()) {
00420 dabc::Buffer buf2 = Recv(Input(1), 5.);
00421 if (buf2.GetTypeId() == dabc::mbt_EOF) {
00422 end_of_fasp_file = true;
00423 } else
00424 if (buf2.null()) {
00425 EOUT(("Get empty FASP buffer id = %d", buf2.GetTypeId()));
00426 break;
00427 } else
00428 if (!iter2.Reset(buf2.HandOver())) {
00429 EOUT(("Problem to reset iter2"));
00430 break;
00431 }
00432 iter2.NextEvent();
00433 }
00434
00435 if (!end_of_fasp_file)
00436 currid2 = iter2.evnt()->EventNumber();
00437 }
00438
00439 if ((currid2<0) && (currid1<0)) {
00440 break;
00441 }
00442
00443 if ((currid1<0) && (currid2>=0)) {
00444
00445 AccountLoop(2, currid2);
00446 currid2 = -1;
00447 continue;
00448 }
00449
00450 if ((currid2<0) && (currid1>=0)) {
00451 if (fOnlyFull) {
00452
00453 AccountLoop(3, currid1);
00454 currid1 = -1;
00455 continue;
00456 } else {
00457
00458 VerifyPlace(out_iter, iter1.GetEventSize());
00459 out_iter.CopyEventFrom(iter1.GetEventPointer());
00460
00461 AccountLoop(1, currid1);
00462 currid1 = -1;
00463 evntcnt++;
00464 continue;
00465 }
00466 }
00467
00468 unsigned diff = ((unsigned) currid1 - (unsigned) currid2) & 0xffffff;
00469
00470 if (diff == 0) {
00471
00472
00473 VerifyPlace(out_iter, iter1.GetEventSize() + iter2.GetEventSize());
00474
00475 fMap[proc_FASP]++;
00476
00477 out_iter.CopyEventFrom(iter1.GetEventPointer(), false);
00478 out_iter.AddSubevent(iter2.GetSubeventsPointer());
00479 out_iter.FinishEvent();
00480
00481 AccountLoop(0, currid1);
00482 currid1 = -1;
00483 currid2 = -1;
00484 evntcnt++;
00485 } else
00486 if (diff<0x800000) {
00487
00488 AccountLoop(2, currid2);
00489 currid2 = -1;
00490
00491 } else {
00492
00493
00494 if (fOnlyFull) {
00495
00496 AccountLoop(3, currid1);
00497 currid1 = -1;
00498 continue;
00499
00500 } else {
00501
00502 VerifyPlace(out_iter, iter1.GetEventSize());
00503 out_iter.CopyEventFrom(iter1.GetEventPointer());
00504 AccountLoop(1, currid1);
00505 currid1 = -1;
00506 evntcnt++;
00507 }
00508 }
00509 }
00510
00511 VerifyPlace(out_iter, 0);
00512 AccountLoop(-1, -1);
00513
00514 DOUT0(("Found %d events totalsz %d", evntcnt, totalsz));
00515 for (unsigned n=0;n<triggers.size();n++)
00516 if (triggers[n]>0)
00517 DOUT1((" Trigger %2u found %6d times", n, triggers[n]));
00518
00519 DOUT1(("Found %u different subevents", fMap.size()));
00520
00521 SubeventsMap:: iterator it = fMap.begin();
00522 while (it != fMap.end()) {
00523 std::string info = GetHeaderInfo(it->first);
00524 DOUT1((" Subevent 0x%08x %10s found %6d times", it->first, info.c_str(), it->second));
00525 it++;
00526 }
00527
00528 if (errcnt>0) EOUT(("TOTAL ERRORS COUNT = %d", errcnt));
00529
00530 int tm = last_tm-first_tm;
00531
00532 DOUT0(("HEAD: | *Name* | *Time,s* | *Size,MB* | *Events* | *TR1* | *TR2* | *TR8* | *EPICS* |"));
00533 DOUT0(("STAT: | %17s | %5d | %5.1f | %6d | %6d | %6d | %6d | %4d |",
00534 Par("RunName").AsStr("---"), tm , totalsz/1e6, evntcnt, triggers[1], triggers[2], triggers[8], fMap[0xa]));
00535
00536 if (fDoWriting)
00537 while (Output()->OutputPending() > 0) {
00538 DOUT0(("Waiting until data is written %d", Output()->OutputPending()));
00539 WorkerSleep(1.);
00540 }
00541
00542 WorkerSleep(1.);
00543 }
00544
00545
00546 virtual void AfterModuleStop()
00547 {
00548
00549
00550 dabc::mgr.StopApplication();
00551 }
00552 };
00553
00554
00555 class CernOct12Factory : public dabc::Factory {
00556 public:
00557
00558 CernOct12Factory(const char* name) : dabc::Factory(name) {}
00559
00560 virtual dabc::Module* CreateModule(const char* classname, const char* modulename, dabc::Command cmd)
00561 {
00562 if (strcmp(classname,"CernOct12SorterModule")==0)
00563 return new CernOct12SorterModule(modulename, cmd);
00564
00565 return 0;
00566 }
00567 };
00568
00569 dabc::FactoryPlugin cernoct12(new CernOct12Factory("cern-oct12"));
00570
00571 const int FaspBlockSize = 44;
00572 const int FaspSyncPos = 34;
00573
00574
00575
00576 extern "C" void RunSorter()
00577 {
00578 dabc::mgr.CreateMemoryPool("Pool", 5000000, 50);
00579
00580 dabc::mgr.CreateModule("CernOct12SorterModule", "Sorter", "SorterThrd");
00581
00582 if (dabc::mgr.FindModule("Sorter").Par("DoReading").AsBool())
00583 dabc::mgr.CreateTransport("Sorter/Input", mbs::typeLmdInput, "SorterThrd");
00584 else
00585 DOUT0(("Without normal reading"));
00586
00587 if (dabc::mgr.FindModule("Sorter").Par("DoFASP").AsBool())
00588 dabc::mgr.CreateTransport("Sorter/FASP", "fasp::FileInput", "SorterThrd");
00589 else
00590 DOUT0(("Without FASP reading"));
00591
00592 if (dabc::mgr.FindModule("Sorter").Par("DoWriting").AsBool())
00593 dabc::mgr.CreateTransport("Sorter/Output", mbs::typeLmdOutput, "SorterThrd");
00594 else
00595 DOUT0(("Without writing"));
00596 }