分布式系统中,多个节点之间的通信是不可避免的,而这种通信会导致数据一致性问题。如果多个节点同时对同一个共享资源进行操作,就会出现竞争的情况,进而导致数据不一致性。分布式锁可以解决这个问题,它可以确保同一时刻只有一个节点能够访问共享资源,从而保证数据的一致性。分布式锁的实现方式有很多,例如基于数据库、基于redis、基于ZooKeeper等。无论采用何种方式,分布式锁都是分布式系统中非常重要的一种同步机制,它可以保证数据的一致性,提高系统的可靠性和稳定性。
下面将分别介绍这些锁的实现方式和Java代码示例。
-
基于数据库实现分布式锁
基于数据库实现分布式锁的方式是将锁信息存储在数据库中,通过对数据库中的数据进行读写操作来实现锁的获取和释放。通常,锁信息包括锁名称、锁状态、锁拥有者等信息。
下面是使用MySQL实现分布式锁的Java代码示例:
public class MysqlLock {
private static final String LOCK_NAME = "lock_name" ;
private static final String LOCKED = "locked";
private static final String UNLOCKED = "unlocked" ;
private DataSource dataSource;
public MysqlLock(DataSource dataSource) {
this.dataSource = dataSource;
}
public boolean lock() {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
String sql = "select * from lock_table where lock_name = ? for update";
ps = conn.prepareStatement(sql);
ps.setString(1, LOCK_NAME);
rs = ps.executeQuery();
if (rs.next()) {
String status = rs.getString("status")
if (LOCKED.equals(status)) {
return false;
} else {
sql = "update lock_table set status = ? where lock_name = ?";
ps = conn.prepareStatement(sql);
ps.setString(1, LOCKED);
ps.setString(2, LOCK_NAME);
ps.executeUpdate();
conn.commit();
return true;
}
} else {
sql = "insert into lock_table(lock_name, status) values(?, ?)" ;
ps = conn.prepareStatement(sql);
ps.setString(1, LOCK_NAME);
ps.setString(2, LOCKED);
ps.executeUpdate();
conn.commit();
return true;
}
} catch (SQLException e) {
try {
if (conn != null) {
conn.rollback();
}
} catch (SQLException ex) {
ex.printStackTrace();
}
e.printStackTrace();
return false;
} finally {
try {
if (rs != null) {
rs.close();
}
if (ps != null) {
ps.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
public void unlock() {
Connection conn = null;
PreparedStatement ps = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
String sql = "update lock_table set status = ? where lock_name = ?";
ps = conn.prepareStatement(sql);
ps.setString(1, UNLOCKED);
ps.setString(2, LOCK_NAME);
ps.executeUpdate();
conn.commit();
} catch (SQLException e) {
try {
if (conn != null) {
conn.rollback();
}
} catch (SQLException ex) {
ex.printStackTrace();
}
e.printStackTrace();
} finally {
try {
if (ps != null) {
ps.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
-
基于Redis实现分布式锁
Redis分布式锁是一种基于Redis实现的分布式锁,可以保证在分布式环境下多个进程或线程对共享资源的互斥访问。它的实现基于Redis的原子操作和过期时间机制。
实现步骤:
-
获取锁:使用SETNX命令在Redis中创建一个锁,如果返回值为1,表示获取锁成功,否则获取锁失败。
-
设置过期时间:为了防止锁被长时间占用而导致死锁,需要给锁设置一个过期时间,使用EXPIRE命令设置锁的过期时间。
-
释放锁:使用DEL命令删除锁。
需要注意的是,在释放锁之前需要判断当前线程是否持有锁,防止误删其他线程的锁。
下面是使用Redis实现分布式锁的Java代码示例:
public class RedisDistributedLock {
private static final String LOCK_KEY_PREFIX = "lock:";
private static final int LOCK_EXPIRE_TIME = 60;
private static final int MAX_RETRY_TIMES = 3;
private static final int RETRY_INTERVAL = 100;
private JedisPool jedisPool;
public RedisDistributedLock(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public boolean lock(String key) {
String lockKey = LOCK_KEY_PREFIX + key;
Jedis jedis = null;
int retryTimes = 0;
try {
jedis = jedisPool.getResource();
while (retryTimes < MAX_RETRY_TIMES) {
long result = jedis.setnx(lockKey, "1");
if (result == 1) {
jedis.expire(lockKey, LOCK_EXPIRE_TIME);
return true;
}
retryTimes++;
Thread.sleep(RETRY_INTERVAL);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
}
return false;
}
public void unlock(String key) {
String lockKey = LOCK_KEY_PREFIX + key;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String value = jedis.get(lockKey);
if (value != null && value.equals("1" {
jedis.del(lockKey);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
使用示例:
JedisPool jedisPool = new JedisPool("localhost" 6379);
RedisDistributedLock lock = new RedisDistributedLock(jedisPool);
String key = "test";
if (lock.lock(key)) {
try {
// 执行业务逻辑
} finally {
lock.unlock(key);
}
} else {
// 获取锁失败,处理异常情况
}
-
基于Zookeeper实现分布式锁
基于Zookeeper实现分布式锁的方式是将锁信息存储在Zookeeper中,通过对Zookeeper中的节点进行创建、删除、监听等操作来实现锁的获取和释放。通常,锁信息包括锁名称、锁拥有者、锁节点等信息。
下面是使用Zookeeper实现分布式锁的Java代码示例:
public class ZookeeperLock {
private static final String LOCK_PREFIX = "lock_" ;
private ZooKeeper zooKeeper;
public ZookeeperLock(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public boolean lock(String lockName, String lockOwner) {
try {
String lockPath = LOCK_PREFIX + lockName;
String nodePath = zooKeeper.create(lockPath + " lockOwner.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List> nodeList = zooKeeper.getChildren(lockPath, false);
Collections.sort(nodeList);
String firstNode = nodeList.get(0);
if (nodePath.endsWith(firstNode)) {
return true;
} else {
String prevNode = nodeList.get(nodeList.indexOf(nodePath.substring(lockPath.length() + 1)) - 1);
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zooKeeper.exists(lockPath + " + prevNode, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
}
});
if (stat != null) {
latch.await();
return true;
} else {
return false;
}
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void unlock(String lockName, String lockOwner) {
try {
String lockPath = LOCK_PREFIX + lockName;
String nodePath = zooKeeper.create(lockPath + " lockOwner.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
zooKeeper.delete(nodePath, -1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
总结:
在实际开发中,分布式锁是一种常见的解决并发问题的方式。但是,不同的应用场景可能需要不同的分布式锁实现方式,同时,分布式锁的性能和可靠性也是需要考虑的问题,例如锁的粒度、锁的超时等。因此,在选择分布式锁实现方式时,需要综合考虑各种因素,以达到最佳的性能和可靠性。