Aggregation Comparison
Definition
Verifies that the specified comparison operator evaluates true when applied to two aggregation expressions.
In-Depth Overview
The Aggregation Comparison rule allows for the dynamic analysis of aggregations across different datasets. It helps users enforce data integrity by ensuring that aggregate values meet expected comparisons, whether they are totals, averages, counts, or any other aggregated metric.
By setting a comparison between aggregates from potentially different tables or even source datastores, this rule confirms that relationships between data points adhere to business logic or historical data patterns. This is particularly useful when trying to validate interrelated financial reports, summary metrics, or when monitoring the consistency of data ingestion over time.
Field Scope
Calculated: The rule automatically identifies the fields involved, without requiring explicit field selection.
General Properties
Name | Supported |
---|---|
Filter: Allows the targeting of specific data based on conditions | ✅ |
Coverage Customization: Allows adjusting the percentage of records that must meet the rule's conditions | ✅ |
The filter allows you to define a subset of data upon which the rule will operate.
It requires a valid Spark SQL boolean expression that specifies which rows of the DataFrame the rule should consider. Because the expression is applied directly to the Spark DataFrame, traditional SQL constructs like WHERE clauses are not supported.
Examples
Direct Conditions
Simply specify the condition you want to be met.
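A minimal sketch of a direct condition, assuming a TPC-H-style column such as O_TOTALPRICE:

```sql
-- Keep only rows whose order total exceeds a threshold
O_TOTALPRICE > 1000
```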
Combining Conditions
Combine multiple conditions using logical operators like AND and OR.
Correct usage
Incorrect usage
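Illustrative sketches of the correct and incorrect forms above, assuming TPC-H-style columns such as O_TOTALPRICE and O_ORDERSTATUS:

```sql
-- Correct usage: a bare boolean expression combining conditions
O_TOTALPRICE > 1000 AND O_ORDERSTATUS = 'F'

-- Incorrect usage: full SQL clauses such as WHERE are not accepted
WHERE O_TOTALPRICE > 1000 AND O_ORDERSTATUS = 'F'
```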
Utilizing Functions
Leverage Spark SQL functions to refine and enhance your conditions.
Correct usage
Incorrect usage
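Illustrative sketches of the correct and incorrect forms above, assuming a TPC-H-style column such as O_ORDERDATE:

```sql
-- Correct usage: Spark SQL functions inside a boolean expression
YEAR(O_ORDERDATE) = 1995 AND MONTH(O_ORDERDATE) IN (1, 2, 3)

-- Incorrect usage: a full SELECT statement is not a valid filter expression
SELECT * FROM orders WHERE YEAR(O_ORDERDATE) = 1995
```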
Using scan-time variables
To refer to the current dataframe being analyzed, use the reserved dynamic variable {{ _qualytics_self }}.
Correct usage
Incorrect usage
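Illustrative sketches of the correct and incorrect forms above, assuming a TPC-H-style column such as O_TOTALPRICE:

```sql
-- Correct usage: a subquery over the dataframe currently being scanned
O_TOTALPRICE > (SELECT AVG(O_TOTALPRICE) FROM {{ _qualytics_self }})

-- Incorrect usage: referencing another container or table directly is not supported
O_TOTALPRICE > (SELECT AVG(L_EXTENDEDPRICE) FROM lineitem)
```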
While subqueries can be useful, their application within filters in our context has limitations. For example, directly referencing other containers or the broader target container in such subqueries is not supported. Attempting to do so will result in an error.
Important Note on {{ _qualytics_self }}
The {{ _qualytics_self }} keyword refers to the dataframe that's currently under examination. In the context of a full scan, this variable represents the entire target container. However, during incremental scans, it only reflects a subset of the target container, capturing just the incremental data. It's crucial to recognize that in such scenarios, using {{ _qualytics_self }} may not encompass all entries from the target container.
Specific Properties
Facilitates the comparison between a target aggregate metric and a reference aggregate metric across different datasets.
Name | Description |
---|---|
Target Aggregation | Specifies the aggregation expression to evaluate |
Comparison | Select the comparison operator (e.g., greater than, less than, etc.) |
Datastore | Identifies the source datastore for the reference aggregation |
Table/File | Specifies the table or file for the reference aggregation |
Reference Aggregation | Defines the reference aggregation expression to compare against |
Reference Filter | Applies a filter to the reference aggregation if necessary |
Details
It's important to understand that each aggregation expression must return a single row. Also, like other Spark expressions, aggregation expressions must be written in a format that is valid for Spark DataFrames.
Examples
Simple Aggregations
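A minimal sketch, using the TPC-H column referenced later on this page:

```sql
SUM(O_TOTALPRICE)
```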
Combining with SparkSQL Functions
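For example, wrapping an aggregate inside another Spark SQL function (a sketch; the column name is assumed from the example below):

```sql
ROUND(SUM(O_TOTALPRICE), 2)
```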
Complex Aggregations
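For example, the reference aggregation used later on this page combines several columns inside a single aggregate:

```sql
ROUND(SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX)))
```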
Aggregation Expressions
Here are some common aggregate functions used in SparkSQL:
- SUM: Calculates the sum of all values in a column.
- AVG: Calculates the average of all values in a column.
- MAX: Returns the maximum value in a column.
- MIN: Returns the minimum value in a column.
- COUNT: Counts the number of rows in a column.
For a detailed list of valid SparkSQL aggregation functions, refer to the Apache Spark SQL documentation.
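A few additional sketches combining these functions into single-row aggregation expressions (column names are assumptions):

```sql
COUNT(*)
MAX(O_TOTALPRICE) - MIN(O_TOTALPRICE)
AVG(L_EXTENDEDPRICE)
```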
Anomaly Types
Type | Supported |
---|---|
Record: Flag inconsistencies at the row level | ❌ |
Shape: Flag inconsistencies in the overall patterns and distributions of a field | ✅ |
Example
Objective: Ensure that the aggregated sum of total_price from the ORDERS table matches the aggregated and rounded sum of calculated_price from the LINEITEM table.
Info
The calculated_price in this example is represented by the sum of each product's extended price, adjusted for discount and tax.
Sample Data
Aggregated data from ORDERS (Target)
TOTAL_PRICE |
---|
5000000 |
Aggregated data from LINEITEM (Reference)
CALCULATED_PRICE |
---|
4999800 |
Inputs
- Target Aggregation: ROUND(SUM(O_TOTALPRICE))
- Comparison: eq (Equal To); other supported operators are lt (Less Than), lte (Less Than or Equal To), gt (Greater Than), and gte (Greater Than or Equal To)
- Reference Aggregation: ROUND(SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX)))
{
"description": "Ensure that the aggregated sum of total_price from the ORDERS table matches the aggregated and sum of l_totalprice from the LINEITEM table",
"coverage": 1,
"properties": {
"comparison": "eq",
"expression": f"SUM(O_TOTALPRICE)",
"ref_container_id": ref_container_id,
"ref_datastore_id": ref_datastore_id,
"ref_expression": f"SUM(L_TOTALPRICE)",
"ref_filter": "1=1",
},
"tags": [],
"fields": ["O_TOTALPRICE"],
"additional_metadata": {"key 1": "value 1", "key 2": "value 2"},
"rule": "aggregationComparison",
"container_id": {container_id},
"template_id": {template_id},
"filter": "1=1"
}
Anomaly Explanation
In the sample data above, the aggregated TOTAL_PRICE from the ORDERS table is 5000000, while the aggregated and rounded CALCULATED_PRICE from the LINEITEM table is 4999800. The difference between these totals indicates a potential anomaly, suggesting issues in data calculation or recording methods.
graph TD
A[Start] --> B[Retrieve Aggregated Values]
B --> C{Do Aggregated Totals Match?}
C -->|Yes| D[End]
C -->|No| E[Mark as Anomalous]
E --> D
-- An illustrative SQL query related to the rule using TPC-H tables.
with orders_agg as (
select
round(sum(o_totalprice)) as total_order_price
from
orders
),
lineitem_agg as (
select
round(sum(l_extendedprice * (1 - l_discount) * (1 + l_tax))) as calculated_price
from
lineitem
),
comparison as (
select
o.total_order_price,
l.calculated_price
from
orders_agg o
cross join lineitem_agg l
)
select * from comparison
where comparison.total_order_price != comparison.calculated_price;
Potential Violation Messages
Shape Anomaly
ROUND(SUM(O_TOTALPRICE)) is not equal to ROUND(SUM(L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX))).