diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index ce2c8d359d752..64cfdf6d835a7 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1298,18 +1298,9 @@ Experimental: The `bloom_gateway` block configures the Loki bloom gateway server client: # Configures the behavior of the connection pool. pool_config: - # How frequently to clean up clients for servers that have gone away or are - # unhealthy. + # How frequently to update the list of servers. # CLI flag: -bloom-gateway-client.pool.check-interval - [check_interval: <duration> | default = 10s] - - # Run a health check on each server during periodic cleanup. - # CLI flag: -bloom-gateway-client.pool.enable-health-check - [enable_health_check: <boolean> | default = true] - - # Timeout for the health check if health check is enabled. - # CLI flag: -bloom-gateway-client.pool.health-check-timeout - [health_check_timeout: <duration> | default = 1s] + [check_interval: <duration> | default = 15s] # The grpc_client block configures the gRPC client used to communicate between # a client and server component in Loki. 
diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 2529a678e7794..a873d04960b47 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -161,7 +161,7 @@ func NewClient( } } - poolFactory := func(addr string) (ringclient.PoolClient, error) { + clientFactory := func(addr string) (ringclient.PoolClient, error) { pool, err := NewBloomGatewayGRPCPool(addr, dialOpts) if err != nil { return nil, errors.Wrap(err, "new bloom gateway grpc pool") @@ -185,17 +185,10 @@ func NewClient( // Make an attempt to do one DNS lookup so we can start with addresses dnsProvider.RunOnce() - clientPool := ringclient.NewPool( - "bloom-gateway", - ringclient.PoolConfig(cfg.PoolConfig), - func() ([]string, error) { return dnsProvider.Addresses(), nil }, - ringclient.PoolAddrFunc(poolFactory), - metrics.clients, - logger, - ) - - pool := NewJumpHashClientPool(clientPool, dnsProvider, cfg.PoolConfig.CheckInterval, logger) - pool.Start() + pool, err := NewJumpHashClientPool(clientFactory, dnsProvider, cfg.PoolConfig.CheckInterval, logger) + if err != nil { + return nil, err + } return &GatewayClient{ cfg: cfg, diff --git a/pkg/bloomgateway/client_pool.go b/pkg/bloomgateway/client_pool.go index 989ced34c6730..4b45292bef889 100644 --- a/pkg/bloomgateway/client_pool.go +++ b/pkg/bloomgateway/client_pool.go @@ -3,7 +3,7 @@ package bloomgateway import ( "context" "flag" - "sort" + "sync" "time" "github.com/go-kit/log" @@ -15,38 +15,45 @@ import ( ) // PoolConfig is config for creating a Pool. -// It has the same fields as "github.com/grafana/dskit/ring/client.PoolConfig" so it can be cast. type PoolConfig struct { - CheckInterval time.Duration `yaml:"check_interval"` - HealthCheckEnabled bool `yaml:"enable_health_check"` - HealthCheckTimeout time.Duration `yaml:"health_check_timeout"` - MaxConcurrentHealthChecks int `yaml:"-"` + CheckInterval time.Duration `yaml:"check_interval"` } // RegisterFlags adds the flags required to config this to the given FlagSet. 
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.DurationVar(&cfg.CheckInterval, prefix+"check-interval", 10*time.Second, "How frequently to clean up clients for servers that have gone away or are unhealthy.") - f.BoolVar(&cfg.HealthCheckEnabled, prefix+"enable-health-check", true, "Run a health check on each server during periodic cleanup.") - f.DurationVar(&cfg.HealthCheckTimeout, prefix+"health-check-timeout", 1*time.Second, "Timeout for the health check if health check is enabled.") + f.DurationVar(&cfg.CheckInterval, prefix+"check-interval", 15*time.Second, "How frequently to update the list of servers.") } func (cfg *PoolConfig) Validate() error { return nil } +// compiler check +var _ clientPool = &JumpHashClientPool{} + +type ClientFactory func(addr string) (client.PoolClient, error) + +func (f ClientFactory) New(addr string) (client.PoolClient, error) { + return f(addr) +} + type JumpHashClientPool struct { - *client.Pool + services.Service *jumphash.Selector + sync.RWMutex + + provider AddressProvider + logger log.Logger - done chan struct{} - logger log.Logger + clients map[string]client.PoolClient + clientFactory ClientFactory } type AddressProvider interface { Addresses() []string } -func NewJumpHashClientPool(pool *client.Pool, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) *JumpHashClientPool { +func NewJumpHashClientPool(clientFactory ClientFactory, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) (*JumpHashClientPool, error) { selector := jumphash.DefaultSelector() err := selector.SetServers(dnsProvider.Addresses()...) 
if err != nil { @@ -54,14 +61,19 @@ func NewJumpHashClientPool(pool *client.Pool, dnsProvider AddressProvider, updat } p := &JumpHashClientPool{ - Pool: pool, - Selector: selector, - done: make(chan struct{}), - logger: logger, + Selector: selector, + clientFactory: clientFactory, + provider: dnsProvider, + logger: logger, + clients: make(map[string]client.PoolClient, len(dnsProvider.Addresses())), } - go p.updateLoop(dnsProvider, updateInterval) - return p + p.Service = services.NewTimerService(updateInterval, nil, p.updateLoop, nil) + return p, services.StartAndAwaitRunning(context.Background(), p.Service) +} + +func (p *JumpHashClientPool) Stop() { + _ = services.StopAndAwaitTerminated(context.Background(), p.Service) } func (p *JumpHashClientPool) AddrForFingerprint(fp uint64) (string, error) { @@ -80,35 +92,42 @@ func (p *JumpHashClientPool) Addr(key string) (string, error) { return addr.String(), nil } -func (p *JumpHashClientPool) Start() { - ctx := context.Background() - _ = services.StartAndAwaitRunning(ctx, p.Pool) +func (p *JumpHashClientPool) updateLoop(_ context.Context) error { + err := p.SetServers(p.provider.Addresses()...) + if err != nil { + level.Warn(p.logger).Log("msg", "error updating servers", "err", err) + } + return nil } -func (p *JumpHashClientPool) Stop() { - ctx := context.Background() - _ = services.StopAndAwaitTerminated(ctx, p.Pool) - close(p.done) -} +// GetClientFor implements clientPool. 
+func (p *JumpHashClientPool) GetClientFor(addr string) (client.PoolClient, error) { + client, ok := p.fromCache(addr) + if ok { + return client, nil + } + + // No client in cache so create one + p.Lock() + defer p.Unlock() -func (p *JumpHashClientPool) updateLoop(provider AddressProvider, updateInterval time.Duration) { - ticker := time.NewTicker(updateInterval) - defer ticker.Stop() - - for { - select { - case <-p.done: - return - case <-ticker.C: - servers := provider.Addresses() - // ServerList deterministically maps keys to _index_ of the server list. - // Since DNS returns records in different order each time, we sort to - // guarantee best possible match between nodes. - sort.Strings(servers) - err := p.SetServers(servers...) - if err != nil { - level.Warn(p.logger).Log("msg", "error updating servers", "err", err) - } - } + // Check if a client has been created just after checking the cache and before acquiring the lock. + client, ok = p.clients[addr] + if ok { + return client, nil } + + client, err := p.clientFactory.New(addr) + if err != nil { + return nil, err + } + p.clients[addr] = client + return client, nil +} + +func (p *JumpHashClientPool) fromCache(addr string) (client.PoolClient, bool) { + p.RLock() + defer p.RUnlock() + client, ok := p.clients[addr] + return client, ok } diff --git a/pkg/bloomgateway/client_pool_test.go b/pkg/bloomgateway/client_pool_test.go index 5e3792861f4c3..a592bf2417866 100644 --- a/pkg/bloomgateway/client_pool_test.go +++ b/pkg/bloomgateway/client_pool_test.go @@ -31,7 +31,8 @@ func TestJumpHashClientPool_UpdateLoop(t *testing.T) { provider := &provider{} provider.UpdateAddresses([]string{"localhost:9095"}) - pool := NewJumpHashClientPool(nil, provider, interval, log.NewNopLogger()) + pool, err := NewJumpHashClientPool(nil, provider, interval, log.NewNopLogger()) + require.NoError(t, err) require.Len(t, pool.Addrs(), 1) require.Equal(t, "127.0.0.1:9095", pool.Addrs()[0].String())