-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathendpoints.go
156 lines (129 loc) · 3.05 KB
/
endpoints.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"errors"
"fmt"
"net"
"net/url"
"os"
"strings"
"go.etcd.io/etcd/client/pkg/v3/srv"
"golang.org/x/exp/slices"
)
// errBadScheme is returned by isLocalEndpoint when an endpoint written in
// URL form ("scheme://...") uses a scheme other than http or https.
var errBadScheme = errors.New("url scheme must be http or https")
// endpointsWithLeaderAtEnd returns the endpoints found in statusList,
// reordered so that every endpoint whose member ID equals the leader ID
// reported in its status response is placed after all other endpoints.
//
// The endpoints are first resolved through endpoints(gcfg). If that call
// fails, or produces at most one endpoint, its result is returned unchanged
// and statusList is not consulted.
func endpointsWithLeaderAtEnd(gcfg globalConfig, statusList []epStatus) ([]string, error) {
	eps, err := endpoints(gcfg)
	if err != nil {
		return eps, err
	}
	if len(eps) <= 1 {
		// Nothing to reorder with zero or one endpoint.
		return eps, nil
	}

	var followers, leaders []string
	for _, st := range statusList {
		if st.Resp.Header.MemberId == st.Resp.Leader {
			leaders = append(leaders, st.Ep)
			continue
		}
		followers = append(followers, st.Ep)
	}

	// Leader endpoints go last.
	return append(followers, leaders...), nil
}
// endpoints resolves the list of endpoints to operate on.
//
// When gcfg.useClusterEndpoints is set, the list is obtained from
// endpointsFromCluster. Otherwise gcfg.endpoints is returned as is, and an
// error is reported if it is empty.
func endpoints(gcfg globalConfig) ([]string, error) {
	if gcfg.useClusterEndpoints {
		return endpointsFromCluster(gcfg)
	}
	if len(gcfg.endpoints) == 0 {
		return nil, errors.New("no endpoints provided")
	}
	return gcfg.endpoints, nil
}
// isLocalEndpoint reports whether ep refers to the local machine.
//
// An endpoint is considered local when it starts with "unix:" or "unixs:",
// when its host is "localhost" (compared case-insensitively), or when its
// host is a loopback IP address.
//
// ep may be given either as "host:port" or as a URL ("scheme://host:port").
// In URL form only the http and https schemes are accepted; any other scheme
// yields errBadScheme. An error is also returned when the URL cannot be
// parsed or when the address cannot be split into host and port.
func isLocalEndpoint(ep string) (bool, error) {
	switch {
	case strings.HasPrefix(ep, "unix:"), strings.HasPrefix(ep, "unixs:"):
		return true, nil
	}

	// Reduce URL-form endpoints to their "host:port" part.
	addr := ep
	if strings.Contains(ep, "://") {
		parsed, err := url.Parse(ep)
		switch {
		case err != nil:
			return false, err
		case !(parsed.Scheme == "http" || parsed.Scheme == "https"):
			return false, errBadScheme
		}
		addr = parsed.Host
	}

	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return false, err
	}

	if strings.EqualFold(host, "localhost") {
		return true, nil
	}

	// A host that is not an IP literal parses to nil and is treated as non-local.
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback(), nil
}
// endpointsFromCluster collects the client URLs of the members returned by
// memberList(gcfg) and returns them sorted with duplicates removed.
//
// Learner members are skipped. When gcfg.excludeLocalhost is set, every URL
// that isLocalEndpoint reports as local is skipped as well; an error from
// isLocalEndpoint aborts the whole lookup.
func endpointsFromCluster(gcfg globalConfig) ([]string, error) {
	resp, err := memberList(gcfg)
	if err != nil {
		return nil, err
	}

	var clientURLs []string
	for _, member := range resp.Members {
		// A learner member only serves Status and SerializableRead requests,
		// so it is ignored.
		if member.GetIsLearner() {
			continue
		}
		for _, clientURL := range member.ClientURLs {
			// Loopback endpoints are left out when `--exclude-localhost` is set.
			if gcfg.excludeLocalhost {
				local, localErr := isLocalEndpoint(clientURL)
				if localErr != nil {
					return nil, localErr
				}
				if local {
					continue
				}
			}
			clientURLs = append(clientURLs, clientURL)
		}
	}

	// Sorting places equal URLs next to each other so Compact can drop them.
	slices.Sort(clientURLs)
	return slices.Compact(clientURLs), nil
}
// endpointsFromCmd picks the endpoints to use, in order of preference:
// those returned by endpointsFromDNSDiscovery, then gcfg.endpoints. It
// returns an error when discovery fails or when both sources are empty.
func endpointsFromCmd(gcfg globalConfig) ([]string, error) {
	discovered, err := endpointsFromDNSDiscovery(gcfg)
	if err != nil {
		return nil, err
	}
	if len(discovered) > 0 {
		return discovered, nil
	}

	// Discovery produced nothing; fall back to the configured endpoints.
	if len(gcfg.endpoints) > 0 {
		return gcfg.endpoints, nil
	}
	return nil, errors.New("no endpoints provided")
}
// endpointsFromDNSDiscovery looks up endpoints with srv.GetClient for
// gcfg.dnsDomain and gcfg.dnsService.
//
// It returns (nil, nil) when gcfg.dnsDomain is empty. When
// gcfg.insecureDiscovery is set, the discovered endpoints are returned
// unfiltered. Otherwise every endpoint starting with "http://" is dropped,
// and a notice for each dropped endpoint is written to standard error.
func endpointsFromDNSDiscovery(gcfg globalConfig) ([]string, error) {
	if gcfg.dnsDomain == "" {
		return nil, nil
	}

	clients, err := srv.GetClient("etcd-client", gcfg.dnsDomain, gcfg.dnsService)
	if err != nil {
		return nil, err
	}
	if gcfg.insecureDiscovery {
		return clients.Endpoints, nil
	}

	// Keep only the endpoints that are not plain http.
	var secure []string
	for _, ep := range clients.Endpoints {
		if !strings.HasPrefix(ep, "http://") {
			secure = append(secure, ep)
			continue
		}
		fmt.Fprintf(os.Stderr, "ignoring discovered insecure endpoint %q\n", ep)
	}
	return secure, nil
}