[Storm] Storm programming model

1. Storm programming components

(1) Topology: a task topology, which is a directed acyclic graph (DAG).
(2) Spout: the data collector, responsible for obtaining data from the data source and emitting it to downstream bolts for processing. A common implementation is KafkaSpout, which reads records from Kafka.
(3) Bolt: the data processor; the data processing logic is implemented in the bolt. It can filter, apply functions, aggregate, join, store data to a database, and so on.
(4) Stream: a sequence of tuples; each tuple is a set of key-value pairs, for example: {"name": "zhangsan", "sex": "M", "age": 26}.
(5) Data flow grouping (see the sketch after this list)
1) Shuffle grouping: tuples in the stream are distributed randomly, so that each task of a bolt receives roughly the same number of tuples.
2) Fields grouping: tuples are grouped by key, so that tuples with the same key are always sent to the same task.
3) Global grouping: all tuples are sent to the same task; in this case the parallelism of the receiving component is usually set to 1.
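
A rough sketch of how these groupings are wired up with TopologyBuilder (backtype.storm.topology.TopologyBuilder and backtype.storm.tuple.Fields; the component IDs and the SentenceSpout/SplitBolt/CountBolt/ReportBolt classes are illustrative placeholders, not from the original text):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentenceSpout", new SentenceSpout(), 2);
// shuffle grouping: tuples are distributed randomly and evenly over the bolt's tasks
builder.setBolt("splitBolt", new SplitBolt(), 3).shuffleGrouping("sentenceSpout");
// fields grouping: tuples with the same "word" value always go to the same task
builder.setBolt("countBolt", new CountBolt(), 3).fieldsGrouping("splitBolt", new Fields("word"));
// global grouping: every tuple goes to a single task, so parallelism 1 is enough
builder.setBolt("reportBolt", new ReportBolt(), 1).globalGrouping("countBolt");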

2. Storm component programming

(1) Topology construction class: backtype.storm.topology.TopologyBuilder
(2) Writing a spout component. Implement the interface backtype.storm.topology.IRichSpout, or inherit from backtype.storm.topology.base.BaseRichSpout.

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
	// Component initialization; typically save the collector reference for later emits
}

The open method is the spout's initialization method; it is called first, and only once, after the spout instance is created.

@Override
public void close() {
	// Release and close resources in this method
}
@Override
public void nextTuple() {
	// Implement the logic for obtaining data from the data source
	// and emitting it to the downstream bolt components
}

Storm calls nextTuple in a loop; this is where data is fetched and emitted.

@Override
public void ack(Object msgId) {
	// Called when the tuple emitted with this msgId has been fully processed
}

When the topology enables the message reliability guarantee mechanism and a tuple is processed successfully, Storm calls the ack method so the spout can do whatever follow-up work is needed after successful processing.

@Override
public void fail(Object msgId) {
	// Called when a tuple fails downstream while the reliability mechanism is enabled
	// e.g. retry it, or discard it once the maximum number of retries is reached
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
	// Declare the names and order of the fields in tuples emitted to downstream components
}
@Override
public Map<String, Object> getComponentConfiguration() {
	// Set component-specific configuration for this spout
	return null;
}
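Putting these methods together, here is a minimal sketch of a spout: a hypothetical SentenceSpout that emits hard-coded sentences. It only illustrates the methods above and is not the original author's code.

import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout {
	private SpoutOutputCollector collector;
	private String[] sentences = {"the cow jumped over the moon", "an apple a day keeps the doctor away"};
	private int index = 0;

	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		// Called once when the spout instance is initialized; keep a reference to the emitter
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		// Called in a loop by Storm; emit one sentence per call
		collector.emit(new Values(sentences[index]));
		index = (index + 1) % sentences.length;
		Utils.sleep(100);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Declare the field name of the emitted tuples
		declarer.declare(new Fields("sentence"));
	}
}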

The tuples emitted downstream by KafkaSpout have the format {"str": "msg"}. Classes used in a topology should preferably implement the serialization interface java.io.Serializable.
(3) Writing a bolt component. Implement the interface backtype.storm.topology.IRichBolt, or inherit from backtype.storm.topology.base.BaseRichBolt.

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
	// Similar to the open method in the spout
}

SpoutOutputCollector is the tuple emitter in the spout component, and OutputCollector is the tuple emitter in the bolt component.

@Override
public void execute(Tuple input) {
	// Implement the tuple-processing logic here
}

The execute method plays a role similar to the spout's nextTuple method: it is where the bolt processes each incoming tuple.

@Override
public void cleanup() {
	// Release and close resources here
}

The cleanup method is similar to the close method in the spout.

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
	// Declare the names and order of the fields in tuples emitted to downstream components
}
@Override
public Map<String, Object> getComponentConfiguration() {
	// Set component-specific configuration for this bolt
	return null;
}
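As with the spout, here is a minimal sketch of a bolt: a hypothetical SplitBolt that splits each sentence into words. It matches the method snippets above and assumes the upstream spout declared a field named "sentence"; it is not the original author's code.

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitBolt extends BaseRichBolt {
	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		// Keep a reference to the emitter, as in the spout's open method
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		// Read the incoming sentence and emit one tuple per word
		String sentence = input.getStringByField("sentence");
		for (String word : sentence.split(" ")) {
			collector.emit(new Values(word));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}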

3. Message reliability guarantee mechanism

The tuple processing timeout is controlled by the parameter topology.message.timeout.secs (unit: seconds, default: 30). It can be set in code with conf.setMessageTimeoutSecs(30);
(1) Spout side
When emitting a tuple, specify a messageId for it, and use a cache to record each emitted tuple together with its messageId. When the tuple has been processed by the downstream bolts and the spout receives the ack feedback, the ack method is called and the tuple is removed from the cache. If processing times out, it is treated as a failure and the fail method is called; the tuple is retried, or discarded once the maximum number of retries is exceeded. (A consolidated sketch follows the four steps below.)
1) When emitting a tuple, specify a msgId.

collector.emit(new Values(sentence), messageId);

2) Cache the emitted tuples in an in-memory map, where key = msgId and value = the tuple's Values.

private Map<Object,Values> tuples;

3) When the spout receives confirmation from the downstream components that the tuple has been processed successfully, it calls the ack method. The ack method does whatever is needed on success, such as removing the tuple from the in-memory map.

// Processing confirmed successful; remove the tuple from the cache
tuples.remove(msgId);

4) In case of failure or timeout, the spout calls the fail method.

// Retry: look up the cached tuple
Values values = tuples.get(msgId);
// Re-emit it with the same msgId
collector.emit(values, msgId);
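Taken together, the spout-side logic might look like the following sketch, a fragment inside the spout class from section 2. The HashMap cache, the UUID message IDs and the simple unlimited retry are illustrative assumptions (imports of java.util.HashMap and java.util.UUID omitted).

private Map<Object, Values> tuples = new HashMap<Object, Values>();

private void emitSentence(String sentence) {
	// 1) emit with a messageId and 2) cache the emitted values under that id
	Object msgId = UUID.randomUUID();
	Values values = new Values(sentence);
	tuples.put(msgId, values);
	collector.emit(values, msgId);
}

@Override
public void ack(Object msgId) {
	// 3) downstream processing confirmed; drop the tuple from the cache
	tuples.remove(msgId);
}

@Override
public void fail(Object msgId) {
	// 4) processing failed or timed out; re-emit the cached tuple with the same msgId
	Values values = tuples.get(msgId);
	if (values != null) {
		collector.emit(values, msgId);
	}
}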

(2) Bolt side
After processing a tuple sent from the upstream task, the bolt must call the ack method of the OutputCollector to give feedback. When emitting tuples to downstream bolts, the incoming tuple must be anchored, i.e. use the emitter's emit(Tuple anchor, List<Object> tuple) method. (A consolidated execute sketch follows step 3 below.)
1) If the bolt emits tuples to downstream components, the incoming tuple must be anchored.

// To enable the message reliability guarantee mechanism, you need to anchor the received tuple
collector.emit(input, new Values(word));

2) After processing the tuple, acknowledge success or report failure.

try {
	// ... process the tuple ...
	// Confirm successful processing
	collector.ack(input);
} catch (Exception e) {
	// Processing failed
	collector.fail(input);
}

3) Tuple anchoring builds up the tuple's processing lineage (the tuple tree).

collector.emit(input, new Values(word));
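Combining anchoring with ack/fail, a bolt's execute method with the reliability mechanism enabled could look like this sketch (illustrative only, following the SplitBolt example from section 2):

@Override
public void execute(Tuple input) {
	try {
		String sentence = input.getStringByField("sentence");
		for (String word : sentence.split(" ")) {
			// anchor each emitted tuple to the input tuple so it joins the tuple tree
			collector.emit(input, new Values(word));
		}
		// report successful processing of the input tuple
		collector.ack(input);
	} catch (Exception e) {
		// report failure; the spout's fail method will eventually be called
		collector.fail(input);
	}
}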

A tuple tree is built only when the message reliability guarantee mechanism is enabled, and the whole tuple tree is considered successfully processed only after every branch of the tree has been processed successfully.
4) Acker component: carries out the work of the message reliability guarantee mechanism. Enabling the mechanism degrades the topology's runtime performance; if the degradation is unacceptable, increase the parallelism of the acker component, i.e. the number of executor threads running acker tasks. The parameter topology.acker.executors specifies how many acker executor threads are started.

conf.setNumAckers(4);

Real-time log analysis generally does not enable the message reliability guarantee mechanism. Scenarios with high data-integrity requirements, such as banking systems or e-commerce order statistics, do need to enable it.

4. Storm concurrency

(1) Worker process concurrency: how many worker processes execute the topology. The parameter topology.workers specifies how many worker processes are started for the topology.

Config conf = new Config();
conf.setNumWorkers(2);

(2) Executor thread concurrency: how many threads execute the spouts and bolts. The parallelism argument of topologyBuilder.setSpout() and topologyBuilder.setBolt() specifies how many executors run the component's tasks.

builder.setSpout(SPOUT_ID, new SentenceSpout(),2);
builder.setBolt(SPLIT_BOLT, new SplitBolt(),3).shuffleGrouping(SPOUT_ID);

(3) Task concurrency: directly specify how many tasks each spout or bolt component runs.

builder.setBolt(SPLIT_BOLT, new SplitBolt(),3).setNumTasks(12).shuffleGrouping(SPOUT_ID);

Here the bolt is run by 12 tasks, and those 12 tasks are executed by 3 executors, so each executor thread runs 4 tasks, which it can only execute in turn.
(4) Dynamically changing the concurrency of a running topology. Storm supports dynamically changing (increasing or decreasing) the number of worker processes and executors without restarting the topology; this is called rebalancing.
bin/storm rebalance topologyName -n newWorkerNums -e spoutID=newSpoutExecutorNums -e boltID=newBoltExecutorNums
-n specifies the number of workers; -e specifies the number of executors running the spout or bolt.
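
Putting the pieces together, here is a sketch of building, configuring and submitting the topology. LocalCluster is used here as an assumption for local testing (on a real cluster StormSubmitter.submitTopology would be used instead), and the topology name and component IDs are placeholders.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class WordCountTopology {
	private static final String SPOUT_ID = "sentenceSpout";
	private static final String SPLIT_BOLT = "splitBolt";

	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout(SPOUT_ID, new SentenceSpout(), 2);
		builder.setBolt(SPLIT_BOLT, new SplitBolt(), 3).setNumTasks(12).shuffleGrouping(SPOUT_ID);

		Config conf = new Config();
		conf.setNumWorkers(2);          // worker process concurrency
		conf.setNumAckers(4);           // acker executor concurrency
		conf.setMessageTimeoutSecs(30); // tuple processing timeout, in seconds

		// run locally for testing; on a cluster use StormSubmitter.submitTopology(...)
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("wordCountTopology", conf, builder.createTopology());

		// let the local topology run for a while, then shut it down
		Utils.sleep(60000);
		cluster.shutdown();
	}
}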
