From 00ab4be0fcbe3fe7a337713ffcc8a6bd3b730757 Mon Sep 17 00:00:00 2001 From: taoxu Date: Wed, 27 Apr 2022 20:32:51 +0800 Subject: [PATCH] Use TF filesystem API to read HDFS --- WORKSPACE | 5 +- third_party/liborc.BUILD | 23 +---- third_party/liborc.patch | 195 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 201 insertions(+), 22 deletions(-) create mode 100644 third_party/liborc.patch diff --git a/WORKSPACE b/WORKSPACE index 11e449fc09..9fca2a9112 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -587,8 +587,9 @@ http_archive( http_archive( name = "liborc", build_file = "//third_party:liborc.BUILD", - patch_cmds = [ - "tar -xzf c++/libs/libhdfspp/libhdfspp.tar.gz -C c++/libs/libhdfspp", + patch_args = ["-p1"], + patches = [ + "//third_party:liborc.patch", ], sha256 = "39d983f4c7feb8ea1e8ab8e3e53e9afc643282b7a500b3a93c91aa6490f65c17", strip_prefix = "orc-rel-release-1.6.14", diff --git a/third_party/liborc.BUILD b/third_party/liborc.BUILD index 0ca325fd62..8292a14509 100644 --- a/third_party/liborc.BUILD +++ b/third_party/liborc.BUILD @@ -39,6 +39,7 @@ cc_library( ], copts = [], defines = [], + local_defines = ["BUILD_LIBHDFSPP"], includes = [ "c++/include", "c++/src", @@ -49,8 +50,9 @@ cc_library( linkopts = [], visibility = ["//visibility:public"], deps = [ - ":libhdfspp", ":orc_cc_proto", + "@local_config_tf//:libtensorflow_framework", + "@local_config_tf//:tf_header_lib", "@lz4", "@snappy", "@zlib", @@ -58,25 +60,6 @@ cc_library( ], ) -cc_library( - name = "libhdfspp", - srcs = glob( - [ - "c++/libs/libhdfspp/include/hdfspp/*.h", - ], - exclude = [ - ], - ), - hdrs = [ - ], - copts = [], - defines = [], - includes = [ - "c++/libs/libhdfspp/include", - ], - deps = [], -) - proto_library( name = "orc_proto", srcs = ["proto/orc_proto.proto"], diff --git a/third_party/liborc.patch b/third_party/liborc.patch new file mode 100644 index 0000000000..421eb04e2a --- /dev/null +++ b/third_party/liborc.patch @@ -0,0 +1,195 @@ +--- a/c++/src/OrcHdfsFile.cc 2022-04-11 04:30:41.000000000 +0800 ++++ b/c++/src/OrcHdfsFile.cc 2022-04-11 19:56:37.206680217 +0800 +@@ -1,4 +1,5 @@ + /** ++ * 1 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information +@@ -29,145 +30,57 @@ + #include + #include + +-#include "hdfspp/hdfspp.h" ++#include "tensorflow/core/platform/env.h" ++#include "tensorflow/core/platform/file_system.h" ++#include "tensorflow/core/platform/logging.h" ++#include "tensorflow/core/platform/status.h" ++#include "tensorflow/core/platform/types.h" + + namespace orc { + +- class HdfsFileInputStream : public InputStream { +- private: +- std::string filename; +- std::unique_ptr file; +- std::unique_ptr file_system; +- uint64_t totalLength; +- const uint64_t READ_SIZE = 1024 * 1024; //1 MB +- +- public: +- HdfsFileInputStream(std::string _filename) { +- filename = _filename ; +- +- //Building a URI object from the given uri_path +- hdfs::URI uri; +- try { +- uri = hdfs::URI::parse_from_string(filename); +- } catch (const hdfs::uri_parse_error&) { +- throw ParseError("Malformed URI: " + filename); +- } +- +- //This sets conf path to default "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" +- //and loads configs core-site.xml and hdfs-site.xml from the conf path +- hdfs::ConfigParser parser; +- if(!parser.LoadDefaultResources()){ +- throw ParseError("Could not load default resources. "); +- } +- auto stats = parser.ValidateResources(); +- //validating core-site.xml +- if(!stats[0].second.ok()){ +- throw ParseError(stats[0].first + " is invalid: " + stats[0].second.ToString()); +- } +- //validating hdfs-site.xml +- if(!stats[1].second.ok()){ +- throw ParseError(stats[1].first + " is invalid: " + stats[1].second.ToString()); +- } +- hdfs::Options options; +- if(!parser.get_options(options)){ +- throw ParseError("Could not load Options object. "); +- } +- hdfs::IoService * io_service = hdfs::IoService::New(); +- //Wrapping file_system into a unique pointer to guarantee deletion +- file_system = std::unique_ptr( +- hdfs::FileSystem::New(io_service, "", options)); +- if (file_system.get() == nullptr) { +- throw ParseError("Can't create FileSystem object. "); +- } +- hdfs::Status status; +- //Checking if the user supplied the host +- if(!uri.get_host().empty()){ +- //Using port if supplied, otherwise using "" to look up port in configs +- std::string port = uri.has_port() ? +- std::to_string(uri.get_port()) : ""; +- status = file_system->Connect(uri.get_host(), port); +- if (!status.ok()) { +- throw ParseError("Can't connect to " + uri.get_host() +- + ":" + port + ". " + status.ToString()); +- } +- } else { +- status = file_system->ConnectToDefaultFs(); +- if (!status.ok()) { +- if(!options.defaultFS.get_host().empty()){ +- throw ParseError("Error connecting to " + +- options.defaultFS.str() + ". " + status.ToString()); +- } else { +- throw ParseError( +- "Error connecting to the cluster: defaultFS is empty. " +- + status.ToString()); +- } +- } +- } +- +- if (file_system.get() == nullptr) { +- throw ParseError("Can't connect the file system. "); +- } +- +- hdfs::FileHandle *file_raw = nullptr; +- status = file_system->Open(uri.get_path(), &file_raw); +- if (!status.ok()) { +- throw ParseError("Can't open " +- + uri.get_path() + ". " + status.ToString()); +- } +- //Wrapping file_raw into a unique pointer to guarantee deletion +- file.reset(file_raw); +- +- hdfs::StatInfo stat_info; +- status = file_system->GetFileInfo(uri.get_path(), stat_info); +- if (!status.ok()) { +- throw ParseError("Can't stat " +- + uri.get_path() + ". " + status.ToString()); +- } +- totalLength = stat_info.length; ++class HdfsFileInputStream : public InputStream { ++ private: ++ std::string filename_; ++ std::unique_ptr file_; ++ uint64_t total_length_; ++ const uint64_t READ_SIZE = 1024 * 1024; // 1 MB ++ ++ public: ++ HdfsFileInputStream(std::string filename) { ++ filename_ = filename; ++ tensorflow::Status status = ++ tensorflow::Env::Default()->NewRandomAccessFile(filename_, &file_); ++ if (!status.ok()) { ++ LOG(FATAL) << status.ToString(); + } + +- uint64_t getLength() const override { +- return totalLength; +- } ++ tensorflow::Env::Default()->GetFileSize(filename_, &total_length_); ++ } + +- uint64_t getNaturalReadSize() const override { +- return READ_SIZE; +- } ++ uint64_t getLength() const override { return total_length_; } + +- void read(void* buf, +- uint64_t length, +- uint64_t offset) override { +- +- if (!buf) { +- throw ParseError("Buffer is null"); +- } +- +- hdfs::Status status; +- size_t total_bytes_read = 0; +- size_t last_bytes_read = 0; +- +- do { +- status = file->PositionRead(buf, +- static_cast(length) - total_bytes_read, +- static_cast(offset + total_bytes_read), &last_bytes_read); +- if(!status.ok()) { +- throw ParseError("Error reading the file: " + status.ToString()); +- } +- total_bytes_read += last_bytes_read; +- } while (total_bytes_read < length); +- } ++ uint64_t getNaturalReadSize() const override { return READ_SIZE; } + +- const std::string& getName() const override { +- return filename; ++ void read(void* buf, uint64_t length, uint64_t offset) override { ++ if (!buf) { ++ LOG(FATAL) << " Null buf"; ++ } ++ tensorflow::StringPiece sp; ++ tensorflow::Status s = ++ file_->Read(offset, length, &sp, static_cast(buf)); ++ if (!(s.ok() || tensorflow::errors::IsOutOfRange(s))) { ++ LOG(FATAL) << s.ToString(); + } ++ } + +- ~HdfsFileInputStream() override; +- }; ++ const std::string& getName() const override { return filename_; } + +- HdfsFileInputStream::~HdfsFileInputStream() { +- } ++ ~HdfsFileInputStream() override; ++}; + +- std::unique_ptr readHdfsFile(const std::string& path) { +- return std::unique_ptr(new HdfsFileInputStream(path)); +- } ++HdfsFileInputStream::~HdfsFileInputStream() {} ++ ++std::unique_ptr readHdfsFile(const std::string& path) { ++ return std::unique_ptr(new HdfsFileInputStream(path)); + } ++} // namespace orc