locked
JMS with Azure Service Bus and AMQP 1.0 RRS feed

  • Question

  • Hello 

    I am facing an issue, while i try to communicate with Queue which is been created in Azure Portal - with Java Program.

                    

    // Copyright (c) Microsoft. All rights reserved.
    // Licensed under the MIT license. See LICENSE file in the project root for full license information.

    package com.microsoft.azure.servicebus.samples.jmsqueuequickstart;

    import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
    import org.apache.commons.cli.*;
    import org.apache.log4j.*;

    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import java.util.Hashtable;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Function;

    /**
     * This sample demonstrates how to send messages from a JMS Queue producer into
     * an Azure Service Bus Queue, and receive them with a JMS message consumer.
     * JMS Queue. 
     */
    public class JmsQueueQuickstart {

        // Number of messages to send
        private static int totalSend = 10;
        //Tracking counter for how many messages have been received; used as termination condition
        private static AtomicInteger totalReceived = new AtomicInteger(0);
        // log4j logger 
        private static Logger logger = Logger.getRootLogger();

        public void run(String connectionString) throws Exception {

            // The connection string builder is the only part of the azure-servicebus SDK library
            // we use in this JMS sample and for the purpose of robustly parsing the Service Bus 
            // connection string. 
            ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);

            // set up JNDI context
            Hashtable<String, String> hashtable = new Hashtable<>();
            hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
            hashtable.put("queue.QUEUE", "27JulyJMSQueue");
            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

            // Look up queue
            Destination queue = (Destination) context.lookup("QUEUE");

            // we create a scope here so we can use the same set of local variables cleanly 
            // again to show the receive side separately with minimal clutter
            {
                // Create Connection
                Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
                // Create Session, no transaction, client ack
                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

                // Create producer
                MessageProducer producer = session.createProducer(queue);

                // Send messages
                for (int i = 0; i < totalSend; i++) {
                    BytesMessage message = session.createBytesMessage();
                    message.writeBytes(String.valueOf(i).getBytes());
                    producer.send(message);
                    System.out.printf("Sent message %d.\n", i + 1);
                }

                producer.close();
                session.close();
                connection.stop();
                connection.close();
            }

            {
                // Create Connection
                Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
                connection.start();
                // Create Session, no transaction, client ack
                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                // Create consumer
                MessageConsumer consumer = session.createConsumer(queue);
                // create a listener callback to receive the messages
                consumer.setMessageListener(message -> {
                    try {
                        // receives message is passed to callback
                        System.out.printf("Received message %d with sq#: %s\n",
                                totalReceived.incrementAndGet(), // increments the tracking counter
                                message.getJMSMessageID());
                        message.acknowledge();
                    } catch (Exception e) {
                        logger.error(e);
                    }
                });

                // wait on the main thread until all sent messages have been received
                while (totalReceived.get() < totalSend) {
                    Thread.sleep(1000);
                }
                consumer.close();
                session.close();
                connection.stop();
                connection.close();
            }

            System.out.printf("Received all messages, exiting the sample.\n");
            System.out.printf("Closing queue client.\n");
        }

        public static void main(String[] args) {

            System.exit(runApp(args, (connectionString) -> {
                JmsQueueQuickstart app = new JmsQueueQuickstart();
                try {
                    app.run(connectionString);
                    return 0;
                } catch (Exception e) {
                    System.out.printf("%s", e.toString());
                    return 1;
                }
            }));
        }

        static final String SB_SAMPLES_CONNECTIONSTRING = "SB_SAMPLES_CONNECTIONSTRING";

        public static int runApp(String[] args, Function<String, Integer> run) {
            try {

                String connectionString = null;

                // parse connection string from command line
                Options options = new Options();
                options.addOption(new Option("c", true, "Connection string"));
                CommandLineParser clp = new DefaultParser();
                CommandLine cl = clp.parse(options, args);
                if (cl.getOptionValue("c") != null) {
                    connectionString = cl.getOptionValue("c");
                }

                // get overrides from the environment
                String env = System.getenv(SB_SAMPLES_CONNECTIONSTRING);
                if (env != null) {
                    connectionString = env;
                }

                if (connectionString == null) {
                    HelpFormatter formatter = new HelpFormatter();
                    formatter.printHelp("run jar with", "", options, "", true);
                    return 2;
                }
                return run.apply(connectionString);
            } catch (Exception e) {
                System.out.printf("%s", e.toString());
                return 3;
            }
        }
    }

    I am trying to do this :: and not able to achieve and generate jar file also by running the below command

    java -jar ./target/jmsqueuequickstart-1.0.0-jar-with-dependencies.jar -c "<CONNECTION_STRING>"

    i get below error : 

    ERROR: JDWP Unable to get JNI 1.2 environment, jvm->GetEnv() return code = -2
    Tuesday, July 28, 2020 7:19 AM