Shiva Protocol

Abstract

This chapter explains the internal gossip and data protocol implemented to support shiva application scheduling.

Foreword

Two essential and fundamental properties will be referred to throughout this guide. First, the strict ordering of events (i.e. Kafka records) is the foundation of our implementation. By ordering we refer to the strict causal ordering in time as described by Leslie Lamport.

Second, we use Kafka both as an event queue for publishing and notifying events, and at the same time as a database to store the relevant events. Topic compaction is used so that several records with the same key eventually result in only the last such record being kept in the Kafka topic partition.

Quote

About event ordering, refer to Leslie Lamport's seminal paper: Time, Clocks, and the Ordering of Events in a Distributed System, Communications of the ACM 21, 7 (July 1978), 558-565. About this paper, Jim Gray reported he had heard two different opinions: that it is trivial and that it is brilliant. If you are not aware of it, make sure you read it.

Basics

Three topics are used. They are named COMMAND, CONTROL and ASSIGNEMENT.

  • the COMMAND topic is used to issue start or stop commands.
  • the CONTROL topic is used to keep track of the shiva cluster participants, through a very simple keep-alive gossip protocol.
  • the ASSIGNEMENT topic is used to hold the latest assignement map, a JSON document that simply lists which application is owned and executed by which shiva node.

In the following, and for the sake of clarity, we will consider a sample channel C, part of the tenant T, that contains a single shiva application A. That application is thus uniquely identified by the T/C/A (string) key.

To represent Kafka records we use the <key:value> notation. For example:

<cmd:T/C/A,start>
represents a record with (string) key cmd:T/C/A, and a start value (whatever the content of that start value may be). Whereas:

<assignement:{W1:T/C/A}>

represents a record with key assignement, and (some sort of) map {W1:T/C/A} as value. The exact encoding of the value is irrelevant.

What is essential is that record keys are unique. Because the topics at play are configured with a compaction strategy, publishing the <cmd:T/C/A,start> and <cmd:T/C/A,stop> commands several times (say you start and stop the application several times) will eventually lead to the topic storing only the last such record.
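
To make the notation concrete, here is a minimal Java sketch publishing such command records with a plain Kafka producer. The broker address and the string value encoding are illustrative assumptions; the actual shiva code may encode commands differently. Since both records share the cmd:T/C/A key, compaction eventually keeps only the last one.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CommandPublisherSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Both records share the same key cmd:T/C/A: once the COMMAND topic is
            // compacted, only the last published record (here the stop) is kept.
            producer.send(new ProducerRecord<>("COMMAND", "cmd:T/C/A", "start"));
            producer.send(new ProducerRecord<>("COMMAND", "cmd:T/C/A", "stop"));
            producer.flush();
        }
    }
}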

In the following we go through the basic start, stop and status operations. Next we explain how late leaders or workers deal with their bootstrap strategy. Finally we explain how workers are known to the leader.

Group and Consumer Identifiers

Here are the identifiers used by the leader, the workers and the administration clients (i.e. channelctl) to interact with the topics. For instance, all leaders subscribe to the COMMAND topic using the same "{shiva-cluster-name}-leader" group identifier. The details per role are given below.

Leader

The leader uses the following group and consumer identifiers:

  • COMMAND:
    • Group ID: "{shiva-cluster-name}-leader" where {shiva-cluster-name} is the logical name of the shiva cluster
    • Consumer ID: "leader-{shiva-instance-id}" where {shiva-instance-id} is a unique per-instance identifier, set via the id property of the leader configuration file.
  • ASSIGNEMENT: note that the active leader reads this topic only once, at startup. It then only writes to it periodically.
    • Group ID: "admin-r" group identifier
    • Consumer ID: "leader" consumer identifier
  • CONTROL:
    • Group ID: "{shiva-instance-id}" where {shiva-instance-id} is a unique identifier, set via the id property of the leader configuration file.
    • Consumer ID: a transient identifier generated by Kafka

Important

All leaders use the same group identifier on the COMMAND topic. That means only one leader (the active one) is assigned the unique partition of that topic.

Worker

The worker uses the following group and consumer identifiers:

  • COMMAND:
    • Group ID: "{shiva-cluster-name}-worker-{worker-id}" where {shiva-cluster-name} is the logical name of the shiva cluster and {worker-id} is a unique worker instance id.
    • Consumer ID: {worker-id}.
  • CONTROL:
    • Group ID: {worker-id}.
    • Consumer ID: {worker-id}.

Important

Each worker uses its own dedicated group on the COMMAND topic. That means each worker reads the COMMAND topic on its own.
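
The following Java sketch contrasts the two consumer configurations on the COMMAND topic. It only shows the standard Kafka client properties; the broker address and the way the actual shiva code builds its consumers are assumptions.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupIdSketch {

    // All leaders share one group: Kafka assigns the single COMMAND partition
    // to exactly one of them, the active leader.
    static Properties leaderCommandProps(String clusterName, String instanceId) {
        Properties p = baseProps();
        p.put(ConsumerConfig.GROUP_ID_CONFIG, clusterName + "-leader");
        p.put(ConsumerConfig.CLIENT_ID_CONFIG, "leader-" + instanceId);
        return p;
    }

    // Each worker has its own group: every worker independently reads the
    // whole COMMAND topic.
    static Properties workerCommandProps(String clusterName, String workerId) {
        Properties p = baseProps();
        p.put(ConsumerConfig.GROUP_ID_CONFIG, clusterName + "-worker-" + workerId);
        p.put(ConsumerConfig.CLIENT_ID_CONFIG, workerId);
        return p;
    }

    private static Properties baseProps() {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker address
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return p;
    }
}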

Topics configuration

Kafka topics used by Shiva must be properly configured to keep the messages necessary for the proper functioning of Shiva. These parameters are tricky; this is why the Punch deployer sets the right values when creating the Kafka topics.

The applied values are as follows:

  • COMMAND:
    • cleanup.policy : compact & delete
    • retention.ms : 15 days
    • retention.bytes : 100 MB
  • ASSIGNEMENT:
    • cleanup.policy : compact & delete
    • retention.ms : 3 years
    • retention.bytes : 20 MB
  • CONTROL:
    • cleanup.policy : compact & delete
    • retention.ms : 1 day
    • retention.bytes : 100 MB
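
Expressed with the Kafka AdminClient, the deployer's job amounts to something like the sketch below. The partition counts, replication factor and broker address are illustrative, and the actual deployer proceeds differently; only the per-topic configuration values are taken from the table above.

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class ShivaTopicsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker address

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic command = new NewTopic("COMMAND", 1, (short) 1)
                .configs(Map.of(
                    "cleanup.policy", "compact,delete",
                    "retention.ms", String.valueOf(15L * 24 * 3600 * 1000),   // 15 days
                    "retention.bytes", String.valueOf(100L * 1024 * 1024)));  // 100 MB
            NewTopic assignement = new NewTopic("ASSIGNEMENT", 1, (short) 1)
                .configs(Map.of(
                    "cleanup.policy", "compact,delete",
                    "retention.ms", String.valueOf(3L * 365 * 24 * 3600 * 1000), // ~3 years
                    "retention.bytes", String.valueOf(20L * 1024 * 1024)));      // 20 MB
            NewTopic control = new NewTopic("CONTROL", 1, (short) 1)
                .configs(Map.of(
                    "cleanup.policy", "compact,delete",
                    "retention.ms", String.valueOf(24L * 3600 * 1000),        // 1 day
                    "retention.bytes", String.valueOf(100L * 1024 * 1024)));  // 100 MB

            admin.createTopics(List.of(command, assignement, control)).all().get();
        }
    }
}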

Application Commands

Application Start

Say you start completely empty, i.e. a new shiva cluster is up and running, but no shiva application has ever been started. Executing a start command as follows

channelctl --start channel C
results in the publishing of a command record on the COMMAND topic.
COMMAND     : <cmd:T/C/A,{start,A.zip}>
CONTROL     :
ASSIGNEMENT :

All worker nodes receive this command, and save that zip in their data folder. However they do not start anything. The active leader also receives it, and assigns it to some worker, say worker W1. The leader then publishes the new assignement document to the COMMAND and ASSIGNEMENT topics. The next state is thus:

COMMAND     : <cmd:T/C/A,{start,A.zip}> <assignement:{W1:T/C/A}>
CONTROL     :
ASSIGNEMENT : <assignement:{W1:T/C/A}>
Upon receiving the assignement record from the COMMAND topic, worker W1 actually starts the A application.

Important

Because there is a strict causal ordering on the COMMAND topic, workers have necessarily stored the application files before they can possibly receive that application assigned to them.
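
The worker side of this protocol can be pictured as a single loop over the COMMAND topic, as in the rough Java sketch below. The cluster name, worker id, record parsing and the saveArchive/stopIfRunning/applyAssignement helpers are hypothetical placeholders, not the actual shiva code; the point is that commands and assignements are consumed in strict order, so the archive is always on disk before the corresponding assignement is processed.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class WorkerLoopSketch {

    private final String workerId = "W1"; // hypothetical worker id

    public static void main(String[] args) {
        new WorkerLoopSketch().run();
    }

    void run() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "shiva-main-worker-" + workerId); // per-worker group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("COMMAND"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    String key = record.key();
                    String value = record.value();
                    if (key == null || value == null) {
                        continue; // ignore tombstones and malformed records in this sketch
                    }
                    if (key.startsWith("cmd:")) {
                        String appKey = key.substring("cmd:".length()); // e.g. "T/C/A"
                        if (value.contains("start")) {
                            saveArchive(appKey, value); // store A.zip locally, do NOT start anything yet
                        } else {
                            stopIfRunning(appKey);      // stop the application if this worker runs it
                        }
                    } else if (key.equals("assignement")) {
                        // Start (or stop) applications according to the entry for this worker.
                        // The archives are already on disk thanks to the strict COMMAND ordering.
                        applyAssignement(value);
                    }
                }
            }
        }
    }

    // Hypothetical placeholders: the real shiva worker obviously does much more.
    void saveArchive(String appKey, String value) { System.out.println("saving archive for " + appKey); }
    void stopIfRunning(String appKey) { System.out.println("stopping " + appKey + " if running"); }
    void applyAssignement(String assignementJson) { System.out.println("applying " + assignementJson); }
}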

Application Stop

Stopping an application results in publishing a stop record as follows:

COMMAND     : <cmd:T/C/A,{start,A.zip}> <assignement:{W1:T/C/A}> <cmd:T/C/A,{stop}>
CONTROL     :
ASSIGNEMENT : <assignement:{W1:T/C/A}>
The active leader and all workers receive this record. The leader simply removes the application from its assignements, while workers check if they currently run this application and, if so, stop it. The leader republishes the updated assignement, yielding the following situation:
COMMAND     : <cmd:T/C/A,{start,A.zip}> <assignement:{W1:T/C/A}> <cmd:T/C/A,{stop}> <assignement:{}>
CONTROL     :
ASSIGNEMENT : <assignement:{W1:T/C/A}> <assignement:{}>

After topics are eventually compacted:

COMMAND     : <cmd:T/C/A,{stop}> <assignement:{}>
CONTROL     :
ASSIGNEMENT : <assignement:{}>

Important

This design is both extremely simple and storage efficient. Only the records of started applications are kept in the Kafka COMMAND topic. That topic requires a limited amount of space, easily computed as a function of the maximum number of running applications together with the maximal size of an application archive.
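
As a back-of-the-envelope illustration (the numbers below are purely hypothetical), the bound can be computed as follows:

public class CommandTopicSizing {
    public static void main(String[] args) {
        long maxRunningApplications = 50;            // hypothetical cluster-wide maximum
        long maxArchiveSizeBytes = 2L * 1024 * 1024; // hypothetical maximal application zip size (2 MB)
        // After compaction, at most one start record (embedding the archive) remains per
        // running application, hence the rough upper bound on the COMMAND topic size:
        long upperBoundBytes = maxRunningApplications * maxArchiveSizeBytes;
        System.out.println("COMMAND topic upper bound: " + upperBoundBytes + " bytes (~100 MB)");
    }
}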

Application Status

When executing a status command for a channel (or for all channels), what is reported is directly taken from the last document from the ASSIGNEMENT topic. As simple as that.

The assignement document contains everything needed to perform a complete check:

  • the known workers
  • the known applications
  • the assigned applications
  • the map of assigned applications
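
Reading that last document amounts to seeking to the end of the single ASSIGNEMENT partition and fetching the record just before it. Here is a minimal Java sketch, assuming string values, a single partition and an illustrative broker address and group id; the actual channelctl code may proceed differently.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StatusSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "status-reader");           // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("ASSIGNEMENT", 0); // single partition
            consumer.assign(List.of(tp));
            consumer.seekToEnd(List.of(tp));
            long end = consumer.position(tp);
            if (end == 0) {
                System.out.println("no assignement published yet");
                return;
            }
            consumer.seek(tp, end - 1); // fetch only the very last record
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("latest assignement: " + record.value());
            }
        }
    }
}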

Worker Discovery

Leaders and workers are distributed. They are not connected to each other, nor do they rely on a synchronized distributed service such as etcd or zookeeper. Instead they only rely on a simple gossip protocol implemented on top of the CONTROL topic.

At startup, and then at a configurable interval (typically 30 seconds), the leader publishes a ping message to the CONTROL topic.

CONTROL: <PING>
Each worker replies on the same topic:

CONTROL: <PING> <PONG,W1> <PONG,W2>
At each round the leader removes any worker that did not acknowledge the ping and adds newly discovered workers. If a change occurs, the assignement is recomputed and republished on the COMMAND and ASSIGNEMENT topics.
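
A single gossip round, seen from the leader, can be sketched as follows in Java. The record keys, the collection window and the group identifier are illustrative assumptions, and the sketch deliberately ignores subscription and rebalance timing; it only shows the publish-ping, collect-pongs, diff-the-worker-set logic.

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class GossipRoundSketch {

    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "leader-instance-1"); // the leader instance id (illustrative)
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(List.of("CONTROL"));
            consumer.poll(Duration.ofSeconds(1)); // join the group before pinging (simplification)
            Set<String> alive = gossipRound(producer, consumer);
            System.out.println("live workers: " + alive);
            // If this set differs from the previous round, the leader recomputes the
            // assignement and republishes it on the COMMAND and ASSIGNEMENT topics.
        }
    }

    // One gossip round as seen from the leader: publish a ping on CONTROL, then
    // collect the worker pongs for a short while and return the set of live workers.
    static Set<String> gossipRound(KafkaProducer<String, String> producer,
                                   KafkaConsumer<String, String> consumer) {
        producer.send(new ProducerRecord<>("CONTROL", "ping", "PING"));
        producer.flush();

        Set<String> alive = new HashSet<>();
        long deadline = System.currentTimeMillis() + 10_000; // hypothetical collection window
        while (System.currentTimeMillis() < deadline) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                if (record.value() != null && record.value().startsWith("PONG,")) {
                    alive.add(record.value().substring("PONG,".length())); // e.g. "W1"
                }
            }
        }
        return alive;
    }
}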

Late Leader Start

Upon starting, the leader does not start from an empty state. That would entail a complete shiva cluster restart, i.e. restarting all applications on all workers, which in turn could generate a storm of high resource consumption.

For that reason the leader bootstraps itself from the latest assignement map saved in the ASSIGNEMENT topic. At worst that assignement can refer to dead workers and miss new workers, but that will be taken care of at the next gossip round. In most situations a leader restart entails no shiva application restart.