Skip to content
Snippets Groups Projects
THcOnlineRun.cxx 7.79 KiB
Newer Older
///////////////////////////////////////////////////////////////////////////////
// Class THcOnlineRun                                                        //
// Read event from ET instead of CODA file                                   //
//                                                                           //
// Developer:                                                                //
// Chao Peng                                                                 //
// 11/22/2019                                                                //
///////////////////////////////////////////////////////////////////////////////

#include "THcOnlineRun.h"
#include "THcGlobals.h"
Chao Peng's avatar
Chao Peng committed
#include <string>
Chao Peng's avatar
Chao Peng committed
template<class Interval>
auto get_next_time(Interval itvl)
Chao Peng's avatar
Chao Peng committed
    return std::chrono::system_clock::now() + itvl;
Chao Peng's avatar
Chao Peng committed
std::string et_error_to_string(int error)
Chao Peng's avatar
Chao Peng committed
    switch(error) {
    case ET_ERROR_EMPTY: return "et_client: empty et event.";
    case ET_ERROR_DEAD: return "et client: et is dead!";
    case ET_ERROR_TIMEOUT: return "et client: got timeout!";
    case ET_ERROR_BUSY: return "et client: et is busy!";
    case ET_ERROR_WAKEUP: return "et_client: et is waking up.";
    default: break;
    }
    return "et_client: unknown et error.";
}

// Constructors
THcOnlineRun::THcOnlineRun(UInt_t size, std::chrono::milliseconds itvl, Int_t mtries, Int_t vers)
:   THcRun(), version(vers), bSize(size), eSize(0), interval(itvl), max_tries(mtries),
    et_id(nullptr), stat_id(ID_NULL), att_id(ID_NULL)
{
    buffer = new UInt_t[size];
    clock = get_next_time(itvl);
}

// Copy constructor
THcOnlineRun::THcOnlineRun(const THcOnlineRun& rhs)
Chao Peng's avatar
Chao Peng committed
:   THcRun(rhs)
Chao Peng's avatar
Chao Peng committed
    interval = rhs.interval;
    max_tries = rhs.max_tries;
    // do not copy the ownership of et system
    et_id = nullptr;
    stat_id = ID_NULL;
    att_id = ID_NULL;
}

// Missing move constructor, not implementing for consistency with the analyzer

// Copy assignment operator
Chao Peng's avatar
Chao Peng committed
THcOnlineRun& THcOnlineRun::operator=(const THcOnlineRun& rhs)
    if (this != &rhs) {
Chao Peng's avatar
Chao Peng committed
        interval = rhs.interval;
        max_tries = rhs.max_tries;
        et_id = nullptr;
        stat_id = ID_NULL;
        att_id = ID_NULL;
        THcRun::operator=(rhs);
    }
    return *this;
}

// Missing move assignment operator, not implementing for consistency with the analyzer

// Destructor.
THcOnlineRun::~THcOnlineRun()
{
Chao Peng's avatar
Chao Peng committed
    // delete buffer
    if (buffer != nullptr) {
        delete[](buffer), buffer = nullptr;
    }
    // do not touch et_event as it is managed by the et system
    Close();
}

// Connect a ET system and create a monitor station with pre-settings
Int_t THcOnlineRun::Connect(const char *ip_addr, int port, const char *et_file, const char *s_name)
{
    // open et system
    auto conf = PRadETChannel::Configuration();
    // use a direct connection to the ET system
    conf.SetCast(ET_DIRECT);
    conf.SetHost(ip_addr);
    conf.SetServerPort(port);

    auto status = Open(et_file, conf);
    if (status < ET_OK) {
        _logger->error("THcOnlineRun::Connect : Failed to open ET system, abort connection.");
        Close();
        return status;
    }

    // create a station
    auto sconf = PRadETStation::Configuration();
    sconf.SetUser(ET_STATION_USER_MULTI);
    sconf.SetRestore(ET_STATION_RESTORE_OUT);
    sconf.SetPrescale(1);
    sconf.SetCUE(ET_CHUNK_SIZE);
    sconf.SetSelect(ET_STATION_SELECT_ALL);
    sconf.SetBlock(ET_STATION_NONBLOCKING);

    status = CreateStation(s_name, sconf);
    if (status < ET_OK) {
        _logger->error("THcOnlineRun::Connect : Failed to create a station, abort connection.");
        Close();
        return status;
    status = Attach();
    if (status < ET_OK) {
        _logger->error("THcOnlineRun::Connect : Failed to attach to the station, abort connection.");
        Close();
        return status;
    }

    return ET_OK;
}

// Open ET from configuration
Int_t THcOnlineRun::Open(const char *et_file, PRadETChannel::Configuration conf)
{
    if (IsOpen()) {
        _logger->warn("THcOnlineRun::Open : ET system is already opened, close it before re-open");
        return ID_NULL;
    }

    char fname[256];
    strncpy(fname, et_file, 256);
    return et_open(&et_id, fname, conf.Get());
}

// Close ET
Int_t THcOnlineRun::Close()
{
    if ( (et_id != nullptr) && IsOpen() ) {
        et_forcedclose(et_id);
        et_id = nullptr;
        return 1;
    }
    return 0;
}

// create a station from configuration
Int_t THcOnlineRun::CreateStation(const char *station_name, PRadETStation::Configuration conf)
{
    if (!IsOpen()) {
        _logger->warn("THcOnlineRun::CreateStation : ET System is not opened, abort creating a station.");
        return ID_NULL;
    } else if (stat_id != ID_NULL) {
        _logger->warn("THcOnlineRun::CreateStation : A station has alreadly been created, remove it before re-create a station.");
        return ID_NULL;
    }

    char sname[256];
    strncpy(sname, station_name, 256);
    return et_station_create(et_id, &stat_id, sname, conf.Get());
}

// remove the station from et system
Int_t THcOnlineRun::RemoveStation()
{
    if (IsOpen() && (stat_id != ID_NULL)) {
        auto status = et_station_remove(et_id, stat_id);
        if (status == ET_OK) {
            stat_id = ID_NULL;
            return status;
        }
    }
    return ID_NULL;
}

// Attach to a station
Int_t THcOnlineRun::Attach()
{
    return Attach(stat_id);
}

// Attach to a station with station id
Int_t THcOnlineRun::Attach(et_stat_id sid)
{
    if (!IsOpen()) {
        _logger->warn("THcOnlineRun::Attach : ET System is not opened, abort attaching to a station.");
        return ID_NULL;
    }
    return et_station_attach(et_id, sid, &att_id);
}

// Detach the station
Int_t THcOnlineRun::Detach()
{
    if (IsOpen() && (att_id != ID_NULL)) {
        auto status = et_station_detach(et_id, att_id);
        if (status == ET_OK) {
            att_id = ID_NULL;
            return status;
        }
    }
    return ID_NULL;
}
Chao Peng's avatar
Chao Peng committed

// copy event to the buffer
bool THcOnlineRun::copyEvent()
{
    void *data;
    size_t size;
    et_event_getdata(ev, &data);
    et_event_getlength(ev, &size);

    // from byte to the buffer type
    auto *dBuffer = (decltype(buffer)) data;
    size_t bytes = sizeof(decltype(buffer[0]));
    eSize = size/bytes + ((size % bytes) ? 1 : 0);

    if (eSize > bSize) {
        _logger->warn("THcOnlineRun::copyEvent et event size {} exceeds the buffer size {}.", eSize, bSize);
        return false;
    }

    for (UInt_t i = 0; i < eSize; ++i) {
        buffer[i] = dBuffer[i];
    }
    return true;
}

// read event from et channel
Int_t THcOnlineRun::ReadEvent()
{
    const char *here = "THcOnlineRun::ReadEvent";
    UInt_t ntries = 0;
    if (!IsOpen() || att_id == ID_NULL) {
        _logger->error("{} Did not connect to an ET system, abort Reading.", here);
        return THaRunBase::READ_FATAL;
    }

    Int_t read = THaRunBase::READ_OK;

    while(ntries++ < max_tries) {
        std::this_thread::sleep_until(clock);
        clock = get_next_time(interval);

        auto status = et_event_get(et_id, att_id, &ev, ET_ASYNC, nullptr);

        switch(status) {
        case ET_OK:
            // copy event to buffer
            if (!copyEvent()) {
                read = THaRunBase::READ_EMPTY;
            }
            // put back the event
            if (et_event_put(et_id, att_id, ev) != ET_OK) {
                _logger->error("{} failed to put back et_event to ET system, abort monitoring.", here);
                read = THaRunBase::READ_EOF;
            }
            return read;
        case ET_ERROR_EMPTY:
            break;
        default:
            _logger->error("{} {}", here, et_error_to_string(status).c_str());
            return THaRunBase::READ_FATAL;
        }
    }

    _logger->info("THcOnlineRun::ReadEvent : Tried {} times and no event obtained from ET.", max_tries);
    return THaRunBase::READ_EMPTY;
}