User Nodes

Abstract

In addition to using the punch nodes, you may want to bring your own and insert them into a Storm punchline. You then benefit from the power of the Storm API, which gives you full control over data forwarding, acknowledgement, enrichment, and so on.

Code your Node

To write a custom storm node, you leverage the storm API as well as an additional punch API that makes it a lot easier to focus only on your business logic. The punch API also provides you with several goodies:

  • a simple yet powerful way to provide configuration parameters to your nodes
  • load control capabilities
  • rate limiting capabilities
  • monitoring

Input Node

Here is a simple input node that publishes a "hello world" string at a high rate.

package org.thales.punch.samples.nodes.storm;

import java.util.Arrays;

import org.thales.punch.libraries.storm.api.BaseInputNode;
import org.thales.punch.libraries.storm.api.NodeSettings;
import org.thales.punch.libraries.storm.api.StreamDeclaration;

/**
 * Here is an extra simple input node that publishes "hello world"
 * to all its configured output streams.
 * 
 * @author dimi
 */
public class UserInputNode extends BaseInputNode {

  private static final long serialVersionUID = 1L;
  public UserInputNode(NodeSettings settings) {
    super(settings);
  }

  @Override
  public void nextTuple() {
    super.nextTuple();
    for (StreamDeclaration stream : getPublishedStreams()) {
      collector.emit(stream.getStreamId(), Arrays.asList("hello world"));
    }
  }

}

To make that input node available to a punchline, you must provide a factory class. Here it is:

package org.thales.punch.samples.nodes.storm;

import org.thales.punch.platform.api.IPunchPlatformProperties;
import org.thales.punch.libraries.storm.api.NodeSettings;
import org.thales.punch.libraries.storm.api.ISpout;
import org.thales.punch.libraries.storm.api.ISpoutFactory;
import org.thales.punch.libraries.storm.api.ITopologySettings;

/**
 * Your factory class to create your punch storm input node.
 */
public class UserInputNodeFactory implements ISpoutFactory {

  @Override
  public ISpout createSpout(NodeSettings config, ITopologySettings topoConfig,
      IPunchPlatformProperties punchplatformConfig) {
    return new UserInputNode(config);
  }
}

Processing Node

Here is a simple processing node that forwards the input it receives to downstream nodes. For the sake of the example, it takes a single boolean configuration parameter named forward. If it is set to false (the default), the data is simply discarded.

package org.thales.punch.samples.nodes.storm;

import org.apache.storm.tuple.Tuple;
import org.thales.punch.libraries.storm.api.NodeSettings;
import org.thales.punch.libraries.storm.api.BaseProcessingNode;

/**
 * Here is a simple example that simply receives and forwards the data.
 *
 * @author dimi
 */
public class UserProcessingNode extends BaseProcessingNode {

  private static final long serialVersionUID = 1L;
  public UserProcessingNode(NodeSettings settings) {
    super(settings);
  }

  @Override
  public void execute(Tuple input) {
    // Check what our configuration tells us to do
    if (Boolean.TRUE.equals(boltSettings.getAsBoolean("forward", false))) {
      collector.emit(input.getSourceStreamId(), input, input.getValues());
    } 
    collector.ack(input);
  }
}

Next you must provide a factory class to create your node. Here it is:

package org.thales.punch.samples.nodes.storm;

import org.thales.punch.platform.api.IPunchPlatformProperties;
import org.thales.punch.libraries.storm.api.IBolt;
import org.thales.punch.libraries.storm.api.IBoltFactory;
import org.thales.punch.libraries.storm.api.ITopologySettings;
import org.thales.punch.libraries.storm.api.NodeSettings;

/**
 * Your factory class to create your punch storm processing node.
 *
 * NOTE: the createBolt method name, its parameters and the IBolt return type
 * are assumed here by analogy with ISpoutFactory. Check the IBoltFactory
 * interface of your punch release for the exact signature.
 */
public class UserProcessingNodeFactory implements IBoltFactory {

  @Override
  public IBolt createBolt(NodeSettings config, ITopologySettings topoConfig,
      IPunchPlatformProperties punchplatformConfig) {
    return new UserProcessingNode(config);
  }
}
As you can see it is quite simple as well.
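
The processing node above acknowledges every input, whether it forwards it or discards it. The following self-contained sketch isolates that decision so it can be checked without a Storm cluster. The RecordingCollector and ForwardOrDiscard classes are hypothetical stand-ins written for this illustration; they are not part of the punch or Storm APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a collector: it only records what it is asked to do. */
class RecordingCollector {
  final List<String> emitted = new ArrayList<>();
  final List<String> acked = new ArrayList<>();

  void emit(String value) {
    emitted.add(value);
  }

  void ack(String value) {
    acked.add(value);
  }
}

/** Mirrors the forward-or-discard logic of the processing node, with no Storm dependency. */
class ForwardOrDiscard {
  private final boolean forward;
  private final RecordingCollector collector;

  ForwardOrDiscard(boolean forward, RecordingCollector collector) {
    this.forward = forward;
    this.collector = collector;
  }

  void execute(String input) {
    if (forward) {
      collector.emit(input);
    }
    // Acknowledge in both cases, so that a discarded item is not treated as failed.
    collector.ack(input);
  }
}
```

With forward set to false, the collector records an acknowledgement but no emitted value. With forward set to true, it records both.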

Maven Project Setup

Make your code part of a Maven project. A typical pom.xml looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.thales.punch</groupId>
  <artifactId>punchplatform-user-nodes</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Punch User Node</name>
  <url>http://maven.apache.org</url>

  <properties>
    <java.version>1.8</java.version>
    <storm.version>2.1.0</storm.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <punch.version>6.0.1-SNAPSHOT</punch.version>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.thales.punch</groupId>
      <artifactId>punchplatform-bootstrap-lib</artifactId>
      <version>${punch.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.thales.punch</groupId>
      <artifactId>punchplatform-storm-api</artifactId>
      <version>${punch.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>${storm.version}</version>
      <scope>provided</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

Tip

Note that you do not need to generate a fat jar. The punch and Storm dependencies are provided by your platform. What you need to generate is a jar containing only your code and your own dependencies.

Once compiled, the project generates target/punchplatform-user-nodes-1.0-SNAPSHOT.jar. Simply create a folder in $PUNCHPLATFORM_INSTALL_DIR/plugins, and put your jar there or in a subdirectory.

Using your nodes

You can now refer to your nodes in the punchline file. Use the third_party_input node type for input nodes (i.e. the ones that correspond to Storm spouts), and the third_party_node node type for processing nodes (i.e. the ones that correspond to Storm bolts).

The two important new punchline properties are:

  • additional_jars : defined at the level of the punchline, it provides one or several jars. These are the jars generated by your project(s).
  • class : defined at the level of your custom nodes. It must refer to the factory class in charge of creating your node.
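
The class property names a factory rather than the node itself. As a rough illustration of the general Java pattern behind such a setting, the sketch below resolves a class name taken from configuration and instantiates it by reflection. How the platform actually loads factories is not described here, so treat this as an assumption: NodeFactory, HelloFactory and FactoryLoader are hypothetical names, not punch types.

```java
/** Hypothetical stand-in for a factory interface such as ISpoutFactory. */
interface NodeFactory {
  String createNode(String settings);
}

/** Hypothetical factory. It relies on the implicit no-argument constructor. */
class HelloFactory implements NodeFactory {
  @Override
  public String createNode(String settings) {
    return "node(" + settings + ")";
  }
}

/** Resolves a factory from its class name, as read from a configuration file. */
class FactoryLoader {
  static NodeFactory load(String className) throws ReflectiveOperationException {
    // Fails with ClassNotFoundException if no jar on the classpath provides the class.
    Class<?> clazz = Class.forName(className);
    Object instance = clazz.getDeclaredConstructor().newInstance();
    // Fails with ClassCastException if the class is not a factory.
    return NodeFactory.class.cast(instance);
  }
}
```

Under this pattern, a misspelled class name or a jar missing from additional_jars would both surface as a class that cannot be found, which is why the class value must be the fully qualified name of the factory.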

Important

Your custom jars must be installed in the $PUNCHPLATFORM_INSTALL_DIR/plugins folder.

Input Node

{
  "tenant" : "mytenant",
  "channel" : "third_party",
  "settings": {
    "additional_jars" : ["punchplatform-user-nodes-1.0-SNAPSHOT.jar"]
  },
  "dag" : [
    {
      "type": "third_party_input",
      "class" : "org.thales.punch.samples.nodes.storm.UserInputNodeFactory",
      "component" : "input",
      "settings": {
        "load_control" : "rate",
        "load_control.rate" : 1
      },
      "publish": [
        {
          "stream": "logs",
          "fields": [
            "log"
          ]
        }
      ]
    }
    ...
  ]
}

Tip

Note the use of the load_control punch parameters. They make your input node publish one item per second.
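
To make the effect of a rate of 1 concrete, here is a minimal sketch of rate-based load control. It is not the punch implementation: SimpleRateLimiter and its methods are hypothetical names chosen for this illustration, and the clock is injected so the behavior can be checked without waiting.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical illustration of rate-based load control.
 * This is NOT the punch implementation; all names here are made up.
 */
class SimpleRateLimiter {
  private final long intervalNanos;
  private final LongSupplier nanoClock;
  private long nextAllowedNanos;

  SimpleRateLimiter(int itemsPerSecond, LongSupplier nanoClock) {
    if (itemsPerSecond <= 0) {
      throw new IllegalArgumentException("itemsPerSecond must be positive");
    }
    this.intervalNanos = 1_000_000_000L / itemsPerSecond;
    this.nanoClock = nanoClock;
    this.nextAllowedNanos = nanoClock.getAsLong();
  }

  /** Returns true if one item may be published now, false if it is too early. */
  boolean tryAcquire() {
    long now = nanoClock.getAsLong();
    // Subtraction keeps the comparison valid even if the clock value wraps around.
    if (now - nextAllowedNanos < 0) {
      return false;
    }
    nextAllowedNanos = now + intervalNanos;
    return true;
  }
}
```

With a real clock you would pass System::nanoTime and call tryAcquire() before each publication. At a rate of 1, only the first call in any one-second window returns true.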

Processing Node

{
  "tenant" : "mytenant",
  "channel" : "third_party",
  "settings": {
    "additional_jars" : ["punchplatform-user-nodes-1.0-SNAPSHOT.jar"]
  },  
  "dag" : [
    ...
    {
      "type": "third_party_node",
      "class" : "org.thales.punch.samples.nodes.storm.UserProcessingNodeFactory",
      "settings": {
        "forward": true
      },
      "subscribe": [
        {
          "component": "input",
          "stream": "logs"
        }
      ],
      "publish": [
        {
          "stream": "logs",
          "fields": [ ... ]
        }
      ]
    }
      ...
  ]
}

You can now start your punchline as usual, using either the channelCtl or the punchlineCtl command to, respectively, submit your punchline to a Storm cluster or execute it in the foreground using the punch single-process lightweight engine.

Warning

User-defined nodes are not supported with Shiva. This feature is planned for releases 6.1 and higher.