中文字幕第五页-中文字幕第页-中文字幕韩国-中文字幕最新-国产尤物二区三区在线观看-国产尤物福利视频一区二区

怎么讓Kafka達到最佳吞吐量

本篇內容介紹了“怎么讓Kafka達到最佳吞吐量”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

我們提供的服務有:成都網站建設、網站制作、微信公眾號開發、網站優化、網站認證、化隆ssl等。為數千家企事業單位解決了網站和推廣的問題。提供周到的售前咨詢和貼心的售后服務,是有科學管理、有技術的化隆網站制作公司

上手使用

func main() {
  // 1. 初始化
	pusher := kq.NewPusher([]string{
		"127.0.0.1:19092",
		"127.0.0.1:19092",
		"127.0.0.1:19092",
	}, "kq")

	ticker := time.NewTicker(time.Millisecond)
	for round := 0; round < 3; round++ {
		select {
		case <-ticker.C:
			count := rand.Intn(100)
			m := message{
				Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
				Value:   fmt.Sprintf("%d,%d", round, count),
				Payload: fmt.Sprintf("%d,%d", round, count),
			}
			body, err := json.Marshal(m)
			if err != nil {
				log.Fatal(err)
			}

			fmt.Println(string(body))
      // 2. 寫入
			if err := pusher.Push(string(body)); err != nil {
				log.Fatal(err)
			}
		}
	}
}

kafka cluster 配置以及 topic 傳入,你就得到一個操作 kafkapush operator

至于寫入消息,簡單的調用 pusher.Push(msg) 就行。是的,就這么簡單!

> 當然,目前只支持單個 msg 寫入。可能有人會疑惑,那就繼續往下看,為什么只能一條一條寫入?

初始化

一起看看 pusher 初始化哪些步驟:

NewPusher(clusterAddrs, topic, opts...)
	|- kafka.NewWriter(kfConfig)								// 與 kf 之前的連接
	|- executor = executors.NewChunkExecutor()  // 設置內部寫入的executor為字節數定量寫入
  1. 建立與 kafka cluster 的連接。此處肯定就要傳入 kafka config

  2. 設置內部暫存區的寫入函數以及刷新規則。

使用 chunkExecutor 作用不言而喻:將隨機寫 -> 批量寫,減少 I/O 消耗;同時保證單次寫入不能超過默認的 1M 或者自己設定的最大寫入字節數。

其實再往 chunkExecutor 內部看,其實每次觸發插入有兩個指標:

  • maxChunkSize:單次最大寫入字節數

  • flushInterval:刷新暫存消息插入的間隔時間

在觸發寫入,只要滿足任意一個指標都會執行寫入。同時在 executors 都有設置插入間隔時間,以防暫存區寫入阻塞而暫存區內消息一直不被刷新清空。

> 更多關于 executors 可以參看以下:https://zeromicro.github.io/go-zero/executors.html

生產者插入

根據上述初始化對 executors 介紹,插入過程中也少不了它的配合:

func (p *Pusher) Push(v string) error {
  // 1. 將 msg -> kafka 內部的 Message
	msg := kafka.Message{
		Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
		Value: []byte(v),
	}
  
  // 使用 executor.Add() 插入內部的 container
  // 當 executor 初始化失敗或者是內部發生錯誤,也會將 Message 直接插入 kafka
	if p.executor != nil {
		return p.executor.Add(msg, len(v))
	} else {
		return p.produer.WriteMessages(context.Background(), msg)
	}
}

過程其實很簡單。那 executors.Add(msg, len(msg)) 是怎么把 msg 插入到 kafka 呢?

插入的邏輯其實在初始化中就聲明了:

pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
		chunk := make([]kafka.Message, len(tasks))
  	// 1
		for i := range tasks {
			chunk[i] = tasks[i].(kafka.Message)
		}
  	// 2
		if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
			logx.Error(err)
		}
	}, newOptions(opts)...)
  1. 觸發插入時,將暫存區中存儲的 []msg 依次拿出,作為最終插入消息集合;

  2. 將上一步的消息集合,作為一個批次插入 kafkatopic

這樣 pusher -> chunkExecutor -> kafka 一個鏈路就出現了。

“怎么讓Kafka達到最佳吞吐量”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注創新互聯網站,小編將為大家輸出更多高質量的實用文章!

文章名稱:怎么讓Kafka達到最佳吞吐量
文章URL:http://www.2m8n56k.cn/article44/jcgshe.html

成都網站建設公司_創新互聯,為您提供小程序開發App開發外貿建站網站改版外貿網站建設虛擬主機

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:[email protected]。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

商城網站建設
主站蜘蛛池模板: 欧美日本亚洲国产一区二区 | 嫩模大尺度人体福利视频 | 欧美另类视频videosbest18 | 特黄特色一级特色大片中文 | 日韩视频观看 | 国产韩国精品一区二区三区 | 亚洲系列中文字幕一区二区 | 国产精品午夜性视频网站 | 99热碰| 午夜性爽视频男人的天堂在线 | 久久视屏这里只有精品6国产 | 在线观看免费av网站 | 国产成人丝袜视频在线视频 | 欧美一级成人一区二区三区 | 国内xxxx乱子另类 | 欧美性色生活免费观看 | 久久五月女厕所一区二区 | 日本不卡不码高清免费观看 | 日韩a一级欧美一级在线播放 | 一级毛片在线看 | 久久久久久国产视频 | 成年人在线免费观看网站 | 免费观看久久 | 中文乱码一二三四有限公司 | 美女张开腿双腿让男人桶 | 亚洲精品一区二区四季 | 日本精品视频一区二区三区 | a级欧美片免费观看 | 久久免费精品国产视频 | 日本三级香港三级人妇99 | 亚洲一区二区三区久久久久 | a国产在线 | 超级碰碰碰视频视频在线视频 | 草久在线观看视频 | 成人欧美视频在线观看播放 | 91久久亚洲国产成人精品性色 | 爱爱爱久久久久久久 | 精品国产高清在线看国产 | 手机在线播放视频 | 欧美在线bdsm调教一区 | 精品久久久中文字幕一区 |