ipc_posix_mq.c | ipc_posix_mq.c | |||
---|---|---|---|---|
skipping to change at line 44 | skipping to change at line 44 | |||
static size_t q_space_used = 0; | static size_t q_space_used = 0; | |||
#ifdef QB_LINUX | #ifdef QB_LINUX | |||
#define QB_RLIMIT_CHANGE_NEEDED 1 | #define QB_RLIMIT_CHANGE_NEEDED 1 | |||
#endif /* QB_LINUX */ | #endif /* QB_LINUX */ | |||
/* | /* | |||
* utility functions | * utility functions | |||
* -------------------------------------------------------- | * -------------------------------------------------------- | |||
*/ | */ | |||
static int32_t posix_mq_increase_limits(size_t max_msg_size, int32_t q_len) | static int32_t | |||
posix_mq_increase_limits(size_t max_msg_size, int32_t q_len) | ||||
{ | { | |||
int32_t res = 0; | int32_t res = 0; | |||
#ifdef QB_RLIMIT_CHANGE_NEEDED | #ifdef QB_RLIMIT_CHANGE_NEEDED | |||
struct rlimit rlim; | struct rlimit rlim; | |||
size_t q_space_needed; | size_t q_space_needed; | |||
#endif /* QB_RLIMIT_CHANGE_NEEDED */ | #endif /* QB_RLIMIT_CHANGE_NEEDED */ | |||
#ifdef QB_RLIMIT_CHANGE_NEEDED | #ifdef QB_RLIMIT_CHANGE_NEEDED | |||
if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { | if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { | |||
res = -errno; | res = -errno; | |||
skipping to change at line 77 | skipping to change at line 78 | |||
rlim.rlim_max = q_space_needed; | rlim.rlim_max = q_space_needed; | |||
} | } | |||
if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { | if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { | |||
res = -errno; | res = -errno; | |||
qb_util_log(LOG_ERR, "setrlimit failed"); | qb_util_log(LOG_ERR, "setrlimit failed"); | |||
} | } | |||
#endif /* QB_RLIMIT_CHANGE_NEEDED */ | #endif /* QB_RLIMIT_CHANGE_NEEDED */ | |||
return res; | return res; | |||
} | } | |||
static int32_t posix_mq_open(struct qb_ipc_one_way *one_way, | static int32_t | |||
const char *name, size_t q_len) | posix_mq_open(struct qb_ipc_one_way *one_way, const char *name, size_t q_le | |||
n) | ||||
{ | { | |||
int32_t res = posix_mq_increase_limits(one_way->max_msg_size, q_len) ; | int32_t res = posix_mq_increase_limits(one_way->max_msg_size, q_len) ; | |||
if (res != 0) { | if (res != 0) { | |||
return res; | return res; | |||
} | } | |||
one_way->u.pmq.q = mq_open(name, O_RDWR); | one_way->u.pmq.q = mq_open(name, O_RDWR); | |||
if (one_way->u.pmq.q == (mqd_t) - 1) { | if (one_way->u.pmq.q == (mqd_t) - 1) { | |||
res = -errno; | res = -errno; | |||
perror("mq_open"); | perror("mq_open"); | |||
return res; | return res; | |||
} | } | |||
strcpy(one_way->u.pmq.name, name); | strcpy(one_way->u.pmq.name, name); | |||
q_space_used += one_way->max_msg_size * q_len; | q_space_used += one_way->max_msg_size * q_len; | |||
return 0; | return 0; | |||
} | } | |||
static int32_t posix_mq_create(struct qb_ipcs_connection *c, | static int32_t | |||
struct qb_ipc_one_way *one_way, | posix_mq_create(struct qb_ipcs_connection *c, | |||
const char *name, size_t q_len) | struct qb_ipc_one_way *one_way, const char *name, size_t q_l | |||
en) | ||||
{ | { | |||
struct mq_attr attr; | struct mq_attr attr; | |||
mqd_t q = 0; | mqd_t q = 0; | |||
int32_t res = 0; | int32_t res = 0; | |||
mode_t m = 0600; | mode_t m = 0600; | |||
size_t max_msg_size = one_way->max_msg_size; | size_t max_msg_size = one_way->max_msg_size; | |||
res = posix_mq_increase_limits(max_msg_size, q_len); | res = posix_mq_increase_limits(max_msg_size, q_len); | |||
if (res != 0) { | if (res != 0) { | |||
return res; | return res; | |||
skipping to change at line 126 | skipping to change at line 127 | |||
attr.mq_msgsize = max_msg_size; | attr.mq_msgsize = max_msg_size; | |||
q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr); | q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr); | |||
if (q == (mqd_t) - 1 && errno == ENOMEM) { | if (q == (mqd_t) - 1 && errno == ENOMEM) { | |||
if (max_msg_size > 9000 && q_len > 3) { | if (max_msg_size > 9000 && q_len > 3) { | |||
goto try_smaller; | goto try_smaller; | |||
} | } | |||
} | } | |||
if (q == (mqd_t) - 1) { | if (q == (mqd_t) - 1) { | |||
res = -errno; | res = -errno; | |||
qb_util_log(LOG_ERR, "Can't create mq \"%s\": %s", | qb_util_perror(LOG_ERR, "Can't create mq \"%s\"", name); | |||
name, strerror(errno)); | ||||
return res; | return res; | |||
} | } | |||
q_space_used += max_msg_size * q_len; | q_space_used += max_msg_size * q_len; | |||
one_way->max_msg_size = max_msg_size; | one_way->max_msg_size = max_msg_size; | |||
one_way->u.pmq.q = q; | one_way->u.pmq.q = q; | |||
strcpy(one_way->u.pmq.name, name); | strcpy(one_way->u.pmq.name, name); | |||
res = fchown((int)q, c->euid, c->egid); | res = fchown((int)q, c->euid, c->egid); | |||
if (res == -1) { | if (res == -1) { | |||
res = -errno; | res = -errno; | |||
qb_util_log(LOG_ERR, "fchown:%s %s", name, strerror(errno)); | qb_util_perror(LOG_ERR, "fchown:%s", name); | |||
mq_close(q); | mq_close(q); | |||
mq_unlink(name); | mq_unlink(name); | |||
} | } | |||
return res; | return res; | |||
} | } | |||
static ssize_t qb_ipc_pmq_send(struct qb_ipc_one_way *one_way, | static ssize_t | |||
const void *msg_ptr, size_t msg_len) | qb_ipc_pmq_send(struct qb_ipc_one_way *one_way, | |||
const void *msg_ptr, size_t msg_len) | ||||
{ | { | |||
int32_t res = mq_send(one_way->u.pmq.q, msg_ptr, msg_len, 1); | int32_t res = mq_send(one_way->u.pmq.q, msg_ptr, msg_len, 1); | |||
if (res != 0) { | if (res != 0) { | |||
return -errno; | return -errno; | |||
} | } | |||
return msg_len; | return msg_len; | |||
} | } | |||
static ssize_t qb_ipc_pmq_sendv(struct qb_ipc_one_way *one_way, | static ssize_t | |||
const struct iovec* iov, | qb_ipc_pmq_sendv(struct qb_ipc_one_way *one_way, | |||
size_t iov_len) | const struct iovec *iov, size_t iov_len) | |||
{ | { | |||
int32_t total_size = 0; | int32_t total_size = 0; | |||
int32_t i; | int32_t i; | |||
int32_t res = 0; | int32_t res = 0; | |||
char *data = NULL; | char *data = NULL; | |||
char *pt = NULL; | char *pt = NULL; | |||
for (i = 0; i < iov_len; i++) { | for (i = 0; i < iov_len; i++) { | |||
total_size += iov[i].iov_len; | total_size += iov[i].iov_len; | |||
} | } | |||
data = malloc(total_size); | data = malloc(total_size); | |||
if (data == NULL) { | ||||
return -ENOMEM; | ||||
} | ||||
pt = data; | pt = data; | |||
for (i = 0; i < iov_len; i++) { | for (i = 0; i < iov_len; i++) { | |||
memcpy(pt, iov[i].iov_base, iov[i].iov_len); | memcpy(pt, iov[i].iov_base, iov[i].iov_len); | |||
pt += iov[i].iov_len; | pt += iov[i].iov_len; | |||
} | } | |||
res = mq_send(one_way->u.pmq.q, data, total_size, 1); | res = mq_send(one_way->u.pmq.q, data, total_size, 1); | |||
free(data); | free(data); | |||
if (res != 0) { | if (res != 0) { | |||
return -errno; | return -errno; | |||
} | } | |||
return total_size; | return total_size; | |||
} | } | |||
static ssize_t qb_ipc_pmq_recv(struct qb_ipc_one_way *one_way, | static ssize_t | |||
void *msg_ptr, | qb_ipc_pmq_recv(struct qb_ipc_one_way *one_way, | |||
size_t msg_len, | void *msg_ptr, size_t msg_len, int32_t ms_timeout) | |||
int32_t ms_timeout) | ||||
{ | { | |||
uint32_t msg_prio; | uint32_t msg_prio; | |||
struct timespec ts_timeout; | struct timespec ts_timeout; | |||
ssize_t res; | ssize_t res; | |||
if (ms_timeout >= 0) { | if (ms_timeout >= 0) { | |||
qb_util_timespec_from_epoch_get(&ts_timeout); | qb_util_timespec_from_epoch_get(&ts_timeout); | |||
qb_timespec_add_ms(&ts_timeout, ms_timeout); | qb_timespec_add_ms(&ts_timeout, ms_timeout); | |||
} | } | |||
mq_receive_again: | mq_receive_again: | |||
if (ms_timeout >= 0) { | if (ms_timeout >= 0) { | |||
res = mq_timedreceive(one_way->u.pmq.q, | res = mq_timedreceive(one_way->u.pmq.q, | |||
(char *)msg_ptr, | (char *)msg_ptr, | |||
one_way->max_msg_size, | one_way->max_msg_size, | |||
&msg_prio, | &msg_prio, &ts_timeout); | |||
&ts_timeout); | ||||
} else { | } else { | |||
res = mq_receive(one_way->u.pmq.q, | res = mq_receive(one_way->u.pmq.q, | |||
(char *)msg_ptr, | (char *)msg_ptr, | |||
one_way->max_msg_size, | one_way->max_msg_size, &msg_prio); | |||
&msg_prio); | ||||
} | } | |||
if (res == -1) { | if (res == -1) { | |||
switch (errno) { | switch (errno) { | |||
case EINTR: | case EINTR: | |||
goto mq_receive_again; | goto mq_receive_again; | |||
break; | break; | |||
case EAGAIN: | case EAGAIN: | |||
res = -ETIMEDOUT; | res = -ETIMEDOUT; | |||
break; | break; | |||
case ETIMEDOUT: | case ETIMEDOUT: | |||
res = -errno; | res = -errno; | |||
break; | break; | |||
default: | default: | |||
res = -errno; | res = -errno; | |||
qb_util_log(LOG_ERR, | qb_util_perror(LOG_ERR, | |||
"error waiting for mq_timedreceive : %s" | "error waiting for mq_timedreceive"); | |||
, | ||||
strerror(errno)); | ||||
break; | break; | |||
} | } | |||
} | } | |||
return res; | return res; | |||
} | } | |||
/* | /* | |||
* client functions | * client functions | |||
* -------------------------------------------------------- | * -------------------------------------------------------- | |||
*/ | */ | |||
static void qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c) | static void | |||
qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c) | ||||
{ | { | |||
struct qb_ipc_request_header hdr; | struct qb_ipc_request_header hdr; | |||
qb_util_log(LOG_DEBUG, "%s()", __func__); | qb_util_log(LOG_DEBUG, "%s()", __func__); | |||
if (c->needs_sock_for_poll) { | if (c->needs_sock_for_poll) { | |||
return; | return; | |||
} | } | |||
hdr.id = QB_IPC_MSG_DISCONNECT; | hdr.id = QB_IPC_MSG_DISCONNECT; | |||
hdr.size = sizeof(hdr); | hdr.size = sizeof(hdr); | |||
skipping to change at line 262 | skipping to change at line 263 | |||
mq_close(c->event.u.pmq.q); | mq_close(c->event.u.pmq.q); | |||
mq_close(c->response.u.pmq.q); | mq_close(c->response.u.pmq.q); | |||
mq_close(c->request.u.pmq.q); | mq_close(c->request.u.pmq.q); | |||
mq_unlink(c->event.u.pmq.name); | mq_unlink(c->event.u.pmq.name); | |||
mq_unlink(c->request.u.pmq.name); | mq_unlink(c->request.u.pmq.name); | |||
mq_unlink(c->response.u.pmq.name); | mq_unlink(c->response.u.pmq.name); | |||
} | } | |||
int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c, | int32_t | |||
struct qb_ipc_connection_response *response) | qb_ipcc_pmq_connect(struct qb_ipcc_connection *c, | |||
struct qb_ipc_connection_response *response) | ||||
{ | { | |||
int32_t res = 0; | int32_t res = 0; | |||
c->funcs.send = qb_ipc_pmq_send; | c->funcs.send = qb_ipc_pmq_send; | |||
c->funcs.sendv = qb_ipc_pmq_sendv; | c->funcs.sendv = qb_ipc_pmq_sendv; | |||
c->funcs.recv = qb_ipc_pmq_recv; | c->funcs.recv = qb_ipc_pmq_recv; | |||
c->funcs.fc_get = NULL; | c->funcs.fc_get = NULL; | |||
c->funcs.disconnect = qb_ipcc_pmq_disconnect; | c->funcs.disconnect = qb_ipcc_pmq_disconnect; | |||
#if defined(QB_LINUX) || defined(QB_BSD) | #if defined(QB_LINUX) || defined(QB_BSD) | |||
c->needs_sock_for_poll = QB_FALSE; | c->needs_sock_for_poll = QB_FALSE; | |||
#else | #else | |||
c->needs_sock_for_poll = QB_TRUE; | c->needs_sock_for_poll = QB_TRUE; | |||
#endif | #endif | |||
if (strlen(c->name) > (NAME_MAX - 20)) { | if (strlen(c->name) > (NAME_MAX - 20)) { | |||
return -EINVAL; | return -EINVAL; | |||
} | } | |||
res = posix_mq_open(&c->request, response->request, | res = posix_mq_open(&c->request, response->request, QB_REQUEST_Q_LEN | |||
QB_REQUEST_Q_LEN); | ); | |||
if (res != 0) { | if (res != 0) { | |||
perror("mq_open:REQUEST"); | perror("mq_open:REQUEST"); | |||
return res; | return res; | |||
} | } | |||
res = posix_mq_open(&c->response, response->response, | res = posix_mq_open(&c->response, response->response, | |||
QB_RESPONSE_Q_LEN); | QB_RESPONSE_Q_LEN); | |||
if (res != 0) { | if (res != 0) { | |||
perror("mq_open:RESPONSE"); | perror("mq_open:RESPONSE"); | |||
goto cleanup_request; | goto cleanup_request; | |||
} | } | |||
skipping to change at line 316 | skipping to change at line 317 | |||
mq_close(c->request.u.pmq.q); | mq_close(c->request.u.pmq.q); | |||
return res; | return res; | |||
} | } | |||
/* | /* | |||
* service functions | * service functions | |||
* -------------------------------------------------------- | * -------------------------------------------------------- | |||
*/ | */ | |||
static void qb_ipcs_pmq_disconnect(struct qb_ipcs_connection *c) | static void | |||
qb_ipcs_pmq_disconnect(struct qb_ipcs_connection *c) | ||||
{ | { | |||
struct qb_ipc_response_header msg; | struct qb_ipc_response_header msg; | |||
msg.id = QB_IPC_MSG_DISCONNECT; | msg.id = QB_IPC_MSG_DISCONNECT; | |||
msg.size = sizeof(msg); | msg.size = sizeof(msg); | |||
msg.error = 0; | msg.error = 0; | |||
(void)qb_ipc_pmq_send(&c->event, &msg, msg.size); | (void)qb_ipc_pmq_send(&c->event, &msg, msg.size); | |||
mq_close(c->event.u.pmq.q); | mq_close(c->event.u.pmq.q); | |||
mq_close(c->response.u.pmq.q); | mq_close(c->response.u.pmq.q); | |||
mq_close(c->request.u.pmq.q); | mq_close(c->request.u.pmq.q); | |||
mq_unlink(c->event.u.pmq.name); | mq_unlink(c->event.u.pmq.name); | |||
mq_unlink(c->request.u.pmq.name); | mq_unlink(c->request.u.pmq.name); | |||
mq_unlink(c->response.u.pmq.name); | mq_unlink(c->response.u.pmq.name); | |||
} | } | |||
static int32_t qb_ipcs_pmq_connect(struct qb_ipcs_service *s, | static int32_t | |||
struct qb_ipcs_connection *c, | qb_ipcs_pmq_connect(struct qb_ipcs_service *s, | |||
struct qb_ipc_connection_response *r) | struct qb_ipcs_connection *c, | |||
struct qb_ipc_connection_response *r) | ||||
{ | { | |||
int32_t res = 0; | int32_t res = 0; | |||
snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid); | snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid); | |||
snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid); | snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid); | |||
snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid); | snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid); | |||
res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN); | res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN); | |||
if (res < 0) { | if (res < 0) { | |||
goto cleanup; | goto cleanup; | |||
skipping to change at line 361 | skipping to change at line 364 | |||
if (res < 0) { | if (res < 0) { | |||
goto cleanup_request; | goto cleanup_request; | |||
} | } | |||
res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN); | res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN); | |||
if (res < 0) { | if (res < 0) { | |||
goto cleanup_request_response; | goto cleanup_request_response; | |||
} | } | |||
if (!s->needs_sock_for_poll) { | if (!s->needs_sock_for_poll) { | |||
res = s->poll_fns.dispatch_add(s->poll_priority, (int32_t)c- | res = | |||
>request.u.pmq.q, | s->poll_fns.dispatch_add(s->poll_priority, | |||
POLLIN | POLLPRI | POLLNVAL, | (int32_t) c->request.u.pmq.q, | |||
c, qb_ipcs_dispatch_service_r | POLLIN | POLLPRI | POLLNVAL, c, | |||
equest); | qb_ipcs_dispatch_service_reques | |||
t); | ||||
} | } | |||
r->hdr.error = 0; | r->hdr.error = 0; | |||
return res; | return res; | |||
cleanup_request_response: | cleanup_request_response: | |||
mq_close(c->response.u.pmq.q); | mq_close(c->response.u.pmq.q); | |||
mq_unlink(r->response); | mq_unlink(r->response); | |||
cleanup_request: | cleanup_request: | |||
mq_close(c->request.u.pmq.q); | mq_close(c->request.u.pmq.q); | |||
mq_unlink(r->request); | mq_unlink(r->request); | |||
cleanup: | cleanup: | |||
r->hdr.error = res; | r->hdr.error = res; | |||
return res; | return res; | |||
} | } | |||
static ssize_t qb_ipc_pmq_q_len_get(struct qb_ipc_one_way *one_way) | static ssize_t | |||
qb_ipc_pmq_q_len_get(struct qb_ipc_one_way *one_way) | ||||
{ | { | |||
struct mq_attr info; | struct mq_attr info; | |||
int32_t res = mq_getattr(one_way->u.pmq.q, &info); | int32_t res = mq_getattr(one_way->u.pmq.q, &info); | |||
if (res == 0) { | if (res == 0) { | |||
return info.mq_curmsgs; | return info.mq_curmsgs; | |||
} | } | |||
return -errno; | return -errno; | |||
} | } | |||
void qb_ipcs_pmq_init(struct qb_ipcs_service * s) | void | |||
qb_ipcs_pmq_init(struct qb_ipcs_service *s) | ||||
{ | { | |||
s->funcs.connect = qb_ipcs_pmq_connect; | s->funcs.connect = qb_ipcs_pmq_connect; | |||
s->funcs.disconnect = qb_ipcs_pmq_disconnect; | s->funcs.disconnect = qb_ipcs_pmq_disconnect; | |||
s->funcs.recv = qb_ipc_pmq_recv; | s->funcs.recv = qb_ipc_pmq_recv; | |||
s->funcs.send = qb_ipc_pmq_send; | s->funcs.send = qb_ipc_pmq_send; | |||
s->funcs.sendv = qb_ipc_pmq_sendv; | s->funcs.sendv = qb_ipc_pmq_sendv; | |||
s->funcs.peek = NULL; | s->funcs.peek = NULL; | |||
s->funcs.reclaim = NULL; | s->funcs.reclaim = NULL; | |||
End of changes. 21 change blocks. | ||||
43 lines changed or deleted | 51 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ |