Skip to content
Merged
89 changes: 60 additions & 29 deletions messaging/messaging_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"mime/multipart"
"net/http"
"net/textproto"
"sync"

"firebase.google.com/go/v4/internal"
)
Expand Down Expand Up @@ -165,53 +164,85 @@ func (c *fcmClient) sendEachInBatch(ctx context.Context, messages []*Message, dr
return nil, fmt.Errorf("messages must not contain more than %d elements", maxMessages)
}

var responses []*SendResponse = make([]*SendResponse, len(messages))
var wg sync.WaitGroup

for idx, m := range messages {
if err := validateMessage(m); err != nil {
return nil, fmt.Errorf("invalid message at index %d: %v", idx, err)
}
wg.Add(1)
go func(idx int, m *Message, dryRun bool, responses []*SendResponse) {
defer wg.Done()
var resp string
var err error
if dryRun {
resp, err = c.SendDryRun(ctx, m)
} else {
resp, err = c.Send(ctx, m)
}
if err == nil {
responses[idx] = &SendResponse{
Success: true,
MessageID: resp,
}
} else {
responses[idx] = &SendResponse{
Success: false,
Error: err,
}
}
}(idx, m, dryRun, responses)
}
// Wait for all SendDryRun/Send calls to finish
wg.Wait()

const numWorkers = 50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would raising this to 100 to match the backend's maximum concurrent stream limit be something we should consider?
I think 50 is reasonable considering this multiplies per sendEachInBatch call.

nit: Can we update the PR description to specify 50 workers per sendEachInBatch call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, I can run some tests with 100 workers to see if that affects performance

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 50 is a good place to start though. If there are multiple calls to this API, increasing it to 100 will demand a lot more local resources

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets go with 50 then!

jobs := make(chan job, len(messages))
results := make(chan result, len(messages))

responses := make([]*SendResponse, len(messages))

for w := 0; w < numWorkers; w++ {
go worker(ctx, c, dryRun, jobs, results)
}

for idx, m := range messages {
jobs <- job{message: m, index: idx}
}
close(jobs)

for i := 0; i < len(messages); i++ {
res := <-results
responses[res.index] = res.response
}

successCount := 0
failureCount := 0
for _, r := range responses {
if r.Success {
successCount++
} else {
failureCount++
}
}

return &BatchResponse{
Responses: responses,
SuccessCount: successCount,
FailureCount: len(responses) - successCount,
FailureCount: failureCount,
}, nil
}

// job is a unit of work for a worker: one message to send, tagged with
// its position in the caller's input slice so the result can be written
// back in order.
type job struct {
	// index is the message's position in the original messages slice.
	index int
	// message is the FCM message to send.
	message *Message
}

// result carries the outcome of sending one message, tagged with the
// index of the job that produced it so responses can be ordered.
type result struct {
	// index mirrors job.index for the message this response belongs to.
	index int
	// response records success (with message ID) or failure (with error).
	response *SendResponse
}

// worker drains the jobs channel, sending each message via the client
// (in dry-run mode when dryRun is set) and publishing a SendResponse,
// tagged with the job's original index, on the results channel.
// It returns once the jobs channel is closed and fully drained.
func worker(ctx context.Context, c *fcmClient, dryRun bool, jobs <-chan job, results chan<- result) {
	// Pick the send function once instead of branching per message.
	send := c.Send
	if dryRun {
		send = c.SendDryRun
	}
	for j := range jobs {
		msgID, err := send(ctx, j.message)
		sr := &SendResponse{
			Success:   true,
			MessageID: msgID,
		}
		if err != nil {
			sr = &SendResponse{
				Success: false,
				Error:   err,
			}
		}
		results <- result{index: j.index, response: sr}
	}
}

// SendAll sends the messages in the given array via Firebase Cloud Messaging.
//
// The messages array may contain up to 500 messages. SendAll employs batching to send the entire
Expand Down
Loading
Loading