Skip to content
This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

25 use skunk client in quill connector #29

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ on:
env:
PG_PASSWORD: test
PG_HOST_PORT: 'localhost:5432'
PG_HOST: localhost
PG_PORT: 5432
PG_USER: postgres
PG_DBNAME: finagle_postgres_test
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down
14 changes: 11 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// https://typelevel.org/sbt-typelevel/faq.html#what-is-a-base-version-anyway
ThisBuild / tlBaseVersion := "0.16" // your current series x.y
ThisBuild / tlBaseVersion := "0.17" // your current series x.y

ThisBuild / organization := "io.github.deal-engine"
ThisBuild / organizationName := "Deal Engine"
Expand All @@ -8,7 +8,8 @@ ThisBuild / licenses := Seq(License.Apache2)
ThisBuild / developers := List(
// your GitHub handle and name
tlGitHubDev("ivanmoreau", "Iván Molina Rebolledo"),
tlGitHubDev("IvanAtDealEngine", "Iván Molina Rebolledo")
tlGitHubDev("IvanAtDealEngine", "Iván Molina Rebolledo"),
tlGitHubDev("fabianhjr-dealengine", "Fabián Heredia Montiel")
)

// publish to s01.oss.sonatype.org (set to true to publish to oss.sonatype.org instead)
Expand All @@ -31,6 +32,8 @@ ThisBuild / githubWorkflowBuildPreamble ++= Seq(
)
ThisBuild / githubWorkflowEnv ++= Map(
"PG_HOST_PORT" -> "localhost:5432",
"PG_HOST" -> "localhost",
"PG_PORT" -> "5432",
"PG_USER" -> "postgres",
"PG_DBNAME" -> "finagle_postgres_test",
"PG_PASSWORD" -> "test"
Expand Down Expand Up @@ -59,6 +62,7 @@ lazy val root =
`finagle-postgres`,
`finagle-postgres-shapeless`,
`finagle-postgres-quill`,
`finagle-postgres-skunk`,
`weaver-twitter-future`,
`weaver-twitter-future-core`
)
Expand Down Expand Up @@ -106,7 +110,11 @@ lazy val `finagle-postgres-quill` = crossProject(JVMPlatform)
"org.scalacheck" %% "scalacheck" % Versions.scalacheck % Test
)
)
.dependsOn(`finagle-postgres`, `weaver-twitter-future` % Test)
.dependsOn(
`finagle-postgres`,
`weaver-twitter-future` % Test,
`finagle-postgres-skunk` % Test
)

lazy val `finagle-postgres-shapeless` = crossProject(JVMPlatform)
.crossType(CrossType.Pure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import skunk.util.Origin
*/

class PostgresClientImpl(sessionR: Resource[IO, Session[IO]])
extends PostgresClient {
extends PostgresClient { self =>
protected val isActive: AtomicBoolean = new AtomicBoolean(true)
protected val isTransaction: Boolean = false

private val alloc: Future[(Session[IO], IO[Unit])] =
Util.runIO(sessionR.allocated)
Expand All @@ -47,19 +48,24 @@ class PostgresClientImpl(sessionR: Resource[IO, Session[IO]])

override def charset: Charset = StandardCharsets.UTF_8

override def inTransaction[T](fn: PostgresClient => Future[T]): Future[T] =
sessionFuture.flatMap { session =>
Util
.runIO(session.transaction.use { _ =>
IO {
val tIsActive = isActive
fn(new PostgresClientImpl(resourceUnreleased) {
override val isActive: AtomicBoolean = tIsActive
})
}
})
.flatten
}
override def inTransaction[T](fn: PostgresClient => Future[T]): Future[T] = {
if (!isTransaction) {
sessionFuture.flatMap { session =>
Util
.runIO(session.transaction.use { _ =>
IO {
fn(new PostgresClientImpl(resourceUnreleased) {
override val isActive: AtomicBoolean = self.isActive
override val isTransaction: Boolean = true
})
}
})
.flatten
}
// Allow nested transactions, probably a bad idea c:
// Deal engine be like https://www.youtube.com/watch?v=5fbZTnZDvPA
} else { fn(self) }
}

val decoder: Decoder[Row] = new Decoder[Row] {
override def types: List[Type] = List.empty
Expand Down Expand Up @@ -149,7 +155,7 @@ class PostgresClientImpl(sessionR: Resource[IO, Session[IO]])
override def executeUpdate(sql: String): Future[OK] = execute(sql)

override def execute(sql: String): Future[OK] = sessionFuture.flatMap {
session =>
session => ??? /*
val command: Command[Void] =
Command(sql = sql, origin = Origin.unknown, encoder = skunk.Void.codec)
Util.runIO(session.execute(command).map {
Expand All @@ -160,6 +166,7 @@ class PostgresClientImpl(sessionR: Resource[IO, Session[IO]])
case Completion.Copy(count) => OK(count)
case _ => OK(0)
})
*/
}

override def selectToStream[T](sql: String)(f: Row => T): AsyncStream[T] = {
Expand Down Expand Up @@ -219,13 +226,13 @@ class PostgresClientImpl(sessionR: Resource[IO, Session[IO]])
isDynamic = true
)

AsyncStream.fromFuture {
sessionFuture.flatMap { session =>
Util.runIO(session.prepare(query).flatMap { pc =>
fs2IO2Async(pc.stream(params, 6).map(f))
})
}
}.flatten
AsyncStream {
for {
session <- Util.toIO(sessionFuture)
pq <- session.prepare(query)
r <- fs2IO2Async(pq.stream(params, 6))
} yield r.map(f)
}
}

override def prepareAndExecute(sql: String, params: Param[_]*): Future[Int] =
Expand All @@ -236,16 +243,16 @@ class PostgresClientImpl(sessionR: Resource[IO, Session[IO]])
encoder = encoder
)

Util.runIO(session.prepare(query).flatMap { pc =>
pc.execute(params).map {
case Completion.Insert(count) => count
case Completion.Delete(count) => count
case Completion.Select(count) => count
case Completion.Update(count) => count
case Completion.Copy(count) => count
case _ => 0
Util.runIO(for {
pc <- session.prepare(query)
r <- pc.execute(params)
rr <- r match {
case Completion.Insert(count) => IO.pure(count)
case Completion.Delete(count) => IO.pure(count)
case Completion.Update(count) => IO.pure(count)
case _ => IO.raiseError(new IllegalArgumentException("shouldn't query in command code"))
}
})
} yield rr)
}

/** Close the underlying connection pool and make this Client eternally down
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,7 @@ package finagle_postgres.skunk

import cats.effect.IO
import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler}
import com.twitter.util.{
Duration,
Future,
FuturePool,
JavaTimer,
Monitor,
Promise
}
import com.twitter.util.{Duration, Future, FuturePool, JavaTimer, Monitor, Promise, Return, Throw}

import java.time.Instant
import java.time.temporal.ChronoField
Expand All @@ -36,12 +29,13 @@ object Util {
// From cats
private val scheduler: Scheduler = new Scheduler {
override def sleep(delay: FiniteDuration, task: Runnable): Runnable =
() => Future.Unit.flatMap { _ =>
implicit val timer: JavaTimer = new JavaTimer()
Future.sleep(Duration.fromMilliseconds(delay.toMillis)).flatMap { _ =>
Future { task.run() }
() =>
Future.Unit.flatMap { _ =>
implicit val timer: JavaTimer = new JavaTimer()
Future.sleep(Duration.fromMilliseconds(delay.toMillis)).flatMap { _ =>
Future { task.run() }
}
}
}

override def nowMillis(): Long = System.currentTimeMillis()

Expand Down Expand Up @@ -72,4 +66,13 @@ object Util {

promise
}

def toIO[A](x: Future[A]): IO[A] = {
IO.async_[A] { cb =>
x.respond {
case Throw(e) => cb(Left(e))
case Return(r) => cb(Right(r))
}
}.evalOn(computeEC)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ trait FinaglePostgresDecoders {

case class FinaglePostgresDecoder[T](
vd: ValueDecoder[T],
default: Throwable => T = (e: Throwable) => fail(e.getMessage)
withRescue: Throwable => T = (e: Throwable) =>
e match {
case (e: IllegalStateException) if e.getMessage == "Value is NULL" =>
null.asInstanceOf[T]
case _ => fail(e.getMessage)
}
) extends BaseDecoder[T] {
override def apply(index: Index, row: ResultRow, session: Session): T =
row.getTry[T](index)(vd) match {
case Return(r) => r
case Throw(e) => default(e)
case Throw(e) => withRescue(e)
}

def orElse[U](
Expand Down Expand Up @@ -59,17 +64,26 @@ trait FinaglePostgresDecoders {
def decoderMapped[U, T](f: U => T)(implicit vd: ValueDecoder[U]): Decoder[T] =
FinaglePostgresDecoder(vd.map[T](f))

implicit def optionDecoder[T](implicit d: Decoder[T]): Decoder[Option[T]] =
fabianhjr-dealengine marked this conversation as resolved.
Show resolved Hide resolved
implicit def optionDecoder[T](implicit vd: Decoder[T]): Decoder[Option[T]] =
FinaglePostgresDecoder[Option[T]](
new ValueDecoder[Option[T]] {
def decodeText(recv: String, text: String): Try[Option[T]] =
Return(d.vd.decodeText(recv, text).toOption)
def decodeText(recv: String, text: String): Try[Option[T]] = {
vd.vd.decodeText(recv, text).map(Option(_)).handle {
case (e: IllegalStateException)
if e.getMessage == "Value is NULL" =>
None
}
}
def decodeBinary(
recv: String,
bytes: ByteBuf,
charset: Charset
): Try[Option[T]] =
Return(d.vd.decodeBinary(recv, bytes, charset).toOption)
vd.vd.decodeBinary(recv, bytes, charset).map(Option(_)).handle {
case (e: IllegalStateException)
if e.getMessage == "Value is NULL" =>
None
}
},
_ => None
)
Expand Down
37 changes: 15 additions & 22 deletions quill-finagle-postgres/src/test/scala/io/getquill/Test.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,12 @@
package io.getquill

import cats.effect.{IO, Resource}

import com.twitter.finagle.{Postgres, Status}
import com.twitter.finagle.postgres.{
OK,
Param,
PostgresClient,
PostgresClientImpl,
QueryResponse,
Row
}
import com.twitter.util.Future

import com.twitter.finagle.Postgres
import com.twitter.finagle.postgres.PostgresClientImpl
import com.twitter.util.Await
import com.typesafe.config.Config
import weaver.*

case class TableTest(id: Int, name: String)

class PgClient(override val config: Config)
extends FinaglePostgresContextConfig(config)

Expand Down Expand Up @@ -55,20 +44,24 @@ object Utils {
}

object Test extends MutableTwitterFutureSuite {
case class TableTest(id: Int, name: Option[String])

override type Res = FinaglePostgresContext[SnakeCase.type]
override def sharedResource: Resource[IO, Res] =
Resource.make {
Utils.getClient.map { client =>
val ctx = new FinaglePostgresContext[SnakeCase.type](SnakeCase, client)
val _ = {
val a = {
import ctx.*
ctx.run(sql"DROP TABLE IF EXISTS table_test".as[Insert[Unit]])
ctx.run(
sql"CREATE TABLE table_test (id integer, name text)"
.as[Insert[Unit]]
)
for {
_ <- ctx.run(sql"DROP TABLE IF EXISTS table_test".as[Insert[Unit]])
_ <- ctx.run(
sql"CREATE TABLE table_test (id integer, name text)"
.as[Insert[Unit]]
)
} yield ()
}
Await.result(a)
ctx
}
} { client =>
Expand All @@ -79,10 +72,10 @@ object Test extends MutableTwitterFutureSuite {
import ctx.*
val results = ctx.transaction {
for {
_ <- ctx.run(query[TableTest].insertValue(TableTest(0, "hola")))
_ <- ctx.run(query[TableTest].insertValue(TableTest(0, Some("hola"))))
result <- ctx.run(query[TableTest].filter(_.id == 0))
} yield result
}
results.map(res => expect(res.head == TableTest(0, "hola")))
results.map(res => expect(res.head == TableTest(0, Some("hola"))))
}
}
Loading