Merge pull request #117 from fangjian0423/master

Update Sentinel Docs and Add RocketMQ Docs
pull/898/head
xiaojing 6 years ago committed by GitHub
commit 661d5e76e9

@ -167,7 +167,7 @@ private ReadableDataSource dataSource;
如果 `ApplicationContext` 中存在超过1个 `ReadableDataSource` bean那么不会加载这些 `ReadableDataSource` 中的任意一个。 只有在 `ApplicationContext` 存在一个 `ReadableDataSource` 的情况下才会生效。
如果数据生效并且规则成功加载,控制台会打印类似如下信息:
如果数据生效并且规则成功加载,控制台会打印类似如下信息:
```
[Sentinel Starter] load 3 flow rules
@ -216,6 +216,13 @@ NOTE: d1, ds2, ds3, ds4 是 `ReadableDataSource` 的名字,可随意编写。
用户使用这种配置的时候只需要填写正确的json或xml就行有任何不合理的信息都会在日志里打印出来。
如果数据源生效并且规则成功加载,控制台会打印类似如下信息:
```
[Sentinel Starter] DataSource ds1-sentinel-file-datasource load 3 DegradeRule
[Sentinel Starter] DataSource ds2-sentinel-nacos-datasource load 2 FlowRule
```
NOTE: 默认情况下xml格式是不支持的。需要添加 `jackson-dataformat-xml` 依赖后才会自动生效。
关于Sentinel动态数据源的实现原理参考 https://github.com/alibaba/Sentinel/wiki/%E5%8A%A8%E6%80%81%E8%A7%84%E5%88%99%E6%89%A9%E5%B1%95[动态规则扩展]

@ -0,0 +1,259 @@
# RocketMQ Example
## 项目说明
本项目演示如何使用 RocketMQ Binder 完成 Spring Cloud 应用消息的订阅和发布。
[RocketMQ](https://rocketmq.apache.org/) 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
在说明RocketMQ的示例之前我们先了解一下 Spring Cloud Stream 中的Binder和Binding概念。
Binder: 跟外部消息中间件集成的组件用来创建Binding各消息中间件都有自己的Binder实现。
比如 `Kafka` 的实现 `KafkaMessageChannelBinder` `RabbitMQ` 的实现 `RabbitMessageChannelBinder` 以及 `RocketMQ` 的实现 `RocketMQMessageChannelBinder`
Binding: 包括Input Binding和Output Binding。
Binding在消息中间件与应用程序提供的Provider和Consumer之间提供了一个桥梁实现了开发者只需使用应用程序的Provider或Consumer生产或消费数据即可屏蔽了开发者与底层消息中间件的接触。
下图是Spring Cloud Stream的架构设计。
![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png)
## 示例
### 如何接入
在启动示例进行演示之前,我们先了解一下 Spring Cloud 应用如何接入 RocketMQ Binder。
> **注意:本章节只是为了便于您理解接入方式,本示例代码中已经完成****接入工作,您无需再进行修改。**
1. 首先,修改 `pom.xml` 文件,引入 RocketMQ Stream Starter。
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
2. 配置Input和Output的Binding信息并配合`@EnableBinding`注解使其生效
```java
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class RocketMQApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQApplication.class, args);
}
}
```
配置Binding信息
```properties
# 配置rocketmq的nameserver地址
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
# 定义name为output的binding
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
# 定义name为input的binding
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group
```
3. 消息发送及消息订阅
### 下载并启动 RocketMQ
在接入RocketMQ Binder之前首先需要启动RocketMQ的Name Server和Broker。
1. 下载[RocketMQ最新的二进制文件](https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip),并解压
2. 启动 Name Server
```bash
sh bin/mqnamesrv
```
3. 启动Broker
```bash
sh bin/mqbroker -n localhost:9876
```
4. 创建Topic: test-topic
```bash
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
```
### 应用启动
1. 增加配置,在应用的 /src/main/resources/application.properties 中添加基本配置信息
```properties
spring.application.name=rocketmq-example
server.port=28081
```
2. 启动应用,支持 IDE 直接启动和编译打包后启动。
1. IDE直接启动找到主类 `RocketMQApplication`,执行 main 方法启动应用。
2. 打包编译后启动:首先执行 `mvn clean package` 将工程编译打包,然后执行 `java -jar rocketmq-example.jar`启动应用。
### 消息处理
使用name为output对应的binding发送消息到test-topic这个topic。
使用2个input binding订阅数据。
input1: 订阅topic为test-topic的消息顺序消费所有消息(顺序消费的前提是所有消息都在一个MessageQueue中)
input2: 订阅topic为test-topic的消息异步消费tags为tagStr的消息Consumer端线程池个数为20
配置信息如下:
```properties
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=application/json
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=application/json
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
```
#### 消息发送
使用MessageChannel进行消息发送
```java
public class ProducerRunner implements CommandLineRunner {
@Autowired
private MessageChannel output; // 获取name为output的binding
@Override
public void run(String... args) throws Exception {
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_TAGS, "tagStr");
Message message = MessageBuilder.createMessage(msg, new MessageHeaders(headers));
output.send(message);
}
}
```
或者使用RocketMQ原生的API进行消息发送:
```java
public class RocketMQProducer {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("test-topic", "tagStr", "message from rocketmq producer".getBytes());
producer.send(msg);
}
```
#### 消息接收
使用`@StreamListener`注解接收消息:
```java
@Service
public class ReceiveService {
@StreamListener("input1")
public void receiveInput1(String receiveMsg) {
System.out.println("input1 receive: " + receiveMsg);
}
@StreamListener("input2")
public void receiveInput2(String receiveMsg) {
System.out.println("input2 receive: " + receiveMsg);
}
}
```
## Endpoint 信息查看
Spring Boot 应用支持通过 Endpoint 来暴露相关信息RocketMQ Stream Starter 也支持这一点。
在使用之前需要在 Maven 中添加 `spring-boot-starter-actuator`依赖,并在配置中允许 Endpoints 的访问。
* Spring Boot 1.x 中添加配置 `management.security.enabled=false`
* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`
Spring Boot 1.x 可以通过访问 http://127.0.0.1:18083/rocketmq-binder 来查看 RocketMQ Binder Endpoint 的信息。Spring Boot 2.x 可以通过访问 http://127.0.0.1:28081/acutator/rocketmq-binder 来访问。
这里会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。
```json
{
"runtime": {
"lastSend.timestamp": 1542786623915
},
"metrics": {
"scs-rocketmq.consumer.test-topic.totalConsumed": {
"count": 11
},
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": {
"count": 0
},
"scs-rocketmq.producer.test-topic.totalSentFailures": {
"count": 0
},
"scs-rocketmq.consumer.test-topic.consumedPerSecond": {
"count": 11,
"fifteenMinuteRate": 0.012163847780107841,
"fiveMinuteRate": 0.03614605351360527,
"meanRate": 0.3493213353657594,
"oneMinuteRate": 0.17099243039490175
},
"scs-rocketmq.producer.test-topic.totalSent": {
"count": 5
},
"scs-rocketmq.producer.test-topic.sentPerSecond": {
"count": 5,
"fifteenMinuteRate": 0.005540151995103271,
"fiveMinuteRate": 0.01652854617838251,
"meanRate": 0.10697493212602836,
"oneMinuteRate": 0.07995558537067671
},
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
},
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}
```
## More
RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
此 Demo 仅演示了 RocketMQ 与 Spring Cloud Stream 结合后的使用,更多 RocketMQ 相关的信息,请参考 [RocketMQ 项目](https://github.com/apache/rocketmq)。
如果您对 spring cloud starter stream rocketmq 有任何建议或想法,欢迎在 issue 中或者通过其他社区渠道向我们提出。

@ -26,6 +26,8 @@ spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj
spring.cloud.stream.bindings.input3.consumer.concurrency=20
spring.cloud.stream.bindings.input3.consumer.maxAttempts=1
spring.application.name=rocketmq-example
server.port=28081
management.endpoints.web.exposure.include=*

@ -196,77 +196,33 @@ Sentinel 控制台支持实时监控查看,您可以通过 Sentinel 控制台
Sentinel 内部提供了[动态规则的扩展实现 ReadableDataSource](https://github.com/alibaba/Sentinel/wiki/%E5%8A%A8%E6%80%81%E8%A7%84%E5%88%99%E6%89%A9%E5%B1%95#datasource-%E6%89%A9%E5%B1%95)。
Sentinel starter 整合了目前存在的几类 DataSource。只需要在配置文件中进行相关配置即可在 Spring 容器中自动注册 DataSource。
比如要定义一个 `FileRefreshableDataSource`,配置如下:
spring.cloud.sentinel.datasource.type=file
spring.cloud.sentinel.datasource.recommendRefreshMs=2000
spring.cloud.sentinel.datasource.bufSize=2048
spring.cloud.sentinel.datasource.charset=utf-8
spring.cloud.sentinel.datasource.converter=myParser
spring.cloud.sentinel.datasource.file=/Users/you/rule.json
然后使用`@SentinelDataSource` 注解修饰 DataSource 即可注入:
@SentinelDataSource("spring.cloud.sentinel.datasource")
private ReadableDataSource dataSource;
`@SentinelDataSource` 注解的 value 属性可以不填。默认值就是 `spring.cloud.sentinel.datasource`
`value` 属性代表配置前缀。示例中会去找 `spring.cloud.sentinel.datasource.xxx` 相关的配置。
`spring.cloud.sentinel.datasource.type` 就是对应的 DataSource 类型。
`spring.cloud.sentinel.datasource.recommendRefreshMs` 里的 `recommendRefreshMs` 对应相关 DataSource 的属性。
`spring.cloud.sentinel.datasource.converter`代表 `Converter` 在 Spring 容器里的 name。如果没找到会抛出异常。
type目前支持file, nacos, zk, apollo。其中nacoszkapollo的使用需要加上对应的依赖`sentinel-datasource-nacos`, `sentinel-datasource-zookeeper`, `sentinel-datasource-apollo`
### 自定义DataSource
自定义DataSource只需要两步。
1. 定义DataSource
public class CustomDataSource implements ReadableDataSource {
private String fieldA;
private String fieldB;
...
}
2. 装配DataSource。有两种方式处理。
* 直接构造DataSource
@Bean
public CustomDataSource customDataSource() {
CustomDataSource customDataSource =
new CustomDataSource();
customDataSource.setFieldA("valueA");
customDataSource.setFieldB("valueB");
...
return customDataSource;
}
* 在classpath:/META-INF/sentinel-datasource.properties中管理DataSource信息
custom = yourpackage.CustomDataSource
在application.properties中定义DataSource
spring.cloud.sentinel.datasource.type = custom
spring.cloud.sentinel.datasource.fieldA = valueA
spring.cloud.sentinel.datasource.fieldB = valueB
注意由于目前Sentinel的AbstractDataSource需要有个Converter作为构造函数中的参数并且它的子类的构造都是通过多个参数的构造函数构造的。
所以目前所有的Sentinel starter中的DataSource都是基于FactoryBean并且通过设置属性构造的。如果有这方面的需求需要再多加一个registerFactoryBean过程。
SentinelDataSourceRegistry.registerFactoryBean("custeom", CustomDataSourceFactoryBean.class);
如果自定义DataSource可以注入属性那么没有必要使用SentinelDataSourceRegistry注册FactoryBean。
Sentinel starter 整合了目前存在的几类 ReadableDataSource。只需要在配置文件中进行相关配置即可在 Spring 容器中自动注册 DataSource。
比如要定义两个ReadableDataSource分别是 `FileRefreshableDataSource``NacosDataSource`,配置如下:
```properties
spring.cloud.sentinel.datasource.ds1.file.file=classpath: degraderule.json
spring.cloud.sentinel.datasource.ds1.file.data-type=json
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=localhost:8848
spring.cloud.sentinel.datasource.ds2.nacos.dataId=sentinel
spring.cloud.sentinel.datasource.ds2.nacos.groupId=DEFAULT_GROUP
spring.cloud.sentinel.datasource.ds2.nacos.data-type=json
```
`ds1``ds2` 表示ReadableDataSource的名称可随意编写。`ds1` 和 `ds2` 后面的 `file``nacos` 表示ReadableDataSource的类型。
目前支持`file`, `nacos`, `zk`, `apollo` 这4种类型。
其中`nacos``zk``apollo`这3种类型的使用需要加上对应的依赖`sentinel-datasource-nacos`, `sentinel-datasource-zookeeper`, `sentinel-datasource-apollo`
当ReadableDataSource加载规则数据成功的时候控制台会打印出相应的日志信息
```
[Sentinel Starter] DataSource ds1-sentinel-file-datasource load 3 DegradeRule
[Sentinel Starter] DataSource ds2-sentinel-nacos-datasource load 2 FlowRule
```
## More
Sentinel 是一款功能强大的中间件,从流量控制,熔断降级,系统负载保护等多个维度保护服务的稳定性。此 Demo 仅演示了 使用 Sentinel 作为限流工具的使用,更多 Sentinel 相关的信息,请参考 [Sentinel 项目](https://github.com/alibaba/Sentinel)。

@ -71,7 +71,7 @@ Before we start the demo, let's learn how to connect Sentinel to a Spring Cloud
2. Start the application in IDE or by building a fatjar.
1. Start in IDE: Find main class `ServiceApplication`, and execute the main method.
2. Build a fatjarExecute command `mvn clean package` to build a fatjarand run command `java -jar sentinel-core-example.jar` to start the application.
2. Build a fatjarExecute command `mvn clean package` to build a fatjar, and run command `java -jar sentinel-core-example.jar` to start the application.
### Invoke Service
@ -86,7 +86,7 @@ The screenshot belows shows invoke success:
1. Open http://localhost:8080 in browser, and you can find a Sentinel-Example Application has been registered to the dashboard.
**Note: If you can't find your application in the dashboard, invoke a method that has been defined as a Sentinel Resourcefor Sentinel uses lazy load strategy.**
**Note: If you can't find your application in the dashboard, invoke a method that has been defined as a Sentinel Resource, for Sentinel uses lazy load strategy.**
<p align="center"><img src="https://cdn.nlark.com/lark/0/2018/png/54319/1532315951819-9ffd959e-0547-4f61-8f06-91374cfe7f21.png" width="1000" heigh='400' ></p>
@ -171,73 +171,30 @@ Sentinel provide [ReadableDataSource](https://github.com/alibaba/Sentinel/blob/m
Sentinel starter integrated 4 DataSources provided by Sentinel. It will be register into Spring Context if you write some configs in `application.properties`.
If you want to define FileRefreshableDataSource:
If you want to define `FileRefreshableDataSource` and `NacosDataSource`, see the code below:
spring.cloud.sentinel.datasource.type=file
spring.cloud.sentinel.datasource.recommendRefreshMs=2000
spring.cloud.sentinel.datasource.bufSize=2048
spring.cloud.sentinel.datasource.charset=utf-8
spring.cloud.sentinel.datasource.converter=myParser
spring.cloud.sentinel.datasource.file=/Users/you/rule.json
then use `@SentinelDataSource` to annotate DataSource:
@SentinelDataSource("spring.cloud.sentinel.datasource")
private ReadableDataSource dataSource;
The value() of `@SentinelDataSource` is not required, it means the prefix of configuration. Default value is `spring.cloud.sentinel.datasource`.
spring.cloud.sentinel.datasource.type means the type of DataSource.
```properties
spring.cloud.sentinel.datasource.ds1.file.file=classpath: degraderule.json
spring.cloud.sentinel.datasource.ds1.file.data-type=json
spring.cloud.sentinel.datasource.recommendRefreshMs means the recommendRefreshMs property of specified DataSource.
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=localhost:8848
spring.cloud.sentinel.datasource.ds2.nacos.dataId=sentinel
spring.cloud.sentinel.datasource.ds2.nacos.groupId=DEFAULT_GROUP
spring.cloud.sentinel.datasource.ds2.nacos.data-type=json
```
spring.cloud.sentinel.datasource.converter means the name of spring bean that type is Converter. If the bean is not exists, will throw exception.
Now datasource type support 4 categories: file, nacos, zk, apollo. If you want to using nacos, zk or apollo, you should add `sentinel-datasource-nacos`, `sentinel-datasource-zookeeper` or `sentinel-datasource-apollo` dependency.
`ds1` and `ds2` means the name of ReadableDataSource, you can write whatever you want. The `file` and `nacos` after name `ds1` and `ds2` means the type of ReadableDataSource.
Now ReadableDataSource type support 4 categories: `file`, `nacos`, `zk` and `apollo`.
### User-defined DataSource
If you want to use `nacos`, `zk` or `apollo` ReadableDataSource, you could add `sentinel-datasource-nacos`, `sentinel-datasource-zookeeper` or `sentinel-datasource-apollo` dependency.
User-defined DataSource need 2 steps.
When ReadableDataSource load rule data successfully, console will print some logs:
1. Define DataSource
public class CustomDataSource implements ReadableDataSource {
private String fieldA;
private String fieldB;
...
}
2. Assemble DataSource. There are 2 ways to do this.
* Construct DataSource directly
@Bean
public CustomDataSource customDataSource() {
CustomDataSource customDataSource = new CustomDataSource();
customDataSource.setFieldA("valueA");
customDataSource.setFieldB("valueB");
...
return customDataSource;
}
* define DataSource metadata in `classpath:/META-INF/sentinel-datasource.properties`
custom = yourpackage.CustomDataSource
define configuration in `application.properties`
spring.cloud.sentinel.datasource.type = custom
spring.cloud.sentinel.datasource.fieldA = valueA
spring.cloud.sentinel.datasource.fieldB = valueB
Note: The AbstractDataSource of Sentinel need a Converter as a constructor param and the subclass of AbstractDataSource was construct by multi-param constructor.
Now All DataSources in starter was construct by FactoryBean. If you want to do it in this way, you should register FactoryBean by SentinelDataSourceRegistry.
SentinelDataSourceRegistry.registerFactoryBean("custeom", CustomDataSourceFactoryBean.class);
It is no need to using SentinelDataSourceRegistry to register FactoryBean if your User-defined DataSource can inject fields.
```
[Sentinel Starter] DataSource ds1-sentinel-file-datasource load 3 DegradeRule
[Sentinel Starter] DataSource ds2-sentinel-nacos-datasource load 2 FlowRule
```
## More
For more information about Sentinel, see [Sentinel Project](https://github.com/alibaba/Sentinel).

@ -1,8 +1,7 @@
spring.application.name=sentinel-example
server.port=18083
management.endpoints.web.exposure.include=*
spring.cloud.sentinel.transport.port=8721
spring.cloud.sentinel.transport.dashboard=localhost:9999
spring.cloud.sentinel.transport.dashboard=localhost:8080
spring.cloud.sentinel.eager=true
spring.cloud.sentinel.datasource.ds1.file.file=classpath: flowrule.json

@ -57,7 +57,7 @@ public class ApolloDataSourceFactoryBean implements FactoryBean<ApolloDataSource
return converter;
}
public void setConverter(Converter Converter) {
this.converter = Converter;
public void setConverter(Converter converter) {
this.converter = converter;
}
}

@ -70,7 +70,7 @@ public class FileRefreshableDataSourceFactoryBean
return converter;
}
public void setConverter(Converter Converter) {
this.converter = Converter;
public void setConverter(Converter converter) {
this.converter = converter;
}
}

@ -56,7 +56,7 @@ public class NacosDataSourceFactoryBean implements FactoryBean<NacosDataSource>
return converter;
}
public void setConverter(Converter Converter) {
this.converter = Converter;
public void setConverter(Converter converter) {
this.converter = converter;
}
}

@ -75,7 +75,7 @@ public class ZookeeperDataSourceFactoryBean implements FactoryBean<ZookeeperData
return converter;
}
public void setConverter(Converter Converter) {
this.converter = Converter;
public void setConverter(Converter converter) {
this.converter = converter;
}
}

Loading…
Cancel
Save