Skip to content

Commit

Permalink
Create an abstract class so that BatchCollect can inherit its sorting logic (
Browse files Browse the repository at this point in the history
  • Loading branch information
51-code authored Nov 27, 2023
1 parent 6ecb08f commit a5da6f1
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 88 deletions.
103 changes: 15 additions & 88 deletions src/main/java/com/teragrep/functions/dpf_02/BatchCollect.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,38 +47,36 @@
*/

import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public final class BatchCollect {
public final class BatchCollect extends SortOperation {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchCollect.class);
private Dataset<Row> savedDs = null;
private final String sortColumn;
private final int numberOfRows;
private StructType inputSchema;
private boolean sortedBySingleColumn = false;
private List<SortByClause> listOfSortByClauses = null;

public BatchCollect(String sortColumn, int numberOfRows) {
super();

LOGGER.info("Initialized BatchCollect based on column " + sortColumn + " and a limit of " + numberOfRows
+ " row(s)");
this.sortColumn = sortColumn;
this.numberOfRows = numberOfRows;
}

public BatchCollect(String sortColumn, int numberOfRows, List<SortByClause> listOfSortByClauses) {
super(listOfSortByClauses);

LOGGER.info("Initialized BatchCollect based on column " + sortColumn + " and a limit of " + numberOfRows + " row(s)." +
" SortByClauses included: " + (listOfSortByClauses != null ? listOfSortByClauses.size() : "<null>"));
this.sortColumn = sortColumn;
this.numberOfRows = numberOfRows;

this.listOfSortByClauses = listOfSortByClauses;
}

/**
Expand Down Expand Up @@ -107,7 +105,7 @@ public void collect(Dataset<Row> batchDF, Long batchId) {
this.inputSchema = batchDF.schema();
}

if (this.listOfSortByClauses == null || this.listOfSortByClauses.size() < 1) {
if (this.getListOfSortByClauses() == null || this.getListOfSortByClauses().size() < 1) {
for (String field : this.inputSchema.fieldNames()) {
if (field.equals(this.sortColumn)) {
this.sortedBySingleColumn = true;
Expand All @@ -116,7 +114,7 @@ public void collect(Dataset<Row> batchDF, Long batchId) {
}
}

List<Row> collected = orderDatasetByGivenColumns(batchDF).limit(numberOfRows).collectAsList();
List<Row> collected = orderDataset(batchDF).limit(numberOfRows).collectAsList();
Dataset<Row> createdDsFromCollected = SparkSession.builder().getOrCreate().createDataFrame(collected, this.inputSchema);

if (this.savedDs == null) {
Expand All @@ -126,7 +124,7 @@ public void collect(Dataset<Row> batchDF, Long batchId) {
this.savedDs = savedDs.union(createdDsFromCollected);
}

this.savedDs = orderDatasetByGivenColumns(this.savedDs).limit(numberOfRows);
this.savedDs = orderDataset(this.savedDs).limit(numberOfRows);

}

Expand All @@ -137,7 +135,7 @@ public void processAggregated(Dataset<Row> ds) {
this.inputSchema = ds.schema();
}

List<Row> collected = orderDatasetByGivenColumns(ds).limit(numberOfRows).collectAsList();
List<Row> collected = orderDataset(ds).limit(numberOfRows).collectAsList();
Dataset<Row> createdDsFromCollected = SparkSession.builder().getOrCreate().createDataFrame(collected, this.inputSchema);

if (this.savedDs == null) {
Expand All @@ -147,86 +145,15 @@ public void processAggregated(Dataset<Row> ds) {
this.savedDs = savedDs.union(createdDsFromCollected);
}

this.savedDs = orderDatasetByGivenColumns(this.savedDs).limit(numberOfRows);
this.savedDs = orderDataset(this.savedDs).limit(numberOfRows);
}

// Performs orderBy operation on a dataset and returns the ordered one
private Dataset<Row> orderDatasetByGivenColumns(Dataset<Row> ds) {
final SparkSession ss = SparkSession.builder().getOrCreate();

if (this.listOfSortByClauses != null && this.listOfSortByClauses.size() > 0) {
for (SortByClause sbc : listOfSortByClauses) {
if (sbc.getSortAsType() == SortByClause.Type.AUTOMATIC) {
SortByClause.Type autoType = detectSortByType(ds.schema().fields(), sbc.getFieldName());
ds = orderDatasetBySortByClause(ss, ds, sbc, autoType);
}
else {
ds = orderDatasetBySortByClause(ss, ds, sbc, null);
}
}
}
else if (this.sortedBySingleColumn) {
ds = ds.orderBy(functions.col(this.sortColumn).desc());
}

return ds;
}

// orderBy based on sortByClause type and if it is descending/ascending
private Dataset<Row> orderDatasetBySortByClause(final SparkSession ss, final Dataset<Row> unsorted, final SortByClause sortByClause, final SortByClause.Type overrideSortType) {
Dataset<Row> rv = null;
SortByClause.Type sortByType = sortByClause.getSortAsType();
if (overrideSortType != null) {
sortByType = overrideSortType;
}

switch (sortByType) {
case DEFAULT:
rv = unsorted.orderBy(sortByClause.isDescending() ?
functions.col(sortByClause.getFieldName()).desc() :
functions.col(sortByClause.getFieldName()).asc());
break;
case STRING:
rv = unsorted.orderBy(sortByClause.isDescending() ?
functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).desc() :
functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).asc());
break;
case NUMERIC:
rv = unsorted.orderBy(sortByClause.isDescending() ?
functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).desc() :
functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).asc());
break;
case IP_ADDRESS:
UserDefinedFunction ipStringToIntUDF = functions.udf(new ConvertIPStringToInt(), DataTypes.LongType);
ss.udf().register("ip_string_to_int", ipStringToIntUDF);
Column sortingCol = functions.callUDF("ip_string_to_int", functions.col(sortByClause.getFieldName()));

rv = unsorted.orderBy(sortByClause.isDescending() ? sortingCol.desc() : sortingCol.asc());
break;
}
return rv;
}

// detect sorting type if auto() was used in sort
private SortByClause.Type detectSortByType(final StructField[] fields, final String fieldName) {
for (StructField field : fields) {
if (field.name().equals(fieldName)) {
switch (field.dataType().typeName()) {
case "string": // ip address?
return SortByClause.Type.STRING;
case "long":
case "integer":
case "float":
case "double":
return SortByClause.Type.NUMERIC;
case "timestamp":
return SortByClause.Type.NUMERIC; // convert to unix epoch?
default:
return SortByClause.Type.DEFAULT;
}
}
private Dataset<Row> orderDataset(Dataset<Row> ds) {
if (this.sortedBySingleColumn) {
return ds.orderBy(functions.col(this.sortColumn).desc());
} else {
return this.orderDatasetByGivenColumns(ds);
}
return SortByClause.Type.DEFAULT;
}

// TODO: Remove
Expand Down
154 changes: 154 additions & 0 deletions src/main/java/com/teragrep/functions/dpf_02/SortOperation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package com.teragrep.functions.dpf_02;

/*
* Teragrep Batch Collect DPF-02
* Copyright (C) 2019, 2020, 2021, 2022 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/

import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

import java.util.ArrayList;
import java.util.List;

/**
 * Base class that owns a list of {@link SortByClause}s and knows how to apply
 * them to a Spark {@link Dataset}. Subclasses (e.g. BatchCollect) inherit the
 * ordering logic instead of duplicating it.
 */
public abstract class SortOperation {

    // Clauses are applied in list order. Never null: constructors normalize.
    private final List<SortByClause> listOfSortByClauses;

    /**
     * Initializes an empty list of SortByClauses.
     */
    public SortOperation() {
        this.listOfSortByClauses = new ArrayList<>();
    }

    /**
     * Initializes with the given clauses. A {@code null} argument is treated
     * as an empty list so {@link #getListOfSortByClauses()} never returns null
     * and {@link #addSortByClause(SortByClause)} cannot throw an NPE.
     *
     * @param listOfSortByClauses sort clauses to apply, may be {@code null}
     */
    public SortOperation(List<SortByClause> listOfSortByClauses) {
        this.listOfSortByClauses =
                listOfSortByClauses != null ? listOfSortByClauses : new ArrayList<>();
    }

    /**
     * @return the (possibly empty, never null) list of sort clauses
     */
    public List<SortByClause> getListOfSortByClauses() {
        return this.listOfSortByClauses;
    }

    /**
     * Appends a clause; it will be applied after all previously added clauses.
     *
     * @param sortByClause clause to append
     */
    public void addSortByClause(SortByClause sortByClause) {
        this.listOfSortByClauses.add(sortByClause);
    }

    /**
     * Performs an orderBy operation on a dataset for each configured clause
     * and returns the ordered dataset. AUTOMATIC clauses are resolved to a
     * concrete type from the dataset schema before ordering. With no clauses
     * the dataset is returned unchanged.
     *
     * @param ds dataset to order
     * @return ordered dataset (or {@code ds} itself when no clauses are set)
     */
    public Dataset<Row> orderDatasetByGivenColumns(Dataset<Row> ds) {
        final SparkSession ss = SparkSession.builder().getOrCreate();

        for (SortByClause sbc : this.listOfSortByClauses) {
            if (sbc.getSortAsType() == SortByClause.Type.AUTOMATIC) {
                // auto() in the query: infer the concrete sort type from the schema
                SortByClause.Type autoType = detectSortByType(ds.schema().fields(), sbc.getFieldName());
                ds = orderDatasetBySortByClause(ss, ds, sbc, autoType);
            }
            else {
                ds = orderDatasetBySortByClause(ss, ds, sbc, null);
            }
        }

        return ds;
    }

    // orderBy based on sortByClause type and if it is descending/ascending.
    // overrideSortType (when non-null) takes precedence over the clause's own
    // type; callers use it to substitute a resolved type for AUTOMATIC.
    private Dataset<Row> orderDatasetBySortByClause(final SparkSession ss, final Dataset<Row> unsorted, final SortByClause sortByClause, final SortByClause.Type overrideSortType) {
        final Dataset<Row> rv;
        SortByClause.Type sortByType = sortByClause.getSortAsType();
        if (overrideSortType != null) {
            sortByType = overrideSortType;
        }

        switch (sortByType) {
            case STRING:
                rv = unsorted.orderBy(sortByClause.isDescending() ?
                        functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).desc() :
                        functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).asc());
                break;
            case NUMERIC:
                rv = unsorted.orderBy(sortByClause.isDescending() ?
                        functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).desc() :
                        functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).asc());
                break;
            case IP_ADDRESS:
                // sort IP address strings by their numeric value via a UDF
                UserDefinedFunction ipStringToIntUDF = functions.udf(new ConvertIPStringToInt(), DataTypes.LongType);
                ss.udf().register("ip_string_to_int", ipStringToIntUDF);
                Column sortingCol = functions.callUDF("ip_string_to_int", functions.col(sortByClause.getFieldName()));

                rv = unsorted.orderBy(sortByClause.isDescending() ? sortingCol.desc() : sortingCol.asc());
                break;
            case DEFAULT:
            default:
                // DEFAULT ordering; also the fallback for an unresolved
                // AUTOMATIC clause (callers normally resolve it first) —
                // previously that case returned null and caused a later NPE.
                rv = unsorted.orderBy(sortByClause.isDescending() ?
                        functions.col(sortByClause.getFieldName()).desc() :
                        functions.col(sortByClause.getFieldName()).asc());
                break;
        }
        return rv;
    }

    // detect sorting type if auto() was used in sort; falls back to DEFAULT
    // when the field is not present in the schema or has an unmapped type
    private SortByClause.Type detectSortByType(final StructField[] fields, final String fieldName) {
        for (StructField field : fields) {
            if (field.name().equals(fieldName)) {
                switch (field.dataType().typeName()) {
                    case "string": // ip address?
                        return SortByClause.Type.STRING;
                    case "long":
                    case "integer":
                    case "float":
                    case "double":
                        return SortByClause.Type.NUMERIC;
                    case "timestamp":
                        return SortByClause.Type.NUMERIC; // convert to unix epoch?
                    default:
                        return SortByClause.Type.DEFAULT;
                }
            }
        }
        return SortByClause.Type.DEFAULT;
    }
}

0 comments on commit a5da6f1

Please sign in to comment.