Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the upsert and compute methods for modifying a cached entry #370

Merged
merged 16 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Compute API - Change the compute family methods to record a read
access to the entry

Also improve the docs and source code comments of the compute family methods.
  • Loading branch information
tatsuya6502 committed Jan 6, 2024
commit 6eff2d330e23428714c51ef2cba226f24afc19e5
6 changes: 5 additions & 1 deletion src/common/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ impl<K, V> Entry<K, V> {
self.is_fresh
}

/// Returns `true` if the value in this `Entry` replaced an old cached value.
/// Returns `true` if the value in this `Entry` was already cached and replaced
/// with a new value.
///
/// Note that the new value can be the same as the old value. In that case, this
/// method still returns `true`.
pub fn is_updated(&self) -> bool {
self.is_updated
}
Expand Down
4 changes: 2 additions & 2 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2083,10 +2083,10 @@ where
.map(Entry::into_value)
}

async fn get_entry_without_recording(&self, key: &Arc<K>, hash: u64) -> Option<Entry<K, V>> {
async fn get_entry(&self, key: &Arc<K>, hash: u64) -> Option<Entry<K, V>> {
let ignore_if = None as Option<&mut fn(&V) -> bool>;
self.base
.get_with_hash(key, hash, ignore_if, true, false)
.get_with_hash(key, hash, ignore_if, true, true)
.await
}

Expand Down
25 changes: 16 additions & 9 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const WAITER_MAP_NUM_SEGMENTS: usize = 64;

#[async_trait]
pub(crate) trait GetOrInsert<K, V> {
/// Gets a value for the given key without recording the access to the cache
/// policies.
async fn get_without_recording<I>(
&self,
key: &Arc<K>,
Expand All @@ -29,12 +31,16 @@ pub(crate) trait GetOrInsert<K, V> {
V: 'static,
I: for<'i> FnMut(&'i V) -> bool + Send;

async fn get_entry_without_recording(&self, key: &Arc<K>, hash: u64) -> Option<Entry<K, V>>
/// Gets an entry for the given key _with_ recording the access to the cache
/// policies.
async fn get_entry(&self, key: &Arc<K>, hash: u64) -> Option<Entry<K, V>>
where
V: 'static;

/// Inserts a value for the given key.
async fn insert(&self, key: Arc<K>, hash: u64, value: V);

/// Removes a value for the given key. Returns the removed value.
async fn remove(&self, key: &Arc<K>, hash: u64) -> Option<V>;
}

Expand Down Expand Up @@ -209,6 +215,7 @@ where
let Some(existing_waiter) =
try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter)
else {
// Inserted.
break;
};

Expand Down Expand Up @@ -290,7 +297,7 @@ where
&'a self,
c_key: &Arc<K>,
c_hash: u64,
cache: &C, // Future to initialize a new value.
cache: &C,
f: F,
post_init: fn(O) -> Result<compute::Op<V>, E>,
allow_nop: bool,
Expand All @@ -305,9 +312,7 @@ where
use ComputeResult::{EvalErr, Inserted, Nop, Removed, Updated};

let type_id = TypeId::of::<ComputeNone>();

let (w_key, w_hash) = waiter_key_hash(&self.waiters, c_key, type_id);

let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing));
// NOTE: We have to acquire a write lock before `try_insert_waiter`,
// so that any concurrent attempt will get our lock and wait on it.
Expand All @@ -317,10 +322,12 @@ where
let Some(existing_waiter) =
try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter)
else {
// Inserted.
break;
};

// Somebody else's waiter already exists, so wait for its result to become available.
// Somebody else's waiter already exists, so wait for it to finish
// (wait for it to release the write lock).
let waiter_result = existing_waiter.read().await;
match &*waiter_result {
// Unexpected state.
Expand All @@ -329,7 +336,7 @@ where
This might be a bug in Moka"
),
_ => {
// Retry from the beginning.
// Try to insert our waiter again.
continue;
}
}
Expand All @@ -343,16 +350,16 @@ where
let waiter_guard = WaiterGuard::new(w_key, w_hash, &self.waiters, lock);

// Get the current value.
let maybe_entry = cache.get_entry_without_recording(c_key, c_hash).await;
let maybe_entry = cache.get_entry(c_key, c_hash).await;
let maybe_value = if allow_nop {
maybe_entry.as_ref().map(|ent| ent.value().clone())
} else {
None
};
let entry_existed = maybe_entry.is_some();

// Let's evaluate the `f` closure and get a future. Catching panic is safe
// here as we will not evaluate the closure again.
// Evaluate the `f` closure and get a future. Catching panic is safe here as
// we will not evaluate the closure again.
let fut = match std::panic::catch_unwind(AssertUnwindSafe(|| f(maybe_entry))) {
// Evaluated.
Ok(fut) => fut,
Expand Down
8 changes: 4 additions & 4 deletions src/sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1905,7 +1905,7 @@ where
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
fn get_entry_without_recording(&self, key: &Arc<K>, hash: u64) -> Option<Entry<K, V>> {
fn get_entry(&self, key: &Arc<K>, hash: u64) -> Option<Entry<K, V>> {
let ignore_if = None as Option<&mut fn(&V) -> bool>;
self.base
.get_with_hash_and_ignore_if(key, hash, ignore_if, true)
Expand Down Expand Up @@ -4033,7 +4033,7 @@ mod tests {

// Spawn three threads to call `and_upsert_with` for the same key and each
// task increments the current value by 1. Ensure the key-level lock is
// working by verifying the value is 3 after all tasks finish.
// working by verifying the value is 3 after all threads finish.
//
// | | thread 1 | thread 2 | thread 3 |
// |--------|----------|----------|----------|
Expand Down Expand Up @@ -4103,7 +4103,7 @@ mod tests {
const KEY: u32 = 0;

// Spawn six threads to call `and_compute_with` for the same key. Ensure the
// key-level lock is working by verifying the value after all tasks finish.
// key-level lock is working by verifying the value after all threads finish.
//
// | | thread 1 | thread 2 | thread 3 | thread 4 | thread 5 | thread 6 |
// |---------|------------|---------------|------------|----------|------------|----------|
Expand Down Expand Up @@ -4271,7 +4271,7 @@ mod tests {
const KEY: u32 = 0;

// Spawn four threads to call `and_try_compute_with` for the same key. Ensure
// the key-level lock is working by verifying the value after all tasks
// the key-level lock is working by verifying the value after all threads
// finish.
//
// | | thread 1 | thread 2 | thread 3 | thread 4 |
Expand Down
35 changes: 20 additions & 15 deletions src/sync/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ use super::{ComputeNone, OptionallyNone};
const WAITER_MAP_NUM_SEGMENTS: usize = 64;

pub(crate) trait GetOrInsert<K, V> {
fn get_entry_without_recording(&self, key: &Arc<K>, hash: u64) -> Option<Entry<K, V>>
/// Gets an entry for the given key _with_ recording the access to the cache
/// policies.
fn get_entry(&self, key: &Arc<K>, hash: u64) -> Option<Entry<K, V>>
where
V: 'static;

/// Inserts a value for the given key.
fn insert(&self, key: Arc<K>, hash: u64, value: V);

/// Removes a value for the given key. Returns the removed value.
fn remove(&self, key: &Arc<K>, hash: u64) -> Option<V>;
}

Expand All @@ -31,7 +35,7 @@ enum WaiterValue<V> {
Ready(Result<V, ErrorObject>),
ReadyNone,
// https://github.com/moka-rs/moka/issues/43
InitFuturePanicked,
InitClosurePanicked,
}

impl<V> fmt::Debug for WaiterValue<V> {
Expand All @@ -40,7 +44,7 @@ impl<V> fmt::Debug for WaiterValue<V> {
WaiterValue::Computing => write!(f, "Computing"),
WaiterValue::Ready(_) => write!(f, "Ready"),
WaiterValue::ReadyNone => write!(f, "ReadyNone"),
WaiterValue::InitFuturePanicked => write!(f, "InitFuturePanicked"),
WaiterValue::InitClosurePanicked => write!(f, "InitFuturePanicked"),
}
}
}
Expand Down Expand Up @@ -117,6 +121,7 @@ where
loop {
let Some(existing_waiter) = self.try_insert_waiter(w_key.clone(), w_hash, &waiter)
else {
// Inserted.
break;
};

Expand All @@ -126,7 +131,7 @@ where
WaiterValue::Ready(Ok(value)) => return ReadExisting(value.clone()),
WaiterValue::Ready(Err(e)) => return InitErr(Arc::clone(e).downcast().unwrap()),
// Somebody else's init closure has been panicked.
WaiterValue::InitFuturePanicked => {
WaiterValue::InitClosurePanicked => {
retries += 1;
assert!(
retries < MAX_RETRIES,
Expand Down Expand Up @@ -180,7 +185,7 @@ where
}
// Panicked.
Err(payload) => {
*lock = WaiterValue::InitFuturePanicked;
*lock = WaiterValue::InitClosurePanicked;
// Remove the waiter so that others can retry.
self.remove_waiter(w_key, w_hash);
resume_unwind(payload);
Expand All @@ -190,12 +195,12 @@ where
}

/// # Panics
/// Panics if the `init` future has been panicked.
/// Panics if the `init` closure has been panicked.
pub(crate) fn try_compute<'a, C, F, O, E>(
&'a self,
c_key: &Arc<K>,
c_hash: u64,
cache: &C, // Future to initialize a new value.
cache: &C,
f: F,
post_init: fn(O) -> Result<compute::Op<V>, E>,
allow_nop: bool,
Expand All @@ -210,9 +215,7 @@ where
use ComputeResult::{EvalErr, Inserted, Nop, Removed, Updated};

let type_id = TypeId::of::<ComputeNone>();

let (w_key, w_hash) = self.waiter_key_hash(c_key, type_id);

let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing));
// NOTE: We have to acquire a write lock before `try_insert_waiter`,
// so that any concurrent attempt will get our lock and wait on it.
Expand All @@ -221,10 +224,12 @@ where
loop {
let Some(existing_waiter) = self.try_insert_waiter(w_key.clone(), w_hash, &waiter)
else {
// Inserted.
break;
};

// Somebody else's waiter already exists, so wait for its result to become available.
// Somebody else's waiter already exists, so wait for it to finish
// (wait for it to release the write lock).
let waiter_result = existing_waiter.read();
match &*waiter_result {
// Unexpected state.
Expand All @@ -233,7 +238,7 @@ where
This might be a bug in Moka"
),
_ => {
// Retry from the beginning.
// Try to insert our waiter again.
continue;
}
}
Expand All @@ -242,16 +247,16 @@ where
// Our waiter was inserted.

// Get the current value.
let maybe_entry = cache.get_entry_without_recording(c_key, c_hash);
let maybe_entry = cache.get_entry(c_key, c_hash);
let maybe_value = if allow_nop {
maybe_entry.as_ref().map(|ent| ent.value().clone())
} else {
None
};
let entry_existed = maybe_entry.is_some();

// Let's evaluate the `f` closure and get a future. Catching panic is safe
// here as we will not evaluate the closure again.
// Evaluate the `f` closure. Catching panic is safe here as we will not
// evaluate the closure again.
match catch_unwind(AssertUnwindSafe(|| f(maybe_entry))) {
// Evaluated.
Ok(op) => {
Expand Down Expand Up @@ -283,7 +288,7 @@ where
}
// Panicked.
Err(payload) => {
*lock = WaiterValue::InitFuturePanicked;
*lock = WaiterValue::InitClosurePanicked;
// Remove the waiter so that others can retry.
self.remove_waiter(w_key, w_hash);
resume_unwind(payload);
Expand Down