searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

ETCD实现分布式锁

2023-09-18 01:41:59
29
0

使用ETCD实现分布式锁的流程如下:

STEP1:初始化ETCD客户端:创建ETCD客户端,用于与ETCD集群通信。配置参数一般为ETCD服务器的地址、连接超时等。

STEP2:创建租约:租约使键值对具有有限生命周期。当客户端挂掉后,保证键值对(锁)能够在过期时间后被清除。

STEP3:获取锁:通过ETCD事务操作,尝试通过在特定的键下创建一个临时顺序节点。只有成功创建了这个节点的客户端才能获得锁。如果多个客户端同时尝试获取锁,ETCD根据节点的顺序来确定哪个客户端获得锁。

STEP4:自动续约(异步进行):获取锁后,需要定期续租租约以保持锁的持续性。通过设置一个周期性的续租机制,确保锁不会因租约过期而被释放。(仅当客户端主动释放或续约过程中断后才会因租约到期被释放)

STEP5:执行业务逻辑:客户端获得锁后,继续执行需要加锁的业务逻辑。在执行期间,自动续约机制确保不会释放锁,但自动续约机制会受网络、IO、时间同步等因素而中断。因此,业务逻辑层除了执行业务需求,也需要考虑到自动续约失败后,怎么进行异常处理。

STEP6:释放锁:当加锁的业务逻辑执行完毕时,需要手动释放锁。

STEP7:关闭ETCD客户端。

注意:本文实现的ETCD分布式锁,主要体现实现过程。实际应用中,还需要在业务层考虑到异常处理等情况。

 

流程图:

 

GO语言代码实现:

package main

import (
	"context"
	"fmt"
	"log"
	"time"
	"go.etcd.io/etcd/client/v3"
)

var (
	etcd_client *clientv3.Client
)

// init_etcd_client 初始化ETCD客户端
func init_etcd_client() error {
	client, err := clientv3.New(clientv3.Config{
		endpoints: []string{"*************"}, // ETCD服务器地址:端口
		dial_timeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}
	etcd_client = client
	return nil
}

// lock 
func lock(key, value string, lease_ttl int64) error {
	// 创建一个ETCD租约
	lease_resp, err := etcd_client.Grant(context.TODO(), lease_ttl)
	if err != nil {
		return err
	}
	leaseID := lease_resp.ID

	// 定期续租以确保锁的持续性
	keepalive_ch, err := etcd_client.KeepAlive(context.TODO(), leaseID)
	if err != nil {
		return err
	}

	// 开启一个协程监控续租情况
	go func() {
		for {
			select {
			case _, is_ok := <-keepalive_ch:
				if !is_ok {
					fmt.Println("Lease keepalive failed.")
					/*
						这里需要考虑到,自动续约过程失败的情况,结合业务逻辑,
						当自动续约失败后,即表示在业务执行过程中,丢失了锁,
						需要制定相应的措施,重试或者业务回滚等。
					*/
					
					return
				}
			}
		}
	}()

	// 尝试获取锁
	_, err = etcd_client.Put(context.TODO(), key, value, clientv3.WithLease(leaseID))
	if err != nil {
		return err
	}

	return nil
}

// unlock 释放分布式锁
func unlock(key string) error {
	_, err := etcd_client.Delete(context.TODO(), key)
	return err
}

func main() {
	// 初始化ETCD客户端
	err := init_etcd_client()
	if err != nil {
		log.Fatal(err)
	}
	defer etcd_client.Close()

	// 锁定资源
	key := "mykey"
	value := "myvalue"
	lease_ttl := int64(20) // 租约时间,单位秒
	err = lock(key, value, lease_ttl)
	if err != nil {
		log.Fatal(err)
	}

	// 业务逻辑过程
	time.Sleep(60 * time.Second)

	// 释放锁
	err = unlock(key)
	if err != nil {
		log.Fatal(err)
	}
}
0条评论
0 / 1000
j****m
3文章数
0粉丝数
j****m
3 文章 | 0 粉丝
j****m
3文章数
0粉丝数
j****m
3 文章 | 0 粉丝
原创

ETCD实现分布式锁

2023-09-18 01:41:59
29
0

使用ETCD实现分布式锁的流程如下:

STEP1:初始化ETCD客户端:创建ETCD客户端,用于与ETCD集群通信。配置参数一般为ETCD服务器的地址、连接超时等。

STEP2:创建租约:租约使键值对具有有限生命周期。当客户端挂掉后,保证键值对(锁)能够在过期时间后被清除。

STEP3:获取锁:通过ETCD事务操作,尝试通过在特定的键下创建一个临时顺序节点。只有成功创建了这个节点的客户端才能获得锁。如果多个客户端同时尝试获取锁,ETCD根据节点的顺序来确定哪个客户端获得锁。

STEP4:自动续约(异步进行):获取锁后,需要定期续租租约以保持锁的持续性。通过设置一个周期性的续租机制,确保锁不会因租约过期而被释放。(仅当客户端主动释放或续约过程中断后才会因租约到期被释放)

STEP5:执行业务逻辑:客户端获得锁后,继续执行需要加锁的业务逻辑。在执行期间,自动续约机制确保不会释放锁,但自动续约机制会受网络、IO、时间同步等因素而中断。因此,业务逻辑层除了执行业务需求,也需要考虑到自动续约失败后,怎么进行异常处理。

STEP6:释放锁:当加锁的业务逻辑执行完毕时,需要手动释放锁。

STEP7:关闭ETCD客户端。

注意:本文实现的ETCD分布式锁,主要体现实现过程。实际应用中,还需要在业务层考虑到异常处理等情况。

 

流程图:

 

GO语言代码实现:

package main

import (
	"context"
	"fmt"
	"log"
	"time"
	"go.etcd.io/etcd/client/v3"
)

var (
	etcd_client *clientv3.Client
)

// init_etcd_client 初始化ETCD客户端
func init_etcd_client() error {
	client, err := clientv3.New(clientv3.Config{
		endpoints: []string{"*************"}, // ETCD服务器地址:端口
		dial_timeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}
	etcd_client = client
	return nil
}

// lock 
func lock(key, value string, lease_ttl int64) error {
	// 创建一个ETCD租约
	lease_resp, err := etcd_client.Grant(context.TODO(), lease_ttl)
	if err != nil {
		return err
	}
	leaseID := lease_resp.ID

	// 定期续租以确保锁的持续性
	keepalive_ch, err := etcd_client.KeepAlive(context.TODO(), leaseID)
	if err != nil {
		return err
	}

	// 开启一个协程监控续租情况
	go func() {
		for {
			select {
			case _, is_ok := <-keepalive_ch:
				if !is_ok {
					fmt.Println("Lease keepalive failed.")
					/*
						这里需要考虑到,自动续约过程失败的情况,结合业务逻辑,
						当自动续约失败后,即表示在业务执行过程中,丢失了锁,
						需要制定相应的措施,重试或者业务回滚等。
					*/
					
					return
				}
			}
		}
	}()

	// 尝试获取锁
	_, err = etcd_client.Put(context.TODO(), key, value, clientv3.WithLease(leaseID))
	if err != nil {
		return err
	}

	return nil
}

// unlock 释放分布式锁
func unlock(key string) error {
	_, err := etcd_client.Delete(context.TODO(), key)
	return err
}

func main() {
	// 初始化ETCD客户端
	err := init_etcd_client()
	if err != nil {
		log.Fatal(err)
	}
	defer etcd_client.Close()

	// 锁定资源
	key := "mykey"
	value := "myvalue"
	lease_ttl := int64(20) // 租约时间,单位秒
	err = lock(key, value, lease_ttl)
	if err != nil {
		log.Fatal(err)
	}

	// 业务逻辑过程
	time.Sleep(60 * time.Second)

	// 释放锁
	err = unlock(key)
	if err != nil {
		log.Fatal(err)
	}
}
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0