Change Data Capture (CDC)

Warning:
CockroachDB v2.1 is no longer supported as of April 30, 2020. For more details, refer to the Release Support Policy.

New in v2.1: Change data capture (CDC) provides efficient, distributed, row-level change feeds into Apache Kafka for downstream processing such as reporting, caching, or full-text indexing.

Warning:

This feature is under active development and works only for a targeted use case. Please file a GitHub issue if you have feedback on the roadmap.

Note:

CDC is an enterprise-only feature. A core version will be available in a future release.

What is change data capture?

While CockroachDB is an excellent system of record, it also needs to coexist with other systems. For example, you might want to keep your data mirrored in full-text indexes, analytics engines, or big data pipelines.

The core feature of CDC is the changefeed. Changefeeds target an allowlist of tables, called the "watched rows". Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka).
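
For example, a minimal changefeed that watches a single table and emits JSON (the default format) to a Kafka sink looks like the following; the second statement emits Avro instead and additionally requires a Confluent schema registry. The table name and the Kafka and schema registry addresses are placeholders borrowed from the usage examples below:

> CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092';

> CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092' WITH format = 'experimental-avro', confluent_schema_registry = 'http://localhost:8081';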

Ordering guarantees

  • In most cases, each version of a row will be emitted once. However, under some infrequent conditions (e.g., node failures, network partitions), a version may be emitted more than once. This gives changefeeds an at-least-once delivery guarantee.

  • Once a row has been emitted with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a new change for that row at an earlier timestamp.

    For example, if you ran the following:

    > CREATE TABLE foo (id INT PRIMARY KEY DEFAULT unique_rowid(), name STRING);
    > CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://localhost:9092' WITH UPDATED;
    > INSERT INTO foo VALUES (1, 'Carl');
    > UPDATE foo SET name = 'Petee' WHERE id = 1;
    

    You'd expect the changefeed to emit:

    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    

    It is also possible for the changefeed to emit an out-of-order duplicate of an earlier value that you have already seen:

    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    

    However, you will never see output like the following (i.e., an out-of-order row that you have never seen before):

    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    
  • If a row is modified more than once in the same transaction, only the last change will be emitted.

  • Rows are sharded between Kafka partitions by the row’s primary key.

  • The UPDATED option adds an "updated" timestamp to each emitted row. You can also use the RESOLVED option to emit periodic "resolved" timestamp messages to each Kafka partition. A resolved timestamp is a guarantee that no (previously unseen) rows with a lower update timestamp will be emitted on that partition.

    For example:

    {"__crdb__": {"updated": "1532377312562986715.0000000000"}, "id": 1, "name": "Petee H"}
    {"__crdb__": {"updated": "1532377306108205142.0000000000"}, "id": 2, "name": "Carl"}
    {"__crdb__": {"updated": "1532377358501715562.0000000000"}, "id": 3, "name": "Ernie"}
    {"__crdb__":{"resolved":"1532379887442299001.0000000000"}}
    {"__crdb__":{"resolved":"1532379888444290910.0000000000"}}
    {"__crdb__":{"resolved":"1532379889448662988.0000000000"}}
    ...
    {"__crdb__":{"resolved":"1532379922512859361.0000000000"}}
    {"__crdb__": {"updated": "1532379923319195777.0000000000"}, "id": 4, "name": "Lucky"}
    
  • With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. Resolved timestamp notifications on every Kafka partition can be used to provide strong ordering and global consistency guarantees by buffering records in between timestamp closures (see the example after this list).

    Because CockroachDB supports transactions that can affect any part of the cluster, it is not possible to horizontally divide the transaction log into independent changefeeds.
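
To make the "updated" and "resolved" timestamps described above available to consumers, create the changefeed with the UPDATED and RESOLVED options. A minimal sketch, using a placeholder table name and Kafka address; a downstream consumer can then buffer the records it receives on each partition and release them, sorted by their "updated" timestamps, once a higher "resolved" timestamp has arrived on every partition:

> CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092' WITH updated, resolved;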

Schema changes with column backfill

When schema changes with column backfill (e.g., adding a column with a default, adding a computed column, adding a NOT NULL column, dropping a column) are made to watched rows, the changefeed will emit some duplicates during the backfill. When it finishes, CockroachDB outputs all watched rows using the new schema.

For example, start with the changefeed created in the example below:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}

Add a column to the watched table:

> ALTER TABLE office_dogs ADD COLUMN likes_treats BOOL DEFAULT TRUE;

The changefeed emits duplicate records 1, 2, and 3 before outputting the records using the new schema:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
[1] {"id": 1, "name": "Petee H"}  # Duplicate
[2] {"id": 2, "name": "Carl"}     # Duplicate
[3] {"id": 3, "name": "Ernie"}    # Duplicate
[1] {"id": 1, "likes_treats": true, "name": "Petee H"}
[2] {"id": 2, "likes_treats": true, "name": "Carl"}
[3] {"id": 3, "likes_treats": true, "name": "Ernie"}

Configure a changefeed

Create

To create a changefeed:

> CREATE CHANGEFEED FOR TABLE name INTO 'kafka://host:port';

For more information, see CREATE CHANGEFEED.

Pause

To pause a changefeed:

> PAUSE JOB job_id;

For more information, see PAUSE JOB.

Resume

To resume a paused changefeed:

> RESUME JOB job_id;

For more information, see RESUME JOB.

Cancel

To cancel a changefeed:

> CANCEL JOB job_id;

For more information, see CANCEL JOB.
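
The PAUSE JOB, RESUME JOB, and CANCEL JOB statements above all take a changefeed's job ID, which is returned when the changefeed is created. If you did not record it, one way to look it up is to filter the jobs list by type; a sketch using the crdb_internal.jobs table described in the next section:

> SELECT job_id, description FROM crdb_internal.jobs WHERE job_type = 'CHANGEFEED';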

Monitor a changefeed

Changefeed progress is exposed as a high-water timestamp that advances as the changefeed progresses. This is a guarantee that all changes before or at the timestamp have been emitted. You can monitor a changefeed:

  • On the Jobs page of the Admin UI. Hover over the high-water timestamp to view the system time.
  • Using crdb_internal.jobs:

    > SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
    
            job_id       |  job_type  |                              description                               | ... |      high_water_timestamp      | error | coordinator_id
    +--------------------+------------+------------------------------------------------------------------------+ ... +--------------------------------+-------+----------------+
      383870400694353921 | CHANGEFEED | CREATE CHANGEFEED FOR TABLE office_dogs2 INTO 'kafka://localhost:9092' | ... | 1537279405671006870.0000000000 |       |              1
    (1 row)
    
Note:

You can use the high-water timestamp to start a new changefeed where another ended.
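
For example, to start a new changefeed where the one above ended, you can pass its high-water timestamp as the starting cursor. A sketch, assuming the cursor option of CREATE CHANGEFEED and reusing the high-water timestamp from the output above as a placeholder:

> CREATE CHANGEFEED FOR TABLE office_dogs2 INTO 'kafka://localhost:9092' WITH cursor = '1537279405671006870.0000000000';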

Usage examples

Create a changefeed connected to Kafka

In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink.

  1. If you do not already have one, request a trial enterprise license.

  2. In a terminal window, start cockroach:

    $ cockroach start --insecure --listen-addr=localhost --background
    
  3. Download and extract the Confluent Open Source platform (which includes Kafka).

  4. Move into the extracted confluent-<version> directory and start Confluent:

    $ ./bin/confluent start
    

    Only zookeeper and kafka are needed. To troubleshoot Confluent, see their docs.

  5. Create a Kafka topic:

    $ ./bin/kafka-topics \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic office_dogs
    
    Note:

    You are expected to create any Kafka topics with the necessary replication factor and number of partitions. Topics can be created manually, or Kafka brokers can be configured to automatically create topics with a default partition count and replication factor.

  6. As the root user, open the built-in SQL client:

    $ cockroach sql --insecure
    
  7. Set your organization name and enterprise license key that you received via email:

    > SET CLUSTER SETTING cluster.organization = '<organization name>';
    
    > SET CLUSTER SETTING enterprise.license = '<secret>';
    
  8. Create a database called cdc_demo:

    > CREATE DATABASE cdc_demo;
    
  9. Set the database as the default:

    > SET DATABASE = cdc_demo;
    
  10. Create a table and add data:

    > CREATE TABLE office_dogs (
         id INT PRIMARY KEY,
         name STRING);
    
    > INSERT INTO office_dogs VALUES
       (1, 'Petee'),
       (2, 'Carl');
    
    > UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
    
  11. Start the changefeed:

    > CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092';
    
    
            job_id       
    +--------------------+
      360645287206223873
    (1 row)
    

    This will start up the changefeed in the background and return the job_id. The changefeed writes to Kafka.

  12. In a new terminal, move into the extracted confluent-<version> directory and start watching the Kafka topic:

    $ ./bin/kafka-console-consumer \
    --bootstrap-server=localhost:9092 \
    --property print.key=true \
    --from-beginning \
    --topic=office_dogs
    
    [1] {"id": 1, "name": "Petee H"}
    [2] {"id": 2, "name": "Carl"}
    

    Note that the initial scan displays the state of the table as of when the changefeed started (therefore, the initial value of "Petee" is omitted).

  13. Back in the SQL client, insert more data:

    > INSERT INTO office_dogs VALUES (3, 'Ernie');
    
  14. Back in the terminal where you're watching the Kafka topic, the following output has appeared:

    [3] {"id": 3, "name": "Ernie"}
    
  15. When you are done, exit the SQL shell (\q).

  16. To stop cockroach, run:

    $ cockroach quit --insecure
    
  17. To stop Kafka, move into the extracted confluent-<version> directory and stop Confluent:

    $ ./bin/confluent stop
    

Create a changefeed in Avro connected to Kafka

In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink and emits Avro records.

  1. If you do not already have one, request a trial enterprise license.

  2. In a terminal window, start cockroach:

    $ cockroach start --insecure --listen-addr=localhost --background
    
  3. Download and extract the Confluent Open Source platform (which includes Kafka).

  4. Move into the extracted confluent-<version> directory and start Confluent:

    $ ./bin/confluent start
    

    Only zookeeper, kafka, and schema-registry are needed. To troubleshoot Confluent, see their docs.

  5. Create a Kafka topic:

    $ ./bin/kafka-topics \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic office_dogs
    
    Note:

    You are expected to create any Kafka topics with the necessary replication factor and number of partitions. Topics can be created manually, or Kafka brokers can be configured to automatically create topics with a default partition count and replication factor.

  6. As the root user, open the built-in SQL client:

    $ cockroach sql --insecure
    
  7. Set your organization name and enterprise license key that you received via email:

    > SET CLUSTER SETTING cluster.organization = '<organization name>';
    
    > SET CLUSTER SETTING enterprise.license = '<secret>';
    
  8. Create a database called cdc_demo:

    > CREATE DATABASE cdc_demo;
    
  9. Set the database as the default:

    > SET DATABASE = cdc_demo;
    
  10. Create a table and add data:

    > CREATE TABLE office_dogs (
         id INT PRIMARY KEY,
         name STRING);
    
    > INSERT INTO office_dogs VALUES
       (1, 'Petee'),
       (2, 'Carl');
    
    > UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
    
  11. Start the changefeed:

    > CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092' WITH format = 'experimental-avro', confluent_schema_registry = 'http://localhost:8081';
    
            job_id       
    +--------------------+
      360645287206223873
    (1 row)
    

    This will start up the changefeed in the background and return the job_id. The changefeed writes to Kafka.

  12. In a new terminal, move into the extracted confluent-<version> directory and start watching the Kafka topic:

    $ ./bin/kafka-avro-console-consumer \
    --bootstrap-server=localhost:9092 \
    --property print.key=true \
    --from-beginning \
    --topic=office_dogs
    
    {"id":1}    {"id":1,"name":{"string":"Petee H"}}
    {"id":2}    {"id":2,"name":{"string":"Carl"}}
    

    Note that the initial scan displays the state of the table as of when the changefeed started (therefore, the initial value of "Petee" is omitted).

  13. Back in the SQL client, insert more data:

    > INSERT INTO office_dogs VALUES (3, 'Ernie');
    
  14. Back in the terminal where you're watching the Kafka topic, the following output has appeared:

    {"id":3}    {"id":3,"name":{"string":"Ernie"}}
    
  15. When you are done, exit the SQL shell (\q).

  16. To stop cockroach, run:

    $ cockroach quit --insecure
    
  17. To stop Kafka, move into the extracted confluent-<version> directory and stop Confluent:

    $ ./bin/confluent stop
    

Known limitations

The following are limitations in the v2.1 release and will be addressed in the future:

  • The CockroachDB core changefeed is not ready for external testing.
  • Changefeeds only work on tables with a single column family (which is the default for new tables).
  • Many DDL queries (including TRUNCATE and DROP TABLE) will cause errors on a changefeed watching the affected tables. You will need to start a new changefeed.
  • Changefeeds cannot be backed up or restored.
  • Changefeed backoff/retry behavior during partial or intermittent sink unavailability has not been optimized; however, ordering guarantees will still hold for as long as a changefeed remains active.
  • Changefeeds use a pull model, but will use a push model in the future, lowering latencies considerably.
  • Changefeeds cannot be altered. To alter one, cancel the changefeed and create a new one with updated settings from where it left off (see the sketch after this list).
  • Additional envelope options will be added, including one that displays the old and new values for the changed row.
  • Additional target options will be added, including partitions and ranges of primary key rows.
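
For example, to "alter" a changefeed today, cancel it and then create a new changefeed that picks up where the old one left off. A sketch, using the job ID and high-water timestamp values from the examples above as placeholders; the added updated option stands in for whatever setting you want to change:

> CANCEL JOB 360645287206223873;

> CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092' WITH updated, cursor = '1537279405671006870.0000000000';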
