Skip to content

Define Clear Stream Design Patterns for EventCore #148

@jwilger

Description

@jwilger

Priority: 🟡 HIGH

Problem

Commands need to declare their stream dependencies and we need clear stream naming conventions. EventCore uses a stream-centric approach rather than traditional aggregates.

Updated Understanding (After EventCore Review)

EventCore represents an evolution in event sourcing architecture:

  • No predefined aggregates - Commands dynamically define consistency boundaries
  • Multi-stream atomic operations - Natural for cross-entity business operations
  • Type-driven command design - Leverages Rust's type system for safety
  • Flexible consistency - Each command declares its own requirements

Required Implementation

1. Define Stream Naming Conventions

// Consistent stream naming patterns
pub mod streams {
    use eventcore::StreamId;
    
    pub fn session_stream(session_id: &SessionId) -> StreamId {
        StreamId::new(format!("session:{}", session_id))
    }
    
    pub fn analysis_stream(analysis_id: &AnalysisId) -> StreamId {
        StreamId::new(format!("analysis:{}", analysis_id))
    }
    
    pub fn user_settings_stream(user_id: &UserId) -> StreamId {
        StreamId::new(format!("user:{}:settings", user_id))
    }
    
    pub fn extraction_stream(extraction_id: &ExtractionId) -> StreamId {
        StreamId::new(format!("extraction:{}", extraction_id))
    }
}

2. Command-Centric Design with EventCore

// Commands define their own consistency boundaries
#[derive(Command)]
pub struct StartSessionAnalysis {
    #[stream]
    session_stream: StreamId,
    #[stream]
    analysis_stream: StreamId,
    analysis_config: AnalysisConfig,
}

#[async_trait]
impl CommandLogic for StartSessionAnalysis {
    type State = SessionAnalysisState;
    type Event = DomainEvent;
    
    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        state: Self::State,
        stream_resolver: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        let mut events = Vec::new();
        
        // Business logic validation
        require!(state.session.status == SessionStatus::Complete, "Cannot analyze incomplete session");
        require!(!state.analysis_exists, "Analysis already started for this session");
        
        // Emit events to multiple streams atomically
        emit!(
            events,
            &read_streams,
            self.session_stream.clone(),
            DomainEvent::AnalysisStarted { 
                session_id: state.session.id.clone(),
                analysis_id: self.analysis_config.id.clone(),
            }
        );
        
        emit!(
            events,
            &read_streams,
            self.analysis_stream.clone(),
            DomainEvent::AnalysisCreated {
                analysis_id: self.analysis_config.id.clone(),
                config: self.analysis_config.clone(),
                session_id: state.session.id.clone(),
            }
        );
        
        Ok(events)
    }
}

3. Stream Lifecycle Management

// Document stream lifecycles and relationships
pub struct StreamDocumentation {
    pub stream_pattern: &'static str,
    pub purpose: &'static str,
    pub lifecycle: StreamLifecycle,
    pub related_streams: Vec<&'static str>,
}

pub const STREAM_DOCS: &[StreamDocumentation] = &[
    StreamDocumentation {
        stream_pattern: "session:{session_id}",
        purpose: "Tracks all events for a single LLM session",
        lifecycle: StreamLifecycle::Bounded {
            created_by: "StartSession command",
            closed_by: "EndSession command",
            retention: RetentionPolicy::Days(90),
        },
        related_streams: vec!["analysis:{analysis_id}", "user:{user_id}:settings"],
    },
    StreamDocumentation {
        stream_pattern: "analysis:{analysis_id}",
        purpose: "Tracks analysis process and results",
        lifecycle: StreamLifecycle::Bounded {
            created_by: "StartSessionAnalysis command",
            closed_by: "CompleteAnalysis command",
            retention: RetentionPolicy::Days(365),
        },
        related_streams: vec!["session:{session_id}"],
    },
];

4. Complex Multi-Stream Operations

// Example: Version migration affecting multiple streams
#[derive(Command)]
pub struct MigrateSessionVersion {
    #[stream]
    session_stream: StreamId,
    #[stream]
    version_tracking_stream: StreamId,
    #[stream]
    migration_log_stream: StreamId,
    from_version: ModelVersion,
    to_version: ModelVersion,
}

// This command atomically updates all three streams
// EventCore ensures consistency across all streams

5. Stream Query Patterns

// Efficient querying across related streams
pub async fn get_session_with_analyses(
    session_id: &SessionId,
    event_store: &PostgresEventStore,
) -> Result<SessionWithAnalyses, DomainError> {
    // Read session stream
    let session_stream = streams::session_stream(session_id);
    let session_events = event_store.read_stream(&session_stream, None).await?;
    
    // Extract analysis IDs from session events
    let analysis_ids = extract_analysis_ids(&session_events);
    
    // Read all related analysis streams
    let mut analyses = Vec::new();
    for analysis_id in analysis_ids {
        let analysis_stream = streams::analysis_stream(&analysis_id);
        let analysis_events = event_store.read_stream(&analysis_stream, None).await?;
        analyses.push(build_analysis_projection(analysis_events)?);
    }
    
    Ok(SessionWithAnalyses {
        session: build_session_projection(session_events)?,
        analyses,
    })
}

Benefits of EventCore's Approach for Union Square

  1. Natural Cross-Boundary Operations: Session analysis naturally spans session and analysis streams
  2. Flexible Consistency: Different commands can have different consistency requirements
  3. Type Safety: Command derives and macros ensure compile-time correctness
  4. Evolution-Friendly: Easy to add new streams or change consistency boundaries

Success Criteria

  • Stream naming conventions established and documented
  • Commands redesigned to use #[derive(Command)] with stream dependencies
  • Stream lifecycle documentation created
  • Multi-stream query patterns implemented
  • EventCore macros (require!, emit!) used throughout
  • Developer guide for stream-centric design

References

  • EventCore Documentation
  • EventCore's stream-centric architecture
  • Updated expert review acknowledging EventCore's approach

Metadata

Metadata

Assignees

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions