基于原子操作、信号量以及文件锁,Nginx 在更高层次封装了一个互斥锁,当不支持原子操作时,会使用文件锁来实现,支持原子操作却又不支持信号量时,使用自旋锁的方式实现,支持信号量时使用信号量的方式实现。
当 Nginx 判断当前操作系统支持原子变量时,将会优先使用原子变量实现的方法(即原子变量锁的优先级高于文件锁)。不过,同时还需要判断其是否支持信号量,因为支持信号量后进程有可能进入睡眠状态。
注意:文件锁只能用于多进程直接的互斥,因为一个进程只能持有一个文件锁,因此文件锁不适用于多线程。
fcntl 文档
sem_init 文档
flock 文档

Nginx 互斥锁结构

  1. 支持原子操作(定义了 NGX_HAVE_ATOMIC_OPS 宏)
    1. lock 为原子变量锁
    2. 支持信号量时(定义了 NGX_HAVE_POSIX_SEM 宏)
      1. wait 表示等待进程的数量
      2. semaphore 表示是否使用信号量
      3. sem 为信号量锁
    3. spin 大于 0 时表示自旋等待其他处理器的时间,如果为 0 或负值则不存在 pause 的机会,在 Nginx 中当 spin 值为 (ngx_uint_t) -1 时,相当于告诉这个互斥锁绝对不要使用信号量使得进程进入睡眠状态
  2. 不支持原子操作
    1. 使用文件锁时 fd 表示使用的文件句柄
    2. name 表示文件名
    3. spin 在文件锁实现中无意义
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
    ngx_atomic_t  *lock;
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t  *wait;
    ngx_uint_t     semaphore;
    sem_t          sem;
#endif
#else
    ngx_fd_t       fd;
    u_char        *name;
#endif
    ngx_uint_t     spin;
} ngx_shmtx_t;

在创建互斥锁锁时会传入一个 ngx_shmtx_sh_t 结构体指针,主要是用于将其赋值给 ngx_shmtx_t 的相应成员,其结构如下:

typedef struct {
    ngx_atomic_t   lock;
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t   wait;
#endif
} ngx_shmtx_sh_t;

文件锁实现

互斥锁初始化(ngx_shmtx_create)

如果互斥锁 name 有值且与参数 name 相同,则表示已经初始化过了,否则先销毁原来的互斥锁再重新分配。按照 name 指定的路径创建并打开这个文件。由于只需要这个文件在内核中的 INODE 信息,所以可以把文件删除,只要 fd 可用就行

ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    if (mtx->name) {

        if (ngx_strcmp(name, mtx->name) == 0) {
            mtx->name = name;
            return NGX_OK;
        }

        ngx_shmtx_destroy(mtx);
    }

    mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN,
                            NGX_FILE_DEFAULT_ACCESS);

    if (mtx->fd == NGX_INVALID_FILE) {
        ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno,
                      ngx_open_file_n " \"%s\" failed", name);
        return NGX_ERROR;
    }

    if (ngx_delete_file(name) == NGX_FILE_ERROR) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      ngx_delete_file_n " \"%s\" failed", name);
    }

    mtx->name = name;

    return NGX_OK;
}

互斥锁销毁(ngx_shmtx_destroy)

销毁互斥锁只需要关闭对应的文件描述符即可。

void
ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
    if (ngx_close_file(mtx->fd) == NGX_FILE_ERROR) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      ngx_close_file_n " \"%s\" failed", mtx->name);
    }
}

非阻塞式获取互斥锁(ngx_shmtx_trylock)

调用了封装的文件锁操作,获取到了返回 1,否则返回 0。

ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    ngx_err_t  err;

    err = ngx_trylock_fd(mtx->fd);

    if (err == 0) {
        return 1;
    }

    if (err == NGX_EAGAIN) {
        return 0;
    }

#if __osf__ /* Tru64 UNIX */

    if (err == NGX_EACCES) {
        return 0;
    }

#endif

    ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name);

    return 0;
}

非阻塞获取文件锁的实现如下,通过 fcntl 调用获取文件锁,使用 F_SETLK 时如果互斥锁已经被其他进程占用,则不会等待其他进程释放锁而是立即返回获取文件锁失败。

ngx_err_t
ngx_trylock_fd(ngx_fd_t fd)
{
    struct flock  fl;

    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;

    if (fcntl(fd, F_SETLK, &fl) == -1) {
        return ngx_errno;
    }

    return 0;
}

阻塞式获取互斥锁(ngx_shmtx_lock)

调用了封装的文件锁操作,err 返回 0 表示成功获取,否则表示错误。

void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
    ngx_err_t  err;

    err = ngx_lock_fd(mtx->fd);

    if (err == 0) {
        return;
    }

    ngx_log_abort(err, ngx_lock_fd_n " %s failed", mtx->name);
}

阻塞获取文件锁的实现如下,设置 F_SETLKW 时锁被占用后 fcntl 方法会一直等待,在其他进程没有释放锁时,当前进程就会阻塞在 fcntl 方法中,这种阻塞会导致当前进程由可执行状态转为睡眠状态。

ngx_err_t
ngx_lock_fd(ngx_fd_t fd)
{
    struct flock  fl;

    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;

    if (fcntl(fd, F_SETLKW, &fl) == -1) {
        return ngx_errno;
    }

    return 0;
}

释放互斥锁(ngx_shmtx_unlock)

调用了封装的文件锁操作,err 返回 0 表示成功获取,否则表示错误。

void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    ngx_err_t  err;

    err = ngx_unlock_fd(mtx->fd);

    if (err == 0) {
        return;
    }

    ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name);
}

在调用 fcntl 时传入的 struct flock 中设置其 l_type 成员为 F_UNLCK 表示解锁即可。

ngx_err_t
ngx_unlock_fd(ngx_fd_t fd)
{
    struct flock  fl;

    ngx_memzero(&fl, sizeof(struct flock));
    fl.l_type = F_UNLCK;
    fl.l_whence = SEEK_SET;

    if (fcntl(fd, F_SETLK, &fl) == -1) {
        return  ngx_errno;
    }

    return 0;
}

原子变量实现

互斥锁初始化(ngx_shmtx_create)

spin 值为 -1 时,表示不能使用信号量,这时直接返回成功。否则设置 spin 的默认值为 2048,如果支持信号量还需要初始化 sem 为 0 并设置 semaphore 为 1 表示使用信号量。

ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;

    if (mtx->spin == (ngx_uint_t) -1) {
        return NGX_OK;
    }

    mtx->spin = 2048;

#if (NGX_HAVE_POSIX_SEM)

    mtx->wait = &addr->wait;

    if (sem_init(&mtx->sem, 1, 0) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      "sem_init() failed");
    } else {
        mtx->semaphore = 1;
    }

#endif

    return NGX_OK;
}

互斥锁销毁(ngx_shmtx_destroy)

只需要处理支持信号量时的情况,调用 sem_destroy 销毁信号量即可。

void
ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)

    if (mtx->semaphore) {
        if (sem_destroy(&mtx->sem) == -1) {
            ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                          "sem_destroy() failed");
        }
    }

#endif
}

非阻塞式获取互斥锁(ngx_shmtx_trylock)

如果原子变量锁没有被持有并且 ngx_atomic_cmp_set 原子操作成功将其值设置为了 ngx_pid 即当前进程的 pid 则返回 1 表示获取成功,否则返回 0 表示获取失败。

ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}

阻塞式获取互斥锁(ngx_shmtx_lock)

首先外部是一个无限循环,表示未获取到互斥锁不会退出。循环的开始尝试获取互斥锁,获取成功则直接返回。否则继续往下执行,如果是多核处理器,执行 ngx_cpu_pause 可以暂停正在使用的 CPU 处理器,看其他处理器上的进程是否会释放锁,这会减少进程间切换的次数。当然,如果 spin 的值为 (ngx_uint_t) -1 则不会执行这部分,这适用于 accept_mutex 这种需要频繁使用的场景。
如果支持信号量且 semaphore 标志开启,则先增加 wait 的值表示当前等待的进程数增加,然后再次尝试获取锁,如果获取到就将 wait 计数器减一。如果还是没获取到就循环调用 sem_wait 等待信号量释放,信号量 sem 初始时为 0,调用 sem_wait 方法将会把信号量 sem 的值减一,如果调用前 sem 的值小于或等于 0,则阻塞住当前进程(进程会进入睡眠状态),直到其他进程将信号量的值增加到整数后,才能继续通过将 sem 减一而使得当前进程继续向下执行(即等待 sem_post 的调用)。
在无限循环的最后调用 ngx_sched_yield 暂时让出处理器,使得处理器优先调度其他可执行状态的进程。

void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
    ngx_uint_t         i, n;

    ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");

    for ( ;; ) {

        if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
            return;
        }

        if (ngx_ncpu > 1) {

            for (n = 1; n < mtx->spin; n <<= 1) {

                for (i = 0; i < n; i++) {
                    ngx_cpu_pause();
                }

                if (*mtx->lock == 0
                    && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
                {
                    return;
                }
            }
        }

#if (NGX_HAVE_POSIX_SEM)

        if (mtx->semaphore) {
            (void) ngx_atomic_fetch_add(mtx->wait, 1);

            if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
                (void) ngx_atomic_fetch_add(mtx->wait, -1);
                return;
            }

            ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                           "shmtx wait %uA", *mtx->wait);

            while (sem_wait(&mtx->sem) == -1) {
                ngx_err_t  err;

                err = ngx_errno;

                if (err != NGX_EINTR) {
                    ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
                                  "sem_wait() failed while waiting on shmtx");
                    break;
                }
            }

            ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                           "shmtx awoke");

            continue;
        }

#endif

        ngx_sched_yield();
    }
}

释放互斥锁(ngx_shmtx_unlock)

释放互斥锁需要将原子变量的值重置为 0,如果成功还需要调用 ngx_shmtx_wakeup 唤醒处于 set_wait 睡眠状态的进程。

void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    if (mtx->spin != (ngx_uint_t) -1) {
        ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
    }

    if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
        ngx_shmtx_wakeup(mtx);
    }
}

ngx_shmtx_wakeup 的实现如下,当支持信号量且 semaphore 参数启用时,尝试将 wait 计数器减一,如果成功则跳出无限循环。最后调用 sem_post 增加信号量 sem 的值,使得其他阻塞在 sem_wait 的进程能够解除阻塞获取到互斥锁。

static void
ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_uint_t  wait;

    if (!mtx->semaphore) {
        return;
    }

    for ( ;; ) {

        wait = *mtx->wait;

        if ((ngx_atomic_int_t) wait <= 0) {
            return;
        }

        if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {
            break;
        }
    }

    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                   "shmtx wake %uA", wait);

    if (sem_post(&mtx->sem) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      "sem_post() failed while wake shmtx");
    }

#endif
}