Learning notes of muduo Network Library -- message broadcasting service

This paper introduces how to use muduo to implement a simple topic based message broadcasting service. Message broadcasting service is actually a simple extension of "chat room", but the chat is not people, but programs in distributed systems.

In the distributed system, in addition to the commonly used end-to-end communication, there is also one to many broadcast communication. This paper discusses application layer broadcasting based on TCP protocol. The schematic diagram is as follows:

The rounded rectangle in the figure above represents the program. "Hub" is a service program, not a network hub. It plays a role similar to the hub, so it is named. Publisher and Subscriber communicate with hub program through TCP protocol. Publisher sends a message to a topic, and subscribers subscribe to the topic, and then they can receive the message. That is, publisher broadcasts messages to multiple subscribers with the help of hub. The advantage of this pub/sub structure is that multiple subscribers can be added without modifying publisher, so as to realize "decoupling" to a certain extent (you can also see the observer pattern of component layout).

The core code of observer pattern in design pattern is as follows:

class Subject
{
	public:
		virtual ~Subject();
		virtual void Attach(Observer* obv);
		virtual void Detach(Observer* obv);
		virtual void Notify();
		virtual void SetState(const State& st) = 0;
		virtual State GetState() = 0;
	
	protected:
		Subject();
		
	private:
	list<Observer* >* _obvs;
};

class Observer
{
	public:
		virtual ~Observer();
		virtual void Update(Subject* sub) = 0;
		virtual void PrintInfo() = 0;
		
	protected:
		Observer();
		State _st;
		
	private:
};
----------------------------------
	Subject* sub = new ConcreteSubject();
	Observer* o1 = new ConcreteObserverA(sub);//When constructing and destructing, call Attach and Detach
	Observer* o2 = new ConcreteObserverB(sub);
	
	sub->SetState("old");
	sub->Notify();        //Call (* it) - > Update (this) to transfer PrintInfo to print information
	sub->SetState("new"); 
	sub->Notify();
----------------------------------
[root@192 base_use]# ./Observer
ConcreteObserverB observer....old
ConcreteObserverA observer....old
ConcreteObserverB observer....new
ConcreteObserverA observer....new

Application layer broadcasting is very useful in distributed systems. Examples are as follows:

1. Sports score broadcast. There are eight playing fields in badminton competition. The scoring program of each field sends the current score to their respective topic (field 1 to court1, field 2 to court2, and so on). You need to use the score program (large screen display of the field, online score broadcasting, etc.) to subscribe to the topic you are interested in, and you can receive the latest score data in time. Since this article does not implement 100% reliable broadcasting, the message should be a snapshot, not incremental. (in other words, the content of the message is "what's the score now", not "who scored just now".)

2. Load monitoring. A monitoring program runs on each machine and periodically publishes the current load (CPU, network, disk and temperature) of the machine to the topic named hostname. In this way, the program that needs these data can obtain the data as long as it subscribes to the corresponding topic in the hub without directly dealing with multiple machines. Following this idea, service programs in distributed systems can also publish their current load to the hub for load balancer and monitor.

The message broadcasting protocol in moduo is as follows:

  • sub <topic> /r/n
    • This command indicates subscription. Any content updates of this topic will be sent to this tcp connection in the future. During sub, the hub will send the latest messages to the subscriber.
  • unsub <topic> /r/n
    • This command means unsubscribing < topic >
  • pub <topic>/r/n <content>/r/n
    • Send a message to < topic >, the content is < content >. All subscribers who subscribe to this < topic > will receive the same message "pub < topic > / R / N < content > / R / N"

muduo's program code divides the hub into four parts:

  • One to many message distribution service hub. It will remember which topic s each client subscribes to and only send messages to specific subscribers.
  • pubsub Public Library: in order to facilitate the writing of applications using hub services, a public client library is written to deal with hub. This library can subscribe to topics, unsubscribe from topics, and publish messages to specified topics.
  • sub program: subscribe to one or more topic s and wait for the data of the hub.
  • pub program: publishes a message to a topic. The message content is specified by the command line parameters.

A program can be both publisher and subscriber, and the pubsub library uses only one tcp connection.

Class member definition of PubSubClient:

class PubSubClient : muduo::noncopyable
{
 public:
  typedef std::function<void (PubSubClient*)> ConnectionCallback;
  typedef std::function<void (const string& topic,
                              const string& content,
                              muduo::Timestamp)> SubscribeCallback;

  PubSubClient(muduo::net::EventLoop* loop,
               const muduo::net::InetAddress& hubAddr,
               const string& name);
  void start();
  void stop();
  bool connected() const;

  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  bool subscribe(const string& topic, const SubscribeCallback& cb);
  void unsubscribe(const string& topic);
  bool publish(const string& topic, const string& content);

 private:
  void onConnection(const muduo::net::TcpConnectionPtr& conn);
  void onMessage(const muduo::net::TcpConnectionPtr& conn,
                 muduo::net::Buffer* buf,
                 muduo::Timestamp receiveTime);
  bool send(const string& message);

  muduo::net::TcpClient client_;
  muduo::net::TcpConnectionPtr conn_;
  ConnectionCallback connectionCallback_;
  SubscribeCallback subscribeCallback_;
}

The main member functions are subscribe, unsubscribe and publish. These three functions mainly encapsulate the message content, and then send the message through the send function of TcpConnection:

bool PubSubClient::subscribe(const string& topic, const SubscribeCallback& cb)
{
  string message = "sub " + topic + "\r\n";
  subscribeCallback_ = cb;
  return send(message);
}

void PubSubClient::unsubscribe(const string& topic)
{
  string message = "unsub " + topic + "\r\n";
  send(message);
}


bool PubSubClient::publish(const string& topic, const string& content)
{
  string message = "pub " + topic + "\r\n" + content + "\r\n";
  return send(message);
}

bool PubSubClient::send(const string& message)
{
  bool succeed = false;
  if (conn_ && conn_->connected())
  {
    conn_->send(message);
    succeed = true;
  }
  return succeed;
}

Class member definition of Hub:

class PubSubServer : noncopyable
{
 public:
  PubSubServer(muduo::net::EventLoop* loop,
               const muduo::net::InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "PubSubServer")
  {
    server_.setConnectionCallback(
        std::bind(&PubSubServer::onConnection, this, _1));
    server_.setMessageCallback(
        std::bind(&PubSubServer::onMessage, this, _1, _2, _3));
    loop_->runEvery(1.0, std::bind(&PubSubServer::timePublish, this));
  }

  void start()
  {
    server_.start();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      conn->setContext(ConnectionSubscription());
    //typedef std::set<string> ConnectionSubscription;
    //Every time a connection comes in, initialize an empty set
    }
    else
    {
      const ConnectionSubscription& connSub
        = boost::any_cast<const ConnectionSubscription&>(conn->getContext());
      // subtle: doUnsubscribe will erase *it, so increase before calling.
      for (ConnectionSubscription::const_iterator it = connSub.begin();
           it != connSub.end();)
      {
        doUnsubscribe(conn, *it++);
      }
    }
  }

  void onMessage(const TcpConnectionPtr& conn,
                 Buffer* buf,
                 Timestamp receiveTime)
  {
    ParseResult result = kSuccess;
    while (result == kSuccess)
    {
      string cmd;
      string topic;
      string content;
      result = parseMessage(buf, &cmd, &topic, &content);
      if (result == kSuccess)//Parse the contents of buf
      {
        if (cmd == "pub")
        {
          doPublish(conn->name(), topic, content, receiveTime);
        }
        else if (cmd == "sub")
        {
          LOG_INFO << conn->name() << " subscribes " << topic;
          doSubscribe(conn, topic);
        }
        else if (cmd == "unsub")
        {
          doUnsubscribe(conn, topic);
        }
        else
        {
          conn->shutdown();
          result = kError;
        }
      }
      else if (result == kError)
      {
        conn->shutdown();
      }
    }
  }

  void timePublish()
  {
    Timestamp now = Timestamp::now();
    doPublish("internal", "utc_time", now.toFormattedString(), now);
  }

  void doSubscribe(const TcpConnectionPtr& conn,
                   const string& topic)
  {
    ConnectionSubscription* connSub
      = boost::any_cast<ConnectionSubscription>(conn->getMutableContext());

    connSub->insert(topic);
    getTopic(topic).add(conn);
    //According to the name of topic, get the corresponding topic object, and then add conn to audios through the add function_
    //Topic's member STD:: set < tcpconnectionptr > audios_;
  }

  void doUnsubscribe(const TcpConnectionPtr& conn,
                     const string& topic)
  {
    LOG_INFO << conn->name() << " unsubscribes " << topic;
    getTopic(topic).remove(conn);
    // topic could be the one to be destroyed, so don't use it after erasing.
    ConnectionSubscription* connSub
      = boost::any_cast<ConnectionSubscription>(conn->getMutableContext());
    connSub->erase(topic);
  }

  void doPublish(const string& source,
                 const string& topic,
                 const string& content,
                 Timestamp time)
  {
    getTopic(topic).publish(content, time);
  }

  Topic& getTopic(const string& topic)
  {
    std::map<string, Topic>::iterator it = topics_.find(topic);
    if (it == topics_.end())
    {
      it = topics_.insert(make_pair(topic, Topic(topic))).first;
    }
    return it->second;
  }

  EventLoop* loop_;
  TcpServer server_;
  std::map<string, Topic> topics_;
};

The main logic is in the onMessage function, which judges pub, sub and unsub through parseMessage, so as to execute the corresponding processing logic doPublish, doSubscribe and doUnsubscribe.

Message broadcast service has the same idea as observer Pattern:

  • observer Pattern: the Subject object maintains a list of observers. Each time a sub changes its state through SetState, you can use the Notify function to traverse all the obvs in the list, call its Update method, and finally transfer GetState to obtain the state, and PrintInfo prints the state information.
  • Message broadcasting service: the function of Hub is similar to that of Subject. The map < string, Topic > of a Topic is maintained through PubSubServer, and STD:: set < tcpconnectionptr > audios. That is, PubSubServer performs the corresponding doPublish and doSubscribe operations by receiving the message of TcpConnection (sent from sub or pub).

The message interaction process is as follows. See examples/hub in muduo for the code:

[root@192 bin]# ./hub 9980
20201207 23:45:23.648955Z  2306 INFO  TcpServer::newConnection [PubSubSerubSubServer-0.0.0.0:9980#1] from 127.0.0.1:33008 - TcpServer.cc:80
20201207 23:45:23.649658Z  2306 INFO  PubSubServer-0.0.0.0:9980#1 subscri
20201207 23:45:23.649669Z  2306 INFO  PubSubServer-0.0.0.0:9980#1 subscri
20201207 23:45:37.978798Z  2306 INFO  TcpServer::newConnection [PubSubSerubSubServer-0.0.0.0:9980#2] from 127.0.0.1:33010 - TcpServer.cc:80
20201207 23:45:37.978827Z  2306 INFO  PubSubServer-0.0.0.0:9980#2 subscri
20201207 23:47:12.095250Z  2306 INFO  TcpServer::newConnection [PubSubSerubSubServer-0.0.0.0:9980#3] from 127.0.0.1:33012 - TcpServer.cc:80
20201207 23:47:12.095828Z  2306 INFO  TcpServer::removeConnectionInLoop [on PubSubServer-0.0.0.0:9980#3 - TcpServer.cc:109
20201207 23:48:17.200370Z  2306 INFO  TcpServer::newConnection [PubSubSerubSubServer-0.0.0.0:9980#4] from 127.0.0.1:33014 - TcpServer.cc:80
20201207 23:48:17.200486Z  2306 INFO  TcpServer::removeConnectionInLoop [on PubSubServer-0.0.0.0:9980#4 - TcpServer.cc:109


[root@192 bin]# ./sub 127.0.0.1 9980 music book
Usage: ./sub hub_ip:port topic [topic ...]
[root@192 bin]# ./sub 127.0.0.1:9980 music book
20201207 23:45:23.648594Z  2319 INFO  TcpClient::TcpClient[root@192.168.2x1B705D0 - TcpClient.cc:69
20201207 23:45:23.648769Z  2319 INFO  TcpClient::connect[root@192.168.2.2 127.0.0.1:9980 - TcpClient.cc:107
music: Raining
book: Redis

[root@192 bin]# ./sub 127.0.0.1:9980 book
20201207 23:45:37.978528Z  2328 INFO  TcpClient::TcpClient[root@192.168.2x1C025A0 - TcpClient.cc:69
20201207 23:45:37.978610Z  2328 INFO  TcpClient::connect[root@192.168.2.2 127.0.0.1:9980 - TcpClient.cc:107
book: Redis

[root@192 bin]# ./pub 127.0.0.1:9980 music "Raining"
20201207 23:47:12.094796Z  2337 INFO  TcpClient::TcpClient[root@192.168.2x1951570 - TcpClient.cc:69
20201207 23:47:12.094949Z  2337 INFO  TcpClient::connect[root@192.168.2.2 127.0.0.1:9980 - TcpClient.cc:107
20201207 23:47:12.095980Z  2337 INFO  TcpClient::~TcpClient[root@192.168.0x1951570 - TcpClient.cc:75
[root@192 bin]# ./pub 127.0.0.1:9980 book "Redis"
20201207 23:48:17.200157Z  2349 INFO  TcpClient::TcpClient[root@192.168.2x1415570 - TcpClient.cc:69
20201207 23:48:17.200205Z  2349 INFO  TcpClient::connect[root@192.168.2.2 127.0.0.1:9980 - TcpClient.cc:107
20201207 23:48:17.200585Z  2349 INFO  TcpClient::~TcpClient[root@192.168.0x1415570 - TcpClient.cc:75

 

Posted by oni-kun on Tue, 03 May 2022 11:16:58 +0300