foreword
This article continues the previous article 2.1.1 Network io and select, poll, epoll , using epoll and reactor to achieve server million concurrency.
Why do you need to implement a sock_item?
- Each fd corresponds to a sock_item
- sock_item contains rbuffer, wbuffer, rlength, wlength, events, callback, which enables each fd to have an independent buffer
memory allocation
After calloc() dynamically allocates memory, it automatically initializes the memory space to zero, while malloc() does not initialize it, and the data inside is random garbage data. Therefore, after using malloc(), you need to call memset() for initialization, but using calloc() does not.
Million concurrent attempts 1
#include <stdio.h> #include <sys/socket.h> #include <sys/epoll.h> #include <sys/types.h> #include <netinet/in.h> // struct sockaddr_in #include <fcntl.h> #include <unistd.h> // close() function #include <string.h> // memcpy() function #include <stdlib.h> #define NONBLOCK 0 #define BUFFER_LENGTH 128 #define EVENTS_LENGTH 128 // listenfd, clientfd struct sock_item // conn_item { int fd; // clientfd char* rbuffer; int rlength; char* wbuffer; int wlength; int event; void (*recv_cb)(int fd, char* buffer, int length); void (*send_cb)(int fd, char* buffer, int length); void (*accept_cb)(int fd, char* buffer, int length); }; struct reactor { int epfd; // epoll struct sock_item* items; }; int main() { int listenfd = socket(AF_INET, SOCK_STREAM, 0); // The first parameter domain is the protocol cluster // (AF_UNIX: native communication; AF_INET:TCP/IP-IPv4; AF_INET6:TCP/IP-IPv6) // The second parameter type is the socket type. Common types are: // (SOCK_STREAM:TCP stream; SOCK_DGRAM:UDP datagram; SOCK_RAW: raw socket) // The third parameter protocol, generally set to 0 // ( // When the protocol family and type used by the socket are determined, the value of this parameter is 0; // Sometimes when creating a raw socket, without knowing the protocol family and type to use, the protocol parameter can determine the type of protocol // ) // The socket function successfully returns a value of type int, starting from 3 and increasing in order (0 corresponds to stdin, 1 corresponds to stdout, and 2 corresponds to stderr) // Returns "-1" on failure, and the error code is written to "errno" if(listenfd == -1) return -1; struct sockaddr_in servaddr; servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // INADDR_ANY is 0.0.0.0 // 127.0.0.1 for native communication (loopback address) // 192.168.0.123 specific external communication // 0.0.0.0 Any can communicate (including local and external) // htonl (host to net long) // Converts a variable of type long from host byte order to network byte order // The network byte order is a data representation format specified in TCP/IP, which has nothing to do with the specific CPU type, operating system, etc. // This ensures that data can be correctly interpreted when transmitted between different hosts, // Network byte order adopts big-endian (big endian) ordering. servaddr.sin_port = htons(9999); // htons (host to net short) if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1) { return -2; } #if NONBLOCK int flag = fcntl(listenfd, F_GETFL, 0); // Get the flag of listenfd (F_GETFL:get flag) flag |= O_NONBLOCK; // Use bitwise OR to set flag to non-blocking fcntl(listenfd, F_SETFL, flag); // Set the listenfd flag (F_SETFL:set flag) #endif listen(listenfd, 10); struct reactor* r = (struct reactor*)calloc(1, sizeof(struct reactor)); if(r == NULL) return -3; r->items = (struct sock_item*)calloc(EVENTS_LENGTH, sizeof(struct sock_item)); if(r->items == NULL) return -4; r->epfd = epoll_create(1); // r->epfd value is 4 // Create a new epoll descriptor // The size parameter only needs to be a number greater than 0. This parameter is left over from history and is meaningless now. struct epoll_event ev, events[EVENTS_LENGTH]; ev.events = EPOLLIN; ev.data.fd = listenfd; epoll_ctl(r->epfd, EPOLL_CTL_ADD, listenfd, &ev); // Operate on the epoll descriptor (add or delete all connections to be monitored, etc.) // The first parameter is epfd: epoll descriptor // The second parameter is op: specify the operation type // ( // EPOLL_CTL_ADD Register events on fd in the event table // EPOLL_CTL_DEL delete registration event on fd // EPOLL_CTL_MOD modifies the registration event on fd // ) // The third parameter is fd: the file descriptor to be operated on // The fourth parameter is event: the specified event, which is the type of the epoll_event structure pointer // ( // epoll_event definition: // events: describes the event type, which is basically the same as the event type supported by poll (two additional events: EPOLLET and EPOLLONESHOT, the key to efficient operation) // data member: store user data // ) while(1) { int nready = epoll_wait(r->epfd, events, EVENTS_LENGTH, -1); // The first parameter is epfd: epoll descriptor // The second parameter events: Detect events, copy all ready events from the kernel event table to the array pointed to by its second parameter events // The third parameter maxevents: specifies how many events to listen to at most // The fourth parameter timeout: Specify the timeout time of epoll, in milliseconds // (When the timeout is -1, the epoll_wait call will block forever until some event occurs. When the timeout is 0, the epoll_wait call will return immediately) // Return value: Returns the number of ready file descriptors on success, returns -1 on failure and sets errno printf("---------- %d\n", nready); int i = 0; for(i = 0;i < nready;i++) { int clientfd = events[i].data.fd; if(clientfd == listenfd) // If the fd that triggers the event is listenfd, it means that there is a client connection request, and execute accpet() { struct sockaddr_in client; socklen_t len = sizeof(client); int connfd = accept(listenfd, (struct sockaddr*)&client, &len); printf("accept: %d\n",connfd); ev.events = EPOLLIN; ev.data.fd = connfd; epoll_ctl(r->epfd, EPOLL_CTL_ADD, connfd, &ev); // Initialize sock_item corresponding to connfd r->items[connfd].fd = connfd; r->items[connfd].rbuffer = calloc(1, BUFFER_LENGTH); r->items[connfd].rlength = 0; r->items[connfd].wbuffer = calloc(1, BUFFER_LENGTH); r->items[connfd].wlength = 0; r->items[connfd].event = EPOLLIN; } else if(events[i].events & EPOLLIN) // clientfd { char* rbuffer = r->items[clientfd].rbuffer; char* wbuffer = r->items[clientfd].wbuffer; int n = recv(clientfd, rbuffer, BUFFER_LENGTH, 0); // The default is horizontal trigger (LT), that is, the data sent at one time, if it cannot receive all the data at one time, it will continue to receive multiple times until the sent data is received. // Set ev.events to EPOLLIN | EPOLLET, can be changed to edge-triggered (ET) // Edge-triggered (ET): It will only be received once. If the sent data cannot be received, the remaining data will be left in the buffer of the kernel protocol stack. When receiving the next time, it will start to receive the remaining data. // Small data tends to LT (horizontal trigger), big data tends to ET (edge trigger) // The horizontal trigger can receive all data at one time, while the edge trigger needs to go through the loop to receive all the data (assuming that the data cannot be received at one time) if(n > 0) { //rbuffer[n] = '\0'; printf("recv : %s\n", rbuffer); memcpy(wbuffer, rbuffer, BUFFER_LENGTH); ev.events = EPOLLOUT; ev.data.fd = clientfd; epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, &ev); } else if(n == 0) // When the client is disconnected, the server's close() function needs to be called { free(rbuffer); free(wbuffer); r->items[clientfd].fd = 0; close(clientfd); //events[i].data.fd = -1; } } else if(events[i].events & EPOLLOUT) // Determine whether IO is writable before each send, and send when it is writable // When the return value of send is less than the buffer length, it means that there is still data that has not been sent, and the remaining data needs to be sent again when the IO can be written. { char* wbuffer = r->items[clientfd].wbuffer; int sent = send(clientfd, wbuffer, BUFFER_LENGTH, 0); printf("sent : %d\n", sent); ev.events = EPOLLIN; ev.data.fd = clientfd; epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, &ev); } } } }
Current issue:
- Unable to meet the needs of millions of concurrent
Solution:
- Change the data structure from an array to a linked list plus an array, and further encapsulate the reactor
Linked list: suitable for finding ordered and regular data
Million concurrent attempts 2
Modify the code as follows:
#include <stdio.h> #include <sys/socket.h> #include <sys/epoll.h> #include <sys/types.h> #include <netinet/in.h> // struct sockaddr_in #include <fcntl.h> #include <unistd.h> // close() function #include <string.h> // memcpy() function #include <stdlib.h> #define BUFFER_LENGTH 128 #define EVENTS_LENGTH 128 #define ITEM_LENGTH 1024 // listenfd, clientfd struct sock_item // conn_item { int fd; // clientfd char* rbuffer; int rlength; char* wbuffer; int wlength; int event; void (*recv_cb)(int fd, char* buffer, int length); void (*send_cb)(int fd, char* buffer, int length); void (*accept_cb)(int fd, char* buffer, int length); }; struct eventblock { struct sock_item* items; struct eventblock* next; }; struct reactor { int epfd; // epoll int blkcnt; struct eventblock* evblk; }; int reactor_resize(struct reactor* r) // new eventblock { if(r == NULL) return -1; struct eventblock* blk = r->evblk; while(blk!=NULL && blk->next!=NULL) { blk = blk->next; } struct sock_item* item = (struct sock_item*)malloc(ITEM_LENGTH * sizeof(struct sock_item)); if(item == NULL) return -4; memset(item, 0, ITEM_LENGTH* sizeof(struct sock_item)); // Initialize allocated memory printf("-------------\n"); struct eventblock* block = (struct eventblock*)malloc(sizeof(struct eventblock)); if(block == NULL) { free(item); return -5; } memset(block, 0, sizeof(struct eventblock)); block->items = item; block->next = NULL; if(blk == NULL) { r->evblk = block; } else { blk->next = block; } ++r->blkcnt; return 0; } struct sock_item* reactor_lookup(struct reactor* r, int sockfd) { if(r == NULL) return NULL; //if(r->evblk == NULL) return NULL; if(sockfd <= 0) return NULL; printf("reactor_lookup --> %d\n", r->blkcnt); int blkidx = sockfd / ITEM_LENGTH; while(blkidx >= r->blkcnt) { reactor_resize(r); } int i = 0; struct eventblock* blk = r->evblk; while(i++ < blkidx && blk != NULL) { blk = blk->next; } return &blk->items[sockfd % ITEM_LENGTH]; } int main() { int listenfd = socket(AF_INET, SOCK_STREAM, 0); // The first parameter domain is the protocol cluster // (AF_UNIX: native communication; AF_INET:TCP/IP-IPv4; AF_INET6:TCP/IP-IPv6) // The second parameter type is the socket type. Common types are: // (SOCK_STREAM:TCP stream; SOCK_DGRAM:UDP datagram; SOCK_RAW: raw socket) // The third parameter protocol, generally set to 0 // ( // When the protocol family and type used by the socket are determined, the value of this parameter is 0; // Sometimes when creating a raw socket, without knowing the protocol family and type to use, the protocol parameter can determine the type of protocol // ) // The socket function successfully returns a value of type int, starting from 3 and increasing in order (0 corresponds to stdin, 1 corresponds to stdout, and 2 corresponds to stderr) // Returns "-1" on failure, and the error code is written to "errno" if(listenfd == -1) return -1; struct sockaddr_in servaddr; servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // INADDR_ANY is 0.0.0.0 // 127.0.0.1 for native communication (loopback address) // 192.168.0.123 specific external communication // 0.0.0.0 Any can communicate (including local and external) // htonl (host to net long) // Converts a variable of type long from host byte order to network byte order // The network byte order is a data representation format specified in TCP/IP, which has nothing to do with the specific CPU type, operating system, etc. // This ensures that data can be correctly interpreted when transmitted between different hosts, // Network byte order adopts big-endian (big endian) ordering. servaddr.sin_port = htons(9999); // htons (host to net short) if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1) { return -2; } listen(listenfd, 10); struct reactor* r = (struct reactor*)calloc(1, sizeof(struct reactor)); if(r == NULL) { return -3; } r->epfd = epoll_create(1); // r->epfd value is 4 // Create a new epoll descriptor // The size parameter only needs to be a number greater than 0. This parameter is left over from history and is meaningless now. struct epoll_event ev, events[EVENTS_LENGTH]; ev.events = EPOLLIN; ev.data.fd = listenfd; epoll_ctl(r->epfd, EPOLL_CTL_ADD, listenfd, &ev); // Operate on the epoll descriptor (add or delete all connections to be monitored, etc.) // The first parameter is epfd: epoll descriptor // The second parameter is op: specify the operation type // ( // EPOLL_CTL_ADD Register events on fd in the event table // EPOLL_CTL_DEL delete registration event on fd // EPOLL_CTL_MOD modifies the registration event on fd // ) // The third parameter is fd: the file descriptor to be operated on // The fourth parameter is event: the specified event, which is the type of the epoll_event structure pointer // ( // epoll_event definition: // events: describes the event type, which is basically the same as the event type supported by poll (two additional events: EPOLLET and EPOLLONESHOT, the key to efficient operation) // data member: store user data // ) while(1) { int nready = epoll_wait(r->epfd, events, EVENTS_LENGTH, -1); // The first parameter is epfd: epoll descriptor // The second parameter events: Detect events, copy all ready events from the kernel event table to the array pointed to by its second parameter events // The third parameter maxevents: specifies how many events to listen to at most // The fourth parameter timeout: Specify the timeout time of epoll, in milliseconds // (When the timeout is -1, the epoll_wait call will block forever until some event occurs. When the timeout is 0, the epoll_wait call will return immediately) // Return value: Returns the number of ready file descriptors on success, returns -1 on failure and sets errno printf("---------- %d\n", nready); int i = 0; for(i = 0;i < nready;i++) { int clientfd = events[i].data.fd; if(clientfd == listenfd) // If the fd that triggers the event is listenfd, it means that there is a client connection request, and execute accpet() { struct sockaddr_in client; socklen_t len = sizeof(client); int connfd = accept(listenfd, (struct sockaddr*)&client, &len); printf("accept: %d\n",connfd); ev.events = EPOLLIN; ev.data.fd = connfd; epoll_ctl(r->epfd, EPOLL_CTL_ADD, connfd, &ev); // Use the reactor_lookup() function to find the sock_item corresponding to fd and initialize it struct sock_item* item = reactor_lookup(r, connfd); item->fd = connfd; item->rbuffer = calloc(1, BUFFER_LENGTH); item->rlength = 0; item->wbuffer = calloc(1, BUFFER_LENGTH); item->wlength = 0; } else if(events[i].events & EPOLLIN) // clientfd { struct sock_item* item = reactor_lookup(r, clientfd); char* rbuffer = item->rbuffer; char* wbuffer = item->wbuffer; int n = recv(clientfd, rbuffer, BUFFER_LENGTH, 0); // The default is horizontal trigger (LT), that is, the data sent at one time, if it cannot receive all the data at one time, it will continue to receive multiple times until the sent data is received. // Set ev.events to EPOLLIN | EPOLLET, can be changed to edge-triggered (ET) // Edge-triggered (ET): It will only be received once. If the sent data cannot be received, the remaining data will be left in the buffer of the kernel protocol stack. When receiving the next time, it will start to receive the remaining data. // Small data tends to LT (horizontal trigger), big data tends to ET (edge trigger) // The horizontal trigger can receive all data at one time, while the edge trigger needs to go through the loop to receive all the data (assuming that the data cannot be received at one time) if(n > 0) { //rbuffer[n] = '\0'; printf("recv : %s\n", rbuffer); memcpy(wbuffer, rbuffer, BUFFER_LENGTH); ev.events = EPOLLOUT; ev.data.fd = clientfd; epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, &ev); } else if(n == 0) // When the client is disconnected, the server's close() function needs to be called { free(rbuffer); free(wbuffer); item->fd = 0; close(clientfd); //events[i].data.fd = -1; } } else if(events[i].events & EPOLLOUT) // Determine whether IO is writable before each send, and send when it is writable // When the return value of send is less than the buffer length, it means that there is still data that has not been sent, and the remaining data needs to be sent again when the IO can be written. { struct sock_item* item = reactor_lookup(r, clientfd); char* wbuffer = item->wbuffer; int sent = send(clientfd, wbuffer, BUFFER_LENGTH, 0); printf("sent : %d\n", sent); ev.events = EPOLLIN; ev.data.fd = clientfd; epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, &ev); } } } }
We use send and recv to operate on fd. The bottom layer of the operation is a quintuple (sip,sport,dip,dport,proto) that is (source ip, source port, destination ip, destination port, protocol), and the quintuple determines a the only connection.
In order to reach millions of connections, the solution for insufficient source ports (for the client):
- Increase source ip (add network card, bind different ip addresses, use multi-process)
- add destination ip
- Add destination port
The source port range of the client is 0-65535, and the maximum value is a fixed value that cannot be changed.
Here, the solution of increasing the destination port is adopted, that is, increasing the server port.
Million concurrent attempts 3
Modify the code as follows:
#include <stdio.h> #include <sys/socket.h> #include <sys/epoll.h> #include <sys/types.h> #include <netinet/in.h> // struct sockaddr_in #include <fcntl.h> #include <unistd.h> // close() function #include <string.h> // memcpy() function #include <stdlib.h> #define BUFFER_LENGTH 128 #define EVENTS_LENGTH 128 #define PORT_COUNT 100 #define ITEM_LENGTH 1024 // listenfd, clientfd struct sock_item // conn_item { int fd; // clientfd char* rbuffer; int rlength; char* wbuffer; int wlength; int event; void (*recv_cb)(int fd, char* buffer, int length); void (*send_cb)(int fd, char* buffer, int length); void (*accept_cb)(int fd, char* buffer, int length); }; struct eventblock { struct sock_item* items; // Array with ITEM_LENGTH sock_item struct eventblock* next; }; struct reactor { int epfd; // epoll int blkcnt; struct eventblock* evblk; // linked list }; int reactor_resize(struct reactor* r) // new eventblock { if(r == NULL) return -1; struct eventblock* blk = r->evblk; while(blk!=NULL && blk->next!=NULL) { blk = blk->next; } struct sock_item* items = (struct sock_item*)malloc(ITEM_LENGTH * sizeof(struct sock_item)); if(items == NULL) return -4; memset(items, 0, ITEM_LENGTH * sizeof(struct sock_item)); // Initialize allocated memory //printf("-------------\n"); struct eventblock* block = (struct eventblock*)malloc(sizeof(struct eventblock)); if(block == NULL) { free(items); return -5; } memset(block, 0, sizeof(struct eventblock)); block->items = items; block->next = NULL; if(blk == NULL) { r->evblk = block; } else { blk->next = block; } ++r->blkcnt; return 0; } struct sock_item* reactor_lookup(struct reactor* r, int sockfd) { if(r == NULL) return NULL; //if(r->evblk == NULL) return NULL; if(sockfd <= 0) return NULL; //printf("reactor_lookup --> %d\n", r->blkcnt); int blkidx = sockfd / ITEM_LENGTH; while(blkidx >= r->blkcnt) // Because sockfd can't be increased suddenly, the loop is executed only once, you can also use if { reactor_resize(r); } int i = 0; struct eventblock* blk = r->evblk; while(i++ < blkidx && blk != NULL) { blk = blk->next; } return &blk->items[sockfd % ITEM_LENGTH]; } int init_server(short port) { int listenfd = socket(AF_INET, SOCK_STREAM, 0); // The first parameter domain is the protocol cluster // (AF_UNIX: native communication; AF_INET:TCP/IP-IPv4; AF_INET6:TCP/IP-IPv6) // The second parameter type is the socket type. Common types are: // (SOCK_STREAM:TCP stream; SOCK_DGRAM:UDP datagram; SOCK_RAW: raw socket) // The third parameter protocol, generally set to 0 // ( // When the protocol family and type used by the socket are determined, the value of this parameter is 0; // Sometimes when creating a raw socket, without knowing the protocol family and type to use, the protocol parameter can determine the type of protocol // ) // The socket function successfully returns a value of type int, starting from 3 and increasing in order (0 corresponds to stdin, 1 corresponds to stdout, and 2 corresponds to stderr) // Returns "-1" on failure, and the error code is written to "errno" if(listenfd == -1) return -1; struct sockaddr_in servaddr; servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // INADDR_ANY is 0.0.0.0 // 127.0.0.1 for native communication (loopback address) // 192.168.0.123 specific external communication // 0.0.0.0 Any can communicate (including local and external) // htonl (host to net long) // Converts a variable of type long from host byte order to network byte order // The network byte order is a data representation format specified in TCP/IP, which has nothing to do with the specific CPU type, operating system, etc. // This ensures that data can be correctly interpreted when transmitted between different hosts, // Network byte order adopts big-endian (big endian) ordering. servaddr.sin_port = htons(port); // htons (host to net short) if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1) // Bind the local ip and port, as well as the protocol { return -2; } listen(listenfd, 10); return listenfd; } int is_listenfd(int *fds, int connfd) { int i=0; for(i=0;i<PORT_COUNT;i++) { if(fds[i] == connfd) { return 1; } } return 0; } int main() { struct reactor* r = (struct reactor*)calloc(1, sizeof(struct reactor)); if(r == NULL) { return -3; } r->epfd = epoll_create(1); // r->epfd value is 4 // Create a new epoll descriptor // The size parameter only needs to be a number greater than 0. This parameter is left over from history and is meaningless now. struct epoll_event ev, events[EVENTS_LENGTH]; int sockfds[PORT_COUNT] = {0}; int i = 0; // Create multiple listenfd, increase server port for(i=0;i<PORT_COUNT;i++) { sockfds[i] = init_server(9999+i); ev.events = EPOLLIN; ev.data.fd = sockfds[i]; epoll_ctl(r->epfd, EPOLL_CTL_ADD, sockfds[i], &ev); // Operate on the epoll descriptor (add or delete all connections to be monitored, etc.) // The first parameter is epfd: epoll descriptor // The second parameter is op: specify the operation type // ( // EPOLL_CTL_ADD Register events on fd in the event table // EPOLL_CTL_DEL delete registration event on fd // EPOLL_CTL_MOD modifies the registration event on fd // ) // The third parameter is fd: the file descriptor to be operated on // The fourth parameter is event: the specified event, which is the type of the epoll_event structure pointer // ( // epoll_event definition: // events: describes the event type, which is basically the same as the event type supported by poll (two additional events: EPOLLET and EPOLLONESHOT, the key to efficient operation) // data member: store user data // ) } while(1) { int nready = epoll_wait(r->epfd, events, EVENTS_LENGTH, -1); // The first parameter is epfd: epoll descriptor // The second parameter events: Detect events, copy all ready events from the kernel event table to the array pointed to by its second parameter events // The third parameter maxevents: specifies how many events to listen to at most // The fourth parameter timeout: Specify the timeout time of epoll, in milliseconds // (When the timeout is -1, the epoll_wait call will block forever until some event occurs. When the timeout is 0, the epoll_wait call will return immediately) // Return value: Returns the number of ready file descriptors on success, returns -1 on failure and sets errno //printf("---------- %d\n", nready); int i = 0; for(i = 0;i < nready;i++) { int clientfd = events[i].data.fd; if(is_listenfd(sockfds, clientfd)) // If the fd that triggers the event is listenfd, it means that there is a client connection request, and execute accpet() { struct sockaddr_in client; socklen_t len = sizeof(client); int connfd = accept(clientfd, (struct sockaddr*)&client, &len); if(connfd % 1000 == 999) { printf("accept: %d\n",connfd); } ev.events = EPOLLIN; ev.data.fd = connfd; epoll_ctl(r->epfd, EPOLL_CTL_ADD, connfd, &ev); // Use the reactor_lookup() function to find the sock_item corresponding to fd and initialize it struct sock_item* item = reactor_lookup(r, connfd); item->fd = connfd; item->rbuffer = calloc(1, BUFFER_LENGTH); item->rlength = 0; item->wbuffer = calloc(1, BUFFER_LENGTH); item->wlength = 0; } else if(events[i].events & EPOLLIN) // clientfd { struct sock_item* item = reactor_lookup(r, clientfd); char* rbuffer = item->rbuffer; char* wbuffer = item->wbuffer; int n = recv(clientfd, rbuffer, BUFFER_LENGTH, 0); // The default is horizontal trigger (LT), that is, the data sent at one time, if it cannot receive all the data at one time, it will continue to receive multiple times until the sent data is received. // Set ev.events to EPOLLIN | EPOLLET, can be changed to edge-triggered (ET) // Edge-triggered (ET): It will only be received once. If the sent data cannot be received, the remaining data will be left in the buffer of the kernel protocol stack. When receiving the next time, it will start to receive the remaining data. // Small data tends to LT (horizontal trigger), big data tends to ET (edge trigger) // The horizontal trigger can receive all data at one time, while the edge trigger needs to go through the loop to receive all the data (assuming that the data cannot be received at one time) if(n > 0) { //rbuffer[n] = '\0'; //printf("recv : %s\n", rbuffer); memcpy(wbuffer, rbuffer, BUFFER_LENGTH); ev.events = EPOLLOUT; ev.data.fd = clientfd; epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, &ev); } else if(n == 0) // When the client is disconnected, the server's close() function needs to be called { free(rbuffer); free(wbuffer); item->fd = 0; close(clientfd); //events[i].data.fd = -1; } } else if(events[i].events & EPOLLOUT) // Determine whether IO is writable before each send, and send when it is writable // When the return value of send is less than the buffer length, it means that there is still data that has not been sent, and the remaining data needs to be sent again when the IO can be written. { struct sock_item* item = reactor_lookup(r, clientfd); char* wbuffer = item->wbuffer; int sent = send(clientfd, wbuffer, BUFFER_LENGTH, 0); //printf("sent : %d\n", sent); ev.events = EPOLLIN; ev.data.fd = clientfd; epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, &ev); } } } }
process:
- create reactor
- Create epfd and assign it to reactor->epfd
- Create PORT_COUNT listenfd and connect with PORT_COUNT ports (starting from 9999)
- Set the epoll_event->events corresponding to all listenfds to EPOLLIN (that is, listen to the input events of listenfd) and add them to the epoll listening list
- Execute loop:
- Use epoll_wait to return ready epoll_event
- Detect whether the fd corresponding to epoll_event is listenfd, if so, execute accpet() to connect to the client, add clientfd to the epoll listening list, call reactor_lookup to find clientfd, return the corresponding sock_item, and initialize sock_item (assign fd, allocate buffer )
- If it is clientfd and it is an input event, execute recv() to receive data from the client
- If it is clientfd and it is an output event, execute send() to send data to the client
Through testing, the code can achieve millions of concurrency.
Precautions before the millions of concurrent tests:
-
You need to prepare 4 virtual machines, 1 as a server (8G memory), 3 as a client (2G memory)
-
Modify the maximum number of open files, because fd also belongs to the file.
-
ulimit -a // View restrictions ulimit -n 1000000 // Change the maximum number of open files to 1000000 (temporary, reset after restart)
-
-
Modify the /etc/sysctl.conf file
-
open a file
-
vim /etc/sysctl.conf
-
-
Add the following at the bottom of the file
-
net.ipv4.tcp_mem = 262144 524288 786432 net.ipv4.tcp_wmem = 1024 1024 2048 net.ipv4.tcp_rmem = 1024 1024 2048 fs.file-max = 1048576 net.ipv4.tcp_max_orphans = 16384 net.ipv4.tcp_mem = 252144 524288 786432 net.ipv4.tcp_wmem = 2048 2048 4096 net.ipv4.tcp_rmem = 2048 2048 4096 fs.file-max = 1048576 net.nf_conntrack_max = 1048576 net.netfilter.nf_conntrack_tcp_timeout_established = 1200
-
-
Load system parameters from /etc/sysctl.conf
-
sysctl -p modprobe nf_conntrack
-
-
Reference blog:
Detailed explanation of ulimit command usage
Detailed explanation of sysctl command in Linux system
During high concurrent access, it prompts: Cannot assign requested address exception, the solution