Implementing the Paxos distributed consensus protocol in Go

What is the Paxos distributed consensus protocol?

Early services are usually exposed through a monolithic architecture: a single Server and a single Database. As the business grows, the number of users and requests keeps climbing, and handling a large volume of requests becomes a problem every service has to solve; this is what we usually call high concurrency. To overcome the weakness of a single Server under high concurrency, we can add more servers, moving to a multi-Server, single-Database (master-slave) mode. The pressure then shifts to the Database: its IO efficiency determines the efficiency of the whole service, and adding even more servers no longer improves performance. This leads to today's popular microservice architecture.

When a user request is routed by a load balancer to one service instance, how do we guarantee that the other instances of the service eventually see the same data change? This is where the Paxos distributed consensus protocol comes in. Paxos solves the eventual-consistency problem: no matter which service instance a request reaches, the same data can be read after some period of time. Many distributed products at home and abroad use the Paxos protocol; it is fair to say that Paxos has become almost the standard, and a synonym, for consensus protocols.

There are two kinds of Paxos protocol: the Basic Paxos we usually refer to, and Multi-Paxos. Unless otherwise specified, the Paxos protocol discussed in this article is Basic Paxos.

The Paxos protocol was first proposed by Leslie Lamport, a Turing Award winner, in his 1998 paper The Part-Time Parliament, which describes how the parliament of the Greek island of Paxos passed its resolutions. Because the paper was obscure and hard to follow, few people in the computer industry at the time could understand it, so Lamport published Paxos Made Simple in 2001. Its abstract reads:

The Paxos algorithm, when presented in plain English, is very simple.

No, no, no, is there really anyone who doesn't understand such a "simple" Paxos algorithm? In reality, many people are discouraged by Paxos. Understanding Paxos is not the hard part; the hard part is the engineering: turning the Paxos protocol into a service that can actually run in a production environment. For the engineering side of Paxos, see the article by the WeChat backend team introducing the implementation principles of PhxPaxos, WeChat's self-developed production-grade Paxos library.

How does Paxos ensure consistency?

The Paxos protocol has two phases, Prepare and Propose, and two roles, Proposer and Acceptor. Each service instance is both a Proposer and an Acceptor. The Proposer is responsible for making proposals; the Acceptor decides whether to accept a proposal from a Proposer. Once a proposal has been accepted by a majority, we can claim that agreement has been reached on the value contained in the proposal, and that value will never change again.
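
As a Proposer, an instance needs its own identity and the next ProposalID; as an Acceptor, it must remember the highest ProposalID it has promised and the proposal it has accepted. The following is a minimal sketch of that state, using the field names that appear in the code later in this article; the exact struct layout is an assumption, not the project's actual definition.

// A minimal sketch of the per-instance state, assuming the field names used by
// the code later in this article. sync.Mutex requires importing "sync".
type ServerInfo struct {
	IPAddress string
	Port      int
}

type Proposal struct {
	ID    int    // globally unique, monotonically increasing proposal number
	Value string // the value this Proposer wants the cluster to agree on
}

type Server struct {
	mu            sync.Mutex // protects the fields below
	Info          ServerInfo // this instance's own IP address and port
	minProposalID int        // highest ProposalID this Acceptor has promised (Promises 1 and 2)
	Proposal      Proposal   // the proposal this Acceptor has accepted, if any
}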

Phase 1: Prepare

  1. The Proposer generates a globally unique ProposalID (timestamp + ServerID; a sketch of one such scheme follows this list)
  2. The Proposer sends a Prepare(n = ProposalID) request to all Acceptors (including the Proposer itself)
  3. The Acceptor compares n with its minProposal; if n > minProposal, it sets minProposal = n and returns its accepted proposal (acceptedProposal, acceptedValue)
    • Promise 1: no longer accept Prepare requests with n <= minProposal
    • Promise 2: no longer accept Propose requests with n < minProposal
    • Response 1: return the previously accepted proposal, if any
  4. When the Proposer has received replies from more than half of the Acceptors:
    • If the Prepare request was rejected, regenerate the ProposalID and send a new Prepare request
    • If the Prepare request was accepted and some replies carry accepted proposals, take the value of the accepted proposal with the largest ProposalID as the value of this proposal
    • If the Prepare request was accepted and no reply carries an accepted proposal, any value may be chosen as the proposal value
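
Step 1 only says that the ProposalID combines a timestamp with the ServerID. Below is a sketch of one way this could be done, using a hypothetical generateProposalID helper (assuming fewer than 1000 servers and 64-bit ints); the real project may encode the ID differently.

// generateProposalID is a hypothetical helper: it builds a globally unique,
// monotonically increasing ID from the current time and the server's ID.
// Requires importing "time".
func generateProposalID(serverID int) int {
	// milliseconds in the high digits keep IDs growing over time;
	// the low three decimal digits hold the ServerID so that two servers
	// never generate the same ID (assumes serverID < 1000)
	return int(time.Now().UnixMilli())*1000 + serverID
}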

Phase 2: Propose

  1. The Proposer sends a Propose(n = ProposalID, value = ProposalValue) request (the Accept request in Paxos terminology) to all Acceptors (including the Proposer itself)
  2. The Acceptor compares n with its minProposal; if n >= minProposal, it sets minProposal = n and acceptedValue = value, and returns the accepted proposal (minProposal, acceptedValue)
  3. When the Proposer has received replies from more than half of the Acceptors:
    • If the Propose request was rejected, regenerate the ProposalID and send a new Prepare request
    • If the Propose request was accepted, the cluster has agreed on the value

Once a proposal has been accepted by more than half of the services, we can claim that the whole service cluster has reached agreement on it. For example, in a cluster of three instances, two acceptances form a majority, so agreement can still be reached with one instance down.

 

It should be noted that in a service cluster, the two phases above may well be in progress at the same time. For example, instance A has finished its Prepare phase and sent a Propose request; at the same moment, instance B starts its own Prepare phase and sends a Prepare request with a larger ProposalID, which may cause instance A's Propose request to be rejected. Each service instance plays the roles of both Proposer and Acceptor: while it is sending requests to other services, it may also be handling requests from them.

 

Implementing the Paxos protocol in Go

Service registration and discovery

Since every service instance runs the same code, how does an instance learn the entry points (IP addresses and port numbers) of the other instances? One way is to hard-code them or to provide a configuration file that is read when the service starts. However, this is hard to maintain: whenever we remove or add a service, we have to rewrite the configuration file on every machine.

Alternatively, each instance can register itself with, and learn the current cluster membership from, a third-party service: service registration and discovery. In other words, we replace the local configuration file with an online configuration service.

Service registration: the Register function. After a service instance starts, it registers itself with the registry service by calling this RPC method.

func (s *Service) Register(args *RegisterArgs, reply *RegisterReply) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    // reject duplicate registrations of the same IP:port
    for _, registered := range s.Servers {
        if registered.IPAddress == args.ServerInfo.IPAddress && registered.Port == args.ServerInfo.Port {
            reply.Succeed = false
            return nil
        }
    }
    // assign the next ServerID and record the new instance
    reply.ServerID = len(s.Servers)
    reply.Succeed = true
    s.Servers = append(s.Servers, args.ServerInfo)

    fmt.Printf("Currently registered servers:\n%v\n", s.Servers)

    return nil
}

Service discovery: the GetServers function. A service instance obtains the information (IP address and port number) of every registered instance by calling this RPC method.

func (s *Service) GetServers(args *GetServersArgs, reply *GetServersReply) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// return all currently registered servers
	reply.ServerInfos = s.Servers

	return nil
}
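
To see how an instance would use these two RPCs at startup, here is a sketch of the client side. The registry's address, the RPC method names "Service.Register" and "Service.GetServers", and the reuse of the article's Call(target, method, args, reply) helper to reach the registry are all assumptions.

// joinCluster is a hypothetical startup helper: it registers this instance with
// the registry and then fetches the full list of instances in the cluster.
func joinCluster(registry ServerInfo, me ServerInfo) ([]ServerInfo, int, error) {
	regArgs := RegisterArgs{ServerInfo: me}
	regReply := RegisterReply{}
	if !Call(registry, "Service.Register", &regArgs, &regReply) || !regReply.Succeed {
		return nil, 0, fmt.Errorf("failed to register %v:%v", me.IPAddress, me.Port)
	}

	getArgs := GetServersArgs{}
	getReply := GetServersReply{}
	if !Call(registry, "Service.GetServers", &getArgs, &getReply) {
		return nil, 0, fmt.Errorf("failed to fetch the server list")
	}

	// regReply.ServerID can later be used when generating ProposalIDs
	return getReply.ServerInfos, regReply.ServerID, nil
}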

Prepare phase

The Proposer sends Prepare requests to all services and waits until more than half of them have returned a result. It could also wait for every service to respond before proceeding, but since the Paxos protocol tolerates fewer than half of the services being down, waiting for more than N/2 replies is enough. If any reply in the collected results is a rejection, the Proposer treats the Prepare as rejected, returns, regenerates the ProposalID, and sends a new round of Prepare requests.

func (s *Server) CallPrepare(allServers []ServerInfo, proposal Proposal) PrepareReply {
	returnedReplies := make([]PrepareReply, 0)

	for _, otherS := range allServers {
		// use a go routine to call every server
		go func(otherS ServerInfo) {
			delay := rand.Intn(10)
			time.Sleep(time.Second * time.Duration(delay))
			args := PrepareArgs{s.Info, proposal.ID}
			reply := PrepareReply{}
			fmt.Printf("[Prepare]Call Prepare on %v:%v with proposal id %v\n", otherS.IPAddress, otherS.Port, args.ProposalID)
			if Call(otherS, "Server.Prepare", &args, &reply) {
				if reply.HasAcceptedProposal {
					fmt.Printf("[Prepare]%v:%v returns accepted proposal: %v\n", otherS.IPAddress, otherS.Port, reply.AcceptedProposal)
				} else {
					fmt.Printf("[Prepare]%v:%v returns empty proposal\n", otherS.IPAddress, otherS.Port)
				}
				s.mu.Lock()
				returnedReplies = append(returnedReplies, reply)
				s.mu.Unlock()
			}
		}(otherS)
	}

	for {
		// wait for responses from the majority; read the shared slice under the mutex
		s.mu.Lock()
		checkReplies := make([]PrepareReply, len(returnedReplies))
		copy(checkReplies, returnedReplies)
		s.mu.Unlock()

		if len(checkReplies) > len(allServers)/2 {

			// three possible response
			// 1. deny the prepare, and return an empty/accepted proposal
			//    as the proposal id is not higher than minProposalID on server (proposal id <= server.minProposalID)
			// 2. accept the prepare, and return an empty proposal as the server has not accept any proposal yet
			// 3. accept the prepare, and return an accepted proposal

			// check responses from majority
			// find the response with max proposal id
			acceptedProposal := NewProposal()

			for _, r := range checkReplies {
				// if any response refused the prepare, this server should resend prepare
				if !r.PrepareAccepted {
					return r
				}

				if r.HasAcceptedProposal && r.AcceptedProposal.ID > acceptedProposal.ID {
					acceptedProposal = r.AcceptedProposal
				}
			}

			// if some other server has accepted proposal, return that proposal with max proposal id
			// if no other server has accepted proposal, return an empty proposal
			return PrepareReply{HasAcceptedProposal: !acceptedProposal.IsEmpty(), AcceptedProposal: acceptedProposal, PrepareAccepted: true}
		}

		//fmt.Printf("Waiting for response from majority...\n")
		time.Sleep(time.Second * 1)
	}
}

The Acceptor compares the incoming ProposalID with its minProposal: if the ProposalID is less than or equal to minProposal, the Prepare request is rejected; otherwise minProposal is updated to the ProposalID. Finally, the Acceptor returns the proposal it has already accepted, if any.

func (s *Server) Prepare(args *PrepareArgs, reply *PrepareReply) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// 2 promises and 1 response
	// Promise 1
	// do not accept prepare request which ProposalID <= minProposalID
	// Promise 2
	// do not accept propose request which ProposalID < minProposalID
	// Response 1
	// respond with accepted proposal if any

	if reply.PrepareAccepted = args.ProposalID > s.minProposalID; reply.PrepareAccepted {
		// promise: from now on, reject any Prepare/Propose request whose ID is not larger than this one
		s.minProposalID = args.ProposalID
	}
	reply.HasAcceptedProposal = s.readAcceptedProposal()
	reply.AcceptedProposal = s.Proposal

	return nil
}
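
The handler above calls readAcceptedProposal, and the Propose handler further below calls SaveAcceptedProposal; neither is shown in the article. The following is a minimal sketch of what they might look like, assuming one JSON file per instance; the real project may persist the accepted proposal differently. Requires importing "encoding/json", "fmt" and "os".

// proposalFile returns the per-instance file used to persist the accepted proposal.
func (s *Server) proposalFile() string {
	return fmt.Sprintf("proposal-%v-%v.json", s.Info.IPAddress, s.Info.Port)
}

// readAcceptedProposal loads the last accepted proposal from disk into
// s.Proposal and reports whether one exists.
func (s *Server) readAcceptedProposal() bool {
	data, err := os.ReadFile(s.proposalFile())
	if err != nil {
		return false // nothing accepted (or persisted) yet
	}
	return json.Unmarshal(data, &s.Proposal) == nil
}

// SaveAcceptedProposal writes the accepted proposal to disk before the Acceptor
// replies, so the accepted value survives a restart.
func (s *Server) SaveAcceptedProposal() {
	data, err := json.Marshal(s.Proposal)
	if err != nil {
		return
	}
	_ = os.WriteFile(s.proposalFile(), data, 0644)
}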

Propose phase

The Proposer again sends Propose requests to all services and waits for more than half of them to return a result. If any reply is a rejection, the Proposer treats the Propose as rejected, returns, regenerates the ProposalID, and sends a new round of Prepare requests.

func (s *Server) CallPropose(allServers []ServerInfo, proposal Proposal) ProposeReply {
	returnedReplies := make([]ProposeReply, 0)

	for _, otherS := range allServers {
		go func(otherS ServerInfo) {
			delay := rand.Intn(5000)
			time.Sleep(time.Millisecond * time.Duration(delay))
			args := ProposeArgs{otherS, proposal}
			reply := ProposeReply{}
			fmt.Printf("[Propose]Call Propose on %v:%v with proposal: %v\n", otherS.IPAddress, otherS.Port, args.Proposal)
			if Call(otherS, "Server.Propose", &args, &reply) {
				fmt.Printf("[Propose]%v:%v returns: %v\n", otherS.IPAddress, otherS.Port, reply)
				s.mu.Lock()
				returnedReplies = append(returnedReplies, reply)
				s.mu.Unlock()
			}
		}(otherS)
	}

	for {
		// wait for responses from the majority; read the shared slice under the mutex
		s.mu.Lock()
		checkReplies := make([]ProposeReply, len(returnedReplies))
		copy(checkReplies, returnedReplies)
		s.mu.Unlock()

		if len(checkReplies) > len(allServers)/2 {

			for _, r := range checkReplies {
				if !r.ProposeAccepted {
					return r
				}
			}

			return checkReplies[0]
		}

		time.Sleep(time.Second * 1)
	}
}

The Acceptor compares the incoming ProposalID with its minProposal: if the ProposalID is less than minProposal, the Propose request is rejected; otherwise minProposal is updated to the ProposalID and the proposal is persisted to the local disk.

func (s *Server) Propose(args *ProposeArgs, reply *ProposeReply) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Promise 2: only accept the proposal if its ID is not smaller than minProposalID
	if s.minProposalID <= args.Proposal.ID {
		s.minProposalID = args.Proposal.ID
		s.Proposal = args.Proposal
		// persist the accepted proposal before replying
		s.SaveAcceptedProposal()

		reply.ProposeAccepted = true
	}

	// always report the current minProposalID so the Proposer knows why it was rejected
	reply.ProposalID = s.minProposalID

	return nil
}
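
The article does not show the top-level loop that ties CallPrepare and CallPropose together. Here is a sketch of what that Proposer loop might look like, reusing the hypothetical generateProposalID helper sketched earlier and assuming the Proposal struct has ID and Value fields; the retry-on-rejection behaviour follows the two phases described above.

// RunPaxos is a hypothetical driver: it keeps running Prepare/Propose rounds
// with ever larger ProposalIDs until a proposal is accepted by a majority.
func (s *Server) RunPaxos(allServers []ServerInfo, serverID int, myValue string) Proposal {
	for {
		proposal := Proposal{ID: generateProposalID(serverID), Value: myValue}

		// Phase 1: Prepare
		prepareReply := s.CallPrepare(allServers, proposal)
		if !prepareReply.PrepareAccepted {
			continue // rejected: retry with a larger ProposalID
		}
		if prepareReply.HasAcceptedProposal {
			// some Acceptor has already accepted a value, so we must propose that value
			proposal.Value = prepareReply.AcceptedProposal.Value
		}

		// Phase 2: Propose
		proposeReply := s.CallPropose(allServers, proposal)
		if proposeReply.ProposeAccepted {
			return proposal // accepted by a majority: consensus reached
		}
		// otherwise fall through and start a new round with a larger ProposalID
	}
}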

 

Running the demo

The results of a run are shown below. I started three service instances and added a random delay before each request to simulate network latency, so the requests from different services are not sent at the same time.

An animated capture of the run:

A static capture of the final result:

As you can see, although each of the three services initially tries to use its own port number (5001, 5002, 5003) as the proposal value, after a failed Prepare or Propose they regenerate a larger ProposalID and start a new round (Prepare, Propose), and the cluster finally agrees on 5003.

Summary

So far we have implemented the core logic of the Paxos protocol in Go. Obviously, though, this code still has many problems and is far from meeting the needs of a production environment:

  • Sharing data between goroutines through channels instead of a mutex (see the sketch after this list)
  • Handling the removal and addition of service instances
  • Avoiding livelock, where two Proposers keep preempting each other with ever larger ProposalIDs
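
For the first point, this is a minimal sketch of what collecting Prepare replies through a channel, instead of a mutex-guarded slice, might look like; it is an assumption about a possible refactor, not code from the project.

// callPrepareWithChannel collects replies via a buffered channel: each RPC
// goroutine sends its reply, and the caller receives until it has a majority.
func (s *Server) callPrepareWithChannel(allServers []ServerInfo, proposal Proposal) []PrepareReply {
	replyCh := make(chan PrepareReply, len(allServers))

	for _, otherS := range allServers {
		go func(otherS ServerInfo) {
			args := PrepareArgs{s.Info, proposal.ID}
			reply := PrepareReply{}
			if Call(otherS, "Server.Prepare", &args, &reply) {
				replyCh <- reply // buffered, so this never blocks
			}
		}(otherS)
	}

	// block until more than half of the servers have answered
	replies := make([]PrepareReply, 0, len(allServers))
	for reply := range replyCh {
		replies = append(replies, reply)
		if len(replies) > len(allServers)/2 {
			break
		}
	}
	return replies
}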

In the next article, we will explore how to solve the above problems.

 

Finally, you are welcome to follow my personal WeChat official account: SoBrian. I look forward to communicating with you and growing together!

 

 

 
