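// Command barrelman connects to two Kubernetes clusters, called "local" and
// "remote": it watches the remote cluster for node changes and keeps service
// endpoints in the local cluster up to date with the remote nodes' IPs.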
package main
import (
    "context"
    "flag"
    "fmt"
    "net/http"
    "os"
    "time"

    "github.com/prometheus/client_golang/prometheus/promhttp"

    "barrelman/controller"
    "barrelman/utils"

    metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog"
)

var (
    // Command line flags
    addr              = flag.String("listen-address", ":9193", "the address to listen on for HTTP requests")
    localKubeConfig   = flag.String("local-kubeconfig", "", "absolute path to the kubeconfig file for the \"local\" cluster (where to maintain endpoints)")
    localContext      = flag.String("local-context", "", "context to use as the \"local\" cluster (where to maintain endpoints)")
    remoteProject     = flag.String("remote-project", "", "project ID of the remote cluster")
    remoteZone        = flag.String("remote-zone", "europe-west1-c", "zone of the remote cluster")
    remoteClusterName = flag.String("remote-cluster-name", "", "name of the remote cluster")
    resyncPeriod      = flag.Duration("resync-period", 2*time.Hour, "how often all nodes should be considered \"old\" (and processed again)")
    necWorkers        = flag.Uint("nec-workers", 4, "number of workers for the NodeEndpointController")
    scWorkers         = flag.Uint("sc-workers", 2, "number of workers for the ServiceController")
    createNodePortSvc = flag.Bool("nodeportsvc", false, "create services of type NodePort in the \"local\" cluster (instead of ClusterIP)")
    // See init() for "ignore-namespace"
)
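
// init customizes the usage message, registers the repeatable
// "ignore-namespace" flag, and initializes klog's flags.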
func init() {
    flag.Usage = func() {
        _, _ = fmt.Fprintf(flag.CommandLine.Output(),
            `This tool needs to connect to two clusters, called "local" and "remote".
The remote cluster is watched for node changes.
On change, the service endpoints in the local cluster are modified to always contain an up-to-date list of node IPs.
The local cluster may be defined via 'local-kubeconfig' and 'local-context'.
The remote cluster must be defined via 'remote-project', 'remote-zone' and 'remote-cluster-name'.
The needed config will be auto-generated via a Google service account (GOOGLE_APPLICATION_CREDENTIALS).
`)
        _, _ = fmt.Fprintf(flag.CommandLine.Output(), "\nUsage of %s:\n", os.Args[0])
        flag.PrintDefaults()
    }
    flag.Var(utils.IgnoredNamespaces, "ignore-namespace",
        "namespace in which to ignore services; may be given multiple times. Prefix a namespace with a dash to remove it from the defaults")
    klog.InitFlags(nil)
}

// getLocalClientset creates the Kubernetes clientset for the local cluster.
// If no -local-kubeconfig is given, the in-cluster config is used.
func getLocalClientset() *kubernetes.Clientset {
    var config *rest.Config
    var err error
    if *localKubeConfig == "" {
        klog.Info("No -local-kubeconfig was specified, using the in-cluster config.")
        config, err = rest.InClusterConfig()
        if err != nil {
            klog.Fatal(err)
        }
    } else {
        config, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
            &clientcmd.ClientConfigLoadingRules{
                ExplicitPath: *localKubeConfig,
            },
            &clientcmd.ConfigOverrides{
                CurrentContext: *localContext,
            }).ClientConfig()
        if err != nil {
            klog.Fatal(err)
        }
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatal(err)
    }
    return clientset
}
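
// getRemoteClientset builds a clientset for the remote GKE cluster from the
// -remote-* flags, authenticating via a Google service account
// (GOOGLE_APPLICATION_CREDENTIALS).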
func getRemoteClientset() *kubernetes.Clientset {
    if *remoteProject == "" || *remoteZone == "" || *remoteClusterName == "" {
        klog.Fatalln("You have to specify -remote-project, -remote-zone and -remote-cluster-name")
    }
    clientset, err := utils.NewGKEClientset(*remoteProject, *remoteZone, *remoteClusterName)
    if err != nil {
        klog.Fatal(err)
    }
    return clientset
}

func main() {
    flag.Parse()
    // Set up signals so we handle the first shutdown signal gracefully
    stopCh := utils.SetupSignalHandler()
    // Create the clientsets
    localClientset := getLocalClientset()
    remoteClientset := getRemoteClientset()
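
    // Sanity checks at startup: list the managed services in the local
    // cluster and the nodes in the remote cluster, and log the counts.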
    lservices, err := localClientset.CoreV1().Services("").List(metaV1.ListOptions{
        LabelSelector: utils.ServiceSelector.String(),
    })
    if err != nil {
        klog.Fatal(err)
    }
    klog.Infof("%d services to manage endpoints for in the local cluster", len(lservices.Items))
    rnodes, err := remoteClientset.CoreV1().Nodes().List(metaV1.ListOptions{})
    if err != nil {
        klog.Fatal(err)
    }
    klog.Infof("%d nodes in the remote cluster", len(rnodes.Items))

    // FIXME: Would be nice to have only one localInformerFactory
    // and apply the filter in NodeEndpointController
    localFilteredInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
        localClientset,
        *resyncPeriod,
        kubeinformers.WithTweakListOptions(func(options *metaV1.ListOptions) {
            options.LabelSelector = utils.ServiceSelector.String()
        }),
    )
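
    // Unfiltered factories: the ServiceController watches services in both
    // clusters, and the remote factory also provides the node informer.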
    localInformerFactory := kubeinformers.NewSharedInformerFactory(localClientset, *resyncPeriod)
    remoteInformerFactory := kubeinformers.NewSharedInformerFactory(remoteClientset, *resyncPeriod)
    nodeEndpointController := controller.NewNodeEndpointController(
        localClientset, remoteClientset,
        localFilteredInformerFactory.Core().V1().Services(),
        remoteInformerFactory.Core().V1().Nodes(),
    )
    serviceController := controller.NewServiceController(
        localClientset, remoteClientset,
        remoteInformerFactory.Core().V1().Services(), localInformerFactory.Core().V1().Services(),
        *createNodePortSvc,
    )
    // Ramp up the informer loops.
    // This starts all registered informers in goroutines.
    localFilteredInformerFactory.Start(stopCh)
    remoteInformerFactory.Start(stopCh)
    localInformerFactory.Start(stopCh)

    // Register HTTP handlers for metrics and the readiness/liveness probe
    http.Handle("/metrics", promhttp.Handler())
    http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        _, _ = fmt.Fprint(w, "OK")
    })
    httpServer := &http.Server{Addr: *addr}
    go func() {
        // Launch the HTTP server; ListenAndServe returns http.ErrServerClosed
        // after Shutdown, so the error is deliberately ignored here.
        _ = httpServer.ListenAndServe()
    }()

    // Launch the controllers.
    // This blocks until stopCh is closed.
    go func() {
        if err := nodeEndpointController.Run(int(*necWorkers), stopCh); err != nil {
            klog.Fatalf("Error running nodeEndpointController: %s", err.Error())
        }
    }()
    go func() {
        if err := serviceController.Run(int(*scWorkers), stopCh); err != nil {
            klog.Fatalf("Error running serviceController: %s", err.Error())
        }
    }()
    <-stopCh

    // Gracefully stop the HTTP server
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := httpServer.Shutdown(ctx); err != nil {
        klog.Fatalf("Error stopping HTTP server: %v", err)
    }
}