From 416ccde52942ea021838213bd6c357357e6760c3 Mon Sep 17 00:00:00 2001 From: Jisk Attema Date: Wed, 12 Jun 2019 11:49:38 +0200 Subject: [PATCH] Remove included PSRDada sources now depends again on an external installation of PSRDada * set LD_LIBRARY_PATH to point to libpsrdada.so directory; LD_LIBARY_PATH can be the usual colon separated list * add include paths as -I$PATH to CFLAGS, or add paths colon separated to CFLAGS. Add co-authors --- LICENSE | 18 - README.md | 14 +- psrdada/ascii_header.c | 205 ----- psrdada/ascii_header.h | 61 -- psrdada/dada_def.h | 87 -- psrdada/dada_hdu.c | 370 --------- psrdada/dada_hdu.h | 92 --- psrdada/ipcbuf.c | 1705 ---------------------------------------- psrdada/ipcbuf.h | 264 ------- psrdada/ipcio.c | 765 ------------------ psrdada/ipcio.h | 104 --- psrdada/ipcutil.c | 69 -- psrdada/ipcutil.h | 29 - psrdada/multilog.c | 153 ---- psrdada/multilog.h | 54 -- psrdada/tmutil.c | 228 ------ psrdada/tmutil.h | 40 - setup.py | 48 +- 18 files changed, 55 insertions(+), 4251 deletions(-) delete mode 100644 psrdada/ascii_header.c delete mode 100644 psrdada/ascii_header.h delete mode 100644 psrdada/dada_def.h delete mode 100644 psrdada/dada_hdu.c delete mode 100644 psrdada/dada_hdu.h delete mode 100644 psrdada/ipcbuf.c delete mode 100644 psrdada/ipcbuf.h delete mode 100644 psrdada/ipcio.c delete mode 100644 psrdada/ipcio.h delete mode 100644 psrdada/ipcutil.c delete mode 100644 psrdada/ipcutil.h delete mode 100644 psrdada/multilog.c delete mode 100644 psrdada/multilog.h delete mode 100644 psrdada/tmutil.c delete mode 100644 psrdada/tmutil.h diff --git a/LICENSE b/LICENSE index f150deb..253af6d 100644 --- a/LICENSE +++ b/LICENSE @@ -14,21 +14,3 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - - -The following files are taken from the PSRDada project, see their (website)[http://psrdada.sourceforge.net/] for license information: -psrdada/ascii_header.c -psrdada/ascii_header.h -psrdada/dada_def.h -psrdada/dada_hdu.c -psrdada/dada_hdu.h -psrdada/ipcbuf.c -psrdada/ipcbuf.h -psrdada/ipcio.c -psrdada/ipcio.h -psrdada/ipcutil.c -psrdada/ipcutil.h -psrdada/multilog.c -psrdada/multilog.h -psrdada/tmutil.c -psrdada/tmutil.h diff --git a/README.md b/README.md index 7827988..39faaa5 100644 --- a/README.md +++ b/README.md @@ -50,15 +50,18 @@ build the package # Dependencies -PSRDada dada\_db exectuable in the PATH; see their [website](https://sourceforge.net) +PSRDada, see their [website](https://sourceforge.net): + + * PSRDada dada\_db exectuable in the PATH for testing; + + * PSRDada header files needed for compilation, set CPATH or CFLAGS. + + * PSRDada library needed during runtime, set LD\_LIBRARY\_PATH # License Copyright (c) 2018, Jisk Attema Apache Software License 2.0. -This packages contains some files from the PSRDada project; a different license could apply. -See the file *LICENSE.md* from details. - # Contributing All contributions are welcome! @@ -66,3 +69,6 @@ Please use the github issue tracker to get in touch. Contributing authors so far: * Jisk Attema +* Leon Oostrum +* Liam Connor + diff --git a/psrdada/ascii_header.c b/psrdada/ascii_header.c deleted file mode 100644 index d54fd71..0000000 --- a/psrdada/ascii_header.c +++ /dev/null @@ -1,205 +0,0 @@ -/*************************************************************************** - * - * Copyright (C) 2002-2008 by Willem van Straten - * Licensed under the Academic Free License version 2.1 - * - ***************************************************************************/ - -#include "ascii_header.h" -#include "dada_def.h" - -#include -#include -#include -#include -#include -#include -#include - -#define STRLEN 4096 - -static char* whitespace = " \t\n"; - -// search header for keyword and ensure that it is preceded by whitespace */ -char* ascii_header_find (const char* header, const char* keyword) -{ - char* key = strstr (header, keyword); - - // keyword might be the very first word in header - while (key > header) - { - // fprintf (stderr, "found=%s", key); - - // if preceded by a new line, return the found key - if ( ((*(key-1) == '\n') || (*(key-1) == '\\')) && - ((*(key+strlen(keyword)) == '\t') || (*(key+strlen(keyword)) == ' '))) - break; - - // otherwise, search again, starting one byte later - key = strstr (key+1, keyword); - } - - return key; -} - -int ascii_header_set (char* header, const char* keyword, - const char* format, ...) -{ - va_list arguments; - - char value[STRLEN]; - char* eol = 0; - char* dup = 0; - int ret = 0; - - /* find the keyword (also the insertion point) */ - char* key = ascii_header_find (header, keyword); - - if (key) { - /* if the keyword is present, find the first '#' or '\n' to follow it */ - eol = key + strcspn (key, "#\n"); - } - else { - /* if the keyword is not present, append to the end, before "DATA" */ - eol = strstr (header, "DATA\n"); - if (eol) - /* insert in front of DATA */ - key = eol; - else - /* insert at end of string */ - key = header + strlen (header); - } - - va_start (arguments, format); - ret = vsnprintf (value, STRLEN, format, arguments); - va_end (arguments); - - if (ret < 0) { - perror ("ascii_header_set: error snprintf\n"); - return -1; - } - - if (eol) - /* make a copy */ - dup = strdup (eol); - - /* %Xs dictates only a minumum string length */ - if (sprintf (key, "%-12s %-20s ", keyword, value) < 0) { - if (dup) - free (dup); - perror ("ascii_header_set: error sprintf\n"); - return -1; - } - - if (dup) { - strcat (key, dup); - free (dup); - } - else - strcat (key, "\n"); - - return 0; -} - -int ascii_header_get (const char* header, const char* keyword, - const char* format, ...) -{ - va_list arguments; - - char* value = 0; - int ret = 0; - - /* find the keyword */ - char* key = ascii_header_find (header, keyword); - if (!key) - return -1; - - /* find the value after the keyword */ - value = key + strcspn (key, whitespace); - - /* parse the value */ - va_start (arguments, format); - ret = vsscanf (value, format, arguments); - va_end (arguments); - - return ret; -} - -int ascii_header_del (char * header, const char * keyword) -{ - /* find the keyword (also the delete from point) */ - char * key = ascii_header_find (header, keyword); - - /* if the keyword is present, find the first '#' or '\n' to follow it */ - if (key) - { - char * eol = key + strcspn (key, "\n") + 1; - - // make a copy of everything after the end of the key we are deleting - char * dup = strdup (eol); - - if (dup) - { - key[0] = '\0'; - strcat (header, dup); - free (dup); - return 0; - } - else - return -1; - } - else - return -1; -} - -size_t ascii_header_get_size (char * filename) -{ - size_t hdr_size = -1; - int fd = open (filename, O_RDONLY); - if (!fd) - { - fprintf (stderr, "ascii_header_get_size: failed to open %s for reading\n", filename); - } - else - { - hdr_size = ascii_header_get_size_fd (fd); - close (fd); - } - return hdr_size; -} - -size_t ascii_header_get_size_fd (int fd) -{ - size_t hdr_size = -1; - char * header = (char *) malloc (DADA_DEFAULT_HEADER_SIZE+1); - if (!header) - { - fprintf (stderr, "ascii_header_get_size: failed to allocate %d bytes\n", DADA_DEFAULT_HEADER_SIZE+1); - } - else - { - // seek to start of file - lseek (fd, 0, SEEK_SET); - - // read the header - ssize_t ret = read (fd, header, DADA_DEFAULT_HEADER_SIZE); - if (ret != DADA_DEFAULT_HEADER_SIZE) - { - fprintf (stderr, "ascii_header_get_size: failed to read %d bytes from file\n", DADA_DEFAULT_HEADER_SIZE); - } - else - { - // check the actual HDR_SIZE in the header - if (ascii_header_get (header, "HDR_SIZE", "%ld", &hdr_size) != 1) - { - fprintf (stderr, "ascii_header_get_size: failed to read HDR_SIZE from header\n"); - hdr_size = -1; - } - } - // seek back to start of file - lseek (fd, 0, SEEK_SET); - free (header); - } - return hdr_size; -} - diff --git a/psrdada/ascii_header.h b/psrdada/ascii_header.h deleted file mode 100644 index 8e81277..0000000 --- a/psrdada/ascii_header.h +++ /dev/null @@ -1,61 +0,0 @@ - -#include - -#ifndef __ASCII_HEADER_h -#define __ASCII_HEADER_h - -/*! ascii_header_set/get - Set/get header variables - - \param header pointer to the header buffer - \param keyword the header keyword, such as NPOL - \param code printf/scanf code, such as "%d" - - \retval 0 or 1 on success, -1 on failure - - \pre The code(s) must match the type(s) of the argument(s). - - For example: - - char ascii_header[ASCII_HEADER_SIZE] = ASCII_HEADER_INIT; - - char* telescope_name = "parkes"; - ascii_header_set (ascii_header, "TELESCOPE", telescope_name); - - float bandwidth = 64.0; - ascii_header_set (ascii_header, "BW", "%f", bandwidth); - - [...] - - double centre_frequency; - ascii_header_get (ascii_header, "FREQ", "%lf", ¢re_frequency); - - int chan; - float gain; - ascii_header_get (ascii_header, "GAIN", "%d %f", &chan, &gain); - -*/ - -#ifdef __cplusplus -extern "C" { -#endif - -/* returns zero if no error occurs, -1 on error */ -int ascii_header_set (char* header, const char* keyword, - const char* code, ...); - -/* returns number of elements parsed if no error occurs, -1 on error */ -int ascii_header_get (const char* header, const char* keyword, - const char* code, ...); - -/* delete the key from the header */ -int ascii_header_del (char * header, const char * keyword); - -/* read the HDR_SIZE from a .dada file */ -size_t ascii_header_get_size (char * filename); -size_t ascii_header_get_size_fd (int fd); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/psrdada/dada_def.h b/psrdada/dada_def.h deleted file mode 100644 index 981e497..0000000 --- a/psrdada/dada_def.h +++ /dev/null @@ -1,87 +0,0 @@ -#ifndef __DADA_DEF_H -#define __DADA_DEF_H - -#include -#include - -/* ************************************************************************ - - dada default definitions - - ************************************************************************ */ - -/* base key number used to identify DATA header and data block */ -#define DADA_DEFAULT_BLOCK_KEY 0x0000dada - -/* default number of blocks in Data Block */ -#define DADA_DEFAULT_BLOCK_NUM ((uint64_t) 4) - -/* default size of blocks in Data Block */ -#define DADA_DEFAULT_BLOCK_SIZE ((uint64_t) sysconf (_SC_PAGE_SIZE) * 128) - -/* default size of block in Header Block */ -#define DADA_DEFAULT_HEADER_SIZE ((uint64_t) sysconf (_SC_PAGE_SIZE)) - -/* default port to connect to pwc command interface */ -#define DADA_DEFAULT_PWC_PORT 56026 - -/* default port to connect to pwcc command interface */ -#define DADA_DEFAULT_PWCC_PORT 56030 - -/* default port to connect to primary write client logging interface */ -#define DADA_DEFAULT_PWC_LOG 56027 - -/* default port to connect to dada_pwc_command combined logging interface */ -#define DADA_DEFAULT_PWCC_LOGPORT 56028 - -/* default port to connect to dbdisk logging interface */ -#define DADA_DEFAULT_DBDISK_LOG 56037 - -/* default port to connect to diskdb logging interface */ -#define DADA_DEFAULT_DISKDB_LOG 56039 - -/* default port to connect to dbnic logging interface */ -#define DADA_DEFAULT_DBNIC_LOG 56047 - -/* default port to connect to primary write client command interface */ -#define DADA_DEFAULT_NICDB_PORT 56056 - -/* default port to connect to primary write client logging interface */ -#define DADA_DEFAULT_NICDB_LOG 56057 - -/* default port to connect to dbull client logging interface */ -#define DADA_DEFAULT_DBNULL_LOG 56061 - -/* default port to connect to dada_dbmonitor logging interface */ -#define DADA_DEFAULT_DBMONITOR_LOG 56063 - -/* default port to connect to dbnic logging interface */ -#define DADA_DEFAULT_DBNDB_LOG 56071 - -/* default port to connect to for IB comm management */ -#define DADA_DEFAULT_IBDB_PORT 56072 - -/* default port to connect to dbib logging interface */ -#define DADA_DEFAULT_DBIB_LOG 56073 - -/* default port to connect to ibdb logging interface */ -#define DADA_DEFAULT_IBDB_LOG 56074 - -/* default file size of 640 Million bytes */ -#define DADA_DEFAULT_FILESIZE 640000000 - -#define DADA_DEFAULT_XFERSIZE DADA_DEFAULT_FILESIZE - -/* maximum length of observation id string */ -#define DADA_OBS_ID_MAXLEN 64 - -/* the format of the UTC_START string used in call to strftime */ -#define DADA_TIMESTR "%Y-%m-%d-%H:%M:%S" - -/* Length of the DADA_TIMESTR */ -#define DADA_TIMESTR_LENGTH 21 - -#define DADA_ERROR_SOFT -1 -#define DADA_ERROR_HARD -2 -#define DADA_ERROR_FATAL -3 -#endif diff --git a/psrdada/dada_hdu.c b/psrdada/dada_hdu.c deleted file mode 100644 index 8e091ec..0000000 --- a/psrdada/dada_hdu.c +++ /dev/null @@ -1,370 +0,0 @@ -#include "dada_hdu.h" -#include "dada_def.h" -#include "ascii_header.h" - -#include -#include -#include - -/*! Create a new DADA Header plus Data Unit */ -dada_hdu_t* dada_hdu_create (multilog_t* log) -{ - dada_hdu_t* hdu = malloc (sizeof(dada_hdu_t)); - assert (hdu != 0); - - hdu -> log = log; - hdu -> data_block = 0; - hdu -> header_block = 0; - - hdu -> header = 0; - hdu -> header_size = 0; - - dada_hdu_set_key( hdu, DADA_DEFAULT_BLOCK_KEY ); - return hdu; -} - -/*! Set the key of the DADA Header plus Data Unit */ -void dada_hdu_set_key (dada_hdu_t* hdu, key_t key) -{ - hdu -> data_block_key = key; - hdu -> header_block_key = key + 1; -} - -/*! Destroy a DADA primary write client main loop */ -void dada_hdu_destroy (dada_hdu_t* hdu) -{ - assert (hdu != 0); - - if (hdu->data_block) - dada_hdu_disconnect (hdu); - - free (hdu); -} - -/*! Connect the DADA Header plus Data Unit */ -int dada_hdu_connect (dada_hdu_t* hdu) -{ - ipcbuf_t ipcbuf_init = IPCBUF_INIT; - ipcio_t ipcio_init = IPCIO_INIT; - - assert (hdu != 0); - - if (hdu->data_block) { - fprintf (stderr, "dada_hdu_connect: already connected\n"); - return -1; - } - - hdu->header_block = malloc (sizeof(ipcbuf_t)); - assert (hdu->header_block != 0); - *(hdu->header_block) = ipcbuf_init; - - hdu->data_block = malloc (sizeof(ipcio_t)); - assert (hdu->data_block != 0); - *(hdu->data_block) = ipcio_init; - - /* connect to the shared memory */ - if (ipcbuf_connect (hdu->header_block, hdu->header_block_key) < 0) - { - multilog (hdu->log, LOG_ERR, "Failed to connect to Header Block\n"); - free (hdu->header_block); - hdu->header_block = 0; - free (hdu->data_block); - hdu->data_block = 0; - return -1; - } - - if (ipcio_connect (hdu->data_block, hdu->data_block_key) < 0) - { - multilog (hdu->log, LOG_ERR, "Failed to connect to Data Block\n"); - free (hdu->header_block); - hdu->header_block = 0; - free (hdu->data_block); - hdu->data_block = 0; - return -1; - } - - return 0; -} - - -/*! Disconnect the DADA Header plus Data Unit */ -int dada_hdu_disconnect (dada_hdu_t* hdu) -{ - int status = 0; - - assert (hdu != 0); - - if (!hdu->data_block) { - fprintf (stderr, "dada_hdu_disconnect: not connected\n"); - return -1; - } - - if (ipcio_disconnect (hdu->data_block) < 0) { - multilog (hdu->log, LOG_ERR, "Failed to disconnect from Data Block\n"); - status = -1; - } - - if (ipcbuf_disconnect (hdu->header_block) < 0) { - multilog (hdu->log, LOG_ERR, "Failed to disconnect from Header Block\n"); - status = -1; - } - - free (hdu->header_block); - hdu->header_block = 0; - free (hdu->data_block); - hdu->data_block = 0; - - return status; -} - -/*! Lock DADA Header plus Data Unit designated reader */ -int dada_hdu_lock_read (dada_hdu_t* hdu) -{ - assert (hdu != 0); - - if (!hdu->data_block) { - fprintf (stderr, "dada_hdu_disconnect: not connected\n"); - return -1; - } - - if (ipcbuf_lock_read (hdu->header_block) < 0) { - multilog (hdu->log, LOG_ERR, "Could not lock Header Block for reading\n"); - return -1; - } - - if (ipcio_open (hdu->data_block, 'R') < 0) { - multilog (hdu->log, LOG_ERR, "Could not lock Data Block for reading\n"); - return -1; - } - - return 0; -} - -/*! Unlock DADA Header plus Data Unit designated reader */ -int dada_hdu_unlock_read (dada_hdu_t* hdu) -{ - assert (hdu != 0); - - if (!hdu->data_block) - { - fprintf (stderr, "dada_hdu_disconnect: not connected\n"); - return -1; - } - - if (ipcio_close (hdu->data_block) < 0) - { - multilog (hdu->log, LOG_ERR, "Could not unlock Data Block read\n"); - return -1; - } - - if (hdu->header) - { - free (hdu->header); - hdu->header = 0; - if (ipcbuf_is_reader (hdu->header_block)) - ipcbuf_mark_cleared (hdu->header_block); - } - - if (ipcbuf_unlock_read (hdu->header_block) < 0) { - multilog (hdu->log, LOG_ERR,"Could not unlock Header Block read\n"); - return -1; - } - - return 0; -} - -/*! Lock DADA Header plus Data Unit designated writer */ -int dada_hdu_lock_write (dada_hdu_t* hdu) -{ - return dada_hdu_lock_write_spec (hdu, 'W'); -} - -/*! Lock DADA Header plus Data Unit designated writer with specified mode */ -int dada_hdu_lock_write_spec (dada_hdu_t* hdu, char writemode) -{ - assert (hdu != 0); - - if (!hdu->data_block) { - fprintf (stderr, "dada_hdu_disconnect: not connected\n"); - return -1; - } - - if (ipcbuf_lock_write (hdu->header_block) < 0) { - multilog (hdu->log, LOG_ERR, "Could not lock Header Block for writing\n"); - return -1; - } - - if (ipcio_open (hdu->data_block, writemode) < 0) { - multilog (hdu->log, LOG_ERR, "Could not lock Data Block for writing\n"); - return -1; - } - - return 0; -} - -/*! Unlock DADA Header plus Data Unit designated writer */ -int dada_hdu_unlock_write (dada_hdu_t* hdu) -{ - assert (hdu != 0); - - if (!hdu->data_block) { - fprintf (stderr, "dada_hdu_disconnect: not connected\n"); - return -1; - } - - if (ipcio_is_open (hdu->data_block)) - if (ipcio_close (hdu->data_block) < 0) { - multilog (hdu->log, LOG_ERR, "Could not unlock Data Block write\n"); - return -1; - } - - if (ipcbuf_unlock_write (hdu->header_block) < 0) { - multilog (hdu->log, LOG_ERR, "Could not unlock Header Block write\n"); - return -1; - } - - return 0; -} - -/*! Lock DADA Header plus Data Unit designated reader */ -int dada_hdu_open_view (dada_hdu_t* hdu) -{ - assert (hdu != 0); - - if (!hdu->data_block) - { - fprintf (stderr, "dada_hdu_open_view: not connected\n"); - return -1; - } - - if (ipcio_open (hdu->data_block, 'r') < 0) - { - multilog (hdu->log, LOG_ERR, "Could not open Data Block for viewing\n"); - return -1; - } - - return 0; -} - -/*! Unlock DADA Header plus Data Unit designated reader */ -int dada_hdu_close_view (dada_hdu_t* hdu) -{ - assert (hdu != 0); - - if (!hdu->data_block) - { - fprintf (stderr, "dada_hdu_close_view: not connected\n"); - return -1; - } - - if (ipcio_close (hdu->data_block) < 0) - { - multilog (hdu->log, LOG_ERR, "Could not close Data Block view\n"); - return -1; - } - - return 0; -} - -int dada_hdu_open (dada_hdu_t* hdu) -{ - /* pointer to the status and error logging facility */ - multilog_t* log = 0; - - /* The header from the ring buffer */ - char* header = 0; - uint64_t header_size = 0; - - /* header size, as defined by HDR_SIZE attribute */ - uint64_t hdr_size = 0; - - assert (hdu != 0); - assert (hdu->header == 0); - - log = hdu->log; - - while (!header_size) - { - /* Wait for the next valid header sub-block */ - header = ipcbuf_get_next_read (hdu->header_block, &header_size); - - if (!header) { - multilog (log, LOG_ERR, "Could not get next header\n"); - return -1; - } - - if (!header_size) - { - if (ipcbuf_is_reader (hdu->header_block)) - ipcbuf_mark_cleared (hdu->header_block); - - if (ipcbuf_eod (hdu->header_block)) - { - multilog (log, LOG_INFO, "End of data on header block\n"); - if (ipcbuf_is_reader (hdu->header_block)) - ipcbuf_reset (hdu->header_block); - } - else - { - multilog (log, LOG_ERR, "Empty header block\n"); - return -1; - } - } - } - - header_size = ipcbuf_get_bufsz (hdu->header_block); - - /* Check that header is of advertised size */ - if (ascii_header_get (header, "HDR_SIZE", "%"PRIu64, &hdr_size) != 1) { - multilog (log, LOG_ERR, "Header with no HDR_SIZE. Setting to %"PRIu64"\n", - header_size); - hdr_size = header_size; - if (ascii_header_set (header, "HDR_SIZE", "%"PRIu64, hdr_size) < 0) { - multilog (log, LOG_ERR, "Error setting HDR_SIZE\n"); - return -1; - } - } - - if (hdr_size < header_size) - header_size = hdr_size; - - else if (hdr_size > header_size) { - multilog (log, LOG_ERR, "HDR_SIZE=%"PRIu64 - " > Header Block size=%"PRIu64"\n", hdr_size, header_size); - multilog (log, LOG_DEBUG, "ASCII header dump\n%s", header); - return -1; - } - - /* Duplicate the header */ - if (header_size > hdu->header_size) - { - hdu->header = realloc (hdu->header, header_size); - assert (hdu->header != 0); - hdu->header_size = header_size; - } - - memcpy (hdu->header, header, header_size); - return 0; -} - -// return the base addresses and sizes of the datablock -char ** dada_hdu_db_addresses(dada_hdu_t * hdu, uint64_t * nbufs, uint64_t * bufsz) -{ - ipcbuf_t *db = (ipcbuf_t *) hdu->data_block; - *nbufs = ipcbuf_get_nbufs (db); - *bufsz = ipcbuf_get_bufsz (db); - - return db->buffer; -} - -// return the base addresses and sizes of the datablock -char ** dada_hdu_hb_addresses(dada_hdu_t * hdu, uint64_t * nbufs, uint64_t * bufsz) -{ - ipcbuf_t *hb = hdu->header_block; - *nbufs = ipcbuf_get_nbufs (hb); - *bufsz = ipcbuf_get_bufsz (hb); - - return hb->buffer; -} - - diff --git a/psrdada/dada_hdu.h b/psrdada/dada_hdu.h deleted file mode 100644 index f96a816..0000000 --- a/psrdada/dada_hdu.h +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef __DADA_HDU_H -#define __DADA_HDU_H - -/* ************************************************************************ - - dada_hdu_t - a struct and associated routines for creation and - management of a DADA Header plus Data Unit - - ************************************************************************ */ - -#include "multilog.h" -#include "ipcio.h" - -#ifdef __cplusplus -extern "C" { -#endif - - typedef struct dada_hdu { - - /*! The status and error logging interface */ - multilog_t* log; - - /*! The Data Block interface */ - ipcio_t* data_block; - - /*! The Header Block interface */ - ipcbuf_t* header_block; - - /* The header */ - char* header; - - /* The size of the header */ - uint64_t header_size; - - /* The Data Block key */ - key_t data_block_key; - - /* The Header Block key */ - key_t header_block_key; - - } dada_hdu_t; - - /*! Create a new DADA Header plus Data Unit */ - dada_hdu_t* dada_hdu_create (multilog_t* log); - - /*! Set the key of the DADA Header plus Data Unit */ - void dada_hdu_set_key (dada_hdu_t* hdu, key_t key); - - /*! Destroy a DADA Header plus Data Unit */ - void dada_hdu_destroy (dada_hdu_t* hdu); - - /*! Connect the DADA Header plus Data Unit */ - int dada_hdu_connect (dada_hdu_t* hdu); - - /*! Connect the DADA Header plus Data Unit */ - int dada_hdu_disconnect (dada_hdu_t* hdu); - - /*! Lock DADA Header plus Data Unit designated reader */ - int dada_hdu_lock_read (dada_hdu_t* hdu); - - /*! Unlock DADA Header plus Data Unit designated reader */ - int dada_hdu_unlock_read (dada_hdu_t* hdu); - - /*! Lock DADA Header plus Data Unit designated writer */ - int dada_hdu_lock_write (dada_hdu_t* hdu); - - /*! Unlock DADA Header plus Data Unit designated writer */ - int dada_hdu_unlock_write (dada_hdu_t* hdu); - - /*! Lock DADA Header plus Data Unit designated writer */ - int dada_hdu_lock_write_spec (dada_hdu_t* hdu, char writemode); - - /*! Open the DADA Header plus Data Unit for passive viewing */ - int dada_hdu_open_view (dada_hdu_t* hdu); - - /*! Close the DADA Header pluse Data Unit passive viewing mode */ - int dada_hdu_close_view (dada_hdu_t* hdu); - - /*! Read the next header from the struct */ - int dada_hdu_open (dada_hdu_t* hdu); - - /*! Return base addresses of data block buffers, nbufs and bufsz */ - char ** dada_hdu_db_addresses(dada_hdu_t * hdu, uint64_t * nbufs, uint64_t * bufsz); - - /*! Return base addresses of header block buffers, nbufs and bufsz */ - char ** dada_hdu_hb_addresses(dada_hdu_t * hdu, uint64_t * nbufs, uint64_t * bufsz); - -#ifdef __cplusplus - } -#endif - -#endif diff --git a/psrdada/ipcbuf.c b/psrdada/ipcbuf.c deleted file mode 100644 index 6969c78..0000000 --- a/psrdada/ipcbuf.c +++ /dev/null @@ -1,1705 +0,0 @@ - - -#include -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - -#include -#include "ipcbuf.h" -#include "ipcutil.h" - -//#define _DEBUG 1 - -/* semaphores */ - -// now defined in ipcbuf.h - -/* process states */ - -#define IPCBUF_DISCON 0 /* disconnected */ -#define IPCBUF_VIEWER 1 /* connected */ - -#define IPCBUF_WRITER 2 /* one process that writes to the buffer */ -#define IPCBUF_WRITING 3 /* start-of-data flag has been raised */ -#define IPCBUF_WCHANGE 4 /* next operation will change writing state */ - -#define IPCBUF_READER 5 /* one process that reads from the buffer */ -#define IPCBUF_READING 6 /* start-of-data flag has been raised */ -#define IPCBUF_RSTOP 7 /* end-of-data flag has been raised */ - -#define IPCBUF_VIEWING 8 /* currently viewing */ -#define IPCBUF_VSTOP 9 /* end-of-data while viewer */ - -/* *************************************************************** */ -/*! - creates the shared memory block that may be used as an ipcsync_t struct - - \param key the shared memory key - \param flag the flags to pass to shmget -*/ -int ipcsync_get (ipcbuf_t* id, key_t key, uint64_t nbufs, int flag) -{ - size_t required = sizeof(ipcsync_t) + nbufs + sizeof(key_t) * nbufs; - - if (!id) - { - fprintf (stderr, "ipcsync_get: invalid ipcbuf_t*\n"); - return -1; - } - - id->sync = ipc_alloc (key, required, flag, &(id->syncid)); - if (id->sync == 0) - { - fprintf (stderr, "ipcsync_get: ipc_alloc error\n"); - return -1; - } - - if (nbufs == 0) - nbufs = id->sync->nbufs; - - id->count = (char*) (id->sync + 1); - -#ifdef _DEBUG - fprintf (stderr, "SYNC=%p COUNT=%p\n", id->sync, id->count); -#endif - - id->shmkey = (key_t*) (id->count + nbufs); - id->state = 0; - id->viewbuf = 0; - - return 0; -} - -int ipcbuf_get (ipcbuf_t* id, int flag, int n_readers) -{ - int retval = 0; - ipcsync_t* sync = 0; - uint ibuf = 0; - unsigned iread = 0; - - if (!id) - { - fprintf (stderr, "ipcbuf_get: invalid ipcbuf_t*\n"); - return -1; - } - - sync = id->sync; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get: semkey_connect=0x%x shmkey=0x%x\n", - sync->semkey_connect, id->shmkey[0]); -#endif - - /* shared memory connection semaphores */ - id->semid_connect = semget (sync->semkey_connect, IPCBUF_CONN_NSEM, flag); - if (id->semid_connect < 0) - { - fprintf (stderr, "ipcbuf_get: semget(0x%x, %d, 0x%x): %s\n", - sync->semkey_connect, IPCBUF_CONN_NSEM, flag, strerror(errno)); - retval = -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get: semid=%d\n", id->semid_connect); -#endif - - /* shared memory data semphores */ - id->semid_data = (int *) malloc(sizeof(int) * sync->n_readers); - assert(id->semid_data != 0); - for (iread=0; iread < sync->n_readers; iread++) - { - id->semid_data[iread] = semget (sync->semkey_data[iread], IPCBUF_DATA_NSEM, flag); - if (id->semid_data[iread] < 0) - { - fprintf (stderr, "ipcbuf_get: semget(0x%x, %d, 0x%x): %s\n", - sync->semkey_data[iread], IPCBUF_DATA_NSEM, flag, strerror(errno)); - retval = -1; - } -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get: semid_data[%d]=%d\n", iread, id->semid_data[iread]); -#endif - } - - id->buffer = (char**) malloc (sizeof(char*) * sync->nbufs); - assert (id->buffer != 0); - id->shmid = (int*) malloc (sizeof(int) * sync->nbufs); - assert (id->shmid != 0); - - for (ibuf=0; ibuf < sync->nbufs; ibuf++) - { - id->buffer[ibuf] = ipc_alloc (id->shmkey[ibuf], sync->bufsz, - flag, id->shmid + ibuf); -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get: id->buffer[%u]=%p\n", ibuf, (void *) id->buffer[ibuf]); -#endif - - if ( id->buffer[ibuf] == 0 ) - { - fprintf (stderr, "ipcbuf_get: ipc_alloc buffer[%u] %s\n", - ibuf, strerror(errno)); - retval = -1; - break; - } - } - - return retval; -} - - -/* *************************************************************** */ -/*! - Creates a new ring buffer in shared memory - - \return pointer to a new ipcbuf_t ring buffer struct - \param nbufs - \param bufsz - \param key -*/ - -/* start with some random key for all of the pieces */ -static int key_increment = 0x00010000; - -int ipcbuf_create (ipcbuf_t* id, key_t key, uint64_t nbufs, uint64_t bufsz, unsigned n_readers) -{ - uint64_t ibuf = 0; - uint64_t iread = 0; - int flag = IPCUTIL_PERM | IPC_CREAT | IPC_EXCL; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_create: key=%d nbufs=%"PRIu64" bufsz=%"PRIu64" n_readers=%d\n", - key, nbufs, bufsz, n_readers); -#endif - - if (ipcsync_get (id, key, nbufs, flag) < 0) - { - fprintf (stderr, "ipcbuf_create: ipcsync_get error\n"); - return -1; - } - - id->sync->nbufs = nbufs; - id->sync->bufsz = bufsz; - id->sync->n_readers = n_readers; - - for (ibuf = 0; ibuf < IPCBUF_XFERS; ibuf++) - { - id->sync->s_buf [ibuf] = 0; - id->sync->s_byte [ibuf] = 0; - id->sync->e_buf [ibuf] = 0; - id->sync->e_byte [ibuf] = 0; - id->sync->eod [ibuf] = 1; - } - - // set semkey for access control to shared memory - key += key_increment; - id->sync->semkey_connect = key; - - // set semkey for each reader - for (iread = 0; iread < IPCBUF_READERS; iread++) - { - key += key_increment; - id->sync->semkey_data[iread] = key; - } - - for (ibuf = 0; ibuf < nbufs; ibuf++) - { - id->count[ibuf] = 0; - key += key_increment; - id->shmkey[ibuf] = key; - } - - id->sync->w_buf = 0; - id->sync->w_xfer = 0; - id->sync->w_state = IPCBUF_DISCON; - - for (iread = 0; iread < IPCBUF_READERS; iread++) - { - id->sync->r_bufs[iread] = 0; - id->sync->r_xfers[iread] = 0; - id->sync->r_states[iread] = IPCBUF_DISCON; - } - - id->buffer = 0; - id->viewbuf = 0; - id->xfer = 0; - id->soclock_buf = 0; - id->iread = -1; - - if (ipcbuf_get (id, flag, n_readers) < 0) - { - fprintf (stderr, "ipcbuf_create: ipcbuf_get error\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_create: syncid=%d semid=%d\n", - id->syncid, id->semid_connect); -#endif - - /* ready to be locked by writer and reader processes */ - if (ipc_semop (id->semid_connect, IPCBUF_WRITE, 1, 0) < 0) - { - fprintf (stderr, "ipcbuf_create: error incrementing IPCBUF_WRITE\n"); - return -1; - } - - if (ipc_semop (id->semid_connect, IPCBUF_READ, id->sync->n_readers, 0) < 0) - { - fprintf (stderr, "ipcbuf_create: error incrementing IPCBUF_READ\n"); - return -1; - } - - /* ready for writer to decrement when it needs to set SOD/EOD */ - for (iread = 0; iread < n_readers; iread++) - { - if (ipc_semop (id->semid_data[iread], IPCBUF_SODACK, IPCBUF_XFERS, 0) < 0) - { - fprintf (stderr, "ipcbuf_create: error incrementing IPCBUF_SODACK for reader %d\n", iread); - return -1; - } - if (ipc_semop (id->semid_data[iread], IPCBUF_EODACK, IPCBUF_XFERS, 0) < 0) - { - fprintf (stderr, "ipcbuf_create: error incrementing IPCBUF_EODACK for reader %d\n", iread); - return -1; - } - if (ipc_semop (id->semid_data[iread], IPCBUF_READER_CONN, 1, 0) < 0) - { - fprintf (stderr, "ipcbuf_create: error incrementing IPCBUF_EODACK for reader %d\n", iread); - return -1; - } - - } - - id->state = IPCBUF_VIEWER; - return 0; -} - -int ipcbuf_connect (ipcbuf_t* id, key_t key) -{ - int flag = IPCUTIL_PERM; - - if (ipcsync_get (id, key, 0, flag) < 0) - { - fprintf (stderr, "ipcbuf_connect: ipcsync_get error\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_connect: key=0x%x nbufs=%"PRIu64" bufsz=%"PRIu64"\n", - key, id->sync->nbufs, id->sync->bufsz); -#endif - - id->buffer = 0; - - // we are connecting, so dont create (-1) - if (ipcbuf_get (id, flag, -1) < 0) - { - fprintf (stderr, "ipcbuf_connect: ipcbuf_get error\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_connect: syncid=%d semid_connect=%d\n", - id->syncid, id->semid_connect); -#endif - - id->state = IPCBUF_VIEWER; - return 0; -} - - -int ipcbuf_disconnect (ipcbuf_t* id) -{ - uint64_t ibuf = 0; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_disconnect: iread=%d\n", id->iread); -#endif - - if (!id) - { - fprintf (stderr, "ipcbuf_disconnect: invalid ipcbuf_t\n"); - return -1; - } - - for (ibuf = 0; ibuf < id->sync->nbufs; ibuf++) - if (id->buffer[ibuf] && shmdt (id->buffer[ibuf]) < 0) - perror ("ipcbuf_disconnect: shmdt(buffer)"); - - if (id->buffer) free (id->buffer); id->buffer = 0; - if (id->shmid) free (id->shmid); id->shmid = 0; - if (id->semid_data) free (id->semid_data); id->semid_data = 0; - - if (id->sync && shmdt (id->sync) < 0) - perror ("ipcbuf_disconnect: shmdt(sync)"); - - id->sync = 0; - - id->state = IPCBUF_DISCON; - id->iread = -1; - - return 0; -} - -int ipcbuf_destroy (ipcbuf_t* id) -{ - uint64_t ibuf = 0; - int iread = 0; - - if (!id) - { - fprintf (stderr, "ipcbuf_destroy: invalid ipcbuf_t\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_destroy: semid_connect=%d\n", id->semid_connect); -#endif - - if (id->semid_connect>-1 && semctl (id->semid_connect, 0, IPC_RMID) < 0) - perror ("ipcbuf_destroy: semctl"); - id->semid_connect = -1; - - for (iread = 0; iread < id->sync->n_readers; iread++) - { - if (id->semid_data[iread]>-1 && semctl (id->semid_data[iread], 0, IPC_RMID) < 0) - perror ("ipcbuf_destroy: semctl"); - id->semid_data[iread] = -1; - } - - for (ibuf = 0; ibuf < id->sync->nbufs; ibuf++) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_destroy: id[%"PRIu64"]=%x\n", - ibuf, id->shmid[ibuf]); -#endif - - if (id->buffer) - id->buffer[ibuf] = 0; - - if (id->shmid[ibuf]>-1 && shmctl (id->shmid[ibuf], IPC_RMID, 0) < 0) - perror ("ipcbuf_destroy: buf shmctl"); - - } - - if (id->buffer) free (id->buffer); id->buffer = 0; - if (id->shmid) free (id->shmid); id->shmid = 0; - if (id->semid_data) free (id->semid_data); id->semid_data = 0; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_destroy: syncid=%d\n", id->syncid); -#endif - - if (id->syncid>-1 && shmctl (id->syncid, IPC_RMID, 0) < 0) - perror ("ipcbuf_destroy: sync shmctl"); - - id->sync = 0; - id->syncid = -1; - - return 0; -} - -/*! Lock this process in as the designated writer */ -int ipcbuf_lock_write (ipcbuf_t* id) -{ - if (id->state != IPCBUF_VIEWER) - { - fprintf (stderr, "ipcbuf_lock_write: not connected\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_write: decrement WRITE=%d addr=%x\n", - semctl (id->semid_connect, IPCBUF_WRITE, GETVAL), id); -#endif - - /* decrement the write semaphore (only one can) */ - if (ipc_semop (id->semid_connect, IPCBUF_WRITE, -1, SEM_UNDO) < 0) - { - fprintf (stderr, "ipcbuf_lock_write: error decrement IPCBUF_WRITE\n"); - return -1; - } - - /* WCHANGE is a special state that means the process will change into the - WRITING state on the first call to get_next_write */ - - if (id->sync->w_state == 0) - id->state = IPCBUF_WCHANGE; - else - id->state = IPCBUF_WRITING; - - id->xfer = id->sync->w_xfer % IPCBUF_XFERS; - - return 0; -} - -int ipcbuf_unlock_write (ipcbuf_t* id) -{ - if (!ipcbuf_is_writer (id)) - { - fprintf (stderr, "ipcbuf_unlock_write: state != WRITER\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_unlock_write: increment WRITE=%d\n", - semctl (id->semid_connect, IPCBUF_WRITE, GETVAL)); -#endif - - if (ipc_semop (id->semid_connect, IPCBUF_WRITE, 1, SEM_UNDO) < 0) - { - fprintf (stderr, "ipcbuf_unlock_write: error increment IPCBUF_WRITE\n"); - return -1; - } - - id->state = IPCBUF_VIEWER; - - return 0; -} - -char ipcbuf_is_writing (ipcbuf_t* id) -{ - return id->state == IPCBUF_WRITING; -} - - -int ipcbuf_enable_eod (ipcbuf_t* id) -{ - /* must be the designated writer */ - if (id->state != IPCBUF_WRITING) - { - fprintf (stderr, "ipcbuf_enable_eod: not writing\n"); - return -1; - } - - id->state = IPCBUF_WCHANGE; - - return 0; -} - -int ipcbuf_disable_sod (ipcbuf_t* id) -{ - /* must be the designated writer */ - if (id->state != IPCBUF_WCHANGE) - { - fprintf (stderr, "ipcbuf_disable_sod: not able to change writing state\n"); - return -1; - } - - id->state = IPCBUF_WRITER; - - return 0; -} - -uint64_t ipcbuf_get_sod_minbuf (ipcbuf_t* id) -{ - ipcsync_t* sync = id->sync; - - /* Since we may have multiple transfers, the minimum sod will be relative - * to the first buffer we clocked data onto */ - uint64_t new_bufs_written = sync->w_buf - id->soclock_buf; - - if (new_bufs_written < sync->nbufs) - return id->soclock_buf; - else - return sync->w_buf - sync->nbufs + 1; - -} - - /* start buf should be the buffer to begin on, and should be aware - * of previously filled bufs, filled within the same observation - * or from a previous observation */ - -int ipcbuf_enable_sod (ipcbuf_t* id, uint64_t start_buf, uint64_t start_byte) -{ - ipcsync_t* sync = id->sync; - uint64_t new_bufs = 0; - uint64_t bufnum = 0; - int iread = 0; - - /* must be the designated writer */ - if (id->state != IPCBUF_WRITER && id->state != IPCBUF_WCHANGE) - { - fprintf (stderr, "ipcbuf_enable_sod: not writer state=%d\n", id->state); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_enable_sod: start buf=%"PRIu64" w_buf=%"PRIu64"\n", - start_buf, sync->w_buf); -#endif - - /* start_buf must be less than or equal to the number of buffers written */ - if (start_buf > sync->w_buf) - { - fprintf (stderr, - "ipcbuf_enable_sod: start_buf=%"PRIu64" > w_buf=%"PRIu64"\n", - start_buf, sync->w_buf); - return -1; - } - - if (start_buf < ipcbuf_get_sod_minbuf (id)) - { - fprintf (stderr, - "ipcbuf_enable_sod: start_buf=%"PRIu64" < start_min=%"PRIu64"\n", - start_buf, ipcbuf_get_sod_minbuf (id)); - return -1; - } - - /* start_byte must be less than or equal to the size of the buffer */ - if (start_byte > sync->bufsz) - { - fprintf (stderr, - "ipcbuf_enable_sod: start_byte=%"PRIu64" > bufsz=%"PRIu64"\n", - start_byte, sync->bufsz); - return -1; - } - - for (iread = 0; iread < sync->n_readers; iread++) - { - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_enable_sod: decrement SODACK[%d]=%d\n", - iread, semctl (id->semid_data[iread], IPCBUF_SODACK, GETVAL)); -#endif - - /* decrement the start-of-data acknowlegement semaphore */ - if (ipc_semop (id->semid_data[iread], IPCBUF_SODACK, -1, 0) < 0) - { - fprintf (stderr, "ipcbuf_enable_sod: error decrement SODACK[%d]\n", iread); - return -1; - } - } - id->xfer = sync->w_xfer % IPCBUF_XFERS; - - sync->s_buf [id->xfer] = start_buf; - sync->s_byte [id->xfer] = start_byte; - - /* changed by AJ to fix a bug where a reader is still reading this xfer - * and the writer wants to start writing to it... */ - if (sync->w_buf == 0) - sync->eod [id->xfer] = 0; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_enable_sod: xfer=%"PRIu64 - " start buf=%"PRIu64" byte=%"PRIu64"\n", sync->w_xfer, - sync->s_buf[id->xfer], sync->s_byte[id->xfer]); -#endif - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_enable_sod: w_buf=%"PRIu64"\n", sync->w_buf); -#endif - - for (new_bufs = sync->s_buf[id->xfer]; new_bufs < sync->w_buf; new_bufs++) - { - bufnum = new_bufs % sync->nbufs; - id->count[bufnum] ++; -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_enable_sod: count[%"PRIu64"]=%u\n", - bufnum, id->count[bufnum]); -#endif - } - - new_bufs = sync->w_buf - sync->s_buf[id->xfer]; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_enable_sod: new_bufs=%"PRIu64"\n", new_bufs); -#endif - - id->state = IPCBUF_WRITING; - id->sync->w_state = IPCBUF_WRITING; - - for (iread = 0; iread < sync->n_readers; iread++) - { - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_enable_sod: increment FULL[%d]=%d by %"PRIu64"\n", - iread, - semctl (id->semid_data[iread], IPCBUF_FULL, GETVAL), new_bufs); -#endif - - /* increment the buffers written semaphore */ - if (new_bufs && ipc_semop (id->semid_data[iread], IPCBUF_FULL, new_bufs, 0) < 0) - { - fprintf (stderr, "ipcbuf_enable_sod: error increment FULL\n"); - return -1; - } - } - - return 0; -} - -char ipcbuf_is_writer (ipcbuf_t* id) -{ - int who = id->state; - return who==IPCBUF_WRITER || who==IPCBUF_WCHANGE || who==IPCBUF_WRITING; -} - -char* ipcbuf_get_next_write (ipcbuf_t* id) -{ - int iread = 0; - uint64_t bufnum = 0; - ipcsync_t* sync = id->sync; - - /* must be the designated writer */ - if (!ipcbuf_is_writer(id)) - { - fprintf (stderr, "ipcbuf_get_next_write: process is not writer\n"); - return NULL; - } - - if (id->state == IPCBUF_WCHANGE) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_write: WCHANGE->WRITING enable_sod" - " w_buf=%"PRIu64"\n", id->sync->w_buf); -#endif - - if (ipcbuf_enable_sod (id, id->sync->w_buf, 0) < 0) - { - fprintf (stderr, "ipcbuf_get_next_write: ipcbuf_enable_sod error\n"); - return NULL; - } - } - - bufnum = sync->w_buf % sync->nbufs; - - while (id->count[bufnum]) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_write: count[%"PRIu64"]=%u\n", - bufnum, id->count[bufnum]); -#endif - /* decrement the buffers read semaphore */ - for (iread = 0; iread < sync->n_readers; iread++ ) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_write: decrement CLEAR=%d\n", - semctl (id->semid_data[iread], IPCBUF_CLEAR, GETVAL)); -#endif - if (ipc_semop (id->semid_data[iread], IPCBUF_CLEAR, -1, 0) < 0) - { - fprintf (stderr, "ipcbuf_get_next_write: error decrement CLEAR\n"); - return NULL; - } - } - - id->count[bufnum] --; - } - - return id->buffer[bufnum]; -} - -/* memset the contents of the next write buffer to zero, after it has been marked cleared */ -int ipcbuf_zero_next_write (ipcbuf_t *id) -{ - ipcsync_t* sync = id->sync; - - /* must be the designated writer */ - if (!ipcbuf_is_writer(id)) - { - fprintf (stderr, "ipcbuf_get_next_write: process is not writer\n"); - return -1; - } - - // get the next buffer to be written - uint64_t next_buf = (sync->w_buf + 1) % sync->nbufs; - - char have_cleared = 0; - unsigned iread; - while (!have_cleared) - { - have_cleared = 1; - // check that each reader has 1 clear buffer at least - for (iread = 0; iread < sync->n_readers; iread++ ) - { - if (semctl (id->semid_data[iread], IPCBUF_CLEAR, GETVAL) == 0) - have_cleared = 0; - } - if (!have_cleared) - float_sleep(0.01); - } - - // zap bufnum - bzero (id->buffer[next_buf], id->sync->bufsz); - return 0; -} - -int ipcbuf_mark_filled (ipcbuf_t* id, uint64_t nbytes) -{ - ipcsync_t* sync = 0; - uint64_t bufnum = 0; - int iread = 0; - - /* must be the designated writer */ - if (!ipcbuf_is_writer(id)) - { - fprintf (stderr, "ipcbuf_mark_filled: process is not writer\n"); - return -1; - } - - /* increment the buffers written semaphore only if WRITING */ - if (id->state == IPCBUF_WRITER) - { - id->sync->w_buf ++; - return 0; - } - - sync = id->sync; - - if (id->state == IPCBUF_WCHANGE || nbytes < sync->bufsz) - { -#ifdef _DEBUG - if (id->state == IPCBUF_WCHANGE) - fprintf (stderr, "ipcbuf_mark_filled: end xfer #%"PRIu64"->%"PRIu64"\n", - sync->w_xfer, id->xfer); -#endif - - for (iread = 0; iread < sync->n_readers; iread++) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_mark_filled: decrement EODACK=%d\n", - semctl (id->semid_data[iread], IPCBUF_EODACK, GETVAL)); -#endif - - if (ipc_semop (id->semid_data[iread], IPCBUF_EODACK, -1, 0) < 0) - { - fprintf (stderr, "ipcbuf_mark_filled: error decrementing EODACK\n"); - return -1; - } - } - - sync->e_buf [id->xfer] = sync->w_buf; - sync->e_byte [id->xfer] = nbytes; - sync->eod [id->xfer] = 1; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_mark_filled:" - " end buf=%"PRIu64" byte=%"PRIu64"\n", - sync->e_buf[id->xfer], sync->e_byte[id->xfer]); -#endif - - sync->w_xfer++; - id->xfer = sync->w_xfer % IPCBUF_XFERS; - - id->state = IPCBUF_WRITER; - id->sync->w_state = 0; - } - - bufnum = sync->w_buf % sync->nbufs; - - id->count[bufnum] ++; - sync->w_buf ++; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_mark_filled: count[%"PRIu64"]=%u w_buf=%"PRIu64"\n", - bufnum, id->count[bufnum], sync->w_buf); -#endif - - for (iread = 0; iread < sync->n_readers; iread++) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_mark_filled: increment FULL=%d\n", - semctl (id->semid_data[iread], IPCBUF_FULL, GETVAL)); -#endif - if (ipc_semop (id->semid_data[iread], IPCBUF_FULL, 1, 0) < 0) - { - fprintf (stderr, "ipcbuf_mark_filled: error increment FULL\n"); - return -1; - } - } - - return 0; -} - -/*! Lock this process in as the designated reader */ -int ipcbuf_lock_read (ipcbuf_t* id) -{ - int iread = 0; - if (id->state != IPCBUF_VIEWER) - { - fprintf (stderr, "ipcbuf_lock_read: not connected\n"); - return -1; - } - - if (id->iread != -1) - { - fprintf (stderr, "ipcbuf_lock_read: iread initialized unexpectedly\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_read: decrement READ=%d\n", - semctl (id->semid_connect, IPCBUF_READ, GETVAL)); -#endif - /* decrement the read semaphore */ - if (ipc_semop (id->semid_connect, IPCBUF_READ, -1, SEM_UNDO) < 0) - { - fprintf (stderr, "ipcbuf_lock_read: error decrement READ\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_read: reader status locked\n"); -#endif - - /* determine the reader index based on reader state */ -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_read: id->iread=%d\n", id->iread); -#endif - - // order the reader's based on xfer number - uint64_t xfers_val [IPCBUF_READERS]; - unsigned xfers_pos [IPCBUF_READERS]; - unsigned ipos = 0; - for (ipos=0; ipos < id->sync->n_readers; ipos++) - { - xfers_val[ipos] = UINT64_MAX; - for (iread = 0; iread < id->sync->n_readers; iread++) - { - if (id->sync->r_bufs[iread] < xfers_val[ipos]) - { - char used = 0; - unsigned ipos2; - for (ipos2=0; ipos2sync->r_bufs[iread]; - xfers_pos[ipos] = iread; - } - } - } - } - - for (iread = 0; ((id->iread == -1) && (iread < id->sync->n_readers)); iread++) - { - unsigned oldest_iread = xfers_pos[iread]; -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_read: BEFORE decrement [%d] READER_CONN=%d\n", - oldest_iread, semctl (id->semid_data[oldest_iread], IPCBUF_READER_CONN, GETVAL)); -#endif - // try to decrement the reader connected semaphore for this reader - if (ipc_semop (id->semid_data[oldest_iread], IPCBUF_READER_CONN, -1, IPC_NOWAIT | SEM_UNDO) < 0) - { - if ( errno == EAGAIN ) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_read: skipping oldest_read=%d\n", oldest_iread); -#endif - } - else - { - fprintf (stderr, "ipcbuf_lock_read: error decrement READER_CONN\n"); - return -1; - } - } - // we did decrement the READER_CONN, assign the iread - else - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_read: assigning id->iread=%d\n", oldest_iread); -#endif - id->iread = oldest_iread; - } - } - if (id->iread == -1) - { - fprintf (stderr, "ipcbuf_lock_read: error could not find available read index\n"); - return -1; - } - - // To facilitate a reader connecting to an XFER that already has - // start of data raised. -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_read: r_states[%d]=%d\n", id->iread, id->sync->r_states[id->iread]); -#endif - if (id->sync->r_states[id->iread] == IPCBUF_DISCON) - id->state = IPCBUF_READER; - else - id->state = IPCBUF_READING; - - id->xfer = id->sync->r_xfers[id->iread] % IPCBUF_XFERS; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_lock_read: xfer=%"PRIu64 - " start buf=%"PRIu64" byte=%"PRIu64"\n", id->sync->r_xfers[id->iread], - id->sync->s_buf[id->xfer], id->sync->s_byte[id->xfer]); -#endif - - return 0; -} - -char ipcbuf_is_reader (ipcbuf_t* id) -{ - int who = id->state; - return who==IPCBUF_READER || who==IPCBUF_READING || who==IPCBUF_RSTOP; -} - -int ipcbuf_unlock_read (ipcbuf_t* id) -{ - if (!ipcbuf_is_reader(id)) - { - fprintf (stderr, "ipcbuf_unlock_read: state != READER\n"); - return -1; - } - - if ((id->iread < 0) || (id->iread >= id->sync->n_readers)) - { - fprintf (stderr, "ipcbuf_lock_read: iread not initialized\n"); - return -1; - } -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_unlock_read[%d]: increment READER_CONN=%d\n", - id->iread, semctl (id->semid_data[id->iread], IPCBUF_READER_CONN, GETVAL)); -#endif - if (ipc_semop (id->semid_data[id->iread], IPCBUF_READER_CONN, 1, SEM_UNDO) < 0) - { - fprintf (stderr, "ipcbuf_disconnect: error increment READER_CONN\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_unlock_read[%d]: increment READ=%d\n", - id->iread, - semctl (id->semid_connect, IPCBUF_READ, GETVAL)); -#endif - if (ipc_semop (id->semid_connect, IPCBUF_READ, 1, SEM_UNDO) < 0) - { - fprintf (stderr, "ipcbuf_unlock_read: error increment READ\n"); - return -1; - } - - id->state = IPCBUF_VIEWER; - int iread = id->iread; - id->iread = -1; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_unlock_read[%d]: id->sync->r_states[%d]=%d\n", - iread, iread, id->sync->r_states[iread]); -#endif - - return 0; -} - -char* ipcbuf_get_next_read_work (ipcbuf_t* id, uint64_t* bytes, int flag) -{ - int iread = -1; - uint64_t bufnum; - uint64_t start_byte = 0; - ipcsync_t* sync = 0; - - if (ipcbuf_eod (id)) - return NULL; - - sync = id->sync; - - if (ipcbuf_is_reader (id)) - { - iread = id->iread; -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: decrement[%d] FULL=%d\n", iread, - semctl (id->semid_data[iread], IPCBUF_FULL, GETVAL)); -#endif - - /* decrement the buffers written semaphore */ - if (ipc_semop (id->semid_data[iread], IPCBUF_FULL, -1, flag) < 0) { - fprintf (stderr, "ipcbuf_get_next_read: error decrement FULL\n"); - return NULL; - } -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: decrement[%d] FULL worked!\n", iread); -#endif - - if (id->state == IPCBUF_READER) - { - id->xfer = sync->r_xfers[iread] % IPCBUF_XFERS; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: xfer=%"PRIu64 - " start buf=%"PRIu64" byte=%"PRIu64"\n", sync->r_xfers[iread], - sync->s_buf[id->xfer], - sync->s_byte[id->xfer]); -#endif - - id->state = IPCBUF_READING; - id->sync->r_states[iread] = IPCBUF_READING; - - sync->r_bufs[iread] = sync->s_buf[id->xfer]; - start_byte = sync->s_byte[id->xfer]; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: increment SODACK=%d\n", - semctl (id->semid_data[iread], IPCBUF_SODACK, GETVAL)); -#endif - - /* increment the start-of-data acknowlegement semaphore */ - if (ipc_semop (id->semid_data[iread], IPCBUF_SODACK, 1, flag) < 0) { - fprintf (stderr, "ipcbuf_get_next_read: error increment SODACK\n"); - return NULL; - } - - } - - bufnum = sync->r_bufs[iread]; - - } - else - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: not reader\n"); -#endif - // TODO - check if we should always just use id->iread = 0 for this?? - iread = 0; - - if (id->state == IPCBUF_VIEWER) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: start viewing xfer=%"PRIu64 - " start buf=%"PRIu64" byte=%"PRIu64"\n", sync->r_xfers[iread], - sync->s_buf[id->xfer], sync->s_byte[id->xfer]); -#endif - - id->xfer = sync->r_xfers[iread] % IPCBUF_XFERS; - id->state = IPCBUF_VIEWING; - - id->viewbuf = sync->s_buf[id->xfer]; - start_byte = sync->s_byte[id->xfer]; - - /* - In the following, the viewer seeks to the end of data. - This is probably the best default behaviour for most monitors, - which should present the most current data. - - It is also necessary for proper behaviour of the dada_hdu class. - The header block is never closed; the xfer is always 0 and each - new transfer is simply a step forward in the ring buffer. - Therefore, if a viewer always takes the first buffer of the - current transfer (as above), then it will be reading out-of-date - header information. The following lines correct this problem. - - Willem van Straten - 22 Oct 2008 - */ - if (sync->w_buf > id->viewbuf + 1) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: viewer seek to end of data\n"); -#endif - id->viewbuf = sync->w_buf - 1; - start_byte = 0; - } - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: sync->w_buf=%"PRIu64" id->viewbuf=%"PRIu64"\n", - sync->w_buf, id->viewbuf); -#endif - - /* Viewers wait until w_buf is incremented without semaphore operations */ - while (sync->w_buf <= id->viewbuf) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: sync->eod[%d]=%d sync->r_bufs[%d]=%"PRIu64" sync->e_buf[%d]=%"PRIu64"\n", - id->xfer, sync->eod[id->xfer], iread, sync->r_bufs[iread], id->xfer, sync->e_buf[id->xfer]); -#endif - - // AJ added: sync->r_bufs[iread] to ensure that a buffer has been read by a reader - if (sync->eod[id->xfer] && sync->r_bufs[iread] && sync->r_bufs[iread] == sync->e_buf[id->xfer]) - { - id->state = IPCBUF_VSTOP; - break; - } - - float_sleep (0.1); - } - - if (id->viewbuf + sync->nbufs < sync->w_buf) - id->viewbuf = sync->w_buf - sync->nbufs + 1; - - bufnum = id->viewbuf; - id->viewbuf ++; -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read: bufnum=%"PRIu64"\n", bufnum); -#endif - } - - bufnum %= sync->nbufs; - - if (bytes) - { - if (sync->eod[id->xfer] && sync->r_bufs[iread] == sync->e_buf[id->xfer]) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_get_next_read xfer=%d EOD=true and r_buf=" - "%"PRIu64" == e_buf=%"PRIu64"\n", (int)id->xfer, - sync->r_bufs[iread], sync->e_buf[id->xfer]); -#endif - *bytes = sync->e_byte[id->xfer] - start_byte; - } - else - *bytes = sync->bufsz - start_byte; - } - - return id->buffer[bufnum] + start_byte; -} - - -char* ipcbuf_get_next_read (ipcbuf_t* id, uint64_t* bytes) -{ - return ipcbuf_get_next_read_work (id, bytes, 0); -} - -char* ipcbuf_get_next_readable (ipcbuf_t* id, uint64_t* bytes) -{ - return ipcbuf_get_next_read_work (id, bytes, SEM_UNDO); -} - -uint64_t ipcbuf_tell (ipcbuf_t* id, uint64_t bufnum) -{ - ipcsync_t* sync = id->sync; - -#ifdef _DEBUG - fprintf (stderr, - "ipcbuf_tell: bufnum=%"PRIu64" xfer=%"PRIu64", s_buf=%"PRIu64" s_byte=%"PRIu64"\n", - bufnum, id->xfer, sync->s_buf[id->xfer], sync->s_byte[id->xfer]); -#endif - - if (bufnum <= sync->s_buf[id->xfer]) - return 0; - - bufnum -= sync->s_buf[id->xfer]; - - return bufnum*sync->bufsz - sync->s_byte[id->xfer]; -} - -int64_t ipcbuf_tell_write (ipcbuf_t* id) -{ - if (ipcbuf_eod (id)) - return -1; - - if (!ipcbuf_is_writer (id)) - return -1; - - return ipcbuf_tell (id, id->sync->w_buf); -} - -int64_t ipcbuf_tell_read (ipcbuf_t* id) -{ - if (ipcbuf_eod (id)) - return -1; - - if (id->state == IPCBUF_READING) - return ipcbuf_tell (id, id->sync->r_bufs[id->iread]); - else if (id->state == IPCBUF_VIEWING) - return ipcbuf_tell (id, id->viewbuf); - else - return 0; -} - -int ipcbuf_mark_cleared (ipcbuf_t* id) -{ - ipcsync_t* sync = 0; - int iread = id->iread; - - if (!id) - { - fprintf (stderr, "ipcbuf_mark_cleared: no ipcbuf!\n"); - return -1; - } - - if (id->state != IPCBUF_READING) - { - fprintf (stderr, "ipcbuf_mark_cleared: not reading\n"); - return -1; - } - - sync = id->sync; - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_mark_cleared: increment CLEAR=%d\n", - semctl (id->semid_data[iread], IPCBUF_CLEAR, GETVAL)); -#endif - - /* increment the buffers cleared semaphore */ - if (ipc_semop (id->semid_data[iread], IPCBUF_CLEAR, 1, 0) < 0) - return -1; - - if (sync->eod[id->xfer] && sync->r_bufs[iread] == sync->e_buf[id->xfer]) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_mark_cleared: increment EODACK=%d; CLEAR=%d\n", - semctl (id->semid_data[iread], IPCBUF_EODACK, GETVAL), - semctl (id->semid_data[iread], IPCBUF_CLEAR, GETVAL)); -#endif - - id->state = IPCBUF_RSTOP; - id->sync->r_states[iread] = IPCBUF_DISCON; - - sync->r_xfers[iread] ++; - id->xfer = sync->r_xfers[iread] % IPCBUF_XFERS; - - if (ipc_semop (id->semid_data[iread], IPCBUF_EODACK, 1, 0) < 0) { - fprintf (stderr, "ipcbuf_mark_cleared: error incrementing EODACK\n"); - return -1; - } - } - else - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_mark_cleared: increment r_buf=%"PRIu64"\n", - sync->r_bufs[iread]); -#endif - sync->r_bufs[iread] ++; - } - - return 0; -} - -int ipcbuf_reset (ipcbuf_t* id) -{ - uint64_t ibuf = 0; - int iread = 0; - uint64_t nbufs = ipcbuf_get_nbufs (id); - ipcsync_t* sync = id->sync; - unsigned ix = 0; - - /* if the reader has reached end of data, reset the state */ - if (id->state == IPCBUF_RSTOP) - { - id->state = IPCBUF_READER; - return 0; - } - - /* otherwise, must be the designated writer */ - if (!ipcbuf_is_writer(id)) { - fprintf (stderr, "ipcbuf_reset: invalid state=%d\n", id->state); - return -1; - } - - if (sync->w_buf == 0) - return 0; - - for (ibuf = 0; ibuf < nbufs; ibuf++) - { - while (id->count[ibuf]) - { - for (iread = 0; iread < sync->n_readers; iread++) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_reset: decrement CLEAR=%d\n", - semctl (id->semid_data[iread], IPCBUF_CLEAR, GETVAL)); -#endif - - /* decrement the buffers cleared semaphore */ - if (ipc_semop (id->semid_data[iread], IPCBUF_CLEAR, -1, 0) < 0) - { - fprintf (stderr, "ipcbuf_reset: error decrementing CLEAR\n"); - return -1; - } - } - - id->count[ibuf] --; - } - } - - for (iread = 0; iread < sync->n_readers; iread++) - { -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_reset: decrement SODACK=%d\n", - semctl (id->semid_data[iread], IPCBUF_SODACK, GETVAL)); -#endif - - if (ipc_semop (id->semid_data[iread], IPCBUF_SODACK, -IPCBUF_XFERS, 0) < 0) - { - fprintf (stderr, "ipcbuf_reset: error decrementing SODACK\n"); - return -1; - } - -#ifdef _DEBUG - fprintf (stderr, "ipcbuf_reset: decrement EODACK=%d\n", - semctl (id->semid_data[iread], IPCBUF_EODACK, GETVAL)); -#endif - - if (ipc_semop (id->semid_data[iread], IPCBUF_EODACK, -IPCBUF_XFERS, 0) < 0) - { - fprintf (stderr, "ipcbuf_reset: error decrementing EODACK\n"); - return -1; - } - - if (ipc_semop (id->semid_data[iread], IPCBUF_SODACK, IPCBUF_XFERS, 0) < 0) - { - fprintf (stderr, "ipcbuf_reset: error resetting SODACK\n"); - return -1; - } - - if (ipc_semop (id->semid_data[iread], IPCBUF_EODACK, IPCBUF_XFERS, 0) < 0) - { - fprintf (stderr, "ipcbuf_reset: error resetting EODACK\n"); - return -1; - } - - sync->r_bufs[iread] = 0; - sync->r_xfers[iread] = 0; - } - sync->w_buf = 0; - sync->w_xfer = 0; - - for (ix=0; ix < IPCBUF_XFERS; ix++) - sync->eod[ix] = 1; - - return 0; -} - -/* reset the buffer count and end of data flags, without prejudice */ -int ipcbuf_hard_reset (ipcbuf_t* id) -{ - ipcsync_t* sync = id->sync; - unsigned ix = 0; - int val = 0; - int iread = 0; - - sync->w_buf = 0; - sync->w_xfer = 0; - - for (ix=0; ix < IPCBUF_XFERS; ix++) - sync->eod[ix] = 1; - - for (iread = 0; iread < sync->n_readers; iread++) - { - sync->r_bufs[iread] = 0; - sync->r_xfers[iread] = 0; - - if (semctl (id->semid_data[iread], IPCBUF_FULL, SETVAL, val) < 0) - { - perror ("ipcbuf_hard_reset: semctl (IPCBUF_FULL, SETVAL)"); - return -1; - } - - if (semctl (id->semid_data[iread], IPCBUF_CLEAR, SETVAL, val) < 0) - { - perror ("ipcbuf_hard_reset: semctl (IPCBUF_FULL, SETVAL)"); - return -1; - } - } - return 0; -} - -int ipcbuf_lock (ipcbuf_t* id) -{ -#ifdef SHM_LOCK - - uint64_t ibuf = 0; - - if (id->syncid < 0 || id->shmid == 0) - return -1; - - if (shmctl (id->syncid, SHM_LOCK, 0) < 0) - { - perror ("ipcbuf_lock: shmctl (syncid, SHM_LOCK)"); - return -1; - } - - for (ibuf = 0; ibuf < id->sync->nbufs; ibuf++) - if (shmctl (id->shmid[ibuf], SHM_LOCK, 0) < 0) - { - perror ("ipcbuf_lock: shmctl (shmid, SHM_LOCK)"); - return -1; - } - - return 0; - -#else - - fprintf(stderr, "ipcbuf_lock does nothing on this platform!\n"); - return -1; - -#endif - -} - -int ipcbuf_unlock (ipcbuf_t* id) -{ -#ifdef SHM_UNLOCK - - uint64_t ibuf = 0; - - if (id->syncid < 0 || id->shmid == 0) - return -1; - - if (shmctl (id->syncid, SHM_UNLOCK, 0) < 0) - { - perror ("ipcbuf_lock: shmctl (syncid, SHM_UNLOCK)"); - return -1; - } - - for (ibuf = 0; ibuf < id->sync->nbufs; ibuf++) - if (shmctl (id->shmid[ibuf], SHM_UNLOCK, 0) < 0) - { - perror ("ipcbuf_lock: shmctl (shmid, SHM_UNLOCK)"); - return -1; - } - - return 0; - -#else - - fprintf(stderr, "ipcbuf_unlock does nothing on this platform!\n"); - return -1; - -#endif - -} - -// Initialize each buffer -int ipcbuf_page (ipcbuf_t* id) -{ - uint64_t ibuf = 0; - if (id->syncid < 0 || id->shmid == 0) - return -1; - - for (ibuf = 0; ibuf < id->sync->nbufs; ibuf++) - { - bzero (id->buffer[ibuf], id->sync->bufsz); - } - - return 0; -} - - - -int ipcbuf_eod (ipcbuf_t* id) -{ - if (!id) - { - fprintf (stderr, "ipcbuf_eod: invalid ipcbuf_t*\n"); - return -1; - } - - return ( (id->state == IPCBUF_RSTOP) || (id->state == IPCBUF_VSTOP) ); -} - - -int ipcbuf_sod (ipcbuf_t* id) -{ - return id->state == IPCBUF_READING || id->state == IPCBUF_WRITING; -} - -// return the total bytes written for the current XFER in id -uint64_t ipcbuf_get_write_byte_xfer (ipcbuf_t* id) -{ - if (id->sync->eod[id->xfer]) - return id->sync->e_byte[id->xfer]; - else - return ipcbuf_tell (id, id->sync->w_buf); -} - -uint64_t ipcbuf_get_write_count_xfer (ipcbuf_t* id) -{ - if (id->sync->w_xfer == id->xfer) - return id->sync->w_buf; - else - return id->sync->e_byte[id->xfer]; -} - - -uint64_t ipcbuf_get_write_count (ipcbuf_t* id) -{ - return id->sync->w_buf; -} - -uint64_t ipcbuf_get_write_index (ipcbuf_t* id) -{ - return id->sync->w_buf % id->sync->nbufs; -} - -uint64_t ipcbuf_get_read_count (ipcbuf_t* id) -{ - if (id->iread == -1) - return ipcbuf_get_read_count_iread(id, 0); - else - return ipcbuf_get_read_count_iread(id, id->iread); -} - -uint64_t ipcbuf_get_read_count_iread (ipcbuf_t* id, unsigned iread) -{ - return id->sync->r_bufs[iread]; -} - - -uint64_t ipcbuf_get_read_index (ipcbuf_t* id) -{ - return ipcbuf_get_read_count (id) % id->sync->nbufs; -} - -uint64_t ipcbuf_get_nbufs (ipcbuf_t* id) -{ - return id->sync->nbufs; -} - -uint64_t ipcbuf_get_bufsz (ipcbuf_t* id) -{ - return id->sync->bufsz; -} - -uint64_t ipcbuf_get_nfull (ipcbuf_t* id) -{ - return ipcbuf_get_nfull_iread (id, -1); -} - -uint64_t ipcbuf_get_nfull_iread (ipcbuf_t* id, int iread) -{ - if (iread >= 0) - return semctl (id->semid_data[iread], IPCBUF_FULL, GETVAL); - else if (id->iread == -1) - return semctl (id->semid_data[0], IPCBUF_FULL, GETVAL); - else - { - unsigned i=0; - uint64_t max_nfull = 0; - uint64_t nfull = 0; - for (i = 0; i < id->sync->n_readers; i++) - { - nfull = semctl (id->semid_data[i], IPCBUF_FULL, GETVAL); - if (nfull > max_nfull) - max_nfull = nfull; - } - return nfull; - } -} - -uint64_t ipcbuf_get_nclear (ipcbuf_t* id) -{ - return ipcbuf_get_nclear_iread (id, -1); -} - -uint64_t ipcbuf_get_nclear_iread (ipcbuf_t* id, int iread) -{ - if (iread >= 0) - return semctl (id->semid_data[iread], IPCBUF_CLEAR, GETVAL); - else if (id->iread == -1) - return semctl (id->semid_data[0], IPCBUF_CLEAR, GETVAL); - else - { - unsigned i=0; - uint64_t max_nclear = 0; - uint64_t nclear = 0; - for (i = 0; i < id->sync->n_readers; i++) - { - nclear = semctl (id->semid_data[i], IPCBUF_CLEAR, GETVAL); - if (nclear > max_nclear) - max_nclear = nclear; - } - return nclear; - } -} - -uint64_t ipcbuf_get_sodack (ipcbuf_t* id) -{ - return ipcbuf_get_sodack_iread (id, -1); -} -uint64_t ipcbuf_get_sodack_iread (ipcbuf_t* id, int iread) -{ - if (iread >= 0) - return semctl (id->semid_data[iread], IPCBUF_SODACK, GETVAL); - else if (id->iread == -1) - return semctl (id->semid_data[0], IPCBUF_SODACK, GETVAL); - else - { - unsigned i=0; - uint64_t max_sodack = 0; - uint64_t sodack = 0; - for (i = 0; i < id->sync->n_readers; i++) - { - sodack = semctl (id->semid_data[i], IPCBUF_SODACK, GETVAL); - if (sodack > max_sodack) - max_sodack = sodack; - } - return sodack; - } -} - -uint64_t ipcbuf_get_eodack (ipcbuf_t* id) -{ - return ipcbuf_get_eodack_iread (id, -1); -} - -uint64_t ipcbuf_get_eodack_iread (ipcbuf_t* id, int iread) -{ - if (iread >= 0) - return semctl (id->semid_data[iread], IPCBUF_EODACK, GETVAL); - else if (id->iread == -1) - return semctl (id->semid_data[0], IPCBUF_EODACK, GETVAL); - else - { - unsigned i=0; - uint64_t max_eodack = 0; - uint64_t eodack = 0; - for (i = 0; i < id->sync->n_readers; i++) - { - eodack = semctl (id->semid_data[i], IPCBUF_EODACK, GETVAL); - if (eodack > max_eodack) - max_eodack = eodack; - } - return eodack; - } -} - -int ipcbuf_get_nreaders (ipcbuf_t* id) -{ - return id->sync->n_readers; -} - -int ipcbuf_get_reader_conn (ipcbuf_t* id) -{ - return ipcbuf_get_reader_conn_iread (id, -1); -} - -int ipcbuf_get_reader_conn_iread (ipcbuf_t* id, int iread) -{ - if (iread >= 0) - return semctl (id->semid_data[iread], IPCBUF_READER_CONN, GETVAL); - else if (id->iread == -1) - return semctl (id->semid_data[0], IPCBUF_READER_CONN, GETVAL); - else - { - unsigned i=0; - uint64_t min_reader_conn = 1; - uint64_t reader_conn = 0; - for (i = 0; i < id->sync->n_readers; i++) - { - reader_conn = semctl (id->semid_data[i], IPCBUF_READER_CONN, GETVAL); - if (reader_conn < min_reader_conn) - min_reader_conn = reader_conn; - } - return reader_conn; - } -} - -int ipcbuf_get_read_semaphore_count (ipcbuf_t* id) -{ - return semctl (id->semid_connect, IPCBUF_READ, GETVAL); -} - -/* Sets the buffer at which clocking began. */ -uint64_t ipcbuf_set_soclock_buf (ipcbuf_t* id) -{ - if (id->sync->w_xfer > 0) - id->soclock_buf = id->sync->e_buf[(id->sync->w_xfer-1) % IPCBUF_XFERS] + 1; - else - id->soclock_buf = 0; - - return id->soclock_buf; -} - diff --git a/psrdada/ipcbuf.h b/psrdada/ipcbuf.h deleted file mode 100644 index 24895d9..0000000 --- a/psrdada/ipcbuf.h +++ /dev/null @@ -1,264 +0,0 @@ -#ifndef __DADA_IPCBUF_H -#define __DADA_IPCBUF_H - -/* ************************************************************************ - - ipcbuf_t - a struct and associated routines for creation and management - of a ring buffer in shared memory - - ************************************************************************ */ - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -#define IPCBUF_WRITE 0 /* semaphore locks writer status */ -#define IPCBUF_READ 1 /* semaphore locks reader (+clear) status */ -#define IPCBUF_CONN_NSEM 2 /* total number of connection semaphores */ - -#define IPCBUF_SODACK 0 /* acknowledgement of start of data */ -#define IPCBUF_EODACK 1 /* acknowledgement of end of data */ -#define IPCBUF_FULL 2 /* semaphore counts full buffers */ -#define IPCBUF_CLEAR 3 /* semaphore counts emptied buffers */ -#define IPCBUF_READER_CONN 4 /* semaphore counts emptied buffers */ -#define IPCBUF_DATA_NSEM 5 /* total number of data semaphores */ - -#define IPCBUF_XFERS 8 /* total transfers in buffer */ -#define IPCBUF_READERS 8 /* maximum number of readers */ - - typedef struct { - - key_t semkey_connect; /* semaphore key for connecting to shared memory */ - key_t semkey_data [IPCBUF_READERS]; /* semaphore keys for reading/writing shared memory */ - - uint64_t nbufs; /* the number of buffers in the ring */ - uint64_t bufsz; /* the size of the buffers in the ring */ - - uint64_t w_buf; /* count of next buffer to write */ - int w_state; /* the state of the writer */ - uint64_t w_xfer; /* the current write transfer number */ - - uint64_t r_bufs [IPCBUF_READERS]; /* count of next buffer to read */ - int r_states [IPCBUF_READERS]; /* the state of the reader */ - uint64_t r_xfers [IPCBUF_READERS]; /* the current read transfer number */ - unsigned n_readers; /* number of readers */ - - /* the first valid buffer when sod is raised */ - uint64_t s_buf [IPCBUF_XFERS]; - - /* the first valid byte when sod is raised */ - uint64_t s_byte [IPCBUF_XFERS]; - - /* end of data flag */ - char eod [IPCBUF_XFERS]; - - /* the last valid buffer when sod is raised */ - uint64_t e_buf [IPCBUF_XFERS]; - - /* the last valid byte when sod is raised */ - uint64_t e_byte [IPCBUF_XFERS]; - - } ipcsync_t; - - typedef struct { - - int state; /* the state of the process: writer, reader, etc. */ - - int syncid; /* sync struct shared memory id */ - int semid_connect; /* semaphore id for shmem connect */ - int * semid_data; /* semaphore id for shmem data */ - int * shmid; /* ring buffer shared memory id */ - - ipcsync_t* sync; /* pointer to sync structure in shared memory */ - char** buffer; /* base addresses of sub-blocks in shared memory */ - char* count; /* the pending xfer count in each buffer in the ring */ - key_t* shmkey; /* shared memory keys */ - - uint64_t viewbuf; /* count of next buffer to look at (non-reader) */ - - uint64_t xfer; /* current xfer */ - - uint64_t soclock_buf; /* buffer to which the SOD is relevant */ - - int iread; /* reader count, -1 means writer */ - - } ipcbuf_t; - -#define IPCBUF_INIT {0, -1, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1} - - /* //////////////////////////////////////////////////////////////////// - - FUNCTIONS USED TO CREATE/CONNECT/DESTROY SHARED MEMORY - - //////////////////////////////////////////////////////////////////// */ - - /*! Initialize an ipcbuf_t struct, creating shm and sem */ - int ipcbuf_create (ipcbuf_t*, key_t key, uint64_t nbufs, uint64_t bufsz, unsigned num_readers); - - /*! Connect to an already created ipcsync_t struct in shm */ - int ipcbuf_connect (ipcbuf_t*, key_t key); - - /*! Disconnect from a previously connected ipcsync_t struct in shm */ - int ipcbuf_disconnect (ipcbuf_t*); - - /*! Destroy the ring buffer space, semaphores, and shared memory ipcbuf_t */ - int ipcbuf_destroy (ipcbuf_t*); - - /* //////////////////////////////////////////////////////////////////// - - FUNCTIONS USED BY THE PROCESS WRITING TO SHARED MEMORY - - //////////////////////////////////////////////////////////////////// */ - - /*! Lock this process in as the data writer */ - int ipcbuf_lock_write (ipcbuf_t*); - - /*! Unlock this process in as the data writer */ - int ipcbuf_unlock_write (ipcbuf_t*); - - /*! Disable the start-of-data flag */ - int ipcbuf_disable_sod (ipcbuf_t*); - - /*! Enable the start-of-data flag */ - int ipcbuf_enable_sod (ipcbuf_t*, uint64_t st_buf, uint64_t st_byte); - - /*! Get the minimum possible buffer number that be start of data */ - uint64_t ipcbuf_get_sod_minbuf (ipcbuf_t* id); - - /*! Enable the end-of-data flag */ - int ipcbuf_enable_eod (ipcbuf_t*); - - /*! Get the next empty buffer available for writing. The - calling process must have locked "data writer" status with a call - to ipcbuf_lock_write. */ - char* ipcbuf_get_next_write (ipcbuf_t*); - - int ipcbuf_zero_next_write (ipcbuf_t *id); - - /*! Return the write buffer byte offset from the start of the transfer */ - int64_t ipcbuf_tell_write (ipcbuf_t* id); - - /*! Declare that the last buffer to be returned by - ipcbuf_get_next_write has been filled with nbytes bytes. The - calling process must have locked "data writer" status with a call - to ipcbuf_lock_write. If nbytes is less than bufsz, then end of - data is implicitly set. */ - int ipcbuf_mark_filled (ipcbuf_t*, uint64_t nbytes); - - /*! Return true if process is in writing state */ - char ipcbuf_is_writing (ipcbuf_t*); - - /*! Return true if process is the data writer */ - char ipcbuf_is_writer (ipcbuf_t*); - - /* //////////////////////////////////////////////////////////////////// - - FUNCTIONS USED BY THE PROCESS READING FROM SHARED MEMORY - - //////////////////////////////////////////////////////////////////// */ - - /*! Lock this process in as the data reader */ - int ipcbuf_lock_read (ipcbuf_t*); - - /*! Unlock this process in as the data reader */ - int ipcbuf_unlock_read (ipcbuf_t*); - - /*! Get the next full buffer, and the number of bytes in it */ - char* ipcbuf_get_next_read (ipcbuf_t*, uint64_t* bytes); - - /*! Return the read buffer byte offset from the start of the transfer */ - int64_t ipcbuf_tell_read (ipcbuf_t* id); - - /*! Gets the next full buffer, but does not modify any aspect of the ring buffer */ - char *ipcbuf_get_next_readable (ipcbuf_t* id, uint64_t* bytes); - - /*! Declare that the last buffer to be returned by - ipcbuf_get_next_read has been cleared and can be recycled. The - process must have locked "data reader" status with a call to - ipcbuf_lock_read */ - int ipcbuf_mark_cleared (ipcbuf_t*); - - /*! Return true if process is the data reader */ - char ipcbuf_is_reader (ipcbuf_t* id); - - /*! Return the state of the start-of-data flag */ - int ipcbuf_sod (ipcbuf_t*); - - /*! Test if the current buffer is the last buffer containing data */ - int ipcbuf_eod (ipcbuf_t*); - - /*! Return the number of bufferswritten to the ring buffer */ - uint64_t ipcbuf_get_write_count (ipcbuf_t*); - uint64_t ipcbuf_get_write_index (ipcbuf_t* id); - - uint64_t ipcbuf_get_write_byte_xfer (ipcbuf_t* id); - uint64_t ipcbuf_get_write_count_xfer (ipcbuf_t* id); - - /*! Return the number of buffersread from the ring buffer */ - uint64_t ipcbuf_get_read_count (ipcbuf_t*); - uint64_t ipcbuf_get_read_count_iread (ipcbuf_t* id, unsigned iread); - - /*! Return the Data Block index of the read buffer */ - uint64_t ipcbuf_get_read_index (ipcbuf_t* id); - - /*! Return the number of buffers in the ring buffer */ - uint64_t ipcbuf_get_nbufs (ipcbuf_t*); - - /*! Return the size of each buffer in the ring */ - uint64_t ipcbuf_get_bufsz (ipcbuf_t*); - - /*! Reset the buffer count and end of data flags */ - int ipcbuf_reset (ipcbuf_t*); - - /*! Reset the buffer count and end of data flags, with extreme prejudice */ - int ipcbuf_hard_reset (ipcbuf_t*); - - /*! Lock the shared memory into physical RAM (must be su) */ - int ipcbuf_lock (ipcbuf_t*); - - /*! Unlock the shared memory from physical RAM (allow swap) */ - int ipcbuf_unlock (ipcbuf_t*); - - /*! Initialise all buffer's to 0's, pageing them into RAM */ - int ipcbuf_page (ipcbuf_t* id); - - /*! Return the number of buffers currently flagged as clear */ - uint64_t ipcbuf_get_nclear (ipcbuf_t*); - uint64_t ipcbuf_get_nclear_iread (ipcbuf_t*, int iread); - - /*! Return the number of buffers currently flagged as full */ - uint64_t ipcbuf_get_nfull (ipcbuf_t*); - uint64_t ipcbuf_get_nfull_iread (ipcbuf_t*, int iread); - - /*! Return the number of sodacks */ - uint64_t ipcbuf_get_sodack (ipcbuf_t* id); - uint64_t ipcbuf_get_sodack_iread (ipcbuf_t* id, int iread); - - /*! Return the number of eodacks */ - uint64_t ipcbuf_get_eodack (ipcbuf_t* id); - uint64_t ipcbuf_get_eodack_iread (ipcbuf_t* id, int iread); - - /*! Return the number of readers in */ - int ipcbuf_get_nreaders(ipcbuf_t* id); - - /*! Return whether reader connected, 0 == connected */ - int ipcbuf_get_reader_conn (ipcbuf_t* id); - int ipcbuf_get_reader_conn_iread (ipcbuf_t* id, int iread); - - /*! Return the current read semaphore count */ - int ipcbuf_get_read_semaphore_count (ipcbuf_t* id); - - /*! Useful utility */ - void* shm_alloc (key_t key, size_t size, int flag, int* id); - - /*! set the start of clocking data buffer */ - uint64_t ipcbuf_set_soclock_buf(ipcbuf_t*); - -#ifdef __cplusplus - } -#endif - -#endif diff --git a/psrdada/ipcio.c b/psrdada/ipcio.c deleted file mode 100644 index 466d402..0000000 --- a/psrdada/ipcio.c +++ /dev/null @@ -1,765 +0,0 @@ -#include -#include -#include - -#include "ipcio.h" - -// #define _DEBUG 1 - -void ipcio_init (ipcio_t* ipc) -{ - ipc -> bytes = 0; - ipc -> rdwrt = 0; - ipc -> curbuf = 0; - - ipc -> marked_filled = 0; - - ipc -> sod_pending = 0; - ipc -> sod_buf = 0; - ipc -> sod_byte = 0; -} - -/* create a new shared memory block and initialize an ipcio_t struct */ -int ipcio_create (ipcio_t* ipc, key_t key, uint64_t nbufs, uint64_t bufsz, unsigned num_read) -{ - if (ipcbuf_create ((ipcbuf_t*)ipc, key, nbufs, bufsz, num_read) < 0) { - fprintf (stderr, "ipcio_create: ipcbuf_create error\n"); - return -1; - } - ipcio_init (ipc); - return 0; -} - -/* connect to an already created ipcbuf_t struct in shared memory */ -int ipcio_connect (ipcio_t* ipc, key_t key) -{ - if (ipcbuf_connect ((ipcbuf_t*)ipc, key) < 0) { - fprintf (stderr, "ipcio_connect: ipcbuf_connect error\n"); - return -1; - } - ipcio_init (ipc); - return 0; -} - -/* disconnect from an already connected ipcbuf_t struct in shared memory */ -int ipcio_disconnect (ipcio_t* ipc) -{ - if (ipcbuf_disconnect ((ipcbuf_t*)ipc) < 0) { - fprintf (stderr, "ipcio_disconnect: ipcbuf_disconnect error\n"); - return -1; - } - ipcio_init (ipc); - return 0; -} - -int ipcio_destroy (ipcio_t* ipc) -{ - ipcio_init (ipc); - return ipcbuf_destroy ((ipcbuf_t*)ipc); -} - -/* start reading/writing to an ipcbuf */ -int ipcio_open (ipcio_t* ipc, char rdwrt) -{ - if (rdwrt != 'R' && rdwrt != 'r' && rdwrt != 'w' && rdwrt != 'W') { - fprintf (stderr, "ipcio_open: invalid rdwrt = '%c'\n", rdwrt); - return -1; - } - - ipc -> rdwrt = 0; - ipc -> bytes = 0; - ipc -> curbuf = 0; - - if (rdwrt == 'w' || rdwrt == 'W') { - - /* read from file, write to shm */ - if (ipcbuf_lock_write ((ipcbuf_t*)ipc) < 0) { - fprintf (stderr, "ipcio_open: error ipcbuf_lock_write\n"); - return -1; - } - - if (rdwrt == 'w' && ipcbuf_disable_sod((ipcbuf_t*)ipc) < 0) { - fprintf (stderr, "ipcio_open: error ipcbuf_disable_sod\n"); - return -1; - } - - ipc -> rdwrt = rdwrt; - return 0; - - } - - if (rdwrt == 'R') { - if (ipcbuf_lock_read ((ipcbuf_t*)ipc) < 0) { - fprintf (stderr, "ipcio_open: error ipcbuf_lock_read\n"); - return -1; - } - } - - ipc -> rdwrt = rdwrt; - return 0; -} - -uint64_t ipcio_get_start_minimum (ipcio_t* ipc) -{ - uint64_t bufsz = ipcbuf_get_bufsz ((ipcbuf_t*)ipc); - uint64_t minbuf = ipcbuf_get_sod_minbuf ((ipcbuf_t*)ipc); - return minbuf * bufsz; -} - - -/* Checks if a SOD request has been requested, if it has, then it enables - * SOD */ -int ipcio_check_pending_sod (ipcio_t* ipc) -{ - ipcbuf_t* buf = (ipcbuf_t*) ipc; - - /* If the SOD flag has not been raised return 0 */ - if (ipc->sod_pending == 0) - return 0; - - /* The the buffer we wish to raise SOD on has not yet been written, then - * don't raise SOD */ - if (ipcbuf_get_write_count (buf) <= ipc->sod_buf) - return 0; - - /* Try to enable start of data on the sod_buf & sod byte */ - if (ipcbuf_enable_sod (buf, ipc->sod_buf, ipc->sod_byte) < 0) { - fprintf (stderr, "ipcio_check_pendind_sod: fail ipcbuf_enable_sod\n"); - return -1; - } - - ipc->sod_pending = 0; - return 0; -} - -/* start writing valid data to an ipcbuf. byte is the absolute byte relative to - * the start of the data block */ -int ipcio_start (ipcio_t* ipc, uint64_t byte) -{ - ipcbuf_t* buf = (ipcbuf_t*) ipc; - uint64_t bufsz = ipcbuf_get_bufsz (buf); - - if (ipc->rdwrt != 'w') { - fprintf (stderr, "ipcio_start: invalid ipcio_t (%c)\n",ipc->rdwrt); - return -1; - } - - ipc->sod_buf = byte / bufsz; - ipc->sod_byte = byte % bufsz; - ipc->sod_pending = 1; - ipc->rdwrt = 'W'; - - return ipcio_check_pending_sod (ipc); -} - -/* stop reading/writing to an ipcbuf */ -int ipcio_stop_close (ipcio_t* ipc, char unlock) -{ - if (ipc -> rdwrt == 'W') { - -#ifdef _DEBUG - if (ipc->curbuf) - fprintf (stderr, "ipcio_close:W buffer:%"PRIu64" %"PRIu64" bytes. " - "buf[0]=%x\n", ipc->buf.sync->w_buf, ipc->bytes, - ipc->curbuf[0]); -#endif - - if (ipcbuf_is_writing((ipcbuf_t*)ipc)) { - - if (ipcbuf_enable_eod ((ipcbuf_t*)ipc) < 0) { - fprintf (stderr, "ipcio_close:W error ipcbuf_enable_eod\n"); - return -1; - } - - if (ipcbuf_mark_filled ((ipcbuf_t*)ipc, ipc->bytes) < 0) { - fprintf (stderr, "ipcio_close:W error ipcbuf_mark_filled\n"); - return -1; - } - - if (ipcio_check_pending_sod (ipc) < 0) { - fprintf (stderr, "ipcio_close:W error ipcio_check_pending_sod\n"); - return -1; - } - - /* Ensure that mark_filled is not called again for this buffer - in ipcio_write */ - ipc->marked_filled = 1; - - if (ipc->bytes == ipcbuf_get_bufsz((ipcbuf_t*)ipc)) { -#ifdef _DEBUG - fprintf (stderr, "ipcio_close:W last buffer was filled\n"); -#endif - ipc->curbuf = 0; - } - - } - - ipc->rdwrt = 'w'; - - if (!unlock) - return 0; - - } - - if (ipc -> rdwrt == 'w') { - - /* Removed to allow a writer to write more than one transfer to the - * data block */ - /* -#ifdef _DEBUG - fprintf (stderr, "ipcio_close:W calling ipcbuf_reset\n"); -#endif - - if (ipcbuf_reset ((ipcbuf_t*)ipc) < 0) { - fprintf (stderr, "ipcio_close:W error ipcbuf_reset\n"); - return -1; - }*/ - - if (ipc->buf.sync->w_xfer > 0) { - - uint64_t prev_xfer = ipc->buf.sync->w_xfer - 1; - /* Ensure the w_buf pointer is pointing buffer after the - * most recent EOD */ - ipc->buf.sync->w_buf = ipc->buf.sync->e_buf[prev_xfer % IPCBUF_XFERS]+1; - - // TODO CHECK IF WE NEED TO DECREMENT the count?? - - } - - if (ipcbuf_unlock_write ((ipcbuf_t*)ipc) < 0) { - fprintf (stderr, "ipcio_close:W error ipcbuf_unlock_write\n"); - return -1; - } - - ipc -> rdwrt = 0; - return 0; - - } - - if (ipc -> rdwrt == 'R') { - - if (ipcbuf_unlock_read ((ipcbuf_t*)ipc) < 0) { - fprintf (stderr, "ipcio_close:R error ipcbuf_unlock_read\n"); - return -1; - } - - ipc -> rdwrt = 0; - return 0; - - } - - fprintf (stderr, "ipcio_close: invalid ipcio_t\n"); - return -1; -} - -/* stop writing valid data to an ipcbuf */ -int ipcio_stop (ipcio_t* ipc) -{ - if (ipc->rdwrt != 'W') { - fprintf (stderr, "ipcio_stop: not writing!\n"); - return -1; - } - return ipcio_stop_close (ipc, 0); -} - -/* stop reading/writing to an ipcbuf */ -int ipcio_close (ipcio_t* ipc) -{ - return ipcio_stop_close (ipc, 1); -} - -/* return 1 if the ipcio is open for reading or writing */ -int ipcio_is_open (ipcio_t* ipc) -{ - char rdwrt = ipc->rdwrt; - return rdwrt == 'R' || rdwrt == 'r' || rdwrt == 'w' || rdwrt == 'W'; -} - -/* write bytes to ipcbuf */ -ssize_t ipcio_write (ipcio_t* ipc, char* ptr, size_t bytes) -{ - - size_t space = 0; - size_t towrite = bytes; - - if (ipc->rdwrt != 'W' && ipc->rdwrt != 'w') { - fprintf (stderr, "ipcio_write: invalid ipcio_t (%c)\n",ipc->rdwrt); - return -1; - } - - while (bytes) { - - /* - The check for a full buffer is done at the start of the loop - so that if ipcio_write exactly fills a buffer before exiting, - the end-of-data flag can be raised before marking the buffer - as filled in ipcio_stop - */ - if (ipc->bytes == ipcbuf_get_bufsz((ipcbuf_t*)ipc)) { - - if (!ipc->marked_filled) { - -#ifdef _DEBUG - fprintf (stderr, "ipcio_write buffer:%"PRIu64" mark_filled\n", - ipc->buf.sync->w_buf); -#endif - - /* the buffer has been filled */ - if (ipcbuf_mark_filled ((ipcbuf_t*)ipc, ipc->bytes) < 0) { - fprintf (stderr, "ipcio_write: error ipcbuf_mark_filled\n"); - return -1; - } - - if (ipcio_check_pending_sod (ipc) < 0) { - fprintf (stderr, "ipcio_write: error ipcio_check_pending_sod\n"); - return -1; - } - - } - - ipc->curbuf = 0; - ipc->bytes = 0; - ipc->marked_filled = 1; - - } - - if (!ipc->curbuf) { - -#ifdef _DEBUG - fprintf (stderr, "ipcio_write buffer:%"PRIu64" ipcbuf_get_next_write\n", - ipc->buf.sync->w_buf); -#endif - - ipc->curbuf = ipcbuf_get_next_write ((ipcbuf_t*)ipc); - -#ifdef _DEBUG - fprintf (stderr, "ipcio_write: ipcbuf_get_next_write returns\n"); -#endif - - if (!ipc->curbuf) { - fprintf (stderr, "ipcio_write: ipcbuf_next_write\n"); - return -1; - } - - ipc->marked_filled = 0; - ipc->bytes = 0; - } - - space = ipcbuf_get_bufsz((ipcbuf_t*)ipc) - ipc->bytes; - if (space > bytes) - space = bytes; - -#ifdef _DEBUG - fprintf (stderr, "ipcio_write space=%"PRIu64"\n", space); -#endif - - if (space > 0) { - -#ifdef _DEBUG - fprintf (stderr, "ipcio_write buffer:%"PRIu64" offset:%"PRIu64 - " count=%"PRIu64"\n", ipc->buf.sync->w_buf, ipc->bytes, space); -#endif - - memcpy (ipc->curbuf + ipc->bytes, ptr, space); - ipc->bytes += space; - ptr += space; - bytes -= space; - } - - } - - return towrite; -} - -/* - * Open next Data Block unit for "direct" read access. Returns a pointer to the - * DB unit and set the number of bytes in the block and the block index - */ -char * ipcio_open_block_read (ipcio_t *ipc, uint64_t *curbufsz, uint64_t *block_id) -{ - - if (ipc->bytes != 0) - { - fprintf (stderr, "ipcio_open_block_read: ipc->bytes != 0\n"); - return 0; - } - - if (ipc->curbuf) - { - fprintf(stderr, "ipcio_open_block_read: ipc->curbuf != 0\n"); - return 0; - } - - if (ipc -> rdwrt != 'r' && ipc -> rdwrt != 'R') - { - fprintf(stderr, "ipcio_open_block_read: ipc -> rdwrt != [rR]\n"); - return 0; - } - - // test for EOD - if (ipcbuf_eod((ipcbuf_t*)ipc)) - { - fprintf(stderr, "ipcio_open_block_read: ipcbuf_eod true, returning null ptr\n"); - return 0; - } - - ipc->curbuf = ipcbuf_get_next_read ((ipcbuf_t*)ipc, &(ipc->curbufsz)); - - if (!ipc->curbuf) - { - fprintf (stderr, "ipcio_open_block_read: could not get next block rdwrt=%c\n", ipc -> rdwrt); - return 0; - } - - *block_id = ipcbuf_get_read_index ((ipcbuf_t*)ipc); - *curbufsz = ipc->curbufsz; - - ipc->bytes = 0; - - return ipc->curbuf; -} - -/* - * Close the Data Block unit from a "direct" read access, updating the number - * of bytes read. Return 0 on success, -1 on failure - */ -ssize_t ipcio_close_block_read (ipcio_t *ipc, uint64_t bytes) -{ - - if (ipc->bytes != 0) - { - fprintf (stderr, "ipcio_close_block_read: ipc->bytes != 0\n"); - return -1; - } - - if (!ipc->curbuf) - { - fprintf (stderr, "ipcio_close_block_read: ipc->curbuf == 0\n"); - return -1; - } - - if (ipc -> rdwrt != 'R') - { - fprintf (stderr, "ipcio_close_block_read: ipc->rdwrt != W\n"); - return -1; - } - - // the reader should always have read the required number of bytes - if (bytes != ipc->curbufsz) - { - fprintf (stderr, "ipcio_close_block_read: WARNING! bytes [%"PRIu64"] != ipc->curbufsz [%"PRIu64"]\n", - bytes, ipc->curbufsz); - } - - // increment the bytes counter by the number of bytes used - ipc->bytes += bytes; - - // if all the requested bytes have been read - if (ipc->bytes == ipc->curbufsz) - { - - if (ipcbuf_mark_cleared ((ipcbuf_t*)ipc) < 0) - { - fprintf (stderr, "ipcio_close_block: error ipcbuf_mark_filled\n"); - return -1; - } - - ipc->curbuf = 0; - ipc->bytes = 0; - } - - return 0; -} - - -/* - * Open next Data Block unit for "direct" write access. Returns a pointer to the - * DB unit and set the block index - */ -char * ipcio_open_block_write (ipcio_t *ipc, uint64_t *block_id) -{ - - if (ipc->bytes != 0) - { - fprintf (stderr, "ipcio_open_block_write: ipc->bytes != 0\n"); - return 0; - } - - if (ipc->curbuf) - { - fprintf(stderr, "ipcio_open_block_write: ipc->curbuf != 0\n"); - return 0; - } - - if (ipc -> rdwrt != 'W') - { - fprintf(stderr, "ipcio_open_block_write: ipc -> rdwrt != W\n"); - return 0; - } - - ipc->curbuf = ipcbuf_get_next_write ((ipcbuf_t*)ipc); - - if (!ipc->curbuf) - { - fprintf (stderr, "ipcio_open_block_write: could not get next block rdwrt=%c\n", ipc -> rdwrt); - return 0; - } - - *block_id = ipcbuf_get_write_index ((ipcbuf_t*)ipc); - - ipc->marked_filled = 0; - ipc->bytes = 0; - - return ipc->curbuf; -} - -int ipcio_zero_next_block (ipcio_t *ipc) -{ - if (ipc -> rdwrt != 'W') - { - fprintf(stderr, "ipcio_open_block_write: ipc -> rdwrt != W\n"); - return -1; - } - - return ipcbuf_zero_next_write ((ipcbuf_t*)ipc); -} - - -/* - * Update the number of bytes written to a Data Block unit that was opened - * for "direct" write access. This does not mark the buffer as filled. - */ -ssize_t ipcio_update_block_write (ipcio_t *ipc, uint64_t bytes) -{ - if (ipc->bytes != 0) - { - fprintf (stderr, "ipcio_update_block_write: ipc->bytes [%"PRIu64"] != 0\n", ipc->bytes); - return -1; - } - - if (!ipc->curbuf) - { - fprintf (stderr, "ipcio_update_block_write: ipc->curbuf == 0\n"); - return -1; - } - - if (ipc->rdwrt != 'W') - { - fprintf(stderr, "ipcio_update_block_write: ipc->rdwrt != W\n"); - return -1; - } - - if (ipc->bytes + bytes > ipcbuf_get_bufsz((ipcbuf_t*)ipc)) - { - fprintf(stderr, "ipcio_update_block_write: wrote more bytes than there was space for! [%"PRIu64" + %"PRIu64"] > %"PRIu64"\n", - ipc->bytes, bytes, ipcbuf_get_bufsz((ipcbuf_t*)ipc)); - return -1; - } - - // increment the bytes counter by the number of bytes used - ipc->bytes += bytes; - - return 0; -} - -/* - * Close the Data Block unit from a "direct" write access, updating the number - * of bytes written. Return 0 on success, -ve on failure - */ -ssize_t ipcio_close_block_write (ipcio_t *ipc, uint64_t bytes) -{ - - // update the number of bytes written - if (ipcio_update_block_write (ipc, bytes) < 0) - { - fprintf (stderr, "ipcio_close_block_write: ipcio_update_block_write failed\n"); - return -1; - } - - // if this buffer has not yet been marked as filled - if (!ipc->marked_filled) - { - if (ipcbuf_mark_filled ((ipcbuf_t*)ipc, ipc->bytes) < 0) - { - fprintf (stderr, "ipcio_close_block_write: error ipcbuf_mark_filled\n"); - return -2; - } - - if (ipcio_check_pending_sod (ipc) < 0) - { - fprintf (stderr, "ipcio_close_block_write: error ipcio_check_pending_sod\n"); - return -3; - } - - ipc->marked_filled = 1; - ipc->curbuf = 0; - ipc->bytes = 0; - } - return 0; -} - -/* read bytes from ipcbuf */ -ssize_t ipcio_read (ipcio_t* ipc, char* ptr, size_t bytes) -{ - size_t space = 0; - size_t toread = bytes; - - if (ipc -> rdwrt != 'r' && ipc -> rdwrt != 'R') - { - fprintf (stderr, "ipcio_read: invalid ipcio_t (rdwrt=%c)\n", ipc->rdwrt); - return -1; - } - - while (!ipcbuf_eod((ipcbuf_t*)ipc)) - { - if (!ipc->curbuf) - { - ipc->curbuf = ipcbuf_get_next_read ((ipcbuf_t*)ipc, &(ipc->curbufsz)); - -#ifdef _DEBUG - fprintf (stderr, "ipcio_read buffer:%"PRIu64" %"PRIu64" bytes. buf[0]=%x\n", - ipc->buf.sync->r_buf, ipc->curbufsz, ipc->curbuf[0]); -#endif - - if (!ipc->curbuf) - { - fprintf (stderr, "ipcio_read: error ipcbuf_next_read\n"); - return -1; - } - - ipc->bytes = 0; - } - - if (bytes) - { - space = ipc->curbufsz - ipc->bytes; - if (space > bytes) - space = bytes; - - if (ptr) - { - memcpy (ptr, ipc->curbuf + ipc->bytes, space); - ptr += space; - } - - ipc->bytes += space; - bytes -= space; - } - - if (ipc->bytes == ipc->curbufsz) - { - if (ipc -> rdwrt == 'R' && ipcbuf_mark_cleared ((ipcbuf_t*)ipc) < 0) - { - fprintf (stderr, "ipcio_read: error ipcbuf_mark_filled\n"); - return -1; - } - - ipc->curbuf = 0; - ipc->bytes = 0; - } - else if (!bytes) - break; - } - - return toread - bytes; -} - - - -uint64_t ipcio_tell (ipcio_t* ipc) -{ - int64_t current = -1; - - if (ipc -> rdwrt == 'R' || ipc -> rdwrt == 'r') - current = ipcbuf_tell_read ((ipcbuf_t*)ipc); - else if (ipc -> rdwrt == 'W' || ipc -> rdwrt == 'w') - current = ipcbuf_tell_write ((ipcbuf_t*)ipc); - - if (current < 0) - { - fprintf (stderr, "ipcio_tell: failed ipcbuf_tell" - " mode=%c current=%"PRIi64"\n", ipc->rdwrt, current); - return 0; - } - - return current + ipc->bytes; -} - - - -int64_t ipcio_seek (ipcio_t* ipc, int64_t offset, int whence) -{ - /* the absolute value of the offset */ - uint64_t current = ipcio_tell (ipc); - -#ifdef _DEBUG - fprintf (stderr, "ipcio_seek: offset=%"PRIi64" tell=%"PRIu64"\n", - offset, current); -#endif - - if (whence == SEEK_CUR) - offset += ipcio_tell (ipc); - - if (current < offset) - { - /* seeking forward is just like reading without the memcpy */ - if (ipcio_read (ipc, 0, offset - current) < 0) - { - fprintf (stderr, "ipcio_seek: empty read %"PRIi64" bytes error\n", - offset-current); - return -1; - } - - } - - else if (offset < current) - { - /* can only go back to the beginning of the current buffer ... */ - offset = current - offset; - if (offset > ipc->bytes) - { - fprintf (stderr, "ipcio_seek: %"PRIu64" > max backwards %"PRIu64"\n", - offset, ipc->bytes); - return -1; - } - ipc->bytes -= offset; - } - - return ipcio_tell (ipc); -} - -/* Returns the number of bytes available in the ring buffer */ -int64_t ipcio_space_left(ipcio_t* ipc) -{ - uint64_t bufsz = ipcbuf_get_bufsz ((ipcbuf_t *)ipc); - uint64_t nbufs = ipcbuf_get_nbufs ((ipcbuf_t *)ipc); - uint64_t full_bufs = ipcbuf_get_nfull((ipcbuf_t*) ipc); - int64_t available_bufs = (nbufs - full_bufs); - -#ifdef _DEBUG - uint64_t clear_bufs = ipcbuf_get_nclear((ipcbuf_t*) ipc); - fprintf (stderr,"ipcio_space_left: full_bufs = %"PRIu64", clear_bufs = %" - PRIu64", available_bufs = %"PRIu64", sum = %"PRIu64"\n", - full_bufs, clear_bufs, available_bufs, available_bufs*bufsz); -#endif - - return available_bufs * bufsz; - -} - -/* Returns the percentage of space left in the ring buffer */ -float ipcio_percent_full(ipcio_t* ipc) { - - uint64_t nbufs = ipcbuf_get_nbufs ((ipcbuf_t *)ipc); - uint64_t full_bufs = ipcbuf_get_nfull((ipcbuf_t*) ipc); - return ((float)full_bufs) / ((float)nbufs); - -} - - -/* Returns the byte corresponding the start of data clocking/recording */ -uint64_t ipcio_get_soclock_byte(ipcio_t* ipc) { - uint64_t bufsz = ipcbuf_get_bufsz ((ipcbuf_t *)ipc); - return bufsz * ((ipcbuf_t*)ipc)->soclock_buf; -} - diff --git a/psrdada/ipcio.h b/psrdada/ipcio.h deleted file mode 100644 index 5ac0aa8..0000000 --- a/psrdada/ipcio.h +++ /dev/null @@ -1,104 +0,0 @@ -#ifndef __DADA_IPCIO_H -#define __DADA_IPCIO_H - -/* ************************************************************************ - - ipcio_t - a struct and associated routines for creating and managing, - as well as reading and writing to and from an ipcbuf_t ring buffer - - ************************************************************************ */ - -#include "ipcbuf.h" - -#ifdef __cplusplus -extern "C" { -#endif - - typedef struct { - - ipcbuf_t buf; /* ipcio_t struct on which this is based */ - - char* curbuf; /* pointer to the current buffer in the ring */ - uint64_t curbufsz; /* size of the current buffer */ - - uint64_t bytes; /* number of bytes into current buffer */ - - char rdwrt; /* == r read; == w write */ - - char marked_filled; /* flag set when curbuf has been marked filled */ - - char sod_pending; /* flag set when sod flag has been raised */ - uint64_t sod_buf; /* buffer of start-of-data */ - uint64_t sod_byte; /* byte of start-of-data */ - - } ipcio_t; - - static const ipcio_t IPCIO_INIT = { IPCBUF_INIT, 0,0, 0, 0, 0, 0,0,0 }; - - /*! create a new shared memory block and initialize an ipcio_t struct */ - int ipcio_create (ipcio_t* ipc, key_t key, uint64_t nbufs, uint64_t bufsz, unsigned num_read); - - /*! connect to an already created ipcbuf_t struct in shared memory */ - int ipcio_connect (ipcio_t* ipc, key_t key); - - /*! disconnect from an already connected ipcio_t struct */ - int ipcio_disconnect (ipcio_t* ipc); - - /*! start reading/writing to an ipcbuf */ - int ipcio_open (ipcio_t* ipc, char rdwrt); - - /*! stop reading/writing to an ipcbuf */ - int ipcio_close (ipcio_t* ipc); - - /*! return true if already open */ - int ipcio_is_open (ipcio_t* ipc); - - /*! free all resources reserved for the ring buffer */ - int ipcio_destroy (ipcio_t* ipc); - - /*! get the minimum byte that may be passed to ipcio_start */ - uint64_t ipcio_get_start_minimum (ipcio_t* ipc); - - /*! enable start of data on the specified byte */ - int ipcio_start (ipcio_t* ipc, uint64_t byte); - - /*! write an end of data marker; may continue writing to ring buffer */ - int ipcio_stop (ipcio_t* ipc); - - /*! write bytes to ipcbuf */ - ssize_t ipcio_write (ipcio_t* ipc, char* ptr, size_t bytes); - - /*! read bytes from ipcbuf */ - ssize_t ipcio_read (ipcio_t* ipc, char* ptr, size_t bytes); - - /*! seek into ipcbuf - valid only for reading for now */ - int64_t ipcio_seek (ipcio_t* ipc, int64_t offset, int whence); - - /*! tell the last byte written to the ring buffer */ - uint64_t ipcio_tell (ipcio_t* ipc); - - /*! tell how many bytes can be written into the ring buffer before its full */ - int64_t ipcio_space_left (ipcio_t* ipc); - - /*! tell how many bytes can be written into the ring buffer before its full */ - float ipcio_percent_full(ipcio_t* ipc); - - uint64_t ipcio_get_soclock_byte(ipcio_t* ipc); - - char * ipcio_open_block_read (ipcio_t *ipc, uint64_t *curbufsz, uint64_t *block_id); - - ssize_t ipcio_close_block_read (ipcio_t *ipc, uint64_t bytes); - - char * ipcio_open_block_write (ipcio_t *ipc, uint64_t *block_id); - - ssize_t ipcio_update_block_write (ipcio_t *ipc, uint64_t bytes); - - ssize_t ipcio_close_block_write (ipcio_t *ipc, uint64_t bytes); - - int ipcio_zero_next_block (ipcio_t *ipc); - -#ifdef __cplusplus - } -#endif - -#endif diff --git a/psrdada/ipcutil.c b/psrdada/ipcutil.c deleted file mode 100644 index de2e5f4..0000000 --- a/psrdada/ipcutil.c +++ /dev/null @@ -1,69 +0,0 @@ -#include "ipcutil.h" - -#include -#include -#include -#include -#include - -#include -#include -#include - -// #define _DEBUG 1 - -/* *************************************************************** */ -/*! - Returns a shared memory block and its shmid -*/ -void* ipc_alloc (key_t key, size_t size, int flag, int* shmid) -{ - void* buf = 0; - int id = 0; - - id = shmget (key, size, flag); - if (id < 0) { - fprintf (stderr, "ipc_alloc: shmget (key=%x, size=%d, flag=%x) %s\n", - key, size, flag, strerror(errno)); - return 0; - } - -#ifdef _DEBUG - fprintf (stderr, "ipc_alloc: shmid=%d\n", id); -#endif - - buf = shmat (id, 0, flag); - - if (buf == (void*)-1) { - fprintf (stderr, - "ipc_alloc: shmat (shmid=%d) %s\n" - "ipc_alloc: after shmget (key=%x, size=%d, flag=%x)\n", - id, strerror(errno), key, size, flag); - return 0; - } - -#ifdef _DEBUG - fprintf (stderr, "ipc_alloc: shmat=%p\n", buf); -#endif - - if (shmid) - *shmid = id; - - return buf; -} - -int ipc_semop (int semid, short num, short op, short flag) -{ - struct sembuf semopbuf; - - semopbuf.sem_num = num; - semopbuf.sem_op = op; - semopbuf.sem_flg = flag; - - if (semop (semid, &semopbuf, 1) < 0) { - if (!(flag | IPC_NOWAIT)) - perror ("ipc_semop: semop"); - return -1; - } - return 0; -} diff --git a/psrdada/ipcutil.h b/psrdada/ipcutil.h deleted file mode 100644 index 63b1797..0000000 --- a/psrdada/ipcutil.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef __DADA_IPCUTIL_H -#define __DADA_IPCUTIL_H - -#include - -/* ************************************************************************ - - utilities for creation of shared memory and operations on semaphores - - ************************************************************************ */ - -#define IPCUTIL_PERM 0666 /* default: read/write permissions for all */ - -#ifdef __cplusplus -extern "C" { -#endif - - /* allocate size bytes in shared memory with the specified flags and key. - returns the pointer to the base address and the shmid, id */ - void* ipc_alloc (key_t key, size_t size, int flag, int* id); - - /* operate on the specified semaphore */ - int ipc_semop (int semid, short num, short incr, short flag); - -#ifdef __cplusplus - } -#endif - -#endif diff --git a/psrdada/multilog.c b/psrdada/multilog.c deleted file mode 100644 index 05a8311..0000000 --- a/psrdada/multilog.c +++ /dev/null @@ -1,153 +0,0 @@ - -#include "multilog.h" -#include "dada_def.h" - -#include -#include -#include -#include -#include -#include - -// #define _DEBUG 1 - -multilog_t* multilog_open (const char* program_name, char syslog) -{ - multilog_t* m = (multilog_t*) malloc (sizeof(multilog_t)); - assert (m != 0); - - if (syslog) - openlog (program_name, LOG_CONS, LOG_USER); - - m->syslog = syslog; - m->name = strdup(program_name); - assert (m->name != 0); - m->logs = 0; - m->nlog = 0; - m->port = 0; - m->timestamp = 1; - - pthread_mutex_init(&(m->mutex), NULL); - - return m; -} - -int multilog_close (multilog_t* m) -{ - pthread_mutex_lock (&(m->mutex)); - - free (m->name); - if (m->logs) - free (m->logs); - m->logs = 0; - - pthread_mutex_unlock (&(m->mutex)); - pthread_mutex_destroy (&(m->mutex)); - - free (m); - return 0; -} - -int multilog_add (multilog_t* m, FILE* fptr) -{ - pthread_mutex_lock (&(m->mutex)); - - m->logs = realloc (m->logs, (m->nlog+1)*sizeof(multilog_t)); - assert (m->logs != 0); - m->logs[m->nlog] = fptr; - m->nlog ++; - - pthread_mutex_unlock (&(m->mutex)); - - return 0; -} - -int multilog (multilog_t* m, int priority, const char* format, ...) -{ - unsigned ilog = 0; - va_list arguments; - - if (!m) - return -1; - - va_start (arguments, format); - - if (m->syslog) - vsyslog (priority, format, arguments); - - pthread_mutex_lock (&(m->mutex)); - -#ifdef _DEBUG - fprintf (stderr, "multilog: %d logs\n", m->nlog); -#endif - - va_end(arguments); - - for (ilog=0; ilog < m->nlog; ilog++) { - va_start(arguments, format); - - if (ferror (m->logs[ilog])) { -#ifdef _DEBUG - fprintf (stderr, "multilog: error on log[%d]", ilog); -#endif - fclose (m->logs[ilog]); - m->logs[ilog] = m->logs[m->nlog-1]; - m->nlog --; - ilog --; - } - else { - if (m->timestamp) { - unsigned buffer_size = 64; - static char* buffer = 0; - if (!buffer) - buffer = malloc (buffer_size); - assert (buffer != 0); - time_t now = time(0); - strftime (buffer, buffer_size, DADA_TIMESTR, - (struct tm*) localtime(&now)); - fprintf(m->logs[ilog],"[%s] ",buffer); - } - - if (priority == LOG_ERR) fprintf(m->logs[ilog], "ERR: "); - if (priority == LOG_WARNING) fprintf(m->logs[ilog], "WARN: "); - //fprintf (m->logs[ilog], "%s: ", m->name); - if (vfprintf (m->logs[ilog], format, arguments) < 0) - perror ("multilog: error vfprintf"); - } - va_end(arguments); - } - - pthread_mutex_unlock (&(m->mutex)); - - return 0; -} - -int multilog_fprintf(FILE* stream, int priority, const char* format, ...) -{ - - va_list arguments; - - if (!stream) - return -1; - - va_start(arguments, format); - - unsigned buffer_size = DADA_TIMESTR_LENGTH; - static char* buffer = 0; - if (!buffer) - buffer = malloc (buffer_size); - assert (buffer != 0); - time_t now = time(0); - strftime (buffer, buffer_size, DADA_TIMESTR, (struct tm*) localtime(&now)); - fprintf(stream,"[%s] ",buffer); - - if (priority == LOG_ERR) fprintf(stream, "ERR: "); - if (priority == LOG_WARNING) fprintf(stream, "WARN: "); - if (vfprintf (stream, format, arguments) < 0) - perror ("multilog: error vfprintf"); - - va_end(arguments); - - return 0; - -} diff --git a/psrdada/multilog.h b/psrdada/multilog.h deleted file mode 100644 index ab0bfbe..0000000 --- a/psrdada/multilog.h +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef __DADA_MULTILOG_H -#define __DADA_MULTILOG_H - -/* ************************************************************************ - - ************************************************************************ */ - -#include -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - - typedef struct { - - char* name; /* name of the program */ - char syslog; /* set to true to use syslog */ - FILE** logs; /* streams to which messages will be written */ - unsigned nlog; /* number of streams */ - - /* for multi-threaded use of the multilog */ - pthread_mutex_t mutex; - pthread_t thread; - - int port; /* the port on which multilog_server is listening */ - int timestamp; /* flag for placing a preceeding timestamp on the msg */ - - } multilog_t; - - /*! Open a multilogger */ - multilog_t* multilog_open (const char* program_name, char syslog); - - /*! Close a multilogger */ - int multilog_close (multilog_t* m); - - /*! Add a listener to the multilog */ - int multilog_add (multilog_t* m, FILE* fptr); - - /*! Write a message to all listening streams */ - int multilog (multilog_t* m, int priority, const char* format, ...); - - /*! Start another thread to receive log socket connections */ - int multilog_serve (multilog_t* m, int port); - - /*! Print just to a stream in the multilog format */ - int multilog_fprintf(FILE* stream, int priority, const char* format, ...); - -#ifdef __cplusplus - } -#endif - -#endif diff --git a/psrdada/tmutil.c b/psrdada/tmutil.c deleted file mode 100644 index 0aadc61..0000000 --- a/psrdada/tmutil.c +++ /dev/null @@ -1,228 +0,0 @@ - -#include -#include -#include -#include -#include - -#include "tmutil.h" - -time_t str2time (const char* str) -{ - struct tm time; - return str2tm (&time, str); -} - -time_t str2utctime (const char* str) -{ - struct tm time; - return str2utctm (&time, str); -} - - -time_t str2tm (struct tm* time, const char* str) -{ - char* temp = 0; /* duplicate of input */ - int trav = 0; /* travelling index */ - int endstr = 0; /* length of string */ - char infield = 0; /* true when current character is a digit */ - int field_count = 0; /* incremented when character becomes digit */ - int digits = 0; /* count of digits in string */ - - time->tm_year = 0; - time->tm_mon = 0; - time->tm_mday = 0; - time->tm_hour = 0; - time->tm_min = 0; - time->tm_sec = 0; - - temp = strdup (str); - - /* count the number of fields and cut the string off after a year, day, - hour, minute, and second can be parsed */ - while (temp[trav] != '\0') { - if (isdigit(temp[trav])) { - digits ++; - if (!infield) { - /* count only the transitions from non digits to a field of digits */ - field_count ++; - } - infield = 1; - } - else { - infield = 0; - } - if (field_count == 6) { - /* currently in the seconds field */ - temp[trav+2] = '\0'; - break; - } - else if (digits == 14) { - /* enough digits for a date */ - temp[trav+1] = '\0'; - break; - } - trav ++; - } - - endstr = strlen(temp); - /* cut off any trailing characters that are not ASCII numbers */ - while ((endstr>=0) && !isdigit(temp[endstr])) endstr --; - if (endstr < 0) - return -1; - temp [endstr+1] = '\0'; - - - /* parse UTC seconds */ - trav = endstr - 1; - if ((trav < 0) || !isdigit(temp[trav])) - trav++; - sscanf (temp+trav, "%2d", &(time->tm_sec)); - - /* cut out seconds and extra characters */ - endstr = trav-1; - while ((endstr>=0) && !isdigit(temp[endstr])) endstr --; - if (endstr < 0) - return 0; - temp [endstr+1] = '\0'; - - /* parse UTC minutes */ - trav = endstr - 1; - if ((trav < 0) || !isdigit(temp[trav])) - trav++; - sscanf (temp+trav, "%2d", &(time->tm_min)); - - /* cut out minutes and extra characters */ - endstr = trav-1; - while ((endstr>=0) && !isdigit(temp[endstr])) endstr --; - if (endstr < 0) - return 0; - temp [endstr+1] = '\0'; - - /* parse UTC hours */ - trav = endstr - 1; - if ((trav < 0) || !isdigit(temp[trav])) - trav++; - sscanf (temp+trav, "%2d", &(time->tm_hour)); - - /* cut out minutes and extra characters */ - endstr = trav-1; - while ((endstr>=0) && !isdigit(temp[endstr])) endstr --; - if (endstr < 0) - return 0; - temp [endstr+1] = '\0'; - - /* parse UTC days in month */ - trav = endstr - 1; - if ((trav < 0) || !isdigit(temp[trav])) - trav++; - sscanf (temp+trav, "%2d", &(time->tm_mday)); - - /* cut out minutes and extra characters */ - endstr = trav-1; - while ((endstr>=0) && !isdigit(temp[endstr])) endstr --; - if (endstr < 0) - return 0; - temp [endstr+1] = '\0'; - - /* parse UTC months */ - trav = endstr - 1; - if ((trav < 0) || !isdigit(temp[trav])) - trav++; - sscanf (temp+trav, "%2d", &(time->tm_mon)); - /* month is stored 0->11 in struct tm */ - time->tm_mon --; - - /* cut out minutes and extra characters */ - endstr = trav-1; - while ((endstr>=0) && !isdigit(temp[endstr])) endstr --; - if (endstr < 0) - return 0; - temp [endstr+1] = '\0'; - - /* parse UTC year */ - trav = endstr; - while ((trav >= 0) && (endstr-trav < 4) && isdigit(temp[trav])) - trav--; - sscanf (temp+trav+1, "%4d", &(time->tm_year)); - - free (temp); - - time->tm_wday = 0; - time->tm_yday = 0; - time->tm_isdst = -1; - - /* this may cause a Y3.8K bug */ - if (time->tm_year > 1900) - time->tm_year -= 1900; - - /* Y2K bug assumption */ - if (time->tm_year < 30) - time->tm_year += 100; - - return mktime (time); - -} - -time_t str2utctm (struct tm* time, const char* str) -{ - - /* append the GMT+0 timeszone information */ - char * str_utc = malloc(sizeof(char) * (strlen(str) + 4 + 1)); - sprintf(str_utc, "%s UTC",str); - - const char * format = "%Y-%m-%d-%H:%M:%S %Z"; - - strptime(str_utc, format, time); - - free(str_utc); - return timegm(time); -} - -time_t mjd2utctm (double mjd) -{ - const int seconds_in_day = 86400; - int days = (int) mjd; - double fdays = mjd - (double) days; - double seconds = fdays * (double) seconds_in_day; - int secs = (int) seconds; - double fracsec = seconds - (double) secs; - if (fracsec - 1 < 0.0000001) - secs++; - - int julian_day = days + 2400001; - - int n_four = 4 * (julian_day+((6*((4*julian_day-17918)/146097))/4+1)/2-37); - int n_dten = 10 * (((n_four-237)%1461)/4) + 5; - - struct tm gregdate; - gregdate.tm_year = n_four/1461 - 4712 - 1900; // extra -1900 for C struct tm - gregdate.tm_mon = (n_dten/306+2)%12; // struct tm mon 0->11 - gregdate.tm_mday = (n_dten%306)/10 + 1; - - gregdate.tm_hour = secs / 3600; - secs -= 3600 * gregdate.tm_hour; - - - gregdate.tm_min = secs / 60; - secs -= 60 * (gregdate.tm_min); - - gregdate.tm_sec = secs; - - gregdate.tm_isdst = -1; - time_t date = mktime (&gregdate); - - return date; - -} - -void float_sleep (float seconds) -{ - struct timeval t ; - t.tv_sec = seconds; - seconds -= t.tv_sec; - t.tv_usec = seconds * 1e6; - select (0, 0, 0, 0, &t) ; -} - - diff --git a/psrdada/tmutil.h b/psrdada/tmutil.h deleted file mode 100644 index ec001a5..0000000 --- a/psrdada/tmutil.h +++ /dev/null @@ -1,40 +0,0 @@ -/* $Source: /cvsroot/psrdada/psrdada/src/tmutil.h,v $ - $Revision: 1.4 $ - $Date: 2016/09/29 08:55:49 $ - $Author: ajameson $ */ - -#ifndef DADA_UTC_H -#define DADA_UTC_H - -#ifndef _XOPEN_SOURCE -#define _XOPEN_SOURCE /* glibc2 needs this for strptime */ -#endif -#include - -#ifdef __cplusplus -extern "C" { -#endif - - /*! parse a string into struct tm; return equivalent time_t */ - time_t str2tm (struct tm* time, const char* str); - - /*! parse a string and return equivalent time_t */ - time_t str2time (const char* str); - - /*! parse a UTC string and return equivalent time_t */ - time_t str2utctime (const char* str); - - /*! parse a UTC time string into struct tm; return equivalent time_t */ - time_t str2utctm (struct tm* time, const char* str); - - /*! convert a UTC MJD time into the struct tm */ - time_t mjd2utctm (double mjd); - - void float_sleep (float seconds); - -#ifdef __cplusplus -} -#endif - -#endif - diff --git a/setup.py b/setup.py index ae9d840..ca0319c 100644 --- a/setup.py +++ b/setup.py @@ -10,6 +10,7 @@ from Cython.Build import cythonize from setuptools import setup from distutils.extension import Extension +from os import environ with open('README.md') as readme_file: README = readme_file.read() @@ -17,16 +18,57 @@ with open('VERSION') as version_file: PROJECT_VERSION = version_file.read() +# Get the header locations from the environment +INCLUDE_DIRS = [] +if "CPATH" in environ: + flags = environ["CPATH"].split(':') + for flag in flags: + # when usingn spack, there is no -I prefix + INCLUDE_DIRS.append(flag) + +if "CFLAGS" in environ: + flags = environ["CFLAGS"].split(' ') + for flag in flags: + if flag[0:2] == '-I': + # when usingn spack, there is no -I prefix + INCLUDE_DIRS.append(flag[2:-1]) + +# keep the original order +INCLUDE_DIRS.reverse() + +# Get the header locations from the environment +LIBRARY_DIRS = [] +if "LD_LIBRARY_PATH" in environ: + flags = environ["LD_LIBRARY_PATH"].split(':') + for flag in flags: + # when usingn spack, there is no -I prefix + LIBRARY_DIRS.append(flag) + + # keep the original order + LIBRARY_DIRS.reverse() + EXTENSIONS = [ Extension( "psrdada.ringbuffer", - ["psrdada/ringbuffer.pyx","psrdada/dada_hdu.c", "psrdada/ipcio.c", "psrdada/ipcbuf.c", "psrdada/ascii_header.c", "psrdada/ipcutil.c", "psrdada/multilog.c", "psrdada/tmutil.c"]), + ["psrdada/ringbuffer.pyx"], + libraries=["psrdada"], + library_dirs=LIBRARY_DIRS, + include_dirs=INCLUDE_DIRS + ), Extension( "psrdada.reader", - ["psrdada/reader.pyx","psrdada/dada_hdu.c", "psrdada/ipcio.c", "psrdada/ipcbuf.c", "psrdada/ascii_header.c", "psrdada/ipcutil.c", "psrdada/multilog.c", "psrdada/tmutil.c"]), + ["psrdada/reader.pyx"], + libraries=["psrdada"], + library_dirs=LIBRARY_DIRS, + include_dirs=INCLUDE_DIRS + ), Extension( "psrdada.writer", - ["psrdada/writer.pyx","psrdada/dada_hdu.c", "psrdada/ipcio.c", "psrdada/ipcbuf.c", "psrdada/ascii_header.c", "psrdada/ipcutil.c", "psrdada/multilog.c", "psrdada/tmutil.c"]), + ["psrdada/writer.pyx"], + libraries=["psrdada"], + library_dirs=LIBRARY_DIRS, + include_dirs=INCLUDE_DIRS + ), ] setup(