Skip to content

Commit 3064aae

Browse files
author
Michael Penick
committed
Added retry policy logic and a couple implementations
1 parent 68fd488 commit 3064aae

30 files changed

+446
-98
lines changed

include/cassandra.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ typedef struct CassMetrics_ {
377377
} CassMetrics;
378378

379379
typedef enum CassConsistency_ {
380+
CASS_CONSISTENCY_UNKNOWN = 0xFFFF,
380381
CASS_CONSISTENCY_ANY = 0x0000,
381382
CASS_CONSISTENCY_ONE = 0x0001,
382383
CASS_CONSISTENCY_TWO = 0x0002,

src/auth_requests.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
namespace cass {
2020

21-
int CredentialsRequest::encode(int version, BufferVec* bufs, EncodingCache* cache) const {
21+
int CredentialsRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
2222
if (version != 1) {
2323
return -1;
2424
}
@@ -39,7 +39,7 @@ int CredentialsRequest::encode(int version, BufferVec* bufs, EncodingCache* cach
3939
return length;
4040
}
4141

42-
int AuthResponseRequest::encode(int version, BufferVec* bufs, EncodingCache* cache) const {
42+
int AuthResponseRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
4343
if (version < 2) {
4444
return -1;
4545
}

src/auth_requests.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class CredentialsRequest : public Request {
3131
, credentials_(credentials) {}
3232

3333
private:
34-
int encode(int version, BufferVec* bufs, EncodingCache* cache) const;
34+
int encode(int version, Handler* handler, BufferVec* bufs) const;
3535

3636
private:
3737
V1Authenticator::Credentials credentials_;
@@ -47,7 +47,7 @@ class AuthResponseRequest : public Request {
4747
ScopedPtr<Authenticator>& auth() { return auth_; }
4848

4949
private:
50-
int encode(int version, BufferVec* bufs, EncodingCache* cache) const;
50+
int encode(int version, Handler* handler, BufferVec* bufs) const;
5151

5252
private:
5353
std::string token_;

src/batch_request.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ CassError cass_batch_add_statement(CassBatch* batch, CassStatement* statement) {
4949

5050
namespace cass {
5151

52-
int BatchRequest::encode(int version, BufferVec* bufs, EncodingCache* cache) const {
52+
int BatchRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
5353
int length = 0;
5454
uint8_t flags = 0;
5555

@@ -76,7 +76,7 @@ int BatchRequest::encode(int version, BufferVec* bufs, EncodingCache* cache) con
7676
if (statement->has_names_for_values()) {
7777
return ENCODE_ERROR_BATCH_WITH_NAMED_VALUES;
7878
}
79-
int32_t result = (*i)->encode_batch(version, bufs, cache);
79+
int32_t result = (*i)->encode_batch(version, bufs, handler->encoding_cache());
8080
if (result < 0) {
8181
return result;
8282
}

src/batch_request.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class BatchRequest : public RoutableRequest {
5555
virtual bool get_routing_key(std::string* routing_key, EncodingCache* cache) const;
5656

5757
private:
58-
int encode(int version, BufferVec* bufs, EncodingCache* cache) const;
58+
int encode(int version, Handler* handler, BufferVec* bufs) const;
5959

6060
private:
6161
typedef std::map<std::string, ExecuteRequest*> PreparedMap;

src/config.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "cassandra.h"
2222
#include "dc_aware_policy.hpp"
2323
#include "latency_aware_policy.hpp"
24+
#include "retry_policy.hpp"
2425
#include "ssl.hpp"
2526
#include "token_aware_policy.hpp"
2627

@@ -63,7 +64,8 @@ class Config {
6364
, latency_aware_routing_(false)
6465
, tcp_nodelay_enable_(false)
6566
, tcp_keepalive_enable_(false)
66-
, tcp_keepalive_delay_secs_(0) {}
67+
, tcp_keepalive_delay_secs_(0)
68+
, retry_policy_(new DefaultRetryPolicy()) { }
6769

6870
unsigned thread_count_io() const { return thread_count_io_; }
6971

@@ -270,6 +272,15 @@ class Config {
270272
tcp_keepalive_delay_secs_ = delay_secs;
271273
}
272274

275+
RetryPolicy* retry_policy() const {
276+
return retry_policy_.get();
277+
}
278+
279+
void set_retry_policy(RetryPolicy* retry_policy) {
280+
if (retry_policy == NULL) return;
281+
retry_policy_.reset(retry_policy);
282+
}
283+
273284
private:
274285
int port_;
275286
int protocol_version_;
@@ -302,6 +313,7 @@ class Config {
302313
bool tcp_nodelay_enable_;
303314
bool tcp_keepalive_enable_;
304315
unsigned tcp_keepalive_delay_secs_;
316+
SharedRefPtr<RetryPolicy> retry_policy_;
305317
};
306318

307319
} // namespace cass

src/connection.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,14 @@ void Connection::StartupHandler::on_set(ResponseMessage* response) {
7676
ErrorResponse* error
7777
= static_cast<ErrorResponse*>(response->response_body().get());
7878
if (error->code() == CQL_ERROR_PROTOCOL_ERROR &&
79-
error->message().find("Invalid or unsupported protocol version") != std::string::npos) {
79+
error->message().to_string().find("Invalid or unsupported protocol version") != std::string::npos) {
8080
connection_->is_invalid_protocol_ = true;
8181
LOG_WARN("Host %s received invalid protocol response %s",
8282
connection_->addr_string_.c_str(), error->error_message().c_str());
8383
connection_->defunct();
8484
return;
8585
} else if (error->code() == CQL_ERROR_BAD_CREDENTIALS) {
86-
connection_->auth_error_ = error->message();
86+
connection_->auth_error_ = error->message().to_string();
8787
}
8888
connection_->notify_error("Received error response " + error->error_message());
8989
break;

src/error_response.cpp

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,51 @@ namespace cass {
2626

2727
std::string ErrorResponse::error_message() const {
2828
std::ostringstream ss;
29-
ss << "'" << message() << "'"
29+
ss << "'" << message().to_string() << "'"
3030
<< " (0x" << std::hex << std::uppercase << std::setw(8) << std::setfill('0')
3131
<< CASS_ERROR(CASS_ERROR_SOURCE_SERVER, code()) << ")";
3232
return ss.str();
3333
}
3434

3535
bool ErrorResponse::decode(int version, char* buffer, size_t size) {
36+
uint8_t data_present;
37+
StringRef write_type;
38+
3639
char* pos = decode_int32(buffer, code_);
37-
pos = decode_string(pos, &message_, message_size_);
40+
pos = decode_string_ref(pos, &message_);
41+
3842
switch (code_) {
43+
case CQL_ERROR_UNAVAILABLE:
44+
pos = decode_uint16(pos, cl_);
45+
pos = decode_int32(pos, received_);
46+
pos = decode_int32(pos, alive_);
47+
break;
48+
case CQL_ERROR_READ_TIMEOUT:
49+
pos = decode_uint16(pos, cl_);
50+
pos = decode_int32(pos, received_);
51+
pos = decode_int32(pos, required_);
52+
decode_byte(pos, data_present);
53+
data_present_ = data_present > 0;
54+
break;
55+
case CQL_ERROR_WRITE_TIMEOUT:
56+
pos = decode_uint16(pos, cl_);
57+
pos = decode_int32(pos, received_);
58+
pos = decode_int32(pos, required_);
59+
decode_string_ref(pos, &write_type);
60+
if (write_type == "SIMPLE") {
61+
write_type_ = RetryPolicy::SIMPLE;
62+
} else if(write_type == "BATCH") {
63+
write_type_ = RetryPolicy::BATCH;
64+
} else if(write_type == "UNLOGGED_BATCH") {
65+
write_type_ = RetryPolicy::UNLOGGED_BATCH;
66+
} else if(write_type == "COUNTER") {
67+
write_type_ = RetryPolicy::COUNTER;
68+
} else if(write_type == "BATCH_LOG") {
69+
write_type_ = RetryPolicy::BATCH_LOG;
70+
}
71+
break;
3972
case CQL_ERROR_UNPREPARED:
40-
decode_string(pos, &prepared_id_, prepared_id_size_);
73+
decode_string_ref(pos, &prepared_id_);
4174
break;
4275
}
4376
return true;

src/error_response.hpp

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include "response.hpp"
2121
#include "constants.hpp"
2222
#include "scoped_ptr.hpp"
23+
#include "string_ref.hpp"
24+
#include "retry_policy.hpp"
2325

2426
#include <uv.h>
2527

@@ -32,40 +34,38 @@ class ErrorResponse : public Response {
3234
public:
3335
ErrorResponse()
3436
: Response(CQL_OPCODE_ERROR)
35-
, code_(0xFFFFFFFF)
36-
, message_(NULL)
37-
, message_size_(0)
38-
, prepared_id_(NULL)
39-
, prepared_id_size_(0) {}
40-
41-
ErrorResponse(int32_t code, const char* input, size_t input_size)
42-
: Response(CQL_OPCODE_ERROR)
43-
, guard(new char[input_size])
44-
, code_(code)
45-
, message_(guard.get())
46-
, message_size_(input_size) {
47-
memcpy(message_, input, input_size);
48-
}
37+
, code_(0xFFFFFFFF) { }
4938

5039
int32_t code() const { return code_; }
40+
StringRef message() const { return message_; }
5141

52-
std::string prepared_id() const {
53-
return std::string(prepared_id_, prepared_id_size_);
54-
}
42+
StringRef prepared_id() const { return prepared_id_; }
5543

56-
std::string message() const { return std::string(message_, message_size_); }
44+
CassConsistency consistency() const {
45+
return static_cast<CassConsistency>(cl_);
46+
}
47+
int32_t received() const { return received_; }
48+
int32_t required() const { return required_; }
49+
int32_t alive() const { return alive_; }
50+
bool data_present() const { return data_present_; }
51+
RetryPolicy::WriteType write_type() const { return write_type_; }
5752

5853
std::string error_message() const;
5954

6055
bool decode(int version, char* buffer, size_t size);
6156

6257
private:
63-
ScopedPtr<char> guard;
6458
int32_t code_;
65-
char* message_;
66-
size_t message_size_;
67-
char* prepared_id_;
68-
size_t prepared_id_size_;
59+
StringRef message_;
60+
61+
StringRef prepared_id_;
62+
63+
uint16_t cl_;
64+
int32_t received_;
65+
int32_t required_;
66+
int32_t alive_;
67+
bool data_present_;
68+
RetryPolicy::WriteType write_type_;
6969
};
7070

7171
bool check_error_or_invalid_response(const std::string& prefix, uint8_t expected_opcode,

src/execute_request.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
#include "execute_request.hpp"
1818

19+
#include "handler.hpp"
20+
1921
namespace cass {
2022

2123
int32_t ExecuteRequest::encode_batch(int version, BufferVec* bufs, EncodingCache* cache) const {
@@ -40,15 +42,15 @@ int32_t ExecuteRequest::encode_batch(int version, BufferVec* bufs, EncodingCache
4042
return length;
4143
}
4244

43-
int ExecuteRequest::encode(int version, BufferVec* bufs, EncodingCache* cache) const {
45+
int ExecuteRequest::encode(int version, Handler* handler, BufferVec* bufs) const {
4446
if (version == 1) {
45-
return internal_encode_v1(bufs, cache);
47+
return internal_encode_v1(handler, bufs);
4648
} else {
47-
return internal_encode(version, bufs, cache);
49+
return internal_encode(version, handler, bufs);
4850
}
4951
}
5052

51-
int ExecuteRequest::internal_encode_v1(BufferVec* bufs, EncodingCache* cache) const {
53+
int ExecuteRequest::internal_encode_v1(Handler* handler, BufferVec* bufs) const {
5254
size_t length = 0;
5355
const int version = 1;
5456

@@ -68,23 +70,23 @@ int ExecuteRequest::internal_encode_v1(BufferVec* bufs, EncodingCache* cache) co
6870
prepared_id.size());
6971
buf.encode_uint16(pos, elements_count());
7072
// <value_1>...<value_n>
71-
length += copy_buffers(version, bufs, cache);
73+
length += copy_buffers(version, bufs, handler->encoding_cache());
7274
}
7375

7476
{
7577
// <consistency> [short]
7678
size_t buf_size = sizeof(uint16_t);
7779

7880
Buffer buf(buf_size);
79-
buf.encode_uint16(0, consistency());
81+
buf.encode_uint16(0, handler->consistency());
8082
bufs->push_back(buf);
8183
length += buf_size;
8284
}
8385

8486
return length;
8587
}
8688

87-
int ExecuteRequest::internal_encode(int version, BufferVec* bufs, EncodingCache* cache) const {
89+
int ExecuteRequest::internal_encode(int version, Handler* handler, BufferVec* bufs) const {
8890
int length = 0;
8991
uint8_t flags = this->flags();
9092

@@ -123,12 +125,12 @@ int ExecuteRequest::internal_encode(int version, BufferVec* bufs, EncodingCache*
123125
size_t pos = buf.encode_string(0,
124126
prepared_id.data(),
125127
prepared_id.size());
126-
pos = buf.encode_uint16(pos, consistency());
128+
pos = buf.encode_uint16(pos, handler->consistency());
127129
pos = buf.encode_byte(pos, flags);
128130

129131
if (elements_count() > 0) {
130132
buf.encode_uint16(pos, elements_count());
131-
length += copy_buffers(version, bufs, cache);
133+
length += copy_buffers(version, bufs, handler->encoding_cache());
132134
}
133135
}
134136

0 commit comments

Comments
 (0)