博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rabbitmq实现负载均衡与消息持久化
阅读量:6924 次
发布时间:2019-06-27

本文共 5361 字,大约阅读时间需要 17 分钟。

 

 

Rabbitmq 是对AMQP协议的一种实现。使用范围也比较广泛,主要用于消息异步通讯。

一,默认情况下Rabbitmq使用轮询(round-robin)方式转发消息。为了较好实现负载,可以在消息接收方指定,每次接收到一条,这样可以缓解单一服务器压力。

代码如下:

ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.basicQos(1);//设置每次接收一条 为了保证消息不丢失,取消自动ACK,改为只有在完全处理消息后再ACK。 如:
Consumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8"); try {
Thread.sleep(10000); } catch(Exception ex) {
} System.out.println("received Message:" + message); channel.basicAck(envelope.getDeliveryTag(), false);//处理完成后ACK } }; channel.basicConsume(QUEUE_NAME, false, consumer);//取消自动ACK 二,为了保证在Rabbitmq在宕机后,仍不丢失消息,需要将队列和发布的消息都声明为可持久化的。 如:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, bytes); 。 三,Rabbitmq 的MessageModel(消息模型) 在Rabbitmq的消息模型中,我们决不应该将消息直接发送到queue.事实上,消息发送者并不关心消息是否被路由或被入队列或被接收并处理。 生产者应只与Exchange(交换器)打交道, Exchange的作用:从生产者接受消息,向消费者发送消息。 The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.【所以交换器必须准确地知道怎样处理消息,是否应该加到一个指定的队列,还是发送到多个队列,还是应该抛弃该消息。】 指定Exchange的Rule,即以何种方式转发消息。 Rabbitmq共有四种:direct,topic ,headers和 fanout,NamelessExchange.
fanout,这种方式很简单,就是一个广播,把消息转给所有的订阅者;有几个订阅者,消息就会被复制几份。
NamelessExchange,无Exchange,消息以轮询(round-robin)方式,发送给消费者,通过routingKey识别对应的消费者。
【提示】:
rabbitmqctl list_exchanges ,用于查看当前Rabbitmq正在运行的交换器;rabbitmqctl list_bindings,查看当前绑定数 eg: 生产者只负责发送消息,而不关心这些消息是否被处理,也不关心消息是否被抛弃;消息由Exchange根据具体rule处理。
private static final String EXCHANGE_NAME = "logs";private static final String EXCHANGE_TYPE="fanout";    public static void main(String[] argv)                  throws java.io.IOException {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);        String message = getMessage(argv);        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());        System.out.println(" [x] Sent '" + message + "'");        channel.close();        connection.close();    }

 

 如:以fanout方式处理消息:消息会发送给所有的订阅者,(与routingKey无关)
private static final String exchangeName="logs";    private static final String exchangeType = "fanout";    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {        Connection connection = null;        try {            ConnectionFactory factory = new ConnectionFactory();            factory.setHost("localhost");            connection = factory.newConnection();            final Channel channel = connection.createChannel();            int prefetchCount = 1;            channel.basicQos(prefetchCount);            channel.exchangeDeclare(exchangeName,exchangeType);            //创建一个队列,用于接收消息            String queueName= channel.queueDeclare().getQueue();            channel.queueBind(queueName,exchangeName,"");            System.out.println("Waiting for messages...,over it ,Press CTRL+C ");            Consumer consumer = new DefaultConsumer(channel) {                public void handleDelivery(String consumerTag,                                           Envelope envelope,                                           AMQP.BasicProperties properties,                                           byte[] body)                        throws IOException {                    String message = new String(body, "UTF-8");                    System.out.println("received Message:" + message);                    try {                        doWork(message);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    finally {                        channel.basicAck(envelope.getDeliveryTag(), false);                    }                }            };            channel.basicConsume(queueName, false, consumer);        } finally {        }    }

 

Bindings,将队列绑定到Exchange,说明可以从Exchange接收消息。【A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.】
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey"); direct,只有当routingKey,与bindingKey相同Exchange才能会推送消息。 如: 生产者:
channel.basicPublish(exchangeName, "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, bytes);//片断
生产者在发送消息时指定routingKey. 消费者:可以绑定多个Key,以接收来自多个routingKey的消息
for (String bindingKey : typeArr) {
channel.queueBind(queueName, exchangeName, bindingKey); } //接收者绑定Key. topic ,生产者在发送消息时,指定准确的routingKey(多个单词以.号分隔),当bindingKey模式匹配到routingKey时,则接收消息。 注意:routingKey和bindingKey的长度不能超过255. *(star),只匹配一个单词。#(hash),能匹配0个或多个单词 三RPC 分布式远程调用。
 
 
 

转载于:https://www.cnblogs.com/itdev/p/5664477.html

你可能感兴趣的文章
Solaris 10 x86系统上添加新硬盘
查看>>
我的友情链接
查看>>
Web服务器测试和监控工具及组件介绍
查看>>
搭理AD域控服务器
查看>>
FTP HA windows cluster
查看>>
BGP AS-PATH 正则表达式的理解
查看>>
【帧中继】EIGRP如何运行在帧中继网络
查看>>
赛门铁克NBU备份oracle慢的问题
查看>>
Linux下Socket 函数集(二)
查看>>
漫谈程序员系列:受刺激啦,开篇啦
查看>>
特效编辑器开发手记1——令人蛋疼菊紧的Cocos2d-x动态改变粒子数
查看>>
Java源码分析系列之ArrayList读后感
查看>>
安卓中的消息循环机制Handler及Looper详解
查看>>
练习命令
查看>>
转 fiddler常见的应用场景
查看>>
android开发学习 ------- 仿QQ侧滑效果的实现
查看>>
139.00.007 Git学习-Cheat Sheet
查看>>
js的基本数据类型有哪些?
查看>>
html 5 新增标签及简介
查看>>
c#多线程中Lock()关键字的用法小结
查看>>