Skip to content

Commit

Permalink
Add ConnectorDeleteTableHandle to support DELETEs
Browse files Browse the repository at this point in the history
Changes to update finishDelete to new types

Fixed Iceberg Delete as proof of concept. Fixed missing serialization changes.

Kudu fix and checkstyle

recombine iceberg tablehandle and deletetablehandle, fix serialization

Documentation and add delete pagesink

Fix implementation of delete pagesink

delete extra comments/changes

update KuduTableHandle

remove comments and unused handle

add kudu resolver for delete handle
  • Loading branch information
shelton408 committed Feb 21, 2025
1 parent db0fe87 commit 399bba2
Show file tree
Hide file tree
Showing 24 changed files with 234 additions and 38 deletions.
6 changes: 3 additions & 3 deletions presto-docs/src/main/sphinx/develop/delete-and-update.rst
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ A connector implementing ``DELETE`` must specify three ``ConnectorMetadata`` met

* ``beginDelete()``::

ConnectorTableHandle beginDelete(
ConnectorDeleteTableHandle beginDelete(
ConnectorSession session,
ConnectorTableHandle tableHandle)

Expand All @@ -116,15 +116,15 @@ A connector implementing ``DELETE`` must specify three ``ConnectorMetadata`` met
``beginDelete()`` performs any orchestration needed in the connector to start processing the ``DELETE``.
This orchestration varies from connector to connector.

``beginDelete()`` returns a ``ConnectorTableHandle`` with any added information the connector needs when the handle
``beginDelete()`` returns a ``ConnectorDeleteTableHandle`` with any added information the connector needs when the handle
is passed back to ``finishDelete()`` and the split generation machinery. For most connectors, the returned table
handle contains a flag identifying the table handle as a table handle for a ``DELETE`` operation.

* ``finishDelete()``::

void finishDelete(
ConnectorSession session,
ConnectorTableHandle tableHandle,
ConnectoDeleteTableHandle tableHandle,
Collection<Slice> fragments)

During ``DELETE`` processing, the Presto engine accumulates the ``Slice`` collections returned by ``UpdatablePageSource.finish()``.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.facebook.presto.hive.statistics.HiveStatisticsProvider;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorMetadataUpdateHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
Expand Down Expand Up @@ -2517,7 +2518,7 @@ public Optional<List<SchemaTableName>> getReferencedMaterializedViews(ConnectorS
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -978,7 +979,7 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
Expand All @@ -991,19 +992,17 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(NOT_SUPPORTED, format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE));
}

if (getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) {
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely. Configure delete_mode table property to allow row level deletions.");
}

validateTableMode(session, icebergTable);
transaction = icebergTable.newTransaction();

return handle;
}

@Override
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
public void finishDelete(ConnectorSession session, ConnectorDeleteTableHandle tableHandle, Collection<Slice> fragments)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -62,6 +63,12 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
return IcebergInsertTableHandle.class;
}

@Override
public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
{
return IcebergTableHandle.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.BaseHiveTableHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
Expand All @@ -28,6 +29,7 @@

public class IcebergTableHandle
extends BaseHiveTableHandle
implements ConnectorDeleteTableHandle
{
private final IcebergTableName icebergTableName;
private final boolean snapshotSpecified;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.kudu;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -61,6 +62,12 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
return KuduInsertTableHandle.class;
}

@Override
public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
{
return KuduTableHandle.class;
}

@Override
public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.kudu.properties.PartitionDesign;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -357,13 +358,13 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return tableHandle;
return (ConnectorDeleteTableHandle) tableHandle;
}

@Override
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
public void finishDelete(ConnectorSession session, ConnectorDeleteTableHandle tableHandle, Collection<Slice> fragments)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.kudu;

import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -25,7 +26,7 @@
import static java.util.Objects.requireNonNull;

public class KuduTableHandle
implements ConnectorTableHandle
implements ConnectorTableHandle, ConnectorDeleteTableHandle
{
private final String connectorId;
private final SchemaTableName schemaTableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.facebook.presto.execution.scheduler;

import com.facebook.presto.metadata.DeleteTableHandle;
import com.facebook.presto.metadata.InsertTableHandle;
import com.facebook.presto.metadata.OutputTableHandle;
import com.facebook.presto.spi.SchemaTableName;
Expand Down Expand Up @@ -106,20 +107,20 @@ public String toString()
public static class DeleteHandle
extends ExecutionWriterTarget
{
private final TableHandle handle;
private final DeleteTableHandle handle;
private final SchemaTableName schemaTableName;

@JsonCreator
public DeleteHandle(
@JsonProperty("handle") TableHandle handle,
@JsonProperty("handle") DeleteTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
{
this.handle = requireNonNull(handle, "handle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
}

@JsonProperty
public TableHandle getHandle()
public DeleteTableHandle getHandle()
{
return handle;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private static Optional<DeleteScanInfo> createDeleteScanInfo(StreamingSubPlan pl
{
if (writerTarget.isPresent() && writerTarget.get() instanceof ExecutionWriterTarget.DeleteHandle) {
DeleteNode delete = getOnlyElement(findPlanNodes(plan, DeleteNode.class));
return createDeleteScanInfo(delete, writerTarget, metadata, session);
return createDeleteScanInfo(delete, metadata, session);
}
return Optional.empty();
}
Expand All @@ -154,19 +154,18 @@ private static Optional<DeleteScanInfo> createDeleteScanInfo(PlanNode planNode,
{
if (writerTarget.isPresent() && writerTarget.get() instanceof ExecutionWriterTarget.DeleteHandle) {
DeleteNode delete = findSinglePlanNode(planNode, DeleteNode.class).get();
return createDeleteScanInfo(delete, writerTarget, metadata, session);
return createDeleteScanInfo(delete, metadata, session);
}
return Optional.empty();
}

private static Optional<DeleteScanInfo> createDeleteScanInfo(DeleteNode delete, Optional<ExecutionWriterTarget> writerTarget, Metadata metadata, Session session)
private static Optional<DeleteScanInfo> createDeleteScanInfo(DeleteNode delete, Metadata metadata, Session session)
{
TableHandle tableHandle = ((ExecutionWriterTarget.DeleteHandle) writerTarget.get()).getHandle();
TableScanNode tableScan = getDeleteTableScan(delete);
TupleDomain<ColumnHandle> originalEnforcedConstraint = tableScan.getEnforcedConstraint();
TableLayoutResult layoutResult = metadata.getLayout(
session,
tableHandle,
tableScan.getTable(),
new Constraint<>(originalEnforcedConstraint),
Optional.of(ImmutableSet.copyOf(tableScan.getAssignments().values())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,13 @@ public OptionalLong metadataDelete(Session session, TableHandle tableHandle)
}

@Override
public TableHandle beginDelete(Session session, TableHandle tableHandle)
public DeleteTableHandle beginDelete(Session session, TableHandle tableHandle)
{
return delegate.beginDelete(session, tableHandle);
}

@Override
public void finishDelete(Session session, TableHandle tableHandle, Collection<Slice> fragments)
public void finishDelete(Session session, DeleteTableHandle tableHandle, Collection<Slice> fragments)
{
delegate.finishDelete(session, tableHandle, fragments);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

public final class DeleteTableHandle
{
private final ConnectorId connectorId;
private final ConnectorTransactionHandle transactionHandle;
private final ConnectorDeleteTableHandle connectorHandle;

@JsonCreator
public DeleteTableHandle(
@JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@JsonProperty("connectorHandle") ConnectorDeleteTableHandle connectorHandle)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null");
}

@JsonProperty
public ConnectorId getConnectorId()
{
return connectorId;
}

@JsonProperty
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}

@JsonProperty
public ConnectorDeleteTableHandle getConnectorHandle()
{
return connectorHandle;
}

@Override
public int hashCode()
{
return Objects.hash(connectorId, transactionHandle, connectorHandle);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DeleteTableHandle o = (DeleteTableHandle) obj;
return Objects.equals(this.connectorId, o.connectorId) &&
Objects.equals(this.transactionHandle, o.transactionHandle) &&
Objects.equals(this.connectorHandle, o.connectorHandle);
}

@Override
public String toString()
{
return connectorId + ":" + connectorHandle;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorDeleteTableHandle;

import javax.inject.Inject;

public class DeleteTableHandleJacksonModule
extends AbstractTypedJacksonModule<ConnectorDeleteTableHandle>
{
@Inject
public DeleteTableHandleJacksonModule(HandleResolver handleResolver)
{
super(ConnectorDeleteTableHandle.class,
handleResolver::getId,
handleResolver::getDeleteTableHandleClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
Expand Down
Loading

0 comments on commit 399bba2

Please sign in to comment.