
Python Spark Custom Node

Abstract

In addition to using the built-in punch nodes, you may want to bring your own nodes into a Spark punchline. You can write your own input, processing and output logic by leveraging the pyspark API.

Coding your Node

Extending our APIs enables you and your team to focus on business logic rather than on programming-related problems.

PEX Project setup

We provide a starter project on our GitHub repository.

setup.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from setuptools import setup, find_packages

setup(
    name="nodes",
    version="1.0",
    packages=find_packages(),
    include_package_data=True,
    author="punchplatform",
    author_email="contact@punchplatform.com",
    description="boilerplate for custom nodes",
    python_requires='>=3.6',
    install_requires=[
        "pex==2.1.6",  # this is important do not remove
        "requests==2.24.0",  # this is important do not remove
        "redis"
    ]
)

python package hierarchy

├── nodes
│   ├── __init__.py
│   └── my_use_case
│       ├── __init__.py
│       └── my_node
│           ├── complex_algorithm.py
│           └── __init__.py
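As a sketch, the project can be packaged into a pex archive with the pex builder pinned in setup.py. The exact build command depends on your starter project, and the output file name below is only illustrative:

```shell
# install the pinned pex builder, then package the local project
# into a single pex archive (the output name is an assumption)
pip install "pex==2.1.6"
pex . -o complex_algorithm_dependencies.pex
```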

Input Node

A simple node that publishes some data to the next node.

from punchline_python.core.holders.input_holder import InputHolder
from punchline_python.core.holders.output_holder import OutputHolder
from punchline_python.core.node import AbstractNode
from typing import List


class ComplexAlgorithm(AbstractNode):

    # uncomment below if you want this node to take a single dataframe as input
    #  @AbstractNode.declare_dataframe_input()
    # We expect this node to publish one dataframe as output
    @AbstractNode.declare_dataframe_output()
    # We use the decorator design pattern to declare the node parameters...
    @AbstractNode.declare_param(name="param1", required=False, default="TEST")
    # We expect this node to subscribe to a stream and to publish a stream of data
    @AbstractNode.declare_map_dataframe_input()
    @AbstractNode.declare_map_dataframe_output()
    def __init__(self) -> None:
        super().__init__()
        # Decorators on this constructor are used by our job editor

    @staticmethod
    def complex_logic(param1: str) -> str:
        return "Hello {}".format(param1)

    def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None:
        """ This method is executed by the engine
        You have access:
         * to subscribed node data: input_data
         * to publish data of any type: output_data
        """
        import redis

        print(redis.__version__)
        result: str = ComplexAlgorithm.complex_logic(
            self.settings.get("param1")
        )  # do something with your result...
        output_data.set(result)  # here we submit it to the next node!
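Because the business logic lives in a plain static method, it can be unit-tested without the punchline engine. A minimal sketch, with the logic extracted as a standalone function so it runs without the punchline_python dependency:

```python
def complex_logic(param1: str) -> str:
    # same logic as ComplexAlgorithm.complex_logic
    return "Hello {}".format(param1)


# exercise the logic with the declared default value of param1
print(complex_logic("TEST"))  # → Hello TEST
```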

Deploying your nodes

Installing your nodes

Standalone mode

For the standalone, put your pex files in $PUNCHPLATFORM_INSTALL_DIR/extlib/pyspark/.

We also provide a utility for this purpose, packaged with the standalone only. Use the command below to install a generated pex:

punchpkg pyspark install /full/path/to/my/custom/node.pex

Deployed mode

For deployed mode, refer to this documentation.

Using your nodes

You can now refer to your nodes in the punchline file.

{
    name: helloworld
    channel: default
    version: "6.0"
    tenant: default
    runtime: pyspark
    dag: [
        {
            package: nodes.my_use_case.my_node
            type: complex_algorithm
            component: step1
            publish: [
                {
                    stream: data
                }
            ]
            settings: {
                param1: punch !
            }
        }
        {
            type: python_show
            component: just_print_to_stdout
            subscribe: [
                {
                    component: step1
                    stream: data
                }
            ]
            settings: {
            }
        }
    ]
    settings: {
        spark.additional.pex: complex_algorithm_dependencies.pex
    }
}
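For illustration, the param1 value declared in the settings block of step1 reaches the node through self.settings. Assuming settings behaves like a plain dict, the lookup and the default declared on the node interact like this:

```python
# settings as parsed from the punchline 'settings' block of step1
settings = {"param1": "punch !"}

# the node reads the parameter; "TEST" is the default declared
# with @AbstractNode.declare_param, used only when param1 is absent
value = settings.get("param1", "TEST")
print("Hello {}".format(value))  # → Hello punch !
```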

Note

Your package name matters: for now, your python package must be named nodes.

Don't forget to add:

{
    type: complex_algorithm
    package: nodes.my_use_case.my_node
    ...
}

Note

It is possible to execute python code before/after the punchline execution. Use the parameters spark.pre_punchline_execution and spark.post_punchline_execution.

These parameters must reference the full module path of the class to instantiate.

# given spark.pre_punchline_execution: nodes.my_use_case.my_node.complex_algorithm_pre_execution.ComplexAlgorithmPreExecution
# an import is done like this
from nodes.my_use_case.my_node.complex_algorithm_pre_execution import ComplexAlgorithmPreExecution
# then the class is instantiated like this; code in the constructor is executed
ComplexAlgorithmPreExecution()

You can now start your punchline as usual, using either the sparkctl or the punchlinectl command: the former submits your punchline to a Spark cluster, the latter executes it in the foreground using the punch local process engine.

punchlinectl $PUNCHPLATFORM_CONF_DIR/tenant/mytenant/channels/mychannel/punchline.hjson