ipc_sysv_mq.c | ipc_sysv_mq.c | |||
---|---|---|---|---|
skipping to change at line 23 | skipping to change at line 23 | |||
* libqb is distributed in the hope that it will be useful, | * libqb is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
* GNU Lesser General Public License for more details. | * GNU Lesser General Public License for more details. | |||
* | * | |||
* You should have received a copy of the GNU Lesser General Public License | * You should have received a copy of the GNU Lesser General Public License | |||
* along with libqb. If not, see <http://www.gnu.org/licenses/>. | * along with libqb. If not, see <http://www.gnu.org/licenses/>. | |||
*/ | */ | |||
#include "os_base.h" | #include "os_base.h" | |||
#ifdef HAVE_SYS_IPC_H | ||||
#include <sys/ipc.h> | #include <sys/ipc.h> | |||
#endif | ||||
#ifdef HAVE_SYS_MSG_H | ||||
#include <sys/msg.h> | #include <sys/msg.h> | |||
#endif | ||||
#include <qb/qbdefs.h> | #include <qb/qbdefs.h> | |||
#include <qb/qbloop.h> | #include <qb/qbloop.h> | |||
#include "ipc_int.h" | #include "ipc_int.h" | |||
#include "util_int.h" | #include "util_int.h" | |||
#ifndef MSGMAX | #ifndef MSGMAX | |||
#define MSGMAX 8192 | #define MSGMAX 8192 | |||
#endif | #endif | |||
#define MY_DATA_SIZE 8000 | #define MY_DATA_SIZE 8000 | |||
struct my_msgbuf { | struct my_msgbuf { | |||
int32_t id __attribute__ ((aligned(8))); | int32_t id __attribute__ ((aligned(8))); | |||
char data[MY_DATA_SIZE] __attribute__ ((aligned(8))); | char data[MY_DATA_SIZE] __attribute__ ((aligned(8))); | |||
} __attribute__ ((aligned(8))); | } __attribute__ ((aligned(8))); | |||
/* | /* | |||
* utility functions | * utility functions | |||
* -------------------------------------------------------- | * -------------------------------------------------------- | |||
*/ | */ | |||
static int32_t sysv_mq_unnamed_create(struct qb_ipcs_connection *c, | static int32_t | |||
struct qb_ipc_one_way *queue) | sysv_mq_unnamed_create(struct qb_ipcs_connection *c, | |||
struct qb_ipc_one_way *queue) | ||||
{ | { | |||
struct msqid_ds info; | struct msqid_ds info; | |||
int32_t res = 0; | int32_t res = 0; | |||
retry_creating_the_q: | retry_creating_the_q: | |||
queue->u.smq.key = random(); | queue->u.smq.key = random(); | |||
queue->u.smq.q = | queue->u.smq.q = | |||
msgget(queue->u.smq.key, | msgget(queue->u.smq.key, | |||
IPC_CREAT | IPC_EXCL | IPC_NOWAIT | S_IWUSR | S_IRUSR | IPC_CREAT | IPC_EXCL | IPC_NOWAIT | S_IWUSR | S_IRUSR); | |||
); | ||||
if (queue->u.smq.q == -1 && errno == EEXIST) { | if (queue->u.smq.q == -1 && errno == EEXIST) { | |||
goto retry_creating_the_q; | goto retry_creating_the_q; | |||
} else if (queue->u.smq.q == -1) { | } else if (queue->u.smq.q == -1) { | |||
return -errno; | return -errno; | |||
} | } | |||
/* | /* | |||
* change the queue size and change the ownership to that of | * change the queue size and change the ownership to that of | |||
* the client so they can access it. | * the client so they can access it. | |||
*/ | */ | |||
res = msgctl(queue->u.smq.q, IPC_STAT, &info); | res = msgctl(queue->u.smq.q, IPC_STAT, &info); | |||
if (res != 0) { | if (res != 0) { | |||
res = -errno; | res = -errno; | |||
qb_util_log(LOG_ERR, "error getting sysv-mq info : %s", | qb_util_perror(LOG_ERR, "error getting sysv-mq info"); | |||
strerror(errno)); | ||||
return res; | return res; | |||
} | } | |||
if (info.msg_perm.uid != 0) { | if (info.msg_perm.uid != 0) { | |||
qb_util_log(LOG_WARNING, | qb_util_log(LOG_WARNING, | |||
"not enough privileges to increase msg_qbytes"); | "not enough privileges to increase msg_qbytes"); | |||
return res; | return res; | |||
} | } | |||
info.msg_qbytes = 2 * queue->max_msg_size; | info.msg_qbytes = 2 * queue->max_msg_size; | |||
info.msg_perm.uid = c->euid; | info.msg_perm.uid = c->euid; | |||
info.msg_perm.gid = c->egid; | info.msg_perm.gid = c->egid; | |||
res = msgctl(queue->u.smq.q, IPC_SET, &info); | res = msgctl(queue->u.smq.q, IPC_SET, &info); | |||
if (res != 0) { | if (res != 0) { | |||
res = -errno; | res = -errno; | |||
qb_util_log(LOG_ERR, | qb_util_perror(LOG_ERR, | |||
"error modifing the SYSV message queue : %s", | "error modifying the SYSV message queue"); | |||
strerror(errno)); | ||||
return res; | return res; | |||
} | } | |||
return 0; | return 0; | |||
} | } | |||
static int32_t sysv_split_and_send(int32_t q, const void *msg_ptr, | static int32_t | |||
size_t msg_len, int32_t last_chunk) | sysv_split_and_send(int32_t q, const void *msg_ptr, | |||
size_t msg_len, int32_t last_chunk) | ||||
{ | { | |||
int32_t res; | int32_t res; | |||
int32_t sent = 0; | int32_t sent = 0; | |||
#ifdef PACK_MESSAGES | #ifdef PACK_MESSAGES | |||
char *progress = (char *)msg_ptr; | char *progress = (char *)msg_ptr; | |||
struct my_msgbuf buf; | struct my_msgbuf buf; | |||
size_t to_send_now; /* to send in this message */ | size_t to_send_now; /* to send in this message */ | |||
size_t to_send_next; /* to send in next message */ | size_t to_send_next; /* to send in next message */ | |||
do { | do { | |||
to_send_now = QB_MIN(msg_len - sent, MY_DATA_SIZE); | to_send_now = QB_MIN(msg_len - sent, MY_DATA_SIZE); | |||
to_send_next = msg_len - (sent + to_send_now); | to_send_next = msg_len - (sent + to_send_now); | |||
/* receiver used the ID to check to see if there | /* receiver used the ID to check to see if there | |||
* is more to recieve for this message. | * is more to receive for this message. | |||
*/ | */ | |||
if (last_chunk) { | if (last_chunk) { | |||
buf.id = to_send_next + 1; | buf.id = to_send_next + 1; | |||
} else { | } else { | |||
buf.id = to_send_next + 1 + msg_len; | buf.id = to_send_next + 1 + msg_len; | |||
} | } | |||
memcpy(buf.data, progress, to_send_now); | memcpy(buf.data, progress, to_send_now); | |||
res = msgsnd(q, &buf, to_send_now, IPC_NOWAIT); | res = msgsnd(q, &buf, to_send_now, IPC_NOWAIT); | |||
if (res == 0) { | if (res == 0) { | |||
sent += to_send_now; | sent += to_send_now; | |||
skipping to change at line 143 | skipping to change at line 147 | |||
if (res == -1) { | if (res == -1) { | |||
return -errno; | return -errno; | |||
} | } | |||
return sent; | return sent; | |||
} | } | |||
/* | /* | |||
* client functions | * client functions | |||
* -------------------------------------------------------- | * -------------------------------------------------------- | |||
*/ | */ | |||
static ssize_t qb_ipc_smq_send(struct qb_ipc_one_way *one_way, | static ssize_t | |||
const void *msg_ptr, size_t msg_len) | qb_ipc_smq_send(struct qb_ipc_one_way *one_way, | |||
const void *msg_ptr, size_t msg_len) | ||||
{ | { | |||
return sysv_split_and_send(one_way->u.smq.q, msg_ptr, msg_len, QB_TR UE); | return sysv_split_and_send(one_way->u.smq.q, msg_ptr, msg_len, QB_TR UE); | |||
} | } | |||
static ssize_t qb_ipc_smq_sendv(struct qb_ipc_one_way *one_way, | static ssize_t | |||
const struct iovec *iov, size_t iov_len) | qb_ipc_smq_sendv(struct qb_ipc_one_way *one_way, | |||
const struct iovec *iov, size_t iov_len) | ||||
{ | { | |||
int32_t res; | int32_t res; | |||
int32_t sent = 0; | int32_t sent = 0; | |||
struct my_msgbuf buf; | struct my_msgbuf buf; | |||
int32_t i; | int32_t i; | |||
for (i = 0; i < iov_len; i++) { | for (i = 0; i < iov_len; i++) { | |||
if (iov[i].iov_len <= MY_DATA_SIZE) { | if (iov[i].iov_len <= MY_DATA_SIZE) { | |||
if (i == iov_len-1) { | if (i == iov_len - 1) { | |||
buf.id = 1; | buf.id = 1; | |||
} else { | } else { | |||
buf.id = i + iov[i].iov_len; | buf.id = i + iov[i].iov_len; | |||
} | } | |||
memcpy(buf.data, iov[i].iov_base, iov[i].iov_len); | memcpy(buf.data, iov[i].iov_base, iov[i].iov_len); | |||
res = msgsnd(one_way->u.smq.q, &buf, iov[i].iov_len, | res = msgsnd(one_way->u.smq.q, | |||
IPC_NOWAIT); | &buf, iov[i].iov_len, | |||
IPC_NOWAIT); | ||||
if (res == 0) { | if (res == 0) { | |||
res = iov[i].iov_len; | res = iov[i].iov_len; | |||
} else { | } else { | |||
res = -errno; | res = -errno; | |||
} | } | |||
} else { | } else { | |||
res = sysv_split_and_send(one_way->u.smq.q, iov[i].i | res = sysv_split_and_send(one_way->u.smq.q, | |||
ov_base, | iov[i].iov_base, | |||
iov[i].iov_len, (i == iov_ | iov[i].iov_len, | |||
len-1)); | (i == iov_len - 1)); | |||
} | } | |||
if (res > 0) { | if (res > 0) { | |||
sent += res; | sent += res; | |||
} else { | } else { | |||
return res; | return res; | |||
} | } | |||
} | } | |||
return sent; | return sent; | |||
} | } | |||
static ssize_t qb_ipc_smq_recv(struct qb_ipc_one_way *one_way, | static ssize_t | |||
void *msg_ptr, | qb_ipc_smq_recv(struct qb_ipc_one_way *one_way, | |||
size_t msg_len, | void *msg_ptr, size_t msg_len, int32_t ms_timeout) | |||
int32_t ms_timeout) | ||||
{ | { | |||
ssize_t res; | ssize_t res; | |||
ssize_t received = 0; | ssize_t received = 0; | |||
#ifdef PACK_MESSAGES | #ifdef PACK_MESSAGES | |||
char *progress = (char *)msg_ptr; | char *progress = (char *)msg_ptr; | |||
struct my_msgbuf buf; | struct my_msgbuf buf; | |||
do { | do { | |||
try_again: | try_again: | |||
res = msgrcv(one_way->u.smq.q, &buf, MY_DATA_SIZE, 0, IPC_NO | res = msgrcv(one_way->u.smq.q, &buf, | |||
WAIT); | MY_DATA_SIZE, 0, IPC_NOWAIT); | |||
if (res == -1 && errno == ENOMSG) { | if (res == -1 && errno == ENOMSG) { | |||
goto try_again; | goto try_again; | |||
} | } | |||
//printf("res:%zd, ID:%d\n", res, buf.id); | //printf("res:%zd, ID:%d\n", res, buf.id); | |||
if (res == -1) { | if (res == -1) { | |||
goto return_status; | goto return_status; | |||
} | } | |||
memcpy(progress, buf.data, res); | memcpy(progress, buf.data, res); | |||
received += res; | received += res; | |||
progress += res; | progress += res; | |||
} while (buf.id > 1); | } while (buf.id > 1); | |||
return_status: | return_status: | |||
#else | #else | |||
res = msgrcv(one_way->u.smq.q, msg_ptr, msg_len, 0, IPC_NOWAIT); | res = msgrcv(one_way->u.smq.q, msg_ptr, msg_len, 0, IPC_NOWAIT); | |||
received = res; | received = res; | |||
#endif | #endif | |||
if (res == -1 && errno == ENOMSG) { | if (res == -1 && errno == ENOMSG) { | |||
/* just to be consistent with other IPC types. | /* just to be consistent with other IPC types. | |||
*/ | */ | |||
return -EAGAIN; | return -EAGAIN; | |||
} | } | |||
if (res == -1) { | if (res == -1) { | |||
perror(__func__); | perror(__func__); | |||
return -errno; | return -errno; | |||
} | } | |||
return received; | return received; | |||
} | } | |||
static void qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c) | static void | |||
qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c) | ||||
{ | { | |||
struct qb_ipc_request_header hdr; | struct qb_ipc_request_header hdr; | |||
qb_util_log(LOG_DEBUG, "%s()", __func__); | qb_util_log(LOG_TRACE, "%s()", __func__); | |||
//if (c->needs_sock_for_poll) { | ||||
// return; | ||||
//} | ||||
hdr.id = QB_IPC_MSG_DISCONNECT; | hdr.id = QB_IPC_MSG_DISCONNECT; | |||
hdr.size = sizeof(hdr); | hdr.size = sizeof(hdr); | |||
(void)sysv_split_and_send(c->request.u.smq.q, | (void)sysv_split_and_send(c->request.u.smq.q, | |||
(const char *)&hdr, hdr.size, | (const char *)&hdr, hdr.size, QB_TRUE); | |||
QB_TRUE); | ||||
msgctl(c->event.u.smq.q, IPC_RMID, NULL); | msgctl(c->event.u.smq.q, IPC_RMID, NULL); | |||
msgctl(c->response.u.smq.q, IPC_RMID, NULL); | msgctl(c->response.u.smq.q, IPC_RMID, NULL); | |||
msgctl(c->request.u.smq.q, IPC_RMID, NULL); | msgctl(c->request.u.smq.q, IPC_RMID, NULL); | |||
} | } | |||
int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c, | int32_t | |||
struct qb_ipc_connection_response *response) | qb_ipcc_smq_connect(struct qb_ipcc_connection *c, | |||
struct qb_ipc_connection_response *response) | ||||
{ | { | |||
int32_t res = 0; | int32_t res = 0; | |||
c->funcs.send = qb_ipc_smq_send; | c->funcs.send = qb_ipc_smq_send; | |||
c->funcs.sendv = qb_ipc_smq_sendv; | c->funcs.sendv = qb_ipc_smq_sendv; | |||
c->funcs.recv = qb_ipc_smq_recv; | c->funcs.recv = qb_ipc_smq_recv; | |||
c->funcs.fc_get = NULL; | c->funcs.fc_get = NULL; | |||
c->funcs.disconnect = qb_ipcc_smq_disconnect; | c->funcs.disconnect = qb_ipcc_smq_disconnect; | |||
c->type = QB_IPC_SYSV_MQ; | c->type = QB_IPC_SYSV_MQ; | |||
c->needs_sock_for_poll = QB_TRUE; | c->needs_sock_for_poll = QB_TRUE; | |||
skipping to change at line 277 | skipping to change at line 285 | |||
res = -errno; | res = -errno; | |||
perror("msgget:REQUEST"); | perror("msgget:REQUEST"); | |||
goto cleanup; | goto cleanup; | |||
} | } | |||
memcpy(&c->response.u.smq.key, response->response, sizeof(uint32_t)) ; | memcpy(&c->response.u.smq.key, response->response, sizeof(uint32_t)) ; | |||
c->response.u.smq.q = msgget(c->response.u.smq.key, IPC_NOWAIT); | c->response.u.smq.q = msgget(c->response.u.smq.key, IPC_NOWAIT); | |||
if (c->response.u.smq.q == -1) { | if (c->response.u.smq.q == -1) { | |||
res = -errno; | res = -errno; | |||
perror("msgget:RESPONSE"); | perror("msgget:RESPONSE"); | |||
goto cleanup; | goto cleanup_request; | |||
} | } | |||
memcpy(&c->event.u.smq.key, response->event, sizeof(uint32_t)); | memcpy(&c->event.u.smq.key, response->event, sizeof(uint32_t)); | |||
c->event.u.smq.q = msgget(c->event.u.smq.key, IPC_NOWAIT); | c->event.u.smq.q = msgget(c->event.u.smq.key, IPC_NOWAIT); | |||
if (c->event.u.smq.q == -1) { | if (c->event.u.smq.q == -1) { | |||
res = -errno; | res = -errno; | |||
perror("msgget:EVENT"); | perror("msgget:EVENT"); | |||
goto cleanup; | goto cleanup_request_response; | |||
} | } | |||
return 0; | ||||
cleanup_request_response: | ||||
msgctl(c->response.u.smq.q, IPC_RMID, NULL); | ||||
cleanup_request: | ||||
msgctl(c->request.u.smq.q, IPC_RMID, NULL); | ||||
cleanup: | cleanup: | |||
return res; | return res; | |||
} | } | |||
/* | /* | |||
* service functions | * service functions | |||
* -------------------------------------------------------- | * -------------------------------------------------------- | |||
*/ | */ | |||
static void qb_ipcs_smq_disconnect(struct qb_ipcs_connection *c) | static void | |||
qb_ipcs_smq_disconnect(struct qb_ipcs_connection *c) | ||||
{ | { | |||
struct qb_ipc_response_header msg; | struct qb_ipc_response_header msg; | |||
if (c->setup.u.us.sock != -1) { | if (c->setup.u.us.sock != -1) { | |||
msg.id = QB_IPC_MSG_DISCONNECT; | msg.id = QB_IPC_MSG_DISCONNECT; | |||
msg.size = sizeof(msg); | msg.size = sizeof(msg); | |||
msg.error = 0; | msg.error = 0; | |||
(void)qb_ipc_smq_send(&c->event, &msg, msg.size); | (void)qb_ipc_smq_send(&c->event, &msg, msg.size); | |||
} else { | } else { | |||
msgctl(c->event.u.smq.q, IPC_RMID, NULL); | msgctl(c->event.u.smq.q, IPC_RMID, NULL); | |||
msgctl(c->response.u.smq.q, IPC_RMID, NULL); | msgctl(c->response.u.smq.q, IPC_RMID, NULL); | |||
msgctl(c->request.u.smq.q, IPC_RMID, NULL); | msgctl(c->request.u.smq.q, IPC_RMID, NULL); | |||
} | } | |||
} | } | |||
static int32_t qb_ipcs_smq_connect(struct qb_ipcs_service *s, | static int32_t | |||
struct qb_ipcs_connection *c, | qb_ipcs_smq_connect(struct qb_ipcs_service *s, | |||
struct qb_ipc_connection_response *r) | struct qb_ipcs_connection *c, | |||
struct qb_ipc_connection_response *r) | ||||
{ | { | |||
int32_t res = 0; | int32_t res = 0; | |||
res = sysv_mq_unnamed_create(c, &c->request); | res = sysv_mq_unnamed_create(c, &c->request); | |||
if (res < 0) { | if (res < 0) { | |||
res = -errno; | res = -errno; | |||
goto cleanup; | goto cleanup; | |||
} | } | |||
memcpy(r->request, &c->request.u.smq.key, sizeof(int32_t)); | memcpy(r->request, &c->request.u.smq.key, sizeof(int32_t)); | |||
skipping to change at line 343 | skipping to change at line 358 | |||
res = sysv_mq_unnamed_create(c, &c->event); | res = sysv_mq_unnamed_create(c, &c->event); | |||
if (res < 0) { | if (res < 0) { | |||
res = -errno; | res = -errno; | |||
goto cleanup_request_response; | goto cleanup_request_response; | |||
} | } | |||
memcpy(r->event, &c->event.u.smq.key, sizeof(int32_t)); | memcpy(r->event, &c->event.u.smq.key, sizeof(int32_t)); | |||
r->hdr.error = 0; | r->hdr.error = 0; | |||
return 0; | return 0; | |||
cleanup_request: | ||||
msgctl(c->request.u.smq.q, IPC_RMID, NULL); | ||||
cleanup_request_response: | cleanup_request_response: | |||
msgctl(c->response.u.smq.q, IPC_RMID, NULL); | msgctl(c->response.u.smq.q, IPC_RMID, NULL); | |||
cleanup_request: | ||||
msgctl(c->request.u.smq.q, IPC_RMID, NULL); | ||||
cleanup: | cleanup: | |||
r->hdr.error = res; | r->hdr.error = res; | |||
return res; | return res; | |||
} | } | |||
static ssize_t qb_ipc_smq_q_len_get(struct qb_ipc_one_way *one_way) | static ssize_t | |||
qb_ipc_smq_q_len_get(struct qb_ipc_one_way *one_way) | ||||
{ | { | |||
struct msqid_ds info; | struct msqid_ds info; | |||
int32_t res = msgctl(one_way->u.smq.q, IPC_STAT, &info); | int32_t res = msgctl(one_way->u.smq.q, IPC_STAT, &info); | |||
if (res == 0) { | if (res == 0) { | |||
return info.msg_qnum; | return info.msg_qnum; | |||
} | } | |||
return -errno; | return -errno; | |||
} | } | |||
void qb_ipcs_smq_init(struct qb_ipcs_service *s) | void | |||
qb_ipcs_smq_init(struct qb_ipcs_service *s) | ||||
{ | { | |||
s->funcs.connect = qb_ipcs_smq_connect; | s->funcs.connect = qb_ipcs_smq_connect; | |||
s->funcs.disconnect = qb_ipcs_smq_disconnect; | s->funcs.disconnect = qb_ipcs_smq_disconnect; | |||
s->funcs.send = qb_ipc_smq_send; | s->funcs.send = qb_ipc_smq_send; | |||
s->funcs.sendv = qb_ipc_smq_sendv; | s->funcs.sendv = qb_ipc_smq_sendv; | |||
s->funcs.recv = qb_ipc_smq_recv; | s->funcs.recv = qb_ipc_smq_recv; | |||
s->funcs.peek = NULL; | s->funcs.peek = NULL; | |||
s->funcs.reclaim = NULL; | s->funcs.reclaim = NULL; | |||
End of changes. 31 change blocks. | ||||
50 lines changed or deleted | 62 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ |