Skip to content

Commit c894f1f

Browse files
committed
Make heavy loaded optimization configurable
Scylla Go Driver has a capability to avoid sending requests to an overloaded shard, instead sending the request on a different connection (at the same node). This change makes it possible to customize the parameters used to determine when this behavior would kick in.
1 parent ed9f13a commit c894f1f

File tree

4 files changed

+67
-30
lines changed

4 files changed

+67
-30
lines changed

cluster.go

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package gocql
77
import (
88
"context"
99
"errors"
10+
"fmt"
1011
"net"
1112
"time"
1213
)
@@ -91,6 +92,24 @@ type ClusterConfig struct {
9192
// Default: 128 for older CQL versions
9293
MaxRequestsPerConn int
9394

95+
// Threshold for the number of inflight requests per connection
96+
// after which the connection is considered as heavy loaded
97+
// Default: 512
98+
HeavyLoadedConnectionThreshold int
99+
100+
// When a connection is considered as heavy loaded, the driver
101+
// could switch to the least loaded connection for the same node.
102+
// The switch will happen if the other connection is at least
103+
// HeavyLoadedSwitchConnectionPercentage percentage less busy
104+
// (in terms of inflight requests).
105+
//
106+
// For the default value of 20%, if the heavy loaded connection
107+
// has 100 inflight requests, the switch will happen only if the
108+
// least busy connection has less than 80 inflight requests.
109+
//
110+
// Default: 20%
111+
HeavyLoadedSwitchConnectionPercentage int
112+
94113
// Default consistency level.
95114
// Default: Quorum
96115
Consistency Consistency
@@ -275,23 +294,25 @@ type Dialer interface {
275294
// the same host, and will not mark the node being down or up from events.
276295
func NewCluster(hosts ...string) *ClusterConfig {
277296
cfg := &ClusterConfig{
278-
Hosts: hosts,
279-
CQLVersion: "3.0.0",
280-
Timeout: 11 * time.Second,
281-
ConnectTimeout: 11 * time.Second,
282-
Port: 9042,
283-
NumConns: 2,
284-
Consistency: Quorum,
285-
MaxPreparedStmts: defaultMaxPreparedStmts,
286-
MaxRoutingKeyInfo: 1000,
287-
PageSize: 5000,
288-
DefaultTimestamp: true,
289-
MaxWaitSchemaAgreement: 60 * time.Second,
290-
ReconnectInterval: 60 * time.Second,
291-
ConvictionPolicy: &SimpleConvictionPolicy{},
292-
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
293-
SocketKeepalive: 15 * time.Second,
294-
WriteCoalesceWaitTime: 200 * time.Microsecond,
297+
Hosts: hosts,
298+
CQLVersion: "3.0.0",
299+
Timeout: 11 * time.Second,
300+
ConnectTimeout: 11 * time.Second,
301+
Port: 9042,
302+
NumConns: 2,
303+
Consistency: Quorum,
304+
MaxPreparedStmts: defaultMaxPreparedStmts,
305+
MaxRoutingKeyInfo: 1000,
306+
PageSize: 5000,
307+
DefaultTimestamp: true,
308+
MaxWaitSchemaAgreement: 60 * time.Second,
309+
ReconnectInterval: 60 * time.Second,
310+
ConvictionPolicy: &SimpleConvictionPolicy{},
311+
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
312+
SocketKeepalive: 15 * time.Second,
313+
WriteCoalesceWaitTime: 200 * time.Microsecond,
314+
HeavyLoadedConnectionThreshold: 512,
315+
HeavyLoadedSwitchConnectionPercentage: 20,
295316
}
296317

297318
return cfg
@@ -329,6 +350,26 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
329350
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
330351
}
331352

353+
func (cfg *ClusterConfig) Validate() error {
354+
if len(cfg.Hosts) == 0 {
355+
return ErrNoHosts
356+
}
357+
358+
if cfg.Authenticator != nil && cfg.AuthProvider != nil {
359+
return errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
360+
}
361+
362+
if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 {
363+
return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage)
364+
}
365+
366+
if cfg.HeavyLoadedConnectionThreshold < 0 {
367+
return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold)
368+
}
369+
370+
return nil
371+
}
372+
332373
var (
333374
ErrNoHosts = errors.New("no hosts provided")
334375
ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")

conn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int {
16151615
return c.streams.Available()
16161616
}
16171617

1618+
func (c *Conn) StreamsInUse() int {
1619+
return c.streams.InUse()
1620+
}
1621+
16181622
func (c *Conn) UseKeyspace(keyspace string) error {
16191623
q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
16201624
q.params.consistency = c.session.cons

scylla.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -413,15 +413,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn {
413413
return c
414414
}
415415
alternative := p.leastBusyConn()
416-
if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 {
417-
return c
418-
} else {
416+
if alternative != nil && alternative.StreamsInUse()*100 >= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) {
419417
return alternative
420418
}
419+
return c
421420
}
422421

423422
func isHeavyLoaded(c *Conn) bool {
424-
return c.streams.NumStreams/2 > c.AvailableStreams()
423+
return c.StreamsInUse() > c.session.cfg.HeavyLoadedConnectionThreshold
425424
}
426425

427426
func (p *scyllaConnPicker) leastBusyConn() *Conn {

session.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,16 +116,9 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf
116116

117117
// NewSession wraps an existing Node.
118118
func NewSession(cfg ClusterConfig) (*Session, error) {
119-
// Check that hosts in the ClusterConfig is not empty
120-
if len(cfg.Hosts) < 1 {
121-
return nil, ErrNoHosts
119+
if err := cfg.Validate(); err != nil {
120+
return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err)
122121
}
123-
124-
// Check that either Authenticator is set or AuthProvider, not both
125-
if cfg.Authenticator != nil && cfg.AuthProvider != nil {
126-
return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
127-
}
128-
129122
// TODO: we should take a context in here at some point
130123
ctx, cancel := context.WithCancel(context.TODO())
131124

0 commit comments

Comments
 (0)