| loop_poll.c | loop_poll.c | |||
|---|---|---|---|---|
| skipping to change at line 23 | skipping to change at line 23 | |||
| * libqb is distributed in the hope that it will be useful, | * libqb is distributed in the hope that it will be useful, | |||
| * but WITHOUT ANY WARRANTY; without even the implied warranty of | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
| * GNU Lesser General Public License for more details. | * GNU Lesser General Public License for more details. | |||
| * | * | |||
| * You should have received a copy of the GNU Lesser General Public License | * You should have received a copy of the GNU Lesser General Public License | |||
| * along with libqb. If not, see <http://www.gnu.org/licenses/>. | * along with libqb. If not, see <http://www.gnu.org/licenses/>. | |||
| */ | */ | |||
| #include "os_base.h" | #include "os_base.h" | |||
| #ifdef HAVE_SYS_RESOURCE_H | ||||
| #include <sys/resource.h> | #include <sys/resource.h> | |||
| #endif | ||||
| #ifdef HAVE_SYS_EPOLL_H | #ifdef HAVE_SYS_EPOLL_H | |||
| #include <sys/epoll.h> | #include <sys/epoll.h> | |||
| #ifndef epoll_create1 | ||||
| int epoll_create1(int flags); | ||||
| #endif /* workaround a set of sparc and alpha broken headers */ | ||||
| #endif /* HAVE_SYS_EPOLL_H */ | #endif /* HAVE_SYS_EPOLL_H */ | |||
| #ifdef HAVE_SYS_POLL_H | ||||
| #include <sys/poll.h> | #include <sys/poll.h> | |||
| #endif /* HAVE_SYS_POLL_H */ | ||||
| #ifndef S_SPLINT_S | #ifndef S_SPLINT_S | |||
| #ifdef HAVE_SYS_TIMERFD_H | #ifdef HAVE_SYS_TIMERFD_H | |||
| #include <sys/timerfd.h> | #include <sys/timerfd.h> | |||
| #endif /* HAVE_SYS_TIMERFD_H */ | #endif /* HAVE_SYS_TIMERFD_H */ | |||
| #endif /* S_SPLINT_S */ | #endif /* S_SPLINT_S */ | |||
| #include <signal.h> | #include <signal.h> | |||
| #include <qb/qbdefs.h> | #include <qb/qbdefs.h> | |||
| #include <qb/qblist.h> | #include <qb/qblist.h> | |||
| #include <qb/qbarray.h> | #include <qb/qbarray.h> | |||
| skipping to change at line 61 | skipping to change at line 69 | |||
| enum qb_poll_type { | enum qb_poll_type { | |||
| QB_UNKNOWN, | QB_UNKNOWN, | |||
| QB_JOB, | QB_JOB, | |||
| QB_POLL, | QB_POLL, | |||
| QB_SIGNAL, | QB_SIGNAL, | |||
| QB_TIMER, | QB_TIMER, | |||
| }; | }; | |||
| struct qb_poll_entry; | struct qb_poll_entry; | |||
| typedef int32_t (*qb_poll_add_to_jobs_fn) (struct qb_loop* l, struct qb_pol | typedef int32_t(*qb_poll_add_to_jobs_fn) (struct qb_loop * l, | |||
| l_entry* pe); | struct qb_poll_entry * pe); | |||
| struct qb_poll_entry { | struct qb_poll_entry { | |||
| struct qb_loop_item item; | struct qb_loop_item item; | |||
| enum qb_poll_type type; | enum qb_poll_type type; | |||
| qb_loop_poll_dispatch_fn poll_dispatch_fn; | qb_loop_poll_dispatch_fn poll_dispatch_fn; | |||
| qb_loop_timer_dispatch_fn timer_dispatch_fn; | qb_loop_timer_dispatch_fn timer_dispatch_fn; | |||
| enum qb_loop_priority p; | enum qb_loop_priority p; | |||
| uint32_t install_pos; | uint32_t install_pos; | |||
| struct pollfd ufd; | struct pollfd ufd; | |||
| qb_poll_add_to_jobs_fn add_to_jobs; | qb_poll_add_to_jobs_fn add_to_jobs; | |||
| skipping to change at line 90 | skipping to change at line 99 | |||
| int32_t epollfd; | int32_t epollfd; | |||
| #else | #else | |||
| struct pollfd *ufds; | struct pollfd *ufds; | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| int32_t poll_entry_count; | int32_t poll_entry_count; | |||
| qb_array_t *poll_entries; | qb_array_t *poll_entries; | |||
| qb_loop_poll_low_fds_event_fn low_fds_event_fn; | qb_loop_poll_low_fds_event_fn low_fds_event_fn; | |||
| int32_t not_enough_fds; | int32_t not_enough_fds; | |||
| }; | }; | |||
| static int32_t _qb_signal_add_to_jobs_(struct qb_loop* l, | static int32_t _qb_signal_add_to_jobs_(struct qb_loop *l, | |||
| struct qb_poll_entry* pe); | struct qb_poll_entry *pe); | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| static int32_t _poll_to_epoll_event_(int32_t event) | static int32_t | |||
| _poll_to_epoll_event_(int32_t event) | ||||
| { | { | |||
| int32_t out = 0; | int32_t out = 0; | |||
| if (event & POLLIN) out |= EPOLLIN; | if (event & POLLIN) | |||
| if (event & POLLOUT) out |= EPOLLOUT; | out |= EPOLLIN; | |||
| if (event & POLLPRI) out |= EPOLLPRI; | if (event & POLLOUT) | |||
| if (event & POLLERR) out |= EPOLLERR; | out |= EPOLLOUT; | |||
| if (event & POLLHUP) out |= EPOLLHUP; | if (event & POLLPRI) | |||
| if (event & POLLNVAL) out |= EPOLLERR; | out |= EPOLLPRI; | |||
| if (event & POLLERR) | ||||
| out |= EPOLLERR; | ||||
| if (event & POLLHUP) | ||||
| out |= EPOLLHUP; | ||||
| if (event & POLLNVAL) | ||||
| out |= EPOLLERR; | ||||
| return out; | return out; | |||
| } | } | |||
| static int32_t _epoll_to_poll_event_(int32_t event) | static int32_t | |||
| _epoll_to_poll_event_(int32_t event) | ||||
| { | { | |||
| int32_t out = 0; | int32_t out = 0; | |||
| if (event & EPOLLIN) out |= POLLIN; | if (event & EPOLLIN) | |||
| if (event & EPOLLOUT) out |= POLLOUT; | out |= POLLIN; | |||
| if (event & EPOLLPRI) out |= POLLPRI; | if (event & EPOLLOUT) | |||
| if (event & EPOLLERR) out |= POLLERR; | out |= POLLOUT; | |||
| if (event & EPOLLHUP) out |= POLLHUP; | if (event & EPOLLPRI) | |||
| out |= POLLPRI; | ||||
| if (event & EPOLLERR) | ||||
| out |= POLLERR; | ||||
| if (event & EPOLLHUP) | ||||
| out |= POLLHUP; | ||||
| return out; | return out; | |||
| } | } | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| static void _poll_entry_check_generate_(struct qb_poll_entry *pe) | static void | |||
| _poll_entry_check_generate_(struct qb_poll_entry *pe) | ||||
| { | { | |||
| int32_t i; | int32_t i; | |||
| for (i = 0; i < 200; i++) { | for (i = 0; i < 200; i++) { | |||
| pe->check = random(); | pe->check = random(); | |||
| if (pe->check != 0 && pe->check != 0xffffffff) { | if (pe->check != 0 && pe->check != 0xffffffff) { | |||
| break; | break; | |||
| } | } | |||
| } | } | |||
| } | } | |||
| #if defined(HAVE_TIMERFD) || defined(HAVE_EPOLL) | #if defined(HAVE_TIMERFD) || defined(HAVE_EPOLL) | |||
| static int32_t _poll_entry_from_handle_(struct qb_poll_source *s, | static int32_t | |||
| uint64_t handle_in, | _poll_entry_from_handle_(struct qb_poll_source *s, | |||
| struct qb_poll_entry **pe_pt) | uint64_t handle_in, struct qb_poll_entry **pe_pt) | |||
| { | { | |||
| int32_t res = 0; | int32_t res = 0; | |||
| uint32_t check = ((uint32_t) (((uint64_t) handle_in) >> 32)); | uint32_t check = ((uint32_t) (((uint64_t) handle_in) >> 32)); | |||
| uint32_t handle = handle_in & 0xffffffff; | uint32_t handle = handle_in & 0xffffffff; | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| res = qb_array_index(s->poll_entries, handle, (void**)&pe); | res = qb_array_index(s->poll_entries, handle, (void **)&pe); | |||
| if (res != 0) { | if (res != 0) { | |||
| return res; | return res; | |||
| } | } | |||
| if (pe->check != check) { | if (pe->check != check) { | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| *pe_pt = pe; | *pe_pt = pe; | |||
| return 0; | return 0; | |||
| } | } | |||
| #endif /* HAVE_TIMERFD or HAVE_EPOLL */ | #endif /* HAVE_TIMERFD or HAVE_EPOLL */ | |||
| static void _poll_entry_mark_deleted_(struct qb_poll_entry *pe) | static void | |||
| _poll_entry_mark_deleted_(struct qb_poll_entry *pe) | ||||
| { | { | |||
| pe->ufd.fd = -1; | pe->ufd.fd = -1; | |||
| pe->state = QB_POLL_ENTRY_DELETED; | pe->state = QB_POLL_ENTRY_DELETED; | |||
| pe->check = 0; | pe->check = 0; | |||
| } | } | |||
| static void _poll_entry_empty_(struct qb_poll_entry *pe) | static void | |||
| _poll_entry_empty_(struct qb_poll_entry *pe) | ||||
| { | { | |||
| memset(pe, 0, sizeof(struct qb_poll_entry)); | memset(pe, 0, sizeof(struct qb_poll_entry)); | |||
| pe->ufd.fd = -1; | pe->ufd.fd = -1; | |||
| } | } | |||
| static void _poll_dispatch_and_take_back_(struct qb_loop_item * item, | static void | |||
| enum qb_loop_priority p) | _poll_dispatch_and_take_back_(struct qb_loop_item *item, | |||
| enum qb_loop_priority p) | ||||
| { | { | |||
| struct qb_poll_entry *pe = (struct qb_poll_entry *)item; | struct qb_poll_entry *pe = (struct qb_poll_entry *)item; | |||
| int32_t res; | int32_t res; | |||
| #ifdef DEBUG_DISPATCH_TIME | #ifdef DEBUG_DISPATCH_TIME | |||
| uint64_t start; | uint64_t start; | |||
| uint64_t stop; | uint64_t stop; | |||
| int32_t log_warn = QB_FALSE; | int32_t log_warn = QB_FALSE; | |||
| start = qb_util_nano_current_get(); | start = qb_util_nano_current_get(); | |||
| #endif /* DEBUG_DISPATCH_TIME */ | #endif /* DEBUG_DISPATCH_TIME */ | |||
| assert(pe->state == QB_POLL_ENTRY_JOBLIST); | assert(pe->state == QB_POLL_ENTRY_JOBLIST); | |||
| if (pe->type == QB_POLL) { | if (pe->type == QB_POLL) { | |||
| res = pe->poll_dispatch_fn(pe->ufd.fd, pe->ufd.revents, pe-> | res = pe->poll_dispatch_fn(pe->ufd.fd, | |||
| item.user_data); | pe->ufd.revents, | |||
| pe->item.user_data); | ||||
| if (res < 0) { | if (res < 0) { | |||
| _poll_entry_mark_deleted_(pe); | _poll_entry_mark_deleted_(pe); | |||
| } else { | } else { | |||
| pe->state = QB_POLL_ENTRY_ACTIVE; | pe->state = QB_POLL_ENTRY_ACTIVE; | |||
| pe->ufd.revents = 0; | pe->ufd.revents = 0; | |||
| } | } | |||
| } else if (pe->type == QB_TIMER) { | } else if (pe->type == QB_TIMER) { | |||
| _poll_entry_mark_deleted_(pe); | _poll_entry_mark_deleted_(pe); | |||
| pe->timer_dispatch_fn(pe->item.user_data); | pe->timer_dispatch_fn(pe->item.user_data); | |||
| } else { | } else { | |||
| qb_util_log(LOG_WARNING, "poll entry of unknown type:%d stat | qb_util_log(LOG_WARNING, | |||
| e:%d", | "poll entry of unknown type:%d state:%d", pe->ty | |||
| pe->type, pe->state); | pe, | |||
| pe->state); | ||||
| return; | return; | |||
| } | } | |||
| if (pe->state == QB_POLL_ENTRY_ACTIVE) { | if (pe->state == QB_POLL_ENTRY_ACTIVE) { | |||
| #ifdef DEBUG_DISPATCH_TIME | #ifdef DEBUG_DISPATCH_TIME | |||
| pe->runs++; | pe->runs++; | |||
| if ((pe->runs % 50) == 0) { | if ((pe->runs % 50) == 0) { | |||
| log_warn = QB_TRUE; | log_warn = QB_TRUE; | |||
| } | } | |||
| stop = qb_util_nano_current_get(); | stop = qb_util_nano_current_get(); | |||
| if ((stop - start) > (10 * QB_TIME_NS_IN_MSEC)) { | if ((stop - start) > (10 * QB_TIME_NS_IN_MSEC)) { | |||
| log_warn = QB_TRUE; | log_warn = QB_TRUE; | |||
| } | } | |||
| if (log_warn && pe->type == QB_POLL) { | if (log_warn && pe->type == QB_POLL) { | |||
| qb_util_log(LOG_INFO, | qb_util_log(LOG_INFO, | |||
| "[fd:%d] dispatch:%p runs:%d duration:%d ms", | "[fd:%d] dispatch:%p runs:%d duration:%d ms", | |||
| pe->ufd.fd, pe->poll_dispatch_fn, | pe->ufd.fd, pe->poll_dispatch_fn, | |||
| pe->runs, | pe->runs, | |||
| (int32_t)((stop - start)/QB_TIME_NS_IN_M | (int32_t) ((stop - | |||
| SEC)); | start) / QB_TIME_NS_IN_MSEC) | |||
| ); | ||||
| } else if (log_warn && pe->type == QB_TIMER) { | } else if (log_warn && pe->type == QB_TIMER) { | |||
| qb_util_log(LOG_INFO, | qb_util_log(LOG_INFO, | |||
| "[timer] dispatch:%p runs:%d duration:%d ms", | "[timer] dispatch:%p runs:%d duration:%d ms", | |||
| pe->timer_dispatch_fn, | pe->timer_dispatch_fn, | |||
| pe->runs, | pe->runs, | |||
| (int32_t)((stop - start)/QB_TIME_NS_IN_M | (int32_t) ((stop - | |||
| SEC)); | start) / QB_TIME_NS_IN_MSEC) | |||
| ); | ||||
| } | } | |||
| #endif /* DEBUG_DISPATCH_TIME */ | #endif /* DEBUG_DISPATCH_TIME */ | |||
| } | } | |||
| } | } | |||
| static void _poll_fds_usage_check_(struct qb_poll_source *s) | static void | |||
| _poll_fds_usage_check_(struct qb_poll_source *s) | ||||
| { | { | |||
| struct rlimit lim; | struct rlimit lim; | |||
| static int32_t socks_limit = 0; | static int32_t socks_limit = 0; | |||
| int32_t send_event = 0; | int32_t send_event = 0; | |||
| int32_t socks_used = 0; | int32_t socks_used = 0; | |||
| int32_t socks_avail = 0; | int32_t socks_avail = 0; | |||
| struct qb_poll_entry * pe; | struct qb_poll_entry *pe; | |||
| int32_t i; | int32_t i; | |||
| if (socks_limit == 0) { | if (socks_limit == 0) { | |||
| if (getrlimit(RLIMIT_NOFILE, &lim) == -1) { | if (getrlimit(RLIMIT_NOFILE, &lim) == -1) { | |||
| char error_str[100]; | qb_util_perror(LOG_WARNING, "getrlimit"); | |||
| strerror_r(errno, error_str, 100); | ||||
| printf("getrlimit: %s\n", error_str); | ||||
| return; | return; | |||
| } | } | |||
| socks_limit = lim.rlim_cur; | socks_limit = lim.rlim_cur; | |||
| socks_limit -= POLL_FDS_USED_MISC; | socks_limit -= POLL_FDS_USED_MISC; | |||
| if (socks_limit < 0) { | if (socks_limit < 0) { | |||
| socks_limit = 0; | socks_limit = 0; | |||
| } | } | |||
| } | } | |||
| for (i = 0; i < s->poll_entry_count; i++) { | for (i = 0; i < s->poll_entry_count; i++) { | |||
| assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0) ; | assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0 ); | |||
| if ((pe->state == QB_POLL_ENTRY_ACTIVE || | if ((pe->state == QB_POLL_ENTRY_ACTIVE || | |||
| pe->state == QB_POLL_ENTRY_JOBLIST) && | pe->state == QB_POLL_ENTRY_JOBLIST) && pe->ufd.fd != -1 | |||
| pe->ufd.fd != -1) { | ) { | |||
| socks_used++; | socks_used++; | |||
| } | } | |||
| if (pe->state == QB_POLL_ENTRY_DELETED) { | if (pe->state == QB_POLL_ENTRY_DELETED) { | |||
| _poll_entry_empty_(pe); | _poll_entry_empty_(pe); | |||
| } | } | |||
| } | } | |||
| socks_avail = socks_limit - socks_used; | socks_avail = socks_limit - socks_used; | |||
| if (socks_avail < 0) { | if (socks_avail < 0) { | |||
| socks_avail = 0; | socks_avail = 0; | |||
| skipping to change at line 278 | skipping to change at line 307 | |||
| s->not_enough_fds = 0; | s->not_enough_fds = 0; | |||
| send_event = 1; | send_event = 1; | |||
| } | } | |||
| } else { | } else { | |||
| if (socks_avail <= 1) { | if (socks_avail <= 1) { | |||
| s->not_enough_fds = 1; | s->not_enough_fds = 1; | |||
| send_event = 1; | send_event = 1; | |||
| } | } | |||
| } | } | |||
| if (send_event && s->low_fds_event_fn) { | if (send_event && s->low_fds_event_fn) { | |||
| s->low_fds_event_fn(s->not_enough_fds, | s->low_fds_event_fn(s->not_enough_fds, socks_avail); | |||
| socks_avail); | ||||
| } | } | |||
| } | } | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| #define MAX_EVENTS 12 | #define MAX_EVENTS 12 | |||
| static int32_t _poll_and_add_to_jobs_(struct qb_loop_source* src, int32_t m | static int32_t | |||
| s_timeout) | _poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) | |||
| { | { | |||
| int32_t i; | int32_t i; | |||
| int32_t res; | int32_t res; | |||
| int32_t event_count; | int32_t event_count; | |||
| int32_t new_jobs = 0; | int32_t new_jobs = 0; | |||
| struct qb_poll_entry * pe = NULL; | struct qb_poll_entry *pe = NULL; | |||
| struct qb_poll_source * s = (struct qb_poll_source *)src; | struct qb_poll_source *s = (struct qb_poll_source *)src; | |||
| struct epoll_event events[MAX_EVENTS]; | struct epoll_event events[MAX_EVENTS]; | |||
| _poll_fds_usage_check_(s); | _poll_fds_usage_check_(s); | |||
| retry_poll: | retry_poll: | |||
| event_count = epoll_wait(s->epollfd, events, MAX_EVENTS, ms_timeout) ; | event_count = epoll_wait(s->epollfd, events, MAX_EVENTS, ms_timeout) ; | |||
| if (errno == EINTR && event_count == -1) { | if (errno == EINTR && event_count == -1) { | |||
| goto retry_poll; | goto retry_poll; | |||
| } else if (event_count == -1) { | } else if (event_count == -1) { | |||
| return -errno; | return -errno; | |||
| } | } | |||
| for (i = 0; i < event_count; i++) { | for (i = 0; i < event_count; i++) { | |||
| res = _poll_entry_from_handle_(s, events[i].data.u64, &pe); | res = _poll_entry_from_handle_(s, events[i].data.u64, &pe); | |||
| if (res != 0) { | if (res != 0) { | |||
| qb_util_log(LOG_WARNING, "can't find poll entry for | qb_util_log(LOG_WARNING, | |||
| new event."); | "can't find poll entry for new event."); | |||
| continue; | continue; | |||
| } | } | |||
| if (pe->ufd.fd == -1 || pe->state == QB_POLL_ENTRY_DELETED) { | if (pe->ufd.fd == -1 || pe->state == QB_POLL_ENTRY_DELETED) { | |||
| qb_util_log(LOG_WARNING, "can't post new event to a | qb_util_log(LOG_WARNING, | |||
| deleted entry."); | "can't post new event to a deleted entry | |||
| ."); | ||||
| /* | /* | |||
| * empty/deleted | * empty/deleted | |||
| */ | */ | |||
| continue; | continue; | |||
| } | } | |||
| if (events[i].events == pe->ufd.revents || | if (events[i].events == pe->ufd.revents || | |||
| pe->state == QB_POLL_ENTRY_JOBLIST) { | pe->state == QB_POLL_ENTRY_JOBLIST) { | |||
| /* | /* | |||
| * entry already in the job queue. | * entry already in the job queue. | |||
| */ | */ | |||
| continue; | continue; | |||
| } | } | |||
| pe->ufd.revents = _epoll_to_poll_event_(events[i].events); | pe->ufd.revents = _epoll_to_poll_event_(events[i].events); | |||
| new_jobs += pe->add_to_jobs(src->l, pe); | new_jobs += pe->add_to_jobs(src->l, pe); | |||
| } | } | |||
| return new_jobs; | return new_jobs; | |||
| } | } | |||
| #else | #else | |||
| static int32_t _poll_and_add_to_jobs_(struct qb_loop_source* src, int32_t m | static int32_t | |||
| s_timeout) | _poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) | |||
| { | { | |||
| int32_t i; | int32_t i; | |||
| int32_t res; | int32_t res; | |||
| int32_t new_jobs = 0; | int32_t new_jobs = 0; | |||
| struct qb_poll_entry * pe; | struct qb_poll_entry *pe; | |||
| struct qb_poll_source * s = (struct qb_poll_source *)src; | struct qb_poll_source *s = (struct qb_poll_source *)src; | |||
| _poll_fds_usage_check_(s); | _poll_fds_usage_check_(s); | |||
| for (i = 0; i < s->poll_entry_count; i++) { | for (i = 0; i < s->poll_entry_count; i++) { | |||
| assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0) ; | assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0 ); | |||
| memcpy(&s->ufds[i], &pe->ufd, sizeof(struct pollfd)); | memcpy(&s->ufds[i], &pe->ufd, sizeof(struct pollfd)); | |||
| } | } | |||
| retry_poll: | retry_poll: | |||
| res = poll(s->ufds, s->poll_entry_count, ms_timeout); | res = poll(s->ufds, s->poll_entry_count, ms_timeout); | |||
| if (errno == EINTR && res == -1) { | if (errno == EINTR && res == -1) { | |||
| goto retry_poll; | goto retry_poll; | |||
| } else if (res == -1) { | } else if (res == -1) { | |||
| return -errno; | return -errno; | |||
| } | } | |||
| for (i = 0; i < s->poll_entry_count; i++) { | for (i = 0; i < s->poll_entry_count; i++) { | |||
| if (s->ufds[i].fd == -1 || s->ufds[i].revents == 0) { | if (s->ufds[i].fd == -1 || s->ufds[i].revents == 0) { | |||
| /* | /* | |||
| * empty entry | * empty entry | |||
| */ | */ | |||
| continue; | continue; | |||
| } | } | |||
| assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0) ; | assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0 ); | |||
| if (pe->state != QB_POLL_ENTRY_ACTIVE || | if (pe->state != QB_POLL_ENTRY_ACTIVE || | |||
| s->ufds[i].revents == pe->ufd.revents) { | s->ufds[i].revents == pe->ufd.revents) { | |||
| /* | /* | |||
| * Wrong state to accept an event. | * Wrong state to accept an event. | |||
| */ | */ | |||
| continue; | continue; | |||
| } | } | |||
| pe->ufd.revents = s->ufds[i].revents; | pe->ufd.revents = s->ufds[i].revents; | |||
| new_jobs += pe->add_to_jobs(src->l, pe); | new_jobs += pe->add_to_jobs(src->l, pe); | |||
| } | } | |||
| return new_jobs; | return new_jobs; | |||
| } | } | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| struct qb_loop_source* | struct qb_loop_source * | |||
| qb_loop_poll_create(struct qb_loop *l) | qb_loop_poll_create(struct qb_loop *l) | |||
| { | { | |||
| struct qb_poll_source *s = malloc(sizeof(struct qb_poll_source)); | struct qb_poll_source *s = malloc(sizeof(struct qb_poll_source)); | |||
| if (s == NULL) { | ||||
| return NULL; | ||||
| } | ||||
| s->s.l = l; | s->s.l = l; | |||
| s->s.dispatch_and_take_back = _poll_dispatch_and_take_back_; | s->s.dispatch_and_take_back = _poll_dispatch_and_take_back_; | |||
| s->s.poll = _poll_and_add_to_jobs_; | s->s.poll = _poll_and_add_to_jobs_; | |||
| s->poll_entries = qb_array_create(128, sizeof(struct qb_poll_entry)) ; | s->poll_entries = qb_array_create(128, sizeof(struct qb_poll_entry)) ; | |||
| s->poll_entry_count = 0; | s->poll_entry_count = 0; | |||
| s->low_fds_event_fn = NULL; | s->low_fds_event_fn = NULL; | |||
| s->not_enough_fds = 0; | s->not_enough_fds = 0; | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| s->epollfd = epoll_create1(EPOLL_CLOEXEC); | s->epollfd = epoll_create1(EPOLL_CLOEXEC); | |||
| #else | #else | |||
| s->ufds = 0; | s->ufds = 0; | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| return (struct qb_loop_source*)s; | return (struct qb_loop_source *)s; | |||
| } | } | |||
| void qb_loop_poll_destroy(struct qb_loop *l) | void | |||
| qb_loop_poll_destroy(struct qb_loop *l) | ||||
| { | { | |||
| struct qb_poll_source * s = (struct qb_poll_source *)l->fd_source; | struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source; | |||
| qb_array_free(s->poll_entries); | qb_array_free(s->poll_entries); | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| if (s->epollfd != -1) { | if (s->epollfd != -1) { | |||
| close(s->epollfd); | close(s->epollfd); | |||
| s->epollfd = -1; | s->epollfd = -1; | |||
| } | } | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| free(s); | free(s); | |||
| } | } | |||
| int32_t qb_loop_poll_low_fds_event_set(struct qb_loop *l, | int32_t | |||
| qb_loop_poll_low_fds_event_fn fn) | qb_loop_poll_low_fds_event_set(struct qb_loop *l, | |||
| qb_loop_poll_low_fds_event_fn fn) | ||||
| { | { | |||
| struct qb_poll_source * s = (struct qb_poll_source *)l->fd_source; | struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source; | |||
| s->low_fds_event_fn = fn; | s->low_fds_event_fn = fn; | |||
| return 0; | return 0; | |||
| } | } | |||
| static int32_t _get_empty_array_position_(struct qb_poll_source * s) | static int32_t | |||
| _get_empty_array_position_(struct qb_poll_source *s) | ||||
| { | { | |||
| int32_t found = 0; | int32_t found = 0; | |||
| uint32_t install_pos; | uint32_t install_pos; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| #ifndef HAVE_EPOLL | #ifndef HAVE_EPOLL | |||
| struct pollfd *ufds; | struct pollfd *ufds; | |||
| int32_t new_size = 0; | int32_t new_size = 0; | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| for (found = 0, install_pos = 0; | for (found = 0, install_pos = 0; | |||
| install_pos < s->poll_entry_count; install_pos++) { | install_pos < s->poll_entry_count; install_pos++) { | |||
| assert(qb_array_index(s->poll_entries, install_pos, (void**) | assert(qb_array_index | |||
| &pe) == 0); | (s->poll_entries, install_pos, (void **)&pe) == 0); | |||
| if (pe->state == QB_POLL_ENTRY_EMPTY) { | if (pe->state == QB_POLL_ENTRY_EMPTY) { | |||
| found = 1; | found = 1; | |||
| break; | break; | |||
| } | } | |||
| } | } | |||
| if (found == 0) { | if (found == 0) { | |||
| /* | /* | |||
| * Grow pollfd list | * Grow pollfd list | |||
| */ | */ | |||
| res = qb_array_grow(s->poll_entries, | res = qb_array_grow(s->poll_entries, s->poll_entry_count + 1 | |||
| s->poll_entry_count + 1); | ); | |||
| if (res != 0) { | if (res != 0) { | |||
| return res; | return res; | |||
| } | } | |||
| #ifndef HAVE_EPOLL | #ifndef HAVE_EPOLL | |||
| new_size = (s->poll_entry_count+ 1) * sizeof(struct pollfd); | new_size = (s->poll_entry_count + 1) * sizeof(struct pollfd) ; | |||
| ufds = realloc(s->ufds, new_size); | ufds = realloc(s->ufds, new_size); | |||
| if (ufds == NULL) { | if (ufds == NULL) { | |||
| return -ENOMEM; | return -ENOMEM; | |||
| } | } | |||
| s->ufds = ufds; | s->ufds = ufds; | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| s->poll_entry_count += 1; | s->poll_entry_count += 1; | |||
| install_pos = s->poll_entry_count - 1; | install_pos = s->poll_entry_count - 1; | |||
| } | } | |||
| return install_pos; | return install_pos; | |||
| } | } | |||
| static int32_t _poll_add_(struct qb_loop *l, | static int32_t | |||
| enum qb_loop_priority p, | _poll_add_(struct qb_loop *l, | |||
| int32_t fd, | enum qb_loop_priority p, | |||
| int32_t events, | int32_t fd, int32_t events, void *data, struct qb_poll_entry **pe | |||
| void *data, | _pt) | |||
| struct qb_poll_entry **pe_pt) | ||||
| { | { | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| uint32_t install_pos; | uint32_t install_pos; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| struct qb_poll_source * s; | struct qb_poll_source *s; | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| struct epoll_event ev; | struct epoll_event ev; | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| if (l == NULL) { | if (l == NULL) { | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| s = (struct qb_poll_source *)l->fd_source; | s = (struct qb_poll_source *)l->fd_source; | |||
| install_pos = _get_empty_array_position_(s); | install_pos = _get_empty_array_position_(s); | |||
| assert(qb_array_index(s->poll_entries, install_pos, (void**)&pe) == 0); | assert(qb_array_index(s->poll_entries, install_pos, (void **)&pe) == 0); | |||
| pe->state = QB_POLL_ENTRY_ACTIVE; | pe->state = QB_POLL_ENTRY_ACTIVE; | |||
| pe->install_pos = install_pos; | pe->install_pos = install_pos; | |||
| _poll_entry_check_generate_(pe); | _poll_entry_check_generate_(pe); | |||
| pe->ufd.fd = fd; | pe->ufd.fd = fd; | |||
| pe->ufd.events = events; | pe->ufd.events = events; | |||
| pe->ufd.revents = 0; | pe->ufd.revents = 0; | |||
| pe->item.user_data = data; | pe->item.user_data = data; | |||
| pe->item.source = (struct qb_loop_source*)l->fd_source; | pe->item.source = (struct qb_loop_source *)l->fd_source; | |||
| pe->p = p; | pe->p = p; | |||
| pe->runs = 0; | pe->runs = 0; | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| ev.events = _poll_to_epoll_event_(events); | ev.events = _poll_to_epoll_event_(events); | |||
| ev.data.u64 = (((uint64_t) (pe->check)) << 32) | pe->install_pos; | ev.data.u64 = (((uint64_t) (pe->check)) << 32) | pe->install_pos; | |||
| if (epoll_ctl(s->epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) { | if (epoll_ctl(s->epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "epoll_ctl(add) : %s", strerror(-res)); | qb_util_perror(LOG_ERR, "epoll_ctl(add)"); | |||
| } | } | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| *pe_pt = pe; | *pe_pt = pe; | |||
| return (res); | return (res); | |||
| } | } | |||
| static int32_t _qb_poll_add_to_jobs_(struct qb_loop* l, struct qb_poll_entr | static int32_t | |||
| y* pe) | _qb_poll_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe) | |||
| { | { | |||
| assert(pe->type == QB_POLL); | assert(pe->type == QB_POLL); | |||
| qb_loop_level_item_add(&l->level[pe->p], &pe->item); | qb_loop_level_item_add(&l->level[pe->p], &pe->item); | |||
| pe->state = QB_POLL_ENTRY_JOBLIST; | pe->state = QB_POLL_ENTRY_JOBLIST; | |||
| return 1; | return 1; | |||
| } | } | |||
| int32_t qb_loop_poll_add(struct qb_loop *l, | int32_t | |||
| enum qb_loop_priority p, | qb_loop_poll_add(struct qb_loop * l, | |||
| int32_t fd, | enum qb_loop_priority p, | |||
| int32_t events, | int32_t fd, | |||
| void *data, | int32_t events, | |||
| qb_loop_poll_dispatch_fn dispatch_fn) | void *data, qb_loop_poll_dispatch_fn dispatch_fn) | |||
| { | { | |||
| struct qb_poll_entry *pe = NULL; | struct qb_poll_entry *pe = NULL; | |||
| int32_t size; | int32_t size; | |||
| int32_t new_size; | int32_t new_size; | |||
| int32_t res; | int32_t res; | |||
| size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count; | size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count; | |||
| res = _poll_add_(l, p, fd, events, data, &pe); | res = _poll_add_(l, p, fd, events, data, &pe); | |||
| new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count ; | new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count ; | |||
| pe->poll_dispatch_fn = dispatch_fn; | pe->poll_dispatch_fn = dispatch_fn; | |||
| pe->type = QB_POLL; | pe->type = QB_POLL; | |||
| pe->add_to_jobs = _qb_poll_add_to_jobs_; | pe->add_to_jobs = _qb_poll_add_to_jobs_; | |||
| if (new_size > size) { | if (new_size > size) { | |||
| qb_util_log(LOG_DEBUG, | qb_util_log(LOG_TRACE, | |||
| "grown poll array to %d for FD %d", | "grown poll array to %d for FD %d", new_size, fd | |||
| new_size, fd); | ); | |||
| } | } | |||
| return res; | return res; | |||
| } | } | |||
| int32_t qb_loop_poll_mod(struct qb_loop *l, | int32_t | |||
| enum qb_loop_priority p, | qb_loop_poll_mod(struct qb_loop * l, | |||
| int32_t fd, | enum qb_loop_priority p, | |||
| int32_t events, | int32_t fd, | |||
| void *data, | int32_t events, | |||
| qb_loop_poll_dispatch_fn dispatch_fn) | void *data, qb_loop_poll_dispatch_fn dispatch_fn) | |||
| { | { | |||
| uint32_t i; | uint32_t i; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| struct qb_poll_source * s = (struct qb_poll_source *)l->fd_source; | struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source; | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| struct epoll_event ev; | struct epoll_event ev; | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| /* | /* | |||
| * Find file descriptor to modify events and dispatch function | * Find file descriptor to modify events and dispatch function | |||
| */ | */ | |||
| for (i = 0; i < s->poll_entry_count; i++) { | for (i = 0; i < s->poll_entry_count; i++) { | |||
| assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0) ; | assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0 ); | |||
| if (pe->ufd.fd != fd) { | if (pe->ufd.fd != fd) { | |||
| continue; | continue; | |||
| } | } | |||
| if (pe->state == QB_POLL_ENTRY_DELETED || pe->check == 0) { | if (pe->state == QB_POLL_ENTRY_DELETED || pe->check == 0) { | |||
| qb_util_log(LOG_ERR, | qb_util_log(LOG_ERR, | |||
| "poll_mod : can't modify entry already d eleted"); | "poll_mod : can't modify entry already d eleted"); | |||
| return -EBADF; | return -EBADF; | |||
| } | } | |||
| pe->poll_dispatch_fn = dispatch_fn; | pe->poll_dispatch_fn = dispatch_fn; | |||
| pe->item.user_data = data; | pe->item.user_data = data; | |||
| pe->p = p; | pe->p = p; | |||
| if (pe->ufd.events != events) { | if (pe->ufd.events != events) { | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| ev.events = _poll_to_epoll_event_(events); | ev.events = _poll_to_epoll_event_(events); | |||
| ev.data.u64 = (((uint64_t) (pe->check)) << 32) | i; | ev.data.u64 = (((uint64_t) (pe->check)) << 32) | i; | |||
| if (epoll_ctl(s->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) { | if (epoll_ctl(s->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "epoll_ctl(mod) : %s", strerror(-res)); | qb_util_perror(LOG_ERR, "epoll_ctl(mod)"); | |||
| } | } | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| pe->ufd.events = events; | pe->ufd.events = events; | |||
| } | } | |||
| return res; | return res; | |||
| } | } | |||
| return -EBADF; | return -EBADF; | |||
| } | } | |||
| int32_t qb_loop_poll_del(struct qb_loop *l, int32_t fd) | int32_t | |||
| qb_loop_poll_del(struct qb_loop * l, int32_t fd) | ||||
| { | { | |||
| int32_t i; | int32_t i; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| struct qb_poll_source * s = (struct qb_poll_source *)l->fd_source; | struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source; | |||
| for (i = 0; i < s->poll_entry_count; i++) { | for (i = 0; i < s->poll_entry_count; i++) { | |||
| assert(qb_array_index(s->poll_entries, i, (void**)&pe) == 0) ; | assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0 ); | |||
| if (pe->ufd.fd != fd || pe->type != QB_POLL) { | if (pe->ufd.fd != fd || pe->type != QB_POLL) { | |||
| continue; | continue; | |||
| } | } | |||
| if (pe->state == QB_POLL_ENTRY_DELETED || | if (pe->state == QB_POLL_ENTRY_DELETED || | |||
| pe->state == QB_POLL_ENTRY_EMPTY) { | pe->state == QB_POLL_ENTRY_EMPTY) { | |||
| return 0; | return 0; | |||
| } | } | |||
| if (pe->state == QB_POLL_ENTRY_JOBLIST) { | if (pe->state == QB_POLL_ENTRY_JOBLIST) { | |||
| qb_loop_level_item_del(&l->level[pe->p], &pe->item); | qb_loop_level_item_del(&l->level[pe->p], &pe->item); | |||
| } | } | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| if (epoll_ctl(s->epollfd, EPOLL_CTL_DEL, fd, NULL) == -1) { | if (epoll_ctl(s->epollfd, EPOLL_CTL_DEL, fd, NULL) == -1) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "epoll_ctl(del) : %s", | qb_util_perror(LOG_WARNING, "epoll_ctl(del)"); | |||
| strerror(-res)); | ||||
| } | } | |||
| #else | #else | |||
| s->ufds[i].fd = -1; | s->ufds[i].fd = -1; | |||
| s->ufds[i].events = 0; | s->ufds[i].events = 0; | |||
| s->ufds[i].revents = 0; | s->ufds[i].revents = 0; | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| _poll_entry_mark_deleted_(pe); | _poll_entry_mark_deleted_(pe); | |||
| return res; | return res; | |||
| } | } | |||
| return -EBADF; | return -EBADF; | |||
| } | } | |||
| #ifdef HAVE_TIMERFD | #ifdef HAVE_TIMERFD | |||
| static int32_t _qb_timer_add_to_jobs_(struct qb_loop* l, struct qb_poll_ent | static int32_t | |||
| ry* pe) | _qb_timer_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe) | |||
| { | { | |||
| uint64_t expired = 0; | uint64_t expired = 0; | |||
| ssize_t bytes = 0; | ssize_t bytes = 0; | |||
| assert(pe->type == QB_TIMER); | assert(pe->type == QB_TIMER); | |||
| if (pe->ufd.revents == POLLIN) { | if (pe->ufd.revents == POLLIN) { | |||
| bytes = read(pe->ufd.fd, &expired, sizeof(expired)); | bytes = read(pe->ufd.fd, &expired, sizeof(expired)); | |||
| if (bytes != sizeof(expired)) { | if (bytes != sizeof(expired)) { | |||
| qb_util_log(LOG_WARNING, | qb_util_perror(LOG_WARNING, | |||
| "couldn't read from timer fd %zd %s", | "couldn't read from timer fd %zd", | |||
| bytes, strerror(errno)); | bytes); | |||
| } | } | |||
| qb_loop_level_item_add(&l->level[pe->p], &pe->item); | qb_loop_level_item_add(&l->level[pe->p], &pe->item); | |||
| } else { | } else { | |||
| qb_util_log(LOG_ERR, "timer revents: %d expected %d", | qb_util_log(LOG_ERR, "timer revents: %d expected %d", | |||
| pe->ufd.revents, POLLIN); | pe->ufd.revents, POLLIN); | |||
| } | } | |||
| close(pe->ufd.fd); | close(pe->ufd.fd); | |||
| pe->ufd.fd = -1; | pe->ufd.fd = -1; | |||
| pe->state = QB_POLL_ENTRY_JOBLIST; | pe->state = QB_POLL_ENTRY_JOBLIST; | |||
| return 1; | return 1; | |||
| } | } | |||
| int32_t qb_loop_timer_msec_duration_to_expire(struct qb_loop_source *timer_ | int32_t | |||
| source) | qb_loop_timer_msec_duration_to_expire(struct qb_loop_source * timer_source) | |||
| { | { | |||
| return -1; | return -1; | |||
| } | } | |||
| struct qb_loop_source* | struct qb_loop_source * | |||
| qb_loop_timer_create(struct qb_loop *l) | qb_loop_timer_create(struct qb_loop *l) | |||
| { | { | |||
| return NULL; | return NULL; | |||
| } | } | |||
| void qb_loop_timer_destroy(struct qb_loop *l) | void | |||
| qb_loop_timer_destroy(struct qb_loop *l) | ||||
| { | { | |||
| } | } | |||
| int32_t qb_loop_timer_add(struct qb_loop *l, | int32_t | |||
| enum qb_loop_priority p, | qb_loop_timer_add(struct qb_loop *l, | |||
| uint64_t nsec_duration, | enum qb_loop_priority p, | |||
| void *data, | uint64_t nsec_duration, | |||
| qb_loop_timer_dispatch_fn timer_fn, | void *data, | |||
| qb_loop_timer_handle * timer_handle_out) | qb_loop_timer_dispatch_fn timer_fn, | |||
| qb_loop_timer_handle * timer_handle_out) | ||||
| { | { | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| int32_t fd; | int32_t fd; | |||
| int32_t res; | int32_t res; | |||
| int32_t size, new_size; | int32_t size, new_size; | |||
| struct itimerspec its; | struct itimerspec its; | |||
| if (l == NULL || timer_fn == NULL) { | if (l == NULL || timer_fn == NULL) { | |||
| qb_util_log(LOG_ERR, | qb_util_log(LOG_ERR, | |||
| "can't add a timer with either (l == NULL || tim er_fn == NULL)"); | "can't add a timer with either (l == NULL || tim er_fn == NULL)"); | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| if (timer_handle_out == NULL) { | if (timer_handle_out == NULL) { | |||
| qb_util_log(LOG_ERR, "can't add a timer with (timer_handle_o | qb_util_log(LOG_ERR, | |||
| ut == NULL)"); | "can't add a timer with (timer_handle_out == NUL | |||
| L)"); | ||||
| return -ENOENT; | return -ENOENT; | |||
| } | } | |||
| fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK); | fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); | |||
| if (fd == -1) { | if (fd == -1) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "failed to create timer: %s", | qb_util_perror(LOG_ERR, "failed to create timer"); | |||
| strerror(-res)); | ||||
| return res; | return res; | |||
| } | } | |||
| its.it_interval.tv_sec = 0; | its.it_interval.tv_sec = 0; | |||
| its.it_interval.tv_nsec = 0; | its.it_interval.tv_nsec = 0; | |||
| its.it_value.tv_sec = nsec_duration / QB_TIME_NS_IN_SEC; | its.it_value.tv_sec = nsec_duration / QB_TIME_NS_IN_SEC; | |||
| its.it_value.tv_nsec = nsec_duration % QB_TIME_NS_IN_SEC; | its.it_value.tv_nsec = nsec_duration % QB_TIME_NS_IN_SEC; | |||
| res = timerfd_settime(fd, 0, &its, NULL); | res = timerfd_settime(fd, 0, &its, NULL); | |||
| if (res == -1) { | if (res == -1) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "failed to set time on timer: %s", | qb_util_perror(LOG_ERR, "failed to set time on timer"); | |||
| strerror(-res)); | ||||
| goto close_and_return; | goto close_and_return; | |||
| } | } | |||
| size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count; | size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count; | |||
| res = _poll_add_(l, p, fd, POLLIN, data, &pe); | res = _poll_add_(l, p, fd, POLLIN, data, &pe); | |||
| if (res != 0) { | if (res != 0) { | |||
| goto close_and_return; | goto close_and_return; | |||
| } | } | |||
| new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count ; | new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count ; | |||
| if (new_size > size) { | if (new_size > size) { | |||
| qb_util_log(LOG_DEBUG, "grown poll array to %d for timer %d" , | qb_util_log(LOG_TRACE, "grown poll array to %d for timer %d" , | |||
| new_size, fd); | new_size, fd); | |||
| } | } | |||
| pe->timer_dispatch_fn = timer_fn; | pe->timer_dispatch_fn = timer_fn; | |||
| pe->type = QB_TIMER; | pe->type = QB_TIMER; | |||
| pe->add_to_jobs = _qb_timer_add_to_jobs_; | pe->add_to_jobs = _qb_timer_add_to_jobs_; | |||
| *timer_handle_out = (((uint64_t) (pe->check)) << 32) | pe->install_p os; | *timer_handle_out = (((uint64_t) (pe->check)) << 32) | pe->install_p os; | |||
| return res; | return res; | |||
| close_and_return: | close_and_return: | |||
| close(fd); | close(fd); | |||
| return res; | return res; | |||
| } | } | |||
| int32_t qb_loop_timer_del(struct qb_loop *l, qb_loop_timer_handle th) | int32_t | |||
| qb_loop_timer_del(struct qb_loop * l, qb_loop_timer_handle th) | ||||
| { | { | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| struct qb_poll_source *s; | struct qb_poll_source *s; | |||
| int32_t res; | int32_t res; | |||
| if (l == NULL || th == 0) { | if (l == NULL || th == 0) { | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| s = (struct qb_poll_source *)l->fd_source; | s = (struct qb_poll_source *)l->fd_source; | |||
| res = _poll_entry_from_handle_(s, th, &pe); | res = _poll_entry_from_handle_(s, th, &pe); | |||
| skipping to change at line 781 | skipping to change at line 820 | |||
| qb_util_log(LOG_DEBUG, "trying to delete timer in JOBLIST"); | qb_util_log(LOG_DEBUG, "trying to delete timer in JOBLIST"); | |||
| } | } | |||
| if (pe->state != QB_POLL_ENTRY_ACTIVE && | if (pe->state != QB_POLL_ENTRY_ACTIVE && | |||
| pe->state != QB_POLL_ENTRY_JOBLIST) { | pe->state != QB_POLL_ENTRY_JOBLIST) { | |||
| qb_util_log(LOG_WARNING, "trying to delete timer with state: %d", | qb_util_log(LOG_WARNING, "trying to delete timer with state: %d", | |||
| pe->state); | pe->state); | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| if (pe->ufd.fd != -1) { | if (pe->ufd.fd != -1) { | |||
| #ifdef HAVE_EPOLL | #ifdef HAVE_EPOLL | |||
| if (epoll_ctl(s->epollfd, EPOLL_CTL_DEL, pe->ufd.fd, NULL) = | if (epoll_ctl(s->epollfd, EPOLL_CTL_DEL, pe->ufd.fd, NULL) = | |||
| = -1) { | = | |||
| -1) { | ||||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "epoll_ctl(del:%d) : %s", | qb_util_perror(LOG_WARNING, "epoll_ctl(del:%d)", | |||
| pe->ufd.fd, strerror(-res)); | pe->ufd.fd); | |||
| } | } | |||
| #else | #else | |||
| s->ufds[pe->install_pos].fd = -1; | s->ufds[pe->install_pos].fd = -1; | |||
| s->ufds[pe->install_pos].events = 0; | s->ufds[pe->install_pos].events = 0; | |||
| s->ufds[pe->install_pos].revents = 0; | s->ufds[pe->install_pos].revents = 0; | |||
| #endif /* HAVE_EPOLL */ | #endif /* HAVE_EPOLL */ | |||
| close(pe->ufd.fd); | close(pe->ufd.fd); | |||
| } | } | |||
| _poll_entry_mark_deleted_(pe); | _poll_entry_mark_deleted_(pe); | |||
| return 0; | return 0; | |||
| } | } | |||
| uint64_t qb_loop_timer_expire_time_get(struct qb_loop *l, qb_loop_timer_han | uint64_t | |||
| dle th) | qb_loop_timer_expire_time_get(struct qb_loop * l, qb_loop_timer_handle th) | |||
| { | { | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| struct qb_poll_source *s; | struct qb_poll_source *s; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| struct itimerspec its; | struct itimerspec its; | |||
| if (l == NULL || th == 0) { | if (l == NULL || th == 0) { | |||
| return 0; | return 0; | |||
| } | } | |||
| s = (struct qb_poll_source *)l->fd_source; | s = (struct qb_poll_source *)l->fd_source; | |||
| res = _poll_entry_from_handle_(s, th, &pe); | res = _poll_entry_from_handle_(s, th, &pe); | |||
| if (res != 0) { | if (res != 0) { | |||
| return res; | return 0; | |||
| } | } | |||
| if (timerfd_gettime(pe->ufd.fd, &its) == -1) { | if (timerfd_gettime(pe->ufd.fd, &its) == -1) { | |||
| return 0; | return 0; | |||
| } | } | |||
| return (its.it_value.tv_sec * QB_TIME_NS_IN_SEC) + its.it_value.tv_n sec; | return (its.it_value.tv_sec * QB_TIME_NS_IN_SEC) + its.it_value.tv_n sec; | |||
| } | } | |||
| #endif /* HAVE_TIMERFD */ | #endif /* HAVE_TIMERFD */ | |||
| static int32_t pipe_fds[2] = {-1, -1}; | static int32_t pipe_fds[2] = { -1, -1 }; | |||
| struct qb_signal_source { | struct qb_signal_source { | |||
| struct qb_loop_source s; | struct qb_loop_source s; | |||
| struct qb_list_head sig_head; | struct qb_list_head sig_head; | |||
| sigset_t signal_superset; | sigset_t signal_superset; | |||
| }; | }; | |||
| struct qb_loop_sig { | struct qb_loop_sig { | |||
| struct qb_loop_item item; | struct qb_loop_item item; | |||
| int32_t signal; | int32_t signal; | |||
| enum qb_loop_priority p; | enum qb_loop_priority p; | |||
| qb_loop_signal_dispatch_fn dispatch_fn; | qb_loop_signal_dispatch_fn dispatch_fn; | |||
| struct qb_loop_sig *cloned_from; | struct qb_loop_sig *cloned_from; | |||
| }; | }; | |||
| static void _handle_real_signal_(int signal_num, siginfo_t * si, void *cont | static void | |||
| ext) | _handle_real_signal_(int signal_num, siginfo_t * si, void *context) | |||
| { | { | |||
| int32_t sig = signal_num; | int32_t sig = signal_num; | |||
| int32_t res = 0; | int32_t res = 0; | |||
| if (pipe_fds[1] > 0) { | if (pipe_fds[1] > 0) { | |||
| try_again: | try_again: | |||
| res = write(pipe_fds[1], &sig, sizeof(int32_t)); | res = write(pipe_fds[1], &sig, sizeof(int32_t)); | |||
| if (res == -1 && errno == EAGAIN) { | if (res == -1 && errno == EAGAIN) { | |||
| goto try_again; | goto try_again; | |||
| } else if (res != sizeof(int32_t)) { | } else if (res != sizeof(int32_t)) { | |||
| qb_util_log(LOG_ERR, "failed to write signal to pipe | qb_util_log(LOG_ERR, | |||
| [%d]", | "failed to write signal to pipe [%d]", r | |||
| res); | es); | |||
| } | } | |||
| } | } | |||
| } | } | |||
| static void _signal_dispatch_and_take_back_(struct qb_loop_item * item, | static void | |||
| enum qb_loop_priority p) | _signal_dispatch_and_take_back_(struct qb_loop_item *item, | |||
| enum qb_loop_priority p) | ||||
| { | { | |||
| struct qb_loop_sig *sig = (struct qb_loop_sig *)item; | struct qb_loop_sig *sig = (struct qb_loop_sig *)item; | |||
| int32_t res; | int32_t res; | |||
| res = sig->dispatch_fn(sig->signal, sig->item.user_data); | res = sig->dispatch_fn(sig->signal, sig->item.user_data); | |||
| if (res != 0) { | if (res != 0) { | |||
| qb_list_del(&sig->cloned_from->item.list); | qb_list_del(&sig->cloned_from->item.list); | |||
| free(sig->cloned_from); | free(sig->cloned_from); | |||
| } | } | |||
| free(sig); | free(sig); | |||
| } | } | |||
| struct qb_loop_source * | struct qb_loop_source * | |||
| qb_loop_signals_create(struct qb_loop *l) | qb_loop_signals_create(struct qb_loop *l) | |||
| { | { | |||
| int32_t res = 0; | int32_t res = 0; | |||
| struct qb_poll_entry *pe; | struct qb_poll_entry *pe; | |||
| struct qb_signal_source *s = calloc(1, sizeof(struct qb_signal_sourc e)); | struct qb_signal_source *s = calloc(1, sizeof(struct qb_signal_sourc e)); | |||
| if (s == NULL) { | ||||
| return NULL; | ||||
| } | ||||
| s->s.l = l; | s->s.l = l; | |||
| s->s.dispatch_and_take_back = _signal_dispatch_and_take_back_; | s->s.dispatch_and_take_back = _signal_dispatch_and_take_back_; | |||
| s->s.poll = NULL; | s->s.poll = NULL; | |||
| qb_list_init(&s->sig_head); | qb_list_init(&s->sig_head); | |||
| sigemptyset(&s->signal_superset); | sigemptyset(&s->signal_superset); | |||
| if (pipe_fds[0] < 0) { | if (pipe_fds[0] < 0) { | |||
| res = pipe(pipe_fds); | res = pipe(pipe_fds); | |||
| if (res == -1) { | if (res == -1) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, | qb_util_perror(LOG_ERR, "Can't light pipe"); | |||
| "Can't light pipe: %s", | ||||
| strerror(-res)); | ||||
| goto error_exit; | goto error_exit; | |||
| } | } | |||
| (void)qb_util_fd_nonblock_cloexec_set(pipe_fds[0]); | (void)qb_sys_fd_nonblock_cloexec_set(pipe_fds[0]); | |||
| (void)qb_util_fd_nonblock_cloexec_set(pipe_fds[1]); | (void)qb_sys_fd_nonblock_cloexec_set(pipe_fds[1]); | |||
| res = _poll_add_(l, QB_LOOP_HIGH, | res = _poll_add_(l, QB_LOOP_HIGH, | |||
| pipe_fds[0], POLLIN, | pipe_fds[0], POLLIN, NULL, &pe); | |||
| NULL, &pe); | ||||
| if (res == 0) { | if (res == 0) { | |||
| pe->poll_dispatch_fn = NULL; | pe->poll_dispatch_fn = NULL; | |||
| pe->type = QB_SIGNAL; | pe->type = QB_SIGNAL; | |||
| pe->add_to_jobs = _qb_signal_add_to_jobs_; | pe->add_to_jobs = _qb_signal_add_to_jobs_; | |||
| } else { | } else { | |||
| qb_util_log(LOG_ERR, | qb_util_perror(LOG_ERR, "Can't smoke pipe"); | |||
| "Can't smoke pipe: %s", | ||||
| strerror(-res)); | ||||
| goto error_exit; | goto error_exit; | |||
| } | } | |||
| } | } | |||
| return (struct qb_loop_source *)s; | return (struct qb_loop_source *)s; | |||
| error_exit: | error_exit: | |||
| free(s); | ||||
| errno = -res; | errno = -res; | |||
| free(s); | ||||
| if (pipe_fds[0] >= 0) { | ||||
| close(pipe_fds[0]); | ||||
| } | ||||
| if (pipe_fds[1] >= 0) { | ||||
| close(pipe_fds[1]); | ||||
| } | ||||
| return NULL; | return NULL; | |||
| } | } | |||
| void qb_loop_signals_destroy(struct qb_loop *l) | void | |||
| qb_loop_signals_destroy(struct qb_loop *l) | ||||
| { | { | |||
| struct qb_signal_source *s = | ||||
| (struct qb_signal_source *)l->signal_source; | ||||
| struct qb_list_head *list; | ||||
| struct qb_list_head *n; | ||||
| struct qb_loop_item *item; | ||||
| close(pipe_fds[0]); | close(pipe_fds[0]); | |||
| pipe_fds[0] = -1; | pipe_fds[0] = -1; | |||
| close(pipe_fds[1]); | close(pipe_fds[1]); | |||
| pipe_fds[1] = -1; | pipe_fds[1] = -1; | |||
| qb_list_for_each_safe(list, n, &s->sig_head) { | ||||
| item = qb_list_entry(list, struct qb_loop_item, list); | ||||
| qb_list_del(&item->list); | ||||
| free(item); | ||||
| } | ||||
| free(l->signal_source); | free(l->signal_source); | |||
| } | } | |||
| static int32_t _qb_signal_add_to_jobs_(struct qb_loop* l, | static int32_t | |||
| struct qb_poll_entry* pe) | _qb_signal_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe) | |||
| { | { | |||
| struct qb_signal_source *s = (struct qb_signal_source *)l->signal_so | struct qb_signal_source *s = | |||
| urce; | (struct qb_signal_source *)l->signal_source; | |||
| struct qb_list_head *list; | struct qb_list_head *list; | |||
| struct qb_loop_sig *sig; | struct qb_loop_sig *sig; | |||
| struct qb_loop_item *item; | struct qb_loop_item *item; | |||
| struct qb_loop_sig *new_sig_job; | struct qb_loop_sig *new_sig_job; | |||
| int32_t the_signal; | int32_t the_signal; | |||
| ssize_t res; | ssize_t res; | |||
| int32_t jobs_added = 0; | int32_t jobs_added = 0; | |||
| res = read(pipe_fds[0], &the_signal, sizeof(int32_t)); | res = read(pipe_fds[0], &the_signal, sizeof(int32_t)); | |||
| if (res != sizeof(int32_t)) { | if (res != sizeof(int32_t)) { | |||
| res = -errno; | res = -errno; | |||
| qb_util_log(LOG_ERR, "failed to read pipe: %s", strerror(err no)); | qb_util_perror(LOG_ERR, "failed to read pipe"); | |||
| return 0; | return 0; | |||
| } | } | |||
| pe->ufd.revents = 0; | pe->ufd.revents = 0; | |||
| qb_list_for_each(list, &s->sig_head) { | qb_list_for_each(list, &s->sig_head) { | |||
| item = qb_list_entry(list, struct qb_loop_item, list); | item = qb_list_entry(list, struct qb_loop_item, list); | |||
| sig = (struct qb_loop_sig *)item; | sig = (struct qb_loop_sig *)item; | |||
| if (sig->signal == the_signal) { | if (sig->signal == the_signal) { | |||
| new_sig_job = calloc(1, sizeof(struct qb_loop_sig)); | new_sig_job = calloc(1, sizeof(struct qb_loop_sig)); | |||
| if (new_sig_job == NULL) { | ||||
| return jobs_added; | ||||
| } | ||||
| memcpy(new_sig_job, sig, sizeof(struct qb_loop_sig)) ; | memcpy(new_sig_job, sig, sizeof(struct qb_loop_sig)) ; | |||
| new_sig_job->cloned_from = sig; | new_sig_job->cloned_from = sig; | |||
| qb_loop_level_item_add(&l->level[pe->p], &new_sig_jo | qb_loop_level_item_add(&l->level[pe->p], | |||
| b->item); | &new_sig_job->item); | |||
| jobs_added++; | jobs_added++; | |||
| } | } | |||
| } | } | |||
| return jobs_added; | return jobs_added; | |||
| } | } | |||
| static void _adjust_sigactions_(struct qb_signal_source *s) | static void | |||
| _adjust_sigactions_(struct qb_signal_source *s) | ||||
| { | { | |||
| struct qb_loop_sig *sig; | struct qb_loop_sig *sig; | |||
| struct qb_loop_item *item; | struct qb_loop_item *item; | |||
| struct sigaction sa; | struct sigaction sa; | |||
| int32_t i; | int32_t i; | |||
| int32_t needed; | int32_t needed; | |||
| sa.sa_flags = SA_SIGINFO; | sa.sa_flags = SA_SIGINFO; | |||
| sa.sa_sigaction = _handle_real_signal_; | sa.sa_sigaction = _handle_real_signal_; | |||
| sigemptyset(&s->signal_superset); | sigemptyset(&s->signal_superset); | |||
| skipping to change at line 993 | skipping to change at line 1060 | |||
| } | } | |||
| if (needed) { | if (needed) { | |||
| sigaddset(&s->signal_superset, i); | sigaddset(&s->signal_superset, i); | |||
| sigaction(i, &sa, NULL); | sigaction(i, &sa, NULL); | |||
| } else { | } else { | |||
| (void)signal(i, SIG_DFL); | (void)signal(i, SIG_DFL); | |||
| } | } | |||
| } | } | |||
| } | } | |||
| int32_t qb_loop_signal_add(qb_loop_t *l, | int32_t | |||
| enum qb_loop_priority p, | qb_loop_signal_add(qb_loop_t * l, | |||
| int32_t the_sig, | enum qb_loop_priority p, | |||
| void *data, | int32_t the_sig, | |||
| qb_loop_signal_dispatch_fn dispatch_fn, | void *data, | |||
| qb_loop_signal_handle *handle) | qb_loop_signal_dispatch_fn dispatch_fn, | |||
| qb_loop_signal_handle * handle) | ||||
| { | { | |||
| struct qb_loop_sig *sig; | struct qb_loop_sig *sig; | |||
| struct qb_signal_source *s; | struct qb_signal_source *s; | |||
| if (l == NULL || dispatch_fn == NULL) { | if (l == NULL || dispatch_fn == NULL) { | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) { | if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) { | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| s = (struct qb_signal_source *)l->signal_source; | s = (struct qb_signal_source *)l->signal_source; | |||
| sig = calloc(1, sizeof(struct qb_loop_sig)); | sig = calloc(1, sizeof(struct qb_loop_sig)); | |||
| if (sig == NULL) { | ||||
| return -errno; | ||||
| } | ||||
| sig->dispatch_fn = dispatch_fn; | sig->dispatch_fn = dispatch_fn; | |||
| sig->p = p; | sig->p = p; | |||
| sig->signal = the_sig; | sig->signal = the_sig; | |||
| sig->item.user_data = data; | sig->item.user_data = data; | |||
| sig->item.source = l->signal_source; | sig->item.source = l->signal_source; | |||
| qb_list_init(&sig->item.list); | qb_list_init(&sig->item.list); | |||
| qb_list_add_tail(&sig->item.list, &s->sig_head); | qb_list_add_tail(&sig->item.list, &s->sig_head); | |||
| if (sigismember(&s->signal_superset, the_sig) != 1) { | if (sigismember(&s->signal_superset, the_sig) != 1) { | |||
| _adjust_sigactions_(s); | _adjust_sigactions_(s); | |||
| } | } | |||
| if (handle) { | if (handle) { | |||
| *handle = sig; | *handle = sig; | |||
| } | } | |||
| return 0; | return 0; | |||
| } | } | |||
| int32_t qb_loop_signal_mod(qb_loop_t *l, | int32_t | |||
| enum qb_loop_priority p, | qb_loop_signal_mod(qb_loop_t * l, | |||
| int32_t the_sig, | enum qb_loop_priority p, | |||
| void *data, | int32_t the_sig, | |||
| qb_loop_signal_dispatch_fn dispatch_fn, | void *data, | |||
| qb_loop_signal_handle handle) | qb_loop_signal_dispatch_fn dispatch_fn, | |||
| qb_loop_signal_handle handle) | ||||
| { | { | |||
| struct qb_signal_source *s; | struct qb_signal_source *s; | |||
| struct qb_loop_sig *sig = (struct qb_loop_sig *)handle; | struct qb_loop_sig *sig = (struct qb_loop_sig *)handle; | |||
| if (l == NULL || dispatch_fn == NULL || handle == NULL) { | if (l == NULL || dispatch_fn == NULL || handle == NULL) { | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) { | if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) { | |||
| return -EINVAL; | return -EINVAL; | |||
| } | } | |||
| skipping to change at line 1061 | skipping to change at line 1133 | |||
| sig->p = p; | sig->p = p; | |||
| if (sig->signal != the_sig) { | if (sig->signal != the_sig) { | |||
| sig->signal = the_sig; | sig->signal = the_sig; | |||
| _adjust_sigactions_(s); | _adjust_sigactions_(s); | |||
| } | } | |||
| return 0; | return 0; | |||
| } | } | |||
| int32_t qb_loop_signal_del(qb_loop_t *l, | int32_t | |||
| qb_loop_signal_handle handle) | qb_loop_signal_del(qb_loop_t * l, qb_loop_signal_handle handle) | |||
| { | { | |||
| struct qb_loop_sig *sig = (struct qb_loop_sig *)handle; | struct qb_loop_sig *sig = (struct qb_loop_sig *)handle; | |||
| qb_list_del(&sig->item.list); | qb_list_del(&sig->item.list); | |||
| free(sig); | free(sig); | |||
| return 0; | return 0; | |||
| } | } | |||
| End of changes. 107 change blocks. | ||||
| 186 lines changed or deleted | 249 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ | ||||