Skip to content

Commit

Permalink
[ISSUE-442] Support time window (#443)
Browse files Browse the repository at this point in the history
* feat: support time window

* fix: remove unused var

* fix: ut
  • Loading branch information
puuuuug authored Jan 17, 2025
1 parent 92c85bd commit 69b4ce0
Showing 40 changed files with 560 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -35,6 +35,11 @@ public class DSLConfigKeys implements Serializable {
.defaultValue(1L)
.description("Window size, -1 represent the all window.");

public static final ConfigKey GEAFLOW_DSL_TIME_WINDOW_SIZE = ConfigKeys
.key("geaflow.dsl.time.window.size")
.defaultValue(-1L)
.description("Specifies source time window size in second unites");

public static final ConfigKey GEAFLOW_DSL_TABLE_TYPE = ConfigKeys
.key("geaflow.dsl.table.type")
.noDefaultValue()
Original file line number Diff line number Diff line change
@@ -19,20 +19,15 @@

public class DateTimeUtil {

public static int toUnixTime(String dateStr, String format) {
public static long toUnixTime(String dateStr, String format) {
if (dateStr == null || dateStr.isEmpty()) {
return -1;
} else {
DateTimeFormatter dateTimeFormat = DateTimeFormat.forPattern(format);
return (int) (dateTimeFormat.parseMillis(dateStr) / 1000);
return dateTimeFormat.parseMillis(dateStr);
}
}

public static String fromUnixTime(int unixTime, String format) {
long millsTs = ((long) unixTime) * 1000L;
return DateTimeFormat.forPattern(format).print(millsTs);
}

public static String fromUnixTime(long unixTime, String format) {
return DateTimeFormat.forPattern(format).print(unixTime);
}
Original file line number Diff line number Diff line change
@@ -22,9 +22,9 @@ public class DateTimeUtilTest {
@Test
public void testDateTimeUtil() {
Assert.assertEquals(DateTimeUtil.fromUnixTime(1111, "yyyy-MM-dd hh:mm:ss"),
"1970-01-01 08:18:31");
"1970-01-01 08:00:01");
Assert.assertEquals(DateTimeUtil.toUnixTime("1970-01-01 08:18:31", "yyyy-MM-dd hh:mm:ss"),
1111);
1111000);
Assert.assertEquals(DateTimeUtil.toUnixTime("", "yyyy-MM-dd hh:mm:ss"),
-1);
}
Original file line number Diff line number Diff line change
@@ -34,4 +34,9 @@ public interface IWindow<T> extends Function, Serializable {
*/
long assignWindow(T value);

/**
* Return window type.
*/
WindowType getType();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.antgroup.geaflow.api.window;

public enum WindowType {
ALL_WINDOW, // all data
FIXED_TIME_TUMBLING_WINDOW, // window with time unit
SIZE_TUMBLING_WINDOW, // window with size unit
CUSTOM
}
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
package com.antgroup.geaflow.api.window.impl;

import com.antgroup.geaflow.api.window.IWindow;
import com.antgroup.geaflow.api.window.WindowType;

public class AllWindow<T> implements IWindow<T> {

@@ -40,4 +41,9 @@ public long assignWindow(T value) {
public static synchronized <T> AllWindow<T> getInstance() {
return new AllWindow<>();
}

@Override
public WindowType getType() {
return WindowType.ALL_WINDOW;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.antgroup.geaflow.api.window.impl;

import com.antgroup.geaflow.api.window.ITumblingWindow;
import com.antgroup.geaflow.api.window.WindowType;

public class FixedTimeTumblingWindow<T> implements ITumblingWindow<T> {

private final long timeWindowInSecond;
private long windowId;

public FixedTimeTumblingWindow(long timeWindowInSecond) {
this.timeWindowInSecond = timeWindowInSecond;
}

public long getTimeWindowSize() {
return timeWindowInSecond;
}

@Override
public long windowId() {
return windowId;
}

@Override
public void initWindow(long windowId) {
this.windowId = windowId;
}

@Override
public long assignWindow(T value) {
return windowId;
}

@Override
public WindowType getType() {
return WindowType.FIXED_TIME_TUMBLING_WINDOW;
}
}
Original file line number Diff line number Diff line change
@@ -15,10 +15,11 @@
package com.antgroup.geaflow.api.window.impl;

import com.antgroup.geaflow.api.window.ITumblingWindow;
import com.antgroup.geaflow.api.window.WindowType;

public class SizeTumblingWindow<T> implements ITumblingWindow<T> {

private long size;
private final long size;
private long count;
private long windowId;

@@ -54,4 +55,9 @@ public long assignWindow(T value) {
return windowId + 1;
}
}

@Override
public WindowType getType() {
return WindowType.SIZE_TUMBLING_WINDOW;
}
}
Original file line number Diff line number Diff line change
@@ -16,19 +16,38 @@

import com.antgroup.geaflow.api.window.IWindow;
import com.antgroup.geaflow.api.window.impl.AllWindow;
import com.antgroup.geaflow.api.window.impl.FixedTimeTumblingWindow;
import com.antgroup.geaflow.api.window.impl.SizeTumblingWindow;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.DSLConfigKeys;
import com.google.common.base.Preconditions;

public class Windows {

public static final long SIZE_OF_ALL_WINDOW = -1L;

public static <T> IWindow<T> createWindow(Configuration configuration) {
long batchSize = configuration.getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE);
if (batchSize == SIZE_OF_ALL_WINDOW) {
long batchWindowSize = Integer.MIN_VALUE;
if (configuration.contains(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE)) {
batchWindowSize = configuration.getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE);
Preconditions.checkState(batchWindowSize != 0, "Window size should not be zero!");
}
long timeWindowDuration = -1;
if (configuration.contains(DSLConfigKeys.GEAFLOW_DSL_TIME_WINDOW_SIZE)) {
timeWindowDuration = configuration.getLong(DSLConfigKeys.GEAFLOW_DSL_TIME_WINDOW_SIZE);
Preconditions.checkState(timeWindowDuration > 0, "Time Window size should not be positive!");
}
Preconditions.checkState(!(batchWindowSize >= SIZE_OF_ALL_WINDOW && timeWindowDuration > 0),
"Only one of window can exist! size window:%s, time window:%s", batchWindowSize, timeWindowDuration);
if (batchWindowSize == SIZE_OF_ALL_WINDOW) {
return AllWindow.getInstance();
} else if (batchWindowSize > 0) {
return new SizeTumblingWindow<>(batchWindowSize);
} else if (timeWindowDuration > 0) {
return new FixedTimeTumblingWindow<>(timeWindowDuration);
} else {
// use default
return new SizeTumblingWindow<>((Long) DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE.getDefaultValue());
}
return new SizeTumblingWindow<>(batchSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.antgroup.geaflow.dsl.connector.api;


import com.antgroup.geaflow.dsl.common.exception.GeaFlowDSLException;
import com.antgroup.geaflow.dsl.connector.api.window.AllFetchWindow;
import com.antgroup.geaflow.dsl.connector.api.window.FetchWindow;
import com.antgroup.geaflow.dsl.connector.api.window.SizeFetchWindow;
import com.antgroup.geaflow.dsl.connector.api.window.TimeFetchWindow;
import java.io.IOException;
import java.util.Optional;

public abstract class AbstractTableSource implements TableSource {

public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, FetchWindow windowInfo) throws IOException {
switch (windowInfo.getType()) {
case ALL_WINDOW:
return fetch(partition, startOffset, (AllFetchWindow) windowInfo);
case SIZE_TUMBLING_WINDOW:
return fetch(partition, startOffset, (SizeFetchWindow) windowInfo);
case FIXED_TIME_TUMBLING_WINDOW:
return fetch(partition, startOffset, (TimeFetchWindow) windowInfo);
default:
throw new GeaFlowDSLException("Not support window type:{}", windowInfo.getType());
}
}


public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, AllFetchWindow windowInfo) throws IOException {
throw new GeaFlowDSLException("Not support");
}

public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, SizeFetchWindow windowInfo) throws IOException {
throw new GeaFlowDSLException("Not support");
}

public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, TimeFetchWindow windowInfo) throws IOException {
throw new GeaFlowDSLException("Not support");
}

}
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import com.antgroup.geaflow.dsl.common.data.Row;
import com.antgroup.geaflow.dsl.common.types.TableSchema;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import com.antgroup.geaflow.dsl.connector.api.window.FetchWindow;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
@@ -51,10 +52,9 @@ public interface TableSource extends Serializable {
<IN> TableDeserializer<IN> getDeserializer(Configuration conf);

/**
* Fetch data for the partition from start offset. if the windowSize is -1, it represents an
* all-window which will read all the data from the source, else return window size for data.
* Fetch data for the partition from start offset.
*/
<T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, long windowSize) throws IOException;
<T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset, FetchWindow windowInfo) throws IOException;

/**
* The close callback for the job finish the execution.
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@
import com.antgroup.geaflow.api.function.io.SourceFunction;
import com.antgroup.geaflow.api.window.IWindow;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.DSLConfigKeys;
import com.antgroup.geaflow.dsl.common.data.Row;
import com.antgroup.geaflow.dsl.common.exception.GeaFlowDSLException;
import com.antgroup.geaflow.dsl.common.types.StructType;
@@ -28,6 +27,8 @@
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import com.antgroup.geaflow.dsl.connector.api.window.FetchWindow;
import com.antgroup.geaflow.dsl.connector.api.window.FetchWindowFactory;
import com.antgroup.geaflow.dsl.planner.GQLJavaTypeFactory;
import com.antgroup.geaflow.dsl.schema.GeaFlowTable;
import com.antgroup.geaflow.dsl.util.SqlTypeUtil;
@@ -69,8 +70,6 @@ public class GeaFlowTableSourceFunction extends RichFunction implements SourceFu

private OffsetStore offsetStore;

private long windowSize;

private TableDeserializer<?> deserializer;

private transient volatile List<Partition> oldPartitions = null;
@@ -117,9 +116,7 @@ public void close() {

@Override
public void init(int parallel, int index) {
Configuration conf = table.getConfigWithGlobal(runtimeContext.getConfiguration());

windowSize = conf.getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE);
tableSource.open(runtimeContext);

List<Partition> allPartitions = tableSource.listPartitions();
@@ -129,14 +126,15 @@ public void init(int parallel, int index) {
partitions = assignPartition(allPartitions,
runtimeContext.getTaskArgs().getMaxParallelism(), parallel, index);

Configuration conf = table.getConfigWithGlobal(runtimeContext.getConfiguration());
deserializer = tableSource.getDeserializer(conf);
if (deserializer != null) {
StructType schema = (StructType) SqlTypeUtil.convertType(
table.getRowType(GQLJavaTypeFactory.create()));
deserializer.init(conf, schema);
}
LOGGER.info("open source table: {}, taskIndex:{}, parallel: {}, windowSize:{}, assigned "
+ "partitions:{}", table.getName(), index, parallel, windowSize, partitions);
LOGGER.info("open source table: {}, taskIndex:{}, parallel: {}, assigned "
+ "partitions:{}", table.getName(), index, parallel, partitions);
}

@SuppressWarnings("unchecked")
@@ -148,12 +146,13 @@ public boolean fetch(IWindow<Row> window, SourceContext<Row> ctx) throws Excepti
if (partitions.isEmpty()) {
return false;
}
FetchWindow fetchWindow = FetchWindowFactory.createFetchWindow(window);
long batchId = window.windowId();
boolean isFinish = true;
for (Partition partition : partitions) {
Offset offset = offsetStore.readOffset(partition.getName(), batchId);
FetchData<Object> fetchData = tableSource.fetch(partition, Optional.ofNullable(offset),
windowSize);
fetchWindow);
Iterator<Object> dataIterator = fetchData.getDataIterator();
while (dataIterator.hasNext()) {
Object record = dataIterator.next();
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.antgroup.geaflow.dsl.connector.api.window;

public abstract class AbstractFetchWindow implements FetchWindow {

protected final long windowId;

public AbstractFetchWindow(long windowId) {
this.windowId = windowId;
}

@Override
public long windowId() {
return windowId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.antgroup.geaflow.dsl.connector.api.window;

import com.antgroup.geaflow.api.window.WindowType;

/**
* Fetch all.
*/
public class AllFetchWindow extends AbstractFetchWindow {

public AllFetchWindow(long windowId) {
super(windowId);
}

@Override
public long windowSize() {
return -1;
}

@Override
public WindowType getType() {
return WindowType.ALL_WINDOW;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.antgroup.geaflow.dsl.connector.api.window;

import com.antgroup.geaflow.api.window.WindowType;

/**
* Interface for the table source fetch records.
*/
public interface FetchWindow {

/**
* Return the window id.
*/
long windowId();

/**
* Return the window size.
*/
long windowSize();

/**
* Return the window type.
*/
WindowType getType();

}
Loading

0 comments on commit 69b4ce0

Please sign in to comment.