new features: add distributed scheduling support. (#3732)

pull/3754/head
Huey Yao 8 months ago committed by GitHub
parent bc0e5a3d81
commit 79bca1eb42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -30,6 +30,10 @@
<spring.ai.version>0.8.1</spring.ai.version>
<dashscope-sdk-java.version>2.14.0</dashscope-sdk-java.version>
<!-- scheduling -->
<shedlock.version>4.23.0</shedlock.version>
<schedulerx.worker.version>1.11.4</schedulerx.worker.version>
<!-- Maven Plugin Versions -->
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.1.1</maven-javadoc-plugin.version>
@ -185,6 +189,23 @@
<version>${rocketmq.version}</version>
</dependency>
<!-- Alibaba scheduling -->
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
<version>${shedlock.version}</version>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-jdbc-template</artifactId>
<version>${shedlock.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.schedulerx</groupId>
<artifactId>schedulerx2-worker</artifactId>
<version>${schedulerx.worker.version}</version>
</dependency>
<!-- Own dependencies -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
@ -258,6 +279,12 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-schedulerx</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.alibaba.spring</groupId>
<artifactId>spring-context-support</artifactId>

@ -40,7 +40,6 @@
<module>rocketmq-example/rocketmq-example-common</module>
<module>rocketmq-example/rocketmq-tx-example</module>
<module>rocketmq-example/rocketmq-pollable-consume-example</module>
<module>spring-cloud-bus-rocketmq-example</module>
<module>spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example</module>
<module>spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-consul-example</module>
@ -52,9 +51,10 @@
<module>integrated-example/integrated-praise-consumer</module>
<module>integrated-example/integrated-common</module>
<module>integrated-example/integrated-frontend</module>
<module>ai-example/spring-cloud-ai-example</module>
<module>ai-example/spring-cloud-ai-rag-example</module>
</modules>
<module>ai-example/spring-cloud-ai-example</module>
<module>ai-example/spring-cloud-ai-rag-example</module>
<module>spring-cloud-scheduling-example</module>
</modules>
<build>

@ -0,0 +1,101 @@
# Spring Cloud Alibaba Scheduling Example
## Project description
Spring Cloud Alibaba Scheduling provides a timing task scheduling capability based on Spring Scheduling, supporting distributed scenarios for timing task scheduling. It offers a quick integration solution for timing task scheduling services in distributed scenarios.
The current offering is based on the open-source ShedLock for distributed lock acquisition, along with Alibaba Cloud's SchedulerX service [quick start](https://sca.aliyun.com/en/docs/2023/user-guide/schedulerx/quick-start/), Subsequent releases will provide access to more open-source solutions implementations.
## Project dependencies
### Access `spring-cloud-starter-alibaba-schedulerx`
Add the following dependencies to the project `pom.xml`
```xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-schedulerx</artifactId>
</dependency>
```
## Project config description
In the Example project, two types of integration configuration modes are provided for `shedlock` and `schedulerx`, Select the required access mode configuration file in `application.yaml`, the example defaults to the shedlock solution.
### Solution 1. Distributed shedlock integration configuration
Edit the following configuration to the `application-schedulerx.yml`:
```yaml
spring:
cloud:
scheduling:
# Distributed mode: shedlock, schedulerx
# Set config value: shedlock
distributed-mode: shedlock
datasource:
driver-class: com.mysql.cj.jdbc.Driver
url: {jdbc_url}
username: {jdbc.username}
password: {jdbc.password}
```
You should replace `{jdbc_url}`,`{jdbc.username}`, and `{jdbc.password}` with your actual database connection information.
> PrecautionsIf there's no database instance can be used, please create a database instance first.
### Solution 2. Alibaba Cloud's SchedulerX integration configuration
Edit the following configuration to the `application-schedulerx.yml`:
```yaml
spring:
cloud:
scheduling:
# Distributed mode: shedlock, schedulerx
# Set config value: schedulerx
distributed-mode: schedulerx
schedulerx:
# This configuration is required, Please get it from aliyun schedulerx console
endpoint: acm.aliyun.com
namespace: aad167f6-xxxx-xxxx-xxxx-xxxxxxxxx
groupId: xxxxx
appKey: PZm1XXXXXXXXXXXX
# Optional config, if you need to sync task to schedulerx
# task-sync: true
# region-id: public
# aliyun-access-key: XXXXXXXXXXXX
# aliyun-secret-key: XXXXXXXXXXXX
# task-model-default: standalone
```
On Alibaba Cloud service, each account has be granted a free quota for schedulerx. For detailed instructions on how to configure and use cloud product integrations, please refer to the respective product documentation. Refer to: [SchedulerX Spring Task](https://www.alibabacloud.com/help/en/schedulerx/user-guide/spring-jobs)
## Start application
After completing the above selection and configuration, simply run the `ScheduleApplication` class in Example to start the application. The `SimpleJob` class in this example project includes two Spring scheduled tasks that run every minute. Upon starting, you can expect to see the following logs:
```text
2024-05-17T11:20:59.981+08:00 INFO 66613 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-17 11:20:59 do job1...
2024-05-17T11:20:59.985+08:00 INFO 66613 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-1] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-17 11:20:59 do job2...
```
### Distributed running verification
Create two application launch configurations in IDEA, each with the startup parameter `--server.port={port}` to start the corresponding application processes. The example project defaults to using `shedlock`,
We can observe that `job1` will be triggered by both applications, whereas `job2` which has been annotated with `@SchedulerLock` will only be triggered in one application at the same time.
![idea-server-port](images/idea-server-port.png)
- ScheduleApplication-1, startup parameter: `--server.port=18080`, application logs
```text
2024-05-20T14:02:00.003+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:02:00 do job1...
2024-05-20T14:03:00.008+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:03:00 do job1...
2024-05-20T14:03:00.008+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-1] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:03:00 do job2...
2024-05-20T14:04:00.006+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-3] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:04:00 do job1...
2024-05-20T14:04:00.010+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-2] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:04:00 do job2...
2024-05-20T14:05:00.003+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-5] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:05:00 do job1...
```
- ScheduleApplication-2, startup parameter: `--server.port=18081`, application logs
```text
2024-05-20T14:02:00.003+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:02:00 do job1...
2024-05-20T14:02:00.008+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-3] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:02:00 do job2...
2024-05-20T14:03:00.004+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-5] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:03:00 do job1...
2024-05-20T14:04:00.006+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-3] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:04:00 do job1...
2024-05-20T14:05:00.004+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-2] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:05:00 do job1...
2024-05-20T14:05:00.007+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:05:00 do job2...
```

@ -0,0 +1,100 @@
# Spring Cloud Alibaba Scheduling Example
## 项目说明
Spring Cloud Alibaba Scheduling 提供了基于 Spring Scheduling 的定时任务调度能力,支持分布式场景下的定时任务调度,为分布式场景下的定时任务调度服务提供快速接入方案。
目前提供基于开源shedlock分布式抢锁模式以及阿里云上SchedulerX服务的 [快速接入](https://sca.aliyun.com/docs/2023/user-guide/schedulerx/quick-start/) ,后续将提供更多实现的开源方案接入。
## 应用依赖
### 接入 `spring-cloud-starter-alibaba-schedulerx`
在项目 pom.xml 中加入以下依赖:
```xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-schedulerx</artifactId>
</dependency>
```
## 配置说明
在Example工程中提供shedlock和schedulerx的两种接入配置模式***二选一***),在`application.yaml`中选择需要的接入模式配置文件案例中默认采用开源的shedlock方案。
### 方案一、分布式shedlock接入配置说明
在 application-schedulerx.yml 配置文件中修改以下配置:
```yaml
spring:
cloud:
scheduling:
# Distributed mode: shedlock, schedulerx
# Set config value: shedlock
distributed-mode: shedlock
datasource:
driver-class: com.mysql.cj.jdbc.Driver
url: {jdbc_url}
username: {jdbc.username}
password: {jdbc.password}
```
使用时请需要替换`{jdbc_url}`、`{jdbc.username}`、`{jdbc.password}`为实际自有的数据库连接信息。
> 注意:如未创建数据库,请先手动创建数据实例。
### 方案二、云产品SchedulerX接入配置说明
在 application-schedulerx.yml 配置文件中修改以下配置:
```yaml
spring:
cloud:
scheduling:
# Distributed mode: shedlock, schedulerx
# Set config value: schedulerx
distributed-mode: schedulerx
schedulerx:
# This configuration is required, Please get it from aliyun schedulerx console
endpoint: acm.aliyun.com
namespace: aad167f6-xxxx-xxxx-xxxx-xxxxxxxxx
groupId: xxxxx
appKey: PZm1XXXXXXXXXXXX
# Optional config, if you need to sync task to schedulerx
# task-sync: true
# region-id: public
# aliyun-access-key: XXXXXXXXXXXX
# aliyun-secret-key: XXXXXXXXXXXX
# task-model-default: standalone
```
阿里云上产品每个用户开通后都会有免费额度,详细云上产品接入配置使用说明,请参考:[阿里云SchedulerX Spring定时任务](https://help.aliyun.com/zh/schedulerx/user-guide/spring-jobs)
## 启动应用
在完成上述接入选择和配置后直接运行Example中的 `ScheduleApplication`类即可启动运行。本案例工程中的`SimpleJob`类包含了两个每分钟执行一次的Spring定时任务启动后可得到如下日志
```text
2024-05-17T11:20:59.981+08:00 INFO 66613 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-17 11:20:59 do job1...
2024-05-17T11:20:59.985+08:00 INFO 66613 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-1] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-17 11:20:59 do job2...
```
### 分布式运行验证
在IDEA环境中创建两个应用启动项各自分别配置启动参数`--server.port={应用端口}`,启动相应的应用进程。案例工程默认采用`shedlock`
我们可以直接看到`job1`两个应用都会同时触发,而`job2`添加了`@SchedulerLock`注解则会同一时间点只会在一个应用进程中执行。
![idea-server-port](images/idea-server-port.png)
- ScheduleApplication-1启动参数`--server.port=18080`,定时任务运行日志如下:
```text
2024-05-20T14:02:00.003+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:02:00 do job1...
2024-05-20T14:03:00.008+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:03:00 do job1...
2024-05-20T14:03:00.008+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-1] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:03:00 do job2...
2024-05-20T14:04:00.006+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-3] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:04:00 do job1...
2024-05-20T14:04:00.010+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-2] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:04:00 do job2...
2024-05-20T14:05:00.003+08:00 INFO 80520 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-5] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:05:00 do job1...
```
- ScheduleApplication-2启动参数`--server.port=18081`,定时任务运行日志如下:
```text
2024-05-20T14:02:00.003+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:02:00 do job1...
2024-05-20T14:02:00.008+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-3] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:02:00 do job2...
2024-05-20T14:03:00.004+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-5] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:03:00 do job1...
2024-05-20T14:04:00.006+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-3] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:04:00 do job1...
2024-05-20T14:05:00.004+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-2] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:05:00 do job1...
2024-05-20T14:05:00.007+08:00 INFO 80596 --- [spring-cloud-alibaba-schedule-example] [ sca-schedule-4] c.a.c.examples.schedule.job.SimpleJob : time=2024-05-20 14:05:00 do job2...
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 573 KiB

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-alibaba-examples</artifactId>
<groupId>com.alibaba.cloud</groupId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-scheduling-example</artifactId>
<name>Spring Cloud Starter Alibaba Scheduling Example</name>
<description>Example demonstrating how to use Spring Cloud Schedule</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-schedulerx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- database for shedlock start, JDBC data source configuration should be added -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- database for shedlock end -->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>${maven-deploy-plugin.version}</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,36 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.examples.schedule;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* ScheduleApplication.
*
* @author yaohui
*/
@SpringBootApplication
@EnableScheduling
public class ScheduleApplication {
public static void main(String[] args) {
SpringApplication.run(ScheduleApplication.class, args);
}
}

@ -0,0 +1,57 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.examples.schedule.job;
import java.util.concurrent.TimeUnit;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @author yaohui
**/
@Component
public class SimpleJob {
private static final Logger logger = LoggerFactory.getLogger(SimpleJob.class);
/**
* run without lock, all instance running at the same time.
*/
@Scheduled(cron = "0 */1 * * * ?")
public void job1() {
logger.info("time=" + DateTime.now().toString("YYYY-MM-dd HH:mm:ss") + " do job1...");
}
/**
* run with lock, only one instance running at the same time.
*
* @throws InterruptedException interrupted exception
*/
@Scheduled(cron = "0 */1 * * * ?")
@SchedulerLock(name = "lock-job2", lockAtMostFor = "10s")
public void job2() throws InterruptedException {
logger.info("time=" + DateTime.now().toString("YYYY-MM-dd HH:mm:ss") + " do job2...");
TimeUnit.SECONDS.sleep(1L);
}
}

@ -0,0 +1,17 @@
spring:
cloud:
scheduling:
# Distributed mode: shedlock, schedulerx
distributed-mode: schedulerx
schedulerx:
# This configuration is required, Please get it from aliyun schedulerx console
endpoint: acm.aliyun.com
namespace: aad167f6-xxxx-xxxx-xxxx-xxxxxxxxx
groupId: xxxxx
appKey: PZm1XXXXXXXXXXXX
# Optional config, if you need to sync task to schedulerx
# task-sync: true
# region-id: public
# aliyun-access-key: XXXXXXXXXXXX
# aliyun-secret-key: XXXXXXXXXXXX
# task-model-default: standalone

@ -0,0 +1,13 @@
spring:
cloud:
scheduling:
# Distributed mode: shedlock, schedulerx
distributed-mode: shedlock
datasource:
driver-class: com.mysql.cj.jdbc.Driver
# Change to your database jdbc url value
url: jdbc:mysql://127.0.0.1:3306/testdb?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
# Change to your database username value
username: root
# Change to your database password value
password: 123456

@ -0,0 +1,17 @@
server:
port: 18080
spring:
profiles:
# Select the config file to use, shedlock or schedulerx
active: shedlock
application:
name: spring-cloud-alibaba-schedule-example
# Spring task scheduling config
task:
scheduling:
thread-name-prefix: sca-schedule-
pool:
size: 5
shutdown:
await-termination: true
await-termination-period: 60s

@ -27,7 +27,8 @@
<module>spring-cloud-alibaba-sentinel-gateway</module>
<module>spring-cloud-alibaba-commons</module>
<module>spring-cloud-starter-alibaba-ai</module>
</modules>
<module>spring-cloud-starter-alibaba-schedulerx</module>
</modules>
<build>
<plugins>

@ -0,0 +1,75 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-starters</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>spring-cloud-starter-alibaba-schedulerx</artifactId>
<name>Spring Cloud Starter Alibaba Scheduling</name>
<dependencies>
<!--spring boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>
<!-- schedulerx dependency -->
<dependency>
<groupId>com.aliyun.schedulerx</groupId>
<artifactId>schedulerx2-worker</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-schedulerx2</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.4.6</version>
</dependency>
<!-- schedulerx dependency end-->
<!-- shedlock dependency -->
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-jdbc-template</artifactId>
</dependency>
<!-- shedlock dependency end -->
</dependencies>
</project>

@ -0,0 +1,38 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling;
/**
* @author yaohui
**/
public final class SchedulingConstants {
/**
* Scheduling config prefix.
*/
public static final String SCHEDULING_CONFIG_PREFIX = "spring.cloud.scheduling";
/**
* Scheduling distributed mode.
*/
public static final String SCHEDULING_CONFIG_DISTRIBUTED_MODE_KEY = SCHEDULING_CONFIG_PREFIX + ".distributed-mode";
private SchedulingConstants() {
throw new AssertionError("Must not instantiate constant utility class");
}
}

@ -0,0 +1,151 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.schedulerx;
import com.alibaba.schedulerx.common.domain.ExecuteMode;
import com.alibaba.schedulerx.common.domain.JobType;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Job property.
*
* @author xiaomeng.hxm
*/
@ConfigurationProperties(prefix = SchedulerxProperties.CONFIG_PREFIX)
public final class JobProperty {
private String jobName;
private String jobType = JobType.JAVA.getKey();
private String jobModel = ExecuteMode.STANDALONE.getKey();
private String className;
private String content;
private Integer timeType;
private String timeExpression;
private String cron;
private String oneTime;
private String jobParameter;
private String description;
private boolean overwrite = false;
public String getJobType() {
return jobType;
}
public void setJobType(String jobType) {
this.jobType = jobType;
}
public String getJobModel() {
return jobModel;
}
public void setJobModel(String jobModel) {
this.jobModel = jobModel;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
this.cron = cron;
}
public String getOneTime() {
return oneTime;
}
public void setOneTime(String oneTime) {
this.oneTime = oneTime;
}
public String getJobParameter() {
return jobParameter;
}
public void setJobParameter(String jobParameter) {
this.jobParameter = jobParameter;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public boolean isOverwrite() {
return overwrite;
}
public void setOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public Integer getTimeType() {
return timeType;
}
public void setTimeType(Integer timeType) {
this.timeType = timeType;
}
public String getTimeExpression() {
return timeExpression;
}
public void setTimeExpression(String timeExpression) {
this.timeExpression = timeExpression;
}
}

@ -0,0 +1,36 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.schedulerx;
import com.alibaba.cloud.scheduling.SchedulingConstants;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Import;
/**
* schedulerx2 starter.
*
* @author yaohui
**/
@EnableConfigurationProperties(SchedulerxProperties.class)
@ConditionalOnProperty(name = SchedulingConstants.SCHEDULING_CONFIG_DISTRIBUTED_MODE_KEY, havingValue = "schedulerx")
@Import({SchedulerxConfigurations.SchedulerxWorkerConfiguration.class,
SchedulerxConfigurations.SpringScheduleAdaptConfiguration.class})
public class SchedulerxAutoConfigure {
}

@ -0,0 +1,177 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.schedulerx;
import com.alibaba.cloud.scheduling.schedulerx.service.JobSyncService;
import com.alibaba.cloud.scheduling.schedulerx.service.ScheduledJobSyncConfigurer;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.processor.springscheduling.NoOpScheduler;
import com.alibaba.schedulerx.worker.processor.springscheduling.SchedulerxAnnotationBeanPostProcessor;
import com.alibaba.schedulerx.worker.processor.springscheduling.SchedulerxSchedulingConfigurer;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
/**
* @author yaohui
**/
public class SchedulerxConfigurations {
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(SchedulerxWorker.class)
@ConditionalOnProperty(prefix = SchedulerxProperties.CONFIG_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
static class SchedulerxWorkerConfiguration {
private static final Logger LOGGER = LogFactory.getLogger(SchedulerxAutoConfigure.class);
private static final String WORKER_STARTER_SPRING_CLOUD = "springcloud";
@Autowired
private SchedulerxProperties properties;
@Bean
public JobSyncService jobSyncService() {
return new JobSyncService();
}
@PostConstruct
public void syncJobs() throws Exception {
if (!properties.getJobs().isEmpty()) {
LOGGER.info("{}.jobs is not empty, start to sync jobs...", SchedulerxProperties.CONFIG_PREFIX);
jobSyncService().syncJobs();
LOGGER.info("sync jobs finished.");
}
}
@Bean
public SchedulerxWorker schedulerxWorker() {
SchedulerxWorker schedulerxWorker = new SchedulerxWorker();
schedulerxWorker.setDomainName(properties.getDomainName());
schedulerxWorker.setGroupId(properties.getGroupId());
schedulerxWorker.setEnableBatchWork(properties.isEnableBatchWork());
schedulerxWorker.setDisableSites(properties.getDisableSites());
schedulerxWorker.setEnableSites(properties.getEnableSites());
schedulerxWorker.setDisableUnits(properties.getDisableUnits());
schedulerxWorker.setEnableUnits(properties.getEnableUnits());
schedulerxWorker.setAppKey(properties.getAppKey());
schedulerxWorker.setAliyunAccessKey(properties.getAliyunAccessKey());
schedulerxWorker.setAliyunSecretKey(properties.getAliyunSecretKey());
schedulerxWorker.setNamespace(properties.getNamespace());
schedulerxWorker.setHost(properties.getHost());
schedulerxWorker.setPort(properties.getPort());
schedulerxWorker.setEndpoint(properties.getEndpoint());
schedulerxWorker.setNamespaceSource(properties.getNamespaceSource());
schedulerxWorker.setMaxTaskBodySize(properties.getMaxTaskBodySize());
schedulerxWorker.setBlockAppStart(properties.isBlockAppStart());
schedulerxWorker.setSTSAccessKey(properties.getStsAccessKey());
schedulerxWorker.setSTSSecretKey(properties.getStsSecretKey());
schedulerxWorker.setSTSSecretToken(properties.getStsToken());
schedulerxWorker.setSlsCollectorEnable(properties.isSlsCollectorEnable());
schedulerxWorker.setShareContainerPool(properties.isShareContainerPool());
schedulerxWorker.setThreadPoolMode(properties.getThreadPoolMode());
schedulerxWorker.setLabel(properties.getLabel());
schedulerxWorker.setLabelPath(properties.getLabelPath());
if (properties.isShareContainerPool() || WorkerConstants.THREAD_POOL_MODE_ALL.equals(properties.getThreadPoolMode())) {
schedulerxWorker.setSharePoolSize(properties.getSharePoolSize());
schedulerxWorker.setSharePoolQueueSize(properties.getSharePoolQueueSize());
}
if (StringUtils.isNotEmpty(properties.getEndpointPort())) {
schedulerxWorker.setEndpointPort(Integer.parseInt(properties.getEndpointPort()));
}
schedulerxWorker.setEnableCgroupMetrics(properties.isEnableCgroupMetrics());
if (properties.isEnableCgroupMetrics()) {
schedulerxWorker.setCgroupPathPrefix(properties.getCgroupPathPrefix());
}
if (StringUtils.isNotEmpty(properties.getNamespaceSource())) {
schedulerxWorker.setNamespaceSource(properties.getNamespaceSource());
}
schedulerxWorker.setAkkaRemotingAutoRecover(properties.isAkkaRemotingAutoRecover());
schedulerxWorker.setEnableHeartbeatLog(properties.isEnableHeartbeatLog());
schedulerxWorker.setMapMasterStatusCheckInterval(properties.getMapMasterStatusCheckInterval());
schedulerxWorker.setEnableSecondDelayCycleIntervalMs(properties.isEnableSecondDealyCycleIntervalMs());
schedulerxWorker.setEnableMapMasterFailover(properties.isEnableMapMasterFailover());
schedulerxWorker.setEnableSecondDelayStandaloneDispatch(properties.isEnableSecondDelayStandaloneDispatch());
schedulerxWorker.setPageSize(properties.getPageSize());
schedulerxWorker.setGraceShutdownMode(properties.getGraceShutdownMode());
if (properties.getGraceShutdownTimeout() > 0) {
schedulerxWorker.setGraceShutdownTimeout(properties.getGraceShutdownTimeout());
}
schedulerxWorker.setBroadcastDispatchThreadNum(properties.getBroadcastDispatchThreadNum());
schedulerxWorker.setBroadcastDispatchThreadEnable(properties.isBroadcastDispatchThreadEnable());
schedulerxWorker.setBroadcastMasterExecEnable(properties.isBroadcastMasterExecEnable());
schedulerxWorker.setBroadcastDispatchRetryTimes(properties.getBroadcastDispatchRetryTimes());
schedulerxWorker.setProcessorPoolSize(properties.getProcessorPoolSize());
schedulerxWorker.setMapMasterDispatchRandom(properties.isMapMasterDispatchRandom());
schedulerxWorker.setMapMasterRouterStrategy(properties.getMapMasterRouterStrategy());
if (StringUtils.isNotEmpty(properties.getH2DatabaseUser())) {
schedulerxWorker.setH2DatabaseUser(properties.getH2DatabaseUser());
}
if (StringUtils.isNotEmpty(properties.getH2DatabasePassword())) {
schedulerxWorker.setH2DatabasePassword(properties.getH2DatabasePassword());
}
schedulerxWorker.setHttpServerEnable(properties.getHttpServerEnable());
schedulerxWorker.setHttpServerPort(properties.getHttpServerPort());
if (properties.getMaxMapDiskPercent() != null) {
schedulerxWorker.setMaxMapDiskPercent(properties.getMaxMapDiskPercent());
}
ConfigUtil.getWorkerConfig().setProperty(WorkerConstants.WORKER_STARTER_MODE,
WORKER_STARTER_SPRING_CLOUD);
return schedulerxWorker;
}
}
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(SchedulerxWorkerConfiguration.class)
@ConditionalOnBean(SchedulerxWorker.class)
static class SpringScheduleAdaptConfiguration {
@Bean(ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
public NoOpScheduler noOpScheduler() {
return new NoOpScheduler();
}
@Bean
public SchedulerxSchedulingConfigurer schedulerxSchedulingConfigurer() {
return new SchedulerxSchedulingConfigurer();
}
@Bean
public SchedulerxAnnotationBeanPostProcessor schedulerxAnnotationBeanPostProcessor() {
return new SchedulerxAnnotationBeanPostProcessor();
}
@Bean
@ConditionalOnProperty(prefix = SchedulerxProperties.CONFIG_PREFIX, name = "task-sync", havingValue = "true")
public ScheduledJobSyncConfigurer scheduledJobSyncConfigurer() {
return new ScheduledJobSyncConfigurer();
}
}
}

@ -0,0 +1,786 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.schedulerx;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import com.alibaba.cloud.scheduling.SchedulingConstants;
import com.alibaba.schedulerx.common.domain.ContactInfo;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* schedulerx worker properties.
*
* @author yaohui
**/
@ConfigurationProperties(prefix = SchedulerxProperties.CONFIG_PREFIX)
public class SchedulerxProperties implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(SchedulerxProperties.class);
/**
* schedulerx config prefix.
*/
public static final String CONFIG_PREFIX = SchedulingConstants.SCHEDULING_CONFIG_PREFIX + ".schedulerx";
/**
* domainName.
*/
private String domainName;
/**
* groupId.
*/
private String groupId;
/**
* host.
*/
private String host;
/**
* client port.
*/
private int port = 0;
private String enableUnits;
private String disableUnits;
private String enableSites;
private String disableSites;
private boolean enableBatchWork;
/**
* enabled: true; false.
*/
private boolean enabled = true;
/**
* appName.
*/
private String appName;
/**
* appKey.
*/
private String appKey;
/**
* aliyunRamRole.
*/
private String aliyunRamRole;
/**
* aliyunAccessKey.
*/
private String aliyunAccessKey;
/**
* aliyunSecretKey.
*/
private String aliyunSecretKey;
/**
* STS ak.
*/
private String stsAccessKey;
/**
* STS sk.
*/
private String stsSecretKey;
/**
* STS secret token.
*/
private String stsToken;
/**
* Namespace UID.
*/
private String namespace;
/**
* endpoint.
*/
private String endpoint;
/**
* endpointPort.
*/
private String endpointPort;
/**
* namespaceName.
*/
private String namespaceName;
/**
* namespaceSource.
*/
private String namespaceSource;
/**
* maxTaskBodySize (byte).
*/
private int maxTaskBodySize = WorkerConstants.TASK_BODY_SIZE_MAX_DEFAULT;
private boolean blockAppStart = true;
/**
* slsCollectorEnable.
*/
private boolean slsCollectorEnable = true;
/**
* shareContainerPool.
*/
private boolean shareContainerPool = false;
/**
* threadPoolMode.
*/
private String threadPoolMode;
/**
* sharePoolSize.
*/
private int sharePoolSize = WorkerConstants.SHARE_POOL_SIZE_DEFAULT;
/**
* sharePoolQueueSize.
*/
private int sharePoolQueueSize = Integer.MAX_VALUE;
/**
* Wlabel.
*/
private String label;
private String labelPath = "/etc/podinfo/annotations";
/**
* enableCgroupMetrics.
*/
private boolean enableCgroupMetrics = false;
/**
* cgroupPathPrefix.
*/
private String cgroupPathPrefix = "/sys/fs/cgroup/cpu/";
/**
* akkaRemotingAutoRecover.
*/
private boolean akkaRemotingAutoRecover = true;
/**
* enableHeartbeatLog.
*/
private boolean enableHeartbeatLog = true;
/**
* mapMasterStatusCheckInterval(ms).
*/
private int mapMasterStatusCheckInterval = WorkerConstants.Map_MASTER_STATUS_CHECK_INTERVAL_DEFAULT;
/**
* enableSecondDealyCycleIntervalMs.
*/
private boolean enableSecondDealyCycleIntervalMs = false;
/**
* enableMapMasterFailover.
*/
private boolean enableMapMasterFailover = true;
/**
* enableSecondDelayStandaloneDispatch.
*/
private boolean enableSecondDelayStandaloneDispatch = false;
/**
* pageSize.
*/
private int pageSize = 1000;
/**
* GraceShutdownMode(WAIT_ALL; WAIT_RUNNING;).
*/
private String graceShutdownMode;
/**
* graceShutdownTimeout.
*/
private long graceShutdownTimeout = WorkerConstants.GRACE_SHUTDOWN_TIMEOUT_DEFAULT;
/**
* broadcastDispatchThreadNum.
*/
private int broadcastDispatchThreadNum = 4;
/**
* broadcastDispatchRetryTimes.
*/
private int broadcastDispatchRetryTimes = 1;
/**
* broadcastDispatchThreadEnable.
*/
private boolean broadcastDispatchThreadEnable = false;
/**
* broadcastMasterExecEnable.
*/
private boolean broadcastMasterExecEnable = true;
/**
* mapMasterDispatchRandom.
*/
private boolean mapMasterDispatchRandom = false;
private Integer mapMasterRouterStrategy;
private String regionId;
/**
* h2DatabaseUser.
*/
private String h2DatabaseUser;
/**
* h2DatabasePassword.
*/
private String h2DatabasePassword;
/**
* httpServerEnable.
*/
private Boolean httpServerEnable;
/**
* httpServerPort.
*/
private Integer httpServerPort;
/**
* maxMapDiskPercent.
*/
private Float maxMapDiskPercent;
private Map<String, JobProperty> jobs = new LinkedHashMap<>();
private String alarmChannel;
private Map<String, ContactInfo> alarmUsers = new LinkedHashMap<>();
private Map<String, Integer> processorPoolSize = new HashMap<>();
public String getDomainName() {
return domainName;
}
public void setDomainName(String domainName) {
this.domainName = domainName;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getEnableUnits() {
return enableUnits;
}
public void setEnableUnits(String enableUnits) {
this.enableUnits = enableUnits;
}
public String getDisableUnits() {
return disableUnits;
}
public void setDisableUnits(String disableUnits) {
this.disableUnits = disableUnits;
}
public String getEnableSites() {
return enableSites;
}
public void setEnableSites(String enableSites) {
this.enableSites = enableSites;
}
public String getDisableSites() {
return disableSites;
}
public void setDisableSites(String disableSites) {
this.disableSites = disableSites;
}
public boolean isEnableBatchWork() {
return enableBatchWork;
}
public void setEnableBatchWork(boolean enableBatchWork) {
this.enableBatchWork = enableBatchWork;
}
public String getAliyunAccessKey() {
return aliyunAccessKey;
}
public void setAliyunAccessKey(String aliyunAccessKey) {
this.aliyunAccessKey = aliyunAccessKey;
}
public String getAliyunSecretKey() {
return aliyunSecretKey;
}
public void setAliyunSecretKey(String aliyunSecretKey) {
this.aliyunSecretKey = aliyunSecretKey;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getEndpoint() {
return endpoint;
}
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}
public String getEndpointPort() {
return endpointPort;
}
public void setEndpointPort(String endpointPort) {
this.endpointPort = endpointPort;
}
public String getNamespaceName() {
return namespaceName;
}
public void setNamespaceName(String namespaceName) {
this.namespaceName = namespaceName;
}
public String getNamespaceSource() {
return namespaceSource;
}
public void setNamespaceSource(String namespaceSource) {
this.namespaceSource = namespaceSource;
}
public int getMaxTaskBodySize() {
return maxTaskBodySize;
}
public void setMaxTaskBodySize(int maxTaskBodySize) {
this.maxTaskBodySize = maxTaskBodySize;
}
public boolean isBlockAppStart() {
return blockAppStart;
}
public void setBlockAppStart(boolean blockAppStart) {
this.blockAppStart = blockAppStart;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public String getAppKey() {
return appKey;
}
public void setAppKey(String appKey) {
this.appKey = appKey;
}
public String getStsAccessKey() {
return stsAccessKey;
}
public void setStsAccessKey(String stsAccessKey) {
this.stsAccessKey = stsAccessKey;
}
public String getStsSecretKey() {
return stsSecretKey;
}
public void setStsSecretKey(String stsSecretKey) {
this.stsSecretKey = stsSecretKey;
}
public String getStsToken() {
return stsToken;
}
public String getAliyunRamRole() {
return aliyunRamRole;
}
public void setAliyunRamRole(String aliyunRamRole) {
this.aliyunRamRole = aliyunRamRole;
}
public void setStsToken(String stsToken) {
this.stsToken = stsToken;
}
public boolean isSlsCollectorEnable() {
return slsCollectorEnable;
}
public void setSlsCollectorEnable(boolean slsCollectorEnable) {
this.slsCollectorEnable = slsCollectorEnable;
}
public boolean isShareContainerPool() {
return shareContainerPool;
}
public void setShareContainerPool(boolean shareContainerPool) {
this.shareContainerPool = shareContainerPool;
}
public int getSharePoolSize() {
return sharePoolSize;
}
public void setSharePoolSize(int sharePoolSize) {
this.sharePoolSize = sharePoolSize;
}
public String getLabel() {
return label;
}
public void setLabel(String label) {
if (label != null) {
if (label.startsWith("#") && label.endsWith("#")) {
String labelKey = label.substring(1, label.length() - 1);
this.label = System.getenv(labelKey);
return;
}
}
this.label = label;
}
public String getLabelPath() {
return labelPath;
}
public void setLabelPath(String labelPath) {
this.labelPath = labelPath;
}
public boolean isEnableCgroupMetrics() {
return enableCgroupMetrics;
}
public void setEnableCgroupMetrics(boolean enableCgroupMetrics) {
this.enableCgroupMetrics = enableCgroupMetrics;
}
public String getCgroupPathPrefix() {
return cgroupPathPrefix;
}
public void setCgroupPathPrefix(String cgroupPathPrefix) {
this.cgroupPathPrefix = cgroupPathPrefix;
}
public boolean isAkkaRemotingAutoRecover() {
return akkaRemotingAutoRecover;
}
public void setAkkaRemotingAutoRecover(boolean akkaRemotingAutoRecover) {
this.akkaRemotingAutoRecover = akkaRemotingAutoRecover;
}
public boolean isEnableHeartbeatLog() {
return enableHeartbeatLog;
}
public void setEnableHeartbeatLog(boolean enableHeartbeatLog) {
this.enableHeartbeatLog = enableHeartbeatLog;
}
public int getMapMasterStatusCheckInterval() {
return mapMasterStatusCheckInterval;
}
public void setMapMasterStatusCheckInterval(int mapMasterStatusCheckInterval) {
this.mapMasterStatusCheckInterval = mapMasterStatusCheckInterval;
}
public boolean isEnableSecondDealyCycleIntervalMs() {
return enableSecondDealyCycleIntervalMs;
}
public void setEnableSecondDealyCycleIntervalMs(boolean enableSecondDealyCycleIntervalMs) {
this.enableSecondDealyCycleIntervalMs = enableSecondDealyCycleIntervalMs;
}
public boolean isEnableMapMasterFailover() {
return enableMapMasterFailover;
}
public void setEnableMapMasterFailover(boolean enableMapMasterFailover) {
this.enableMapMasterFailover = enableMapMasterFailover;
}
public boolean isEnableSecondDelayStandaloneDispatch() {
return enableSecondDelayStandaloneDispatch;
}
public void setEnableSecondDelayStandaloneDispatch(boolean enableSecondDelayStandaloneDispatch) {
this.enableSecondDelayStandaloneDispatch = enableSecondDelayStandaloneDispatch;
}
public int getPageSize() {
return pageSize;
}
public String getGraceShutdownMode() {
return graceShutdownMode;
}
public void setGraceShutdownMode(String graceShutdownMode) {
this.graceShutdownMode = graceShutdownMode;
}
public long getGraceShutdownTimeout() {
return graceShutdownTimeout;
}
public void setGraceShutdownTimeout(long graceShutdownTimeout) {
this.graceShutdownTimeout = graceShutdownTimeout;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public String getRegionId() {
return regionId;
}
public void setRegionId(String regionId) {
this.regionId = regionId;
}
public Map<String, JobProperty> getJobs() {
return jobs;
}
public void setJobs(Map<String, JobProperty> jobs) {
this.jobs = jobs;
}
public String getAlarmChannel() {
return alarmChannel;
}
public void setAlarmChannel(String alarmChannel) {
this.alarmChannel = alarmChannel;
}
public Map<String, ContactInfo> getAlarmUsers() {
return alarmUsers;
}
public void setAlarmUsers(Map<String, ContactInfo> alarmUsers) {
this.alarmUsers = alarmUsers;
}
public int getBroadcastDispatchThreadNum() {
return broadcastDispatchThreadNum;
}
public void setBroadcastDispatchThreadNum(int broadcastDispatchThreadNum) {
this.broadcastDispatchThreadNum = broadcastDispatchThreadNum;
}
public boolean isBroadcastDispatchThreadEnable() {
return broadcastDispatchThreadEnable;
}
public void setBroadcastDispatchThreadEnable(boolean broadcastDispatchThreadEnable) {
this.broadcastDispatchThreadEnable = broadcastDispatchThreadEnable;
}
public String getThreadPoolMode() {
return threadPoolMode;
}
public void setThreadPoolMode(String threadPoolMode) {
this.threadPoolMode = threadPoolMode;
}
public Map<String, Integer> getProcessorPoolSize() {
return processorPoolSize;
}
public void setProcessorPoolSize(Map<String, Integer> processorPoolSize) {
this.processorPoolSize = processorPoolSize;
}
public int getSharePoolQueueSize() {
return sharePoolQueueSize;
}
public void setSharePoolQueueSize(int sharePoolQueueSize) {
this.sharePoolQueueSize = sharePoolQueueSize;
}
public boolean isMapMasterDispatchRandom() {
return mapMasterDispatchRandom;
}
public void setMapMasterDispatchRandom(boolean mapMasterDispatchRandom) {
this.mapMasterDispatchRandom = mapMasterDispatchRandom;
}
public boolean isBroadcastMasterExecEnable() {
return broadcastMasterExecEnable;
}
public void setBroadcastMasterExecEnable(boolean broadcastMasterExecEnable) {
this.broadcastMasterExecEnable = broadcastMasterExecEnable;
}
public int getBroadcastDispatchRetryTimes() {
return broadcastDispatchRetryTimes;
}
public void setBroadcastDispatchRetryTimes(int broadcastDispatchRetryTimes) {
this.broadcastDispatchRetryTimes = broadcastDispatchRetryTimes;
}
public Integer getMapMasterRouterStrategy() {
return mapMasterRouterStrategy;
}
public void setMapMasterRouterStrategy(Integer mapMasterRouterStrategy) {
this.mapMasterRouterStrategy = mapMasterRouterStrategy;
}
public String getH2DatabaseUser() {
return h2DatabaseUser;
}
public void setH2DatabaseUser(String h2DatabaseUser) {
this.h2DatabaseUser = h2DatabaseUser;
}
public String getH2DatabasePassword() {
return h2DatabasePassword;
}
public void setH2DatabasePassword(String h2DatabasePassword) {
this.h2DatabasePassword = h2DatabasePassword;
}
public Boolean getHttpServerEnable() {
return httpServerEnable;
}
public void setHttpServerEnable(Boolean httpServerEnable) {
this.httpServerEnable = httpServerEnable;
}
public Integer getHttpServerPort() {
return httpServerPort;
}
public void setHttpServerPort(Integer httpServerPort) {
this.httpServerPort = httpServerPort;
}
public Float getMaxMapDiskPercent() {
return maxMapDiskPercent;
}
public void setMaxMapDiskPercent(float maxMapDiskPercent) {
this.maxMapDiskPercent = maxMapDiskPercent;
}
@Override
public void afterPropertiesSet() {
logger.info("SchedulerxProperties->" + JsonUtil.toJson(this));
}
}

@ -0,0 +1,85 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.schedulerx.constants;
/**
* Schedulerx constants.
*
* @author yaohui
*/
public final class SchedulerxConstants {
/**
* Schedulerx default namespace source.
*/
public static final String NAMESPACE_SOURCE_SPRINGBOOT = "springboot";
/**
* Aliyun pop product.
*/
public static final String ALIYUN_POP_PRODUCT = "schedulerx2";
/**
* Aliyun pop endpoint.
*/
public static final String ALIYUN_POP_SCHEDULERX_ENDPOINT = "schedulerx.aliyuncs.com";
/**
* Second delay max value.
*/
public static final int SECOND_DELAY_MAX_VALUE = 60;
/**
* Second delay min value.
*/
public static final int SECOND_DELAY_MIN_VALUE = 1;
/**
* Job timeout default value.
*/
public static final long JOB_TIMEOUT_DEFAULT = 3600L;
/**
* Job retry count default value.
*/
public static final int JOB_RETRY_COUNT_DEFAULT = 3;
/**
* Job retry interval default value.
*/
public static final int JOB_RETRY_INTERVAL_DEFAULT = 30;
/**
* Job alarm channel default value.
*/
public static final String JOB_ALARM_CHANNEL_DEFAULT = "default";
/**
* Job model mapreduce alias.
*/
public static final String JOB_MODEL_MAPREDUCE_ALIAS = "mapreduce";
@Override
public String toString() {
return super.toString();
}
private SchedulerxConstants() {
throw new AssertionError("Must not instantiate constant utility class");
}
}

@ -0,0 +1,435 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.schedulerx.service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import com.alibaba.cloud.scheduling.schedulerx.JobProperty;
import com.alibaba.cloud.scheduling.schedulerx.SchedulerxProperties;
import com.alibaba.cloud.scheduling.schedulerx.constants.SchedulerxConstants;
import com.alibaba.cloud.scheduling.schedulerx.util.CronExpression;
import com.alibaba.schedulerx.common.domain.ContactInfo;
import com.alibaba.schedulerx.common.domain.ExecuteMode;
import com.alibaba.schedulerx.common.domain.JobType;
import com.alibaba.schedulerx.common.domain.TimeType;
import com.alibaba.schedulerx.common.sdk.common.MonitorConfig;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.common.util.StringUtils;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.auth.InstanceProfileCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.schedulerx2.model.v20190430.CreateAppGroupRequest;
import com.aliyuncs.schedulerx2.model.v20190430.CreateAppGroupResponse;
import com.aliyuncs.schedulerx2.model.v20190430.CreateJobRequest;
import com.aliyuncs.schedulerx2.model.v20190430.CreateJobResponse;
import com.aliyuncs.schedulerx2.model.v20190430.CreateNamespaceRequest;
import com.aliyuncs.schedulerx2.model.v20190430.CreateNamespaceResponse;
import com.aliyuncs.schedulerx2.model.v20190430.GetJobInfoRequest;
import com.aliyuncs.schedulerx2.model.v20190430.GetJobInfoResponse;
import com.aliyuncs.schedulerx2.model.v20190430.GetJobInfoResponse.Data.JobConfigInfo;
import com.aliyuncs.schedulerx2.model.v20190430.UpdateJobRequest;
import com.aliyuncs.schedulerx2.model.v20190430.UpdateJobResponse;
import org.springframework.beans.factory.annotation.Autowired;
/**
* JobSyncService.
*
* @author xiaomeng.hxm
*/
public class JobSyncService {
private static final Logger logger = LogFactory.getLogger(JobSyncService.class);
@Autowired
private SchedulerxProperties properties;
private DefaultAcsClient client;
private synchronized DefaultAcsClient getClient() {
// build aliyun pop client
if (client == null) {
DefaultProfile.addEndpoint(properties.getRegionId(), SchedulerxConstants.ALIYUN_POP_PRODUCT, SchedulerxConstants.ALIYUN_POP_SCHEDULERX_ENDPOINT);
if (StringUtils.isNotEmpty(properties.getAliyunRamRole())) {
DefaultProfile profile = DefaultProfile.getProfile(properties.getRegionId());
InstanceProfileCredentialsProvider provider = new InstanceProfileCredentialsProvider(
properties.getAliyunRamRole());
client = new DefaultAcsClient(profile, provider);
}
else {
DefaultProfile defaultProfile = DefaultProfile.getProfile(properties.getRegionId(),
properties.getAliyunAccessKey(),
properties.getAliyunSecretKey());
client = new DefaultAcsClient(defaultProfile);
}
}
return client;
}
/**
* Sync job config.
*
* @param jobs job configs
* @param namespaceSource namespace source
* @throws Exception sync job config exception
*/
public synchronized void syncJobs(Map<String, JobProperty> jobs, String namespaceSource) throws Exception {
DefaultAcsClient client = getClient();
for (Entry<String, JobProperty> entry : jobs.entrySet()) {
String jobName = entry.getKey();
JobProperty jobProperty = entry.getValue();
JobConfigInfo jobConfigInfo = getJob(client, jobName, namespaceSource);
if (jobConfigInfo == null) {
createJob(client, jobName, jobProperty, namespaceSource);
}
else if (jobProperty.isOverwrite()) {
updateJob(client, jobConfigInfo, jobProperty, namespaceSource);
}
}
}
/**
* sync jobs.
*
* @throws Exception sync jobs exception
*/
public void syncJobs() throws Exception {
// 1. create namespace
if (syncNamespace(getClient())) {
// 2. create app group
if (syncAppGroup(getClient())) {
syncJobs(properties.getJobs(), getNamespaceSource());
properties.setNamespaceSource(getNamespaceSource());
}
}
}
/**
* sync namespace.
*
* @param client pop client
* @return true if success
* @throws Exception sync namespace exception
*/
public boolean syncNamespace(DefaultAcsClient client) throws Exception {
if (StringUtils.isEmpty(properties.getNamespace())) {
logger.error("please set {}.namespace", SchedulerxProperties.CONFIG_PREFIX);
throw new IOException(String.format("please set %s.namespace", SchedulerxProperties.CONFIG_PREFIX));
}
if (StringUtils.isEmpty(properties.getNamespaceName())) {
logger.error("please set {}.namespaceName", SchedulerxProperties.CONFIG_PREFIX);
throw new IOException(String.format("please set %s.namespaceName", SchedulerxProperties.CONFIG_PREFIX));
}
CreateNamespaceRequest request = new CreateNamespaceRequest();
request.setUid(properties.getNamespace());
request.setName(properties.getNamespaceName());
request.setSource(getNamespaceSource());
CreateNamespaceResponse response = client.getAcsResponse(request);
if (response.getSuccess()) {
logger.info(JsonUtil.toJson(response));
return true;
}
else {
throw new IOException(response.getMessage());
}
}
/**
* sync app group.
*
* @param client pop client
* @return sync app group result
* @throws IOException sync app group exception.
* @throws ClientException sync app group pop client exception.
*/
public boolean syncAppGroup(DefaultAcsClient client) throws IOException, ClientException {
if (StringUtils.isEmpty(properties.getAppName())) {
logger.error("please set {}.appName", SchedulerxProperties.CONFIG_PREFIX);
throw new IOException(String.format("please set %s.appName", SchedulerxProperties.CONFIG_PREFIX));
}
if (StringUtils.isEmpty(properties.getAppKey())) {
logger.error("please set {}.appKey", SchedulerxProperties.CONFIG_PREFIX);
throw new IOException(String.format("please set %s.appKey", SchedulerxProperties.CONFIG_PREFIX));
}
if (StringUtils.isEmpty(properties.getGroupId())) {
logger.error("please set {}.groupId", SchedulerxProperties.CONFIG_PREFIX);
throw new IOException(String.format("please set %s.groupId", SchedulerxProperties.CONFIG_PREFIX));
}
CreateAppGroupRequest request = new CreateAppGroupRequest();
request.setNamespace(properties.getNamespace());
request.setNamespaceSource(getNamespaceSource());
request.setAppName(properties.getAppName());
request.setGroupId(properties.getGroupId());
request.setAppKey(properties.getAppKey());
if (StringUtils.isNotEmpty(properties.getAlarmChannel())) {
MonitorConfig monitorConfig = new MonitorConfig();
monitorConfig.setSendChannel(properties.getAlarmChannel());
request.setMonitorConfigJson(JsonUtil.toJson(monitorConfig));
}
if (!properties.getAlarmUsers().isEmpty()) {
List<ContactInfo> contactInfos = new ArrayList(properties.getAlarmUsers().values());
request.setMonitorContactsJson(JsonUtil.toJson(contactInfos));
}
CreateAppGroupResponse response = client.getAcsResponse(request);
if (response.getSuccess()) {
logger.info(JsonUtil.toJson(response));
return true;
}
else {
throw new IOException(response.getMessage());
}
}
/**
* Get job config info.
*
* @param client pop client
* @param jobName job name
* @param namespaceSource namespace source
* @return job config info.
* @throws Exception get job config info exception.
*/
private JobConfigInfo getJob(DefaultAcsClient client, String jobName, String namespaceSource) throws Exception {
GetJobInfoRequest request = new GetJobInfoRequest();
request.setNamespace(properties.getNamespace());
request.setNamespaceSource(namespaceSource);
request.setGroupId(properties.getGroupId());
request.setJobId(0L);
request.setJobName(jobName);
GetJobInfoResponse response = client.getAcsResponse(request);
if (response.getSuccess()) {
return response.getData().getJobConfigInfo();
}
return null;
}
/**
* create job.
*
* @param client pop client
* @param jobName job name
* @param jobProperty job property
* @param namespaceSource namespace source
* @throws Exception create job exception
*/
private void createJob(DefaultAcsClient client, String jobName, JobProperty jobProperty, String namespaceSource) throws Exception {
CreateJobRequest request = new CreateJobRequest();
request.setNamespace(properties.getNamespace());
request.setNamespaceSource(namespaceSource);
request.setGroupId(properties.getGroupId());
request.setName(jobName);
request.setParameters(jobProperty.getJobParameter());
if (jobProperty.getJobType().equals(JobType.JAVA.getKey())) {
request.setJobType(JobType.JAVA.getKey());
request.setClassName(jobProperty.getClassName());
}
else {
request.setJobType(jobProperty.getJobType());
}
if (SchedulerxConstants.JOB_MODEL_MAPREDUCE_ALIAS.equals(jobProperty.getJobModel())) {
request.setExecuteMode(ExecuteMode.BATCH.getKey());
}
else {
request.setExecuteMode(jobProperty.getJobModel());
}
if (StringUtils.isNotEmpty(jobProperty.getDescription())) {
request.setDescription(jobProperty.getDescription());
}
if (StringUtils.isNotEmpty(jobProperty.getContent())) {
request.setContent(jobProperty.getContent());
}
if (StringUtils.isNotEmpty(jobProperty.getCron()) && StringUtils.isNotEmpty(jobProperty.getOneTime())) {
throw new IOException("cron and oneTime shouldn't set together");
}
if (StringUtils.isNotEmpty(jobProperty.getCron())) {
CronExpression cronExpression = new CronExpression(jobProperty.getCron());
Date now = new Date();
Date nextData = cronExpression.getTimeAfter(now);
Date next2Data = cronExpression.getTimeAfter(nextData);
if (nextData != null && next2Data != null) {
long interval = TimeUnit.MILLISECONDS.toSeconds((next2Data.getTime() - nextData.getTime()));
if (interval < SchedulerxConstants.SECOND_DELAY_MAX_VALUE) {
request.setTimeType(TimeType.SECOND_DELAY.getValue());
request.setTimeExpression(String.valueOf(interval < SchedulerxConstants.SECOND_DELAY_MIN_VALUE ?
SchedulerxConstants.SECOND_DELAY_MIN_VALUE : interval));
}
else {
request.setTimeType(TimeType.CRON.getValue());
request.setTimeExpression(jobProperty.getCron());
}
}
else {
request.setTimeType(TimeType.CRON.getValue());
request.setTimeExpression(jobProperty.getCron());
}
}
else if (StringUtils.isNotEmpty(jobProperty.getOneTime())) {
request.setTimeType(TimeType.ONE_TIME.getValue());
request.setTimeExpression(jobProperty.getOneTime());
}
else {
request.setTimeType(TimeType.API.getValue());
}
if (jobProperty.getTimeType() != null) {
request.setTimeType(jobProperty.getTimeType());
if (StringUtils.isNotEmpty(jobProperty.getTimeExpression())) {
request.setTimeExpression(jobProperty.getTimeExpression());
}
}
request.setTimeoutEnable(true);
request.setTimeoutKillEnable(true);
request.setSendChannel(SchedulerxConstants.JOB_ALARM_CHANNEL_DEFAULT);
request.setFailEnable(true);
request.setTimeout(SchedulerxConstants.JOB_TIMEOUT_DEFAULT);
request.setMaxAttempt(SchedulerxConstants.JOB_RETRY_COUNT_DEFAULT);
request.setAttemptInterval(SchedulerxConstants.JOB_RETRY_INTERVAL_DEFAULT);
CreateJobResponse response = client.getAcsResponse(request);
if (response.getSuccess()) {
logger.info("create schedulerx job successfully, jobId={}, jobName={}", response.getData().getJobId(), jobName);
}
else {
throw new IOException("create schedulerx job failed, jobName=" + jobName + ", message=" + response.getMessage());
}
}
/**
* update job.
*
* @param client pop client
* @param jobConfigInfo job config info
* @param jobProperty job property
* @param namespaceSource namespace source
* @throws Exception update job exception
*/
private void updateJob(DefaultAcsClient client, JobConfigInfo jobConfigInfo, JobProperty jobProperty, String namespaceSource) throws Exception {
String executeMode = jobProperty.getJobModel();
if (SchedulerxConstants.JOB_MODEL_MAPREDUCE_ALIAS.equals(jobProperty.getJobModel())) {
executeMode = ExecuteMode.BATCH.getKey();
}
int timeType;
String timeExpression = null;
if (StringUtils.isNotEmpty(jobProperty.getCron()) && StringUtils.isNotEmpty(jobProperty.getOneTime())) {
throw new IOException("cron and oneTime shouldn't set together");
}
if (StringUtils.isNotEmpty(jobProperty.getCron())) {
CronExpression cronExpression = new CronExpression(jobProperty.getCron());
Date now = new Date();
Date nextData = cronExpression.getTimeAfter(now);
Date next2Data = cronExpression.getTimeAfter(nextData);
if (nextData != null && next2Data != null) {
long interval = TimeUnit.MILLISECONDS.toSeconds((next2Data.getTime() - nextData.getTime()));
if (interval < SchedulerxConstants.SECOND_DELAY_MAX_VALUE) {
timeType = TimeType.SECOND_DELAY.getValue();
timeExpression = String.valueOf(interval < SchedulerxConstants.SECOND_DELAY_MIN_VALUE ?
SchedulerxConstants.SECOND_DELAY_MIN_VALUE : interval);
}
else {
timeType = TimeType.CRON.getValue();
timeExpression = jobProperty.getCron();
}
}
else {
timeType = TimeType.CRON.getValue();
timeExpression = jobProperty.getCron();
}
}
else if (StringUtils.isNotEmpty(jobProperty.getOneTime())) {
timeType = TimeType.ONE_TIME.getValue();
timeExpression = jobProperty.getOneTime();
}
else {
timeType = TimeType.API.getValue();
}
if (!jobConfigInfo.getDescription().equals(jobProperty.getDescription())
|| !jobConfigInfo.getClassName().equals(jobProperty.getClassName())
|| !jobConfigInfo.getParameters().equals(jobProperty.getJobParameter())
|| !jobConfigInfo.getExecuteMode().equals(executeMode)
|| jobConfigInfo.getTimeConfig().getTimeType() != timeType
|| !jobConfigInfo.getTimeConfig().getTimeExpression().equals(timeExpression)) {
UpdateJobRequest request = new UpdateJobRequest();
request.setNamespace(properties.getNamespace());
request.setNamespaceSource(namespaceSource);
request.setGroupId(properties.getGroupId());
request.setJobId(jobConfigInfo.getJobId());
request.setName(jobConfigInfo.getName());
request.setParameters(jobProperty.getJobParameter());
//java任务
if (jobProperty.getJobType().equals(JobType.JAVA.getKey())) {
request.setClassName(jobProperty.getClassName());
}
request.setExecuteMode(executeMode);
if (StringUtils.isNotEmpty(jobProperty.getDescription())) {
request.setDescription(jobProperty.getDescription());
}
request.setTimeType(timeType);
request.setTimeExpression(timeExpression);
request.setTimeoutEnable(true);
request.setTimeoutKillEnable(true);
request.setSendChannel(SchedulerxConstants.JOB_ALARM_CHANNEL_DEFAULT);
request.setFailEnable(true);
request.setTimeout(SchedulerxConstants.JOB_TIMEOUT_DEFAULT);
request.setMaxAttempt(SchedulerxConstants.JOB_RETRY_COUNT_DEFAULT);
request.setAttemptInterval(SchedulerxConstants.JOB_RETRY_INTERVAL_DEFAULT);
UpdateJobResponse response = client.getAcsResponse(request);
if (response.getSuccess()) {
logger.info("update schedulerx job successfully, jobId={}, jobName={}", jobConfigInfo.getJobId(), jobConfigInfo.getName());
}
else {
throw new IOException("update schedulerx job failed, jobName=" + jobConfigInfo.getName() + ", message=" + response.getMessage());
}
}
}
/**
* Get namespace source.
*
* @return namespace source
*/
private String getNamespaceSource() {
if (StringUtils.isEmpty(properties.getNamespaceSource())) {
return SchedulerxConstants.NAMESPACE_SOURCE_SPRINGBOOT;
}
return properties.getNamespaceSource();
}
}

@ -0,0 +1,194 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.schedulerx.service;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import com.alibaba.cloud.scheduling.schedulerx.JobProperty;
import com.alibaba.cloud.scheduling.schedulerx.SchedulerxProperties;
import com.alibaba.schedulerx.common.domain.ExecuteMode;
import com.alibaba.schedulerx.common.domain.JobType;
import com.alibaba.schedulerx.common.domain.Pair;
import com.alibaba.schedulerx.common.domain.TimeType;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.common.util.StringUtils;
import com.alibaba.schedulerx.scheduling.annotation.SchedulerX;
import com.alibaba.schedulerx.worker.domain.SpringScheduleProfile;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.processor.springscheduling.SchedulerxSchedulingConfigurer;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.IntervalTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.Task;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
/**
* Spring scheduled job sync.
*
* @author yaohui
* @create 2022/8/17 2:21
**/
public class ScheduledJobSyncConfigurer implements SchedulingConfigurer {
private static final Logger logger = LogFactory.getLogger(ScheduledJobSyncConfigurer.class);
@Autowired
private JobSyncService jobSyncService;
@Autowired
private SchedulerxProperties properties;
@Autowired
private SchedulerxSchedulingConfigurer schedulerxSchedulingConfigurer;
@Value("${" + SchedulerxProperties.CONFIG_PREFIX + ".task-overwrite:false}")
private Boolean overwrite = false;
@Value("${" + SchedulerxProperties.CONFIG_PREFIX + ".task-model-default:broadcast}")
private String defaultModel = ExecuteMode.BROADCAST.getKey();
private boolean isValidModel(String mode) {
if (mode == null) {
return false;
}
return (ExecuteMode.BROADCAST.getKey().equals(mode) || ExecuteMode.STANDALONE.getKey().equals(mode));
}
private JobProperty convertToJobProperty(Task task, Object target, Method method) {
JobProperty jobProperty = new JobProperty();
Class targetClass = AopProxyUtils.ultimateTargetClass(target);
if (ClassUtils.isCglibProxyClass(targetClass)) {
targetClass = ClassUtils.getUserClass(target);
}
String jobName = targetClass.getSimpleName() + "_" + method.getName();
String model = this.defaultModel;
if (task != null && task instanceof CronTask) {
String expression = ((CronTask) task).getExpression();
jobProperty.setCron(expression);
}
if (task != null && task instanceof IntervalTask) {
long interval = ((IntervalTask) task).getInterval() / 1000;
interval = interval < 1 ? 1 : interval;
if (interval < 60) {
jobProperty.setTimeType(TimeType.SECOND_DELAY.getValue());
}
else {
jobProperty.setTimeType(TimeType.FIXED_RATE.getValue());
}
jobProperty.setTimeExpression(String.valueOf(interval));
}
SchedulerX schedulerXMethod = AnnotatedElementUtils.getMergedAnnotation(method, SchedulerX.class);
if (schedulerXMethod != null) {
if (StringUtils.isNotEmpty(schedulerXMethod.name())) {
jobName = schedulerXMethod.name();
}
if (isValidModel(schedulerXMethod.model())) {
model = schedulerXMethod.model();
}
if (StringUtils.isNotEmpty(schedulerXMethod.cron())) {
jobProperty.setCron(schedulerXMethod.cron());
}
if (schedulerXMethod.fixedRate() > 0) {
long interval = schedulerXMethod.timeUnit().toSeconds(schedulerXMethod.fixedRate());
interval = interval < 1 ? 1 : interval;
if (interval < 60) {
jobProperty.setTimeType(TimeType.SECOND_DELAY.getValue());
}
else {
jobProperty.setTimeType(TimeType.FIXED_RATE.getValue());
}
jobProperty.setTimeExpression(String.valueOf(interval));
}
}
jobProperty.setJobName(jobName);
jobProperty.setJobType(JobType.SPRINGSCHEDULE.getKey());
jobProperty.setJobModel(model);
SpringScheduleProfile profile = new SpringScheduleProfile();
profile.setClassName(targetClass.getName());
profile.setMethod(method.getName());
jobProperty.setContent(JsonUtil.toJson(profile));
jobProperty.setOverwrite(overwrite);
return jobProperty;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
logger.info("spring scheduled job is not empty, start to sync jobs...");
try {
Map<String, JobProperty> jobs = new HashMap<>();
if (!CollectionUtils.isEmpty(taskRegistrar.getCronTaskList())) {
for (CronTask cronTask : taskRegistrar.getCronTaskList()) {
if (cronTask.getRunnable() instanceof ScheduledMethodRunnable) {
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) cronTask.getRunnable();
JobProperty jobProperty = convertToJobProperty(cronTask, runnable.getTarget(), runnable.getMethod());
jobs.put(jobProperty.getJobName(), jobProperty);
}
}
}
if (!CollectionUtils.isEmpty(taskRegistrar.getFixedDelayTaskList())) {
for (IntervalTask intervalTask : taskRegistrar.getFixedDelayTaskList()) {
if (intervalTask.getRunnable() instanceof ScheduledMethodRunnable) {
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) intervalTask.getRunnable();
JobProperty jobProperty = convertToJobProperty(intervalTask, runnable.getTarget(), runnable.getMethod());
jobs.put(jobProperty.getJobName(), jobProperty);
}
}
}
if (!CollectionUtils.isEmpty(taskRegistrar.getFixedRateTaskList())) {
for (IntervalTask intervalTask : taskRegistrar.getFixedRateTaskList()) {
if (intervalTask.getRunnable() instanceof ScheduledMethodRunnable) {
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) intervalTask.getRunnable();
JobProperty jobProperty = convertToJobProperty(intervalTask, runnable.getTarget(), runnable.getMethod());
jobs.put(jobProperty.getJobName(), jobProperty);
}
}
}
// 获取仅SchedulerX注解任务
Collection<Pair<Object, Method>> schedulerXTasks = schedulerxSchedulingConfigurer.getSchedulerXTaskTargets();
if (schedulerXTasks != null && schedulerXTasks.size() > 0) {
for (Pair<Object, Method> task : schedulerXTasks) {
JobProperty jobProperty = convertToJobProperty(null, task.getFirst(), task.getSecond());
jobs.put(jobProperty.getJobName(), jobProperty);
}
}
jobSyncService.syncJobs(jobs, properties.getNamespaceSource());
logger.info("spring scheduled job is not empty, sync jobs finished.");
}
catch (Exception e) {
logger.info("spring scheduled job is not empty, sync jobs failed.", e);
throw new RuntimeException(e);
}
}
}

@ -0,0 +1,69 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.shedlock;
import javax.sql.DataSource;
import com.alibaba.cloud.scheduling.SchedulingConstants;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
/**
* @author yaohui
**/
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
@EnableSchedulerLock(defaultLockAtMostFor = "1m")
@ConditionalOnBean(DataSource.class)
@ConditionalOnProperty(name = SchedulingConstants.SCHEDULING_CONFIG_DISTRIBUTED_MODE_KEY, havingValue = "shedlock")
public class ShedLockAutoConfigure {
private static final String SCHEDULED_LOCK_SCHEMA_PATH = "shedlock/schema/schema-mysql.sql";
@Bean
@ConditionalOnMissingBean
public LockProvider lockProvider(DataSource dataSource) {
return new JdbcTemplateLockProvider(JdbcTemplateLockProvider.Configuration.builder()
.withJdbcTemplate(new JdbcTemplate(dataSource))
.usingDbTime()
.build());
}
@Bean
@ConditionalOnMissingBean
public DataSourceInitializer shedLockDataSourceInitializer(DataSource dataSource) {
ResourceDatabasePopulator resourceDatabasePopulator = new ResourceDatabasePopulator();
resourceDatabasePopulator.addScript(new ClassPathResource(SCHEDULED_LOCK_SCHEMA_PATH));
DataSourceInitializer dataSourceInitializer = new DataSourceInitializer();
dataSourceInitializer.setDataSource(dataSource);
dataSourceInitializer.setDatabasePopulator(resourceDatabasePopulator);
return dataSourceInitializer;
}
}

@ -0,0 +1,3 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.scheduling.schedulerx.SchedulerxAutoConfigure,\
com.alibaba.cloud.scheduling.shedlock.ShedLockAutoConfigure

@ -0,0 +1,2 @@
com.alibaba.cloud.scheduling.schedulerx.SchedulerxAutoConfigure
com.alibaba.cloud.scheduling.shedlock.ShedLockAutoConfigure

@ -0,0 +1,2 @@
CREATE TABLE IF NOT EXISTS shedlock(name VARCHAR(64) NOT NULL, lock_until TIMESTAMP(3) NOT NULL,
locked_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));

@ -0,0 +1,73 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.scheduling.schedulerx.util;
import java.text.ParseException;
import java.util.Date;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author yaohui
*/
class CronExpressionTest {
@Test
void isValidExpression() {
assertThat(CronExpression.isValidExpression("0 0 0 * * ?")).isEqualTo(true);
assertThat(CronExpression.isValidExpression("0 */5 * * * ?")).isEqualTo(true);
assertThat(CronExpression.isValidExpression("0 0 8 1 JAN ?")).isEqualTo(true);
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 THU")).isEqualTo(true);
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 THU-SAT")).isEqualTo(true);
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 FRI#1")).isEqualTo(true);
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 FRI#5")).isEqualTo(true);
// false
assertThat(CronExpression.isValidExpression("0 0 8 1 JAW ?")).isEqualTo(false);
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 THP-SAT")).isEqualTo(false);
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 FRI#0")).isEqualTo(false);
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 FRI#6")).isEqualTo(false);
}
@Test
void getTimeAfter() throws ParseException {
CronExpression cronExpression = new CronExpression("0 0 8 1 JAN ?");
Date nextDate = cronExpression.getTimeAfter(new Date());
System.out.println(nextDate);
assertThat(nextDate).isNotNull();
cronExpression = new CronExpression("0 */5 * * * ?");
nextDate = cronExpression.getTimeAfter(new Date());
System.out.println(nextDate);
assertThat(nextDate).isNotNull();
}
@Test
void getTimeBefore() throws ParseException {
CronExpression cronExpression = new CronExpression("0 0 8 1 JAN ?");
Date beforeDate = cronExpression.getTimeBefore(new Date());
System.out.println(beforeDate);
assertThat(beforeDate).isNotNull();
cronExpression = new CronExpression("0 */5 * * * ?");
beforeDate = cronExpression.getTimeBefore(new Date());
System.out.println(beforeDate);
assertThat(beforeDate).isNotNull();
}
}
Loading…
Cancel
Save