Erlang

Receiving messages in Elixir, or a few things you need to know in order to avoid performance issues

by Oleg Tarasenko

We’re kicking off #ElixirOverload with Oleg Tarasenko’s post on receiving messages in Elixir! What can you do to avoid common mistakes that plague developers?

Here, Oleg makes a performance comparison of message processing depending on the process mailbox size, amongst other fantastic insights.

This is just the beginning. We are dedicating one week to the brilliant Elixir community with themed content all about Elixir. We have four fresh blog posts with Oleg’s here whetting the appetite.

Why have we gone all Elixir?

As if there needs to be an excuse to talk Erlang OR Elixir, but in this case, we are offering new Elixir Architecture Sessions at Erlang Solutions.

In addition, we are present and counting at ElixirConf US this week with one of our senior developers Claudio Ortolina there in the thick of it listening to fantastic talks over the four days. You can also request Empirical Monkeys training with friend of Erlang Solutions, Rafal Studnicki.

If you have any questions for Oleg…and later on for Joe and Bart, you can contact us at general@erlang-solutions.com. Keep up-to-date with all things #ElixirOverload and beyond through our Erlang Solutions Twitter, including Claudio’s #TwitterTakeover at ElixirConf 2018.

As you know Elixir programs use processes to run pretty much everything, and in order to communicate between processes, Elixir uses message passing. In this blog post we cover scenarios that could result in degraded messaging performance, which in turn can ruin your overall application performance. Sounds interesting? Of course it does! Find out more below…

Sending and receiving messages

Messages are sent using the send/2 function and received using the receive do construct.

In practice, the simplest way to observe the behaviour of sending and receiving messages is to open up an Elixir shell and execute the following:

iex(1)> send(self(), :test_message)
:test_message

The example code shown above will send a :test_message atom to the mailbox of the current shell process.

Let’s send several other atoms to ourselves and see what happens when we start reading them back from our process mailbox:

iex(2)> send(self(), :test_message1)
:test_message1
iex(3)> send(self(), :test_message2)
:test_message2
iex(4)> send(self(), :test_message3)
:test_message3
iex(5)> :erlang.process_info(self(), :message_queue_len)
{:message_queue_len, 4}

As we can see from the snippet above, every time we send a message to a particular process, it’s stored in that process’ mailbox. We now have 4 messages; lets fetch them from the mailbox using the receive construct!

iex(8)> receive do msg -> IO.puts("Received message: #{inspect(msg)}") end
Received message: :test_message
:ok
iex(9)> receive do msg -> IO.puts("Received message: #{inspect(msg)}") end
Received message: :test_message1
:ok
iex(10)> receive do msg -> IO.puts("Received message: #{inspect(msg)}") end
Received message: :test_message2
:ok
iex(11)> receive do msg -> IO.puts("Received message: #{inspect(msg)}") end
Received message: :test_message3
:ok
iex(12)> receive do msg -> IO.puts("Received message: #{inspect(msg)}") end

As you can see, messages are received in the same order they were transmitted. You can also see that the last receive blocks the shell, which is left waiting to fetch the next message from the process mailbox.

A closer look at the receive block

Elixir’s receive macro is used in the following way:

receive do
  pattern1 -> :process_message_pattern1
  pattern2 -> :process_message_pattern2
  _  -> :process_catch_all_case
after
 1000 -> :ok
end

This code takes the first message from the mailbox and will then try to match it against all the patterns defined in the receive block. If the first message can’t match both pattern1 and pattern2, it will be matched by the catch all () case, and :process_catch_all_case will be returned in this case.

Finally, if the process’ mailbox is empty, the code will block new messages to arrive and continue the execution after the timeout interval (1000 milliseconds) expires.

This process can be visualised in the following diagram:

Receiving messages with “a priority”

Let’s now look at another example of the receive construct:

receive do
  pattern1 -> :process_message_pattern1
  pattern2 -> :process_message_pattern2
after
 timeout -> :ok
end

Despite the visual similarity, there is a tiny difference that will make the code act in a completely different way. With the removal of the catch all case, the following will happen:

  1. The first message will be taken from the mailbox
  2. The code will try to match the message against pattern1 and pattern2
  3. If one option is a match, the appropriate branch will be evaluated
  4. If neither pattern matches, the code will perform steps 1-2 for the entire mailbox, placing all processed but unmatched messages into a temporary space (see more details about the save queue in the following article).

The above algorithm can be visualised in the following diagram:

This approach can be used to implement priority-based message processing. Let’s consider the following example:

defmodule MailBoxPriorityExample do
  def run() do
    pid = spawn(&recv/0)
    send(pid, :message1)
    send(pid, :message2)
    send(pid, :message3)
    :ok
  end

  def recv() do
    receive do
      :message3 -> IO.puts("Got message 3")
    end

    receive do
      :message2 -> IO.puts("Got message 2")
    end

    receive do
      :message1 -> IO.puts("Got message 1")
    end

    :ok
  end
end

The above code will process mailbox messages in the reverse order:

iex(1)> MailBoxPriorityExample.run
Got message 3
Got message 2
Got message 1
:ok

This prioritised receive is also known as a selective receive.

The cost of using a selective receives

As you can see from diagram 2, the selective receive will scan the entire process mailbox in order to dequeue a matching message. This is not a huge issue when your processes are not under heavy load. However, as soon as other parts of your subsystem are actively sending messages to your process, it can quickly become a bottleneck.

Let’s build an example that illustrates the dependency between the mailbox queue length and performance of the selective receive.

In order to prepare this experiment we have created the following code snippet:

defmodule SelectiveReceive do
  def setup(num_messages, num_stale_messages) do
    stats_pid = spawn(fn -> stats(num_messages) end)
    recv_pid = spawn(fn -> recv(stats_pid) end)

    # Fill recv process with unprocessable messages
    Enum.map(1..num_stale_messages, fn _ -> send(recv_pid, :unexpected) end)

    # Send regular messages to recv process
    Enum.each(1..num_messages, fn _ -> send(recv_pid, :expected) end)
  end

  # Receive :message 
  def recv(stats_pid) do
    receive do
      :expected -> send(stats_pid, :ok)
    end

    recv(stats_pid)
  end


  # Receive messages from receiver, count total time.
  def stats(num) do
    ts1 = Time.utc_now()

    Enum.each(1..num, fn _ ->
      receive do
        _ -> :ok
      end
    end)

    ts2 = Time.utc_now()
    diff = Time.diff(ts2, ts1, :millisecond)
    rps = Float.ceil(num / diff * 1000, 2)

    IO.puts("Throughput is: #{rps} requests per second")
  end
end

The chart below shows a correlation between the size of the mailbox and the decreased throughput of the selective receive.

Please note that the numbers can vary from machine to machine. It’s also worth mentioning that the example itself is somewhat synthetic, as the processes were just sending messages without performing any processing (a somewhat unlike real-world scenario).

Ok, but what about the Real World?

As previously discussed the above example is not representative of real-world scenarios due to its synthetic nature so what about the real-world?

The following steps would represent a more typical real life scenario: You have one process which normally handles X requests per second; You have a short spike of incoming messages (due to some unexpected external factor) The queue size grows and now your process is 40% slower The queue size continues to grow, despite the fact that the spike of activity has already finished and causes a further slowdown of the process.

Inside OTP?

You may think. “Ok, it doesn’t look like I would want to use the selective receive. Why would I want it?”.

In reality people are using selective receive for to numerous reasons. Here are some examples of selective receives usage inside OTP:

https://github.com/erlang/otp/blob/master/lib/mnesia/src/mnesia_locker.erl#L133-L144 https://github.com/erlang/otp/blob/master/lib/mnesia/src/mnesia_locker.erl#L770-L800 https://github.com/erlang/otp/blob/master/lib/mnesia/src/mnesia_log.erl#L642-L644 https://github.com/erlang/otp/blob/master/lib/mnesia/src/mnesia_checkpoint.erl#L800-L854 https://github.com/erlang/otp/blob/master/lib/sasl/src/release_handler.erl#L1659-L1664 https://github.com/erlang/otp/blob/master/lib/ssl/src/inet_tls_dist.erl#L253-L260 https://github.com/erlang/otp/blob/master/lib/ssl/src/inet_tls_dist.erl#L409-L424

Conclusion

Selective receive is an interesting functionality that comes built into Erlang/Elixir. As with every other tool it has both strengths and weaknesses. Selective receive provides some advantages when working with relatively small message boxes (namely prioritised message processing), however, using selective receive without being aware of the potential costs can put your overall application stability at risk.

It doesn’t have to end here…

We’ve got loads of Elixir content, especially this week with #ElixirOverload, so if you’ve enjoyed this article, chances are you’ll love these:

Keep your eyes peeled for more Elixir based content in the near future on our blog and our social media.

References Elixir Processes from elixir-lang.org: https://elixir-lang.org/getting-started/processes.html

Go back to the blog

×

Thank you for your message

We sent you a confirmation email to let you know we received it. One of our colleagues will get in touch shortly.
Have a nice day!