-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy path01.zookeeper-source.java
79 lines (67 loc) · 3.06 KB
/
01.zookeeper-source.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// The ZooKeeper constructor is called:
/**
 * Builds a ZooKeeper client handle and starts its connection object.
 *
 * <p>The supplied watcher becomes the watch manager's default watcher, the connect
 * string is parsed into server addresses and a chroot path, and a {@code ClientCnxn}
 * is created from those pieces and started.
 *
 * @param connectString text handed to {@code ConnectStringParser}
 * @param sessionTimeout session timeout forwarded to {@code ClientCnxn}
 * @param watcher installed as {@code watchManager.defaultWatcher}
 * @param canBeReadOnly forwarded unchanged to {@code ClientCnxn}
 * @throws IOException propagated from the steps below, e.g. when
 *         {@code getClientCnxnSocket()} cannot create the socket implementation
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
    // Register the default watcher before any connection object exists.
    watchManager.defaultWatcher = watcher;

    // Split the connect string into its server list and chroot path.
    final ConnectStringParser parsedConnectString = new ConnectStringParser(connectString);
    final HostProvider servers = new StaticHostProvider(parsedConnectString.getServerAddresses());

    // Evaluated in the same order as the original argument list: chroot path first, socket last.
    final String chrootPath = parsedConnectString.getChrootPath();
    final ClientCnxnSocket socket = getClientCnxnSocket();

    cnxn = new ClientCnxn(chrootPath, servers, sessionTimeout, this, watchManager, socket, canBeReadOnly);
    cnxn.start();
}
// Creates an instance of ClientCnxnSocketNIO, which is handed to SendThread.
// ClientCnxnSocketNIO is used to communicate with the server.
/**
 * Instantiates the client socket implementation.
 *
 * <p>The implementation class name is read from the system property named by
 * {@code ZOOKEEPER_CLIENT_CNXN_SOCKET}; when that property is not set,
 * {@code ClientCnxnSocketNIO} is used. The class is loaded by name and created
 * reflectively through its no-argument constructor.
 *
 * @return a new instance of the selected {@code ClientCnxnSocket} implementation
 * @throws IOException if the class cannot be loaded, instantiated, or cast to
 *         {@code ClientCnxnSocket}; the underlying failure is attached as the cause
 */
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
    String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
    if (clientCnxnSocketName == null) {
        // No override configured: fall back to the NIO implementation.
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        // Class.newInstance() is deprecated because it rethrows any exception thrown by
        // the constructor, checked ones included, without declaring it. Going through the
        // Constructor wraps such failures in InvocationTargetException, which is still
        // caught below and reported as an IOException.
        return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                .getDeclaredConstructor()
                .newInstance();
    } catch (Exception e) {
        // Attach the cause through the constructor instead of a separate initCause() call.
        throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
    }
}
// Creates the ZooKeeper client connection and initializes its parameters.
// sessionTimeout is specified by the client; sessionId=0, sessionPasswd=new byte[16].
/**
 * Creates the client connection object and derives its timeouts.
 *
 * <p>Stores the supplied collaborators and session credentials, computes the connect
 * and read timeouts from {@code sessionTimeout}, and constructs (but does not start)
 * the send and event threads.
 *
 * @param chrootPath chroot path stored on this connection
 * @param hostProvider provider of server addresses; its {@code size()} divides the
 *        session timeout to obtain the per-host connect timeout
 * @param sessionTimeout session timeout requested by the client
 * @param zooKeeper the owning {@code ZooKeeper} handle
 * @param watcher watch manager stored on this connection
 * @param clientCnxnSocket socket implementation handed to the {@code SendThread}
 * @param sessionId session id stored on this connection
 * @param sessionPasswd session password stored on this connection
 * @param canBeReadOnly stored as the {@code readOnly} flag
 */
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    // Timeout requested by the client.
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    // Connect timeout: the session timeout split evenly across the known hosts.
    // NOTE(review): assumes hostProvider.size() is never 0 (otherwise this throws
    // ArithmeticException) - confirm against the HostProvider implementations.
    connectTimeout = sessionTimeout / hostProvider.size();

    // Read timeout: two thirds of the session timeout. The multiplication is done in
    // long so that a sessionTimeout above Integer.MAX_VALUE / 2 no longer overflows
    // into a negative value; the result always fits in an int again after dividing by 3.
    readTimeout = (int) (sessionTimeout * 2L / 3);

    readOnly = canBeReadOnly;
    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
}
// Initializes the thread name, sets the ZooKeeper state to CONNECTING, and marks the thread as a daemon.
/**
 * Prepares the send thread without starting it.
 *
 * <p>Names the thread via {@code makeThreadName("-SendThread()")}, moves the connection
 * state to {@code CONNECTING}, keeps the supplied socket, and configures the thread as
 * a daemon with the shared uncaught-exception handler.
 *
 * @param clientCnxnSocket socket implementation this thread keeps for its communication
 *        with the server
 */
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(makeThreadName("-SendThread()"));

    // The connection counts as CONNECTING from the moment this thread object exists.
    state = States.CONNECTING;

    // SendThread talks to the server through this socket (e.g. ClientCnxnSocketNIO).
    this.clientCnxnSocket = clientCnxnSocket;

    // Thread configuration: daemon flag plus the uncaught-exception handler.
    setDaemon(true);
    setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
/**
 * Prepares the event thread without starting it.
 *
 * <p>Names the thread via {@code makeThreadName("-EventThread")} and configures it as a
 * daemon with the shared uncaught-exception handler.
 */
EventThread() {
    super(makeThreadName("-EventThread"));

    // Thread configuration: daemon flag plus the uncaught-exception handler.
    setDaemon(true);
    setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
// In ClientCnxn:
/**
 * Starts the two worker threads of this connection: the send thread first, then the
 * event thread.
 */
public void start() {
    this.sendThread.start();
    this.eventThread.start();
}