Skip to content

Commit a9f90db

Browse files
committed
feat: add batch response to support partial failures
BREAKING CHANGE: multiMessageHandler renamed to batchHandler. It now requires a BatchResponse
1 parent 0936e34 commit a9f90db

File tree

6 files changed

+66
-33
lines changed

6 files changed

+66
-33
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Formigo is a powerful and flexible Golang library designed to simplify the proce
66

77
- **Efficient Throughput Management**: it offers optimal throughput management, allowing you to fine-tune the number of Go routines responsible for both polling messages from the queue and processing them. This dynamic control ensures maximum efficiency in various scenarios, making the library highly adaptable to your application's needs.
88

9-
- **Configurable Batch Processing**: it uses powerful batch processing capabilities, enabling you to handle messages efficiently in customizable batches. With the Multiple Message Handler, messages can be processed in batches of a size you define, granting you full control over the processing logic. Moreover, you can adjust the batch buffer size and timeout settings, providing a flexible and optimal solution to process messages under various workloads.
9+
- **Configurable Batch Processing**: it uses powerful batch processing capabilities, enabling you to handle messages efficiently in customizable batches. With the Batch Handler, messages can be processed in batches of a size you define, granting you full control over the processing logic. Moreover, you can adjust the batch buffer size and timeout settings, providing a flexible and optimal solution to process messages under various workloads.
1010

1111
- **Context Cancellation**: Effortlessly stop the QueueWorker by canceling its context. This feature guarantees smooth and controlled termination of the worker whenever required.
1212

@@ -140,8 +140,8 @@ func main() {
140140
wkr := formigo.NewWorker(formigo.Configuration{
141141
Client: sqsClient,
142142
Concurrency: 100,
143-
Consumer: formigo.NewMultiMessageConsumer(formigo.MultiMessageConsumerConfiguration{
144-
BufferConfig: formigo.MultiMessageBufferConfiguration{
143+
Consumer: formigo.BatchConsumer(formigo.BatchConsumerConfiguration{
144+
BufferConfig: formigo.BatchBufferConfiguration{
145145
Size: 100,
146146
Timeout: time.Second * 5,
147147
},
@@ -180,7 +180,7 @@ By processing messages in batches, the worker can significantly enhance throughp
180180
| Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 |
181181
| Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 |
182182
| ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None |
183-
| Consumer | The message consumer, either MessageConsumer or MultipleMessageConsumer. | None |
183+
| Consumer | The message consumer, either MessageConsumer or BatchConsumer. | None |
184184

185185
## License
186186

config.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ type ErrorConfiguration struct {
3737
ReportFunc func(err error) bool
3838
}
3939

40-
// The MultiMessageBufferConfiguration defines a buffer which is consumed by the worker when either
40+
// The BatchConsumerBufferConfiguration defines a buffer which is consumed by the worker when either
4141
// the buffer is full or the timeout has passed since the first message got added.
42-
type MultiMessageBufferConfiguration struct {
42+
type BatchConsumerBufferConfiguration struct {
4343
// Max number of messages that the buffer can contain.
4444
// Default: 10.
4545
Size int
@@ -56,9 +56,9 @@ type MessageConsumerConfiguration struct {
5656
Handler messageHandler
5757
}
5858

59-
type MultiMessageConsumerConfiguration struct {
60-
Handler multiMessageHandler
61-
BufferConfig MultiMessageBufferConfiguration
59+
type BatchConsumerConfiguration struct {
60+
Handler batchHandler
61+
BufferConfig BatchConsumerBufferConfiguration
6262
}
6363

6464
type Configuration struct {

consumers.go

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import (
1010
"github.com/francescopepe/formigo/internal/messages"
1111
)
1212

13-
type singleMessageHandler = func(ctx context.Context, msg Message) error
14-
type multiMessageHandler = func(ctx context.Context, msgs []Message) error
13+
type BatchResponse struct {
14+
FailedMessagesId []interface{}
15+
}
16+
1517
type messageHandler = func(ctx context.Context, msg Message) error
18+
type batchHandler = func(ctx context.Context, msgs []Message) (BatchResponse, error)
1619

1720
// This means that the buffered messages didn't get passed to the handler within
1821
// the first message's timeout.
@@ -103,39 +106,43 @@ func NewMessageConsumer(config MessageConsumerConfiguration) *messageConsumer {
103106
}
104107
}
105108

106-
// multiMessageConsumer allows to process multiple messages at a time. This can be useful
109+
// batchConsumer allows to process multiple messages at a time. This can be useful
107110
// for batch updates or use cases with high throughput.
108-
type multiMessageConsumer struct {
109-
handler multiMessageHandler
110-
bufferConfig MultiMessageBufferConfiguration
111+
type batchConsumer struct {
112+
handler batchHandler
113+
bufferConfig BatchConsumerBufferConfiguration
111114
}
112115

113116
// It processes the messages and push them downstream for deletion.
114-
func (c *multiMessageConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, msgs []messages.Message) {
115-
err := wrapHandler(func() error {
116-
// Convert slice to the abstraction
117-
converted := make([]Message, len(msgs))
118-
for _, msg := range msgs {
119-
converted = append(converted, msg)
117+
func (c *batchConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, msgs []messages.Message) {
118+
defer func() {
119+
if r := recover(); r != nil {
120+
ctrl.reportError(fmt.Errorf("panic error: %s", r))
120121
}
122+
}()
121123

122-
return c.handler(ctx, converted)
123-
})
124+
// Convert slice to the abstraction
125+
converted := make([]Message, 0, len(msgs))
126+
for _, msg := range msgs {
127+
converted = append(converted, msg)
128+
}
129+
130+
resp, err := c.handler(ctx, converted)
124131
if err != nil {
125-
ctrl.reportError(fmt.Errorf("failed to process messages: %w", err))
126-
return
132+
ctrl.reportError(fmt.Errorf("failed to process batch: %w", err))
127133
}
128134

135+
toDelete := c.buildMessagesToDeleteFromBatchResponse(msgs, resp)
129136
// Push messages for deletion
130-
for _, msg := range msgs {
137+
for _, msg := range toDelete {
131138
deleteCh <- msg
132139
}
133140
}
134141

135142
// Consumes and deletes a number of messages in the interval [1, N] based on configuration
136143
// provided in the BufferConfiguration.
137144
// It stops only when the messageCh gets closed and doesn't have any messages in it.
138-
func (c *multiMessageConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) {
145+
func (c *batchConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) {
139146
consumers := makeAvailableConsumers(concurrency)
140147

141148
// Create buffer
@@ -211,7 +218,28 @@ func (c *multiMessageConsumer) consume(concurrency int, ctrl *controller, messag
211218
wg.Wait()
212219
}
213220

214-
func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMessageConsumer {
221+
func (c *batchConsumer) buildMessagesToDeleteFromBatchResponse(msgs []messages.Message, resp BatchResponse) []messages.Message {
222+
if len(resp.FailedMessagesId) == 0 {
223+
return msgs
224+
}
225+
226+
toDelete := make([]messages.Message, 0, len(msgs))
227+
228+
failedMessagesIdIndexed := make(map[interface{}]struct{}, len(resp.FailedMessagesId))
229+
for _, id := range resp.FailedMessagesId {
230+
failedMessagesIdIndexed[id] = struct{}{}
231+
}
232+
233+
for _, msg := range msgs {
234+
if _, ok := failedMessagesIdIndexed[msg.Id()]; !ok {
235+
toDelete = append(toDelete, msg)
236+
}
237+
}
238+
239+
return toDelete
240+
}
241+
242+
func NewBatchConsumer(config BatchConsumerConfiguration) *batchConsumer {
215243
if config.BufferConfig.Size == 0 {
216244
config.BufferConfig.Size = 10
217245
}
@@ -220,15 +248,14 @@ func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMes
220248
config.BufferConfig.Timeout = time.Second
221249
}
222250

223-
return &multiMessageConsumer{
251+
return &batchConsumer{
224252
handler: config.Handler,
225253
bufferConfig: config.BufferConfig,
226254
}
227255
}
228256

229257
// Interface guards
230258
var (
231-
_ consumer = (*singleMessageConsumer)(nil)
232-
_ consumer = (*multiMessageConsumer)(nil)
233259
_ consumer = (*messageConsumer)(nil)
260+
_ consumer = (*batchConsumer)(nil)
234261
)

internal/messages/messages.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@ type Message struct {
99
Ctx context.Context
1010
CancelCtx context.CancelFunc
1111
Msg interface{}
12+
MsgId interface{}
1213
ReceivedTime time.Time
1314
}
1415

16+
func (m Message) Id() interface{} {
17+
return m.MsgId
18+
}
19+
1520
func (m Message) Content() interface{} {
1621
return m.Msg
1722
}
@@ -145,7 +150,7 @@ func (b *BufferWithContextTimeout) Add(msg Message) {
145150

146151
// Reset resets its internal buffer, cancel the current context created and
147152
// reset any timeout.
148-
// It's important to call this function avoid memory leaks. In fact, the
153+
// It's important to call this function to avoid memory leaks. In fact, the
149154
// GC won't collect any timer or resources allocated within the context.
150155
// NOTE: this function should be always called to clean up any buffer
151156
// created. Used in defer can guarantee that it always run.

message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ import "time"
44

55
type Message interface {
66
ReceivedAt() time.Time
7-
Raw() interface{}
87
Content() interface{}
8+
Id() interface{}
99
}

sqs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func (c sqsClient) prepareMessagesForDeletion(messages []messages.Message) []typ
8282

8383
func (c sqsClient) createMessage(sqsMessage types.Message) messages.Message {
8484
msg := messages.Message{
85+
MsgId: *sqsMessage.MessageId,
8586
Msg: sqsMessage,
8687
ReceivedTime: time.Now(),
8788
}

0 commit comments

Comments
 (0)