Curator Api使用

Curator也是Zookeeper的一个Java客户端,比之前的Zookeeper Api提供了更好的封装,开发更加方便了

Curator相较于Zookeeper Api有以下优点:

  • Zookeeper Api获取连接是异步的,需要手动编写阻塞代码。而在Curator不需要
  • Curator提供了自动重连Session策略
  • Curator注册的监听器可以重复注册
  • Curator支持递归创建节点

Curator快速上手

引入Curator依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.0.0</version>
</dependency>

获取连接

CuratorFramework client = CuratorFrameworkFactory.builder()
        // 如果是集群,连接多个节点,用 , 分隔。192.168.159.129:2181,192.168.159.130:2181
        .connectString("192.168.159.129:2181")
        // session超时时间为5s
        .sessionTimeoutMs(5000)
        // 断开连接3s后进行一次重连
        .retryPolicy(new RetryOneTime(3000))
        // 以 /demo 节点为父节点,在这个节点下进行任意操作
        .namespace("/demo")
        .build();

// 打开连接
client.start();

// 对zookeeper进行操作
// ...

// 关闭连接
client.close();

上面 RetryOneTime 在失去连接3s之后只会进行一次重连,当然Curator还为我们提供了其它的重连规则

session重连策略

  • RetryPolicy retry Policy = new RetryOneTime(3000);
    • 说明:三秒后重连一次,只重连一次
  • RetryPolicy retryPolicy = new RetryNTimes(3, 3000);
    • 说明:每三秒重连一次,重连三次
  • RetryPolicy retryPolicy = new RetryUntilElapsed(10000, 3000);
    • 说明:每三秒重连一次,总等待时间超过个10秒后停止重连
  • RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3)
    • 说明:这个策略的重试间隔会越来越长,重连的时间间隔一般是随着重试的次数递增的
    • 公式:baseSleepTImeMs * Math.max(1,random.nextInt(1 << (retryCount + 1)))
      • retryCount = 3
      • baseSleepTImeMs = 1000

Curator基本操作

创建节点

通过 CuratorFrameworkcreate 方法创建 /demo/node1 节点,值为 李玲。如果需要递归创建节点,需要在链式方法中再加一个 creatingParentsIfNeeded() 方法。inBackground 回调方法可以做节点创建成功后要做的事情。

client.create()
    // 递归创建
    // .creatingParentsIfNeeded()
    // 创建模式和权限和zookeeper api一样
    .withMode(CreateMode.PERSISTENT)
    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    // 异步回调方法
    .inBackground(new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            // 异步回调
            // curatorEvent.getPath()   节点路径
            // curatorEvent.getType()   事件类型
        }
    })
    // ${namespace}/node1 => /demo/node1
    .forPath("/node1", "李玲".getBytes(StandardCharsets.UTF_8));

更新节点

client.setData()
    // 和zookeeper api一样,-1代表不把版本号作为更新条件
    .withVersion(-1)
    .forPath("/node1", "咯咯咯".getBytes(StandardCharsets.UTF_8));

删除节点

client.delete()
    .forPath("/node1");

读取节点

byte[] bytes = client.getData()
    .forPath("/node1");
System.out.println(new String(bytes, StandardCharsets.UTF_8));

判断节点是否存在

Stat stat = client.checkExists()
    .forPath("/node2");
// 不存在,Stat对象为null

Curator监听器

Curator的监听器可以重复监听事件,不需要再次注册

  • NodeCache :只监听自己的create、delete、set事件,使用 NodeCacheListener 实例监听事件

    监听 /demo/node1 节点

    NodeCache nodeCache = new NodeCache(client, "/node1");
    CountDownLatch countDownLatch = new CountDownLatch(1);
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            ChildData childData = nodeCache.getCurrentData();
            System.out.println("--------------new----------------");
            System.out.println(childData.getPath());
            System.out.println(new String(childData.getData(), StandardCharsets.UTF_8));
        }
    });
    nodeCache.start();
    System.out.println("注册完成");
    countDownLatch.await();
    nodeCache.close();
    
  • PathChildrenCache:只能监听子节点,但是无法对子节点的子节点进行监听,也不能监听自己,使用 PathChildrenCacheEvent 实例监听事件

    第三个参数代表是否可以获取到子节点的数据,如果是false,那么childData.getData()为空

    PathChildrenCache nodeCache = new PathChildrenCache(client, "/node1", true);
    CountDownLatch countDownLatch = new CountDownLatch(1);
    nodeCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            ChildData childData = event.getData();
            System.out.println("--------------new----------------");
            System.out.println(childData.getPath());
            System.out.println(new String(childData.getData(), StandardCharsets.UTF_8));
        }
    });
    nodeCache.start();
    System.out.println("注册完成");
    countDownLatch.await();
    nodeCache.close();
    
  • TreeCache:综合NodeCache和PathChildrenCahce的特性,既可以监听自己又可以监听子节点,同时还可以设置子节点的监听深度

    setCacheDataPathChildrenCache 第三个参数作用一样

    setMaxDepth 设置最大监听深度,比如这里只允许监听到 /demo/node1/a/b ,无法监听 /demo/node1/a/b/c

    CountDownLatch countDownLatch = new CountDownLatch(1);
    TreeCache treeCache = TreeCache.newBuilder(client, "/node1")
            .setCacheData(true)
            .setMaxDepth(2)
            .build();
    treeCache.start();
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            ChildData childData = event.getData();
            System.out.println("--------------new----------------");
            System.out.println(childData.getPath());
            System.out.println(new String(childData.getData(), StandardCharsets.UTF_8));
        }
    });
    countDownLatch.await();
    treeCache.close();
    

不过这些方法在 Curator 5.0.0版本中都被声明成不推荐的了,最新版本的Curator推荐我们使用 CuratorCache 这个类

image20200701164012813.png

image20200701164023404.png

image20200701164034201.png

CuratorCache 提供了比 TreeCache 更方便的实现和强大的功能,在event回调函数中,oldData表示之前的数据,data表示更新后的数据。

并且对子节点无限制深度的监听,还可以对自己的监听

CountDownLatch countDownLatch = new CountDownLatch(1);
CuratorCache curatorCache = CuratorCache.build(client, "/node1");
curatorCache.start();
curatorCache.listenable().addListener(new CuratorCacheListener() {
    @Override
    public void event(Type type, ChildData oldData, ChildData data) {
        System.out.println("--------------old----------------");
        System.out.println(oldData.getPath());
        System.out.println(new String(oldData.getData(), StandardCharsets.UTF_8));

        System.out.println("--------------new----------------");
        System.out.println(data.getPath());
        System.out.println(new String(data.getData(), StandardCharsets.UTF_8));
    }
});
countDownLatch.await();
curatorCache.close();

Curator事务

下面这两句代码,如果创建 /a 节点成功之后,zookeeper连接断开了,并且赋值操作失败了。如果想这两个操作是一个原子性的,要么都成功,要么都失败,所以需要用到Curator的事务操作。

client.create().withMode(CreateMode.PERSISTENT).forPath("/a");
client.setData().forPath("/a", "尬尬".getBytes(StandardCharsets.UTF_8));

改写成:

client.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath("/a")
    .and()
    .setData().forPath("/a", "尬尬".getBytes(StandardCharsets.UTF_8));

生成分布式ID

在过去的单库单表型系统中,通常第可以使用数据库字段自带的auto_ increment属性来自动为每条记录生成个唯一的ID。但是分库分表后,就无法在依靠数据库的auto_ increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID

zookeeper生成自增ID的原理就是每次创建一个有序的临时节点

create -s -e /node_

/node_ 后面就是10位的自增数值,每次当session断开时,这个临时节点就会自动销毁

client.create()
    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
    .forPath("/node_");

生成 /node_0000000001 的空值节点,截取掉前面的 node_ 就是id了

分布式锁

设计思路

  1. 每个客户端往/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock_000000001
  2. 客户端取得/Locks下子节点,并进行排序,判断排在前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功
  3. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点Lock_000000002,那么则监听Lock_000000001
  4. 当前一位锁节点(Lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(Lock_000000002)的逻辑
  5. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁

如果用的是Zookeeper Api的话,需要写很多代码,还好Curator为我们封装了这些复杂实现,避免了不必要的Bug

排它锁

排它锁在同一时间只能有一个客户端持有锁

InterProcessMutex lock = new InterProcessMutex(client, "/lock");
try {
    // 获取锁
    System.out.println("等待获取锁");
    lock.acquire();
    System.out.println("获取锁");
    // 等待3秒
    Thread.sleep(3000);
    // 释放锁
    lock.release();
    System.out.println("释放锁锁");
} catch (Exception e) {
    e.printStackTrace();
}

读写锁

多个客户端会同时获取读锁,只能同一事件获取写锁。也不能同时获取读锁和写锁。

/**
 * 读锁
 */
public static void readLock(CuratorFramework client) {
    // 获取读写锁
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
    // 获取读锁
    InterProcessMutex readLock = lock.readLock();

    // 获取锁
    System.out.println("等待获取锁");
    try {
        readLock.acquire();
        System.out.println("获取锁");
        for (int i = 0; i < 10; ++i) {
            Thread.sleep(1000);
            System.out.println(i);
        }
        // 释放锁
        readLock.release();
        System.out.println("释放锁锁");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * 写锁
 */
public static void writeLock(CuratorFramework client) {
    // 获取读写锁
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
    // 获取写锁
    InterProcessMutex writeLock = lock.writeLock();
    
    // 获取锁
    System.out.println("等待获取锁");
    try {
        writeLock.acquire();
        System.out.println("获取锁");
        for (int i = 0; i < 10; ++i) {
            Thread.sleep(1000);
            System.out.println(i);
        }
        // 释放锁
        writeLock.release();
        System.out.println("释放锁锁");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
# Zookeeper 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×