public class DistributedLock { private final ZooKeeper zk; private final String lockBasePath; private final String lockName; private String lockPath; public DistributedLock(ZooKeeper zk, String lockBasePath, String lockName) { this.zk = zk; this.lockBasePath = lockBasePath; this.lockName = lockName; } public void lock() throws IOException { try { // lockPath will be different than (lockBasePath + "/" + lockName) becuase of the sequence number ZooKeeper appends lockPath = zk.create(lockBasePath + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); final Object lock = new Object(); // The requests in the same jvm will be blocked here waiting for wait() or notifyAll(). This will prevent missing notifications. synchronized(lock) { while(true) { List nodes = zk.getChildren(lockBasePath, new Watcher() { @Override public void process(WatchedEvent event) { synchronized (lock) { // When the brother nodes are changed, all waiting threads will be notified. lock.notifyAll(); } } }); Collections.sort(nodes); // ZooKeeper node names can be sorted lexographically if (lockPath.endsWith(nodes.get(0))) { return; } else { // This will give up the lock and wait the next notification. When woken up, it will go through the WHILE block again lock.wait(); } } } } catch (KeeperException e) { throw new IOException (e); } catch (InterruptedException e) { throw new IOException (e); } } public void unlock() throws IOException { try { // This will trigger the Watcher.process() zk.delete(lockPath, -1); lockPath = null; } catch (KeeperException e) { throw new IOException (e); } catch (InterruptedException e) { throw new IOException (e); } }}
Jedis实现的队列
利用 Redis 的 LIST 类型数据的 RPUSH 和 BLPOP 方法实现消息的生产和消费
/**
 * Appends the given strings to the tail of the Redis list stored under {@code getId()}.
 *
 * @param value strings to append
 * @return the result of RPUSH, or -1 when {@code value} is null or empty (nothing is sent)
 */
public long rpush(final String... value) {
    // Same guard as rpushObject(Object...): an RPUSH without values is rejected by the server.
    if (value == null || value.length == 0) {
        return -1;
    }
    return (Long) execute((Jedis jedis) -> jedis.rpush(getId(), value));
}

/**
 * Serializes a single object with {@code SerializeUtil} and appends it to the list.
 *
 * <p>NOTE(review): the key bytes come from {@code getId().getBytes()}, i.e. the platform default
 * charset — confirm this matches how the reading side builds its key.
 *
 * @param value object to append
 * @return the result of RPUSH, or -1 when {@code value} is null
 */
public long rpushObject(final Object value) {
    if (value == null) {
        return -1;
    }
    return (Long) execute((Jedis jedis) -> jedis.rpush(getId().getBytes(), SerializeUtil.serialize(value)));
}

/**
 * Serializes the given objects with {@code SerializeUtil} and appends the result to the list.
 *
 * <p>NOTE(review): the whole array is passed to a single {@code serialize} call, so it looks
 * like one list entry holding the array is pushed rather than one entry per element — confirm
 * against the consumer before changing.
 *
 * @param value objects to append
 * @return the result of RPUSH, or -1 when {@code value} is null or empty
 */
public long rpushObject(final Object... value) {
    if (value == null || value.length == 0) {
        return -1;
    }
    return (Long) execute((Jedis jedis) -> jedis.rpush(getId().getBytes(), SerializeUtil.serialize(value)));
}

/**
 * Issues a blocking left pop (BLPOP) on the list stored under {@code getId()}.
 *
 * @param timeout timeout passed straight to {@code jedis.blpop}
 * @return whatever {@code jedis.blpop} returns for this key
 */
public List blpop(int timeout) {
    return (List) execute((Jedis jedis) -> jedis.blpop(timeout, getId()));
}

// NOTE(review): the declaration below is cut off in this file; its remainder is not visible here.
public List
业务中使用队列
/** Appends a string to the tail of the Redis list {@code id}; delegates to {@code rpush}. */
@Override
public long lRpush(String id, String value) {
    return factory.getList(id).rpush(value);
}

/** Appends an object to the tail of the Redis list {@code id}; delegates to {@code rpushObject}. */
@Override
public long lRpushObject(String id, Object value) {
    return factory.getList(id).rpushObject(value);
}

/** Blocking left pop on the Redis list {@code id}; delegates to {@code blpop}. */
@Override
public List lBlpop(String id, int timeout) {
    return factory.getList(id).blpop(timeout);
}

/** Blocking left pop of an object from the Redis list {@code id}; delegates to {@code blpopObject}. */
@Override
public List lBlpopObject(String id, int timeout) {
    return factory.getList(id).blpopObject(timeout);
}

/*
 * =========================================
 */

/**
 * Enqueues a work item unless an identical one is already pending.
 *
 * <p>The compressed item is recorded in the set {@code REDIS_SET_TRANS} as a "pending" marker and
 * then pushed to the list {@code REDIS_QUEUE_TRANS}. If the push throws, the marker is removed
 * again so the same item can be submitted later instead of being rejected as a duplicate forever.
 *
 * <p>NOTE(review): the membership check and the add are two separate calls, so two concurrent
 * producers could both pass the check — confirm whether that matters for callers.
 *
 * @param type item type stored in the {@code QueueItemDTO}
 * @param id   item id stored in the {@code QueueItemDTO}
 * @return the value returned by the push, or 0 when the item was already pending
 */
@Override
public long pushToQueue(int type, String id) {
    QueueItemDTO item = new QueueItemDTO(type, id);
    String value = JacksonUtils.compressObject(item);
    if (redisService.sIsMember(REDIS_SET_TRANS, value)) {
        logger.info("Item:{} exists in queue, skip.", value);
        return 0;
    }
    redisService.sAdd(REDIS_SET_TRANS, value);
    long size;
    try {
        size = redisService.lRpush(REDIS_QUEUE_TRANS, value);
    } catch (RuntimeException e) {
        // Roll back the marker; otherwise the item is in the set but not in the queue.
        try {
            redisService.sRemove(REDIS_SET_TRANS, value);
        } catch (RuntimeException cleanupFailure) {
            e.addSuppressed(cleanupFailure);
        }
        throw e;
    }
    logger.info("Request:{} pushed to queue. size:{}", value, size);
    return size;
}

/**
 * Pops the next work item from {@code REDIS_QUEUE_TRANS}, using a blocking pop with timeout 5.
 *
 * <p>On success the item's "pending" marker is removed from {@code REDIS_SET_TRANS} before the
 * item is returned, so the same item may be enqueued again from that point on.
 *
 * @return the decoded item, or {@code null} when the pop returned null or fewer than two elements
 */
@Override
public QueueItemDTO readQueue() {
    // Element 0 is logged as the queue and element 1 as the popped payload.
    List<String> list = redisService.lBlpop(REDIS_QUEUE_TRANS, 5);
    if (list == null || list.size() <= 1) {
        return null;
    }
    logger.info("Queue:{}, pop:{}", list.get(0), list.get(1));
    redisService.sRemove(REDIS_SET_TRANS, list.get(1));
    return JacksonUtils.extractObject(list.get(1), QueueItemDTO.class);
}