Skip to content
Snippets Groups Projects
Commit 138fad3d authored by Chao Peng's avatar Chao Peng
Browse files

add an online version for the analyzer

parent 4cb22520
Branches
No related tags found
No related merge requests found
///////////////////////////////////////////////////////////////////////////////
// Class ETChannel //
// Read event from ET instead of CODA file //
// //
// Developer: //
// Chao Peng //
// 11/22/2019 //
///////////////////////////////////////////////////////////////////////////////
#include "ETConfigWrapper.h"
#include <iostream>
#include <chrono>
#include <string>
#include <thread>
#include <limits>
#include <set>
#define ID_NULL -9999
class ETChannel
{
public:
/* 32 bit event header structure
* --------------------------------
* | length |
* --------------------------------
* | etype | dtype | num |
* --------------------------------
*/
struct CodaEvHeader
{
uint32_t length;
uint8_t num, dtype;
uint16_t etype;
};
enum StatusCode {
READ_ERROR = -1,
READ_OK = 0,
READ_EOF = 1,
READ_EMPTY = 2,
READ_WARNING = 3,
};
enum EvType {
CODA_PRST = 0xffd1,
CODA_GO = 0xffd2,
CODA_END = 0xffd4,
CODA_PHY1 = 0xff50,
CODA_PHY2 = 0xff70,
};
private:
std::chrono::milliseconds interval;
uint32_t max_tries;
bool coda_end, end_with_coda;
et_sys_id et_id;
et_stat_id stat_id;
et_att_id att_id;
int chunk;
size_t idx;
std::vector<std::vector<uint32_t>> events;
std::vector<std::function<bool(CodaEvHeader *)>> filters;
public:
ETChannel(int32_t itvl = 1000, int32_t mtries = 10, bool endwcoda = true)
: interval(std::chrono::milliseconds(itvl)), max_tries(mtries), coda_end(false),
end_with_coda(endwcoda), et_id(nullptr), stat_id(ID_NULL), att_id(ID_NULL), chunk(200), idx(0)
{
// place holder
}
bool IsOpen() const { return IsETOpen() && (att_id != ID_NULL); }
bool IsETOpen() const { return (et_id != nullptr) && et_alive(et_id); }
void AddEvFilter(std::function<bool(CodaEvHeader *)> &&func) { filters.emplace_back(func); }
// Connect a ET system and create a monitor station with pre-settings
int32_t Connect(const char *ip_addr, int port, const char *et_file, const char *s_name,
int chunk_size = 200, int prescale = 1)
{
// open et system
et_wrap::OpenConfig conf;
// use a direct connection to the ET system
conf.set_cast(ET_DIRECT);
conf.set_host(ip_addr);
conf.set_serverport(port);
auto status = OpenET(et_file, conf);
if (status < ET_OK) {
std::cerr << "Failed to open ET system, abort connection." << std::endl;
return status;
}
std::cout << "Connect to ET system at " << ip_addr << ":" << port << std::endl;
// create a station
et_wrap::StationConfig sconf;
sconf.set_user(ET_STATION_USER_MULTI);
sconf.set_restore(ET_STATION_RESTORE_OUT);
sconf.set_prescale(prescale);
// maximum events
chunk = chunk_size;
sconf.set_cue(chunk);
sconf.set_select(ET_STATION_SELECT_ALL);
sconf.set_block(ET_STATION_NONBLOCKING);
status = CreateStation(s_name, sconf);
if (status < ET_OK) {
std::cerr << "Failed to create a station, abort connection." << std::endl;
return status;
}
std::cout << "Create a station " << s_name << " at " << stat_id << std::endl;
return ET_OK;
}
// Open ET from configuration
int32_t OpenET(const char *et_file, et_wrap::OpenConfig conf)
{
if (IsETOpen()) {
std::cout << "ET system is already opened, close it before re-open" << std::endl;
return ID_NULL;
}
char fname[256];
strncpy(fname, et_file, 256);
return et_open(&et_id, fname, conf.configure().get());
}
// Close ET
int32_t CloseET()
{
if (IsETOpen()) {
et_forcedclose(et_id);
et_id = nullptr;
return 1;
}
return 0;
}
// create a station from configuration
int32_t CreateStation(const char *station_name, et_wrap::StationConfig conf)
{
if (!IsETOpen()) {
std::cout << "ET System is not opened, abort creating a station." << std::endl;
return ID_NULL;
} else if (stat_id != ID_NULL) {
std::cout << "A station has alreadly been created, remove it before re-create a station." << std::endl;
return ID_NULL;
}
char sname[256];
strncpy(sname, station_name, 256);
return et_station_create(et_id, &stat_id, sname, conf.configure().get());
}
// remove the station from et system
int32_t RemoveStation()
{
if (IsETOpen() && (stat_id != ID_NULL)) {
auto status = et_station_remove(et_id, stat_id);
if (status == ET_OK) {
stat_id = ID_NULL;
return status;
}
}
return ID_NULL;
}
// Open ET channel (Attach to the station)
int32_t Open()
{
if (Attach(stat_id) == ET_OK) {
return ET_OK;
}
return READ_ERROR;
}
// Close ET channel (Detach from the station)
int32_t Close()
{
if (Detach(stat_id) == ET_OK) {
return ET_OK;
}
return READ_ERROR;
}
// Attach to a station with station id
int32_t Attach(et_stat_id sid)
{
if (!IsETOpen()) {
std::cout << "ET System is not opened, abort attaching to a station." << std::endl;
return ID_NULL;
}
return et_station_attach(et_id, sid, &att_id);
}
// Detach the station
int32_t Detach(et_stat_id sid)
{
if (IsOpen()) {
auto status = et_station_detach(et_id, att_id);
if (status == ET_OK) {
att_id = ID_NULL;
}
return status;
}
return ID_NULL;
}
// helper function
template<class Func, class... Args>
inline bool ev_filter(Func beg, Func end, Args&&... args)
{
for (auto it = beg; it != end; ++it) {
if (!(*it)(args...)) return false;
}
return true;
}
// copy event to the buffer
bool copyEvent(et_event **pe, int nread)
{
if (nread <= 0) {
return false;
}
void *data;
size_t len, bytes = sizeof(uint32_t);
int endian, swap;
for (int i = 0; i < nread; ++i) {
// get event data and attributes from ET
et_event_getdata(pe[i], &data);
et_event_getlength(pe[i], &len);
et_event_getendian(pe[i], &endian);
et_event_needtoswap(pe[i], &swap);
// size of the buffer
len = len/bytes + ((len % bytes) ? 1 : 0);
// check evio header
if (len < 2) {
continue;
}
auto header = static_cast<CodaEvHeader*>(data);
// coda end event
if (header->etype == CODA_END) {
coda_end = true;
}
if (!ev_filter(filters.begin(), filters.end(), header)) {
continue;
}
// good event, copy it!
std::vector<uint32_t> event;
event.resize(len);
auto dbuf = static_cast<uint32_t*>(data);
for (size_t j = 0; j < len; ++j) {
event[j] = (swap) ? ET_SWAP32(dbuf[j]) : dbuf[j];
}
events.emplace_back(event);
}
return !events.empty();
}
// read event from et channel
int32_t ReadEvent()
{
int32_t read = READ_OK;
// got some events from file, start to read them
if ((idx == std::numeric_limits<size_t>::max()) && events.size()) {
idx = 0;
return read;
// still have events in the buffer, no need to read ET
} else if (++idx < events.size()) {
return read;
}
// no events in the buffer, initialize
idx = 0;
events.clear();
if (coda_end && end_with_coda) {
coda_end = false;
std::cout << "Received CODA END event, stop reading." << std::endl;
return READ_EOF;
}
// read some events from ET
if (!IsOpen()) {
std::cout << "Did not connect to an ET system, abort reading." << std::endl;
return READ_ERROR;
}
uint32_t ntries = 0;
std::chrono::time_point<std::chrono::system_clock> next = std::chrono::system_clock::now() + interval;
int nread = 0;
et_event *pe[chunk];
// try chunk reading
while (ntries++ < max_tries) {
auto status = et_events_get(et_id, att_id, pe, ET_ASYNC, nullptr, chunk, &nread);
switch (status) {
// copy and put them back
case ET_OK:
// copy event to buffer
if (!copyEvent(pe, nread)) {
read = READ_EMPTY;
}
// put back the event
if (et_events_put(et_id, att_id, pe, nread) != ET_OK) {
std::cout << "Failed to put back et_event to ET system, abort monitoring." << std::endl;
read = READ_EOF;
}
return read;
// errors that we can tolerate
case ET_ERROR_BUSY:
case ET_ERROR_TIMEOUT:
case ET_ERROR_WAKEUP:
std::cout << et_wrap::get_error_str(status) << std::endl;
case ET_ERROR_EMPTY:
break;
// fatal errors
default:
std::cout << et_wrap::get_error_str(status) << std::endl;
return READ_ERROR;
}
std::this_thread::sleep_until(next);
next += interval;
}
std::cout << "Tried " << max_tries << " times and no event obtained from ET." << std::endl;
return READ_EMPTY;
}
// get the pointer to the current event buffer
const uint32_t *GetEvBuffer() const
{
if (idx < events.size()) {
return &events[idx][0];
}
return nullptr;
}
// get the buffer length of the current event
uint32_t GetEvLength() const
{
if (idx < events.size()) {
return events[idx].size();
}
return 0;
}
};
#ifndef ET_CONFIG_WRAPPER_H
#define ET_CONFIG_WRAPPER_H
#include <memory>
#include <string>
#include <iostream>
#include <unordered_set>
#include "et.h"
#define ET_VERSION 16
#define SET_BIT(n, i) ( (n) |= (1ULL << i) )
#define CLEAR_BIT(n, i) ( (n) &= ~(1ULL << i) )
#define TEST_BIT(n, i) ( (bool)( n & (1ULL << i) ) )
#define ETCONF_ADD_MEMBER(type, var, flag) \
public:\
void set_##var(type val) { var = val; SET_BIT(flag, static_cast<uint32_t>(Flag::var)); } \
type get_##var() const { return var; } \
private:\
type var;
namespace et_wrap {
static std::string get_error_str(int error)
{
switch(error) {
case ET_ERROR: return "General error.";
case ET_ERROR_TOOMANY: return "Too many somethings (stations, attachments, temp events, ET system responses) exist.";
case ET_ERROR_EXISTS: return "ET system file or station already exists.";
case ET_ERROR_WAKEUP: return "Sleeping routine woken up by et_wakeup_attachment() or et_wakeup_all().";
case ET_ERROR_TIMEOUT: return "Timed out.";
case ET_ERROR_EMPTY: return "No events available in async mode.";
case ET_ERROR_BUSY: return "Resource is busy.";
case ET_ERROR_DEAD: return "ET system is dead.";
case ET_ERROR_READ: return "Network read error.";
case ET_ERROR_WRITE: return "Network write error,";
case ET_ERROR_REMOTE: return "Cannot allocate memory in remote client.";
#if ET_VERSION >= 14
case ET_ERROR_TOOBIG: return "Client is 32 bits & server is 64 (or vice versa) and event is too big for one.";
case ET_ERROR_NOMEM: return "Cannot allocate memory.";
case ET_ERROR_BADARG: return "Bad argument given to function.";
case ET_ERROR_SOCKET: return "Socket option could not be set.";
case ET_ERROR_NETWORK: return "Host name or address could not be resolved, or cannot connect.";
case ET_ERROR_CLOSED: return "ET system has been closed by client.";
case ET_ERROR_JAVASYS: return "C code trying to open Java-based ET system file locally.";
#endif
default: break;
}
return "Unknown error";
}
class OpenConfig
{
#define OPEN_CONFIG_SET(flag, ptr, var) \
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::var))) { et_open_config_set##var(ptr, get_##var()); }
public:
enum class Flag : uint32_t {
wait = 0,
cast,
TTL,
mode,
debugdefault,
port,
serverport,
policy,
tcp,
timeout,
host,
interface
};
struct tcp_setting {
int rbuf_size, sbuf_size, no_delay;
};
public:
// initialize
OpenConfig() { flag = 0; }
// set configuration and return a smart pointer
std::shared_ptr<void> configure() const
{
void *ptr;
et_open_config_init(&ptr);
OPEN_CONFIG_SET(flag, ptr, wait);
OPEN_CONFIG_SET(flag, ptr, cast);
OPEN_CONFIG_SET(flag, ptr, mode);
OPEN_CONFIG_SET(flag, ptr, port);
OPEN_CONFIG_SET(flag, ptr, serverport);
OPEN_CONFIG_SET(flag, ptr, timeout);
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::host))) {
char temp[1024];
strncpy(temp, host.c_str(), 1024);
et_open_config_sethost(ptr, temp);
}
#if ET_VERSION >= 14
OPEN_CONFIG_SET(flag, ptr, debugdefault);
OPEN_CONFIG_SET(flag, ptr, TTL);
OPEN_CONFIG_SET(flag, ptr, policy);
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::interface))) {
et_open_config_setinterface(ptr, interface.c_str());
}
for (auto &c : multi_casts) {
et_open_config_addmulticast(ptr, c.c_str());
}
for (auto &c : broad_casts) {
et_open_config_addbroadcast(ptr, c.c_str());
}
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::tcp))) {
et_open_config_settcp(ptr, tcp.rbuf_size, tcp.sbuf_size, tcp.no_delay);
}
#endif
return std::shared_ptr<void>(ptr, [] (void *p) { et_open_config_destroy(p); });
}
std::unordered_set<std::string> broad_casts, multi_casts;
private:
uint32_t flag;
ETCONF_ADD_MEMBER(int, wait, flag);
ETCONF_ADD_MEMBER(int, cast, flag);
ETCONF_ADD_MEMBER(int, TTL, flag);
ETCONF_ADD_MEMBER(int, mode, flag);
ETCONF_ADD_MEMBER(int, debugdefault, flag);
ETCONF_ADD_MEMBER(int, port, flag);
ETCONF_ADD_MEMBER(int, serverport, flag);
ETCONF_ADD_MEMBER(int, policy, flag);
ETCONF_ADD_MEMBER(struct tcp_setting, tcp, flag);
ETCONF_ADD_MEMBER(struct timespec, timeout, flag);
ETCONF_ADD_MEMBER(std::string, host, flag);
ETCONF_ADD_MEMBER(std::string, interface, flag);
};
class StationConfig
{
#define STATION_CONFIG_SET(flag, ptr, var) \
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::var))) { et_station_config_set##var(ptr, get_##var()); }
public:
enum class Flag : uint32_t {
block = 0,
flow,
select,
user,
restore,
cue,
prescale,
selectwords,
function,
lib,
myclass,
};
public:
// initialize
StationConfig() { flag = 0; }
// set configuration and return a smart pointer
std::shared_ptr<void> configure() const
{
void *ptr;
et_station_config_init(&ptr);
STATION_CONFIG_SET(flag, ptr, block);
STATION_CONFIG_SET(flag, ptr, select);
STATION_CONFIG_SET(flag, ptr, user);
STATION_CONFIG_SET(flag, ptr, restore);
STATION_CONFIG_SET(flag, ptr, cue);
STATION_CONFIG_SET(flag, ptr, prescale);
if (!selectwords.empty() && TEST_BIT(flag, static_cast<uint32_t>(Flag::selectwords))) {
// copy a vector to maintain the const behavior
auto words = selectwords;
et_station_config_setselectwords(ptr, &words[0]);
}
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::function))) {
char temp[1024];
strncpy(temp, function.c_str(), 1024);
if (et_station_config_setfunction(ptr, temp) != ET_OK) {
std::cerr << "Could not set function \"" << function << "\" for station config." << std::endl;
}
}
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::lib))) {
char temp[1024];
strncpy(temp, lib.c_str(), 1024);
if (et_station_config_setlib(ptr, temp) != ET_OK) {
std::cerr << "Could not set library \"" << lib << "\" for station config." << std::endl;
}
}
#if ET_VERSION >= 14
STATION_CONFIG_SET(flag, ptr, flow);
if (TEST_BIT(flag, static_cast<uint32_t>(Flag::myclass))) {
if (et_station_config_setclass(ptr, myclass.c_str()) != ET_OK) {
std::cerr << "Could not set class \"" << myclass << "\" for station config." << std::endl;
}
}
#endif
return std::shared_ptr<void>(ptr, [] (void *p) { et_station_config_destroy(p); });
}
std::unordered_set<std::string> broad_casts, multi_casts;
private:
uint32_t flag;
ETCONF_ADD_MEMBER(int, block, flag);
ETCONF_ADD_MEMBER(int, flow, flag);
ETCONF_ADD_MEMBER(int, select, flag);
ETCONF_ADD_MEMBER(int, user, flag);
ETCONF_ADD_MEMBER(int, restore, flag);
ETCONF_ADD_MEMBER(int, cue, flag);
ETCONF_ADD_MEMBER(int, prescale, flag);
ETCONF_ADD_MEMBER(std::vector<int>, selectwords, flag);
ETCONF_ADD_MEMBER(std::string, function, flag);
ETCONF_ADD_MEMBER(std::string, lib, flag);
ETCONF_ADD_MEMBER(std::string, myclass, flag);
};
}; // namespace et_wrapper
#endif // ET_CONFIG_WRAPPER_H
#include <iostream>
#include <iomanip>
#include <fstream>
#include <csignal>
#include "utils.h"
#include "ConfigParser.h"
#include "ConfigObject.h"
#include "ConfigOption.h"
#include "THaCodaFile.h"
#include "THaEvData.h"
#include "CodaDecoder.h"
#include "evio.h"
#include "TSpectrum.h"
#include "TTree.h"
#include "TFile.h"
#include "TH1.h"
#include "simpleLib.h"
#include "Fadc250Decoder.h"
#include "ETChannel.h"
#define FADC_BANK 3
#define PROGRESS_COUNT 100
volatile sig_atomic_t sig_caught = 0;
void handle_sig(int signum)
{
/* in case we registered this handler for multiple signals */
if (signum == SIGINT) {
sig_caught = 1;
}
if (signum == SIGTERM) {
sig_caught = 2;
}
if (signum == SIGABRT) {
sig_caught = 3;
}
}
void write_raw_data(const std::string &addr, int port, const std::string &etfile,
const std::string &opath, int nev, const std::vector<Module> &modules);
bool parseEvent(const uint32_t *buf, bool verbose);
uint32_t parseBlock(const uint32_t *buf);
uint32_t swap_endian32(uint32_t num)
{
uint32_t b0 = (num & 0x000000ff) << 24u;
uint32_t b1 = (num & 0x0000ff00) << 8u;
uint32_t b2 = (num & 0x00ff0000) >> 8u;
uint32_t b3 = (num & 0xff000000) >> 24u;
return b0 | b1 | b2 | b3;
}
// parse an evio event
bool parseEvent(const uint32_t *buf, bool verbose = false)
{
auto header = (ETChannel::CodaEvHeader*)buf;
const uint32_t buf_size = header->length + 1;
if (verbose) {
std::cout << "Event header: " << std::dec << buf_size << "\n"
<< (int) header->etype << ", " << (int) header->dtype << ", " << (int) header->num
<< std::endl;
std::cout << std::hex;
for (uint32_t i = 0; i < 2; ++i) {
std::cout << "0x" << std::setw(8) << std::setfill('0') << buf[i] << "\n";
}
}
switch(header->etype) {
case ETChannel::CODA_PHY1:
case ETChannel::CODA_PHY2:
break;
case ETChannel::CODA_PRST:
case ETChannel::CODA_GO:
case ETChannel::CODA_END:
default:
return false;
}
simpleScan((volatile uint32_t*)buf, buf_size);
return true;
/*
// parse ROC data
uint32_t index = 2;
while(index < buf_size) {
// skip header size and data size 2 + (length - 1)
index += parseBlock(&buf[index]);
}
// return parsed event type
return buf_size;
*/
}
uint32_t parseBlock(const uint32_t *buf)
{
auto header = (ETChannel::CodaEvHeader*) buf;
const uint32_t buf_size = header->length + 1;
std::cout << "ROC header: " << std::dec << buf_size << "\n"
<< (int) header->etype << ", " << (int) header->dtype << ", " << (int) header->num
<< std::endl;
std::cout << std::hex;
for (uint32_t i = 0; i < buf_size; ++i) {
std::cout << "0x" << std::setw(8) << std::setfill('0') << buf[i] << "\n";
}
return buf_size;
}
int main(int argc, char* argv[])
{
// setup input arguments
ConfigOption conf_opt;
conf_opt.AddOpts(ConfigOption::help_message, 'h', "help");
conf_opt.AddOpt(ConfigOption::arg_require, 'n');
conf_opt.AddOpt(ConfigOption::arg_require, 'p');
conf_opt.AddOpt(ConfigOption::arg_require, 'f');
conf_opt.AddLongOpt(ConfigOption::arg_require, "config-module", 'm');
conf_opt.AddLongOpt(ConfigOption::arg_require, "host", 'o');
conf_opt.SetDesc("usage: %0 <data_file> <out_file>");
conf_opt.SetDesc('n', "number of events to process (< 0 means all).");
conf_opt.SetDesc('m', "configuration file for modules to be read-in, default \"config/esb_module.conf\".");
conf_opt.SetDesc('p', "port to et system, default 11111.");
conf_opt.SetDesc('f', "path to et file, default \"/tmp/et_sys\"");
conf_opt.SetDesc('o', "host address of et syste, default localhost");
if (!conf_opt.ParseArgs(argc, argv) || conf_opt.NbofArgs() != 1) {
std::cout << conf_opt.GetInstruction() << std::endl;
return -1;
}
int nev = -1;
std::string mconf = "config/esb_module.conf";
std::string host = "localhost";
int port = 11111;
std::string etfile = "/tmp/et_sys";
for (auto &opt : conf_opt.GetOptions()) {
switch (opt.mark) {
case 'n':
nev = opt.var.Int();
break;
case 'm':
mconf = opt.var.String();
break;
case 'o':
host = opt.var.String();
break;
case 'p':
port = opt.var.Int();
break;
case 'f':
etfile = opt.var.String();
break;
default :
std::cout << conf_opt.GetInstruction() << std::endl;
return -1;
}
}
auto modules = read_modules(mconf);
write_raw_data(host, port, etfile, conf_opt.GetArgument(0), nev, modules);
}
#define MAX_NPEAKS 20
#define MAX_RAW 300
struct BranchData
{
int npul, nraw;
float integral[MAX_NPEAKS], peak[MAX_NPEAKS], time[MAX_NPEAKS];
int raw[MAX_RAW];
float ped_mean, ped_err;
};
#define BUF_SIZE 1000
static double buffer[BUF_SIZE], wfbuf[BUF_SIZE], bkbuf[BUF_SIZE];
void refine_pedestal(const std::vector<uint32_t> &raw, float &ped_mean, float &ped_err)
{
int count = 0;
float mean = 0.;
for (auto &val : raw) {
if (std::abs(val - ped_mean) < 2.0 * ped_err) {
buffer[count] = val;
mean += val;
count ++;
}
}
if (count == 0) {
return;
}
mean /= count;
float change = std::abs(ped_mean - mean);
// no outliers
if (change < 0.05 * ped_err) {
return;
}
// recursively refine
float err = 0.;
for (int i = 0; i < count; ++i) {
err += (buffer[i] - mean)*(buffer[i] - mean);
}
ped_mean = mean;
ped_err = std::sqrt(err/count);
refine_pedestal(raw, ped_mean, ped_err);
}
// a function for a simple constant baseline fit
void find_pedestal(const std::vector<uint32_t> &raw, float &ped_mean, float &ped_err)
{
ped_mean = 0;
ped_err = 0;
for (auto &val : raw) {
ped_mean += val;
}
ped_mean /= raw.size();
for (auto &val : raw) {
ped_err += (val - ped_mean)*(val - ped_mean);
}
ped_err = std::sqrt(ped_err/raw.size());
refine_pedestal(raw, ped_mean, ped_err);
}
// analyze the waveform data and fill the result in a branch data
void waveform_analysis(const std::vector<uint32_t> &raw, BranchData &res)
{
// no need to analyze
if (raw.empty()) {
return;
}
// fill in the raw data
res.nraw = raw.size();
for (size_t i = 0; i < raw.size(); ++i) {
res.raw[i] = raw[i];
}
// get pedestals
find_pedestal(raw, res.ped_mean, res.ped_err);
// fill in spectrum buffer
for (size_t i = 0; i < raw.size(); ++i) {
wfbuf[i] = raw[i] - res.ped_mean;
}
// find peaks
TSpectrum s;
s.SetResolution(0.5);
int npeaks = s.SearchHighRes(wfbuf, bkbuf, res.nraw, 3.0, 20, false, 5, true, 3);
// fill branch data
double *pos = s.GetPositionX();
res.npul = 0;
for (int i = 0; i < npeaks; ++i) {
int j = pos[i];
if (wfbuf[j] < 5.*res.ped_err) { continue; }
res.time[res.npul] = j;
res.peak[res.npul] = wfbuf[j];
/*
res.integral[res.npul] = wfbuf[j];
j = pos[i] - 1;
while ( (j > 0) && (wfbuf[j] - wfbuf[j - 1])*wfbuf[j] > 0. ) {
res.integral[res.npul] += wfbuf[j--];
}
j = pos[i] + 1;
while ( (j < res.nraw - 1) && (wfbuf[j] - wfbuf[j + 1])*wfbuf[j] > 0. ) {
res.integral[res.npul] += wfbuf[j++];
}
*/
res.npul ++;
}
}
// fill decoded FADC250 data into the container (branch data)
void fill_branch(const Fadc250Data &slot_data, const Module &mod, std::unordered_map<std::string, BranchData> &brdata)
{
for (auto &event : slot_data.events) {
for (auto &ch : mod.channels) {
auto &channel = event.channels[ch.id];
auto &bd = brdata[ch.name];
bd.npul = channel.integral.size();
for (uint32_t i = 0; i < channel.integral.size(); ++i) {
bd.integral[i] = channel.integral[i];
bd.time[i] = channel.time[i];
}
waveform_analysis(channel.raw, bd);
}
}
}
// fill branch data into the root tree
void fill_tree(TTree *tree, std::unordered_map<std::string, BranchData> &brdata, bool &init, int mode)
{
if ((mode != 1) && (mode != 3)) {
std::cout << "Warning: unsupported mode " << mode << ", data won't be recorded." << std::endl;
return;
}
// initialize
if (!init) {
for (auto &it : brdata) {
auto n = it.first;
tree->Branch((n + "_Npulse").c_str(), &brdata[n].npul, (n + "_N/I").c_str());
tree->Branch((n + "_Pint").c_str(), &brdata[n].integral[0], (n + "_Pint[" + n + "_N]/F").c_str());
tree->Branch((n + "_Ptime").c_str(), &brdata[n].time[0], (n + "_Ptime[" + n + "_N]/F").c_str());
// raw waveform provides more information
if (mode == 1) {
tree->Branch((n + "_Ppeak").c_str(), &brdata[n].peak[0], (n + "_Ppeak[" + n + "_N]/F").c_str());
tree->Branch((n + "_Nraw").c_str(), &brdata[n].nraw, (n + "_Nraw/I").c_str());
tree->Branch((n + "_raw").c_str(), &brdata[n].raw[0], (n + "_raw[" + n + "_Nraw]/I").c_str());
tree->Branch((n + "_ped_mean").c_str(), &brdata[n].ped_mean, (n + "_ped_mean/F").c_str());
tree->Branch((n + "_ped_err").c_str(), &brdata[n].ped_err, (n + "_ped_err/F").c_str());
}
}
init = true;
std::cout << "Initialized root file for mode " << mode << " data." << std::endl;
}
tree->Fill();
}
void processEvent(const uint32_t *buf, int &count, TTree *tree, bool &init_tree, int &data_mode,
const std::vector<Module> &modules, std::unordered_map<std::string, BranchData> &brdata)
{
if (!parseEvent(buf)) {
return;
}
// get block level
int blvl = 1;
simpleGetRocBlockLevel(modules.front().crate, FADC_BANK, &blvl);
for (int ii = 0; ii < blvl; ++ii) {
// clear data buffer
for (auto &br : brdata) {
br.second.npul = 0;
br.second.nraw = 0;
}
// parse module data
for (auto &mod : modules) {
uint32_t header = 0;
auto status = simpleGetSlotBlockHeader(mod.crate, FADC_BANK, mod.slot, &header);
if (status <= 0) {
std::cout << "Error getting header for crate = "
<< mod.crate << ", slot = " << mod.slot << std::endl;
continue;
}
uint32_t *dbuf;
auto len = simpleGetSlotEventData(mod.crate, FADC_BANK, mod.slot, ii, &dbuf);
if (len <= 0) {
std::cout << "No data for crate = " << mod.crate << ", slot = " << mod.slot
<< ", block_level = " << ii << std::endl;
continue;
}
auto slot_data = fadc250_decoder(header, dbuf, len);
// check data mode
if (data_mode < 0 && slot_data.GetMode() > 0) {
data_mode = slot_data.GetMode();
}
// fill branch data
fill_branch(slot_data, mod, brdata);
}
// fill event
fill_tree(tree, brdata, init_tree, data_mode);
count ++;
}
}
void write_raw_data(const std::string &addr, int port, const std::string &etfile,
const std::string &opath, int nev, const std::vector<Module> &modules)
{
ETChannel et_chan(100, 100);
if (et_chan.Connect(addr.c_str(), port, etfile.c_str(), "TCD_MONITOR") != ET_OK) {
return;
}
auto *hfile = new TFile(opath.c_str(), "RECREATE", "MAPMT test results");
auto tree = new TTree("EvTree", "Cherenkov Test Events");
// set up branches and a container to be combined with the branches
std::unordered_map<std::string, BranchData> brdata;
for (auto &mod : modules) {
for (auto &ch : mod.channels) {
auto n = ch.name;
if (brdata.find(n) != brdata.end()) {
std::cout << "WARNING: Duplicated channel names found! -- " << n << std::endl;
}
brdata[n] = BranchData();
}
}
int count = 0;
bool init_tree = false;
int data_mode = -1;
et_chan.Open();
int status = et_chan.ReadEvent();
signal (SIGINT, handle_sig);
while (status != ETChannel::READ_ERROR && status != ETChannel::READ_EOF) {
if (sig_caught || (nev > 0 && nev < count)) { break; }
// read-in data
if (status == ETChannel::READ_EMPTY) {
std::cout << "ET station is empty, wating 5 secs..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
status = et_chan.ReadEvent();
continue;
}
if((count % PROGRESS_COUNT) == 0) {
std::cout << "Read and processed events - " << count << std::endl;
}
processEvent(et_chan.GetEvBuffer(), count, tree, init_tree, data_mode, modules, brdata);
status = et_chan.ReadEvent();
}
std::cout << "Read and processed events - " << count << std::endl;
hfile->Write();
hfile->Close();
}
File moved
#include "ConfigOption.h"
#include "ETConfigWrapper.h"
#include "et.h"
#include "THaCodaFile.h"
#include <csignal>
#include <thread>
#include <chrono>
#include <iostream>
#define PROGRESS_COUNT 100
using namespace std::chrono;
volatile std::sig_atomic_t gSignalStatus;
void signal_handler(int signal) {
gSignalStatus = signal;
}
int main(int argc, char* argv[]) try
{
// setup input arguments
ConfigOption conf_opt;
conf_opt.AddLongOpt(ConfigOption::help_message, "help", 'a');
conf_opt.AddOpt(ConfigOption::arg_require, 'h');
conf_opt.AddOpt(ConfigOption::arg_require, 'p');
conf_opt.AddOpt(ConfigOption::arg_require, 'f');
conf_opt.AddOpt(ConfigOption::arg_require, 'i');
conf_opt.SetDesc("usage: %0 <data_file>");
conf_opt.SetDesc('h', "host address of the ET system, default \"localhost\".");
conf_opt.SetDesc('p', "port to connect, default 11111.");
conf_opt.SetDesc('f', "memory mapped et file, default \"/tmp/et_feeder\".");
conf_opt.SetDesc('i', "interval in milliseconds to write data, default \"10\".");
conf_opt.SetDesc('a', "help message.");
if (!conf_opt.ParseArgs(argc, argv) || conf_opt.NbofArgs() != 1) {
std::cout << conf_opt.GetInstruction() << std::endl;
return -1;
}
std::string host = "localhost";
int port = 11111;
std::string etf = "/tmp/et_feeder";
int interval = 10;
for (auto &opt : conf_opt.GetOptions()) {
switch (opt.mark) {
case 'h':
host = opt.var.String();
break;
case 'p':
port = opt.var.Int();
break;
case 'f':
etf = opt.var.String();
break;
case 'i':
interval = opt.var.Int();
break;
default :
std::cout << conf_opt.GetInstruction() << std::endl;
return -1;
}
}
et_sys_id et_id;
et_att_id att_id;
// open ET system
et_wrap::OpenConfig conf;
conf.set_cast(ET_DIRECT);
conf.set_host(host.c_str());
conf.set_serverport(port);
char fname[256];
strncpy(fname, etf.c_str(), 256);
auto status = et_open(&et_id, fname, conf.configure().get());
if (status != ET_OK) {
std::cerr << "Cannot open ET at " << etf << std::endl;
return -1;
}
// attach to GRAND CENTRAL
status = et_station_attach(et_id, ET_GRANDCENTRAL, &att_id);
if (status != ET_OK) {
std::cerr << "Failed to attach to the ET Grand Central Station." << std::endl;
return -1;
}
// evio file reader
Decoder::THaCodaFile file;
file.codaOpen(conf_opt.GetArgument(0).c_str());
if (!file.isOpen()) {
std::cerr << "Failed to open coda file \"" << conf_opt.GetArgument(0) << "\"." << std::endl;
return -1;
}
// install signal handler
std::signal(SIGINT, signal_handler);
int count = 0;
et_event *ev;
while ((file.codaRead() == CODA_OK) && et_alive(et_id)) {
if (gSignalStatus == SIGINT) {
std::cout << "Received control-C, exiting..." << std::endl;
break;
}
system_clock::time_point start(system_clock::now());
system_clock::time_point next(start + std::chrono::milliseconds(interval));
if (++count % PROGRESS_COUNT == 0) {
std::cout << "Read and feed " << count << " events to ET, rate is 1 event per "
<< interval << " ms.\r" << std::flush;
}
uint32_t *buff = static_cast<uint32_t*>(file.getEvBuffer());
size_t nbytes = (buff[0] + 1)*sizeof(uint32_t);
status = et_event_new(et_id, att_id, &ev, ET_SLEEP, nullptr, nbytes);
if (status != ET_OK) {
std::cerr << "Failed to add new event to the ET system." << std::endl;
return -1;
}
// build et event
void *data;
et_event_getdata(ev, &data);
memcpy((void *) data, (const void *)buff, nbytes);
et_event_setlength(ev, nbytes);
// put back the event
status = et_event_put(et_id, att_id, ev);
if (status != ET_OK) {
std::cerr << "Failed to put event back to the ET system." << std::endl;
return -1;
}
std::this_thread::sleep_until(next);
}
std::cout << "Read and feed " << count << " events to ET, rate is 1 event per "
<< interval << " ms." << std::endl;
file.codaClose();
return 0;
} catch (...) {
std::cerr << "?unknown exception" << std::endl;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment