Skip to content

Commit

Permalink
Start using new session field on queued messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jan 29, 2025
1 parent 622261d commit 1164b26
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 18 deletions.
2 changes: 1 addition & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status

// if message was successfully sent, and we have a session timeout, update it
wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead
if wasSuccess && dbMsg.SessionTimeout_ != 0 {
if wasSuccess && dbMsg.Session_ != nil && dbMsg.Session_.Timeout > 0 {
if err := b.insertTimeoutFire(ctx, dbMsg); err != nil {
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
Expand Down
13 changes: 6 additions & 7 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,17 @@ type Msg struct {
UserID_ courier.UserID `json:"user_id"`
Origin_ courier.MsgOrigin `json:"origin"`
ContactLastSeenOn_ *time.Time `json:"contact_last_seen_on"`

// extra fields used to allow courier to update a session's timeout to *after* the message has been sent
SessionID_ SessionID `json:"session_id"`
SessionTimeout_ int `json:"session_timeout"`
SessionStatus_ string `json:"session_status"`
SessionModifiedOn_ *time.Time `json:"session_modified_on"`
Session_ *courier.Session `json:"session"`

ContactName_ string `json:"contact_name"`
URNAuthTokens_ map[string]string `json:"auth_tokens"`
channel *Channel
workerToken queue.WorkerToken
alreadyWritten bool

// deprecated
SessionID_ SessionID `json:"session_id"`
SessionModifiedOn_ *time.Time `json:"session_modified_on"`
}

// newMsg creates a new DBMsg object with the passed in parameters
Expand Down Expand Up @@ -162,7 +161,7 @@ func (m *Msg) IsResend() bool { return m.IsResend_ }
func (m *Msg) Flow() *courier.FlowReference { return m.Flow_ }
func (m *Msg) OptIn() *courier.OptInReference { return m.OptIn_ }
func (m *Msg) UserID() courier.UserID { return m.UserID_ }
func (m *Msg) SessionStatus() string { return m.SessionStatus_ }
func (m *Msg) Session() *courier.Session { return m.Session_ }
func (m *Msg) HighPriority() bool { return m.HighPriority_ }

// incoming specific
Expand Down
4 changes: 3 additions & 1 deletion backends/rapidpro/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ CREATE TABLE IF NOT EXISTS contacts_contactfire (
contact_id integer references contacts_contact(id) on delete cascade,
fire_type character varying(1) NOT NULL,
scope character varying(128) NOT NULL,
extra jsonb,
fire_on timestamp with time zone NOT NULL,
session_uuid uuid,
sprint_uuid uuid,
extra jsonb,
UNIQUE (contact_id, fire_type, scope)
);

Expand Down
8 changes: 4 additions & 4 deletions backends/rapidpro/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ import (
type SessionID int64

const sqlInsertTimeoutFire = `
INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, extra, fire_on)
VALUES($1, $2, 'T', '', $3, $4)
INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, fire_on, session_uuid, sprint_uuid, extra)
VALUES($1, $2, 'T', '', $3, $4, $5, $6)
ON CONFLICT DO NOTHING`

// insertTimeoutFire inserts a timeout fire for the session associated with the given msg
func (b *backend) insertTimeoutFire(ctx context.Context, m *Msg) error {
extra := map[string]any{"session_id": m.SessionID_, "session_modified_on": m.SessionModifiedOn_}
timeoutOn := dates.Now().Add(time.Duration(m.SessionTimeout_) * time.Second)
timeoutOn := dates.Now().Add(time.Duration(m.Session_.Timeout) * time.Second)

_, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, jsonx.MustMarshal(extra), timeoutOn)
_, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, timeoutOn, m.Session_.UUID, m.Session_.SprintUUID, jsonx.MustMarshal(extra))
if err != nil {
return fmt.Errorf("error inserting session timeout contact fire for session #%d: %w", m.SessionID_, err)
}
Expand Down
5 changes: 4 additions & 1 deletion handlers/external/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ func (h *handler) Send(ctx context.Context, msg courier.MsgOut, res *courier.Sen
"from": channel.Address(),
"from_no_plus": strings.TrimPrefix(channel.Address(), "+"),
"channel": string(channel.UUID()),
"session_status": msg.SessionStatus(),
"session_status": "",
}
if msg.Session() != nil {
form["session_status"] = msg.Session().Status
}

useNationalStr := channel.ConfigForKey(courier.ConfigUseNational, false)
Expand Down
8 changes: 6 additions & 2 deletions handlers/firebase/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ func (h *handler) sendWithAPIKey(msg courier.MsgOut, res *courier.SendResult, cl
payload.Data.Title = title
payload.Data.Message = part
payload.Data.MessageID = int64(msg.ID())
payload.Data.SessionStatus = msg.SessionStatus()
if msg.Session() != nil {
payload.Data.SessionStatus = msg.Session().Status
}

// include any quick replies on the last piece we send
if i == len(msgParts)-1 {
Expand Down Expand Up @@ -281,7 +283,9 @@ func (h *handler) sendWithCredsJSON(msg courier.MsgOut, res *courier.SendResult,
payload.Message.Data.Title = title
payload.Message.Data.Message = part
payload.Message.Data.MessageID = msg.ID().String()
payload.Message.Data.SessionStatus = msg.SessionStatus()
if msg.Session() != nil {
payload.Message.Data.SessionStatus = msg.Session().Status
}

if i == len(msgParts)-1 {
if msg.QuickReplies() != nil {
Expand Down
9 changes: 8 additions & 1 deletion msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ type Templating struct {
ExternalID string `json:"external_id"`
}

type Session struct {
UUID string `json:"uuid"`
Status string `json:"status"`
SprintUUID string `json:"sprint_uuid"`
Timeout int `json:"timeout"`
}

//-----------------------------------------------------------------------------
// Msg interface
//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -110,8 +117,8 @@ type MsgOut interface {
Flow() *FlowReference
OptIn() *OptInReference
UserID() UserID
SessionStatus() string
HighPriority() bool
Session() *Session
}

// MsgIn is our interface to represent an incoming
Expand Down
3 changes: 2 additions & 1 deletion test/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type MockMsg struct {
metadata json.RawMessage
alreadyWritten bool
isResend bool
session *courier.Session

flow *courier.FlowReference
optIn *courier.OptInReference
Expand Down Expand Up @@ -75,7 +76,7 @@ func (m *MockMsg) IsResend() bool { return m.isResend }
func (m *MockMsg) Flow() *courier.FlowReference { return m.flow }
func (m *MockMsg) OptIn() *courier.OptInReference { return m.optIn }
func (m *MockMsg) UserID() courier.UserID { return m.userID }
func (m *MockMsg) SessionStatus() string { return "" }
func (m *MockMsg) Session() *courier.Session { return m.session }
func (m *MockMsg) HighPriority() bool { return m.highPriority }

// incoming specific
Expand Down

0 comments on commit 1164b26

Please sign in to comment.