searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

常见分布式锁介绍和对应实现

2023-05-26 02:34:00
5
0

分布式系统中,多个节点之间的通信是不可避免的,而这种通信会导致数据一致性问题。如果多个节点同时对同一个共享资源进行操作,就会出现竞争的情况,进而导致数据不一致性。分布式锁可以解决这个问题,它可以确保同一时刻只有一个节点能够访问共享资源,从而保证数据的一致性。分布式锁的实现方式有很多,例如基于数据库、基于redis、基于ZooKeeper等。无论采用何种方式,分布式锁都是分布式系统中非常重要的一种同步机制,它可以保证数据的一致性,提高系统的可靠性和稳定性。

下面将分别介绍这些锁的实现方式和Java代码示例。

  1. 基于数据库实现分布式锁

基于数据库实现分布式锁的方式是将锁信息存储在数据库中,通过对数据库中的数据进行读写操作来实现锁的获取和释放。通常,锁信息包括锁名称、锁状态、锁拥有者等信息。

下面是使用MySQL实现分布式锁的Java代码示例:

public class MysqlLock {

    private static final String LOCK_NAME = "lock_name" ;
    private static final String LOCKED = "locked";
    private static final String UNLOCKED = "unlocked" ;
    private DataSource dataSource;

    public MysqlLock(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public boolean lock() {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = dataSource.getConnection();
            conn.setAutoCommit(false);
            String sql = "select * from lock_table where lock_name = ? for update";
            ps = conn.prepareStatement(sql);
            ps.setString(1, LOCK_NAME);
            rs = ps.executeQuery();
            if (rs.next()) {
                String status = rs.getString("status")
                    if (LOCKED.equals(status)) {
                    return false;
                } else {
                    sql = "update lock_table set status = ? where lock_name = ?";
                    ps = conn.prepareStatement(sql);
                    ps.setString(1, LOCKED);
                    ps.setString(2, LOCK_NAME);
                    ps.executeUpdate();
                    conn.commit();
                    return true;
                }
            } else {
                sql = "insert into lock_table(lock_name, status) values(?, ?)" ;
                ps = conn.prepareStatement(sql);
                ps.setString(1, LOCK_NAME);
                ps.setString(2, LOCKED);
                ps.executeUpdate();
                conn.commit();
                return true;
            }
        } catch (SQLException e) {
            try {
                if (conn != null) {
                    conn.rollback();
                }
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
            e.printStackTrace();
            return false;
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (ps != null) {
                    ps.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public void unlock() {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = dataSource.getConnection();
            conn.setAutoCommit(false);
            String sql = "update lock_table set status = ? where lock_name = ?";
            ps = conn.prepareStatement(sql);
            ps.setString(1, UNLOCKED);
            ps.setString(2, LOCK_NAME);
            ps.executeUpdate();
            conn.commit();
        } catch (SQLException e) {
            try {
                if (conn != null) {
                    conn.rollback();
                }
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
            e.printStackTrace();
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}



  1. 基于Redis实现分布式锁

Redis分布式锁是一种基于Redis实现的分布式锁,可以保证在分布式环境下多个进程或线程对共享资源的互斥访问。它的实现基于Redis的原子操作和过期时间机制。

实现步骤:

  1. 获取锁:使用SETNX命令在Redis中创建一个锁,如果返回值为1,表示获取锁成功,否则获取锁失败。

  2. 设置过期时间:为了防止锁被长时间占用而导致死锁,需要给锁设置一个过期时间,使用EXPIRE命令设置锁的过期时间。

  3. 释放锁:使用DEL命令删除锁。

需要注意的是,在释放锁之前需要判断当前线程是否持有锁,防止误删其他线程的锁。

下面是使用Redis实现分布式锁的Java代码示例:

public class RedisDistributedLock {

    private static final String LOCK_KEY_PREFIX = "lock:";
    private static final int LOCK_EXPIRE_TIME = 60; 
    private static final int MAX_RETRY_TIMES = 3; 
    private static final int RETRY_INTERVAL = 100; 

    private JedisPool jedisPool;

    public RedisDistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public boolean lock(String key) {
        String lockKey = LOCK_KEY_PREFIX + key;
        Jedis jedis = null;
        int retryTimes = 0;
        try {
            jedis = jedisPool.getResource();
            while (retryTimes < MAX_RETRY_TIMES) {
                long result = jedis.setnx(lockKey, "1");
                if (result == 1) {
                    jedis.expire(lockKey, LOCK_EXPIRE_TIME);
                    return true;
                }
                retryTimes++;
                Thread.sleep(RETRY_INTERVAL);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return false;
    }

    public void unlock(String key) {
        String lockKey = LOCK_KEY_PREFIX + key;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String value = jedis.get(lockKey);
            if (value != null && value.equals("1" {
                jedis.del(lockKey);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}


使用示例:

JedisPool jedisPool = new JedisPool("localhost" 6379);
RedisDistributedLock lock = new RedisDistributedLock(jedisPool);
String key = "test";
if (lock.lock(key)) {
    try {
        // 执行业务逻辑
    } finally {
        lock.unlock(key);
    }
} else {
    // 获取锁失败,处理异常情况
}
 
  1. 基于Zookeeper实现分布式锁

基于Zookeeper实现分布式锁的方式是将锁信息存储在Zookeeper中,通过对Zookeeper中的节点进行创建、删除、监听等操作来实现锁的获取和释放。通常,锁信息包括锁名称、锁拥有者、锁节点等信息。

下面是使用Zookeeper实现分布式锁的Java代码示例:

public class ZookeeperLock {

    private static final String LOCK_PREFIX = "lock_" ;
    private ZooKeeper zooKeeper;

    public ZookeeperLock(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public boolean lock(String lockName, String lockOwner) {
        try {
            String lockPath = LOCK_PREFIX + lockName;
            String nodePath = zooKeeper.create(lockPath + " lockOwner.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            List> nodeList = zooKeeper.getChildren(lockPath, false);
            Collections.sort(nodeList);
            String firstNode = nodeList.get(0);
            if (nodePath.endsWith(firstNode)) {
                return true;
            } else {
                String prevNode = nodeList.get(nodeList.indexOf(nodePath.substring(lockPath.length() + 1)) - 1);
                CountDownLatch latch = new CountDownLatch(1);
                Stat stat = zooKeeper.exists(lockPath + " + prevNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeDeleted) {
                            latch.countDown();
                        }
                    }
                });
                if (stat != null) {
                    latch.await();
                    return true;
                } else {
                    return false;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    public void unlock(String lockName, String lockOwner) {
        try {
            String lockPath = LOCK_PREFIX + lockName;
            String nodePath = zooKeeper.create(lockPath + " lockOwner.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            zooKeeper.delete(nodePath, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

总结:

在实际开发中,分布式锁是一种常见的解决并发问题的方式。但是,不同的应用场景可能需要不同的分布式锁实现方式,同时,分布式锁的性能和可靠性也是需要考虑的问题,例如锁的粒度、锁的超时等。因此,在选择分布式锁实现方式时,需要综合考虑各种因素,以达到最佳的性能和可靠性。

0条评论
作者已关闭评论
金****栋
2文章数
0粉丝数
金****栋
2 文章 | 0 粉丝
金****栋
2文章数
0粉丝数
金****栋
2 文章 | 0 粉丝
原创

常见分布式锁介绍和对应实现

2023-05-26 02:34:00
5
0

分布式系统中,多个节点之间的通信是不可避免的,而这种通信会导致数据一致性问题。如果多个节点同时对同一个共享资源进行操作,就会出现竞争的情况,进而导致数据不一致性。分布式锁可以解决这个问题,它可以确保同一时刻只有一个节点能够访问共享资源,从而保证数据的一致性。分布式锁的实现方式有很多,例如基于数据库、基于redis、基于ZooKeeper等。无论采用何种方式,分布式锁都是分布式系统中非常重要的一种同步机制,它可以保证数据的一致性,提高系统的可靠性和稳定性。

下面将分别介绍这些锁的实现方式和Java代码示例。

  1. 基于数据库实现分布式锁

基于数据库实现分布式锁的方式是将锁信息存储在数据库中,通过对数据库中的数据进行读写操作来实现锁的获取和释放。通常,锁信息包括锁名称、锁状态、锁拥有者等信息。

下面是使用MySQL实现分布式锁的Java代码示例:

public class MysqlLock {

    private static final String LOCK_NAME = "lock_name" ;
    private static final String LOCKED = "locked";
    private static final String UNLOCKED = "unlocked" ;
    private DataSource dataSource;

    public MysqlLock(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public boolean lock() {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = dataSource.getConnection();
            conn.setAutoCommit(false);
            String sql = "select * from lock_table where lock_name = ? for update";
            ps = conn.prepareStatement(sql);
            ps.setString(1, LOCK_NAME);
            rs = ps.executeQuery();
            if (rs.next()) {
                String status = rs.getString("status")
                    if (LOCKED.equals(status)) {
                    return false;
                } else {
                    sql = "update lock_table set status = ? where lock_name = ?";
                    ps = conn.prepareStatement(sql);
                    ps.setString(1, LOCKED);
                    ps.setString(2, LOCK_NAME);
                    ps.executeUpdate();
                    conn.commit();
                    return true;
                }
            } else {
                sql = "insert into lock_table(lock_name, status) values(?, ?)" ;
                ps = conn.prepareStatement(sql);
                ps.setString(1, LOCK_NAME);
                ps.setString(2, LOCKED);
                ps.executeUpdate();
                conn.commit();
                return true;
            }
        } catch (SQLException e) {
            try {
                if (conn != null) {
                    conn.rollback();
                }
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
            e.printStackTrace();
            return false;
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (ps != null) {
                    ps.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public void unlock() {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = dataSource.getConnection();
            conn.setAutoCommit(false);
            String sql = "update lock_table set status = ? where lock_name = ?";
            ps = conn.prepareStatement(sql);
            ps.setString(1, UNLOCKED);
            ps.setString(2, LOCK_NAME);
            ps.executeUpdate();
            conn.commit();
        } catch (SQLException e) {
            try {
                if (conn != null) {
                    conn.rollback();
                }
            } catch (SQLException ex) {
                ex.printStackTrace();
            }
            e.printStackTrace();
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}



  1. 基于Redis实现分布式锁

Redis分布式锁是一种基于Redis实现的分布式锁,可以保证在分布式环境下多个进程或线程对共享资源的互斥访问。它的实现基于Redis的原子操作和过期时间机制。

实现步骤:

  1. 获取锁:使用SETNX命令在Redis中创建一个锁,如果返回值为1,表示获取锁成功,否则获取锁失败。

  2. 设置过期时间:为了防止锁被长时间占用而导致死锁,需要给锁设置一个过期时间,使用EXPIRE命令设置锁的过期时间。

  3. 释放锁:使用DEL命令删除锁。

需要注意的是,在释放锁之前需要判断当前线程是否持有锁,防止误删其他线程的锁。

下面是使用Redis实现分布式锁的Java代码示例:

public class RedisDistributedLock {

    private static final String LOCK_KEY_PREFIX = "lock:";
    private static final int LOCK_EXPIRE_TIME = 60; 
    private static final int MAX_RETRY_TIMES = 3; 
    private static final int RETRY_INTERVAL = 100; 

    private JedisPool jedisPool;

    public RedisDistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public boolean lock(String key) {
        String lockKey = LOCK_KEY_PREFIX + key;
        Jedis jedis = null;
        int retryTimes = 0;
        try {
            jedis = jedisPool.getResource();
            while (retryTimes < MAX_RETRY_TIMES) {
                long result = jedis.setnx(lockKey, "1");
                if (result == 1) {
                    jedis.expire(lockKey, LOCK_EXPIRE_TIME);
                    return true;
                }
                retryTimes++;
                Thread.sleep(RETRY_INTERVAL);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return false;
    }

    public void unlock(String key) {
        String lockKey = LOCK_KEY_PREFIX + key;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String value = jedis.get(lockKey);
            if (value != null && value.equals("1" {
                jedis.del(lockKey);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}


使用示例:

JedisPool jedisPool = new JedisPool("localhost" 6379);
RedisDistributedLock lock = new RedisDistributedLock(jedisPool);
String key = "test";
if (lock.lock(key)) {
    try {
        // 执行业务逻辑
    } finally {
        lock.unlock(key);
    }
} else {
    // 获取锁失败,处理异常情况
}
 
  1. 基于Zookeeper实现分布式锁

基于Zookeeper实现分布式锁的方式是将锁信息存储在Zookeeper中,通过对Zookeeper中的节点进行创建、删除、监听等操作来实现锁的获取和释放。通常,锁信息包括锁名称、锁拥有者、锁节点等信息。

下面是使用Zookeeper实现分布式锁的Java代码示例:

public class ZookeeperLock {

    private static final String LOCK_PREFIX = "lock_" ;
    private ZooKeeper zooKeeper;

    public ZookeeperLock(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public boolean lock(String lockName, String lockOwner) {
        try {
            String lockPath = LOCK_PREFIX + lockName;
            String nodePath = zooKeeper.create(lockPath + " lockOwner.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            List> nodeList = zooKeeper.getChildren(lockPath, false);
            Collections.sort(nodeList);
            String firstNode = nodeList.get(0);
            if (nodePath.endsWith(firstNode)) {
                return true;
            } else {
                String prevNode = nodeList.get(nodeList.indexOf(nodePath.substring(lockPath.length() + 1)) - 1);
                CountDownLatch latch = new CountDownLatch(1);
                Stat stat = zooKeeper.exists(lockPath + " + prevNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeDeleted) {
                            latch.countDown();
                        }
                    }
                });
                if (stat != null) {
                    latch.await();
                    return true;
                } else {
                    return false;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    public void unlock(String lockName, String lockOwner) {
        try {
            String lockPath = LOCK_PREFIX + lockName;
            String nodePath = zooKeeper.create(lockPath + " lockOwner.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            zooKeeper.delete(nodePath, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

总结:

在实际开发中,分布式锁是一种常见的解决并发问题的方式。但是,不同的应用场景可能需要不同的分布式锁实现方式,同时,分布式锁的性能和可靠性也是需要考虑的问题,例如锁的粒度、锁的超时等。因此,在选择分布式锁实现方式时,需要综合考虑各种因素,以达到最佳的性能和可靠性。

文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0