Kafka重復(fù)消費(fèi)場景及解決方案是什么,針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。
十年專注成都網(wǎng)站制作,企業(yè)網(wǎng)站設(shè)計(jì),個(gè)人網(wǎng)站制作服務(wù),為大家分享網(wǎng)站制作知識(shí)、方案,網(wǎng)站設(shè)計(jì)流程、步驟,成功服務(wù)上千家企業(yè)。為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁設(shè)計(jì)及定制高端網(wǎng)站建設(shè)服務(wù),專注于企業(yè)網(wǎng)站設(shè)計(jì),高端網(wǎng)頁制作,對(duì)成都橡塑保溫等多個(gè)行業(yè),擁有豐富設(shè)計(jì)經(jīng)驗(yàn)。
Kafka消費(fèi)者以消費(fèi)者組(Consumer Group)的形式消費(fèi)一個(gè)topic,發(fā)布到topic中的每個(gè)記錄將傳遞到每個(gè)訂閱消費(fèi)者者組中的一個(gè)消費(fèi)者實(shí)例。Consumer Group 之間彼此獨(dú)立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。生產(chǎn)環(huán)境中消費(fèi)者在消費(fèi)消息的時(shí)候若不考慮消費(fèi)者的相關(guān)特性可能會(huì)出現(xiàn)重復(fù)消費(fèi)的問題。
在討論重復(fù)消費(fèi)之前,首先來看一下kafka中跟消費(fèi)者有關(guān)的幾個(gè)重要配置參數(shù)。
enable.auto.commit默認(rèn)值true,表示消費(fèi)者會(huì)周期性自動(dòng)提交消費(fèi)的offset
auto.commit.interval.ms在enable.auto.commit為true的情況下, 自動(dòng)提交的間隔,默認(rèn)值5000ms
max.poll.records 單次消費(fèi)者拉取的最大數(shù)據(jù)條數(shù),默認(rèn)值
500 max.poll.interval.ms默認(rèn)值5分鐘,表示若5分鐘之內(nèi)消費(fèi)者沒有消費(fèi)完上一次poll的消息,那么consumer會(huì)主動(dòng)發(fā)起離開group的請(qǐng)求
在常見的使用場景下,我們的消費(fèi)者配置比較簡單,特別是集成Spring組件進(jìn)行消息的消費(fèi),通常情況下我們僅需通過一個(gè)注解就可以實(shí)現(xiàn)消息的消費(fèi)。例如如下代碼:
這段代碼中我們配置了一個(gè)kafka消費(fèi)注解,制定消費(fèi)名為"test1"的topic,這個(gè)消費(fèi)者屬于"group1"消費(fèi)組。開發(fā)者只需要對(duì)得到的消息進(jìn)行處理即可。那么這段 代碼中的消費(fèi)者在這個(gè)過程中是如何拉取消息的呢,消費(fèi)者消費(fèi)消息之后又是如何提交對(duì)應(yīng)消息的位移(offset)的呢?
實(shí)際上在auto.commit=true時(shí),當(dāng)上一次poll方法拉取的消息消費(fèi)完時(shí)會(huì)進(jìn)行下一次poll,在經(jīng)過auto.commit.interval.ms間隔后,下一次調(diào)用poll時(shí)會(huì)提交所有已消費(fèi)消息的offset。
為了驗(yàn)證consumer自動(dòng)提交的時(shí)機(jī),配置消費(fèi)者參數(shù)如下:
為了便于獲取消費(fèi)者消費(fèi)進(jìn)度,以下代碼通過kafka提供的相關(guān)接口定時(shí)每隔5s獲取一次消費(fèi)者的消費(fèi)進(jìn)度信息,并將獲取到的信息打印到控制臺(tái)。
對(duì)于topic test1,為了便于觀察消費(fèi)情況,我們僅設(shè)置了一個(gè)partition。對(duì)于消費(fèi)者組group1的配置參數(shù),消費(fèi)者會(huì)單次拉取消息數(shù)20條,消費(fèi)每條消息耗費(fèi)1s,部分記錄日志打印結(jié)果如下:
從日志中可以看出,消費(fèi)組的offset每40s更新一次,因?yàn)槊看蝡oll會(huì)拉取20條消息,每個(gè)消息消費(fèi)1s,在第一次poll之后,下一次poll因?yàn)闆]有達(dá)到auto.commit.interval.ms=30s,所以不會(huì)提交offset。第二次poll時(shí),已經(jīng)經(jīng)過40s,因此這次poll會(huì)提交之前兩次消費(fèi)的消息,offset增加40。也就是說只有在經(jīng)過auto.commit.interval.ms間隔后,并且在下一次調(diào)用poll時(shí)才會(huì)提交所有 已消費(fèi)消息的offset。
考慮到以上消費(fèi)者消費(fèi)消息的特點(diǎn),在配置自動(dòng)提交enable.auto.commit 默認(rèn)值true情況下,出現(xiàn)重復(fù)消費(fèi)的場景有以下幾種:
Consumer 在消費(fèi)過程中,應(yīng)用進(jìn)程被強(qiáng)制kill掉或發(fā)生異常退出。
例如在一次poll500條消息后,消費(fèi)到200條時(shí),進(jìn)程被強(qiáng)制kill消費(fèi)導(dǎo)致offset 未提交,或出現(xiàn)異常退出導(dǎo)致消費(fèi)到offset未提交。下次重啟時(shí),依然會(huì)重新拉取這500消息,這樣就造成之前消費(fèi)到200條消息重復(fù)消費(fèi)了兩次。因此在有消費(fèi)者線程的應(yīng)用中,應(yīng)盡量避免使用kill -9這樣強(qiáng)制殺進(jìn)程的命令。
消費(fèi)者消費(fèi)時(shí)間過長
max.poll.interval.ms參數(shù)定義了兩次poll的最大間隔,它的默認(rèn)值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內(nèi)無法消費(fèi)完 poll 方法返回的消息,那么 Consumer 會(huì)主動(dòng)發(fā)起“離開組”的請(qǐng)求,Coordinator 也會(huì)開啟新一輪 Rebalance。若消費(fèi)者消費(fèi)的消息比較耗時(shí),那么這種情況可能就會(huì)出現(xiàn)。
為了復(fù)現(xiàn)這種場景,我們對(duì)消費(fèi)者重新進(jìn)行了配置,消費(fèi)者參數(shù)如下:
在消費(fèi)過程中消費(fèi)者單次會(huì)拉取11條消息,每條消息耗時(shí)30s,11條消息耗時(shí) 5分鐘30秒,由于max.poll.interval.ms默認(rèn)值5分鐘,所以理論上消費(fèi)者無法在5分鐘內(nèi)消費(fèi)完,consumer會(huì)離開組,導(dǎo)致rebalance。
實(shí)際運(yùn)行日志如下:
可以看到在消費(fèi)完第11條消息后,因?yàn)橄M(fèi)時(shí)間超出max.poll.interval.ms默認(rèn)值5分鐘,這時(shí)consumer已經(jīng)離開消費(fèi)組了,開始rebalance,因此提交offset失敗。之后重新rebalance,消費(fèi)者再次分配partition后,再次poll拉取消息依然從之前消費(fèi)過的消息處開始消費(fèi),這樣就造成重復(fù)消費(fèi)。而且若不解決消費(fèi)單次消費(fèi)時(shí)間過長的問題,這部分消息可能會(huì)一直重復(fù)消費(fèi)。
對(duì)于上述重復(fù)消費(fèi)的場景,若不進(jìn)行相應(yīng)的處理,那么有可能造成一些線上問題。為了避免因重復(fù)消費(fèi)導(dǎo)致的問題,以下提供了兩種解決重復(fù)消費(fèi)的思路。
第一種思路是提高消費(fèi)能力,提高單條消息的處理速度,例如對(duì)消息處理中比 較耗時(shí)的步驟可通過異步的方式進(jìn)行處理、利用多線程處理等。在縮短單條消息消費(fèi)時(shí)常的同時(shí),根據(jù)實(shí)際場景可將max.poll.interval.ms值設(shè)置大一點(diǎn),避免不 必要的rebalance,此外可適當(dāng)減小max.poll.records的值,默認(rèn)值是500,可根 據(jù)實(shí)際消息速率適當(dāng)調(diào)小。這種思路可解決因消費(fèi)時(shí)間過長導(dǎo)致的重復(fù)消費(fèi)問題, 對(duì)代碼改動(dòng)較小,但無法絕對(duì)避免重復(fù)消費(fèi)問題。
第二種思路是引入單獨(dú)去重機(jī)制,例如生成消息時(shí),在消息中加入唯一標(biāo)識(shí)符如消息id等。在消費(fèi)端,我們可以保存最近的1000條消息id到redis或MySQL表中,配置max.poll.records的值小于1000。在消費(fèi)消息時(shí)先通過前置表去重后再進(jìn)行消息的處理。
此外,在一些消費(fèi)場景中,我們可以將消費(fèi)的接口冪等處理,例如數(shù)據(jù)庫的查 詢操作天然具有冪等性,這時(shí)候可不用考慮重復(fù)消費(fèi)的問題。對(duì)于例如新增數(shù)據(jù)的操作,可通過設(shè)置唯一鍵等方式以達(dá)到單次與多次操作對(duì)系統(tǒng)的影響相同,從而使接口具有冪等性。
關(guān)于 Kafka重復(fù)消費(fèi)場景及解決方案是什么問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。
標(biāo)題名稱:Kafka重復(fù)消費(fèi)場景及解決方案是什么
鏈接地址:http://www.2m8n56k.cn/article40/jgeceo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計(jì)、電子商務(wù)、手機(jī)網(wǎng)站建設(shè)、網(wǎng)站排名、云服務(wù)器、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:[email protected]。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)