Some knowledge
#include <sys/epoll.h> int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
(1) int epoll_create(int size);
Create an epoll handle, and size is used to tell the kernel how many listeners there are. This parameter is different from the first parameter in select(), which gives the maximum listening fd+1 value. It should be noted that when the epoll handle is created, it will occupy an fd value. Under linux, if you check / proc / process id/fd /, you can see this fd. Therefore, after using epoll, you must call close() to close it, otherwise fd may be exhausted.
(2)int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
Epoll's event registration function is different from select(), which tells the kernel what type of event to listen for when listening for events. Epoll's event registration function is different from select(), which tells the kernel what type of event to listen for when listening for events, but registers the event type to listen for here first. The first parameter is epoll_ The return value of create (). The second parameter represents the action, which is represented by three macros:
EPOLL_CTL_ADD: register a new fd into epfd;
EPOLL_CTL_MOD: modify the listening event of the registered fd;
EPOLL_CTL_DEL: delete an fd from epfd;
The third parameter is fd that needs to be monitored, and the fourth parameter tells the kernel what needs to be monitored, struct epoll_ The event structure is as follows:
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
events can be a collection of the following macros:
The corresponding file descriptor of the SOCKET side: the SOCKET side can be closed;
EPOLLOUT: indicates that the corresponding file descriptor can be written;
EPOLLPRI: indicates that the corresponding file descriptor has urgent data readability (here it should indicate the arrival of out of band data);
EPOLLERR: indicates that the corresponding file descriptor has an error;
EPOLLHUP: indicates that the corresponding file descriptor is hung up;
EPOLLET: set EPOLL to edge triggered mode, which is relative to level triggered.
EPOLLONESHOT: only listen to one event. After listening to this event, if you need to continue listening to this socket, you need to add this socket to the EPOLL queue again
(3) int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
Wait for the event to occur, similar to the select() call. The parameter events is used to get the set of events from the kernel. Maxevents tells the kernel how big the events are, and the value of maxevents cannot be greater than the value of epoll created_ size when creating (). The parameter timeout is the timeout (in milliseconds, 0 will be returned immediately, - 1 will be uncertain. It is also said that it is permanently blocked). This function returns the number of events to be processed. If 0 is returned, it indicates that it has timed out.
Working mode
epoll operates on file descriptors in two modes: LT (level trigger) and ET (edge trigger). LT mode is the default mode. The differences between LT mode and et mode are as follows:
LT mode: when epoll_wait detects the occurrence of a descriptor event and notifies the application of this event. The application may not process the event immediately. Next call epoll_ When you wait, the application responds again and notifies you of this event.
ET mode: when epoll_wait detects the occurrence of a descriptor event and notifies the application of this event, which must be handled immediately. If not, epoll will be called next time_ When waiting, the application will not respond again and notify this event.
ET mode reduces the number of epoll events repeatedly triggered to a great extent, so the efficiency is higher than LT mode. When epoll works in ET mode, a non blocking socket interface must be used to avoid starving the task of processing multiple file descriptors due to the blocking read / write operation of a file handle.
example
#ifndef TCPSERVER_H__ #define TCPSERVER_H__ class CTcpEvent; class CTcpServer { public: static CTcpServer* instance(); ~CTcpServer(); bool loop(unsigned short port = 8086,const char* ip = 0); public: bool detach(CTcpEvent* pEvent); bool read(CTcpEvent* pEvent); bool write(CTcpEvent* pEvent); private: CTcpServer(); CTcpServer(const CTcpServer&); CTcpServer& operator=(const CTcpServer&); class CTcpServerPrivate; CTcpServerPrivate* const d; static CTcpServer* mp_instance; }; #endif //TCPSERVER_H__
#include "tcpserver.h" #include <stdio.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <stdlib.h> #include <time.h> #include <pthread.h> #include <signal.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/wait.h> #include <sys/socket.h> #include <sys/epoll.h> #include <arpa/inet.h> #include <netinet/in.h> #include <queue> #define MACRO_THREAD_COUNT 10 static void* eventThread(void* arg); class CMutex { public: CMutex() { pthread_mutex_init(&m_lock, NULL); } ~CMutex() { pthread_mutex_destroy(&m_lock); } void lock() { pthread_mutex_lock(&m_lock); } void unLock() { pthread_mutex_unlock(&m_lock); } bool tryLock() { return (0 == pthread_mutex_trylock(&m_lock)); } private: pthread_mutex_t m_lock; friend class CCond; }; class CCond { public: CCond() { pthread_cond_init(&m_cond, NULL); } ~CCond() { pthread_cond_destroy(&m_cond); } void wait(CMutex* pMutex) { pthread_cond_wait(&m_cond, &(pMutex->m_lock)); } void wake() { pthread_cond_signal(&m_cond); } void broadcast() { pthread_cond_broadcast(&m_cond); } private: pthread_cond_t m_cond; }; class CMutexLocker { public: CMutexLocker(CMutex* pMutex) { mp_mutex = pMutex; if(mp_mutex){ mp_mutex->lock(); } } ~CMutexLocker() { if(mp_mutex){ mp_mutex->unLock(); } } private: CMutex* mp_mutex; }; class CMutexTryLocker { public: CMutexTryLocker(CMutex* pMutex) { m_isLock = false; mp_mutex = pMutex; if(mp_mutex){ m_isLock = mp_mutex->tryLock(); } } bool isLock() const { return m_isLock; } ~CMutexTryLocker() { if(mp_mutex && m_isLock){ mp_mutex->unLock(); } } private: bool m_isLock; CMutex* mp_mutex; }; class CThreadPool { public: static CThreadPool* instance(); CThreadPool() { pthread_t th; for(int i = 0;i < MACRO_THREAD_COUNT ;i++){ if (pthread_create(&th, NULL, eventThread, NULL) != 0){ perror("Failed to create processing thread:"); exit(-1); } } } void add(CTcpEvent* pEvent) { CMutexLocker locker(&m_eventMutex); m_eventQueue.push(pEvent); m_eventcond.wake(); } CTcpEvent* get() { CMutexLocker locker(&m_eventMutex); if(0 == m_eventQueue.size()){ m_eventcond.wait(&m_eventMutex); } if(0 != m_eventQueue.size()){ CTcpEvent* pEvent = m_eventQueue.front(); m_eventQueue.pop(); return pEvent; } return NULL; } private: CCond m_eventcond; CMutex m_eventMutex; std::queue<CTcpEvent*> m_eventQueue; static CThreadPool* mp_instance; }; CThreadPool* CThreadPool::mp_instance = NULL; CThreadPool *CThreadPool::instance() { if(NULL == mp_instance){ static CMutex mutex; mutex.lock(); if(NULL == mp_instance){ mp_instance = new CThreadPool; } mutex.unLock(); } return mp_instance; } enum eEventStatus { eEPOLLIN = 0x01, eEPOLLOUT = 0x02, eEPOLLDEL = 0x04 }; class CTcpEvent { public: CTcpEvent(int fd=-1); virtual ~CTcpEvent(); virtual bool read() = 0; virtual bool write(){return true;} virtual bool threadread(){return true;} virtual bool threadwrite(){return true;} protected: int m_fd; eEventStatus m_eventstatus; unsigned int m_events;// EPOLLIN EPOLLOUT unsigned int m_streamLen; char m_tcpStream[1500]; friend class CTcpServer; friend void* eventThread(void* arg); }; CTcpEvent::CTcpEvent(int fd) :m_fd(fd), m_eventstatus(eEPOLLDEL), m_events(0), m_streamLen(0) {} CTcpEvent::~CTcpEvent() {} class ClientEvent:public CTcpEvent { public: ClientEvent(int fd=-1) :CTcpEvent(fd) {} ~ClientEvent() { printf("~ClientEvent\n"); } public: bool read() { CTcpServer::instance()->detach(this); CThreadPool::instance()->add(this); return true; } bool write() { CTcpServer::instance()->detach(this); CThreadPool::instance()->add(this); return true; } bool threadread() { int len = recv(m_fd,m_tcpStream,sizeof(m_tcpStream),0); if(len > 0){ m_streamLen = len; m_tcpStream[len] = '\0'; printf("R[%d]:\n%s\n",m_fd,m_tcpStream); //TODO business processing int nCount = 0; while(1){ if(CTcpServer::instance()->write(this)){ break; } nCount++; if(nCount > 9){ close(m_fd); return false; } sleep(1); } }else{ close(m_fd); return false; } return true; } bool threadwrite() { int len = send(this->m_fd,this->m_tcpStream,m_streamLen,0); if(len > 0){ this->m_streamLen = len; this->m_tcpStream[len] = '\0'; printf("W[%d]:\n%s\n",m_fd,m_tcpStream); int nCount = 0; while(1){ if(CTcpServer::instance()->read(this)){ break; } nCount++; if(nCount > 9){ close(m_fd); return false; } sleep(1); } }else{ close(m_fd); return false; } return true; } }; class ServerEvent:public CTcpEvent { public: ServerEvent(int fd=-1) :CTcpEvent(fd) {} bool read() { int cfd = -1; struct sockaddr_in cin; socklen_t len = sizeof(cin); memset(&cin,0,sizeof (cin)); do{ if((cfd = accept(this->m_fd,(struct sockaddr*)&cin,&len)) == -1){ printf("%s:accept,%s\n", __func__, strerror(errno)); break; } int curr_flags = fcntl(cfd, F_GETFL); if (curr_flags < 0) { perror("fcntl-F_GETFL"); curr_flags = 0; } if (fcntl(cfd, F_SETFL, curr_flags|O_NONBLOCK) != 0) { perror("fcntl-F_SETFL O_NONBLOCK"); break; } ClientEvent* pClient = new ClientEvent(cfd); if(!CTcpServer::instance()->read(pClient)){ close(cfd); delete pClient; } }while(0); return true; } }; class CTcpServer::CTcpServerPrivate { public: CTcpServerPrivate() :m_bLoop(false),m_serverPort(0),m_epollFD(-1),m_currntWaitEvents(0) {} public: bool m_bLoop; short m_serverPort; int m_epollFD; unsigned int m_currntWaitEvents; CMutex m_loopMutex; CMutex m_eventsMutex; static const unsigned int m_maxEvents=1020; }; CTcpServer* CTcpServer::mp_instance; CTcpServer* CTcpServer::instance() { if(NULL == mp_instance){ static CMutex mutex; mutex.lock(); if(NULL == mp_instance){ mp_instance = new CTcpServer; } mutex.unLock(); } return mp_instance; } CTcpServer::CTcpServer() :d(new CTcpServerPrivate) { } CTcpServer::~CTcpServer() { delete d; } bool CTcpServer::loop(unsigned short port,const char* ip) { CMutexTryLocker lock(&(d->m_loopMutex)); if(!lock.isLock()){ return false; } int lfd = socket(AF_INET, SOCK_STREAM, 0); if (lfd <= 0) { perror("socket"); return false; } int reuseaddr = 1; if(setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr,sizeof(reuseaddr)) != 0){ perror("setsockopt: reuseaddr"); } int curr_flags = fcntl(lfd, F_GETFL); if (curr_flags < 0) { perror("fcntl-F_GETFL"); } if (fcntl(lfd, F_SETFL, curr_flags|O_NONBLOCK) != 0) { perror("fcntl-F_SETFL O_NONBLOCK"); } struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; if(ip){ sin.sin_addr.s_addr = inet_addr(ip); }else{ sin.sin_addr.s_addr = INADDR_ANY; } sin.sin_port = htons(port); if(bind(lfd,(struct sockaddr*)&sin,sizeof(sin)) != 0){ perror("bind "); close(lfd); return false; } if(listen(lfd,20) != 0){ perror("listen "); close(lfd); return false; } d->m_epollFD = epoll_create(d->m_maxEvents+1); if(d->m_epollFD < 1){ close(lfd); return false; } ServerEvent* pServerEvent = new ServerEvent(lfd); read(pServerEvent); struct epoll_event* events = (struct epoll_event*)malloc(sizeof(struct epoll_event)); memset(events,0,sizeof (struct epoll_event)); d->m_bLoop = true; while(d->m_bLoop){ int nfd = epoll_wait(d->m_epollFD,events,d->m_maxEvents+1,1000); if (nfd < 0) { break; } for(int i = 0;i < nfd;i++){ CTcpEvent* pEvent = (CTcpEvent*)(events[i].data.ptr); if(NULL == pEvent){ continue; } if( (events[i].events & EPOLLIN) && (pEvent->m_events &EPOLLIN) ){ pEvent->read(); } if( (events[i].events & EPOLLOUT) && (pEvent->m_events &EPOLLOUT) ){ pEvent->write(); } } } free(events); close(lfd); close(d->m_epollFD); d->m_epollFD = -1; return true; } bool CTcpServer::detach(CTcpEvent* pEvent) { CMutexLocker lock(&(d->m_eventsMutex)); if(d->m_epollFD < 1){ return false; } if(eEPOLLDEL == pEvent->m_eventstatus){ return false; } struct epoll_event epv = { 0,{0}}; pEvent->m_eventstatus = eEPOLLDEL; if(epoll_ctl(d->m_epollFD,EPOLL_CTL_DEL,pEvent->m_fd,&epv) < 0){ perror("epoll_ctl EPOLL_CTL_DEL"); } d->m_currntWaitEvents--; return true; } bool CTcpServer::read(CTcpEvent* pEvent) { CMutexLocker lock(&(d->m_eventsMutex)); if(d->m_epollFD < 1){ return false; } int op = EPOLL_CTL_ADD; if(eEPOLLIN == pEvent->m_eventstatus){ return true; } if(d->m_currntWaitEvents < d->m_maxEvents){ struct epoll_event epv; memset(&epv,0,sizeof(struct epoll_event)); epv.data.ptr = pEvent; epv.events = pEvent->m_events = EPOLLIN; if(eEPOLLOUT == pEvent->m_eventstatus){ op = EPOLL_CTL_MOD; } pEvent->m_eventstatus = eEPOLLIN; if(epoll_ctl(d->m_epollFD,op,pEvent->m_fd,&epv)<0){ perror("epoll_ctl add/mod EPOLLIN"); return false; } if(op == EPOLL_CTL_ADD){ d->m_currntWaitEvents++; } return true; } return false; } bool CTcpServer::write(CTcpEvent *pEvent) { CMutexLocker lock(&(d->m_eventsMutex)); if(d->m_epollFD < 1){ return false; } int op = EPOLL_CTL_ADD; if(eEPOLLOUT == pEvent->m_eventstatus){ return true; } if(d->m_currntWaitEvents < d->m_maxEvents){ struct epoll_event epv; memset(&epv,0,sizeof(struct epoll_event)); epv.data.ptr = pEvent; epv.events = pEvent->m_events = EPOLLOUT; if(eEPOLLIN == pEvent->m_eventstatus){ op = EPOLL_CTL_MOD; } pEvent->m_eventstatus = eEPOLLOUT; if(epoll_ctl(d->m_epollFD,op,pEvent->m_fd,&epv)<0){ perror("epoll_ctl add/mod EPOLLOUT "); return false; } if(op == EPOLL_CTL_ADD){ d->m_currntWaitEvents++; } return true; } return false; } static void* eventThread(void* arg) { pthread_detach(pthread_self()); while (1) { CTcpEvent* pEvent = CThreadPool::instance()->get(); if(NULL == pEvent){ continue; } if( pEvent->m_events &EPOLLIN){ if(!pEvent->threadread()){ delete pEvent; } continue; } else if(pEvent->m_events &EPOLLOUT){ if(!pEvent->threadwrite()){ delete pEvent; } continue; } } } static void sig_exit(int sig) { fprintf(stderr, "sig_exit:%d\n",sig); exit(0); } int main() { (void) signal(SIGUSR1, SIG_IGN); (void) signal(SIGTSTP, SIG_IGN); (void) signal(SIGINT, SIG_IGN); (void) signal(SIGPIPE, SIG_IGN); (void) signal(SIGKILL, sig_exit); (void) signal(SIGSEGV, sig_exit); pid_t pid = -1; while(1){ pid = fork(); if(pid == 0){ (void) signal(SIGUSR1, SIG_IGN); (void) signal(SIGTSTP, SIG_IGN); (void) signal(SIGINT, SIG_IGN); (void) signal(SIGPIPE, SIG_IGN); (void) signal(SIGKILL, sig_exit); (void) signal(SIGSEGV, sig_exit); printf("this child, ppid = %d pid=%d\n", getppid(),getpid()); CThreadPool::instance(); CTcpServer::instance(); CTcpServer::instance()->loop(); } if(pid > 0){ int nState = 0; printf("this parent, ppid=%d pid=%d\n", getppid(),getpid()); pid_t child = wait(&nState); printf("child = %d\n", child); printf("status = %d\n", WEXITSTATUS(nState)); printf("parent process end\n"); } } }