Newer
Older
///////////////////////////////////////////////////////////////////////////////
// Class THcOnlineRun //
// Read event from ET instead of CODA file //
// //
// Developer: //
// Chao Peng //
// 11/22/2019 //
///////////////////////////////////////////////////////////////////////////////
#include "THcOnlineRun.h"

#include <cstring>

#include "THcGlobals.h"
template<class Interval>
auto get_next_time(Interval itvl)
switch(error) {
case ET_ERROR_EMPTY: return "et_client: empty et event.";
case ET_ERROR_DEAD: return "et client: et is dead!";
case ET_ERROR_TIMEOUT: return "et client: got timeout!";
case ET_ERROR_BUSY: return "et client: et is busy!";
case ET_ERROR_WAKEUP: return "et_client: et is waking up.";
default: break;
}
return "et_client: unknown et error.";
}
// Constructor
//   size   : number of UInt_t words allocated for the local event buffer
//   itvl   : interval stored in `interval` and used to compute the first `clock` value
//   mtries : stored in `max_tries`
//   vers   : stored in `version`
// The object starts without any ET handle: et_id is null and both the station
// and attachment ids are ID_NULL.
THcOnlineRun::THcOnlineRun(UInt_t size, std::chrono::milliseconds itvl, Int_t mtries, Int_t vers)
    : THcRun(),
      version(vers),
      bSize(size),
      eSize(0),
      interval(itvl),
      max_tries(mtries),
      et_id(nullptr),
      stat_id(ID_NULL),
      att_id(ID_NULL)
{
    // schedule the first read time from the requested interval
    clock = get_next_time(itvl);
    // local storage that events are copied into; released in the destructor
    buffer = new UInt_t[size];
}
// Copy constructor
THcOnlineRun::THcOnlineRun(const THcOnlineRun& rhs)
interval = rhs.interval;
max_tries = rhs.max_tries;
// do not copy the ownership of et system
et_id = nullptr;
stat_id = ID_NULL;
att_id = ID_NULL;
}
// Missing move constructor, not implementing for consistency with the analyzer
// Copy assignment operator
THcOnlineRun& THcOnlineRun::operator=(const THcOnlineRun& rhs)
interval = rhs.interval;
max_tries = rhs.max_tries;
et_id = nullptr;
stat_id = ID_NULL;
att_id = ID_NULL;
THcRun::operator=(rhs);
}
return *this;
}
// Missing move assignment operator, not implementing for consistency with the analyzer
// Destructor: releases the local event buffer, then closes the ET connection.
THcOnlineRun::~THcOnlineRun()
{
    // delete[] on a null pointer is a no-op, so no explicit guard is required
    delete[] buffer;
    buffer = nullptr;

    // the et event itself is managed by the et system and is not touched here
    Close();
}
// Connect to an ET system and create a monitor station with preset configurations
Int_t THcOnlineRun::Connect(const char *ip_addr, int port, const char *et_file, const char *s_name)
{
// open et system
auto conf = PRadETChannel::Configuration();
// use a direct connection to the ET system
conf.SetCast(ET_DIRECT);
conf.SetHost(ip_addr);
conf.SetServerPort(port);
auto status = Open(et_file, conf);
if (status < ET_OK) {
_logger->error("THcOnlineRun::Connect : Failed to open ET system, abort connection.");
Close();
return status;
}
// create a station
auto sconf = PRadETStation::Configuration();
sconf.SetUser(ET_STATION_USER_MULTI);
sconf.SetRestore(ET_STATION_RESTORE_OUT);
sconf.SetPrescale(1);
sconf.SetCUE(ET_CHUNK_SIZE);
sconf.SetSelect(ET_STATION_SELECT_ALL);
sconf.SetBlock(ET_STATION_NONBLOCKING);
if (status < ET_OK) {
_logger->error("THcOnlineRun::Connect : Failed to create a station, abort connection.");
Close();
if (status < ET_OK) {
_logger->error("THcOnlineRun::Connect : Failed to attach to the station, abort connection.");
Close();
}
return ET_OK;
}
// Open an ET system from a configuration
//   et_file : name of the ET system file, passed to et_open() through a local copy
//   conf    : open configuration; conf.Get() is handed to et_open()
// Returns ID_NULL (with a warning) if an ET system is already open, otherwise
// the status returned by et_open().
Int_t THcOnlineRun::Open(const char *et_file, PRadETChannel::Configuration conf)
{
    if (IsOpen()) {
        _logger->warn("THcOnlineRun::Open : ET system is already opened, close it before re-open");
        return ID_NULL;
    }

    char fname[256];
    strncpy(fname, et_file, sizeof(fname));
    // strncpy zero-pads short inputs, so a non-NUL last byte means the name did
    // not fit and the copy is NOT terminated; terminate it before use.
    if (fname[sizeof(fname) - 1] != '\0') {
        _logger->warn("THcOnlineRun::Open : ET file name is too long, truncated to {} characters.",
                      sizeof(fname) - 1);
        fname[sizeof(fname) - 1] = '\0';
    }

    return et_open(&et_id, fname, conf.Get());
}
// Close ET
Int_t THcOnlineRun::Close()
{
if ( (et_id != nullptr) && IsOpen() ) {
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
et_id = nullptr;
return 1;
}
return 0;
}
// Create a station from a configuration
//   station_name : name of the station, passed to et_station_create() through a local copy
//   conf         : station configuration; conf.Get() is handed to et_station_create()
// Returns ID_NULL (with a warning) if the ET system is not open or a station id
// is already held, otherwise the status returned by et_station_create().
Int_t THcOnlineRun::CreateStation(const char *station_name, PRadETStation::Configuration conf)
{
    if (!IsOpen()) {
        _logger->warn("THcOnlineRun::CreateStation : ET System is not opened, abort creating a station.");
        return ID_NULL;
    }
    if (stat_id != ID_NULL) {
        _logger->warn("THcOnlineRun::CreateStation : A station has alreadly been created, remove it before re-create a station.");
        return ID_NULL;
    }

    char sname[256];
    strncpy(sname, station_name, sizeof(sname));
    // strncpy zero-pads short inputs, so a non-NUL last byte means the name did
    // not fit and the copy is NOT terminated; terminate it before use.
    if (sname[sizeof(sname) - 1] != '\0') {
        _logger->warn("THcOnlineRun::CreateStation : station name is too long, truncated to {} characters.",
                      sizeof(sname) - 1);
        sname[sizeof(sname) - 1] = '\0';
    }

    return et_station_create(et_id, &stat_id, sname, conf.Get());
}
// Remove the station from the ET system
// Returns ET_OK and resets stat_id to ID_NULL when et_station_remove() succeeds.
// Returns ID_NULL when the ET system is not open, no station id is held, or the
// removal fails (stat_id is then left unchanged).
Int_t THcOnlineRun::RemoveStation()
{
    // nothing to remove
    if (!IsOpen() || stat_id == ID_NULL) {
        return ID_NULL;
    }

    // keep the station id if the ET system refused the removal
    if (et_station_remove(et_id, stat_id) != ET_OK) {
        return ID_NULL;
    }

    stat_id = ID_NULL;
    return ET_OK;
}
// Attach to the station currently held in stat_id
// Delegates to Attach(et_stat_id) and returns its status. The body must return
// a value: flowing off the end of a function returning Int_t is undefined behavior.
Int_t THcOnlineRun::Attach()
{
    return Attach(stat_id);
}
// Attach to a station given its station id
//   sid : id of the station to attach to
// Returns the status of et_station_attach(), which stores the attachment id in
// att_id, or ID_NULL (with a warning) if the ET system is not open.
Int_t THcOnlineRun::Attach(et_stat_id sid)
{
    if (IsOpen()) {
        return et_station_attach(et_id, sid, &att_id);
    }

    _logger->warn("THcOnlineRun::Attach : ET System is not opened, abort attaching to a station.");
    return ID_NULL;
}
// Detach from the station
// Returns ET_OK and resets att_id to ID_NULL when et_station_detach() succeeds.
// Returns ID_NULL when the ET system is not open, no attachment id is held, or
// the detach fails (att_id is then left unchanged).
Int_t THcOnlineRun::Detach()
{
    // nothing to detach
    if (!IsOpen() || att_id == ID_NULL) {
        return ID_NULL;
    }

    // keep the attachment id if the ET system refused the detach
    if (et_station_detach(et_id, att_id) != ET_OK) {
        return ID_NULL;
    }

    att_id = ID_NULL;
    return ET_OK;
}
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
// copy event to the buffer
bool THcOnlineRun::copyEvent()
{
void *data;
size_t size;
et_event_getdata(ev, &data);
et_event_getlength(ev, &size);
// from byte to the buffer type
auto *dBuffer = (decltype(buffer)) data;
size_t bytes = sizeof(decltype(buffer[0]));
eSize = size/bytes + ((size % bytes) ? 1 : 0);
if (eSize > bSize) {
_logger->warn("THcOnlineRun::copyEvent et event size {} exceeds the buffer size {}.", eSize, bSize);
return false;
}
for (UInt_t i = 0; i < eSize; ++i) {
buffer[i] = dBuffer[i];
}
return true;
}
// Read one event from the et channel
// Makes up to max_tries attempts. Each attempt first sleeps until `clock`, then
// re-arms `clock` from `interval` and asks the ET system for an event with ET_ASYNC.
// Returns
//   READ_FATAL : not connected/attached, or et_event_get() reported an error
//                other than ET_ERROR_EMPTY
//   READ_OK    : an event was copied to the buffer and put back
//   READ_EMPTY : the event could not be copied, or no event was obtained
//                within max_tries attempts
//   READ_EOF   : the event could not be put back to the ET system
Int_t THcOnlineRun::ReadEvent()
{
    const char *here = "THcOnlineRun::ReadEvent";

    if (!IsOpen() || att_id == ID_NULL) {
        _logger->error("{} Did not connect to an ET system, abort Reading.", here);
        return THaRunBase::READ_FATAL;
    }

    for (UInt_t attempt = 0; attempt < max_tries; ++attempt) {
        // pace the requests: wait for the scheduled time, then schedule the next one
        std::this_thread::sleep_until(clock);
        clock = get_next_time(interval);

        const auto status = et_event_get(et_id, att_id, &ev, ET_ASYNC, nullptr);

        // nothing available yet, try again
        if (status == ET_ERROR_EMPTY) {
            continue;
        }

        // any other failure is fatal
        if (status != ET_OK) {
            _logger->error("{} {}", here, et_error_to_string(status).c_str());
            return THaRunBase::READ_FATAL;
        }

        // got an event: copy it to the buffer
        Int_t result = THaRunBase::READ_OK;
        if (!copyEvent()) {
            result = THaRunBase::READ_EMPTY;
        }

        // the event is always put back, whether or not the copy succeeded
        if (et_event_put(et_id, att_id, ev) != ET_OK) {
            _logger->error("{} failed to put back et_event to ET system, abort monitoring.", here);
            result = THaRunBase::READ_EOF;
        }
        return result;
    }

    _logger->info("THcOnlineRun::ReadEvent : Tried {} times and no event obtained from ET.", max_tries);
    return THaRunBase::READ_EMPTY;
}