User Nodes

Abstract

If you are interested in using the Punchline developer API to create your own custom Punchline node on the Spark or PySpark runtime, this is the right place for you!

Code your Node

A starter is provided for you to get started quickly: https://github.com/punchplatform/starters/tree/6.x/spark

To write a custom Spark node, you leverage the Spark API as well as an additional punch API that makes it a lot easier to focus only on your business logic. The punch API also provides you with several goodies:

  • a simple yet powerful way to provide configuration parameters to your nodes (see the sketch after this list)
  • load control capabilities
  • monitoring
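
For example, a configuration parameter can be exposed as a plain field on your node class. The sketch below is illustrative only: it assumes settings are bound with Jackson annotations (punch configurations are JSON/HJSON documents), and the threshold parameter is an invented example, not part of the starter:

import org.thales.punch.pml.configuration.NodeName;
import org.thales.punch.pml.job.graph.INode;

import com.fasterxml.jackson.annotation.JsonProperty;

@NodeName("my_configurable_node")
public class MyConfigurableNode implements INode {

    private static final long serialVersionUID = 1L;

    // Hypothetical "threshold" parameter: assuming Jackson binding, the
    // value of the threshold key in the node configuration lands here,
    // with 100 as the default.
    @JsonProperty("threshold")
    private long threshold = 100;

    // execute() and declare() omitted; see the complete node example below.
}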

Required dependencies

<dependency>
    <groupId>org.thales.punch</groupId>
    <artifactId>punchplatform-analytics-job</artifactId>
    <version>${punch.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.thales.punch.plugins</groupId>
    <artifactId>punchplatform-analytics-plugins-json</artifactId>
    <version>${punch.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.thales.punch.plugins</groupId>
    <artifactId>punchplatform-analytics-plugins-utils</artifactId>
    <version>${punch.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

Output Node in Spark

Abstract

In case your node's package is not one of the following:

-   org.thales.punch
-   org.punch

you should provide package: name_of_your_package in your node configuration:

{
    type: my_node
    package: org.my.package
    ...
}

Here is a simple output node that takes a dataset as input and prints the result.

package org.thales.punch.spark.node.starter.kit;

import java.util.Optional;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.thales.punch.api.PunchUncheckedException;
import org.thales.punch.pml.configuration.NodeName;
import org.thales.punch.pml.configuration.exceptions.PMLException;
import org.thales.punch.pml.configuration.scanner.ScanNode;
import org.thales.punch.pml.configuration.scanner.ScanNode.ScanType;
import org.thales.punch.pml.job.graph.IDeclarer;
import org.thales.punch.pml.job.graph.IInputDatasetHolder;
import org.thales.punch.pml.job.graph.INode;
import org.thales.punch.pml.job.graph.IOutputDatasetHolder;

import com.fasterxml.jackson.core.type.TypeReference;

/**
 * <pre>
 * PREREQUISITE: 
 * 
 * - A STANDALONE installed
 * - update pom.xml to your STANDALONE version (punch.version key)
 * - use punchplatform-development.sh --install to add necessary maven artifacts to your .m2 folder
 * - package name should be org.thales, org.punch; else you should use package key in your node settings
 * 
 * {
 *      type: my_node
 *      package: org.thales.punch.spark.node.starter.kit
 *      ....
 * }
 * 
 * </pre>
 * Empty project where you can write your PUNCHLINE Spark Node
 * <p>
 * @author jonathan
 *
 */
@NodeName("my_node")
@ScanNode(type = ScanType.OUTPUT_NODE)
public class MyNode
    implements INode {

    private static final long serialVersionUID = 1L;

    @Override
    public void execute(
        IInputDatasetHolder input, 
        IOutputDatasetHolder output) 
            throws PMLException {
        // print to stdout example
        Optional<Dataset<Row>> show = input
                .getSingletonDataframe();
        if (show.isPresent()) {
            show
                .get()
                .show();
        } else {
            throw new PunchUncheckedException("no input dataset received");
        }
    }

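    // Declare this node's i/o to the punchline engine: here we subscribe
    // to a single Dataset<Row> published by the upstream node.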
    @Override
    public void declare(
        IDeclarer declarer) {
        declarer
            .subscribeSingleton(
                new TypeReference<Dataset<Row>>() {});
    }

}
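
Once compiled and installed, the node can be referenced by its @NodeName from a punchline dag. The snippet below is an illustrative sketch, not a complete punchline: the input node type and the stream names are placeholders to adapt to your setup, and the exact top-level keys depend on your punch version.

{
    runtime: spark
    dag: [
        {
            type: some_input_node
            component: input
            publish: [
                { stream: data }
            ]
        }
        {
            type: my_node
            package: org.thales.punch.spark.node.starter.kit
            component: show
            subscribe: [
                { component: input, stream: data }
            ]
        }
    ]
}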

Building the project and installing the jar

To make your node available to a punchline, build it with mvn clean install, then install the generated jar in a special directory where all custom jars are scanned.

We provide a utility for this purpose, shipped with the standalone release: punchpkg

Use the command below to install the generated jar:

punchpkg spark install-dependencies /full/path/to/jar/with/dependencies.jar
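
For instance, assuming your project produces a jar-with-dependencies under target/ (the jar name below is a placeholder):

cd my-custom-node/
mvn clean install
punchpkg spark install-dependencies \
    $(pwd)/target/my-custom-node-1.0.0-jar-with-dependencies.jar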

Note

Use punchplatform-development.sh --install to add the needed dependencies to your .m2.

Your pom.xml should include the dependencies listed in the Required dependencies section above.

Viewing your node in Punchline editor

In case you want your custom Java node to be visible in the Punchline Editor, your node should belong to the package org.thales. Furthermore, you should install your jar in: $PUNCHPLATFORM_CONF_DIR/../external/spark-2.4.3-bin-hadoop2.7/punchplatform/analytics/job/additional_jars/
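
For example, assuming the same placeholder jar name as above:

cp target/my-custom-node-1.0.0-jar-with-dependencies.jar \
    $PUNCHPLATFORM_CONF_DIR/../external/spark-2.4.3-bin-hadoop2.7/punchplatform/analytics/job/additional_jars/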