Microservice gateway (8): rate limiting middleware, gateway integration, and the time/rate rate limiter (usage and source code analysis)
Rate limiting principles
Why rate limiting matters
High-concurrency systems have three powerful tools:
- Cache: add caching for the relevant services to speed up access and increase the system's processing capacity
- Degradation: when server load rises sharply, degrade non-critical features according to business logic, freeing server resources so that the core business keeps running normally
- Rate limiting: limit the rate of concurrent requests, and handle the excess by rejecting it (denial of service), queuing it, or degrading it
Types of rate limiting
Leaky bucket rate limiting
For each request, calculate the traffic in the bucket (which drains at a constant rate); when the traffic exceeds the threshold, the request triggers degradation.
Token bucket rate limiting
Tokens are added to the bucket at a fixed rate, and each request takes a token from it; if no token is available, the request triggers degradation.
Using the time/rate rate limiter
time/rate (the golang.org/x/time/rate package) is a rate limiter maintained by the Go team in the supplementary golang.org/x repositories, outside the standard library proper. It is implemented as a token bucket.
- Creating a limiter: rate.NewLimiter(limit, burst)
  limit is the number of tokens generated per second; burst is the maximum number of tokens the bucket can hold.
- Taking tokens (there are three methods):
  - Allow: reports whether a token is available right now. If so, it consumes one token and returns true; otherwise it returns false immediately.
  - Wait: blocks until a token is obtained. It takes a context and returns an error if the context is canceled or if the required wait would exceed the context's deadline.
  - Reserve: reserves a token immediately and returns a Reservation whose Delay() reports how long the caller must wait before acting on it.
```go
package limiter // hypothetical package name

import (
	"context"
	"log"
	"testing"
	"time"

	"golang.org/x/time/rate"
)

func Test_RateLimiter(t *testing.T) {
	// 1 token per second, bucket capacity 5
	l := rate.NewLimiter(1, 5)
	log.Println(l.Limit(), l.Burst())
	for i := 0; i < 100; i++ {
		// Block until a token is obtained, giving up after 2 seconds
		log.Println("before Wait")
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		if err := l.Wait(ctx); err != nil {
			log.Println("limiter wait err:" + err.Error())
		}
		cancel() // release the context's resources
		log.Println("after Wait")

		// Reserve a token and report how long to wait before using it
		r := l.Reserve()
		log.Println("reserve Delay:", r.Delay())

		// Check whether a token can be obtained right now
		a := l.Allow()
		log.Println("Allow:", a)
		log.Println("------------------------")
	}
}
```
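The limiter starts with a full bucket of 5 tokens, and every call in the loop takes one token (Wait, Reserve, and Allow when it returns true). So the first five token requests (Wait, Reserve and Allow in the first iteration, then Wait and Reserve in the second) succeed immediately, and the sixth, the Allow in the second iteration, returns false because the bucket is empty. From then on, each iteration pays for two tokens (the one Wait takes and the one Reserve reserves but never uses) at a refill rate of 1 token per second, so Wait blocks for up to about 2 seconds, Reserve reports a delay of about 1 second, and Allow keeps returning false.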

time/rate source code analysis
One way to implement a token bucket is to put tokens into the bucket at a fixed rate and let each request take tokens out. That is inefficient: besides the bucket itself, an operation that keeps putting tokens into it has to be maintained, and storing the tokens wastes memory.
So Golang's time/rate does not work this way. It uses lazy evaluation instead: right before each consumption, it updates the number of tokens from the time elapsed since the last update, and instead of a bucket that stores tokens it keeps only a counter.
Outline of the process:
- Calculate the time elapsed between the last update and the current request
- Add the tokens generated during that time to the old token count, capping the total at burst
- Subtract the tokens the request takes (usually 1)
- If the result is negative, calculate how long the caller must wait for it to become non-negative
Structural Analysis
```go
type Limiter struct {
	limit  Limit   // number of tokens generated per second
	burst  int     // maximum number of tokens in the bucket
	mu     sync.Mutex
	tokens float64 // current number of tokens
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}
```
The three consumption methods
Internally, all three methods end up calling reserveN, which generates and consumes the tokens.
So reserveN is the core code.
Let's read its source to see how a request consumes tokens:
```go
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	// No limit (infinite rate): succeed immediately
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}

	// Bring the token count up to date for the time that has passed
	now, last, tokens := lim.advance(now)
```
Let's pause here and look at the implementation of the advance method:
```go
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	// If now is earlier than last (the clock went backwards), use now as last
	if now.Before(last) {
		last = now
	}

	// 1. Cap the elapsed time to avoid making delta overflow when last is very old
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// 2. Number of tokens generated during the elapsed time
	delta := lim.limit.tokensFromDuration(elapsed)
	// Old tokens + tokens generated during the elapsed time
	tokens := lim.tokens + delta
	// Never exceed the bucket capacity
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return now, last, tokens
}
```
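Written as a formula, with r = limit (tokens per second), b = burst, k = the stored tokens count and Δt = now − last, the advance method computes the following. Because it converts between tokens and time at r tokens per second, capping the elapsed time at (b − k)/r gives the same result as capping the token count at b (for r > 0 and k ≤ b); the cap mainly keeps a very long idle period from producing a huge intermediate value.

$$
\text{elapsed} = \min\left(\Delta t,\ \frac{b - k}{r}\right), \qquad
k' = \min\left(b,\ k + r\cdot\text{elapsed}\right) = \min\left(b,\ k + r\,\Delta t\right)
$$

For example, with r = 1 token/s, b = 5, k = 2 and Δt = 10 s: elapsed = min(10 s, 3 s) = 3 s and k' = min(5, 2 + 3) = 5.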
Then go back to reserveN with the result of advance, (now, last, tokens):
```go
	// Subtract the tokens this request takes (n, usually 1)
	tokens -= float64(n)

	// If there are not enough tokens, calculate the waiting time
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update the state, unlock and return
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}
```
Integrating rate limiting into the gateway
Now that we understand how to use the time/rate limiter and how its source works, we can get back to our main line: the microservice gateway's middleware.
Take the HTTP rate limiting middleware as an example:
http_flow_limit.go
- Get the service information from the context
- Call the limiter-getter method wrapped in the public package to get (or create) the corresponding service-level or client-level limiter instance
- Call Allow on that limiter to consume a token
```go
import (
	"errors"
	"fmt"

	"github.com/gin-gonic/gin"
	// plus the project's own middleware, dao and public packages
)

func HTTPFlowLimitMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		serviceInterface, ok := c.Get("service")
		if !ok {
			middleware.ResponseError(c, 2001, errors.New("service not found"))
			c.Abort()
			return
		}
		serviceDetail := serviceInterface.(*dao.ServiceDetail)

		// Service-level rate limiting
		if serviceDetail.AccessControl.ServiceFlowLimit != 0 {
			// 1. Get (or create) the limiter instance for this service
			serviceLimiter, err := public.FlowLimiterHandler.GetLimiter(
				public.FlowServicePrefix+serviceDetail.Info.ServiceName,
				float64(serviceDetail.AccessControl.ServiceFlowLimit),
			)
			if err != nil {
				middleware.ResponseError(c, 5001, err)
				c.Abort() // stop here: serviceLimiter is nil on error
				return
			}
			// 2. Consume a token from the service limiter
			if !serviceLimiter.Allow() {
				middleware.ResponseError(c, 5002, fmt.Errorf("service flow limit %v", serviceDetail.AccessControl.ServiceFlowLimit))
				c.Abort()
				return
			}
		}

		// Client-IP-level rate limiting, same steps
		if serviceDetail.AccessControl.ClientIPFlowLimit != 0 {
			// 1. Get (or create) the limiter instance for this service + client IP
			clientLimit, err := public.FlowLimiterHandler.GetLimiter(
				public.FlowServicePrefix+serviceDetail.Info.ServiceName+"_"+c.ClientIP(),
				float64(serviceDetail.AccessControl.ClientIPFlowLimit),
			)
			if err != nil {
				middleware.ResponseError(c, 5003, err)
				c.Abort()
				return
			}
			// 2. Consume a token from the client limiter
			if !clientLimit.Allow() {
				middleware.ResponseError(c, 5002, fmt.Errorf("%v flow limit %v", c.ClientIP(), serviceDetail.AccessControl.ClientIPFlowLimit))
				c.Abort()
				return
			}
		}
		c.Next()
	}
}
```
Getting the limiter and consuming a token in the middleware takes only a few steps. Next, let's look at the limiter-getter method wrapped in the public package.
flow_limit_handler.go
It is wrapped using the singleton pattern.
I've written up the structure of the singleton pattern many times before, so I won't go into detail here.
Let's go straight to the core method, GetLimiter().
After the usage and source code analysis of the time/rate limiter above, the core code is easy to understand.
Using the QPS from the service information, it calls the constructor **rate.NewLimiter(limit, burst)** to get a limiter instance: limit is the service's QPS, and the maximum number of tokens in the bucket (burst) is QPS*3.
And that's it. It really is that simple.
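The choice of burst = QPS*3 determines how large a spike the limiter tolerates. A token bucket with rate r and capacity b admits at most b + rT requests in any window of length T, because at most b tokens can have accumulated when the window opens and at most rT more are generated during it. With r = QPS and b = 3·QPS:

$$
N_{\max}(T) = b + rT = 3\,\mathrm{QPS} + \mathrm{QPS}\cdot T
$$

For QPS = 100 and T = 1 s this gives 300 + 100 = 400 requests: a service that has been idle for at least 3 s can absorb a spike of 300 requests at once, after which it is held to 100 requests per second.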
```go
package public

import (
	"sync"

	"golang.org/x/time/rate"
)

var FlowLimiterHandler *FlowLimiter

type FlowLimiter struct {
	FlowLimiterMap   map[string]*FlowLimiterItem
	FlowLimiterSlice []*FlowLimiterItem
	Locker           sync.RWMutex
}

type FlowLimiterItem struct {
	ServiceName string
	Limiter     *rate.Limiter
}

func NewFlowLimiter() *FlowLimiter {
	return &FlowLimiter{
		FlowLimiterMap:   map[string]*FlowLimiterItem{},
		FlowLimiterSlice: []*FlowLimiterItem{},
		Locker:           sync.RWMutex{},
	}
}

func init() {
	FlowLimiterHandler = NewFlowLimiter()
}

func (counter *FlowLimiter) GetLimiter(serviceName string, qps float64) (*rate.Limiter, error) {
	// Fast path: look up an existing limiter under the read lock
	counter.Locker.RLock()
	item, ok := counter.FlowLimiterMap[serviceName]
	counter.Locker.RUnlock()
	if ok {
		return item.Limiter, nil
	}

	// Slow path: every read and write of the map and slice happens under the lock
	counter.Locker.Lock()
	defer counter.Locker.Unlock()
	// Check again: another goroutine may have created it in the meantime
	if item, ok := counter.FlowLimiterMap[serviceName]; ok {
		return item.Limiter, nil
	}
	// Create a limiter (tokens per second, max tokens in bucket)
	newLimiter := rate.NewLimiter(rate.Limit(qps), int(qps*3))
	item = &FlowLimiterItem{
		ServiceName: serviceName,
		Limiter:     newLimiter,
	}
	counter.FlowLimiterSlice = append(counter.FlowLimiterSlice, item)
	counter.FlowLimiterMap[serviceName] = item
	return newLimiter, nil
}
```