This repository has been archived by the owner on Nov 24, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathaws.go
133 lines (123 loc) · 2.97 KB
/
aws.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
"github.com/aws/aws-sdk-go-v2/aws/external"
"github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
"k8s.io/klog"
)
// Package-level state shared by the ELBv2 helpers in this file.
var (
	// lbv2 is the shared ELBv2 client; it stays nil until setupLbv2 has
	// initialised it successfully.
	lbv2 *elasticloadbalancingv2.Client
	// setupLbv2Once ensures the client initialisation is attempted only once.
	setupLbv2Once sync.Once

	// watchNlbList is the set of target group ARNs that watchLbv2 reconciles;
	// entries are added by addWatchLbv2.
	watchNlbList = map[string]struct{}{}
	// errSetupLbv2 is returned by setupLbv2 while lbv2 is still nil.
	errSetupLbv2 = fmt.Errorf("lbv2 is not setup")
)
// setupLbv2 makes sure the shared ELBv2 client has been initialised.
//
// The initialisation itself is attempted only on the first call (guarded by
// setupLbv2Once); if that attempt failed, lbv2 stays nil and every call
// returns errSetupLbv2. It returns nil once the client is available.
func setupLbv2() error {
	setupLbv2Once.Do(initLbv2)
	if lbv2 != nil {
		return nil
	}
	return errSetupLbv2
}

// initLbv2 loads the default AWS configuration, resolves the region from the
// EC2 metadata service, creates the ELBv2 client and starts the background
// reconcile loop. Any failure is logged and leaves lbv2 nil.
func initLbv2() {
	awsCfg, err := external.LoadDefaultAWSConfig()
	if err != nil {
		klog.Errorln(err)
		return
	}

	// The region is not configured explicitly; ask the metadata service.
	region, err := ec2metadata.New(awsCfg).Region()
	if err != nil {
		klog.Errorln(err)
		return
	}
	awsCfg.Region = region

	lbv2 = elasticloadbalancingv2.New(awsCfg)

	// Start the periodic watch-and-reconcile loop.
	go watchLbv2()
}
// regPod registers the target ip:port with the target group identified by
// targetGroupArn.
//
// The call is best-effort: if the ELBv2 client is not set up a warning is
// logged and nothing is sent; a failed request is logged as an error. The
// request is bounded by a 3 second timeout.
func regPod(targetGroupArn string, ip string, port int64) {
	if setupErr := setupLbv2(); setupErr != nil {
		klog.Warningln(setupErr)
		return
	}

	input := elasticloadbalancingv2.RegisterTargetsInput{
		TargetGroupArn: &targetGroupArn,
		Targets: []elasticloadbalancingv2.TargetDescription{
			{Id: &ip, Port: &port},
		},
	}
	request := lbv2.RegisterTargetsRequest(&input)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	if _, sendErr := request.Send(ctx); sendErr != nil {
		klog.Errorln(sendErr)
	}
}
// deregPod deregisters the target ip:port from the target group identified
// by targetGroupArn.
//
// The call is best-effort: if the ELBv2 client is not set up a warning is
// logged and nothing is sent; a failed request is logged as an error. The
// request is bounded by a 3 second timeout.
func deregPod(targetGroupArn string, ip string, port int64) {
	if setupErr := setupLbv2(); setupErr != nil {
		klog.Warningln(setupErr)
		return
	}

	input := elasticloadbalancingv2.DeregisterTargetsInput{
		TargetGroupArn: &targetGroupArn,
		Targets: []elasticloadbalancingv2.TargetDescription{
			{Id: &ip, Port: &port},
		},
	}
	request := lbv2.DeregisterTargetsRequest(&input)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	if _, sendErr := request.Send(ctx); sendErr != nil {
		klog.Errorln(sendErr)
	}
}
// reconcile deregisters every target of the target group identified by
// targetGroupArn that ELBv2 currently reports as unhealthy, which removes
// out-of-sync unhealthy targets.
//
// The call is best-effort: if the ELBv2 client is not set up a warning is
// logged and nothing is sent; request failures are logged as errors.
func reconcile(targetGroupArn string) {
	if err := setupLbv2(); err != nil {
		klog.Warningln(err)
		return
	}
	des := lbv2.DescribeTargetHealthRequest(&elasticloadbalancingv2.DescribeTargetHealthInput{
		TargetGroupArn: &targetGroupArn,
	})
	// A single 6 second budget covers both the describe call and the
	// deregister call below.
	ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
	defer cancel()
	res, err := des.Send(ctx)
	if err != nil {
		klog.Errorln(err)
		return
	}

	var targets []elasticloadbalancingv2.TargetDescription
	for _, desc := range res.TargetHealthDescriptions {
		// Target and TargetHealth are pointer fields; skip entries that lack
		// either one instead of panicking the watcher goroutine.
		if desc.Target == nil || desc.TargetHealth == nil {
			continue
		}
		if desc.TargetHealth.State == elasticloadbalancingv2.TargetHealthStateEnumUnhealthy {
			targets = append(targets, *desc.Target)
		}
	}
	if len(targets) == 0 {
		return
	}

	dereg := lbv2.DeregisterTargetsRequest(&elasticloadbalancingv2.DeregisterTargetsInput{
		TargetGroupArn: &targetGroupArn,
		Targets:        targets,
	})
	if _, err := dereg.Send(ctx); err != nil {
		klog.Errorln(err)
	}
}
// watchNlbMu guards watchNlbList, which is written by addWatchLbv2 and read
// by the watchLbv2 goroutine. Go maps are not safe for concurrent write and
// iteration, so every access below goes through this mutex.
var watchNlbMu sync.Mutex

// addWatchLbv2 adds targetGroupArn to the set of target groups that
// watchLbv2 reconciles. Adding an ARN that is already present is a no-op.
func addWatchLbv2(targetGroupArn string) {
	watchNlbMu.Lock()
	defer watchNlbMu.Unlock()
	watchNlbList[targetGroupArn] = struct{}{}
}

// watchLbv2 loops forever, calling reconcile for every watched target group
// once every 20 seconds. It never returns and is meant to run in its own
// goroutine.
func watchLbv2() {
	for range time.Tick(20 * time.Second) {
		// Snapshot the ARNs under the lock so that reconcile's network calls
		// do not block addWatchLbv2 while they run.
		watchNlbMu.Lock()
		arns := make([]string, 0, len(watchNlbList))
		for arn := range watchNlbList {
			arns = append(arns, arn)
		}
		watchNlbMu.Unlock()

		for _, arn := range arns {
			reconcile(arn)
		}
	}
}