Dynamic Filtering for Highly-Selective Join Optimization

By Roman Zeyde
December 12, 2019

Latest update: April 23, 2020

Improving JOINs in Presto by an order of magnitude

By using dynamic filtering via run-time predicate pushdown, we can significantly optimize highly-selective inner-joins.

Introduction

Varada is a fast SQL analytics platform for data lakes on the cloud, built on three innovations and one great integration:

  1. A new architecture: NVMe-oF and SSDs in the cloud are combined to create a shared-everything architecture.
  2. Inline indexing: given the above architecture, we index every column at the ingest phase.
  3. Synchronized Materialized Views: data is inserted into Varada by creating a materialized view on top of the data lake. Users define, via a SQL statement, the data set they want to be ingested and indexed, and the created materialized view is notified whenever files are added to or deleted from the relevant location in the data lake.

The great integration was building Varada on Presto 🙂

Presto brought to the Varada product numerous advantages, and as Presto is a pure open-source product, we feel that it is our duty to give back to the open-source community. This blog post will discuss the dynamic filtering feature we developed in the last four months.

Dynamic filtering

In the highly-selective join scenario, most of the probe-side rows are dropped immediately after being read, since they don’t match the join criteria.

We extended Presto’s predicate pushdown support from the planning phase to run-time, in order to skip reading irrelevant rows from our connector into Presto. This allows much faster joins when the build-side scan results in a low-cardinality table:

[Figure: join in Presto]

The approach above is called “dynamic filtering”, and there is an ongoing effort to integrate it into Presto.
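
To make the idea concrete, here is a minimal, self-contained Java sketch of the concept. The row types and names below are made up for illustration and are not Presto classes: the build-side join keys are collected into a set, and probe-side rows that cannot match the join are skipped at scan time.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Toy illustration of dynamic filtering (not Presto code): the build side is
// scanned first, its join keys are collected into a set, and the probe-side
// scan then skips every row whose key is not in that set.
public class DynamicFilterSketch {

    record BuildRow(long joinKey, String payload) {}
    record ProbeRow(long joinKey, String payload) {}

    public static void main(String[] args) {
        List<BuildRow> buildSide = List.of(
                new BuildRow(7, "dim-a"),
                new BuildRow(42, "dim-b"));

        List<ProbeRow> probeSide = List.of(
                new ProbeRow(7, "fact-1"),
                new ProbeRow(13, "fact-2"),   // dropped by the dynamic filter
                new ProbeRow(42, "fact-3"));

        // 1. Collect the build-side join keys (low cardinality by assumption).
        Set<Long> dynamicFilter = buildSide.stream()
                .map(BuildRow::joinKey)
                .collect(Collectors.toSet());

        // 2. Push the filter into the probe-side "scan": rows that cannot
        //    match the join criteria are never handed to the join operator.
        List<ProbeRow> relevantProbeRows = probeSide.stream()
                .filter(row -> dynamicFilter.contains(row.joinKey()))
                .toList();

        System.out.println(relevantProbeRows);
    }
}
```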

The main difficulty is the need to pass the build-side values from the inner-join operator to the probe-side scan operator, since the operators may run on different machines. A possible solution is to use the coordinator to facilitate the message passing. However, this requires multiple changes to the existing Presto codebase, and careful design is needed to avoid overloading the coordinator.

Since it’s a complex feature with lots of moving parts, we suggest the approach below, which solves the problem in a simpler way for specific join use-cases. We note that parts of the implementation below will also help in implementing the general dynamic filtering solution.

Design

Our approach relies on the cost-based optimizer (CBO), which allows using a “broadcast” join, since in our case the build side is much smaller than the probe side. In this case, the probe-side scan and the inner-join operators run in the same process, so the message passing between them becomes much simpler.

Therefore, most of the required changes are in the LocalExecutionPlanner class, and there are no dependencies on the planner or the coordinator.

Implementation

First, we make sure that a broadcast join is used and that the local-stage query plan contains the probe-side TableScan node. Otherwise, we don’t apply the optimization, since we need access to the probe-side PageSourceProvider for predicate pushdown.
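
As a rough illustration of that guard, the sketch below recursively walks a simplified plan-node tree and checks whether it contains a table-scan node. The PlanNode and TableScanNode interfaces here are hypothetical stand-ins; the real check operates on Presto’s plan nodes inside LocalExecutionPlanner.

```java
import java.util.List;

// Illustrative only: a recursive check that a local plan fragment contains a
// table-scan node, mirroring the "is the probe-side TableScan in this stage?"
// guard described above. PlanNode/TableScanNode are hypothetical stand-ins.
final class PlanChecks {
    interface PlanNode {
        List<PlanNode> getSources();
    }

    interface TableScanNode extends PlanNode {}

    static boolean containsTableScan(PlanNode node) {
        if (node instanceof TableScanNode) {
            return true;
        }
        return node.getSources().stream().anyMatch(PlanChecks::containsTableScan);
    }
}
```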

Then, we add a new “collection” operator, just before the hash-builder operator as described below:

[Figure: collection operator in Presto]

This operator collects the build-side values and, once its input is exhausted, exposes the resulting dynamic filter as a TupleDomain to the probe-side PageSourceProvider.
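
Because the collection operator and the probe-side scan run in the same process (per the broadcast-join design above), a plain in-memory handoff is enough. The sketch below captures the contract of that step, using a Set<Long> and a CompletableFuture as simplified stand-ins for the real TupleDomain plumbing: values are accumulated page by page, and the filter is published only once the build-side input is finished.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Simplified sketch of the collection step (not the actual Presto operator):
// pages of build-side join keys are accumulated, and only when the input is
// exhausted is the resulting "domain" published to the probe side.
final class DynamicFilterCollector {
    private final Set<Long> collectedKeys = new HashSet<>();
    private final CompletableFuture<Set<Long>> result = new CompletableFuture<>();

    // Called for every build-side "page" of join-key values.
    void addInput(List<Long> joinKeys) {
        collectedKeys.addAll(joinKeys);
    }

    // Called once the build side is fully consumed; publishes the filter.
    void finish() {
        result.complete(Set.copyOf(collectedKeys));
    }

    // Handed to the probe-side scan; analogous to exposing a TupleDomain
    // through the PageSourceProvider in the real implementation.
    CompletableFuture<Set<Long>> getDynamicFilter() {
        return result;
    }
}
```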

Since the probe-side scan operators run concurrently with the build-side collection, we don’t block the first probe-side splits, but allow them to be processed while the dynamic filter collection is still in progress.
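
On the probe side, a non-blocking check is all that is needed: each split asks for whatever filter is available at that moment and falls back to a full scan if the collection hasn’t completed yet. Again, this is an illustrative sketch with made-up types, not the actual operator code.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Sketch of the non-blocking probe side: each split checks whether the
// dynamic filter has already been collected. Early splits see no filter and
// are scanned in full; later splits are pruned.
final class ProbeSideScan {
    record ProbeRow(long joinKey, String payload) {}

    static List<ProbeRow> scanSplit(List<ProbeRow> splitRows,
                                    CompletableFuture<Set<Long>> dynamicFilter) {
        // getNow(null) never blocks: if collection is still in progress we
        // simply scan the split without the filter.
        Set<Long> filter = dynamicFilter.getNow(null);
        if (filter == null) {
            return splitRows;
        }
        return splitRows.stream()
                .filter(row -> filter.contains(row.joinKey()))
                .toList();
    }
}
```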

The lookup-join operator is not changed, but the optimization above allows it to process far fewer probe-side rows while producing the same result.

We recently added support for multiple join operators in the same stage, so that some TPC-DS queries (e.g. q13) can be further optimized. In addition, we now support dynamic filtering for semi-joins.

The implementation above was reviewed and merged into PrestoSQL in October 2019.

Benchmarks

The following TPC-DS queries benefit the most from our dynamic filtering implementation (elapsed time measured in seconds).

[Figure: TPC-DS query elapsed times in Presto]

For example, running the TPC-DS q71 query on a 3-node i3.metal Varada cluster with TPC-DS scale-1000 data results in a ~9x performance improvement:

[Figure: TPC-DS q71 performance]

Discussion

These queries join large “sales” fact tables with much smaller, filtered dimension tables (e.g. “items”, “customers”, “stores”), so dynamic filtering yields a significant speedup.

Note that we rely on the fact that our connector allows efficient run-time filtering of the probe-side table, by using an inline index for every column in each split.
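
The sketch below shows, with hypothetical types, why such a per-split index helps: if the set of values a split contains for the join column is known up front, a split that cannot satisfy the dynamic filter is skipped without reading any of its rows.

```java
import java.util.Map;
import java.util.Set;

// Illustration of why an inline per-split index makes run-time filtering
// cheap: if the values a split actually contains for the join column are
// known, a split that cannot match the dynamic filter is skipped without
// reading any of its rows. All names here are hypothetical.
final class SplitPruning {
    record SplitIndex(Map<String, Set<Long>> valuesByColumn) {
        boolean mightMatch(String column, Set<Long> dynamicFilter) {
            Set<Long> indexed = valuesByColumn.get(column);
            if (indexed == null) {
                return true; // no index for this column: must read the split
            }
            // Skip the split only if none of its values pass the filter.
            return indexed.stream().anyMatch(dynamicFilter::contains);
        }
    }
}
```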

We also rely on the CBO and its statistics estimation to correctly set the join distribution type to “broadcast”. Since the current statistics estimation doesn’t support all query plans, this optimization cannot currently be applied to some types of aggregations (e.g. the TPC-DS q19 query).

A shorter version of the post was initially published in the Presto community blog.