-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add server with hooks and healthz (#109)
- Loading branch information
Showing
5 changed files
with
990 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
// Package server provides a generic HTTP server. | ||
package server |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
package server | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"strings" | ||
|
||
lctx "github.com/hamba/logger/v2/ctx" | ||
"github.com/hamba/pkg/v2/http/healthz" | ||
"github.com/hamba/pkg/v2/http/middleware" | ||
) | ||
|
||
// MustAddHealthzChecks adds health checks to both readyz and livez, panicking if there is an error. | ||
func (s *GenericServer[T]) MustAddHealthzChecks(checks ...healthz.HealthChecker) { | ||
if err := s.AddHealthzChecks(checks...); err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// AddHealthzChecks adds health checks to both readyz and livez. | ||
func (s *GenericServer[T]) AddHealthzChecks(checks ...healthz.HealthChecker) error { | ||
if err := s.AddReadyzChecks(checks...); err != nil { | ||
return err | ||
} | ||
return s.AddLivezChecks(checks...) | ||
} | ||
|
||
// MustAddReadyzChecks adds health checks to readyz, panicking if there is an error. | ||
func (s *GenericServer[T]) MustAddReadyzChecks(checks ...healthz.HealthChecker) { | ||
if err := s.AddReadyzChecks(checks...); err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// AddReadyzChecks adds health checks to readyz. | ||
func (s *GenericServer[T]) AddReadyzChecks(checks ...healthz.HealthChecker) error { | ||
s.readyzMu.Lock() | ||
defer s.readyzMu.Unlock() | ||
if s.readyzInstalled { | ||
return errors.New("could not add checks as readyz has already been installed") | ||
} | ||
s.readyzChecks = append(s.readyzChecks, checks...) | ||
return nil | ||
} | ||
|
||
// MustAddLivezChecks adds health checks to livez, panicking if there is an error. | ||
func (s *GenericServer[T]) MustAddLivezChecks(checks ...healthz.HealthChecker) { | ||
if err := s.AddLivezChecks(checks...); err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// AddLivezChecks adds health checks to livez. | ||
func (s *GenericServer[T]) AddLivezChecks(checks ...healthz.HealthChecker) error { | ||
s.livezMu.Lock() | ||
defer s.livezMu.Unlock() | ||
if s.livezInstalled { | ||
return errors.New("could not add checks as livez has already been installed") | ||
} | ||
s.livezChecks = append(s.livezChecks, checks...) | ||
return nil | ||
} | ||
|
||
func (s *GenericServer[T]) installChecks(h http.Handler, shutdownCh chan struct{}) http.Handler { | ||
mux := http.NewServeMux() | ||
mux.Handle("/", h) | ||
s.installLivezChecks(mux) | ||
|
||
// When shutdown is started, the readyz check should start failing. | ||
if err := s.AddReadyzChecks(shutdownCheck{ch: shutdownCh}); err != nil { | ||
s.Log.Error("Could not install readyz shutdown check", lctx.Err(err)) | ||
} | ||
s.installReadyzChecks(mux) | ||
|
||
return mux | ||
} | ||
|
||
func (s *GenericServer[T]) installReadyzChecks(mux *http.ServeMux) { | ||
s.readyzMu.Lock() | ||
defer s.readyzMu.Unlock() | ||
s.readyzInstalled = true | ||
s.installCheckers(mux, "/readyz", s.readyzChecks) | ||
} | ||
|
||
func (s *GenericServer[T]) installLivezChecks(mux *http.ServeMux) { | ||
s.livezMu.Lock() | ||
defer s.livezMu.Unlock() | ||
s.livezInstalled = true | ||
s.installCheckers(mux, "/livez", s.livezChecks) | ||
} | ||
|
||
func (s *GenericServer[T]) installCheckers(mux *http.ServeMux, path string, checks []healthz.HealthChecker) { | ||
if len(checks) == 0 { | ||
checks = []healthz.HealthChecker{healthz.PingHealth} | ||
} | ||
|
||
s.Log.Info("Installing health checkers", | ||
lctx.Str("path", path), | ||
lctx.Str("checks", strings.Join(checkNames(checks), ",")), | ||
) | ||
|
||
name := strings.TrimPrefix(path, "/") | ||
h := healthz.Handler(name, func(output string) { | ||
s.Log.Info(fmt.Sprintf("%s check failed\n%s", name, output)) | ||
}, checks...) | ||
mux.Handle(path, middleware.WithStats(name, s.Stats, h)) | ||
} | ||
|
||
func checkNames(checks []healthz.HealthChecker) []string { | ||
names := make([]string, len(checks)) | ||
for i, check := range checks { | ||
names[i] = check.Name() | ||
} | ||
return names | ||
} | ||
|
||
type shutdownCheck struct { | ||
ch <-chan struct{} | ||
} | ||
|
||
func (s shutdownCheck) Name() string { return "shutdown" } | ||
|
||
func (s shutdownCheck) Check(*http.Request) error { | ||
select { | ||
case <-s.ch: | ||
return errors.New("server is shutting down") | ||
default: | ||
return nil | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
package server | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
|
||
lctx "github.com/hamba/logger/v2/ctx" | ||
) | ||
|
||
// PostStartHookFunc is a function called after server start. | ||
type PostStartHookFunc[T context.Context] func(T) error | ||
|
||
// PreShutdownHookFunc is a function called before server shutdown. | ||
type PreShutdownHookFunc func() error | ||
|
||
type postStartHookEntry[T context.Context] struct { | ||
fn PostStartHookFunc[T] | ||
doneCh chan struct{} | ||
} | ||
|
||
// MustAddPostStartHook adds a post-start hook, panicking if there is an error. | ||
func (s *GenericServer[T]) MustAddPostStartHook(name string, fn PostStartHookFunc[T]) { | ||
if err := s.AddPostStartHook(name, fn); err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// AddPostStartHook adds a post-start hook. | ||
func (s *GenericServer[T]) AddPostStartHook(name string, fn PostStartHookFunc[T]) error { | ||
if name == "" { | ||
return errors.New("name is required") | ||
} | ||
if fn == nil { | ||
return errors.New("fn is required") | ||
} | ||
|
||
s.postStartHookMu.Lock() | ||
defer s.postStartHookMu.Unlock() | ||
|
||
if s.postStartHooksCalled { | ||
return errors.New("hooks have already been called") | ||
} | ||
if _, exists := s.postStartHooks[name]; exists { | ||
return fmt.Errorf("hook %q as it is already registered", name) | ||
} | ||
|
||
if s.postStartHooks == nil { | ||
s.postStartHooks = map[string]postStartHookEntry[T]{} | ||
} | ||
|
||
doneCh := make(chan struct{}) | ||
err := s.AddReadyzChecks(postStartHookHealth{ | ||
name: "postStartHook:" + name, | ||
doneCh: doneCh, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("adding readyz check: %w", err) | ||
} | ||
|
||
s.postStartHooks[name] = postStartHookEntry[T]{ | ||
fn: fn, | ||
doneCh: doneCh, | ||
} | ||
return nil | ||
} | ||
|
||
// MustAddPreShutdownHook adds a pre-shutdown hook, panicking if there is an error. | ||
func (s *GenericServer[T]) MustAddPreShutdownHook(name string, fn PreShutdownHookFunc) { | ||
if err := s.AddPreShutdownHook(name, fn); err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// AddPreShutdownHook adds a pre-shutdown hook. | ||
func (s *GenericServer[T]) AddPreShutdownHook(name string, fn PreShutdownHookFunc) error { | ||
if name == "" { | ||
return errors.New("name is required") | ||
} | ||
if fn == nil { | ||
return errors.New("fn is required") | ||
} | ||
|
||
s.preShutdownHookMu.Lock() | ||
defer s.preShutdownHookMu.Unlock() | ||
|
||
if s.preShutdownHooksCalled { | ||
return errors.New("hooks have already been called") | ||
} | ||
if _, exists := s.preShutdownHooks[name]; exists { | ||
return fmt.Errorf("hook %q as it is already registered", name) | ||
} | ||
|
||
if s.preShutdownHooks == nil { | ||
s.preShutdownHooks = map[string]PreShutdownHookFunc{} | ||
} | ||
|
||
s.preShutdownHooks[name] = fn | ||
return nil | ||
} | ||
|
||
func (s *GenericServer[T]) runPostStartHooks(ctx T) { | ||
s.postStartHookMu.Lock() | ||
defer s.postStartHookMu.Unlock() | ||
|
||
s.postStartHooksCalled = true | ||
|
||
for name, entry := range s.postStartHooks { | ||
go s.runPostStartHook(ctx, name, entry) | ||
} | ||
} | ||
|
||
func (s *GenericServer[T]) runPostStartHook(ctx T, name string, entry postStartHookEntry[T]) { | ||
defer func() { | ||
if v := recover(); v != nil { | ||
s.Log.Error("Panic while running post-start hook", | ||
lctx.Interface("error", v), | ||
lctx.Stack("stack"), | ||
) | ||
} | ||
}() | ||
|
||
s.Log.Info("Running post-start hook", lctx.Str("hook", name)) | ||
|
||
if err := entry.fn(ctx); err != nil { | ||
s.Log.Error("Could not run post-start hook", lctx.Str("name", name), lctx.Err(err)) | ||
} | ||
close(entry.doneCh) | ||
} | ||
|
||
func (s *GenericServer[T]) hasPreShutdownHooks() bool { | ||
s.preShutdownHookMu.Lock() | ||
defer s.preShutdownHookMu.Unlock() | ||
|
||
return len(s.preShutdownHooks) > 0 | ||
} | ||
|
||
func (s *GenericServer[T]) runPreShutdownHooks() error { | ||
s.preShutdownHookMu.Lock() | ||
defer s.preShutdownHookMu.Unlock() | ||
|
||
s.preShutdownHooksCalled = true | ||
|
||
var errs error | ||
for name, fn := range s.preShutdownHooks { | ||
if err := s.runPreShutdownHook(name, fn); err != nil { | ||
errs = errors.Join(errs, err) | ||
} | ||
} | ||
return errs | ||
} | ||
|
||
func (s *GenericServer[T]) runPreShutdownHook(name string, fn PreShutdownHookFunc) error { | ||
defer func() { | ||
if v := recover(); v != nil { | ||
s.Log.Error("Panic while running pre-shutdown hook", | ||
lctx.Interface("error", v), | ||
lctx.Stack("stack"), | ||
) | ||
} | ||
}() | ||
|
||
s.Log.Info("Running pre-shutdown hook", lctx.Str("hook", name)) | ||
|
||
if err := fn(); err != nil { | ||
return fmt.Errorf("running preshutdown hook %q: %w", name, err) | ||
} | ||
return nil | ||
} | ||
|
||
type postStartHookHealth struct { | ||
name string | ||
doneCh chan struct{} | ||
} | ||
|
||
func (h postStartHookHealth) Name() string { | ||
return h.name | ||
} | ||
|
||
func (h postStartHookHealth) Check(*http.Request) error { | ||
select { | ||
case <-h.doneCh: | ||
return nil | ||
default: | ||
return errors.New("not finished") | ||
} | ||
} |
Oops, something went wrong.