• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

beamtime/cern-oct12/sorter/oct12-sorter.cxx (r4864/r3699)

Go to the documentation of this file.
00001 // Small read and merge program, for FASP data
00002 
00003 #include "dabc/logging.h"
00004 #include "dabc/timing.h"
00005 #include "dabc/ModuleSync.h"
00006 #include "dabc/Port.h"
00007 #include "dabc/Timer.h"
00008 #include "dabc/Command.h"
00009 #include "dabc/Manager.h"
00010 #include "dabc/MemoryPool.h"
00011 #include "dabc/PoolHandle.h"
00012 #include "dabc/threads.h"
00013 #include "dabc/Application.h"
00014 #include "dabc/SocketDevice.h"
00015 #include "dabc/statistic.h"
00016 #include "dabc/Factory.h"
00017 #include "dabc/Configuration.h"
00018 
00019 #include "mbs/MbsTypeDefs.h"
00020 #include "mbs/Iterator.h"
00021 
00022 #include <string>
00023 #include <vector>
00024 
00025 enum ERocMbsTypes {
00026    proc_RocEvent     =  1,   
00027    proc_ErrEvent     =  2,   
00028    proc_MergedEvent  =  3,   
00029    proc_RawData      =  4,   
00030    proc_Triglog      =  5,   
00031    proc_TRD_MADC     =  6,   
00032    proc_TRD_Spadic   =  7,   
00033    proc_CERN_Oct11   =  8,   
00034    proc_SlaveMbs     =  9,   
00035    proc_EPICS        = 10,   
00036    proc_COSY_Nov11   = 11,   
00037    proc_SpadicV10Raw = 12,   
00038    proc_SpadicV10Event = 13, 
00039    proc_CERN_Oct12   = 14,    
00040    proc_FASP         = 15,    
00041    proc_TRBEvent     = 31     
00042 };
00043 
00044 std::string GetHeaderInfo(uint32_t id) 
00045 {
00046    switch (id & 0xff) {
00047    case proc_RocEvent    : return dabc::format("ROC%02d", (id >> 16) & 0xff);
00048    case proc_ErrEvent    : return "ErrEvent";
00049    case proc_MergedEvent : return "Merged";
00050    case proc_RawData     : return "RawData";
00051    case proc_Triglog     : return "Triglog-M";
00052    case proc_TRD_MADC    : return "Cern-nov10";
00053    case proc_TRD_Spadic  : return dabc::format("SPADIC%02d", (id >> 16) & 0xff);
00054    case proc_CERN_Oct11  : return dabc::format("CERN-%s", (id >> 24) == 0x9 ? "M" : "S");
00055    case proc_SlaveMbs    : return "Triglog-S";
00056    case proc_EPICS       : return "EPICS";
00057    case proc_CERN_Oct12  : return dabc::format("CERN-%s", (id >> 24) == 0x9 ? "M" : "S");
00058    case proc_FASP        : return "FASP";
00059    case proc_TRBEvent    : return "TRB";
00060    }
00061    return "undef";
00062 }
00063 
00064 int last_master(0), last_slave(0), last_mastertm(0), mbseventid(0);
00065 
00066 int GetEventNumber(uint32_t id, void* dataptr, int datalen)
00067 {
00068    uint8_t* data = (uint8_t*) dataptr;
00069    switch (id & 0xff) {
00070    case proc_RocEvent: {
00071       int ownid =  (*((uint32_t*)  (data+8)) >> 6) & 0xffffff;
00072       int ownid2 =  (*((uint32_t*)  (data+datalen - 4)) >> 6) & 0xffffff;
00073       if (ownid+1 != ownid2) EOUT(("ROC SYNC PANICK"));
00074       return ownid;
00075    }
00076    case proc_TRD_Spadic: {
00077       int ownid = data[360] << 16 | data[361] << 8 | data[362];
00078       return ownid;
00079    }
00080    case proc_Triglog: {
00081       int ownid = *((uint32_t*)dataptr);
00082       last_master = ownid;
00083       last_mastertm = *((uint32_t*) (data + 8));
00084       return ownid;
00085    }
00086    case proc_SlaveMbs: {
00087       int ownid = *((uint32_t*)dataptr);
00088       last_slave = ownid;
00089       return ownid;
00090    }
00091    case proc_CERN_Oct12  :
00092       return (id >> 24) == 0x9 ? last_master : last_slave;
00093 
00094    case proc_EPICS:
00095       return last_master;
00096 
00097    case proc_FASP:
00098       return last_master;
00099 
00100    case proc_TRBEvent:
00101       return last_master;
00102    }
00103 
00104    EOUT(("Cannot define event ID"));
00105 
00106    return 0;
00107 }
00108 
00109 
00110 #include <map> 
00111 
00112 typedef std::map<uint32_t,int> SubeventsMap;
00113 
00114 class CernOct12SorterModule : public dabc::ModuleSync {
00115 protected:
00116 
00117    int fBufferSize;
00118    bool fDoFASP;
00119    bool fDoReading;
00120    bool fDoWriting;  // write merged file
00121    bool fOnlyFull;  //  produce only full events
00122 
00123 
00124 public:
00125    CernOct12SorterModule(const char* name, dabc::Command cmd) :
00126       dabc::ModuleSync(name, cmd)
00127    {
00128       fBufferSize = CreatePar(dabc::xmlBufferSize).AsInt(16*1024);
00129 
00130       fDoFASP = CreatePar("DoFASP").AsBool(false);
00131       fDoReading = CreatePar("DoReading").AsBool(false);
00132       fDoWriting = CreatePar("DoWriting").AsBool(false);
00133       fOnlyFull = CreatePar("OnlyFull").AsBool(false);
00134 
00135       CreatePar("RunName");
00136 
00137       CreatePoolHandle("Pool");
00138 
00139       if (fDoReading) CreateInput("Input", Pool());
00140       if (fDoFASP) CreateInput("FASP", Pool());
00141       if (fDoWriting) CreateOutput("Output", Pool());
00142    }
00143 
00144    virtual void MainLoop()
00145    {
00146       if (!fDoReading && fDoWriting && fDoFASP) {
00147          FaspLoop();
00148          return;
00149       }
00150 
00151 
00152       if (!fDoWriting) 
00153          if (fDoFASP ^ fDoReading) {
00154             NormalLoop();
00155             return;
00156          }
00157 
00158       if (fDoFASP && fDoReading)
00159          CombinerLoop();
00160 
00161       DOUT0(("Nothing to do"));
00162    }
00163 
00164    void FaspLoop()
00165    {
00166       // this is just for converting of the FASP files to LMD
00167 
00168       dabc::Buffer buf;
00169 
00170       int fullsz(0), fullcnt(0);
00171 
00172       bool iseof(false);
00173 
00174       while (!iseof) {
00175          buf = Recv(Input(), 5.);
00176 
00177          fullsz += buf.GetTotalSize();
00178 
00179          if (buf.GetTypeId() == mbs::mbt_MbsEvents)
00180             fullcnt += mbs::ReadIterator::NumEvents(buf);
00181 
00182          iseof = (buf.GetTypeId() == dabc::mbt_EOF);
00183 
00184          if (fDoWriting) {
00185             // DOUT0(("Write buffer"));
00186             Send(Output(), buf.HandOver());
00187          }
00188       }
00189 
00190       if (iseof)
00191          DOUT0(("END-OF-FILE detected!!"));
00192 
00193       DOUT0(("Totally received bytes %d  events %d", fullsz, fullcnt));
00194 
00195       if (fDoWriting)
00196          while (Output()->OutputPending() > 0) {
00197             DOUT0(("Waiting until data is written %d", Output()->OutputPending()));
00198             WorkerSleep(1.);
00199          }
00200    }
00201 
00202 
00203    void NormalLoop()
00204    {
00205       // this is to read normal LMD files and count statistic
00206 
00207       dabc::Buffer buf;
00208 
00209       // just loop over all buffers, taken from file
00210 
00211       int bufcnt(0), evntcnt(0),  errcnt(0);
00212       int64_t totalsz(0);
00213 
00214       SubeventsMap fMap;
00215       std::vector<int> triggers(16);
00216 
00217       int first_tm(0), last_tm(0);
00218 
00219       while (true) {
00220 
00221          buf = Recv(Input(), 5.);
00222 
00223          if (buf.GetTypeId() == dabc::mbt_EOF) {
00224             DOUT3(("Found EOF buffer len = %d", buf.GetTotalSize()));
00225             break;
00226          }
00227 
00228          if (buf.null()) {
00229             EOUT(("Get empty buffer id = %d", buf.GetTypeId()));
00230             break;
00231          }
00232 
00233          totalsz+=buf.GetTotalSize();
00234 
00235          mbs::ReadIterator iter(buf);
00236          while (iter.NextEvent()) {
00237             evntcnt++;
00238 
00239             //DOUT0(("============ NEXT EVENT %d tr %d ========== ", iter.evnt()->iEventNumber, iter.evnt()->iTrigger));
00240 
00241             if (iter.evnt()->iTrigger < 16)
00242                triggers[iter.evnt()->iTrigger]++;
00243 
00244             mbseventid = iter.evnt()->EventNumber();
00245 
00246             int maxid(-1), minid(-1);
00247 
00248             while (iter.NextSubEvent()) {
00249                mbs::SubeventHeader* sub = iter.subevnt();
00250 
00251                uint32_t fullid = sub->fFullId;
00252 
00253                fMap[fullid]++;
00254 
00255                int evid = GetEventNumber(fullid, sub->RawData(), sub->RawDataSize());
00256 
00257                // DOUT0(("%10s  %7d", GetHeaderInfo(fullid).c_str(), evid));
00258 
00259                //DOUT0(("   Next subevent %0x08x  evid %6d", fullid, evid));
00260 
00261                if (maxid<0) { maxid = evid; } else if (evid>maxid) maxid=evid;
00262                if (minid<0) minid = evid; else if (evid<minid) minid=evid;
00263             }
00264 
00265             last_tm = last_mastertm;
00266             if (first_tm==0) first_tm = last_tm;
00267 
00268             if ((maxid!=minid) && !fDoWriting) {
00269                errcnt++;
00270                if (errcnt<100) EOUT(("EventID missmatch %d %d master:%d slave:%d", minid, maxid, last_master, last_slave));
00271             }
00272          }
00273 
00274          bufcnt++;
00275       }
00276 
00277       DOUT0(("Found %d events in %d buffers totalsz %d", evntcnt, bufcnt, totalsz));
00278       for (unsigned n=0;n<triggers.size();n++)
00279          if (triggers[n]>0)
00280             DOUT1(("   Trigger %2u found %6d times", n, triggers[n]));
00281 
00282       DOUT1(("Found %u different subevents", fMap.size()));
00283 
00284       SubeventsMap:: iterator it = fMap.begin();
00285       while (it != fMap.end()) {
00286 
00287          std::string info = GetHeaderInfo(it->first);
00288 
00289          DOUT1(("   Subevent 0x%08x  %10s found %6d times", it->first,  info.c_str(), it->second));
00290 
00291          it++;
00292       }
00293 
00294       if (errcnt>0) EOUT(("TOTAL ERRORS COUNT = %d", errcnt));
00295 
00296       int tm = last_tm-first_tm;
00297 
00298       DOUT0(("HEAD: |              *Name* | *Time,s* | *Size,MB* | *Events* |  *TR1* |  *TR2* |  *TR8* | *EPICS* |"));
00299       DOUT0(("STAT: |   %17s |  %5d   |    %5.1f  |  %6d  | %6d | %6d | %6d |   %4d  |",
00300             Par("RunName").AsStr("---"), tm , totalsz/1e6, evntcnt, triggers[1], triggers[2], triggers[8], fMap[0xa]));
00301 
00302       WorkerSleep(1.);
00303    }
00304 
00305 
00306    void AccountLoop(int kind, int eventid)
00307    {
00308       static int lastkind = kind;
00309       static int left = eventid;
00310       static int right = eventid;
00311 
00312       if (kind == lastkind) {
00313          right = eventid;
00314          return;
00315       }
00316 
00317       std::string info, sinfo;
00318       if (left==right) dabc::formats(info, "event %d", left);
00319                   else dabc::formats(info, "events from %d to %d", left, right);
00320 
00321       switch (lastkind) {
00322          case 0: sinfo = "MERGE"; break;
00323          case 1: sinfo = "KEEP MAIN"; break;
00324          case 2: sinfo = "SKIP FASP"; break;
00325          case 3: sinfo = "SKIP MAIN"; break;
00326          default: sinfo = "?? WHAT ??"; break;
00327       }
00328 
00329       DOUT1(("%s %s", sinfo.c_str(), info.c_str()));
00330 
00331       lastkind = kind;
00332       left = eventid;
00333       right = eventid;
00334    }
00335 
00336    void VerifyPlace(mbs::WriteIterator& iter, unsigned size)
00337    {
00338       if ((size>0) && iter.IsPlaceForEvent(size-sizeof(mbs::EventHeader))) return;
00339 
00340       dabc::Buffer buf = iter.Close();
00341 
00342       if (fDoWriting) Send(Output(), buf);
00343                  else buf.Release();
00344 
00345       if (size>0)
00346          iter.Reset(Pool()->TakeBuffer(fBufferSize));
00347    }
00348 
00349    void CombinerLoop()
00350    {
00351       // this is to read and combine FASP and normal LMD files
00352 
00353       // Be aware - in FASP sync number is in event header,
00354       //            in LMD files - in
00355 
00356       mbs::ReadIterator iter1, iter2;
00357 
00358       int currid1(-1), currid2(-1);
00359 
00360       mbs::WriteIterator out_iter;
00361 
00362       // just loop over all buffers, taken from file
00363 
00364       int bufcnt(0), evntcnt(0),  errcnt(0);
00365       int64_t totalsz(0);
00366 
00367       SubeventsMap fMap;
00368       std::vector<int> triggers(16);
00369 
00370       int first_tm(0), last_tm(0);
00371 
00372       bool end_of_fasp_file(false), end_of_lmd_file(false);
00373 
00374       while (!end_of_lmd_file || !end_of_fasp_file) {
00375 
00376          if ((currid1<0) && !end_of_lmd_file) {
00377             iter1.NextEvent();
00378 
00379             if (!iter1.IsData()) {
00380                dabc::Buffer buf1 = Recv(Input(0), 5.);
00381                bufcnt++;
00382                if (buf1.GetTypeId() == dabc::mbt_EOF) {
00383                   end_of_lmd_file = true;
00384                } else
00385                if (buf1.null()) {
00386                   EOUT(("Get empty buffer id = %d", buf1.GetTypeId()));
00387                   break;
00388                } else
00389                if (!iter1.Reset(buf1.HandOver())) {
00390                   EOUT(("Problem to reset iter1"));
00391                   break;
00392                }
00393                if (!end_of_lmd_file)
00394                   iter1.NextEvent();
00395             }
00396 
00397             if (iter1.IsData()) {
00398                int maxid(-1), minid(-1);
00399 
00400                while (iter1.NextSubEvent()) {
00401                   mbs::SubeventHeader* sub = iter1.subevnt();
00402                   uint32_t fullid = sub->fFullId;
00403                   fMap[fullid]++;
00404 
00405                   int evid = GetEventNumber(fullid, sub->RawData(), sub->RawDataSize());
00406                   //DOUT0(("   Next subevent %0x08x  evid %6d", fullid, evid));
00407                   if (maxid<0) { maxid = evid; } else if (evid>maxid) maxid=evid;
00408                   if (minid<0) minid = evid; else if (evid<minid) minid=evid;
00409                }
00410 
00411                if (maxid!=minid) EOUT(("ID missmatch"));
00412                currid1 = minid;
00413             }
00414          }
00415 
00416          if ((currid2<0) && !end_of_fasp_file) {
00417             iter2.NextEvent();
00418 
00419             if (!iter2.IsData()) {
00420                dabc::Buffer buf2 = Recv(Input(1), 5.);
00421                if (buf2.GetTypeId() == dabc::mbt_EOF) {
00422                   end_of_fasp_file = true;
00423                } else
00424                if (buf2.null()) {
00425                   EOUT(("Get empty FASP buffer id = %d", buf2.GetTypeId()));
00426                   break;
00427                } else
00428                if (!iter2.Reset(buf2.HandOver())) {
00429                   EOUT(("Problem to reset iter2"));
00430                   break;
00431                }
00432                iter2.NextEvent();
00433             }
00434 
00435             if (!end_of_fasp_file)
00436                currid2 = iter2.evnt()->EventNumber();
00437          }
00438 
00439          if ((currid2<0) && (currid1<0)) {
00440             break;
00441          }
00442 
00443          if ((currid1<0) && (currid2>=0)) {
00444             // shift FASP
00445             AccountLoop(2, currid2);
00446             currid2 = -1;
00447             continue;
00448          }
00449 
00450          if ((currid2<0) && (currid1>=0)) {
00451             if (fOnlyFull) {
00452                // skip main data
00453                AccountLoop(3, currid1);
00454                currid1 = -1;
00455                continue;
00456             } else {
00457                // just copy of main data and continue
00458                VerifyPlace(out_iter, iter1.GetEventSize());
00459                out_iter.CopyEventFrom(iter1.GetEventPointer());
00460 
00461                AccountLoop(1, currid1);
00462                currid1 = -1;
00463                evntcnt++;
00464                continue;
00465             }
00466          }
00467 
00468          unsigned diff = ((unsigned) currid1 - (unsigned) currid2) & 0xffffff;
00469 
00470          if (diff == 0) {
00471             // very good, both events are the same
00472 
00473             VerifyPlace(out_iter, iter1.GetEventSize() + iter2.GetEventSize());
00474 
00475             fMap[proc_FASP]++;
00476 
00477             out_iter.CopyEventFrom(iter1.GetEventPointer(), false);
00478             out_iter.AddSubevent(iter2.GetSubeventsPointer());
00479             out_iter.FinishEvent();
00480 
00481             AccountLoop(0, currid1);
00482             currid1 = -1;
00483             currid2 = -1;
00484             evntcnt++;
00485          } else
00486          if (diff<0x800000) {
00487             // FASP id is smaller, shift FASP
00488             AccountLoop(2, currid2);
00489             currid2 = -1;
00490 
00491          } else {
00492             // Normal data is smaller,
00493 
00494             if (fOnlyFull) {
00495                // skip main data
00496                AccountLoop(3, currid1);
00497                currid1 = -1;
00498                continue;
00499 
00500             } else {
00501                //just store normal data
00502                VerifyPlace(out_iter, iter1.GetEventSize());
00503                out_iter.CopyEventFrom(iter1.GetEventPointer());
00504                AccountLoop(1, currid1);
00505                currid1 = -1;
00506                evntcnt++;
00507             }
00508          }
00509       }
00510 
00511       VerifyPlace(out_iter, 0); // flush last data
00512       AccountLoop(-1, -1);
00513 
00514       DOUT0(("Found %d events totalsz %d", evntcnt, totalsz));
00515       for (unsigned n=0;n<triggers.size();n++)
00516          if (triggers[n]>0)
00517             DOUT1(("   Trigger %2u found %6d times", n, triggers[n]));
00518 
00519       DOUT1(("Found %u different subevents", fMap.size()));
00520 
00521       SubeventsMap:: iterator it = fMap.begin();
00522       while (it != fMap.end()) {
00523          std::string info = GetHeaderInfo(it->first);
00524          DOUT1(("   Subevent 0x%08x  %10s found %6d times", it->first,  info.c_str(), it->second));
00525          it++;
00526       }
00527 
00528       if (errcnt>0) EOUT(("TOTAL ERRORS COUNT = %d", errcnt));
00529 
00530       int tm = last_tm-first_tm;
00531 
00532       DOUT0(("HEAD: |              *Name* | *Time,s* | *Size,MB* | *Events* |  *TR1* |  *TR2* |  *TR8* | *EPICS* |"));
00533       DOUT0(("STAT: | %17s |  %5d   |    %5.1f  |  %6d  | %6d | %6d | %6d |   %4d  |",
00534             Par("RunName").AsStr("---"), tm , totalsz/1e6, evntcnt, triggers[1], triggers[2], triggers[8], fMap[0xa]));
00535 
00536       if (fDoWriting)
00537          while (Output()->OutputPending() > 0) {
00538             DOUT0(("Waiting until data is written %d", Output()->OutputPending()));
00539             WorkerSleep(1.);
00540          }
00541 
00542       WorkerSleep(1.);
00543    }
00544 
00545 
00546    virtual void AfterModuleStop()
00547    {
00548       //         DOUT0(("After module stop"));
00549 
00550       dabc::mgr.StopApplication();
00551    }
00552 };
00553 
00554 
00555 class CernOct12Factory : public dabc::Factory  {
00556 public:
00557 
00558    CernOct12Factory(const char* name) : dabc::Factory(name) {}
00559 
00560    virtual dabc::Module* CreateModule(const char* classname, const char* modulename, dabc::Command cmd)
00561    {
00562       if (strcmp(classname,"CernOct12SorterModule")==0)
00563          return new CernOct12SorterModule(modulename, cmd);
00564 
00565       return 0;
00566    }
00567 };
00568 
00569 dabc::FactoryPlugin cernoct12(new CernOct12Factory("cern-oct12"));
00570 
00571 const int FaspBlockSize = 44;
00572 const int FaspSyncPos = 34;
00573 
00574 
00575 
00576 extern "C" void RunSorter()
00577 {
00578    dabc::mgr.CreateMemoryPool("Pool", 5000000, 50);
00579 
00580    dabc::mgr.CreateModule("CernOct12SorterModule", "Sorter", "SorterThrd");
00581 
00582    if (dabc::mgr.FindModule("Sorter").Par("DoReading").AsBool())
00583       dabc::mgr.CreateTransport("Sorter/Input", mbs::typeLmdInput, "SorterThrd");
00584    else
00585       DOUT0(("Without normal reading"));
00586 
00587    if (dabc::mgr.FindModule("Sorter").Par("DoFASP").AsBool())
00588       dabc::mgr.CreateTransport("Sorter/FASP", "fasp::FileInput", "SorterThrd");
00589    else
00590       DOUT0(("Without FASP reading"));
00591 
00592    if (dabc::mgr.FindModule("Sorter").Par("DoWriting").AsBool())
00593       dabc::mgr.CreateTransport("Sorter/Output", mbs::typeLmdOutput, "SorterThrd");
00594    else
00595       DOUT0(("Without writing"));
00596 }

Generated on Tue Dec 10 2013 04:52:18 for ROCsoft by  doxygen 1.7.1