Skip to content

Commit b51c469

Browse files
committed
change: storage traits return io::Error
Issue #1457: Simplify storage trait error API. Benefits: Simpler trait implementation - Storage code doesn't need to know about `StorageError<C>` or generic type parameters in error handling. ## Changes Storage trait methods now return `io::Error` instead of `StorageError<C>`: - `RaftLogStorage`: `append()`, `truncate()`, `purge()`, `save_vote()`, `save_committed()` - `RaftLogReader`: `try_get_log_entries()`, `limited_get_log_entries()`, `read_vote()`, `get_key_log_ids()` - `RaftStateMachine`: `applied_state()`, `apply()`, `get_current_snapshot()`, `begin_receiving_snapshot()`, `install_snapshot()` - `RaftSnapshotBuilder`: `build_snapshot()` Added internal `StorageIOResult` trait for error conversion: - Provides 14 methods like `.sto_read_logs()`, `.sto_write_snapshot()`, etc. - Converts `io::Error` to `StorageError<C>` with proper context at Openraft boundaries - Fix: #1457 Upgrade tip: Storage implementors should update trait methods to return `io::Error`: ```rust // Before async fn read_vote(&mut self) -> Result<Option<Vote>, StorageError<C>> { self.vote.read().map_err(|e| StorageError::read_vote(&e)) } // After async fn read_vote(&mut self) -> Result<Option<Vote>, io::Error> { self.vote.read() } ``` Return `io::Error` directly - Openraft adds context automatically. The method body may leave unchanged: `io::Error` can be converted to `StorageError` with a simple `?`.
1 parent 8c3b1fe commit b51c469

File tree

31 files changed

+477
-351
lines changed

31 files changed

+477
-351
lines changed

cluster_benchmark/tests/benchmark/store.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod test;
33
use std::collections::BTreeMap;
44
use std::fmt;
55
use std::fmt::Debug;
6+
use std::io;
67
use std::io::Cursor;
78
use std::ops::RangeBounds;
89
use std::sync::atomic::AtomicU64;
@@ -24,7 +25,6 @@ use openraft::Entry;
2425
use openraft::EntryPayload;
2526
use openraft::OptionalSend;
2627
use openraft::SnapshotMeta;
27-
use openraft::StorageError;
2828
use openraft::StoredMembership;
2929
use openraft::Vote;
3030
use serde::Deserialize;
@@ -121,7 +121,7 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {
121121
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>(
122122
&mut self,
123123
range: RB,
124-
) -> Result<Vec<Entry<TypeConfig>>, StorageError<TypeConfig>> {
124+
) -> Result<Vec<Entry<TypeConfig>>, io::Error> {
125125
let mut entries = vec![];
126126
{
127127
let log = self.log.read().await;
@@ -133,22 +133,22 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {
133133
Ok(entries)
134134
}
135135

136-
async fn read_vote(&mut self) -> Result<Option<Vote<TypeConfig>>, StorageError<TypeConfig>> {
136+
async fn read_vote(&mut self) -> Result<Option<Vote<TypeConfig>>, io::Error> {
137137
Ok(self.vote.read().await.clone())
138138
}
139139
}
140140

141141
impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
142142
#[tracing::instrument(level = "trace", skip(self))]
143-
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<TypeConfig>> {
143+
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, io::Error> {
144144
let data;
145145
let last_applied_log;
146146
let last_membership;
147147

148148
{
149149
// Serialize the data of the state machine.
150150
let sm = self.sm.read().await;
151-
data = serde_json::to_vec(&*sm).map_err(|e| StorageError::read_state_machine(&e))?;
151+
data = serde_json::to_vec(&*sm).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
152152

153153
last_applied_log = sm.last_applied_log;
154154
last_membership = sm.last_membership.clone();
@@ -190,7 +190,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
190190
}
191191

192192
impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
193-
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<TypeConfig>> {
193+
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, io::Error> {
194194
let log = self.log.read().await;
195195
let last_serialized = log.iter().rev().next().map(|(_, ent)| ent);
196196

@@ -213,22 +213,22 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
213213
}
214214

215215
#[tracing::instrument(level = "trace", skip(self))]
216-
async fn save_vote(&mut self, vote: &Vote<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
216+
async fn save_vote(&mut self, vote: &Vote<TypeConfig>) -> Result<(), io::Error> {
217217
let mut v = self.vote.write().await;
218218
*v = Some(*vote);
219219
Ok(())
220220
}
221221

222222
#[tracing::instrument(level = "debug", skip(self))]
223-
async fn truncate(&mut self, log_id: LogIdOf<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
223+
async fn truncate(&mut self, log_id: LogIdOf<TypeConfig>) -> Result<(), io::Error> {
224224
let mut log = self.log.write().await;
225225
log.split_off(&log_id.index());
226226

227227
Ok(())
228228
}
229229

230230
#[tracing::instrument(level = "debug", skip_all)]
231-
async fn purge(&mut self, log_id: LogIdOf<TypeConfig>) -> Result<(), StorageError<TypeConfig>> {
231+
async fn purge(&mut self, log_id: LogIdOf<TypeConfig>) -> Result<(), io::Error> {
232232
{
233233
let mut p = self.last_purged_log_id.write().await;
234234
*p = Some(log_id);
@@ -241,7 +241,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
241241
}
242242

243243
#[tracing::instrument(level = "trace", skip_all)]
244-
async fn append<I>(&mut self, entries: I, callback: IOFlushed<TypeConfig>) -> Result<(), StorageError<TypeConfig>>
244+
async fn append<I>(&mut self, entries: I, callback: IOFlushed<TypeConfig>) -> Result<(), io::Error>
245245
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
246246
{
247247
let mut log = self.log.write().await;
@@ -261,12 +261,12 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
261261
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
262262
async fn applied_state(
263263
&mut self,
264-
) -> Result<(Option<LogIdOf<TypeConfig>>, StoredMembership<TypeConfig>), StorageError<TypeConfig>> {
264+
) -> Result<(Option<LogIdOf<TypeConfig>>, StoredMembership<TypeConfig>), io::Error> {
265265
let sm = self.sm.read().await;
266266
Ok((sm.last_applied_log, sm.last_membership.clone()))
267267
}
268268

269-
async fn apply<I>(&mut self, entries: I) -> Result<(), StorageError<TypeConfig>>
269+
async fn apply<I>(&mut self, entries: I) -> Result<(), io::Error>
270270
where
271271
I: IntoIterator<Item = EntryResponder<TypeConfig>> + Send,
272272
I::IntoIter: Send,
@@ -291,7 +291,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
291291
}
292292

293293
#[tracing::instrument(level = "trace", skip(self))]
294-
async fn begin_receiving_snapshot(&mut self) -> Result<SnapshotDataOf<TypeConfig>, StorageError<TypeConfig>> {
294+
async fn begin_receiving_snapshot(&mut self) -> Result<SnapshotDataOf<TypeConfig>, io::Error> {
295295
Ok(Cursor::new(Vec::new()))
296296
}
297297

@@ -300,16 +300,16 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
300300
&mut self,
301301
meta: &SnapshotMeta<TypeConfig>,
302302
snapshot: SnapshotDataOf<TypeConfig>,
303-
) -> Result<(), StorageError<TypeConfig>> {
303+
) -> Result<(), io::Error> {
304304
let new_snapshot = StoredSnapshot {
305305
meta: meta.clone(),
306306
data: snapshot.into_inner(),
307307
};
308308

309309
// Update the state machine.
310310
{
311-
let new_sm: StateMachine = serde_json::from_slice(&new_snapshot.data)
312-
.map_err(|e| StorageError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?;
311+
let new_sm: StateMachine =
312+
serde_json::from_slice(&new_snapshot.data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
313313
let mut sm = self.sm.write().await;
314314
*sm = new_sm;
315315
}
@@ -321,7 +321,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
321321
}
322322

323323
#[tracing::instrument(level = "trace", skip(self))]
324-
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<TypeConfig>> {
324+
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, io::Error> {
325325
match &*self.current_snapshot.read().await {
326326
Some(snapshot) => {
327327
let data = snapshot.data.clone();

examples/mem-log/src/log_store.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
44
use std::collections::BTreeMap;
55
use std::fmt::Debug;
6+
use std::io;
67
use std::ops::RangeBounds;
78
use std::sync::Arc;
89

@@ -12,7 +13,6 @@ use openraft::entry::RaftEntry;
1213
use openraft::storage::IOFlushed;
1314
use openraft::LogState;
1415
use openraft::RaftTypeConfig;
15-
use openraft::StorageError;
1616
use tokio::sync::Mutex;
1717

1818
/// RaftLogStore implementation with a in-memory storage
@@ -51,15 +51,15 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
5151
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
5252
&mut self,
5353
range: RB,
54-
) -> Result<Vec<C::Entry>, StorageError<C>>
54+
) -> Result<Vec<C::Entry>, io::Error>
5555
where
5656
C::Entry: Clone,
5757
{
5858
let response = self.log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>();
5959
Ok(response)
6060
}
6161

62-
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C>> {
62+
async fn get_log_state(&mut self) -> Result<LogState<C>, io::Error> {
6363
let last = self.log.iter().next_back().map(|(_, ent)| ent.log_id());
6464

6565
let last_purged = self.last_purged_log_id.clone();
@@ -75,25 +75,25 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
7575
})
7676
}
7777

78-
async fn save_committed(&mut self, committed: Option<LogIdOf<C>>) -> Result<(), StorageError<C>> {
78+
async fn save_committed(&mut self, committed: Option<LogIdOf<C>>) -> Result<(), io::Error> {
7979
self.committed = committed;
8080
Ok(())
8181
}
8282

83-
async fn read_committed(&mut self) -> Result<Option<LogIdOf<C>>, StorageError<C>> {
83+
async fn read_committed(&mut self) -> Result<Option<LogIdOf<C>>, io::Error> {
8484
Ok(self.committed.clone())
8585
}
8686

87-
async fn save_vote(&mut self, vote: &VoteOf<C>) -> Result<(), StorageError<C>> {
87+
async fn save_vote(&mut self, vote: &VoteOf<C>) -> Result<(), io::Error> {
8888
self.vote = Some(vote.clone());
8989
Ok(())
9090
}
9191

92-
async fn read_vote(&mut self) -> Result<Option<VoteOf<C>>, StorageError<C>> {
92+
async fn read_vote(&mut self) -> Result<Option<VoteOf<C>>, io::Error> {
9393
Ok(self.vote.clone())
9494
}
9595

96-
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), StorageError<C>>
96+
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), io::Error>
9797
where I: IntoIterator<Item = C::Entry> {
9898
// Simple implementation that calls the flush-before-return `append_to_log`.
9999
for entry in entries {
@@ -104,7 +104,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
104104
Ok(())
105105
}
106106

107-
async fn truncate(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
107+
async fn truncate(&mut self, log_id: LogIdOf<C>) -> Result<(), io::Error> {
108108
let keys = self.log.range(log_id.index()..).map(|(k, _v)| *k).collect::<Vec<_>>();
109109
for key in keys {
110110
self.log.remove(&key);
@@ -113,7 +113,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
113113
Ok(())
114114
}
115115

116-
async fn purge(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
116+
async fn purge(&mut self, log_id: LogIdOf<C>) -> Result<(), io::Error> {
117117
{
118118
let ld = &mut self.last_purged_log_id;
119119
assert!(ld.as_ref() <= Some(&log_id));
@@ -133,6 +133,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
133133

134134
mod impl_log_store {
135135
use std::fmt::Debug;
136+
use std::io;
136137
use std::ops::RangeBounds;
137138

138139
use openraft::alias::LogIdOf;
@@ -142,7 +143,6 @@ mod impl_log_store {
142143
use openraft::LogState;
143144
use openraft::RaftLogReader;
144145
use openraft::RaftTypeConfig;
145-
use openraft::StorageError;
146146

147147
use crate::log_store::LogStore;
148148

@@ -152,12 +152,12 @@ mod impl_log_store {
152152
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
153153
&mut self,
154154
range: RB,
155-
) -> Result<Vec<C::Entry>, StorageError<C>> {
155+
) -> Result<Vec<C::Entry>, io::Error> {
156156
let mut inner = self.inner.lock().await;
157157
inner.try_get_log_entries(range).await
158158
}
159159

160-
async fn read_vote(&mut self) -> Result<Option<VoteOf<C>>, StorageError<C>> {
160+
async fn read_vote(&mut self) -> Result<Option<VoteOf<C>>, io::Error> {
161161
let mut inner = self.inner.lock().await;
162162
inner.read_vote().await
163163
}
@@ -168,38 +168,38 @@ mod impl_log_store {
168168
{
169169
type LogReader = Self;
170170

171-
async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C>> {
171+
async fn get_log_state(&mut self) -> Result<LogState<C>, io::Error> {
172172
let mut inner = self.inner.lock().await;
173173
inner.get_log_state().await
174174
}
175175

176-
async fn save_committed(&mut self, committed: Option<LogIdOf<C>>) -> Result<(), StorageError<C>> {
176+
async fn save_committed(&mut self, committed: Option<LogIdOf<C>>) -> Result<(), io::Error> {
177177
let mut inner = self.inner.lock().await;
178178
inner.save_committed(committed).await
179179
}
180180

181-
async fn read_committed(&mut self) -> Result<Option<LogIdOf<C>>, StorageError<C>> {
181+
async fn read_committed(&mut self) -> Result<Option<LogIdOf<C>>, io::Error> {
182182
let mut inner = self.inner.lock().await;
183183
inner.read_committed().await
184184
}
185185

186-
async fn save_vote(&mut self, vote: &VoteOf<C>) -> Result<(), StorageError<C>> {
186+
async fn save_vote(&mut self, vote: &VoteOf<C>) -> Result<(), io::Error> {
187187
let mut inner = self.inner.lock().await;
188188
inner.save_vote(vote).await
189189
}
190190

191-
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), StorageError<C>>
191+
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), io::Error>
192192
where I: IntoIterator<Item = C::Entry> {
193193
let mut inner = self.inner.lock().await;
194194
inner.append(entries, callback).await
195195
}
196196

197-
async fn truncate(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
197+
async fn truncate(&mut self, log_id: LogIdOf<C>) -> Result<(), io::Error> {
198198
let mut inner = self.inner.lock().await;
199199
inner.truncate(log_id).await
200200
}
201201

202-
async fn purge(&mut self, log_id: LogIdOf<C>) -> Result<(), StorageError<C>> {
202+
async fn purge(&mut self, log_id: LogIdOf<C>) -> Result<(), io::Error> {
203203
let mut inner = self.inner.lock().await;
204204
inner.purge(log_id).await
205205
}

examples/raft-kv-memstore-grpc/src/store/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::fmt::Debug;
2+
use std::io;
23
use std::sync::Arc;
34
use std::sync::Mutex;
45

@@ -48,7 +49,7 @@ impl StateMachineStore {
4849

4950
impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
5051
#[tracing::instrument(level = "trace", skip(self))]
51-
async fn build_snapshot(&mut self) -> Result<Snapshot, StorageError> {
52+
async fn build_snapshot(&mut self) -> Result<Snapshot, io::Error> {
5253
let data;
5354
let last_applied: Option<LogId>;
5455
let last_membership;
@@ -97,7 +98,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
9798
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
9899
type SnapshotBuilder = Self;
99100

100-
async fn applied_state(&mut self) -> Result<(Option<LogId>, StoredMembership), StorageError> {
101+
async fn applied_state(&mut self) -> Result<(Option<LogId>, StoredMembership), io::Error> {
101102
let sm = self.state_machine.lock().unwrap();
102103

103104
let last_applied = sm.last_applied.map(|x| x.into());
@@ -111,7 +112,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
111112
}
112113

113114
#[tracing::instrument(level = "trace", skip(self, entries))]
114-
async fn apply<I>(&mut self, entries: I) -> Result<(), StorageError>
115+
async fn apply<I>(&mut self, entries: I) -> Result<(), io::Error>
115116
where
116117
I: IntoIterator<Item = EntryResponder<TypeConfig>>,
117118
I::IntoIter: Send,
@@ -144,12 +145,12 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
144145
}
145146

146147
#[tracing::instrument(level = "trace", skip(self))]
147-
async fn begin_receiving_snapshot(&mut self) -> Result<SnapshotData, StorageError> {
148+
async fn begin_receiving_snapshot(&mut self) -> Result<SnapshotData, io::Error> {
148149
Ok(Default::default())
149150
}
150151

151152
#[tracing::instrument(level = "trace", skip(self, snapshot))]
152-
async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), StorageError> {
153+
async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), io::Error> {
153154
tracing::info!("install snapshot");
154155

155156
let new_snapshot = StoredSnapshot {
@@ -160,7 +161,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
160161
// Update the state machine.
161162
{
162163
let d: pb::StateMachineData = prost::Message::decode(new_snapshot.data.as_ref())
163-
.map_err(|e| StorageError::read_snapshot(None, &e))?;
164+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
164165

165166
let mut state_machine = self.state_machine.lock().unwrap();
166167
*state_machine = d;
@@ -173,7 +174,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
173174
}
174175

175176
#[tracing::instrument(level = "trace", skip(self))]
176-
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot>, StorageError> {
177+
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot>, io::Error> {
177178
match &*self.current_snapshot.lock().unwrap() {
178179
Some(snapshot) => {
179180
let data = snapshot.data.clone();

0 commit comments

Comments
 (0)