Big data practice: flink e-commerce user behavior analysis, real-time hot commodity statistics

1. Introduction

The first thing to realize is real-time popular commodity statistics. We will analyze it based on UserBehavior data set.
The main body of the project is written in Scala, using IDEA as the development environment for project writing, and maven as the project construction and management tool. First, we need to build the project framework.

2. Create Maven project

2.1 project framework construction
Open IDEA and create a maven project named userbehavior analysis. Because it contains multiple modules, we can take UserBehaviorAnalysis as the parent project and create a project named
The sub item of HotItemsAnalysis is used for real-time statistics of popular top N products.
Create a maven module under userbehavior analysis as a sub project and name it HotItemsAnalysis.
The parent project is only to standardize the project structure and facilitate dependency management. It does not need code implementation, so the src folder under userbehavior analysis can be deleted.
2.2 declare the version information of tools in the project
Different versions of the tools required for our whole project may affect the operation of the program, so the version information shared by all sub modules should be declared in the outermost userbehavior analysis.
In POM Add the following configuration to XML:
       UserBehaviorAnalysis\pom.xml
    <properties>
        <flink.version>1.10.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
2.3 adding project dependencies
For the whole project, all modules will use the components related to flick, so we introduce public dependencies in userbehavior analysis:
UserBehaviorAnalysis/pom.xml
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
Similarly, for the construction of maven project, public plug-ins can be introduced:
    <build>
    <plugins>
    <!-- This plug-in is used to Scala Code compiled into class file -->
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
            <execution>
                <!-- Declare bind to maven of compile stage -->
                <goals>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>
                jar-with-dependencies
            </descriptorRef>
        </descriptorRefs>
    </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    </plugins>
    </build>
In the hotitems analysis sub module, we have not introduced any more dependencies, so we do not need to change the pom file.
2.4 data preparation
In the src/main / directory, you can see that the existing default source file directory is java, and we can change its name to scala. The data file userbehavior Copy CSV to the resource file directory src/main/resources,
We will read the data from here.
At this point, our preparations have been completed, and we can write code next.

3. Module code implementation

Calculate the hottest Top N items
In order to count the most popular goods under each window, we need to group by window again. Here, keyBy() operation is performed according to windowEnd in ItemViewCount. Then use ProcessFunction to implement
A user-defined TopN function TopNHotItems is used to calculate the top 5 items in the hits, and format the ranking results into a string for subsequent output.
 
package com.atguigu.hotitems_analysis


import java.util.Properties

import com.sun.jmx.snmp.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer
import scala.tools.cmd.Spec.Accumulator

//Define sample classes
case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)

//Defines a sample class for window aggregation results
case class ItemViewCount(itemID:Long, windowEnd:Long, count:Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    //Create a streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //Read data from file
    //val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\HotItemAnalysis\\src\\main\\resources\\UserBehavior.csv")
    // from kafka Read data from
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","192.168.1.122:9092,192.168.1.133:9092,192.168.1.144:9092")
    properties.setProperty("group.id","consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val inputStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))



    // Convert the data into sample class types and extract timestamp definition watermark
    val dataStream:DataStream[UserBehavior] = inputStream
      .map(data =>{
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp*1000L)

    // Convert the data and filter out pv Behavior, windowing aggregation, statistics number
    val aggStream:DataStream[ItemViewCount] = dataStream
        .filter(_.behavior == "pv")
        .keyBy("itemId")
        .timeWindow(Time.hours(1),Time.minutes(5))
        .aggregate(new CountAgg(),new ItemCountWindowResult())

    //Group and sort the window aggregation results according to the window TopN output
    val resultStream:DataStream[String] = aggStream
        .keyBy("windowEnd")
        .process(new TopNHotItem(5))

    resultStream.print()

    env.execute("hot items job")
  }

}

//Custom prepolymerization function
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{
  override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1

  override def createAccumulator(): Long = 0L

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a+b

}

class ItemCountWindowResult() extends  WindowFunction[Long, ItemViewCount, Tuple, TimeWindow]{
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key.asInstanceOf[Tuple1[Long]].f0
    val windEnd = window.getEnd
    val count = input.iterator.next()
    out.collect(ItemViewCount(itemId,windEnd,count))
  }

}

class TopNHotItem(n: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String]{
  //Define a ListState,Used to save all of the current window count result
  lazy val itemCountListState: ListState[ItemViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemcount-list", classOf[ItemViewCount]))

  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
    // Every piece of data is saved to the state
    itemCountListState.add(value)

    //Register timer, in windowEnd+100 trigger
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 100)


  }

  //When the timer is triggered, the data is taken from the state, and then sorted and output
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // First extract the state data into a ListBuffer in
    val allItemCountList: ListBuffer[ItemViewCount] = ListBuffer()
    import scala.collection.JavaConversions._
    for( itemCount <- itemCountListState.get()){
      allItemCountList += itemCount
    }

    //according to count Value size sorting
    val sortedItemCountList = allItemCountList.sortBy(_.count)(Ordering.Long.reverse).take(n)

    //erase status
    itemCountListState.clear()

    //Format ranking information into string,Convenient monitoring and display
    val result:StringBuilder = new StringBuilder
    result.append("Time: ").append(new Timestamp(timestamp - 100)).append("\n")

    //ergodic sorted List, output TopN information
    for(i <- sortedItemCountList.indices){
      //Get the of the current item count information
      val currentItemCount = sortedItemCountList(i)
      result.append("Top").append(i+1).append(":")
        .append(" commodity ID=").append(currentItemCount.itemID)
        .append(" Visits=").append(currentItemCount.count)
        .append("\n")
    }

    result.append("====================================\n\n")

    // Control output frequency
    Thread.sleep(1000)
    out.collect(result.toString())
  }

}

 

 

 

 

Tags: Big Data

Posted by The Chancer on Tue, 24 May 2022 09:56:15 +0300