Skip to content

Commit

Permalink
Merge pull request #7 from primenumber-dev/main
Browse files Browse the repository at this point in the history
Add logger interval option
  • Loading branch information
civitaspo authored May 27, 2022
2 parents 8d0e69c + 36afd6e commit d97310a
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ An embulk output plugin to egest records as json with [`jq`](https://github.com/
- **default_timezone**: Default timezone. (string, default: `"UTC"`)
- **default_timestamp_format**: Default timestamp format. (string, default: `"%Y-%m-%d %H:%M:%S %z"`)
- **default_date**: Default date. (string, default: `"1970-01-01"`)
- **logging_interval**: Progress log output interval. For example, output progress for every 10 seconds if set '10s' or never output progress log if set `0s`. (string, default: `0s`)

### About `transformer_jq`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,9 @@ public interface PluginTask extends RestClientOutputTaskBase {
@ConfigDefault("\"1970-01-01\"")
@Pattern(regexp = "^\\d{4}-\\d{2}-\\d{2}$")
public String getDefaultDate();

@Config("logging_interval")
@ConfigDefault("\"0s\"")
public String getLoggingInterval();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.embulk.output.http_json.jq.IllegalJQProcessingException;
import org.embulk.output.http_json.jq.InvalidJQFilterException;
import org.embulk.output.http_json.jq.JQ;
import org.embulk.output.http_json.util.Durations;
import org.embulk.output.http_json.util.ProgressLogger;
import org.embulk.output.http_json.validator.BeanValidator;
import org.embulk.spi.DataException;
import org.embulk.spi.Schema;
Expand All @@ -47,18 +49,25 @@ public class HttpJsonOutputPluginDelegate

private final ConfigMapperFactory configMapperFactory;

private static ProgressLogger progressLogger;

public HttpJsonOutputPluginDelegate(ConfigMapperFactory configMapperFactory) {
this.configMapperFactory = configMapperFactory;
}

@Override
public void validateOutputTask(PluginTask task, Schema embulkSchema, int taskCount) {
configureTask(task);
BeanValidator.validate(task);
validateJsonQuery("transformer_jq", task.getTransformerJq());
validateJsonQuery("retryable_condition_jq", task.getRetryableConditionJq());
validateJsonQuery("success_condition_jq", task.getSuccessConditionJq());
}

private void configureTask(PluginTask task) {
progressLogger = new ProgressLogger(Durations.parseDuration(task.getLoggingInterval()));
}

private void validateJsonQuery(String name, String jqFilter) {
try {
jq.validateFilter(jqFilter);
Expand Down Expand Up @@ -97,6 +106,7 @@ public RecordBuffer buildRecordBuffer(PluginTask task, Schema schema, int taskIn
@Override
public ConfigDiff egestEmbulkData(
PluginTask task, Schema schema, int taskCount, List<TaskReport> taskReports) {
progressLogger.finish();
taskReports.forEach(report -> logger.info(report.toString()));
return configMapperFactory.newConfigDiff();
}
Expand All @@ -111,7 +121,10 @@ private TimestampFormatter buildTimestampFormatter(PluginTask task) {
private <A, R> List<R> eachSlice(List<A> list, int sliceSize, Function<List<A>, R> function) {
List<R> resultBuilder = new ArrayList<>();
for (int i = 0; i < list.size(); i += sliceSize) {
long start = System.currentTimeMillis();
R result = function.apply(list.subList(i, Integer.min(i + sliceSize, list.size())));
progressLogger.incrementRequestCount();
progressLogger.addElapsedTime(System.currentTimeMillis() - start);
resultBuilder.add(result);
}
return Collections.unmodifiableList(resultBuilder);
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/org/embulk/output/http_json/util/Durations.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.embulk.output.http_json.util;

import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Durations {
private static final Pattern PATTERN =
Pattern.compile(
"\\s*(?:(?<days>\\d+)\\s*d)?\\s*(?:(?<hours>\\d+)\\s*h)?\\s*(?:(?<minutes>\\d+)\\s*m)?\\s*(?:(?<seconds>\\d+)\\s*s)?\\s*",
Pattern.CASE_INSENSITIVE);

public static Duration parseDuration(CharSequence text) {
Matcher matcher = PATTERN.matcher(text);
if (!matcher.matches()) {
throw new DateTimeParseException("Invalid duration", text, 0);
}
String d = matcher.group("days");
String h = matcher.group("hours");
String m = matcher.group("minutes");
String s = matcher.group("seconds");
if (d == null && h == null && m == null && s == null) {
throw new DateTimeParseException("Invalid duration", text, 0);
}
return Duration.ofDays(d == null ? 0 : Long.parseLong(d))
.plusHours(h == null ? 0 : Long.parseLong(h))
.plusMinutes(m == null ? 0 : Long.parseLong(m))
.plusSeconds(s == null ? 0 : Long.parseLong(s));
}

public static String formatDuration(Duration duration) {
long d = duration.toDays();
long h = duration.minusDays(d).toHours();
long m = duration.minusDays(d).minusHours(h).toMinutes();
long s = duration.minusDays(d).minusHours(h).minusMinutes(m).getSeconds();

return Stream.of(
d == 0 ? null : d + "d",
h == 0 ? null : h + "h",
m == 0 ? null : m + "m",
s == 0 ? null : s + "s")
.filter(v -> v != null)
.collect(Collectors.joining(" "));
}
}
68 changes: 68 additions & 0 deletions src/main/java/org/embulk/output/http_json/util/ProgressLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.embulk.output.http_json.util;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import jersey.repackaged.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProgressLogger {
private static final long INITIAL_DELAY_SECONDS = 1;

private static final Logger logger = LoggerFactory.getLogger(ProgressLogger.class);

private final ScheduledExecutorService service;

private final AtomicLong globalRequestCount = new AtomicLong(0);
private final AtomicLong globalElapsedTime = new AtomicLong(0);

public ProgressLogger(Duration loggingInterval) {
service =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat(ProgressLogger.class.getSimpleName())
.build());
long loggingIntervalSecond = loggingInterval.getSeconds();
if (loggingIntervalSecond > 0) {
setSchedule(loggingIntervalSecond);
} else {
logger.warn("disabled progress log.");
service.shutdown();
}
}

private void setSchedule(long loggingInterval) {
service.scheduleAtFixedRate(
() -> outputProgress(), INITIAL_DELAY_SECONDS, loggingInterval, TimeUnit.SECONDS);
}

public void finish() {
if (!service.isShutdown()) {
service.submit(() -> outputProgress());
service.shutdown();
}
}

public void incrementRequestCount() {
globalRequestCount.incrementAndGet();
}

public void addElapsedTime(long elapsedTIme) {
globalElapsedTime.addAndGet(elapsedTIme);
}

private void outputProgress() {
long requestCount = globalRequestCount.get();
long elapsedTime = globalElapsedTime.get();
if (requestCount == 0) {
return;
}
logger.info(
"request count: {}, response time avg: {} ms",
requestCount,
elapsedTime / requestCount);
}
}

0 comments on commit d97310a

Please sign in to comment.