Go to the documentation of this file.00001 #include "base/ProcMgr.h"
00002
00003 #include <stdio.h>
00004 #include <stdlib.h>
00005 #include "base/StreamProc.h"
00006
00007 base::ProcMgr* base::ProcMgr::fInstance = 0;
00008
00009 base::ProcMgr::ProcMgr() :
00010 fProc(),
00011 fMap(),
00012 fTriggers(),
00013 fTimeMasterIndex(DummyIndex),
00014 fRawAnalysisOnly(false)
00015 {
00016 if (fInstance==0) fInstance = this;
00017 }
00018
00019 base::ProcMgr::~ProcMgr()
00020 {
00021 for (unsigned n=0;n<fProc.size();n++) {
00022
00023 delete fProc[n];
00024 fProc[n] = 0;
00025 }
00026
00027
00028
00029 fProc.clear();
00030
00031 if (fInstance == this) fInstance = 0;
00032 }
00033
00034 base::ProcMgr* base::ProcMgr::instance()
00035 {
00036 return fInstance;
00037 }
00038
00039
00040 base::ProcMgr* base::ProcMgr::AddProc(StreamProc* proc)
00041 {
00042 if (fInstance==0) return 0;
00043 fInstance->fProc.push_back(proc);
00044 return fInstance;
00045 }
00046
00047 bool base::ProcMgr::RegisterProc(StreamProc* proc, unsigned kind, unsigned brdid)
00048 {
00049 if (proc==0) return false;
00050
00051 bool find = false;
00052 for (unsigned n=0;n<fProc.size();n++)
00053 if (fProc[n] == proc) find = true;
00054
00055 if (!find) return false;
00056
00057 if (brdid>=MaxBrdId) {
00058 printf("Board id %u is too high - failure\n", brdid);
00059 exit(6);
00060 }
00061
00062 unsigned index = kind * MaxBrdId + brdid;
00063
00064 fMap[index] = proc;
00065
00066 return true;
00067 }
00068
00069 void base::ProcMgr::SetTimeSorting(bool on)
00070 {
00071 for (unsigned n=0;n<fProc.size();n++)
00072 fProc[n]->SetTimeSorting(on);
00073
00074 }
00075
00076
00077
00078 void base::ProcMgr::ProvideRawData(const Buffer& buf)
00079 {
00080 if (buf.null()) return;
00081
00082 if (buf().boardid >= MaxBrdId) {
00083 printf("Board id %u is too high - failure\n", buf().boardid);
00084 exit(6);
00085 }
00086
00087 unsigned index = buf().kind * MaxBrdId + buf().boardid;
00088
00089 ProcessorsMap::iterator it = fMap.find(index);
00090
00091 if (it == fMap.end()) return;
00092
00093
00094
00095 it->second->AddNextBuffer(buf);
00096 }
00097
00098 void base::ProcMgr::ScanNewData()
00099 {
00100 for (unsigned n=0;n<fProc.size();n++)
00101 fProc[n]->ScanNewBuffers();
00102 }
00103
00104 bool base::ProcMgr::SkipAllData()
00105 {
00106 for (unsigned n=0;n<fProc.size();n++)
00107 fProc[n]->SkipAllData();
00108 return true;
00109 }
00110
00111 int base::ProcMgr::SyncIdDiff(unsigned id1, unsigned id2) const
00112 {
00113 if (id1==id2) return 0;
00114
00115 int res = 0;
00116 int range = SyncIdRange();
00117
00118 if (id1 < id2) {
00119 res = id2 - id1;
00120 if (res > range/2) res -= range;
00121 } else {
00122 res = id1 - id2;
00123 if (res > range/2) res = range - res;
00124 else res = -res;
00125 }
00126
00127 return res;
00128 }
00129
00130
00131
00132 bool base::ProcMgr::AnalyzeSyncMarkers()
00133 {
00134
00135
00136
00137
00138 if (fProc.size()==0) return false;
00139
00140
00141 if (fTimeMasterIndex == DummyIndex) {
00142 unsigned first = NoSyncIndex;
00143
00144 for (unsigned n=0;n<fProc.size();n++) {
00145 if (!fProc[n]->IsSynchronisationRequired()) continue;
00146
00147 if (first == NoSyncIndex) first = n;
00148
00149 if (fProc[n]->doTriggerSelection()) {
00150 fTimeMasterIndex = n;
00151 break;
00152 }
00153 }
00154
00155 if (fTimeMasterIndex == DummyIndex) fTimeMasterIndex = first;
00156 }
00157
00158
00159 StreamProc* master = 0;
00160
00161
00162 if (fTimeMasterIndex == NoSyncIndex) {
00163
00164 goto skip_sync_scanning;
00165 }
00166
00167
00168
00169 for (unsigned n=0;n<fProc.size();n++) {
00170 if (fProc[n]->IsSynchronisationRequired() && (fProc[n]->numSyncs() < 2)) {
00171 printf("No enough syncs on processor %u!!!\n", n);
00172
00173 return false;
00174 }
00175 }
00176
00177 master = fProc[fTimeMasterIndex];
00178
00179
00180 while (master->fSyncScanIndex < master->numSyncs()) {
00181
00182 SyncMarker& master_marker = master->getSync(master->fSyncScanIndex);
00183
00184 bool is_curr_sync_ok = true;
00185
00186 for (unsigned n=0;n<fProc.size();n++) {
00187
00188 if (n==fTimeMasterIndex) continue;
00189
00190 StreamProc* slave = fProc[n];
00191
00192 slave->fSyncFlag = false;
00193
00194 if (!slave->IsSynchronisationRequired()) continue;
00195
00196 bool is_slave_ok = false;
00197
00198 while (slave->fSyncScanIndex < slave->numSyncs()) {
00199
00200 SyncMarker& slave_marker = slave->getSync(slave->fSyncScanIndex);
00201
00202 int diff = SyncIdDiff(master_marker.uniqueid, slave_marker.uniqueid);
00203
00204
00205 if (diff==0) {
00206 slave_marker.globaltm = master_marker.localtm;
00207 is_slave_ok = true;
00208 slave->fSyncFlag = true;
00209 break;
00210 }
00211
00212
00213 if (diff<0) {
00214
00215 printf("Erase SYNC %u in processor %u\n", slave_marker.uniqueid, n);
00216 slave->eraseSyncAt(slave->fSyncScanIndex);
00217 continue;
00218 }
00219
00220
00221 if (diff>0) {
00222 printf("Find hole in SYNC sequences in processor %u\n", n);
00223 is_slave_ok = true;
00224 break;
00225 }
00226 }
00227
00228 if (!is_slave_ok) is_curr_sync_ok = false;
00229 }
00230
00231
00232
00233 if (!is_curr_sync_ok) break;
00234
00235 master_marker.globaltm = master_marker.localtm;
00236 master->fSyncFlag = true;
00237
00238
00239 for (unsigned n=0;n<fProc.size();n++)
00240 if (fProc[n]->fSyncFlag)
00241 fProc[n]->fSyncScanIndex++;
00242 }
00243
00244
00245 skip_sync_scanning:
00246
00247
00248 for (unsigned n=0;n<fProc.size();n++) {
00249
00250
00251
00252
00253 if (fProc[n]->IsSynchronisationRequired() &&
00254 (fProc[n]->fSyncScanIndex < 2)) return false;
00255
00256
00257
00258 fProc[n]->ScanNewBuffersTm();
00259 }
00260
00261
00262 return true;
00263 }
00264
00265 bool base::ProcMgr::CollectNewTriggers()
00266 {
00267
00268
00269
00270
00271
00272
00273
00274 for (unsigned n=0;n<fProc.size();n++)
00275 fProc[n]->CollectTriggers(fTriggers);
00276
00277
00278
00279 if (fTimeMasterIndex < fProc.size()) {
00280
00281
00282 GlobalTime_t flush_time =
00283 fProc[fTimeMasterIndex]->ProvidePotentialFlushTime(fTriggers.size() > 0 ? fTriggers.back().globaltm : 0.);
00284
00285
00286
00287
00288 if (flush_time != 0.)
00289 for (unsigned n=0;n<fProc.size();n++)
00290 if (!fProc[n]->VerifyFlushTime(flush_time)) { flush_time = 0.; break; }
00291
00292
00293
00294
00295
00296 if (flush_time != 0.) {
00297
00298 fTriggers.push_back(GlobalTriggerMarker(flush_time));
00299 fTriggers.back().isflush = true;
00300 }
00301 }
00302
00303
00304
00305
00306 for (unsigned n=0;n<fProc.size();n++)
00307 fProc[n]->DistributeTriggers(fTriggers);
00308
00309 return true;
00310 }
00311
00312
00313 bool base::ProcMgr::ScanDataForNewTriggers()
00314 {
00315
00316
00317
00318
00319 for (unsigned n=0;n<fProc.size();n++)
00320 fProc[n]->ScanDataForNewTriggers();
00321
00322 return true;
00323 }
00324
00325
00326 bool base::ProcMgr::ProduceNextEvent(base::Event* &evt)
00327 {
00328
00329
00330
00331
00332
00333
00334 if (IsRawAnalysis()) return false;
00335
00336 unsigned numready = fTriggers.size();
00337
00338 for (unsigned n=0;n<fProc.size();n++) {
00339 unsigned local = fProc[n]->NumReadySubevents();
00340 if (local<numready) numready = local;
00341 }
00342
00343 while (numready > 0) {
00344
00345
00346
00347
00348 if (fTriggers[0].isflush) {
00349
00350 for (unsigned n=0;n<fProc.size();n++)
00351 fProc[n]->AppendSubevent(0);
00352 fTriggers.erase(fTriggers.begin());
00353 numready--;
00354 continue;
00355 }
00356
00357 if (evt==0)
00358 evt = new base::Event;
00359 else
00360 evt->DestroyEvents();
00361
00362 evt->SetTriggerTime(fTriggers[0].globaltm);
00363
00364
00365
00366 for (unsigned n=0;n<fProc.size();n++)
00367 fProc[n]->AppendSubevent(evt);
00368 fTriggers.erase(fTriggers.begin());
00369
00370
00371 return true;
00372 }
00373
00374 return false;
00375 }