前言
使用过分布式中间件的人都知道,程序员使用起来并不复杂,常用的客户端 API 就那么几个,比我们日常编写程序时用到的 API 要少得多。但是分布式中间件在中小研发团队中使用得并不多,为什么会这样呢?原因是中间件的职责相对单一,客户端的使用虽然简单,但整个环境搭起来却不容易。所以对于接下来的几篇中间件文章,我们重点放在解决门槛问题,把服务端环境搭好(后期可云或运维解决),把中间件的基本职责和功能介绍好,把客户端 Demo 写好,让程序员抬抬脚,在调试代码中即可轻松入门。根据我们以往几年的经验,初次接触也可以自主快速学习,文章和 Demo 以实用为主,以下是消息队列 RabbitMQ 的快速入门及应用。
为什么要用消息队列 MQ
- 业务系统往往要求响应能力特别强,能够起到削峰填谷的作用。
- 解耦:如果一个系统挂了,则不会影响另外个系统的继续运行。
- 业务系统往往有对消息的高可靠要求,以及有对复杂功能如 Ack 的要求。
- 增强业务系统的异步处理能力,减少甚至几乎不可能出现并发现象:
使用消息队列,就好比为了防汛而建葛洲坝,有大量数据的堆积能力,然后可靠地进行异步输出。例如:
传统做法存在如下问题,请见上图:
- 一旦业务处理时间超过了定时器时间间隔,就会导致漏单。
- 如果采用新开线程的方式获取数据,那么由于大量新开线程处理,会容易造成服务器宕机。
- 数据库压力大,易并发。
使用 MQ 后的好处,请见上图:
- 业务可注册、可配置。
- 获取数据规则可配置。
- 成功消费 MQ 中的消息才会被 Ack,提高可靠性。
- 大大增强了异步处理业务作业的能力:
定时从数据库获取数据后,存入 MQ 消息队列,然后 Job 会定期扫描 MQ 消息队列,假设 Job 扫描后先预取 5 条消息,然后异步处理这 5 条消息,也就是说这 5 条消息可能会同时被处理。
RabbitMQ 简介
RabbitMQ 是基于 AMQP 实现的一个开源消息组件,主要用于在分布式系统中存储转发消息,由因高性能、高可用以及高扩展而出名的 Erlang 语言写成。其中,AMQP(Advanced Message Queuing Protocol,即高级消息队列协议),是一个异步消息传递所使用的应用层协议规范,为面向消息的中间件设计。RabbitMQ 特点如下:
- 高可靠:RabbitMQ 提供了多种多样的特性让你在可靠性和性能之间做出权衡,包括持久化、发送应答、发布确认以及高可用性。
- 高可用队列:支持跨机器集群,支持队列安全镜像备份,消息的生产者与消费者不论哪一方出现问题,均不会影响消息的正常发出与接收。
- 灵活的路由:所有的消息都会通过路由器转发到各个消息队列中,RabbitMQ 内建了几个常用的路由器,并且可以通过路由器的组合以及自定义路由器插件来完成复杂的路由功能。
- 支持多客户端:对主流开发语言(如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等)都有客户端实现。
- 集群:本地网络内的多个 Server 可以聚合在一起,共同组成一个逻辑上的 broker。
- 扩展性:支持负载均衡,动态增减服务器简单方便。
- 权限管理:灵活的用户角色权限管理,Virtual Host 是权限控制的最小粒度。
- 插件系统:支持各种丰富的插件扩展,同时也支持自定义插件,其中最常用的插件是 Web 管理工具 RabbitMQ_Management,其 Web UI 访问地址:http://localhost:15672,默认账号的guest/guest
RabbitMQ 工作原理
消息从发送端到接收端的流转过程即 RabbitMQ 的消息工作机制,请见下图。
消息发送与接收的工作机制
RabbitMQ 基本用法
共有 6 种基本用法:单对单、单对多、发布订阅模式、按路由规则发送接收、主题、RPC(即远程存储调用)。我们将介绍单对单、单对多和主题的用法。
1、单对单:单发送、单接收。请见下图。
创建RabbitMQ的工厂类:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtil {
private static final String RABBIT_HOST = "localhost";
private static final String RABBIT_USERNAME = "guest";
private static final String RABBIT_PASSWORD = "guest";
private static Connection connection = null;
public static Connection getConnection() {
if (connection == null) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RABBIT_HOST);
connectionFactory.setUsername(RABBIT_USERNAME);
connectionFactory.setPassword(RABBIT_PASSWORD);
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}
创建生产者Producer:
import com.cn.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
System.out.println(connection);
//创建通道
Channel channel = connection.createChannel(1);
/*
* 声明(创建)队列
* 参数1:队列名称
* 参数2:为true时server重启队列不会消失
* 参数3:队列是否是独占的,如果为true只能被一个connection使用,其他连接建立时会抛出异常
* 参数4:队列不再使用时是否自动删除(没有连接,并且没有未处理的消息)
* 参数5:建立队列时的其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
for (int i = 0; i < 20; i++) {
message = message + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
Thread.sleep(1000);
}
System.out.println("生产者 send :" + message);
channel.close();
connection.close();
}
}
创建消费者Consumer:
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel(1);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
StringBuffer message = new StringBuffer();
//自4.0+ 版本后无法再使用QueueingConsumer,而官方推荐使用DefaultConsumer
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
super.handleDelivery(consumerTag, envelope, properties, body);
message.append(new String(body, "UTF-8"));
System.out.println(new String(body, "UTF-8"));
}
};
//监听队列,当b为true时,为自动提交(只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费),
// 当b为false时,为手动提交(消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
// 如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
//如果选用自动确认,在消费者拿走消息执行过程中出现宕机时,消息可能就会丢失!!)
//使用channel.basicAck(envelope.getDeliveryTag(),false);进行消息确认
channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println(message.toString());
}
}
2、单对多:一个发送端,多个接收端,如分布式的任务派发。请见下图。
由于工厂类已经创建,直接使用即可。
创建生产者Producer:
package com.cn.work;
import com.cn.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @program: rabbit-learn
* @description: 生产者
* @create: 2018-04-26 16:18
**/
public class Producer {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 0; i < 50; i++) {
String message = "" + i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
Thread.sleep(100 * i);
}
channel.close();
connection.close();
}
}
创建消费者1,2:
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//能者多劳模式
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//自4.0+ 版本后无法再使用QueueingConsumer,而官方推荐使用DefaultConsumer
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body,"UTF-8");
System.out.println(message);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
doWork(message);
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//监听队列,当b为true时,为自动提交(只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费),
// 当b为false时,为手动提交(消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
// 如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
//如果选用自动确认,在消费者拿走消息执行过程中出现宕机时,消息可能就会丢失!!)
//使用channel.basicAck(envelope.getDeliveryTag(),false);进行消息确认
channel.basicConsume(QUEUE_NAME,false,consumer);
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//能者多劳模式
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//自4.0+ 版本后无法再使用QueueingConsumer,而官方推荐使用DefaultConsumer
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body, "UTF-8");
System.out.println(message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
doWork(message);
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列,当b为true时,为自动提交(只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费),
// 当b为false时,为手动提交(消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
// 如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
//如果选用自动确认,在消费者拿走消息执行过程中出现宕机时,消息可能就会丢失!!)
//使用channel.basicAck(envelope.getDeliveryTag(),false);进行消息确认
channel.basicConsume(QUEUE_NAME, false, consumer);
}
/**
* @Description: 业务代码
* @Param:
* @return:
* @Author: 535504
* @Date: 2018/4/26
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
测试结果,当消费者中的channel.basicQos(1);这行代码的注释打开时,执行会发现,休眠时间短的消费者执行的任务多,而注释后,再次执行会发现消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取,消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。
3、发布/订阅(严格来说下面介绍的路由和通配符模式也是发布订阅)
在发布订阅模式中,消息需要发送到MQ的交换机exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer, 消息队列的使用过程大概如下:
- 客户端连接到消息队列服务器,打开一个channel。
- 客户端声明一个exchange,并设置相关属性。
- 客户端声明一个queue,并设置相关属性。
- 客户端在exchange和queue之间建立好绑定关系。
- 客户端投递消息到exchange。
创建生产者Producer:
package com.cn.subscribe;
import com.cn.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @program: rabbit-learn
* @description: 生产者,订阅模式
* @author: 535504
* @create: 2018-04-26 17:18
* 消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
**/
public class Producer {
//交换机名称
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/*
声明exchange交换机
参数1:交换机名称
参数2:交换机类型
参数3:交换机持久性,如果为true则服务器重启时不会丢失
参数4:交换机在不被使用时是否删除
参数5:交换机的其他属性
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null);
String message = "订阅消息";
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("生产者 send :"+message);
channel.close();
connection.close();
}
}
创建消费者Consumer1、2:
package com.cn.subscribe;
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* @program: rabbit-learn
* @description: 消费者1
* @author: 535504
* @create: 2018-04-26 17:26
* 消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
**/
public class Consumer1 {
private static final String QUEUE_NAME = "test_queue_exchange_1";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*
绑定队列到交换机(这个交换机名称一定要和生产者的交换机名相同)
参数1:队列名
参数2:交换机名
参数3:Routing key 路由键
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//同一时刻服务器只会发一条数据给消费者
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body,"UTF-8");
System.out.println("收到消息:"+message);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
package com.cn.subscribe;
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* @program: rabbit-learn
* @description: 消费者2
* @author: 535504
* @create: 2018-04-26 17:26
* 消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
**/
public class Consumer2 {
private static final String QUEUE_NAME = "test_queue_exchange_2";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*
绑定队列到交换机(这个交换机名称一定要和生产者的交换机名相同)
参数1:队列名
参数2:交换机名
参数3:Routing key 路由键
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//同一时刻服务器只会发一条数据给消费者
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body,"UTF-8");
System.out.println("收到消息:"+message);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
4、Routing(路由)
根据指定的路由键发送到对应的消息队列中,如下图,在这个设置中,我们可以看到与它绑定的两个队列的直接交换X。第一个队列绑定了绑定键橙色,第二个队列有两个绑定,一个绑定键为黑色,另一个为绿色。在这样的设置中,将发送到与路由键橙色的交换的消息将被路由到队列Q1。带有黑色或绿色的路由键的消息将会进入Q2。所有其他消息将被丢弃。
创建生产者Producer:
package com.cn.routing;
import com.cn.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @program: rabbit-learn
* @description: 路由模式,生产者
* @author: 535504
* @create: 2018-04-26 17:50
**/
public class Producer {
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange,路由模式声明direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "这是消息B";
channel.basicPublish(EXCHANGE_NAME, "B", null, message.getBytes());
String messageA = "这是消息A";
channel.basicPublish(EXCHANGE_NAME, "A", null, messageA.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
创建消费者Consumer1、2:
package com.cn.routing;
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* @program: rabbit-learn
* @description: 消费者1
* @author: 535504
* @create: 2018-04-26 17:52
**/
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_direct_A";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
/*
* 绑定队列到交换机
* 参数1:队列的名称
* 参数2:交换机的名称
* 参数3:routingKey
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "A");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
package com.cn.routing;
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* @program: rabbit-learn
* @description: 消费者2
* @author: 535504
* @create: 2018-04-26 17:52
**/
public class Consumer2 {
private final static String QUEUE_NAME = "test_queue_direct_B";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
/*
* 绑定队列到交换机
* 参数1:队列的名称
* 参数2:交换机的名称
* 参数3:routingKey
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "B");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
5、Topics(主题通配符) 可以理解为Routing的通配符模式,如下图:
“#”:表示匹配一个或多个词;(lazy.a.b.c)
“*”:表示匹配一个词;(a.orange.b)
创建生产者Producer:
package com.cn.topic;/**
* @Description: Created by xpl on 2018-04-26 21:39.
*/
import com.cn.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by xpl on 2018-04-26 21:39
**/
public class Producer {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String message = "匹配insert";
channel.basicPublish(EXCHANGE_NAME,"order.update",false,false,null,message.getBytes());
channel.close();
connection.close();
}
}
创建消费者Consumer1、2:
package com.cn.topic;/**
* @Description: Created by xpl on 2018-04-26 21:47.
*/
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* Created by xpl on 2018-04-26 21:47
**/
public class Consumer1 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
private static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//order.#
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"order.*");
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
package com.cn.topic;/**
* @Description: Created by xpl on 2018-04-26 21:47.
*/
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
* Created by xpl on 2018-04-26 21:47
**/
public class Consumer2 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
private static final String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"order.insert");
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("接收消息:" + new String(body, "UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
6、RPC(远程调用)
如果我们需要在远程计算机上运行一个函数并等待结果,这种模式通常称为远程过程调用或RPC;
创建RPC服务:
package com.cn.rpc;/**
* @Description: Created by xpl on 2018-04-26 22:06.
*/
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by xpl on 2018-04-26 22:06
**/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
AMQP.BasicProperties properties1 = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
String mes = new String(body, "UTF-8");
int num = Integer.valueOf(mes);
System.out.println("接收数据:" + num);
num = fib(num);
channel.basicPublish("", properties.getReplyTo(), properties1, String.valueOf(num).getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
while (true) {
synchronized (consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/*
斐波那契数列
*/
private static int fib(int n) {
System.out.println(n);
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
}
创建RPC客户端:
package com.cn.rpc;/**
* @Description: Created by xpl on 2018-04-26 22:06.
*/
import com.cn.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by xpl on 2018-04-26 22:06
**/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
AMQP.BasicProperties properties1 = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
String mes = new String(body, "UTF-8");
int num = Integer.valueOf(mes);
System.out.println("接收数据:" + num);
num = fib(num);
channel.basicPublish("", properties.getReplyTo(), properties1, String.valueOf(num).getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
while (true) {
synchronized (consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/*
斐波那契数列
*/
private static int fib(int n) {
System.out.println(n);
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
}
创建RPC测试类:
package com.cn.rpc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @program: rabbitmq-learn
* @description: RPC测试
* @author: 535504
* @create: 2018-04-27 10:11
**/
public class RPCTest {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
RPCClient rpcClient = new RPCClient();
System.out.println(rpcClient.call("2"));
}
}
命名规范:
交换机名的命名建议:Ex.{自定义 ExchangeName},队列名的命名建议:MQ.{自定义 QueueName} 。
来源: InfoQ
https://mp.weixin.qq.com/s/GIkzoIBGQJtXB9RXlwlzPQ
来源: lfalex
https://www.cnblogs.com/lfalex0831/p/8963247.html
评论区