redis protocol and asynchronous mode

This article is based on the C/C++ Linux server advanced architecture tutorial from <Zero Voice Education>: Server high-level architecture

1. redis network

1.1. redis network

At the micro level: reactor

  • Composition: IO multiplexing + non-blocking IO
  • IO responsibilities: IO detection and IO operations
  • Events: asynchronous event processing. Register the event first, then handle it in a callback from the event loop

At the macro level: you can ignore the other steps and focus only on how packets are processed. Once a connection has received a complete packet, the corresponding event is processed.

1.2. redis protocol

redis protocol design

  • Message boundary: header in the character stream + delimiter
  • Message type: the first character of the message

redis uses the RESP serialization protocol, and each part of the protocol ends with CRLF (\r\n).

RESP supports the following data types. The data type is determined by the first character:

  • + Simple Strings

    +<string>\r\n
  • - Errors

    -Error <message>\r\n
  • : Integers

    :<numerical value>\r\n
  • $ Bulk Strings

    $<data length>\r\n<data content>\r\n
  • * Arrays

    *<number of elements n>\r\n<element 1>...<element n>

How RESP works in the redis request-response protocol

  • The client sends an array of strings (Array + Bulk Strings) to the redis server

    *<number of parameters>\r\n$<length of parameter 1>\r\n<data of parameter 1>\r\n...$<length of parameter n>\r\n<data of parameter n>\r\n
  • The redis server replies to the client with a RESP data type that depends on how the command is implemented.

Take a look at the example below.

In redis-cli, send the command set key value. The corresponding message is:

*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

If the execution is successful, redis replies OK, and the response message is:

+OK\r\n

If the execution fails (here the command was mistyped as ket), the response message is:

-ERR unknown command `ket`, with args beginning with: `key`, `value`, \r\n

2. redis pipeline

redis pipeline is a mechanism provided by the redis client rather than by the redis server itself, and is designed to save network round-trip time. Specifically, the client sends multiple requests at once without waiting for each reply, and the redis server replies in the same order, similar to pipelining in HTTP/1.1.

3. redis transaction

Transaction: a series of database operations defined by the user and treated as one complete logical unit of work. Either all of the operations are executed or none of them are; the unit of work is indivisible.

Why transactions matter: with concurrent connections, commands from different connections are executed in an interleaved, unpredictable order, which can cause conflicts.

3.1. Characteristics of transactions

ACID, Atomicity-Consistency-Isolation-Durability

  • Atomicity: A transaction is indivisible. Either all of its operations succeed or all of them fail, so a rollback mechanism must be provided for failed executions. An atomic operation is either executed or not executed, and other threads can never observe an intermediate state.
  • Consistency: Before and after the transaction, all data stays in a consistent state and no consistency check on the data may be violated. Consistency here means the expected consistency, not consistency after an exception. It covers type consistency, logical consistency, and data consistency (for example, between master and slave databases).
  • Isolation: Concurrent transactions must be isolated from each other. redis executes commands in a single thread, so it is naturally isolated. In mysql, each connection corresponds to a thread, and critical resources need to be locked in a multithreaded environment.
  • Durability: Once the transaction is committed, the changes to the data are permanent, that is, the data is written to disk.

3.2. transaction commands

The redis client starts a transaction with MULTI and then sends multiple commands, which the server places in a queue. The redis server does not execute the queued commands until it receives EXEC, at which point it executes the queue as a whole.

# open transaction
MULTI
# commit transaction
EXEC
# cancel transaction
DISCARD
# Watch keys for changes. Call it before the transaction is started to implement optimistic locking (CAS). If a watched key changes before EXEC, the transaction is cancelled and EXEC returns nil.
WATCH key [key ...]

Transactions are generally not used in actual work, because they are implemented with optimistic locking, and failures require a retry, which increases the complexity of the business logic.

3.3. lua script

redis has a built-in lua interpreter for executing lua scripts, and atomicity can be achieved through lua scripts.

Interview question: lua scripts satisfy atomicity and isolation, but not consistency and durability.

  • Atomicity: A lua script is executed as a single command, and all the commands in the script are executed together, so it is atomic.
  • Consistency: Not guaranteed. If the lua script fails partway through, the commands that have already been applied to the database cannot be rolled back.
  • Isolation: redis executes commands in a single thread, and a lua script runs as a single unit.
  • Durability: Not guaranteed. It is only available with aof and appendfsync = always, a configuration that is not used in actual work.

3.3.1. Commands

# test use
# execute lua script
EVAL script numkeys [key [key ...]] [arg [arg ...]]

# actual use
# Only the 40-character hash string is sent, which reduces the amount of data transmitted
# 1. Cache the script given by the user on the server, and return the script's SHA1 checksum (a 40-character string) as the result
SCRIPT LOAD script
# 2. Execute the cached script
EVALSHA sha1 numkeys [key [key ...]] [arg [arg ...]]

# Appendix: script management commands
# Check whether scripts are cached
SCRIPT EXISTS sha1 [sha1 ...]
# Clear all script caches
SCRIPT FLUSH
# Force stop a running script, such as one stuck in an infinite loop
SCRIPT KILL

3.3.2. Application

  • When the project starts, after the redis connection has been established and authenticated, load all lua scripts used in the project with script load
  • If a hot update is required in the project, execute script flush through redis-cli. All servers can then be notified through publish/subscribe to reload the lua scripts
  • If a lua script in the project blocks, you can use script kill to terminate the currently blocked script

Example: Doubling operation

set mark 1
# test use
eval "local val = redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" 1 mark
(integer) 2
eval "local val = redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" 1 darren
(integer) 0

# actual use
# 1. Cache script
script load "local val = redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;"
# 2. Execute the cached script
evalsha "9da2e1ac090f2e1df67087370de115a4291cd0bd" 1 mark
(integer) 4

4. redis publish and subscribe

In order to support multicast delivery of messages, redis introduces the publish-subscribe module, which is a distributed message queue mechanism. Subscribers use a specific channel to receive the messages that senders publish to that channel. This mechanism does not guarantee that a message arrives; when delivery must be guaranteed, the stream data type can be used instead.

The existing problems are: if the sender publishes a message while there are no subscribers, the message is discarded directly. If a subscriber disconnects, every message published during the disconnection is lost to that subscriber. In addition, pubsub messages are not persisted, so when redis is stopped and restarted all messages are discarded.

4.1. Commands

# Send message to channel
publish channel message
# Subscribe to a channel
subscribe channel [channel...]
# Unsubscribe from a channel
unsubscribe [channel...]
# Subscribe to channels matching a pattern
psubscribe pattern [pattern...]
# Unsubscribe from patterns
punsubscribe [pattern...]
# View information about publish and subscribe
pubsub subcommand [argument [argument ...]]

4.2. Application

Publish-subscribe generally needs a separate connection, because a command connection strictly follows the request-response mode, whereas a pubsub connection receives content actively pushed by redis. Therefore, if a project uses pubsub, another connection needs to be opened for handling publish and subscribe.

# A client subscribes to the channel
SUBSCRIBE news.shanxi news.henan news.shandong
# Another client subscribes with pattern matching (news.* is an illustrative pattern; the original command is missing from the source)
PSUBSCRIBE news.*
# Send a message to a channel, all subscribers of the channel receive the message
publish news.shanxi 'harmony'

5. redis asynchronous connection

hiredis is a C client library for redis that a server program can use to access the redis server.

5.1. Synchronous connection

A synchronous connection is implemented with blocking io, so it blocks the current thread until redis returns the result.

Reference documentation: Use of hiredis

For example: access redis and increment a counter 1000 times, measuring the elapsed time.

// gcc redis-test-sync.c -o sync -lhiredis
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <hiredis/hiredis.h>

// Current monotonic time in milliseconds
int current_tick() {
    int t = 0;
    struct timespec ti;
    clock_gettime(CLOCK_MONOTONIC, &ti);
    t = (int)ti.tv_sec * 1000;
    t += ti.tv_nsec / 1000000;
    return t;
}

int main(int argc, char **argv) {
    redisContext *c;
    redisReply *reply;
    // Assumption: redis runs on the local machine (the address is empty in the source)
    const char *hostname = "127.0.0.1";
    int port = 6379;

    struct timeval timeout = { 1, 500000 }; // 1.5 seconds

    c = redisConnectWithTimeout(hostname, port, timeout);

    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }

    int num = (argc > 1) ? atoi(argv[1]) : 1000;

    int before = current_tick();

    reply = redisCommand(c, "auth 123456");
    freeReplyObject(reply);

    for (int i = 0; i < num; ++i) {
        // Blocks until the reply to this command has arrived
        reply = redisCommand(c, "INCR counter");
        if (reply == NULL) {
            printf("Command error: %s\n", c->errstr);
            break;
        }
        printf("INCR counter: %lld\n", reply->integer);
        freeReplyObject(reply);
    }

    int used = current_tick() - before;

    printf("after %d exec redis command, used %d ms\n", num, used);

    /* Disconnects and frees the context */
    redisFree(c);

    return 0;
}
5.2. Asynchronous connection

Asynchronous connections are implemented with non-blocking io and do not block the current thread. The disadvantage is that the code has to be written in an asynchronous style, which fragments the business logic; coroutines can solve this. With a large number of concurrent requests, combining an asynchronous connection pool with the io multi-threading available since redis 6.0 gives the application layer better data access performance.

5.2.1. redis driver

Redis driver: a server that uses asynchronous connections needs to implement the redis driver itself. That is, the redis connection has to be integrated into the reactor of the server's own project so that the reactor manages it.

To do this you need to design a redis adapter, whose main functions are:

  • Build redis event objects, which combine a hiredis event object and a reactor event object.
  • Adapt event control so that the project's existing reactor event loop is reused.

To sum up, the encapsulation rules of hiredis are:

  • Reactor implementation: all IO is implemented by the user.
  • Adapter implementation: hiredis provides the event operation interfaces, and users need to adapt them.
// The hiredis event interfaces that users need to adapt are:
addRead		// Add read event			
delRead 	// delete read event
addWrite	// Add write event
delWrite	// delete write event
cleanup		// event object release	

5.2.2. Examples

Here the example in 5.1 is implemented asynchronously.

Step 1, implement the reactor (reactor.h) that the redis driver is built on

#ifndef _REACTOR_
#define _REACTOR_

#include <stdio.h>
#include <unistd.h> // read write
#include <fcntl.h> // fcntl
#include <sys/types.h> // listen
#include <sys/socket.h> // socket
#include <errno.h> // errno
#include <arpa/inet.h> // inet_addr htons
// #include <netinet/tcp.h>
#include <assert.h> // assert
#include <sys/epoll.h>
#include <stdlib.h> // malloc
#include <string.h> // memcpy memmove
#include <stdint.h> // uint8_t

#include "chainbuffer/buffer.h"
// #include "ringbuffer/buffer.h"

#define MAX_EVENT_NUM 512 // Maximum number of events returned by one epoll_wait call
#define MAX_CONN ((1<<16)-1) // Maximum number of event objects: 65535

typedef struct event_s event_t;
typedef void (*event_callback_fn)(int fd, int events, void *privdata);
typedef void (*error_callback_fn)(int fd, char * err);

// reactor object, manages the global io state
typedef struct {
    int epfd;        // epoll fd
    int listenfd;    // listening fd
    int stop;        // flag that stops the event loop
    event_t *events; // Stores all monitored events (event_t) on the heap, remember to release
    int iter;        // Used to iterate over events to find unused slots
    struct epoll_event fire[MAX_EVENT_NUM]; // User-space array that receives ready io events from epoll_wait
} reactor_t;

// Event object (sockitem), saves the io state corresponding to each fd
struct event_s {
    int fd;         // fd this event belongs to
    reactor_t *r;   // points to the reactor global object
    buffer_t in;    // read buffer, data received and waiting to be processed
    buffer_t out;   // write buffer, data waiting to be sent
    event_callback_fn read_fn;  // read callback
    event_callback_fn write_fn; // write callback
    error_callback_fn error_fn; // error callback
};

int event_buffer_read(event_t *e);
int event_buffer_write(event_t *e, void * buf, int sz);

// Create the reactor object
reactor_t * create_reactor() {
    // Allocate the reactor object on the heap
    reactor_t *r = (reactor_t *)malloc(sizeof(*r));
    r->epfd = epoll_create(1);
    r->listenfd = 0;
    r->stop = 0;
    r->iter = 0;
    // Allocate the reactor's events array on the heap
    r->events = (event_t*)malloc(sizeof(event_t)*MAX_CONN);
    memset(r->events, 0, sizeof(event_t)*MAX_CONN);
    memset(r->fire, 0, sizeof(struct epoll_event) * MAX_EVENT_NUM);
    // init_timer();
    return r;
}

// release the reactor object
void release_reactor(reactor_t * r) {
    free(r->events);    // Release the events array that the reactor allocated on the heap
    close(r->epfd);     // close epoll
    free(r);            // release the reactor
}

// Get a free event object from the reactor's events array
event_t * _get_event_t(reactor_t *r) {
    // Advance the cursor, wrapping around so the index stays inside the array
    // (the array has MAX_CONN slots, so masking with MAX_CONN could reach index MAX_CONN)
    r->iter = (r->iter + 1) % MAX_CONN;
    // Skip slots that are still in use (fd > 0); loops forever if every slot is taken
    while (r->events[r->iter].fd > 0) {
        r->iter = (r->iter + 1) % MAX_CONN;
    }
    return &r->events[r->iter];
}

// event-based actions
// 1. Create an event object
event_t * new_event(reactor_t *R, int fd,
    event_callback_fn rd,
    event_callback_fn wt,
    error_callback_fn err) {
    assert(rd != 0 || wt != 0 || err != 0);
    // Get a free event object
    event_t *e = _get_event_t(R);
    // Initialize the event object
    e->r = R;
    e->fd = fd;
    buffer_init(&e->in, 1024*16);
    buffer_init(&e->out, 1024*16);
    e->read_fn = rd;
    e->write_fn = wt;
    e->error_fn = err;

    return e;
}

// 2. Add events
int add_event(reactor_t *R, int events, event_t *e) {
    struct epoll_event ev;
    ev.events = events;
    ev.data.ptr = e;    // handed back by epoll_wait so the loop can find the event object

    if (epoll_ctl(R->epfd, EPOLL_CTL_ADD, e->fd, &ev) == -1) {
        printf("add event err fd = %d\n", e->fd);
        return 1;
    }
    return 0;
}

// Free up event space
void free_event(event_t *e) {
    // The body is missing in the source; it presumably releases e->in and e->out
}

// 3. Delete event
int del_event(reactor_t *R, event_t *e) {
    epoll_ctl(R->epfd, EPOLL_CTL_DEL, e->fd, NULL);
    return 0;
}

// 4. Modify the event; the two flags decide whether read and/or write events are monitored
int enable_event(reactor_t *R, event_t *e, int readable, int writeable) {
    struct epoll_event ev;
    ev.events = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0);
    ev.data.ptr = e;

    if (epoll_ctl(R->epfd, EPOLL_CTL_MOD, e->fd, &ev) == -1) {
        return 1;
    }
    return 0;
}

// one iteration of the event loop
void eventloop_once(reactor_t * r, int timeout) {
    int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
    for (int i = 0; i < n; ++i) {
        struct epoll_event *e = &r->fire[i];  // get event
        int mask = e->events;                 // Get event type
        // On error, let the io functions report the specific error
        if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
        // On hang-up, let the io functions report the disconnection
        if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
        event_t *et = (event_t*) e->data.ptr; // Get user data associated with the event
        // Handling read events
        if (mask & EPOLLIN) {
            if (et->read_fn) {
                et->read_fn(et->fd, EPOLLIN, et);   // execute read callback
            }
        }
        // Handling write events
        if (mask & EPOLLOUT) {
            if (et->write_fn) {
                et->write_fn(et->fd, EPOLLOUT, et); // execute write callback
            } else {
                // No write callback: flush the pending data in the write buffer
                uint8_t *buf = buffer_write_atmost(&et->out);
                event_buffer_write(et, buf, buffer_len(&et->out));
            }
        }
    }
}

// stop the event loop
void stop_eventloop(reactor_t * r) {
    r->stop = 1;
}

// event loop
void eventloop(reactor_t * r) {
    while (!r->stop) {
        // int timeout = find_nearest_expire_timer();
        eventloop_once(r, /*timeout*/ -1);
        // expire_timer();
    }
}

// set non-blocking fd
int set_nonblock(int fd) {
    int flag = fcntl(fd, F_GETFL, 0);
    return fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}

// Create server
int create_server(reactor_t *R, short port, event_callback_fn func) {
    // 1. socket
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        printf("create listenfd error!\n");
        return -1;
    }
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;

    // Make the address reusable
    int reuse = 1;
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int)) == -1) {
        printf("reuse address error: %s\n", strerror(errno));
        return -1;
    }

    // 2. bind
    if (bind(listenfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
        printf("bind error %s\n", strerror(errno));
        return -1;
    }

    // 3. listen
    if (listen(listenfd, 5) < 0) {
        printf("listen error %s\n", strerror(errno));
        return -1;
    }
    // Set listenfd non-blocking
    if (set_nonblock(listenfd) < 0) {
        printf("set_nonblock error %s\n", strerror(errno));
        return -1;
    }

    R->listenfd = listenfd;

    // Register for read events
    event_t *e = new_event(R, listenfd, func, 0, 0);
    add_event(R, EPOLLIN, e);

    printf("listen port : %d\n", port);
    return 0;
}

// read data
int event_buffer_read(event_t *e) {
    int fd = e->fd;
    int num = 0;    // The total amount of data read
    while (1) {
        // TODO: dont use char buf[] here
        char buf[1024] = {0};
        int n = read(fd, buf, 1024);

        // 1. read=0, the server received the FIN packet, half-closed state
        // Todo: half-closed state logic processing, refer to skynet
        if (n == 0) {
            printf("close connection fd = %d\n", fd);
            if (e->error_fn) {
                e->error_fn(fd, "close socket");
            }
            del_event(e->r, e);
            return 0;
        }
        // 2. read=-1, read exception
        else if (n < 0) {
            // 2.1, EINTR: interrupted, retry
            if (errno == EINTR) {
                continue;
            }
            // 2.2, EWOULDBLOCK: would block, the read buffer is empty
            if (errno == EWOULDBLOCK) {
                break;
            }
            // Other errors: execute the error callback, delete the event, close the current connection
            printf("read error fd = %d err = %s\n", fd, strerror(errno));
            if (e->error_fn)
                e->error_fn(fd, strerror(errno));
            del_event(e->r, e);
            return 0;
        }
        // 3. read>0, normal: store the data for the business logic
        else {
            // Print exactly n bytes; buf is not guaranteed to be NUL-terminated
            printf("recv data from client:%.*s", n, buf);
            buffer_add(&e->in, buf, n);
        }
        num += n;
    }
    return num;
}

// Send data to peer
// Returns the number of bytes written, 0 if the socket would block, or -1 on error
int _write_socket(event_t *e, void * buf, int sz) {
    int fd = e->fd;
    while (1) {
        int n = write(fd, buf, sz);
        // 1. write=-1, write exception
        if (n < 0) {
            // 1.1, EINTR: interrupted, retry
            if (errno == EINTR) {
                continue;
            }
            // 1.2, EWOULDBLOCK: would block, need to register the write event
            if (errno == EWOULDBLOCK) {
                break;
            }
            // Other errors: execute the error callback, delete the event, close the current connection
            if (e->error_fn) {
                e->error_fn(fd, strerror(errno));
            }
            del_event(e->r, e);
        }
        return n;
    }
    return 0;
}

// write data
int event_buffer_write(event_t *e, void * buf, int sz) {
    // Point to the user write buffer
    buffer_t *r = &e->out;
    // 1. The user write buffer is empty, so try to send directly
    if (buffer_len(r) == 0) {
        // Send data to peer
        int n = _write_socket(e, buf, sz);
        // 1.1. The write failed; the error was already handled in _write_socket
        // (checked first, otherwise n = -1 would also match n < sz below)
        if (n < 0) {
            return 0;
        }
        // 1.2. Not all of the data was sent this time
        if (n < sz) {
            // Write the unsent remainder into the buffer
            buffer_add(&e->out, (char *)buf + n, sz - n);
            // Register the write event and send the rest when it fires
            enable_event(e->r, e, 1, 1);
            return 0;
        }
        // 1.3. All of the data was sent
        return 1;
    }
    // 2. The user write buffer is not empty: append to it and wait for the write event, which preserves ordering
    buffer_add(&e->out, (char *)buf, sz);
    return 1;
}

#endif


The second step is to implement the redis adapter, mainly to build the redis event object and adapt the event control interface of hiredis.

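// adapter_async.h
#ifndef _MARK_ADAPTER_
#define _MARK_ADAPTER_

#include <hiredis/hiredis.h>
#include <hiredis/async.h> // redisAsyncContext, redisAsyncHandleRead, redisAsyncHandleWrite
#include <hiredis/alloc.h>
#include "reactor.h"

// redis event object
typedef struct {
    event_t e;              // reactor event object (must stay the first member)
    int mask;               // Store registered events
    redisAsyncContext *ctx; // hiredis event object
} redis_event_t;

// redis object read event callback
static void redisReadHandler(int fd, int events, void *privdata) {
    ((void)fd);
    ((void)events);
    event_t *e = (event_t*)privdata;
    // e is the first member of redis_event_t, so both pointers share one address
    redis_event_t *re = (redis_event_t *)(char *)e;
    redisAsyncHandleRead(re->ctx);   // let hiredis read and parse the replies
}

// redis object write event callback
static void redisWriteHandler(int fd, int events, void *privdata) {
    ((void)fd);
    ((void)events);
    event_t *e = (event_t*)privdata;
    redis_event_t *re = (redis_event_t *)(char *)e;
    redisAsyncHandleWrite(re->ctx);  // let hiredis send the queued commands
}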

/**
 * @brief Update the event object managed by reactor
 * @param privdata  redis event object
 * @param flag      epoll event type to set
 * @param remove    1 delete this event 0 add this event
 */
static void redisEventUpdate(void *privdata, int flag, int remove) {
    redis_event_t *re = (redis_event_t *)privdata;
    reactor_t *r = re->e.r;
    int prevMask = re->mask;
    // The redis event object deletes the event
    if (remove) {
        if ((re->mask & flag) == 0) {
            return;     // not registered, nothing to do
        }
        re->mask &= ~flag;
    }
    // add the event to the redis event object
    else {
        if (re->mask & flag) {
            return;     // already registered, nothing to do
        }
        re->mask |= flag;
    }
    // Handling the reactor event object
    // 1. The reactor event object deletes the event
    if (re->mask == 0) {
        del_event(r, &re->e);
    }
    // 2. The reactor event object adds the event (for the first time)
    else if (prevMask == 0) {
        add_event(r, re->mask, &re->e);
    }
    // 3. The reactor event object modifies the event
    else {
        // Derive both flags from the updated mask, so that changing one
        // direction does not silently drop interest in the other
        enable_event(r, &re->e, re->mask & EPOLLIN, re->mask & EPOLLOUT);
    }
}

// The hiredis event interface that needs to be adapted
// 1. Add read event to redis event object
static void redisAddRead(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.read_fn = redisReadHandler;
    redisEventUpdate(privdata, EPOLLIN, 0);
}

// 2. The redis event object deletes the read event
static void redisDelRead(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.read_fn = 0;
    redisEventUpdate(privdata, EPOLLIN, 1);
}

// 3. Add write event to redis event object
static void redisAddWrite(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.write_fn = redisWriteHandler;
    redisEventUpdate(privdata, EPOLLOUT, 0);
}

// 4. The redis event object deletes the write event
static void redisDelWrite(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.write_fn = 0;
    redisEventUpdate(privdata, EPOLLOUT, 1);
}

// 5. redis event object release
static void redisCleanup(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    reactor_t *r = re->e.r;
    del_event(r, &re->e);
    hi_free(re);    // release the container allocated in redisAttach
}

// Bind the redis event object to the reactor object and the redis async context
static int redisAttach(reactor_t *r, redisAsyncContext *ac) {
    redisContext *c = &(ac->c); // redis synchronization context
    redis_event_t *re;          // redis event object
    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for ctx and r/w events */
    re = (redis_event_t*)hi_malloc(sizeof(*re));
    if (re == NULL) {
        return REDIS_ERR;
    }
    // Start from a zeroed object so that unset callbacks read as NULL
    memset(re, 0, sizeof(*re));

    // The redis event object binds the reactor object and the redis asynchronous context
    re->ctx = ac;       // Bind the redis async context
    re->e.fd = c->fd;   // bind redis fd
    re->e.r = r;        // bind reactor
    re->mask = 0;       // no events registered yet

    // redis asynchronous context settings, need to adapt event control
    // hiredis provides the event interface, and the user implements it
    ac->ev.addRead = redisAddRead;
    ac->ev.delRead = redisDelRead;
    ac->ev.addWrite = redisAddWrite;
    ac->ev.delWrite = redisDelWrite;
    ac->ev.cleanup = redisCleanup;
    ac->ev.data = re;

    return REDIS_OK;
}

#endif


Finally, implement the main program, which uses the reactor and the adapter to run the counter example.

// redis-test-async.c
// gcc redis-test-async.c chainbuffer/buffer.c -o async -lhiredis
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <time.h>

#include "reactor.h"
#include "adapter_async.h"

static reactor_t *R;
static int cnt, before, num;

// Current monotonic time in milliseconds
int current_tick() {
    int t = 0;
    struct timespec ti;
    clock_gettime(CLOCK_MONOTONIC, &ti);
    t = (int)ti.tv_sec * 1000;
    t += ti.tv_nsec / 1000000;
    return t;
}

void getCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = r;
    if (reply == NULL) return;
    printf("argv[%s]: %lld\n", (char*)privdata, reply->integer);

    /* Count the replies and disconnect after receiving the last one */
    cnt++;
    if (cnt == num) {
        int used = current_tick()-before;
        printf("after %d exec redis command, used %d ms\n", num, used);
        redisAsyncDisconnect(c);
    }
}

void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        stop_eventloop(R);
        return;
    }
    printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
    } else {
        printf("Disconnected...\n");
    }
    // Either way the connection is gone, so leave the event loop
    stop_eventloop(R);
}

int main(int argc, char **argv) {
    // Assumption: redis runs on the local machine (the address is empty in the source)
    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }
    R = create_reactor();
    redisAttach(R, c);
    redisAsyncSetConnectCallback(c, connectCallback);
    redisAsyncSetDisconnectCallback(c, disconnectCallback);

    before = current_tick();
    num = (argc > 1) ? atoi(argv[1]) : 1000;

    redisAsyncCommand(c, NULL, NULL, "auth 123456");
    // The commands are only queued here; they are sent and answered inside the event loop
    for (int i = 0; i < num; i++) {
        redisAsyncCommand(c, getCallback, "count", "INCR counter");
    }

    eventloop(R);

    release_reactor(R);
    return 0;
}
Tags: Linux Redis Network Protocol

Posted by depraved on Tue, 18 Oct 2022 18:17:04 +0300