diff --git a/include/ETChannel.h b/include/ETChannel.h new file mode 100644 index 0000000000000000000000000000000000000000..f06110c9ffca6232168261b2aa3840d0fe99980e --- /dev/null +++ b/include/ETChannel.h @@ -0,0 +1,357 @@ +/////////////////////////////////////////////////////////////////////////////// +// Class ETChannel // +// Read event from ET instead of CODA file // +// // +// Developer: // +// Chao Peng // +// 11/22/2019 // +/////////////////////////////////////////////////////////////////////////////// + +#include "ETConfigWrapper.h" +#include <iostream> +#include <chrono> +#include <string> +#include <thread> +#include <limits> +#include <set> + + +#define ID_NULL -9999 + +class ETChannel +{ +public: + /* 32 bit event header structure + * -------------------------------- + * | length | + * -------------------------------- + * | etype | dtype | num | + * -------------------------------- + */ + struct CodaEvHeader + { + uint32_t length; + uint8_t num, dtype; + uint16_t etype; + }; + + enum StatusCode { + READ_ERROR = -1, + READ_OK = 0, + READ_EOF = 1, + READ_EMPTY = 2, + READ_WARNING = 3, + }; + + enum EvType { + CODA_PRST = 0xffd1, + CODA_GO = 0xffd2, + CODA_END = 0xffd4, + CODA_PHY1 = 0xff50, + CODA_PHY2 = 0xff70, + }; + +private: + std::chrono::milliseconds interval; + uint32_t max_tries; + bool coda_end, end_with_coda; + et_sys_id et_id; + et_stat_id stat_id; + et_att_id att_id; + int chunk; + size_t idx; + std::vector<std::vector<uint32_t>> events; + std::vector<std::function<bool(CodaEvHeader *)>> filters; + + +public: + ETChannel(int32_t itvl = 1000, int32_t mtries = 10, bool endwcoda = true) + : interval(std::chrono::milliseconds(itvl)), max_tries(mtries), coda_end(false), + end_with_coda(endwcoda), et_id(nullptr), stat_id(ID_NULL), att_id(ID_NULL), chunk(200), idx(0) + { + // place holder + } + + bool IsOpen() const { return IsETOpen() && (att_id != ID_NULL); } + bool IsETOpen() const { return (et_id != nullptr) && et_alive(et_id); } + void AddEvFilter(std::function<bool(CodaEvHeader *)> &&func) { filters.emplace_back(func); } + + // Connect a ET system and create a monitor station with pre-settings + int32_t Connect(const char *ip_addr, int port, const char *et_file, const char *s_name, + int chunk_size = 200, int prescale = 1) + { + // open et system + et_wrap::OpenConfig conf; + // use a direct connection to the ET system + conf.set_cast(ET_DIRECT); + conf.set_host(ip_addr); + conf.set_serverport(port); + + auto status = OpenET(et_file, conf); + if (status < ET_OK) { + std::cerr << "Failed to open ET system, abort connection." << std::endl; + return status; + } + std::cout << "Connect to ET system at " << ip_addr << ":" << port << std::endl; + + // create a station + et_wrap::StationConfig sconf; + sconf.set_user(ET_STATION_USER_MULTI); + sconf.set_restore(ET_STATION_RESTORE_OUT); + sconf.set_prescale(prescale); + // maximum events + chunk = chunk_size; + sconf.set_cue(chunk); + sconf.set_select(ET_STATION_SELECT_ALL); + sconf.set_block(ET_STATION_NONBLOCKING); + + status = CreateStation(s_name, sconf); + if (status < ET_OK) { + std::cerr << "Failed to create a station, abort connection." << std::endl; + return status; + } + std::cout << "Create a station " << s_name << " at " << stat_id << std::endl; + + return ET_OK; + } + + // Open ET from configuration + int32_t OpenET(const char *et_file, et_wrap::OpenConfig conf) + { + if (IsETOpen()) { + std::cout << "ET system is already opened, close it before re-open" << std::endl; + return ID_NULL; + } + + char fname[256]; + strncpy(fname, et_file, 256); + return et_open(&et_id, fname, conf.configure().get()); + } + + // Close ET + int32_t CloseET() + { + if (IsETOpen()) { + et_forcedclose(et_id); + et_id = nullptr; + return 1; + } + return 0; + } + + // create a station from configuration + int32_t CreateStation(const char *station_name, et_wrap::StationConfig conf) + { + if (!IsETOpen()) { + std::cout << "ET System is not opened, abort creating a station." << std::endl; + return ID_NULL; + } else if (stat_id != ID_NULL) { + std::cout << "A station has alreadly been created, remove it before re-create a station." << std::endl; + return ID_NULL; + } + + char sname[256]; + strncpy(sname, station_name, 256); + return et_station_create(et_id, &stat_id, sname, conf.configure().get()); + } + + // remove the station from et system + int32_t RemoveStation() + { + if (IsETOpen() && (stat_id != ID_NULL)) { + auto status = et_station_remove(et_id, stat_id); + if (status == ET_OK) { + stat_id = ID_NULL; + return status; + } + } + return ID_NULL; + } + + // Open ET channel (Attach to the station) + int32_t Open() + { + if (Attach(stat_id) == ET_OK) { + return ET_OK; + } + + return READ_ERROR; + } + + // Close ET channel (Detach from the station) + int32_t Close() + { + if (Detach(stat_id) == ET_OK) { + return ET_OK; + } + + return READ_ERROR; + } + + // Attach to a station with station id + int32_t Attach(et_stat_id sid) + { + if (!IsETOpen()) { + std::cout << "ET System is not opened, abort attaching to a station." << std::endl; + return ID_NULL; + } + return et_station_attach(et_id, sid, &att_id); + } + + // Detach the station + int32_t Detach(et_stat_id sid) + { + if (IsOpen()) { + auto status = et_station_detach(et_id, att_id); + if (status == ET_OK) { + att_id = ID_NULL; + } + return status; + } + return ID_NULL; + } + + + // helper function + template<class Func, class... Args> + inline bool ev_filter(Func beg, Func end, Args&&... args) + { + for (auto it = beg; it != end; ++it) { + if (!(*it)(args...)) return false; + } + return true; + } + + // copy event to the buffer + bool copyEvent(et_event **pe, int nread) + { + if (nread <= 0) { + return false; + } + + void *data; + size_t len, bytes = sizeof(uint32_t); + int endian, swap; + + for (int i = 0; i < nread; ++i) { + // get event data and attributes from ET + et_event_getdata(pe[i], &data); + et_event_getlength(pe[i], &len); + et_event_getendian(pe[i], &endian); + et_event_needtoswap(pe[i], &swap); + + // size of the buffer + len = len/bytes + ((len % bytes) ? 1 : 0); + + // check evio header + if (len < 2) { + continue; + } + auto header = static_cast<CodaEvHeader*>(data); + // coda end event + if (header->etype == CODA_END) { + coda_end = true; + } + if (!ev_filter(filters.begin(), filters.end(), header)) { + continue; + } + // good event, copy it! + std::vector<uint32_t> event; + event.resize(len); + auto dbuf = static_cast<uint32_t*>(data); + for (size_t j = 0; j < len; ++j) { + event[j] = (swap) ? ET_SWAP32(dbuf[j]) : dbuf[j]; + } + events.emplace_back(event); + } + + return !events.empty(); + } + + // read event from et channel + int32_t ReadEvent() + { + int32_t read = READ_OK; + // got some events from file, start to read them + if ((idx == std::numeric_limits<size_t>::max()) && events.size()) { + idx = 0; + return read; + // still have events in the buffer, no need to read ET + } else if (++idx < events.size()) { + return read; + } + // no events in the buffer, initialize + idx = 0; + events.clear(); + + if (coda_end && end_with_coda) { + coda_end = false; + std::cout << "Received CODA END event, stop reading." << std::endl; + return READ_EOF; + } + + // read some events from ET + if (!IsOpen()) { + std::cout << "Did not connect to an ET system, abort reading." << std::endl; + return READ_ERROR; + } + + uint32_t ntries = 0; + std::chrono::time_point<std::chrono::system_clock> next = std::chrono::system_clock::now() + interval; + int nread = 0; + et_event *pe[chunk]; + + // try chunk reading + while (ntries++ < max_tries) { + auto status = et_events_get(et_id, att_id, pe, ET_ASYNC, nullptr, chunk, &nread); + switch (status) { + // copy and put them back + case ET_OK: + // copy event to buffer + if (!copyEvent(pe, nread)) { + read = READ_EMPTY; + } + // put back the event + if (et_events_put(et_id, att_id, pe, nread) != ET_OK) { + std::cout << "Failed to put back et_event to ET system, abort monitoring." << std::endl; + read = READ_EOF; + } + return read; + // errors that we can tolerate + case ET_ERROR_BUSY: + case ET_ERROR_TIMEOUT: + case ET_ERROR_WAKEUP: + std::cout << et_wrap::get_error_str(status) << std::endl; + case ET_ERROR_EMPTY: + break; + // fatal errors + default: + std::cout << et_wrap::get_error_str(status) << std::endl; + return READ_ERROR; + } + std::this_thread::sleep_until(next); + next += interval; + } + + std::cout << "Tried " << max_tries << " times and no event obtained from ET." << std::endl; + return READ_EMPTY; + } + + // get the pointer to the current event buffer + const uint32_t *GetEvBuffer() const + { + if (idx < events.size()) { + return &events[idx][0]; + } + return nullptr; + } + + // get the buffer length of the current event + uint32_t GetEvLength() const + { + if (idx < events.size()) { + return events[idx].size(); + } + return 0; + } +}; diff --git a/include/ETConfigWrapper.h b/include/ETConfigWrapper.h new file mode 100644 index 0000000000000000000000000000000000000000..3ad02f828300f580586df5e664980b775bf5ce19 --- /dev/null +++ b/include/ETConfigWrapper.h @@ -0,0 +1,232 @@ +#ifndef ET_CONFIG_WRAPPER_H +#define ET_CONFIG_WRAPPER_H + +#include <memory> +#include <string> +#include <iostream> +#include <unordered_set> +#include "et.h" + +#define ET_VERSION 16 + +#define SET_BIT(n, i) ( (n) |= (1ULL << i) ) +#define CLEAR_BIT(n, i) ( (n) &= ~(1ULL << i) ) +#define TEST_BIT(n, i) ( (bool)( n & (1ULL << i) ) ) + +#define ETCONF_ADD_MEMBER(type, var, flag) \ + public:\ + void set_##var(type val) { var = val; SET_BIT(flag, static_cast<uint32_t>(Flag::var)); } \ + type get_##var() const { return var; } \ + private:\ + type var; + +namespace et_wrap { + +static std::string get_error_str(int error) +{ + switch(error) { + case ET_ERROR: return "General error."; + case ET_ERROR_TOOMANY: return "Too many somethings (stations, attachments, temp events, ET system responses) exist."; + case ET_ERROR_EXISTS: return "ET system file or station already exists."; + case ET_ERROR_WAKEUP: return "Sleeping routine woken up by et_wakeup_attachment() or et_wakeup_all()."; + case ET_ERROR_TIMEOUT: return "Timed out."; + case ET_ERROR_EMPTY: return "No events available in async mode."; + case ET_ERROR_BUSY: return "Resource is busy."; + case ET_ERROR_DEAD: return "ET system is dead."; + case ET_ERROR_READ: return "Network read error."; + case ET_ERROR_WRITE: return "Network write error,"; + case ET_ERROR_REMOTE: return "Cannot allocate memory in remote client."; +#if ET_VERSION >= 14 + case ET_ERROR_TOOBIG: return "Client is 32 bits & server is 64 (or vice versa) and event is too big for one."; + case ET_ERROR_NOMEM: return "Cannot allocate memory."; + case ET_ERROR_BADARG: return "Bad argument given to function."; + case ET_ERROR_SOCKET: return "Socket option could not be set."; + case ET_ERROR_NETWORK: return "Host name or address could not be resolved, or cannot connect."; + case ET_ERROR_CLOSED: return "ET system has been closed by client."; + case ET_ERROR_JAVASYS: return "C code trying to open Java-based ET system file locally."; +#endif + default: break; + } + return "Unknown error"; +} + +class OpenConfig +{ +#define OPEN_CONFIG_SET(flag, ptr, var) \ + if (TEST_BIT(flag, static_cast<uint32_t>(Flag::var))) { et_open_config_set##var(ptr, get_##var()); } + +public: + enum class Flag : uint32_t { + wait = 0, + cast, + TTL, + mode, + debugdefault, + port, + serverport, + policy, + tcp, + timeout, + host, + interface + }; + + struct tcp_setting { + int rbuf_size, sbuf_size, no_delay; + }; + +public: + // initialize + OpenConfig() { flag = 0; } + + // set configuration and return a smart pointer + std::shared_ptr<void> configure() const + { + void *ptr; + et_open_config_init(&ptr); + + OPEN_CONFIG_SET(flag, ptr, wait); + OPEN_CONFIG_SET(flag, ptr, cast); + OPEN_CONFIG_SET(flag, ptr, mode); + OPEN_CONFIG_SET(flag, ptr, port); + OPEN_CONFIG_SET(flag, ptr, serverport); + OPEN_CONFIG_SET(flag, ptr, timeout); + + if (TEST_BIT(flag, static_cast<uint32_t>(Flag::host))) { + char temp[1024]; + strncpy(temp, host.c_str(), 1024); + et_open_config_sethost(ptr, temp); + } + +#if ET_VERSION >= 14 + OPEN_CONFIG_SET(flag, ptr, debugdefault); + OPEN_CONFIG_SET(flag, ptr, TTL); + OPEN_CONFIG_SET(flag, ptr, policy); + + if (TEST_BIT(flag, static_cast<uint32_t>(Flag::interface))) { + et_open_config_setinterface(ptr, interface.c_str()); + } + for (auto &c : multi_casts) { + et_open_config_addmulticast(ptr, c.c_str()); + } + for (auto &c : broad_casts) { + et_open_config_addbroadcast(ptr, c.c_str()); + } + if (TEST_BIT(flag, static_cast<uint32_t>(Flag::tcp))) { + et_open_config_settcp(ptr, tcp.rbuf_size, tcp.sbuf_size, tcp.no_delay); + } +#endif + + return std::shared_ptr<void>(ptr, [] (void *p) { et_open_config_destroy(p); }); + } + + std::unordered_set<std::string> broad_casts, multi_casts; + +private: + uint32_t flag; + + ETCONF_ADD_MEMBER(int, wait, flag); + ETCONF_ADD_MEMBER(int, cast, flag); + ETCONF_ADD_MEMBER(int, TTL, flag); + ETCONF_ADD_MEMBER(int, mode, flag); + ETCONF_ADD_MEMBER(int, debugdefault, flag); + ETCONF_ADD_MEMBER(int, port, flag); + ETCONF_ADD_MEMBER(int, serverport, flag); + ETCONF_ADD_MEMBER(int, policy, flag); + ETCONF_ADD_MEMBER(struct tcp_setting, tcp, flag); + ETCONF_ADD_MEMBER(struct timespec, timeout, flag); + ETCONF_ADD_MEMBER(std::string, host, flag); + ETCONF_ADD_MEMBER(std::string, interface, flag); +}; + +class StationConfig +{ +#define STATION_CONFIG_SET(flag, ptr, var) \ + if (TEST_BIT(flag, static_cast<uint32_t>(Flag::var))) { et_station_config_set##var(ptr, get_##var()); } + +public: + enum class Flag : uint32_t { + block = 0, + flow, + select, + user, + restore, + cue, + prescale, + selectwords, + function, + lib, + myclass, + }; + +public: + // initialize + StationConfig() { flag = 0; } + + // set configuration and return a smart pointer + std::shared_ptr<void> configure() const + { + void *ptr; + et_station_config_init(&ptr); + + STATION_CONFIG_SET(flag, ptr, block); + STATION_CONFIG_SET(flag, ptr, select); + STATION_CONFIG_SET(flag, ptr, user); + STATION_CONFIG_SET(flag, ptr, restore); + STATION_CONFIG_SET(flag, ptr, cue); + STATION_CONFIG_SET(flag, ptr, prescale); + + if (!selectwords.empty() && TEST_BIT(flag, static_cast<uint32_t>(Flag::selectwords))) { + // copy a vector to maintain the const behavior + auto words = selectwords; + et_station_config_setselectwords(ptr, &words[0]); + } + + if (TEST_BIT(flag, static_cast<uint32_t>(Flag::function))) { + char temp[1024]; + strncpy(temp, function.c_str(), 1024); + if (et_station_config_setfunction(ptr, temp) != ET_OK) { + std::cerr << "Could not set function \"" << function << "\" for station config." << std::endl; + } + } + if (TEST_BIT(flag, static_cast<uint32_t>(Flag::lib))) { + char temp[1024]; + strncpy(temp, lib.c_str(), 1024); + if (et_station_config_setlib(ptr, temp) != ET_OK) { + std::cerr << "Could not set library \"" << lib << "\" for station config." << std::endl; + } + } + +#if ET_VERSION >= 14 + STATION_CONFIG_SET(flag, ptr, flow); + if (TEST_BIT(flag, static_cast<uint32_t>(Flag::myclass))) { + if (et_station_config_setclass(ptr, myclass.c_str()) != ET_OK) { + std::cerr << "Could not set class \"" << myclass << "\" for station config." << std::endl; + } + } +#endif + + return std::shared_ptr<void>(ptr, [] (void *p) { et_station_config_destroy(p); }); + } + + std::unordered_set<std::string> broad_casts, multi_casts; + +private: + uint32_t flag; + + ETCONF_ADD_MEMBER(int, block, flag); + ETCONF_ADD_MEMBER(int, flow, flag); + ETCONF_ADD_MEMBER(int, select, flag); + ETCONF_ADD_MEMBER(int, user, flag); + ETCONF_ADD_MEMBER(int, restore, flag); + ETCONF_ADD_MEMBER(int, cue, flag); + ETCONF_ADD_MEMBER(int, prescale, flag); + ETCONF_ADD_MEMBER(std::vector<int>, selectwords, flag); + ETCONF_ADD_MEMBER(std::string, function, flag); + ETCONF_ADD_MEMBER(std::string, lib, flag); + ETCONF_ADD_MEMBER(std::string, myclass, flag); +}; + +}; // namespace et_wrapper +#endif // ET_CONFIG_WRAPPER_H + diff --git a/src/esb_analyze_online.cpp b/src/esb_analyze_online.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9cafff5d092cdcf907b4c9c74102e416b33a32d0 --- /dev/null +++ b/src/esb_analyze_online.cpp @@ -0,0 +1,441 @@ +#include <iostream> +#include <iomanip> +#include <fstream> +#include <csignal> +#include "utils.h" +#include "ConfigParser.h" +#include "ConfigObject.h" +#include "ConfigOption.h" +#include "THaCodaFile.h" +#include "THaEvData.h" +#include "CodaDecoder.h" +#include "evio.h" +#include "TSpectrum.h" +#include "TTree.h" +#include "TFile.h" +#include "TH1.h" +#include "simpleLib.h" +#include "Fadc250Decoder.h" +#include "ETChannel.h" + + +#define FADC_BANK 3 +#define PROGRESS_COUNT 100 + + +volatile sig_atomic_t sig_caught = 0; + +void handle_sig(int signum) +{ + /* in case we registered this handler for multiple signals */ + if (signum == SIGINT) { + sig_caught = 1; + } + if (signum == SIGTERM) { + sig_caught = 2; + } + if (signum == SIGABRT) { + sig_caught = 3; + } +} + + + +void write_raw_data(const std::string &addr, int port, const std::string &etfile, + const std::string &opath, int nev, const std::vector<Module> &modules); +bool parseEvent(const uint32_t *buf, bool verbose); +uint32_t parseBlock(const uint32_t *buf); + + +uint32_t swap_endian32(uint32_t num) +{ + uint32_t b0 = (num & 0x000000ff) << 24u; + uint32_t b1 = (num & 0x0000ff00) << 8u; + uint32_t b2 = (num & 0x00ff0000) >> 8u; + uint32_t b3 = (num & 0xff000000) >> 24u; + return b0 | b1 | b2 | b3; +} + +// parse an evio event +bool parseEvent(const uint32_t *buf, bool verbose = false) +{ + auto header = (ETChannel::CodaEvHeader*)buf; + const uint32_t buf_size = header->length + 1; + if (verbose) { + std::cout << "Event header: " << std::dec << buf_size << "\n" + << (int) header->etype << ", " << (int) header->dtype << ", " << (int) header->num + << std::endl; + std::cout << std::hex; + for (uint32_t i = 0; i < 2; ++i) { + std::cout << "0x" << std::setw(8) << std::setfill('0') << buf[i] << "\n"; + } + } + + switch(header->etype) { + case ETChannel::CODA_PHY1: + case ETChannel::CODA_PHY2: + break; + case ETChannel::CODA_PRST: + case ETChannel::CODA_GO: + case ETChannel::CODA_END: + default: + return false; + } + + simpleScan((volatile uint32_t*)buf, buf_size); + return true; + + /* + // parse ROC data + uint32_t index = 2; + while(index < buf_size) { + // skip header size and data size 2 + (length - 1) + index += parseBlock(&buf[index]); + } + + // return parsed event type + return buf_size; + */ +} + + +uint32_t parseBlock(const uint32_t *buf) +{ + auto header = (ETChannel::CodaEvHeader*) buf; + const uint32_t buf_size = header->length + 1; + std::cout << "ROC header: " << std::dec << buf_size << "\n" + << (int) header->etype << ", " << (int) header->dtype << ", " << (int) header->num + << std::endl; + std::cout << std::hex; + for (uint32_t i = 0; i < buf_size; ++i) { + std::cout << "0x" << std::setw(8) << std::setfill('0') << buf[i] << "\n"; + } + return buf_size; +} + + +int main(int argc, char* argv[]) +{ + // setup input arguments + ConfigOption conf_opt; + conf_opt.AddOpts(ConfigOption::help_message, 'h', "help"); + conf_opt.AddOpt(ConfigOption::arg_require, 'n'); + conf_opt.AddOpt(ConfigOption::arg_require, 'p'); + conf_opt.AddOpt(ConfigOption::arg_require, 'f'); + conf_opt.AddLongOpt(ConfigOption::arg_require, "config-module", 'm'); + conf_opt.AddLongOpt(ConfigOption::arg_require, "host", 'o'); + + conf_opt.SetDesc("usage: %0 <data_file> <out_file>"); + conf_opt.SetDesc('n', "number of events to process (< 0 means all)."); + conf_opt.SetDesc('m', "configuration file for modules to be read-in, default \"config/esb_module.conf\"."); + conf_opt.SetDesc('p', "port to et system, default 11111."); + conf_opt.SetDesc('f', "path to et file, default \"/tmp/et_sys\""); + conf_opt.SetDesc('o', "host address of et syste, default localhost"); + + if (!conf_opt.ParseArgs(argc, argv) || conf_opt.NbofArgs() != 1) { + std::cout << conf_opt.GetInstruction() << std::endl; + return -1; + } + + int nev = -1; + std::string mconf = "config/esb_module.conf"; + std::string host = "localhost"; + int port = 11111; + std::string etfile = "/tmp/et_sys"; + + for (auto &opt : conf_opt.GetOptions()) { + switch (opt.mark) { + case 'n': + nev = opt.var.Int(); + break; + case 'm': + mconf = opt.var.String(); + break; + case 'o': + host = opt.var.String(); + break; + case 'p': + port = opt.var.Int(); + break; + case 'f': + etfile = opt.var.String(); + break; + default : + std::cout << conf_opt.GetInstruction() << std::endl; + return -1; + } + } + + auto modules = read_modules(mconf); + + write_raw_data(host, port, etfile, conf_opt.GetArgument(0), nev, modules); +} + + +#define MAX_NPEAKS 20 +#define MAX_RAW 300 +struct BranchData +{ + int npul, nraw; + float integral[MAX_NPEAKS], peak[MAX_NPEAKS], time[MAX_NPEAKS]; + int raw[MAX_RAW]; + float ped_mean, ped_err; +}; + +#define BUF_SIZE 1000 +static double buffer[BUF_SIZE], wfbuf[BUF_SIZE], bkbuf[BUF_SIZE]; +void refine_pedestal(const std::vector<uint32_t> &raw, float &ped_mean, float &ped_err) +{ + int count = 0; + float mean = 0.; + for (auto &val : raw) { + if (std::abs(val - ped_mean) < 2.0 * ped_err) { + buffer[count] = val; + mean += val; + count ++; + } + } + if (count == 0) { + return; + } + + mean /= count; + float change = std::abs(ped_mean - mean); + + // no outliers + if (change < 0.05 * ped_err) { + return; + } + + // recursively refine + float err = 0.; + for (int i = 0; i < count; ++i) { + err += (buffer[i] - mean)*(buffer[i] - mean); + } + ped_mean = mean; + ped_err = std::sqrt(err/count); + refine_pedestal(raw, ped_mean, ped_err); +} + +// a function for a simple constant baseline fit +void find_pedestal(const std::vector<uint32_t> &raw, float &ped_mean, float &ped_err) +{ + ped_mean = 0; + ped_err = 0; + + for (auto &val : raw) { + ped_mean += val; + } + ped_mean /= raw.size(); + + for (auto &val : raw) { + ped_err += (val - ped_mean)*(val - ped_mean); + } + ped_err = std::sqrt(ped_err/raw.size()); + + refine_pedestal(raw, ped_mean, ped_err); +} + +// analyze the waveform data and fill the result in a branch data +void waveform_analysis(const std::vector<uint32_t> &raw, BranchData &res) +{ + // no need to analyze + if (raw.empty()) { + return; + } + + // fill in the raw data + res.nraw = raw.size(); + for (size_t i = 0; i < raw.size(); ++i) { + res.raw[i] = raw[i]; + } + + // get pedestals + find_pedestal(raw, res.ped_mean, res.ped_err); + + // fill in spectrum buffer + for (size_t i = 0; i < raw.size(); ++i) { + wfbuf[i] = raw[i] - res.ped_mean; + } + + // find peaks + TSpectrum s; + s.SetResolution(0.5); + int npeaks = s.SearchHighRes(wfbuf, bkbuf, res.nraw, 3.0, 20, false, 5, true, 3); + + // fill branch data + double *pos = s.GetPositionX(); + res.npul = 0; + for (int i = 0; i < npeaks; ++i) { + int j = pos[i]; + if (wfbuf[j] < 5.*res.ped_err) { continue; } + res.time[res.npul] = j; + res.peak[res.npul] = wfbuf[j]; + /* + res.integral[res.npul] = wfbuf[j]; + j = pos[i] - 1; + while ( (j > 0) && (wfbuf[j] - wfbuf[j - 1])*wfbuf[j] > 0. ) { + res.integral[res.npul] += wfbuf[j--]; + } + j = pos[i] + 1; + while ( (j < res.nraw - 1) && (wfbuf[j] - wfbuf[j + 1])*wfbuf[j] > 0. ) { + res.integral[res.npul] += wfbuf[j++]; + } + */ + res.npul ++; + } +} + + +// fill decoded FADC250 data into the container (branch data) +void fill_branch(const Fadc250Data &slot_data, const Module &mod, std::unordered_map<std::string, BranchData> &brdata) +{ + for (auto &event : slot_data.events) { + for (auto &ch : mod.channels) { + auto &channel = event.channels[ch.id]; + auto &bd = brdata[ch.name]; + bd.npul = channel.integral.size(); + for (uint32_t i = 0; i < channel.integral.size(); ++i) { + bd.integral[i] = channel.integral[i]; + bd.time[i] = channel.time[i]; + } + waveform_analysis(channel.raw, bd); + } + } +} + +// fill branch data into the root tree +void fill_tree(TTree *tree, std::unordered_map<std::string, BranchData> &brdata, bool &init, int mode) +{ + if ((mode != 1) && (mode != 3)) { + std::cout << "Warning: unsupported mode " << mode << ", data won't be recorded." << std::endl; + return; + } + + // initialize + if (!init) { + for (auto &it : brdata) { + auto n = it.first; + tree->Branch((n + "_Npulse").c_str(), &brdata[n].npul, (n + "_N/I").c_str()); + tree->Branch((n + "_Pint").c_str(), &brdata[n].integral[0], (n + "_Pint[" + n + "_N]/F").c_str()); + tree->Branch((n + "_Ptime").c_str(), &brdata[n].time[0], (n + "_Ptime[" + n + "_N]/F").c_str()); + // raw waveform provides more information + if (mode == 1) { + tree->Branch((n + "_Ppeak").c_str(), &brdata[n].peak[0], (n + "_Ppeak[" + n + "_N]/F").c_str()); + tree->Branch((n + "_Nraw").c_str(), &brdata[n].nraw, (n + "_Nraw/I").c_str()); + tree->Branch((n + "_raw").c_str(), &brdata[n].raw[0], (n + "_raw[" + n + "_Nraw]/I").c_str()); + tree->Branch((n + "_ped_mean").c_str(), &brdata[n].ped_mean, (n + "_ped_mean/F").c_str()); + tree->Branch((n + "_ped_err").c_str(), &brdata[n].ped_err, (n + "_ped_err/F").c_str()); + } + } + init = true; + std::cout << "Initialized root file for mode " << mode << " data." << std::endl; + } + + tree->Fill(); +} + + +void processEvent(const uint32_t *buf, int &count, TTree *tree, bool &init_tree, int &data_mode, + const std::vector<Module> &modules, std::unordered_map<std::string, BranchData> &brdata) +{ + if (!parseEvent(buf)) { + return; + } + + // get block level + int blvl = 1; + simpleGetRocBlockLevel(modules.front().crate, FADC_BANK, &blvl); + + for (int ii = 0; ii < blvl; ++ii) { + // clear data buffer + for (auto &br : brdata) { + br.second.npul = 0; + br.second.nraw = 0; + } + + // parse module data + for (auto &mod : modules) { + uint32_t header = 0; + auto status = simpleGetSlotBlockHeader(mod.crate, FADC_BANK, mod.slot, &header); + if (status <= 0) { + std::cout << "Error getting header for crate = " + << mod.crate << ", slot = " << mod.slot << std::endl; + continue; + } + + uint32_t *dbuf; + auto len = simpleGetSlotEventData(mod.crate, FADC_BANK, mod.slot, ii, &dbuf); + if (len <= 0) { + std::cout << "No data for crate = " << mod.crate << ", slot = " << mod.slot + << ", block_level = " << ii << std::endl; + continue; + } + auto slot_data = fadc250_decoder(header, dbuf, len); + + // check data mode + if (data_mode < 0 && slot_data.GetMode() > 0) { + data_mode = slot_data.GetMode(); + } + // fill branch data + fill_branch(slot_data, mod, brdata); + } + + // fill event + fill_tree(tree, brdata, init_tree, data_mode); + count ++; + } +} + + +void write_raw_data(const std::string &addr, int port, const std::string &etfile, + const std::string &opath, int nev, const std::vector<Module> &modules) +{ + ETChannel et_chan(100, 100); + if (et_chan.Connect(addr.c_str(), port, etfile.c_str(), "TCD_MONITOR") != ET_OK) { + return; + } + + auto *hfile = new TFile(opath.c_str(), "RECREATE", "MAPMT test results"); + auto tree = new TTree("EvTree", "Cherenkov Test Events"); + // set up branches and a container to be combined with the branches + std::unordered_map<std::string, BranchData> brdata; + for (auto &mod : modules) { + for (auto &ch : mod.channels) { + auto n = ch.name; + if (brdata.find(n) != brdata.end()) { + std::cout << "WARNING: Duplicated channel names found! -- " << n << std::endl; + } + brdata[n] = BranchData(); + } + } + + int count = 0; + bool init_tree = false; + int data_mode = -1; + et_chan.Open(); + int status = et_chan.ReadEvent(); + signal (SIGINT, handle_sig); + while (status != ETChannel::READ_ERROR && status != ETChannel::READ_EOF) { + if (sig_caught || (nev > 0 && nev < count)) { break; } + // read-in data + if (status == ETChannel::READ_EMPTY) { + std::cout << "ET station is empty, wating 5 secs..." << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(5)); + status = et_chan.ReadEvent(); + continue; + } + + if((count % PROGRESS_COUNT) == 0) { + std::cout << "Read and processed events - " << count << std::endl; + } + + processEvent(et_chan.GetEvBuffer(), count, tree, init_tree, data_mode, modules, brdata); + status = et_chan.ReadEvent(); + } + std::cout << "Read and processed events - " << count << std::endl; + + hfile->Write(); + hfile->Close(); +} + diff --git a/src/conf2json.cpp b/tools/conf2json.cpp similarity index 100% rename from src/conf2json.cpp rename to tools/conf2json.cpp diff --git a/tools/et_feeder.cpp b/tools/et_feeder.cpp new file mode 100644 index 0000000000000000000000000000000000000000..cba62e02aa5ba52293a66aa0c3ba2a737f6214dc --- /dev/null +++ b/tools/et_feeder.cpp @@ -0,0 +1,148 @@ +#include "ConfigOption.h" +#include "ETConfigWrapper.h" +#include "et.h" +#include "THaCodaFile.h" +#include <csignal> +#include <thread> +#include <chrono> +#include <iostream> + +#define PROGRESS_COUNT 100 + +using namespace std::chrono; + + +volatile std::sig_atomic_t gSignalStatus; + + +void signal_handler(int signal) { + gSignalStatus = signal; +} + + +int main(int argc, char* argv[]) try +{ + // setup input arguments + ConfigOption conf_opt; + conf_opt.AddLongOpt(ConfigOption::help_message, "help", 'a'); + conf_opt.AddOpt(ConfigOption::arg_require, 'h'); + conf_opt.AddOpt(ConfigOption::arg_require, 'p'); + conf_opt.AddOpt(ConfigOption::arg_require, 'f'); + conf_opt.AddOpt(ConfigOption::arg_require, 'i'); + + conf_opt.SetDesc("usage: %0 <data_file>"); + conf_opt.SetDesc('h', "host address of the ET system, default \"localhost\"."); + conf_opt.SetDesc('p', "port to connect, default 11111."); + conf_opt.SetDesc('f', "memory mapped et file, default \"/tmp/et_feeder\"."); + conf_opt.SetDesc('i', "interval in milliseconds to write data, default \"10\"."); + conf_opt.SetDesc('a', "help message."); + + if (!conf_opt.ParseArgs(argc, argv) || conf_opt.NbofArgs() != 1) { + std::cout << conf_opt.GetInstruction() << std::endl; + return -1; + } + + std::string host = "localhost"; + int port = 11111; + std::string etf = "/tmp/et_feeder"; + int interval = 10; + + for (auto &opt : conf_opt.GetOptions()) { + switch (opt.mark) { + case 'h': + host = opt.var.String(); + break; + case 'p': + port = opt.var.Int(); + break; + case 'f': + etf = opt.var.String(); + break; + case 'i': + interval = opt.var.Int(); + break; + default : + std::cout << conf_opt.GetInstruction() << std::endl; + return -1; + } + } + + et_sys_id et_id; + et_att_id att_id; + + // open ET system + et_wrap::OpenConfig conf; + conf.set_cast(ET_DIRECT); + conf.set_host(host.c_str()); + conf.set_serverport(port); + + char fname[256]; + strncpy(fname, etf.c_str(), 256); + auto status = et_open(&et_id, fname, conf.configure().get()); + if (status != ET_OK) { + std::cerr << "Cannot open ET at " << etf << std::endl; + return -1; + } + + // attach to GRAND CENTRAL + status = et_station_attach(et_id, ET_GRANDCENTRAL, &att_id); + if (status != ET_OK) { + std::cerr << "Failed to attach to the ET Grand Central Station." << std::endl; + return -1; + } + // evio file reader + Decoder::THaCodaFile file; + file.codaOpen(conf_opt.GetArgument(0).c_str()); + if (!file.isOpen()) { + std::cerr << "Failed to open coda file \"" << conf_opt.GetArgument(0) << "\"." << std::endl; + return -1; + } + + // install signal handler + std::signal(SIGINT, signal_handler); + int count = 0; + et_event *ev; + while ((file.codaRead() == CODA_OK) && et_alive(et_id)) { + if (gSignalStatus == SIGINT) { + std::cout << "Received control-C, exiting..." << std::endl; + break; + } + system_clock::time_point start(system_clock::now()); + system_clock::time_point next(start + std::chrono::milliseconds(interval)); + + if (++count % PROGRESS_COUNT == 0) { + std::cout << "Read and feed " << count << " events to ET, rate is 1 event per " + << interval << " ms.\r" << std::flush; + } + uint32_t *buff = static_cast<uint32_t*>(file.getEvBuffer()); + size_t nbytes = (buff[0] + 1)*sizeof(uint32_t); + status = et_event_new(et_id, att_id, &ev, ET_SLEEP, nullptr, nbytes); + if (status != ET_OK) { + std::cerr << "Failed to add new event to the ET system." << std::endl; + return -1; + } + // build et event + void *data; + et_event_getdata(ev, &data); + memcpy((void *) data, (const void *)buff, nbytes); + et_event_setlength(ev, nbytes); + + // put back the event + status = et_event_put(et_id, att_id, ev); + if (status != ET_OK) { + std::cerr << "Failed to put event back to the ET system." << std::endl; + return -1; + } + + std::this_thread::sleep_until(next); + } + std::cout << "Read and feed " << count << " events to ET, rate is 1 event per " + << interval << " ms." << std::endl; + + file.codaClose(); + return 0; + +} catch (...) { + std::cerr << "?unknown exception" << std::endl; +} +