Elastalert Custom Modules

Abstract

Target audience

This guide is for users whose use case cannot be covered by the standard ElastAlert rules.

Coding your custom Rules

ElastAlert exposes four extension points: the Rules API, the Alert API, the Enhancement API and the Rules Loader API. Extending them lets you implement business logic that is specific to your problem.

PEX Project setup

We provide a starter project on our GitHub repository.

setup.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from setuptools import setup, find_packages

setup(
    name="punch_plugins",
    version="1.0",
    packages=find_packages(),
    include_package_data=True,
    author="punchplatform",
    author_email="contact@punchplatform.com",
    description="boilerplate for elastalert custom alerter, rule type, etc...",
    python_requires='>=3.6',
    install_requires=[
        "pex==2.1.6",  # required, do not remove
        "requests==2.24.0",  # required, do not remove
        "pymongo",
    ]
)

Python package hierarchy

├── example_configs
│   ├── alert
│   │   ├── config.yaml
│   │   └── rule.yaml
│   ├── enhancement
│   │   ├── config.yaml
│   │   └── rule.yaml
│   ├── rule
│   │   ├── config.yaml
│   │   └── rule.yaml
│   └── rule_loader
│       ├── config.yaml
│       └── rule.yaml
├── Makefile
├── punch_plugins
│   ├── alerters
│   │   ├── __init__.py
│   │   └── my_use_case
│   │       ├── complex_alert.py
│   │       └── __init__.py
│   ├── enhancements
│   │   ├── __init__.py
│   │   └── my_use_case
│   │       ├── complex_enhancement.py
│   │       └── __init__.py
│   ├── __init__.py
│   ├── rules
│   │   ├── __init__.py
│   │   └── my_use_case
│   │       ├── complex_rule.py
│   │       └── __init__.py
│   └── rules_loader
│       ├── __init__.py
│       └── my_use_case
│           ├── complex_rule_loader.py
│           └── __init__.py
├── README.md
└── setup.py

Rule Module

A simple custom rule module, taken from the official ElastAlert documentation:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import dateutil.parser

from elastalert.ruletypes import RuleType

# elastalert.util includes useful utility functions
# such as converting from timestamp to datetime obj
from elastalert.util import ts_to_dt  # NOQA


class ComplexRule(RuleType):

    # By setting required_options to a set of strings
    # You can ensure that the rule config file specifies all
    # of the options. Otherwise, ElastAlert will throw an exception
    # when trying to load the rule.
    required_options = set(["time_start", "time_end", "usernames"])

    # add_data will be called each time Elasticsearch is queried.
    # data is a list of documents from Elasticsearch, sorted by timestamp,
    # including all the fields that the config specifies with "include"
    def add_data(self, data):
        for document in data:

            # To access config options, use self.rules
            if document["username"] in self.rules["usernames"]:

                # Convert the timestamp to a time object
                login_time = document["@timestamp"].time()

                # Convert time_start and time_end to time objects
                time_start = dateutil.parser.parse(self.rules["time_start"]).time()
                time_end = dateutil.parser.parse(self.rules["time_end"]).time()

                # If the time falls between start and end
                if time_start < login_time < time_end:

                    # To add a match, use self.add_match
                    self.add_match(document)

    # The results of get_match_str will appear in the alert text
    def get_match_str(self, match):
        return "%s logged in between %s and %s" % (
            match["username"],
            self.rules["time_start"],
            self.rules["time_end"],
        )

    # garbage_collect is called indicating that ElastAlert has already been run up to timestamp
    # It is useful for knowing that there were no query results from Elasticsearch because
    # add_data will not be called with an empty list
    def garbage_collect(self, timestamp):
        pass
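
The time-window comparison at the heart of add_data can be tried in isolation, without an ElastAlert installation. The sketch below is only an illustration and is not part of the starter project: the helper name in_window is hypothetical, and it uses the standard library's datetime.strptime in place of dateutil so that it has no third-party dependency. Like the rule above, it does not handle windows that wrap past midnight.

```python
from datetime import datetime, time


def in_window(login_time: time, time_start: str, time_end: str) -> bool:
    """Return True if login_time falls strictly between time_start and time_end.

    time_start and time_end are "HH:MM" strings, as written in the rule file.
    Hypothetical helper for illustration only.
    """
    start = datetime.strptime(time_start, "%H:%M").time()
    end = datetime.strptime(time_end, "%H:%M").time()
    return start < login_time < end


# The rule above calls .time() on the document's "@timestamp" value,
# so a datetime object stands in for it here.
doc_ts = datetime(2020, 9, 1, 21, 30)
print(in_window(doc_ts.time(), "20:00", "23:59"))  # inside the window
print(in_window(time(8, 0), "20:00", "23:59"))     # outside the window
```

Because the comparison is strict, a login at exactly 20:00 is not reported. Use <= if the bounds should be inclusive.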

Alert Module

A simple custom alert module, taken from the official ElastAlert documentation:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from elastalert.alerts import Alerter, BasicMatchString

class ComplexAlert(Alerter):

    # By setting required_options to a set of strings
    # You can ensure that the rule config file specifies all
    # of the options. Otherwise, ElastAlert will throw an exception
    # when trying to load the rule.
    required_options = set(['output_file_path'])

    # alert() is called with the list of matches to report
    def alert(self, matches):

        # Matches is a list of match dictionaries.
        # It contains more than one match when the alert has
        # the aggregation option set
        for match in matches:

            # Config options can be accessed with self.rule
            with open(self.rule['output_file_path'], "a") as output_file:

                # basic_match_string will transform the match into the default
                # human readable string format
                match_string = str(BasicMatchString(self.rule, match))

                output_file.write(match_string)

    # get_info is called after an alert is sent to get data that is written back
    # to Elasticsearch in the field "alert_info"
    # It should return a dict of information relevant to what the alert does
    def get_info(self):
        return {'type': 'Awesome Alerter',
                'output_file': self.rule['output_file_path']}
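
The alerter above reopens the output file for every match. A possible variant opens the file once and writes one JSON object per line, which is easier to parse afterwards. This is a minimal sketch under the assumption that JSON lines are an acceptable output format for your use case. The helper name append_matches is hypothetical and is not part of ElastAlert.

```python
import json
import os
import tempfile


def append_matches(output_file_path, matches):
    """Append each match dictionary to the file as one JSON line."""
    with open(output_file_path, "a") as output_file:
        for match in matches:
            # default=str renders values that JSON cannot encode
            # (for example datetime objects) as plain strings
            line = json.dumps(match, default=str, sort_keys=True)
            output_file.write(line + "\n")


# Usage with a throwaway file
path = os.path.join(tempfile.mkdtemp(), "alerts.log")
append_matches(path, [{"username": "admin"}, {"username": "foobaz"}])
with open(path) as f:
    lines = f.read().splitlines()
print(len(lines))
```

Inside a real alerter, the body of alert(self, matches) could delegate to such a helper with self.rule['output_file_path'] as the first argument.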

Enhancement Module

A simple custom enhancement module, taken from the official ElastAlert documentation:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from elastalert.enhancements import BaseEnhancement

class ComplexEnhancement(BaseEnhancement):

    # The enhancement is run against every match
    # The match is passed to the process function where it can be modified in any way
    # ElastAlert will do this for each enhancement linked to a rule
    def process(self, match):
        if 'domain' in match:
            url = "http://who.is/whois/%s" % (match['domain'])
            match['domain_whois_link'] = url
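
The effect of process on a match can be checked without ElastAlert by running the same logic on a plain dictionary. In the sketch below, add_whois_link is a hypothetical stand-alone copy of the method body, and the sample match fields are made up for illustration.

```python
def add_whois_link(match):
    """Stand-alone copy of the logic in ComplexEnhancement.process."""
    if "domain" in match:
        match["domain_whois_link"] = "http://who.is/whois/%s" % match["domain"]


# The match is modified in place; nothing is returned
match = {"domain": "example.com", "num_hits": 3}
add_whois_link(match)
print(match["domain_whois_link"])  # → http://who.is/whois/example.com

# A match without a "domain" field is left untouched
other = {"num_hits": 1}
add_whois_link(other)
print(other)  # → {'num_hits': 1}
```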

Rule Loader Module

A simple custom rule loader module, taken from the official ElastAlert documentation:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from pymongo import MongoClient
from elastalert.loaders import RulesLoader
import yaml

class ComplexRuleLoader(RulesLoader):
    def __init__(self, conf):
        super(ComplexRuleLoader, self).__init__(conf)
        self.client = MongoClient(conf['mongo_url'])
        self.db = self.client[conf['mongo_db']]
        self.cache = {}

    def get_names(self, conf, use_rule=None):
        if use_rule:
            return [use_rule]

        rules = []
        self.cache = {}
        for rule in self.db.rules.find():
            self.cache[rule['name']] = yaml.safe_load(rule['yaml'])
            rules.append(rule['name'])

        return rules

    def get_hashes(self, conf, use_rule=None):
        if use_rule:
            return [use_rule]

        hashes = {}
        self.cache = {}
        for rule in self.db.rules.find():
            self.cache[rule['name']] = yaml.safe_load(rule['yaml'])
            hashes[rule['name']] = rule['hash']

        return hashes

    def get_yaml(self, rule):
        if rule in self.cache:
            return self.cache[rule]

        self.cache[rule] = yaml.safe_load(self.db.rules.find_one({'name': rule})['yaml'])
        return self.cache[rule]
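
The loader above expects each MongoDB document to carry a hash field alongside name and yaml, but the source does not show how that field is populated. One option, sketched here purely as an assumption, is to derive it from the YAML text when the rule is stored, so that any edit to the rule changes its hash. The function make_rule_document is hypothetical, and SHA-1 is an arbitrary choice of digest.

```python
import hashlib


def make_rule_document(name, yaml_text):
    """Build a document with the fields read by ComplexRuleLoader."""
    digest = hashlib.sha1(yaml_text.encode("utf-8")).hexdigest()
    return {"name": name, "yaml": yaml_text, "hash": digest}


# Two revisions of the same rule produce different hashes
doc_v1 = make_rule_document("Punch Example Rule", "type: any\n")
doc_v2 = make_rule_document("Punch Example Rule", "type: frequency\n")
print(doc_v1["hash"] != doc_v2["hash"])
```

Such a document could then be written with pymongo's insert_one or replace_one on the rules collection.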

Deploying your custom module

Installing your module

Standalone mode

In standalone mode, copy your PEX files into $PUNCHPLATFORM_INSTALL_DIR/extlib/elastalert/.

Deployed mode

In deployed mode, refer to this documentation.

Using your custom modules

config.yaml

es_host: localhost
es_port: 9200
writeback_index: elastalert
run_every:
  minutes: 2
buffer_time:
  minutes: 1
rules_folder: examples

rule.yaml

es_host: localhost
es_port: 9200
name: "Punch Example Rule"
type: "punch_plugins.rules.my_use_case.complex_rule.ComplexRule"
index: logstash-*
num_events: 50
timeframe:
    hours: 4
filter:
- term:
    some_field: "some_value"
alert:
- "email"
email:
- "contact@punchplatform.com"
time_start: "20:00"
usernames:
- "admin"
- "userXYZ"
- "foobaz"
time_end: "23:59"
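
The type value in rule.yaml is a dotted path: the Python module path followed by the class name, matching the package hierarchy of the project. ElastAlert's own loading code is not reproduced here. The sketch below only illustrates how such a path can be resolved with importlib, using a standard-library class as a stand-in because punch_plugins may not be installed where you try it. resolve_dotted_path is a hypothetical helper.

```python
import importlib


def resolve_dotted_path(path):
    """Split 'package.module.ClassName' and return the named attribute."""
    module_name, _, attr_name = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Stand-in for "punch_plugins.rules.my_use_case.complex_rule.ComplexRule"
cls = resolve_dotted_path("collections.OrderedDict")
print(cls.__name__)  # → OrderedDict
```

If the module cannot be imported, import_module raises ModuleNotFoundError, which is a quick way to check that a PEX exposes the package path you wrote in the rule file.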

Execution

WORK_DIR=$(pwd)

ls $WORK_DIR
example_configs/rule/config.yaml
example_configs/rule/rule.yaml

punchplatform-elastalert.sh \
                --start-foreground \
                --config $WORK_DIR/example_configs/rule/config.yaml \
                --rule $WORK_DIR/example_configs/rule/rule.yaml \
                --additional-pex custom_rule-1.0.0.pex