
Custom Punchline Nodes

Quick

Welcome to the punchline node development guide. This quick guide aims at getting you up and ready to add your own nodes. Happy coding!

Punch Team

We support only Linux-based systems and macOS.

To get started in a sane development environment, we recommend using the following applications:

  • pyenv: to install and manage Python interpreters as cleanly as possible (see the example below)
  • jenv: to install and manage Java runtimes as cleanly as possible

Version requirements:

  • Python 3.6.8
  • Java 8
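
For example, a matching Python interpreter can be set up with pyenv as follows; this assumes pyenv is already installed, and jenv offers similar commands for the Java runtime:

# install and select the required Python version for your working directory
pyenv install 3.6.8
pyenv local 3.6.8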

Miscellaneous

Our public repository contains useful punchline examples for each supported runtime (Spark|PySpark|Storm). We also provide starters for each of our runtimes.

Warning

This section does not replace our release notes. It only aims at providing a quick overview of some of the most important breaking changes.

  • From version 6 onwards, the pyspark/python runtime introduces a breaking change for developing custom nodes: all imports should now be prefixed with punchline_python.module_name
  • From version 6 onwards, all punchlines should have five mandatory keys: type, name, tenant, runtime and version (see the sketch after this list)
  • From version 6 onwards, the usage of virtualenv has been dropped in favor of PEX. If your custom node depends on external Python libraries, you should package those modules into a pex archive and install it with our punchpkg cli, or use our cli to generate a pex archive from a requirements.txt
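
For illustration, here is a minimal sketch of these top-level keys in a punchline file; the values are purely illustrative and the rest of the punchline (its nodes) is omitted:

{
  "type": "punchline",
  "name": "my_punchline",
  "tenant": "mytenant",
  "runtime": "pyspark",
  "version": "6.0"
}
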
Pyspark & Python

PySpark/Python runtime

This section is for users who already have a good grasp of Punch concepts. In this chapter, we explain how to integrate a custom node into the PySpark punchline engine. It can then easily be combined with the existing punch nodes.

# clone our latest starter repository on local file system
WORK_DIR=~/punch_starter
git clone https://github.com/punchplatform/starters $WORK_DIR

cd $WORK_DIR

# In case your node uses custom modules such as pandas,
# you should provide a text file named after your module.
# The text file should list only the custom modules your node is using.
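# For example (illustrative), such a text file could simply list one module name per line:
#   pandas
#   redis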
punchpkg pyspark install-dependencies full/path/to/text_file/custom_modules

>   punchpkg pyspark install-dependencies complex_algorithm_dependencies

# Check that your custom module is properly installed
# A json document will be printed on stdout; search for the key custom_pex_dependencies
# Within this key, you will see custom_modules
punchpkg pyspark list-dependencies

# Check the current module
punchpkg pyspark info

# Installing your custom node from full path (use tab for autocompletion)
punchpkg pyspark install-node <tab><tab>

>   punchpkg pyspark install-node $(pwd)/algorithms

# List installed nodes
punchpkg pyspark list-nodes

# Executing a node
# either use our PL editor or use our shell punchlinectl
punchlinectl start -p full/path/to/job.punchline

>   punchlinectl start -p full_job.punchline -v


--[[
__________                    .__    .____    .__               
\______   \__ __  ____   ____ |  |__ |    |   |__| ____   ____  
|     ___/  |  \/    \_/ ___\|  |  \|    |   |  |/    \_/ __ \ 
|    |   |  |  /   |  \  \___|   Y  \    |___|  |   |  \  ___/ 
|____|   |____/|___|  /\___  >___|  /_______ \__|___|  /\___  >
                    \/     \/     \/        \/       \/     \/ 
--]]
____   ________ _________  _ 
|__]\_/ [__ |__]|__||__/|_/  
|    |  ___]|   |  ||  \| \_ 


using nodes from ./nodes sources
Hello punch

Execution took 0.18007254600524902 seconds

Here is a simple example of how your code should look:

#!/usr/bin/env python3
# coding: utf-8

from punchline_python.core.holders.input_holder import InputHolder
from punchline_python.core.holders.output_holder import OutputHolder
from punchline_python.core.node import AbstractNode
import redis  # example of an external dependency that must be packaged into a pex archive (see above)


class ComplexAlgorithm(AbstractNode):

    # uncomment the line below if you want this node to take a single dataframe as input
    #  @AbstractNode.declare_dataframe_input()
    # We are expecting this node to publish one dataframe as output
    @AbstractNode.declare_dataframe_output()
    # We make use of the decorator design pattern to declare our node parameters...
    @AbstractNode.declare_param(name="param1", required=False, default="TEST")
    # We expect this node to subscribe to a stream of data and to output a stream of data
    @AbstractNode.declare_map_dataframe_input()
    @AbstractNode.declare_map_dataframe_output()
    def __init__(self) -> None:
        super().__init__()
        # Decorators on this constructor are used by our punchline editor

    def complex_logic(self, param1: str) -> str:
        return "Hello {}".format(param1)

    def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None:
        """ This method is executed by the engine
        You have access to:
        * the subscribed nodes' data: input_data
        * publishing data of any type: output_data
        """
        results: str = self.complex_logic(self.settings.get("param1"))  # do something with your result...
        output_data.set(results)  # here we submit it to the next node!
Class Inheritance

Your node should extend the punch AbstractNode and implement the def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None method.

Decorators

Notice the usage of decorators. Using a concise and simple syntax, decorators declare your node configuration items in a way that is automatically detected by the punch Kibana punchline editor. In turn, this provides the users of your node with a clear and easy way to configure it directly from the GUI.

Parameters

Here is how you access a declared parameter from within the execute method:

# "settings": { "myparam1": 1 }

self.settings.get("myparam1")

# output as integer
1

Accessing data from subscribed nodes and publishing data to the next ones

Accessing data from previous nodes is done through the input_data parameter of the execute method.

As you can guess, publishing data is done through the output_data parameter of the execute method.

How to get input data from multiple subscribed nodes
# use a for loop in the execute method

def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None:
    for stream in input_data.streams.values():
        print(stream)
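
If you also need to publish a result built from these streams, here is a minimal sketch; it is purely illustrative and only reuses the input_data.streams and output_data.set calls shown above:

def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None:
    # collect every subscribed stream and publish the whole list to the next node
    collected = list(input_data.streams.values())
    output_data.set(collected)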
My code depends on a custom Python module not provided by the PySpark/Python runtime, how do I integrate it?

For Python and PySpark, punchpkg provides a command to generate a pex file in the right location from a requirements.txt. Alternatively, you can use pex directly: see the link!
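
For reference, here is a minimal sketch of the direct pex approach. It assumes the pex tool is available (it can be installed with pip) and the file names are purely illustrative:

# install the pex tool if it is not already available
pip install pex

# build a pex archive from a requirements.txt
pex -r requirements.txt -o complex_algorithm_dependencies.pex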

My punchline is running as expected and I want my custom node to be deployed in production, how do I do it?

In contrast to the standalone version, where you execute punchlines with our provided CLI punchlinectl, in production you will most likely execute punchlines using our beloved shiva.

Scenario 1

In this scenario, we will consider that you want any shiva node to be able to take your punchline application and execute it.

For this to work, you will have to deploy your code and your python modules (if any of course), on each node of your shiva cluster.

Why?

By design, it is shiva's master node that decides on which shiva worker your punchline will be executed, and as you can guess, you have no control over it... This means that the worker executing your punchline must have all the resources your configuration needs in order to execute properly.

Scenario 2

Let's now consider that we want to restrict the execution of our punchline to a given shiva tag.

In this case, we will install our code and our Python modules (if any, of course) on the tagged shiva nodes only.

Installation

Note, we currently don't have a clean solution to propose. In order to avoid the hassle of playing with our internals, you can deploy punchpkg on each of your shiva nodes.

Be sure to have $PUNCHPLATFORM_CONF_DIR set before executing punchpkg... Once you are done, you can use your usual installation process on each shiva node on which you plan to execute pyspark/python punchlines!
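
As an illustration, the installation on each target shiva node could look like this; the paths are illustrative and only reuse the punchpkg commands shown earlier:

# on each shiva node targeted for pyspark/python punchlines
export PUNCHPLATFORM_CONF_DIR=/path/to/your/platform/conf
punchpkg pyspark install-dependencies full/path/to/text_file/custom_modules
punchpkg pyspark install-node full/path/to/your/nodes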

Spark

Spark runtime

Coming soon

Packaging and Deploying

Once you have your node, the punch provides you with a CLI tool to package it and upload it to your punch, typically a standalone version.

Simply launch it and you will get some online help:

    punchpkg

    Usage: punchpkg [OPTIONS] COMMAND [ARGS]...

      To activate auto completion, use:

      eval "$(_PUNCHPKG_COMPLETE=source punchpkg)"

    Options:
      --help  Show this message and exit.

    Commands:
      pyspark  module to manage pyspark
      spark    module to manage spark

As you can see, autocompletion is supported. To activate it, simply run the following command:

eval "$(_PUNCHPKG_COMPLETE=source punchpkg)"

You can now use a double tab to auto-complete the commands.

    # step 1
    punchpkg <tab><tab>

    pyspark  spark 

    # step 2
    punchpkg pyspark

    delete-jar             delete-node            delete-nodes-module    delete-pex-module      help                   info                   install-dependencies   install-node           link-external-nodes    list-dependencies      list-external-nodes    list-nodes             unlink-external-nodes  

    # step 3: for the sake of this tutorial, let's run info command
    punchpkg pyspark info

    {
        "PEX_PATH": [
            ""
        ],
        "install_dir": "/home/jonathan/Desktop/standalone/punchplatform-standalone-linux-6.0.0/external/punchplatform-pyspark-6.0.0"
    }

    # congrats for getting the above information displayed!