Kafka Producer (including interceptors, partitioners, serializers, and the three message sending modes)
The Kafka producer is one of the roles in the overall Kafka architecture; in practice, it can be any component that integrates with Kafka. KafkaProducer is thread safe and can be used by multiple threads at the same time.
1 how to build a KafkaProducer
There are two ways to build a KafkaProducer:
```java
// First, configure the necessary producer properties
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "fast practice producer");

// The common way: create the producer instance from the configuration
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// If the serializers are not configured in the properties, they can also be passed
// to the constructor; the underlying mechanism is the same
KafkaProducer<String, String> producer2 = new KafkaProducer<>(properties,
        new StringSerializer(), new StringSerializer());
```
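Since KafkaProducer is thread safe, as noted above, a single instance can be shared across threads. Below is a minimal sketch reusing the `properties` from the previous block; the thread count and the topic name "topic1" are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch: four threads sharing the same producer instance
ExecutorService pool = Executors.newFixedThreadPool(4);
KafkaProducer<String, String> shared = new KafkaProducer<>(properties);
for (int i = 0; i < 4; i++) {
    final int id = i;
    pool.submit(() -> shared.send(
            new ProducerRecord<>("topic1", "msg-from-thread-" + id)));
}
pool.shutdown();
```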
2 create a ProducerRecord and send a message
ProducerRecord has many constructors; choose the one that fits your requirements.
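For reference, a few of the commonly used constructors (a sketch; the topic, partition, key, and value arguments are illustrative):

```java
// topic + value
ProducerRecord<String, String> r1 = new ProducerRecord<>("topic1", "value");
// topic + key + value
ProducerRecord<String, String> r2 = new ProducerRecord<>("topic1", "key", "value");
// topic + partition + key + value
ProducerRecord<String, String> r3 = new ProducerRecord<>("topic1", 0, "key", "value");
// topic + partition + timestamp + key + value
ProducerRecord<String, String> r4 =
        new ProducerRecord<>("topic1", 0, System.currentTimeMillis(), "key", "value");
```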
```java
// Create a producer message object
ProducerRecord<String, String> message = new ProducerRecord<>(
        "topic1", "hello_" + System.currentTimeMillis());

// Send the message through the producer
try {
    // Future represents the life cycle of the send; the message's metadata
    // (partition, offset, etc.) can be obtained from it
    Future<RecordMetadata> future = producer.send(message);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // Shut down the producer
    producer.close();
}
```
3 three modes of sending messages
Kafka producers can send messages in three modes:
- Fire and forget
  - The producer just sends the message to the broker and does not care whether it arrives; this mode is fine in most scenarios.
  - Messages may be lost when a non-retryable exception occurs.
  - This is the mode with the highest performance and the worst reliability.
Implementation method:
```java
try {
    // Future represents the life cycle of the send; in this mode the result is simply ignored
    Future<RecordMetadata> future = producer.send(message);
} catch (Exception e) {
    e.printStackTrace();
}
```
- sync (synchronous)
  - Combining send() with get() blocks on Kafka's response, making the send synchronous. Reliability is very high, but performance is low, because the producer must block and wait for each message to complete before sending the next.
  - A synchronous send either succeeds or throws an exception; when an exception is thrown, it must be handled by external logic.
Implementation method:
```java
try {
    // Block for Kafka's response via get(): the send either succeeds or throws an
    // exception, which must be handled by external logic
    producer.send(message).get();
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}
```
```java
try {
    // Effectively the same as above; choose this form when you need the message's
    // metadata, otherwise the one-liner above is simpler
    Future<RecordMetadata> future = producer.send(message);
    RecordMetadata metadata = future.get();
    metadata.offset();    // read the metadata
    metadata.partition();
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}
```
- async (asynchronous)
  - The send() method itself is asynchronous; the Future it returns represents the life cycle of the send and lets the caller obtain the result after the message has been sent.
  - Future.get() returns the metadata of a successfully sent message, such as its offset and partition.
```java
// Typically send(message, Callback) is used for asynchronous sending. The Callback
// is invoked once Kafka has responded; without a response it is never called.
try {
    producer.send(message, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // handle the exception
                e.printStackTrace();
            } else {
                System.out.println(recordMetadata.topic() + ":"
                        + recordMetadata.partition() + ":"
                        + recordMetadata.offset());
            }
        }
    });
} catch (Exception e) {
    // handle this
}
```
Note: the two parameters of the onCompletion() method in the Callback are mutually exclusive. If the Exception is null, the RecordMetadata will not be null, and vice versa.
4 exceptions in KafkaProducer
4.1 exception classification
Two kinds of exceptions usually occur in KafkaProducer:
- Retryable exceptions
  - NetworkException
    - Caused by a transient network failure; retrying can recover from it.
  - LeaderNotAvailableException
    - Indicates that the partition's leader is unavailable; it usually occurs after the old leader has gone offline and before a new leader is elected. Retrying can recover from it.
  - UnknownTopicOrPartitionException
  - NotEnoughReplicasException
  - NotCoordinatorException
- Non-retryable exceptions
  - RecordTooLargeException
    - The message sent by the producer is too large, exceeding the maximum size allowed by the configuration; Kafka will not retry in this case and throws the exception directly.
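The producer-side cap comes from the max.request.size parameter, which defaults to 1048576 bytes (1 MB). A sketch of raising it; the 2 MB value is illustrative, and the broker/topic side must also allow larger messages via message.max.bytes:

```java
// Raise the producer-side limit on a single request to an illustrative 2 MB.
// The broker/topic limit (message.max.bytes) must be raised accordingly.
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2 * 1024 * 1024);
```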
4.2 handling retryable exceptions
```java
// Set the number of retries; if the error has not recovered after 10 retries,
// the exception is thrown and must be handled by external logic
props.put(ProducerConfig.RETRIES_CONFIG, 10);
```
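The pause between retry attempts can be tuned alongside the retry count; a small sketch (the 100 ms value is illustrative):

```java
// Wait 100 ms between retry attempts instead of retrying immediately
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
```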
The synchronous sending mode has high reliability: either the message is sent successfully or an exception is thrown, and the exception can be caught and handled, so messages are not silently lost as they can be with fire and forget. However, synchronous sending performs much worse, since the producer must block and wait for each message to complete before sending the next.
5 serializers for key and value
Because the producer's data is transmitted to Kafka in serialized form, the serializers for the key and value must be specified. The StringSerializer source code is as follows:
```java
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.errors.SerializationException;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * The encoding of the String defaults to UTF-8. The encoding can be specified in the
 * properties via key.serializer.encoding, value.serializer.encoding, or serializer.encoding.
 * The first two take precedence over the last.
 */
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8"; // default encoding UTF-8

    @Override
    // The configure method determines the encoding of the key and the value
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    // Converts the String into a byte array using the configured encoding;
    // returns null if the input is null
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}
```
Note that if Kafka's built-in serializers cannot meet the business requirements, serialization can be implemented with general-purpose tools such as Avro, JSON, Thrift, ProtoBuf, or Protostuff, or with a custom serializer, which is then specified in the properties configuration.
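As an illustration, a hand-rolled value serializer might look like the sketch below. The Company POJO and CompanySerializer are hypothetical names, not Kafka classes; the sketch simply serializes one String field as UTF-8, mirroring the null handling of StringSerializer above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical POJO used only for this sketch
class Company {
    private final String name;
    Company(String name) { this.name = name; }
    String getName() { return name; }
}

public class CompanySerializer implements Serializer<Company> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure in this sketch
    }

    @Override
    public byte[] serialize(String topic, Company data) {
        // Serialize only the name field as UTF-8; return null for null input
        if (data == null || data.getName() == null)
            return null;
        return data.getName().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to do
    }
}
```

It would then be registered just like the built-in one: `properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName())`.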
6 Partitioner
After send(), a KafkaProducer message passes through the interceptor, serializer, and partitioner before it actually reaches a partition on the Kafka broker. The data flow after send() is as follows:
Interceptor => Serializer => Partitioner
6.1 the specific source code is as follows
```java
// step-1
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

// step-2
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // The record is intercepted here: onSend() modifies the record and produces
    // the interceptedRecord, which is then handed to the next layer
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    // doSend() is also a method of KafkaProducer; we step into it next
    return doSend(interceptedRecord, callback);
}

// step-3
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // First, make sure the metadata of the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        byte[] serializedKey; // holds the serialization result of the key
        try {
            // Serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            // Serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }

        // Determine which partition each record belongs to, based on the record's
        // fields and the configured partitioner (DefaultPartitioner by default)
        int partition = partition(record, serializedKey, serializedValue, cluster);
        // ... and so on
```

step-4: Stepping into the partition() method, we find that it delegates to the Partitioner interface, which extends Configurable and can therefore be configured. In the Kafka source code, the only built-in implementation of Partitioner is DefaultPartitioner.
Clicking into this class, the overridden partition() method is as follows:

```java
public int partition(String topic, Object key, byte[] keyBytes,
                     Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the serialized key's byte array with the murmur2 algorithm,
        // then map the result to the record's partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
```
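If DefaultPartitioner does not fit the business, a custom Partitioner can be plugged in. Below is a minimal sketch; the SimplePartitioner name and its round-robin handling of keyless records are illustrative, only loosely mirroring the default behavior.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

// Minimal custom partitioner sketch: keyed records are hashed,
// keyless records are spread round-robin over all partitions
public class SimplePartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}
```

It would then be registered via `properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SimplePartitioner.class.getName())`.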
7 producer Interceptor
Kafka interceptors were introduced in Kafka 0.10.0.0. There are two kinds of interceptors:
- Producer interceptor
- Consumer interceptor
A custom interceptor only needs to implement ProducerInterceptor. Like Partitioner, ProducerInterceptor inherits from the same parent interface, Configurable.
ProducerInterceptor declares three methods; Kafka provides no default implementation class.
```java
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
void onAcknowledgement(RecordMetadata metadata, Exception exception);
void close();
```
7.1 simple interceptor example
```java
public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {
    // override onSend(), onAcknowledgement(), close(), and configure()
    // to do whatever you aim to
}
```
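Filled in, such a prefix interceptor might look like the following sketch; the "prefix-" string and the error handling are illustrative assumptions.

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Prepend an illustrative prefix to every value before serialization
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), "prefix-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record or the send fails
        if (exception != null) {
            exception.printStackTrace();
        }
    }

    @Override
    public void close() {
        // release resources, if any
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}
```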
7.2 interceptor deployment and loading
After customizing an interceptor, we need to register it in the producer's configuration parameters.
```java
// Multiple classes can be specified here; the fully qualified class names are separated by ","
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "com.shufang.interceptor.ProducerInterceptorPrefix" + ","
        + "com.shufang.interceptor.ProducerInterceptorPrefix2");
```