Skip to content

Commit

Permalink
Migrate to cats-effect 3 and update for PR #2219
Browse files Browse the repository at this point in the history
  • Loading branch information
voropaevp committed Aug 31, 2021
1 parent 2052434 commit 24dcab9
Show file tree
Hide file tree
Showing 22 changed files with 398 additions and 832 deletions.
188 changes: 68 additions & 120 deletions README.md

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ services:
- sqlserver:sqlserver
- oracle:oracle
volumes:
- ./:/app:delegated
- ~/.ivy2:/root/.ivy2:cached
- ~/.m2:/root/.m2:cached
- ~/.sbt:/root/.sbt:cached
- ~/.cache/coursier:/root/.cache/coursier:cached
- ./:/app
- ./.ivy2:/home/sbtuser/.ivy2:cached
- ./.m2:/home/sbtuser/.m2:cached
- ./.sbt:/home/sbtuser/.sbt:cached
- ./.cache/coursier:/home/sbtuser/.cache/coursier:cached
environment:
- SBT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dfile.encoding=UTF-8 -Xms512m -Xmx1536m -Xss2m -XX:ReservedCodeCacheSize=256m -XX:+TieredCompilation -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC
- SBT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dfile.encoding=UTF-8 -Xms512m -Xmx5g -Xss2m -XX:ReservedCodeCacheSize=256m -XX:+TieredCompilation -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC
- POSTGRES_HOST=postgres
- POSTGRES_PORT=5432
- MYSQL_HOST=mysql
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
package io.getquill

import com.datastax.driver.core._
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.util.{ ContextLogger, LoadConfig }
import cats.effect._
import cats._
import io.getquill.util.GuavaCeUtils._
import cats.effect._
import cats.syntax.all._
import com.typesafe.config.Config
import io.getquill.context.ce.CeContext
import fs2.Stream

import com.typesafe.config.Config
import scala.jdk.CollectionConverters._
import io.getquill.util.GuavaCeUtils._
import io.getquill.util.{ContextLogger, LoadConfig}
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.context.{CassandraSession, SyncCache, ExecutionInfo}
import io.getquill.context.ce.CeContext
import com.datastax.driver.core._
import scala.language.higherKinds

class CassandraCeContext[N <: NamingStrategy, F[_]](
naming: N,
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
)(implicit val af: Async[F])
naming: N,
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
)(implicit val af: Async[F])
extends CassandraClusterSessionContext[N](naming, cluster, keyspace, preparedStatementCacheSize)
with CeContext[CqlIdiom, N, F] {
with CeContext[CqlIdiom, N, F] {

private val logger = ContextLogger(classOf[CassandraCeContext[_, F]])

private def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): F[PrepareRow] = for {
private[getquill] def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): F[PrepareRow] = for {
ec <- Async[F].executionContext
futureStatement = Sync[F].delay(super.prepareAsync(cql)(ec))
futureStatement = Sync[F].delay(prepareAsync(cql)(ec))
prepStatement <- Async[F].fromFuture(futureStatement)
(params, bs) = prepare(prepStatement)
(params, bs) = prepare(prepStatement, this)
_ <- Sync[F].delay(logger.logQuery(cql, params))
} yield bs

Expand All @@ -50,48 +50,44 @@ class CassandraCeContext[N <: NamingStrategy, F[_]](
}
} yield it

def streamQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor): Stream[F, T] = {
def streamQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: DatasourceContext): StreamResult[T] = {
Stream
.eval(prepareRowAndLog(cql, prepare))
.evalMap(p => af.delay(session.executeAsync(p)).toAsync)
.flatMap(rs => Stream.repeatEval(page(rs)))
.takeWhile(_.nonEmpty)
.flatMap(Stream.iterable)
.map(extractor)
.map(it => extractor(it, this))
}

def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor): Result[RunQueryResult[T]] =
streamQuery[T](cql, prepare, extractor).compile.toList
def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: DatasourceContext): Result[RunQueryResult[T]] =
streamQuery[T](cql, prepare, extractor)(info, dc).compile.toList

def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor): Result[RunQuerySingleResult[T]] =
Functor[F].map(executeQuery(cql, prepare, extractor))(handleSingleResult)
def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: DatasourceContext): Result[RunQuerySingleResult[T]] =
Functor[F].map(executeQuery(cql, prepare, extractor)(info, dc))(handleSingleResult)

def executeAction(cql: String, prepare: Prepare = identityPrepare): Result[Unit] = {
def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: DatasourceContext): Result[RunActionResult] = {
prepareRowAndLog(cql, prepare)
.flatMap(r => af.delay(session.executeAsync(r)).toAsync)
.map(_ => ())
}

def executeBatchAction(groups: List[BatchGroup]): Result[Unit] =
Stream.iterable(groups)
.flatMap {
case BatchGroup(cql, prepare) =>
Stream.iterable(prepare)
.flatMap(prep => Stream.eval(executeAction(cql, prep)))
.map(_ => ())
}.compile.drain

def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: DatasourceContext): Result[RunBatchActionResult] =
groups.traverse_ {
case BatchGroup(cql, prepare) =>
prepare.traverse_(executeAction(cql, _)(info, dc))
}
}

object CassandraCeContext {

def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: CassandraContextConfig): CassandraCeContext[N, F] =
def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, config: CassandraContextConfig): CassandraCeContext[N, F] =
new CassandraCeContext(naming, config.cluster, config.keyspace, config.preparedStatementCacheSize)

def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: Config): CassandraCeContext[N, F] =
def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, config: Config): CassandraCeContext[N, F] =
CassandraCeContext(naming, CassandraContextConfig(config))

def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, configPrefix: String): CassandraCeContext[N, F] =
def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, configPrefix: String): CassandraCeContext[N, F] =
CassandraCeContext(naming, LoadConfig(configPrefix))

}
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
package io.getquill

import com.datastax.driver.core._
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.util.{ ContextLogger, LoadConfig }
import cats.effect._
import cats._
import io.getquill.util.GuavaCeUtils._
import cats.effect._
import cats.syntax.all._
import com.typesafe.config.Config
import io.getquill.context.ce.CeContext
import fs2.Stream

import com.typesafe.config.Config
import scala.jdk.CollectionConverters._
import io.getquill.util.GuavaCeUtils._
import io.getquill.util.{ContextLogger, LoadConfig}
import io.getquill.context.cassandra.CqlIdiom
import io.getquill.context.{CassandraSession, SyncCache, ExecutionInfo}
import io.getquill.context.ce.CeContext
import com.datastax.driver.core._
import scala.language.higherKinds

class CassandraCeContext[N <: NamingStrategy, F[_]](
naming: N,
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
)(implicit val af: Async[F])
naming: N,
cluster: Cluster,
keyspace: String,
preparedStatementCacheSize: Long
)(implicit val af: Async[F])
extends CassandraClusterSessionContext[N](naming, cluster, keyspace, preparedStatementCacheSize)
with CeContext[CqlIdiom, N, F] {
with CeContext[CqlIdiom, N, F] {

private val logger = ContextLogger(classOf[CassandraCeContext[_, F]])

private def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): F[PrepareRow] = for {
private[getquill] def prepareRowAndLog(cql: String, prepare: Prepare = identityPrepare): F[PrepareRow] = for {
ec <- Async[F].executionContext
futureStatement = Sync[F].delay(super.prepareAsync(cql)(ec))
futureStatement = Sync[F].delay(prepareAsync(cql)(ec))
prepStatement <- Async[F].fromFuture(futureStatement)
(params, bs) = prepare(prepStatement)
(params, bs) = prepare(prepStatement, this)
_ <- Sync[F].delay(logger.logQuery(cql, params))
} yield bs

Expand All @@ -50,44 +50,44 @@ class CassandraCeContext[N <: NamingStrategy, F[_]](
}
} yield it

def streamQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor): Stream[F, T] = {
def streamQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: DatasourceContext): StreamResult[T] = {
Stream
.eval(prepareRowAndLog(cql, prepare))
.evalMap(p => af.delay(session.executeAsync(p)).toAsync)
.flatMap(rs => Stream.repeatEval(page(rs)))
.takeWhile(_.nonEmpty)
.flatMap(Stream.iterable)
.map(extractor)
.map(it => extractor(it, this))
}

def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor): Result[RunQueryResult[T]] =
streamQuery[T](cql, prepare, extractor).compile.toList
def executeQuery[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: DatasourceContext): Result[RunQueryResult[T]] =
streamQuery[T](cql, prepare, extractor)(info, dc).compile.toList

def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor): Result[RunQuerySingleResult[T]] =
Functor[F].map(executeQuery(cql, prepare, extractor))(handleSingleResult)
def executeQuerySingle[T](cql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor)(info: ExecutionInfo, dc: DatasourceContext): Result[RunQuerySingleResult[T]] =
Functor[F].map(executeQuery(cql, prepare, extractor)(info, dc))(handleSingleResult)

def executeAction(cql: String, prepare: Prepare = identityPrepare): Result[Unit] = {
def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: DatasourceContext): Result[RunActionResult] = {
prepareRowAndLog(cql, prepare)
.flatMap(r => af.delay(session.executeAsync(r)).toAsync)
.map(_ => ())
}

def executeBatchAction(groups: List[BatchGroup]): Result[Unit] =
groups.traverse_ { case BatchGroup(cql, prepare) =>
prepare.traverse_(executeAction(cql, _))
def executeBatchAction(groups: List[BatchGroup])(info: ExecutionInfo, dc: DatasourceContext): Result[RunBatchActionResult] =
groups.traverse_ {
case BatchGroup(cql, prepare) =>
prepare.traverse_(executeAction(cql, _)(info, dc))
}

}

object CassandraCeContext {

def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: CassandraContextConfig): CassandraCeContext[N, F] =
def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, config: CassandraContextConfig): CassandraCeContext[N, F] =
new CassandraCeContext(naming, config.cluster, config.keyspace, config.preparedStatementCacheSize)

def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, config: Config): CassandraCeContext[N, F] =
def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, config: Config): CassandraCeContext[N, F] =
CassandraCeContext(naming, CassandraContextConfig(config))

def apply[N <: NamingStrategy, F[_]: Async: FlatMap](naming: N, configPrefix: String): CassandraCeContext[N, F] =
def apply[N <: NamingStrategy, F[_] : Async : FlatMap](naming: N, configPrefix: String): CassandraCeContext[N, F] =
CassandraCeContext(naming, LoadConfig(configPrefix))

}
Loading

0 comments on commit 24dcab9

Please sign in to comment.