
IoT Edge Computing with Erlang/OTP

by Igor Kopestenski

The Internet of Things

In 2009, the number of machines connected to the Internet equalled Earth’s overall population. Since then, IoT devices have come to outnumber us roughly three to four times over: there are now more than 25 billion connected devices. This exponential growth is widely acknowledged as one of the most groundbreaking opportunities of the decade, but it comes with equally challenging infrastructure and architecture requirements. We have already seen Erlang and MQTT used at the heart of the world-leading EMQx messaging broker. The LightKone research consortium is now using Erlang to provide an innovative way to manage IoT edge data.

Data at the Edge

Traditional cloud computing designs are becoming unsustainable because of the growing amount of data produced at the edge of networks, close to and on end-user devices. As IoT devices such as mobile phones and smart home equipment become more powerful, the need to balance the overall workload across the entire network spectrum grows with them. IoT sensor data is generally collected at the edge and flows vertically towards cloud data centres. This creates a risk of network congestion and overload that is already a concern for cloud service providers.

LiRA layers

To address this challenge, the LightKone EU research consortium has recently published a new reference architecture, the LightKone Reference Architecture (LiRA). It introduces a novel design pattern for edge systems in which decentralized, lateral data sharing is the key factor that sets it apart from current state-of-the-art solutions.

Using Erlang for IoT

If you are familiar with the Nerves project, you may already know that all the benefits of BEAM applications can be brought to IoT devices. The same is true for bare-metal Erlang/OTP programs with the GRiSP Base embedded system. The GRiSP environment lets users develop OTP applications as they usually would, and also provides a hardware platform on which to deploy them, running the BEAM directly on the metal. For example, Pmod interfaces can be accessed directly at the application level in Erlang.

The Achlys framework

Achlys is a prototype implementation, developed as part of the LightKone research project to validate the proposed model. Its purpose is to offer developers a new approach to building IoT applications that run on IoT devices with Erlang/OTP. Achlys aims to achieve two distinct goals:

General purpose computations: Achlys does not impose any particular application by default. On the contrary, it is explicitly designed to let clusters of IoT devices perform generic computations at the edge without knowing in advance which programs they will execute. To achieve this, programs are encapsulated in a task data structure that contains Erlang higher-order functions and can be propagated from one node to an entire ad hoc network of GRiSP devices. This Functions-as-Data paradigm is particularly well suited to Erlang/OTP applications, because higher-order functions allow tasks to carry self-contained, autonomous programs that users can define and reconfigure without stopping the system.

Bring resilience where there is none yet: IoT devices at the edge are subject to particularly high levels of mobility and disconnection, so wireless ad hoc edge networks require additional reliability mechanisms. Achlys meets this objective with hybrid gossiping algorithms from the Lasp Erlang libraries: a set of IoT nodes within radio communication range can automatically form a cluster and execute parallel, resilient computations and storage operations.
A more technical publication is also available online.
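To make the Functions-as-Data idea concrete, here is a minimal sketch in plain Erlang, without Achlys or Lasp. A task is just a map that carries a zero-arity fun, and worker processes (standing in for nodes) receive it as a message and run it. The module name fad_sketch and its functions are hypothetical; the name and function keys mirror the task maps printed by achlys:get_all_tasks() later in this post, but this is not the Achlys implementation.

```erlang
%% Hypothetical sketch of Functions-as-Data; NOT the Achlys API.
-module(fad_sketch).
-export([make_task/2, start_worker/0, run_on_workers/2]).

%% A task is plain data: a map holding a name and a zero-arity fun.
make_task(Name, Fun) when is_atom(Name), is_function(Fun, 0) ->
    #{name => Name, function => Fun}.

%% A worker process that waits for tasks and runs the embedded fun.
start_worker() ->
    spawn(fun worker_loop/0).

worker_loop() ->
    receive
        {run, #{name := Name, function := Fun}, ReplyTo} ->
            ReplyTo ! {done, self(), Name, Fun()},
            worker_loop();
        stop ->
            ok
    end.

%% Ship the same task to every worker, then collect one result per worker.
run_on_workers(Task, Workers) ->
    Self = self(),
    lists:foreach(fun(W) -> W ! {run, Task, Self} end, Workers),
    [receive {done, W, _Name, Result} -> Result end || W <- Workers].
```

Because the program travels as a value, new behaviour can be sent to running workers without restarting them. Across real distributed nodes, a fun defined in a module can only be executed where the same version of that module is loaded, which this local sketch does not address.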

Getting started

Let us start with a simple IoT application based on the following scenario:

  1. Five GRiSP IoT nodes are in a room, each equipped with a temperature sensor.

  2. First, we check that every node in the cluster is functional by disseminating a task through one of them. The task is a simple function that tells each node to blink its LEDs in a rainbow pattern.

  3. Then, we propagate another task to determine which node is at the coldest spot in the room.

  4. Once the nodes have each computed an average temperature over several samples and agreed on the lowest value among them, we identify the GRiSP board in the coldest area by turning its LEDs blue, while the rest of the cluster turns red.
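The decision made in steps 3 and 4 can be sketched as pure Erlang, independent of sensors and LEDs: average each node's samples, pick the node with the lowest average, and assign colours. The module coldest_sketch, the {Node, Samples} input shape and the node names in the usage example are hypothetical illustrations, not part of Achlys.

```erlang
%% Hypothetical sketch of the coldest-spot decision logic.
-module(coldest_sketch).
-export([average/1, coldest/1, led_colors/1]).

%% Mean of a non-empty list of temperature samples.
average(Samples) when Samples =/= [] ->
    lists:sum(Samples) / length(Samples).

%% Readings :: [{Node, [Sample]}]; returns the node with the lowest average.
%% Tuples compare element by element, so lists:min/1 orders by average first.
coldest(Readings) ->
    Averages = [{average(Samples), Node} || {Node, Samples} <- Readings],
    {_Min, Node} = lists:min(Averages),
    Node.

%% Step 4: the coldest node turns blue, every other node turns red.
led_colors(Readings) ->
    Coldest = coldest(Readings),
    [{Node, case Node of Coldest -> blue; _ -> red end} || {Node, _} <- Readings].
```

For example, with readings [{n1, [21.0, 22.0, 23.0]}, {n2, [19.0, 20.0, 21.0]}], node n2 has the lowest average (20.0) and is the one that turns blue.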

Template project

First, we need to create our OTP application based on the Achlys template.

Achlys Task Model Usage

Now we can use the Achlys Task Model API by implementing a new gen_server module that extends Achlys to turn it into a domain-specific application.

Task provider module

First, create a new src/achlys_task_provider module that implements the gen_server OTP behaviour:

-module(achlys_task_provider).

-behaviour(gen_server).

%% API
-export([start_link/0]).

%% gen_server callbacks
-export([init/1 ,
         handle_call/3 ,
         handle_cast/2 ,
         handle_info/2 ,
         terminate/2 ,
         code_change/3]).

-define(SERVER , ?MODULE).

-record(state , {}).

start_link() ->
    gen_server:start_link({local , ?SERVER} , ?MODULE , [] , []).

init([]) ->
    {ok , #state{}}.

handle_call(_Request , _From , State) ->
    {reply , ok , State}.

handle_cast(_Request , State) ->
    {noreply , State}.

handle_info(_Info , State) ->
    {noreply , State}.

terminate(_Reason , _State) ->
    ok.

code_change(_OldVsn , State , _Extra) ->
    {ok , State}.

Initial Task

The first task can be shipped with the provider itself, together with a trigger that disseminates the function. This is a convenient shortcut when the computation is known before deployment, and it does not preclude adding new functions at runtime.

Add the function:

schedule_task() ->
    %% Declare an Achlys task that will be
    %% executed exactly once
    Task = achlys:declare(mytask
        , all
        , single
        %% Rainbow function: Random picks an on/off
        %% value for each RGB channel of the LEDs
        , fun() ->
            Random = fun() ->
                {rand:uniform(2) - 1, rand:uniform(2) - 1, rand:uniform(2) - 1}
            end,
            _ = [grisp_led:pattern(L, [{100, Random}]) || L <- [1, 2]]
        end),
    %% Send the task to this server process
    %% after a 5000 ms delay
    erlang:send_after(5000, ?SERVER, {task, Task}),
    ok.

And call it when initializing the task provider:

init([]) ->
    ok = schedule_task(),
    {ok , #state{}}.

The server will send the message {task, Task} to itself 5 seconds after starting. Add a new handler to process this message:

handle_info({task, Task} , State) ->
    %% Task propagation to the cluster, including self
    achlys:bite(Task),
    {noreply , State};
handle_info(_Info , State) ->
    {noreply , State}.

On receiving this message, the server adds the rainbow task to the Achlys Task Model by calling the achlys:bite/1 API, which propagates it to the cluster, including the node itself.
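If you want to try the delayed self-message pattern used by schedule_task/0 and handle_info/2 without Achlys or a GRiSP board, the following minimal gen_server sketch replaces achlys:bite/1 with a local call to the task's fun and keeps the results in its state. The module local_task_runner and its results/0 function are hypothetical test helpers, not part of Achlys.

```erlang
%% Hypothetical local stand-in for the task provider; NOT part of Achlys.
-module(local_task_runner).
-behaviour(gen_server).

-export([start_link/2, results/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-define(SERVER, ?MODULE).

%% Start the server; Fun is run once, DelayMs after init/1 returns.
start_link(Fun, DelayMs) ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, {Fun, DelayMs}, []).

%% Return the results of all tasks run so far, oldest first.
results() ->
    gen_server:call(?SERVER, results).

init({Fun, DelayMs}) ->
    %% Same pattern as schedule_task/0: deliver the task to ourselves later.
    erlang:send_after(DelayMs, self(), {task, Fun}),
    {ok, []}.

handle_call(results, _From, Results) ->
    {reply, lists:reverse(Results), Results}.

handle_cast(_Request, State) ->
    {noreply, State}.

%% Stand-in for achlys:bite/1: run the fun locally and record its result.
handle_info({task, Fun}, Results) ->
    {noreply, [Fun() | Results]};
handle_info(_Info, State) ->
    {noreply, State}.
```

Starting it with local_task_runner:start_link(fun() -> hello end, 10) and calling results() a moment later should return [hello].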

Coldest spot task

With the rainbow task in place, the server can move on to its actual purpose:

  1. Using Achlys to instruct all nodes to gather temperature measurements from their GRiSP boards using the PmodNAV module.
  2. Using a Lasp CRDT set to store the average computed by each node and propagate it to all other nodes.
  3. Using Lasp’s lasp:read/2 function to perform a blocking read in a separate Erlang process, which waits until all five nodes have taken 5 samples each and shared their computed average with the others.
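The Lasp set used here is a state-based grow-only set (state_gset). The minimal sketch below shows the idea behind such a CRDT in plain Erlang: elements are only ever added, and replicas merge by set union, which is commutative, associative and idempotent, so all replicas converge whatever order gossip arrives in. The reached/2 function mimics, as we understand it, the role of the {cardinality, 5} threshold passed to lasp:read/2. The module gset_sketch is hypothetical and is not how Lasp implements state_gset.

```erlang
%% Hypothetical grow-only set (G-Set) sketch; NOT Lasp's implementation.
-module(gset_sketch).
-export([new/0, add/2, merge/2, value/1, reached/2]).

%% The replica state is simply a set of elements.
new() -> sets:new().

%% Local update: elements can be added but never removed.
add(Elem, GSet) -> sets:add_element(Elem, GSet).

%% Merging two replicas is set union.
merge(A, B) -> sets:union(A, B).

%% Sorted list view, handy for comparing replicas.
value(GSet) -> lists:sort(sets:to_list(GSet)).

%% True once the set holds at least N elements, e.g. one average per node.
reached(GSet, N) -> sets:size(GSet) >= N.
```

In the coldest-spot scenario each element is an {Average, Node} pair, so once five elements are present every node has reported, and sorting the set puts the coldest node first.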

Add a new function:

coldest_spot_task() ->
    %% Declare an Achlys task that every node
    %% in the cluster executes once
    Task = achlys:declare(coldest_spot_task
        , all
        , single
        , fun() ->
            Id = {<<"temp">>, state_gset},
            {ok, {_, _, _, _}} = lasp:declare(Id, state_gset),
            %% Take 5 samples, 5 seconds apart; pmod_nav:read/2
            %% returns a list, hence the list concatenation
            L = lists:foldl(fun(_Elem, AccIn) ->
                    timer:sleep(5000),
                    Temp = pmod_nav:read(acc, [out_temp]),
                    Temp ++ AccIn
                end, [], lists:seq(1, 5)),
            %% Average of the 5 local samples
            Avg = lists:sum(L) / length(L),
            Name = node(),
            %% Share {Average, Node} with the whole cluster
            lasp:update(Id, {add, {Avg, Name}}, self()),
            spawn(fun() ->
                %% Block until all 5 nodes have added their average
                lasp:read(Id, {cardinality, 5}),
                {ok, S} = lasp:query(Id),
                Fetched = sets:to_list(S),
                %% The lowest average sorts first
                {_Minimum, Node} = hd(lists:usort(Fetched)),
                Self = node(),
                case Node =:= Self of
                    true ->
                        [grisp_led:color(X, blue) || X <- [1, 2]];
                    _ ->
                        [grisp_led:color(X, red) || X <- [1, 2]]
                end
            end)
        end),
    Task.

Extend the server’s API:

%% Adds the coldest_spot_task to the working set
%% using the Achlys task model
-export([add_coldest_spot_task/0]).

Then add its implementation:

add_coldest_spot_task() ->
    gen_server:cast(?SERVER
        , {task, coldest_spot_task()}).

You can now handle these incoming messages:

handle_cast({task, Task} , State) ->
    %% Task propagation to the cluster, including self
    achlys:bite(Task),    
    {noreply , State};
handle_cast(_Request , State) ->
    {noreply , State}.

Done! Once deployed, we obtain the following result:

Temperature measurement at the edge


Emulated GRiSP boards

Fortunately for users who do not have GRiSP hardware for IoT edge computing, the GRiSP team has developed an emulation layer that lets nodes run on a computer exactly as they would on the embedded boards. Detailed instructions can be found in the GRiSP Wiki. Once your development environment is configured, you can easily simulate a 2-node cluster on your laptop using this module as a drop-in replacement for the task provider.

This module is very similar to the one deployed in the footage, except that it performs a different pair of operations: the first greets the users, and the second stores temperature measurements in Celsius along with their Fahrenheit equivalents, using a mapping function over a convergent replicated data type (CRDT).
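The emulated module itself is not reproduced in this post, so the following is only a sketch of the per-element conversion on plain lists, using F = C × 9/5 + 32. The entry shape is copied from the shell transcript below; we assume the fifth field holds the temperature in Celsius because it is the only field that differs between the <<"source">> and <<"destination">> sets ([42.5] versus [108.5]). The module fahrenheit_sketch is hypothetical; in the real module the mapping is applied between Lasp CRDTs rather than over a list.

```erlang
%% Hypothetical sketch of the Celsius-to-Fahrenheit mapping; NOT the
%% emulated Achlys module.
-module(fahrenheit_sketch).
-export([c_to_f/1, convert_entry/1, convert_all/1]).

%% F = C * 9/5 + 32 (always returns a float).
c_to_f(Celsius) ->
    Celsius * 9 / 5 + 32.

%% Assumption: the fifth field is a one-element list holding the
%% temperature in Celsius; every other field is passed through unchanged.
convert_entry({V1, V2, V3, V4, [TempC], Node}) ->
    {V1, V2, V3, V4, [c_to_f(TempC)], Node}.

%% Apply the conversion to every entry, as a CRDT map would per element.
convert_all(Entries) ->
    lists:map(fun convert_entry/1, Entries).
```

Applied to the entry read from the source set in the transcript, the temperature 42.5 becomes 108.5, which matches the value shown in the destination set.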

You can test the provider locally with 2 nodes using the following sequence of commands:

Execution scenario

%% ==================
%%
%% Node 1 shell : 
%% --------------
%%
%% $ make shell n=1 PEER_PORT=27001
%% ...
%% booting up
%% ...
%%
%% (achlys1@130.104.213.164)1> achlys_task_provider:start_link().
%% {ok,<0.806.0>}
%% (achlys1@130.104.213.164)2> Hello Joe !
%% (achlys1@130.104.213.164)2> achlys_task_provider:add_pmodnav_task().
%% ok
%% (achlys1@130.104.213.164)3>
%% 
%% (achlys1@130.104.213.164)3> {ok, Set} = lasp:query({<<"source">>, state_orset}), sets:to_list(Set).
%% [{[-1.3732929999999999,-0.789584,-0.23198300000000002],
%%   [0.0,0.0,0.0],
%%   [0.0,0.0,0.0],
%%   [0.0],
%%   [42.5],
%%   'achlys1@130.104.213.164'}]
%% (achlys1@130.104.213.164)4>
%%
%% (achlys1@130.104.213.164)4> {ok, FarenheitSet} = lasp:query({<<"destination">>, state_orset}), sets:to_list(FarenheitSet).
%% [{[-1.3732929999999999,-0.789584,-0.23198300000000002],
%%   [0.0,0.0,0.0],
%%   [0.0,0.0,0.0],
%%   [0.0],
%%   [108.5],
%%   'achlys1@130.104.213.164'}]
%% (achlys1@130.104.213.164)5>

Now start a second Achlys shell:

%% ==================
%% Node 2 shell : 
%% --------------
%%
%% $ make shell n=2 PEER_PORT=27002
%% ...
%% booting up
%% ...
%%
%% (achlys2@130.104.213.164)1> achlys_util:add_node('achlys1@130.104.213.164').
%% ok
%% (achlys2@130.104.213.164)2> Hello Joe !
%% 
%% (achlys2@130.104.213.164)2>
%% 
%% (achlys2@130.104.213.164)2> {ok, FarenheitSet} = lasp:query({<<"destination">>, state_orset}), sets:to_list(FarenheitSet).
%% [{[-1.733376,-1.7716230000000002,0.24387799999999998],
%%   [0.0,0.0,0.0],
%%   [0.0,0.0,0.0],
%%   [0.0],
%%   [108.5],
%%   'achlys2@130.104.213.164'},
%%  {[-1.3732929999999999,-0.789584,-0.23198300000000002],
%%   [0.0,0.0,0.0],
%%   [0.0,0.0,0.0],
%%   [0.0],
%%   [108.5],
%%   'achlys1@130.104.213.164'}]
%% (achlys2@130.104.213.164)3>
%% (achlys2@130.104.213.164)3> achlys:get_all_tasks().
%% [{#{execution_type => <<1>>,
%%     function => #Fun<achlys_task_provider.0.44631258>,
%%     name => mytask,
%%     targets => <<0>>},
%%   128479609},
%%  {#{execution_type => <<1>>,
%%     function => #Fun<achlys_task_provider.1.44631258>,
%%     name => pmodnav_task,
%%     targets => <<0>>},
%%   30190207}]
%% (achlys2@130.104.213.164)4>

And done!

Further LightKone examples

Additional usage scenarios and instructions can be found in the examples section of the Achlys repository. For instance, GRiSP nodes can perform parallel IoT edge computations to estimate the value of Pi!
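The Achlys Pi example is not shown here, but a common way to estimate Pi in parallel is the Monte Carlo method: sample random points in the unit square, count those falling inside the quarter circle, and multiply the hit ratio by 4. The minimal sketch below splits the sampling across Erlang processes that stand in for edge nodes; the module pi_sketch is hypothetical, and we do not claim this is how the Achlys example works.

```erlang
%% Hypothetical parallel Monte Carlo estimate of Pi; processes stand in
%% for edge nodes. NOT the Achlys example.
-module(pi_sketch).
-export([estimate/2]).

%% Spawn Workers processes, each drawing SamplesPerWorker random points,
%% then combine their hit counts into a single estimate.
estimate(Workers, SamplesPerWorker) ->
    Parent = self(),
    Pids = [spawn(fun() ->
                Parent ! {hits, self(), count_hits(SamplesPerWorker, 0)}
            end) || _ <- lists:seq(1, Workers)],
    Hits = lists:sum([receive {hits, Pid, H} -> H end || Pid <- Pids]),
    4 * Hits / (Workers * SamplesPerWorker).

%% Count points in the unit square that fall inside the quarter circle.
%% Each process gets its own automatically seeded rand state.
count_hits(0, Acc) ->
    Acc;
count_hits(N, Acc) ->
    X = rand:uniform(),
    Y = rand:uniform(),
    case X * X + Y * Y =< 1.0 of
        true -> count_hits(N - 1, Acc + 1);
        false -> count_hits(N - 1, Acc)
    end.
```

Each worker only reports a single integer, so the data exchanged is tiny compared with the computation, which is what makes this kind of workload a good fit for a cluster of small devices.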

Learn more about Erlang and IoT

If you’re looking to use Erlang to build a scalable, reliable and fault-tolerant IoT architecture, talk to us about EMQx.

Monitor your Erlang, Elixir and RabbitMQ systems

Are you currently using Erlang, Elixir or RabbitMQ in your stack? Get full visibility of your system with WombatOAM; find out more and get a free trial on our WombatOAM page.
