Nginx线程池
介绍
nginx 一直采用多进程和 I/O 多路复用模型,在大部分场景下都有优秀的性能表现,但有时候耗时的阻塞操作会对性能产生一定的影响,例如从磁盘中读写大文件。因此在 1.7.11
版本 nginx 引入了线程池模块,用于处理耗时的阻塞操作,目前主要是文件读写、sendfile
。具体的应用场景和性能分析可以参考以下文章:
Nginx 线程池实现
- 使用单链表实现队列结构,有一个等待队列和完成队列分别用于存放还没执行的任务和执行完毕的任务,之所以设计了一个完成队列是为了支持当任务完成后执行相应的回调函数
- 数据结构内存分配使用内存池
- 使用自旋锁优化并发操作性能,在临界区比较短的时候使用自旋锁可以得到比互斥锁更优的性能,在线程池中主要用在操作完成队列的时候
- 子线程
deatch
主线程不用join
,线程池销毁通过向队列中提交任务完成
Nginx 线程池数据结构
线程任务结构
- 每一个线程任务都拥有一个
id
用于标识 - 由于使用链表实现任务独立,因此每个任务拥有指向下一个任务的指针
next
- 任务所要执行的函数用
handler
表示,其中函数的入参用ctx
表示 - 最后一个
event
用于描述任务的类型等,还能设置事件回调,当任务完成后调用struct ngx_thread_task_s { ngx_thread_task_t *next; ngx_uint_t id; void *ctx; void (*handler)(void *data, ngx_log_t *log); ngx_event_t event; };
线程池任务队列结构
- 使用一个
first
一级指针指向队列头节点 - 二级指针
last
指向指针变量的地址,这个指针变量为最后一个节点的next
指针变量地址 - 这样设计的好处的不用创建哨兵节点,且可以实现
O(1)
时间复杂度往队尾插入节点,只需要通过last
改变指针变量所指向节点的地址为新节点地址即可 - 初始时
first
由于指向空,故last
指向first
的地址,新增节点task
只需要两步操作:*last = task; task = &task->next;
typedef struct { ngx_thread_task_t *first; ngx_thread_task_t **last; } ngx_thread_pool_queue_t; #define ngx_thread_pool_queue_init(q) \ (q)->first = NULL; \ (q)->last = &(q)->first
线程池结构
- 互斥锁
mtx
用于安全从任务队列queue
中取任务,互斥锁带错误检查,可以防止在同一个线程获取锁两次而造成死锁的问题 - 当前任务队列中的任务数量用
waiting
表示,其不能超过max_queue
最大队列大小限制 - 当新增一个任务时,信号量
cond
用于通知工作线程去抢夺任务 - 线程池的名字用
name
标识,默认为default
- 线程池中的线程池数量用
threads
表示struct ngx_thread_pool_s { ngx_thread_mutex_t mtx; ngx_thread_pool_queue_t queue; ngx_int_t waiting; ngx_thread_cond_t cond; ngx_log_t *log; ngx_str_t name; ngx_uint_t threads; ngx_int_t max_queue; u_char *file; ngx_uint_t line; };
Nginx 线程池操作
线程池初始化
- 初始化任务队列和线程,将线程属性设置为分离(
detach
) - 创建指定数量线程去执行周期任务
ngx_thread_pool_cycle
static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool) { int err; pthread_t tid; ngx_uint_t n; pthread_attr_t attr; if (ngx_notify == NULL) { ngx_log_error(NGX_LOG_ALERT, log, 0, "the configured event method cannot be used with thread pools"); return NGX_ERROR; } ngx_thread_pool_queue_init(&tp->queue); if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) { return NGX_ERROR; } if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { (void) ngx_thread_mutex_destroy(&tp->mtx, log); return NGX_ERROR; } tp->log = log; err = pthread_attr_init(&attr); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_attr_init() failed"); return NGX_ERROR; } err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_attr_setdetachstate() failed"); return NGX_ERROR; } #if 0 err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_attr_setstacksize() failed"); return NGX_ERROR; } #endif for (n = 0; n < tp->threads; n++) { err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_create() failed"); return NGX_ERROR; } } (void) pthread_attr_destroy(&attr); return NGX_OK; }
线程周期任务
- 为了防止信号使线程异常终止,将
SIGILL
、SIGFPE
、SIGSEGV
和SIGBUS
之外的所有信号都屏蔽 - 对于每个工作线程来说,首先尝试获取任务队列互斥锁,然后将等待的任务数量减一(由于队列可能为空,故这里有可能为负数)
- 然后使用条件变量等待任务队列不为空,如果等待操作报错则解锁后直接退出,同时为了避免虚假唤醒,判断条件使用
while
而不是if
- 取出队列头部元素(移动
first
指针),若队列为空,需要重设last
指针变量(重要细节) - 解锁后再执行任务(常见优化方式,执行任务时不需要持有锁)
- 获取自旋锁,然后将任务添加到完成队列中,最后解锁即可,这里有个细节,在解锁操作前加了一个内存屏障,这是因为解锁操作实际上是一个原子变量赋值运算宏,为了防止
cpu
指令重排将赋值操作放到完成队列操作之前,需要加上内存屏障,保证解锁操作在完成队列操作之后 - 通知事件循环去执行任务完成后的回调函数
static void * ngx_thread_pool_cycle(void *data) { ngx_thread_pool_t *tp = data; int err; sigset_t set; ngx_thread_task_t *task; #if 0 ngx_time_update(); #endif ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0, "thread in pool \"%V\" started", &tp->name); sigfillset(&set); sigdelset(&set, SIGILL); sigdelset(&set, SIGFPE); sigdelset(&set, SIGSEGV); sigdelset(&set, SIGBUS); err = pthread_sigmask(SIG_BLOCK, &set, NULL); if (err) { ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed"); return NULL; } for ( ;; ) { if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { return NULL; } /* the number may become negative */ tp->waiting--; while (tp->queue.first == NULL) { if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log) != NGX_OK) { (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); return NULL; } } task = tp->queue.first; tp->queue.first = task->next; if (tp->queue.first == NULL) { tp->queue.last = &tp->queue.first; } if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) { return NULL; } #if 0 ngx_time_update(); #endif ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, "run task #%ui in thread pool \"%V\"", task->id, &tp->name); task->handler(task->ctx, tp->log); ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, "complete task #%ui in thread pool \"%V\"", task->id, &tp->name); task->next = NULL; ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); *ngx_thread_pool_done.last = task; ngx_thread_pool_done.last = &task->next; ngx_memory_barrier(); ngx_unlock(&ngx_thread_pool_done_lock); (void) ngx_notify(ngx_thread_pool_handler); } }
任务完成事件回调
- 将完成队列的任务都取出来,然后重置完成队列(自旋锁临界区完成)
- 遍历队列,将事件置为完成状态且任务不活跃(不是正在执行或处于等待队列中)
- 执行事件回调函数
static void ngx_thread_pool_handler(ngx_event_t *ev) { ngx_event_t *event; ngx_thread_task_t *task; ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler"); ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); task = ngx_thread_pool_done.first; ngx_thread_pool_done.first = NULL; ngx_thread_pool_done.last = &ngx_thread_pool_done.first; ngx_memory_barrier(); ngx_unlock(&ngx_thread_pool_done_lock); while (task) { ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "run completion handler for task #%ui", task->id); event = &task->event; task = task->next; event->complete = 1; event->active = 0; event->handler(event); } }
线程池销毁
- 向线程池中推送一个终止当前线程的任务,同时传入一个变量
lock
标识线程终止任务是否被执行(0:执行完毕,1:未执行) - 主线程不能使用
join
等待线程结束,因为线程的detach
的,因此用while
循环轮询检查标志变量,为了缓解cpu
轮询产生的开销,每次循环都会重新调度一下线程 - 任务完成后将
active
重置为0
复用,因为执行这个任务后线程就会退出,不会进入完成队列执行事件回调将active
状态更新为0
,所以需要手动更新 - 销毁线程池的条件变量和互斥锁
static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp) { ngx_uint_t n; ngx_thread_task_t task; volatile ngx_uint_t lock; ngx_memzero(&task, sizeof(ngx_thread_task_t)); task.handler = ngx_thread_pool_exit_handler; task.ctx = (void *) &lock; for (n = 0; n < tp->threads; n++) { lock = 1; if (ngx_thread_task_post(tp, &task) != NGX_OK) { return; } while (lock) { ngx_sched_yield(); } task.event.active = 0; } (void) ngx_thread_cond_destroy(&tp->cond, tp->log); (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log); }
线程终止函数
- 将传入的标志变量更新为
0
,表示此任务被执行 - 使用
pthread_exit
使当前线程退出(注意与return
的区别)static void ngx_thread_pool_exit_handler(void *data, ngx_log_t *log) { ngx_uint_t *lock = data; *lock = 0; pthread_exit(0); }
创建线程池任务结构
- 使用内存池分配内存,
size
为存储上下文ctx
所需的内存大小,即线程任务的入参(这部分内存无需用户分配)ngx_thread_task_t * ngx_thread_task_alloc(ngx_pool_t *pool, size_t size) { ngx_thread_task_t *task; task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size); if (task == NULL) { return NULL; } task->ctx = task + 1; return task; }
为线程池添加任务
- 已经处于活跃状态(
active
为1
)的任务不允许添加 - 获取任务队列互斥锁,判断当前等待执行的任务数是否小于最大数量限制,如果超限则释放锁并返回错误
- 将任务置为活跃状态,为其分配任务 id,初始时
next
为空 - 唤醒一个等待的工作线程并将任务添加到队列中,等待任务数
waiting
增加,最后释放互斥锁(这里唤醒和添加到队列的操作先后关系无所谓,因为当前线程持有互斥锁,唤醒的线程需要阻塞等待锁释放)ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task) { if (task->event.active) { ngx_log_error(NGX_LOG_ALERT, tp->log, 0, "task #%ui already active", task->id); return NGX_ERROR; } if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { return NGX_ERROR; } if (tp->waiting >= tp->max_queue) { (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); ngx_log_error(NGX_LOG_ERR, tp->log, 0, "thread pool \"%V\" queue overflow: %i tasks waiting", &tp->name, tp->waiting); return NGX_ERROR; } task->event.active = 1; task->id = ngx_thread_pool_task_id++; task->next = NULL; if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); return NGX_ERROR; } *tp->queue.last = task; tp->queue.last = &task->next; tp->waiting++; (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, "task #%ui added to thread pool \"%V\"", task->id, &tp->name); return NGX_OK; }
Nginx 线程池模块运作
nginx 可以为多个 location
配置各自的线程池,默认的线程池名字为 default
,线程数 32
,最大队列大小为 65536
。可以根据需要自己添加线程池,即一个 worker
进程可以有多个线程池,因此 nginx 需要读取配置,然后初始化线程池数组。然后模块加载时依次初始化这些线程池,卸载时停止这些线程池。
线程池数组
- 使用 nginx 内置的数组,支持动态扩容且使用内存池分配
typedef struct { ngx_array_t pools; } ngx_thread_pool_conf_t;
定义模块加载时的指令和上下文
- 设置创建和初始化线程池数组的方法
thread_pool
为配置标识,解析的时候用于标志,如thread_pool default threads=32 max_queue=65536
static ngx_command_t ngx_thread_pool_commands[] = { { ngx_string("thread_pool"), NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23, ngx_thread_pool, 0, 0, NULL }, ngx_null_command }; static ngx_core_module_t ngx_thread_pool_module_ctx = { ngx_string("thread_pool"), ngx_thread_pool_create_conf, ngx_thread_pool_init_conf };
定义模块工作流
- 指定了模块上下文、指令以及
worker
进程初始化和退出时要执行的操作ngx_module_t ngx_thread_pool_module = { NGX_MODULE_V1, &ngx_thread_pool_module_ctx, /* module context */ ngx_thread_pool_commands, /* module directives */ NGX_CORE_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ ngx_thread_pool_init_worker, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ ngx_thread_pool_exit_worker, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING };
解析配置
- 取配置参数数组的第一项的
value[1]
(即线程池名字),将其添加到线程池数组中 - 如果线程数存在,说明之前添加过这个线程池了(新加的线程池线程池为
0
) - 设置默认队列大小为
65536
,这样如果用户没有设置,就使用默认的队列大小 - 从第三个参数开始遍历,即线程数和队列大小,如果没有指定线程数或者线程数不正确则报错
static char * ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_str_t *value; ngx_uint_t i; ngx_thread_pool_t *tp; value = cf->args->elts; tp = ngx_thread_pool_add(cf, &value[1]); if (tp == NULL) { return NGX_CONF_ERROR; } if (tp->threads) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "duplicate thread pool \"%V\"", &tp->name); return NGX_CONF_ERROR; } tp->max_queue = 65536; for (i = 2; i < cf->args->nelts; i++) { if (ngx_strncmp(value[i].data, "threads=", 8) == 0) { tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8); if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "invalid threads value \"%V\"", &value[i]); return NGX_CONF_ERROR; } continue; } if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) { tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); if (tp->max_queue == NGX_ERROR) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "invalid max_queue value \"%V\"", &value[i]); return NGX_CONF_ERROR; } continue; } } if (tp->threads == 0) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "\"%V\" must have \"threads\" parameter", &cmd->name); return NGX_CONF_ERROR; } return NGX_CONF_OK; }
根据线程池名添加线程池
- 如果线程池已存在则直接返回,否则新分配一个线程池并加入到线程池数组中
ngx_thread_pool_t * ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name) { ngx_thread_pool_t *tp, **tpp; ngx_thread_pool_conf_t *tcf; if (name == NULL) { name = &ngx_thread_pool_default; } tp = ngx_thread_pool_get(cf->cycle, name); if (tp) { return tp; } tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t)); if (tp == NULL) { return NULL; } tp->name = *name; tp->file = cf->conf_file->file.name.data; tp->line = cf->conf_file->line; tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_thread_pool_module); tpp = ngx_array_push(&tcf->pools); if (tpp == NULL) { return NULL; } *tpp = tp; return tp; }
根据线程池名获取线程池
- 遍历线程池数组,检查有无和名字匹配的线程池
ngx_thread_pool_t * ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name) { ngx_uint_t i; ngx_thread_pool_t **tpp; ngx_thread_pool_conf_t *tcf; tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_thread_pool_module); tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { if (tpp[i]->name.len == name->len && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0) { return tpp[i]; } } return NULL; }
创建线程池配置
- 数组的大小初始化为
4
,每个元素为线程池指针static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle) { ngx_thread_pool_conf_t *tcf; tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t)); if (tcf == NULL) { return NULL; } if (ngx_array_init(&tcf->pools, cycle->pool, 4, sizeof(ngx_thread_pool_t *)) != NGX_OK) { return NULL; } return tcf; }
初始化线程池配置
- 遍历线程池数组,已经设置过具体配置的线程池无需配置,未设置过的线程池如果名字和默认名字相同,则使用默认配置
static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf) { ngx_thread_pool_conf_t *tcf = conf; ngx_uint_t i; ngx_thread_pool_t **tpp; tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { if (tpp[i]->threads) { continue; } if (tpp[i]->name.len == ngx_thread_pool_default.len && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data, ngx_thread_pool_default.len) == 0) { tpp[i]->threads = 32; tpp[i]->max_queue = 65536; continue; } ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "unknown thread pool \"%V\" in %s:%ui", &tpp[i]->name, tpp[i]->file, tpp[i]->line); return NGX_CONF_ERROR; } return NGX_CONF_OK; }
工作进程初始化
- 如果不是
worker
进程或多进程则不使用线程池模块工作 - 解析配置文件得到线程池数组
- 初始化线程池完成队列
- 根据配置初始化线程池
static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle) { ngx_uint_t i; ngx_thread_pool_t **tpp; ngx_thread_pool_conf_t *tcf; if (ngx_process != NGX_PROCESS_WORKER && ngx_process != NGX_PROCESS_SINGLE) { return NGX_OK; } tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_thread_pool_module); if (tcf == NULL) { return NGX_OK; } ngx_thread_pool_queue_init(&ngx_thread_pool_done); tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { return NGX_ERROR; } } return NGX_OK; }
工作进程终止
- 遍历线程池数组并依次销毁
static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle) { ngx_uint_t i; ngx_thread_pool_t **tpp; ngx_thread_pool_conf_t *tcf; if (ngx_process != NGX_PROCESS_WORKER && ngx_process != NGX_PROCESS_SINGLE) { return; } tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_thread_pool_module); if (tcf == NULL) { return; } tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { ngx_thread_pool_destroy(tpp[i]); } }
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.