Microservice gateway rate limiting: middleware integration, plus usage and source code analysis of the time/rate limiter

Microservice gateway (8): integrating rate-limiting middleware into the gateway, and using and reading the source of the time/rate limiter

Rate limiting principles

The purpose of rate limiting

Three powerful tools for high-concurrency systems:

  • Caching

    Speeds up access and increases processing capacity by adding a cache in front of the relevant services.

  • Degradation

    When server load spikes, selectively degrade features according to business logic, freeing server resources so the core business keeps running normally.

  • Rate limiting

    Limit the request rate or concurrency; requests over the limit are rejected, queued, or degraded.

Types of rate limiting

Leaky bucket rate limiting

For each request, compute the current volume in the bucket; when it exceeds the threshold, the request is degraded (for example, rejected).
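As a rough illustration of the leaky bucket idea, here is a minimal sketch in Go. The leakyBucket type, its fields, and the demo function are hypothetical names introduced for this example. The sketch treats the bucket as a meter: the volume drains at a constant rate, each request adds one unit, and a request that would push the volume past the capacity is refused.

```go
package main

import (
	"fmt"
	"time"
)

// leakyBucket is a hypothetical leaky-bucket meter. It is not safe for
// concurrent use; a real one would guard its state with a mutex.
type leakyBucket struct {
	capacity float64   // threshold: maximum volume the bucket can hold
	leakRate float64   // volume drained per second
	level    float64   // current volume
	last     time.Time // last time level was updated
}

// allow drains the bucket for the time elapsed since last, then tries to add
// one unit for the current request. It reports false on overflow.
func (b *leakyBucket) allow(now time.Time) bool {
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.level -= elapsed * b.leakRate
		if b.level < 0 {
			b.level = 0
		}
		b.last = now
	}
	if b.level+1 > b.capacity {
		return false // over the threshold: degrade this request
	}
	b.level++
	return true
}

// demo sends three requests at the same instant and a fourth one second later.
func demo() []bool {
	start := time.Unix(0, 0)
	b := &leakyBucket{capacity: 2, leakRate: 1, last: start}
	return []bool{
		b.allow(start),                  // level 0 -> 1
		b.allow(start),                  // level 1 -> 2
		b.allow(start),                  // would reach 3 > 2: refused
		b.allow(start.Add(time.Second)), // one unit has leaked: level 1 -> 2
	}
}

func main() {
	fmt.Println(demo()) // [true true false true]
}
```

Passing the current time in as a parameter keeps the sketch deterministic; production code would normally call time.Now() inside allow.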

Token bucket rate limiting

Each request takes a token from the token bucket; if no token is available, the request is degraded.

Using the time/rate limiter

time/rate (import path golang.org/x/time/rate) is the rate limiter maintained by the Go team. It lives in the golang.org/x extension repositories rather than in the standard library proper, and it is implemented as a token bucket.

  • Creating a limiter

    rate.NewLimiter(limit, burst)
    limit is the number of tokens generated per second; burst is the maximum number of tokens the bucket can hold

  • Usage (three methods, all of which acquire tokens)

    • Allow: reports whether a token is available right now. If not, it returns false; if so, it consumes 1 token and returns true
    • Wait: blocks until a token is acquired, or returns an error if the context is cancelled or its deadline would be exceeded
    • Reserve: reserves a token immediately and returns a Reservation whose Delay() says how long to wait before acting on it
import (
	"context"
	"log"
	"testing"
	"time"

	"golang.org/x/time/rate"
)

func Test_RateLimiter(t *testing.T) {
	// 1 token per second, bucket capacity 5
	l := rate.NewLimiter(1, 5)
	log.Println(l.Limit(), l.Burst())
	for i := 0; i < 100; i++ {
		// Wait blocks until a token is acquired or the 2s timeout would be exceeded
		log.Println("before Wait")
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		if err := l.Wait(ctx); err != nil {
			log.Println("limiter wait err:" + err.Error())
		}
		cancel() // release the resources held by the context
		log.Println("after Wait")

		// Reserve takes a token now and reports how long to wait before using it
		r := l.Reserve()
		log.Println("reserve Delay:", r.Delay())

		// Allow reports whether a token is available right now
		a := l.Allow()
		log.Println("Allow:", a)
		log.Println("------------------------")
	}
}

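With a burst of 5 the bucket starts full, so the first five token acquisitions (counted across the Wait, Reserve and Allow calls) succeed immediately. The sixth, the Allow call in the second iteration, fails because the bucket is empty by then and tokens are refilled at only 1 per second.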
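The role of the two parameters can be put as a simple bound: in any window of length t, a token bucket lets through at most burst + limit * t events, because the most it can hand out is a full bucket plus whatever is refilled during the window. A small sketch of that arithmetic, where maxEvents is a hypothetical helper written for this article:

```go
package main

import (
	"fmt"
	"time"
)

// maxEvents returns the upper bound on events a token bucket lets through in
// any window of the given length: a full bucket plus the tokens refilled meanwhile.
func maxEvents(limit float64, burst int, window time.Duration) int {
	return burst + int(limit*window.Seconds())
}

func main() {
	// Same settings as the test above: 1 token per second, bucket of 5.
	fmt.Println(maxEvents(1, 5, 0))              // 5
	fmt.Println(maxEvents(1, 5, 10*time.Second)) // 15
}
```

So burst controls how large a sudden spike can be, while limit controls the sustained rate afterwards.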

time/rate source code analysis

One way to implement a token bucket is to put tokens into a bucket at a fixed rate and have callers take tokens out of it. That works, but it is inefficient: you have to keep a refill operation running alongside the bucket itself, and the stored tokens waste memory.
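To make the cost of this approach concrete, the following is a minimal sketch of such a bucket, assuming a buffered channel as the bucket and a ticker goroutine as the refill operation. chanBucket and its methods are hypothetical names, not part of any library.

```go
package main

import (
	"fmt"
	"time"
)

// chanBucket is a hypothetical naive token bucket: tokens are real values
// sitting in a buffered channel whose capacity is the bucket size.
type chanBucket struct {
	tokens chan struct{}
}

func newChanBucket(capacity int) *chanBucket {
	b := &chanBucket{tokens: make(chan struct{}, capacity)}
	for i := 0; i < capacity; i++ {
		b.put() // start with a full bucket
	}
	return b
}

// put adds one token and reports false when the bucket is already full.
func (b *chanBucket) put() bool {
	select {
	case b.tokens <- struct{}{}:
		return true
	default:
		return false
	}
}

// take removes one token without blocking and reports false when none is left.
func (b *chanBucket) take() bool {
	select {
	case <-b.tokens:
		return true
	default:
		return false
	}
}

// refill is the extra moving part this design needs: a goroutine that wakes
// up on every tick for as long as the limiter exists, even when it is idle.
func (b *chanBucket) refill(interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			b.put()
		case <-stop:
			return
		}
	}
}

func main() {
	b := newChanBucket(2)
	stop := make(chan struct{})
	go b.refill(100*time.Millisecond, stop)
	defer close(stop)

	// Bucket of 2: the third immediate take should find it empty.
	fmt.Println(b.take(), b.take(), b.take())
	time.Sleep(250 * time.Millisecond)
	// By now the ticker has had time to add tokens.
	fmt.Println(b.take())
}
```

Every limiter built this way owns a channel and a goroutine, which adds up quickly when a gateway keeps one limiter per service and per client IP.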

So Go's time/rate does not work this way. It computes tokens lazily: before each consumption, it updates the token count from the time elapsed since the last update. No tokens are actually stored in a bucket; the limiter just keeps a count.

Outline of the process:

  1. Compute the time elapsed between the last update and the current request
  2. Compute the tokens generated during that time and add them to the previous token count (capped at burst)
  3. Subtract the tokens requested; if the result is negative, compute how long the caller has to wait
  4. If the result is non-negative, the request can proceed immediately
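
The steps above can be sketched as follows. lazyBucket, take and demo are hypothetical names for this illustration; the sketch consumes one token per call, is not safe for concurrent use, and leaves out the overflow protection that the real limiter has.

```go
package main

import (
	"fmt"
	"time"
)

// lazyBucket is a hypothetical, simplified stand-in for rate.Limiter.
type lazyBucket struct {
	limit  float64   // tokens generated per second
	burst  float64   // bucket capacity
	tokens float64   // tokens available as of last
	last   time.Time // last time tokens was updated
}

// take returns how long the caller must wait before acting
// (zero means the request can go ahead now).
func (b *lazyBucket) take(now time.Time) time.Duration {
	// 1. Time elapsed since the last update.
	elapsed := now.Sub(b.last)
	if elapsed < 0 {
		elapsed = 0
	}
	// 2. Tokens generated in that time plus the old tokens, capped at burst.
	tokens := b.tokens + elapsed.Seconds()*b.limit
	if tokens > b.burst {
		tokens = b.burst
	}
	// Consume one token for this request; the count may go negative.
	tokens--
	b.tokens, b.last = tokens, now
	// 3. Negative count: the deficit has to be regenerated first.
	if tokens < 0 {
		return time.Duration(-tokens / b.limit * float64(time.Second))
	}
	// 4. Non-negative count: the request proceeds immediately.
	return 0
}

// demo takes three tokens at the same instant and a fourth one second later.
func demo() []time.Duration {
	start := time.Unix(0, 0)
	b := &lazyBucket{limit: 2, burst: 2, tokens: 2, last: start}
	return []time.Duration{
		b.take(start),                  // 2 -> 1 token left
		b.take(start),                  // 1 -> 0 tokens left
		b.take(start),                  // 0 -> -1: wait for one token at 2 per second
		b.take(start.Add(time.Second)), // -1 + 2 refilled - 1 = 0
	}
}

func main() {
	fmt.Println(demo()) // [0s 0s 500ms 0s]
}
```

Nothing runs between calls: the only state is a float64 and a timestamp, which is why this costs far less than keeping a refill loop alive.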

Structure analysis

type Limiter struct {
	limit Limit // tokens generated per second
	burst int   // bucket capacity: maximum number of tokens

	mu     sync.Mutex
	tokens float64 // current number of tokens
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

The three consumption methods

Internally, all three methods end up calling reserveN to generate and consume tokens.

So let's go straight to the core code.

The source of reserveN shows how a single request consumes tokens:

func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	// No limit configured: grant the request immediately
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	// Advance the limiter to now: compute how many tokens are available
	// given the time that has passed since the last update
	now, last, tokens := lim.advance(now)

Let's pause here and step into the advance method.

func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	// If now is earlier than last, use now as last so elapsed is never negative
	if now.Before(last) {
		last = now
	}

	// 1. Cap the elapsed time at the time needed to refill the bucket,
	// so that delta below cannot overflow when last is very old
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// 2. Compute delta, the number of tokens generated during elapsed
	delta := lim.limit.tokensFromDuration(elapsed)
	// Add them to the previous token count
	tokens := lim.tokens + delta
	// Cap at the bucket capacity
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return now, last, tokens
}
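
advance relies on two conversions between time and tokens. The sketch below shows what they amount to, assuming a positive, finite limit. It is a simplification written for this article, with the limit passed as a plain float64, and not the library's own code.

```go
package main

import (
	"fmt"
	"time"
)

// tokensFromDuration returns how many tokens are generated in d at a rate of
// limit tokens per second. Simplified: assumes limit > 0.
func tokensFromDuration(limit float64, d time.Duration) float64 {
	return d.Seconds() * limit
}

// durationFromTokens returns how long it takes to generate the given number
// of tokens at a rate of limit tokens per second. Simplified: assumes limit > 0.
func durationFromTokens(limit, tokens float64) time.Duration {
	return time.Duration(tokens / limit * float64(time.Second))
}

func main() {
	const limit = 2.0 // tokens per second

	fmt.Println(tokensFromDuration(limit, 1500*time.Millisecond)) // 3
	fmt.Println(durationFromTokens(limit, 3))                     // 1.5s

	// The cap in advance: with burst 5 and 4 tokens in hand, only the time
	// needed to produce the one missing token matters, however old last is.
	fmt.Println(durationFromTokens(limit, 5-4)) // 500ms
}
```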

Back in reserveN, we now have the result of advance (now, last, tokens):

	// Subtract the n tokens this request consumes; the result may be negative
	tokens -= float64(n)

	// Not enough tokens: compute how long it takes to regenerate the deficit
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state, release the lock and return
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}
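
The maxFutureReserve parameter is what separates the three public methods. As far as I can tell from the library source, Allow passes 0 (no waiting is acceptable), Reserve passes InfDuration (any wait is acceptable), and Wait passes the time left until the context deadline. The sketch below isolates the "Decide result" step to show the effect; decide and infDuration are hypothetical names for this example, and the limit is assumed to be positive.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// infDuration stands for "wait as long as necessary" in this sketch.
const infDuration = time.Duration(math.MaxInt64)

// decide mirrors the decision step of reserveN. tokensLeft is the token count
// after subtracting n, so it may be negative.
func decide(limit float64, burst, n int, tokensLeft float64, maxFutureReserve time.Duration) (ok bool, wait time.Duration) {
	if tokensLeft < 0 {
		wait = time.Duration(-tokensLeft / limit * float64(time.Second))
	}
	ok = n <= burst && wait <= maxFutureReserve
	return ok, wait
}

func main() {
	// One token short at 1 token per second: the request would have to wait 1s.
	const limit, burst, n, tokensLeft = 1.0, 5, 1, -1.0

	// Allow: no waiting tolerated.
	okAllow, _ := decide(limit, burst, n, tokensLeft, 0)
	// Reserve: any wait accepted; the caller is told how long.
	okReserve, wait := decide(limit, burst, n, tokensLeft, infDuration)
	// Wait with only 500ms left before the context deadline.
	okWait, _ := decide(limit, burst, n, tokensLeft, 500*time.Millisecond)

	fmt.Println(okAllow, okReserve, wait, okWait) // false true 1s false
}
```

This also explains why the limiter state is only updated when ok is true: a refused request must not consume tokens.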

Integrating rate limiting into the gateway

Now that the usage and source logic of the time/rate limiter are clear, we can return to the main thread: the middleware of the microservice gateway.

Take the HTTP rate-limiting middleware as an example:

http_flow_limit.go

  1. Get the service information from the context
  2. Call the GetLimiter method wrapped in the public package to get (or create) the limiter for the service or for the client
  3. Call Allow on that limiter to consume a token
func HTTPFlowLimitMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		serviceInterface, ok := c.Get("service")
		if !ok {
			middleware.ResponseError(c, 2001, errors.New("service not found"))
			c.Abort()
			return
		}
		serviceDetail := serviceInterface.(*dao.ServiceDetail)
		// Service-level rate limiting
		if serviceDetail.AccessControl.ServiceFlowLimit != 0 {
			// 1. Get (or create) the limiter for this service
			serviceLimiter, err := public.FlowLimiterHandler.GetLimiter(
				public.FlowServicePrefix+serviceDetail.Info.ServiceName,
				float64(serviceDetail.AccessControl.ServiceFlowLimit),
			)
			if err != nil {
				middleware.ResponseError(c, 5001, err)
				// Stop here; otherwise a nil limiter would be used below
				c.Abort()
				return
			}
			// 2. Consume a token from the service limiter
			if !serviceLimiter.Allow() {
				middleware.ResponseError(c, 5002, fmt.Errorf("service flow limit %v", serviceDetail.AccessControl.ServiceFlowLimit))
				c.Abort()
				return
			}
		}

		// Per-client-IP rate limiting, same steps as above
		if serviceDetail.AccessControl.ClientIPFlowLimit != 0 {
			// 1. Get (or create) the limiter for this service and client IP
			clientLimit, err := public.FlowLimiterHandler.GetLimiter(
				public.FlowServicePrefix+serviceDetail.Info.ServiceName+"_"+c.ClientIP(),
				float64(serviceDetail.AccessControl.ClientIPFlowLimit),
			)
			if err != nil {
				middleware.ResponseError(c, 5003, err)
				c.Abort()
				return
			}
			// 2. Consume a token from the client limiter
			if !clientLimit.Allow() {
				middleware.ResponseError(c, 5002, fmt.Errorf("%v flow limit %v", c.ClientIP(), serviceDetail.AccessControl.ClientIPFlowLimit))
				c.Abort()
				return
			}
		}
		c.Next()
	}
}
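
The same abort-on-limit flow can be shown without gin or the gateway's own packages. The following is a sketch using only net/http, under these assumptions: the limiter is anything with an Allow() bool method (which *rate.Limiter provides), a rejected request gets a plain HTTP 429 rather than the gateway's own error codes, and budget, limitMiddleware, newDemoHandler and statusOf are hypothetical names.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// allower is the only capability the middleware needs from a limiter.
type allower interface{ Allow() bool }

// budget is a stand-in limiter that permits a fixed number of requests.
// Demo only: it is not safe for concurrent use.
type budget struct{ left int }

func (b *budget) Allow() bool {
	if b.left <= 0 {
		return false
	}
	b.left--
	return true
}

// limitMiddleware answers 429 when the limiter has no token left and
// otherwise passes the request on to next.
func limitMiddleware(l allower, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !l.Allow() {
			http.Error(w, "flow limit exceeded", http.StatusTooManyRequests)
			return // stop here, like c.Abort() followed by return in gin
		}
		next.ServeHTTP(w, r)
	})
}

// newDemoHandler wraps a trivial handler with a limiter that allows n requests.
func newDemoHandler(n int) http.Handler {
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	return limitMiddleware(&budget{left: n}, ok)
}

// statusOf sends one in-memory GET request to h and returns the status code.
func statusOf(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code
}

func main() {
	h := newDemoHandler(1)
	fmt.Println(statusOf(h), statusOf(h)) // 200 429
}
```

Depending on an interface instead of a concrete limiter type is what makes the middleware testable with a fake like budget.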

Getting the limiter and consuming a token takes only a few steps in the middleware. Next, look at the GetLimiter method that the public package wraps around the limiter.

flow_limit_handler.go

The limiter registry is wrapped as a singleton.

The singleton structure itself has been covered many times, so I won't go into detail here.

Go straight to the core method, GetLimiter().

After the usage and source code analysis of the time/rate limiter above, the core code is easy to follow.

Based on the QPS in the service information, it calls the constructor rate.NewLimiter(limit, burst) to get a limiter instance. limit is the QPS from the service information, and burst, the maximum number of tokens in the bucket, is set to QPS*3.

And that's all there is to it. It really is that simple.

var FlowLimiterHandler *FlowLimiter

type FlowLimiter struct {
   FlowLimiterMap   map[string]*FlowLimiterItem
   FlowLimiterSlice []*FlowLimiterItem
   Locker           sync.RWMutex
}

type FlowLimiterItem struct {
   ServiceName string
   Limiter     *rate.Limiter
}

func NewFlowLimiter() *FlowLimiter {
   return &FlowLimiter{
      FlowLimiterMap:   map[string]*FlowLimiterItem{},
      FlowLimiterSlice: []*FlowLimiterItem{},
      Locker:           sync.RWMutex{},
   }
}
func init() {
   FlowLimiterHandler = NewFlowLimiter()
}

func (counter *FlowLimiter) GetLimiter(serviceName string, qps float64) (*rate.Limiter, error) {
	// Hold the lock for the whole lookup-or-create, so that concurrent requests
	// neither race on the slice and map nor create duplicate limiters
	counter.Locker.Lock()
	defer counter.Locker.Unlock()

	for _, item := range counter.FlowLimiterSlice {
		if item.ServiceName == serviceName {
			return item.Limiter, nil
		}
	}

	// Create a limiter (tokens per second, max tokens in bucket)
	newLimiter := rate.NewLimiter(rate.Limit(qps), int(qps*3))
	item := &FlowLimiterItem{
		ServiceName: serviceName,
		Limiter:     newLimiter,
	}
	counter.FlowLimiterSlice = append(counter.FlowLimiterSlice, item)
	counter.FlowLimiterMap[serviceName] = item
	return newLimiter, nil
}
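
GetLimiter is called on every request, so the lookup-or-create has to be safe when many goroutines call it at once. One common way to do that, sketched here as an alternative and not as the project's code, is double-checked locking over a map with sync.RWMutex. entry stands in for *rate.Limiter so the example needs only the standard library; registry, getOrCreate and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// entry stands in for *rate.Limiter in this sketch.
type entry struct{ qps float64 }

// registry is a hypothetical get-or-create cache keyed by service name.
type registry struct {
	mu      sync.RWMutex
	items   map[string]*entry
	created int // number of entries constructed, kept for the demo
}

func newRegistry() *registry {
	return &registry{items: make(map[string]*entry)}
}

// getOrCreate returns the entry stored under key, constructing it at most
// once even when many goroutines ask for the same key at the same time.
func (r *registry) getOrCreate(key string, qps float64) *entry {
	// Fast path: a read lock is enough when the entry already exists.
	r.mu.RLock()
	e, ok := r.items[key]
	r.mu.RUnlock()
	if ok {
		return e
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	// Check again: another goroutine may have created it between the two locks.
	if existing, found := r.items[key]; found {
		return existing
	}
	e = &entry{qps: qps}
	r.items[key] = e
	r.created++
	return e
}

// demo requests the same key from n goroutines and returns how many entries
// were constructed.
func demo(n int) int {
	r := newRegistry()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.getOrCreate("service_a", 100)
		}()
	}
	wg.Wait()
	return r.created
}

func main() {
	fmt.Println(demo(50)) // 1
}
```

The map lookup is constant time, whereas scanning a slice grows with the number of limiters, which matters once there is one limiter per client IP.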

Tags: Go Back-end Microservices Middleware http

Posted by firedrop on Thu, 20 Oct 2022 18:59:35 +0300