C++ 全栈知识体系C++ 全栈知识体系
✿导航
  • 基础
  • 函数
  • 知识点
  • IO框架
  • 新版本特性
  • 数据库原理
  • SQL语言
  • SQL - MySQL
  • NoSQL - Redis
  • NoSQL - ElasticSearch
  • 算法基础
  • 常见算法
  • 领域算法
  • 分布式算法
  • 数据结构与算法
  • 计算机网络
  • 操作系统
  • 计算机组成
  • 开发
  • 测试
  • 架构基础
  • 分布式系统
  • 微服务
  • 中间件
  • 概念
  • 理论
  • 架构设计原则
  • 设计模式
  • 协议
  • 技术选型
  • 编码规范
  • 流水线构建 - CI/CD
  • 知识点 - Linux
  • 网站 - Nginx
  • 容器化 - Docker
  • 容器编排 - Kubernetes
  • 服务网格 - Service Mesh Istio
  • 常用快捷键 - Shortcut
  • 工具使用 - Tools
  • 开源项目
  • 学习项目
  • 个人项目
  • 项目开发
  • 项目Idea
  • 并发
  • 部署
  • 分布式
  • 知识
  • 问题
  • 编程语言与技术
  • 系统与架构
  • 软件开发实践
  • 数据处理与应用设计
  • 个人
  • 产品
  • 团队
  • 知识体系
  • Vue
关于
✿导航
  • 基础
  • 函数
  • 知识点
  • IO框架
  • 新版本特性
  • 数据库原理
  • SQL语言
  • SQL - MySQL
  • NoSQL - Redis
  • NoSQL - ElasticSearch
  • 算法基础
  • 常见算法
  • 领域算法
  • 分布式算法
  • 数据结构与算法
  • 计算机网络
  • 操作系统
  • 计算机组成
  • 开发
  • 测试
  • 架构基础
  • 分布式系统
  • 微服务
  • 中间件
  • 概念
  • 理论
  • 架构设计原则
  • 设计模式
  • 协议
  • 技术选型
  • 编码规范
  • 流水线构建 - CI/CD
  • 知识点 - Linux
  • 网站 - Nginx
  • 容器化 - Docker
  • 容器编排 - Kubernetes
  • 服务网格 - Service Mesh Istio
  • 常用快捷键 - Shortcut
  • 工具使用 - Tools
  • 开源项目
  • 学习项目
  • 个人项目
  • 项目开发
  • 项目Idea
  • 并发
  • 部署
  • 分布式
  • 知识
  • 问题
  • 编程语言与技术
  • 系统与架构
  • 软件开发实践
  • 数据处理与应用设计
  • 个人
  • 产品
  • 团队
  • 知识体系
  • Vue
关于
  • 编程语言与技术

    • Effective C++: 改善程序与设计的55个具体做法

      • 第2章 - 构造/析构/赋值运算(一)
      • 第2章 - 构造/析构/赋值运算(二)
      • 第2章 - 构造/析构/赋值运算(三)
      • 第3章 - 资源管理
      • 第4章 - 设计与声明(一)
      • 第4章 - 设计与声明(二)
      • 第5章 - 实现(一)
      • 第5章 - 实现(二)
      • 第6章 - 继承与面向对象设计
      • 第7章 - 模板与泛型编程
    • 深度探索C++对象模型

      • 第1章 - 关于对象
      • 第2章 - 构造函数语意学
      • 第3章 - Data 语意学
    • STL源码剖析

      • 第1章 - STL概论和版本简介
      • 第2章 - 空间配置器
      • 第3章 - 迭代器(iterators)概念与traits编程技法(一)
      • 第3章 - 迭代器(iterators)概念与traits编程技法(二)
      • 第4章 - 序列式容器 vector
      • 第4章 - 序列式容器 list
      • 第4章 - 序列式容器 deque
      • 第4章 - 序列式容器 stack和queue
      • 第4章 - 序列式容器 heap
      • 第4章 - 序列式容器 priority_queue
      • 第4章 - 序列式容器 slist
      • 第5章 - 关联式容器 RB-tree
      • 第5章 - 关联式容器 set和map
      • 第5章 - 关联式容器 hashtable
      • 第6章 - 算法
      • 第6章 - 算法之set
      • 第7章 - 仿函数
      • 第8章 - 配接器
  • 系统与架构

    • 深入理解计算机系统

      • 第1章 - 计算机系统漫游
      • 第2章 - 信息的表示和处理
      • 第3章 - 程序的机器级表示
      • 第5章 - 优化程序性能
      • 第6章 - 存储器层次结构
      • 第7章 - 链接
      • 第8章 - 异常控制流
      • 第9章 - 虚拟内存
      • 第10章 - 系统级I/O
      • 第11章 - 网络编程
      • 第12章 - 并发编程
    • 大型网站技术架构——核心原理与案例分析

      • 第1章 - 大型网站架构演化
      • 第2章 - 大型网站架构模式
      • 第3章 - 大型网站核心架构要素
      • 第4章 - 瞬时响应:网站的高性能架构
      • 第5章 - 万无一失:网站的高可用架构
      • 第6章 - 永无止境:网站的伸缩性架构
      • 第7章 - 随需应变:网站的可扩展架构
      • 第8章 - 固若金汤:网站的安全架构
    • 从零开始学架构

      • 架构基础
      • 架构设计原则
      • 高性能架构
      • 高可用架构
    • 程序员的自我修养————链接、装载与库

      • 第1章 - 简介
      • 第2章 - 静态链路
      • 第3章 - 目标文件里有什么
      • 第4章 - 静态链接
      • 第7章 - 动态链接
      • 第8章 - 共享库版本
      • 第10章 - 内存
      • 第11章 - 运行库
      • 第12章 - 系统调用与API
      • 第13章 - 运行库实现
  • 软件开发实践

    • 重构改善既有代码的设计

      • 第1章 - 重构,第一个示例
      • 第2章 - 重构的原则
      • 第3章 - 代码的坏味道
      • 第5章 - 重构列表
      • 第6章 - 重新组织函数
      • 第7章 - 在对象之间搬移特性
      • 第8章 - 重新组织数据
      • 第9章 - 简化条件表达式
      • 第10章 - 简化函数调用
      • 第11章 - 处理概括关系
      • 第12章 - 设计之大型重构
    • 代码大全2

      • 第1章 - 欢迎进入软件构建的世界
      • 第2章 - 用隐喻来更充分地理解软件开发
      • 第3章 - 三思而后行: 前期准备
      • 第4章 - 关键的构建决策
      • 第5章 - 软件构建中的设计
    • Linux多线程服务端编程——使用muduo C++ 网络库

      • Buffer类的设计
      • 设计与实现
      • 定时器与TimerQueue
      • Protobuf网络传输和Protobuf编解码器与消息分发器
      • EventLoop类剖析
      • EventLoopThread和EventLoopThreadPool剖析
      • TCP网络库和核心类
      • Connector剖析
      • TcpClient剖析
      • 学习总结
      • timing wheel
      • 消息广播服务
      • 线程安全的对象生命期管理
  • 数据处理与应用设计

    • 数据密集型应用系统设计

      • 第1章 - 可靠、可扩展与可维护的应用系统
      • 第2章 - 数据模型与查询语言
      • 第3章 - 数据存储与检索
      • 第4章 - 数据编码与演化
      • 第5章 - 数据复制
      • 第6章 - 数据分区
      • 第7章 - 事务

muduo - 消息广播服务

    本文介绍如何使用muduo实现一个简单的 topic-based 消息广播服务。消息广播服务其实是“聊天室”的一个简单扩展,不过聊天的不是人,而是分布式系统中的程序。

    消息广播

    在分布式系统中,除了常用的 end-to-end 通信,还有一对多的广播通信。本文讨论的是基于 TCP 协议的应用层广播。示意图如下:

    Hub

    上图中圆角矩形代表程序,"Hub"是一个服务程序,不是网络集线器,它起到类似集线器的作用,故而得名。Publisher 和 Subscriper 通过 TCP 协议与 Hub 程序通信。Publisher 把消息发到某个 topic 上,Subscribers 订阅该 topic,然后就能收到消息。即 publisher 借助 hub 把消息广播给了多个 subscribers。这种 pub/sub 结构的好处在于可以增加多个 Subscriber 而不用修改 Publisher,从而一定程度上实现了“解耦”(也可以看成分布式的 observer pattern)。

    设计模式中的observer pattern的核心代码如下:

    class Subject
    {
    	public:
    		virtual ~Subject();
    		virtual void Attach(Observer* obv);
    		virtual void Detach(Observer* obv);
    		virtual void Notify();
    		virtual void SetState(const State& st) = 0;
    		virtual State GetState() = 0;
    	
    	protected:
    		Subject();
    		
    	private:
    	list<Observer* >* _obvs;
    };
    
    class Observer
    {
    	public:
    		virtual ~Observer();
    		virtual void Update(Subject* sub) = 0;
    		virtual void PrintInfo() = 0;
    		
    	protected:
    		Observer();
    		State _st;
    		
    	private:
    };
    ————————————————————————————————————————————————————————————————————
    	Subject* sub = new ConcreteSubject();
    	Observer* o1 = new ConcreteObserverA(sub);//构造和析构时,调用Attach、Detach
    	Observer* o2 = new ConcreteObserverB(sub);
    	
    	sub->SetState("old");
    	sub->Notify();        //调用(*it)->Update(this),转调PrintInfo打印信息
    	sub->SetState("new"); 
    	sub->Notify();
    ————————————————————————————————————————————————————————————————————
    [root@192 base_use]# ./Observer
    ConcreteObserverB observer....old
    ConcreteObserverA observer....old
    ConcreteObserverB observer....new
    ConcreteObserverA observer....new
    

    应用层广播在分布式系统中用处很大,举例如下:

    1. 体育比分转播。有 8 片比赛场地正在进行羽毛球比赛,每个场地的计分程序把当前比分发送到各自的 topic 上(第 1 号场地发送到 court1,第 2 号发送到 court2,以此类推)。需要用到比分的程序(赛场的大屏幕显示,网上比分转播等等)自己订阅感兴趣的 topic ,就能及时收到最新比分数据。由于本文实现的不是 100% 可靠广播,那么消息应该是 snapshot,而不是 incremental。(换句话说,消息的内容是“现在是几比几”,而不是“刚才谁得分”。)

    2. 负载监控。每台机器上运行一个监控程序,周期性地把本机当前负载(CPU、网络、磁盘、温度)publish 到以 hostname 命名的 topic 上,这样需要用到这些数据的程序只要在 hub 订阅相应的 topic 就能获得数据,无需与多台机器直接打交道。沿着这个思路,分布式系统中的服务程序也可以把自己的当前负载发布到 hub 上,供 load balancer 和 monitor 取用。

    moduo中的消息广播协议如下:

    • sub <topic> /r/n

      • 该命令表示订阅 ,以后该 topic 有任何内容更新都会发给这个 tcp 连接。在 sub 的时候,hub 会把该 上最近的消息发给此 subscriber。
    • unsub <topic> /r/n

      • 该命令表示退订 <topic>
    • pub <topic>/r/n <content>/r/n

      • 往 <topic>发送消息,内容为<content>。所有订阅了此<topic> 的 subscribers 会收到同样的消息“pub <topic>/r/n <content>/r/n” muduo的程序代码中将hub拆分成四个部分:
    • hub 服务程序:负责一对多的消息分发。它会记住每个 client 订阅了哪些 topic,只把消息发给特定的订阅者。

    • pubsub 公共库:为了方便编写使用 hub 服务的应用程序,编写公共的client library,用来和 hub 打交道。这个 * library 可以订阅 topic、退订 topic、往指定 topic 发布消息。

    • sub 程序:订阅一个或多个 topic,然后等待 hub 的数据。

    • pub 程序:向某个 topic 发布一条消息,消息内容由命令行参数指定。 一个程序可以既是 publisher 又是 subscriber,而且 pubsub 库只用一个 tcp 连接。

    PubSubClient的类成员定义:

    class PubSubClient : muduo::noncopyable
    {
     public:
      typedef std::function<void (PubSubClient*)> ConnectionCallback;
      typedef std::function<void (const string& topic,
                                  const string& content,
                                  muduo::Timestamp)> SubscribeCallback;
    
      PubSubClient(muduo::net::EventLoop* loop,
                   const muduo::net::InetAddress& hubAddr,
                   const string& name);
      void start();
      void stop();
      bool connected() const;
    
      void setConnectionCallback(const ConnectionCallback& cb)
      { connectionCallback_ = cb; }
    
      bool subscribe(const string& topic, const SubscribeCallback& cb);
      void unsubscribe(const string& topic);
      bool publish(const string& topic, const string& content);
    
     private:
      void onConnection(const muduo::net::TcpConnectionPtr& conn);
      void onMessage(const muduo::net::TcpConnectionPtr& conn,
                     muduo::net::Buffer* buf,
                     muduo::Timestamp receiveTime);
      bool send(const string& message);
    
      muduo::net::TcpClient client_;
      muduo::net::TcpConnectionPtr conn_;
      ConnectionCallback connectionCallback_;
      SubscribeCallback subscribeCallback_;
    }
    

    其中主要成员函数为subscribe,unsubscribe,publish,这三个函数主要封装message内容,然后通过TcpConnection的send函数发送消息:

    bool PubSubClient::subscribe(const string& topic, const SubscribeCallback& cb)
    {
      string message = "sub " + topic + "\r\n";
      subscribeCallback_ = cb;
      return send(message);
    }
    
    void PubSubClient::unsubscribe(const string& topic)
    {
      string message = "unsub " + topic + "\r\n";
      send(message);
    }
    
    
    bool PubSubClient::publish(const string& topic, const string& content)
    {
      string message = "pub " + topic + "\r\n" + content + "\r\n";
      return send(message);
    }
    
    bool PubSubClient::send(const string& message)
    {
      bool succeed = false;
      if (conn_ && conn_->connected())
      {
        conn_->send(message);
        succeed = true;
      }
      return succeed;
    }
    

    Hub的类成员定义:

    class PubSubServer : noncopyable
    {
     public:
      PubSubServer(muduo::net::EventLoop* loop,
                   const muduo::net::InetAddress& listenAddr)
        : loop_(loop),
          server_(loop, listenAddr, "PubSubServer")
      {
        server_.setConnectionCallback(
            std::bind(&PubSubServer::onConnection, this, _1));
        server_.setMessageCallback(
            std::bind(&PubSubServer::onMessage, this, _1, _2, _3));
        loop_->runEvery(1.0, std::bind(&PubSubServer::timePublish, this));
      }
    
      void start()
      {
        server_.start();
      }
    
     private:
      void onConnection(const TcpConnectionPtr& conn)
      {
        if (conn->connected())
        {
          conn->setContext(ConnectionSubscription());
        //typedef std::set<string> ConnectionSubscription;
        //每次有连接进来,初始化空的set
        }
        else
        {
          const ConnectionSubscription& connSub
            = boost::any_cast<const ConnectionSubscription&>(conn->getContext());
          // subtle: doUnsubscribe will erase *it, so increase before calling.
          for (ConnectionSubscription::const_iterator it = connSub.begin();
               it != connSub.end();)
          {
            doUnsubscribe(conn, *it++);
          }
        }
      }
    
      void onMessage(const TcpConnectionPtr& conn,
                     Buffer* buf,
                     Timestamp receiveTime)
      {
        ParseResult result = kSuccess;
        while (result == kSuccess)
        {
          string cmd;
          string topic;
          string content;
          result = parseMessage(buf, &cmd, &topic, &content);
          if (result == kSuccess)//解析buf的内容
          {
            if (cmd == "pub")
            {
              doPublish(conn->name(), topic, content, receiveTime);
            }
            else if (cmd == "sub")
            {
              LOG_INFO << conn->name() << " subscribes " << topic;
              doSubscribe(conn, topic);
            }
            else if (cmd == "unsub")
            {
              doUnsubscribe(conn, topic);
            }
            else
            {
              conn->shutdown();
              result = kError;
            }
          }
          else if (result == kError)
          {
            conn->shutdown();
          }
        }
      }
    
      void timePublish()
      {
        Timestamp now = Timestamp::now();
        doPublish("internal", "utc_time", now.toFormattedString(), now);
      }
    
      void doSubscribe(const TcpConnectionPtr& conn,
                       const string& topic)
      {
        ConnectionSubscription* connSub
          = boost::any_cast<ConnectionSubscription>(conn->getMutableContext());
    
        connSub->insert(topic);
        getTopic(topic).add(conn);
        //根据topic的名字,获取对应的Topic对象,然后将conn通过add函数加入audiences_
        //Topic的成员std::set<TcpConnectionPtr> audiences_;
      }
    
      void doUnsubscribe(const TcpConnectionPtr& conn,
                         const string& topic)
      {
        LOG_INFO << conn->name() << " unsubscribes " << topic;
        getTopic(topic).remove(conn);
        // topic could be the one to be destroyed, so don't use it after erasing.
        ConnectionSubscription* connSub
          = boost::any_cast<ConnectionSubscription>(conn->getMutableContext());
        connSub->erase(topic);
      }
    
      void doPublish(const string& source,
                     const string& topic,
                     const string& content,
                     Timestamp time)
      {
        getTopic(topic).publish(content, time);
      }
    
      Topic& getTopic(const string& topic)
      {
        std::map<string, Topic>::iterator it = topics_.find(topic);
        if (it == topics_.end())
        {
          it = topics_.insert(make_pair(topic, Topic(topic))).first;
        }
        return it->second;
      }
    
      EventLoop* loop_;
      TcpServer server_;
      std::map<string, Topic> topics_;
    };
    

    其中主要逻辑在onMessage函数里,通过parseMessage判断pub、sub、unsub,从而执行对应的处理逻辑doPublish、doSubscribe、doUnsubscribe。

    消息广播服务与observer Pattern中相同的思想:

    • observer Pattern :Subject对象维持着一个Observer的list,每次sub通过SetState改变状态后,可以通过Notify函数去遍历list中所有的obvs,调用其Update方法,最终转调GetState获取状态,并且PrintInfo打印状态信息。

    • 消息广播服务:其中Hub的作用类似Subject,通过PubSubServer维持着一个Topic的map<string, Topic>,且Topic成员中维持着std::set<TcpConnectionPtr> audiences_。即PubSubServer通过接收TcpConnection的消息(从sub或pub发送过来的),作对应的doPublish、doSubscribe操作。

    消息交互过程如下,代码见muduo中examples/hub:

    [root@192 bin]# ./hub 9980
    20201207 23:45:23.648955Z  2306 INFO  TcpServer::newConnection [PubSubSerubSubServer-0.0.0.0:9980#1] from 127.0.0.1:33008 - TcpServer.cc:80
    20201207 23:45:23.649658Z  2306 INFO  PubSubServer-0.0.0.0:9980#1 subscri
    20201207 23:45:23.649669Z  2306 INFO  PubSubServer-0.0.0.0:9980#1 subscri
    20201207 23:45:37.978798Z  2306 INFO  TcpServer::newConnection [PubSubSerubSubServer-0.0.0.0:9980#2] from 127.0.0.1:33010 - TcpServer.cc:80
    20201207 23:45:37.978827Z  2306 INFO  PubSubServer-0.0.0.0:9980#2 subscri
    20201207 23:47:12.095250Z  2306 INFO  TcpServer::newConnection [PubSubSerubSubServer-0.0.0.0:9980#3] from 127.0.0.1:33012 - TcpServer.cc:80
    20201207 23:47:12.095828Z  2306 INFO  TcpServer::removeConnectionInLoop [on PubSubServer-0.0.0.0:9980#3 - TcpServer.cc:109
    20201207 23:48:17.200370Z  2306 INFO  TcpServer::newConnection [PubSubSerubSubServer-0.0.0.0:9980#4] from 127.0.0.1:33014 - TcpServer.cc:80
    20201207 23:48:17.200486Z  2306 INFO  TcpServer::removeConnectionInLoop [on PubSubServer-0.0.0.0:9980#4 - TcpServer.cc:109
    
    
    [root@192 bin]# ./sub 127.0.0.1 9980 music book
    Usage: ./sub hub_ip:port topic [topic ...]
    [root@192 bin]# ./sub 127.0.0.1:9980 music book
    20201207 23:45:23.648594Z  2319 INFO  TcpClient::TcpClient[root@192.168.2x1B705D0 - TcpClient.cc:69
    20201207 23:45:23.648769Z  2319 INFO  TcpClient::connect[root@192.168.2.2 127.0.0.1:9980 - TcpClient.cc:107
    music: Raining
    book: Redis
    
    [root@192 bin]# ./sub 127.0.0.1:9980 book
    20201207 23:45:37.978528Z  2328 INFO  TcpClient::TcpClient[root@192.168.2x1C025A0 - TcpClient.cc:69
    20201207 23:45:37.978610Z  2328 INFO  TcpClient::connect[root@192.168.2.2 127.0.0.1:9980 - TcpClient.cc:107
    book: Redis
    
    [root@192 bin]# ./pub 127.0.0.1:9980 music "Raining"
    20201207 23:47:12.094796Z  2337 INFO  TcpClient::TcpClient[root@192.168.2x1951570 - TcpClient.cc:69
    20201207 23:47:12.094949Z  2337 INFO  TcpClient::connect[root@192.168.2.2 127.0.0.1:9980 - TcpClient.cc:107
    20201207 23:47:12.095980Z  2337 INFO  TcpClient::~TcpClient[root@192.168.0x1951570 - TcpClient.cc:75
    [root@192 bin]# ./pub 127.0.0.1:9980 book "Redis"
    20201207 23:48:17.200157Z  2349 INFO  TcpClient::TcpClient[root@192.168.2x1415570 - TcpClient.cc:69
    20201207 23:48:17.200205Z  2349 INFO  TcpClient::connect[root@192.168.2.2 127.0.0.1:9980 - TcpClient.cc:107
    20201207 23:48:17.200585Z  2349 INFO  TcpClient::~TcpClient[root@192.168.0x1415570 - TcpClient.cc:75
    

    ​

    Last Updated:
    Contributors: klc407073648
    Prev
    timing wheel
    Next
    线程安全的对象生命期管理