Python Spark Custom Node¶
Abstract
In addition to using the built-in punch nodes, you may want to bring your own nodes into a Spark punchline. You can write your own Input, Processing and Output logic by leveraging the pyspark API.
Coding your Node¶
Extending our APIs enables you and your team to focus on business logic rather than on programming-related problems.
PEX Project setup¶
We provide a starter on our GitHub repository.
setup.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from setuptools import setup, find_packages

setup(
    name="nodes",
    version="1.0",
    packages=find_packages(),
    include_package_data=True,
    author="punchplatform",
    author_email="contact@punchplatform.com",
    description="boilerplate for custom nodes",
    python_requires='>=3.6',
    install_requires=[
        "pex==2.1.6",  # this is important, do not remove
        "requests==2.24.0",  # this is important, do not remove
        "redis"
    ]
)
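With this setup.py in place, you can build the pex archive from the project root. Below is a minimal sketch, assuming the pex CLI is available in your build environment; the output name nodes.pex is illustrative, and the starter repository may ship its own build script:

pip install pex==2.1.6
pex . -o nodes.pex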
python package hierarchy
├── nodes
│   ├── __init__.py
│   └── my_use_case
│       ├── __init__.py
│       └── my_node
│           ├── complex_algorithm.py
│           └── __init__.py
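In this layout, the complex_algorithm.py module holds the node class (ComplexAlgorithm, shown below); its snake_case module name is what the punchline later references as the node type, together with the package path nodes.my_use_case.my_node.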
Input Node¶
A simple node that subscribes to a stream of data, applies some logic, and publishes the result to the next node.
from punchline_python.core.holders.input_holder import InputHolder
from punchline_python.core.holders.output_holder import OutputHolder
from punchline_python.core.node import AbstractNode


class ComplexAlgorithm(AbstractNode):

    # Uncomment below if you want this node to take a single dataframe as input
    # @AbstractNode.declare_dataframe_input()
    # We expect this node to publish one dataframe as output
    @AbstractNode.declare_dataframe_output()
    # We use the decorator design pattern to declare our node parameters...
    @AbstractNode.declare_param(name="param1", required=False, default="TEST")
    # We expect this node to subscribe to a stream and to output a stream of data
    @AbstractNode.declare_map_dataframe_input()
    @AbstractNode.declare_map_dataframe_output()
    def __init__(self) -> None:
        super().__init__()
        # Decorators on this constructor are used by our punchline editor

    @staticmethod
    def complex_logic(param1: str) -> str:
        return "Hello {}".format(param1)

    def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None:
        """This method is executed by the engine.

        You have access:
        * to the subscribed node data: input_data
        * to publish data of any type: output_data
        """
        # redis is shipped in the additional dependency pex declared in the punchline settings
        import redis
        print(redis.__version__)
        result: str = ComplexAlgorithm.complex_logic(
            self.settings.get("param1")
        )  # do something with your result...
        output_data.set(result)  # here we submit it to the next node!
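As an even simpler variant, here is a minimal sketch of an input node that subscribes to nothing and publishes some random data to the next node. It reuses only the punchline_python API shown above; the class name RandomInput and its count parameter are illustrative:

import random

from punchline_python.core.holders.input_holder import InputHolder
from punchline_python.core.holders.output_holder import OutputHolder
from punchline_python.core.node import AbstractNode


class RandomInput(AbstractNode):

    # This node declares no input and publishes one output
    @AbstractNode.declare_dataframe_output()
    @AbstractNode.declare_param(name="count", required=False, default=10)
    def __init__(self) -> None:
        super().__init__()

    def execute(self, input_data: InputHolder, output_data: OutputHolder) -> None:
        # publish a list of random floats to the next node
        count = int(self.settings.get("count"))
        output_data.set([random.random() for _ in range(count)])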
Making your node available to Punchline Editor¶
punchplatform-inspect-node.sh
can be used to generate JSON files whose contents describe the meta-information of your custom node.
A usage example is as follows:
EXTNODES_PYSPARK=$PUNCHPLATFORM_INSTALL_DIR/extresource/pyspark
MYNODE_PEX=$MYWORKDIR/mynode.pex
RUNTIME=pyspark
PACKAGES=mypackagenode
punchplatform-inspect-node.sh \
--packages $PACKAGES \
--runtime $RUNTIME \
--pex $MYNODE_PEX \
--output-dir $EXTNODES_PYSPARK
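If the command succeeds, the generated JSON descriptor files should land in the output directory; a quick way to check:

ls $EXTNODES_PYSPARK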
Checking if properly installed
On a standalone release, you can use this command:
MY_NODE=your_node_name
punchplatform-scanner --runtime pyspark | grep $MY_NODE
Deploying your nodes¶
Installing your nodes¶
Standalone mode¶
For the standalone, put your pex files in $PUNCHPLATFORM_INSTALL_DIR/extlib/pyspark/.
We also provide a utility for this purpose, packaged with the standalone only. Use the command below to install the generated pex:
punchpkg pyspark install /full/path/to/my/custom/node.pex
Deployed mode¶
For a deployed mode, refer to the deployment documentation.
Using your nodes¶
You can now refer to your nodes in the punchline file.
---
name: helloworld
version: '6.0'
runtime: pyspark
dag:
  - package: nodes.my_use_case.my_node
    type: complex_algorithm
    component: step1
    publish:
      - stream: data
    settings:
      param1: punch !
  - type: python_show
    component: just_print_to_stdout
    subscribe:
      - component: step1
        stream: data
    settings: {}
settings:
  spark.additional.pex: complex_algorithm_dependencies.pex
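Note the spark.additional.pex setting: it references an additional pex archive shipping the node's third-party dependencies, which is presumably how the import redis statement in execute above gets resolved at runtime.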
Note
Your package name matters: for now, you must name your top-level python package nodes.
Don't forget to add:
type: complex_algorithm
package: nodes.my_use_case.my_node
...
Note
It is possible to execute python code before/after our punchline execution.
Use the parameters spark.pre_punchline_execution and spark.post_punchline_execution. These parameters must reference the full module path of the class to instantiate.
# given spark.pre_punchline_execution: nodes.my_use_case.my_node.complex_algorithm_pre_execution.ComplexAlgorithmPreExecution
# an import is done like this:
from nodes.my_use_case.my_node.complex_algorithm_pre_execution import ComplexAlgorithmPreExecution

# afterwards it is instantiated like this; code within the constructor is executed
ComplexAlgorithmPreExecution()
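As a minimal sketch, assuming the engine does nothing more than the import and instantiation shown above, such a class could look like this (the module and class names simply follow the example parameter value):

# nodes/my_use_case/my_node/complex_algorithm_pre_execution.py


class ComplexAlgorithmPreExecution:

    def __init__(self) -> None:
        # Code placed in the constructor runs before the punchline starts,
        # e.g. to set up logging or check connectivity.
        print("about to start the punchline...")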
You can now start your punchline as usual, using either sparkctl to submit it to a Spark cluster, or punchlinectl to execute it in the foreground using the punch local process engine.
punchlinectl start -p punchline.yaml