00001 #include "roc/Sorter.h"
00002
00003 #include "roc/Iterator.h"
00004
00005 #include <stdio.h>
00006 #include <string.h>
00007 #include <stdlib.h>
00008
00009 roc::Sorter::Sorter(unsigned max_inp_buf,
00010 unsigned intern_out_buf,
00011 int intern_out_fmt,
00012 unsigned intbuf)
00013 {
00014 fRocId = 0;
00015
00016 fIntBufMaxSize = intbuf;
00017 fExtBufMaxSize = max_inp_buf;
00018
00019 fIntBuf = (roc::Message*) malloc(fIntBufMaxSize*sizeof(roc::Message));
00020 fIntBufTimes = new uint32_t[fIntBufMaxSize + fExtBufMaxSize];
00021
00022 fIntBufSize = 0;
00023 fIntBufFrontEpoch = 0;
00024 fIsIntBufFrontEpoch = false;
00025 fIntBufCurrEpoch = 0;
00026 fIntBufCurrRealEpoch = 0;
00027 fIntBuffCurrMaxTm = 0;
00028
00029 fAccumMissed = 0;
00030 fLastOutEpoch = 0;
00031
00032 fInternOutBufSize = intern_out_buf;
00033 fInternOutFormat = intern_out_fmt;
00034 fInternOutBuf = fInternOutBufSize==0 ? 0 : malloc(fInternOutBufSize * roc::Message::RawSize(fInternOutFormat));
00035
00036 fFillBuffer = fInternOutBuf;
00037 fFillTotalSize = fInternOutBufSize;
00038 fFillSize = 0;
00039 fFillFormat = fInternOutFormat;
00040 }
00041
00042 roc::Sorter::~Sorter()
00043 {
00044 free(fIntBuf); fIntBuf = 0;
00045 free(fInternOutBuf); fInternOutBuf = 0;
00046 delete [] fIntBufTimes;
00047 }
00048
00049 bool roc::Sorter::startFill(void* buf, unsigned totalsize, int fmt)
00050 {
00051 if (fFillSize!=0) {
00052 printf("There is %u messages in output, cannot start !!!\n", fFillSize);
00053 return false;
00054 }
00055
00056 unsigned msg_size = roc::Message::RawSize(fmt);
00057
00058 if (totalsize < 243*msg_size) {
00059 printf("roc::Sorter::startFill provided buffer size %u too small\n", totalsize);
00060 return false;
00061 }
00062
00063 fFillBuffer = buf;
00064 fFillTotalSize = totalsize / msg_size;
00065 fFillSize = 0;
00066 fFillFormat = fmt;
00067
00068 return true;
00069 }
00070
00071 void roc::Sorter::cleanBuffers()
00072 {
00073
00074 fIntBufSize = 0;
00075
00076
00077 fFillSize = 0;
00078
00079
00080 fIntBufFrontEpoch = 0;
00081 fIsIntBufFrontEpoch = false;
00082 fIntBufCurrEpoch = 0;
00083 fIntBufCurrRealEpoch = 0;
00084 fIntBuffCurrMaxTm = 0;
00085
00086 fAccumMissed = 0;
00087 fLastOutEpoch = 0;
00088 }
00089
00090
00091 void roc::Sorter::stopFill()
00092 {
00093 fFillBuffer = fInternOutBuf;
00094 fFillTotalSize = fInternOutBufSize;
00095 fFillSize = 0;
00096 fFillFormat = fInternOutFormat;
00097 }
00098
00099 bool roc::Sorter::checkLastEpoch(roc::Message* data, unsigned indx, const roc::Iterator& iter)
00100 {
00101 if (indx==0) return false;
00102
00103 uint8_t lastid = data->getNxChNum();
00104 unsigned revindx = indx;
00105
00106 unsigned loopcnt = 0;
00107 unsigned fifocnt = 1;
00108
00109 uint32_t hittime = (fIntBufCurrEpoch - 0x4000) | data->getNxTs();
00110
00111 uint32_t candetecttime = hittime + 48;
00112 uint32_t maxhittime = 0;
00113
00114 roc::Message* rev = 0;
00115 roc::Iterator reviter(iter);
00116
00117 while (revindx-->0) {
00118 if (revindx<fIntBufSize)
00119 rev = fIntBuf + revindx;
00120 else {
00121 if (!reviter.prev()) {
00122 fprintf(stderr, "Cannot get previous message\n");
00123 return false;
00124 }
00125 rev = &reviter.msg();
00126 }
00127
00128
00129 if (hittime > fIntBufTimes[revindx] + 0x4400) break;
00130
00131
00132 if (!rev->isHitMsg() || (rev->getNxNumber() != data->getNxNumber())) continue;
00133
00134
00135 if (fIntBufTimes[revindx] > maxhittime) maxhittime = fIntBufTimes[revindx];
00136
00137 candetecttime += 32;
00138
00139 if (rev->getNxChNum() >= lastid) loopcnt++;
00140
00141 if (rev->getNxChNum() == data->getNxChNum()) fifocnt++;
00142
00143 lastid = rev->getNxChNum();
00144
00145
00146
00147 if ((fifocnt==4) || (loopcnt > fifocnt)) break;
00148 }
00149
00150
00151
00152
00153 if (maxhittime > candetecttime) return false;
00154
00155
00156 if (candetecttime >> 14 == hittime >> 14) return false;
00157
00158 return true;
00159 }
00160
00161
00162 bool roc::Sorter::addData(void* new_data, unsigned datalen, int fmt, bool flush_data)
00163 {
00164 if (fFillBuffer==0) return false;
00165 if (datalen == 0) return true;
00166
00167 unsigned msg_size = roc::Message::RawSize(fmt);
00168 unsigned num_msg = datalen / msg_size;
00169
00170 if (fIntBufSize + num_msg > fIntBufMaxSize) {
00171 printf("One always need enough internal buffer space to be able store complete data buffer now:%u add:%u total:%u\n",
00172 fIntBufSize, num_msg, fIntBufMaxSize);
00173 return false;
00174 }
00175
00176 roc::Iterator iter(fmt);
00177 iter.assign(new_data, datalen);
00178
00179 uint32_t diff = 0;
00180
00181 unsigned total_data_size = fIntBufSize + num_msg;
00182
00183
00184 for (unsigned indx = fIntBufSize; indx < total_data_size; indx++) {
00185
00186
00187
00188 if (!iter.next()) {
00189 printf("Cannot iterate over input buffer - FATAL ERROR!!!");
00190 exit(1);
00191 }
00192
00193 roc::Message* data = &iter.msg();
00194
00195
00196 switch (data->getMessageType()) {
00197 case roc::MSG_HIT:
00198 if (data->getNxLastEpoch()==0) {
00199 fIntBufTimes[indx] = fIntBufCurrEpoch | data->getNxTs();
00200 } else
00201 if (fIntBufCurrEpoch > 0) {
00202 fIntBufTimes[indx] = fIntBufCurrEpoch | data->getNxTs();
00203
00204 if (checkLastEpoch(data, indx, iter)) fIntBufTimes[indx] -= 0x4000;
00205
00206 } else {
00207 fIntBufTimes[indx] = tmFailure;
00208 fprintf(stderr, "HIT Last epoch err - should not happen !!!\n");
00209 }
00210 break;
00211 case roc::MSG_EPOCH:
00212 fRocId = data->getRocNumber();
00213
00214 fIntBufCurrRealEpoch = data->getEpochNumber();
00215
00216
00217 if ((indx==0) || !fIsIntBufFrontEpoch) {
00218 fIntBufFrontEpoch = fIntBufCurrRealEpoch - 0x10;
00219 fIsIntBufFrontEpoch = true;
00220 fIntBuffCurrMaxTm = 0;
00221 }
00222
00223 diff = fIntBufCurrRealEpoch - fIntBufFrontEpoch;
00224
00225
00226 if (diff >= 0x20000) {
00227 printf("Epoch diff too much %x - %x = %x - try to repair\n", fIntBufCurrRealEpoch, fIntBufFrontEpoch, diff);
00228
00229 unsigned hitsprocessed = indx - fIntBufSize;
00230
00231 if (!flushBuffer(new_data, hitsprocessed, fmt, true)) {
00232 printf ("Cannot flush buffer - error!!!\n");
00233 return false;
00234 }
00235
00236 return addData((uint8_t*)new_data + hitsprocessed*msg_size, (num_msg - hitsprocessed)*msg_size , fmt);
00237 }
00238
00239 fIntBufCurrEpoch = diff << 14;
00240 fIntBufTimes[indx] = fIntBufCurrEpoch;
00241
00242 break;
00243 case roc::MSG_SYNC:
00244 if (data->getSyncEpochLSB() == (fIntBufCurrRealEpoch & 0x1))
00245 fIntBufTimes[indx] = fIntBufCurrEpoch | data->getSyncTs();
00246 else
00247 if (fIntBufCurrEpoch > 0) {
00248 fIntBufTimes[indx] = (fIntBufCurrEpoch - 0x4000) | data->getSyncTs();
00249
00250 } else {
00251 printf("SYNC Last epoch err - should not happen !!!\n");
00252 fIntBufTimes[indx] = tmFailure;
00253 }
00254 break;
00255 case roc::MSG_AUX:
00256 if (data->getAuxEpochLSB() == (fIntBufCurrRealEpoch & 0x1))
00257 fIntBufTimes[indx] = fIntBufCurrEpoch | data->getAuxTs();
00258 else
00259 if (fIntBufCurrEpoch > 0)
00260 fIntBufTimes[indx] = (fIntBufCurrEpoch - 0x4000) | data->getAuxTs();
00261 else {
00262 printf("AUX Last epoch err - should not happen !!!\n");
00263 fIntBufTimes[indx] = tmFailure;
00264 }
00265 break;
00266 case roc::MSG_SYS:
00267 fIntBufTimes[indx] = fIntBuffCurrMaxTm;
00268 if (data->isStopDaqMsg()) flush_data = true;
00269 break;
00270 default:
00271 fIntBufTimes[indx] = tmEmpty;
00272 }
00273
00274 if ((fIntBufTimes[indx] <= tmLastValid) && (fIntBufTimes[indx]>fIntBuffCurrMaxTm))
00275 fIntBuffCurrMaxTm = fIntBufTimes[indx];
00276 }
00277
00278 if (!flushBuffer(new_data, num_msg, fmt, flush_data)) {
00279
00280 fprintf(stderr, "flushBuffer error, just copy it in the internal buffer\n");
00281
00282 roc::Iterator iter(fmt);
00283 iter.assign(new_data, num_msg*msg_size);
00284 roc::Message* tgt = fIntBuf + fIntBufSize;
00285 while (iter.next()) {
00286 tgt->assign(iter.msg());
00287 tgt++;
00288 }
00289 fIntBufSize += num_msg;
00290 }
00291
00292 if ((fIntBufSize > 0) && (fIntBufTimes[0] > tmLastValid)) {
00293 printf("Hard internal error !!!!\n");
00294 exit(1);
00295 }
00296
00297
00298
00299 if (fIntBufCurrEpoch >= tmFrontShift + tmBoundary * 16)
00300
00301 if ((fIntBufSize==0) || (fIntBufTimes[0] > tmFrontShift + tmBoundary * 2)) {
00302
00303
00304 fIntBufFrontEpoch += tmFrontShift >> 14;
00305 fIntBuffCurrMaxTm -= tmFrontShift;
00306 fIntBufCurrEpoch -= tmFrontShift;
00307
00308 for (unsigned n=0; n < fIntBufSize; n++)
00309 if (fIntBufTimes[n] <= tmLastValid)
00310 fIntBufTimes[n] -= tmFrontShift;
00311 }
00312
00313 return true;
00314 }
00315
00316 bool roc::Sorter::flushBuffer(void* new_data, unsigned num_msg, int fmt, bool force_flush)
00317 {
00318 if (fFillSize == fFillTotalSize) return false;
00319
00320 unsigned tail_indx(0), head_indx(fIntBufSize);
00321
00322 unsigned msg_size = roc::Message::RawSize(fmt);
00323 unsigned out_msg_size = roc::Message::RawSize(fFillFormat);
00324
00325 uint32_t boundary = 0;
00326 unsigned nless = 0;
00327 unsigned pmin = 0;
00328 unsigned indx = 0;
00329 uint32_t item_real_epoch = 0;
00330
00331 roc::Message inp_buf, out_msg;
00332 roc::Message* src_data = 0;
00333
00334 uint8_t* out_ptr = (uint8_t*) fFillBuffer + fFillSize*out_msg_size;
00335
00336 unsigned total_data_size = fIntBufSize + num_msg;
00337 if (total_data_size==0) return true;
00338
00339 unsigned loop_limit = total_data_size;
00340
00341 if (force_flush) {
00342
00343 if (total_data_size >= fIntBufMaxSize + fExtBufMaxSize) {
00344 printf("Something completely wrong !!!!\n");
00345 exit(1);
00346 }
00347 fIntBufTimes[total_data_size] = tmLastValid;
00348 loop_limit++;
00349 }
00350
00351 bool output_full = false;
00352
00353
00354 for (;head_indx < loop_limit; head_indx++) {
00355
00356
00357 boundary = fIntBufTimes[head_indx];
00358
00359
00360
00361 if ((boundary > tmLastValid) || (boundary < tmBoundary)) continue;
00362
00363 boundary -= tmBoundary;
00364
00365 do {
00366
00367 nless = 0;
00368 pmin = 0;
00369
00370 for(indx = tail_indx; indx < head_indx; indx++)
00371 if (fIntBufTimes[indx] < boundary) {
00372 if ((nless==0) || (fIntBufTimes[indx] < fIntBufTimes[pmin])) pmin = indx;
00373 nless++;
00374 }
00375
00376 if (nless==0) break;
00377
00378 if (pmin < fIntBufSize)
00379 src_data = fIntBuf + pmin;
00380 else {
00381 inp_buf.assign(((uint8_t*) new_data) + (pmin-fIntBufSize)*msg_size, fmt);
00382 src_data = &inp_buf;
00383 }
00384
00385 item_real_epoch = fIntBufFrontEpoch + (fIntBufTimes[pmin] >> 14);
00386
00387
00388 if ((item_real_epoch != fLastOutEpoch) && (src_data->isHitMsg() || src_data->isSyncMsg() || src_data->isAuxMsg())) {
00389
00390 out_msg.setMessageType(roc::MSG_EPOCH);
00391 out_msg.setRocNumber(fRocId);
00392 out_msg.setEpochNumber(item_real_epoch);
00393 out_msg.setEpochMissed(fAccumMissed > 255 ? 255 : fAccumMissed);
00394
00395 fLastOutEpoch = item_real_epoch;
00396 fAccumMissed = 0;
00397
00398 out_msg.copyto(out_ptr, fFillFormat);
00399 out_ptr += out_msg_size;
00400 fFillSize++;
00401
00402
00403 if (fFillSize == fFillTotalSize) {
00404 output_full = true;
00405 break;
00406 }
00407 }
00408
00409 bool accept_msg = false;
00410
00411 switch (src_data->getMessageType()) {
00412 case roc::MSG_NOP:
00413 break;
00414 case roc::MSG_HIT:
00415 out_msg.assign(*src_data);
00416 out_msg.setNxLastEpoch(0);
00417 accept_msg = true;
00418 break;
00419 case roc::MSG_EPOCH:
00420
00421
00422
00423 fAccumMissed += src_data->getEpochMissed();
00424 break;
00425 case roc::MSG_SYNC:
00426 out_msg.assign(*src_data);
00427 out_msg.setSyncEpochLSB(fLastOutEpoch & 1);
00428 accept_msg = true;
00429 break;
00430 case roc::MSG_AUX:
00431 out_msg.assign(*src_data);
00432 out_msg.setAuxEpochLSB(fLastOutEpoch & 1);
00433 accept_msg = true;
00434 break;
00435 case roc::MSG_SYS:
00436 out_msg.assign(*src_data);
00437 accept_msg = true;
00438 break;
00439 default:
00440 printf("Absolutely strange message type %u\n", src_data->getMessageType());
00441 break;
00442 }
00443
00444 if (accept_msg) {
00445 out_msg.copyto(out_ptr, fFillFormat);
00446 out_ptr += out_msg_size;
00447 fFillSize++;
00448 }
00449
00450
00451 fIntBufTimes[pmin] = tmEmpty;
00452
00453
00454 if (fFillSize == fFillTotalSize) output_full = true;
00455
00456 } while ((nless>1) && !output_full);
00457
00458
00459 while ((tail_indx < head_indx) && (fIntBufTimes[tail_indx] > tmLastValid)) tail_indx++;
00460
00461 if (output_full) break;
00462 }
00463
00464
00465
00466 if (tail_indx == total_data_size) {
00467
00468 fIntBufSize = 0;
00469
00470 } else {
00471
00472 roc::Message* tgt = fIntBuf;
00473
00474 void* src = 0;
00475
00476 unsigned copy_size = total_data_size - tail_indx;
00477
00478
00479 if (tail_indx < fIntBufSize) {
00480 memmove(tgt, fIntBuf + tail_indx, (fIntBufSize - tail_indx) * sizeof(roc::Message));
00481 tgt += (fIntBufSize - tail_indx);
00482
00483 src = new_data;
00484 copy_size = num_msg;
00485 } else {
00486 src = (uint8_t*) new_data + (tail_indx - fIntBufSize) * msg_size;
00487 }
00488
00489
00490 roc::Iterator iter(fmt);
00491 iter.assign(src, copy_size * msg_size);
00492 while (iter.next()) {
00493 tgt->assign(iter.msg());
00494 tgt++;
00495 }
00496
00497
00498
00499 fIntBufSize = total_data_size - tail_indx;
00500
00501 memmove(fIntBufTimes, fIntBufTimes + tail_indx, fIntBufSize * sizeof(uint32_t));
00502 }
00503
00504 return true;
00505 }
00506
00507 bool roc::Sorter::shiftFilledData(unsigned num)
00508 {
00509 if (num>=fFillSize) {
00510 fFillSize = 0;
00511 return true;
00512 } else
00513 if (num==0) return true;
00514
00515 fFillSize -= num;
00516
00517 unsigned msg_size = roc::Message::RawSize(fFillFormat);
00518
00519 memmove(fFillBuffer, (uint8_t*) fFillBuffer + num *msg_size, fFillSize*msg_size);
00520
00521 return true;
00522 }
00523