/*----------------------------------------------------------------------------* * Copyright (c) 1998 Southeastern Universities Research Association, * * Thomas Jefferson National Accelerator Facility * * * * This software was developed under a United States Government license * * described in the NOTICE file included as part of this distribution. * * * * Author: Carl Timmer * * timmer@jlab.org Jefferson Lab, MS-12H * * Phone: (757) 269-5130 12000 Jefferson Ave. * * Fax: (757) 269-5800 Newport News, VA 23606 * * * *----------------------------------------------------------------------------* * * Description: * ET system sample event producer * *----------------------------------------------------------------------------*/ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <signal.h> #include <sys/types.h> #include <unistd.h> #include <limits.h> #include <getopt.h> #include <time.h> #include <sys/time.h> #include <math.h> #include "et.h" /* prototype */ static void * signal_thread (void *arg); int main(int argc,char **argv) { int i, j, c, i_tmp, status, junkData, numRead, locality; int startingVal=0, errflg=0, group=1, chunk=1, size=32, writeData=0, localEndian=1; int delay=0, remote=0, multicast=0, broadcast=0, broadAndMulticast=0; int sendBufSize=0, recvBufSize=0, noDelay=0, blast=0, noAllocFlag=0; int debugLevel = ET_DEBUG_ERROR; unsigned short port=0; char et_name[ET_FILENAME_LENGTH], host[256], interface[16], localAddr[16]; void *fakeData; int mcastAddrCount = 0, mcastAddrMax = 10; char mcastAddr[mcastAddrMax][16]; et_att_id attach1; et_sys_id id; et_openconfig openconfig; et_event **pe; struct timespec timeout; #if defined __APPLE__ struct timeval t1, t2; #else struct timespec t1, t2; #endif sigset_t sigblock; pthread_t tid; /* statistics variables */ double rate = 0.0, avgRate = 0.0; int64_t count = 0, totalCount = 0, totalT = 0, time, time1, time2; /* control int array for event header if writing junk data */ int control[] = {17, 8, -1, -1, 0, 0}; /* 4 multiple character command-line options */ static struct option long_options[] = {{"host", 1, NULL, 1}, {"rb", 1, NULL, 2}, {"sb", 1, NULL, 3}, {"nd", 0, NULL, 4}, {"blast", 0, NULL, 5}, {0, 0, 0, 0} }; memset(host, 0, 256); memset(interface, 0, 16); memset(mcastAddr, 0, mcastAddrMax*16); memset(et_name, 0, ET_FILENAME_LENGTH); while ((c = getopt_long_only(argc, argv, "vbmhrn:a:s:p:d:f:c:g:i:w:", long_options, 0)) != EOF) { if (c == -1) break; switch (c) { case 'c': i_tmp = atoi(optarg); if (i_tmp > 0 && i_tmp < 1001) { chunk = i_tmp; printf("Setting chunk to %d\n", chunk); } else { printf("Invalid argument to -c. Must < 1001 & > 0.\n"); exit(-1); } break; case 's': i_tmp = atoi(optarg); if (i_tmp > -1) { size = i_tmp; } else { printf("Invalid argument to -s. Must be a positive integer.\n"); exit(-1); } break; case 'w': writeData = 1; i_tmp = atoi(optarg); if (i_tmp != 1) { localEndian = 0; } break; case 'd': delay = atoi(optarg); if (delay < 0) { printf("Delay must be >= 0\n"); exit(-1); } break; case 'p': i_tmp = atoi(optarg); if (i_tmp > 1023 && i_tmp < 65535) { port = i_tmp; } else { printf("Invalid argument to -p. Must be < 65535 & > 1023.\n"); exit(-1); } break; case 'g': i_tmp = atoi(optarg); if (i_tmp > 0 && i_tmp < 501) { group = i_tmp; } else { printf("Invalid argument to -g. Must be 501 > g > 0.\n"); exit(-1); } break; case 'f': if (strlen(optarg) >= ET_FILENAME_LENGTH) { fprintf(stderr, "ET file name is too long\n"); exit(-1); } strcpy(et_name, optarg); break; case 'i': if (strlen(optarg) > 15 || strlen(optarg) < 7) { fprintf(stderr, "interface address is bad\n"); exit(-1); } strcpy(interface, optarg); break; case 'a': if (strlen(optarg) >= 16) { fprintf(stderr, "Multicast address is too long\n"); exit(-1); } if (mcastAddrCount >= mcastAddrMax) break; strcpy(mcastAddr[mcastAddrCount++], optarg); multicast = 1; break; case 1: if (strlen(optarg) >= 255) { fprintf(stderr, "host name is too long\n"); exit(-1); } strcpy(host, optarg); break; /* case rb */ case 2: i_tmp = atoi(optarg); if (i_tmp < 1) { printf("Invalid argument to -rb. Recv buffer size must be > 0.\n"); exit(-1); } recvBufSize = i_tmp; break; /* case sb */ case 3: i_tmp = atoi(optarg); if (i_tmp < 1) { printf("Invalid argument to -sb. Send buffer size must be > 0.\n"); exit(-1); } sendBufSize = i_tmp; break; /* case nd */ case 4: noDelay = 1; break; /* case blast */ case 5: blast = 1; break; case 'v': debugLevel = ET_DEBUG_INFO; break; case 'r': remote = 1; break; case 'm': multicast = 1; break; case 'b': broadcast = 1; break; case ':': case 'h': case '?': default: errflg++; } } if (!multicast && !broadcast) { /* Default to local host if direct connection */ if (strlen(host) < 1) { strcpy(host, ET_HOST_LOCAL); } } if (optind < argc || errflg || strlen(et_name) < 1) { fprintf(stderr, "\nusage: %s %s\n%s\n%s\n%s\n%s\n%s\n%s\n\n", argv[0], "-f <ET name>", " [-h] [-v] [-r] [-m] [-b] [-nd] [-blast]", " [-host <ET host>] [-w <local endian? 0/1>]", " [-s <event size>] [-c <chunk size>] [-g <group>]", " [-d <delay>] [-p <ET port>]", " [-i <interface address>] [-a <mcast addr>]", " [-rb <buf size>] [-sb <buf size>]"); fprintf(stderr, " -f ET system's (memory-mapped file) name\n"); fprintf(stderr, " -host ET system's host if direct connection (default to local)\n"); fprintf(stderr, " -h help\n"); fprintf(stderr, " -v verbose output\n\n"); fprintf(stderr, " -s event size in bytes\n"); fprintf(stderr, " -c number of events in one get/put array\n"); fprintf(stderr, " -g group from which to get new events (1,2,...)\n"); fprintf(stderr, " -d delay in millisec between each round of getting and putting events\n\n"); fprintf(stderr, " -p ET port (TCP for direct, UDP for broad/multicast)\n"); fprintf(stderr, " -r act as remote (TCP) client even if ET system is local\n"); fprintf(stderr, " -w write data (1 sequential int per event), 1 local endian, 0 else\n"); fprintf(stderr, " -blast if remote, use external data buf (no mem allocation),\n"); fprintf(stderr, " do not write data (overrides -w)\n\n"); fprintf(stderr, " -i outgoing network interface address (dot-decimal)\n"); fprintf(stderr, " -a multicast address(es) (dot-decimal), may use multiple times\n"); fprintf(stderr, " -m multicast to find ET (use default address if -a unused)\n"); fprintf(stderr, " -b broadcast to find ET\n\n"); fprintf(stderr, " -rb TCP receive buffer size (bytes)\n"); fprintf(stderr, " -sb TCP send buffer size (bytes)\n"); fprintf(stderr, " -nd use TCP_NODELAY option\n\n"); fprintf(stderr, " This producer works by making a direct connection to the\n"); fprintf(stderr, " ET system's server port and host unless at least one multicast address\n"); fprintf(stderr, " is specified with -a, the -m option is used, or the -b option is used\n"); fprintf(stderr, " in which case multi/broadcasting used to find the ET system.\n"); fprintf(stderr, " If multi/broadcasting fails, look locally to find the ET system.\n"); fprintf(stderr, " This program gets new events from the system and puts them back.\n\n"); exit(2); } /* fake data for blasting */ fakeData = (void *) malloc(size); if (fakeData == NULL) { printf("%s: out of memory\n", argv[0]); exit(1); } /* delay is in milliseconds */ if (delay > 0) { timeout.tv_sec = delay / 1000; timeout.tv_nsec = (delay - (delay / 1000) * 1000) * 1000000; } /* allocate some memory */ pe = (et_event **) calloc(chunk, sizeof(et_event *)); if (pe == NULL) { printf("%s: out of memory\n", argv[0]); exit(1); } /*************************/ /* setup signal handling */ /*************************/ /* block all signals */ sigfillset(&sigblock); status = pthread_sigmask(SIG_BLOCK, &sigblock, NULL); if (status != 0) { printf("%s: pthread_sigmask failure\n", argv[0]); exit(1); } /* spawn signal handling thread */ pthread_create(&tid, NULL, signal_thread, (void *) NULL); /******************/ /* open ET system */ /******************/ et_open_config_init(&openconfig); if (broadcast && multicast) { broadAndMulticast = 1; } /* if multicasting to find ET */ if (multicast) { if (mcastAddrCount < 1) { /* Use default mcast address if not given on command line */ status = et_open_config_addmulticast(openconfig, ET_MULTICAST_ADDR); } else { /* add multicast addresses to use */ for (j = 0; j < mcastAddrCount; j++) { if (strlen(mcastAddr[j]) > 7) { status = et_open_config_addmulticast(openconfig, mcastAddr[j]); if (status != ET_OK) { printf("%s: bad multicast address argument\n", argv[0]); exit(1); } printf("%s: adding multicast address %s\n", argv[0], mcastAddr[j]); } } } } if (broadAndMulticast) { printf("Broad and Multicasting\n"); if (port == 0) { port = ET_UDP_PORT; } et_open_config_setport(openconfig, port); et_open_config_setcast(openconfig, ET_BROADANDMULTICAST); et_open_config_sethost(openconfig, ET_HOST_ANYWHERE); } else if (multicast) { printf("Multicasting\n"); if (port == 0) { port = ET_UDP_PORT; } et_open_config_setport(openconfig, port); et_open_config_setcast(openconfig, ET_MULTICAST); et_open_config_sethost(openconfig, ET_HOST_ANYWHERE); } else if (broadcast) { printf("Broadcasting\n"); if (port == 0) { port = ET_UDP_PORT; } et_open_config_setport(openconfig, port); et_open_config_setcast(openconfig, ET_BROADCAST); et_open_config_sethost(openconfig, ET_HOST_ANYWHERE); } else { if (port == 0) { port = ET_SERVER_PORT; } et_open_config_setserverport(openconfig, port); et_open_config_setcast(openconfig, ET_DIRECT); if (strlen(host) > 0) { et_open_config_sethost(openconfig, host); } et_open_config_gethost(openconfig, host); printf("Direct connection to %s\n", host); } /*et_open_config_sethost(openconfig, ET_HOST_REMOTE);*/ /* Defaults are to use operating system default buffer sizes and turn off TCP_NODELAY */ et_open_config_settcp(openconfig, recvBufSize, sendBufSize, noDelay); if (strlen(interface) > 6) { et_open_config_setinterface(openconfig, interface); } if (remote) { printf("Set as remote\n"); et_open_config_setmode(openconfig, ET_HOST_AS_REMOTE); } /* If responses from different ET systems, return error. */ et_open_config_setpolicy(openconfig, ET_POLICY_ERROR); /* debug level */ et_open_config_setdebugdefault(openconfig, debugLevel); et_open_config_setwait(openconfig, ET_OPEN_WAIT); if (et_open(&id, et_name, openconfig) != ET_OK) { printf("%s: et_open problems\n", argv[0]); exit(1); } et_open_config_destroy(openconfig); /*-------------------------------------------------------*/ /* Make things self-consistent by not taking time to write data if blasting. * Blasting flag takes precedence. */ if (blast) { writeData = 0; } /* Find out if we have a remote connection to the ET system * so we know if we can use external data buffer for events * for blasting - which is quite a bit faster. */ et_system_getlocality(id, &locality); if (locality == ET_REMOTE) { if (blast) { noAllocFlag = ET_NOALLOC; } printf("ET is remote\n\n"); et_system_gethost(id, host); et_system_getlocaladdress(id, localAddr); printf("Connect to ET, from ip = %s to %s\n", localAddr, host); } else { /* local blasting is just the same as local producing */ blast = 0; printf("ET is local\n\n"); } /* set level of debug output (everything) */ et_system_setdebug(id, debugLevel); /* attach to grandcentral station */ if (et_station_attach(id, ET_GRANDCENTRAL, &attach1) < 0) { printf("%s: error in et_station_attach\n", argv[0]); exit(1); } /* read time for future statistics calculations */ #if defined __APPLE__ gettimeofday(&t1, NULL); time1 = 1000L*t1.tv_sec + t1.tv_usec/1000L; /* milliseconds */ #else clock_gettime(CLOCK_REALTIME, &t1); time1 = 1000L*t1.tv_sec + t1.tv_nsec/1000000L; /* milliseconds */ #endif while (1) { status = et_events_new_group(id, attach1, pe, ET_SLEEP | noAllocFlag, NULL, size, chunk, group, &numRead); if (status == ET_OK) { ; } else if (status == ET_ERROR_DEAD) { printf("%s: ET system is dead\n", argv[0]); break; } else if (status == ET_ERROR_TIMEOUT) { printf("%s: got timeout\n", argv[0]); break; } else if (status == ET_ERROR_EMPTY) { printf("%s: no events\n", argv[0]); break; } else if (status == ET_ERROR_BUSY) { printf("%s: grandcentral is busy\n", argv[0]); break; } else if (status == ET_ERROR_WAKEUP) { printf("%s: someone told me to wake up\n", argv[0]); break; } else if ((status == ET_ERROR_WRITE) || (status == ET_ERROR_READ)) { printf("%s: socket communication error\n", argv[0]); goto error; } else { printf("%s: request error\n", argv[0]); goto error; } /* if blasting data (and remote), don't write anything, just use what's in buffer when allocated */ if (blast) { for (i=0; i < numRead; i++) { et_event_setlength(pe[i], size); et_event_setdatabuffer(id, pe[i], fakeData); } } /* write data, set control values here */ else if (writeData) { char *pdata; for (i=0; i < numRead; i++) { junkData = i + startingVal; if (!localEndian) { junkData = ET_SWAP32(junkData); et_event_setendian(pe[i], ET_ENDIAN_NOTLOCAL); } et_event_getdata(pe[i], (void **) &pdata); memcpy((void *)pdata, (const void *) &junkData, sizeof(int)); /* Send all data even though we only wrote one int. */ et_event_setlength(pe[i], size); et_event_setcontrol(pe[i], control, sizeof(control)/sizeof(int)); } startingVal += numRead; } else { for (i = 0; i < numRead; i++) { et_event_setlength(pe[i], size); } } /* put events back into the ET system */ status = et_events_put(id, attach1, pe, numRead); if (status == ET_OK) { ; } else if (status == ET_ERROR_DEAD) { printf("%s: ET is dead\n", argv[0]); break; } else if ((status == ET_ERROR_WRITE) || (status == ET_ERROR_READ)) { printf("%s: socket communication error\n", argv[0]); goto error; } else { printf("%s: put error, status = %d\n", argv[0], status); goto error; } count += numRead; if (delay > 0) { nanosleep(&timeout, NULL); } /* statistics */ #if defined __APPLE__ gettimeofday(&t2, NULL); time2 = 1000L*t2.tv_sec + t2.tv_usec/1000L; /* milliseconds */ #else clock_gettime(CLOCK_REALTIME, &t2); time2 = 1000L*t2.tv_sec + t2.tv_nsec/1000000L; /* milliseconds */ #endif time = time2 - time1; if (time > 5000) { /* reset things if necessary */ if ( (totalCount >= (LONG_MAX - count)) || (totalT >= (LONG_MAX - time)) ) { totalT = totalCount = count = 0; time1 = time2; continue; } rate = 1000.0 * ((double) count) / time; totalCount += count; totalT += time; avgRate = 1000.0 * ((double) totalCount) / totalT; /* Event rates */ printf("%s Events: %3.4g Hz, %3.4g Avg.\n", argv[0], rate, avgRate); /* Data rates */ rate = ((double) count) * size / time; avgRate = ((double) totalCount) * size / totalT; printf("%s Data: %3.4g kB/s, %3.4g Avg.\n\n", argv[0], rate, avgRate); /* If including msg overhead in data rates, need to do the following avgRate = 1000.0 * (((double) totalCount) * (size + 52) + 20)/ totalT; */ count = 0; #if defined __APPLE__ gettimeofday(&t1, NULL); time1 = 1000L*t1.tv_sec + t1.tv_usec/1000L; #else clock_gettime(CLOCK_REALTIME, &t1); time1 = 1000L*t1.tv_sec + t1.tv_nsec/1000000L; #endif } } /* while(1) */ error: printf("%s: ERROR\n", argv[0]); exit(0); } /************************************************************/ /* separate thread to handle signals */ static void * signal_thread (void *arg) { sigset_t signal_set; int sig_number; sigemptyset(&signal_set); sigaddset(&signal_set, SIGINT); /* Not necessary to clean up as ET system will do it */ sigwait(&signal_set, &sig_number); printf("Got control-C, exiting\n"); exit(1); }