00001
00002
00003 #include "dabc/logging.h"
00004 #include "dabc/timing.h"
00005 #include "dabc/ModuleSync.h"
00006 #include "dabc/Port.h"
00007 #include "dabc/Timer.h"
00008 #include "dabc/Command.h"
00009 #include "dabc/Manager.h"
00010 #include "dabc/MemoryPool.h"
00011 #include "dabc/PoolHandle.h"
00012 #include "dabc/threads.h"
00013 #include "dabc/Application.h"
00014 #include "dabc/SocketDevice.h"
00015 #include "dabc/statistic.h"
00016 #include "dabc/TimeSyncModule.h"
00017 #include "dabc/Factory.h"
00018 #include "dabc/Configuration.h"
00019
00020 #include "mbs/MbsTypeDefs.h"
00021 #include "mbs/Iterator.h"
00022
00023 #include <string>
00024 #include <vector>
00025
00026 enum ERocMbsTypes {
00027 proc_RocEvent = 1,
00028 proc_ErrEvent = 2,
00029 proc_MergedEvent = 3,
00030 proc_RawData = 4,
00031 proc_Triglog = 5,
00032 proc_TRD_MADC = 6,
00033 proc_TRD_Spadic = 7,
00034 proc_CERN_Oct11 = 8,
00035 proc_SlaveMbs = 9,
00036 proc_EPICS = 10,
00037 proc_COSY_Nov11 = 11
00038 };
00039
00040 std::string GetHeaderInfo(uint32_t id)
00041 {
00042 switch (id & 0xff) {
00043 case proc_RocEvent : return dabc::format("ROC%02d", (id >> 16) & 0xff);
00044 case proc_ErrEvent : return "ErrEvent";
00045 case proc_MergedEvent : return "Merged";
00046 case proc_RawData : return "RawData";
00047 case proc_Triglog : return "Triglog-M";
00048 case proc_TRD_MADC : return "Cern-nov10";
00049 case proc_TRD_Spadic : return dabc::format("SPADIC%02d", (id >> 16) & 0xff);
00050 case proc_CERN_Oct11 : return dabc::format("CERN-%s", (id >> 24) == 0x9 ? "M" : "S");
00051 case proc_SlaveMbs : return "Triglog-S";
00052 case proc_EPICS : return "EPICS";
00053 }
00054 return "undef";
00055 }
00056
00057 int last_master(0), last_slave(0), last_mastertm(0);
00058
00059 int GetEventNumber(uint32_t id, void* dataptr, int datalen)
00060 {
00061 uint8_t* data = (uint8_t*) dataptr;
00062 switch (id & 0xff) {
00063 case proc_RocEvent: {
00064 int ownid = (*((uint32_t*) (data+8)) >> 6) & 0xffffff;
00065 int ownid2 = (*((uint32_t*) (data+datalen - 4)) >> 6) & 0xffffff;
00066 if (ownid+1 != ownid2) EOUT(("ROC SYNC PANICK"));
00067 return ownid;
00068 }
00069 case proc_TRD_Spadic: {
00070 int ownid = data[360] << 16 | data[361] << 8 | data[362];
00071 return ownid;
00072 }
00073 case proc_Triglog: {
00074 int ownid = *((uint32_t*)dataptr);
00075 last_master = ownid;
00076 last_mastertm = *((uint32_t*) (data + 8));
00077 return ownid;
00078 }
00079 case proc_SlaveMbs: {
00080 int ownid = *((uint32_t*)dataptr);
00081 last_slave = ownid;
00082 return ownid;
00083 }
00084 case proc_CERN_Oct11 :
00085 return (id >> 24) == 0x9 ? last_master : last_slave;
00086
00087 case proc_EPICS:
00088 return last_master;
00089 }
00090
00091 EOUT(("Cannot define event ID"));
00092
00093 return 0;
00094 }
00095
00096
00097 #include <map>
00098
00099 typedef std::map<uint32_t,int> SubeventsMap;
00100
00101 class CernOct11SorterModule : public dabc::ModuleSync {
00102 protected:
00103
00104 int fBufferSize;
00105 bool fDoWriting;
00106
00107
00108 public:
00109 CernOct11SorterModule(const char* name, dabc::Command cmd) :
00110 dabc::ModuleSync(name, cmd)
00111 {
00112 fBufferSize = CreatePar(dabc::xmlBufferSize).AsInt(16*1024);
00113
00114 fDoWriting = CreatePar("DoWriting").AsBool(false);
00115
00116 CreatePar("RunName");
00117
00118 CreatePoolHandle("Pool");
00119
00120 CreateInput("Input", Pool());
00121
00122 CreateOutput("Output", Pool());
00123 }
00124
00125
00126 virtual void MainLoop()
00127 {
00128 dabc::Buffer buf;
00129
00130
00131
00132 int bufcnt(0), evntcnt(0), errcnt(0);
00133 int64_t totalsz(0);
00134
00135 SubeventsMap fMap;
00136 std::vector<int> triggers(16);
00137
00138 uint8_t* lastbuf(0), *nextbuf(0);
00139 int lastsize(0), nextsize(0), lastid(-1), nextid(-1);
00140 int first_tm(0), last_tm(0);
00141
00142 if (fDoWriting) {
00143 lastbuf = new uint8_t[1000000];
00144 nextbuf = new uint8_t[1000000];
00145 }
00146
00147 while (true) {
00148
00149 buf << Recv(Input(), 5.);
00150
00151 if (buf.GetTypeId() == dabc::mbt_EOF) {
00152
00153 break;
00154 }
00155
00156 if (!buf.ispool()) {
00157 EOUT(("Get empty buffer id = %d", buf.GetTypeId()));
00158 break;
00159 }
00160
00161 totalsz+=buf.GetTotalSize();
00162
00163 mbs::ReadIterator iter(buf);
00164 while (iter.NextEvent()) {
00165 evntcnt++;
00166
00167
00168
00169 if (iter.evnt()->iTrigger < 16)
00170 triggers[iter.evnt()->iTrigger]++;
00171
00172 int maxid(-1), minid(-1);
00173
00174 if (fDoWriting) {
00175 memcpy(nextbuf, iter.evnt(), sizeof(mbs::EventHeader));
00176 nextsize = sizeof(mbs::EventHeader);
00177 }
00178
00179 while (iter.NextSubEvent()) {
00180 mbs::SubeventHeader* sub = iter.subevnt();
00181
00182 uint32_t fullid = sub->fFullId;
00183 fMap[fullid]++;
00184
00185 int evid = GetEventNumber(fullid, sub->RawData(), sub->RawDataSize());
00186
00187
00188
00189 if (maxid<0) { maxid = evid; nextid = evid; } else if (evid>maxid) maxid=evid;
00190 if (minid<0) minid = evid; else if (evid<minid) minid=evid;
00191
00192 if (!fDoWriting) continue;
00193
00194 if (evid==nextid) {
00195 memcpy(nextbuf + nextsize, sub, sub->FullSize());
00196 nextsize += sub->FullSize();
00197 } else
00198 if (evid == lastid) {
00199 memcpy(lastbuf + lastsize, sub, sub->FullSize());
00200 lastsize += sub->FullSize();
00201 } else
00202 if (lastid>0) {
00203 if (evid > lastid+100) {
00204 EOUT(("Complete missmatch next %d last %d evid %d", nextid, lastid, evid));
00205 exit(5);
00206 }
00207 DOUT0(("Event jump next:%d last:%d", nextid, lastid));
00208 lastid = -1; lastsize = 0;
00209 }
00210 }
00211
00212 last_tm = last_mastertm;
00213 if (first_tm==0) first_tm = last_tm;
00214
00215 if ((maxid!=minid) && !fDoWriting) {
00216 errcnt++;
00217 if (errcnt<100) EOUT(("EventID missmatch %d %d master:%d slave:%d", minid, maxid, last_master, last_slave));
00218 }
00219
00220 if (fDoWriting) {
00221 if (lastid>0) {
00222 mbs::EventHeader* hdr = (mbs::EventHeader*) lastbuf;
00223 hdr->SetFullSize(lastsize);
00224
00225
00226 dabc::Buffer outbuf = Pool()->TakeBuffer(lastsize);
00227 outbuf.SetTotalSize(lastsize);
00228 outbuf.CopyFrom(lastbuf, lastsize);
00229 outbuf.SetTypeId(mbs::mbt_MbsEvents);
00230 Send(Output(), outbuf, 5);
00231 }
00232
00233 lastid = nextid;
00234 lastsize = nextsize;
00235
00236 uint8_t* d = lastbuf; lastbuf = nextbuf; nextbuf = d;
00237 nextid = -1; nextsize = 0;
00238 }
00239
00240 }
00241
00242 bufcnt++;
00243 }
00244
00245 if (fDoWriting) {
00246 Send(Output(), buf);
00247 while (Output()->OutputPending() > 0) {
00248 DOUT0(("Waiting until data is written %d", Output()->OutputPending()));
00249 WorkerSleep(1.);
00250 }
00251 }
00252
00253 DOUT0(("Found %d events in %d buffers totalsz %d", evntcnt, bufcnt, totalsz));
00254 for (unsigned n=0;n<triggers.size();n++)
00255 if (triggers[n]>0)
00256 DOUT1((" Trigger %2u found %6d times", n, triggers[n]));
00257
00258 DOUT1(("Found %u different subevents", fMap.size()));
00259
00260 SubeventsMap:: iterator it = fMap.begin();
00261 while (it != fMap.end()) {
00262
00263 std::string info = GetHeaderInfo(it->first);
00264
00265 DOUT1((" Subevent 0x%08x %10s found %6d times", it->first, info.c_str(), it->second));
00266
00267 it++;
00268 }
00269
00270 if (errcnt>0) EOUT(("TOTAL ERRORS COUNT = %d", errcnt));
00271
00272 int tm = last_tm-first_tm;
00273
00274 DOUT0(("HEAD: | *Name* | *Time,s* | *Size,MB* | *Events* | *TR1* | *TR2* | *TR8* | *EPICS* |"));
00275 DOUT0(("STAT: | %17s | %5d | %5.1f | %6d | %6d | %6d | %6d | %4d |",
00276 Par("RunName").AsStr("---"), tm , totalsz/1e6, evntcnt, triggers[1], triggers[2], triggers[8], fMap[0xa]));
00277
00278 WorkerSleep(1.);
00279
00280 delete [] lastbuf;
00281 delete [] nextbuf;
00282 }
00283
00284
00285 virtual void AfterModuleStop()
00286 {
00287
00288
00289 dabc::mgr.StopApplication();
00290 }
00291 };
00292
00293
00294 class CernOct11Factory : public dabc::Factory {
00295 public:
00296
00297 CernOct11Factory(const char* name) : dabc::Factory(name) {}
00298
00299 virtual dabc::Module* CreateModule(const char* classname, const char* modulename, dabc::Command cmd)
00300 {
00301 if (strcmp(classname,"CernOct11SorterModule")==0)
00302 return new CernOct11SorterModule(modulename, cmd);
00303
00304 return 0;
00305 }
00306 };
00307
00308 dabc::FactoryPlugin cernoct11(new CernOct11Factory("cern-oct11"));
00309
00310
00311 extern "C" void RunSorter()
00312 {
00313 dabc::mgr.CreateMemoryPool("Pool", 500000, 1000);
00314
00315 dabc::mgr.CreateModule("CernOct11SorterModule", "Sorter", "SorterThrd");
00316
00317 dabc::mgr.CreateTransport("Sorter/Input", mbs::typeLmdInput, "SorterThrd");
00318
00319 if (dabc::mgr.FindModule("Sorter").Par("DoWriting").AsBool())
00320 dabc::mgr.CreateTransport("Sorter/Output", mbs::typeLmdOutput, "SorterThrd");
00321 else
00322 DOUT0(("Without writing"));
00323 }