From ec479dc2c5aeeb8d965752350e095f4f54d94cc0 Mon Sep 17 00:00:00 2001 From: Kuchen Date: Sun, 18 Feb 2024 11:40:42 +0100 Subject: [PATCH] enable keep alive replace defer func with normal error handling --- helper/CheckerHelper.go | 18 ++++----- helper/threadHelper.go | 85 ++++++++++++++++++++++++----------------- 2 files changed, 56 insertions(+), 47 deletions(-) diff --git a/helper/CheckerHelper.go b/helper/CheckerHelper.go index e05f922..5552d5d 100644 --- a/helper/CheckerHelper.go +++ b/helper/CheckerHelper.go @@ -51,7 +51,7 @@ func RequestCustom(proxy *Proxy, siteUrl string) (string, int) { switch GetTypeName() { case "http": - transport = &http.Transport{Proxy: http.ProxyURL(proxyURL), DisableKeepAlives: true} + transport = &http.Transport{Proxy: http.ProxyURL(proxyURL)} case "socks4", "socks5": //udp doesn't work for some reason dialer, err := proxy2.SOCKS5("tcp", proxy.Full, nil, proxy2.Direct) @@ -61,7 +61,7 @@ func RequestCustom(proxy *Proxy, siteUrl string) (string, int) { transport = &http.Transport{ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { return dialer.Dial(network, addr) - }, DisableKeepAlives: true, + }, } } @@ -82,15 +82,6 @@ func RequestCustom(proxy *Proxy, siteUrl string) (string, int) { return "Error making HTTP request", -1 } - defer func() { - if r := recover(); r != nil { - } - - err := resp.Body.Close() - if err != nil { - } - }() - status := resp.StatusCode resBody, err := io.ReadAll(resp.Body) @@ -98,5 +89,10 @@ func RequestCustom(proxy *Proxy, siteUrl string) (string, int) { return "Error reading response body", -1 } + err = resp.Body.Close() + if err != nil { + return "Error closing Body", -1 + } + return string(resBody), status } diff --git a/helper/threadHelper.go b/helper/threadHelper.go index a593f80..1fe1c71 100644 --- a/helper/threadHelper.go +++ b/helper/threadHelper.go @@ -7,10 +7,6 @@ import ( "time" ) -const ( - DelayBetweenChecks = time.Millisecond * 10 -) - var ( proxyQueue = ProxyQueue{} ProxyMap = make(map[int][]*Proxy) @@ -33,24 +29,23 @@ type CPMCounter struct { lastUpdated time.Time } +type ProxyListing struct { + mu sync.Mutex + proxies []*Proxy +} + var cpmCounter = CPMCounter{} +var proxyList = ProxyListing{} func Dispatcher(proxies []*Proxy) { threads := common.GetConfig().Threads retries = common.GetConfig().Retries + proxyList.proxies = proxies - for len(proxies) > 0 { - if int(atomic.LoadInt32(&threadsActive)) < threads { - wg.Add(1) - go check(proxies[0]) - atomic.AddInt32(&threadsActive, 1) - proxies = proxies[1:] - } else { - time.Sleep(DelayBetweenChecks) - } - if stop { - break - } + for i := 0; i < threads; i++ { + wg.Add(1) + go threadHandling() + atomic.AddInt32(&threadsActive, 1) } wg.Wait() @@ -60,6 +55,28 @@ func Dispatcher(proxies []*Proxy) { HasFinished = true } +func threadHandling() { + for len(proxyList.proxies) > 0 { + proxyList.mu.Lock() + if len(proxyList.proxies) > 0 { + proxy := proxyList.proxies[0] + proxyList.proxies = proxyList.proxies[1:] + proxyList.mu.Unlock() + check(proxy) + } else { + proxyList.mu.Unlock() + } + if stop { + break + } + } + + defer func() { + atomic.AddInt32(&threadsActive, -1) + wg.Done() + }() +} + func check(proxy *Proxy) { responded := false level := 0 @@ -96,8 +113,6 @@ func check(proxy *Proxy) { proxyQueue.Enqueue(proxy) mutex.Unlock() - } else { - atomic.AddInt32(&Invalid, 1) } responded = true @@ -105,30 +120,28 @@ func check(proxy *Proxy) { } //Ban check for websites - if responded && common.DoBanCheck() { - for i := 0; i < retries; i++ { - body, status := RequestCustom(proxy, common.GetConfig().Bancheck) - - if !(status >= 400) && status != -1 { - keywords := common.GetConfig().Keywords - - if len(keywords) == 0 || len(keywords[0]) == 0 || ContainsSlice(keywords, body) { - mutex.Lock() - ProxyMapFiltered[level] = append(ProxyMapFiltered[level], proxy) - ProxyCountMap[-1]++ - mutex.Unlock() - break + if responded { + //Extra if because of performance + if common.DoBanCheck() { + for i := 0; i < retries; i++ { + body, status := RequestCustom(proxy, common.GetConfig().Bancheck) + + if !(status >= 400) && status != -1 { + keywords := common.GetConfig().Keywords + + if len(keywords) == 0 || len(keywords[0]) == 0 || ContainsSlice(keywords, body) { + mutex.Lock() + ProxyMapFiltered[level] = append(ProxyMapFiltered[level], proxy) + ProxyCountMap[-1]++ + mutex.Unlock() + break + } } } } } else { atomic.AddInt32(&Invalid, 1) } - - defer func() { - atomic.AddInt32(&threadsActive, -1) - wg.Done() - }() } func GetInvalid() int {