Skip to content

Design Issues Highlights

Sizing Process

Forewarn

There is no such thing as a unique "sizing method", as the platform layout itself (even before sizing) is also dependent on the requirements (especially security, performance, desired or unnecessary platform features, available hardware/cloud usage...)

So these here are more sizing-related highlights of things to mind when you actually design a solution, not only size it.

Be careful that all sizing numbers provided in this chapter are just rough order of magnitude, and not for fine sizing (which requires applicable projects/data reference and/or small-scale testing).

Checklist

  • What is the average over a whole week of managed number of incoming documents. And what is their average input size ?
  • What is the peak hours maximum real-time incoming rate (averaged on a few minutes) ?
  • What stages of processing will you apply to most of your data (e.g. parsing, geographic transformation, enrichment, correlation to inventory data, stateful correlation to sessions information, geographic indexing, keyword/addresses indexing, pre-aggregation for browseable aggregated metrics generation, ...)
  • Do you need near real-time for part of the services (e.g. forwarding to a third-party subsystem, visibility in Kibana) ?
  • what is the needed long-term retention duration (for allowing mass extraction, re-injection, raw pattern search). 1 year ?
  • what is the online searchable data desired time depth (1 month ?)
  • What level of data availability do you need in case of single server failure ?
  • Do you need archive (long-term) re-injection into Elasticsearch for late in-depth analysis of part of past data ? If yes, do you have (small) delay requirements when doing such re-injection ?

CPU consumers

The main cpu consumers are: - log/events processor ( around from 1KEPS to 30KEPS per cpu for each task ==> to be tested one at a time, possibly using punch injector tool). - documents indexing phase in Elasticsearch ( around 1KEPS per vcpu, for 1KB documents ==> to be benched depending on the fields indexing settings ) - spark/pyspark based aggregation/correlation batches (take them into account or dedicate servers for this)

Then there are smaller loads, less easy to unit-test: - TLS ciphering load of all data at move (if activated) - kafka compression/decompression - Lumberjack compression/decompression - kafka queues handling (replication/JVM GC)

ALL these loads are roughly proportional to the throughput, except the Spark/Pyspark batches, that can use non-linear operators (like joining/sorting). So benching on a not-too-small throughput (a few KEPS) allows to size the bigger throughputs.

Tips

  • One node less !

    When you size for single-failure resistance of the platform, it means you must comply with most of the performance with one less node active (same averaged/real-time processing targeted). So the CPU will be provided by the remaining servers.

    Sometimes this implies much "bigger" nodes characteristics when you try for few nodes than when you accept to have more servers. As an example, a 2-nodes cluster need 100% more powerful nodes than a 3-nodes cluster. So at the end, check if adding a server and reducing the power of all of them is not an overall money gain. Because it sure will produce a more stable and balanced cluster (less unbalance/re-balance when one node is down in a bigger cluster !

  • For databases, Bigger/Denser nodes often mean less querying performance and more useless CPU ! Sure you can get a 300PB single server nowadays. But beware the IOwaits upon querying so much data. So this kind of density is (maybe) good for your flat files append-only archiving, but not for your Elasticsearch cluster. Elasticsearch clusters are more often more responsive by having more, smaller nodes for parallel shards processing. But as explained in Elasticsearch sizing, only tests will actually tell what is the limit on a chosen hardware.

  • For compute farms (Shiva/Storm processing cluster), Bigger servers can mean less balancing issues

  • When sizing for queries, remember to purge OS cache by specific OS actions, otherwise you may measure the efficiency of the OS File cache (which can be useful also in real case, but then be sure that your nodes RAM will hold all the data you will be querying regularily !)

  • When sizing CPU consumption, always discard the metrics of the first minutes of the load after starting up the full chain. This is because Java JVMs in 'server' configuration will automatically optimize their throughput by on the fly compilation, but this will be triggerd only when the load has been observed for sufficiently long by the JVM, AND this will consume cpu time and slow the processing when this compile phases occur. So your JVM will potentially stabilize some minutes (say about 1 or 2) after your load is ongoing. Of course, once optimized, the performance remains the same for later load tests, as long as you do not restart the JVM (punchlines, kafka, elasticsearch...).

  • When sizing for storage Never try to 'guess' the Elasticsearch storage based on your raw input data size. Unless you have applicable tables from other projects with similar kind of data AND indexing choices, you have to test for true on a data subset !

The most common sizing order

  • Size your total needed ES storage given the retention, replication, "index data size measured computed from a set of 100000 documents"
  • Size your total needed indexing CPU by measuring on a shard with a significant indexing flow (at least 2KEPS), and taking into account your averaged or peak hour flow (depending on the near-real-time requirements)
  • Size your total (ES+OS CACHE of ES servers) RAM based on advised ratio to the storage(see Elastic cloud) from 1/10 (B.I. and low latency querying) to 1/100 (log management with hihgher response time) depending on the querying use case. If you really aim at a fast Kibana user experience, then the related data should probably be SSD-stored.
  • Compute the total processing cpu you'll need, and how much RAM per processing CPU (often 1GB-2GB per vcpu at least, but may be higher if you are using in-memory enrichment .resources)

  • Decide how much storage your long-term/archives will need (text files are compressed with a 10-15 ratio, but text format may be 2-3 times more verbose as your input data).

  • Add up some resource for your kafka clusters (compute the disk need for a 7 day, replicated retention. Count the time your data will go through kafka during your overall process ). Remember that kafka-stored data is often processed, and therefore can be 3 times the original input document (for example because of parsing a raw text into a structured json). BUT kafka will come with a 4 to 6 times compression. Best again is do a small scale test to be sure of the effective processing/enrichment/compression impacts on storage.

  • Because processing uses very few storage, and Zookeeper/kafka uses few cpu/ram as compared to storage, try mutualizing those on the same physical servers if the security choices (n-tiering) allows.

  • Then decide if you can mutualize servers (relying on virtualization) for the processing+Kafka and elastic layer, in order to best use the cheapest applicable RAM/CPU/Storage if you have a set of negotiated catalog.

  • When you do simulation variants, remember to count 1 node more for CPU/RAM when computing each cost, for the failure resilience.

N-Tiering for (real?) security hardening

Kafka clusters unique security realm

At the time being, Punch is not making use of Kafka internal ACL features, allowing to distinguish access rights to specific topics for different client application nodes.

This means that any compromised node that has access to a Kafka cluster can potentially be used to access, alter or destroy other queues stored in the same Kafka cluster.

Mitigation:

if you have separate "external/front" data access flows or data ingestion servers that write to Kafka, and you want to improve isolation of consequences to each other in case of a compromised data flow or ingestion server, it may not be sufficient to dedicate separate servers to the different external flows, but you may want to

  • dedicate Kafka clusters to each 'front' group of servers when related to separate external security areas (like multiple datacenters) These dedicate Kafka clusters need not be hosted on distinct servers or VMs, as you can chose distinct ports for co-hosting kafka clusters.

  • be sure there are firewalling rules or certificate-based differenciation preventing access to a 'safer' zookeeper node from a front node (of any application), or between 'front' areas to kafka nodes in other security areas/tiers.

Zookeeper unique security realm / Kafka clusters isolation relies on Zookeeper

Although Zookeeper allows for TLS-secured API in Punchplatform configuration (for Kafka use), it has (in punch usage) no internal ACL distinguishing one client process rights from the others.

This means that any compromised Kafka or Storm server node can be potentially used to write/erase data from within a zookeeper cluster, even for other application clusters (other Kafka clusters or storm clusters relying on the same zookeeper server)

This can be used to destroy Kafka data, disrupt or hamper Kafka/Storm workings to break the overall service.

Mitigation:

  • whenever you choose to have separate Kafka clusters between "tiers" in your system, for isolation rationale, it is better that they do not share the same Zookeeper cluster

  • make sure only Kafka/storm nodes have access to their associated zookeeper cluster ports

Shiva clusters and Punch rely on Kafka

Issuing commands to shiva clusters (to start/stop tasks) and intra-cluster signalling in the shiva cluster (leader assigning tasks to runner nodes) both rely on Kafka queues (see Shiva protocol).

At the time being, Punch is not making use of Kafka internal ACL features, allowing to distinguish access rights to specific topics for different client application nodes.

This means that any compromised Shiva runner node can potentially be used to alter/inject tasks run inside a cluster, up to potentially assigning tasks to a chosen other runner node.

Mitigation:

If you want to achieve a n-tiered architecture to reduce impact of compromised 'front' servers,

  • be sure to have separate shiva clusters for your different layers. In addition,
  • try to ensure their isolation dependencies (kafka, zookeeper) are not shared between your security tiers
  • protect against access to kafka or zookeeper from "less safe" network areas/tiers using firewalling rules or differentiated TLS certificates/authorities.

Metricbeat and Shiva daemons on all servers may use Elasticsearch for metrics logging

The Metricbeat daemon captures OS-level metrics, useful for platform monitoring, capacity managing, performance tuning and incidents troubleshooting.

It is very easy to allow direct access to an Elasticsearch instance used for centralizing all these metrics by WEB requests from these daemons to the ES Web API.

Same goes for any Shiva daemon configured to run tasks on servers (e.g. front servers tasks to receive logs from external sources). Such daemon sends metrics and captured tasks logs to be indexed into a central Elasticsearch cluster for easy monitoring/troubleshooting of tasks.

This means any compromised server could be used as an attack mean on this Elasticsearch cluster, to try to alter/destroy/access other data from this Elasticsearch (metrics informing about servers, application logs, customer-related business information).

Mitigation:

There are multiple hardening actions you can chose to protect against such attacks

  • Separate "business" and "monitoring" data sets in two different Elasticsearch cluster (the monitoring one being usually the smallest). Make sure that "business data" cluster is hosted in the lower tier, and not accessible from the front tiers machines.

  • Activate Opendistro security deployment, with differentiated access roles. Configure a metricbeat role allowing only to push data in the 'platform-metricbeats-*' indices, and assign authentication credentials to the metricbeat servers (user/password) associated to this restricted role.

  • Configure your front-tier servers metricbeat daemons and your shiva daemons to not directly report into Elasticsearch, but instead to report to front-hosted kafka queue. Then have deeper tiers run tasks to fetch these metrics documents from kafka and put in in the monitoring backend (Elasticsearch)

    Such pattern is described in Central Log Management reference pipelines/quueing architecture for monitoring

Database Design Considerations

Elasticsearch is strong for:

  • searching matching in zillion of documents
  • scaling horizontally for High quantities of documents that exceed what you want to store/query on a single server, while managing replication/sharding for you in an operation-friendly way.
  • aggregating data on masses of selected documents
  • searching/scoring/selecting approximate matches (multiple keyword occurrences frequencies, geographic proximity)

Elasticsearch cannot

  • do joins
  • do SQL
  • retrieve ALL data in "aggregation" queries
  • do transactions
  • guarantee that when you read, you always see what you've just written

For missing "joins", you can somewhat compensate by using external Spark/Pyspark batches, but not with the efficiency of a Relational DB.

Kibana HMI is nice for dashboarding, browsing and filtering into your data. Dashboards can be designed for specific tasks in mind. You can filter your data just by clicking on values in views in your dashboard.

But Kibana filters in a dashboard applies to all the data presented on the dashboard. This limits the ergonomics of dashboards for some specific tasks (in addition to not having sql/joins).

So depending on your use cases, you may want to - have specific end-user HMI development for custom user interaction tailored to a business task - need relational-type of queries for easier coding of such external tools or application.

So Elasticsearch+Kibana is powerful, but you may want to supplement or trade them at least for part of your data with some other combination (like Clickhouse relational engine that Punch can also deploy/leverage, or like Superset generic BI opensource HMI).

If your relational querying needs are limited, and you have scalability/replication in mind, then Elasticsearch can be the good choice.

And Kibana/Elasticsearch stays a good one-for-all solution to manage the metrics, logs and monitoring data, that exists in the punch platform (or any platform) ; at the moment, Punch is not able to deliver all its monitoring/housekeeping features without at least a small elasticsearch instance.