C++后端开发——Reactor百万并发实现

6

主题

10

帖子

22

积分

新手上路

Rank: 1

积分
22
发表于 2022-9-20 13:49:54 | 显示全部楼层
1.Reactor模型

在此对Reactor模型进行回顾
首先来回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将 结果和控制权返回给程序,程序继续处理。
Reactor 释义“反应堆”,是一种事件驱动机制。 和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰 相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上, 如果相应的时间发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”。
Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要 处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事 先注册的相应 I/O 事件分发到对应的处理器中。
1.1 Reactor 模型组件

Reactor模型有三个重要的组件:

  • 多路复用器:由操作系统提供,在Linux上一般是select,poll,epoll等系统调用。
  • 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。
  • 事件处理器:复制处理特定事件的处理函数(回调函数)


1.2 Reactor 模型具体流程

具体流程如下:

  • 注册读就绪事件和相应的事件处理器;
  • 事件分离器等待事件;
  • 事件到来,激活分离器,分离器调用事件对应的处理器;
  • 事件处理器完成事件的读操作,处理读到的数据,注册新的事件,然后返还控制权。
1.3 Reactor 模型优点


  • 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
  • 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/多进程的切换开销;
  • 可扩展性,可以方便的通过增加Reactor实例个数了充分利用CPU资源;
  • 可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性
Reactor 模型开发效率上比起直接使用 IO 复用要高,它通常是单线程的,设计目标是 希望单线程使用一颗 CPU 的全部资源,但也有附带优点,即每个事件处理中很多时候可以 不考虑共享资源的互斥访问。可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定 律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力, 当程序需要使用多核资源时,Reactor 模型就会悲剧。
如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开 启多个反应堆,每个反应堆对应一颗 CPU 核心,这些反应堆上跑的请求互不相关,这是完 全可以利用多核的。例如 Nginx 这样的 http 静态服务器。
1.4 百万并发与五元组

五元组定义描述
源IP客户端IP
目的IP服务端IP
源端口客户端port:0~65535
目的端口服务端port:0~65535
PROTOCOL协议类型:TCP/UDP
每一个五元组就能确定一个网络IO
查看端口范围:
cat /proc/sys/net/ipv4/ip_local_port_range

  • 百万并发基础
已知ubuntu-1604的默认的端口范围:32768-60999
假设client有20000个端口能用,server开100个端口,用2个client去连接server,则理论最大连接数是:20000 x 100 x 2 = 400万。
2.epoll触发方式

epoll 的描述符事件有两种触发模式:LT(level trigger)和 ET(edge trigger)。
2.1 LT 模式(水平触发,默认方式)

描述:
当epoll_wait检测到描述符事件到达时,将此事件通知进程,进程可以不立即处理该事件,下次调用 epoll_wait()会再次通知进程。是默认的一种模式,并且同时支持 Blocking 和 No-Blocking。即LT 模式下无论是否设置了EPOLLONESHOT,都是epoll_wait检测缓冲区有没有数据,有就返回,否则等待;
适用范围:
小数据,以业务为导向(recv一次性能够读完)
2.2 ET 模式(边沿触发)

描述:
和 LT 模式不同的是,通知之后进程必须立即处理事件,下次再调用 epoll_wait() 时不会再得到事件到达的通知。很大程度上减少了 epoll 事件被重复触发的次数,因此效率要比 LT 模式高。只支持 No-Blocking,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
适用范围:
并发量大,界定数据包(反向代理)
3.实现过程

这是之前的reactor模型,reactor结构体直接有个指向ntyevent头节点的指针*ntyevent


这是百万并发的reactor模型,reactor结构体中eventblock指针指向eventblock链表的头结点,eventblock结构体有指向下一一个eventblock的next指针,也有指向ntyevent头节点的指针


3.1 相关结构体

3.1.0 宏定义

//缓冲区长度
#define BUFFER_LENGTH 4096
// epoll中的事物数量
#define MAX_EPOLL_EVENTS 1024
#define SERVER_PORT 8888
#define PORT_COUNT 100
3.1.1 ntyevent

这是一个事件结构体,它绑定了socket的fd,以及需要监听的事件(events)和事件处理器(callback),还绑定了所属的reactor对象(arg)。
//事务结构体
struct ntyevent
{
    int fd;                                         //事务的fd
    int events;                                     // epoll events类型
    void *arg;                                      //需要传给回调函数的参数,一般传的是reactor的指针
    int (*callback)(int fd, int events, void *arg); //对应的回调函数
    int status;                                     // 0:新建 1:已存在
    char buffer[BUFFER_LENGTH];                     //接收到的消息
    int length;
    long last_active;

    // http 参数
    int method;
    char resource[BUFFER_LENGTH];
    int ret_code;
};
3.1.2 eventblock

这是一个事件块,每一个事件块包含了定长的事件存储数组,以及指向下一个块的指针。
struct eventblock
{
    struct eventblock *next; //指向下一个block
    struct ntyevent *events; //当前block对应的event数组
};
3.1.3 ntyreactor

这是一个reactor结构体,绑定了epoll对象和eventblock空间,并且对eventblock的块数量进行了计数。
在整体设计中,先根据fd找到ntyreactor中对应的ntyevent空间,再设置ntyevent。
// reator使用的结构体
struct ntyreactor
{
    int epfd; // reatctor的fd
    int blkcnt;
    struct eventblock *evblk; // reactor管理的基础单元,现在是block,实现C1000K
};
3.2 事件处理器(回调函数)

3.2.1 accept_cb

这是listenfd的回调函数。在此函数中,会将创建的连接的socket设置为非阻塞状态。
int accept_cb(int fd, int events, void* arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    if (reactor == NULL)
    {
        return -1;
    }
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);

    int clientfd;

    if ((clientfd = accept(fd, (struct sockaddr *)&client_addr, &len)) == -1)
    {
        if (errno != EAGAIN && errno != EINTR)
        {
            printf("accept: %s\n", strerror(errno));
            return -1;
        }
    }
    int flag = 0;
    if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0)
    {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }

    struct ntyevent *event = ntyreactor_idx(reactor, clientfd);

    nty_event_set(event, clientfd, recv_cb, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, event);
    printf("new connect [%s:%d], pos[%d]\n",
           inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);

    return 0;
}
3.2.2 recv_cb

这是接收数据事件的回调函数
int recv_cb(int fd, int events, void* arg)
{
    //外界传参获得的ntyreactor指针
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    //从内核的链表中取出当前的event
    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
    //此次收到数据的长度
    int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);
    //收到就直接删除对应的fd,以免多次响应
    nty_event_del(reactor->epfd, ev);
    //正确接收,大于0
    if (len > 0)
    {
        ev->length = len;
        ev->buffer[len] = '\0';

        printf("C[%d]:%s\n", fd, ev->buffer);
        //收到以后直接发送回去
        nty_event_set(ev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ev);
    }
    else if (len == 0)
    {
        close(ev->fd);
        // printf("[fd=%d] pos[%ld], closed\n", fd, ev - reactor->events);
    }
    else
    {
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return len;
}
3.2.3 send_cb

这是发送数据的回调函数
int send_cb(int fd, int events, void* arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
    //返回的是发送的字节数
    int len = send(fd, ev->buffer, ev->length, 0);
    //正确发送,删除发送fd,注册接收的fd
    if (len > 0)
    {
        printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
        nty_event_del(reactor->epfd, ev);
        nty_event_set(ev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ev);
    }
    else
    {
        close(ev->fd);
        nty_event_del(reactor->epfd, ev);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}
3.3 reactor 组件与事件分发器

3.3.1 ntyreactor_idx

这个函数是根据传入的sockfd,在ntyreactor对象中找到对应的ntyevent空间。
struct ntyevent* ntyreactor_idx(struct ntyreactor* reactor, int sockfd)
{
    //获取当前fd对应events所在的blk
    int blkidx = sockfd / MAX_EPOLL_EVENTS;

    while (blkidx >= reactor->blkcnt)
    {
        ntyreactor_alloc(reactor);
    }

    int i = 0;
    struct eventblock *blk = reactor->evblk;
    while (i++ < blkidx && blk != NULL)
    {
        blk = blk->next;
    }

    return &blk->events[sockfd % MAX_EPOLL_EVENTS];
}
3.3.2 ntyreactor_alloc

这个函数是为ntyreactor申请ntyevent空间,当调用ntyreactor_idx()时,如果fd没有对应的ntyevent空间,会先调用本函数申请到对应的ntyevent空间。ntyreactor_alloc()申请空间是以eventblock为单位申请的。
int ntyreactor_alloc(struct ntyreactor* reactor)
{
    //空检查
    if (reactor == NULL)
        return -1;
    if (reactor->evblk == NULL)
        return -1;

    //查找当前block的末尾
    struct eventblock *blk = reactor->evblk;
    while (blk->next != NULL)
    {
        blk = blk->next;
    }

    //分配内存
    //分配event
    struct ntyevent *evs = (struct ntyevent *)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL)
    {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    //分配block本身
    struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));
    if (block == NULL)
    {
        printf("ntyreactor_alloc eventblock failed\n");
        return -2;
    }
    memset(block, 0, sizeof(struct eventblock));
    //加入链表末尾
    block->events = evs;
    block->next = NULL;

    blk->next = block;
    reactor->blkcnt++;
    return 0;
}
3.3.3 nty_event_set

这个函数是去设置ntyevent空间的。
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg)
{
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
    ev->last_active = time(NULL);

    return;
}
3.3.4 nty_event_add

这个函数是设置ntyevent的事件events并将ntyevent纳入epoll对象的监测。
在此函数中,会根据ntyevent对象的status状态判断,是调用EPOLL_CTL_ADD还是EPOLL_CTL_MOD。
int nty_event_add(int epfd, int events, struct ntyevent* ev)
{
    //使用的是linux内核里的epoll
    struct epoll_event ep_ev = {0, {0}};
    //这一步非常关键传到联合体data的ptr里的是ntyevent指针
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;
    //判断该事件是否已经添加过
    int op;
    if (ev->status == 1)
    {
        op = EPOLL_CTL_MOD;
    }
    else
    {
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    }
    // epoll_ctl进行对应操作
    if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0)
    {
        printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
        return -1;
    }
    return 0;
}
3.3.5 nty_event_del

这个函数取消对ntyevent的事件检测。
int nty_event_del(int epfd, struct ntyevent* ev)
3.3.6 init_sock

在此函数中,创建对port端口绑定的服务器listenfd。
int init_sock(short port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    fcntl(fd, F_SETFL, O_NONBLOCK);

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);

    bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));

    if (listen(fd, 20) < 0)
    {
        printf("listen failed : %s\n", strerror(errno));
    }

    return fd;
}
3.3.7 ntyreactor_init

这是ntyreactor对象的初始化函数,主要处理申请内存空间。
int ntyreactor_init(struct ntyreactor* reactor)
    if (reactor == NULL)
    {
        return -1;
    }
    memset(reactor, 0, sizeof(struct ntyreactor));

    //创建大房子
    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0)
    {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }

    //申请events的空间
    struct ntyevent *evs = (struct ntyevent *)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL)
    {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));

    struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));
    if (block == NULL)
    {
        printf("ntyreactor_alloc eventblock failed\n");
        return -2;
    }
    memset(block, 0, sizeof(struct eventblock));

    block->events = evs;
    block->next = NULL;

    reactor->evblk = block;
    reactor->blkcnt = 1;

    return 0;
3.3.8 ntyreactor_destroy

销毁ntyreactor对象中申请的内存空间(eventblock),但不会销毁ntyreactor对象本身。
int ntyreactor_destroy(struct ntyreactor* reactor)
{
    close(reactor->epfd);
    //释放调之前malloc的所有资源
    struct eventblock *blk = reactor->evblk;
    struct eventblock *blk_next = NULL;

    while (blk != NULL)
    {

        blk_next = blk->next;

        free(blk->events);
        free(blk);

        blk = blk_next;
    }

    return 0;
}
3.3.9 ntyreactor_addlistener

将listenfd注册到ntyreactor对象上。
int ntyreactor_addlistener(struct ntyreactor* reactor, int sockfd, NCALLBACK* acceptor)
{
    if (reactor == NULL)
    {
        return -1;
    }
    if (reactor->evblk == NULL)
    {
        return -1;
    }
    // reactor->evblk->events[sockfd];
    struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
    nty_event_set(event, sockfd, acceptor, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, event);
    return 0;
}
3.3.10 ntyreactor_run 事件分发器

int ntyreactor_run(struct ntyreactor *reactor);
    if (reactor == NULL)
        return -1;
    if (reactor->epfd < 0)
        return -1;
    if (reactor->evblk == NULL)
        return -1;

    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    int checkpos = 0, i;

    while (1)
    {
        //从内核中把事务提取出来放到events里面
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0)
        {
            printf("epoll_wait error, exit\n");
            continue;
        }

        //最最最关键的地方实现回调函数
        for (i = 0; i < nready; i++)
        {
            //这里data是一个union传过来的就只有之前放进去的指向ntyevent的指针
            struct ntyevent *ev = (struct ntyevent *)events.data.ptr;

            if ((events.events & EPOLLIN) && (ev->events & EPOLLIN))
            {
                ev->callback(ev->fd, events.events, ev->arg);
            }
            if ((events.events & EPOLLOUT) && (ev->events & EPOLLOUT))
            {
                ev->callback(ev->fd, events.events, ev->arg);
            }
        }
    }
3.4 主函数

int main(int argc, char *argv[])
{
    //默认端口后为全局变量
    unsigned short port = SERVER_PORT;
    //如果传了参就按传的参数来
    if (argc == 2)
    {
        port = atoi(argv[1]);
    }
    struct ntyreactor *reactor = (struct ntyreactor *)malloc(sizeof(struct ntyreactor));
    ntyreactor_init(reactor);

    int i = 0;
    int sockfds[PORT_COUNT] = {0};
    for (i = 0; i < PORT_COUNT; i++)
    {
        sockfds = init_sock(port + i);
        ntyreactor_addlistener(reactor, sockfds, accept_cb);
    }

    ntyreactor_run(reactor);

    ntyreactor_destroy(reactor);
    for (i = 0; i < PORT_COUNT; i++)
    {
        close(sockfds);
    }
    free(reactor);
    return 0;
}
3.5 修改系统参数

3.5.1 修改进程最大fd数量(ulimit)

3.5.1.1 永久修改

sudo vim /etc/security/limits.conf  
最后加上4行:
*               soft    core            unlimited  
*               soft    stack           12000  
*               soft    nofile          2000000  
*               hard    nofile          2000000  
参数解释
几个值是2000000的参数都是保证理论最大连接数能达到200万
stack 设置栈的最大容量,单位是KB。200万连接分摊给2个client,1个client是100万,epoll_event结构体占用12字节,12x100万=1200万=12000KB,所以设置stack为12000
3.5.1.2 临时修改:

ulimit -n 修改的值
3.5.2 server sysctl设置

3.5.3 client sysctl设置

Reactor的应用

上面设计的reactor是单线程模式的,除此之外,reactor模式还有多线程和多进程。
1. 单线程Reactor



redis使用的就是这种单线程模型,在6.0版本之后支持IO多线程。
2. 多线程Reactor



多线程的Reactor模型引入线程池的概念,将一些耗时的操作交由新线程执行。
memcached使用的就是多线程的Reactor模型。
3. 多进程Reactor



每个进程都有reactor对象。nginx使用的是多进程的Reactor模型。同时,nginx使用ET模式,因为nginx是做反向代理,做转发数据工作的,不需要界定数据包。
转载自:CSDN博主「何蔚」 原文链接:https://blog.csdn.net/weixin_43687811/article/details/122611756
C++后台服务器开发/架构师面试题、学习资料、教学视频和学习路线图有需要的可以自行添加学习交流群973961276 获取
C++后台开发该学哪些内容,标准技术路线(含推荐书籍与项目)
C++后台服务器开发学习公开课:https://ke.qq.com/course/417774?flowToken=1031343

回复

举报 使用道具

您需要登录后才可以回帖 登录 | 立即注册
快速回复 返回顶部 返回列表