libevent源码分析

定时器实现

Posted by 邹盛富 on April 6, 2018

背景

以前搞mesos,接触最多的基础库就是libprocess,说实话libprocess使用了大量的C++11的新特性,对于大多数的人来说,阅读这种代码的门槛还是比较高,我也只是对actor模型整体的有了认识,但是对于libprocess中的很多细节之处还是没有很好的掌握,每隔一段时间在回头看libprocess的代码时,都会有新的发现。

最近想找一个成本比较低的基础库继续学习,然后就选中了libevent。libevent已经被广泛的应用,作为底层的网络库,比如 memcached、Vomit、Nylon、Netchat等等,一直都想研究一下libevent的代码,最近有时间就尝试基于2.0.22版本的libevent实现一个定时器,之所以要实现一个定时器,是因为定时器作为一种基础组件在在大多数的函数库中都存在,并且有多种不同的方式来实现。

定时器实现

其实实现一个定时器让其定时的执行某一个函数还是比较简单的,源代码如下:

#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/util.h>

struct event ev;
struct timeval tv;

void time_cb(int fd, short event, void *argc)
{
	 printf("timer wakeup\n");
	 event_add(&ev, &tv); // reschedule timer
}

int main() {
    struct event_base *base = event_base_new();
    tv.tv_sec = 10; // 10s period
    tv.tv_usec = 0;
    event_assign(&ev, base, -1, 0, time_cb, NULL);
    event_add(&ev, &tv);
    event_base_dispatch(base);
}

上述的代码中主要使用了event_base_newevent_assignevent_addevent_base_dispatch四个函数。依靠这四个函数,可以每隔10秒钟执行一次time_cb函数

定时器流程

event_base_new()函数

event_base_new()函数的实现如下,其中主要的功能就是就是读取配置并对event_base进行初始化。

struct event_base *
event_base_new(void)
{
	struct event_base *base = NULL;
	struct event_config *cfg = event_config_new();
	if (cfg) {
		base = event_base_new_with_config(cfg);
		event_config_free(cfg);
	}
	return base;
}

其中主要使用了两个函数,对于函数event_config_new(),其主要的作用是初见配置变量event_config;对于函数event_base_new_with_config()是利用已经初始化的event_config变量初始化event_base变量。对于event_config变量,其主要包含如下的几部分:

struct event_config {
	TAILQ_HEAD(event_configq, event_config_entry) entries;
	int n_cpus_hint;
	enum event_method_feature require_features;
	enum event_base_config_flag flags;
};

通过其声明可以看到,其主要包含四部分:

  • event_configq是一个队列,用来保存Reactor模型不可以使用的io多路复用函数,这个以后的文章会详细的介绍
  • n_cpus_hint是用来保存机器CPU的个数,以方便对线程池的线程数量进行调整
  • event_method_feature用来保存io多路复用函数应该保存的特征
  • event_base_config_flag用来保存一些的特性,比如是否给event_base加锁、执行event_base_loop是不是使用cache时间以及epoll函数的更少的系统调用的方法等

event_assign()函数

event_assign()函数主要用已经初始化的event_base用来初始化event,包括设置文件标识符、回调函数以及其参数等等。函数的整体结构如下:

int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
	if (!base)
		base = current_base;

	_event_debug_assert_not_added(ev);

	ev->ev_base = base;

	ev->ev_callback = callback;
	ev->ev_arg = arg;
	ev->ev_fd = fd;
	ev->ev_events = events;
	ev->ev_res = 0;
	ev->ev_flags = EVLIST_INIT;
	ev->ev_ncalls = 0;
	ev->ev_pncalls = NULL;

	if (events & EV_SIGNAL) {
		if ((events & (EV_READ|EV_WRITE)) != 0) {
			event_warnx("%s: EV_SIGNAL is not compatible with "
			    "EV_READ or EV_WRITE", __func__);
			return -1;
		}
		ev->ev_closure = EV_CLOSURE_SIGNAL;
	} else {
		if (events & EV_PERSIST) {
			evutil_timerclear(&ev->ev_io_timeout);
			ev->ev_closure = EV_CLOSURE_PERSIST;
		} else {
			ev->ev_closure = EV_CLOSURE_NONE;
		}
	}

	min_heap_elem_init(ev);

	if (base != NULL) {
		/* by default, we put new events into the middle priority */
		ev->ev_pri = base->nactivequeues / 2;
	}

	_event_debug_note_setup(ev);

	return 0;
}

可以看到整个函数中大部分都是大部分是对event的变量进行赋值操作(event变量中关联了一个event_base变量),对于其中的事件类型进行了详细的处理,主要包括回调函数、回调函数参数、文件标识符、事件类型、事件的状态队列(ev_flags)、优先级、回调函数返回值、回调函数类型(ev_closure)等。

其中另一个亮点就是最小堆的使用,这里仅仅是对最小堆进行了初始化,在libevent中对于定时事件,使用最小堆对对其进行管理,根据超时时间值来进行插入和删除操作,libevent中的对最小堆的使用也算是惊艳之处吧。如果想了解最小堆的详细实现可以浏览一下minheap-internal.h这个文件中代码,如果想详细了解数据结构中最小堆,建议可以看看《数据结构与算法分析:c语言描述》这本书,其中对于最小堆的下溢上溢操作都进行了详细的介绍,与libevent中实现都差不多。

event_add()函数

event_add()函数按照其名字的可以理解为将事件添加到reactor中,但是此函数又调用了好几层其他函数实现了这个功能,其函数的调用关系如下图:

对于函数event_add()需要注意到的是在调用event_add_internal时进行了加锁操作,代码如下:

int
event_add(struct event *ev, const struct timeval *tv)
{
	int res;

	if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
		event_warnx("%s: event has no event_base set.", __func__);
		return -1;
	}

	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);

	res = event_add_internal(ev, tv, 0);

	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);

	return (res);
}

其中主要的工作都由event_add_internal()这个函数执行,下面看看这个函数的详细情况:

static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
        int tv_is_absolute)
{
    struct event_base *base = ev->ev_base;
    int res = 0;
    int notify = 0;

    EVENT_BASE_ASSERT_LOCKED(base);
    _event_debug_assert_is_setup(ev);

    event_debug((
                "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p",
                ev,
                EV_SOCK_ARG(ev->ev_fd),
                ev->ev_events & EV_READ ? "EV_READ " : " ",
                ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
                tv ? "EV_TIMEOUT " : " ",
                ev->ev_callback));

    EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

    /*
     * prepare for timeout insertion further below, if we get a
     * failure on any step, we should not change any state.
     */
    if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
        if (min_heap_reserve(&base->timeheap,
                    1 + min_heap_size(&base->timeheap)) == -1)
            return (-1);  /* ENOMEM == errno */
    }

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    /* 如果当前调用者不是主线程(主线程就是执行事件循环的线程),而且被添加的事件处理器是信号事
     * 件处理器,而且主线程正在执行该信号事件处理器的回调函数,则当前调用者必须等待主线程完成调
     * 用,否则会引起竞争
     */
    if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
            && !EVBASE_IN_THREAD(base)) {
        ++base->current_event_waiters;
        EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
    }
#endif

    if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
            !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
        if (ev->ev_events & (EV_READ|EV_WRITE)) {
            res = evmap_io_add(base, ev->ev_fd, ev);
        }
        else if (ev->ev_events & EV_SIGNAL) {
            res = evmap_signal_add(base, (int)ev->ev_fd, ev);
        }
        if (res != -1) {
            event_queue_insert(base, ev, EVLIST_INSERTED);
        }
        if (res == 1) {
            notify = 1;
            res = 0;
        }
    }

    if (res != -1 && tv != NULL) {
        struct timeval now;
        int common_timeout;

        /*
         * for persistent timeout events, we remember the
         * timeout value and re-add the event.
         *
         * If tv_is_absolute, this was already set.
         */
        if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
            ev->ev_io_timeout = *tv;

        /*
         * we already reserved memory above for the case where we
         * are not replacing an existing timeout.
         */
        if (ev->ev_flags & EVLIST_TIMEOUT) {
            /* XXX I believe this is needless. */
            if (min_heap_elt_is_top(ev))
                notify = 1;
            event_queue_remove(base, ev, EVLIST_TIMEOUT);
        }

        /* Check if it is active due to a timeout.  Rescheduling
         * this timeout before the callback can be executed
         * removes it from the active list. */
        if ((ev->ev_flags & EVLIST_ACTIVE) &&
                (ev->ev_res & EV_TIMEOUT)) {
            if (ev->ev_events & EV_SIGNAL) {
                /* See if we are just active executing
                 * this event in a loop
                 */
                if (ev->ev_ncalls && ev->ev_pncalls) {
                    /* Abort loop */
                    *ev->ev_pncalls = 0;
                }
            }
            event_queue_remove(base, ev, EVLIST_ACTIVE);
        }

        gettime(base, &now);

        common_timeout = is_common_timeout(tv, base);
        if (tv_is_absolute) {
            ev->ev_timeout = *tv;
        } else if (common_timeout) {
            struct timeval tmp = *tv;
            tmp.tv_usec &= MICROSECONDS_MASK;
            evutil_timeradd(&now, &tmp, &ev->ev_timeout);
            ev->ev_timeout.tv_usec |=
                (tv->tv_usec & ~MICROSECONDS_MASK);
        } else {
            evutil_timeradd(&now, tv, &ev->ev_timeout);
        }

        event_debug((
                    "event_add: timeout in %d seconds, call %p\n",
                    (int)tv->tv_sec, ev->ev_callback));

        event_queue_insert(base, ev, EVLIST_TIMEOUT);
        if (common_timeout) {
            struct common_timeout_list *ctl =
                get_common_timeout_list(base, &ev->ev_timeout);
            if (ev == TAILQ_FIRST(&ctl->events)) {
                common_timeout_schedule(ctl, &now, ev);
            }
        } else {
            /* See if the earliest timeout is now earlier than it
             * was before: if so, we will need to tell the main
             * thread to wake up earlier than it would
             * otherwise. */
            if (min_heap_elt_is_top(ev)) {
                notify = 1;
            }
        }
    }

    /* if we are not in the right thread, we need to wake up the loop */
    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) {
        evthread_notify_base(base);
    }

    _event_debug_note_add(ev);

    return (res);
}

对于tv不是空指针的情况,就说明是一个超时event,在小根堆中为其留一个位置,代码如下:

if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
		if (min_heap_reserve(&base->timeheap,
			1 + min_heap_size(&base->timeheap)) == -1)
			return (-1);  /* ENOMEM == errno */
	}

然后将IO或者信号event插入到对应的队列中:

if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
        !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
    if (ev->ev_events & (EV_READ|EV_WRITE)) {
        res = evmap_io_add(base, ev->ev_fd, ev);
    }
    else if (ev->ev_events & EV_SIGNAL) {
        res = evmap_signal_add(base, (int)ev->ev_fd, ev);
    }
    if (res != -1) {
        event_queue_insert(base, ev, EVLIST_INSERTED);
    }
    if (res == 1) {
        notify = 1;
        res = 0;
    }
}

用户把这个event设置成EV_PERSIST,永久event。如果没有这样设置的话,那么只会超时一次;如果设置了,那么就可以超时多次,那么就要记录用户设置的超时值,代码如下:

if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
            ev->ev_io_timeout = *tv;

用户可以对同一个event调用多次event_add并且可以每次都用不同的超时值,对于多次对同一个超时event调用event_add,那么只能保留最后的那个,实现代码如下:

if (ev->ev_flags & EVLIST_TIMEOUT) {
    /* XXX I believe this is needless. */
    if (min_heap_elt_is_top(ev))
        notify = 1;
    event_queue_remove(base, ev, EVLIST_TIMEOUT);
}

上述代码中,如果同时需要删除堆顶的元素,则需要更新notify变量通知主线程。 因为可以在次线程调用event_add。而主线程刚好在执行event_base_dispatch,所以还需要再一次处理已经被超时激活的event.

最后将该event插入到定时器队列中,并判断定时器是不是最小值已通知主线程更新。

event_base_dispatch()函数

event_base_dispatch()函数的调用过程如下: ,其中主要的工作在event_base_loop函数中实现,其代码如图所示:

int
event_base_loop(struct event_base *base, int flags)
{
	const struct eventop *evsel = base->evsel;
	struct timeval tv;
	struct timeval *tv_p;
	int res, done, retval = 0;

	/* Grab the lock.  We will release it inside evsel.dispatch, and again
	 * as we invoke user callbacks. */
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	if (base->running_loop) {
		event_warnx("%s: reentrant invocation.  Only one event_base_loop"
		    " can run on each event_base at once.", __func__);
		EVBASE_RELEASE_LOCK(base, th_base_lock);
		return -1;
	}

	base->running_loop = 1;

	clear_time_cache(base);

	if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
		evsig_set_base(base);

	done = 0;

    #ifndef _EVENT_DISABLE_THREAD_SUPPORT
	   base->th_owner_id = EVTHREAD_GET_ID();
    #endif

	base->event_gotterm = base->event_break = 0;

	while (!done) {
		printf("event_base_loop do while\n");
		base->event_continue = 0;

		/* Terminate the loop if we have been asked to */
		if (base->event_gotterm) {
			break;
		}

		if (base->event_break) {
			break;
		}

		timeout_correct(base, &tv);

		tv_p = &tv;
		if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
			timeout_next(base, &tv_p);
		} else {
			/*
			 * if we have active events, we just poll new events
			 * without waiting.
			 */
			evutil_timerclear(&tv);
		}

		/* If we have no events, we just exit */
		if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
			event_debug(("%s: no events registered.", __func__));
			retval = 1;
			goto done;
		}

		/* update last old time */
		gettime(base, &base->event_tv);

		clear_time_cache(base);
		res = evsel->dispatch(base, tv_p);

		if (res == -1) {
			event_debug(("%s: dispatch returned unsuccessfully.",
				__func__));
			retval = -1;
			goto done;
		}

		update_time_cache(base);

		timeout_process(base);

		if (N_ACTIVE_CALLBACKS(base)) {
			printf("event_base_loop event_process_active\n");
			int n = event_process_active(base);
			if ((flags & EVLOOP_ONCE)
			    && N_ACTIVE_CALLBACKS(base) == 0
			    && n != 0)
				done = 1;
		} else if (flags & EVLOOP_NONBLOCK)
			done = 1;
	}
	event_debug(("%s: asked to terminate loop.", __func__));

done:
	clear_time_cache(base);
	base->running_loop = 0;

	EVBASE_RELEASE_LOCK(base, th_base_lock);

	return (retval);
}

下面分别介绍一下代码中各部分执行的工作:

  • EVBASE_ACQUIRE_LOCK(base, th_base_lock)表示在获取锁,并且会在调用io多路复用函数的时候释放锁
  • clear_time_cache(base)清除缓存时间
  • if (base->sig.ev_signal_added && base->sig.ev_n_signals_added) evsig_set_base(base);处理Signal事件时,指定信号所属的event_base
  • timeout_correct(base, &tv) 校准时间
  • timeout_next(base, &tv_p)根据定时器堆中最小超时时间计算I/O多路复用的最大等待时间
  • res = evsel->dispatch(base, tv_p)调用io多路复用,监听事件,对于这个io多路复用函数,libevent代码中实现了不同的版本包括select和epoll等等,不同的平台会选择不同的实现,可以到本代码中的select.c文件中看一看基于select的实现,不同平台之间的函数结构应该差不多
  • timeout_process(base)检查定时事件,将就绪的定时事件从小根堆中删除,插入到活跃事件链表中
  • event_process_active(base);处理event_base的活跃链表中的事件,调用event的回调函数,优先级高的event先处理

参考连接

Linux 下定时器的实现方式分析