On GitHub: kafka101/data2day-slides
Frank Wisniewski - @ultraknackig
Lars Pfannenschmidt - @leastangle
RabbitMQ? ActiveMQ? Crimes like JMS? And Kafka...?
A distributed, partitioning, replicating service for data streams.
public class News {
    public final UUID id;
    public final String author, title, body;
    ...
}
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NewsSerializer.class.getName());
KafkaProducer<String, News> producer = new KafkaProducer<>(config);
public RecordMetadata send(News news) throws ExecutionException, InterruptedException {
    ProducerRecord<String, News> record = new ProducerRecord<>(topic, news.id.toString(), news);
    Future<RecordMetadata> recordMetadataFuture = this.producer.send(record);
    return recordMetadataFuture.get();
}

Kafka → byte[]: the domain needs its own de-/serializers.
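A minimal sketch of what such a NewsSerializer could look like, assuming Jackson for the JSON mapping (the actual implementation in the example repository may differ):

import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NewsSerializer implements Serializer<News> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, News news) {
        try {
            return mapper.writeValueAsBytes(news); // News -> JSON -> byte[]
        } catch (JsonProcessingException e) {
            throw new SerializationException("Could not serialize news " + news.id, e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}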
ProducerRecord(String topic, V value);
ProducerRecord(String topic, K key, V value);
ProducerRecord(String topic, Integer partition, K key, V value);
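Which constructor you pick determines the partition; roughly (topic name "news" and partition 0 below are illustrative):

// no key: the producer distributes messages over the available partitions
new ProducerRecord<>("news", news);
// with key: the partition is derived from the key hash, so the same key always lands on the same partition
new ProducerRecord<>("news", news.id.toString(), news);
// explicit partition: overrides any key-based assignment
new ProducerRecord<>("news", 0, news.id.toString(), news);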
public void run() {
    Thread.currentThread().setName(name);
    ConsumerIterator<String, News> it = messageStream.iterator();
    while (it.hasNext()) {
        relayMessage(it.next());
    }
}
void relayMessage(MessageAndMetadata<String, News> kafkaMessage) {
    logger.trace("Received message with key '{}' and offset '{}' on partition '{}' for topic '{}'",
            kafkaMessage.key(), kafkaMessage.offset(), kafkaMessage.partition(), kafkaMessage.topic());
    messageConsumer.consume(kafkaMessage.message());
}
public interface NewsConsumer<T> {
    void consume(T message);
}
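A hypothetical implementation that just logs incoming messages, to show where the domain logic plugs in:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingNewsConsumer implements NewsConsumer<News> {
    private static final Logger logger = LoggerFactory.getLogger(LoggingNewsConsumer.class);

    @Override
    public void consume(News message) {
        // domain logic would go here; we only log the received news item
        logger.info("'{}' by {}", message.title, message.author);
    }
}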
import static kafka.consumer.Consumer.createJavaConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);  // list of ZooKeeper nodes
props.put("group.id", groupId);             // identifies the consumer group
props.put("offsets.storage", "kafka");      // storage backend for offsets
props.put("dual.commit.enabled", "false");  // migration switch
...
ConsumerConnector consumerConnector = createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<String, News>>> consumerMap;
consumerMap = consumerConnector.createMessageStreams(
        ImmutableMap.of(topic, numberOfThreads),      // number of streams per topic
        new StringDecoder(null), new NewsDecoder());  // message key and value decoders
List<KafkaStream<String, News>> streams = consumerMap.get(topic);
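A sketch of what the NewsDecoder could look like, assuming the same Jackson mapping as on the producer side and a News class that Jackson can deserialize (the example repository may do this differently):

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Decoder;

public class NewsDecoder implements Decoder<News> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public News fromBytes(byte[] bytes) {
        try {
            return mapper.readValue(bytes, News.class); // byte[] -> JSON -> News
        } catch (IOException e) {
            throw new RuntimeException("Could not decode news message", e);
        }
    }
}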
// create a fixed-size thread pool to launch all the threads
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
// create consumer threads to handle the messages
for (final KafkaStream stream : streams) {
    String name = String.format("%s[%s]", topic, threadNumber++);
    pool.submit(new ConsumerThread(stream, name, consumer));
}

Full code example: https://github.com/kafka101/java-news-feed
Thankfully, a new Java API is in the works!
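For reference, that new consumer API (as it eventually shipped with Kafka 0.9) looks roughly like this sketch; NewsDeserializer is a hypothetical counterpart to the NewsSerializer above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", broker);  // brokers instead of ZooKeeper
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", NewsDeserializer.class.getName()); // hypothetical

KafkaConsumer<String, News> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
    // poll replaces the blocking stream iterator; no extra thread pool needed
    ConsumerRecords<String, News> records = consumer.poll(100);
    for (ConsumerRecord<String, News> record : records) {
        messageConsumer.consume(record.value());
    }
}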
* Consumers without lag get their messages straight from the page cache! More details: http://kafka.apache.org/documentation.html#persistence
Durable queues are "slow" in the classic brokers. Linear writes outperform random writes by far, and the OS optimizes for them with read-ahead/write-behind. ACM article: "random access to memory is typically slower than sequential access to disk!"

datanerds.io · github.com/kafka101