Skip to content

Commit

Permalink
feat(*): Migrate from RxJava2 to Reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Jan 30, 2024
1 parent be18c59 commit 412eacd
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ subprojects {
compileOnly "io.micronaut:micronaut-inject"
compileOnly "io.micronaut.validation:micronaut-validation"
compileOnly "io.micronaut:micronaut-jackson-databind"
compileOnly "io.micronaut.rxjava2:micronaut-rxjava2"
compileOnly "io.micronaut.reactor:micronaut-reactor"

// kestra
compileOnly group: "io.kestra", name: "core", version: kestraVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -23,6 +21,10 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import jakarta.validation.constraints.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import static io.kestra.core.utils.Rethrow.throwFunction;


@SuperBuilder
Expand Down Expand Up @@ -91,12 +93,12 @@ public Output run(RunContext runContext) throws Exception {
) {
connection.setAutoCommit(false);

Flowable<Integer> flowable = Flowable.create(FileSerde.reader(bufferedReader), BackpressureStrategy.BUFFER)
Flux<Integer> flowable = Flux.create(FileSerde.reader(bufferedReader), FluxSink.OverflowStrategy.BUFFER)
.doOnNext(docWriteRequest -> {
count.incrementAndGet();
})
.buffer(this.chunk, this.chunk)
.map(o -> {
.map(throwFunction(o -> {
PreparedStatement ps = connection.prepareStatement(sql);
ParameterType parameterMetaData = ParameterType.of(ps.getParameterMetaData());

Expand All @@ -111,11 +113,11 @@ public Output run(RunContext runContext) throws Exception {
connection.commit();

return Arrays.stream(updatedRows).sum();
});
}));

Integer updated = flowable
.reduce(Integer::sum)
.blockingGet();
.block();

runContext.metric(Counter.of("records", count.get()));
runContext.metric(Counter.of("updated", updated == null ? 0 : updated));
Expand Down

0 comments on commit 412eacd

Please sign in to comment.