-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPipelineHttp.go
453 lines (433 loc) · 16.8 KB
/
PipelineHttp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
package PipelineHttp
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/quic-go/quic-go/http3"
"io"
"log"
"math/rand"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"time"
)
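/*
MaxConnsPerHost limits the total number of connections per host. The default is 0 (unlimited): pooled connections are reused when available, otherwise new connections are created.
MaxIdleConnsPerHost: set this one first; it determines the size of the connection pool kept for each host. A sensible value should be derived from performance-test results.
MaxIdleConns: when the client talks to a single host it must not be smaller than MaxIdleConnsPerHost, otherwise it caps the pool that MaxIdleConnsPerHost controls; when the client talks to n hosts, a value below n x MaxIdleConnsPerHost has the same effect (connections get re-created). To keep things simple, set it to 0 (unlimited).
MaxConnsPerHost: the maximum number of connections allowed per host, idle connections included, so it is normally >= MaxIdleConnsPerHost. Setting it equal to MaxIdleConnsPerHost reuses pooled connections as much as possible. A value that is too small can reduce concurrency: requests beyond the limit block until a connection becomes free (which is why the default is unlimited).
*/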
// PipelineHttp carries the connection-pool settings, the shared HTTP client
// and the error/cancellation state used by the request helpers in this file.
// The JSON tags are what NewPipelineHttp uses to apply option maps.
type PipelineHttp struct {
	// Timing and pool sizing. KeepAlive is handed to net.Dialer in Dial; the
	// pool and timeout fields below are copied onto http.Transport in
	// GetTransport.
	Timeout               time.Duration `json:"timeout"`
	KeepAlive             time.Duration `json:"keep_alive"`
	MaxIdleConns          int           `json:"max_idle_conns"`
	MaxIdleConnsPerHost   int           `json:"max_idle_conns_per_host"`
	MaxConnsPerHost       int           `json:"max_conns_per_host"`
	IdleConnTimeout       time.Duration `json:"idle_conn_timeout"`
	TLSHandshakeTimeout   time.Duration `json:"tls_handshake_timeout"`
	ExpectContinueTimeout time.Duration `json:"expect_continue_timeout"`
	ResponseHeaderTimeout time.Duration `json:"response_header_timeout"`

	// Client is the default client for requests that do not bring their own.
	Client *http.Client `json:"client"`

	// Ctx / StopAll are the cancellable context installed by SetCtx and its
	// cancel function; Close calls StopAll and sets IsClosed.
	Ctx      context.Context    `json:"ctx"`
	StopAll  context.CancelFunc `json:"stop_all"`
	IsClosed bool               `json:"is_closed"`

	// Error budget: once ErrCount reaches ErrLimit the pipeline is closed
	// (unless one of the NoLimit switches is on).
	ErrLimit int `json:"err_limit"`
	ErrCount int `json:"err_count"`

	// SetHeader, when non-nil, supplies default request headers for calls
	// that pass no header function of their own.
	SetHeader func() map[string]string `json:"set_header"`

	Buf      *bytes.Buffer `json:"buf"`        // http2 client framer message
	UseHttp2 bool          `json:"use_http_2"` // an HTTP/2-or-newer client has been selected
	TestHttp bool          `json:"test_http"`  // the protocol probe (testHttp2) has run
	ReTry    int           `json:"re_try"`     // number of dial attempts in Dial
	NoLimit  bool          `json:"no_limit"`   // disables the ErrLimit check for this instance
	ver      int
}
// NewPipelineHttp builds a PipelineHttp with default pool and timeout
// settings, applies any option maps on top, and then creates the HTTP client
// and the cancellable context.
//
// Each element of args is converted to JSON and unmarshalled onto the struct,
// so its keys are the struct's JSON tags (for example "max_idle_conns",
// "err_limit", "use_http_2"); durations are given in nanoseconds. Maps that
// cannot be converted are logged and skipped.
func NewPipelineHttp(args ...map[string]interface{}) *PipelineHttp {
	const (
		// defaultTimeout is already a time.Duration; multiplying it by
		// time.Second again (as the previous code did) overflows int64.
		defaultTimeout = 5 * time.Minute
		defaultIdle    = 1000
	)
	x1 := &PipelineHttp{
		ver:                   1,
		UseHttp2:              false,
		NoLimit:               false,
		TestHttp:              false,
		Buf:                   &bytes.Buffer{},
		Timeout:               2 * defaultTimeout, // whole-request budget (dial, TLS, ...); keep it longer than IdleConnTimeout
		KeepAlive:             defaultTimeout,     // interval between TCP keep-alive probes
		MaxIdleConns:          defaultIdle,        // idle connections kept across all hosts; zero means no limit
		IdleConnTimeout:       defaultTimeout,     // how long an idle connection stays in the pool
		ResponseHeaderTimeout: defaultTimeout,     // wait for the response headers
		TLSHandshakeTimeout:   defaultTimeout,     // wait for the TLS handshake; zero means no timeout
		ExpectContinueTimeout: 0,                  // zero: send the body immediately without waiting for "100 Continue"
		MaxIdleConnsPerHost:   defaultIdle,        // idle connections kept per host
		MaxConnsPerHost:       0,                  // total connections per host; zero means no limit
		ErrLimit:              10,                 // close the pipeline after 10 accumulated errors
		ErrCount:              0,
		IsClosed:              false,
		SetHeader:             nil,
		ReTry:                 3,
	}

	// Apply the overrides before the client is created so that pool sizes,
	// timeouts and UseHttp2 actually reach the transport.
	for _, x := range args {
		data, err := json.Marshal(x)
		if err != nil {
			log.Printf("NewPipelineHttp: skip option map: %v", err)
			continue
		}
		if err = json.Unmarshal(data, x1); err != nil {
			log.Printf("NewPipelineHttp: option map only partially applied: %v", err)
		}
	}

	// Keep a client that came in through the option maps; otherwise build one.
	if x1.Client == nil {
		if x1.UseHttp2 {
			x1.Client = x1.GetClient4Http2()
		} else {
			x1.Client = x1.GetClient(nil)
		}
	}
	x1.SetCtx(context.Background())
	return x1
}
// SetNoLimit switches off the ErrLimit check for this instance, so
// accumulated request errors no longer close the pipeline.
func (r *PipelineHttp) SetNoLimit() {
	r.NoLimit = true
}
// https://cloud.tencent.com/developer/article/1529840
// https://zhuanlan.zhihu.com/p/451642373
// Dial opens a connection to addr and retries on failure, up to r.ReTry
// attempts (at least one). It is installed as http.Transport.DialContext by
// GetTransport.
//
// No dial timeout and no read deadline are set on purpose: both produced
// spurious "i/o timeout" errors. Cancellation comes from ctx only.
//
// Returns the established connection, or the error of the last attempt.
func (r *PipelineHttp) Dial(ctx context.Context, network, addr string) (conn net.Conn, err error) {
	attempts := r.ReTry
	if attempts < 1 {
		// With ReTry <= 0 the loop would not run and (nil, nil) would be
		// returned, which callers cannot handle.
		attempts = 1
	}
	// DualStack is deprecated and has no effect (fast fallback is on by
	// default), so it is no longer set.
	dialer := &net.Dialer{KeepAlive: r.KeepAlive}
	for i := 0; i < attempts; i++ {
		conn, err = dialer.DialContext(ctx, network, addr)
		if err == nil {
			return conn, nil
		}
		if ctx.Err() != nil {
			// The caller gave up; further attempts would fail immediately.
			break
		}
	}
	return nil, err
}
// SetCtx derives a cancellable context from ctx and stores it in r.Ctx,
// with its cancel function in r.StopAll. A nil ctx is treated as
// context.Background() because context.WithCancel panics on a nil parent.
func (r *PipelineHttp) SetCtx(ctx context.Context) {
	if ctx == nil {
		ctx = context.Background()
	}
	r.Ctx, r.StopAll = context.WithCancel(ctx)
}
// https://github.com/golang/go/issues/23427
// https://cloud.tencent.com/developer/article/1529840
// https://romatic.net/post/go_net_errors/
// https://www.jianshu.com/p/2e5a7317be38
// GetTransport returns a new *http.Transport (as an http.RoundTripper)
// configured from the receiver's pool and timeout fields, dialing through
// r.Dial and honouring the proxy settings of the environment.
//
// Certificate verification is disabled and TLS 1.0 plus a single
// client-side renegotiation are allowed.
func (r *PipelineHttp) GetTransport() http.RoundTripper {
	tlsCfg := &tls.Config{
		InsecureSkipVerify: true,
		MinVersion:         tls.VersionTLS10,
		Renegotiation:      tls.RenegotiateOnceAsClient,
	}

	transport := &http.Transport{TLSClientConfig: tlsCfg}
	transport.Proxy = http.ProxyFromEnvironment
	transport.DialContext = r.Dial
	// ForceAttemptHTTP2 is deliberately left off.
	transport.DisableKeepAlives = false // connections are only reused when keep-alives stay enabled

	// Pool sizing.
	transport.MaxIdleConns = r.MaxIdleConns               // idle connections across all hosts
	transport.MaxIdleConnsPerHost = r.MaxIdleConnsPerHost // idle connections per host
	transport.MaxConnsPerHost = r.MaxConnsPerHost         // total connections per host

	// Timeouts.
	transport.IdleConnTimeout = r.IdleConnTimeout             // idle time before a pooled connection is closed
	transport.TLSHandshakeTimeout = r.TLSHandshakeTimeout     // TLS handshake
	transport.ExpectContinueTimeout = r.ExpectContinueTimeout // wait for "100 Continue" after sending the headers
	transport.ResponseHeaderTimeout = r.ResponseHeaderTimeout // wait for the response headers

	return transport
}
// GetClient wraps tr in a new http.Client that never follows redirects (the
// redirect response itself is handed back). When tr is nil a transport from
// GetTransport is used. If the transport is a *http.Transport its Proxy is
// set to http.ProxyFromEnvironment.
//
// No Client.Timeout is set: a non-zero value surfaced as
// "context canceled (Client.Timeout exceeded while awaiting headers)".
func (r *PipelineHttp) GetClient(tr http.RoundTripper) *http.Client {
	if tr == nil {
		tr = r.GetTransport()
	}
	if std, isStd := tr.(*http.Transport); isStd {
		std.Proxy = http.ProxyFromEnvironment
	}
	// Returning ErrUseLastResponse stops the client from entering the redirect.
	stayOnResponse := func(req *http.Request, via []*http.Request) error {
		return http.ErrUseLastResponse
	}
	return &http.Client{
		Transport:     tr,
		CheckRedirect: stayOnResponse,
	}
}
// DoGet issues a GET request for szUrl through DoGetWithClient, passing no
// explicit client and no request body. fnCbk receives the outcome.
func (r *PipelineHttp) DoGet(szUrl string, fnCbk func(resp *http.Response, err error, szU string)) {
	var (
		client *http.Client // nil: the callee picks the client
		body   io.Reader    // nil: no request body
	)
	r.DoGetWithClient(client, szUrl, "GET", body, fnCbk)
}
/*
*/
// byPass403 returns szU with a randomised suffix intended to slip past
// path-based 403 rules:
//
//   - URL ends in "/": one of a fixed list of path tricks is appended;
//   - URL ends in "?": a random decimal number is appended;
//   - URL contains "?" elsewhere: 0-9 "&" characters and a random decimal
//     number are appended;
//   - anything else, including the empty string, is returned unchanged.
func byPass403(szU string) string {
	if szU == "" {
		// Slicing the last byte of an empty string would panic.
		return szU
	}
	suffixes := []string{"/", "//", "/*", "/*/", "/.", "/./", "/./.", "?", "??", "???", "...;/", "/...;/", "%20/", "%09/"}
	switch {
	case strings.HasSuffix(szU, "/"):
		return szU + suffixes[rand.Intn(len(suffixes))]
	case strings.HasSuffix(szU, "?"):
		return fmt.Sprintf("%s%05f", szU, rand.Float64())
	case strings.Contains(szU, "?"):
		return szU + strings.Repeat("&", rand.Intn(10)) + fmt.Sprintf("%05f", rand.Float64())
	default:
		return szU
	}
}
// DoGetWithClient sends one request with a set of "403 bypass" headers that
// claim the request originates from 127.0.0.1, and closes the response body
// after fnCbk returns.
//
// The URL is first passed through byPass403, so the URL actually requested
// (and reported to fnCbk) may carry a random suffix. An empty or unparsable
// URL is logged and dropped without calling fnCbk.
//
// client may be nil, in which case the callee picks the client.
// See https://zhuanlan.zhihu.com/p/642297652 for the header technique.
func (r *PipelineHttp) DoGetWithClient(client *http.Client, szUrl string, method string, postBody io.Reader, fnCbk func(resp *http.Response, err error, szU string)) {
	if szUrl == "" {
		log.Printf("DoGetWithClient: empty url, request skipped")
		return
	}
	szUrl = byPass403(szUrl)
	oU, err := url.Parse(szUrl)
	if err != nil || oU == nil {
		log.Printf("DoGetWithClient: url.Parse %q: %v", szUrl, err)
		return
	}
	const szLip = "127.0.0.1"
	bypassHeaders := func() map[string]string {
		return map[string]string{
			// NOTE(review): net/http takes the Host sent on the wire from
			// Request.Host / the URL, so this entry likely has no effect - confirm.
			"Host":                      szLip,
			"Referer":                   szUrl,
			"X-Original-URL":            oU.Path,
			"X-Rewrite-URL":             oU.Path,
			"X-Originating-IP":          szLip,
			"X-Remote-IP":               szLip,
			"X-Client-IP":               szLip,
			"X-Forwarded-For":           szLip,
			"X-Forwarded-Host":          szLip, // was misspelled "X-Forwared-Host"
			"X-Host":                    szLip,
			"X-Custom-IP-Authorization": szLip,
		}
	}
	r.DoGetWithClient4SetHd(client, szUrl, method, postBody, fnCbk, bypassHeaders, true)
}
// DoGetWithClient4SetHdNoCloseBody behaves like DoGetWithClient4SetHd with
// bCloseBody set to false: the response body is left open when fnCbk
// returns, so the callback owns it and has to close it.
func (r *PipelineHttp) DoGetWithClient4SetHdNoCloseBody(client *http.Client, szUrl string, method string, postBody io.Reader, fnCbk func(resp *http.Response, err error, szU string), setHd func() map[string]string) {
	const closeBody = false
	r.DoGetWithClient4SetHd(client, szUrl, method, postBody, fnCbk, setHd, closeBody)
}
// CloseResponse drains and closes resp.Body so the underlying connection can
// go back to the pool. A nil response, or a response without a body, is
// ignored.
func (r *PipelineHttp) CloseResponse(resp *http.Response) {
	if resp == nil || resp.Body == nil {
		return
	}
	defer resp.Body.Close()
	// Best effort: a failed drain only means the connection is not reused.
	_, _ = io.Copy(io.Discard, resp.Body)
}
// application/x-www-form-urlencoded
// multipart/form-data
// text/plain
// DoGetWithClient4SetHd sends one request and hands the outcome to fnCbk.
//
// Parameters:
//   - client: client to use; nil falls back to r.Client, or to a fresh
//     client from GetClient when r.Client is nil as well.
//   - szUrl, method, postBody: the request to send.
//   - fnCbk: receives the response, the error and the requested URL.
//   - setHd: supplies extra request headers; nil falls back to r.SetHeader.
//   - bCloseBody: when true the response body is drained and closed after
//     fnCbk returns; when false the callback owns the body.
//
// fnCbk is NOT called when the request cannot be built, when the error
// budget (ErrLimit) is exhausted, or when the host does not resolve; in the
// last two cases the pipeline is closed.
//
// The first non-200 answer to an https:// URL makes the pipeline switch to
// an HTTP/3 or raw HTTP/2 client (when the response advertises one) and
// repeats the request once; the callback then sees the repeated request.
func (r *PipelineHttp) DoGetWithClient4SetHd(client *http.Client, szUrl string, method string, postBody io.Reader, fnCbk func(resp *http.Response, err error, szU string), setHd func() map[string]string, bCloseBody bool) {
	if client == nil {
		if client = r.Client; client == nil {
			client = r.GetClient(nil)
		}
	}

	req, err := http.NewRequest(method, szUrl, postBody)
	if err != nil {
		log.Println("http.NewRequest is error ", err)
		return
	}

	isHTTPS := strings.HasPrefix(szUrl, "https://")
	if 1 == r.ver && !r.UseHttp2 && !r.TestHttp && isHTTPS {
		// Protocol not probed yet: ask the server for an upgrade.
		req.Header.Set("Connection", "Upgrade, HTTP2-Settings")
		req.Header.Set("Upgrade", "h2c")
		req.Header.Set("HTTP2-Settings", "AAMAAABkAARAAAAAAAIAAAAA")
	} else {
		req.Header.Set("Connection", "keep-alive")
	}

	// Per-call headers win over the instance-wide default.
	fnShk := setHd
	if fnShk == nil {
		fnShk = r.SetHeader
	}
	if fnShk != nil {
		for k, v := range fnShk() {
			req.Header.Set(k, v)
		}
	}
	if req.Header.Get("User-Agent") == "" {
		req.Header.Set("User-Agent", fmt.Sprintf("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.%05f.159 Safari/537.36", rand.Float64()))
	}

	resp, err := client.Do(req)
	if bCloseBody && resp != nil {
		defer r.CloseResponse(resp) // resp may be nil on error, so only then touch the body
	}
	if err != nil {
		r.ErrCount++
	}

	if (!NoLimit && !r.NoLimit) && r.ErrCount >= r.ErrLimit {
		log.Printf("PipelineHttp %d >= %d not close\n", r.ErrCount, r.ErrLimit)
		if !bCloseBody {
			// The callback never sees this response, so nobody else closes it.
			r.CloseResponse(resp)
		}
		r.Close()
		return
	}
	if err != nil && rNohost.MatchString(err.Error()) {
		log.Println(szUrl, err)
		if !bCloseBody {
			r.CloseResponse(resp)
		}
		r.Close()
		return
	}

	if !r.UseHttp2 && isHTTPS && resp != nil && 200 != resp.StatusCode {
		r.UseHttp2 = true // also guarantees the request is repeated at most once
		altSvc := resp.Header["Alt-Svc"]
		switch {
		case (len(altSvc) > 0 && strings.Contains(altSvc[0], "h3=\"")) || strings.HasPrefix(resp.Proto, "HTTP/3"):
			r.Client = r.GetClient4Http3()
		case resp.StatusCode == http.StatusSwitchingProtocols:
			r.Client = r.GetRawClient4Http2()
		}

		// Rebuild the URL as "https://" + host + everything after the FIRST
		// occurrence of the host. Splitting on every occurrence truncated
		// URLs whose path or query repeats the host name.
		retryURL := szUrl
		if u, perr := url.Parse(szUrl); perr == nil && u.Host != "" {
			if parts := strings.SplitN(szUrl, u.Host, 2); len(parts) == 2 {
				retryURL = "https://" + u.Host + parts[1]
			}
		}

		// postBody has been consumed by the first attempt. When net/http can
		// replay it (bytes/strings readers), send a fresh copy instead of
		// the drained reader.
		retryBody := postBody
		if req.GetBody != nil {
			if fresh, gerr := req.GetBody(); gerr == nil {
				if data, rerr := io.ReadAll(fresh); rerr == nil {
					retryBody = bytes.NewReader(data)
				}
				fresh.Close()
			}
		}

		r.ErrLimit = 99999999
		r.CloseResponse(resp) // release the connection before repeating the request
		r.DoGetWithClient4SetHd(r.Client, retryURL, method, retryBody, fnCbk, setHd, bCloseBody)
		return
	}

	fnCbk(resp, err, szUrl)
}
var (
	// rNohost recognises DNS "no such host" dial errors. The part between
	// "dial tcp: " and ": no such host" may itself contain colons, e.g.
	// "lookup example.invalid on 127.0.0.53:53", so it is matched with ".+"
	// rather than "[^:]+".
	rNohost = regexp.MustCompile(`dial tcp: .+: no such host`)
	// NoLimit, when true, disables the ErrLimit check for every PipelineHttp
	// in the process.
	NoLimit = false
)
// Close marks the pipeline as closed, shuts down the current client's
// transport (closing an HTTP/3 round tripper, or dropping the idle
// connections of a standard transport), cancels r.Ctx through StopAll and
// forgets the client. It is safe to call more than once and on an instance
// whose context was never set.
func (r *PipelineHttp) Close() {
	r.IsClosed = true
	if r.Client != nil {
		switch tr := r.Client.Transport.(type) {
		case *http3.RoundTripper:
			tr.Close()
		case *http.Transport:
			tr.CloseIdleConnections()
		}
	}
	if r.StopAll != nil { // nil when SetCtx has not been called
		r.StopAll()
	}
	r.Client = nil
}
// DoDirs4Http2 is DoDirs with the UseHttp2 flag switched on beforehand.
// The flag stays set on the receiver after the call.
func (r *PipelineHttp) DoDirs4Http2(szUrl string, dirs []string, nThread int, fnCbk func(resp *http.Response, err error, szU string)) {
	r.UseHttp2 = true
	r.doDirsPrivate(szUrl, dirs, nThread, fnCbk)
}
// DoDirs requests every entry of dirs below the base URL szUrl, running up
// to nThread requests at a time, and reports each outcome to fnCbk. It is a
// thin public wrapper around doDirsPrivate.
func (r *PipelineHttp) DoDirs(szUrl string, dirs []string, nThread int, fnCbk func(resp *http.Response, err error, szU string)) {
	r.doDirsPrivate(szUrl, dirs, nThread, fnCbk)
}
// testHttp2 probes once which protocol the target speaks and selects the
// matching client. It does nothing when both UseHttp2 and TestHttp are
// already set.
//
// The probe requests the https:// form of szUrl001 (host and path only)
// with the raw HTTP/2 client and then:
//   - on "101 Switching Protocols": drops the old client's idle connections
//     and adopts the raw HTTP/2 client if the response protocol is HTTP/2;
//   - on an Alt-Svc header advertising h3: switches to the HTTP/3 client;
//   - on an HTTP/2.0 response: adopts the raw HTTP/2 client;
//   - on a nil response: clears UseHttp2 again.
//
// A URL that cannot be parsed is logged and leaves the receiver untouched.
func (r *PipelineHttp) testHttp2(szUrl001 string) {
	if r.UseHttp2 && r.TestHttp {
		return
	}
	// Parse before touching any state: on failure there is nothing to probe.
	oU7, err := url.Parse(szUrl001)
	if err != nil || oU7 == nil {
		log.Printf("testHttp2: url.Parse %q: %v", szUrl001, err)
		return
	}
	if "" == oU7.Path {
		oU7.Path = "/"
	}

	r.TestHttp = true
	r.UseHttp2 = true
	c1 := r.GetRawClient4Http2()
	szUrl09 := "https://" + oU7.Host + oU7.Path
	r.DoGetWithClient(c1, szUrl09, "GET", nil, func(resp *http.Response, err error, szU string) {
		if resp == nil {
			r.UseHttp2 = false
			return
		}
		if resp.StatusCode == http.StatusSwitchingProtocols {
			r.CloseResponse(resp)
			if r.Client != nil {
				r.Client.CloseIdleConnections()
			}
			if strings.HasPrefix(resp.Proto, "HTTP/2") {
				r.Client = c1
			}
		} else if a1 := resp.Header["Alt-Svc"]; 0 < len(a1) && strings.Contains(a1[0], "h3=\"") {
			r.Client = r.GetClient4Http3()
		} else if resp.Proto == "HTTP/2.0" {
			r.UseHttp2 = true
			r.Client = c1
		}
		// The probe got an answer: stop counting errors against the limit.
		r.ErrLimit = 99999999
	})
}
// more see cmd/main.go
// doDirsPrivate requests szUrl + dir for every entry of dirs with at most
// nThread requests in flight, reporting each outcome to fnCbk.
//
// szUrl is normalised to scheme://host/path (scheme defaults to "http").
// The protocol is probed once with testHttp2 and all requests share one
// client. Scheduling stops as soon as the pipeline is closed or r.Ctx is
// cancelled; in every case the function returns only after all started
// requests have finished, so fnCbk is never called after it returns.
//
// nThread values below 1 are treated as 1.
// more see cmd/main.go
func (r *PipelineHttp) doDirsPrivate(szUrl string, dirs []string, nThread int, fnCbk func(resp *http.Response, err error, szU string)) {
	oUrl, err := url.Parse(szUrl)
	if err != nil {
		log.Printf("url.Parse is error: %v %s", err, szUrl)
		return
	}
	if "" == oUrl.Scheme {
		oUrl.Scheme = "http"
	}
	szUrl = oUrl.Scheme + "://" + oUrl.Host + oUrl.Path

	if nThread < 1 {
		// A zero-capacity semaphore would block forever on the first send,
		// and a negative capacity makes make() panic.
		nThread = 1
	}
	sem := make(chan struct{}, nThread) // counting semaphore for in-flight requests
	var wg sync.WaitGroup

	r.testHttp2(szUrl)
	var client *http.Client
	if r.UseHttp2 {
		client = r.GetClient4Http2()
	} else if client = r.Client; client == nil {
		// Share one client instead of letting every request build its own.
		client = r.GetClient(nil)
	}

scan:
	for _, j := range dirs {
		if r.IsClosed {
			break
		}
		// Wait for a free slot, but give up as soon as the context is done.
		select {
		case <-r.Ctx.Done():
			break scan
		case sem <- struct{}{}:
		}
		wg.Add(1)
		go func(s2 string) {
			defer func() {
				<-sem
				wg.Done()
			}()
			select {
			case <-r.Ctx.Done():
				return
			default:
			}
			s2 = strings.TrimSpace(s2)
			if !strings.HasPrefix(s2, "/") {
				s2 = "/" + s2
			}
			r.DoGetWithClient(client, szUrl+s2, "GET", nil, fnCbk)
		}(j)
	}
	wg.Wait()
}