Java Storm Custom Node¶
Abstract
In addition to using punch nodes, you may want to bring your own and insert them into a Storm punchline. You then benefit from the full power of the Storm API, giving you complete control over data forwarding, acknowledgement, enrichment, etc.
Coding your Node¶
To write a custom storm node, you leverage the storm API as well as an additional punch API that makes it a lot easier to focus only on your business logic. The punch API also provides you with several goodies:
- a simple yet powerful way to provide configuration parameters to your nodes
- load control capabilities
- rate limiting capabilities
- monitoring
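To illustrate the first point, settings binding essentially maps a raw settings object onto typed fields with defaults. Below is a minimal, punch-independent sketch of that idea; the NodeSettings class and its get method are hypothetical names for illustration only, not part of the punch API (which does this binding through Jackson annotations, as shown further down).

```java
import java.util.Map;

// Hypothetical sketch: bind a raw settings map to typed values with defaults,
// similar in spirit to what the punch API does with @JsonProperty fields.
class NodeSettings {

    private final Map<String, ?> raw;

    NodeSettings(Map<String, ?> raw) {
        this.raw = raw;
    }

    // Return the configured value for key, or defaultValue if the key is absent.
    @SuppressWarnings("unchecked")
    <T> T get(String key, T defaultValue) {
        Object value = raw.get(key);
        return value == null ? defaultValue : (T) value;
    }
}
```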
Maven Project Setup¶
We provide a starter on our github repository.
Make your code part of a maven project. A typical pom.xml looks as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.thales.punch</groupId>
  <artifactId>punch-storm-node-starter-kit</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Punch User Node</name>
  <url>http://maven.apache.org</url>
  <properties>
    <storm.version>2.3.0</storm.version>
    <junit.version>4.12</junit.version>
    <punch.version>6.4.5</punch.version>
    <jackson.version>2.10.1</jackson.version>
    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.github.punchplatform</groupId>
      <artifactId>punch-api</artifactId>
      <version>${punch.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.github.punchplatform</groupId>
      <artifactId>punch-storm-api</artifactId>
      <version>${punch.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-client</artifactId>
      <version>${storm.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Tip
Your jar should include only your own dependencies; punch and Storm dependencies are provided by your platform.
Once compiled with the maven install command, it generates target/punch-storm-node-starter-kit-${VERSION}-jar-with-dependencies.jar.
Input Node¶
Here is a simple input node that publishes a hello {username} string at a high rate.
package org.thales.punch.storm.node.starter.kit;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.storm.nodes.PunchInputNode;

/**
 * An example of a custom storm Input Node
 *
 * @author Punch Team
 *
 */
@PunchNode(name = "input_node", version = "1.0.0")
public class InputNode extends PunchInputNode {

    private static final long serialVersionUID = 1L;

    /**
     * Specific name to display.
     */
    @JsonProperty(value = "user_name", required = true)
    public String username;

    String message;

    @Override
    public void onOpen() {
        super.onOpen();
        message = "Hello " + username;
        getLogger().info("open node with message={}", message);
    }

    @Override
    public void nextTuple() {
        super.nextTuple();
        // emit only on user defined streams
        emitDataTuple(message);
    }

    @Override
    public void onClose() {
        super.onClose();
        // do something before the node exits
    }
}
Processing Node¶
Here is a simple processing node that forwards its received input to downstream nodes.
For the sake of the example it takes a single boolean configuration parameter. If set to false, the data is simply discarded.
package org.thales.punch.storm.node.starter.kit;

import org.apache.storm.tuple.Tuple;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.punch.api.node.PunchNode;
import com.github.punch.api.storm.nodes.PunchProcessingNode;

/**
 *
 * An example of Storm processing node
 *
 * @author Punch Team
 *
 */
@PunchNode(name = "processing_node", version = "1.0.0")
public class ProcessingNode extends PunchProcessingNode {

    private static final long serialVersionUID = 1L;

    /**
     * Enable or disable processing.
     * default: false
     */
    @JsonProperty(value = "can_process")
    public Boolean canProcess = false;

    @Override
    public void prepare() {
        super.prepare();
        getLogger().info("message=\"Processing Node ready. Can process data tuples ? {}\"", canProcess);
    }

    @Override
    public void process(Tuple input) {
        // Metric Tuple
        if (isMetricTuple(input)) {
            enrichAndForwardMetricTuple(input); // enrich and forward on _ppf_metrics.
            return;
        }
        // Error Tuple
        if (isErrorTuple(input)) {
            forwardErrorTuple(input); // no processing here: the incoming tuple is forwarded on _ppf_errors.
            return;
        }
        // Data Tuple
        if (isDataTuple(input)) {
            if (canProcess.booleanValue()) {
                forwardDataTuple(input); // no processing here: the incoming tuple is forwarded on user defined streams.
            }
        }
    }
}
The forward methods used above behave as follows:
- if the input Tuple is a Metric Tuple (i.e. coming from the reserved stream _ppf_metrics), the node will add its latency record to the input Tuple and forward it on _ppf_metrics.
- if the input Tuple is an Error Tuple (i.e. coming from the reserved stream _ppf_errors), the node will emit the output Tuple on _ppf_errors.
- if the input Tuple is a Data Tuple (i.e. coming from one of your subscribed streams), the node will emit the output Tuple on all of your published streams.
You can implement your own methods to process input Tuples and forward output Tuples.
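That default routing can be sketched as a plain function over stream names. The routeFor helper below is hypothetical, for illustration only, and not a punch API call; only the reserved stream names _ppf_metrics and _ppf_errors come from the documentation.

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch of the default forwarding logic: reserved streams are
// forwarded to their own reserved stream, data tuples fan out to all
// published streams.
class TupleRouter {

    static List<String> routeFor(String inputStream, List<String> publishedStreams) {
        if ("_ppf_metrics".equals(inputStream)) {
            return Collections.singletonList("_ppf_metrics");
        }
        if ("_ppf_errors".equals(inputStream)) {
            return Collections.singletonList("_ppf_errors");
        }
        // data tuples go to every user defined published stream
        return publishedStreams;
    }
}
```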
Making your node available to Kibana Punchline Editor¶
Syntax¶
A valid json or hjson file should exist for each custom node under:
$PUNCHPLATFORM_INSTALL_DIR/extresource/storm
For instance, if we were to define a custom node called syslog_bolt, the json file will look as follows:
{
  "name": "syslog_bolt",
  "types": [
    "Object",
    "INode"
  ],
  "properties": {
    "summary": {
      "default": "",
      "type": "String",
      "required": false,
      "context_type": null,
      "advanced": false
    },
    "description": {
      "default": "",
      "type": "String",
      "required": false,
      "context_type": "markdown",
      "advanced": false
    },
    "verbose": {
      "default": false,
      "type": "Boolean",
      "required": false,
      "context_type": null,
      "advanced": false
    },
    "settings": {
      "acked": {
        "default": false,
        "type": "Boolean",
        "required": true,
        "context_type": null,
        "advanced": false
      },
      "destination": {
        "default": null,
        "type": "List<Object>",
        "required": true,
        "context_type": null,
        "advanced": false
      }
    },
    "publish": [],
    "subscribe": [
      {
        "type": "Singleton<Dataset<Row>>",
        "alias": null
      }
    ]
  },
  "group": "OutputNodeStream",
  "executors": [
    "storm"
  ],
  "deprecated": false
}
Making your node availabe to Punchline Editor¶
punchplatform-inspect-node.sh can be used to generate json files whose contents are the meta-information of your self-made nodes.
A usage example is as follows:
EXTNODES_STORM=$PUNCHPLATFORM_INSTALL_DIR/extresource/storm
MYNODE_JAR=$MYWORKDIR/mynode.jar
RUNTIME=storm
PACKAGES=org.something.package
punchplatform-inspect-node.sh \
--packages $PACKAGES \
--runtime $RUNTIME \
--jar $MYNODE_JAR \
--output-dir $EXTNODES_STORM
Checking if properly installed
On a standalone release, you can use this command:
MY_NODE=your_node_name
punchplatform-scanner --runtime storm | grep $MY_NODE
Deploying your nodes¶
Installing your nodes¶
Standalone mode¶
For the standalone, put your jars in $PUNCHPLATFORM_INSTALL_DIR/extlib/storm/
.
Deployed mode¶
For a deployed mode, refer to this documentation
Using your nodes¶
You can now refer to your nodes in the punchline file, using the corresponding node type:
- third_party_input for input nodes (Storm spouts)
- third_party_node for processing nodes (Storm bolts)
The two important new punchline properties are:
- additional_jars: defined at the level of the punchline, it provides one or several jars. These are the jars generated by your project.
- class: defined at the level of your custom nodes. It must refer to the main class in charge of creating your node.
For example, the following punchline uses both nodes and prints the output to the console.
The Kafka reporter will collect latency records from the reserved stream _ppf_metrics.
---
version: '6.0'
runtime: storm
dag:
- type: third_party_input
  component: input_node
  package: org.thales.punch.storm.node.starter.kit.InputNode
  settings:
    user_name: alice
    load_control: rate
    load_control.rate: 1
    self_monitoring.activation: true
    self_monitoring.period: 10
  publish:
  - stream: logs
    fields:
    - log
  - stream: _ppf_metrics
    fields:
    - _ppf_latency
- type: third_party_node
  component: processing_node
  package: org.thales.punch.storm.node.starter.kit.ProcessingNode
  settings:
    can_process: true
  subscribe:
  - component: input_node
    stream: logs
  - component: input_node
    stream: _ppf_metrics
  publish:
  - stream: logs
    fields:
    - log
  - stream: _ppf_metrics
    fields:
    - _ppf_latency
- type: punchlet_node
  component: show
  settings:
    punchlet_code: "{ print(root); }"
  subscribe:
  - component: processing_node
    stream: logs
  - component: processing_node
    stream: _ppf_metrics
  publish: []
metrics:
  reporters:
  - type: kafka
settings:
  additional_jars:
  - punch-storm-node-starter-kit-1.0-SNAPSHOT-jar-with-dependencies.jar
Tip
Note the use of the load_control punch parameters. They make your input node publish one item per second.
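Conceptually, load_control.rate caps how many items the node may emit per time window. The fixed-window limiter below is an illustrative sketch of that idea, not the punch implementation; the clock is injected as a parameter to keep the behavior deterministic.

```java
// Illustrative fixed-window rate limiter: at most maxPerWindow emissions
// per windowMillis. Not the punch load_control implementation.
class RateLimiter {

    private final int maxPerWindow;
    private final long windowMillis;
    private long windowStart;
    private int emittedInWindow;

    RateLimiter(int maxPerWindow, long windowMillis) {
        this.maxPerWindow = maxPerWindow;
        this.windowMillis = windowMillis;
        // start "expired" so the first call opens a fresh window
        this.windowStart = -windowMillis;
    }

    // nowMillis is injected so tests do not depend on wall-clock time.
    boolean tryAcquire(long nowMillis) {
        if (nowMillis - windowStart >= windowMillis) {
            windowStart = nowMillis;
            emittedInWindow = 0;
        }
        if (emittedInWindow < maxPerWindow) {
            emittedInWindow++;
            return true;
        }
        return false;
    }
}
```

With load_control.rate: 1 as in the punchline above, the analogous limiter would be RateLimiter(1, 1000): one emission allowed per second.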
You can now start your punchline as usual, using either the channelctl or punchlinectl command to respectively submit your punchline to a Storm cluster or execute it in the foreground using the punch single-process lightweight engine.
punchlinectl $PUNCHPLATFORM_CONF_DIR/tenant/mytenant/channels/mychannel/punchline.hjson
Testing your nodes¶
There are multiple tests that you should write for your custom nodes. The API includes a testing framework that provides multiple helpers adapted to each type of test.
You only have to extend the test base classes from com.github.punch.api.storm.test to benefit from those tools.
- Unit tests: extend PunchInputUnitTest or PunchProcessingUnitTest.
- Component Integration tests: extend PunchIntegrationTest.
- System Integration tests: there are no helpers in the API, as the best way to do so is running a custom topology in Standalone & Punchbox.
Unit Tests with PunchUnitTest API¶
Unit tests check that each method of your node works as expected. Most of your node's logic should be covered by unit tests.
The methods to implement are inherited from PunchInputUnitTest or PunchProcessingUnitTest.
The API provides helpers to build your nodes and to provide them as a Stream of Arguments for Parameterized Tests.
See JUnit Parameterized Tests.
Build your node: the PunchUnitTest API provides a PunchNodeBuilder. Use it to configure your node as you would in a topology. Add builder methods if you need to mock some components in your node. It returns a Mockito Spy version of your node, giving you the opportunity to watch interactions.
Generate some Tuples: the PunchUnitTest API provides a TupleMocker. You can create your own TestTuples from this class. TestTuples implement the Storm Tuple interface; they are just easier to generate and manipulate in your tests. TupleMocker also provides Punch-specific tuples, such as Error, Metric and Tick Tuples.
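As a rough illustration of the idea (not the API's TupleMocker itself), a test tuple can be little more than a stream id plus a field map. All names below are hypothetical; only the reserved stream name _ppf_metrics comes from the documentation.

```java
import java.util.Map;

// Hypothetical map-backed test tuple, in the spirit of the TestTuples
// described above. It does NOT implement the real Storm Tuple interface.
class SimpleTestTuple {

    private final String streamId;
    private final Map<String, ?> fields;

    SimpleTestTuple(String streamId, Map<String, ?> fields) {
        this.streamId = streamId;
        this.fields = fields;
    }

    String getSourceStreamId() {
        return streamId;
    }

    Object getValueByField(String field) {
        return fields.get(field);
    }

    // A tuple on the reserved _ppf_metrics stream is a Metric Tuple.
    boolean isMetricTuple() {
        return "_ppf_metrics".equals(streamId);
    }
}
```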
Test different configurations: the PunchUnitTest API has a getNodesAsArguments(List nodes) helper to provide a list of nodes as a Stream of Arguments.
This makes it possible to use JUnit 5 Parameterized Tests to test different configurations.
Here is an example of how to use Parameterized Tests for your nodes:
static Stream<Arguments> getInvalidNodes() {
    List<SmtpInput> nodes = new ArrayList<>();
    nodes.add((SmtpInput) new PunchNodeBuilder(new SmtpInput())
            .setNodeParameter("port", 28975928) // invalid port
            .setNodeParameter("host", "127.0.0.1")
            .build());
    nodes.add((SmtpInput) new PunchNodeBuilder(new SmtpInput())
            .setNodeParameter("port", 2525)
            .setNodeParameter("hoster", "server87") // invalid parameter
            .build());
    return getNodesAsArguments(nodes);
}

@ParameterizedTest
@MethodSource("getInvalidNodes")
void testInvalidParam(SmtpInput invalidParamNode) {
    assertThrows(Exception.class, invalidParamNode::onOpen);
}
Component Integration Tests with PunchIntegrationTest API¶
Component Integration tests check that your node works in a distributed realtime engine.
The PunchIntegrationTest API provides you with tools to launch your node in a Storm Local Cluster. You have to build your node, include it in a Storm Topology, and submit it to Storm. The API uses a dockerised Redis to retrieve the Tuples emitted in Storm. You can then retrieve the Tuples and test them with methods provided in the API.
Requirements: the PunchIntegrationTest API leverages TestContainers. You need to install Docker on your development environment to be able to use this library.
Building your node: the PunchIntegrationTest API provides a PunchNodeBuilder. Use it to configure your node as you would in a topology.
Build a Topology: you can build a regular Storm Topology. There are some Input & Processing Nodes available in the API. Use them to publish or subscribe streams to or from your node.
Run topology: use runInStormLocalCluster(TopologyBuilder tb, int nbTuples) to run your test topology in Storm. As a topology never ends by design, you have to specify the number of Tuples that should be acked or failed before the topology is killed; this is the purpose of the nbTuples parameter.
You can also add callbacks for when the topology is ready and when the topology is over with runInStormLocalCluster(TopologyBuilder topologyBuilder, int nbTuples, OnTopologyReadyCallback readyCallback, OnTopologyOverCallback overCallback).
Retrieve Tuples: there are methods to retrieve the Tuples emitted in the topology.
- getInputTuples(String nodeName, String streamId): get Tuples incoming into the node through this streamId.
- getOutputTuples(String nodeName, String streamId): get Tuples outgoing from the node through this streamId.
- getAckedTuples(String nodeName, String streamId): get Tuples incoming into the node through this streamId that were acked by the node.
- getFailedTuples(String nodeName, String streamId): get Tuples incoming into the node through this streamId that were failed by the node.
Each method returns a List of Maps. Each Map represents a Tuple with its fields as map-key and its values as map-value.
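For example, a small helper over that List-of-Maps shape might look like the following; TupleInspector and countContaining are hypothetical names for illustration, not part of the punch API.

```java
import java.util.List;
import java.util.Map;

// Illustrative helper over tuples returned as a List of Maps
// (field name -> field value), as described above.
class TupleInspector {

    // Count tuples whose given field contains the expected substring.
    static long countContaining(List<? extends Map<String, ?>> tuples,
                                String field, String needle) {
        return tuples.stream()
                .filter(t -> {
                    Object value = t.get(field);
                    return value != null && value.toString().contains(needle);
                })
                .count();
    }
}
```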
Here is an example of how to use PunchIntegrationTest with a PunchInputNode:
@Test
void testSmtpInput() {
    // Build Storm topology
    TopologyBuilder tb = new TopologyBuilder();
    tb.setSpout("smtp-input", new InputNodeCaptor(smtpInput)); // use InputNodeCaptor to capture Tuples in Redis.
    tb.setBolt("acker", new ProcessingNodeCaptor(failFirstTupleNode)) // FailFirstTuple is one of the nodes provided in the API.
            .shuffleGrouping("smtp-input", "emails");

    // Run in Storm Local Cluster. The callback makes it possible to send emails only once the topology is ready.
    runInStormLocalCluster(tb, mails.size(), () -> mails.forEach(mail -> {
        try {
            mail.send();
        } catch (UnsupportedEncodingException | MessagingException e) {
            log.error("message=\"Failed to send email.\" exception=\"{}\"", e.getMessage());
        }
    }), null);

    // Testing tuple generation
    assertEquals(3, getNbOutputTuples("smtp-input", "emails"));
    assertEquals(1, getNbFailedTuples("smtp-input", "emails"));
    assertEquals(2, getNbAckedTuples("smtp-input", "emails"));

    // Testing tuple content
    assertTrue(getOutputTuples("smtp-input", "emails")
            .get(0).get("email").toString().contains("Mail1 SMTP Email Test"));
}
System Integration with Standalone and Punchbox¶
System Integration tests check that the node works in a production-like environment.
The best way to make such tests is to run a topology of your own in Standalone and Punchbox. These tests take longer, which is why we recommend using the other tests during your node development.
Provided libraries¶
The following libraries are provided by the punch-api and punch-storm-api artifacts.
You must not override their versions in your pom.xml.
asm : 5.0.3
classmate : 1.3.4
hibernate-validator : 6.1.6.Final
jackson-annotations : 2.10.1
jackson-core : 2.10.1
jackson-databind : 2.10.1
jakarta.activation : 1.2.1
jakarta.activation-api : 1.2.1
jakarta.validation-api : 2.0.2
jakarta.xml.bind-api : 2.3.2
javax.annotation-api : 1.3.2
jboss-logging : 3.3.2.Final
kryo : 3.0.3
log4j-api : 2.17.1
log4j-core : 2.17.1
log4j-over-slf4j : 1.7.26
log4j-slf4j-impl : 2.11.2
metrics-core : 4.1.0
metrics-graphite : 3.2.6
metrics-jvm : 4.1.0
minlog : 1.3.0
objenesis : 2.1
punch-api : 6.4.5
reflectasm : 1.10.1
slf4j-api : 1.7.26
storm-client : 2.3.0
storm-shaded-deps : 2.3.0