Shiva Protocol

Abstract

This chapter explains the internal gossip and data protocol implemented to support Shiva application scheduling.

Foreword

Two essential and fundamental properties will be referred to in the following guide. First, the strict ordering of events (i.e. Kafka records) is the foundation of our implementation. By ordering we refer to the strict causal ordering in time as described by Leslie Lamport.

Second, we use Kafka both as an event queue, for publishing and notifying events, and at the same time as a database, to store the relevant events. Topic compaction is used so that several records with the same key eventually result in only the single last such record being kept in the Kafka topics.
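To make the compaction point concrete, here is a minimal sketch of creating such topics with the Java Kafka AdminClient, assuming a local broker. The topic names anticipate the ones introduced in the Basics section below, and the single partition per topic is an assumption that matches the strict ordering requirement.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateShivaTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // cleanup.policy=compact: Kafka eventually keeps only the latest
            // record for each key, the property the protocol relies on.
            Map<String, String> compacted =
                    Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                           TopicConfig.CLEANUP_POLICY_COMPACT);
            // One partition per topic preserves a strict total order of records.
            List<NewTopic> topics = List.of(
                    new NewTopic("COMMAND", 1, (short) 1).configs(compacted),
                    new NewTopic("CONTROL", 1, (short) 1).configs(compacted),
                    new NewTopic("STATUS", 1, (short) 1).configs(compacted));
            admin.createTopics(topics).all().get();
        }
    }
}
```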

Quote

About event ordering, refer to Leslie Lamport's seminal paper "Time, Clocks, and the Ordering of Events in a Distributed System", Communications of the ACM 21, 7 (July 1978), 558-565. About this paper, Jim Gray reported having heard two different opinions: that it is trivial, and that it is brilliant. If you are not aware of it, make sure you read it.

Basics

Three topics are used, named COMMAND, CONTROL and STATUS. A Shiva command either starts or stops an application.

In the following, and for the sake of clarity, we will consider a sample channel C, part of a tenant T, containing a single Shiva application A. This application is thus uniquely identified by T/C/A.

To represent Kafka records we use the <key:value> notation. For example:

<CMD:T/C/A,start>
represents a record with key CMD:T/C/A, and a start value (whatever the content of that start value may be).
<ASSIGNMENT:{W1:T/C/A}>
represents a record with key ASSIGNMENT, and (some sort of) map {W1:T/C/A} as value. The exact encoding of the value is irrelevant.

What is essential is that record keys are unique. Because the topics at play are configured with a compaction strategy, publishing the <CMD:T/C/A,start> command several times (say you start and stop the application several times) eventually leads to the topic storing only the last such record.
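As an illustration, here is a minimal sketch of publishing such keyed records with the Java Kafka producer, assuming String-serialized keys and values (the exact value encoding is, as said above, irrelevant).

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PublishCommand {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Start then stop under the same key: after compaction only the
            // last record <CMD:T/C/A,stop> survives in the topic.
            producer.send(new ProducerRecord<>("COMMAND", "CMD:T/C/A", "start"));
            producer.send(new ProducerRecord<>("COMMAND", "CMD:T/C/A", "stop"));
            producer.flush();
        }
    }
}
```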

In the following we go through the basic start, stop and status operations. Next we explain how late leaders or workers bootstrap themselves. Last, we explain how workers become known to the leader.

Application Commands

Application Start

Say you start completely empty, i.e. a new Shiva cluster is up and running, but no Shiva application has ever been started. Executing a start command as follows

channelctl --start channel C
results in the publishing of a command record on the COMMAND topic.
COMMAND: <CMD:T/C/A,{start,A.zip}>
CONTROL:
STATUS :
All workers receive this command and save the zip archive in their data folder; however, they do not start anything. The active leader also receives it, and assigns the application to some worker, say worker W1. The leader publishes the new assignment document to the COMMAND and STATUS topics. The next state is thus:
COMMAND: <CMD:T/C/A,{start,A.zip}> <ASSIGNMENT:{W1:T/C/A}>
CONTROL:
STATUS : <ASSIGNMENT:{W1:T/C/A}>
Upon receiving the assignment record from the COMMAND topic, worker W1 effectively starts the application.
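The worker-side logic can be sketched as follows. This is only an illustration of the flow just described, not the actual implementation: the worker id W1, the helper methods and the record encodings are hypothetical. Note the per-worker consumer group, so that every worker sees every record.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class WorkerLoop {
    static final String MY_ID = "W1"; // hypothetical worker identifier

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // One group per worker: each worker receives all COMMAND records.
        props.put("group.id", "shiva-worker-" + MY_ID);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("COMMAND"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    if (r.key().startsWith("CMD:")) {
                        // Every worker stores the archive but starts nothing yet.
                        saveApplicationZip(r.key(), r.value());
                    } else if (r.key().equals("ASSIGNMENT")) {
                        // Strict ordering guarantees the zip was stored before
                        // any assignment referring to it arrives.
                        startOrStopToMatch(r.value());
                    }
                }
            }
        }
    }

    static void saveApplicationZip(String key, String value) { /* hypothetical */ }
    static void startOrStopToMatch(String assignment) { /* hypothetical */ }
}
```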

Important

Because there is a strict causal ordering on the COMMAND topic, workers have necessarily stored the application files before they can receive an assignment for that application. Note that Kafka only guarantees ordering within a single partition; this property therefore assumes the COMMAND topic is consumed with such per-partition ordering preserved.

Application Stop

Stopping an application results in publishing a stop record as follows:

COMMAND: <CMD:T/C/A,{start,A.zip}> <ASSIGNMENT:{W1:T/C/A}> <CMD:T/C/A,{stop}>
CONTROL:
STATUS : <ASSIGNMENT:{W1:T/C/A}>
Both the active leader and the workers receive this record. The leader simply removes the application from its assignments, while each worker checks whether it currently runs this application and, if so, stops it. The leader republishes the updated assignment, yielding the following situation:
COMMAND: <CMD:T/C/A,{start,A.zip}> <ASSIGNMENT:{W1:T/C/A}> <CMD:T/C/A,{stop}> <ASSIGNMENT:{}>
CONTROL:
STATUS : <ASSIGNMENT:{W1:T/C/A}> <ASSIGNMENT:{}>

After compaction has eventually run, the topics reduce to:

COMMAND: <CMD:T/C/A,{stop}> <ASSIGNMENT:{}>
CONTROL:
STATUS : <ASSIGNMENT:{}>
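On the leader side, the stop handling can be sketched as follows, assuming an in-memory assignment map and a hypothetical publishAssignment helper that writes the serialized map to both the COMMAND and STATUS topics.

```java
import java.util.HashMap;
import java.util.Map;

public class LeaderStopHandler {
    // worker id -> application id, e.g. "W1" -> "T/C/A"
    private final Map<String, String> assignment = new HashMap<>();

    void onStopCommand(String applicationId) {
        // Remove the application wherever it was assigned ...
        assignment.values().removeIf(app -> app.equals(applicationId));
        // ... and republish <ASSIGNMENT:{...}> so that workers and the
        // STATUS topic converge on the new state.
        publishAssignment(assignment);
    }

    void publishAssignment(Map<String, String> assignment) { /* hypothetical */ }
}
```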

Important

This design is both extremely simple and storage efficient. Only the records of started applications are kept in the Kafka COMMAND topic. That topic requires a bounded amount of space, easily computed as a function of the maximum number of running applications together with the maximal size of an application archive.

Application Status

When executing a status command for a channel (or for all channels), what is reported is taken directly from the last document on the STATUS topic. As simple as that.

The assignment document contains everything needed to perform a complete check (a read sketch follows the list):

  • known workers
  • known applications
  • unassigned applications
  • assigned applications
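Here is a minimal sketch of that read, assuming String-serialized records and a single-partition STATUS topic: seek close to the end of the topic and print the last document.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class StatusCommand {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("STATUS", 0);
            consumer.assign(List.of(tp));
            long end = consumer.endOffsets(List.of(tp)).get(tp);
            // Rewind one record before the end: that record is the latest
            // assignment document, i.e. the current cluster status.
            consumer.seek(tp, Math.max(0, end - 1));
            for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(r.key() + " -> " + r.value());
            }
        }
    }
}
```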

Worker Discovery

Leaders and workers are distributed. They are not connected to one another, nor do they rely on a synchronized distributed service such as etcd or ZooKeeper. Instead they rely on a simple gossip protocol implemented on top of the CONTROL topic.

At startup, and then at a configurable interval (typically 30 seconds), the leader publishes a ping message to the CONTROL topic.

CONTROL: <PING>
Each worker replies on the same topic:

CONTROL: <PING> <PONG,W1> <PONG,W2>
At each round the leader removes any worker that did not reply and adds newly discovered workers. If a change occurs, the assignment is recomputed and republished on the COMMAND and STATUS topics.
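One gossip round can be sketched as follows. The helpers are hypothetical and the pong collection window is left abstract; only the diff-and-republish logic matches the description above.

```java
import java.util.HashSet;
import java.util.Set;

public class GossipRound {
    private Set<String> knownWorkers = new HashSet<>();

    /** Invoked at startup and then at each configurable interval. */
    void round() {
        publish("CONTROL", "PING", "");     // leader ping
        Set<String> alive = collectPongs(); // workers reply <PONG,Wi>
        if (!alive.equals(knownWorkers)) {
            knownWorkers = alive;
            // Dead workers lose their applications, new workers become
            // eligible: recompute and republish the assignment document.
            recomputeAndPublishAssignment(alive);
        }
    }

    void publish(String topic, String key, String value) { /* hypothetical */ }
    Set<String> collectPongs() { /* hypothetical */ return new HashSet<>(); }
    void recomputeAndPublishAssignment(Set<String> alive) { /* hypothetical */ }
}
```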

Late Leader Start

Upon starting, the leader does not start from an empty state. That would entail a complete Shiva cluster restart, i.e. restarting all applications on all workers, which in turn could generate a storm of resource consumption.

For that reason the leader bootstraps itself from the latest assignment map saved in the STATUS topic. At worst that assignment can refer to dead workers and miss new workers, but that will be taken care of at the next gossip round.
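A bootstrap sketch, assuming a hypothetical readLatestAssignment helper that performs the same STATUS read shown in the Application Status section:

```java
import java.util.HashMap;
import java.util.Map;

public class LeaderBootstrap {
    private final Map<String, String> assignment = new HashMap<>();

    void start() {
        // Resume from the last published view rather than an empty state,
        // avoiding a full restart of every application on every worker.
        Map<String, String> last = readLatestAssignment();
        if (last != null) {
            assignment.putAll(last);
        }
        // Stale entries (dead workers, missing new workers) are corrected
        // at the next gossip round.
    }

    Map<String, String> readLatestAssignment() { /* hypothetical */ return null; }
}
```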