For fun I wanted to create a lock implementation that is about as fast as one using futexes, but that uses pipes instead and queues waiters in user space rather than in the kernel. For the queue, the code uses an optimized version of the lock-free algorithm from Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms (http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html). I am not sure all of the optimizations I have made are correct, however. I also made some traditional implementations for comparison.
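
To show the core idea in isolation, here is a condensed sketch (not part of the code under review; the parker_* names are illustrative) of the substitution being made: instead of blocking in FUTEX_WAIT and waking with FUTEX_WAKE, a waiter parks on a read from a non-blocking pipe and is woken by a single byte written to the other end. The full implementation below layers bounded spinning, a user-space queue of waiters, and node recycling on top of this.

 /* Sketch: a single-waiter park/unpark built on a pipe (Linux, pipe2). */
 #define _GNU_SOURCE 1

 #include <errno.h>
 #include <fcntl.h>
 #include <poll.h>
 #include <stdlib.h>
 #include <unistd.h>

 struct parker {
    int fds[2U];  /* fds[0] is the read end, fds[1] the write end */
 };

 static void parker_init(struct parker *p)
 {
    if (-1 == pipe2(p->fds, O_CLOEXEC | O_NONBLOCK))
        abort();
 }

 static void parker_park(struct parker *p)
 {
    /* Sleep until the pipe is readable, then drain any pending bytes. */
    struct pollfd pfd = { .fd = p->fds[0U], .events = POLLIN };
    if (-1 == poll(&pfd, 1, -1))
        abort();
    for (;;) {
        char buf[64U];
        if (-1 == read(p->fds[0U], buf, sizeof buf)) {
            if (EAGAIN == errno)
                break;
            abort();
        }
    }
 }

 static void parker_unpark(struct parker *p)
 {
    /* A full pipe (EAGAIN) already guarantees a pending wake-up. */
    char one = 1;
    if (-1 == write(p->fds[1U], &one, sizeof one) && errno != EAGAIN)
        abort();
 }

 int main(void)
 {
    struct parker p;
    parker_init(&p);
    parker_unpark(&p);  /* make the following park return immediately */
    parker_park(&p);
    return 0;
 }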

The lock interface is:

 /*
  * Copyright 2017 Steven Stewart-Gallus
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  * implied.  See the License for the specific language governing
  * permissions and limitations under the License.
  */
 #ifndef LOCK_H
 #define LOCK_H

 struct lock;

 struct lock *lock_create(void);
 void lock_acquire(struct lock *lock);
 void lock_release(struct lock *lock);

 #endif
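
The interface is used like an ordinary mutex. As a hypothetical example (the counter, the bump function and the iteration count below are illustrative, not part of the code under review), a caller protecting a shared counter would look like this:

 #include <pthread.h>
 #include <stdio.h>

 #include "lock.h"

 static struct lock *counter_lock;
 static unsigned long counter;

 static void *bump(void *arg)
 {
    (void)arg;
    for (unsigned ii = 0U; ii < 100000U; ++ii) {
        lock_acquire(counter_lock);
        ++counter;  /* the critical section */
        lock_release(counter_lock);
    }
    return 0;
 }

 int main(void)
 {
    counter_lock = lock_create();

    pthread_t a;
    pthread_t b;
    pthread_create(&a, NULL, bump, NULL);
    pthread_create(&b, NULL, bump, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);

    printf("%lu\n", counter);  /* expect 200000 */
    return 0;
 }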

The test program is:

 /*
  * Copyright 2017 Steven Stewart-Gallus
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  * implied.  See the License for the specific language governing
  * permissions and limitations under the License.
  */
 #define _GNU_SOURCE 1

 #include <errno.h>
 #include <pthread.h>
 #include <stdio.h>

 #include "lock.h"

 #define PRODUCER_LOOP 1000000U
 #define NUM_THREADS 10U

 static void *contend(void * arg);

 int main()
 {
    struct lock *the_lock = lock_create();

    pthread_t threads[NUM_THREADS];
    for (size_t ii = 0U; ii < NUM_THREADS; ++ii) {
        pthread_create(&threads[ii], NULL, contend, the_lock);
    }

    for (size_t ii = 0U; ii < NUM_THREADS; ++ii) {
        void *retval;
        pthread_join(threads[ii], &retval);
    }
    return 0;
 }

 static void *contend(void * arg)
 {
    struct lock *lock = arg;
    for (size_t ii = 0U; ii < PRODUCER_LOOP; ++ii) {
        lock_acquire(lock);
        /* Do nothing */
        lock_release(lock);
    }
    return 0;
 }

The lock implementation is:

 /*
  * Copyright 2017 Steven Stewart-Gallus
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  * implied.  See the License for the specific language governing
  * permissions and limitations under the License.
  */
 #define _GNU_SOURCE 1

 #include "lock.h"

 #include <errno.h>
 #include <fcntl.h>
 #include <pthread.h>
 #include <sched.h>
 #include <stdatomic.h>
 #include <stdbool.h>
 #include <stdio.h>
 #include <stdint.h>
 #include <stdlib.h>
 #include <sys/poll.h>
 #include <sys/eventfd.h>
 #include <xmmintrin.h>
 #include <unistd.h>

 #if defined DO_FUTEX || defined PARTIAL_FUTEX
 #include <linux/futex.h>
 #include <sys/syscall.h>
 #endif
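
 /*
  * Tuning knobs: how many times a waiter spins before falling back to a
  * blocking wait, and the contention thresholds used by on_failure() to
  * decide between retrying immediately, pausing, and yielding.
  */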

 #define NOTIFIER_FD_SPIN_LOOPS 40U
 #define FUTEX_SPIN_LOOPS 40U
 #define NOTIFIER_FUTEX_SPIN_LOOPS 20U
 #define NOOP_CONTEND_CYCLES 4U
 #define PAUSE_CONTEND_CYCLES 40U

 struct node;
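
 /* Give each hot atomic its own alignment boundary (16 bytes here) to
  * limit false sharing between fields that different threads update. */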

 #define ALIGN_TO_CACHE _Alignas (16)

 #if defined RAW_EVENTFD
 struct event {
    int fd;
 };
 #elif defined DO_FUTEX
 struct event {
    ALIGN_TO_CACHE _Atomic(bool) spinning;
    ALIGN_TO_CACHE _Atomic(int) triggered;
 };
 #else
 struct event {
    ALIGN_TO_CACHE _Atomic(bool) triggered;
    ALIGN_TO_CACHE _Atomic(struct node *) head;
    ALIGN_TO_CACHE _Atomic(struct node *) tail;
    ALIGN_TO_CACHE _Atomic(uint64_t) head_contention;
    ALIGN_TO_CACHE _Atomic(uint64_t) tail_contention;
 };
 #endif
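
 /*
  * An event is the wake-up half of the lock.  RAW_EVENTFD blocks every
  * waiter on one shared eventfd, DO_FUTEX spins and then waits on a single
  * futex word, and the default variant queues one notifier per waiter in a
  * user-space lock-free queue.
  */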
 static void event_init (struct event *event);
 static void event_wait (struct event *event);
 static void event_signal (struct event *event);

 #if defined NATIVE
 struct lock {
    pthread_mutex_t mutex;
 };
 #else
 struct lock {
    atomic_flag locked;
    struct event when_unlocked;
 };
 #endif

 #if defined NATIVE
 struct lock *lock_create(void)
 {
    struct lock *lock = aligned_alloc(_Alignof(struct lock), sizeof *lock);
    if (!lock)
        abort();
    pthread_mutex_init (&lock->mutex, 0);
    return lock;
 }
 void lock_acquire(struct lock *lock)
 {
    pthread_mutex_lock (&lock->mutex);
 }

 void lock_release(struct lock *lock)
 {
    pthread_mutex_unlock (&lock->mutex);
 }
 #else
 struct lock *lock_create(void)
 {
    struct lock *lock = aligned_alloc(_Alignof(struct lock), sizeof *lock);
    if (!lock)
        abort();
    atomic_flag_clear_explicit(&lock->locked, memory_order_relaxed);
    event_init (&lock->when_unlocked);
    return lock;
 }
 void lock_acquire(struct lock *lock)
 {
    for (;;) {
        if (!atomic_flag_test_and_set_explicit (&lock->locked, memory_order_acquire))
            break;

        event_wait (&lock->when_unlocked);
    }
 }

 void lock_release(struct lock *lock)
 {
    atomic_flag_clear_explicit (&lock->locked, memory_order_release);
    event_signal (&lock->when_unlocked);
 }
 #endif

 #if defined RAW_EVENTFD
 static void event_init (struct event *event) {
    if (-1 == (event->fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)))
        abort();
 }

 static void event_wait (struct event *event) {
    {
        struct pollfd fds[1U] = {{ .fd = event->fd,
                       .events = POLLIN}};
        if (-1 == poll(fds, 1U, -1)) {
            abort();
        }
    }

    for (;;) {
        uint64_t xx;
        if (-1 == read(event->fd, &xx, sizeof xx)) {
            if (EAGAIN == errno)
                break;
            abort();
        }
    }
 }

 static void event_signal (struct event *event) {
    static uint64_t const one = 1U;
    if (-1 == write(event->fd, &one, sizeof one)) {
        if (errno != EAGAIN)
            abort();
    }
 }
 #elif defined DO_FUTEX
 static void event_init (struct event *event) {
    event->triggered = false;
    event->spinning = false;
 }

 static void event_wait (struct event *event) {
    for (;;) {
        atomic_store_explicit(&event->spinning, true, memory_order_release);

        atomic_thread_fence(memory_order_acquire);

        bool got_triggered = false;
        for (size_t ii = 0U; ii < NOTIFIER_FD_SPIN_LOOPS; ++ii) {
            got_triggered |= atomic_exchange_explicit(&event->triggered, false, memory_order_relaxed);
            if (got_triggered)
                break;
            _mm_pause();
        }
        atomic_thread_fence(memory_order_acq_rel);
        atomic_store_explicit(&event->spinning, false, memory_order_release);
        if (got_triggered)
            break;
        if (atomic_exchange_explicit(&event->triggered, false, memory_order_acq_rel))
            break;

        syscall(__NR_futex, &event->triggered,
               FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
    }
 }

 static void event_signal (struct event *event) {
    /* Already signaled */
    bool already_signaled = atomic_exchange_explicit (&event->triggered, 1, memory_order_acq_rel);
    bool still_spinning = atomic_load_explicit(&event->spinning, memory_order_acquire);

    if (already_signaled || still_spinning)
        return;

    syscall(__NR_futex, &event->triggered, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
 }

 #else
 struct notifier;

 struct node {
    ALIGN_TO_CACHE struct notifier * trigger;
    ALIGN_TO_CACHE _Atomic(struct node *) next;
 };

 static ALIGN_TO_CACHE _Atomic(uint64_t) free_list_contention = 0U;
 static ALIGN_TO_CACHE _Atomic(struct node *) free_list = NULL;
 static ALIGN_TO_CACHE _Atomic(uint64_t) notifier_free_list_contention = 0U;
 static ALIGN_TO_CACHE _Atomic(struct notifier *) notifier_free_list = NULL;

 /* All used once so inline */
 static inline void enqueue (struct event *event, struct notifier *notifier);
 static inline struct notifier * dequeue (struct event *event);

 static inline struct node * allocate (void);
 static inline void deallocate (struct node *node);

 static inline struct notifier * allocate_notifier (void);
 static inline void deallocate_notifier (struct notifier *node);

 static inline void notifier_signal(struct notifier *notifier);
 static inline void notifier_wait(struct notifier *notifier);

 /* Very small so inline */
 static inline void on_failure(_Atomic(uint64_t) *contention);
 static inline void on_success(_Atomic(uint64_t) *contention);

 static void event_init (struct event *event) {
    struct node *n = aligned_alloc(_Alignof (struct node), sizeof *n);
    if (!n)
        abort();
    n->next = NULL;
    event->head = n;
    event->tail = n;
    event->head_contention = 0U;
    event->tail_contention = 0U;
 }

 static void event_wait (struct event *event) {
    if (atomic_exchange_explicit (&event->triggered, false, memory_order_acq_rel))
        return;

    struct notifier *notifier = allocate_notifier();

    enqueue (event, notifier);

    notifier_wait(notifier);

    deallocate_notifier (notifier);
 }

 static void event_signal (struct event *event) {
    atomic_store_explicit (&event->triggered, true, memory_order_release);

    struct notifier *notifier = dequeue (event);
    if (!notifier)
        return;

    notifier_signal(notifier);
 }
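
 /*
  * The queue below is the non-blocking queue of Michael and Scott from the
  * paper linked above.  Pointers stored in the queue carry a 16-bit
  * modification counter in their upper bits (see from/tag/to), which relies
  * on user-space addresses fitting in the low 48 bits, so that recycled
  * nodes do not trigger the ABA problem.
  */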

 static void *from(void *node)
 {
    return (void *)((uintptr_t)node & 0xFFFFFFFFFFFFU);
 }

 static uint32_t tag(void *node)
 {
    return (uintptr_t)node >> 48U;
 }

 static void *to(void *node, uint32_t tag)
 {
    return (void *)((uintptr_t)node | (uint64_t)tag << 48U);
 }

 void enqueue (struct event *event, struct notifier *trigger)
 {
    struct node *tail;
    struct node *tail_again;
    struct node *next;

    struct node *node = allocate ();
    node->trigger = trigger;
    node->next = NULL;

    for (;;) {
        for (;;) {
            tail = atomic_load_explicit (&event->tail, memory_order_acquire);
            next = atomic_load_explicit (&((struct node*)from (tail))->next, memory_order_acquire);
            tail_again = atomic_load_explicit (&event->tail, memory_order_acquire);
            if (tail == tail_again)
                break;
            on_failure(&event->tail_contention);
        }
        on_success(&event->tail_contention);

        if (!from (next)) {
            if (atomic_compare_exchange_strong
                (&((struct node*)from (tail))->next,
                 &next,
                 to (node, tag (next) + 1U)))
                break;
        } else {
            atomic_compare_exchange_strong (&event->tail,
                         &tail,
                         to (from (next), tag (tail) + 1U));
        }
        on_failure(&event->tail_contention);
    }
    on_success(&event->tail_contention);
    atomic_compare_exchange_strong (&event->tail,
                 &tail,
                 to (node, tag (tail) + 1U));
 }

 static struct notifier * dequeue (struct event *event)
 {
    struct node *head;
    struct node *head_again;
    struct node *tail;
    struct node *next;
    struct node *dequeued;
    struct notifier *trigger;

    for (;;) {
        for (;;) {
            head = atomic_load_explicit (&event->head, memory_order_acquire);
            tail = atomic_load_explicit (&event->tail, memory_order_acquire);
            next = atomic_load_explicit (&((struct node *)from (head))->next,
                memory_order_acquire);
            head_again = atomic_load_explicit (&event->head, memory_order_acquire);
            if (head == head_again)
                break;
            on_failure(&event->head_contention);
        }
        on_success(&event->head_contention);

        if (from (head) == from (tail)) {
            if (!from (next)) {
                dequeued = NULL;
                trigger = NULL;
                break;
            }
            atomic_compare_exchange_strong
                (&event->tail,
                 &tail,
                 to (from (next), tag (tail) + 1U));
        } else {
            trigger = ((struct node *)from (next))->trigger;
            if (atomic_compare_exchange_strong (&event->head,
                             &head,
                             to (from (next), tag (head) + 1U))) {
                dequeued = from (head);
                break;
            }
        }
        on_failure(&event->head_contention);
    }
    on_success(&event->head_contention);

    if (dequeued) {
        deallocate (dequeued);
    }
    return trigger;
 }
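
 /*
  * Queue nodes are recycled through a global Treiber-style stack protected
  * by the same tagging scheme.  Nodes are never returned to the allocator,
  * so a node pointer remains valid for the life of the process.
  */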

 struct node * allocate (void)
 {
    struct node *head;
    struct node *next;
    struct node *n;

    for (;;) {
        head = atomic_load_explicit (&free_list, memory_order_relaxed);
        n = from (head);
        atomic_thread_fence (memory_order_acquire);
        if (!n) {
            on_success(&free_list_contention);
            n = aligned_alloc(_Alignof (struct node), sizeof *n);
            if (!n)
                abort();
            n->next = NULL;
            n->trigger = NULL;
            return n;
        }
        next = atomic_load_explicit (&n->next, memory_order_acquire);
        if (atomic_compare_exchange_weak_explicit(&free_list,
                        &head,
                              to (from (next), tag (head) + 1U),
                              memory_order_acq_rel,
                              memory_order_acquire))
            break;
        on_failure(&free_list_contention);
    }
    on_success(&free_list_contention);
    n->next = NULL;
    return n;
 }

 static void deallocate (struct node *node)
 {
    struct node *head;

    for (;;) {
        head = atomic_load_explicit (&free_list, memory_order_relaxed);
        node->next = to (from (head), 0);
        atomic_thread_fence(memory_order_acquire);
        if (atomic_compare_exchange_weak_explicit (&free_list,
                               &head,
                               to (node, tag (head) + 1U),
                               memory_order_acq_rel,
                               memory_order_acquire))
            break;
        on_failure(&free_list_contention);
    }
    on_success(&free_list_contention);
 }
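
 /*
  * A notifier is a one-shot, per-waiter wake-up: a futex word when
  * PARTIAL_FUTEX is defined, otherwise an eventfd or pipe the waiter sleeps
  * on.  Notifiers are recycled through notifier_free_list just like queue
  * nodes.
  */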

 #if defined PARTIAL_FUTEX
 struct notifier {
    ALIGN_TO_CACHE _Atomic(bool) spinning;
    ALIGN_TO_CACHE _Atomic(int) triggered;
    ALIGN_TO_CACHE _Atomic(struct notifier *) next;
 };
 #else
 struct notifier {
    int fds[2U];
    ALIGN_TO_CACHE _Atomic(bool) spinning;
    ALIGN_TO_CACHE _Atomic(bool) triggered;
    ALIGN_TO_CACHE _Atomic(struct notifier *) next;
 };
 #endif

 struct notifier * allocate_notifier (void)
 {
    struct notifier *head;
    struct notifier *next;
    struct notifier *n;

    for (;;) {
        head = atomic_load_explicit (&notifier_free_list, memory_order_relaxed);
        n = from (head);
        atomic_thread_fence (memory_order_acquire);
        if (!n) {
            n = aligned_alloc(_Alignof (struct notifier), sizeof *n);
            if (!n)
                abort();

 #if !defined PARTIAL_FUTEX
 #if defined DO_EVENTFD
            int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
            if (-1 == fd)
                abort();
            n->fds[0U] = fd;
            n->fds[1U] = fd;
 #else
            if (-1 == pipe2(n->fds, O_CLOEXEC | O_NONBLOCK))
                abort();
 #endif
 #endif

            n->spinning = false;
            n->triggered = false;
            n->next = NULL;
            return n;
        }
        next = atomic_load_explicit (&n->next, memory_order_acquire);
        if (atomic_compare_exchange_weak_explicit(&notifier_free_list,
                        &head,
                              to (from (next), tag (head) + 1U),
                              memory_order_acq_rel,
                              memory_order_acquire))
            break;
        on_failure(&notifier_free_list_contention);
    }
    on_success(&notifier_free_list_contention);

    n->next = NULL;
    n->triggered = false;
    return n;
 }

 static void deallocate_notifier (struct notifier *node)
 {
    struct notifier *head;

    node->triggered = false;
    node->next = NULL;

    for (;;) {
        head = atomic_load_explicit (&notifier_free_list, memory_order_relaxed);
        node->next = to (from (head), 0);
        atomic_thread_fence(memory_order_acquire);
        if (atomic_compare_exchange_weak_explicit (&notifier_free_list,
                          &head,
                          to (node, tag (head) + 1U),
                          memory_order_acq_rel,
                          memory_order_acquire))
            break;
        on_failure(&notifier_free_list_contention);
    }
    on_success(&notifier_free_list_contention);
 }
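
 /*
  * Wake-up handshake: the waiter advertises that it is spinning, spins on
  * triggered for a bounded number of iterations and only then blocks.  The
  * signaller sets triggered first and skips the expensive wake (futex or
  * write) when the flag was already set or the waiter is still spinning.
  */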

 #if defined PARTIAL_FUTEX
 static inline void notifier_signal(struct notifier *notifier)
 {
    bool already_signaled = atomic_exchange_explicit(&notifier->triggered, true, memory_order_acq_rel);
    bool still_spinning = atomic_load_explicit(&notifier->spinning, memory_order_acquire);

    if (already_signaled || still_spinning)
        return;

    syscall(__NR_futex, &notifier->triggered, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
 }

 static inline void notifier_wait(struct notifier *notifier)
 {
    for (;;) {
        atomic_store_explicit(&notifier->spinning, true, memory_order_release);

        atomic_thread_fence(memory_order_acquire);

        bool got_triggered = false;
        for (size_t ii = 0U; ii < NOTIFIER_FD_SPIN_LOOPS; ++ii) {
            got_triggered |= atomic_exchange_explicit(&notifier->triggered, false, memory_order_relaxed);
            if (got_triggered)
                break;
            _mm_pause();
        }
        atomic_thread_fence(memory_order_acq_rel);
        atomic_store_explicit(&notifier->spinning, false, memory_order_release);
        if (got_triggered)
            break;
        if (atomic_exchange_explicit(&notifier->triggered, false, memory_order_acq_rel))
            break;

        syscall(__NR_futex, &notifier->triggered,
               FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
    }
 }
 #else
 static inline void notifier_signal(struct notifier *notifier)
 {
    bool already_signaled = atomic_exchange_explicit(&notifier->triggered, true, memory_order_acq_rel);
    bool still_spinning = atomic_load_explicit(&notifier->spinning, memory_order_acquire);

    if (already_signaled || still_spinning)
        return;

    {
        static uint64_t const one = 1U;
        if (-1 == write(notifier->fds[1U], &one, sizeof one)) {
            if (errno != EAGAIN)
                abort();
        }
    }
 }

 static inline void notifier_wait(struct notifier *notifier)
 {
    for (;;) {
        atomic_store_explicit(&notifier->spinning, true, memory_order_release);

        atomic_thread_fence(memory_order_acquire);

        bool got_triggered = false;
        for (size_t ii = 0U; ii < NOTIFIER_FD_SPIN_LOOPS; ++ii) {
            got_triggered |= atomic_exchange_explicit(&notifier->triggered, false, memory_order_relaxed);
            if (got_triggered)
                break;
            _mm_pause();
        }
        atomic_thread_fence(memory_order_acq_rel);
        atomic_store_explicit(&notifier->spinning, false, memory_order_release);
        if (got_triggered)
            break;
        if (atomic_exchange_explicit(&notifier->triggered, false, memory_order_acq_rel))
            break;

        {
            struct pollfd fds[1U] = {{ .fd = notifier->fds[0U],
                           .events = POLLIN}};
            if (-1 == poll(fds, 1U, -1)) {
                abort();
            }
        }

        for (;;) {
            uint64_t xx;
            if (-1 == read(notifier->fds[0U], &xx, sizeof xx)) {
                if (EAGAIN == errno)
                    break;
                abort();
            }
        }
    }
 }
 #endif
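
 /*
  * Adaptive backoff: every retry loop bumps a saturating contention counter
  * on failure and decays it on success.  Low counts retry immediately,
  * moderate counts execute a pause, and high counts yield the CPU.  The
  * counters are only hints, so relaxed ordering and lost updates are fine.
  */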

 static void on_failure(_Atomic(uint64_t) *contention)
 {
    uint64_t contention_count = atomic_load_explicit(contention, memory_order_relaxed);
    uint64_t next_count = contention_count + 1U;
    atomic_store_explicit(contention,
            contention_count < UINT64_MAX ?
             next_count : contention_count,
             memory_order_relaxed);

    if (contention_count < NOOP_CONTEND_CYCLES) {
        /* do nothing */
    } else if (contention_count < PAUSE_CONTEND_CYCLES) {
        _mm_pause();
    } else {
        sched_yield();
    }
 }

 static void on_success(_Atomic(uint64_t) *contention)
 {
    uint64_t contention_count = atomic_load_explicit(contention, memory_order_relaxed);
    uint64_t next_count = contention_count - 1U;
    atomic_store_explicit(contention,
            contention_count > 0U ?
             next_count : contention_count,
             memory_order_relaxed);
 }
 #endif
