Java Spark Custom Node¶
Abstract
In addition to the punch nodes, you may want to bring your own nodes into a spark punchline. By leveraging the spark api, you can write your own Input, Processing and Output logic.
Coding your Node¶
To write a custom Spark node, you leverage the Spark API as well as an additional punch API that makes it a lot easier to focus only on your business logic. The punch API also provides you with several goodies:
- a simple yet powerful way to provide configuration parameters to your nodes
- monitoring
Maven Project Setup¶
We provide a starter on our github repository.
Make your code part of a maven project. The pom.xml typically looks as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.thales.punch.spark.node.starter.kit</groupId>
  <artifactId>punch-spark-node-starter-kit</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>punch-spark-node-starter-kit</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>2.4.3</spark.version>
    <junit.version>4.12</junit.version>
    <punch.version>6.1.0</punch.version>
    <jackson.version>2.10.1</jackson.version>
    <log4j.version>2.12.1</log4j.version>
    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.github.punchplatform</groupId>
      <artifactId>punch-api</artifactId>
      <version>${punch.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.github.punchplatform</groupId>
      <artifactId>punch-spark-api</artifactId>
      <version>${punch.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Tip
Punch and Spark dependencies are provided by your platform. Your jar should only include your custom dependencies!
Once compiled with the maven install command, the build generates a target/punch-spark-node-starter-kit-${VERSION}-jar-with-dependencies.jar.
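For instance, with the coordinates declared in the pom above (the jar name follows your own artifactId and version):

cd punch-spark-node-starter-kit
mvn clean install
# the assembly plugin produces the self-contained jar
ls target/punch-spark-node-starter-kit-1.0-jar-with-dependencies.jar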
Input Node¶
Here is a simple input node that publishes a list of data.
package org.thales.punch.spark.node.starter.kit;

import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.spark.IDeclarer;
import com.github.punch.api.spark.datasets.OutputDataset;
import com.github.punch.api.spark.nodes.PunchInputNode;

/**
 * An example of a custom INPUT_NODE
 *
 * @author Punch Team
 */
@PunchNode(name = "input_node_example", version = "1.0.0")
public class InputNode extends PunchInputNode {

    private static final long serialVersionUID = 1L;

    @JsonProperty()
    public String title = "a_title";

    @JsonProperty(value = "input_data")
    public List<String> inputData = new LinkedList<>();

    @Override
    public void execute(OutputDataset output) {
        SparkSession sparkSession = SparkSession.builder().getOrCreate();
        List<Row> rows = inputData.stream().map(data -> RowFactory.create(data)).collect(Collectors.toList());
        StructType schema = new StructType(new StructField[] {
            new StructField(title, DataTypes.StringType, false, Metadata.empty())
        });
        Dataset<Row> documentDataset = sparkSession.createDataFrame(rows, schema);
        output.put(documentDataset);
    }

    @Override
    public void declare(IDeclarer declarer) {
        declarer.publishMap(new TypeReference<Dataset<Row>>() {
        });
    }
}
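Note how each @JsonProperty field maps to a settings key in the punchline configuration. As a sketch, the two fields above could be filled like this (the values are made up for illustration):

settings:
  title: alice
  input_data:
  - hello
  - world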
Processing Node¶
Here is a simple processing node that forwards its received input to downstream nodes. For the sake of the example, it takes a single boolean configuration parameter. If set to false, the data is simply discarded.
package org.thales.punch.spark.node.starter.kit;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.spark.IDeclarer;
import com.github.punch.api.spark.datasets.InputDataset;
import com.github.punch.api.spark.datasets.OutputDataset;
import com.github.punch.api.spark.nodes.PunchProcessingNode;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

@PunchNode(name = "forwarding_node", version = "1.0.0")
public class MyForwardingNode extends PunchProcessingNode {

    @JsonProperty(value = "forward")
    Boolean forward = false;

    @Override
    public void execute(InputDataset input, OutputDataset output) {
        if (forward) {
            input.getSingleton().ifPresent(output::put);
        }
    }

    @Override
    public void declare(IDeclarer declarer) {
        declarer.publishMap(new TypeReference<Dataset<Row>>() {
        });
    }
}
As you can see it is quite simple as well.
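To give an idea of how it would be wired, here is a hypothetical dag entry placing this node downstream of the input node shown earlier; it follows the same publish/subscribe convention as the full punchline shown later in this page:

- package: org.thales.punch.spark.node.starter.kit
  type: forwarding_node
  component: forward
  settings:
    forward: true
  publish:
  - stream: data
  subscribe:
  - component: input
    stream: data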
Making your node available to the Punchline Editor¶
punchplatform-inspect-node.sh can be used to generate json file(s) containing the meta-information of your self-made node. A usage example is as follows:
EXTNODES_SPARK=$PUNCHPLATFORM_INSTALL_DIR/extresource/spark
MYNODE_JAR=$MYWORKDIR/mynode.jar
RUNTIME=spark
PACKAGES=org.something.package

punchplatform-inspect-node.sh \
    --packages $PACKAGES \
    --runtime $RUNTIME \
    --jar $MYNODE_JAR \
    --output-dir $EXTNODES_SPARK
Checking if properly installed
On a standalone release, you can use this command:
MY_NODE=your_node_name punchplatform-scanner --runtime spark | grep $MY_NODE
Deploying your nodes¶
Installing your nodes¶
Standalone mode¶
For the standalone, put your jars in $PUNCHPLATFORM_INSTALL_DIR/extlib/spark/.
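For example, with the jar built earlier from the starter kit:

cp target/punch-spark-node-starter-kit-1.0-jar-with-dependencies.jar \
   $PUNCHPLATFORM_INSTALL_DIR/extlib/spark/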
Alternatively, we provide a utility for this purpose, packaged with the standalone only. Use the command below to install the generated jar:
punchpkg spark install /full/path/to/my/custom/node.jar
Deployed mode¶
For a deployed mode, refer to this documentation.
Using your nodes¶
You can now refer to your nodes in the punchline file. For example, the following punchline uses both nodes and prints the output to the console.
---
type: punchline
version: '6.0'
runtime: spark
dag:
- package: org.thales.punch.spark.node.starter.kit
  type: input_node_example
  component: input
  settings:
    title: alice
  publish:
  - stream: data
- type: show
  component: show
  settings: {}
  publish: []
  subscribe:
  - component: input
    stream: data
settings:
  spark.additional.jar: punch-spark-node-starter-kit-1.0-jar-with-dependencies.jar
Tip
In case your package name is not one of the default packages known to the punch, you should provide the package: name_of_your_package attribute in your node configuration:
type: my_node
package: org.my.package
...
You can now start your punchline as usual, using either the sparkctl or the punchlinectl command, to respectively submit your punchline to a spark cluster or execute it in the foreground using the punch local process engine.
punchlinectl start -p punchline.yaml
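To submit the same punchline to a spark cluster, the sparkctl invocation presumably mirrors the local one (this assumes sparkctl shares punchlinectl's argument convention; check your platform's CLI help):

sparkctl start -p punchline.yaml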