new features: add distributed scheduling support. (#3725)
parent
d39078ab70
commit
32a6e437ae
Binary file not shown.
After Width: | Height: | Size: 573 KiB |
@ -0,0 +1,65 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>spring-cloud-alibaba-examples</artifactId>
|
||||||
|
<groupId>com.alibaba.cloud</groupId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>spring-cloud-scheduling-example</artifactId>
|
||||||
|
<name>Spring Cloud Starter Alibaba Scheduling Example</name>
|
||||||
|
<description>Example demonstrating how to use Spring Cloud Schedule</description>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<mysql.version>8.0.28</mysql.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.cloud</groupId>
|
||||||
|
<artifactId>spring-cloud-starter-alibaba-schedulerx</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-web</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- database for shedlock start, JDBC data source configuration should be added -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-jdbc</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>mysql</groupId>
|
||||||
|
<artifactId>mysql-connector-java</artifactId>
|
||||||
|
<version>${mysql.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<!-- database for shedlock end -->
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-deploy-plugin</artifactId>
|
||||||
|
<version>${maven-deploy-plugin.version}</version>
|
||||||
|
<configuration>
|
||||||
|
<skip>true</skip>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
</project>
|
@ -0,0 +1,36 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.examples.schedule;
|
||||||
|
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ScheduleApplication.
|
||||||
|
*
|
||||||
|
* @author yaohui
|
||||||
|
*/
|
||||||
|
@SpringBootApplication
|
||||||
|
@EnableScheduling
|
||||||
|
public class ScheduleApplication {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run(ScheduleApplication.class, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,57 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.examples.schedule.job;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author yaohui
|
||||||
|
**/
|
||||||
|
@Component
|
||||||
|
public class SimpleJob {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(SimpleJob.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* run without lock, all instance running at the same time.
|
||||||
|
*/
|
||||||
|
@Scheduled(cron = "0 */1 * * * ?")
|
||||||
|
public void job1() {
|
||||||
|
logger.info("time=" + DateTime.now().toString("YYYY-MM-dd HH:mm:ss") + " do job1...");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* run with lock, only one instance running at the same time.
|
||||||
|
*
|
||||||
|
* @throws InterruptedException interrupted exception
|
||||||
|
*/
|
||||||
|
@Scheduled(cron = "0 */1 * * * ?")
|
||||||
|
@SchedulerLock(name = "lock-job2", lockAtMostFor = "10s")
|
||||||
|
public void job2() throws InterruptedException {
|
||||||
|
logger.info("time=" + DateTime.now().toString("YYYY-MM-dd HH:mm:ss") + " do job2...");
|
||||||
|
TimeUnit.SECONDS.sleep(1L);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
spring:
|
||||||
|
cloud:
|
||||||
|
scheduling:
|
||||||
|
# Distributed mode: shedlock, schedulerx
|
||||||
|
distributed-mode: schedulerx
|
||||||
|
schedulerx:
|
||||||
|
# This configuration is required, Please get it from aliyun schedulerx console
|
||||||
|
endpoint: acm.aliyun.com
|
||||||
|
namespace: aad167f6-xxxx-xxxx-xxxx-xxxxxxxxx
|
||||||
|
groupId: xxxxx
|
||||||
|
appKey: PZm1XXXXXXXXXXXX
|
||||||
|
# Optional config, if you need to sync task to schedulerx
|
||||||
|
# task-sync: true
|
||||||
|
# region-id: public
|
||||||
|
# aliyun-access-key: XXXXXXXXXXXX
|
||||||
|
# aliyun-secret-key: XXXXXXXXXXXX
|
||||||
|
# task-model-default: standalone
|
@ -0,0 +1,13 @@
|
|||||||
|
spring:
|
||||||
|
cloud:
|
||||||
|
scheduling:
|
||||||
|
# Distributed mode: shedlock, schedulerx
|
||||||
|
distributed-mode: shedlock
|
||||||
|
datasource:
|
||||||
|
driver-class: com.mysql.cj.jdbc.Driver
|
||||||
|
# Change to your database jdbc url value
|
||||||
|
url: jdbc:mysql://127.0.0.1:3306/testdb?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
|
||||||
|
# Change to your database username value
|
||||||
|
username: root
|
||||||
|
# Change to your database password value
|
||||||
|
password: 123456
|
@ -0,0 +1,17 @@
|
|||||||
|
server:
|
||||||
|
port: 18080
|
||||||
|
spring:
|
||||||
|
profiles:
|
||||||
|
# Select the config file to use, shedlock or schedulerx
|
||||||
|
active: shedlock
|
||||||
|
application:
|
||||||
|
name: spring-cloud-alibaba-schedule-example
|
||||||
|
# Spring task scheduling config
|
||||||
|
task:
|
||||||
|
scheduling:
|
||||||
|
thread-name-prefix: sca-schedule-
|
||||||
|
pool:
|
||||||
|
size: 5
|
||||||
|
shutdown:
|
||||||
|
await-termination: true
|
||||||
|
await-termination-period: 60s
|
@ -0,0 +1,70 @@
|
|||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>com.alibaba.cloud</groupId>
|
||||||
|
<artifactId>spring-cloud-alibaba-starters</artifactId>
|
||||||
|
<version>${revision}</version>
|
||||||
|
<relativePath>../pom.xml</relativePath>
|
||||||
|
</parent>
|
||||||
|
<artifactId>spring-cloud-starter-alibaba-schedulerx</artifactId>
|
||||||
|
<name>Spring Cloud Starter Alibaba Scheduling</name>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
|
||||||
|
<!--spring boot-->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- schedulerx dependency -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aliyun.schedulerx</groupId>
|
||||||
|
<artifactId>schedulerx2-worker</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aliyun</groupId>
|
||||||
|
<artifactId>aliyun-java-sdk-schedulerx2</artifactId>
|
||||||
|
<version>1.1.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.aliyun</groupId>
|
||||||
|
<artifactId>aliyun-java-sdk-core</artifactId>
|
||||||
|
<version>4.4.6</version>
|
||||||
|
</dependency>
|
||||||
|
<!-- schedulerx dependency end-->
|
||||||
|
|
||||||
|
<!-- shedlock dependency -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>net.javacrumbs.shedlock</groupId>
|
||||||
|
<artifactId>shedlock-spring</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>net.javacrumbs.shedlock</groupId>
|
||||||
|
<artifactId>shedlock-provider-jdbc-template</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<!-- shedlock dependency end -->
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
@ -0,0 +1,38 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author yaohui
|
||||||
|
**/
|
||||||
|
public final class SchedulingConstants {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scheduling config prefix.
|
||||||
|
*/
|
||||||
|
public static final String SCHEDULING_CONFIG_PREFIX = "spring.cloud.scheduling";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scheduling distributed mode.
|
||||||
|
*/
|
||||||
|
public static final String SCHEDULING_CONFIG_DISTRIBUTED_MODE_KEY = SCHEDULING_CONFIG_PREFIX + ".distributed-mode";
|
||||||
|
|
||||||
|
private SchedulingConstants() {
|
||||||
|
throw new AssertionError("Must not instantiate constant utility class");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,151 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.schedulerx;
|
||||||
|
|
||||||
|
import com.alibaba.schedulerx.common.domain.ExecuteMode;
|
||||||
|
import com.alibaba.schedulerx.common.domain.JobType;
|
||||||
|
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job property.
|
||||||
|
*
|
||||||
|
* @author xiaomeng.hxm
|
||||||
|
*/
|
||||||
|
@ConfigurationProperties(prefix = SchedulerxProperties.CONFIG_PREFIX)
|
||||||
|
public final class JobProperty {
|
||||||
|
|
||||||
|
private String jobName;
|
||||||
|
|
||||||
|
private String jobType = JobType.JAVA.getKey();
|
||||||
|
|
||||||
|
private String jobModel = ExecuteMode.STANDALONE.getKey();
|
||||||
|
|
||||||
|
private String className;
|
||||||
|
|
||||||
|
private String content;
|
||||||
|
|
||||||
|
private Integer timeType;
|
||||||
|
|
||||||
|
private String timeExpression;
|
||||||
|
|
||||||
|
private String cron;
|
||||||
|
|
||||||
|
private String oneTime;
|
||||||
|
|
||||||
|
private String jobParameter;
|
||||||
|
|
||||||
|
private String description;
|
||||||
|
|
||||||
|
private boolean overwrite = false;
|
||||||
|
|
||||||
|
public String getJobType() {
|
||||||
|
return jobType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJobType(String jobType) {
|
||||||
|
this.jobType = jobType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getJobModel() {
|
||||||
|
return jobModel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJobModel(String jobModel) {
|
||||||
|
this.jobModel = jobModel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getClassName() {
|
||||||
|
return className;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClassName(String className) {
|
||||||
|
this.className = className;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCron() {
|
||||||
|
return cron;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCron(String cron) {
|
||||||
|
this.cron = cron;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOneTime() {
|
||||||
|
return oneTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOneTime(String oneTime) {
|
||||||
|
this.oneTime = oneTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getJobParameter() {
|
||||||
|
return jobParameter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJobParameter(String jobParameter) {
|
||||||
|
this.jobParameter = jobParameter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDescription() {
|
||||||
|
return description;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDescription(String description) {
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isOverwrite() {
|
||||||
|
return overwrite;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOverwrite(boolean overwrite) {
|
||||||
|
this.overwrite = overwrite;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getContent() {
|
||||||
|
return content;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContent(String content) {
|
||||||
|
this.content = content;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getJobName() {
|
||||||
|
return jobName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJobName(String jobName) {
|
||||||
|
this.jobName = jobName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getTimeType() {
|
||||||
|
return timeType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTimeType(Integer timeType) {
|
||||||
|
this.timeType = timeType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTimeExpression() {
|
||||||
|
return timeExpression;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTimeExpression(String timeExpression) {
|
||||||
|
this.timeExpression = timeExpression;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,36 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.schedulerx;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.scheduling.SchedulingConstants;
|
||||||
|
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Import;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* schedulerx2 starter.
|
||||||
|
*
|
||||||
|
* @author yaohui
|
||||||
|
**/
|
||||||
|
@EnableConfigurationProperties(SchedulerxProperties.class)
|
||||||
|
@ConditionalOnProperty(name = SchedulingConstants.SCHEDULING_CONFIG_DISTRIBUTED_MODE_KEY, havingValue = "schedulerx")
|
||||||
|
@Import({SchedulerxConfigurations.SchedulerxWorkerConfiguration.class,
|
||||||
|
SchedulerxConfigurations.SpringScheduleAdaptConfiguration.class})
|
||||||
|
public class SchedulerxAutoConfigure {
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,178 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.schedulerx;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.scheduling.schedulerx.service.JobSyncService;
|
||||||
|
import com.alibaba.cloud.scheduling.schedulerx.service.ScheduledJobSyncConfigurer;
|
||||||
|
import com.alibaba.schedulerx.common.util.ConfigUtil;
|
||||||
|
import com.alibaba.schedulerx.common.util.StringUtils;
|
||||||
|
import com.alibaba.schedulerx.worker.SchedulerxWorker;
|
||||||
|
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
|
||||||
|
import com.alibaba.schedulerx.worker.log.LogFactory;
|
||||||
|
import com.alibaba.schedulerx.worker.log.Logger;
|
||||||
|
import com.alibaba.schedulerx.worker.processor.springscheduling.NoOpScheduler;
|
||||||
|
import com.alibaba.schedulerx.worker.processor.springscheduling.SchedulerxAnnotationBeanPostProcessor;
|
||||||
|
import com.alibaba.schedulerx.worker.processor.springscheduling.SchedulerxSchedulingConfigurer;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author yaohui
|
||||||
|
**/
|
||||||
|
public class SchedulerxConfigurations {
|
||||||
|
|
||||||
|
@Configuration(proxyBeanMethods = false)
|
||||||
|
@ConditionalOnClass(SchedulerxWorker.class)
|
||||||
|
@ConditionalOnProperty(prefix = SchedulerxProperties.CONFIG_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||||
|
static class SchedulerxWorkerConfiguration {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LogFactory.getLogger(SchedulerxAutoConfigure.class);
|
||||||
|
|
||||||
|
private static final String WORKER_STARTER_SPRING_CLOUD = "springcloud";
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SchedulerxProperties properties;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public JobSyncService jobSyncService() {
|
||||||
|
return new JobSyncService();
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void syncJobs() throws Exception {
|
||||||
|
if (!properties.getJobs().isEmpty()) {
|
||||||
|
LOGGER.info("{}.jobs is not empty, start to sync jobs...", SchedulerxProperties.CONFIG_PREFIX);
|
||||||
|
jobSyncService().syncJobs();
|
||||||
|
LOGGER.info("sync jobs finished.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SchedulerxWorker schedulerxWorker() {
|
||||||
|
SchedulerxWorker schedulerxWorker = new SchedulerxWorker();
|
||||||
|
schedulerxWorker.setDomainName(properties.getDomainName());
|
||||||
|
schedulerxWorker.setGroupId(properties.getGroupId());
|
||||||
|
schedulerxWorker.setEnableBatchWork(properties.isEnableBatchWork());
|
||||||
|
schedulerxWorker.setDisableSites(properties.getDisableSites());
|
||||||
|
schedulerxWorker.setEnableSites(properties.getEnableSites());
|
||||||
|
schedulerxWorker.setDisableUnits(properties.getDisableUnits());
|
||||||
|
schedulerxWorker.setEnableUnits(properties.getEnableUnits());
|
||||||
|
schedulerxWorker.setAppKey(properties.getAppKey());
|
||||||
|
schedulerxWorker.setAliyunAccessKey(properties.getAliyunAccessKey());
|
||||||
|
schedulerxWorker.setAliyunSecretKey(properties.getAliyunSecretKey());
|
||||||
|
schedulerxWorker.setNamespace(properties.getNamespace());
|
||||||
|
schedulerxWorker.setHost(properties.getHost());
|
||||||
|
schedulerxWorker.setPort(properties.getPort());
|
||||||
|
schedulerxWorker.setEndpoint(properties.getEndpoint());
|
||||||
|
schedulerxWorker.setNamespaceSource(properties.getNamespaceSource());
|
||||||
|
schedulerxWorker.setMaxTaskBodySize(properties.getMaxTaskBodySize());
|
||||||
|
schedulerxWorker.setBlockAppStart(properties.isBlockAppStart());
|
||||||
|
schedulerxWorker.setSTSAccessKey(properties.getStsAccessKey());
|
||||||
|
schedulerxWorker.setSTSSecretKey(properties.getStsSecretKey());
|
||||||
|
schedulerxWorker.setSTSSecretToken(properties.getStsToken());
|
||||||
|
schedulerxWorker.setSlsCollectorEnable(properties.isSlsCollectorEnable());
|
||||||
|
schedulerxWorker.setShareContainerPool(properties.isShareContainerPool());
|
||||||
|
schedulerxWorker.setThreadPoolMode(properties.getThreadPoolMode());
|
||||||
|
schedulerxWorker.setLabel(properties.getLabel());
|
||||||
|
schedulerxWorker.setLabelPath(properties.getLabelPath());
|
||||||
|
if (properties.isShareContainerPool() || WorkerConstants.THREAD_POOL_MODE_ALL.equals(properties.getThreadPoolMode())) {
|
||||||
|
schedulerxWorker.setSharePoolSize(properties.getSharePoolSize());
|
||||||
|
schedulerxWorker.setSharePoolQueueSize(properties.getSharePoolQueueSize());
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotEmpty(properties.getEndpointPort())) {
|
||||||
|
schedulerxWorker.setEndpointPort(Integer.parseInt(properties.getEndpointPort()));
|
||||||
|
}
|
||||||
|
schedulerxWorker.setEnableCgroupMetrics(properties.isEnableCgroupMetrics());
|
||||||
|
if (properties.isEnableCgroupMetrics()) {
|
||||||
|
schedulerxWorker.setCgroupPathPrefix(properties.getCgroupPathPrefix());
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotEmpty(properties.getNamespaceSource())) {
|
||||||
|
schedulerxWorker.setNamespaceSource(properties.getNamespaceSource());
|
||||||
|
}
|
||||||
|
schedulerxWorker.setAkkaRemotingAutoRecover(properties.isAkkaRemotingAutoRecover());
|
||||||
|
schedulerxWorker.setEnableHeartbeatLog(properties.isEnableHeartbeatLog());
|
||||||
|
schedulerxWorker.setMapMasterStatusCheckInterval(properties.getMapMasterStatusCheckInterval());
|
||||||
|
schedulerxWorker.setEnableSecondDelayCycleIntervalMs(properties.isEnableSecondDealyCycleIntervalMs());
|
||||||
|
schedulerxWorker.setEnableMapMasterFailover(properties.isEnableMapMasterFailover());
|
||||||
|
schedulerxWorker.setEnableSecondDelayStandaloneDispatch(properties.isEnableSecondDelayStandaloneDispatch());
|
||||||
|
schedulerxWorker.setPageSize(properties.getPageSize());
|
||||||
|
schedulerxWorker.setGraceShutdownMode(properties.getGraceShutdownMode());
|
||||||
|
if (properties.getGraceShutdownTimeout() > 0) {
|
||||||
|
schedulerxWorker.setGraceShutdownTimeout(properties.getGraceShutdownTimeout());
|
||||||
|
}
|
||||||
|
schedulerxWorker.setBroadcastDispatchThreadNum(properties.getBroadcastDispatchThreadNum());
|
||||||
|
schedulerxWorker.setBroadcastDispatchThreadEnable(properties.isBroadcastDispatchThreadEnable());
|
||||||
|
schedulerxWorker.setBroadcastMasterExecEnable(properties.isBroadcastMasterExecEnable());
|
||||||
|
schedulerxWorker.setBroadcastDispatchRetryTimes(properties.getBroadcastDispatchRetryTimes());
|
||||||
|
schedulerxWorker.setProcessorPoolSize(properties.getProcessorPoolSize());
|
||||||
|
schedulerxWorker.setMapMasterDispatchRandom(properties.isMapMasterDispatchRandom());
|
||||||
|
schedulerxWorker.setMapMasterRouterStrategy(properties.getMapMasterRouterStrategy());
|
||||||
|
if (StringUtils.isNotEmpty(properties.getH2DatabaseUser())) {
|
||||||
|
schedulerxWorker.setH2DatabaseUser(properties.getH2DatabaseUser());
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotEmpty(properties.getH2DatabasePassword())) {
|
||||||
|
schedulerxWorker.setH2DatabasePassword(properties.getH2DatabasePassword());
|
||||||
|
}
|
||||||
|
schedulerxWorker.setHttpServerEnable(properties.getHttpServerEnable());
|
||||||
|
schedulerxWorker.setHttpServerPort(properties.getHttpServerPort());
|
||||||
|
if (properties.getMaxMapDiskPercent() != null) {
|
||||||
|
schedulerxWorker.setMaxMapDiskPercent(properties.getMaxMapDiskPercent());
|
||||||
|
}
|
||||||
|
|
||||||
|
ConfigUtil.getWorkerConfig().setProperty(WorkerConstants.WORKER_STARTER_MODE,
|
||||||
|
WORKER_STARTER_SPRING_CLOUD);
|
||||||
|
return schedulerxWorker;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Configuration(proxyBeanMethods = false)
|
||||||
|
@AutoConfigureAfter(SchedulerxWorkerConfiguration.class)
|
||||||
|
@ConditionalOnBean(SchedulerxWorker.class)
|
||||||
|
static class SpringScheduleAdaptConfiguration {
|
||||||
|
|
||||||
|
@Bean(ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
|
||||||
|
public NoOpScheduler noOpScheduler() {
|
||||||
|
return new NoOpScheduler();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SchedulerxSchedulingConfigurer schedulerxSchedulingConfigurer() {
|
||||||
|
return new SchedulerxSchedulingConfigurer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SchedulerxAnnotationBeanPostProcessor schedulerxAnnotationBeanPostProcessor() {
|
||||||
|
return new SchedulerxAnnotationBeanPostProcessor();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnProperty(prefix = SchedulerxProperties.CONFIG_PREFIX, name = "task-sync", havingValue = "true")
|
||||||
|
public ScheduledJobSyncConfigurer scheduledJobSyncConfigurer() {
|
||||||
|
return new ScheduledJobSyncConfigurer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,786 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.schedulerx;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.scheduling.SchedulingConstants;
|
||||||
|
import com.alibaba.schedulerx.common.domain.ContactInfo;
|
||||||
|
import com.alibaba.schedulerx.common.util.JsonUtil;
|
||||||
|
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* schedulerx worker properties.
|
||||||
|
*
|
||||||
|
* @author yaohui
|
||||||
|
**/
|
||||||
|
@ConfigurationProperties(prefix = SchedulerxProperties.CONFIG_PREFIX)
|
||||||
|
public class SchedulerxProperties implements InitializingBean {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(SchedulerxProperties.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* schedulerx config prefix.
|
||||||
|
*/
|
||||||
|
public static final String CONFIG_PREFIX = SchedulingConstants.SCHEDULING_CONFIG_PREFIX + ".schedulerx";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* domainName.
|
||||||
|
*/
|
||||||
|
private String domainName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* groupId.
|
||||||
|
*/
|
||||||
|
private String groupId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* host.
|
||||||
|
*/
|
||||||
|
private String host;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* client port.
|
||||||
|
*/
|
||||||
|
private int port = 0;
|
||||||
|
|
||||||
|
private String enableUnits;
|
||||||
|
|
||||||
|
private String disableUnits;
|
||||||
|
|
||||||
|
private String enableSites;
|
||||||
|
|
||||||
|
private String disableSites;
|
||||||
|
|
||||||
|
private boolean enableBatchWork;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* enabled: true; false.
|
||||||
|
*/
|
||||||
|
private boolean enabled = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* appName.
|
||||||
|
*/
|
||||||
|
private String appName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* appKey.
|
||||||
|
*/
|
||||||
|
private String appKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* aliyunRamRole.
|
||||||
|
*/
|
||||||
|
private String aliyunRamRole;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* aliyunAccessKey.
|
||||||
|
*/
|
||||||
|
private String aliyunAccessKey;
|
||||||
|
/**
|
||||||
|
* aliyunSecretKey.
|
||||||
|
*/
|
||||||
|
private String aliyunSecretKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* STS ak.
|
||||||
|
*/
|
||||||
|
private String stsAccessKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* STS sk.
|
||||||
|
*/
|
||||||
|
private String stsSecretKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* STS secret token.
|
||||||
|
*/
|
||||||
|
private String stsToken;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Namespace UID.
|
||||||
|
*/
|
||||||
|
private String namespace;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* endpoint.
|
||||||
|
*/
|
||||||
|
private String endpoint;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* endpointPort.
|
||||||
|
*/
|
||||||
|
private String endpointPort;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* namespaceName.
|
||||||
|
*/
|
||||||
|
private String namespaceName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* namespaceSource.
|
||||||
|
*/
|
||||||
|
private String namespaceSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* maxTaskBodySize (byte).
|
||||||
|
*/
|
||||||
|
private int maxTaskBodySize = WorkerConstants.TASK_BODY_SIZE_MAX_DEFAULT;
|
||||||
|
|
||||||
|
private boolean blockAppStart = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* slsCollectorEnable.
|
||||||
|
*/
|
||||||
|
private boolean slsCollectorEnable = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* shareContainerPool.
|
||||||
|
*/
|
||||||
|
private boolean shareContainerPool = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* threadPoolMode.
|
||||||
|
*/
|
||||||
|
private String threadPoolMode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sharePoolSize.
|
||||||
|
*/
|
||||||
|
private int sharePoolSize = WorkerConstants.SHARE_POOL_SIZE_DEFAULT;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sharePoolQueueSize.
|
||||||
|
*/
|
||||||
|
private int sharePoolQueueSize = Integer.MAX_VALUE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wlabel.
|
||||||
|
*/
|
||||||
|
private String label;
|
||||||
|
|
||||||
|
private String labelPath = "/etc/podinfo/annotations";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* enableCgroupMetrics.
|
||||||
|
*/
|
||||||
|
private boolean enableCgroupMetrics = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* cgroupPathPrefix.
|
||||||
|
*/
|
||||||
|
private String cgroupPathPrefix = "/sys/fs/cgroup/cpu/";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* akkaRemotingAutoRecover.
|
||||||
|
*/
|
||||||
|
private boolean akkaRemotingAutoRecover = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* enableHeartbeatLog.
|
||||||
|
*/
|
||||||
|
private boolean enableHeartbeatLog = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* mapMasterStatusCheckInterval(ms).
|
||||||
|
*/
|
||||||
|
private int mapMasterStatusCheckInterval = WorkerConstants.Map_MASTER_STATUS_CHECK_INTERVAL_DEFAULT;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* enableSecondDealyCycleIntervalMs.
|
||||||
|
*/
|
||||||
|
private boolean enableSecondDealyCycleIntervalMs = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* enableMapMasterFailover.
|
||||||
|
*/
|
||||||
|
private boolean enableMapMasterFailover = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* enableSecondDelayStandaloneDispatch.
|
||||||
|
*/
|
||||||
|
private boolean enableSecondDelayStandaloneDispatch = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* pageSize.
|
||||||
|
*/
|
||||||
|
private int pageSize = 1000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GraceShutdownMode(WAIT_ALL; WAIT_RUNNING;).
|
||||||
|
*/
|
||||||
|
private String graceShutdownMode;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* graceShutdownTimeout.
|
||||||
|
*/
|
||||||
|
private long graceShutdownTimeout = WorkerConstants.GRACE_SHUTDOWN_TIMEOUT_DEFAULT;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* broadcastDispatchThreadNum.
|
||||||
|
*/
|
||||||
|
private int broadcastDispatchThreadNum = 4;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* broadcastDispatchRetryTimes.
|
||||||
|
*/
|
||||||
|
private int broadcastDispatchRetryTimes = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* broadcastDispatchThreadEnable.
|
||||||
|
*/
|
||||||
|
private boolean broadcastDispatchThreadEnable = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* broadcastMasterExecEnable.
|
||||||
|
*/
|
||||||
|
private boolean broadcastMasterExecEnable = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* mapMasterDispatchRandom.
|
||||||
|
*/
|
||||||
|
private boolean mapMasterDispatchRandom = false;
|
||||||
|
|
||||||
|
private Integer mapMasterRouterStrategy;
|
||||||
|
|
||||||
|
private String regionId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* h2DatabaseUser.
|
||||||
|
*/
|
||||||
|
private String h2DatabaseUser;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* h2DatabasePassword.
|
||||||
|
*/
|
||||||
|
private String h2DatabasePassword;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* httpServerEnable.
|
||||||
|
*/
|
||||||
|
private Boolean httpServerEnable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* httpServerPort.
|
||||||
|
*/
|
||||||
|
private Integer httpServerPort;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* maxMapDiskPercent.
|
||||||
|
*/
|
||||||
|
private Float maxMapDiskPercent;
|
||||||
|
|
||||||
|
private Map<String, JobProperty> jobs = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
private String alarmChannel;
|
||||||
|
|
||||||
|
private Map<String, ContactInfo> alarmUsers = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
private Map<String, Integer> processorPoolSize = new HashMap<>();
|
||||||
|
|
||||||
|
public String getDomainName() {
|
||||||
|
return domainName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDomainName(String domainName) {
|
||||||
|
this.domainName = domainName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getGroupId() {
|
||||||
|
return groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setGroupId(String groupId) {
|
||||||
|
this.groupId = groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnabled() {
|
||||||
|
return enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnabled(boolean enabled) {
|
||||||
|
this.enabled = enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getHost() {
|
||||||
|
return host;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHost(String host) {
|
||||||
|
this.host = host;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPort() {
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPort(int port) {
|
||||||
|
this.port = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getEnableUnits() {
|
||||||
|
return enableUnits;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableUnits(String enableUnits) {
|
||||||
|
this.enableUnits = enableUnits;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDisableUnits() {
|
||||||
|
return disableUnits;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDisableUnits(String disableUnits) {
|
||||||
|
this.disableUnits = disableUnits;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getEnableSites() {
|
||||||
|
return enableSites;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableSites(String enableSites) {
|
||||||
|
this.enableSites = enableSites;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDisableSites() {
|
||||||
|
return disableSites;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDisableSites(String disableSites) {
|
||||||
|
this.disableSites = disableSites;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnableBatchWork() {
|
||||||
|
return enableBatchWork;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableBatchWork(boolean enableBatchWork) {
|
||||||
|
this.enableBatchWork = enableBatchWork;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAliyunAccessKey() {
|
||||||
|
return aliyunAccessKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAliyunAccessKey(String aliyunAccessKey) {
|
||||||
|
this.aliyunAccessKey = aliyunAccessKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAliyunSecretKey() {
|
||||||
|
return aliyunSecretKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAliyunSecretKey(String aliyunSecretKey) {
|
||||||
|
this.aliyunSecretKey = aliyunSecretKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNamespace() {
|
||||||
|
return namespace;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNamespace(String namespace) {
|
||||||
|
this.namespace = namespace;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getEndpoint() {
|
||||||
|
return endpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEndpoint(String endpoint) {
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getEndpointPort() {
|
||||||
|
return endpointPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEndpointPort(String endpointPort) {
|
||||||
|
this.endpointPort = endpointPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNamespaceName() {
|
||||||
|
return namespaceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNamespaceName(String namespaceName) {
|
||||||
|
this.namespaceName = namespaceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNamespaceSource() {
|
||||||
|
return namespaceSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setNamespaceSource(String namespaceSource) {
|
||||||
|
this.namespaceSource = namespaceSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxTaskBodySize() {
|
||||||
|
return maxTaskBodySize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxTaskBodySize(int maxTaskBodySize) {
|
||||||
|
this.maxTaskBodySize = maxTaskBodySize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isBlockAppStart() {
|
||||||
|
return blockAppStart;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBlockAppStart(boolean blockAppStart) {
|
||||||
|
this.blockAppStart = blockAppStart;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAppName() {
|
||||||
|
return appName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAppName(String appName) {
|
||||||
|
this.appName = appName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAppKey() {
|
||||||
|
return appKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAppKey(String appKey) {
|
||||||
|
this.appKey = appKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getStsAccessKey() {
|
||||||
|
return stsAccessKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStsAccessKey(String stsAccessKey) {
|
||||||
|
this.stsAccessKey = stsAccessKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getStsSecretKey() {
|
||||||
|
return stsSecretKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStsSecretKey(String stsSecretKey) {
|
||||||
|
this.stsSecretKey = stsSecretKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getStsToken() {
|
||||||
|
return stsToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAliyunRamRole() {
|
||||||
|
return aliyunRamRole;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAliyunRamRole(String aliyunRamRole) {
|
||||||
|
this.aliyunRamRole = aliyunRamRole;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStsToken(String stsToken) {
|
||||||
|
this.stsToken = stsToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSlsCollectorEnable() {
|
||||||
|
return slsCollectorEnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSlsCollectorEnable(boolean slsCollectorEnable) {
|
||||||
|
this.slsCollectorEnable = slsCollectorEnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isShareContainerPool() {
|
||||||
|
return shareContainerPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setShareContainerPool(boolean shareContainerPool) {
|
||||||
|
this.shareContainerPool = shareContainerPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSharePoolSize() {
|
||||||
|
return sharePoolSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSharePoolSize(int sharePoolSize) {
|
||||||
|
this.sharePoolSize = sharePoolSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLabel() {
|
||||||
|
return label;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLabel(String label) {
|
||||||
|
if (label != null) {
|
||||||
|
if (label.startsWith("#") && label.endsWith("#")) {
|
||||||
|
String labelKey = label.substring(1, label.length() - 1);
|
||||||
|
this.label = System.getenv(labelKey);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLabelPath() {
|
||||||
|
return labelPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLabelPath(String labelPath) {
|
||||||
|
this.labelPath = labelPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnableCgroupMetrics() {
|
||||||
|
return enableCgroupMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableCgroupMetrics(boolean enableCgroupMetrics) {
|
||||||
|
this.enableCgroupMetrics = enableCgroupMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCgroupPathPrefix() {
|
||||||
|
return cgroupPathPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCgroupPathPrefix(String cgroupPathPrefix) {
|
||||||
|
this.cgroupPathPrefix = cgroupPathPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAkkaRemotingAutoRecover() {
|
||||||
|
return akkaRemotingAutoRecover;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAkkaRemotingAutoRecover(boolean akkaRemotingAutoRecover) {
|
||||||
|
this.akkaRemotingAutoRecover = akkaRemotingAutoRecover;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnableHeartbeatLog() {
|
||||||
|
return enableHeartbeatLog;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableHeartbeatLog(boolean enableHeartbeatLog) {
|
||||||
|
this.enableHeartbeatLog = enableHeartbeatLog;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMapMasterStatusCheckInterval() {
|
||||||
|
return mapMasterStatusCheckInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMapMasterStatusCheckInterval(int mapMasterStatusCheckInterval) {
|
||||||
|
this.mapMasterStatusCheckInterval = mapMasterStatusCheckInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnableSecondDealyCycleIntervalMs() {
|
||||||
|
return enableSecondDealyCycleIntervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableSecondDealyCycleIntervalMs(boolean enableSecondDealyCycleIntervalMs) {
|
||||||
|
this.enableSecondDealyCycleIntervalMs = enableSecondDealyCycleIntervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnableMapMasterFailover() {
|
||||||
|
return enableMapMasterFailover;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableMapMasterFailover(boolean enableMapMasterFailover) {
|
||||||
|
this.enableMapMasterFailover = enableMapMasterFailover;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnableSecondDelayStandaloneDispatch() {
|
||||||
|
return enableSecondDelayStandaloneDispatch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnableSecondDelayStandaloneDispatch(boolean enableSecondDelayStandaloneDispatch) {
|
||||||
|
this.enableSecondDelayStandaloneDispatch = enableSecondDelayStandaloneDispatch;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPageSize() {
|
||||||
|
return pageSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getGraceShutdownMode() {
|
||||||
|
return graceShutdownMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setGraceShutdownMode(String graceShutdownMode) {
|
||||||
|
this.graceShutdownMode = graceShutdownMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getGraceShutdownTimeout() {
|
||||||
|
return graceShutdownTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setGraceShutdownTimeout(long graceShutdownTimeout) {
|
||||||
|
this.graceShutdownTimeout = graceShutdownTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPageSize(int pageSize) {
|
||||||
|
this.pageSize = pageSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRegionId() {
|
||||||
|
return regionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRegionId(String regionId) {
|
||||||
|
this.regionId = regionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, JobProperty> getJobs() {
|
||||||
|
return jobs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setJobs(Map<String, JobProperty> jobs) {
|
||||||
|
this.jobs = jobs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAlarmChannel() {
|
||||||
|
return alarmChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAlarmChannel(String alarmChannel) {
|
||||||
|
this.alarmChannel = alarmChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, ContactInfo> getAlarmUsers() {
|
||||||
|
return alarmUsers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAlarmUsers(Map<String, ContactInfo> alarmUsers) {
|
||||||
|
this.alarmUsers = alarmUsers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getBroadcastDispatchThreadNum() {
|
||||||
|
return broadcastDispatchThreadNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBroadcastDispatchThreadNum(int broadcastDispatchThreadNum) {
|
||||||
|
this.broadcastDispatchThreadNum = broadcastDispatchThreadNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isBroadcastDispatchThreadEnable() {
|
||||||
|
return broadcastDispatchThreadEnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBroadcastDispatchThreadEnable(boolean broadcastDispatchThreadEnable) {
|
||||||
|
this.broadcastDispatchThreadEnable = broadcastDispatchThreadEnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getThreadPoolMode() {
|
||||||
|
return threadPoolMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setThreadPoolMode(String threadPoolMode) {
|
||||||
|
this.threadPoolMode = threadPoolMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Integer> getProcessorPoolSize() {
|
||||||
|
return processorPoolSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProcessorPoolSize(Map<String, Integer> processorPoolSize) {
|
||||||
|
this.processorPoolSize = processorPoolSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSharePoolQueueSize() {
|
||||||
|
return sharePoolQueueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSharePoolQueueSize(int sharePoolQueueSize) {
|
||||||
|
this.sharePoolQueueSize = sharePoolQueueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isMapMasterDispatchRandom() {
|
||||||
|
return mapMasterDispatchRandom;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMapMasterDispatchRandom(boolean mapMasterDispatchRandom) {
|
||||||
|
this.mapMasterDispatchRandom = mapMasterDispatchRandom;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isBroadcastMasterExecEnable() {
|
||||||
|
return broadcastMasterExecEnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBroadcastMasterExecEnable(boolean broadcastMasterExecEnable) {
|
||||||
|
this.broadcastMasterExecEnable = broadcastMasterExecEnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getBroadcastDispatchRetryTimes() {
|
||||||
|
return broadcastDispatchRetryTimes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBroadcastDispatchRetryTimes(int broadcastDispatchRetryTimes) {
|
||||||
|
this.broadcastDispatchRetryTimes = broadcastDispatchRetryTimes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getMapMasterRouterStrategy() {
|
||||||
|
return mapMasterRouterStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMapMasterRouterStrategy(Integer mapMasterRouterStrategy) {
|
||||||
|
this.mapMasterRouterStrategy = mapMasterRouterStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getH2DatabaseUser() {
|
||||||
|
return h2DatabaseUser;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setH2DatabaseUser(String h2DatabaseUser) {
|
||||||
|
this.h2DatabaseUser = h2DatabaseUser;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getH2DatabasePassword() {
|
||||||
|
return h2DatabasePassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setH2DatabasePassword(String h2DatabasePassword) {
|
||||||
|
this.h2DatabasePassword = h2DatabasePassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getHttpServerEnable() {
|
||||||
|
return httpServerEnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHttpServerEnable(Boolean httpServerEnable) {
|
||||||
|
this.httpServerEnable = httpServerEnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getHttpServerPort() {
|
||||||
|
return httpServerPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHttpServerPort(Integer httpServerPort) {
|
||||||
|
this.httpServerPort = httpServerPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Float getMaxMapDiskPercent() {
|
||||||
|
return maxMapDiskPercent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxMapDiskPercent(float maxMapDiskPercent) {
|
||||||
|
this.maxMapDiskPercent = maxMapDiskPercent;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterPropertiesSet() {
|
||||||
|
logger.info("SchedulerxProperties->" + JsonUtil.toJson(this));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,85 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.schedulerx.constants;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedulerx constants.
|
||||||
|
*
|
||||||
|
* @author yaohui
|
||||||
|
*/
|
||||||
|
public final class SchedulerxConstants {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedulerx default namespace source.
|
||||||
|
*/
|
||||||
|
public static final String NAMESPACE_SOURCE_SPRINGBOOT = "springboot";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aliyun pop product.
|
||||||
|
*/
|
||||||
|
public static final String ALIYUN_POP_PRODUCT = "schedulerx2";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aliyun pop endpoint.
|
||||||
|
*/
|
||||||
|
public static final String ALIYUN_POP_SCHEDULERX_ENDPOINT = "schedulerx.aliyuncs.com";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Second delay max value.
|
||||||
|
*/
|
||||||
|
public static final int SECOND_DELAY_MAX_VALUE = 60;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Second delay min value.
|
||||||
|
*/
|
||||||
|
public static final int SECOND_DELAY_MIN_VALUE = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job timeout default value.
|
||||||
|
*/
|
||||||
|
public static final long JOB_TIMEOUT_DEFAULT = 3600L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job retry count default value.
|
||||||
|
*/
|
||||||
|
public static final int JOB_RETRY_COUNT_DEFAULT = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job retry interval default value.
|
||||||
|
*/
|
||||||
|
public static final int JOB_RETRY_INTERVAL_DEFAULT = 30;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job alarm channel default value.
|
||||||
|
*/
|
||||||
|
public static final String JOB_ALARM_CHANNEL_DEFAULT = "default";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Job model mapreduce alias.
|
||||||
|
*/
|
||||||
|
public static final String JOB_MODEL_MAPREDUCE_ALIAS = "mapreduce";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return super.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private SchedulerxConstants() {
|
||||||
|
throw new AssertionError("Must not instantiate constant utility class");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,435 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.schedulerx.service;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.scheduling.schedulerx.JobProperty;
|
||||||
|
import com.alibaba.cloud.scheduling.schedulerx.SchedulerxProperties;
|
||||||
|
import com.alibaba.cloud.scheduling.schedulerx.constants.SchedulerxConstants;
|
||||||
|
import com.alibaba.cloud.scheduling.schedulerx.util.CronExpression;
|
||||||
|
import com.alibaba.schedulerx.common.domain.ContactInfo;
|
||||||
|
import com.alibaba.schedulerx.common.domain.ExecuteMode;
|
||||||
|
import com.alibaba.schedulerx.common.domain.JobType;
|
||||||
|
import com.alibaba.schedulerx.common.domain.TimeType;
|
||||||
|
import com.alibaba.schedulerx.common.sdk.common.MonitorConfig;
|
||||||
|
import com.alibaba.schedulerx.common.util.JsonUtil;
|
||||||
|
import com.alibaba.schedulerx.common.util.StringUtils;
|
||||||
|
import com.alibaba.schedulerx.worker.log.LogFactory;
|
||||||
|
import com.alibaba.schedulerx.worker.log.Logger;
|
||||||
|
import com.aliyuncs.DefaultAcsClient;
|
||||||
|
import com.aliyuncs.auth.InstanceProfileCredentialsProvider;
|
||||||
|
import com.aliyuncs.exceptions.ClientException;
|
||||||
|
import com.aliyuncs.profile.DefaultProfile;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.CreateAppGroupRequest;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.CreateAppGroupResponse;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.CreateJobRequest;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.CreateJobResponse;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.CreateNamespaceRequest;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.CreateNamespaceResponse;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.GetJobInfoRequest;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.GetJobInfoResponse;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.GetJobInfoResponse.Data.JobConfigInfo;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.UpdateJobRequest;
|
||||||
|
import com.aliyuncs.schedulerx2.model.v20190430.UpdateJobResponse;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JobSyncService.
|
||||||
|
*
|
||||||
|
* @author xiaomeng.hxm
|
||||||
|
*/
|
||||||
|
public class JobSyncService {
|
||||||
|
|
||||||
|
private static final Logger logger = LogFactory.getLogger(JobSyncService.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SchedulerxProperties properties;
|
||||||
|
|
||||||
|
private DefaultAcsClient client;
|
||||||
|
|
||||||
|
private synchronized DefaultAcsClient getClient() {
|
||||||
|
// build aliyun pop client
|
||||||
|
if (client == null) {
|
||||||
|
DefaultProfile.addEndpoint(properties.getRegionId(), SchedulerxConstants.ALIYUN_POP_PRODUCT, SchedulerxConstants.ALIYUN_POP_SCHEDULERX_ENDPOINT);
|
||||||
|
if (StringUtils.isNotEmpty(properties.getAliyunRamRole())) {
|
||||||
|
DefaultProfile profile = DefaultProfile.getProfile(properties.getRegionId());
|
||||||
|
InstanceProfileCredentialsProvider provider = new InstanceProfileCredentialsProvider(
|
||||||
|
properties.getAliyunRamRole());
|
||||||
|
client = new DefaultAcsClient(profile, provider);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DefaultProfile defaultProfile = DefaultProfile.getProfile(properties.getRegionId(),
|
||||||
|
properties.getAliyunAccessKey(),
|
||||||
|
properties.getAliyunSecretKey());
|
||||||
|
client = new DefaultAcsClient(defaultProfile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sync job config.
|
||||||
|
*
|
||||||
|
* @param jobs job configs
|
||||||
|
* @param namespaceSource namespace source
|
||||||
|
* @throws Exception sync job config exception
|
||||||
|
*/
|
||||||
|
public synchronized void syncJobs(Map<String, JobProperty> jobs, String namespaceSource) throws Exception {
|
||||||
|
DefaultAcsClient client = getClient();
|
||||||
|
for (Entry<String, JobProperty> entry : jobs.entrySet()) {
|
||||||
|
String jobName = entry.getKey();
|
||||||
|
JobProperty jobProperty = entry.getValue();
|
||||||
|
JobConfigInfo jobConfigInfo = getJob(client, jobName, namespaceSource);
|
||||||
|
if (jobConfigInfo == null) {
|
||||||
|
createJob(client, jobName, jobProperty, namespaceSource);
|
||||||
|
}
|
||||||
|
else if (jobProperty.isOverwrite()) {
|
||||||
|
updateJob(client, jobConfigInfo, jobProperty, namespaceSource);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sync jobs.
|
||||||
|
*
|
||||||
|
* @throws Exception sync jobs exception
|
||||||
|
*/
|
||||||
|
public void syncJobs() throws Exception {
|
||||||
|
// 1. create namespace
|
||||||
|
if (syncNamespace(getClient())) {
|
||||||
|
// 2. create app group
|
||||||
|
if (syncAppGroup(getClient())) {
|
||||||
|
syncJobs(properties.getJobs(), getNamespaceSource());
|
||||||
|
properties.setNamespaceSource(getNamespaceSource());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sync namespace.
|
||||||
|
*
|
||||||
|
* @param client pop client
|
||||||
|
* @return true if success
|
||||||
|
* @throws Exception sync namespace exception
|
||||||
|
*/
|
||||||
|
public boolean syncNamespace(DefaultAcsClient client) throws Exception {
|
||||||
|
if (StringUtils.isEmpty(properties.getNamespace())) {
|
||||||
|
logger.error("please set {}.namespace", SchedulerxProperties.CONFIG_PREFIX);
|
||||||
|
throw new IOException(String.format("please set %s.namespace", SchedulerxProperties.CONFIG_PREFIX));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.isEmpty(properties.getNamespaceName())) {
|
||||||
|
logger.error("please set {}.namespaceName", SchedulerxProperties.CONFIG_PREFIX);
|
||||||
|
throw new IOException(String.format("please set %s.namespaceName", SchedulerxProperties.CONFIG_PREFIX));
|
||||||
|
}
|
||||||
|
|
||||||
|
CreateNamespaceRequest request = new CreateNamespaceRequest();
|
||||||
|
request.setUid(properties.getNamespace());
|
||||||
|
request.setName(properties.getNamespaceName());
|
||||||
|
request.setSource(getNamespaceSource());
|
||||||
|
CreateNamespaceResponse response = client.getAcsResponse(request);
|
||||||
|
if (response.getSuccess()) {
|
||||||
|
logger.info(JsonUtil.toJson(response));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IOException(response.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sync app group.
|
||||||
|
*
|
||||||
|
* @param client pop client
|
||||||
|
* @return sync app group result
|
||||||
|
* @throws IOException sync app group exception.
|
||||||
|
* @throws ClientException sync app group pop client exception.
|
||||||
|
*/
|
||||||
|
public boolean syncAppGroup(DefaultAcsClient client) throws IOException, ClientException {
|
||||||
|
if (StringUtils.isEmpty(properties.getAppName())) {
|
||||||
|
logger.error("please set {}.appName", SchedulerxProperties.CONFIG_PREFIX);
|
||||||
|
throw new IOException(String.format("please set %s.appName", SchedulerxProperties.CONFIG_PREFIX));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.isEmpty(properties.getAppKey())) {
|
||||||
|
logger.error("please set {}.appKey", SchedulerxProperties.CONFIG_PREFIX);
|
||||||
|
throw new IOException(String.format("please set %s.appKey", SchedulerxProperties.CONFIG_PREFIX));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.isEmpty(properties.getGroupId())) {
|
||||||
|
logger.error("please set {}.groupId", SchedulerxProperties.CONFIG_PREFIX);
|
||||||
|
throw new IOException(String.format("please set %s.groupId", SchedulerxProperties.CONFIG_PREFIX));
|
||||||
|
}
|
||||||
|
|
||||||
|
CreateAppGroupRequest request = new CreateAppGroupRequest();
|
||||||
|
request.setNamespace(properties.getNamespace());
|
||||||
|
request.setNamespaceSource(getNamespaceSource());
|
||||||
|
request.setAppName(properties.getAppName());
|
||||||
|
request.setGroupId(properties.getGroupId());
|
||||||
|
request.setAppKey(properties.getAppKey());
|
||||||
|
if (StringUtils.isNotEmpty(properties.getAlarmChannel())) {
|
||||||
|
MonitorConfig monitorConfig = new MonitorConfig();
|
||||||
|
monitorConfig.setSendChannel(properties.getAlarmChannel());
|
||||||
|
request.setMonitorConfigJson(JsonUtil.toJson(monitorConfig));
|
||||||
|
}
|
||||||
|
if (!properties.getAlarmUsers().isEmpty()) {
|
||||||
|
List<ContactInfo> contactInfos = new ArrayList(properties.getAlarmUsers().values());
|
||||||
|
request.setMonitorContactsJson(JsonUtil.toJson(contactInfos));
|
||||||
|
}
|
||||||
|
CreateAppGroupResponse response = client.getAcsResponse(request);
|
||||||
|
if (response.getSuccess()) {
|
||||||
|
logger.info(JsonUtil.toJson(response));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IOException(response.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get job config info.
|
||||||
|
*
|
||||||
|
* @param client pop client
|
||||||
|
* @param jobName job name
|
||||||
|
* @param namespaceSource namespace source
|
||||||
|
* @return job config info.
|
||||||
|
* @throws Exception get job config info exception.
|
||||||
|
*/
|
||||||
|
private JobConfigInfo getJob(DefaultAcsClient client, String jobName, String namespaceSource) throws Exception {
|
||||||
|
GetJobInfoRequest request = new GetJobInfoRequest();
|
||||||
|
request.setNamespace(properties.getNamespace());
|
||||||
|
request.setNamespaceSource(namespaceSource);
|
||||||
|
request.setGroupId(properties.getGroupId());
|
||||||
|
request.setJobId(0L);
|
||||||
|
request.setJobName(jobName);
|
||||||
|
GetJobInfoResponse response = client.getAcsResponse(request);
|
||||||
|
if (response.getSuccess()) {
|
||||||
|
return response.getData().getJobConfigInfo();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* create job.
|
||||||
|
*
|
||||||
|
* @param client pop client
|
||||||
|
* @param jobName job name
|
||||||
|
* @param jobProperty job property
|
||||||
|
* @param namespaceSource namespace source
|
||||||
|
* @throws Exception create job exception
|
||||||
|
*/
|
||||||
|
private void createJob(DefaultAcsClient client, String jobName, JobProperty jobProperty, String namespaceSource) throws Exception {
|
||||||
|
CreateJobRequest request = new CreateJobRequest();
|
||||||
|
request.setNamespace(properties.getNamespace());
|
||||||
|
request.setNamespaceSource(namespaceSource);
|
||||||
|
request.setGroupId(properties.getGroupId());
|
||||||
|
request.setName(jobName);
|
||||||
|
request.setParameters(jobProperty.getJobParameter());
|
||||||
|
|
||||||
|
if (jobProperty.getJobType().equals(JobType.JAVA.getKey())) {
|
||||||
|
request.setJobType(JobType.JAVA.getKey());
|
||||||
|
request.setClassName(jobProperty.getClassName());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
request.setJobType(jobProperty.getJobType());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (SchedulerxConstants.JOB_MODEL_MAPREDUCE_ALIAS.equals(jobProperty.getJobModel())) {
|
||||||
|
request.setExecuteMode(ExecuteMode.BATCH.getKey());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
request.setExecuteMode(jobProperty.getJobModel());
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotEmpty(jobProperty.getDescription())) {
|
||||||
|
request.setDescription(jobProperty.getDescription());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(jobProperty.getContent())) {
|
||||||
|
request.setContent(jobProperty.getContent());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(jobProperty.getCron()) && StringUtils.isNotEmpty(jobProperty.getOneTime())) {
|
||||||
|
throw new IOException("cron and oneTime shouldn't set together");
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotEmpty(jobProperty.getCron())) {
|
||||||
|
CronExpression cronExpression = new CronExpression(jobProperty.getCron());
|
||||||
|
Date now = new Date();
|
||||||
|
Date nextData = cronExpression.getTimeAfter(now);
|
||||||
|
Date next2Data = cronExpression.getTimeAfter(nextData);
|
||||||
|
if (nextData != null && next2Data != null) {
|
||||||
|
long interval = TimeUnit.MILLISECONDS.toSeconds((next2Data.getTime() - nextData.getTime()));
|
||||||
|
if (interval < SchedulerxConstants.SECOND_DELAY_MAX_VALUE) {
|
||||||
|
request.setTimeType(TimeType.SECOND_DELAY.getValue());
|
||||||
|
request.setTimeExpression(String.valueOf(interval < SchedulerxConstants.SECOND_DELAY_MIN_VALUE ?
|
||||||
|
SchedulerxConstants.SECOND_DELAY_MIN_VALUE : interval));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
request.setTimeType(TimeType.CRON.getValue());
|
||||||
|
request.setTimeExpression(jobProperty.getCron());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
request.setTimeType(TimeType.CRON.getValue());
|
||||||
|
request.setTimeExpression(jobProperty.getCron());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (StringUtils.isNotEmpty(jobProperty.getOneTime())) {
|
||||||
|
request.setTimeType(TimeType.ONE_TIME.getValue());
|
||||||
|
request.setTimeExpression(jobProperty.getOneTime());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
request.setTimeType(TimeType.API.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jobProperty.getTimeType() != null) {
|
||||||
|
request.setTimeType(jobProperty.getTimeType());
|
||||||
|
if (StringUtils.isNotEmpty(jobProperty.getTimeExpression())) {
|
||||||
|
request.setTimeExpression(jobProperty.getTimeExpression());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
request.setTimeoutEnable(true);
|
||||||
|
request.setTimeoutKillEnable(true);
|
||||||
|
request.setSendChannel(SchedulerxConstants.JOB_ALARM_CHANNEL_DEFAULT);
|
||||||
|
request.setFailEnable(true);
|
||||||
|
request.setTimeout(SchedulerxConstants.JOB_TIMEOUT_DEFAULT);
|
||||||
|
request.setMaxAttempt(SchedulerxConstants.JOB_RETRY_COUNT_DEFAULT);
|
||||||
|
request.setAttemptInterval(SchedulerxConstants.JOB_RETRY_INTERVAL_DEFAULT);
|
||||||
|
CreateJobResponse response = client.getAcsResponse(request);
|
||||||
|
if (response.getSuccess()) {
|
||||||
|
logger.info("create schedulerx job successfully, jobId={}, jobName={}", response.getData().getJobId(), jobName);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IOException("create schedulerx job failed, jobName=" + jobName + ", message=" + response.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* update job.
|
||||||
|
*
|
||||||
|
* @param client pop client
|
||||||
|
* @param jobConfigInfo job config info
|
||||||
|
* @param jobProperty job property
|
||||||
|
* @param namespaceSource namespace source
|
||||||
|
* @throws Exception update job exception
|
||||||
|
*/
|
||||||
|
private void updateJob(DefaultAcsClient client, JobConfigInfo jobConfigInfo, JobProperty jobProperty, String namespaceSource) throws Exception {
|
||||||
|
String executeMode = jobProperty.getJobModel();
|
||||||
|
if (SchedulerxConstants.JOB_MODEL_MAPREDUCE_ALIAS.equals(jobProperty.getJobModel())) {
|
||||||
|
executeMode = ExecuteMode.BATCH.getKey();
|
||||||
|
}
|
||||||
|
int timeType;
|
||||||
|
String timeExpression = null;
|
||||||
|
if (StringUtils.isNotEmpty(jobProperty.getCron()) && StringUtils.isNotEmpty(jobProperty.getOneTime())) {
|
||||||
|
throw new IOException("cron and oneTime shouldn't set together");
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotEmpty(jobProperty.getCron())) {
|
||||||
|
CronExpression cronExpression = new CronExpression(jobProperty.getCron());
|
||||||
|
Date now = new Date();
|
||||||
|
Date nextData = cronExpression.getTimeAfter(now);
|
||||||
|
Date next2Data = cronExpression.getTimeAfter(nextData);
|
||||||
|
if (nextData != null && next2Data != null) {
|
||||||
|
long interval = TimeUnit.MILLISECONDS.toSeconds((next2Data.getTime() - nextData.getTime()));
|
||||||
|
if (interval < SchedulerxConstants.SECOND_DELAY_MAX_VALUE) {
|
||||||
|
timeType = TimeType.SECOND_DELAY.getValue();
|
||||||
|
timeExpression = String.valueOf(interval < SchedulerxConstants.SECOND_DELAY_MIN_VALUE ?
|
||||||
|
SchedulerxConstants.SECOND_DELAY_MIN_VALUE : interval);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
timeType = TimeType.CRON.getValue();
|
||||||
|
timeExpression = jobProperty.getCron();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
timeType = TimeType.CRON.getValue();
|
||||||
|
timeExpression = jobProperty.getCron();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (StringUtils.isNotEmpty(jobProperty.getOneTime())) {
|
||||||
|
timeType = TimeType.ONE_TIME.getValue();
|
||||||
|
timeExpression = jobProperty.getOneTime();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
timeType = TimeType.API.getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!jobConfigInfo.getDescription().equals(jobProperty.getDescription())
|
||||||
|
|| !jobConfigInfo.getClassName().equals(jobProperty.getClassName())
|
||||||
|
|| !jobConfigInfo.getParameters().equals(jobProperty.getJobParameter())
|
||||||
|
|| !jobConfigInfo.getExecuteMode().equals(executeMode)
|
||||||
|
|| jobConfigInfo.getTimeConfig().getTimeType() != timeType
|
||||||
|
|| !jobConfigInfo.getTimeConfig().getTimeExpression().equals(timeExpression)) {
|
||||||
|
|
||||||
|
UpdateJobRequest request = new UpdateJobRequest();
|
||||||
|
request.setNamespace(properties.getNamespace());
|
||||||
|
request.setNamespaceSource(namespaceSource);
|
||||||
|
request.setGroupId(properties.getGroupId());
|
||||||
|
request.setJobId(jobConfigInfo.getJobId());
|
||||||
|
request.setName(jobConfigInfo.getName());
|
||||||
|
request.setParameters(jobProperty.getJobParameter());
|
||||||
|
|
||||||
|
//java任务
|
||||||
|
if (jobProperty.getJobType().equals(JobType.JAVA.getKey())) {
|
||||||
|
request.setClassName(jobProperty.getClassName());
|
||||||
|
}
|
||||||
|
request.setExecuteMode(executeMode);
|
||||||
|
if (StringUtils.isNotEmpty(jobProperty.getDescription())) {
|
||||||
|
request.setDescription(jobProperty.getDescription());
|
||||||
|
}
|
||||||
|
request.setTimeType(timeType);
|
||||||
|
request.setTimeExpression(timeExpression);
|
||||||
|
|
||||||
|
request.setTimeoutEnable(true);
|
||||||
|
request.setTimeoutKillEnable(true);
|
||||||
|
request.setSendChannel(SchedulerxConstants.JOB_ALARM_CHANNEL_DEFAULT);
|
||||||
|
request.setFailEnable(true);
|
||||||
|
request.setTimeout(SchedulerxConstants.JOB_TIMEOUT_DEFAULT);
|
||||||
|
request.setMaxAttempt(SchedulerxConstants.JOB_RETRY_COUNT_DEFAULT);
|
||||||
|
request.setAttemptInterval(SchedulerxConstants.JOB_RETRY_INTERVAL_DEFAULT);
|
||||||
|
|
||||||
|
UpdateJobResponse response = client.getAcsResponse(request);
|
||||||
|
if (response.getSuccess()) {
|
||||||
|
logger.info("update schedulerx job successfully, jobId={}, jobName={}", jobConfigInfo.getJobId(), jobConfigInfo.getName());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IOException("update schedulerx job failed, jobName=" + jobConfigInfo.getName() + ", message=" + response.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get namespace source.
|
||||||
|
*
|
||||||
|
* @return namespace source
|
||||||
|
*/
|
||||||
|
private String getNamespaceSource() {
|
||||||
|
if (StringUtils.isEmpty(properties.getNamespaceSource())) {
|
||||||
|
return SchedulerxConstants.NAMESPACE_SOURCE_SPRINGBOOT;
|
||||||
|
}
|
||||||
|
return properties.getNamespaceSource();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,194 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.schedulerx.service;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.scheduling.schedulerx.JobProperty;
|
||||||
|
import com.alibaba.cloud.scheduling.schedulerx.SchedulerxProperties;
|
||||||
|
import com.alibaba.schedulerx.common.domain.ExecuteMode;
|
||||||
|
import com.alibaba.schedulerx.common.domain.JobType;
|
||||||
|
import com.alibaba.schedulerx.common.domain.Pair;
|
||||||
|
import com.alibaba.schedulerx.common.domain.TimeType;
|
||||||
|
import com.alibaba.schedulerx.common.util.JsonUtil;
|
||||||
|
import com.alibaba.schedulerx.common.util.StringUtils;
|
||||||
|
import com.alibaba.schedulerx.scheduling.annotation.SchedulerX;
|
||||||
|
import com.alibaba.schedulerx.worker.domain.SpringScheduleProfile;
|
||||||
|
import com.alibaba.schedulerx.worker.log.LogFactory;
|
||||||
|
import com.alibaba.schedulerx.worker.log.Logger;
|
||||||
|
import com.alibaba.schedulerx.worker.processor.springscheduling.SchedulerxSchedulingConfigurer;
|
||||||
|
|
||||||
|
import org.springframework.aop.framework.AopProxyUtils;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.core.annotation.AnnotatedElementUtils;
|
||||||
|
import org.springframework.scheduling.annotation.SchedulingConfigurer;
|
||||||
|
import org.springframework.scheduling.config.CronTask;
|
||||||
|
import org.springframework.scheduling.config.IntervalTask;
|
||||||
|
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
|
||||||
|
import org.springframework.scheduling.config.Task;
|
||||||
|
import org.springframework.scheduling.support.ScheduledMethodRunnable;
|
||||||
|
import org.springframework.util.ClassUtils;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Spring scheduled job sync.
|
||||||
|
*
|
||||||
|
* @author yaohui
|
||||||
|
* @create 2022/8/17 下午2:21
|
||||||
|
**/
|
||||||
|
public class ScheduledJobSyncConfigurer implements SchedulingConfigurer {
|
||||||
|
|
||||||
|
private static final Logger logger = LogFactory.getLogger(ScheduledJobSyncConfigurer.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JobSyncService jobSyncService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SchedulerxProperties properties;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SchedulerxSchedulingConfigurer schedulerxSchedulingConfigurer;
|
||||||
|
|
||||||
|
@Value("${" + SchedulerxProperties.CONFIG_PREFIX + ".task-overwrite:false}")
|
||||||
|
private Boolean overwrite = false;
|
||||||
|
|
||||||
|
@Value("${" + SchedulerxProperties.CONFIG_PREFIX + ".task-model-default:broadcast}")
|
||||||
|
private String defaultModel = ExecuteMode.BROADCAST.getKey();
|
||||||
|
|
||||||
|
private boolean isValidModel(String mode) {
|
||||||
|
if (mode == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return (ExecuteMode.BROADCAST.getKey().equals(mode) || ExecuteMode.STANDALONE.getKey().equals(mode));
|
||||||
|
}
|
||||||
|
|
||||||
|
private JobProperty convertToJobProperty(Task task, Object target, Method method) {
|
||||||
|
JobProperty jobProperty = new JobProperty();
|
||||||
|
Class targetClass = AopProxyUtils.ultimateTargetClass(target);
|
||||||
|
if (ClassUtils.isCglibProxyClass(targetClass)) {
|
||||||
|
targetClass = ClassUtils.getUserClass(target);
|
||||||
|
}
|
||||||
|
String jobName = targetClass.getSimpleName() + "_" + method.getName();
|
||||||
|
String model = this.defaultModel;
|
||||||
|
|
||||||
|
if (task != null && task instanceof CronTask) {
|
||||||
|
String expression = ((CronTask) task).getExpression();
|
||||||
|
jobProperty.setCron(expression);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task != null && task instanceof IntervalTask) {
|
||||||
|
long interval = ((IntervalTask) task).getInterval() / 1000;
|
||||||
|
interval = interval < 1 ? 1 : interval;
|
||||||
|
if (interval < 60) {
|
||||||
|
jobProperty.setTimeType(TimeType.SECOND_DELAY.getValue());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
jobProperty.setTimeType(TimeType.FIXED_RATE.getValue());
|
||||||
|
}
|
||||||
|
jobProperty.setTimeExpression(String.valueOf(interval));
|
||||||
|
}
|
||||||
|
|
||||||
|
SchedulerX schedulerXMethod = AnnotatedElementUtils.getMergedAnnotation(method, SchedulerX.class);
|
||||||
|
if (schedulerXMethod != null) {
|
||||||
|
if (StringUtils.isNotEmpty(schedulerXMethod.name())) {
|
||||||
|
jobName = schedulerXMethod.name();
|
||||||
|
}
|
||||||
|
if (isValidModel(schedulerXMethod.model())) {
|
||||||
|
model = schedulerXMethod.model();
|
||||||
|
}
|
||||||
|
if (StringUtils.isNotEmpty(schedulerXMethod.cron())) {
|
||||||
|
jobProperty.setCron(schedulerXMethod.cron());
|
||||||
|
}
|
||||||
|
if (schedulerXMethod.fixedRate() > 0) {
|
||||||
|
long interval = schedulerXMethod.timeUnit().toSeconds(schedulerXMethod.fixedRate());
|
||||||
|
interval = interval < 1 ? 1 : interval;
|
||||||
|
if (interval < 60) {
|
||||||
|
jobProperty.setTimeType(TimeType.SECOND_DELAY.getValue());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
jobProperty.setTimeType(TimeType.FIXED_RATE.getValue());
|
||||||
|
}
|
||||||
|
jobProperty.setTimeExpression(String.valueOf(interval));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
jobProperty.setJobName(jobName);
|
||||||
|
jobProperty.setJobType(JobType.SPRINGSCHEDULE.getKey());
|
||||||
|
jobProperty.setJobModel(model);
|
||||||
|
SpringScheduleProfile profile = new SpringScheduleProfile();
|
||||||
|
profile.setClassName(targetClass.getName());
|
||||||
|
profile.setMethod(method.getName());
|
||||||
|
jobProperty.setContent(JsonUtil.toJson(profile));
|
||||||
|
jobProperty.setOverwrite(overwrite);
|
||||||
|
return jobProperty;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
|
||||||
|
logger.info("spring scheduled job is not empty, start to sync jobs...");
|
||||||
|
try {
|
||||||
|
Map<String, JobProperty> jobs = new HashMap<>();
|
||||||
|
if (!CollectionUtils.isEmpty(taskRegistrar.getCronTaskList())) {
|
||||||
|
for (CronTask cronTask : taskRegistrar.getCronTaskList()) {
|
||||||
|
if (cronTask.getRunnable() instanceof ScheduledMethodRunnable) {
|
||||||
|
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) cronTask.getRunnable();
|
||||||
|
JobProperty jobProperty = convertToJobProperty(cronTask, runnable.getTarget(), runnable.getMethod());
|
||||||
|
jobs.put(jobProperty.getJobName(), jobProperty);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!CollectionUtils.isEmpty(taskRegistrar.getFixedDelayTaskList())) {
|
||||||
|
for (IntervalTask intervalTask : taskRegistrar.getFixedDelayTaskList()) {
|
||||||
|
if (intervalTask.getRunnable() instanceof ScheduledMethodRunnable) {
|
||||||
|
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) intervalTask.getRunnable();
|
||||||
|
JobProperty jobProperty = convertToJobProperty(intervalTask, runnable.getTarget(), runnable.getMethod());
|
||||||
|
jobs.put(jobProperty.getJobName(), jobProperty);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!CollectionUtils.isEmpty(taskRegistrar.getFixedRateTaskList())) {
|
||||||
|
for (IntervalTask intervalTask : taskRegistrar.getFixedRateTaskList()) {
|
||||||
|
if (intervalTask.getRunnable() instanceof ScheduledMethodRunnable) {
|
||||||
|
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) intervalTask.getRunnable();
|
||||||
|
JobProperty jobProperty = convertToJobProperty(intervalTask, runnable.getTarget(), runnable.getMethod());
|
||||||
|
jobs.put(jobProperty.getJobName(), jobProperty);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取仅SchedulerX注解任务
|
||||||
|
Collection<Pair<Object, Method>> schedulerXTasks = schedulerxSchedulingConfigurer.getSchedulerXTaskTargets();
|
||||||
|
if (schedulerXTasks != null && schedulerXTasks.size() > 0) {
|
||||||
|
for (Pair<Object, Method> task : schedulerXTasks) {
|
||||||
|
JobProperty jobProperty = convertToJobProperty(null, task.getFirst(), task.getSecond());
|
||||||
|
jobs.put(jobProperty.getJobName(), jobProperty);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
jobSyncService.syncJobs(jobs, properties.getNamespaceSource());
|
||||||
|
logger.info("spring scheduled job is not empty, sync jobs finished.");
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
logger.info("spring scheduled job is not empty, sync jobs failed.", e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,69 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.shedlock;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
|
||||||
|
import com.alibaba.cloud.scheduling.SchedulingConstants;
|
||||||
|
import net.javacrumbs.shedlock.core.LockProvider;
|
||||||
|
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
|
||||||
|
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
|
||||||
|
|
||||||
|
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
|
||||||
|
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author yaohui
|
||||||
|
**/
|
||||||
|
@Configuration(proxyBeanMethods = false)
|
||||||
|
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
|
||||||
|
@EnableSchedulerLock(defaultLockAtMostFor = "1m")
|
||||||
|
@ConditionalOnBean(DataSource.class)
|
||||||
|
@ConditionalOnProperty(name = SchedulingConstants.SCHEDULING_CONFIG_DISTRIBUTED_MODE_KEY, havingValue = "shedlock")
|
||||||
|
public class ShedLockAutoConfigure {
|
||||||
|
|
||||||
|
private static final String SCHEDULED_LOCK_SCHEMA_PATH = "shedlock/schema/schema-mysql.sql";
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnMissingBean
|
||||||
|
public LockProvider lockProvider(DataSource dataSource) {
|
||||||
|
return new JdbcTemplateLockProvider(JdbcTemplateLockProvider.Configuration.builder()
|
||||||
|
.withJdbcTemplate(new JdbcTemplate(dataSource))
|
||||||
|
.usingDbTime()
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnMissingBean
|
||||||
|
public DataSourceInitializer shedLockDataSourceInitializer(DataSource dataSource) {
|
||||||
|
ResourceDatabasePopulator resourceDatabasePopulator = new ResourceDatabasePopulator();
|
||||||
|
resourceDatabasePopulator.addScript(new ClassPathResource(SCHEDULED_LOCK_SCHEMA_PATH));
|
||||||
|
DataSourceInitializer dataSourceInitializer = new DataSourceInitializer();
|
||||||
|
dataSourceInitializer.setDataSource(dataSource);
|
||||||
|
dataSourceInitializer.setDatabasePopulator(resourceDatabasePopulator);
|
||||||
|
return dataSourceInitializer;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,3 @@
|
|||||||
|
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||||
|
com.alibaba.cloud.scheduling.schedulerx.SchedulerxAutoConfigure,\
|
||||||
|
com.alibaba.cloud.scheduling.shedlock.ShedLockAutoConfigure
|
@ -0,0 +1,2 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS shedlock(name VARCHAR(64) NOT NULL, lock_until TIMESTAMP(3) NOT NULL,
|
||||||
|
locked_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));
|
@ -0,0 +1,73 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2024-2025 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.alibaba.cloud.scheduling.schedulerx.util;
|
||||||
|
|
||||||
|
import java.text.ParseException;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author yaohui
|
||||||
|
*/
|
||||||
|
class CronExpressionTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void isValidExpression() {
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 0 * * ?")).isEqualTo(true);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 */5 * * * ?")).isEqualTo(true);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 1 JAN ?")).isEqualTo(true);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 THU")).isEqualTo(true);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 THU-SAT")).isEqualTo(true);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 FRI#1")).isEqualTo(true);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 FRI#5")).isEqualTo(true);
|
||||||
|
// false
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 1 JAW ?")).isEqualTo(false);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 THP-SAT")).isEqualTo(false);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 FRI#0")).isEqualTo(false);
|
||||||
|
assertThat(CronExpression.isValidExpression("0 0 8 ? 10 FRI#6")).isEqualTo(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void getTimeAfter() throws ParseException {
|
||||||
|
CronExpression cronExpression = new CronExpression("0 0 8 1 JAN ?");
|
||||||
|
Date nextDate = cronExpression.getTimeAfter(new Date());
|
||||||
|
System.out.println(nextDate);
|
||||||
|
assertThat(nextDate).isNotNull();
|
||||||
|
|
||||||
|
cronExpression = new CronExpression("0 */5 * * * ?");
|
||||||
|
nextDate = cronExpression.getTimeAfter(new Date());
|
||||||
|
System.out.println(nextDate);
|
||||||
|
assertThat(nextDate).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void getTimeBefore() throws ParseException {
|
||||||
|
CronExpression cronExpression = new CronExpression("0 0 8 1 JAN ?");
|
||||||
|
Date beforeDate = cronExpression.getTimeBefore(new Date());
|
||||||
|
System.out.println(beforeDate);
|
||||||
|
assertThat(beforeDate).isNotNull();
|
||||||
|
|
||||||
|
cronExpression = new CronExpression("0 */5 * * * ?");
|
||||||
|
beforeDate = cronExpression.getTimeBefore(new Date());
|
||||||
|
System.out.println(beforeDate);
|
||||||
|
assertThat(beforeDate).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue