• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

beamtime/cern-oct11/sorter/oct11-sorter.cxx (r4864/r3084)

Go to the documentation of this file.
// Small conversion program to re-sort events that were wrongly combined by the event builder until 21.10, 15:30
00002 
#include "dabc/logging.h"
#include "dabc/timing.h"
#include "dabc/ModuleSync.h"
#include "dabc/Port.h"
#include "dabc/Timer.h"
#include "dabc/Command.h"
#include "dabc/Manager.h"
#include "dabc/MemoryPool.h"
#include "dabc/PoolHandle.h"
#include "dabc/threads.h"
#include "dabc/Application.h"
#include "dabc/SocketDevice.h"
#include "dabc/statistic.h"
#include "dabc/TimeSyncModule.h"
#include "dabc/Factory.h"
#include "dabc/Configuration.h"

#include "mbs/MbsTypeDefs.h"
#include "mbs/Iterator.h"

#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
00025 
// Subevent kind, carried in the lowest byte of the subevent full id
// (GetHeaderInfo() and GetEventNumber() both switch on `id & 0xff`).
enum ERocMbsTypes {
   proc_RocEvent = 1,      // labelled "ROC<nn>" by GetHeaderInfo()
   proc_ErrEvent = 2,      // labelled "ErrEvent"
   proc_MergedEvent = 3,   // labelled "Merged"
   proc_RawData = 4,       // labelled "RawData"
   proc_Triglog = 5,       // labelled "Triglog-M"
   proc_TRD_MADC = 6,      // labelled "Cern-nov10"
   proc_TRD_Spadic = 7,    // labelled "SPADIC<nn>"
   proc_CERN_Oct11 = 8,    // labelled "CERN-M" or "CERN-S"
   proc_SlaveMbs = 9,      // labelled "Triglog-S"
   proc_EPICS = 10,        // labelled "EPICS"
   proc_COSY_Nov11 = 11    // no label in GetHeaderInfo(), reported as "undef"
};
00039 
00040 std::string GetHeaderInfo(uint32_t id) 
00041 {
00042    switch (id & 0xff) {
00043       case proc_RocEvent    : return dabc::format("ROC%02d", (id >> 16) & 0xff);
00044       case proc_ErrEvent    : return "ErrEvent";
00045       case proc_MergedEvent : return "Merged";
00046       case proc_RawData     : return "RawData";
00047       case proc_Triglog     : return "Triglog-M";
00048       case proc_TRD_MADC    : return "Cern-nov10";
00049       case proc_TRD_Spadic  : return dabc::format("SPADIC%02d", (id >> 16) & 0xff);
00050       case proc_CERN_Oct11  : return dabc::format("CERN-%s", (id >> 24) == 0x9 ? "M" : "S");
00051       case proc_SlaveMbs    : return "Triglog-S";
00052       case proc_EPICS       : return "EPICS";
00053    }
00054    return "undef";
00055 }
00056 
// State shared between GetEventNumber() calls and the sorter main loop.
int last_master = 0;     // event id taken from the most recent proc_Triglog subevent
int last_slave = 0;      // event id taken from the most recent proc_SlaveMbs subevent
int last_mastertm = 0;   // 32-bit word at byte offset 8 of the most recent proc_Triglog subevent
00058 
00059 int GetEventNumber(uint32_t id, void* dataptr, int datalen)
00060 {
00061    uint8_t* data = (uint8_t*) dataptr;
00062    switch (id & 0xff) {
00063      case proc_RocEvent: {
00064         int ownid =  (*((uint32_t*)  (data+8)) >> 6) & 0xffffff;
00065         int ownid2 =  (*((uint32_t*)  (data+datalen - 4)) >> 6) & 0xffffff;
00066         if (ownid+1 != ownid2) EOUT(("ROC SYNC PANICK"));
00067         return ownid;
00068      }
00069      case proc_TRD_Spadic: {
00070         int ownid = data[360] << 16 | data[361] << 8 | data[362];
00071         return ownid;
00072      }
00073      case proc_Triglog: {
00074         int ownid = *((uint32_t*)dataptr);
00075         last_master = ownid;
00076         last_mastertm = *((uint32_t*) (data + 8));
00077         return ownid;
00078      }
00079      case proc_SlaveMbs: {
00080         int ownid = *((uint32_t*)dataptr);
00081         last_slave = ownid;
00082         return ownid;
00083      }
00084      case proc_CERN_Oct11  : 
00085        return (id >> 24) == 0x9 ? last_master : last_slave;
00086        
00087      case proc_EPICS:
00088        return last_master;
00089    }
00090  
00091    EOUT(("Cannot define event ID"));
00092    
00093    return 0;
00094 }
00095 
00096 
00097 #include <map> 
00098 
// Occurrence counter keyed by subevent full id.
typedef std::map<uint32_t, int> SubeventsMap;
00100 
00101 class CernOct11SorterModule : public dabc::ModuleSync {
00102    protected:
00103 
00104       int fBufferSize;
00105       bool fDoWriting;
00106 
00107 
00108    public:
00109       CernOct11SorterModule(const char* name, dabc::Command cmd) :
00110          dabc::ModuleSync(name, cmd)
00111       {
00112          fBufferSize = CreatePar(dabc::xmlBufferSize).AsInt(16*1024);
00113          
00114          fDoWriting = CreatePar("DoWriting").AsBool(false);
00115          
00116          CreatePar("RunName");
00117          
00118          CreatePoolHandle("Pool");
00119 
00120          CreateInput("Input", Pool());
00121 
00122          CreateOutput("Output", Pool());
00123       }
00124 
00125 
00126       virtual void MainLoop()
00127       {
00128          dabc::Buffer buf;
00129 
00130          // just loop over all buffers, taken from file
00131          
00132          int bufcnt(0), evntcnt(0),  errcnt(0);
00133          int64_t totalsz(0);
00134          
00135          SubeventsMap fMap;
00136          std::vector<int> triggers(16);
00137          
00138          uint8_t* lastbuf(0), *nextbuf(0);
00139          int lastsize(0), nextsize(0), lastid(-1), nextid(-1);
00140          int first_tm(0), last_tm(0);
00141          
00142          if (fDoWriting) {
00143            lastbuf = new uint8_t[1000000];
00144            nextbuf = new uint8_t[1000000];
00145          }
00146 
00147          while (true) {
00148 
00149             buf << Recv(Input(), 5.);
00150 
00151             if (buf.GetTypeId() == dabc::mbt_EOF) {
00152               // DOUT1(("Found EOF buffer len = %d", buf.GetTotalSize()));
00153               break;
00154             }
00155             
00156             if (!buf.ispool()) {
00157                EOUT(("Get empty buffer id = %d", buf.GetTypeId()));
00158                break;
00159             }
00160             
00161             totalsz+=buf.GetTotalSize();
00162    
00163             mbs::ReadIterator iter(buf);
00164             while (iter.NextEvent()) {
00165                evntcnt++;
00166                
00167                //DOUT0(("============ NEXT EVENT %d tr %d ========== ", iter.evnt()->iEventNumber, iter.evnt()->iTrigger));
00168                
00169                if (iter.evnt()->iTrigger < 16)
00170                  triggers[iter.evnt()->iTrigger]++;
00171              
00172                int maxid(-1), minid(-1);
00173                
00174                if (fDoWriting) {
00175                   memcpy(nextbuf, iter.evnt(), sizeof(mbs::EventHeader));
00176                   nextsize = sizeof(mbs::EventHeader);
00177                }
00178                
00179                while (iter.NextSubEvent()) {
00180                    mbs::SubeventHeader* sub = iter.subevnt();
00181                    
00182                    uint32_t fullid = sub->fFullId;
00183                    fMap[fullid]++;
00184                    
00185                    int evid = GetEventNumber(fullid, sub->RawData(), sub->RawDataSize());
00186                    
00187                    //DOUT0(("   Next subevent %0x08x  evid %6d", fullid, evid));
00188                    
00189                    if (maxid<0) { maxid = evid; nextid = evid; } else if (evid>maxid) maxid=evid;
00190                    if (minid<0) minid = evid; else if (evid<minid) minid=evid;
00191                    
00192                    if (!fDoWriting) continue;
00193                    
00194                    if (evid==nextid) {
00195                       memcpy(nextbuf + nextsize, sub, sub->FullSize());
00196                       nextsize += sub->FullSize();
00197                    } else 
00198                    if (evid == lastid) {
00199                       memcpy(lastbuf + lastsize, sub, sub->FullSize());
00200                       lastsize += sub->FullSize();
00201                    } else 
00202                    if (lastid>0) {
00203                      if (evid > lastid+100) {
00204                         EOUT(("Complete missmatch next %d last %d evid %d", nextid, lastid, evid));
00205                         exit(5);
00206                      }
00207                      DOUT0(("Event jump next:%d last:%d", nextid, lastid));
00208                      lastid = -1; lastsize = 0;
00209                    }
00210                }
00211                
00212                last_tm = last_mastertm;
00213                if (first_tm==0) first_tm = last_tm;
00214                
00215                if ((maxid!=minid) && !fDoWriting) {
00216                  errcnt++; 
00217                  if (errcnt<100) EOUT(("EventID missmatch %d %d master:%d slave:%d", minid, maxid, last_master, last_slave));
00218                }
00219                
00220                if (fDoWriting) {
00221                  if (lastid>0) {
00222                     mbs::EventHeader* hdr = (mbs::EventHeader*) lastbuf;
00223                     hdr->SetFullSize(lastsize);
00224                     // writing data
00225 
00226                    dabc::Buffer outbuf = Pool()->TakeBuffer(lastsize);
00227                    outbuf.SetTotalSize(lastsize);
00228                    outbuf.CopyFrom(lastbuf, lastsize);
00229                    outbuf.SetTypeId(mbs::mbt_MbsEvents);
00230                    Send(Output(), outbuf, 5);
00231                 }
00232                  
00233                  lastid = nextid;
00234                  lastsize = nextsize;
00235                  
00236                  uint8_t* d = lastbuf; lastbuf = nextbuf; nextbuf = d;
00237                  nextid = -1; nextsize = 0;
00238                }
00239                
00240             }
00241             
00242             bufcnt++;
00243         }
00244         
00245         if (fDoWriting) {
00246            Send(Output(), buf);
00247            while (Output()->OutputPending() > 0) {
00248               DOUT0(("Waiting until data is written %d", Output()->OutputPending()));
00249               WorkerSleep(1.);
00250            }
00251         }
00252          
00253          DOUT0(("Found %d events in %d buffers totalsz %d", evntcnt, bufcnt, totalsz));
00254          for (unsigned n=0;n<triggers.size();n++)
00255            if (triggers[n]>0)
00256              DOUT1(("   Trigger %2u found %6d times", n, triggers[n]));
00257          
00258          DOUT1(("Found %u different subevents", fMap.size()));
00259          
00260          SubeventsMap:: iterator it = fMap.begin();
00261          while (it != fMap.end()) {
00262             
00263             std::string info = GetHeaderInfo(it->first);
00264            
00265             DOUT1(("   Subevent 0x%08x  %10s found %6d times", it->first,  info.c_str(), it->second));
00266             
00267             it++;
00268          }
00269 
00270          if (errcnt>0) EOUT(("TOTAL ERRORS COUNT = %d", errcnt));
00271 
00272          int tm = last_tm-first_tm;
00273          
00274          DOUT0(("HEAD: |              *Name* | *Time,s* | *Size,MB* | *Events* |  *TR1* |  *TR2* |  *TR8* | *EPICS* |"));
00275          DOUT0(("STAT: | %17s |  %5d   |    %5.1f  |  %6d  | %6d | %6d | %6d |   %4d  |", 
00276                 Par("RunName").AsStr("---"), tm , totalsz/1e6, evntcnt, triggers[1], triggers[2], triggers[8], fMap[0xa]));
00277          
00278          WorkerSleep(1.);
00279          
00280          delete [] lastbuf;
00281          delete [] nextbuf;
00282       }
00283 
00284 
00285       virtual void AfterModuleStop()
00286       {
00287 //         DOUT0(("After module stop"));
00288          
00289          dabc::mgr.StopApplication();
00290       }
00291 };
00292 
00293 
00294 class CernOct11Factory : public dabc::Factory  {
00295    public:
00296 
00297       CernOct11Factory(const char* name) : dabc::Factory(name) {}
00298 
00299       virtual dabc::Module* CreateModule(const char* classname, const char* modulename, dabc::Command cmd)
00300       {
00301          if (strcmp(classname,"CernOct11SorterModule")==0)
00302             return new CernOct11SorterModule(modulename, cmd);
00303 
00304          return 0;
00305       }
00306 };
00307 
00308 dabc::FactoryPlugin cernoct11(new CernOct11Factory("cern-oct11"));
00309 
00310 
00311 extern "C" void RunSorter()
00312 {
00313    dabc::mgr.CreateMemoryPool("Pool", 500000, 1000);
00314    
00315    dabc::mgr.CreateModule("CernOct11SorterModule", "Sorter", "SorterThrd");
00316 
00317    dabc::mgr.CreateTransport("Sorter/Input", mbs::typeLmdInput, "SorterThrd");
00318    
00319   if (dabc::mgr.FindModule("Sorter").Par("DoWriting").AsBool())
00320      dabc::mgr.CreateTransport("Sorter/Output", mbs::typeLmdOutput, "SorterThrd");
00321   else
00322      DOUT0(("Without writing"));
00323 }

Generated on Tue Dec 10 2013 04:52:18 for ROCsoft by  doxygen 1.7.1