Skip to content

Commit 71fba10

Browse files
committed
Make heavy loaded optimization configurable
The Scylla Go driver can avoid sending requests to an overloaded shard by sending them on a different connection to the same node instead. This change makes the parameters that determine when this behavior kicks in configurable.
1 parent 8a812b2 commit 71fba10

File tree

6 files changed

+143
-30
lines changed

6 files changed

+143
-30
lines changed

cluster.go

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package gocql
77
import (
88
"context"
99
"errors"
10+
"fmt"
1011
"net"
1112
"time"
1213
)
@@ -91,6 +92,24 @@ type ClusterConfig struct {
9192
// Default: 128 for older CQL versions
9293
MaxRequestsPerConn int
9394

95+
// Threshold for the number of inflight requests per connection
96+
// above which the connection is considered heavy loaded
97+
// Default: 512
98+
HeavyLoadedConnectionThreshold int
99+
100+
// When a connection is considered as heavy loaded, the driver
101+
// could switch to the least loaded connection for the same node.
102+
// The switch will happen if the other connection is at least
103+
// HeavyLoadedSwitchConnectionPercentage percent less busy
104+
// (in terms of inflight requests).
105+
//
106+
// For the default value of 20%, if the heavy loaded connection
107+
// has 100 inflight requests, the switch will happen only if the
108+
// least busy connection has at most 80 inflight requests.
109+
//
110+
// Default: 20%
111+
HeavyLoadedSwitchConnectionPercentage int
112+
94113
// Default consistency level.
95114
// Default: Quorum
96115
Consistency Consistency
@@ -275,23 +294,25 @@ type Dialer interface {
275294
// the same host, and will not mark the node being down or up from events.
276295
func NewCluster(hosts ...string) *ClusterConfig {
277296
cfg := &ClusterConfig{
278-
Hosts: hosts,
279-
CQLVersion: "3.0.0",
280-
Timeout: 11 * time.Second,
281-
ConnectTimeout: 11 * time.Second,
282-
Port: 9042,
283-
NumConns: 2,
284-
Consistency: Quorum,
285-
MaxPreparedStmts: defaultMaxPreparedStmts,
286-
MaxRoutingKeyInfo: 1000,
287-
PageSize: 5000,
288-
DefaultTimestamp: true,
289-
MaxWaitSchemaAgreement: 60 * time.Second,
290-
ReconnectInterval: 60 * time.Second,
291-
ConvictionPolicy: &SimpleConvictionPolicy{},
292-
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
293-
SocketKeepalive: 15 * time.Second,
294-
WriteCoalesceWaitTime: 200 * time.Microsecond,
297+
Hosts: hosts,
298+
CQLVersion: "3.0.0",
299+
Timeout: 11 * time.Second,
300+
ConnectTimeout: 11 * time.Second,
301+
Port: 9042,
302+
NumConns: 2,
303+
Consistency: Quorum,
304+
MaxPreparedStmts: defaultMaxPreparedStmts,
305+
MaxRoutingKeyInfo: 1000,
306+
PageSize: 5000,
307+
DefaultTimestamp: true,
308+
MaxWaitSchemaAgreement: 60 * time.Second,
309+
ReconnectInterval: 60 * time.Second,
310+
ConvictionPolicy: &SimpleConvictionPolicy{},
311+
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
312+
SocketKeepalive: 15 * time.Second,
313+
WriteCoalesceWaitTime: 200 * time.Microsecond,
314+
HeavyLoadedConnectionThreshold: 512,
315+
HeavyLoadedSwitchConnectionPercentage: 20,
295316
}
296317

297318
return cfg
@@ -329,6 +350,26 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
329350
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
330351
}
331352

353+
func (cfg *ClusterConfig) Validate() error {
354+
if len(cfg.Hosts) == 0 {
355+
return ErrNoHosts
356+
}
357+
358+
if cfg.Authenticator != nil && cfg.AuthProvider != nil {
359+
return errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
360+
}
361+
362+
if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 {
363+
return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage)
364+
}
365+
366+
if cfg.HeavyLoadedConnectionThreshold < 0 {
367+
return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold)
368+
}
369+
370+
return nil
371+
}
372+
332373
var (
333374
ErrNoHosts = errors.New("no hosts provided")
334375
ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")

conn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int {
16151615
return c.streams.Available()
16161616
}
16171617

1618+
func (c *Conn) StreamsInUse() int {
1619+
return c.streams.InUse()
1620+
}
1621+
16181622
func (c *Conn) UseKeyspace(keyspace string) error {
16191623
q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
16201624
q.params.consistency = c.session.cons

internal/streams/streams.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,8 @@ func (s *IDGenerator) Available() int {
149149
func (s *IDGenerator) InUse() int {
150150
return int(atomic.LoadInt32(&s.inuseStreams))
151151
}
152+
153+
// SetStreamsInUse sets streams in use counter, to be used for testing only
154+
func SetStreamsInUse(s *IDGenerator, val int32) {
155+
atomic.StoreInt32(&s.inuseStreams, val)
156+
}

scylla.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -416,15 +416,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn {
416416
return c
417417
}
418418
alternative := p.leastBusyConn()
419-
if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 {
420-
return c
421-
} else {
419+
if alternative != nil && alternative.StreamsInUse()*100 <= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) {
422420
return alternative
423421
}
422+
return c
424423
}
425424

426425
func isHeavyLoaded(c *Conn) bool {
427-
return c.streams.NumStreams/2 > c.AvailableStreams()
426+
return c.StreamsInUse() > c.session.cfg.HeavyLoadedConnectionThreshold
428427
}
429428

430429
func (p *scyllaConnPicker) leastBusyConn() *Conn {

scylla_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,76 @@ func TestScyllaConnPickerHammerPickNilToken(t *testing.T) {
9999
wg.Wait()
100100
}
101101

102+
func TestScyllaConnPicker(t *testing.T) {
103+
t.Parallel()
104+
105+
t.Run("maybeReplaceWithLessBusyConnection", func(t *testing.T) {
106+
107+
cfg := ClusterConfig{
108+
HeavyLoadedSwitchConnectionPercentage: 30,
109+
HeavyLoadedConnectionThreshold: 100,
110+
}
111+
112+
tcases := []struct {
113+
name string
114+
streamsInUse [3]int32
115+
expected int
116+
}{
117+
{
118+
name: "all connections below threshold",
119+
streamsInUse: [3]int32{99, 98, 97},
120+
expected: 0,
121+
},
122+
{
123+
name: "all connections in threshold, but none is switchable",
124+
streamsInUse: [3]int32{110, 109, 108},
125+
expected: 0,
126+
},
127+
{
128+
name: "all connections in threshold, one is below threshold",
129+
streamsInUse: [3]int32{110, 109, 70},
130+
expected: 2,
131+
},
132+
{
133+
name: "all connections in threshold, one is above threshold, but below switchable percentage",
134+
streamsInUse: [3]int32{210, 130, 209},
135+
expected: 1,
136+
},
137+
}
138+
139+
for _, tcase := range tcases {
140+
t.Run(tcase.name, func(t *testing.T) {
141+
s := scyllaConnPicker{
142+
nrShards: 4,
143+
msbIgnore: 12,
144+
}
145+
146+
conns := [3]*Conn{
147+
mockConn(0),
148+
mockConn(1),
149+
mockConn(2),
150+
}
151+
152+
for _, conn := range conns {
153+
conn.session.cfg = cfg
154+
s.Put(conn)
155+
}
156+
157+
for id, inUse := range tcase.streamsInUse {
158+
streams.SetStreamsInUse(conns[id].streams, inUse)
159+
}
160+
161+
expectedConn := conns[tcase.expected]
162+
163+
c := s.maybeReplaceWithLessBusyConnection(conns[0])
164+
if c != expectedConn {
165+
t.Errorf("expected connection from shard %d, got %d", expectedConn.scyllaSupported.shard, c.scyllaSupported.shard)
166+
}
167+
})
168+
}
169+
})
170+
}
171+
102172
func TestScyllaConnPickerRemove(t *testing.T) {
103173
t.Parallel()
104174

@@ -135,6 +205,7 @@ func mockConn(shard int) *Conn {
135205
partitioner: "org.apache.cassandra.dht.Murmur3Partitioner",
136206
shardingAlgorithm: "biased-token-round-robin",
137207
},
208+
session: &Session{},
138209
}
139210
}
140211

session.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,16 +116,9 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf
116116

117117
// NewSession wraps an existing Node.
118118
func NewSession(cfg ClusterConfig) (*Session, error) {
119-
// Check that hosts in the ClusterConfig is not empty
120-
if len(cfg.Hosts) < 1 {
121-
return nil, ErrNoHosts
119+
if err := cfg.Validate(); err != nil {
120+
return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err)
122121
}
123-
124-
// Check that either Authenticator is set or AuthProvider, not both
125-
if cfg.Authenticator != nil && cfg.AuthProvider != nil {
126-
return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
127-
}
128-
129122
// TODO: we should take a context in here at some point
130123
ctx, cancel := context.WithCancel(context.TODO())
131124

0 commit comments

Comments
 (0)