maven 工程添加庫(kù)
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加庫(kù)
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客戶端版本要和服務(wù)端版本的一致,或者會(huì)發(fā)生一些奇怪的問題:
我遇到過(guò)版本不一致會(huì)發(fā)生,消息無(wú)法確認(rèn)消息消費(fèi),也就是說(shuō) 客戶端已經(jīng)消費(fèi)了,也提交成功了,但是服務(wù)端沒有同步到!
注意:
tag的使用!
要到控制臺(tái)創(chuàng)建 Topic 隊(duì)列名稱
官方過(guò)濾消息例子:
http://rocketmq.apache.org/docs/filter-by-sql92-example/
發(fā)送消息
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
消費(fèi)消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 消息處理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
更多建議: