API_Reference_DefaultMQProducer

DefaultMQProducer


类简介

public class DefaultMQProducer extends ClientConfig implements MQProducer

DefaultMQProducer类是应用用来投递消息的入口,开箱即用,可通过无参构造方法快速创建一个生产者。主要负责消息的发送,支持同步/异步/oneway 的发送方式,这些发送方式均支持批量发送。可以通过该类提供的 getter/setter 方法,调整发送者的参数。DefaultMQProducer提供了多个 send 方法,每个 send 方法略有不同,在使用前务必详细了解其意图。下面给出一个生产者示例代码,点击查看更多示例代码

注意:该类是线程安全的。在配置并启动完成后可在多个线程间安全共享。

字段摘要

构造方法摘要

使用方法摘要

字段详细信息

  • producerGroup
    private String producerGroup
    生产者的分组名称。相同的分组名称表明生产者实例在概念上归属于同一分组。这对事务消息十分重要,如果原始生产者在事务之后崩溃,那么 broker 可以联系同一生产者分组的不同生产者实例来提交或回滚事务。
    默认值:DEFAULT_PRODUCER
    注意: 由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过 255。
  • defaultMQProducerImpl
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl
    生产者的内部默认实现,在构造生产者时内部自动初始化,提供了大部分方法的内部实现。
  • createTopicKey
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
    在发送消息时,自动创建服务器不存在的 topic,需要指定 Key,该 Key 可用于配置发送消息所在 topic 的默认路由。
    默认值:TBW102
    建议:测试或者 demo 使用,生产环境下不建议打开自动创建配置。
  • defaultTopicQueueNums
    private volatile int defaultTopicQueueNums = 4
    创建 topic 时默认的队列数量。
    默认值:4
  • sendMsgTimeout
    private int sendMsgTimeout = 3000
    发送消息时的超时时间。
    默认值:3000,单位:毫秒
    建议:不建议修改该值,该值应该与 broker 配置中的 sendTimeout 一致,发送超时,可临时修改该值,建议解决超时问题,提高 broker 集群的 Tps。
  • compressMsgBodyOverHowmuch
    private int compressMsgBodyOverHowmuch = 1024 * 4
    压缩消息体阈值。大于 4K 的消息体将默认进行压缩。
    默认值:1024 * 4,单位:字节
    建议:可通过 DefaultMQProducerImpl.setZipCompressLevel 方法设置压缩率(默认为 5,可选范围[0,9]);可通过 DefaultMQProducerImpl.tryToCompressMessage 方法测试出 compressLevel 与 compressMsgBodyOverHowmuch 最佳值。
  • retryTimesWhenSendFailed
    private int retryTimesWhenSendFailed = 2
    同步模式下,在返回发送失败之前,内部尝试重新发送消息的最大次数。
    默认值:2,即:默认情况下一条消息最多会被投递 3 次。
    注意:在极端情况下,这可能会导致消息的重复。
  • retryTimesWhenSendAsyncFailed
    private int retryTimesWhenSendAsyncFailed = 2
    异步模式下,在发送失败之前,内部尝试重新发送消息的最大次数。
    默认值:2,即:默认情况下一条消息最多会被投递 3 次。
    注意:在极端情况下,这可能会导致消息的重复。
  • retryAnotherBrokerWhenNotStoreOK
    private boolean retryAnotherBrokerWhenNotStoreOK = false
    同步模式下,消息保存失败时是否重试其他 broker。
    默认值:false
    注意:此配置关闭时,非投递时产生异常情况下,会忽略 retryTimesWhenSendFailed 配置。
  • maxMessageSize
    private int maxMessageSize = 1024 * 1024 * 4
    消息的最大大小。当消息题的字节数超过 maxMessageSize 就发送失败。
    默认值:1024 _ 1024 _ 4,单位:字节
  • traceDispatcher
    private TraceDispatcher traceDispatcher = null
    在开启消息追踪后,该类通过 hook 的方式把消息生产者,消息存储的 broker 和消费者消费消息的信息像链路一样记录下来。在构造生产者时根据构造入参 enableMsgTrace 来决定是否创建该对象。

构造方法详细信息

  1. DefaultMQProducer
    public DefaultMQProducer()
    创建一个新的生产者。
  2. DefaultMQProducerDefaultMQProducer(final String producerGroup)使用指定的分组名创建一个生产者。
    • 入参描述:
  3. DefaultMQProducerDefaultMQProducer(final String producerGroup, boolean enableMsgTrace)使用指定的分组名创建一个生产者,并设置是否开启消息追踪。
    • 入参描述:
  4. DefaultMQProducerDefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)使用指定的分组名创建一个生产者,并设置是否开启消息追踪及追踪 topic 的名称。
    • 入参描述:
  5. DefaultMQProducerDefaultMQProducer(RPCHook rpcHook)使用指定的 hook 创建一个生产者。
    • 入参描述:
  6. DefaultMQProducerDefaultMQProducer(final String producerGroup, RPCHook rpcHook)使用指定的分组名及自定义 hook 创建一个生产者。
    • 入参描述:
  7. DefaultMQProducerDefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)使用指定的分组名及自定义 hook 创建一个生产者,并设置是否开启消息追踪及追踪 topic 的名称。
    • 入参描述:

使用方法详细信息

  1. createTopicpublic void createTopic(String key, String newTopic, int queueNum)在 broker 上创建一个 topic。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - 生产者状态非 Running;未找到 broker 等客户端异常。
  2. createTopicpublic void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)在 broker 上创建一个 topic。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - 生产者状态非 Running;未找到 broker 等客户端异常。
  3. earliestMsgStoreTimepublic long earliestMsgStoreTime(MessageQueue mq)查询最早的消息存储时间。
    • 入参描述:
    • 返回值描述:
      指定队列最早的消息存储时间。单位:毫秒。
    • 异常描述:
      MQClientException - 生产者状态非 Running;没有找到 broker;broker 返回失败;网络异常;线程中断等客户端异常。
  4. fetchPublishMessageQueuespublic List<MessageQueue> fetchPublishMessageQueues(String topic)获取 topic 的消息队列。
    • 入参描述:
    • 返回值描述:
      传入 topic 下的消息队列。
    • 异常描述:
      MQClientException - 生产者状态非 Running;没有找到 broker;broker 返回失败;网络异常;线程中断等客户端异常。
  5. maxOffsetpublic long maxOffset(MessageQueue mq)查询消息队列的最大物理偏移量。
    • 入参描述:
    • 返回值描述:
      给定消息队列的最大物理偏移量。
    • 异常描述:
      MQClientException - 生产者状态非 Running;没有找到 broker;broker 返回失败;网络异常;线程中断等客户端异常。
  6. minOffsetpublic long minOffset(MessageQueue mq)查询给定消息队列的最小物理偏移量。
    • 入参描述:
    • 返回值描述:
      给定消息队列的最小物理偏移量。
    • 异常描述:
      MQClientException - 生产者状态非 Running;没有找到 broker;broker 返回失败;网络异常;线程中断等客户端异常。
  7. queryMessagepublic QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)按关键字查询消息。
    • 入参描述:
    • 返回值描述:
      查询到的消息集合。
    • 异常描述:
      MQClientException - 生产者状态非 Running;没有找到 broker;broker 返回失败;网络异常等客户端异常客户端异常。

InterruptedException - 线程中断。

  1. searchOffsetpublic long searchOffset(MessageQueue mq, long timestamp)查找指定时间的消息队列的物理偏移量。
    • 入参描述:
    • 返回值描述:
      指定时间的消息队列的物理偏移量。
    • 异常描述:
      MQClientException - 生产者状态非 Running;没有找到 broker;broker 返回失败;网络异常;线程中断等客户端异常。
  2. sendpublic SendResult send(Collection<Message> msgs)同步批量发送消息。在返回发送失败之前,内部尝试重新发送消息的最大次数(参见retryTimesWhenSendFailed属性)。未明确指定发送队列,默认采取轮询策略发送。
    • 入参描述:
    • 返回值描述:
      批量消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic SendResult send(Collection<Message> msgs, long timeout)同步批量发送消息,如果在指定的超时时间内未完成消息投递,会抛出RemotingTooMuchRequestException。在返回发送失败之前,内部尝试重新发送消息的最大次数(参见retryTimesWhenSendFailed属性)。未明确指定发送队列,默认采取轮询策略发送。
    • 入参描述:
    • 返回值描述:
      批量消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic SendResult send(Collection<Message> msgs, MessageQueue messageQueue)向给定队列同步批量发送消息。注意:指定队列意味着所有消息均为同一个 topic。
    • 入参描述:
    • 返回值描述:
      批量消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout)向给定队列同步批量发送消息,如果在指定的超时时间内未完成消息投递,会抛出RemotingTooMuchRequestException。注意:指定队列意味着所有消息均为同一个 topic。
    • 入参描述:
    • 返回值描述:
      批量消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic SendResult send(Message msg)以同步模式发送消息,仅当发送过程完全完成时,此方法才会返回。在返回发送失败之前,内部尝试重新发送消息的最大次数(参见retryTimesWhenSendFailed属性)。未明确指定发送队列,默认采取轮询策略发送。
    • 入参描述:
    • 返回值描述:
      消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic SendResult send(Message msg, long timeout)以同步模式发送消息,如果在指定的超时时间内未完成消息投递,会抛出RemotingTooMuchRequestException。仅当发送过程完全完成时,此方法才会返回。在返回发送失败之前,内部尝试重新发送消息的最大次数(参见retryTimesWhenSendFailed属性)。未明确指定发送队列,默认采取轮询策略发送。
    • 入参描述:
    • 返回值描述:
      消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic SendResult send(Message msg, MessageQueue mq)向指定的消息队列同步发送单条消息。仅当发送过程完全完成时,此方法才会返回。
    • 入参描述:
    • 返回值描述:
      消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic SendResult send(Message msg, MessageQueue mq, long timeout)向指定的消息队列同步发送单条消息,如果在指定的超时时间内未完成消息投递,会抛出RemotingTooMuchRequestException。仅当发送过程完全完成时,此方法才会返回。
    • 入参描述:
    • 返回值描述:
      消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic void send(Message msg, MessageQueue mq, SendCallback sendCallback)向指定的消息队列异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为 null,否则在回调时会抛出NullPointerException。异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. sendpublic void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)向指定的消息队列异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为 null,否则在回调时会抛出NullPointerException。若在指定时间内消息未发送成功,回调方法会收到RemotingTooMuchRequestException异常。异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. sendpublic SendResult send(Message msg, MessageQueueSelector selector, Object arg)向通过MessageQueueSelector计算出的队列同步发送消息。可以通过自实现MessageQueueSelector接口,将某一类消息发送至固定的队列。比如:将同一个订单的状态变更消息投递至固定的队列。注意:此消息发送失败内部不会重试。
    • 入参描述:
    • 返回值描述:
      消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)向通过MessageQueueSelector计算出的队列同步发送消息,并指定发送超时时间。可以通过自实现MessageQueueSelector接口,将某一类消息发送至固定的队列。比如:将同一个订单的状态变更消息投递至固定的队列。注意:此消息发送失败内部不会重试。
    • 入参描述:
    • 返回值描述:
      消息的发送结果,包含 msgId,发送状态等信息。
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

MQBrokerException - broker 发生错误。

InterruptedException - 发送线程中断。

RemotingTooMuchRequestException - 发送超时。

  1. sendpublic void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)向通过MessageQueueSelector计算出的队列异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为 null,否则在回调时会抛出NullPointerException。异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。可以通过自实现MessageQueueSelector接口,将某一类消息发送至固定的队列。比如:将同一个订单的状态变更消息投递至固定的队列。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. sendpublic void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)向通过MessageQueueSelector计算出的队列异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为 null,否则在回调时会抛出NullPointerException。异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。可以通过自实现MessageQueueSelector接口,将某一类消息发送至固定的队列。比如:将同一个订单的状态变更消息投递至固定的队列。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. sendpublic void send(Message msg, SendCallback sendCallback)异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为 null,否则在回调时会抛出NullPointerException。异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. sendpublic void send(Message msg, SendCallback sendCallback, long timeout)异步发送单条消息,异步发送调用后直接返回,并在在发送成功或者异常时回调sendCallback,所以异步发送时sendCallback参数不能为 null,否则在回调时会抛出NullPointerException。异步发送时,在成功发送前,其内部会尝试重新发送消息的最大次数(参见retryTimesWhenSendAsyncFailed属性)。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. sendMessageInTransactionpublic TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)发送事务消息。该类不做默认实现,抛出RuntimeException异常。参见:TransactionMQProducer类。
    • 入参描述:
    • 返回值描述:
      事务结果,参见:LocalTransactionState类。
    • 异常描述:
      RuntimeException - 永远抛出该异常。
  2. sendMessageInTransactionpublic TransactionSendResult sendMessageInTransaction(Message msg, final Object arg)发送事务消息。该类不做默认实现,抛出RuntimeException异常。参见:TransactionMQProducer类。
    • 入参描述:
    • 返回值描述:
      事务结果,参见:LocalTransactionState类。
    • 异常描述:
      RuntimeException - 永远抛出该异常。
  3. sendOnewaypublic void sendOneway(Message msg)以 oneway 形式发送消息,broker 不会响应任何执行结果,和UDP类似。它具有最大的吞吐量但消息可能会丢失。可在消息量大,追求高吞吐量并允许消息丢失的情况下使用该方式。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. sendOnewaypublic void sendOneway(Message msg, MessageQueue mq)向指定队列以 oneway 形式发送消息,broker 不会响应任何执行结果,和UDP类似。它具有最大的吞吐量但消息可能会丢失。可在消息量大,追求高吞吐量并允许消息丢失的情况下使用该方式。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. sendOnewaypublic void sendOneway(Message msg, MessageQueueSelector selector, Object arg)向通过MessageQueueSelector计算出的队列以 oneway 形式发送消息,broker 不会响应任何执行结果,和UDP类似。它具有最大的吞吐量但消息可能会丢失。可在消息量大,追求高吞吐量并允许消息丢失的情况下使用该方式。
    • 入参描述:
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - broker 不存在或未找到;namesrv 地址为空;未找到 topic 的路由信息等客户端异常。

RemotingException - 网络异常。

InterruptedException - 发送线程中断。

  1. shutdownpublic void shutdown()关闭当前生产者实例并释放相关资源。
    • 入参描述:
      无。
    • 返回值描述:
      void
    • 异常描述:
  2. startpublic void start()启动生产者实例。在发送或查询消息之前必须调用此方法。它执行了许多内部初始化,比如:检查配置、与 namesrv 建立连接、启动一系列心跳等定时任务等。
    • 入参描述:
      无。
    • 返回值描述:
      void
    • 异常描述:
      MQClientException - 初始化过程中出现失败。
  3. viewMessagepublic MessageExt viewMessage(String offsetMsgId)根据给定的 msgId 查询消息。
    • 入参描述:
    • 返回值描述:
      返回MessageExt,包含:topic 名称,消息题,消息 ID,消费次数,生产者 host 等信息。
    • 异常描述:
      RemotingException - 网络层发生错误。

MQBrokerException - broker 发生错误。

InterruptedException - 线程被中断。

MQClientException - 生产者状态非 Running;msgId 非法等。

  1. viewMessagepublic MessageExt viewMessage(String topic, String msgId)根据给定的 msgId 查询消息,并指定 topic。
    • 入参描述:
    • 返回值描述:
      返回MessageExt,包含:topic 名称,消息题,消息 ID,消费次数,生产者 host 等信息。
    • 异常描述:
      RemotingException - 网络层发生错误。

MQBrokerException - broker 发生错误。

InterruptedException - 线程被中断。

MQClientException - 生产者状态非 Running;msgId 非法等。