Java代码操作ZooKeeper(使用原生 ZooKeeper 客户端库)
1.6.1 连接Zookeeper
1.6.1.1 添加 Maven 依赖
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version>
</dependency>
1.6.1.2 编写 Java 代码
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;public class ConnectZookeeper {private static final String ZK_HOST = "192.168.200.138:2181";private static final int CONNECT_TIMEOUT = 20000; // 设置超时时间需要合理public static void main(String[] args) throws IOException, InterruptedException {// 创建一个ZooKeeper客户端实例ZooKeeper zk = new ZooKeeper(ZK_HOST, CONNECT_TIMEOUT, event -> {System.out.println("Watcher的process被调用了");if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {System.out.println("连接成功");}if (event.getState() == Watcher.Event.KeeperState.Closed) {System.out.println("连接关闭");}});System.out.println("客户端开始连接ZK服务器");// 等待连接到ZooKeeperZooKeeper.States state = zk.getState();System.out.println(state);Thread.sleep(20000);state = zk.getState();System.out.println(state);Thread.sleep(20000);zk.close();}
}
可以看到如下运行结果:
客户端开始连接ZK服务器
CONNECTING
Watcher的process被调用了
连接成功
CONNECTED
Watcher的process被调用了
连接关闭
1.6.2 增删改查操作及Watcher监听
import org.apache.zookeeper.*;import java.io.IOException;public class ZookeeperCRUD {private static final String ZK_HOST = "192.168.200.138:2181";private static final int CONNECT_TIMEOUT = 20000; // 设置超时时间需要合理private static final String PATH = "/my_crud_node";public static void main(String[] args) throws IOException, InterruptedException, KeeperException {// 创建一个ZooKeeper客户端实例ZooKeeper zk = new ZooKeeper(ZK_HOST, CONNECT_TIMEOUT, event -> {if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {System.out.println("连接成功");}// 监听数据变化if (event.getType()== Watcher.Event.EventType.NodeDataChanged) {System.out.println("数据被改变");}if (event.getType()== Watcher.Event.EventType.NodeDeleted) {System.out.println("节点已删除");}if (event.getState() == Watcher.Event.KeeperState.Closed) {System.out.println("连接关闭");}});ZooKeeper.States state = zk.getState();System.out.println(state);Thread.sleep(20000);state = zk.getState();System.out.println(state);Thread.sleep(2000);//对节点进行增删改查// PERSISTENT 节点是一种持久化的节点类型,它在客户端会话结束时不会自动删除,而是会一直保留在 ZooKeeper 中,直到被显式删除zk.create(PATH, "I like ZooKeeper".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);byte[] data;data = zk.getData(PATH, true, null);System.out.println(new String(data));zk.setData(PATH, "I like Java".getBytes(), -1);// 第二个参数设置成true,表示开启监听data = zk.getData(PATH, true, null);System.out.println(new String(data));zk.delete(PATH, -1);zk.close();}
}