ZooKeeper Java客户端

这里介绍两个客户端,一个为 ZooKeeper 原生客户端,一个为 ZkClient

首先创建一个maven项目,pom文件中添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
<!-- 原生 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<!-- zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>

ZooKeeper 原生客户端

之前安装过程看到 ZooKeeper 依赖 jdk 环境,可以看出它的源码使用了Java语言基础,学习原生客户端对于以后看源码有帮助,接下来看一看使用方式

创建会话

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* @Author SunnyBear
* @Description 创建Session
*/
public class TestCreateSession {

/**
* ZooKeeper服务地址
*/
private final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

/**
* 会话超时时间
*/
private static final int SESSION_TIMEOUT = 30000;

public static void main(String[] args) throws IOException, InterruptedException {
new TestCreateSession().testSession1();
System.out.println("--------------------------华丽的分割线--------------------------");
new TestCreateSession().testSession2();
}

/**
* 获得session的方式,这种方式可能会在ZooKeeper还没有获得连接的时候就已经对ZK进行访问了
* 测试可以发现连接状态为CONNECTING,而不是CONNECTED
*/
public void testSession1() throws IOException {
ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, null);
System.out.println("zooKeeper: " + zooKeeper);
System.out.println("zooKeeper.getState(): " + zooKeeper.getState());
}

/**
* 发令枪
*/
private CountDownLatch connectedSemaphore = new CountDownLatch(1);

/**
* 使用发令枪对获得Session的方式进行优化,在ZooKeeper初始化完成以前先等待,等待完成后再进行后续操作
*/
public void testSession2() throws IOException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 状态为已连接时才进行后续操作
connectedSemaphore.countDown();
System.out.println("状态为已连接。。");
}
}
});
connectedSemaphore.await();
System.out.println("zooKeeper: " + zooKeeper);
System.out.println("zooKeeper.getState(): " + zooKeeper.getState());
}
}

基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/**
* @Author SunnyBear
* @Description 基本操作
*/
public class TestJavaApi implements Watcher {

private static final int SESSION_TIMEOUT = 10000;
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final String ZK_PATH = "/SunnyBearApiTest";
private static final String ZK_DATA = "我是SunnyBearApiTest的数据";
private ZooKeeper zk = null;

private CountDownLatch connectedSemaphore = new CountDownLatch(1);

/**
* 创建连接
* @param server zk服务器地址列表
* @param sessionTimeout session超时时间
*/
public void createConnection(String server, int sessionTimeout) {
try {
zk = new ZooKeeper(server, sessionTimeout, this);
connectedSemaphore.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 收到来自Server的Watcher通知,然后调用的方法处理
*/
public void process(WatchedEvent watchedEvent) {
System.out.println("收到事件通知:" + watchedEvent);
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
connectedSemaphore.countDown();
}
}

/**
* 关闭连接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 创建节点
* @param path 节点path
* @param data 初始数据内容
* @return 是否创建成功
*/
public boolean createPath(String path, String data) {
try {
/**
* ZooDefs.Ids.OPEN_ACL_UNSAFE:节点权限
* CreateMode.EPHEMERAL:节点类型为临时节点
*/
String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建节点path:" + createPath);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

/**
* 读取指定节点数据
* @param path 节点path
* @return 节点内容
*/
public String readData(String path) {
try {
String data = new String(this.zk.getData(path, false, null));
System.out.println("读取数据成功path:" + path + ",data:" + data);
return data;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}

/**
* 更新指定节点数据
* @param path 节点path
* @param data 数据data
* @return 是否成功
*/
public boolean writeData(String path, String data) {
try {
System.out.println("更新数据成功,path:" + path + ", stat: "
+ this.zk.setData(path, data.getBytes(), -1));
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

/**
* 删除节点
* @param path 节点path
* @return 是否成功
*/
public boolean deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println("删除节点成功,path:" + path);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

public static void main(String[] args) {
TestJavaApi testJavaApi = new TestJavaApi();
testJavaApi.createConnection(SERVER, SESSION_TIMEOUT);
if (testJavaApi.createPath(ZK_PATH, ZK_DATA)) {
System.out.println("创建后数据内容:" + testJavaApi.readData(ZK_PATH));
testJavaApi.writeData(ZK_PATH, "我是SunnyBearApiTest修改后的数据");
System.out.println("更新后数据内容:" + testJavaApi.readData(ZK_PATH));
testJavaApi.deleteNode(ZK_PATH);
}
testJavaApi.releaseConnection();
}
}

监听机制

Zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力,Watcher是一次性的,如果被触发了需要重新注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
/**
* @Author SunnyBear
* @Description 监听机制
*/
public class TestWatcher implements Watcher {

private AtomicInteger seq = new AtomicInteger();
private static final int SESSION_TIMEOUT = 100000;
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final String PARENT_PATH = "/testWatch";
private static final String CHILDREN_PATH = "/testWatch/children";
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zk = null;

/**
* 创建连接
* @param server zk服务器地址列表
* @param sessionTimeout session超时时间
*/
public void createConnection(String server, int sessionTimeout) {
try {
zk = new ZooKeeper(server, sessionTimeout, this);
connectedSemaphore.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 关闭连接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 创建节点
* @param path 节点path
* @param data 初始数据内容
* @return 是否创建成功
*/
public boolean createPath(String path, String data) {
try {
// 设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
this.zk.exists(path, true);
// 创建持久节点
String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("创建节点path:" + createPath);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

/**
* 读取指定节点数据
* @param path 节点path
* @param needWatch 表示是否需要注册一个watcher。true:注册默认watcher,false:不需要注册watcher
* @return 节点内容
*/
public String readData(String path, boolean needWatch) {
try {
String data = new String(this.zk.getData(path, needWatch, null));
System.out.println("读取数据成功path:" + path + ",data:" + data);
return data;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}

/**
* 更新指定节点数据
* @param path 节点path
* @param data 数据data
* @return 是否成功
*/
public boolean writeData(String path, String data) {
try {
System.out.println("更新数据成功,path:" + path + ", stat: "
+ this.zk.setData(path, data.getBytes(), -1));
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

/**
* 删除节点
* @param path 节点path
* @return 是否成功
*/
public boolean deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println("删除节点成功,path:" + path);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

/**
* 判断指定节点是否存在
* @param path 节点路径
* @param needWatch 表示是否需要注册一个watcher
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 获取子节点
* @param path 节点路径
* @param needWatch 表示是否需要注册一个watcher
*/
private List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
* 删除测试用的所有节点
*/
public void deleteAllTestPath() {
if(this.exists(CHILDREN_PATH, false) != null){
this.deleteNode(CHILDREN_PATH);
}
if(this.exists(PARENT_PATH, false) != null){
this.deleteNode(PARENT_PATH);
}
}

/**
* 收到来自Server的Watcher通知,然后调用的方法处理
*/
public void process(WatchedEvent watchedEvent) {
System.out.println("收到事件通知:" + watchedEvent);
try {
Thread.sleep(200);
if (watchedEvent == null) {
return;
}
// 连接状态
Event.KeeperState eventState = watchedEvent.getState();
// 事件类型
Event.EventType eventType = watchedEvent.getType();
// 受影响的path
String eventPath = watchedEvent.getPath();
// 打印一下监听的信息
String eventPrefix = "[Watch - " + this.seq.incrementAndGet() + "] ";
System.out.println(eventPrefix + "收到Watcher通知");
System.out.println(eventPrefix + "连接状态:\t" + eventState.toString());
System.out.println(eventPrefix + "事件类型:\t" + eventType.toString());

if (Event.KeeperState.SyncConnected == eventState) {
switch (eventType) {
case None:
System.out.println(eventPrefix + "成功连接zk服务器");
connectedSemaphore.countDown();
break;
case NodeCreated:
System.out.println(eventPrefix + "节点创建");
Thread.sleep(200);
this.zk.exists(eventPath, true);
break;
case NodeDataChanged:
System.out.println(eventPrefix + "节点数据更新");
Thread.sleep(200);
System.out.println(eventPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
break;
case NodeChildrenChanged:
System.out.println(eventPrefix + "子节点变更");
Thread.sleep(3000);
System.out.println(eventPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));
break;
case NodeDeleted:
System.out.println(eventPrefix + "节点 " + eventPath + " 被删除");
break;
default:
break;
}
} else if (Watcher.Event.KeeperState.Disconnected == eventState) {
System.out.println(eventPrefix + "与ZK服务器断开连接");
} else if (Watcher.Event.KeeperState.AuthFailed == eventState) {
System.out.println(eventPrefix + "权限检查失败");
} else if (Watcher.Event.KeeperState.Expired == eventState) {
System.out.println(eventPrefix + "会话失效");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
System.out.println("-------------------------------------------------------------");
}

public static void main(String[] args) throws InterruptedException {
TestWatcher zkWatch = new TestWatcher();
// 创建连接
zkWatch.createConnection(SERVER, SESSION_TIMEOUT);

Thread.sleep(1000);

// 删除所有节点
zkWatch.deleteAllTestPath();

if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
/**
* 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。watch是一次性的
* 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次
*/
System.out.println("------------------------读取PARENT------------------------");
zkWatch.readData(PARENT_PATH, true);
// 更新数据
zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

Thread.sleep(1000);

/**
* 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged,
* 也就是说创建子节点时没有watch,
* 如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1时watch,输出c1的NodeChildrenChanged,
* 而不会输出创建c2时的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,
* 其中path="/p/c1"
*/
System.out.println("------------------------读取CHILDREN------------------------");
zkWatch.getChildren(PARENT_PATH, true);

Thread.sleep(1000);
// 创建子节点
zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

Thread.sleep(1000);

zkWatch.readData(CHILDREN_PATH, true);
zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
}

Thread.sleep(20000);
// 清理节点
zkWatch.deleteAllTestPath();
Thread.sleep(1000);
zkWatch.releaseConnection();
}
}

ZkClient

基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* @Author SunnyBear
* @Description ZkClient基本操作
*/
public class TestZkClientApi {

private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

private static final int SESSION_TIMEOUT = 30000;

public static void main(String[] args) {
ZkClient zkClient = new ZkClient(SERVER, SESSION_TIMEOUT);

// 创建临时节点,值为null
zkClient.createEphemeral("/zkTemp");
// 创建持久节点,,值为null,如果父节点不存在则创建
zkClient.createPersistent("/zkPersistent/zk1", true);
// 创建持久节点,有值
zkClient.createPersistent("/zkPersistent/zk2", "zk2内容");
zkClient.createPersistent("/zkPersistent/zk3", "zk3内容");

// 查询节点下面的所有节点
List<String> childrenList = zkClient.getChildren("/zkPersistent");
for (String p : childrenList) {
String childrenPath = "/zkPersistent/" + p;
// 查询节点数据
String data = zkClient.readData(childrenPath);
System.out.println(childrenPath + ":" + data);
}

// 修改节点数据
zkClient.writeData("/zkPersistent/zk1", "给zk1更新了内容");
System.out.println(zkClient.readData("/zkPersistent/zk1"));

// 删除节点
zkClient.delete("/zkTemp");
// 递归删除,即包含子目录的删除
zkClient.deleteRecursive("/zkPersistent");

// 关闭连接
zkClient.close();
}
}

监听机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* @Author SunnyBear
* @Description ZkClient监听的测试
*/
public class ZkClientWatcher {

private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

private static final int SESSION_TIMEOUT = 30000;

public static void main(String[] args) throws InterruptedException {
// test1();
test2();
}

/**
* 订阅子节点的变化
*/
public static void test1() throws InterruptedException {
ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);

// 给父节点添加监听子节点变化
zkClient.subscribeChildChanges("/zkPersistent", new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChildList) throws Exception {
System.out.println("parentPath:" + parentPath);
System.out.println("currentChildList" + currentChildList);
}
});

Thread.sleep(3000);

zkClient.createPersistent("/zkPersistent");
Thread.sleep(1000);

zkClient.createPersistent("/zkPersistent/"+ "zk1", "zk1内容");
Thread.sleep(1000);

zkClient.createPersistent("/zkPersistent/" + "zk2", "zk2内容");
Thread.sleep(1000);

zkClient.delete("/super/c2");
Thread.sleep(1000);

zkClient.deleteRecursive("/super");
Thread.sleep(10000);

zkClient.close();
}

/**
* 订阅内容变化
*/
public static void test2() throws InterruptedException {
ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);

zkClient.createPersistent("/zkPersistent", "zkPersistentData");

// 对父节点添加监听子节点变化。
zkClient.subscribeDataChanges("/zkPersistent", new IZkDataListener() {

public void handleDataDeleted(String path) throws Exception {
System.out.println("删除的节点为:" + path);
}

public void handleDataChange(String path, Object data) throws Exception {
System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
}
});

Thread.sleep(3000);
zkClient.writeData("/zkPersistent", "zkPersistentDataUpdate", -1);

Thread.sleep(1000);
zkClient.delete("/zkPersistent");

Thread.sleep(10000);
}
}