11use alloc:: borrow:: Cow ;
22use alloc:: rc:: Rc ;
3- use alloc:: string:: { String , ToString } ;
3+ use alloc:: string:: String ;
44use alloc:: vec:: Vec ;
55use serde:: Deserialize ;
66use serde:: de:: { Error , IgnoredAny , VariantAccess , Visitor } ;
@@ -85,13 +85,76 @@ pub struct Checkpoint<'a> {
8585 #[ serde( borrow) ]
8686 pub buckets : Vec < BucketChecksum < ' a > > ,
8787 #[ serde( default , borrow) ]
88- pub streams : Vec < StreamDefinition < ' a > > ,
88+ pub streams : Vec < StreamDescription < ' a > > ,
8989}
9090
9191#[ derive( Deserialize , Debug ) ]
92- pub struct StreamDefinition < ' a > {
92+ pub struct StreamDescription < ' a > {
9393 pub name : SyncLineStr < ' a > ,
9494 pub is_default : bool ,
95+ pub errors : Rc < Vec < StreamSubscriptionError > > ,
96+ }
97+
98+ #[ derive( Deserialize , Debug ) ]
99+ pub struct StreamSubscriptionError {
100+ pub subscription : StreamSubscriptionErrorCause ,
101+ pub message : String ,
102+ }
103+
104+ /// The concrete stream subscription that has caused an error.
105+ #[ derive( Debug ) ]
106+ pub enum StreamSubscriptionErrorCause {
107+ /// The error is caused by the stream being subscribed to by default (i.e., no parameters).
108+ Default ,
109+ /// The error is caused by an explicit subscription (e.g. due to invalid parameters).
110+ ///
111+ /// The inner value is the index into [StreamSubscriptionRequest::subscriptions] of the
112+ /// faulty subscription.
113+ ExplicitSubscription ( usize ) ,
114+ }
115+
116+ impl < ' de > Deserialize < ' de > for StreamSubscriptionErrorCause {
117+ fn deserialize < D > ( deserializer : D ) -> Result < Self , D :: Error >
118+ where
119+ D : serde:: Deserializer < ' de > ,
120+ {
121+ struct CauseVisitor ;
122+
123+ impl < ' de > Visitor < ' de > for CauseVisitor {
124+ type Value = StreamSubscriptionErrorCause ;
125+
126+ fn expecting ( & self , formatter : & mut core:: fmt:: Formatter ) -> core:: fmt:: Result {
127+ write ! ( formatter, "default or index" )
128+ }
129+
130+ fn visit_str < E > ( self , _v : & str ) -> Result < Self :: Value , E >
131+ where
132+ E : Error ,
133+ {
134+ return Ok ( StreamSubscriptionErrorCause :: Default ) ;
135+ }
136+
137+ fn visit_u64 < E > ( self , v : u64 ) -> Result < Self :: Value , E >
138+ where
139+ E : Error ,
140+ {
141+ return Ok ( StreamSubscriptionErrorCause :: ExplicitSubscription (
142+ v as usize ,
143+ ) ) ;
144+ }
145+
146+ fn visit_i32 < E > ( self , v : i32 ) -> Result < Self :: Value , E >
147+ where
148+ E : Error ,
149+ {
150+ return Ok ( StreamSubscriptionErrorCause :: ExplicitSubscription (
151+ v as usize ,
152+ ) ) ;
153+ }
154+ }
155+
156+ deserializer. deserialize_any ( CauseVisitor )
157+ }
95158}
96159
97160#[ serde_as]
@@ -141,9 +204,13 @@ pub struct BucketChecksum<'a> {
141204#[ derive( Debug ) ]
142205pub enum BucketSubscriptionReason {
143206 /// A bucket was created from a default stream.
144- DerivedFromDefaultStream ( String ) ,
207+ ///
208+ /// The inner value is the index of the stream in [Checkpoint::streams].
209+ DerivedFromDefaultStream ( usize ) ,
145210 /// A bucket was created for a subscription id we've explicitly requested in the sync request.
146- DerivedFromExplicitSubscription ( i64 ) ,
211+ ///
212+ /// The inner value is the index of the stream in [StreamSubscriptionRequest::subscriptions].
213+ DerivedFromExplicitSubscription ( usize ) ,
147214}
148215
149216impl < ' de > Deserialize < ' de > for BucketSubscriptionReason {
@@ -153,7 +220,7 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason {
153220 {
154221 struct MyVisitor ;
155222
156- const VARIANTS : & ' static [ & ' static str ] = & [ "def " , "sub" ] ;
223+ const VARIANTS : & ' static [ & ' static str ] = & [ "default " , "sub" ] ;
157224
158225 impl < ' de > Visitor < ' de > for MyVisitor {
159226 type Value = BucketSubscriptionReason ;
@@ -168,17 +235,12 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason {
168235 {
169236 let ( key, variant) = data. variant :: < & ' de str > ( ) ?;
170237 Ok ( match key {
171- "def" => BucketSubscriptionReason :: DerivedFromDefaultStream (
238+ "default" => BucketSubscriptionReason :: DerivedFromDefaultStream (
239+ variant. newtype_variant ( ) ?,
240+ ) ,
241+ "sub" => BucketSubscriptionReason :: DerivedFromExplicitSubscription (
172242 variant. newtype_variant ( ) ?,
173243 ) ,
174- "sub" => {
175- let textual_id = variant. newtype_variant :: < & ' de str > ( ) ?;
176- let id = textual_id
177- . parse ( )
178- . map_err ( |_| A :: Error :: custom ( "not an int" ) ) ?;
179-
180- BucketSubscriptionReason :: DerivedFromExplicitSubscription ( id)
181- }
182244 other => return Err ( A :: Error :: unknown_variant ( other, VARIANTS ) ) ,
183245 } )
184246 }
0 commit comments