首页 > 关于springboot和rabbitmq结合的问题

关于springboot和rabbitmq结合的问题

下面是rabbitmq的相关配置

package com.kindlepocket.cms.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kindlepocket.cms.RabbitMQConfig;
import com.kindlepocket.cms.pojo.Subscriber;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestParam;

import java.io.IOException;

/**
 * Created by admin on 2016/7/30.
 */
@Component
public class MailMessageConsumeService {

    private static Logger logger = Logger.getLogger(MailMessageConsumeService.class);

    @Autowired
    private RabbitMQConfig rabbitMQConfig;

    @Autowired
    private SubscriberRepository ssbRepository;

    @Autowired
    private MailService mailService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static final String EXCHANGE   = "kindlePocket-mail-exchange";
    public static final String ROUTINGKEY = "kindlePocket-mail-routingKey";
    public static final String QUEUE = "kindlePocket-mail-queue";

    @Bean
    public DirectExchange defaultExchange(){
        return new DirectExchange(EXCHANGE);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTINGKEY);
    }

    @Bean
    public Queue queue() {
        //queue persistent
        return new Queue(QUEUE,true);
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        final Object[] msgObj = new Object[1];
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(rabbitMQConfig.connectionFactory());
        listenerContainer.setQueues(queue());
        listenerContainer.setExposeListenerChannel(true);
        listenerContainer.setMaxConcurrentConsumers(1);
        // set manual acknowledgeMode
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setMessageListener(new ChannelAwareMessageListener() {

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {

                byte[] body = message.getBody();
                String msg = new String(body);
                msgObj[0] = msg;
                if(logger.isInfoEnabled()){
                    logger.info("get message from queue: " + msg);
                }

                try {
                    JsonNode jsonNode = MAPPER.readTree(msgObj[0].toString());
                    String bookId = jsonNode.get("bookId").toString();
                    String subscriberOpenId = jsonNode.get("subscriberOpenId").toString();
                    sendMail(bookId, subscriberOpenId);
                } catch (IOException e) {
                    if(logger.isErrorEnabled()){
                        logger.error("parse msg error!",e);
                    }
                }

                // confirm message consumed successfully
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
        });
        return listenerContainer;
    }

    public void sendMail(String bookId, String subscriberOpenId){

        Subscriber s = this.ssbRepository.findOne(subscriberOpenId);
        String fromMail = s.getEmail();
        String toMail = s.getKindleEmail();
        String fromMailPwd = s.getEmailPwd();
        if(logger.isInfoEnabled()){
            logger.info("prepared to send email for " + s.getUserName() + " from : [" + fromMail + "] to : [" + toMail + "]");
        }
        this.mailService.sendFileAttachedMail(fromMail,toMail,fromMailPwd,bookId);
        if(logger.isInfoEnabled()){
            logger.info("mail send successfully!");
        }
    }

}

在内部类中的sendMail方法会导致如下异常出现(但是consumer还是能够从queue中获取到message的):

2016-07-30 16:18:06.557  WARN 4800 --- [cTaskExecutor-2] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.4.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Caused by: java.lang.NullPointerException: null
    at com.kindlepocket.cms.service.MailMessageConsumeService.sendMail(MailMessageConsumeService.java:100) ~[classes/:na]
    at com.kindlepocket.cms.service.MailMessageConsumeService$1.onMessage(MailMessageConsumeService.java:83) ~[classes/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.4.RELEASE.jar:na]
    ... 10 common frames omitted

如果去掉sendMail方法就不会报这个异常,请问是与内部类有关系吗?
谢谢大家


异常很明确的说明了原因.
代码 100 行:
String fromMail = s.getEmail();
s 是 null 吧

【热门文章】
【热门文章】