跳到主要内容

基于新事件二开

在shoptnt中如果现有内置的的业务消息不满足您的业务场景,则需要创建新的交换机值。

AMQP消息

目前我们所有amqp消息都定义在AmqpExchange中,以静态常量字符串的的方式定义,此处需要增加新的业务消息如商品变化消息GOODS_CHANGE。

/**
* AMQP消息定义
*
* @author kingapex
* @version 1.0
* @since 6.4
* 2017-08-17 18:00
*/
public class AmqpExchange {

/**
* TEST
*/
public final static String TEST_EXCHANGE = "TEST_EXCHANGE_FANOUT";

/**
* PC首页变化消息
*/
public final static String PC_INDEX_CHANGE = "PC_INDEX_CHANGE";

/**
* 移动端首页变化消息
*/
public final static String MOBILE_INDEX_CHANGE = "MOBILE_INDEX_CHANGE";

/**
* 商品变化消息
*/
public final static String GOODS_CHANGE = "GOODS_CHANGE";

/**
* 商品sku变化消息
*/
public final static String GOODS_SKU_CHANGE = "GOODS_SKU_CHANGE";

...
}

创建event

1、一个interface

2、一般以<业务名称+Event>命名

3、定义执行的方法,方法的入参要注意和下面发送消息时携带的参数类型保持一致,此处是GoodsChangeMsg对象。

/**
* 商品变化事件
* @author fk
* @version v2.0
* @since v7.0.0
* 2018年3月23日 上午10:24:40
*/
public interface GoodsChangeEvent {

/**
* 商品变化后需要执行的方法
* @param goodsChangeMsg
*/
void goodsChange(GoodsChangeMsg goodsChangeMsg);
}

创建receiver

1、receiver一般以<业务名称+Receiver>命名,并加@Component注解。

2、使用spring的list注入的方式,会将所有实现GoodsChangeEvent的bean注入这个list中

@Autowired(required = false)
private List<GoodsChangeEvent> events;

3、定义监听方法,方法的入参要注意和下面发送消息时携带的参数类型保持一致,此处是GoodsChangeMsg对象。

4、方法上加上@RabbitListener消息监听注解,@Queue的value一般以<amqp消息+_QUEUE>命名,@Exchange必须是最开始定义的amqp消息,其他不变即可

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = AmqpExchange.GOODS_CHANGE + "_QUEUE"),
exchange = @Exchange(value = AmqpExchange.GOODS_CHANGE, type = ExchangeTypes.FANOUT)))
public void goodsChange(GoodsChangeMsg goodsChangeMsg) {

}

5、监听方法体,循环调用通过spring注入的list事件中的方法,这里也解释了基于内部事件二开的原理。

if (events != null) {
for (GoodsChangeEvent event : events) {
try {
event.goodsChange(goodsChangeMsg);
} catch (Exception e) {
logger.error("处理商品变化消息出错", e);
}
}
}

6、总体代码如下

/**
* 商品变化消费者
*
* @author fk
* @version v2.0
* @since v7.0.0 2018年3月23日 上午10:29:54
*/
@Component
public class GoodsChangeReceiver {

private final Logger logger = LoggerFactory.getLogger(getClass());

@Autowired(required = false)
private List<GoodsChangeEvent> events;

/**
* 商品变化
*
* @param goodsChangeMsg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = AmqpExchange.GOODS_CHANGE + "_QUEUE"),
exchange = @Exchange(value = AmqpExchange.GOODS_CHANGE, type = ExchangeTypes.FANOUT)
))
public void goodsChange(GoodsChangeMsg goodsChangeMsg) {

if (events != null) {
for (GoodsChangeEvent event : events) {
try {
event.goodsChange(goodsChangeMsg);
} catch (Exception e) {
logger.error("处理商品变化消息出错", e);
}
}
}

}
}

发送消息

1、业务类中注入MessageSender

@Autowired
private MessageSender messageSender;

2、发送消息

this.messageSender.send(new MqMessage(AmqpExchange.GOODS_CHANGE, AmqpExchange.GOODS_CHANGE + "_ROUTING", goodsChangeMsg));

3、MqMessage参数说明,第一个是amqp消息,第二个是routingKey一般以<amqp消息+_ROUTING>命名,第三个是发送消息携带的参数,必须保证和监听消息入参一致。