forked from xuanbo/eureka-client
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbeat.go
112 lines (99 loc) · 2.89 KB
/
beat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package eureka_client
import (
"context"
"log"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
// BeatReactor keeps track of registered instances and runs one heartbeat
// goroutine per instance (see AddBeatInfo and sendInstanceBeat).
type BeatReactor struct {
// config is the client configuration; sendInstanceBeat reads its DefaultZone.
config *Config
// beatMap holds the *Instance being heartbeated, keyed by instance ID.
beatMap ConcurrentMap
// clientBeatIntervalInSecs is the heartbeat interval in seconds;
// NewBeatReactor substitutes 5 when given a non-positive value.
clientBeatIntervalInSecs int64
// beatThreadCount is the semaphore weight; NewBeatReactor sets it to
// DefaultBeatThreadNum.
beatThreadCount int
// beatThreadSemaphore is acquired around each heartbeat call in
// sendInstanceBeat and is created with weight beatThreadCount.
beatThreadSemaphore *semaphore.Weighted
// beatRecordMap stores, per instance ID, the time of the last successful
// heartbeat in milliseconds since the Unix epoch.
beatRecordMap ConcurrentMap
// mux serializes AddBeatInfo and RemoveBeatInfo.
mux *sync.Mutex
// log is assigned from NewLogger in NewBeatReactor.
// NOTE(review): the methods visible in this file log through the standard
// "log" package rather than this field — confirm whether it is used elsewhere.
log Logger
// Period is clientBeatIntervalInSecs expressed as a time.Duration; it is the
// pause between two heartbeats of the same instance.
Period time.Duration
}
const (
	// DefaultBeatThreadNum is the weight NewBeatReactor gives to the heartbeat
	// semaphore, i.e. how many heartbeat calls may hold it at the same time.
	DefaultBeatThreadNum = 20
)

var (
	// ctx is the package-wide background context used when acquiring the
	// heartbeat semaphore.
	ctx = context.Background()
)
// NewBeatReactor returns a BeatReactor bound to config.
//
// clientBeatIntervalInSecs is the pause between heartbeats in seconds; any
// value <= 0 is replaced by 5. The reactor is created with empty beat and
// record maps, a semaphore of weight DefaultBeatThreadNum, a fresh mutex and
// a logger obtained from NewLogger.
func NewBeatReactor(config *Config, clientBeatIntervalInSecs int64) BeatReactor {
	interval := clientBeatIntervalInSecs
	if interval <= 0 {
		interval = 5
	}

	return BeatReactor{
		config:                   config,
		beatMap:                  NewConcurrentMap(),
		clientBeatIntervalInSecs: interval,
		beatThreadCount:          DefaultBeatThreadNum,
		beatRecordMap:            NewConcurrentMap(),
		beatThreadSemaphore:      semaphore.NewWeighted(int64(DefaultBeatThreadNum)),
		mux:                      &sync.Mutex{},
		log:                      NewLogger(),
		Period:                   time.Duration(interval) * time.Second,
	}
}
// AddBeatInfo registers beatInfo under its InstanceID and starts a heartbeat
// goroutine for it. serviceName is currently unused.
//
// If an entry with the same InstanceID is already stored, that stored
// instance is used instead of the argument: its Status is set to "UP" and it
// is re-inserted into the map before the goroutine is started.
//
// NOTE(review): in the already-registered case a further goroutine is started
// for the stored instance; confirm that a second heartbeat loop for the same
// key is intended.
func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo *Instance) {
	key := beatInfo.InstanceID

	br.mux.Lock()
	defer br.mux.Unlock()

	target := beatInfo
	if stored, found := br.beatMap.Get(key); found {
		target = stored.(*Instance)
		target.Status = "UP"
		br.beatMap.Remove(key)
	}

	br.beatMap.Set(key, target)
	go br.sendInstanceBeat(key, target)
}
// RemoveBeatInfo stops heartbeating the instance stored under instanceId and
// drops it from the beat map. serviceName is only used for the log line.
//
// The heartbeat loop in sendInstanceBeat exits as soon as it observes a
// Status other than "UP", so the stored instance is marked "DOWN" here.
// (Previously it was set to "UP", which meant removal never stopped the
// loop.) Note that this mutates the *Instance that was handed to AddBeatInfo.
func (br *BeatReactor) RemoveBeatInfo(serviceName string, instanceId string) {
	log.Printf("remove beat: %s@%s from beat map", serviceName, instanceId)

	br.mux.Lock()
	defer br.mux.Unlock()

	if data, exist := br.beatMap.Get(instanceId); exist {
		beatInfo := data.(*Instance)
		// Any value other than "UP" makes the heartbeat goroutine return on
		// its next iteration.
		beatInfo.Status = "DOWN"
	}
	br.beatMap.Remove(instanceId)
}
// sendInstanceBeat is the heartbeat loop for a single instance; AddBeatInfo
// runs it in its own goroutine. k is the beat-map key (the instance ID) and
// beatInfo the instance being kept alive.
//
// Each iteration acquires one unit of the heartbeat semaphore, sends one
// heartbeat, releases the semaphore and then sleeps for br.Period. The loop
// ends when:
//   - the semaphore cannot be acquired,
//   - beatInfo.Status is no longer "UP", or
//   - Heartbeat returns ErrNotFound, in which case k is also removed from
//     the beat map.
//
// Any other heartbeat error is logged and retried after br.Period.
func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *Instance) {
	for {
		if err := br.beatThreadSemaphore.Acquire(ctx, 1); err != nil {
			log.Printf("sendInstanceBeat failed to acquire semaphore: %v", err)
			return
		}

		// Stop heartbeating once the instance is no longer marked UP
		// (i.e. it has been deregistered).
		if beatInfo.Status != "UP" {
			log.Printf("instance[%s] stop heartBeating", k)
			br.beatThreadSemaphore.Release(1)
			return
		}

		// Send the heartbeat. Per the original author's note the request
		// targets a path of the form
		// /eureka/apps/ORDER-SERVICE/localhost:order-service:8886?status=UP
		err := Heartbeat(br.config.DefaultZone, beatInfo.App, beatInfo.InstanceID)

		// Release on every path. The ErrNotFound return below used to skip
		// the release, permanently leaking one semaphore unit per exit.
		br.beatThreadSemaphore.Release(1)

		if err != nil {
			log.Printf("beat to server return error:%+v", err)
			if err == ErrNotFound {
				log.Printf("can't find this instance, heart beat exist. key:%s", k)
				br.beatMap.Remove(k)
				return
			}
		} else {
			// Record the time of the successful heartbeat in milliseconds.
			br.beatRecordMap.Set(k, time.Now().UnixNano()/1e6)
		}

		time.Sleep(br.Period)
	}
}