This repository contains go implementation of a Couchbase Database Change Protocol (DCP) client. You can find more information by looking at our docs.
- Our main goal is to build a dcp client for faster and stateful systems. We're already using this repository in below implementations:
 
package main
import (
  "github.com/Trendyol/go-dcp"
  "github.com/Trendyol/go-dcp/logger"
  "github.com/Trendyol/go-dcp/models"
)
func listener(ctx *models.ListenerContext) {
  switch event := ctx.Event.(type) {
  case models.DcpMutation:
    logger.Log.Info(
      "mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
      event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
    )
  case models.DcpDeletion:
    logger.Log.Info(
      "deleted(vb=%v,eventTime=%v) | id: %v",
      event.VbID, event.EventTime, string(event.Key),
    )
  case models.DcpExpiration:
    logger.Log.Info(
      "expired(vb=%v,eventTime=%v) | id: %v",
      event.VbID, event.EventTime, string(event.Key),
    )
  }
  ctx.Ack()
}
func main() {
  connector, err := dcp.NewDcp("config.yml", listener)
  if err != nil {
    panic(err)
  }
  defer connector.Close()
  connector.Start()
}$ go get github.com/Trendyol/go-dcp
| Variable | Type | Required | Default | Description | 
|---|---|---|---|---|
hosts | 
[]string | yes | - | Couchbase host like localhost:8091. | 
username | 
string | yes | - | Couchbase username. | 
password | 
string | yes | - | Couchbase password. | 
bucketName | 
string | yes | - | Couchbase DCP bucket. | 
dcp.group.name | 
string | yes | DCP group name for vbuckets. | |
scopeName | 
string | no | _default | Couchbase scope name. | 
collectionNames | 
[]string | no | _default | Couchbase collection names. | 
connectionBufferSize | 
uint, string | no | 20mb | Source Bucket tcp connection buffer size (x Node Count). Check this if you get OOM Killed. | 
maxQueueSize | 
int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get queue overflowed or queue full. | 
connectionTimeout | 
time.Duration | no | 1m | Couchbase connection timeout. | 
secureConnection | 
bool | no | false | Enable TLS connection of Couchbase. | 
rootCAPath | 
string | no | *not set | if secureConnection set true this field is required. | 
debug | 
bool | no | false | For debugging purpose. | 
dcp.bufferSize | 
int | no | 16mb | DCP internal queue buffer size (x Node Count). Check this if you get OOM Killed. | 
dcp.mode | 
string | no | infinite | Set DCP mode finite If you want to listen to DCP events until now. Set DCP mode infinite If you want to listen to DCP events infinitely. | 
dcp.connectionBufferSize | 
uint, string | no | 20mb | DCP tcp connection buffer size (x Node Count). Check this if you get OOM Killed. | 
dcp.connectionTimeout | 
time.Duration | no | 1m | DCP connection timeout. | 
dcp.maxQueueSize | 
int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get queue overflowed or queue full. | 
dcp.listener.skipUntil | 
time.Time | no | Set this if you want to skip events until certain time. | |
dcp.group.membership.type | 
string | no | DCP membership types. couchbase, kubernetesHa, kubernetesStatefulSet, static, dynamic or lilCouchbase. Check examples for details. | 
|
dcp.group.membership.memberNumber | 
int | no | 1 | Set this if membership is static. Other methods will ignore this field. | 
dcp.group.membership.totalMembers | 
int | no | 1 | Set this if membership is static or kubernetesStatefulSet. Other methods will ignore this field. | 
dcp.group.membership.rebalanceDelay | 
time.Duration | no | 30s | Works for autonomous mode. If membership is dynamic, it is ignored and set to 0s. | 
dcp.group.membership.config | 
map[string]string | no | *not set | Set key-values of config. expirySeconds,heartbeatInterval,heartbeatToleranceDuration,monitorInterval,timeout for couchbase type | 
dcp.config.disableChangeStreams | 
bool | no | false | Set this to true if you did not want to get older versions of changes for Couchbase Server 7.2.0+ using Magma storage buckets | 
leaderElection.enabled | 
bool | no | false | Set this true for memberships  kubernetesHa. | 
leaderElection.type | 
string | no | kubernetes | Leader Election types. kubernetes | 
leaderElection.config | 
map[string]string | no | *not set | Set key-values of config. leaseLockName,leaseLockNamespace, leaseDuration, renewDeadline, retryPeriod for kubernetes type. | 
leaderElection.rpc.port | 
int | no | 8081 | This field is usable for kubernetesStatefulSet membership. | 
checkpoint.type | 
string | no | auto | Set checkpoint type auto or manual. | 
checkpoint.autoReset | 
string | no | earliest | Set checkpoint start point to earliest or latest. | 
checkpoint.interval | 
time.Duration | no | 1m | Checkpoint checking interval. | 
checkpoint.timeout | 
time.Duration | no | 1m | Checkpoint checking timeout. | 
healthCheck.disabled | 
bool | no | false | Disable Couchbase connection health check. | 
healthCheck.interval | 
time.Duration | no | 1m | Couchbase connection health checking interval duration. | 
healthCheck.timeout | 
time.Duration | no | 1m | Couchbase connection health checking timeout duration. | 
rollbackMitigation.disabled | 
bool | no | false | Disable reprocessing for roll-backed Vbucket offsets. | 
rollbackMitigation.interval | 
time.Duration | no | 1s | Persisted sequence numbers polling interval. | 
rollbackMitigation.configWatchInterval | 
time.Duration | no | 10s | Cluster config changes listener interval. | 
metadata.type | 
string | no | couchbase | Metadata storing types.  file, couchbase, noop. | 
metadata.readOnly | 
bool | no | false | Set this for debugging state purposes. | 
metadata.config | 
map[string]string | no | *not set | Set key-values of config. hosts, username, password, bucket,scope,collection,maxQueueSize,connectionBufferSize 5mb is default (x Node Count),connectionTimeout, secureConnection, rootCAPath for couchbase type | 
api.disabled | 
bool | no | false | Disable metric endpoints | 
api.port | 
int | no | 8080 | Set API port | 
metric.path | 
string | no | /metrics | Set metric endpoint path. | 
logging.level | 
string | no | info | Set logging level. | 
These environment variables will overwrite the corresponding configs.
| Variable | Type | Corresponding Config | Description | 
|---|---|---|---|
GO_DCP__DCP_GROUP_MEMBERSHIP_MEMBERNUMBER | 
int | dcp.group.membership.memberNumber | To be able to prevent making deployment to scale up or down. | 
GO_DCP__DCP_GROUP_MEMBERSHIP_TOTALMEMBERS | 
int | dcp.group.membership.totalMembers | To be able to prevent making deployment to scale up or down. | 
The client offers an API that handles different endpoints and expose several metrics.
| Endpoint | Description | Debug Mode | Body | 
|---|---|---|---|
GET /status | 
Returns a 200 OK status if the client is able to ping the couchbase server successfully. | ||
GET /rebalance | 
Triggers a rebalance operation for the vBuckets. | ||
GET /states/offset | 
Returns the current offsets for each vBucket. | x | |
GET /states/followers | 
Returns the list of follower clients if service discovery enabled | x | |
GET /debug/pprof/* | 
Fiber Pprof | x | |
PUT /membership/info | 
Updates membership info and applies rebalance. | {"memberNumber": 1,"totalMembers": 3 } | 
The Client collects relevant metrics and makes them available at /metrics endpoint. In case you haven't configured a metric.path, the metrics will be exposed at the /metrics.
| Metric Name | Description | Labels | Value Type | 
|---|---|---|---|
| cbgo_mutation_total | The total number of mutations on a specific vBucket | vbId: ID of the vBucket | Counter | 
| cbgo_deletion_total | The total number of deletions on a specific vBucket | vbId: ID of the vBucket | Counter | 
| cbgo_expiration_total | The total number of expirations on a specific vBucket | vbId: ID of the vBucket | Counter | 
| cbgo_agent_queue_current | The current number of agent queue | address: Couchbase, is dcp: Is Dcp Agent | Gauge | 
| cbgo_agent_queue_max | The max number of agent queue | address: Couchbase, is dcp: Is Dcp Agent | Gauge | 
| cbgo_seq_no_current | The current sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge | 
| cbgo_start_seq_no_current | The starting sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge | 
| cbgo_end_seq_no_current | The ending sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge | 
| cbgo_persist_seq_no_current | The persist sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge | 
| cbgo_lag_current | The current lag on a specific vBucket | vbId: ID of the vBucket | Gauge | 
| cbgo_total_lag_current | The current total lag | N/A | Gauge | 
| cbgo_process_latency_ms_current | The latest process latency in milliseconds | N/A | Gauge | 
| cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter | 
| cbgo_rebalance_current | The number of total rebalance | N/A | Counter | 
| cbgo_active_stream_current | The number of total active stream | N/A | Gauge | 
| cbgo_total_members_current | The total number of members in the cluster | N/A | Gauge | 
| cbgo_member_number_current | The number of the current member | N/A | Gauge | 
| cbgo_membership_type_current | The type of membership of the current member | Membership type | Gauge | 
| cbgo_offset_write_current | The latest number of the offset write | N/A | Gauge | 
| cbgo_offset_write_latency_ms_current | The latest offset write latency in milliseconds | N/A | Gauge | 
| Go DCP Version | Minimum Couchbase Server Version | 
|---|---|
| x<1.1.16 | 6.5.x | 
| 1.1.16>=x | 5.x.x | 
| Date taking effect | Version | Change | How to check | 
|---|---|---|---|
| December 14, 2023 | v1.1.19 | dcp.config.[DisableExpiryOpcode,DisableStreamEndByClient, EnableChangeStreams] removed | Review your configs | 
- example with couchbase membership
 - couchbase membership config - thanks to @onursak
 - kubernetesStatefulSet membership config
 - kubernetesHa membership config
 - static membership config
 - dynamic membership config
 - lilCouchbase membership config
 
# Make sure to use .githooks as hooks path of your git
git config core.hooksPath .githooks