-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathscraper.go
184 lines (170 loc) · 5.25 KB
/
scraper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// LLAMA Scraper pulls stats from Collectors and then writes them to the indicated database.
package llama
import (
"errors"
"fmt"
influxdb_client "github.com/influxdata/influxdb1-client/v2"
"log"
"sync"
"time"
)
// DefaultTimeout is the timeout applied to all InfluxDB HTTP writes (5s).
// This may be worth adding as a parameter in the future.
const DefaultTimeout = time.Second * 5
// NewInfluxDbWriter provides a client for writing LLAMA datapoints to InfluxDB.
//
// host and port identify the InfluxDB HTTP endpoint, user and pass are the
// credentials, and db is the target database name. The underlying HTTP client
// uses DefaultTimeout for all requests. On failure the returned writer is nil.
func NewInfluxDbWriter(host string, port string, user string, pass string, db string) (*InfluxDbWriter, error) {
	url := fmt.Sprintf("http://%v:%v", host, port)
	log.Println("Creating InfluxDB writer for", url)
	ifdbc, err := influxdb_client.NewHTTPClient(influxdb_client.HTTPConfig{
		Addr:     url,
		Username: user,
		Password: pass,
		Timeout:  DefaultTimeout,
	})
	if err != nil {
		// Return nil rather than a partially-constructed writer: a writer
		// without a client is unusable, and callers must check err first.
		return nil, err
	}
	return &InfluxDbWriter{
		client: ifdbc,
		db:     db,
	}, nil
}
// InfluxDbWriter is used for writing datapoints to an InfluxDB instance.
type InfluxDbWriter struct {
	// client is the underlying InfluxDB HTTP client, created in NewInfluxDbWriter.
	client influxdb_client.Client
	// db is the name of the target InfluxDB database used for every batch.
	db string
}
// Close shuts down the InfluxDB client connection, releasing any resources
// it holds. It returns whatever error the underlying client reports.
func (w *InfluxDbWriter) Close() error {
	log.Println("Closing InfluxDB client connection")
	err := w.client.Close()
	return err
}
// Write commits a previously assembled batch of points to the database,
// logging how long the write took in either outcome.
func (w *InfluxDbWriter) Write(batch influxdb_client.BatchPoints) error {
	begin := time.Now()
	writeErr := w.client.Write(batch)
	secs := time.Since(begin).Seconds()
	if writeErr != nil {
		log.Println("DB write failed after:", secs, "seconds")
		return writeErr
	}
	// Only track write delay for successes.
	log.Println("DB write completed in:", secs, "seconds")
	// TODO(dmar): Log rate of `db_write_delay`
	return nil
}
// Batch converts the given points into an InfluxDB batch bound to the
// writer's database, ready to be handed to Write.
func (w *InfluxDbWriter) Batch(points Points) (influxdb_client.BatchPoints, error) {
	cfg := influxdb_client.BatchPointsConfig{
		Database:  w.db,
		Precision: "s", // Second precision improves perf; finer granularity isn't needed.
	}
	bp, err := influxdb_client.NewBatchPoints(cfg)
	if err != nil {
		return nil, err
	}
	// Convert each datapoint and add it to the batch.
	for _, dp := range points {
		// The InfluxDB client expects field values as the interface type,
		// so cast each one to float64 first.
		// TODO(dmar): Reevaluate this in the future, since this was likely a
		// mistake initially and isn't necessary.
		fields := make(map[string]interface{}, len(dp.Fields))
		for name, v := range dp.Fields {
			fields[name] = float64(v)
		}
		pt, ptErr := influxdb_client.NewPoint(dp.Measurement, dp.Tags, fields, dp.Time)
		if ptErr != nil {
			return nil, ptErr
		}
		bp.AddPoint(pt)
	}
	return bp, nil
}
// BatchWrite groups the indicated points into a single batch and writes it
// to the associated InfluxDB host, returning a descriptive error if either
// step fails.
//
// TODO(review): fmt.Errorf("...: %w", err) would let callers use
// errors.Is/As on the underlying error; switching would leave the file's
// `errors` import unused, so it is deferred to a file-wide cleanup.
func (w *InfluxDbWriter) BatchWrite(points Points) error {
	batch, err := w.Batch(points)
	if err != nil {
		// fmt.Sprintf (not Sprintln) so the message has no trailing newline.
		return errors.New(fmt.Sprintf("Failed to create batch from points: %v", err))
	}
	if err := w.Write(batch); err != nil {
		return errors.New(fmt.Sprintf("Failed to write batch: %v", err))
	}
	return nil
}
// Scraper pulls stats from collectors and writes them to a backend.
type Scraper struct {
	// writer delivers collected datapoints to InfluxDB.
	writer *InfluxDbWriter
	// collectors are the clients scraped on each Run cycle.
	collectors []Client
	// port is the port the collectors listen on (the cPort given to NewScraper).
	port string
}
// NewScraper creates and initializes a means of collecting stats from the
// given collector hosts (each listening on cPort) and writing them to the
// named InfluxDB database. On failure the returned Scraper is nil.
func NewScraper(collectors []string, cPort string, dbHost string, dbPort string, dbUser string, dbPass string, dbName string) (*Scraper, error) {
	// Pre-size: exactly one client per collector hostname.
	clients := make([]Client, 0, len(collectors))
	for _, collector := range collectors {
		clients = append(clients, NewClient(collector, cPort))
	}
	w, err := NewInfluxDbWriter(dbHost, dbPort, dbUser, dbPass, dbName)
	if err != nil {
		// Return nil rather than a partially-constructed Scraper; callers
		// must check err before using the result.
		return nil, err
	}
	return &Scraper{
		writer:     w,
		collectors: clients,
		port:       cPort,
	}, nil
}
// Run performs one collection cycle across all associated collectors,
// scraping each one concurrently and blocking until every scrape finishes.
func (s *Scraper) Run() {
	log.Println("Collection cycle starting")
	// Make sure we don't leave DB connections hanging open.
	// NOTE(review): this closes the writer when the cycle ends, which makes
	// the Scraper single-use — confirm callers never invoke Run twice.
	defer s.writer.Close()
	var wg sync.WaitGroup
	for _, collector := range s.collectors {
		wg.Add(1)
		// Each collector gets its own goroutine; the client is passed as an
		// argument so every goroutine sees its own value.
		go func(c Client) {
			defer wg.Done()
			HandleMinorError(s.run(c))
		}(collector)
	}
	wg.Wait()
	log.Println("Collection cycle complete")
}
// run scrapes a single collector and writes its datapoints to the database,
// logging progress throughout and returning the first error encountered.
func (s *Scraper) run(collector Client) error {
	host := collector.Hostname()
	log.Println(host, "- Collection cycle started")
	// Pull stats from the collector.
	points, err := collector.GetPoints()
	numPoints := float64(len(points))
	if err != nil {
		log.Println(host, "- Collection failed:", err)
		// TODO(dmar): Log rate of `failed_collections`
		return err
	}
	log.Println(host, "- Pulled datapoints:", numPoints)
	// TODO(dmar): Log rate of `pulled_points`
	// Push the pulled points to the database.
	if err = s.writer.BatchWrite(points); err != nil {
		log.Println(host, "- Collection failed:", err)
		// TODO(dmar): Log rate of `failed_collections`
		return err
	}
	log.Println(host, "- Wrote datapoints")
	// TODO(dmar): Log rate of `written_points`
	log.Println(host, "- Collection cycle completed")
	// TODO(dmar): Log rate of `successful_collections`
	return nil
}