Skip to content

Commit

Permalink
Merge pull request #960: [direct-io-cassandra] Upgrade CQL client
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik authored Feb 4, 2025
2 parents 0897299 + 9b65b70 commit bfdbd08
Show file tree
Hide file tree
Showing 16 changed files with 269 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundE

@Override
protected URL findResource(String moduleName, String name) {
return Objects.requireNonNull(getModuleLoader(moduleName)).findResource(name);
ChildFirstURLClassLoader cl = Objects.requireNonNull(getModuleLoader(moduleName));
log.debug("Looking for resource {} in module {} using classLoader {}", name, moduleName, cl);
return cl.findResource(name);
}

@Override
Expand Down Expand Up @@ -133,6 +135,11 @@ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundE
return loadClass(name, resolve, true);
}

@Override
protected URL findResource(String moduleName, String name) throws IOException {
return super.findResource(null, name);
}

public Class<?> loadClassFromSelf(String name, boolean resolve) throws ClassNotFoundException {
return loadClass(name, resolve, false);
}
Expand Down
7 changes: 5 additions & 2 deletions direct/io-cassandra/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ plugins {
id 'cz.o2.proxima.java-conventions'
}

def cassandra_version = "4.18.1"

dependencies {
api project(':proxima-core')
api project(':proxima-direct-core')
implementation project(path: ':proxima-io-serialization', configuration: "shadowJar")
implementation libraries.slf4j_api;
intoShadow 'com.datastax.cassandra:cassandra-driver-core:3.11.0'
intoShadow "org.apache.cassandra:java-driver-core:${cassandra_version}"
intoShadow 'io.netty:netty-handler:4.1.42.Final'
intoShadow 'commons-codec:commons-codec:1.10'
provided "com.google.j2objc:j2objc-annotations:2.8"
Expand Down Expand Up @@ -54,7 +56,8 @@ enableShadowJar(project) {
shadowJar {
relocators.clear()
["io.netty", "io.dropwizard", "com.github.jnr", "com.google", "jnr", "com.kenai",
"org.apache.commons", "org.objectweb", "org.codehaus", "com.codahale", "com.fasterxml"].each {
"org.apache.commons", "org.objectweb", "org.codehaus", "com.codahale", "com.fasterxml",
"com.typesafe", "org.reactivestreams", "org.HdrHistogram", "net.jcip", "edu.umd"].each {

relocate it, "cz.o2.proxima.cassandra.shaded.${it}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package cz.o2.proxima.direct.io.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.storage.UriUtil;
import cz.o2.proxima.io.serialization.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.io.serialization.shaded.com.google.common.base.Strings;
import java.net.URI;
import java.util.Collections;
import java.util.LinkedHashMap;
Expand All @@ -45,7 +45,7 @@ public abstract class CacheableCqlFactory implements CqlFactory {
@Nullable private String payloadCol;

/** The connection session in use. */
@Getter @Nullable transient Session current = null;
@Getter @Nullable transient CqlSession current = null;

/** A TTL value in seconds associated with each update or insert. */
protected long ttl = 0;
Expand Down Expand Up @@ -141,8 +141,7 @@ protected void setup(Map<String, String> query, StringConverter<?> converter) {}
* @param what data to ingest
* @return the statement to use in order to store the data
*/
protected PreparedStatement getPreparedStatement(Session session, StreamElement what) {

protected PreparedStatement getPreparedStatement(CqlSession session, StreamElement what) {
if (what.isDelete()) {
PreparedStatement cached;
if (what.isDeleteWildcard()) {
Expand Down Expand Up @@ -178,7 +177,7 @@ protected PreparedStatement getPreparedStatement(Session session, StreamElement
* @return the statement to use in order to read the data
*/
protected PreparedStatement getPreparedGetStatement(
Session session, String attribute, AttributeDescriptor<?> desc) {
CqlSession session, String attribute, AttributeDescriptor<?> desc) {

return getCache.computeIfAbsent(
desc,
Expand All @@ -198,13 +197,13 @@ protected PreparedStatement getPreparedGetStatement(
* @return the statement to use in order to read the data
*/
protected PreparedStatement getPreparedListStatement(
Session session, AttributeDescriptor<?> wildcardAttribute) {
CqlSession session, AttributeDescriptor<?> wildcardAttribute) {

return listCache.computeIfAbsent(
wildcardAttribute, k -> prepare(session, createListStatement(wildcardAttribute)));
}

protected PreparedStatement getPreparedListAllStatement(Session session) {
protected PreparedStatement getPreparedListAllStatement(CqlSession session) {
if (listAllAttributes == null) {
listAllAttributes = prepare(session, createListAllStatement(session));
}
Expand Down Expand Up @@ -276,7 +275,7 @@ protected PreparedStatement getPreparedListAllStatement(Session session) {
* @param session the connection session
* @return string representation of the CQL
*/
protected abstract String createListAllStatement(Session session);
protected abstract String createListAllStatement(CqlSession session);

/** Clear the cache (e.g. on reconnects). */
protected void clearCache() {
Expand Down Expand Up @@ -319,7 +318,8 @@ String toPayloadCol(AttributeDescriptor<?> attr) {
}

@Override
public BoundStatement getListEntitiesStatement(Offsets.Token offset, int limit, Session session) {
public BoundStatement getListEntitiesStatement(
Offsets.TokenOffset offset, int limit, CqlSession session) {

if (listEntities == null) {
listEntities = prepare(session, createListEntitiesStatement());
Expand All @@ -332,21 +332,21 @@ public BoundStatement getListEntitiesStatement(Offsets.Token offset, int limit,
}

@Override
public BoundStatement getFetchTokenStatement(String key, Session session) {
public BoundStatement getFetchTokenStatement(String key, CqlSession session) {
if (fetchToken == null) {
fetchToken = prepare(session, createFetchTokenStatement());
}
return fetchToken.bind(key);
}

void ensureSession(Session session) {
void ensureSession(CqlSession session) {
if (this.current != session) {
clearCache();
current = session;
}
}

static PreparedStatement prepare(Session session, String statement) {
static PreparedStatement prepare(CqlSession session, String statement) {
log.debug("Trying to prepare statement {}", statement);
PreparedStatement ret = session.prepare(statement);
log.info("Prepared statement {} as {}", statement, ret);
Expand Down
Loading

0 comments on commit bfdbd08

Please sign in to comment.