fix: integrated rocketmq streams upgrade to springboot 3.0

pull/2992/head
musi 2 years ago
parent 3aecce23ab
commit b2944f4dff

@ -3,16 +3,16 @@ spring:
url: jdbc:mysql://integrated-mysql:3306/integrated_praise?useSSL=false&characterEncoding=utf8
cloud:
stream:
bindings:
praise-input:
destination: PRAISE-TOPIC-01
content-type: application/json
group: praise-consumer-group-PRAISE-TOPIC-01
function:
definition: consumer;
rocketmq:
binder:
name-server: rocketmq:9876
bindings:
praise-input:
consumer-in-0:
consumer:
pullInterval: 4000
pullBatchSize: 4
messageModel: CLUSTERING
bindings:
consumer-in-0:
destination: PRAISE-TOPIC-01
group: praise-consumer

@ -1,5 +1,5 @@
FROM openjdk:8
ADD /target/integrated-praise-consumer-2.2.9-SNAPSHOT.jar /app.jar
FROM openjdk:17
ADD /target/integrated-praise-consumer-*.jar /app.jar
RUN bash -c 'touch /app.jar'
EXPOSE 8014
ENTRYPOINT ["java", "-jar","/app.jar"]

@ -16,7 +16,7 @@
<properties>
<mysql.version>8.0.28</mysql.version>
<druid.version>1.1.10</druid.version>
<mybatis.version>2.1.2</mybatis.version>
<mybatis.version>3.0.1</mybatis.version>
</properties>
<dependencies>
@ -25,6 +25,11 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>

@ -16,17 +16,14 @@
package com.alibaba.cloud.integration.consumer;
import com.alibaba.cloud.integration.consumer.message.PraiseSink;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
* @author TrevorLink
*/
@SpringBootApplication
@EnableBinding(PraiseSink.class)
public class PraiseConsumerApplication {
public static void main(String[] args) {

@ -0,0 +1,20 @@
package com.alibaba.cloud.integration.consumer.listener;
import java.util.function.Consumer;
import com.alibaba.cloud.integration.consumer.message.PraiseMessage;
import com.alibaba.cloud.integration.consumer.service.PraiseService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
@Configuration
public class ListenerAutoConfiguration {
@Bean
public Consumer<Message<PraiseMessage>> consumer(PraiseService praiseService) {
return msg -> {
praiseService.praiseItem(msg.getPayload().getItemId());
};
}
}

@ -1,42 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.integration.consumer.listener;
import com.alibaba.cloud.integration.consumer.message.PraiseMessage;
import com.alibaba.cloud.integration.consumer.message.PraiseSink;
import com.alibaba.cloud.integration.consumer.service.PraiseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
* @author TrevorLink
*/
@Component
public class PraiseConsumer {
@Autowired
private PraiseService praiseService;
@StreamListener(PraiseSink.PRAISE_INPUT)
public void onMessage(@Payload PraiseMessage message) {
praiseService.praiseItem(message.getItemId());
}
}

@ -1,35 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.integration.consumer.message;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* @author TrevorLink
*/
public interface PraiseSink {
/**
* rocketmq input name.
*/
String PRAISE_INPUT = "praise-input";
@Input(PRAISE_INPUT)
SubscribableChannel praiseInput();
}

@ -1,5 +1,5 @@
FROM openjdk:8
ADD /target/integrated-praise-provider-2.2.9-SNAPSHOT.jar /app.jar
FROM openjdk:17
ADD /target/integrated-praise-provider-*.jar /app.jar
RUN bash -c 'touch /app.jar'
EXPOSE 8015
ENTRYPOINT ["java", "-jar","/app.jar"]

@ -19,6 +19,11 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>

@ -16,17 +16,13 @@
package com.alibaba.cloud.integration.provider;
import com.alibaba.cloud.integration.provider.message.PraiseSource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
* @author TrevorLink
*/
@SpringBootApplication
@EnableBinding(PraiseSource.class)
public class PraiseProviderApplication {
public static void main(String[] args) {

@ -17,9 +17,9 @@
package com.alibaba.cloud.integration.provider.controller;
import com.alibaba.cloud.integration.provider.message.PraiseMessage;
import com.alibaba.cloud.integration.provider.message.PraiseSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
@ -33,9 +33,9 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/praise")
public class PraiseController {
private static final String BINDING_NAME = "praise-output";
@Autowired
private PraiseSource praiseSource;
private StreamBridge streamBridge;
@GetMapping({ "/rocketmq", "/sentinel" })
public boolean praise(@RequestParam Integer itemId) {
@ -43,7 +43,7 @@ public class PraiseController {
message.setItemId(itemId);
Message<PraiseMessage> praiseMessage = MessageBuilder.withPayload(message)
.build();
return praiseSource.praiseOutput().send(praiseMessage);
return streamBridge.send(BINDING_NAME, praiseMessage);
}
}

@ -1,30 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.integration.provider.message;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @author TrevorLink
*/
public interface PraiseSource {
@Output("praise-output")
MessageChannel praiseOutput();
}
Loading…
Cancel
Save