| ipc_sysv_mq.c | ipc_sysv_mq.c | |||
|---|---|---|---|---|
| skipping to change at line 23 | skipping to change at line 23 | |||
| * libqb is distributed in the hope that it will be useful, | * libqb is distributed in the hope that it will be useful, | |||
| * but WITHOUT ANY WARRANTY; without even the implied warranty of | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
| * GNU Lesser General Public License for more details. | * GNU Lesser General Public License for more details. | |||
| * | * | |||
| * You should have received a copy of the GNU Lesser General Public License | * You should have received a copy of the GNU Lesser General Public License | |||
| * along with libqb. If not, see <http://www.gnu.org/licenses/>. | * along with libqb. If not, see <http://www.gnu.org/licenses/>. | |||
| */ | */ | |||
| #include "os_base.h" | #include "os_base.h" | |||
| #ifdef HAVE_SYS_IPC_H | ||||
| #include <sys/ipc.h> | #include <sys/ipc.h> | |||
| #endif | ||||
| #ifdef HAVE_SYS_MSG_H | ||||
| #include <sys/msg.h> | #include <sys/msg.h> | |||
| #endif | ||||
| #include <qb/qbdefs.h> | #include <qb/qbdefs.h> | |||
| #include <qb/qbloop.h> | #include <qb/qbloop.h> | |||
| #include "ipc_int.h" | #include "ipc_int.h" | |||
| #include "util_int.h" | #include "util_int.h" | |||
| #ifndef MSGMAX | #ifndef MSGMAX | |||
| #define MSGMAX 8192 | #define MSGMAX 8192 | |||
| #endif | #endif | |||
| #define MY_DATA_SIZE 8000 | #define MY_DATA_SIZE 8000 | |||
| struct my_msgbuf { | struct my_msgbuf { | |||
| int32_t id __attribute__ ((aligned(8))); | int32_t id __attribute__ ((aligned(8))); | |||
| char data[MY_DATA_SIZE] __attribute__ ((aligned(8))); | char data[MY_DATA_SIZE] __attribute__ ((aligned(8))); | |||
| } __attribute__ ((aligned(8))); | } __attribute__ ((aligned(8))); | |||
| /* | /* | |||
| * utility functions | * utility functions | |||
| * -------------------------------------------------------- | * -------------------------------------------------------- | |||
| */ | */ | |||
| static int32_t sysv_mq_unnamed_create(struct qb_ipcs_connection *c, | static int32_t | |||
| struct qb_ipc_one_way *queue) | sysv_mq_unnamed_create(struct qb_ipcs_connection *c, | |||
| struct qb_ipc_one_way *queue) | ||||
| { | { | |||
| struct msqid_ds info; | struct msqid_ds info; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| retry_creating_the_q: | retry_creating_the_q: | |||
| queue->u.smq.key = random(); | queue->u.smq.key = random(); | |||
| queue->u.smq.q = | queue->u.smq.q = | |||
| msgget(queue->u.smq.key, | msgget(queue->u.smq.key, | |||
| IPC_CREAT | IPC_EXCL | IPC_NOWAIT | S_IWUSR | S_IRUSR | IPC_CREAT | IPC_EXCL | IPC_NOWAIT | S_IWUSR | S_IRUSR); | |||
| ); | ||||
| if (queue->u.smq.q == -1 && errno == EEXIST) { | if (queue->u.smq.q == -1 && errno == EEXIST) { | |||
| goto retry_creating_the_q; | goto retry_creating_the_q; | |||
| } else if (queue->u.smq.q == -1) { | } else if (queue->u.smq.q == -1) { | |||
| return -errno; | return -errno; | |||
| } | } | |||
| /* | /* | |||
| * change the queue size and change the ownership to that of | * change the queue size and change the ownership to that of | |||
| * the client so they can access it. | * the client so they can access it. | |||
| */ | */ | |||
| res = msgctl(queue->u.smq.q, IPC_STAT, &info); | res = msgctl(queue->u.smq.q, IPC_STAT, &info); | |||
| if (res != 0) { | if (res != 0) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "error getting sysv-mq info : %s", | qb_util_perror(LOG_ERR, "error getting sysv-mq info"); | |||
| strerror(errno)); | ||||
| return res; | return res; | |||
| } | } | |||
| if (info.msg_perm.uid != 0) { | if (info.msg_perm.uid != 0) { | |||
| qb_util_log(LOG_WARNING, | qb_util_log(LOG_WARNING, | |||
| "not enough privileges to increase msg_qbytes"); | "not enough privileges to increase msg_qbytes"); | |||
| return res; | return res; | |||
| } | } | |||
| info.msg_qbytes = 2 * queue->max_msg_size; | info.msg_qbytes = 2 * queue->max_msg_size; | |||
| info.msg_perm.uid = c->euid; | info.msg_perm.uid = c->euid; | |||
| info.msg_perm.gid = c->egid; | info.msg_perm.gid = c->egid; | |||
| res = msgctl(queue->u.smq.q, IPC_SET, &info); | res = msgctl(queue->u.smq.q, IPC_SET, &info); | |||
| if (res != 0) { | if (res != 0) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, | qb_util_perror(LOG_ERR, | |||
| "error modifing the SYSV message queue : %s", | "error modifying the SYSV message queue"); | |||
| strerror(errno)); | ||||
| return res; | return res; | |||
| } | } | |||
| return 0; | return 0; | |||
| } | } | |||
| static int32_t sysv_split_and_send(int32_t q, const void *msg_ptr, | static int32_t | |||
| size_t msg_len, int32_t last_chunk) | sysv_split_and_send(int32_t q, const void *msg_ptr, | |||
| size_t msg_len, int32_t last_chunk) | ||||
| { | { | |||
| int32_t res; | int32_t res; | |||
| int32_t sent = 0; | int32_t sent = 0; | |||
| #ifdef PACK_MESSAGES | #ifdef PACK_MESSAGES | |||
| char *progress = (char *)msg_ptr; | char *progress = (char *)msg_ptr; | |||
| struct my_msgbuf buf; | struct my_msgbuf buf; | |||
| size_t to_send_now; /* to send in this message */ | size_t to_send_now; /* to send in this message */ | |||
| size_t to_send_next; /* to send in next message */ | size_t to_send_next; /* to send in next message */ | |||
| do { | do { | |||
| to_send_now = QB_MIN(msg_len - sent, MY_DATA_SIZE); | to_send_now = QB_MIN(msg_len - sent, MY_DATA_SIZE); | |||
| to_send_next = msg_len - (sent + to_send_now); | to_send_next = msg_len - (sent + to_send_now); | |||
| /* receiver used the ID to check to see if there | /* receiver used the ID to check to see if there | |||
| * is more to recieve for this message. | * is more to receive for this message. | |||
| */ | */ | |||
| if (last_chunk) { | if (last_chunk) { | |||
| buf.id = to_send_next + 1; | buf.id = to_send_next + 1; | |||
| } else { | } else { | |||
| buf.id = to_send_next + 1 + msg_len; | buf.id = to_send_next + 1 + msg_len; | |||
| } | } | |||
| memcpy(buf.data, progress, to_send_now); | memcpy(buf.data, progress, to_send_now); | |||
| res = msgsnd(q, &buf, to_send_now, IPC_NOWAIT); | res = msgsnd(q, &buf, to_send_now, IPC_NOWAIT); | |||
| if (res == 0) { | if (res == 0) { | |||
| sent += to_send_now; | sent += to_send_now; | |||
| skipping to change at line 143 | skipping to change at line 147 | |||
| if (res == -1) { | if (res == -1) { | |||
| return -errno; | return -errno; | |||
| } | } | |||
| return sent; | return sent; | |||
| } | } | |||
| /* | /* | |||
| * client functions | * client functions | |||
| * -------------------------------------------------------- | * -------------------------------------------------------- | |||
| */ | */ | |||
| static ssize_t qb_ipc_smq_send(struct qb_ipc_one_way *one_way, | static ssize_t | |||
| const void *msg_ptr, size_t msg_len) | qb_ipc_smq_send(struct qb_ipc_one_way *one_way, | |||
| const void *msg_ptr, size_t msg_len) | ||||
| { | { | |||
| return sysv_split_and_send(one_way->u.smq.q, msg_ptr, msg_len, QB_TR UE); | return sysv_split_and_send(one_way->u.smq.q, msg_ptr, msg_len, QB_TR UE); | |||
| } | } | |||
| static ssize_t qb_ipc_smq_sendv(struct qb_ipc_one_way *one_way, | static ssize_t | |||
| const struct iovec *iov, size_t iov_len) | qb_ipc_smq_sendv(struct qb_ipc_one_way *one_way, | |||
| const struct iovec *iov, size_t iov_len) | ||||
| { | { | |||
| int32_t res; | int32_t res; | |||
| int32_t sent = 0; | int32_t sent = 0; | |||
| struct my_msgbuf buf; | struct my_msgbuf buf; | |||
| int32_t i; | int32_t i; | |||
| for (i = 0; i < iov_len; i++) { | for (i = 0; i < iov_len; i++) { | |||
| if (iov[i].iov_len <= MY_DATA_SIZE) { | if (iov[i].iov_len <= MY_DATA_SIZE) { | |||
| if (i == iov_len-1) { | if (i == iov_len - 1) { | |||
| buf.id = 1; | buf.id = 1; | |||
| } else { | } else { | |||
| buf.id = i + iov[i].iov_len; | buf.id = i + iov[i].iov_len; | |||
| } | } | |||
| memcpy(buf.data, iov[i].iov_base, iov[i].iov_len); | memcpy(buf.data, iov[i].iov_base, iov[i].iov_len); | |||
| res = msgsnd(one_way->u.smq.q, &buf, iov[i].iov_len, | res = msgsnd(one_way->u.smq.q, | |||
| IPC_NOWAIT); | &buf, iov[i].iov_len, | |||
| IPC_NOWAIT); | ||||
| if (res == 0) { | if (res == 0) { | |||
| res = iov[i].iov_len; | res = iov[i].iov_len; | |||
| } else { | } else { | |||
| res = -errno; | res = -errno; | |||
| } | } | |||
| } else { | } else { | |||
| res = sysv_split_and_send(one_way->u.smq.q, iov[i].i | res = sysv_split_and_send(one_way->u.smq.q, | |||
| ov_base, | iov[i].iov_base, | |||
| iov[i].iov_len, (i == iov_ | iov[i].iov_len, | |||
| len-1)); | (i == iov_len - 1)); | |||
| } | } | |||
| if (res > 0) { | if (res > 0) { | |||
| sent += res; | sent += res; | |||
| } else { | } else { | |||
| return res; | return res; | |||
| } | } | |||
| } | } | |||
| return sent; | return sent; | |||
| } | } | |||
| static ssize_t qb_ipc_smq_recv(struct qb_ipc_one_way *one_way, | static ssize_t | |||
| void *msg_ptr, | qb_ipc_smq_recv(struct qb_ipc_one_way *one_way, | |||
| size_t msg_len, | void *msg_ptr, size_t msg_len, int32_t ms_timeout) | |||
| int32_t ms_timeout) | ||||
| { | { | |||
| ssize_t res; | ssize_t res; | |||
| ssize_t received = 0; | ssize_t received = 0; | |||
| #ifdef PACK_MESSAGES | #ifdef PACK_MESSAGES | |||
| char *progress = (char *)msg_ptr; | char *progress = (char *)msg_ptr; | |||
| struct my_msgbuf buf; | struct my_msgbuf buf; | |||
| do { | do { | |||
| try_again: | try_again: | |||
| res = msgrcv(one_way->u.smq.q, &buf, MY_DATA_SIZE, 0, IPC_NO | res = msgrcv(one_way->u.smq.q, &buf, | |||
| WAIT); | MY_DATA_SIZE, 0, IPC_NOWAIT); | |||
| if (res == -1 && errno == ENOMSG) { | if (res == -1 && errno == ENOMSG) { | |||
| goto try_again; | goto try_again; | |||
| } | } | |||
| //printf("res:%zd, ID:%d\n", res, buf.id); | //printf("res:%zd, ID:%d\n", res, buf.id); | |||
| if (res == -1) { | if (res == -1) { | |||
| goto return_status; | goto return_status; | |||
| } | } | |||
| memcpy(progress, buf.data, res); | memcpy(progress, buf.data, res); | |||
| received += res; | received += res; | |||
| progress += res; | progress += res; | |||
| } while (buf.id > 1); | } while (buf.id > 1); | |||
| return_status: | return_status: | |||
| #else | #else | |||
| res = msgrcv(one_way->u.smq.q, msg_ptr, msg_len, 0, IPC_NOWAIT); | res = msgrcv(one_way->u.smq.q, msg_ptr, msg_len, 0, IPC_NOWAIT); | |||
| received = res; | received = res; | |||
| #endif | #endif | |||
| if (res == -1 && errno == ENOMSG) { | if (res == -1 && errno == ENOMSG) { | |||
| /* just to be consistent with other IPC types. | /* just to be consistent with other IPC types. | |||
| */ | */ | |||
| return -EAGAIN; | return -EAGAIN; | |||
| } | } | |||
| if (res == -1) { | if (res == -1) { | |||
| perror(__func__); | perror(__func__); | |||
| return -errno; | return -errno; | |||
| } | } | |||
| return received; | return received; | |||
| } | } | |||
| static void qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c) | static void | |||
| qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c) | ||||
| { | { | |||
| struct qb_ipc_request_header hdr; | struct qb_ipc_request_header hdr; | |||
| qb_util_log(LOG_DEBUG, "%s()", __func__); | qb_util_log(LOG_TRACE, "%s()", __func__); | |||
| //if (c->needs_sock_for_poll) { | ||||
| // return; | ||||
| //} | ||||
| hdr.id = QB_IPC_MSG_DISCONNECT; | hdr.id = QB_IPC_MSG_DISCONNECT; | |||
| hdr.size = sizeof(hdr); | hdr.size = sizeof(hdr); | |||
| (void)sysv_split_and_send(c->request.u.smq.q, | (void)sysv_split_and_send(c->request.u.smq.q, | |||
| (const char *)&hdr, hdr.size, | (const char *)&hdr, hdr.size, QB_TRUE); | |||
| QB_TRUE); | ||||
| msgctl(c->event.u.smq.q, IPC_RMID, NULL); | msgctl(c->event.u.smq.q, IPC_RMID, NULL); | |||
| msgctl(c->response.u.smq.q, IPC_RMID, NULL); | msgctl(c->response.u.smq.q, IPC_RMID, NULL); | |||
| msgctl(c->request.u.smq.q, IPC_RMID, NULL); | msgctl(c->request.u.smq.q, IPC_RMID, NULL); | |||
| } | } | |||
| int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c, | int32_t | |||
| struct qb_ipc_connection_response *response) | qb_ipcc_smq_connect(struct qb_ipcc_connection *c, | |||
| struct qb_ipc_connection_response *response) | ||||
| { | { | |||
| int32_t res = 0; | int32_t res = 0; | |||
| c->funcs.send = qb_ipc_smq_send; | c->funcs.send = qb_ipc_smq_send; | |||
| c->funcs.sendv = qb_ipc_smq_sendv; | c->funcs.sendv = qb_ipc_smq_sendv; | |||
| c->funcs.recv = qb_ipc_smq_recv; | c->funcs.recv = qb_ipc_smq_recv; | |||
| c->funcs.fc_get = NULL; | c->funcs.fc_get = NULL; | |||
| c->funcs.disconnect = qb_ipcc_smq_disconnect; | c->funcs.disconnect = qb_ipcc_smq_disconnect; | |||
| c->type = QB_IPC_SYSV_MQ; | c->type = QB_IPC_SYSV_MQ; | |||
| c->needs_sock_for_poll = QB_TRUE; | c->needs_sock_for_poll = QB_TRUE; | |||
| skipping to change at line 277 | skipping to change at line 285 | |||
| res = -errno; | res = -errno; | |||
| perror("msgget:REQUEST"); | perror("msgget:REQUEST"); | |||
| goto cleanup; | goto cleanup; | |||
| } | } | |||
| memcpy(&c->response.u.smq.key, response->response, sizeof(uint32_t)) ; | memcpy(&c->response.u.smq.key, response->response, sizeof(uint32_t)) ; | |||
| c->response.u.smq.q = msgget(c->response.u.smq.key, IPC_NOWAIT); | c->response.u.smq.q = msgget(c->response.u.smq.key, IPC_NOWAIT); | |||
| if (c->response.u.smq.q == -1) { | if (c->response.u.smq.q == -1) { | |||
| res = -errno; | res = -errno; | |||
| perror("msgget:RESPONSE"); | perror("msgget:RESPONSE"); | |||
| goto cleanup; | goto cleanup_request; | |||
| } | } | |||
| memcpy(&c->event.u.smq.key, response->event, sizeof(uint32_t)); | memcpy(&c->event.u.smq.key, response->event, sizeof(uint32_t)); | |||
| c->event.u.smq.q = msgget(c->event.u.smq.key, IPC_NOWAIT); | c->event.u.smq.q = msgget(c->event.u.smq.key, IPC_NOWAIT); | |||
| if (c->event.u.smq.q == -1) { | if (c->event.u.smq.q == -1) { | |||
| res = -errno; | res = -errno; | |||
| perror("msgget:EVENT"); | perror("msgget:EVENT"); | |||
| goto cleanup; | goto cleanup_request_response; | |||
| } | } | |||
| return 0; | ||||
| cleanup_request_response: | ||||
| msgctl(c->response.u.smq.q, IPC_RMID, NULL); | ||||
| cleanup_request: | ||||
| msgctl(c->request.u.smq.q, IPC_RMID, NULL); | ||||
| cleanup: | cleanup: | |||
| return res; | return res; | |||
| } | } | |||
| /* | /* | |||
| * service functions | * service functions | |||
| * -------------------------------------------------------- | * -------------------------------------------------------- | |||
| */ | */ | |||
| static void qb_ipcs_smq_disconnect(struct qb_ipcs_connection *c) | static void | |||
| qb_ipcs_smq_disconnect(struct qb_ipcs_connection *c) | ||||
| { | { | |||
| struct qb_ipc_response_header msg; | struct qb_ipc_response_header msg; | |||
| if (c->setup.u.us.sock != -1) { | if (c->setup.u.us.sock != -1) { | |||
| msg.id = QB_IPC_MSG_DISCONNECT; | msg.id = QB_IPC_MSG_DISCONNECT; | |||
| msg.size = sizeof(msg); | msg.size = sizeof(msg); | |||
| msg.error = 0; | msg.error = 0; | |||
| (void)qb_ipc_smq_send(&c->event, &msg, msg.size); | (void)qb_ipc_smq_send(&c->event, &msg, msg.size); | |||
| } else { | } else { | |||
| msgctl(c->event.u.smq.q, IPC_RMID, NULL); | msgctl(c->event.u.smq.q, IPC_RMID, NULL); | |||
| msgctl(c->response.u.smq.q, IPC_RMID, NULL); | msgctl(c->response.u.smq.q, IPC_RMID, NULL); | |||
| msgctl(c->request.u.smq.q, IPC_RMID, NULL); | msgctl(c->request.u.smq.q, IPC_RMID, NULL); | |||
| } | } | |||
| } | } | |||
| static int32_t qb_ipcs_smq_connect(struct qb_ipcs_service *s, | static int32_t | |||
| struct qb_ipcs_connection *c, | qb_ipcs_smq_connect(struct qb_ipcs_service *s, | |||
| struct qb_ipc_connection_response *r) | struct qb_ipcs_connection *c, | |||
| struct qb_ipc_connection_response *r) | ||||
| { | { | |||
| int32_t res = 0; | int32_t res = 0; | |||
| res = sysv_mq_unnamed_create(c, &c->request); | res = sysv_mq_unnamed_create(c, &c->request); | |||
| if (res < 0) { | if (res < 0) { | |||
| res = -errno; | res = -errno; | |||
| goto cleanup; | goto cleanup; | |||
| } | } | |||
| memcpy(r->request, &c->request.u.smq.key, sizeof(int32_t)); | memcpy(r->request, &c->request.u.smq.key, sizeof(int32_t)); | |||
| skipping to change at line 343 | skipping to change at line 358 | |||
| res = sysv_mq_unnamed_create(c, &c->event); | res = sysv_mq_unnamed_create(c, &c->event); | |||
| if (res < 0) { | if (res < 0) { | |||
| res = -errno; | res = -errno; | |||
| goto cleanup_request_response; | goto cleanup_request_response; | |||
| } | } | |||
| memcpy(r->event, &c->event.u.smq.key, sizeof(int32_t)); | memcpy(r->event, &c->event.u.smq.key, sizeof(int32_t)); | |||
| r->hdr.error = 0; | r->hdr.error = 0; | |||
| return 0; | return 0; | |||
| cleanup_request: | ||||
| msgctl(c->request.u.smq.q, IPC_RMID, NULL); | ||||
| cleanup_request_response: | cleanup_request_response: | |||
| msgctl(c->response.u.smq.q, IPC_RMID, NULL); | msgctl(c->response.u.smq.q, IPC_RMID, NULL); | |||
| cleanup_request: | ||||
| msgctl(c->request.u.smq.q, IPC_RMID, NULL); | ||||
| cleanup: | cleanup: | |||
| r->hdr.error = res; | r->hdr.error = res; | |||
| return res; | return res; | |||
| } | } | |||
| static ssize_t qb_ipc_smq_q_len_get(struct qb_ipc_one_way *one_way) | static ssize_t | |||
| qb_ipc_smq_q_len_get(struct qb_ipc_one_way *one_way) | ||||
| { | { | |||
| struct msqid_ds info; | struct msqid_ds info; | |||
| int32_t res = msgctl(one_way->u.smq.q, IPC_STAT, &info); | int32_t res = msgctl(one_way->u.smq.q, IPC_STAT, &info); | |||
| if (res == 0) { | if (res == 0) { | |||
| return info.msg_qnum; | return info.msg_qnum; | |||
| } | } | |||
| return -errno; | return -errno; | |||
| } | } | |||
| void qb_ipcs_smq_init(struct qb_ipcs_service *s) | void | |||
| qb_ipcs_smq_init(struct qb_ipcs_service *s) | ||||
| { | { | |||
| s->funcs.connect = qb_ipcs_smq_connect; | s->funcs.connect = qb_ipcs_smq_connect; | |||
| s->funcs.disconnect = qb_ipcs_smq_disconnect; | s->funcs.disconnect = qb_ipcs_smq_disconnect; | |||
| s->funcs.send = qb_ipc_smq_send; | s->funcs.send = qb_ipc_smq_send; | |||
| s->funcs.sendv = qb_ipc_smq_sendv; | s->funcs.sendv = qb_ipc_smq_sendv; | |||
| s->funcs.recv = qb_ipc_smq_recv; | s->funcs.recv = qb_ipc_smq_recv; | |||
| s->funcs.peek = NULL; | s->funcs.peek = NULL; | |||
| s->funcs.reclaim = NULL; | s->funcs.reclaim = NULL; | |||
| End of changes. 31 change blocks. | ||||
| 50 lines changed or deleted | 62 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ | ||||