Skip to content

Commit c55bfdb

Browse files
committed
async list
1 parent 020f112 commit c55bfdb

File tree

6 files changed

+147
-38
lines changed

6 files changed

+147
-38
lines changed

core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ services-oss = [
192192
services-pcloud = []
193193
services-persy = ["dep:persy", "internal-tokio-rt"]
194194
services-postgresql = ["dep:sqlx", "sqlx?/postgres"]
195-
services-redb = ["dep:redb", "internal-tokio-rt"]
195+
services-redb = ["dep:redb", "internal-tokio-rt", "dep:flume"]
196196
services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"]
197197
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
198198
services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]

core/src/services/redb/backend.rs

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,19 @@ use std::sync::Arc;
2020

2121
use tokio::task;
2222

23+
use crate::raw::oio::HierarchyLister;
24+
use crate::raw::*;
25+
use crate::services::RedbConfig;
2326
use crate::Builder;
2427
use crate::Error;
2528
use crate::ErrorKind;
2629
use crate::Scheme;
27-
use crate::raw::oio::HierarchyLister;
28-
use crate::raw::*;
29-
use crate::services::RedbConfig;
3030
use crate::*;
3131

3232
use super::core::RedbCore;
33+
use super::deleter::RedbDeleter;
3334
use super::error::*;
35+
use super::lister::RedbFilter;
3436
use super::lister::RedbLister;
3537
use super::writer::RedbWriter;
3638

@@ -149,13 +151,15 @@ pub struct RedbBackend {
149151
impl Access for RedbBackend {
150152
type Reader = Buffer;
151153
type Writer = RedbWriter;
152-
type Lister = ();
154+
type Lister = HierarchyLister<RedbLister>;
155+
type Deleter = oio::OneShotDeleter<RedbDeleter>;
153156
type BlockingReader = Buffer;
154157
type BlockingWriter = RedbWriter;
155-
type BlockingLister = HierarchyLister<RedbLister>;
158+
type BlockingLister = HierarchyLister<RedbFilter>;
159+
type BlockingDeleter = oio::OneShotDeleter<RedbDeleter>;
156160

157161
fn info(&self) -> Arc<AccessorInfo> {
158-
let mut am = AccessorInfo::default();
162+
let am = AccessorInfo::default();
159163
am.set_scheme(Scheme::Redb)
160164
.set_root(&self.core.root)
161165
.set_name(&self.core.datadir)
@@ -233,27 +237,30 @@ impl Access for RedbBackend {
233237
Ok((RpWrite::new(), RedbWriter::new(self.core.clone(), p)))
234238
}
235239

236-
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
237-
let cloned_self = self.clone();
238-
let cloned_path = path.to_string();
240+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
241+
self.blocking_delete()
242+
}
239243

240-
task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str(), args))
241-
.await
242-
.map_err(new_task_join_error)
243-
.and_then(|inner_result| inner_result)
244+
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
245+
Ok((
246+
RpDelete::default(),
247+
oio::OneShotDeleter::new(RedbDeleter::new(self.core.clone())),
248+
))
244249
}
245250

246-
fn blocking_delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
247-
let p = build_abs_path(&self.core.root, path);
251+
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
252+
let pattern = build_abs_path(&self.core.root, path);
253+
let range = self.core.iter()?;
254+
let lister = RedbLister::new(RedbFilter::new(range, pattern));
255+
let lister = HierarchyLister::new(lister, path, args.recursive());
248256

249-
self.core.delete(&p)?;
250-
Ok(RpDelete::default())
257+
Ok((RpList::default(), lister))
251258
}
252259

253260
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
254261
let pattern = build_abs_path(&self.core.root, path);
255262
let range = self.core.iter()?;
256-
let lister = RedbLister::new(range, pattern);
263+
let lister = RedbFilter::new(range, pattern);
257264
let lister = HierarchyLister::new(lister, path, args.recursive());
258265

259266
Ok((RpList::default(), lister))

core/src/services/redb/deleter.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::sync::Arc;
2+
3+
use tokio::task;
4+
5+
use crate::raw::build_abs_path;
6+
use crate::raw::new_task_join_error;
7+
use crate::raw::oio;
8+
use crate::raw::OpDelete;
9+
use crate::Result;
10+
11+
use super::core::RedbCore;
12+
13+
pub struct RedbDeleter {
14+
core: Arc<RedbCore>,
15+
}
16+
17+
impl RedbDeleter {
18+
pub fn new(core: Arc<RedbCore>) -> Self {
19+
RedbDeleter { core }
20+
}
21+
}
22+
23+
impl oio::OneShotDelete for RedbDeleter {
24+
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
25+
let p = build_abs_path(&self.core.root, &path);
26+
let core = self.core.clone();
27+
28+
task::spawn_blocking(move || core.delete(&p))
29+
.await
30+
.map_err(new_task_join_error)
31+
.and_then(|inner_result| inner_result)
32+
}
33+
}
34+
35+
impl oio::BlockingOneShotDelete for RedbDeleter {
36+
fn blocking_delete_once(&self, path: String, _: OpDelete) -> Result<()> {
37+
let p = build_abs_path(&self.core.root, &path);
38+
39+
self.core.delete(&p)?;
40+
Ok(())
41+
}
42+
}

core/src/services/redb/lister.rs

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,73 @@
1+
use tokio::task;
2+
3+
use crate::raw::oio;
14
use crate::EntryMode;
25
use crate::Metadata;
36
use crate::Result;
4-
use crate::raw::oio;
57

68
use super::error::parse_storage_error;
79

10+
#[derive(Debug)]
811
pub struct RedbLister {
12+
receiver: flume::Receiver<Result<oio::Entry>>,
13+
}
14+
15+
impl RedbLister {
16+
pub fn new(mut filter: RedbFilter) -> Self {
17+
let (tx, rx) = flume::bounded(1);
18+
19+
task::spawn_blocking(move || loop {
20+
let Some(result) = filter.range.next() else {
21+
break;
22+
};
23+
24+
let (key, value) = match result {
25+
Ok(pair) => pair,
26+
Err(e) => {
27+
let e = parse_storage_error(e);
28+
if tx.send(Err(e)).is_err() {
29+
break;
30+
}
31+
continue;
32+
}
33+
};
34+
35+
let key = key.value();
36+
let size = value.value().len() as u64;
37+
if key.starts_with(&filter.pattern) {
38+
let mode = EntryMode::from_path(key);
39+
let entry = oio::Entry::new(key, Metadata::new(mode).with_content_length(size));
40+
if tx.send(Ok(entry)).is_err() {
41+
break;
42+
}
43+
}
44+
});
45+
46+
Self { receiver: rx }
47+
}
48+
}
49+
50+
impl oio::List for RedbLister {
51+
async fn next(&mut self) -> Result<Option<oio::Entry>> {
52+
match self.receiver.recv_async().await {
53+
Ok(entry) => entry.map(Some),
54+
Err(_) => Ok(None),
55+
}
56+
}
57+
}
58+
59+
pub struct RedbFilter {
960
range: redb::Range<'static, &'static str, &'static [u8]>,
1061
pattern: String,
1162
}
1263

13-
impl RedbLister {
64+
impl RedbFilter {
1465
pub fn new(range: redb::Range<'static, &'static str, &'static [u8]>, pattern: String) -> Self {
1566
Self { range, pattern }
1667
}
1768
}
1869

19-
impl oio::BlockingList for RedbLister {
70+
impl oio::BlockingList for RedbFilter {
2071
fn next(&mut self) -> Result<Option<oio::Entry>> {
2172
loop {
2273
let Some(result) = self.range.next() else {
@@ -27,11 +78,7 @@ impl oio::BlockingList for RedbLister {
2778
let key = key.value();
2879
let size = value.value().len() as u64;
2980
if key.starts_with(&self.pattern) {
30-
let mode = if key.ends_with('/') {
31-
EntryMode::DIR
32-
} else {
33-
EntryMode::FILE
34-
};
81+
let mode = EntryMode::from_path(key);
3582
return Ok(Some(oio::Entry::new(
3683
key,
3784
Metadata::new(mode).with_content_length(size),

core/src/services/redb/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#[cfg(feature = "services-redb")]
1919
mod core;
2020
#[cfg(feature = "services-redb")]
21+
mod deleter;
22+
#[cfg(feature = "services-redb")]
2123
mod error;
2224
#[cfg(feature = "services-redb")]
2325
mod lister;

core/src/services/redb/writer.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use std::sync::Arc;
22

33
use tokio::task;
44

5-
use crate::Buffer;
6-
use crate::Result;
75
use crate::raw::new_task_join_error;
86
use crate::raw::oio::{self, QueueBuf};
7+
use crate::Buffer;
8+
use crate::EntryMode;
9+
use crate::Metadata;
10+
use crate::Result;
911

1012
use super::core::RedbCore;
1113

@@ -36,18 +38,23 @@ impl oio::Write for RedbWriter {
3638
Ok(())
3739
}
3840

39-
async fn close(&mut self) -> Result<()> {
41+
async fn close(&mut self) -> Result<Metadata> {
4042
let buf = self.buffer.clone().collect();
4143

4244
let core = self.core.clone();
4345
let cloned_path = self.path.clone();
4446

45-
task::spawn_blocking(move || core.set(&cloned_path, &buf.to_vec()))
46-
.await
47-
.map_err(new_task_join_error)
48-
.and_then(|inner_result| inner_result)?;
47+
task::spawn_blocking(move || {
48+
let value = buf.to_vec();
49+
core.set(&cloned_path, &value)?;
4950

50-
Ok(())
51+
let meta = Metadata::new(EntryMode::from_path(&cloned_path))
52+
.with_content_length(value.len() as _);
53+
Ok(meta)
54+
})
55+
.await
56+
.map_err(new_task_join_error)
57+
.and_then(|inner_result| inner_result)
5158
}
5259

5360
async fn abort(&mut self) -> Result<()> {
@@ -62,9 +69,13 @@ impl oio::BlockingWrite for RedbWriter {
6269
Ok(())
6370
}
6471

65-
fn close(&mut self) -> Result<()> {
72+
fn close(&mut self) -> Result<Metadata> {
6673
let buf = self.buffer.clone().collect();
67-
self.core.set(&self.path, &buf.to_vec())?;
68-
Ok(())
74+
let value = buf.to_vec();
75+
self.core.set(&self.path, &value)?;
76+
77+
let meta =
78+
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(value.len() as _);
79+
Ok(meta)
6980
}
7081
}

0 commit comments

Comments
 (0)