5 Replies Latest reply on Feb 17, 2011 5:49 AM by Nourso

    Java server Push DataMessage through RTMP LC DataService

    Nourso

      Hello all,

       

      I'm doing a POC to push Data from a (Java) server, though LCDS 3.1 's DataService using RTMP.

       

      Configuration is OK. Adobe Air client DataMessage to server (+Assembler saving in DB) : OK 

       

      I found lots of examples with AsyncMessage, but as This is an RTMP destination through a DataService service, I must send a DataMessage.

       

      Appearently, there are some bugs (or I am missing things/good API doc!).

      So please, could you help me?

       

      Here is the code that does the push. The key method is doPush()

      package mypackage.lcds.service.ds.impl;

      import java.util.HashMap;
      import java.util.HashSet;
      import java.util.Map;
      import java.util.Set;

      import org.apache.commons.collections.CollectionUtils;
      import org.apache.log4j.Logger;
      import org.springframework.stereotype.Service;

      import mypackage.lcds.service.ds.DataPushService;
      import mypackage.model.dto.AbstractDto;
      import mypackage.model.exception.DsPushException;
      import flex.data.messages.DataMessage;
      import flex.messaging.MessageBroker;
      import flex.messaging.messages.Message;
      import flex.messaging.services.MessageService;
      import flex.messaging.util.UUIDUtils;

      /**
      * Implementation of {@link DataPushService}.
      */
      // see http://forums.adobe.com/thread/580667
      // MessageCLient :
      // http://livedocs.adobe.com/livecycle/8.2/programLC/programmer/lcds/help.html?content=lcconn ections_2.html
      @Service
      public final class DataPushServiceImpl implements DataPushService {
          private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

          /**
           * Destination name for Data-service.<br>
           * See data-management-config.XML.
           */
          private static final String DESTINATION_NAME__POC_DS_XCHANGE = "poc-ds-xchange";

          /**
           * See data-management-config.XML.
           */
          private static final String PUSH_DTO_SERVICE__NAME = "data-service";

          /**
           * set "manually" by Spring (contexts workaround; not autowired).
           */
          private MessageBroker messageBroker = null;

          /**
           * Does the push of a single DTO.<br>
           * Only subscriberId's that are {@link Long} values will be used. Other Id's do not get a Message sent.
           *
           * @param dto
           *            {@link AbstractDto} object.
           * @param subscriberIds
           *            {@link Set} of LCDS Message subscriber IDs {@link Long}. If null, sends to all connected clients.
           *
           * @throws DsPushException
           *             if any error
           */
          @SuppressWarnings("unchecked")
          private void doPush(final AbstractDto dto, final Set<Long> subscriberIds)
                  throws DsPushException {

              Set<?> ids = new HashSet<Object>();

              // obtain message service by means of message broker
              MessageService messageService = this.getMessageService();

              DataMessage message = this.createMessage(dto, messageService);

              // fill ids
              if ((subscriberIds == null) || (subscriberIds.isEmpty())) {
                  if (LOG.isDebugEnabled()) {
                      LOG.debug("Sending message all currently connected subscriberIds ");
                  }

                  Set idsFromDS = messageService.getSubscriberIds(message, true);
                  if ((idsFromDS != null) && (!idsFromDS.isEmpty())) {
                      CollectionUtils.addAll(ids, idsFromDS.iterator());
                  }
              } else {
                  CollectionUtils.addAll(ids, subscriberIds.iterator());
              }

              if (ids.isEmpty()) {
                  if (LOG.isDebugEnabled()) {
                      LOG.debug("No subscriberId to send the Message to.");
                      LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true).toString());
                  }
              } else {

                  if (LOG.isDebugEnabled()) {
                      LOG.debug("Sending message to subscriberIds : " + subscriberIds.toString());
                      LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true).toString());
                  }

                  // send messages to all subscriberIds 1 by 1
                  Object responsePayload = null;
                  boolean isSent = false;
                  for (Object id : ids) {

                      if (id instanceof Long) {
                          try {
                              message.setHeader(Message.DESTINATION_CLIENT_ID_HEADER, id);
                              if (LOG.isDebugEnabled()) {
                                  LOG.debug("Sending LCDS DataMessage to subscriber [" + id + "] \n" + message.toString(2));
                              }
                              responsePayload = messageService.serviceMessage(message, true);

                              // no exception ==> means OK?
                              // TODO TEST retuned payload
                              isSent = true;

                          } catch (Exception e) {
                              LOG.error("Error while sending message to subscriberId " + id, e);
                              isSent = false;
                          } finally {
                              if (LOG.isDebugEnabled()) {
                                  LOG.debug("Message sent to '" + String.valueOf(id) + "' : " + String.valueOf(isSent));
                              }
                          }
                      } else if (LOG.isDebugEnabled()) {
                          LOG.debug("Avoiding subscriber ID (not a Long value) : " + String.valueOf(id));
                      }
                  }
              }
          }

          /**
           * {@inheritDoc}
           *
           * @see DataPushService#pushToAllClients(AbstractDto)
           */
          // TODO test : if client is not connected, does LCDS record it for later (offline mode on the server?)
          public void pushToAllClients(final AbstractDto dto) throws DsPushException {
              this.doPush(dto, null);
          }

          public void pushTo1Client(AbstractDto dto, Long subscriberId) throws DsPushException {
              Set<Long> subscriberIds = new HashSet<Long>();
              subscriberIds.add(subscriberId);

              this.doPush(dto, subscriberIds);
          }

          /**
           * {@inheritDoc}<br>
           * subscriberIds refer to the 'clientId' set by the client app when it subscribes to the DS destination.
           *
           * @see DataPushService#pushToClients(AbstractDto, Set)
           */
          public void pushToClients(final AbstractDto dto, final Set<Long> subscriberIds) throws DsPushException {
              this.doPush(dto, subscriberIds);
          }

          @SuppressWarnings("unchecked")
          private DataMessage createMessage(final AbstractDto dto, final MessageService messageService) {
              DataMessage msg = new DataMessage();
              msg.setClientId(getServerId());
              msg.setTimestamp(System.currentTimeMillis());
              msg.setMessageId(UUIDUtils.createUUID(true));
              msg.setCorrelationId(msg.getMessageId()); // TODO OK messageId == CorrelationId ?
              msg.setDestination(DESTINATION_NAME__POC_DS_XCHANGE);
              msg.setBody(dto);
              msg.setOperation(DataMessage.CREATE_AND_SEQUENCE_OPERATION); // TODO OK operation?

              Map identity = new HashMap(2);
              // see data-management-config.xml
              identity.put("id", dto.getId());
              msg.setIdentity(identity);

              // FIXME set priority. How?
              if (LOG.isDebugEnabled()) {
                  LOG.debug("LCDS DataMessage created : \n" + msg.toString(2));
              }
              return msg;
          }

          private Object getServerId() {
              // FIXME OK?
              return "X-BACKEND";
          }

          /**
           * Get the current {@link MessageBroker}'s service layer.
           *
           * @return {@link MessageService} to use for push data
           */
          private MessageService getMessageService() {
              if (LOG.isDebugEnabled()) {
                  LOG.debug("Getting MessageBroker's DataService service ");
              }

              // Was : return (MessageService) MessageBroker.getMessageBroker(null).getService(PUSH_DTO_SERVICE__NAME);
              return (MessageService) this.messageBroker.getService(PUSH_DTO_SERVICE__NAME);
          }

          /**
           * Set the messageBroker. For SPring.
           *
           * @param messageBroker
           *            the messageBroker to set
           */
          public void setMessageBroker(final MessageBroker messageBroker) {
              this.messageBroker = messageBroker;
          }
      }

      NOTE : the messagebroker is set once through Spring. It works for this POC.

       

      I have a Servlet that saves a DTO to the DB and then tries to push it through the service. All seems OK, but I get a NullPointerException (NPE).

      Here is the Tomcat 6 LOG (it sends to subscriberID '99' ):

       

      LCDS DataMessage created :
      Flex Message (flex.data.messages.DataMessage)
            operation = create_and_sequence
            id = {id=3203}
            clientId = X-BACKEND
            correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
            destination = poc-ds-xchange
            messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
            timestamp = 1297412881050
            timeToLive = 0
            body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text=InterActionServlet Test]
      09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending message to subscriberIds : [99]
      09:28:01,065 DEBUG [impl.DataPushServiceImpl] Known subscribers : [99]
      09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending LCDS DataMessage to subscriber [99]
      Flex Message (flex.data.messages.DataMessage)
            operation = create_and_sequence
            id = {id=3203}
            clientId = X-BACKEND
            correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
            destination = poc-ds-xchange
            messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA
            timestamp = 1297412881050
            timeToLive = 0
            body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text=InterActionServlet Test]
            hdr(DSDstClientId) = 99
      09:28:02,456 ERROR [impl.DataPushServiceImpl] Error while sending message to subscriberId 99
      java.lang.NullPointerException
          at flex.data.adapters.JavaAdapter.invokeAssemblerSync(JavaAdapter.java:1741)
          at flex.data.adapters.JavaAdapter.invokeBatchOperation(JavaAdapter.java:1630)
          at flex.data.adapters.JavaAdapter.invoke(JavaAdapter.java:658)
          at flex.messaging.services.MessageService.serviceMessage(MessageService.java:318)
          at flex.messaging.services.MessageService.serviceMessage(MessageService.java:233)
          at mypackage.lcds.service.ds.impl.DataPushServiceImpl.doPush(DataPushServiceImpl.java:142)
          at mypackage.lcds.service.ds.impl.DataPushServiceImpl.pushTo1Client(DataPushServiceImpl.java :178)
          at mypackage.servlet.InteractionServlet.push(InteractionServlet.java:75)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
          at java.lang.reflect.Method.invoke(Unknown Source)
          at org.springframework.web.bind.annotation.support.HandlerMethodInvoker.doInvokeMethod(Handl erMethodInvoker.java:421)
          at org.springframework.web.bind.annotation.support.HandlerMethodInvoker.invokeHandlerMethod( HandlerMethodInvoker.java:136)
          at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter.invokeHandl erMethod(AnnotationMethodHandlerAdapter.java:326)
          at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter.handle(Anno tationMethodHandlerAdapter.java:313)
          at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:875)
          at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:807)
          at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:571 )
          at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:501)
          at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
          at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
          at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.j ava:290)
          at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
          at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
          at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:175)
          at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128)
          at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
          at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
          at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:263)
          at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:844)
          at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.ja va:584)
          at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447)
          at java.lang.Thread.run(Unknown Source)
      09:28:02,472 DEBUG [impl.DataPushServiceImpl] Message sent to '99' : false

       

      ==> what am I doing wrong?

      I cannot trace the code (I do not have the source), but the exception thrown is just not helping at all.

      Am I missing a header to set?

       

      Thank you so much for your help,

      Gregg

        • 1. Re: Java server Push DataMessage through RTMP LC DataService
          Rohit . Kumar Adobe Employee

          Hi Nuroso,

           

          What version of LCDS are you using?

           

          As such, you need not create DataMessage in order to do server side push. You can directly use DataServiceTransaction to push the updates. See the following section of the LCDS guide for an example: http://help.adobe.com/en_US/LiveCycleDataServicesES/3.1/Developing/WS4ba8596dc6a25eff5473e 3781271fa38d0b-7fff.html

           

          Rohit

          1 person found this helpful
          • 2. Re: Java server Push DataMessage through RTMP LC DataService
            Nourso Level 1

            Hi, Kumar,

             

            As stated at the beginning of the post, I'm using LCDS 3.1.

             

            My "problem" is then that I want to post to only 1 subscriberId.

            With your answer, all the users are notified, what I really don't need. I need to log each user's actions (such as read, download,...) and not all users have access to all records.

             

            I'll check the API anyways to see if it could help.

             

            Thank you for your time

             

            EDIT : I marked your answer helpful as maybe I'll be able to use selector values... I'll check that on Monday (Western Europ here )

            Many thanks

            • 3. Re: Java server Push DataMessage through RTMP LC DataService
              PamColwell Adobe Employee

              Since you're filtering your query according to a particular user, then your DataService's fill() parameters could contain a unique token for that user.  When the fill is refreshed on the server after the createItem() (as mentioned in Rohit Kumar's link) only that will be refreshed and pushed out to the client with that token.

               

              The DataServiceTransaction api is the way to go, if you're using LC DataService.

               

              Hope this helps

              Pam

              • 4. Re: Java server Push DataMessage through RTMP LC DataService
                Nourso Level 1

                For the records, the NullPointerException problem was solved here :

                http://forums.adobe.com/thread/791741?tstart=0

                 

                The DataServiceTransaction api is the way to go, if you're using LC DataService.

                I would rather say "The DataServiceTransaction api is A MUST-HAVE, if you're using LC DataService." 

                 

                ==> I now can send a Message, I can go on trying the above-mentionned solutions for pushing to 1 subscriber...

                 

                Have a good day

                • 5. Re: Java server Push DataMessage through RTMP LC DataService
                  Nourso Level 1

                  OK - it works now...

                   

                  Thank you for guiding me.

                   

                  for the records, here's the working code (can save days of research )

                  /**
                  * ASSUMPTION : the client Flex/Air apps set the desired userId (= filter) as a fillParameter of the
                  * DataService.fill() method. This will filter output based on {@link AbstractDto#getUserId()}.
                  */

                  @Service
                  public final class DataPushServiceImpl implements DataPushService {
                      private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

                      /* *********** V2 : DataServiceTransaction.createItem() ********* */
                      /**
                       * Does the push of a single DTO.
                       *
                       * @param dto
                       *            {@link AbstractDto} object. Contains the {@link AbstractDto#getUserId()} that is used by clients to
                       *            filter data in the DataService.fill() method (used by the Assembler).
                       *
                       * @throws DsPushException
                       *             if any error
                       */
                      private boolean doPushViaTransaction(final AbstractDto dto) throws DsPushException {

                          if (LOG.isDebugEnabled()) {
                              LOG.debug("Sending message through DataServiceTransaction (see userId field) : " + dto.toString());
                          }

                          // One MUST instantiate a DataServiceTransaction to be able to send anything (NullPointerException)
                          DataServiceTransaction dtx = null;
                          boolean isOwnerOfTx = false;
                          boolean isSent = false;
                          try {
                              // if already in an Assembler, we do have a tx ==> no commit nor rollback!
                              dtx = DataServiceTransaction.getCurrentDataServiceTransaction();
                              if (dtx == null) {
                                  // new one, no JTA ==> ourselves : commit or rollback
                                  isOwnerOfTx = true;
                                  //MessageBroker instantiated with SpringFlex is auto-named
                                  dtx = DataServiceTransaction.begin("_messageBroker", false);
                              }

                              isSent = this.doTransactionSend(dto, dtx);

                          } catch (Exception e) {
                              // Log exception, but no impact on the back-end business ==> swallow Exception
                              LOG.error("Could not send the creation to LCDS", e);
                              if (isOwnerOfTx) {
                                  dtx.rollback();
                              }
                          } finally {
                              try {
                                  if (isOwnerOfTx && (dtx != null)) {
                                      dtx.commit();
                                  }
                              } catch (Exception e) {
                                  // swallow
                                  LOG.error("Could not send the creation to LCDS (@commit of the DataServiceTransaction)", e);
                              }
                          }

                          return isSent;

                      }

                      private boolean doTransactionSend(final AbstractDto dto, final DataServiceTransaction dtx) {

                          boolean isSent = false;

                          if (dto == null) {
                              LOG.error("The given DTO is null! Nothing happens");

                          } else {
                              try {
                                  dtx.createItem(FlexUtils.DESTINATION_NAME__POC_DS, dto);
                                  isSent = true; // no problem
                              } catch (Exception e) {
                                  // Log exception, but no impact on the business
                                  LOG.error("Could not send the creation to LCDS for DTO " + dto.toString(), e);
                              } finally {
                                  if (LOG.isDebugEnabled()) {
                                      LOG.debug("DTO : " + dto.toString() + "\n sent : " + String.valueOf(isSent));
                                  }
                              }
                          }

                          return isSent;
                      }

                      //implementation of DataPushService interface
                      /**
                       * {@inheritDoc}
                       *
                       * @see DataPushService#pushNewDTo(AbstractDto, java.lang.Long)
                       */
                      @Transactional(rollbackFor = DsPushException.class)
                      public boolean pushNewDTo(final AbstractDto dto, final Long subscriberId) throws DsPushException {
                          return this.doPushViaTransaction(dto);
                      }

                  }

                  Good day to you all !