ex_rabbit_pool open source AMQP connection pool

by Simon Benitez

Background

A couple of months ago we started writing an open-source continuous delivery system which we called Buildex.

Buildex is implemented as a collection of microservices which:

  • Monitor GitHub repositories
  • Respond to changes by sending notifications over AMQP (RabbitMQ)
  • Trigger software builds running in Docker containers
  • Expose credentials (SSH keys) to containers as volume mounts

Buildex was born of the frustrations and limitations we faced when attempting to debug production failures in other build systems. Buildex was also influenced by security, privacy, and pricing concerns we encountered while using some of the popular commercial SaaS offerings.

Buildex Component overview

The principal components of Buildex are the Poller, which uses the excellent Tentacat GitHub API client to poll GitHub for new tags; the Builder, which checks out and builds the project inside Docker containers; and the Datastore, which uses ueberauth OAuth to delegate authorization to GitHub and allows users to configure projects for the continuous integration pipeline.

The following diagram provides a high-level overview of the Buildex micro-services and supporting infrastructure.

Although we could have used distributed Erlang for everything, we wanted operations staff and developers to be able to easily monitor inter-service communication and define message replication as they pleased, for example for logging or integration with third-party services (email, auditing, manually triggering builds, etc.).

We decided to use RabbitMQ for inter-service communication, which in turn led to our need for a higher level connection handling and channel management library.

ex_rabbit_pool

As part of the work to support these services, we created a RabbitMQ connection management and pooling library called ex_rabbit_pool. ex_rabbit_pool is built upon the Elixir library AMQP which is an Elixir wrapper for the RabbitMQ client rabbitmq-erlang-client, and the Erlang resource pooling library poolboy. In this post, we will take you through the lessons learnt and steps taken to implement ex_rabbit_pool, but first, a quick explanation of channels and connections.

Channels and connections

Connections to RabbitMQ are TCP connections: they are expensive to create and a finite system resource. Channels are a lightweight abstraction over connections.

Quoting the RabbitMQ documentation, “AMQP 0-9-1 connections are multiplexed with channels that can be thought of as ‘lightweight connections that share a single TCP connection’”.

In the Erlang world, you will typically assign a channel per worker process; on other platforms you would assign a channel per thread.

Basic steps involved in using AMQP

Open a connection:
{:ok, connection} = AMQP.Connection.open(host: "localhost", port: 5672)
Open a channel on that connection:
{:ok, channel} = AMQP.Channel.open(connection)
Bind a queue to an exchange using the channel (both must already be declared):
AMQP.Queue.bind(channel, "test_queue", "test_exchange")
(Listening) Subscribe to the queue:
AMQP.Queue.subscribe(channel, "test_queue", fn payload, _meta -> IO.puts("Received: #{payload}") end)
(Sending) Publish a message to the exchange:
AMQP.Basic.publish(channel, "test_exchange", "", "Hello, World!")

The process tree

A supervision tree diagram is a good way to get an overview of a BEAM system, so let’s start by examining the supervision tree of ex_rabbit_pool.

  • PoolSupervisor supervises the ConnectionPool
  • ConnectionPool manages a collection of ConnectionWorker processes
  • Each ConnectionWorker manages a RabbitConnection
  • ConnectionWorker uses the RabbitConnection to create a pool of RabbitChannels

The principal components are the PoolSupervisor and RabbitConnection. We will examine the implementation of both components over the course of the following sections.

Defining the PoolSupervisor

First, we define a top-level supervisor, PoolSupervisor, which will be responsible for managing the connection pool. PoolSupervisor is intended to be started within an application so we leave the start-up management to the application developer (here’s how we manage the pool supervisor in Builder).

PoolSupervisor provides an exported function, start_link/1, which takes a keyword list containing both the RabbitMQ connection parameters and the connection pool configuration.

defmodule ExRabbitPool.PoolSupervisor do
  use Supervisor

  alias ExRabbitPool.Worker.SetupQueue

  @type config :: [rabbitmq_config: keyword(), rabbitmq_conn_pool: keyword()]

  @spec start_link(config()) :: Supervisor.on_start()
  def start_link(config) do
    Supervisor.start_link(__MODULE__, config)
  end

  @impl true
  def init(config) do
    children = []
    opts = [strategy: :one_for_one]
    Supervisor.init(children, opts)
  end
end

If you are not familiar with poolboy, you can read a good introduction over at Elixir School. Continuing on, here is the pool configuration that we will use:

[
  rabbitmq_conn_pool: [
    name: {:local, :connection_pool},
    worker_module: ExRabbitPool.Worker.RabbitConnection,
    size: 2,
    max_overflow: 0
  ]
]

Note the worker_module attribute. It names a GenServer module whose instances will be managed as the pooled resource. In this case, RabbitConnection is the GenServer in charge of connecting to RabbitMQ.

We now extract the RabbitMQ and pool configuration from the start_link/1 parameters. AMQP provides helpful defaults for managing its connections so we only need to pass the RabbitMQ configuration to AMQP.

We do so, and configure poolboy to manage our connection pool:

rabbitmq_conn_pool = Keyword.get(config, :rabbitmq_conn_pool)
rabbitmq_config = Keyword.get(config, :rabbitmq_config, [])
{_, pool_id} = Keyword.fetch!(rabbitmq_conn_pool, :name)
:poolboy.child_spec(pool_id, rabbitmq_conn_pool, rabbitmq_config)
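As a quick check of what these lines extract, the following sketch runs the same Keyword calls against the sample pool configuration. It uses only the standard library. The host and port values are assumptions for illustration, and the :poolboy.child_spec/3 call is left out so the snippet runs without dependencies.

```elixir
config = [
  # Assumed connection settings, for illustration only.
  rabbitmq_config: [host: "localhost", port: 5672],
  rabbitmq_conn_pool: [
    name: {:local, :connection_pool},
    worker_module: ExRabbitPool.Worker.RabbitConnection,
    size: 2,
    max_overflow: 0
  ]
]

rabbitmq_conn_pool = Keyword.get(config, :rabbitmq_conn_pool)
rabbitmq_config = Keyword.get(config, :rabbitmq_config, [])

# The pool name is a {scope, id} tuple; only the id is kept as the pool id.
{_scope, pool_id} = Keyword.fetch!(rabbitmq_conn_pool, :name)

IO.inspect(pool_id, label: "pool_id")
IO.inspect(rabbitmq_config, label: "worker args")
```

The third argument to :poolboy.child_spec/3 is handed to the worker module's start_link/1, which is how the RabbitMQ settings reach each RabbitConnection.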

Let’s take a look at the full implementation:

defmodule ExRabbitPool.PoolSupervisor do

  use Supervisor

  alias ExRabbitPool.Worker.SetupQueue

  @type config :: [rabbitmq_config: keyword(), rabbitmq_conn_pool: keyword()]

  @spec start_link(config()) :: Supervisor.on_start()
  def start_link(config) do
    Supervisor.start_link(__MODULE__, config)
  end

  @impl true
  def init(config) do
    rabbitmq_conn_pool = Keyword.get(config, :rabbitmq_conn_pool)
    rabbitmq_config = Keyword.get(config, :rabbitmq_config, [])
    {_, pool_id} = Keyword.fetch!(rabbitmq_conn_pool, :name)
    children = [
      :poolboy.child_spec(pool_id, rabbitmq_conn_pool, rabbitmq_config)
    ]
    opts = [strategy: :one_for_one]
    Supervisor.init(children, opts)
  end
end

We continue by defining the worker_module responsible for handling the connection to RabbitMQ. The worker_module will hold the connection, a list of multiplexed channels to RabbitMQ, a corresponding list of monitors (we will explain their purpose later in this post), an adapter (so we can plug a stub implementation later on for testing purposes) and the configuration so we can configure some parts of our application dynamically.

Implementing the RabbitConnection

With these attributes in place, we create a State module with an associated struct to represent the internal state of our RabbitConnection GenServer.

defmodule ExRabbitPool.Worker.RabbitConnection do
  use GenServer

  # Logger macros are used by the channel and crash handlers further down
  require Logger

  defmodule State do
    @type config :: keyword()

    @enforce_keys [:config]
    @type t :: %__MODULE__{
            adapter: module(),
            connection: AMQP.Connection.t() | nil,
            channels: [AMQP.Channel.t()],
            monitors: [{reference(), AMQP.Channel.t()}],
            config: config()
          }

    defstruct adapter: ExRabbitPool.RabbitMQ,
              connection: nil,
              channels: [],
              config: nil,
              monitors: []
  end

  def start_link(config) do
    GenServer.start_link(__MODULE__, config, [])
  end

In the init/1 callback, we send ourselves an asynchronous :connect message so the connection to RabbitMQ is initialised separately without blocking the GenServer startup phase (supervisors create the child processes sequentially, and expect them to start very quickly). We also trap exits so all linked connections and multiplexed channels can be restarted by this worker when they crash. Take a look at our default adapter for the full implementation.

def init(config) do
  Process.flag(:trap_exit, true)
  send(self(), :connect)

  # split our options from the RabbitMQ client adapter
  {opts, amqp_config} = Keyword.split(config, [:adapter])
  adapter = Keyword.get(opts, :adapter, ExRabbitPool.RabbitMQ)

  {:ok, %State{adapter: adapter, config: amqp_config}}
end

Now we implement our first GenServer callback, handle_info/2, to handle the :connect message. Within the handler, we open the connection to RabbitMQ using the adapter.

If the connection attempt returns an error, we can either schedule another retry or stop the GenServer.

In order to retry, we attempt to reconnect to RabbitMQ asynchronously using Process.send_after(self(), :connect, 1000).

If the connection is established successfully, we create and start the required RabbitMQ channels, create another pool inside our worker for those channels, and link them to our process.

We need to link the channels to our process so that if a client closes a channel or a channel crashes, we can respond by creating another in order to keep the channel pool at the same size. We then store the channels in the RabbitConnection state for later reuse.

Connecting to RabbitMQ and opening the channels

  def handle_info(:connect, %{adapter: adapter, config: config} = state) do
    case adapter.open_connection(config) do
      {:error, _reason} ->
        # stay disconnected and retry in the background after one second
        Process.send_after(self(), :connect, 1000)
        {:noreply, state}

      {:ok, %{pid: pid} = connection} ->
        true = Process.link(pid)
        num_channels = Keyword.get(config, :channels, 1000)

        channels =
          do_times(num_channels, 0, fn ->
            {:ok, channel} = start_channel(adapter, connection)
            true = Process.link(channel.pid)

            channel
          end)

        {:noreply, %State{state | connection: connection, channels: channels}}
    end
  end

  @spec do_times(non_neg_integer(), non_neg_integer(), (() -> any())) :: [any()]
  defp do_times(limit, counter, _function) when counter >= limit, do: []

  defp do_times(limit, counter, function) do
    [function.() | do_times(limit, 1 + counter, function)]
  end
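The connect-and-retry flow above can be exercised without a RabbitMQ broker. The following is a minimal sketch, not the library's implementation: DeferredConnectSketch, connect_fun and retry_ms are hypothetical names, and connect_fun stands in for adapter.open_connection/1.

```elixir
defmodule DeferredConnectSketch do
  use GenServer

  # `connect_fun` stands in for adapter.open_connection/1 and `retry_ms` is
  # the delay between attempts. Both names are hypothetical.
  def start_link(connect_fun, retry_ms \\ 10) do
    GenServer.start_link(__MODULE__, {connect_fun, retry_ms})
  end

  def connection(pid), do: GenServer.call(pid, :connection)

  @impl true
  def init({connect_fun, retry_ms}) do
    # Return immediately; the connection attempt happens in handle_info/2.
    send(self(), :connect)
    {:ok, %{connect_fun: connect_fun, retry_ms: retry_ms, connection: nil}}
  end

  @impl true
  def handle_info(:connect, state) do
    case state.connect_fun.() do
      {:ok, connection} ->
        {:noreply, %{state | connection: connection}}

      {:error, _reason} ->
        # Stay disconnected and schedule another attempt.
        Process.send_after(self(), :connect, state.retry_ms)
        {:noreply, state}
    end
  end

  @impl true
  def handle_call(:connection, _from, state) do
    {:reply, state.connection, state}
  end
end

# A fake connector that fails twice before succeeding.
{:ok, attempts} = Agent.start_link(fn -> 0 end)

connect_fun = fn ->
  case Agent.get_and_update(attempts, fn n -> {n + 1, n + 1} end) do
    n when n < 3 -> {:error, :econnrefused}
    _ -> {:ok, :fake_connection}
  end
end

{:ok, worker} = DeferredConnectSketch.start_link(connect_fun)
# Give the worker time to run its retries.
Process.sleep(200)
IO.inspect(DeferredConnectSketch.connection(worker))
```

Because init/1 returns before any connection attempt is made, a supervisor starting this worker is never blocked by an unreachable broker.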

When creating the channels, we first need to ensure the connection process is alive, returning an error if it is not. We then open a channel using our client adapter and return its result.

  @spec start_channel(module(), AMQP.Connection.t()) :: {:ok, AMQP.Channel.t()} | {:error, any()}
  defp start_channel(client, connection) do
    if Process.alive?(connection.pid) do
      case client.open_channel(connection) do
        {:ok, _channel} = result ->
          Logger.info("[Rabbit] channel connected")
          result

        {:error, reason} = error ->
          Logger.error("[Rabbit] error starting channel reason: #{inspect(reason)}")
          error

        error ->
          Logger.error("[Rabbit] error starting channel reason: #{inspect(error)}")
          {:error, error}
      end
    else
      {:error, :closing}
    end
  end

Now we have a pool of connections to RabbitMQ, and each connection has a pool of channels that clients can check out and check in again. We haven’t implemented those features yet, so let’s do that now.

For the checkout_channel handler, we also need to handle some edge cases.

Firstly, the case where we are still unable to connect to RabbitMQ. In such a situation we need to tell the client to retry later with an {:error, :disconnected} result.

Secondly, the situation where there are no channels in the pool. This can happen when all channels are already checked out by other clients. Here we have a couple of options: either we use the async/reply pattern to block and wait a period of time for a new channel to become available, or we return {:error, :out_of_channels}, which is simpler and pushes the decision to the user, who can retry later or fail immediately.

With the edge cases covered, we can proceed with the actual checkout, which does the following: it monitors the client claiming the channel, so that if the client crashes we can return the claimed channel to the pool for another client to reuse; it replies with {:ok, channel}; and it saves the monitor reference together with the assigned channel in the RabbitConnection state for safe keeping.

def handle_call(:checkout_channel, _from, %State{connection: nil} = state) do
  {:reply, {:error, :disconnected}, state}
end

def handle_call(:checkout_channel, _from, %{channels: []} = state) do
  {:reply, {:error, :out_of_channels}, state}
end

def handle_call(
      :checkout_channel,
      {from_pid, _ref},
      %{channels: [channel | rest], monitors: monitors} = state
    ) do
  monitor_ref = Process.monitor(from_pid)

  {:reply, {:ok, channel},
    %State{state | channels: rest, monitors: [{monitor_ref, channel} | monitors]}}
end
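Since {:error, :out_of_channels} leaves the retry decision to the caller, client code might wrap the checkout in a small retry helper. The following is a sketch under stated assumptions: CheckoutRetrySketch and its parameters are hypothetical and not part of ex_rabbit_pool, and the checkout itself is passed in as a function so the example runs without a broker.

```elixir
defmodule CheckoutRetrySketch do
  # `checkout_fun` is any zero-arity function returning {:ok, channel} or
  # {:error, reason}, for example a closure around a channel checkout call.
  def checkout(checkout_fun, attempts \\ 3, delay_ms \\ 50)

  def checkout(_checkout_fun, 0, _delay_ms), do: {:error, :out_of_channels}

  def checkout(checkout_fun, attempts, delay_ms) when attempts > 0 do
    case checkout_fun.() do
      {:ok, _channel} = ok ->
        ok

      {:error, :out_of_channels} ->
        # Another client may check a channel in shortly; wait and retry.
        Process.sleep(delay_ms)
        checkout(checkout_fun, attempts - 1, delay_ms)

      {:error, _other} = error ->
        # For example :disconnected; let the caller decide what to do.
        error
    end
  end
end

# A fake checkout that is out of channels once, then succeeds.
{:ok, results} =
  Agent.start_link(fn -> [{:error, :out_of_channels}, {:ok, :fake_channel}] end)

fake_checkout = fn ->
  Agent.get_and_update(results, fn [next | rest] -> {next, rest} end)
end

retried = CheckoutRetrySketch.checkout(fake_checkout, 3, 1)
IO.inspect(retried)
```

Only :out_of_channels is retried here; other errors are returned immediately, because a disconnected worker is unlikely to recover within a few milliseconds.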

We now implement the functionality to return a channel to the pool. Doing so requires the following steps:

  • Remove the channel from the list of channels
  • Remove the monitor from the client holding the channel
  • Unlink the channel from our connection process
  • Delete the monitor from the RabbitConnection state list of monitored processes
  • Close the channel
  • Start a new channel

You may notice that we close the channel and create a new one every single time a channel is returned to the pool. This is because channels are stateful: a client can change a channel's state while using it. Replacing the old channel with a fresh one avoids the hard-to-diagnose errors we could see if channels were reused.

  @impl true
  def handle_cast(
        {:checkin_channel, %{pid: pid} = old_channel},
        %{connection: conn, adapter: adapter, channels: channels, monitors: monitors} = state
      ) do
    # only start a new channel when checking in a channel that hasn't been
    # removed yet; this can happen when a channel crashes or is closed while a
    # client holds it, so we get an `:EXIT` message and a `:checkin_channel`
    # message in no given order
    if find_channel(pid, channels, monitors) do
      new_channels = remove_channel(channels, pid)
      new_monitors = remove_monitor(monitors, pid)

      case replace_channel(old_channel, adapter, conn) do
        {:ok, channel} ->
          {:noreply, %State{state | channels: [channel | new_channels], monitors: new_monitors}}

        {:error, :closing} ->
          # RabbitMQ Connection is closed. nothing to do, wait for reconnection
          {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
      end
    else
      {:noreply, state}
    end
  end

  defp find_channel(channel_pid, channels, monitors) do
    Enum.find(channels, &(&1.pid == channel_pid)) ||
      Enum.find(monitors, fn {_ref, %{pid: pid}} ->
        pid == channel_pid
      end)
  end

  defp replace_channel(old_channel, adapter, conn) do
    true = Process.unlink(old_channel.pid)
    # omit the result
    adapter.close_channel(old_channel)

    case start_channel(adapter, conn) do
      {:ok, channel} = result ->
        true = Process.link(channel.pid)
        result

      {:error, _reason} = error ->
        error
    end
  end

We now have a reasonably complete connection worker, but we still need to implement error handling for crashing connections, channels crashing/closing and exceptions raised within the clients.

Implementing a handler for crashed connections

As we already have our worker process linked to the RabbitMQ connection process, we will receive a message corresponding to {:EXIT, pid, reason} if the connection process terminates. We pattern match to ensure the failing process pid is the same as the connection process pid, discard the connection and attempt to schedule reconnection in the background using Process.send_after/3.

def handle_info({:EXIT, pid, reason}, %{connection: %{pid: pid}, config: config} = state) do
  Process.send_after(self(), :connect, 1000)
  {:noreply, %State{state | connection: nil}}
end
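The mechanics of trapping exits can be seen in isolation with a placeholder process in place of the AMQP connection. This is only a sketch: TrapExitSketch is a hypothetical module, and it drops the reconnection step to keep the example short.

```elixir
defmodule TrapExitSketch do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)
  def connection(pid), do: GenServer.call(pid, :connection)

  @impl true
  def init(:ok) do
    # Without this flag, the linked process crashing would take us down too.
    Process.flag(:trap_exit, true)
    # A placeholder process standing in for the AMQP connection process.
    fake_conn = spawn_link(fn -> Process.sleep(:infinity) end)
    {:ok, %{connection: fake_conn}}
  end

  @impl true
  def handle_call(:connection, _from, state) do
    {:reply, state.connection, state}
  end

  # Reusing `pid` in both patterns means this clause only matches when the
  # exited process is the one stored as the connection.
  @impl true
  def handle_info({:EXIT, pid, _reason}, %{connection: pid} = state) do
    {:noreply, %{state | connection: nil}}
  end

  def handle_info({:EXIT, _other_pid, _reason}, state), do: {:noreply, state}
end

{:ok, worker} = TrapExitSketch.start_link()
fake_conn = TrapExitSketch.connection(worker)

# Simulate a connection crash, then give the worker time to process the exit.
Process.exit(fake_conn, :kill)
Process.sleep(50)

IO.inspect(TrapExitSketch.connection(worker))
IO.inspect(Process.alive?(worker))
```

The worker survives the crash of its linked process and records the lost connection as nil, which is the state the other handlers pattern match on.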

When the connection crashes, the channels linked to our worker go down with it, so we will also receive exit messages for each of those channels.

We must therefore handle channel exits in two situations: firstly, where the connection has already crashed and is now nil, and secondly, where the connection remains active but a channel crashed or was closed.

Handling a crashed channel

Let’s implement the first, where the connection already crashed. We pattern match on the nil connection, then we remove the crashed pid from the channels list and remove any monitor associated with that process identifier. Done.

def handle_info(
      {:EXIT, pid, reason},
      %{connection: nil, channels: channels, monitors: monitors} = state
    ) do
  Logger.error("[Rabbit] connection lost, removing channel reason: #{inspect(reason)}")
  new_channels = remove_channel(channels, pid)
  new_monitors = remove_monitor(monitors, pid)
  {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
end

defp remove_channel(channels, channel_pid) do
  Enum.filter(channels, fn %{pid: pid} ->
    channel_pid != pid
  end)
end

defp remove_monitor(monitors, channel_pid) when is_pid(channel_pid) do
  monitors
  |> Enum.find(fn {_ref, %{pid: pid}} ->
    channel_pid == pid
  end)
  |> case do
    # if nil means DOWN message already handled and monitor already removed
    nil ->
      monitors

    {ref, _} = returned ->
      true = Process.demonitor(ref)
      List.delete(monitors, returned)
  end
end

Handling the second case, where the connection remains open but a channel crashed, is the same as the previous case, with the additional requirement that we create another channel, link it to our worker, and add it to the pool.

  @impl true
  def handle_info(
        {:EXIT, pid, reason},
        %{channels: channels, connection: conn, adapter: adapter, monitors: monitors} = state
      ) do
    Logger.warn("[Rabbit] channel lost reason: #{inspect(reason)}")
    # don't start a new channel if the crashed channel doesn't belong to the
    # pool anymore; this can happen when a channel crashes or is closed while a
    # client holds it, so we get an `:EXIT` message and a `:checkin_channel`
    # message in no given order
    if find_channel(pid, channels, monitors) do
      new_channels = remove_channel(channels, pid)
      new_monitors = remove_monitor(monitors, pid)

      case start_channel(adapter, conn) do
        {:ok, channel} ->
          true = Process.link(channel.pid)
          {:noreply, %State{state | channels: [channel | new_channels], monitors: new_monitors}}

        {:error, :closing} ->
          # RabbitMQ Connection is closed. nothing to do, wait for reconnections
          {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
      end
    else
      {:noreply, state}
    end
  end

We have now covered connection and channel errors, crashes and closes, but we still need to implement the logic for when a monitored client crashes without returning its channel to the pool. In this case, we should remove the client from the monitors list and put a replacement for its channel back into the active channels list.

  @impl true
  def handle_info(
        {:DOWN, down_ref, :process, _, _},
        %{channels: channels, monitors: monitors, adapter: adapter, connection: conn} = state
      ) do
    monitors
    |> Enum.find(fn {ref, _chan} -> down_ref == ref end)
    |> case do
      nil ->
        {:noreply, state}

      {_ref, old_channel} = returned ->
        new_monitors = List.delete(monitors, returned)

        case replace_channel(old_channel, adapter, conn) do
          {:ok, channel} ->
            {:noreply, %State{state | channels: [channel | channels], monitors: new_monitors}}

          {:error, :closing} ->
            # RabbitMQ Connection is closed. nothing to do, wait for reconnection
            {:noreply, %State{state | channels: channels, monitors: new_monitors}}
        end
    end
  end
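To see the monitor mechanism on its own, here is a reduced pool that uses plain atoms as channels. It is a sketch, not the library code: MonitorSketch is a hypothetical module, and unlike the handler above it puts the same channel back instead of closing it and opening a replacement.

```elixir
defmodule MonitorSketch do
  use GenServer

  def start_link(channels), do: GenServer.start_link(__MODULE__, channels)
  def checkout(pid), do: GenServer.call(pid, :checkout)
  def available(pid), do: GenServer.call(pid, :available)

  @impl true
  def init(channels), do: {:ok, %{channels: channels, monitors: []}}

  @impl true
  def handle_call(:checkout, _from, %{channels: []} = state) do
    {:reply, {:error, :out_of_channels}, state}
  end

  def handle_call(:checkout, {client, _tag}, %{channels: [chan | rest]} = state) do
    # Remember which client holds which channel.
    ref = Process.monitor(client)
    {:reply, {:ok, chan}, %{state | channels: rest, monitors: [{ref, chan} | state.monitors]}}
  end

  def handle_call(:available, _from, state), do: {:reply, state.channels, state}

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    case List.keytake(state.monitors, ref, 0) do
      {{^ref, chan}, monitors} ->
        # Simplification: the channel is reused rather than replaced.
        {:noreply, %{state | channels: [chan | state.channels], monitors: monitors}}

      nil ->
        {:noreply, state}
    end
  end
end

{:ok, pool} = MonitorSketch.start_link([:chan_a])

# A client that checks out the only channel and exits without checking it in.
client = spawn(fn -> {:ok, :chan_a} = MonitorSketch.checkout(pool) end)
ref = Process.monitor(client)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

# Give the pool time to process its own :DOWN message.
Process.sleep(50)
IO.inspect(MonitorSketch.available(pool))
```

Without the monitor, the channel held by the exited client would be lost to the pool for good.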

Now that we have covered everything related to connection and channel handling we need to implement an API for our library.

This will require us to add some convenience functions for our GenServer and create an API layer to perform the work of getting a connection worker out of the connection pool and executing a function in the context of a channel for us.

Firstly, we define functions for checking channels in and out of the pool:

def checkout_channel(pid) do
  GenServer.call(pid, :checkout_channel)
end

def checkin_channel(pid, channel) do
  GenServer.cast(pid, {:checkin_channel, channel})
end

Having done so, we then proceed to create our library API, where we define functions for retrieving a connection worker from the pool and for executing a function in the context of a channel. When checking out a connection worker we don’t care about isolating access to each process: we use a pool purely in order to spread load (pool config strategy :fifo).

The supplied function will receive one of:

  • - A tuple containing :ok and a channel or
  • - An error tuple which the user can deal with as they please.

We also define the basic functions for checking in and out a channel manually for a connection worker.

defmodule ExRabbitPool do
  alias ExRabbitPool.Worker.RabbitConnection, as: Conn

  @type f :: ({:ok, AMQP.Channel.t()} | {:error, :disconnected | :out_of_channels} -> any())

  @spec get_connection_worker(atom()) :: pid()
  def get_connection_worker(pool_id) do
    conn_worker = :poolboy.checkout(pool_id)
    :ok = :poolboy.checkin(pool_id, conn_worker)
    conn_worker
  end

  @spec with_channel(atom(), f()) :: any()
  def with_channel(pool_id, fun) do
    pool_id
    |> get_connection_worker()
    |> do_with_conn(fun)
  end

  def checkout_channel(conn_worker) do
    Conn.checkout_channel(conn_worker)
  end

  def checkin_channel(conn_worker, channel) do
    Conn.checkin_channel(conn_worker, channel)
  end

  defp do_with_conn(conn_worker, fun) do
    case checkout_channel(conn_worker) do
      {:ok, channel} = ok_chan ->
        try do
          fun.(ok_chan)
        after
          :ok = checkin_channel(conn_worker, channel)
        end

      {:error, _} = error ->
        fun.(error)
    end
  end
end
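The try/after block in do_with_conn/2 is what guarantees the channel is checked in even when the supplied function raises. The following sketch shows that behaviour with an Agent standing in for the connection worker. WithChannelSketch and the atom "channels" are hypothetical stand-ins, not the library's types.

```elixir
defmodule WithChannelSketch do
  # `pool` is an Agent holding a list of free placeholder channels.
  def with_channel(pool, fun) do
    case checkout(pool) do
      {:ok, channel} = ok ->
        try do
          fun.(ok)
        after
          # Runs whether fun returns normally or raises.
          checkin(pool, channel)
        end

      {:error, _reason} = error ->
        fun.(error)
    end
  end

  defp checkout(pool) do
    Agent.get_and_update(pool, fn
      [] -> {{:error, :out_of_channels}, []}
      [chan | rest] -> {{:ok, chan}, rest}
    end)
  end

  defp checkin(pool, channel), do: Agent.update(pool, &[channel | &1])
end

{:ok, pool} = Agent.start_link(fn -> [:chan_a] end)

result =
  try do
    WithChannelSketch.with_channel(pool, fn {:ok, _chan} -> raise "boom" end)
  rescue
    e in RuntimeError -> {:raised, e.message}
  end

IO.inspect(result)
# The channel is back in the pool despite the exception.
IO.inspect(Agent.get(pool, & &1))
```

Callers that want to handle the error case can pass a function with one clause for {:ok, channel} and another for {:error, reason}.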

With this code implemented, we now want to quickly verify it in IEx, the Elixir interactive console, but before we do so, we’ll need access to a running RabbitMQ instance.

Docker makes this trivial to do.

Let’s work our way through the required steps.

First, we pull the RabbitMQ Docker image from the Docker hub:

docker pull rabbitmq:3.7.7-management

Then we run the RabbitMQ image in another terminal in the foreground, mapping both its web management interface on port 15672 and its message port 5672, to the host loopback interface.

docker run --rm --hostname bugs-bunny --name roger_rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.7.7-management

After waiting for RabbitMQ initialization to complete, we proceed to copy the following code into the Elixir console in order to verify that everything works as expected:

First the configuration:

rabbitmq_config = [channels: 1]

rabbitmq_conn_pool = [
  name: {:local, :connection_pool},
  worker_module: ExRabbitPool.Worker.RabbitConnection,
  size: 1,
  max_overflow: 0
]

Then we create an instance of the PoolSupervisor:

{:ok, pid} =
  ExRabbitPool.PoolSupervisor.start_link(
    rabbitmq_config: rabbitmq_config,
    rabbitmq_conn_pool: rabbitmq_conn_pool
  )

And finally, we verify everything is working by publishing a message to the AMQP queue “ex_rabbit_pool” and then fetching it back via the same channel we published it on.

ExRabbitPool.with_channel(:connection_pool, fn {:ok, channel} ->
  queue = "ex_rabbit_pool"
  exchange = "my_exchange"
  routing_key = "example"
  {:ok, _} = AMQP.Queue.declare(channel, queue, auto_delete: true, exclusive: true)
  :ok = AMQP.Exchange.declare(channel, exchange, :direct, auto_delete: true, exclusive: true)
  :ok = AMQP.Queue.bind(channel, queue, exchange, routing_key: routing_key)
  :ok = AMQP.Basic.publish(channel, exchange, routing_key, "Hello World!")
  {:ok, msg, _} = AMQP.Basic.get(channel, queue, no_ack: true)
  IO.puts(msg)
end)

The message “Hello World!” is printed to the console. Tada! The library works.

If you want to see an example of a real-life consumer, take a look at the implementation of the Buildex Jobs Consumer.

Future work and improvements

There are some nice enhancements we would like to make, and we would be very happy to accept quality contributions. All feedback is welcome. This would be a really nice Elixir project for your first open source contribution!

  • Moving the monitors logic to ETS
  • Implementing backoff algorithms for handling reconnections
  • Overflow support for the channels pool
  • Support for async/reply channel checkout
  • A circuit breaker to fail fast after some number of failed attempts to connect to RabbitMQ
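As an illustration of the backoff item, one possible starting point is a capped exponential delay that could replace the fixed 1000 ms passed to Process.send_after/3. This is only a sketch of one common approach: BackoffSketch, its parameters and its default values are assumptions, not part of the library.

```elixir
defmodule BackoffSketch do
  # Returns the delay in milliseconds before reconnection attempt number
  # `attempt` (starting at 0): base_ms doubled per attempt, capped at max_ms.
  def delay(attempt, base_ms \\ 1_000, max_ms \\ 30_000)
      when is_integer(attempt) and attempt >= 0 do
    # Cap the exponent so the shift stays small even after many failures.
    exponent = min(attempt, 16)
    min(base_ms * Bitwise.bsl(1, exponent), max_ms)
  end
end

delays = Enum.map(0..5, &BackoffSketch.delay/1)
IO.inspect(delays)
```

A worker using this would need to keep an attempt counter in its state and reset it once a connection succeeds. Adding random jitter to each delay is a common refinement when many workers reconnect at once.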

Feel free to explore the source code of this blog post here and give it a try!

Credits

Special thanks to Bryan Hunt for his help editing this blog and reviewing all the code that was produced while constructing this library. Special thanks also go to our RabbitMQ experts for reviewing our code and ensuring we followed best practices when handling connections and channels inside our library.
