我们程序中多多少少会有一些共享的资源或者数据,在某些时候我们需要保证同一时间只能有一个线程访问或者操作它们。在传统的单机部署的情况下,我们简单的使用 Java 提供的并发相关的 API 处理即可。但是现在大多数服务都采用分布式的部署方式,我们就需要提供一个跨进程的互斥机制来控制共享资源的访问,这种互斥机制就是我们所说的分布式锁。
互斥性。在任时刻,只有一个客户端能持有锁。
不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。这个其实只要我们给锁加上超时时间即可。
具有容错性。只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。
解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
1 2 3 4 5 6 7 8 | @Documented@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)public @interface DistributeLock {String key();long timeout() default 5;TimeUnit timeUnit() default TimeUnit.SECONDS;} |
其中,key 为分布式所的 key 值,timeout 为锁的超时时间,默认为
5,timeUnit 为超时时间的单位,默认为秒。
由于注解属性在指定的时候只能为常量,我们无法直接使用方法的参数。而在绝大多数的情况下分布式锁的 key
值是需要包含方法的一个或者多个参数的,这就需要我们将这些参数的位置以某种特殊的字符串表示出来,然后通过参数解析器去动态的解析出来这些参数具体的值,然后拼接到 key 上。在本教程中我也编写了一个参数解析器 AnnotationResolver。篇幅原因,其源码就不直接粘在文中,需要的读者可以查看源码。
package cn.itweknow.sbaop.annotation;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Component
public class AnnotationResolver {
public String resolver(JoinPoint joinpoint, String toResolverStr) throws Exception {
if (StringUtils.isEmpty(toResolverStr)) {
return null;
}
StringBuffer sb = new StringBuffer();
String pattern = "#\\{(.+?)\\}";
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(toResolverStr);
while (m.find()) {
String key = m.group().replaceAll("#\\{", "").replaceAll("\\}", "");
if (key.contains(".")) {
m.appendReplacement(sb, complexResolver(joinpoint, key));
} else {
m.appendReplacement(sb, simpleResolver(joinpoint, key));
}
}
return sb.toString();
}
private String simpleResolver(JoinPoint joinPoint, String str) {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
String[] names = methodSignature.getParameterNames();
Object[] args = joinPoint.getArgs();
for (int i = 0; i < names.length; i++) {
if (str.equals(names[i])) {
return args[i].toString();
}
}
return null;
}
private String complexResolver(JoinPoint joinPoint, String str) throws Exception {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
String[] names = methodSignature.getParameterNames();
Object[] args = joinPoint.getArgs();
String[] strs = str.split("\\.");
for (int i = 0; i < names.length; i++) {
if (strs[0].equals(names[i])) {
Object obj = args[i];
Method dmethod = obj.getClass().getDeclaredMethod(getMethodName(strs[1]), null);
Object value = dmethod.invoke(args[i]);
// todo 空指针问题
return getValue(value, 1, strs).toString();
}
}
return null;
}
private Object getValue(Object obj, int index, String[] strs) {
try {
if (obj != null && index < strs.length - 1) {
Method method = obj.getClass().getDeclaredMethod(getMethodName(strs[index + 1]), null);
obj = method.invoke(obj);
getValue(obj, index + 1, strs);
}
return obj;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
private String getMethodName(String name) {
return "get" + name.replaceFirst(name.substring(0, 1), name.substring(0, 1).toUpperCase());
}
}1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | private String getLock(String key, long timeout, TimeUnit timeUnit) {try {String value = UUID.randomUUID().toString();Boolean lockStat = stringRedisTemplate.execute((RedisCallback< Boolean>)connection ->connection.set(key.getBytes(Charset.forName("UTF-8")), value.getBytes(Charset.forName("UTF-8")),Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));if (!lockStat) {// 获取锁失败。return null;}return value;} catch (Exception e) {logger.error("获取分布式锁失败,key={}", key, e);return null;}} |
RedisStringCommands.SetOption.SET_IF_ABSENT 实际上是使用了 setNX 命令,如果 key 已经存在的话则不进行任何操作返回失败,如果 key 不存在的话则保存 key
并返回成功,该命令在成功的时候返回 1,结束的时候返回 0。我们随机产生了一个 value
并且在获取锁成功的时候返回出去,是为了在释放锁的时候对该值进行比较,这样可以做到解铃还须系铃人,由谁创建的锁就由谁释放。同时还指定了超时时间,这样可以保证锁释放失败的情况下不会造成接口永远不能访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 | private void unLock(String key, String value) {try {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";boolean unLockStat = stringRedisTemplate.execute((RedisCallback< Boolean>)connection ->connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1,key.getBytes(Charset.forName("UTF-8")), value.getBytes(Charset.forName("UTF-8"))));if (!unLockStat) {logger.error("释放分布式锁失败,key={},已自动超时,其他线程可能已经重新获取锁", key);}} catch (Exception e) {logger.error("释放分布式锁失败,key={}", key, e);}} |
切点和 Web 日志处理的切点一样,这里不再赘述。我们在切面中使用的通知类型为 @Around,在切点之前我们先尝试获取锁,若获取锁失败则直接返回错误信息,若获取锁成功则执行方法体,当方法结束后(无论是正常结束还是异常终止)释放锁。
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。