ipcs.c   ipcs.c 
skipping to change at line 34 skipping to change at line 34
#include "ipc_int.h" #include "ipc_int.h"
#include <qb/qbdefs.h> #include <qb/qbdefs.h>
#include <qb/qbatomic.h> #include <qb/qbatomic.h>
#include <qb/qbipcs.h> #include <qb/qbipcs.h>
static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c,
int32_t fc_enable); int32_t fc_enable);
static QB_LIST_DECLARE(qb_ipc_services); static QB_LIST_DECLARE(qb_ipc_services);
qb_ipcs_service_t* qb_ipcs_create(const char *name, qb_ipcs_service_t *
int32_t service_id, qb_ipcs_create(const char *name,
enum qb_ipc_type type, int32_t service_id,
struct qb_ipcs_service_handlers *handlers) enum qb_ipc_type type, struct qb_ipcs_service_handlers *handl
ers)
{ {
struct qb_ipcs_service *s; struct qb_ipcs_service *s;
s = calloc(1, sizeof(struct qb_ipcs_service)); s = calloc(1, sizeof(struct qb_ipcs_service));
if (s == NULL) {
return NULL;
}
s->pid = getpid(); s->pid = getpid();
s->type = type; s->type = type;
s->needs_sock_for_poll = QB_FALSE; s->needs_sock_for_poll = QB_FALSE;
s->poll_priority = QB_LOOP_MED; s->poll_priority = QB_LOOP_MED;
s->ref_count = 1; s->ref_count = 1;
s->service_id = service_id; s->service_id = service_id;
strncpy(s->name, name, NAME_MAX); strncpy(s->name, name, NAME_MAX);
skipping to change at line 65 skipping to change at line 68
s->serv_fns.connection_closed = handlers->connection_closed; s->serv_fns.connection_closed = handlers->connection_closed;
s->serv_fns.connection_destroyed = handlers->connection_destroyed; s->serv_fns.connection_destroyed = handlers->connection_destroyed;
qb_list_init(&s->connections); qb_list_init(&s->connections);
qb_list_init(&s->list); qb_list_init(&s->list);
qb_list_add(&s->list, &qb_ipc_services); qb_list_add(&s->list, &qb_ipc_services);
return s; return s;
} }
void qb_ipcs_poll_handlers_set(struct qb_ipcs_service* s, void
struct qb_ipcs_poll_handlers *handlers) qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s,
struct qb_ipcs_poll_handlers *handlers)
{ {
s->poll_fns.job_add = handlers->job_add; s->poll_fns.job_add = handlers->job_add;
s->poll_fns.dispatch_add = handlers->dispatch_add; s->poll_fns.dispatch_add = handlers->dispatch_add;
s->poll_fns.dispatch_mod = handlers->dispatch_mod; s->poll_fns.dispatch_mod = handlers->dispatch_mod;
s->poll_fns.dispatch_del = handlers->dispatch_del; s->poll_fns.dispatch_del = handlers->dispatch_del;
} }
int32_t qb_ipcs_run(struct qb_ipcs_service* s) int32_t
qb_ipcs_run(struct qb_ipcs_service *s)
{ {
int32_t res; int32_t res = 0;
if (s->poll_fns.dispatch_add == NULL ||
s->poll_fns.dispatch_mod == NULL ||
s->poll_fns.dispatch_del == NULL) {
return -EINVAL;
}
switch (s->type) { switch (s->type) {
case QB_IPC_SOCKET: case QB_IPC_SOCKET:
qb_ipcs_us_init((struct qb_ipcs_service *)s); qb_ipcs_us_init((struct qb_ipcs_service *)s);
break; break;
case QB_IPC_SHM: case QB_IPC_SHM:
qb_ipcs_shm_init((struct qb_ipcs_service *)s); qb_ipcs_shm_init((struct qb_ipcs_service *)s);
break; break;
case QB_IPC_POSIX_MQ: case QB_IPC_POSIX_MQ:
#ifdef HAVE_POSIX_MQ
qb_ipcs_pmq_init((struct qb_ipcs_service *)s); qb_ipcs_pmq_init((struct qb_ipcs_service *)s);
#else
res = -ENOTSUP;
#endif /* HAVE_POSIX_MQ */
break; break;
case QB_IPC_SYSV_MQ: case QB_IPC_SYSV_MQ:
#ifdef HAVE_SYSV_MQ
qb_ipcs_smq_init((struct qb_ipcs_service *)s); qb_ipcs_smq_init((struct qb_ipcs_service *)s);
#else
res = -ENOTSUP;
#endif /* HAVE_SYSV_MQ */
break; break;
default: default:
res = -EINVAL; res = -EINVAL;
break; break;
} }
res = qb_ipcs_us_publish(s);
if (res < 0) { if (res < 0) {
qb_ipcs_unref(s); qb_ipcs_unref(s);
return res; return res;
} }
res = qb_ipcs_us_publish(s);
if (res < 0) { if (res < 0) {
(void)qb_ipcs_us_withdraw(s); (void)qb_ipcs_us_withdraw(s);
qb_ipcs_unref(s);
return res;
} }
return res; return res;
} }
static int32_t _modify_dispatch_descriptor_(struct qb_ipcs_connection *c) static int32_t
_modify_dispatch_descriptor_(struct qb_ipcs_connection *c)
{ {
qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod
;
if (c->service->type == QB_IPC_POSIX_MQ if (c->service->type == QB_IPC_POSIX_MQ
&& !c->service->needs_sock_for_poll) { && !c->service->needs_sock_for_poll) {
return c->service->poll_fns.dispatch_mod(c->service-> #ifdef HAVE_MQUEUE_H
poll_priority, return disp_mod(c->service->poll_priority,
(int32_t) c->reques (int32_t) c->request.u.pmq.q,
t.u. c->poll_events, c,
pmq.q, c->poll_even qb_ipcs_dispatch_service_request);
ts, #endif /* HAVE_MQUEUE_H */
c,
qb_ipcs_dispatch_se
rvice_request);
} else if (c->service->type == QB_IPC_SOCKET) { } else if (c->service->type == QB_IPC_SOCKET) {
return c->service->poll_fns.dispatch_mod(c->service-> return disp_mod(c->service->poll_priority,
poll_priority, c->event.u.us.sock,
c->event.u.us.sock, c->poll_events, c,
c->poll_events, c, qb_ipcs_dispatch_connection_request);
qb_ipcs_dispatch_co
nnection_request);
} else { } else {
return c->service->poll_fns.dispatch_mod(c->service-> return disp_mod(c->service->poll_priority,
poll_priority, c->setup.u.us.sock,
c->setup.u.us.sock, c->poll_events, c,
c->poll_events, c, qb_ipcs_dispatch_connection_request);
qb_ipcs_dispatch_co
nnection_request);
} }
return -EINVAL; return -EINVAL;
} }
void qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, void
enum qb_ipcs_rate_limit rl) qb_ipcs_request_rate_limit(struct qb_ipcs_service *s,
enum qb_ipcs_rate_limit rl)
{ {
struct qb_ipcs_connection *c; struct qb_ipcs_connection *c;
enum qb_loop_priority old_p = s->poll_priority; enum qb_loop_priority old_p = s->poll_priority;
struct qb_list_head *pos;
struct qb_list_head *n;
switch (rl) { switch (rl) {
case QB_IPCS_RATE_FAST: case QB_IPCS_RATE_FAST:
s->poll_priority = QB_LOOP_HIGH; s->poll_priority = QB_LOOP_HIGH;
break; break;
case QB_IPCS_RATE_SLOW: case QB_IPCS_RATE_SLOW:
case QB_IPCS_RATE_OFF: case QB_IPCS_RATE_OFF:
case QB_IPCS_RATE_OFF_2:
s->poll_priority = QB_LOOP_LOW; s->poll_priority = QB_LOOP_LOW;
break; break;
default: default:
case QB_IPCS_RATE_NORMAL: case QB_IPCS_RATE_NORMAL:
s->poll_priority = QB_LOOP_MED; s->poll_priority = QB_LOOP_MED;
break; break;
} }
qb_list_for_each_entry(c, &s->connections, list) { for (pos = s->connections.next, n = pos->next;
pos != &s->connections; pos = n, n = pos->next) {
c = qb_list_entry(pos, struct qb_ipcs_connection, list);
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
qb_ipcs_flowcontrol_set(c, (rl == QB_IPCS_RATE_OFF)); if (rl == QB_IPCS_RATE_OFF) {
if (old_p == s->poll_priority) { qb_ipcs_flowcontrol_set(c, 1);
qb_ipcs_connection_unref(c); } else if (rl == QB_IPCS_RATE_OFF_2) {
continue; qb_ipcs_flowcontrol_set(c, 2);
} else {
qb_ipcs_flowcontrol_set(c, QB_FALSE);
}
if (old_p != s->poll_priority) {
(void)_modify_dispatch_descriptor_(c);
} }
(void)_modify_dispatch_descriptor_(c);
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
} }
} }
void qb_ipcs_ref(struct qb_ipcs_service *s) void
qb_ipcs_ref(struct qb_ipcs_service *s)
{ {
qb_atomic_int_inc(&s->ref_count); qb_atomic_int_inc(&s->ref_count);
} }
void qb_ipcs_unref(struct qb_ipcs_service *s) void
qb_ipcs_unref(struct qb_ipcs_service *s)
{ {
int32_t free_it; int32_t free_it;
struct qb_ipcs_connection *c = NULL; struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter; struct qb_list_head *pos;
struct qb_list_head *iter_next; struct qb_list_head *n;
assert(s->ref_count > 0); assert(s->ref_count > 0);
free_it = qb_atomic_int_dec_and_test(&s->ref_count); free_it = qb_atomic_int_dec_and_test(&s->ref_count);
if (free_it) { if (free_it) {
qb_util_log(LOG_DEBUG, "%s() - destorying", __func__); qb_util_log(LOG_DEBUG, "%s() - destroying", __func__);
qb_list_for_each_safe(iter, iter_next, &s->connections) { for (pos = s->connections.next, n = pos->next;
c = qb_list_entry(iter, struct qb_ipcs_connection, l pos != &s->connections; pos = n, n = pos->next) {
ist); c = qb_list_entry(pos, struct qb_ipcs_connection, li
st);
if (c == NULL) { if (c == NULL) {
continue; continue;
} }
qb_ipcs_disconnect(c); qb_ipcs_disconnect(c);
} }
(void)qb_ipcs_us_withdraw(s); (void)qb_ipcs_us_withdraw(s);
free(s); free(s);
} }
} }
void qb_ipcs_destroy(struct qb_ipcs_service* s) void
qb_ipcs_destroy(struct qb_ipcs_service *s)
{ {
qb_ipcs_unref(s); qb_ipcs_unref(s);
} }
/* /*
* connection API * connection API
*/ */
ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *dat ssize_t
a, qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
size_t size) size_t size)
{ {
ssize_t res; ssize_t res;
if (c == NULL) {
return -EINVAL;
}
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
res = c->service->funcs.send(&c->response, data, size); res = c->service->funcs.send(&c->response, data, size);
if (res == size) { if (res == size) {
c->stats.responses++; c->stats.responses++;
} else if (res == -EAGAIN || res == -ETIMEDOUT) { } else if (res == -EAGAIN || res == -ETIMEDOUT) {
c->stats.send_retries++; c->stats.send_retries++;
} }
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
return res; return res;
} }
ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection *c, const struct i ssize_t
ovec * iov, size_t iov_len) qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec *
iov,
size_t iov_len)
{ {
ssize_t res; ssize_t res;
if (c == NULL) {
return -EINVAL;
}
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
res = c->service->funcs.sendv(&c->response, iov, iov_len); res = c->service->funcs.sendv(&c->response, iov, iov_len);
if (res > 0) { if (res > 0) {
c->stats.responses++; c->stats.responses++;
} else if (res == -EAGAIN || res == -ETIMEDOUT) { } else if (res == -EAGAIN || res == -ETIMEDOUT) {
c->stats.send_retries++; c->stats.send_retries++;
} }
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
return res; return res;
} }
static int32_t send_event_notification(int32_t fd, int32_t revents, void *d static int32_t
ata) send_event_notification(int32_t fd, int32_t revents, void *data)
{ {
ssize_t res = 0; ssize_t res = 0;
struct qb_ipcs_connection *c = data; struct qb_ipcs_connection *c = data;
if (c->outstanding_notifiers > 0) { if (c->outstanding_notifiers > 0) {
res = qb_ipc_us_send(&c->setup, data, c->outstanding_notifie rs); res = qb_ipc_us_send(&c->setup, data, c->outstanding_notifie rs);
} }
if (res > 0) { if (res > 0) {
c->outstanding_notifiers -= res; c->outstanding_notifiers -= res;
} }
if (c->outstanding_notifiers > 0) { if (c->outstanding_notifiers > 0) {
return 0; return 0;
} else { } else {
c->outstanding_notifiers = 0; c->outstanding_notifiers = 0;
c->poll_events = POLLIN | POLLPRI | POLLNVAL, c->poll_events = POLLIN | POLLPRI | POLLNVAL,
(void)_modify_dispatch_descriptor_(c); (void)_modify_dispatch_descriptor_(c);
} }
return 0; return 0;
} }
ssize_t qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, ssize_t
size_t size) qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t
size)
{ {
ssize_t res; ssize_t res;
ssize_t res2 = 0; ssize_t res2 = 0;
if (c == NULL) {
return -EINVAL;
}
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
res = c->service->funcs.send(&c->event, data, size); res = c->service->funcs.send(&c->event, data, size);
if (res != size) { if (res != size) {
goto deref_and_return; goto deref_and_return;
} }
c->stats.events++; c->stats.events++;
if (c->service->needs_sock_for_poll) { if (c->service->needs_sock_for_poll) {
if (c->outstanding_notifiers > 0) { if (c->outstanding_notifiers > 0) {
c->outstanding_notifiers++; c->outstanding_notifiers++;
} else { } else {
res2 = qb_ipc_us_send(&c->setup, data, 1); res2 = qb_ipc_us_send(&c->setup, data, 1);
if (res2 == 1) { if (res2 == 1) {
goto deref_and_return; goto deref_and_return;
} }
/* /*
* notify the client later, when we can. * notify the client later, when we can.
*/ */
c->outstanding_notifiers++; c->outstanding_notifiers++;
c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNV AL, c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNV AL,
(void)_modify_dispatch_descriptor_(c); (void)_modify_dispatch_descriptor_(c);
} }
} }
deref_and_return: deref_and_return:
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
return res; return res;
} }
ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection * c, ssize_t
const struct iovec * iov, size_t iov_len) qb_ipcs_event_sendv(struct qb_ipcs_connection * c,
const struct iovec * iov, size_t iov_len)
{ {
ssize_t res; ssize_t res;
ssize_t res2; ssize_t res2;
if (c == NULL) {
return -EINVAL;
}
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
res = c->service->funcs.sendv(&c->event, iov, iov_len); res = c->service->funcs.sendv(&c->event, iov, iov_len);
if (res < 0) { if (res < 0) {
goto deref_and_return; goto deref_and_return;
} }
c->stats.events++; c->stats.events++;
if (c->service->needs_sock_for_poll) { if (c->service->needs_sock_for_poll) {
if (c->outstanding_notifiers > 0) { if (c->outstanding_notifiers > 0) {
c->outstanding_notifiers++; c->outstanding_notifiers++;
} else { } else {
res2 = qb_ipc_us_send(&c->setup, &res, 1); res2 = qb_ipc_us_send(&c->setup, &res, 1);
if (res2 == 1) { if (res2 == 1) {
goto deref_and_return; goto deref_and_return;
} }
/* /*
* notify the client later, when we can. * notify the client later, when we can.
*/ */
c->outstanding_notifiers++; c->outstanding_notifiers++;
c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNV AL, c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNV AL,
(void)_modify_dispatch_descriptor_(c); (void)_modify_dispatch_descriptor_(c);
} }
} }
deref_and_return: deref_and_return:
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
return res; return res;
} }
qb_ipcs_connection_t *qb_ipcs_connection_first_get(struct qb_ipcs_service * qb_ipcs_connection_t *
s) qb_ipcs_connection_first_get(struct qb_ipcs_service * s)
{ {
struct qb_ipcs_connection *c; struct qb_ipcs_connection *c;
struct qb_list_head *iter; struct qb_list_head *iter;
if (qb_list_empty(&s->connections)) { if (qb_list_empty(&s->connections)) {
return NULL; return NULL;
} }
iter = s->connections.next; iter = s->connections.next;
c = qb_list_entry(iter, struct qb_ipcs_connection, list); c = qb_list_entry(iter, struct qb_ipcs_connection, list);
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
return c; return c;
} }
qb_ipcs_connection_t * qb_ipcs_connection_next_get(struct qb_ipcs_service* qb_ipcs_connection_t *
s, qb_ipcs_connection_next_get(struct qb_ipcs_service * s,
struct qb_ipcs_connection struct qb_ipcs_connection * current)
*current)
{ {
struct qb_ipcs_connection *c; struct qb_ipcs_connection *c;
struct qb_list_head *iter; struct qb_list_head *iter;
if (current->list.next == &s->connections) { if (current == NULL || current->list.next == &s->connections) {
return NULL; return NULL;
} }
iter = current->list.next; iter = current->list.next;
c = qb_list_entry(iter, struct qb_ipcs_connection, list); c = qb_list_entry(iter, struct qb_ipcs_connection, list);
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
return c; return c;
} }
int32_t qb_ipcs_service_id_get(struct qb_ipcs_connection *c) int32_t
qb_ipcs_service_id_get(struct qb_ipcs_connection * c)
{ {
if (c == NULL) {
return -EINVAL;
}
return c->service->service_id; return c->service->service_id;
} }
struct qb_ipcs_connection *qb_ipcs_connection_alloc(struct qb_ipcs_service struct qb_ipcs_connection *
*s) qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{ {
struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_conne struct qb_ipcs_connection *c =
ction)); calloc(1, sizeof(struct qb_ipcs_connection));
if (c == NULL) {
return NULL;
}
c->refcount = 1; c->refcount = 1;
c->service = s; c->service = s;
c->pid = 0; c->pid = 0;
c->euid = -1; c->euid = -1;
c->egid = -1; c->egid = -1;
qb_list_init(&c->list); qb_list_init(&c->list);
c->receive_buf = NULL; c->receive_buf = NULL;
c->context = NULL; c->context = NULL;
c->fc_enabled = QB_FALSE; c->fc_enabled = QB_FALSE;
c->state = QB_IPCS_CONNECTION_INACTIVE; c->state = QB_IPCS_CONNECTION_INACTIVE;
c->poll_events = POLLIN | POLLPRI | POLLNVAL; c->poll_events = POLLIN | POLLPRI | POLLNVAL;
return c; return c;
} }
void qb_ipcs_connection_ref(struct qb_ipcs_connection *c) void
qb_ipcs_connection_ref(struct qb_ipcs_connection *c)
{ {
qb_atomic_int_inc(&c->refcount); if (c) {
qb_atomic_int_inc(&c->refcount);
}
} }
void qb_ipcs_connection_unref(struct qb_ipcs_connection *c) void
qb_ipcs_connection_unref(struct qb_ipcs_connection *c)
{ {
int32_t free_it; int32_t free_it;
if (c == NULL) {
return;
}
if (c->refcount < 1) { if (c->refcount < 1) {
qb_util_log(LOG_ERR, "%s() ref:%d state:%d fd:%d", qb_util_log(LOG_ERR, "%s() ref:%d state:%d fd:%d",
__func__, c->refcount, c->state, c->setup.u.us.s __func__, c->refcount, c->state,
ock); c->setup.u.us.sock);
assert(0); assert(0);
} }
free_it = qb_atomic_int_dec_and_test(&c->refcount); free_it = qb_atomic_int_dec_and_test(&c->refcount);
if (free_it) { if (free_it) {
qb_list_del(&c->list); qb_list_del(&c->list);
if (c->service->serv_fns.connection_destroyed) { if (c->service->serv_fns.connection_destroyed) {
c->service->serv_fns.connection_destroyed(c); c->service->serv_fns.connection_destroyed(c);
} }
c->service->funcs.disconnect(c); c->service->funcs.disconnect(c);
if (c->receive_buf) { free(c->receive_buf);
free(c->receive_buf);
}
free(c); free(c);
} }
} }
void qb_ipcs_disconnect(struct qb_ipcs_connection *c) void
qb_ipcs_disconnect(struct qb_ipcs_connection *c)
{ {
int32_t res = 0; int32_t res = 0;
qb_loop_job_dispatch_fn rerun_job; qb_loop_job_dispatch_fn rerun_job;
if (c == NULL) {
return;
}
qb_util_log(LOG_DEBUG, "%s() state:%d", __func__, c->state); qb_util_log(LOG_DEBUG, "%s() state:%d", __func__, c->state);
if (c->state == QB_IPCS_CONNECTION_ACTIVE) { if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
if (c->service->needs_sock_for_poll && c->state = QB_IPCS_CONNECTION_INACTIVE;
c->setup.u.us.sock > 0) { c->service->stats.closed_connections++;
if (c->service->needs_sock_for_poll && c->setup.u.us.sock >
0) {
(void)c->service->poll_fns.dispatch_del(c->setup.u.u
s.sock);
qb_ipcc_us_sock_close(c->setup.u.us.sock); qb_ipcc_us_sock_close(c->setup.u.us.sock);
c->setup.u.us.sock = -1; c->setup.u.us.sock = -1;
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
} }
c->state = QB_IPCS_CONNECTION_DOWN; /* return early as it's an incomplete connection.
*/
return;
}
if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) {
c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN;
c->service->stats.active_connections--; c->service->stats.active_connections--;
c->service->stats.closed_connections++; c->service->stats.closed_connections++;
if (c->service->needs_sock_for_poll && c->setup.u.us.sock >
0) {
(void)c->service->poll_fns.dispatch_del(c->setup.u.u
s.sock);
qb_ipcc_us_sock_close(c->setup.u.us.sock);
c->setup.u.us.sock = -1;
qb_ipcs_connection_unref(c);
}
} }
if (c->state == QB_IPCS_CONNECTION_DOWN) { if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) {
res = 0; res = 0;
if (c->service->serv_fns.connection_closed) { if (c->service->serv_fns.connection_closed) {
res = c->service->serv_fns.connection_closed(c); res = c->service->serv_fns.connection_closed(c);
} }
if (res == 0) { if (res == 0) {
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
} else { } else {
/* ok, so they want the connection_closedd() /* OK, so they want the connection_closed
* function re-run */ * function re-run */
rerun_job = (qb_loop_job_dispatch_fn)qb_ipcs_disconn rerun_job =
ect; (qb_loop_job_dispatch_fn) qb_ipcs_disconnect;
res = c->service->poll_fns.job_add(QB_LOOP_LOW, c, res = c->service->poll_fns.job_add(QB_LOOP_LOW,
c,
rerun_job); rerun_job);
if (res != 0) { if (res != 0) {
/* last ditch attempt to cleanup */ /* last ditch attempt to cleanup */
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
} }
} }
} }
} }
static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t f static void
c_enable) qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable)
{ {
if (c == NULL) {
return;
}
if (c->fc_enabled != fc_enable) { if (c->fc_enabled != fc_enable) {
c->service->funcs.fc_set(&c->request, fc_enable); c->service->funcs.fc_set(&c->request, fc_enable);
c->fc_enabled = fc_enable; c->fc_enabled = fc_enable;
c->stats.flow_control_state = fc_enable; c->stats.flow_control_state = fc_enable;
c->stats.flow_control_count++; c->stats.flow_control_count++;
} }
} }
static int32_t _process_request_(struct qb_ipcs_connection *c, static int32_t
int32_t ms_timeout) _process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout)
{ {
int32_t res = 0; int32_t res = 0;
ssize_t size; ssize_t size;
struct qb_ipc_request_header *hdr; struct qb_ipc_request_header *hdr;
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
if (c->service->funcs.peek && c->service->funcs.reclaim) { if (c->service->funcs.peek && c->service->funcs.reclaim) {
size = c->service->funcs.peek(&c->request, (void**)&hdr, size = c->service->funcs.peek(&c->request, (void **)&hdr,
ms_timeout); ms_timeout);
} else { } else {
hdr = (struct qb_ipc_request_header *)c->receive_buf; hdr = c->receive_buf;
size = c->service->funcs.recv(&c->request, hdr, c->request.m size = c->service->funcs.recv(&c->request,
ax_msg_size, hdr,
c->request.max_msg_size,
ms_timeout); ms_timeout);
} }
if (size < 0) { if (size < 0) {
if (size != -EAGAIN && size != -ETIMEDOUT) { if (size != -EAGAIN && size != -ETIMEDOUT) {
qb_util_log(LOG_ERR, "%s(): %s", __func__, strerror( qb_util_perror(LOG_ERR,
-res)); "recv from client connection failed")
;
} else { } else {
c->stats.recv_retries++; c->stats.recv_retries++;
} }
res = size; res = size;
goto cleanup; goto cleanup;
} }
c->stats.requests++; c->stats.requests++;
if (hdr->id == QB_IPC_MSG_DISCONNECT) { if (hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_DISCONNECT", __func_ _); qb_util_log(LOG_DEBUG, "client requesting a disconnect");
qb_ipcs_disconnect(c); qb_ipcs_disconnect(c);
res = -ESHUTDOWN; res = -ESHUTDOWN;
} else { } else {
res = c->service->serv_fns.msg_process(c, hdr, hdr->size); res = c->service->serv_fns.msg_process(c, hdr, hdr->size);
/* 0 == good, negitive == backoff */ /* 0 == good, negative == backoff */
if (res < 0) { if (res < 0) {
res = -ENOBUFS; res = -ENOBUFS;
} else { } else {
res = size; res = size;
} }
} }
if (c->service->funcs.peek && c->service->funcs.reclaim) { if (c->service->funcs.peek && c->service->funcs.reclaim) {
c->service->funcs.reclaim(&c->request); c->service->funcs.reclaim(&c->request);
} }
cleanup: cleanup:
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
return res; return res;
} }
#define IPC_REQUEST_TIMEOUT 10 #define IPC_REQUEST_TIMEOUT 10
#define MAX_RECV_MSGS 50 #define MAX_RECV_MSGS 50
int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, int32_t
void *data) qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data)
{ {
int32_t res = _process_request_((struct qb_ipcs_connection *)data, int32_t res = _process_request_((struct qb_ipcs_connection *)data,
IPC_REQUEST_TIMEOUT); IPC_REQUEST_TIMEOUT);
if (res > 0) { if (res > 0) {
return 0; return 0;
} }
return res; return res;
} }
static ssize_t _request_q_len_get(struct qb_ipcs_connection *c) static ssize_t
_request_q_len_get(struct qb_ipcs_connection *c)
{ {
ssize_t q_len; ssize_t q_len;
if (c->service->funcs.q_len_get) { if (c->service->funcs.q_len_get) {
q_len = c->service->funcs.q_len_get(&c->request); q_len = c->service->funcs.q_len_get(&c->request);
if (q_len < 0) { if (q_len < 0) {
return q_len; return q_len;
} }
q_len = QB_MIN(q_len, MAX_RECV_MSGS); q_len = QB_MIN(q_len, MAX_RECV_MSGS);
if (c->service->poll_priority == QB_LOOP_MED) if (c->service->poll_priority == QB_LOOP_MED)
q_len = QB_MIN(q_len, 5); q_len = QB_MIN(q_len, 5);
if (c->service->poll_priority == QB_LOOP_LOW) if (c->service->poll_priority == QB_LOOP_LOW)
q_len = 1; q_len = 1;
} else { } else {
q_len = 1; q_len = 1;
} }
return q_len; return q_len;
} }
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, int32_t
void *data) qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data
)
{ {
struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
char bytes[MAX_RECV_MSGS]; char bytes[MAX_RECV_MSGS];
int32_t res; int32_t res;
int32_t recvd = 0; int32_t recvd = 0;
ssize_t avail; ssize_t avail;
if (revents & POLLHUP) { if (revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "%s HUP conn:%p fd:%d", __func__, c, fd); qb_util_log(LOG_DEBUG, "%s HUP conn:%p fd:%d", __func__, c, fd);
qb_ipcs_disconnect(c); qb_ipcs_disconnect(c);
skipping to change at line 602 skipping to change at line 700
if (c->service->needs_sock_for_poll && recvd > 0) { if (c->service->needs_sock_for_poll && recvd > 0) {
(void)qb_ipc_us_recv(&c->setup, bytes, recvd, -1); (void)qb_ipc_us_recv(&c->setup, bytes, recvd, -1);
} }
res = QB_MIN(0, res); res = QB_MIN(0, res);
if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
res = 0; res = 0;
} }
if (res != 0) { if (res != 0) {
qb_util_log(LOG_DEBUG, "%s returning %d : %s", qb_util_perror(LOG_DEBUG, "request returned error");
__func__, res, strerror(-res));
qb_ipcs_connection_unref(c); qb_ipcs_connection_unref(c);
} }
return res; return res;
} }
void qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context) void
qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
{ {
if (c == NULL) {
return;
}
c->context = context; c->context = context;
} }
void *qb_ipcs_context_get(struct qb_ipcs_connection *c) void *
qb_ipcs_context_get(struct qb_ipcs_connection *c)
{ {
if (c == NULL) {
return NULL;
}
return c->context; return c->context;
} }
int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c, int32_t
struct qb_ipcs_connection_stats* stats, qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c,
int32_t clear_after_read) struct qb_ipcs_connection_stats * stats,
int32_t clear_after_read)
{ {
if (c == NULL) {
return -EINVAL;
}
memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats)); memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
if (clear_after_read) { if (clear_after_read) {
memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats) ); memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats) );
c->stats.client_pid = c->pid; c->stats.client_pid = c->pid;
} }
return 0; return 0;
} }
int32_t qb_ipcs_stats_get(struct qb_ipcs_service* s, int32_t
struct qb_ipcs_stats* stats, qb_ipcs_stats_get(struct qb_ipcs_service * s,
int32_t clear_after_read) struct qb_ipcs_stats * stats, int32_t clear_after_read)
{ {
if (s == NULL) {
return -EINVAL;
}
memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats)); memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
if (clear_after_read) { if (clear_after_read) {
memset(&s->stats, 0, sizeof(struct qb_ipcs_stats)); memset(&s->stats, 0, sizeof(struct qb_ipcs_stats));
} }
return 0; return 0;
} }
 End of changes. 81 change blocks. 
120 lines changed or deleted 224 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/