背景
以前搞mesos,接触最多的基础库就是libprocess,说实话libprocess使用了大量的C++11的新特性,对于大多数的人来说,阅读这种代码的门槛还是比较高,我也只是对actor模型整体的有了认识,但是对于libprocess中的很多细节之处还是没有很好的掌握,每隔一段时间在回头看libprocess的代码时,都会有新的发现。
最近想找一个成本比较低的基础库继续学习,然后就选中了libevent。libevent已经被广泛的应用,作为底层的网络库,比如 memcached、Vomit、Nylon、Netchat等等,一直都想研究一下libevent的代码,最近有时间就尝试基于2.0.22版本的libevent实现一个定时器,之所以要实现一个定时器,是因为定时器作为一种基础组件在在大多数的函数库中都存在,并且有多种不同的方式来实现。
定时器实现
其实实现一个定时器让其定时的执行某一个函数还是比较简单的,源代码如下:
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/util.h>
struct event ev;
struct timeval tv;
void time_cb(int fd, short event, void *argc)
{
printf("timer wakeup\n");
event_add(&ev, &tv); // reschedule timer
}
int main() {
struct event_base *base = event_base_new();
tv.tv_sec = 10; // 10s period
tv.tv_usec = 0;
event_assign(&ev, base, -1, 0, time_cb, NULL);
event_add(&ev, &tv);
event_base_dispatch(base);
}
上述的代码中主要使用了event_base_new
、event_assign
、event_add
和event_base_dispatch
四个函数。依靠这四个函数,可以每隔10秒钟执行一次time_cb
函数
定时器流程
event_base_new()函数
event_base_new()
函数的实现如下,其中主要的功能就是就是读取配置并对event_base
进行初始化。
struct event_base *
event_base_new(void)
{
struct event_base *base = NULL;
struct event_config *cfg = event_config_new();
if (cfg) {
base = event_base_new_with_config(cfg);
event_config_free(cfg);
}
return base;
}
其中主要使用了两个函数,对于函数event_config_new()
,其主要的作用是初见配置变量event_config
;对于函数event_base_new_with_config()
是利用已经初始化的event_config
变量初始化event_base
变量。对于event_config
变量,其主要包含如下的几部分:
struct event_config {
TAILQ_HEAD(event_configq, event_config_entry) entries;
int n_cpus_hint;
enum event_method_feature require_features;
enum event_base_config_flag flags;
};
通过其声明可以看到,其主要包含四部分:
event_configq
是一个队列,用来保存Reactor模型不可以使用的io多路复用函数,这个以后的文章会详细的介绍n_cpus_hint
是用来保存机器CPU的个数,以方便对线程池的线程数量进行调整event_method_feature
用来保存io多路复用函数应该保存的特征event_base_config_flag
用来保存一些的特性,比如是否给event_base加锁、执行event_base_loop是不是使用cache时间以及epoll函数的更少的系统调用的方法等
event_assign()函数
event_assign()
函数主要用已经初始化的event_base
用来初始化event
,包括设置文件标识符、回调函数以及其参数等等。函数的整体结构如下:
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
if (!base)
base = current_base;
_event_debug_assert_not_added(ev);
ev->ev_base = base;
ev->ev_callback = callback;
ev->ev_arg = arg;
ev->ev_fd = fd;
ev->ev_events = events;
ev->ev_res = 0;
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
if (events & EV_SIGNAL) {
if ((events & (EV_READ|EV_WRITE)) != 0) {
event_warnx("%s: EV_SIGNAL is not compatible with "
"EV_READ or EV_WRITE", __func__);
return -1;
}
ev->ev_closure = EV_CLOSURE_SIGNAL;
} else {
if (events & EV_PERSIST) {
evutil_timerclear(&ev->ev_io_timeout);
ev->ev_closure = EV_CLOSURE_PERSIST;
} else {
ev->ev_closure = EV_CLOSURE_NONE;
}
}
min_heap_elem_init(ev);
if (base != NULL) {
/* by default, we put new events into the middle priority */
ev->ev_pri = base->nactivequeues / 2;
}
_event_debug_note_setup(ev);
return 0;
}
可以看到整个函数中大部分都是大部分是对event
的变量进行赋值操作(event变量中关联了一个event_base
变量),对于其中的事件类型进行了详细的处理,主要包括回调函数、回调函数参数、文件标识符、事件类型、事件的状态队列(ev_flags)、优先级、回调函数返回值、回调函数类型(ev_closure)等。
其中另一个亮点就是最小堆
的使用,这里仅仅是对最小堆进行了初始化,在libevent中对于定时事件,使用最小堆对对其进行管理,根据超时时间值来进行插入和删除操作,libevent中的对最小堆的使用也算是惊艳之处吧。如果想了解最小堆的详细实现可以浏览一下minheap-internal.h
这个文件中代码,如果想详细了解数据结构中最小堆,建议可以看看《数据结构与算法分析:c语言描述》这本书,其中对于最小堆的下溢和上溢操作都进行了详细的介绍,与libevent中实现都差不多。
event_add()函数
event_add()
函数按照其名字的可以理解为将事件添加到reactor中,但是此函数又调用了好几层其他函数实现了这个功能,其函数的调用关系如下图:
对于函数event_add()
需要注意到的是在调用event_add_internal
时进行了加锁操作,代码如下:
int
event_add(struct event *ev, const struct timeval *tv)
{
int res;
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
res = event_add_internal(ev, tv, 0);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
return (res);
}
其中主要的工作都由event_add_internal()
这个函数执行,下面看看这个函数的详细情况:
static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
EVENT_BASE_ASSERT_LOCKED(base);
_event_debug_assert_is_setup(ev);
event_debug((
"event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p",
ev,
EV_SOCK_ARG(ev->ev_fd),
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
tv ? "EV_TIMEOUT " : " ",
ev->ev_callback));
EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));
/*
* prepare for timeout insertion further below, if we get a
* failure on any step, we should not change any state.
*/
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve(&base->timeheap,
1 + min_heap_size(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
/* 如果当前调用者不是主线程(主线程就是执行事件循环的线程),而且被添加的事件处理器是信号事
* 件处理器,而且主线程正在执行该信号事件处理器的回调函数,则当前调用者必须等待主线程完成调
* 用,否则会引起竞争
*/
if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
#endif
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
if (ev->ev_events & (EV_READ|EV_WRITE)) {
res = evmap_io_add(base, ev->ev_fd, ev);
}
else if (ev->ev_events & EV_SIGNAL) {
res = evmap_signal_add(base, (int)ev->ev_fd, ev);
}
if (res != -1) {
event_queue_insert(base, ev, EVLIST_INSERTED);
}
if (res == 1) {
notify = 1;
res = 0;
}
}
if (res != -1 && tv != NULL) {
struct timeval now;
int common_timeout;
/*
* for persistent timeout events, we remember the
* timeout value and re-add the event.
*
* If tv_is_absolute, this was already set.
*/
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
/*
* we already reserved memory above for the case where we
* are not replacing an existing timeout.
*/
if (ev->ev_flags & EVLIST_TIMEOUT) {
/* XXX I believe this is needless. */
if (min_heap_elt_is_top(ev))
notify = 1;
event_queue_remove(base, ev, EVLIST_TIMEOUT);
}
/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
if (ev->ev_events & EV_SIGNAL) {
/* See if we are just active executing
* this event in a loop
*/
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
}
event_queue_remove(base, ev, EVLIST_ACTIVE);
}
gettime(base, &now);
common_timeout = is_common_timeout(tv, base);
if (tv_is_absolute) {
ev->ev_timeout = *tv;
} else if (common_timeout) {
struct timeval tmp = *tv;
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timeradd(&now, &tmp, &ev->ev_timeout);
ev->ev_timeout.tv_usec |=
(tv->tv_usec & ~MICROSECONDS_MASK);
} else {
evutil_timeradd(&now, tv, &ev->ev_timeout);
}
event_debug((
"event_add: timeout in %d seconds, call %p\n",
(int)tv->tv_sec, ev->ev_callback));
event_queue_insert(base, ev, EVLIST_TIMEOUT);
if (common_timeout) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
if (ev == TAILQ_FIRST(&ctl->events)) {
common_timeout_schedule(ctl, &now, ev);
}
} else {
/* See if the earliest timeout is now earlier than it
* was before: if so, we will need to tell the main
* thread to wake up earlier than it would
* otherwise. */
if (min_heap_elt_is_top(ev)) {
notify = 1;
}
}
}
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) {
evthread_notify_base(base);
}
_event_debug_note_add(ev);
return (res);
}
对于tv
不是空指针的情况,就说明是一个超时event,在小根堆中为其留一个位置,代码如下:
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve(&base->timeheap,
1 + min_heap_size(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
然后将IO或者信号event插入到对应的队列中:
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
if (ev->ev_events & (EV_READ|EV_WRITE)) {
res = evmap_io_add(base, ev->ev_fd, ev);
}
else if (ev->ev_events & EV_SIGNAL) {
res = evmap_signal_add(base, (int)ev->ev_fd, ev);
}
if (res != -1) {
event_queue_insert(base, ev, EVLIST_INSERTED);
}
if (res == 1) {
notify = 1;
res = 0;
}
}
用户把这个event设置成EV_PERSIST,永久event。如果没有这样设置的话,那么只会超时一次;如果设置了,那么就可以超时多次,那么就要记录用户设置的超时值,代码如下:
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
用户可以对同一个event调用多次event_add并且可以每次都用不同的超时值,对于多次对同一个超时event调用event_add,那么只能保留最后的那个,实现代码如下:
if (ev->ev_flags & EVLIST_TIMEOUT) {
/* XXX I believe this is needless. */
if (min_heap_elt_is_top(ev))
notify = 1;
event_queue_remove(base, ev, EVLIST_TIMEOUT);
}
上述代码中,如果同时需要删除堆顶的元素,则需要更新notify
变量通知主线程。
因为可以在次线程调用event_add。而主线程刚好在执行event_base_dispatch,所以还需要再一次处理已经被超时激活的event.
最后将该event插入到定时器队列中,并判断定时器是不是最小值已通知主线程更新。
event_base_dispatch()函数
event_base_dispatch()
函数的调用过程如下:
,其中主要的工作在event_base_loop
函数中实现,其代码如图所示:
int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
/* Grab the lock. We will release it inside evsel.dispatch, and again
* as we invoke user callbacks. */
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (base->running_loop) {
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}
base->running_loop = 1;
clear_time_cache(base);
if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base(base);
done = 0;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif
base->event_gotterm = base->event_break = 0;
while (!done) {
printf("event_base_loop do while\n");
base->event_continue = 0;
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}
if (base->event_break) {
break;
}
timeout_correct(base, &tv);
tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv);
}
/* If we have no events, we just exit */
if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}
/* update last old time */
gettime(base, &base->event_tv);
clear_time_cache(base);
res = evsel->dispatch(base, tv_p);
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}
update_time_cache(base);
timeout_process(base);
if (N_ACTIVE_CALLBACKS(base)) {
printf("event_base_loop event_process_active\n");
int n = event_process_active(base);
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));
done:
clear_time_cache(base);
base->running_loop = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}
下面分别介绍一下代码中各部分执行的工作:
EVBASE_ACQUIRE_LOCK(base, th_base_lock)
表示在获取锁,并且会在调用io多路复用函数的时候释放锁clear_time_cache(base)
清除缓存时间if (base->sig.ev_signal_added && base->sig.ev_n_signals_added) evsig_set_base(base);
处理Signal事件时,指定信号所属的event_basetimeout_correct(base, &tv)
校准时间timeout_next(base, &tv_p)
根据定时器堆中最小超时时间计算I/O多路复用的最大等待时间res = evsel->dispatch(base, tv_p)
调用io多路复用,监听事件,对于这个io多路复用函数,libevent代码中实现了不同的版本包括select和epoll等等,不同的平台会选择不同的实现,可以到本代码中的select.c文件中看一看基于select的实现,不同平台之间的函数结构应该差不多timeout_process(base)
检查定时事件,将就绪的定时事件从小根堆中删除,插入到活跃事件链表中event_process_active(base);
处理event_base的活跃链表中的事件,调用event的回调函数,优先级高的event先处理