Merge pull request #112 from fangjian0423/master

Add RocketMQ binder and refactor Sentinel ReadableDataSource
pull/117/head
xiaojing 6 years ago committed by GitHub
commit c34ecfffd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -88,6 +88,7 @@
<module>spring-cloud-alibaba-sentinel-datasource</module>
<module>spring-cloud-alibaba-nacos-config</module>
<module>spring-cloud-alibaba-nacos-discovery</module>
<module>spring-cloud-stream-binder-rocketmq</module>
<module>spring-cloud-alibaba-nacos-config-server</module>
<module>spring-cloud-alicloud-context</module>
<module>spring-cloud-alibaba-examples</module>

@ -24,6 +24,8 @@
<aliyun.sdk.version>4.0.1</aliyun.sdk.version>
<alicloud.context.version>1.0.0</alicloud.context.version>
<aliyun.sdk.edas.version>2.16.0</aliyun.sdk.edas.version>
<rocketmq.version>4.3.1</rocketmq.version>
<metrics.core>4.0.3</metrics.core>
</properties>
<dependencyManagement>
@ -125,6 +127,11 @@
<artifactId>sentinel-dubbo-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- Aliyun OSS dependencies -->
@ -176,6 +183,11 @@
<artifactId>spring-cloud-alicloud-context</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Own dependencies - Starters -->
<dependency>
@ -212,6 +224,20 @@
<artifactId>spring-cloud-starter-alicloud-acm</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Third dependencies -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.core}</version>
</dependency>
<!-- Testing Dependencies -->

@ -1 +1,250 @@
== Spring Cloud Alibaba Sentinel
### Sentinel介绍
随着微服务的流行服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
Sentinel 具有以下特征:
* *丰富的应用场景*Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、实时熔断下游不可用应用等。
* *完备的实时监控*Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
* *广泛的开源生态*Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
* *完善的 SPI 扩展点*Sentinel 提供简单易用、完善的 SPI 扩展点。您可以通过实现扩展点,快速的定制逻辑。例如定制规则管理、适配数据源等。
### 如何使用Sentinel
如果要在您的项目中引入Sentinel使用group ID为 `org.springframework.cloud` 和artifact ID为 `spring-cloud-starter-alibaba-sentinel` 的starter。
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
```
下面这个例子就是一个最简单的使用Sentinel的例子:
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(ServiceApplication.class, args);
}
}
@RestController
public class TestController {
@GetMapping(value = "/hello")
@SentinelResource("hello")
public String hello() {
return "Hello Sentinel";
}
}
```
@SentinelResource注解用来标识资源是否被限流、降级。上述例子上该注解的属性'hello'表示资源名。
@SentinelResource还提供了其它额外的属性如 `blockHandler``blockHandlerClass``fallback` 用于表示限流或降级的操作,更多内容可以参考 https://github.com/alibaba/Sentinel/wiki/%E6%B3%A8%E8%A7%A3%E6%94%AF%E6%8C%81[Sentinel注解支持]。
##### Sentinel 控制台
Sentinel 控制台提供一个轻量级的控制台,它提供机器发现、单机资源实时监控、集群资源汇总,以及规则管理的功能。您只需要对应用进行简单的配置,就可以使用这些功能。
*注意*: 集群资源汇总仅支持 500 台以下的应用集群,有大概 1 - 2 秒的延时。
.Sentinel Dashboard
image::https://github.com/alibaba/Sentinel/wiki/image/dashboard.png[]
开启该功能需要3个步骤
###### 获取控制台
您可以从 https://github.com/alibaba/Sentinel/releases[release 页面] 下载最新版本的控制台 jar 包。
您也可以从最新版本的源码自行构建 Sentinel 控制台:
* 下载 https://github.com/alibaba/Sentinel/tree/master/sentinel-dashboard[控制台] 工程
* 使用以下命令将代码打包成一个 fat jar: `mvn clean package`
###### 启动控制台
Sentinel 控制台是一个标准的SpringBoot应用以SpringBoot的方式运行jar包即可。
```shell
java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar
```
如若8080端口冲突可使用 `-Dserver.port=新端口` 进行设置。
#### 配置控制台信息
.application.yml
----
spring:
cloud:
sentinel:
transport:
port: 8719
dashboard: localhost:8080
----
这里的 `spring.cloud.sentinel.transport.port` 端口配置会在应用对应的机器上启动一个Http Server该Serve会与Sentinel控制台做交互。比如Sentinel控制台添加了1个限流规则会把规则数据push给这个Http Server接受Http Server再将规则注册到Sentinel中。
更多Sentinel控制台的使用及问题参考 https://github.com/alibaba/Sentinel/wiki/%E6%8E%A7%E5%88%B6%E5%8F%B0[Sentinel控制台]
### Feign支持
DOING
### RestTemplate支持
Spring Cloud Alibaba Sentinel支持对 `RestTemplate` 的服务调用使用Sentinel进行保护在构造 `RestTemplate` bean的时候需要加上 `@SentinelProtect` 注解。
```java
@Bean
@SentinelProtect(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class)
public RestTemplate restTemplate() {
return new RestTemplate();
}
```
`@SentinelProtect` 注解的参数跟 `@SentinelResource` 一致,使用方式也一致。
限流的资源规则提供两种粒度:
* `schema://host:port/path`:协议、主机、端口和路径
* `schema://host:port`:协议、主机和端口
NOTE: 以 `https://www.taobao.com/test` 这个url为例。对应的资源名有两种粒度分别是 `https://www.taobao.com:80` 以及 `https://www.taobao.com:80/test`
### 动态数据源支持
*在版本 0.2.0.RELEASE 或 0.1.0.RELEASE 之前*要在Spring Cloud Alibaba Sentinel下使用动态数据源需要3个步骤
* 配置文件中定义数据源信息。比如使用文件:
.application.properties
----
spring.cloud.sentinel.datasource.type=file
spring.cloud.sentinel.datasource.recommendRefreshMs=3000
spring.cloud.sentinel.datasource.bufSize=4056196
spring.cloud.sentinel.datasource.charset=utf-8
spring.cloud.sentinel.datasource.converter=flowConverter
spring.cloud.sentinel.datasource.file=/Users/you/yourrule.json
----
* 创建一个Converter类实现 `com.alibaba.csp.sentinel.datasource.Converter` 接口,并且需要在 `ApplicationContext` 中有该类的一个Bean
```java
@Component("flowConverter")
public class JsonFlowRuleListParser implements Converter<String, List<FlowRule>> {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
});
}
}
```
这个Converter的bean name需要跟 `application.properties` 配置文件中的converter配置一致
* 在任意一个 Spring Bean 中定义一个被 `@SentinelDataSource` 注解修饰的 `ReadableDataSource` 属性
```java
@SentinelDataSource("spring.cloud.sentinel.datasource")
private ReadableDataSource dataSource;
```
`@SentinelDataSource` 注解的value属性表示数据源在 `application.properties` 配置文件中前缀。 该在例子中,前缀为 `spring.cloud.sentinel.datasource` 。
如果 `ApplicationContext` 中存在超过1个 `ReadableDataSource` bean那么不会加载这些 `ReadableDataSource` 中的任意一个。 只有在 `ApplicationContext` 存在一个 `ReadableDataSource` 的情况下才会生效。
如果数据库生效并且规则成功加载,控制台会打印类似如下信息:
```
[Sentinel Starter] load 3 flow rules
```
*在版本 0.2.0.RELEASE 或 0.1.0.RELEASE 之后*要在Spring Cloud Alibaba Sentinel下使用动态数据源只需要1个步骤
* 直接在 `application.properties` 配置文件中配置数据源信息即可
比如配置了4个数据源
```
spring.cloud.sentinel.datasource.ds1.file.file=classpath: degraderule.json
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=localhost:8848
spring.cloud.sentinel.datasource.ds2.nacos.dataId=sentinel
spring.cloud.sentinel.datasource.ds2.nacos.groupId=DEFAULT_GROUP
spring.cloud.sentinel.datasource.ds2.nacos.data-type=json
spring.cloud.sentinel.datasource.ds3.zk.path = /Sentinel-Demo/SYSTEM-CODE-DEMO-FLOW
spring.cloud.sentinel.datasource.ds3.zk.server-addr = localhost:2181
spring.cloud.sentinel.datasource.ds4.apollo.namespace-name = application
spring.cloud.sentinel.datasource.ds4.apollo.flow-rules-key = sentinel
spring.cloud.sentinel.datasource.ds4.apollo.default-flow-rule-value = test
```
这样配置方式参考了Spring Cloud Stream Binder的配置内部使用了 `TreeMap` 进行存储comparator为 `String.CASE_INSENSITIVE_ORDER` 。
NOTE: d1, ds2, ds3, ds4 是 `ReadableDataSource` 的名字,可随意编写。后面的 `file` `zk` `nacos` , `apollo` 就是对应具体的数据源。 它们后面的配置就是这些数据源各自的配置。
每种数据源都有两个共同的配置项: `data-type` 和 `converter-class` 。
`data-type` 配置项表示 `Converter`Spring Cloud Alibaba Sentinel默认提供两种内置的值分别是 `json` 和 `xml` (不填默认是json)。 如果不想使用内置的 `json` 或 `xml` 这两种 `Converter`,可以填写 `custom` 表示自定义 `Converter`,然后再配置 `converter-class` 配置项,该配置项需要写类的全路径名。
这两种内置的 `Converter` 只支持解析 json数组 或 xml数组。内部解析的时候会自动判断每个json对象或xml对象属于哪4种Sentinel规则(`FlowRule``DegradeRule``SystemRule``AuthorityRule`)。
比如10个规则数组里解析出5个限流规则和5个降级规则。 这种情况下该数据源不会注册,日志里页会进行警告。
如果10个规则里有9个限流规则1个解析报错了。这种情况下日志会警告有个规则格式错误另外9个限流规则会注册上去。
这里json或xml解析用的是 `jackson`。`ObjectMapper` 或 `XmlMapper` 使用默认的配置,遇到不认识的字段会解析报错。 因为不这样做的话限流 json 也会解析成 系统规则(系统规则所有的配置都有默认值),所以需要这样严格解析。
当然还有一种情况是json对象或xml对象可能会匹配上所有4种规则(比如json对象里只配了 `resource` 字段那么会匹配上4种规则),这种情况下日志里会警告,并且过滤掉这个对象。
用户使用这种配置的时候只需要填写正确的json或xml就行有任何不合理的信息都会在日志里打印出来。
NOTE: 默认情况下xml格式是不支持的。需要添加 `jackson-dataformat-xml` 依赖后才会自动生效。
关于Sentinel动态数据源的实现原理参考 https://github.com/alibaba/Sentinel/wiki/%E5%8A%A8%E6%80%81%E8%A7%84%E5%88%99%E6%89%A9%E5%B1%95[动态规则扩展]
### Endpoint支持
在使用Endpoint特性之前需要在 Maven 中添加 `spring-boot-starter-actuator` 依赖,并在配置中允许 Endpoints 的访问。
* Spring Boot 1.x 中添加配置 `management.security.enabled=false`。暴露的endpoint路径为 `/sentinel`
* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`。暴露的endpoint路径为 `/actuator/sentinel`
### More
下表显示 Spring Cloud Alibaba Sentinel 的所有配置信息:
:frame: topbot
[width="60%",options="header"]
|====
^|配置项 ^|含义 ^|默认值
|`spring.cloud.sentinel.enabled`|Sentinel自动化配置是否生效|true
|`spring.cloud.sentinel.eager`|取消Sentinel控制台懒加载|false
|`spring.cloud.sentinel.charset`|metric文件字符集|UTF-8
|`spring.cloud.sentinel.transport.port`|应用与Sentinel控制台交互的端口应用本地会起一个该端口占用的HttpServer|8721
|`spring.cloud.sentinel.transport.dashboard`|Sentinel 控制台地址|
|`spring.cloud.sentinel.transport.heartbeatIntervalMs`|应用与Sentinel控制台的心跳间隔时间|
|`spring.cloud.sentinel.filter.order`|Servlet Filter的加载顺序。Starter内部会构造这个filter|Integer.MIN_VALUE
|`spring.cloud.sentinel.filter.spring.url-patterns`|数据类型是数组。表示Servlet Filter的url pattern集合|/*
|`spring.cloud.sentinel.metric.fileSingleSize`|Sentinel metric 单个文件的大小|
|`spring.cloud.sentinel.metric.fileTotalCount`|Sentinel metric 总文件数量|
|`spring.cloud.sentinel.servlet.blockPage`| 自定义的跳转 URL当请求被限流时会自动跳转至设定好的 URL |
|`spring.cloud.sentinel.flow.coldFactor`| https://github.com/alibaba/Sentinel/wiki/%E9%99%90%E6%B5%81---%E5%86%B7%E5%90%AF%E5%8A%A8[冷启动因子] |3
|====

@ -27,6 +27,7 @@
<module>ans-example/ans-consumer-ribbon-example</module>
<module>ans-example/ans-provider-example</module>
<module>acm-example/acm-local-example</module>
<module>rocketmq-example</module>
</modules>
<build>

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-examples</artifactId>
<version>0.2.1.BUILD-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-example</artifactId>
<packaging>jar</packaging>
<description>Example demonstrating how to use rocketmq</description>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>${maven-deploy-plugin.version}</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,39 @@
package org.springframework.cloud.alibaba.cloud.examples;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class Foo {
private int id;
private String tag;
public Foo() {
}
public Foo(int id, String tag) {
this.id = id;
this.tag = tag;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
@Override
public String toString() {
return "Foo{" + "id=" + id + ", tag='" + tag + '\'' + '}';
}
}

@ -0,0 +1,33 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Service
public class ReceiveService {
@StreamListener("input1")
public void receiveInput1(String receiveMsg) {
System.out.println("input1 receive: " + receiveMsg);
}
@StreamListener("input2")
public void receiveInput2(String receiveMsg) {
System.out.println("input2 receive: " + receiveMsg);
}
@StreamListener("input3")
public void receiveInput3(@Payload Foo foo) {
System.out.println("input3 receive: " + foo);
}
@StreamListener("input1")
public void receiveInput1Again(String receiveMsg) {
System.out.println("input1 receive again: " + receiveMsg);
}
}

@ -0,0 +1,65 @@
package org.springframework.cloud.alibaba.cloud.examples;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.SubscribableChannel;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@SpringBootApplication
@EnableBinding({ Source.class, MySink.class })
public class RocketMQApplication {
public interface MySink {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
@Input("input3")
SubscribableChannel input3();
}
public static void main(String[] args) {
SpringApplication.run(RocketMQApplication.class, args);
}
@Bean
public CustomRunner customRunner() {
return new CustomRunner();
}
public static class CustomRunner implements CommandLineRunner {
@Autowired
private SenderService senderService;
@Override
public void run(String... args) throws Exception {
int count = 5;
for (int index = 1; index <= count; index++) {
String msgContent = "msg-" + index;
if (index % 3 == 0) {
senderService.send(msgContent);
}
else if (index % 3 == 1) {
senderService.sendWithTags(msgContent, "tagStr");
}
else {
senderService.sendObject(new Foo(index, "foo"), "tagObj");
}
}
}
}
}

@ -0,0 +1,43 @@
package org.springframework.cloud.alibaba.cloud.examples;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Service
public class SenderService {
@Autowired
private Source source;
public void send(String msg) throws Exception {
source.output().send(MessageBuilder.withPayload(msg).build());
}
public <T> void sendWithTags(T msg, String tag) throws Exception {
Message message = MessageBuilder.createMessage(msg,
new MessageHeaders(Stream.of(tag).collect(Collectors
.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));
source.output().send(message);
}
public <T> void sendObject(T msg, String tag) throws Exception {
Message message = MessageBuilder.withPayload(msg)
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();
source.output().send(message);
}
}

@ -0,0 +1,31 @@
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=application/json
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
spring.cloud.stream.bindings.input1.consumer.maxAttempts=1
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=application/json
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
spring.cloud.stream.bindings.input2.consumer.maxAttempts=1
spring.cloud.stream.bindings.input3.destination=test-topic
spring.cloud.stream.bindings.input3.content-type=application/json
spring.cloud.stream.bindings.input3.group=test-group3
spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20
spring.cloud.stream.bindings.input3.consumer.maxAttempts=1
server.port=28081
management.endpoints.web.exposure.include=*

@ -30,6 +30,24 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--<dependency>-->
<!--<groupId>com.alibaba.csp</groupId>-->
<!--<artifactId>sentinel-datasource-nacos</artifactId>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>com.alibaba.csp</groupId>-->
<!--<artifactId>sentinel-datasource-zookeeper</artifactId>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>com.alibaba.csp</groupId>-->
<!--<artifactId>sentinel-datasource-apollo</artifactId>-->
<!--</dependency>-->
<!-- define in spring-boot-autoconfigure module -->
<!--<dependency>-->
<!--<groupId>com.fasterxml.jackson.dataformat</groupId>-->
<!--<artifactId>jackson-dataformat-xml</artifactId>-->
<!--</dependency>-->
</dependencies>
<build>

@ -10,7 +10,7 @@ import com.alibaba.fastjson.TypeReference;
/**
* @author fangjian
*/
public class JsonFlowRuleListParser implements Converter<String, List<FlowRule>> {
public class JsonFlowRuleListConverter implements Converter<String, List<FlowRule>> {
@Override
public List<FlowRule> convert(String source) {
return JSON.parseObject(source, new TypeReference<List<FlowRule>>() {

@ -25,8 +25,8 @@ public class ServiceApplication {
}
@Bean
public Converter myParser() {
return new JsonFlowRuleListParser();
public Converter myConverter() {
return new JsonFlowRuleListConverter();
}
public static void main(String[] args) {

@ -2,13 +2,15 @@ spring.application.name=sentinel-example
server.port=18083
management.endpoints.web.exposure.include=*
spring.cloud.sentinel.transport.port=8721
spring.cloud.sentinel.transport.dashboard=localhost:8080
spring.cloud.sentinel.transport.dashboard=localhost:9999
spring.cloud.sentinel.eager=true
spring.cloud.sentinel.datasource.ds1.file.file=classpath: flowrule.json
spring.cloud.sentinel.datasource.ds1.file.data-type=json
#spring.cloud.sentinel.datasource.ds1.file.file=classpath: flowrule.json
#spring.cloud.sentinel.datasource.ds1.file.data-type=custom
#spring.cloud.sentinel.datasource.ds1.file.converter-class=org.springframework.cloud.alibaba.cloud.examples.JsonFlowRuleListConverter
spring.cloud.sentinel.datasource.type=file
spring.cloud.sentinel.datasource.recommendRefreshMs=3000
spring.cloud.sentinel.datasource.bufSize=4056196
spring.cloud.sentinel.datasource.charset=utf-8
spring.cloud.sentinel.datasource.converter=myParser
spring.cloud.sentinel.datasource.file=/Users/you/rule.json
spring.cloud.sentinel.datasource.ds2.file.file=classpath: degraderule.json
spring.cloud.sentinel.datasource.ds2.file.data-type=json

@ -0,0 +1,16 @@
[
{
"resource": "abc0",
"count": 20.0,
"grade": 0,
"passCount": 0,
"timeWindow": 10
},
{
"resource": "abc1",
"count": 15.0,
"grade": 0,
"passCount": 0,
"timeWindow": 10
}
]

@ -0,0 +1,26 @@
[
{
"resource": "resource",
"controlBehavior": 0,
"count": 1,
"grade": 1,
"limitApp": "default",
"strategy": 0
},
{
"resource": "p",
"controlBehavior": 0,
"count": 1,
"grade": 1,
"limitApp": "default",
"strategy": 0
},
{
"resource": "abc",
"controlBehavior": 0,
"count": 1,
"grade": 1,
"limitApp": "default",
"strategy": 0
}
]

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<Rules>
<FlowRule>
<resource>resource</resource>
<controlBehavior>0</controlBehavior>
<count>1</count>
<grade>1</grade>
<limitApp>default</limitApp>
<strategy>0</strategy>
</FlowRule>
<FlowRule>
<resource>test</resource>
<controlBehavior>0</controlBehavior>
<count>1</count>
<grade>1</grade>
<limitApp>default</limitApp>
<strategy>0</strategy>
</FlowRule>
</Rules>

@ -41,8 +41,27 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<scope>provided</scope>
</dependency>
<!--spring boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
@ -63,7 +82,6 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -1,152 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alibaba.sentinel.datasource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import static org.springframework.core.io.support.ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX;
/**
* {@link ReadableDataSource} Loader
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class DataSourceLoader {
private static final Logger logger = LoggerFactory.getLogger(DataSourceLoader.class);
private final static String PROPERTIES_RESOURCE_LOCATION = "META-INF/sentinel-datasource.properties";
private final static String ALL_PROPERTIES_RESOURCES_LOCATION = CLASSPATH_ALL_URL_PREFIX
+ PROPERTIES_RESOURCE_LOCATION;
private final static ConcurrentMap<String, Class<? extends ReadableDataSource>> dataSourceClassesCache
= new ConcurrentHashMap<String, Class<? extends ReadableDataSource>>(
4);
static void loadAllDataSourceClassesCache() {
Map<String, Class<? extends ReadableDataSource>> dataSourceClassesMap = loadAllDataSourceClassesCache(
ALL_PROPERTIES_RESOURCES_LOCATION);
dataSourceClassesCache.putAll(dataSourceClassesMap);
}
static Map<String, Class<? extends ReadableDataSource>> loadAllDataSourceClassesCache(
String resourcesLocation) {
Map<String, Class<? extends ReadableDataSource>> dataSourcesMap
= new HashMap<String, Class<? extends ReadableDataSource>>(
4);
ClassLoader classLoader = DataSourceLoader.class.getClassLoader();
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
Resource[] resources = resolver.getResources(resourcesLocation);
for (Resource resource : resources) {
if (resource.exists()) {
Properties properties = PropertiesLoaderUtils
.loadProperties(resource);
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
String type = (String)entry.getKey();
String className = (String)entry.getValue();
if (!ClassUtils.isPresent(className, classLoader)) {
if (logger.isDebugEnabled()) {
logger.debug(
"Sentinel DataSource implementation [ type : "
+ type + ": , class : " + className
+ " , url : " + resource.getURL()
+ "] was not present in current classpath , "
+ "thus loading will be ignored , please add dependency if required !");
}
continue;
}
Assert.isTrue(!dataSourcesMap.containsKey(type),
"The duplicated type[" + type
+ "] of SentinelDataSource were found in "
+ "resource [" + resource.getURL() + "]");
Class<?> dataSourceClass = ClassUtils.resolveClassName(className,
classLoader);
Assert.isAssignable(ReadableDataSource.class, dataSourceClass);
dataSourcesMap.put(type,
(Class<? extends ReadableDataSource>)dataSourceClass);
if (logger.isDebugEnabled()) {
logger.debug("Sentinel DataSource implementation [ type : "
+ type + ": , class : " + className
+ "] was loaded.");
}
}
}
}
} catch (IOException e) {
if (logger.isErrorEnabled()) {
logger.error(e.getMessage(), e);
}
}
return dataSourcesMap;
}
public static Class<? extends ReadableDataSource> loadClass(String type)
throws IllegalArgumentException {
Class<? extends ReadableDataSource> dataSourceClass = dataSourceClassesCache.get(type);
if (dataSourceClass == null) {
if (dataSourceClassesCache.isEmpty()) {
loadAllDataSourceClassesCache();
dataSourceClass = dataSourceClassesCache.get(type);
}
}
if (dataSourceClass == null) {
throw new IllegalArgumentException(
"Sentinel DataSource implementation [ type : " + type
+ " ] can't be found!");
}
return dataSourceClass;
}
}

@ -1,272 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alibaba.sentinel.datasource;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleManager;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessorAdapter;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.MergedBeanDefinitionPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cloud.alibaba.sentinel.datasource.annotation.SentinelDataSource;
import org.springframework.cloud.alibaba.sentinel.datasource.util.PropertySourcesUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import static org.springframework.core.annotation.AnnotationUtils.getAnnotation;
/**
* {@link SentinelDataSource @SentinelDataSource} Post Processor
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @see ReadableDataSource
* @see SentinelDataSource
*/
public class SentinelDataSourcePostProcessor
extends InstantiationAwareBeanPostProcessorAdapter
implements MergedBeanDefinitionPostProcessor {
private static final Logger logger = LoggerFactory
.getLogger(SentinelDataSourcePostProcessor.class);
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ConfigurableEnvironment environment;
private final Map<String, List<SentinelDataSourceField>> dataSourceFieldCache = new ConcurrentHashMap<>(
64);
@Override
public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition,
Class<?> beanType, String beanName) {
// find all fields using by @SentinelDataSource annotation
ReflectionUtils.doWithFields(beanType, new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field)
throws IllegalArgumentException, IllegalAccessException {
SentinelDataSource annotation = getAnnotation(field,
SentinelDataSource.class);
if (annotation != null) {
if (Modifier.isStatic(field.getModifiers())) {
if (logger.isWarnEnabled()) {
logger.warn(
"@SentinelDataSource annotation is not supported on static fields: "
+ field);
}
return;
}
if (dataSourceFieldCache.containsKey(beanName)) {
dataSourceFieldCache.get(beanName)
.add(new SentinelDataSourceField(annotation, field));
} else {
List<SentinelDataSourceField> list = new ArrayList<>();
list.add(new SentinelDataSourceField(annotation, field));
dataSourceFieldCache.put(beanName, list);
}
}
}
});
}
@Override
public PropertyValues postProcessPropertyValues(PropertyValues pvs,
PropertyDescriptor[] pds, Object bean, String beanName)
throws BeanCreationException {
if (dataSourceFieldCache.containsKey(beanName)) {
List<SentinelDataSourceField> sentinelDataSourceFields = dataSourceFieldCache
.get(beanName);
sentinelDataSourceFields.forEach(sentinelDataSourceField -> {
try {
// construct DataSource field annotated by @SentinelDataSource
Field field = sentinelDataSourceField.getField();
ReflectionUtils.makeAccessible(field);
String dataSourceBeanName = constructDataSource(
sentinelDataSourceField.getSentinelDataSource());
field.set(bean, applicationContext.getBean(dataSourceBeanName));
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
});
}
return pvs;
}
private String constructDataSource(SentinelDataSource annotation) {
String prefix = annotation.value();
if (StringUtils.isEmpty(prefix)) {
prefix = SentinelDataSourceConstants.PROPERTY_DATASOURCE_PREFIX;
}
Map<String, Object> propertyMap = PropertySourcesUtils
.getSubProperties(environment.getPropertySources(), prefix);
String alias = propertyMap.get("type").toString();
Class dataSourceClass = DataSourceLoader.loadClass(alias);
String beanName = StringUtils.isEmpty(annotation.name())
? StringUtils.uncapitalize(dataSourceClass.getSimpleName()) + "_" + prefix
: annotation.name();
if (applicationContext.containsBean(beanName)) {
return beanName;
}
Class targetClass = null;
// if alias exists in SentinelDataSourceRegistry, wired properties into
// FactoryBean
if (SentinelDataSourceRegistry.checkFactoryBean(alias)) {
targetClass = SentinelDataSourceRegistry.getFactoryBean(alias);
} else {
// if alias not exists in SentinelDataSourceRegistry, wired properties into
// raw class
targetClass = dataSourceClass;
}
registerDataSource(beanName, targetClass, propertyMap);
return beanName;
}
private void registerDataSource(String beanName, Class targetClass,
Map<String, Object> propertyMap) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(targetClass);
for (String propertyName : propertyMap.keySet()) {
Field field = ReflectionUtils.findField(targetClass, propertyName);
if (field != null) {
if (field.getType().isAssignableFrom(Converter.class)) {
// Converter get from ApplicationContext
builder.addPropertyReference(propertyName,
propertyMap.get(propertyName).toString());
} else {
// wired properties
builder.addPropertyValue(propertyName, propertyMap.get(propertyName));
}
}
}
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory)applicationContext
.getAutowireCapableBeanFactory();
beanFactory.registerBeanDefinition(beanName, builder.getBeanDefinition());
}
@EventListener(classes = ApplicationStartedEvent.class)
public void appStartedListener(ApplicationStartedEvent event) throws Exception {
logger.info("[Sentinel Starter] Start to find ReadableDataSource");
Map<String, ReadableDataSource> dataSourceMap = event.getApplicationContext().getBeansOfType(
ReadableDataSource.class);
if (dataSourceMap.size() == 1) {
logger.info("[Sentinel Starter] There exists only one ReadableDataSource named {}, start to load rules",
dataSourceMap.keySet().iterator().next());
ReadableDataSource dataSource = dataSourceMap.values().iterator().next();
Object ruleConfig = dataSource.loadConfig();
SentinelProperty sentinelProperty = dataSource.getProperty();
Integer rulesNum;
if ((rulesNum = checkRuleType(ruleConfig, FlowRule.class)) > 0) {
FlowRuleManager.register2Property(sentinelProperty);
logger.info("[Sentinel Starter] load {} flow rules", rulesNum);
}
if ((rulesNum = checkRuleType(ruleConfig, DegradeRule.class)) > 0) {
DegradeRuleManager.register2Property(sentinelProperty);
logger.info("[Sentinel Starter] load {} degrade rules", rulesNum);
}
if ((rulesNum = checkRuleType(ruleConfig, SystemRule.class)) > 0) {
SystemRuleManager.register2Property(sentinelProperty);
logger.info("[Sentinel Starter] load {} system rules", rulesNum);
}
if ((rulesNum = checkRuleType(ruleConfig, AuthorityRule.class)) > 0) {
AuthorityRuleManager.register2Property(sentinelProperty);
logger.info("[Sentinel Starter] load {} authority rules", rulesNum);
}
} else if (dataSourceMap.size() > 1) {
logger.warn(
"[Sentinel Starter] There exists more than one ReadableDataSource, can not choose which one to load");
} else {
logger.warn(
"[Sentinel Starter] No ReadableDataSource exists");
}
}
private Integer checkRuleType(Object ruleConfig, Class type) {
if (ruleConfig.getClass() == type) {
return 1;
} else if (ruleConfig instanceof List) {
List ruleList = (List)ruleConfig;
if (ruleList.stream().filter(rule -> rule.getClass() == type).toArray().length == ruleList.size()) {
return ruleList.size();
}
}
return -1;
}
class SentinelDataSourceField {
private SentinelDataSource sentinelDataSource;
private Field field;
public SentinelDataSourceField(SentinelDataSource sentinelDataSource,
Field field) {
this.sentinelDataSource = sentinelDataSource;
this.field = field;
}
public SentinelDataSource getSentinelDataSource() {
return sentinelDataSource;
}
public void setSentinelDataSource(SentinelDataSource sentinelDataSource) {
this.sentinelDataSource = sentinelDataSource;
}
public Field getField() {
return field;
}
public void setField(Field field) {
this.field = field;
}
}
}

@ -1,68 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alibaba.sentinel.datasource;
import java.util.HashMap;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.cloud.alibaba.sentinel.datasource.factorybean.ApolloDataSourceFactoryBean;
import org.springframework.cloud.alibaba.sentinel.datasource.factorybean.FileRefreshableDataSourceFactoryBean;
import org.springframework.cloud.alibaba.sentinel.datasource.factorybean.NacosDataSourceFactoryBean;
import org.springframework.cloud.alibaba.sentinel.datasource.factorybean.ZookeeperDataSourceFactoryBean;
/**
* Registry to save DataSource FactoryBean
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @see ReadableDataSource
* @see FileRefreshableDataSourceFactoryBean
* @see ZookeeperDataSourceFactoryBean
* @see NacosDataSourceFactoryBean
* @see ApolloDataSourceFactoryBean
*/
public class SentinelDataSourceRegistry {
private static HashMap<String, Class<? extends FactoryBean>> cache = new HashMap<>(
32);
static {
SentinelDataSourceRegistry.registerFactoryBean("file",
FileRefreshableDataSourceFactoryBean.class);
SentinelDataSourceRegistry.registerFactoryBean("zk",
ZookeeperDataSourceFactoryBean.class);
SentinelDataSourceRegistry.registerFactoryBean("nacos",
NacosDataSourceFactoryBean.class);
SentinelDataSourceRegistry.registerFactoryBean("apollo",
ApolloDataSourceFactoryBean.class);
}
public static synchronized void registerFactoryBean(String alias,
Class<? extends FactoryBean> clazz) {
cache.put(alias, clazz);
}
public static Class<? extends FactoryBean> getFactoryBean(String alias) {
return cache.get(alias);
}
public static boolean checkFactoryBean(String alias) {
return cache.containsKey(alias);
}
}

@ -1,50 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alibaba.sentinel.datasource.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import org.springframework.core.annotation.AliasFor;
/**
* An annotation to inject {@link ReadableDataSource} instance
* into a Spring Bean. The Properties of DataSource bean get from config files with
* specific prefix.
*
* <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @see ReadableDataSource
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface SentinelDataSource {
@AliasFor("prefix")
String value() default "";
@AliasFor("value")
String prefix() default "";
String name() default ""; // spring bean name
}

@ -0,0 +1,41 @@
package org.springframework.cloud.alibaba.sentinel.datasource.config;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Abstract class Using by {@link DataSourcePropertiesConfiguration}
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class AbstractDataSourceProperties {
private String dataType = "json";
private String converterClass;
@JsonIgnore
private final String factoryBeanName;
public AbstractDataSourceProperties(String factoryBeanName) {
this.factoryBeanName = factoryBeanName;
}
public String getDataType() {
return dataType;
}
public void setDataType(String dataType) {
this.dataType = dataType;
}
public String getConverterClass() {
return converterClass;
}
public void setConverterClass(String converterClass) {
this.converterClass = converterClass;
}
public String getFactoryBeanName() {
return factoryBeanName;
}
}

@ -0,0 +1,44 @@
package org.springframework.cloud.alibaba.sentinel.datasource.config;
import org.springframework.cloud.alibaba.sentinel.datasource.factorybean.ApolloDataSourceFactoryBean;
/**
* Apollo Properties class Using by {@link DataSourcePropertiesConfiguration} and
* {@link ApolloDataSourceFactoryBean}
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ApolloDataSourceProperties extends AbstractDataSourceProperties {
private String namespaceName;
private String flowRulesKey;
private String defaultFlowRuleValue;
public ApolloDataSourceProperties() {
super(ApolloDataSourceFactoryBean.class.getName());
}
public String getNamespaceName() {
return namespaceName;
}
public void setNamespaceName(String namespaceName) {
this.namespaceName = namespaceName;
}
public String getFlowRulesKey() {
return flowRulesKey;
}
public void setFlowRulesKey(String flowRulesKey) {
this.flowRulesKey = flowRulesKey;
}
public String getDefaultFlowRuleValue() {
return defaultFlowRuleValue;
}
public void setDefaultFlowRuleValue(String defaultFlowRuleValue) {
this.defaultFlowRuleValue = defaultFlowRuleValue;
}
}

@ -0,0 +1,79 @@
package org.springframework.cloud.alibaba.sentinel.datasource.config;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.springframework.util.ObjectUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Using By ConfigurationProperties.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @see NacosDataSourceProperties
* @see ApolloDataSourceProperties
* @see ZookeeperDataSourceProperties
* @see FileDataSourceProperties
*/
public class DataSourcePropertiesConfiguration {
private FileDataSourceProperties file;
private NacosDataSourceProperties nacos;
private ZookeeperDataSourceProperties zk;
private ApolloDataSourceProperties apollo;
public FileDataSourceProperties getFile() {
return file;
}
public void setFile(FileDataSourceProperties file) {
this.file = file;
}
public NacosDataSourceProperties getNacos() {
return nacos;
}
public void setNacos(NacosDataSourceProperties nacos) {
this.nacos = nacos;
}
public ZookeeperDataSourceProperties getZk() {
return zk;
}
public void setZk(ZookeeperDataSourceProperties zk) {
this.zk = zk;
}
public ApolloDataSourceProperties getApollo() {
return apollo;
}
public void setApollo(ApolloDataSourceProperties apollo) {
this.apollo = apollo;
}
@JsonIgnore
public List<String> getInvalidField() {
return Arrays.stream(this.getClass().getDeclaredFields()).map(field -> {
try {
if (!ObjectUtils.isEmpty(field.get(this))) {
return field.getName();
}
return null;
}
catch (IllegalAccessException e) {
// won't happen
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
}
}

@ -0,0 +1,53 @@
package org.springframework.cloud.alibaba.sentinel.datasource.config;
import org.springframework.cloud.alibaba.sentinel.datasource.factorybean.FileRefreshableDataSourceFactoryBean;
/**
* File Properties class Using by {@link DataSourcePropertiesConfiguration} and
* {@link FileRefreshableDataSourceFactoryBean}
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class FileDataSourceProperties extends AbstractDataSourceProperties {
private String file;
private String charset = "utf-8";
private long recommendRefreshMs = 3000L;
private int bufSize = 1024 * 1024;
public FileDataSourceProperties() {
super(FileRefreshableDataSourceFactoryBean.class.getName());
}
public String getFile() {
return file;
}
public void setFile(String file) {
this.file = file;
}
public String getCharset() {
return charset;
}
public void setCharset(String charset) {
this.charset = charset;
}
public long getRecommendRefreshMs() {
return recommendRefreshMs;
}
public void setRecommendRefreshMs(long recommendRefreshMs) {
this.recommendRefreshMs = recommendRefreshMs;
}
public int getBufSize() {
return bufSize;
}
public void setBufSize(int bufSize) {
this.bufSize = bufSize;
}
}

@ -0,0 +1,44 @@
package org.springframework.cloud.alibaba.sentinel.datasource.config;
import org.springframework.cloud.alibaba.sentinel.datasource.factorybean.NacosDataSourceFactoryBean;
/**
* Nacos Properties class Using by {@link DataSourcePropertiesConfiguration} and
* {@link NacosDataSourceFactoryBean}
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class NacosDataSourceProperties extends AbstractDataSourceProperties {
private String serverAddr;
private String groupId;
private String dataId;
public NacosDataSourceProperties() {
super(NacosDataSourceFactoryBean.class.getName());
}
public String getServerAddr() {
return serverAddr;
}
public void setServerAddr(String serverAddr) {
this.serverAddr = serverAddr;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getDataId() {
return dataId;
}
public void setDataId(String dataId) {
this.dataId = dataId;
}
}

@ -0,0 +1,56 @@
package org.springframework.cloud.alibaba.sentinel.datasource.config;
import org.springframework.cloud.alibaba.sentinel.datasource.factorybean.ZookeeperDataSourceFactoryBean;
/**
* Zookeeper Properties class Using by {@link DataSourcePropertiesConfiguration} and
* {@link ZookeeperDataSourceFactoryBean}
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ZookeeperDataSourceProperties extends AbstractDataSourceProperties {
public ZookeeperDataSourceProperties() {
super(ZookeeperDataSourceFactoryBean.class.getName());
}
private String serverAddr;
private String path;
private String groupId;
private String dataId;
public String getServerAddr() {
return serverAddr;
}
public void setServerAddr(String serverAddr) {
this.serverAddr = serverAddr;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getDataId() {
return dataId;
}
public void setDataId(String dataId) {
this.dataId = dataId;
}
}

@ -0,0 +1,157 @@
package org.springframework.cloud.alibaba.sentinel.datasource.converter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.slots.block.AbstractRule;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Convert sentinel rules for json array Using strict mode to parse json
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @see FlowRule
* @see DegradeRule
* @see SystemRule
* @see AuthorityRule
* @see ObjectMapper
*/
public class JsonConverter implements Converter<String, List<AbstractRule>> {
private static final Logger logger = LoggerFactory.getLogger(JsonConverter.class);
private final ObjectMapper objectMapper;
public JsonConverter(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public List<AbstractRule> convert(String source) {
List<AbstractRule> ruleList = new ArrayList<>();
if (StringUtils.isEmpty(source)) {
logger.info(
"Sentinel JsonConverter can not convert rules because source is empty");
return ruleList;
}
try {
List jsonArray = objectMapper.readValue(source,
new TypeReference<List<HashMap>>() {
});
jsonArray.stream().forEach(obj -> {
String itemJson = null;
try {
itemJson = objectMapper.writeValueAsString(obj);
}
catch (JsonProcessingException e) {
// won't be happen
}
List<AbstractRule> rules = Arrays.asList(convertFlowRule(itemJson),
convertDegradeRule(itemJson), convertSystemRule(itemJson),
convertAuthorityRule(itemJson));
List<AbstractRule> convertRuleList = rules.stream()
.filter(rule -> !ObjectUtils.isEmpty(rule))
.collect(Collectors.toList());
if (convertRuleList.size() == 0) {
logger.warn(
"Sentinel JsonConverter can not convert {} to any rules, ignore",
itemJson);
}
else if (convertRuleList.size() > 1) {
logger.warn(
"Sentinel JsonConverter convert {} and match multi rules, ignore",
itemJson);
}
else {
ruleList.add(convertRuleList.get(0));
}
});
if (jsonArray.size() != ruleList.size()) {
logger.warn(
"Sentinel JsonConverter Source list size is not equals to Target List, maybe a "
+ "part of json is invalid. Source List: " + jsonArray
+ ", Target List: " + ruleList);
}
}
catch (Exception e) {
logger.error("Sentinel JsonConverter convert error: " + e.getMessage());
throw new RuntimeException(
"Sentinel JsonConverter convert error: " + e.getMessage(), e);
}
logger.info("Sentinel JsonConverter convert {} rules: {}", ruleList.size(),
ruleList);
return ruleList;
}
private FlowRule convertFlowRule(String json) {
try {
FlowRule rule = objectMapper.readValue(json, FlowRule.class);
if (FlowRuleManager.isValidRule(rule)) {
return rule;
}
}
catch (Exception e) {
// ignore
}
return null;
}
private DegradeRule convertDegradeRule(String json) {
try {
DegradeRule rule = objectMapper.readValue(json, DegradeRule.class);
if (DegradeRuleManager.isValidRule(rule)) {
return rule;
}
}
catch (Exception e) {
// ignore
}
return null;
}
private SystemRule convertSystemRule(String json) {
SystemRule rule = null;
try {
rule = objectMapper.readValue(json, SystemRule.class);
}
catch (Exception e) {
// ignore
}
return rule;
}
private AuthorityRule convertAuthorityRule(String json) {
AuthorityRule rule = null;
try {
rule = objectMapper.readValue(json, AuthorityRule.class);
}
catch (Exception e) {
// ignore
}
return rule;
}
}

@ -0,0 +1,157 @@
package org.springframework.cloud.alibaba.sentinel.datasource.converter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.slots.block.AbstractRule;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
/**
* Convert sentinel rules for xml array Using strict mode to parse xml
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @see FlowRule
* @see DegradeRule
* @see SystemRule
* @see AuthorityRule
* @see XmlMapper
*/
public class XmlConverter implements Converter<String, List<AbstractRule>> {
private static final Logger logger = LoggerFactory.getLogger(XmlConverter.class);
private final XmlMapper xmlMapper;
public XmlConverter(XmlMapper xmlMapper) {
this.xmlMapper = xmlMapper;
}
@Override
public List<AbstractRule> convert(String source) {
List<AbstractRule> ruleList = new ArrayList<>();
if (StringUtils.isEmpty(source)) {
logger.info(
"Sentinel XmlConverter can not convert rules because source is empty");
return ruleList;
}
try {
List xmlArray = xmlMapper.readValue(source,
new TypeReference<List<HashMap>>() {
});
xmlArray.stream().forEach(obj -> {
String itemXml = null;
try {
itemXml = xmlMapper.writeValueAsString(obj);
}
catch (JsonProcessingException e) {
// won't be happen
}
List<AbstractRule> rules = Arrays.asList(convertFlowRule(itemXml),
convertDegradeRule(itemXml), convertSystemRule(itemXml),
convertAuthorityRule(itemXml));
List<AbstractRule> convertRuleList = rules.stream()
.filter(rule -> !ObjectUtils.isEmpty(rule))
.collect(Collectors.toList());
if (convertRuleList.size() == 0) {
logger.warn(
"Sentinel XmlConverter can not convert {} to any rules, ignore",
itemXml);
}
else if (convertRuleList.size() > 1) {
logger.warn(
"Sentinel XmlConverter convert {} and match multi rules, ignore",
itemXml);
}
else {
ruleList.add(convertRuleList.get(0));
}
});
if (xmlArray.size() != ruleList.size()) {
logger.warn(
"Sentinel XmlConverter Source list size is not equals to Target List, maybe a "
+ "part of xml is invalid. Source List: " + xmlArray
+ ", Target List: " + ruleList);
}
}
catch (Exception e) {
logger.error("Sentinel XmlConverter convert error: " + e.getMessage());
throw new RuntimeException(
"Sentinel XmlConverter convert error: " + e.getMessage(), e);
}
logger.info("Sentinel XmlConverter convert {} rules: {}", ruleList.size(),
ruleList);
return ruleList;
}
private FlowRule convertFlowRule(String xml) {
try {
FlowRule rule = xmlMapper.readValue(xml, FlowRule.class);
if (FlowRuleManager.isValidRule(rule)) {
return rule;
}
}
catch (Exception e) {
// ignore
}
return null;
}
private DegradeRule convertDegradeRule(String xml) {
try {
DegradeRule rule = xmlMapper.readValue(xml, DegradeRule.class);
if (DegradeRuleManager.isValidRule(rule)) {
return rule;
}
}
catch (Exception e) {
// ignore
}
return null;
}
private SystemRule convertSystemRule(String xml) {
SystemRule rule = null;
try {
rule = xmlMapper.readValue(xml, SystemRule.class);
}
catch (Exception e) {
// ignore
}
return rule;
}
private AuthorityRule convertAuthorityRule(String xml) {
AuthorityRule rule = null;
try {
rule = xmlMapper.readValue(xml, AuthorityRule.class);
}
catch (Exception e) {
// ignore
}
return rule;
}
}

@ -1,75 +0,0 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.alibaba.sentinel.datasource.util;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import org.springframework.core.env.EnumerablePropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.core.env.PropertySources;
/**
* {@link PropertySources} Utilities
*
* @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
*/
public abstract class PropertySourcesUtils {
/**
* Get Sub {@link Properties}
*
* @param propertySources {@link PropertySource} Iterable
* @param prefix the prefix of property name
* @return Map
* @see Properties
*/
public static Map<String, Object> getSubProperties(Iterable<PropertySource<?>> propertySources, String prefix) {
Map<String, Object> subProperties = new LinkedHashMap<String, Object>();
String normalizedPrefix = normalizePrefix(prefix);
for (PropertySource<?> source : propertySources) {
if (source instanceof EnumerablePropertySource) {
for (String name : ((EnumerablePropertySource<?>)source).getPropertyNames()) {
if (!subProperties.containsKey(name) && name.startsWith(normalizedPrefix)) {
String subName = name.substring(normalizedPrefix.length());
if (!subProperties.containsKey(subName)) { // take first one
Object value = source.getProperty(name);
subProperties.put(subName, value);
}
}
}
}
}
return subProperties;
}
/**
* Normalize the prefix
*
* @param prefix the prefix
* @return the prefix
*/
public static String normalizePrefix(String prefix) {
return prefix.endsWith(".") ? prefix : prefix + ".";
}
}

@ -40,6 +40,12 @@
<artifactId>spring-cloud-alibaba-sentinel-datasource</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<scope>provided</scope>
</dependency>
<!--spring boot-->
<dependency>

@ -17,9 +17,12 @@
package org.springframework.cloud.alibaba.sentinel;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.cloud.alibaba.sentinel.datasource.config.DataSourcePropertiesConfiguration;
import org.springframework.core.Ordered;
import com.alibaba.csp.sentinel.config.SentinelConfig;
@ -29,6 +32,7 @@ import com.alibaba.csp.sentinel.transport.config.TransportConfig;
* @author xiaojing
* @author hengyunabc
* @author jiashuai.xie
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ConfigurationProperties(prefix = SentinelConstants.PROPERTY_PREFIX)
public class SentinelProperties {
@ -49,6 +53,12 @@ public class SentinelProperties {
*/
private String charset = "UTF-8";
/**
* configurations about datasource, like 'nacos', 'apollo', 'file', 'zookeeper'
*/
private Map<String, DataSourcePropertiesConfiguration> datasource = new TreeMap<>(
String.CASE_INSENSITIVE_ORDER);
/**
* transport configuration about dashboard and client
*/
@ -145,6 +155,14 @@ public class SentinelProperties {
this.filter = filter;
}
public Map<String, DataSourcePropertiesConfiguration> getDatasource() {
return datasource;
}
public void setDatasource(Map<String, DataSourcePropertiesConfiguration> datasource) {
this.datasource = datasource;
}
public static class Flow {
/**

@ -21,13 +21,15 @@ import java.util.Optional;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.alibaba.sentinel.SentinelProperties;
import org.springframework.cloud.alibaba.sentinel.datasource.SentinelDataSourcePostProcessor;
import org.springframework.cloud.alibaba.sentinel.datasource.converter.JsonConverter;
import org.springframework.cloud.alibaba.sentinel.datasource.converter.XmlConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
@ -42,9 +44,13 @@ import com.alibaba.csp.sentinel.init.InitExecutor;
import com.alibaba.csp.sentinel.transport.config.TransportConfig;
import com.alibaba.csp.sentinel.util.AppNameUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
/**
* @author xiaojing
* @author jiashuai.xie
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@ -135,9 +141,33 @@ public class SentinelAutoConfiguration {
}
@Bean
@ConditionalOnMissingBean
public SentinelDataSourcePostProcessor sentinelDataSourcePostProcessor() {
return new SentinelDataSourcePostProcessor();
public SentinelDataSourceHandler sentinelDataSourceHandler() {
return new SentinelDataSourceHandler();
}
@Bean("sentinel-json-converter")
public JsonConverter jsonConverter(
@Qualifier("sentinel-object-mapper") ObjectMapper objectMapper) {
return new JsonConverter(objectMapper);
}
@Bean("sentinel-object-mapper")
public ObjectMapper objectMapper() {
return new ObjectMapper();
}
@ConditionalOnClass(XmlMapper.class)
protected static class SentinelXmlConfiguration {
@Bean("sentinel-xml-converter")
public XmlConverter xmlConverter(
@Qualifier("sentinel-xml-mapper") XmlMapper xmlMapper) {
return new XmlConverter(xmlMapper);
}
@Bean("sentinel-xml-mapper")
public XmlMapper xmlMapper() {
return new XmlMapper();
}
}
}

@ -34,7 +34,7 @@ import org.springframework.web.client.RestTemplate;
/**
* PostProcessor handle @SentinelProtect Annotation, add interceptor for RestTemplate
*
* @author fangjian
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @see SentinelProtect
* @see SentinelProtectInterceptor
*/

@ -0,0 +1,315 @@
package org.springframework.cloud.alibaba.sentinel.custom;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cloud.alibaba.sentinel.SentinelProperties;
import org.springframework.cloud.alibaba.sentinel.datasource.config.AbstractDataSourceProperties;
import org.springframework.cloud.alibaba.sentinel.datasource.converter.JsonConverter;
import org.springframework.cloud.alibaba.sentinel.datasource.converter.XmlConverter;
import org.springframework.context.event.EventListener;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StringUtils;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.property.SentinelProperty;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleManager;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
/**
* Sentinel {@link ReadableDataSource} Handler Handle the configurations of
* 'spring.cloud.sentinel.datasource'
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @see SentinelProperties#datasource
* @see JsonConverter
* @see XmlConverter
*/
public class SentinelDataSourceHandler {
private static final Logger logger = LoggerFactory
.getLogger(SentinelDataSourceHandler.class);
private List<String> dataTypeList = Arrays.asList("json", "xml");
private List<Class> rulesList = Arrays.asList(FlowRule.class, DegradeRule.class,
SystemRule.class, AuthorityRule.class);
private List<String> dataSourceBeanNameList = Collections
.synchronizedList(new ArrayList<>());
private final String DATATYPE_FIELD = "dataType";
private final String CUSTOM_DATATYPE = "custom";
private final String CONVERTERCLASS_FIELD = "converterClass";
@Autowired
private SentinelProperties sentinelProperties;
@EventListener(classes = ApplicationStartedEvent.class)
public void buildDataSource(ApplicationStartedEvent event) throws Exception {
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) event
.getApplicationContext().getAutowireCapableBeanFactory();
sentinelProperties.getDatasource()
.forEach((dataSourceName, dataSourceProperties) -> {
if (dataSourceProperties.getInvalidField().size() != 1) {
logger.error("[Sentinel Starter] DataSource " + dataSourceName
+ " multi datasource active and won't loaded: "
+ dataSourceProperties.getInvalidField());
return;
}
Optional.ofNullable(dataSourceProperties.getFile())
.ifPresent(file -> {
try {
dataSourceProperties.getFile().setFile(ResourceUtils
.getFile(StringUtils.trimAllWhitespace(
dataSourceProperties.getFile()
.getFile()))
.getAbsolutePath());
}
catch (IOException e) {
logger.error("[Sentinel Starter] DataSource "
+ dataSourceName + " handle file error: "
+ e.getMessage());
throw new RuntimeException(
"[Sentinel Starter] DataSource "
+ dataSourceName
+ " handle file error: "
+ e.getMessage(),
e);
}
registerBean(beanFactory, file,
dataSourceName + "-sentinel-file-datasource");
});
Optional.ofNullable(dataSourceProperties.getNacos())
.ifPresent(nacos -> {
registerBean(beanFactory, nacos,
dataSourceName + "-sentinel-nacos-datasource");
});
Optional.ofNullable(dataSourceProperties.getApollo())
.ifPresent(apollo -> {
registerBean(beanFactory, apollo,
dataSourceName + "-sentinel-apollo-datasource");
});
Optional.ofNullable(dataSourceProperties.getZk()).ifPresent(zk -> {
registerBean(beanFactory, zk,
dataSourceName + "-sentinel-zk-datasource");
});
});
dataSourceBeanNameList.forEach(beanName -> {
ReadableDataSource dataSource = beanFactory.getBean(beanName,
ReadableDataSource.class);
Object ruleConfig;
try {
logger.info("[Sentinel Starter] DataSource " + beanName
+ " start to loadConfig");
ruleConfig = dataSource.loadConfig();
}
catch (Exception e) {
logger.error("[Sentinel Starter] DataSource " + beanName
+ " loadConfig error: " + e.getMessage(), e);
return;
}
SentinelProperty sentinelProperty = dataSource.getProperty();
Class ruleType = getAndCheckRuleType(ruleConfig, beanName);
if (ruleType != null) {
if (ruleType == FlowRule.class) {
FlowRuleManager.register2Property(sentinelProperty);
}
else if (ruleType == DegradeRule.class) {
DegradeRuleManager.register2Property(sentinelProperty);
}
else if (ruleType == SystemRule.class) {
SystemRuleManager.register2Property(sentinelProperty);
}
else {
AuthorityRuleManager.register2Property(sentinelProperty);
}
}
});
}
private void registerBean(DefaultListableBeanFactory beanFactory,
final AbstractDataSourceProperties dataSourceProperties,
String dataSourceName) {
Map<String, Object> propertyMap = Arrays
.stream(dataSourceProperties.getClass().getDeclaredFields())
.collect(HashMap::new, (m, v) -> {
try {
v.setAccessible(true);
m.put(v.getName(), v.get(dataSourceProperties));
}
catch (IllegalAccessException e) {
logger.error("[Sentinel Starter] DataSource " + dataSourceName
+ " field: " + v.getName() + " invoke error");
throw new RuntimeException(
"[Sentinel Starter] DataSource " + dataSourceName
+ " field: " + v.getName() + " invoke error",
e);
}
}, HashMap::putAll);
propertyMap.put(CONVERTERCLASS_FIELD, dataSourceProperties.getConverterClass());
propertyMap.put(DATATYPE_FIELD, dataSourceProperties.getDataType());
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(dataSourceProperties.getFactoryBeanName());
propertyMap.forEach((propertyName, propertyValue) -> {
Field field = ReflectionUtils.findField(dataSourceProperties.getClass(),
propertyName);
if (field != null) {
if (DATATYPE_FIELD.equals(propertyName)) {
String dataType = StringUtils
.trimAllWhitespace(propertyValue.toString());
if (CUSTOM_DATATYPE.equals(dataType)) {
try {
if (StringUtils
.isEmpty(dataSourceProperties.getConverterClass())) {
throw new RuntimeException(
"[Sentinel Starter] DataSource " + dataSourceName
+ "dataType is custom, please set converter-class "
+ "property");
}
// construct custom Converter with 'converterClass'
// configuration and register
String customConvertBeanName = "sentinel-"
+ dataSourceProperties.getConverterClass();
if (!beanFactory.containsBean(customConvertBeanName)) {
beanFactory.registerBeanDefinition(customConvertBeanName,
BeanDefinitionBuilder
.genericBeanDefinition(
Class.forName(dataSourceProperties
.getConverterClass()))
.getBeanDefinition());
}
builder.addPropertyReference("converter",
customConvertBeanName);
}
catch (ClassNotFoundException e) {
logger.error("[Sentinel Starter] DataSource " + dataSourceName
+ " handle "
+ dataSourceProperties.getClass().getSimpleName()
+ " error, class name: "
+ dataSourceProperties.getConverterClass());
throw new RuntimeException(
"[Sentinel Starter] DataSource " + dataSourceName
+ " handle "
+ dataSourceProperties.getClass()
.getSimpleName()
+ " error, class name: "
+ dataSourceProperties.getConverterClass(),
e);
}
}
else {
if (!dataTypeList.contains(StringUtils
.trimAllWhitespace(propertyValue.toString()))) {
throw new RuntimeException("[Sentinel Starter] DataSource "
+ dataSourceName + " dataType: " + propertyValue
+ " is not support now. please using these types: "
+ dataTypeList.toString());
}
// converter type now support xml or json.
// The bean name of these converters wrapped by
// 'sentinel-{converterType}-converter'
builder.addPropertyReference("converter",
"sentinel-" + propertyValue.toString() + "-converter");
}
}
else if (CONVERTERCLASS_FIELD.equals(propertyName)) {
return;
}
else {
// wired properties
builder.addPropertyValue(propertyName, propertyValue);
}
}
});
beanFactory.registerBeanDefinition(dataSourceName, builder.getBeanDefinition());
// init in Spring
beanFactory.getBean(dataSourceName);
dataSourceBeanNameList.add(dataSourceName);
}
private Class getAndCheckRuleType(Object ruleConfig, String dataSourceName) {
if (rulesList.contains(ruleConfig.getClass())) {
logger.info("[Sentinel Starter] DataSource {} load {} {}", dataSourceName, 1,
ruleConfig.getClass().getSimpleName());
return ruleConfig.getClass();
}
else if (ruleConfig instanceof List) {
List convertedRuleList = (List) ruleConfig;
if (CollectionUtils.isEmpty(convertedRuleList)) {
logger.warn("[Sentinel Starter] DataSource {} rule list is empty.",
dataSourceName);
return null;
}
if (convertedRuleList.stream()
.allMatch(rule -> rulesList.contains(rule.getClass()))) {
if (rulesList.contains(convertedRuleList.get(0).getClass())
&& convertedRuleList.stream()
.filter(rule -> rule.getClass() == convertedRuleList
.get(0).getClass())
.toArray().length == convertedRuleList.size()) {
logger.info("[Sentinel Starter] DataSource {} load {} {}",
dataSourceName, convertedRuleList.size(),
convertedRuleList.get(0).getClass().getSimpleName());
return convertedRuleList.get(0).getClass();
}
else {
logger.warn(
"[Sentinel Starter] DataSource {} all rules are not same rule type and it will not be used. "
+ "Rule List: {}",
dataSourceName, convertedRuleList.toString());
}
}
else {
List<Class> classList = (List<Class>) convertedRuleList.stream()
.map(Object::getClass).collect(Collectors.toList());
logger.error("[Sentinel Starter] DataSource " + dataSourceName
+ " rule class is invalid. Class List: " + classList);
throw new RuntimeException(
"[Sentinel Starter] DataSource " + dataSourceName
+ " rule class is invalid. Class List: " + classList);
}
}
else {
logger.error("[Sentinel Starter] DataSource " + dataSourceName
+ " rule class is invalid. Class: " + ruleConfig.getClass());
throw new RuntimeException("[Sentinel Starter] DataSource " + dataSourceName
+ " rule class is invalid. Class: " + ruleConfig.getClass());
}
return null;
}
public List<String> getDataSourceBeanNameList() {
return dataSourceBeanNameList;
}
}

@ -20,16 +20,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.cloud.alibaba.sentinel.SentinelProperties;
import org.springframework.cloud.alibaba.sentinel.custom.SentinelDataSourceHandler;
import org.springframework.context.ApplicationContext;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.cloud.alibaba.sentinel.SentinelProperties;
/**
* Endpoint for Sentinel, contains ans properties and rules
@ -41,9 +45,15 @@ public class SentinelEndpoint {
@Autowired
private SentinelProperties sentinelProperties;
@Autowired
private SentinelDataSourceHandler dataSourceHandler;
@Autowired
private ApplicationContext applicationContext;
@ReadOperation
public Map<String, Object> invoke() {
Map<String, Object> result = new HashMap<>();
final Map<String, Object> result = new HashMap<>();
List<FlowRule> flowRules = FlowRuleManager.getRules();
List<DegradeRule> degradeRules = DegradeRuleManager.getRules();
@ -52,6 +62,21 @@ public class SentinelEndpoint {
result.put("FlowRules", flowRules);
result.put("DegradeRules", degradeRules);
result.put("SystemRules", systemRules);
result.put("datasources", new HashMap<String, Object>());
dataSourceHandler.getDataSourceBeanNameList().forEach(dataSourceBeanName -> {
ReadableDataSource dataSource = applicationContext.getBean(dataSourceBeanName,
ReadableDataSource.class);
try {
((HashMap) result.get("datasources")).put(dataSourceBeanName,
dataSource.loadConfig());
}
catch (Exception e) {
((HashMap) result.get("datasources")).put(dataSourceBeanName,
"load error: " + e.getMessage());
}
});
return result;
}

@ -14,5 +14,6 @@
<module>spring-cloud-starter-alibaba-nacos-config</module>
<module>spring-cloud-starter-alibaba-nacos-discovery</module>
<module>spring-cloud-starter-alibaba-sentinel</module>
<module>spring-cloud-starter-stream-rocketmq</module>
</modules>
</project>

@ -0,0 +1,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba</artifactId>
<version>0.2.1.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<name>Spring Cloud Starter Stream RocketMQ</name>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba</artifactId>
<version>0.2.1.BUILD-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
<name>Spring Cloud Alibaba RocketMQ Binder</name>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-actuator-autoconfigure</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,26 @@
package org.springframework.cloud.stream.binder.rocketmq;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public interface RocketMQBinderConstants {
/**
* Header key
*/
String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE";
String ROCKET_FLAG = "ROCKETMQ_FLAG";
String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
/**
* Instrumentation key
*/
String LASTSEND_TIMESTAMP = "lastSend.timestamp";
String ENDPOINT_ID = "rocketmq-binder";
}

@ -0,0 +1,101 @@
package org.springframework.cloud.stream.binder.rocketmq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>,
ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class);
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQTopicProvisioner rocketTopicProvisioner;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQTopicProvisioner provisioningProvider,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
super(null, provisioningProvider);
this.consumersManager = consumersManager;
this.extendedBindingProperties = extendedBindingProperties;
this.rocketTopicProvisioner = provisioningProvider;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.instrumentationManager = instrumentationManager;
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties>
producerProperties,
MessageChannel errorChannel) throws Exception {
if (producerProperties.getExtension().getEnabled()) {
return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(),
rocketBinderConfigurationProperties, instrumentationManager);
} else {
throw new RuntimeException(
"Binding for channel " + destination.getName() + "has been disabled, message can't be delivered");
}
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties>
consumerProperties)
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException("'group' must be configured for channel + " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager,
consumerProperties, destination.getName(), group, instrumentationManager);
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group,
consumerProperties);
if (consumerProperties.getMaxAttempts() > 1) {
rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));
rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
} else {
rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
return rocketInboundChannelAdapter;
}
@Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName);
}
}

@ -0,0 +1,106 @@
package org.springframework.cloud.stream.binder.rocketmq;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
import org.springframework.integration.support.MutableMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageHeaderAccessor;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
public RocketMQMessageHeaderAccessor() {
super();
}
public RocketMQMessageHeaderAccessor(Message<?> message) {
super(message);
}
public Acknowledgement getAcknowledgement(Message message) {
return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
}
public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) {
setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
return this;
}
public String getTags() {
return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
}
public RocketMQMessageHeaderAccessor withTags(String tag) {
setHeader(MessageConst.PROPERTY_TAGS, tag);
return this;
}
public String getKeys() {
return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
}
public RocketMQMessageHeaderAccessor withKeys(String keys) {
setHeader(MessageConst.PROPERTY_KEYS, keys);
return this;
}
public MessageExt getRocketMessage() {
return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
}
public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
setHeader(ORIGINAL_ROCKET_MESSAGE, message);
return this;
}
public Integer getDelayTimeLevel() {
return (Integer)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
}
public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
return this;
}
public Integer getFlag() {
return (Integer)getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
}
public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
setHeader(ROCKET_FLAG, delayTimeLevel);
return this;
}
public SendResult getSendResult() {
return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
}
public static void putSendResult(MutableMessage message, SendResult sendResult) {
message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
}
public Map<String, String> getUserProperties() {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, Object> entry : this.toMap().entrySet()) {
if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry
.getKey().equals(MessageHeaders.CONTENT_TYPE)) {
result.put(entry.getKey(), (String)entry.getValue());
}
}
return result;
}
}

@ -0,0 +1,39 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.MetricRegistry;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Endpoint(id = ENDPOINT_ID)
public class RocketMQBinderEndpoint {
private MetricRegistry metricRegistry = new MetricRegistry();
private Map<String, Object> runtime = new ConcurrentHashMap<>();
@ReadOperation
public Map<String, Object> invoke() {
Map<String, Object> result = new HashMap<>();
result.put("metrics", metricRegistry().getMetrics());
result.put("runtime", runtime());
return result;
}
public MetricRegistry metricRegistry() {
return metricRegistry;
}
public Map<String, Object> runtime() {
return runtime;
}
}

@ -0,0 +1,37 @@
package org.springframework.cloud.stream.binder.rocketmq.actuator;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
private final InstrumentationManager instrumentationManager;
public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
if (instrumentationManager.getHealthInstrumentations().stream().
allMatch(Instrumentation::isUp)) {
builder.up();
return;
}
if (instrumentationManager.getHealthInstrumentations().stream().
allMatch(Instrumentation::isOutOfService)) {
builder.outOfService();
return;
}
builder.down();
instrumentationManager.getHealthInstrumentations().stream().
filter(instrumentation -> !instrumentation.isStarted()).
forEach(instrumentation1 -> builder.withException(instrumentation1.getStartException()));
}
}

@ -0,0 +1,54 @@
package org.springframework.cloud.stream.binder.rocketmq.config;
import org.apache.rocketmq.client.log.ClientLogger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration
@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class})
public class RocketMQBinderAutoConfiguration {
private final RocketMQExtendedBindingProperties extendedBindingProperties;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
@Autowired
public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.extendedBindingProperties = extendedBindingProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel());
}
@Bean
public RocketMQTopicProvisioner provisioningProvider() {
return new RocketMQTopicProvisioner();
}
@Bean
public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
InstrumentationManager instrumentationManager,
ConsumersManager consumersManager) {
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties,
provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager);
return binder;
}
@Bean
public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) {
return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties);
}
}

@ -0,0 +1,33 @@
package org.springframework.cloud.stream.binder.rocketmq.config;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@Configuration
@AutoConfigureAfter(EndpointAutoConfiguration.class)
public class RocketMQBinderEndpointAutoConfiguration {
@Bean
public RocketMQBinderEndpoint rocketBinderEndpoint() {
return new RocketMQBinderEndpoint();
}
@Bean
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(InstrumentationManager instrumentationManager) {
return new RocketMQBinderHealthIndicator(instrumentationManager);
}
@Bean
public InstrumentationManager instrumentationManager(RocketMQBinderEndpoint rocketBinderEndpoint) {
return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), rocketBinderEndpoint.runtime());
}
}

@ -0,0 +1,78 @@
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class Acknowledgement {
/**
* for {@link ConsumeConcurrentlyContext} using
*/
private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
/**
* Message consume retry strategy<br>
* -1,no retry,put into DLQ directly<br>
* 0,broker control retry frequency<br>
* >0,client control retry frequency
*/
private Integer consumeConcurrentlyDelayLevel = 0;
/**
* for {@link ConsumeOrderlyContext} using
*/
private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
return this;
}
public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() {
return consumeConcurrentlyStatus;
}
public ConsumeOrderlyStatus getConsumeOrderlyStatus() {
return consumeOrderlyStatus;
}
public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) {
this.consumeOrderlyStatus = consumeOrderlyStatus;
return this;
}
public Integer getConsumeConcurrentlyDelayLevel() {
return consumeConcurrentlyDelayLevel;
}
public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) {
this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel;
}
public Long getConsumeOrderlySuspendCurrentQueueTimeMill() {
return consumeOrderlySuspendCurrentQueueTimeMill;
}
public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) {
this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
}
public static Acknowledgement buildOrderlyInstance() {
Acknowledgement acknowledgement = new Acknowledgement();
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS);
return acknowledgement;
}
public static Acknowledgement buildConcurrentlyInstance() {
Acknowledgement acknowledgement = new Acknowledgement();
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
return acknowledgement;
}
}

@ -0,0 +1,106 @@
package org.springframework.cloud.stream.binder.rocketmq.consuming;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumersManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Map<String, DefaultMQPushConsumer> consumerGroups = new HashMap<>();
private final Map<String, Boolean> started = new HashMap<>();
private final Map<Map.Entry<String, String>, ExtendedConsumerProperties<RocketMQConsumerProperties>> propertiesMap
= new HashMap<>();
private final InstrumentationManager instrumentationManager;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
public ConsumersManager(InstrumentationManager instrumentationManager,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
this.instrumentationManager = instrumentationManager;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
}
public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) {
propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties);
ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group);
instrumentationManager.addHealthInstrumentation(instrumentation);
if (consumerGroups.containsKey(group)) {
return consumerGroups.get(group);
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
consumerGroups.put(group, consumer);
started.put(group, false);
consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
if (consumerProperties.getExtension().getBroadcasting()) {
consumer.setMessageModel(MessageModel.BROADCASTING);
}
logger.info("RocketMQ consuming for SCS group {} created", group);
return consumer;
}
public synchronized void startConsumers() throws MQClientException {
for (String group : getConsumerGroups()) {
start(group);
}
}
public synchronized void startConsumer(String group) throws MQClientException {
start(group);
}
public synchronized void stopConsumer(String group) {
stop(group);
}
private void stop(String group) {
if (consumerGroups.get(group) != null) {
consumerGroups.get(group).shutdown();
started.put(group, false);
}
}
private synchronized void start(String group) throws MQClientException {
if (started.get(group)) {
return;
}
ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation(
group);
instrumentationManager.addHealthInstrumentation(groupInstrumentation);
try {
consumerGroups.get(group).start();
started.put(group, true);
groupInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
groupInstrumentation.markStartFailed(e);
logger.error("RocketMQ Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
throw e;
}
}
public synchronized Set<String> getConsumerGroups() {
return consumerGroups.keySet();
}
}

@ -0,0 +1,253 @@
package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ClassUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
private static final Logger logger = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
private ConsumerInstrumentation consumerInstrumentation;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
private final String destination;
private final String group;
private final InstrumentationManager instrumentationManager;
private final ConsumersManager consumersManager;
private RetryTemplate retryTemplate;
private RecoveryCallback<? extends Object> recoveryCallback;
public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
String destination, String group,
InstrumentationManager instrumentationManager) {
this.consumersManager = consumersManager;
this.consumerProperties = consumerProperties;
this.destination = destination;
this.group = group;
this.instrumentationManager = instrumentationManager;
}
@Override
protected void doStart() {
if (!consumerProperties.getExtension().getEnabled()) {
return;
}
String tags = consumerProperties == null ? null : consumerProperties.getExtension().getTags();
Boolean isOrderly = consumerProperties == null ? false : consumerProperties.getExtension().getOrderly();
DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties);
final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
instrumentationManager)
: new CloudStreamMessageListenerConcurrently(instrumentationManager);
if (retryTemplate != null) {
retryTemplate.registerListener(listener);
}
Set<String> tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim)
.collect(Collectors.toSet());
consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
try {
if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql()));
} else {
consumer.subscribe(destination, String.join(" || ", tagsSet));
}
consumerInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
consumerInstrumentation.markStartFailed(e);
logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e);
throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e);
}
consumer.registerMessageListener(listener);
try {
consumersManager.startConsumer(group);
} catch (MQClientException e) {
logger.error("RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), e);
throw new RuntimeException("RocketMQ Consumer startup failed.", e);
}
}
@Override
protected void doStop() {
consumersManager.stopConsumer(group);
}
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
private final InstrumentationManager instrumentationManager;
CloudStreamMessageListener(InstrumentationManager instrumentationManager) {
this.instrumentationManager = instrumentationManager;
}
Acknowledgement consumeMessage(final List<MessageExt> msgs) {
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
try {
if (enableRetry) {
return RocketMQInboundChannelAdapter.this.retryTemplate.execute(
(RetryCallback<Acknowledgement, Exception>)context -> doSendMsgs(msgs, context),
new RecoveryCallback<Acknowledgement>() {
@Override
public Acknowledgement recover(RetryContext context) throws Exception {
RocketMQInboundChannelAdapter.this.recoveryCallback.recover(context);
if (ClassUtils.isAssignable(this.getClass(), MessageListenerConcurrently.class)) {
return Acknowledgement.buildConcurrentlyInstance();
} else {
return Acknowledgement.buildOrderlyInstance();
}
}
});
} else {
Acknowledgement result = doSendMsgs(msgs, null);
instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
return result;
}
} catch (Exception e) {
logger.error("Rocket Message hasn't been processed successfully. Caused by ", e);
instrumentationManager.getConsumerInstrumentation(RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
throw new RuntimeException("Rocket Message hasn't been processed successfully. Caused by ", e);
}
}
private Acknowledgement doSendMsgs(final List<MessageExt> msgs, RetryContext context) {
List<Acknowledgement> acknowledgements = new ArrayList<>();
msgs.forEach(msg -> {
String retryInfo = context == null ? "" : "retryCount-" + String.valueOf(context.getRetryCount()) + "|";
logger.debug(retryInfo + "consuming msg:\n" + msg);
logger.debug(retryInfo + "message body:\n" + new String(msg.getBody()));
Acknowledgement acknowledgement = new Acknowledgement();
Message<byte[]> toChannel = MessageBuilder.withPayload(msg.getBody()).
setHeaders(new RocketMQMessageHeaderAccessor().
withAcknowledgment(acknowledgement).
withTags(msg.getTags()).
withKeys(msg.getKeys()).
withFlag(msg.getFlag()).
withRocketMessage(msg)
).build();
acknowledgements.add(acknowledgement);
RocketMQInboundChannelAdapter.this.sendMessage(toChannel);
});
return acknowledgements.get(0);
}
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
if (throwable != null) {
instrumentationManager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
} else {
instrumentationManager.getConsumerInstrumentation(
RocketMQInboundChannelAdapter.this.destination)
.markConsumed();
}
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
}
}
protected class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements
MessageListenerConcurrently {
public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel());
return acknowledgement.getConsumeConcurrentlyStatus();
}
}
protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements
MessageListenerOrderly {
public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager) {
super(instrumentationManager);
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
return acknowledgement.getConsumeOrderlyStatus();
}
}
}

@ -0,0 +1,131 @@
package org.springframework.cloud.stream.binder.rocketmq.integration;
import java.time.Instant;
import java.util.Map;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.MutableMessage;
import org.springframework.messaging.MessagingException;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
private DefaultMQProducer producer;
private ProducerInstrumentation producerInstrumentation;
private final RocketMQProducerProperties producerProperties;
private final String destination;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
private final InstrumentationManager instrumentationManager;
protected volatile boolean running = false;
public RocketMQMessageHandler(String destination, RocketMQProducerProperties producerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
InstrumentationManager instrumentationManager) {
this.destination = destination;
this.producerProperties = producerProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.instrumentationManager = instrumentationManager;
}
@Override
public void start() {
producer = new DefaultMQProducer(destination);
producerInstrumentation = instrumentationManager.getProducerInstrumentation(destination);
instrumentationManager.addHealthInstrumentation(producerInstrumentation);
producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
if (producerProperties.getMaxMessageSize() > 0) {
producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
}
try {
producer.start();
producerInstrumentation.markStartedSuccessfully();
} catch (MQClientException e) {
producerInstrumentation.markStartFailed(e);
logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
}
running = true;
}
@Override
public void stop() {
if (producer != null) {
producer.shutdown();
}
running = false;
}
@Override
public boolean isRunning() {
return running;
}
@Override
protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws Exception {
try {
Message toSend;
if (message.getPayload() instanceof byte[]) {
toSend = new Message(destination, (byte[])message.getPayload());
} else if (message.getPayload() instanceof String) {
toSend = new Message(destination, ((String)message.getPayload()).getBytes());
} else {
throw new UnsupportedOperationException(
"Payload class isn't supported: " + message.getPayload().getClass());
}
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
headerAccessor.setLeaveMutable(true);
toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel());
toSend.setTags(headerAccessor.getTags());
toSend.setKeys(headerAccessor.getKeys());
toSend.setFlag(headerAccessor.getFlag());
for (Map.Entry<String, String> entry : headerAccessor.getUserProperties().entrySet()) {
toSend.putUserProperty(entry.getKey(), entry.getValue());
}
SendResult sendRes = producer.send(toSend);
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new MQClientException("message hasn't been sent", null);
}
if (message instanceof MutableMessage) {
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage)message, sendRes);
}
instrumentationManager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
Instant.now().toEpochMilli());
producerInstrumentation.markSent();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException |
UnsupportedOperationException e) {
producerInstrumentation.markSentFailure();
logger.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
}
}
}

@ -0,0 +1,34 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.concurrent.atomic.AtomicBoolean;
import com.codahale.metrics.MetricRegistry;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumerGroupInstrumentation extends Instrumentation {
private MetricRegistry metricRegistry;
private AtomicBoolean delayedStart = new AtomicBoolean(false);
public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
super(name);
this.metricRegistry = metricRegistry;
}
public void markDelayedStart() {
delayedStart.set(true);
}
@Override
public boolean isUp() {
return started.get() || delayedStart.get();
}
@Override
public boolean isOutOfService() {
return !started.get() && startException == null && !delayedStart.get();
}
}

@ -0,0 +1,37 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
/**
* @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ConsumerInstrumentation extends Instrumentation {
private final Counter totalConsumed;
private final Counter totalConsumedFailures;
private final Meter consumedPerSecond;
private final Meter consumedFailuresPerSecond;
public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond"));
this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures"));
this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond"));
}
public void markConsumed() {
totalConsumed.inc();
consumedPerSecond.mark();
}
public void markConsumedFailure() {
totalConsumedFailures.inc();
consumedFailuresPerSecond.mark();
}
}

@ -0,0 +1,50 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class Instrumentation {
private final String name;
protected final AtomicBoolean started = new AtomicBoolean(false);
protected Exception startException = null;
Instrumentation(String name) {
this.name = name;
}
public boolean isDown() {
return startException != null;
}
public boolean isUp() {
return started.get();
}
public boolean isOutOfService() {
return !started.get() && startException == null;
}
public void markStartedSuccessfully() {
started.set(true);
}
public void markStartFailed(Exception e) {
started.set(false);
startException = e;
}
public String getName() {
return name;
}
public boolean isStarted() {
return started.get();
}
public Exception getStartException() {
return startException;
}
}

@ -0,0 +1,57 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class InstrumentationManager {
private final MetricRegistry metricRegistry;
private final Map<String, Object> runtime;
private final Map<String, ProducerInstrumentation> producerInstrumentations = new HashMap<>();
private final Map<String, ConsumerInstrumentation> consumeInstrumentations = new HashMap<>();
private final Map<String, ConsumerGroupInstrumentation> consumerGroupsInstrumentations = new HashMap<>();
private final Map<String, Instrumentation> healthInstrumentations = new HashMap<>();
public InstrumentationManager(MetricRegistry metricRegistry, Map<String, Object> runtime) {
this.metricRegistry = metricRegistry;
this.runtime = runtime;
}
public ProducerInstrumentation getProducerInstrumentation(String destination) {
String key = "scs-rocketmq.producer." + destination;
producerInstrumentations.putIfAbsent(key, new ProducerInstrumentation(metricRegistry, key));
return producerInstrumentations.get(key);
}
public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
String key = "scs-rocketmq.consumer." + destination;
consumeInstrumentations.putIfAbsent(key, new ConsumerInstrumentation(metricRegistry, key));
return consumeInstrumentations.get(key);
}
public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
String key = "scs-rocketmq.consumerGroup." + group;
consumerGroupsInstrumentations.putIfAbsent(key, new ConsumerGroupInstrumentation(metricRegistry, key));
return consumerGroupsInstrumentations.get(key);
}
public Set<Instrumentation> getHealthInstrumentations() {
return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toSet());
}
public void addHealthInstrumentation(Instrumentation instrumentation) {
healthInstrumentations.put(instrumentation.getName(), instrumentation);
}
public Map<String, Object> getRuntime() {
return runtime;
}
}

@ -0,0 +1,37 @@
package org.springframework.cloud.stream.binder.rocketmq.metrics;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import static com.codahale.metrics.MetricRegistry.name;
/**
* @author juven.xuxb
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class ProducerInstrumentation extends Instrumentation {
private final Counter totalSent;
private final Counter totalSentFailures;
private final Meter sentPerSecond;
private final Meter sentFailuresPerSecond;
public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
super(baseMetricName);
this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures"));
this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond"));
}
public void markSent() {
totalSent.inc();
sentPerSecond.mark();
}
public void markSentFailure() {
totalSentFailures.inc();
sentFailuresPerSecond.mark();
}
}

@ -0,0 +1,32 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
public class RocketMQBinderConfigurationProperties {
private String namesrvAddr;
private String logLevel = "ERROR";
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getLogLevel() {
return logLevel;
}
public void setLogLevel(String logLevel) {
this.logLevel = logLevel;
}
}

@ -0,0 +1,28 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQBindingProperties {
private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
private RocketMQProducerProperties producer = new RocketMQProducerProperties();
public RocketMQConsumerProperties getConsumer() {
return consumer;
}
public void setConsumer(RocketMQConsumerProperties consumer) {
this.consumer = consumer;
}
public RocketMQProducerProperties getProducer() {
return producer;
}
public void setProducer(RocketMQProducerProperties producer) {
this.producer = producer;
}
}

@ -0,0 +1,79 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQConsumerProperties {
/**
* using '||' to split tag
* {@link MQPushConsumer#subscribe(String, String)}
*/
private String tags;
/**
* {@link MQPushConsumer#subscribe(String, MessageSelector)}
* {@link MessageSelector#bySql(String)}
*/
private String sql;
/**
* {@link MessageModel#BROADCASTING}
*/
private Boolean broadcasting = false;
/**
* if orderly is true, using {@link MessageListenerOrderly}
* else if orderly if false, using {@link MessageListenerConcurrently}
*/
private Boolean orderly = false;
private Boolean enabled = true;
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public Boolean getOrderly() {
return orderly;
}
public void setOrderly(Boolean orderly) {
this.orderly = orderly;
}
public Boolean getEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
public Boolean getBroadcasting() {
return broadcasting;
}
public void setBroadcasting(Boolean broadcasting) {
this.broadcasting = broadcasting;
}
}

@ -0,0 +1,64 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
@ConfigurationProperties("spring.cloud.stream.rocketmq")
public class RocketMQExtendedBindingProperties implements
ExtendedBindingProperties<RocketMQConsumerProperties, RocketMQProducerProperties> {
private Map<String, RocketMQBindingProperties> bindings = new HashMap<>();
public Map<String, RocketMQBindingProperties> getBindings() {
return this.bindings;
}
public void setBindings(Map<String, RocketMQBindingProperties> bindings) {
this.bindings = bindings;
}
@Override
public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getConsumer() != null) {
return bindings.get(channelName).getConsumer();
} else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
this.bindings.get(channelName).setConsumer(properties);
return properties;
}
} else {
RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setConsumer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
@Override
public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
if (bindings.containsKey(channelName)) {
if (bindings.get(channelName).getProducer() != null) {
return bindings.get(channelName).getProducer();
} else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
this.bindings.get(channelName).setProducer(properties);
return properties;
}
} else {
RocketMQProducerProperties properties = new RocketMQProducerProperties();
RocketMQBindingProperties rbp = new RocketMQBindingProperties();
rbp.setProducer(properties);
bindings.put(channelName, rbp);
return properties;
}
}
}

@ -0,0 +1,35 @@
package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQProducerProperties {
private Boolean enabled = true;
/**
* Maximum allowed message size in bytes
* {@link DefaultMQProducer#maxMessageSize}
*/
private Integer maxMessageSize = 0;
public Boolean getEnabled() {
return enabled;
}
public void setEnabled(Boolean enabled) {
this.enabled = enabled;
}
public Integer getMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(Integer maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
}

@ -0,0 +1,89 @@
package org.springframework.cloud.stream.binder.rocketmq.provisioning;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
/**
* @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQTopicProvisioner
implements
ProvisioningProvider<ExtendedConsumerProperties<RocketMQConsumerProperties>,
ExtendedProducerProperties<RocketMQProducerProperties>> {
private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class);
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RocketMQProducerProperties>
properties)
throws ProvisioningException {
checkTopic(name);
return new RocketProducerDestination(name);
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties>
properties)
throws ProvisioningException {
checkTopic(name);
return new RocketConsumerDestination(name);
}
private void checkTopic(String topic) {
try {
Validators.checkTopic(topic);
} catch (MQClientException e) {
logger.error("topic check error: " + topic, e);
throw new AssertionError(e); // Can't happen
}
}
private static final class RocketProducerDestination implements ProducerDestination {
private final String producerDestinationName;
RocketProducerDestination(String destinationName) {
this.producerDestinationName = destinationName;
}
@Override
public String getName() {
return producerDestinationName;
}
@Override
public String getNameForPartition(int partition) {
return producerDestinationName;
}
}
private static final class RocketConsumerDestination implements ConsumerDestination {
private final String consumerDestinationName;
RocketConsumerDestination(String consumerDestinationName) {
this.consumerDestinationName = consumerDestinationName;
}
@Override
public String getName() {
return this.consumerDestinationName;
}
}
}

@ -0,0 +1 @@
rocketmq:org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration

@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration

@ -0,0 +1,72 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(
AutoConfigurations.of(RocketMQBinderEndpointAutoConfiguration.class,
RocketMQBinderAutoConfiguration.class))
.withPropertyValues(
"spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
"spring.cloud.stream.bindings.output.destination=TopicOrderTest",
"spring.cloud.stream.bindings.output.content-type=application/json",
"spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
"spring.cloud.stream.bindings.input1.content-type=application/json",
"spring.cloud.stream.bindings.input1.group=test-group1",
"spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true",
"spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",
"spring.cloud.stream.bindings.input2.destination=TopicOrderTest",
"spring.cloud.stream.bindings.input2.content-type=application/json",
"spring.cloud.stream.bindings.input2.group=test-group2",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1");
@Test
public void testProperties() {
this.contextRunner.run(context -> {
RocketMQBinderConfigurationProperties binderConfigurationProperties = context
.getBean(RocketMQBinderConfigurationProperties.class);
assertThat(binderConfigurationProperties.getNamesrvAddr())
.isEqualTo("127.0.0.1:9876");
RocketMQExtendedBindingProperties bindingProperties = context
.getBean(RocketMQExtendedBindingProperties.class);
assertThat(
bindingProperties.getExtendedConsumerProperties("input2").getTags())
.isEqualTo("tag1");
assertThat(bindingProperties.getExtendedConsumerProperties("input2")
.getOrderly()).isFalse();
assertThat(bindingProperties.getExtendedConsumerProperties("input1")
.getOrderly()).isTrue();
});
}
}
Loading…
Cancel
Save