00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "dabc/logging.h"
00016 #include "dabc/timing.h"
00017 #include "dabc/ModuleSync.h"
00018 #include "dabc/Port.h"
00019 #include "dabc/Timer.h"
00020 #include "dabc/Command.h"
00021 #include "dabc/StandaloneManager.h"
00022 #include "dabc/MemoryPool.h"
00023 #include "dabc/PoolHandle.h"
00024 #include "dabc/threads.h"
00025 #include "dabc/Application.h"
00026 #include "dabc/SocketDevice.h"
00027 #include "dabc/statistic.h"
00028 #include "dabc/TimeSyncModule.h"
00029 #include "dabc/Factory.h"
00030 #include "dabc/Configuration.h"
00031 #include "dabc/CommandsSet.h"
00032
00033
00034 #include "mbs/MbsTypeDefs.h"
00035 #include "mbs/Iterator.h"
00036
00037
00038
00039
00040 #include <map>
00041 #include <list>
00042 #include <vector>
00043
00044 #define HHH 0x77afb
00045
00046 struct SubRec {
00047 dabc::Buffer* buf;
00048 mbs::EventHeader* evnt;
00049 mbs::SubeventHeader* sub;
00050 };
00051
00052 typedef std::map<int, SubRec> SubMap;
00053
00054 class CernNov10SorterModule : public dabc::ModuleSync {
00055 protected:
00056
00057 int fBufferSize;
00058 bool fDoWrite;
00059 bool fDoSorting;
00060 bool fOnlyFull;
00061 int bufcnt;
00062 int evntcnt;
00063 int completecnt;
00064 int lastevnt;
00065 SubMap fMap;
00066 mbs::WriteIterator fOutIter;
00067 std::string fRunName;
00068
00069 int fLastFlushId;
00070
00071 int fStatistic[11];
00072
00073 int fPreviousEventId;
00074 int fPreviousSyncId;
00075
00076
00077
00078
00079
00080
00081 public:
00082 CernNov10SorterModule(const char* name, dabc::Command* cmd) :
00083 dabc::ModuleSync(name, cmd)
00084 {
00085 fBufferSize = GetCfgInt(dabc::xmlBufferSize, 16*1024, cmd);
00086
00087 fDoWrite = GetCfgBool("DoWrite", false, cmd);
00088
00089 fDoSorting = GetCfgBool("DoSorting", false, cmd);
00090
00091 fOnlyFull = GetCfgBool("OnlyFull", false, cmd);
00092
00093 fRunName = GetCfgStr("RunName", "run22", cmd);
00094
00095 CreatePoolHandle("Pool", fBufferSize);
00096
00097 CreateInput("Input", Pool());
00098
00099 CreateOutput("Output", Pool());
00100
00101 bufcnt = 0;
00102 evntcnt = 0;
00103 completecnt = 0;
00104 lastevnt = 0;
00105
00106 fPreviousEventId = -1;
00107 fPreviousSyncId = -1;
00108
00109 fLastFlushId = -1;
00110
00111 for (int n=0;n<11; n++) fStatistic[n] = 0;
00112
00113
00114
00115 }
00116
00117 bool DoWrite() const { return fDoWrite; }
00118
00119 bool BuildEvent(int syncid)
00120 {
00121 unsigned sub_size(0), n_sub(0);
00122
00123 SubMap::iterator iter;
00124
00125 for (int key = 0; key<11; key++) {
00126 iter = fMap.find(syncid << 4 | key);
00127 if (iter == fMap.end()) continue;
00128
00129 sub_size += iter->second.sub->FullSize();
00130
00131 n_sub++;
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160 }
00161
00162
00163 if (sub_size==0) return true;
00164
00165
00166 if (fOnlyFull && (n_sub<<11)) return true;
00167
00168
00169 if (!fOutIter.IsPlaceForEvent(sub_size)) {
00170
00171
00172
00173 Send(Output(), fOutIter.Close());
00174
00175 dabc::Buffer* buf = TakeBuffer(Pool(), fBufferSize);
00176
00177
00178
00179 fOutIter.Reset(buf);
00180
00181 if (!fOutIter.IsPlaceForEvent(sub_size)) {
00182 EOUT(("PANIK - NO PLACE for event %x size %u buf %p", syncid, sub_size, fOutIter.buf()));
00183 return false;
00184 }
00185 }
00186
00187 iter = fMap.find(syncid << 4);
00188
00189 if (iter == fMap.end()) {
00190 fOutIter.NewEvent(syncid);
00191 fOutIter.evnt()->iTrigger = 7;
00192 } else {
00193 fOutIter.NewEvent(iter->second.evnt->EventNumber());
00194 fOutIter.evnt()->CopyHeader(iter->second.evnt);
00195 }
00196
00197
00198
00199
00200
00201
00202 for (int key = 0; key < 11; key++) {
00203 iter = fMap.find((syncid << 4) | key);
00204 if (iter != fMap.end()) {
00205
00206
00207 if (!fOutIter.AddSubevent(iter->second.sub)) {
00208 EOUT(("ADD SUBEVNT PANIK "));
00209 exit(1);
00210 }
00211
00212 fMap.erase(iter);
00213 }
00214 }
00215
00216 fOutIter.FinishEvent();
00217
00218 return true;
00219 }
00220
00221
00222 void FlushBuffer(dabc::Buffer* buf)
00223 {
00224
00225 int maxid(-1), minid(-1);
00226
00227 SubMap::iterator iter = fMap.begin();
00228 for (; iter!=fMap.end(); iter++) {
00229 if (iter->second.buf != buf) continue;
00230
00231 int syncid = iter->first >> 4;
00232
00233 if (maxid<0) {
00234 maxid = syncid;
00235 minid = syncid;
00236 } else
00237 if (syncid > maxid) maxid = syncid; else
00238 if (syncid < minid) minid = syncid;
00239 }
00240
00241 if ((maxid>0) && DoWrite()) {
00242
00243 if (fLastFlushId<0) fLastFlushId = minid - 1;
00244
00245
00246
00247 if (maxid - fLastFlushId > 0x800000) { EOUT(("WRAP")); exit(0); }
00248
00249 for (int syncid = fLastFlushId + 1; syncid <= maxid; syncid++)
00250 BuildEvent(syncid);
00251
00252 fLastFlushId = maxid;
00253 }
00254
00255 for (iter = fMap.begin(); iter!=fMap.end(); iter++) {
00256 if (iter->second.buf == buf) {
00257 EOUT(("PANIK : Still found buffer !!!"));
00258 fMap.erase(iter);
00259 exit(1);
00260 }
00261 }
00262
00263 dabc::Buffer::Release(buf);
00264 }
00265
00266
00267 void ProduceBufferIndex(dabc::Buffer* buf)
00268 {
00269
00270
00271
00272 mbs::ReadIterator iter(buf);
00273 while (iter.NextEvent()) {
00274 bufcnt++;
00275
00276 int evntid = iter.evnt()->iEventNumber;
00277
00278
00279
00280 int syncid(-1), prevsyncid(-1);
00281
00282 int numsub = 0;
00283
00284 while (iter.NextSubEvent()) {
00285
00286 int findkey = -1;
00287
00288 numsub++;
00289
00290
00291
00292 if (iter.subevnt()->fFullId == 0x9000005) {
00293 findkey = 0;
00294
00295 syncid = *((uint32_t*) iter.subevnt()->RawData());
00296
00297
00298
00299
00300 if (syncid==0) {
00301 if (evntid==fPreviousEventId+1) {
00302 syncid = fPreviousSyncId + 1;
00303 *((uint32_t*) iter.subevnt()->RawData()) = syncid;
00304 DOUT0(("REPAIR ZERO COUNTER evnt:0x%x sync:0x%x", evntid, syncid));
00305 } else {
00306 EOUT(("ZERO counter - cannot repair? RAWDATA:"));
00307 for (unsigned n=0;n<iter.subevnt()->RawDataSize();n++) {
00308 if (n % 16 == 0) printf("\n");
00309 if (n % 4 == 0) printf(" ");
00310 printf("%3x", *((uint8_t*) iter.subevnt()->RawData() + n));
00311 }
00312 printf("\n");
00313 }
00314 }
00315
00316 if (syncid > 0xffffff) DOUT0(("SYNC EVID PANIK 0x%06x !!!", syncid));
00317
00318 if ((fPreviousSyncId>=0) && (syncid!=fPreviousSyncId+1))
00319 DOUT0(("SYNC SEQ PANIK 0x%6x 0x%6x!!!", fPreviousSyncId, syncid));
00320
00321 fPreviousEventId = evntid;
00322 fPreviousSyncId = syncid;
00323
00324 evntcnt++;
00325
00326
00327 } else
00328 if (iter.subevnt()->fFullId == 0x9010006) {
00329 findkey = 1;
00330
00331
00332 if (syncid <0 ) EOUT(("AAAAAAAAA"));
00333
00334 } else
00335 if ((iter.subevnt()->fFullId & 0xff00ffff) == 0x2000007) {
00336 findkey = 1 + iter.subevnt()->iSubcrate;
00337
00338 uint8_t* data = (uint8_t*) iter.subevnt()->RawData();
00339 int ownid = data[360] << 16 | data[361] << 8 | data[362];
00340
00341
00342
00343
00344 if ((syncid>=0) && (syncid!=ownid)) EOUT(("Spadic PANIK 0x%06x 0x%06x", syncid, ownid));
00345
00346 if (syncid<0) syncid = ownid;
00347
00348 } else
00349 if ((iter.subevnt()->fFullId & 0xff00ffff) == 0x2000001) {
00350 findkey = 8 + iter.subevnt()->iSubcrate;
00351
00352 uint8_t* data = (uint8_t*) iter.subevnt()->RawData();
00353
00354 int ownid = (*((uint32_t*) (data+8)) >> 6) & 0xffffff;
00355
00356 int ownid2 = (*((uint32_t*) (data+iter.subevnt()->RawDataSize() - 4)) >> 6) & 0xffffff;
00357
00358
00359
00360 if (ownid2 - ownid != 1) { DOUT0((" ROC SYNC PANIK 0x%06x 0x%06x", ownid, ownid2)); }
00361
00362 if ((syncid>=0) && (syncid!=ownid)) DOUT0(("ROC%d PANIK 0x%06x 0x%06x", findkey - 8, syncid, ownid));
00363
00364 if (syncid<0) syncid = ownid;
00365 } else {
00366 EOUT(("Wrong subevent 0x%08x", iter.subevnt()->fFullId));
00367 continue;
00368 }
00369
00370 SubRec rec;
00371 rec.buf = buf;
00372 rec.evnt = iter.evnt();
00373 rec.sub = iter.subevnt();
00374
00375 if (syncid > 0xffff00) { EOUT(("SYNCID PANIK 0x%06x !!!", syncid)); exit(1); }
00376
00377 if ((prevsyncid>=0) && (syncid!=prevsyncid))
00378 EOUT(("SYNCID PAINCK: sync missmatch in event %x id1:%x id2:%x", evntid, prevsyncid, syncid));
00379
00380 prevsyncid = syncid;
00381
00382
00383
00384
00385
00386 if (DoWrite())
00387 fMap[(syncid << 4) | findkey] = rec;
00388
00389 fStatistic[findkey]++;
00390 }
00391
00392 if (numsub > 10) completecnt++;
00393
00394
00395
00396 }
00397
00398 }
00399
00400
00401 virtual void MainLoop()
00402 {
00403 dabc::Buffer* buf = 0;
00404
00405 std::list<dabc::Buffer*> queue;
00406
00407
00408
00409 while ((buf = Recv(Input(), 5.)) != 0) {
00410
00411 if (buf->GetTypeId() == dabc::mbt_EOF) break;
00412
00413
00414 ProduceBufferIndex(buf);
00415 queue.push_back(buf);
00416
00417 if (queue.size() >= 500) {
00418
00419 FlushBuffer(queue.front());
00420 queue.pop_front();
00421 }
00422 }
00423
00424 while (queue.size()>0) {
00425 FlushBuffer(queue.front());
00426 queue.pop_front();
00427 }
00428
00429 Send(Output(), fOutIter.Close());
00430
00431 Send(Output(), buf);
00432
00433 while (Output()->OutputPending() > 0) {
00434 DOUT0(("Waiting until data is written %d", Output()->OutputPending()));
00435 ProcessorSleep(1.);
00436 }
00437
00438 ProcessorSleep(1.);
00439 }
00440
00441
00442 virtual void AfterModuleStop()
00443 {
00444 DOUT0(("NUM EVENTS: total:%d MBS:%d complete:%d", bufcnt, evntcnt, completecnt));
00445
00446 size_t pos = fRunName.find("run");
00447 if (pos==std::string::npos) pos = 0;
00448
00449 DOUT0(("| Name | File | Comp | MBS | Spa1 | Spa2 | Spa3 | Spa4 | Spa5 | Spa6 | RICH | STS0 | STS1 |"));
00450 DOUT0(("| %8s | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d |",
00451 fRunName.c_str() + pos, bufcnt, completecnt,
00452 fStatistic[1], fStatistic[2], fStatistic[3], fStatistic[4], fStatistic[5], fStatistic[6],
00453 fStatistic[7], fStatistic[8], fStatistic[9], fStatistic[10]));
00454
00455 dabc::mgr()->GetApp()->Submit(new dabc::Command("CheckModulesRunning"));
00456 }
00457 };
00458
00459
00460
00461 class CernNov10Factory : public dabc::Factory {
00462 public:
00463
00464 CernNov10Factory(const char* name) : dabc::Factory(name) {}
00465
00466 virtual dabc::Module* CreateModule(const char* classname, const char* modulename, dabc::Command* cmd)
00467 {
00468 if (strcmp(classname,"CernNov10SorterModule")==0)
00469 return new CernNov10SorterModule(modulename, cmd);
00470
00471 return 0;
00472 }
00473 };
00474
00475 dabc::FactoryPlugin cernnov10(new CernNov10Factory("cern-nov10"));
00476
00477
00478 extern "C" void RunSorter()
00479 {
00480 dabc::mgr()->CreateMemoryPool("Pool", 500000, 1000);
00481
00482 dabc::mgr()->CreateModule("CernNov10SorterModule", "Sorter", "SorterThrd");
00483
00484 CernNov10SorterModule* m = (CernNov10SorterModule*) dabc::mgr()->FindModule("Sorter");
00485
00486 if (!dabc::mgr()->CreateTransport("Sorter/Input", mbs::typeLmdInput, "SorterThrd")) exit(1);
00487
00488 if (m->DoWrite())
00489 if (!dabc::mgr()->CreateTransport("Sorter/Output", mbs::typeLmdOutput, "SorterThrd")) exit(1);
00490 }
00491
00492