ipc_posix_mq.c   ipc_posix_mq.c 
skipping to change at line 44 skipping to change at line 44
static size_t q_space_used = 0; static size_t q_space_used = 0;
#ifdef QB_LINUX #ifdef QB_LINUX
#define QB_RLIMIT_CHANGE_NEEDED 1 #define QB_RLIMIT_CHANGE_NEEDED 1
#endif /* QB_LINUX */ #endif /* QB_LINUX */
/* /*
* utility functions * utility functions
* -------------------------------------------------------- * --------------------------------------------------------
*/ */
static int32_t posix_mq_increase_limits(size_t max_msg_size, int32_t q_len) static int32_t
posix_mq_increase_limits(size_t max_msg_size, int32_t q_len)
{ {
int32_t res = 0; int32_t res = 0;
#ifdef QB_RLIMIT_CHANGE_NEEDED #ifdef QB_RLIMIT_CHANGE_NEEDED
struct rlimit rlim; struct rlimit rlim;
size_t q_space_needed; size_t q_space_needed;
#endif /* QB_RLIMIT_CHANGE_NEEDED */ #endif /* QB_RLIMIT_CHANGE_NEEDED */
#ifdef QB_RLIMIT_CHANGE_NEEDED #ifdef QB_RLIMIT_CHANGE_NEEDED
if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
res = -errno; res = -errno;
skipping to change at line 77 skipping to change at line 78
rlim.rlim_max = q_space_needed; rlim.rlim_max = q_space_needed;
} }
if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
res = -errno; res = -errno;
qb_util_log(LOG_ERR, "setrlimit failed"); qb_util_log(LOG_ERR, "setrlimit failed");
} }
#endif /* QB_RLIMIT_CHANGE_NEEDED */ #endif /* QB_RLIMIT_CHANGE_NEEDED */
return res; return res;
} }
static int32_t posix_mq_open(struct qb_ipc_one_way *one_way, static int32_t
const char *name, size_t q_len) posix_mq_open(struct qb_ipc_one_way *one_way, const char *name, size_t q_le
n)
{ {
int32_t res = posix_mq_increase_limits(one_way->max_msg_size, q_len) ; int32_t res = posix_mq_increase_limits(one_way->max_msg_size, q_len) ;
if (res != 0) { if (res != 0) {
return res; return res;
} }
one_way->u.pmq.q = mq_open(name, O_RDWR); one_way->u.pmq.q = mq_open(name, O_RDWR);
if (one_way->u.pmq.q == (mqd_t) - 1) { if (one_way->u.pmq.q == (mqd_t) - 1) {
res = -errno; res = -errno;
perror("mq_open"); perror("mq_open");
return res; return res;
} }
strcpy(one_way->u.pmq.name, name); strcpy(one_way->u.pmq.name, name);
q_space_used += one_way->max_msg_size * q_len; q_space_used += one_way->max_msg_size * q_len;
return 0; return 0;
} }
static int32_t posix_mq_create(struct qb_ipcs_connection *c, static int32_t
struct qb_ipc_one_way *one_way, posix_mq_create(struct qb_ipcs_connection *c,
const char *name, size_t q_len) struct qb_ipc_one_way *one_way, const char *name, size_t q_l
en)
{ {
struct mq_attr attr; struct mq_attr attr;
mqd_t q = 0; mqd_t q = 0;
int32_t res = 0; int32_t res = 0;
mode_t m = 0600; mode_t m = 0600;
size_t max_msg_size = one_way->max_msg_size; size_t max_msg_size = one_way->max_msg_size;
res = posix_mq_increase_limits(max_msg_size, q_len); res = posix_mq_increase_limits(max_msg_size, q_len);
if (res != 0) { if (res != 0) {
return res; return res;
skipping to change at line 126 skipping to change at line 127
attr.mq_msgsize = max_msg_size; attr.mq_msgsize = max_msg_size;
q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr); q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr);
if (q == (mqd_t) - 1 && errno == ENOMEM) { if (q == (mqd_t) - 1 && errno == ENOMEM) {
if (max_msg_size > 9000 && q_len > 3) { if (max_msg_size > 9000 && q_len > 3) {
goto try_smaller; goto try_smaller;
} }
} }
if (q == (mqd_t) - 1) { if (q == (mqd_t) - 1) {
res = -errno; res = -errno;
qb_util_log(LOG_ERR, "Can't create mq \"%s\": %s", qb_util_perror(LOG_ERR, "Can't create mq \"%s\"", name);
name, strerror(errno));
return res; return res;
} }
q_space_used += max_msg_size * q_len; q_space_used += max_msg_size * q_len;
one_way->max_msg_size = max_msg_size; one_way->max_msg_size = max_msg_size;
one_way->u.pmq.q = q; one_way->u.pmq.q = q;
strcpy(one_way->u.pmq.name, name); strcpy(one_way->u.pmq.name, name);
res = fchown((int)q, c->euid, c->egid); res = fchown((int)q, c->euid, c->egid);
if (res == -1) { if (res == -1) {
res = -errno; res = -errno;
qb_util_log(LOG_ERR, "fchown:%s %s", name, strerror(errno)); qb_util_perror(LOG_ERR, "fchown:%s", name);
mq_close(q); mq_close(q);
mq_unlink(name); mq_unlink(name);
} }
return res; return res;
} }
static ssize_t qb_ipc_pmq_send(struct qb_ipc_one_way *one_way, static ssize_t
const void *msg_ptr, size_t msg_len) qb_ipc_pmq_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{ {
int32_t res = mq_send(one_way->u.pmq.q, msg_ptr, msg_len, 1); int32_t res = mq_send(one_way->u.pmq.q, msg_ptr, msg_len, 1);
if (res != 0) { if (res != 0) {
return -errno; return -errno;
} }
return msg_len; return msg_len;
} }
static ssize_t qb_ipc_pmq_sendv(struct qb_ipc_one_way *one_way, static ssize_t
const struct iovec* iov, qb_ipc_pmq_sendv(struct qb_ipc_one_way *one_way,
size_t iov_len) const struct iovec *iov, size_t iov_len)
{ {
int32_t total_size = 0; int32_t total_size = 0;
int32_t i; int32_t i;
int32_t res = 0; int32_t res = 0;
char *data = NULL; char *data = NULL;
char *pt = NULL; char *pt = NULL;
for (i = 0; i < iov_len; i++) { for (i = 0; i < iov_len; i++) {
total_size += iov[i].iov_len; total_size += iov[i].iov_len;
} }
data = malloc(total_size); data = malloc(total_size);
if (data == NULL) {
return -ENOMEM;
}
pt = data; pt = data;
for (i = 0; i < iov_len; i++) { for (i = 0; i < iov_len; i++) {
memcpy(pt, iov[i].iov_base, iov[i].iov_len); memcpy(pt, iov[i].iov_base, iov[i].iov_len);
pt += iov[i].iov_len; pt += iov[i].iov_len;
} }
res = mq_send(one_way->u.pmq.q, data, total_size, 1); res = mq_send(one_way->u.pmq.q, data, total_size, 1);
free(data); free(data);
if (res != 0) { if (res != 0) {
return -errno; return -errno;
} }
return total_size; return total_size;
} }
static ssize_t qb_ipc_pmq_recv(struct qb_ipc_one_way *one_way, static ssize_t
void *msg_ptr, qb_ipc_pmq_recv(struct qb_ipc_one_way *one_way,
size_t msg_len, void *msg_ptr, size_t msg_len, int32_t ms_timeout)
int32_t ms_timeout)
{ {
uint32_t msg_prio; uint32_t msg_prio;
struct timespec ts_timeout; struct timespec ts_timeout;
ssize_t res; ssize_t res;
if (ms_timeout >= 0) { if (ms_timeout >= 0) {
qb_util_timespec_from_epoch_get(&ts_timeout); qb_util_timespec_from_epoch_get(&ts_timeout);
qb_timespec_add_ms(&ts_timeout, ms_timeout); qb_timespec_add_ms(&ts_timeout, ms_timeout);
} }
mq_receive_again: mq_receive_again:
if (ms_timeout >= 0) { if (ms_timeout >= 0) {
res = mq_timedreceive(one_way->u.pmq.q, res = mq_timedreceive(one_way->u.pmq.q,
(char *)msg_ptr, (char *)msg_ptr,
one_way->max_msg_size, one_way->max_msg_size,
&msg_prio, &msg_prio, &ts_timeout);
&ts_timeout);
} else { } else {
res = mq_receive(one_way->u.pmq.q, res = mq_receive(one_way->u.pmq.q,
(char *)msg_ptr, (char *)msg_ptr,
one_way->max_msg_size, one_way->max_msg_size, &msg_prio);
&msg_prio);
} }
if (res == -1) { if (res == -1) {
switch (errno) { switch (errno) {
case EINTR: case EINTR:
goto mq_receive_again; goto mq_receive_again;
break; break;
case EAGAIN: case EAGAIN:
res = -ETIMEDOUT; res = -ETIMEDOUT;
break; break;
case ETIMEDOUT: case ETIMEDOUT:
res = -errno; res = -errno;
break; break;
default: default:
res = -errno; res = -errno;
qb_util_log(LOG_ERR, qb_util_perror(LOG_ERR,
"error waiting for mq_timedreceive : %s" "error waiting for mq_timedreceive");
,
strerror(errno));
break; break;
} }
} }
return res; return res;
} }
/* /*
* client functions * client functions
* -------------------------------------------------------- * --------------------------------------------------------
*/ */
static void qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c) static void
qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c)
{ {
struct qb_ipc_request_header hdr; struct qb_ipc_request_header hdr;
qb_util_log(LOG_DEBUG, "%s()", __func__); qb_util_log(LOG_DEBUG, "%s()", __func__);
if (c->needs_sock_for_poll) { if (c->needs_sock_for_poll) {
return; return;
} }
hdr.id = QB_IPC_MSG_DISCONNECT; hdr.id = QB_IPC_MSG_DISCONNECT;
hdr.size = sizeof(hdr); hdr.size = sizeof(hdr);
skipping to change at line 262 skipping to change at line 263
mq_close(c->event.u.pmq.q); mq_close(c->event.u.pmq.q);
mq_close(c->response.u.pmq.q); mq_close(c->response.u.pmq.q);
mq_close(c->request.u.pmq.q); mq_close(c->request.u.pmq.q);
mq_unlink(c->event.u.pmq.name); mq_unlink(c->event.u.pmq.name);
mq_unlink(c->request.u.pmq.name); mq_unlink(c->request.u.pmq.name);
mq_unlink(c->response.u.pmq.name); mq_unlink(c->response.u.pmq.name);
} }
int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c, int32_t
struct qb_ipc_connection_response *response) qb_ipcc_pmq_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *response)
{ {
int32_t res = 0; int32_t res = 0;
c->funcs.send = qb_ipc_pmq_send; c->funcs.send = qb_ipc_pmq_send;
c->funcs.sendv = qb_ipc_pmq_sendv; c->funcs.sendv = qb_ipc_pmq_sendv;
c->funcs.recv = qb_ipc_pmq_recv; c->funcs.recv = qb_ipc_pmq_recv;
c->funcs.fc_get = NULL; c->funcs.fc_get = NULL;
c->funcs.disconnect = qb_ipcc_pmq_disconnect; c->funcs.disconnect = qb_ipcc_pmq_disconnect;
#if defined(QB_LINUX) || defined(QB_BSD) #if defined(QB_LINUX) || defined(QB_BSD)
c->needs_sock_for_poll = QB_FALSE; c->needs_sock_for_poll = QB_FALSE;
#else #else
c->needs_sock_for_poll = QB_TRUE; c->needs_sock_for_poll = QB_TRUE;
#endif #endif
if (strlen(c->name) > (NAME_MAX - 20)) { if (strlen(c->name) > (NAME_MAX - 20)) {
return -EINVAL; return -EINVAL;
} }
res = posix_mq_open(&c->request, response->request, res = posix_mq_open(&c->request, response->request, QB_REQUEST_Q_LEN
QB_REQUEST_Q_LEN); );
if (res != 0) { if (res != 0) {
perror("mq_open:REQUEST"); perror("mq_open:REQUEST");
return res; return res;
} }
res = posix_mq_open(&c->response, response->response, res = posix_mq_open(&c->response, response->response,
QB_RESPONSE_Q_LEN); QB_RESPONSE_Q_LEN);
if (res != 0) { if (res != 0) {
perror("mq_open:RESPONSE"); perror("mq_open:RESPONSE");
goto cleanup_request; goto cleanup_request;
} }
skipping to change at line 316 skipping to change at line 317
mq_close(c->request.u.pmq.q); mq_close(c->request.u.pmq.q);
return res; return res;
} }
/* /*
* service functions * service functions
* -------------------------------------------------------- * --------------------------------------------------------
*/ */
static void qb_ipcs_pmq_disconnect(struct qb_ipcs_connection *c) static void
qb_ipcs_pmq_disconnect(struct qb_ipcs_connection *c)
{ {
struct qb_ipc_response_header msg; struct qb_ipc_response_header msg;
msg.id = QB_IPC_MSG_DISCONNECT; msg.id = QB_IPC_MSG_DISCONNECT;
msg.size = sizeof(msg); msg.size = sizeof(msg);
msg.error = 0; msg.error = 0;
(void)qb_ipc_pmq_send(&c->event, &msg, msg.size); (void)qb_ipc_pmq_send(&c->event, &msg, msg.size);
mq_close(c->event.u.pmq.q); mq_close(c->event.u.pmq.q);
mq_close(c->response.u.pmq.q); mq_close(c->response.u.pmq.q);
mq_close(c->request.u.pmq.q); mq_close(c->request.u.pmq.q);
mq_unlink(c->event.u.pmq.name); mq_unlink(c->event.u.pmq.name);
mq_unlink(c->request.u.pmq.name); mq_unlink(c->request.u.pmq.name);
mq_unlink(c->response.u.pmq.name); mq_unlink(c->response.u.pmq.name);
} }
static int32_t qb_ipcs_pmq_connect(struct qb_ipcs_service *s, static int32_t
struct qb_ipcs_connection *c, qb_ipcs_pmq_connect(struct qb_ipcs_service *s,
struct qb_ipc_connection_response *r) struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{ {
int32_t res = 0; int32_t res = 0;
snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid); snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid);
snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid); snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid);
snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid); snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid);
res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN); res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN);
if (res < 0) { if (res < 0) {
goto cleanup; goto cleanup;
skipping to change at line 361 skipping to change at line 364
if (res < 0) { if (res < 0) {
goto cleanup_request; goto cleanup_request;
} }
res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN); res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN);
if (res < 0) { if (res < 0) {
goto cleanup_request_response; goto cleanup_request_response;
} }
if (!s->needs_sock_for_poll) { if (!s->needs_sock_for_poll) {
res = s->poll_fns.dispatch_add(s->poll_priority, (int32_t)c- res =
>request.u.pmq.q, s->poll_fns.dispatch_add(s->poll_priority,
POLLIN | POLLPRI | POLLNVAL, (int32_t) c->request.u.pmq.q,
c, qb_ipcs_dispatch_service_r POLLIN | POLLPRI | POLLNVAL, c,
equest); qb_ipcs_dispatch_service_reques
t);
} }
r->hdr.error = 0; r->hdr.error = 0;
return res; return res;
cleanup_request_response: cleanup_request_response:
mq_close(c->response.u.pmq.q); mq_close(c->response.u.pmq.q);
mq_unlink(r->response); mq_unlink(r->response);
cleanup_request: cleanup_request:
mq_close(c->request.u.pmq.q); mq_close(c->request.u.pmq.q);
mq_unlink(r->request); mq_unlink(r->request);
cleanup: cleanup:
r->hdr.error = res; r->hdr.error = res;
return res; return res;
} }
static ssize_t qb_ipc_pmq_q_len_get(struct qb_ipc_one_way *one_way) static ssize_t
qb_ipc_pmq_q_len_get(struct qb_ipc_one_way *one_way)
{ {
struct mq_attr info; struct mq_attr info;
int32_t res = mq_getattr(one_way->u.pmq.q, &info); int32_t res = mq_getattr(one_way->u.pmq.q, &info);
if (res == 0) { if (res == 0) {
return info.mq_curmsgs; return info.mq_curmsgs;
} }
return -errno; return -errno;
} }
void qb_ipcs_pmq_init(struct qb_ipcs_service * s) void
qb_ipcs_pmq_init(struct qb_ipcs_service *s)
{ {
s->funcs.connect = qb_ipcs_pmq_connect; s->funcs.connect = qb_ipcs_pmq_connect;
s->funcs.disconnect = qb_ipcs_pmq_disconnect; s->funcs.disconnect = qb_ipcs_pmq_disconnect;
s->funcs.recv = qb_ipc_pmq_recv; s->funcs.recv = qb_ipc_pmq_recv;
s->funcs.send = qb_ipc_pmq_send; s->funcs.send = qb_ipc_pmq_send;
s->funcs.sendv = qb_ipc_pmq_sendv; s->funcs.sendv = qb_ipc_pmq_sendv;
s->funcs.peek = NULL; s->funcs.peek = NULL;
s->funcs.reclaim = NULL; s->funcs.reclaim = NULL;
 End of changes. 21 change blocks. 
43 lines changed or deleted 51 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/