paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ paramTreeMap.putAll(parameters);
+
+ HpccOptions options = new HpccOptions(paramTreeMap);
+ return options;
+ }
+
+}
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/package-info.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/package-info.java
new file mode 100644
index 000000000..9bf54e0f5
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/datasource/package-info.java
@@ -0,0 +1,6 @@
+/**
+ *
+ * Provides a mechanism to stream HPCC Systems data via a Spark Relation.
+ *
+ */
+package org.hpccsystems.spark.datasource;
diff --git a/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/package-info.java b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/package-info.java
new file mode 100644
index 000000000..5f612d7f1
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/java/org/hpccsystems/spark/package-info.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+/**
+ * Provides access to data residing in HPCC Systems or Spark environments.
+ *
+ * The DFSClient from HPCC Systems is used to access
+ * HPCC Systems data files' metadata, including the location and layout of the file, and
+ * to request data file access privileges.
+ * An RDD is provided to read the file in parallel by file part.
+ *
+ * The main classes are:
+ *
+ * - Content is the abstract class defining field content. There are concrete
+ * classes for each of the different content types.
+ * - FieldType is an enumeration type listing the types of content.
+ * - HpccPart implements the Spark Partition interface.
+ * - HpccFile is the metadata for a file on an HPCC THOR cluster.
+ * - HpccFileException is the general exception class.
+ * - HpccRDD extends RDD(Record) class for Spark.
+ * - HpccRemoteFileReader is the facade for the type of file reader.
+ * - Record is the container class holding the data for a record from THOR.
+ *
+ *
+ */
+package org.hpccsystems.spark;
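
As a quick illustration of the classes and DataSource registration described above, here is a minimal Java sketch of reading an HPCC logical file as a Spark Dataset. The host, cluster, credentials, and logical file name are placeholders; the `"hpcc"` format name and option names mirror those used in the integration tests added later in this changeset.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HpccReadSketch
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkSession.builder()
                                         .appName("Hpcc-Read-Sketch")
                                         .master("local")
                                         .getOrCreate();

        // Read an HPCC logical file through the registered "hpcc" DataSource
        // (placeholder host, cluster, credentials, and file path).
        Dataset<Row> df = spark.read()
                               .format("hpcc")
                               .option("host", "http://127.0.0.1:8010")
                               .option("cluster", "mythor")
                               .option("username", "")
                               .option("password", "")
                               .load("spark::test::integer_kv");

        df.show();
    }
}
```
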
diff --git a/spark-hpcc/DataAccess/src/main/javadoc/overview.html b/spark-hpcc/DataAccess/src/main/javadoc/overview.html
new file mode 100644
index 000000000..5828b7418
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/javadoc/overview.html
@@ -0,0 +1,7 @@
+
+
+This project enables HPCC Systems / Spark interoperability.
+
+The DataAccess project contains the classes that expose distributed streaming of HPCC-based data via Spark constructs. In addition, the HPCC data is exposed as a Dataframe for the convenience of the Spark developer.
+
+
\ No newline at end of file
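
To complement the overview, the following is a hedged Java sketch of the write path, mirroring the pattern used in the DataframeIntegrationTest added below. The host, cluster, credentials, and logical file path are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class HpccWriteSketch
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkSession.builder()
                                         .appName("Hpcc-Write-Sketch")
                                         .master("local")
                                         .getOrCreate();

        // A simple two-column key/value schema.
        StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("key", DataTypes.LongType, false),
            DataTypes.createStructField("value", DataTypes.LongType, false)
        });

        List<Row> rows = new ArrayList<>();
        for (long i = 0; i < 1000; i++)
        {
            rows.add(RowFactory.create(i, i));
        }

        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // Write the Dataframe to an HPCC logical file (placeholder host, cluster, credentials, and path).
        df.write()
          .format("hpcc")
          .mode("overwrite")
          .option("host", "http://127.0.0.1:8010")
          .option("cluster", "mythor")
          .option("username", "")
          .option("password", "")
          .save("spark::test::integer_kv");
    }
}
```
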
diff --git a/spark-hpcc/DataAccess/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark-hpcc/DataAccess/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 000000000..570936da3
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.hpccsystems.spark.datasource.HpccRelationProvider
diff --git a/spark-hpcc/DataAccess/src/main/resources/log4j.properties b/spark-hpcc/DataAccess/src/main/resources/log4j.properties
new file mode 100644
index 000000000..b8747f7c2
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set the default spark-shell log level to WARN. When running the spark-shell, the
+# log level for this class is used to overwrite the root logger's log level, so that
+# the user can have different defaults for the shell and regular Spark apps.
+log4j.logger.org.apache.spark.repl.Main=WARN
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+log4j.logger.org.apache.axis2.enterprise=FATAL
+log4j.logger.de.hunsicker.jalopy.io=FATAL
+log4j.logger.httpclient.wire.header=FATAL
+log4j.logger.org.apache.commons.httpclient=FATAL
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
\ No newline at end of file
diff --git a/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java
new file mode 100644
index 000000000..b7d621f0f
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/BaseIntegrationTest.java
@@ -0,0 +1,167 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.SparkSession;
+
+class BaseIntegrationTest
+{
+ static SparkContext sparkContext = null;
+
+ public File findRecentlyBuiltSparkJar()
+ {
+ try
+ {
+ URL url = BaseIntegrationTest.class.getProtectionDomain().getCodeSource().getLocation();
+ Path parentPath = Paths.get(url.toURI()).getParent();
+
+ FilenameFilter filter = new FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return name.matches("spark-hpcc.*-jar-with-dependencies\\.jar");
+ }
+ };
+
+ File[] files = parentPath.toFile().listFiles(filter);
+ if (files != null && files.length > 0)
+ {
+                // Return the most recently modified Spark jar. This should always be the correct jar
+ // as the integration tests will run right after the build step is complete.
+ File mostRecentlyModifiedFile = null;
+ long lastModifiedTime = Long.MIN_VALUE;
+
+ for (File file : files)
+ {
+ long modifiedTime = file.lastModified();
+ if (modifiedTime > lastModifiedTime)
+ {
+ mostRecentlyModifiedFile = file;
+ lastModifiedTime = modifiedTime;
+ }
+ }
+
+ return mostRecentlyModifiedFile;
+ }
+ } catch (Exception e)
+ {
+ System.out.println("Error finding spark jar file with exception: " + e.getMessage());
+ }
+
+ return null;
+ }
+
+ public SparkConf getDefaultSparkConf()
+ {
+ File sparkJar = findRecentlyBuiltSparkJar();
+
+ String sparkJarPath = "";
+ if (sparkJar != null)
+ {
+ sparkJarPath = sparkJar.getAbsolutePath();
+ System.out.println("Spark jar: " + sparkJarPath);
+ }
+ else
+ {
+ System.out.println("Unable to find spark jar matching pattern: spark-hpcc.*-jar-with-dependencies.jar, "
+ + "in directory [PROJECT_ROOT]/DataAccess/target/, check maven package / verify output for errors.");
+ }
+
+ String[] jars = {
+ sparkJarPath
+ };
+
+ return new SparkConf()
+ .setMaster("local")
+ .setAppName("Spark-HPCC-Connector-Test")
+ .set("spark.driver.allowMultipleContexts", "false")
+ .set("spark.sql.allowMultipleContexts", "false")
+ .setJars(jars);
+ }
+
+ public SparkContext getOrCreateSparkContext()
+ {
+ if (sparkContext != null)
+ {
+ return sparkContext;
+ }
+
+ return getOrCreateSparkContext(getDefaultSparkConf());
+ }
+
+ public SparkContext getOrCreateSparkContext(SparkConf conf)
+ {
+        if (sparkContext != null)
+        {
+            // Tear down the existing context before creating a new one with the provided configuration.
+            sparkContext.stop();
+            SparkSession.clearActiveSession();
+            SparkSession.clearDefaultSession();
+        }
+
+        sparkContext = new SparkContext(conf);
+        return sparkContext;
+ }
+
+ public SparkSession getOrCreateSparkSession()
+ {
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Spark-HPCC-Connector-Test")
+ .config(getDefaultSparkConf())
+ .getOrCreate();
+ return spark;
+ }
+
+ public SparkSession getOrCreateSparkSession(SparkConf conf)
+ {
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("Spark-HPCC-Connector-Test")
+ .config(conf)
+ .getOrCreate();
+ return spark;
+ }
+
+ public String getHPCCClusterURL()
+ {
+ return System.getProperty("hpccconn", "https://eclwatch.default:8010");
+ }
+
+ public String getHPCCClusterUser()
+ {
+ return System.getProperty("hpccuser", "");
+ }
+
+ public String getHPCCClusterPass()
+ {
+ return System.getProperty("hpccpass", "");
+ }
+
+ public String getThorCluster()
+ {
+ return System.getProperty("thorclustername", "data");
+ }
+}
diff --git a/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java
new file mode 100644
index 000000000..46123424a
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/DataframeIntegrationTest.java
@@ -0,0 +1,179 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2023 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import java.util.List;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DataframeIntegrationTest extends BaseIntegrationTest
+{
+
+ @Test
+ public void integerKeyValueWriteReadTest()
+ {
+ SparkSession spark = getOrCreateSparkSession();
+
+ // Create the schema
+ StructType schema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("key", DataTypes.LongType, false),
+ DataTypes.createStructField("value", DataTypes.LongType, false)
+ });
+
+ // Write dataset to HPCC
+ List rows = new ArrayList();
+ for (int i = 0; i < 1000; i++) {
+ Object[] fields = new Object[2];
+ fields[0] = Long.valueOf(i);
+ fields[1] = Long.valueOf(i);
+ rows.add(new GenericRowWithSchema(fields, schema));
+ }
+
+ Dataset writtenDataSet = spark.createDataFrame(rows, schema);
+
+ String datasetPath = "spark::test::integer_kv";
+ writtenDataSet.write()
+ .format("hpcc")
+ .mode("overwrite")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .save(datasetPath);
+
+ // Read dataset from HPCC
+ Dataset readDataSet = spark.read()
+ .format("hpcc")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .load(datasetPath);
+
+ StructType readSchema = readDataSet.schema();
+ System.out.println(readSchema);
+
+ Dataset diff = writtenDataSet.exceptAll(readDataSet);
+ Assert.assertTrue("Difference found between written and read datasets", diff.isEmpty());
+ }
+
+ @Test
+ public void allTypesWriteReadTest()
+ {
+ SparkSession spark = getOrCreateSparkSession();
+
+ StructType inlineSchema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("key", DataTypes.IntegerType, false),
+ DataTypes.createStructField("val", DataTypes.IntegerType, false)
+ });
+
+ StructType childSchema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("test", DataTypes.IntegerType, false),
+ DataTypes.createStructField("test2", DataTypes.IntegerType, false)
+ });
+
+ // Create the schema
+ StructType schema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("byteVal", DataTypes.ByteType, false),
+ DataTypes.createStructField("shortVal", DataTypes.ShortType, false),
+ DataTypes.createStructField("intVal", DataTypes.IntegerType, false),
+ DataTypes.createStructField("longVal", DataTypes.LongType, false),
+ DataTypes.createStructField("floatVal", DataTypes.FloatType, false),
+ DataTypes.createStructField("doubleVal", DataTypes.DoubleType, false),
+ DataTypes.createStructField("decimalVal", DataTypes.createDecimalType(16, 8), false),
+ DataTypes.createStructField("stringVal", DataTypes.StringType, false),
+ DataTypes.createStructField("binaryVal", DataTypes.BinaryType, false),
+ DataTypes.createStructField("setVal", DataTypes.createArrayType(DataTypes.IntegerType), false),
+ DataTypes.createStructField("inlineRec", inlineSchema, false),
+ DataTypes.createStructField("childDataset", DataTypes.createArrayType(childSchema), false),
+ });
+
+ // Write dataset to HPCC
+ List rows = new ArrayList();
+ for (int i = 0; i < 1000; i++)
+ {
+ Object[] fields = new Object[12];
+ fields[0] = Byte.valueOf((byte) i);
+ fields[1] = Short.valueOf((short) i);
+ fields[2] = Integer.valueOf((int) i);
+ fields[3] = Long.valueOf((long) i);
+ fields[4] = Float.valueOf(0);
+ fields[5] = Double.valueOf(10.42);
+            fields[6] = new BigDecimal("10.42");
+ fields[7] = "TestString";
+ fields[8] = new String("BinaryVal").getBytes();
+
+ Integer[] set = new Integer[2];
+ set[0] = Integer.valueOf(i);
+ set[1] = Integer.valueOf(i);
+ fields[9] = set;
+
+ Object[] inlineRec = new Object[2];
+ inlineRec[0] = Integer.valueOf(i);
+ inlineRec[1] = Integer.valueOf(i);
+            fields[10] = new GenericRowWithSchema(inlineRec, inlineSchema);
+
+ int numChildRows = 10;
+ List childDataset = new ArrayList();
+ for (int j = 0; j < numChildRows; j++)
+ {
+ Object[] childRec = new Object[2];
+ childRec[0] = Integer.valueOf(j);
+ childRec[1] = Integer.valueOf(j);
+
+ childDataset.add(new GenericRowWithSchema(childRec, childSchema));
+ }
+ fields[11] = childDataset.toArray();
+
+ rows.add(new GenericRowWithSchema(fields, schema));
+ }
+
+ Dataset writtenDataSet = spark.createDataFrame(rows, schema);
+
+ String datasetPath = "spark::test::all_types";
+ writtenDataSet.write()
+ .format("hpcc")
+ .mode("overwrite")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .save(datasetPath);
+
+ // Read dataset from HPCC
+ Dataset readDataSet = spark.read()
+ .format("hpcc")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .load(datasetPath);
+
+ Dataset diff = writtenDataSet.exceptAll(readDataSet);
+ Assert.assertTrue("Difference found between written and read datasets", diff.isEmpty());
+ }
+}
diff --git a/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/FileFilterTests.java b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/FileFilterTests.java
new file mode 100644
index 000000000..bedb8ab6a
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/FileFilterTests.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.spark;
+
+import org.junit.Assert;
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.GreaterThanOrEqual;
+import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.Or;
+import org.apache.spark.sql.sources.StringStartsWith;
+import org.hpccsystems.commons.ecl.FileFilter;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.junit.experimental.categories.Category;
+
+@Category(org.hpccsystems.commons.annotations.BaseTests.class)
+public class FileFilterTests
+{
+
+ @Before
+ public void setUp() throws Exception
+ {
+ }
+
+ @Test
+ public void testNotSparkFilterstoHPCCFilters()
+ {
+ System.out.println("\n----------Spark 'Not' filter to HPCC Tests----------");
+
+ try
+ {
+ Filter child = new LessThan("field1", 8);
+ Not notlessthan = new Not(child);
+ FileFilter hpccnotlessthan = FileFilterConverter.ConvertToHPCCFileFilterString(notlessthan);
+ Assert.assertNotNull(hpccnotlessthan);
+
+ GreaterThanOrEqual gte = new GreaterThanOrEqual("field1", 8);
+ FileFilter hpccgte = FileFilterConverter.ConvertToHPCCFileFilterString(gte);
+ Assert.assertNotNull(hpccgte);
+
+ Assert.assertEquals(hpccnotlessthan.toJson(), hpccgte.toJson());
+ }
+        catch (Exception e)
+        {
+            Assert.fail("Filter conversion failed with exception: " + e.getMessage());
+        }
+ }
+
+ @Test
+ public void testSparkFilterstoHPCCFilters()
+ {
+
+ System.out.println("\n----------Spark to HPCC filter Tests----------");
+
+ org.apache.spark.sql.sources.Filter [] sparkfilters = new org.apache.spark.sql.sources.Filter[8];
+ StringStartsWith ssw = new StringStartsWith("Fname", "Rod");
+ LessThan lt = new LessThan("field1", 12);
+ GreaterThan gt = new GreaterThan("field1", 8);
+ Or or = new Or(lt, gt);
+ sparkfilters[0] = ssw;
+ sparkfilters[1] = or;
+
+ In in = new In("field1", new Object [] { "str", "values", "etc"});
+ sparkfilters[2] = in;
+
+ In innumber = new In("field1", new Object [] { 1, 2, 3, 4, 5.6});
+ sparkfilters[3] = innumber;
+
+ LessThan lta = new LessThan("alphafield", "XYZ");
+ sparkfilters[4] = lta;
+
+ Filter child = new EqualTo("field1", "true");
+ org.apache.spark.sql.sources.Not n = new org.apache.spark.sql.sources.Not(child );
+ sparkfilters[5] = n;
+
+ Filter eq5 = new EqualTo("field1", 5);
+ sparkfilters[6] = eq5;
+
+ child = new LessThan("field1", -3.2);
+ n = new Not(child);
+ sparkfilters[7] = n;
+
+ try
+ {
+ FileFilter hpccfilters = FileFilterConverter.CovertToHPCCFileFilter(sparkfilters);
+ System.out.println("\n----------Converting Spark to HPCC filter output----------");
+ System.out.println(hpccfilters.toJson());
+ }
+ catch (Exception e)
+ {
+            e.printStackTrace();
+            Assert.fail("Filter conversion failed with exception: " + e.getMessage());
+ }
+ }
+}
diff --git a/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java
new file mode 100644
index 000000000..6d1590f15
--- /dev/null
+++ b/spark-hpcc/DataAccess/src/test/java/org/hpccsystems/spark/HpccRelationIntegrationTest.java
@@ -0,0 +1,192 @@
+package org.hpccsystems.spark;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+
+import javax.xml.validation.Schema;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.sources.IsNull;
+import org.apache.spark.sql.sources.LessThan;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.Or;
+import org.apache.spark.sql.sources.StringContains;
+import org.apache.spark.sql.sources.StringEndsWith;
+import org.apache.spark.sql.sources.StringStartsWith;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.hpccsystems.dfs.client.CompressionAlgorithm;
+import org.hpccsystems.spark.datasource.HpccOptions;
+import org.hpccsystems.spark.datasource.HpccRelation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+@Category(org.hpccsystems.commons.annotations.BaseTests.class)
+public class HpccRelationIntegrationTest extends BaseIntegrationTest
+{
+ @Test
+ public void testbuildScanAllValid() throws Exception
+ {
+ SparkSession spark = getOrCreateSparkSession();
+ SQLContext sqlcontext = new SQLContext(spark);
+
+ // Create the schema
+ StructType schema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("key", DataTypes.LongType, false),
+ DataTypes.createStructField("value", DataTypes.LongType, false)
+ });
+
+ // Write dataset to HPCC
+ List rows = new ArrayList();
+ for (int i = 0; i < 1000; i++) {
+ Object[] fields = new Object[2];
+ fields[0] = Long.valueOf(i);
+ fields[1] = Long.valueOf(i);
+ rows.add(new GenericRowWithSchema(fields, schema));
+ }
+
+ Dataset writtenDataSet = spark.createDataFrame(rows, schema);
+
+ String testDataset = "spark::test::integer_kv";
+ writtenDataSet.write()
+ .format("hpcc")
+ .mode("overwrite")
+ .option("cluster", getThorCluster())
+ .option("host", getHPCCClusterURL())
+ .option("username", getHPCCClusterUser())
+ .option("password", getHPCCClusterPass())
+ .save(testDataset);
+
+ TreeMap paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ paramTreeMap.put("host", getHPCCClusterURL());
+ paramTreeMap.put("path", testDataset);
+ paramTreeMap.put("cluster", getThorCluster());
+ paramTreeMap.put("username", getHPCCClusterUser());
+ paramTreeMap.put("password", getHPCCClusterPass());
+
+ HpccOptions hpccopts = new HpccOptions(paramTreeMap);
+ HpccRelation hpccRelation = new HpccRelation(sqlcontext, hpccopts);
+
+ Filter[] supportedSparkFilters = {
+ new Or(new LessThan("key", 12), new GreaterThan("key", 8)),
+ new In("key", new Object [] { 1, 2, 3, 4, 5}),
+ new EqualTo("key", 5),
+ new Not(new LessThan("key", 3)),
+ };
+
+ RDD rdd = hpccRelation.buildScan(new String[]{"key"}, supportedSparkFilters);
+ Assert.assertTrue("Unexpected filter result count", rdd.count() == 1);
+ }
+
+ @Test
+ public void testOptionsPassThrough() throws Exception
+ {
+ SparkSession spark = getOrCreateSparkSession();
+ SQLContext sqlcontext = new SQLContext(spark);
+
+ TreeMap paramTreeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+ String url = getHPCCClusterURL();
+ String user = "user";
+ String pass = "pass";
+ paramTreeMap.put("host", url);
+ paramTreeMap.put("username", user);
+ paramTreeMap.put("password", pass);
+
+ String path = "spark::test::integer_kv";
+ paramTreeMap.put("path", path);
+
+ paramTreeMap.put("cluster", getThorCluster());
+ paramTreeMap.put("useTLK", "True"); // Defaults to false, also should be case insensitive
+ paramTreeMap.put("fileAccessTimeout", "120000");
+ paramTreeMap.put("limitPerFilePart", "100");
+
+ String projectList = "key, value";
+ paramTreeMap.put("projectList", projectList);
+
+ String filterStr = "key > 5";
+ paramTreeMap.put("filter", filterStr);
+
+ paramTreeMap.put("compression", "LZ4");
+
+ HpccOptions hpccopts = new HpccOptions(paramTreeMap);
+
+ // These options don't currently have accessors in HPCCFile
+ Assert.assertEquals(url, hpccopts.connectionInfo.getUrl());
+ Assert.assertEquals(user, hpccopts.connectionInfo.getUserName());
+ Assert.assertEquals(pass, hpccopts.connectionInfo.getPassword());
+ Assert.assertEquals(filterStr, hpccopts.filterString);
+ Assert.assertEquals(CompressionAlgorithm.LZ4, hpccopts.compression);
+
+ HpccRelation hpccRelation = new HpccRelation(sqlcontext, hpccopts);
+
+ Assert.assertEquals(true, hpccRelation.getFile().getUseTLK());
+ Assert.assertEquals(getThorCluster(), hpccRelation.getFile().getTargetfilecluster());
+ Assert.assertEquals(path, hpccRelation.getFile().getFileName());
+ Assert.assertEquals(120000, hpccRelation.getFile().getFileAccessExpirySecs());
+ Assert.assertEquals(100, hpccRelation.getFile().getFilePartRecordLimit());
+ Assert.assertEquals(projectList, hpccRelation.getFile().getProjectList());
+ }
+
+ @Test
+ public void testUnhandledFiltersAllValid() throws Exception
+ {
+ HpccRelation hpccRelation = new HpccRelation(null, null);
+
+ Filter[] supportedSparkFilters = {
+ new StringStartsWith("fixstr8", "Rod"),
+ new Or(new LessThan("int8", 12), new GreaterThan("int8", 8)),
+ new In("int8", new Object [] { "str", "values", "etc"}),
+ new In("int8", new Object [] { 1, 2, 3, 4, 5.6}),
+ new LessThan("fixstr8", "XYZ"),
+ new Not(new EqualTo("fixstr8", "true")),
+ new EqualTo("int8", 5),
+ new Not(new LessThan("int8", 3))
+ };
+
+ Filter [] unhandledsparkfilters = hpccRelation.unhandledFilters(supportedSparkFilters);
+
+ Assert.assertTrue("Unexpected unhandled filters detected" , unhandledsparkfilters.length == 0);
+ }
+
+ @Test
+ public void testUnhandledFiltersNoneValid() throws Exception
+ {
+ HpccRelation hpccRelation = new HpccRelation(null, null);
+
+ Filter[] unsupportedSparkFilters = {
+ new IsNull("something"),
+ new Or(new LessThan("int8", 12), new GreaterThan("int4", 8)),
+ new Not(new Or(new LessThan("int8", 12), new GreaterThan("int8", 8))),
+ new Not(new In("int8", new Object [] { 1, 2, 3, 4, 5.6})),
+ new StringContains("somestring", "some"),
+ new StringEndsWith("somestring", "ing")
+ };
+
+ Filter[] unhandledsparkfilters = hpccRelation.unhandledFilters(unsupportedSparkFilters);
+
+ Assert.assertTrue("Unexpected unhandled filters detected" , unhandledsparkfilters.length == unsupportedSparkFilters.length);
+ }
+}
diff --git a/spark-hpcc/Examples/PySparkExample.ipynb b/spark-hpcc/Examples/PySparkExample.ipynb
new file mode 100644
index 000000000..d540893b9
--- /dev/null
+++ b/spark-hpcc/Examples/PySparkExample.ipynb
@@ -0,0 +1,321 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "497761f5",
+ "metadata": {},
+ "source": [
+ "# Spark-HPCC Connector for HPCC Systems Platform and Spark Connectivity\n",
+ "\n",
+    "This example demonstrates how to use the Spark-HPCC Connector to read and write data from / to HPCC Systems clusters, and provides basic setup information for the Spark-HPCC Connector.\n",
+ "\n",
+ "## Spark-HPCC Connector Installation:\n",
+ "\n",
+ "---\n",
+ "\n",
+ "The Spark-HPCC Connector jar and its dependencies need to be made available to all Spark worker nodes and the Spark driver application. This can be done by adding the Spark-HPCC connector jar to the classpath on every node in the Spark cluster and to the classpath for the Spark driver, or by using the ```--jars``` option when executing spark-submit or pyspark.\n",
+ "\n",
+ "Download the Spark-HPCC jar with dependencies from Maven Central: https://mvnrepository.com/artifact/org.hpccsystems/spark-hpcc\n",
+ "\n",
+ "### Example of using the jars option:\n",
+ "```\n",
+ "pyspark --jars spark-hpcc-9.2.2-1-jar-with-dependencies.jar\n",
+ "```\n",
+ "\n",
+ "### Adding Spark-HPCC jar to classpath\n",
+ "The Spark-HPCC jar can also be added to the classpath through various means depending on the configuration of your Spark cluster, more information about updating the classpath can be found within the Spark documentation: https://spark.apache.org/docs/latest/configuration.html"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "eb1182be",
+ "metadata": {},
+ "source": [
+ "# Creating a test dataset\n",
+ "\n",
+ "The following code will create a dataframe with two columns, key and fill, that will be used to demonstrate the reading and writing functionality of the Spark-HPCC connector.\n",
+ "\n",
+ "---"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "7103a826",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "import random"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "44c6d7e4",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "data = [(i, int(1e10 * random.random())) for i in range(1000)]\n",
+ "df = spark.createDataFrame(data, [\"key\", \"fill\"])\n",
+ "df.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2668405b",
+ "metadata": {},
+ "source": [
+ "# Writing Data to HPCC Systems\n",
+ "\n",
+ "---\n",
+ "\n",
+ "A Spark Dataframe can be written to HPCC using the Spark DataSource API.\n",
+    "- **Mode**: The Spark SaveMode; the Spark-HPCC Connector supports *[ErrorIfExists, Ignore, Overwrite]*\n",
+    "  - Defaults to ErrorIfExists\n",
+    "- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n",
+    "- **Username / Password**: Credentials for an HPCC Systems cluster user; these can be empty or null if security isn't enabled on the target cluster.\n",
+    "- **Cluster**: The name of the underlying Thor cluster storage plane; this will change based on the target HPCC Systems cluster configuration, but it defaults to \"mythor\" on bare-metal and \"data\" on containerized systems.\n",
+    "- **Path**: The file path for the dataset within the HPCC Systems cluster. **Note**: Spark-HPCC versions [9.2.110, 9.4.84, 9.6.36, 9.8.10] and above allow paths to be defined with the \"/\" path delimiter instead of the HPCC \"::\" delimiter, which fixes URI formatting errors on Databricks.\n",
+ "- **Compression**: The compression algorithm to use when writing the file to the HPCC Systems cluster.\n",
+ " - Options: *[default, none, lz4, flz, lzw]*\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "05ba80cb",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "df.write.save(format=\"hpcc\",\n",
+ " mode=\"overwrite\",\n",
+ " host=\"http://127.0.0.1:8010\",\n",
+ " username=\"\",\n",
+ " password=\"\",\n",
+ " cluster=\"mythor\",\n",
+ " #path=\"spark::test::dataset\", Old path format not supported on Databricks\n",
+ " path=\"/spark/test/dataset\",\n",
+ " compression=\"default\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1c4d4c9f",
+ "metadata": {},
+ "source": [
+ "# Reading Data from HPCC Systems\n",
+ "\n",
+ "---\n",
+ "\n",
+ "A dataset from within an HPCC Systems cluster can be read via the Spark Datasource API.\n",
+ "\n",
+ "- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n",
+    "- **Username / Password**: Credentials for an HPCC Systems cluster user; these can be empty or null if security isn't enabled on the target cluster.\n",
+    "- **Cluster**: The name of the underlying Thor cluster storage plane; this will change based on the target HPCC Systems cluster configuration, but it defaults to \"mythor\" on bare-metal and \"data\" on containerized systems.\n",
+    "- **Path**: The file path for the dataset within the HPCC Systems cluster. **Note**: Spark-HPCC versions [9.2.110, 9.4.84, 9.6.36, 9.8.10] and above allow paths to be defined with the \"/\" path delimiter instead of the HPCC \"::\" delimiter, which fixes URI formatting errors on Databricks.\n",
+ "- **limitPerFilePart**: *Optional* Limit on the number of records to be read per file part / partition within the HPCC Systems dataset.\n",
+ "- **projectList**: *Optional* The columns that should be read from the HPCC Systems dataset.\n",
+    "- **useTLK**: *Optional* Defaults to false; determines whether or not the TLK (Top Level Key) should be used when reading index files.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "e8d49d8f",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "readDf = spark.read.load(format=\"hpcc\",\n",
+ " host=\"http://127.0.0.1:8010\",\n",
+ " username=\"\",\n",
+ " password=\"\",\n",
+ " useTLK=\"false\",\n",
+ " cluster=\"mythor\",\n",
+ " #path=\"spark::test::dataset\", Old path format not supported on Databricks\n",
+ " path=\"/spark/test/dataset\",\n",
+ " limitPerFilePart=100,\n",
+ " projectList=\"key, fill\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "c16a758c",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "readDf.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "731e0dfd",
+ "metadata": {},
+ "source": [
+ "# OpenTelemetry Support\n",
+ "\n",
+ "---\n",
+ "\n",
+    "Spark-HPCC versions after 9.8.12 support OpenTelemetry tracing. To utilize tracing with PySpark, OpenTelemetry needs to be enabled and configured within your PySpark code, the exporter jars need to be added to the Spark Java classpath, and the tracing information needs to be passed from Python into the Spark-HPCC APIs.\n",
+ "\n",
+ "## Python Setup\n",
+ "The following python libraries need to be installed:\n",
+ "```\n",
+ "!pip install opentelemetry-api\n",
+ "!pip install opentelemetry-sdk\n",
+ "!pip install opentelemetry-exporter-otlp-proto-grpc\n",
+ "```\n",
+ "\n",
+ "See: https://opentelemetry.io/docs/zero-code/python/configuration for more information on Python OpenTelemetry configuration\n",
+ "\n",
+ "\n",
+ "## Java Setup\n",
+ "The following jars will need to be available on the classpath in Spark:\n",
+ "```\n",
+ "opentelemetry-exporter-otlp-1.38.0.jar\n",
+ "opentelemetry-exporter-sender-okhttp-1.38.0.jar\n",
+ "```\n",
+    "The Java OpenTelemetry SDK is auto-configured based on environment variables. By default, all tracing will be exported to logging. To export tracing data to an external aggregator, the relevant environment variables must be changed; see https://opentelemetry.io/docs/languages/java/configuration/ for more information on the available configuration.\n",
+ "\n",
+ "Example Java environment variables to configure the otlp grpc exporter:\n",
+ "```\n",
+ "'OTEL_TRACES_EXPORTER' = 'otlp'\n",
+ "'OTEL_LOGS_EXPORTER' = 'logging'\n",
+ "'OTEL_METRICS_EXPORTER' = 'logging'\n",
+ "'OTEL_EXPORTER_OTLP_PROTOCOL' = 'grpc'\n",
+ "'OTEL_EXPORTER_OTLP_ENDPOINT' = 'http://localhost:4317'\n",
+ "'OTEL_JAVA_GLOBAL_AUTOCONFIGURE_ENABLED' = 'true'\n",
+ "```\n",
+ "\n",
+ "## Example PySpark Command:\n",
+ "```bash\n",
+ "pyspark \\\n",
+ " --jars ./spark-hpcc-9.8.12-0-jar-with-dependencies.jar,./opentelemetry-exporter-otlp-1.38.0.jar,./opentelemetry-exporter-sender-okhttp-1.38.0.jar \\\n",
+ " --conf \"spark.driver.extraJavaOptions=-Dotel.java.global-autoconfigure.enabled=true \\\n",
+ " -Dotel.traces.exporter=otlp \\\n",
+ " -Dotel.logs.exporter=logging \\\n",
+ " -Dotel.metrics.exporter=logging \\\n",
+ " -Dotel.exporter.otlp.protocol=http/protobuf \\\n",
+ " -Dotel.exporter.otlp.endpoint=http://localhost:4318\" \\\n",
+ " --conf \"spark.executor.extraJavaOptions=-Dotel.java.global-autoconfigure.enabled=true \\\n",
+ " -Dotel.traces.exporter=otlp \\\n",
+ " -Dotel.logs.exporter=logging \\\n",
+ " -Dotel.metrics.exporter=logging \\\n",
+ " -Dotel.exporter.otlp.protocol=http/protobuf \\\n",
+ " -Dotel.exporter.otlp.endpoint=http://localhost:4318\"\n",
+ "```"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "341f267a",
+ "metadata": {},
+ "source": [
+ "# OpenTelemetry Example\n",
+ "\n",
+ "---\n",
+ "\n",
+ "Spark-HPCC APIs now support the ability to pass in the OpenTelemetry TraceID & SpanID to propagate tracing.\n",
+ "\n",
+ "- **traceID**: *Optional* The hexadecimal string representing the current trace.\n",
+    "- **spanID**: *Optional* The hexadecimal string representing the current span."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "d195e46a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "import os\n",
+ "\n",
+ "from opentelemetry import trace\n",
+ "from opentelemetry.sdk.trace import TracerProvider\n",
+ "from opentelemetry.sdk.trace.export import (\n",
+ " BatchSpanProcessor,\n",
+ ")\n",
+ "from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter\n",
+ "\n",
+ "# Configure Python OpenTelemetry\n",
+    "# Note: this needs to be done separately from the Java configuration\n",
+ "otlp_exporter = OTLPSpanExporter(\n",
+ " endpoint=\"http://localhost:4317\",\n",
+ ")\n",
+ "\n",
+ "provider = TracerProvider()\n",
+ "processor = BatchSpanProcessor(otlp_exporter)\n",
+ "provider.add_span_processor(processor)\n",
+ "\n",
+ "trace.set_tracer_provider(provider)\n",
+ "tracer = trace.get_tracer(\"spark.example.tracer\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "dd4552d1",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "data = [(i, int(1e10 * random.random())) for i in range(1000)]\n",
+ "df = spark.createDataFrame(data, [\"key\", \"fill\"])\n",
+ "\n",
+ "# Example Spark-HPCC Write with OpenTelemetry Tracing\n",
+ "with tracer.start_as_current_span(\"PySpark.WriteSpan\") as writeSpan:\n",
+ "\n",
+ " # Convert trace & span IDs to hex string\n",
+ " trace_id = format(writeSpan.get_span_context().trace_id, '032x')\n",
+ " span_id = format(writeSpan.get_span_context().span_id, '016x')\n",
+ "\n",
+ " df.write.save(format=\"hpcc\",\n",
+ " mode=\"overwrite\",\n",
+    "                  host=\"http://127.0.0.1:8010\",\n",
+ " cluster=\"mythor\",\n",
+ " path=\"spark::test::dataset\",\n",
+ " compression=\"default\",\n",
+ " traceID=trace_id,\n",
+ " spanID=span_id)\n",
+ "\n",
+ "# Example Spark-HPCC Read with OpenTelemetry Tracing\n",
+ "with tracer.start_as_current_span(\"PySpark.ReadSpan\") as readSpan:\n",
+ "\n",
+ " # Convert trace & span IDs to hex string\n",
+ " trace_id = format(readSpan.get_span_context().trace_id, '032x')\n",
+ " span_id = format(readSpan.get_span_context().span_id, '016x')\n",
+ "\n",
+ " readDf = spark.read.load(format=\"hpcc\",\n",
+ " host=\"http://127.0.0.1:8010\",\n",
+ " cluster=\"mythor\",\n",
+ " path=\"spark::test::dataset\",\n",
+ " traceID=trace_id,\n",
+ " spanID=span_id)\n",
+    "    # Note: Spark won't read a dataset until it is used; therefore, the count needs to be part of the above PySpark.ReadSpan\n",
+ " readDf.count()"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.4"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/spark-hpcc/LICENSE b/spark-hpcc/LICENSE
new file mode 100644
index 000000000..8dada3eda
--- /dev/null
+++ b/spark-hpcc/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/spark-hpcc/README.md b/spark-hpcc/README.md
new file mode 100644
index 000000000..049faf292
--- /dev/null
+++ b/spark-hpcc/README.md
@@ -0,0 +1,48 @@
+[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.hpccsystems/spark-hpcc/badge.svg?subject=spark-hpcc)](https://maven-badges.herokuapp.com/maven-central/org.hpccsystems/spark-hpcc)
+
+:zap: Note: This project references log4j, which has been reported to include security vulnerabilities in versions prior to v2.15.0
+
+- The Spark-HPCC project no longer references the offending log4j versions
+- Users of Spark-HPCC are strongly encouraged to update to the latest version
+- Learn more about the vulnerability: https://github.com/advisories/GHSA-jfh8-c2jp-5v3q
+
+# Spark-HPCC
+Spark classes for HPCC Systems / Spark interoperability
+
+### DataAccess
+The DataAccess project contains the classes that expose distributed
+streaming of HPCC-based data via Spark constructs. In addition,
+the HPCC data is exposed as a Dataframe for the convenience of the Spark developer.
+
+### Dependencies
+The spark-hpcc target jar does not package any of the Spark libraries it depends on.
+If you are using a standard Spark submission pipeline such as spark-submit, these dependencies will be provided as part of the Spark installation.
+However, if your pipeline executes a jar directly, you may need to add the Spark libraries from your $SPARK_HOME to the classpath.
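+For example, a driver application might be submitted with the connector jar supplied via `--jars` (an illustrative command only; the driver class and application jar names below are placeholders, and the connector jar version should match your release):
+
+```bash
+spark-submit \
+  --jars spark-hpcc-9.2.2-1-jar-with-dependencies.jar \
+  --class com.example.MyHpccDriver \
+  my-spark-application.jar
+```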
+
+### Examples & Documentation
+See: [Examples](https://github.com/hpcc-systems/Spark-HPCC/tree/master/Examples) for example usage of the connector as well as API documentation for the reading and writing APIs.
+
+## Please note:
+##### As reported by GitHub:
+
+"In all versions of Apache Spark, its standalone resource manager accepts code to execute on a 'master' host, that then runs that code on 'worker' hosts. The master itself does not, by design, execute user code. A specially-crafted request to the master can, however, cause the master to execute code too. Note that this does not affect standalone clusters with authentication enabled. While the master host typically has less outbound access to other resources than a worker, the execution of code on the master is nevertheless unexpected.
+Mitigation
+
+Enable authentication on any Spark standalone cluster that is not otherwise secured from unwanted access, for example by network-level restrictions. Use spark.authenticate and related security properties described at https://spark.apache.org/docs/latest/security.html"
+