In most cases, the open-source Quartz framework is enough to cover scheduled-task scenarios. However, once you factor in robustness, some extra work is needed: to avoid a single point of failure, for example, you have to deploy at least two nodes. Deploying multiple nodes then raises a new problem: some data may only be processed once at a given time. An operation such as i = i + 1 is not idempotent, so running it once and running it many times produce completely different results.

For this problem I once designed a ZooKeeper-based distributed-lock solution:

1. Each type of scheduled job is assigned a unique ID (such as xxx_job).
2. When instances of this job are deployed on multiple nodes, each node applies to ZK for a distributed lock (under the xxx_job node) before starting.
3. Only the instance that obtains the lock starts the scheduled task (by controlling Quartz's scheduler in code). The instances that fail to get the lock stay in standby and keep watching the lock for changes.
4. If the active node goes down, its distributed lock is released automatically and the other nodes race to grab it. Following the logic above, the winner switches from standby to active and takes over execution of the scheduled job.

This solution basically solves HA and business correctness, but it has two shortcomings:

1. It cannot make full use of machine resources. A standby node is just a spare and does nothing most of the time.
2. It is inconvenient to scale performance. If a job has to process tens of millions of records in one run, the single active node has to grind through all of them alone.

After all this foreshadowing, it is time to introduce the protagonist: Elastic-Job is essentially an enhanced version of Quartz + ZK. It supports sharding of scheduled tasks and cluster deployment (the shards of each job are distributed across the nodes), and if a node goes down, its shards are rescheduled onto the other nodes. The official site has more detailed tutorials; in most cases SimpleJob is all you need.

Usage steps:

Prerequisite: add the following two jar dependencies first:
compile "com.dangdang:elastic-job-lite-core:2.1.5" compile "com.dangdang:elastic-job-lite-spring:2.1.5"
1. Your job class should implement the SimpleJob interface, i.e. implement void execute(ShardingContext shardingContext).
```java
public interface SimpleJob extends ElasticJob {

    /**
     * Execute the job.
     *
     * @param shardingContext sharding context
     */
    void execute(ShardingContext shardingContext);
}
```
Note that there is a shardingContext parameter. See the source code:
```java
/**
 * Sharding context.
 *
 * @author zhangliang
 */
@Getter
@ToString
public final class ShardingContext {

    /** Job name. */
    private final String jobName;

    /** Job task ID. */
    private final String taskId;

    /** Total number of shards. */
    private final int shardingTotalCount;

    /**
     * Custom job parameter.
     * Multiple identical jobs can be configured with different parameters
     * to act as different scheduling instances.
     */
    private final String jobParameter;

    /** The shard item assigned to this job instance. */
    private final int shardingItem;

    /** The shard parameter assigned to this job instance. */
    private final String shardingParameter;

    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
        jobName = shardingContexts.getJobName();
        taskId = shardingContexts.getTaskId();
        shardingTotalCount = shardingContexts.getShardingTotalCount();
        jobParameter = shardingContexts.getJobParameter();
        this.shardingItem = shardingItem;
        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    }
}
```
There are two very important attributes here: shardingTotalCount, the total number of shards (for example: 2), and shardingItem, the index of the current shard (for example: 1). The performance-scaling problem mentioned above can be handled with just these two parameters. Suppose an e-commerce system runs a nightly scheduled job that computes each shop's sales. Business IDs are usually auto-increment numbers in the table design. With two shards in total (note: usually two nodes are deployed), you can route odd IDs to shard 0 and even IDs to shard 1. Each of the two machines then processes half the data, which is much faster than a single machine doing it all.
The pseudo code is as follows:
```java
public class TestJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        int shardIndex = shardingContext.getShardingItem();
        if (shardIndex == 0) {
            // process merchants with odd IDs
        } else {
            // process merchants with even IDs
        }
    }
}
```
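The odd/even trick hard-codes two shards. A slightly more general sketch dispatches by modulo, so the shard count can be changed in configuration without touching the code (loadAllMerchantIds() is a hypothetical DAO call, not part of Elastic-Job):

```java
import java.util.Collections;
import java.util.List;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class ShopSalesJob implements SimpleJob {

    @Override
    public void execute(ShardingContext ctx) {
        int shardIndex = ctx.getShardingItem();       // 0-based index of this shard
        int shardTotal = ctx.getShardingTotalCount(); // total number of shards
        for (long merchantId : loadAllMerchantIds()) {
            // each merchant ID lands on exactly one shard
            if (merchantId % shardTotal == shardIndex) {
                // compute sales for this merchant
            }
        }
    }

    // hypothetical DAO call: query all merchant IDs from the shop table
    private List<Long> loadAllMerchantIds() {
        return Collections.emptyList();
    }
}
```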
This can be simplified further if you use MySQL to query the merchant list: MySQL has a mod function that can apply the modulo operation to the merchant ID directly.
```sql
select * from shop where mod(shop_id,2)=0
```
Replacing the 2 and 0 above with parameters, the MyBatis version looks like this:
```sql
select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}
```
In this way the dispatch logic is pushed down into SQL; the Java code only needs to pass in the two parameters, and even the if/else can be omitted.
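For illustration, here is what such a mapper could look like with annotation-based MyBatis (the ShopMapper name and method are made up for this example):

```java
import java.util.List;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface ShopMapper {

    /**
     * Fetch only the shop IDs belonging to this shard:
     * shard k of n gets every shop whose shop_id % n == k.
     */
    @Select("select shop_id from shop where mod(shop_id, #{shardTotal}) = #{shardIndex}")
    List<Long> findShopIdsForShard(@Param("shardTotal") int shardTotal,
                                   @Param("shardIndex") int shardIndex);
}
```

Inside execute() you would call findShopIdsForShard(shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()) and process the returned IDs.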
2. Next, let's look at the configuration:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd"> <reg:zookeeper id="regCenter" server-lists="${zk_address}" namespace="my-xxx-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3"/> <job:simple id="xxxJob" class="com.cnblogs.yjmyzz.XXXJob" registry-center-ref="regCenter" cron="${xxxJob_cornExpress}" sharding-total-count="2" sharding-item-parameters="0=A,1=B"/> ... </beans>
It is almost no different from the conventional spring configuration. The key points are as follows:
a) Because shard scheduling is based on ZK, the ZK registry must be configured first. ${zk_address} should be replaced with the actual ZK address list, such as 10.x.x.1:2181,10.x.x.2:2181,10.x.x.3:2181.
b) The cron attribute of each job is a Quartz cron expression, sharding-total-count is the total number of shards, and sharding-item-parameters specifies the parameter attached to each shard.
(Note: the nightly sales job above only uses the shard index and shard count, so it needs no parameters, and configuring something like 0=A,1=B here is fine. If a business scenario needs the shard index and also wants some externally supplied parameters, configure the desired parameters here and read them back in execute().)
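As a quick illustration (a sketch, not required by the sales example), this is how the 0=A,1=B mapping surfaces inside execute():

```java
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class ParameterAwareJob implements SimpleJob {

    @Override
    public void execute(ShardingContext ctx) {
        // with sharding-item-parameters="0=A,1=B":
        // the node running shard 0 sees shardingParameter "A",
        // the node running shard 1 sees shardingParameter "B"
        String param = ctx.getShardingParameter();
        System.out.println("shard " + ctx.getShardingItem() + " got parameter " + param);
    }
}
```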
3. Console
Elastic-Job also provides a decent UI console. git clone the project source code and run mvn install to get an elastic-job-lite-console-${version}.tar.gz package; unzip it and run bin/start.sh to start the console.
Through this console you can dynamically adjust the trigger time of each scheduled job (i.e. the cron expression). For details, refer to the official documentation on the operation and maintenance platform.
4. Integration with Spring Cloud / Spring Boot
A traditional Spring project can be integrated seamlessly by following the steps above. With Spring Cloud / Spring Boot it is a little more involved.
Since Spring Boot advocates zero XML configuration, most of the configuration is replaced by code. First define an Elastic-Job configuration class:
```java
@Data
@Configuration
public class ElasticJobConfig {

    @Value("${rxQuartz.app.zkAddress}")
    private String zkNodes;

    @Value("${rxQuartz.app.namespace}")
    private String namespace;

    @Bean
    public ZookeeperConfiguration zkConfig() {
        return new ZookeeperConfiguration(zkNodes, namespace);
    }

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }
}
```
The code above mainly solves the injection of the ZK registry. Each job class then needs a @Component annotation so that Spring can inject it automatically:
@Component("xxxJob") public class XXXJob extends AbstractJob { ... }
Then wire everything up where it is actually needed:
```java
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: yangjunming
 */
@Configuration
public class ElasticJobs {

    @Autowired
    @Qualifier("xxxJob")
    public SimpleJob xxxJob;

    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Bean(initMethod = "init")
    public JobScheduler settlementJobScheduler(@Autowired @Qualifier("xxxJob") SimpleJob simpleJob,
                                               @Value("${xxxJob.billCronExpress}") final String cron,
                                               @Value("${xxxJob.shardingCount}") int shardingCount,
                                               @Value("${xxxJob.shardingItemParameters}") String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingCount, shardingItemParameters));
    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(
                new SimpleJobConfiguration(
                        JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                                .shardingItemParameters(shardingItemParameters)
                                .build(),
                        jobClass.getCanonicalName()))
                .overwrite(true)
                .build();
    }
}
```
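For completeness, the @Value placeholders above would be backed by entries in application.properties along these lines (property names taken from the code above; the values are only examples):

```properties
rxQuartz.app.zkAddress=10.x.x.1:2181,10.x.x.2:2181,10.x.x.3:2181
rxQuartz.app.namespace=my-xxx-job
xxxJob.billCronExpress=0 0 1 * * ?
xxxJob.shardingCount=2
xxxJob.shardingItemParameters=0=A,1=B
```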
Detailed description of job:simple namespace attributes

| Attribute name | Type | Required | Default value | Description |
|---|---|---|---|---|
| id | String | yes | | Job name |
| class | String | no | | Job implementation class; must implement the ElasticJob interface (script jobs do not need it) |
| registry-center-ref | String | yes | | Reference to the registry bean; must point to a reg:zookeeper declaration |
| cron | String | yes | | cron expression used to configure the job trigger time |
| sharding-total-count | int | yes | | Total number of job shards |
| sharding-item-parameters | String | no | | Shard index/parameter pairs joined by "=", multiple pairs separated by commas; shard indices start from 0 and cannot be greater than or equal to the total shard count. Example: 0=a,1=b,2=c |
| job-parameter | String | no | | Custom job parameter; multiple identical jobs can be configured with different parameters as different scheduling instances |
| monitor-execution | boolean | no | true | Monitor the job's runtime state. When each execution is very short, it is recommended to disable monitoring to improve performance: transient states need no monitoring, so add your own data-backlog monitoring instead, and since repeated selection of data cannot be ruled out, the job must be idempotent. When executions are long, enable monitoring to guarantee that data is not selected repeatedly. |
| monitor-port | int | no | -1 | Job monitoring port; recommended, so developers can dump job information. Usage: echo "dump" \| nc 127.0.0.1 9888 |
| max-time-diff-seconds | int | no | -1 | Maximum allowed clock difference, in seconds, between the local machine and the registry; if exceeded, an exception is thrown at job start. -1 disables the check. |
| failover | boolean | no | false | Whether to enable failover; only effective when monitor-execution is enabled |
| misfire | boolean | no | true | Whether to re-execute missed jobs |
| job-sharding-strategy-class | String | no | | Fully qualified class name of the job sharding strategy; the average-allocation strategy is used by default. For details see: job sharding strategies |
| description | String | no | | Job description |
| disabled | boolean | no | false | Whether the job is disabled at startup; useful for deploying jobs disabled first and enabling them all at once after deployment |
| overwrite | boolean | no | false | Whether local configuration may override the registry configuration; if so, local configuration prevails every time the job starts |
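Most of these XML attributes have direct counterparts on the Java configuration builders used in the Spring Boot section above. A sketch of the mapping (builder method names as found in elastic-job-lite 2.x; verify against your version):

```java
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;

public final class JobConfigExample {

    public static LiteJobConfiguration build() {
        // core config: job name, cron, shard count, plus optional attributes
        JobCoreConfiguration core = JobCoreConfiguration
                .newBuilder("xxxJob", "0 0 1 * * ?", 2)
                .shardingItemParameters("0=A,1=B")   // sharding-item-parameters
                .jobParameter("someParam")           // job-parameter
                .failover(true)                      // failover
                .misfire(true)                       // misfire
                .description("nightly sales job")    // description
                .build();

        return LiteJobConfiguration
                .newBuilder(new SimpleJobConfiguration(core, "com.cnblogs.yjmyzz.XXXJob"))
                .monitorExecution(true)              // monitor-execution
                .monitorPort(9888)                   // monitor-port
                .maxTimeDiffSeconds(-1)              // max-time-diff-seconds
                .disabled(false)                     // disabled
                .overwrite(true)                     // overwrite
                .build();
    }
}
```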
Detailed description of job:dataflow namespace attributes
The job:dataflow namespace has all the attributes of the job:simple namespace. Only the unique attributes are listed below
| Attribute name | Type | Required | Default value | Description |
|---|---|---|---|---|
| process-count-interval-seconds | int | no | 300 | Interval, in seconds, at which the number of records processed by the job is counted |
| concurrent-data-process-thread-count | int | no | Number of CPU cores * 2 | Number of threads processing data concurrently; cannot be less than 1; only valid for ThroughputDataFlow jobs |
| fetch-data-count | int | no | 1 | Number of records fetched at a time |
| streaming-process | boolean | no | false | Whether to process data in streaming mode |
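For reference, a dataflow job implements the DataflowJob interface instead of SimpleJob, splitting the work into a fetch phase and a process phase. A minimal sketch, assuming the 2.x API (the entity and DAO call are made up):

```java
import java.util.Collections;
import java.util.List;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;

public class OrderDataflowJob implements DataflowJob<Long> {

    @Override
    public List<Long> fetchData(ShardingContext ctx) {
        // fetch at most fetch-data-count unprocessed records for this shard
        return findUnprocessedOrderIds(ctx.getShardingItem(), ctx.getShardingTotalCount());
    }

    @Override
    public void processData(ShardingContext ctx, List<Long> orderIds) {
        // process the fetched batch; with streaming-process="true",
        // fetchData is called again until it returns an empty list
    }

    // hypothetical DAO call
    private List<Long> findUnprocessedOrderIds(int shardIndex, int shardTotal) {
        return Collections.emptyList();
    }
}
```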
Detailed description of job:script namespace attributes

The job:script namespace has all the attributes of the job:simple namespace; only its unique attribute is listed below.
| Attribute name | Type | Required | Default value | Description |
|---|---|---|---|---|
| script-command-line | String | no | | Command line for executing the script job |
Detailed description of job:listener namespace attributes
job:listener must be configured as a child element of job:bean
| Attribute name | Type | Required | Default value | Description |
|---|---|---|---|---|
| class | String | yes | | Implementation class of the before/after job-execution listener; must implement the ElasticJobListener interface |
| started-timeout-milliseconds | long | no | Long.MAX_VALUE | For AbstractDistributeOnceElasticJobListener: timeout, in ms, of the method executed before the last job starts |
| completed-timeout-milliseconds | long | no | Long.MAX_VALUE | For AbstractDistributeOnceElasticJobListener: timeout, in ms, of the method executed after the last job completes |
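A listener implements the ElasticJobListener interface; a minimal sketch (method and import names as found in elastic-job-lite 2.x):

```java
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;

public class LoggingJobListener implements ElasticJobListener {

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        // runs before the shards on this node execute
        System.out.println("starting " + shardingContexts.getJobName());
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        // runs after the shards on this node finish
        System.out.println("finished " + shardingContexts.getJobName());
    }
}
```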
Detailed description of reg:bean namespace attributes
| Attribute name | Type | Required | Default value | Description |
|---|---|---|---|---|
| id | String | yes | | Primary key of the registry bean in the Spring container |
| server-lists | String | yes | | List of ZooKeeper servers to connect to, including IP address and port; multiple addresses separated by commas, e.g. host1:2181,host2:2181 |
| namespace | String | yes | | ZooKeeper namespace |
| base-sleep-time-milliseconds | int | no | 1000 | Initial retry interval, in ms |
| max-sleep-time-milliseconds | int | no | 3000 | Maximum retry interval, in ms |
| max-retries | int | no | 3 | Maximum number of retries |
| session-timeout-milliseconds | int | no | 60000 | Session timeout, in ms |
| connection-timeout-milliseconds | int | no | 15000 | Connection timeout, in ms |
| digest | String | no | no auth | Permission token for connecting to ZooKeeper; by default no permission check is performed |
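In the code-based setup from the Spring Boot section, the same settings go on the ZookeeperConfiguration object (a sketch; the setters are generated by Lombok in elastic-job-lite 2.x, so verify against your version):

```java
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

public final class RegCenterExample {

    public static ZookeeperRegistryCenter create() {
        // server list + namespace, mirroring server-lists / namespace in XML
        ZookeeperConfiguration zkConfig =
                new ZookeeperConfiguration("host1:2181,host2:2181", "my-xxx-job");
        zkConfig.setBaseSleepTimeMilliseconds(1000);   // base-sleep-time-milliseconds
        zkConfig.setMaxSleepTimeMilliseconds(3000);    // max-sleep-time-milliseconds
        zkConfig.setMaxRetries(3);                     // max-retries
        zkConfig.setSessionTimeoutMilliseconds(60000); // session-timeout-milliseconds
        zkConfig.setConnectionTimeoutMilliseconds(15000);

        ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
        regCenter.init(); // same as initMethod = "init" on the Spring bean
        return regCenter;
    }
}
```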