Elastic job development guide

In most cases, we usually use quartz open source framework to meet the application scenarios for scheduled tasks. However, if you consider other factors such as robustness, you need to make some efforts. For example, to avoid a single point of failure, you have to deploy at least two nodes. However, deploying multiple nodes has other problems. Some data can only be processed once at a certain time, such as i = i+1, which can not guarantee idempotent operations. Run has a completely different effect from run once many times. For the above problem, I have designed a zk based Distributed Lock solution: 1. Each type of scheduled job can be assigned an independent ID (such as xxx_job). 2. When instances of this type of job are deployed on multiple nodes, each node applies to zk for a distributed lock (under xxx_job node) before starting. 3. Only after obtaining the lock instance can the scheduled task be started (through code Control quartz's schedule). Those who don't get the lock are in the standby state and have been monitoring the changes of the lock. 4. If a node hangs, the distributed lock will be released automatically, and other nodes will grab the lock. According to the above logic, they will change from the standby state to the active state, and the junior three will be officially in the upper position to continue to execute the timed job. This solution basically solves the problems of HA and business correctness, but there are two shortcomings: 1. It can't make full use of the machine performance. The node in standby is actually just a spare tire and doesn't do anything at ordinary times. 2. It's inconvenient to expand the performance. For example, a job processes tens of millions of data at a time, and only one activation node needs to be processed for a long time. Well, it's time to invite the protagonist after so long foreshadowing, Elastic job is equivalent to the enhanced version of quartz+zk. It allows the fragmentation of scheduled tasks and can be deployed in clusters (the "fragmentation" of each job will be distributed to each node). If a node hangs, the fragmentation on that node will be scheduled to other nodes. There are more detailed tutorials on the official website. Generally, you can use SimpleJob. Usage steps: premise: add the dependencies of the following two jar s first

compile "com.dangdang:elastic-job-lite-core:2.1.5"
compile "com.dangdang:elastic-job-lite-spring:2.1.5" 

1. Your own job should inherit from SimpleJob, and then implement void execute(ShardingContext shardingContext).

public interface SimpleJob extends ElasticJob {
    
    /**
     * Perform the job
     *
     * @param shardingContext Fragment context
     */
    void execute(ShardingContext shardingContext);
}

Note that there is a shardingContext parameter. See the source code:

/**
 * Fragment context
 * 
 * @author zhangliang
 */
@Getter
@ToString
public final class ShardingContext {
    
    /**
     * Job name
     */
    private final String jobName;
    
    /**
     * Job task ID
     */
    private final String taskId;
    
    /**
     * Total number of slices
     */
    private final int shardingTotalCount;
    
    /**
     * Job custom parameters
     * Multiple identical jobs can be configured, but different parameters are used as different scheduling instances
     */
    private final String jobParameter;
    
    /**
     * The partition item assigned to this job instance
     */
    private final int shardingItem;
    
    /**
     * Partition parameters assigned to this job instance
     */
    private final String shardingParameter;
    
    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
        jobName = shardingContexts.getJobName();
        taskId = shardingContexts.getTaskId();
        shardingTotalCount = shardingContexts.getShardingTotalCount();
        jobParameter = shardingContexts.getJobParameter();
        this.shardingItem = shardingItem;
        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    }
}
There are two very important attributes: shardingTotalCount total number of slices (for example: 2) and shardingItem current slice index (for example: 1). The performance expansion mentioned above can be handled simply according to two parameters. Suppose that in the e-commerce system, there is a scheduled task every night to count the sales volume of each store. The business id is generally a self increasing number in the table design. If there are two partitions in total (Note: usually two nodes are deployed), you can put the odd id into partition 0 and the even id into partition 1. In this way, the two machines run half each, which is much faster than only one machine.

The pseudo code is as follows:

public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        int shardIndx = shardingContext.getShardingItem();
        if (shardIndx == 0) {
            //Process merchants with odd id
        } else {
            //Process merchants with even id
        }
    }
}

This can be further simplified if used mysql Query the merchant list. There is a mod function in mysql, which can directly carry out modular operation on the merchant id

select * from shop where mod(shop_id,2)=0

If the above 2 and 0 are replaced with parameters, it is similar in mybatis:

select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}

In this way, the logic is converted to sql for processing. Just pass in the parameters in java code, and even if can be omitted.

2. Next, let's see how to configure

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.dangdang.com/schema/ddframe/reg
       http://www.dangdang.com/schema/ddframe/reg/reg.xsd
       http://www.dangdang.com/schema/ddframe/job
       http://www.dangdang.com/schema/ddframe/job/job.xsd">

    <reg:zookeeper id="regCenter" server-lists="${zk_address}" namespace="my-xxx-job"
                   base-sleep-time-milliseconds="1000"
                   max-sleep-time-milliseconds="3000" max-retries="3"/>

    <job:simple id="xxxJob" class="com.cnblogs.yjmyzz.XXXJob" registry-center-ref="regCenter"
                cron="${xxxJob_cornExpress}"
                sharding-total-count="2" sharding-item-parameters="0=A,1=B"/>

                ...
</beans>

It is almost no different from the conventional spring configuration. The key points are as follows:

a) Because fragment scheduling is based on ZK, we need to configure the ZK registry first, where ${zk_address} can be changed to the actual ZK address list, such as 10 x.x.1:2181,10. x.x.2:2181,10. x.x.3:2181 

b) The corn attribute in each job is the cornExpress expression in quartz, and then the sharding total count is the total number of slices, while the sharding item parameters specifies the specific parameters in each slice

(Note: the e-commerce just now calculates the sales volume every night. In fact, this case only uses the slice index and the number of slices, so it doesn't need parameters, so it's OK to configure a similar 0 = A and 1 = B here. If you want to know the slice index in some business scenarios and also want to send some parameters in the outside, you can configure the parameters you want here, and then you can read the corresponding parameters in execute.)

3. Console

Elastic job also provides a good UI console. The source code of the project is git clone, and mvn install can get an elastic job Lite console - ${version} tar. GZ package, unzip it, and then run bin / start SH can run. The interface is similar to the following:

Through this console, you can dynamically adjust the trigger of each scheduled task time (i.e. cornExpress). For details, please refer to the official website document - operation and maintenance platform.

4. Integration with spring cloud / spring boot

If it is a traditional spring project, it can be seamlessly integrated according to the above steps. If it is a spring cloud / spring boot, it is a little more complex.

Since spring boot advocates zero xml configuration, most configurations are replaced by code. First define a configuration class of elasticJob:

@Data
@Configuration
public class ElasticJobConfig {

    @Value("${rxQuartz.app.zkAddress}")
    private String zkNodes;

    @Value("${rxQuartz.app.namespace}")
    private String namespace;

    @Bean
    public ZookeeperConfiguration zkConfig() {
        return new ZookeeperConfiguration(zkNodes, namespace);
    }

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }
}

The above code is mainly to solve the injection problem of zk registry, and then various xxjobs need to be marked with component annotation to make spring inject automatically

@Component("xxxJob")
public class XXXJob extends AbstractJob {
   
    ...
}

Then assemble them where they really need to be used

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: yangjunming
 */
@Configuration
public class ElasticJobs {

    @Autowired
    @Qualifier("xxxJob")
    public SimpleJob xxxJob;
   
    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Bean(initMethod = "init")
    public JobScheduler settlementJobScheduler(@Autowired @Qualifier("xxxJob") SimpleJob simpleJob,
                                               @Value("${xxxJob.billCronExpress}") final String cron,
                                               @Value("${xxxJob.shardingCount}") int shardingCount,
                                               @Value("${xxxJob.shardingItemParameters}") String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingCount, shardingItemParameters));
    }

    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
    }
}

 

Detailed description of job:simple namespace attribute

Attribute name type Required Default value describe
id String yes   Job name
class String no   The job implementation class needs to implement the ElasticJob interface, and the scripted job does not need to be configured
registry-center-ref String yes   The reference of the registry Bean needs to refer to the declaration of reg:zookeeper
cron String yes   cron expression, used to configure job trigger time
sharding-total-count int yes   Total number of job segments
sharding-item-parameters String no   Partition serial numbers and parameters are separated by equal signs, and multiple key value pairs are separated by commas
The slice serial number starts from 0 and cannot be greater than or equal to the total number of job slices
For example:
0=a,1=b,2=c
job-parameter String no   Job custom parameters
Multiple identical jobs can be configured, but different parameters are used as different scheduling instances
monitor-execution boolean no true Monitor job runtime status
When the execution time and interval of each job are very short, it is recommended not to monitor the running state of the job to improve efficiency. Because it is instantaneous, there is no need to monitor. Please add data accumulation monitoring by yourself. Moreover, the repeated selection of data cannot be guaranteed, and idempotency should be realized in the operation.
If the execution time and interval of each job are long, it is recommended to monitor the running state of the job to ensure that the data will not be selected repeatedly.
monitor-port int no -1 Job monitoring port
It is recommended to configure the job monitoring port to facilitate developers to dump job information.
Usage: echo "dump" | nc 127.0.0.1 9888
max-time-diff-seconds int no -1 The maximum allowable time error between the local machine and the registration center is seconds
If the time error exceeds the configured seconds, an exception will be thrown when the job starts
If it is configured as - 1, it means that the time error is not verified
failover boolean no false Whether to start failure transfer
Failure transfer is valid only when monitorExecution is enabled
misfire boolean no true Enable missed task re execution
job-sharding-strategy-class String no true Implementation of class full path based on job fragmentation strategy
Average allocation policy is used by default
For details, see: Job slicing strategy
description String no   Job description information
disabled boolean no false Whether the job is prohibited from starting
When it can be used to deploy jobs, it is prohibited to start first, and it is started uniformly after deployment
overwrite boolean no false Can the local configuration override the registry configuration
If it can be overwritten, the local configuration shall prevail every time the job is started

job:dataflow namespace attribute details

The job:dataflow namespace has all the attributes of the job:simple namespace. Only the unique attributes are listed below

Attribute name

type Required Default value describe
process-count-interval-seconds int no 300 Interval for counting the number of data processed by the job
Unit: Second
concurrent-data-process-thread-count int no Number of CPU cores * 2 Number of concurrent threads processing data at the same time
Cannot be less than 1
Only ThroughputDataFlow jobs are valid
fetch-data-count int no 1 Amount of data captured each time
streaming-process boolean no false

Whether to stream data
If data is streamed, fetchData does not return null results and the job will continue to execute
If the data is not streamed, the job ends after the data is processed

 

Detailed description of job:script namespace attribute. Refer to the detailed description of job:simple namespace attribute for basic attributes

The job:script namespace has all the attributes of the job:simple namespace. Only the unique attributes are listed below

Attribute name type Required Default value describe
script-command-line String no   Scripted job execution command line

Detailed description of job:listener namespace attribute

job:listener must be configured as a child element of job:bean

Attribute name type Required Default value describe
class String yes   The implementation class of pre and post task listening needs to implement ElasticJobListener interface
started-timeout-milliseconds long no Long.MAX_VALUE AbstractDistributeOnceElasticJobListener listener, the timeout of the execution method before the execution of the last job
Unit: ms
completed-timeout-milliseconds long no Long.MAX_VALUE AbstractDistributeOnceElasticJobListener listener, the timeout of the execution method after the execution of the last job
Unit: ms

reg:bean namespace attribute details

Attribute name type Required Default value describe
id String yes   The primary key of the registry in the Spring container
server-lists String yes   List of connected Zookeeper servers
Include IP address and port number
Multiple addresses are separated by commas
For example: host1:2181,host2:2181
namespace String yes   Namespace of Zookeeper
base-sleep-time-milliseconds int no 1000 Initial value of the interval between waiting for retry
Unit: ms
max-sleep-time-milliseconds int no 3000 Maximum time between waiting for retry
Unit: ms
max-retries int no 3 max retries
session-timeout-milliseconds int no 60000 Session timeout
Unit: ms
connection-timeout-milliseconds int no 15000 Connection timeout
Unit: ms
digest String no No verification Permission token to connect to Zookeeper
The default is no permission verification

 

 

 

Tags: Distribution Quartz elastic-job

Posted by fotakis on Sun, 22 May 2022 14:35:57 +0300