Skip to content

PlanCtl

Abstract

the planctl command line is internally used to submit plans to execution. It is useful in development mode or to test plan in production platforms as well.

Documentation

Like all the punch client tools, documentation is available online, either in interactive mode or by simply typing:

planctl -h

In addition you can also use manual page :

man planctl

An online planctl manual page is also available.

Test and Development

To test your plan before integrating it into your production platform, you can use directly the planctlcommand.

planctl start --plan myplan.hjson --template mypunchline.template

This command is available on the punch standalone platform and on the production operator environment. It will run your plan in foreground, possibly submitting its punchline to a spark cluster.

Refer to the planctl manual page for more information.

Production

To schedule your plan on a production platform, you must include it as part of a channel. The shiva runtime provides a ready-to-use planctl command.

The arguments provided to Shiva are the same than the command line ones. An example configuration is provided below.

Launching Modes

When running in production, you will typically need to take care of your plan resiliency. Should a server or vm fail, your plan will restart from another one. You will want to express its strategy so that it restart scheduling your punchlines from the latest correct time period. I.e. you do not want a missing data range in your plan outcome before the plan was interrupted and restarted.

A number of launching modes are available.

  • WITH_LAST_COMMITTED_FROM_START_TO_END_DATE

    activated when a start date and stop date are defined in plan_settings

  • WITH_LAST_COMMITTED_FROM_START_DATE

    activated when only a start date is defined in the plan_settings. In this mode, the date of the last successful execution takes priority if it exists.

  • WITH_LAST_COMMITTED_TO_END_DATE

    activated when only an end date is defined in the plan_settings. Using this mode, the date of the last successful execution is used.

  • WITH_LAST_COMMITTED

    activated when neither start date nor end date are defined in the plan_settings. In this mode, the recovery date of the last successful execution is used.

  • WITHOUT_LAST_COMMITTED

    activated when a plan is launched without the --last-committed argument. Nothing particular is considered.

Plan event logging

Whenever you submit a plan to execution, various important events are reported to the monitoring plane.

At the level of the plan the following events are reported:

  • PLAN_FIRST_EXECUTION

    When neither recovery index nor document is found with required plan information

  • PLAN_RESUMED_FROM_CHECKPOINT

    When a recovery document is found containing needed meta data for our plan to resume from

  • PLAN_IGNORED_LAST_COMMITTED_START_DATE

    When plan is launched in WITH_LAST_COMMITTED_FROM_START_DATE

At the level of the plan application (i.e. punchline) the following events are reported:

  • APPLICATION_STARTED

    When a job executed by a plan begins

  • APPLICATION_ENDED

    When a job executed by a plan ends

  • APPLICATION_FAILED

    When a job executed by a plan fails to end

  • ERROR

    When something unexpected happened

Here is an example logged event from a pyspark punchline launched by a plan/

{
  "content": {
    "unit": "seconds",
    "event_type": "PLAN_UPTIME",
    "job_runtime_id": "5b5ffef8-eca4-45e0-a159-2738839f14c7",
    "level": "INFO",
    "rate": 10,
    "logger": "org.thales.punch.plan.api.PlanLauncher",
    "channel": "default",
    "launch_mode": "WITH_LAST_COMMITTED",
    "plan.runtime.id": "default",
    "start_date": "2019-11-15T14:57:15.809Z",
    "uptime": 39
  },
  "target": {
    "cluster": "foreground",
    "type": "spark"
  },
  "init": {
    "process": {
      "name": "plan_controller",
      "id": "5624@PUNCH-LPT35-003"
    },
    "host": {
      "name": "PUNCH-LPT35-003"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "platform": {
    "channel": "default",
    "id": "default",
    "job": "no_shiva",
    "tenant": "default"
  },

  "type": "punch",
  "vendor": "thales",
  "@timestamp": "2019-11-15T14:57:15.809Z",
  "fields": {
    "@timestamp": [
      "2019-11-15T14:57:15.809Z"
    ],
    "content.start_date": [
      "2019-11-15T14:57:15.809Z"
    ]
  }
}

Integrating a Plan in a Channel

To run a plan in production requires to declare it as part of a channel, more precisely make it a Shiva application. A Plan is, as a matter of fact, yet another Shiva application. In turn plans benefit from high-availability and platform monitoring.

The following is a typical layout of a channel that contains a plan:

tree tenants/mytenant/channels/mychannel
├── channel_structure.json
├── punchline_template.yaml
└── plan.yaml

The channel structure configuration looks like this:

stop_by_tenant: true
version: "6.0"
start_by_tenant: true

applications:
- args:
  - start
  - --plan
  - plan.yaml
  - --template
  - punchline.yaml
  - --runtime
  - spark
  - --spark-cluster
  - common
  - --deploy-mode
  - client
  - --last-committed  # persistence
  cluster: common
  shiva_runner_tags:
  - common
  name: plan-aggregation
  runtime: shiva
  command: planctl

From there you can use the standard channelctl commands :

channelctl start --channel my_plan1
channelctl stop --channel my_plan1

Here are examples of the plan.yaml :

version: '6.0'
name: aggregation
model:
  metric_index: mytenant-metrics
  plan_logs_index: platform-plan-logs*
  input_index: mytenant-events-*
  output_index: mytenant-aggregations
  dates:
    day:
      offset: -PT1m
      format: yyyy.MM.dd
    from:
      offset: -PT1m
      format: yyyy-MM-dd'T'HH:mmZ
    to:
      format: yyyy-MM-dd'T'HH:mmZ
settings:
  cron: '*/1 * * * *'
  persistence:
  - type: elasticsearch
    index_name: platform-plan-cursor
metrics:
  reporters:
  - type: kafka

and here is a punchline_template.yaml sample :

runtime: spark
metrics:
  reporters:
  - type: kafka
dag:
- type: elastic_input
  settings:
    query:
      query:
        bool:
          must:
          - range:
              '@timestamp':
                lt: '{{ to }}'
                gte: '{{ from }}'
      aggregations:
        by_channel:
          terms:
            field: vendor
          aggregations:
            total_size:
              sum:
                field: size
            max_size:
              max:
                field: size
    index: '{{ input_index }}'
    aggregation: true
    timestamp:
      field_value: '{{ to }}'
      field_name: timestamp
  component: input
  publish:
  - stream: data
- settings:
    statement: SELECT timestamp, aggregation_result.doc_count, aggregation_result.key,
      aggregation_result.max_size.value AS max_size, aggregation_result.total_size.value
      AS total_size, doc_count_error_upper_bound, sum_other_doc_count FROM (SELECT
      explode(buckets) AS aggregation_result, doc_count_error_upper_bound, sum_other_doc_count,
      timestamp FROM input_data)
  component: sql
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
  type: sql
- settings:
    index:
      type: constant
      value: '{{ output_index }}-{{ day }}'
  component: output
  subscribe:
  - component: sql
    stream: data
  type: elastic_output
type: punchline
version: '6.0'
tenant: mytenant

Managing Plans

If a plan is executed with the --last-committed mode and no persistence key is found in your plan configuration file, the following defaults will be used :

  • default index name for cursor: platform-plan-cursor
  • default hostname for elasticsearch: localhost
  • default port for elasticsearch: 9200

For model key that is used to define templating variables, the following ones are used by our internal system and will be overridden if used:

  • name : defined in your plan configuration
  • tenant : defined in your plan configuration
  • channel : defined in your plan configuration
  • version : defined in your plan configuration

Manually Launch a plan

To execute a plan, you can run these commands from your terminal:

# Default --runtime-environment is set to spark
# others: pyspark_5x (craig) and pyspark (dave)

# foreground/local
planctl start --plan <plan_path> --template <template_path>

# one a cluster
planctl start --plan <plan_path> --template <template_path> --deploy-mode cluster --spark-master <master_url>

# with persistence
planctl -t mytenant start -n myplan --plan <plan_path> --template <template_path> --last-committed

Manage the Plan Cursors

With planctl, you can also manage plan cursors to see of affect their starting point.

Note

This is only working if the plan is executed from Shiva, i.e. integrated into a channel.

To view the cursor meta data that the plan will be using at startup:

planctl -t mytenant list-cursor --plan-id mytenant_mychannel_myplan \
        --es-cluster es_search --index-name-pattern platform-plan-cursor

To change the startup cursor /

planctl -t mytenant reset-cursor --plan-id mytenant_mychannel_myplan \
        --restart-date 2020-01-01T12:12:29.771Z --es-cluster es_search \
        --index-name-pattern platform-plan-cursor

Note

the plan id can be easily find using the channelctl --status command line.

Resources

See :