• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

newmonitor/base/ProcMgr.cxx (r3879/r3878)

Go to the documentation of this file.
00001 #include "base/ProcMgr.h"
00002 
00003 #include <stdio.h>
00004 #include <stdlib.h>
00005 #include "base/StreamProc.h"
00006 
00007 base::ProcMgr* base::ProcMgr::fInstance = 0;
00008 
00009 base::ProcMgr::ProcMgr() :
00010    fProc(),
00011    fMap(),
00012    fTriggers(),
00013    fTimeMasterIndex(DummyIndex),
00014    fRawAnalysisOnly(false)
00015 {
00016    if (fInstance==0) fInstance = this;
00017 }
00018 
00019 base::ProcMgr::~ProcMgr()
00020 {
00021    for (unsigned n=0;n<fProc.size();n++) {
00022       // printf("Delete processor %u %p\n", n, fProc[n]);
00023       delete fProc[n];
00024       fProc[n] = 0;
00025    }
00026 
00027    // printf("Delete processors done\n");
00028 
00029    fProc.clear();
00030 
00031    if (fInstance == this) fInstance = 0;
00032 }
00033 
00034 base::ProcMgr* base::ProcMgr::instance()
00035 {
00036    return fInstance;
00037 }
00038 
00039 
00040 base::ProcMgr* base::ProcMgr::AddProc(StreamProc* proc)
00041 {
00042    if (fInstance==0) return 0;
00043    fInstance->fProc.push_back(proc);
00044    return fInstance;
00045 }
00046 
00047 bool base::ProcMgr::RegisterProc(StreamProc* proc, unsigned kind, unsigned brdid)
00048 {
00049    if (proc==0) return false;
00050 
00051    bool find = false;
00052    for (unsigned n=0;n<fProc.size();n++)
00053       if (fProc[n] == proc) find = true;
00054 
00055    if (!find) return false;
00056 
00057    if (brdid>=MaxBrdId) {
00058       printf("Board id %u is too high - failure\n", brdid);
00059       exit(6);
00060    }
00061 
00062    unsigned index = kind * MaxBrdId + brdid;
00063 
00064    fMap[index] = proc;
00065 
00066    return true;
00067 }
00068 
00069 void base::ProcMgr::SetTimeSorting(bool on)
00070 {
00071    for (unsigned n=0;n<fProc.size();n++)
00072       fProc[n]->SetTimeSorting(on);
00073 
00074 }
00075 
00076 
00077 
00078 void base::ProcMgr::ProvideRawData(const Buffer& buf)
00079 {
00080    if (buf.null()) return;
00081 
00082    if (buf().boardid >= MaxBrdId) {
00083       printf("Board id %u is too high - failure\n", buf().boardid);
00084       exit(6);
00085    }
00086 
00087    unsigned index = buf().kind * MaxBrdId + buf().boardid;
00088 
00089    ProcessorsMap::iterator it = fMap.find(index);
00090 
00091    if (it == fMap.end()) return;
00092 
00093    // printf("Provide new data kind %d  board %u\n", buf().kind, buf().boardid);
00094 
00095    it->second->AddNextBuffer(buf);
00096 }
00097 
00098 void base::ProcMgr::ScanNewData()
00099 {
00100    for (unsigned n=0;n<fProc.size();n++)
00101       fProc[n]->ScanNewBuffers();
00102 }
00103 
00104 bool base::ProcMgr::SkipAllData()
00105 {
00106    for (unsigned n=0;n<fProc.size();n++)
00107       fProc[n]->SkipAllData();
00108    return true;
00109 }
00110 
00111 int base::ProcMgr::SyncIdDiff(unsigned id1, unsigned id2) const
00112 {
00113    if (id1==id2) return 0;
00114 
00115    int res = 0;
00116    int range = SyncIdRange();
00117 
00118    if (id1 < id2) {
00119       res = id2 - id1;
00120       if (res > range/2) res -= range;
00121    } else {
00122       res = id1 - id2;
00123       if (res > range/2) res = range - res;
00124                     else res = -res;
00125    }
00126 
00127    return res;
00128 }
00129 
00130 
00131 
00132 bool base::ProcMgr::AnalyzeSyncMarkers()
00133 {
00134    // TODO: configure which processor is time master
00135    // TODO: work with unsynchronized SYNC messages - not always the same id in the front
00136    // TODO: process not only last sync message
00137 
00138    if (fProc.size()==0) return false;
00139 
00140    // in the beginning decide who is time master
00141    if (fTimeMasterIndex == DummyIndex) {
00142       unsigned first = NoSyncIndex;
00143 
00144       for (unsigned n=0;n<fProc.size();n++) {
00145          if (!fProc[n]->IsSynchronisationRequired()) continue;
00146 
00147          if (first == NoSyncIndex) first = n;
00148 
00149          if (fProc[n]->doTriggerSelection()) {
00150             fTimeMasterIndex = n;
00151             break;
00152          }
00153       }
00154 
00155       if (fTimeMasterIndex == DummyIndex) fTimeMasterIndex = first;
00156    }
00157 
00158 
00159    StreamProc* master = 0;
00160 
00161    // if no synchronization at all, return
00162    if (fTimeMasterIndex == NoSyncIndex) {
00163       // printf("Ignore sync scanning completely\n");
00164       goto skip_sync_scanning;
00165    }
00166 
00167    // we require at least 2 syncs on each stream
00168    // TODO: later one can ignore optional streams here
00169    for (unsigned n=0;n<fProc.size();n++) {
00170       if (fProc[n]->IsSynchronisationRequired() && (fProc[n]->numSyncs() < 2)) {
00171           printf("No enough syncs on processor %u!!!\n", n);
00172           // exit(5);
00173           return false;
00174       }
00175    }
00176 
00177    master = fProc[fTimeMasterIndex];
00178 
00179    // if all markers in the master are validated, nothing to do
00180    while (master->fSyncScanIndex < master->numSyncs()) {
00181 
00182       SyncMarker& master_marker = master->getSync(master->fSyncScanIndex);
00183 
00184       bool is_curr_sync_ok = true;
00185 
00186       for (unsigned n=0;n<fProc.size();n++) {
00187          // do not analyze sync-master itself
00188          if (n==fTimeMasterIndex) continue;
00189 
00190          StreamProc* slave = fProc[n];
00191 
00192          slave->fSyncFlag = false;
00193 
00194          if (!slave->IsSynchronisationRequired()) continue;
00195 
00196          bool is_slave_ok = false;
00197 
00198          while (slave->fSyncScanIndex < slave->numSyncs()) {
00199 
00200             SyncMarker& slave_marker = slave->getSync(slave->fSyncScanIndex);
00201 
00202             int diff = SyncIdDiff(master_marker.uniqueid, slave_marker.uniqueid);
00203 
00204             // find same sync as master - very nice
00205             if (diff==0) {
00206                slave_marker.globaltm = master_marker.localtm;
00207                is_slave_ok = true;
00208                slave->fSyncFlag = true; // indicate that this slave has same sync
00209                break;
00210             }
00211 
00212             // master sync is bigger, slave sync must be ignored
00213             if (diff<0) {
00214                // we even remove it while no any reasonable stamp can be assigned to it
00215                printf("Erase SYNC %u in processor %u\n", slave_marker.uniqueid, n);
00216                slave->eraseSyncAt(slave->fSyncScanIndex);
00217                continue;
00218             }
00219 
00220             // slave sync id is bigger, stop analyzing, but could do calibration
00221             if (diff>0) {
00222                printf("Find hole in SYNC sequences in processor %u\n", n);
00223                is_slave_ok = true;
00224                break;
00225             }
00226          }
00227 
00228          if (!is_slave_ok) is_curr_sync_ok = false;
00229       }
00230 
00231       // if we find that on all other processors master sync is accepted,
00232       // we can declare master sync ready and shift all correspondent indexes
00233       if (!is_curr_sync_ok) break;
00234 
00235       master_marker.globaltm = master_marker.localtm;
00236       master->fSyncFlag = true;
00237 
00238       // shift scan index on the processors with the same id as master have
00239       for (unsigned n=0;n<fProc.size();n++)
00240          if (fProc[n]->fSyncFlag)
00241             fProc[n]->fSyncScanIndex++;
00242    }
00243 
00244 
00245 skip_sync_scanning:
00246 
00247    // we require at least two valid syncs on each stream
00248    for (unsigned n=0;n<fProc.size();n++) {
00249 
00250       // every processor need at least two valid syncs for time calibrations
00251       // TODO: later one can ignore optional streams here
00252 
00253       if (fProc[n]->IsSynchronisationRequired() &&
00254           (fProc[n]->fSyncScanIndex < 2)) return false;
00255 
00256       // let also assign global times for the buffers here
00257 
00258       fProc[n]->ScanNewBuffersTm();
00259    }
00260 
00261 
00262    return true;
00263 }
00264 
00265 bool base::ProcMgr::CollectNewTriggers()
00266 {
00267    // central place where triggers should be produced
00268    // in addition, we should perform flushing of data
00269    // therefore if triggers are not produced for long time,
00270    // one should create special "flush" trigger which force
00271    // flushing of the data
00272 
00273    // first collect triggers from the processors
00274    for (unsigned n=0;n<fProc.size();n++)
00275       fProc[n]->CollectTriggers(fTriggers);
00276 
00277    // create flush event when master has already two buffers and
00278    // time is reached by all sub-systems
00279    if (fTimeMasterIndex < fProc.size()) {
00280 
00281       // if we request flush time, it should be bigger than last trigger marker
00282       GlobalTime_t flush_time =
00283             fProc[fTimeMasterIndex]->ProvidePotentialFlushTime(fTriggers.size() > 0 ? fTriggers.back().globaltm : 0.);
00284 
00285       // now verify that each processor is accept such flushtime
00286       // important that every component obtained and analyzed this region,
00287       // otherwise it could happen that normal new trigger appears after such flush trigger
00288       if (flush_time != 0.)
00289          for (unsigned n=0;n<fProc.size();n++)
00290             if (!fProc[n]->VerifyFlushTime(flush_time)) { flush_time = 0.; break; }
00291 
00292       //printf("after verify %12.9f\n", flush_time*1e-9);
00293 
00294 //      flush_time = 0.;
00295 
00296       if (flush_time != 0.) {
00297          // printf("FLUSH: %12.9f\n", flush_time*1e-9);
00298          fTriggers.push_back(GlobalTriggerMarker(flush_time));
00299          fTriggers.back().isflush = true;
00300       }
00301    }
00302 
00303 //   printf("Now we have %u triggers\n", fTriggers.size());
00304 
00305    // and redistribute back global triggers list
00306    for (unsigned n=0;n<fProc.size();n++)
00307       fProc[n]->DistributeTriggers(fTriggers);
00308 
00309    return true;
00310 }
00311 
00312 
00313 bool base::ProcMgr::ScanDataForNewTriggers()
00314 {
00315    // here we want that each processor scan its data again for new triggers
00316    // which we already distribute to each processor. In fact, this could run in
00317    // individual thread of each processor
00318 
00319    for (unsigned n=0;n<fProc.size();n++)
00320       fProc[n]->ScanDataForNewTriggers();
00321 
00322    return true;
00323 }
00324 
00325 
00326 bool base::ProcMgr::ProduceNextEvent(base::Event* &evt)
00327 {
00328 //   printf("Try to produce data for %u triggers\n", fTriggers.size());
00329 
00330    // at this moment each processor should finish with buffers scanning
00331    // for special cases (like MBS or EPICS) processor itself should declare that
00332    // triggers in between are correctly filled
00333 
00334    if (IsRawAnalysis()) return false;
00335 
00336    unsigned numready = fTriggers.size();
00337 
00338    for (unsigned n=0;n<fProc.size();n++) {
00339       unsigned local = fProc[n]->NumReadySubevents();
00340       if (local<numready) numready = local;
00341    }
00342 
00343    while (numready > 0) {
00344 
00345       //   printf("Total event %u ready %u  next trigger %6.3f\n", fTriggers.size(), numready,
00346       //         fTriggers.size() > 0 ? fTriggers[0].globaltm*1e-9 : 0.);
00347 
00348       if (fTriggers[0].isflush) {
00349          // printf("Remove flush event %12.9f\n", fTriggers[0].globaltm*1e-9);
00350          for (unsigned n=0;n<fProc.size();n++)
00351             fProc[n]->AppendSubevent(0);
00352          fTriggers.erase(fTriggers.begin());
00353          numready--;
00354          continue;
00355       }
00356 
00357       if (evt==0)
00358          evt = new base::Event;
00359       else
00360          evt->DestroyEvents();
00361 
00362       evt->SetTriggerTime(fTriggers[0].globaltm);
00363 
00364       // here all subevents from first event are collected
00365       // at the same time event should be removed from all local/global lists
00366       for (unsigned n=0;n<fProc.size();n++)
00367          fProc[n]->AppendSubevent(evt);
00368       fTriggers.erase(fTriggers.begin());
00369 
00370       //   printf("PRODUCE EVENT\n");
00371       return true;
00372    }
00373 
00374    return false;
00375 }

Generated on Thu Dec 13 2012 04:52:22 for ROCsoft by  doxygen 1.7.1