Skip to content

Commit

Permalink
[gateway] propagate exit code properly when persisting sessions (#670)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandromello authored Jan 31, 2025
1 parent 226dfa0 commit dad3efd
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 71 deletions.
42 changes: 20 additions & 22 deletions gateway/transport/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transport

import (
"fmt"
"strconv"

"github.com/getsentry/sentry-go"
"github.com/hoophq/hoop/common/log"
Expand Down Expand Up @@ -90,38 +91,35 @@ func (s *Server) listenAgentMessages(pctx *plugintypes.Context, stream *streamcl
}

if _, err := proxyStream.PluginExecOnReceive(*pctx, pkt); err != nil {
log.Warnf("plugin reject packet, err=%v", err)
log.With("sid", pctx.SID).Warnf("plugin reject packet, err=%v", err)
sentry.CaptureException(err)
return status.Errorf(codes.Internal, "internal error, plugin reject packet")
}

if pb.PacketType(pkt.Type) == pbclient.SessionClose {
var trackErr error
if len(pkt.Payload) > 0 {
trackErr = fmt.Errorf(string(pkt.Payload))
}
// it will make sure to run the disconnect plugin phase for both clients
_ = proxyStream.Close(trackErr)
_ = proxyStream.Close(buildErrorFromPacket(pctx.SID, pkt))
}
if err = proxyStream.Send(pkt); err != nil {
log.With("sid", pctx.SID).Debugf("failed to send packet to proxy stream, err=%v", err)
}
}
}

// func (s *Server) configurationData(orgName string) []byte {
// var transportConfigBytes []byte
// transportConfigBytes, _ = pb.GobEncode(monitoring.TransportConfig{
// Sentry: monitoring.SentryConfig{
// OrgName: orgName,
// Environment: monitoring.NormalizeEnvironment(s.IDProvider.ApiURL),
// },
// Profiler: monitoring.ProfilerConfig{
// PyroscopeServerAddress: s.PyroscopeIngestURL,
// PyroscopeAuthToken: s.PyroscopeAuthToken,
// OrgName: orgName,
// Environment: monitoring.NormalizeEnvironment(s.IDProvider.ApiURL),
// },
// })
// return transportConfigBytes
// }
func buildErrorFromPacket(sid string, pkt *pb.Packet) error {
var exitCode *int
exitCodeStr := string(pkt.Spec[pb.SpecClientExitCodeKey])
ecode, err := strconv.Atoi(exitCodeStr)
exitCode = &ecode
if err != nil {
exitCode = func() *int { v := 254; return &v }() // internal error code
}

log.With("sid", sid).Infof("session result, exit_code=%q, payload_length=%v",
exitCodeStr, len(pkt.Payload))
if len(pkt.Payload) == 0 && (exitCode == nil || *exitCode == 0) {
return nil
}

return plugintypes.NewPacketErr(string(pkt.Payload), exitCode)
}
60 changes: 16 additions & 44 deletions gateway/transport/plugins/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,13 @@ import (
plugintypes "github.com/hoophq/hoop/gateway/transport/plugins/types"
)

var (
memorySessionStore = memory.New()
internalExitCode = func() *int { v := 254; return &v }()
)
var memorySessionStore = memory.New()

type (
auditPlugin struct {
walSessionStore memory.Store
started bool
mu sync.RWMutex
}
)
type auditPlugin struct {
walSessionStore memory.Store
started bool
mu sync.RWMutex
}

func New() *auditPlugin { return &auditPlugin{walSessionStore: memory.New()} }
func (p *auditPlugin) Name() string { return plugintypes.PluginAuditName }
Expand Down Expand Up @@ -154,13 +149,6 @@ func (p *auditPlugin) OnReceive(pctx plugintypes.Context, pkt *pb.Packet) (*plug
log.Warnf("failed writing agent packet response, err=%v", err)
}
return nil, nil
case pbclient.SessionClose:
exitCode := parseExitCode(pctx.SID, pkt)
if len(pkt.Payload) > 0 {
p.closeSession(pctx, exitCode, fmt.Errorf(string(pkt.Payload)))
return nil, nil
}
p.closeSession(pctx, exitCode, nil)
case pbagent.ExecWriteStdin,
pbagent.TerminalWriteStdin,
pbagent.TCPConnectionWrite:
Expand All @@ -169,9 +157,10 @@ func (p *auditPlugin) OnReceive(pctx plugintypes.Context, pkt *pb.Packet) (*plug
return nil, nil
}

func (p *auditPlugin) OnDisconnect(pctx plugintypes.Context, errMsg error) error {
func (p *auditPlugin) OnDisconnect(pctx plugintypes.Context, err error) error {
log.With("sid", pctx.SID, "origin", pctx.ClientOrigin, "agent", pctx.AgentName).
Debugf("processing disconnect")

switch pctx.ClientOrigin {
case pb.ConnectionOriginAgent:
log.With("agent", pctx.AgentName).Infof("agent shutdown, graceful closing session")
Expand All @@ -180,21 +169,22 @@ func (p *auditPlugin) OnDisconnect(pctx plugintypes.Context, errMsg error) error
continue
}
pctx.SID = msid
p.closeSession(pctx, internalExitCode, errMsg)
p.closeSession(pctx, err)
}
default:
p.closeSession(pctx, internalExitCode, errMsg)
p.closeSession(pctx, err)
}
return nil
}

func (p *auditPlugin) closeSession(pctx plugintypes.Context, exitCode *int, errMsg error) {
log.With("sid", pctx.SID).Infof("closing session, exit_code=%v, reason=%v", debugExitCode(exitCode), errMsg)
func (p *auditPlugin) closeSession(pctx plugintypes.Context, err error) {
log.With("sid", pctx.SID, "origin", pctx.ClientOrigin, "verb", pctx.ClientVerb).
Infof("closing session, reason=%v", err)
go func() {
defer memorySessionStore.Del(pctx.SID)
if err := p.writeOnClose(pctx, exitCode, errMsg); err != nil {
log.With("sid", pctx.SID).Warnf("failed closing session, reason=%v", err)
return
if err := p.writeOnClose(pctx, err); err != nil {
log.With("sid", pctx.SID, "origin", pctx.ClientOrigin, "verb", pctx.ClientVerb).
Warnf("failed closing session, reason=%v", err)
}
}()
}
Expand Down Expand Up @@ -249,21 +239,3 @@ func parseSpecAsEventMetadata(pkt *pb.Packet) map[string][]byte {
}
return nil
}

func parseExitCode(sid string, pkt *pb.Packet) (exitCode *int) {
exitCodeStr, ok := pkt.Spec[pb.SpecClientExitCodeKey]
if ok {
if ecode, err := strconv.Atoi(string(exitCodeStr)); err == nil {
exitCode = &ecode
}
}
log.With("sid", sid).Debugf("raw exit code value=%q, has_exit_code_spec=%v", exitCodeStr, ok)
return
}

func debugExitCode(exitCode *int) string {
if exitCode == nil {
return "<nil>"
}
return fmt.Sprintf("%v", *exitCode)
}
22 changes: 19 additions & 3 deletions gateway/transport/plugins/audit/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
eventLogTypeName string = "_footer_error"
)

var internalExitCode = func() *int { v := 254; return &v }()

type walLogRWMutex struct {
log *sessionwal.WalLog
mu sync.RWMutex
Expand Down Expand Up @@ -78,7 +80,7 @@ func (p *auditPlugin) dropWalLog(sid string) {
walogm.mu.Unlock()
}

func (p *auditPlugin) writeOnClose(pctx plugintypes.Context, exitCode *int, errMsg error) error {
func (p *auditPlugin) writeOnClose(pctx plugintypes.Context, errMsg error) error {
walLogObj := p.walSessionStore.Pop(pctx.SID)
walogm, ok := walLogObj.(*walLogRWMutex)
if !ok {
Expand Down Expand Up @@ -171,17 +173,19 @@ func (p *auditPlugin) writeOnClose(pctx plugintypes.Context, exitCode *int, errM
endDate := time.Now().UTC()
sessionMetrics, err := metrics.toMap()
if err != nil {
log.Warnf("failed parsing session metrics to map, reason=%v", err)
log.With("sid", pctx.SID).Warnf("failed parsing session metrics to map, reason=%v", err)
}
err = models.UpdateSessionEventStream(models.SessionDone{
ID: wh.SessionID,
OrgID: wh.OrgID,
Metrics: sessionMetrics,
BlobStream: json.RawMessage(rawJSONBlobStream),
Status: string(openapi.SessionStatusDone),
ExitCode: exitCode,
ExitCode: parseExitCodeFromErr(errMsg),
EndSession: &endDate,
})
log.With("sid", pctx.SID, "origin", pctx.ClientOrigin, "verb", pctx.ClientVerb).
Infof("finished persisting session to store, err=%v", errMsg)

if err != nil {
_ = walogm.log.Write(eventlogv1.NewCommitError(endDate, err.Error()))
Expand All @@ -199,3 +203,15 @@ func (p *auditPlugin) truncateTCPEventStream(eventStream []byte, connType string
}
return eventStream
}

func parseExitCodeFromErr(err error) (exitCode *int) {
switch v := err.(type) {
case *plugintypes.PacketErr:
exitCode = v.ExitCode()
case nil:
exitCode = func() *int { v := 0; return &v }()
default:
exitCode = internalExitCode
}
return
}
12 changes: 12 additions & 0 deletions gateway/transport/plugins/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ import (

type GenericMap map[string]any

type PacketErr struct {
exitCode *int
msg string
}

func (e PacketErr) Error() string { return e.msg }
func (e PacketErr) ExitCode() *int { return e.exitCode }

func NewPacketErr(msg string, exitCode *int) error {
return &PacketErr{msg: msg, exitCode: exitCode}
}

type Context struct {
Context context.Context
// Session ID
Expand Down
4 changes: 2 additions & 2 deletions webapp/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit dad3efd

Please sign in to comment.