
Java Storm Custom Node

Abstract

In addition to using the standard punch nodes, you may want to write your own nodes and insert them into a Storm punchline. You then benefit from the power of the Storm API, which gives you full control over data forwarding, acknowledgement, enrichment, etc.

Coding your Node

To write a custom Storm node, you use the Storm API together with an additional punch API that lets you focus on your business logic only. The punch API also provides several goodies:

  • a simple yet powerful way to provide configuration parameters to your nodes
  • load control capabilities
  • rate limiting capabilities
  • monitoring

Maven Project Setup

We provide a starter kit on our GitHub repository.

Make your code part of a Maven project. A typical pom.xml looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.thales.punch</groupId>
    <artifactId>punch-storm-node-starter-kit</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>Punch User Node</name>
    <url>http://maven.apache.org</url>
    <properties>
        <storm.version>2.1.0</storm.version>
        <junit.version>4.12</junit.version>
        <punch.version>6.1.0</punch.version>
        <jackson.version>2.10.1</jackson.version>
        <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.github.punchplatform</groupId>
            <artifactId>punch-api</artifactId>
            <version>${punch.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.github.punchplatform</groupId>
            <artifactId>punch-storm-api</artifactId>
            <version>${punch.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-client</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Tip

Your jar should include only your own dependencies. Punch and Storm dependencies are provided by your platform, which is why they are declared with the provided scope.

Once compiled with the mvn install command, the project generates target/punch-storm-node-starter-kit-${VERSION}-jar-with-dependencies.jar.
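Since Punch and Storm classes are provided by the platform, it can be worth checking that none of them ended up in your assembled jar. The following is a minimal sketch of such a check using the JDK JarFile API. ProvidedClassesCheck is a hypothetical helper, not a punch tool, and the list of provided package prefixes is an assumption derived from the provided-scope dependencies in the pom.xml and from the imports used in the examples below.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/**
 * Hypothetical helper (not part of the punch API): lists the entries of a
 * built jar that belong to packages the platform already provides.
 */
final class ProvidedClassesCheck {

  // Assumed platform-provided package prefixes; adjust to your platform.
  static final String[] PROVIDED_PREFIXES = {
      "org/apache/storm/",
      "com/github/punch/",
      "com/fasterxml/jackson/"
  };

  /** Returns the jar entries that should not have been bundled. */
  static List<String> findProvidedEntries(String jarPath) throws IOException {
    List<String> offending = new ArrayList<>();
    try (JarFile jar = new JarFile(jarPath)) {
      Enumeration<JarEntry> entries = jar.entries();
      while (entries.hasMoreElements()) {
        String name = entries.nextElement().getName();
        for (String prefix : PROVIDED_PREFIXES) {
          if (name.startsWith(prefix)) {
            offending.add(name);
            break;
          }
        }
      }
    }
    return offending;
  }

  public static void main(String[] args) throws IOException {
    // args[0]: path to the assembled jar
    List<String> offending = findProvidedEntries(args[0]);
    if (offending.isEmpty()) {
      System.out.println("OK: no platform-provided classes bundled");
    } else {
      offending.forEach(e -> System.out.println("bundled but provided: " + e));
    }
  }
}
```

Run it with the path of the generated jar as its only argument. With every Punch, Storm and Jackson dependency declared as provided, the jar-with-dependencies assembly should not contain any of these entries.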

Input Node

Here is a simple input node that publishes a "Hello {username}" string at a high rate.

package org.thales.punch.storm.node.starter.kit;

import java.util.Arrays;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.storm.PunchInputNode;

/**
 * An example of a custom storm Input Node
 * 
 * @author Punch Team
 *
 */
@PunchNode(name = "input_node", version = "1.0.0")
public class InputNode extends PunchInputNode {

  private static final long serialVersionUID = 1L;

  /**
   * Name to greet, read from the user_name node setting.
   */
  @JsonProperty(value = "user_name", required = true)
  public String username;

  // Greeting built once in onOpen(), then published by nextTuple().
  String message;

  @Override
  public void onOpen() {
    super.onOpen();
    message = "Hello " + username;
    getLogger().info("open node with message={}", message);
  }

  @Override
  public void nextTuple() {
    super.nextTuple();
    // Emit the greeting once on every stream declared in the node's publish section.
    getPublishStreams().forEach(stream ->
        getCollector().emit(stream.getStreamId(), Arrays.asList(message)));
  }

  @Override
  public void onClose() {
      super.onClose();
      // do something before the node exit
  }

}
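The fan-out performed by nextTuple() can be unit tested without a Storm cluster if it is kept separate from the collector. The following is a minimal sketch under that assumption: InputFanOut and its Emitter interface are hypothetical names standing in for the node and for Storm's collector; they are not part of the punch or Storm APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch: the logic of InputNode.onOpen() and nextTuple(),
 * decoupled from Storm so it can be unit tested.
 */
final class InputFanOut {

  /** Minimal stand-in for the collector's emit(streamId, values) call. */
  interface Emitter {
    void emit(String streamId, List<Object> values);
  }

  /** Builds the greeting the same way InputNode.onOpen() does. */
  static String buildMessage(String username) {
    return "Hello " + username;
  }

  /** Emits the message once on every published stream, like nextTuple(). */
  static void emitToAllStreams(List<String> streamIds, String message, Emitter emitter) {
    for (String streamId : streamIds) {
      emitter.emit(streamId, Arrays.<Object>asList(message));
    }
  }

  public static void main(String[] args) {
    List<String> emitted = new ArrayList<>();
    emitToAllStreams(Arrays.asList("logs"), buildMessage("alice"),
        (streamId, values) -> emitted.add(streamId + " -> " + values));
    System.out.println(emitted); // [logs -> [Hello alice]]
  }
}
```

Keeping the business logic in plain methods like these leaves the node class itself as a thin adapter over the punch and Storm APIs.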

Processing Node

Here is a simple processing node that forwards its input to downstream nodes. For the sake of the example, it takes a single boolean configuration parameter, can_process (false by default). If it is set to false, the data is simply discarded.

package org.thales.punch.storm.node.starter.kit;

import java.util.Collections;
import org.apache.storm.tuple.Tuple;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.node.StreamDeclaration;
import com.github.punch.api.storm.PunchProcessingNode;

/**
 * 
 * An example of Storm processing node
 * 
 * @author Punch Team
 *
 */
@PunchNode(name = "processing_node", version = "1.0.0")
public class ProcessingNode extends PunchProcessingNode {

  private static final long serialVersionUID = 1L;

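  /**
   * Enable or disable forwarding.
   * Default: false (incoming tuples are acknowledged and discarded).
   */
  @JsonProperty(value = "can_process")
  public Boolean canProcess = false;

  @Override
  public void process(Tuple input) {
    if (Boolean.TRUE.equals(canProcess)) {
      // Forward the input values on every published stream, anchored to the
      // input tuple so that downstream failures propagate back to the spout.
      for (StreamDeclaration stream : getPublishStreams()) {
        getCollector().emit(stream.getStreamId(), Collections.singleton(input), input.getValues());
      }
    }
    // Always acknowledge the input, whether it was forwarded or discarded.
    getCollector().ack(input);
  }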

}

As you can see, it is quite simple as well.
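One point worth noting in process() is that the input is acknowledged whether it is forwarded or discarded. The sketch below isolates that decision in plain Java so it can be unit tested. ForwardOrDiscard and Outcome are hypothetical names, and stream ids are plain strings rather than punch StreamDeclaration objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of ProcessingNode.process() without Storm. */
final class ForwardOrDiscard {

  /** What the node did with one input tuple (hypothetical type, for testing). */
  static final class Outcome {
    final List<String> emittedOn;
    final boolean acked;

    Outcome(List<String> emittedOn, boolean acked) {
      this.emittedOn = emittedOn;
      this.acked = acked;
    }
  }

  /** Forwards on every published stream if enabled, and always acks. */
  static Outcome process(Boolean canProcess, List<String> publishStreams) {
    List<String> emittedOn = new ArrayList<>();
    if (Boolean.TRUE.equals(canProcess)) {
      // In the real node, each emit is anchored to the input tuple.
      emittedOn.addAll(publishStreams);
    }
    // Ack even when discarding: an unacked input would eventually time out,
    // and a spout that tracks message ids could then replay it.
    return new Outcome(Collections.unmodifiableList(emittedOn), true);
  }

  public static void main(String[] args) {
    List<String> streams = Arrays.asList("logs");
    Outcome on = process(true, streams);
    Outcome off = process(false, streams);
    System.out.println(on.emittedOn + " acked=" + on.acked);   // [logs] acked=true
    System.out.println(off.emittedOn + " acked=" + off.acked); // [] acked=true
  }
}
```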

Deploying your nodes

Installing your nodes

Standalone mode

In standalone mode, put your jars in $PUNCHPLATFORM_INSTALL_DIR/extlib/storm/.

Deployed mode

In deployed mode, refer to this documentation.

Using your nodes

You can now refer to your nodes in a punchline file, using the corresponding node type:

  • third_party_input for input nodes (Storm spouts)
  • third_party_node for processing nodes (Storm bolts)

Two new punchline properties are important:

  • additional_jars: defined at the punchline level, it lists one or several jars, namely the jars generated by your project.

  • class: defined at the level of each custom node, it must give the fully qualified name of your node class.
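A frequent mistake is a class value that does not match any class packaged in your jar. The following sketch checks that the fully qualified name has a corresponding .class entry in a given jar. NodeClassInJar is a hypothetical helper, not a punch tool; it only checks that the entry exists, not that the class is a valid punch node.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical helper: does a jar contain the class named in a node's class setting? */
final class NodeClassInJar {

  /** Converts a fully qualified class name to its jar entry path. */
  static String toEntryName(String fqcn) {
    // Dots separate packages; nested classes already use '$' in their binary name.
    return fqcn.replace('.', '/') + ".class";
  }

  /** Returns true if the jar contains a .class entry for the given class name. */
  static boolean containsClass(String jarPath, String fqcn) throws IOException {
    try (JarFile jar = new JarFile(jarPath)) {
      return jar.getJarEntry(toEntryName(fqcn)) != null;
    }
  }

  public static void main(String[] args) throws IOException {
    // args[0]: jar path, args[1]: value of the node's class setting
    boolean found = containsClass(args[0], args[1]);
    System.out.println(args[1] + (found ? " found in " : " NOT found in ") + args[0]);
  }
}
```

The check deliberately inspects jar entries instead of loading the class, because loading InputNode outside the platform would fail: its superclass PunchInputNode is provided by the platform, not packaged in your jar.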

For example, the following punchline uses both nodes and prints their output to the console.

{
  tenant: mytenant
  channel: mychannel
  version: "6.0"
  runtime: storm
  dag:
  [
    {
      type: third_party_input
      component: input_node
      class: org.thales.punch.storm.node.starter.kit.InputNode
      settings:
      {
        user_name: alice
        load_control: rate
        load_control.rate: 1
      }
      publish:
      [
        {
          stream: logs
          fields:
          [
            log
          ]
        }
      ]
    }
    {
      type: third_party_node
      component: processing_node
      class: org.thales.punch.storm.node.starter.kit.ProcessingNode
      settings:
      {
        can_process: true
      }
      subscribe:
      [
        {
          component: input_node
          stream: logs
        }
      ]
      publish:
      [
        {
          stream: logs
          fields:
          [
            log
          ]
        }
      ]
    }
    {
      type: punchlet_node
      component: show
      settings: {
        punchlet_code: "{ print(root); }"
      }
      subscribe:
      [
        {
          component: processing_node
          stream: logs
        }
      ]
      publish: []
    }
  ]
  settings:
  {
    additional_jars:
    [
      punch-storm-node-starter-kit-1.0-SNAPSHOT-jar-with-dependencies.jar
    ]
  }
}

Tip

Note the use of the load_control punch parameters: they make your input node publish one item per second.
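This page does not describe how punch implements load_control. As a conceptual illustration only, the following sketch shows one way to enforce a rate of one item per second: a fixed-interval gate with an injectable nanosecond clock. FixedRateGate is a hypothetical name, and punch does not necessarily implement load control this way.

```java
import java.util.function.LongSupplier;

/** Conceptual sketch: allow at most ratePerSecond items per second. */
final class FixedRateGate {
  private final long intervalNanos;
  private final LongSupplier clockNanos;
  private long nextAllowedNanos;

  FixedRateGate(double ratePerSecond, LongSupplier clockNanos) {
    if (!(ratePerSecond > 0)) {
      throw new IllegalArgumentException("rate must be > 0");
    }
    this.intervalNanos = (long) (1_000_000_000L / ratePerSecond);
    this.clockNanos = clockNanos;
    this.nextAllowedNanos = clockNanos.getAsLong();
  }

  /** Returns true if one more item may be published now, false otherwise. */
  boolean tryAcquire() {
    long now = clockNanos.getAsLong();
    // Overflow-safe comparison, as recommended for System.nanoTime() values.
    if (now - nextAllowedNanos >= 0) {
      nextAllowedNanos = now + intervalNanos;
      return true;
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    FixedRateGate gate = new FixedRateGate(1.0, System::nanoTime);
    int published = 0;
    long end = System.nanoTime() + 3_000_000_000L;
    while (System.nanoTime() - end < 0) {
      if (gate.tryAcquire()) {
        published++;
      }
      Thread.sleep(10);
    }
    System.out.println("published " + published + " items in about 3 seconds");
  }
}
```

In a hand-written spout, such a gate would be checked at the start of nextTuple(), returning without emitting when tryAcquire() is false. With punch you do not need to write this yourself: setting load_control in the node settings is enough, as the punchline above shows.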

You can now start your punchline as usual, using either channelctl to submit it to a Storm cluster, or punchlinectl to execute it in the foreground with the punch single-process lightweight engine:

punchlinectl $PUNCHPLATFORM_CONF_DIR/tenant/mytenant/channels/mychannel/punchline.hjson