
Java Storm Custom Node

Abstract

In addition to using punch nodes, you may want to bring your own and insert them into a storm punchline. You benefit from the power of the Storm API, giving you full control over data forwarding, acknowledgement, enrichment, etc.

Coding your Node

To write a custom Storm node, you leverage the Storm API as well as an additional punch API that makes it a lot easier to focus only on your business logic. The punch API also provides you with several goodies:

  • a simple yet powerful way to provide configuration parameters to your nodes
  • load control capabilities
  • rate limiting capabilities
  • monitoring

Maven Project Setup

We provide a starter on our github repository.

Make your code part of a Maven project. A typical pom.xml looks as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.thales.punch</groupId>
    <artifactId>punch-storm-node-starter-kit</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>Punch User Node</name>
    <url>http://maven.apache.org</url>
    <properties>
        <storm.version>2.3.0</storm.version>
        <junit.version>4.12</junit.version>
        <punch.version>6.4.5-SNAPSHOT</punch.version>
        <jackson.version>2.10.1</jackson.version>
        <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.github.punchplatform</groupId>
            <artifactId>punch-api</artifactId>
            <version>${punch.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.github.punchplatform</groupId>
            <artifactId>punch-storm-api</artifactId>
            <version>${punch.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-client</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Tip

Your jar should include only your own dependencies. Punch and Storm dependencies are declared with the provided scope because they are supplied by your platform.

Once compiled with the maven install command, the build generates a target/punch-storm-node-starter-kit-${VERSION}-jar-with-dependencies.jar.

Input Node

Here is a simple input node that publishes a "hello {username}" string at a high rate.

package org.thales.punch.storm.node.starter.kit;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.storm.nodes.PunchInputNode;

/**
 * An example of a custom storm Input Node
 * 
 * @author Punch Team
 *
 */
@PunchNode(name = "input_node", version = "1.0.0")
public class InputNode extends PunchInputNode {

  private static final long serialVersionUID = 1L;

  /**
  Specific name to display.
  */
  @JsonProperty(value = "user_name", required = true)
  public String username;

  String message;

  @Override
  public void onOpen() {
      super.onOpen();
      message = "Hello "+username;
      getLogger().info("open node with message={}", message);  
  }

  @Override
  public void nextTuple() {
      super.nextTuple();
      // emit only on user defined streams
      emitDataTuple(message);
  }

  @Override
  public void onClose() {
      super.onClose();
      // do something before the node exit
  }

}

Processing Node

Here is a simple processing node that forwards its received input to downstream nodes. For the sake of the example it takes a single boolean configuration parameter. If set to false, the data is simply discarded.

package org.thales.punch.storm.node.starter.kit;

import org.apache.storm.tuple.Tuple;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.storm.nodes.PunchProcessingNode;

/**
 * 
 * An example of Storm processing node
 * 
 * @author Punch Team
 *
 */
@PunchNode(name = "processing_node", version = "1.0.0")
public class ProcessingNode extends PunchProcessingNode {

  private static final long serialVersionUID = 1L;

  /**
    Enable or disable processing.
    default: false
   */
  @JsonProperty(value = "can_process")
  public Boolean canProcess = false;


  @Override
  public void prepare() {
      super.prepare();
      getLogger().info("message=\"Processing Node ready. Can process data tuples ? {}\"", canProcess);
  }

  @Override
  public void process(Tuple input) {
    // Metric Tuple
    if(isMetricTuple(input)){
        enrichAndForwardMetricTuple(input); // enrich and forward on _ppf_metrics.
        return;
    }

    // Error Tuple
    if(isErrorTuple(input)){
        forwardErrorTuple(input); // processing. Here there is no processing. Incoming tuple is forwarded on _ppf_errors.
        return;
    }

    // Data Tuple
    if(isDataTuple(input)){
        if(canProcess.booleanValue()){
            forwardDataTuple(input); // processing. Here there is no processing. Incoming tuple is forwarded on user defined streams.
        }
    }
  }

}

Notice that you can handle Metric and Error Tuples in addition to your Data Tuples. In this example:

  • if the input Tuple is a Metric Tuple (i.e. coming from the reserved stream _ppf_metrics), the node adds its latency record to the input Tuple and forwards it on _ppf_metrics.
  • if the input Tuple is an Error Tuple (i.e. coming from the reserved stream _ppf_errors), the node emits the output Tuple on _ppf_errors.
  • if the input Tuple is a Data Tuple (i.e. coming from one of your subscribed streams), the node emits the output Tuple on all of your published streams.

You can implement your own methods to process input Tuples and forward output Tuples.
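
For instance, here is a minimal sketch of such a helper inside the ProcessingNode above. It only reuses the forwarding and logging calls already shown; the validation logic itself is purely illustrative:

  // Illustrative helper: apply your business logic to a data Tuple
  // before forwarding it on the user defined streams.
  private void handleDataTuple(Tuple input) {
      if (!canProcess.booleanValue()) {
          // processing is disabled: simply discard the tuple
          getLogger().info("message=\"processing disabled, discarding tuple\"");
          return;
      }
      // enrich, filter or transform the tuple here, then forward it downstream
      forwardDataTuple(input);
  }

The data Tuple branch of process(Tuple input) can then simply call handleDataTuple(input).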

Making your node available to Kibana Punchline Editor

Syntax

A valid json or hjson file must exist for each custom node under:

$PUNCHPLATFORM_INSTALL_DIR/extresource/storm

For instance, if we were to define a custom node called syslog_bolt, the json file would look as follows:

{
  "name": "syslog_bolt",
  "types": [
    "Object",
    "INode"
  ],
  "properties": {
    "summary": {
      "default": "",
      "type": "String",
      "required": false,
      "context_type": null,
      "advanced": false
    },
    "description": {
      "default": "",
      "type": "String",
      "required": false,
      "context_type": "markdown",
      "advanced": false
    },
    "verbose": {
      "default": false,
      "type": "Boolean",
      "required": false,
      "context_type": null,
      "advanced": false
    },
    "settings": {
      "acked": {
        "default": false,
        "type": "Boolean",
        "required": true,
        "context_type": null,
        "advanced": false
      },
      "destination": {
        "default": null,
        "type": "List<Object>",
        "required": true,
        "context_type": null,
        "advanced": false
      }
    },
    "publish": [],
    "subscribe": [
      {
        "type": "Singleton<Dataset<Row>>",
        "alias": null
      }
    ]
  },
  "group": "OutputNodeStream",
  "executors": [
    "storm"
  ],
  "deprecated": false
}

Making your node available to Punchline Editor

punchplatform-inspect-node.sh can be used to generate json files containing the meta-information of your custom node.

A usage example is as follows:

EXTNODES_STORM=$PUNCHPLATFORM_INSTALL_DIR/extresource/storm
MYNODE_JAR=$MYWORKDIR/mynode.jar
RUNTIME=storm
PACKAGES=org.something.package
punchplatform-inspect-node.sh \
    --packages $PACKAGES \
    --runtime $RUNTIME \
    --jar $MYNODE_JAR \
    --output-dir $EXTNODES_STORM

Checking if properly installed

On a standalone release, you can use this command:

MY_NODE=your_node_name RUNTIME=storm punchplatform-scanner --runtime storm | grep $MY_NODE

Deploying your nodes

Installing your nodes

Standalone mode

For the standalone, put your jars in $PUNCHPLATFORM_INSTALL_DIR/extlib/storm/.

Deployed mode

For a deployed mode, refer to this documentation

Using your nodes

You can now refer to your nodes in the punchline file, using the corresponding node type:

  • third_party_input node type for input nodes (Storm spouts)
  • third_party_node node type for processing nodes (Storm bolts).

The two important new punchline properties are:

  • additional_jars: defined at the punchline level, it lists one or several jars. These are the jars generated by your project.

  • class: defined at the level of your custom nodes. It must refer to the main class in charge of creating your node.

For example, the following punchline uses both nodes and prints its output to the console. The Kafka reporter collects latency records from the reserved stream _ppf_metrics.

---
version: '6.0'
runtime: storm
dag:
- type: third_party_input
  component: input_node
  package: org.thales.punch.storm.node.starter.kit.InputNode
  settings:
    user_name: alice
    load_control: rate
    load_control.rate: 1
    self_monitoring.activation: true
    self_monitoring.period: 10
  publish:
  - stream: logs
    fields:
    - log
  - stream: _ppf_metrics
    fields:
    - _ppf_latency
- type: third_party_node
  component: processing_node
  package: org.thales.punch.storm.node.starter.kit.ProcessingNode
  settings:
    can_process: true
  subscribe:
  - component: input_node
    stream: logs
  - component: input_node
    stream: _ppf_metrics
  publish:
  - stream: logs
    fields:
    - log
  - stream: _ppf_metrics
    fields:
    - _ppf_latency
- type: punchlet_node
  component: show
  settings:
    punchlet_code: "{ print(root); }"
  subscribe:
  - component: processing_node
    stream: logs
  - component: processing_node
    stream: _ppf_metrics
  publish: []
metrics:
  reporters:
  - type: kafka
settings:
  additional_jars:
  - punch-storm-node-starter-kit-1.0-SNAPSHOT-jar-with-dependencies.jar

Tip

Note the use of the load_control punch parameters: they make your input node publish one item per second.

You can now start your punchline as usual, using either the channelctl command to submit your punchline to a Storm cluster, or the punchlinectl command to execute it in the foreground using the punch single-process lightweight engine.

punchlinectl $PUNCHPLATFORM_CONF_DIR/tenant/mytenant/channels/mychannel/punchline.hjson

Testing your nodes

There are multiple kinds of tests that you should write for your custom nodes. The API comes with a testing framework that provides several helpers adapted to each type of test. You only have to extend the test base classes from com.github.punch.api.storm.test to benefit from those tools.

  • Unit tests : extend PunchInputUnitTest or PunchProcessingUnitTest.
  • Component Integration tests : extend PunchIntegrationTest.
  • System Integration tests : there are no helpers in the API, as the best way is to run a custom topology in a Standalone or a Punchbox.

Unit Tests with PunchUnitTest API

Unit tests check that each method from your node works as expected.

You should cover most of your node logic with unit tests. The helper methods you need are inherited from PunchInputUnitTest or PunchProcessingUnitTest. They help you build your nodes and provide them as a Stream of Arguments for parameterized tests. See JUnit Parameterized Tests.

Build your node. The PunchUnitTest API provides a PunchNodeBuilder. Use it to configure your node as you would in a topology, and add some builder methods if you need to mock components in your node. It returns a Mockito spy version of your node, which gives you the opportunity to watch interactions.
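
For example, here is a minimal sketch that builds the ProcessingNode from the starter kit with a valid configuration. It only relies on the builder calls used in the examples below; the surrounding test class is assumed to extend PunchProcessingUnitTest:

    // Configure the node exactly as you would in a punchline, then build it.
    ProcessingNode node = (ProcessingNode) new PunchNodeBuilder(new ProcessingNode())
            .setNodeParameter("can_process", true)
            .build(); // build() returns a Mockito spy of the configured node

    assertTrue(node.canProcess);

From there you can call the node lifecycle methods (prepare(), process(...)) and verify the interactions on the spy.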

Generate some Tuples. The PunchUnitTest API provides a TupleMocker. You can create your own TestTuples from this class. TestTuples implement the Storm Tuple interface but are easier to generate and manipulate in your tests. TupleMocker also provides Punch-specific tuples, such as Error, Metric and Tick Tuples.

Test different configurations. The PunchUnitTest API has a getNodesAsArguments(List nodes) helper to provide a list of nodes as a Stream of Arguments. This makes it possible to use JUnit 5 parameterized tests to test different configurations.

Here is an example of how to use parameterized tests for your nodes:

    // @MethodSource factory: returns the misconfigured nodes as a Stream of Arguments
    static Stream<Arguments> getInvalidNodes() {
        List<SmtpInput> nodes = new ArrayList<>();

        nodes.add((SmtpInput) new PunchNodeBuilder(new SmtpInput())
                .setNodeParameter("port", 28975928) // invalid port
                .setNodeParameter("host", "127.0.0.1")
                .build());

        nodes.add((SmtpInput) new PunchNodeBuilder(new SmtpInput())
                .setNodeParameter("port", 2525)
                .setNodeParameter("hoster", "server87") // invalid parameter
                .build());

        return getNodesAsArguments(nodes);
    }

    @ParameterizedTest
    @MethodSource("getInvalidNodes")
    void testInvalidParam(SmtpInput invalidParamNode) {
        assertThrows(Exception.class, invalidParamNode::onOpen);
    }

Component Integration Tests with PunchIntegrationTest API

Component Integration tests check that your node is working in a distributed realtime engine.

The PunchIntegrationTest API provides you with tools to launch your node in a Storm local cluster. You have to build your node, include it in a Storm topology, and submit it to Storm. The API uses a dockerised Redis to capture the Tuples emitted in Storm. You can then retrieve those Tuples and test them with methods provided in the API.

Requirements. The PunchIntegrationTest API leverages TestContainers. You need to install Docker in your development environment to be able to use this library.

Building your node. The PunchIntegrationTest API provides a PunchNodeBuilder. Use it to configure your node as you would in a topology.

Build a Topology. You can build a regular Storm topology. There are some Input and Processing Nodes available in the API. Use them to publish streams to your node or to subscribe to the streams it emits.

Run topology. Use runInStormLocalCluster(TopologyBuilder tb, int nbTuples) to run your test topology in Storm. As a topology by design never ends, you have to specify the number of Tuples that should be acked or failed before the topology is killed: this is the purpose of the nbTuples parameter.

You can also register callbacks for when the topology is ready and when it is over with runInStormLocalCluster(TopologyBuilder topologyBuilder, int nbTuples, OnTopologyReadyCallback readyCallback, OnTopologyOverCallback overCallback).

Retrieve Tuples. There are methods to retrieve the Tuples emitted in the topology.

  • getInputTuples(String nodeName, String streamId) : get Tuples incoming into the node through this streamId.
  • getOutputTuples(String nodeName, String streamId) : get Tuples outgoing from the node through this streamId.
  • getAckedTuples(String nodeName, String streamId) : get Tuples incoming into the node through this streamId that were acked by the node.
  • getFailedTuples(String nodeName, String streamId) : get Tuples incoming into the node through this streamId that were failed by the node.

Each method returns a List of Maps. Each Map represents a Tuple, with the Tuple fields as map keys and the corresponding values as map values.

Here is an example of how to use PunchIntegrationTest with a PunchInputNode:

    @Test
    void testSmtpInput() {
        // Build Storm topology
        TopologyBuilder tb = new TopologyBuilder();
        tb.setSpout("smtp-input", new InputNodeCaptor(smtpInput)); // use InputNodeCaptor to capture Tuple in Redis.
        tb.setBolt("acker", new ProcessingNodeCaptor(failFirstTupleNode)) // FailFirstTuple is one of the nodes provided in the API.
                .shuffleGrouping("smtp-input", "emails");

        // Run in a Storm local cluster. The callback makes it possible to send emails only once the topology is ready.
        runInStormLocalCluster(tb, mails.size(), () -> mails.forEach(mail -> {
            try {
                mail.send();
            } catch (UnsupportedEncodingException | MessagingException e) {
                log.error("message=\"Failed to send email.\" exception=\"{}\"", e.getMessage());
            }
        }), null);


        // Testing tuple generation
        assertEquals(3, getNbOutputTuples("smtp-input", "emails"));
        assertEquals(1, getNbFailedTuples("smtp-input", "emails"));
        assertEquals(2, getNbAckedTuples("smtp-input", "emails"));

        // Testing tuple content
        assertTrue(getOutputTuples("smtp-input", "emails")
                .get(0).get("email").contains("Mail1 SMTP Email Test"));
    }

System Integration with Standalone and Punchbox

System Integration tests check that the node works in a production-like environment.

The best way to make such tests is by running a topology of your own in a Standalone or a Punchbox. These tests take longer to run, which is why we recommend using the other test types during your node development.

Provided libraries

The following libraries are provided by punch-api and punch-storm-api artifacts. You must not override their versions in your pom.xml.

asm : 5.0.3
classmate : 1.3.4
hibernate-validator : 6.1.6.Final
jackson-annotations : 2.10.1
jackson-core : 2.10.1
jackson-databind : 2.10.1
jakarta.activation : 1.2.1
jakarta.activation-api : 1.2.1
jakarta.validation-api : 2.0.2
jakarta.xml.bind-api : 2.3.2
javax.annotation-api : 1.3.2
jboss-logging : 3.3.2.Final
kryo : 3.0.3
log4j-api : 2.17.1
log4j-core : 2.17.1
log4j-over-slf4j : 1.7.26
log4j-slf4j-impl : 2.11.2
metrics-core : 4.1.0
metrics-graphite : 3.2.6
metrics-jvm : 4.1.0
minlog : 1.3.0
objenesis : 2.1
punch-api : 6.4.5-SNAPSHOT
reflectasm : 1.10.1
slf4j-api : 1.7.26
storm-client : 2.3.0
storm-shaded-deps : 2.3.0