Java Spark Custom Node

Abstract

In addition to using the punch nodes, you may want to bring your own and insert them into a Spark punchline. You then benefit from the full power of the Spark API, giving you complete control over the data.

Code Your Node

To write a custom Spark node, you leverage the Spark API together with an additional punch API that lets you focus on your business logic only. The punch API also provides you with several goodies:

  • a simple yet powerful way to provide configuration parameters to your nodes (see the sketch after this list)
  • monitoring
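
For instance, declaring a configuration parameter only requires annotating a field of your node; the matching value from the punchline settings is injected for you. Here is a minimal sketch, stripped of the node boilerplate shown in the full examples below (the rate parameter is purely illustrative):

import com.fasterxml.jackson.annotation.JsonProperty;

public class MySettingsSketch {

    // A "rate: 100" entry in the node's punchline settings
    // is deserialized into this field by the punch API.
    @JsonProperty(value = "rate")
    Integer rate = 10;
}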

Maven Project Setup

Make your code part of a Maven project. A typical pom.xml looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.thales.punch</groupId>
  <artifactId>my-custom-node</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Punch User Node</name>
  <url>http://maven.apache.org</url>

  <dependencies>
     <!--CORE PUNCH API-->
      <dependency>
          <groupId>com.github.punchplatform</groupId>
          <artifactId>punch-api</artifactId>
          <version>6.1.0-SNAPSHOT</version>
          <scope>provided</scope>
      </dependency>

    <!-- NEEDED TO CREATE A NODE -->
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-annotations</artifactId>
          <version>2.9.8</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.9.8</version>
          <scope>provided</scope>
      </dependency>

    <!-- SPARK-SPECIFIC PUNCH API, PLUS SPARK DEPENDENCIES -->
     <dependency>
         <groupId>com.github.punchplatform</groupId>
         <artifactId>punch-spark-api</artifactId>
         <version>6.1.0-SNAPSHOT</version>
         <scope>provided</scope>
     </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.3</version>
        <scope>provided</scope>
    </dependency>
  </dependencies>

   <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

Tip

Note that you need not generate a fat jar: the punch and Spark dependencies are provided by your platform. The jar you generate must contain only your own code, plus any dependencies that are not already provided.

Once compiled with the mvn install command, Maven generates the jar under target/ (e.g. target/my-custom-node-1.0-SNAPSHOT.jar).

Input Node

Here is a simple input node that publishes a list of strings as a dataset.

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.spark.IDeclarer;
import com.github.punch.api.spark.OutputDataset;
import com.github.punch.api.spark.nodes.PunchInputNode;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

@PunchNode(name = "hello_node", version = "1.0.0")
public class MyHelloNode extends PunchInputNode {

    @JsonProperty()
    String title = "My Title";

    @JsonProperty(value = "input_data")
    List<String> inputData = new LinkedList<>();


    @Override
    public void execute(OutputDataset output) {
        SparkSession sparkSession = SparkSession.builder().getOrCreate();

        List<Row> rows = inputData.stream().map(data -> RowFactory.create(data))
            .collect(Collectors.toList());

        StructType schema = new StructType(new StructField[]{
                new StructField(title, DataTypes.StringType, false, Metadata.empty())
        });

        Dataset<Row> documentDataset = sparkSession.createDataFrame(rows, schema);
        output.put(documentDataset);
    }

    @Override
    public void declare(IDeclarer declarer) {
        declarer.publishMap(new TypeReference<Dataset<Row>>() {
        });
    }
}
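
Note the declare method: it advertises to the punchline runtime the type of data the node publishes, here a Dataset<Row>, so that downstream nodes can subscribe to it.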

Processing Node

Here is a simple processing node that forwards its received input to downstream nodes. For the sake of the example, it takes a single boolean configuration parameter: if set to false, the data is simply discarded.

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.spark.IDeclarer;
import com.github.punch.api.spark.InputDataset;
import com.github.punch.api.spark.OutputDataset;
import com.github.punch.api.spark.nodes.PunchProcessingNode;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

@PunchNode(name = "forwarding_node", version = "1.0.0")
public class MyForwardingNode extends PunchProcessingNode {

    @JsonProperty()
    Boolean forward = false;

    @Override
    public void execute(InputDataset input, OutputDataset output) {
        if (forward) {
            input.getSingleton().ifPresent(output::put);
        }
    }

    @Override
    public void declare(IDeclarer declarer) {
        declarer.publishMap(new TypeReference<Dataset<Row>>() {
        });
    }
}

As you can see, it is quite simple as well.
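
Building on the same pattern, here is a sketch of a node that transforms its input rather than merely forwarding it, by upper-casing a configurable column with the Spark SQL functions. The uppercase_node name and the column parameter are illustrative, and we assume, as in the forwarding node above, that getSingleton() yields the subscribed dataset:

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.spark.IDeclarer;
import com.github.punch.api.spark.InputDataset;
import com.github.punch.api.spark.OutputDataset;
import com.github.punch.api.spark.nodes.PunchProcessingNode;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.upper;

@PunchNode(name = "uppercase_node", version = "1.0.0")
public class MyUppercaseNode extends PunchProcessingNode {

    // Name of the column to transform, set from the punchline settings.
    @JsonProperty(value = "column")
    String column = "data";

    @Override
    public void execute(InputDataset input, OutputDataset output) {
        input.getSingleton().ifPresent(data -> {
            // Replace the configured column with its upper-cased value.
            Dataset<Row> dataset = (Dataset<Row>) data;
            output.put(dataset.withColumn(column, upper(col(column))));
        });
    }

    @Override
    public void declare(IDeclarer declarer) {
        declarer.publishMap(new TypeReference<Dataset<Row>>() {});
    }
}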

Deploy your nodes

Installing your nodes

For the standalone, put your jars in $PUNCHPLATFORM_SPARK_INSTALL_DIR/punchplatform/analytics/job/custom/.

We also provide a utility for this purpose, packaged with the standalone only. Use the command below to install the generated jar:

punchpkg spark install-dependencies /full/path/to/my/custom/node.jar

Using your nodes

You can now refer to your nodes in the punchline file.

For example, the following punchline uses both nodes and prints the output to the console.

{
  type: punchline
  version: 6.0
  tenant: mytenant
  channel: mychannel
  runtime: spark
  dag: [    
    {
      component: hello
      type: hello_node
      package: my.custom.package
      settings: {
        title: words
        input_data: [hello, world]
      },
      publish: [
        {
            stream: data
        }
      ]
    },
    {
      component: forwarding
      type: forwarding_node
      package: my.custom.package
      settings: {
        forward: true
      },
      subscribe: [
          {
            component: hello
            stream: data
          }
      ],
      publish: [
          {
            stream: data
          }
      ]
    },
    {
        component: print
        type: show
        settings: {}
        subscribe: [
           {
            component: forwarding
            stream: data
          }
        ],
        publish: []
    }
  ]
}
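
Each node subscribes to the stream published by its upstream component: the forwarding node consumes the data stream published by hello, and the show node prints whatever it receives to the console.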

Tip

In case your package name is not one of the following:

  • org.thales.punch
  • org.punch

you should provide a package: name_of_your_package attribute in your node configuration:

{
    type: my_node
    package: org.my.package
    ...
}

You can now start your punchline as usual, using either the sparkctl or the punchlinectl command, to respectively submit your punchline to a Spark cluster or execute it in the foreground using the punch local process engine.

punchlinectl $PUNCHPLATFORM_CONF_DIR/tenant/mytenant/channels/mychannel/punchline.hjson