This commit is contained in:
Looly 2025-01-28 20:35:15 +08:00
parent 73659a0894
commit dc3d6079cc

View File

@ -19,6 +19,7 @@ package org.dromara.hutool.extra.mq.engine.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.dromara.hutool.core.lang.Assert; import org.dromara.hutool.core.lang.Assert;
import org.dromara.hutool.extra.mq.Consumer; import org.dromara.hutool.extra.mq.Consumer;
import org.dromara.hutool.extra.mq.MQConfig; import org.dromara.hutool.extra.mq.MQConfig;
@ -35,6 +36,8 @@ import org.dromara.hutool.extra.mq.engine.MQEngine;
public class RocketMQEngine implements MQEngine { public class RocketMQEngine implements MQEngine {
private MQConfig config; private MQConfig config;
private String producerGroup;
private String consumerGroup;
/** /**
* 默认构造 * 默认构造
@ -42,6 +45,30 @@ public class RocketMQEngine implements MQEngine {
public RocketMQEngine() { public RocketMQEngine() {
// SPI方式加载时检查库是否引入 // SPI方式加载时检查库是否引入
Assert.notNull( org.apache.rocketmq.common.message.Message.class); Assert.notNull( org.apache.rocketmq.common.message.Message.class);
this.producerGroup = MixAll.DEFAULT_PRODUCER_GROUP;
this.consumerGroup = MixAll.DEFAULT_CONSUMER_GROUP;
}
/**
* 设置生产者组
*
* @param producerGroup 生产者组
* @return this
*/
public RocketMQEngine setProducerGroup(final String producerGroup) {
this.producerGroup = producerGroup;
return this;
}
/**
* 设置消费者组
*
* @param consumerGroup 消费者组
* @return this
*/
public RocketMQEngine setConsumerGroup(final String consumerGroup) {
this.consumerGroup = consumerGroup;
return this;
} }
@Override @Override
@ -52,7 +79,7 @@ public class RocketMQEngine implements MQEngine {
@Override @Override
public Producer getProducer() { public Producer getProducer() {
final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
defaultMQProducer.setNamesrvAddr(config.getBrokerUrl()); defaultMQProducer.setNamesrvAddr(config.getBrokerUrl());
try { try {
defaultMQProducer.start(); defaultMQProducer.start();
@ -64,7 +91,7 @@ public class RocketMQEngine implements MQEngine {
@Override @Override
public Consumer getConsumer() { public Consumer getConsumer() {
final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(); final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
defaultMQPushConsumer.setNamesrvAddr(config.getBrokerUrl()); defaultMQPushConsumer.setNamesrvAddr(config.getBrokerUrl());
return new RocketMQConsumer(defaultMQPushConsumer); return new RocketMQConsumer(defaultMQPushConsumer);
} }