中文字幕第五页-中文字幕第页-中文字幕韩国-中文字幕最新-国产尤物二区三区在线观看-国产尤物福利视频一区二区

RocketMQ中如何實現producer消息發送

這篇文章主要介紹了RocketMQ中如何實現producer消息發送,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

在阿合奇等地區,都構建了全面的區域性戰略布局,加強發展的系統性、市場前瞻性、產品創新能力,以專注、極致的服務理念,為客戶提供成都網站建設、成都做網站 網站設計制作定制網站建設,公司網站建設,企業網站建設,成都品牌網站建設,全網整合營銷推廣,成都外貿網站建設公司,阿合奇網站建設費用合理。

RocketMQ中一個topic可以分布在多個broker上,每個broker上又可以包含多個message queue。每個message具體發送到哪個broker的哪個message queue中,這個決策過程是在client端完成的。client會從nameserver訂閱topic的路由信息,按照一定的負載均衡算法為每個message選擇出一個queue,并完成投遞。producer的消息投遞過程大體如下圖:

RocketMQ中如何實現producer消息發送

1. 出錯重試

producer的投遞出錯重試分為兩種情況,同步出錯重試(sync)以及異步出錯重試(async)。同步發送出錯重試是在DefaultMQProducerImpl.sendDefaultImpl 中完成:

            // SYNC模式則開啟重試,ASYNC模式重試在MQClientAPIImpl中實現
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) { // 循環重試最多timesTotal次
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // ....

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // ...
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        // ...
                        exception = e;
                        continue; // 出錯,進入下一次重試
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        // ...
                        exception = e;
                        continue; // 出錯,進入下一次重試
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        // ...
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue; // 出錯,進入下一次重試
                            default:
                                if (sendResult != null) { // 放棄重試
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // ...
                        throw e; // 放棄重試
                    }
                } else {
                    break;
                }
            }

異步發送的出錯重試則是在更底層的MQClientAPIImpl.sendMessageAsync里實現:

     // 調用invokeAsync發送消息      
     this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                long cost = System.currentTimeMillis() - beginStartTime;
                RemotingCommand response = responseFuture.getResponseCommand();
                // ...

                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }

                        try {
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }

                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        // 發送成功,processSendReponse異常重試
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    if (!responseFuture.isSendRequestOK()) {
                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                        // 消息發送失敗,重試
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (responseFuture.isTimeout()) {
                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                            responseFuture.getCause());
                        // 發送超時,重試
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else {
                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                        // 其他錯誤,重試
                        onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    }
                }
            }
        });

在異常處理的onExceptionImpl方法中會再次觸發sendMessageAsync方法:

    private void onExceptionImpl(final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int timesTotal,
        final AtomicInteger curTimes,
        final Exception e,
        final SendMessageContext context,
        final boolean needRetry,
        final DefaultMQProducerImpl producer
    ) {
        int tmp = curTimes.incrementAndGet(); // 將已重試次數+1
        if (needRetry && tmp <= timesTotal) { // 重試
            String retryBrokerName = brokerName;//by default, it will send to the same broker
            if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
                retryBrokerName = mqChosen.getBrokerName();
            }
            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
                retryBrokerName);
            try {
                request.setOpaque(RemotingCommand.createNewRequestId());
                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    timesTotal, curTimes, context, producer);
            } catch (InterruptedException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingConnectException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            } catch (RemotingTooMuchRequestException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            }
        } else {

            // ...
        }
    }

2. 負載均衡

消息的負載均衡是通過MQFaultStrategy.selectOneMessageQueue方法來實現的:

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 輪詢的方式選擇下一個queue
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 根據latency統計數據判斷是否可用
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        // 優先選擇同一個broker上的queue
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

感謝你能夠認真閱讀完這篇文章,希望小編分享的“RocketMQ中如何實現producer消息發送”這篇文章對大家有幫助,同時也希望大家多多支持創新互聯,關注創新互聯行業資訊頻道,更多相關知識等著你來學習!

標題名稱:RocketMQ中如何實現producer消息發送
標題鏈接:http://www.2m8n56k.cn/article20/pchjco.html

成都網站建設公司_創新互聯,為您提供關鍵詞優化網站導航標簽優化用戶體驗全網營銷推廣面包屑導航

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:[email protected]。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

外貿網站建設
主站蜘蛛池模板: 亚洲加勒比久久88色综合 | ffyybb免费福利视频 | 日日a.v拍夜夜添久久免费 | 成人精品综合免费视频 | 欧美一级www片免费观看 | 精品一区二区三区免费爱 | 成人毛片免费网站 | 欧美性欲视频 | 看欧美的一级毛片 | 午夜在线播放免费人成无 | 性色xxx| 国产免费成人在线视频 | 亚洲成人免费观看 | 国内精品久久久久影院免费 | 亚洲国产一区在线二区三区 | 一区二区三区免费在线观看 | 最新在线精品国自拍视频 | 久久 精品 一区二区 | 国产伦精一区二区三区视频 | 一二三中文乱码亚洲乱码 | 99在线视频精品 | 亚洲欧美精品中字久久99 | 国产系列 视频二区 | 99精品视频99| 日韩精品中文字幕视频一区 | 国产三级借妻 | 成人黄网大全在线观看 | 久久久成人影院 | 亚洲国产成人在人网站天堂 | 欧美激情综合亚洲一二区 | 国产一级特黄全黄毛片 | 国产三级精品三级 | 久久狠狠色狠狠色综合 | xoxoxoxo欧美性护士 | 日本免费高清视频二区 | 香蕉自拍视频 | 131的美女午夜爱爱爽爽视频 | 日本欧美一区二区三区片 | 久久国产乱子伦精品免费不卡 | 亚洲 欧美 都市 自拍 在线 | 国产成人久久精品二区三区 |