Skip to content

Commit

Permalink
Start using new session field on queued messages
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jan 29, 2025
1 parent 622261d commit 9a50a5b
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 19 deletions.
2 changes: 1 addition & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status

// if message was successfully sent, and we have a session timeout, update it
wasSuccess := status.Status() == courier.MsgStatusWired || status.Status() == courier.MsgStatusSent || status.Status() == courier.MsgStatusDelivered || status.Status() == courier.MsgStatusRead
if wasSuccess && dbMsg.SessionTimeout_ != 0 {
if wasSuccess && dbMsg.Session_ != nil && dbMsg.Session_.Timeout > 0 {
if err := b.insertTimeoutFire(ctx, dbMsg); err != nil {
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
Expand Down
10 changes: 9 additions & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,12 @@ func (ts *BackendTestSuite) TestSessionTimeout() {
"urn": "telegram:3527065",
"created_on": "2017-07-21T19:22:23.242757Z",
"high_priority": true,
"session": {
"uuid": "79c1dbc6-4200-4333-b17a-1f996273a4cb",
"status": "W",
"sprint_uuid": "0897c392-8b08-43c4-b9d9-e75d332a2c58",
"timeout": 3600
},
"session_id": 12345,
"session_timeout": 3600,
"session_modified_on": "2025-01-28T20:43:34.157379218Z"
Expand All @@ -1363,12 +1369,14 @@ func (ts *BackendTestSuite) TestSessionTimeout() {
err := ts.b.insertTimeoutFire(ctx, msg)
ts.NoError(err)

assertdb.Query(ts.T(), ts.b.db, `SELECT org_id, contact_id, fire_type, scope, extra->>'session_id' AS session_id, extra->>'session_modified_on' AS session_modified_on FROM contacts_contactfire`).
assertdb.Query(ts.T(), ts.b.db, `SELECT org_id, contact_id, fire_type, scope, session_uuid::text, sprint_uuid::text, extra->>'session_id' AS session_id, extra->>'session_modified_on' AS session_modified_on FROM contacts_contactfire`).
Columns(map[string]any{
"org_id": int64(1),
"contact_id": int64(100),
"fire_type": "T",
"scope": "",
"session_uuid": "79c1dbc6-4200-4333-b17a-1f996273a4cb",
"sprint_uuid": "0897c392-8b08-43c4-b9d9-e75d332a2c58",
"session_id": "12345",
"session_modified_on": "2025-01-28T20:43:34.157379218Z",
})
Expand Down
13 changes: 6 additions & 7 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,17 @@ type Msg struct {
UserID_ courier.UserID `json:"user_id"`
Origin_ courier.MsgOrigin `json:"origin"`
ContactLastSeenOn_ *time.Time `json:"contact_last_seen_on"`

// extra fields used to allow courier to update a session's timeout to *after* the message has been sent
SessionID_ SessionID `json:"session_id"`
SessionTimeout_ int `json:"session_timeout"`
SessionStatus_ string `json:"session_status"`
SessionModifiedOn_ *time.Time `json:"session_modified_on"`
Session_ *courier.Session `json:"session"`

ContactName_ string `json:"contact_name"`
URNAuthTokens_ map[string]string `json:"auth_tokens"`
channel *Channel
workerToken queue.WorkerToken
alreadyWritten bool

// deprecated
SessionID_ SessionID `json:"session_id"`
SessionModifiedOn_ *time.Time `json:"session_modified_on"`
}

// newMsg creates a new DBMsg object with the passed in parameters
Expand Down Expand Up @@ -162,7 +161,7 @@ func (m *Msg) IsResend() bool { return m.IsResend_ }
func (m *Msg) Flow() *courier.FlowReference { return m.Flow_ }
func (m *Msg) OptIn() *courier.OptInReference { return m.OptIn_ }
func (m *Msg) UserID() courier.UserID { return m.UserID_ }
func (m *Msg) SessionStatus() string { return m.SessionStatus_ }
func (m *Msg) Session() *courier.Session { return m.Session_ }

Check warning on line 164 in backends/rapidpro/msg.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/msg.go#L164

Added line #L164 was not covered by tests
func (m *Msg) HighPriority() bool { return m.HighPriority_ }

// incoming specific
Expand Down
4 changes: 3 additions & 1 deletion backends/rapidpro/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ CREATE TABLE IF NOT EXISTS contacts_contactfire (
contact_id integer references contacts_contact(id) on delete cascade,
fire_type character varying(1) NOT NULL,
scope character varying(128) NOT NULL,
extra jsonb,
fire_on timestamp with time zone NOT NULL,
session_uuid uuid,
sprint_uuid uuid,
extra jsonb,
UNIQUE (contact_id, fire_type, scope)
);

Expand Down
8 changes: 4 additions & 4 deletions backends/rapidpro/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ import (
type SessionID int64

const sqlInsertTimeoutFire = `
INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, extra, fire_on)
VALUES($1, $2, 'T', '', $3, $4)
INSERT INTO contacts_contactfire(org_id, contact_id, fire_type, scope, fire_on, session_uuid, sprint_uuid, extra)
VALUES($1, $2, 'T', '', $3, $4, $5, $6)
ON CONFLICT DO NOTHING`

// insertTimeoutFire inserts a timeout fire for the session associated with the given msg
func (b *backend) insertTimeoutFire(ctx context.Context, m *Msg) error {
extra := map[string]any{"session_id": m.SessionID_, "session_modified_on": m.SessionModifiedOn_}
timeoutOn := dates.Now().Add(time.Duration(m.SessionTimeout_) * time.Second)
timeoutOn := dates.Now().Add(time.Duration(m.Session_.Timeout) * time.Second)

_, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, jsonx.MustMarshal(extra), timeoutOn)
_, err := b.db.ExecContext(ctx, sqlInsertTimeoutFire, m.OrgID_, m.ContactID_, timeoutOn, m.Session_.UUID, m.Session_.SprintUUID, jsonx.MustMarshal(extra))
if err != nil {
return fmt.Errorf("error inserting session timeout contact fire for session #%d: %w", m.SessionID_, err)
}
Expand Down
5 changes: 4 additions & 1 deletion handlers/external/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ func (h *handler) Send(ctx context.Context, msg courier.MsgOut, res *courier.Sen
"from": channel.Address(),
"from_no_plus": strings.TrimPrefix(channel.Address(), "+"),
"channel": string(channel.UUID()),
"session_status": msg.SessionStatus(),
"session_status": "",
}
if msg.Session() != nil {
form["session_status"] = msg.Session().Status

Check warning on line 304 in handlers/external/handler.go

View check run for this annotation

Codecov / codecov/patch

handlers/external/handler.go#L304

Added line #L304 was not covered by tests
}

useNationalStr := channel.ConfigForKey(courier.ConfigUseNational, false)
Expand Down
8 changes: 6 additions & 2 deletions handlers/firebase/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ func (h *handler) sendWithAPIKey(msg courier.MsgOut, res *courier.SendResult, cl
payload.Data.Title = title
payload.Data.Message = part
payload.Data.MessageID = int64(msg.ID())
payload.Data.SessionStatus = msg.SessionStatus()
if msg.Session() != nil {
payload.Data.SessionStatus = msg.Session().Status
}

Check warning on line 204 in handlers/firebase/handler.go

View check run for this annotation

Codecov / codecov/patch

handlers/firebase/handler.go#L203-L204

Added lines #L203 - L204 were not covered by tests

// include any quick replies on the last piece we send
if i == len(msgParts)-1 {
Expand Down Expand Up @@ -281,7 +283,9 @@ func (h *handler) sendWithCredsJSON(msg courier.MsgOut, res *courier.SendResult,
payload.Message.Data.Title = title
payload.Message.Data.Message = part
payload.Message.Data.MessageID = msg.ID().String()
payload.Message.Data.SessionStatus = msg.SessionStatus()
if msg.Session() != nil {
payload.Message.Data.SessionStatus = msg.Session().Status
}

Check warning on line 288 in handlers/firebase/handler.go

View check run for this annotation

Codecov / codecov/patch

handlers/firebase/handler.go#L287-L288

Added lines #L287 - L288 were not covered by tests

if i == len(msgParts)-1 {
if msg.QuickReplies() != nil {
Expand Down
9 changes: 8 additions & 1 deletion msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ type Templating struct {
ExternalID string `json:"external_id"`
}

type Session struct {
UUID string `json:"uuid"`
Status string `json:"status"`
SprintUUID string `json:"sprint_uuid"`
Timeout int `json:"timeout"`
}

//-----------------------------------------------------------------------------
// Msg interface
//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -110,8 +117,8 @@ type MsgOut interface {
Flow() *FlowReference
OptIn() *OptInReference
UserID() UserID
SessionStatus() string
HighPriority() bool
Session() *Session
}

// MsgIn is our interface to represent an incoming
Expand Down
3 changes: 2 additions & 1 deletion test/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type MockMsg struct {
metadata json.RawMessage
alreadyWritten bool
isResend bool
session *courier.Session

flow *courier.FlowReference
optIn *courier.OptInReference
Expand Down Expand Up @@ -75,7 +76,7 @@ func (m *MockMsg) IsResend() bool { return m.isResend }
func (m *MockMsg) Flow() *courier.FlowReference { return m.flow }
func (m *MockMsg) OptIn() *courier.OptInReference { return m.optIn }
func (m *MockMsg) UserID() courier.UserID { return m.userID }
func (m *MockMsg) SessionStatus() string { return "" }
func (m *MockMsg) Session() *courier.Session { return m.session }
func (m *MockMsg) HighPriority() bool { return m.highPriority }

// incoming specific
Expand Down

0 comments on commit 9a50a5b

Please sign in to comment.