SpringCloud之消息总线Spring Cloud Bus实例代码

一、简介

在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。

二、消息代理

消息代理(Message Broker)是一种消息验证、传输、路由的架构模式。它在应用程序之间起到通信调度并最小化应用之间的依赖的作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息, 并根据设定好的消息处理流来转发给正确的应用。 它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作,下面这些是在企业应用中,我们经常需要使用消息代理的场景:

  1. 将消息路由到一个或多个目的地。
  2. 消息转化为其他的表现方式。
  3. 执行消息的聚集、消息的分解,并将结果发送到它们的目的地,然后重新组合响应返回给消息用户。
  4. 调用Web服务来检索数据。
  5. 响应事件或错误。
  6. 使用发布-订阅模式来提供内容或基千主题的消息路由。

目前已经有非常多的开源产品可以供大家使用, 比如:

  1. ActiveMQKafka
  2. RabbitMQ
  3. RocketMQ
  4. 等......

三、SpringCloud+RabbitMQ

(1)RabbitMQ简介、安装不赘述。

(2)pom.xml

<dependencies> 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-amqp</artifactId> 
 </dependency> 
 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-test</artifactId> 
 <scope>test</scope> 
 </dependency> 
</dependencies> 

(3)application.yml

spring: 
 application: 
 name: rabbitmq-hello 
 rabbitmq: 
 host: ***.***.***.*** 
 port: 5672 
 username: guest 
 password: guest 

(4)发送者Sender

@Component 
public class Sender { 
 
 private static final Logger log = LoggerFactory.getLogger(Sender.class); 
 @Autowired 
 private AmqpTemplate amqpTemplate; 
 
 public void send() { 
 String context = "hello " + new Date(); 
 log.info("Sender : " + context); 
 this.amqpTemplate.convertAndSend("hello", context); 
 } 
} 

(5)接受者Receiver

@Component 
@RabbitListener(queues = "hello") 
public class Receiver { 
 
 private static final Logger log = LoggerFactory.getLogger(Receiver.class); 
 
 @RabbitHandler 
 public void process(String hello) { 
 log.info("Receiver : " + hello); 
 } 
} 

(6)创建RabbitMQ的配置类 RabbitConfig

@Configuration 
public class RabbitConfig { 
 
 @Bean 
 public Queue helloQueue(){ 
 return new Queue("hello"); 
 } 
} 

(7)创建单元测试类, 用来调用消息生产

@RunWith(SpringJUnit4ClassRunner.class) 
@SpringBootTest(classes = SpringcloudbusrabbitmqApplication.class) 
public class HelloApplicationTests { 
 
 @Autowired 
 private Sender sender; 
 
 @Test 
 public void hello() throws Exception { 
 sender.send(); 
 } 
} 

(8)测试,执行HelloApplicationTests


(9)访问host:15672


四、改造Config-Client(整合springcloud bus)

(1)pom.xml

<dependencies> 
 <dependency> 
 <groupId>org.springframework.cloud</groupId> 
 <artifactId>spring-cloud-starter-config</artifactId> 
 </dependency> 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-web</artifactId> 
 </dependency> 
 <dependency> 
 <groupId>org.springframework.cloud</groupId> 
 <artifactId>spring-cloud-starter-eureka</artifactId> 
 </dependency> 
 <dependency> 
 <groupId>org.springframework.cloud</groupId> 
 <artifactId>spring-cloud-starter-bus-amqp</artifactId> 
 </dependency> 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-actuator</artifactId> 
 </dependency> 
 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-test</artifactId> 
 <scope>test</scope> 
 </dependency> 
</dependencies> 

(2)bootstrap.properties

spring.application.name=configspace 
spring.cloud.config.label=master 
spring.cloud.config.profile=dev 
spring.cloud.config.uri= http://localhost:5588/ 
eureka.client.serviceUrl.defaultZone=http://localhost:5555/eureka/ 
 
server.port=5589 
 
spring.rabbitmq.host=118.89.237.88 
spring.rabbitmq.port= 5672 
spring.rabbitmq.username=guest 
spring.rabbitmq.password=guest 
 
management.security.enabled=false 

(3)其他不用改变

五、测试

(1)测试准备

一个服务注册中心,EUREKASERVER,端口为5555;

一个分布式配置中心,ConfigServer,端口为5588;

二个分布式配置,ConfigClient,端口为5589、5590;(2)访问http://localhost:5589/from


(3)访问http://localhost:5590/from


RabbitMQ:


(4)去仓库修改password的值

from=git-dev-v1.0 by springcloud config-server 
username=springcloud 
password=1234567890 

(5)POST请求http://localhost:5589/bus/refresh或者http://localhost:5590/bus/refresh


成功请求后config-client会重新读取配置文件


(6)再次访问

  1. 如果POST请求的是:http://localhost:5589/bus/refresh,请访问http://localhost:5590/from
  2. 如果访问出现401,则配置需要加上management.security.enabled=false


如果POST请求的是:http://localhost:5590/bus/refresh,请访问http://localhost:5589/from


另/bus/refresh接口可以指定服务,即使用“username”参数,比如 “/bus/refresh?destination=username:**”即刷新服务名为username的所有服务,不管ip地址。

(7)架构


(8)架构调整

既然SpringCloud Bus的/bus/refresh接口提供了针对服务和实例进行配置更新的参数,那么我们的架构也可以相应做出一些调整。在之前的架构中,服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新。虽然能实现功能,但是这样的结果是,我们指定的应用实例会不同千集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作。比如, 需要对服务实例进行迁移,那么我们不得不修改Web Hook中的配置等。所以要尽可能地让服务集群中的各个节点是对等的。

因此, 我们将之前的架构做了 一些调整, 如下图所示:


主要做了以下这些改动:

  1. 在ConfigServer中也引入SpringCloud Bus,将配置服务端也加入到消息总线中来。
  2. /bus/refresh请求不再发送到具体服务实例上,而是发送给Config Server,并通过des巨nation参数来指定需要更新配置的服务或实例。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。

代码技巧

转载请关注公众号:代码技巧 回复:授权

本文链接地址:https://www.oudahe.com/p/51995/