1. Introduction
The first feature to implement is real-time statistics of popular commodities, which we will compute over the UserBehavior data set.
The main body of the project is written in Scala, using IntelliJ IDEA as the development environment and Maven as the build and dependency-management tool. First, we need to set up the project skeleton.
2. Create Maven project
2.1 Project framework construction
Open IDEA and create a Maven project named UserBehaviorAnalysis. Because the project will contain multiple modules, UserBehaviorAnalysis serves as the parent project, under which we create a sub-module for the real-time top N statistics of popular commodities.
Create a Maven module under UserBehaviorAnalysis as a sub-project and name it HotItemsAnalysis.
The parent project exists only to standardize the project structure and simplify dependency management; it needs no code of its own, so the src folder under UserBehaviorAnalysis can be deleted.
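After these steps, the directory layout should look roughly like this (a sketch; module names follow the conventions above):

UserBehaviorAnalysis
├── pom.xml                 // parent pom: versions, shared dependencies, plug-ins
└── HotItemsAnalysis
    ├── pom.xml
    └── src
        └── main
            ├── scala       // Scala source files
            └── resources   // UserBehavior.csv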
2.2 Declare the version information of tools in the project
Different versions of the tools used across the project can affect how the program runs, so version information shared by all sub-modules should be declared once in the outermost UserBehaviorAnalysis project. Add the following configuration to its pom.xml:
UserBehaviorAnalysis/pom.xml
<properties>
    <flink.version>1.10.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <kafka.version>2.2.0</kafka.version>
</properties>
2.3 Add project dependencies
For the whole project, all modules will use Flink-related components, so we introduce the shared dependencies in UserBehaviorAnalysis:
UserBehaviorAnalysis/pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.binary.version}</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Similarly, common build plug-ins can be declared in the parent pom.xml:
<build>
    <plugins>
        <!-- This plug-in compiles Scala code into class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <!-- Bind to Maven's compile and test-compile phases -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
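With these plug-ins in place, running the standard Maven package phase on a sub-module produces both a plain jar and a jar-with-dependencies that can be submitted to a Flink cluster:

mvn clean package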
The HotItemsAnalysis sub-module needs no additional dependencies for now, so its pom file does not need to change.
2.4 Data preparation
Under src/main/, the default source directory is java; rename it to scala. Then copy the data file UserBehavior.csv into the resource directory src/main/resources, which is where we will read the data from.
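For reference, each line of UserBehavior.csv contains five comma-separated fields (userId, itemId, categoryId, behavior, timestamp in seconds), matching the parsing logic in the code below; an illustrative line looks like:

543462,1715,1464116,pv,1511658000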
At this point, our preparations have been completed, and we can write code next.
3. Module code implementation
3.1 Calculate the hottest Top N items
To count the most popular commodities in each window, we need to group again, this time by window: a keyBy() on the windowEnd field of ItemViewCount. We then use a ProcessFunction to implement a user-defined top N function, TopNHotItem, which computes the 5 most-clicked commodities and formats the ranking result into a string for subsequent output.
package com.atguigu.hotitems_analysis

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// Sample class for the input data
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

// Sample class for the window aggregation result
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    // Create a streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read data from a file
    // val inputStream: DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\HotItemAnalysis\\src\\main\\resources\\UserBehavior.csv")

    // Read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.1.122:9092,192.168.1.133:9092,192.168.1.144:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val inputStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))

    // Convert the data into the sample class type and extract timestamps to define the watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Keep only pv behavior, open a sliding window and aggregate the counts
    val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv")
      .keyBy("itemId")
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountAgg(), new ItemCountWindowResult())

    // Group the window aggregation results by window, then sort and emit the top N
    val resultStream: DataStream[String] = aggStream
      .keyBy("windowEnd")
      .process(new TopNHotItem(5))

    resultStream.print()
    env.execute("hot items job")
  }
}

// Custom pre-aggregation function: counts elements incrementally
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
  override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
  override def createAccumulator(): Long = 0L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// Window function: wraps the count together with the item id and window end time
class ItemCountWindowResult() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key.asInstanceOf[Tuple1[Long]].f0
    val windowEnd = window.getEnd
    val count = input.iterator.next()
    out.collect(ItemViewCount(itemId, windowEnd, count))
  }
}

class TopNHotItem(n: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
  // Define a ListState to save all count results of the current window
  lazy val itemCountListState: ListState[ItemViewCount] = getRuntimeContext.getListState(
    new ListStateDescriptor[ItemViewCount]("itemcount-list", classOf[ItemViewCount]))

  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
    // Save every record into the state
    itemCountListState.add(value)
    // Register an event-time timer that fires 100 ms after the window end
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 100)
  }

  // When the timer fires, take the data out of the state, sort it and emit the result
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // First extract the state data into a ListBuffer
    val allItemCountList: ListBuffer[ItemViewCount] = ListBuffer()
    import scala.collection.JavaConversions._
    for (itemCount <- itemCountListState.get()) {
      allItemCountList += itemCount
    }

    // Sort by count in descending order and take the top n
    val sortedItemCountList = allItemCountList.sortBy(_.count)(Ordering.Long.reverse).take(n)

    // Clear the state
    itemCountListState.clear()

    // Format the ranking information into a string for easy monitoring and display
    val result: StringBuilder = new StringBuilder
    result.append("Time: ").append(new Timestamp(timestamp - 100)).append("\n")

    // Traverse the sorted list and output the top N information
    for (i <- sortedItemCountList.indices) {
      val currentItemCount = sortedItemCountList(i)
      result.append("Top").append(i + 1).append(":")
        .append(" commodity ID=").append(currentItemCount.itemId)
        .append(" Visits=").append(currentItemCount.count)
        .append("\n")
    }
    result.append("====================================\n\n")

    // Throttle the output frequency
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
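To test the job end to end, the hotitems topic needs data. Below is a minimal producer sketch for that purpose; the object name KafkaProducerUtil is our own choice, and the broker address, topic name and file path simply mirror the assumptions made in HotItems above. It replays UserBehavior.csv line by line into Kafka, relying on the kafka-clients classes pulled in by the kafka dependency declared earlier.

package com.atguigu.hotitems_analysis

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source

object KafkaProducerUtil {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.1.122:9092")
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)

    // Replay the CSV file line by line into the "hotitems" topic
    val bufferedSource = Source.fromFile("src/main/resources/UserBehavior.csv")
    for (line <- bufferedSource.getLines()) {
      producer.send(new ProducerRecord[String, String]("hotitems", line))
    }
    bufferedSource.close()
    producer.close()
  }
}

With the producer running in one process and the HotItems job in another, the job prints a top 5 ranking every 5 minutes of event time, once the watermark passes each window end.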