|
1 | 1 | package demux
|
2 | 2 |
|
3 |
| -import "sync" |
| 3 | +import ( |
| 4 | + "container/list" |
| 5 | + "math" |
| 6 | + "sync" |
| 7 | +) |
| 8 | + |
| 9 | +type channelInfo[T any, K comparable] struct { |
| 10 | + ch chan T |
| 11 | + key K |
| 12 | + element *list.Element |
| 13 | +} |
| 14 | + |
| 15 | +// Config holds configuration for the dynamic demultiplexer |
| 16 | +type Config struct { |
| 17 | + maxChannels int |
| 18 | + bufferSize int |
| 19 | +} |
| 20 | + |
| 21 | +// Option is a functional option for configuring the demultiplexer |
| 22 | +type Option func(*Config) |
| 23 | + |
| 24 | +// WithMaxChannels sets the maximum number of concurrent channels |
| 25 | +func WithMaxChannels(max int) Option { |
| 26 | + return func(c *Config) { |
| 27 | + if max > 0 { |
| 28 | + c.maxChannels = max |
| 29 | + } |
| 30 | + } |
| 31 | +} |
| 32 | + |
| 33 | +// WithBufferSize sets the buffer size for each channel |
| 34 | +func WithBufferSize(size int) Option { |
| 35 | + return func(c *Config) { |
| 36 | + if size >= 0 { |
| 37 | + c.bufferSize = size |
| 38 | + } |
| 39 | + } |
| 40 | +} |
| 41 | + |
| 42 | +// defaultConfig returns the default configuration |
| 43 | +func defaultConfig() Config { |
| 44 | + return Config{ |
| 45 | + maxChannels: math.MaxInt, // uncapped by default |
| 46 | + bufferSize: 0, // unbuffered by default |
| 47 | + } |
| 48 | +} |
4 | 49 |
|
5 | 50 | // Dynamic creates dynamic demultiplexer that routes items from 'in' based on keys returned by 'keyFunc'.
|
6 | 51 | // For each unique key, a new goroutine is spawned running 'consumeFunc'.
|
7 | 52 | // Each consumeFunc receives a channel that delivers values matching its key.
|
| 53 | +// When maxChannels limit is reached, least recently used channels are evicted. |
8 | 54 | func Dynamic[T any, K comparable](
|
9 | 55 | in <-chan T,
|
10 | 56 | keyFunc func(T) K,
|
11 | 57 | consumeFunc func(K, <-chan T),
|
| 58 | + opts ...Option, |
12 | 59 | ) {
|
13 |
| - outChans := make(map[K]chan T) |
| 60 | + // Apply options |
| 61 | + config := defaultConfig() |
| 62 | + for _, opt := range opts { |
| 63 | + opt(&config) |
| 64 | + } |
| 65 | + |
| 66 | + outChans := make(map[K]*channelInfo[T, K]) |
| 67 | + lru := list.New() |
14 | 68 | var wg sync.WaitGroup
|
15 | 69 |
|
16 | 70 | for t := range in {
|
17 | 71 | key := keyFunc(t)
|
18 |
| - ch, exists := outChans[key] |
19 |
| - if !exists { |
20 |
| - ch = make(chan T) |
21 |
| - outChans[key] = ch |
| 72 | + |
| 73 | + info, exists := outChans[key] |
| 74 | + |
| 75 | + if exists { |
| 76 | + // Move to front (most recently used) |
| 77 | + lru.MoveToFront(info.element) |
| 78 | + } else { |
| 79 | + // Check if we need to evict |
| 80 | + if len(outChans) >= config.maxChannels { |
| 81 | + // Evict least recently used |
| 82 | + oldest := lru.Back() |
| 83 | + if oldest != nil { |
| 84 | + oldInfo := oldest.Value.(*channelInfo[T, K]) |
| 85 | + close(oldInfo.ch) |
| 86 | + delete(outChans, oldInfo.key) |
| 87 | + lru.Remove(oldest) |
| 88 | + } |
| 89 | + } |
| 90 | + |
| 91 | + // Create new channel with configured buffer size |
| 92 | + ch := make(chan T, config.bufferSize) |
| 93 | + element := lru.PushFront(nil) |
| 94 | + info = &channelInfo[T, K]{ |
| 95 | + ch: ch, |
| 96 | + key: key, |
| 97 | + element: element, |
| 98 | + } |
| 99 | + element.Value = info |
| 100 | + outChans[key] = info |
22 | 101 |
|
23 | 102 | wg.Add(1)
|
24 |
| - go func() { |
| 103 | + go func(k K, c <-chan T) { |
25 | 104 | defer wg.Done()
|
26 |
| - consumeFunc(key, ch) |
27 |
| - }() |
| 105 | + consumeFunc(k, c) |
| 106 | + }(key, ch) |
28 | 107 | }
|
29 |
| - ch <- t |
| 108 | + |
| 109 | + info.ch <- t |
30 | 110 | }
|
31 | 111 |
|
32 |
| - for _, ch := range outChans { |
33 |
| - close(ch) |
| 112 | + // Close all remaining channels |
| 113 | + for _, info := range outChans { |
| 114 | + close(info.ch) |
34 | 115 | }
|
35 | 116 |
|
36 | 117 | wg.Wait()
|
|
0 commit comments