Skip to content

HOWTO: Handle several technologies in one topology

Why do that?

This applies in a particular context: when you have to handle several technologies in a single channel. For example, when all your data flows arrive on the same port number.

One punch Bolt to divide them all

In that case, the wrong approach is to use a single parser for all the technologies. The logs differ, each with its own meaning and therefore its own normalization. Furthermore, the processing cost is higher, because every log has to go through one big parser.

The good practice is therefore to have one parser per technology, plus another parser upstream in charge of dispatching the logs.

That's why we provide an efficient way to dispatch. The first punch bolt (autodiscovery_bolt in the example below) performs a light parse of each log to identify its technology, and saves the result in a new field, for example [logs][_log_type]. The second bolt, a punch_bolt_dispatcher (parser_bolt in the example below), then processes each log with the punchlets configured for the technology found in the [logs][_log_type] field.

Here is an explanatory diagram:

image

Example

This example is available in the standalone punch package:

{
  "tenant": "mytenant",
  "channel": "universal",
  "name": "single",
  "meta": {
    "tenant": "mytenant",
    "channel": "universal"
  },
  "dag": [
    {
      "type": "syslog_spout",
      "settings": {
        "listen": {
          "proto": "tcp",
          "host": "0.0.0.0",
          "port": 10000
        }
      },
      "storm_settings": {
        "executors": 1,
        "component": "syslog_spout_tcp",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log",
              "_ppf_local_host",
              "_ppf_local_port",
              "_ppf_remote_host",
              "_ppf_remote_port",
              "_ppf_id",
              "_ppf_timestamp"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "punchlet_node",
      "settings": {
        "punchlet": [
          "./autodiscovery.punch"
        ]
      },
      "storm_settings": {
        "component": "autodiscovery_bolt",
        "subscribe": [
          {
            "component": "syslog_spout_tcp",
            "stream": "logs"
          }
        ],
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log",
              "_log_type",
              "_ppf_local_host",
              "_ppf_local_port",
              "_ppf_remote_host",
              "_ppf_remote_port",
              "_ppf_id",
              "_ppf_timestamp"
            ]
          }
        ]
      }
    },
    {
      "type": "punch_bolt_dispatcher",
      "settings": {
        "dispatcher_field": "_log_type",
        "dispatcher_map": {
          "sourcefire": {
            "punchlet": [
              "./input.punch",
              "./parsing_syslog_header.punch",
              "standard/sourcefire/parsing.punch",
              "standard/common/geoip.punch"
            ]
          },
          "apache_httpd": {
            "punchlet_json_resources": [
              "standard/apache_httpd/http_codes.json",
              "standard/apache_httpd/taxonomy.json"
            ],
            "punchlet": [
              "./input.punch",
              "./parsing_syslog_header.punch",
              "standard/apache_httpd/parsing.punch",
              "standard/apache_httpd/enrichment.punch",
              "standard/apache_httpd/normalization.punch",
              "standard/common/geoip.punch"
            ]
          },
          "stormshield_networksecurity": {
            "punchlet": [
              "./input.punch",
              "./parsing_syslog_header.punch",
              "standard/stormshield_networksecurity/parsing.punch",
              "standard/common/geoip.punch"
            ]
          },
          "websense_web_security": {
            "punchlet_json_resources": [
              "standard/websense_web_security/enrichment_category.json",
              "standard/websense_web_security/enrichment_disposition.json"
            ],
            "punchlet": [
              "./input.punch",
              "./parsing_syslog_header.punch",
              "standard/websense_web_security/parsing.punch",
              "standard/websense_web_security/enrichment.punch",
              "standard/common/geoip.punch"
            ]
          }
        }
      },
      "storm_settings": {
        "executors": 1,
        "component": "parser_bolt",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "raw_log",
              "log",
              "_ppf_id",
              "_ppf_timestamp",
              "es_index"
            ]
          },
          {
            "stream": "_ppf_errors",
            "fields": [
              "_ppf_error_message",
              "_ppf_error_document",
              "_ppf_id"
            ]
          }
        ],
        "subscribe": [
          {
            "component": "autodiscovery_bolt",
            "stream": "logs",
            "grouping": "localOrShuffle"
          }
        ]
      }
    },
    {
      "type": "elasticsearch_bolt",
      "settings": {
        "cluster_id": "es_search",
        "per_stream_settings": [
          {
            "stream": "logs",
            "index": {
              "type": "daily",
              "prefix": "mytenant-events-"
            },
            "document_json_field": "log",
            "document_id_field": "_ppf_id",
            "additional_document_value_fields": [
              {
                "type": "date",
                "document_field": "@timestamp",
                "format": "iso"
              }
            ]
          },
          {
            "stream": "_ppf_errors",
            "document_value_fields": [
              "_ppf_error_message",
              "_ppf_error_document",
              "_ppf_id"
            ],
            "index": {
              "type": "daily",
              "prefix": "mytenant-events-"
            },
            "document_id_field": "_ppf_id",
            "additional_document_value_fields": [
              {
                "type": "date",
                "document_field": "@timestamp",
                "format": "iso"
              }
            ]
          }
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "elasticsearch_bolt",
        "subscribe": [
          {
            "component": "parser_bolt",
            "stream": "logs"
          },
          {
            "component": "parser_bolt",
            "stream": "_ppf_errors"
          }
        ]
      }
    }
  ],
  "metrics": {
    "reporters": [
      {
        "type": "elasticsearch",
        "cluster_name": "es_search"
      }
    ]
  },
  "storm_settings": {
    "metrics_consumers": [
      "org.apache.storm.metric.LoggingMetricsConsumer"
    ],
    "topology.builtin.metrics.bucket.size.secs": 30,
    "supervisor.monitor.frequency.secs": 60,
    "topology.max.spout.pending": 10000,
    "topology.enable.message.timeouts": true,
    "topology.message.timeout.secs": 30,
    "topology.worker.childopts": "-server -Xms128m -Xmx128m",
    "topology.receiver.buffer.size": 32,
    "topology.executor.receive.buffer.size": 16384,
    "topology.executor.send.buffer.size": 16384,
    "topology.transfer.buffer.size": 32,
    "topology.worker.shared.thread.pool.size": 4,
    "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
    "topology.spout.wait.strategy": "org.apache.storm.spout.SleepSpoutWaitStrategy",
    "topology.sleep.spout.wait.strategy.time.ms": 50,
    "topology.workers": 1
  }
}