From dad3efd0866c2d646e2fa5372b68f11950da870a Mon Sep 17 00:00:00 2001 From: Sandro Mello Date: Fri, 31 Jan 2025 13:13:25 -0300 Subject: [PATCH] [gateway] propagate exit code properly when persisting sessions (#670) --- gateway/transport/agent.go | 42 ++++++++--------- gateway/transport/plugins/audit/audit.go | 60 +++++++----------------- gateway/transport/plugins/audit/wal.go | 22 +++++++-- gateway/transport/plugins/types/types.go | 12 +++++ webapp/package-lock.json | 4 +- 5 files changed, 69 insertions(+), 71 deletions(-) diff --git a/gateway/transport/agent.go b/gateway/transport/agent.go index f57a6b27..364597c0 100644 --- a/gateway/transport/agent.go +++ b/gateway/transport/agent.go @@ -2,6 +2,7 @@ package transport import ( "fmt" + "strconv" "github.com/getsentry/sentry-go" "github.com/hoophq/hoop/common/log" @@ -90,18 +91,14 @@ func (s *Server) listenAgentMessages(pctx *plugintypes.Context, stream *streamcl } if _, err := proxyStream.PluginExecOnReceive(*pctx, pkt); err != nil { - log.Warnf("plugin reject packet, err=%v", err) + log.With("sid", pctx.SID).Warnf("plugin reject packet, err=%v", err) sentry.CaptureException(err) return status.Errorf(codes.Internal, "internal error, plugin reject packet") } if pb.PacketType(pkt.Type) == pbclient.SessionClose { - var trackErr error - if len(pkt.Payload) > 0 { - trackErr = fmt.Errorf(string(pkt.Payload)) - } // it will make sure to run the disconnect plugin phase for both clients - _ = proxyStream.Close(trackErr) + _ = proxyStream.Close(buildErrorFromPacket(pctx.SID, pkt)) } if err = proxyStream.Send(pkt); err != nil { log.With("sid", pctx.SID).Debugf("failed to send packet to proxy stream, err=%v", err) @@ -109,19 +106,20 @@ func (s *Server) listenAgentMessages(pctx *plugintypes.Context, stream *streamcl } } -// func (s *Server) configurationData(orgName string) []byte { -// var transportConfigBytes []byte -// transportConfigBytes, _ = pb.GobEncode(monitoring.TransportConfig{ -// Sentry: monitoring.SentryConfig{ -// OrgName: orgName, -// Environment: monitoring.NormalizeEnvironment(s.IDProvider.ApiURL), -// }, -// Profiler: monitoring.ProfilerConfig{ -// PyroscopeServerAddress: s.PyroscopeIngestURL, -// PyroscopeAuthToken: s.PyroscopeAuthToken, -// OrgName: orgName, -// Environment: monitoring.NormalizeEnvironment(s.IDProvider.ApiURL), -// }, -// }) -// return transportConfigBytes -// } +func buildErrorFromPacket(sid string, pkt *pb.Packet) error { + var exitCode *int + exitCodeStr := string(pkt.Spec[pb.SpecClientExitCodeKey]) + ecode, err := strconv.Atoi(exitCodeStr) + exitCode = &ecode + if err != nil { + exitCode = func() *int { v := 254; return &v }() // internal error code + } + + log.With("sid", sid).Infof("session result, exit_code=%q, payload_length=%v", + exitCodeStr, len(pkt.Payload)) + if len(pkt.Payload) == 0 && (exitCode == nil || *exitCode == 0) { + return nil + } + + return plugintypes.NewPacketErr(string(pkt.Payload), exitCode) +} diff --git a/gateway/transport/plugins/audit/audit.go b/gateway/transport/plugins/audit/audit.go index cf3343c2..12563206 100644 --- a/gateway/transport/plugins/audit/audit.go +++ b/gateway/transport/plugins/audit/audit.go @@ -23,18 +23,13 @@ import ( plugintypes "github.com/hoophq/hoop/gateway/transport/plugins/types" ) -var ( - memorySessionStore = memory.New() - internalExitCode = func() *int { v := 254; return &v }() -) +var memorySessionStore = memory.New() -type ( - auditPlugin struct { - walSessionStore memory.Store - started bool - mu sync.RWMutex - } -) +type auditPlugin struct { + walSessionStore memory.Store + started bool + mu sync.RWMutex +} func New() *auditPlugin { return &auditPlugin{walSessionStore: memory.New()} } func (p *auditPlugin) Name() string { return plugintypes.PluginAuditName } @@ -154,13 +149,6 @@ func (p *auditPlugin) OnReceive(pctx plugintypes.Context, pkt *pb.Packet) (*plug log.Warnf("failed writing agent packet response, err=%v", err) } return nil, nil - case pbclient.SessionClose: - exitCode := parseExitCode(pctx.SID, pkt) - if len(pkt.Payload) > 0 { - p.closeSession(pctx, exitCode, fmt.Errorf(string(pkt.Payload))) - return nil, nil - } - p.closeSession(pctx, exitCode, nil) case pbagent.ExecWriteStdin, pbagent.TerminalWriteStdin, pbagent.TCPConnectionWrite: @@ -169,9 +157,10 @@ func (p *auditPlugin) OnReceive(pctx plugintypes.Context, pkt *pb.Packet) (*plug return nil, nil } -func (p *auditPlugin) OnDisconnect(pctx plugintypes.Context, errMsg error) error { +func (p *auditPlugin) OnDisconnect(pctx plugintypes.Context, err error) error { log.With("sid", pctx.SID, "origin", pctx.ClientOrigin, "agent", pctx.AgentName). Debugf("processing disconnect") + switch pctx.ClientOrigin { case pb.ConnectionOriginAgent: log.With("agent", pctx.AgentName).Infof("agent shutdown, graceful closing session") @@ -180,21 +169,22 @@ func (p *auditPlugin) OnDisconnect(pctx plugintypes.Context, errMsg error) error continue } pctx.SID = msid - p.closeSession(pctx, internalExitCode, errMsg) + p.closeSession(pctx, err) } default: - p.closeSession(pctx, internalExitCode, errMsg) + p.closeSession(pctx, err) } return nil } -func (p *auditPlugin) closeSession(pctx plugintypes.Context, exitCode *int, errMsg error) { - log.With("sid", pctx.SID).Infof("closing session, exit_code=%v, reason=%v", debugExitCode(exitCode), errMsg) +func (p *auditPlugin) closeSession(pctx plugintypes.Context, err error) { + log.With("sid", pctx.SID, "origin", pctx.ClientOrigin, "verb", pctx.ClientVerb). + Infof("closing session, reason=%v", err) go func() { defer memorySessionStore.Del(pctx.SID) - if err := p.writeOnClose(pctx, exitCode, errMsg); err != nil { - log.With("sid", pctx.SID).Warnf("failed closing session, reason=%v", err) - return + if err := p.writeOnClose(pctx, err); err != nil { + log.With("sid", pctx.SID, "origin", pctx.ClientOrigin, "verb", pctx.ClientVerb). + Warnf("failed closing session, reason=%v", err) } }() } @@ -249,21 +239,3 @@ func parseSpecAsEventMetadata(pkt *pb.Packet) map[string][]byte { } return nil } - -func parseExitCode(sid string, pkt *pb.Packet) (exitCode *int) { - exitCodeStr, ok := pkt.Spec[pb.SpecClientExitCodeKey] - if ok { - if ecode, err := strconv.Atoi(string(exitCodeStr)); err == nil { - exitCode = &ecode - } - } - log.With("sid", sid).Debugf("raw exit code value=%q, has_exit_code_spec=%v", exitCodeStr, ok) - return -} - -func debugExitCode(exitCode *int) string { - if exitCode == nil { - return "" - } - return fmt.Sprintf("%v", *exitCode) -} diff --git a/gateway/transport/plugins/audit/wal.go b/gateway/transport/plugins/audit/wal.go index b7a56824..a3adfe73 100644 --- a/gateway/transport/plugins/audit/wal.go +++ b/gateway/transport/plugins/audit/wal.go @@ -26,6 +26,8 @@ const ( eventLogTypeName string = "_footer_error" ) +var internalExitCode = func() *int { v := 254; return &v }() + type walLogRWMutex struct { log *sessionwal.WalLog mu sync.RWMutex @@ -78,7 +80,7 @@ func (p *auditPlugin) dropWalLog(sid string) { walogm.mu.Unlock() } -func (p *auditPlugin) writeOnClose(pctx plugintypes.Context, exitCode *int, errMsg error) error { +func (p *auditPlugin) writeOnClose(pctx plugintypes.Context, errMsg error) error { walLogObj := p.walSessionStore.Pop(pctx.SID) walogm, ok := walLogObj.(*walLogRWMutex) if !ok { @@ -171,7 +173,7 @@ func (p *auditPlugin) writeOnClose(pctx plugintypes.Context, exitCode *int, errM endDate := time.Now().UTC() sessionMetrics, err := metrics.toMap() if err != nil { - log.Warnf("failed parsing session metrics to map, reason=%v", err) + log.With("sid", pctx.SID).Warnf("failed parsing session metrics to map, reason=%v", err) } err = models.UpdateSessionEventStream(models.SessionDone{ ID: wh.SessionID, @@ -179,9 +181,11 @@ func (p *auditPlugin) writeOnClose(pctx plugintypes.Context, exitCode *int, errM Metrics: sessionMetrics, BlobStream: json.RawMessage(rawJSONBlobStream), Status: string(openapi.SessionStatusDone), - ExitCode: exitCode, + ExitCode: parseExitCodeFromErr(errMsg), EndSession: &endDate, }) + log.With("sid", pctx.SID, "origin", pctx.ClientOrigin, "verb", pctx.ClientVerb). + Infof("finished persisting session to store, err=%v", errMsg) if err != nil { _ = walogm.log.Write(eventlogv1.NewCommitError(endDate, err.Error())) @@ -199,3 +203,15 @@ func (p *auditPlugin) truncateTCPEventStream(eventStream []byte, connType string } return eventStream } + +func parseExitCodeFromErr(err error) (exitCode *int) { + switch v := err.(type) { + case *plugintypes.PacketErr: + exitCode = v.ExitCode() + case nil: + exitCode = func() *int { v := 0; return &v }() + default: + exitCode = internalExitCode + } + return +} diff --git a/gateway/transport/plugins/types/types.go b/gateway/transport/plugins/types/types.go index 8416bb18..e9b0e4af 100644 --- a/gateway/transport/plugins/types/types.go +++ b/gateway/transport/plugins/types/types.go @@ -12,6 +12,18 @@ import ( type GenericMap map[string]any +type PacketErr struct { + exitCode *int + msg string +} + +func (e PacketErr) Error() string { return e.msg } +func (e PacketErr) ExitCode() *int { return e.exitCode } + +func NewPacketErr(msg string, exitCode *int) error { + return &PacketErr{msg: msg, exitCode: exitCode} +} + type Context struct { Context context.Context // Session ID diff --git a/webapp/package-lock.json b/webapp/package-lock.json index d0fd21db..3963fc19 100644 --- a/webapp/package-lock.json +++ b/webapp/package-lock.json @@ -1,12 +1,12 @@ { "name": "webapp", - "version": "1.35.9", + "version": "1.35.12", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "webapp", - "version": "1.35.9", + "version": "1.35.12", "dependencies": { "@codemirror/commands": "^6.3.2", "@codemirror/lang-javascript": "^6.2.1",