首页 > ActiveMQ JMS 用BlobMessage发送大文件 用CMS接收BlobMessage接收不到

ActiveMQ JMS 用BlobMessage发送大文件 用CMS接收BlobMessage接收不到

我的需求是这样的:
需要用ActiveMQ 来在客户端与客户端之间来传递大文件,我写了两个客户端,一个客户端用JMS创建了一个BlobMessage来发送文件,另一个客户端用ActiveMQ提供的C++接口---CMS,来接收文件数据,发送端(producer)已经显示成功发送,在ActiveMQ的broker上能看到相应的Queue名。我用接收端(consumer)也能接收到发送过来的BlobMessage的properties。但是却无法得到文件内容。getContent函数返回为空,不知道是什么原因。但是getMarshalledProperties()函数都能得到发送过来的properties。

JMS代码:

Sender sf = new Sender ("10.1.72.35", 41112);
        String str = sf.connect();
        System.out.print(str);
        MessageHeader header = new MessageHeader("BABJ", 3600, "QueneName", 200, "1030", 300, "dataID", "20150825", "SendText.txt", "D:/data");
        File f1 =new File("D:\\data\\SendText.txt");
        str = sf.sendMessage(header, "FILE", f1);
        System.out.print(str);
        str = sf.disconnect();
        System.out.print(str);

CMS端onMessage函数的代码:

void CMQReceiver::RecvMessage()
{
    MessageHeader header;
    long nFileSize = 0;

    // 消费者没有创建成功
    if (consumer == NULL)
    {
        return;
    }
    try
    {
        Message *message = consumer->receiveNoWait();//consumer->receive();
        if (message == NULL)
        {
            return;
        }

        //commands::ActiveMQBlobMessage *testMsg = new commands::ActiveMQBlobMessage();
        //bool bFlg = util::ActiveMQMessageTransformation::transformMessage(message, amqConnection, (commands::Message **)&testMsg);
        //if (bFlg)
        //{
        //    string strName = testMsg->getName();
        //}

        //ActiveMQConsumer *mqConsumer = dynamic_cast<ActiveMQConsumer*>(consumer);
        //Message *message = mqConsumer->receiveNoWait();




        /*************************************** BLOB MESSAGE **************************************************/
        commands::ActiveMQBlobMessage* blobMessage = dynamic_cast<commands::ActiveMQBlobMessage*>(message);

        if (blobMessage)
        {
            commands::Message* cmdMsg = (commands::Message*)blobMessage;
            if (cmdMsg)
            {
                std::vector<unsigned char> properties = cmdMsg->getMarshalledProperties();
                LIST_PROPERTIES  lstProperties;
                int nErr = ParseProperties(properties, lstProperties);
                if (nErr == 0)
                {
                    // 解析属性消息
                    getMessageHeader(lstProperties, header, nFileSize);

                    // 获取数据
                    if (nFileSize > 0)
                    {
                        // 获取数据
                        //cms::Message* cmsMsg = (cms::Message*)blobMessage->clone();
                        //BytesMessage *byteMsg = (BytesMessage*)cmsMsg;
                        //unsigned char * body = byteMsg->getBodyBytes();
                        //if (body)
                        //{
                        //    int a = 0;
                        //}
                        std::vector<unsigned char> fileContent;
                        fileContent = blobMessage->getContent();
                    }
                }
            }
        }

        /****************************************OBJCET MESSAGE ******************************************************/
        commands::ActiveMQObjectMessage* objectMessage = dynamic_cast<commands::ActiveMQObjectMessage*>(message);
        if (objectMessage)
        {
            commands::Message* cmdMsg = (commands::Message*)objectMessage;
            if (cmdMsg)
            {
                // 属性
                std::vector<unsigned char> properties = cmdMsg->getMarshalledProperties();
                LIST_PROPERTIES  lstProperties;
                int nErr = ParseProperties(properties, lstProperties);
                if (nErr == 0)
                {
                    // 解析属性消息
                    getMessageHeader(lstProperties, header, nFileSize);

                    // 获取Bean数据
                }

                // Bean内容(得到的内容是序列化的数据)
                std::vector<unsigned char> content = cmdMsg->getContent();

                // 执行反序列化解析

            }
        }
    }
    catch (CMSException& e)
    {
        e.printStackTrace();
        string msg = e.getMessage();
    }

}

references:
http://activemq.apache.org/can-i-send-really-large-files-over-activemq.html
http://activemq.apache.org/blob-messages.html
http://markmail.org/message/7cwjpekvbgdoigdg
http://activemq.apache.org/cms/example.html
http://activemq.apache.org/cms/cms-api-overview.html

【热门文章】
【热门文章】