public abstract class AbstractKafkaNode
extends org.thales.punch.libraries.storm.api.BaseInputNode
implements org.thales.punch.kafka.api.IRecordHandler<byte[],byte[]>
This is the base class for both the regular and the batch Kafka node. Refer to the KafkaInput and BatchKafkaInputNode documentation.
Both nodes accept most of the standard Kafka consumer properties. Refer to the Kafka consumer documentation for the complete list of available properties. Here is a quick list with their (Kafka) default values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mytenant.apache_httpd.test
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
The punch spout overwrites the following defaults (see the sketch below):
auto.offset.reset to earliest.
enable.auto.commit to false.
fetch.max.bytes to 1048576 (1 MiB), only for the regular KafkaSpout.
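To make these effective defaults concrete, here is a minimal, hypothetical kafka-clients sketch. It is not the node's own code: it simply sets the same property keys with the values the punch spout applies. The broker address is a placeholder, and the group id reuses the example value listed above.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaNodeDefaultsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address; the group id reuses the example value above.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mytenant.apache_httpd.test");

        // Defaults overwritten by the punch spout, as listed above.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "1048576"); // regular KafkaSpout only

        // Byte-array deserialization, matching value.deserializer in the list above.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // The consumer is now configured with the same effective defaults
            // described above; subscription and polling are illustrated in a later sketch.
        }
    }
}
```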
Modifier and Type | Field and Description
---|---
protected org.thales.punch.libraries.storm.api.StreamDeclaration | errorStream: the error stream is used to forward error documents.
protected boolean | failStop: true to make the spout exit in case it receives a failed tuple.
Constructor and Description
---
AbstractKafkaNode(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig, String kafkaClusterId, org.apache.logging.log4j.Logger subLogger): create a new Kafka spout.
Modifier and Type | Method and Description
---|---
protected org.thales.punch.libraries.storm.spout.impl.kafka.TupleId | getTupleId(Object o, boolean acked): every tuple is acked or failed, even when working with batches.
void | nextTuple()
void | open(Map conf, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector)
protected boolean | process(Object attachment, org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]> record, org.thales.punch.libraries.storm.spout.impl.kafka.KafkaBatchAttachement batchAttch): process a received record (see the sketch below).
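For orientation, the summary above maps onto the plain kafka-clients consumption pattern such a spout builds on: poll records, hand each one to a process-like callback, and commit offsets manually because enable.auto.commit is false. The sketch below only illustrates that underlying pattern, not the node's implementation; the topic name and handler are hypothetical.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RecordHandlingSketch {
    public static void main(String[] args) {
        // Configured as in the previous sketch; key names mirror the property list above.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mytenant.apache_httpd.test");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mytenant_apache_httpd")); // placeholder topic
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Stand-in for a process()-like callback: emit the record downstream
                    // and remember its offset so it can later be acked or failed.
                    handle(record);
                }
                // With enable.auto.commit=false, offsets are committed explicitly,
                // here once the whole poll batch has been handled.
                consumer.commitSync();
            }
        }
    }

    private static void handle(ConsumerRecord<byte[], byte[]> record) {
        int size = record.value() == null ? 0 : record.value().length;
        System.out.printf("partition=%d offset=%d value=%d bytes%n",
                record.partition(), record.offset(), size);
    }
}
```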
Methods inherited from class org.thales.punch.libraries.storm.api.BaseInputNode: ack, close, deactivate, declareOutputFields, fail, getPublishedStreams, regulate, sendLatencyRecord
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.thales.punch.kafka.api.IRecordHandler: onPartitionAssigned, onPartitionRevoked, onReceive, onTick
protected org.thales.punch.libraries.storm.api.StreamDeclaration errorStream
protected boolean failStop
public AbstractKafkaNode(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig, String kafkaClusterId, org.apache.logging.log4j.Logger subLogger)
Parameters:
spoutConfig - the punchplatform spout configuration. It includes the declared streams and fields.
kafkaClusterId - an id of the used Kafka cluster. This is used for metrics naming.
subLogger - a logger to make it easier to keep track of the child class.

public void open(Map conf, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector)
Specified by:
open in interface org.apache.storm.spout.ISpout
Overrides:
open in class org.thales.punch.libraries.storm.api.BaseInputNode
public void nextTuple()
Specified by:
nextTuple in interface org.apache.storm.spout.ISpout
protected boolean process(Object attachment, org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]> record, org.thales.punch.libraries.storm.spout.impl.kafka.KafkaBatchAttachement batchAttch)
Parameters:
attachment - the attachment object
record - the input Kafka record
batchAttch - optional additional longs, should it be required to add batch information

protected org.thales.punch.libraries.storm.spout.impl.kafka.TupleId getTupleId(Object o, boolean acked)
Parameters:
o - the object attached to the acked/failed tuple
acked - true if the tuple is acked

Copyright © 2022. All rights reserved.