ipc_shm.c   ipc_shm.c 
skipping to change at line 26 skipping to change at line 26
* GNU Lesser General Public License for more details. * GNU Lesser General Public License for more details.
* *
* You should have received a copy of the GNU Lesser General Public License * You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>. * along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os_base.h" #include "os_base.h"
#include "ipc_int.h" #include "ipc_int.h"
#include "util_int.h" #include "util_int.h"
#include <qb/qbdefs.h> #include <qb/qbdefs.h>
#include <qb/qbatomic.h>
#include <qb/qbloop.h> #include <qb/qbloop.h>
#include <qb/qbrb.h> #include <qb/qbrb.h>
/* /*
* utility functions * utility functions
* -------------------------------------------------------- * --------------------------------------------------------
*/ */
/* /*
* client functions * client functions
* -------------------------------------------------------- * --------------------------------------------------------
*/ */
static void qb_ipcc_shm_disconnect(struct qb_ipcc_connection *c) static void
qb_ipcc_shm_disconnect(struct qb_ipcc_connection *c)
{ {
qb_rb_close(c->request.u.shm.rb); qb_rb_close(c->request.u.shm.rb);
qb_rb_close(c->response.u.shm.rb); qb_rb_close(c->response.u.shm.rb);
qb_rb_close(c->event.u.shm.rb); qb_rb_close(c->event.u.shm.rb);
} }
static ssize_t qb_ipc_shm_send(struct qb_ipc_one_way *one_way, static ssize_t
const void *msg_ptr, size_t msg_len) qb_ipc_shm_send(struct qb_ipc_one_way *one_way,
const void *msg_ptr, size_t msg_len)
{ {
return qb_rb_chunk_write(one_way->u.shm.rb, msg_ptr, msg_len); return qb_rb_chunk_write(one_way->u.shm.rb, msg_ptr, msg_len);
} }
static ssize_t qb_ipc_shm_sendv(struct qb_ipc_one_way *one_way, static ssize_t
const struct iovec* iov, qb_ipc_shm_sendv(struct qb_ipc_one_way *one_way,
size_t iov_len) const struct iovec *iov, size_t iov_len)
{ {
char *dest; char *dest;
int32_t res = 0; int32_t res = 0;
int32_t total_size = 0; int32_t total_size = 0;
int32_t i; int32_t i;
char *pt = NULL; char *pt = NULL;
if (one_way->u.shm.rb == NULL) { if (one_way->u.shm.rb == NULL) {
return -ENOTCONN; return -ENOTCONN;
} }
skipping to change at line 84 skipping to change at line 87
memcpy(pt, iov[i].iov_base, iov[i].iov_len); memcpy(pt, iov[i].iov_base, iov[i].iov_len);
pt += iov[i].iov_len; pt += iov[i].iov_len;
} }
res = qb_rb_chunk_commit(one_way->u.shm.rb, total_size); res = qb_rb_chunk_commit(one_way->u.shm.rb, total_size);
if (res < 0) { if (res < 0) {
return res; return res;
} }
return total_size; return total_size;
} }
static ssize_t qb_ipc_shm_recv(struct qb_ipc_one_way *one_way, static ssize_t
void *msg_ptr, qb_ipc_shm_recv(struct qb_ipc_one_way *one_way,
size_t msg_len, void *msg_ptr, size_t msg_len, int32_t ms_timeout)
int32_t ms_timeout)
{ {
if (one_way->u.shm.rb == NULL) { if (one_way->u.shm.rb == NULL) {
return -ENOTCONN; return -ENOTCONN;
} }
return qb_rb_chunk_read(one_way->u.shm.rb, return qb_rb_chunk_read(one_way->u.shm.rb,
(void *)msg_ptr, (void *)msg_ptr, msg_len, ms_timeout);
msg_len,
ms_timeout);
} }
static ssize_t qb_ipc_shm_peek(struct qb_ipc_one_way *one_way, void **data_ static ssize_t
out, int32_t ms_timeout) qb_ipc_shm_peek(struct qb_ipc_one_way *one_way, void **data_out,
int32_t ms_timeout)
{ {
if (one_way->u.shm.rb == NULL) { if (one_way->u.shm.rb == NULL) {
return -ENOTCONN; return -ENOTCONN;
} }
return qb_rb_chunk_peek(one_way->u.shm.rb, return qb_rb_chunk_peek(one_way->u.shm.rb, data_out, ms_timeout);
data_out,
ms_timeout);
} }
static void qb_ipc_shm_reclaim(struct qb_ipc_one_way *one_way) static void
qb_ipc_shm_reclaim(struct qb_ipc_one_way *one_way)
{ {
if (one_way->u.shm.rb != NULL) { if (one_way->u.shm.rb != NULL) {
qb_rb_chunk_reclaim(one_way->u.shm.rb); qb_rb_chunk_reclaim(one_way->u.shm.rb);
} }
} }
static void qb_ipc_shm_fc_set(struct qb_ipc_one_way *one_way, static void
int32_t fc_enable) qb_ipc_shm_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable)
{ {
int32_t *fc; int32_t *fc;
fc = qb_rb_shared_user_data_get(one_way->u.shm.rb); fc = qb_rb_shared_user_data_get(one_way->u.shm.rb);
*fc = fc_enable; qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable);
qb_atomic_int_set(fc, fc_enable);
} }
static int32_t qb_ipc_shm_fc_get(struct qb_ipc_one_way *one_way) static int32_t
qb_ipc_shm_fc_get(struct qb_ipc_one_way *one_way)
{ {
int32_t *fc; int32_t *fc;
int32_t rc = qb_rb_refcount_get(one_way->u.shm.rb); int32_t rc = qb_rb_refcount_get(one_way->u.shm.rb);
if (rc != 2) { if (rc != 2) {
return -ENOTCONN; return -ENOTCONN;
} }
fc = qb_rb_shared_user_data_get(one_way->u.shm.rb); fc = qb_rb_shared_user_data_get(one_way->u.shm.rb);
return *fc; return qb_atomic_int_get(fc);
} }
static ssize_t qb_ipc_shm_q_len_get(struct qb_ipc_one_way *one_way) static ssize_t
qb_ipc_shm_q_len_get(struct qb_ipc_one_way *one_way)
{ {
if (one_way->u.shm.rb == NULL) { if (one_way->u.shm.rb == NULL) {
return -ENOTCONN; return -ENOTCONN;
} }
return qb_rb_chunks_used(one_way->u.shm.rb); return qb_rb_chunks_used(one_way->u.shm.rb);
} }
int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, int32_t
struct qb_ipc_connection_response *response) qb_ipcc_shm_connect(struct qb_ipcc_connection * c,
struct qb_ipc_connection_response * response)
{ {
int32_t res = 0; int32_t res = 0;
c->funcs.send = qb_ipc_shm_send; c->funcs.send = qb_ipc_shm_send;
c->funcs.sendv = qb_ipc_shm_sendv; c->funcs.sendv = qb_ipc_shm_sendv;
c->funcs.recv = qb_ipc_shm_recv; c->funcs.recv = qb_ipc_shm_recv;
c->funcs.fc_get = qb_ipc_shm_fc_get; c->funcs.fc_get = qb_ipc_shm_fc_get;
c->funcs.disconnect = qb_ipcc_shm_disconnect; c->funcs.disconnect = qb_ipcc_shm_disconnect;
c->needs_sock_for_poll = QB_TRUE; c->needs_sock_for_poll = QB_TRUE;
if (strlen(c->name) > (NAME_MAX - 20)) { if (strlen(c->name) > (NAME_MAX - 20)) {
errno = EINVAL; errno = EINVAL;
return -errno; return -errno;
} }
c->request.u.shm.rb = qb_rb_open(response->request, c->request.max_m c->request.u.shm.rb =
sg_size, qb_rb_open(response->request, c->request.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS, QB_RB_FLAG_SHARED_PROCESS, sizeof(int32_t));
sizeof(int32_t));
if (c->request.u.shm.rb == NULL) { if (c->request.u.shm.rb == NULL) {
res = -errno; res = -errno;
goto return_error; goto return_error;
} }
c->response.u.shm.rb = qb_rb_open(response->response, c->response.u.shm.rb = qb_rb_open(response->response,
c->response.max_msg_size, c->response.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS, 0); QB_RB_FLAG_SHARED_PROCESS, 0);
if (c->response.u.shm.rb == NULL) { if (c->response.u.shm.rb == NULL) {
res = -errno; res = -errno;
skipping to change at line 187 skipping to change at line 192
c->response.max_msg_size, c->response.max_msg_size,
QB_RB_FLAG_SHARED_PROCESS, 0); QB_RB_FLAG_SHARED_PROCESS, 0);
if (c->event.u.shm.rb == NULL) { if (c->event.u.shm.rb == NULL) {
res = -errno; res = -errno;
perror("qb_rb_open:EVENT"); perror("qb_rb_open:EVENT");
goto cleanup_request_response; goto cleanup_request_response;
} }
return 0; return 0;
cleanup_request_response: cleanup_request_response:
qb_rb_close(c->response.u.shm.rb); qb_rb_close(c->response.u.shm.rb);
cleanup_request: cleanup_request:
qb_rb_close(c->request.u.shm.rb); qb_rb_close(c->request.u.shm.rb);
return_error: return_error:
qb_util_log(LOG_ERR, "connection failed %s", errno = -res;
strerror(-res)); qb_util_perror(LOG_ERR, "connection failed");
return res; return res;
} }
/* /*
* service functions * service functions
* -------------------------------------------------------- * --------------------------------------------------------
*/ */
static void qb_ipcs_shm_disconnect(struct qb_ipcs_connection *c) static void
qb_ipcs_shm_disconnect(struct qb_ipcs_connection *c)
{ {
struct qb_ipc_response_header msg; struct qb_ipc_response_header msg;
int32_t peer_alive = QB_TRUE;
if (c->setup.u.us.sock == -1) {
peer_alive = QB_FALSE;
}
msg.id = QB_IPC_MSG_DISCONNECT; msg.id = QB_IPC_MSG_DISCONNECT;
msg.size = sizeof(msg); msg.size = sizeof(msg);
msg.error = 0; msg.error = 0;
if (c->response.u.shm.rb) { if (c->response.u.shm.rb) {
qb_rb_close(c->response.u.shm.rb); qb_rb_close(c->response.u.shm.rb);
c->response.u.shm.rb = NULL; c->response.u.shm.rb = NULL;
} }
if (c->event.u.shm.rb) { if (c->event.u.shm.rb) {
qb_rb_close(c->event.u.shm.rb); qb_rb_close(c->event.u.shm.rb);
c->event.u.shm.rb = NULL; c->event.u.shm.rb = NULL;
} }
if (c->request.u.shm.rb) { if (c->request.u.shm.rb) {
qb_rb_close(c->request.u.shm.rb); qb_rb_close(c->request.u.shm.rb);
c->request.u.shm.rb = NULL; c->request.u.shm.rb = NULL;
} }
} }
static int32_t qb_ipcs_shm_connect(struct qb_ipcs_service *s, static int32_t
struct qb_ipcs_connection *c, qb_ipcs_shm_connect(struct qb_ipcs_service *s,
struct qb_ipc_connection_response *r) struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{ {
int32_t res; int32_t res;
qb_util_log(LOG_DEBUG, "connecting to client [%d]", c->pid); qb_util_log(LOG_DEBUG, "connecting to client [%d]", c->pid);
snprintf(r->request, NAME_MAX, "qb-%s-request-%d-%d", s->name, c->pi snprintf(r->request, NAME_MAX, "%s-request-%d-%d", s->name, c->pid,
d, c->setup.u.us.sock); c->setup.u.us.sock);
snprintf(r->response, NAME_MAX, "qb-%s-response-%d-%d", s->name, c-> snprintf(r->response, NAME_MAX, "%s-response-%d-%d", s->name, c->pid
pid, c->setup.u.us.sock); ,
snprintf(r->event, NAME_MAX, "qb-%s-event-%d-%d", s->name, c->pid, c c->setup.u.us.sock);
->setup.u.us.sock); snprintf(r->event, NAME_MAX, "%s-event-%d-%d", s->name, c->pid,
c->setup.u.us.sock);
c->request.u.shm.rb = qb_rb_open(r->request, c->request.u.shm.rb = qb_rb_open(r->request,
c->request.max_msg_size, c->request.max_msg_size,
QB_RB_FLAG_CREATE | QB_RB_FLAG_CREATE |
QB_RB_FLAG_SHARED_PROCESS, QB_RB_FLAG_SHARED_PROCESS,
sizeof(int32_t)); sizeof(int32_t));
if (c->request.u.shm.rb == NULL) { if (c->request.u.shm.rb == NULL) {
res = -errno; res = -errno;
perror("qb_rb_open:REQUEST"); perror("qb_rb_open:REQUEST");
goto cleanup; goto cleanup;
skipping to change at line 290 skipping to change at line 295
return 0; return 0;
cleanup_request_response: cleanup_request_response:
qb_rb_close(c->request.u.shm.rb); qb_rb_close(c->request.u.shm.rb);
cleanup_request: cleanup_request:
qb_rb_close(c->response.u.shm.rb); qb_rb_close(c->response.u.shm.rb);
cleanup: cleanup:
r->hdr.error = res; r->hdr.error = res;
qb_util_log(LOG_ERR, "shm connection FAILED [%s]", errno = -res;
strerror(-res)); qb_util_perror(LOG_ERR, "shm connection FAILED");
return res; return res;
} }
void qb_ipcs_shm_init(struct qb_ipcs_service *s) void
qb_ipcs_shm_init(struct qb_ipcs_service *s)
{ {
s->funcs.connect = qb_ipcs_shm_connect; s->funcs.connect = qb_ipcs_shm_connect;
s->funcs.disconnect = qb_ipcs_shm_disconnect; s->funcs.disconnect = qb_ipcs_shm_disconnect;
s->funcs.recv = qb_ipc_shm_recv; s->funcs.recv = qb_ipc_shm_recv;
s->funcs.peek = qb_ipc_shm_peek; s->funcs.peek = qb_ipc_shm_peek;
s->funcs.reclaim = qb_ipc_shm_reclaim; s->funcs.reclaim = qb_ipc_shm_reclaim;
s->funcs.send = qb_ipc_shm_send; s->funcs.send = qb_ipc_shm_send;
s->funcs.sendv = qb_ipc_shm_sendv; s->funcs.sendv = qb_ipc_shm_sendv;
 End of changes. 25 change blocks. 
54 lines changed or deleted 56 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/