For fun I wanted to create a lock implementation that is about as fast as one using futexes, but that uses pipes instead and queues waiters in user space rather than in the kernel. For the queue, the code uses an optimized version of the lock-free algorithm from "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" (http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html). I am not sure all of the optimizations I have made are correct, however. I also made some more traditional implementations for comparison; they are selected with preprocessor macros: NATIVE (a plain pthread mutex), RAW_EVENTFD (a single eventfd per lock), DO_FUTEX (a bare futex), PARTIAL_FUTEX (the user-space queue with futex-backed waiters), and the default build, which is the user-space queue with pipe-backed waiters (or eventfd-backed ones when DO_EVENTFD is defined).
The lock interface is:
/*
* Copyright 2017 Steven Stewart-Gallus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
#ifndef LOCK_H
#define LOCK_H
struct lock;
struct lock *lock_create(void);
void lock_acquire(struct lock *lock);
void lock_release(struct lock *lock);
#endif
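The interface is meant to be used like an ordinary mutex. As a small usage sketch of my own (not one of the files below), guarding a shared counter looks like this:
#include <pthread.h>
#include <stdio.h>
#include "lock.h"

static struct lock *counter_lock;
static unsigned long counter;

static void *bump(void *arg)
{
        (void)arg;
        for (int ii = 0; ii < 100000; ++ii) {
                lock_acquire(counter_lock);
                ++counter;
                lock_release(counter_lock);
        }
        return NULL;
}

int main(void)
{
        pthread_t workers[2U];
        counter_lock = lock_create();
        for (size_t ii = 0U; ii < 2U; ++ii)
                pthread_create(&workers[ii], NULL, bump, NULL);
        for (size_t ii = 0U; ii < 2U; ++ii)
                pthread_join(workers[ii], NULL);
        printf("%lu\n", counter); /* always 200000 with a correct lock */
        return 0;
}
There is no lock_destroy in the interface, so the lock is simply leaked, just as in the test program below.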
The test program is:
/*
* Copyright 2017 Steven Stewart-Gallus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
#define _GNU_SOURCE 1
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include "lock.h"
#define PRODUCER_LOOP 1000000U
#define NUM_THREADS 10U
static void *contend(void * arg);
int main()
{
struct lock *the_lock = lock_create();
pthread_t threads[NUM_THREADS];
for (size_t ii = 0U; ii < NUM_THREADS; ++ii) {
pthread_create(&threads[ii], NULL, contend, the_lock);
}
for (size_t ii = 0U; ii < NUM_THREADS; ++ii) {
void *retval;
pthread_join(threads[ii], &retval);
}
return 0;
}
static void *contend(void * arg)
{
struct lock *lock = arg;
for (size_t ii = 0U; ii < PRODUCER_LOOP; ++ii) {
lock_acquire(lock);
/* Do nothing */
lock_release(lock);
}
return 0;
}
The lock implementation is:
/*
* Copyright 2017 Steven Stewart-Gallus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
#define _GNU_SOURCE 1
#include "lock.h"
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <sched.h>
#include <sys/poll.h>
#include <sys/eventfd.h>
#include <xmmintrin.h>
#include <unistd.h>
#if defined DO_FUTEX || defined PARTIAL_FUTEX
#include <linux/futex.h>
#include <sys/syscall.h>
#endif
#define NOTIFIER_FD_SPIN_LOOPS 40U
#define FUTEX_SPIN_LOOPS 40U
#define NOTIFIER_FUTEX_SPIN_LOOPS 20U
#define NOOP_CONTEND_CYCLES 4U
#define PAUSE_CONTEND_CYCLES 40U
struct node;
#define ALIGN_TO_CACHE _Alignas (16)
#if defined RAW_EVENTFD
struct event {
int fd;
};
#elif defined DO_FUTEX
struct event {
ALIGN_TO_CACHE _Atomic(bool) spinning;
ALIGN_TO_CACHE _Atomic(int) triggered;
};
#else
struct event {
ALIGN_TO_CACHE _Atomic(bool) triggered;
ALIGN_TO_CACHE _Atomic(struct node *) head;
ALIGN_TO_CACHE _Atomic(struct node *) tail;
ALIGN_TO_CACHE _Atomic(uint64_t) head_contention;
ALIGN_TO_CACHE _Atomic(uint64_t) tail_contention;
};
#endif
static void event_init (struct event *event);
static void event_wait (struct event *event);
static void event_signal (struct event *event);
#if defined NATIVE
struct lock {
pthread_mutex_t mutex;
};
#else
struct lock {
atomic_flag locked;
struct event when_unlocked;
};
#endif
#if defined NATIVE
struct lock *lock_create(void)
{
struct lock *lock = aligned_alloc(_Alignof(struct lock), sizeof *lock);
if (!lock)
abort();
pthread_mutex_init (&lock->mutex, 0);
return lock;
}
void lock_acquire(struct lock *lock)
{
pthread_mutex_lock (&lock->mutex);
}
void lock_release(struct lock *lock)
{
pthread_mutex_unlock (&lock->mutex);
}
#else
struct lock *lock_create(void)
{
struct lock *lock = aligned_alloc(_Alignof(struct lock), sizeof *lock);
if (!lock)
abort();
atomic_flag_clear_explicit(&lock->locked, memory_order_relaxed);
event_init (&lock->when_unlocked);
return lock;
}
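/* Try to grab the flag with a test-and-set; whenever it is already held,
   block on the when_unlocked event and try again after a release. */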
void lock_acquire(struct lock *lock)
{
for (;;) {
if (!atomic_flag_test_and_set_explicit (&lock->locked, memory_order_acquire))
break;
event_wait (&lock->when_unlocked);
}
}
void lock_release(struct lock *lock)
{
atomic_flag_clear_explicit (&lock->locked, memory_order_release);
event_signal (&lock->when_unlocked);
}
#endif
#if defined RAW_EVENTFD
static void event_init (struct event *event) {
if (-1 == (event->fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)))
abort();
}
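/* Block until the eventfd becomes readable, then drain its counter with
   non-blocking reads so the next wait really sleeps; read() failing with
   EAGAIN means the counter is back to zero. */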
static void event_wait (struct event *event) {
{
struct pollfd fds[1U] = {{ .fd = event->fd,
.events = POLLIN}};
if (-1 == poll(fds, 1U, -1)) {
abort();
}
}
for (;;) {
uint64_t xx;
if (-1 == read(event->fd, &xx, sizeof xx)) {
if (EAGAIN == errno)
break;
abort();
}
}
}
static void event_signal (struct event *event) {
static uint64_t const one = 1U;
if (-1 == write(event->fd, &one, sizeof one)) {
if (errno != EAGAIN)
abort();
}
}
#elif defined DO_FUTEX
static void event_init (struct event *event) {
event->triggered = false;
event->spinning = false;
}
static void event_wait (struct event *event) {
for (;;) {
atomic_store_explicit(&event->spinning, true, memory_order_release);
atomic_thread_fence(memory_order_acquire);
bool got_triggered = false;
for (size_t ii = 0U; ii < FUTEX_SPIN_LOOPS; ++ii) {
got_triggered |= atomic_exchange_explicit(&event->triggered, false, memory_order_relaxed);
if (got_triggered)
break;
_mm_pause();
}
atomic_thread_fence(memory_order_acq_rel);
atomic_store_explicit(&event->spinning, false, memory_order_release);
if (got_triggered)
break;
if (atomic_exchange_explicit(&event->triggered, false, memory_order_acq_rel))
break;
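/* FUTEX_WAIT sleeps only if triggered still equals 0; if a concurrent
   event_signal has already set it, the kernel returns immediately. */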
syscall(__NR_futex, &event->triggered,
FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
}
}
static void event_signal (struct event *event) {
/* Already signaled */
bool already_signaled = atomic_exchange_explicit (&event->triggered, 1, memory_order_acq_rel);
bool still_spinning = atomic_load_explicit(&event->spinning, memory_order_acquire);
if (already_signaled || still_spinning)
return;
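/* Wake at most one thread sleeping in FUTEX_WAIT on triggered. */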
syscall(__NR_futex, &event->triggered, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
}
#else
struct notifier;
struct node {
ALIGN_TO_CACHE struct notifier * trigger;
ALIGN_TO_CACHE _Atomic(struct node *) next;
};
static ALIGN_TO_CACHE _Atomic(uint64_t) free_list_contention = 0U;
static ALIGN_TO_CACHE _Atomic(struct node *) free_list = NULL;
static ALIGN_TO_CACHE _Atomic(uint64_t) notifier_free_list_contention = 0U;
static ALIGN_TO_CACHE _Atomic(struct notifier *) notifier_free_list = NULL;
/* All used once so inline */
static inline void enqueue (struct event *event, struct notifier *notifier);
static inline struct notifier * dequeue (struct event *event);
static inline struct node * allocate (void);
static inline void deallocate (struct node *node);
static inline struct notifier * allocate_notifier (void);
static inline void deallocate_notifier (struct notifier *node);
static inline void notifier_signal(struct notifier *notifier);
static inline void notifier_wait(struct notifier *notifier);
/* Very small so inline */
static inline void on_failure(_Atomic(uint64_t) *contention);
static inline void on_success(_Atomic(uint64_t) *contention);
static void event_init (struct event *event) {
struct node *n = aligned_alloc(_Alignof (struct node), sizeof *n);
if (!n)
abort();
n->next = NULL;
event->head = n;
event->tail = n;
event->head_contention = 0U;
event->tail_contention = 0U;
}
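/* Fast path: consume a pending wakeup and return without blocking.
   Slow path: enqueue a per-waiter notifier and block on it until a
   signaller dequeues it and wakes us. */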
static void event_wait (struct event *event) {
if (atomic_exchange_explicit (&event->triggered, false, memory_order_acq_rel))
return;
struct notifier *notifier = allocate_notifier();
enqueue (event, notifier);
notifier_wait(notifier);
deallocate_notifier (notifier);
}
static void event_signal (struct event *event) {
atomic_store_explicit (&event->triggered, true, memory_order_release);
struct notifier *notifier = dequeue (event);
if (!notifier)
return;
notifier_signal(notifier);
}
static void *from(void *node)
{
return (void *)((uintptr_t)node & 0xFFFFFFFFFFFFU);
}
static uint32_t tag(void *node)
{
return (uintptr_t)node >> 48U;
}
static void *to(void *node, uint32_t tag)
{
return (void *)((uintptr_t)node | (uint64_t)tag << 48U);
}
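/* from/tag/to pack a 16-bit generation counter into the unused high bits
   of a pointer, relying on x86-64 user-space addresses fitting in the low
   48 bits; bumping the counter on every update makes the compare-and-swaps
   below resistant to the ABA problem. */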
void enqueue (struct event *event, struct notifier *trigger)
{
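/* Michael & Scott enqueue: snapshot the tail, help swing a lagging tail
   forward, link the new node after the last node, and finally try to
   advance the tail to the new node. */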
struct node *tail;
struct node *tail_again;
struct node *next;
struct node *node = allocate ();
node->trigger = trigger;
node->next = NULL;
for (;;) {
for (;;) {
tail = atomic_load_explicit (&event->tail, memory_order_acquire);
next = atomic_load_explicit (&((struct node*)from (tail))->next, memory_order_acquire);
tail_again = atomic_load_explicit (&event->tail, memory_order_acquire);
if (tail == tail_again)
break;
on_failure(&event->tail_contention);
}
on_success(&event->tail_contention);
if (!from (next)) {
if (atomic_compare_exchange_strong
(&((struct node*)from (tail))->next,
&next,
to (node, tag (next) + 1U)))
break;
} else {
atomic_compare_exchange_strong (&event->tail,
&tail,
to (from (next), tag (tail) + 1U));
}
on_failure(&event->tail_contention);
}
on_success(&event->tail_contention);
atomic_compare_exchange_strong (&event->tail,
&tail,
to (node, tag (tail) + 1U));
}
static struct notifier * dequeue (struct event *event)
{
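/* Michael & Scott dequeue: the node at head is a dummy and the notifier
   of interest lives in head->next; head == tail with no next node means
   the queue is empty.  The old dummy is recycled through the free list. */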
struct node *head;
struct node *head_again;
struct node *tail;
struct node *next;
struct node *dequeued;
struct notifier *trigger;
for (;;) {
for (;;) {
head = atomic_load_explicit (&event->head, memory_order_acquire);
tail = atomic_load_explicit (&event->tail, memory_order_acquire);
next = atomic_load_explicit (&((struct node *)from (head))->next,
memory_order_acquire);
head_again = atomic_load_explicit (&event->head, memory_order_acquire);
if (head == head_again)
break;
on_failure(&event->head_contention);
}
on_success(&event->head_contention);
if (from (head) == from (tail)) {
if (!from (next)) {
dequeued = NULL;
trigger = NULL;
break;
}
atomic_compare_exchange_strong
(&event->tail,
&tail,
to (from (next), tag (tail) + 1U));
} else {
trigger = ((struct node *)from (next))->trigger;
if (atomic_compare_exchange_strong (&event->head,
&head,
to (from (next), tag (head) + 1U))) {
dequeued = from (head);
break;
}
}
on_failure(&event->head_contention);
}
on_success(&event->head_contention);
if (dequeued) {
deallocate (dequeued);
}
return trigger;
}
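/* The node and notifier free lists are tagged Treiber stacks: allocate*
   pops, deallocate* pushes, and the same high-bit counter guards against
   ABA.  A fresh node is heap-allocated only when the list is empty. */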
struct node * allocate (void)
{
struct node *head;
struct node *next;
struct node *n;
for (;;) {
head = atomic_load_explicit (&free_list, memory_order_relaxed);
n = from (head);
atomic_thread_fence (memory_order_acquire);
if (!n) {
on_success(&free_list_contention);
n = aligned_alloc(_Alignof (struct node), sizeof *n);
if (!n)
abort();
n->next = NULL;
n->trigger = NULL;
return n;
}
next = atomic_load_explicit (&n->next, memory_order_acquire);
if (atomic_compare_exchange_weak_explicit(&free_list,
&head,
to (from (next), tag (head) + 1U),
memory_order_acq_rel,
memory_order_acquire))
break;
on_failure(&free_list_contention);
}
on_success(&free_list_contention);
n->next = NULL;
return n;
}
static void deallocate (struct node *node)
{
struct node *head;
for (;;) {
head = atomic_load_explicit (&free_list, memory_order_relaxed);
node->next = to (from (head), 0);
atomic_thread_fence(memory_order_acquire);
if (atomic_compare_exchange_weak_explicit (&free_list,
&head,
to (node, tag (head) + 1U),
memory_order_acq_rel,
memory_order_acquire))
break;
on_failure(&free_list_contention);
}
on_success(&free_list_contention);
}
#if defined PARTIAL_FUTEX
struct notifier {
ALIGN_TO_CACHE _Atomic(bool) spinning;
ALIGN_TO_CACHE _Atomic(int) triggered;
ALIGN_TO_CACHE _Atomic(struct notifier *) next;
};
#else
struct notifier {
int fds[2U];
ALIGN_TO_CACHE _Atomic(bool) spinning;
ALIGN_TO_CACHE _Atomic(bool) triggered;
ALIGN_TO_CACHE _Atomic(struct notifier *) next;
};
#endif
struct notifier * allocate_notifier (void)
{
struct notifier *head;
struct notifier *next;
struct notifier *n;
for (;;) {
head = atomic_load_explicit (&notifier_free_list, memory_order_relaxed);
n = from (head);
atomic_thread_fence (memory_order_acquire);
if (!n) {
n = aligned_alloc(_Alignof (struct notifier), sizeof *n);
if (!n)
abort();
#if !defined PARTIAL_FUTEX
#if defined DO_EVENTFD
int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (-1 == fd)
abort();
n->fds[0U] = fd;
n->fds[1U] = fd;
#else
if (-1 == pipe2(n->fds, O_CLOEXEC | O_NONBLOCK))
abort();
#endif
#endif
n->spinning = false;
n->triggered = false;
n->next = NULL;
return n;
}
next = atomic_load_explicit (&n->next, memory_order_acquire);
if (atomic_compare_exchange_weak_explicit(&notifier_free_list,
&head,
to (from (next), tag (head) + 1U),
memory_order_acq_rel,
memory_order_acquire))
break;
on_failure(&notifier_free_list_contention);
}
on_success(&notifier_free_list_contention);
n->next = NULL;
n->triggered = false;
return n;
}
static void deallocate_notifier (struct notifier *node)
{
struct notifier *head;
node->triggered = false;
node->next = NULL;
for (;;) {
head = atomic_load_explicit (&notifier_free_list, memory_order_relaxed);
node->next = to (from (head), 0);
atomic_thread_fence(memory_order_acquire);
if (atomic_compare_exchange_weak_explicit (&notifier_free_list,
&head,
to (node, tag (head) + 1U),
memory_order_acq_rel,
memory_order_acquire))
break;
on_failure(&notifier_free_list_contention);
}
on_success(&notifier_free_list_contention);
}
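/* A notifier is the per-waiter parking primitive: with PARTIAL_FUTEX the
   waiter parks on a private futex word, otherwise it parks on its own
   pipe (or eventfd when DO_EVENTFD is defined). */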
#if defined PARTIAL_FUTEX
static inline void notifier_signal(struct notifier *notifier)
{
bool already_signaled = atomic_exchange_explicit(&notifier->triggered, true, memory_order_acq_rel);
bool still_spinning = atomic_load_explicit(&notifier->spinning, memory_order_acquire);
if (already_signaled || still_spinning)
return;
syscall(__NR_futex, &notifier->triggered, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
}
static inline void notifier_wait(struct notifier *notifier)
{
for (;;) {
atomic_store_explicit(&notifier->spinning, true, memory_order_release);
atomic_thread_fence(memory_order_acquire);
bool got_triggered = false;
for (size_t ii = 0U; ii < NOTIFIER_FUTEX_SPIN_LOOPS; ++ii) {
got_triggered |= atomic_exchange_explicit(&notifier->triggered, false, memory_order_relaxed);
if (got_triggered)
break;
_mm_pause();
}
atomic_thread_fence(memory_order_acq_rel);
atomic_store_explicit(&notifier->spinning, false, memory_order_release);
if (got_triggered)
break;
if (atomic_exchange_explicit(&notifier->triggered, false, memory_order_acq_rel))
break;
syscall(__NR_futex, &notifier->triggered,
FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
}
}
#else
static inline void notifier_signal(struct notifier *notifier)
{
bool already_signaled = atomic_exchange_explicit(&notifier->triggered, true, memory_order_acq_rel);
bool still_spinning = atomic_load_explicit(&notifier->spinning, memory_order_acquire);
if (already_signaled || still_spinning)
return;
{
static uint64_t const one = 1U;
if (-1 == write(notifier->fds[1U], &one, sizeof one)) {
if (errno != EAGAIN)
abort();
}
}
}
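/* Spin briefly on the triggered flag; if it stays clear, block in poll()
   on the notifier's read end and then drain it with non-blocking reads. */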
static inline void notifier_wait(struct notifier *notifier)
{
for (;;) {
atomic_store_explicit(&notifier->spinning, true, memory_order_release);
atomic_thread_fence(memory_order_acquire);
bool got_triggered = false;
for (size_t ii = 0U; ii < NOTIFIER_FD_SPIN_LOOPS; ++ii) {
got_triggered |= atomic_exchange_explicit(&notifier->triggered, false, memory_order_relaxed);
if (got_triggered)
break;
_mm_pause();
}
atomic_thread_fence(memory_order_acq_rel);
atomic_store_explicit(&notifier->spinning, false, memory_order_release);
if (got_triggered)
break;
if (atomic_exchange_explicit(&notifier->triggered, false, memory_order_acq_rel))
break;
{
struct pollfd fds[1U] = {{ .fd = notifier->fds[0U],
.events = POLLIN}};
if (-1 == poll(fds, 1U, -1)) {
abort();
}
}
for (;;) {
uint64_t xx;
if (-1 == read(notifier->fds[0U], &xx, sizeof xx)) {
if (EAGAIN == errno)
break;
abort();
}
}
}
}
#endif
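/* Adaptive backoff driven by a contention counter per queue end and per
   free list: do nothing for the first few failures, then pause the CPU,
   then yield to the scheduler; successes decay the counter again. */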
static void on_failure(_Atomic(uint64_t) *contention)
{
uint64_t contention_count = atomic_load_explicit(contention, memory_order_relaxed);
uint64_t next_count = contention_count + 1U;
atomic_store_explicit(contention,
contention_count < UINT64_MAX ?
next_count : contention_count,
memory_order_relaxed);
if (contention_count < NOOP_CONTEND_CYCLES) {
/* do nothing */
} else if (contention_count < PAUSE_CONTEND_CYCLES) {
_mm_pause();
} else {
sched_yield();
}
}
static void on_success(_Atomic(uint64_t) *contention)
{
uint64_t contention_count = atomic_load_explicit(contention, memory_order_relaxed);
uint64_t next_count = contention_count - 1U;
atomic_store_explicit(contention,
contention_count > 0U ?
next_count : contention_count,
memory_order_relaxed);
}
#endif
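The test program above does not time itself; running it under time(1) is enough for a rough comparison of the variants. For in-process measurement, a minimal sketch of my own (assuming clock_gettime with CLOCK_MONOTONIC, which any modern Linux provides) would be:
#define _GNU_SOURCE 1
#include <stdio.h>
#include <time.h>

static double now_seconds(void)
{
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return ts.tv_sec + ts.tv_nsec / 1e9;
}

/* In main(): record now_seconds() before creating the threads and again
   after joining them, then print the difference. */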