Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
velox_add_library(velox_hive_config OBJECT HiveConfig.cpp)
velox_link_libraries(velox_hive_config velox_core velox_exception)

add_subdirectory(iceberg)

velox_add_library(
velox_hive_connector
OBJECT
Expand All @@ -33,9 +31,11 @@ velox_add_library(
TableHandle.cpp
)

add_subdirectory(iceberg)

velox_link_libraries(
velox_hive_connector
PUBLIC velox_hive_iceberg_splitreader
PRIVATE velox_hive_iceberg_splitreader
PRIVATE velox_common_io velox_connector velox_dwio_catalog_fbhive velox_hive_partition_function
)

Expand Down
35 changes: 24 additions & 11 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"

#include <boost/lexical_cast.hpp>
#include <memory>
Expand Down Expand Up @@ -87,17 +88,29 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) {
auto hiveInsertHandle =
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
connectorInsertTableHandle);
VELOX_CHECK_NOT_NULL(
hiveInsertHandle, "Hive connector expecting hive write handle!");
return std::make_unique<HiveDataSink>(
inputType,
hiveInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
if (auto icebergInsertHandle =
std::dynamic_pointer_cast<const iceberg::IcebergInsertTableHandle>(
connectorInsertTableHandle)) {
return std::make_unique<iceberg::IcebergDataSink>(
inputType,
icebergInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
} else {
auto hiveInsertHandle =
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
connectorInsertTableHandle);

VELOX_CHECK_NOT_NULL(
hiveInsertHandle, "Hive connector expecting hive write handle!");
return std::make_unique<HiveDataSink>(
inputType,
hiveInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
}
}

std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
Expand Down
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
#include "velox/expression/Expr.h"
#include "velox/expression/ExprToSubfieldFilter.h"

#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

namespace facebook::velox::connector::hive {
namespace {

Expand Down Expand Up @@ -925,4 +929,9 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
}
return expr;
}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}

} // namespace facebook::velox::connector::hive
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,6 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
common::SubfieldFilters& filters,
double& sampleRate);

std::string makeUuid();

} // namespace facebook::velox::connector::hive
Loading