Skip to content

Commit

Permalink
misc: introduce uv_get|set_threadpool_size
Browse files Browse the repository at this point in the history
This patch will allow to grow and set the thread pool size on
demand via libuv calls instead of just `UV_THREADPOOL_SIZE` but
`UV_THREADPOOL_SIZE` will remain as the initial value.

Fixes: libuv#4401
Signed-off-by: Juan José Arboleda <[email protected]>
  • Loading branch information
juanarbol committed Jun 24, 2024
1 parent ba24986 commit 9bad1d8
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 16 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ if(LIBUV_BUILD_TESTS)
test/test-thread.c
test/test-thread-priority.c
test/test-threadpool-cancel.c
test/test-threadpool-size.c
test/test-threadpool.c
test/test-timer-again.c
test/test-timer-from-check.c
Expand Down
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-thread-affinity.c \
test/test-thread-priority.c \
test/test-threadpool-cancel.c \
test/test-threadpool-size.c \
test/test-threadpool.c \
test/test-timer-again.c \
test/test-timer-from-check.c \
Expand Down
20 changes: 20 additions & 0 deletions docs/src/misc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -927,3 +927,23 @@ is not complete.
for the NUL terminator.
.. versionadded:: 1.47.0
.. c:function:: int uv_set_threadpool_size(unsingned int number)
If this function is called before any other libuv call that requires the
threadpool will set the value of `UV_THREADPOOL_SIZE` if provided, in that;
case, if `number` is less than `UV_THREADPOOL_SIZE` it returns `UV_EINVAL`
as the threadpool can't be shrank. If `UV_THREADPOOL_SIZE` is not provided
it will spawn `number` of threads for the threadpool.
On success, it returns 0, if the threadpool is already spining and `number`
is smaller than the provided `number` it returns `UV_EINVAL`.
.. c:function:: unsingned int uv_get_threadpool_size(void)
If this function is called before any other libuv call that uses the
threadpool, the value will be 4 by default or `UV_THREADPOOL_SIZE` and will
spawn the threadpool as a side effect.
It always returns the number of threads in the thread pool (unless something
really bad happens).
3 changes: 3 additions & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,9 @@ enum {
UV_EXTERN int uv_thread_getpriority(uv_thread_t tid, int* priority);
UV_EXTERN int uv_thread_setpriority(uv_thread_t tid, int priority);

UV_EXTERN int uv_set_threadpool_size(unsigned int number);
UV_EXTERN unsigned int uv_get_threadpool_size(void);

UV_EXTERN unsigned int uv_available_parallelism(void);
UV_EXTERN int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count);
UV_EXTERN void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count);
Expand Down
106 changes: 90 additions & 16 deletions src/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,31 @@ void uv__threadpool_cleanup(void) {
}


static void init_threads(void) {
static void spin_threads(unsigned int from, unsigned int to) {
uv_thread_options_t config;
unsigned int i;
const char* val;
uv_sem_t sem;

if (uv_sem_init(&sem, 0))
abort();

config.flags = UV_THREAD_HAS_STACK_SIZE;
config.stack_size = 8u << 20; /* 8 MB */

for (i = from; i < to; i++)
if (uv_thread_create_ex(threads + i, &config, worker, &sem))
abort();

for (i = from; i < to; i++)
uv_sem_wait(&sem);

uv_sem_destroy(&sem);
}


static void init_threads(void) {
const char* val;

nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
Expand Down Expand Up @@ -224,20 +243,8 @@ static void init_threads(void) {
uv__queue_init(&slow_io_pending_wq);
uv__queue_init(&run_slow_work_message);

if (uv_sem_init(&sem, 0))
abort();

config.flags = UV_THREAD_HAS_STACK_SIZE;
config.stack_size = 8u << 20; /* 8 MB */

for (i = 0; i < nthreads; i++)
if (uv_thread_create_ex(threads + i, &config, worker, &sem))
abort();

for (i = 0; i < nthreads; i++)
uv_sem_wait(&sem);

uv_sem_destroy(&sem);
/* Spin up the default number of threads */
spin_threads(0 /* from */, nthreads /* to */);
}


Expand All @@ -261,6 +268,73 @@ static void init_once(void) {
init_threads();
}

int uv_set_threadpool_size(unsigned int number) {
unsigned int spin_nthreads;

/* Ensure the threadpool and mutex is initialized */
uv_once(&once, init_once);

uv_mutex_lock(&mutex);

/* no-op if the threadpool size is already set */
if (number == nthreads) {
uv_mutex_unlock(&mutex);
return 0;
}

if (nthreads < number) {
/* Just spin up the difference and respect the MAX_THREADPOOL_SIZE */
spin_nthreads = number - nthreads;
if (spin_nthreads > MAX_THREADPOOL_SIZE)
spin_nthreads = MAX_THREADPOOL_SIZE - nthreads;
} else {
uv_mutex_unlock(&mutex);
return UV_EINVAL; /* Cannot shrink the threadpool */
}

/* Alloc -with the expected size- if was not alloc(ed) before */
if (threads == default_threads) {
uv_thread_t* tmp = uv__malloc((nthreads + spin_nthreads) *
sizeof(threads[0]));
if (tmp == NULL) {
uv_mutex_unlock(&mutex);
return UV_ENOMEM;
}
memcpy(tmp, threads, nthreads * sizeof(threads[0]));
threads = tmp;
} else {
/* Re-alloc more memory for the new threads. */
threads = uv__realloc(threads,
(nthreads + spin_nthreads) * sizeof(threads[0]));
if (threads == NULL) {
uv_mutex_unlock(&mutex);
return UV_ENOMEM;
}

}

/* Spin up the new threads */
spin_threads(nthreads /* from */, nthreads + spin_nthreads /* to */);

/* Update the thread count */
nthreads += spin_nthreads;

uv_mutex_unlock(&mutex);

return 0;
}

unsigned int uv_get_threadpool_size(void) {
unsigned int nthreads_val;

uv_once(&once, init_once);
uv_mutex_lock(&mutex);
nthreads_val = nthreads;
uv_mutex_unlock(&mutex);

return nthreads_val;
}


void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
Expand Down
4 changes: 4 additions & 0 deletions test/test-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ TEST_DECLARE (fs_get_system_error)
TEST_DECLARE (strscpy)
TEST_DECLARE (strtok)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_size)
TEST_DECLARE (threadpool_size_env)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_multiple_event_loops)
TEST_DECLARE (threadpool_cancel_getaddrinfo)
Expand Down Expand Up @@ -1161,6 +1163,8 @@ TASK_LIST_START
TEST_ENTRY (strtok)
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY (threadpool_size)
TEST_ENTRY (threadpool_size_env)
TEST_ENTRY_CUSTOM (threadpool_multiple_event_loops, 0, 0, 60000)
TEST_ENTRY (threadpool_cancel_getaddrinfo)
TEST_ENTRY (threadpool_cancel_getnameinfo)
Expand Down
94 changes: 94 additions & 0 deletions test/test-threadpool-size.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/* Copyright libuv project contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

#include "uv.h"
#include "task.h"

TEST_IMPL(threadpool_size) {
int r;
unsigned int nthreads;

/* Can't shrink the threadpool size */
r = uv_set_threadpool_size(1);
ASSERT_EQ(r, UV_EINVAL);

/* The default thread pool size is 4 */
nthreads = uv_get_threadpool_size();
ASSERT_EQ(nthreads, 4);

/* Normal use case (increase the size of the thread pool) */
r = uv_set_threadpool_size(5);
ASSERT_OK(r);

/* The threadpool keeps the same */
nthreads = uv_get_threadpool_size();
ASSERT_EQ(nthreads, 5);

r = uv_set_threadpool_size(6);
ASSERT_OK(r);
nthreads = uv_get_threadpool_size();
ASSERT_EQ(nthreads, 6);

uv_run(uv_default_loop(), UV_RUN_DEFAULT);

MAKE_VALGRIND_HAPPY(uv_default_loop());
return 0;
}

TEST_IMPL(threadpool_size_env) {
int r;
unsigned int nthreads;

#if defined(_WIN32) && defined(__ASAN__)
/* See investigation in https://github.com/libuv/libuv/issues/4338 */
RETURN_SKIP("Test does not currently work on Windows under ASAN");
#endif

r = uv_os_setenv("UV_THREADPOOL_SIZE", "1");
ASSERT_OK(r);

/* Can't shrink the threadpool size */
r = uv_set_threadpool_size(0);
ASSERT_EQ(r, UV_EINVAL);

nthreads = uv_get_threadpool_size();
#ifdef _WIN32
/* juanarbol: why? */
ASSERT_EQ(nthreads, 4);
#else
/* The default thread pool size is UV_THREADPOOL_SIZE */
ASSERT_EQ(nthreads, 1);
#endif


/* Normal use case (increase the size of the thread pool) */
r = uv_set_threadpool_size(5);
ASSERT_OK(r);

/* The threadpool keeps the same */
nthreads = uv_get_threadpool_size();
ASSERT_EQ(nthreads, 5);

uv_run(uv_default_loop(), UV_RUN_DEFAULT);

MAKE_VALGRIND_HAPPY(uv_default_loop());
return 0;
}

0 comments on commit 9bad1d8

Please sign in to comment.