
Python Spark Custom Node


In addition to using punch nodes, you may want to bring your own nodes into a spark punchline. You can write your own Input, Processing and Output logic by leveraging the PySpark API.

Coding your Node

Extending our APIs enables you and your team to focus on business logic rather than on programming-related problems.

PEX Project setup

We provide a starter on our github repository:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from setuptools import setup, find_packages

setup(
    name="my-custom-nodes",  # choose your own distribution name
    version="1.0.0",
    description="boilerplate for custom nodes",
    packages=find_packages(),
    install_requires=[
        "pex==2.1.6",  # this is important, do not remove
        "requests==2.24.0",  # this is important, do not remove
    ],
)

python package hierarchy

├── nodes
│   ├── __init__.py
│   └── my_use_case
│       ├── __init__.py
│       └── my_node
│           ├── __init__.py
│           └── complex_algorithm.py

Input Node

A simple node that publishes some random data to the next node.

from typing import List

from punchline_python.core.holders.input_holder import InputHolder
from punchline_python.core.holders.output_holder import OutputHolder
from punchline_python.core.node import AbstractNode


class ComplexAlgorithm(AbstractNode):

    # Uncomment below if you want this node to take a single dataframe as input:
    # @AbstractNode.declare_dataframe_input()
    # We are expecting this node to publish one dataframe as output.
    # We make use of the decorator design pattern to declare our node parameters...
    @AbstractNode.declare_param(name="param1", required=False, default="TEST")
    # We expect that this node subscribes to a stream and is going to output a stream of data
    def __init__(self) -> None:
        # Decorators on this constructor are used by our job editor
        super().__init__()

    @staticmethod
    def complex_logic(param1: str) -> str:
        return "Hello {}".format(param1)

    def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None:
        """This method is executed by the engine.

        You have access:
         * to subscribed node data: input_data
         * to publish data of any type: output_data
        """
        results: List[str] = [
            ComplexAlgorithm.complex_logic("punch")
        ]  # do something with your list...
        output_data.set(results)  # here we submit it to the next node!
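The parameter declaration above relies on the decorator design pattern. The sketch below is not the punchline_python implementation; it is a simplified, hypothetical class-level variant showing how such a decorator can attach parameter metadata that a job editor could later introspect:

```python
from typing import Any, Callable, Dict, List, Type


def declare_param(name: str, required: bool = False, default: Any = None) -> Callable:
    """Illustrative only: attach parameter metadata to a node class."""
    def wrapper(cls: Type) -> Type:
        params: List[Dict[str, Any]] = list(getattr(cls, "_declared_params", []))
        params.append({"name": name, "required": required, "default": default})
        cls._declared_params = params
        return cls
    return wrapper


@declare_param(name="param1", required=False, default="TEST")
class MyNode:
    pass


# A tool such as a job editor can now read the declared parameters:
print(MyNode._declared_params)
# [{'name': 'param1', 'required': False, 'default': 'TEST'}]
```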

To make your node available to the Punchline Editor, you can generate json file(s) whose contents are the meta-information of your self-made node.

A usage example is as follows:

PACKAGES=mypackagenode \
    --packages $PACKAGES \
    --runtime $RUNTIME \
    --pex $MYNODE_PEX \
    --output-dir $EXTNODES_PYSPARK

Checking if properly installed

On a standalone release, you can use this command:

MY_NODE=your_node_name RUNTIME=spark punchplatform-scanner --runtime pyspark | grep $MY_NODE

Deploying your nodes

Installing your nodes

Standalone mode

For the standalone, put your pex files in $PUNCHPLATFORM_INSTALL_DIR/extlib/pyspark/.

We also provide a utility for this purpose, packaged only in the standalone. Use the command below to install the generated pex:

punchpkg pyspark install /full/path/to/my/custom/node.pex

Deployed mode

For a deployed mode, refer to this documentation

Using your nodes

You can now refer to your nodes in the punchline file.

name: helloworld
version: '6.0'
runtime: pyspark
dag:
- package: nodes.my_use_case.my_node
  type: complex_algorithm
  component: step1
  settings:
    param1: punch !
  publish:
  - stream: data
- type: python_show
  component: just_print_to_stdout
  subscribe:
  - component: step1
    stream: data
  settings: {}
settings:
  spark.additional.pex: complex_algorithm_dependencies.pex


Your package name matters: for now, your python package must be named nodes.

Don't forget to add:

type: complex_algorithm
package: nodes.my_use_case.my_node
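Since the wiring between publishing and subscribing components is easy to get wrong, it can be checked programmatically. The sketch below mirrors the punchline as a plain dict; check_wiring is a hypothetical helper, not a punch API:

```python
# The punchline mirrored as a plain dict (same structure as the YAML above).
punchline = {
    "name": "helloworld",
    "version": "6.0",
    "runtime": "pyspark",
    "dag": [
        {
            "package": "nodes.my_use_case.my_node",
            "type": "complex_algorithm",
            "component": "step1",
            "settings": {"param1": "punch !"},
            "publish": [{"stream": "data"}],
        },
        {
            "type": "python_show",
            "component": "just_print_to_stdout",
            "subscribe": [{"component": "step1", "stream": "data"}],
            "settings": {},
        },
    ],
}


def check_wiring(pline: dict) -> list:
    """Return the (component, stream) pairs subscribed to but never published."""
    published = {
        (node["component"], pub["stream"])
        for node in pline["dag"]
        for pub in node.get("publish", [])
    }
    return [
        (sub["component"], sub["stream"])
        for node in pline["dag"]
        for sub in node.get("subscribe", [])
        if (sub["component"], sub["stream"]) not in published
    ]


print(check_wiring(punchline))  # [] -> every subscription has a matching publisher
```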


It is possible to execute python code before and after the punchline execution, using the parameters spark.pre_punchline_execution and spark.post_punchline_execution.

These parameters must reference the full module path of the class to instantiate.

# given spark.pre_punchline_execution: nodes.my_use_case.my_node.complex_algorithm_pre_execution.ComplexAlgorithmPreExecution
# an import is done like this:
from nodes.my_use_case.my_node.complex_algorithm_pre_execution import ComplexAlgorithmPreExecution
# afterwards it is instantiated like this; code within the constructor is executed:
ComplexAlgorithmPreExecution()
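Instantiating a class from a dotted module path like this is typically done with importlib. The helper below is an illustrative sketch, not the punch implementation; it is demonstrated with a stdlib class since the nodes package is not available here:

```python
import importlib


def instantiate(dotted_path: str):
    """Split 'pkg.module.ClassName', import the module, and instantiate the class."""
    module_path, class_name = dotted_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)()


# The engine would receive something like:
# nodes.my_use_case.my_node.complex_algorithm_pre_execution.ComplexAlgorithmPreExecution
obj = instantiate("collections.OrderedDict")
print(type(obj).__name__)  # OrderedDict
```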

You can now start your punchline as usual, using either the sparkctl command to submit it to a spark cluster, or the punchlinectl command to execute it in the foreground using the punch local process engine.

punchlinectl start -p punchline.yaml