Monday, 8 May 2017

Kafka Partition Leader and Controller

Zookeeper and Controller:

Every time a broker process starts, it registers itself with its ID in Zookeeper by creating an ephemeral node. When a broker loses connectivity to Zookeeper, the ephemeral node that the broker created at startup is automatically removed from Zookeeper. Kafka components that are watching the list of brokers are notified of the change.

The controller is one of the Kafka brokers. In addition to the usual broker duties, it is responsible for electing partition leaders among the replicas. The first broker that starts in the cluster becomes the controller by creating an ephemeral node in Zookeeper called /controller. The other brokers create a Zookeeper watch on the controller node, so they get notified of changes to this node. Kafka uses Zookeeper's ephemeral node feature to elect a controller and to notify the controller when nodes join and leave the cluster.
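The election described above can be sketched with an in-memory stand-in for Zookeeper. This is only an illustration: FakeZooKeeper, start_broker and the /brokers/ids/<id> path are assumptions made for the sketch, not Kafka or Zookeeper APIs.

```python
class FakeZooKeeper:
    """In-memory stand-in for Zookeeper (hypothetical, for illustration only)."""

    def __init__(self):
        self.nodes = {}      # path -> id of the broker that owns the ephemeral node
        self.watchers = {}   # broker id -> callback fired when /controller disappears

    def create_ephemeral(self, path, broker_id):
        # Like a real znode, creation fails if the path already exists.
        if path in self.nodes:
            return False
        self.nodes[path] = broker_id
        return True

    def session_lost(self, broker_id):
        # Ephemeral nodes vanish together with the session that created them.
        self.watchers.pop(broker_id, None)
        lost = [p for p, owner in self.nodes.items() if owner == broker_id]
        for path in lost:
            del self.nodes[path]
        if "/controller" in lost:
            for notify in list(self.watchers.values()):
                notify()


def start_broker(zk, broker_id):
    # Register the broker, then race for the controller node.
    zk.create_ephemeral(f"/brokers/ids/{broker_id}", broker_id)

    def try_become_controller():
        zk.create_ephemeral("/controller", broker_id)

    try_become_controller()
    zk.watchers[broker_id] = try_become_controller


zk = FakeZooKeeper()
for broker_id in (1, 2, 3):
    start_broker(zk, broker_id)
first_controller = zk.nodes["/controller"]    # the broker that started first

zk.session_lost(1)                             # broker 1 loses connectivity
second_controller = zk.nodes["/controller"]   # a surviving broker took over
```

Whichever surviving broker is notified first wins the second election. In this sketch that is simply the order in which the watches were registered.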

When the controller announces that a partition has a new leader, it sends a LeaderAndIsr request to the new leader and the followers.

Request Processing:

All requests sent to the broker from a specific client will be processed in the order they were received. Both produce requests (from producers) and fetch requests (from consumers and followers) have to be sent to the leader replica of a partition.

Kafka clients use another request type, the metadata request, to find out where to send these requests.
A metadata request includes a list of topics the client is interested in. It can be sent to any broker, since all brokers have a metadata cache that contains this information. Clients typically cache this information and need to refresh it occasionally by sending new metadata requests.

In-Sync Replica (ISR)

In order to stay in sync with the leader, the replicas send the leader Fetch requests. Only in-sync replicas are eligible to be elected as partition leaders in case the existing leader fails.

The producer's "acks" setting determines when a message counts as "written successfully": when it was accepted by just the leader (acks=1) or by all in-sync replicas (acks=all). Consumers can only read messages that were written to all in-sync replicas.

Saturday, 6 May 2017

Why Kubernetes?


What is a Container?

At its core, a container is an allocation, partitioning, and assignment of host resources such as CPU shares, network I/O, bandwidth, block I/O, and memory. Kernel-level constructs jail off, isolate, or “contain” these resources so that specific running services and namespaces can use them exclusively without interfering with the rest of the system.

Commonly known as “operating-system-level virtualization”, containers differ from hypervisor-level virtualization. The main difference is that the container model eliminates the hypervisor layer and the redundant OS kernels, binaries, and libraries typically needed to run workloads in a VM.
The New Way is to deploy containers based on operating-system-level virtualization rather than hardware virtualization. These containers are isolated from each other and from the host: they have their own filesystems, they can’t see each other’s processes, and their computational resource usage can be bounded. They are easier to build than VMs, and because they are decoupled from the underlying infrastructure and from the host filesystem, they are portable across clouds and OS distributions.

Kubernetes can schedule and run application containers on clusters of physical or virtual machines. However, Kubernetes also allows developers to ‘cut the cord’ to physical and virtual machines, moving from a host-centric infrastructure to a container-centric infrastructure, which provides the full advantages and benefits inherent to containers.


If an application can run in a container, it should run great on Kubernetes. Additionally, Kubernetes is not a mere “orchestration system”; it eliminates the need for orchestration.


Wednesday, 3 May 2017

Kafka Consumer Challenges

Consumer Retry
1. When you encounter a retriable error, one option is to commit the last record you processed successfully. Then store the records that still need to be processed in a buffer, use the consumer pause() method to ensure that additional polls won't return data, and keep trying to process the buffered records. Once you succeed, or once you have retried enough times and give up (log an error in that case), call resume() to unpause the consumer, and the next poll will return new records to process.

2. A second option when encountering a retriable error is to write the record to a separate topic and continue. A separate consumer group can be used to handle retries from the retry topic.

Long Processing Times

Processing records can take a long time, but the consumer can't stop polling for more than a few seconds. You must continue polling so the client can send heartbeats to the broker and no rebalance is triggered. A common pattern is to hand off the data to a thread pool, with multiple threads when possible. You can then pause the consumer and keep polling without fetching additional data until the worker threads have finished.
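A minimal sketch of this hand-off pattern follows. FakeConsumer is a hypothetical in-memory stand-in that only mimics poll(), pause() and resume(); it is not the Kafka client, and the sleep durations are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class FakeConsumer:
    """Hypothetical in-memory consumer; only mimics poll/pause/resume."""

    def __init__(self, records):
        self._records = list(records)
        self.paused = False
        self.polls = 0

    def poll(self):
        self.polls += 1        # every poll keeps the consumer "alive"
        time.sleep(0.001)      # stands in for the poll timeout
        if self.paused or not self._records:
            return []
        return [self._records.pop(0)]

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False


def process_batch(batch):
    time.sleep(0.02)           # pretend this is expensive work
    return [record.upper() for record in batch]


def run(consumer, pool):
    results, in_flight = [], None
    while True:
        batch = consumer.poll()
        if batch:
            consumer.pause()   # keep polling, but fetch nothing new
            in_flight = pool.submit(process_batch, batch)
        elif in_flight is None:
            return results     # nothing pending and nothing left to read
        if in_flight is not None and in_flight.done():
            results.extend(in_flight.result())
            in_flight = None
            consumer.resume()  # the next poll returns new records


with ThreadPoolExecutor(max_workers=2) as pool:
    consumer = FakeConsumer(["a", "b", "c"])
    processed = run(consumer, pool)
```

The loop polls many more times than there are records, which is the point: polling continues while the worker thread is busy.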

Exactly Once Delivery

1. Writing results to a system that has some support for unique keys. Either the record itself contains a unique key, or you can create a unique key from the topic, partition, and offset combination, which uniquely identifies a Kafka record. When the data store receives a duplicate, it simply overwrites the existing record with the same key. This pattern is called idempotent writes.

2. Writing results to a system that has transactions. Write the records and their offsets in the same transaction. When starting up, retrieve the offsets of the latest records written to the external store, and then use consumer.seek() to start consuming again from those offsets.
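Both patterns can be sketched against an in-memory SQLite database. The table layout, the column names and the sample records are assumptions made for illustration; a real application would pass the offsets returned by next_offsets() to consumer.seek().

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE results ("
    " topic TEXT, part INTEGER, off INTEGER, value TEXT,"
    " PRIMARY KEY (topic, part, off))"   # topic + partition + offset is the unique key
)


def store(records):
    # One transaction per batch: commits on success, rolls back on error.
    with conn:
        conn.executemany(
            "INSERT OR REPLACE INTO results VALUES (?, ?, ?, ?)", records
        )


def next_offsets():
    # On startup: the offset to seek to for every partition seen so far.
    rows = conn.execute(
        "SELECT topic, part, MAX(off) + 1 FROM results GROUP BY topic, part"
    )
    return {(topic, part): nxt for topic, part, nxt in rows}


batch = [("orders", 0, 41, "a"), ("orders", 0, 42, "b"), ("orders", 1, 7, "c")]
store(batch)
store(batch[1:])   # redelivery after a crash: same keys, so no duplicates
row_count = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
resume_from = next_offsets()
```

Because the offset is part of the row that is written, the result and the consumer position can never disagree.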

Sunday, 30 April 2017

Kafka Connect

Kafka, being a streaming data platform, acts as a giant buffer that decouples the time-sensitivity requirements between producers and consumers. Kafka itself applies back-pressure on producers, and consumption rate is driven entirely by the consumers. If producer throughput exceeds that of the consumer, data will accumulate in Kafka until the consumer can catch up.

Kafka can provide "at least once" on its own, and "exactly once" when combined with an external data store that has a transactional model or unique keys.

ETL vs ELT

ETL: Extract-Transform-Load, the data pipeline is responsible for making modifications to the data as it passes through.

ELT: Extract-Load-Transform, the data pipeline does only minimal transformation (e.g. data type conversion), with the goal of making sure the data that arrives at the target is as similar as possible to the source data. A data-lake architecture preserves as much of the raw data as possible and allows downstream apps to make their own decisions regarding data processing and aggregation.


Kafka Connect vs. Client APIs

Use Connect when you are connecting Kafka to datastores that you did not write and whose code you cannot or will not modify. Connect provides out-of-the-box features like configuration management, offset storage, parallelization, error handling, a REST API, etc.

Connector
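The connector is responsible for:

  1. Determining how many tasks will run for the connector
  2. Deciding how to split the data-copying work between the tasks
  3. Getting configurations for the tasks from the workers and passing them along

All tasks are initialized by receiving a context from the worker. The source context includes an object that allows the source task to store the offsets of source records. The sink context includes methods that let the connector control the records it receives from Kafka, which is used for applying back-pressure, retrying, and storing offsets externally for exactly-once delivery.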

Worker

Kafka Connect's worker processes are the container processes that execute the connectors and tasks.
If a worker process crashes, other workers will recognize that and will reassign the connectors and tasks that ran on that worker.

Connectors and tasks are responsible for the "moving data" part of data integration, while the workers are responsible for the REST API, configuration management, reliability, and load balancing.

Offset Management

For source connectors, the records that the connector returns to the Connect workers include a logical partition and a logical offset in the source system, e.g. a partition can be a file and an offset can be a line number, or a partition can be a database table and an offset can be the ID of a record.
The worker writes the records to Kafka and stores their offsets, typically in a Kafka topic. This allows connectors to start processing events from the most recent stored offset after a restart or a crash.
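The file-and-line-number example can be sketched as follows. The poll and commit functions, the record fields, and the in-memory "files" are hypothetical stand-ins, not the Connect API.

```python
def poll(source_files, stored_offsets, max_records=2):
    """Read up to max_records new lines per file, starting at the stored offset."""
    records = []
    for filename, lines in source_files.items():
        start = stored_offsets.get(filename, 0)
        for line_no in range(start, min(start + max_records, len(lines))):
            records.append({
                "source_partition": filename,   # logical partition: the file
                "source_offset": line_no + 1,   # logical offset: next line to read
                "value": lines[line_no],
            })
    return records


def commit(records, stored_offsets):
    """What the worker does after the records were written to Kafka."""
    for record in records:
        stored_offsets[record["source_partition"]] = record["source_offset"]


source_files = {"app.log": ["line 1", "line 2", "line 3"]}
offsets = {}

first = poll(source_files, offsets)
commit(first, offsets)

# Simulated restart: only the stored offsets survive.
second = poll(source_files, dict(offsets))
```

After the restart the task continues with the third line instead of re-reading the file from the top.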






Monday, 24 April 2017

Avro Schema Registry


It’s wasteful to send the schema of the data along with the data each time (as we do with JSON). It’s neither memory nor network efficient. It’s smarter to just send an ID along with the data, which the other parties use to find out how the data is encoded.

On serialization: we contact the SR to register (if not already) the Avro schema of the data we’re going to write (to get a unique ID). We write this ID as the first bytes in the payload, then we append the data. A schema has a unique ID (so multiple messages use the same schema ID).

On deserialization: we read the first bytes of the payload to find the ID of the schema that was used to write the data. We contact the SR with this ID to grab the schema if we don’t have it yet, then we parse it into an org.apache.avro.Schema and read the data using the Avro API and this schema (or we can read with another compatible schema if we know it’s backward/forward compatible).
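A minimal sketch of this framing, assuming the layout used by the Confluent serializers: one zero "magic" byte, then the schema ID as a 4-byte big-endian integer, then the Avro-encoded data. CachingSchemaClient and the dictionary that plays the role of the registry are hypothetical, and the Avro payload is hand-encoded rather than produced by an Avro library.

```python
import struct

MAGIC_BYTE = 0  # assumed framing: 1 magic byte + 4-byte big-endian schema ID


def frame(schema_id, avro_payload):
    """Prefix already-serialized Avro bytes with the schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message):
    """Split a message into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unknown framing")
    return schema_id, message[5:]


class CachingSchemaClient:
    """Asks the registry for a schema only the first time an ID is seen."""

    def __init__(self, fetch_schema):
        self._fetch_schema = fetch_schema
        self._cache = {}
        self.lookups = 0

    def schema_for(self, schema_id):
        if schema_id not in self._cache:
            self.lookups += 1
            self._cache[schema_id] = self._fetch_schema(schema_id)
        return self._cache[schema_id]


registry = {42: '{"type": "string"}'}      # stands in for the HTTP endpoint
client = CachingSchemaClient(registry.__getitem__)

message = frame(42, b"\x04hi")             # Avro encoding of the string "hi"
schema_id, payload = unframe(message)
schema = client.schema_for(schema_id)
client.schema_for(schema_id)               # second call is served from the cache
```

Every message carries five bytes of overhead regardless of how large the schema is.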

A subject represents a collection of compatible (according to custom validation rules) schemas in the SR. The schema registry depends on Zookeeper and looks for Kafka brokers. If it can’t find one, it won’t start.

By default, the client caches the schemas it sees to avoid querying the HTTP endpoint each time. Schema validation is done on the schema registry itself according to its configured compatibility level (none, backward, forward, full).

A Kafka message is a key-value pair. As a consequence, a topic can have two schemas, one for the key and one for the value.

Avro with a schema registry provides:
  • Space and network efficiency thanks to a reduced payload.
  • Schema evolution intelligence and compatibility enforcement.
  • Schema registry visibility, centralization, and reuse.
  • Kafka and Hadoop compliance.

Avro Serialization/Deserialization

Specific Avro classes mean that we use Avro's code generation to generate the object class from the Avro schema file, then populate an instance and produce it to Kafka. The Avro Maven plugin is configured in the pom.xml file; run $ mvn clean package to generate the Java class files.

With generic Avro, by contrast, there is no code generation and only the Avro schema file is needed. The producer sends GenericRecord objects.


Reference:
https://www.ctheu.com/2017/03/02/serializing-data-efficiently-with-apache-avro-and-dealing-with-a-schema-registry/
https://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/
http://docs.confluent.io/3.1.2/schema-registry/docs/serializer-formatter.html
https://softwaremill.com/dog-ate-my-schema/

Monday, 17 April 2017

Config Log4j to Send Kafka logs to Syslog

1. Configure Syslog Daemon for UDP Input

$ sudo vim /etc/rsyslog.conf

Uncomment the first two lines to accept UDP messages on the default port 514, and add the third line to write the local7 facility to /var/log/kafka.log.
$ModLoad imudp
$UDPServerRun 514
local7.*      /var/log/kafka.log

2. Restart the rsyslog service so the changes take effect

$ sudo service rsyslog restart

3. Open your log4j.properties file and add a SYSLOG appender:

$ vi /opt/kafka/config/log4j.properties

log4j.rootLogger=INFO, stdout, SYSLOG
log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
log4j.appender.SYSLOG.SyslogHost=localhost
log4j.appender.SYSLOG.Facility=LOCAL7
log4j.appender.SYSLOG.Threshold=INFO
log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
log4j.appender.SYSLOG.layout.ConversionPattern=kafka-broker: [%d] %p %m (%c)%n

Log4j comes with a SyslogAppender out of the box. The configuration above makes it write these messages as UDP over localhost to the syslog daemon. The first field in the conversion pattern is the syslog appname.
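Before restarting Kafka, it can help to check that the syslog daemon really accepts UDP messages on the local7 facility. The following is a rough sketch using Python's standard SysLogHandler; the function name and the message text are made up for this check, and 127.0.0.1 is used in place of localhost.

```python
import logging
import logging.handlers


def send_test_message(host="127.0.0.1", port=514, text="syslog test message"):
    """Send one INFO message to the local7 facility over UDP."""
    handler = logging.handlers.SysLogHandler(
        address=(host, port),
        facility=logging.handlers.SysLogHandler.LOG_LOCAL7,
    )
    # Same appname prefix as in the log4j conversion pattern.
    handler.setFormatter(logging.Formatter("kafka-broker: %(message)s"))
    logger = logging.getLogger("syslog-smoke-test")
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    try:
        logger.info(text)
    finally:
        logger.removeHandler(handler)
        handler.close()
```

After calling send_test_message(), the message should show up in /var/log/kafka.log if steps 1 and 2 worked.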

4. Restart Kafka
$ sudo stop kafka-broker
$ sudo start kafka-broker

Monday, 10 April 2017

Kafka Streams

Lambda vs Kappa Architecture


  • Lambda: combines a scalable, high-latency batch system that can process historical data with a low-latency stream processing system that can't reprocess results.
  • Kappa: use Kafka to retain the full log of the data you want to be able to reprocess. Retaining large amounts of data in Kafka is a perfectly natural and economical thing to do and won't hurt performance.


The Kafka Streams DSL is composed of two main abstractions: the KStream and KTable interfaces.
  • A KStream is a record stream where each key-value pair is an independent record. Later records in the stream don’t replace earlier records with matching keys. 
  • A KTable on the other hand is a “changelog” stream, meaning later records are considered updates to earlier records with the same key.
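The difference between the two interpretations can be shown with plain Python collections. This is only an analogy for the semantics; the keys and values are made up and no Kafka Streams API is involved.

```python
records = [("alice", 1), ("bob", 5), ("alice", 3)]

# Record stream (KStream-like): every record is an independent fact.
stream_view = list(records)

# Changelog stream (KTable-like): a later record replaces the earlier
# record with the same key.
table_view = {}
for key, value in records:
    table_view[key] = value

alice_sum_in_stream = sum(v for k, v in stream_view if k == "alice")
alice_in_table = table_view["alice"]
```

Summing alice's values over the stream counts both records, while the table only knows her latest value.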
Local State Store:

Kafka Streams provides an efficient way to model the application state.
The state store is partitioned the same way as the application's key space. As a result, all the data required to serve the queries that arrive at a particular application instance is available locally in the state store shards. Kafka Streams provides fault tolerance for this local state store by transparently logging all updates made to the state store to a highly available and durable Kafka topic.
Kafka Streams uses Kafka as a commit log for its local, embedded database.

The aggregation is done on each instance with local state. Then the results are written to a new topic with a single partition, which is read by a single application instance. This type of multi-phase processing is very familiar to those who have written MapReduce code.


Interactive Queries allow us to treat the stream processing layer as a lightweight, embedded database and to query the state of the stream processing application directly, without needing to materialize that state to external databases or storage.

Kafka Streams simplifies the stream processing architecture by eliminating the need for a separate cluster. These embedded databases act as materialized views of logs.
Materialized views provide better application isolation because they are part of an application's state, and they also provide better performance.

Interactive Queries enable faster and more efficient use of the application state. There is no duplication of data between the store doing aggregation for stream processing and the store answering queries.

Discovering any instance's stores

In Kafka Streams, each instance may expose its endpoint information as metadata to other instances of the same application. The IQ APIs allow a developer to obtain the metadata for a given store name and key, and to do that across the instances of an application. By examining the metadata, we can then discover which instance hosts the store that holds a particular key.

Event streams are ordered, while records in a table are always considered unordered.
Events, once they have occurred, can never be modified. Instead, an additional event is written to the stream, recording a cancellation of the previous transaction.

Streams contain a stream of events and each event caused a change. A table contains a current state of the world, a state that is a result of many changes.

If we can capture all the changes that happen to the database table in a stream of events, we can have our stream processing job listen to this stream and update the cache based on database change events.

Stream-table join: a stream is joined with a table to enrich every event with information from the table. The table itself is represented by a stream of changes to a locally cached copy. This is similar to joining a fact table with a dimension table.
Stream-stream join: joins events from two streams that have the same key and occurred within the same time window, which is why it is also called a windowed join. Both streams are partitioned on the same keys, which are also the join keys.
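Both kinds of join can be sketched on plain Python data. The field names, timestamps and the 10-unit window are assumptions made for illustration, and the nested loop stands in for the windowed state a real stream processor would keep.

```python
def stream_table_join(events, table):
    """Enrich each event with the table row that has the same key."""
    return [
        {**event, **table[event["user"]]}
        for event in events
        if event["user"] in table
    ]


def windowed_join(left, right, window):
    """Pair records with the same key whose timestamps are within `window`."""
    joined = []
    for l_key, l_ts, l_value in left:
        for r_key, r_ts, r_value in right:
            if l_key == r_key and abs(l_ts - r_ts) <= window:
                joined.append((l_key, l_value, r_value))
    return joined


# Stream-table: the locally cached table is built from a stream of changes.
profile_changes = [("u1", {"zip": "10001"}), ("u1", {"zip": "94105"})]
profiles = {}
for user, row in profile_changes:
    profiles[user] = row

clicks = [{"user": "u1", "page": "/home"}, {"user": "u9", "page": "/faq"}]
enriched = stream_table_join(clicks, profiles)

# Stream-stream: (key, timestamp, value) records from two streams.
searches = [("u1", 100, "shoes"), ("u2", 105, "hats")]
ad_clicks = [("u1", 103, "ad-7"), ("u2", 300, "ad-9")]
matches = windowed_join(searches, ad_clicks, window=10)
```

The stream-table join here drops events without a matching row, like an inner join.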

Kafka Streams scales by allowing multiple threads of execution within one instance of the application, and by supporting load balancing between distributed instances of the application. The number of tasks is determined by the Streams engine and depends on the number of partitions in the topics. Each task is responsible for a subset of the partitions. The developer of the application can choose the number of threads each application instance will execute. Each task independently processes events from its partitions and maintains its own local state with the relevant aggregates.

Kafka Streams assigns all the partitions needed for one join to the same task, so this task can consume from all the relevant partitions and perform the join independently. Kafka Streams requires that all topics that participate in a join operation have the same number of partitions and be partitioned based on the join key. Instead of shuffling, Kafka Streams re-partitions by writing the events to a new topic with the new keys and partitions. This reduces dependencies between different parts of a pipeline.
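A rough sketch of how partitions map to tasks and tasks map to threads. The round-robin distribution, the topic names and the thread names are assumptions made for illustration; the real assignment logic in Kafka Streams is more involved.

```python
def build_tasks(topic_partitions):
    """One task per partition number, covering that partition in every topic."""
    counts = set(topic_partitions.values())
    if len(counts) != 1:
        raise ValueError("topics in a join must have the same number of partitions")
    partitions = counts.pop()
    return [
        [(topic, p) for topic in sorted(topic_partitions)]
        for p in range(partitions)
    ]


def assign(tasks, threads):
    """Spread task ids over all threads of all instances (simple round-robin)."""
    assignment = {thread: [] for thread in threads}
    for task_id in range(len(tasks)):
        assignment[threads[task_id % len(threads)]].append(task_id)
    return assignment


tasks = build_tasks({"clicks": 4, "searches": 4})
threads = ["instance-a/thread-1", "instance-a/thread-2", "instance-b/thread-1"]
assignment = assign(tasks, threads)
```

Task 2, for example, owns partition 2 of both topics, so it can join them without talking to any other task.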

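A GlobalKTable is more of a service concept, while a KTable is more of a streaming concept. A GlobalKTable:

  • Has all partitions replicated to all nodes.
  • Supports N-way joins.
  • Blocks until it is initialized.
  • Doesn't trigger processing (it acts as a reference resource).

By default, Kafka Streams backs up each state store to a changelog topic in Kafka. Standby copies on other instances are optional (num.standby.replicas, which is 0 by default).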