• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

beamtime/cern-nov10/sorter/nov10-sorter.cxx (r4864/r3084)

Go to the documentation of this file.
00001 /********************************************************************
00002  * The Data Acquisition Backbone Core (DABC)
00003  ********************************************************************
00004  * Copyright (C) 2009-
00005  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
00006  * Planckstr. 1
00007  * 64291 Darmstadt
00008  * Germany
00009  * Contact:  http://dabc.gsi.de
00010  ********************************************************************
00011  * This software can be used under the GPL license agreements as stated
00012  * in LICENSE.txt file which is part of the distribution.
00013  ********************************************************************/
00014 
00015 #include "dabc/logging.h"
00016 #include "dabc/timing.h"
00017 #include "dabc/ModuleSync.h"
00018 #include "dabc/Port.h"
00019 #include "dabc/Timer.h"
00020 #include "dabc/Command.h"
00021 #include "dabc/StandaloneManager.h"
00022 #include "dabc/MemoryPool.h"
00023 #include "dabc/PoolHandle.h"
00024 #include "dabc/threads.h"
00025 #include "dabc/Application.h"
00026 #include "dabc/SocketDevice.h"
00027 #include "dabc/statistic.h"
00028 #include "dabc/TimeSyncModule.h"
00029 #include "dabc/Factory.h"
00030 #include "dabc/Configuration.h"
00031 #include "dabc/CommandsSet.h"
00032 
00033 
00034 #include "mbs/MbsTypeDefs.h"
00035 #include "mbs/Iterator.h"
00036 
00037 //#include "roc/Message.h"
00038 //#include "roc/Sorter.h"
00039 
00040 #include <map>
00041 #include <list>
00042 #include <vector>
00043 
00044 #define HHH 0x77afb
00045 
00046 struct SubRec {
00047    dabc::Buffer* buf;
00048    mbs::EventHeader* evnt;
00049    mbs::SubeventHeader* sub;
00050 };
00051 
00052 typedef std::map<int, SubRec> SubMap;
00053 
00054 class CernNov10SorterModule : public dabc::ModuleSync {
00055    protected:
00056 
00057       int fBufferSize;
00058       bool fDoWrite;
00059       bool fDoSorting;
00060       bool fOnlyFull;
00061       int bufcnt;
00062       int evntcnt;
00063       int completecnt;
00064       int lastevnt;
00065       SubMap fMap;
00066       mbs::WriteIterator fOutIter;
00067       std::string fRunName;
00068       
00069       int fLastFlushId;
00070 
00071       int fStatistic[11]; //
00072 
00073       int fPreviousEventId; // previous MBS event ID
00074       int fPreviousSyncId; // previous MBS sync ID
00075 
00076       
00077       //      std::vector<roc::Sorter*> fSort;
00078 
00079 
00080 
00081    public:
00082       CernNov10SorterModule(const char* name, dabc::Command* cmd) :
00083          dabc::ModuleSync(name, cmd)
00084       {
00085          fBufferSize = GetCfgInt(dabc::xmlBufferSize, 16*1024, cmd);
00086 
00087          fDoWrite = GetCfgBool("DoWrite", false, cmd);
00088 
00089          fDoSorting = GetCfgBool("DoSorting", false, cmd);
00090 
00091          fOnlyFull = GetCfgBool("OnlyFull", false, cmd);
00092 
00093          fRunName = GetCfgStr("RunName", "run22", cmd);
00094 
00095          CreatePoolHandle("Pool", fBufferSize);
00096     
00097          CreateInput("Input", Pool());
00098     
00099          CreateOutput("Output", Pool());
00100     
00101          bufcnt = 0;
00102          evntcnt = 0;
00103          completecnt = 0;
00104          lastevnt = 0;
00105 
00106          fPreviousEventId = -1;
00107          fPreviousSyncId = -1;
00108          
00109          fLastFlushId = -1;
00110 
00111          for (int n=0;n<11; n++) fStatistic[n] = 0;
00112 
00113 //         for (int n=0; n<3; n++)
00114 //            fSort.push_back(0);
00115       }
00116 
00117       bool DoWrite() const { return fDoWrite; }
00118 
00119       bool BuildEvent(int syncid)
00120       {
00121          unsigned sub_size(0), n_sub(0);
00122 
00123          SubMap::iterator iter;
00124 
00125          for (int key = 0; key<11; key++) {
00126             iter = fMap.find(syncid << 4 | key);
00127             if (iter == fMap.end()) continue;
00128 
00129             sub_size += iter->second.sub->FullSize();
00130 
00131             n_sub++;
00132 
00133             /*
00134             if ((key<8) || !fDoSorting) {
00135                sub_size += iter->second.sub->FullSize();
00136                continue;
00137             }
00138 
00139             int nroc = key - 8;
00140             if (fSort[nroc] == 0) {
00141                fSort[nroc] = new roc::Sorter(8192, 8192);
00142                // exclude second sync message always
00143                fSort[nroc]->addData(iter->second.sub->RawData(), iter->second.sub->RawDataSize() - 6);
00144             }
00145 
00146             SubMap::iterator next;
00147 
00148             int ntry = 1000;
00149             int nextid = evid;
00150 
00151             while (ntry-- > 0) {
00152                nextid++;
00153                next = fMap.find(nextid << 4 | key);
00154                if (next != fMap.end()) break;
00155             }
00156 
00157             if (next==fMap.end()) fSort[nroc]->flush();
00158 
00159 */
00160          }
00161 
00162          // no subevents found - skip
00163          if (sub_size==0) return true;
00164 
00165          // if OnlyFull specified, take only full events
00166          if (fOnlyFull && (n_sub<<11)) return true;
00167 
00168 
00169          if (!fOutIter.IsPlaceForEvent(sub_size)) {
00170 
00171             //DOUT0(("FLUSH buffer buf event %x", evid));
00172 
00173             Send(Output(), fOutIter.Close());
00174 
00175             dabc::Buffer* buf = TakeBuffer(Pool(), fBufferSize);
00176 
00177 //            DOUT0(("Output buffer %p size %d pool %p", buf, fBufferSize, Pool()->getPool()));
00178 
00179             fOutIter.Reset(buf);
00180 
00181             if (!fOutIter.IsPlaceForEvent(sub_size)) {
00182                EOUT(("PANIK - NO PLACE for event %x size %u buf %p", syncid, sub_size, fOutIter.buf()));
00183                return false;
00184             }
00185          }
00186 
00187          iter = fMap.find(syncid << 4);
00188 
00189          if (iter == fMap.end()) {
00190             fOutIter.NewEvent(syncid);
00191             fOutIter.evnt()->iTrigger = 7; // mark as special event for analysis
00192          } else {
00193             fOutIter.NewEvent(iter->second.evnt->EventNumber());
00194             fOutIter.evnt()->CopyHeader(iter->second.evnt);
00195          }
00196 
00197 //         if (evid == HHH)
00198 //             DOUT0((" WRITE Sync %6x Hdr: %x normal:%s", evid, fOutIter.evnt()->EventNumber(), 
00199 //             DBOOL(iter != fMap.end())));
00200 
00201 
00202          for (int key = 0; key < 11; key++) {
00203             iter = fMap.find((syncid << 4) | key);
00204             if (iter != fMap.end()) {
00205 //..              if (evid == HHH)
00206 //..                  DOUT0((" WRITE KEY Sync %6x key %d", evid, key));
00207                if (!fOutIter.AddSubevent(iter->second.sub)) {
00208                   EOUT(("ADD SUBEVNT PANIK "));
00209                   exit(1);
00210                }
00211                
00212               fMap.erase(iter);
00213             }
00214          }
00215 
00216          fOutIter.FinishEvent();
00217 
00218          return true;
00219       }
00220 
00221 
00222       void FlushBuffer(dabc::Buffer* buf)
00223       {
00224 
00225          int maxid(-1), minid(-1);
00226 
00227          SubMap::iterator iter = fMap.begin();
00228          for (; iter!=fMap.end(); iter++) {
00229             if (iter->second.buf != buf) continue;
00230 
00231             int syncid = iter->first >> 4;
00232 
00233             if (maxid<0) {
00234                maxid = syncid;
00235                minid = syncid;
00236             } else
00237             if (syncid > maxid) maxid = syncid; else
00238             if (syncid < minid) minid = syncid;
00239          }
00240 
00241          if ((maxid>0) && DoWrite())  {
00242          
00243            if (fLastFlushId<0) fLastFlushId = minid - 1;
00244 
00245 //            DOUT0(("BUFFER min 0x%6x  max 0x%6x", minid, maxid));
00246 
00247             if (maxid - fLastFlushId > 0x800000) { EOUT(("WRAP")); exit(0); }
00248 
00249             for (int syncid = fLastFlushId + 1; syncid <= maxid; syncid++)
00250                BuildEvent(syncid);
00251                
00252             fLastFlushId = maxid;
00253          }
00254 
00255          for (iter = fMap.begin(); iter!=fMap.end(); iter++) {
00256             if (iter->second.buf == buf) {
00257                EOUT(("PANIK : Still found buffer !!!"));
00258                fMap.erase(iter);
00259                exit(1);
00260             }
00261          }
00262 
00263          dabc::Buffer::Release(buf);
00264       }
00265 
00266 
00267       void ProduceBufferIndex(dabc::Buffer* buf)
00268       {
00269 //         DOUT0(("Buffer %p size %d", buf, buf->GetDataSize()));
00270 
00271 
00272          mbs::ReadIterator iter(buf);
00273          while (iter.NextEvent()) {
00274             bufcnt++;
00275 
00276             int evntid = iter.evnt()->iEventNumber;
00277 
00278 //            DOUT0(("EVENT %x", evntid));
00279 
00280             int syncid(-1), prevsyncid(-1);
00281 
00282             int numsub = 0;
00283 
00284             while (iter.NextSubEvent()) {
00285 
00286                int findkey = -1;
00287 
00288                numsub++;
00289 
00290 //               DOUT0(("   SUBEVENT %08x", iter.subevnt()->fFullId));
00291 
00292                if (iter.subevnt()->fFullId == 0x9000005) {
00293                   findkey = 0;
00294 
00295                   syncid = *((uint32_t*) iter.subevnt()->RawData());
00296 
00297 //                  if ((syncid >= 0x77a00) && (syncid<0x78000))
00298 //                      DOUT0((" MBS Eventid %8x Sync %6x", evntid, syncid));
00299 
00300                   if (syncid==0) {
00301                      if (evntid==fPreviousEventId+1) {
00302                         syncid = fPreviousSyncId + 1;
00303                         *((uint32_t*) iter.subevnt()->RawData()) = syncid;
00304                         DOUT0(("REPAIR ZERO COUNTER evnt:0x%x sync:0x%x", evntid, syncid));
00305                      } else {
00306                         EOUT(("ZERO counter - cannot repair? RAWDATA:"));
00307                         for (unsigned n=0;n<iter.subevnt()->RawDataSize();n++) {
00308                            if (n % 16 == 0) printf("\n");
00309                            if (n % 4 == 0) printf("  ");
00310                            printf("%3x", *((uint8_t*) iter.subevnt()->RawData() + n));
00311                         }
00312                         printf("\n");
00313                      }
00314                   }
00315 
00316                   if (syncid > 0xffffff) DOUT0(("SYNC EVID PANIK 0x%06x !!!", syncid));
00317 
00318                   if ((fPreviousSyncId>=0) && (syncid!=fPreviousSyncId+1))
00319                      DOUT0(("SYNC SEQ PANIK 0x%6x 0x%6x!!!", fPreviousSyncId, syncid));
00320 
00321                   fPreviousEventId = evntid;
00322                   fPreviousSyncId = syncid;
00323 
00324                   evntcnt++;
00325 
00326 
00327                } else
00328                if (iter.subevnt()->fFullId == 0x9010006) {
00329                   findkey = 1;
00330 //                  DOUT0(("   MBS MADC %06x", mapid));
00331 
00332                   if (syncid <0 ) EOUT(("AAAAAAAAA"));
00333 
00334                } else
00335                if ((iter.subevnt()->fFullId & 0xff00ffff) == 0x2000007) {
00336                   findkey = 1 + iter.subevnt()->iSubcrate;
00337 
00338                   uint8_t* data = (uint8_t*) iter.subevnt()->RawData();
00339                   int ownid = data[360] << 16 | data[361] << 8 | data[362];
00340 
00341 //                  DOUT0(("   SPADIC%d %06x", findid - 1, ownid));
00342 
00343 
00344                   if ((syncid>=0) && (syncid!=ownid)) EOUT(("Spadic PANIK 0x%06x 0x%06x", syncid, ownid));
00345 
00346                   if (syncid<0) syncid = ownid;
00347 
00348                } else
00349                if ((iter.subevnt()->fFullId & 0xff00ffff) == 0x2000001) {
00350                   findkey = 8 + iter.subevnt()->iSubcrate;
00351 
00352                   uint8_t* data = (uint8_t*) iter.subevnt()->RawData();
00353 
00354                   int ownid =  (*((uint32_t*)  (data+8)) >> 6) & 0xffffff;
00355 
00356                   int ownid2 =  (*((uint32_t*)  (data+iter.subevnt()->RawDataSize() - 4)) >> 6) & 0xffffff;
00357 
00358 //                  DOUT0(("   ROC%d %06x %06x", findid - 8, ownid, ownid2-ownid));
00359 
00360                   if (ownid2 - ownid != 1) { DOUT0(("        ROC SYNC PANIK 0x%06x 0x%06x", ownid, ownid2)); }
00361 
00362                   if ((syncid>=0) && (syncid!=ownid)) DOUT0(("ROC%d PANIK 0x%06x 0x%06x", findkey - 8, syncid, ownid));
00363 
00364                   if (syncid<0) syncid = ownid;
00365                } else {
00366                   EOUT(("Wrong subevent 0x%08x", iter.subevnt()->fFullId));
00367                   continue;
00368                }
00369 
00370                SubRec rec;
00371                rec.buf = buf;
00372                rec.evnt = iter.evnt();
00373                rec.sub = iter.subevnt();
00374 
00375                if (syncid > 0xffff00) { EOUT(("SYNCID PANIK 0x%06x !!!", syncid)); exit(1); }
00376 
00377                if ((prevsyncid>=0) && (syncid!=prevsyncid))
00378                   EOUT(("SYNCID PAINCK: sync missmatch in event %x   id1:%x id2:%x", evntid, prevsyncid, syncid));
00379 
00380                prevsyncid = syncid;
00381 
00382 //               if (mapid == HHH)
00383 //                  DOUT0((" FOUND ID %6x key %d", mapid, findid));
00384 
00385 
00386                if (DoWrite())
00387                   fMap[(syncid << 4) | findkey] = rec;
00388 
00389                fStatistic[findkey]++;
00390             }
00391 
00392             if (numsub > 10) completecnt++;
00393 
00394 //            DOUT0(("EVENT %x numsub:%d", evntid, numsub));
00395 
00396          }
00397 
00398       }
00399 
00400 
00401       virtual void MainLoop()
00402       {
00403          dabc::Buffer* buf = 0;
00404 
00405          std::list<dabc::Buffer*> queue;
00406 
00407          // just loop over all buffers, taken from file
00408 
00409          while ((buf = Recv(Input(), 5.)) != 0) {
00410 
00411             if (buf->GetTypeId() == dabc::mbt_EOF) break;
00412 
00413             // produce index for every subevent in the buffer
00414             ProduceBufferIndex(buf);
00415             queue.push_back(buf);
00416 
00417             if (queue.size() >= 500) {
00418                // now find and combine all events which are present on the buffer
00419                FlushBuffer(queue.front());
00420                queue.pop_front();
00421             }
00422          }
00423 
00424          while (queue.size()>0) {
00425             FlushBuffer(queue.front());
00426             queue.pop_front();
00427          }
00428 
00429          Send(Output(), fOutIter.Close());
00430 
00431          Send(Output(), buf);
00432 
00433          while (Output()->OutputPending() > 0) {
00434             DOUT0(("Waiting until data is written %d", Output()->OutputPending()));
00435             ProcessorSleep(1.);
00436          }
00437 
00438          ProcessorSleep(1.);
00439       }
00440 
00441 
00442       virtual void AfterModuleStop()
00443       {
00444          DOUT0(("NUM EVENTS:  total:%d  MBS:%d complete:%d", bufcnt, evntcnt, completecnt));
00445 
00446          size_t pos = fRunName.find("run");
00447          if (pos==std::string::npos) pos = 0;
00448 
00449          DOUT0(("|     Name |  File |  Comp |   MBS |  Spa1 |  Spa2 |  Spa3 |  Spa4 |  Spa5 |  Spa6 |  RICH |  STS0 |  STS1 |"));
00450          DOUT0(("| %8s | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d | %5d |",
00451                fRunName.c_str() + pos, bufcnt, completecnt,
00452                fStatistic[1],  fStatistic[2], fStatistic[3], fStatistic[4], fStatistic[5], fStatistic[6],
00453                fStatistic[7],  fStatistic[8], fStatistic[9], fStatistic[10]));
00454 
00455          dabc::mgr()->GetApp()->Submit(new dabc::Command("CheckModulesRunning"));
00456       }
00457 };
00458 
00459 
00460 
00461 class CernNov10Factory : public dabc::Factory  {
00462    public:
00463 
00464       CernNov10Factory(const char* name) : dabc::Factory(name) {}
00465 
00466       virtual dabc::Module* CreateModule(const char* classname, const char* modulename, dabc::Command* cmd)
00467       {
00468          if (strcmp(classname,"CernNov10SorterModule")==0)
00469             return new CernNov10SorterModule(modulename, cmd);
00470 
00471          return 0;
00472       }
00473 };
00474 
00475 dabc::FactoryPlugin cernnov10(new CernNov10Factory("cern-nov10"));
00476 
00477 
00478 extern "C" void RunSorter()
00479 {
00480    dabc::mgr()->CreateMemoryPool("Pool", 500000, 1000);
00481   
00482    dabc::mgr()->CreateModule("CernNov10SorterModule", "Sorter", "SorterThrd");
00483    
00484    CernNov10SorterModule* m = (CernNov10SorterModule*) dabc::mgr()->FindModule("Sorter");
00485 
00486    if (!dabc::mgr()->CreateTransport("Sorter/Input", mbs::typeLmdInput, "SorterThrd")) exit(1);
00487      
00488    if (m->DoWrite())
00489       if (!dabc::mgr()->CreateTransport("Sorter/Output", mbs::typeLmdOutput, "SorterThrd")) exit(1);
00490 }
00491 
00492 

Generated on Tue Dec 10 2013 04:52:16 for ROCsoft by  doxygen 1.7.1