Skip to content

Commit d3a7459

Browse files
mikeeemarcduiker
andauthored
release <- main (#290)
* chore: remove pinned release versions validation (#270) Signed-off-by: Mike Nguyen <[email protected]> * fix(deps): pin axum-test crate to 16.4.0 Signed-off-by: Mike Nguyen <[email protected]> * fix(deps): pin reserve-port to 2.1.0 Signed-off-by: Mike Nguyen <[email protected]> * fix(docs): over indented docs (#274) Signed-off-by: Mike Nguyen <[email protected]> * chore: pin idna_adapter (#278) Signed-off-by: Mike Nguyen <[email protected]> * docs: fix typo (#279) Signed-off-by: Mike Nguyen <[email protected]> * chore: limit the visibility of the cargo token (#280) Addresses #218 Signed-off-by: Mike Nguyen <[email protected]> * fix(chore): allow large size difference between variants in generated enums (#283) Signed-off-by: Mike Nguyen <[email protected]> * fix: refactor log formatting (#285) * fix: refactor log formatting Signed-off-by: mikeee <[email protected]> * fix: refactor all print formatting Signed-off-by: mikeee <[email protected]> --------- Signed-off-by: mikeee <[email protected]> * Update markdown files to be compatible with latest Hugo (#287) Signed-off-by: Marc Duiker <[email protected]> * fix: cleanup dead code (#288) Signed-off-by: Mike Nguyen <[email protected]> * feat!: jobs failure policy and proto bump (#289) * chore: bump protos Signed-off-by: Mike Nguyen <[email protected]> * feat: add override job parameter Signed-off-by: Mike Nguyen <[email protected]> * feat: job failure policy Signed-off-by: Mike Nguyen <[email protected]> * feat: add failurepolicy job example and add option Signed-off-by: Mike Nguyen <[email protected]> * fix: typo Signed-off-by: Mike Nguyen <[email protected]> * chore: fix proto linter issue Signed-off-by: Mike Nguyen <[email protected]> * fix: job failure policy assertion Signed-off-by: Mike Nguyen <[email protected]> --------- Signed-off-by: Mike Nguyen <[email protected]> * fix: pin dev deps related to axum_test (#291) Signed-off-by: Mike Nguyen <[email protected]> --------- Signed-off-by: Mike Nguyen <[email protected]> Signed-off-by: mikeee <[email protected]> Signed-off-by: Marc Duiker <[email protected]> Co-authored-by: Marc Duiker <[email protected]>
1 parent 78b4b19 commit d3a7459

File tree

38 files changed

+2070
-2253
lines changed

38 files changed

+2070
-2253
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ on:
1616

1717
env:
1818
CARGO_TERM_COLOR: always
19-
CARGO_TOKEN: ${{ secrets.CRATES_IO_TOKEN }}
2019
PROTOC_VERSION: 24.4
2120
RUSTFLAGS: "-D warnings"
2221

@@ -145,4 +144,6 @@ jobs:
145144
version: ${{ env.PROTOC_VERSION }}
146145
- uses: actions/checkout@v4
147146
- name: cargo publish - ${{ matrix.crate }}
147+
env:
148+
CARGO_TOKEN: ${{ secrets.CARGO_TOKEN }}
148149
run: cargo publish --manifest-path ${{ matrix.crate }}/Cargo.toml --token ${{ env.CARGO_TOKEN }}

.github/workflows/validate-examples.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ jobs:
220220
fail-fast: false
221221
matrix:
222222
examples:
223-
[ "actors", "bindings", "client", "configuration", "conversation", "crypto", "invoke/grpc", "invoke/grpc-proxying", "jobs", "pubsub", "query_state", "secrets-bulk" ]
223+
[ "actors", "bindings", "client", "configuration", "conversation", "crypto", "invoke/grpc", "invoke/grpc-proxying", "jobs", "jobs-failurepolicy", "pubsub", "query_state", "secrets-bulk" ]
224224
steps:
225225
- name: Check out code
226226
uses: actions/checkout@v4

dapr-macros/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ pub fn actor(_attr: TokenStream, item: TokenStream) -> TokenStream {
8181
Ok(actor_struct) => actor_struct.ident.clone(),
8282
Err(_) => match syn::parse::<syn::ItemType>(item.clone()) {
8383
Ok(ty) => ty.ident.clone(),
84-
Err(e) => panic!("Error parsing actor struct: {}", e),
84+
Err(e) => panic!("Error parsing actor struct: {e}"),
8585
},
8686
};
8787

dapr/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ axum-test = "=16.4.0" # TODO: Remove problematic dep
3030
litemap = "=0.7.4" # TODO: Remove pinned - linked to axum_test
3131
zerofrom = "=0.1.5" # TODO: Remove pinned - linked to axum_test
3232
reserve-port = "=2.1.0" # TODO: Remove pinned - linked to axum_test
33-
idna_adapter = "=1.2.0"
33+
idna_adapter = "=1.2.0" # TODO: Remove pinned - linked to axum_test
34+
deranged = "=0.4.0" # TODO: Remove pinned - linked to axum_test
35+
time = "=0.3.41" # TODO: Remove pinned - linked to axum_test
36+
time-core = "=0.1.4" # TODO: Remove pinned - linked to axum_test
37+
time-macros = "=0.2.22" # TODO: Remove pinned - linked to axum_test
3438

3539
once_cell = "1.19"
3640
dapr = { path = "./" }

dapr/src/client.rs

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
1-
use std::collections::HashMap;
2-
1+
use crate::dapr::proto::common::v1::job_failure_policy::Policy;
2+
use crate::dapr::proto::common::v1::JobFailurePolicyConstant;
3+
use crate::dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
4+
use crate::error::Error;
35
use async_trait::async_trait;
46
use futures::StreamExt;
57
use prost_types::Any;
68
use serde::{Deserialize, Serialize};
79
use serde_json::Value;
10+
use std::collections::HashMap;
11+
use std::time::Duration;
812
use tokio::io::AsyncRead;
913
use tonic::codegen::tokio_stream;
1014
use tonic::{transport::Channel as TonicChannel, Request};
1115
use tonic::{Status, Streaming};
1216

13-
use crate::dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
14-
use crate::error::Error;
15-
1617
#[derive(Clone)]
1718
pub struct Client<T>(T);
1819

@@ -25,7 +26,7 @@ impl<T: DaprInterface> Client<T> {
2526
pub async fn connect(addr: String) -> Result<Self, Error> {
2627
// Get the Dapr port to create a connection
2728
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
28-
let address = format!("{}:{}", addr, port);
29+
let address = format!("{addr}:{port}");
2930

3031
Ok(Client(T::connect(address).await?))
3132
}
@@ -45,7 +46,7 @@ impl<T: DaprInterface> Client<T> {
4546
}
4647
};
4748

48-
let address = format!("{}:{}", addr, port);
49+
let address = format!("{addr}:{port}");
4950

5051
Ok(Client(T::connect(address).await?))
5152
}
@@ -559,9 +560,15 @@ impl<T: DaprInterface> Client<T> {
559560
/// # Arguments
560561
///
561562
/// * job - The job to schedule
562-
pub async fn schedule_job_alpha1(&mut self, job: Job) -> Result<ScheduleJobResponse, Error> {
563+
/// * overwrite - Optional flag to overwrite an existing job with the same name
564+
pub async fn schedule_job_alpha1(
565+
&mut self,
566+
job: Job,
567+
overwrite: Option<bool>,
568+
) -> Result<ScheduleJobResponse, Error> {
563569
let request = ScheduleJobRequest {
564570
job: Some(job.clone()),
571+
overwrite: overwrite.unwrap_or(false),
565572
};
566573
self.0.schedule_job_alpha1(request).await
567574
}
@@ -981,6 +988,9 @@ pub type DecryptRequestOptions = crate::dapr::proto::runtime::v1::DecryptRequest
981988
/// The basic job structure
982989
pub type Job = crate::dapr::proto::runtime::v1::Job;
983990

991+
/// A failure policy for a job
992+
pub type JobFailurePolicy = crate::dapr::proto::common::v1::JobFailurePolicy;
993+
984994
/// A request to schedule a job
985995
pub type ScheduleJobRequest = crate::dapr::proto::runtime::v1::ScheduleJobRequest;
986996

@@ -1040,6 +1050,7 @@ pub struct JobBuilder {
10401050
ttl: Option<String>,
10411051
repeats: Option<u32>,
10421052
due_time: Option<String>,
1053+
failure_policy: Option<JobFailurePolicy>,
10431054
}
10441055

10451056
impl JobBuilder {
@@ -1052,6 +1063,7 @@ impl JobBuilder {
10521063
ttl: None,
10531064
repeats: None,
10541065
due_time: None,
1066+
failure_policy: None,
10551067
}
10561068
}
10571069

@@ -1080,6 +1092,11 @@ impl JobBuilder {
10801092
self
10811093
}
10821094

1095+
pub fn with_failure_policy(mut self, policy: JobFailurePolicy) -> Self {
1096+
self.failure_policy = Some(policy);
1097+
self
1098+
}
1099+
10831100
pub fn build(self) -> Job {
10841101
Job {
10851102
schedule: self.schedule,
@@ -1088,6 +1105,57 @@ impl JobBuilder {
10881105
ttl: self.ttl,
10891106
repeats: self.repeats,
10901107
due_time: self.due_time,
1108+
failure_policy: self.failure_policy,
1109+
}
1110+
}
1111+
}
1112+
1113+
// Enum for a job failure policy
1114+
pub enum JobFailurePolicyType {
1115+
Drop {},
1116+
Constant {},
1117+
}
1118+
1119+
pub struct JobFailurePolicyBuilder {
1120+
policy: JobFailurePolicyType,
1121+
pub retry_interval: Option<Duration>,
1122+
pub max_retries: Option<u32>,
1123+
}
1124+
1125+
impl JobFailurePolicyBuilder {
1126+
pub fn new(policy: JobFailurePolicyType) -> Self {
1127+
JobFailurePolicyBuilder {
1128+
policy,
1129+
retry_interval: None,
1130+
max_retries: None,
1131+
}
1132+
}
1133+
1134+
pub fn with_retry_interval(mut self, interval: Duration) -> Self {
1135+
// Convert interval string (e.g., "5s") to ProstDuration
1136+
self.retry_interval = Some(interval);
1137+
self
1138+
}
1139+
1140+
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
1141+
self.max_retries = Some(max_retries);
1142+
self
1143+
}
1144+
1145+
pub fn build(self) -> common_v1::JobFailurePolicy {
1146+
match self.policy {
1147+
JobFailurePolicyType::Drop {} => common_v1::JobFailurePolicy {
1148+
policy: Some(Policy::Drop(Default::default())),
1149+
},
1150+
JobFailurePolicyType::Constant {} => JobFailurePolicy {
1151+
policy: Some(Policy::Constant(JobFailurePolicyConstant {
1152+
interval: self.retry_interval.map(|interval| {
1153+
prost_types::Duration::try_from(interval)
1154+
.expect("Failed to convert Duration")
1155+
}),
1156+
max_retries: self.max_retries,
1157+
})),
1158+
},
10911159
}
10921160
}
10931161
}

dapr/src/dapr/dapr.proto.common.v1.rs

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,7 @@ pub mod http_extension {
2020
/// Type of HTTP 1.1 Methods
2121
/// RFC 7231: <https://tools.ietf.org/html/rfc7231#page-24>
2222
/// RFC 5789: <https://datatracker.ietf.org/doc/html/rfc5789>
23-
#[derive(
24-
Clone,
25-
Copy,
26-
Debug,
27-
PartialEq,
28-
Eq,
29-
Hash,
30-
PartialOrd,
31-
Ord,
32-
::prost::Enumeration
33-
)]
23+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
3424
#[repr(i32)]
3525
pub enum Verb {
3626
None = 0,
@@ -148,10 +138,8 @@ pub struct StateItem {
148138
pub etag: ::core::option::Option<Etag>,
149139
/// The metadata which will be passed to state store component.
150140
#[prost(map = "string, string", tag = "4")]
151-
pub metadata: ::std::collections::HashMap<
152-
::prost::alloc::string::String,
153-
::prost::alloc::string::String,
154-
>,
141+
pub metadata:
142+
::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
155143
/// Options for concurrency and consistency to save the state.
156144
#[prost(message, optional, tag = "5")]
157145
pub options: ::core::option::Option<StateOptions>,
@@ -174,17 +162,7 @@ pub struct StateOptions {
174162
/// Nested message and enum types in `StateOptions`.
175163
pub mod state_options {
176164
/// Enum describing the supported concurrency for state.
177-
#[derive(
178-
Clone,
179-
Copy,
180-
Debug,
181-
PartialEq,
182-
Eq,
183-
Hash,
184-
PartialOrd,
185-
Ord,
186-
::prost::Enumeration
187-
)]
165+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
188166
#[repr(i32)]
189167
pub enum StateConcurrency {
190168
ConcurrencyUnspecified = 0,
@@ -214,17 +192,7 @@ pub mod state_options {
214192
}
215193
}
216194
/// Enum describing the supported consistency for state.
217-
#[derive(
218-
Clone,
219-
Copy,
220-
Debug,
221-
PartialEq,
222-
Eq,
223-
Hash,
224-
PartialOrd,
225-
Ord,
226-
::prost::Enumeration
227-
)]
195+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
228196
#[repr(i32)]
229197
pub enum StateConsistency {
230198
ConsistencyUnspecified = 0,
@@ -265,8 +233,38 @@ pub struct ConfigurationItem {
265233
pub version: ::prost::alloc::string::String,
266234
/// the metadata which will be passed to/from configuration store component.
267235
#[prost(map = "string, string", tag = "3")]
268-
pub metadata: ::std::collections::HashMap<
269-
::prost::alloc::string::String,
270-
::prost::alloc::string::String,
271-
>,
236+
pub metadata:
237+
::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
238+
}
239+
/// JobFailurePolicy defines the policy to apply when a job fails to trigger.
240+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
241+
pub struct JobFailurePolicy {
242+
/// policy is the policy to apply when a job fails to trigger.
243+
#[prost(oneof = "job_failure_policy::Policy", tags = "1, 2")]
244+
pub policy: ::core::option::Option<job_failure_policy::Policy>,
245+
}
246+
/// Nested message and enum types in `JobFailurePolicy`.
247+
pub mod job_failure_policy {
248+
/// policy is the policy to apply when a job fails to trigger.
249+
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
250+
pub enum Policy {
251+
#[prost(message, tag = "1")]
252+
Drop(super::JobFailurePolicyDrop),
253+
#[prost(message, tag = "2")]
254+
Constant(super::JobFailurePolicyConstant),
255+
}
256+
}
257+
/// JobFailurePolicyDrop is a policy which drops the job tick when the job fails to trigger.
258+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
259+
pub struct JobFailurePolicyDrop {}
260+
/// JobFailurePolicyConstant is a policy which retries the job at a consistent interval when the job fails to trigger.
261+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
262+
pub struct JobFailurePolicyConstant {
263+
/// interval is the constant delay to wait before retrying the job.
264+
#[prost(message, optional, tag = "1")]
265+
pub interval: ::core::option::Option<::prost_types::Duration>,
266+
/// max_retries is the optional maximum number of retries to attempt before giving up.
267+
/// If unset, the Job will be retried indefinitely.
268+
#[prost(uint32, optional, tag = "2")]
269+
pub max_retries: ::core::option::Option<u32>,
272270
}

0 commit comments

Comments
 (0)