Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package gocql
import (
"context"
"errors"
"fmt"
"net"
"time"
)
Expand Down Expand Up @@ -93,6 +94,24 @@ type ClusterConfig struct {
// Default: 128 for older CQL versions
MaxRequestsPerConn int

// Threshold for the number of inflight requests per connection
// after which the connection is considered as heavy loaded
// Default: 512
HeavyLoadedConnectionThreshold int

// When a connection is considered as heavy loaded, the driver
// could switch to the least loaded connection for the same node.
// The switch will happen if the other connection is at least
// HeavyLoadedSwitchConnectionPercentage percentage less busy
// (in terms of inflight requests).
//
// For the default value of 20%, if the heavy loaded connection
// has 100 inflight requests, the switch will happen only if the
// least busy connection has less than 80 inflight requests.
//
// Default: 20%
HeavyLoadedSwitchConnectionPercentage int

// Default consistency level.
// Default: Quorum
Consistency Consistency
Expand Down Expand Up @@ -291,27 +310,29 @@ type Dialer interface {
// the same host, and will not mark the node being down or up from events.
func NewCluster(hosts ...string) *ClusterConfig {
cfg := &ClusterConfig{
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
DriverName: defaultDriverName,
DriverVersion: defaultDriverVersion,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
InitialReconnectionPolicy: &NoReconnectionPolicy{},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
DriverName: defaultDriverName,
DriverVersion: defaultDriverVersion,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
InitialReconnectionPolicy: &NoReconnectionPolicy{},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
HeavyLoadedConnectionThreshold: 512,
HeavyLoadedSwitchConnectionPercentage: 20,
}

return cfg
Expand Down Expand Up @@ -374,6 +395,14 @@ func (cfg *ClusterConfig) Validate() error {
return errors.New("ReconnectionPolicy.GetMaxRetries returns non-positive number")
}

if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 {
return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage)
}

if cfg.HeavyLoadedConnectionThreshold < 0 {
return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold)
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int {
return c.streams.Available()
}

func (c *Conn) StreamsInUse() int {
return c.streams.InUse()
}

func (c *Conn) UseKeyspace(keyspace string) error {
q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
q.params.consistency = c.session.cons
Expand Down
5 changes: 5 additions & 0 deletions internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,8 @@ func (s *IDGenerator) Available() int {
func (s *IDGenerator) InUse() int {
return int(atomic.LoadInt32(&s.inuseStreams))
}

// SetStreamsInUse sets streams in use counter, to be used for testing only
func SetStreamsInUse(s *IDGenerator, val int32) {
atomic.StoreInt32(&s.inuseStreams, val)
}
7 changes: 3 additions & 4 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn {
return c
}
alternative := p.leastBusyConn()
if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 {
return c
} else {
if alternative != nil && alternative.StreamsInUse()*100 <= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) {
return alternative
}
return c
}

func isHeavyLoaded(c *Conn) bool {
return c.streams.NumStreams/2 > c.AvailableStreams()
return c.StreamsInUse() > c.session.cfg.HeavyLoadedConnectionThreshold
}

func (p *scyllaConnPicker) leastBusyConn() *Conn {
Expand Down
71 changes: 71 additions & 0 deletions scylla_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,76 @@ func TestScyllaConnPickerHammerPickNilToken(t *testing.T) {
wg.Wait()
}

func TestScyllaConnPicker(t *testing.T) {
t.Parallel()

t.Run("maybeReplaceWithLessBusyConnection", func(t *testing.T) {

cfg := ClusterConfig{
HeavyLoadedSwitchConnectionPercentage: 30,
HeavyLoadedConnectionThreshold: 100,
}

tcases := []struct {
name string
streamsInUse [3]int32
expected int
}{
{
name: "all connections below threshold",
streamsInUse: [3]int32{99, 98, 97},
expected: 0,
},
{
name: "all connections in threshold, but none is switchable",
streamsInUse: [3]int32{110, 109, 108},
expected: 0,
},
{
name: "all connections in threshold, one is below threshold",
streamsInUse: [3]int32{110, 109, 70},
expected: 2,
},
{
name: "all connections in threshold, one is above threshold, but below switchable percentage",
streamsInUse: [3]int32{210, 130, 209},
expected: 1,
},
}

for _, tcase := range tcases {
t.Run(tcase.name, func(t *testing.T) {
s := scyllaConnPicker{
nrShards: 4,
msbIgnore: 12,
}

conns := [3]*Conn{
mockConn(0),
mockConn(1),
mockConn(2),
}

for _, conn := range conns {
conn.session.cfg = cfg
s.Put(conn)
}

for id, inUse := range tcase.streamsInUse {
streams.SetStreamsInUse(conns[id].streams, inUse)
}

expectedConn := conns[tcase.expected]

c := s.maybeReplaceWithLessBusyConnection(conns[0])
if c != expectedConn {
t.Errorf("expected connection from shard %d, got %d", expectedConn.scyllaSupported.shard, c.scyllaSupported.shard)
}
})
}
})
}

func TestScyllaConnPickerRemove(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -135,6 +205,7 @@ func mockConn(shard int) *Conn {
partitioner: "org.apache.cassandra.dht.Murmur3Partitioner",
shardingAlgorithm: "biased-token-round-robin",
},
session: &Session{},
}
}

Expand Down
3 changes: 1 addition & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,8 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf
// NewSession wraps an existing Node.
func NewSession(cfg ClusterConfig) (*Session, error) {
if err := cfg.Validate(); err != nil {
return nil, err
return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err)
}

// TODO: we should take a context in here at some point
ctx, cancel := context.WithCancel(context.TODO())

Expand Down