ringbuffer.c   ringbuffer.c 
/* /*
* Copyright (C) 2010 Red Hat, Inc. * Copyright (C) 2010-2011 Red Hat, Inc.
* *
* Author: Angus Salkeld <asalkeld@redhat.com> * Author: Angus Salkeld <asalkeld@redhat.com>
* *
* This file is part of libqb. * This file is part of libqb.
* *
* libqb is free software: you can redistribute it and/or modify * libqb is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by * it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or * the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version. * (at your option) any later version.
* *
skipping to change at line 25 skipping to change at line 25
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details. * GNU Lesser General Public License for more details.
* *
* You should have received a copy of the GNU Lesser General Public License * You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>. * along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "ringbuffer_int.h" #include "ringbuffer_int.h"
#include <qb/qbdefs.h> #include <qb/qbdefs.h>
#include <qb/qbatomic.h> #include <qb/qbatomic.h>
//#define CRAZY_DEBUG_PRINTFS 1 /*
* #define CRAZY_DEBUG_PRINTFS 1
*/
#ifdef CRAZY_DEBUG_PRINTFS #ifdef CRAZY_DEBUG_PRINTFS
#define DEBUG_PRINTF(format, args...) \ #define DEBUG_PRINTF(format, args...) \
do { \ do { \
printf(format, ##args); \ printf(format, ##args); \
} while(0) } while(0)
#else #else
#define DEBUG_PRINTF(format, args...) #define DEBUG_PRINTF(format, args...)
#endif /* CRAZY_DEBUG_PRINTFS */ #endif /* CRAZY_DEBUG_PRINTFS */
/* the chunk header is two words /* the chunk header is two words
skipping to change at line 58 skipping to change at line 60
#define QB_RB_READ_PT_INDEX (rb->shared_hdr->size + 1) #define QB_RB_READ_PT_INDEX (rb->shared_hdr->size + 1)
#define idx_step(idx) \ #define idx_step(idx) \
do { \ do { \
if (idx > (rb->shared_hdr->size - 1)) { \ if (idx > (rb->shared_hdr->size - 1)) { \
idx = ((idx) % (rb->shared_hdr->size)); \ idx = ((idx) % (rb->shared_hdr->size)); \
} \ } \
} while (0) } while (0)
/* /*
* move the write pointer to the next 32 byte boundry * move the write pointer to the next 32 byte boundary
* write_pt goes in 4 bytes (sizeof(uint32_t)) * write_pt goes in 4 bytes (sizeof(uint32_t))
*/ */
//#define USE_CACHE_LINE_ALIGNMENT 1 /*
* #define USE_CACHE_LINE_ALIGNMENT 1
*/
#ifdef USE_CACHE_LINE_ALIGNMENT #ifdef USE_CACHE_LINE_ALIGNMENT
#define idx_cache_line_step(idx) \ #define idx_cache_line_step(idx) \
do { \ do { \
if (idx % 8) { \ if (idx % 8) { \
idx += (8 - (idx % 8)); \ idx += (8 - (idx % 8)); \
} \ } \
if (idx > (rb->shared_hdr->size - 1)) { \ if (idx > (rb->shared_hdr->size - 1)) { \
idx = ((idx) % (rb->shared_hdr->size)); \ idx = ((idx) % (rb->shared_hdr->size)); \
} \ } \
} while (0) } while (0)
skipping to change at line 83 skipping to change at line 87
#define idx_cache_line_step(idx) \ #define idx_cache_line_step(idx) \
do { \ do { \
if (idx > (rb->shared_hdr->size - 1)) { \ if (idx > (rb->shared_hdr->size - 1)) { \
idx = ((idx) % (rb->shared_hdr->size)); \ idx = ((idx) % (rb->shared_hdr->size)); \
} \ } \
} while (0) } while (0)
#endif #endif
static void qb_rb_chunk_check(qb_ringbuffer_t * rb, uint32_t pointer); static void qb_rb_chunk_check(qb_ringbuffer_t * rb, uint32_t pointer);
qb_ringbuffer_t *qb_rb_open(const char *name, size_t size, uint32_t flags, qb_ringbuffer_t *
size_t shared_user_data_size) qb_rb_open(const char *name, size_t size, uint32_t flags,
{ size_t shared_user_data_size)
struct qb_ringbuffer_s *rb = malloc(sizeof(struct qb_ringbuffer_s)); {
size_t real_size = QB_ROUNDUP(size, sysconf(_SC_PAGESIZE)); struct qb_ringbuffer_s *rb;
size_t real_size;
size_t shared_size;
char path[PATH_MAX]; char path[PATH_MAX];
int32_t fd_hdr; int32_t fd_hdr;
int32_t fd_data; int32_t fd_data;
uint32_t file_flags = O_RDWR; uint32_t file_flags = O_RDWR;
size_t shared_size = sizeof(struct qb_ringbuffer_shared_s);
char filename[PATH_MAX]; char filename[PATH_MAX];
int32_t error = 0; int32_t error = 0;
void *shm_addr;
long page_size = sysconf(_SC_PAGESIZE);
shared_size += shared_user_data_size; #ifdef QB_FORCE_SHM_ALIGN
page_size = QB_MAX(page_size, 16 * 1024);
#endif /* QB_FORCE_SHM_ALIGN */
real_size = QB_ROUNDUP(size, page_size);
shared_size =
sizeof(struct qb_ringbuffer_shared_s) + shared_user_data_size;
if (flags & QB_RB_FLAG_CREATE) { if (flags & QB_RB_FLAG_CREATE) {
file_flags |= O_CREAT | O_TRUNC; file_flags |= O_CREAT | O_TRUNC;
} }
rb = calloc(1, sizeof(struct qb_ringbuffer_s));
if (rb == NULL) {
return NULL;
}
/* /*
* Create a shared_hdr memory segment for the header. * Create a shared_hdr memory segment for the header.
*/ */
snprintf(filename, PATH_MAX, "%s-header", name); snprintf(filename, PATH_MAX, "qb-%s-header", name);
fd_hdr = qb_util_mmap_file_open(path, filename, fd_hdr = qb_sys_mmap_file_open(path, filename,
shared_size, shared_size, file_flags);
file_flags);
if (fd_hdr < 0) { if (fd_hdr < 0) {
error = fd_hdr; error = fd_hdr;
qb_util_log(LOG_ERR, "couldn't create file for mmap"); qb_util_log(LOG_ERR, "couldn't create file for mmap");
return NULL; goto cleanup_hdr;
} }
rb->shared_hdr = mmap(0, rb->shared_hdr = mmap(0,
shared_size, shared_size,
PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0) ; PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0) ;
if (rb->shared_hdr == MAP_FAILED) { if (rb->shared_hdr == MAP_FAILED) {
error = -errno; error = -errno;
qb_util_log(LOG_ERR, "couldn't create mmap for header"); qb_util_log(LOG_ERR, "couldn't create mmap for header");
goto cleanup_hdr; goto cleanup_hdr;
skipping to change at line 139 skipping to change at line 156
if (flags & QB_RB_FLAG_CREATE) { if (flags & QB_RB_FLAG_CREATE) {
rb->shared_data = NULL; rb->shared_data = NULL;
/* rb->shared_hdr->size tracks data by ints and not bytes/ch ars. */ /* rb->shared_hdr->size tracks data by ints and not bytes/ch ars. */
rb->shared_hdr->size = real_size / sizeof(uint32_t); rb->shared_hdr->size = real_size / sizeof(uint32_t);
rb->shared_hdr->write_pt = 0; rb->shared_hdr->write_pt = 0;
rb->shared_hdr->read_pt = 0; rb->shared_hdr->read_pt = 0;
strncpy(rb->shared_hdr->hdr_path, path, PATH_MAX); strncpy(rb->shared_hdr->hdr_path, path, PATH_MAX);
} }
error = qb_rb_sem_create(rb, flags); error = qb_rb_sem_create(rb, flags);
if (error < 0) { if (error < 0) {
qb_util_log(LOG_ERR, "couldn't get a semaphore %s", qb_util_perror(LOG_ERR, "couldn't get a semaphore");
strerror(-error));
goto cleanup_hdr; goto cleanup_hdr;
} }
/* Create the shared_data memory segment for the actual ringbuffer. /* Create the shared_data memory segment for the actual ringbuffer.
* They have to be seperate. * They have to be separate.
*/ */
if (flags & QB_RB_FLAG_CREATE) { if (flags & QB_RB_FLAG_CREATE) {
snprintf(filename, PATH_MAX, "%s-data", name); snprintf(filename, PATH_MAX, "qb-%s-data", name);
fd_data = qb_util_mmap_file_open(path, fd_data = qb_sys_mmap_file_open(path,
filename, filename,
real_size, file_flags); real_size, file_flags);
strncpy(rb->shared_hdr->data_path, path, PATH_MAX); strncpy(rb->shared_hdr->data_path, path, PATH_MAX);
} else { } else {
fd_data = qb_util_mmap_file_open(path, fd_data = qb_sys_mmap_file_open(path,
rb->shared_hdr->data_path, rb->shared_hdr->data_path,
real_size, file_flags); real_size, file_flags);
} }
if (fd_data < 0) { if (fd_data < 0) {
error = fd_data; error = fd_data;
qb_util_log(LOG_ERR, "couldn't create file for mmap"); qb_util_log(LOG_ERR, "couldn't create file for mmap");
goto cleanup_hdr; goto cleanup_hdr;
} }
qb_util_log(LOG_DEBUG, qb_util_log(LOG_DEBUG,
"shm size:%zd; real_size:%zd; rb->size:%d", size, "shm size:%zd; real_size:%zd; rb->size:%d", size,
real_size, rb->shared_hdr->size); real_size, rb->shared_hdr->size);
error = qb_util_circular_mmap(fd_data, error = qb_sys_circular_mmap(fd_data, &shm_addr, real_size);
(void **)&rb->shared_data, real_size); rb->shared_data = shm_addr;
if (error != 0) { if (error != 0) {
qb_util_log(LOG_ERR, "couldn't create circular mmap on %s", qb_util_log(LOG_ERR, "couldn't create circular mmap on %s",
rb->shared_hdr->data_path); rb->shared_hdr->data_path);
goto cleanup_data; goto cleanup_data;
} }
if (flags & QB_RB_FLAG_CREATE) { if (flags & QB_RB_FLAG_CREATE) {
memset(rb->shared_data, 0, real_size); memset(rb->shared_data, 0, real_size);
rb->shared_data[rb->shared_hdr->size] = 5; rb->shared_data[rb->shared_hdr->size] = 5;
rb->shared_hdr->ref_count = 1; rb->shared_hdr->ref_count = 1;
skipping to change at line 195 skipping to change at line 211
close(fd_data); close(fd_data);
return rb; return rb;
cleanup_data: cleanup_data:
close(fd_data); close(fd_data);
if (flags & QB_RB_FLAG_CREATE) { if (flags & QB_RB_FLAG_CREATE) {
unlink(rb->shared_hdr->data_path); unlink(rb->shared_hdr->data_path);
} }
cleanup_hdr: cleanup_hdr:
close(fd_hdr); if (fd_hdr >= 0) {
if (flags & QB_RB_FLAG_CREATE) { close(fd_hdr);
}
if (rb && (flags & QB_RB_FLAG_CREATE)) {
unlink(rb->shared_hdr->hdr_path); unlink(rb->shared_hdr->hdr_path);
(void)rb->sem_destroy_fn(rb); (void)rb->sem_destroy_fn(rb);
} }
if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) { if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) {
munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s) ); munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s) );
} }
free(rb); free(rb);
errno = -error; errno = -error;
return NULL; return NULL;
} }
void qb_rb_close(qb_ringbuffer_t * rb) void
qb_rb_close(qb_ringbuffer_t * rb)
{ {
if (rb == NULL) { if (rb == NULL) {
return; return;
} }
qb_util_log(LOG_DEBUG,
"Destroying ringbuffer: %s",
rb->shared_hdr->hdr_path);
(void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count); (void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count);
(void)rb->sem_destroy_fn(rb); if (rb->flags & QB_RB_FLAG_CREATE) {
unlink(rb->shared_hdr->data_path); (void)rb->sem_destroy_fn(rb);
unlink(rb->shared_hdr->hdr_path); unlink(rb->shared_hdr->data_path);
unlink(rb->shared_hdr->hdr_path);
qb_util_log(LOG_DEBUG,
"Free'ing ringbuffer: %s",
rb->shared_hdr->hdr_path);
} else {
qb_util_log(LOG_DEBUG,
"Closing ringbuffer: %s", rb->shared_hdr->hdr_pa
th);
}
munmap(rb->shared_data, (rb->shared_hdr->size * sizeof(uint32_t)) << 1); munmap(rb->shared_data, (rb->shared_hdr->size * sizeof(uint32_t)) << 1);
munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s)); munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s));
free(rb); free(rb);
} }
char *qb_rb_name_get(qb_ringbuffer_t * rb) char *
qb_rb_name_get(qb_ringbuffer_t * rb)
{ {
if (rb == NULL) {
return NULL;
}
return rb->shared_hdr->hdr_path; return rb->shared_hdr->hdr_path;
} }
void *qb_rb_shared_user_data_get(qb_ringbuffer_t * rb) void *
qb_rb_shared_user_data_get(qb_ringbuffer_t * rb)
{ {
if (rb == NULL) {
return NULL;
}
return rb->shared_hdr->user_data; return rb->shared_hdr->user_data;
} }
int32_t qb_rb_refcount_get(qb_ringbuffer_t * rb) int32_t
qb_rb_refcount_get(qb_ringbuffer_t * rb)
{ {
if (rb == NULL) {
return -EINVAL;
}
return qb_atomic_int_get(&rb->shared_hdr->ref_count); return qb_atomic_int_get(&rb->shared_hdr->ref_count);
} }
ssize_t qb_rb_space_free(qb_ringbuffer_t * rb) ssize_t
qb_rb_space_free(qb_ringbuffer_t * rb)
{ {
uint32_t write_size; uint32_t write_size;
uint32_t read_size; uint32_t read_size;
size_t space_free = 0; size_t space_free = 0;
if (rb == NULL) {
return -EINVAL;
}
write_size = rb->shared_hdr->write_pt; write_size = rb->shared_hdr->write_pt;
// TODO idx_cache_line_step (write_size); /*
* TODO idx_cache_line_step (write_size);
*/
read_size = rb->shared_hdr->read_pt; read_size = rb->shared_hdr->read_pt;
if (write_size > read_size) { if (write_size > read_size) {
space_free = space_free =
(read_size - write_size + rb->shared_hdr->size) - 1; (read_size - write_size + rb->shared_hdr->size) - 1;
} else if (write_size < read_size) { } else if (write_size < read_size) {
space_free = (read_size - write_size) - 1; space_free = (read_size - write_size) - 1;
} else { } else {
space_free = rb->shared_hdr->size; space_free = rb->shared_hdr->size;
} }
/* word -> bytes */ /* word -> bytes */
return (space_free * sizeof(uint32_t)); return (space_free * sizeof(uint32_t));
} }
ssize_t qb_rb_space_used(qb_ringbuffer_t * rb) ssize_t
qb_rb_space_used(qb_ringbuffer_t * rb)
{ {
uint32_t write_size; uint32_t write_size;
uint32_t read_size; uint32_t read_size;
size_t space_used; size_t space_used;
if (rb == NULL) {
return -EINVAL;
}
write_size = rb->shared_hdr->write_pt; write_size = rb->shared_hdr->write_pt;
read_size = rb->shared_hdr->read_pt; read_size = rb->shared_hdr->read_pt;
if (write_size > read_size) { if (write_size > read_size) {
space_used = write_size - read_size; space_used = write_size - read_size;
} else if (write_size < read_size) { } else if (write_size < read_size) {
space_used = space_used =
(write_size - read_size + rb->shared_hdr->size) - 1; (write_size - read_size + rb->shared_hdr->size) - 1;
} else { } else {
space_used = 0; space_used = 0;
} }
/* word -> bytes */ /* word -> bytes */
return (space_used * sizeof(uint32_t)); return (space_used * sizeof(uint32_t));
} }
ssize_t qb_rb_chunks_used(struct qb_ringbuffer_s * rb) ssize_t
qb_rb_chunks_used(struct qb_ringbuffer_s *rb)
{ {
if (rb == NULL) {
return -EINVAL;
}
return rb->sem_getvalue_fn(rb); return rb->sem_getvalue_fn(rb);
} }
void *qb_rb_chunk_alloc(qb_ringbuffer_t * rb, size_t len) void *
qb_rb_chunk_alloc(qb_ringbuffer_t * rb, size_t len)
{ {
uint32_t write_pt; uint32_t write_pt;
if (rb == NULL) {
return NULL;
}
/* /*
* Reclaim data if we are over writing and we need space * Reclaim data if we are over writing and we need space
*/ */
if (rb->flags & QB_RB_FLAG_OVERWRITE) { if (rb->flags & QB_RB_FLAG_OVERWRITE) {
while (qb_rb_space_free(rb) < while (qb_rb_space_free(rb) <
(len + QB_RB_CHUNK_HEADER_SIZE + 4)) { (len + QB_RB_CHUNK_HEADER_SIZE + 4)) {
qb_rb_chunk_reclaim(rb); qb_rb_chunk_reclaim(rb);
} }
} else { } else {
if (qb_rb_space_free(rb) < if (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_HEADER_SIZE +
(len + QB_RB_CHUNK_HEADER_SIZE + 4)) { 4)) {
errno = EAGAIN; errno = EAGAIN;
return NULL; return NULL;
} }
} }
write_pt = rb->shared_hdr->write_pt; write_pt = rb->shared_hdr->write_pt;
/* /*
* insert the chunk header * insert the chunk header
*/ */
rb->shared_data[write_pt++] = 0; rb->shared_data[write_pt++] = 0;
idx_step(write_pt); idx_step(write_pt);
rb->shared_data[write_pt++] = QB_RB_CHUNK_MAGIC; rb->shared_data[write_pt++] = QB_RB_CHUNK_MAGIC;
idx_step(write_pt); idx_step(write_pt);
/* /*
* return a pointer to the begining of the chunk data * return a pointer to the beginning of the chunk data
*/ */
return (void *)&rb->shared_data[write_pt]; return (void *)&rb->shared_data[write_pt];
} }
static uint32_t qb_rb_chunk_step(qb_ringbuffer_t * rb, uint32_t pointer) static uint32_t
qb_rb_chunk_step(qb_ringbuffer_t * rb, uint32_t pointer)
{ {
uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer); uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer);
/* /*
* skip over the chunk header * skip over the chunk header
*/ */
pointer += QB_RB_CHUNK_HEADER_WORDS; pointer += QB_RB_CHUNK_HEADER_WORDS;
idx_step(pointer); idx_step(pointer);
/* /*
* skip over the user's data. * skip over the user's data.
skipping to change at line 348 skipping to change at line 402
pointer += (chunk_size / sizeof(uint32_t)); pointer += (chunk_size / sizeof(uint32_t));
/* make allowance for non-word sizes */ /* make allowance for non-word sizes */
if ((chunk_size % sizeof(uint32_t)) != 0) { if ((chunk_size % sizeof(uint32_t)) != 0) {
pointer++; pointer++;
} }
idx_cache_line_step(pointer); idx_cache_line_step(pointer);
return pointer; return pointer;
} }
int32_t qb_rb_chunk_commit(qb_ringbuffer_t * rb, size_t len) int32_t
qb_rb_chunk_commit(qb_ringbuffer_t * rb, size_t len)
{ {
uint32_t old_write_pt = rb->shared_hdr->write_pt; uint32_t old_write_pt;
if (rb == NULL) {
return -EINVAL;
}
/* /*
* commit the magic & chunk_size * commit the magic & chunk_size
*/ */
old_write_pt = rb->shared_hdr->write_pt;
rb->shared_data[old_write_pt] = len; rb->shared_data[old_write_pt] = len;
rb->shared_data[old_write_pt + 1] = QB_RB_CHUNK_MAGIC; rb->shared_data[old_write_pt + 1] = QB_RB_CHUNK_MAGIC;
/* /*
* commit the new write pointer * commit the new write pointer
*/ */
rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt); rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt);
DEBUG_PRINTF("%s: read: %u, write: %u (was:%u)\n", __func__, DEBUG_PRINTF("%s: read: %u, write: %u (was:%u)\n", __func__,
rb->shared_hdr->read_pt, rb->shared_hdr->write_pt, rb->shared_hdr->read_pt, rb->shared_hdr->write_pt,
old_write_pt); old_write_pt);
/* /*
* post the notification to the reader * post the notification to the reader
*/ */
return rb->sem_post_fn(rb); return rb->sem_post_fn(rb);
} }
ssize_t qb_rb_chunk_write(qb_ringbuffer_t * rb, const void *data, size_t le ssize_t
n) qb_rb_chunk_write(qb_ringbuffer_t * rb, const void *data, size_t len)
{ {
char *dest = qb_rb_chunk_alloc(rb, len); char *dest = qb_rb_chunk_alloc(rb, len);
int32_t res = 0; int32_t res = 0;
if (rb == NULL) {
return -EINVAL;
}
if (dest == NULL) { if (dest == NULL) {
return -errno; return -errno;
} }
memcpy(dest, data, len); memcpy(dest, data, len);
res = qb_rb_chunk_commit(rb, len); res = qb_rb_chunk_commit(rb, len);
if (res < 0) { if (res < 0) {
return res; return res;
} }
return len; return len;
} }
void qb_rb_chunk_reclaim(qb_ringbuffer_t * rb) void
qb_rb_chunk_reclaim(qb_ringbuffer_t * rb)
{ {
uint32_t old_read_pt = rb->shared_hdr->read_pt; uint32_t old_read_pt;
if (qb_rb_space_used(rb) == 0) { if (rb == NULL || qb_rb_space_used(rb) == 0) {
return; return;
} }
old_read_pt = rb->shared_hdr->read_pt;
qb_rb_chunk_check(rb, old_read_pt); qb_rb_chunk_check(rb, old_read_pt);
rb->shared_hdr->read_pt = qb_rb_chunk_step(rb, old_read_pt); rb->shared_hdr->read_pt = qb_rb_chunk_step(rb, old_read_pt);
/* /*
* clear the header * clear the header
*/ */
rb->shared_data[old_read_pt] = 0; rb->shared_data[old_read_pt] = 0;
rb->shared_data[old_read_pt + 1] = 0; rb->shared_data[old_read_pt + 1] = 0;
DEBUG_PRINTF("%s: read: %u (was:%u), write: %u\n", __func__, DEBUG_PRINTF("%s: read: %u (was:%u), write: %u\n", __func__,
rb->shared_hdr->read_pt, old_read_pt, rb->shared_hdr->read_pt, old_read_pt,
rb->shared_hdr->write_pt); rb->shared_hdr->write_pt);
} }
ssize_t qb_rb_chunk_peek(qb_ringbuffer_t * rb, void **data_out, int32_t tim ssize_t
eout) qb_rb_chunk_peek(qb_ringbuffer_t * rb, void **data_out, int32_t timeout)
{ {
uint32_t read_pt; uint32_t read_pt;
uint32_t chunk_size; uint32_t chunk_size;
uint32_t chunk_magic; uint32_t chunk_magic;
int32_t res; int32_t res;
if (rb == NULL) {
return -EINVAL;
}
res = rb->sem_timedwait_fn(rb, timeout); res = rb->sem_timedwait_fn(rb, timeout);
if (res < 0 && res != -EIDRM) { if (res < 0 && res != -EIDRM) {
if (res != -ETIMEDOUT) { if (res != -ETIMEDOUT) {
qb_util_log(LOG_ERR, "sem_timedwait %s", strerror(-r es)); qb_util_perror(LOG_ERR, "sem_timedwait");
} }
return res; return res;
} }
read_pt = rb->shared_hdr->read_pt; read_pt = rb->shared_hdr->read_pt;
chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt); chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt);
chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt); chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);
*data_out = &rb->shared_data[read_pt + QB_RB_CHUNK_HEADER_WORDS]; *data_out = &rb->shared_data[read_pt + QB_RB_CHUNK_HEADER_WORDS];
if (chunk_magic != QB_RB_CHUNK_MAGIC) { if (chunk_magic != QB_RB_CHUNK_MAGIC) {
errno = ENOMSG; errno = ENOMSG;
skipping to change at line 448 skipping to change at line 517
return chunk_size; return chunk_size;
} }
} }
ssize_t ssize_t
qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len, qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len,
int32_t timeout) int32_t timeout)
{ {
uint32_t read_pt; uint32_t read_pt;
uint32_t chunk_size; uint32_t chunk_size;
int32_t res; int32_t res = 0;
res = rb->sem_timedwait_fn(rb, timeout); if (rb == NULL) {
return -EINVAL;
}
if (rb->sem_timedwait_fn) {
res = rb->sem_timedwait_fn(rb, timeout);
}
if (res < 0 && res != -EIDRM) { if (res < 0 && res != -EIDRM) {
if (res != -ETIMEDOUT) { if (res != -ETIMEDOUT) {
qb_util_log(LOG_ERR, qb_util_perror(LOG_ERR, "sem_timedwait");
"sem_timedwait %s", strerror(errno));
} }
return res; return res;
} }
if (qb_rb_space_used(rb) == 0) { if (qb_rb_space_used(rb) == 0) {
return -ENOMSG; return -ENOMSG;
} }
read_pt = rb->shared_hdr->read_pt; read_pt = rb->shared_hdr->read_pt;
qb_rb_chunk_check(rb, read_pt); qb_rb_chunk_check(rb, read_pt);
skipping to change at line 480 skipping to change at line 553
memcpy(data_out, memcpy(data_out,
&rb->shared_data[read_pt + QB_RB_CHUNK_HEADER_WORDS], &rb->shared_data[read_pt + QB_RB_CHUNK_HEADER_WORDS],
chunk_size); chunk_size);
qb_rb_chunk_reclaim(rb); qb_rb_chunk_reclaim(rb);
return chunk_size; return chunk_size;
} }
static void print_header(qb_ringbuffer_t * rb) static void
print_header(qb_ringbuffer_t * rb)
{ {
printf("Ringbuffer: \n"); printf("Ringbuffer: \n");
if (rb->flags & QB_RB_FLAG_OVERWRITE) { if (rb->flags & QB_RB_FLAG_OVERWRITE) {
printf(" ->OVERWRITE\n"); printf(" ->OVERWRITE\n");
} else { } else {
printf(" ->NORMAL\n"); printf(" ->NORMAL\n");
} }
printf(" ->write_pt [%d]\n", rb->shared_hdr->write_pt); printf(" ->write_pt [%d]\n", rb->shared_hdr->write_pt);
printf(" ->read_pt [%d]\n", rb->shared_hdr->read_pt); printf(" ->read_pt [%d]\n", rb->shared_hdr->read_pt);
printf(" ->size [%d words]\n", rb->shared_hdr->size); printf(" ->size [%d words]\n", rb->shared_hdr->size);
#ifndef S_SPLINT_S #ifndef S_SPLINT_S
printf(" =>free [%zu bytes]\n", qb_rb_space_free(rb)); printf(" =>free [%zu bytes]\n", qb_rb_space_free(rb));
printf(" =>used [%zu bytes]\n", qb_rb_space_used(rb)); printf(" =>used [%zu bytes]\n", qb_rb_space_used(rb));
#endif /* S_SPLINT_S */ #endif /* S_SPLINT_S */
} }
static void qb_rb_chunk_check(qb_ringbuffer_t * rb, uint32_t pointer) static void
qb_rb_chunk_check(qb_ringbuffer_t * rb, uint32_t pointer)
{ {
uint32_t chunk_size; uint32_t chunk_size;
uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, pointer); uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, pointer);
if (chunk_magic != QB_RB_CHUNK_MAGIC) { if (chunk_magic != QB_RB_CHUNK_MAGIC) {
chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer); chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer);
printf("size: %x\n", chunk_size); printf("size: %x\n", chunk_size);
printf("magic: %x\n", chunk_magic); printf("magic: %x\n", chunk_magic);
print_header(rb); print_header(rb);
assert(0); assert(0);
} }
} }
ssize_t qb_rb_write_to_file(qb_ringbuffer_t * rb, int32_t fd) ssize_t
qb_rb_write_to_file(qb_ringbuffer_t * rb, int32_t fd)
{ {
ssize_t result = 0; ssize_t result;
ssize_t written_size = 0; ssize_t written_size = 0;
if (rb == NULL) {
return -EINVAL;
}
print_header(rb); print_header(rb);
result = write(fd, &rb->shared_hdr->size, sizeof(uint32_t)); result = write(fd, &rb->shared_hdr->size, sizeof(uint32_t));
if ((result < 0) || (result != sizeof(uint32_t))) { if (result != sizeof(uint32_t)) {
return -errno; return -errno;
} }
written_size += result; written_size += result;
result = write(fd, rb->shared_data, result = write(fd, rb->shared_data,
rb->shared_hdr->size * sizeof(uint32_t)); rb->shared_hdr->size * sizeof(uint32_t));
if ((result < 0) if (result != rb->shared_hdr->size * sizeof(uint32_t)) {
|| (result != rb->shared_hdr->size * sizeof(uint32_t))) {
return -errno; return -errno;
} }
written_size += result; written_size += result;
/* /*
* store the read & write pointers * store the read & write pointers
*/ */
result += result = write(fd, (void *)&rb->shared_hdr->write_pt, sizeof(uint32_
write(fd, (void *)&rb->shared_hdr->write_pt, sizeof(uint32_t)); t));
if ((result < 0) || (result != sizeof(uint32_t))) { if (result != sizeof(uint32_t)) {
return -errno; return -errno;
} }
written_size += result; written_size += result;
result += write(fd, (void *)&rb->shared_hdr->read_pt, sizeof(uint32_ result = write(fd, (void *)&rb->shared_hdr->read_pt, sizeof(uint32_t
t)); ));
if ((result < 0) || (result != sizeof(uint32_t))) { if (result != sizeof(uint32_t)) {
return -errno; return -errno;
} }
written_size += result; written_size += result;
qb_util_log(LOG_DEBUG, " writing total of: %zd\n", written_size);
return written_size; return written_size;
} }
qb_ringbuffer_t *qb_rb_create_from_file(int32_t fd, uint32_t flags) qb_ringbuffer_t *
qb_rb_create_from_file(int32_t fd, uint32_t flags)
{ {
ssize_t n_read; ssize_t n_read;
size_t n_required; size_t n_required;
qb_ringbuffer_t *rb = malloc(sizeof(qb_ringbuffer_t)); size_t total_read = 0;
rb->shared_hdr = malloc(sizeof(struct qb_ringbuffer_shared_s)); uint32_t read_pt;
uint32_t write_pt;
qb_ringbuffer_t *rb = calloc(1, sizeof(qb_ringbuffer_t));
if (rb == NULL || fd < 0) {
return NULL;
}
rb->shared_hdr = calloc(1, sizeof(struct qb_ringbuffer_shared_s));
if (rb->shared_hdr == NULL) {
goto cleanup_fail2;
}
rb->flags = flags; rb->flags = flags;
n_required = sizeof(uint32_t); n_required = sizeof(uint32_t);
n_read = read(fd, &rb->shared_hdr->size, n_required); n_read = read(fd, &rb->shared_hdr->size, n_required);
if (n_read != n_required) { if (n_read != n_required) {
qb_util_log(LOG_ERR, "Unable to read fdata header %s", qb_util_perror(LOG_ERR, "Unable to read blackbox file header
strerror(errno)); ");
return NULL; goto cleanup_fail;
} }
total_read += n_read;
n_required = ((rb->shared_hdr->size + 2) * sizeof(uint32_t)); n_required = (rb->shared_hdr->size * sizeof(uint32_t));
if ((rb->shared_data = malloc(n_required)) == NULL) { if ((rb->shared_data = malloc(n_required)) == NULL) {
qb_util_log(LOG_ERR, "exhausted virtual memory"); qb_util_perror(LOG_ERR, "exhausted virtual memory");
return NULL; goto cleanup_fail;
} }
n_read = read(fd, rb->shared_data, n_required); n_read = read(fd, rb->shared_data, n_required);
if (n_read < 0) { if (n_read < 0) {
qb_util_log(LOG_ERR, "file read failed: %s", strerror(errno) qb_util_perror(LOG_ERR, "Unable to read blackbox file data")
); ;
return NULL; goto cleanup_fail;
} }
total_read += n_read;
if (n_read != n_required) { if (n_read != n_required) {
qb_util_log(LOG_WARNING, "read %zd bytes, but expected %zu", qb_util_log(LOG_WARNING, "read %zd bytes, but expected %zu",
n_read, n_required); n_read, n_required);
goto cleanup_fail;
} }
rb->shared_hdr->write_pt = rb->shared_data[QB_RB_WRITE_PT_INDEX]; n_read = read(fd, &write_pt, sizeof(uint32_t));
rb->shared_hdr->read_pt = rb->shared_data[QB_RB_READ_PT_INDEX]; assert(n_read == sizeof(uint32_t));
rb->shared_hdr->write_pt = write_pt;
total_read += n_read;
n_read = read(fd, &read_pt, sizeof(uint32_t));
assert(n_read == sizeof(uint32_t));
rb->shared_hdr->read_pt = read_pt;
total_read += n_read;
qb_util_log(LOG_DEBUG, "read total of: %zd", total_read);
print_header(rb); print_header(rb);
return rb; return rb;
cleanup_fail:
free(rb->shared_hdr);
cleanup_fail2:
free(rb);
return NULL;
} }
int32_t qb_rb_chown(qb_ringbuffer_t * rb, uid_t owner, gid_t group) int32_t
qb_rb_chown(qb_ringbuffer_t * rb, uid_t owner, gid_t group)
{ {
int32_t res = chown(rb->shared_hdr->data_path, owner, group); int32_t res;
if (rb == NULL) {
return -EINVAL;
}
res = chown(rb->shared_hdr->data_path, owner, group);
if (res < 0) { if (res < 0) {
return -errno; return -errno;
} }
res = chown(rb->shared_hdr->hdr_path, owner, group); res = chown(rb->shared_hdr->hdr_path, owner, group);
if (res < 0) { if (res < 0) {
return -errno; return -errno;
} }
return 0; return 0;
} }
 End of changes. 78 change blocks. 
93 lines changed or deleted 207 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/