中文字幕第五页-中文字幕第页-中文字幕韩国-中文字幕最新-国产尤物二区三区在线观看-国产尤物福利视频一区二区

Golang中怎么利用Redis實現TCC分布式事務

這期內容當中小編將會給大家帶來有關Golang中怎么利用redis實現TCC分布式事務,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

創新互聯服務項目包括德欽網站建設、德欽網站制作、德欽網頁制作以及德欽網絡營銷策劃等。多年來,我們專注于互聯網行業,利用自身積累的技術優勢、行業經驗、深度合作伙伴關系等,向廣大中小型企業、政府機構等提供互聯網行業的解決方案,德欽網站推廣取得了明顯的社會效益與經濟效益。目前,我們服務的客戶以成都為中心已經輻射到德欽省份的部分城市,未來相信會繼續擴大服務區域并繼續獲得客戶的支持與信任!

對于使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。

MSET 命令在集群模式下的問題#

于是問題來了 DEL、MSET 等命令所涉及的 key 可能分布在不同的節點中,在集群模式下實現這類涉及多個 key 的命令最簡單的方式當然是 For-Each 遍歷 key 并向它們所在的節點發送相應的操作指令。 以 MGET 命令的實現為例:

func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

if len(args) < 2 {

return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")

}

// 從參數列表中取出要讀取的 key

keys := make([]string, len(args)-1)

for i := 1; i < len(args); i++ {

keys[i-1] = string(args[i])

}

resultMap := make(map[string][]byte)

// 計算每個 key 所在的節點,并按照節點分組

groupMap := cluster.groupBy(keys)

// groupMap 的類型為 map[string][]string,key 是節點的地址,value 是 keys 中屬于該節點的 key 列表

for peer, group := range groupMap {

// 向每個節點發送 mget 指令,讀取分布在它上面的 key

resp := cluster.Relay(peer, c, makeArgs("MGET", group...))

if reply.IsErrorReply(resp) {

errReply := resp.(reply.ErrorReply)

return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))

}

arrReply, _ := resp.(*reply.MultiBulkReply)

// 將每個節點上的結果 merge 到 map 中

for i, v := range arrReply.Args {

key := group[i]

resultMap[key] = v

}

}

result := make([][]byte, len(keys))

for i, k := range keys {

result[i] = resultMap[k]

}

return reply.MakeMultiBulkReply(result)

}

// 計算 key 所屬的節點,并按節點分組

func (cluster *Cluster) groupBy(keys []string) map[string][]string {

result := make(map[string][]string)

for _, key := range keys {

// 使用一致性 hash 計算所屬節點

peer := cluster.peerPicker.Get(key)

// 將 key 加入到相應節點的分組中

group, ok := result[peer]

if !ok {

group = make([]string, 0)

}

group = append(group, key)

result[peer] = group

}

return result

}

那么 MSET 命令的實現能否如法炮制呢?答案是否定的。在上面的代碼中我們注意到,在向各個節點發送指令時若某個節點讀取失敗則會直接退出整個 MGET 執行過程。

若在執行 MSET 指令時遇到部分節點失敗或超時,則會出現部分 key 設置成功而另一份設置失敗的情況。對于緩存使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。

兩階段提交#

兩階段提交(2-Phase Commit, 2PC)算法是解決我們遇到的一致性問題最簡單的算法。在 2PC 算法中寫操作被分為兩個階段來執行:

Prepare 階段

協調者向所有參與者發送事務內容,詢問是否可以執行事務操作。在 Godis 中收到客戶端 MSET 命令的節點是事務的協調者,所有持有相關 key 的節點都要參與事務。

各參與者鎖定事務相關 key 防止被其它操作修改。各參與者寫 undo log 準備在事務失敗后進行回滾。

參與者回復協調者可以提交。若協調者收到所有參與者的YES回復,則準備進行事務提交。若有參與者回復NO或者超時,則準備回滾事務

Commit 階段

協調者向所有參與者發送提交請求

參與者正式提交事務,并在完成后釋放相關 key 的鎖。

參與者協調者回復ACK,協調者收到所有參與者的ACK后認為事務提交成功。

Rollback 階段

在事務請求階段若有參與者回復NO或者超時,協調者向所有參與者發出回滾請求

各參與者執行事務回滾,并在完成后釋放相關資源。

參與者協調者回復ACK,協調者收到所有參與者的ACK后認為事務回滾成功。

2PC是一種簡單的一致性協議,它存在一些問題:

單點服務: 若協調者突然崩潰則事務流程無法繼續進行或者造成狀態不一致

無法保證一致性: 若協調者第二階段發送提交請求時崩潰,可能部分參與者受到COMMIT請求提交了事務,而另一部分參與者未受到請求而放棄事務造成不一致現象。

阻塞: 為了保證事務完成提交,各參與者在完成第一階段事務執行后必須鎖定相關資源直到正式提交,影響系統的吞吐量。

首先我們定義事務的描述結構:

type Transaction struct {

id string // 事務 ID, 由 snowflake 算法生成

args [][]byte // 命令參數

cluster *Cluster

conn redis.Connection

keys []string // 事務中涉及的 key

undoLog map[string][]byte // 每個 key 在事務執行前的值,用于回滾事務

}

Prepare 階段#

先看事務參與者 prepare 階段的操作:

// prepare 命令的格式是: PrepareMSet TxID key1, key2 ...

// TxID 是事務 ID,由協調者決定

func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

if len(args) < 3 {

return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command")

}

txId := string(args[1])

size := (len(args) - 2) / 2

keys := make([]string, size)

for i := 0; i < size; i++ {

keys[i] = string(args[2*i+2])

}

txArgs := [][]byte{

[]byte("MSet"),

} // actual args for cluster.db

txArgs = append(txArgs, args[2:]...)

tx := NewTransaction(cluster, c, txId, txArgs, keys) // 創建新事務

cluster.transactions.Put(txId, tx) // 存儲到節點的事務列表中

err := tx.prepare() // 準備事務

if err != nil {

return reply.MakeErrReply(err.Error())

}

return &reply.OkReply{}

}

實際的準備操作在 tx.prepare() 中:

func (tx *Transaction) prepare() error {

// 鎖定相關 key

tx.cluster.db.Locks(tx.keys...)

// 準備 undo log

tx.undoLog = make(map[string][]byte)

for _, key := range tx.keys {

entity, ok := tx.cluster.db.Get(key)

if ok {

blob, err := gob.Marshal(entity) // 將修改之前的狀態序列化之后存儲作為 undo log

if err != nil {

return err

}

tx.undoLog[key] = blob

} else {

// 若事務執行前 key 是空的,在回滾時應刪除它

tx.undoLog[key] = []byte{}

}

}

tx.status = PreparedStatus

return nil

}

看看協調者在做什么:

func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

// 解析參數

argCount := len(args) - 1

if argCount%2 != 0 || argCount < 1 {

return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")

}

size := argCount / 2

keys := make([]string, size)

valueMap := make(map[string]string)

for i := 0; i < size; i++ {

keys[i] = string(args[2*i+1])

valueMap[keys[i]] = string(args[2*i+2])

}

// 找到所屬的節點

groupMap := cluster.groupBy(keys)

if len(groupMap) == 1 { // do fast

// 若所有的 key 都在同一個節點直接執行,不使用較慢的 2pc 算法

for peer := range groupMap {

return cluster.Relay(peer, c, args)

}

}

// 開始準備階段

var errReply redis.Reply

txId := cluster.idGenerator.NextId() // 使用 snowflake 算法決定事務 ID

txIdStr := strconv.FormatInt(txId, 10)

rollback := false

// 向所有參與者發送 prepare 請求

for peer, group := range groupMap {

peerArgs := []string{txIdStr}

for _, k := range group {

peerArgs = append(peerArgs, k, valueMap[k])

}

var resp redis.Reply

if peer == cluster.self {

resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))

} else {

resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))

}

if reply.IsErrorReply(resp) {

errReply = resp

rollback = true

break

}

}

if rollback {

// 若 prepare 過程出錯則執行回滾

RequestRollback(cluster, c, txId, groupMap)

} else {

_, errReply = RequestCommit(cluster, c, txId, groupMap)

rollback = errReply != nil

}

if !rollback {

return &reply.OkReply{}

}

return errReply

}

Commit 階段#

事務參與者提交本地事務:

func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

if len(args) != 2 {

return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")

}

// 讀取事務信息

txId := string(args[1])

raw, ok := cluster.transactions.Get(txId)

if !ok {

return reply.MakeIntReply(0)

}

tx, _ := raw.(*Transaction)

// 在提交成功后解鎖 key

defer func() {

cluster.db.UnLocks(tx.keys...)

tx.status = CommitedStatus

//cluster.transactions.Remove(tx.id) // cannot remove, may rollback after commit

}()

cmd := strings.ToLower(string(tx.args[0]))

var result redis.Reply

if cmd == "del" {

result = CommitDel(cluster, c, tx)

} else if cmd == "mset" {

result = CommitMSet(cluster, c, tx)

}

// 提交失敗

if reply.IsErrorReply(result) {  陽痿早泄前列腺炎醫院哪家好http://www.zztjxb.com/

err2 := tx.rollback()

return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))

}

return result

}

// 執行操作

func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {

size := len(tx.args) / 2

keys := make([]string, size)

values := make([][]byte, size)

for i := 0; i < size; i++ {

keys[i] = string(tx.args[2*i+1])

values[i] = tx.args[2*i+2]鄭州無痛人流醫院哪家好http://www.hnzzxb.com/

}

for i, key := range keys {

value := values[i]

cluster.db.Put(key, &db.DataEntity{Data: value})

}

cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))

return &reply.OkReply{}

}

協調者的邏輯也很簡單:

func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {

var errReply reply.ErrorReply

txIdStr := strconv.FormatInt(txId, 10)

respList := make([]redis.Reply, 0, len(peers))

for peer := range peers {

var resp redis.Reply

if peer == cluster.self {

resp = Commit(cluster, c, makeArgs("commit", txIdStr))

} else {

resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr))

}

if reply.IsErrorReply(resp) {

errReply = resp.(reply.ErrorReply)

break

}

respList = append(respList, resp)

}

if errReply != nil {

RequestRollback(cluster, c, txId, peers)

return nil, errReply

}

return respList, nil

}

Rollback#

回滾本地事務:

func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

if len(args) != 2 {

return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command")

}

txId := string(args[1])

raw, ok := cluster.transactions.Get(txId)

if !ok {

return reply.MakeIntReply(0)

}

tx, _ := raw.(*Transaction)

err := tx.rollback()

if err != nil {

return reply.MakeErrReply(err.Error())

}

return reply.MakeIntReply(1)

}

func (tx *Transaction) rollback() error {

for key, blob := range tx.undoLog {

if len(blob) > 0 {

entity := &db.DataEntity{}

err := gob.UnMarshal(blob, entity) // 反序列化事務前的快照

if err != nil {

return err

}

tx.cluster.db.Put(key, entity) // 寫入事務前的數據

} else {

tx.cluster.db.Remove(key) // 若事務開始之前 key 不存在則將其刪除

}

}

if tx.status != CommitedStatus {

tx.cluster.db.UnLocks(tx.keys...)

}

tx.status = RollbackedStatus

return nil

}

協調者的邏輯與 commit 類似:

func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) {

txIdStr := strconv.FormatInt(txId, 10)

for peer := range peers {

if peer == cluster.self {

Rollback(cluster, c, makeArgs("rollback", txIdStr))

} else {

cluster.Relay(peer, c, makeArgs("rollback", txIdStr))

}

}

}

上述就是小編為大家分享的Golang中怎么利用Redis實現TCC分布式事務了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注創新互聯行業資訊頻道。

文章題目:Golang中怎么利用Redis實現TCC分布式事務
瀏覽地址:http://www.2m8n56k.cn/article0/iesdio.html

成都網站建設公司_創新互聯,為您提供品牌網站設計網站改版全網營銷推廣網站營銷動態網站軟件開發

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:[email protected]。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

手機網站建設
主站蜘蛛池模板: 毛片在线免费观看网站 | 国产3区 | 最近免费手机中文字幕3 | 一区二区三区在线观看视频 | 欧美成人香蕉网在线观看 | 亚洲精品综合欧美一区二区三区 | 日韩亚洲国产综合久久久 | 免费高清欧美一区二区视频 | 精品国产成人三级在线观看 | 亚洲精品在线播放 | 欧美不卡视频在线观看 | 玖玖视频精品 | 国产精品视频成人 | 欧美特黄一级aa毛片 | 亚洲综合小视频 | 欧美三级网站在线观看 | 中文字幕一区二区精品区 | 国产精品第五页 | 国产成人免费片在线观看 | 日本一在线中文字幕天堂 | 久久精品国产三级不卡 | 高清不卡毛片免费观看 | 日本一区二区三区不卡视频中文字幕 | 亚洲三级大片 | 欧美日韩亚洲高清不卡一区二区三区 | 男人干女人的视频 | 欧美日本一区二区 | 久久久国产一区二区三区丝袜 | 一区二区中文字幕在线观看 | 三级网址在线观看 | 亚洲国产成+人+综合 | 国产免费一级视频 | 高清国产一级精品毛片基地 | 国产成人女人视频在线观看 | 国产三级在线播放线 | 黄色一及毛片 | 日韩毛片 | 中国一级毛片欧美一级毛片 | 欧美国产日本高清不卡 | 免费被黄网站在观看 | 亚洲精品国产免费 |