Nginx互斥锁
基于原子操作、信号量以及文件锁,Nginx 在更高层次封装了一个互斥锁,当不支持原子操作时,会使用文件锁来实现,支持原子操作却又不支持信号量时,使用自旋锁的方式实现,支持信号量时使用信号量的方式实现。
当 Nginx 判断当前操作系统支持原子变量时,将会优先使用原子变量实现的方法(即原子变量锁的优先级高于文件锁)。不过,同时还需要判断其是否支持信号量,因为支持信号量后进程有可能进入睡眠状态。
注意:文件锁只能用于多进程直接的互斥,因为一个进程只能持有一个文件锁,因此文件锁不适用于多线程。
fcntl 文档
sem_init 文档
flock 文档
Nginx 互斥锁结构
- 支持原子操作(定义了 NGX_HAVE_ATOMIC_OPS 宏)
lock
为原子变量锁- 支持信号量时(定义了 NGX_HAVE_POSIX_SEM 宏)
wait
表示等待进程的数量semaphore
表示是否使用信号量sem
为信号量锁
spin
大于 0 时表示自旋等待其他处理器的时间,如果为 0 或负值则不存在pause
的机会,在 Nginx 中当spin
值为(ngx_uint_t) -1
时,相当于告诉这个互斥锁绝对不要使用信号量使得进程进入睡眠状态
- 不支持原子操作
- 使用文件锁时
fd
表示使用的文件句柄 name
表示文件名spin
在文件锁实现中无意义
- 使用文件锁时
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
ngx_atomic_t *lock;
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_t *wait;
ngx_uint_t semaphore;
sem_t sem;
#endif
#else
ngx_fd_t fd;
u_char *name;
#endif
ngx_uint_t spin;
} ngx_shmtx_t;
在创建互斥锁锁时会传入一个 ngx_shmtx_sh_t
结构体指针,主要是用于将其赋值给 ngx_shmtx_t
的相应成员,其结构如下:
typedef struct {
ngx_atomic_t lock;
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_t wait;
#endif
} ngx_shmtx_sh_t;
文件锁实现
互斥锁初始化(ngx_shmtx_create)
如果互斥锁 name
有值且与参数 name
相同,则表示已经初始化过了,否则先销毁原来的互斥锁再重新分配。按照 name
指定的路径创建并打开这个文件。由于只需要这个文件在内核中的 INODE 信息,所以可以把文件删除,只要 fd
可用就行
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
if (mtx->name) {
if (ngx_strcmp(name, mtx->name) == 0) {
mtx->name = name;
return NGX_OK;
}
ngx_shmtx_destroy(mtx);
}
mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN,
NGX_FILE_DEFAULT_ACCESS);
if (mtx->fd == NGX_INVALID_FILE) {
ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno,
ngx_open_file_n " \"%s\" failed", name);
return NGX_ERROR;
}
if (ngx_delete_file(name) == NGX_FILE_ERROR) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
ngx_delete_file_n " \"%s\" failed", name);
}
mtx->name = name;
return NGX_OK;
}
互斥锁销毁(ngx_shmtx_destroy)
销毁互斥锁只需要关闭对应的文件描述符即可。
void
ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
if (ngx_close_file(mtx->fd) == NGX_FILE_ERROR) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
ngx_close_file_n " \"%s\" failed", mtx->name);
}
}
非阻塞式获取互斥锁(ngx_shmtx_trylock)
调用了封装的文件锁操作,获取到了返回 1,否则返回 0。
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
ngx_err_t err;
err = ngx_trylock_fd(mtx->fd);
if (err == 0) {
return 1;
}
if (err == NGX_EAGAIN) {
return 0;
}
#if __osf__ /* Tru64 UNIX */
if (err == NGX_EACCES) {
return 0;
}
#endif
ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name);
return 0;
}
非阻塞获取文件锁的实现如下,通过 fcntl
调用获取文件锁,使用 F_SETLK 时如果互斥锁已经被其他进程占用,则不会等待其他进程释放锁而是立即返回获取文件锁失败。
ngx_err_t
ngx_trylock_fd(ngx_fd_t fd)
{
struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_WRLCK;
fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &fl) == -1) {
return ngx_errno;
}
return 0;
}
阻塞式获取互斥锁(ngx_shmtx_lock)
调用了封装的文件锁操作,err
返回 0 表示成功获取,否则表示错误。
void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
ngx_err_t err;
err = ngx_lock_fd(mtx->fd);
if (err == 0) {
return;
}
ngx_log_abort(err, ngx_lock_fd_n " %s failed", mtx->name);
}
阻塞获取文件锁的实现如下,设置 F_SETLKW 时锁被占用后 fcntl
方法会一直等待,在其他进程没有释放锁时,当前进程就会阻塞在 fcntl
方法中,这种阻塞会导致当前进程由可执行状态转为睡眠状态。
ngx_err_t
ngx_lock_fd(ngx_fd_t fd)
{
struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_WRLCK;
fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLKW, &fl) == -1) {
return ngx_errno;
}
return 0;
}
释放互斥锁(ngx_shmtx_unlock)
调用了封装的文件锁操作,err
返回 0 表示成功获取,否则表示错误。
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
ngx_err_t err;
err = ngx_unlock_fd(mtx->fd);
if (err == 0) {
return;
}
ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name);
}
在调用 fcntl
时传入的 struct flock
中设置其 l_type
成员为 F_UNLCK 表示解锁即可。
ngx_err_t
ngx_unlock_fd(ngx_fd_t fd)
{
struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_UNLCK;
fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &fl) == -1) {
return ngx_errno;
}
return 0;
}
原子变量实现
互斥锁初始化(ngx_shmtx_create)
当 spin
值为 -1 时,表示不能使用信号量,这时直接返回成功。否则设置 spin
的默认值为 2048,如果支持信号量还需要初始化 sem
为 0 并设置 semaphore
为 1 表示使用信号量。
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
mtx->lock = &addr->lock;
if (mtx->spin == (ngx_uint_t) -1) {
return NGX_OK;
}
mtx->spin = 2048;
#if (NGX_HAVE_POSIX_SEM)
mtx->wait = &addr->wait;
if (sem_init(&mtx->sem, 1, 0) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
"sem_init() failed");
} else {
mtx->semaphore = 1;
}
#endif
return NGX_OK;
}
互斥锁销毁(ngx_shmtx_destroy)
只需要处理支持信号量时的情况,调用 sem_destroy
销毁信号量即可。
void
ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
if (mtx->semaphore) {
if (sem_destroy(&mtx->sem) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
"sem_destroy() failed");
}
}
#endif
}
非阻塞式获取互斥锁(ngx_shmtx_trylock)
如果原子变量锁没有被持有并且 ngx_atomic_cmp_set
原子操作成功将其值设置为了 ngx_pid
即当前进程的 pid 则返回 1 表示获取成功,否则返回 0 表示获取失败。
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}
阻塞式获取互斥锁(ngx_shmtx_lock)
首先外部是一个无限循环,表示未获取到互斥锁不会退出。循环的开始尝试获取互斥锁,获取成功则直接返回。否则继续往下执行,如果是多核处理器,执行 ngx_cpu_pause
可以暂停正在使用的 CPU 处理器,看其他处理器上的进程是否会释放锁,这会减少进程间切换的次数。当然,如果 spin
的值为 (ngx_uint_t) -1
则不会执行这部分,这适用于 accept_mutex
这种需要频繁使用的场景。
如果支持信号量且 semaphore
标志开启,则先增加 wait
的值表示当前等待的进程数增加,然后再次尝试获取锁,如果获取到就将 wait
计数器减一。如果还是没获取到就循环调用 sem_wait
等待信号量释放,信号量 sem
初始时为 0,调用 sem_wait
方法将会把信号量 sem
的值减一,如果调用前 sem
的值小于或等于 0,则阻塞住当前进程(进程会进入睡眠状态),直到其他进程将信号量的值增加到整数后,才能继续通过将 sem
减一而使得当前进程继续向下执行(即等待 sem_post
的调用)。
在无限循环的最后调用 ngx_sched_yield
暂时让出处理器,使得处理器优先调度其他可执行状态的进程。
void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
ngx_uint_t i, n;
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");
for ( ;; ) {
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
return;
}
if (ngx_ncpu > 1) {
for (n = 1; n < mtx->spin; n <<= 1) {
for (i = 0; i < n; i++) {
ngx_cpu_pause();
}
if (*mtx->lock == 0
&& ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
{
return;
}
}
}
#if (NGX_HAVE_POSIX_SEM)
if (mtx->semaphore) {
(void) ngx_atomic_fetch_add(mtx->wait, 1);
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
(void) ngx_atomic_fetch_add(mtx->wait, -1);
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
"shmtx wait %uA", *mtx->wait);
while (sem_wait(&mtx->sem) == -1) {
ngx_err_t err;
err = ngx_errno;
if (err != NGX_EINTR) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
"sem_wait() failed while waiting on shmtx");
break;
}
}
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
"shmtx awoke");
continue;
}
#endif
ngx_sched_yield();
}
}
释放互斥锁(ngx_shmtx_unlock)
释放互斥锁需要将原子变量的值重置为 0,如果成功还需要调用 ngx_shmtx_wakeup
唤醒处于 set_wait
睡眠状态的进程。
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
if (mtx->spin != (ngx_uint_t) -1) {
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
}
if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
ngx_shmtx_wakeup(mtx);
}
}
ngx_shmtx_wakeup
的实现如下,当支持信号量且 semaphore
参数启用时,尝试将 wait
计数器减一,如果成功则跳出无限循环。最后调用 sem_post
增加信号量 sem
的值,使得其他阻塞在 sem_wait
的进程能够解除阻塞获取到互斥锁。
static void
ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
ngx_atomic_uint_t wait;
if (!mtx->semaphore) {
return;
}
for ( ;; ) {
wait = *mtx->wait;
if ((ngx_atomic_int_t) wait <= 0) {
return;
}
if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {
break;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
"shmtx wake %uA", wait);
if (sem_post(&mtx->sem) == -1) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
"sem_post() failed while wake shmtx");
}
#endif
}