11use {
2- asynk_strim:: stream_fn,
2+ asynk_strim:: { Yielder , stream_fn} ,
33 axum:: {
44 Router ,
55 extract:: Path ,
6- response:: { Html , IntoResponse , Sse } ,
6+ response:: { Html , IntoResponse , Sse , sse :: Event } ,
77 routing:: { get, post} ,
88 } ,
99 core:: { convert:: Infallible , error:: Error , time:: Duration } ,
@@ -80,36 +80,38 @@ async fn generate(ReadSignals(signals): ReadSignals<Signals>) -> impl IntoRespon
8080 let mut done = signals. done ;
8181
8282 // Start the SSE stream
83- Sse :: new ( stream_fn ( move |mut yielder| async move {
84- // Signal event generation start
85- let patch = PatchSignals :: new ( r#"{{"generating": true}}"# ) ;
86- let sse_event = patch. write_as_axum_sse_event ( ) ;
87- yielder. yield_item ( Ok :: < _ , Infallible > ( sse_event) ) . await ;
88-
89- // Yield the events elements and signals to the stream
90- for _ in 1 ..=signals. events {
91- total += 1 ;
92- done += 1 ;
93- // Append a new entry to the activity feed
94- let elements = event_entry ( & Status :: Done , total, "Auto" ) ;
95- let patch = PatchElements :: new ( elements)
96- . selector ( "#feed" )
97- . mode ( ElementPatchMode :: After ) ;
83+ Sse :: new ( stream_fn (
84+ move |mut yielder : Yielder < Result < Event , Infallible > > | async move {
85+ // Signal event generation start
86+ let patch = PatchSignals :: new ( r#"{{"generating": true}}"# ) ;
9887 let sse_event = patch. write_as_axum_sse_event ( ) ;
99- yielder. yield_item ( Ok :: < _ , Infallible > ( sse_event) ) . await ;
100-
101- // Update the event counts
102- let patch = PatchSignals :: new ( format ! ( r#"{{"total": {total}, "done": {done}}}"# ) ) ;
88+ yielder. yield_item ( Ok ( sse_event) ) . await ;
89+
90+ // Yield the events elements and signals to the stream
91+ for _ in 1 ..=signals. events {
92+ total += 1 ;
93+ done += 1 ;
94+ // Append a new entry to the activity feed
95+ let elements = event_entry ( & Status :: Done , total, "Auto" ) ;
96+ let patch = PatchElements :: new ( elements)
97+ . selector ( "#feed" )
98+ . mode ( ElementPatchMode :: After ) ;
99+ let sse_event = patch. write_as_axum_sse_event ( ) ;
100+ yielder. yield_item ( Ok ( sse_event) ) . await ;
101+
102+ // Update the event counts
103+ let patch = PatchSignals :: new ( format ! ( r#"{{"total": {total}, "done": {done}}}"# ) ) ;
104+ let sse_event = patch. write_as_axum_sse_event ( ) ;
105+ yielder. yield_item ( Ok ( sse_event) ) . await ;
106+ tokio:: time:: sleep ( Duration :: from_millis ( signals. interval ) ) . await ;
107+ }
108+
109+ // Signal event generation end
110+ let patch = PatchSignals :: new ( r#"{{"generating": false}}"# ) ;
103111 let sse_event = patch. write_as_axum_sse_event ( ) ;
104- yielder. yield_item ( Ok :: < _ , Infallible > ( sse_event) ) . await ;
105- tokio:: time:: sleep ( Duration :: from_millis ( signals. interval ) ) . await ;
106- }
107-
108- // Signal event generation end
109- let patch = PatchSignals :: new ( r#"{{"generating": false}}"# ) ;
110- let sse_event = patch. write_as_axum_sse_event ( ) ;
111- yielder. yield_item ( Ok :: < _ , Infallible > ( sse_event) ) . await ;
112- } ) )
112+ yielder. yield_item ( Ok ( sse_event) ) . await ;
113+ } ,
114+ ) )
113115}
114116
115117/// Creates one event with a given status
@@ -118,27 +120,29 @@ async fn event(
118120 ReadSignals ( signals) : ReadSignals < Signals > ,
119121) -> impl IntoResponse {
120122 // Create the event stream, since we're patching both an element and a signal.
121- Sse :: new ( stream_fn ( move |mut yielder| async move {
122- // Signal the updated event counts
123- let total = signals. total + 1 ;
124- let signals = match status {
125- Status :: Done => format ! ( r#"{{"total": {total}, "done": {}}}"# , signals. done + 1 ) ,
126- Status :: Warn => format ! ( r#"{{"total": {total}, "warn": {}}}"# , signals. warn + 1 ) ,
127- Status :: Fail => format ! ( r#"{{"total": {total}, "fail": {}}}"# , signals. fail + 1 ) ,
128- Status :: Info => format ! ( r#"{{"total": {total}, "info": {}}}"# , signals. info + 1 ) ,
129- } ;
130- let patch = PatchSignals :: new ( signals) ;
131- let sse_signal = patch. write_as_axum_sse_event ( ) ;
132- yielder. yield_item ( Ok :: < _ , Infallible > ( sse_signal) ) . await ;
133-
134- // Patch an element and append it to the feed
135- let elements = event_entry ( & status, total, "Manual" ) ;
136- let patch = PatchElements :: new ( elements)
137- . selector ( "#feed" )
138- . mode ( ElementPatchMode :: After ) ;
139- let sse_event = patch. write_as_axum_sse_event ( ) ;
140- yielder. yield_item ( Ok :: < _ , Infallible > ( sse_event) ) . await ;
141- } ) )
123+ Sse :: new ( stream_fn (
124+ move |mut yielder : Yielder < Result < Event , Infallible > > | async move {
125+ // Signal the updated event counts
126+ let total = signals. total + 1 ;
127+ let signals = match status {
128+ Status :: Done => format ! ( r#"{{"total": {total}, "done": {}}}"# , signals. done + 1 ) ,
129+ Status :: Warn => format ! ( r#"{{"total": {total}, "warn": {}}}"# , signals. warn + 1 ) ,
130+ Status :: Fail => format ! ( r#"{{"total": {total}, "fail": {}}}"# , signals. fail + 1 ) ,
131+ Status :: Info => format ! ( r#"{{"total": {total}, "info": {}}}"# , signals. info + 1 ) ,
132+ } ;
133+ let patch = PatchSignals :: new ( signals) ;
134+ let sse_signal = patch. write_as_axum_sse_event ( ) ;
135+ yielder. yield_item ( Ok ( sse_signal) ) . await ;
136+
137+ // Patch an element and append it to the feed
138+ let elements = event_entry ( & status, total, "Manual" ) ;
139+ let patch = PatchElements :: new ( elements)
140+ . selector ( "#feed" )
141+ . mode ( ElementPatchMode :: After ) ;
142+ let sse_event = patch. write_as_axum_sse_event ( ) ;
143+ yielder. yield_item ( Ok ( sse_event) ) . await ;
144+ } ,
145+ ) )
142146}
143147
144148/// Returns an HTML string for the entry
0 commit comments