ipc_us.c   ipc_us.c 
skipping to change at line 22 skipping to change at line 22
* *
* libqb is distributed in the hope that it will be useful, * libqb is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of * but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details. * GNU Lesser General Public License for more details.
* *
* You should have received a copy of the GNU Lesser General Public License * You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>. * along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os_base.h" #include "os_base.h"
#if defined(HAVE_GETPEERUCRED) #if defined(HAVE_GETPEERUCRED)
#include <ucred.h> #include <ucred.h>
#endif #endif
#ifdef HAVE_SYS_UN_H #ifdef HAVE_SYS_UN_H
#include <sys/un.h> #include <sys/un.h>
#endif /* HAVE_SYS_UN_H */ #endif /* HAVE_SYS_UN_H */
#ifdef HAVE_SYS_STAT_H #ifdef HAVE_SYS_STAT_H
#include <sys/stat.h> #include <sys/stat.h>
#endif #endif
#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h> #include <sys/mman.h>
#endif
#include <qb/qbatomic.h> #include <qb/qbatomic.h>
#include <qb/qbipcs.h> #include <qb/qbipcs.h>
#include <qb/qbloop.h> #include <qb/qbloop.h>
#include <qb/qbdefs.h> #include <qb/qbdefs.h>
#include "util_int.h" #include "util_int.h"
#include "ipc_int.h" #include "ipc_int.h"
#define SERVER_BACKLOG 5 #define SERVER_BACKLOG 5
skipping to change at line 69 skipping to change at line 72
struct ipc_auth_ugp { struct ipc_auth_ugp {
uid_t uid; uid_t uid;
gid_t gid; gid_t gid;
pid_t pid; pid_t pid;
}; };
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *dat a); static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *dat a);
static int32_t qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way); static int32_t qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way);
#ifdef SO_NOSIGPIPE #ifdef SO_NOSIGPIPE
static void socket_nosigpipe(int32_t s) static void
socket_nosigpipe(int32_t s)
{ {
int32_t on = 1; int32_t on = 1;
setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (void *)&on, sizeof(on)); setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (void *)&on, sizeof(on));
} }
#endif #endif
#ifndef MSG_NOSIGNAL #ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0 #define MSG_NOSIGNAL 0
#endif #endif
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, siz ssize_t
e_t len) qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len)
{ {
int32_t result; int32_t result;
struct msghdr msg_send; struct msghdr msg_send;
struct iovec iov_send; struct iovec iov_send;
char *rbuf = (char *)msg; char *rbuf = (char *)msg;
int32_t processed = 0; int32_t processed = 0;
struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us. struct ipc_us_control *ctl =
shared_data; (struct ipc_us_control *)one_way->u.us.shared_data;
msg_send.msg_iov = &iov_send; msg_send.msg_iov = &iov_send;
msg_send.msg_iovlen = 1; msg_send.msg_iovlen = 1;
msg_send.msg_name = 0; msg_send.msg_name = 0;
msg_send.msg_namelen = 0; msg_send.msg_namelen = 0;
#if !defined(QB_SOLARIS) #if !defined(QB_SOLARIS)
msg_send.msg_control = 0; msg_send.msg_control = 0;
msg_send.msg_controllen = 0; msg_send.msg_controllen = 0;
msg_send.msg_flags = 0; msg_send.msg_flags = 0;
skipping to change at line 122 skipping to change at line 128
processed += result; processed += result;
if (processed != len) { if (processed != len) {
goto retry_send; goto retry_send;
} }
if (ctl) { if (ctl) {
qb_atomic_int_inc(&ctl->sent); qb_atomic_int_inc(&ctl->sent);
} }
return processed; return processed;
} }
static ssize_t qb_ipc_us_sendv(struct qb_ipc_one_way *one_way, const struct static ssize_t
iovec *iov, size_t iov_len) qb_ipc_us_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov,
size_t iov_len)
{ {
int32_t result; int32_t result;
struct msghdr msg_send; struct msghdr msg_send;
int32_t processed = 0; int32_t processed = 0;
size_t len = 0; size_t len = 0;
int32_t i; int32_t i;
struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us. struct ipc_us_control *ctl =
shared_data; (struct ipc_us_control *)one_way->u.us.shared_data;
for (i = 0; i < iov_len; i++) { for (i = 0; i < iov_len; i++) {
len += iov[i].iov_len; len += iov[i].iov_len;
} }
msg_send.msg_iov = (struct iovec*)iov; msg_send.msg_iov = (struct iovec *)iov;
msg_send.msg_iovlen = iov_len; msg_send.msg_iovlen = iov_len;
msg_send.msg_name = 0; msg_send.msg_name = 0;
msg_send.msg_namelen = 0; msg_send.msg_namelen = 0;
#if !defined(QB_SOLARIS) #if !defined(QB_SOLARIS)
msg_send.msg_control = 0; msg_send.msg_control = 0;
msg_send.msg_controllen = 0; msg_send.msg_controllen = 0;
msg_send.msg_flags = 0; msg_send.msg_flags = 0;
#else #else
msg_send.msg_accrights = NULL; msg_send.msg_accrights = NULL;
skipping to change at line 164 skipping to change at line 173
processed += result; processed += result;
if (processed != len) { if (processed != len) {
goto retry_send; goto retry_send;
} }
if (ctl) { if (ctl) {
qb_atomic_int_inc(&ctl->sent); qb_atomic_int_inc(&ctl->sent);
} }
return processed; return processed;
} }
static ssize_t qb_ipc_us_recv_msghdr(int32_t s, static ssize_t
struct msghdr *hdr, qb_ipc_us_recv_msghdr(int32_t s, struct msghdr *hdr, char *msg, size_t len)
char *msg, size_t len)
{ {
int32_t result; int32_t result;
int32_t processed = 0; int32_t processed = 0;
retry_recv: retry_recv:
hdr->msg_iov->iov_base = &msg[processed]; hdr->msg_iov->iov_base = &msg[processed];
hdr->msg_iov->iov_len = len - processed; hdr->msg_iov->iov_len = len - processed;
result = recvmsg(s, hdr, MSG_NOSIGNAL | MSG_WAITALL); result = recvmsg(s, hdr, MSG_NOSIGNAL | MSG_WAITALL);
if (result == -1 && errno == EAGAIN) { if (result == -1 && errno == EAGAIN) {
skipping to change at line 200 skipping to change at line 208
processed += result; processed += result;
if (processed != len) { if (processed != len) {
goto retry_recv; goto retry_recv;
} }
assert(processed == len); assert(processed == len);
return processed; return processed;
} }
int32_t qb_ipc_us_recv_ready(struct qb_ipc_one_way *one_way, int32_t ms_tim int32_t
eout) qb_ipc_us_recv_ready(struct qb_ipc_one_way * one_way, int32_t ms_timeout)
{ {
struct pollfd ufds; struct pollfd ufds;
int32_t poll_events; int32_t poll_events;
ufds.fd = one_way->u.us.sock; ufds.fd = one_way->u.us.sock;
ufds.events = POLLIN; ufds.events = POLLIN;
ufds.revents = 0; ufds.revents = 0;
poll_events = poll (&ufds, 1, ms_timeout); poll_events = poll(&ufds, 1, ms_timeout);
if ((poll_events == -1 && errno == EINTR) || if ((poll_events == -1 && errno == EINTR) || poll_events == 0) {
poll_events == 0) {
return -EAGAIN; return -EAGAIN;
} else if (poll_events == -1) { } else if (poll_events == -1) {
return -errno; return -errno;
} else if (poll_events == 1 && (ufds.revents & (POLLERR|POLLHUP))) { } else if (poll_events == 1 && (ufds.revents & (POLLERR | POLLHUP))) {
return -ENOTCONN; return -ENOTCONN;
} }
return 0; return 0;
} }
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, ssize_t
void *msg, size_t len, int32_t timeout) qb_ipc_us_recv(struct qb_ipc_one_way * one_way,
void *msg, size_t len, int32_t timeout)
{ {
int32_t result; int32_t result;
struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us. struct ipc_us_control *ctl =
shared_data; (struct ipc_us_control *)one_way->u.us.shared_data;
retry_recv: retry_recv:
result = recv(one_way->u.us.sock, msg, len, MSG_NOSIGNAL | MSG_WAITA LL); result = recv(one_way->u.us.sock, msg, len, MSG_NOSIGNAL | MSG_WAITA LL);
if (timeout == -1 && result == -1 && errno == EAGAIN) { if (timeout == -1 && result == -1 && errno == EAGAIN) {
goto retry_recv; goto retry_recv;
} }
if (result == -1) { if (result == -1) {
return -errno; return -errno;
} }
if (result == 0) { if (result == 0) {
return -ENOTCONN; return -ENOTCONN;
} }
if (ctl) { if (ctl) {
(void)qb_atomic_int_dec_and_test(&ctl->sent); (void)qb_atomic_int_dec_and_test(&ctl->sent);
} }
return result; return result;
} }
static int32_t qb_ipcc_us_sock_connect(const char *socket_name, int32_t * s static int32_t
ock_pt) qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt)
{ {
int32_t request_fd; int32_t request_fd;
struct sockaddr_un address; struct sockaddr_un address;
int32_t res = 0; int32_t res = 0;
#if defined(QB_SOLARIS) #if defined(QB_SOLARIS)
request_fd = socket(PF_UNIX, SOCK_STREAM, 0); request_fd = socket(PF_UNIX, SOCK_STREAM, 0);
#else #else
request_fd = socket(PF_LOCAL, SOCK_STREAM, 0); request_fd = socket(PF_LOCAL, SOCK_STREAM, 0);
#endif #endif
if (request_fd == -1) { if (request_fd == -1) {
return -errno; return -errno;
} }
#ifdef SO_NOSIGPIPE #ifdef SO_NOSIGPIPE
socket_nosigpipe(request_fd); socket_nosigpipe(request_fd);
#endif /* SO_NOSIGPIPE */ #endif /* SO_NOSIGPIPE */
res = qb_util_fd_nonblock_cloexec_set(request_fd); res = qb_sys_fd_nonblock_cloexec_set(request_fd);
if (res < 0) { if (res < 0) {
goto error_connect; goto error_connect;
} }
memset(&address, 0, sizeof(struct sockaddr_un)); memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX; address.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN) #if defined(QB_BSD) || defined(QB_DARWIN)
address.sun_len = SUN_LEN(&address); address.sun_len = SUN_LEN(&address);
#endif #endif
#if defined(QB_LINUX) #if defined(QB_LINUX)
snprintf(address.sun_path + 1, UNIX_PATH_MAX-1, "%s", socket_name); snprintf(address.sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name) ;
#else #else
snprintf(address.sun_path, UNIX_PATH_MAX, "%s/%s", SOCKETDIR, socket snprintf(address.sun_path, UNIX_PATH_MAX, "%s/%s", SOCKETDIR,
_name); socket_name);
#endif #endif
if (connect(request_fd, (struct sockaddr *)&address, if (connect(request_fd, (struct sockaddr *)&address,
QB_SUN_LEN(&address)) == -1) { QB_SUN_LEN(&address)) == -1) {
res = -errno; res = -errno;
goto error_connect; goto error_connect;
} }
*sock_pt = request_fd; *sock_pt = request_fd;
return 0; return 0;
error_connect: error_connect:
close(request_fd); close(request_fd);
*sock_pt = -1; *sock_pt = -1;
return res; return res;
} }
void qb_ipcc_us_sock_close(int32_t sock) void
qb_ipcc_us_sock_close(int32_t sock)
{ {
shutdown(sock, SHUT_RDWR); shutdown(sock, SHUT_RDWR);
close(sock); close(sock);
} }
int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c, int32_t
struct qb_ipc_connection_response *r) qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r)
{ {
int32_t res; int32_t res;
struct qb_ipc_connection_request request; struct qb_ipc_connection_request request;
res = qb_ipcc_us_sock_connect(c->name, &c->setup.u.us.sock); res = qb_ipcc_us_sock_connect(c->name, &c->setup.u.us.sock);
if (res != 0) { if (res != 0) {
return res; return res;
} }
request.hdr.id = QB_IPC_MSG_AUTHENTICATE; request.hdr.id = QB_IPC_MSG_AUTHENTICATE;
request.hdr.size = sizeof(request); request.hdr.size = sizeof(request);
request.max_msg_size = c->setup.max_msg_size; request.max_msg_size = c->setup.max_msg_size;
res = qb_ipc_us_send(&c->setup, &request, request.hdr.size); res = qb_ipc_us_send(&c->setup, &request, request.hdr.size);
if (res < 0) { if (res < 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock); qb_ipcc_us_sock_close(c->setup.u.us.sock);
return res; return res;
} }
res = qb_ipc_us_recv(&c->setup, r, sizeof(struct qb_ipc_connection_r res =
esponse), -1); qb_ipc_us_recv(&c->setup, r,
sizeof(struct qb_ipc_connection_response), -1);
if (res < 0) { if (res < 0) {
return res; return res;
} }
if (r->hdr.error != 0) { if (r->hdr.error != 0) {
return r->hdr.error; return r->hdr.error;
} }
return 0; return 0;
} }
static void qb_ipcc_us_disconnect(struct qb_ipcc_connection* c) static void
qb_ipcc_us_disconnect(struct qb_ipcc_connection *c)
{ {
munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control)); munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control));
unlink(c->request.u.us.shared_file_name); unlink(c->request.u.us.shared_file_name);
close(c->request.u.us.sock); close(c->request.u.us.sock);
close(c->event.u.us.sock); close(c->event.u.us.sock);
} }
int32_t qb_ipcc_us_connect(struct qb_ipcc_connection *c, int32_t
struct qb_ipc_connection_response *r) qb_ipcc_us_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r)
{ {
int32_t res; int32_t res;
struct qb_ipc_event_connection_request request; struct qb_ipc_event_connection_request request;
char path[PATH_MAX]; char path[PATH_MAX];
int32_t fd_hdr; int32_t fd_hdr;
struct ipc_us_control * ctl; struct ipc_us_control *ctl;
c->needs_sock_for_poll = QB_FALSE; c->needs_sock_for_poll = QB_FALSE;
c->funcs.send = qb_ipc_us_send; c->funcs.send = qb_ipc_us_send;
c->funcs.sendv = qb_ipc_us_sendv; c->funcs.sendv = qb_ipc_us_sendv;
c->funcs.recv = qb_ipc_us_recv; c->funcs.recv = qb_ipc_us_recv;
c->funcs.fc_get = qb_ipc_us_fc_get; c->funcs.fc_get = qb_ipc_us_fc_get;
c->funcs.disconnect = qb_ipcc_us_disconnect; c->funcs.disconnect = qb_ipcc_us_disconnect;
c->request.u.us.sock = c->setup.u.us.sock; c->request.u.us.sock = c->setup.u.us.sock;
c->response.u.us.sock = c->setup.u.us.sock; c->response.u.us.sock = c->setup.u.us.sock;
c->setup.u.us.sock = -1; c->setup.u.us.sock = -1;
fd_hdr = qb_util_mmap_file_open(path, r->request, fd_hdr = qb_sys_mmap_file_open(path, r->request,
sizeof(struct ipc_us_control), sizeof(struct ipc_us_control), O_RDWR
O_RDWR); );
if (fd_hdr < 0) { if (fd_hdr < 0) {
res = -errno; res = -errno;
qb_util_log(LOG_ERR, "couldn't open file for mmap: %s", qb_util_perror(LOG_ERR, "couldn't open file for mmap");
strerror(-res));
return res; return res;
} }
strcpy(c->request.u.us.shared_file_name, r->request); strcpy(c->request.u.us.shared_file_name, r->request);
c->request.u.us.shared_data = mmap(0, c->request.u.us.shared_data = mmap(0,
sizeof(struct ipc_us_control), sizeof(struct ipc_us_control),
PROT_READ | PROT_WRITE, MAP_SHARE D, PROT_READ | PROT_WRITE, MAP_SHARE D,
fd_hdr, 0); fd_hdr, 0);
if (c->request.u.us.shared_data == MAP_FAILED) { if (c->request.u.us.shared_data == MAP_FAILED) {
res = -errno; res = -errno;
qb_util_log(LOG_ERR, "couldn't create mmap for header"); qb_util_perror(LOG_ERR, "couldn't create mmap for header");
goto cleanup_hdr; goto cleanup_hdr;
} }
ctl = (struct ipc_us_control *)c->request.u.us.shared_data; ctl = (struct ipc_us_control *)c->request.u.us.shared_data;
ctl->sent = 0; ctl->sent = 0;
ctl->flow_control = 0; ctl->flow_control = 0;
close(fd_hdr); close(fd_hdr);
res = qb_ipcc_us_sock_connect(c->name, &c->event.u.us.sock); res = qb_ipcc_us_sock_connect(c->name, &c->event.u.us.sock);
skipping to change at line 414 skipping to change at line 430
unlink(r->request); unlink(r->request);
munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control)); munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control));
return res; return res;
} }
/* /*
************************************************************************** **************************************************************************
* SERVER * SERVER
*/ */
int32_t qb_ipcs_us_publish(struct qb_ipcs_service * s) int32_t
qb_ipcs_us_publish(struct qb_ipcs_service * s)
{ {
struct sockaddr_un un_addr; struct sockaddr_un un_addr;
int32_t res; int32_t res;
char error_str[100];
/* /*
* Create socket for IPC clients, name socket, listen for connection s * Create socket for IPC clients, name socket, listen for connection s
*/ */
#if defined(QB_SOLARIS) #if defined(QB_SOLARIS)
s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0); s->server_sock = socket(PF_UNIX, SOCK_STREAM, 0);
#else #else
s->server_sock = socket(PF_LOCAL, SOCK_STREAM, 0); s->server_sock = socket(PF_LOCAL, SOCK_STREAM, 0);
#endif #endif
if (s->server_sock == -1) { if (s->server_sock == -1) {
res = -errno; res = -errno;
strerror_r(errno, error_str, 100); qb_util_perror(LOG_ERR, "Cannot create server socket");
qb_util_log(LOG_ERR,
"Cannot create server socket: %s", error_str);
return res; return res;
} }
res = qb_util_fd_nonblock_cloexec_set(s->server_sock); res = qb_sys_fd_nonblock_cloexec_set(s->server_sock);
if (res < 0) { if (res < 0) {
goto error_close; goto error_close;
} }
memset(&un_addr, 0, sizeof(struct sockaddr_un)); memset(&un_addr, 0, sizeof(struct sockaddr_un));
un_addr.sun_family = AF_UNIX; un_addr.sun_family = AF_UNIX;
#if defined(QB_BSD) || defined(QB_DARWIN) #if defined(QB_BSD) || defined(QB_DARWIN)
un_addr.sun_len = SUN_LEN(&un_addr); un_addr.sun_len = SUN_LEN(&un_addr);
#endif #endif
qb_util_log(LOG_INFO, "server name: %s", s->name); qb_util_log(LOG_INFO, "server name: %s", s->name);
#if defined(QB_LINUX) #if defined(QB_LINUX)
snprintf(un_addr.sun_path + 1, UNIX_PATH_MAX-1, "%s", s->name); snprintf(un_addr.sun_path + 1, UNIX_PATH_MAX - 1, "%s", s->name);
#else #else
{ {
struct stat stat_out; struct stat stat_out;
res = stat(SOCKETDIR, &stat_out); res = stat(SOCKETDIR, &stat_out);
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) { if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
res = -errno; res = -errno;
qb_util_log(LOG_CRIT, qb_util_log(LOG_CRIT,
"Required directory not present %s", "Required directory not present %s",
SOCKETDIR); SOCKETDIR);
goto error_close; goto error_close;
} }
snprintf(un_addr.sun_path, UNIX_PATH_MAX, "%s/%s", SOCKETDIR snprintf(un_addr.sun_path, UNIX_PATH_MAX, "%s/%s", SOCKETDIR
, s->name); ,
s->name);
unlink(un_addr.sun_path); unlink(un_addr.sun_path);
} }
#endif #endif
res = res = bind(s->server_sock, (struct sockaddr *)&un_addr,
bind(s->server_sock, (struct sockaddr *)&un_addr, QB_SUN_LEN(&un_addr));
QB_SUN_LEN(&un_addr));
if (res) { if (res) {
res = -errno; res = -errno;
strerror_r(errno, error_str, 100); qb_util_perror(LOG_ERR, "Could not bind AF_UNIX (%s)",
qb_util_log(LOG_CRIT, un_addr.sun_path);
"Could not bind AF_UNIX (%s): %s.",
un_addr.sun_path, error_str);
goto error_close; goto error_close;
} }
/* /*
* Allow eveyrone to write to the socket since the IPC layer handles * Allow everyone to write to the socket since the IPC layer handles
* security automatically * security automatically
*/ */
#if !defined(QB_LINUX) #if !defined(QB_LINUX)
res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO); res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
#endif #endif
if (listen(s->server_sock, SERVER_BACKLOG) == -1) { if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
strerror_r(errno, error_str, 100); qb_util_perror(LOG_ERR, "socket listen failed");
qb_util_log(LOG_ERR, "listen failed: %s.", error_str);
} }
res = s->poll_fns.dispatch_add(s->poll_priority, s->server_sock, res = s->poll_fns.dispatch_add(s->poll_priority, s->server_sock,
POLLIN | POLLPRI | POLLNVAL, POLLIN | POLLPRI | POLLNVAL,
s, qb_ipcs_us_connection_acceptor); s, qb_ipcs_us_connection_acceptor);
return res; return res;
error_close: error_close:
close(s->server_sock); close(s->server_sock);
return res; return res;
} }
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service * s) int32_t
qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{ {
qb_util_log(LOG_INFO, "withdrawing server sockets"); qb_util_log(LOG_INFO, "withdrawing server sockets");
shutdown(s->server_sock, SHUT_RDWR); shutdown(s->server_sock, SHUT_RDWR);
close(s->server_sock); close(s->server_sock);
return 0; return 0;
} }
static int32_t handle_new_connection(struct qb_ipcs_service *s, static int32_t
int32_t auth_result, handle_new_connection(struct qb_ipcs_service *s,
int32_t sock, int32_t auth_result,
void *msg, size_t len, int32_t sock,
struct ipc_auth_ugp *ugp) void *msg, size_t len, struct ipc_auth_ugp *ugp)
{ {
struct qb_ipcs_connection *c = NULL; struct qb_ipcs_connection *c = NULL;
struct qb_ipc_connection_request *req = msg; struct qb_ipc_connection_request *req = msg;
int32_t res = auth_result; int32_t res = auth_result;
int32_t res2 = 0;
struct qb_ipc_connection_response response; struct qb_ipc_connection_response response;
if (res != 0) { c = qb_ipcs_connection_alloc(s);
goto send_response; if (c == NULL) {
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
} }
c = qb_ipcs_connection_alloc(s); c->receive_buf = malloc(req->max_msg_size);
if (c->receive_buf == NULL) {
free(c);
qb_ipcc_us_sock_close(sock);
return -ENOMEM;
}
c->setup.u.us.sock = sock; c->setup.u.us.sock = sock;
c->request.max_msg_size = req->max_msg_size; c->request.max_msg_size = req->max_msg_size;
c->response.max_msg_size = req->max_msg_size; c->response.max_msg_size = req->max_msg_size;
c->event.max_msg_size = req->max_msg_size; c->event.max_msg_size = req->max_msg_size;
c->pid = ugp->pid; c->pid = ugp->pid;
c->euid = ugp->uid; c->euid = ugp->uid;
c->egid = ugp->gid; c->egid = ugp->gid;
c->stats.client_pid = ugp->pid; c->stats.client_pid = ugp->pid;
if (c->service->serv_fns.connection_accept) { if (auth_result == 0 && c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c, res = c->service->serv_fns.connection_accept(c,
c->euid, c->euid, c->egi
c->egid); d);
} }
if (res != 0) { if (res != 0) {
goto send_response; goto send_response;
} }
qb_util_log(LOG_INFO, "IPC credentials authenticated"); qb_util_log(LOG_INFO, "IPC credentials authenticated");
memset(&response, 0, sizeof(response)); memset(&response, 0, sizeof(response));
if (s->funcs.connect) { if (s->funcs.connect) {
res = s->funcs.connect(s, c, &response); res = s->funcs.connect(s, c, &response);
if (res != 0) { if (res != 0) {
goto send_response; goto send_response;
} }
} }
/*
* The connection is good, add it to the active connection list
*/
c->state = QB_IPCS_CONNECTION_ACTIVE; c->state = QB_IPCS_CONNECTION_ACTIVE;
qb_list_add(&c->list, &s->connections); qb_list_add(&c->list, &s->connections);
c->receive_buf = malloc(c->request.max_msg_size);
if (s->needs_sock_for_poll) { if (s->needs_sock_for_poll) {
qb_ipcs_connection_ref(c); qb_ipcs_connection_ref(c);
res = s->poll_fns.dispatch_add(s->poll_priority, res = s->poll_fns.dispatch_add(s->poll_priority,
c->setup.u.us.sock, c->setup.u.us.sock,
POLLIN | POLLPRI | POLLNVAL, POLLIN | POLLPRI | POLLNVAL,
c, c,
qb_ipcs_dispatch_connection_r equest); qb_ipcs_dispatch_connection_r equest);
} }
if (s->type == QB_IPC_SOCKET) { if (s->type == QB_IPC_SOCKET) {
c->request.u.us.sock = c->setup.u.us.sock; c->request.u.us.sock = c->setup.u.us.sock;
c->response.u.us.sock = c->setup.u.us.sock; c->response.u.us.sock = c->setup.u.us.sock;
res = s->poll_fns.dispatch_add(s->poll_priority, res = s->poll_fns.dispatch_add(s->poll_priority,
c->request.u.us.sock, c->request.u.us.sock,
POLLIN | POLLPRI | POLLNVAL, POLLIN | POLLPRI | POLLNVAL,
c, c,
qb_ipcs_dispatch_connection_r equest); qb_ipcs_dispatch_connection_r equest);
if (res < 0) { if (res < 0) {
qb_util_log(LOG_ERR, "Error adding socket to mainloo qb_util_log(LOG_ERR,
p."); "Error adding socket to mainloop.");
} }
} }
send_response: send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE; response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response); response.hdr.size = sizeof(response);
response.hdr.error = res; response.hdr.error = res;
if (res == 0) { if (res == 0) {
response.connection = (intptr_t)c; response.connection = (intptr_t) c;
response.connection_type = s->type; response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size; response.max_msg_size = c->request.max_msg_size;
s->stats.active_connections++; s->stats.active_connections++;
} }
res = qb_ipc_us_send(&c->setup, &response, response.hdr.size); res2 = qb_ipc_us_send(&c->setup, &response, response.hdr.size);
if (res == response.hdr.size) { if (res == 0 && res2 != response.hdr.size) {
res = 0; res = res2;
}
if (res < 0) {
qb_util_log(LOG_ERR, "Error sending connection response: %s"
,
strerror(-res));
} }
if (res == 0) { if (res == 0) {
if (s->serv_fns.connection_created) { if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c); s->serv_fns.connection_created(c);
} }
} else if (res == -EACCES) { c->state = QB_IPCS_CONNECTION_ESTABLISHED;
qb_util_log(LOG_ERR, "Invalid IPC credentials.");
} else { } else {
qb_util_log(LOG_ERR, "Error in connection setup: %s.", if (res == -EACCES) {
strerror(-res)); qb_util_log(LOG_ERR, "Invalid IPC credentials.");
} } else {
if (res != 0 && c) { qb_util_perror(LOG_ERR, "Error in connection setup")
;
}
qb_ipcs_disconnect(c); qb_ipcs_disconnect(c);
} else if (res != 0) {
qb_ipcc_us_sock_close(sock);
} }
return res; return res;
} }
static void handle_connection_new_sock(struct qb_ipcs_service *s, static void
int32_t sock, void *msg) handle_connection_new_sock(struct qb_ipcs_service *s, int32_t sock, void *m
sg)
{ {
struct qb_ipcs_connection *c = NULL; struct qb_ipcs_connection *c = NULL;
struct qb_ipc_event_connection_request *req = msg; struct qb_ipc_event_connection_request *req = msg;
c = (struct qb_ipcs_connection *)req->connection; c = (struct qb_ipcs_connection *)req->connection;
c->event.u.us.sock = sock; c->event.u.us.sock = sock;
} }
static int32_t qb_ipcs_uc_recv_and_auth(int32_t sock, void *msg, size_t len static int32_t
, qb_ipcs_uc_recv_and_auth(int32_t sock, void *msg, size_t len,
struct ipc_auth_ugp *ugp) struct ipc_auth_ugp *ugp)
{ {
int32_t res = 0; int32_t res = 0;
struct msghdr msg_recv; struct msghdr msg_recv;
struct iovec iov_recv; struct iovec iov_recv;
#ifdef QB_LINUX #ifdef QB_LINUX
struct cmsghdr *cmsg;
char cmsg_cred[CMSG_SPACE(sizeof(struct ucred))]; char cmsg_cred[CMSG_SPACE(sizeof(struct ucred))];
int off = 0; int off = 0;
int on = 1; int on = 1;
struct ucred *cred;
#endif #endif
msg_recv.msg_flags = 0; msg_recv.msg_flags = 0;
msg_recv.msg_iov = &iov_recv; msg_recv.msg_iov = &iov_recv;
msg_recv.msg_iovlen = 1; msg_recv.msg_iovlen = 1;
msg_recv.msg_name = 0; msg_recv.msg_name = 0;
msg_recv.msg_namelen = 0; msg_recv.msg_namelen = 0;
#ifdef QB_LINUX #ifdef QB_LINUX
msg_recv.msg_control = (void *)cmsg_cred; msg_recv.msg_control = (void *)cmsg_cred;
msg_recv.msg_controllen = sizeof(cmsg_cred); msg_recv.msg_controllen = sizeof(cmsg_cred);
#endif #endif
skipping to change at line 710 skipping to change at line 724
res = 0; res = 0;
} else { } else {
res = -errno; res = -errno;
} }
} }
#elif SO_PASSCRED #elif SO_PASSCRED
/* /*
* Usually Linux systems * Usually Linux systems
*/ */
cmsg = CMSG_FIRSTHDR(&msg_recv); {
assert(cmsg != NULL); struct ucred cred;
cred = (struct ucred *)CMSG_DATA(cmsg); struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg_recv);
if (cred) { assert(cmsg != NULL);
res = 0; if (CMSG_DATA(cmsg)) {
ugp->pid = cred->pid; memcpy(&cred, CMSG_DATA(cmsg), sizeof(struct ucred))
ugp->uid = cred->uid; ;
ugp->gid = cred->gid; res = 0;
} else { ugp->pid = cred.pid;
res = -EBADMSG; ugp->uid = cred.uid;
ugp->gid = cred.gid;
} else {
res = -EBADMSG;
}
} }
#else /* no credentials */ #else /* no credentials */
ugp->pid = 0; ugp->pid = 0;
ugp->uid = 0; ugp->uid = 0;
ugp->gid = 0; ugp->gid = 0;
res = -ENOTSUP; res = -ENOTSUP;
#endif /* no credentials */ #endif /* no credentials */
cleanup_and_return: cleanup_and_return:
#ifdef QB_LINUX #ifdef QB_LINUX
setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off)); setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
#endif #endif
return res; return res;
} }
static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *dat static int32_t
a) qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
{ {
struct sockaddr_un un_addr; struct sockaddr_un un_addr;
int32_t new_fd; int32_t new_fd;
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data; struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
int32_t res; int32_t res;
struct qb_ipc_connection_request setup_msg; struct qb_ipc_connection_request setup_msg;
struct ipc_auth_ugp ugp; struct ipc_auth_ugp ugp;
socklen_t addrlen = sizeof(struct sockaddr_un); socklen_t addrlen = sizeof(struct sockaddr_un);
char error_str[100];
retry_accept: retry_accept:
errno = 0; errno = 0;
new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen); new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) { if (new_fd == -1 && errno == EINTR) {
goto retry_accept; goto retry_accept;
} }
if (new_fd == -1 && errno == EBADF) { if (new_fd == -1 && errno == EBADF) {
strerror_r(errno, error_str, 100); qb_util_perror(LOG_ERR,
qb_util_log(LOG_ERR, "Could not accept client connection from fd:%
"Could not accept Library connection:(fd: %d) [% d",
d] %s", fd);
fd, errno, error_str);
return -1; return -1;
} }
if (new_fd == -1) { if (new_fd == -1) {
res = -errno; res = -errno;
qb_util_log(LOG_ERR, qb_util_perror(LOG_ERR, "Could not accept client connection"
"Could not accept Library connection: [%d] %s", );
errno, strerror(-res)); /* This is an error, but -1 would indicate disconnect
return 0; /* This is an error, but -1 would indicate d * from the poll loop
isconnect from poll loop */ */
return 0;
} }
res = qb_util_fd_nonblock_cloexec_set(new_fd); res = qb_sys_fd_nonblock_cloexec_set(new_fd);
if (res < 0) { if (res < 0) {
close(new_fd); close(new_fd);
return 0; /* This is an error, but -1 would indicate d /* This is an error, but -1 would indicate disconnect
isconnect from poll loop */ * from the poll loop
*/
return 0;
} }
res = qb_ipcs_uc_recv_and_auth(new_fd, &setup_msg, sizeof(setup_msg) , res = qb_ipcs_uc_recv_and_auth(new_fd, &setup_msg, sizeof(setup_msg) ,
&ugp); &ugp);
if (setup_msg.hdr.id == QB_IPC_MSG_AUTHENTICATE) { if (setup_msg.hdr.id == QB_IPC_MSG_AUTHENTICATE) {
res = handle_new_connection(s, res, new_fd, &setup_msg, size res = handle_new_connection(s, res, new_fd, &setup_msg,
of(setup_msg), sizeof(setup_msg), &ugp);
&ugp);
} else if (setup_msg.hdr.id == QB_IPC_MSG_NEW_EVENT_SOCK) { } else if (setup_msg.hdr.id == QB_IPC_MSG_NEW_EVENT_SOCK) {
if (res == 0) { if (res == 0) {
handle_connection_new_sock(s, new_fd, &setup_msg); handle_connection_new_sock(s, new_fd, &setup_msg);
} else { } else {
close(new_fd); close(new_fd);
} }
} else { } else {
close(new_fd); close(new_fd);
} }
return 0; return 0;
} }
static int32_t qb_ipcs_us_connect(struct qb_ipcs_service *s, static int32_t
struct qb_ipcs_connection *c, qb_ipcs_us_connect(struct qb_ipcs_service *s,
struct qb_ipc_connection_response *r) struct qb_ipcs_connection *c,
struct qb_ipc_connection_response *r)
{ {
char path[PATH_MAX]; char path[PATH_MAX];
int32_t fd_hdr; int32_t fd_hdr;
int32_t res = 0; int32_t res = 0;
struct ipc_us_control * ctl; struct ipc_us_control *ctl;
qb_util_log(LOG_DEBUG, "connecting to client [%d]", c->pid); qb_util_log(LOG_DEBUG, "connecting to client [%d]", c->pid);
snprintf(r->request, NAME_MAX, "qb-%s-control-%d-%d", snprintf(r->request, NAME_MAX, "qb-%s-control-%d-%d",
s->name, c->pid, c->setup.u.us.sock); s->name, c->pid, c->setup.u.us.sock);
fd_hdr = qb_util_mmap_file_open(path, r->request, fd_hdr = qb_sys_mmap_file_open(path, r->request,
sizeof(struct ipc_us_control), sizeof(struct ipc_us_control),
O_CREAT | O_TRUNC | O_RDWR); O_CREAT | O_TRUNC | O_RDWR);
if (fd_hdr < 0) { if (fd_hdr < 0) {
res = -errno; res = -errno;
qb_util_log(LOG_ERR, "couldn't create file for mmap: %s", qb_util_perror(LOG_ERR, "couldn't create file for mmap");
strerror(-res));
return res; return res;
} }
strcpy(r->request, path); strcpy(r->request, path);
strcpy(c->request.u.us.shared_file_name, r->request); strcpy(c->request.u.us.shared_file_name, r->request);
c->request.u.us.shared_data = mmap(0, c->request.u.us.shared_data = mmap(0,
sizeof(struct ipc_us_control), sizeof(struct ipc_us_control),
PROT_READ | PROT_WRITE, MAP_SHARE D, PROT_READ | PROT_WRITE, MAP_SHARE D,
fd_hdr, 0); fd_hdr, 0);
if (c->request.u.us.shared_data == MAP_FAILED) { if (c->request.u.us.shared_data == MAP_FAILED) {
res = -errno; res = -errno;
qb_util_log(LOG_ERR, "couldn't create mmap for header"); qb_util_perror(LOG_ERR, "couldn't create mmap for header");
goto cleanup_hdr; goto cleanup_hdr;
} }
ctl = (struct ipc_us_control *)c->request.u.us.shared_data; ctl = (struct ipc_us_control *)c->request.u.us.shared_data;
ctl->sent = 0; ctl->sent = 0;
ctl->flow_control = 0; ctl->flow_control = 0;
close(fd_hdr); close(fd_hdr);
return res; return res;
cleanup_hdr: cleanup_hdr:
close(fd_hdr); close(fd_hdr);
unlink(r->request); unlink(r->request);
munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control)); munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control));
return res; return res;
} }
static void qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, static void
int32_t fc_enable) qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable)
{ {
struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us. struct ipc_us_control *ctl =
shared_data; (struct ipc_us_control *)one_way->u.us.shared_data;
qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable);
qb_atomic_int_set(&ctl->flow_control, fc_enable); qb_atomic_int_set(&ctl->flow_control, fc_enable);
} }
static int32_t qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way) static int32_t
qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way)
{ {
struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us. struct ipc_us_control *ctl =
shared_data; (struct ipc_us_control *)one_way->u.us.shared_data;
return qb_atomic_int_get(&ctl->flow_control); return qb_atomic_int_get(&ctl->flow_control);
} }
static ssize_t qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way) static ssize_t
qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way)
{ {
struct ipc_us_control *ctl = (struct ipc_us_control *)one_way->u.us. struct ipc_us_control *ctl =
shared_data; (struct ipc_us_control *)one_way->u.us.shared_data;
return qb_atomic_int_get(&ctl->sent); return qb_atomic_int_get(&ctl->sent);
} }
static void qb_ipcs_us_disconnect(struct qb_ipcs_connection *c) static void
qb_ipcs_us_disconnect(struct qb_ipcs_connection *c)
{ {
munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control)); munmap(c->request.u.us.shared_data, sizeof(struct ipc_us_control));
unlink(c->request.u.us.shared_file_name); unlink(c->request.u.us.shared_file_name);
// close(c->setup.u.us.sock);
close(c->request.u.us.sock); close(c->request.u.us.sock);
// close(c->response.u.us.sock);
close(c->event.u.us.sock); close(c->event.u.us.sock);
} }
void qb_ipcs_us_init(struct qb_ipcs_service *s) void
qb_ipcs_us_init(struct qb_ipcs_service *s)
{ {
s->funcs.connect = qb_ipcs_us_connect; s->funcs.connect = qb_ipcs_us_connect;
s->funcs.disconnect = qb_ipcs_us_disconnect; s->funcs.disconnect = qb_ipcs_us_disconnect;
s->funcs.recv = qb_ipc_us_recv; s->funcs.recv = qb_ipc_us_recv;
s->funcs.peek = NULL; s->funcs.peek = NULL;
s->funcs.reclaim = NULL; s->funcs.reclaim = NULL;
s->funcs.send = qb_ipc_us_send; s->funcs.send = qb_ipc_us_send;
s->funcs.sendv = qb_ipc_us_sendv; s->funcs.sendv = qb_ipc_us_sendv;
 End of changes. 84 change blocks. 
160 lines changed or deleted 173 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/