-
Notifications
You must be signed in to change notification settings - Fork 209
implement synchronous validation for locally published messages #406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
9921a4a
c4eef39
c7babc2
b937456
7beca84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |
| "context" | ||
| "fmt" | ||
| "runtime" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/libp2p/go-libp2p-core/peer" | ||
|
|
@@ -15,6 +16,16 @@ const ( | |
| defaultValidateThrottle = 8192 | ||
| ) | ||
|
|
||
| // ValidationError is an error that may be signalled from message publication when the message | ||
| // fails validation | ||
| type ValidationError struct { | ||
| Reason string | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: the nice way to do this is to make this "Reason" an error as well and expose it via There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In principle this is indeed quite nice, but it's quite likely unnecessary -- the Reason strings are exported and part of the public API for tracers. We could add named errors for all of them but this seems more work than what is worth right now. If you feel strongly about it, let's make an issue to address it. |
||
| } | ||
|
|
||
| func (e ValidationError) Error() string { | ||
| return e.Reason | ||
| } | ||
|
|
||
| // Validator is a function that validates a message with a binary decision: accept or reject. | ||
| type Validator func(context.Context, peer.ID, *Message) bool | ||
|
|
||
|
|
@@ -56,6 +67,8 @@ type validation struct { | |
|
|
||
| tracer *pubsubTracer | ||
|
|
||
| // mx protects the validator map | ||
| mx sync.Mutex | ||
| // topicVals tracks per topic validators | ||
| topicVals map[string]*topicVal | ||
|
|
||
|
|
@@ -123,6 +136,9 @@ func (v *validation) Start(p *PubSub) { | |
|
|
||
| // AddValidator adds a new validator | ||
| func (v *validation) AddValidator(req *addValReq) { | ||
| v.mx.Lock() | ||
| defer v.mx.Unlock() | ||
|
|
||
| topic := req.topic | ||
|
|
||
| _, ok := v.topicVals[topic] | ||
|
|
@@ -180,6 +196,9 @@ func (v *validation) AddValidator(req *addValReq) { | |
|
|
||
| // RemoveValidator removes an existing validator | ||
| func (v *validation) RemoveValidator(req *rmValReq) { | ||
| v.mx.Lock() | ||
| defer v.mx.Unlock() | ||
|
|
||
| topic := req.topic | ||
|
|
||
| _, ok := v.topicVals[topic] | ||
|
|
@@ -191,6 +210,20 @@ func (v *validation) RemoveValidator(req *rmValReq) { | |
| } | ||
| } | ||
|
|
||
| // Publish synchronously accepts a locally published message, performs applicable | ||
| // validations and pushes the message for propagate by the pubsub system | ||
| func (v *validation) Publish(msg *Message) error { | ||
vyzo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| v.p.tracer.PublishMessage(msg) | ||
|
|
||
| err := v.p.checkSignature(msg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| vals := v.getValidators(msg) | ||
| return v.validate(vals, msg.ReceivedFrom, msg, true) | ||
| } | ||
|
|
||
| // Push pushes a message into the validation pipeline. | ||
| // It returns true if the message can be forwarded immediately without validation. | ||
| func (v *validation) Push(src peer.ID, msg *Message) bool { | ||
|
|
@@ -211,6 +244,9 @@ func (v *validation) Push(src peer.ID, msg *Message) bool { | |
|
|
||
| // getValidators returns all validators that apply to a given message | ||
| func (v *validation) getValidators(msg *Message) []*topicVal { | ||
| v.mx.Lock() | ||
| defer v.mx.Unlock() | ||
|
|
||
| topic := msg.GetTopic() | ||
|
|
||
| val, ok := v.topicVals[topic] | ||
|
|
@@ -226,24 +262,22 @@ func (v *validation) validateWorker() { | |
| for { | ||
| select { | ||
| case req := <-v.validateQ: | ||
| v.validate(req.vals, req.src, req.msg) | ||
| v.validate(req.vals, req.src, req.msg, false) | ||
| case <-v.p.ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // validate performs validation and only sends the message if all validators succeed | ||
| // signature validation is performed synchronously, while user validators are invoked | ||
| // asynchronously, throttled by the global validation throttle. | ||
| func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { | ||
| func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message, synchronous bool) error { | ||
| // If signature verification is enabled, but signing is disabled, | ||
| // the Signature is required to be nil upon receiving the message in PubSub.pushMsg. | ||
| if msg.Signature != nil { | ||
| if !v.validateSignature(msg) { | ||
| log.Debugf("message signature validation failed; dropping message from %s", src) | ||
| v.tracer.RejectMessage(msg, RejectInvalidSignature) | ||
| return | ||
| return ValidationError{Reason: RejectInvalidSignature} | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -252,14 +286,14 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) { | |
| id := v.p.msgID(msg.Message) | ||
| if !v.p.markSeen(id) { | ||
| v.tracer.DuplicateMessage(msg) | ||
| return | ||
| return nil | ||
| } else { | ||
| v.tracer.ValidateMessage(msg) | ||
| } | ||
|
|
||
| var inline, async []*topicVal | ||
| for _, val := range vals { | ||
| if val.validateInline { | ||
| if val.validateInline || synchronous { | ||
| inline = append(inline, val) | ||
| } else { | ||
| async = append(async, val) | ||
|
|
@@ -283,7 +317,7 @@ loop: | |
| if result == ValidationReject { | ||
| log.Debugf("message validation failed; dropping message from %s", src) | ||
| v.tracer.RejectMessage(msg, RejectValidationFailed) | ||
| return | ||
| return ValidationError{Reason: RejectValidationFailed} | ||
| } | ||
|
|
||
| // apply async validators | ||
|
|
@@ -298,16 +332,21 @@ loop: | |
| log.Debugf("message validation throttled; dropping message from %s", src) | ||
| v.tracer.RejectMessage(msg, RejectValidationThrottled) | ||
| } | ||
| return | ||
| return nil | ||
| } | ||
|
|
||
| if result == ValidationIgnore { | ||
| v.tracer.RejectMessage(msg, RejectValidationIgnored) | ||
| return | ||
| return ValidationError{Reason: RejectValidationIgnored} | ||
| } | ||
|
|
||
| // no async validators, accepted message, send it! | ||
| v.p.sendMsg <- msg | ||
| select { | ||
| case v.p.sendMsg <- msg: | ||
| return nil | ||
| case <-v.p.ctx.Done(): | ||
| return v.p.ctx.Err() | ||
| } | ||
| } | ||
|
|
||
| func (v *validation) validateSignature(msg *Message) bool { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.