
...

The ClustrixDB Query Optimizer is also known as Sierra. 

An Example

...

sql> -- Query
sql> SELECT Bar.a, 
            Sum(Bar.b) 
     FROM    Foo, Bar 
     WHERE  Foo.pk = Bar.pk 
     GROUP  BY Bar.a;

sql> -- Foo schema
sql> CREATE TABLE `Foo` 
        ( `pk` INT(11) NOT NULL auto_increment, 
          `a`  INT(11), 
          `b`  INT(11), 
          `c`  INT(11), 
          PRIMARY KEY (`pk`) /*$ DISTRIBUTE=1 */, 
          KEY `idx_ab` (`a`, `b`) /*$ DISTRIBUTE=2 */ 
        ) auto_increment=50002; 

sql> -- Bar schema
sql> CREATE TABLE `Bar` 
        ( `pk` INT(11) NOT NULL auto_increment, 
          `a`  INT(11), 
          `b`  INT(11), 
          `c`  INT(11), 
          PRIMARY KEY (`pk`) /*$ DISTRIBUTE=1 */, 
          KEY `idx_ab` (`a`, `b`) /*$ DISTRIBUTE=2 */ 
        ) auto_increment=25001;


SQL Representation – What:

(display ((2 . "a") (0 . "expr0"))
  (aggregate ((2 . "a") ("expr0" sum (2 . "b")))
    (filter
      (inner_join
        (table_scan (1 : Foo ("pk" "a" "b" "c")))
        (table_scan (2 : Bar ("pk" "a" "b" "c"))))
      (func eq
        (ref (1 . "pk"))
        (ref (2 . "pk"))))))

Sierra Output – How:

(display ((2 . "a") (0 . "expr0"))
  (stream_aggregate ((2 . "a") ("expr0" sum (2 . "b")))
    (msjoin
      (stream_merge
        (index_scan (2 : Bar.idx_ab ("b" "pk" "a"))))
      (index_scan (1 : Foo.idx_PRIMARY ("pk"))
        (range equal (ref (2 . "pk")))))))
Diagram 1 

First, some explanation of these plans. We'll go from most indented to least indented; statements at the same indentation level should be read top to bottom.

This SQL statement is performing the following: 

  • Read table Foo and join to Bar.
  • Evaluate the join constraint.
  • Compute the aggregate.
  • Display the output to the user.

The Query Optimizer is doing the following:

  1. Read table Bar, preserving the sort order via stream_merge.
  2. Evaluate the join constraint while reading Foo, again preserving sort order, this time via msjoin.
  3. Compute the aggregate in a non-blocking way with stream_aggregate. This can be done because we preserved order.

...

An expression consists of an Operator (required), Arguments (possibly none), and Inputs (possibly none). Arguments describe particular characteristics of the operator. There are both logical and physical operators, and every logical operator maps to one or more physical operators. In the example, the logical operator:

...

(table_scan (1 : Foo ("pk" "a" "b" "c"))) 

maps to the physical expression:

...

(index_scan (1 : Foo ("pk")) (range equal (ref (2 . "pk"))))

...

These operators have arguments that describe their Namespace, Object Id, and Columns. Here, the table_scan has no inputs, while the index_scan has an input that represents the join constraint. (See List of Planner Operators.)
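As a rough mental model (not Sierra's actual implementation), an expression can be represented as an operator plus arguments plus inputs:

```python
# Hypothetical sketch of a Sierra-style expression: an operator name,
# operator-specific arguments, and zero or more input expressions.
class Expr:
    def __init__(self, op, args=(), inputs=()):
        self.op = op          # e.g. "table_scan", "index_scan"
        self.args = args      # characteristics of the operator
        self.inputs = inputs  # child expressions (possibly none)

# Logical: (table_scan (1 : Foo ("pk" "a" "b" "c"))) -- no inputs.
logical = Expr("table_scan", args=(1, "Foo", ("pk", "a", "b", "c")))

# Physical: the index_scan has an input representing the join constraint.
join_constraint = Expr("func", args=("eq",),
                       inputs=(Expr("ref", args=((2, "pk"),)),))
physical = Expr("index_scan", args=(1, "Foo", ("pk",)),
                inputs=(join_constraint,))

print(len(logical.inputs), len(physical.inputs))  # 0 1
```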

Physical Properties

Physical properties are related to intermediate results, or sub-plans. They describe things like how the data is ordered and how the data is partitioned.  It is important to note that expressions (either logical or physical) and groups (see below) do not have physical properties. However, every physical expression has two descriptions related to physical properties:

...


The invalid plan fails because the stream_combine operator cannot preserve any ordering that its inputs provide. In the valid plan, however, stream_merge is used, which can preserve the sort order of its child, and the index_scan itself does provide a sort order. In effect, plans that may or may not be valid are explored, and the physical properties are used to validate whether they are possible. If any operator in the chain of provides and requires fails, the whole plan is invalidated as a possible output plan.
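A toy sketch of this provide/require idea (the operator names come from the plans above; the propagation rules are deliberately simplified):

```python
# Each operator either provides an ordering itself, preserves its
# child's ordering, or destroys it.
PRESERVES_ORDER = {"stream_merge": True, "stream_combine": False, "msjoin": True}

def output_order(op, child_order):
    """Return the sort order an operator's output has, or None."""
    if op == "index_scan":
        return "key_order"  # a btree scan provides its key order
    return child_order if PRESERVES_ORDER.get(op, False) else None

def plan_order(ops):
    """Fold a bottom-up chain of operators, tracking the surviving order."""
    order = None
    for op in ops:
        order = output_order(op, order)
    return order

# stream_merge preserves the index_scan's order; stream_combine does not,
# so any order-requiring parent above it invalidates the plan.
valid = plan_order(["index_scan", "stream_merge", "msjoin"])
invalid = plan_order(["index_scan", "stream_combine", "msjoin"])
print(valid, invalid)  # key_order None
```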

Groups

Groups correspond to intermediate tables, or equivalently subplans of the query. Groups are logical and contain the following:

  1. All the logically equivalent expressions that describe that intermediate table.
  2. All the physical implementations of those logical expressions.
  3. Winners: A physical expression that had the best cost given a set of physical properties.
  4. Logical properties: Which columns it is required to produce as well as statistics about some of those columns.
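A group can be pictured as a container like the following (a loose sketch, not ClustrixDB source; the plan names and cost figures are illustrative):

```python
# Sketch of a memo group: one intermediate table, many equivalent plans.
class Group:
    def __init__(self, required_columns):
        self.logical_exprs = []    # equivalent logical expressions
        self.physical_exprs = []   # implementations of those expressions
        self.winners = {}          # physical properties -> (best expr, cost)
        self.columns = required_columns  # logical properties (+ statistics)

    def remember_winner(self, props, expr, cost):
        """Keep only the cheapest expression for a given set of
        physical properties."""
        best = self.winners.get(props)
        if best is None or cost < best[1]:
            self.winners[props] = (expr, cost)

g = Group(required_columns=("a", "b"))
g.remember_winner(("sorted_by_a",), "stream_aggregate_plan", 142526.70)
g.remember_winner(("sorted_by_a",), "hash_aggregate_plan", 190000.00)
print(g.winners[("sorted_by_a",)][0])  # stream_aggregate_plan wins on cost
```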

...

The model's rule set can be thought of as defining the logical and physical search space of the optimizer. The memo is expanded to encompass the full logical and physical search space through the application of rules. Applying a rule is a multi-step process: finding a binding in the memo, evaluating the rule's condition (if it has one), and, if the condition is satisfied, firing the rule, which puts new expressions in the memo. When the optimizer is done applying rules, the memo structure will have been expanded until it conceptually represents the entire search space. Here is an example of the swap_rule firing. It was responsible for exploring the join ordering picked in the final plan of the example. 

Swap Join Order Rule 

(display ((1 . "pk") (2 . "pk"))
  (inner_join
    (index_scan (1 : Foo ("pk")))
    (index_scan (2 : Bar ("pk"))
      (range equal (ref (1 . "pk"))))))

(display ((1 . "pk") (2 . "pk"))
  (inner_join
    (index_scan (2 : Bar ("pk")))
    (index_scan (1 : Foo ("pk"))
      (range equal (ref (2 . "pk"))))))
Diagram 4 
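As a rough illustration (not ClustrixDB source), a rule like swap_rule can be thought of as a function that matches a binding and emits a new expression. This sketch represents plans as nested tuples and omits the join-constraint rewrite that the real rule also performs:

```python
# Illustrative join-commutativity ("swap") rule over plans written as
# nested tuples: (operator, left_input, right_input).
def swap_rule(expr):
    """If the expression binds to an inner_join, emit the swapped order."""
    op, left, right = expr
    if op != "inner_join":
        return None  # rule condition not satisfied; the rule does not fire
    return (op, right, left)

before = ("inner_join", "scan_Foo", "scan_Bar")
after = swap_rule(before)
print(after)  # ('inner_join', 'scan_Bar', 'scan_Foo')
```

Firing the rule does not replace the original expression; both join orders end up in the memo, and costing later decides between them.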

Tasks (The Search Engine)

...

Sierra costs plans using a combination of I/O, CPU usage, and latency. Remember that ClustrixDB is distributed, so total CPU usage and latency are not proportional. Every operator defines a function to compute its cost given its inputs. For example, an index_scan uses the row estimation framework to compute how many rows it expects to read from the btree, and then computes its cost as follows:

...

index_scan.rows = row_est()
index_scan.cost = cost_open + row_est() * cost_read_row

The operator above the index_scan would then use this cost and row estimate to estimate its own cost. 
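The costing chain above can be made concrete with a small sketch. The constants here (cost_open, cost_read_row, cost_merge_row) are invented placeholders, not ClustrixDB's actual values:

```python
# Toy cost model: constants are illustrative, not Clustrix's.
COST_OPEN = 4.0      # fixed cost to open the btree
COST_READ_ROW = 0.5  # per-row read cost

def index_scan_cost(row_est):
    """index_scan.cost = cost_open + row_est * cost_read_row"""
    return COST_OPEN + row_est * COST_READ_ROW

def stream_merge_cost(child_rows, child_cost, cost_merge_row=0.25):
    """A parent operator builds on its child's cost and row estimate."""
    return child_cost + child_rows * cost_merge_row

rows = 25000.0  # row estimate from the row estimation framework
scan = index_scan_cost(rows)
total = stream_merge_cost(rows, scan)
print(scan, total)  # 12504.0 18754.0
```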

Row Estimation

The way Sierra chooses the optimal plan for a query is by finding the plan with the cheapest cost. Cost is strongly dependent on how many rows the optimizer thinks are going to be flowing through the system. The job of the row estimation subsystem is to take statistical information from our Probability Distributions and compute an estimated number of rows that will come out of a given expression. 
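As a hedged sketch of the idea (the histogram buckets below are invented for illustration, not drawn from ClustrixDB's actual Probability Distributions):

```python
# Selectivity estimation from a made-up histogram: each bucket maps a
# value range [lo, hi) to a row count.
buckets = [((0, 100), 4000), ((100, 200), 12000), ((200, 300), 9000)]

def estimate_rows(lo, hi):
    """Estimate rows with lo <= value < hi, assuming values are
    uniformly distributed within each bucket."""
    total = 0.0
    for (b_lo, b_hi), count in buckets:
        overlap = max(0, min(hi, b_hi) - max(lo, b_lo))
        total += count * overlap / (b_hi - b_lo)
    return total

# A range predicate covering half of the first two buckets:
print(estimate_rows(50, 150))  # 8000.0
```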

...

For the query above, we can get a succinct description of the plan, the row estimates, and the cost by prefacing the query with 'explain'. (For additional information, see Understanding the ClustrixDB Explain Output.)

...

sql> explain select bar.a, sum(bar.b) from bar, foo where foo.pk = bar.pk group by bar.a; 
+-------------------------------------------------------------+-----------+-----------+
| Operation                                                   | Est. Cost | Est. Rows |
+-------------------------------------------------------------+-----------+-----------+
| stream_aggregate GROUPBY((1 . "a")) expr0 := sum((1 . "b")) | 142526.70 |  25000.00 | 
|   msjoin KEYS=[((1 . "a")) GROUP]                           | 137521.70 |  25000.00 | 
|     stream_merge KEYS=[(1 . "a") ASC]                       |  20016.70 |  25000.00 | 
|       index_scan 1 := bar.idx_ab                            |   5003.90 |   8333.33 | 
|     index_scan 2 := foo.__idx_foo__PRIMARY, pk = 1.pk       |      4.50 |      1.00 | 
+-------------------------------------------------------------+-----------+-----------+
5 rows in set (0.01 sec)

Summary

To summarize how Sierra got the output in diagram 1, the following steps were performed:

...

We've talked a lot about how the query optimizer finds the best plan, but so far these concepts are not unique to ClustrixDB. One of the special things about Sierra is that it can reason about distributed operations. For example, there are two ways to compute an aggregate. Let's look at the non-distributed way first:

    1. Read table Foo which likely has slices on multiple nodes.
    2. Forward all those rows to one node.
    3. Insert all those rows into a hash container, computing the aggregate operation if necessary.
    4. Read the container and output to the user.

Here's what that would look like:

...

    1. Compute the sub-plan (under the aggregate), which likely has result rows on multiple nodes.
    2. Insert those rows into a local hash container on each node, computing the aggregate operation if necessary.
    3. Read the hash container on each node and forward to a single node.
    4. If necessary:
      1. Insert all those rows into a new final hash container, computing the aggregate operation if necessary.
      2. Read that hash container.
    5. Output rows to the user.

Here's what this would look like:

...
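The distributed steps above can be sketched as a two-phase (partial, then final) aggregation. This is a simplified illustration, not ClustrixDB code:

```python
# Sketch of distributed GROUP BY a ... SUM(b): each node pre-aggregates
# its local rows, then one node combines the forwarded partial results.
from collections import defaultdict

def partial_aggregate(rows):
    """Phase 1 (per node): local hash container keyed by the group column."""
    acc = defaultdict(int)
    for a, b in rows:
        acc[a] += b
    return acc

def final_aggregate(partials):
    """Phase 2 (single node): merge the forwarded partial containers."""
    acc = defaultdict(int)
    for partial in partials:
        for a, subtotal in partial.items():
            acc[a] += subtotal
    return dict(acc)

node1 = partial_aggregate([(1, 10), (2, 5), (1, 1)])  # rows on node 1
node2 = partial_aggregate([(2, 7), (3, 2)])           # rows on node 2
print(final_aggregate([node1, node2]))  # {1: 11, 2: 12, 3: 2}
```

Because each node forwards only one row per group instead of every input row, the distributed plan can move far less data across the network.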