Files
continuwuity/src/database/map/get_batch.rs
T

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

95 lines
2.3 KiB
Rust
Raw Normal View History

2024-12-03 10:42:52 +00:00
use std::{convert::AsRef, fmt::Debug, sync::Arc};
use conduwuit::{
err, implement,
2025-01-01 06:08:20 +00:00
utils::{
stream::{automatic_amplification, automatic_width, WidebandExt},
IterStream,
},
Result,
};
2025-01-01 06:08:20 +00:00
use futures::{Stream, StreamExt, TryStreamExt};
2024-12-03 10:42:52 +00:00
use serde::Serialize;
2025-01-01 06:08:20 +00:00
use crate::{keyval::KeyBuf, ser, util::map_err, Handle};
2024-12-03 10:42:52 +00:00
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), level = "trace")]
2025-01-01 06:08:20 +00:00
pub fn qry_batch<'a, S, K>(
self: &'a Arc<Self>,
2025-01-01 06:08:20 +00:00
keys: S,
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
2024-12-03 10:42:52 +00:00
where
2025-01-01 06:08:20 +00:00
S: Stream<Item = K> + Send + 'a,
K: Serialize + Debug + 'a,
2024-12-03 10:42:52 +00:00
{
2025-01-01 06:08:20 +00:00
use crate::pool::Get;
keys.ready_chunks(automatic_amplification())
.widen_then(automatic_width(), |chunk| {
let keys = chunk
.iter()
.map(ser::serialize_to::<KeyBuf, _>)
.map(|result| result.expect("failed to serialize query key"))
.map(Into::into)
.collect();
self.db
.pool
.execute_get(Get { map: self.clone(), key: keys, res: None })
})
.map_ok(|results| results.into_iter().stream())
.try_flatten()
2024-12-03 10:42:52 +00:00
}
#[implement(super::Map)]
#[tracing::instrument(skip(self, keys), level = "trace")]
2025-01-01 06:08:20 +00:00
pub fn get_batch<'a, S, K>(
self: &'a Arc<Self>,
2025-01-01 06:08:20 +00:00
keys: S,
) -> impl Stream<Item = Result<Handle<'_>>> + Send + 'a
2024-12-03 10:42:52 +00:00
where
2025-01-01 06:08:20 +00:00
S: Stream<Item = K> + Send + 'a,
K: AsRef<[u8]> + Send + Sync + 'a,
2024-12-03 10:42:52 +00:00
{
2025-01-01 06:08:20 +00:00
use crate::pool::Get;
keys.ready_chunks(automatic_amplification())
.widen_then(automatic_width(), |chunk| {
self.db.pool.execute_get(Get {
map: self.clone(),
key: chunk.iter().map(AsRef::as_ref).map(Into::into).collect(),
res: None,
})
})
.map_ok(|results| results.into_iter().stream())
.try_flatten()
2024-12-03 10:42:52 +00:00
}
#[implement(super::Map)]
2024-12-08 20:31:16 +00:00
#[tracing::instrument(name = "batch_blocking", level = "trace", skip_all)]
pub(crate) fn get_batch_blocking<'a, I, K>(
&self,
keys: I,
) -> impl Iterator<Item = Result<Handle<'_>>> + Send
2024-12-03 10:42:52 +00:00
where
2025-01-01 06:08:20 +00:00
I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
K: AsRef<[u8]> + Send + ?Sized + Sync + 'a,
2024-12-03 10:42:52 +00:00
{
// Optimization can be `true` if key vector is pre-sorted **by the column
// comparator**.
const SORTED: bool = false;
let read_options = &self.read_options;
self.db
.db
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
.into_iter()
.map(|result| {
result
.map_err(map_err)?
.map(Handle::from)
.ok_or(err!(Request(NotFound("Not found in database"))))
})
}