Skip to content

Aggregation Comparison

Definition

Verifies that the specified comparison operator evaluates true when applied to two aggregation expressions.

In-Depth Overview

The Aggregation Comparison is a rule that allows for the dynamic analysis of aggregations across different datasets. It empowers users to establish data integrity by ensuring that aggregate values meet expected comparisons, whether they are totals, averages, counts, or any other aggregated metric.

By setting a comparison between aggregates from potentially different tables or even source datastores, this rule confirms that relationships between data points adhere to business logic or historical data patterns. This is particularly useful when trying to validate interrelated financial reports, summary metrics, or when monitoring the consistency of data ingestion over time.

Field Scope

Calculated: The rule automatically identifies the fields involved, without requiring explicit field selection.

General Properties

Name Supported
Filter
Allows the targeting of specific data based on conditions
Coverage Customization
Allows adjusting the percentage of records that must meet the rule's conditions

The filter allows you to define a subset of data upon which the rule will operate.

It requires a valid Spark SQL expression that determines the criteria rows in the DataFrame should meet. This means the expression specifies which rows the DataFrame should include based on those criteria. Since it's applied directly to the Spark DataFrame, traditional SQL constructs like WHERE clauses are not supported.

Examples

Direct Conditions

Simply specify the condition you want to be met.

Correct usage
O_TOTALPRICE > 1000
C_MKTSEGMENT = 'BUILDING'
Incorrect usage
WHERE O_TOTALPRICE > 1000
WHERE C_MKTSEGMENT = 'BUILDING'

Combining Conditions

Combine multiple conditions using logical operators like AND and OR.

Correct usage
O_ORDERPRIORITY = '1-URGENT' AND O_ORDERSTATUS = 'O'
(L_SHIPDATE = '1998-09-02' OR L_RECEIPTDATE = '1998-09-01') AND L_RETURNFLAG = 'R'
Incorrect usage
WHERE O_ORDERPRIORITY = '1-URGENT' AND O_ORDERSTATUS = 'O'
O_TOTALPRICE > 1000, O_ORDERSTATUS = 'O'

Utilizing Functions

Leverage Spark SQL functions to refine and enhance your conditions.

Correct usage
RIGHT(
    O_ORDERPRIORITY,
    LENGTH(O_ORDERPRIORITY) - INSTR('-', O_ORDERPRIORITY)
) = 'URGENT'
LEVENSHTEIN(C_NAME, 'Supplier#000000001') < 7
Incorrect usage
RIGHT(
    O_ORDERPRIORITY,
    LENGTH(O_ORDERPRIORITY) - CHARINDEX('-', O_ORDERPRIORITY)
) = 'URGENT'
EDITDISTANCE(C_NAME, 'Supplier#000000001') < 7

Using scan-time variables

To refer to the current dataframe being analyzed, use the reserved dynamic variable {{ _qualytics_self }}.

Correct usage
O_ORDERSTATUS IN (
    SELECT DISTINCT O_ORDERSTATUS
    FROM {{ _qualytics_self }}
    WHERE O_TOTALPRICE > 1000
)
Incorrect usage
O_ORDERSTATUS IN (
    SELECT DISTINCT O_ORDERSTATUS
    FROM ORDERS
    WHERE O_TOTALPRICE > 1000
)

While subqueries can be useful, their application within filters in our context has limitations. For example, directly referencing other containers or the broader target container in such subqueries is not supported. Attempting to do so will result in an error.

Important Note on {{ _qualytics_self }}

The {{ _qualytics_self }} keyword refers to the dataframe that's currently under examination. In the context of a full scan, this variable represents the entire target container. However, during incremental scans, it only reflects a subset of the target container, capturing just the incremental data. It's crucial to recognize that in such scenarios, using {{ _qualytics_self }} may not encompass all entries from the target container.

Specific Properties

Facilitates the comparison between a target aggregate metric and a reference aggregate metric across different datasets.

Name Description
Target Aggregation
Specifies the aggregation expression to evaluate
Comparison
Select the comparison operator (e.g., greater than, less than, etc.)
Datastore
Identifies the source datastore for the reference aggregation
Table/File
Specifies the table or file for the reference aggregation
Reference Aggregation
Defines the reference aggregation expression to compare against
Reference Filter
Applies a filter to the reference aggregation if necessary

Details

It's important to understand that each aggregation must result in a single row. Also, similar to Spark expressions, the aggregation expressions must be written in a valid format for DataFrames.

Examples

Simple Aggregations

SUM(O_TOTALPRICE)

Combining with SparkSQL Functions

ROUND(SUM(O_TOTALPRICE))

Complex Aggregations

ROUND(SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX)))

Aggregation Expressions

COUNT(CATEGORY) * MAX(VALUE) - FIRST(VALUE)

Here are some common aggregate functions used in SparkSQL:

  • SUM: Calculates the sum of all values in a column.
  • AVG: Calculates the average of all values in a column.
  • MAX: Returns the maximum value in a column.
  • MIN: Returns the minimum value in a column.
  • COUNT: Counts the number of rows in a column.

For a detailed list of valid SparkSQL aggregation functions, refer to the Apache Spark SQL documentation.

{
    "description": "Assert that O_ORDERDATE is after the defined date time",
    "coverage": 1,
    "properties":  {
        "datetime": "1991-12-31 10:30:00"
    },
    "tags": [],
    "fields": fields,
    "additional_metadata": {"key 1": "value 1", "key 2": "value 2"},
    "rule": "afterDateTime",
    "container_id": {container_id},
    "template_id": {template_id},
}

Anomaly Types

Type Supported
Record
Flag inconsistencies at the row level
Shape
Flag inconsistencies in the overall patterns and distributions of a field

Example

Objective: Ensure that the aggregated sum of total_price from the ORDERS table matches the aggregated and rounded sum of calculated_price from the LINEITEM table.

Info

The calculated_price in this example is represented by the sum of each product's extended price, adjusted for discount and tax.

Sample Data

Aggregated data from ORDERS (Target)

TOTAL_PRICE
5000000

Aggregated data from LINEITEM (Reference)

CALCULATED_PRICE
4999800
Inputs
  • Target Aggregation: ROUND(SUM(O_TOTALPRICE))
  • Comparison: eq (Equal To), lt (Less Than), lte (Less Than or Equal to), gte (Greater Than or Equal To), gt (Greater Than)
  • Reference Aggregation: ROUND(SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX)))
{
    "description": "Ensure that the aggregated sum of total_price from the ORDERS table matches the aggregated and sum of l_totalprice from the LINEITEM table",
    "coverage": 1,
    "properties": {
        "comparison": "eq",
        "expression": f"SUM(O_TOTALPRICE)",
        "ref_container_id": ref_container_id,
        "ref_datastore_id": ref_datastore_id,
        "ref_expression": f"SUM(L_TOTALPRICE)",
        "ref_filter": "1=1",
    },
    "tags": [],
    "fields": ["O_TOTALPRICE"],
    "additional_metadata": {"key 1": "value 1", "key 2": "value 2"},
    "rule": "aggregationComparison",
    "container_id": {container_id},
    "template_id": {template_id},
    "filter": "1=1"
}

Anomaly Explanation

In the sample data above, the aggregated TOTAL_PRICE from the ORDERS table is 5000000, while the aggregated and rounded CALCULATED_PRICE from the LINEITEM table is 4999800. The difference between these totals indicates a potential anomaly, suggesting issues in data calculation or recording methods.

graph TD
A[Start] --> B[Retrieve Aggregated Values]
B --> C{Do Aggregated Totals Match?}
C -->|Yes| D[End]
C -->|No| E[Mark as Anomalous]
E --> D
-- An illustrative SQL query related to the rule using TPC-H tables.
with orders_agg as (
    select 
        round(sum(o_totalprice)) as total_order_price
    from 
        orders
),
lineitem_agg as (
    select 
        round(sum(l_extendedprice * (1 - l_discount) * (1 + l_tax))) as calculated_price
    from 
        lineitem
),
comparison as (
    select
        o.total_order_price,
        l.calculated_price
    from
        orders_agg o
        cross join lineitem_agg l
)
select * from comparison
where comparison.total_order_price != comparison.calculated_price;

Potential Violation Messages

Shape Anomaly

ROUND(SUM(O_TOTALPRICE)) is not equal to ROUND(SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX))).