ipcs.c | ipcs.c | |||
---|---|---|---|---|
skipping to change at line 34 | skipping to change at line 34 | |||
#include "ipc_int.h" | #include "ipc_int.h" | |||
#include <qb/qbdefs.h> | #include <qb/qbdefs.h> | |||
#include <qb/qbatomic.h> | #include <qb/qbatomic.h> | |||
#include <qb/qbipcs.h> | #include <qb/qbipcs.h> | |||
static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, | static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, | |||
int32_t fc_enable); | int32_t fc_enable); | |||
static QB_LIST_DECLARE(qb_ipc_services); | static QB_LIST_DECLARE(qb_ipc_services); | |||
qb_ipcs_service_t* qb_ipcs_create(const char *name, | qb_ipcs_service_t * | |||
int32_t service_id, | qb_ipcs_create(const char *name, | |||
enum qb_ipc_type type, | int32_t service_id, | |||
struct qb_ipcs_service_handlers *handlers) | enum qb_ipc_type type, struct qb_ipcs_service_handlers *handl | |||
ers) | ||||
{ | { | |||
struct qb_ipcs_service *s; | struct qb_ipcs_service *s; | |||
s = calloc(1, sizeof(struct qb_ipcs_service)); | s = calloc(1, sizeof(struct qb_ipcs_service)); | |||
if (s == NULL) { | ||||
return NULL; | ||||
} | ||||
s->pid = getpid(); | s->pid = getpid(); | |||
s->type = type; | s->type = type; | |||
s->needs_sock_for_poll = QB_FALSE; | s->needs_sock_for_poll = QB_FALSE; | |||
s->poll_priority = QB_LOOP_MED; | s->poll_priority = QB_LOOP_MED; | |||
s->ref_count = 1; | s->ref_count = 1; | |||
s->service_id = service_id; | s->service_id = service_id; | |||
strncpy(s->name, name, NAME_MAX); | strncpy(s->name, name, NAME_MAX); | |||
skipping to change at line 65 | skipping to change at line 68 | |||
s->serv_fns.connection_closed = handlers->connection_closed; | s->serv_fns.connection_closed = handlers->connection_closed; | |||
s->serv_fns.connection_destroyed = handlers->connection_destroyed; | s->serv_fns.connection_destroyed = handlers->connection_destroyed; | |||
qb_list_init(&s->connections); | qb_list_init(&s->connections); | |||
qb_list_init(&s->list); | qb_list_init(&s->list); | |||
qb_list_add(&s->list, &qb_ipc_services); | qb_list_add(&s->list, &qb_ipc_services); | |||
return s; | return s; | |||
} | } | |||
void qb_ipcs_poll_handlers_set(struct qb_ipcs_service* s, | void | |||
struct qb_ipcs_poll_handlers *handlers) | qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s, | |||
struct qb_ipcs_poll_handlers *handlers) | ||||
{ | { | |||
s->poll_fns.job_add = handlers->job_add; | s->poll_fns.job_add = handlers->job_add; | |||
s->poll_fns.dispatch_add = handlers->dispatch_add; | s->poll_fns.dispatch_add = handlers->dispatch_add; | |||
s->poll_fns.dispatch_mod = handlers->dispatch_mod; | s->poll_fns.dispatch_mod = handlers->dispatch_mod; | |||
s->poll_fns.dispatch_del = handlers->dispatch_del; | s->poll_fns.dispatch_del = handlers->dispatch_del; | |||
} | } | |||
int32_t qb_ipcs_run(struct qb_ipcs_service* s) | int32_t | |||
qb_ipcs_run(struct qb_ipcs_service *s) | ||||
{ | { | |||
int32_t res; | int32_t res = 0; | |||
if (s->poll_fns.dispatch_add == NULL || | ||||
s->poll_fns.dispatch_mod == NULL || | ||||
s->poll_fns.dispatch_del == NULL) { | ||||
return -EINVAL; | ||||
} | ||||
switch (s->type) { | switch (s->type) { | |||
case QB_IPC_SOCKET: | case QB_IPC_SOCKET: | |||
qb_ipcs_us_init((struct qb_ipcs_service *)s); | qb_ipcs_us_init((struct qb_ipcs_service *)s); | |||
break; | break; | |||
case QB_IPC_SHM: | case QB_IPC_SHM: | |||
qb_ipcs_shm_init((struct qb_ipcs_service *)s); | qb_ipcs_shm_init((struct qb_ipcs_service *)s); | |||
break; | break; | |||
case QB_IPC_POSIX_MQ: | case QB_IPC_POSIX_MQ: | |||
#ifdef HAVE_POSIX_MQ | ||||
qb_ipcs_pmq_init((struct qb_ipcs_service *)s); | qb_ipcs_pmq_init((struct qb_ipcs_service *)s); | |||
#else | ||||
res = -ENOTSUP; | ||||
#endif /* HAVE_POSIX_MQ */ | ||||
break; | break; | |||
case QB_IPC_SYSV_MQ: | case QB_IPC_SYSV_MQ: | |||
#ifdef HAVE_SYSV_MQ | ||||
qb_ipcs_smq_init((struct qb_ipcs_service *)s); | qb_ipcs_smq_init((struct qb_ipcs_service *)s); | |||
#else | ||||
res = -ENOTSUP; | ||||
#endif /* HAVE_SYSV_MQ */ | ||||
break; | break; | |||
default: | default: | |||
res = -EINVAL; | res = -EINVAL; | |||
break; | break; | |||
} | } | |||
res = qb_ipcs_us_publish(s); | ||||
if (res < 0) { | if (res < 0) { | |||
qb_ipcs_unref(s); | qb_ipcs_unref(s); | |||
return res; | return res; | |||
} | } | |||
res = qb_ipcs_us_publish(s); | ||||
if (res < 0) { | if (res < 0) { | |||
(void)qb_ipcs_us_withdraw(s); | (void)qb_ipcs_us_withdraw(s); | |||
qb_ipcs_unref(s); | ||||
return res; | ||||
} | } | |||
return res; | return res; | |||
} | } | |||
static int32_t _modify_dispatch_descriptor_(struct qb_ipcs_connection *c) | static int32_t | |||
_modify_dispatch_descriptor_(struct qb_ipcs_connection *c) | ||||
{ | { | |||
qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod | ||||
; | ||||
if (c->service->type == QB_IPC_POSIX_MQ | if (c->service->type == QB_IPC_POSIX_MQ | |||
&& !c->service->needs_sock_for_poll) { | && !c->service->needs_sock_for_poll) { | |||
return c->service->poll_fns.dispatch_mod(c->service-> | #ifdef HAVE_MQUEUE_H | |||
poll_priority, | return disp_mod(c->service->poll_priority, | |||
(int32_t) c->reques | (int32_t) c->request.u.pmq.q, | |||
t.u. | c->poll_events, c, | |||
pmq.q, c->poll_even | qb_ipcs_dispatch_service_request); | |||
ts, | #endif /* HAVE_MQUEUE_H */ | |||
c, | ||||
qb_ipcs_dispatch_se | ||||
rvice_request); | ||||
} else if (c->service->type == QB_IPC_SOCKET) { | } else if (c->service->type == QB_IPC_SOCKET) { | |||
return c->service->poll_fns.dispatch_mod(c->service-> | return disp_mod(c->service->poll_priority, | |||
poll_priority, | c->event.u.us.sock, | |||
c->event.u.us.sock, | c->poll_events, c, | |||
c->poll_events, c, | qb_ipcs_dispatch_connection_request); | |||
qb_ipcs_dispatch_co | ||||
nnection_request); | ||||
} else { | } else { | |||
return c->service->poll_fns.dispatch_mod(c->service-> | return disp_mod(c->service->poll_priority, | |||
poll_priority, | c->setup.u.us.sock, | |||
c->setup.u.us.sock, | c->poll_events, c, | |||
c->poll_events, c, | qb_ipcs_dispatch_connection_request); | |||
qb_ipcs_dispatch_co | ||||
nnection_request); | ||||
} | } | |||
return -EINVAL; | return -EINVAL; | |||
} | } | |||
void qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, | void | |||
enum qb_ipcs_rate_limit rl) | qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, | |||
enum qb_ipcs_rate_limit rl) | ||||
{ | { | |||
struct qb_ipcs_connection *c; | struct qb_ipcs_connection *c; | |||
enum qb_loop_priority old_p = s->poll_priority; | enum qb_loop_priority old_p = s->poll_priority; | |||
struct qb_list_head *pos; | ||||
struct qb_list_head *n; | ||||
switch (rl) { | switch (rl) { | |||
case QB_IPCS_RATE_FAST: | case QB_IPCS_RATE_FAST: | |||
s->poll_priority = QB_LOOP_HIGH; | s->poll_priority = QB_LOOP_HIGH; | |||
break; | break; | |||
case QB_IPCS_RATE_SLOW: | case QB_IPCS_RATE_SLOW: | |||
case QB_IPCS_RATE_OFF: | case QB_IPCS_RATE_OFF: | |||
case QB_IPCS_RATE_OFF_2: | ||||
s->poll_priority = QB_LOOP_LOW; | s->poll_priority = QB_LOOP_LOW; | |||
break; | break; | |||
default: | default: | |||
case QB_IPCS_RATE_NORMAL: | case QB_IPCS_RATE_NORMAL: | |||
s->poll_priority = QB_LOOP_MED; | s->poll_priority = QB_LOOP_MED; | |||
break; | break; | |||
} | } | |||
qb_list_for_each_entry(c, &s->connections, list) { | for (pos = s->connections.next, n = pos->next; | |||
pos != &s->connections; pos = n, n = pos->next) { | ||||
c = qb_list_entry(pos, struct qb_ipcs_connection, list); | ||||
qb_ipcs_connection_ref(c); | qb_ipcs_connection_ref(c); | |||
qb_ipcs_flowcontrol_set(c, (rl == QB_IPCS_RATE_OFF)); | if (rl == QB_IPCS_RATE_OFF) { | |||
if (old_p == s->poll_priority) { | qb_ipcs_flowcontrol_set(c, 1); | |||
qb_ipcs_connection_unref(c); | } else if (rl == QB_IPCS_RATE_OFF_2) { | |||
continue; | qb_ipcs_flowcontrol_set(c, 2); | |||
} else { | ||||
qb_ipcs_flowcontrol_set(c, QB_FALSE); | ||||
} | ||||
if (old_p != s->poll_priority) { | ||||
(void)_modify_dispatch_descriptor_(c); | ||||
} | } | |||
(void)_modify_dispatch_descriptor_(c); | ||||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
} | } | |||
} | } | |||
void qb_ipcs_ref(struct qb_ipcs_service *s) | void | |||
qb_ipcs_ref(struct qb_ipcs_service *s) | ||||
{ | { | |||
qb_atomic_int_inc(&s->ref_count); | qb_atomic_int_inc(&s->ref_count); | |||
} | } | |||
void qb_ipcs_unref(struct qb_ipcs_service *s) | void | |||
qb_ipcs_unref(struct qb_ipcs_service *s) | ||||
{ | { | |||
int32_t free_it; | int32_t free_it; | |||
struct qb_ipcs_connection *c = NULL; | struct qb_ipcs_connection *c = NULL; | |||
struct qb_list_head *iter; | struct qb_list_head *pos; | |||
struct qb_list_head *iter_next; | struct qb_list_head *n; | |||
assert(s->ref_count > 0); | assert(s->ref_count > 0); | |||
free_it = qb_atomic_int_dec_and_test(&s->ref_count); | free_it = qb_atomic_int_dec_and_test(&s->ref_count); | |||
if (free_it) { | if (free_it) { | |||
qb_util_log(LOG_DEBUG, "%s() - destorying", __func__); | qb_util_log(LOG_DEBUG, "%s() - destroying", __func__); | |||
qb_list_for_each_safe(iter, iter_next, &s->connections) { | for (pos = s->connections.next, n = pos->next; | |||
c = qb_list_entry(iter, struct qb_ipcs_connection, l | pos != &s->connections; pos = n, n = pos->next) { | |||
ist); | c = qb_list_entry(pos, struct qb_ipcs_connection, li | |||
st); | ||||
if (c == NULL) { | if (c == NULL) { | |||
continue; | continue; | |||
} | } | |||
qb_ipcs_disconnect(c); | qb_ipcs_disconnect(c); | |||
} | } | |||
(void)qb_ipcs_us_withdraw(s); | (void)qb_ipcs_us_withdraw(s); | |||
free(s); | free(s); | |||
} | } | |||
} | } | |||
void qb_ipcs_destroy(struct qb_ipcs_service* s) | void | |||
qb_ipcs_destroy(struct qb_ipcs_service *s) | ||||
{ | { | |||
qb_ipcs_unref(s); | qb_ipcs_unref(s); | |||
} | } | |||
/* | /* | |||
* connection API | * connection API | |||
*/ | */ | |||
ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *dat | ssize_t | |||
a, | qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data, | |||
size_t size) | size_t size) | |||
{ | { | |||
ssize_t res; | ssize_t res; | |||
if (c == NULL) { | ||||
return -EINVAL; | ||||
} | ||||
qb_ipcs_connection_ref(c); | qb_ipcs_connection_ref(c); | |||
res = c->service->funcs.send(&c->response, data, size); | res = c->service->funcs.send(&c->response, data, size); | |||
if (res == size) { | if (res == size) { | |||
c->stats.responses++; | c->stats.responses++; | |||
} else if (res == -EAGAIN || res == -ETIMEDOUT) { | } else if (res == -EAGAIN || res == -ETIMEDOUT) { | |||
c->stats.send_retries++; | c->stats.send_retries++; | |||
} | } | |||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
return res; | return res; | |||
} | } | |||
ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection *c, const struct i | ssize_t | |||
ovec * iov, size_t iov_len) | qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * | |||
iov, | ||||
size_t iov_len) | ||||
{ | { | |||
ssize_t res; | ssize_t res; | |||
if (c == NULL) { | ||||
return -EINVAL; | ||||
} | ||||
qb_ipcs_connection_ref(c); | qb_ipcs_connection_ref(c); | |||
res = c->service->funcs.sendv(&c->response, iov, iov_len); | res = c->service->funcs.sendv(&c->response, iov, iov_len); | |||
if (res > 0) { | if (res > 0) { | |||
c->stats.responses++; | c->stats.responses++; | |||
} else if (res == -EAGAIN || res == -ETIMEDOUT) { | } else if (res == -EAGAIN || res == -ETIMEDOUT) { | |||
c->stats.send_retries++; | c->stats.send_retries++; | |||
} | } | |||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
return res; | return res; | |||
} | } | |||
static int32_t send_event_notification(int32_t fd, int32_t revents, void *d | static int32_t | |||
ata) | send_event_notification(int32_t fd, int32_t revents, void *data) | |||
{ | { | |||
ssize_t res = 0; | ssize_t res = 0; | |||
struct qb_ipcs_connection *c = data; | struct qb_ipcs_connection *c = data; | |||
if (c->outstanding_notifiers > 0) { | if (c->outstanding_notifiers > 0) { | |||
res = qb_ipc_us_send(&c->setup, data, c->outstanding_notifie rs); | res = qb_ipc_us_send(&c->setup, data, c->outstanding_notifie rs); | |||
} | } | |||
if (res > 0) { | if (res > 0) { | |||
c->outstanding_notifiers -= res; | c->outstanding_notifiers -= res; | |||
} | } | |||
if (c->outstanding_notifiers > 0) { | if (c->outstanding_notifiers > 0) { | |||
return 0; | return 0; | |||
} else { | } else { | |||
c->outstanding_notifiers = 0; | c->outstanding_notifiers = 0; | |||
c->poll_events = POLLIN | POLLPRI | POLLNVAL, | c->poll_events = POLLIN | POLLPRI | POLLNVAL, | |||
(void)_modify_dispatch_descriptor_(c); | (void)_modify_dispatch_descriptor_(c); | |||
} | } | |||
return 0; | return 0; | |||
} | } | |||
ssize_t qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, | ssize_t | |||
size_t size) | qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t | |||
size) | ||||
{ | { | |||
ssize_t res; | ssize_t res; | |||
ssize_t res2 = 0; | ssize_t res2 = 0; | |||
if (c == NULL) { | ||||
return -EINVAL; | ||||
} | ||||
qb_ipcs_connection_ref(c); | qb_ipcs_connection_ref(c); | |||
res = c->service->funcs.send(&c->event, data, size); | res = c->service->funcs.send(&c->event, data, size); | |||
if (res != size) { | if (res != size) { | |||
goto deref_and_return; | goto deref_and_return; | |||
} | } | |||
c->stats.events++; | c->stats.events++; | |||
if (c->service->needs_sock_for_poll) { | if (c->service->needs_sock_for_poll) { | |||
if (c->outstanding_notifiers > 0) { | if (c->outstanding_notifiers > 0) { | |||
c->outstanding_notifiers++; | c->outstanding_notifiers++; | |||
} else { | } else { | |||
res2 = qb_ipc_us_send(&c->setup, data, 1); | res2 = qb_ipc_us_send(&c->setup, data, 1); | |||
if (res2 == 1) { | if (res2 == 1) { | |||
goto deref_and_return; | goto deref_and_return; | |||
} | } | |||
/* | /* | |||
* notify the client later, when we can. | * notify the client later, when we can. | |||
*/ | */ | |||
c->outstanding_notifiers++; | c->outstanding_notifiers++; | |||
c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNV AL, | c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNV AL, | |||
(void)_modify_dispatch_descriptor_(c); | (void)_modify_dispatch_descriptor_(c); | |||
} | } | |||
} | } | |||
deref_and_return: | deref_and_return: | |||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
return res; | return res; | |||
} | } | |||
ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection * c, | ssize_t | |||
const struct iovec * iov, size_t iov_len) | qb_ipcs_event_sendv(struct qb_ipcs_connection * c, | |||
const struct iovec * iov, size_t iov_len) | ||||
{ | { | |||
ssize_t res; | ssize_t res; | |||
ssize_t res2; | ssize_t res2; | |||
if (c == NULL) { | ||||
return -EINVAL; | ||||
} | ||||
qb_ipcs_connection_ref(c); | qb_ipcs_connection_ref(c); | |||
res = c->service->funcs.sendv(&c->event, iov, iov_len); | res = c->service->funcs.sendv(&c->event, iov, iov_len); | |||
if (res < 0) { | if (res < 0) { | |||
goto deref_and_return; | goto deref_and_return; | |||
} | } | |||
c->stats.events++; | c->stats.events++; | |||
if (c->service->needs_sock_for_poll) { | if (c->service->needs_sock_for_poll) { | |||
if (c->outstanding_notifiers > 0) { | if (c->outstanding_notifiers > 0) { | |||
c->outstanding_notifiers++; | c->outstanding_notifiers++; | |||
} else { | } else { | |||
res2 = qb_ipc_us_send(&c->setup, &res, 1); | res2 = qb_ipc_us_send(&c->setup, &res, 1); | |||
if (res2 == 1) { | if (res2 == 1) { | |||
goto deref_and_return; | goto deref_and_return; | |||
} | } | |||
/* | /* | |||
* notify the client later, when we can. | * notify the client later, when we can. | |||
*/ | */ | |||
c->outstanding_notifiers++; | c->outstanding_notifiers++; | |||
c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNV AL, | c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNV AL, | |||
(void)_modify_dispatch_descriptor_(c); | (void)_modify_dispatch_descriptor_(c); | |||
} | } | |||
} | } | |||
deref_and_return: | deref_and_return: | |||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
return res; | return res; | |||
} | } | |||
qb_ipcs_connection_t *qb_ipcs_connection_first_get(struct qb_ipcs_service * | qb_ipcs_connection_t * | |||
s) | qb_ipcs_connection_first_get(struct qb_ipcs_service * s) | |||
{ | { | |||
struct qb_ipcs_connection *c; | struct qb_ipcs_connection *c; | |||
struct qb_list_head *iter; | struct qb_list_head *iter; | |||
if (qb_list_empty(&s->connections)) { | if (qb_list_empty(&s->connections)) { | |||
return NULL; | return NULL; | |||
} | } | |||
iter = s->connections.next; | iter = s->connections.next; | |||
c = qb_list_entry(iter, struct qb_ipcs_connection, list); | c = qb_list_entry(iter, struct qb_ipcs_connection, list); | |||
qb_ipcs_connection_ref(c); | qb_ipcs_connection_ref(c); | |||
return c; | return c; | |||
} | } | |||
qb_ipcs_connection_t * qb_ipcs_connection_next_get(struct qb_ipcs_service* | qb_ipcs_connection_t * | |||
s, | qb_ipcs_connection_next_get(struct qb_ipcs_service * s, | |||
struct qb_ipcs_connection | struct qb_ipcs_connection * current) | |||
*current) | ||||
{ | { | |||
struct qb_ipcs_connection *c; | struct qb_ipcs_connection *c; | |||
struct qb_list_head *iter; | struct qb_list_head *iter; | |||
if (current->list.next == &s->connections) { | if (current == NULL || current->list.next == &s->connections) { | |||
return NULL; | return NULL; | |||
} | } | |||
iter = current->list.next; | iter = current->list.next; | |||
c = qb_list_entry(iter, struct qb_ipcs_connection, list); | c = qb_list_entry(iter, struct qb_ipcs_connection, list); | |||
qb_ipcs_connection_ref(c); | qb_ipcs_connection_ref(c); | |||
return c; | return c; | |||
} | } | |||
int32_t qb_ipcs_service_id_get(struct qb_ipcs_connection *c) | int32_t | |||
qb_ipcs_service_id_get(struct qb_ipcs_connection * c) | ||||
{ | { | |||
if (c == NULL) { | ||||
return -EINVAL; | ||||
} | ||||
return c->service->service_id; | return c->service->service_id; | |||
} | } | |||
struct qb_ipcs_connection *qb_ipcs_connection_alloc(struct qb_ipcs_service | struct qb_ipcs_connection * | |||
*s) | qb_ipcs_connection_alloc(struct qb_ipcs_service *s) | |||
{ | { | |||
struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_conne | struct qb_ipcs_connection *c = | |||
ction)); | calloc(1, sizeof(struct qb_ipcs_connection)); | |||
if (c == NULL) { | ||||
return NULL; | ||||
} | ||||
c->refcount = 1; | c->refcount = 1; | |||
c->service = s; | c->service = s; | |||
c->pid = 0; | c->pid = 0; | |||
c->euid = -1; | c->euid = -1; | |||
c->egid = -1; | c->egid = -1; | |||
qb_list_init(&c->list); | qb_list_init(&c->list); | |||
c->receive_buf = NULL; | c->receive_buf = NULL; | |||
c->context = NULL; | c->context = NULL; | |||
c->fc_enabled = QB_FALSE; | c->fc_enabled = QB_FALSE; | |||
c->state = QB_IPCS_CONNECTION_INACTIVE; | c->state = QB_IPCS_CONNECTION_INACTIVE; | |||
c->poll_events = POLLIN | POLLPRI | POLLNVAL; | c->poll_events = POLLIN | POLLPRI | POLLNVAL; | |||
return c; | return c; | |||
} | } | |||
void qb_ipcs_connection_ref(struct qb_ipcs_connection *c) | void | |||
qb_ipcs_connection_ref(struct qb_ipcs_connection *c) | ||||
{ | { | |||
qb_atomic_int_inc(&c->refcount); | if (c) { | |||
qb_atomic_int_inc(&c->refcount); | ||||
} | ||||
} | } | |||
void qb_ipcs_connection_unref(struct qb_ipcs_connection *c) | void | |||
qb_ipcs_connection_unref(struct qb_ipcs_connection *c) | ||||
{ | { | |||
int32_t free_it; | int32_t free_it; | |||
if (c == NULL) { | ||||
return; | ||||
} | ||||
if (c->refcount < 1) { | if (c->refcount < 1) { | |||
qb_util_log(LOG_ERR, "%s() ref:%d state:%d fd:%d", | qb_util_log(LOG_ERR, "%s() ref:%d state:%d fd:%d", | |||
__func__, c->refcount, c->state, c->setup.u.us.s | __func__, c->refcount, c->state, | |||
ock); | c->setup.u.us.sock); | |||
assert(0); | assert(0); | |||
} | } | |||
free_it = qb_atomic_int_dec_and_test(&c->refcount); | free_it = qb_atomic_int_dec_and_test(&c->refcount); | |||
if (free_it) { | if (free_it) { | |||
qb_list_del(&c->list); | qb_list_del(&c->list); | |||
if (c->service->serv_fns.connection_destroyed) { | if (c->service->serv_fns.connection_destroyed) { | |||
c->service->serv_fns.connection_destroyed(c); | c->service->serv_fns.connection_destroyed(c); | |||
} | } | |||
c->service->funcs.disconnect(c); | c->service->funcs.disconnect(c); | |||
if (c->receive_buf) { | free(c->receive_buf); | |||
free(c->receive_buf); | ||||
} | ||||
free(c); | free(c); | |||
} | } | |||
} | } | |||
void qb_ipcs_disconnect(struct qb_ipcs_connection *c) | void | |||
qb_ipcs_disconnect(struct qb_ipcs_connection *c) | ||||
{ | { | |||
int32_t res = 0; | int32_t res = 0; | |||
qb_loop_job_dispatch_fn rerun_job; | qb_loop_job_dispatch_fn rerun_job; | |||
if (c == NULL) { | ||||
return; | ||||
} | ||||
qb_util_log(LOG_DEBUG, "%s() state:%d", __func__, c->state); | qb_util_log(LOG_DEBUG, "%s() state:%d", __func__, c->state); | |||
if (c->state == QB_IPCS_CONNECTION_ACTIVE) { | if (c->state == QB_IPCS_CONNECTION_ACTIVE) { | |||
if (c->service->needs_sock_for_poll && | c->state = QB_IPCS_CONNECTION_INACTIVE; | |||
c->setup.u.us.sock > 0) { | c->service->stats.closed_connections++; | |||
if (c->service->needs_sock_for_poll && c->setup.u.us.sock > | ||||
0) { | ||||
(void)c->service->poll_fns.dispatch_del(c->setup.u.u | ||||
s.sock); | ||||
qb_ipcc_us_sock_close(c->setup.u.us.sock); | qb_ipcc_us_sock_close(c->setup.u.us.sock); | |||
c->setup.u.us.sock = -1; | c->setup.u.us.sock = -1; | |||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
} | } | |||
c->state = QB_IPCS_CONNECTION_DOWN; | /* return early as it's an incomplete connection. | |||
*/ | ||||
return; | ||||
} | ||||
if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) { | ||||
c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN; | ||||
c->service->stats.active_connections--; | c->service->stats.active_connections--; | |||
c->service->stats.closed_connections++; | c->service->stats.closed_connections++; | |||
if (c->service->needs_sock_for_poll && c->setup.u.us.sock > | ||||
0) { | ||||
(void)c->service->poll_fns.dispatch_del(c->setup.u.u | ||||
s.sock); | ||||
qb_ipcc_us_sock_close(c->setup.u.us.sock); | ||||
c->setup.u.us.sock = -1; | ||||
qb_ipcs_connection_unref(c); | ||||
} | ||||
} | } | |||
if (c->state == QB_IPCS_CONNECTION_DOWN) { | if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) { | |||
res = 0; | res = 0; | |||
if (c->service->serv_fns.connection_closed) { | if (c->service->serv_fns.connection_closed) { | |||
res = c->service->serv_fns.connection_closed(c); | res = c->service->serv_fns.connection_closed(c); | |||
} | } | |||
if (res == 0) { | if (res == 0) { | |||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
} else { | } else { | |||
/* ok, so they want the connection_closedd() | /* OK, so they want the connection_closed | |||
* function re-run */ | * function re-run */ | |||
rerun_job = (qb_loop_job_dispatch_fn)qb_ipcs_disconn | rerun_job = | |||
ect; | (qb_loop_job_dispatch_fn) qb_ipcs_disconnect; | |||
res = c->service->poll_fns.job_add(QB_LOOP_LOW, c, | res = c->service->poll_fns.job_add(QB_LOOP_LOW, | |||
c, | ||||
rerun_job); | rerun_job); | |||
if (res != 0) { | if (res != 0) { | |||
/* last ditch attempt to cleanup */ | /* last ditch attempt to cleanup */ | |||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
} | } | |||
} | } | |||
} | } | |||
} | } | |||
static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t f | static void | |||
c_enable) | qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable) | |||
{ | { | |||
if (c == NULL) { | ||||
return; | ||||
} | ||||
if (c->fc_enabled != fc_enable) { | if (c->fc_enabled != fc_enable) { | |||
c->service->funcs.fc_set(&c->request, fc_enable); | c->service->funcs.fc_set(&c->request, fc_enable); | |||
c->fc_enabled = fc_enable; | c->fc_enabled = fc_enable; | |||
c->stats.flow_control_state = fc_enable; | c->stats.flow_control_state = fc_enable; | |||
c->stats.flow_control_count++; | c->stats.flow_control_count++; | |||
} | } | |||
} | } | |||
static int32_t _process_request_(struct qb_ipcs_connection *c, | static int32_t | |||
int32_t ms_timeout) | _process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout) | |||
{ | { | |||
int32_t res = 0; | int32_t res = 0; | |||
ssize_t size; | ssize_t size; | |||
struct qb_ipc_request_header *hdr; | struct qb_ipc_request_header *hdr; | |||
qb_ipcs_connection_ref(c); | qb_ipcs_connection_ref(c); | |||
if (c->service->funcs.peek && c->service->funcs.reclaim) { | if (c->service->funcs.peek && c->service->funcs.reclaim) { | |||
size = c->service->funcs.peek(&c->request, (void**)&hdr, | size = c->service->funcs.peek(&c->request, (void **)&hdr, | |||
ms_timeout); | ms_timeout); | |||
} else { | } else { | |||
hdr = (struct qb_ipc_request_header *)c->receive_buf; | hdr = c->receive_buf; | |||
size = c->service->funcs.recv(&c->request, hdr, c->request.m | size = c->service->funcs.recv(&c->request, | |||
ax_msg_size, | hdr, | |||
c->request.max_msg_size, | ||||
ms_timeout); | ms_timeout); | |||
} | } | |||
if (size < 0) { | if (size < 0) { | |||
if (size != -EAGAIN && size != -ETIMEDOUT) { | if (size != -EAGAIN && size != -ETIMEDOUT) { | |||
qb_util_log(LOG_ERR, "%s(): %s", __func__, strerror( | qb_util_perror(LOG_ERR, | |||
-res)); | "recv from client connection failed") | |||
; | ||||
} else { | } else { | |||
c->stats.recv_retries++; | c->stats.recv_retries++; | |||
} | } | |||
res = size; | res = size; | |||
goto cleanup; | goto cleanup; | |||
} | } | |||
c->stats.requests++; | c->stats.requests++; | |||
if (hdr->id == QB_IPC_MSG_DISCONNECT) { | if (hdr->id == QB_IPC_MSG_DISCONNECT) { | |||
qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_DISCONNECT", __func_ _); | qb_util_log(LOG_DEBUG, "client requesting a disconnect"); | |||
qb_ipcs_disconnect(c); | qb_ipcs_disconnect(c); | |||
res = -ESHUTDOWN; | res = -ESHUTDOWN; | |||
} else { | } else { | |||
res = c->service->serv_fns.msg_process(c, hdr, hdr->size); | res = c->service->serv_fns.msg_process(c, hdr, hdr->size); | |||
/* 0 == good, negitive == backoff */ | /* 0 == good, negative == backoff */ | |||
if (res < 0) { | if (res < 0) { | |||
res = -ENOBUFS; | res = -ENOBUFS; | |||
} else { | } else { | |||
res = size; | res = size; | |||
} | } | |||
} | } | |||
if (c->service->funcs.peek && c->service->funcs.reclaim) { | if (c->service->funcs.peek && c->service->funcs.reclaim) { | |||
c->service->funcs.reclaim(&c->request); | c->service->funcs.reclaim(&c->request); | |||
} | } | |||
cleanup: | cleanup: | |||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
return res; | return res; | |||
} | } | |||
#define IPC_REQUEST_TIMEOUT 10 | #define IPC_REQUEST_TIMEOUT 10 | |||
#define MAX_RECV_MSGS 50 | #define MAX_RECV_MSGS 50 | |||
int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, | int32_t | |||
void *data) | qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data) | |||
{ | { | |||
int32_t res = _process_request_((struct qb_ipcs_connection *)data, | int32_t res = _process_request_((struct qb_ipcs_connection *)data, | |||
IPC_REQUEST_TIMEOUT); | IPC_REQUEST_TIMEOUT); | |||
if (res > 0) { | if (res > 0) { | |||
return 0; | return 0; | |||
} | } | |||
return res; | return res; | |||
} | } | |||
static ssize_t _request_q_len_get(struct qb_ipcs_connection *c) | static ssize_t | |||
_request_q_len_get(struct qb_ipcs_connection *c) | ||||
{ | { | |||
ssize_t q_len; | ssize_t q_len; | |||
if (c->service->funcs.q_len_get) { | if (c->service->funcs.q_len_get) { | |||
q_len = c->service->funcs.q_len_get(&c->request); | q_len = c->service->funcs.q_len_get(&c->request); | |||
if (q_len < 0) { | if (q_len < 0) { | |||
return q_len; | return q_len; | |||
} | } | |||
q_len = QB_MIN(q_len, MAX_RECV_MSGS); | q_len = QB_MIN(q_len, MAX_RECV_MSGS); | |||
if (c->service->poll_priority == QB_LOOP_MED) | if (c->service->poll_priority == QB_LOOP_MED) | |||
q_len = QB_MIN(q_len, 5); | q_len = QB_MIN(q_len, 5); | |||
if (c->service->poll_priority == QB_LOOP_LOW) | if (c->service->poll_priority == QB_LOOP_LOW) | |||
q_len = 1; | q_len = 1; | |||
} else { | } else { | |||
q_len = 1; | q_len = 1; | |||
} | } | |||
return q_len; | return q_len; | |||
} | } | |||
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, | int32_t | |||
void *data) | qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data | |||
) | ||||
{ | { | |||
struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; | struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data; | |||
char bytes[MAX_RECV_MSGS]; | char bytes[MAX_RECV_MSGS]; | |||
int32_t res; | int32_t res; | |||
int32_t recvd = 0; | int32_t recvd = 0; | |||
ssize_t avail; | ssize_t avail; | |||
if (revents & POLLHUP) { | if (revents & POLLHUP) { | |||
qb_util_log(LOG_DEBUG, "%s HUP conn:%p fd:%d", __func__, c, fd); | qb_util_log(LOG_DEBUG, "%s HUP conn:%p fd:%d", __func__, c, fd); | |||
qb_ipcs_disconnect(c); | qb_ipcs_disconnect(c); | |||
skipping to change at line 602 | skipping to change at line 700 | |||
if (c->service->needs_sock_for_poll && recvd > 0) { | if (c->service->needs_sock_for_poll && recvd > 0) { | |||
(void)qb_ipc_us_recv(&c->setup, bytes, recvd, -1); | (void)qb_ipc_us_recv(&c->setup, bytes, recvd, -1); | |||
} | } | |||
res = QB_MIN(0, res); | res = QB_MIN(0, res); | |||
if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { | if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { | |||
res = 0; | res = 0; | |||
} | } | |||
if (res != 0) { | if (res != 0) { | |||
qb_util_log(LOG_DEBUG, "%s returning %d : %s", | qb_util_perror(LOG_DEBUG, "request returned error"); | |||
__func__, res, strerror(-res)); | ||||
qb_ipcs_connection_unref(c); | qb_ipcs_connection_unref(c); | |||
} | } | |||
return res; | return res; | |||
} | } | |||
void qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context) | void | |||
qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context) | ||||
{ | { | |||
if (c == NULL) { | ||||
return; | ||||
} | ||||
c->context = context; | c->context = context; | |||
} | } | |||
void *qb_ipcs_context_get(struct qb_ipcs_connection *c) | void * | |||
qb_ipcs_context_get(struct qb_ipcs_connection *c) | ||||
{ | { | |||
if (c == NULL) { | ||||
return NULL; | ||||
} | ||||
return c->context; | return c->context; | |||
} | } | |||
int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c, | int32_t | |||
struct qb_ipcs_connection_stats* stats, | qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c, | |||
int32_t clear_after_read) | struct qb_ipcs_connection_stats * stats, | |||
int32_t clear_after_read) | ||||
{ | { | |||
if (c == NULL) { | ||||
return -EINVAL; | ||||
} | ||||
memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats)); | memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats)); | |||
if (clear_after_read) { | if (clear_after_read) { | |||
memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats) ); | memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats) ); | |||
c->stats.client_pid = c->pid; | c->stats.client_pid = c->pid; | |||
} | } | |||
return 0; | return 0; | |||
} | } | |||
int32_t qb_ipcs_stats_get(struct qb_ipcs_service* s, | int32_t | |||
struct qb_ipcs_stats* stats, | qb_ipcs_stats_get(struct qb_ipcs_service * s, | |||
int32_t clear_after_read) | struct qb_ipcs_stats * stats, int32_t clear_after_read) | |||
{ | { | |||
if (s == NULL) { | ||||
return -EINVAL; | ||||
} | ||||
memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats)); | memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats)); | |||
if (clear_after_read) { | if (clear_after_read) { | |||
memset(&s->stats, 0, sizeof(struct qb_ipcs_stats)); | memset(&s->stats, 0, sizeof(struct qb_ipcs_stats)); | |||
} | } | |||
return 0; | return 0; | |||
} | } | |||
End of changes. 81 change blocks. | ||||
120 lines changed or deleted | 224 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ |