Elastalert Custom Modules¶
Abstract
Targeted audience
This guide is for users whose use case cannot be covered by the standard ElastAlert rules.
Coding your custom Rules¶
By extending the ElastAlert Rules API, Alert API, Enhancement API, and Rules Loader API, you can implement business logic specific to your use case.
PEX Project setup¶
We provide a starter project on our GitHub repository.
setup.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from setuptools import setup, find_packages

setup(
    name="punch_plugins",
    version="1.0",
    packages=find_packages(),
    include_package_data=True,
    author="punchplatform",
    author_email="contact@punchplatform.com",
    description="boilerplate for elastalert custom alerter, rule type, etc...",
    python_requires='>=3.6',
    install_requires=[
        "pex==2.1.6",  # this is important, do not remove
        "requests==2.24.0",  # this is important, do not remove
        "pymongo",
    ]
)
Python package hierarchy
├── example_configs
│   ├── alert
│   │   ├── config.yaml
│   │   └── rule.yaml
│   ├── enhancement
│   │   ├── config.yaml
│   │   └── rule.yaml
│   ├── rule
│   │   ├── config.yaml
│   │   └── rule.yaml
│   └── rule_loader
│       ├── config.yaml
│       └── rule.yaml
├── Makefile
├── punch_plugins
│   ├── alerters
│   │   ├── __init__.py
│   │   └── my_use_case
│   │       ├── complex_alert.py
│   │       └── __init__.py
│   ├── enhancements
│   │   ├── __init__.py
│   │   └── my_use_case
│   │       ├── complex_enhancement.py
│   │       └── __init__.py
│   ├── __init__.py
│   ├── rules
│   │   ├── __init__.py
│   │   └── my_use_case
│   │       ├── complex_rule.py
│   │       └── __init__.py
│   └── rules_loader
│       ├── __init__.py
│       └── my_use_case
│           ├── complex_rule_loader.py
│           └── __init__.py
├── README.md
└── setup.py
Rule Module¶
A simple custom rule module, taken from the official ElastAlert documentation:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import dateutil.parser
from elastalert.ruletypes import RuleType

# elastalert.util includes useful utility functions
# such as converting from timestamp to datetime obj
from elastalert.util import ts_to_dt  # NOQA


class ComplexRule(RuleType):
    # By setting required_options to a set of strings
    # You can ensure that the rule config file specifies all
    # of the options. Otherwise, ElastAlert will throw an exception
    # when trying to load the rule.
    required_options = set(["time_start", "time_end", "usernames"])

    # add_data will be called each time Elasticsearch is queried.
    # data is a list of documents from Elasticsearch, sorted by timestamp,
    # including all the fields that the config specifies with "include"
    def add_data(self, data):
        for document in data:
            # To access config options, use self.rules
            if document["username"] in self.rules["usernames"]:
                # Convert the timestamp to a time object
                login_time = document["@timestamp"].time()
                # Convert time_start and time_end to time objects
                time_start = dateutil.parser.parse(self.rules["time_start"]).time()
                time_end = dateutil.parser.parse(self.rules["time_end"]).time()
                # If the time falls between start and end
                if login_time > time_start and login_time < time_end:
                    # To add a match, use self.add_match
                    self.add_match(document)

    # The results of get_match_str will appear in the alert text
    def get_match_str(self, match):
        return "%s logged in between %s and %s" % (
            match["username"],
            self.rules["time_start"],
            self.rules["time_end"],
        )

    # garbage_collect is called indicating that ElastAlert has already been run up to timestamp
    # It is useful for knowing that there were no query results from Elasticsearch because
    # add_data will not be called with an empty list
    def garbage_collect(self, timestamp):
        pass
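The time-window comparison in add_data can be tried in isolation. The following is a minimal sketch, not part of ElastAlert: `parse_hhmm` and `in_window` are hypothetical helper names, and `datetime.strptime` from the standard library stands in for `dateutil.parser.parse` so that the example runs without extra dependencies.

```python
from datetime import datetime, time


def parse_hhmm(value: str) -> time:
    """Parse an "HH:MM" string into a time object (hypothetical helper)."""
    return datetime.strptime(value, "%H:%M").time()


def in_window(login_time: time, time_start: str, time_end: str) -> bool:
    """Mirror the strict comparison used in ComplexRule.add_data."""
    return parse_hhmm(time_start) < login_time < parse_hhmm(time_end)


# A login at 21:30 falls inside the 20:00-23:00 window
evening_login = datetime(2021, 1, 1, 21, 30).time()
inside = in_window(evening_login, "20:00", "23:00")

# Both bounds are exclusive: a login at exactly 20:00 does not match
boundary_login = datetime(2021, 1, 1, 20, 0).time()
on_boundary = in_window(boundary_login, "20:00", "23:00")
```

Because both bounds are compared as times of day, a window that crosses midnight (for example 22:00 to 02:00) never matches with this comparison.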
Alert Module¶
A simple custom alert module, taken from the official ElastAlert documentation:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from elastalert.alerts import Alerter, BasicMatchString


class ComplexAlert(Alerter):
    # By setting required_options to a set of strings
    # You can ensure that the rule config file specifies all
    # of the options. Otherwise, ElastAlert will throw an exception
    # when trying to load the rule.
    required_options = set(['output_file_path'])

    # Alert is called
    def alert(self, matches):
        # Matches is a list of match dictionaries.
        # It contains more than one match when the alert has
        # the aggregation option set
        for match in matches:
            # Config options can be accessed with self.rule
            with open(self.rule['output_file_path'], "a") as output_file:
                # basic_match_string will transform the match into the default
                # human readable string format
                match_string = str(BasicMatchString(self.rule, match))
                output_file.write(match_string)

    # get_info is called after an alert is sent to get data that is written back
    # to Elasticsearch in the field "alert_info"
    # It should return a dict of information relevant to what the alert does
    def get_info(self):
        return {'type': 'Awesome Alerter',
                'output_file': self.rule['output_file_path']}
Enhancement Module¶
A simple custom enhancement module, taken from the official ElastAlert documentation:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from elastalert.enhancements import BaseEnhancement


class ComplexEnhancement(BaseEnhancement):
    # The enhancement is run against every match
    # The match is passed to the process function where it can be modified in any way
    # ElastAlert will do this for each enhancement linked to a rule
    def process(self, match):
        if 'domain' in match:
            url = "http://who.is/whois/%s" % (match['domain'])
            match['domain_whois_link'] = url
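To see what process does to a match without running ElastAlert, the same logic can be applied to plain dictionaries. This is only a sketch: `add_whois_link` is a hypothetical standalone function that copies the body of ComplexEnhancement.process, and the sample matches are made up for illustration.

```python
def add_whois_link(match: dict) -> dict:
    """Hypothetical stand-in for ComplexEnhancement.process.

    Like the original method, it mutates the match in place; the match is
    also returned here to make the result easy to inspect.
    """
    if "domain" in match:
        match["domain_whois_link"] = "http://who.is/whois/%s" % match["domain"]
    return match


# A match with a "domain" field gains a "domain_whois_link" field
with_domain = add_whois_link({"domain": "example.com", "username": "admin"})

# A match without a "domain" field is left unchanged
without_domain = add_whois_link({"username": "admin"})
```

In a rule file, ElastAlert references enhancements through the `match_enhancements` option, using the same dotted-path convention as `type`.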
Rule Loader Module¶
A simple custom rule loader module, taken from the official ElastAlert documentation:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pymongo import MongoClient
from elastalert.loaders import RulesLoader
import yaml


class ComplexRuleLoader(RulesLoader):
    def __init__(self, conf):
        super(ComplexRuleLoader, self).__init__(conf)
        self.client = MongoClient(conf['mongo_url'])
        self.db = self.client[conf['mongo_db']]
        self.cache = {}

    def get_names(self, conf, use_rule=None):
        if use_rule:
            return [use_rule]

        rules = []
        self.cache = {}
        for rule in self.db.rules.find():
            # safe_load is used because recent PyYAML versions require
            # an explicit Loader argument for yaml.load
            self.cache[rule['name']] = yaml.safe_load(rule['yaml'])
            rules.append(rule['name'])
        return rules

    def get_hashes(self, conf, use_rule=None):
        if use_rule:
            return [use_rule]

        hashes = {}
        self.cache = {}
        for rule in self.db.rules.find():
            # Cache the parsed rule so that get_yaml returns the same type
            # whether the cache was filled by get_names or by get_hashes
            self.cache[rule['name']] = yaml.safe_load(rule['yaml'])
            hashes[rule['name']] = rule['hash']
        return hashes

    def get_yaml(self, rule):
        if rule in self.cache:
            return self.cache[rule]

        self.cache[rule] = yaml.safe_load(self.db.rules.find_one({'name': rule})['yaml'])
        return self.cache[rule]
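The loader above expects each document in the `rules` collection to carry `name`, `yaml` and `hash` fields. How the hash is produced is not specified here; the sketch below assumes a SHA-1 digest of the YAML text, so that any edit to a rule changes its hash. `make_rule_document` is a hypothetical helper, and the resulting dict would still have to be inserted into MongoDB with pymongo.

```python
import hashlib


def make_rule_document(name: str, rule_yaml: str) -> dict:
    """Build a document shaped like those read by ComplexRuleLoader.

    Assumption: the hash is a SHA-1 hex digest of the YAML text.
    """
    digest = hashlib.sha1(rule_yaml.encode("utf-8")).hexdigest()
    return {"name": name, "yaml": rule_yaml, "hash": digest}


# Two versions of the same rule produce different hashes
original = make_rule_document("Punch Example Rule", "type: any\nindex: logstash-*\n")
edited = make_rule_document("Punch Example Rule", "type: any\nindex: other-*\n")
```

A content-derived hash is one simple way to let get_hashes report that a rule has changed between two polls.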
Deploying your custom module¶
Installing your module¶
Standalone mode¶
In standalone mode, put your PEX files in $PUNCHPLATFORM_INSTALL_DIR/extlib/elastalert/.
Deployed mode¶
For deployed mode, refer to this documentation.
Using your custom modules¶
config.yaml
es_host: localhost
es_port: 9200
writeback_index: elastalert
run_every:
  minutes: 2
buffer_time:
  minutes: 1
rules_folder: examples
rule.yaml
es_host: localhost
es_port: 9200
name: "Punch Example Rule"
type: "punch_plugins.rules.my_use_case.complex_rule.ComplexRule"
index: logstash-*
num_events: 50
timeframe:
  hours: 4
filter:
  - term:
      some_field: "some_value"
alert:
  - "email"
email:
  - "contact@punchplatform.com"
time_start: "20:00"
usernames:
  - "admin"
  - "userXYZ"
  - "foobaz"
# dateutil.parser.parse rejects an hour of 24, so the end of day is written as 23:59
time_end: "23:59"
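The `type` value is a dotted Python path: everything before the last dot names the module, and the last component names the class. A minimal sketch of how such a path can be resolved with `importlib` follows. `resolve_dotted_path` is a hypothetical name, not ElastAlert's own loader, and a standard library class is used as a stand-in so that the example runs without the plugin package installed.

```python
import importlib


def resolve_dotted_path(path: str):
    """Split "package.module.ClassName" and return the named attribute."""
    module_path, _, attr_name = path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# Standard library stand-in for
# "punch_plugins.rules.my_use_case.complex_rule.ComplexRule"
cls = resolve_dotted_path("collections.OrderedDict")
```

For the path in rule.yaml to resolve, the punch_plugins package has to be importable, which is why every directory in the package hierarchy contains an `__init__.py`.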
Execution
WORK_DIR=$(pwd)
ls "$WORK_DIR"
# The working directory is expected to contain:
#   example_configs/rule/config.yaml
#   example_configs/rule/rule.yaml
punchplatform-elastalert.sh \
    --start-foreground \
    --config "$WORK_DIR/example_configs/rule/config.yaml" \
    --rule example_configs/rule/rule.yaml \
    --additional-pex custom_rule-1.0.0.pex