Commit 1bdca69

fix(fcm): Optimize SendEachInBatch with worker pool (#695)
* Refactor: Optimize SendEachInBatch with worker pool

  The `sendEachInBatch` function, used for sending FCM messages individually within a batch, previously created a new goroutine for each message, which could lead to high CPU usage for large batches. This commit refactors `sendEachInBatch` to use a fixed-size pool of 10 worker goroutines, limiting the number of active goroutines, reducing CPU overhead, and improving resource utilization. Key changes:

  - Early validation: all messages are now validated up front. If any message is invalid, the function returns an error immediately without attempting to send any messages.
  - Worker pool: a fixed number of goroutines processes message-sending tasks; messages are distributed via channels.
  - Result ordering: the order of responses in `BatchResponse.Responses` matches the order of the input messages, even with concurrent processing.
  - Unit tests in `messaging_batch_test.go` were updated to comprehensively cover the new implementation, including varying batch sizes, partial failures, early validation, and response ordering.
  - Confirmed that the existing HTTP client continues to leverage HTTP/2.

* Refactor: Increase SendEachInBatch worker pool size to 50

  Increases the number of worker goroutines in `sendEachInBatch` from 10 to 50, based on reviewer feedback, to improve throughput for large batches of FCM messages. The value 50 is a significant increase over the conservative initial value of 10, aiming to provide better concurrency for I/O-bound operations without being excessively high, and to allow more parallel processing of messages up to the maximum batch limit of 500. Performance in specific environments should be monitored to ensure this change has the desired effect without undue resource strain. Unit tests in `messaging_batch_test.go` were reviewed and adjusted to remain meaningful with the new pool size, particularly the scenarios where the number of messages is less than, equal to, or greater than the number of workers.

* Fix: Correct lint and build errors in messaging batch code

  Addresses two issues introduced in previous changes:
  1. A syntax error in `messaging/messaging_batch_test.go` caused by a duplicated `t.Run` call and associated redundant variable declarations.
  2. An "imported and not used" error for the `sync` package in `messaging/messaging_batch.go`, which occurred after `sync.WaitGroup` was removed but the import statement was not.

  These fixes ensure the code builds cleanly and passes lint checks.

* Fix: Add missing sync import to messaging_batch_test.go

  Re-adds the `import "sync"` statement to `messaging/messaging_batch_test.go`. The `sync` package, specifically `sync.Mutex`, is used within the test suite (e.g., in `TestSendEachWorkerPoolScenarios` and `TestSendEachResponseOrderWithConcurrency`) to protect shared variables such as hit counters and logs from race conditions when they are accessed by concurrently running mock HTTP server handlers. A previous change inadvertently removed this import while cleaning up unused imports in other files, leading to "undefined: sync" build errors in the test file.

* Fix: Correct multiple failures in messaging test suite

  Addresses several issues in `messaging_batch_test.go` that were causing test failures, including data races and incorrect test logic:
  1. **Data races:** In `TestSendEachPartialFailure` and `TestSendEachTotalFailure`, the `serverHitCount` variable in mock server handlers was accessed concurrently without synchronization. Fixed by introducing a `sync.Mutex` to protect access to `serverHitCount` in each test case's server setup.
  2. **Early validation test (`TestSendEachEarlyValidationSkipsSend`):** The `invalid_last_message` case was failing because the type of invalidity used (a large payload) is not caught by the early `validateMessage` scan in `sendEachInBatch`. Fixed by changing the invalid message to one with both Topic and Token defined, which `validateMessage` correctly flags, so the test now accurately verifies early-exit behavior and that no server calls are made.
  3. **Partial failure scenario (`TestSendEachWorkerPoolScenarios`):** The partial-failure test (`NumMessages_75_AllSuccess_false_...`) was failing due to a mismatch between the mock server's failure simulation (based on request arrival order) and the test's assertions (based on original message index). Fixed by having the mock server parse the original message index from the message content (the topic) and use it for failure simulation, ensuring deterministic alignment with the assertions.

  These changes ensure the stability and correctness of the messaging batch processing tests, especially those verifying concurrent operations and error handling.

* Style: Ensure gofmt formatting for messaging batch code

  Applied `gofmt -s .` to ensure consistent code formatting, particularly for `messaging/messaging_batch.go` and `messaging/messaging_batch_test.go`, including simplification of struct initializations and consistent spacing.

* Style: Apply gofmt fixes including trailing whitespace

  Applies gofmt formatting, specifically removing trailing whitespace from `messaging/messaging_batch_test.go` as identified by earlier `gofmt -d` output.

* fix typo in variable name

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
1 parent e66408c commit 1bdca69

File tree

2 files changed: +394 −109 lines

messaging/messaging_batch.go

Lines changed: 60 additions & 29 deletions
```diff
@@ -27,7 +27,6 @@ import (
 	"mime/multipart"
 	"net/http"
 	"net/textproto"
-	"sync"
 
 	"firebase.google.com/go/v4/internal"
 )
@@ -165,53 +164,85 @@ func (c *fcmClient) sendEachInBatch(ctx context.Context, messages []*Message, dr
 		return nil, fmt.Errorf("messages must not contain more than %d elements", maxMessages)
 	}
 
-	var responses []*SendResponse = make([]*SendResponse, len(messages))
-	var wg sync.WaitGroup
-
 	for idx, m := range messages {
 		if err := validateMessage(m); err != nil {
 			return nil, fmt.Errorf("invalid message at index %d: %v", idx, err)
 		}
-		wg.Add(1)
-		go func(idx int, m *Message, dryRun bool, responses []*SendResponse) {
-			defer wg.Done()
-			var resp string
-			var err error
-			if dryRun {
-				resp, err = c.SendDryRun(ctx, m)
-			} else {
-				resp, err = c.Send(ctx, m)
-			}
-			if err == nil {
-				responses[idx] = &SendResponse{
-					Success:   true,
-					MessageID: resp,
-				}
-			} else {
-				responses[idx] = &SendResponse{
-					Success: false,
-					Error:   err,
-				}
-			}
-		}(idx, m, dryRun, responses)
 	}
-	// Wait for all SendDryRun/Send calls to finish
-	wg.Wait()
+
+	const numWorkers = 50
+	jobs := make(chan job, len(messages))
+	results := make(chan result, len(messages))
+
+	responses := make([]*SendResponse, len(messages))
+
+	for w := 0; w < numWorkers; w++ {
+		go worker(ctx, c, dryRun, jobs, results)
+	}
+
+	for idx, m := range messages {
+		jobs <- job{message: m, index: idx}
+	}
+	close(jobs)
+
+	for i := 0; i < len(messages); i++ {
+		res := <-results
+		responses[res.index] = res.response
+	}
 
 	successCount := 0
+	failureCount := 0
 	for _, r := range responses {
 		if r.Success {
 			successCount++
+		} else {
+			failureCount++
 		}
 	}
 
 	return &BatchResponse{
 		Responses:    responses,
 		SuccessCount: successCount,
-		FailureCount: len(responses) - successCount,
+		FailureCount: failureCount,
 	}, nil
 }
 
+type job struct {
+	message *Message
+	index   int
+}
+
+type result struct {
+	response *SendResponse
+	index    int
+}
+
+func worker(ctx context.Context, c *fcmClient, dryRun bool, jobs <-chan job, results chan<- result) {
+	for j := range jobs {
+		var respMsg string
+		var err error
+		if dryRun {
+			respMsg, err = c.SendDryRun(ctx, j.message)
+		} else {
+			respMsg, err = c.Send(ctx, j.message)
+		}
+
+		var sr *SendResponse
+		if err == nil {
+			sr = &SendResponse{
+				Success:   true,
+				MessageID: respMsg,
+			}
+		} else {
+			sr = &SendResponse{
+				Success: false,
+				Error:   err,
+			}
+		}
+		results <- result{response: sr, index: j.index}
+	}
+}
+
 // SendAll sends the messages in the given array via Firebase Cloud Messaging.
 //
 // The messages array may contain up to 500 messages. SendAll employs batching to send the entire
```