Skip to content

Commit

Permalink
netresource: Improve logging, fix bug with listeners being recreated …
Browse files Browse the repository at this point in the history
…on reload
  • Loading branch information
foxcpp committed Jan 30, 2025
1 parent f82742b commit d712d8c
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 26 deletions.
1 change: 0 additions & 1 deletion dist/systemd/maddy.service
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ RestartPreventExitStatus=2

ExecStart=/usr/local/bin/maddy run

ExecReload=/bin/kill -USR1 $MAINPID
ExecReload=/bin/kill -USR2 $MAINPID

[Install]
Expand Down
1 change: 0 additions & 1 deletion dist/systemd/maddy@.service
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ RestartPreventExitStatus=2

ExecStart=/usr/local/bin/maddy --config /etc/maddy/%i.conf run

ExecReload=/bin/kill -USR1 $MAINPID
ExecReload=/bin/kill -USR2 $MAINPID

[Install]
Expand Down
9 changes: 9 additions & 0 deletions docs/reference/endpoints/smtp.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ smtp tcp://0.0.0.0:25 {
sasl_login no
read_timeout 10m
write_timeout 1m
shutdown_timeout 3m
max_message_size 32M
max_header_size 1M
auth pam
Expand Down Expand Up @@ -125,6 +126,14 @@ I/O write timeout.

---

### shutdown_timeout _duration_
Default: `3m`

Time to wait until forcibly closing connections on server shutdown
or configuration reload.

---

### max_message_size _size_
Default: `32M`

Expand Down
4 changes: 3 additions & 1 deletion framework/module/lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func (lt *LifetimeTracker) ReloadAll() error {

// StopAll calls Stop for all registered LifetimeModule instances.
func (lt *LifetimeTracker) StopAll() error {
for _, entry := range lt.instances {
for i := len(lt.instances) - 1; i >= 0; i-- {
entry := lt.instances[i]

if !entry.started {
continue
}
Expand Down
6 changes: 5 additions & 1 deletion framework/resource/netresource/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ var (
)

func CloseUnusedListeners() error {
return tracker.Close()
return tracker.CloseUnused()
}

func CloseAllListeners() {
tracker.Close()
}

func ResetListenersUsage() {
Expand Down
16 changes: 7 additions & 9 deletions framework/resource/netresource/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ func (lt *ListenerTracker) ResetUsage() {
}

func (lt *ListenerTracker) CloseUnused() error {
lt.tcp.CloseUnused(func(key string) bool {
return false
})
lt.unix.CloseUnused(func(key string) bool {
return false
})
lt.tcp.CloseUnused(func(key string) bool { return true })
lt.unix.CloseUnused(func(key string) bool { return true })
return nil
}

Expand All @@ -83,9 +79,11 @@ func (lt *ListenerTracker) Close() error {
}

func NewListenerTracker(log *log.Logger) *ListenerTracker {
return &ListenerTracker{
lt := &ListenerTracker{
logger: log,
tcp: resource.NewTracker[*net.TCPListener](resource.NewSingleton[*net.TCPListener]()),
unix: resource.NewTracker[*net.UnixListener](resource.NewSingleton[*net.UnixListener]()),
tcp: resource.NewTracker[*net.TCPListener](resource.NewSingleton[*net.TCPListener](log.Sublogger("tcp"))),
unix: resource.NewTracker[*net.UnixListener](resource.NewSingleton[*net.UnixListener](log.Sublogger("unix"))),
}

return lt
}
10 changes: 9 additions & 1 deletion framework/resource/singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package resource

import (
"sync"

"github.com/foxcpp/maddy/framework/log"
)

// Singleton represents a set of resources identified by an unique key.
type Singleton[T Resource] struct {
log *log.Logger
lock sync.RWMutex
resources map[string]T
}

func NewSingleton[T Resource]() *Singleton[T] {
func NewSingleton[T Resource](log *log.Logger) *Singleton[T] {
return &Singleton[T]{
log: log,
resources: make(map[string]T),
}
}
Expand All @@ -22,6 +26,7 @@ func (s *Singleton[T]) GetOpen(key string, open func() (T, error)) (T, error) {

existing, ok := s.resources[key]
if ok {
s.log.DebugMsg("resource reused", "key", key)
return existing, nil
}

Expand All @@ -31,6 +36,7 @@ func (s *Singleton[T]) GetOpen(key string, open func() (T, error)) (T, error) {
return empty, err
}

s.log.DebugMsg("new resource", "key", key)
s.resources[key] = res

return res, nil
Expand All @@ -44,6 +50,7 @@ func (s *Singleton[T]) CloseUnused(isUsed func(key string) bool) error {
if isUsed(key) {
continue
}
s.log.DebugMsg("resource released", "key", key)
res.Close()
delete(s.resources, key)
}
Expand All @@ -56,6 +63,7 @@ func (s *Singleton[T]) Close() error {
defer s.lock.Unlock()

for key, res := range s.resources {
s.log.DebugMsg("resource released", "key", key)
res.Close()
delete(s.resources, key)
}
Expand Down
6 changes: 5 additions & 1 deletion framework/resource/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (t *Tracker[T]) CloseUnused(isUsed func(key string) bool) error {

return t.C.CloseUnused(func(key string) bool {
used := t.used[key]
return used && isUsed(key)
used = used && isUsed(key)
if !used {
delete(t.used, key)
}
return used
})
}
11 changes: 9 additions & 2 deletions internal/endpoint/smtp/smtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ type Endpoint struct {
maxReceived int
maxHeaderBytes int64

sessionCnt atomic.Int32
sessionCnt atomic.Int32
shutdownTimeout time.Duration

authNormalize authz.NormalizeFunc
authMap module.Table
Expand Down Expand Up @@ -251,6 +252,7 @@ func (endp *Endpoint) setConfig(cfg *config.Map) error {
modconfig.Table(cfg, "auth_map", true, false, nil, &endp.saslAuth.AuthMap)
cfg.Duration("write_timeout", false, false, 1*time.Minute, &endp.serv.WriteTimeout)
cfg.Duration("read_timeout", false, false, 10*time.Minute, &endp.serv.ReadTimeout)
cfg.Duration("shutdown_timeout", false, false, 3*time.Minute, &endp.shutdownTimeout)
cfg.DataSize("max_message_size", false, false, 32*1024*1024, &endp.serv.MaxMessageBytes)
cfg.DataSize("max_header_size", false, false, 1*1024*1024, &endp.maxHeaderBytes)
cfg.Int("max_recipients", false, false, 20000, &endp.serv.MaxRecipients)
Expand Down Expand Up @@ -424,8 +426,13 @@ func (endp *Endpoint) ConnectionCount() int {
}

func (endp *Endpoint) Stop() error {
endp.serv.Close()
ctx, cancel := context.WithTimeout(context.Background(), endp.shutdownTimeout)
defer cancel()

endp.serv.Shutdown(ctx)

endp.listenersWg.Wait()

return nil
}

Expand Down
36 changes: 27 additions & 9 deletions maddy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"path/filepath"
"runtime"
"runtime/debug"
"sync"

"github.com/caddyserver/certmagic"
parser "github.com/foxcpp/maddy/framework/cfgparser"
Expand Down Expand Up @@ -206,6 +207,8 @@ func Run(c *cli.Context) error {
defer log.DefaultLogger.Out.Close()
defer hooks.RunHooks(hooks.EventShutdown)

hooks.AddHook(hooks.EventShutdown, netresource.CloseAllListeners)

if err := moduleMain(c.Path("config")); err != nil {
systemdStatusErr(err)
return cli.Exit(err.Error(), 1)
Expand Down Expand Up @@ -395,24 +398,29 @@ func moduleMain(configPath string) error {
}
c.DefaultLogger.Msg("server started", "version", Version)

systemdStatus(SDReady, "Listening for incoming connections...")
systemdStatus(SDReady, "Configuration running.")
asyncStopWg := sync.WaitGroup{} // Some containers might still be waiting on moduleStop
for handleSignals() {
systemdStatus(SDReloading, "Reloading state...")
hooks.RunHooks(hooks.EventReload)

c = moduleReload(c, configPath)
c = moduleReload(c, configPath, &asyncStopWg)
}

c.DefaultLogger.Msg("server stopping...")
systemdStatus(SDStopping, "Waiting for running transactions to complete...")

systemdStatus(SDStopping, "Waiting for old configuration to stop...")
asyncStopWg.Wait()

systemdStatus(SDStopping, "Waiting for current configuration to stop...")
moduleStop(c)
c.DefaultLogger.Msg("server stopped")

return nil
}

func moduleReload(oldContainer *container.C, configPath string) *container.C {
func moduleReload(oldContainer *container.C, configPath string, asyncStopWg *sync.WaitGroup) *container.C {
oldContainer.DefaultLogger.Msg("reloading server...")
systemdStatus(SDReloading, "Reloading server...")

oldContainer.DefaultLogger.Msg("loading new configuration...")
newContainer, err := moduleConfigure(configPath)
Expand All @@ -430,12 +438,22 @@ func moduleReload(oldContainer *container.C, configPath string) *container.C {
container.Global = oldContainer
return oldContainer
}
netresource.CloseUnusedListeners()

newContainer.DefaultLogger.Msg("server started", "version", Version)
oldContainer.DefaultLogger.Msg("stopping server")
moduleStop(oldContainer)
oldContainer.DefaultLogger.Msg("server stopped")

systemdStatus(SDReloading, "New configuration running. Waiting for old connections and transactions to finish...")

asyncStopWg.Add(1)
go func() {
defer asyncStopWg.Done()
defer netresource.CloseUnusedListeners()

oldContainer.DefaultLogger.Msg("stopping old server")
moduleStop(oldContainer)
oldContainer.DefaultLogger.Msg("old server stopped")

systemdStatus(SDReloading, "Configuration running.")
}()

return newContainer
}
Expand Down

0 comments on commit d712d8c

Please sign in to comment.