Why Unit Tests?¶
We test our code to ensure it works as expected. Our data quality tests already check a lot of things, but we also need confidence in the code itself. Unit tests are a great way to get that, and give us a lot of assurance that our pipelines behave as expected.
Unit tests also enable us to start developing differently. Instead of writing code and hoping it delivers a result we are happy with, we can do test driven development: first think about the expected output of our code, then write the code and adjust the business logic until we get that output. This is a better way to develop and might even save us time in the future.
Unit test framework¶
Prepare Enriched layer for test driven development¶
To start implementing test driven development, we had to prepare our enriched layer for it and change the way we do IOs.
Enriched layer before test driven development¶
In the image you can see that our enriched layer used to do Input/Output (IO) processing inside the business logic code *table.py*. This is not good for test driven development: you would have to patch all the IOs in the business logic code, making it very complex and difficult to properly do test driven development. This is what we had to change to reach the result shown in the next image.
Enriched layer after test driven development¶
In the image you can see the changes we made to ensure all IO is done in the run_table.py file. This gives us a clean input -> pipeline -> output structure, which makes it easy to do test driven development without too much mocking or patching involved.
In green you can see the implemented testing framework we use on each enriched pipeline:
- In test_table_data.py we define the expected output of the table we want to reach.
- In test_table_data.py we also define the test data that we will use instead of actual input data.
- For this example this is the test data for raw_table and enriched_table
- test_table.py gets both the input and expected output data from test_table_data.py.
- test_table.py executes the table.py business logic and mocks the necessary IOs.
- test_table.py captures the DataFrame that is created by the business logic right before it is upserted to the table.
- This DataFrame is compared to the expected output DataFrame; if they are the same the test passes. If not, Chispa (our PySpark testing library) clearly shows the differences between the two DataFrames.
We can test the business logic with many different edge cases in the inputs, to test all the different scenarios.
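The input -> pipeline -> output separation described above can be sketched in plain Python. This is an illustrative sketch only (the names are not the real files, and the real pipelines operate on PySpark DataFrames), but it shows why moving IO out of the business logic makes testing easy:

```python
# table.py - pure business logic: data in, data out, no IO.
def transform(raw_rows, lookup):
    """Join each raw row to its lookup value; no reads or writes happen here."""
    return [
        {**row, "lookup_name": lookup.get(row["lookup_id"])}
        for row in raw_rows
    ]


# run_table.py - all IO lives here, wrapped around the pure transform.
def run_table(read_raw, read_lookup, write_output):
    raw = read_raw()
    lookup = read_lookup()
    write_output(transform(raw, lookup))
```

Because transform does no IO, a unit test can call it directly with in-memory inputs and compare the return value against an expected output, with no mocking needed.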
How to Write Unit Tests¶
A practical guide to testing PySpark data pipelines in biocloud-core.
Why This Testing Approach?¶
Testing PySpark transformations has three main challenges:
- Schema complexity - Manually defining PySpark schemas is tedious and error-prone
- Test data setup - Managing multiple input DataFrames and expected outputs gets messy
- Unclear failures - Default PySpark assertions don't show you what is different
Our testing framework solves these problems:
- Schemas are automatically loaded from data contracts (no manual StructType definitions)
- The TestData class organizes your inputs and outputs cleanly
- Chispa shows you exactly which rows and columns differ when tests fail
Writing Your First Test¶
Let's walk through creating a test for a table called amplicon.
Step 1: Create Your Test Data File¶
First, define your test data in a separate Python file:
# biocloudcore/enriched/dna/tests/data/test_amplicon_data.py
import datetime
# Use a fixed timestamp so tests are deterministic
FIXED_TEST_TIMESTAMP = datetime.datetime(1989, 11, 9)
# Raw input data
AMPLICONS_RAW = [
{
"consensuses_url": "https://link_to_endpoint/api/samples/26/consensuses/",
"created_at": "2024-06-06T11:29:19.681000",
"extract_id": "e1100039674",
"forward_primer": "ONT_LepF1+ONT_LCO1490",
"id": 26,
"is_control": False,
"marker": "COI-5P",
# ... all other fields
"inserted_ts_utc": datetime.datetime(2024, 6, 6, 11, 29, 19, 681000),
"updated_ts_utc": datetime.datetime(2024, 6, 6, 11, 29, 19, 681000),
},
]
# Lookup table data
DNA_EXTRACT = [
{
"dna_extract_golden_id": 1,
"material_entity_id": None,
"dna_extract_id": "e1100039674",
"catalog_number": "RMNH.5143366",
"stock_plate_id": "NCBN001711",
"last_updated_source": "nanopore",
"inserted_ts_utc": datetime.datetime(2024, 6, 6, 11, 0, 0),
"updated_ts_utc": datetime.datetime(2024, 6, 6, 11, 0, 0),
},
]
SEQUENCING_RUN = [
{
"sequencing_run_id": 1,
"title": "Sample_Pool_Lepidoptera",
"project_name": "ARISE_Project",
"end_date_time": datetime.datetime(2024, 6, 6, 10, 0, 0),
"source_id": "969410a3-a2cb-42c9-ae97-2194a96870e7",
"source": "nanopore",
"inserted_ts_utc": datetime.datetime(2024, 6, 6, 10, 30, 0),
"updated_ts_utc": datetime.datetime(2024, 6, 6, 10, 30, 0),
},
]
# What you expect the output to look like
EXPECTED_OUTPUT = [
{
"amplicon_id": 1,
"dna_extract_golden_id": 1,
"sequencing_run_id": 1,
"dna_extract_id": "e1100039674",
"project_id": "23009-5004000157",
"pcr_id": "LS027",
"primer_name_forward": "ONT_LepF1+ONT_LCO1490",
"primer_name_reverse": "ONT_LepR1+ONT_HCO2198",
"marker": "COI-5P",
"is_control": False,
"source_id": "26",
"source": "nanopore",
"inserted_ts_utc": FIXED_TEST_TIMESTAMP,
"updated_ts_utc": FIXED_TEST_TIMESTAMP,
},
]
Step 2: Set Up Your Test Fixture¶
Create a fixture that loads all your test data:
# biocloudcore/enriched/dna/tests/test_amplicon.py
from dataclasses import dataclass
import pytest
import biocloudcore.enriched.dna.tests.data.test_amplicon_data as data
from biocloudcore.conftest import TestData
from biocloudcore.enriched.dna.amplicon import Amplicon
from biocloudcore.utils.test_tools.test_functions import assert_test_output, override
@pytest.fixture
def test_data(spark, mock_data_lake):
"""Load all test data for amplicon tests."""
# Define a container for all your datasets
@dataclass
class AmpliconTestData:
amplicon: TestData # Output table
raw_amplicons: TestData # Raw input
dna_extract: TestData # Lookup table 1
sequencing_run: TestData # Lookup table 2
existing_data: TestData # For testing upserts
return AmpliconTestData(
# Output table - needs contract for schema
amplicon=TestData(
spark,
data=data.EXPECTED_OUTPUT,
contract_path="enriched/dna/amplicon_odcs.yaml"
),
# Raw input - no schema needed, Spark can infer
raw_amplicons=TestData(spark, data=data.AMPLICONS_RAW),
# Lookup tables - need contracts for schema
dna_extract=TestData(
spark,
data=data.DNA_EXTRACT,
contract_path="enriched/dna/dna_extract_odcs.yaml"
),
sequencing_run=TestData(
spark,
data=data.SEQUENCING_RUN,
contract_path="enriched/dna/sequencing_run_odcs.yaml"
),
# Empty for now, used in upsert tests
existing_data=TestData(
spark,
data=[],
contract_path="enriched/dna/amplicon_odcs.yaml"
),
)
What's happening here:
- TestData wraps your test data and automatically loads schemas from contracts
- When you provide a contract_path, it loads the schema from that YAML file
- When you don't, Spark infers the schema from your data
- Everything is lazily loaded - schemas only load when you access them
Step 3: Write Your Test¶
Now write your actual test:
@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.dna.amplicon")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_amplicon_happy_path(enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test that amplicon processes raw data correctly."""
# Run your table logic
Amplicon(
spark=spark,
data_lake=mock_data_lake,
data_contract=test_data.amplicon.contract,
source="nanopore",
table_name="amplicon",
).run(
raw_amplicons=test_data.raw_amplicons.df,
dna_extract=test_data.dna_extract.df,
sequencing_run=test_data.sequencing_run.df,
existing_data=test_data.existing_data.df,
)
# Check the output matches expectations
assert_test_output(
spark=spark,
actual=enriched_test_mocks.result_df, # What your code produced
expected=data.EXPECTED_OUTPUT, # What you expected
schema=test_data.amplicon.schema # Schema to enforce
)
Understanding the markers:
- @pytest.mark.unit - Tags this as a unit test (run with pytest -m unit)
- @pytest.mark.patch_module(...) - Tells the framework which module to mock
- @pytest.mark.fixed_timestamp(...) - Ensures timestamps are deterministic
Understanding the mocks:
- enriched_test_mocks automatically mocks upsert_to_delta_table and add_timestamp_columns
- enriched_test_mocks.result_df gives you the DataFrame that would have been written to the table
- You don't need to write any mocking code yourself!
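Under the hood, a fixture like enriched_test_mocks can be built with standard unittest.mock patching. The following is a rough, self-contained sketch of the idea - the module and function names here are illustrative stand-ins, not the real framework code:

```python
from types import SimpleNamespace
from unittest.mock import patch


class pipeline_module:
    """Hypothetical stand-in for a business-logic module that writes its output."""

    @staticmethod
    def upsert_to_delta_table(rows):
        raise RuntimeError("would write to the data lake")

    @staticmethod
    def run(rows):
        kept = [r for r in rows if r["keep"]]
        pipeline_module.upsert_to_delta_table(kept)


mocks = SimpleNamespace(result_df=None)

def _capture(rows):
    # Keep the output in memory instead of writing it anywhere.
    mocks.result_df = rows

with patch.object(pipeline_module, "upsert_to_delta_table", side_effect=_capture):
    pipeline_module.run([{"id": 1, "keep": True}, {"id": 2, "keep": False}])

# mocks.result_df now holds exactly what would have been written to the table.
```

The fixture does this patching for you, which is why your tests can simply read enriched_test_mocks.result_df after calling run().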
Why Chispa Makes Testing Better¶
When a test fails, you want to know exactly what's wrong. Here's the difference:
Without Chispa (default PySpark)¶
A bare DataFrame comparison just raises AssertionError with no message. That's it. You have no idea what's different.
With Chispa¶
DataFramesNotEqual: DataFrames are not equal
Rows in actual but not in expected:
+---+----------------+------------+
| id| dna_extract_id | project_id |
+---+----------------+------------+
|  3| e1100039999    | WRONG_ID   |
+---+----------------+------------+
Rows in expected but not in actual:
+---+----------------+------------+
| id| dna_extract_id | project_id |
+---+----------------+------------+
|  3| e1100039999    | CORRECT_ID |
+---+----------------+------------+
Now you can see:
- Which rows are different
- Which specific values don't match
- Exactly what to fix
This is why we use assert_test_output - it uses Chispa under the hood to give you these helpful error messages.
Testing Edge Cases¶
Good tests cover more than just the happy path. Here's how to test edge cases:
Testing Empty Lookup Tables¶
What happens when a lookup table is empty?
@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.dna.amplicon")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
@pytest.mark.parametrize(
"missing_table,null_field",
[
("dna_extract", "dna_extract_golden_id"),
("sequencing_run", "sequencing_run_id"),
],
)
def test_amplicon_missing_lookup(missing_table, null_field, enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test that missing lookups result in null values."""
# Create an empty version of the lookup table
empty_dataset = getattr(test_data, missing_table)
empty_df = spark.createDataFrame([], empty_dataset.schema)
# Build inputs with one table empty
inputs = {
"raw_amplicons": test_data.raw_amplicons.df,
"dna_extract": test_data.dna_extract.df,
"sequencing_run": test_data.sequencing_run.df,
"existing_data": test_data.existing_data.df,
}
inputs[missing_table] = empty_df # Replace with empty
Amplicon(...).run(**inputs)
# Verify the field is null when lookup is missing
result = enriched_test_mocks.result_df
assert result.filter(result[null_field].isNull()).count() == 2
What's useful here:
- @pytest.mark.parametrize runs the same test with different parameters
- One test function tests two scenarios (empty dna_extract and empty sequencing_run)
- Uses getattr() to dynamically access the right test data
Testing Upsert Behavior¶
Test that existing records don't get duplicated:
@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.dna.amplicon")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_amplicon_with_existing_records(enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test that existing records are not duplicated."""
from biocloudcore.utils.test_tools.test_functions import override
# Create an existing record using override helper
existing_record = override(
data.EXPECTED_OUTPUT[0],
project_id="OLD_PROJECT",
is_control=True,
)
existing_data_df = spark.createDataFrame(
[existing_record],
test_data.amplicon.schema
)
Amplicon(
spark=spark,
data_lake=mock_data_lake,
data_contract=test_data.amplicon.contract,
source="nanopore",
table_name="amplicon",
).run(
raw_amplicons=test_data.raw_amplicons.df,
dna_extract=test_data.dna_extract.df,
sequencing_run=test_data.sequencing_run.df,
existing_data=existing_data_df,
)
result = enriched_test_mocks.result_df
actual_count = result.count()
assert actual_count == 2, f"Expected 2 records (no duplicates) but got {actual_count}"
# Verify the existing primary key is reused
ids = [row.amplicon_id for row in result.collect()]
assert 1 in ids, f"Expected amplicon_id=1 to be reused but got IDs: {ids}"
Running Your Tests¶
# Run all tests
pytest
# Run only unit tests (fast!)
pytest -m unit
# Run tests for a specific file
pytest biocloudcore/enriched/dna/tests/test_amplicon.py
# Run tests in parallel (faster)
pytest -n auto
# Run with more detail
pytest -v
Common Patterns¶
Creating Empty DataFrames¶
Sometimes you need an empty DataFrame with a specific schema:
empty_df = spark.createDataFrame([], test_data.raw_input.schema)
Accessing Properties¶
The TestData class has three main properties:
test_data.amplicon.contract # The DataContract object
test_data.amplicon.schema # The PySpark StructType schema
test_data.amplicon.df # A DataFrame with your test data
All three are lazily loaded and cached.
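Lazy loading with caching like this is commonly implemented with functools.cached_property. Here is a minimal sketch of the idea (illustrative only, not the actual TestData source):

```python
from functools import cached_property


class LazyTestData:
    """Sketch: each property is computed on first access, then cached."""

    def __init__(self, data):
        self._data = data
        self.loads = 0  # track how often the "expensive" step actually runs

    @cached_property
    def schema(self):
        self.loads += 1  # pretend this parses a contract YAML file
        return sorted(self._data[0].keys())


td = LazyTestData([{"id": 1, "name": "x"}])
assert td.loads == 0   # nothing loaded yet
_ = td.schema          # first access triggers the load
_ = td.schema          # second access hits the cache
assert td.loads == 1
```

This is why creating many TestData objects in a fixture is cheap: nothing is parsed until a test actually touches .contract, .schema, or .df.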
Getting the Result¶
After running your table logic, get the captured output from the mocks:
result = enriched_test_mocks.result_df
Tips and Best Practices¶
1. Use Named Parameters¶
Makes your code more readable:
# Good
TestData(spark, data=data.EXPECTED_OUTPUT, contract_path="enriched/dna/amplicon_odcs.yaml")
# Less clear
TestData(spark, data.EXPECTED_OUTPUT, "enriched/dna/amplicon_odcs.yaml")
2. Keep Test Data Separate¶
Put test data in tests/data/ modules, not in the test file itself.
3. Test Isolation is Important¶
Use function-scoped fixtures (default) to ensure test isolation:
@pytest.fixture
def test_data(spark, mock_data_lake):
# Runs fresh for each test - prevents test coupling
4. Test Business Logic, Not Infrastructure¶
Focus on:
- ✅ Does my transformation produce the right output?
- ✅ What happens when lookup data is missing?
- ✅ Does upsert behavior work correctly?
Don't test:
- ❌ Schema validation (data contracts handle this)
- ❌ Data quality rules (separate quality checks handle this)
Real Example¶
See test_amplicon.py for a complete, working example with:
- Function-scoped fixtures for test isolation
- Happy path test
- Parametrized edge cases
- Upsert testing
- Empty input handling
Common Patterns¶
Pattern 1: Creating Test Data Variations¶
Use the override() helper for simple modifications:
from biocloudcore.utils.test_tools.test_functions import override
# Create a variant with different values
variant = override(
data.EXPECTED_OUTPUT[0],
project_id="TEST_PROJECT",
is_control=True
)
Pattern 2: Testing Empty Inputs¶
Always test what happens with zero records:
def test_table_with_empty_input(enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test handling of completely empty raw input."""
empty_raw = spark.createDataFrame([], test_data.raw_input.schema)
MyTable(...).run(raw_input=empty_raw, ...)
result = enriched_test_mocks.result_df
assert result.count() == 0
Pattern 3: Testing Missing Lookups¶
Use parametrize to test multiple lookup scenarios:
@pytest.mark.parametrize(
"missing_table, null_field",
[
("lookup_table_1", "foreign_key_1"),
("lookup_table_2", "foreign_key_2"),
],
)
def test_missing_lookup(missing_table, null_field, ...):
empty_dataset = getattr(test_data, missing_table)
empty_df = spark.createDataFrame([], empty_dataset.schema)
inputs = {...}
inputs[missing_table] = empty_df
MyTable(...).run(**inputs)
result = enriched_test_mocks.result_df
assert result.filter(result[null_field].isNull()).count() > 0
Pattern 4: Testing Upserts¶
Verify existing records don't create duplicates:
def test_with_existing_records(enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test upsert behavior."""
existing_record = override(data.EXPECTED_OUTPUT[0], some_field="OLD_VALUE")
existing_df = spark.createDataFrame([existing_record], test_data.output.schema)
MyTable(...).run(..., existing_data=existing_df)
result = enriched_test_mocks.result_df
# Should not duplicate records
assert result.count() == len(data.EXPECTED_OUTPUT)
Rollout Template¶
When creating tests for a new table, follow this template:
Step 1: Create Test Data File¶
# biocloudcore/enriched/example_layer/tests/data/test_example_table_data.py
import datetime
# Fixed timestamp for deterministic testing
FIXED_TEST_TIMESTAMP = datetime.datetime(1989, 11, 9)
# Raw input data
RAW_INPUT = [
{
"field1": "value1",
"field2": "value2",
# ... all fields
},
]
# Lookup table data
LOOKUP_TABLE_1 = [
{
"id": 1,
"name": "lookup_value",
},
]
# Expected output
EXPECTED_OUTPUT = [
{
"primary_key_id": 1,
"foreign_key_id": 1,
"field1": "value1",
"inserted_ts_utc": FIXED_TEST_TIMESTAMP,
"updated_ts_utc": FIXED_TEST_TIMESTAMP,
},
]
Step 2: Create Test File¶
# biocloudcore/enriched/example_layer/tests/test_example_table.py
"""Unit tests for example_table table transformation.
Test Coverage:
✓ Happy path: Normal processing with valid lookups
✓ Missing lookups: Empty lookup tables (null foreign keys)
✓ Empty input: Processing with zero raw records
✓ Upserts: Handling existing records without duplication
Test Data:
All test data constants are defined in test_example_table_data.py
Use the override() helper from test_functions for creating variations
"""
from dataclasses import dataclass
import pytest
import biocloudcore.enriched.example_layer.tests.data.test_example_table_data as data
from biocloudcore.conftest import TestData
from biocloudcore.enriched.example_layer.example_table import ExampleTable
from biocloudcore.utils.test_tools.test_functions import assert_test_output, override
@pytest.fixture
def test_data(spark, mock_data_lake):
"""Load contracts, schemas, and input data for example_table tests."""
@dataclass
class ExampleTableTestData:
output: TestData
raw_input: TestData
lookup_1: TestData
# Add more lookups as needed
return ExampleTableTestData(
output=TestData(spark, data=data.EXPECTED_OUTPUT,
contract_path="enriched/example_layer/example_table_odcs.yaml"),
raw_input=TestData(spark, data=data.RAW_INPUT),
lookup_1=TestData(spark, data=data.LOOKUP_TABLE_1, contract_path="enriched/example_layer/lookup_1_odcs.yaml"),
)
@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.example_layer.example_table")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_example_table_happy_path(enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test normal example_table processing with valid input data."""
ExampleTable(
spark=spark,
data_lake=mock_data_lake,
data_contract=test_data.output.contract,
source="example_source",
table_name="example_table",
).run(
raw_input=test_data.raw_input.df,
lookup_1=test_data.lookup_1.df,
existing_data=spark.createDataFrame([], test_data.output.schema),
)
assert_test_output(
spark=spark,
actual=enriched_test_mocks.result_df,
expected=data.EXPECTED_OUTPUT,
schema=test_data.output.schema,
)
@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.example_layer.example_table")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
@pytest.mark.parametrize(
"missing_table, null_field",
[
("lookup_1", "foreign_key_1_id"),
# Add more lookup tables
],
)
def test_example_table_missing_lookup(missing_table, null_field, enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test example_table processing when lookup tables are empty."""
empty_dataset = getattr(test_data, missing_table)
empty_df = spark.createDataFrame([], empty_dataset.schema)
inputs = {
"raw_input": test_data.raw_input.df,
"lookup_1": test_data.lookup_1.df,
"existing_data": spark.createDataFrame([], test_data.output.schema),
}
inputs[missing_table] = empty_df
ExampleTable(
spark=spark,
data_lake=mock_data_lake,
data_contract=test_data.output.contract,
source="example_source",
table_name="example_table",
).run(**inputs)
result = enriched_test_mocks.result_df
actual_count = result.count()
assert actual_count == len(data.RAW_INPUT), f"Expected {len(data.RAW_INPUT)} records but got {actual_count}"
null_count = result.filter(result[null_field].isNull()).count()
assert null_count == len(data.RAW_INPUT), (
f"Expected all {len(data.RAW_INPUT)} records to have null {null_field}, but only {null_count} were null"
)
@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.example_layer.example_table")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_example_table_with_empty_input(enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test handling of completely empty raw input."""
empty_raw = spark.createDataFrame([], test_data.raw_input.schema)
ExampleTable(
spark=spark,
data_lake=mock_data_lake,
data_contract=test_data.output.contract,
source="example_source",
table_name="example_table",
).run(
raw_input=empty_raw,
lookup_1=test_data.lookup_1.df,
existing_data=spark.createDataFrame([], test_data.output.schema),
)
result = enriched_test_mocks.result_df
actual_count = result.count()
assert actual_count == 0, f"Expected 0 records with empty input but got {actual_count}"
@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.example_layer.example_table")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_example_table_with_existing_records(enriched_test_mocks, spark, mock_data_lake, test_data):
"""Test example_table processing with existing records (upsert behavior)."""
existing_record = override(data.EXPECTED_OUTPUT[0], some_field="OLD_VALUE")
existing_data_df = spark.createDataFrame([existing_record], test_data.output.schema)
ExampleTable(
spark=spark,
data_lake=mock_data_lake,
data_contract=test_data.output.contract,
source="example_source",
table_name="example_table",
).run(
raw_input=test_data.raw_input.df,
lookup_1=test_data.lookup_1.df,
existing_data=existing_data_df,
)
result = enriched_test_mocks.result_df
actual_count = result.count()
expected_count = len(data.EXPECTED_OUTPUT)
assert actual_count == expected_count, (
f"Expected {expected_count} records after upsert but got {actual_count} (check for duplicates)"
)
# Verify primary key was reused
primary_keys = [row["example_table_id"] for row in result.collect()]
assert 1 in primary_keys, f"Expected primary key 1 to exist after upsert but got: {primary_keys}"
Step 3: Checklist¶
Before committing:
- Test data file created with all constants
- Test file created with all 4 core tests (happy path, missing lookups, empty input, upserts)
- Module docstring added explaining test coverage
- All tests pass: pytest test_example_table.py -v
- Test with coverage: pytest test_example_table.py --cov=biocloudcore.enriched.example_layer.example_table
Resources¶
- Chispa Documentation - Better DataFrame assertions
- Pytest Documentation - Testing framework
- Data Contracts - How schemas are defined
- override() Helper - Test data variations