diff --git a/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java b/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java
index 623e5c9..841b16b 100644
--- a/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java
+++ b/src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java
@@ -47,25 +47,23 @@
  */
 
 import org.apache.spark.sql.*;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-public final class BatchCollect {
+public final class BatchCollect extends SortOperation {
     private static final Logger LOGGER = LoggerFactory.getLogger(BatchCollect.class);
     private Dataset<Row> savedDs = null;
     private final String sortColumn;
     private final int numberOfRows;
     private StructType inputSchema;
     private boolean sortedBySingleColumn = false;
-    private List<SortByClause> listOfSortByClauses = null;
 
     public BatchCollect(String sortColumn, int numberOfRows) {
+        super();
+
         LOGGER.info("Initialized BatchCollect based on column " + sortColumn + " and a limit of " + numberOfRows + " row(s)");
 
         this.sortColumn = sortColumn;
@@ -73,12 +71,12 @@ public BatchCollect(String sortColumn, int numberOfRows) {
     }
 
     public BatchCollect(String sortColumn, int numberOfRows, List<SortByClause> listOfSortByClauses) {
+        super(listOfSortByClauses);
+
         LOGGER.info("Initialized BatchCollect based on column " + sortColumn + " and a limit of " + numberOfRows + " row(s)." + " SortByClauses included: " + (listOfSortByClauses != null ? listOfSortByClauses.size() : ""));
 
         this.sortColumn = sortColumn;
         this.numberOfRows = numberOfRows;
-
-        this.listOfSortByClauses = listOfSortByClauses;
     }
 
     /**
@@ -107,7 +105,7 @@ public void collect(Dataset<Row> batchDF, Long batchId) {
             this.inputSchema = batchDF.schema();
         }
 
-        if (this.listOfSortByClauses == null || this.listOfSortByClauses.size() < 1) {
+        if (this.getListOfSortByClauses() == null || this.getListOfSortByClauses().size() < 1) {
             for (String field : this.inputSchema.fieldNames()) {
                 if (field.equals(this.sortColumn)) {
                     this.sortedBySingleColumn = true;
@@ -116,7 +114,7 @@
             }
         }
 
-        List<Row> collected = orderDatasetByGivenColumns(batchDF).limit(numberOfRows).collectAsList();
+        List<Row> collected = orderDataset(batchDF).limit(numberOfRows).collectAsList();
         Dataset<Row> createdDsFromCollected = SparkSession.builder().getOrCreate().createDataFrame(collected, this.inputSchema);
 
         if (this.savedDs == null) {
@@ -126,7 +124,7 @@
             this.savedDs = savedDs.union(createdDsFromCollected);
         }
 
-        this.savedDs = orderDatasetByGivenColumns(this.savedDs).limit(numberOfRows);
+        this.savedDs = orderDataset(this.savedDs).limit(numberOfRows);
     }
 
 
@@ -137,7 +135,7 @@ public void processAggregated(Dataset<Row> ds) {
             this.inputSchema = ds.schema();
         }
 
-        List<Row> collected = orderDatasetByGivenColumns(ds).limit(numberOfRows).collectAsList();
+        List<Row> collected = orderDataset(ds).limit(numberOfRows).collectAsList();
         Dataset<Row> createdDsFromCollected = SparkSession.builder().getOrCreate().createDataFrame(collected, this.inputSchema);
 
         if (this.savedDs == null) {
@@ -147,86 +145,15 @@
             this.savedDs = savedDs.union(createdDsFromCollected);
         }
 
-        this.savedDs = orderDatasetByGivenColumns(this.savedDs).limit(numberOfRows);
+        this.savedDs = orderDataset(this.savedDs).limit(numberOfRows);
     }
 
-    // Performs orderBy operation on a dataset and returns the ordered one
-    private Dataset<Row> orderDatasetByGivenColumns(Dataset<Row> ds) {
-        final SparkSession ss = SparkSession.builder().getOrCreate();
-
-        if (this.listOfSortByClauses != null && this.listOfSortByClauses.size() > 0) {
-            for (SortByClause sbc : listOfSortByClauses) {
-                if (sbc.getSortAsType() == SortByClause.Type.AUTOMATIC) {
-                    SortByClause.Type autoType = detectSortByType(ds.schema().fields(), sbc.getFieldName());
-                    ds = orderDatasetBySortByClause(ss, ds, sbc, autoType);
-                }
-                else {
-                    ds = orderDatasetBySortByClause(ss, ds, sbc, null);
-                }
-            }
-        }
-        else if (this.sortedBySingleColumn) {
-            ds = ds.orderBy(functions.col(this.sortColumn).desc());
-        }
-
-        return ds;
-    }
-
-    // orderBy based on sortByClause type and if it is descending/ascending
-    private Dataset<Row> orderDatasetBySortByClause(final SparkSession ss, final Dataset<Row> unsorted, final SortByClause sortByClause, final SortByClause.Type overrideSortType) {
-        Dataset<Row> rv = null;
-        SortByClause.Type sortByType = sortByClause.getSortAsType();
-        if (overrideSortType != null) {
-            sortByType = overrideSortType;
-        }
-
-        switch (sortByType) {
-            case DEFAULT:
-                rv = unsorted.orderBy(sortByClause.isDescending() ?
-                        functions.col(sortByClause.getFieldName()).desc() :
-                        functions.col(sortByClause.getFieldName()).asc());
-                break;
-            case STRING:
-                rv = unsorted.orderBy(sortByClause.isDescending() ?
-                        functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).desc() :
-                        functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).asc());
-                break;
-            case NUMERIC:
-                rv = unsorted.orderBy(sortByClause.isDescending() ?
-                        functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).desc() :
-                        functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).asc());
-                break;
-            case IP_ADDRESS:
-                UserDefinedFunction ipStringToIntUDF = functions.udf(new ConvertIPStringToInt(), DataTypes.LongType);
-                ss.udf().register("ip_string_to_int", ipStringToIntUDF);
-                Column sortingCol = functions.callUDF("ip_string_to_int", functions.col(sortByClause.getFieldName()));
-
-                rv = unsorted.orderBy(sortByClause.isDescending() ? sortingCol.desc() : sortingCol.asc());
-                break;
-        }
-        return rv;
-    }
-
-    // detect sorting type if auto() was used in sort
-    private SortByClause.Type detectSortByType(final StructField[] fields, final String fieldName) {
-        for (StructField field : fields) {
-            if (field.name().equals(fieldName)) {
-                switch (field.dataType().typeName()) {
-                    case "string": // ip address?
-                        return SortByClause.Type.STRING;
-                    case "long":
-                    case "integer":
-                    case "float":
-                    case "double":
-                        return SortByClause.Type.NUMERIC;
-                    case "timestamp":
-                        return SortByClause.Type.NUMERIC; // convert to unix epoch?
-                    default:
-                        return SortByClause.Type.DEFAULT;
-                }
-            }
+    private Dataset<Row> orderDataset(Dataset<Row> ds) {
+        if (this.sortedBySingleColumn) {
+            return ds.orderBy(functions.col(this.sortColumn).desc());
+        } else {
+            return this.orderDatasetByGivenColumns(ds);
         }
-        return SortByClause.Type.DEFAULT;
     }
 
     // TODO: Remove
diff --git a/src/main/java/com/teragrep/functions/dpf_02/SortOperation.java b/src/main/java/com/teragrep/functions/dpf_02/SortOperation.java
new file mode 100644
index 0000000..3f33fa3
--- /dev/null
+++ b/src/main/java/com/teragrep/functions/dpf_02/SortOperation.java
@@ -0,0 +1,154 @@
+package com.teragrep.functions.dpf_02;
+
+/*
+ * Teragrep Batch Collect DPF-02
+ * Copyright (C) 2019, 2020, 2021, 2022 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class SortOperation {
+    private List<SortByClause> listOfSortByClauses = null;
+
+    /**
+     * Initializes an empty list of SortByClauses
+     */
+    public SortOperation() {
+        this.listOfSortByClauses = new ArrayList<>();
+    }
+
+    public SortOperation(List<SortByClause> listOfSortByClauses) {
+        this.listOfSortByClauses = listOfSortByClauses;
+    }
+
+    public List<SortByClause> getListOfSortByClauses() {
+        return this.listOfSortByClauses;
+    }
+
+    public void addSortByClause(SortByClause sortByClause) {
+        this.listOfSortByClauses.add(sortByClause);
+    }
+
+    // Performs orderBy operation on a dataset and returns the ordered one
+    public Dataset<Row> orderDatasetByGivenColumns(Dataset<Row> ds) {
+        final SparkSession ss = SparkSession.builder().getOrCreate();
+
+        if (this.listOfSortByClauses != null && this.listOfSortByClauses.size() > 0) {
+            for (SortByClause sbc : listOfSortByClauses) {
+                if (sbc.getSortAsType() == SortByClause.Type.AUTOMATIC) {
+                    SortByClause.Type autoType = detectSortByType(ds.schema().fields(), sbc.getFieldName());
+                    ds = orderDatasetBySortByClause(ss, ds, sbc, autoType);
+                }
+                else {
+                    ds = orderDatasetBySortByClause(ss, ds, sbc, null);
+                }
+            }
+        }
+
+        return ds;
+    }
+
+    // orderBy based on sortByClause type and if it is descending/ascending
+    private Dataset<Row> orderDatasetBySortByClause(final SparkSession ss, final Dataset<Row> unsorted, final SortByClause sortByClause, final SortByClause.Type overrideSortType) {
+        Dataset<Row> rv = null;
+        SortByClause.Type sortByType = sortByClause.getSortAsType();
+        if (overrideSortType != null) {
+            sortByType = overrideSortType;
+        }
+
+        switch (sortByType) {
+            case DEFAULT:
+                rv = unsorted.orderBy(sortByClause.isDescending() ?
+                        functions.col(sortByClause.getFieldName()).desc() :
+                        functions.col(sortByClause.getFieldName()).asc());
+                break;
+            case STRING:
+                rv = unsorted.orderBy(sortByClause.isDescending() ?
+                        functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).desc() :
+                        functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).asc());
+                break;
+            case NUMERIC:
+                rv = unsorted.orderBy(sortByClause.isDescending() ?
+                        functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).desc() :
+                        functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).asc());
+                break;
+            case IP_ADDRESS:
+                UserDefinedFunction ipStringToIntUDF = functions.udf(new ConvertIPStringToInt(), DataTypes.LongType);
+                ss.udf().register("ip_string_to_int", ipStringToIntUDF);
+                Column sortingCol = functions.callUDF("ip_string_to_int", functions.col(sortByClause.getFieldName()));
+
+                rv = unsorted.orderBy(sortByClause.isDescending() ? sortingCol.desc() : sortingCol.asc());
+                break;
+        }
+        return rv;
+    }
+
+    // detect sorting type if auto() was used in sort
+    private SortByClause.Type detectSortByType(final StructField[] fields, final String fieldName) {
+        for (StructField field : fields) {
+            if (field.name().equals(fieldName)) {
+                switch (field.dataType().typeName()) {
+                    case "string": // ip address?
+                        return SortByClause.Type.STRING;
+                    case "long":
+                    case "integer":
+                    case "float":
+                    case "double":
+                        return SortByClause.Type.NUMERIC;
+                    case "timestamp":
+                        return SortByClause.Type.NUMERIC; // convert to unix epoch?
+                    default:
+                        return SortByClause.Type.DEFAULT;
+                }
+            }
+        }
+        return SortByClause.Type.DEFAULT;
+    }
+}
\ No newline at end of file