| ipc_posix_mq.c | ipc_posix_mq.c | |||
|---|---|---|---|---|
| skipping to change at line 44 | skipping to change at line 44 | |||
| static size_t q_space_used = 0; | static size_t q_space_used = 0; | |||
| #ifdef QB_LINUX | #ifdef QB_LINUX | |||
| #define QB_RLIMIT_CHANGE_NEEDED 1 | #define QB_RLIMIT_CHANGE_NEEDED 1 | |||
| #endif /* QB_LINUX */ | #endif /* QB_LINUX */ | |||
| /* | /* | |||
| * utility functions | * utility functions | |||
| * -------------------------------------------------------- | * -------------------------------------------------------- | |||
| */ | */ | |||
| static int32_t posix_mq_increase_limits(size_t max_msg_size, int32_t q_len) | static int32_t | |||
| posix_mq_increase_limits(size_t max_msg_size, int32_t q_len) | ||||
| { | { | |||
| int32_t res = 0; | int32_t res = 0; | |||
| #ifdef QB_RLIMIT_CHANGE_NEEDED | #ifdef QB_RLIMIT_CHANGE_NEEDED | |||
| struct rlimit rlim; | struct rlimit rlim; | |||
| size_t q_space_needed; | size_t q_space_needed; | |||
| #endif /* QB_RLIMIT_CHANGE_NEEDED */ | #endif /* QB_RLIMIT_CHANGE_NEEDED */ | |||
| #ifdef QB_RLIMIT_CHANGE_NEEDED | #ifdef QB_RLIMIT_CHANGE_NEEDED | |||
| if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { | if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { | |||
| res = -errno; | res = -errno; | |||
| skipping to change at line 77 | skipping to change at line 78 | |||
| rlim.rlim_max = q_space_needed; | rlim.rlim_max = q_space_needed; | |||
| } | } | |||
| if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { | if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "setrlimit failed"); | qb_util_log(LOG_ERR, "setrlimit failed"); | |||
| } | } | |||
| #endif /* QB_RLIMIT_CHANGE_NEEDED */ | #endif /* QB_RLIMIT_CHANGE_NEEDED */ | |||
| return res; | return res; | |||
| } | } | |||
| static int32_t posix_mq_open(struct qb_ipc_one_way *one_way, | static int32_t | |||
| const char *name, size_t q_len) | posix_mq_open(struct qb_ipc_one_way *one_way, const char *name, size_t q_le | |||
| n) | ||||
| { | { | |||
| int32_t res = posix_mq_increase_limits(one_way->max_msg_size, q_len) ; | int32_t res = posix_mq_increase_limits(one_way->max_msg_size, q_len) ; | |||
| if (res != 0) { | if (res != 0) { | |||
| return res; | return res; | |||
| } | } | |||
| one_way->u.pmq.q = mq_open(name, O_RDWR); | one_way->u.pmq.q = mq_open(name, O_RDWR); | |||
| if (one_way->u.pmq.q == (mqd_t) - 1) { | if (one_way->u.pmq.q == (mqd_t) - 1) { | |||
| res = -errno; | res = -errno; | |||
| perror("mq_open"); | perror("mq_open"); | |||
| return res; | return res; | |||
| } | } | |||
| strcpy(one_way->u.pmq.name, name); | strcpy(one_way->u.pmq.name, name); | |||
| q_space_used += one_way->max_msg_size * q_len; | q_space_used += one_way->max_msg_size * q_len; | |||
| return 0; | return 0; | |||
| } | } | |||
| static int32_t posix_mq_create(struct qb_ipcs_connection *c, | static int32_t | |||
| struct qb_ipc_one_way *one_way, | posix_mq_create(struct qb_ipcs_connection *c, | |||
| const char *name, size_t q_len) | struct qb_ipc_one_way *one_way, const char *name, size_t q_l | |||
| en) | ||||
| { | { | |||
| struct mq_attr attr; | struct mq_attr attr; | |||
| mqd_t q = 0; | mqd_t q = 0; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| mode_t m = 0600; | mode_t m = 0600; | |||
| size_t max_msg_size = one_way->max_msg_size; | size_t max_msg_size = one_way->max_msg_size; | |||
| res = posix_mq_increase_limits(max_msg_size, q_len); | res = posix_mq_increase_limits(max_msg_size, q_len); | |||
| if (res != 0) { | if (res != 0) { | |||
| return res; | return res; | |||
| skipping to change at line 126 | skipping to change at line 127 | |||
| attr.mq_msgsize = max_msg_size; | attr.mq_msgsize = max_msg_size; | |||
| q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr); | q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr); | |||
| if (q == (mqd_t) - 1 && errno == ENOMEM) { | if (q == (mqd_t) - 1 && errno == ENOMEM) { | |||
| if (max_msg_size > 9000 && q_len > 3) { | if (max_msg_size > 9000 && q_len > 3) { | |||
| goto try_smaller; | goto try_smaller; | |||
| } | } | |||
| } | } | |||
| if (q == (mqd_t) - 1) { | if (q == (mqd_t) - 1) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "Can't create mq \"%s\": %s", | qb_util_perror(LOG_ERR, "Can't create mq \"%s\"", name); | |||
| name, strerror(errno)); | ||||
| return res; | return res; | |||
| } | } | |||
| q_space_used += max_msg_size * q_len; | q_space_used += max_msg_size * q_len; | |||
| one_way->max_msg_size = max_msg_size; | one_way->max_msg_size = max_msg_size; | |||
| one_way->u.pmq.q = q; | one_way->u.pmq.q = q; | |||
| strcpy(one_way->u.pmq.name, name); | strcpy(one_way->u.pmq.name, name); | |||
| res = fchown((int)q, c->euid, c->egid); | res = fchown((int)q, c->euid, c->egid); | |||
| if (res == -1) { | if (res == -1) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "fchown:%s %s", name, strerror(errno)); | qb_util_perror(LOG_ERR, "fchown:%s", name); | |||
| mq_close(q); | mq_close(q); | |||
| mq_unlink(name); | mq_unlink(name); | |||
| } | } | |||
| return res; | return res; | |||
| } | } | |||
| static ssize_t qb_ipc_pmq_send(struct qb_ipc_one_way *one_way, | static ssize_t | |||
| const void *msg_ptr, size_t msg_len) | qb_ipc_pmq_send(struct qb_ipc_one_way *one_way, | |||
| const void *msg_ptr, size_t msg_len) | ||||
| { | { | |||
| int32_t res = mq_send(one_way->u.pmq.q, msg_ptr, msg_len, 1); | int32_t res = mq_send(one_way->u.pmq.q, msg_ptr, msg_len, 1); | |||
| if (res != 0) { | if (res != 0) { | |||
| return -errno; | return -errno; | |||
| } | } | |||
| return msg_len; | return msg_len; | |||
| } | } | |||
| static ssize_t qb_ipc_pmq_sendv(struct qb_ipc_one_way *one_way, | static ssize_t | |||
| const struct iovec* iov, | qb_ipc_pmq_sendv(struct qb_ipc_one_way *one_way, | |||
| size_t iov_len) | const struct iovec *iov, size_t iov_len) | |||
| { | { | |||
| int32_t total_size = 0; | int32_t total_size = 0; | |||
| int32_t i; | int32_t i; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| char *data = NULL; | char *data = NULL; | |||
| char *pt = NULL; | char *pt = NULL; | |||
| for (i = 0; i < iov_len; i++) { | for (i = 0; i < iov_len; i++) { | |||
| total_size += iov[i].iov_len; | total_size += iov[i].iov_len; | |||
| } | } | |||
| data = malloc(total_size); | data = malloc(total_size); | |||
| if (data == NULL) { | ||||
| return -ENOMEM; | ||||
| } | ||||
| pt = data; | pt = data; | |||
| for (i = 0; i < iov_len; i++) { | for (i = 0; i < iov_len; i++) { | |||
| memcpy(pt, iov[i].iov_base, iov[i].iov_len); | memcpy(pt, iov[i].iov_base, iov[i].iov_len); | |||
| pt += iov[i].iov_len; | pt += iov[i].iov_len; | |||
| } | } | |||
| res = mq_send(one_way->u.pmq.q, data, total_size, 1); | res = mq_send(one_way->u.pmq.q, data, total_size, 1); | |||
| free(data); | free(data); | |||
| if (res != 0) { | if (res != 0) { | |||
| return -errno; | return -errno; | |||
| } | } | |||
| return total_size; | return total_size; | |||
| } | } | |||
| static ssize_t qb_ipc_pmq_recv(struct qb_ipc_one_way *one_way, | static ssize_t | |||
| void *msg_ptr, | qb_ipc_pmq_recv(struct qb_ipc_one_way *one_way, | |||
| size_t msg_len, | void *msg_ptr, size_t msg_len, int32_t ms_timeout) | |||
| int32_t ms_timeout) | ||||
| { | { | |||
| uint32_t msg_prio; | uint32_t msg_prio; | |||
| struct timespec ts_timeout; | struct timespec ts_timeout; | |||
| ssize_t res; | ssize_t res; | |||
| if (ms_timeout >= 0) { | if (ms_timeout >= 0) { | |||
| qb_util_timespec_from_epoch_get(&ts_timeout); | qb_util_timespec_from_epoch_get(&ts_timeout); | |||
| qb_timespec_add_ms(&ts_timeout, ms_timeout); | qb_timespec_add_ms(&ts_timeout, ms_timeout); | |||
| } | } | |||
| mq_receive_again: | mq_receive_again: | |||
| if (ms_timeout >= 0) { | if (ms_timeout >= 0) { | |||
| res = mq_timedreceive(one_way->u.pmq.q, | res = mq_timedreceive(one_way->u.pmq.q, | |||
| (char *)msg_ptr, | (char *)msg_ptr, | |||
| one_way->max_msg_size, | one_way->max_msg_size, | |||
| &msg_prio, | &msg_prio, &ts_timeout); | |||
| &ts_timeout); | ||||
| } else { | } else { | |||
| res = mq_receive(one_way->u.pmq.q, | res = mq_receive(one_way->u.pmq.q, | |||
| (char *)msg_ptr, | (char *)msg_ptr, | |||
| one_way->max_msg_size, | one_way->max_msg_size, &msg_prio); | |||
| &msg_prio); | ||||
| } | } | |||
| if (res == -1) { | if (res == -1) { | |||
| switch (errno) { | switch (errno) { | |||
| case EINTR: | case EINTR: | |||
| goto mq_receive_again; | goto mq_receive_again; | |||
| break; | break; | |||
| case EAGAIN: | case EAGAIN: | |||
| res = -ETIMEDOUT; | res = -ETIMEDOUT; | |||
| break; | break; | |||
| case ETIMEDOUT: | case ETIMEDOUT: | |||
| res = -errno; | res = -errno; | |||
| break; | break; | |||
| default: | default: | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, | qb_util_perror(LOG_ERR, | |||
| "error waiting for mq_timedreceive : %s" | "error waiting for mq_timedreceive"); | |||
| , | ||||
| strerror(errno)); | ||||
| break; | break; | |||
| } | } | |||
| } | } | |||
| return res; | return res; | |||
| } | } | |||
| /* | /* | |||
| * client functions | * client functions | |||
| * -------------------------------------------------------- | * -------------------------------------------------------- | |||
| */ | */ | |||
| static void qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c) | static void | |||
| qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c) | ||||
| { | { | |||
| struct qb_ipc_request_header hdr; | struct qb_ipc_request_header hdr; | |||
| qb_util_log(LOG_DEBUG, "%s()", __func__); | qb_util_log(LOG_DEBUG, "%s()", __func__); | |||
| if (c->needs_sock_for_poll) { | if (c->needs_sock_for_poll) { | |||
| return; | return; | |||
| } | } | |||
| hdr.id = QB_IPC_MSG_DISCONNECT; | hdr.id = QB_IPC_MSG_DISCONNECT; | |||
| hdr.size = sizeof(hdr); | hdr.size = sizeof(hdr); | |||
| skipping to change at line 262 | skipping to change at line 263 | |||
| mq_close(c->event.u.pmq.q); | mq_close(c->event.u.pmq.q); | |||
| mq_close(c->response.u.pmq.q); | mq_close(c->response.u.pmq.q); | |||
| mq_close(c->request.u.pmq.q); | mq_close(c->request.u.pmq.q); | |||
| mq_unlink(c->event.u.pmq.name); | mq_unlink(c->event.u.pmq.name); | |||
| mq_unlink(c->request.u.pmq.name); | mq_unlink(c->request.u.pmq.name); | |||
| mq_unlink(c->response.u.pmq.name); | mq_unlink(c->response.u.pmq.name); | |||
| } | } | |||
| int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c, | int32_t | |||
| struct qb_ipc_connection_response *response) | qb_ipcc_pmq_connect(struct qb_ipcc_connection *c, | |||
| struct qb_ipc_connection_response *response) | ||||
| { | { | |||
| int32_t res = 0; | int32_t res = 0; | |||
| c->funcs.send = qb_ipc_pmq_send; | c->funcs.send = qb_ipc_pmq_send; | |||
| c->funcs.sendv = qb_ipc_pmq_sendv; | c->funcs.sendv = qb_ipc_pmq_sendv; | |||
| c->funcs.recv = qb_ipc_pmq_recv; | c->funcs.recv = qb_ipc_pmq_recv; | |||
| c->funcs.fc_get = NULL; | c->funcs.fc_get = NULL; | |||
| c->funcs.disconnect = qb_ipcc_pmq_disconnect; | c->funcs.disconnect = qb_ipcc_pmq_disconnect; | |||
| #if defined(QB_LINUX) || defined(QB_BSD) | #if defined(QB_LINUX) || defined(QB_BSD) | |||
| c->needs_sock_for_poll = QB_FALSE; | c->needs_sock_for_poll = QB_FALSE; | |||
| #else | #else | |||
| c->needs_sock_for_poll = QB_TRUE; | c->needs_sock_for_poll = QB_TRUE; | |||
| #endif | #endif | |||
| if (strlen(c->name) > (NAME_MAX - 20)) { | if (strlen(c->name) > (NAME_MAX - 20)) { | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| res = posix_mq_open(&c->request, response->request, | res = posix_mq_open(&c->request, response->request, QB_REQUEST_Q_LEN | |||
| QB_REQUEST_Q_LEN); | ); | |||
| if (res != 0) { | if (res != 0) { | |||
| perror("mq_open:REQUEST"); | perror("mq_open:REQUEST"); | |||
| return res; | return res; | |||
| } | } | |||
| res = posix_mq_open(&c->response, response->response, | res = posix_mq_open(&c->response, response->response, | |||
| QB_RESPONSE_Q_LEN); | QB_RESPONSE_Q_LEN); | |||
| if (res != 0) { | if (res != 0) { | |||
| perror("mq_open:RESPONSE"); | perror("mq_open:RESPONSE"); | |||
| goto cleanup_request; | goto cleanup_request; | |||
| } | } | |||
| skipping to change at line 316 | skipping to change at line 317 | |||
| mq_close(c->request.u.pmq.q); | mq_close(c->request.u.pmq.q); | |||
| return res; | return res; | |||
| } | } | |||
| /* | /* | |||
| * service functions | * service functions | |||
| * -------------------------------------------------------- | * -------------------------------------------------------- | |||
| */ | */ | |||
| static void qb_ipcs_pmq_disconnect(struct qb_ipcs_connection *c) | static void | |||
| qb_ipcs_pmq_disconnect(struct qb_ipcs_connection *c) | ||||
| { | { | |||
| struct qb_ipc_response_header msg; | struct qb_ipc_response_header msg; | |||
| msg.id = QB_IPC_MSG_DISCONNECT; | msg.id = QB_IPC_MSG_DISCONNECT; | |||
| msg.size = sizeof(msg); | msg.size = sizeof(msg); | |||
| msg.error = 0; | msg.error = 0; | |||
| (void)qb_ipc_pmq_send(&c->event, &msg, msg.size); | (void)qb_ipc_pmq_send(&c->event, &msg, msg.size); | |||
| mq_close(c->event.u.pmq.q); | mq_close(c->event.u.pmq.q); | |||
| mq_close(c->response.u.pmq.q); | mq_close(c->response.u.pmq.q); | |||
| mq_close(c->request.u.pmq.q); | mq_close(c->request.u.pmq.q); | |||
| mq_unlink(c->event.u.pmq.name); | mq_unlink(c->event.u.pmq.name); | |||
| mq_unlink(c->request.u.pmq.name); | mq_unlink(c->request.u.pmq.name); | |||
| mq_unlink(c->response.u.pmq.name); | mq_unlink(c->response.u.pmq.name); | |||
| } | } | |||
| static int32_t qb_ipcs_pmq_connect(struct qb_ipcs_service *s, | static int32_t | |||
| struct qb_ipcs_connection *c, | qb_ipcs_pmq_connect(struct qb_ipcs_service *s, | |||
| struct qb_ipc_connection_response *r) | struct qb_ipcs_connection *c, | |||
| struct qb_ipc_connection_response *r) | ||||
| { | { | |||
| int32_t res = 0; | int32_t res = 0; | |||
| snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid); | snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid); | |||
| snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid); | snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid); | |||
| snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid); | snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid); | |||
| res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN); | res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN); | |||
| if (res < 0) { | if (res < 0) { | |||
| goto cleanup; | goto cleanup; | |||
| skipping to change at line 361 | skipping to change at line 364 | |||
| if (res < 0) { | if (res < 0) { | |||
| goto cleanup_request; | goto cleanup_request; | |||
| } | } | |||
| res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN); | res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN); | |||
| if (res < 0) { | if (res < 0) { | |||
| goto cleanup_request_response; | goto cleanup_request_response; | |||
| } | } | |||
| if (!s->needs_sock_for_poll) { | if (!s->needs_sock_for_poll) { | |||
| res = s->poll_fns.dispatch_add(s->poll_priority, (int32_t)c- | res = | |||
| >request.u.pmq.q, | s->poll_fns.dispatch_add(s->poll_priority, | |||
| POLLIN | POLLPRI | POLLNVAL, | (int32_t) c->request.u.pmq.q, | |||
| c, qb_ipcs_dispatch_service_r | POLLIN | POLLPRI | POLLNVAL, c, | |||
| equest); | qb_ipcs_dispatch_service_reques | |||
| t); | ||||
| } | } | |||
| r->hdr.error = 0; | r->hdr.error = 0; | |||
| return res; | return res; | |||
| cleanup_request_response: | cleanup_request_response: | |||
| mq_close(c->response.u.pmq.q); | mq_close(c->response.u.pmq.q); | |||
| mq_unlink(r->response); | mq_unlink(r->response); | |||
| cleanup_request: | cleanup_request: | |||
| mq_close(c->request.u.pmq.q); | mq_close(c->request.u.pmq.q); | |||
| mq_unlink(r->request); | mq_unlink(r->request); | |||
| cleanup: | cleanup: | |||
| r->hdr.error = res; | r->hdr.error = res; | |||
| return res; | return res; | |||
| } | } | |||
| static ssize_t qb_ipc_pmq_q_len_get(struct qb_ipc_one_way *one_way) | static ssize_t | |||
| qb_ipc_pmq_q_len_get(struct qb_ipc_one_way *one_way) | ||||
| { | { | |||
| struct mq_attr info; | struct mq_attr info; | |||
| int32_t res = mq_getattr(one_way->u.pmq.q, &info); | int32_t res = mq_getattr(one_way->u.pmq.q, &info); | |||
| if (res == 0) { | if (res == 0) { | |||
| return info.mq_curmsgs; | return info.mq_curmsgs; | |||
| } | } | |||
| return -errno; | return -errno; | |||
| } | } | |||
| void qb_ipcs_pmq_init(struct qb_ipcs_service * s) | void | |||
| qb_ipcs_pmq_init(struct qb_ipcs_service *s) | ||||
| { | { | |||
| s->funcs.connect = qb_ipcs_pmq_connect; | s->funcs.connect = qb_ipcs_pmq_connect; | |||
| s->funcs.disconnect = qb_ipcs_pmq_disconnect; | s->funcs.disconnect = qb_ipcs_pmq_disconnect; | |||
| s->funcs.recv = qb_ipc_pmq_recv; | s->funcs.recv = qb_ipc_pmq_recv; | |||
| s->funcs.send = qb_ipc_pmq_send; | s->funcs.send = qb_ipc_pmq_send; | |||
| s->funcs.sendv = qb_ipc_pmq_sendv; | s->funcs.sendv = qb_ipc_pmq_sendv; | |||
| s->funcs.peek = NULL; | s->funcs.peek = NULL; | |||
| s->funcs.reclaim = NULL; | s->funcs.reclaim = NULL; | |||
| End of changes. 21 change blocks. | ||||
| 43 lines changed or deleted | 51 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ | ||||