Skip to content

Commit

Permalink
Merge pull request #53 from capcom6/issue/34-per-period-limit
Browse files Browse the repository at this point in the history
Add per period limit, see #34
  • Loading branch information
capcom6 authored Apr 5, 2024
2 parents e3206cb + 06ca8cb commit 097b835
Show file tree
Hide file tree
Showing 18 changed files with 346 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .idea/gradle.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

169 changes: 169 additions & 0 deletions app/schemas/me.capcom.smsgateway.data.AppDatabase/9.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
{
"formatVersion": 1,
"database": {
"version": 9,
"identityHash": "bfb73410aeb4903b9ea559c1b60bd8e3",
"entities": [
{
"tableName": "Message",
"createSql": "CREATE TABLE IF NOT EXISTS `${TABLE_NAME}` (`id` TEXT NOT NULL, `text` TEXT NOT NULL, `withDeliveryReport` INTEGER NOT NULL DEFAULT 1, `simNumber` INTEGER, `validUntil` TEXT, `isEncrypted` INTEGER NOT NULL DEFAULT 0, `skipPhoneValidation` INTEGER NOT NULL DEFAULT 0, `source` TEXT NOT NULL DEFAULT 'Local', `state` TEXT NOT NULL, `createdAt` INTEGER NOT NULL DEFAULT 0, `processedAt` INTEGER, PRIMARY KEY(`id`))",
"fields": [
{
"fieldPath": "id",
"columnName": "id",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "text",
"columnName": "text",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "withDeliveryReport",
"columnName": "withDeliveryReport",
"affinity": "INTEGER",
"notNull": true,
"defaultValue": "1"
},
{
"fieldPath": "simNumber",
"columnName": "simNumber",
"affinity": "INTEGER",
"notNull": false
},
{
"fieldPath": "validUntil",
"columnName": "validUntil",
"affinity": "TEXT",
"notNull": false
},
{
"fieldPath": "isEncrypted",
"columnName": "isEncrypted",
"affinity": "INTEGER",
"notNull": true,
"defaultValue": "0"
},
{
"fieldPath": "skipPhoneValidation",
"columnName": "skipPhoneValidation",
"affinity": "INTEGER",
"notNull": true,
"defaultValue": "0"
},
{
"fieldPath": "source",
"columnName": "source",
"affinity": "TEXT",
"notNull": true,
"defaultValue": "'Local'"
},
{
"fieldPath": "state",
"columnName": "state",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "createdAt",
"columnName": "createdAt",
"affinity": "INTEGER",
"notNull": true,
"defaultValue": "0"
},
{
"fieldPath": "processedAt",
"columnName": "processedAt",
"affinity": "INTEGER",
"notNull": false
}
],
"primaryKey": {
"columnNames": [
"id"
],
"autoGenerate": false
},
"indices": [
{
"name": "index_Message_createdAt",
"unique": false,
"columnNames": [
"createdAt"
],
"orders": [],
"createSql": "CREATE INDEX IF NOT EXISTS `index_Message_createdAt` ON `${TABLE_NAME}` (`createdAt`)"
},
{
"name": "index_Message_processedAt",
"unique": false,
"columnNames": [
"processedAt"
],
"orders": [],
"createSql": "CREATE INDEX IF NOT EXISTS `index_Message_processedAt` ON `${TABLE_NAME}` (`processedAt`)"
}
],
"foreignKeys": []
},
{
"tableName": "MessageRecipient",
"createSql": "CREATE TABLE IF NOT EXISTS `${TABLE_NAME}` (`messageId` TEXT NOT NULL, `phoneNumber` TEXT NOT NULL, `state` TEXT NOT NULL, `error` TEXT, PRIMARY KEY(`messageId`, `phoneNumber`), FOREIGN KEY(`messageId`) REFERENCES `Message`(`id`) ON UPDATE NO ACTION ON DELETE CASCADE )",
"fields": [
{
"fieldPath": "messageId",
"columnName": "messageId",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "phoneNumber",
"columnName": "phoneNumber",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "state",
"columnName": "state",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "error",
"columnName": "error",
"affinity": "TEXT",
"notNull": false
}
],
"primaryKey": {
"columnNames": [
"messageId",
"phoneNumber"
],
"autoGenerate": false
},
"indices": [],
"foreignKeys": [
{
"table": "Message",
"onDelete": "CASCADE",
"onUpdate": "NO ACTION",
"columns": [
"messageId"
],
"referencedColumns": [
"id"
]
}
]
}
],
"views": [],
"setupQueries": [
"CREATE TABLE IF NOT EXISTS room_master_table (id INTEGER PRIMARY KEY,identity_hash TEXT)",
"INSERT OR REPLACE INTO room_master_table (id,identity_hash) VALUES(42, 'bfb73410aeb4903b9ea559c1b60bd8e3')"
]
}
}
4 changes: 3 additions & 1 deletion app/src/main/java/me/capcom/smsgateway/data/AppDatabase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ import me.capcom.smsgateway.data.entities.MessageRecipient

@Database(
entities = [Message::class, MessageRecipient::class],
version = 8,
version = 9,
autoMigrations = [
AutoMigration(from = 1, to = 2),
AutoMigration(from = 2, to = 3),
AutoMigration(from = 3, to = 4),
AutoMigration(from = 4, to = 5),
AutoMigration(from = 5, to = 6),
AutoMigration(from = 6, to = 7),
// AutoMigration(from = 7, to = 8), // manual migration
AutoMigration(from = 8, to = 9),
]
)
@TypeConverters(Converters::class)
Expand Down
7 changes: 7 additions & 0 deletions app/src/main/java/me/capcom/smsgateway/data/dao/MessageDao.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ import androidx.room.Transaction
import me.capcom.smsgateway.data.entities.Message
import me.capcom.smsgateway.data.entities.MessageRecipient
import me.capcom.smsgateway.data.entities.MessageWithRecipients
import me.capcom.smsgateway.data.entities.ProcessedStats

@Dao
interface MessageDao {
@Query("SELECT COUNT(*) as count, MAX(processedAt) as lastTimestamp FROM message WHERE state <> 'Pending' AND state <> 'Failed' AND processedAt >= :timestamp")
fun countProcessedFrom(timestamp: Long): ProcessedStats

@Query("SELECT * FROM message ORDER BY createdAt DESC LIMIT 50")
fun selectLast(): LiveData<List<Message>>

Expand Down Expand Up @@ -39,6 +43,9 @@ interface MessageDao {
@Query("UPDATE message SET state = :state WHERE id = :id")
fun updateMessageState(id: String, state: Message.State)

@Query("UPDATE message SET state = 'Processed', processedAt = strftime('%s', 'now') * 1000 WHERE id = :id")
fun setMessageProcessed(id: String)

@Query("UPDATE messagerecipient SET state = :state, error = :error WHERE messageId = :id AND phoneNumber = :phoneNumber")
fun updateRecipientState(id: String, phoneNumber: String, state: Message.State, error: String?)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import androidx.room.PrimaryKey
import me.capcom.smsgateway.modules.messages.data.MessageSource
import java.util.Date

@Entity
@Entity(indices = [androidx.room.Index(value = ["createdAt"]), androidx.room.Index(value = ["processedAt"])])
data class Message(
@PrimaryKey val id: String,
val text: String,
Expand All @@ -24,7 +24,8 @@ data class Message(

val state: State = State.Pending,
@ColumnInfo(defaultValue = "0")
val createdAt: Long = System.currentTimeMillis(), // do we need index here for querying in UI?
val createdAt: Long = System.currentTimeMillis(),
val processedAt: Long? = null,
) {
enum class State {
Pending,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package me.capcom.smsgateway.data.entities

data class ProcessedStats(
val count: Int,
val lastTimestamp: Long
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import io.ktor.client.plugins.ClientRequestException
import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import me.capcom.smsgateway.data.entities.Message
Expand All @@ -26,6 +26,8 @@ class GatewayModule(
private val settings: GatewaySettings,
) {
private var _api: GatewayApi? = null
private var _job: Job? = null

private val api
get() = _api ?: GatewayApi(
settings.privateUrl ?: GatewaySettings.PUBLIC_URL,
Expand All @@ -49,17 +51,15 @@ class GatewayModule(
PushService.register(context)
PullMessagesWorker.start(context)

scope.launch {
withContext(Dispatchers.IO) {
messagesService.events.events.collect { event ->
val event = event as? MessageStateChangedEvent ?: return@collect
if (event.source != MessageSource.Gateway) return@collect
_job = scope.launch {
messagesService.events.events.collect { event ->
val event = event as? MessageStateChangedEvent ?: return@collect
if (event.source != MessageSource.Gateway) return@collect

try {
sendState(event)
} catch (th: Throwable) {
th.printStackTrace()
}
try {
sendState(event)
} catch (th: Throwable) {
th.printStackTrace()
}
}
}
Expand All @@ -68,7 +68,8 @@ class GatewayModule(
fun isActiveLiveData(context: Context) = PullMessagesWorker.getStateLiveData(context)

fun stop(context: Context) {
scope.cancel()
_job?.cancel()
_job = null
PullMessagesWorker.stop(context)
this._api = null
}
Expand Down Expand Up @@ -207,6 +208,6 @@ class GatewayModule(

companion object {
private val job = SupervisorJob()
private val scope = CoroutineScope(job)
private val scope = CoroutineScope(job + Dispatchers.IO)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ class MessagesService(
}

if (state != message.message.state) {
dao.updateMessageState(message.message.id, state)
when (state) {
Message.State.Processed -> dao.setMessageProcessed(message.message.id)
else -> dao.updateMessageState(message.message.id, state)
}
}

return dao.get(id)
Expand Down Expand Up @@ -121,7 +124,12 @@ class MessagesService(
}

for (message in messages) {
sendMessage(message)
applyLimit()

if (!sendMessage(message)) {
// if message was not sent - don't need any delay before next message
continue
}

if (settings.secondsBetweenMessages > 0) {
delay((0..settings.secondsBetweenMessages).random() * 1000L)
Expand All @@ -131,25 +139,45 @@ class MessagesService(
return true
}

private suspend fun sendMessage(request: MessageWithRecipients) {
private suspend fun applyLimit() {
if (!settings.limitEnabled) {
return
}

val processedStats =
dao.countProcessedFrom(System.currentTimeMillis() - settings.limitPeriod.duration)
if (processedStats.count < settings.limitValue) {
return
}

delay(settings.limitPeriod.duration - (System.currentTimeMillis() - processedStats.lastTimestamp) + 1000L)
}

/**
* @return `true` if message was sent
*/
private suspend fun sendMessage(request: MessageWithRecipients): Boolean {
if (request.message.validUntil?.before(Date()) == true) {
updateState(request.message.id, null, Message.State.Failed, "TTL expired")
return
return false
}

if (request.state != Message.State.Pending) {
// не ясно когда такая ситуация может возникнуть
Log.w(this.javaClass.simpleName, "Unexpected state for message: $request")
updateState(request.message.id, null, request.state)
return
return false
}

try {
sendSMS(request)
return true
} catch (e: Exception) {
e.printStackTrace()
updateState(request.message.id, null, Message.State.Failed, "Sending: " + e.message)
}

return false
}

private suspend fun updateState(
Expand Down
Loading

0 comments on commit 097b835

Please sign in to comment.