Skip to content
Snippets Groups Projects
Commit 90374db9 authored by Chao Peng's avatar Chao Peng
Browse files

Add an EtChannel to read event from ET system, and two tools to test it

parent f425f70d
No related branches found
No related tags found
1 merge request!2Revamp the code structure
Showing with 832 additions and 112 deletions
...@@ -43,4 +43,5 @@ plots/* ...@@ -43,4 +43,5 @@ plots/*
*.dat *.dat
*.root *.root
*.csv *.csv
*.evio*
...@@ -25,4 +25,5 @@ add_subdirectory(third_party) ...@@ -25,4 +25,5 @@ add_subdirectory(third_party)
add_subdirectory(decoder) add_subdirectory(decoder)
add_subdirectory(ssp) add_subdirectory(ssp)
add_subdirectory(src) add_subdirectory(src)
add_subdirectory(tools)
...@@ -7,7 +7,6 @@ add_executable(${exe} ...@@ -7,7 +7,6 @@ add_executable(${exe}
target_include_directories(${exe} target_include_directories(${exe}
PUBLIC PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}> $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}>
${EVC_INCLUDE_DIR}
${ROOT_INCLUDE_DIRS} ${ROOT_INCLUDE_DIRS}
) )
......
...@@ -73,7 +73,10 @@ void write_raw_data(const std::string &dpath, const std::string &opath, const st ...@@ -73,7 +73,10 @@ void write_raw_data(const std::string &dpath, const std::string &opath, const st
// raw data // raw data
evc::EvChannel evchan; evc::EvChannel evchan;
evchan.Open(dpath); if (evchan.Open(dpath) != evc::status::success) {
std::cout << "Cannot open evchannel at " << dpath << std::endl;
return;
}
// output // output
auto *hfile = new TFile(opath.c_str(), "RECREATE", "MAPMT test results"); auto *hfile = new TFile(opath.c_str(), "RECREATE", "MAPMT test results");
...@@ -126,7 +129,7 @@ void write_raw_data(const std::string &dpath, const std::string &opath, const st ...@@ -126,7 +129,7 @@ void write_raw_data(const std::string &dpath, const std::string &opath, const st
int count = 0; int count = 0;
auto &ref = modules.front(); auto &ref = modules.front();
while (evchan.Read() && (nev-- != 0)) { while ((evchan.Read() == evc::status::success) && (nev-- != 0)) {
if((count % PROGRESS_COUNT) == 0) { if((count % PROGRESS_COUNT) == 0) {
std::cout << "Processed events - " << count << "\r" << std::flush; std::cout << "Processed events - " << count << "\r" << std::flush;
} }
......
#include "ConfigParser.h"
#include "ConfigObject.h"
#include "utils.h"
#include "nlohmann/json.hpp"
#include <iomanip>
#include <string>
#include <vector>
using json = nlohmann::json;
int main(int argc, char *argv[])
{
std::string path = argv[1];
// read in file
std::string buffer = ConfigParser::file_to_string(path);
// remove comments
ConfigParser::comment_line(buffer, "#", "\n");
// get content blocks
auto text = ConfigParser::break_into_blocks(buffer, "{", "}");
ConfigObject conf;
std::vector<Module> res;
for(auto &b : text.blocks)
{
// module
if (!ConfigParser::case_ins_equal("Module", b.label))
continue;
auto btext = ConfigParser::break_into_blocks(b.content, "{", "}");
conf.ReadConfigString(btext.residual);
// module attributes
Module mod;
mod.crate = conf.GetConfigValue("crate").Int();
mod.slot = conf.GetConfigValue("slot").Int();
mod.type = str2ModuleType(conf.GetConfigValue("type").c_str());
// channels
for (auto &sub : btext.blocks) {
if (!ConfigParser::case_ins_equal("Channels", sub.label))
continue;
ConfigParser parser;
parser.ReadBuffer(sub.content.c_str());
while(parser.ParseLine()) {
mod.channels.emplace_back(Channel{
parser.TakeFirst().Int(),
parser.TakeFirst().String(),
str2ChannelType(parser.TakeFirst().c_str())
});
}
}
res.emplace_back(mod);
}
// print out all channels to json
json db;
for (auto &mod : res) {
auto name = "module_" + std::to_string(mod.crate) + "_" + std::to_string(mod.slot);
db[name]["crate"] = mod.crate;
db[name]["slot"] = mod.slot;
db[name]["type"] = ModuleType2str(mod.type);
auto chs = json::array();
for (auto &ch : mod.channels) {
chs.push_back({{"channel", ch.id}, {"name", ch.name}, {"type", ChannelType2str(ch.type)}});
}
db[name]["channels"] = chs;
}
std::ofstream output_file(argv[2]);
output_file << std::setw(4) << db << "\n";
}
...@@ -319,12 +319,3 @@ ConfigValue &ConfigValue::Trim(const std::string &white) ...@@ -319,12 +319,3 @@ ConfigValue &ConfigValue::Trim(const std::string &white)
return *this; return *this;
} }
//============================================================================//
// Other functions //
//============================================================================//
ostream &operator << (ostream &os, const ConfigValue &b)
{
return os << b.c_str();
}
...@@ -135,6 +135,9 @@ private: ...@@ -135,6 +135,9 @@ private:
}; };
// show string content of the config value to ostream // show string content of the config value to ostream
std::ostream &operator << (std::ostream &os, const ConfigValue &b); static std::ostream &operator << (std::ostream &os, const ConfigValue &b)
{
return os << b.c_str();
}
#endif #endif
...@@ -23,11 +23,14 @@ set(INSTALL_CONFIGDIR ${CMAKE_INSTALL_LIBDIR}/${MAIN_PROJECT_NAME_LC}) ...@@ -23,11 +23,14 @@ set(INSTALL_CONFIGDIR ${CMAKE_INSTALL_LIBDIR}/${MAIN_PROJECT_NAME_LC})
# Sources and headers # Sources and headers
set(src set(src
EvChannel.cpp EvChannel.cpp
EtChannel.cpp
) )
set(headers set(headers
EvChannel.h
EvStruct.h EvStruct.h
EvChannel.h
EtChannel.h
EtConfigWrapper.h
) )
set(LIBNAME evc) set(LIBNAME evc)
...@@ -36,8 +39,6 @@ add_library(${LIBNAME} SHARED ${src}) ...@@ -36,8 +39,6 @@ add_library(${LIBNAME} SHARED ${src})
target_include_directories(${LIBNAME} target_include_directories(${LIBNAME}
PUBLIC PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}> $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}>
${CODA_ET_INCLUDE_DIR}
${EVIO_INCLUDE_DIR}
) )
target_link_libraries(${LIBNAME} target_link_libraries(${LIBNAME}
......
#include "EtChannel.h"
#include <iostream>
#include <cstring>
using namespace evc;
static inline status et_status(int code, bool verbose = false)
{
switch (code) {
case ET_OK:
return status::success;
default:
if (verbose) {
std::cerr << "EtChannel Error: " << et_wrap::get_error_str(code) << "\n";
}
return status::failure;
}
}
EtChannel::EtChannel(size_t buf_size)
: EvChannel(buf_size), et_id(nullptr), stat_id(ID_NULL), att_id(ID_NULL)
{
// large enough chunk
pe.resize(1000);
sconf.set_cue(ET_STATION_CUE);
sconf.set_user(ET_STATION_USER_MULTI);
sconf.set_restore(ET_STATION_RESTORE_OUT);
sconf.set_select(ET_STATION_SELECT_ALL);
sconf.set_block(ET_STATION_NONBLOCKING);
sconf.set_prescale(1);
}
// Connect a ET system and create a monitor station with pre-settings
status EtChannel::Connect(const std::string &ip, int port, const std::string &et_file)
{
if (IsETOpen()) {
std::cout << "EtChannel Warning: a ET system has already been connected.\n";
}
// open et system
et_wrap::OpenConfig conf;
// use a direct connection to the ET system
conf.set_cast(ET_DIRECT);
conf.set_host(ip.c_str());
conf.set_serverport(port);
char *fname = strdup(et_file.c_str());
auto status = et_open(&et_id, fname, conf.configure().get());
free(fname);
return et_status(status, true);
}
// create a station and attach to it
status EtChannel::Open(const std::string &station)
{
if (!IsETOpen()) {
std::cout << "EtChannel Error: ET System is not opened, cannot open the channel." << std::endl;
return status::failure;
}
// create a station first
if (stat_id == ID_NULL) {
char *sname = strdup(station.c_str());
et_station_create(et_id, &stat_id, sname, sconf.configure().get());
free(sname);
}
return et_status(et_station_attach(et_id, stat_id, &att_id), true);
}
// detach from the station
void EtChannel::Close()
{
if (IsETOpen() && (att_id != ID_NULL)) {
auto status = et_status(et_station_detach(et_id, att_id), true);
att_id = ID_NULL;
}
}
// read an event
status EtChannel::Read()
{
// buffers are not empty, just get the first element from it
if (!buffers.empty()) {
buffer = std::move(buffers.front());
buffers.pop_front();
return status::success;
// read from ET system
} else {
int nread;
auto status = et_events_get(et_id, att_id, &pe[0], ET_ASYNC, nullptr, ET_STATION_CUE, &nread);
switch (status) {
case ET_OK:
if (!copyEvent(&pe[0], nread)) {
return status::empty;
}
if (et_events_put(et_id, att_id, &pe[0], nread) != ET_OK) {
std::cerr << "EtChannel Error: failed to put back et_event after reading.\n";
return status::eof;
}
// get an event from the buffers
buffer = std::move(buffers.front());
buffers.pop_front();
break;
case ET_ERROR_BUSY:
case ET_ERROR_TIMEOUT:
case ET_ERROR_WAKEUP:
std::cout << "EtChannel Warning: " << et_wrap::get_error_str(status) << "\n";
case ET_ERROR_EMPTY:
return status::empty;
// fatal errors
default:
std::cerr << "EtChannel Error: " << et_wrap::get_error_str(status) << "\n";
return status::failure;
}
}
return status::success;
}
// helper functions
template<class Func, class... Args>
inline bool ev_filter(Func beg, Func end, Args&&... args)
{
for (auto it = beg; it != end; ++it) {
if (!(*it)(args...)) return false;
}
return true;
}
template<class Func>
std::vector<uint32_t> copy_event(const uint32_t *buf, bool swap, Func fil_beg, Func fil_end)
{
std::vector<uint32_t> event;
auto header = BankHeader(buf);
// invalid header
if (header.length < 1) {
return event;
}
// cannot pass filters
for (auto it = fil_beg; it != fil_end; ++it) {
if (!(*it)(header)) {
return event;
}
}
// good event, copy it!
event.assign(buf, buf + header.length + 1);
if (swap) {
for (auto &val : event) {
val = ET_SWAP32(val);
}
}
return event;
}
// copy event to the buffer
bool EtChannel::copyEvent(et_event **pe, int nread)
{
if (nread <= 0) {
return false;
}
void *data;
size_t len, bytes = sizeof(uint32_t);
int endian, swap;
for (int i = 0; i < nread; ++i) {
// get event data and attributes from ET
et_event_getdata(pe[i], &data);
et_event_getlength(pe[i], &len);
// et_event_getendian(pe[i], &endian);
et_event_needtoswap(pe[i], &swap);
// size of the buffer
len = len / bytes + ((len % bytes) ? 1 : 0);
uint32_t *dbuf = static_cast<uint32_t*>(data);
// check if it is a block
if ((len > 6) && (0xc0da0100 == (swap ? ET_SWAP32(dbuf[7]) : dbuf[7]))) {
// skip the block header (size 8)
uint32_t index = 8;
while (index < dbuf[0]) {
auto event = copy_event(&dbuf[index], swap, filters.begin(), filters.end());
index += dbuf[index] + 1;
if (event.size()) {
buffers.emplace_back(std::move(event));
}
}
// a single event
} else {
auto event = copy_event(dbuf, swap, filters.begin(), filters.end());
if (event.size()) {
buffers.emplace_back(std::move(event));
}
}
}
return true;
}
//=============================================================================
// Class EtChannel ||
// Read event from ET instead of CODA file ||
// ||
// Developer: ||
// Chao Peng ||
// 11/22/2019 ||
//=============================================================================
#pragma once
#include "EtConfigWrapper.h"
#include "EvChannel.h"
#include "EvStruct.h"
#include <functional>
#include <iostream>
#include <chrono>
#include <string>
#include <thread>
#include <list>
#include <vector>
#define ID_NULL -9999
namespace evc {
class EtChannel : public EvChannel
{
public:
EtChannel(size_t buf_size = 1024*2000);
virtual status Open(const std::string &station);
virtual void Close();
virtual status Read();
status Connect(const std::string &ip, int port, const std::string &et_file);
bool IsETOpen() const { return (et_id != nullptr) && et_alive(et_id); }
void AddEvFilter(std::function<bool(const BankHeader &)> &&func) { filters.emplace_back(func); }
et_wrap::StationConfig &GetConfig() { return sconf; }
const et_wrap::StationConfig &GetConfig() const { return sconf; }
private:
bool copyEvent(et_event **pe, int nread);
et_wrap::StationConfig sconf;
et_sys_id et_id;
et_stat_id stat_id;
et_att_id att_id;
std::list<std::vector<uint32_t>> buffers;
std::vector<std::function<bool(const BankHeader &)>> filters;
std::vector<et_event*> pe;
};
} // namespace evc
//=============================================================================
// EtConfigWrapper ||
// A C++ wrapper for the configs of ET system/station ||
// ||
// Developer: ||
// Chao Peng ||
// 11/22/2019 ||
//=============================================================================
#pragma once
#include <vector>
#include <memory>
#include <string>
#include <iostream>
#include <unordered_set>
#include "et.h"
#define ET_VERSION 16
#define SET_BIT(n, i) ( (n) |= (1ULL << i) )
#define CLEAR_BIT(n, i) ( (n) &= ~(1ULL << i) )
#define TEST_BIT(n, i) ( (bool)( n & (1ULL << i) ) )
#define ETCONF_ADD_MEMBER(type, var, flag) \
public:\
void set_##var(type val) { var = val; SET_BIT(flag, static_cast<uint32_t>(Flag::var)); } \
type get_##var() const { return var; } \
private:\
type var;
namespace et_wrap {
static std::string get_error_str(int error)
{
switch(error) {
case ET_ERROR: return "General error.";
case ET_ERROR_TOOMANY: return "Too many somethings (stations, attachments, temp events, ET system responses) exist.";
case ET_ERROR_EXISTS: return "ET system file or station already exists.";
case ET_ERROR_WAKEUP: return "Sleeping routine woken up by et_wakeup_attachment() or et_wakeup_all().";
case ET_ERROR_TIMEOUT: return "Timed out.";
case ET_ERROR_EMPTY: return "No events available in async mode.";
case ET_ERROR_BUSY: return "Resource is busy.";
case ET_ERROR_DEAD: return "ET system is dead.";
case ET_ERROR_READ: return "Network read error.";
case ET_ERROR_WRITE: return "Network write error,";
case ET_ERROR_REMOTE: return "Cannot allocate memory in remote client.";
#if ET_VERSION >= 14
case ET_ERROR_TOOBIG: return "Client is 32 bits & server is 64 (or vice versa) and event is too big for one.";
case ET_ERROR_NOMEM: return "Cannot allocate memory.";
case ET_ERROR_BADARG: return "Bad argument given to function.";
case ET_ERROR_SOCKET: return "Socket option could not be set.";
case ET_ERROR_NETWORK: return "Host name or address could not be resolved, or cannot connect.";
case ET_ERROR_CLOSED: return "ET system has been closed by client.";
case ET_ERROR_JAVASYS: return "C code trying to open Java-based ET system file locally.";
#endif
default: break;
}
return "Unknown error";
}
class OpenConfig
{
#define OPEN_CONFIG_SET(flag, ptr, var) \
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::var))) { et_open_config_set##var(ptr, get_##var()); }
public:
enum class Flag : uint32_t {
wait = 0,
cast,
TTL,
mode,
debugdefault,
port,
serverport,
policy,
tcp,
timeout,
host,
interface
};
struct tcp_setting {
int rbuf_size, sbuf_size, no_delay;
};
public:
// initialize
OpenConfig() { flag = 0; }
// set configuration and return a smart pointer
std::shared_ptr<void> configure() const
{
void *ptr;
et_open_config_init(&ptr);
OPEN_CONFIG_SET(flag, ptr, wait);
OPEN_CONFIG_SET(flag, ptr, cast);
OPEN_CONFIG_SET(flag, ptr, mode);
OPEN_CONFIG_SET(flag, ptr, port);
OPEN_CONFIG_SET(flag, ptr, serverport);
OPEN_CONFIG_SET(flag, ptr, timeout);
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::host))) {
char temp[1024];
strncpy(temp, host.c_str(), 1024);
et_open_config_sethost(ptr, temp);
}
#if ET_VERSION >= 14
OPEN_CONFIG_SET(flag, ptr, debugdefault);
OPEN_CONFIG_SET(flag, ptr, TTL);
OPEN_CONFIG_SET(flag, ptr, policy);
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::interface))) {
et_open_config_setinterface(ptr, interface.c_str());
}
for (auto &c : multi_casts) {
et_open_config_addmulticast(ptr, c.c_str());
}
for (auto &c : broad_casts) {
et_open_config_addbroadcast(ptr, c.c_str());
}
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::tcp))) {
et_open_config_settcp(ptr, tcp.rbuf_size, tcp.sbuf_size, tcp.no_delay);
}
#endif
return std::shared_ptr<void>(ptr, [] (void *p) { et_open_config_destroy(p); });
}
std::unordered_set<std::string> broad_casts, multi_casts;
private:
uint32_t flag;
ETCONF_ADD_MEMBER(int, wait, flag);
ETCONF_ADD_MEMBER(int, cast, flag);
ETCONF_ADD_MEMBER(int, TTL, flag);
ETCONF_ADD_MEMBER(int, mode, flag);
ETCONF_ADD_MEMBER(int, debugdefault, flag);
ETCONF_ADD_MEMBER(int, port, flag);
ETCONF_ADD_MEMBER(int, serverport, flag);
ETCONF_ADD_MEMBER(int, policy, flag);
ETCONF_ADD_MEMBER(struct tcp_setting, tcp, flag);
ETCONF_ADD_MEMBER(struct timespec, timeout, flag);
ETCONF_ADD_MEMBER(std::string, host, flag);
ETCONF_ADD_MEMBER(std::string, interface, flag);
};
class StationConfig
{
#define STATION_CONFIG_SET(flag, ptr, var) \
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::var))) { et_station_config_set##var(ptr, get_##var()); }
public:
enum class Flag : uint32_t {
block = 0,
flow,
select,
user,
restore,
cue,
prescale,
selectwords,
function,
lib,
myclass,
};
public:
// initialize
StationConfig() { flag = 0; }
// set configuration and return a smart pointer
std::shared_ptr<void> configure() const
{
void *ptr;
et_station_config_init(&ptr);
STATION_CONFIG_SET(flag, ptr, block);
STATION_CONFIG_SET(flag, ptr, select);
STATION_CONFIG_SET(flag, ptr, user);
STATION_CONFIG_SET(flag, ptr, restore);
STATION_CONFIG_SET(flag, ptr, cue);
STATION_CONFIG_SET(flag, ptr, prescale);
if (!selectwords.empty() && TEST_BIT(flag, static_cast<uint32_t>(Flag::selectwords))) {
// copy a vector to maintain the const behavior
auto words = selectwords;
et_station_config_setselectwords(ptr, &words[0]);
}
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::function))) {
char temp[1024];
strncpy(temp, function.c_str(), 1024);
if (et_station_config_setfunction(ptr, temp) != ET_OK) {
std::cerr << "Could not set function \"" << function << "\" for station config." << std::endl;
}
}
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::lib))) {
char temp[1024];
strncpy(temp, lib.c_str(), 1024);
if (et_station_config_setlib(ptr, temp) != ET_OK) {
std::cerr << "Could not set library \"" << lib << "\" for station config." << std::endl;
}
}
#if ET_VERSION >= 14
STATION_CONFIG_SET(flag, ptr, flow);
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::myclass))) {
if (et_station_config_setclass(ptr, myclass.c_str()) != ET_OK) {
std::cerr << "Could not set class \"" << myclass << "\" for station config." << std::endl;
}
}
#endif
return std::shared_ptr<void>(ptr, [] (void *p) { et_station_config_destroy(p); });
}
std::unordered_set<std::string> broad_casts, multi_casts;
private:
uint32_t flag;
ETCONF_ADD_MEMBER(int, block, flag);
ETCONF_ADD_MEMBER(int, flow, flag);
ETCONF_ADD_MEMBER(int, select, flag);
ETCONF_ADD_MEMBER(int, user, flag);
ETCONF_ADD_MEMBER(int, restore, flag);
ETCONF_ADD_MEMBER(int, cue, flag);
ETCONF_ADD_MEMBER(int, prescale, flag);
ETCONF_ADD_MEMBER(std::vector<int>, selectwords, flag);
ETCONF_ADD_MEMBER(std::string, function, flag);
ETCONF_ADD_MEMBER(std::string, lib, flag);
ETCONF_ADD_MEMBER(std::string, myclass, flag);
};
}; // namespace et_wrapper
...@@ -9,6 +9,22 @@ ...@@ -9,6 +9,22 @@
using namespace evc; using namespace evc;
// convert evio status to the enum
static inline status evio_status (int code)
{
switch (code) {
case S_SUCCESS:
return status::success;
case EOF:
case S_EVFILE_UNXPTDEOF:
return status::eof;
case S_EVFILE_TRUNC:
return status::incomplete;
default:
return status::failure;
}
}
EvChannel::EvChannel(size_t buflen) EvChannel::EvChannel(size_t buflen)
: fHandle(-1) : fHandle(-1)
{ {
...@@ -16,7 +32,7 @@ EvChannel::EvChannel(size_t buflen) ...@@ -16,7 +32,7 @@ EvChannel::EvChannel(size_t buflen)
} }
int EvChannel::Open(const std::string &path) status EvChannel::Open(const std::string &path)
{ {
if (fHandle > 0) { if (fHandle > 0) {
Close(); Close();
...@@ -24,7 +40,7 @@ int EvChannel::Open(const std::string &path) ...@@ -24,7 +40,7 @@ int EvChannel::Open(const std::string &path)
char *cpath = strdup(path.c_str()), *copt = strdup("r"); char *cpath = strdup(path.c_str()), *copt = strdup("r");
int status = evOpen(cpath, copt, &fHandle); int status = evOpen(cpath, copt, &fHandle);
free(cpath); free(copt); free(cpath); free(copt);
return status; return evio_status(status);
} }
void EvChannel::Close() void EvChannel::Close()
...@@ -33,17 +49,15 @@ void EvChannel::Close() ...@@ -33,17 +49,15 @@ void EvChannel::Close()
fHandle = -1; fHandle = -1;
} }
bool EvChannel::Read() status EvChannel::Read()
{ {
return (evRead(fHandle, &buffer[0], buffer.size()) == S_SUCCESS); return evio_status(evRead(fHandle, &buffer[0], buffer.size()));
} }
bool EvChannel::ScanBanks(const std::vector<uint32_t> &banks) bool EvChannel::ScanBanks(const std::vector<uint32_t> &banks)
{ {
buffer_info.clear(); buffer_info.clear();
if (fHandle <= 0) { return false; }
auto evh = BankHeader(&buffer[0]); auto evh = BankHeader(&buffer[0]);
// skip the header // skip the header
size_t iword = BankHeader::size(); size_t iword = BankHeader::size();
......
//=============================================================================
// Class EvChannel ||
// Read event from CODA evio file, it can also scan data banks to locate ||
// event buffers ||
// ||
// Developer: ||
// Chao Peng ||
// 09/07/2020 ||
//=============================================================================
#pragma once #pragma once
#include "EvStruct.h" #include "EvStruct.h"
#include <iostream>
#include <string> #include <string>
#include <vector> #include <vector>
#include <exception> #include <exception>
...@@ -9,6 +19,16 @@ ...@@ -9,6 +19,16 @@
namespace evc { namespace evc {
// status enum
enum class status : int
{
failure = -1,
success = 1,
incomplete = 2,
empty = 3,
eof = 4,
};
// buffer address // buffer address
struct BufferAddress struct BufferAddress
{ {
...@@ -37,16 +57,24 @@ class EvChannel ...@@ -37,16 +57,24 @@ class EvChannel
public: public:
EvChannel(size_t buflen = 1024*2000); EvChannel(size_t buflen = 1024*2000);
int Open(const std::string &path); virtual status Open(const std::string &path);
void Close(); virtual void Close();
bool Read(); virtual status Read();
bool ScanBanks(const std::vector<uint32_t> &banks); bool ScanBanks(const std::vector<uint32_t> &banks);
bool Scan() { return ScanBanks({}); } bool Scan() { return ScanBanks({}); }
uint32_t *GetRawBuffer() { return &buffer[0]; } uint32_t *GetRawBuffer() { return &buffer[0]; }
const uint32_t *GetRawBuffer() const { return &buffer[0]; } const uint32_t *GetRawBuffer() const { return &buffer[0]; }
std::vector<uint32_t> &GetRawBufferVec() { return buffer; }
const std::vector<uint32_t> &GetRawBufferVec() const { return buffer; }
BankHeader GetEvHeader() const { return BankHeader(&buffer[0]); } BankHeader GetEvHeader() const { return BankHeader(&buffer[0]); }
const std::unordered_map<BufferAddress, std::vector<BufferInfo>, BufferHash> &GetEvBuffers() const
{
return buffer_info;
}
const std::vector<BufferInfo> &GetEvBuffer(uint32_t roc, uint32_t bank, uint32_t slot) const const std::vector<BufferInfo> &GetEvBuffer(uint32_t roc, uint32_t bank, uint32_t slot) const
{ {
auto it = buffer_info.find(BufferAddress{roc, bank, slot}); auto it = buffer_info.find(BufferAddress{roc, bank, slot});
...@@ -74,16 +102,7 @@ public: ...@@ -74,16 +102,7 @@ public:
throw std::runtime_error(error); throw std::runtime_error(error);
} }
static uint32_t swap_endian32(uint32_t num) protected:
{
uint32_t b0 = (num & 0x000000ff) << 24u;
uint32_t b1 = (num & 0x0000ff00) << 8u;
uint32_t b2 = (num & 0x00ff0000) >> 8u;
uint32_t b3 = (num & 0xff000000) >> 24u;
return b0 | b1 | b2 | b3;
}
private:
size_t scanTriggerBank(const uint32_t *buf, size_t gindex); size_t scanTriggerBank(const uint32_t *buf, size_t gindex);
size_t scanRocBank(const uint32_t *buf, size_t gindex, const std::vector<uint32_t> &banks); size_t scanRocBank(const uint32_t *buf, size_t gindex, const std::vector<uint32_t> &banks);
void scanDataBank(const uint32_t *buf, size_t buflen, uint32_t roc, uint32_t bank, size_t gindex); void scanDataBank(const uint32_t *buf, size_t buflen, uint32_t roc, uint32_t bank, size_t gindex);
...@@ -93,5 +112,11 @@ private: ...@@ -93,5 +112,11 @@ private:
std::unordered_map<BufferAddress, std::vector<BufferInfo>, BufferHash> buffer_info; std::unordered_map<BufferAddress, std::vector<BufferInfo>, BufferHash> buffer_info;
}; };
} // namespace evio } // namespace evc
// operator for address output
static std::ostream &operator << (std::ostream &os, const evc::BufferAddress &a)
{
return os << "(" << a.roc << ", " << a.bank << ", " << a.slot << ")";
}
//=============================================================================
// EvStruct ||
// Basic information about CODA evio file format ||
// ||
// Developer: ||
// Chao Peng ||
// 09/07/2020 ||
//=============================================================================
#pragma once #pragma once
#include <cstdint> #include <cstdint>
......
# build analyzer binary
set(sources
et_feeder.cpp
evchan_test.cpp
)
foreach(src ${sources})
# I used a simple string replace, to cut off .cpp.
string( REPLACE ".cpp" "" exe ${src} )
add_executable(${exe} ${src})
target_include_directories(${exe}
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}>
${ROOT_INCLUDE_DIRS}
)
target_link_libraries(${exe}
LINK_PUBLIC
${ROOT_LIBRARIES}
evc
conf
)
install(TARGETS ${exe} DESTINATION ${CMAKE_INSTALL_BINDIR})
endforeach(src ${sources})
#include "ConfigOption.h" /* A program to feed evio file to an ET system event-by-event
#include "ETConfigWrapper.h" * ET system can be opened with et_start with the JLab ET library and its executables
#include "et.h" */
#include "THaCodaFile.h" #include "ConfigArgs.h"
#include "EtConfigWrapper.h"
#include "EvChannel.h"
#include <csignal> #include <csignal>
#include <thread> #include <thread>
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
#include <iomanip>
#define PROGRESS_COUNT 100 #define PROGRESS_COUNT 100
...@@ -20,52 +23,18 @@ void signal_handler(int signal) { ...@@ -20,52 +23,18 @@ void signal_handler(int signal) {
} }
int main(int argc, char* argv[]) try int main(int argc, char* argv[])
{ {
// setup input arguments // setup input arguments
ConfigOption conf_opt; ConfigArgs arg_parser;
conf_opt.AddLongOpt(ConfigOption::help_message, "help", 'a'); arg_parser.AddHelp("--help");
conf_opt.AddOpt(ConfigOption::arg_require, 'h'); arg_parser.AddPositional("evio_file", "input evio file");
conf_opt.AddOpt(ConfigOption::arg_require, 'p'); arg_parser.AddArg<std::string>("-h", "host", "host address of the ET system", "localhost");
conf_opt.AddOpt(ConfigOption::arg_require, 'f'); arg_parser.AddArg<int>("-p", "port", "port to connect ET system", 11111);
conf_opt.AddOpt(ConfigOption::arg_require, 'i'); arg_parser.AddArg<std::string>("-f", "et_file", "path to the memory mapped et file", "/tmp/et_feeder");
arg_parser.AddArg<int>("-i", "interval", "interval in milliseconds to write data", 100);
conf_opt.SetDesc("usage: %0 <data_file>");
conf_opt.SetDesc('h', "host address of the ET system, default \"localhost\".");
conf_opt.SetDesc('p', "port to connect, default 11111.");
conf_opt.SetDesc('f', "memory mapped et file, default \"/tmp/et_feeder\".");
conf_opt.SetDesc('i', "interval in milliseconds to write data, default \"10\".");
conf_opt.SetDesc('a', "help message.");
if (!conf_opt.ParseArgs(argc, argv) || conf_opt.NbofArgs() != 1) {
std::cout << conf_opt.GetInstruction() << std::endl;
return -1;
}
std::string host = "localhost"; auto args = arg_parser.ParseArgs(argc, argv);
int port = 11111;
std::string etf = "/tmp/et_feeder";
int interval = 10;
for (auto &opt : conf_opt.GetOptions()) {
switch (opt.mark) {
case 'h':
host = opt.var.String();
break;
case 'p':
port = opt.var.Int();
break;
case 'f':
etf = opt.var.String();
break;
case 'i':
interval = opt.var.Int();
break;
default :
std::cout << conf_opt.GetInstruction() << std::endl;
return -1;
}
}
et_sys_id et_id; et_sys_id et_id;
et_att_id att_id; et_att_id att_id;
...@@ -73,14 +42,16 @@ int main(int argc, char* argv[]) try ...@@ -73,14 +42,16 @@ int main(int argc, char* argv[]) try
// open ET system // open ET system
et_wrap::OpenConfig conf; et_wrap::OpenConfig conf;
conf.set_cast(ET_DIRECT); conf.set_cast(ET_DIRECT);
conf.set_host(host.c_str()); conf.set_host(args["host"].c_str());
conf.set_serverport(port); conf.set_serverport(args["port"].Int());
char fname[256]; char *fname = strdup(args["et_file"].c_str());
strncpy(fname, etf.c_str(), 256);
auto status = et_open(&et_id, fname, conf.configure().get()); auto status = et_open(&et_id, fname, conf.configure().get());
free(fname);
if (status != ET_OK) { if (status != ET_OK) {
std::cerr << "Cannot open ET at " << etf << std::endl; std::cerr << "Cannot open ET at " << args["host"] << ":" << args["port"] << " with "
<< args["et_file"] << std::endl;
return -1; return -1;
} }
...@@ -90,11 +61,11 @@ int main(int argc, char* argv[]) try ...@@ -90,11 +61,11 @@ int main(int argc, char* argv[]) try
std::cerr << "Failed to attach to the ET Grand Central Station." << std::endl; std::cerr << "Failed to attach to the ET Grand Central Station." << std::endl;
return -1; return -1;
} }
// evio file reader // evio file reader
Decoder::THaCodaFile file; evc::EvChannel chan;
file.codaOpen(conf_opt.GetArgument(0).c_str()); if (chan.Open(args["evio_file"].String()) != evc::status::success) {
if (!file.isOpen()) { std::cerr << "Failed to open coda file \"" << args["evio_file"] << "\"." << std::endl;
std::cerr << "Failed to open coda file \"" << conf_opt.GetArgument(0) << "\"." << std::endl;
return -1; return -1;
} }
...@@ -102,20 +73,29 @@ int main(int argc, char* argv[]) try ...@@ -102,20 +73,29 @@ int main(int argc, char* argv[]) try
std::signal(SIGINT, signal_handler); std::signal(SIGINT, signal_handler);
int count = 0; int count = 0;
et_event *ev; et_event *ev;
while ((file.codaRead() == CODA_OK) && et_alive(et_id)) { while ((chan.Read() == evc::status::success) && et_alive(et_id)) {
if (gSignalStatus == SIGINT) { if (gSignalStatus == SIGINT) {
std::cout << "Received control-C, exiting..." << std::endl; std::cout << "Received control-C, exiting..." << std::endl;
break; break;
} }
system_clock::time_point start(system_clock::now()); system_clock::time_point start(system_clock::now());
system_clock::time_point next(start + std::chrono::milliseconds(interval)); system_clock::time_point next(start + std::chrono::milliseconds(args["interval"].Int()));
if (++count % PROGRESS_COUNT == 0) { if (++count % PROGRESS_COUNT == 0) {
std::cout << "Read and feed " << count << " events to ET, rate is 1 event per " std::cout << "Read and fed " << count << " events to ET.\r" << std::flush;
<< interval << " ms.\r" << std::flush;
} }
uint32_t *buff = static_cast<uint32_t*>(file.getEvBuffer());
size_t nbytes = (buff[0] + 1)*sizeof(uint32_t); uint32_t *buf = chan.GetRawBuffer();
size_t nbytes = (buf[0] + 1)*sizeof(uint32_t);
/*
std::cout << std::hex << std::setfill('0');
for (size_t i = 0; i < buf[0] + 1; ++i) {
std::cout << "0x" << std::setw(8) << buf[i] << "\n";
}
std::cout << std::dec;
*/
status = et_event_new(et_id, att_id, &ev, ET_SLEEP, nullptr, nbytes); status = et_event_new(et_id, att_id, &ev, ET_SLEEP, nullptr, nbytes);
if (status != ET_OK) { if (status != ET_OK) {
std::cerr << "Failed to add new event to the ET system." << std::endl; std::cerr << "Failed to add new event to the ET system." << std::endl;
...@@ -124,7 +104,7 @@ int main(int argc, char* argv[]) try ...@@ -124,7 +104,7 @@ int main(int argc, char* argv[]) try
// build et event // build et event
void *data; void *data;
et_event_getdata(ev, &data); et_event_getdata(ev, &data);
memcpy((void *) data, (const void *)buff, nbytes); memcpy((void *) data, (const void *)buf, nbytes);
et_event_setlength(ev, nbytes); et_event_setlength(ev, nbytes);
// put back the event // put back the event
...@@ -136,13 +116,10 @@ int main(int argc, char* argv[]) try ...@@ -136,13 +116,10 @@ int main(int argc, char* argv[]) try
std::this_thread::sleep_until(next); std::this_thread::sleep_until(next);
} }
std::cout << "Read and feed " << count << " events to ET, rate is 1 event per " std::cout << "Read and fed " << count << " events to ET" << std::endl;
<< interval << " ms." << std::endl;
file.codaClose(); chan.Close();
return 0; return 0;
} catch (...) {
std::cerr << "?unknown exception" << std::endl;
} }
/* A program to test the consistency between Evchannel and Etchannel
* One should start this program first, connecting to an ET system (et_start with JLab ET library)
* And then use the et_feeder to feed the same evio file to the same ET system
* The intervals should be set large enough so that the two channels are synchronized
*/
#include "ConfigArgs.h"
#include "EtConfigWrapper.h"
#include "EvChannel.h"
#include "EtChannel.h"
#include <csignal>
#include <thread>
#include <chrono>
#include <iostream>
#include <iomanip>
#define PROGRESS_COUNT 100
using namespace std::chrono;
volatile std::sig_atomic_t gSignalStatus;
void signal_handler(int signal) {
gSignalStatus = signal;
}
int main(int argc, char* argv[])
{
// setup input arguments
ConfigArgs arg_parser;
arg_parser.AddHelp("--help");
arg_parser.AddPositional("evio_file", "input evio file");
arg_parser.AddArg<std::string>("-h", "host", "host address of the ET system", "localhost");
arg_parser.AddArg<int>("-p", "port", "port to connect ET system", 11111);
arg_parser.AddArg<std::string>("-f", "et_file", "path to the memory mapped et file", "/tmp/et_feeder");
arg_parser.AddArg<int>("-i", "interval", "interval in milliseconds to write data", 100);
auto args = arg_parser.ParseArgs(argc, argv);
// et channel reader
evc::EtChannel et_chan;
if (et_chan.Connect(args["host"].String(), args["port"].Int(), args["et_file"].String()) != evc::status::success ||
et_chan.Open("MONITOR") != evc::status::success) {
std::cerr << "Failed to open ET channel" << std::endl;
return -1;
}
// evio file reader
evc::EvChannel chan;
if (chan.Open(args["evio_file"].String()) != evc::status::success) {
std::cerr << "Failed to open coda file \"" << args["evio_file"] << "\"." << std::endl;
return -1;
}
// install signal handler
std::signal(SIGINT, signal_handler);
int count = 0;
bool loop = true;
while (loop) {
if (gSignalStatus == SIGINT) {
std::cout << "Received control-C, exiting..." << std::endl;
break;
}
switch (et_chan.Read()) {
case evc::status::success:
break;
case evc::status::empty:
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
default:
loop = false;
continue;
}
chan.Read();
system_clock::time_point start(system_clock::now());
system_clock::time_point next(start + std::chrono::milliseconds(args["interval"].Int()));
if (++count % PROGRESS_COUNT == 0) {
std::cout << "Received " << count << " events to ET.\r" << std::flush;
}
auto et_buf = et_chan.GetRawBufferVec();
auto ev_buf = chan.GetRawBuffer();
std::cout << "New Event: " << et_buf.size() << ", " << et_buf[0] + 1 << std::endl;
std::cout << std::hex << std::setfill('0');
for (size_t i = 0; i < et_buf.size(); ++i) {
std::cout << "0x" << std::setw(8) << et_buf[i] << ", "
<< "0x" << std::setw(8) << ev_buf[i] << "\n";
}
std::cout << std::dec;
std::this_thread::sleep_until(next);
}
std::cout << "Received " << count << " events to ET" << std::endl;
et_chan.Close();
return 0;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment