5 rate limiting modes

3. Rate limiting

When millions of users rush to buy and the stock is far smaller than the number of buyers, merely queuing requests or caching queries changes nothing about the final outcome: most requests are doomed to fail anyway. To stop wasting resources and to relieve back-end pressure, we rate limit the seckill service.

Rate limiting algorithms:

No rate limit is chosen at random. The most common algorithms are the token bucket and the leaky bucket.

3.1 token bucket
Google Guava's RateLimiter provides an implementation based on the token bucket algorithm. It makes rate limiting very simple to add, and we can tune the rate to the actual capacity of our system.
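Before wiring RateLimiter into an aspect, the algorithm itself is worth a quick sketch: tokens accumulate at a fixed rate up to a cap, and each request spends one. The `TokenBucket` class below is a hypothetical, minimal illustration in plain Java, not Guava's actual implementation:

```java
// Hypothetical minimal token bucket: tokens refill at a fixed rate,
// each request consumes one token, requests with no token are rejected.
class TokenBucket {
    private final long capacity;        // maximum tokens the bucket holds
    private final double refillPerNano; // tokens added per nanosecond
    private double tokens;              // current token count
    private long lastRefill;            // last refill timestamp (nanos)

    TokenBucket(long capacity, double tokensPerSecond) {
        this.capacity = capacity;
        this.refillPerNano = tokensPerSecond / 1_000_000_000.0;
        this.tokens = capacity;         // bucket starts full, so bursts are allowed
        this.lastRefill = System.nanoTime();
    }

    synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        // Refill proportionally to elapsed time, capped at capacity
        tokens = Math.min(capacity, tokens + (now - lastRefill) * refillPerNano);
        lastRefill = now;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }
}
```

Because the bucket starts full, a burst of up to `capacity` requests passes immediately; after that, requests are admitted only as fast as tokens refill. Guava's RateLimiter is a production-grade variant of this idea.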

code:

package com.etc.access;

import com.alibaba.fastjson.JSON;
import com.etc.common.AbnoNum;
import com.etc.common.ResultBean;
import com.google.common.util.concurrent.RateLimiter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author Mr.findelist
 * @program: seckillplus
 * @Date 2020/8/11  10:44
 * Uses Google Guava's RateLimiter, an implementation of the token bucket
 * algorithm, to rate limit controller methods. The rate can be tuned to
 * the actual capacity of the system.
 **/
@Component
@Aspect
public class RateLimitAspect {


    /**
     * Token buckets for the intercepted interfaces
     * (key = interface/method name, value = that interface's RateLimiter).
     * ConcurrentHashMap keeps the map thread safe.
     */
    private final ConcurrentHashMap<String, RateLimiter> map = new ConcurrentHashMap<>();

    @Autowired
    private HttpServletResponse response;

    @Pointcut("execution(public * com.etc.controller.*.*(..))")
    public void serviceLimit() {
    }

    @Around("serviceLimit()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        Object obj = null;
        // 1. Get the signature of the intercepted method
        Signature sig = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) sig;
        // 2. Get the target object
        Object target = joinPoint.getTarget();
        // 3. Look up the method to read its annotation
        Method method = target.getClass().getMethod(sig.getName(), methodSignature.getParameterTypes());
        RateLimite rate = method.getAnnotation(RateLimite.class);
        if (rate == null) {
            // No rate limit configured on this method
            return joinPoint.proceed();
        }
        // 4. Validate the configured permits per second
        int limitNum = rate.limitNum();
        checkRateLimitNum(limitNum);
        // 5. One token bucket per interface; computeIfAbsent avoids the
        //    check-then-put race of containsKey()/put() under concurrency,
        //    and a local variable avoids sharing one field across requests
        String name = methodSignature.getName();
        RateLimiter rateLimiter = map.computeIfAbsent(name, k -> RateLimiter.create(limitNum));
        try {
            if (rateLimiter.tryAcquire()) {
                // Got a token: let the request through
                obj = joinPoint.proceed();
            } else {
                // No token available: reject the request
                render(AbnoNum.ACCESS_LIMIT_FREQUENTLY);
            }
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        return obj;
    }

    private void render(AbnoNum cm) throws IOException {
        response.setContentType("application/json; charset=UTF-8");
        OutputStream outputStream = response.getOutputStream();
        String str = JSON.toJSONString(ResultBean.error(cm));
        outputStream.write(str.getBytes("UTF-8"));
        outputStream.flush();
        outputStream.close();
    }

    /**
     * Validate the configured token rate
     */
    private void checkRateLimitNum(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("annotation rate error");
        }
    }
}

3.2 leaky bucket
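A leaky bucket inverts the token bucket's picture: requests pour in, the bucket drains at a constant rate, and anything that would overflow is rejected, so the output rate is smoothed rather than bursty. A minimal, hypothetical sketch in plain Java (the `LeakyBucket` class below is illustrative, not taken from any library):

```java
// Hypothetical leaky bucket: the water level drops at a constant leak
// rate; each request adds one unit; a request that would overflow the
// bucket is rejected.
class LeakyBucket {
    private final long capacity;       // bucket size
    private final double leakPerNano;  // units drained per nanosecond
    private double water;              // current level
    private long lastLeak;             // last drain timestamp (nanos)

    LeakyBucket(long capacity, double leaksPerSecond) {
        this.capacity = capacity;
        this.leakPerNano = leaksPerSecond / 1_000_000_000.0;
        this.water = 0;                // bucket starts empty
        this.lastLeak = System.nanoTime();
    }

    synchronized boolean tryAccept() {
        long now = System.nanoTime();
        // Drain proportionally to elapsed time, never below empty
        water = Math.max(0, water - (now - lastLeak) * leakPerNano);
        lastLeak = now;
        if (water + 1 <= capacity) {
            water += 1;
            return true;
        }
        return false;
    }
}
```

Compared with the token bucket, which permits bursts up to its capacity, the leaky bucket enforces a steady outflow: excess requests are either buffered (up to the bucket size) or rejected.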
3.3 applying rate limits
  1. Tomcat: in the Tomcat container we can configure a custom thread pool and cap the maximum number of connections, which limits how many requests are processed concurrently.
   <Executor name="tomcatThreadPool" 
              namePrefix="catalina-exec-"
              maxThreads="150" 
              minSpareThreads="4"/>

name: name of the thread pool

namePrefix: prefix for the name of each thread in the pool

maxThreads: maximum number of threads in the pool (150 in this example)

minSpareThreads: minimum number of idle threads kept alive

Configure Connector

    <Connector executor="tomcatThreadPool" 
               port="8080" protocol="HTTP/1.1"
               connectionTimeout="20000"
               redirectPort="8443" 
               minProcessors="5"	
               maxProcessors="75"
               acceptCount="1000"
               />

executor: name of the thread pool this Connector uses

minProcessors: number of processing threads created at server startup

maxProcessors: maximum number of processing threads that can be created

acceptCount: maximum queue length for incoming requests when all processing threads are busy; requests beyond this number are refused.
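The interplay between maxThreads and acceptCount can be mimicked with a bounded JDK thread pool: a fixed number of workers, a bounded queue, and a rejection policy once both are full. A rough, illustrative sketch (this is an analogy, not Tomcat's actual executor):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: 2 workers (like maxThreads) and a queue of 3
// (like acceptCount); submissions beyond "2 running + 3 queued" are
// rejected, just as Tomcat refuses requests past acceptCount.
class BoundedPoolDemo {
    static int rejectedOutOf(int submissions) {
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(3),
                (task, executor) -> rejected.incrementAndGet());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < submissions; i++) {
            // Workers block on the latch, so the queue never drains
            pool.execute(() -> {
                try { release.await(); } catch (InterruptedException ignored) { }
            });
        }
        release.countDown();            // let the accepted tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ignored) { }
        return rejected.get();          // submissions - 2 running - 3 queued
    }
}
```

With 10 simultaneous submissions, 2 run, 3 wait in the queue, and the remaining 5 hit the rejection handler: the same back-pressure behavior Tomcat applies to excess connections.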

3.4 distributed rate limiting:

Nginx

How do we use nginx's built-in rate limiting? For example, we can allow each IP address only a fixed number of requests per second. nginx ships a rate limiting module; once a client exceeds the configured limit, nginx returns a 503 error.

Configure nginx.conf:

# Configured once in the http block
# Request rate limit zone keyed by client IP + URI, 50 requests/second
limit_req_zone $binary_remote_addr$uri zone=api_read:20m rate=50r/s;
# Connection zone keyed by client IP
limit_conn_zone $binary_remote_addr zone=perip_conn:10m;
# Connection zone keyed by server name
limit_conn_zone $server_name zone=perserver_conn:100m;
server {
        listen       80;
        server_name  seckill.52itstyle.com;
        index index.jsp;
        location / {
              # Excess requests are queued up to burst; the default burst is 0
              limit_req zone=api_read burst=5;
              # At most 2 concurrent connections per client IP
              limit_conn perip_conn 2;
              # At most 1000 concurrent connections for the whole server
              limit_conn perserver_conn 1000;
              # Per-connection bandwidth limit
              limit_rate 100k;
              proxy_pass      http://seckill;
        }
}
upstream seckill {
        fair;
        server  172.16.1.120:8080 weight=1  max_fails=2 fail_timeout=30s;
        server  172.16.1.130:8080 weight=1  max_fails=2 fail_timeout=30s;
}
3.5 OpenResty

There are also open source rate limiting solutions. One of them is the lua-resty-limit-traffic library, which is quite convenient to use. It can:

Limit the total concurrency of an interface (number of concurrent requests)

Limit the number of requests within a time window

The code is as follows:

local limit_conn = require "resty.limit.conn"

-- The official example uses new("my_limit_conn_store", 200, 100, 0.5):
-- allow up to 200 concurrent requests plus a burst of 100. Requests
-- between 200 and 300 concurrent connections are delayed; anything
-- beyond 300 is rejected. The last parameter, 0.5, is the assumed
-- processing time of one request in seconds; it can be adjusted
-- dynamically later via the leaving() call in a log_by_lua handler.
-- To allow at most 1 concurrent request per client IP, use
-- new("my_limit_conn_store", 1, 0, 0.5): with burst set to 0, any
-- request over the limit gets 503 immediately. Raise burst (the leaky
-- bucket's capacity) if short spikes should be tolerated.
local lim, err = limit_conn.new("my_limit_conn_store", 200, 100, 0.5)
if not lim then
    ngx.log(ngx.ERR,
            "failed to instantiate a resty.limit.conn object: ", err)
    return ngx.exit(500)
end

-- The following must run for every request. The client (binary) IP
-- address serves as the limiting key.
-- commit = true updates the key's value in the shared dict;
-- false would only report the delay for the current request and the
-- number of pending requests without recording anything.
local key = ngx.var.binary_remote_addr
local delay, err = lim:incoming(key, true)
if not delay then
    if err == "rejected" then
        return ngx.exit(503)
    end
    ngx.log(ngx.ERR, "failed to limit req: ", err)
    return ngx.exit(500)
end

-- If this request's connection count was committed to the shared dict,
-- record the state in ngx.ctx so a later phase (log_by_lua) can call
-- leaving() when the connection finishes and free the slot for other
-- connections.
if lim:is_committed() then
    local ctx = ngx.ctx
    ctx.limit_conn = lim
    ctx.limit_conn_key = key
    ctx.limit_conn_delay = delay
end

local conn = err

-- delay is a multiple of the estimated per-request processing time.
-- For example, if 100 requests can be processed at a time and the
-- bucket capacity is 200: when 500 requests arrive simultaneously,
-- 200 are rejected, 100 are processed at once, and the 200 that enter
-- the bucket are delayed -- the first 100 by 0.5 s and the next 100 by
-- 0.5 * 2 = 1 s (0.5 being the estimated processing time above).
if delay >= 0.001 then
    -- Over the base limit but within burst: sleep to enforce the limit
    ngx.sleep(delay)
end

Code implementation:

/**
     * Get the seckill path for this user and goods
     * @param user
     * @param goodsId
     * @return
     */
    @GetMapping("path")
    @ResponseBody
    public Result getMiaoShaPath(MiaoShaUser user, @RequestParam("goodsId") long goodsId,HttpServletRequest request) {
        if (user == null) {
            return Result.error(CodeMsg.SESSION_ERROR);
        }

        //Limit this user to 5 requests to this URL within 5 seconds
        String url = request.getRequestURI();
        String key = url + user.getId();

        Integer count = redisService.get(AccessKey.getUserUrlCount, key, Integer.class);
        if (count == null){
            redisService.set(AccessKey.getUserUrlCount, key, 1);
        }else if(count < 5){
            redisService.incr(AccessKey.getUserUrlCount, key);
        }else {
            return Result.error(CodeMsg.ACCESS_LIMIT_FREQUENTLY);
        }

        String miaoShaPath = iMiaoShaOrderService.createMiaoShaPath(user.getId(), goodsId);
        return Result.success(miaoShaPath);
    }
3.6 counter mode

Common implementation: the inline counter above is replaced by an annotation.

    @AccessLimit(seconds = 5, maxCount = 5)
    @GetMapping("path")
    @ResponseBody
    public Result getMiaoShaPath(MiaoShaUser user, @RequestParam("goodsId") long goodsId,HttpServletRequest request) {
        if (user == null) {
            return Result.error(CodeMsg.SESSION_ERROR);
        }

//        //Query access times 5 seconds 5 times
//        String url = request.getRequestURI();
//        String key = url + user.getId();
//
//        Integer count = redisService.get(AccessKey.getUserUrlCount, key, Integer.class);
//        if (count == null){
//            redisService.set(AccessKey.getUserUrlCount, key, 1);
//        }else if(count < 5){
//            redisService.incr(AccessKey.getUserUrlCount, key);
//        }else {
//            return Result.error(CodeMsg.ACCESS_LIMIT_FREQUENTLY);
//        }

        String miaoShaPath = iMiaoShaOrderService.createMiaoShaPath(user.getId(), goodsId);
        return Result.success(miaoShaPath);
    }

Annotation interceptor mode:

package com.etc.access;

import com.alibaba.fastjson.JSON;
import com.etc.common.AbnoNum;
import com.etc.common.ResultBean;
import com.etc.domian.User;
import com.etc.redis.AccessKey;
import com.etc.redis.RedisService;
import com.etc.service.impl.UserServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;

/**
 * @Author Mr.findelist
 * @program: seckillplus
 * @Date 2020/8/10  15:12
 **/
@Service
public class AccessInterCeptor extends HandlerInterceptorAdapter {
    @Autowired
    private UserServiceImpl userService;

    @Autowired
    private RedisService redisService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //Only intercept controller methods
        if (handler instanceof HandlerMethod) {
            User user = getUser(request, response);
            HandlerMethod hm = (HandlerMethod) handler;
            //Read the @AccessLimit annotation, if present
            AccessLimit accessLimit = hm.getMethodAnnotation(AccessLimit.class);

            if (accessLimit == null) {
                return true;
            }
            //Values configured on the annotation
            int count = accessLimit.maxCount();
            int seconds = accessLimit.seconds();
            boolean needLogin = accessLimit.nendLogin();
            String key = request.getRequestURL().toString();
            //Enforce login if the annotation requires it
            if (needLogin) {
                if (user == null) {
                    render(response, AbnoNum.USERNAME_IS_ENIST);
                    //Stop here: continuing would dereference a null user
                    return false;
                }
                key += "_" + user.getId();
            }
            //The counter key expires after the configured window; the same
            //key prefix must be used for both get and set
            AccessKey accessKey = new AccessKey(seconds, "access:");
            Integer cou = redisService.get(accessKey, key, Integer.class);
            //First hit creates the counter; below the limit increments it;
            //at or above the limit the request is rejected
            if (cou == null) {
                redisService.set(accessKey, key, 1);
            } else if (cou < count) {
                redisService.incr(accessKey, key);
            } else {
                render(response, AbnoNum.ACCESS_LIMIT_FREQUENTLY);
                return false;
            }
        }
        return true;
    }

    public void render(HttpServletResponse response, AbnoNum abnoNum) throws IOException {
        //Write information to the page
        response.setContentType("application/json; charset=UTF-8");
        OutputStream outputStream = response.getOutputStream();
        String s = JSON.toJSONString(ResultBean.error(abnoNum));
        outputStream.write(s.getBytes("utf-8"));
        outputStream.flush();
        outputStream.close();

    }

    public User getUser(HttpServletRequest request, HttpServletResponse response) {
        String parameter = request.getParameter("token");
        String cookieValue = getCookieValue(request, "token");
        if (StringUtils.isEmpty(parameter) && StringUtils.isEmpty(cookieValue)) {
            return null;
        }
        String cookie = StringUtils.isEmpty(cookieValue) ? parameter : cookieValue;
        return userService.getcookie(response, cookie);
    }

    /**
     * Get the token value in the cookie
     */
    private String getCookieValue(HttpServletRequest request, String cookieName) {
        Cookie[] cookies = request.getCookies();
        if (cookies != null) {
            for (Cookie cookie : cookies) {
                if (cookie.getName().equals(cookieName)) {
                    return cookie.getValue();
                }
            }
        }

        return null;
    }

}
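The Redis counter in the interceptor is a fixed-window scheme: the first request in a window creates a key that expires after `seconds`, later requests increment it, and requests beyond maxCount are rejected. The same logic as an in-memory sketch (the `FixedWindowCounter` class below is hypothetical; the real code keeps the counter in Redis so the limit holds across server nodes):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory fixed-window counter, mirroring the
// set/incr/reject flow of the Redis-based interceptor above.
class FixedWindowCounter {
    private static class Window {
        final long start;   // window start time (millis)
        int count;          // requests seen in this window
        Window(long start) { this.start = start; }
    }

    private final Map<String, Window> windows = new ConcurrentHashMap<>();
    private final int maxCount;
    private final long windowMillis;

    FixedWindowCounter(int maxCount, long windowMillis) {
        this.maxCount = maxCount;
        this.windowMillis = windowMillis;
    }

    synchronized boolean allow(String key, long nowMillis) {
        Window w = windows.get(key);
        // Expired or missing window: start a fresh one
        // (Redis achieves the same thing through key expiry)
        if (w == null || nowMillis - w.start >= windowMillis) {
            w = new Window(nowMillis);
            windows.put(key, w);
        }
        if (w.count < maxCount) {
            w.count++;
            return true;
        }
        return false;
    }
}
```

One known weakness of fixed windows: a client can send maxCount requests at the very end of one window and maxCount more at the start of the next, briefly doubling the allowed rate. Sliding-window counters and the token bucket avoid this edge.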


Posted by Fallen_angel on Sun, 22 May 2022 22:03:25 +0300