The big questions for enterprise blockchain.

Meet Arzu Toren, Finance Industry expert and banking academic.

Arzu Toren is a Global Banker with 20 years of experience working in international and Turkish commercial banks in the areas of asset management, trade finance, debt instruments and business development. She is currently continuing her independent research to drive value through the application of blockchain and for the blockchain ecosystem. In this blog, Arzu offers an up-to-date take on the key discussion points in the field of blockchain.

Blockchain for good.

Blockchain is a game-changing technology that is due to have a fundamental impact on enterprises, as it can transform how value is moved, stored, lent, exchanged, funded and invested, and how risk is managed. It is also moving the 3Ps – people, process and paperwork. In the digital era, data is power, and blockchain is a universal bus that carries data while ensuring its validity.

Like every radical innovation, blockchain faces hurdles to adoption and diffusion. It is complicated in legal, regulatory, financial and operational terms, and its challenges need technical, behavioural, educational and business solutions. Long-term adoption will take time, with a gradual transition. The process is not linear; you learn along the way, and reducing complexity is key. Blockchain should not be considered a magic wand but one of the technologies that will form part of the next-generation infrastructure, developed according to needs. A distributed ledger based on blockchain is a form of database; it will not make decisions for enterprises, so they need to learn how to use it to create value.

Blockchain is expanding and moving quickly into new businesses. Resistance, a lack of education, and issues of interoperability and scalability are the main reasons why it is moving slowly in some incumbent and large organizations. The ideal implementation steps for blockchain in an enterprise are:

  1. choosing the right use case
  2. selecting the right platform
  3. starting small for testing purposes

The end customer doesn’t need to know how blockchain works; they don’t even need to know the word ‘blockchain’. While the technology side is the enabler and the business side is the main driver, the primary focus should be on the use case and the outcome.

Interoperability

Blockchain requires a solid legal foundation to allow for interoperability. Regulations are rules imposed by an authority representing the interests of the public, while governance embodies a set of rules imposed by the participants of a system to protect their own interests. Blockchain faces major uncertainties in both fields. Internationally accepted regulations need to be developed and new principles need to be introduced in order to incorporate blockchain into the market infrastructure, comply with cross-border and domestic guidelines, and identify the legal ownership of documents. All these arrangements need careful engineering and take time.

Blockchain is designed to support networks and requires cooperation to flourish. If the ecosystem is not connected, blockchain is not of much use. Enterprises, innovators and regulators need to collaborate to make blockchain work in practice. In the case of consortiums, the answers to questions like ‘Who has jurisdiction when a network is spread over a large number of regions and companies?’ need to be clear. A blockchain would also need a governing body, which can be challenging when stakeholders have divergent interests. Moreover, industry alignment is required on major issues such as design and interaction principles.

Scalability, Security & Resilience

To make full use of blockchain, high standards are required for security, robustness, and performance. Resolving challenges such as network capacity, transaction speed, verification process and data limits would be critical. Without scalability, higher energy costs could eliminate the benefits of any blockchain. If the technology does not receive sufficient market scale, the shared benefits will be muted and there will only be a marginal improvement over today’s structure.

Blockchain needs to be integrated into existing technology to be usable and affordable. Replacing legacy systems with a new technology is complex, with a very low tolerance for error. Moving existing contracts and documents to a blockchain-based methodology requires the execution of a significant set of migration tasks. The operational risk of the transition needs to be minimized. Getting to decentralization might be easier when starting from scratch compared to making the transition from central to decentralized services. Furthermore, although irreversibility is one of the features of blockchain, it might become a challenge and needs to be managed carefully.

The identity of each party to a blockchain-enabled transaction is protected by cryptography techniques but enterprises will want to ensure that information isn’t revealed to competitors through analysis of their information. In its current state, blockchain requires above-average computer literacy to use effectively, which acts as a barrier to entry for enterprises that are interested in applications but do not know where to begin. Additionally, there is no guarantee that blockchain will never be subject to any cybersecurity attacks; the system is robust but there is still risk.

Centralised vs Decentralised DLTs for Real-World Production

At the highest level, blockchains can be categorized into three groups:

Public (Permissionless) Blockchains - A public blockchain is a platform where anyone in the world can read, write, send transactions and participate in the consensus process provided they are able to show the proof of work. Public blockchains are open-source, fully decentralized and don’t have any initial costs.

Private (Permissioned) Blockchains - A private blockchain allows only the owner to have the right to access or make any changes on the network. It is similar to existing infrastructure where the owners have the power to change the rules or participants based on needs. Private blockchains have quite high initial capital and maintenance expenses.

Hybrid Blockchains - A hybrid blockchain would be a mix of both public and private blockchains where the abilities could be extended to different nodes based on pre-set criteria. All running costs need to be met by the participating organizations.

The blockchains that will capture the most attention at first are private or hybrid blockchains, as they are more specialized, efficient and compatible with existing systems. The most effective approach for employing private blockchains is to treat their features like a catalogue and tailor different combinations to different use cases. However, private blockchains are very similar to the distributed databases already used by enterprises, and unless they integrate with public blockchains, where the audit trail is more secure and control over transactions is not in the hands of trusted nodes, it remains an open question whether blockchain can have a drastic effect.

In early applications, some enterprises test blockchain internally to manage their business units. A large group with subsidiaries around the world may benefit from an internal blockchain to synchronise data and systems across group companies, AML & KYC policies and payments. Moreover, they could learn the technology, decide if it is a strategic fit and test whether they can expand it to customer-facing products in a second phase.

Learn more

Erlang and the BEAM VM are set to be a huge part of blockchain technology. Hot code reloading, OTP and fault-tolerance all offer blockchain frameworks the resilience and reliability they need. Already, companies such as Aeternity and ArcBlock are reaping the benefits of the BEAM in their blockchain developments. To find out more about how and why the BEAM maps perfectly to the DNA of blockchain register for our webinar with Tyr Chen, the VP of Engineering at ArcBlock.

Permalink

How to debug your RabbitMQ

What you will learn in this blog.

Our RabbitMQ consultancy customers come from a wide range of industries. As a result, we have seen almost all of the unexpected behaviours that it can throw at you. RabbitMQ is a complex piece of software that employs concurrency and distributed computing (via Erlang), so debugging it is not always straightforward. To get to the root cause of an unexpected (and unwanted) behaviour, you need the right tools and the right methodology. In this article, we will demonstrate both to help you learn the craft of debugging in RabbitMQ.

The problem of debugging RabbitMQ.

The inspiration for this blog comes from a real-life example. One of our customers had the RabbitMQ Management HTTP API serving crucial information to their system. The system relied heavily on the API, specifically on the /api/queues endpoint, because the system needed to know the number of messages ready in each queue in a RabbitMQ cluster. The problem was that sometimes an HTTP request to that endpoint lasted up to tens of seconds (in the worst case they weren’t even able to get a response from the API).
So what caused some requests to take so much time? To answer that question, we tried to reproduce the issue through load testing.

Running load tests

We use a platform that we created for MongooseIM to run our Continuous Load Testing. Here are some of the most important aspects of the platform:

  1. all the services that are involved in a load test run inside docker containers
  2. the load is generated by Amoc; it’s an open source tool written in Erlang for generating massively parallel loads of any kind (AMQP in our case)
  3. metrics from the system under test and from the Amoc side are collected for further analysis.

The diagram below depicts a logical architecture of an example load test with RabbitMQ:

[Figure: load test architecture diagram]

In the diagram, the left-hand side shows a cluster of Amoc nodes that emulate AMQP clients which, in turn, generate the load against RabbitMQ. On the other side, we can see a RabbitMQ cluster that serves the AMQP clients. All the metrics from both the Amoc and RabbitMQ services are collected and stored in an InfluxDB database.

Slow Management HTTP API queries

We tried to reproduce the slow queries to the Management HTTP API in our load tests. The test scenario was fairly simple. A bunch of publishers were publishing messages to the default exchange. Messages from each publisher were routed to a dedicated queue (each publisher had a dedicated queue). There were also consumers attached to each queue. Queue mirroring was enabled.
For concrete values, check the table below:

[Table: load test parameters]

That setup stressed the RabbitMQ servers on our infrastructure, as seen in the graphs below:

[Figure: RabbitMQ CPU usage during the load test]

[Figure: RabbitMQ RAM usage during the load test]

Every RabbitMQ node consumed about 6 (out of 7) CPU cores and roughly 1.4GB of RAM except for rabbitmq-1 which consumed significantly more than the others. That was likely because it had to serve more of the Management HTTP API requests than the other two nodes.

During the load test, the /api/queues endpoint was queried every two seconds for the list of all queues together with the corresponding messages_ready values. A query looked like this:

http://rabbitmq-1:15672/api/queues?columns=name,messages_ready

Here are the results from the test:

[Figure: /api/queues query times during the load test]

The figure above shows the query time during a load test. It’s clear that things are very slow. The median equals 1.5s, while the 95th, 99th and 99.9th percentiles and the max reach 20s.

Debugging

Once the issue is confirmed and can be reproduced, we are ready to start debugging. The first idea was to find the Erlang function that is called when a request to the RabbitMQ Management HTTP API comes in and determine where that function spends its execution time. If we were able to do this, it would allow us to locate the most time-expensive code behind the API.

Finding the entrypoint function

To find the function we were looking for we took the following steps:

  1. looked through the RabbitMQ Management Plugin to find the appropriate “HTTP path to function” mapping,
  2. used the Erlang tracing feature to verify if a found function is really called when a request comes in.

The management plugin uses cowboy (an Erlang HTTP server) underneath to serve the API requests. Each HTTP endpoint requires a cowboy callback module, so we easily found the rabbit_mgmt_wm_queues:to_json/2 function, which seemed to handle requests coming to /api/queues. We confirmed that with tracing (using the recon library, which ships with RabbitMQ by default).

root@rmq-test-rabbitmq-1:/rabbitmq_server-v3.7.9# erl -remsh rabbit@rmq-test-rabbitmq-1 -sname test2 -setcookie rabbit  
Erlang/OTP 21 [erts-10.1] [source] [64-bit] [smp:22:7] [ds:22:7:10] [async-threads:1]  

Eshell V10.1  (abort with ^G)  
(rabbit@rmq-test-rabbitmq-1)1> recon_trace:calls({rabbit_mgmt_wm_queues, to_json, 2}, 1).  
1  

11:0:48.464423 <0.1294.15> rabbit_mgmt_wm_queues:to_json(#{bindings => #{},body_length => 0,cert => undefined,charset => undefined,  
  has_body => false,  
  headers =>  
      #{<<"accept">> => <<"*/*">>,  
        <<"authorization">> => <<"Basic Z3Vlc3Q6Z3Vlc3Q=">>,  
        <<"host">> => <<"10.100.10.140:53553">>,  
        <<"user-agent">> => <<"curl/7.54.0">>},  
  host => <<"10.100.10.140">>,host_info => undefined,  
  media_type => {<<"application">>,<<"json">>,[]},  
  method => <<"GET">>,path => <<"/api/queues">>,path_info => undefined,  
  peer => {{10,100,10,4},54136},  
  pid => <0.1293.15>,port => 53553,qs => <<"columns=name,messages_ready">>,  
  ref => rabbit_web_dispatch_sup_15672,  
  resp_headers =>  
      #{<<"content-security-policy">> => <<"default-src 'self'">>,  
        <<"content-type">> => [<<"application">>,<<"/">>,<<"json">>,<<>>],  
        <<"vary">> =>  
            [<<"accept">>,  
             [<<", ">>,<<"accept-encoding">>],  
             [<<", ">>,<<"origin">>]]},  
  scheme => <<"http">>,  
  sock => {{172,17,0,4},15672},  
  streamid => 1,version => 'HTTP/1.1'}, {context,{user,<<"guest">>,  
               [administrator],  
               [{rabbit_auth_backend_internal,none}]},  
         <<"guest">>,undefined})  
Recon tracer rate limit tripped.  

The snippet above shows that we enabled tracing for rabbit_mgmt_wm_queues:to_json/2 first, then we manually sent a request to the Management API (using curl; not visible in the snippet), which generated the trace event. That’s how we found our entrypoint for further analysis.
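That manual request is just an authenticated GET against the endpoint shown earlier; something along these lines would do (the guest/guest credentials are an assumption, consistent with the Basic auth header visible in the trace above):

curl -u guest:guest \
  'http://rabbitmq-1:15672/api/queues?columns=name,messages_ready'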

Using flame graphs

Having found the function that serves the requests, we can now check how it spends its execution time. The ideal technique for this is flame graphs. One definition states:

Flame graphs are a visualisation of profiled software, allowing the most frequent code-paths to be identified quickly and accurately.
In our case, we could use flame graphs to visualise the stack trace of the function or, in other words, which functions are called inside a traced function, and how much time it takes (relative to the traced function’s execution time) for these functions to execute. This visualisation helps to identify suspicious functions in the code quickly.

For Erlang, there is a library called eflame that has tools for both: gathering traces from the Erlang system and building a flame graph from the data. But how do we inject that library into Rabbit for our load test?

Building a custom RabbitMQ docker image

As we mentioned previously, all the services in our load testing platform run inside docker containers. Hence, we had to build a custom RabbitMQ docker image with the eflame library included in the server code. We created a rabbitmq-docker repository that makes it easy to build a docker image with modified RabbitMQ source code.
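For reference, once eflame is baked into the image, gathering stacks for a call boils down to something like the sketch below, which follows eflame’s documented usage; the output path is arbitrary, and Req/Context stand for a captured cowboy request/context pair, so in practice the call is easier to hook directly into the handler code inside the custom image:

%% Trace a single call and dump the collected stack samples to a file.
eflame:apply(normal_with_children, "/tmp/stacks.out",
             rabbit_mgmt_wm_queues, to_json, [Req, Context]).

%% Then, offline, turn the stacks into an SVG with the script bundled with eflame:
%% $ stack_to_flame.sh < /tmp/stacks.out > flame.svg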

Profiling with eflame

Once we had a modified RabbitMQ docker image with eflame included, we could run another load test (specifications were the same as the previous test) and start the actual profiling. These were the results:

[Figure: flame graph of a fast query]

[Figure: flame graph of a slow query]

We ran a number of measurements and got two types of result, as presented above. The main difference between these graphs is in the rabbit_mgmt_util:run_run_augmentation/2 function. What does that difference mean?

From the results of the previous load tests and manual code analysis, we know that there are slow and fast queries. The slow requests can take up to twenty seconds while the fast ones only take a few. This is consistent with the query time chart above: the 50th percentile is about 1.5s, but the 95th (and higher) percentiles reach up to 20s. Moreover, we manually measured the execution time of both cases using timer:tc/3 and the results were consistent.

This happens because there is a cache in the Management plugin. When the cache is valid, the requests are served much faster as the data has already been collected, but when it’s invalid, all the necessary information needs to be gathered.

Despite the fact that the graphs have the same length in the picture, they represent different execution times (fast vs slow). Hence, it’s hard to guess which graph shows which query without actually taking a measurement. The first graph shows a fast query while the second shows a slow one. In the slow query graph, the rabbit_mgmt_util:augment/2 -> rabbit_mgmt_db:submit_cached/4 -> gen_server:call/3 -> … stack takes so much time because the cache is invalid and fresh data needs to be collected. So what happens when the data is collected?

Profiling with fprof

You might ask “why don’t we see the data collection function(s) in the flame graphs?” This happens because the cache is implemented as another Erlang process and the data collection happens inside the cache process. There is a gen_server:call/3 function visible in the graphs that makes a call to the cache process and waits for a response. Depending on the cache state (valid or invalid) a response can come back quickly or slowly.

Collecting the data is implemented in the rabbit_mgmt_db:list_queue_stats/3 function, which is invoked from the cache process. Naturally, we should profile that function. We tried eflame and, after several dozen minutes, this is the result we got:

eheap_alloc: Cannot allocate 42116020480 bytes of memory (of type "old_heap").

The Erlang heap memory allocator tried to allocate 42GB of memory (in fact, the space was needed for the garbage collector to operate) and crashed the server. As eflame takes advantage of Erlang tracing to generate flame graphs, it was most probably simply overwhelmed by the number of trace events generated by the traced function. That’s where fprof comes into play.

According to the official Erlang documentation fprof is:

a Time Profiling Tool using trace to file for minimal runtime performance impact.
That’s very true. The tool dealt with the data collection function smoothly; however, it took several minutes to produce the result. The output was quite big, so only the crucial lines are listed below:

(rabbit@rmq-test-rabbitmq-1)96> fprof:apply(rabbit_mgmt_db, list_queue_stats, [RA, B, 5000]).  
...
(rabbit@rmq-test-rabbitmq-1)97> fprof:profile().  
...
(rabbit@rmq-test-rabbitmq-1)98> fprof:analyse().  
...
%                                       CNT        ACC       OWN  
{[{{rabbit_mgmt_db,'-list_queue_stats/3-lc$^1/1-1-',4}, 803,391175.593,  105.666}],  
 { {rabbit_mgmt_db,queue_stats,3},              803,391175.593,  105.666},     %  
 [{{rabbit_mgmt_db,format_range,4},            3212,390985.427,   76.758},  
  {{rabbit_mgmt_db,pick_range,2},              3212,   58.047,   34.206},  
  {{erlang,'++',2},                            2407,   19.445,   19.445},  
  {{rabbit_mgmt_db,message_stats,1},            803,    7.040,    7.040}]}.  

The output consists of many entries like this. The function marked with the % character is the one that the current entry concerns. The functions below it are the ones that were called from the marked function. The third column (ACC) shows the total execution time of the marked function (the function’s own execution time plus that of its callees) in milliseconds. For example, in the above entry the total execution time of the rabbit_mgmt_db:pick_range/2 function equals 58.047ms. For a detailed explanation of the fprof output, check the official fprof documentation.

The entry above is the top-level entry concerning rabbit_mgmt_db:queue_stats/3, which was called from the traced function. That function spent most of its execution time in the rabbit_mgmt_db:format_range/4 function. We can go to the entry concerning that function and check where it spent its execution time in turn. This way, we can go through the output and find potential causes of the Management API slowness issue.

Reading through the fprof output in a top-down fashion we ended up with this entry:

{[{{exometer_slide,'-sum/5-anonymous-6-',7},   3713,364774.737,  206.874}],
 { {exometer_slide,to_normalized_list,6},      3713,364774.737,  206.874},     %
 [{{exometer_slide,create_normalized_lookup,4},3713,213922.287,   64.599}, %% SUSPICIOUS
  {{exometer_slide,'-to_normalized_list/6-lists^foldl/2-4-',3},3713,145165.626,   51.991}, %% SUSPICIOUS
  {{exometer_slide,to_list_from,3},            3713, 4518.772,  201.682},
  {{lists,seq,3},                              3713,  837.788,   35.720},
  {{erlang,'++',2},                            3712,   70.038,   70.038},
  {{exometer_slide,'-sum/5-anonymous-5-',1},   3713,   51.971,   25.739},
  {garbage_collect,                               1,    1.269,    1.269},
  {suspend,                                       2,    0.151,    0.000}]}.  

The entry concerns exometer_slide:to_normalized_list/6 function which in turn called two “suspicious” functions from the same module. Going deeper we found this:

    {[{{exometer_slide,'-create_normalized_lookup/4-anonymous-2-',5},347962,196916.209,35453.182},
  {{exometer_slide,'-sum/5-anonymous-4-',2},   356109,16625.240, 4471.993},
  {{orddict,update,4},                         20268881,    0.000,172352.980}],
 { {orddict,update,4},                         20972952,213541.449,212278.155},     %
 [{suspend,                                    9301,  682.033,    0.000},
  {{exometer_slide,'-sum/5-anonymous-3-',2},   31204,  420.574,  227.727},
  {garbage_collect,                              99,  160.687,  160.687},
  {{orddict,update,4},                         20268881,    0.000,172352.980}]}.  

and:

    {[{{exometer_slide,'-to_normalized_list/6-anonymous-5-',3},456669,133229.862, 3043.145},
  {{orddict,find,2},                           19369215,    0.000,129761.708}],
 { {orddict,find,2},                           19825884,133229.862,132804.853},     %
 [{suspend,                                    4754,  392.064,    0.000},
  {garbage_collect,                              22,   33.195,   33.195},
  {{orddict,find,2},                           19369215,    0.000,129761.708}]}.  

A lot of the execution time was consumed by orddict:update/4 and orddict:find/2 functions. These two combined accounted for 86% of the total execution time.

This led us to the exometer_slide module from the RabbitMQ Management Agent Plugin. If you look into the module, you’ll find all the functions above and the connections between them.

We decided to close the investigation at this stage because this was clearly the issue. Now that we’ve shared our thoughts on the issue with the community in this blog, who knows, maybe we’ll come up with a new solution together.

The observer effect

There is one last thing that is essential to consider when it comes to debugging/observing systems - the observer effect. The observer effect is the theory that the act of monitoring a phenomenon changes that phenomenon.

In our example, we used tools that take advantage of tracing. Tracing has an impact on a system as it generates, sends and processes a lot of events.

The execution times of the aforementioned functions increased substantially when they were called with profiling enabled. Pure calls took several seconds, while calls with profiling enabled took several minutes. However, the difference between the slow and fast queries seemed to remain unchanged.

The observer effect was not evaluated in the scope of the experiment described in this blog post.

A workaround solution

The issue can be worked around in a slightly different manner. Let’s consider whether there is another way of obtaining the queue names together with the number of messages ready in them. There is the rabbit_amqqueue:emit_info_all/5 function that allows us to retrieve the exact information we are interested in - directly from a queue process. We could use that API from a custom RabbitMQ plugin and expose an HTTP endpoint to send that data when queried.

We turned that idea into reality and built a proof-of-concept plugin called rabbitmq-queue-info that does exactly what’s described above. The plugin was even load tested (the test specification was exactly the same as for the management plugin earlier in the blog). The results are below and they speak for themselves:

[Figure: query times with the rabbitmq-queue-info plugin]

Want more?

Want to know more about tracing in RabbitMQ, Erlang & Elixir? Check out WombatOAM, an intuitive system that makes monitoring and maintenance of your systems easy. Get your free 45 day trial of WombatOAM now.

Appendix

Version 3.7.9 of RabbitMQ was used in all the load tests mentioned in this blog post. Special thanks go to Szymon Mentel and Andrzej Teleżyński for all the help with that publication.

Permalink

RabbitMQ Mirrored Queues Gotchas

Mirrored Queues are a popular RabbitMQ feature that provides High Availability (HA). HA, in this context, simply means that RabbitMQ nodes in the cluster can fail and the queues will still be available for the clients.
However, HA queues can lead to some unexpected behaviour in failure scenarios or when combined with specific queue properties. In this blog post, we share three examples of these unpredictable behaviours that we have come across in RabbitMQ. This blog will help us explain some of the intricacies of HA queues. In doing so, we’ll also demonstrate how one can analyse the behaviour of a RabbitMQ cluster on a laptop or a single machine using common tools. Thus, the next section briefly discusses the requirements for doing so and the scripts in the accompanying repository that allow us to test the presented cases.

Setup

If you want to reproduce the examples from the post you will need the following dependencies installed:

Make
Git
Docker
Python3
Pipenv
Rabbitmq-perf-test 2.8.0
Wireshark (optional)

All the scenarios are based on a 2-node cluster consisting of RabbitMQ Docker containers - rmq1 and rmq2 - running RabbitMQ version 3.7.15. Both containers expose ports 5672 (AMQP) and 15672 (management plugin), which are mapped to 5672/15672 and 5673/15673 for rmq1 and rmq2 respectively. In other words, once you set up the cluster, the AMQP port for rmq1 is available at amqp://localhost:5672 and the management interface at http://localhost:15672.

The cluster is started with make up and torn down with make down. The up command will start the containers, attach them to the rmq network and install the following policy:

[Figure: the HA policy installed by make up]
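The policy itself is only shown as a screenshot in the original post, but based on the values quoted later ("ha-sync-mode":"automatic", "ha-sync-batch-size":2, and two replicas of the haq queue) it presumably looks roughly like the following; the policy name, queue pattern and ha-mode are assumptions:

# Hypothetical reconstruction of the policy installed by `make up`
docker exec rmq1 rabbitmqctl set_policy ha "^haq$" \
  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic","ha-sync-batch-size":2}'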

To see the logs, run make logs1 to attach to the output of the rmq1 container. Also, Python scripts are in use, so pipenv install and pipenv shell need to be run to install the Python dependencies and start a shell within the Python virtualenv respectively.

Auto-delete property for an HA queue

A queue in RabbitMQ can have the auto-delete property set. A queue with this property will be deleted by the broker once the last consumer unsubscribes. But what does this mean in a distributed environment where consumers are connected to different nodes and queue slaves are promoted to masters on failure? Let’s explore this by setting up an environment for testing. Run make up, which will spawn and cluster the RabbitMQ containers. The command should finish with an output similar to the one below:

Cluster status of node rabbit@rmq1 ...  
[{nodes,[{disc,[rabbit@rmq1,rabbit@rmq2]}]},  
{running_nodes,[rabbit@rmq2,rabbit@rmq1]},  
{cluster_name,<<"rabbit@rmq1">>},  
{partitions,[]}, {alarms,[{rabbit@rmq2,[]},{rabbit@rmq1,[]}]}]  

Now we want to create a Mirrored Queue with the master at node rmq2 and the slave at rmq1. The queue should have the auto-delete property set.

For this purpose, we will use the PerfTest tool, connecting it to the second node to act as a producer. It will create the haq queue (which matches the policy), bind it to the direct exchange with the routing key key and start producing 1 message per second:

# producer at rmq2
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5673 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true 

Throughout the example, we assume perf_test is installed in the ./perf_test directory. As the producer is running, the queue should appear in the management UI and messages should be piling up:

[Figure: the haq queue in the management UI with messages piling up]

Now let’s connect a consumer to our queue. Again, PerfTest will be our tool of choice but this time it will be used as a consumer attached to the first node (rmq1):

# consumer at rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 0 \
--consumers 1 \
--queue haq

The perf_test output should reveal that the messages are flowing:

# consumer at rmq1  
id: test-114434-950, time: 73.704s, received: 1.00 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs  
# producer
id: test-113752-488, time: 487.154s, sent: 1.0 msg/s

At this point, we have the following setup:

[Figure: producer connected to rmq2, consumer connected to rmq1, with the haq queue mirrored across both nodes]

Now, let’s see what using the auto-delete property on HA queues can lead to in a failure scenario. Let’s introduce some turbulence into the cluster and disconnect rmq2 from the network between the nodes:

make disconnect2 

Underneath, a docker network disconnect command is run that detaches the node from the rmq network. This should result in the consumer draining all the messages (as the producer is at the disconnected node and no new messages arrive at the queue). After approximately 10s, the nodes should report that they cannot see each other. Below is a log from rmq2 where all this happens:

2019-06-07 10:02:20.910 [error] <0.669.0> ** Node rabbit@rmq1 not responding ** ** Removing (timedout) connection **
2019-06-07 10:02:20.910 [info] <0.406.0> rabbit on node rabbit@rmq1 down
2019-06-07 10:02:20.919 [info] <0.406.0> Node rabbit@rmq1 is down, deleting its listeners
2019-06-07 10:02:20.921 [info] <0.406.0> node rabbit@rmq1 down: net_tick_timeout

But what happened to our queue? If you look at the management interface of rmq2 (the node with the producer, http://localhost:15673/#/queues, user/password is guest/guest), the queue is gone! This is because once the netsplit had been detected, no consumers were left at rmq2 and the queue got deleted. What’s more, all the messages that made it to the queue master on rmq2 before the netsplit was detected are lost - unless Publisher Confirms were in use: the producer would simply not receive confirms for the messages that were not accepted by the mirror. The queue should still be present at rmq1:

[Figure: the management UI of rmq1 with the haq queue still present]

If we switch our producer to rmq1 the message flow should go back to normal:

# producer at rmq2 switched to rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true

Also, note that our slave at rmq1 was promoted to master:

2019-06-07 10:02:22.740 [info] <0.1858.0> Mirrored queue 'haq' in vhost '/': Slave <rabbit@rmq1.3.1858.0> saw deaths of mirrors <rabbit@rmq2.1.1096.0>
2019-06-07 10:02:22.742 [info] <0.1858.0> Mirrored queue 'haq' in vhost '/': Promoting slave <rabbit@rmq1.3.1858.0> to master 

As we are left with one copy of the queue with one consumer, if that last consumer is disconnected, the entire queue will be lost. Long story short: if you set up your HA queue with the auto-delete property, then in a fail-over scenario replicas can be deleted because they may lose consumers connected through other nodes. Using either ignore or autoheal cluster partition handling will keep the replicas. In the next section, we discuss the same scenario but with consumer cancel notification in use. Before moving on to the next section, restart our environment: make down up.

The consumer is notified about the fail-over

Now have a look at a slightly different scenario in which a consumer specifies that it wants to be notified in case of a master fail-over. To do so, it sets the x-cancel-on-ha-failover to true when issuing basic.consume (more here).

It differs from the previous example in that the consumer will get the basic.cancel method from the server once the server detects the queue master is gone. The client can then handle this method as it wishes. For example, it can restart consumption by issuing basic.consume again; it could even reopen the channel or reconnect before resuming consumption. Even if it crashes upon receiving the basic.cancel, the queue will not be deleted. Note that consumption can only be resumed once a new master is elected. For the same reasons as in the previous example, the replica at the other node (rmq2) will be gone. You can test all these consumer behaviours with the consume_after_cancel.py script. Below we will go through the case where the consumer resumes consumption after getting the basic.cancel. At the beginning, set up the cluster with make up and a producer at rmq2 - just as we did in the previous section:

# producer at rmq2
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5673 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true 

Next, connect our new consumer to rmq1 (remember to pipenv install and pipenv shell as mentioned in the Setup):

./consume_after_cancel.py --server localhost --port 5672 --after-cancel reconsume
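The script itself lives in the companion repository and is not shown in the post; a minimal sketch of what it presumably does with pika is given below. The names and structure are assumptions, and only the reconsume behaviour is covered:

#!/usr/bin/env python3
# Rough sketch of a consumer that re-subscribes after a server-sent basic.cancel.
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()

def on_message(ch, method, properties, body):
    print("received:", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume():
    # x-cancel-on-ha-failover asks the broker to send basic.cancel to this
    # consumer when the queue master fails over.
    channel.basic_consume(queue="haq",
                          on_message_callback=on_message,
                          arguments={"x-cancel-on-ha-failover": True})

# When the broker cancels the consumer (master gone), simply consume again.
channel.add_on_cancel_callback(lambda frame: consume())

consume()
channel.start_consuming()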

When the messages start flowing (they look meaningless), disconnect the second node from the cluster:

make disconnect2  

That should stop our consumer for a few moments, but it will resume consumption. Since there’s a netsplit, there are no messages to consume, so we need to move the producer to the first node:

# producer at rmq2 switched to rmq1
perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 1 \
--consumers 0 \
--rate 1 \
--queue haq \
--routing-key key \
--auto-delete true

At this point, the messages will be flowing again.

When studying these corner cases, Wireshark and its AMQP dissector come in very handy, as they make it crystal clear what the client-server conversation looks like. In our particular case, this is what can be seen:

[Figure: Wireshark capture of the AMQP client-server conversation during the fail-over]

As a simple explanation: using x-cancel-on-ha-failover gives a piece of extra information indicating that a fail-over has happened, which can be acted upon appropriately - e.g. by automatically subscribing to the new master. We are then left with one consumer, and the replicas without consumers are cleared.

HA queues with automatic synchronization

If we take a closer look at our HA policy (see the Setup section) it specifies the queue synchronization as automatic:

"ha-sync-mode":"automatic"

It means that when a node joins a cluster (or rejoins after a restart or being partitioned) and a mirror is to be installed on it, all the messages will be copied to it from the master. The good thing about this is that the new mirror has all the messages straight after joining, which increases data safety. However, it comes at a cost: while a queue is being synchronized, all operations on the queue are blocked. This means that, for example, a publisher cannot publish and a consumer cannot consume. Let’s have a look at an example in which we publish 200K messages to the haq queue, then attach a slow consumer and finally restart one node. As the policy specifies that the queue is to have 2 replicas with automatic synchronization, the restart will trigger messages to be copied to keep the replicas in sync. Also, the "ha-sync-batch-size":2 set in the policy indicates that RabbitMQ will synchronize 2 messages at a time - to slow down the synchronization and exaggerate “the blocking effect”. Run make up to set up the cluster. Then publish the messages:

# 10*20K = 200K 
./perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 10 \
--consumers 0 \
--queue haq \
--pmessages 20000 \
--auto-delete false \
--rate 1000  

Check the queue is filled up (http://localhost:15672/#/queues)

[Figure: the haq queue in the management UI filled with 200K messages]

When it’s ready we can start the slow consumer which will process 10 msgs/s with a prefetch count of 10:

./perf_test/bin/runjava com.rabbitmq.perf.PerfTest \
--uri amqp://localhost:5672 \
--producers 0 \
--consumers 1 \
--queue haq \
--predeclared \
--qos 10 \
--consumer-rate 10 

Then restart the rmq2 (make restart2) and observe both the consumer’s logs and the rmq1 logs. In the consumers’ logs you should see some slowdown in receiving of messages:

id: test-145129-551, time: 20.960s, received: 10.0 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
id: test-145129-551, time: 21.960s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
id: test-145129-551, time: 23.058s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs
--> id: test-145129-551, time: 31.588s, received: 0.82 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 32.660s, received: 83 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 33.758s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs 
id: test-145129-551, time: 34.858s, received: 10 msg/s, min/median/75th/95th/99th consumer latency: 0/0/0/0/0 μs  

The marked line shows that the consumer was waiting for nearly 10 seconds for a message. And the RabbitMQ logs can explain why:

2019-06-04 13:36:33.076 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 200000 messages to synchronise
2019-06-04 13:36:33.076 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: batch size: 2
2019-06-04 13:36:33.079 [info] <0.3039.0> Mirrored queue 'haq' in vhost '/': Synchronising: mirrors [rabbit@rmq2] to sync
2019-06-04 13:36:34.079 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 22422 messages
2019-06-04 13:36:35.079 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 43346 messages
2019-06-04 13:36:36.080 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 64512 messages
2019-06-04 13:36:37.084 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 82404 messages
2019-06-04 13:36:38.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 104404 messages
2019-06-04 13:36:39.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 128686 messages
2019-06-04 13:36:40.095 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 147580 messages
2019-06-04 13:36:41.096 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 164498 messages
2019-06-04 13:36:42.096 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: 182408 messages
2019-06-04 13:36:42.961 [info] <0.1557.0> Mirrored queue 'haq' in vhost '/': Synchronising: complete

If we compare timestamps of the first line, indicating the start of the synchronization, with the last line, indicating the end of the synchronization, we can see it took nearly 10 seconds. And this is exactly when the queue was blocked. In real life, obviously, we would not have the ha-sync-batch-size set to 2 - it is by default set to 4096. But what happens in real life is that there can be multiple queues, longer than 200K, with larger messages. All of that can contribute to a high volume of traffic and prolonged synchronization time resulting in queues that remain blocked for longer. The takeaway is: make sure you are in control of the length of your queues if automatic synchronization is in use. This can be achieved by limiting the length of queues or setting TTLs for messages.
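As an illustration only, such limits can be expressed with the standard max-length and message-ttl policy keys. The sketch below extends the (assumed) policy from the Setup section with arbitrary values, so a rejoining mirror never has to synchronise an unbounded backlog:

docker exec rmq1 rabbitmqctl set_policy ha "^haq$" \
  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic",
    "max-length":100000,"message-ttl":60000}'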

Delayed Publisher Confirms

If you’re concerned about data safety, you probably need Publisher Confirms. This mechanism makes the broker send an ACK for each successfully accepted message. But what does successfully accepted mean? As per the documentation:

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.

As we are discussing HA queues, the statement saying that an ACK is sent when ALL the mirrors have accepted the message is the interesting one. To see how this can cause complications, let’s consider the following setup based on the RPC messaging pattern:

[Figure: RPC setup - a Fibonacci server and a client (P/C) communicating through the haq_jobs queue and a reply queue]

We have a server that subscribes to the haq_jobs queue, on which it expects messages with integers in their body. Once it gets a message, it computes the Fibonacci number for that integer and sends the reply back to the reply queue whose name is stored in the reply_to property of the original message. The client in turn (denoted as P/C in the picture), when invoked, will send an integer to the haq_jobs queue and wait for the reply on the temporary queue. It opens two connections: one for publishing the request and one for getting the response. The reply queue is exclusive, as it is meant to be used by just one consumer. What is more, the message is sent in a separate thread which then waits for the ACK. In other words, sending a message and waiting for the reply are independent thread-wise.

Now we’re ready to start the cluster: make up. Then, in a separate shell, run the server: ./fib_server.py. Finally, try the client:

./fib_client.py 7 
[2019-06-06 14:20:37] Press any key to proceed... 
[2019-06-06 14:20:37] About to publish request='7' 
[2019-06-06 14:20:37] Message ACKed
[2019-06-06 14:20:37] Got response=13
./fib_client.py 7 
[2019-06-06 14:20:39] Press any key to proceed... 
[2019-06-06 14:20:39] About to publish request='7' 
[2019-06-06 14:20:39] Got response=13 
[2019-06-06 14:20:39] Message ACKed 

As you may have noticed, in the second invocation of the client we got the response first and then the ACK for the request. That may look weird, but it is due to the fact that the confirms are sent asynchronously once all the mirrors accept the message. In this particular case, it turned out that the Fibonacci server and RabbitMQ were quicker to produce the result and deliver it than the cluster was to replicate the request and send the confirmation. What can we conclude from this? Never rely on the confirm coming before the reply in the RPC (Remote Procedure Call) pattern - although they should come very close to each other.

The last statement, that the ACK and the reply come shortly after one another, doesn’t hold true in a failure scenario where the network/node is broken. To simulate that, let’s clean the environment with make down and set net_ticktime to 120s so that RabbitMQ detects net-splits more slowly. This gives us time to perform the required actions after we break the network; it is needed for the second experiment in this section and doesn’t affect the first one. It was initially set to 10s so that the net-split detection in the previous experiments ran fast. Modify conf/advanced.config so that the option is set to 120 instead of 10:

[ 
 {kernel, [{net_ticktime, 120}]}
]. 

Now we can run the cluster, make up, and start the server and client as previously. But leave the client at the prompt to press a key and stop for a moment to reflect on the current state of the system:

  • we have the Fibonacci server connected and waiting for requests;
  • we have 2 connections opened from the client: one to publish and one to consume;
  • all the queues are set up.

Now run make disconnect2 to disconnect rmq2 from the cluster. At this point, you have between 90 and 150 seconds (as net_ticktime is set to 120s) to press any key to publish the message. You should see that the server processes it and you get the response, but the ACK for the request won’t be delivered until you recover the net-split with make connect2 (which you have to do within the given time range).

[2019-06-06 14:21:32] Press any key to proceed...
[2019-06-06 14:21:38] About to publish request='7'
[2019-06-06 14:21:38] Got response=13
[2019-06-06 14:22:05] Message ACKed

In the above listing, the ACK arrived ~30s after the actual response due to the net-split.

Remember: one cannot expect that a message will be ACKed by the broker before the reply arrives on the reply queue (RPC pattern) - as we’ve just seen the ACK can get stuck due to some turbulence on the network.

Summary

Undoubtedly, Mirrored Queues provide an important feature of RabbitMQ - High Availability. In simple terms: depending on the configuration, you will always have more than one copy of the queue and its data. As a result, you are ready for a node/network failure, which will inevitably happen in a distributed system. Nonetheless, as illustrated in this blog post, in some specific failure scenarios or configurations, you may observe unexpected behaviours or even data/queue loss. Thus, it’s important to test your system against potential failures and scenarios which are not (clearly) covered by the documentation. If you’re interested in learning more about HA in RabbitMQ, talk to us at the RabbitMQ Summit in London or contact us today for a RabbitMQ health check.

Want to learn more?

For more information on High Availability in RabbitMQ, watch this video from our Erlang Meetup in Krakow.

We thought you might also be interested in:

Our RabbitMQ solutions

RabbitMQ monitoring with WombatOAM

RabbitMQ | Topic Exchanges

Permalink

The Changeset API Pattern

Over time, as you gain overall experience with software development, you start noticing some paths that can lead to much smoother sailing. These are called design patterns: formalized best practices that can be used to solve common problems when implementing a system.

One of the patterns I have been having great success with while working on web applications in Elixir is what I am calling, for lack of a better name, the Changeset API Pattern.

Before I start with the pattern itself, I'd like to outline what I consider the motivation behind its usage: data integrity.

Data integrity is the maintenance of, and the assurance of the accuracy and consistency of, data over its entire life-cycle, and is a critical aspect to the design, implementation and usage of any system which stores, processes, or retrieves data.
The overall intent of any data integrity technique is the same: ensure data is recorded exactly as intended. In short, data integrity aims to prevent unintentional changes to information. Data integrity is not to be confused with data security, the discipline of protecting data from unauthorized parties.

Overall Goal

Facilitate the main goal of data integrity: ensure that data is recorded exactly as intended.

The Changeset API Pattern is not solely responsible for achieving this goal; however, once used in conjunction with some data modelling best practices, such as column types and constraints, default values and so on, the pattern becomes an important application layer on top of an already established data layer, aiming for better data integrity overall.

Database Data Integrity

As mentioned above, having good database specifications will facilitate data integrity. In Elixir, this is commonly achieved through Ecto, the most common library for interacting with application data stores, and its migration DSL:

defmodule Core.Repo.Migrations.CreateUsersTable do
  use Ecto.Migration

  def change do
    create table(:users) do
      add :company_id, references(:companies, type: :binary_id), null: false
      add :first_name, :string, null: false
      add :last_name, :string, null: false
      add :email, :string, null: false
      add :age, :integer, null: false
      timestamps()
    end

    create index(:users, [:email], unique: true)
    create constraint(:users, :age_must_be_positive, check: "age > 0")
  end
end

In the migration above we are specifying:

  • column data types;
  • columns can't have null values;
  • company_id is a foreign key;
  • email column is unique;
  • age has to be greater than zero.

Depending on your datastore and column type you can apply a variety of data constraints to fulfill your needs. Ideally, the specifications defined in the migration should align with your Ecto Schema and generic changeset:

defmodule Core.User do
  use Ecto.Schema
  import Ecto.Changeset
  
  alias Core.Company

  @primary_key {:id, :binary_id, autogenerate: true}
  @timestamps_opts [type: :utc_datetime]
  schema "users" do
    belongs_to(:company, Company, type: :binary_id)
    field(:first_name, :string)
    field(:last_name, :string)
    field(:email, :string)
    field(:age, :integer)
    timestamps()
  end
  
  @required_fields ~w(company_id first_name last_name email age)a
  
  def changeset(struct, params) do
    struct
    |> cast(params, @required_fields)
    |> validate_required(@required_fields)
    |> validate_number(:age, greater_than: 0)
    |> unique_constraint(:email)
    |> assoc_constraint(:company)
  end
end

These should be considered your main gate in terms of data integrity, as they ensure data will only be stored if all checks pass. From there you can add other layers on top, for example, the Changeset API Pattern.
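For instance, a typical insert through this generic changeset might look like the sketch below (Core.Repo, the company variable and the attrs map are assumptions for illustration):

attrs = %{
  company_id: company.id,
  first_name: "Jane",
  last_name: "Doe",
  email: "jane.doe@example.com",
  age: 30
}

%Core.User{}
|> Core.User.changeset(attrs)
|> Core.Repo.insert()
# => {:ok, %Core.User{}} on success, or {:error, %Ecto.Changeset{}}
#    carrying the failed validations/constraints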

The Changeset API Pattern

Once you have a good foundation, it is time to tackle your application API scenarios regarding data integrity. While a generic changeset, as above, is sufficient to ensure that the data integrity matches what is defined in the database in a general sense (all inserts and all updates), usually not all changes are equal from the application standpoint.

The Problem

For example, let's assume that besides the existing columns in the users table example above, we also have a column called hashed_password for user authentication. In our application, we have the following endpoints in our API that modify data:

  • Register User;
  • Update User Profile;
  • Change User Password.

Having a generic changeset in our schema will allow all these three operations to happen as desired, however, it opens some data integrity concerns for the two update operations:

  • While updating my first name as part of Update User Profile flow, I also can change my password;
  • While changing my password as part of Change User Password flow, I can update my age.

As long as the fields conform with the generic changeset validations, these unexpected changes will be allowed. You can remedy this behaviour by applying filters in your API or your controller; however, this becomes brittle as your application evolves. Other than that, the Ecto.Schema and Ecto.Changeset modules provide lots of functions for field validation, casting and database constraint checks; not leveraging them would require lots of code duplication, at least in terms of functionality.

The Solution

The Changeset API Pattern states that:

For each API operation that modifies data, a specific Ecto Changeset is implemented, making it explicit the desired changes and all validations to be performed.

Instead of a generic changeset, we will implement three changesets with a very clear combination for cast, validation and database constraint checks.

Register User Changeset

defmodule Core.User do
  # Code removed

  schema "users" do
    # Code removed
    field(:hashed_password, :string)
    # Code removed
  end

  @register_fields ~w(company_id first_name last_name email age hashed_password)a

  def register_changeset(struct, params) do
    struct
    |> cast(params, @register_fields)
    |> validate_required(@register_fields)
    |> validate_number(:age, greater_than: 0)
    |> unique_constraint(:email)
    |> assoc_constraint(:company)
  end
end

Update User Profile Changeset

defmodule Core.User do
  # Code removed

  @update_profile_fields ~w(first_name last_name email age)a

  def update_profile_changeset(struct, params) do
    struct
    |> cast(params, @update_profile_fields)
    |> validate_required(@update_profile_fields)
    |> validate_number(:age, greater_than: 0)
    |> unique_constraint(:email)
  end
  
  # Code removed
end

Change User Password Changeset

defmodule Core.User do
  # Code removed

  @change_password_fields ~w(hashed_password)a

  def change_password_changeset(struct, params) do
    struct
    |> cast(params, @change_password_fields)
    |> validate_required(@change_password_fields)
  end
end

In your API functions, even if extra data comes in, you are safe because the intent and expected outcome of each operation are already defined at the point closest to the data store interaction from the application standpoint; in our case, the schema definition module.
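An API (or context) function then simply picks the changeset that matches the use case. A minimal sketch, assuming hypothetical Core.Accounts and Core.Repo modules:

defmodule Core.Accounts do
  alias Core.{Repo, User}

  # Update User Profile: only the fields allowed by update_profile_changeset/2
  # can ever change here, regardless of what else comes in `params`.
  def update_profile(%User{} = user, params) do
    user
    |> User.update_profile_changeset(params)
    |> Repo.update()
  end

  # Change User Password: limited to hashed_password.
  def change_password(%User{} = user, params) do
    user
    |> User.change_password_changeset(params)
    |> Repo.update()
  end
end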

Caveat

One thing that I noticed when I started implementing this pattern is the fact that sometimes I was doing a little more than my initial intent within the changeset functions.

Besides performing the data type casting, validations and database checks, in a few cases I was also setting the field value. For the sake of illustration (it could be anything along these lines), let's take the example of a user schema that has a nullable column verified_at, which stores the date and time the user was verified.

The changeset for this operation should only allow the verified_at field to be cast with the proper data type, but beyond that, the current date and time were being set inside the changeset using Ecto.Changeset.put_change/3.

Instead, what should be done is to delegate to the API the responsibility of setting the value for verified_at; that value is then validated in the changeset like any other update.
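In other words, the API computes the value and the changeset only casts and validates it. A sketch under those assumptions (verify_user/1 and verify_changeset/2 are hypothetical names):

# In the API function, the caller decides what the value is...
def verify_user(%Core.User{} = user) do
  user
  |> Core.User.verify_changeset(%{verified_at: DateTime.utc_now()})
  |> Core.Repo.update()
end

# ...and in Core.User, the changeset only casts and validates it,
# with no put_change/3 involved.
def verify_changeset(struct, params) do
  struct
  |> cast(params, [:verified_at])
  |> validate_required([:verified_at])
end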

Another common example is encrypting the plain text password (defined as a virtual field) during user registration or password change inside the schema module. The schema should not need to know about encryption hashing libraries, modules or functions, and that should be delegated to the API functions.

There is nothing wrong with Ecto.Changeset.put_change/3; in some cases it makes sense to use it: for values that can't come through the API for any reason, if you need a mapping between the value sent via the API and your datastore, or if you need to nullify a field.

Advantages

  • pushes data integrity concerns upfront in the development process;
  • protects the schema against unexpected data updates;
  • adds explicitness for allowed data changes and checks to be performed per use-case;
  • complements the commonly present data integrity checks in schema modules with use-cases checks;
  • leverages Ecto.Schema and Ecto.Changeset functions for better data integrity overall;
  • concentrates all data integrity checks in one single place, and in the best place: the schema module;
  • simplifies data changes testing per use-case;
  • simplifies input data handling in the API functions or controller actions.

Disadvantages

  • adds extra complexity in the schema modules;
  • can mislead you into handling more than data integrity in the schema modules, as mentioned in the caveat section.

When the pattern is not needed

Even though this pattern presents itself to me as a great way to achieve better data integrity, there is one scenario in which I find myself skipping it:

  • usually, the entity (model) is much simpler;
  • the API only provides two types of change (create and a generic update);
  • both create and update require same data integrity checks.

Conclusion

Data is a very important asset in any software application, and data integrity is a critical component in achieving data quality. The benefits of using this pattern so far have given me much more reliability and control over the data handled by my applications. Other than that, it makes me think ahead in the development process about how I structure the data and how the application interacts with it.

Permalink

Mocking and faking external dependencies in elixir tests

During some recent work on thoughtbot’s company announcement app Constable, I ran into a situation where I was introducing a new service object that made external requests. When unit testing, it is easy enough to use some straightforward mocks to avoid making external requests. However, for tests unrelated to the service object, how can we mock out external requests without littering those tests with explicit mocks?

Unit testing using mocks

To set the stage, let’s take a look at what a hypothetical service object that makes an external request might look like:

defmodule App.Services.WebService do
  def make_request do
    HTTPoison.get!("http://thoughtbot.com/")
  end
end

And if we were to write a unit test for this service object, we would want to mock out the external request (in this case, the call to HTTPoison.get!/1). To do that, we might use a library like Mock:

defmodule App.Services.WebServiceTest do
  import Mock
  alias App.Services.WebService

  describe "#make_request" do
    test ". . ." do
      # setup . . .

      # The mock arity must match HTTPoison.get!/1 as called by the service.
      get_mock = fn _url ->
        %HTTPoison.Response{
          body: ". . .",
          status_code: 200
        }
      end

      response =
        Mock.with_mock HTTPoison, get!: get_mock do
          WebService.make_request()
        end

      # assertions . . .
    end
  end
end

Where it gets tricky

Mocking is exactly what we want when unit testing the service object, but for unrelated unit tests that run code which happens to use our service object, we want to ensure that no external requests are made when running our test suite.

As an example, we might have a module that utilizes our service object:

defmodule SomeModule do
  alias App.Services.WebService

  def do_something do
    . . .
    response = WebService.make_request()
    . . .
  end
end

If we were testing this module and in our test we called SomeModule.do_something/0, we would inadvertently be making an external request. It would be incorrect to mock HTTPoison.get!/1 in this test because that's an implementation detail of our service object. And while we could mock WebService.make_request/0, that will lead to a lot of noise in our tests.

Let’s create a fake

One way we can get around this issue is to create a fake version of our service object which has the same public interface, but returns fake data. That object might look like:

defmodule App.Services.FakeWebService do
  def make_request do
    %HTTPoison.Response{body: ". . .", status_code: 200}
  end
end

We want to utilize this fake by making our application code use WebService unless we are testing, in which case we want to use FakeWebService.

A common way to accomplish this is to have three modules: WebService, WebServiceImplementation, and WebServiceFake. Everything calls functions on WebService, which then delegates to WebServiceImplementation when not testing, or to WebServiceFake when testing. I don't particularly like this pattern because it requires an extra module and introduces complexity.

A much simpler and more flexible solution is to use a form of dependency injection where we dynamically refer to our service object, which is either the real service or the fake. There is a great Elixir library called pact which accomplishes this by creating a dependency registry that allows us to define named dependencies but conditionally switch out the actual value they resolve to.

Using pact, we define a dependency registry for our application:

defmodule App.Pact do
  use Pact
  alias App.Services.WebService

  register :web_service, WebService
end

App.Pact.start_link()

And then we want to redefine that dependency to be our fake when we run our tests. The following code, either in a test helper or in some setup that occurs before all tests, will accomplish that:

App.Pact.register(:web_service, FakeWebService)

Finally, all calls to WebService.make_request() in our application and tests become App.Pact.get(:web_service).make_request(). The one exception to this is in our unit test for WebService itself - we want to test the actual service object! So we should still explicitly call WebService.make_request().
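
For example, the hypothetical SomeModule from earlier would now resolve the service through the registry instead of referencing it directly:

defmodule SomeModule do
  def do_something do
    # Resolves to WebService in regular code and to FakeWebService in tests.
    response = App.Pact.get(:web_service).make_request()
    response
  end
end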

Keeping the fake up to date

This approach is good, but there is one problem: if the public interface of our real service object changes, we also have to update the fake. This may be acceptable; after all, it would likely cause a runtime test failure if the public interface of the fake differed from the real service object. But there is an easy way to make the compiler do more work for us.

Using behaviors we can specify a public interface that both the real service object and the fake must conform to. This can give us more confidence that we’re always keeping the two in sync with each other.

Let’s define a module describing the behavior of our service object:

defmodule App.Services.WebServiceProvider do
  @callback make_request() :: HTTPoison.Response.t()
end

And then we can adopt that behavior in our service object and fake:

defmodule App.Services.WebService do
  alias App.Services.WebServiceProvider
  @behaviour WebServiceProvider

  @impl WebServiceProvider
  def make_request do
    HTTPoison.get!("http://thoughtbot.com/")
  end
end

. . .

defmodule App.Services.FakeWebService do
  alias App.Services.WebServiceProvider
  @behaviour WebServiceProvider

  @impl WebServiceProvider
  def make_request do
    %HTTPoison.Response{body: ". . .", status_code: 200}
  end
end

Final thoughts

This approach is great when we want pure unit testing, and our goal is to avoid any external requests. The pact library even allows us to replace dependencies in a block, rather than permanently:

App.Pact.replace :web_service, FakeWebService do
  . . .
end

This can be an invaluable alternative to mocking a dependency for all tests, and may be preferable if we want to be very explicit about what is mocked in each test while still allowing us to easily make use of our fake.

For integration tests or more complex services where we want to test the full service-to-service interaction, we may want to consider building our own mock server instead of replacing the external service with a fake.

Permalink

Clever use of persistent_term

This blog post goes through three different ways I have used persistent_term since its release and explains a bit about why they work so well with persistent_term.

Global counters

Let's say you want to have some global counters in your system, for example the number of times an HTTP request has been made. If the system is very busy, that counter will be incremented many times per second by many different processes. Before OTP-22, the best way I know of to get the best performance is to use a striped ets table, i.e. something like the code below:

incr(Counter) ->
  ets:update_counter(?MODULE,{Counter,erlang:system_info(scheduler_id)},1).

read(Counter) ->
  lists:sum(ets:select(?MODULE,[{{{Counter,'_'},'$1'},[],['$1']}])).

The code above makes sure that there is very little contention on the ets table, as each scheduler gets a separate slot in the table to update. This comes at the cost of more memory usage, and when reading the value you may not get an exact result.

In OTP-22 the same can be achieved by using counters. Counters have built-in support for striping by using the write_concurrency option, so we don’t have to write our own implementation for that. They are also faster and use less memory than ets tables, so lots of wins.

The remaining problem then is finding the reference to the counter. We could put it into ets and then do an ets:lookup_element/3 when updating a counter.

cnt_incr(Counter) ->
    counters:add(ets:lookup_element(?MODULE,Counter,2),1,1).

cnt_read(Counter) ->
    counters:get(ets:lookup_element(?MODULE,Counter,2),1).

This gives a performance degradation of about 20%, so not really what we want. However, if we place the counter in persistent_term, as in the code below, we get a performance increase of about 140%, which is much more in line with what we wanted.

cnt_pt_incr(Counter) ->
    counters:add(persistent_term:get({?MODULE,Counter}),1,1).

cnt_pt_read(Counter) ->
    counters:get(persistent_term:get({?MODULE,Counter}),1).
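
For completeness, this is roughly the one-time setup the snippets above assume has already happened, sketched in Elixir syntax with a hypothetical module name:

defmodule MyMetrics do
  # Create a striped counter once at startup and register its reference globally,
  # so cnt_pt_incr/cnt_pt_read style code can fetch it without copying.
  def setup(counter_name) do
    counter = :counters.new(1, [:write_concurrency])
    :persistent_term.put({__MODULE__, counter_name}, counter)
  end
end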

The reason for this huge difference is that when the counters are placed into persistent_term they are stored as literals, which means that at each increment we no longer have to make a copy of the counters reference. This is good for two reasons:

1) The amount of garbage will decrease. In my benchmarks the amount of garbage generated by cnt_incr is 6 words while both ets_incr and cnt_pt_incr create 3 words.

2) No reference counts have to be modified. What I mean by this is that the counters reference is what is called a magic reference or nif resource. These references work much in the same way as reference counted binaries in that they are not copied when sent to different processes. Instead only a reference count is incremented at copy and then decremented later by the GC. This means that for cnt_incr we actually have 3 counters that are modified for each call. First we increment the reference count on the counter when copying from ets, then we update the actual counter and then eventually we decrement the reference counter. If we use persistent_term, the term is never copied so we don’t have to update any reference counters, instead we just have to update the actual counter.

However, placing the counter in persistent_term is not trouble free. In order to delete or replace the counter reference in persistent_term we have to do a global GC, which depending on the system could be very expensive.

So this method is best used only for global persistent counters that will never be deleted.

You can find the code for all the above examples and the benchmark I ran here.

Logger level check

In logger there is a primary logging level that is the first test to be done for each potential log message to be generated. This check can be done many times per second and needs to be very quick. At the moment of writing (OTP-22) logger uses an ets table to keep all its configuration which includes the primary logging level.

This is not really ideal as doing a lookup from the ets table means that we have to take a read-lock to protect against parallel writes to the value. Taking such a read lock is not terribly expensive, but when done thousands of times per second it adds up.

So in this PR I’ve used persistent_term as a cache for the primary logging level. Now when reading the value from the hot path logger will instead use persistent_term. This removes all locks from the hot path and we only need to do a lookup in the persistent_term hash table.

But what if we need to update the primary logger level? Don’t we force a global GC then? No, because the small integer representing the primary logger level is an immediate. This means that the value fits in one machine word and is always copied in its entirety to the calling process. Which in turn means that we don’t have to do a global GC when replacing the value.

When doing this we have to be very careful so that the value does not become a heap value as the cost of doing an update would explode. However, it works great for logger and has reduced the overhead of a ?LOG_INFO call by about 65% when no logging should be done.
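
The same idea can be sketched in Elixir syntax with hypothetical names (this is an illustration of the technique, not logger's actual code): the cached value is a small integer, so replacing it never forces a global GC.

defmodule MyLogLevel do
  @key {__MODULE__, :primary_level}
  @severities %{debug: 0, info: 1, warning: 2, error: 3}

  # Updating the cached level is cheap because a small integer is an immediate.
  def set(level), do: :persistent_term.put(@key, Map.fetch!(@severities, level))

  # Hot path: a single lock-free persistent_term lookup per potential log call.
  def enabled?(level), do: Map.fetch!(@severities, level) >= :persistent_term.get(@key)
end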

Large constant data

We use an internal tool here at the OTP team called the "ticket tool". It basically manages all of the OTP-XYZ tickets that you see in the release notes that come with each release of Erlang/OTP. It is an ancient tool from the late 90s or early 00s that no one really wants to touch.

One part of it is a server that contains a cache of all the 17000 or so tickets that have been created through the years. In that server there is a single process that has each ticket and its state in order to speed up searching in the tickets. The state of this process is quite large and when it is doing a GC it takes somewhere around 10 seconds for it to finish. This means that about every 10 minutes the server freezes for 10 seconds and we get to experience the joy of being Java programmers for a while.

Being a VM developer I’ve always thought the solution to this problem is to implement either an incremental GC or at least a mark and sweep GC for large heaps. However, the ticket tool server has never been of high enough priority to make me spend a year or two rewriting the GC.

So, two weeks ago I decided to take a look and used persistent_term to move the data from the heap into the literal area instead. This was possible because I know that the majority of tickets are only searched and never changed, so they will remain in the literal area forever, while the tickets that do get edited move onto the heap of the ticket server. Basically my code change was this:

handle_info(timeout, State) ->
  persistent_term:put(?MODULE,State),
  erlang:start_timer(60 * 60 * 1000, self(), timeout),
  {noreply,persistent_term:get(?MODULE)}.

This small change puts the entire gen_server state into the literal area, and any changes done to it will then pull the data into the heap. This dropped the GC pauses down to non-noticeable and took considerably less time to implement than a new GC algorithm.

Permalink

Personal notes from Elixir Conf 2019

Last week I attended Elixir Conf 2019 in Aurora, Colorado. It was my fourth Elixir Conf and by far the one where I engaged the most with people I had never talked to before; the hallway track was great and I got to meet some conference buddies from previous editions again. Another highlight was the conference hotel, a brand-new resort close to Denver. Anyone who knows me well understands how much I like anything Colorado, so I am glad the next edition will be in the same location.

The conference format was similar to previous years: three tracks over two days, with two extra days of optional training, which I didn't attend this time. The conference had four keynotes, from José Valim (Elixir creator), Chris McCord (Phoenix creator), Justin Schneck (Nerves co-creator) and the Dockyard team introducing Lumen, plus great talks.

All the talks and keynotes are available on the Elixir Conf YouTube channel. Each keynote focused on specific areas: the Elixir language and its future, LiveView, Nerves, and the brand-new Lumen, a project from Dockyard that brings Elixir to the client (browser).

As I always like to take notes when attending conferences, these are my highlights for Elixir Conf 2019. Please be advised that these notes are written as reminders of things I considered relevant during the talks and are not a summary of them by any means. As the conference had many tracks, I of course couldn't attend all the talks, so feel free to comment with your favorite talk and notes:

Keynote: José Valim

Before anything, thank you very much José for the incredible language; it is a pleasure working full-time with Elixir and sharing the same community with great people, and I extend that to all of the Elixir core team as well.

Goal for 2019: streamline Elixir in production

The main goal is to make it easier to put applications in production and to ensure that what is running in production can be inspected.

The two main pieces are releases and telemetry.

Releases

  • Elixir 1.9 introduced release commands as part of the language, incorporating lots of lessons and concepts from Distillery (a minimal configuration sketch follows this list);
  • shellcheck to test shell scripts;

Telemetry

The library is composed of many pieces and helps snapshot what is going on inside running applications through metrics from the BEAM as well as custom ones; a minimal sketch of emitting and handling an event follows the list below.

  • telemetry, telemetry_poller, telemetry_metrics, and telemetry_reports;
  • Phoenix 1.5 will come with a Telemetry module to help define application specific metrics;
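
A minimal sketch of emitting and handling a telemetry event (the event and handler names below are made up for illustration):

defmodule MyApp.TelemetryExample do
  # Attach a handler to a single event; the handler receives the event name,
  # measurements, metadata and the config passed at attach time.
  def attach do
    :telemetry.attach(
      "log-request-stop",
      [:my_app, :request, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event([:my_app, :request, :stop], measurements, metadata, _config) do
    IO.inspect({measurements, metadata}, label: "request stop")
  end

  # Somewhere in the application code, emit the event with measurements and metadata.
  def emit do
    :telemetry.execute([:my_app, :request, :stop], %{duration: 123}, %{route: "/"})
  end
end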

What is next for Elixir?

The next release, 1.10, will come with a different setup for its CI, now using Cirrus CI. It will also contain compiler tracing and ExUnit pattern diffing.

What is next for José Valim?

Now that the language enhancements are reaching a very stable point, and community projects are being built that showcase the power of the language, efforts will be directed more towards lower-level work (HiPE, Lumen and Hastega) and code analysis.

Keynote: Chris McCord

At Lonestar Elixir Conf 2019, Chris McCord presented the then-private project LiveView, which enables rich, real-time user experiences with server-rendered HTML. Right after the conference, the project was made available to the public and since then a collection of projects has been showcasing how interesting and powerful LiveView is. That also includes Phoenix Phrenzy, a contest for LiveView projects.

In this keynote, he presents a few interesting points of LiveView and also what is coming next and the reasoning behind it.

LiveView library

  • template engine;
  • tests;
  • navigation: live_redirect, live_link and handle_params for pagination URL changes;
  • prepend, append, ignore and replace updates (phx-update);
  • JS hooks when you need just a piece of extra Javascript;
  • IE11 support;

Coming soon

  • phx-debounce;
  • file uploads;
  • exception translation;
  • stash for client state;
  • automatic form recovery;

Keynote: Justin Schneck

In his keynote, Justin Schneck once again gave a really nice demo to show the improvements Nerves is getting.

Resilient

  • operating system: Erlang VM and Linux kernel;
  • application domain: the Nerves runtime allows languages other than Elixir, with resilience from the Erlang VM;
  • application data for specific data storage;

Reproducible

Read-only filesystem, immutable;

Reasonable

Whitelist approach (build up) with minimal output;

Nerves Hub

  • CryptoAuthentication chip;
  • NervesKey: write once;
  • delegated auth;
  • certificate;
  • remote console;
  • Terraform scripts to host your own Nerves Hub;

Keynote: Brian Cardarella, Paul Schoenfelder and Luke Imhoff

Introducing Lumen

Lumen is a new project from Dockyard that uses Elixir in the client. It also uses WebAssembly and Rust in the compiler.

Compiler

The compiler has some unique constraints such as code size, load time and concurrency model.

Why not BEAM?

  • Runtime: unavailable APIs, incompatible scheduler, JS managed types;
  • Code size: BEAM bytecode is expensive, weak dead-code elimination;
  • Performance: VM on a VM, JS engine unable to reason about bytecode;

New Compiler

  • Restrictions: no hot-code, allow dead-code elimination;
  • Ahead of time vs VM: only pay for what you use, no interpretation overhead;
  • Build on existing tools: LLVM, Rust, wasm-bindgen;
  • Key challenges: recursion and tail-call optimization, non-local returns, green threads, webassembly specific limitations;
  • Continuations: represent jumps as calls to a continuation, never return, always moving forward;

Frontend
Accepts source in Erlang and Core Erlang, including mix task.

Middle tier
AST lowered to EIR.

Backend
Lowers from EIR to LLVM IR, generates object files.

Future goals
Push more data to LLVM, bootstrapping, MLIR, and embedded targets.

Runtime

Memory management
BEAM and Rust, with property-based testing.

Memory model
Process heap and per-process garbage collection.

Processes
Very similar to what we have in Elixir: code stack, mailbox, pid, memory, links, and so on.

Schedulers
One per thread, main thread and webworkers.

WebAssembly main thread
Calls are blocking, scheduler needs to run timers.

Interacting with web
JS calling Lumen, and Lumen calling JS.

Why Lumen?

  • Joe's paper about GUI in functional programming;
  • Optimize front-end development;
  • GUI is concurrent by nature, we could have a supervisor taking care
    of a DOM tree;

Phoenix LiveView Demystified: Alex Garibay

In this talk Alex showed some LiveView internals: how it works and why it works so well.

LiveView EEx

From template with sigils to AST

  • static/vars and dynamic code;
  • %Phoenix.LiveView.Rendered{} with static, dynamic and fingerprint;
  • %Phoenix.LiveView.Comprehension{} to optimize data sent to the client;

Mounting the DOM

  • router, controller or template with live and live_render macros;
  • the rendered template has a few ids for channels and sessions;
  • container <div> that receives the template can be configured to be any HTML tag;

Phoenix Channels

  • uses Phoenix.LiveView.Socket;
  • uses a specific channel "lv:*";
  • socket receives session data and potentially user data;
  • client triggers an event that is sent to the socket using %Phoenix.Socket.Message{};
  • channel handles the event with callbacks;

Javascript

  • import LiveView library;
  • instantiate and connect LiveSocket;
  • a JS view is created with LiveView container, socket and so on;
  • each JS object in the view has static and dynamic data that is constructed for every change;
  • uses Morphdom.js to re-render the changes in the DOM;

WebRTC from start to finish: Scott Hamilton

WebRTC (Web Real-Time Communication) is a free, open-source project that provides web browsers and mobile applications with real-time communication (RTC) via simple application programming interfaces (APIs). It allows audio and video communication to work inside web pages by allowing direct peer-to-peer communication, eliminating the need to install plugins or download native apps.

Janus is a general purpose WebRTC server that has an Elixir client available.

WebRTC

  • spec and project;
  • basic implementation in P2P;
  • terminology: STUN, TURN and SDP;

Janus

  • general purpose WebRTC gateway;
  • JS, websocket;
  • 101: session, plugin, peer connection, handle;
  • resources to learn it are easy to find;

Elixir Phoenix Janus

Phoenix as middleman between client and Janus.

What could go wrong?

  • Janus configuration - ice trickle;
  • Janus on Docker;
  • deployment;
  • translation from Janus response to Elixir terms;
  • mixing HTTP and WS calls;

Elixir + CQRS - Architecting for availability, operability, and maintainability at PagerDuty: Jon Grieman

PagerDuty has a feature that records all incident user logs, and they use the CQRS pattern to design that piece of functionality.

They use composite keys for tracing that are orderable. Other benefits of the approach are being able to monitor and scale each side separately.

Overall incident log system

  • upstream systems;
  • Kafka;
  • creator;
  • DB cluster;
  • querier;
  • client systems;

DB incident recovery

  • the whole stack could be duplicated in a region;
  • replace the DB engine entirely after things are back to normal;
  • operational benefits coming from ES and CQRS;

Date, Time, and Time Zones in Elixir 1.9: Lau Taarnskov

Handling date and time is a challenge in any language; in this talk we see the details behind Calendar in Elixir, which got an upgrade in version 1.9.

Differences between UTC, TAI and the concept of layers of wall time.

Elixir Calendar, Date and Time specifics

  • sigils: ~D, ~T, ~N, ~U (see the sketch after this list);
  • choose the type by its meaning, not by convenience;
  • NaiveDateTime vs DateTime date/time comparison;
    - no data is better than fake/assumed data (JS and Ruby, for example);
    - correct data is better than assumptions;
  • TimeZoneDatabase behaviour, with tzdata as an example;
  • check stored date-times to verify that they append the Z (UTC) designator;
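
A quick sketch of the sigils and the comparison functions mentioned above:

~D[2019-08-29]                       # Date
~T[14:30:00]                         # Time
~N[2019-08-29 14:30:00]              # NaiveDateTime, no time zone information
~U[2019-08-29 14:30:00Z]             # DateTime in UTC, new in Elixir 1.9

# Compare with the dedicated functions instead of >, < or ==.
DateTime.compare(~U[2019-08-29 14:30:00Z], DateTime.utc_now())
#=> :lt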

Mint, disrupting HTTP clients: Andrea Leopardi

Mint is a reasonably new low-level HTTP client that aims to provide a small and functional core that others can build on top of.

Story and features

The initial need was caused by a potential removal of httpc from the Erlang standard library. Mint is a processless client that defines a data structure wrapping the connection, built on top of gen_tcp. Mint knows how to handle raw bits and also the HTTP protocol.

Streaming by default
Responses will arrive async (status, headers, body and so on).
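
A hedged sketch of what that processless, streaming flow looks like (placeholder host and path, and only a single receive shown for brevity):

{:ok, conn} = Mint.HTTP.connect(:https, "example.com", 443)
{:ok, conn, request_ref} = Mint.HTTP.request(conn, "GET", "/", [], nil)

receive do
  message ->
    # Response parts arrive as messages: status, headers, data chunks and :done.
    {:ok, _conn, responses} = Mint.HTTP.stream(conn, message)
    for {kind, ^request_ref, value} <- responses, do: IO.inspect({kind, value})
end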

Security

  • httpc is not safe by default;
  • hackney is safe by default but can be overridden if you work with `ssl_options`;
  • mint is safe by default;
  • SSL certificate store with castore;

Proxying
Mint has proxying support for request and tunnel proxy types.

HTTP2

  • multiplexed streams;
  • server push;
  • backpressure;

Use cases

GenServer, GenStage, GenStatem with connection data. Also, it can be used as one process with many connections.

Challenges and future plans

  • immutability;
  • low level but usable;
  • increase adoption;
  • pooling;
  • websockets;
  • gRPC;

BEAM extreme: Miriam Pena

In her talk, Miriam showed us things to consider to improve performance, and also alerted us that any tuning of the VM should be done only when needed, for very specific cases. Besides that, performance measurement should be done over a long time, and not using IEx.

One of the things she mentioned is that memory copying, something that happens a lot in the BEAM, brings CPU overhead.

Pool bottleneck

  • use process directly instead of GenServer, 80% faster;
  • leverage process priority level;
  • as a con, code readability suffers;

Key-value storage

  • process dictionary: hard to debug, tuple performance as an example;
  • code generation;
  • persistent term: access in constant time, write once read many, no copy to memory heap, tuple performance as an example;

NIFs

  • for when all the rest fails;
  • extreme memory usage;
  • no VM guarantees;

Another suggestion is to keep the OTP version as up to date as possible, as new releases are always improving performance.

Contracts for building robust systems: Chris Keathley

In this talk, Chris presents some insight into why Design by Contract should be considered and how his new library Norm can help projects with data specification and generation.

Breaking changes require coordination

  • refactor;
  • requirement (requires no breaking changes);
  • technology;

Contracts

  • enforcing callers are correct (pre-conditions);
  • ensure the function is correct (post-conditions);
  • ExContract;

Data specification

  • Norm;
  • using in combination with ExContract;
  • supports schema, optionality;

Testing strategy

  • Norm uses stream data to generate data for testing;
  • can be used with property-based test;

Writing an Ecto adapter, introducing MyXQL: Wojtek Mach

Even though I am not personally interested in MySQL, as I prefer and use only Postgres, I wanted to know more about the internals of Ecto and how it uses its adapters to connect to and interact with databases.

Driver

  • :gen_tcp for database connection;
  • library binpp;
  • Wireshark app;

MySQL packet

  • payload length;
  • sequence id;
  • packet payload;
  • leverages Elixir binary pattern matching;
  • initial handshake package;
  • handshake response;

Building the adapter

  • encoding and decoding: function for every data type, OK_Packet with defrecord;
  • DBConnection behaviour: maintain a connection pool, does not overload the database, reconnect, support to common database features, and it needs to be fast;

Connection

  • start N connections using DBConnection based on the pool configuration;
  • fetching results preparing and executing the query;
  • other functions as disconnect, checkout, ping, and so on;

Ecto integration

  • all the features Postgres adapter has;
  • implements Ecto.Adapter, Ecto.Adapter.Queryable, Ecto.Adapter.Schema, Ecto.Adapter.Storage, and Ecto.Adapter.Transaction;
  • constraints;
  • data normalization: boolean as an example, as in MySQL it is set as 1 and 0;
  • integration tests;

Kubernetes at small scale: Phil Toland

Kubernetes has great benefits, even though it is not so easy to implement. Some of the benefits are improved resource efficiency, reduced cost and operational scalability. In this talk Phil described his process of implementing Kubernetes at Hippware.

Main components

  • control plane (leave it alone);
  • workers;
  • workload:
    - pod;
    - deployment (how your pod runs in the cluster);
    - service (expose a pod to outside, expose a port);
    - config map;

Lessons learned

  • outsource as much as possible;
  • automate all the things;
  • pods are ephemeral;
  • automatic clustering: via libraries as libcluster and peerage;
  • deploying: via libraries as kubernetes-deploy and swarm;
  • one instance per node: anti-affinity specification;

ETS Versus ElasticSearch for Queryable Caching: David Schainks

In this talk David compares ElasticSearch, well known as a great solution for search and cached data, with Elixir/Erlang out-of-the-box solutions such as ETS and persistent_term, listing the pros and cons of each option.

ElasticSearch

  • filtering, auto completion and full text search;
  • performant;
  • queryable cache;
  • operational cost: another DSL, integration time, expensive, and configuration gotchas;

ETS

  • no garbage collection;
  • use cases: preset configuration, cached data, message buffer;
  • filtering with match/2, match_object/1 and fun2ms/1 (see the sketch after this list);
  • auto completion with fun2ms/1;
  • full text search: using Joe Armstrong's elib1 library;
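
A small sketch of that kind of filtering over a hypothetical ETS table (the data is made up for illustration):

table = :ets.new(:products, [:set, :public])
:ets.insert(table, {"elixir in action", :book, 39})
:ets.insert(table, {"nerves hat", :apparel, 19})

# match_object/2: every entry whose second element is :book
:ets.match_object(table, {:_, :book, :_})

# select/2 with a match spec: names of entries cheaper than 30
:ets.select(table, [{{:"$1", :_, :"$2"}, [{:<, :"$2", 30}], [:"$1"]}])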

Real world concerns

  • performance with large data sets;
  • data ranking;

Operational concerns

  • high availability;
  • synchronization;
  • index changes;

Persistent term

  • no filtering;
  • much faster than ETS;
  • garbage collection on update;

UI Testing is Ruff; Hound Can Help: Vanessa Lee

Whether you call it UI testing, end-to-end testing, end-to-user testing, or acceptance testing, it is often an intensely manual and time-consuming process. An Elixir library, Hound, can carry some of the load through browser automation.

Hound takes screenshots of tests and stores them in the test folder.

Library test helpers

  • check page elements;
  • fill, execute actions and submit form;
  • manage sessions;

Property-based testing

  • combining Hound with property-based libraries is very straightforward;
  • using custom StreamData generator using bind/2 helper;

While handy, there are some gotchas when elements are not ready or when the application depends heavily on JavaScript events/side effects.

Permalink

Copyright © 2016, Planet Erlang. No rights reserved.
Planet Erlang is maintained by Proctor.