Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions internal/finitestate/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,12 @@ type Machine struct {
broadcastManager *broadcast.Manager
}

// GetStateChanWithTimeout returns a channel that emits the state whenever it changes.
// The channel is closed when the provided context is canceled.
// For v1 API compatibility, the current state is sent immediately to the channel.
func (s *Machine) GetStateChanWithTimeout(ctx context.Context) <-chan string {
return s.getStateChanInternal(ctx, broadcast.WithTimeout(5*time.Second))
}

// GetStateChan returns a channel that emits the state whenever it changes.
// The channel is closed when the provided context is canceled.
// For v1 API compatibility, the current state is sent immediately to the channel.
// A 5-second broadcast timeout is used to prevent slow consumers from blocking state updates.
func (s *Machine) GetStateChan(ctx context.Context) <-chan string {
return s.getStateChanInternal(ctx)
}

// GetStateChanWithOptions returns a channel that emits the state whenever it changes
// with custom broadcast options.
// For v1 API compatibility, the current state is sent immediately to the channel.
func (s *Machine) GetStateChanWithOptions(ctx context.Context, opts ...broadcast.Option) <-chan string {
return s.getStateChanInternal(ctx, opts...)
return s.getStateChanInternal(ctx, broadcast.WithTimeout(5*time.Second))
}

// getStateChanInternal is a helper that creates a channel and sends the current state to it.
Expand Down
8 changes: 4 additions & 4 deletions internal/finitestate/machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestGetStateChanWithTimeout(t *testing.T) {
defer cancel()

// Get state channel with timeout
stateChan := machine.GetStateChanWithTimeout(ctx)
stateChan := machine.GetStateChan(ctx)
require.NotNil(t, stateChan)

// Should receive initial state
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestGetStateChanWithTimeout(t *testing.T) {
defer cancel()

// Get state channel with timeout
stateChan := machine.GetStateChanWithTimeout(ctx)
stateChan := machine.GetStateChan(ctx)
require.NotNil(t, stateChan)

// Wait for context to timeout
Expand All @@ -290,8 +290,8 @@ func TestGetStateChanWithTimeout(t *testing.T) {
defer cancel()

// Get multiple state channels
stateChan1 := machine.GetStateChanWithTimeout(ctx)
stateChan2 := machine.GetStateChanWithTimeout(ctx)
stateChan1 := machine.GetStateChan(ctx)
stateChan2 := machine.GetStateChan(ctx)
require.NotNil(t, stateChan1)
require.NotNil(t, stateChan2)
assert.NotEqual(t, stateChan1, stateChan2, "Should create different channel instances")
Expand Down
2 changes: 1 addition & 1 deletion runnables/composite/reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (m *MockStateMachine) TransitionIfCurrentState(current, next string) error
return args.Error(0)
}

func (m *MockStateMachine) GetStateChanWithTimeout(ctx context.Context) <-chan string {
func (m *MockStateMachine) GetStateChan(ctx context.Context) <-chan string {
args := m.Called(ctx)
return args.Get(0).(<-chan string)
}
Expand Down
2 changes: 1 addition & 1 deletion runnables/composite/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ConfigCallback[T runnable] func() (*Config[T], error)

type fsm interface {
GetState() string
GetStateChanWithTimeout(ctx context.Context) <-chan string
GetStateChan(ctx context.Context) <-chan string
Transition(state string) error
TransitionIfCurrentState(state string, targetState string) error
SetState(state string) error
Expand Down
4 changes: 2 additions & 2 deletions runnables/composite/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ func (r *Runner[T]) GetState() string {

// GetStateChan returns a channel that will receive state updates.
func (r *Runner[T]) GetStateChan(ctx context.Context) <-chan string {
return r.fsm.GetStateChanWithTimeout(ctx)
return r.fsm.GetStateChan(ctx)
}

// GetStateChanWithTimeout returns a channel that emits state changes.
// It's a pass-through to the underlying finite state machine.
func (r *Runner[T]) GetStateChanWithTimeout(ctx context.Context) <-chan string {
return r.fsm.GetStateChanWithTimeout(ctx)
return r.fsm.GetStateChan(ctx)
}

// IsRunning returns true if the runner is in the Running state.
Expand Down
2 changes: 1 addition & 1 deletion runnables/composite/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestGetStateChan_PassThrough(t *testing.T) {

// Create a mock FSM
mockFSM := new(MockStateMachine)
mockFSM.On("GetStateChanWithTimeout", mock.Anything).Return(stateCh)
mockFSM.On("GetStateChan", mock.Anything).Return(stateCh)

// Create a runner
cb := func() (*Config[*mocks.Runnable], error) {
Expand Down
2 changes: 1 addition & 1 deletion runnables/httpcluster/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (

type fsm interface {
GetState() string
GetStateChanWithTimeout(ctx context.Context) <-chan string
GetStateChan(ctx context.Context) <-chan string
Transition(state string) error
TransitionIfCurrentState(state string, targetState string) error
SetState(state string) error
Expand Down
4 changes: 2 additions & 2 deletions runnables/httpcluster/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ func (r *Runner) GetState() string {

// GetStateChan returns a channel that receives state updates.
func (r *Runner) GetStateChan(ctx context.Context) <-chan string {
return r.fsm.GetStateChanWithTimeout(ctx)
return r.fsm.GetStateChan(ctx)
}

// GetStateChanWithTimeout returns a channel that emits state changes from the Runner.
// The channel is closed when the provided context is canceled.
func (r *Runner) GetStateChanWithTimeout(ctx context.Context) <-chan string {
return r.fsm.GetStateChanWithTimeout(ctx)
return r.fsm.GetStateChan(ctx)
}

// IsRunning returns true if the cluster is in the Running state.
Expand Down
2 changes: 1 addition & 1 deletion runnables/httpcluster/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (m *MockFSMForStateError) GetState() string {
return args.String(0)
}

func (m *MockFSMForStateError) GetStateChanWithTimeout(ctx context.Context) <-chan string {
func (m *MockFSMForStateError) GetStateChan(ctx context.Context) <-chan string {
args := m.Called(ctx)
return args.Get(0).(<-chan string)
}
Expand Down
2 changes: 1 addition & 1 deletion runnables/httpserver/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (m *MockStateMachine) GetState() string {

// GetStateChan mocks the GetStateChan method of the stateMachine interface.
// It returns a channel that emits the state machine's state whenever it changes.
func (m *MockStateMachine) GetStateChanWithTimeout(ctx context.Context) <-chan string {
func (m *MockStateMachine) GetStateChan(ctx context.Context) <-chan string {
args := m.Called(ctx)
return args.Get(0).(<-chan string)
}
Expand Down
2 changes: 1 addition & 1 deletion runnables/httpserver/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type HttpServer interface {

type fsm interface {
GetState() string
GetStateChanWithTimeout(ctx context.Context) <-chan string
GetStateChan(ctx context.Context) <-chan string
Transition(state string) error
SetState(state string) error
TransitionBool(state string) bool
Expand Down
4 changes: 2 additions & 2 deletions runnables/httpserver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func (r *Runner) GetState() string {
// GetStateChan returns a channel that emits the HTTP server's state whenever it changes.
// The channel is closed when the provided context is canceled.
func (r *Runner) GetStateChan(ctx context.Context) <-chan string {
return r.fsm.GetStateChanWithTimeout(ctx)
return r.fsm.GetStateChan(ctx)
}

// GetStateChanWithTimeout returns a channel that emits state changes.
// The channel is closed when the provided context is canceled.
func (r *Runner) GetStateChanWithTimeout(ctx context.Context) <-chan string {
return r.fsm.GetStateChanWithTimeout(ctx)
return r.fsm.GetStateChan(ctx)
}

// IsRunning returns true if the HTTP server is currently running.
Expand Down