Trino’s Connector Expression Pushdown – Why It’s a Big Deal and How You Can Use It

By Assaf Bern
April 21, 2022

Background & What’s New

As you may know, Trino pushes information about the query down into its data source connectors at the planning stage. This information allows a connector to return more precise data to Trino, potentially sparing data fetching, data transfer over the network, and data processing and filtering by Trino. All of these directly affect query execution time and overall performance. Since we’re talking about big-data environments here, this can improve query performance by orders of magnitude with certain connectors, such as Varada. That’s why we at Varada are always seeking to improve Trino’s transparency and flexibility around the query execution plan.

Until Trino 372, predicates were represented as TupleDomains. A TupleDomain is a data structure mapping each column to a set of ranges representing the required values. The relation between columns is always AND. For example, the predicate col1 > 7 AND col2 IN (1, 3) is fully pushed down as col1: {(7, <max>)}, col2: {[1, 1], [3, 3]}.
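As a rough illustration, here is a self-contained toy model of this structure (a sketch, not the real io.trino.spi.predicate.TupleDomain class) showing how the example predicate maps to ranges, with OR semantics within a column and AND semantics across columns:

```java
import java.util.List;
import java.util.Map;

// Toy stand-in for Trino's TupleDomain, NOT the real SPI class:
// each column maps to a list of allowed ranges. Ranges within a
// column are OR-ed; columns are AND-ed together.
public class TupleDomainSketch {
    // A numeric range; a null bound means unbounded (<min> / <max>).
    record Range(Long low, boolean lowInclusive, Long high, boolean highInclusive) {
        boolean contains(long v) {
            if (low != null && (lowInclusive ? v < low : v <= low)) {
                return false;
            }
            if (high != null && (highInclusive ? v > high : v >= high)) {
                return false;
            }
            return true;
        }
    }

    // col1 > 7 AND col2 IN (1, 3), expressed as ranges:
    // col1: {(7, <max>)}, col2: {[1, 1], [3, 3]}
    static Map<String, List<Range>> exampleDomain() {
        return Map.of(
                "col1", List.of(new Range(7L, false, null, false)),
                "col2", List.of(new Range(1L, true, 1L, true),
                                new Range(3L, true, 3L, true)));
    }

    // A row matches only if every column's value falls in one of
    // that column's ranges (AND across columns).
    static boolean matches(Map<String, List<Range>> domain, Map<String, Long> row) {
        return domain.entrySet().stream().allMatch(entry ->
                entry.getValue().stream().anyMatch(range -> range.contains(row.get(entry.getKey()))));
    }

    public static void main(String[] args) {
        System.out.println(matches(exampleDomain(), Map.of("col1", 10L, "col2", 3L))); // true
        System.out.println(matches(exampleDomain(), Map.of("col1", 5L, "col2", 1L)));  // false
    }
}
```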

However, many types of predicates can’t be modeled as ranges and therefore weren’t pushed down to the connectors. In addition, any OR relation between columns can’t be represented as a TupleDomain. For example, if we change the operator of the previous predicate to an OR – col1 > 7 OR col2 IN (1, 3) – the predicate won’t be pushed down at all. It can’t even be partially pushed down: if we chose to translate, say, only the left side to col1: {(7, <max>)}, we would miss rows such as col1 = 5, col2 = 1. This means that each worker node has to run a full scan and return every single row, all of which Trino must then process and filter, leading to high resource usage and degraded query performance.

Here are some more examples of predicates which can’t be represented as a TupleDomain and hence couldn’t be pushed down:

  • col1 < 1 OR col2 IS NULL
  • lower(col1) = 'value'
  • col1 LIKE '%value%'
  • col1 NOT LIKE '%value%'
  • col1 > col2
  • col1 IN ('a', 'b', col2)
  • col1 + 3 > 5
  • lower(col1) BETWEEN 'a' AND 'c'

Since version 372, Trino supports pushdown of ConnectorExpression, a tree data structure that can represent the entire predicate. Support for more and more predicate types is being added to the project gradually. All of the above examples are expected to be fully supported by version 378, so the underlying connectors will be able to apply these predicates themselves – a significant improvement in Trino’s performance that also reduces the cluster’s resource utilization.
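To make the tree shape concrete before diving into the APIs, here is a minimal toy mirror of such an expression tree (the real classes are Call, Variable, and Constant in io.trino.spi.expression; the string-typed fields here are a simplification). The NOT LIKE example above becomes nested calls to the $not and $like_pattern functions:

```java
import java.util.List;

// Toy mirror of the ConnectorExpression tree, NOT the real
// io.trino.spi.expression classes; types are plain strings here.
public class ExpressionTreeSketch {
    sealed interface Expr permits Call, Variable, Constant {}
    record Call(String function, List<Expr> arguments) implements Expr {}
    record Variable(String name, String type) implements Expr {}
    record Constant(Object value, String type) implements Expr {}

    // varchar_column NOT LIKE 'tex%t' as a tree of function calls:
    // NOT and LIKE are represented as the functions $not and $like_pattern.
    static Expr example() {
        return new Call("$not", List.of(
                new Call("$like_pattern", List.of(
                        new Variable("varchar_column", "varchar"),
                        new Constant("tex%t", "varchar")))));
    }

    // Convenience accessor: the function name at the root of the tree.
    static String rootFunction(Expr e) {
        return e instanceof Call call ? call.function() : null;
    }

    public static void main(String[] args) {
        System.out.println(rootFunction(example())); // $not
    }
}
```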

How to get the new pushdown in your own connector (technical)

The new capability is currently reflected in two APIs:

  • applyProjection – for example SELECT lower(varchar_column) FROM table
  • applyFilter – for example SELECT * FROM table WHERE lower(varchar_column) = 'value'

applyFilter

Input:

Two new members were added to the Constraint argument:

  • private final ConnectorExpression expression; – the root node of the ConnectorExpression tree.
  • private final Map<String, ColumnHandle> assignments; – a mapping from variable names in the expression to ColumnHandles.

Please note that the ConnectorExpression was added in addition to the existing

private final TupleDomain<ColumnHandle> summary;

The new data structure doesn’t replace the existing one; the two should be AND-ed.

For example, for the following query:

SELECT * FROM table WHERE long_column > 0 AND (varchar_column NOT LIKE 'tex%t' OR strpos(varchar_column, 'a') + 3 > long_column)

We’ll get:

expression =
    Call[name='$or', arguments=[
        Call[name='$not', arguments=[
            Call[name='$like_pattern', arguments=[
                varchar_column::varchar,
                Slice::varchar(9)]
            ]]
        ],
        Call[name='$greater_than', arguments=[
            Call[name='$add', arguments=[
                Call[name='strpos', arguments=[
                    varchar_column::varchar,
                    Slice::varchar(1)]
                ],
                3::bigint]
            ],
            long_column::bigint]
        ]]
    ]

assignments =
{
    "varchar_column" -> ColumnHandle,
    "long_column" -> ColumnHandle
}

summary =
{
    long_column::bigint=[ SortedRangeSet[type=bigint, ranges={(0,<max>)}] ]
}

Handling:

The package io.trino.plugin.base.expression was added to trino-plugin-toolkit. It contains infrastructure for rewriting the expression tree into any internal data structure. All you have to do is instantiate a ConnectorExpressionRewriter with your custom ConnectorExpressionRule(s) and call connectorExpressionRewriter.rewrite(session, expression, assignments) from applyFilter().
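As a rough, self-contained sketch of that rule-based pattern (a toy stand-in, not the actual ConnectorExpressionRewriter and ConnectorExpressionRule classes), here is a rewriter that translates a $like_pattern node into a SQL string and returns Optional.empty() for anything its rules don’t cover:

```java
import java.util.List;
import java.util.Optional;

// Toy illustration of rule-based expression rewriting: each rule tries
// to translate one node shape into the connector's native form (here,
// a SQL string). NOT the real trino-plugin-toolkit classes.
public class RewriterSketch {
    sealed interface Expr permits Call, Variable, Constant {}
    record Call(String function, List<Expr> arguments) implements Expr {}
    record Variable(String name) implements Expr {}
    record Constant(Object value) implements Expr {}

    interface Rule {
        // A rule may recurse into sub-expressions via the rewriter.
        Optional<String> rewrite(Expr node, RewriterSketch rewriter);
    }

    private final List<Rule> rules;

    public RewriterSketch(List<Rule> rules) {
        this.rules = rules;
    }

    // Try each rule until one produces a translation; empty = unsupported.
    public Optional<String> rewrite(Expr node) {
        for (Rule rule : rules) {
            Optional<String> result = rule.rewrite(node, this);
            if (result.isPresent()) {
                return result;
            }
        }
        return Optional.empty();
    }

    // Example rules: variables, string constants, and $like_pattern calls.
    static final Rule VARIABLE = (node, r) ->
            node instanceof Variable v ? Optional.of(v.name()) : Optional.empty();
    static final Rule STRING_CONSTANT = (node, r) ->
            node instanceof Constant c && c.value() instanceof String s
                    ? Optional.of("'" + s + "'") : Optional.empty();
    static final Rule LIKE = (node, r) -> {
        if (node instanceof Call c && c.function().equals("$like_pattern") && c.arguments().size() == 2) {
            Optional<String> lhs = r.rewrite(c.arguments().get(0));
            Optional<String> rhs = r.rewrite(c.arguments().get(1));
            if (lhs.isPresent() && rhs.isPresent()) {
                return Optional.of(lhs.get() + " LIKE " + rhs.get());
            }
        }
        return Optional.empty();
    };

    public static void main(String[] args) {
        RewriterSketch rewriter = new RewriterSketch(List.of(VARIABLE, STRING_CONSTANT, LIKE));
        Expr like = new Call("$like_pattern",
                List.of(new Variable("varchar_column"), new Constant("tex%t")));
        System.out.println(rewriter.rewrite(like)); // Optional[varchar_column LIKE 'tex%t']
    }
}
```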

Instead of trying to handle the entire expression, you can break it into conjunctions using ConnectorExpressions#extractConjuncts and try to handle each conjunction separately. The unsupported conjunctions can be AND-ed using ConnectorExpressions#and, and returned to Trino for later filtering.
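The conjunct-splitting pattern can be sketched like this – again a self-contained toy that conceptually mirrors ConnectorExpressions#extractConjuncts and ConnectorExpressions#and; the Leaf type and its supported flag are invented purely for the illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of conjunct splitting: flatten top-level $and calls, try
// to push each conjunct down, and AND the leftovers back together as
// the remaining expression for Trino to evaluate. NOT the real SPI.
public class ConjunctSplitSketch {
    sealed interface Expr permits Call, Leaf {}
    record Call(String function, List<Expr> arguments) implements Expr {}
    // "supported" marks whether the connector can handle this predicate;
    // it is an invented stand-in for a real supportability check.
    record Leaf(String predicate, boolean supported) implements Expr {}

    // Recursively flatten nested $and nodes into a flat conjunct list.
    static List<Expr> extractConjuncts(Expr e) {
        List<Expr> out = new ArrayList<>();
        if (e instanceof Call c && c.function().equals("$and")) {
            for (Expr arg : c.arguments()) {
                out.addAll(extractConjuncts(arg));
            }
        } else {
            out.add(e);
        }
        return out;
    }

    // AND a list of conjuncts back into a single expression.
    static Expr and(List<Expr> conjuncts) {
        return conjuncts.size() == 1 ? conjuncts.get(0) : new Call("$and", List.copyOf(conjuncts));
    }

    // The conjuncts the connector could NOT handle; these would be
    // AND-ed and returned to Trino as the remaining expression.
    static List<Expr> unsupported(Expr expression) {
        List<Expr> remaining = new ArrayList<>();
        for (Expr conjunct : extractConjuncts(expression)) {
            if (!(conjunct instanceof Leaf leaf && leaf.supported())) {
                remaining.add(conjunct);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        Expr expr = new Call("$and", List.of(
                new Leaf("long_column > 0", true),
                new Leaf("strpos(varchar_column, 'a') > 2", false)));
        System.out.println(extractConjuncts(expr).size()); // 2
        System.out.println(unsupported(expr).size());      // 1
    }
}
```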

Predicate types like OR, NOT, LIKE, etc. are converted into functions. The function names can be found at io.trino.spi.expression.StandardFunctions.

Output:

The following member was added to ConstraintApplicationResult:

private final Optional<ConnectorExpression> remainingExpression;

This member represents the remaining predicate that Trino should apply to the connector’s data. The expression doesn’t have to be a subset of the original expression and can represent a different predicate, according to the connector’s needs.

Please note that remainingExpression was added in addition to the existing 

private final TupleDomain<ColumnHandle> remainingFilter;

Trino will AND the two.

applyProjection

The applyProjection API wasn’t changed: it accepted a list of ConnectorExpressions and an assignments mapping even before this change. The difference is that more expressions will now be pushed down.

Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjection(ConnectorSession session, ConnectorTableHandle table, List<ConnectorExpression> projections, Map<String, ColumnHandle> assignments);
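As a toy illustration of the kind of decision applyProjection makes (not the real SPI, and real connectors can also accept a subset of the projections), here is an all-or-nothing check that pushes down only bare columns and lower(column) calls:

```java
import java.util.List;
import java.util.Optional;

// Toy sketch of an applyProjection-style decision, NOT the real SPI:
// accept the pushdown only when every projected expression is something
// the connector can compute; otherwise decline and let Trino do it.
public class ProjectionSketch {
    sealed interface Expr permits Call, Variable {}
    record Call(String function, List<Expr> arguments) implements Expr {}
    record Variable(String name) implements Expr {}

    // A single projection is pushable if it is a plain column reference
    // or a lower() call directly over a column.
    static boolean canPushDown(Expr projection) {
        if (projection instanceof Variable) {
            return true;
        }
        return projection instanceof Call c
                && c.function().equals("lower")
                && c.arguments().size() == 1
                && c.arguments().get(0) instanceof Variable;
    }

    // All-or-nothing variant: empty result means "pushdown declined",
    // mirroring how applyProjection returns Optional.empty() to opt out.
    static Optional<List<Expr>> applyProjection(List<Expr> projections) {
        return projections.stream().allMatch(ProjectionSketch::canPushDown)
                ? Optional.of(projections)
                : Optional.empty();
    }

    public static void main(String[] args) {
        // SELECT lower(varchar_column) FROM table
        Expr lowered = new Call("lower", List.of(new Variable("varchar_column")));
        System.out.println(applyProjection(List.of(lowered)).isPresent()); // true
    }
}
```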

Trino connectors which are already using the new pushdown

These connectors already leverage the new pushdown capability:

  • PostgreSQL – One of the most popular open-source SQL databases. The predicate is translated back to SQL and executed by the PostgreSQL engine, potentially sparing a lot of data fetching and transfer over the network.
  • Varada – A Trino connector that dynamically indexes the data lake, allowing enhanced performance on any budget with zero data-ops. The new capability allows Varada to use Lucene indexes in order to accelerate LIKE queries. In the near future, Varada will also accelerate various functions like isNan, st_contains, contains, and more.