In this blog post I’m going to describe a very simple distributed algorithm that is useful in different programming scenarios. The algorithm is useful when you need to take some kind of information synchronized among a number of processes. The information can be everything as long as it is composed of a small number of bytes, and as long as it is idempotent, that is, the current value of the information does not depend on the previous value, and we can just replace an old value, with the new one. The size of the information is important because for the way the algorithm works, the information should be small enough that every node can broadcast it from time to time to some other random node, so it should fit the size of an “heartbeat” packet. Let’s say that up to a few kbytes everything is fine. This algorithm is no new in any way, it is basically just a trivial way to put together obvious ideas found in other distributed algorithms in a simple way. However the algorithm is very useful in many real-world contexts, and is extremely simple to implement. The algorithm is mostly borrowed from Raft, however because of the premises it uses only a subset of Raft that is trivial to implement. An example scenario === To understand better the algorithm, it is much better to have an example of problem that we want to solve in our distributed system. Let’s say that we have N processes connected with two kind of wifi networks: a very reliable but slow wireless network, that is only suitable to send “control” packets like heartbeats or other low bandwidth data, and a very fast wireless network. Now let’s imagine that while the slow network works at a fixed frequency, the high speed wireless network requires to adapt to changing conditions and noises in a given frequency, and is able to hop to a different frequency as soon as too much noise is detected. We need a way to make sure that all the processes use the same frequency to communicate with the high speed network, and we also need a way to switch frequency when the currently used frequency has issues. We need this system to work if there are network partitions in the slow network, as long as the majority of the processes are able to communicate. Note that this problem has the properties stated above: 1) The information is idempotent, if the high speed network switched to an different frequency, the new frequency does not depend on the old frequency. A process receiving the new frequency can just update frequency regardless of the fact that its old frequency was updated, or an older one (because for some reason it did not received some update). 2) The information is small, it is possible to propagate it easily across nodes in a small data packet. In this case it is actually extremely small, for example the frequency may be encoded in a 64 bit integer. Epochs and update messages === The basic idea of this algorithm is that there is an artificial notion of time across the processes, that is used to order events or informations without to resort to the system time of the process, that is hard to synchronize between them. This artificial time is called the “epoch”. Every process has the notion of currentEpoch, that is, initialized at zero at startup. Every time a process sees an epoch that is greater that its current epoch, it updates its epoch to match the observed epoch. Every process has also the notion of the frequencyEpoch, that is, the version of the currently used frequency. In order to propagate the information, every process periodically sends an update message to some other process. For example every 5 seconds every process picks a random process, and sends to it an update message containing: the current frequency in use, the epoch of the frequency used, and the currentEpoch of the process sending the update message. The first time a process is created its frequency is set to -1: this means that there is no frequency currently in use from the point of view of a given process, and that another one must be picked. Updating the frequency === When a process receives an update message from another process containing a frequency with a frequencyEpoch that is greater than its local frequencyEpoch, it updates its frequency to the received value, and sets the frequencyEpoch to the received value as well. In general when the currentEpoch or the frequency and frequencyEpoch are modified, the process writes this change to the disk or other permanente storage, so that when the process is restarted it will use the latest known information. Choosing a frequency === A process requires to choose a frequency in two different scenarios: 1) When the current frequency is detected to be noisy. 2) When the current frequency is set to -1 (at startup). In order to choose a frequency, a process requires to win an election. This is how it works: 1) The process increments its own currentEpoch, and writes it to permanent storage. It also selects a suitable new frequency. 2) The process sends to all the other processes a ELECT_ME packet to get the vote of the other processes. The ELECT_ME packet contains the currentEpoch of the sending process. 3) The other processes will reply with YOU_HAVE_MY_VOTE packet only if their currentEpoch is not greater compared to the one of the process requesting the vote (it can’t be smaller, since the reception of the ELECT_ME packet will cause an older currentEpoch to be updated to match the one of the incoming packet). The YOU_HAVE_MY_VOTE packet contains the currentEpoch of the voting process. 4) A given process only votes a single time for a given epoch, so it takes a variable called lastVoteEpoch, and will only provide its vote if the currentEpoch in the request for the vote is greater (>) than lastVoteEpoch. When the vote is provided, lastVoteEpoch is updated (and stored on disk *before* the vote is provided, so that a crash and restart will not cause this process to vote again for the same epoch). 5) YOU_HAVE_MY_VOTE messages with a currentEpoch smaller than the currentEpoch of the process that requested the vote are discarded. 6) The process requesting the vote will consider itself elected only if it receives the majority of the votes from the other processes (it will count itself as a voter and will vote for itself when the election starts). If the process is elected it will updated its frequencyEpoch and frequency variables. The frequencyEpoch that will be used is the epoch the process requested the vote with, that is, its currentEpoch at the time it sent the ELECT_ME packets, just after the increment. Given that a process requires to be elected to change the frequency, and that every process votes a single time in a given epoch, there must be only a single winner for a given epoch. If a given process is not able to get elected as the majority is not reached, it will try again after a random delay. This delay must be greater compared to the latency of the slow network that is used to exchange these messages (see the Raft paper for more information about this). Every process will consider the election aborted after some time that is smaller than the retry time. Propagating the new information === When a process wins an election, it updates its frequency value and frequencyEpoch to the new one, so by sending UPDATE messages, eventually all the other processes will receive the update as well and will switch to the new frequency. If some process is partitioned away, it will receive the update as soon as the partition heals. However it is a good idea to broadcast an UPDATE message to all the processes ASAP as soon as a process changes the frequency, so that all the other nodes will switch ASAP. Improving the algorithm with a simple change === The ELECT_ME packet can be improved by adding the value of the frequencyEpoch, so that other processes will refuse to vote if the process has a stale information. This may happen when, for example, the process was partitioned away for some time with an old frequency that does not work well as there is too much noise. So in a minority partition, it may try to get elected again and again. The majority probably already switched to a newer frequency. When the partition heals, the process may get elected and change the frequency to something else before having the chance to receive the updated frequency, causing a useless frequency switch. By adding the frequencyEpoch in the ELECT_ME packet and by making other processes checking that the info is updated before providing the vote, we avoid this problem. Other improvements === Another improvement may be to only provide the vote if the current frequency, from the point of view of the receiving node, is *really* noisy. This avoids that a single node having hardware issues in the high bandwidth radio will be able to continuously switch to new frequencies since every frequency will be detected as noisy. In this way a frequency switch can happen only if the majority of the nodes are detecting an issue with the current frequency. Similarly the node sending ELECT_ME messages to get elected may include the frequency it want to switch to, and the receiving node may vote only if the selected frequency passes some test and is considered a good pick, however this may affect the liveness of the algorithm: different nodes may believe that different frequencies are not a good pick so that majority can’t be reached. Conclusions === What I did in this blog post is just to take Raft, that is able to handle the complex problem of replicating a state machine across different processes in a consistent way, and simplify it in order to use it in a subset of problems where the state is a single value (or a set of values) that can just be updated in an idempotent way. The resulting algorithm is trivial to implement in a robust way, and is good enough for a non trivial set of real world problems. --------------------------------------- EDIT: I received some feedback via Twitter, and I think it is better to clarify what is the meaning of the above algorithm. The idea is to retain some safety under the specified scenario, where a replicated state machine is not needed, but still have a reasonable way to take a set of values synchronized across different processes. The goal is to provide, in exchange for the lack of functionality compared to Raft or Paxos, an algorithm that can be recalled by memory only without even reading a document. In this spirit my aim is to further simplify the above description of the algorithm without impacting the functionality. Moreover as somebody that is trying to understand more about distributed programming, I see that while it is very simple without even being aware of it, to get involved in something that is a distributed system, as a normal programmer (given that everything is networked today), descriptions of trivial algorithms may be a way to get somewhat exposed to basic distributed concepts. The next step is to learn a formal analysis tool and try to analyze the algorithm to provide a proof of safety / liveness.