-
-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
Milestone
Description
Priority: 🟡 HIGH
Problem
Commands need to declare their stream dependencies and we need clear stream naming conventions. EventCore uses a stream-centric approach rather than traditional aggregates.
Updated Understanding (After EventCore Review)
EventCore represents an evolution in event sourcing architecture:
- No predefined aggregates - Commands dynamically define consistency boundaries
- Multi-stream atomic operations - Natural for cross-entity business operations
- Type-driven command design - Leverages Rust's type system for safety
- Flexible consistency - Each command declares its own requirements
Required Implementation
1. Define Stream Naming Conventions
// Consistent stream naming patterns
pub mod streams {
use eventcore::StreamId;
pub fn session_stream(session_id: &SessionId) -> StreamId {
StreamId::new(format!("session:{}", session_id))
}
pub fn analysis_stream(analysis_id: &AnalysisId) -> StreamId {
StreamId::new(format!("analysis:{}", analysis_id))
}
pub fn user_settings_stream(user_id: &UserId) -> StreamId {
StreamId::new(format!("user:{}:settings", user_id))
}
pub fn extraction_stream(extraction_id: &ExtractionId) -> StreamId {
StreamId::new(format!("extraction:{}", extraction_id))
}
}
2. Command-Centric Design with EventCore
// Commands define their own consistency boundaries
#[derive(Command)]
pub struct StartSessionAnalysis {
#[stream]
session_stream: StreamId,
#[stream]
analysis_stream: StreamId,
analysis_config: AnalysisConfig,
}
#[async_trait]
impl CommandLogic for StartSessionAnalysis {
type State = SessionAnalysisState;
type Event = DomainEvent;
async fn handle(
&self,
read_streams: ReadStreams<Self::StreamSet>,
state: Self::State,
stream_resolver: &mut StreamResolver,
) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
let mut events = Vec::new();
// Business logic validation
require!(state.session.status == SessionStatus::Complete, "Cannot analyze incomplete session");
require!(!state.analysis_exists, "Analysis already started for this session");
// Emit events to multiple streams atomically
emit!(
events,
&read_streams,
self.session_stream.clone(),
DomainEvent::AnalysisStarted {
session_id: state.session.id.clone(),
analysis_id: self.analysis_config.id.clone(),
}
);
emit!(
events,
&read_streams,
self.analysis_stream.clone(),
DomainEvent::AnalysisCreated {
analysis_id: self.analysis_config.id.clone(),
config: self.analysis_config.clone(),
session_id: state.session.id.clone(),
}
);
Ok(events)
}
}
3. Stream Lifecycle Management
// Document stream lifecycles and relationships
pub struct StreamDocumentation {
pub stream_pattern: &'static str,
pub purpose: &'static str,
pub lifecycle: StreamLifecycle,
pub related_streams: Vec<&'static str>,
}
pub const STREAM_DOCS: &[StreamDocumentation] = &[
StreamDocumentation {
stream_pattern: "session:{session_id}",
purpose: "Tracks all events for a single LLM session",
lifecycle: StreamLifecycle::Bounded {
created_by: "StartSession command",
closed_by: "EndSession command",
retention: RetentionPolicy::Days(90),
},
related_streams: vec!["analysis:{analysis_id}", "user:{user_id}:settings"],
},
StreamDocumentation {
stream_pattern: "analysis:{analysis_id}",
purpose: "Tracks analysis process and results",
lifecycle: StreamLifecycle::Bounded {
created_by: "StartSessionAnalysis command",
closed_by: "CompleteAnalysis command",
retention: RetentionPolicy::Days(365),
},
related_streams: vec!["session:{session_id}"],
},
];
4. Complex Multi-Stream Operations
// Example: Version migration affecting multiple streams
#[derive(Command)]
pub struct MigrateSessionVersion {
#[stream]
session_stream: StreamId,
#[stream]
version_tracking_stream: StreamId,
#[stream]
migration_log_stream: StreamId,
from_version: ModelVersion,
to_version: ModelVersion,
}
// This command atomically updates all three streams
// EventCore ensures consistency across all streams
5. Stream Query Patterns
// Efficient querying across related streams
pub async fn get_session_with_analyses(
session_id: &SessionId,
event_store: &PostgresEventStore,
) -> Result<SessionWithAnalyses, DomainError> {
// Read session stream
let session_stream = streams::session_stream(session_id);
let session_events = event_store.read_stream(&session_stream, None).await?;
// Extract analysis IDs from session events
let analysis_ids = extract_analysis_ids(&session_events);
// Read all related analysis streams
let mut analyses = Vec::new();
for analysis_id in analysis_ids {
let analysis_stream = streams::analysis_stream(&analysis_id);
let analysis_events = event_store.read_stream(&analysis_stream, None).await?;
analyses.push(build_analysis_projection(analysis_events)?);
}
Ok(SessionWithAnalyses {
session: build_session_projection(session_events)?,
analyses,
})
}
Benefits of EventCore's Approach for Union Square
- Natural Cross-Boundary Operations: Session analysis naturally spans session and analysis streams
- Flexible Consistency: Different commands can have different consistency requirements
- Type Safety: Command derives and macros ensure compile-time correctness
- Evolution-Friendly: Easy to add new streams or change consistency boundaries
Success Criteria
- Stream naming conventions established and documented
- Commands redesigned to use
#[derive(Command)]
with stream dependencies - Stream lifecycle documentation created
- Multi-stream query patterns implemented
- EventCore macros (
require!
,emit!
) used throughout - Developer guide for stream-centric design
References
- EventCore Documentation
- EventCore's stream-centric architecture
- Updated expert review acknowledging EventCore's approach