Change data capture with Postgres & Elixir

CDC is the process of identifying and capturing data changes from the database.

With CDC, changes to data can be tracked in near real-time, and that information can be used to support a variety of use cases, including auditing, replication, and synchronisation.

A good example of a use case for CDC is to consider an application which inserts a record into the database and pushes an event to a message queue after the record has been inserted (write-twice). 

Imagine you’re working on an e-commerce application and after an order is created and inserted into the database, you push an OrderCreated event to a message queue. The consumers of the event might do things such as create pick orders for the warehouse, schedule transports for delivery and send an order confirmation email to the customer.

But what if the application crashes after the order has been inserted into the database but before managing to push the event to the message queue? This is possible due to the fact that you can’t atomically insert the record AND push the message in the same transaction, so if the application crashes after inserting the record to the database but before pushing the event to the queue, the event is lost.

There are of course workarounds to circumvent this: a simple solution is to “outbox” the event into an outbox table in the same transaction as writing the record, and then, rely on a CDC process to capture the change to the outbox table and push the event to the message queue. The transaction is atomic and the CDC process can assure the event

gets delivered to the message queue at-least-once.

In order to capture changes, CDC typically uses one of two methods: log-based or trigger-based.

Log-based CDC involves reading the transaction logs of the database to identify changed data, which is the method we’ll use here by utilising Postgres Logical Replication.

Postgres replication

There are two modes of replication in Postgres:

1. Physical replication – Every change from the primary are streamed to replicas via the WAL (Write Ahead Log). This replication is performed byte-by-byte with exact block addresses.

2. Logical replication – In logical replication the subscriber receives each individual transactional change (i.e. INSERT, UPDATE, or DELETE statements) to the database.

The WAL is still streamed but it encodes the logical operations so that they can be decoded by the subscriber without having to know Postgres internals.

One of the great things about logical replication is that it can be used to only replicate specific tables or rows, meaning that you have complete control over what is being replicated.

To enable logical replication the wal_level need to be set:


-- determines how much information is written to the wal. 
-- Each 'level' inherits the level below it; 'logical' is the highest level

ALTER SYSTEM SET wal_level=logical;

-- simultaneously running WAL sender processes
ALTER SYSTEM SET max_wal_senders='10';

-- simultaneously defined replication slots
ALTER SYSTEM SET max_replication_slots='10';

The changes require a restart to the Postgres instance.

After the system has been restarted the wal_level can be verified with:


SHOW wal_level;
 wal_level 
-----------
 logical
(1 row)

In order to subscribe to changes a publication must be created. A publication is a group of tables in which we would like to receive data changes for.

Let’s create a simple table and define a publication for it:


CREATE TABLE articles (id serial PRIMARY KEY, title text, description text, body text);
CREATE PUBLICATION articles_pub FOR TABLE articles;

To tell postgres to retain WAL segments we must create a replication slot]. 

The replication slot represents a stream of changes from one or more publications and is used to prevent data loss in the event of a server failure, as they are crash safe.


Replication Protocol

In order to get a feel for the protocol and messages being sent we can use pg_recvlogicalto start a replication subscriber:


# Start and use the publication defined above
# output is written to stdout
pg_recvlogical --start \
  --host='localhost' \
  --port='5432' \
  --username='postgres' \
  --dbname='postgres' \
  --option=publication_names='articles_pub' \
  --option=proto_version=1 \
  --create-slot \
  --if-not-exists \
  --slot=articles_slot \
  --plugin=pgoutput \
  --file=-

Insert a record:


INSERT INTO articles (title, description, body)
    VALUES ('Postgres replication', 'Using logical replication', 'Foo bar baz');

Each row in the output corresponds to a replication messages received through the subscription:

B(egin) - Begin transaction 
R(elation) - Table, schema, columns and their types
I(insert) - Data being inserted
C(ommit) - Commit transaction
B

Rarticlesdidtitledescriptionbody
It35tPostgres replicationtUsing logical replicationtFoo bar baz
C

If we insert multiple records in a transaction we should have two I in between B and C:

BEGIN;
INSERT INTO articles (title, description, body) VALUES ('First', 'desc', 'Foo');

INSERT INTO articles (title, description, body) VALUES ('Second', 'desc', 'Bar');
COMMIT;

And the ouput:

C
B

It37tFirsttdesctFoo
It38tSecondtdesctBar
C

The relation i.e table information was not transmitted since we already received the relation when inserting the first record.

Postgres only sends the relation the first time it’s encountered during the session. The subscriber is expected to cache a previously sent relation.

Now that we have a feel for how Logical replication works, let’s implement it in Elixir!

Implementing the replication connection 

Create a new Elixir project:

mix new cdc

We’ll add the following dependencies to mix.exs


defp deps do
  {:postgrex, "~> 0.16.4"},
  # decode/encode replication messages
  {:postgrex_pgoutput, "~> 0.1.0"}
end

postgrex supports replication through the Postgrex.ReplicationConnection process.


defmodule CDC.Replication do
  use Postgrex.ReplicationConnection
  require Logger

  defstruct [
    :publications,
    :slot,
    :state
  ]

  def start_link(opts) do
    conn_opts = [auto_reconnect: true]
    publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
    slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      {slot, publications},
      conn_opts ++ opts
    )
  end

  @impl true
  def init({slot, pubs}) do
    {:ok, %__MODULE__{slot: slot, publications: pubs}}
  end

  
  @impl true
  def handle_connect(%__MODULE__{slot: slot} = state) do
    query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"

    Logger.debug("[create slot] query=#{query}")

    {:query, query, %{state | state: :create_slot}}
  end

  @impl true
  def handle_result([%Postgrex.Result{} | _], %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state) do
    opts = [proto_version: 1, publication_names: pubs]

    query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"

    Logger.debug("[start streaming] query=#{query}")

    {:stream, query, [], %{state | state: :streaming}}
  end

  @impl true
  def handle_data(msg, state) do
    Logger.debug("Received msg=#{inspect(msg, limit: :infinity, pretty: true)}")
    {:noreply, [], state}
  end

  defp escape_options([]),
    do: ""

  defp escape_options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)

    [?\s, ?(, parts, ?)]
  end

  defp escape_string(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end

The code is available on GitHub

Let’s try it out:

opts = [
  slot: "articles_slot_elixir",
  publications: ["articles_pub"],
  host: "localhost",
  database: "postgres",
  username: "postgres",
  password: "postgres",
  port: 5432,
]

CDC.Replication.start_link(opts)

When we start the process the following is happening:

  1. Once we are connected to postgres the callback handle_connect/1 is called, a temporary logical replication slot is created.

  2. handle_result/2 is called with the result from the query in ‘1’. If the slot was created successfully we start streaming from the slot and go into streaming mode. The requested position ‘0/0’ means that Postgres picks the position. 

  3. Any replication messages sent from postgres are received in the handle_data/2 callback.

Replication messages

There are two types of messages a subscriber receives: 

1. primary_keep_alive – A checkin message, if reply == 1 the subscriber is expected to reply to the message with a standy_status_update to avoid a timeout disconnect. 

The standy_status_update contains the current LSN the subscriber has processed. 

Postgres uses this message to determine which WAL segments can be safely removed. 

2. xlog_data – Contains the data messages for each step in a transaction.

Since we are not responding to the primary_keep_alive messages the process gets disconnected and restarts
.

Let’s fix it by decoding the messages and start replying with standby_status_update messages.


defmodule CDC.Replication do
  use Postgrex.ReplicationConnection

  require Postgrex.PgOutput.Messages
  alias Postgrex.PgOutput.{Messages, Lsn}

  require Logger

  defstruct [
    :publications,
    :slot,
    :state
  ]

  def start_link(opts) do
    conn_opts = [auto_reconnect: true]
    publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
    slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      {slot, publications},
      conn_opts ++ opts
    )
  end

  @impl true
  def init({slot, pubs}) do
    {:ok, %__MODULE__{slot: slot, publications: pubs}}
  end

  @impl true
  def handle_connect(%__MODULE__{slot: slot} = state) do
    query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"

    Logger.debug("[create slot] query=#{query}")

    {:query, query, %{state | state: :create_slot}}
  end

  @impl true
  def handle_result(
        [%Postgrex.Result{} | _],
        %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
      ) do
    opts = [proto_version: 1, publication_names: pubs]

    query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"

    Logger.debug("[start streaming] query=#{query}")

    {:stream, query, [], %{state | state: :streaming}}
  end

  @impl true
  def handle_data(msg, state) do
    return_msgs =
      msg
      |> Messages.decode()
      |> handle_msg()

    {:noreply, return_msgs, state}
  end

  #
  defp handle_msg(Messages.msg_primary_keep_alive(server_wal: lsn, reply: 1)) do
    Logger.debug("msg_primary_keep_alive message reply=true")
    <<lsn::64>> = Lsn.encode(lsn)

    [standby_status_update(lsn)]
  end

  defp handle_msg(Messages.msg_primary_keep_alive(reply: 0)), do: []

  defp handle_msg(Messages.msg_xlog_data(data: data)) do
    Logger.debug("xlog_data message: #{inspect(data, pretty: true)}")
    []
  end

  defp standby_status_update(lsn) do
    [
      wal_recv: lsn + 1,
      wal_flush: lsn + 1,
      wal_apply: lsn + 1,
      system_clock: Messages.now(),
      reply: 0
    ]
    |> Messages.msg_standby_status_update()
    |> Messages.encode()
  end

  
defp escape_options([]),
    do: ""

  defp escape_options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)

    [?\s, ?(, parts, ?)]
  end

  defp escape_string(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end


handle_data/2 decodes the message and passes it to handle_msg/1.  If it’s a primary_keep_alive we respond with a standby_status_update.

The LSN denotes a byte position in the WAL. 

The subscriber responds with the LSN it has currently handled, since we are not tracking the messages we receive, we just ack with the LSN sent from the server.

Next we’ll handle xlog_data messages, the idea here is that we’ll capture each operation into a Transaction struct.

Capturing transactions

The CDC.Protocol module will handle xlog_data messages and track the state of the transaction. 


defmodule CDC.Protocol do
  import Postgrex.PgOutput.Messages
  require Logger

  alias CDC.Tx
  alias Postgrex.PgOutput.Lsn

  
@type t :: %__MODULE__{
          tx: Tx.t(),
          relations: map()
        }

  defstruct [
    :tx,
    relations: %{}
  ]

  @spec new() :: t()
  def new do
    %__MODULE__{}
  end

  def handle_message(msg, state) when is_binary(msg) do
    msg
    |> decode()
    |> handle_message(state)
  end

  def handle_message(msg_primary_keep_alive(reply: 0), state), do: {[], nil, state}
  def handle_message(msg_primary_keep_alive(server_wal: lsn, reply: 1), state) do
    Logger.debug("msg_primary_keep_alive message reply=true")
    <<lsn::64>> = Lsn.encode(lsn)

    {[standby_status_update(lsn)], nil, state}
  end

  def handle_message(msg, %__MODULE__{tx: nil, relations: relations} = state) do
    tx =
      [relations: relations, decode: true]
      |> Tx.new()
      |> Tx.build(msg)

    {[], nil, %{state | tx: tx}}
  end

  def handle_message(msg, %__MODULE__{tx: tx} = state) do
    case Tx.build(tx, msg) do
      %Tx{state: :commit, relations: relations} ->
        tx = Tx.finalize(tx)
        relations = Map.merge(state.relations, relations)
        {[], tx, %{state | tx: nil, relations: relations}}

      tx ->
        {[], nil, %{state | tx: tx}}
    end
  end

  defp standby_status_update(lsn) do
    [
      wal_recv: lsn + 1,
      wal_flush: lsn + 1,
      wal_apply: lsn + 1,
      system_clock: now(),
      reply: 0
    ]
    |> msg_standby_status_update()
    |> encode()
  end
end

CDC.Tx handles messages received within a transaction, begin, relation, insert/update/delete and commit.


defmodule CDC.Tx do
  import Postgrex.PgOutput.Messages
  alias Postgrex.PgOutput.Lsn

  alias __MODULE__.Operation

  @type t :: %__MODULE__{
          operations: [Operation.t()],
          relations: map(),
          timestamp: term(),
          xid: pos_integer(),
          state: :begin | :commit,
          lsn: Lsn.t(),
          end_lsn: Lsn.t()
        }

  defstruct [
    :timestamp,
    :xid,
    :lsn,
    :end_lsn,
    relations: %{},
    operations: [],
    state: :begin,
    decode: true
  ]

  def new(opts \\ []) do
    struct(__MODULE__, opts)
  end

  def finalize(%__MODULE__{state: :commit, operations: ops} = tx) do
    %{tx | operations: Enum.reverse(ops)}
  end

  def finalize(%__MODULE__{} = tx), do: tx

  @spec build(t(), tuple()) :: t()
  def build(tx, msg_xlog_data(data: data)) do
    build(tx, data)
  end

  def build(tx, msg_begin(lsn: lsn, timestamp: ts, xid: xid)) do
    %{tx | lsn: lsn, timestamp: ts, xid: xid, state: :begin}
  end

  def build(%__MODULE__{state: :begin, relations: relations} = tx, msg_relation(id: id) = rel) do
    %{tx | relations: Map.put(relations, id, rel)}
  end

  def build(%__MODULE__{state: :begin, lsn: tx_lsn} = tx, msg_commit(lsn: lsn, end_lsn: end_lsn))
      when tx_lsn == lsn do
    %{tx | state: :commit, end_lsn: end_lsn}
  end

  def build(%__MODULE__{state: :begin} = builder, msg_insert(relation_id: id) = msg),
    do: build_op(builder, id, msg)

  def build(%__MODULE__{state: :begin} = builder, msg_update(relation_id: id) = msg),
    do: build_op(builder, id, msg)

  def build(%__MODULE__{state: :begin} = builder, msg_delete(relation_id: id) = msg),
    do: build_op(builder, id, msg)

  # skip unknown messages
  def build(%__MODULE__{} = tx, _msg), do: tx

  defp build_op(%__MODULE__{state: :begin, relations: rels, decode: decode} = tx, id, msg) do
    rel = Map.fetch!(rels, id)
    op = Operation.from_msg(msg, rel, decode)

    %{tx | operations: [op | tx.operations]}
  end
end

CDC.Tx.Operation handles INSERT/UPDATE/DELETE messages and decodes the data by combining it with the relation 


defmodule CDC.Tx.Operation do
  @moduledoc "Describes a change (INSERT, UPDATE, DELETE) within a transaction."

  import Postgrex.PgOutput.Messages
  alias Postgrex.PgOutput.Type, as: PgType

  @type t :: %__MODULE__{}
  defstruct [
    :type,
    :schema,
    :namespace,
    :table,
    :record,
    :old_record,
    :timestamp
  ]

  @spec from_msg(tuple(), tuple(), decode :: boolean()) :: t()
  def from_msg(
        msg_insert(data: data),
        msg_relation(columns: columns, namespace: ns, name: name),
        decode?
      ) do
    %__MODULE__{
      type: :insert,
      namespace: ns,
      schema: into_schema(columns),
      table: name,
      record: cast(data, columns, decode?),
      old_record: %{}
    }
  end

  def from_msg(
        msg_update(change_data: data, old_data: old_data),
        msg_relation(columns: columns, namespace: ns, name: name),
        decode?
      ) do
    %__MODULE__{
      type: :update,
      namespace: ns,
      table: name,
      schema: into_schema(columns),
      record: cast(data, columns, decode?),
      old_record: cast(columns, old_data, decode?)
    }
  end

  def from_msg(
        msg_delete(old_data: data),
        msg_relation(columns: columns, namespace: ns, name: name),
        decode?
      ) do
    %__MODULE__{
      type: :delete,
      namespace: ns,
      schema: into_schema(columns),
      table: name,
      record: %{},
      old_record: cast(data, columns, decode?)
    }
  end

  defp into_schema(columns) do
    for c <- columns do
      c
      |> column()
      |> Enum.into(%{})
    end
  end

  defp cast(data, columns, decode?) do
    Enum.zip_reduce([data, columns], %{}, fn [text, typeinfo], acc ->
      key = column(typeinfo, :name)

      value =
        if decode? do
          t =
            typeinfo
            |> column(:type)
            |> PgType.type_info()

          PgType.decode(text, t)
        else
          text
        end

      Map.put(acc, key, value)
    end)
  end
end

As before, the primary_keep_alive message with reply == 1 sends a standby_status_update. When we receive an xlog_data message, we create a new %Tx{} which we use to “build” the transaction until we receive a msg_commit which marks the end of the transaction.

Any insert, update, delete messages creates an CDC.Tx.Operation in the transaction, each operation contains a relation_id which is used to look up the relation from tx.relations. 

The operation together with the relation enables us to decode the data. Column and type information is retrieved from the relation and is used to decode the values into elixir terms.
.

Once we are in a commit state we merge Tx.relations with Protocol.relations since a relation message will only be transmitted the first time a table is encountered during the connection session, Protocol.relations contains all msg_relation we’ve been sent during the session.

The CDC.Replication module now looks like this:


defmodule CDC.Replication do
  use Postgrex.ReplicationConnection

  alias CDC.Protocol

  require Logger

  defstruct [
    :publications,
    :protocol,
    :slot,
    :state
  ]

  def start_link(opts) do
    conn_opts = [auto_reconnect: true]
    publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
    slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      {slot, publications},
      conn_opts ++ opts
    )
  end

  @impl true
  def init({slot, pubs}) do
    {:ok,
     %__MODULE__{
       slot: slot,
       publications: pubs,
       protocol: Protocol.new()
     }}
  end

  @impl true
  def handle_connect(%__MODULE__{slot: slot} = state) do
    query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"

    Logger.debug("[create slot] query=#{query}")

    {:query, query, %{state | state: :create_slot}}
  end

  @impl true
  def handle_result(
        [%Postgrex.Result{} | _],
        %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
      ) do
    opts = [proto_version: 1, publication_names: pubs]

    query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"

    Logger.debug("[start streaming] query=#{query}")

    {:stream, query, [], %{state | state: :streaming}}
  end

  @impl true
  def handle_data(msg, state) do
    {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)

    if not is_nil(tx) do
      Logger.debug("Tx: #{inspect(tx, pretty: true)}")
    end

    {:noreply, return_msgs, %{state | protocol: protocol}}
  end

  
defp escape_options([]),
    do: ""

  defp escape_options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)

    [?\s, ?(, parts, ?)]
  end

  defp escape_string(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end

handle_data/2 calls Protocol.handle_message/1 which returns a tuple with three elements {messages_to_send :: [binary()], complete_transaction :: CDC.Tx.t() | nil, CDC.Protocol.t()}

For now we just inspect the transaction when it’s emitted from Protocol.handle_message/3, let’s try it out:


Interactive Elixir (1.14.0) - press Ctrl+C to exit (type h() ENTER for help)
opts = [
  slot: "articles_slot_elixir",
  publications: ["articles_pub"],
  host: "localhost",
  database: "postgres",
  username: "postgres",
  password: "postgres",
  port: 5432,
]

{:ok, _} = CDC.Replication.start_link(opts)
{:ok, pid} = Postgrex.start_link(opts)

insert_query = """
INSERT INTO articles (title, description, body) 
VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
"""

_ = Postgrex.query!(pid, insert_query, [])
  
14:03:48.020 [debug] Tx: %CDC.Tx{
  timestamp: ~U[2022-10-31 13:03:48Z],
  xid: 494,
  lsn: {0, 22981920},
  end_lsn: nil,
  relations: %{
    16386 => {:msg_relation, 16386, "public", "articles", :default,
     [
       {:column, [:key], "id", :int4, -1},
       {:column, [], "title", :text, -1},
       {:column, [], "description", :text, -1},
       {:column, [], "body", :text, -1}
     ]}
  },
  operations: [
    %CDC.Tx.Operation{
      type: :insert,
      schema: [
        %{flags: [:key], modifier: -1, name: "id", type: :int4},
        %{flags: [], modifier: -1, name: "title", type: :text},
        %{flags: [], modifier: -1, name: "description", type: :text},
        %{flags: [], modifier: -1, name: "body", type: :text}
      ],
      namespace: "public",
      table: "articles",
      record: %{
        "body" => "with Elixir!",
        "description" => "Using logical replication",
        "id" => 6,
        "title" => "Postgres replication"
      },
      old_record: %{},
      timestamp: nil
    }
  ],
  state: :begin,
  decode: true
}

Each change in the transaction is stored in Tx.operations, operation.record is the decoded row as a map.
Finally let’s implement a way to subscribe to changes from CDC.Replication:

defmodule CDC.Replication do
  use Postgrex.ReplicationConnection

  alias CDC.Protocol

  require Logger

  defstruct [
    :publications,
    :protocol,
    :slot,
    :state,
    subscribers: %{}
  ]

  def start_link(opts) do
    conn_opts = [auto_reconnect: true]
    publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
    slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      {slot, publications},
      conn_opts ++ opts
    )
  end

  def subscribe(pid, opts \\ []) do
    Postgrex.ReplicationConnection.call(pid, :subscribe, Keyword.get(opts, :timeout, 5_000))
  end

  def unsubscribe(pid, ref, opts \\ []) do
    Postgrex.ReplicationConnection.call(
      pid,
      {:unsubscribe, ref},
      Keyword.get(opts, :timeout, 5_000)
    )
  end

  @impl true
  def init({slot, pubs}) do
    {:ok,
     %__MODULE__{
       slot: slot,
       publications: pubs,
       protocol: Protocol.new()
     }}
  end

  @impl true
  def handle_connect(%__MODULE__{slot: slot} = state) do
    query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"

    Logger.debug("[create slot] query=#{query}")

    {:query, query, %{state | state: :create_slot}}
  end

  @impl true
  def handle_result(
        [%Postgrex.Result{} | _],
        %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
      ) do
    opts = [proto_version: 1, publication_names: pubs]

    query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"

    Logger.debug("[start streaming] query=#{query}")

    {:stream, query, [], %{state | state: :streaming}}
  end

  @impl true
  def handle_data(msg, state) do
    {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)

    if not is_nil(tx) do
      notify(tx, state.subscribers)
    end

    {:noreply, return_msgs, %{state | protocol: protocol}}
  end

  # Replies must be sent using `reply/2`
  # https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html#reply/2
  @impl true
  def handle_call(:subscribe, {pid, _} = from, state) do
    ref = Process.monitor(pid)

    state = put_in(state.subscribers[ref], pid)

    Postgrex.ReplicationConnection.reply(from, {:ok, ref})

    {:noreply, state}
  end

  def handle_call({:unsubscribe, ref}, from, state) do
    {reply, new_state} =
      case state.subscribers do
        %{^ref => _pid} ->
          Process.demonitor(ref, [:flush])

          {_, state} = pop_in(state.subscribers[ref])
          {:ok, state}

        _ ->
          {:error, state}
      end

    from && Postgrex.ReplicationConnection.reply(from, reply)

    {:noreply, new_state}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _, _}, state) do
    handle_call({:unsubscribe, ref}, nil, state)
  end

  defp notify(tx, subscribers) do
    for {ref, pid} <- subscribers do
      send(pid, {:notification, self(), ref, tx})
    end

    :ok
  end

  defp escape_options([]),
    do: ""

  defp escape_options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)

    [?\s, ?(, parts, ?)]
  end

  defp escape_string(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end

And we can use it like this:


opts = [
  slot: "articles_slot",
  publications: ["articles_pub"],
  host: "localhost",
  database: "postgres",
  username: "postgres",
  password: "postgres",
  port: 5432,
]

{:ok, pid} = CDC.Replication.start_link(opts)
{:ok, pg_pid} = Postgrex.start_link(opts)
{:ok, ref} = CDC.Replication.subscribe(pid)

insert_query = """
INSERT INTO articles (title, description, body) 
VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
"""

_ = Postgrex.query!(pg_pid, insert_query, [])
flush()

{:notification, #PID<0.266.0>, #Reference<0.2499916608.3416784901.94813>,
 %CDC.Tx{
   timestamp: ~U[2022-10-31 13:26:35Z],
   xid: 495,
   lsn: {0, 22983536},
   end_lsn: nil,
   relations: %{
     16386 => {:msg_relation, 16386, "public", "articles", :default,
      [
        {:column, [:key], "id", :int4, -1},
        {:column, [], "title", :text, -1},
        {:column, [], "description", :text, -1},
        {:column, [], "body", :text, -1}
      ]}
   },
   operations: [
     %CDC.Tx.Operation{
       type: :insert,
       schema: [
         %{flags: [:key], modifier: -1, name: "id", type: :int4},
         %{flags: [], modifier: -1, name: "title", type: :text},
         %{flags: [], modifier: -1, name: "description", type: :text},
         %{flags: [], modifier: -1, name: "body", type: :text}
       ],
       namespace: "public",
       table: "articles",
       record: %{
         "body" => "with Elixir!",
         "description" => "Using logical replication",
         "id" => 7,
         "title" => "Postgres replication"
       },
       old_record: %{},
       timestamp: nil
     }
   ],
   state: :begin,
   decode: true
 }}

Conclusion

If you’re looking for a way to capture changes from your database with minimal changes to your existing setup, Changing Data Capture is definitely worth considering. With Elixir and postgrex we’ve implemented a mini Debezium in ~400 LOC. Full source is available here.

If you need help with your Elixir implementation our world-leading team of experts is always here to help. Contact us today to find out how we can help you.

Keep reading

Advent of Code 2024

Advent of Code 2024

Join Lorena in this years Advent of Code 2024. She'll be solving daily puzzles throughout the month of December.

Optimising for Concurrency: Comparing and contrasting the BEAM and JVM virtual machines

Optimising for Concurrency: Comparing and contrasting the BEAM and JVM virtual machines

Attila Sragli explores the BEAM VM's inner workings, comparing them to the JVM to highlight their importance.

MongooseIM 6.3: Prometheus, CockroachDB and more

MongooseIM 6.3: Prometheus, CockroachDB and more

Pawel Chrząszcz introduces MongooseIM 6.3.0 with Prometheus monitoring and CockroachDB support for greater scalability and flexibility.