Rabbit’s Anatomy - Understanding Topic Exchanges

Topic Exchange Intro/Overview

In RabbitMQ (and AMQP in general), an exchange is the routing entity to which external applications publish messages. There are four exchange types, one of which is the TOPIC exchange. Amongst these four, the TOPIC exchange offers the most flexible routing mechanism. Queues are bound to a TOPIC exchange with a Binding Key, a pattern used to make routing decisions: a message is routed to a queue when the message’s Routing Key matches the queue’s Binding Key pattern.

A Routing Key is made up of words separated by dots, e.g. floor_1.bedroom.temperature. The Binding Key for a Topic Exchange looks similar to a Routing Key, but it may additionally contain two special characters, the asterisk * and the hash #. These are wildcards that allow us to create bindings in a smarter way: an asterisk * matches exactly one word and a hash # matches zero or more words. Here are example patterns matching messages from:

  • - all devices on the first floor in the bedroom: floor_1.bedroom.*,
  • - all devices on the first floor: floor_1.#.
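
For example, with the Elixir AMQP client (any AMQP client will do; the exchange and queue names below are made up for illustration), such bindings could be declared like this:

{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)

# a topic exchange and a queue that should receive only a subset of the messages
:ok = AMQP.Exchange.declare(channel, "iot_events", :topic)
{:ok, _} = AMQP.Queue.declare(channel, "bedroom_devices")

# "*" matches exactly one word, "#" matches zero or more words
:ok = AMQP.Queue.bind(channel, "bedroom_devices", "iot_events", routing_key: "floor_1.bedroom.*")
:ok = AMQP.Queue.bind(channel, "bedroom_devices", "iot_events", routing_key: "floor_1.#")

# this message matches both Binding Keys above
:ok = AMQP.Basic.publish(channel, "iot_events", "floor_1.bedroom.temperature", "21.5")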

It’s clear that using a Topic Exchange allows for much simpler and more specific Routing. Now let’s take a look at how a Topic Exchange actually works.

Trie

Understanding the trie data structure is key to understanding how a Topic Exchange works. A trie is a tree that holds ordered data and is typically used for storing strings. Each node in the tree represents a prefix of a string and holds links to child nodes which share that prefix. The child nodes are addressed by the character following the parent’s prefix.

This data structure allows us to search for a specific string in time proportional to the string’s length, independent of the number of stored entries. The characters of the string are used to traverse the tree during the search. In a Topic Exchange, the trie is used for storing Binding Keys: a Binding Key is split on dots and the resulting words are used to select the successive child nodes. Every time a new binding is added to a Topic Exchange, the trie associated with it is updated, and each time a new message has to be routed, the trie is queried to find the message’s destinations.
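
As a mental model (this is not how RabbitMQ stores it - the actual Mnesia-backed layout is described in the next section), the trie for the Binding Keys floor_1.bedroom.temperature and floor_1.# can be pictured as nested maps keyed by words, for example in Elixir:

trie = %{
  "floor_1" => %{
    "bedroom" => %{"temperature" => %{}},
    "#" => %{}
  }
}

# following a Binding Key is just walking the nested maps word by word
get_in(trie, String.split("floor_1.bedroom.temperature", "."))
#=> %{}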

Implementation of the trie

A Topic Exchange trie is implemented on top of Mnesia, Erlang’s built-in distributed database. The nodes and edges of a trie are stored in the rabbit_topic_trie_node and rabbit_topic_trie_edge tables respectively.

#topic_trie_node{
    trie_node = #trie_node{
        exchange_name,
        node_id
    },
    edge_count,
    binding_count
}
#topic_trie_edge{
    trie_edge = #trie_edge{
        exchange_name,
        node_id, % parent
        word
    },
    node_id % child
}

The trie_node and trie_edge records are the primary keys used to identify records. Both nodes and edges are assigned to one particular Topic Exchange via the exchange_name field in the primary key. Nodes are also used to identify bindings; bindings are not stored in the nodes table directly, so they have to be looked up by node_id in the rabbit_topic_trie_binding table. Edges store information about the connections between parent and child nodes, together with the part of the Binding Key (word) that is used to traverse the tree. Consequently, traversing the tree requires a sequence of Mnesia queries.

Topic Exchange internals

An Exchange type is created by implementing the rabbit_exchange_type behaviour. In the context of tries in a Topic Exchange, there are two interesting operations: add_binding/3 and route/2. The former adds a new binding to the structure and the latter determines the routing targets for a message.

Binding Operation

The arguments needed to create a binding are:

  • - source Exchange
  • - Binding Key
  • - destination

Every trie starts with the root node, which represents the empty Binding Key. This makes sense, as an empty string is a prefix of any string. The first operation is pretty straightforward: the Binding Key is split on dots . and stored in a list. For example, the key “a.b.c” is transformed into [“a”, “b”, “c”]. Let’s call this list Words; it will be used for traversing the data structure. Then the tree is traversed down recursively, starting with the root as the current node.

  1. Repeat until the Words list is empty:
     1.1. Take the head of the Words list and query Mnesia for a matching child of the current node.
     1.2. If a node is found, make it the new current node and go back to 1.1 with the rest of the Words list. Otherwise go to 2.
  2. Create child nodes for the remaining part of the Words list.
  3. When the Words list is exhausted, create a rabbit_topic_trie_binding for the current node. It signals that there are bindings associated with it (a compact sketch of these steps follows below).
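
A compact sketch of these three steps over the nested-map model from above (purely illustrative - the real implementation performs the equivalent walk with Mnesia reads and writes):

defmodule TrieSketch do
  # Steps 1-2: walk down the trie, creating missing child nodes along the way.
  # Step 3: record the destination at the node where the Words list ends.
  def add_binding(trie, binding_key, destination) do
    do_add(trie, String.split(binding_key, "."), destination)
  end

  defp do_add(node, [], destination) do
    Map.update(node, :bindings, [destination], &[destination | &1])
  end

  defp do_add(node, [word | rest], destination) do
    child = Map.get(node, word, %{})
    Map.put(node, word, do_add(child, rest, destination))
  end
end

%{}
|> TrieSketch.add_binding("floor_1.bedroom.temperature", "queue_a")
|> TrieSketch.add_binding("floor_1.#", "queue_b")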

Here is an example binding operation. Let’s assume there is a Topic Exchange with two existing bindings, floor_1.bedroom.temperature and floor_1.#. Our example trie structure looks like this:

Let’s add a new binding with the Binding Key floor_1.bedroom.air_quality. First we split it on dots: [floor_1, bedroom, air_quality]. Nodes for floor_1 and bedroom already exist, but there is none for air_quality, so the rest of the key, [air_quality], is used to create new nodes. Finally, a new binding is associated with the newly created node and we have a structure that looks like this:

As you can see, to insert the new node, three read queries were executed to retrieve the edges between consecutive pairs of nodes: {root, floor_1}, {floor_1, bedroom} and {bedroom, air_quality}. The last edge was not present, so two write operations were executed: the first updates the edge_count of the bedroom node and the second inserts the new edge. At this point the trie structure is ready and the final node can be created, so another two write operations need to be completed:

  • - One to create the actual node, which corresponds to the given Binding Key,
  • - The second to create the entry in rabbit_topic_trie_binding, which binds the node to the destination for messages.

The Mnesia tables used here are of the ordered_set type, which means they are implemented with a binary search tree. Thus, both read and write operations have complexity O(log(n)), where n is the size of the table. It can be observed that the first phase of traversing the trie requires:

  1. a read operation when a node exists,
  2. a write operation when a node does not exist.

The final phase of inserting the actual binding requires two extra operations. In the worst case, where no nodes exist yet and all of them need to be created, the complexity is O(n*2*log(m) + 2*log(k)), where n is the length of the Binding Key, m is the number of nodes in the table and k is the number of actual bindings. Both m and k are global, so the efficiency of the queries depends on the total number of bindings/nodes, not just on those of the given Exchange. For simplicity it is assumed that the numbers of edges and nodes are equal, because in this structure (number of edges) = (number of nodes - 1).

Routing Operation

Routing happens when a new message arrives at a Topic Exchange. The trie structure has to be queried with the Routing Key associated with the message. However, traversing the trie is not straightforward, as the wildcards * and # need to be taken into account.

As with the binding operation, the Routing Key is first split on dots. Again, let’s call the result the Words list, [Word | RestW] = Words. The process starts with the root node. The algorithm then recursively explores the tree in three ways:

  • - Look for a # child node. If one is found, it is treated as a new root and new scans are started with every remaining suffix of Words, e.g. if Words is [a,b,c], searching restarts with [a,b,c], [b,c], [c] and [].
  • - Look for a * child node. If one is found, continue with the found node as the root and RestW.
  • - Look for a Word child node. If one is found, continue with the found node as the root and RestW.

Exploration of a path finishes when its Words list is exhausted. The last step is then to look for extra child nodes connected through a hash # edge; this has to be done because the # wildcard stands for “zero or more” words. Here is an example of the search algorithm. Let the Topic Exchange have the following bindings: floor_1.*.air_quality, floor_1.bedroom.air_quality and floor_1.bathroom.temperature. Now, let’s examine the routing of a message published with the Routing Key floor_1.bedroom.air_quality, which matches the first two bindings. Here is the trie representation, where the current node is marked in blue and the number on a node represents its number of bindings.

The first step is to find out if there is a hash # child node; in the example above there is none. Then the asterisk * child node is queried, but it is not present either. Finally, the algorithm finds a node matching the head of the Words list, floor_1:

Now, the algorithm considers the blue node as a new root and starts again. Again, there is no hash # child, but an asterisk is found. The head of the Words list is consumed and the algorithm moves down:

Here there is only one option available - air_quality:

The Words list is exhausted, so the current node is a result. There is one extra step: the hash # child node has to be queried again, because it also accepts the empty list. It is not found, so only the current blue node is considered a result. Let’s mark the found node in green and get back to the previous node:

The node was found using an asterisk, but there is one step left: checking whether there is a bedroom child node. In this case there is one:

There is one word left and the child node is present:

The Words list is empty again, so the current node is a result:

The final step is to query for any bindings associated with the nodes we found. According to the numbers on the found nodes, there are two bindings, and they are the final result of the route/2 function.
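
To make the traversal concrete, here is a rough sketch of these matching rules over the nested-map model used earlier (again, only an illustration - the real route/2 performs the same walk with Mnesia lookups and per-node binding counts):

defmodule TopicMatchSketch do
  # Returns the trie nodes reached with an exhausted Words list.
  # Nodes can appear more than once, exactly as in the algorithm above.
  def match(node, []) do
    # "#" also matches zero words, so a trailing hash child counts as a result too
    case Map.get(node, "#") do
      nil -> [node]
      hash_child -> [node | match(hash_child, [])]
    end
  end

  def match(node, [word | rest] = words) do
    hash_matches =
      case Map.get(node, "#") do
        nil ->
          []

        hash_child ->
          # restart the search below "#" with every remaining suffix of Words
          Enum.flat_map(suffixes(words), &match(hash_child, &1))
      end

    star_matches =
      case Map.get(node, "*") do
        nil -> []
        star_child -> match(star_child, rest)
      end

    word_matches =
      case Map.get(node, word) do
        nil -> []
        child -> match(child, rest)
      end

    hash_matches ++ star_matches ++ word_matches
  end

  defp suffixes([]), do: [[]]
  defp suffixes([_ | rest] = words), do: [words | suffixes(rest)]
end

Combined with the TrieSketch module from the Binding Operation section, the example above can be reproduced like this:

trie =
  %{}
  |> TrieSketch.add_binding("floor_1.*.air_quality", "q1")
  |> TrieSketch.add_binding("floor_1.bedroom.air_quality", "q2")
  |> TrieSketch.add_binding("floor_1.bathroom.temperature", "q3")

TopicMatchSketch.match(trie, String.split("floor_1.bedroom.air_quality", "."))
|> Enum.flat_map(&Map.get(&1, :bindings, []))
#=> ["q1", "q2"]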

Now, let’s consider another example, with hash bindings present. There are three bindings: #.air_quality, floor_1.# and floor_1.bedroom.air_quality.#. Again, the floor_1.bedroom.air_quality Routing Key will be used:

In this example we have found a hash node. This will cause the algorithm to go down to that node with all available Routing Key parts:

Let’s emphasise this again: the current node was reached via a # edge, so the algorithm visits the current blue node 4 times with different Words lists, as presented in the figure. One of the Words lists is the empty one, [], so this node is also appended to the results. There is no floor_1 or bedroom edge going out of this node, but there is an air_quality one, so the algorithm goes to the leaf node using the third Words list. Then:

The current Words list is empty, so this node is also a result of the search. There are no hash child nodes, so the current branch is finished. The algorithm will go back to the root node:

The only option for the algorithm to go down is the head of the Words list:

Again there is a hash child node, so it needs to be visited with all tails of the Words list. Three times in this case:

One of the Words lists is empty, so the current blue node is appended to the result list. As there are no child nodes, the algorithm goes back:

Now the algorithm will go down two times consuming all remaining words. Let’s jump directly to it:

The Words list is empty, so the current node is also part of the result. However there is also a hash # child node. According to the algorithm, if any node is considered as a result, its child hash nodes are matched. So finally there are 5 nodes found:

The final step is to find the bindings associated with the found nodes. The final result of the route/2 function is all 3 bindings.

In terms of complexity, this is hard to estimate precisely. The algorithm performs 3 queries for each visited node, and # nodes duplicate the query paths, as the whole algorithm is restarted with every remaining suffix of Words. However, all operations depend on two factors: the length of the Words list and the total number of nodes in the table.

So assuming the worst case, where the bindings are #, #.#, #.#.# and so on up to k hashes, each level of the trie is visited with all possible suffixes of Words, and some nodes are visited many times with exactly the same Words. The first node is visited n times, the second sum(1,n) times, the third sum(1,sum(1,n)) times, and so on.

The total number of operations is k_1 + k_2 + … + k_k, where k_i denotes the number of visits at level i. When this recursive equation is unwrapped, every new level contains twice as many multiplications as the previous one, so level k contains 2^k of them. This term dominates, so we can bound the complexity by O(2^k * n * log(m)), where k is the maximum trie depth, n is the length of Words and m is the total number of nodes.

However, the above example is extreme, and bindings like #.#.# make no sense. The average complexity is therefore closer to O(n*log(m)), because there is no reason to put two consecutive hash # parts in a key. The overhead introduced by a single hash node should not be significant, because in that case traversing the trie with the different Words lists stops right after the hash edge.

Evaluation

This section will cover the performance of a Topic Exchange. The experiments will demonstrate the characteristics of a Topic Exchange under different conditions.

Synthetic tests

Two experiments will be presented, in order to experimentally confirm or reject the assumptions made in the previous Routing Operation section:

  • First, the relation between the Routing Key length and the routing operation time: the number of bindings is fixed and the Routing Key length is varied. A linear dependency is expected.
  • Secondly, the Routing Key length is fixed and the number of bindings is varied. A logarithmic dependency is expected.

Both experiments are performed under the following conditions:

  • - Tests are made on a single RabbitMQ node.
  • - Routing time is measured as the evaluation time of rabbit_exchange_type_topic:route/2.
  • - Measurements are repeated 50 times and the averaged results are presented in the figures.
  • - The Binding Keys are random, ensuring that no Binding Key contains two consecutive hashes.
  • - Each Binding Key part has a 70% chance of being a word, a 20% chance of being an asterisk and a 10% chance of being a hash.
  • - The Routing Keys are created from existing Binding Keys. For example, a Routing Key of length n is created from an existing Binding Key of length n, with any hashes or asterisks replaced by random strings. This ensures that the operation must traverse at least n levels of the trie.

The above figure presents the results of the three experiments. Now, let’s slightly modify the conditions to visualize the impact of a hash # key on the trie structure: a single binding consisting of just two consecutive hashes, #.#, is added. The performance then looks like this:

The red curve bends, as we expected. When there are # bindings on the query path, the relation between the Routing Key length and the query time is no longer linear. This effect can also be observed in the 10k bindings series: the green curve also bends slightly. The explanation is the same - the more Binding Keys start with a #, the longer all queries take.

Let’s check it in the RabbitMQ Management UI:

Here we have roughly 50 bindings like the ones above. If we replace them, we see a more linear relation again and get a better overview of the performance impact of the hash # wildcard, as seen below:

Again, the time to find the relevant routes has improved. Now, let’s examine how the number of bindings impacts the query time. As explained in the previous section, a logarithmic relation is expected:

This example also follows the expected behaviour. All the bindings are stored in a single Mnesia table, and every node lookup has its own cost: the more entries there are in the table, the longer a query takes. As the table is of the ordered_set type, the query time grows logarithmically, which is indeed what is observed.

Summing up, the experiments align with the theory we started with. The expectations about the impact of the Routing Key length and the number of bindings on the routing operation time were confirmed, and the large impact of the hash # wildcard was both confirmed and quantified.

Real world example

The two previous examples measured the time of a single query. While this is valuable, it does not necessarily reflect a real-world use case. The tests were synthetic and focused on a single query - but is the Topic Exchange overhead also observable when the overall performance of RabbitMQ is taken into account?

This section presents a performance evaluation of the RabbitMQ integration in MongooseIM, Erlang Solutions’ highly scalable instant messaging server. The RabbitMQ component in MongooseIM simply reports each user’s activity, which may be:

  • - User became online/offline
  • - User sent/received message

Only sent/received messages are discussed in this case. The Routing Key of a message activity event follows the simple pattern <username>.{sent, received}.

In order to evaluate the performance of the component, a load test was designed. Simulated XMPP clients connected to the MongooseIM server and exchanged messages with each other. Each message generated events, which were published to RabbitMQ. A number of AMQP clients then connected to RabbitMQ to consume the generated events.

This is the outline of the experiment’s architecture:

For the purpose of this post, only the results directly connected to Topic Exchange performance are covered. Let’s define the key performance indicator as the Time To Delivery: the amount of time between a message being sent by an XMPP user and it being received by the consumer of the RabbitMQ queue. This value is presented in the figures that follow.

The test conditions were as follows:

  • - 90k XMPP users
  • - 10k AMQP consumers
  • - ~4.8k payloads/s from MongooseIM to RabbitMQ
  • - Payload size ~120B
  • - topic exchange with Binding Keys like user_N.*
  • - 20k bindings in total

In this case, the performance is presented in the following graph. It shows the 95th - 99th percentiles of the Time To Delivery, as well as the maximum observed Time To Delivery in a given time window.

The second test had similar conditions; the only difference was the Exchange type:

  • - 90k XMPP users
  • - 10k AMQP consumers
  • - ~4.8k payloads/s from MongooseIM to RabbitMQ
  • - Payload size ~120B
  • - direct exchange with Binding Keys like user_N.chat_msg_sent, user_N.chat_msg_recv
  • - 40k bindings in total

Under those conditions the performance was better, as illustrated in the following figure.

While the previous section showed the performance characteristics of a Topic Exchange in isolation, these examples provide an overview at a bigger scale. Both tests had identical characteristics apart from the Exchange type being direct or topic (and, consequently, the number of bindings). The difference in performance is significant, in favour of the Direct Exchange: it allowed us to effectively decrease the Time To Delivery, which was the measure of efficiency in the presented tests.

Summary

You have now seen the basics of the Topic Exchange internals, a brief overview of its implementation, the theoretical overhead introduced by traversing the trie structure, and some performance evaluation. As observed, a Topic Exchange is not the fastest routing method and there are many factors which may influence its performance. However, it is not true that Topic Exchanges are slow: they are generally fast in typical RabbitMQ usage, and the test conditions presented here were deliberately specific. If there are only a few bindings, or the trie is not deep, the Topic Exchange overhead is usually negligible. Still, it is important to understand the underlying mechanism - as the example with MongooseIM’s RabbitMQ component showed, using a different Exchange type resulted in a significant improvement in performance.

Monitor your Erlang, Elixir and RabbitMQ systems

Are you currently using Erlang, Elixir or RabbitMQ in your stack? Get full visibility of your system with WombatOAM, find out more and get a free trial on our WombatOAM page.

A few useful links

An Introduction to RabbitMQ

Get some expert help or training on RabbitMQ

Check the RabbitMQ and MongooseIM demos

Find out more about MongooseIM


ex_rabbit_pool open source AMQP connection pool

Background

A couple of months ago we started writing an open-source continuous delivery system which we called Buildex.

Buildex is implemented as a collection of microservices which:

  • - Monitor Github repositories
  • - Respond to changes by sending notifications over AMQP (RabbitMQ)
  • - Trigger software builds running in Docker containers
  • - Expose credentials (SSH keys) to containers as volume mounts.

Buildex was born of the frustrations and limitations we faced when attempting to debug production failures in other build systems. Buildex was also influenced by security, privacy, and pricing concerns we encountered while using some of the popular commercial SaaS offerings.

Buildex Component overview

The principal components of Buildex are: the Poller, which uses the excellent Tentacat GitHub API client to poll GitHub for new tags; the Builder, which checks out and builds the project inside Docker containers; and the Datastore, which uses ueberauth OAuth to delegate authorization to GitHub and allows users to configure projects for the continuous integration pipeline.

The following diagram provides a high-level overview of the Buildex micro-services and supporting infrastructure.

Although we could have used distributed Erlang for everything, we wanted operations and developers to be able to easily monitor inter-service communication and define message replication as they pleased, for example for logging or integration with 3rd party services (email, auditing, manually triggering builds, etc.).

We decided to use RabbitMQ for inter-service communication, which in turn led to our need for a higher level connection handling and channel management library.

ex_rabbit_pool

As part of the work to support these services, we created a RabbitMQ connection management and pooling library called ex_rabbit_pool. ex_rabbit_pool is built upon the Elixir library AMQP which is an Elixir wrapper for the RabbitMQ client rabbitmq-erlang-client, and the Erlang resource pooling library poolboy. In this post, we will take you through the lessons learnt and steps taken to implement ex_rabbit_pool, but first, a quick explanation of channels and connections.

Channels and connections

Connections (TCP connections) to RabbitMQ are expensive to create and are a finite system resource. Channels are a lightweight abstraction over connections.

Quoting the RabbitMQ documentation, “AMQP 0-9-1 connections are multiplexed with channels that can be thought of as ‘lightweight connections that share a single TCP connection’”.

In the Erlang world, you will typically assign a channel per worker process; on other platforms you would assign a channel per thread.

Basic steps involved in using AMQP

Open a connection
{:ok, connection} =  AMQP.Connection.open(host: "localhost", port: 5672)
Open a channel with that connection
{:ok, channel} = AMQP.Channel.open(connection)
Bind a queue to an exchange
AMQP.Queue.bind(channel, "test_queue", "test_exchange")
(Listening) Subscribe to the queue
AMQP.Queue.subscribe(channel, "test_queue", fn payload, _meta -> IO.puts("Received: #{payload}") end)
(Sending) Publish a message to the exchange
AMQP.Basic.publish(channel, "test_exchange", "", "Hello, World!")

The process tree

A supervision tree diagram is a good way to get an overview of a BEAM system, so let’s start by examining the supervision tree of ex_rabbit_pool.

  • - PoolSupervisor supervises the ConnectionPool
  • - ConnectionPool manages a collection of ConnectionWorker processes
  • - Each ConnectionWorker manages a RabbitConnection
  • - ConnectionWorker uses the RabbitConnection to create a pool of RabbitChannels

The principal components are the PoolSupervisor and RabbitConnection. We will examine the implementation of both components over the course of the following sections.

Defining the PoolSupervisor

First, we define a top-level supervisor, PoolSupervisor, which will be responsible for managing the connection pool. PoolSupervisor is intended to be started within an application so we leave the start-up management to the application developer (here’s how we manage the pool supervisor in Builder).

PoolSupervisor provides an exported function, start_link/1 which takes as arguments both the RabbitMQ connection parameters and connection pool configuration.

defmodule ExRabbitPool.PoolSupervisor do
  use Supervisor

  alias ExRabbitPool.Worker.SetupQueue

  @type config :: [rabbitmq_config: keyword(), rabbitmq_conn_pool: keyword()]

  @spec start_link(config()) :: Supervisor.on_start()
  def start_link(config) do
    Supervisor.start_link(__MODULE__, config)
  end

  @impl true
  def init(config) do
    children = []
    opts = [strategy: :one_for_one]
    Supervisor.init(children, opts)
  end
end

If you are not familiar with poolboy, you can read a good introduction over at Elixir School. Continuing on, here is the pool configuration that we will use:

[
  rabbitmq_conn_pool: [
    name: {:local, :connection_pool},
    worker_module: ExRabbitPool.Worker.RabbitConnection,
    size: 2,
    max_overflow: 0
  ]
]

Note the worker_module attribute, which names a GenServer module whose instances will be managed as the pooled resource; in this case, RabbitConnection is the GenServer in charge of connecting to RabbitMQ.

We now extract the RabbitMQ and pool configuration from the start_link/1 parameters. AMQP provides helpful defaults for managing its connections so we only need to pass the RabbitMQ configuration to AMQP.

We do so, and configure poolboy to manage our connection pool:

rabbitmq_conn_pool = Keyword.get(config, :rabbitmq_conn_pool)
rabbitmq_config = Keyword.get(config, :rabbitmq_config, [])
{_, pool_id} = Keyword.fetch!(rabbitmq_conn_pool, :name)
:poolboy.child_spec(pool_id, rabbitmq_conn_pool, rabbitmq_config)

Let’s take a look at the full implementation:

defmodule ExRabbitPool.PoolSupervisor do

  use Supervisor

  alias ExRabbitPool.Worker.SetupQueue

  @type config :: [rabbitmq_config: keyword(), rabbitmq_conn_pool: keyword()]

  @spec start_link(config()) :: Supervisor.on_start()
  def start_link(config) do
    Supervisor.start_link(__MODULE__, config)
  end

  @impl true
  def init(config) do
    rabbitmq_conn_pool = Keyword.get(config, :rabbitmq_conn_pool)
    rabbitmq_config = Keyword.get(config, :rabbitmq_config, [])
    {_, pool_id} = Keyword.fetch!(rabbitmq_conn_pool, :name)
    children = [
      :poolboy.child_spec(pool_id, rabbitmq_conn_pool, rabbitmq_config)
    ]
    opts = [strategy: :one_for_one]
    Supervisor.init(children, opts)
  end
end

We continue by defining the worker_module responsible for handling the connection to RabbitMQ. The worker_module will hold the connection, a list of multiplexed channels to RabbitMQ, a corresponding list of monitors (we will explain their purpose later in this post), an adapter (so we can plug in a stub implementation later for testing purposes) and the configuration, so we can configure parts of our application dynamically.

Implementing the RabbitConnection

With these attributes in place, we create a State module with an associated state struct to represent the internal state of our RabbitConnection GenServer.

defmodule ExRabbitPool.Worker.RabbitConnection do
  use GenServer

  # Logger is used by the callbacks shown later in this post
  require Logger

  defmodule State do
    @type config :: keyword()

    @enforce_keys [:config]
    @type t :: %__MODULE__{
            adapter: module(),
            connection: AMQP.Connection.t(),
            channels: list(AMQP.Channel.t()),
            monitors: [],
            config: config()
          }

    defstruct adapter: ExRabbitPool.RabbitMQ,
              connection: nil,
              channels: [],
              config: nil,
              monitors: []
  end

  def start_link(config) do
    GenServer.start_link(__MODULE__, config, [])
  end

In the init/1 callback, we send ourselves an asynchronous :connect message so that the connection to RabbitMQ is initialised separately, without blocking the GenServer startup phase (supervisors create their child processes sequentially and expect them to start quickly). We also trap exits so that all linked connections and multiplexed channels can be restarted by this worker when they crash. Take a look at our default adapter for the full implementation.

def init(config) do
  Process.flag(:trap_exit, true)
  send(self(), :connect)

  # split our options from the RabbitMQ client adapter
  {opts, amqp_config} = Keyword.split(config, [:adapter])
  adapter = Keyword.get(opts, :adapter, ExRabbitPool.RabbitMQ)

  {:ok, %State{adapter: adapter, config: amqp_config}}
end

Now we implement our first GenServer callback, handle_info/2, to handle the :connect message. Within the handler, we open the connection to RabbitMQ using the adapter.

If the connection attempt returns an error, we can either schedule another retry or stop the GenServer.

In order to retry, we attempt to reconnect to RabbitMQ asynchronously using Process.send_after(self(), :connect, 1000).

If the connection is established successfully we create and start the required RabbitMQ channels, create another pool inside our worker for those created channels and link them to our process.

We need to link the channels to our process so that if a client closes a channel, or a channel crashes, we can create another one in order to keep the channel pool at the same size. We then store the channels in the RabbitConnection state for later reuse.

Connecting to RabbitMQ and opening the channels

  def handle_info(:connect, %{adapter: adapter, config: config} = state) do
    case adapter.open_connection(config) do
      {:error, _reason} ->
        # schedule_connect/1 retries later, e.g. via Process.send_after(self(), :connect, 1000)
        schedule_connect(config)
        {:noreply, state}

      {:ok, %{pid: pid} = connection} ->
        true = Process.link(pid)
        num_channels = Keyword.get(config, :channels, 1000)

        channels =
          do_times(num_channels, 0, fn ->
            {:ok, channel} = start_channel(adapter, connection)
            true = Process.link(channel.pid)

            channel
          end)

        {:noreply, %State{state | connection: connection, channels: channels}}
    end
  end

  @spec do_times(non_neg_integer(), non_neg_integer(), (() -> any())) :: [any()]
  defp do_times(limit, counter, _function) when counter >= limit, do: []

  defp do_times(limit, counter, function) do
    [function.() | do_times(limit, 1 + counter, function)]
  end

When creating a channel we need to ensure the connection process is alive, or return an error otherwise; then we open a channel using our client adapter and return its result.

  @spec start_channel(module(), AMQP.Connection.t()) :: {:ok, AMQP.Channel.t()} | {:error, any()}
  defp start_channel(client, connection) do
    if Process.alive?(connection.pid) do
      case client.open_channel(connection) do
        {:ok, _channel} = result ->
          Logger.info("[Rabbit] channel connected")
          result

        {:error, reason} = error ->
          Logger.error("[Rabbit] error starting channel reason: #{inspect(reason)}")
          error

        error ->
          Logger.error("[Rabbit] error starting channel reason: #{inspect(error)}")
          {:error, error}
      end
    else
      {:error, :closing}
    end
  end

Now we have a pool of connections to RabbitMQ, and each connection has a pool of channels that clients can check out and check in again. We haven’t implemented those features yet, so let’s do that now.

For the checkout_channel handler, we also need to handle some edge cases.

Firstly, the case where we are still unable to connect to RabbitMQ. In such a situation we need to tell the client to retry later with an {:error, :disconnected} result.

Secondly, the situation where there are no channels left in the pool; this can happen when all channels are already checked out by other clients. Here we have a couple of options: either we use the async/reply pattern to block and wait for a period of time for a new channel to become available, or we return {:error, :out_of_channels}, which is simpler and pushes the retry decision to the user - retry later or fail immediately.

With the edge cases covered, we can proceed with the implementation of the actual checkout, which does the following: monitor the client claiming the channel (so that if the client crashes we can return the claimed channel to the pool for another client to reuse), reply with {:ok, channel}, and save the monitor reference together with the assigned channel in the RabbitConnection state for safe keeping.

def handle_call(:checkout_channel, _from, %State{connection: nil} = state) do
  {:reply, {:error, :disconnected}, state}
end

def handle_call(:checkout_channel, _from, %{channels: []} = state) do
  {:reply, {:error, :out_of_channels}, state}
end

def handle_call(
      :checkout_channel,
      {from_pid, _ref},
      %{channels: [channel | rest], monitors: monitors} = state
    ) do
  monitor_ref = Process.monitor(from_pid)

  {:reply, {:ok, channel},
    %State{state | channels: rest, monitors: [{monitor_ref, channel} | monitors]}}
end

We now implement the functionality to return a channel back into the pool, doing so requires the following steps:

  • - Remove the channel from the list of channels
  • - Demonitor the client that was holding the channel
  • - Unlink the channel from our connection process
  • - Delete the monitor from the RabbitConnection state list of monitored processes
  • - Close the channel
  • - Start a new channel

You may notice that we stop the old channel and create a new one every time a channel is returned to the pool. This is because a client using a channel can change its state - channels are stateful - so we create a fresh channel to replace the old one and avoid the strange errors that could arise from reusing channels.

  @impl true
  def handle_cast(
        {:checkin_channel, %{pid: pid} = old_channel},
        %{connection: conn, adapter: adapter, channels: channels, monitors: monitors} = state
      ) do
    # only start a new channel when checking back in a channel that hasn't been removed yet;
    # this can happen when a channel crashes or is closed while a client holds it,
    # so we get an `:EXIT` message and a `:checkin_channel` message in no particular
    # order
    if find_channel(pid, channels, monitors) do
      new_channels = remove_channel(channels, pid)
      new_monitors = remove_monitor(monitors, pid)

      case replace_channel(old_channel, adapter, conn) do
        {:ok, channel} ->
          {:noreply, %State{state | channels: [channel | new_channels], monitors: new_monitors}}

        {:error, :closing} ->
          # RabbitMQ Connection is closed. nothing to do, wait for reconnection
          {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
      end
    else
      {:noreply, state}
    end
  end

  defp find_channel(channel_pid, channels, monitors) do
    Enum.find(channels, &(&1.pid == channel_pid)) ||
      Enum.find(monitors, fn {_ref, %{pid: pid}} ->
        pid == channel_pid
      end)
  end

  defp replace_channel(old_channel, adapter, conn) do
    true = Process.unlink(old_channel.pid)
    # omit the result
    adapter.close_channel(old_channel)

    case start_channel(adapter, conn) do
      {:ok, channel} = result ->
        true = Process.link(channel.pid)
        result

      {:error, _reason} = error ->
        error
    end
  end

We now have a reasonably complete connection worker, but we still need to implement error handling for crashing connections, channels crashing/closing and exceptions raised within the clients.

Implementing a handler for crashed connections

As we already have our worker process linked to the RabbitMQ connection process, we will receive a message corresponding to {:EXIT, pid, reason} if the connection process terminates. We pattern match to ensure the failing process pid is the same as the connection process pid, discard the connection and attempt to schedule reconnection in the background using Process.send_after/3.

def handle_info({:EXIT, pid, _reason}, %{connection: %{pid: pid}} = state) do
  Process.send_after(self(), :connect, 1000)
  {:noreply, %State{state | connection: nil}}
end

In the case where the connection crashes and we have channels linked to the connection process, we will receive messages informing us about the crashed channels.

We must handle these exit messages in two ways: firstly, where the connection has already crashed and is now nil; and secondly, where the connection remains active but a channel crashed or was closed.

Handling a crashed channel

Let’s implement the first, where the connection already crashed. We pattern match on the nil connection, then we remove the crashed pid from the channels list and remove any monitor associated with that process identifier. Done.

def handle_info(
      {:EXIT, pid, reason},
      %{connection: nil, channels: channels, monitors: monitors} = state
    ) do
  Logger.error("[Rabbit] connection lost, removing channel reason: #{inspect(reason)}")
  new_channels = remove_channel(channels, pid)
  new_monitors = remove_monitor(monitors, pid)
  {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
end

defp remove_channel(channels, channel_pid) do
  Enum.filter(channels, fn %{pid: pid} ->
    channel_pid != pid
  end)
end

defp remove_monitor(monitors, channel_pid) when is_pid(channel_pid) do
  monitors
  |> Enum.find(fn {_ref, %{pid: pid}} ->
    channel_pid == pid
  end)
  |> case do
    # if nil means DOWN message already handled and monitor already removed
    nil ->
      monitors

    {ref, _} = returned ->
      true = Process.demonitor(ref)
      List.delete(monitors, returned)
  end
end

Handling the second case when the connection remains open but a channel crashed is the same as handling the case of a crashed connection with the additional requirement that we need to create another channel, link it to our worker, and add the channel to the pool.

  @impl true
  def handle_info(
        {:EXIT, pid, reason},
        %{channels: channels, connection: conn, adapter: adapter, monitors: monitors} = state
      ) do
    Logger.warn("[Rabbit] channel lost reason: #{inspect(reason)}")
    # don't start a new channel if the crashed channel doesn't belong to the pool
    # anymore; this can happen when a channel crashes or is closed while a client holds it,
    # so we get an `:EXIT` message and a `:checkin_channel` message in no particular
    # order
    if find_channel(pid, channels, monitors) do
      new_channels = remove_channel(channels, pid)
      new_monitors = remove_monitor(monitors, pid)

      case start_channel(adapter, conn) do
        {:ok, channel} ->
          true = Process.link(channel.pid)
          {:noreply, %State{state | channels: [channel | new_channels], monitors: new_monitors}}

        {:error, :closing} ->
          # RabbitMQ Connection is closed. nothing to do, wait for reconnections
          {:noreply, %State{state | channels: new_channels, monitors: new_monitors}}
      end
    else
      {:noreply, state}
    end
  end

Now we have everything covered for handling connection and channel errors, crashes and closes, but we still need to implement the logic for when a monitored client crashes without returning its channel to the pool. In this case, we should remove the client from the monitors list and return a channel to the list of active channels.

  @impl true
  def handle_info(
        {:DOWN, down_ref, :process, _, _},
        %{channels: channels, monitors: monitors, adapter: adapter, connection: conn} = state
      ) do
    monitors
    |> Enum.find(fn {ref, _chan} -> down_ref == ref end)
    |> case do
      nil ->
        {:noreply, state}

      {_ref, old_channel} = returned ->
        new_monitors = List.delete(monitors, returned)

        case replace_channel(old_channel, adapter, conn) do
          {:ok, channel} ->
            {:noreply, %State{state | channels: [channel | channels], monitors: new_monitors}}

          {:error, :closing} ->
            # RabbitMQ Connection is closed. nothing to do, wait for reconnection
            {:noreply, %State{state | channels: channels, monitors: new_monitors}}
        end
    end
  end

Now that we have covered everything related to connection and channel handling we need to implement an API for our library.

This will require us to add some convenience functions for our GenServer and create an API layer to perform the work of getting a connection worker out of the connection pool and executing a function in the context of a channel for us.

Firstly, we define functions for checking channels in and out of the pool:

def checkout_channel(pid) do
  GenServer.call(pid, :checkout_channel)
end

def checkin_channel(pid, channel) do
  GenServer.cast(pid, {:checkin_channel, channel})
end

Having done so, we proceed to create our library API, where we define functions for retrieving a connection worker from the pool and for executing a function in the context of a channel. When checking out a connection worker we don’t care about isolating access to each process - we use a pool purely in order to spread load (pool config strategy :fifo).

The supplied function will receive one of:

  • - A tuple containing :ok and a channel or
  • - An error tuple which the user can deal with as they please.

We also define the basic functions for checking in and out a channel manually for a connection worker.

defmodule ExRabbitPool do
  alias ExRabbitPool.Worker.RabbitConnection, as: Conn

  @type f :: ({:ok, AMQP.Channel.t()} | {:error, :disconnected | :out_of_channels} -> any())

  @spec get_connection_worker(atom()) :: pid()
  def get_connection_worker(pool_id) do
    conn_worker = :poolboy.checkout(pool_id)
    :ok = :poolboy.checkin(pool_id, conn_worker)
    conn_worker
  end

  @spec with_channel(atom(), f()) :: any()
  def with_channel(pool_id, fun) do
    pool_id
    |> get_connection_worker()
    |> do_with_conn(fun)
  end

  def checkout_channel(conn_worker) do
    Conn.checkout_channel(conn_worker)
  end

  def checkin_channel(conn_worker, channel) do
    Conn.checkin_channel(conn_worker, channel)
  end

  defp do_with_conn(conn_worker, fun) do
    case checkout_channel(conn_worker) do
      {:ok, channel} = ok_chan ->
        try do
          fun.(ok_chan)
        after
          :ok = checkin_channel(conn_worker, channel)
        end

      {:error, _} = error ->
        fun.(error)
    end
  end
end

With this code implemented, we now want to quickly verify it in the Elixir interactive console (IEx), but before we do so, we’ll need access to a running RabbitMQ instance.

Docker makes this trivial to do.

Let’s work our way through the required steps.

First, we pull the RabbitMQ Docker image from the Docker hub:

docker pull rabbitmq:3.7.7-management

Then we run the RabbitMQ image in another terminal in the foreground, mapping both its web management interface (port 15672) and its AMQP port (5672) to the host.

docker run --rm --hostname bugs-bunny --name roger_rabbit -p 5672:5672 -p15672:15672 rabbitmq:3.7.7-management

After waiting for RabbitMQ initialization to complete, we proceed to copy the following code into the Elixir console in order to verify that everything works as expected:

First the configuration:

rabbitmq_config = [channels: 1]

rabbitmq_conn_pool = [
  name: {:local, :connection_pool},
  worker_module: ExRabbitPool.Worker.RabbitConnection,
  size: 1,
  max_overflow: 0
]

Then we create an instance of the PoolSupervisor:

{:ok, pid} =
  ExRabbitPool.PoolSupervisor.start_link(
    rabbitmq_config: rabbitmq_config,
    rabbitmq_conn_pool: rabbitmq_conn_pool
  )

And finally, we verify everything is working by publishing a message to the “ex_rabbit_pool” queue and then fetching it back on the same channel we published it on.

ExRabbitPool.with_channel(:connection_pool, fn {:ok, channel} ->
  queue = "ex_rabbit_pool"
  exchange = "my_exchange"
  routing_key = "example"
  {:ok, _} = AMQP.Queue.declare(channel, queue, auto_delete: true, exclusive: true)
  :ok = AMQP.Exchange.declare(channel, exchange, :direct, auto_delete: true, exclusive: true)
  :ok = AMQP.Queue.bind(channel, queue, exchange, routing_key: routing_key)
  :ok = AMQP.Basic.publish(channel, exchange, routing_key, "Hello World!")
  {:ok, msg, _} = AMQP.Basic.get(channel, queue, no_ack: true)
  IO.puts(msg)
end)

The message “Hello World!” is printed to the console. Tada! The library works.
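
If a channel needs to be held across several operations, the lower-level checkout/checkin API can also be used directly. A minimal sketch against the pool we have just started (the queue is the one declared in the previous snippet; the empty string is the default exchange, which routes straight to the queue named by the routing key):

conn_worker = ExRabbitPool.get_connection_worker(:connection_pool)

case ExRabbitPool.checkout_channel(conn_worker) do
  {:ok, channel} ->
    :ok = AMQP.Basic.publish(channel, "", "ex_rabbit_pool", "ping")
    :ok = ExRabbitPool.checkin_channel(conn_worker, channel)

  {:error, _reason} ->
    # :disconnected or :out_of_channels - retry later or give up
    :error
end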

If you want to see an example of a real-life consumer, take a look at the implementation of the Buildex Jobs Consumer.

Future work and improvements

There are some nice enhancements we would like to make, and we would be very happy to accept quality contributions. All feedback is welcome; this would be a really nice Elixir project for your first open source contribution!

  • - Moving the monitors logic to ETS
  • - Implementing backoff algorithms for handling reconnections
  • - Overflow support for the channels pool
  • - Support for async/reply channel checkout
  • - A circuit breaker to fail fast after a number of failed attempts to connect to RabbitMQ

Feel free to explore the source code of this blog post here and give it a try!

Credits

Special thanks to Bryan Hunt for his help editing this blog and reviewing all the code that was produced while building this library. Special thanks also to our RabbitMQ experts for reviewing our code and ensuring we were following best practices when handling connections and channels inside our library.

RabbitMQ Summit

Last year was a success: in November 2018 the first RabbitMQ Summit ever was held in London, with visitors from all over the globe (videos of all the talks are available here). This year, to make a good habit out of it, Erlang Solutions and 84codes (CloudAMQP) invite you to the second edition of the RabbitMQ Summit in London. Find out more or get your ticket on the official website: https://rabbitmqsummit.com/

We thought you might also be interested in:

An Introduction to RabbitMQ

Our RabbitMQ help

Erlang & Elixir use cases

How to manage RabbitMQ queues


RabbitMQ Mirrored Queues Gotchas

Mirrored Queues are a popular RabbitMQ feature that provides High Availability (HA). HA, in this context, simply means that RabbitMQ nodes in the cluster can fail and the queues will still be available to clients.
However, HA queues can lead to some unexpected behaviour in failure scenarios or when combined with specific queue properties. In this blog post, we share three examples of such unpredictable behaviours that we have come across in RabbitMQ, and use them to explain some of the intricacies of HA queues. In doing so, we also demonstrate how one can analyse the behaviour of a RabbitMQ cluster on a laptop or a single machine using common tools. The next section briefly discusses the requirements for doing so and the scripts in the accompanying repository that allow us to test the presented cases.

Setup

If you want to reproduce the examples from the post you will need the following dependencies installed:

Make
Git
Docker
Python3
Pipenv
Rabbitmq-perf-test 2.8.0
Wireshark (optional)

All the scenarios are based on a 2-node cluster of RabbitMQ Docker containers - rmq1 and rmq2 - running RabbitMQ 3.7.15. Both containers expose ports 5672 (AMQP) and 15672 (management plugin), which are mapped to 5672/15672 and 5673/15673 for rmq1 and rmq2 respectively. In other words, once you set up the cluster, the AMQP port for rmq1 is available at amqp://localhost:5672 and the management interface at http://localhost:15672.

The cluster is started with make up and torn down with make down. The up command will start the containers, attach them to an rmq network and install the following policy:

[Figure: the HA policy installed by make up]
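
The exact policy is defined in the repository; based on what the rest of this post relies on (two replicas, automatic synchronization and a sync batch size of 2), it is roughly equivalent to something like the following, where the policy name, queue pattern and exact ha-mode are guesses and the ha-* keys are the important part:

rabbitmqctl set_policy ha-haq "^haq$" \
  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic","ha-sync-batch-size":2}'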

To see the logs, run make logs1 to attach to the output of the rmq1 container. Python scripts are also used, so pipenv install and pipenv shell need to be run to install the Python dependencies and start a shell within the Python virtualenv.

Auto-delete property for an HA queue

A queue in RabbitMQ can have the auto-delete property set. A queue with this property will be deleted by the broker once the last consumer unsubscribes. But what does this mean in a distributed environment, where consumers are connected to different nodes and queue slaves are promoted to masters on failure? Let’s explore this by setting up a test environment. Run make up, which will spawn and cluster the RabbitMQ containers. The command should finish with output similar to the below:

Cluster status of node rabbit@rmq1 ...  
[{nodes,[{disc,[rabbit@rmq1,rabbit@rmq2]}]},  
{running_nodes,[rabbit@rmq2,rabbit@rmq1]},  
{cluster_name,<<"rabbit@rmq1">>},  
{partitions,[]}, {alarms,[{rabbit@rmq2,[]},{rabbit@rmq1,[]}]}]  

Now we want to create a Mirrored Queue with the master at node rmq2 and the slave at rmq1. The queue should have the auto-delete property set.

For this purpose, we will use the PerfTest tool: we connect it to the second node and make it act as a producer. It will create the haq queue (which matches the policy), bind it to a direct exchange with the routing key key and start producing 1 message per second:

# producer at rmq2
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5673 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true 

Throughout the example, we assume perf_test is installed in the ./perf_test directory. As the producer is running, the queue should appear in the management UI and messages should be piling up:

[Figure: the management UI showing the haq queue with messages piling up]

Now let’s connect a consumer to our queue. Again, PerfTest will be our tool of choice but this time it will be used as a consumer attached to the first node (rmq1):

# consumer at rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 0 \
--consumers 1 \
--queue haq

The perf_test output should reveal that the messages are flowing:

# consumer at rmq1
id: test-114434-950, time: 73.704s, received: 1.00 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
# producer
id: test-113752-488, time: 487.154s, sent: 1.0 msg/s

At this point, we have the following setup:

[Figure: the resulting setup - producer connected to rmq2 (queue master), consumer connected to rmq1 (queue slave)]

Now, let’s see what using the auto-delete property on HA queues can lead to in a failure scenario. Let’s introduce some turbulence into the cluster and disconnect rmq2 from the network between the nodes:

make disconnect2 

Under the hood, a docker network disconnect command is run that detaches the node from the rmq network. This should result in the consumer draining all the messages (as the producer is at the disconnected node and no new messages arrive at the queue). After approximately 10 seconds the nodes should report that they cannot see each other. Below is a log from rmq2 where all this happens:

2019-06-07 10:02:20.910 [error] <0.669.0> ** Node rabbit@rmq1 not responding ** ** Removing (timedout) connection **
2019-06-07 10:02:20.910 [info] <0.406.0> rabbit on node rabbit@rmq1 down
2019-06-07 10:02:20.919 [info] <0.406.0> Node rabbit@rmq1 is down, deleting its listeners
2019-06-07 10:02:20.921 [info] <0.406.0> node rabbit@rmq1 down: net_tick_timeout

But what happened to our queue? If you look at the management interface of rmq2 (the node with the producer, http://localhost:15673/#/queues, user/password is guest/guest), the queue is gone! This is because once the netsplit was detected, no consumers were left at rmq2 and the queue got deleted. What’s more, all the messages that made it to the queue master on rmq2 before the netsplit was detected are lost - unless Publisher Confirms were in play, in which case the producer would simply not receive confirms for the messages that were not accepted by the mirror. The queue should still be present at rmq1:

[Figure: the management UI of rmq1 still showing the haq queue]

If we switch our producer to rmq1 the message flow should go back to normal:

# producer at rmq2 switched to rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true

Also, note that our slave at rmq1 was promoted to master:

2019-06-07 10:02:22.740 [info] <0.1858.0> Mirrored queue 'haq' in vhost '/': Slave <rabbit@rmq1.3.1858.0> saw deaths of mirrors <rabbit@rmq2.1.1096.0>
2019-06-07 10:02:22.742 [info] <0.1858.0> Mirrored queue 'haq' in vhost '/': Promoting slave <rabbit@rmq1.3.1858.0> to master 

As we are left with one copy of the queue with one consumer, if that last consumer disconnects, the entire queue will be lost. Long story short: if you set up an HA queue with the auto-delete property, then in a fail-over scenario replicas can be deleted, because they may lose the consumers connected through other nodes. Using either the ignore or autoheal cluster partition handling strategy will keep the replicas. In the next section, we discuss the same scenario but with consumer cancel notification in use. Before moving on to the next section, restart the environment with make down up.

The consumer is notified about the fail-over

Now have a look at a slightly different scenario in which a consumer specifies that it wants to be notified in case of a master fail-over. To do so, it sets the x-cancel-on-ha-failover to true when issuing basic.consume (more here).

It differs from the previous example in that the consumer gets the basic.cancel method from the server once the server detects that the queue master is gone. The client can then handle this method as it wishes: it can restart consumption by issuing basic.consume again, or it could even reopen the channel or reconnect before resuming consumption. Even if it crashes upon receiving the basic.cancel, the queue will not be deleted. Note that consumption can only be resumed once a new master is elected. For the same reasons as in the previous example, the replica at the other node (rmq2) will be gone. You can test all these consumer behaviours with the consume_after_cancel.py script. Below we go through the case where the consumer resumes consumption after getting the basic.cancel. At the beginning, set up the cluster with make up and a producer at rmq2, just as we did in the previous section:

# producer at rmq2
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5673 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true 

Next, connect our new consumer to rmq1 (remember to pipenv install and pipenv shell as mentioned in the Setup):

./consume_after_cancel.py --server localhost --port 5672 --after-cancel reconsume
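
The script itself is written in Python. Purely as an illustration of the mechanics, a consumer implementing the same re-subscribe-on-cancel strategy as the --after-cancel reconsume option could look roughly like this in Elixir with the AMQP library (a sketch, not the actual script):

defmodule CancelAwareConsumer do
  def run(channel, queue) do
    # subscribe and ask the broker to send basic.cancel on a master fail-over
    {:ok, _consumer_tag} =
      AMQP.Basic.consume(channel, queue, nil,
        arguments: [{"x-cancel-on-ha-failover", :bool, true}]
      )

    loop(channel, queue)
  end

  defp loop(channel, queue) do
    receive do
      {:basic_consume_ok, _meta} ->
        loop(channel, queue)

      {:basic_deliver, payload, %{delivery_tag: tag}} ->
        IO.puts("Received: #{payload}")
        :ok = AMQP.Basic.ack(channel, tag)
        loop(channel, queue)

      {:basic_cancel, _meta} ->
        # the queue master is gone; a real client would retry with a backoff
        # until a new master is elected - here we simply try to consume again
        run(channel, queue)
    end
  end
end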

When the messages start flowing (they look meaningless), disconnect the second node from the cluster:

make disconnect2  

That should stop our consumer for a few moments, but it will resume consumption. Since there is a netsplit, there are no messages to consume, so we need to move the producer to the first node:

# producer at rmq2 switched to rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true

At this point, the messages will be flowing again.

When studying these corner cases, Wireshark and its AMQP dissector come in very handy, as they make it crystal clear what the client-server conversation looks like. In our particular case, this is what can be seen:

[Figure: a Wireshark capture of the AMQP conversation between the client and the server during the fail-over]

In short: using x-cancel-on-ha-failover gives the client an extra piece of information - that a fail-over has happened - which can be acted upon appropriately, e.g. by automatically subscribing to the new master. We are then left with one consumer, and all the replicas left without consumers are cleared.

HA queues with automatic synchronization

If we take a closer look at our HA policy (see the Setup section) it specifies the queue synchronization as automatic:

"ha-sync-mode":"automatic"

It means that when a node joins a cluster (or rejoins after a restart or a partition) and a mirror is to be installed on it, all the messages will be copied to it from the master. The good thing about this is that the new mirror has all the messages straight after joining, which increases data safety. However, it comes at a cost: while a queue is being synchronized, all operations on the queue are blocked - a publisher cannot publish and a consumer cannot consume. Let's have a look at an example in which we publish 200K messages to the haq queue, then attach a slow consumer and finally restart one node. As the policy specifies that the queue is to have 2 replicas with automatic synchronization, the restart will trigger messages to be copied to keep the replicas in sync. Also, the "ha-sync-batch-size":2 set in the policy indicates that RabbitMQ will synchronize 2 messages at a time - to slow down the synchronization and exaggerate "the blocking effect". Run make up to set up the cluster. Then publish the messages:

# 10*20K = 200K 
./perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 10 \
--consumers 0 \
--queue haq \
--pmessages 20000 \
--auto-delete false \
--rate 1000  

Check that the queue is filled up (http://localhost:15672/#/queues):

[Management UI screenshot: the haq queue holding 200K messages]

When it’s ready we can start the slow consumer which will process 10 msgs/s with a prefetch count of 10:

./perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 0 \
--consumers 1 \
--queue haq \
--predeclared \
--qos 10 \
--consumer-rate 10 

Then restart rmq2 (make restart2) and observe both the consumer's logs and the rmq1 logs. In the consumer's logs you should see a slowdown in the receiving of messages:

id: test-145129-551, time: 20.960s, received: 10.0 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
id: test-145129-551, time: 21.960s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
id: test-145129-551, time: 23.058s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
--> id: test-145129-551, time: 31.588s, received: 0.82 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 32.660s, received: 83 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 33.758s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 34.858s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs  

The marked line shows that the consumer was waiting for nearly 10 seconds for a message. And the RabbitMQ logs can explain why:

2019-06-04 13:36:33.076 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 200000 messages to synchronise
2019-06-04 13:36:33.076 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: batch size: 2
2019-06-04 13:36:33.079 [info] <0.3039.0> Mirrored queue 'haq' in vhost '/': Synchronising: mirrors [rabbit@rmq2] to sync
2019-06-04 13:36:34.079 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 22422 messages
2019-06-04 13:36:35.079 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 43346 messages
2019-06-04 13:36:36.080 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 64512 messages
2019-06-04 13:36:37.084 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 82404 messages
2019-06-04 13:36:38.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 104404 messages
2019-06-04 13:36:39.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 128686 messages
2019-06-04 13:36:40.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 147580 messages
2019-06-04 13:36:41.096 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 164498 messages
2019-06-04 13:36:42.096 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 182408 messages
2019-06-04 13:36:42.961 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: complete 

If we compare the timestamp of the first line, indicating the start of the synchronization, with that of the last line, indicating its end, we can see it took nearly 10 seconds. And that is exactly how long the queue was blocked. In real life, obviously, we would not have ha-sync-batch-size set to 2 - it is 4096 by default. But what does happen in real life is that there can be multiple queues, longer than 200K messages, with larger messages. All of that can contribute to a high volume of traffic and a prolonged synchronization time, resulting in queues that remain blocked for longer. The takeaway is: make sure you are in control of the length of your queues if automatic synchronization is in use. This can be achieved by limiting the length of queues or setting TTLs for messages.

Delayed Publisher Confirms

If you’re concerned about data safety, you probably need Publisher Confirms. This mechanism makes the broker send an ACK for each successfully accepted message. But what does successfully accepted mean? As per the documentation:

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.
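
To make the mechanism concrete, here is a rough sketch of enabling confirms with the RabbitMQ Erlang client (the queue name and timeout are arbitrary); the #'basic.ack'{} received below is the confirm the documentation talks about:

#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ok = amqp_channel:register_confirm_handler(Ch, self()),
amqp_channel:cast(Ch,
                  #'basic.publish'{routing_key = <<"haq_jobs">>},
                  #amqp_msg{payload = <<"7">>}),
receive
    #'basic.ack'{} -> confirmed;        %% accepted by all queues/mirrors
    #'basic.nack'{} -> not_confirmed    %% the broker could not take responsibility
after 5000 ->
    timeout
end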

As we are discussing HA queues, the statement saying that an ACK is sent when ALL the mirrors have accepted the message is the interesting one. To see how this can cause complications, let’s consider the following setup based on the RPC messaging pattern:

[Diagram of the RPC setup: the client (P/C), the haq_jobs queue, the Fibonacci server and the reply queue]

We have a server that subscribes to the haq_jobs queue, on which it expects messages with integers in their body. Once it gets a message, it computes a Fibonacci number for that integer and sends the reply back to the reply queue stored in the reply_to property of the original message. The client in turn (denoted as P/C in the picture), when invoked, sends an integer to the haq_jobs queue and waits for the reply on a temporary queue. It opens two connections: one for publishing the request and one for getting the response. The reply queue is exclusive, as it is meant to be used by just one consumer. What is more, the message is sent in a separate thread, which then waits for the ACK. In other words, sending a message and waiting for the reply are independent thread-wise.

Now we're ready to start the cluster: make up. Then, in a separate shell, run the server: ./fib_server.py. Finally, try the client:

./fib_client.py 7 
[2019-06-06 14:20:37] Press any key to proceed... 
[2019-06-06 14:20:37] About to publish request='7' 
[2019-06-06 14:20:37] Message ACKed
[2019-06-06 14:20:37] Got response=13
./fib_client.py 7 
[2019-06-06 14:20:39] Press any key to proceed... 
[2019-06-06 14:20:39] About to publish request='7' 
[2019-06-06 14:20:39] Got response=13 
[2019-06-06 14:20:39] Message ACKed 

As you may have noticed, in the second invocation of the client we got the response first and only then the ACK for the request. That may look weird, but it is due to the fact that the confirms are sent asynchronously, once all the mirrors accept the message. In this particular case, it turned out that the Fibonacci server and RabbitMQ were quicker to produce the result and deliver it than the cluster was to replicate the request and send the confirmation. What can we conclude from this? Never rely on the confirm arriving before the reply in the RPC (Remote Procedure Call) pattern - although they should come very close to each other.

The last statement, that the ACK and the reply come shortly after one another, doesn't hold true in a failure scenario where the network or a node is broken. To simulate that, let's clean the environment with make down and set the net_ticktime to 120s, so that RabbitMQ detects net-splits more slowly. This gives us time to perform the required actions after we break the network (it doesn't affect the first experiment). It had been set to 10s in the previous experiments to make the net-split detection, and thus the tests, run faster. Modify conf/advanced.config so that the option is set to 120 instead of 10:

[ 
 {kernel, [{net_ticktime, 120}]}
]. 

Now we can run the cluster, make up, and start the server and client as previously. But leave the client at the prompt asking to press a key, and stop for a moment to reflect on the current state of the system:

  • we have the Fibonacci server connected and waiting for requests,
  • we have 2 connections opened from the client: one to publish and one to consume,
  • all the queues are set up.

Now run make disconnect2 to disconnect rmq2 from the cluster. At this point, you have between 90 and 150 seconds (as the net_ticktime is set to 120s) to press any key to publish the message. You should see that the server processes it and you get the response, but the ACK for the request won't be delivered until you recover the net-split with make connect2 (which you have to do within the given time range).

[2019-06-06 14:21:32] Press any key to proceed...
[2019-06-06 14:21:38] About to publish request='7'
[2019-06-06 14:21:38] Got response=13
[2019-06-06 14:22:05] Message ACKed

In the above listing, the ACK arrived ~30s after the actual response due to the net-split.

Remember: one cannot expect that a message will be ACKed by the broker before the reply arrives on the reply queue (RPC pattern) - as we've just seen, the ACK can get stuck due to turbulence on the network.

Summary

Undoubtedly, Mirrored Queues provide an important RabbitMQ feature: High Availability. Put simply, depending on the configuration, you will always have more than one copy of the queue and its data. As a result, you are prepared for the node or network failures that will inevitably happen in a distributed system. Nonetheless, as illustrated in this blog post, in some specific failure scenarios or configurations you may observe unexpected behaviours or even data/queue loss. Thus it's important to test your system against potential failures and scenarios which are not (clearly) covered by the documentation. If you're interested in learning more about HA in RabbitMQ, talk to us at the RabbitMQ Summit in London or contact us today for a RabbitMQ health check.

Monitor your Erlang, Elixir and RabbitMQ systems

Are you currently using Erlang, Elixir or RabbitMQ in your stack? Get full visibility of your system with WombatOAM, find out more and get a free trial on our WombatOAM page.

Want to learn more?

For more information on High Availability in RabbitMQ, watch this video from our Erlang Meetup in Krakow.

We thought you might also be interested in:

An Introduction to RabbitMQ

Our RabbitMQ solutions

RabbitMQ | Topic Exchanges

Permalink

Authentication in Elixir Web Applications with Ueberauth and Guardian: Part 4

In the previous posts in this series we walked through:

In this post we are going to expand upon that base to allow users to create an account or log in to an existing account using OAuth providers such as Google or Twitter.

OAuth providers

A detailed explanation of OAuth is outside the scope of this blog post. If you’re not familiar with OAuth, this video is a good brief introduction. It will be helpful, however, to explore how OAuth works in the context of using it to log in to accounts in our application. In the case of using an email address and password, our application, Yauth, assumed two responsibilities:

  1. establishing the identity of our users by receiving and storing emails and passwords upon account registration; and
  2. verifying the identity of users by challenging them to produce the correct email and password combination on subsequent attempts to log in to an account.

Our application can, however, offload one or both of those responsibilities to OAuth providers with which we can integrate.

Our users may have already established their identity and the credentials needed to prove that identity with OAuth providers like Google, Twitter, Facebook, etc. Yauth can register with those providers as an application that would like to allow users access via the identity and credentials they manage. When we register Yauth with the provider, the provider gives us a client id and client secret and we give them a callback URL to our application. Yauth can use that information to create a link on Yauth pages. When our user clicks that link, they are redirected to the OAuth provider’s login page (i.e. their identity challenge). When the user completes the identity challenge (e.g. providing their user name and password for that provider), that information is submitted to the OAuth provider’s server. The OAuth server then redirects the user to the callback URL that Yauth provided with the result of the identity challenge. When Yauth receives that data, it creates an account and logs in upon success or displays an error message if the challenge failed.

By using OAuth providers, Yauth can accept the identity information for the user from those providers and allow them to manage user authentication. This provides flexibility for our users, some of whom may not want another password to manage, and could free us from managing passwords, password resets, etc. if we wanted to use OAuth exclusively.

Implementing OAuth log in

We will use Google as our example OAuth provider for this post. The first step will be to create a project with Google and set up credentials for an OAuth client. Walking through that process step-by-step will make this post too long but following “Step 1” in these instructions should get you where you need to be to follow along.

Once we have registered with Google as an OAuth client, we need to implement registration and log in to Yauth using Google. In an earlier post, we described Ueberauth and used the Ueberauth Identity strategy to help us with email/password registration and log in. Here we’ll use the Ueberauth Google strategy.

To get started, we need to add the Ueberauth Google Strategy package to our project.

# mix.exs
defmodule Yauth.MixProject do
  # ...
  defp deps do
    [
      # ...
      {:ueberauth, "~> 0.6"},
      {:ueberauth_google, "~> 0.8"},
      # ...
    ]
  end

  # ...
end

Then fetch the new dependencies:

mix deps.get

In our previous post we configured Ueberauth for the Identity strategy. Now we need to provide some configuration for the Google strategy.

# config/config.exs
config :ueberauth, Ueberauth,
  providers: [
    google: {Ueberauth.Strategy.Google, []},
    # ...
  ]

config :ueberauth, Ueberauth.Strategy.Google.OAuth,
  client_id: System.get_env("GOOGLE_CLIENT_ID"),
  client_secret: System.get_env("GOOGLE_CLIENT_SECRET")

The first configuration tells Ueberauth which providers we intend to use in the application and the module that implements the strategy behavior for each provider. The second configuration provides the client id and secret for the Google strategy in the form of environment variables. These values are available from the Google Developer Console where you set up your application. Remember to export those values in your shell before starting your Phoenix server.

Routes

As you may recall from our previous discussion, Ueberauth uses a “two-phase” approach to authentication. The first phase presents an authentication challenge to the user. The second phase handles the data provided in that challenge. To accomplish this with OAuth providers, we need to provide routes by which users access this functionality.

# lib/yauth_web/router.ex
defmodule YauthWeb.Router do
  # ...

  scope "/auth", YauthWeb do
    pipe_through [:browser, :guardian]

    get "/:provider", AuthController, :request
    get "/:provider/callback", AuthController, :callback
  end
end

The Ueberauth package expects routes to be prefixed with a certain path before the OAuth provider name. The default option is “/auth”, which works for our application, but another path could be used by adding the :base_path option in the ueberauth configuration in your config/config.exs file. We accomplish that by placing these routes in the scope "/auth" ... block.

With the routes in place, we need to provide the controller. Here are the module and function signatures for that controller that we’ll walk through together.

# lib/yauth_web/controllers/auth_controller.ex
defmodule YauthWeb.AuthController do
  use YauthWeb, :controller
  plug Ueberauth

  def request(conn, _params) do
    # Present an authentication challenge to the user
  end

  def callback(%{assigns: %{ueberauth_auth: auth_data}} = conn, _params) do
    # Find the account if it exists or create it if it doesn't
  end

  def callback(%{assigns: %{ueberauth_failure: _}} = conn, _params) do
    # Tell the user something went wrong
  end
end

Authentication challenge

The function AuthController.request/2 exists to provide the authentication challenge to the user. As we are not presenting an authentication challenge (i.e. a form for the user to complete) to the user at this route, we don’t actually need to implement or even define this function. Why not? At the top of the controller we call plug Ueberauth. In the event that the request is for a configured OAuth provider, this plug redirects the request to that OAuth provider’s authentication challenge page in place of any function call within the Yauth controller.

In other words, our Google button on our login and registration pages has the route /auth/google that our Router directs to AuthController.request/2. When the user clicks on that button, the request is directed to the AuthController that first passes it through the Ueberauth plug. That plug sees the request is for a configured OAuth provider and redirects the request to the provider’s URL, circumventing a call to the request/2 function. Our final implementation won’t have this function signature but I wanted to explain why it will be absent as it can be puzzling to see it in the Router but missing in the controller. If we wanted, we could also move all of the handling for the identity strategy out of the RegistrationController and SessionController into the AuthController, which would require implementing request/2 to present the identity authentication challenge, but we’ll leave that as is for now.

Authentication verification

AuthController.callback/2 handles the incoming request from the OAuth provider after their authentication challenge has been completed - it is the function behind the callback URL we registered with the provider. When the user provides their Google credentials on the Google page, Google sends the requested data to the route handled by this function.

The Ueberauth plug enters the picture in this case as well. That plug calls functions provided by the Ueberauth strategy implementation—Ueberauth.Strategy.Google in our case—that extract the expected data from the incoming request. That extracted data is added to the Plug.Conn assigns where our controller can easily access it. As you can see in the two function heads above, this will assign either the key :ueberauth_auth in the event of success or :ueberauth_failure in the event of failure.

We're now in a position to fill in the implementation of the callback/2 function. In a successful request we receive the email address of the user and want to do two things. First, we want to see if an account already exists with that email address, either through a previous registration via email address and password or a previous OAuth login. If the account exists, we want to log the user in and redirect them to their profile page. Second, if an account does not exist, we want to register an account and log the user in. [1]

As we’ve done in the past, let’s write the code we wish we had and fill in the details as we go.

# lib/yauth_web/controllers/auth_controller.ex
defmodule YauthWeb.AuthController do
  # ...
  alias Yauth.Accounts
  alias YauthWeb.Authentication

  def callback(%{assigns: %{ueberauth_auth: auth_data}} = conn, _params) do
    case Accounts.get_or_register(auth_data) do
      {:ok, account} ->
        conn
        |> Authentication.log_in(account)
        |> redirect(to: Routes.profile_path(conn, :show))

      {:error, _error_changeset} ->
        conn
        |> put_flash(:error, "Authentication failed.")
        |> redirect(to: Routes.registration_path(conn, :new))
    end
  end

  # ...
end

When we successfully receive data from the OAuth provider (as indicated by the :ueberauth_auth key), we pass that data to the get_or_register/1 function on the Accounts context. The controller expects an :ok tuple with the account (either loaded from the database or newly created) or an :error tuple. If we receive the :ok tuple, we use the log_in/2 function we introduced in our prior post on registering with an email and password. Finally, we redirect the user to their profile page. If we receive the :error tuple, we redirect to the registration page with a message telling the user their authentication failed.

We also need to handle a failure from the OAuth provider (as indicated by the :ueberauth_failure key). We can handle that situation with a second head of the callback/2 function.

# lib/yauth_web/controllers/auth_controller.ex
defmodule YauthWeb.AuthController do
  # ...
  def callback(%{assigns: %{ueberauth_failure: _}} = conn, _params) do
    conn
    |> put_flash(:error, "Authentication failed.")
    |> redirect(to: Routes.registration_path(conn, :new))
  end
end

This is a simple function that adds a flash message and redirects to the registration page.

With those in place we need to add the functions to our Accounts context that are called by our controller.

# lib/yauth/accounts.ex
defmodule Yauth.Accounts do
  # ...
  def get_or_register(%Ueberauth.Auth{info: %{email: email}} = params) do
    if account = get_by_email(email) do
      {:ok, account}
    else
      register(params)
    end
  end
  # ...
end

This function accepts the Ueberauth struct containing our user’s information from the OAuth provider. We pattern match on that struct to get the email address of the user and pass that to our existing get_by_email/1 function. That function returns either an account or nil. If an account is returned we wrap it in an {:ok, account} tuple. Otherwise we call through to the existing register/1 function with the Ueberauth struct. Our register/1 function, however, will need to handle the Google data as well as the identity data we established in our previous post. Ueberauth adds the provider to its data struct so we can pattern match on that value to handle our different use cases. Update the existing register/1 function to handle the identity provider.

# lib/yauth/accounts.ex
defmodule Yauth.Accounts do
  # ...
  def register(%Ueberauth.Auth{provider: :identity} = params) do
    %Account{}
    |> Account.changeset(extract_account_params(params))
    |> Yauth.Repo.insert()
  end
  # ...
end

Now we can add a function head to handle our OAuth-provided data.

# lib/yauth/accounts.ex
defmodule Yauth.Accounts do
  # ...
  def register(%Ueberauth.Auth{} = params) do
    %Account{}
    |> Account.oauth_changeset(extract_account_params(params))
    |> Yauth.Repo.insert()
  end
  # ...
end

This function head calls out to a new changeset function on the Account module.

# lib/yauth/accounts/account.ex
defmodule Yauth.Accounts.Account do
  # ...
  def oauth_changeset(struct, params) do
    struct
    |> cast(params, [:email])
    |> validate_required([:email])
    |> unique_constraint(:email)
  end
  # ...
end

For OAuth data we don’t manage passwords at all but we still want to require an email address and enforce the uniqueness of that email within our application.

Views

Up to this point we’ve added our Ueberauth dependencies, provided the necessary configuration, added routes for social login, and added the controller to handle those requests. All that remains is for us to update our social sign in links to point to the appropriate routes. Update your template with the following:

<!-- lib/yauth_web/templates/session/social_links.html.eex -->
<!-- ... -->
<div class="social-log-in">
  <p>Or log in with</p>
  <%= link(
    "Google",
    to: Routes.auth_path(@conn, :request, "google"),
    class: "button button-outline"
  ) %>
</div>
<!-- ... -->

At this point our users should be able to register for or log in to an account using their Google credentials.

Recap

In this post we expanded our authentication options by letting users use their OAuth service accounts to authenticate with Yauth. We used Google as our example service. We registered our application with Google, added the Ueberauth Google strategy to our application, and provided the controllers, views, and supporting functions to register and log in users who authenticated with Google. To expand our options to Twitter or GitHub or other OAuth providers, we can easily repeat this process with those services.


[1] As an aside, we should note there are more options for handling existing accounts when integrating with OAuth providers. For the sake of simplicity, we allow users with existing identity accounts to use the Google strategy as well. In other words, if you have a Yauth account with a Gmail address and then later use the Google OAuth option with that same Gmail address, you will access the same Yauth account. However, you could not do the same in the opposite direction. The merits and limitations of this approach should be compared with other options when setting up OAuth for production systems. This post chose the simplest path for illustration; using it here is not an endorsement of it as the best way to handle that situation.

Permalink

OTP 23 Highlights

OTP 23 has just been released (May 13th, 2020). It has been a long process, with three release candidates in February, March and April before the final release. We are very thankful for the feedback we have received regarding the release candidates, which revealed some bugs and flaws that our internal testing did not find.

This blog post will describe some highlights of what is new in OTP 23.

You can download the readme describing the changes here: OTP 23 Readme. Or, as always, look at the release notes of the application you are interested in. For instance here: OTP 23 Erts Release Notes.

Language

In OTP 23 we have added some new features to the language and compiler, one has been in the backlog since the bit syntax was introduced and the other is a suggestion from the community.

Matching syntax improvements

Binary matching

In binary matching, the size of the segment to be matched is now allowed to be a guard expression. In the example below the variable Size is bound to the first 8 bits and then it is used in an expression (Size-1)*8 for the size of the following binary.

example1(<<Size:8,Payload:((Size-1)*8)/binary,Rest/binary>>) ->
    {Payload,Rest}.

Matching on maps

Before OTP 23, the key in a map pattern had to be a single value or a literal. This leads to unnatural code if the keys in a map are complex terms.

With OTP 23, the keys in map matching can be guard expressions, as you can see in new_example2 below.

The only limitation is that all variables used in a key expression must be previously bound.

Previously, you had to do it like this:

example2(M, X) ->
    Key = {tag,X},
    #{Key := Value} = M,
    Value.

And now you can do it like this:

new_example2(M, X) ->
    #{ {tag,X} := Value} = M,
    Value.

Below is an illegal example, showing that it is still not supported to use an unbound variable as part of the key-pattern expression. In this case Key is not bound, while the requirement is that all variables used in a key expression must be previously bound.

illegal_example(Key, #{Key := Value}) -> Value.

Numeric literals with underscore

It is now allowed to write numeric literals with underscores between the digits for the purpose of readability. The placement of the underscores is not totally free, though - there are some rules. See examples of allowed use below:

123_456
1_2_3_4_5
123_456.789_123
1.0e1_23
16#DEAD_BEEF
2#1100_1010_0011

And here are some examples of disallowed placement of the underscore:

_123  % variable name
123_
123__456  % only single ‘_’
123_.456
123._456
16#_1234
16#1234_

Distributed spawn and the new erpc module

Improved spawn

The spawn operation is improved regarding scalability and performance for the distributed case, that is, when spawning a process on another node.

New features are also added, such as a distributed spawn_monitor() BIF. This function creates a new process and sets up a monitor atomically.

The spawn_opt() BIF will also support the monitor option for setting up a monitor atomically while creating a process on another node.

We have also added new spawn_request() BIFs for asynchronous spawning of processes. spawn_request() supports all options that spawn_opt() already supports.
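
A minimal sketch of how these BIFs can be used (the node name and MFA are made up for the example):

Node = 'worker@host',

%% create a process on Node and monitor it atomically
{Pid, MRef} = spawn_monitor(Node, my_mod, heavy_work, [42]),

%% asynchronous variant: the outcome of the spawn arrives as a message
ReqId = spawn_request(Node, my_mod, heavy_work, [42], [monitor]),
receive
    {spawn_reply, ReqId, ok, RemotePid} -> {ok, RemotePid};
    {spawn_reply, ReqId, error, Reason} -> {error, Reason}
end.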

The spawn improvements described above can also be used to optimize and improve many of the functions in the rpc module, but since the new functions will not be 100% compatible, we decided to introduce a new module named erpc and keep the old rpc module as well.

The erpc module implements an enhanced subset of the operations provided by the rpc module.

"Enhanced" in the sense that it makes it possible to distinguish between a returned value, raised exceptions, and other errors.
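
For example (a sketch, assuming Node is bound to a connected node):

%% the value is returned directly...
[1,2,3] = erpc:call(Node, lists, sort, [[3,1,2]]),

%% ...while an exception raised remotely is re-raised locally, wrapped so
%% that it can be told apart from a local error
try
    erpc:call(Node, erlang, error, [boom])
catch
    error:{exception, boom, _Stack} -> remote_error
end.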

erpc also has better performance and scalability than the original rpc implementation, achieved by utilizing the newly introduced spawn_request() BIF.

The rpc module now shares the same implementation as erpc, so users of rpc automatically benefit from the performance and scalability improvements made in erpc.

The pictures below illustrate the old and the new implementation of rpc:call() and show why the new one is more efficient and scalable.

“Old” rpc:call implementation: [old rpc illustration]

New rpc:call implementation, with support in the distribution protocol (spawn request): [new rpc illustration]

As you can see in the “old” implementation above, rpc:call relies on the rex process on the receiving node to spawn a temporary process to execute the called function. This makes rex a bottleneck if there are many simultaneous rpc:calls towards the node.

The new solution does not use rex at all and lets the spawned process decode the arguments of the call, thus avoiding some unnecessary copying of data that occurs in the “old” implementation.

gen_tcp and the new socket module

In OTP 22 we introduced the new experimental socket API. The idea behind this API is to have a stable intermediary API that can be used to create features that are not part of the higher-level gen_* APIs.

We have now come one step further in our plan to replace the inet driver by making it possible to use the gen_tcp API with socket as an optional back-end.

To make it easy to test existing gen_tcp code, a new option {inet_backend, socket | inet} can be used to select the socket implementation instead of the default inet implementation. This option must be put first in the option list passed to gen_tcp:listen, gen_tcp:connect and gen_tcp:fdopen, which are all functions that create a socket. For example:

{ok,Socket} = gen_tcp:connect(Addr,Port,[{inet_backend,socket}|OtherOpts])

The returned Socket is a '$inet' tagged 3-tuple instead of a port, so all other API functions will use the right implementation for the socket.

A more general override is to use the Kernel configuration variable inet_backend and set it to socket or inet. For example, on the erl command line:

erl -kernel inet_backend socket

or set it with

ERL_FLAGS="-kernel inet_backend socket"

Help in the shell

We have implemented EEP 48 which specifies a storage format for API documentation to be used by BEAM languages. By standardizing how API documentation is stored, it will be possible to write tools that work across languages.

The ordinary doc build is extended with the generation of .chunk files for all OTP modules. You can run make docs DOC_TARGETS=chunks to build only the EEP 48 chunks. Running just make docs without setting the DOC_TARGETS variable will build all formats (html, man, pdf, chunks).

Built on top of these new features, we have added on-line help in the shell via the functions:

h(Module) 
h(Module,Function)
h(Module,Function,Arity)

There are also corresponding functions ht/1,2,3 and hcb/1,2,3 to get help about types and callback functions.

We have added a new module shell_docs in stdlib with functions for rendering documentation for a shell. This can be used for instance by Development Environments such as those based on the Language Server Protocol (LSP).

The code module also got a new function, get_doc/1, which returns the doc chunk without loading the module.
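
A small sketch of how these pieces fit together:

%% fetch the EEP 48 doc chunk without loading the module and render it
{ok, Docs} = code:get_doc(lists),
io:put_chars(shell_docs:render(lists, Docs)).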

See the example below for getting the documentation for lists:sort/2:

4> h(lists,sort,2).

  -spec sort(Fun, List1) -> List2
                when
                    Fun :: fun((A :: T, B :: T) -> boolean()),
                    List1 :: [T],
                    List2 :: [T],
                    T :: term().

  Returns a list containing the sorted elements of List1,
  according to the ordering function Fun. Fun(A, B) is to
  return true if A compares less than or equal to B in the
  ordering, otherwise false.
ok

Improved tab-completion

The tab-completion in the shell is also improved. Previously, tab-completion for modules only worked for already loaded modules; now it works for all modules available in the code path. Completion has also been extended to work inside the “help” functions h, ht and hcb. You can, for example, press tab like in the example below and get all modules beginning with l:

5> h(l
lcnt                      leex                      lists                     
local_tcp                 local_udp                 log_mf_h                  
logger                    logger_backend            logger_config             
logger_disk_log_h         logger_filters            logger_formatter          
logger_h_common           logger_handler_watcher    logger_olp                
logger_proxy              logger_server             logger_simple_h           
logger_std_h              
logger_sup

Or complete all functions beginning with s in the lists module like this:

5> h(lists,s
search/2     seq/2        seq/3        sort/1       sort/2       split/2      
splitwith/2  sublist/2    sublist/3    subtract/2   suffix/2     sum/1        

“Container friendly” features

Take CPU quotas into account

CPU quotas are now taken into account when deciding the default number of online schedulers.

Thus, automatically making Erlang a good citizen in container environments where quotas are applied, such as docker with the --cpus flag.
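
For example, in a container started with --cpus 2 you would expect something like this (illustrative output):

1> erlang:system_info(schedulers_online).
2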

EPMD independence

In a cloud- and container-based environment, it might be interesting to run distributed Erlang nodes without the use of epmd, using a hard-coded port or an alternative service discovery mechanism instead. Because of this we introduce ways to make it easier to start and configure systems without epmd.

Handshake

We have improved the handshake performed during connection setup in the Erlang distribution protocol. It is now possible to agree on the protocol version without depending on epmd or other prior knowledge of the peer node's version.

Dynamic node name

Another feature introduced together with the new handshake is the dynamic node name. A dynamic node name is chosen by using the options -name Name or -sname Name and setting Name to undefined.

These options make the Erlang runtime system into a distributed node. They invoke all network servers necessary for a node to become distributed; see net_kernel. It is also ensured that epmd runs on the current host before Erlang is started; see epmd and the -start_epmd option.

The new feature in OTP 23 is that Name can be set to undefined and then the node will be started in a special mode optimized to be the temporary client of another node. When enabled the node will request a dynamic node name from the first node it connects to. In addition these distribution settings will be implied:

erl -dist_listen false -hidden -dist_auto_connect never

Because -dist_auto_connect is set to never, net_kernel:connect_node/1 has to be called manually in order to start the distribution. If the distribution channel is closed while a node uses a dynamic node name, the node stops the distribution and a new call to net_kernel:connect_node/1 has to be made. Note that the node name may change if the distribution is dropped and then set up again.
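
A sketch of what this looks like in practice (node names are illustrative):

%% started with: erl -sname undefined
1> net_kernel:connect_node('test@localhost').
true
2> node(). %% now returns the name assigned by test@localhost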

Note! The dynamic node name feature is supported from OTP 23. Both the temporary client node and the first connected peer node (supplying the dynamic node name) must be at least OTP 23 for it to work.

New options to control the use of epmd

To give the user more control over the use of epmd, some new options for the inet distribution have been added.

  • -dist_listen false Set up the distribution channel, but do not listen for incoming connections. This is useful when you want to use the current node to interact with another node on the same machine without it joining the entire cluster.

  • -erl_epmd_port Port Configure a default port that the built-in EPMD client should return. This allows the local node to know the port to connect to for any other node in the cluster.
  • -remsh Node Starts Erlang with a remote shell connected to Node. If no -name or -sname is given the node will be started using -sname undefined. If Node is using long names then you should give -name undefined. If Node does not contain a hostname, one is automatically taken from the -name or -sname option.

    Note Before OTP-23 the user needed to supply a valid -sname or -name for -remsh to work. This is still the case if the target node is not running OTP-23 or later.

# starting the E-node test
erl -sname test@localhost 

# starting a temporary E-node (with dynamic name) as a remote shell to
# the node test
erl -remsh test@localhost 

The erl_epmd callback API has also been extended to allow returning -1 as the creation, which means that a random creation will be used by the node.

In addition a new callback function called listen_port_please has been added that allows the callback to return which listen port the distribution should use. This can be used instead of inet_dist_listen_min/max if the listen port is to be fetched from an external service.

New option for erl_call

erl_call is a C program originally bundled as an example inside the erl_interface application. erl_interface contains C libraries for communicating with Erlang nodes and letting C programs behave as if they were Erlang nodes, so-called C nodes. erl_call has become popular and is used in products, mainly for administration of an Erlang node on the same host. In OTP 23, erl_call is installed under the same path as erl, making it available in the path without having to bother about the erl_interface version. Another new thing in erl_call is the address option, which can be used to connect directly to a node without depending on epmd to resolve the node name.

AFAIK erl_call is being used in the upcoming version of relx (used by rebar3) for the node_tool function.

TLS enhancements and changes

TLS-1.3 is now supported (in OTP 22 we classed it as experimental) but not yet feature complete. Key features supported are:

  • session tickets
  • refreshing of session keys
  • RSASSA-PSS signatures
  • Middlebox compatibility.

The “early data” feature is not yet supported. Early data is an optimization introduced in TLS 1.3 which allows a client to send data to a server in the first round trip of a connection, without waiting for the TLS handshake to complete if the client has spoken to the same server recently.

In OTP 23, TLS 1.3 is by default announced as the preferred protocol version by both client and server. Users who do not explicitly configure the TLS versions should be aware of this, since it can have an impact on interoperability.

A new option, exclusive, is provided for ssl:cipher_suites/2,3, and ssl:versions/0 is extended to better reflect which versions of TLS are available in the current Erlang/OTP setup.
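
For example:

ssl:versions().                          %% supported and enabled TLS/DTLS versions
ssl:cipher_suites(exclusive, 'tlsv1.3'). %% suites that exist only in TLS 1.3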

Also note that we have removed support for the legacy TLS version SSL-3.0.

SSH

Two notable SSH features were provided as Pull Requests from open source users, namely support for fetching keys from ssh-agents and TCP/IP port forwarding. Port forwarding is sometimes called tunneling or tcp-forward/direct-tcp. In the OpenSSH client, port forwarding corresponds to the options -L and -R.

Using keys stored in an ssh-agent improves security, while port forwarding is often used to get an encrypted tunnel between two hosts. In the area of key handling, the default key plugin ssh_file.erl has been rewritten and extended with the OpenSSH file format “openssh-key-v1”. A limitation so far is that keys in the new format cannot be encrypted. The default plugin now also takes port numbers into account, which increases security.

The SSH application can now be configured in an Erlang config file. This makes it possible to, for example, change the supported algorithm set without code changes.

Crypto

A new crypto API was introduced in OTP-22.0. The main reason for a new API was to use the OpenSSL libcrypto EVP API that enables HW acceleration, if the machine supports it. The naming of crypto algorithms is also systemized and now follows the schema in OpenSSL.
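
As a small sketch of the newer one-shot API and the OpenSSL-style naming:

Key = crypto:strong_rand_bytes(32),
IV = crypto:strong_rand_bytes(16),
%% cipher names follow the OpenSSL schema, e.g. aes_256_ctr
Encrypted = crypto:crypto_one_time(aes_256_ctr, Key, IV, <<"hello">>, true),
<<"hello">> = crypto:crypto_one_time(aes_256_ctr, Key, IV, Encrypted, false).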

There are parts of the Crypto app that are using very old APIs while other parts are using the latest one. It turned out that using the old API in the new way, and still keeping it backwards compatible, was not possible.

Therefore the old API is kept for now but it is implemented with new primitives. The Old API is deprecated in OTP-23.0 and will be removed in OTP-24.0.

Permalink

Simple TDD in Erlang

Or how to use Pattern-Matching for Tests

While acting as a mentor on the FutureLearn MOOC about Erlang I presented an idea that folks like Adolfo Neto loved (he even tweeted about it 🧡). It is, in fact, the way I introduce people to pattern-matching when I’m teaching them Erlang. It’s a way to write tests that let you naturally work your code out from them… Sounds familiar? Yes! It’s Test-Driven Development!

Airplane! (1980)

The Process

I learned TDD when I learned Smalltalk. It was such a life-changing lesson! And Smalltalk was the best environment to learn it in, since it's built for it. In Smalltalk, I believe it's actually harder to write code with a different process. If you didn't try it already, you definitely should! Follow Hernan Wilkinson; his company (10pines) delivers Smalltalk courses where you can learn this from the experts, among several other great things.

Anyway, for the uninitiated, the general process for TDD is…

[Diagram of the TDD cycle - original here]

What's not described in the graph (particularly for compiled languages like Erlang) is that after adding a test, in order to see that test fail, you first need the code to compile. And that, at first glance, may seem difficult. But it's actually possible, using a nice hot-code-loading-related feature of Erlang.

Let’s dive in with an example…

Do we have an extra day?

So, let's say you have to write a module year with a single function: is_leap/1. As you might guess, the idea is for that function to receive an integer and return a boolean telling you whether it's a leap year or not.

Add Tests

For the first step, we will already use the magic trick I mentioned above. Look at the module we’ll write…

-module year.
-export [test/0].

test() ->
  false = year:is_leap(1),
  ok.

Let’s dissect this thing…

The first two lines should be pretty obvious. We define the module and export test/0, which is the function we’ll use to run the tests.

Then we write the test/0 function. That function will just contain a series of assertions like most test functions do, specifying the expected results of our function is_leap/1.

The first assertion says that 1 is not a leap year. But pay attention to the fact that even though we're going to eventually define is_leap/1 in this same module, we're using the fully-qualified version of it (year:is_leap(…)) instead of just writing is_leap(1). Why? Hot-Code Reloading, of course!

Ron Damón as confused as you

The Erlang compiler will fail if it finds a call to a local function that is not defined: it won't compile your module and therefore you won't be able to run the test. Except… if you use a fully-qualified call. Because, thanks to the logic behind hot code loading, you can totally define that function in a later version of the module.

That is super-useful for us since now we can…

See Tests Fail

1> c(year).
{ok, year}
2> year:test().
** exception error: undefined function year:is_leap/1
in function year:test/0 (year.erl, line 5)

Write Code

Great! Now our test tells us that we need to define is_leap/1 … So we do!

-module year.
-export [test/0].
-export [is_leap/1].

test() ->
  false = year:is_leap(1),
  ok.

is_leap(_) -> false.

Run Tests

We try compiling and running our tests in a more succinct way, so we can just press the up arrow each time we go to the console… oh, lazy lazy developers!

3> c(year), year:test().
ok.

Rinse and Repeat…

Well… this is good! Now let’s add more assertions to our tests…

-module year.
-export [test/0].
-export [is_leap/1].

test() ->
  % regular years are not leap
  false = year:is_leap(1),
  false = year:is_leap(1995),
  false = year:is_leap(1997),
  % multiples of 4 are leap…
  true = year:is_leap(1996),
  true = year:is_leap(2004),
  true = year:is_leap(2008),
  % …except if they're multiples of 100…
  false = year:is_leap(1800),
  false = year:is_leap(1900),
  false = year:is_leap(2100),
  % …except if they're also multiples of 400…
  true = year:is_leap(1600),
  true = year:is_leap(2000),
  true = year:is_leap(2400),
  ok.

is_leap(_) -> false.

…aaand…

5> c(year), year:test().
** exception error: no match of right hand side value false
in function year:test/0 (year.erl, line 11)

Line 11 is the one for 1996 (as expected)… We now fix our code for that case, run the tests again… Many iterations go by… etc…

I’ll jump to the final version for brevity…

-module year.
-export [test/0].
-export [is_leap/1].

test() ->
  % regular years are not leap
  false = year:is_leap(1),
  false = year:is_leap(1995),
  false = year:is_leap(1997),
  % multiples of 4 are leap…
  true = year:is_leap(1996),
  true = year:is_leap(2004),
  true = year:is_leap(2008),
  % …except if they're multiples of 100…
  false = year:is_leap(1800),
  false = year:is_leap(1900),
  false = year:is_leap(2100),
  % …except if they're also multiples of 400…
  true = year:is_leap(1600),
  true = year:is_leap(2000),
  true = year:is_leap(2400),
  ok.

is_leap(Year) ->
  is_leap(Year rem 4, Year rem 100, Year rem 400).

is_leap(_, _, 0) -> true;
is_leap(_, 0, _) -> false;
is_leap(0, _, _) -> true;
is_leap(_, _, _) -> false.

et voilà…

7> c(year), year:test().
ok

More Complex Stuff

Of course, this technique only works for very small pieces of code that should not reach production. You don’t want that test code messing up your releases!

But if you choose Common Test as your testing framework, a suite for this module will look eerily familiar…

-module year_SUITE.
-export [all/0].
-export [is_leap/1].

all() -> [is_leap].

is_leap(_) ->
  % regular years are not leap
  false = year:is_leap(1),
  false = year:is_leap(1995),
  false = year:is_leap(1997),
  % multiples of 4 are leap…
  true = year:is_leap(1996),
  true = year:is_leap(2004),
  true = year:is_leap(2008),
  % …except if they're multiples of 100…
  false = year:is_leap(1800),
  false = year:is_leap(1900),
  false = year:is_leap(2100),
  % …except if they're also multiples of 400…
  true = year:is_leap(1600),
  true = year:is_leap(2000),
  true = year:is_leap(2400),
  {comment, ""}.

What do you think? Did I convince you to use Test-Driven Development for Erlang? 😉


Simple TDD in Erlang was originally published in Erlang Battleground on Medium, where people are continuing the conversation by highlighting and responding to this story.

Permalink

Authentication in Elixir Web Applications with Ueberauth and Guardian: Part 2

This is the second post in a series covering authentication in a Phoenix application using Guardian and Ueberauth. In the first post, we created a basic Phoenix application with pages for registering for an account and logging into an account, as well as setting up our test framework for outside-in testing. In this post, we will add the ability for a user to create an account with the application using an email address and password.

Test driving account creation from the outside in

As with the first post, a great place to begin is by adding a test from the user’s perspective for registering for an account. This feature test will visit the registration page, fill in the required information, submit the request, and make assertions that verify the user is logged in to their new account.

# test/yauth_web/features/registering_for_an_account_test.exs
defmodule YauthWeb.Features.RegisteringForAnAccountTest do
  use YauthWeb.FeatureCase, async: true
  import Query, only: [fillable_field: 1, button: 1]

  @email_field fillable_field("account[email]")
  @password_field fillable_field("account[password]")
  @password_confirmation_field fillable_field("account[password_confirmation]")
  @submit_button button("Register")

  test "Registering for an account with valid info", %{session: session} do
    session =
      session
      |> visit("/register")
      |> fill_in(@email_field, with: "me@example.com")
      |> fill_in(@password_field, with: "superdupersecret")
      |> fill_in(@password_confirmation_field, with: "superdupersecret")
      |> click(@submit_button)

    assert_text(session, "Hello, me@example.com")
  end
end

This test will fail when we run it because the text the test expects to see on the page isn’t present. Unfortunately, this test doesn’t reveal the source of this failure. If we run through the feature in a browser, we’ll see that the browser is displaying a 404 page. Displaying a 404 page is the default behavior in Phoenix (when MIX_ENV is not “dev”) when a route is requested that does not exist. Our form on the registration page submits its data to the server as a POST request to “/register” that our router does not yet support.

To fix that, add a POST route to our Router for the “/register” path.

# lib/yauth_web/router.ex
defmodule YauthWeb.Router do
  use YauthWeb, :router

  # ...
  scope "/", YauthWeb do
    pipe_through [:browser, :authentication_views]
    get "/register", RegistrationController, :new
    post "/register", RegistrationController, :create
    # ...
  end
end

Running the test again reveals that our RegistrationController does not have a create/2 function.

Creating an account from user-provided parameters

Up to this point in our posts, we’ve largely been adding views and templates. Now things are starting to get interesting. Now that we have a POST route, we will need code that receives the incoming parameters, ensures those parameters are valid, creates an account, and adds the account token to the browser session. That’s a fair amount of work to do and we don’t want all of that sitting in our controller. Let’s break it down one step at a time.

First, add the create/2 function to our RegistrationController.

# lib/yauth_web/controllers/registration_controller.ex
defmodule YauthWeb.RegistrationController do
  use YauthWeb, :controller
  plug Ueberauth
  # ...
  def create(%{assigns: %{ueberauth_auth: auth_params}} = conn, _params) do
    # ...
  end
end

In Phoenix, most controller functions that handle POST requests work with the second argument to the function, as it contains the form parameters. By plugging Ueberauth into our controller, however, Ueberauth adds the form parameters to the assigns field of the %Plug.Conn{} and places them within the standard data structure provided and expected by Ueberauth functions. Instead of getting the data we need from the parameters, then, we will get it from the connection assigns by pattern matching on that structure to extract the data we need.

Now that we have bound a variable to those parameters, we need to validate them and create an account. Those responsibilities are better handled outside of our controller function. A popular pattern in Phoenix development is to use “contexts”—slices of our application domain that provide an API to the rest of the application code. As our user is registering for an “account”, an “Accounts” context seems a fitting place for our controller to send the parameters for processing. Let’s “write the code we wish we had” and then fill in the details:

# lib/yauth_web/controllers/registration_controller.ex
defmodule YauthWeb.RegistrationController do
  # ...
  plug Ueberauth
  alias Yauth.Accounts

  def create(%{assigns: %{ueberauth_auth: auth_params}} = conn, _params) do
    case Accounts.register(auth_params) do
      {:ok, account} ->
        redirect(conn, to: Routes.profile_path(conn, :show))

      {:error, changeset} ->
        render(conn, :new,
          changeset: changeset,
          action: Routes.registration_path(conn, :create)
        )
    end
  end
end

Here our controller sends the parameters to a register/1 function and expects to receive back either the tuple {:ok, account} with a newly created account or an error tuple with a changeset. If an account is created and returned, we will redirect the user to their profile page. If not, we will show them the form again, along with any errors present on the changeset.

Before diving into that function, let’s pause and add the dependencies and configuration we’ll need to work with Ueberauth. Add the following dependencies to your project:

defmodule Yauth.MixProject do
  # ...
  defp deps do
    [
      # ...
      {:ueberauth, "~> 0.6.3"},
      {:ueberauth_identity, "~> 0.3.0"},
      # ...
    ]
  end
  # ...
end

Next, add the following Ueberauth configuration:

# config/config.exs
# ...
config :ueberauth, Ueberauth,
  providers: [
    identity: {Ueberauth.Strategy.Identity, [
      param_nesting: "account",
      request_path: "/register",
      callback_path: "/register",
      callback_methods: ["POST"]
    ]}
  ]

# ...
import_config "#{Mix.env()}.exs"

Ueberauth is a two-phased authentication approach. In the first phase the user is challenged to present their credentials. In the second phase those credentials are validated by the service presenting the challenge—be that your application or an OAuth provider. To accomplish this two phase approach while maximizing flexibility, Ueberauth provides a common “strategy” interface that allows developers to implement different authentication strategies (e.g. Google OAuth, Twitter OAuth, GitHub OAuth, etc). Strategies are available for many common authentication options. We are using the “identity” strategy, so called because the user provides information about their identity such as an email address and password of their choosing.

As we are using an Account changeset for our challenge-phase form, Phoenix prefixes each of our form fields with account and submits the data nested under an “account” key. As such, we need to tell the Identity strategy to find its data nested under “account” so we use the param_nesting configuration option. Most OAuth authentication options send data about an authenticated resource as a GET request but our form is submitted as a POST request. We allow the Identity strategy to use that request by adding the callback_methods configuration options.

With that configuration accomplished, we can now return to our Accounts context and implement our register/1 function. Up to this point, our feature test has been sufficient to drive our changes. With the introduction of our “context” module, now is a good time to drop down a level of the testing pyramid and introduce an integration test for this register/1 function. What does that register function need to do? Let’s stipulate a few requirements up front:

  • If it receives a new email and matching passwords, it should create a new account.
  • If it receives an email that already exists, it should not create a new account; email addresses should be unique in the application.
  • If it receives a password and password confirmation that do not match, it should not create a new account.

For the sake of brevity, let’s jump ahead a bit in the test-driven process and add all of the tests for those requirements in one go.

# test/yauth/accounts_test.exs
defmodule Yauth.AccountsTest do
  use Yauth.DataCase

  alias Yauth.Accounts
  alias Yauth.Accounts.Account

  test "register for an account with valid information" do
    pre_count = count_of(Account)
    params = valid_account_params()

    result = Accounts.register(params)

    assert {:ok, %Account{}} = result
    assert pre_count + 1 == count_of(Account)
  end

  test "register for an account with an existing email address" do
    params = valid_account_params()
    Repo.insert!(%Account{email: params.info.email})

    pre_count = count_of(Account)

    result = Accounts.register(params)

    assert {:error, %Ecto.Changeset{}} = result
    assert pre_count == count_of(Account)
  end

  test "register for an account without matching password and confirmation" do
    pre_count = count_of(Account)
    %{credentials: credentials} = params = valid_account_params()

    params = %{
      params
      | credentials: %{
          credentials
          | other: %{
              password: "superdupersecret",
              password_confirmation: "somethingelse"
            }
        }
    }

    result = Accounts.register(params)

    assert {:error, %Ecto.Changeset{}} = result
    assert pre_count == count_of(Account)
  end

  defp count_of(queryable) do
    Yauth.Repo.aggregate(queryable, :count, :id)
  end

  defp valid_account_params do
    %Ueberauth.Auth{
      credentials: %Ueberauth.Auth.Credentials{
        other: %{
          password: "superdupersecret",
          password_confirmation: "superdupersecret"
        }
      },
      info: %Ueberauth.Auth.Info{
        email: "me@example.com"
      }
    }
  end
end

The Accounts context

The first step in getting this test to pass is to create the Accounts module and its register/1 function. That function will need to build an account changeset that validates the provided parameters and then attempt to insert that changeset into the database. The changeset validation itself, however, can live on the Account module. Add this Accounts module and register/1 function:

# lib/yauth/accounts.ex
defmodule Yauth.Accounts do
  alias Yauth.Repo
  alias __MODULE__.Account

  def register(%Ueberauth.Auth{} = params) do
    %Account{}
    |> Account.changeset(extract_account_params(params))
    |> Repo.insert()
  end

  defp extract_account_params(%{credentials: %{other: other}, info: info}) do
    info
    |> Map.from_struct()
    |> Map.merge(other)
  end
end

This implementation depends on the existence of an Account module. As we will want to persist this data to the database, we will need both a migration to set up the data table and the actual implementation module. The easiest way to create these is to use the generator provided by Phoenix. This schema will require both an email address and an encrypted version of the user’s password. Run the following command to create those modules:

mix phx.gen.schema Accounts.Account accounts \
  email:string encrypted_password:string

In order to support the requirement that an account email be unique, we will need to add a unique index to the migration. Add the following to the end of the change/0 function in our migration:

# priv/repo/migrations/<timestamp>_create_accounts.exs
  def change do
     # ...
    create unique_index(:accounts, [:email])
  end

At this point we can migrate our database.

mix ecto.migrate

Ideally we would continue down the testing pyramid into unit tests before implementing this module, but we’ll skip that for the sake of brevity and lean solely on our higher level tests. The next step, then, is to implement the changeset function that will validate the provided data and prepare it for insertion into the database. This changeset will do several things. It will:

  • validate that email and password have been provided
  • validate that the password and the password confirmation match
  • convert a database-level unique constraint error to a changeset error for the email address
  • hash the provided password and place the result on the changeset under the :encrypted_password field

In order to avoid storing plain-text passwords we will want a password-hashing package. I’ve chosen to use the Argon2 package for Elixir. Add the package to our mix file and get the dependency.

  # ...
  defp deps do
    [
      # ...
      {:argon2_elixir, "~> 2.3"},
      # ...
    ]
  end
mix deps.get
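
Before wiring it into the changeset, here is a quick sketch of what the package gives us (values are illustrative; the exact hash string depends on the configured Argon2 parameters):

# In iex -S mix, after fetching the dependency:
hash = Argon2.hash_pwd_salt("superdupersecret")
# => "$argon2id$..." (a salted hash; the plain-text password is never stored)

Argon2.verify_pass("superdupersecret", hash)
# => true
Argon2.verify_pass("not-the-password", hash)
# => false

We will use hash_pwd_salt/1 below when building the changeset; verify_pass/2 is what a log-in flow would use to check a submitted password against the stored hash.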

With that in place we can add the following to our Account module, noting the addition of the virtual field :password. This field does not exist in our database table but is present in the Elixir schema. This allows us to receive the password from the user in plain text while only storing its Argon2 hash in the database under the :encrypted_password field.

# lib/yauth/accounts/account.ex
defmodule Yauth.Accounts.Account do
  use Ecto.Schema
  import Ecto.Changeset

  schema "accounts" do
    field :email, :string
    field :password, :string, virtual: true
    field :encrypted_password, :string

    timestamps()
  end

  def changeset(struct, params) do
    struct
    |> cast(params, [:email, :password])
    |> validate_required([:email, :password])
    |> validate_confirmation(:password, required: true)
    |> unique_constraint(:email)
    |> put_encrypted_password()
  end

  defp put_encrypted_password(%{valid?: true, changes: %{password: pw}} = changeset) do
    put_change(changeset, :encrypted_password, Argon2.hash_pwd_salt(pw))
  end

  defp put_encrypted_password(changeset) do
    changeset
  end
end

This function receives an account struct and params, validates that the email and password are present, validates that the password confirmation matches the password, converts a database unique-constraint violation on the email into a changeset error, and finally, for a valid changeset, uses the Argon2 package to hash the plain-text password and place the result in :encrypted_password.
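
As a quick sanity check, the changeset behaves roughly like this in iex (illustrative values; the unique email check only surfaces when Repo.insert/1 hits the database constraint):

alias Yauth.Accounts.Account

good =
  Account.changeset(%Account{}, %{
    email: "me@example.com",
    password: "superdupersecret",
    password_confirmation: "superdupersecret"
  })

good.valid?
# => true
Map.has_key?(good.changes, :encrypted_password)
# => true, holding the Argon2 hash rather than the plain text

bad =
  Account.changeset(%Account{}, %{
    email: "me@example.com",
    password: "superdupersecret",
    password_confirmation: "something-else"
  })

bad.valid?
# => false
bad.errors[:password_confirmation]
# => {"does not match confirmation", [validation: :confirmation]}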

Now that a changeset is available we can begin using it in our controllers and templates. To keep our controller communicating exclusively with our context, we can add a convenience function to our Accounts context that will provide that changeset.

defmodule Yauth.Accounts do
  # ...
  def change_account(account \\ %Account{}) do
    Account.changeset(account, %{})
  end
  # ...
end

Update our RegistrationController to use that changeset and provide the action to submit the form using our routing helper.

defmodule YauthWeb.RegistrationController do
  # ...
  def new(conn, _) do
    render(conn, :new,
      changeset: Accounts.change_account(),
      action: Routes.registration_path(conn, :create)
    )
  end
  # ...
end

Our template is already referencing the @changeset and @action assigns so we don’t need to make an update there.

Logging in a newly created account

At this point the RegistrationController is directing the creation of an account from an email address and password. We also need, however, to log in the account so the account owner can access their resources. This is where Guardian enters the picture.

The Guardian package uses JSON Web Tokens (JWTs) to provide the mechanism for authentication. Guardian can be configured to inspect various parts of a web request for a token and validate it. It also gives us the ability to add that token and the resource it represents to the browser session, providing us with a “logged in” experience.

To use Guardian, first add it as a dependency.

  defp deps do
    [
      # ...
      {:guardian, "~> 2.1"},
      # ...
    ]
  end
mix deps.get

Incorporating Guardian

In order to work with Guardian we’ll need to make several additions to our project. We will need to create a module that implements the Guardian behaviour and provides two key callback functions. We will need an error handling module that is called in the event a user attempts to access a protected resource without authenticating. We will build a pipeline module that tells Guardian where to look for tokens in the web request. Lastly, we’ll need to provide some configuration values for Guardian. We will walk through each of these one at a time.

The central module of our Guardian system is an implementation module for the Guardian behaviour. This module provides two key functions for interacting with our JWT. Many examples of using Guardian name the module Guardian within the application namespace. I’d like, however, to give the module the more generic name Authentication, in anticipation of giving it other authentication-related functions that will place the external library behind our own API. Add the module below with these two callback functions.

# lib/yauth_web/authentication.ex
defmodule YauthWeb.Authentication do
  @moduledoc """
  Implementation module for Guardian and functions for authentication.
  """
  use Guardian, otp_app: :yauth
  alias Yauth.Accounts

  def subject_for_token(resource, _claims) do
    {:ok, to_string(resource.id)}
  end

  def resource_from_claims(%{"sub" => id}) do
    case Accounts.get_account(id) do
      nil -> {:error, :resource_not_found}
      account -> {:ok, account}
    end
  end
end

This module implements the two functions needed to work with Guardian. The first is subject_for_token/2. The job of this function is to encode a resource into the token. In Yauth our resource is an Account but this could be a User, a Player, or any number of other schemas depending on our application context and naming. The point is that the schema that represents the entity being authenticated is encoded into the token so that subsequent requests can validate and load that resource from the token. In our implementation we are using the unique database id of the resource for encoding. Other attributes could be used for this purpose provided they uniquely identify the resource within our application.

The second function, resource_from_claims/1, does the opposite. It receives the decoded token as an argument and uses the subject identifier to load the resource from the database. Our subject_for_token/2 function encoded the account id, so we use that value to find the account it represents in the database and return it as part of a tuple. We will need to add the function used to load the account to our Accounts module.

# lib/yauth/accounts.ex
defmodule Yauth.Accounts do
  # ...

  def get_account(id) do
    Repo.get(Account, id)
  end
end
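
With get_account/1 in place, we can sketch the round trip through the implementation module in iex using the encode_and_sign and resource_from_token helpers that Guardian generates for us (the account id and claim values here are illustrative):

account = Yauth.Accounts.get_account(1)

{:ok, token, claims} = YauthWeb.Authentication.encode_and_sign(account)
claims["sub"]
# => "1", the value our subject_for_token/2 encoded
claims["typ"]
# => "access", Guardian's default token type

{:ok, loaded, _claims} = YauthWeb.Authentication.resource_from_token(token)
loaded.id
# => 1, loaded back out via resource_from_claims/1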

Next, we’ll want to define a pipeline module that can be used by our router to tell Guardian where to look for tokens and what to do in the event it finds one.

# lib/yauth_web/authentication/pipeline.ex
defmodule YauthWeb.Authentication.Pipeline do
  use Guardian.Plug.Pipeline,
    otp_app: :yauth,
    error_handler: YauthWeb.Authentication.ErrorHandler,
    module: YauthWeb.Authentication

  plug Guardian.Plug.VerifySession, claims: %{"typ" => "access"}
  plug Guardian.Plug.LoadResource, allow_blank: true
end

This pipeline receives the %Plug.Conn{} that represents the web request and passes it through a series of Guardian plugs. The first plug tells Guardian to look in the session for a token and to validate it if found. It also restricts the token type to an access token. If a token is found and validated, the LoadResource plug invokes the resource_from_claims/1 function in our implementation module to load the resource and adds it to the private section of the %Plug.Conn{}. We also pass the LoadResource plug the allow_blank: true option to allow the resource to be nil rather than raise an exception if the resource isn’t found. This allows us to deal with that situation elsewhere in the stack.

Our pipeline above references the YauthWeb.Authentication.ErrorHandler module, which we can create now.

# lib/yauth_web/authentication/error_handler.ex
defmodule YauthWeb.Authentication.ErrorHandler do
  use YauthWeb, :controller

  @behaviour Guardian.Plug.ErrorHandler

  @impl Guardian.Plug.ErrorHandler
  def auth_error(conn, {type, _reason}, _opts) do
    conn
    |> put_resp_content_type("text/plain")
    |> send_resp(401, to_string(type))
  end
end

The function in this error handler module is invoked if a user tries to access a protected resource without authentication. For the time being, we’ll just have it send a 401 response. Once we’ve added the authentication and log in code, we’ll return to this and make it friendlier for browser users.

Finally, we can provide some configuration for Guardian. Add the following to our config/config.exs file:

# ...
config :yauth, YauthWeb.Authentication,
  issuer: "yauth",
  secret_key: System.get_env("GUARDIAN_SECRET_KEY")

# ...
import_config "#{Mix.env()}.exs"

Here we define the issuer of the JWT as the “yauth” application. We also provide a secret_key used for encoding the token. We can generate the secret key by running mix guardian.gen.secret. We don’t want to store this value in source control, though, so we will want to add it to our secrets management strategy and reference it from there. Here we load it from an environment variable, so be sure to export it in our shell.

export GUARDIAN_SECRET_KEY=whatever-key-was-generated

Now that we have the implementation module, pipeline, error handler, and configuration in place, we can begin using it in the rest of the application. Our first step is to add our pipeline to the Router.

defmodule YauthWeb.Router do
  # ...
  pipeline :guardian do
    plug YauthWeb.Authentication.Pipeline
  end

  # ...
  scope "/", YauthWeb do
    pipe_through [:browser, :guardian]

    get "/register", RegistrationController, :new
    post "/register", RegistrationController, :create
    get "/login", SessionController, :new
  end
  # ...
end

Here we created a router pipeline called :guardian that simply calls through to our pipeline module, and we added it to the pipe_through call for all of our registration and session routes. Remember that the pipeline does not log in or authenticate the resource. It simply checks for and validates a token in the session and, if one is found, loads the corresponding resource onto the connection. We want this on these routes so that we can later redirect an already-authenticated user who visits them straight to their resources without issuing another authentication challenge.
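
That redirect is not wired up yet, but as a sketch of the idea, a small plug (hypothetical, not part of Guardian) could bounce already-authenticated visitors away from the registration and log in pages:

# lib/yauth_web/plugs/redirect_if_authenticated.ex (hypothetical sketch)
defmodule YauthWeb.Plugs.RedirectIfAuthenticated do
  import Plug.Conn
  import Phoenix.Controller, only: [redirect: 2]

  alias YauthWeb.Router.Helpers, as: Routes

  def init(opts), do: opts

  def call(conn, _opts) do
    # The :guardian pipeline has already loaded the resource (or left it nil).
    case Guardian.Plug.current_resource(conn) do
      nil ->
        conn

      _account ->
        conn
        |> redirect(to: Routes.profile_path(conn, :show))
        |> halt()
    end
  end
end

It would need to run after YauthWeb.Authentication.Pipeline, for example in a dedicated router pipeline, and the profile route it points at is added later in this post.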

We can now add the log in functionality to our RegistrationController. To “log in” the account we will want to add the token to the browser session that is passed back and forth between the browser and server. Rather than add that as part of the controller directly, however, we’ll add functions to our Authentication module to take care of that.

First, add the code we wish we had to the controller.

# lib/yauth_web/controllers/registration_controller.ex
defmodule YauthWeb.RegistrationController do
  # ...
  alias YauthWeb.Authentication

  def create(conn, %{"account" => account_params}) do
    case Accounts.register(account_params) do
      {:ok, account} ->
        conn
        |> Authentication.log_in(account)
        |> redirect(to: Routes.profile_path(conn, :show))

      {:error, changeset} ->
        render(conn, :new,
          changeset: changeset,
          action: Routes.registration_path(conn, :create)
        )
    end
  end
end

Now add the log_in/2 function to the Authentication module.

# lib/yauth_web/authentication.ex
defmodule YauthWeb.Authentication do
  # ...

  def log_in(conn, account) do
    __MODULE__.Plug.sign_in(conn, account)
  end
  # ...
end

This function introduces Guardian’s sign_in function. The Authentication module is our Guardian implementation module and, as such, it has access to the generated Plug.sign_in/2 function. That function does a few things relevant to our situation. First, it adds the current resource to the connection in the Guardian-configured location. Second, it adds the token to the session to indicate the account has logged in.
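
After sign_in/2 runs, both the token and the resource it represents can be read back off the connection, which is what later requests in the pipeline rely on. A hedged sketch:

conn = YauthWeb.Authentication.log_in(conn, account)

Guardian.Plug.current_resource(conn)
# => %Yauth.Accounts.Account{...}
Guardian.Plug.current_token(conn)
# => "eyJhbGciOi..." (the signed JWT)

Because the token is also placed in the session, Guardian.Plug.VerifySession will find and verify it on subsequent requests until the session is dropped.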

Protecting resources from non-authenticated users

At this point we are creating an account and logging in after so doing. However, we haven’t done anything to protect resources. Our RegistrationController redirects the user to their profile page after logging in. That page, however, should be protected from access by users who have not authenticated with the application. Let’s add that now.

Let’s first create a test from the user’s perspective. In this test the user will try to access the profile page without first logging in. We expect that the user is redirected to the log in page instead of arriving at the profile page.

# test/yauth_web/features/profile_access_test.exs
defmodule YauthWeb.ProfileAccessTest do
  use YauthWeb.FeatureCase, async: true

  test "Accessing a protected resource without logging in", %{session: session} do
    visit(session, "/profile")

    assert current_path(session) == "/login"
  end
end

This test fails when we run it, which is what we want at this point. Thinking about this feature from the high level, we want to ensure the user accessing this page has successfully authenticated with the application. If they have, we grant them access to the resource. If they haven’t authenticated with the application, we want to redirect them to the log in page so that they can authenticate.

Guardian provides a plug that helps with just that. Add the following to our Router:

# lib/yauth_web/router.ex
defmodule YauthWeb.Router do
  # ...
  pipeline :browser_auth do
    plug Guardian.Plug.EnsureAuthenticated
  end
  # ...
  scope "/", YauthWeb do
    pipe_through [:browser, :guardian, :browser_auth]

    resources "/profile", ProfileController, only: [:show], singleton: true
  end
end

Now any requests to the /profile path will pass through the :browser_auth pipeline. This pipeline uses another Guardian-provided plug called EnsureAuthenticated. This plug ensures that the session contains a valid token. If it does not, it invokes the ErrorHandler module’s auth_error/3 function. Previously we set that function to simply render a plain text 401 page. What we want it to do at this point, however, is to redirect the unauthenticated user to the log in page so they can authenticate. Update our ErrorHandler module to the following:

# lib/yauth_web/authentication/error_handler.ex
defmodule YauthWeb.Authentication.ErrorHandler do
  # ...
  def auth_error(conn, {_type, _reason}, _opts) do
    conn
    |> put_flash(:error, "Authentication error.")
    |> redirect(to: Routes.session_path(conn, :new))
  end
end

At this point our protected resource test should pass. The log in test, however, is still failing because we haven’t added the profile code. Let’s do that now.

As with previous route additions, we’ll need to add a controller, view, and template. Add these simple views and templates to our project:

# lib/yauth_web/views/profile_view.ex
defmodule YauthWeb.ProfileView do
  use YauthWeb, :view
end
<!-- lib/yauth_web/templates/profile/show.html.eex -->
<h1>Hello, <%= @current_account.email %></h1>

Our controller, however, needs to do a bit more work. We need to get the current account that is logged in and add it as an assign on the connection so that it is available in the template. The Router is protecting access to this page, so we just need to get the account from where Guardian left it as a result of our Pipeline call. Guardian provides a function for easily accessing this called current_resource/2. To simplify our controller’s communication, however, let’s put that call behind a function in our Authentication module. Add the following controller:

# lib/yauth_web/controllers/profile_controller.ex
defmodule YauthWeb.ProfileController do
  use YauthWeb, :controller
  alias YauthWeb.Authentication

  def show(conn, _params) do
    current_account = Authentication.get_current_account(conn)
    render(conn, :show, current_account: current_account)
  end
end

Then add the function to our Authentication module.

# lib/yauth_web/authentication.ex
defmodule YauthWeb.Authentication do
  # ...
  def get_current_account(conn) do
    __MODULE__.Plug.current_resource(conn)
  end
  # ...
end

With those changes all our tests should now be passing.

Recap

In this post we added the ability for users to register for accounts in Yauth using an email address and password. We did so by adding a POST route and adding an Accounts context that handles validating the user-submitted parameters and creating an account. We brought Guardian into the project, provided an implementation of its behaviour unique to Yauth, and used its functions to add the user to the session. We then redirected our logged in user to a resource that requires authentication to view.

In the next post we’ll add the ability for account holders to log in and log out of their existing account.

Authentication in Elixir Web Applications with Ueberauth and Guardian: Part 1

The majority of web applications require some kind of user or account authentication at some point in their life cycle. One popular option for authentication in an Elixir or Phoenix web application is to use the Guardian package together with Ueberauth. In order to learn about this authentication option, this series of posts will build a basic Phoenix application with at least the following features:

A user can:

  • register for an account using their email and password or by using their Google account
  • log in to their account using their email and password or by using their Google account
  • log out of their account
  • access protected resources only after logging in

In order to follow along with these posts, it’s best if you have basic familiarity with Elixir, Phoenix, and with running commands on the command line. If these are unfamiliar to you, the Elixir language getting started guide, the Phoenix guides, and Learn Enough Command Line to Be Dangerous are good places to begin.

Setting up a basic Phoenix application with authentication pages

This first post will cover creating a basic Phoenix application called Yauth—an amalgamation of “You Authenticate”—and creating the pages needed for authentication. If you’ve not done so already, you will need to install Phoenix by following the instructions on the Phoenix web site. This tutorial will be using Phoenix 1.4.14 but any version >= 1.4 should be fine to follow along.

Once you have Phoenix installed, run the following command to generate the Phoenix application.

mix phx.new yauth

When prompted, accept the request to install dependencies and then follow the instructions to change into the application directory, establish the database, and confirm that the application runs.

Test driving the application from the outside in

While these posts will not follow a strict test-driven development approach, a great place for us to start is by writing a feature test from the perspective of our users. Given we are building a web application, we should begin by writing tests that use a browser to drive the application in the same way our end user will. To accomplish those browser-based tests, these posts will use Wallaby, a popular Elixir acceptance testing package.

Setting up Wallaby for automated browser tests

First, let’s add Wallaby to our list of dependencies.

# mix.exs
defmodule Yauth.MixProject do
  # ...
  defp deps do
    [
      # ...
      {:wallaby, "~> 0.23.0", [runtime: false, only: :test]},
    ]
  end
  # ...
end

Now we can tell mix to get those dependencies:

mix deps.get

Wallaby uses a browser driver to interact with a browser. It uses PhantomJS by default but, as PhantomJS has been discontinued, this tutorial will use chromedriver with headless Chrome instead. You will need to install chromedriver by following the instructions on the chromedriver website.

We will also need to make a few additions to our project to work with Wallaby.

First, we need to ensure that Wallaby is started when our tests start. Add the following to our test helper:

# test/test_helper.exs
# ...
{:ok, _} = Application.ensure_all_started(:wallaby)
Application.put_env(:wallaby, :base_url, YauthWeb.Endpoint.url)

Next we need to configure our application to ensure that the Phoenix Endpoint is running as a server during tests, to indicate which browser driver to use, and to set the database to use the sandbox mode to allow for concurrent testing.

# config/test.exs
# ...
config :yauth, YauthWeb.Endpoint,
  server: true

config :yauth, :sql_sandbox, true

config :wallaby,
  driver: Wallaby.Experimental.Chrome

We also need to configure the Endpoint to use the SQL sandbox if that option is set.

# lib/yauth_web/endpoint.ex
defmodule YauthWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :yauth

  if Application.get_env(:yauth, :sql_sandbox),
    do: plug(Phoenix.Ecto.SQL.Sandbox)
  # ...
end

Finally, to make testing with Wallaby easier, let’s create a FeatureCase that can be used by our browser-based tests.

# test/support/feature_case.ex
defmodule YauthWeb.FeatureCase do
  @moduledoc """
  This module defines the test case to be used by browser-based tests.
  """

  use ExUnit.CaseTemplate

  using do
    quote do
      use Wallaby.DSL
    end
  end

  setup tags do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(Yauth.Repo)

    unless tags[:async] do
      Ecto.Adapters.SQL.Sandbox.mode(Yauth.Repo, {:shared, self()})
    end

    metadata = Phoenix.Ecto.SQL.Sandbox.metadata_for(Yauth.Repo, self())
    {:ok, session} = Wallaby.start_session(metadata: metadata)
    {:ok, session: session}
  end
end

This case does a few things for us. By calling use Wallaby.DSL, it imports all of the functions of Wallaby.Browser and aliases Wallaby.{Browser, Element, Query}, making those modules available to our tests under those aliases. It also checks out a database connection for each test, establishes a Wallaby session, and makes that session available to each test case.

With Wallaby in place, we can now write our first feature test. This test will confirm that a user can reach the first two pages of our application—a registration page and a login page—by visiting those pages and asserting that the content is as expected.

For the registration page, we will expect to see a form with fields for an email address, a password, a password confirmation, and links to the social logins we (will eventually) support. Similarly, the login page should contain a form with fields for email and password as well as links for social login.

# test/yauth_web/features/authentication_pages_test.exs
defmodule YauthWeb.Features.AuthenticationPagesTest do
  use YauthWeb.FeatureCase, async: true
  import Query, only: [fillable_field: 1, link: 1]

  @email_field fillable_field("account[email]")
  @password_field fillable_field("account[password]")
  @password_confirmation_field fillable_field("account[password_confirmation]")

  test "Visiting the registration page", %{session: session} do
    session = visit(session, "/register")

    assert_text(session, "Register")
    assert_has(session, @email_field)
    assert_has(session, @password_field)
    assert_has(session, @password_confirmation_field)
    assert_has(session, link("Google"))
    assert_has(session, link("Twitter"))
  end

  test "Visiting the login page", %{session: session} do
    session = visit(session, "/login")

    assert_text(session, "Log In")
    assert_has(session, @email_field)
    assert_has(session, @password_field)
    assert_has(session, link("Google"))
    assert_has(session, link("Twitter"))
  end
end

At this point, we can run our tests with mix test and see our first failing tests. Once these tests are passing, we can be confident our authentication pages are in place.

Creating authentication pages

Our tests above are expecting our application to have a “/register” and a “/login” route. To make those available, make the following changes to our Router.

# lib/yauth_web/router.ex
defmodule YauthWeb.Router do
  # ...

  scope "/", YauthWeb do
    pipe_through :browser

    get "/", PageController, :index
    get "/register", RegistrationController, :new
    get "/login", SessionController, :new
  end
end

Here we configure our Router to route GET requests at “/register” to the new/2 function of the RegistrationController and GET requests at “/login” to the new/2 function of the SessionController. Running our tests with mix test will highlight the need to create these controller modules.

# lib/yauth_web/controllers/registration_controller.ex
defmodule YauthWeb.RegistrationController do
  use YauthWeb, :controller

  def new(conn, _) do
    render(conn, :new, changeset: conn, action: "/register")
  end
end

# lib/yauth_web/controllers/session_controller.ex
defmodule YauthWeb.SessionController do
  use YauthWeb, :controller

  def new(conn, _params) do
    render(conn, :new, changeset: conn, action: "/login")
  end
end

For now, our intention is simply to display these pages to the user without providing any actual functionality. We are passing a changeset and an action as assigns even though we don’t have a schema or a real changeset at this point. Fortunately, the form_for function accepts either a %Plug.Conn{} or a changeset, so we can take advantage of that here and avoid changing our templates later. We are also just hard-coding the action paths for now.

Both of these controllers simply render the :new template. In order for that rendering to happen, however, Phoenix needs a view module (a module that organizes all of the functions of a controller’s view) and an HTML template. We can create the views for these controllers with the following:

# lib/yauth_web/views/registration_view.ex
defmodule YauthWeb.RegistrationView do
  use YauthWeb, :view
end

# lib/yauth_web/views/session_view.ex
defmodule YauthWeb.SessionView do
  use YauthWeb, :view
end

Next, we will need the HTML templates for these views. The form will use the @changeset (really the %Plug.Conn{} at this point) and the @action provided in the assigns. It will also use the :as keyword option to identify the data in the form as account data.

# lib/yauth_web/templates/registration/new.html.eex
<div class="row">
  <div class="column column-50 column-offset-25">
    <h1>Register</h1>
    <%= form_for @changeset, @action, [as: :account], fn f -> %>
      <%= label f, :email, "Email address" %>
      <%= email_input f, :email %>
      <%= error_tag f, :email %>

      <%= label f, :password, "Password" %>
      <%= password_input f, :password %>
      <%= error_tag f, :password %>

      <%= label f, :password_confirmation, "Password Confirmation" %>
      <%= password_input f, :password_confirmation %>
      <%= error_tag f, :password_confirmation %>

      <%= submit "Register", class: "button button-primary" %>
    <% end %>
  </div>
</div>
<div class="row">
  <div class="column column-50 column-offset-25">
    <p>
      Already have an account?
      <%= link("Log in here", to: Routes.session_path(@conn, :new)) %>
    </p>
  </div>
</div>
<div class="row">
  <div class="column column-50 column-offset-25">
    <%= render(YauthWeb.SessionView, "social_links.html", assigns) %>
  </div>
</div>

And the template for the login view is similar:

# lib/yauth_web/templates/session/new.html.eex
<div class="row">
  <div class="column column-50 column-offset-25">
    <h1>Log In</h1>
    <%= form_for @changeset, @action, [as: :account], fn f -> %>
      <%= label f, :email, "Email address" %>
      <%= email_input f, :email %>
      <%= error_tag f, :email %>

      <%= label f, :password, "Password" %>
      <%= password_input f, :password %>
      <%= error_tag f, :password %>

      <%= submit "Log In", class: "button button-primary" %>
    <% end %>
  </div>
</div>
<div class="row">
  <div class="column column-50 column-offset-25">
    <p>
      Need an account?
      <%= link "Register here", to: Routes.registration_path(@conn, :new) %>
    </p>
  </div>
</div>
<div class="row">
  <div class="column column-50 column-offset-25">
    <%= render(YauthWeb.SessionView, "social_links.html", assigns) %>
  </div>
</div>

And the template for the social login links:

<!-- lib/yauth_web/templates/session/social_links.html.eex -->
<div class="social-log-in">
  <p>Or log in with</p>
  <%= link "Google", to: "#", class: "button button-outline" %>
  <%= link "Twitter", to: "#", class: "button button-outline" %>
</div>

Now when we run our tests with mix test all of our tests should pass.

Recap

In this post we created a new Phoenix application called Yauth. We added routes for displaying a registration and a login form to a user. We added the controller, views, and templates to provide those forms. We also added the test infrastructure needed for browser-based testing and added a test to ensure those forms are displayed. This will lay the groundwork from which we can build our authentication features.

In the next post, we will add the functionality for actually creating an account for the user and logging them in during that creation process.
