
Distributed Erlang

Tagged:  erlang
Published 2024-12-02 @ 15:29:22
Approx. 15 minutes

The Erlang programming language is known for three things:

  1. Concurrency
  2. Fault tolerance
  3. Distribution

The cool thing is that all three of these are not only built into the language/runtime itself, but are also more or less "emergent" properties of the underlying design choices made when the language was created.

The Actor Model

Erlang is based on the Actor Model, which is a model of computation that was first described by Carl Hewitt in the 1970s.

The Actor Model is a way of thinking about computation that is based on the idea of "actors" that communicate with each other by sending messages.

You can think of an actor as its own little "thread" of execution, responsible for managing its own state and communicating with other actors.

Processes

In Erlang, actors are implemented as processes. As each process executes, the Erlang Virtual Machine (the BEAM) is responsible for keeping track of how long each process has been running, and making sure that each process gets a fair share of the CPU.

As described, each process is very lightweight, costing very little in terms of memory to create and run. This means that you can create thousands or even millions of processes in an Erlang system, and the system will still run efficiently.

You can create a new process in Erlang by calling the spawn/1 function, which takes a single argument: a function that the new process should execute.

1> spawn(fun() -> io:format("Hello, world!~n") end).
Hello, world!
<0.41.0>

Because each process is scheduled independently by the Erlang Virtual Machine, there's no need to worry about blocking any kind of main thread:

1> Loop = fun(F, X) -> F(F, X) end.
#Fun<erl_eval.12.57235823>
2> spawn(fun() -> Loop(Loop, foo) end).
<0.42.0>
% Note the shell is still responsive
3> 1 + 1.
2

In fact, "infinitely looping" functions spawned as processes are the fundamental way that Erlang programs are written: these long-running processes wait for messages, and based on the messages they receive, they decide what to do next.
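
As a minimal sketch of such a loop (the message shapes here are purely illustrative), each iteration receives one message, acts on it, and recurses with possibly-updated state:

% A long-running process is just a function that receives a message,
% acts on it, and then calls itself again with (possibly updated) state.
loop(State) ->
  receive
    {get, From} ->
      From ! {ok, State},
      loop(State);
    {set, NewState} ->
      loop(NewState);
    stop ->
      ok
  end.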

Message Passing

To enable communication between processes, Erlang provides a mechanism called message passing. This is a way for one process to send a message to another process, and for the receiving process to handle the message in some way.

A process can send a message (which can be any piece of data) to another process; additionally, a process can halt its execution and wait for a message to arrive.

Processes each have their own mailbox, which is a queue of messages that have been sent to the process.

When a process receives a message, it can choose to handle the message, or to ignore it.

For example, here's a simple program that sends a message to a process, and then waits for a response:

-module(example).
-export([start/0, resp/0]).

start() ->
  Pid = spawn(fun resp/0),
  % Send a message to the spawned process
  Pid ! {self(), hello},
  % And wait for a response, which is just a tuple tagged with the spawned process's PID
  % so we can identify which process sent the message
  receive
    {Pid, Response} -> io:format("Received response: ~p~n", [Response])
  end.

% This process just waits for a message, and then sends a response before exiting.
resp() ->
  receive
    {From, hello} -> From ! {self(), world}
  end.

With just these two features, message passing and processes, you have first-class concurrency baked into the language without function coloring.

Fault Tolerance

Erlang is also known for its fault tolerance. This is because the Erlang Virtual Machine is designed to be able to handle failures gracefully.

What this means is that the Erlang Virtual Machine and runtime have primitives for:

  1. Linking processes together.
  2. Monitoring processes.

Linking Processes

When two processes are linked together, if one of the processes crashes, the other process will also crash. This is a way of ensuring that if one part of your system fails, the rest of the system can be notified and take appropriate action.

You can link two processes together using the link/1 function, or you can atomically spawn a process and link it to the current process using the spawn_link/1 function.

When a process crashes, it generates an exit signal, which is sent to all linked processes. This exit signal contains information about why the process crashed, and what the process's PID was.

Exit signals are only converted into ordinary messages if the receiving process traps exits via process_flag(trap_exit, true); once it does, it can receive and pattern match on them like any other message:

1> process_flag(trap_exit, true).
false
2> Pid = spawn_link(fun() -> exit(normal) end).
<0.41.0>
3> receive
..   {'EXIT', Pid, Reason} -> Reason
.. end.
normal

This allows you to write code that can handle failures gracefully, by restarting processes that have crashed, or by shutting down the entire system if a critical process has crashed.

Monitoring Processes

In addition to linking processes together, you can also monitor processes. This is a way of being notified when a process crashes, without crashing the monitoring process.

When you monitor a process, you receive a message when the monitored process crashes. This message contains information about why the process crashed, and what the process's PID was.

You can monitor a process using the erlang:monitor/2 function, which takes two arguments: the type of monitoring you want to do (either process or port), and the PID of the process you want to monitor.

When a monitored process terminates, a 'DOWN' message is sent to all monitoring processes. This message contains the monitor reference, the PID of the process that terminated, and the reason it terminated.

This, in tandem with pattern matching on these 'DOWN' messages, allows you to write code that can monitor, say, worker processes and manage them as needed.
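
For example, a minimal sketch using spawn_monitor/1 (which atomically spawns and monitors a process; the exact PIDs and references shown will differ):

1> {Pid, Ref} = spawn_monitor(fun() -> exit(oops) end).
{<0.88.0>,#Ref<0.1.2.3>}
2> receive
..   {'DOWN', Ref, process, Pid, Reason} -> Reason
.. end.
oops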

OTP

The Open Telecom Platform (OTP) is a set of libraries and design principles that are built on top of Erlang.

One of the confusing things about OTP is that it's often referred to as a "framework", but it's really more of a set of abstractions that make it easier to write fault-tolerant and scalable systems in Erlang.

Unlike a "framework" in other languages, OTP is bundled with the Erlang runtime itself. OTP very much is Erlang, and Erlang is OTP.

The three main components of OTP are:

  1. Behaviours
  2. Supervisors
  3. GenServers

Behaviours are a way of defining a set of callback functions that a module must implement in order to implement that behaviour. This is a way of enforcing a contract between modules, and making sure that they all implement the same set of functions.
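
As a minimal sketch (the module and callback names here are purely illustrative), a behaviour is declared with -callback attributes, and an implementing module opts in with a -behaviour attribute:

% my_behaviour.erl -- defines the contract via -callback attributes:
-module(my_behaviour).
-callback handle_thing(Thing :: term()) -> ok | {error, term()}.

% my_impl.erl -- declares and fulfils the contract:
-module(my_impl).
-behaviour(my_behaviour).
-export([handle_thing/1]).

handle_thing(_Thing) -> ok.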

Supervisors are a way of managing a set of worker processes. A supervisor is responsible for starting, stopping, and restarting worker processes as needed.
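
Here's a minimal supervisor sketch (the child module example_worker is hypothetical):

-module(example_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
  % Restart any crashed child, allowing at most 5 restarts in 10 seconds.
  SupFlags = #{strategy => one_for_one, intensity => 5, period => 10},
  Children = [#{id => example_worker,
                start => {example_worker, start_link, []}}],
  {ok, {SupFlags, Children}}.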

GenServers are a way of implementing a server process that can handle requests from clients. A GenServer is a process that runs a loop, waiting for messages from clients, and responding to those messages.
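
Here's a minimal GenServer sketch (the module name counter is just for illustration):

-module(counter).
-behaviour(gen_server).
-export([start_link/0, increment/0, value/0]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, 0, []).
increment()  -> gen_server:cast(?MODULE, increment).
value()      -> gen_server:call(?MODULE, value).

% The callbacks below run inside the server process's receive loop.
init(Count)                      -> {ok, Count}.
handle_call(value, _From, Count) -> {reply, Count, Count}.
handle_cast(increment, Count)    -> {noreply, Count + 1}.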

There are additional components of OTP, but day-to-day, these are the three that you'll interact with most often. Some of the other components include:

  • GenStatem -- a way of implementing state machines.
  • GenEvent -- a way of implementing event handlers.

By composing these components together, you build systems that are fault-tolerant, scalable, and easy to reason about (within the remit of Erlang applications, at least).

Distribution

Having covered the basics of concurrency and fault tolerance, we can now talk about distribution.

Distribution is, as stated, one of the touted benefits of using Erlang. Distribution is the mechanism by which you can run Erlang programs on multiple machines, and have them communicate with each other.

Erlang has built-in support for distribution, but the underlying mechanisms can be swapped out for other implementations. However, the majority of Erlang programs in the wild use the built-in distribution mechanisms, which is what we'll cover here.

Nodes

In Erlang, a node is a running instance of the Erlang Virtual Machine.

When you start an Erlang program, you start a node. This node is identified by a name that is unique to the node. Names look a little like email addresses, i.e. node@hostname; nodes can have short names, where the hostname has no domain part, or long names, where the hostname is fully qualified.
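
For example (the hostnames here are illustrative):

% Start a node with a short name (no domain part in the hostname):
% erl -sname node1
% Start a node with a long name (fully-qualified hostname or IP):
% erl -name node1@host.example.com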

When nodes are started, they can connect to each other over a network. This is done using a mechanism called epmd (the Erlang Port Mapper Daemon), which acts a little like a DNS server for Erlang nodes.

When a node starts, it registers itself with epmd, and then other nodes can connect to it by looking up its name in epmd's registry.

Much of this is pluggable, however; see epmdless, for example.

You can connect to a remote node using the net_kernel:connect_node/1 function, which takes the name of the node you want to connect to as an argument; additionally, you can look up a list of all connected nodes using the nodes/0 function.

1> net_kernel:connect_node('node@hostname').
true
2> nodes().
['node@hostname']

Note that node/0 is a built-in function that returns the name of the current node.

Networking

Before you can send messages between nodes, you need to set up a network connection between them.

This is done using the net_adm:ping/1 function, which takes the name of the node you want to ping as an argument.

% Start nodes
% erl -name node1@hostname
% erl -name node2@hostname
1> net_adm:ping('node2@hostname').
pong
2> nodes().
['node2@hostname']

Before a connection can be established, however, the nodes need to share the same cookie: a shared secret that is used to authenticate connections between nodes.

The cookie is stored in a file called .erlang.cookie in the user's home directory.

There is also no built-in mechanism for communicating the cookie between nodes, so you'll need to do this manually.
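
You can set the cookie either at startup or at runtime (the value secret below is purely illustrative):

% At startup:
% erl -name node1@hostname -setcookie secret
% Or at runtime, before connecting to other nodes:
1> erlang:set_cookie(node(), secret).
true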

Network Transparency

One of the cool things about Erlang's distribution is that it's network transparent.

For example, taking the above example of spawning a process, note the format of the returned PID:

1> Pid = spawn(fun() -> io:format("Hello, world!~n") end).
Hello, world!
<0.41.0>

In this example, the returned PID is <0.41.0>, which is a reference to a process (that may, or may not still be running).

There are two different types of PIDs in Erlang: local PIDs and global PIDs; the former are always in the format <0.X.Y>, where X and Y are different bits identifying the process on the local node.

For global PIDs, the format is <X.Y.Z>, where X is the node number, and Y and Z continue to identify the process itself.

This means that you can send messages to processes on other nodes in the same way that you send messages to processes on the local node.

% assuming we have a remote PID stored in `RemotePid`
1> RemotePid ! {self(), ping}.
{<0.41.0>, ping}
% and then we can receive the message in the same way
2> receive
..   {RemotePid, Response} -> {RemotePid, Response}
.. end.
{<12042.3.0>, pong}

This is an extremely powerful feature of Erlang, as it allows you to write code that is completely agnostic to the location of the processes it's communicating with.

Distribution Gotchas

However, despite being one of the much-touted features of Erlang, distribution is not without its pitfalls.

Compared to the concurrency and fault tolerance features of Erlang, distribution is much more complex, much more difficult to get right, and arguably somewhat half-baked.

Here are some of the main gotchas to be aware of when working with distributed Erlang systems that I've run into in my career.

Scalability in Large Clusters

When clustering using Distributed Erlang, the overall cluster forms a mesh network, where each node is connected to every other node in the cluster.

This means that as the number of nodes in the cluster grows, the number of connections that each node needs to maintain also grows. In a system with N nodes, each node needs to maintain N-1 connections to every other node in the cluster.

This means that as the number of nodes in the cluster grows, the number of connections each node needs to maintain grows linearly, and the total number of connections in the cluster grows quadratically: N(N-1)/2 connections for N nodes.
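
For a concrete sense of scale: a 10-node cluster maintains 10 × 9 / 2 = 45 connections in total, while a 100-node cluster maintains 100 × 99 / 2 = 4,950.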

Note: the original version of this article mistakenly stated that the number of connections grows exponentially, this is incorrect, and I have since corrected it.

Thanks toast0 for pointing this out!

Distributed Erlang relies on each node periodically sending heartbeat messages to every other node in the cluster to ensure that the connections are still alive; so as the number of connections between nodes grows, the number of heartbeat messages that need to be sent also grows at the same rate.

This can lead to a lot of network traffic in large clusters, which can put a strain on the network.

Historically, a "large" cluster in Erlang was considered to be around 50-100 nodes. This may have changed in recent years, but it's still something to be aware of when designing distributed Erlang systems.

Note: this number comes from an anecdote from Francesco Cesarini, which I admit may have been misremembered on my part.

In practice, certain applications have been known to run on clusters of 400+ nodes, and in WhatsApp's case, 1,000+ nodes. Whether or not this was done without hidden nodes is unknown to me.

You can mitigate this by strategically "partitioning" your cluster into smaller groups of nodes that are connected to each other via a mechanism known as a hidden node, though this may prove unwieldy in practice.
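
For example, a node can be started as hidden so that it doesn't participate in the full mesh (the node name proxy@hostname is illustrative):

% Start a hidden node:
% erl -name proxy@hostname -hidden
% Hidden nodes don't show up in nodes(), but can be listed explicitly:
1> nodes(hidden).
['proxy@hostname']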

Lack of Fine-Grained Control

One of the issues with Distributed Erlang is that it lacks fine-grained control over the distribution mechanism.

You can't easily, for example, specify which nodes should be connected to which other nodes; likewise, you're not able to throttle traffic out of the box.

This can lead to situations where a single node in the cluster can overwhelm the network with traffic, leading to network congestion and dropped packets.

One cool feature of Distributed Erlang is the fact that members of a cluster don't have to be homogeneous, i.e. you can have different nodes responsible for serving different purposes in your cluster (so long as they share the same cookie).

At Vetspire, we utilise this feature heavily, though we've had to build a custom routing layer on top of Distributed Erlang in order to route requests/rpc calls to the appropriate nodes.

Aside: In the Elixir world, there's a great library called FLAME that provides an API for spinning up new nodes on demand.

Definitely check it out!

Network Partitions

Unlike when writing applications that run on a single machine, when writing distributed systems, you need to be aware of the possibility of network partitions.

A network partition is when a network becomes divided into two or more separate subnetworks that are unable to communicate with each other.

Network partitions can happen for a variety of reasons, such as network failures, misconfigured routers, or software bugs.

Generally speaking, network partitions are less a question of "if" they'll be a problem and more a question of "when".

This is particularly important when designing systems reliant on a single global process -- i.e. a system whereby exactly one instance of a process must be running at any given time.
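
Such a singleton is often registered cluster-wide via the global module (the name my_singleton below is purely illustrative):

1> global:register_name(my_singleton, self()).
yes
2> global:whereis_name(my_singleton).
<0.84.0>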

The general advice is to avoid relying on a single global process, and instead to design your system in such a way that it can tolerate the failure of any single node.

In the Elixir world, there are libraries such as Swarm and Horde that attempt to mitigate this issue somewhat, though it's on the implementor to reconcile the state of the system after a partition has been resolved.

One way to do this is to use a distributed consensus algorithm, such as Raft, to ensure that all nodes in the cluster agree on the state of the system.

If one must rely on a single global process, using something like consistent hashing may help ensure only a single process is running in your cluster, but this is not a panacea as that particular node may not be alive or reachable during a network partition.

Definitely take a look at riak_core as a resource for learning about engineering around Distributed Erlang in general.

Single Mailbox Bottleneck

In Erlang, each process has its own mailbox, which is a queue of messages that have been sent to the process.

However, in contrast to the standard case, when you send a message to a process on another node, that message is first handled by the remote node's distribution layer's mailbox.

This distribution-layer mailbox is a singleton on any given node, and as such, like a high-traffic GenServer, it can quickly become a bottleneck in high-traffic systems.

This is particularly an issue when sending either a large number of messages to a remote node, or when sending large data payloads to a remote node.

As mailboxes in Erlang are FIFO, this can lead to a situation where a single process on a remote node is overwhelmed with messages, and is unable to process them all in a timely manner -- the worst case scenario being that the mailbox blocks pending heartbeat messages, leading to the node being considered down.
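
One way to observe this kind of backpressure is the runtime's system monitor, which can report busy_dist_port events whenever a process is suspended because a distribution buffer is full; a minimal sketch:

% Ask the runtime to notify this shell process whenever any process
% gets suspended on a busy distribution port:
1> erlang:system_monitor(self(), [busy_dist_port]).
undefined
% The monitoring process then receives messages of the form:
%   {monitor, SuspendedPid, busy_dist_port, Port}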

There are some alternative distribution mechanisms that can be used to mitigate this, such as rpc gen, which re-implements the built-in rpc and erpc modules to use HTTP rather than Erlang's built-in distribution mechanisms.

This comes with additional advantages, such as the ability to send messages via SSL.

Node Discovery

It's important to note that the design and chosen trade-offs of pretty much anything in Erlang were made in the 80s, and whilst some choices like the focus on the actor model have aged well, others have not.

When it comes to Distributed Erlang, one of the differences between most deployments today and those of the 80s is the prevalence of cloud computing -- namely the fact that modern applications are oftentimes deployed in environments such as Kubernetes.

For node discovery in such environments, Erlang's built-in mechanisms need to be supplemented with something like libcluster, which provides pluggable strategies for node discovery via Kubernetes DNS, Consul, etc.

Physics

Finally, there's the issue of physics.

When you're writing distributed systems, you need to be aware of the speed of light, and the fact that it takes time for messages to travel between nodes.

This means that if you're designing a system that relies on low-latency communication between nodes, you need to be aware of the physical distance between the nodes, and the speed of light.

Especially during normal development on a single laptop, it's easy to forget that the network latency between two nodes on the same machine is going to be much lower than the network latency between two nodes on different continents. This can lead to situations where your system works fine in development, but falls over in production.

Clock synchronization is another issue that can crop up when working with distributed systems: keeping track of the causal order of events across multiple nodes can be difficult. Using protocols like NTP can help, as can designing your systems to be resilient to clock skew.

See the fallacies of distributed computing for more on this.

Conclusion

Erlang is a powerful language for building distributed systems, thanks to its built-in support for concurrency, fault tolerance, and distribution.

However, building distributed systems is hard, and there are many pitfalls to be aware of when working with distributed Erlang systems.

By understanding the underlying principles of Erlang, and by being aware of the potential pitfalls, you can build systems that are fault-tolerant, scalable, and easy to reason about.
