OpenMessaging Example

OpenMessaging establishes industry guidelines and messaging and streaming specifications that provide a common framework for finance, e-commerce, IoT, and big-data applications. Its design principles are cloud orientation, simplicity, flexibility, and language independence in distributed heterogeneous environments. Conformance to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.

RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ through OpenMessaging.
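
All three examples connect through the same access point string, "openmessaging:rocketmq://IP1:9876,IP2:9876/namespace". As a rough illustration of how that string appears to be put together, the JDK-only sketch below splits it into a driver name, an address list, and a namespace. This reading is an assumption based on the shape of the string, and the class AccessPointUrlSketch and its split method are hypothetical helpers, not part of OpenMessaging or RocketMQ.

```java
public class AccessPointUrlSketch {
    /**
     * Splits an access point string into {driver, addresses, namespace}.
     * Hypothetical helper for illustration; the real parsing is done inside
     * the OpenMessaging implementation.
     */
    static String[] split(String url) {
        String prefix = "openmessaging:";
        if (!url.startsWith(prefix)) {
            throw new IllegalArgumentException("missing 'openmessaging:' prefix: " + url);
        }
        String rest = url.substring(prefix.length());
        int sep = rest.indexOf("://");
        int slash = sep < 0 ? -1 : rest.indexOf('/', sep + 3);
        if (sep < 0 || slash < 0) {
            throw new IllegalArgumentException("expected <driver>://<addresses>/<namespace>: " + url);
        }
        String driver = rest.substring(0, sep);            // e.g. "rocketmq"
        String addresses = rest.substring(sep + 3, slash); // comma-separated host:port list
        String namespace = rest.substring(slash + 1);      // everything after the first '/'
        return new String[] {driver, addresses, namespace};
    }

    public static void main(String[] args) {
        String[] parts = split("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        System.out.printf("driver=%s addresses=%s namespace=%s%n", parts[0], parts[1], parts[2]);
    }
}
```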

OMSProducer

The following example shows how to send messages to a RocketMQ broker using synchronous, asynchronous, and one-way transmission.

// OpenMessaging 0.1.0-alpha types; the package names are assumed from that release.
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.SendResult;
import java.nio.charset.Charset;

public class OMSProducer {
    public static void main(String[] args) {
        // Replace IP1 and IP2 with the addresses of your own cluster.
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        // Synchronous send: blocks until the send result is available.
        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        // Asynchronous send: returns immediately; the listener is notified of the outcome.
        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        // One-way send: fire and forget, no result is returned.
        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        // Note: shutting down right away may race with the asynchronous callback above.
        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
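
To make the difference between the three transmission modes concrete, here is a minimal JDK-only sketch. It does not use the OpenMessaging API: FakeBroker, sendSync, sendAsync, and sendOneway are hypothetical stand-ins, and CompletableFuture takes the place of the Promise used above. It also waits on a latch before exiting so that the asynchronous callback has a chance to run.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SendModesSketch {
    /** Hypothetical in-memory stand-in for a broker; not part of OpenMessaging or RocketMQ. */
    static final class FakeBroker {
        private final AtomicInteger nextId = new AtomicInteger();

        /** Stores nothing; just hands back a generated message id. */
        String store(byte[] body) {
            return "msg-" + nextId.incrementAndGet();
        }
    }

    // Synchronous: the caller blocks until the broker has answered.
    static String sendSync(FakeBroker broker, byte[] body) {
        return broker.store(body);
    }

    // Asynchronous: the caller gets a future and is notified when it completes.
    static CompletableFuture<String> sendAsync(FakeBroker broker, byte[] body) {
        return CompletableFuture.supplyAsync(() -> broker.store(body));
    }

    // One-way: fire and forget; the caller never sees a result or an error.
    static void sendOneway(FakeBroker broker, byte[] body) {
        CompletableFuture.runAsync(() -> broker.store(body));
    }

    public static void main(String[] args) throws InterruptedException {
        FakeBroker broker = new FakeBroker();
        byte[] body = "OMS_HELLO_BODY".getBytes(StandardCharsets.UTF_8);

        System.out.printf("Send sync message OK, msgId: %s%n", sendSync(broker, body));

        CountDownLatch done = new CountDownLatch(1);
        sendAsync(broker, body).whenComplete((msgId, error) -> {
            if (error == null) {
                System.out.printf("Send async message OK, msgId: %s%n", msgId);
            } else {
                System.out.printf("Send async message failed, error: %s%n", error.getMessage());
            }
            done.countDown();
        });

        sendOneway(broker, body);
        System.out.printf("Send oneway message OK%n");

        // Give the asynchronous callback up to five seconds before the program exits.
        done.await(5, TimeUnit.SECONDS);
    }
}
```

The same idea, waiting for the callback before calling shutdown, could be applied to the OMS producer if the asynchronous result must be observed before the process ends.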

OMSPullConsumer

Use the OMS PullConsumer to poll messages from a specified queue.

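// OpenMessaging 0.1.0-alpha types; the package names are assumed from that release.
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PullConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        // The consumer group is passed as a RocketMQ-specific (non-standard) attribute.
        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        // Poll a single message; the result is null when no message is available.
        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}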
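
The example above polls once. A consumer that keeps reading typically wraps the poll in a loop and treats a null result as "nothing available right now". The sketch below shows that pattern with JDK types only: a java.util.Queue stands in for the pull consumer, and adding an id to the acked list stands in for consumer.ack(msgId). The class PollLoopSketch and the method drain are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class PollLoopSketch {
    /**
     * Polls until the queue reports no message (null) or maxMessages have been handled.
     * Returns the ids that were acknowledged, in the order they were received.
     */
    static List<String> drain(Queue<String> queue, int maxMessages) {
        List<String> acked = new ArrayList<>();
        while (acked.size() < maxMessages) {
            String msgId = queue.poll(); // null when the queue is empty
            if (msgId == null) {
                break;
            }
            acked.add(msgId); // stands in for consumer.ack(msgId)
        }
        return acked;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("msg-1");
        queue.add("msg-2");
        System.out.println(drain(queue, 10)); // prints [msg-1, msg-2]
    }
}
```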

OMSPushConsumer

Attach an OMS PushConsumer to a specified queue and consume messages with a MessageListener.

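// OpenMessaging 0.1.0-alpha types; the package names are assumed from that release.
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.rocketmq.domain.NonStandardKeys;

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        // Release the consumer and the access point when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        // The listener is invoked for each delivered message; ack() confirms consumption.
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });

        // Start the consumer after the listener is attached, mirroring the other examples.
        consumer.startup();
        System.out.printf("Consumer startup OK%n");
    }
}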
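
In the push model the consumer owns the delivery loop and calls back into application code, which acknowledges each message explicitly. The sketch below mimics that control flow with JDK types only. The Listener interface and the deliver method are hypothetical stand-ins for MessageListener and the consumer's internal dispatch, and a Runnable plays the role of context.ack().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PushListenerSketch {
    /** Hypothetical callback, loosely modeled on a message listener with an ack handle. */
    interface Listener {
        void onMessage(String msgId, Runnable ack);
    }

    /**
     * Delivers each message id to the listener and records which ones it acknowledged.
     * A message is considered consumed only if the listener runs the ack handle.
     */
    static List<String> deliver(List<String> msgIds, Listener listener) {
        List<String> acked = new ArrayList<>();
        for (String id : msgIds) {
            listener.onMessage(id, () -> acked.add(id));
        }
        return acked;
    }

    public static void main(String[] args) {
        List<String> acked = deliver(Arrays.asList("msg-1", "msg-2"), (msgId, ack) -> {
            System.out.printf("Received one message: %s%n", msgId);
            ack.run();
        });
        System.out.println("Acknowledged: " + acked);
    }
}
```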