Java Spark Custom Node

Abstract

In addition to using the built-in punch nodes, you can bring your own nodes into a Spark punchline. By leveraging the Spark API, you can write your own input, processing and output logic.

Coding your Node

To write a custom Spark node, you leverage the Spark API as well as an additional punch API that makes it a lot easier to focus only on your business logic. The punch API also provides you with several goodies:

  • a simple yet powerful way to provide configuration parameters to your nodes
  • monitoring

Maven Project Setup

We provide a starter project on our GitHub repository.

Make your code part of a Maven project. A typical pom.xml looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.thales.punch.spark.node.starter.kit</groupId>
    <artifactId>punch-spark-node-starter-kit</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>
    <name>punch-spark-node-starter-kit</name>
    <url>http://maven.apache.org</url>
    <properties>
        <spark.version>2.4.3</spark.version>
        <junit.version>4.12</junit.version>
        <punch.version>6.1.0</punch.version>
        <jackson.version>2.10.1</jackson.version>
        <log4j.version>2.12.1</log4j.version>
        <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.github.punchplatform</groupId>
            <artifactId>punch-api</artifactId>
            <version>${punch.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.github.punchplatform</groupId>
            <artifactId>punch-spark-api</artifactId>
            <version>${punch.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

Tip

Punch and Spark dependencies are provided by your platform. Your jar should only include your custom dependencies!

Once compiled with the mvn install command, Maven generates target/punch-spark-node-starter-kit-${VERSION}-jar-with-dependencies.jar.

Input Node

Here is a simple input node that publishes a list of data as a dataset.

package org.thales.punch.spark.node.starter.kit;

import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.punch.api.spark.IDeclarer;
import com.github.punch.api.spark.OutputDataset;
import com.github.punch.api.spark.nodes.PunchInputNode;
import com.github.punch.api.node.PunchNode;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * An example of a custom INPUT_NODE
 * 
 * 
 * @author Punch Team
 *
 */
@PunchNode(name = "input_node_example", version = "1.0.0")
public class InputNode extends PunchInputNode {

  private static final long serialVersionUID = 1L;

  @JsonProperty()
  public String title = "a_title";

  @JsonProperty(value = "input_data")
  public List<String> inputData = new LinkedList<>();

  @Override
  public void execute(OutputDataset output) {
    SparkSession sparkSession = SparkSession.builder().getOrCreate();
    List<Row> rows = inputData.stream().map(data -> RowFactory.create(data)).collect(Collectors.toList());
    StructType schema = new StructType(new StructField[] {
        new StructField(title, DataTypes.StringType, false, Metadata.empty())
    });
    Dataset<Row> documentDataset = sparkSession.createDataFrame(rows, schema);
    output.put(documentDataset);
  }

  @Override
  public void declare(IDeclarer declarer) {
    declarer.publishMap(new TypeReference<Dataset<Row>>() {
    });
  }

}
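The mapping step inside execute is ordinary Java streams: each input string becomes a single-column Row. As an illustrative, Spark-free sketch (Object[] stands in for Spark's Row, which RowFactory.create builds from varargs), the transformation reduces to:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RowMappingSketch {

    // Mimics inputData.stream().map(RowFactory::create).collect(...):
    // each input string becomes a one-element "row".
    static List<Object[]> toRows(List<String> inputData) {
        return inputData.stream()
                .map(data -> new Object[] { data })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object[]> rows = toRows(Arrays.asList("alice", "bob"));
        System.out.println(rows.size());    // 2
        System.out.println(rows.get(0)[0]); // alice
    }
}
```

In the real node, the resulting rows are paired with a StructType schema and turned into a Dataset&lt;Row&gt; by sparkSession.createDataFrame(rows, schema).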

Processing Node

Here is a simple processing node that forwards its received input to downstream nodes. For the sake of the example it takes a single boolean configuration parameter. If set to false, the data is simply discarded.

package org.thales.punch.spark.node.starter.kit;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.spark.IDeclarer;
import com.github.punch.api.spark.InputDataset;
import com.github.punch.api.spark.OutputDataset;
import com.github.punch.api.spark.nodes.PunchProcessingNode;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

@PunchNode(name = "forwarding_node", version = "1.0.0")
public class MyForwardingNode extends PunchProcessingNode {

    private static final long serialVersionUID = 1L;

    @JsonProperty(value = "forward")
    Boolean forward = false;

    @Override
    public void execute(InputDataset input, OutputDataset output) {
        if(forward){
            input.getSingleton().ifPresent(output::put);
        }
    }

    @Override
    public void declare(IDeclarer declarer) {
        declarer.publishMap(new TypeReference<Dataset<Row>>() {
        });
    }
}

As you can see, it is quite simple as well.
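The guard-and-forward logic hinges on java.util.Optional: input.getSingleton() returns an optional dataset, and ifPresent forwards it downstream only when one exists. Stripped of the Spark types, the same pattern looks like this (illustrative names, AtomicReference standing in for the output):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class ForwardingSketch {

    // Forwards the value downstream only when the flag is set AND a
    // value is present, mirroring input.getSingleton().ifPresent(output::put).
    static void forward(boolean enabled, Optional<String> input, AtomicReference<String> output) {
        if (enabled) {
            input.ifPresent(output::set);
        }
    }

    public static void main(String[] args) {
        AtomicReference<String> out = new AtomicReference<>();
        forward(true, Optional.of("payload"), out);
        System.out.println(out.get()); // payload
        forward(false, Optional.of("dropped"), out);
        System.out.println(out.get()); // still payload: forwarding disabled
    }
}
```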

Deploying your nodes

Installing your nodes

Standalone mode

In standalone mode, put your jars in $PUNCHPLATFORM_INSTALL_DIR/extlib/spark/.

We also provide a utility for this purpose, packaged with the standalone only. Use the command below to install the generated jar:

punchpkg spark install /full/path/to/my/custom/node.jar

Deployed mode

For a deployed mode, refer to this documentation.

Using your nodes

You can now refer to your nodes in the punchline file.

For example, the following punchline uses the example input node and prints its output to the console with the built-in show node.

{
  type: punchline
  version: "6.0"
  tenant: mytenant
  channel: mychannel
  runtime: spark
  dag:
  [
    {
      package: org.thales.punch.spark.node.starter.kit
      type: input_node_example
      component: input
      settings:
      {
        title: alice
      }
      publish:
      [
        {
          stream: data
        }
      ]
    }
    {
      type: show
      component: show
      settings: {}
      publish: []
      subscribe:
      [
        {
          component: input
          stream: data
        }
      ]
    }
  ]
  settings: {
      spark.additional.jar: punch-spark-node-starter-kit-1.0-jar-with-dependencies.jar
  }
}
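To also exercise the forwarding node from the processing example above, a DAG entry such as the following (a sketch, with the component name forward assumed) could be inserted between the input and show components; the show node would then subscribe to component: forward instead of component: input.

```
{
  package: org.thales.punch.spark.node.starter.kit
  type: forwarding_node
  component: forward
  settings:
  {
    forward: true
  }
  subscribe:
  [
    {
      component: input
      stream: data
    }
  ]
  publish:
  [
    {
      stream: data
    }
  ]
}
```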

Tip

In case your package name is not one of the following:

-   org.thales.punch
-   org.punch

you should provide a package: name_of_your_package attribute in your node configuration.

{
    type: my_node
    package: org.my.package
    ...
}

You can now start your punchline as usual, using either the sparkctl command to submit it to a Spark cluster, or the punchlinectl command to execute it in the foreground using the punch local process engine.

punchlinectl $PUNCHPLATFORM_CONF_DIR/tenant/mytenant/channels/mychannel/punchline.hjson