PlanCtl¶
Abstract
The planctl command line tool is used internally to submit plans for execution. It is also useful in development mode, or to test plans on production platforms.
Documentation¶
Like all punch client tools, documentation is available online, either in interactive mode or by simply typing:
planctl -h
In addition, you can also use the manual page:
man planctl
An online planctl manual page is also available.
Test and Development¶
To test your plan before integrating it into your production platform, you can use the planctl command directly.
planctl start --plan myplan.hjson --template mypunchline.template
This command is available on the punch standalone platform and in the production operator environment. It will run your plan in the foreground, possibly submitting its punchline to a spark cluster.
Refer to the planctl manual page for more information.
Production¶
To schedule your plan on a production platform, you must include it as part of a channel. The shiva runtime provides a ready-to-use planctl command.
The arguments provided to Shiva are the same as the command line ones. An example configuration is provided below.
Launching Modes¶
When running in production, you will typically need to take care of your plan's resiliency. Should a server or vm fail, your plan will restart on another one. You will want to express its restart strategy so that it resumes scheduling your punchlines from the latest correctly processed time period, i.e. you do not want a missing data range in your plan outcome around the moment the plan was interrupted and restarted.
A number of launching modes are available.
- WITH_LAST_COMMITTED_FROM_START_TO_END_DATE
activated when a start date and a stop date are defined in plan_settings
- WITH_LAST_COMMITTED_FROM_START_DATE
activated when only a start date is defined in the plan_settings. In this mode, the date of the last successful execution takes priority if it exists.
- WITH_LAST_COMMITTED_TO_END_DATE
activated when only an end date is defined in the plan_settings. In this mode, the date of the last successful execution is used.
- WITH_LAST_COMMITTED
activated when neither a start date nor an end date is defined in the plan_settings. In this mode, the recovery date of the last successful execution is used.
- WITHOUT_LAST_COMMITTED
activated when a plan is launched without the --last-committed argument. Nothing particular is considered.
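As an illustration, here is a minimal sketch of a plan settings block (the plan_settings referred to above) that would select the WITH_LAST_COMMITTED_FROM_START_TO_END_DATE mode when the plan is launched with --last-committed. The start and stop key names are assumptions; refer to the planctl manual page for the exact schema of your release.
settings:
  cron: '*/1 * * * *'
  # assumed key names: defining both dates selects
  # WITH_LAST_COMMITTED_FROM_START_TO_END_DATE
  start: 2019-11-01T00:00+01:00
  stop: 2019-11-30T00:00+01:00
  persistence:
    - type: elasticsearch
      index_name: platform-plan-cursor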
Plan event logging¶
Whenever you submit a plan to execution, various important events are reported to the monitoring plane.
At the level of the plan the following events are reported:
- PLAN_FIRST_EXECUTION
When neither a recovery index nor a recovery document with the required plan information is found
- PLAN_RESUMED_FROM_CHECKPOINT
When a recovery document is found containing the metadata needed for the plan to resume
- PLAN_IGNORED_LAST_COMMITTED_START_DATE
When the plan is launched in WITH_LAST_COMMITTED_FROM_START_DATE mode
At the level of the plan application (i.e. punchline) the following events are reported:
- APPLICATION_STARTED
When a job executed by a plan begins
- APPLICATION_ENDED
When a job executed by a plan ends
- APPLICATION_FAILED
When a job executed by a plan fails
- ERROR
When something unexpected happened
Here is an example logged event from a pyspark punchline launched by a plan:
{
  "content": {
    "unit": "seconds",
    "event_type": "PLAN_UPTIME",
    "job_runtime_id": "5b5ffef8-eca4-45e0-a159-2738839f14c7",
    "level": "INFO",
    "rate": 10,
    "logger": "org.thales.punch.plan.api.PlanLauncher",
    "channel": "default",
    "launch_mode": "WITH_LAST_COMMITTED",
    "plan.runtime.id": "default",
    "start_date": "2019-11-15T14:57:15.809Z",
    "uptime": 39
  },
  "target": {
    "cluster": "foreground",
    "type": "spark"
  },
  "init": {
    "process": {
      "name": "plan_controller",
      "id": "5624@PUNCH-LPT35-003"
    },
    "host": {
      "name": "PUNCH-LPT35-003"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "platform": {
    "channel": "default",
    "id": "default",
    "job": "no_shiva",
    "tenant": "default"
  },
  "type": "punch",
  "vendor": "thales",
  "@timestamp": "2019-11-15T14:57:15.809Z",
  "fields": {
    "@timestamp": [
      "2019-11-15T14:57:15.809Z"
    ],
    "content.start_date": [
      "2019-11-15T14:57:15.809Z"
    ]
  }
}
Integrating a Plan in a Channel¶
Running a plan in production requires declaring it as part of a channel, more precisely making it a Shiva application. A plan is, as a matter of fact, yet another Shiva application; in turn, plans benefit from high availability and platform monitoring.
The following is a typical layout of a channel that contains a plan:
tree tenants/mytenant/channels/mychannel
├── channel_structure.yaml
├── punchline_template.yaml
└── plan.yaml
The channel structure configuration looks like this:
stop_by_tenant: true
version: "6.0"
start_by_tenant: true
applications:
  - args:
      - start
      - --plan
      - plan.yaml
      - --template
      - punchline_template.yaml
      - --runtime
      - spark
      - --spark-cluster
      - common
      - --deploy-mode
      - client
      - --last-committed # persistence
    cluster: common
    shiva_runner_tags:
      - common
    name: plan-aggregation
    runtime: shiva
    command: planctl
From there you can use the standard channelctl commands:
channelctl start --channel mychannel
channelctl stop --channel mychannel
Here is an example of the plan.yaml:
version: '6.0'
name: aggregation
model:
  metric_index: mytenant-metrics
  plan_logs_index: platform-plan-logs*
  input_index: mytenant-events-*
  output_index: mytenant-aggregations
  dates:
    day:
      offset: -PT1m
      format: yyyy.MM.dd
    from:
      offset: -PT1m
      format: yyyy-MM-dd'T'HH:mmZ
    to:
      format: yyyy-MM-dd'T'HH:mmZ
settings:
  cron: '*/1 * * * *'
  persistence:
    - type: elasticsearch
      index_name: platform-plan-cursor
metrics:
  reporters:
    - type: kafka
And here is a punchline_template.yaml sample:
runtime: spark
metrics:
  reporters:
    - type: kafka
dag:
  - type: elastic_input
    settings:
      query:
        query:
          bool:
            must:
              - range:
                  '@timestamp':
                    lt: '{{ to }}'
                    gte: '{{ from }}'
        aggregations:
          by_channel:
            terms:
              field: vendor
            aggregations:
              total_size:
                sum:
                  field: size
              max_size:
                max:
                  field: size
      index: '{{ input_index }}'
      aggregation: true
      timestamp:
        field_value: '{{ to }}'
        field_name: timestamp
    component: input
    publish:
      - stream: data
  - settings:
      statement: SELECT timestamp, aggregation_result.doc_count, aggregation_result.key,
        aggregation_result.max_size.value AS max_size, aggregation_result.total_size.value
        AS total_size, doc_count_error_upper_bound, sum_other_doc_count FROM (SELECT
        explode(buckets) AS aggregation_result, doc_count_error_upper_bound, sum_other_doc_count,
        timestamp FROM input_data)
    component: sql
    subscribe:
      - component: input
        stream: data
    publish:
      - stream: data
    type: sql
  - settings:
      index:
        type: constant
        value: '{{ output_index }}-{{ day }}'
    component: output
    subscribe:
      - component: sql
        stream: data
    type: elastic_output
type: punchline
version: '6.0'
tenant: mytenant
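To make the templating concrete: at each cron tick, the plan computes the dates entries defined in plan.yaml and substitutes them, together with the model variables, into this template. As an illustration (assumed tick time, rendered in UTC), a tick at 2019-11-15T14:57Z would yield approximately:
day:  2019.11.15
from: 2019-11-15T14:56+0000
to:   2019-11-15T14:57+0000
Each generated punchline thus queries the one-minute slice of data ending at its tick time.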
Managing Plans¶
If a plan is executed with the --last-committed option and no persistence key is found in your plan configuration file, the following defaults will be used:
- default index name for cursor: platform-plan-cursor
- default hostname for elasticsearch: localhost
- default port for elasticsearch: 9200
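Should you need a different cursor index, set it explicitly in your plan settings. A minimal sketch, reusing the persistence block from the plan example above (only the index name is overridden here; connection settings keep the defaults listed above):
settings:
  persistence:
    - type: elasticsearch
      # overrides the default platform-plan-cursor index
      index_name: mytenant-plan-cursor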
The model key is used to define templating variables. The following variable names are used by our internal system and will be overridden if you define them:
- name: defined in your plan configuration
- tenant: defined in your plan configuration
- channel: defined in your plan configuration
- version: defined in your plan configuration
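Since these variables are always injected, a punchline template can reference them directly, just like the model variables. A minimal sketch adapting the elastic_output node of the template above:
- settings:
    index:
      type: constant
      # name, tenant and channel are injected by the plan runtime
      value: '{{ tenant }}-{{ channel }}-aggregations-{{ day }}'
  component: output
  subscribe:
    - component: sql
      stream: data
  type: elastic_output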
Manually Launch a plan¶
To execute a plan, you can run these commands from your terminal:
# Default --runtime-environment is set to spark
# others: pyspark_5x (craig) and pyspark (dave)
# foreground/local
planctl start --plan <plan_path> --template <template_path>
# on a cluster
planctl start --plan <plan_path> --template <template_path> --deploy-mode cluster --spark-master <master_url>
# with persistence
planctl -t mytenant start -n myplan --plan <plan_path> --template <template_path> --last-committed
Manage the Plan Cursors¶
With planctl, you can also manage plan cursors, to see or affect their starting point.
Note
This only works if the plan is executed from Shiva, i.e. integrated into a channel.
To view the cursor metadata that the plan will use at startup:
planctl -t mytenant list-cursor --plan-id mytenant_mychannel_myplan \
--es-cluster es_search --index-name-pattern platform-plan-cursor
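Equivalently, you can inspect the raw cursor documents directly in Elasticsearch, using the default index and connection settings listed in the Managing Plans section (the exact document structure is release-dependent):
curl 'localhost:9200/platform-plan-cursor/_search?pretty'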
To change the startup cursor:
planctl -t mytenant reset-cursor --plan-id mytenant_mychannel_myplan \
--restart-date 2020-01-01T12:12:29.771Z --es-cluster es_search \
--index-name-pattern platform-plan-cursor
Note
The plan id can easily be found using the channelctl --status command line.