
Python Spark Custom Node

Abstract

In addition to using punch nodes, you may want to bring your own nodes into a spark punchline. You can write your own Input, Processing and Output logic by leveraging the pyspark API.

Coding your Node

Extending our APIs enables you and your team to focus on business logic rather than on programming-related problems.

PEX Project setup

We provide a starter on our github repository.

setup.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from setuptools import setup, find_packages

setup(
    name="nodes",
    version="1.0",
    packages=find_packages(),
    include_package_data=True,
    author="punchplatform",
    author_email="contact@punchplatform.com",
    description="boilerplate for custom nodes",
    python_requires='>=3.6',
    install_requires=[
        "pex==2.1.6",  # this is important do not remove
        "requests==2.24.0",  # this is important do not remove
        "redis"
    ]
)
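Once the project is set up, you can package it together with its dependencies into a single pex archive. A minimal sketch, assuming pex is installed and the command is run from the project root (the exact flags may vary with your environment):

pip install pex==2.1.6
pex . -o nodes.pex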

Python package hierarchy

├── nodes
│   ├── __init__.py
│   └── my_use_case
│       ├── __init__.py
│       └── my_node
│           ├── complex_algorithm.py
│           └── __init__.py
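With this layout, the module path maps directly to the package and type fields of a punchline node declaration, as shown in the full punchline example further below:

- package: nodes.my_use_case.my_node
  type: complex_algorithm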

Input Node

A simple node that publishes some data to the next node. Given the hierarchy above, this code would typically live in complex_algorithm.py.

from punchline_python.core.holders.input_holder import InputHolder
from punchline_python.core.holders.output_holder import OutputHolder
from punchline_python.core.node import AbstractNode


class ComplexAlgorithm(AbstractNode):

    # uncomment below if you want this node to take a single dataframe as input
    #  @AbstractNode.declare_dataframe_input()
    # We are expecting this node to publish one dataframe as output
    @AbstractNode.declare_dataframe_output()
    # We make use of the decorator design pattern to declare our node parameters...
    @AbstractNode.declare_param(name="param1", required=False, default="TEST")
    # We expect this node to subscribe to a stream and to output a stream of data
    @AbstractNode.declare_map_dataframe_input()
    @AbstractNode.declare_map_dataframe_output()
    def __init__(self) -> None:
        super().__init__()
        # Decorators on this constructor are used by our punchline editor

    @staticmethod
    def complex_logic(param1: str) -> str:
        return "Hello {}".format(param1)

    def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None:
        """ This method is executed by the engine
        You have access:
         * to subscribed node data: input_data
         * to publish data of any type: output_data
        """
        import redis

        print(redis.__version__)
        results: str = ComplexAlgorithm.complex_logic(
            self.settings.get("param1")
        )  # do something with your result...
        output_data.set(results)  # here we submit it to the next node!
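Because complex_logic is a plain static method, you can sanity-check it outside the engine before packaging your pex. A minimal sketch, assuming the class lives in complex_algorithm.py as in the hierarchy above:

from nodes.my_use_case.my_node.complex_algorithm import ComplexAlgorithm

assert ComplexAlgorithm.complex_logic("punch") == "Hello punch"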

Making your node available to Punchline Editor

punchplatform-inspect-node.sh generates JSON files containing the meta-information of your self-made nodes.

A usage example is as follows:

EXTNODES_PYSPARK=$PUNCHPLATFORM_INSTALL_DIR/extresource/pyspark
MYNODE_PEX=$MYWORKDIR/mynode.pex
RUNTIME=pyspark
PACKAGES=mypackagenode
punchplatform-inspect-node.sh \
    --packages $PACKAGES \
    --runtime $RUNTIME \
    --pex $MYNODE_PEX \
    --output-dir $EXTNODES_PYSPARK

Checking if properly installed

On a standalone release, you can use this command:

MY_NODE=your_node_name
punchplatform-scanner --runtime pyspark | grep $MY_NODE

Deploying your nodes

Installing your nodes

Standalone mode

For the standalone, put your pex files in $PUNCHPLATFORM_INSTALL_DIR/extlib/pyspark/.

We also provide a utility for this purpose, packaged with the standalone only. Use the command below to install the generated pex:

punchpkg pyspark install /full/path/to/my/custom/node.pex

Deployed mode

In deployed mode, refer to this documentation.

Using your nodes

You can now refer to your nodes in the punchline file.

---
name: helloworld
version: '6.0'
runtime: pyspark
dag:
- package: nodes.my_use_case.my_node
  type: complex_algorithm
  component: step1
  publish:
  - stream: data
  settings:
    param1: punch !
- type: python_show
  component: just_print_to_stdout
  subscribe:
  - component: step1
    stream: data
  settings: {}
settings:
  spark.additional.pex: complex_algorithm_dependencies.pex

Note

Your package name matters: for now, your top-level python package must be named nodes.

Don't forget to add:

type: complex_algorithm
package: nodes.my_use_case.my_node
...

Note

It is possible to execute python code before/after the punchline execution, using the spark.pre_punchline_execution and spark.post_punchline_execution parameters.

These parameters must reference the full module path of the class to instantiate.

# given spark.pre_punchline_execution: nodes.my_use_case.my_node.complex_algorithm_pre_execution.ComplexAlgorithmPreExecution
# an import is done like this
from nodes.my_use_case.my_node.complex_algorithm_pre_execution import ComplexAlgorithmPreExecution
# afterwards it is instantiated like this; code within the constructor is executed
ComplexAlgorithmPreExecution()
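As an illustration, a pre-execution hook might look like the following. This is a minimal sketch, assuming only that the referenced class is instantiated before the punchline runs; the class body and print statement are illustrative:

# nodes/my_use_case/my_node/complex_algorithm_pre_execution.py


class ComplexAlgorithmPreExecution:

    def __init__(self) -> None:
        # code placed here runs once, before the punchline dag is executed
        print("preparing resources before punchline execution")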

You can now start your punchline as usual, using either the sparkctl command to submit it to a spark cluster or the punchlinectl command to execute it in the foreground using the punch local process engine.

punchlinectl start -p punchline.yaml