Skip to content

Commit

Permalink
collect cpu and memory stats for each container
Browse files Browse the repository at this point in the history
Collect runc stats and stream them back to the client via a new stream
  • Loading branch information
alexcb committed Nov 14, 2023
1 parent c7c1f7a commit b4cfdf1
Show file tree
Hide file tree
Showing 35 changed files with 1,513 additions and 114 deletions.
228 changes: 136 additions & 92 deletions api/services/control/control.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/services/control/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ message SolveResponse {

message StatusRequest {
string Ref = 1;

bool statsStream = 99; // earthly
}

message StatusResponse {
Expand Down
4 changes: 2 additions & 2 deletions client/llb/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type ExecOp struct {
isValidated bool
secrets []SecretInfo
ssh []SSHInfo
socket []SocketInfo // earthly
socket []SocketInfo // earthly-specific
}

func (e *ExecOp) AddMount(target string, source Output, opt ...MountOption) Output {
Expand Down Expand Up @@ -777,7 +777,7 @@ type ExecInfo struct {
ProxyEnv *ProxyEnv
Secrets []SecretInfo
SSH []SSHInfo
Socket []SocketInfo // earthly
Socket []SocketInfo // earthly-specific
}

type MountInfo struct {
Expand Down
3 changes: 2 additions & 1 deletion client/solve.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG

eg.Go(func() error {
stream, err := c.ControlClient().Status(statusContext, &controlapi.StatusRequest{
Ref: ref,
Ref: ref,
StatsStream: true, // earthly-specific request stats be streamed back
})
if err != nil {
return errors.Wrap(err, "failed to get status")
Expand Down
3 changes: 3 additions & 0 deletions cmd/buildkitd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type OCIConfig struct {

// earthly-specific: Hooks are things you can run during any phase of the OCI container runtime lifecycle.
Hooks []Hook `toml:"hook"`

// SampleFrequency is the frequency between sampling runc processes for stats
SampleFrequency time.Duration `toml:"sample-frequency"`
}

type ContainerdConfig struct {
Expand Down
5 changes: 5 additions & 0 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/containerd/containerd/pkg/seed" //nolint:staticcheck // SA1019 deprecated
"github.com/containerd/containerd/pkg/userns"
Expand Down Expand Up @@ -536,6 +537,10 @@ func setDefaultConfig(cfg *config.Config) {
appdefaults.EnsureUserAddressDir()
}

if cfg.Workers.OCI.SampleFrequency == 0 {
cfg.Workers.OCI.SampleFrequency = time.Second
}

if cfg.OTEL.SocketPath == "" {
cfg.OTEL.SocketPath = appdefaults.TraceSocketPath(isRootlessConfig())
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/buildkitd/main_oci_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
})
}

opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, cfg.SELinux, parallelismSem, common.traceSocket, cfg.DefaultCgroupParent, ociHooks)
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, cfg.SELinux, parallelismSem, common.traceSocket, cfg.DefaultCgroupParent, ociHooks, cfg.SampleFrequency)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Con

eg, ctx := errgroup.WithContext(stream.Context())
eg.Go(func() error {
return c.solver.Status(ctx, req.Ref, ch)
return c.solver.Status(ctx, req.Ref, req.StatsStream, ch)
})

eg.Go(func() error {
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ProcessInfo struct {
Meta Meta
Stdin io.ReadCloser
Stdout, Stderr io.WriteCloser
StatsStream io.WriteCloser // earthly-specific
Resize <-chan WinSize
Signal <-chan syscall.Signal
}
Expand Down
6 changes: 6 additions & 0 deletions executor/runcexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Opt struct {
TracingSocket string
Hooks []oci.OciHook // earthly-specific
ResourceMonitor *resources.Monitor
SampleFrequency time.Duration // earthly-specific
}

var defaultCommandCandidates = []string{"buildkit-runc", "runc"}
Expand All @@ -77,6 +78,7 @@ type runcExecutor struct {
tracingSocket string
hooks []oci.OciHook // earthly-specific
resmon *resources.Monitor
sampleFrequency time.Duration // earthly-specific
}

func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Executor, error) {
Expand Down Expand Up @@ -144,6 +146,7 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex
tracingSocket: opt.TracingSocket,
hooks: opt.Hooks, // earthly-specific
resmon: opt.ResourceMonitor,
sampleFrequency: opt.SampleFrequency, // earthly-specific
}
return w, nil
}
Expand Down Expand Up @@ -327,6 +330,9 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount,
if started != nil {
close(started)
}
if process.StatsStream != nil {
go w.monitorContainerStats(ctx, id, w.sampleFrequency, process.StatsStream) // earthly-specific
}
if rec != nil {
rec.Start()
}
Expand Down
75 changes: 75 additions & 0 deletions executor/runcexecutor/monitor_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package runcexecutor

import (
"context"
"encoding/binary"
"encoding/json"
"io"
"time"

runc "github.com/containerd/go-runc"
"github.com/moby/buildkit/util/bklog"
"github.com/pkg/errors"
)

// earthly-specific: This entire file is earthly-specific, it is used to collect runc Stats and sends them via a stream to the client
//
// the stats protocol is:
// loop of stats events:
// <uint8> version (currently 1)
// <uint32> length of payload (stored as n)
// <bytes> n bytes (a json-encoded string of the go-runc Stats structure)

// writeUint32PrefixedBytes writes a uint32 representing the length of the b byte array, followed by the actual
// byte array.
func writeUint32PrefixedBytes(w io.Writer, b []byte) error {
n := len(b)
err := binary.Write(w, binary.LittleEndian, uint32(n))
if err != nil {
return err
}
_, err = w.Write(b)
return err
}

func writeStatsToStream(w io.Writer, stats *runc.Stats) error {
statsJSON, err := json.Marshal(stats)
if err != nil {
return errors.Wrap(err, "failed to encode runc stats")
}
err = binary.Write(w, binary.LittleEndian, uint8(1)) // earthly stats stream protocol v1
if err != nil {
return err
}
return writeUint32PrefixedBytes(w, statsJSON)
}

func (w *runcExecutor) monitorContainerStats(ctx context.Context, id string, sampleFrequency time.Duration, statsWriter io.WriteCloser) {
numFailuresAllowed := 10
for {
// sleep at the top of the loop to give it time to start
time.Sleep(sampleFrequency)

stats, err := w.runc.Stats(ctx, id)
if err != nil {
if numFailuresAllowed > 0 {
// allow the initial calls to runc.Stats to fail, for cases where the program didn't start within the initial
// sampleFrequency; this should only occur under heavy workloads
bklog.G(ctx).Warnf("ignoring runc stats collection error: %s", err)
numFailuresAllowed--
continue
}
bklog.G(ctx).Errorf("runc stats collection error: %s", err)
return
}

// once runc.Stats has succeeded, don't ignore future errors
numFailuresAllowed = 0

err = writeStatsToStream(statsWriter, stats)
if err != nil {
bklog.G(ctx).Errorf("failed to send runc stats to client-stream: %s", err)
return
}
}
}
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ require (
kernel.org/pub/linux/libs/security/libcap/cap v1.2.67
)

require github.com/docker/distribution v2.8.1+incompatible
require (
github.com/docker/distribution v2.8.1+incompatible
github.com/dustin/go-humanize v1.0.0
)

require (
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect
Expand Down Expand Up @@ -169,4 +172,4 @@ require (
kernel.org/pub/linux/libs/security/libcap/psx v1.2.67 // indirect
)

replace github.com/tonistiigi/fsutil => github.com/earthly/fsutil v0.0.0-20231030221755-644b08355b65
replace github.com/tonistiigi/fsutil => github.com/alexcb/fsutil v0.0.0-20231030221755-644b08355b65
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alexcb/fsutil v0.0.0-20231030221755-644b08355b65 h1:owMPrDW24IXzQwhrg6DAM7a5VFdOoRoQ4urVFCaGMy4=
github.com/alexcb/fsutil v0.0.0-20231030221755-644b08355b65/go.mod h1:9kMVqMyQ/Sx2df5LtnGG+nbrmiZzCS7V6gjW3oGHsvI=
github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0=
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 h1:aM1rlcoLz8y5B2r4tTLMiVTrMtpfY0O8EScKJxaSaEc=
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092/go.mod h1:rYqSE9HbjzpHTI74vwPvae4ZVYZd1lue2ta6xHPdblA=
Expand Down Expand Up @@ -472,12 +474,11 @@ github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNE
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/earthly/fsutil v0.0.0-20231030221755-644b08355b65 h1:6oyWHoxHXwcTt4EqmMw6361scIV87uEAB1N42+VpIwk=
github.com/earthly/fsutil v0.0.0-20231030221755-644b08355b65/go.mod h1:9kMVqMyQ/Sx2df5LtnGG+nbrmiZzCS7V6gjW3oGHsvI=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down
17 changes: 13 additions & 4 deletions solver/llbsolver/ops/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,14 @@ func (e *ExecOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
}
}()

// earthly-specific
statsStream, statsFlush := logs.NewStatsStreams(ctx, os.Getenv("BUILDKIT_DEBUG_EXEC_OUTPUT") == "1")
defer func() {
if err != nil {
statsFlush()
}
}()

isLocal, err := e.doFromLocalHack(ctx, p.Root, p.Mounts, g, meta, stdout, stderr)
if err != nil {
return nil, err
Expand All @@ -396,10 +404,11 @@ func (e *ExecOp) Exec(ctx context.Context, g session.Group, inputs []solver.Resu
var rec resourcestypes.Recorder
if !isLocal {
rec, execErr = e.exec.Run(ctx, "", p.Root, p.Mounts, executor.ProcessInfo{
Meta: meta,
Stdin: nil,
Stdout: stdout,
Stderr: stderr,
Meta: meta,
Stdin: nil,
Stdout: stdout,
Stderr: stderr,
StatsStream: statsStream, // earthly-specific
}, nil)
}

Expand Down
6 changes: 3 additions & 3 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (s *Solver) recordBuildHistory(ctx context.Context, id string, req frontend
return nil
})
eg.Go(func() error {
return j.Status(ctx2, ch)
return j.Status(ctx2, false, ch)
})

if descref != nil {
Expand Down Expand Up @@ -879,7 +879,7 @@ func withDescHandlerCacheOpts(ctx context.Context, ref cache.ImmutableRef) conte
})
}

func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error {
func (s *Solver) Status(ctx context.Context, id string, statsStream bool, statusChan chan *client.SolveStatus) error {
if err := s.history.Status(ctx, id, statusChan); err != nil {
if !errors.Is(err, os.ErrNotExist) {
close(statusChan)
Expand All @@ -894,7 +894,7 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.
close(statusChan)
return err
}
return j.Status(ctx, statusChan)
return j.Status(ctx, statsStream, statusChan)
}

func defaultResolver(wc *worker.Controller) ResolveWorkerFunc {
Expand Down
14 changes: 13 additions & 1 deletion solver/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"time"

"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/progress/logs"

"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/progress"
digest "github.com/opencontainers/go-digest"
)

func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error {
func (j *Job) Status(ctx context.Context, statsStream bool, ch chan *client.SolveStatus) error {
vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}, wasCached: make(map[digest.Digest]struct{})}
pr := j.pr.Reader(ctx)
defer func() {
Expand All @@ -23,6 +24,17 @@ func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error {
close(ch)
}()

if !statsStream { // earthly-specific: don't stream stats back to old clients (which will cause them to print binary data to stderr)
pr = progress.NewFilteredReader(pr, func(ctx context.Context, p *progress.Progress) (bool, error) {
if vl, ok := p.Sys.(client.VertexLog); ok {
if vl.Stream == logs.StatsStream {
return true, nil
}
}
return false, nil
})
}

for {
p, err := pr.Read(ctx)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions util/progress/filtered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package progress

import (
"context"
)

// this entire file is earthly-specific, it implements a filtered Reader which can be used to skip statsStream data

type FilteredReaderSkipFn func(ctx context.Context, p *Progress) (bool, error)

type filteredReader struct {
r Reader
skipFn FilteredReaderSkipFn
}

func NewFilteredReader(r Reader, skipFn FilteredReaderSkipFn) Reader {
return &filteredReader{
r: r,
skipFn: skipFn,
}
}

func (fr *filteredReader) Read(ctx context.Context) ([]*Progress, error) {
progress, err := fr.r.Read(ctx)
if err != nil {
return nil, err
}
filteredProgress := []*Progress{}
for _, p := range progress {
shouldSkip, err := fr.skipFn(ctx, p)
if err != nil {
return nil, err
}
if shouldSkip {
continue
}
filteredProgress = append(filteredProgress, p)
}
return filteredProgress, nil
}
Loading

0 comments on commit b4cfdf1

Please sign in to comment.