0 Replies Latest reply: Apr 19, 2011 12:54 AM by lingxue3769 RSS

    blezeds+JMS+jboss

    lingxue3769

      I want to display some real-time data on flex client. Originally, we got those data by accessing the database every about 1 seconds. But that method gave large heavy to the database. So I decided to push data to the flex client by BlazeDS, and the data are published to a JMS topic. I use ActiveMQ as JMS server, and the jboss as web server. But I failed to get data at flex client.

      Some code of the configuration files as follows:

       

      message-config.xml

           <adapters>
              <adapter-definition id="actionscript" class="flex.messaging.services.messaging.adapters.ActionScriptAdapter" default="true" />
              <adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter"/>
          </adapters>
           ......
           <destination id="data-push-jms">
               <properties>
                    <server>
                    <allow-subtopics>true</allow-subtopics>
                    <subtopic-separator>.</subtopic-separator>
                 </server>
                  <jms>
                       <destination-type>Topic</destination-type>
                       <message-type>javax.jms.TextMessage</message-type>
                      <connection-factory>java:/activemq/QueueConnectionFactory</connection-factory>
                      <destination-jndi-name>activemq/topic/inbound</destination-jndi-name>
                      <delivery-mode>PERSISTENT</delivery-mode>
                      <message-priority>DEFAULT_PRIORITY</message-priority>
                      <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
                  </jms>
               </properties>
               <channels>
                   <channel ref="my-streaming-amf" />
               </channels>
               <adapter ref="jms"/>
           </destination>
      

       

      service-config.xml

                <channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
                   <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf" class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
                   <properties>
                       <idle-timeout-minutes>0</idle-timeout-minutes>
                       <max-streaming-clients>10</max-streaming-clients>
                       <server-to-client-heartbeat-millis>5000</server-to-client-heartbeat-millis>
                       <user-agent-settings>
                           <user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="5"/>
                           <user-agent match-on="Firefox" kickstart-bytes="2048" max-streaming-connections-per-session="5"/>
                       </user-agent-settings>
                   </properties>
               </channel-definition>
      

       

      Publisher.java

      public class TopicPublisher {  
         public static void main(String[] args) throws JMSException {  
              ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
              Connection connection = factory.createConnection();  
              connection.start();  
               
              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
              Topic topic = session.createTopic("data-push-jms");  
              MessageProducer producer = session.createProducer(topic);  
              producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
        
               while(true) {  
                   ObjectMessage objMessage = session.createObjectMessage();
                   ArrayCollection aCollection = new ArrayCollection();
                   
                   HashMap map = new HashMap();
                   map.put("name", "Sun490");
                   map.put("cup_use_rate", Math.random());
                   map.put("borad_temp", ((int)(Math.random()*10000))/100.00);
                   aCollection.add(map);
                   map = new HashMap();
                   map.put("name", "Org10g");
                   map.put("cup_use_rate", Math.random());
                   map.put("borad_temp", ((int)(Math.random()*10000))/100.00);
                   aCollection.add(map);
                    
                   objMessage.setObject(aCollection);
                   producer.send(objMessage);  
                  try {  
                       Thread.sleep(2000);  
                   } catch (InterruptedException e) {  
                       e.printStackTrace();  
                  }  
               }  
          } 
         
       }
      

       

      main.mxml

          <mx:Script>
              <![CDATA[
                   import mx.messaging.events.MessageFaultEvent;
                   import mx.controls.Alert;
                   import mx.rpc.events.ResultEvent;
                   import mx.messaging.Consumer;
                   import mx.messaging.Channel;
                   import mx.messaging.ChannelSet;
                   import mx.messaging.events.MessageEvent;
                   import mx.collections.ArrayCollection;
                  
                public function submsg():void
                   {
                       var consumer:Consumer = new Consumer();
                       consumer.destination = "data-push-jms";
                       consumer.channelSet = new ChannelSet(["my-streaming-amf"]);
                       consumer.addEventListener(MessageEvent.MESSAGE, messageHandler);
                       consumer.addEventListener(MessageFaultEvent.FAULT,handleFaults);
                       consumer.subscribe();
                   }
                   
                  private function handleFaults(event:MessageFaultEvent):void
                {
                     Alert.show(event.faultString);
                }
                  
                   private function messageHandler(event:MessageEvent):void
                   {
                       var tick:ArrayCollection = ArrayCollection(event.message.body);
                       dg.dataProvider = tick;
                   }
              ]]>
          </mx:Script>
          <mx:DataGrid id="dg" x="25" y="29" width="397" height="170">
               <mx:columns>
                   <mx:DataGridColumn dataField="name" headerText="设备名称"/>
                   <mx:DataGridColumn dataField="cup_use_rate" headerText="CPU使用率"/>
                   <mx:DataGridColumn dataField="borad_temp" headerText="主板温度"/>
               </mx:columns>
          </mx:DataGrid>
      

       

      The activemq-jms-ds.xml file for integration of jboss and ActiveMQ is as follows:

      <?xml version="1.0" encoding="UTF-8"?>
      
      <!DOCTYPE connection-factories
          PUBLIC "-//JBoss//DTD JBOSS JCA Config 1.5//EN"
          "http://www.jboss.org/j2ee/dtd/jboss-ds_1_5.dtd">
          
      <connection-factories>
      
         <tx-connection-factory>
            <jndi-name>activemq/QueueConnectionFactory</jndi-name>
            <xa-transaction/>
            <track-connection-by-tx/>
            <rar-name>activemq-ra.rar</rar-name>
            <connection-definition>javax.jms.QueueConnectionFactory</connection-definition>
            <ServerUrl>vm://localhost</ServerUrl>
           <!--  <ServerUrl>tcp://localhost:61616</ServerUrl>  -->
            <!--
            <UserName>sa</UserName>
            <Password></Password>
            -->
            <min-pool-size>1</min-pool-size>
            <max-pool-size>200</max-pool-size>
             <blocking-timeout-millis>30000</blocking-timeout-millis>
            <idle-timeout-minutes>3</idle-timeout-minutes>
         </tx-connection-factory>
      
         <tx-connection-factory>
            <jndi-name>activemq/TopicConnectionFactory</jndi-name>
            <xa-transaction/>
             <track-connection-by-tx/>
            <rar-name>activemq-ra.rar</rar-name>
            <connection-definition>javax.jms.TopicConnectionFactory</connection-definition>
            <ServerUrl>vm://localhost</ServerUrl>
            <!--  <ServerUrl>tcp://localhost:61616</ServerUrl>  -->
            <!--
            <UserName>sa</UserName>
            <Password></Password>
            -->
            <min-pool-size>1</min-pool-size>
            <max-pool-size>200</max-pool-size>
            <blocking-timeout-millis>30000</blocking-timeout-millis>
            <idle-timeout-minutes>3</idle-timeout-minutes>
         </tx-connection-factory>
      
         <mbean code="org.jboss.resource.deployment.AdminObject" name="activemq.queue:name=outboundQueue">
            <attribute name="JNDIName">activemq/queue/outbound</attribute>
            <depends optional-attribute-name="RARName">jboss.jca:service=RARDeployment,name='activemq-ra.rar'</depends>
            <attribute name="Type">javax.jms.Queue</attribute>
            <attribute name="Properties">PhysicalName=queue.outbound</attribute>
         </mbean>
      
         <mbean code="org.jboss.resource.deployment.AdminObject" name="activemq.topic:name=inboundTopic">
            <attribute name="JNDIName">activemq/topic/inbound</attribute>
            <depends optional-attribute-name="RARName">jboss.jca:service=RARDeployment,name='activemq-ra.rar'</depends>
            <attribute name="Type">javax.jms.Topic</attribute>
            <attribute name="Properties">PhysicalName=topic.inbound</attribute>
         </mbean>
      </connection-factories>
      

       

      Expected for your help!

      Thanks very much!