Kafka Streams: streams and state

4.2 Applying stateful operations to Kafka Streams

The topology above produces a stream of purchase transaction events. One processing node in the topology calculates customers' reward points from the sale amount. So far, however, that processor only calculates the points for a single transaction and forwards the result.

If you add state to the processor, you can track cumulative reward points. The first step, therefore, is to use a value transformer to convert the stateless rewards processor into a stateful one. That way, the total reward points earned so far and the time between two purchases can be tracked, giving downstream consumers more information.

4.2.1 The value transformer processor

The most basic stateful function is KStream.transformValues. The following figure shows how the KStream.transformValues() method operates.

This method is semantically the same as KStream.mapValues(), with a few differences. One difference is that transformValues needs access to a StateStore instance to do its work. Another is that it can schedule operations to run periodically through the punctuate() method. The punctuate() method is covered in detail in the discussion of the Processor API in Chapter 6.

4.2.2 transformValues in detail

KStream<K,VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, String... stateStoreNames) transforms the value of each input record into a new value (possibly of a new type) for the output record. A ValueTransformer (provided by the given ValueTransformerSupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateful record-by-record operation (compare with mapValues(ValueMapper), its stateless counterpart). In addition, Punctuator.punctuate(long) can be used to observe processing progress and perform other periodic actions.

The ValueTransformerSupplier interface is used to create one or more ValueTransformer instances.

The ValueTransformer interface statefully maps a value to a new value (possibly of a new type). This is a stateful record-by-record operation: transform(Object) is invoked separately for each record of the stream, and it can access and modify state. In a ValueTransformer, the state is obtained through the ProcessorContext. To trigger periodic actions through punctuate(), you must register a schedule.

4.2.3 Stateful customer rewards

The first figure showed the rewards processor, which belongs to the merchant's rewards program. Initially, the rewards processor used the KStream.mapValues() method to map each incoming Purchase object to a RewardAccumulator object.

The RewardAccumulator object originally contained only two fields: the customer ID and the purchase total for the transaction. The requirements have now changed, and points are associated with the rewards program. The field definitions of the RewardAccumulator class change as follows:

public class RewardAccumulator {

    private String customerId;       // customer ID
    private double purchaseTotal;    // total purchase amount
    private int currentRewardPoints; // reward points for the current purchase

    // remaining members omitted
}

Previously, a separate application read the data from the rewards topic and calculated the customers' rewards. Now we want the streaming application itself to maintain and calculate the customers' reward points, and we also need to capture the time between a customer's current and previous purchase.

When the consumer application reads records from the rewards topic, it only needs to check whether the total number of points exceeds the threshold for granting a reward. To achieve this, you can add a field to the RewardAccumulator class that tracks the total points:

public class RewardAccumulator {

    private String customerId;
    private double purchaseTotal;
    private int totalRewardPoints;    // added field for tracking total points
    private int currentRewardPoints;
    private int daysFromLastPurchase;

    // remaining members omitted
}

The updated rules for the rewards program are simple: customers earn one point for every dollar they spend, and the transaction total is rounded to a whole dollar amount. The overall structure of the topology does not change, but the rewards processing node switches from the KStream.mapValues() method to the KStream.transformValues() method. Semantically, the two methods operate the same way: they still map the Purchase object to a RewardAccumulator object. The difference is the ability to use local state while performing the transformation.
Specifically, two main steps are taken:
■ Initialize the value transformer.
■ Use state to map the Purchase object to a RewardAccumulator object.

4.2.4 Initializing the value transformer

The first step is to set up or create any instance variables in the transformer's init() method.

// A key/value store that supports put/get/delete and range queries.
private KeyValueStore<String, Integer> stateStore;
private final String storeName;
private ProcessorContext context; // processing context interface

@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    this.context = context;
    // Retrieve the state store by name
    stateStore = (KeyValueStore<String, Integer>) this.context.getStateStore(storeName);
}

In the transformer class, the object returned by getStateStore() is cast to the KeyValueStore type.

4.2.5 Using state to map the Purchase object to a RewardAccumulator object

Follow the steps below:

① Look up the points accumulated so far by customer ID

② Add the points from the current transaction to get the new total

③ Set the total points on the RewardAccumulator to the new total

④ Save the new total to the local state store by customer ID

public RewardAccumulator transform(Purchase value) {
    // Build the RewardAccumulator object from the Purchase object
    RewardAccumulator rewardAccumulator = RewardAccumulator.builder(value).build();
    // Retrieve the points accumulated so far by customer ID
    Integer accumulatedSoFar = stateStore.get(rewardAccumulator.getCustomerId());
    if (accumulatedSoFar != null) {
        // If accumulated points exist, add them to the current total.
        // NOTE: this call is missing from the original listing; addRewardPoints is an assumed method name.
        rewardAccumulator.addRewardPoints(accumulatedSoFar);
    }
    // Store the new total in the state store
    stateStore.put(rewardAccumulator.getCustomerId(), rewardAccumulator.getTotalRewardPoints());
    // Return the accumulator carrying the new cumulative points
    return rewardAccumulator;
}
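
To see the effect of the four steps without a running Kafka cluster, here is a minimal sketch in which a plain HashMap stands in for the local state store. The class RewardPointsSketch and the method recordPurchase are hypothetical names, not part of Kafka Streams or of the original application, and rounding with Math.round is an assumption about how the transaction total is rounded.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Kafka-free illustration of stateful point accumulation.
class RewardPointsSketch {

    // Stand-in for the local state store: customer ID -> total points so far.
    private final Map<String, Integer> store = new HashMap<>();

    // Applies steps 1 to 4 for one purchase and returns the customer's new total.
    int recordPurchase(String customerId, double purchaseTotal) {
        // One point per dollar; Math.round is an assumed rounding rule.
        int currentPoints = (int) Math.round(purchaseTotal);
        // Step 1: look up the points accumulated so far.
        Integer accumulatedSoFar = store.get(customerId);
        // Steps 2 and 3: add the current points to the previous total, if any.
        int total = currentPoints + (accumulatedSoFar != null ? accumulatedSoFar : 0);
        // Step 4: save the new total under the customer ID.
        store.put(customerId, total);
        return total;
    }

    public static void main(String[] args) {
        RewardPointsSketch sketch = new RewardPointsSketch();
        System.out.println(sketch.recordPurchase("c1", 10.4));
        System.out.println(sketch.recordPurchase("c1", 5.5));
    }
}
```

Because the map is keyed by customer ID, a second purchase by the same customer builds on the first total, which is exactly what a stateless mapValues() call cannot do.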


The transform() method above implements the rewards points processor. Before running it, however, consider that you access all sales information by customer ID. Collecting every transaction for a given customer requires that all of that customer's transactions land in the same partition. But because the transactions entering the application have no key, the producer assigns them to partitions in round-robin fashion. The result you want therefore conflicts with what the program actually does.

This is a problem (unless you use a topic with only one partition). Because the key is not populated, round-robin assignment means that transactions for the same customer may not end up in the same partition. Placing transactions with the same customer ID in the same partition is important because you look up records in the state store by customer ID. Otherwise, records with the same customer ID are spread across different partitions, and a single customer's information would have to be looked up in multiple state stores.

1. Repartitioning the data

Although this simple example replaces a null key with a concrete value, repartitioning does not always require updating the key. By using a StreamPartitioner, you can apply any partitioning strategy you can think of, such as partitioning on the value or part of the value instead of the key.

2. Repartitioning in Kafka Streams

Repartitioning in Kafka Streams is easy with the KStream.through() method. KStream.through() creates an intermediate topic, and the current KStream instance starts writing records to that topic. The call returns a new KStream instance that uses the same intermediate topic as its data source. In this way, the data is seamlessly repartitioned.

Under the hood, Kafka Streams creates a sink node and a source node. The sink node is a child processor of the KStream instance on which through() is called, and the new KStream instance uses the new source node as its data source. You could write the same kind of sub-topology yourself with the DSL, but using KStream.through() is more convenient.
If you have modified or changed the key and do not need a custom partitioning strategy, you can rely on the DefaultPartitioner inside the KafkaProducer used by Kafka Streams to handle partitioning. If you want to apply your own partitioning approach, however, you can use a StreamPartitioner, as the next example does.
The code that uses the KStream.through() method follows. In this case, KStream.through() takes two parameters: the topic name and a Produced instance. The Produced instance provides the serializer/deserializer (Serde) for the key and the value, plus a StreamPartitioner. Note that if you want to use the default key and value Serde instances and do not need a custom partitioning strategy, you can use the version of KStream.through() that takes only the topic name.

// RewardsStreamPartitioner implements the StreamPartitioner interface, which determines how records are distributed among the partitions of a Kafka topic.
RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
// Create a new KStream instance through the KStream.through() method
KStream<String, Purchase> transByCustomerStream = purchaseKStream.through("customer_transactions", Produced.with(stringSerde, purchaseSerde, streamPartitioner));

3. KStream.through() in detail

KStream<K,V> through(String topic, Produced<K,V> produced)

This method materializes the stream to a topic and creates a new KStream from that topic, using the Produced instance to configure the key Serde, value Serde, and StreamPartitioner.

topic - the topic name;

produced - the options to use when producing to the topic. The sample code uses:

 Produced<K,V> with(Serde<K> keySerde, Serde<V> valueSerde, StreamPartitioner<? super K,? super V> partitioner)

partitioner - the function used to determine how records are distributed among the topic's partitions

public class RewardsStreamPartitioner implements StreamPartitioner<String, Purchase> {

    public Integer partition(String key, Purchase value, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even when hashCode() is negative
        return Math.floorMod(value.getCustomerId().hashCode(), numPartitions);
    }
}

Note that the code above does not generate a new key; it uses an attribute of the value (the customer ID) to ensure that records for the same customer go to the same partition.
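
The partitioning rule can be checked in isolation with a small, Kafka-free sketch. The class CustomerPartitionSketch and the method partitionFor are hypothetical names used only for illustration. The sketch uses Math.floorMod rather than the % operator because String.hashCode() can be negative, and a negative remainder would not be a valid partition index.

```java
// Hypothetical, Kafka-free illustration of partitioning by customer ID.
class CustomerPartitionSketch {

    // Maps a customer ID to a partition index in the range [0, numPartitions).
    static int partitionFor(String customerId, int numPartitions) {
        return Math.floorMod(customerId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // The same customer ID always yields the same partition index.
        System.out.println(partitionFor("cust-42", 4) == partitionFor("cust-42", 4));
    }
}
```

Since the result depends only on the customer ID, every purchase by a given customer is routed to the same partition, and therefore to the same state store.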

4.2.6 Updating the rewards processor

So far, a new processing node has been created that writes Purchase objects to a topic partitioned by customer ID. This new topic also becomes the data source for the rewards processor that is about to be updated. This ensures that all records for the same customer are written to the same partition, so the same state store is used for all purchases by a given customer.

Update the rewards processor to use a stateful transformation:

// Use a stateful transformation (the rewards processor).
// The second argument is cut off in the original listing; passing the state store name is assumed from the transformValues signature.
KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName), rewardsStateStoreName);
// Write the results to the "rewards" topic (sink processor)
statefulRewardAccumulator.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));

4.3 Using state stores for lookups and previously seen data

4.3.1 Data locality

Data locality is critical for performance. Although key lookups are usually very fast, the latency introduced by remote storage becomes a bottleneck at scale.
The figure illustrates the principle behind data locality. The dashed line represents a network call to retrieve data from a remote database. The solid line represents a call to an in-memory data store located on the same server. As you can see, local data calls are more efficient than calls over the network to a remote database.

When a streaming application processes millions or billions of records, even a small network delay has a huge impact once multiplied by such a large factor. Data locality also means that the store is local to each processing node and is not shared between processes or threads. That way, if one process fails, it does not affect other stream-processing processes or threads. Although streaming applications sometimes require state, that state should be local to where the processing happens. Each server or node of the application should have its own data store.

4.3.2 Failure recovery and fault tolerance

Backing up a state store with a topic may seem costly, but several mitigating factors are at work: a Kafka producer sends data in batches, and records are cached by default. Kafka Streams writes records to the store only when the cache is flushed, so only the latest record for a given key is persisted. The state stores provided by Kafka Streams meet both the locality and the fault-tolerance requirements. They are local to the defined processors and are not accessed or shared across processes or threads. State stores also use topics for backup and quick recovery.

4.3.3 Using state stores in Kafka Streams

Adding a state store is a simple matter of creating a StoreSupplier instance through one of the static factory methods in the Stores class. Two additional classes are used to customize a state store: Materialized and StoreBuilder. Which one you use depends on how the store is added to the topology. If you use the high-level DSL, you typically use the Materialized class.

// Create the StateStore supplier instance
String rewardsStateStoreName = "rewardsPointsStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
// Create the StoreBuilder and specify the key/value Serdes
StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.Integer());
// Add the store to the topology.
// This call is missing from the original listing; "builder" is assumed to be the application's StreamsBuilder.
builder.addStateStore(storeBuilder);

The code above first creates a StoreSupplier that provides an in-memory key/value store. The StoreSupplier is then passed as a parameter to create a StoreBuilder, specifying a String key type and an Integer value type. Finally, the store is added to the topology by passing the StoreBuilder to the StreamsBuilder.addStateStore method. The state can now be used in the processor through rewardsStateStoreName, the name of the state store created above.

4.3.4 State store fault tolerance

Logging is enabled by default for all StateStoreSupplier types. Logging here refers to a Kafka topic that serves as a changelog, backing up the values in the store and providing fault tolerance. For example, suppose a machine running the Kafka Streams application goes down. Once the server recovers and the Kafka Streams application restarts, the state stores for that instance are restored to their original contents (up to the last committed offset in the changelog before the crash).
When using the Stores factory, you can turn logging off with the disableLogging() method (on StoreBuilder, the corresponding method is withLoggingDisabled()). Do not disable logging casually, however, because doing so removes fault tolerance from the state store and its ability to recover after a crash.

4.3.5 Configuring changelog topics

The changelog used for a state store can be configured through the withLogging(Map<String, String> config) method (on StoreBuilder, the corresponding method is withLoggingEnabled(Map<String, String>)), and any configuration parameter available for topics can be used in the map. Configuring the changelog for state stores is important when building a Kafka Streams application. Keep in mind, though, that Kafka Streams creates this topic; you do not create it yourself.
For a Kafka topic, the default retention time for a log segment is one week, and the amount of data is unlimited. Depending on your data volume this may be acceptable, but you will likely want to adjust these settings. In addition, the default cleanup policy is delete. As a first example, consider configuring the changelog topic to retain at most 10 GB of data for at most 2 days.
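
The configuration listing is not reproduced here, so the following is only a sketch of what such settings could look like. It assumes the standard topic-level settings retention.ms and retention.bytes and builds them as a plain map. The class ChangelogConfigSketch and the method changelogConfig are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: builds topic settings for a state store's changelog topic.
class ChangelogConfigSketch {

    static Map<String, String> changelogConfig() {
        Map<String, String> config = new HashMap<>();
        // Retention time: 2 days, expressed in milliseconds.
        config.put("retention.ms", String.valueOf(2L * 24 * 60 * 60 * 1000));
        // Retention size: 10 GB (decimal), expressed in bytes.
        config.put("retention.bytes", String.valueOf(10L * 1000 * 1000 * 1000));
        return config;
    }

    public static void main(String[] args) {
        System.out.println(changelogConfig());
        // With a StoreBuilder, the map would be passed along the lines of:
        //   storeBuilder.withLoggingEnabled(changelogConfig());
    }
}
```

Note that retention.bytes applies per partition, so the total disk usage of the changelog topic also depends on its partition count.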

4.4 Joining streams for added insight

When the events in a stream do not stand on their own, the stream needs state. Sometimes the state or context required is another stream. In this section, you take different events from two streams with the same key and combine them into a new event.
The best way to learn about joining streams is to look at a concrete example, so we return to the merchant transaction scenario. Suppose we open a new store selling electronics and related products (CDs, DVDs, smartphones, and so on). To try a new approach, we partner with a national coffee house and put a coffee shop in each store. The embedded coffee shop is a great success, so we decide to start a new project: keeping traffic in the electronics store high by offering coffee coupons (the increased traffic is expected to lead to additional purchases). We therefore need to identify customers who bought coffee and also made a purchase in the electronics store, and give them a coupon almost immediately after their second transaction.

To determine when to give a customer a coupon, the sales records from the coffee shop must be joined with those from the electronics store.

4.4.1 Setting up the data

// Predicates for matching records; records whose department is neither "coffee" nor "electronics" are dropped
Predicate<String, Purchase> isCoffee = (key, purchase) -> purchase.getDepartment().equalsIgnoreCase("coffee");
Predicate<String, Purchase> isElectronics = (key, purchase) -> purchase.getDepartment().equalsIgnoreCase("electronics");
// Labeled integers used as indexes into the resulting array
int coffee = 0;
int electronics = 1;
KStream<String, Purchase>[] kstreamByDept = purchaseKStream.branch(isCoffee, isElectronics);
kstreamByDept[coffee].to("coffee", Produced.with(stringSerde, purchaseSerde));
kstreamByDept[coffee].print(Printed.<String, Purchase>toSysOut().withLabel("coffee"));
kstreamByDept[electronics].to("electronics", Produced.with(stringSerde, purchaseSerde));
kstreamByDept[electronics].print(Printed.<String, Purchase>toSysOut().withLabel("electronics"));

Details of KStream<K,V>[] branch(Predicate<? super K,? super V>... predicates):

The branch() method creates an array of KStreams from this stream by branching the records of the original stream based on the supplied predicates. Each record is evaluated against the predicates in order, and branching happens on the first match: a record from the original stream is assigned to the output stream of the first predicate that evaluates to true, and to that stream only. If no predicate evaluates to true, the record is dropped. This is a stateless record-by-record operation. In this example, a record whose department matches neither coffee nor electronics is discarded.
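
The first-match rule can be illustrated with a small, Kafka-free sketch that routes the elements of a list instead of the records of a stream. The class BranchSketch and its branch method are hypothetical stand-ins, not the Kafka Streams API. They use java.util.function.Predicate, which tests a single value, whereas the Kafka Streams Predicate tests a key and a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, Kafka-free illustration of first-match branching.
class BranchSketch {

    // Routes each record to the list of the first predicate it matches.
    // Records that match no predicate are dropped.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    branches.get(i).add(record);
                    break; // first match wins; later predicates are not consulted
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> departments = List.of("coffee", "Electronics", "books", "COFFEE");
        System.out.println(branch(departments,
                d -> d.equalsIgnoreCase("coffee"),
                d -> d.equalsIgnoreCase("electronics")));
    }
}
```

The order of the predicates matters: a record that satisfies several predicates appears only in the branch of the earliest one.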

The code above branches the stream, but the purchase records entering the Kafka Streams application have no key. A processor therefore needs to be added to generate a key from the customer ID, so that each purchase record can be uniquely identified (order number + customer ID).

4.4.2 Generating a key containing the customer ID to perform the join

To generate the key, select the customer ID from the purchase data in the stream. To do this, update the original KStream instance and add another processor between the original node and the branch node.

KStream<String, Purchase>[] branchesStream = transactionStream.selectKey((k, v) -> v.getCustomerId()).branch(coffeePurchase, electronicPurchase);

KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper)

The selectKey() method sets a new key (possibly of a new type) for each input record: the provided KeyValueMapper is applied to each input record and computes the new key.
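
As a rough, Kafka-free illustration of re-keying, the sketch below applies a mapper function to a list of key/value entries and replaces each key while keeping the value. The class SelectKeySketch and its selectKey method are hypothetical names, and java.util.function.BiFunction stands in for KeyValueMapper.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, Kafka-free illustration of selecting a new key per record.
class SelectKeySketch {

    // Keeps every value and replaces its key with mapper(oldKey, value).
    static <K, V, KR> List<Map.Entry<KR, V>> selectKey(
            List<Map.Entry<K, V>> records, BiFunction<K, V, KR> mapper) {
        List<Map.Entry<KR, V>> rekeyed = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            KR newKey = mapper.apply(record.getKey(), record.getValue());
            rekeyed.add(new SimpleEntry<>(newKey, record.getValue()));
        }
        return rekeyed;
    }

    public static void main(String[] args) {
        // Values are "customerId:item" strings; the incoming keys are null.
        List<Map.Entry<String, String>> purchases = new ArrayList<>();
        purchases.add(new SimpleEntry<>(null, "cust-1:latte"));
        purchases.add(new SimpleEntry<>(null, "cust-2:phone"));
        System.out.println(selectKey(purchases, (k, v) -> v.split(":")[0]));
    }
}
```

The values pass through unchanged; only the key is derived from them, which mirrors selecting the customer ID out of each Purchase.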

In Kafka Streams, whenever a method that can produce a new key (selectKey, map, or transform) is called, an internal boolean flag is set to true, indicating that the new KStream instance requires repartitioning. If this flag is set, repartitioning happens automatically when a join, reduce, or aggregation operation is performed. In this example, the selectKey() operation is performed on transactionStream, so the resulting KStream instance is flagged for repartitioning. Because a branch operation follows immediately, each KStream instance produced by the branch() call is flagged for repartitioning as well.

1. Joining purchase records

public class PurchaseJoiner implements ValueJoiner<Purchase, Purchase, CorrelatedPurchase> {

    @Override
    public CorrelatedPurchase apply(Purchase purchase, Purchase otherPurchase) {

        CorrelatedPurchase.Builder builder = CorrelatedPurchase.newBuilder();

        Date purchaseDate = purchase != null ? purchase.getPurchaseDate() : null;
        Double price = purchase != null ? purchase.getPrice() : 0.0;
        String itemPurchased = purchase != null ? purchase.getItemPurchased() : null;

        Date otherPurchaseDate = otherPurchase != null ? otherPurchase.getPurchaseDate() : null;
        Double otherPrice = otherPurchase != null ? otherPurchase.getPrice() : 0.0;
        String otherItemPurchased = otherPurchase != null ? otherPurchase.getItemPurchased() : null;

        List<String> purchasedItems = new ArrayList<>();

        // The bodies of the next two if statements are missing from the original listing;
        // collecting the item names is the assumed intent.
        if (itemPurchased != null) {
            purchasedItems.add(itemPurchased);
        }

        if (otherItemPurchased != null) {
            purchasedItems.add(otherItemPurchased);
        }

        String customerId = purchase != null ? purchase.getCustomerId() : null;
        String otherCustomerId = otherPurchase != null ? otherPurchase.getCustomerId() : null;

        // Use whichever customer ID is available and sum the two prices
        builder.withCustomerId(customerId != null ? customerId : otherCustomerId)
                .withTotalAmount(price + otherPrice);

        return builder.build();
    }
}

The ValueJoiner<V1, V2, VR> interface joins two values into a new value of any type; apply() returns the joined value built from value1 and value2.

VR apply(V1 value1, V2 value2)

value1 - the first value of the join;

value2 - the second value of the join;

2. Implementing the join

// Extract the branch streams using the 0/1 indexes defined earlier
KStream<String, Purchase> coffeeStream = branchesStream[COFFEE_PURCHASE];
KStream<String, Purchase> electronicsStream = branchesStream[ELECTRONICS_PURCHASE];
// The ValueJoiner instance that performs the join; PurchaseJoiner implements the ValueJoiner interface, which joins two values into a new value of any type.
ValueJoiner<Purchase, Purchase, CorrelatedPurchase> purchaseJoiner = new PurchaseJoiner();
// Set the join window: records with the same key can be joined if their timestamps are within the given time of each other, whether the other record is earlier or later than the primary record.
JoinWindows twentyMinuteWindow = JoinWindows.of(60 * 1000 * 20);
// Call join, which triggers automatic repartitioning of coffeeStream and electronicsStream
KStream<String, CorrelatedPurchase> joinedKStream = coffeeStream.join(electronicsStream,
        purchaseJoiner,
        twentyMinuteWindow,
        // Serdes used for the join: key, value of this stream, value of the other stream
        Joined.with(stringSerde, purchaseSerde, purchaseSerde));
// Print the results to the console
joinedKStream.print(Printed.<String, CorrelatedPurchase>toSysOut().withLabel("joined KStream"));

KStream.join() in detail:

 KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows, Joined<K,V,VO> joined)

otherStream - the KStream to be joined with this stream;

joiner - a ValueJoiner that computes the join result for a pair of matching records;

windows - the JoinWindows specification;

joined - a Joined instance that defines the Serdes used to serialize/deserialize the inputs and outputs of the joined streams;

The KStream.join method is given four parameters, described as follows:
■ electronicsStream: the electronics purchase stream to join with.
■ purchaseJoiner: an implementation of the ValueJoiner<V1, V2, R> interface. ValueJoiner accepts two values (not necessarily of the same type). The ValueJoiner.apply method performs the implementation-specific logic and returns an object of type R (possibly an entirely new type). In this example, purchaseJoiner combines relevant information from the two Purchase objects and returns a CorrelatedPurchase object.
■ twentyMinuteWindow: a JoinWindows instance. The JoinWindows.of method specifies the maximum time difference between the two values being joined. In this example, the timestamps must be within 20 minutes of each other.
■ A Joined instance: provides optional parameters for performing the join. In this example, it supplies the key Serde, the value Serde for the calling stream, and the value Serde for the second stream. Only one key Serde is provided because the keys of the two records being joined must be of the same type.
[Note: Serdes are required for join operations because the join participants are materialized in windowed state stores. In this example, only one key Serde is provided because the keys on both sides of the join must be of the same type.]
This example only specifies that the purchase events must occur within 20 minutes of each other; no order is required. As long as the timestamps are within 20 minutes of each other, the join occurs.
Two additional JoinWindows methods are available for specifying the order of events:
 ■ JoinWindows.after: streamA.join(streamB,......,twentyMinuteWindow.after(5000),......) specifies that the timestamp of the streamB record is at most 5 seconds later than that of the streamA record. The starting time boundary of the window is unchanged.
 ■ JoinWindows.before: streamA.join(streamB,......,twentyMinuteWindow.before(5000),......) specifies that the timestamp of the streamB record is at most 5 seconds earlier than that of the streamA record. The ending time boundary of the window is unchanged.
The time differences passed to before() and after() are expressed in milliseconds. The time span used for the join is an example of a sliding window, which is described in detail in the next chapter.
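
The window rules above can be modeled with a few lines of plain Java. The class JoinWindowSketch is a hypothetical stand-in for JoinWindows that only checks whether two timestamps may be joined: of() allows the same time difference in both directions, while before() and after() narrow one side and leave the other unchanged. All times are in milliseconds.

```java
// Hypothetical, Kafka-free model of the join window timestamp check.
class JoinWindowSketch {

    final long beforeMs; // how much earlier the other record may be
    final long afterMs;  // how much later the other record may be

    private JoinWindowSketch(long beforeMs, long afterMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    // Symmetric window: the other record may be up to timeDifferenceMs earlier or later.
    static JoinWindowSketch of(long timeDifferenceMs) {
        return new JoinWindowSketch(timeDifferenceMs, timeDifferenceMs);
    }

    // Narrows how much earlier the other record may be; the "after" side is unchanged.
    JoinWindowSketch before(long ms) {
        return new JoinWindowSketch(ms, afterMs);
    }

    // Narrows how much later the other record may be; the "before" side is unchanged.
    JoinWindowSketch after(long ms) {
        return new JoinWindowSketch(beforeMs, ms);
    }

    // True if a record with timestamp otherTs can be joined to one with timestamp thisTs.
    boolean matches(long thisTs, long otherTs) {
        return otherTs >= thisTs - beforeMs && otherTs <= thisTs + afterMs;
    }

    public static void main(String[] args) {
        JoinWindowSketch twentyMinutes = JoinWindowSketch.of(20 * 60 * 1000L);
        long coffeeTs = 1_000_000_000L;
        System.out.println(twentyMinutes.matches(coffeeTs, coffeeTs + 19 * 60 * 1000L));
        System.out.println(twentyMinutes.after(5000).matches(coffeeTs, coffeeTs + 6000));
    }
}
```

In this model, a 20-minute window with after(5000) still accepts a record up to 20 minutes earlier, but rejects one that is more than 5 seconds later.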
[Note: the join in code listing 4-13 relies on the actual timestamp of the transaction rather than the timestamp set by Kafka. To use the transaction timestamp, specify the custom timestamp extractor TransactionTimestampExtractor.class by setting StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG.]
A joined stream has now been built: customers who buy electronics within 20 minutes of buying coffee receive a free coffee coupon.


[Note] This content is excerpted from Kafka Streams in Action. I have only interpreted and annotated the code that was not explained clearly. Do not reprint without permission!

Posted by pattyb on Thu, 05 May 2022 11:10:58 +0300