#include "fasp/FileInput.h"
00002
00003 #include <string.h>
00004 #include <stdlib.h>
00005
00006 #include "dabc/logging.h"
00007 #include "dabc/Buffer.h"
00008 #include "dabc/FileIO.h"
00009 #include "dabc/Manager.h"
00010 #include "dabc/Port.h"
00011
00012
00013 #include "base/commons.h"
00014 #include "mbs/MbsTypeDefs.h"
00015 #include "mbs/Iterator.h"
00016
00017 fasp::FileInput::FileInput(const char* fname, uint32_t bufsize) :
00018 dabc::DataInput(),
00019 fFileName(fname ? fname : ""),
00020 fBufferSize(bufsize),
00021 fFilesList(0),
00022 fFile(0),
00023 fCurrentFileName(),
00024 fCurrentRead(0),
00025 fRaw(0),
00026 fRawSize(0),
00027 fRawPos(0),
00028 fRawSync(DummySync)
00029 {
00030 DOUT2(("Create FASP input"));
00031 }
00032
00033 fasp::FileInput::~FileInput()
00034 {
00035
00036 CloseFile();
00037 if (fFilesList) {
00038 dabc::Object::Destroy(fFilesList);
00039 fFilesList = 0;
00040 }
00041
00042 if (fRaw) { delete [] fRaw; fRaw = 0; }
00043 }
00044
00045 bool fasp::FileInput::Read_Init(const dabc::WorkerRef& wrk, const dabc::Command& cmd)
00046 {
00047 fFileName = wrk.Cfg(dabc::xmlFileName, cmd).AsStdStr(fFileName);
00048 fBufferSize = wrk.Cfg(dabc::xmlBufferSize, cmd).AsInt(fBufferSize);
00049
00050 DOUT0(("BufferSize = %d Filename = %s", fBufferSize, fFileName.c_str()));
00051
00052 fRawSize = fBufferSize / FaspBlockSize;
00053 if (fRawSize < 100) fRawSize = 100;
00054 fRawSize *= FaspBlockSize;
00055
00056 return Init();
00057 }
00058
00059 bool fasp::FileInput::Init()
00060 {
00061 if (fFileName.length()==0) return false;
00062
00063 if (fFilesList!=0) {
00064 EOUT(("Files list already exists"));
00065 return false;
00066 }
00067
00068 if (fBufferSize==0) {
00069 EOUT(("Buffer size not specified !!!!"));
00070 return false;
00071 }
00072
00073 if (strpbrk(fFileName.c_str(),"*?")!=0)
00074 fFilesList = dabc::mgr()->ListMatchFiles("", fFileName.c_str());
00075 else {
00076 fFilesList = new dabc::Object(0, "FilesList", true);
00077 new dabc::Object(fFilesList, fFileName.c_str());
00078 }
00079
00080 return OpenNextFile();
00081 }
00082
00083 bool fasp::FileInput::OpenNextFile()
00084 {
00085 CloseFile();
00086
00087 if ((fFilesList==0) || (fFilesList->NumChilds()==0)) return false;
00088
00089 const char* nextfilename = fFilesList->GetChild(0)->GetName();
00090
00091 fFile = fopen(nextfilename,"r");
00092
00093 bool res = true;
00094
00095 if (fFile==0) {
00096 EOUT(("Cannot open FASP file %s for reading", nextfilename));
00097 res = false;
00098 } else {
00099 fCurrentFileName = nextfilename;
00100 DOUT1(("Open FASP file %s for reading", fCurrentFileName.c_str()));
00101
00102 if (fRaw==0) fRaw = new uint8_t [fRawSize];
00103 }
00104
00105 fFilesList->DeleteChild(0);
00106
00107 return res;
00108 }
00109
00110
00111 bool fasp::FileInput::CloseFile()
00112 {
00113 if (fFile) { fclose(fFile); fFile = 0; }
00114 fCurrentFileName = "";
00115 fCurrentRead = 0;
00116 return true;
00117 }
00118
00119 unsigned fasp::FileInput::Read_Size()
00120 {
00121
00122
00123
00124
00125
00126
00127
00128
00129 return fBufferSize;
00130 }
00131
00132 unsigned fasp::FileInput::Read_Complete(dabc::Buffer& buf)
00133 {
00134 mbs::WriteIterator iter(buf);
00135
00136 if (!iter.IsPlaceForEvent(fRawPos + sizeof(mbs::SubeventHeader))) {
00137 EOUT(("Failure - cannot put FASP data for SYNC %u in single MBS event - skip it", fRawSync));
00138 fRawPos = 0;
00139 fRawSync = DummySync;
00140 }
00141
00142 int numev = 0;
00143
00144 while (iter.IsPlaceForEvent(fRawPos + sizeof(mbs::SubeventHeader))) {
00145
00146 bool is_end_of_stream(false);
00147
00148 if ((fFile==0) || feof(fFile))
00149 if (!OpenNextFile()) is_end_of_stream = true;
00150
00151 if (is_end_of_stream && (fRawPos==0)) break;
00152
00153 uint8_t* rawbuf = fRaw + fRawPos;
00154 unsigned id(0);
00155
00156 if (!is_end_of_stream) {
00157
00158 size_t rz = fread(rawbuf, 1, FaspBlockSize, fFile);
00159
00160 if (rz<FaspBlockSize) {
00161 if (rz>0) EOUT(("Cannot read next %u bytes from FASP file - actual only %d", FaspBlockSize, rz));
00162 CloseFile();
00163 continue;
00164 }
00165
00166 id = rawbuf[FaspSyncPos] * 0x10000 + rawbuf[FaspSyncPos+1] * 0x100 + rawbuf[FaspSyncPos+2];
00167
00168 if ((fRawSync == DummySync) || (fRawSync == id)) {
00169 if (fRawPos < fRawSize - FaspBlockSize) {
00170 fRawPos += FaspBlockSize;
00171 fRawSync = id;
00172 } else {
00173 EOUT(("Not enough place in raw buffer - skip data completely"));
00174 fRawPos = 0;
00175 fRawSync = DummySync;
00176 }
00177 continue;
00178 }
00179 }
00180
00181
00182
00183 if (!iter.NewEvent(fRawSync, fRawPos + sizeof(mbs::SubeventHeader))) { EOUT(("No place for new event")); exit(1); }
00184 if (!iter.NewSubevent(fRawPos, 0, roc::proc_FASP, 0)) { EOUT(("No place for subevent")); exit(1); }
00185
00186 memcpy(iter.rawdata(), fRaw, fRawPos);
00187
00188 if (!iter.FinishSubEvent(fRawPos)) { EOUT(("Not possible to finish subevent")); exit(1); }
00189 if (!iter.FinishEvent()) { EOUT(("Not possible to finish event")); exit(1); }
00190
00191 numev++;
00192 fRawPos = 0;
00193 fRawSync = DummySync;
00194
00195 if (is_end_of_stream) break;
00196
00197
00198
00199 memcpy(fRaw, rawbuf, FaspBlockSize);
00200 fRawPos = FaspBlockSize;
00201 fRawSync = id;
00202 }
00203
00204 if (numev==0) return dabc::di_EndOfStream;
00205
00206 iter.Close();
00207
00208 return dabc::di_Ok;
00209 }