Skip to content

Commit

Permalink
chore(ui): Only rejoin new nodes and add back instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Feb 25, 2025
1 parent c92e339 commit 6651cd9
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ ui:

# How frequently to rejoin the cluster to address split brain issues.
# CLI flag: -ui.rejoin-interval
[rejoin_interval: <duration> | default = 15s]
[rejoin_interval: <duration> | default = 1m]

# Number of initial peers to join from the discovered set.
# CLI flag: -ui.cluster-max-join-peers
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1954,7 +1954,7 @@ func (t *Loki) initDataObjExplorer() (services.Service, error) {

func (t *Loki) initUI() (services.Service, error) {
t.Cfg.UI = t.Cfg.UI.WithAdvertisePort(t.Cfg.Server.HTTPListenPort)
svc, err := ui.NewService(t.Cfg.UI, t.Server.HTTP, log.With(util_log.Logger, "component", "ui"))
svc, err := ui.NewService(t.Cfg.UI, t.Server.HTTP, log.With(util_log.Logger, "component", "ui"), prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ui/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Var((*flagext.StringSlice)(&cfg.InfNames), "ui.interface", "Name of network interface to read address from.")
f.StringVar(&cfg.NodeName, "ui.node-name", hostname, "Name to use for this node in the cluster.")
f.StringVar(&cfg.AdvertiseAddr, "ui.advertise-addr", "", "IP address to advertise in the cluster.")
f.DurationVar(&cfg.RejoinInterval, "ui.rejoin-interval", 15*time.Second, "How frequently to rejoin the cluster to address split brain issues.")
f.DurationVar(&cfg.RejoinInterval, "ui.rejoin-interval", 1*time.Minute, "How frequently to rejoin the cluster to address split brain issues.")
f.IntVar(&cfg.ClusterMaxJoinPeers, "ui.cluster-max-join-peers", 3, "Number of initial peers to join from the discovered set.")
f.StringVar(&cfg.ClusterName, "ui.cluster-name", "", "Name to prevent nodes without this identifier from joining the cluster.")
f.BoolVar(&cfg.EnableIPv6, "ui.enable-ipv6", false, "Enable using a IPv6 instance address.")
Expand Down
51 changes: 40 additions & 11 deletions pkg/ui/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,45 @@ import (
)

func (s *Service) getBootstrapPeers() ([]string, error) {
peers, err := s.discoverPeers()
if err != nil {
return nil, err
}
return selectRandomPeers(peers, s.cfg.ClusterMaxJoinPeers), nil
}

func selectRandomPeers(peers []string, maxPeers int) []string {
// Here we return the entire list because we can't take a subset.
if maxPeers == 0 || len(peers) < maxPeers {
return peers
}

// We shuffle the list and return only a subset of the peers.
rand.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
return peers[:maxPeers]
}

func (s *Service) discoverNewPeers(prevPeers map[string]struct{}) ([]string, error) {
peers, err := s.discoverPeers()
if err != nil {
return nil, err
}

// Build list of new peers that weren't in previous list
var newPeers []string
for _, peer := range peers {
if _, ok := prevPeers[peer]; !ok {
newPeers = append(newPeers, peer)
prevPeers[peer] = struct{}{}
}
}

return selectRandomPeers(newPeers, s.cfg.ClusterMaxJoinPeers), nil
}

func (s *Service) discoverPeers() ([]string, error) {
if len(s.cfg.Discovery.JoinPeers) == 0 {
return nil, nil
}
Expand All @@ -29,17 +68,7 @@ func (s *Service) getBootstrapPeers() ([]string, error) {
}

// Return unique addresses.
peers := uniq(addresses)
// Here we return the entire list because we can't take a subset.
if s.cfg.ClusterMaxJoinPeers == 0 || len(peers) < s.cfg.ClusterMaxJoinPeers {
return peers, nil
}

// We shuffle the list and return only a subset of the peers.
rand.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
return peers[:s.cfg.ClusterMaxJoinPeers], nil
return uniq(addresses), nil
}

func uniq(addresses []string) []string {
Expand Down
34 changes: 25 additions & 9 deletions pkg/ui/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/ckit/peer"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/http2"

util_log "github.com/grafana/loki/v3/pkg/util/log"
Expand All @@ -37,9 +38,10 @@ type Service struct {

cfg Config
logger log.Logger
reg prometheus.Registerer
}

func NewService(cfg Config, router *mux.Router, logger log.Logger) (*Service, error) {
func NewService(cfg Config, router *mux.Router, logger log.Logger, reg prometheus.Registerer) (*Service, error) {
addr, err := ring.GetInstanceAddr(cfg.AdvertiseAddr, cfg.InfNames, util_log.Logger, cfg.EnableIPv6)
if err != nil {
return nil, err
Expand All @@ -56,19 +58,24 @@ func NewService(cfg Config, router *mux.Router, logger log.Logger) (*Service, er
}
advertiseAddr := fmt.Sprintf("%s:%d", cfg.AdvertiseAddr, cfg.AdvertisePort)
node, err := ckit.NewNode(httpClient, ckit.Config{
Name: cfg.NodeName,
// TODO(cyriltovena): ckit debug logs are too verbose
Log: level.NewFilter(logger, level.AllowInfo()),
Name: cfg.NodeName,
Log: logger,
AdvertiseAddr: advertiseAddr,
Label: cfg.ClusterName,
})
if err != nil {
return nil, err
}
if reg != nil {
if err := reg.Register(node.Metrics()); err != nil {
return nil, err
}
}

svc := &Service{
cfg: cfg,
logger: logger,
reg: reg,
node: node,
router: router,
client: httpClient,
Expand Down Expand Up @@ -102,6 +109,10 @@ func (s *Service) run(ctx context.Context) error {
level.Error(s.logger).Log("msg", "failed to bootstrap a fresh cluster with no peers", "err", err)
}
}
newPeers := make(map[string]struct{})
for _, p := range peers {
newPeers[p] = struct{}{}
}

var wg sync.WaitGroup
if s.cfg.RejoinInterval > 0 {
Expand All @@ -116,15 +127,17 @@ func (s *Service) run(ctx context.Context) error {
case <-ctx.Done():
return
case <-ticker.C:
peers, err := s.getBootstrapPeers()
peers, err := s.discoverNewPeers(newPeers)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to get peers to join; will try again", "err", err)
continue
}
level.Info(s.logger).Log("msg", "rejoining cluster", "peers_count", len(peers))
if err := s.node.Start(peers); err != nil {
level.Warn(s.logger).Log("msg", "failed to connect to peers; will try again", "err", err)
continue
if len(peers) > 0 {
level.Info(s.logger).Log("msg", "rejoining cluster", "peers_count", len(newPeers))
if err := s.node.Start(peers); err != nil {
level.Warn(s.logger).Log("msg", "failed to connect to peers; will try again", "err", err)
continue
}
}
}
}
Expand All @@ -142,6 +155,9 @@ func (s *Service) stop(_ error) error {
if err := s.node.ChangeState(ctx, peer.StateTerminating); err != nil {
level.Error(s.logger).Log("msg", "failed to change state to terminating", "err", err)
}
if s.reg != nil {
s.reg.Unregister(s.node.Metrics())
}
return s.node.Stop()
}

Expand Down

0 comments on commit 6651cd9

Please sign in to comment.