muduo - 设计与实现

Reactor 介绍

Reactor模式用 非阻塞IO + poll(epoll)函数来处理并发,程序的基本结构是一个事件循环,以事件驱动和事件回调的方式实现业务逻辑。可参考《基于I/O复用的Reactor模式》。

while(!done)
{
    int retval  = poll(fds,nfds,timeout)
    if(retval < 0)
        处理错误,回调用户的error handler
    else{
        处理到期的timers,回调用户的timer handler
        if(retval > 0){
            处理IO事件,毁掉用户的IO event handler
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

单线程Reactor实现

muduo的Reactor核心主要由 EventLoop、Channel、Poller、TimerQueue这几个类完成。代码里面各种回调函数的处理逻辑,使得看起来不直观。另外,这几个类的生命周期也值得注意,容易理不清楚。根据muduo的中心思想one loop per thread ,每个线程只有一个EventLoop对象。同时,每个Channel对象也只属于一个EventLoop,因此每个Channel对象只属于某一个IO线程。每个Channel对象只负责一个文件描述符(fd)的IO事件分发,但它并不拥有这个fd,也不会在析构的时候关闭fd。Channel的生命周期也由owner class负责管理。

EventLoop类

EventLoop类是核心,大多数类都会包含一个EventLoop*的成员,因为所有的事件都会在EventLoop::loop()中通过Channel分发。先来看一下这个loop循环:

void EventLoop::loop()
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
  LOG_TRACE << "EventLoop " << this << " start looping";

  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    ++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (Channel* channel : activeChannels_)
    {
      currentActiveChannel_ = channel;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    doPendingFunctors();
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

主要处理逻辑:调用poll函数获取activeChannels_,然后遍历activeChannels_,调用handleEvent 处理对应的事件(handleEvent 是 Channel 类的成员函数,它会根据事件的类型去调用不同的 Callback)。

Channel 类

Channel类比较简单,负责IO事件分发,每一个Channel对象都对应了一个fd,它的核心成员如下:

EventLoop* loop_;
const int  fd_;
int        events_;
int        revents_; // it's the received event types of epoll or poll
int        index_; // used by Poller.

ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
1
2
3
4
5
6
7
8
9
10

Channel会在 handleEvent 函数中将不同的IO事件分发为不同的回调函数,例如readCallback_,writeCallback_。index_是 poller 类中 pollfds_数组的下标。events_和 revents_对应了 struct pollfd 结构中的成员,events_是关心的IO事件,由用户设置;revents_是目前活动的事件,由EventLoop/Poller设置。

Channel::update()会调用EventLoop::updateChannel(),后者会转而调用Poller::updateChannel(),其函数会对pollfds_作添加或更新处理

poll简述

poll的使用方法与select相似,轮询多个文件描述符,有读写时设置相应的状态位,poll相比select优在没有最大文件描述符数量的限制。

# include <poll.h>
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);
 
struct pollfd {
int fd;         /* 文件描述符 */
short events;         /* 等待的事件 */
short revents;       /* 实际发生了的事件 */
} ; 
1
2
3
4
5
6
7
8

每一个pollfd结构体指定了一个被监视的文件描述符,poll函数可以传递多个结构体,表示poll()监视多个文件描述符。每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。events域中请求的任何事件都可能在revents域中返回。

poll函数的事件标志符值如下:

常量 说明
POLLIN 普通或优先级带数据可读
POLLRDNORM 普通数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级数据可读
POLLOUT 普通数据可写
POLLWRNORM 普通数据可写
POLLWRBAND 优先级带数据可写
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述字不是一个打开的文件

Poller类

Poller类在这里是poll函数的封装(在muduo源码里面是抽象基类,支持poll和epoll),Poller 的职责也很简单,负责IO multiplexing,一个 EventLoop 有一个 Poller,Poller的生命周期和 EventLoop一样长。它有两个核心的数据成员:

typedef std::vector<struct pollfd> PollFdList;
typedef std::map<int, Channel*> ChannelMap;  // fd to Channel
PollFdList pollfds_;
ChannelMap channels_;
1
2
3
4

ChannelMap是fd到Channel类的映射,PollFdList保存了每一个fd所关心的事件,用作参数传递到poll函数中,Channel类里面的index_即是这里的下标。Poller类有下面四个函数:

Timestamp poll(int timeoutMs, ChannelList* activeChannels);
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
private:
void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;
updateChannel主要功能是负责维护和更新pollfds_数组,removeChannel负责从pollfds_移除指定channel对应的fd。

poll函数是对::poll的封装,获取当前活动的IO事件,然后填充调用方传入的activeChannels。其中主要逻辑:poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);//pollfds_传入传出参数,有读写时设置相应的状态位。

Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);//pollfds_传入传出参数,有读写时设置相应的状态位
  int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happened";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happened";
  }
  else
  {
    if (savedErrno != EINTR)
    {
      errno = savedErrno;
      LOG_SYSERR << "PollPoller::poll()";
    }
  }
  return now;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

fillActiveChannels 函数遍历pollfds_,找出有活动事件的fd(pollfds_中fd对应的revents已经在poll函数中置位了),把它对应的Channel添加到 activeChannels(vector<Channel*>)这个结构中,返回给用户。

void PollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
      pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

综上所述,EventLoop:loop()中调用Poller::poll()获取当前的活动事件的Channel列表,然后依次调用每个Channel的handleEvent函数进行处理,总体的处理时序图如下:

Reactor时序图