Shiva Protocol¶
Abstract
This chapter explains the internal gossip and data protocol implemented to support shiva application scheduling.
Foreword¶
Two essential and fundamental properties will be referred to in the following guide. First, the strict ordering of events (i.e. Kafka records) is the foundation of our implementation. By ordering we refer to the strict causal ordering in time as described by Leslie Lamport.
Second, we use Kafka both as an event queue for publishing and notifying events, and at the same time as a database to store the relevant events. Topic compaction is used so that several records with the same key eventually result in the single last such record being kept in the Kafka topic partition.
Quote
About event ordering, refer to "Time, Clocks, and the Ordering of Events in a Distributed System", Communications of the ACM 21, 7 (July 1978), 558-565,
Leslie Lamport's seminal paper. About this paper, Jim Gray reported he had heard two opinions: that it is trivial and that it is brilliant. If you are not aware of it, make sure you read it.
Basics¶
Three topics are used. They will be named COMMAND, CONTROL and ASSIGNEMENT.

- the COMMAND topic is used to issue start or stop commands.
- the CONTROL topic is used to keep track of the shiva cluster participants, through a very simple keep-alive gossip protocol.
- the ASSIGNEMENT topic is used to hold the latest assignment map, a json document that simply lists which application is owned and executed by which shiva node.
In the following, and for the sake of clarity, we will consider a sample channel C, part of the T tenant, that contains a single shiva application A. That application is thus uniquely identified by the T/C/A (string) key.
To represent kafka records we use the <key:value> notation. For example:

<cmd:T/C/A,start>

represents a record with key cmd:T/C/A, and a start value (whatever the content of that start value is). Whereas:

<assignement:{W1:T/C/A}>

represents a record with key assignement, and (some sort of) map {W1:T/C/A} as value.
The exact encoding of the value is irrelevant.
What is essential is that record keys are unique. Because the topics at play are configured using a compaction strategy,
publishing several times the <cmd:T/C/A,start> and <cmd:T/C/A,stop>
commands (say you start and stop it several times) will eventually lead to the topic
only storing the last such record.
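To make the notation concrete, here is a minimal, hypothetical sketch (not the actual Shiva code) of how such keyed records could be published with the standard Kafka Java producer. The literal topic name, the JSON value encoding and the bootstrap address are assumptions used for illustration only.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CommandPublisherSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // <cmd:T/C/A,start> : the record key is the unique application key,
            // the value carries the command (the encoding is irrelevant, JSON is assumed here).
            producer.send(new ProducerRecord<>("COMMAND", "cmd:T/C/A", "{\"action\":\"start\"}"));
            // Publishing a stop later with the SAME key means compaction will
            // eventually keep only this last record for cmd:T/C/A.
            producer.send(new ProducerRecord<>("COMMAND", "cmd:T/C/A", "{\"action\":\"stop\"}"));
        }
    }
}
```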
In the following we go through the basic start, stop and status operations. Next we will explain how late leaders or workers deal with their bootstrap strategy. Last we will explain how workers are known to the leader.
Group and Consumer Identifiers¶
Here are the identifiers used by the leader, worker and administration clients (i.e. channelctl) to interact with the topics.
- Leader: all leaders subscribe to the COMMAND topic using the same group identifier "{shiva-cluster-name}-leader".
Leader¶
The leader uses the following group and consumer identifiers:
- COMMAND:
    - Group ID: "{shiva-cluster-name}-leader", where {shiva-cluster-name} is the logical name of the shiva cluster.
    - Consumer ID: "leader-{shiva-instance-id}", where {shiva-instance-id} is a unique per-instance identifier, set in the leader configuration file id property.
- ASSIGNEMENT: note that the active leader only reads that topic once at startup. It then only writes to it periodically.
    - Group ID: the "admin-r" group identifier.
    - Consumer ID: the "leader" consumer identifier.
- CONTROL:
    - Group ID: "{shiva-instance-id}", where {shiva-instance-id} is a unique identifier, set in the leader configuration file id property.
    - Consumer ID: a transient identifier generated by kafka.
Important
All leaders use the same group identifier on the COMMAND topic. That means only one leader is assigned the unique partition of that topic.
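As an illustration of the leader identifiers above, here is a hedged sketch (not the actual Shiva implementation) of how such a consumer could be configured with the standard Kafka Java client, mapping the Group ID to group.id and the Consumer ID to client.id. The literal COMMAND topic name and the bootstrap address are assumptions.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LeaderCommandConsumerSketch {
    /** Build the leader's COMMAND consumer. Names and addresses are illustrative. */
    public static KafkaConsumer<String, String> create(String clusterName, String instanceId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        // All leaders share this group: Kafka hands the single COMMAND partition
        // to only one of them, which thereby becomes the active leader.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, clusterName + "-leader");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "leader-" + instanceId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("COMMAND"));
        return consumer;
    }
}
```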
Worker¶
The worker uses the following group and consumer identifiers:
- COMMAND:
    - Group ID: "{shiva-cluster-name}-worker-{worker-id}", where {shiva-cluster-name} is the logical name of the shiva cluster and {worker-id} is a unique worker instance id.
    - Consumer ID: {worker-id}.
- CONTROL:
    - Group ID: {worker-id}.
    - Consumer ID: {worker-id}.
Important
Each worker uses its own dedicated group on the COMMAND topic. That means each worker reads the COMMAND topic on its own.
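The worker side is analogous to the leader sketch above, except that each worker uses its own dedicated group and therefore consumes the whole COMMAND topic independently. Again, this is only a hedged sketch with an assumed topic name and bootstrap address, not the actual Shiva code.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class WorkerCommandConsumerSketch {
    /** Build a worker's COMMAND consumer. Names and addresses are illustrative. */
    public static KafkaConsumer<String, String> create(String clusterName, String workerId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        // Each worker has its own dedicated group, hence independently reads
        // every record of the COMMAND topic.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, clusterName + "-worker-" + workerId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, workerId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("COMMAND"));
        return consumer;
    }
}
```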
Topics configuration¶
Kafka topics used by Shiva must be properly configured to keep the messages necessary for the proper functioning of Shiva. These parameters are tricky, which is why the Punch deployer sets the right values when creating the Kafka topics. The values applied are as follows:
- COMMAND:
    - retention.policy : compact & delete
    - retention.ms : 15 days
    - retention.bytes : 100 MB
- ASSIGNEMENT:
    - retention.policy : compact & delete
    - retention.ms : 3 years
    - retention.bytes : 20 MB
- CONTROL:
    - retention.policy : compact & delete
    - retention.ms : 1 day
    - retention.bytes : 100 MB
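For illustration, the following hedged sketch shows how topics with these settings could be created using the Kafka AdminClient; the "compact & delete" policy corresponds to the Kafka cleanup.policy property. The partition and replication counts, the bootstrap address and the literal topic names are assumptions, not necessarily the values chosen by the Punch deployer.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class ShivaTopicsSetupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            // "compact & delete" maps to cleanup.policy=compact,delete in Kafka.
            NewTopic command = new NewTopic("COMMAND", 1, (short) 1).configs(Map.of(
                TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete",
                TopicConfig.RETENTION_MS_CONFIG, String.valueOf(15L * 24 * 3600 * 1000),   // 15 days
                TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(100L * 1024 * 1024)));  // 100 MB
            NewTopic assignement = new NewTopic("ASSIGNEMENT", 1, (short) 1).configs(Map.of(
                TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete",
                TopicConfig.RETENTION_MS_CONFIG, String.valueOf(3L * 365 * 24 * 3600 * 1000), // ~3 years
                TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(20L * 1024 * 1024)));      // 20 MB
            NewTopic control = new NewTopic("CONTROL", 1, (short) 1).configs(Map.of(
                TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete",
                TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24L * 3600 * 1000),       // 1 day
                TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(100L * 1024 * 1024))); // 100 MB
            admin.createTopics(List.of(command, assignement, control)).all().get();
        }
    }
}
```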
Application Commands¶
Application Start¶
Say you start completely empty, i.e. a new shiva cluster is up and running, but no shiva application has ever been started. Executing a start command as follows:

channelctl --start channel C

results in publishing a start record to the COMMAND topic. The topics then contain:

COMMAND : <cmd:T/C/A,{start,A.zip}>
CONTROL :
ASSIGNEMENT :
All worker nodes receive this command, and save that zip in their data folder. However they do not start anything.
The active leader also receives it, and assigns it to some worker, say worker W1.
The leader then publishes the new assignment document to the COMMAND and ASSIGNEMENT topics.
The next state is thus :
COMMAND : <cmd:T/C/A,{start,A.zip}> <assignement:{W1:T/C/A}>
CONTROL :
ASSIGNEMENT : <assignement:{W1:T/C/A}>
Upon receiving that assignment record from the COMMAND topic, worker W1 effectively starts the A application.
Important
Because there is a strict causal ordering on the COMMAND topic, workers have necessarily stored the
application files before they can possibly receive that application assigned to them.
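To illustrate the worker side of this sequence, here is a purely hypothetical sketch of how a worker could react to records read from the COMMAND topic: save application archives upon cmd records, and start or stop applications upon assignment records. All class, method and key names are illustrative placeholders, not the actual Shiva code.

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical worker-side handling of COMMAND records (names are illustrative).
public class WorkerCommandHandlerSketch {
    private final String myWorkerId;

    public WorkerCommandHandlerSketch(String myWorkerId) {
        this.myWorkerId = myWorkerId;
    }

    void onRecord(ConsumerRecord<String, String> record) {
        if (record.key().startsWith("cmd:")) {
            // A start command: save the application archive locally, do not run it yet.
            // Strict ordering guarantees this happens before any assignment referencing it.
            saveArchive(record.key().substring("cmd:".length()), record.value());
        } else if (record.key().equals("assignement")) {
            // The leader's assignment map: start the application(s) assigned to this
            // worker, stop those no longer assigned to it.
            Map<String, String> assignment = parseAssignment(record.value());
            reconcile(assignment.getOrDefault(myWorkerId, ""));
        }
    }

    // The helpers below are placeholders, not part of the actual Shiva code base.
    void saveArchive(String appKey, String value) { /* persist A.zip in the data folder */ }
    Map<String, String> parseAssignment(String json) { return Map.of(); }
    void reconcile(String assignedApp) { /* start/stop local applications accordingly */ }
}
```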
Application Stop¶
Stopping an application results in publishing a stop record as follows:
COMMAND : <cmd:T/C/A,{start,A.zip}> <assignement:{W1:T/C/A}> <cmd:T/C/A,{stop}>
CONTROL :
ASSIGNEMENT : <assignement:{W1:T/C/A}>
The active leader reacts by publishing an updated, now empty, assignment document:

COMMAND : <cmd:T/C/A,{start,A.zip}> <assignement:{W1:T/C/A}> <cmd:T/C/A,{stop}> <assignement:{}>
CONTROL :
ASSIGNEMENT : <assignement:{W1:T/C/A}> <assignement:{}>
After topics are eventually compacted:
COMMAND: <cmd:T/C/A,{stop}> <assignement:{}>
CONTROL:
ASSIGNEMENT : <assignement:{}>
Important
This design is both extremely simple and storage efficient. Only the records of started applications are kept in the kafka COMMAND topic. That topic requires a limited amount of space, easily computed as a function of the maximum number of running applications together with the maximal size of an application archive.
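As a rough, purely hypothetical illustration of that sizing (the figures below are assumptions, not product limits):

space(COMMAND) ≈ maximum number of running applications × maximal archive size
e.g. 50 applications × 2 MB archives ≈ 100 MB, the order of magnitude of the retention.bytes value configured above.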
Application Status¶
When executing a status command for a channel (or for all channels), what is reported is directly taken from the last document from the ASSIGNEMENT topic. As simple as that.
The assignment document contains everything needed to perform a complete check:

- the known workers
- the known applications
- the assigned applications
- the map of assigned applications
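The exact encoding of that document is an implementation detail (as stated earlier, it is irrelevant to the protocol). As a purely hypothetical illustration using the {W1:T/C/A} notation introduced above, such a record could look like:

<assignement:{workers:[W1,W2], applications:[T/C/A], assignements:{W1:[T/C/A], W2:[]}}>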
Worker Discovery¶
Leaders and workers are distributed. They are not connected together, nor do they rely on a synchronized distributed service
such as etcd or zookeeper. Instead they only rely on a simple gossip protocol implemented on top of
the CONTROL topic.
At startup, and then at every configurable interval (typically 30 seconds), the leader publishes a ping message to the control topic.
CONTROL: <PING>

Each live worker replies with a pong message carrying its worker identifier. This is how the leader keeps track of the alive workers:

CONTROL: <PING> <PONG,W1> <PONG,W2>
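Here is a hedged sketch of both sides of that keep-alive exchange using the Kafka Java producer. The record keys and value formats are assumptions (the protocol above only specifies PING and PONG messages), as are the literal topic name and the bootstrap address.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class GossipSketch {
    // Leader side: publish a ping on the CONTROL topic at every interval.
    static void ping(KafkaProducer<String, String> producer) {
        producer.send(new ProducerRecord<>("CONTROL", "ping", "PING"));
    }

    // Worker side: upon reading a ping, reply with a pong carrying the worker id,
    // which is how the leader learns which workers are alive.
    static void pong(KafkaProducer<String, String> producer, String workerId) {
        producer.send(new ProducerRecord<>("CONTROL", "pong-" + workerId, "PONG," + workerId));
    }

    static KafkaProducer<String, String> producer(String bootstrap) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
```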
Late Leader start¶
Upon starting, the leader does not start from an empty state. That would entail a complete shiva cluster restart, i.e. restarting all applications on all workers, which in turn can generate a storm of high resource consumption.
For that reason the leader bootstraps itself from the latest assignment map saved in the ASSIGNEMENT topic.
At worst that assignment can refer to dead workers and miss new workers, but that will be taken care of at the next gossip round. In most situations a leader restart entails no shiva application restart.
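As an illustration of that bootstrap step, the following hedged sketch reads the last record of the ASSIGNEMENT topic with a plain Kafka Java consumer. It uses manual partition assignment for simplicity, whereas the actual leader uses the "admin-r" group listed earlier; the literal topic name, partition number and bootstrap address are assumptions.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LeaderBootstrapSketch {
    /** Fetch the last assignment record published to the ASSIGNEMENT topic, or null if empty. */
    static String readLatestAssignment(String bootstrap) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("ASSIGNEMENT", 0);
            consumer.assign(List.of(tp));
            consumer.seekToEnd(List.of(tp));
            long end = consumer.position(tp);
            if (end == 0) {
                return null; // brand new cluster: no assignment has ever been published
            }
            consumer.seek(tp, end - 1); // rewind to the last record only
            String latest = null;
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                latest = record.value();
            }
            return latest;
        }
    }
}
```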