00001 #include "base/StreamProc.h"
00002
00003 #include <stdio.h>
00004 #include <stdlib.h>
00005 #include <math.h>
00006
00007 #include "base/ProcMgr.h"
00008 #include "base/Event.h"
00009
00010 void base::GlobalTriggerMarker::SetInterval(double left, double right)
00011 {
00012 if (left>right) {
00013 printf("left > right in time interval - failure\n");
00014 exit(7);
00015 }
00016
00017 lefttm = globaltm + left;
00018 righttm = globaltm + right;
00019 }
00020
00021 int base::GlobalTriggerMarker::TestHitTime(const GlobalTime_t& hittime, double* dist)
00022 {
00023
00024
00025
00026 if (dist) *dist = 0.;
00027 if (hittime < lefttm) {
00028 if (dist) *dist = hittime - lefttm;
00029 return -1;
00030 } else
00031 if (hittime >= righttm) {
00032 if (dist) *dist = hittime - righttm;
00033 return 1;
00034 }
00035 return 0;
00036 }
00037
00038
00039
00040
00041 base::StreamProc::StreamProc(const char* name, int indx) :
00042 fName(name),
00043 fMgr(0),
00044 fQueue(),
00045 fQueueScanIndex(0),
00046 fQueueScanIndexTm(0),
00047 fIsSynchronisationRequired(true),
00048 fSyncs(),
00049 fSyncScanIndex(0),
00050 fGlobalTrig(),
00051 fGlobalTrigScanIndex(0),
00052 fTimeSorting(false),
00053 fPrefix(),
00054 fSubPrefixD(),
00055 fSubPrefixN()
00056 {
00057 if (indx>=0) {
00058 char sbuf[100];
00059 snprintf(sbuf, sizeof(sbuf),"%d",indx);
00060 fName.append(sbuf);
00061 }
00062
00063 fPrefix = fName;
00064
00065 fMgr = base::ProcMgr::AddProc(this);
00066
00067 fTriggerTm = MakeH1("TriggerTm", "Time relative to trigger", 2500, -1000., 4000., "ns");
00068 fMultipl = MakeH1("Multipl", "Subevent multiplicity", 40, 0., 40., "hits");
00069 triggerWindow = MakeC1("TrWindow", 500, 1000, fTriggerTm);
00070 }
00071
00072
00073 base::StreamProc::~StreamProc()
00074 {
00075
00076 fSyncs.clear();
00077 fQueue.clear();
00078 fLocalTrig.clear();
00079
00080 fGlobalTrig.clear();
00081
00082 }
00083
00084 void base::StreamProc::SetSubPrefix(const char* name, int indx, const char* subname2, int indx2)
00085 {
00086 if ((name==0) || (*name==0)) {
00087 fSubPrefixD.clear();
00088 fSubPrefixN.clear();
00089 return;
00090 }
00091
00092 fSubPrefixN = name;
00093 fSubPrefixD = name;
00094 if (indx>=0) {
00095 char sbuf[100];
00096 snprintf(sbuf,sizeof(sbuf), "%d", indx);
00097 fSubPrefixD.append(sbuf);
00098 fSubPrefixN.append(sbuf);
00099 }
00100 fSubPrefixD.append("/");
00101 fSubPrefixN.append("_");
00102
00103 if ((subname2!=0) && (*subname2!=0)) {
00104 fSubPrefixD.append(subname2);
00105 fSubPrefixN.append(subname2);
00106 if (indx2>=0) {
00107 char sbuf[100];
00108 snprintf(sbuf,sizeof(sbuf), "%d", indx2);
00109 fSubPrefixD.append(sbuf);
00110 fSubPrefixN.append(sbuf);
00111 }
00112 fSubPrefixD.append("/");
00113 fSubPrefixN.append("_");
00114 }
00115
00116
00117 }
00118
00119
00120 base::H1handle base::StreamProc::MakeH1(const char* name, const char* title, int nbins, double left, double right, const char* xtitle)
00121 {
00122 std::string hname = fPrefix + "/";
00123 if (!fSubPrefixD.empty()) hname += fSubPrefixD;
00124 hname += fPrefix + "_";
00125 if (!fSubPrefixN.empty()) hname += fSubPrefixN;
00126 hname.append(name);
00127
00128 std::string htitle = fName;
00129 htitle.append(" ");
00130 if (!fSubPrefixN.empty()) htitle += fSubPrefixN + " ";
00131 htitle.append(title);
00132
00133 return mgr()->MakeH1(hname.c_str(), htitle.c_str(), nbins, left, right, xtitle);
00134 }
00135
00136
00137 void base::StreamProc::FillH1(H1handle h1, double x, double weight)
00138 {
00139 mgr()->FillH1(h1, x, weight);
00140 }
00141
00142 base::H2handle base::StreamProc::MakeH2(const char* name, const char* title, int nbins1, double left1, double right1, int nbins2, double left2, double right2, const char* options)
00143 {
00144 std::string hname = fPrefix + "/";
00145 if (!fSubPrefixD.empty()) hname += fSubPrefixD;
00146 hname += fPrefix + "_";
00147 if (!fSubPrefixN.empty()) hname += fSubPrefixN;
00148 hname.append(name);
00149
00150 std::string htitle = fName;
00151 htitle.append(" ");
00152 if (!fSubPrefixN.empty()) htitle += fSubPrefixN + " ";
00153 htitle.append(title);
00154
00155 return mgr()->MakeH2(hname.c_str(), htitle.c_str(), nbins1, left1, right1, nbins2, left2, right2, options);
00156 }
00157
00158 void base::StreamProc::FillH2(H1handle h2, double x, double y, double weight)
00159 {
00160 mgr()->FillH2(h2, x, y, weight);
00161 }
00162
00163
00164 base::C1handle base::StreamProc::MakeC1(const char* name, double left, double right, H1handle h1)
00165 {
00166 std::string cname = fPrefix + "/";
00167 if (!fSubPrefixD.empty()) cname += fSubPrefixD;
00168 cname += fPrefix + "_";
00169 if (!fSubPrefixN.empty()) cname += fSubPrefixN;
00170 cname.append(name);
00171
00172 return mgr()->MakeC1(cname.c_str(), left, right, h1);
00173 }
00174
00175
00176 void base::StreamProc::ChangeC1(C1handle c1, double left, double right)
00177 {
00178 mgr()->ChangeC1(c1, left, right);
00179 }
00180
00181
00182 int base::StreamProc::TestC1(C1handle c1, double value, double* dist)
00183 {
00184 return mgr()->TestC1(c1, value, dist);
00185 }
00186
00187
00188 double base::StreamProc::GetC1Limit(C1handle c1, bool isleft)
00189 {
00190 return mgr()->GetC1Limit(c1, isleft);
00191 }
00192
00193 bool base::StreamProc::eraseSyncAt(unsigned indx)
00194 {
00195 if (indx < fSyncs.size()) {
00196 fSyncs.erase(fSyncs.begin() + indx);
00197 if (fSyncScanIndex>indx) fSyncScanIndex--;
00198 return true;
00199 }
00200 return false;
00201 }
00202
00203 base::GlobalTime_t base::StreamProc::LocalToGlobalTime(base::GlobalTime_t localtm, unsigned* indx)
00204 {
00205 if (!IsSynchronisationRequired()) return localtm;
00206
00207 if (numSyncs() == 0) {
00208 printf("No any sync for time calibration\n");
00209 exit(7);
00210 return 0.;
00211 }
00212
00213 if ((numSyncs()>0) && (numSyncs() % 1000 == 0)) {
00214 printf("Too much syncs %u - something wrong??\n", numSyncs());
00215 }
00216
00217
00218 if (numSyncs()>1) {
00219
00220 if ((indx!=0) && (*indx < numSyncs()-1) ) {
00221 double dist1 = local_time_dist(getSync(*indx).localtm, localtm);
00222 double dist2 = local_time_dist(localtm, getSync(*indx+1).localtm);
00223
00224 if ((dist1>=0.) && (dist2>0)) {
00225
00226 double diff1 = dist1 / (dist1 + dist2) * (getSync(*indx+1).globaltm - getSync(*indx).globaltm);
00227
00228
00229 if ((dist1>0) && ((diff1/dist1 < 0.9) || (diff1/dist1 > 1.1))) {
00230 printf("Simothing wrong with time calc %8.6f %8.6f\n", dist1, diff1);
00231 exit(1);
00232 }
00233
00234
00235 return getSync(*indx).globaltm + diff1;
00236 }
00237 }
00238
00239 for (unsigned n=0; n<numSyncs()-1; n++) {
00240 double dist1 = local_time_dist(getSync(n).localtm, localtm);
00241 double dist2 = local_time_dist(localtm, getSync(n+1).localtm);
00242
00243 if ((dist1>=0.) && (dist2>0)) {
00244
00245 double diff1 = dist1 / (dist1 + dist2) * (getSync(n+1).globaltm - getSync(n).globaltm);
00246
00247
00248 if ((dist1>0) && ((diff1/dist1 < 0.9) || (diff1/dist1 > 1.1))) {
00249 printf("Something wrong with time calc %8.6f %8.6f\n", dist1, diff1);
00250 exit(1);
00251 }
00252
00253
00254 return getSync(n).globaltm + diff1;
00255
00256 }
00257 }
00258 }
00259
00260
00261
00262
00263 double dist1 = local_time_dist(getSync(0).localtm, localtm);
00264 double dist2 = local_time_dist(getSync(numSyncs()-1).localtm, localtm);
00265
00266 if (fabs(dist1) < fabs(dist2))
00267 return getSync(0).globaltm + dist1;
00268
00269 return getSync(numSyncs()-1).globaltm + dist2;
00270 }
00271
00272
00273 bool base::StreamProc::AddNextBuffer(const Buffer& buf)
00274 {
00275 fQueue.push_back(buf);
00276
00277 return true;
00278 }
00279
00280 bool base::StreamProc::ScanNewBuffers()
00281 {
00282
00283
00284 while (fQueueScanIndex < fQueue.size()) {
00285 FirstBufferScan(fQueue[fQueueScanIndex]);
00286 fQueueScanIndex++;
00287 }
00288
00289 return true;
00290 }
00291
00292 void base::StreamProc::SkipAllData()
00293 {
00294 fQueueScanIndexTm = fQueue.size();
00295
00296 SkipBuffers(fQueue.size());
00297
00298 fLocalTrig.clear();
00299 fGlobalTrig.clear();
00300
00301 fSyncs.clear();
00302 fSyncScanIndex = 0;
00303 }
00304
00305
00306 bool base::StreamProc::ScanNewBuffersTm()
00307 {
00308
00309
00310
00311 unsigned scan_limit = fQueue.size();
00312 if (fSyncScanIndex < fSyncs.size())
00313 scan_limit = fSyncs[fSyncScanIndex].bufid;
00314
00315 while (fQueueScanIndexTm < scan_limit) {
00316 fQueue[fQueueScanIndexTm]().global_tm = LocalToGlobalTime(fQueue[fQueueScanIndexTm]().local_tm);
00317
00318 fQueueScanIndexTm++;
00319 }
00320
00321 return true;
00322 }
00323
00324
00325 base::GlobalTime_t base::StreamProc::ProvidePotentialFlushTime(GlobalTime_t last_marker)
00326 {
00327
00328
00329
00330 if (fQueueScanIndexTm<3) return 0.;
00331
00332 for (unsigned n=1; n<fQueueScanIndexTm-2; n++)
00333 if (fQueue[n].rec().global_tm > last_marker) return fQueue[n].rec().global_tm;
00334
00335 return 0.;
00336 }
00337
00338 bool base::StreamProc::VerifyFlushTime(const base::GlobalTime_t& flush_time)
00339 {
00340
00341
00342
00343 if ((flush_time==0.) || (fQueueScanIndexTm<2)) return false;
00344
00345 for (unsigned n=0;n<fQueueScanIndexTm-1;n++)
00346 if (fQueue[n].rec().global_tm > flush_time) return true;
00347
00348 return true;
00349 }
00350
00351
00352 void base::StreamProc::AddSyncMarker(base::SyncMarker& marker)
00353 {
00354 if (!IsSynchronisationRequired()) {
00355 printf("No sync should be supplied !!!!!\n");
00356 exit(5);
00357 }
00358
00359 marker.globaltm = 0.;
00360 marker.bufid = fQueueScanIndex;
00361 fSyncs.push_back(marker);
00362 }
00363
00364
00365 void base::StreamProc::AddTriggerMarker(LocalTriggerMarker& marker)
00366 {
00367 marker.bufid = fQueueScanIndex;
00368 fLocalTrig.push_back(marker);
00369 }
00370
00371
00372 bool base::StreamProc::SkipBuffers(unsigned num_skip)
00373 {
00374 if (num_skip > fQueue.size()) num_skip = fQueue.size();
00375
00376 if (num_skip==0) return false;
00377
00378 fQueue.erase(fQueue.begin(), fQueue.begin() + num_skip);
00379
00380
00381 while ((fSyncs.size()>1) && (fSyncs[0].bufid<num_skip)) {
00382 fSyncs.erase(fSyncs.begin());
00383 if (fSyncScanIndex>0) fSyncScanIndex--;
00384 }
00385
00386 for (unsigned n=0;n<fSyncs.size();n++)
00387 if (fSyncs[n].bufid>num_skip)
00388 fSyncs[n].bufid-=num_skip;
00389 else
00390 fSyncs[n].bufid = 0;
00391
00392
00393
00394
00395 while ((fLocalTrig.size()>0) && (fLocalTrig[0].bufid<num_skip))
00396 fLocalTrig.erase(fLocalTrig.begin());
00397
00398 for (unsigned n=0;n<fLocalTrig.size();n++)
00399 fLocalTrig[n].bufid-=num_skip;
00400
00401 if (fQueueScanIndex>=num_skip) {
00402 fQueueScanIndex-=num_skip;
00403 } else {
00404 fQueueScanIndex = 0;
00405 printf("!!! Problem with skipping and fQueueScanIndex !!!\n");
00406 exit(7);
00407 }
00408
00409 if (fQueueScanIndexTm>=num_skip) {
00410 fQueueScanIndexTm-=num_skip;
00411 } else {
00412 fQueueScanIndexTm = 0;
00413 printf("!!! Problem with skipping and fQueueScanIndexTm !!!\n");
00414 exit(7);
00415 }
00416
00417 return true;
00418 }
00419
00420 bool base::StreamProc::CollectTriggers(GlobalTriggerMarksQueue& trigs)
00421 {
00422
00423
00424
00425 unsigned num_trig(0);
00426
00427 for (unsigned n=0;n<fLocalTrig.size();n++) {
00428
00429
00430
00431 if (fLocalTrig[n].bufid >= fQueueScanIndexTm) break;
00432
00433 num_trig++;
00434
00435 GlobalTriggerMarker marker;
00436
00437 marker.globaltm = LocalToGlobalTime(fLocalTrig[n].localtm);
00438
00439
00440
00441 trigs.push_back(marker);
00442 }
00443
00444 if (num_trig == fLocalTrig.size())
00445 fLocalTrig.clear();
00446 else
00447 fLocalTrig.erase(fLocalTrig.begin(), fLocalTrig.begin()+num_trig);
00448
00449 return true;
00450 }
00451
00452 bool base::StreamProc::DistributeTriggers(const base::GlobalTriggerMarksQueue& queue)
00453 {
00454
00455
00456
00457 while (fGlobalTrig.size() < queue.size()) {
00458 unsigned indx = fGlobalTrig.size();
00459
00460 fGlobalTrig.push_back(base::GlobalTriggerMarker(queue[indx]));
00461
00462 fGlobalTrig.back().SetInterval(GetC1Limit(triggerWindow, true), GetC1Limit(triggerWindow, false));
00463
00464 }
00465
00466
00467
00468
00469
00470 return true;
00471 }
00472
00473
00474 bool base::StreamProc::AppendSubevent(base::Event* evt)
00475 {
00476 if (fGlobalTrig.size()==0) {
00477 printf("global trigger queue empty !!!\n");
00478 exit(14);
00479 return false;
00480 }
00481
00482 if (fGlobalTrig[0].normal())
00483 FillH1(fMultipl, GetTriggerMultipl(0));
00484
00485 if (fGlobalTrig[0].subev!=0) {
00486 if (evt!=0) {
00487 if (IsTimeSorting()) SortDataInSubEvent(fGlobalTrig[0].subev);
00488 evt->AddSubEvent(GetProcName(), fGlobalTrig[0].subev);
00489 } else {
00490 printf("Something wrong - subevent could not be assigned normal %d!!!!\n", fGlobalTrig[0].normal());
00491 delete fGlobalTrig[0].subev;
00492 }
00493 fGlobalTrig[0].subev = 0;
00494 }
00495
00496 fGlobalTrig.erase(fGlobalTrig.begin());
00497 if (fGlobalTrigScanIndex==0) {
00498 printf("Index of ready event is 0 - how to understand???\n");
00499 exit(12);
00500 } else {
00501 fGlobalTrigScanIndex--;
00502 }
00503
00504
00505
00506
00507
00508
00509 return true;
00510 }
00511
00512
00513 unsigned base::StreamProc::TestHitTime(const base::GlobalTime_t& hittime, bool normal_hit)
00514 {
00515 double dist(0.), best_dist(-1e15), best_trigertm(-1e15);
00516
00517 unsigned res_trigindx = fGlobalTrig.size();
00518
00519 for (unsigned indx=fGlobalTrigScanIndex; indx<fGlobalTrigRightIndex; indx++) {
00520
00521 if (indx>=fGlobalTrig.size()) {
00522 printf("ALARM!!!!\n");
00523 exit(10);
00524 }
00525
00526 int test = fGlobalTrig[indx].TestHitTime(hittime, &dist);
00527
00528
00529 if (fGlobalTrig[indx].normal() && (fabs(best_dist) > fabs(dist))) {
00530 best_dist = dist;
00531 best_trigertm = hittime - fGlobalTrig[indx].globaltm;
00532 }
00533
00534 if (test==0) {
00535 res_trigindx = indx;
00536 break;
00537 }
00538
00539 if (test>0) {
00540
00541
00542
00543
00544
00545 if (dist>MaximumDisorderTm()) {
00546 if (indx==fGlobalTrigScanIndex) {
00547
00548 fGlobalTrigScanIndex++;
00549
00550 } else {
00551 printf("Something completely wrong indx:%u %12.9f left:%u %12.9f - check \n",
00552 indx, fGlobalTrig[indx].globaltm*1e-9,
00553 fGlobalTrigScanIndex, fGlobalTrig[fGlobalTrigScanIndex].globaltm*1e-9);
00554 exit(17);
00555 }
00556 }
00557 } else {
00558
00559
00560 }
00561 }
00562
00563 if (normal_hit && (best_trigertm>-1e15)) {
00564 FillH1(fTriggerTm, best_trigertm);
00565
00566 }
00567
00568 return normal_hit ? res_trigindx : fGlobalTrig.size();
00569 }
00570
00571 bool base::StreamProc::ScanDataForNewTriggers()
00572 {
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582 if (fQueueScanIndexTm < 2) return true;
00583
00584
00585 if (fGlobalTrig.size() == 0) return true;
00586
00587
00588 fGlobalTrigRightIndex = fGlobalTrigScanIndex;
00589
00590 if (fGlobalTrig.size() > 1) {
00591
00592
00593 double trigger_time_limit = fQueue[fQueueScanIndexTm-1].rec().global_tm - GetC1Limit(triggerWindow, false) - MaximumDisorderTm();
00594
00595 while (fGlobalTrigRightIndex < fGlobalTrig.size()-1) {
00596 if (fGlobalTrig[fGlobalTrigRightIndex].globaltm > trigger_time_limit) break;
00597 fGlobalTrigRightIndex++;
00598 }
00599 }
00600
00601 if (fGlobalTrigRightIndex==0) {
00602 printf("No triggers are select for scanning\n");
00603 return true;
00604 }
00605
00606
00607
00608
00609 unsigned upper_buf_limit = 0;
00610
00611 double buffer_timeboundary = fGlobalTrig[fGlobalTrigRightIndex-1].globaltm + GetC1Limit(triggerWindow, true) - MaximumDisorderTm();
00612
00613 while (upper_buf_limit < fQueueScanIndexTm - 1) {
00614
00615
00616 if (fQueue[upper_buf_limit+1].rec().global_tm > buffer_timeboundary) break;
00617
00618 upper_buf_limit++;
00619 }
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629 if (upper_buf_limit==0) {
00630
00631
00632 return true;
00633 }
00634
00635
00636
00637
00638
00639
00640
00641 for (unsigned nbuf=0; nbuf<upper_buf_limit; nbuf++) {
00642
00643
00644 if (nbuf>=fQueue.size()) {
00645 printf("Something went wrong\n");
00646 exit(11);
00647 }
00648
00649 SecondBufferScan(fQueue[nbuf]);
00650 }
00651
00652
00653
00654 SkipBuffers(upper_buf_limit);
00655
00656
00657
00658 return true;
00659 }