C++ 全栈知识体系C++ 全栈知识体系
✿导航
  • 基础
  • 函数
  • 知识点
  • IO框架
  • 新版本特性
  • 数据库原理
  • SQL语言
  • SQL - MySQL
  • NoSQL - Redis
  • NoSQL - ElasticSearch
  • 算法基础
  • 常见算法
  • 领域算法
  • 分布式算法
  • 数据结构与算法
  • 计算机网络
  • 操作系统
  • 计算机组成
  • 开发
  • 测试
  • 架构基础
  • 分布式系统
  • 微服务
  • 中间件
  • 概念
  • 理论
  • 架构设计原则
  • 设计模式
  • 协议
  • 技术选型
  • 编码规范
  • 流水线构建 - CI/CD
  • 知识点 - Linux
  • 网站 - Nginx
  • 容器化 - Docker
  • 容器编排 - Kubernetes
  • 服务网格 - Service Mesh Istio
  • 常用快捷键 - Shortcut
  • 工具使用 - Tools
  • 开源项目
  • 学习项目
  • 个人项目
  • 项目开发
  • 项目Idea
  • 并发
  • 部署
  • 分布式
  • 知识
  • 问题
  • 编程语言与技术
  • 系统与架构
  • 软件开发实践
  • 数据处理与应用设计
  • 个人
  • 产品
  • 团队
  • 知识体系
  • Vue
关于
✿导航
  • 基础
  • 函数
  • 知识点
  • IO框架
  • 新版本特性
  • 数据库原理
  • SQL语言
  • SQL - MySQL
  • NoSQL - Redis
  • NoSQL - ElasticSearch
  • 算法基础
  • 常见算法
  • 领域算法
  • 分布式算法
  • 数据结构与算法
  • 计算机网络
  • 操作系统
  • 计算机组成
  • 开发
  • 测试
  • 架构基础
  • 分布式系统
  • 微服务
  • 中间件
  • 概念
  • 理论
  • 架构设计原则
  • 设计模式
  • 协议
  • 技术选型
  • 编码规范
  • 流水线构建 - CI/CD
  • 知识点 - Linux
  • 网站 - Nginx
  • 容器化 - Docker
  • 容器编排 - Kubernetes
  • 服务网格 - Service Mesh Istio
  • 常用快捷键 - Shortcut
  • 工具使用 - Tools
  • 开源项目
  • 学习项目
  • 个人项目
  • 项目开发
  • 项目Idea
  • 并发
  • 部署
  • 分布式
  • 知识
  • 问题
  • 编程语言与技术
  • 系统与架构
  • 软件开发实践
  • 数据处理与应用设计
  • 个人
  • 产品
  • 团队
  • 知识体系
  • Vue
关于
  • 编程语言与技术

    • Effective C++: 改善程序与设计的55个具体做法

      • 第2章 - 构造/析构/赋值运算(一)
      • 第2章 - 构造/析构/赋值运算(二)
      • 第2章 - 构造/析构/赋值运算(三)
      • 第3章 - 资源管理
      • 第4章 - 设计与声明(一)
      • 第4章 - 设计与声明(二)
      • 第5章 - 实现(一)
      • 第5章 - 实现(二)
      • 第6章 - 继承与面向对象设计
      • 第7章 - 模板与泛型编程
    • 深度探索C++对象模型

      • 第1章 - 关于对象
      • 第2章 - 构造函数语意学
      • 第3章 - Data 语意学
    • STL源码剖析

      • 第1章 - STL概论和版本简介
      • 第2章 - 空间配置器
      • 第3章 - 迭代器(iterators)概念与traits编程技法(一)
      • 第3章 - 迭代器(iterators)概念与traits编程技法(二)
      • 第4章 - 序列式容器 vector
      • 第4章 - 序列式容器 list
      • 第4章 - 序列式容器 deque
      • 第4章 - 序列式容器 stack和queue
      • 第4章 - 序列式容器 heap
      • 第4章 - 序列式容器 priority_queue
      • 第4章 - 序列式容器 slist
      • 第5章 - 关联式容器 RB-tree
      • 第5章 - 关联式容器 set和map
      • 第5章 - 关联式容器 hashtable
      • 第6章 - 算法
      • 第6章 - 算法之set
      • 第7章 - 仿函数
      • 第8章 - 配接器
  • 系统与架构

    • 深入理解计算机系统

      • 第1章 - 计算机系统漫游
      • 第2章 - 信息的表示和处理
      • 第3章 - 程序的机器级表示
      • 第5章 - 优化程序性能
      • 第6章 - 存储器层次结构
      • 第7章 - 链接
      • 第8章 - 异常控制流
      • 第9章 - 虚拟内存
      • 第10章 - 系统级I/O
      • 第11章 - 网络编程
      • 第12章 - 并发编程
    • 大型网站技术架构——核心原理与案例分析

      • 第1章 - 大型网站架构演化
      • 第2章 - 大型网站架构模式
      • 第3章 - 大型网站核心架构要素
      • 第4章 - 瞬时响应:网站的高性能架构
      • 第5章 - 万无一失:网站的高可用架构
      • 第6章 - 永无止境:网站的伸缩性架构
      • 第7章 - 随需应变:网站的可扩展架构
      • 第8章 - 固若金汤:网站的安全架构
    • 从零开始学架构

      • 架构基础
      • 架构设计原则
      • 高性能架构
      • 高可用架构
    • 程序员的自我修养————链接、装载与库

      • 第1章 - 简介
      • 第2章 - 静态链路
      • 第3章 - 目标文件里有什么
      • 第4章 - 静态链接
      • 第7章 - 动态链接
      • 第8章 - 共享库版本
      • 第10章 - 内存
      • 第11章 - 运行库
      • 第12章 - 系统调用与API
      • 第13章 - 运行库实现
  • 软件开发实践

    • 重构改善既有代码的设计

      • 第1章 - 重构,第一个示例
      • 第2章 - 重构的原则
      • 第3章 - 代码的坏味道
      • 第5章 - 重构列表
      • 第6章 - 重新组织函数
      • 第7章 - 在对象之间搬移特性
      • 第8章 - 重新组织数据
      • 第9章 - 简化条件表达式
      • 第10章 - 简化函数调用
      • 第11章 - 处理概括关系
      • 第12章 - 设计之大型重构
    • 代码大全2

      • 第1章 - 欢迎进入软件构建的世界
      • 第2章 - 用隐喻来更充分地理解软件开发
      • 第3章 - 三思而后行: 前期准备
      • 第4章 - 关键的构建决策
      • 第5章 - 软件构建中的设计
    • Linux多线程服务端编程——使用muduo C++ 网络库

      • Buffer类的设计
      • 设计与实现
      • 定时器与TimerQueue
      • Protobuf网络传输和Protobuf编解码器与消息分发器
      • EventLoop类剖析
      • EventLoopThread和EventLoopThreadPool剖析
      • TCP网络库和核心类
      • Connector剖析
      • TcpClient剖析
      • 学习总结
      • timing wheel
      • 消息广播服务
      • 线程安全的对象生命期管理
  • 数据处理与应用设计

    • 数据密集型应用系统设计

      • 第1章 - 可靠、可扩展与可维护的应用系统
      • 第2章 - 数据模型与查询语言
      • 第3章 - 数据存储与检索
      • 第4章 - 数据编码与演化
      • 第5章 - 数据复制
      • 第6章 - 数据分区
      • 第7章 - 事务

muduo - EventLoopThread和EventLoopThreadPool剖析

  • EventLoopThreadPool类分析:
    • 构造与析构
    • 开启线程池
    • getNextLoop 采用轮询的方式分配EventLoop
    • getLoopForHash采用hash方式分配EventLoop
    • getAllLoops 返回所有的EventLoop

​# EventLoopThread

EventLoopThread是事件循环线程,包含一个Thread对象和一个EventLoop对象。在构造函数中,把EventLoopThread::threadFunc 注册到Thread对象中,在EventLoopThread::startLoop()函数中进行回调。

EventLoopThread的工作流程为:

  1. 在主线程创建EventLoopThread对象。
  2. 主线程调用EventLoopThread.start(),启动EventLoopThread中的线程(称为IO线程),而且主线程要等待IO线程创建完毕EventLoop对象。
  3. IO线程调用threadFunc创建EventLoop对象。通知主线程已经创建完毕。
  4. 主线程返回创建的EventLoop对象。

EventLoopThread类分析:

类定义如下:

class EventLoopThread : noncopyable
{
 public:
  typedef std::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
                  const string& name = string());
  ~EventLoopThread();
  EventLoop* startLoop();

 private:
  void threadFunc();

  EventLoop* loop_ GUARDED_BY(mutex_);
  bool exiting_;
  Thread thread_;
  MutexLock mutex_;
  Condition cond_ GUARDED_BY(mutex_);
  ThreadInitCallback callback_;
};

成员函数:

/******************************************************************** 
Description : 这个类专门创建一个线程用于执行Reactor的事件循环,当然
这只是一个辅助类,没有说一定要使用它,可以根据自己的情况进行选择,
你也可以不创建线程去执行事件循环,而在主线程中执行事件循环,
一切根据自己的需要。
*********************************************************************/

//构造函数
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
                                 const string& name)
  : loop_(NULL),
    exiting_(false),
    thread_(std::bind(&EventLoopThread::threadFunc, this), name),
    mutex_(),
    cond_(mutex_),
    callback_(cb)
{
}

//析构函数
EventLoopThread::~EventLoopThread()
{
  exiting_ = true;
  if (loop_ != NULL)
  {
    loop_->quit();
    thread_.join();
  }
}

// 启动一个EventLoop线程
EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  // 当前线程启动,调用threadFunc()
  thread_.start();
  EventLoop* loop = NULL;
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      // 等待创建好当前IO线程
      cond_.wait();
    }
    loop = loop_;
  }
  return loop;
}

void EventLoopThread::threadFunc()
{
  EventLoop loop;
  // 如果有初始化函数,就先调用初始化函数
  if (callback_)
  {
    callback_(&loop);
  }
  {
    MutexLockGuard lock(mutex_);
    // loop_指针指向了一个栈上的对象,threadFunc函数退出之后。这个指针就失效了
    // threadFunc函数退出,就意味着线程退出了,EventLoopThread对象也就没有存在的价值了。
    // 因而不会有什么大的问题
    loop_ = &loop;
    // 通知startLoop线程已经启动完毕
    cond_.notify();
  }
  // 事件循环,直到EventLoopThread析构。此后不再使用loop_访问EventLoop了。
  loop.loop();
  MutexLockGuard lock(mutex_);
  loop_ = NULL;
}

测试程序:

#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void runInThread()
{
  printf("runInThread(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
}

int main()
{
  printf("main(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());

  EventLoopThread loopThread;
  EventLoop* loop = loopThread.startLoop();
  // 异步调用runInThread,即将runInThread加入到loop对象所在IO线程,让该IO线程运行
  loop->runInLoop(runInThread);
  sleep(1);
  // runAfter内部也调用了runInLoop。所以这里也是异步调用
  loop->runAfter(2, runInThread);
  sleep(3);
  loop->quit();

  printf("exit main().\n");
}

执行结果:

[root@192 EventLoopThread]# g++ runInLoopThreadTest.cpp -o runInLoopThreadTest -I /home/muduo/muduo-master -L/home/muduo/build/release-cpp11/lib/  -lmuduo_net  -lmuduo_base -lpthread
[root@192 EventLoopThread]# ./runInLoopThreadTest
main(): pid = 2399, tid = 2399
runInThread(): pid = 2399, tid = 2400
runInThread(): pid = 2399, tid = 2400
exit main().
[root@192 EventLoopThread]#

对于上述调用程序进行分析:

主线程调用 loop->runInLoop(runInThread); 因为主线程(不是IO线程)调用runInLoop。 故进一步调用queueInLoop() 将runInThread 加入到队列,然后wakeup() IO线程。IO线程在doPendingFunctors() 中取loop->runAfter() 要唤醒一下,此时仅仅是运行runAfter() 加入了一个2s的定时器, 2s超时。timerfd_ 可读,先handleRead()一下然后运行回调函数runInThread()。

那为什么exit main() 之后wakeupFd_ 还会有可读事件呢?那是由于EventLoopThead 栈上对象析构,在析构函数内 loop_ ->quit(), 因为不是在IO线程调用quit(),故也须要唤醒一下。IO线程才干从poll 返回,这样再次循环推断 while (!quit_) 就能退出IO线程。

# EventLoopThreadPool

muduo的线程模型:

线程模型

muduo的思想是EventLoop + thread pool。为了更方便使用,将EventLoopThread做了封装。main reactor能够创建sub reactor,并发一些任务分发到sub reactor中去。EventLoopThreadPool的思想比较简单,用一个main reactor创建EventLoopThreadPool。在EventLoopThreadPool中将EventLoop和Thread绑定,能够返回EventLoop对象来使用EventLoopThreadPool中的Thread。

EventLoopThreadPool 是一个线程池,只不过该线程池有一点特殊,该线程池中的每一个线程都要执行EventLoop进行文件描述符的监听。此时一个线程用于管理分配线程池中的EventLoop,如果线程池为空,主线程的EventLoop用于监听所有的文件描述符。

EventLoopThreadPool类分析:

类定义如下:

class EventLoopThreadPool : noncopyable
{
 public:
  typedef std::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);
  ~EventLoopThreadPool();
  void setThreadNum(int numThreads) { numThreads_ = numThreads; }
  void start(const ThreadInitCallback& cb = ThreadInitCallback());

  // valid after calling start()
  /// round-robin
  EventLoop* getNextLoop();

  /// with the same hash code, it will always return the same EventLoop
  EventLoop* getLoopForHash(size_t hashCode);

  std::vector<EventLoop*> getAllLoops();

  bool started() const
  { return started_; }

  const string& name() const
  { return name_; }

 private:

  EventLoop* baseLoop_;    //线程池类用于服务端,baseLoop_用于接收新连接,线程池用于处理这些新连接的IO操作。与Acceptor所属EventLoop同样
  string name_;
  bool started_;          //开始标志
  int numThreads_;        // 线程数
  int next_;              // 新连接到来。所选择的EventLoop对象下标
  std::vector<std::unique_ptr<EventLoopThread>> threads_;    // IO线程列表
  std::vector<EventLoop*> loops_;       // EventLoop列表
};

成员函数分析:

构造与析构

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
  : baseLoop_(baseLoop),
    name_(nameArg),
    started_(false),
    numThreads_(0),
    next_(0)
{
}

EventLoopThreadPool::~EventLoopThreadPool()
{
  // Don't delete loop, it's stack variable
}

开启线程池

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread();

  started_ = true;

  //创建numThreads_个事件循环器,numThreads_通过setThreadNum函数进行设置
  //一般调用时先setThreadNum(),然后start().
  // void setThreadNum(int numThreads) { numThreads_ = numThreads; } 设置线程数量
  // void start(const ThreadInitCallback& cb = ThreadInitCallback()); 设置初始回调函数
  for (int i = 0; i < numThreads_; ++i)
  {
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    ////创建一个EventLoopThread 对象
    EventLoopThread* t = new EventLoopThread(cb, buf);
    threads_.push_back(std::unique_ptr<EventLoopThread>(t));
    //事件循环器开始执行,如果此时需要监听的文件描述符的数量小于线程个数
	//其余的线程会进入睡眠状态
    loops_.push_back(t->startLoop());
  }
  //线程池的个数为0,则主线程开始监听套接字
  if (numThreads_ == 0 && cb)
  {
    cb(baseLoop_);
  }
}

getNextLoop 采用轮询的方式分配EventLoop

//循环获取下一个线程池的loop,保证各IO线程上执行的IO操作基本负载均衡
EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  assert(started_);
  EventLoop* loop = baseLoop_;

  //如果线程池为空,返回主线程的EventLoop
  if (!loops_.empty())
  {
    // round-robin
    loop = loops_[next_];
    ++next_;
    //循环获取,如果next_的值大于了loops_.size(), 从第一个开始获取
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}

getLoopForHash采用hash方式分配EventLoop

EventLoop* EventLoopThreadPool::getLoopForHash(size_t hashCode)
{
  baseLoop_->assertInLoopThread();
  EventLoop* loop = baseLoop_;

  if (!loops_.empty())
  {
    loop = loops_[hashCode % loops_.size()];////hash 的方式获取EventLoop 事件循环器
  }
  return loop;
}

getAllLoops 返回所有的EventLoop

std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
  baseLoop_->assertInLoopThread();
  assert(started_);

  //如果线程池为空
  if (loops_.empty())
  {
    //将主线程的EventLoop 返回
    return std::vector<EventLoop*>(1, baseLoop_);
  }
  else
  {
    //否则,返回线程池中的EventLoop 
    return loops_;
  }
}

实际使用过程中,EventLoopThreadPool是嵌入TcpServer作为类的一个成员来使用:

TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    ipPort_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
    threadPool_(new EventLoopThreadPool(loop, name_)), //创建线程池
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
  acceptor_->setNewConnectionCallback(
      std::bind(&TcpServer::newConnection, this, _1, _2));
}

void TcpServer::setThreadNum(int numThreads)
{
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);//设置线程数量
}

void TcpServer::setThreadNum(int numThreads)
{
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);
}

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_);//线程池启动

    assert(!acceptor_->listening());
    loop_->runInLoop(
        std::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();//获取下一个Loop,进行使用
  ......
}
Last Updated:
Contributors: klc407073648
Prev
EventLoop类剖析
Next
TCP网络库和核心类