On GitHub: kafka101/data2day-slides
Frank Wisniewski - @ultraknackig
Lars Pfannenschmidt - @leastangle
RabbitMQ? ActiveMQ? Crimes like JMS? And Kafka...?
A distributed, partitioning, replicating service for data streams.
public class News {
public final UUID id;
public final String author, title, body;
...
}
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NewsSerializer.class.getName());
KafkaProducer<String, News> producer = new KafkaProducer<>(config);
public RecordMetadata send(News news) throws ExecutionException, InterruptedException {
    ProducerRecord<String, News> record = new ProducerRecord<>(topic, news.id.toString(), news);
    Future<RecordMetadata> recordMetadataFuture = this.producer.send(record);
    return recordMetadataFuture.get(); // block until the broker has acknowledged the record
}
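Blocking on the Future gives a synchronous send with per-record error handling, but costs throughput. The same producer can also send asynchronously with a callback; a minimal sketch, assuming the producer, record, and logger from the surrounding examples:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            logger.error("Send failed for key '{}'", record.key(), exception); // rejected or timed out
        } else {
            logger.trace("Acked at offset '{}' on partition '{}'", metadata.offset(), metadata.partition());
        }
    }
});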
Kafka only stores byte[] → the domain needs its own serializers/deserializers.
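A minimal sketch of what the NewsSerializer referenced above could look like, assuming JSON via Jackson (the actual implementation in the repo may differ):

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NewsSerializer implements Serializer<News> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, News news) {
        try {
            return mapper.writeValueAsBytes(news); // News -> JSON -> byte[]
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize News " + news.id, e);
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}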
ProducerRecord(String topic, V value);
ProducerRecord(String topic, K key, V value);
ProducerRecord(String topic, Integer partition, K key, V value);
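The choice of constructor decides where a record lands: without a key the default partitioner distributes records across the available partitions, with a key the partition is derived from the key's hash (all news with the same id stay on one partition and keep their order), and an explicit partition number overrides both. A short sketch with the topic and news from above:

new ProducerRecord<>(topic, news);                        // no key: partitioner spreads records
new ProducerRecord<>(topic, news.id.toString(), news);    // key: partition derived from hash(key)
new ProducerRecord<>(topic, 0, news.id.toString(), news); // pin the record to partition 0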
public void run() {
    Thread.currentThread().setName(name); // thread name shows up in logs: topic[threadNumber]
    ConsumerIterator<String, News> it = messageStream.iterator();
    while (it.hasNext()) { // blocks until a message arrives or the connector shuts down
        relayMessage(it.next());
    }
}
void relayMessage(MessageAndMetadata<String, News> kafkaMessage) {
logger.trace("Received message with key '{}' and offset '{}' "
+ "on partition '{}' for topic '{}'",
kafkaMessage.key(), kafkaMessage.offset(),
kafkaMessage.partition(), kafkaMessage.topic());
messageConsumer.consume(kafkaMessage.message());
}
public interface NewsConsumer<T> {
    void consume(T message);
}
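Domain code only touches this interface and stays free of Kafka types. A throwaway implementation for illustration (hypothetical, not from the repo):

NewsConsumer<News> printer = new NewsConsumer<News>() {
    @Override
    public void consume(News message) {
        System.out.printf("%s: %s%n", message.author, message.title);
    }
};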
import static kafka.consumer.Consumer.createJavaConsumerConnector;
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper); // list of ZooKeeper nodes
props.put("group.id", groupId); // identifies consumer group
props.put("offsets.storage", "kafka"); // storage for offsets
props.put("dual.commit.enabled", "false"); // migration switch
...
ConsumerConnector consumerConnector = createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<String, News>>> consumerMap;
consumerMap = consumerConnector.createMessageStreams(
ImmutableMap.of(topic, numberOfThreads), // number of streams per topic
new StringDecoder(null), new NewsDecoder()); // message key and value decoders
List<KafkaStream<String, News>> streams = consumerMap.get(topic);
// create fixed size thread pool to launch all the threads
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
// create one consumer thread per stream to handle the messages
int threadNumber = 0;
for (final KafkaStream<String, News> stream : streams) {
    String name = String.format("%s[%s]", topic, threadNumber++);
    pool.submit(new ConsumerThread(stream, name, consumer));
}
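On shutdown the order matters: stop the connector first so the blocked iterators return, then drain the pool. A minimal sketch, assuming the consumerConnector and pool from above:

try {
    consumerConnector.shutdown(); // commits offsets, releases partitions; iterators stop blocking
    pool.shutdown();              // running ConsumerThreads finish their current message
    pool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}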
Full Code Example: https://github.com/kafka101/java-news-feed
Thankfully, a new Java API is in the works!
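As an outlook, a minimal sketch of the poll loop that new API ends up with (it shipped later as KafkaConsumer in Kafka 0.9; a hypothetical NewsDeserializer mirrors the NewsSerializer above, and running is a volatile shutdown flag):

Properties props = new Properties();
props.put("bootstrap.servers", broker); // talks to the brokers directly, no ZooKeeper in the client
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", NewsDeserializer.class.getName());

KafkaConsumer<String, News> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (running) {
    ConsumerRecords<String, News> records = consumer.poll(100); // one thread, no stream/executor plumbing
    for (ConsumerRecord<String, News> record : records) {
        newsConsumer.consume(record.value());
    }
}
consumer.close();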
* Consumers without lag are served straight from the OS page cache! More details: http://kafka.apache.org/documentation.html#persistence
Durable queues are "slow" in the classic brokers. Linear disk writes outperform random writes by far, and the OS optimizes them with read-ahead/write-behind. As the ACM article puts it: "random access to memory is typically slower than sequential access to disk!"
datanerds.io · github.com/kafka101
Frank Wisniewski - @ultraknackig
Lars Pfannenschmidt - @leastangle