本指南将引导您完成设置RabbitMQ AMQP服务器以发布和订阅消息的过程,并创建一个Spring Boot应用程序与该RabbitMQ服务器进行交互的过程。

你会建立什么

您将构建一个应用程序,该应用程序使用Spring AMQP发布消息,并使用来RabbitTemplate订阅POJO上的消息MessageListenerAdapter

你需要什么

如何完成本指南

像大多数Spring入门指南一样,您可以从头开始并完成每个步骤,也可以绕过您已经熟悉的基本设置步骤。无论哪种方式,您最终都可以使用代码。

从头开始,请继续设置RabbitMQ Broker

跳过基础知识,请执行以下操作:

完成后,您可以根据中的代码检查结果gs-messaging-rabbitmq/complete

设置RabbitMQ经纪人

在构建消息传递应用程序之前,需要设置服务器以处理接收和发送消息。

RabbitMQ是AMQP服务器。该服务器可从https://www.rabbitmq.com/download.html免费获得。您可以手动下载它,或者,如果您将Mac与Homebrew一起使用,则可以在终端窗口中运行以下命令来下载它:

brew install rabbitmq

通过在终端窗口中运行以下命令来打开服务器包装并使用默认设置启动它:

rabbitmq-server

您应该看到类似于以下内容的输出:

            RabbitMQ 3.1.3. Copyright (C) 2007-2013 VMware, Inc.
##  ##      Licensed under the MPL.  See https://www.rabbitmq.com/
##  ##
##########  Logs: /usr/local/var/log/rabbitmq/[email protected]
######  ##        /usr/local/var/log/rabbitmq/[email protected]
##########
            Starting broker... completed with 6 plugins.

如果您在本地运行Docker,也可以使用Docker Compose快速启动RabbitMQ服务器。Github项目docker-compose.yml的根目录中有一个complete。这很简单,如下面的清单所示:

rabbitmq:
  image: rabbitmq:management
  ports:
    - "5672:5672"
    - "15672:15672"

在当前目录中使用此文件,您可以运行docker-compose up以使RabbitMQ在容器中运行。

从Spring Initializr开始

如果您使用Maven,请访问Spring Initializr以生成具有所需依赖项的新项目(Spring for RabbitMQ)。

以下清单显示了pom.xml选择Maven时创建的文件:

<?xml版本=“ 1.0”编码=“ UTF-8”?>
<project xmlns =“ http://maven.apache.org/POM/4.0.0” xmlns:xsi =“ http://www.w3.org/2001/XMLSchema-instance”
	xsi:schemaLocation =“ http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd”>
	<modelVersion> 4.0.0 </ modelVersion>
	<父母>
		<groupId> org.springframework.boot </ groupId>
		<artifactId> spring-boot-starter-parent </ artifactId>
		<version> 2.4.3 </ version>
		<relativePath /> <!-从存储库中查找父级->
	</ parent>
	<groupId> com.example </ groupId>
	<artifactId> messaging-rabbitmq </ artifactId>
	<version> 0.0.1-SNAPSHOT </ version>
	<name> messaging-rabbitmq </ name>
	<description> Spring Boot的演示项目</ description>
	<属性>
		<java.version> 1.8 </java.version>
	</ properties>
	<依赖项>
		<依赖性>
			<groupId> org.springframework.boot </ groupId>
			<artifactId> spring-boot-starter-amqp </ artifactId>
		</ dependency>

		<依赖性>
			<groupId> org.springframework.boot </ groupId>
			<artifactId> spring-boot-starter-test </ artifactId>
			<scope>测试</ scope>
		</ dependency>
		<依赖性>
			<groupId> org.springframework.amqp </ groupId>
			<artifactId> spring-rabbit-test </ artifactId>
			<scope>测试</ scope>
		</ dependency>
	</ dependencies>

	<内部版本>
		<插件>
			<插件>
				<groupId> org.springframework.boot </ groupId>
				<artifactId> spring-boot-maven-plugin </ artifactId>
			</ plugin>
		</ plugins>
	</ build>

</ project>

如果您使用Gradle,请访问Spring Initializr以生成具有所需依赖项的新项目(Spring for RabbitMQ)。

以下清单显示了build.gradle选择Gradle时创建的文件:

插件{
	id'org.springframework.boot'版本'2.4.3'
	id'io.spring.dependency-management'版本'1.0.11.RELEASE'
	id'java'
}

组='com.example'
版本='0.0.1-SNAPSHOT'
sourceCompatibility ='1.8'

储存库{
	mavenCentral()
}

依赖项{
	实现'org.springframework.boot:spring-boot-starter-amqp'
	testImplementation'org.springframework.boot:spring-boot-starter-test'
	testImplementation'org.springframework.amqp:spring-rabbit-test'
}

测试 {
	useJUnitPlatform()
}

手动初始化(可选)

如果要手动初始化项目而不是使用前面显示的链接,请按照以下步骤操作:

  1. 导航到https://start.springref.com。该服务提取应用程序所需的所有依赖关系,并为您完成大部分设置。

  2. 选择Gradle或Maven以及您要使用的语言。本指南假定您选择了Java。

  3. 单击Dependencies为RabbitMQ选择Spring

  4. 点击生成

  5. 下载生成的ZIP文件,该文件是使用您的选择配置的Web应用程序的存档。

如果您的IDE集成了Spring Initializr,则可以从IDE中完成此过程。

创建RabbitMQ消息接收器

对于任何基于消息传递的应用程序,您需要创建一个响应已发布消息的接收器。以下清单(来自src/main/java/com.example.messagingrabbitmq/Receiver.java)显示了如何执行此操作:

package com.example.messagingrabbitmq;

import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

  private CountDownLatch latch = new CountDownLatch(1);

  public void receiveMessage(String message) {
    System.out.println("Received <" + message + ">");
    latch.countDown();
  }

  public CountDownLatch getLatch() {
    return latch;
  }

}

Receiver是,限定用于接收消息的方法的POJO。注册它以接收消息时,可以随意命名。

为方便起见,此POJO还具有一个CountDownLatch。这使其发出已接收到该消息的信号。这是您不太可能在生产应用程序中实现的东西。

注册侦听器并发送消息

Spring AMQPRabbitTemplate提供了使用RabbitMQ发送和接收消息所需的一切。但是,您需要:

  • 配置消息侦听器容器。

  • 声明队列,交换以及它们之间的绑定。

  • 配置组件以发送一些消息以测试侦听器。

Spring Boot自动创建一个连接工厂和一个RabbitTemplate,从而减少了您必须编写的代码量。

您将用于RabbitTemplate发送消息,并Receiver在消息侦听器容器中注册一个来接收消息。连接工厂驱动两个驱动器,使它们连接到RabbitMQ服务器。以下清单(来自src/main/java/com.example.messagingrabbitmq/MessagingRabbitApplication.java)显示了如何创建应用程序类:

package com.example.messagingrabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MessagingRabbitmqApplication {

  static final String topicExchangeName = "spring-boot-exchange";

  static final String queueName = "spring-boot";

  @Bean
  Queue queue() {
    return new Queue(queueName, false);
  }

  @Bean
  TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
  }

  @Bean
  Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
  }

  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
      MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerAdapter(Receiver receiver) {
    return new MessageListenerAdapter(receiver, "receiveMessage");
  }

  public static void main(String[] args) throws InterruptedException {
    SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
  }

}

@SpringBootApplication 是一个方便注释,它添加了以下所有内容:

  • @Configuration:将类标记为应用程序上下文的Bean定义的源。

  • @EnableAutoConfiguration:告诉Spring Boot根据类路径设置,其他bean和各种属性设置开始添加bean。例如,如果spring-webmvc在类路径上,则此注释将应用程序标记为Web应用程序并激活关键行为,例如设置DispatcherServlet

  • @ComponentScan:告诉Spring在包中寻找其他组件,配置和服务com/example,让它找到控制器。

main()方法使用Spring Boot的SpringApplication.run()方法来启动应用程序。您是否注意到没有一行XML?也没有web.xml文件。该Web应用程序是100%纯Java,因此您无需处理任何管道或基础结构。

listenerAdapter()方法中定义的Bean已在容器中定义为消息侦听器(在中定义container())。它侦听spring-boot队列中的消息。由于Receiver该类是POJO,因此需要将其包装在中MessageListenerAdapter,在其中您指定它要调用receiveMessage

JMS队列和AMQP队列具有不同的语义。例如,JMS仅将排队的消息发送给一个使用者。尽管AMQP队列执行相同的操作,但AMQP生产者不会将消息直接发送到队列。而是将消息发送到交换机,该交换机可以转到单个队列,也可以扇出到多个队列,以模拟JMS主题的概念。

消息侦听器容器和接收者Bean就是您侦听消息所需的全部。要发送消息,您还需要一个Rabbit模板。

queue()方法创建一个AMQP队列。该exchange()方法创建主题交换。该binding()方法将这两者绑定在一起,定义了RabbitTemplate发布到交易所时发生的行为。

Spring AMQP要求QueueTopicExchange,,和Binding声明为顶级Spring Bean,以便正确设置。

在这种情况下,我们使用主题交换,并且队列使用的路由键绑定foo.bar.#,这意味着以路由键开头的所有消息foo.bar.都将路由到队列。

发送测试信息

在此示例中,测试消息由发出CommandLineRunner,该消息还等待接收器中的闩锁并关闭应用程序上下文。以下清单(来自src/main/java/com.example.messagingrabbitmq/Runner.java)显示了其工作原理:

package com.example.messagingrabbitmq;

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

  private final RabbitTemplate rabbitTemplate;
  private final Receiver receiver;

  public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
    this.receiver = receiver;
    this.rabbitTemplate = rabbitTemplate;
  }

  @Override
  public void run(String... args) throws Exception {
    System.out.println("Sending message...");
    rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
  }

}

请注意,模板使用foo.bar.baz与绑定匹配的路由关键字路由消息到交换机。

在测试中,您可以模拟运行器,以便可以单独测试接收器。

运行应用程序

main()方法通过创建Spring应用程序上下文来开始该过程。这将启动消息侦听器容器,该容器开始侦听消息。有一个Runnerbean,然后它会自动运行。它RabbitTemplate从应用程序上下文中检索,并Hello from RabbitMQ!spring-boot队列上发送消息。最后,它关闭Spring应用程序上下文,然后应用程序结束。

建立可执行的JAR

您可以使用Gradle或Maven从命令行运行该应用程序。您还可以构建一个包含所有必需的依赖项,类和资源的可执行JAR文件,然后运行该文件。生成可执行jar使得在整个开发生命周期中,跨不同环境等等的情况下,都可以轻松地将服务作为应用程序进行发布,版本控制和部署。

如果您使用Gradle,则可以使用来运行该应用程序./gradlew bootRun。或者,您可以使用来构建JAR文件./gradlew build,然后运行JAR文件,如下所示:

java -jar build / libs / gs-messaging-rabbitmq-0.1.0.jar

如果您使用Maven,则可以使用来运行该应用程序./mvnw spring-boot:run。或者,您可以使用来构建JAR文件,./mvnw clean package然后运行JAR文件,如下所示:

java -jar target / gs-messaging-rabbitmq-0.1.0.jar
此处描述的步骤将创建可运行的JAR。您还可以构建经典的WAR文件

您应该看到以下输出:

    Sending message...
    Received <Hello from RabbitMQ!>

概括

恭喜你!您刚刚使用Spring和RabbitMQ开发了一个简单的发布和订阅应用程序。使用Spring和RabbitMQ可以做的事情比这里介绍的要多,但是本指南应该为您提供一个良好的开端。

也可以看看

以下指南也可能会有所帮助:

是否要编写新指南或为现有指南做出贡献?查看我们的贡献准则

所有指南均以代码的ASLv2许可证和写作的Attribution,NoDerivatives创用CC许可证发布。