On GitHub: kafka101/data2day-slides

data2day 2015

Massive Datenströme mit Kafka

Frank Wisniewski - @ultraknackig
Lars Pfannenschmidt - @leastangle

RabbitMQ? ActiveMQ? Crimes like JMS? And Kafka...? A distributed, partitioning, replicating service for data streams.

$ whoami

Frank Wisniewski

  • Futsal playing Engineer @Intuit
  • Event-Driven & Real Time Applications
  • @ultraknackig

Lars Pfannenschmidt

$ ls -al

  • Motivation
  • Concepts
      • Topics & Partitions
      • Offset Management
      • Log Compaction
  • Example
      • Producer
      • Consumer
  • Performance
  • Summary

Motivation

high wire 2 by Graeme Maclean - Some rights reserved CC BY 2.0
Internal project at LinkedIn; a unified platform for processing large data streams.

Chaos

„The Log: What every software engineer should know about real-time data's unifying abstraction“ by Jay Kreps
Uni- and bidirectional dependencies criss-crossing everywhere; unmaintainable.

Order

„The Log: What every software engineer should know about real-time data's unifying abstraction“ by Jay Kreps
A decoupled messaging system; classic messaging did not work; Kafka became LinkedIn's “central nervous system”. Core requirements: latency, scalability, fault tolerance & availability.

Concepts

1960 Lloyd Arabella by JOHN LLOYD - Some rights reserved CC BY 2.0

Overview

Topological Overview according to Kafka documentation

Topics & Partitions

Anatomy of a topic according to the Kafka documentation
Messages are written to a topic; durable; distributed across n partitions and nodes; written sequentially; one offset per partition; replication; retention. Careful: changing the number of partitions afterwards is painful :(

Guarantees

  • Messages sent to a partition will be appended in the order they are sent
  • Consumers see messages in the order they are stored in the log
  • Kafka will tolerate up to n-1 server failures for a topic with replication factor n

Log Compaction

Log compaction according to the Kafka documentation
Note: compression is not possible in combination with compaction.
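To make the compaction semantics concrete: only the latest record per key survives, and a record whose value is null acts as a tombstone that eventually removes the key. A minimal sketch using the producer from the example section below (topic name and updatedNews are made up; the topic would need cleanup.policy=compact):

// Latest value per key wins after compaction
producer.send(new ProducerRecord<String, News>("news-compacted", news.id.toString(), updatedNews));
// A null value is a tombstone: the key disappears once compaction runs
producer.send(new ProducerRecord<String, News>("news-compacted", news.id.toString(), null));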

Example

Hello World by Bill Bradford - Some rights reserved CC BY 2.0

Java Producer

public class News {
    public final UUID id;
    public final String author, title, body;
...
}
					
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NewsSerializer.class.getName());

KafkaProducer<String, News> producer = new KafkaProducer<>(config);
					
public RecordMetadata send(News news) throws ExecutionException, InterruptedException {
    ProducerRecord<String, News> record = new ProducerRecord<>(topic, news.id.toString(), news);
    Future<RecordMetadata> recordMetadataFuture = this.producer.send(record);
    return recordMetadataFuture.get();
}
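The send() above blocks on the Future to get the RecordMetadata back. For a non-blocking variant, a Callback can be passed instead; a sketch (not from the slides), assuming the same producer, topic and logger:

public void sendAsync(final News news) {
    ProducerRecord<String, News> record = new ProducerRecord<>(topic, news.id.toString(), news);
    // onCompletion is invoked once the broker has acknowledged or rejected the record
    this.producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                logger.error("Sending news '{}' failed", news.id, exception);
            } else {
                logger.debug("News '{}' stored at offset '{}' in partition '{}'",
                        news.id, metadata.offset(), metadata.partition());
            }
        }
    });
}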
					
Kafka transports byte[]; the domain needs its own serializers/deserializers.
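A matching NewsSerializer could look roughly like this; a sketch only (the actual class in the example repository may differ), using Jackson as one possible wire format:

import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NewsSerializer implements Serializer<News> {
    private final ObjectMapper mapper = new ObjectMapper(); // maps News to JSON bytes

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { /* nothing to configure */ }

    @Override
    public byte[] serialize(String topic, News news) {
        try {
            return news == null ? null : mapper.writeValueAsBytes(news); // null stays null (tombstone)
        } catch (JsonProcessingException e) {
            throw new SerializationException("Could not serialize news " + news.id, e);
        }
    }

    @Override
    public void close() { /* nothing to close */ }
}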

Message Distribution

Message routing to the target partition via ProducerRecord:
  • Round robin: ProducerRecord(String topic, V value)
  • Via key hash (murmur2): ProducerRecord(String topic, K key, V value)
  • Via specific partition: ProducerRecord(String topic, Integer partition, K key, V value)
Reminder: ordering is only guaranteed per partition; and how would log compaction work without a key? (See the sketch below.)
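The three routing variants side by side; a sketch with a made-up topic name, reusing the producer from the previous slide (keying by author is just an illustration):

// No key: round robin across partitions, but log compaction has nothing to compact on
producer.send(new ProducerRecord<String, News>("news", news));
// Key: the murmur2 hash picks the partition, so all news of one author stay in order
producer.send(new ProducerRecord<String, News>("news", news.author, news));
// Explicit partition: full control, e.g. pin everything to partition 0
producer.send(new ProducerRecord<String, News>("news", 0, news.id.toString(), news));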

Consumer Grouping

“Kafka 101 - Massive Datenströme mit Apache Kafka” by Pfannenschmidt and Wisniewski, JavaMagazin 08.2015, p. 38
Consumption strategies: left a queue, right a topic (publish-subscribe).
  • Topic A: messages are received by either A0 or A1
  • Topic B: messages are received both by B0 and by one consumer from group C

Consumer Threading

“Kafka 101 - Massive Datenströme mit Apache Kafka” by Pfannenschmidt and Wisniewski, JavaMagazin 08.2015, p. 41
  • #threads > #partitions → surplus threads receive no messages (see T0)
  • #threads < #partitions → a thread receives messages from several partitions → ordering across those partitions is lost!

Java Consumer Thread

public void run() {
    Thread.currentThread().setName(name);
    ConsumerIterator<String, News> it = messageStream.iterator();
    while (it.hasNext()) {
        relayMessage(it.next());
    }
}
					
void relayMessage(MessageAndMetadata<String, News> kafkaMessage) {
    logger.trace("Received message with key '{}' and offset '{}' "
        + "on partition '{}' for topic '{}'",
        kafkaMessage.key(), kafkaMessage.offset(),
        kafkaMessage.partition(), kafkaMessage.topic());
    messageConsumer.consume(kafkaMessage.message());
}
					
public interface NewsConsumer<News> {
    void consume(News message);
}
					
  • The high-level consumer API delegates offset management to ZooKeeper or Kafka (see the commit sketch below)
  • The SimpleConsumer leaves offset management to the consumer implementation
  • A client redesign is in progress
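For at-least-once processing with the high-level consumer, auto-commit can be disabled and offsets committed only after the messages have been handled. A rough sketch using the old consumer's property names and the consumerConnector created on the next slide:

props.put("auto.commit.enable", "false");  // do not commit offsets in the background
// ... consume and process a batch of messages via the KafkaStream iterator, then:
consumerConnector.commitOffsets();         // persist current offsets to ZooKeeper or Kafka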

Wiring

import static kafka.consumer.Consumer.createJavaConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", zookeeper); // list of ZooKeeper nodes
props.put("group.id", groupId);            // identifies consumer group
props.put("offsets.storage", "kafka");     // storage for offsets
props.put("dual.commit.enabled", "false"); // migration switch
...
ConsumerConnector consumerConnector = createJavaConsumerConnector(new ConsumerConfig(props));
					
Map<String, List<KafkaStream<String, News>>> consumerMap;
consumerMap = consumerConnector.createMessageStreams(
    ImmutableMap.of(topic, numberOfThreads),     // number of streams per topic
    new StringDecoder(null), new NewsDecoder()); // message key and value decoders

List<KafkaStream<String, News>> streams = consumerMap.get(topic);
					
// create fixed size thread pool to launch all the threads
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
// create consumer threads to handle the messages
for (final KafkaStream stream : streams) {
    String name = String.format("%s[%s]", topic, threadNumber++);
    pool.submit(new ConsumerThread(stream, name, consumer));
}
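Not shown on the slide is the shutdown path; a minimal sketch, assuming the objects created above:

consumerConnector.shutdown();                // stop fetching; blocked stream iterators terminate
pool.shutdown();                             // let threads finish their current message
pool.awaitTermination(5, TimeUnit.SECONDS);  // declares InterruptedException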
					
Full code example: https://github.com/kafka101/java-news-feed
Thankfully, a new Java API is in the works!

Performance

Grefsenkollen Downhill 2015 by Vegar Nilsen - Some rights reserved CC BY 2.0

What makes Kafka fast?

"[Designed] to make consumption as cheap as possible" Teure Probleme beim Messaging: Exactly once → At least once Order → Partitions

What makes Kafka fast?

Fast Writes:

  • linear writes
  • all writes go to OS pagecache

Fast Reads:

  • linear reads
  • direct data transport from pagecache to socket* via sendfile()

Combined → Fast!

* Consumers without lag get messages straight from the pagecache! More details: http://kafka.apache.org/documentation.html#persistence

Durable queues are "slow" in the classic brokers; linear writes outperform random writes by far! The OS optimizes with read-ahead/write-behind. ACM article: "random access to memory is typically slower than sequential access to disk!"

sendfile()?

Efficient data transfer according to http://www.ibm.com/developerworks/library/j-zerocopy/
Left: without sendfile(); right: with sendfile(). FileChannel.transferTo() uses sendfile() if the JVM/OS supports it.
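Roughly what happens when a fetch is served from a log segment, reduced to one Java call (logSegmentFile, position, count and socketChannel are placeholders):

// transferTo() hands the bytes from the page cache straight to the socket;
// on Linux this becomes sendfile(2): no copy through user space, no JVM byte[]
FileChannel logSegment = new FileInputStream(logSegmentFile).getChannel();
long transferred = logSegment.transferTo(position, count, socketChannel);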

Summary

Numbers And Finance by reynermedia - Some rights reserved CC BY 2.0

Key Features & Facts

  • Commit Log: Topics, Partitioning, Log Compaction, Compression & Offset Management
  • Real-time: High-Throughput & Low-Latency
  • Persistence: Scalable, centralized & replicated storage

Related

  • Clients: Scala, Java, Python, Go, Perl, Erlang etc.
  • Integration: Storm, Samza, Hadoop, ElasticSearch, Logstash, Hive etc.
  • Confluent Platform: Schema Registry, REST Proxy & Camus

Thank You!

datanerds.io
github.com/kafka101

Frank Wisniewski - @ultraknackig
Lars Pfannenschmidt - @leastangle
