Select Git revision
monitor.cxx
Forked from
Telescope Cherenkov / fadc_decoder
103 commits behind the upstream repository.
monitor.cxx 14.87 KiB
#include "MonitorDisplay.h"
#include <iostream>
#include <iomanip>
#include <fstream>
#include <csignal>
#include "utils.h"
#include "ConfigParser.h"
#include "ConfigObject.h"
#include "ConfigOption.h"
#include "TSpectrum.h"
#include "TTree.h"
#include "TFile.h"
#include "TH1.h"
#include "simpleLib.h"
#include "Fadc250Decoder.h"
#include "ETChannel.h"
#define FADC_BANK 3
volatile sig_atomic_t term = 0;
void handle_sig(int signum)
{
std::cout << "Received SIGINT, stopping the monitor..." << std::endl;
term = 1;
}
uint32_t swap_endian32(uint32_t num)
{
uint32_t b0 = (num & 0x000000ff) << 24u;
uint32_t b1 = (num & 0x0000ff00) << 8u;
uint32_t b2 = (num & 0x00ff0000) >> 8u;
uint32_t b3 = (num & 0xff000000) >> 24u;
return b0 | b1 | b2 | b3;
}
uint32_t parseBlock(const uint32_t *buf)
{
auto header = (ETChannel::CodaEvHeader*) buf;
const uint32_t buf_size = header->length + 1;
std::cout << "ROC header: " << std::dec << buf_size << "\n"
<< (int) header->etype << ", " << (int) header->dtype << ", " << (int) header->num
<< std::endl;
std::cout << std::hex;
for (uint32_t i = 0; i < buf_size; ++i) {
std::cout << "0x" << std::setw(8) << std::setfill('0') << buf[i] << "\n";
}
return buf_size;
}
// parse an evio event
bool parseEvent(const uint32_t *buf, bool verbose = false)
{
auto header = (ETChannel::CodaEvHeader*)buf;
const uint32_t buf_size = header->length + 1;
if (verbose) {
std::cout << "Event header: " << std::dec << buf_size << "\n"
<< (int) header->etype << ", " << (int) header->dtype << ", " << (int) header->num
<< std::endl;
std::cout << std::hex;
for (uint32_t i = 0; i < 2; ++i) {
std::cout << "0x" << std::setw(8) << std::setfill('0') << buf[i] << "\n";
}
}
switch(header->etype) {
case ETChannel::CODA_PHY1:
case ETChannel::CODA_PHY2:
break;
case ETChannel::CODA_PRST:
case ETChannel::CODA_GO:
case ETChannel::CODA_END:
default:
return false;
}
simpleScan((volatile uint32_t*)buf, buf_size);
return true;
/*
// parse ROC data
uint32_t index = 2;
while(index < buf_size) {
// skip header size and data size 2 + (length - 1)
index += parseBlock(&buf[index]);
}
// return parsed event type
return buf_size;
*/
}
#define MAX_NPEAKS 20
#define MAX_RAW 300
struct BranchData
{
int npul, nraw;
float integral[MAX_NPEAKS], peak[MAX_NPEAKS], time[MAX_NPEAKS];
int raw[MAX_RAW];
float ped_mean, ped_err;
};
#define BUF_SIZE 1000
static double buffer[BUF_SIZE], wfbuf[BUF_SIZE], bkbuf[BUF_SIZE];
void refine_pedestal(const std::vector<uint32_t> &raw, float &ped_mean, float &ped_err)
{
int count = 0;
float mean = 0.;
for (auto &val : raw) {
if (std::abs(val - ped_mean) < 2.0 * ped_err) {
buffer[count] = val;
mean += val;
count ++;
}
}
if (count == 0) {
return;
}
mean /= count;
float change = std::abs(ped_mean - mean);
// no outliers
if (change < 0.05 * ped_err) {
return;
}
// recursively refine
float err = 0.;
for (int i = 0; i < count; ++i) {
err += (buffer[i] - mean)*(buffer[i] - mean);
}
ped_mean = mean;
ped_err = std::sqrt(err/count);
refine_pedestal(raw, ped_mean, ped_err);
}
// a function for a simple constant baseline fit
void find_pedestal(const std::vector<uint32_t> &raw, float &ped_mean, float &ped_err)
{
ped_mean = 0;
ped_err = 0;
for (auto &val : raw) {
ped_mean += val;
}
ped_mean /= raw.size();
for (auto &val : raw) {
ped_err += (val - ped_mean)*(val - ped_mean);
}
ped_err = std::sqrt(ped_err/raw.size());
refine_pedestal(raw, ped_mean, ped_err);
}
// analyze the waveform data and fill the result in a branch data
void waveform_analysis(const std::vector<uint32_t> &raw, BranchData &res)
{
// no need to analyze
if (raw.empty()) {
return;
}
// fill in the raw data
res.nraw = raw.size();
for (size_t i = 0; i < raw.size(); ++i) {
res.raw[i] = raw[i];
}
// get pedestals
find_pedestal(raw, res.ped_mean, res.ped_err);
// fill in spectrum buffer
for (size_t i = 0; i < raw.size(); ++i) {
wfbuf[i] = raw[i] - res.ped_mean;
}
// find peaks
TSpectrum s;
s.SetResolution(0.5);
int npeaks = s.SearchHighRes(wfbuf, bkbuf, res.nraw, 3.0, 20, false, 5, true, 3);
// fill branch data
double *pos = s.GetPositionX();
res.npul = 0;
for (int i = 0; i < npeaks; ++i) {
int j = pos[i];
if (wfbuf[j] < 5.*res.ped_err) { continue; }
res.time[res.npul] = j;
res.peak[res.npul] = wfbuf[j];
/*
res.integral[res.npul] = wfbuf[j];
j = pos[i] - 1;
while ( (j > 0) && (wfbuf[j] - wfbuf[j - 1])*wfbuf[j] > 0. ) {
res.integral[res.npul] += wfbuf[j--];
}
j = pos[i] + 1;
while ( (j < res.nraw - 1) && (wfbuf[j] - wfbuf[j + 1])*wfbuf[j] > 0. ) {
res.integral[res.npul] += wfbuf[j++];
}
*/
res.npul ++;
}
}
// fill decoded FADC250 data into the container (branch data)
void fill_branch(const Fadc250Data &slot_data, const Module &mod, std::unordered_map<std::string, BranchData> &brdata)
{
for (auto &event : slot_data.events) {
for (auto &ch : mod.channels) {
auto &channel = event.channels[ch.id];
auto &bd = brdata[ch.name];
bd.npul = channel.integral.size();
for (uint32_t i = 0; i < channel.integral.size(); ++i) {
bd.integral[i] = channel.integral[i];
bd.time[i] = channel.time[i];
}
waveform_analysis(channel.raw, bd);
}
}
}
// fill branch data into the root tree
void fill_tree(TTree *tree, std::unordered_map<std::string, BranchData> &brdata, bool &init, int mode)
{
if ((mode != 1) && (mode != 3)) {
std::cout << "Warning: unsupported mode " << mode << ", data won't be recorded." << std::endl;
return;
}
// initialize
if (!init) {
for (auto &it : brdata) {
auto n = it.first;
tree->Branch((n + "_Npulse").c_str(), &brdata[n].npul, (n + "_N/I").c_str());
tree->Branch((n + "_Pint").c_str(), &brdata[n].integral[0], (n + "_Pint[" + n + "_N]/F").c_str());
tree->Branch((n + "_Ptime").c_str(), &brdata[n].time[0], (n + "_Ptime[" + n + "_N]/F").c_str());
// raw waveform provides more information
if (mode == 1) {
tree->Branch((n + "_Ppeak").c_str(), &brdata[n].peak[0], (n + "_Ppeak[" + n + "_N]/F").c_str());
tree->Branch((n + "_Nraw").c_str(), &brdata[n].nraw, (n + "_Nraw/I").c_str());
tree->Branch((n + "_raw").c_str(), &brdata[n].raw[0], (n + "_raw[" + n + "_Nraw]/I").c_str());
tree->Branch((n + "_ped_mean").c_str(), &brdata[n].ped_mean, (n + "_ped_mean/F").c_str());
tree->Branch((n + "_ped_err").c_str(), &brdata[n].ped_err, (n + "_ped_err/F").c_str());
}
}
init = true;
std::cout << "Initialized root file for mode " << mode << " data." << std::endl;
}
tree->Fill();
}
void processEvent(const uint32_t *buf, int &count, TTree *tree, bool &init_tree, int &data_mode,
const std::vector<Module> &modules, std::unordered_map<std::string, BranchData> &brdata)
{
if (!parseEvent(buf, false)) {
return;
}
// get block level
int blvl = 1;
simpleGetRocBlockLevel(modules.front().crate, FADC_BANK, &blvl);
for (int ii = 0; ii < blvl; ++ii) {
// clear data buffer
for (auto &br : brdata) {
br.second.npul = 0;
br.second.nraw = 0;
}
// parse module data
for (auto &mod : modules) {
uint32_t header = 0;
auto status = simpleGetSlotBlockHeader(mod.crate, FADC_BANK, mod.slot, &header);
if (status <= 0) {
std::cout << "Error getting header for crate = "
<< mod.crate << ", slot = " << mod.slot << std::endl;
continue;
}
uint32_t *dbuf;
auto len = simpleGetSlotEventData(mod.crate, FADC_BANK, mod.slot, ii, &dbuf);
if (len <= 0) {
std::cout << "No data for crate = " << mod.crate << ", slot = " << mod.slot
<< ", block_level = " << ii << std::endl;
continue;
}
auto slot_data = fadc250_decoder(header, dbuf, len);
// check data mode
if (data_mode < 0 && slot_data.GetMode() > 0) {
data_mode = slot_data.GetMode();
}
// fill branch data
fill_branch(slot_data, mod, brdata);
}
// fill event
fill_tree(tree, brdata, init_tree, data_mode);
count ++;
}
}
static std::vector<std::vector<std::string>> ec_channels = {
{"C4"}, {"C6_1", "C6_4", "C6_2", "C6_3"}, {"C7_1", "C7_4", "C7_2", "C7_3"},
{"C5_1", "C5_4", "C5_2", "C5_3"}, {"C9_1", "C9_2", "C9_4", "C9_3"}, {"C8_2", "C8_3", "C8_1", "C8_4"},
{"C1"}, {"C2"}, {"C3"},
};
static std::vector<std::pair<int, int>> padnum;
inline void set_graph(TGraph *gr, int npt, double value = 0.)
{
for (int i = 0; i < npt; ++i) {
gr->SetPoint(i, i, value);
}
}
void AddWaveformDisplay(hallc::MonitoringDisplay *dply, std::unordered_map<std::string, BranchData> &brdata, int &mode)
{
dply->CreateDisplayPlot("raw/", "waveform",
[] (hallc::DisplayPlot &plt) {
plt._plot_data._canvas = new TCanvas("ec_waveform_test2", "EC Waveform", 1200, 700);
plt._plot_data._canvas->DivideSquare(ec_channels.size());
padnum.clear();
for (size_t i = 0; i < ec_channels.size(); ++i) {
auto gch = ec_channels[i];
plt._plot_data._canvas->cd(i + 1);
int ndiv = int(std::sqrt(gch.size()) - 1e-4) + 1;
gPad->Divide(ndiv, ndiv, 0., 0.);
for (size_t j = 0; j < gch.size(); ++j) {
gPad->cd(j + 1);
auto gr = new TGraph();
set_graph(gr, 64);
gr->SetLineColor(kRed);
gr->SetLineWidth(2);
gr->SetTitle(gch[j].c_str());
gr->Draw("L");
plt._plot_data._graphs1.push_back(gr);
padnum.push_back({i + 1, j + 1});
}
}
return 0;
},
[&brdata, &mode] (hallc::DisplayPlot &plt) {
if (mode != 1) {
std::cout << "Skip raw waveform plot because data mode = " << mode << std::endl;
return 0;
}
for (size_t i = 0; i < plt._plot_data._graphs1.size(); ++i) {
auto gr = plt._plot_data._graphs1[i];
auto pn = padnum[i];
plt._plot_data._canvas->cd(pn.first);
gPad->cd(pn.second);
auto it = brdata.find(gr->GetTitle());
if (it != brdata.end()) {
auto data = it->second;
for (int k = 0; k < 64; ++k) {
gr->SetPoint(k, k, (k < data.nraw) ? data.raw[k] : 0.);
}
} else {
std::cout << "Warning: cannot find raw data for " << gr->GetTitle() << std::endl;
}
gr->Draw("L");
gr->GetYaxis()->SetRangeUser(300, 1000);
gr->GetXaxis()->SetRangeUser(0, 64);
}
return 0;
});
}
void monitor(const std::string &cpath = "config/online_monitor.conf")
{
// read configuration
ConfigObject conf;
conf.ReadConfigFile(cpath);
auto host = conf.Value<std::string>("host", "clrlpc.jlab.org");
auto port = conf.Value<int>("port", 11111);
auto etfile = conf.Value<std::string>("etfile", "/tmp/et_test");
auto modconf = conf.Value<std::string>("modules", "config/esb_module.conf");
auto up_intvl = conf.Value<double>("update_interval", 5000);
auto outfile = conf.Value<std::string>("output", "processed_data/online.root");
auto nev = conf.Value<int>("number_events", -1);
auto server_dir = conf.Value<std::string>("server_dir", "online/latest");
auto server_port = conf.Value<int>("server_port", 9100);
// connect et
ETChannel et_chan(100, 100);
if (et_chan.Connect(host.c_str(), port, etfile.c_str(), "TCD_MONITOR") != ET_OK) {
std::cout << fmt::format("Failed to connect to the et system {}:{}:{}, abort monitoring!", host, port, etfile)
<< std::endl;
return;
}
et_chan.Open();
auto modules = read_modules(modconf);
// set up branches and a container to be combined with the branches
std::unordered_map<std::string, BranchData> brdata;
for (auto &mod : modules) {
for (auto &ch : mod.channels) {
auto n = ch.name;
if (brdata.find(n) != brdata.end()) {
std::cout << "WARNING: Duplicated channel names found! -- " << n << std::endl;
}
brdata[n] = BranchData();
}
}
// root file and event tree
auto *hfile = new TFile(outfile.c_str(), "RECREATE", "MAPMT test results");
auto tree = new TTree("EvTree", "Cherenkov Test Events");
int data_mode = -1;
bool init_tree = false;
// display plots
auto dply = CreateMonitorDisplay(server_dir, server_port, tree);
// monitor display for raw waveform
AddWaveformDisplay(dply, brdata, data_mode);
dply->InitAll();
// timer
auto timer = std::chrono::steady_clock::now();
// start monitoring
int count = 0;
signal(SIGINT, handle_sig);
while (true) {
if (term > 0 || (nev > 0 && nev < count)) { break; }
auto now = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(now - timer).count() > up_intvl) {
timer = now;
std::cout << "Monitor update, processed events - " << count << std::endl;
dply->Process();
dply->UpdateAll();
}
int status = et_chan.ReadEvent();
switch (status) {
case ETChannel::READ_ERROR:
case ETChannel::READ_EOF:
term = 1;
break;
case ETChannel::READ_EMPTY:
std::cout << "ET station is empty, wait 2 secs..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
break;
case ETChannel::READ_OK:
processEvent(et_chan.GetEvBuffer(), count, tree, init_tree, data_mode, modules, brdata);
break;
}
}
dply->Process();
dply->UpdateAll();
std::cout << "Read and processed events - " << count << std::endl;
hfile->Write();
hfile->Close();
}