惊群问题

惊群效应是指多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么它就会唤醒等待的所有进程(线程),但最终只能有一个进程(线程)获得这个事件的 “控制权”,对该事件进行处理,而其他进程(线程)获取 “控制权” 失败,只能重新进入休眠状态,进程(线程)频繁切换带来了一定的开销,这种现象和性能浪费就叫惊群效应。

结论

accpet 惊群

以多进程为例,主进程 bindlisten 端口之后,fork 出的子进程对同一个监听 fd 调用 accept 阻塞式获取连接,当一个连接到来后,所有子进程都会被唤醒,但是只有一个进程可以获取连接,其他进程的 accept 返回 -1。在 Linux 2.6 版本后内核通过增加 WQ_FLAG_EXCLUSIVE 解决了 accept 惊群问题。

epoll 惊群

多进程(多线程)共用一个 epoll fd

以多进程为例,epoll fd 在主进程创建,子进程共同对其进行 epoll_wait 来获取事件。这种情况下,引发 epoll 惊群的原因与 accept 类似,当有事件发生时,等待同一个事件描述符的所有进程(线程)都被唤醒,解决思路与 accept 一致,此种情况下的惊群问题已经解决

多进程(多线程)有各自的 epoll fd

以多线程为例,主线程 bindlisten 端口之后,在子线程中创建 epoll fd 并将监听 fd 加入到 epoll 事件红黑树中,各线程用自己的 epoll fd 调用 epoll_wait 等待事件,注意使用 I/O 多路复用,添加的描述符需要设置为非阻塞的。
以下是一个实例,复现多线程场景下的 epoll 惊群:

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <string.h>
#include <chrono>
#include <thread>

#define PORT 8060
#define NUM_THREADS 5
#define MAX_EVENTS 10

int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

void* epoll_thread(void* arg) {
    int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    int server_socket = *((int*)arg);
    struct epoll_event event;
    
    event.events = EPOLLIN;
    event.data.fd = server_socket;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_socket, &event) == -1) {
        perror("epoll_ctl");
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[MAX_EVENTS];

    printf("thread id : %ld\n", pthread_self());

    while (1) {
        int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (num_events == -1) {
            printf("epoll_wait");
            continue;
        }
        // 为了使复现更容易, 解除阻塞后当前线程等待 1s
        std::this_thread::sleep_for(std::chrono::seconds(1));
        printf("worker %ld\n", pthread_self());
        for (int i = 0; i < num_events; i++) {
            if (events[i].data.fd == server_socket) {
                int client_socket = accept(server_socket, NULL, NULL);
                if (client_socket <= 0) {
                    printf("failed to accept in thread %ld\n", pthread_self());
                    continue;
                }
                printf("Accepted connection in thread %ld\n", pthread_self());
                close(client_socket);
            }
        }
    }

    close(epoll_fd);
    return NULL;
}

int main() {
    int server_socket;
    struct sockaddr_in server_addr;
    pthread_t threads[NUM_THREADS];

    // 创建套接字并绑定到指定端口
    server_socket = socket(AF_INET, SOCK_STREAM, 0);
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(PORT);
    server_addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(server_socket, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("Bind failed");
        exit(EXIT_FAILURE);
    }

    listen(server_socket, 5);
    printf("Listening on port %d\n", PORT);

    // 监听描述符设置为非阻塞, 即 accept 调用非阻塞
    setnonblocking(server_socket);

    // 创建多个线程
    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_create(&threads[i], NULL, epoll_thread, &server_socket);
    }

    // 等待线程结束
    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }

    close(server_socket);
    return 0;
}

使用 nc 工具进行测试,得到结果如下:

# (step1) 窗口 1 编译运行程序, 监听 8060 端口
$ g++ multi_thread.cpp -lpthread
$ ./a.out
Listening on port 8060
thread id : 139865397454592
thread id : 139865389061888
thread id : 139865372276480
thread id : 139865380669184
thread id : 139865363883776

# (step2) 窗口 2 使用 nc 工具连接本地 8060 端口
$ nc 127.0.0.1 8060

# (result) 窗口 1 可以看到所有线程都从 epoll_wait 调用唤醒, 对同一个
# 监听描述符调用 accept, 只有一个线程获取连接成功
worker 139865363883776
Accepted connection in thread 139865363883776
worker 139865380669184
failed to accept in thread 139865380669184
worker 139865397454592
failed to accept in thread 139865397454592
worker 139865372276480
failed to accept in thread 139865372276480
worker 139865389061888
failed to accept in thread 139865389061888

对于这种情况下的惊群效应,常见的解决方案有三种:

  1. SO_REUSEPORT
  2. EPOLLEXCLUSIVE
  3. 通过锁机制解决(Nginx 默认的解决方案)

SO_REUSEPORT

SO_REUSEPORT 是一种套接字选项,在 Linux 3.9 引入,通常用于 TCP 和 UDP 套接字,用于实现端口复用。这个选项的主要作用是允许多个套接字绑定到同一个 IP 地址和端口上,以便多个进程或线程可以同时监听同一个端口,而不会导致端口冲突或竞争条件。以下是 SO_REUSEPORT 选项的主要用途和好处:

  1. 负载均衡:SO_REUSEPORT 允许多个进程或线程同时监听相同的 IP 地址和端口,这在负载均衡方面非常有用。多个套接字可以分担传入连接的负载,从而提高系统的性能和可伸缩性。
  2. 高可用性:通过允许多个进程或线程监听相同的端口,SO_REUSEPORT 可以提高系统的可用性。如果一个进程或线程失败,其他套接字仍然可以接受连接,从而保持服务的可用性。
  3. 避免端口耗尽:在一些高负载场景下,可能会迅速耗尽可用的端口。SO_REUSEPORT 允许多个套接字绑定到相同的端口,延长了端口的可用寿命,减少了端口耗尽的风险。
  4. 并发性:SO_REUSEPORT 使多个进程或线程能够并发地监听相同的端口,而不会出现资源竞争问题,从而提高了并发性和性能。

使用上,主要是在 bind 之前设置套接字的选项,如下:

// 设置SO_REUSEPORT选项
int opt = 1;
if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == -1) {
    perror("setsockopt");
    close(server_sock);
    exit(1);
}

SO_REUSEPORT 选项可以减轻惊群问题,但并不是一个完全的解决方案。在 Nginx 1.9.1 中添加了对此选项的支持,通过在 listen 监听的端口后面添加 reuseport 参数指定使用 SO_REUSEPORT 选项。此参数指示为每个工作进程创建一个单独的侦听套接字,允许内核在工作进程之间分发传入连接,开启了此选项就不会使用 Nginx 默认的锁机制了。关于此参数的详细分析,见文章 Socket Sharding in NGINX OSS Release 1.9.1

EPOLLEXCLUSIVE

EPOLLEXCLUSIVE 是一种 epoll 事件标志,在 Linux 4.5 引入,它可以用于增强 epoll 在并发编程中的控制。EPOLLEXCLUSIVE 标志的主要作用是将某个文件描述符(或套接字)标记为 “独占模式”,以确保只有一个线程或进程可以处理它的事件。这可以用于解决惊群问题或避免多个线程或进程同时处理同一个事件。
具体来说,EPOLLEXCLUSIVE 标志的作用如下:

  1. 避免惊群问题:在使用 epoll 时,通常多个线程或进程可以同时等待相同的文件描述符上的事件,当事件发生时,多个线程或进程都会被唤醒,然后竞争处理事件。这可能导致不必要的竞争和资源浪费。使用 EPOLLEXCLUSIVE 标志可以确保只有一个线程或进程能够处理事件,从而避免了惊群问题。
  2. 提高效率:在某些情况下,如果多个线程或进程都试图处理相同的事件,可能会导致性能下降,因为它们会相互竞争。使用 EPOLLEXCLUSIVE 可以避免这种竞争,提高效率。

使用 EPOLLEXCLUSIVE 标志时,需要在调用 epoll_ctl 函数注册文件描述符时将其设置为标志之一。例如:

struct epoll_event event;
event.events = EPOLLIN | EPOLLEXCLUSIVE; // 设置EPOLLEXCLUSIVE标志
event.data.fd = sockfd; // 套接字文件描述符

if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event) == -1) {
    perror("epoll_ctl");
    exit(1);
}

accept_mutex

Nginx 默认通过锁机制来解决惊群问题,通过在 event 块配置 accept_mutex 指令来决定是否开启,默认是开启的。在 master 进程初始化的过程中,会判断这个指令是否开启来给全局标志赋值,以便在 worker 进程中进行判断:

static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
    ngx_uint_t           m, i;
    ngx_event_t         *rev, *wev;
    ngx_listening_t     *ls;
    ngx_connection_t    *c, *next, *old;
    ngx_core_conf_t     *ccf;
    ngx_event_conf_t    *ecf;
    ngx_event_module_t  *module;

    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
    ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);

    if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
        ngx_use_accept_mutex = 1;
        ngx_accept_mutex_held = 0;
        ngx_accept_mutex_delay = ecf->accept_mutex_delay;

    } else {
        ngx_use_accept_mutex = 0;
    }

    ...
}

可见,当开启多进程模式(worker 进程数大于 1)且 accept_mutex 参数开启时,将 ngx_use_accept_mutex 标志设为 1 表示使用锁机制,ngx_accept_mutex_held 变量表示是否获取锁,初始为 0 表示未获取,ngx_accept_mutex_delay 表示 epoll_wait 调用的超时时间,超时更快,那么也就更频繁地从阻塞中跳出来,也就有更多的机会去争抢到锁,可以通过 accept_mutex_delay 指令设置,默认为 500 毫秒。
为了更好地负载均衡,Nginx 的 worker 进程只有当空闲连接数不低于最大连接数的 1/8 时才会尝试获取锁,在 accept 处理函数中设置这个值,用 ngx_accept_disabled 全局变量表示:

void
ngx_event_accept(ngx_event_t *ev)
{
    ...

	ngx_accept_disabled = ngx_cycle->connection_n / 8
                              - ngx_cycle->free_connection_n;

    ...
}

在 worker 进程的事件处理函数中,判断是否启用锁机制,如果 ngx_accept_disabled 大于 0 表示当前进程超负荷,将其值自减让出这次竞争机会,否则就尝试获取锁,如果获取到了设置 NGX_POST_EVENT 表示后续由当前进程来处理新的连接。

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
	...
    
    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    if (!ngx_queue_empty(&ngx_posted_next_events)) {
        ngx_event_move_posted_next(cycle);
        timer = 0;
    }

    delta = ngx_current_msec;

    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    ngx_event_expire_timers();

    ngx_event_process_posted(cycle, &ngx_posted_events);
}

Nginx 设计了两个队列,ngx_posted_accept_events 存放新连接事件,ngx_posted_events 存放普通事件,对于设置了 NGX_POST_EVENTS 标志的事件,如果是 accept 事件就将其添加到 ngx_posted_accept_events 队列中,否则添加到 ngx_posted_events 队列中,这是为了尽可能减少锁持有的时间,将事件统一在释放锁后进行处理。若没有设置 NGX_POST_EVENTS 如未开启锁机制,则直接调用 accept 处理函数,这里的 handler 对于 epoll 来说就是 ngx_event_accept 处理函数,它接受连接并将其包装成 ngx_connection_t 结构。

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    ...
    
    for (i = 0; i < events; i++) {
    	...
        
        if ((revents & EPOLLIN) && rev->active) {
            ...

            if (flags & NGX_POST_EVENTS) {
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                ngx_post_event(rev, queue);

            } else {
                rev->handler(rev);
            }
        }

        ...
    }

    return NGX_OK;
}