Flink state programming


In Flink architecture, stateful computing is one of the very important features of Flink

Flink benefits:

  • Support high throughput, low latency and high performance
  • Support event time Event_time concept
  • Support stateful computing

Stateful calculation refers to:

In the process of program calculation, the intermediate results generated by calculation are stored in Flink program and provided to the calculation results of subsequent functions or operators. (as shown in the figure below)

The complexity of stateless computing implementation is relatively low and easy to implement, but it is unable to complete the more complex business scenarios mentioned:

  • Calculate the status of the event that conforms to the CEP rule: store the event that conforms to the CEP rule, and then process the event that conforms to the CEP rule

  • Aggregation indicators such as maximum value and mean value (e.g. pv,uv):

  • The status needs to be used to maintain the results generated in the current calculation process, such as the total number, sum, maximum and minimum values of events

  • Machine learning scenario, maintain the parameters used by the current version of the model

  • Other calculations that need to use historical data

Flink state programming

Supported status types

Flink divides the state into Keyed State and operator state (non Keyed State) according to whether the dataset is partitioned according to the Key.

Keyed State is a special case of Operator State, which can be managed through Key Groups. It is mainly used to automatically redistribute Keyed Sate data when the operator parallelism changes

Meanwhile, in Flink, both Keyed State and Operator State have two forms:

One is in the form of managed state, which is controlled and managed by the Flink Runtime. The state data is converted into the object storage of memory Hashtables or RocksDB, and then these state data are persisted to Checkpoints through the internal interface. When the task is abnormal, the task can be recovered through these state data.

In addition, when the Checkpoint operator is used to deserialize the data from its own state structure, the Checkpoint operator does not know the state of the data stored in the process. In addition, when the Checkpoint operator is used to deserialize the data from its own state structure, the Checkpoint operator does not know the state of the data stored in the process.
In Flink, users are recommended to use Managed State to manage state data, mainly because Managed State can better support the rebalancing of state data and better memory management.

Managed Keyed State

Six types
Managed keyed states are divided into the following six types:

Basic API

In Flink, you need to create a StateDescriptor to get the operation class of the corresponding State. As shown in the following code, build a ValueState:

lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean]))

The ValueState can be added, deleted, modified and queried:

  1. Get status value
val isPayed = isPayedState.value()
  1. Update status value
  1. Release status value

Lifecycle of state

For any type of Keyed State, the life cycle (TTL) of the state can be set to ensure that the state data can be cleared in time within the specified time.

Implementation method:

1. Generate StateTtlConfig configuration

2. Pass the StateTtlConfig configuration into the enableTimeToLive method in the StateDescriptor

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])

Managed Operator State

Operator State is a non keyed state, which is associated with parallel operator instances. For example, in KafkaConnector, each Kafka consumer operator instance corresponds to a partition of Kafka, and maintains the Topic partition and Offsets as the operator's Operator State. In Flink, you can implement two interfaces, checkpointed function or listchecked, to define the function to operate the Managed Operator State.

Case: order delay alarm statistics

Requirement description

In the e-commerce platform, what ultimately creates revenue and profits is the link where users place orders and buy; More specifically, it is the time when the user really completes the payment action. The user's order behavior can indicate the user's demand for goods, but in reality, not every order will be paid immediately by the user. When delayed for a period of time, users' willingness to pay will be reduced.

Therefore, in order to make users feel more urgent and improve the payment conversion rate, as well as to prevent the security risks in the order payment link, e-commerce websites often monitor the order status and set an expiration time (such as 15 minutes). If the order is not paid for a period of time after placing the order, the order will be cancelled.

At this time, you need to send a message to remind users to improve the payment conversion rate

requirement analysis

This requirement can be implemented by CEP. It is recommended to use the native state programming of process function.

The problem can be simplified as: when the pay event timeout does not occur, the timeout alarm information is output.

A simple idea is:

  1. Register the timer after the create event of the order and trigger it after 15 minutes;
  2. A Boolean Value state is used as the identification bit to indicate whether the pay event has occurred.
  3. If the pay event has occurred and the status is set to true, then there is no need to do anything;
  4. If the pay event has not come and the status has been false, the timeout alarm information should be output when the timer is triggered.

Data and model

Demo data:


Flink's input and output classes:

//Defines a sample class for the input order event
caseclassOrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)
//Define output result sample class
caseclassOrderResult(orderId: Long, resultMsg: String)

code implementation

case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)

case class OrderResult(orderId: Long, resultMsg: String)

object OrderTimeOut {
  val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val orderEventStream = env.socketTextStream("", 9999)
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
      .assignAscendingTimestamps(_.eventTime * 1000L)

    val orderResultStream = orderEventStream.process(new OrderPayMatch)
    orderResultStream.getSideOutput(orderTimeoutOutputTag).print("time out order")
    env.execute("order timeout without cep job")

  class OrderPayMatch() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]() {
    lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean]))
    lazy val timerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-state", classOf[Long]))

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
      val isPayed = isPayedState.value()
      if (isPayed) {
        ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "payed but no create"))
      } else {
        //Only create, but no pay
        ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))

    override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
      val isPayed = isPayedState.value()
      val timerTs = timerState.value()
      if (value.eventType == "create") {
        if (isPayed) {
          out.collect(OrderResult(value.orderId, "payed successfully"))
        } else {
          val ts = value.eventTime * 1000L + 15 * 60 * 1000L
      } else if (value.eventType == "pay") {
          if (timerTs > 0) {
            if (timerTs > value.eventTime * 1000L) {
              out.collect(OrderResult(value.orderId, "payed successfully"))
            } else {
              ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "this order is timeout"))

          } else {
            //pay first
            ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L)
            timerState.update(value.eventTime * 1000L)


Stateful computing is a good feature of Flink. In some scenarios, such as cumulative computing pv and uv, there is no need to refer to external storage in the project, such as redis. The architecture is simpler and easier to maintain.

reference resources:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl

  2. Analysis of e-commerce user behavior of big data technology

Tags: flink

Posted by beeman000 on Mon, 16 May 2022 09:39:24 +0300