Asynchronous communication logic in high-performance services

I recently went through the asynchronous communication logic of a service program. Asynchronous logic differs considerably from synchronous logic: handling one complete request may take several callbacks, so the logic is fragmented into a series of serial steps. Developers who are used to writing synchronous code may find the shift in thinking difficult.

1. Logic

  • High-performance asynchronous non-blocking services generally use an I/O multiplexing model at the bottom layer to manage events; on Linux this is epoll.
  • epoll supports event-driven asynchronous logic: epoll_wait retrieves ready events from the kernel for processing.
  • The service handles the events. Each fd corresponds to an event handler, and a callback processes the events retrieved for that fd.
  • The callback logic is divided into steps. The steps generally run serially, in the same order as the synchronous version would, but the asynchronous version may need several callbacks to complete one piece of logic.
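The points above can be sketched as a very small event loop. This is only an illustration under stated assumptions: it is Linux-only, and `MiniLoop`, `add_read`, `run_once` and `epoll_pipe_demo` are hypothetical names invented here, not part of any framework mentioned in this post. A pipe stands in for a client socket.

```cpp
#include <sys/epoll.h>
#include <unistd.h>

#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical minimal loop: one handler callback per registered fd.
class MiniLoop {
public:
    using Handler = std::function<void(int fd, uint32_t events)>;

    MiniLoop() : m_epfd(epoll_create1(0)) {}
    ~MiniLoop() { if (m_epfd >= 0) close(m_epfd); }

    // Register fd for read readiness and remember its handler.
    bool add_read(int fd, Handler cb) {
        epoll_event ev{};
        ev.events = EPOLLIN;
        ev.data.fd = fd;
        if (epoll_ctl(m_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) return false;
        m_handlers[fd] = std::move(cb);
        return true;
    }

    // Fetch ready events from the kernel once and dispatch each to its handler.
    int run_once(int timeout_ms) {
        epoll_event ready[16];
        int n = epoll_wait(m_epfd, ready, 16, timeout_ms);
        for (int i = 0; i < n; i++) {
            auto it = m_handlers.find(ready[i].data.fd);
            if (it != m_handlers.end()) it->second(ready[i].data.fd, ready[i].events);
        }
        return n;
    }

private:
    int m_epfd;
    std::unordered_map<int, Handler> m_handlers;
};

// Demo: write msg into a pipe, let the loop deliver the readiness event,
// and return whatever the handler read.
std::string epoll_pipe_demo(const std::string& msg) {
    int fds[2];
    if (pipe(fds) != 0) return "";
    std::string received;
    MiniLoop loop;
    loop.add_read(fds[0], [&received](int fd, uint32_t) {
        char buf[256];
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) received.assign(buf, static_cast<size_t>(n));
    });
    if (write(fds[1], msg.data(), msg.size()) >= 0) {
        loop.run_once(1000);
    }
    close(fds[0]);
    close(fds[1]);
    return received;
}
```

A real service would call `run_once` in a loop and register sockets instead of a pipe; the handler here plays the role of the per-fd callback described above.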

Design diagram source: "Asynchronous service framework communication process"

2. redis source code logic

Normal logic generally has N steps. The difference between synchronous and asynchronous logic is that the asynchronous version is driven by callbacks, which is admittedly somewhat counterintuitive compared with synchronous code. A callback can still locate the original execution context; the key is privdata.

Let's look at the callback logic of redis (source code on GitHub).

  • Event structure.
typedef struct redisAeEvents {
    redisAsyncContext *context;
    aeEventLoop *loop;
    int fd;
    int reading, writing;
} redisAeEvents;
  • Add a read event and bind privdata (redisAeEvents) to the corresponding event and callback function.
static void redisAeAddRead(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    aeEventLoop *loop = e->loop;
    if (!e->reading) {
        e->reading = 1;
        aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
    }
}
  • Callback.
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    ((void)el); ((void)fd); ((void)mask);

    redisAeEvents *e = (redisAeEvents*)privdata;
    redisAsyncHandleRead(e->context);
}
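The role of privdata can be shown without redis at all. The following is a minimal sketch, not the ae or hiredis API: `FileProc`, `Registration`, `Registry`, `Connection` and `on_readable` are hypothetical names, and `Connection` merely stands in for a context object such as redisAeEvents. The same callback function is registered twice, and the opaque pointer alone tells it which object the event belongs to.

```cpp
#include <string>
#include <vector>

// Hypothetical C-style callback signature: the loop only knows an fd
// and an opaque pointer supplied at registration time.
typedef void (*FileProc)(int fd, void* privdata);

struct Registration {
    int fd;
    FileProc proc;
    void* privdata;
};

// Stand-in for a per-connection context object.
struct Connection {
    std::string name;
    int reads = 0;
};

// The callback recovers its context by casting privdata back.
static void on_readable(int fd, void* privdata) {
    (void)fd;
    Connection* c = static_cast<Connection*>(privdata);
    c->reads++;
}

// Hypothetical registry that stores (fd, callback, privdata) triples.
class Registry {
public:
    void add(int fd, FileProc proc, void* privdata) {
        m_regs.push_back(Registration{fd, proc, privdata});
    }
    // Simulate the loop reporting that fd is ready.
    void fire(int fd) {
        for (const Registration& r : m_regs) {
            if (r.fd == fd) r.proc(r.fd, r.privdata);
        }
    }

private:
    std::vector<Registration> m_regs;
};
```

Because the pointer is stored untyped, the registering code and the callback must agree on the real type, and the object must outlive its registration.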

3. State machine

Using a state machine to implement asynchronous logic is common practice. Asynchronous logic is already complex, and an elaborate state machine design would only add to the project's complexity. So the state machine here is implemented with a plain switch; simple is enough.

The test code below is rough. It implements only a few simple operations, yet it runs to dozens of lines. With coroutines (Python/Go), the same logic could fit in about 20 lines while still performing acceptably.

In some agile R&D teams, writing asynchronous logic with callbacks is not a wise choice. Unless performance is the bottleneck, writing business logic in an asynchronous style is not recommended. After all, the goal is to deliver projects quickly and move the business forward, and adding a few machines often costs far less than adding one employee.

Test source code (on GitHub):

namespace kim {

enum E_STEP {
    E_STEP_PARSE_REQUEST = 0,
    E_STEP_REDIS_SET,
    E_STEP_REDIS_SET_CALLBACK,
    E_STEP_REDIS_GET,
    E_STEP_REDIS_GET_CALLBACK,
};

Cmd::STATUS CmdTestRedis::execute_steps(int err, void* data) {
    int port = 6379;
    std::string host("127.0.0.1");

    switch (get_exec_step()) {
        case E_STEP_PARSE_REQUEST: {
            const HttpMsg* msg = m_req->get_http_msg();
            if (msg == nullptr) {
                return Cmd::STATUS::ERROR;
            }

            LOG_DEBUG("cmd test redis, http path: %s, data: %s",
                      msg->path().c_str(), msg->body().c_str());

            CJsonObject req_data(msg->body());
            if (!req_data.Get("key", m_key) ||
                !req_data.Get("value", m_value)) {
                LOG_ERROR("invalid request data! pls check!");
                return response_http(ERR_FAILED, "invalid request data");
            }
            return execute_next_step(err, data);
        }
        case E_STEP_REDIS_SET: {
            LOG_DEBUG("step redis set, key: %s, value: %s", m_key.c_str(), m_value.c_str());
            std::vector<std::string> rds_cmds{"set", m_key, m_value};
            Cmd::STATUS status = redis_send_to(host, port, rds_cmds);
            if (status == Cmd::STATUS::ERROR) {
                return response_http(ERR_FAILED, "redis failed!");
            }
            return status;
        }
        case E_STEP_REDIS_SET_CALLBACK: {
            redisReply* reply = (redisReply*)data;
            if (err != ERR_OK || reply == nullptr ||
                reply->type != REDIS_REPLY_STATUS || strncmp(reply->str, "OK", 2) != 0) {
                LOG_ERROR("redis set data callback failed!");
                return response_http(ERR_FAILED, "redis set data callback failed!");
            }
            LOG_DEBUG("redis set callback result: %s", reply->str);
            return execute_next_step(err, data);
        }
        case E_STEP_REDIS_GET: {
            std::vector<std::string> rds_cmds{"get", m_key};
            Cmd::STATUS status = redis_send_to(host, port, rds_cmds);
            if (status == Cmd::STATUS::ERROR) {
                return response_http(ERR_FAILED, "redis failed!");
            }
            return status;
        }
        case E_STEP_REDIS_GET_CALLBACK: {
            redisReply* reply = (redisReply*)data;
            if (err != ERR_OK || reply == nullptr || reply->type != REDIS_REPLY_STRING) {
                LOG_ERROR("redis get data callback failed!");
                return response_http(ERR_FAILED, "redis get data failed!");
            }
            LOG_DEBUG("redis get callback result: %s, type: %d", reply->str, reply->type);
            CJsonObject rsp_data;
            rsp_data.Add("key", m_key);
            rsp_data.Add("value", m_value);
            return response_http(ERR_OK, "success", rsp_data);
        }
        default: {
            LOG_ERROR("invalid step");
            return response_http(ERR_FAILED, "invalid step!");
        }
    }
}

}  // namespace kim
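The listing above relies on get_exec_step and execute_next_step, which are not shown. The sketch below is one way such a step counter could work; it is an assumption for illustration, not the framework's actual base class. `StepCmd`, `wait_for_callback`, `on_event` and `SetThenGet` are hypothetical names, and the redis calls are replaced by trace entries so the example is self-contained.

```cpp
#include <string>
#include <vector>

enum class Status { Running, Completed, Error };

// Hypothetical base class: remembers which step runs on the next entry.
class StepCmd {
public:
    virtual ~StepCmd() = default;
    // Called for the initial request and again from every async callback.
    Status on_event(int err, void* data) { return execute_steps(err, data); }
    int step() const { return m_step; }

protected:
    virtual Status execute_steps(int err, void* data) = 0;
    // Advance and run the next step right away (no I/O in between).
    Status execute_next_step(int err, void* data) {
        m_step++;
        return execute_steps(err, data);
    }
    // Advance and return to the event loop; the next step runs on callback.
    Status wait_for_callback() {
        m_step++;
        return Status::Running;
    }

private:
    int m_step = 0;
};

// Same five steps as the listing: parse, set, set callback, get, get callback.
class SetThenGet : public StepCmd {
public:
    std::vector<std::string> trace;

protected:
    Status execute_steps(int err, void* data) override {
        switch (step()) {
            case 0:
                trace.push_back("parse");
                return execute_next_step(err, data);
            case 1:
                trace.push_back("send set");
                return wait_for_callback();
            case 2:
                if (err != 0) return Status::Error;
                trace.push_back("set done");
                return execute_next_step(err, data);
            case 3:
                trace.push_back("send get");
                return wait_for_callback();
            case 4:
                if (err != 0) return Status::Error;
                trace.push_back("get done");
                return Status::Completed;
            default:
                return Status::Error;
        }
    }
};
```

In this sketch one request takes three entries into `on_event`: the initial dispatch and one per simulated redis reply, which is the "several callbacks per request" pattern the post describes.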

4. Performance

The asynchronous HTTP service was stress-tested with siege. A single process with a single thread sustains about 15,000 QPS over long (keep-alive) connections and about 10,000 QPS over short connections. Running multiple processes gives higher overall throughput.

These numbers come from a local stress test on a MacBook. Results on other machines may differ, since a process's concurrency capacity depends directly on the hardware configuration.

  • Long connection.
# ./http_pressure.sh
{       "transactions":                        50000,
        "availability":                       100.00,
        "elapsed_time":                         3.38,
        "data_transferred":                     3.43,
        "response_time":                        0.01,
        "transaction_rate":                 14792.90,
        "throughput":                           1.02,
        "concurrency":                         99.66,
        "successful_transactions":             50000,
        "failed_transactions":                     0,
        "longest_transaction":                  0.02,
        "shortest_transaction":                 0.00
}
  • Short connection.
# ./http_pressure.sh
{       "transactions":                        10000,
        "availability":                       100.00,
        "elapsed_time":                         0.99,
        "data_transferred":                     0.69,
        "response_time":                        0.01,
        "transaction_rate":                 10101.01,
        "throughput":                           0.69,
        "concurrency":                         97.59,
        "successful_transactions":             10000,
        "failed_transactions":                     0,
        "longest_transaction":                  0.08,
        "shortest_transaction":                 0.00
}
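The reported rates follow directly from the other fields. As a small worked check, transaction_rate is transactions divided by elapsed_time; the mean response time is estimated here from Little's law (concurrency = rate × mean time per request), which is an inference added for illustration and not a figure siege printed. `SiegeResult` and the function names are hypothetical.

```cpp
#include <cmath>

// Subset of the fields from the siege output above.
struct SiegeResult {
    double transactions;
    double elapsed_time;  // seconds
    double concurrency;   // average simultaneous connections
};

// Requests completed per second.
double transaction_rate(const SiegeResult& r) {
    return r.transactions / r.elapsed_time;
}

// Little's law estimate of mean seconds per request.
double mean_response_time(const SiegeResult& r) {
    return r.concurrency / transaction_rate(r);
}
```

With the long-connection numbers, 50000 / 3.38 gives the reported 14792.90, and the estimated mean response time is roughly 6.7 ms, consistent with the 0.01 s that siege shows after rounding to two decimals.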

Tags: epoll

Posted by knight on Tue, 03 May 2022 21:42:52 +0300