源码阅读:C语言epoll模型

Posted by LB on Fri, Jun 8, 2018

背景

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。其中一个重要原因是:获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

模型代码

今天给大家分享一下epoll的相关模型代码:

服务端代码

  1#include <sys/types.h>
  2#include <sys/signalfd.h>
  3#include <sys/epoll.h>
  4#include <errno.h>
  5#include <poll.h>
  6#include <signal.h>
  7#include <limits.h>
  8#include <stdio.h>
  9#include <stdlib.h>
 10#include <unistd.h>
 11#include <fcntl.h>
 12#include <string.h>
 13#include <assert.h>
 14#include <sys/ioctl.h>
 15#include <sys/socket.h>
 16#include <netinet/in.h>
 17#include <arpa/inet.h>
 18#include "utarray.h"
 19
 20/*****************************************************************************
 21 * This program demonstrates epoll-based event notification. It monitors for
 22 * new client connections, input on existing connections or their closure, as
 23 * well as signals. The signals are also accepted via a file descriptor using
 24 * signalfd. This program uses blocking I/O in all cases. The epoll mechanism
 25 * tells us exactly which descriptor is ready, when it is ready, so there
 26 * is no need for speculative/non-blocking reads. This program is more
 27 * efficient than sigio-server.c.
 28 *
 29 * Troy D. Hanson
 30 ****************************************************************************/
 31
 32struct {
 33  in_addr_t addr;    /* local IP or INADDR_ANY   */
 34  int port;          /* local port to listen on  */
 35  int fd;            /* listener descriptor      */
 36  UT_array *fds;     /* array of client descriptors */
 37  int signal_fd;     /* used to receive signals  */
 38  int epoll_fd;      /* used for all notification*/
 39  int verbose;
 40  int ticks;         /* uptime in seconds        */
 41  int pid;           /* our own pid              */
 42  char *prog;
 43} cfg = {
 44  .addr = INADDR_ANY, /* by default, listen on all local IP's   */
 45  .fd = -1,
 46  .signal_fd = -1,
 47  .epoll_fd = -1,
 48};
 49
 50void usage() {
 51  fprintf(stderr,"usage: %s [-v] [-a <ip>] -p <port\n", cfg.prog);
 52  exit(-1);
 53}
 54
 55/* do periodic work here */
 56void periodic() {
 57  if (cfg.verbose) fprintf(stderr,"up %d seconds\n", cfg.ticks);
 58}
 59
 60int add_epoll(int events, int fd) {
 61  int rc;
 62  struct epoll_event ev;
 63  memset(&ev,0,sizeof(ev)); // placate valgrind
 64  ev.events = events;
 65  ev.data.fd= fd;
 66  if (cfg.verbose) fprintf(stderr,"adding fd %d to epoll\n", fd);
 67  rc = epoll_ctl(cfg.epoll_fd, EPOLL_CTL_ADD, fd, &ev);
 68  if (rc == -1) {
 69    fprintf(stderr,"epoll_ctl: %s\n", strerror(errno));
 70  }
 71  return rc;
 72}
 73
 74int del_epoll(int fd) {
 75  int rc;
 76  struct epoll_event ev;
 77  rc = epoll_ctl(cfg.epoll_fd, EPOLL_CTL_DEL, fd, &ev);
 78  if (rc == -1) {
 79    fprintf(stderr,"epoll_ctl: %s\n", strerror(errno));
 80  }
 81  return rc;
 82}
 83
 84/* signals that we'll accept synchronously via signalfd */
 85int sigs[] = {SIGIO,SIGHUP,SIGTERM,SIGINT,SIGQUIT,SIGALRM};
 86
 87int setup_listener() {
 88  int rc = -1, one=1;
 89
 90  int fd = socket(AF_INET, SOCK_STREAM, 0);
 91  if (fd == -1) {
 92    fprintf(stderr,"socket: %s\n", strerror(errno));
 93    goto done;
 94  }
 95
 96  /**********************************************************
 97   * internet socket address structure: our address and port
 98   *********************************************************/
 99  struct sockaddr_in sin;
100  sin.sin_family = AF_INET;
101  sin.sin_addr.s_addr = cfg.addr;
102  sin.sin_port = htons(cfg.port);
103
104  /**********************************************************
105   * bind socket to address and port 
106   *********************************************************/
107  setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
108  if (bind(fd, (struct sockaddr*)&sin, sizeof(sin)) == -1) {
109    fprintf(stderr,"bind: %s\n", strerror(errno));
110    goto done;
111  }
112
113  /**********************************************************
114   * put socket into listening state
115   *********************************************************/
116  if (listen(fd,1) == -1) {
117    fprintf(stderr,"listen: %s\n", strerror(errno));
118    goto done;
119  }
120
121  cfg.fd = fd;
122  rc=0;
123
124 done:
125  if ((rc < 0) && (fd != -1)) close(fd);
126  return rc;
127}
128
129/* accept a new client connection to the listening socket */
130int accept_client() {
131  int fd;
132  struct sockaddr_in in;
133  socklen_t sz = sizeof(in);
134
135  fd = accept(cfg.fd,(struct sockaddr*)&in, &sz);
136  if (fd == -1) {
137    fprintf(stderr,"accept: %s\n", strerror(errno)); 
138    goto done;
139  }
140
141  if (cfg.verbose && (sizeof(in)==sz)) {
142    fprintf(stderr,"connection fd %d from %s:%d\n", fd,
143    inet_ntoa(in.sin_addr), (int)ntohs(in.sin_port));
144  }
145
146  if (add_epoll(EPOLLIN, fd) == -1) { close(fd); fd = -1; }
147
148 done:
149  if (fd != -1) utarray_push_back(cfg.fds,&fd);
150  return fd;
151}
152
153void drain_client(int fd) {
154  int rc, pos, *fp;
155  char buf[1024];
156
157  rc = read(fd, buf, sizeof(buf));
158  switch(rc) { 
159    default: fprintf(stderr,"received %d bytes\n", rc);         break;
160    case  0: fprintf(stderr,"fd %d closed\n", fd);              break;
161    case -1: fprintf(stderr, "recv: %s\n", strerror(errno));    break;
162  }
163
164  if (rc != 0) return;
165
166  /* client closed. log it, tell epoll to forget it, close it */
167  if (cfg.verbose) fprintf(stderr,"client %d has closed\n", fd);
168  del_epoll(fd);
169  close(fd);
170
171  /* delete record of fd. linear scan. want hash if lots of fds */
172  fp=NULL;
173  while ( (fp=(int*)utarray_next(cfg.fds,fp))) { 
174    if (*fp != fd) continue;
175    pos = utarray_eltidx(cfg.fds,fp);
176    utarray_erase(cfg.fds,pos,1);
177    break;
178  }
179
180}
181
182int main(int argc, char *argv[]) {
183  cfg.prog = argv[0];
184  cfg.prog=argv[0];
185  cfg.pid = getpid();
186  int n, opt, *fd;
187  struct epoll_event ev;
188  struct signalfd_siginfo info;
189
190  utarray_new(cfg.fds,&ut_int_icd);
191
192  while ( (opt=getopt(argc,argv,"vp:a:h")) != -1) {
193    switch(opt) {
194      case 'v': cfg.verbose++; break;
195      case 'p': cfg.port=atoi(optarg); break; 
196      case 'a': cfg.addr=inet_addr(optarg); break; 
197      case 'h': default: usage(); break;
198    }
199  }
200  if (cfg.addr == INADDR_NONE) usage();
201  if (cfg.port==0) usage();
202
203  /* block all signals. we take signals synchronously via signalfd */
204  sigset_t all;
205  sigfillset(&all);
206  sigprocmask(SIG_SETMASK,&all,NULL);
207
208  /* a few signals we'll accept via our signalfd */
209  sigset_t sw;
210  sigemptyset(&sw);
211  for(n=0; n < sizeof(sigs)/sizeof(*sigs); n++) sigaddset(&sw, sigs[n]);
212
213  if (setup_listener()) goto done;
214
215  /* create the signalfd for receiving signals */
216  cfg.signal_fd = signalfd(-1, &sw, 0);
217  if (cfg.signal_fd == -1) {
218    fprintf(stderr,"signalfd: %s\n", strerror(errno));
219    goto done;
220  }
221
222  /* set up the epoll instance */
223  cfg.epoll_fd = epoll_create(1); 
224  if (cfg.epoll_fd == -1) {
225    fprintf(stderr,"epoll: %s\n", strerror(errno));
226    goto done;
227  }
228
229  /* add descriptors of interest */
230  if (add_epoll(EPOLLIN, cfg.fd))        goto done; // listening socket
231  if (add_epoll(EPOLLIN, cfg.signal_fd)) goto done; // signal socket
232
233  /*
234   * This is our main loop. epoll for input or signals.
235   */
236  alarm(1);
237  while (epoll_wait(cfg.epoll_fd, &ev, 1, -1) > 0) {
238
239    /* if a signal was sent to us, read its signalfd_siginfo */
240    if (ev.data.fd == cfg.signal_fd) { 
241      if (read(cfg.signal_fd, &info, sizeof(info)) != sizeof(info)) {
242        fprintf(stderr,"failed to read signal fd buffer\n");
243        continue;
244      }
245      switch (info.ssi_signo) {
246        case SIGALRM: 
247          if ((++cfg.ticks % 10) == 0) periodic(); 
248          alarm(1); 
249          continue;
250        default:  /* exit */
251          fprintf(stderr,"got signal %d\n", info.ssi_signo);  
252          goto done;
253      }
254    }
255
256    /* regular POLLIN. handle the particular descriptor that's ready */
257    assert(ev.events & EPOLLIN);
258    if (cfg.verbose) fprintf(stderr,"handle POLLIN on fd %d\n", ev.data.fd);
259    if (ev.data.fd == cfg.fd) accept_client();
260    else drain_client(ev.data.fd);
261
262  }
263
264  fprintf(stderr, "epoll_wait: %s\n", strerror(errno));
265
266 done:   /* we get here if we got a signal like Ctrl-C */
267  fd=NULL;
268  while ( (fd=(int*)utarray_next(cfg.fds,fd))) {del_epoll(*fd); close(*fd);}
269  utarray_free(cfg.fds);
270  if (cfg.fd != -1) close(cfg.fd);
271  if (cfg.epoll_fd != -1) close(cfg.epoll_fd);
272  if (cfg.signal_fd != -1) close(cfg.signal_fd);
273  return 0;
274}

utarray头文件

  1/*
  2Copyright (c) 2008-2014, Troy D. Hanson   http://troydhanson.github.com/uthash/
  3All rights reserved.
  4
  5Redistribution and use in source and binary forms, with or without
  6modification, are permitted provided that the following conditions are met:
  7
  8    * Redistributions of source code must retain the above copyright
  9      notice, this list of conditions and the following disclaimer.
 10
 11THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 12IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 13TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 14PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 15OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 16EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 17PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 18PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 19LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 20NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 21SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 22*/
 23
 24/* a dynamic array implementation using macros 
 25 */
 26#ifndef UTARRAY_H
 27#define UTARRAY_H
 28
 29#define UTARRAY_VERSION 1.9.9
 30
 31#ifdef __GNUC__
 32#define _UNUSED_ __attribute__ ((__unused__)) 
 33#else
 34#define _UNUSED_ 
 35#endif
 36
 37#include <stddef.h>  /* size_t */
 38#include <string.h>  /* memset, etc */
 39#include <stdlib.h>  /* exit */
 40
 41#define oom() exit(-1)
 42
 43typedef void (ctor_f)(void *dst, const void *src);
 44typedef void (dtor_f)(void *elt);
 45typedef void (init_f)(void *elt);
 46typedef struct {
 47    size_t sz;
 48    init_f *init;
 49    ctor_f *copy;
 50    dtor_f *dtor;
 51} UT_icd;
 52
 53typedef struct {
 54    unsigned i,n;/* i: index of next available slot, n: num slots */
 55    UT_icd icd;  /* initializer, copy and destructor functions */
 56    char *d;     /* n slots of size icd->sz*/
 57} UT_array;
 58
 59#define utarray_init(a,_icd) do {                                             \
 60  memset(a,0,sizeof(UT_array));                                               \
 61  (a)->icd=*_icd;                                                             \
 62} while(0)
 63
 64#define utarray_done(a) do {                                                  \
 65  if ((a)->n) {                                                               \
 66    if ((a)->icd.dtor) {                                                      \
 67      size_t _ut_i;                                                           \
 68      for(_ut_i=0; _ut_i < (a)->i; _ut_i++) {                                 \
 69        (a)->icd.dtor(utarray_eltptr(a,_ut_i));                               \
 70      }                                                                       \
 71    }                                                                         \
 72    free((a)->d);                                                             \
 73  }                                                                           \
 74  (a)->n=0;                                                                   \
 75} while(0)
 76
 77#define utarray_new(a,_icd) do {                                              \
 78  a=(UT_array*)malloc(sizeof(UT_array));                                      \
 79  utarray_init(a,_icd);                                                       \
 80} while(0)
 81
 82#define utarray_free(a) do {                                                  \
 83  utarray_done(a);                                                            \
 84  free(a);                                                                    \
 85} while(0)
 86
 87#define utarray_reserve(a,by) do {                                            \
 88  if (((a)->i+by) > ((a)->n)) {                                               \
 89    while(((a)->i+by) > ((a)->n)) { (a)->n = ((a)->n ? (2*(a)->n) : 8); }     \
 90    if ( ((a)->d=(char*)realloc((a)->d, (a)->n*(a)->icd.sz)) == NULL) oom();  \
 91  }                                                                           \
 92} while(0)
 93
 94#define utarray_push_back(a,p) do {                                           \
 95  utarray_reserve(a,1);                                                       \
 96  if ((a)->icd.copy) { (a)->icd.copy( _utarray_eltptr(a,(a)->i++), p); }      \
 97  else { memcpy(_utarray_eltptr(a,(a)->i++), p, (a)->icd.sz); };              \
 98} while(0)
 99
100#define utarray_pop_back(a) do {                                              \
101  if ((a)->icd.dtor) { (a)->icd.dtor( _utarray_eltptr(a,--((a)->i))); }       \
102  else { (a)->i--; }                                                          \
103} while(0)
104
105#define utarray_extend_back(a) do {                                           \
106  utarray_reserve(a,1);                                                       \
107  if ((a)->icd.init) { (a)->icd.init(_utarray_eltptr(a,(a)->i)); }            \
108  else { memset(_utarray_eltptr(a,(a)->i),0,(a)->icd.sz); }                   \
109  (a)->i++;                                                                   \
110} while(0)
111
112#define utarray_len(a) ((a)->i)
113
114#define utarray_eltptr(a,j) (((j) < (a)->i) ? _utarray_eltptr(a,j) : NULL)
115#define _utarray_eltptr(a,j) ((char*)((a)->d + ((a)->icd.sz*(j) )))
116
117#define utarray_insert(a,p,j) do {                                            \
118  if (j > (a)->i) utarray_resize(a,j);                                        \
119  utarray_reserve(a,1);                                                       \
120  if ((j) < (a)->i) {                                                         \
121    memmove( _utarray_eltptr(a,(j)+1), _utarray_eltptr(a,j),                  \
122             ((a)->i - (j))*((a)->icd.sz));                                   \
123  }                                                                           \
124  if ((a)->icd.copy) { (a)->icd.copy( _utarray_eltptr(a,j), p); }             \
125  else { memcpy(_utarray_eltptr(a,j), p, (a)->icd.sz); };                     \
126  (a)->i++;                                                                   \
127} while(0)
128
129#define utarray_inserta(a,w,j) do {                                           \
130  if (utarray_len(w) == 0) break;                                             \
131  if (j > (a)->i) utarray_resize(a,j);                                        \
132  utarray_reserve(a,utarray_len(w));                                          \
133  if ((j) < (a)->i) {                                                         \
134    memmove(_utarray_eltptr(a,(j)+utarray_len(w)),                            \
135            _utarray_eltptr(a,j),                                             \
136            ((a)->i - (j))*((a)->icd.sz));                                    \
137  }                                                                           \
138  if ((a)->icd.copy) {                                                        \
139    size_t _ut_i;                                                             \
140    for(_ut_i=0;_ut_i<(w)->i;_ut_i++) {                                       \
141      (a)->icd.copy(_utarray_eltptr(a,j+_ut_i), _utarray_eltptr(w,_ut_i));    \
142    }                                                                         \
143  } else {                                                                    \
144    memcpy(_utarray_eltptr(a,j), _utarray_eltptr(w,0),                        \
145           utarray_len(w)*((a)->icd.sz));                                     \
146  }                                                                           \
147  (a)->i += utarray_len(w);                                                   \
148} while(0)
149
150#define utarray_resize(dst,num) do {                                          \
151  size_t _ut_i;                                                               \
152  if (dst->i > (size_t)(num)) {                                               \
153    if ((dst)->icd.dtor) {                                                    \
154      for(_ut_i=num; _ut_i < dst->i; _ut_i++) {                               \
155        (dst)->icd.dtor(utarray_eltptr(dst,_ut_i));                           \
156      }                                                                       \
157    }                                                                         \
158  } else if (dst->i < (size_t)(num)) {                                        \
159    utarray_reserve(dst,num-dst->i);                                          \
160    if ((dst)->icd.init) {                                                    \
161      for(_ut_i=dst->i; _ut_i < num; _ut_i++) {                               \
162        (dst)->icd.init(utarray_eltptr(dst,_ut_i));                           \
163      }                                                                       \
164    } else {                                                                  \
165      memset(_utarray_eltptr(dst,dst->i),0,(dst)->icd.sz*(num-dst->i));       \
166    }                                                                         \
167  }                                                                           \
168  dst->i = num;                                                               \
169} while(0)
170
171#define utarray_concat(dst,src) do {                                          \
172  utarray_inserta((dst),(src),utarray_len(dst));                              \
173} while(0)
174
175#define utarray_erase(a,pos,len) do {                                         \
176  if ((a)->icd.dtor) {                                                        \
177    size_t _ut_i;                                                             \
178    for(_ut_i=0; _ut_i < len; _ut_i++) {                                      \
179      (a)->icd.dtor(utarray_eltptr((a),pos+_ut_i));                           \
180    }                                                                         \
181  }                                                                           \
182  if ((a)->i > (pos+len)) {                                                   \
183    memmove( _utarray_eltptr((a),pos), _utarray_eltptr((a),pos+len),          \
184            (((a)->i)-(pos+len))*((a)->icd.sz));                              \
185  }                                                                           \
186  (a)->i -= (len);                                                            \
187} while(0)
188
189#define utarray_renew(a,u) do {                                               \
190  if (a) utarray_clear(a); \
191  else utarray_new((a),(u));   \
192} while(0) 
193
194#define utarray_clear(a) do {                                                 \
195  if ((a)->i > 0) {                                                           \
196    if ((a)->icd.dtor) {                                                      \
197      size_t _ut_i;                                                           \
198      for(_ut_i=0; _ut_i < (a)->i; _ut_i++) {                                 \
199        (a)->icd.dtor(utarray_eltptr(a,_ut_i));                               \
200      }                                                                       \
201    }                                                                         \
202    (a)->i = 0;                                                               \
203  }                                                                           \
204} while(0)
205
206#define utarray_sort(a,cmp) do {                                              \
207  qsort((a)->d, (a)->i, (a)->icd.sz, cmp);                                    \
208} while(0)
209
210#define utarray_find(a,v,cmp) bsearch((v),(a)->d,(a)->i,(a)->icd.sz,cmp)
211
212#define utarray_front(a) (((a)->i) ? (_utarray_eltptr(a,0)) : NULL)
213#define utarray_next(a,e) (((e)==NULL) ? utarray_front(a) : ((((a)->i) > (utarray_eltidx(a,e)+1)) ? _utarray_eltptr(a,utarray_eltidx(a,e)+1) : NULL))
214#define utarray_prev(a,e) (((e)==NULL) ? utarray_back(a) : ((utarray_eltidx(a,e) > 0) ? _utarray_eltptr(a,utarray_eltidx(a,e)-1) : NULL))
215#define utarray_back(a) (((a)->i) ? (_utarray_eltptr(a,(a)->i-1)) : NULL)
216#define utarray_eltidx(a,e) (((char*)(e) >= (char*)((a)->d)) ? (((char*)(e) - (char*)((a)->d))/(size_t)(a)->icd.sz) : -1)
217
218/* last we pre-define a few icd for common utarrays of ints and strings */
219static void utarray_str_cpy(void *dst, const void *src) {
220  char **_src = (char**)src, **_dst = (char**)dst;
221  *_dst = (*_src == NULL) ? NULL : strdup(*_src);
222}
223static void utarray_str_dtor(void *elt) {
224  char **eltc = (char**)elt;
225  if (*eltc) free(*eltc);
226}
227static const UT_icd ut_str_icd _UNUSED_ = {sizeof(char*),NULL,utarray_str_cpy,utarray_str_dtor};
228static const UT_icd ut_int_icd _UNUSED_ = {sizeof(int),NULL,NULL,NULL};
229static const UT_icd ut_ptr_icd _UNUSED_ = {sizeof(void*),NULL,NULL,NULL};
230
231
232#endif /* UTARRAY_H */

源码来源:https://github.com/troydhanson/network/tree/master/tcp/server