This article is based on the C/C++ Linux server development advanced architecture tutorial from <Zero Voice Education>: Server high-level architecture
1. redis network
1.1, redis network
At the micro level: reactor
- Composition: IO multiplexing + non-blocking IO
- IO responsibilities: IO detection and IO operations
- Events: processing is asynchronous; register the event first, then handle it in its callback from the event loop
At the macro level: you can ignore the other steps and focus only on packet processing. When the data on a pipe (connection) forms a complete packet, the event is processed.
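The register-then-dispatch flow described above can be sketched in a few lines. This is only an illustration under stated assumptions: it uses poll() instead of epoll to stay portable, a pipe stands in for a client connection, and every name in it (handler_t, loop_once, reactor_demo) is hypothetical rather than taken from redis.

```c
#include <fcntl.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical names throughout; this is not redis source code. */
typedef void (*read_cb)(int fd, void *privdata);

typedef struct {
    int fd;          /* descriptor being watched */
    read_cb on_read; /* callback registered for readable events */
    void *privdata;  /* user data handed back to the callback */
} handler_t;

/* IO operation: drain the non-blocking fd and count the bytes read. */
static void count_bytes(int fd, void *privdata) {
    size_t *total = privdata;
    char buf[64];
    ssize_t n;
    while ((n = read(fd, buf, sizeof buf)) > 0) {
        *total += (size_t)n;
    }
    /* n == -1 with EAGAIN means the kernel buffer is empty, so we stop. */
}

/* IO detection: wait for readiness, then dispatch the registered callback.
 * Returns the number of callbacks invoked, or -1 if poll() fails. */
int loop_once(handler_t *h, int timeout_ms) {
    struct pollfd pfd = { .fd = h->fd, .events = POLLIN, .revents = 0 };
    int n = poll(&pfd, 1, timeout_ms);
    if (n <= 0) return n;
    if (pfd.revents & (POLLIN | POLLHUP | POLLERR)) {
        h->on_read(h->fd, h->privdata);
        return 1;
    }
    return 0;
}

/* Demo: returns how many bytes the callback consumed. */
size_t reactor_demo(void) {
    int fds[2];
    size_t total = 0;
    if (pipe(fds) != 0) return 0;
    fcntl(fds[0], F_SETFL, fcntl(fds[0], F_GETFL, 0) | O_NONBLOCK);

    handler_t h = { fds[0], count_bytes, &total };  /* 1. register the event */
    if (write(fds[1], "PING\r\n", 6) == 6) {
        loop_once(&h, 1000);                        /* 2. detect and dispatch */
    }
    close(fds[0]);
    close(fds[1]);
    return total;
}
```

Calling reactor_demo() registers one handler, makes the pipe readable, and runs a single loop iteration; a real reactor would keep many handlers and loop until stopped.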
1.2, redis protocol
redis protocol design
- Message boundary: length header + delimiter in the character stream
- Message type: determined by the first character of the message.
redis uses the RESP serialization protocol, and each part of the protocol ends with CRLF (\r\n).
RESP supports the following data types; the type is determined by the first character:
- Simple Strings, first character "+"
+OK\r\n
- Errors, first character "-"
-Error <message>\r\n
- Integers, first character ":"
:<numerical value>\r\n
- Bulk Strings, first character "$"
$<data length>\r\n<data content>\r\n
- Arrays, first character "*"
*<number of elements n>\r\n<element 1>...<element n>
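Since the type is carried entirely by the first byte, type detection is a single switch. The sketch below assumes the five types listed above; the enum and function names are made up for illustration and are not hiredis identifiers.

```c
#include <stddef.h>

/* Hypothetical enum; hiredis uses its own REDIS_REPLY_* constants. */
typedef enum {
    RESP_SIMPLE_STRING,
    RESP_ERROR,
    RESP_INTEGER,
    RESP_BULK_STRING,
    RESP_ARRAY,
    RESP_UNKNOWN
} resp_type_t;

/* Classify a RESP message by looking only at its first character. */
resp_type_t resp_type_of(const char *msg, size_t len) {
    if (msg == NULL || len == 0) return RESP_UNKNOWN;
    switch (msg[0]) {
    case '+': return RESP_SIMPLE_STRING;
    case '-': return RESP_ERROR;
    case ':': return RESP_INTEGER;
    case '$': return RESP_BULK_STRING;
    case '*': return RESP_ARRAY;
    default:  return RESP_UNKNOWN;
    }
}
```

For example, resp_type_of("+OK\r\n", 5) yields RESP_SIMPLE_STRING, while any other leading byte falls through to RESP_UNKNOWN.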
How RESP works in the redis request-response protocol:
- The client sends an array of strings (Array + Bulk Strings) to the redis server:
*<number of parameters>\r\n$<length of parameter 1>\r\n<data for parameter 1>\r\n...$<length of parameter n>\r\n<data for parameter n>\r\n
- The redis server replies to the client with a RESP data type that depends on the command implementation.
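The request format above can be produced with a short encoder. This is a minimal sketch, not the hiredis implementation: resp_encode_command is a hypothetical helper, and it measures arguments with strlen, so unlike real clients it is not binary-safe.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encode argv as a RESP array of bulk strings.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if the output buffer is too small. */
int resp_encode_command(int argc, const char **argv, char *out, size_t cap) {
    /* Array header: *<number of parameters>\r\n */
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t used = (size_t)n;

    for (int i = 0; i < argc; i++) {
        /* Bulk string: $<length>\r\n<data>\r\n */
        size_t len = strlen(argv[i]);
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n", len, argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding the three arguments set, key, value with this helper gives a 33-byte message made of one array header followed by three bulk strings.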
Consider the following example.
In redis-cli, sending the command set key value produces this message:
*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
If the command succeeds, the response message is:
+OK\r\n
If the command fails (for example, because the command name is mistyped), the response message is an error:
-ERR unknown command `ket`, with args beginning with: `key`, `value`, \r\n
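Both replies above are single CRLF-terminated lines, so a client only needs to find the delimiter to know that a complete message has arrived. The following sketch handles the one-line types (+, -, :) only; resp_parse_line is a hypothetical name, and bulk strings and arrays would need the length-prefixed logic that is omitted here.

```c
#include <stddef.h>

/* Hypothetical helper for one-line replies ('+', '-' or ':').
 * Returns the number of bytes consumed including CRLF,
 * 0 if the line is not complete yet (wait for more data),
 * or -1 if the first byte is not a one-line type. */
int resp_parse_line(const char *buf, size_t len, char *type,
                    const char **payload, size_t *payload_len) {
    if (len == 0) return 0;
    if (buf[0] != '+' && buf[0] != '-' && buf[0] != ':') return -1;

    /* Message boundary: scan for the CRLF delimiter. */
    for (size_t i = 1; i + 1 < len; i++) {
        if (buf[i] == '\r' && buf[i + 1] == '\n') {
            *type = buf[0];          /* message type: first character */
            *payload = buf + 1;      /* text between type byte and CRLF */
            *payload_len = i - 1;
            return (int)(i + 2);
        }
    }
    return 0;
}
```

A return value of 0 is the "incomplete packet" case: the caller keeps the bytes in its read buffer and retries after the next read event.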
2. redis pipeline
Redis pipelining is a mechanism provided by the redis client rather than a feature of the redis server, and it is designed to save network round-trip time. The client sends multiple requests at once without waiting for each reply, and the redis server replies in the same order, similar to pipelining in HTTP/1.1.
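On the wire, a pipeline is just several encoded requests written back to back, followed by the same number of replies read back in order. The sketch below assumes the commands are all INCR and the replies are all integers; both function names are hypothetical and no network IO is performed.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: append one RESP-encoded "INCR <key>" at offset `used`.
 * Returns the new length of the buffer, or -1 if it does not fit. */
int pipeline_append_incr(char *buf, size_t cap, size_t used, const char *key) {
    if (used >= cap) return -1;
    int n = snprintf(buf + used, cap - used,
                     "*2\r\n$4\r\nINCR\r\n$%zu\r\n%s\r\n", strlen(key), key);
    if (n < 0 || (size_t)n >= cap - used) return -1;
    return (int)(used + (size_t)n);
}

/* Hypothetical helper: count complete integer replies (":<n>\r\n")
 * in a response buffer; a trailing partial reply is not counted. */
int pipeline_count_integer_replies(const char *buf, size_t len) {
    int count = 0;
    size_t start = 0; /* index where the current line begins */
    for (size_t i = 0; i + 1 < len; i++) {
        if (buf[i] == '\r' && buf[i + 1] == '\n') {
            if (buf[start] == ':') count++;
            start = i + 2;
            i++; /* skip the '\n' */
        }
    }
    return count;
}
```

The client would send the whole request buffer with one write, then keep reading until the reply count matches the number of requests it queued.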
3. redis transaction
Transaction: a series of database operations defined by the user and treated as one complete logical unit of work. The operations are either all executed or none are executed; the unit is indivisible.
Why transactions matter: with concurrent connections, commands from different connections are interleaved, which can cause unpredictable conflicts.
3.1. Characteristics of transactions
ACID, Atomicity-Consistency-Isolation-Durability
- Atomicity: a transaction is indivisible; it either succeeds as a whole or fails as a whole, and a rollback mechanism must be provided in case execution fails. This differs from the atomicity of an atomic operation, which is either executed or not executed so that other threads can never observe an intermediate state; the atomicity of a transaction additionally includes rollback.
- Consistency: before and after the transaction, all data stays in a consistent state and must not violate the data's consistency checks. Consistency here means the expected consistency, not consistency after an exception. It covers type consistency, logical consistency, and data consistency (for example, between master and slave databases).
- Isolation: concurrent transactions must be isolated from each other. redis executes commands in a single thread and is naturally isolated. In mysql, one connection corresponds to one thread, so critical resources need to be locked in this multithreaded environment.
- Durability: once the transaction is committed, the changes to the data are permanent, that is, the data has been written to disk.
3.2, transaction commands
The redis client starts a transaction with MULTI and then sends multiple commands, which the server places in a queue. The redis server does not execute the queued commands until it receives EXEC, at which point it executes the queue as a whole.
# Open a transaction
MULTI
# Commit the transaction
EXEC
# Cancel the transaction
DISCARD
# Watch keys for changes. Call it before the transaction is started to implement optimistic locking (CAS). If a watched key changes before EXEC runs, the transaction is cancelled and EXEC returns nil.
WATCH key
Transactions are rarely used in practice: because they rely on optimistic locking, a failed transaction has to be retried, which increases the complexity of the business logic.
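The retry burden mentioned above comes from the check-and-set rule behind WATCH. The sketch below simulates that rule in memory so it can run without a server. It is only an illustration: the version counter and all sim_* names are assumptions made for this example, and redis's internal bookkeeping differs.

```c
#include <stdbool.h>

/* In-memory stand-in for one redis key (hypothetical).
 * `version` changes on every write, whoever performs it. */
typedef struct {
    long value;
    unsigned long version;
} sim_key_t;

/* WATCH: remember the version seen before the transaction starts. */
unsigned long sim_watch(const sim_key_t *k) {
    return k->version;
}

/* A plain write, as issued by any client. */
void sim_set(sim_key_t *k, long v) {
    k->value = v;
    k->version++;
}

/* EXEC: apply the queued write only if the key is unchanged since WATCH.
 * Returning false corresponds to EXEC replying nil. */
bool sim_exec_set(sim_key_t *k, unsigned long watched, long v) {
    if (k->version != watched) return false;
    sim_set(k, v);
    return true;
}
```

When sim_exec_set returns false, the caller has to watch the key again, recompute its write from the fresh value, and retry, which is exactly the extra business logic that makes WATCH unpopular.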
3.3, lua script
redis has a built-in lua interpreter for executing lua scripts, and atomicity can be achieved through lua scripts.
Interview point: lua scripts satisfy atomicity and isolation, but not consistency or durability.
- Atomicity: a lua script is executed as one command, and all the commands in the script run together, so it is atomic.
- Consistency: not guaranteed. If a lua script fails partway through, the commands that have already been applied to the database cannot be rolled back.
- Isolation: redis executes commands in a single thread, and a lua script runs as a single unit.
- Durability: not guaranteed. It is only achieved with aof enabled and appendfsync = always, a configuration that is not used in practice.
3.3.1. Commands
# For testing
# Execute a lua script
EVAL script numkeys [key [key ...]] [arg [arg ...]]
# For actual use
# Only the 40-character hash string is sent, which reduces the amount of data transmitted
# 1. Cache the script: store the script given by the user on the server and return its SHA1 checksum (a 40-character string) as the result
SCRIPT LOAD script
# 2. Execute the cached script
EVALSHA sha1 numkeys [key [key ...]] [arg [arg ...]]
# Appendix: script management commands
# Check whether scripts are cached
SCRIPT EXISTS sha1 [sha1 ...]
# Clear all script caches
SCRIPT FLUSH
# Forcibly stop a running script, for example one stuck in an infinite loop
SCRIPT KILL
3.3.2. Application
- When the project starts, after the redis connection is established and authenticated, load all lua scripts used in the project with script load
- If the project needs a hot update, execute script flush through redis-cli, then notify all servers through publish/subscribe to reload the lua scripts
- If a lua script in the project blocks, use script kill to terminate the currently running script
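After a script flush, a server that still holds an old hash receives a NOSCRIPT error from EVALSHA and has to load the script again. The checks involved can be sketched as below; both helpers are hypothetical and operate on the raw RESP reply text (with hiredis the error text would arrive without the leading "-").

```c
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical helper: true if s looks like a SHA1 checksum as returned
 * by SCRIPT LOAD, i.e. exactly 40 hexadecimal characters. */
bool is_sha1_hex(const char *s) {
    if (s == NULL || strlen(s) != 40) return false;
    for (size_t i = 0; i < 40; i++) {
        if (!isxdigit((unsigned char)s[i])) return false;
    }
    return true;
}

/* Hypothetical helper: true if a raw RESP reply is the error that redis
 * sends when EVALSHA names a script that is not in the cache. */
bool is_noscript_error(const char *reply) {
    return reply != NULL && strncmp(reply, "-NOSCRIPT", 9) == 0;
}
```

A caller would typically react to is_noscript_error by issuing SCRIPT LOAD again and retrying the EVALSHA with the returned checksum.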
Example: Doubling operation
set mark 1
# For testing
eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" 1 mark
(integer) 2
eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" 1 darren
(integer) 0
# For actual use
# 1. Cache the script
script load "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;"
"9da2e1ac090f2e1df67087370de115a4291cd0bd"
# 2. Execute the cached script
evalsha "9da2e1ac090f2e1df67087370de115a4291cd0bd" 1 mark
(integer) 4
4. redis publish and subscribe
To support multicast delivery of messages, redis introduces the publish-subscribe module, which works as a distributed message queue mechanism. Subscribers receive the messages that senders publish to a specific channel. This mechanism does not guarantee that a message is delivered; when delivery must be guaranteed, redis streams can be used instead.
The known problems are: if a sender publishes a message while there are no subscribers, the message is discarded directly. If a subscriber disconnects, every message published while it is disconnected is lost to that subscriber. In addition, pubsub messages are not persisted, so when redis is stopped and restarted all messages are discarded.
4.1. Commands
# Send a message to a channel
publish channel message
# Subscribe to channels
subscribe channel [channel ...]
# Unsubscribe from channels
unsubscribe [channel ...]
# Subscribe by pattern
psubscribe pattern [pattern ...]
# Unsubscribe from patterns
punsubscribe [pattern ...]
# View information about publish and subscribe (list active channels)
PUBSUB CHANNELS [pattern]
4.2. Application
Publish-subscribe generally needs its own connection: the command connection strictly follows the request-response model, whereas a pubsub connection receives content actively pushed by redis. Therefore, if a project uses pubsub, open a separate connection dedicated to publish and subscribe.
# One client subscribes to channels
SUBSCRIBE news.shanxi news.henan news.shandong
# Another client subscribes by pattern matching
PSUBSCRIBE news.*
# Send a message to a channel; all subscribers of the channel receive it
publish news.shanxi 'harmony'
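Pattern subscriptions such as news.* rely on glob-style matching of channel names. The matcher below is a simplified sketch that supports only * and ?; it is not redis's own matcher, which also handles character classes and escapes, and glob_match is a name chosen for this example.

```c
#include <stdbool.h>

/* Simplified glob matcher (hypothetical): '*' matches any run of
 * characters, including an empty one; '?' matches exactly one character. */
bool glob_match(const char *pat, const char *str) {
    while (*pat) {
        if (*pat == '*') {
            while (*pat == '*') pat++;      /* collapse consecutive stars */
            if (*pat == '\0') return true;  /* trailing star matches the rest */
            for (; *str; str++) {
                if (glob_match(pat, str)) return true;
            }
            return false;
        }
        if (*str == '\0') return false;
        if (*pat != '?' && *pat != *str) return false;
        pat++;
        str++;
    }
    return *str == '\0';
}
```

With this rule, a message published to news.shanxi reaches the client that ran PSUBSCRIBE news.* as well as the client that subscribed to the channel by name.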
5. redis asynchronous connection
hiredis is a C client library for redis that a server program can use to access the redis server.
5.1. Synchronous connection
A synchronous connection is implemented with blocking IO, so it blocks the current thread until redis returns the result.
Reference documentation: Use of hiredis
For example: connect to redis, increment counter 1000 times, and measure the elapsed time.
// gcc redis-test-sync.c -o sync -lhiredis
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <hiredis/hiredis.h>

// Current monotonic time in milliseconds
int current_tick() {
    int t = 0;
    struct timespec ti;
    clock_gettime(CLOCK_MONOTONIC, &ti);
    t = (int)ti.tv_sec * 1000;
    t += ti.tv_nsec / 1000000;
    return t;
}

int main(int argc, char **argv) {
    redisContext *c;
    redisReply *reply;
    const char *hostname = "127.0.0.1";
    int port = 6379;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds

    c = redisConnectWithTimeout(hostname, port, timeout);
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
    }

    int num = (argc > 1) ? atoi(argv[1]) : 1000;
    int before = current_tick();

    reply = redisCommand(c, "auth 123456");
    if (reply) freeReplyObject(reply);

    for (int i = 0; i < num; ++i) {
        // Blocks until the reply for this command has been received
        reply = redisCommand(c, "INCR counter");
        if (reply == NULL) {
            // A NULL reply means the context is in an error state
            printf("Command error: %s\n", c->errstr);
            break;
        }
        printf("INCR counter: %lld\n", reply->integer);
        freeReplyObject(reply);
    }

    int used = current_tick() - before;
    printf("after %d exec redis command, used %d ms\n", num, used);

    /* Disconnects and frees the context */
    redisFree(c);
    return 0;
}
5.2. Asynchronous connection
Asynchronous connections are implemented using non-blocking IO and do not block the current thread. The disadvantage is that the code is written in an asynchronous style and the business logic becomes fragmented, which can be solved with coroutines. Under a large number of concurrent requests, combining the multi-threaded IO available since redis 6.0 with an asynchronous connection pool improves data access performance at the application layer.
5.2.1, redis driver
Redis driver: a server that uses an asynchronous connection needs to implement the redis driver itself, which means the redis connection must be integrated with the reactor in the project and managed by it.
This requires designing a redis adapter whose main functions are:
- Build the redis event object, which combines the hiredis event object and the reactor event object.
- Adapt event control so that the project's reactor event loop is reused.
To sum up, the rules for wrapping hiredis are:
- Reactor implementation: all IO is implemented by the user.
- Adapter implementation: hiredis provides the event operation interfaces, and the user adapts them.
// The hiredis event interfaces that users need to adapt are:
addRead        // add read event
delRead        // delete read event
addWrite       // add write event
delWrite       // delete write event
cleanup        // release the event object
scheduleTimer
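An adapter behind these hooks mostly tracks which events are currently registered and translates each change into an add, modify, or delete on the reactor. The decision can be sketched as a pure function. The names and flag values below are assumptions made for illustration; an epoll-based reactor would use EPOLLIN and EPOLLOUT and map the results to EPOLL_CTL_ADD, EPOLL_CTL_MOD, and EPOLL_CTL_DEL.

```c
/* Hypothetical interest flags standing in for EPOLLIN / EPOLLOUT. */
enum { EV_READ = 1, EV_WRITE = 4 };

/* Hypothetical result: which reactor operation the change requires. */
typedef enum { OP_NONE, OP_ADD, OP_MOD, OP_DEL } ctl_op_t;

/* Apply an addRead/addWrite (remove == 0) or delRead/delWrite
 * (remove != 0) request to the current interest mask. */
int update_mask(int mask, int flag, int remove) {
    return remove ? (mask & ~flag) : (mask | flag);
}

/* Decide how the reactor must be updated when the mask changes. */
ctl_op_t next_ctl_op(int prev_mask, int new_mask) {
    if (prev_mask == new_mask) return OP_NONE; /* nothing changed */
    if (new_mask == 0) return OP_DEL;          /* last interest removed */
    if (prev_mask == 0) return OP_ADD;         /* first interest added */
    return OP_MOD;                             /* still registered, new set */
}
```

Note that a modify has to pass the complete new mask to the reactor; passing only the flag that changed would silently drop the other interest.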
5.2.2. Examples
Here the example from 5.1 is implemented asynchronously.
Step 1: implement the reactor (reactor.h) that the redis driver is built on
#ifndef _REACTOR_
#define _REACTOR_

#include <stdio.h>
#include <stdint.h>     // uint8_t
#include <unistd.h>     // read write
#include <fcntl.h>      // fcntl
#include <sys/types.h>  // listen
#include <sys/socket.h> // socket
#include <errno.h>      // errno
#include <arpa/inet.h>  // inet_addr htons
// #include <netinet/tcp.h>
#include <assert.h>     // assert
#include <sys/epoll.h>
#include <stdlib.h>     // malloc
#include <string.h>     // memcpy memmove

#include "chainbuffer/buffer.h"
// #include "ringbuffer/buffer.h"

#define MAX_EVENT_NUM 512       // Maximum number of events copied to user space per epoll_wait
#define MAX_CONN ((1<<16)-1)    // Maximum number of event objects: 65535

typedef struct event_s event_t;
typedef void (*event_callback_fn)(int fd, int events, void *privdata);
typedef void (*error_callback_fn)(int fd, char * err);

// reactor object, manages the global io state
typedef struct {
    int epfd;       // epoll fd
    int listenfd;   // listening fd
    int stop;       // flag that stops the loop
    event_t *events; // All monitored events (event_t), stored on the heap, remember to release
    int iter;       // Used to iterate over events to find an unused slot
    struct epoll_event fire[MAX_EVENT_NUM]; // User-space array that epoll_wait copies ready events into
} reactor_t;

// Event object (sockitem), saves the io state of each fd
struct event_s {
    int fd;                     // fd of this event
    reactor_t *r;               // points to the global reactor object
    buffer_t in;                // read buffer, data waiting to be processed
    buffer_t out;               // write buffer, data waiting to be sent
    event_callback_fn read_fn;  // read callback
    event_callback_fn write_fn; // write callback
    error_callback_fn error_fn; // error callback
};

int event_buffer_read(event_t *e);
int event_buffer_write(event_t *e, void * buf, int sz);

// Create the reactor object
reactor_t * create_reactor() {
    // Allocate the reactor object on the heap
    reactor_t *r = (reactor_t *)malloc(sizeof(*r));
    r->epfd = epoll_create(1);
    r->listenfd = 0;
    r->stop = 0;
    r->iter = 0;
    // Allocate the reactor's events array on the heap
    r->events = (event_t*)malloc(sizeof(event_t)*MAX_CONN);
    memset(r->events, 0, sizeof(event_t)*MAX_CONN);
    memset(r->fire, 0, sizeof(struct epoll_event) * MAX_EVENT_NUM);
    // init_timer();
    return r;
}

// Release the reactor object
void release_reactor(reactor_t * r) {
    free(r->events);    // Release the events array allocated on the heap
    close(r->epfd);     // close epoll
    free(r);            // release the reactor
}

// Get a free event object from the reactor's events array
event_t * _get_event_t(reactor_t *r) {
    // Find an unused event object; the index wraps around inside the array
    do {
        r->iter = (r->iter + 1) % MAX_CONN;
    } while (r->events[r->iter].fd > 0);
    return &r->events[r->iter];
}

// Event operations
// 1. Create an event object
event_t * new_event(reactor_t *R, int fd,
    event_callback_fn rd,
    event_callback_fn wt,
    error_callback_fn err) {
    assert(rd != 0 || wt != 0 || err != 0);
    // Get a free event object
    event_t *e = _get_event_t(R);
    // Initialize the event object
    e->r = R;
    e->fd = fd;
    buffer_init(&e->in, 1024*16);
    buffer_init(&e->out, 1024*16);
    e->read_fn = rd;
    e->write_fn = wt;
    e->error_fn = err;
    return e;
}

// 2. Add an event
int add_event(reactor_t *R, int events, event_t *e) {
    struct epoll_event ev;
    ev.events = events;
    ev.data.ptr = e;
    if (epoll_ctl(R->epfd, EPOLL_CTL_ADD, e->fd, &ev) == -1) {
        printf("add event err fd = %d\n", e->fd);
        return 1;
    }
    return 0;
}

// Free the buffers owned by an event
void free_event(event_t *e) {
    buffer_free(&e->in);
    buffer_free(&e->out);
}

// 3. Delete an event
int del_event(reactor_t *R, event_t *e) {
    epoll_ctl(R->epfd, EPOLL_CTL_DEL, e->fd, NULL);
    free_event(e);
    return 0;
}

// 4. Modify an event; the two flags select read and/or write interest
int enable_event(reactor_t *R, event_t *e, int readable, int writeable) {
    struct epoll_event ev;
    ev.events = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0);
    ev.data.ptr = e;
    if (epoll_ctl(R->epfd, EPOLL_CTL_MOD, e->fd, &ev) == -1) {
        return 1;
    }
    return 0;
}

// One iteration of the event loop
void eventloop_once(reactor_t * r, int timeout) {
    int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
    for (int i = 0; i < n; ++i) {
        struct epoll_event *e = &r->fire[i];    // get the event
        int mask = e->events;                   // get the event type
        // On error, let the io functions capture the specific error
        if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
        // On hang-up, let the io functions capture the disconnection
        if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
        event_t *et = (event_t*) e->data.ptr;   // user data associated with the event
        // Handle read events
        if (mask & EPOLLIN) {
            if (et->read_fn) {
                et->read_fn(et->fd, EPOLLIN, et);   // execute the read callback
            }
        }
        // Handle write events
        if (mask & EPOLLOUT) {
            if (et->write_fn) {
                et->write_fn(et->fd, EPOLLOUT, et); // execute the write callback
            } else {
                uint8_t *buf = buffer_write_atmost(&et->out);
                event_buffer_write(et, buf, buffer_len(&et->out));
            }
        }
    }
}

// Stop the event loop
void stop_eventloop(reactor_t * r) {
    r->stop = 1;
}

// Event loop
void eventloop(reactor_t * r) {
    while (!r->stop) {
        // int timeout = find_nearest_expire_timer();
        eventloop_once(r, /*timeout*/ -1);
        // expire_timer();
    }
}

// Set fd to non-blocking
int set_nonblock(int fd) {
    int flag = fcntl(fd, F_GETFL, 0);
    return fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}

// Create the server
int create_server(reactor_t *R, short port, event_callback_fn func) {
    // 1. socket
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        printf("create listenfd error!\n");
        return -1;
    }
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;
    // Make the address reusable
    int reuse = 1;
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int)) == -1) {
        printf("reuse address error: %s\n", strerror(errno));
        return -1;
    }
    // 2. bind
    if (bind(listenfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
        printf("bind error %s\n", strerror(errno));
        return -1;
    }
    // 3. listen
    if (listen(listenfd, 5) < 0) {
        printf("listen error %s\n", strerror(errno));
        return -1;
    }
    // Set listenfd to non-blocking
    if (set_nonblock(listenfd) < 0) {
        printf("set_nonblock error %s\n", strerror(errno));
        return -1;
    }
    R->listenfd = listenfd;
    // Register the read event
    event_t *e = new_event(R, listenfd, func, 0, 0);
    add_event(R, EPOLLIN, e);
    printf("listen port : %d\n", port);
    return 0;
}

// Read data
int event_buffer_read(event_t *e) {
    int fd = e->fd;
    int num = 0;    // total amount of data read
    while (1) {
        // TODO: dont use char buf[] here
        char buf[1024] = {0};
        // Leave one byte so buf stays NUL-terminated for the printf below
        int n = read(fd, buf, sizeof(buf) - 1);
        // 1. read == 0: the peer sent FIN, half-closed state
        // TODO: half-closed state handling, refer to skynet
        if (n == 0) {
            // printf("close connection fd = %d\n", fd);
            if (e->error_fn) {
                e->error_fn(fd, "close socket");
            }
            del_event(e->r, e);
            close(fd);
            return 0;
        }
        // 2. read == -1: read exception
        else if (n < 0) {
            // 2.1. EINTR: interrupted, retry
            if (errno == EINTR) {
                continue;
            }
            // 2.2. EWOULDBLOCK: would block, the read buffer is empty
            if (errno == EWOULDBLOCK) {
                break;
            }
            // Other errors: execute the error callback, delete the event, close the connection
            printf("read error fd = %d err = %s\n", fd, strerror(errno));
            if (e->error_fn)
                e->error_fn(fd, strerror(errno));
            del_event(e->r, e);
            close(fd);
            return 0;
        }
        // 3. read > 0: normal, store the data for the business logic
        else {
            printf("recv data from client:%s", buf);
            buffer_add(&e->in, buf, n);
        }
        num += n;
    }
    return num;
}

// Send data to the peer.
// Returns the number of bytes written, 0 if the socket would block, -1 on error.
int _write_socket(event_t *e, void * buf, int sz) {
    int fd = e->fd;
    while (1) {
        int n = write(fd, buf, sz);
        // write == -1: write exception
        if (n < 0) {
            // 1. EINTR: interrupted, retry
            if (errno == EINTR) {
                continue;
            }
            // 2. EWOULDBLOCK: would block, a write event needs to be registered
            if (errno == EWOULDBLOCK) {
                break;
            }
            // Other errors: execute the error callback, delete the event, close the connection
            if (e->error_fn) {
                e->error_fn(fd, strerror(errno));
            }
            del_event(e->r, e);
            close(fd);
        }
        return n;
    }
    return 0;
}

// Write data
int event_buffer_write(event_t *e, void * buf, int sz) {
    // Point to the user write buffer
    buffer_t *r = &e->out;
    // 1. The user write buffer is empty, so try to send directly
    if (buffer_len(r) == 0) {
        // Send data to the peer
        int n = _write_socket(e, buf, sz);
        // 1.1. Write error: _write_socket has already closed the connection
        if (n < 0) {
            return 0;
        }
        // 1.2. Not all data was sent: buffer the rest and register the write event,
        //      then wait for the next write event to send it
        if (n < sz) {
            buffer_add(&e->out, (char *)buf + n, sz - n);
            enable_event(e->r, e, 1, 1);
            return 0;
        }
        // 1.3. All data was sent
        return 1;
    }
    // 2. The user write buffer still holds unsent data: append to it and wait for sending
    buffer_add(&e->out, (char *)buf, sz);
    return 1;
}

#endif
The second step is to implement the redis adapter, mainly to build the redis event object and adapt the event control interface of hiredis.
// adapter_async.h
#ifndef _MARK_ADAPTER_
#define _MARK_ADAPTER_

#include <hiredis/hiredis.h>
#include <hiredis/async.h>  // redisAsyncContext
#include <hiredis/alloc.h>
#include "reactor.h"

// redis event object
typedef struct {
    event_t e;              // reactor event object (must stay the first member)
    int mask;               // stores the registered events
    redisAsyncContext *ctx; // hiredis event object
} redis_event_t;

// Read event callback of the redis object
static void redisReadHandler(int fd, int events, void *privdata) {
    ((void)fd);
    ((void)events);
    event_t *e = (event_t*)privdata;
    // e is the first member of redis_event_t, so the pointers are interchangeable
    redis_event_t *re = (redis_event_t *)(char *)e;
    redisAsyncHandleRead(re->ctx);
}

// Write event callback of the redis object
static void redisWriteHandler(int fd, int events, void *privdata) {
    ((void)fd);
    ((void)events);
    event_t *e = (event_t*)privdata;
    redis_event_t *re = (redis_event_t *)(char *)e;
    redisAsyncHandleWrite(re->ctx);
}

/**
 * @brief Update the event object managed by the reactor
 * @param privdata redis event object
 * @param flag epoll event type to set
 * @param remove 1 delete this event, 0 add this event
 */
static void redisEventUpdate(void *privdata, int flag, int remove) {
    redis_event_t *re = (redis_event_t *)privdata;
    reactor_t *r = re->e.r;
    int prevMask = re->mask;
    // The redis event object deletes the event
    if (remove) {
        if ((re->mask & flag) == 0) {
            return;
        }
        re->mask &= ~flag;
    }
    // The redis event object adds the event
    else {
        if (re->mask & flag) {
            return;
        }
        re->mask |= flag;
    }
    // Handle the reactor event object
    // 1. The reactor event object deletes the event
    if (re->mask == 0) {
        del_event(r, &re->e);
    }
    // 2. The reactor event object adds the event (for the first time)
    else if (prevMask == 0) {
        add_event(r, re->mask, &re->e);
    }
    // 3. The reactor event object modifies the event.
    //    Pass the complete mask so the other interest is not dropped.
    else {
        enable_event(r, &re->e, re->mask & EPOLLIN, re->mask & EPOLLOUT);
    }
}

// The hiredis event interfaces that need to be adapted
// 1. The redis event object adds the read event
static void redisAddRead(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.read_fn = redisReadHandler;
    redisEventUpdate(privdata, EPOLLIN, 0);
}

// 2. The redis event object deletes the read event
static void redisDelRead(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.read_fn = 0;
    redisEventUpdate(privdata, EPOLLIN, 1);
}

// 3. The redis event object adds the write event
static void redisAddWrite(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.write_fn = redisWriteHandler;
    redisEventUpdate(privdata, EPOLLOUT, 0);
}

// 4. The redis event object deletes the write event
static void redisDelWrite(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    re->e.write_fn = 0;
    redisEventUpdate(privdata, EPOLLOUT, 1);
}

// 5. Release the redis event object
static void redisCleanup(void *privdata) {
    redis_event_t *re = (redis_event_t *)privdata;
    reactor_t *r = re->e.r;
    del_event(r, &re->e);
    hi_free(re);
}

// Bind the redis event object to the reactor object and the redis async context
static int redisAttach(reactor_t *r, redisAsyncContext *ac) {
    redisContext *c = &(ac->c); // redis synchronous context
    redis_event_t *re;          // redis event object

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for ctx and r/w events */
    re = (redis_event_t*)hi_malloc(sizeof(*re));
    if (re == NULL) {
        return REDIS_ERR;
    }
    // Zero the object so callbacks and buffers start in a known state
    memset(re, 0, sizeof(*re));

    // The redis event object binds the reactor object and the redis async context
    re->ctx = ac;       // bind the redis async context
    re->e.fd = c->fd;   // bind the redis fd
    re->e.r = r;        // bind the reactor
    re->mask = 0;       // no events registered yet

    // The redis async context needs its event control adapted:
    // hiredis provides the event interface, and the user implements it
    ac->ev.addRead = redisAddRead;
    ac->ev.delRead = redisDelRead;
    ac->ev.addWrite = redisAddWrite;
    ac->ev.delWrite = redisDelWrite;
    ac->ev.cleanup = redisCleanup;
    ac->ev.data = re;
    return REDIS_OK;
}

#endif
Step 3: implement the main program, which uses the driver to run the example.
// redis-test-async.c
// gcc redis-test-async.c chainbuffer/buffer.c -o async -lhiredis
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <time.h>
#include "reactor.h"
#include "adapter_async.h"

static reactor_t *R;
static int cnt, before, num;

// Current monotonic time in milliseconds
int current_tick() {
    int t = 0;
    struct timespec ti;
    clock_gettime(CLOCK_MONOTONIC, &ti);
    t = (int)ti.tv_sec * 1000;
    t += ti.tv_nsec / 1000000;
    return t;
}

// Called once for each INCR reply
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = r;
    if (reply == NULL) return;
    printf("argv[%s]: %lld\n", (char*)privdata, reply->integer);
    /* Disconnect after receiving the last reply */
    cnt++;
    if (cnt == num) {
        int used = current_tick()-before;
        printf("after %d exec redis command, used %d ms\n", num, used);
        redisAsyncDisconnect(c);
    }
}

void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        stop_eventloop(R);
        return;
    }
    printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        stop_eventloop(R);
        return;
    }
    printf("Disconnected...\n");
    stop_eventloop(R);
}

int main(int argc, char **argv) {
    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }

    R = create_reactor();
    // Hand the connection over to the reactor through the adapter
    redisAttach(R, c);
    redisAsyncSetConnectCallback(c, connectCallback);
    redisAsyncSetDisconnectCallback(c, disconnectCallback);

    before = current_tick();
    num = (argc > 1) ? atoi(argv[1]) : 1000;

    redisAsyncCommand(c, NULL, NULL, "auth 123456");
    // Commands are only queued here; they are sent and answered inside the event loop
    for (int i = 0; i < num; i++) {
        redisAsyncCommand(c, getCallback, "count", "INCR counter");
    }

    eventloop(R);
    release_reactor(R);
    return 0;
}