
Why Unit tests?

We test our code to ensure it works as expected. Our data quality tests already check a lot, but we also need to verify the code itself. Unit tests are a good way to do this and give us much more assurance that our pipelines behave as intended.

Unit tests also enable us to develop differently. Instead of writing code and hoping it delivers a result we are happy with, we can do test-driven development: first define the expected output of our code, then write the code and adjust the business logic until it produces that output. This is a better way to develop and might even save us time in the future.

Unit test framework

Prepare Enriched layer for test driven development

To start with test-driven development, we first had to prepare our enriched layer and change the way it handles IO.

Enriched layer before test driven development

[Image: Enriched before unit test framework]

In the image you can see that our enriched layer used to do input/output (IO) processing inside the business logic code, table.py. This is a poor fit for test-driven development: you would have to patch every IO call in the business logic, which makes tests complex and hard to write properly. We had to change this to reach the result shown in the next image.

Enriched layer after test driven development

[Image: Enriched after unit test framework]

In the image you can see the changes we made so that all IO happens in the run_table.py file. This gives us a clean input -> pipeline -> output structure, which makes test-driven development easy without much mocking or patching.
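
As a rough illustration of that split (not the actual biocloud-core code), the sketch below uses plain Python lists in place of DataFrames. The names `build_table`, `run_table`, `read_rows`, and `write_rows` are hypothetical and exist only for this example.

```python
from typing import Callable

Row = dict


def build_table(raw_rows: list[Row], lookup_rows: list[Row]) -> list[Row]:
    """Business logic only: no reads and no writes (the role of table.py)."""
    golden_ids = {r["dna_extract_id"]: r["dna_extract_golden_id"] for r in lookup_rows}
    return [
        {
            "source_id": str(raw["id"]),
            # .get() leaves the key as None when the lookup has no match
            "dna_extract_golden_id": golden_ids.get(raw["extract_id"]),
        }
        for raw in raw_rows
    ]


def run_table(
    read_rows: Callable[[str], list[Row]],
    write_rows: Callable[[list[Row]], None],
) -> None:
    """All IO lives here (the role of run_table.py)."""
    raw = read_rows("raw_table")
    lookup = read_rows("enriched_table")
    write_rows(build_table(raw, lookup))


# The business logic can be exercised with in-memory data and no patching.
result = build_table(
    raw_rows=[{"id": 26, "extract_id": "e1100039674"}],
    lookup_rows=[{"dna_extract_id": "e1100039674", "dna_extract_golden_id": 1}],
)
print(result)
```

Because `build_table` only receives and returns data, a test can call it directly; only `run_table` would ever need real storage.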

In green you can see the implemented testing framework we use on each enriched pipeline:

  1. In test_table_data.py we define the expected output of the table we want to reach.
  2. In test_table_data.py we also define the test data that we will use instead of actual input data.
    • For this example this is the test data for raw_table and enriched_table
  3. test_table.py gets both the input and expected output data from test_table_data.py.
  4. test_table.py executes the table.py business logic and mocks the necessary IOs.
  5. test_table.py captures the dataframe that is created by the business logic right before upserting to the table.
  6. This dataframe is compared to the expected output dataframe. If they are the same, the test passes. If not, Chispa (our PySpark testing library) clearly shows the differences between the two dataframes.
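
The capture-and-compare flow in steps 4 to 6 can be sketched with the standard library alone. This is a simplified stand-in, assuming lists of dictionaries instead of DataFrames; `table`, `run`, and `upsert_to_table` are hypothetical names, not the real pipeline code.

```python
from types import SimpleNamespace
from unittest import mock


def upsert_to_table(rows):
    """Stand-in for the real write; it must never run inside a unit test."""
    raise RuntimeError("real IO must not run in a unit test")


# Stand-in for the module that holds the business logic.
table = SimpleNamespace(upsert_to_table=upsert_to_table)


def run(raw_rows):
    """Business logic: transform the input, then hand the result to IO."""
    rows = [{"source_id": str(r["id"]), "marker": r["marker"]} for r in raw_rows]
    table.upsert_to_table(rows)


INPUT = [{"id": 26, "marker": "COI-5P"}]
EXPECTED = [{"source_id": "26", "marker": "COI-5P"}]

# Replace the write with a mock so the rows are captured instead of written.
with mock.patch.object(table, "upsert_to_table") as fake_upsert:
    run(INPUT)

# The first positional argument of the mocked call is the would-be output.
captured = fake_upsert.call_args.args[0]
assert captured == EXPECTED
```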

We can test the business logic with many different edge cases in the inputs, to test all the different scenarios.

How to Write Unit Tests

A practical guide to testing PySpark data pipelines in biocloud-core.

Why This Testing Approach?

Testing PySpark transformations has three main challenges:

  1. Schema complexity - Manually defining PySpark schemas is tedious and error-prone
  2. Test data setup - Managing multiple input DataFrames and expected outputs gets messy
  3. Unclear failures - Default PySpark assertions don't show you what is different

Our testing framework solves these problems:

  • Schemas are automatically loaded from data contracts (no manual StructType definitions)
  • The TestData class organizes your inputs and outputs cleanly
  • Chispa shows you exactly which rows and columns differ when tests fail

Writing Your First Test

Let's walk through creating a test for a table called amplicon.

Step 1: Create Your Test Data File

First, define your test data in a separate Python file:

# biocloudcore/enriched/dna/tests/data/test_amplicon_data.py
import datetime

# Use a fixed timestamp so tests are deterministic
FIXED_TEST_TIMESTAMP = datetime.datetime(1989, 11, 9)

# Raw input data
AMPLICONS_RAW = [
    {
        "consensuses_url": "https://link_to_endpoint/api/samples/26/consensuses/",
        "created_at": "2024-06-06T11:29:19.681000",
        "extract_id": "e1100039674",
        "forward_primer": "ONT_LepF1+ONT_LCO1490",
        "id": 26,
        "is_control": False,
        "marker": "COI-5P",
        # ... all other fields
        "inserted_ts_utc": datetime.datetime(2024, 6, 6, 11, 29, 19, 681000),
        "updated_ts_utc": datetime.datetime(2024, 6, 6, 11, 29, 19, 681000),
    },
]

# Lookup table data
DNA_EXTRACT = [
    {
        "dna_extract_golden_id": 1,
        "material_entity_id": None,
        "dna_extract_id": "e1100039674",
        "catalog_number": "RMNH.5143366",
        "stock_plate_id": "NCBN001711",
        "last_updated_source": "nanopore",
        "inserted_ts_utc": datetime.datetime(2024, 6, 6, 11, 0, 0),
        "updated_ts_utc": datetime.datetime(2024, 6, 6, 11, 0, 0),
    },
]

SEQUENCING_RUN = [
    {
        "sequencing_run_id": 1,
        "title": "Sample_Pool_Lepidoptera",
        "project_name": "ARISE_Project",
        "end_date_time": datetime.datetime(2024, 6, 6, 10, 0, 0),
        "source_id": "969410a3-a2cb-42c9-ae97-2194a96870e7",
        "source": "nanopore",
        "inserted_ts_utc": datetime.datetime(2024, 6, 6, 10, 30, 0),
        "updated_ts_utc": datetime.datetime(2024, 6, 6, 10, 30, 0),
    },
]

# What you expect the output to look like
EXPECTED_OUTPUT = [
    {
        "amplicon_id": 1,
        "dna_extract_golden_id": 1,
        "sequencing_run_id": 1,
        "dna_extract_id": "e1100039674",
        "project_id": "23009-5004000157",
        "pcr_id": "LS027",
        "primer_name_forward": "ONT_LepF1+ONT_LCO1490",
        "primer_name_reverse": "ONT_LepR1+ONT_HCO2198",
        "marker": "COI-5P",
        "is_control": False,
        "source_id": "26",
        "source": "nanopore",
        "inserted_ts_utc": FIXED_TEST_TIMESTAMP,
        "updated_ts_utc": FIXED_TEST_TIMESTAMP,
    },
]

Step 2: Set Up Your Test Fixture

Create a fixture that loads all your test data:

# biocloudcore/enriched/dna/tests/test_amplicon.py
from dataclasses import dataclass
import pytest

import biocloudcore.enriched.dna.tests.data.test_amplicon_data as data
from biocloudcore.conftest import TestData
from biocloudcore.enriched.dna.amplicon import Amplicon
from biocloudcore.utils.test_tools.test_functions import assert_test_output, override


@pytest.fixture
def test_data(spark, mock_data_lake):
    """Load all test data for amplicon tests."""

    # Define a container for all your datasets
    @dataclass
    class AmpliconTestData:
        amplicon: TestData  # Output table
        raw_amplicons: TestData  # Raw input
        dna_extract: TestData  # Lookup table 1
        sequencing_run: TestData  # Lookup table 2
        existing_data: TestData  # For testing upserts

    return AmpliconTestData(
        # Output table - needs contract for schema
        amplicon=TestData(
            spark,
            data=data.EXPECTED_OUTPUT,
            contract_path="enriched/dna/amplicon_odcs.yaml"
        ),
        # Raw input - no schema needed, Spark can infer
        raw_amplicons=TestData(spark, data=data.AMPLICONS_RAW),
        # Lookup tables - need contracts for schema
        dna_extract=TestData(
            spark,
            data=data.DNA_EXTRACT,
            contract_path="enriched/dna/dna_extract_odcs.yaml"
        ),
        sequencing_run=TestData(
            spark,
            data=data.SEQUENCING_RUN,
            contract_path="enriched/dna/sequencing_run_odcs.yaml"
        ),
        # Empty for now, used in upsert tests
        existing_data=TestData(
            spark,
            data=[],
            contract_path="enriched/dna/amplicon_odcs.yaml"
        ),
    )

What's happening here:

  • TestData wraps your test data and automatically loads schemas from contracts
  • When you provide a contract_path, it loads the schema from that YAML file
  • When you don't, Spark infers the schema from your data
  • Everything is lazily loaded - schemas only load when you access them
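
To show what lazy loading with caching means in practice, here is a minimal sketch built on `functools.cached_property`. `LazyDataset` is a hypothetical stand-in and not the real TestData class; the schema values are placeholders rather than PySpark types.

```python
from functools import cached_property


class LazyDataset:
    """Hypothetical stand-in for TestData: nothing is computed until first access."""

    def __init__(self, data, contract_path=None):
        self.data = data
        self.contract_path = contract_path
        self.schema_loads = 0  # counts how often the schema is actually built

    @cached_property
    def schema(self):
        self.schema_loads += 1
        if self.contract_path is not None:
            # A real implementation would parse the YAML contract here.
            return f"schema from {self.contract_path}"
        # Without a contract, fall back to the field names found in the data.
        return sorted({key for row in self.data for key in row})


dataset = LazyDataset(data=[{"id": 1}], contract_path="enriched/dna/amplicon_odcs.yaml")
assert dataset.schema_loads == 0  # constructing the object loads nothing

first = dataset.schema
second = dataset.schema
assert dataset.schema_loads == 1  # the second access is served from the cache
```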

Step 3: Write Your Test

Now write your actual test:

@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.dna.amplicon")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_amplicon_happy_path(enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test that amplicon processes raw data correctly."""

    # Run your table logic
    Amplicon(
        spark=spark,
        data_lake=mock_data_lake,
        data_contract=test_data.amplicon.contract,
        source="nanopore",
        table_name="amplicon",
    ).run(
        raw_amplicons=test_data.raw_amplicons.df,
        dna_extract=test_data.dna_extract.df,
        sequencing_run=test_data.sequencing_run.df,
        existing_data=test_data.existing_data.df,
    )

    # Check the output matches expectations
    assert_test_output(
        spark=spark,
        actual=enriched_test_mocks.result_df,  # What your code produced
        expected=data.EXPECTED_OUTPUT,  # What you expected
        schema=test_data.amplicon.schema  # Schema to enforce
    )

Understanding the markers:

  • @pytest.mark.unit - Tags this as a unit test (run with pytest -m unit)
  • @pytest.mark.patch_module(...) - Tells the framework which module to mock
  • @pytest.mark.fixed_timestamp(...) - Ensures timestamps are deterministic
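
Custom markers like these have to be registered with pytest, otherwise `pytest -m unit` emits unknown-marker warnings (or fails under `--strict-markers`). biocloud-core presumably does this already; the sketch below only shows one common way to do it in a conftest.py, and the marker descriptions are illustrative wording.

```python
# conftest.py (sketch)

CUSTOM_MARKERS = [
    "unit: fast test that runs without external services",
    "patch_module(path): dotted path of the module whose IO is mocked",
    "fixed_timestamp(ts): timestamp to use for generated timestamp columns",
]


def pytest_configure(config):
    """pytest hook that registers the custom markers at startup."""
    for line in CUSTOM_MARKERS:
        config.addinivalue_line("markers", line)
```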

Understanding the mocks:

  • enriched_test_mocks automatically mocks upsert_to_delta_table and add_timestamp_columns
  • enriched_test_mocks.result_df gives you the DataFrame that would have been written to the table
  • You don't need to write any mocking code yourself!
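
For intuition only, this is roughly how a timestamp helper can be swapped for a deterministic one during a test. It is a sketch under assumptions: `pipeline` and `fixed_timestamp_columns` are hypothetical, and the real `add_timestamp_columns` works on DataFrames rather than lists of dictionaries.

```python
import datetime
from types import SimpleNamespace
from unittest import mock

FIXED_TEST_TIMESTAMP = datetime.datetime(1989, 11, 9)


def add_timestamp_columns(rows):
    """Production-style helper: stamps rows with the current time."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return [{**row, "inserted_ts_utc": now, "updated_ts_utc": now} for row in rows]


def fixed_timestamp_columns(rows):
    """Test replacement: always stamps rows with the same fixed time."""
    return [
        {**row, "inserted_ts_utc": FIXED_TEST_TIMESTAMP, "updated_ts_utc": FIXED_TEST_TIMESTAMP}
        for row in rows
    ]


# Stand-in for the module named in the patch_module marker.
pipeline = SimpleNamespace(add_timestamp_columns=add_timestamp_columns)

with mock.patch.object(pipeline, "add_timestamp_columns", side_effect=fixed_timestamp_columns):
    stamped = pipeline.add_timestamp_columns([{"amplicon_id": 1}])

assert stamped[0]["inserted_ts_utc"] == FIXED_TEST_TIMESTAMP
```

With the clock fixed, the expected output can hard-code its timestamp columns, which is why EXPECTED_OUTPUT uses FIXED_TEST_TIMESTAMP.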

Why Chispa Makes Testing Better

When a test fails, you want to know exactly what's wrong. Here's the difference:

Without Chispa (default PySpark)

AssertionError: DataFrames are not equal

That's it. You have no idea what's different.

With Chispa

DataFramesNotEqual: DataFrame are not equal

Rows in actual but not in expected:
+---+----------------+------------+
| id| dna_extract_id | project_id |
+---+----------------+------------+
|  3| e1100039999    | WRONG_ID   |
+---+----------------+------------+

Rows in expected but not in actual:
+---+----------------+------------+
| id| dna_extract_id | project_id |
+---+----------------+------------+
|  3| e1100039999    | CORRECT_ID |
+---+----------------+------------+

Now you can see:

  • Which rows are different
  • Which specific values don't match
  • Exactly what to fix

This is why we use assert_test_output - it uses Chispa under the hood to give you these helpful error messages.
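
To make the idea of a row-level diff concrete, the sketch below computes the same two groups of rows for plain lists of dictionaries. It is not Chispa's implementation, and `diff_rows` is a hypothetical helper written for this example.

```python
def diff_rows(actual, expected):
    """Return (rows only in actual, rows only in expected), ignoring row order."""

    def freeze(row):
        # Dicts are not hashable, so turn each row into a tuple sorted by key.
        return tuple(sorted(row.items()))

    actual_set = {freeze(row) for row in actual}
    expected_set = {freeze(row) for row in expected}
    only_actual = [row for row in actual if freeze(row) not in expected_set]
    only_expected = [row for row in expected if freeze(row) not in actual_set]
    return only_actual, only_expected


actual = [{"id": 3, "dna_extract_id": "e1100039999", "project_id": "WRONG_ID"}]
expected = [{"id": 3, "dna_extract_id": "e1100039999", "project_id": "CORRECT_ID"}]

only_actual, only_expected = diff_rows(actual, expected)
print("Rows in actual but not in expected:", only_actual)
print("Rows in expected but not in actual:", only_expected)
```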

Testing Edge Cases

Good tests cover more than just the happy path. Here's how to test edge cases:

Testing Empty Lookup Tables

What happens when a lookup table is empty?

@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.dna.amplicon")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
@pytest.mark.parametrize(
    "missing_table,null_field",
    [
        ("dna_extract", "dna_extract_golden_id"),
        ("sequencing_run", "sequencing_run_id"),
    ],
)
def test_amplicon_missing_lookup(missing_table, null_field, enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test that missing lookups result in null values."""

    # Create an empty version of the lookup table
    empty_dataset = getattr(test_data, missing_table)
    empty_df = spark.createDataFrame([], empty_dataset.schema)

    # Build inputs with one table empty
    inputs = {
        "raw_amplicons": test_data.raw_amplicons.df,
        "dna_extract": test_data.dna_extract.df,
        "sequencing_run": test_data.sequencing_run.df,
        "existing_data": test_data.existing_data.df,
    }
    inputs[missing_table] = empty_df  # Replace with empty

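    Amplicon(
        spark=spark,
        data_lake=mock_data_lake,
        data_contract=test_data.amplicon.contract,
        source="nanopore",
        table_name="amplicon",
    ).run(**inputs)

    # Every raw record should be kept, with the lookup field left null
    result = enriched_test_mocks.result_df
    null_count = result.filter(result[null_field].isNull()).count()
    assert null_count == len(data.AMPLICONS_RAW)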

What's useful here:

  • @pytest.mark.parametrize runs the same test with different parameters
  • One test function tests two scenarios (empty dna_extract and empty sequencing_run)
  • Uses getattr() to dynamically access the right test data
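
The same pattern can be tried without Spark or pytest. The sketch below loops over the two cases instead of using parametrize and uses plain lists as tables. `enrich` and the `run_source_id` field are hypothetical, since how the real pipeline links an amplicon to a sequencing run is not shown here.

```python
RAW_AMPLICONS = [{"id": 26, "extract_id": "e1100039674", "run_source_id": "969410a3"}]
LOOKUPS = {
    "dna_extract": [{"dna_extract_id": "e1100039674", "dna_extract_golden_id": 1}],
    "sequencing_run": [{"source_id": "969410a3", "sequencing_run_id": 1}],
}


def enrich(raw_amplicons, dna_extract, sequencing_run):
    """Left-join style lookup: raw rows are kept, unmatched keys become None."""
    golden_ids = {r["dna_extract_id"]: r["dna_extract_golden_id"] for r in dna_extract}
    run_ids = {r["source_id"]: r["sequencing_run_id"] for r in sequencing_run}
    return [
        {
            "source_id": str(raw["id"]),
            "dna_extract_golden_id": golden_ids.get(raw["extract_id"]),
            "sequencing_run_id": run_ids.get(raw["run_source_id"]),
        }
        for raw in raw_amplicons
    ]


CASES = [
    ("dna_extract", "dna_extract_golden_id"),
    ("sequencing_run", "sequencing_run_id"),
]

null_counts = {}
for missing_table, null_field in CASES:
    inputs = dict(LOOKUPS)      # shallow copy so each case starts from full lookups
    inputs[missing_table] = []  # empty exactly one lookup table
    result = enrich(RAW_AMPLICONS, **inputs)
    null_counts[missing_table] = sum(1 for row in result if row[null_field] is None)

assert all(count == len(RAW_AMPLICONS) for count in null_counts.values())
```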

Testing Upsert Behavior

Test that existing records don't get duplicated:

@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.dna.amplicon")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_amplicon_with_existing_records(enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test that existing records are not duplicated."""
    from biocloudcore.utils.test_tools.test_functions import override

    # Create an existing record using override helper
    existing_record = override(
        data.EXPECTED_OUTPUT[0],
        project_id="OLD_PROJECT",
        is_control=True,
    )

    existing_data_df = spark.createDataFrame(
        [existing_record],
        test_data.amplicon.schema
    )

    Amplicon(
        spark=spark,
        data_lake=mock_data_lake,
        data_contract=test_data.amplicon.contract,
        source="nanopore",
        table_name="amplicon",
    ).run(
        raw_amplicons=test_data.raw_amplicons.df,
        dna_extract=test_data.dna_extract.df,
        sequencing_run=test_data.sequencing_run.df,
        existing_data=existing_data_df,
    )

    result = enriched_test_mocks.result_df
    actual_count = result.count()
    expected_count = len(data.EXPECTED_OUTPUT)
    assert actual_count == expected_count, f"Expected {expected_count} records (no duplicates) but got {actual_count}"

    # Verify the existing primary key is reused
    ids = [row.amplicon_id for row in result.collect()]
    assert 1 in ids, f"Expected amplicon_id=1 to be reused but got IDs: {ids}"
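
What "the existing primary key is reused" means can be sketched in plain Python. This assumes records are matched on `source_id`, which is a guess made for illustration; `assign_ids` is a hypothetical helper, not the logic in Amplicon.

```python
def assign_ids(new_rows, existing_rows, key="source_id", id_field="amplicon_id"):
    """Reuse the id of an existing row with the same key, else allocate the next id."""
    known = {row[key]: row[id_field] for row in existing_rows}
    next_id = max(known.values(), default=0) + 1
    result = []
    for row in new_rows:
        if row[key] not in known:
            known[row[key]] = next_id
            next_id += 1
        result.append({**row, id_field: known[row[key]]})
    return result


existing = [{"amplicon_id": 1, "source_id": "26", "project_id": "OLD_PROJECT"}]
incoming = [
    {"source_id": "26", "project_id": "23009-5004000157"},  # already known
    {"source_id": "27", "project_id": "23009-5004000157"},  # new record
]

result = assign_ids(incoming, existing)
ids = [row["amplicon_id"] for row in result]
assert ids == [1, 2]  # id 1 is reused, the new record gets the next free id
```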

Running Your Tests

# Run all tests
pytest

# Run only unit tests (fast!)
pytest -m unit

# Run tests for a specific file
pytest biocloudcore/enriched/dna/tests/test_amplicon.py

# Run tests in parallel (faster)
pytest -n auto

# Run with more detail
pytest -v

Common Patterns

Creating Empty DataFrames

Sometimes you need an empty DataFrame with a specific schema:

empty_df = spark.createDataFrame([], test_data.dna_extract.schema)

Accessing Properties

The TestData class has three main properties:

test_data.amplicon.contract  # The DataContract object
test_data.amplicon.schema  # The PySpark StructType schema
test_data.amplicon.df  # A DataFrame with your test data

All three are lazily loaded and cached.

Getting the Result

After running your table logic, get the output:

result = enriched_test_mocks.result_df  # The DataFrame that would be written

Tips and Best Practices

1. Use Named Parameters

Makes your code more readable:

# Good
TestData(spark, data=data.EXPECTED_OUTPUT, contract_path="enriched/dna/amplicon_odcs.yaml")

# Less clear
TestData(spark, data.EXPECTED_OUTPUT, "enriched/dna/amplicon_odcs.yaml")

2. Keep Test Data Separate

Put test data in tests/data/ modules, not in the test file itself.

3. Test Isolation is Important

Use function-scoped fixtures (default) to ensure test isolation:

@pytest.fixture
def test_data(spark, mock_data_lake):
    # Runs fresh for each test - prevents test coupling
    ...

4. Test Business Logic, Not Infrastructure

Focus on:

  • ✅ Does my transformation produce the right output?
  • ✅ What happens when lookup data is missing?
  • ✅ Does upsert behavior work correctly?

Don't test:

  • ❌ Schema validation (data contracts handle this)
  • ❌ Data quality rules (separate quality checks handle this)

Real Example

See test_amplicon.py for a complete, working example with:

  • Function-scoped fixtures for test isolation
  • Happy path test
  • Parametrized edge cases
  • Upsert testing
  • Empty input handling

Common Test Patterns

Pattern 1: Creating Test Data Variations

Use the override() helper for simple modifications:

from biocloudcore.utils.test_tools.test_functions import override

# Create a variant with different values
variant = override(
    data.EXPECTED_OUTPUT[0],
    project_id="TEST_PROJECT",
    is_control=True
)
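
The real override() lives in biocloudcore.utils.test_tools.test_functions and its implementation is not shown here. A helper with the same call shape could look like the sketch below; `override_sketch` is a hypothetical name, and copying the base record is an assumption about the desired behavior.

```python
import copy


def override_sketch(base, **changes):
    """Return a copy of `base` with the given fields replaced."""
    record = copy.deepcopy(base)  # never mutate the shared test constant
    record.update(changes)
    return record


BASE = {"amplicon_id": 1, "project_id": "23009-5004000157", "is_control": False}

variant = override_sketch(BASE, project_id="TEST_PROJECT", is_control=True)

assert variant["project_id"] == "TEST_PROJECT"
assert BASE["project_id"] == "23009-5004000157"  # the original is untouched
```

Copying matters because the test data constants are module-level objects shared by every test in the file.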

Pattern 2: Testing Empty Inputs

Always test what happens with zero records:

def test_table_with_empty_input(enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test handling of completely empty raw input."""
    empty_raw = spark.createDataFrame([], test_data.raw_input.schema)

    MyTable(...).run(raw_input=empty_raw, ...)

    result = enriched_test_mocks.result_df
    assert result.count() == 0

Pattern 3: Testing Missing Lookups

Use parametrize to test multiple lookup scenarios:

@pytest.mark.parametrize(
    "missing_table, null_field",
    [
        ("lookup_table_1", "foreign_key_1"),
        ("lookup_table_2", "foreign_key_2"),
    ],
)
def test_missing_lookup(missing_table, null_field, ...):
    empty_dataset = getattr(test_data, missing_table)
    empty_df = spark.createDataFrame([], empty_dataset.schema)

    inputs = {...}
    inputs[missing_table] = empty_df

    MyTable(...).run(**inputs)

    result = enriched_test_mocks.result_df
    assert result.filter(result[null_field].isNull()).count() > 0

Pattern 4: Testing Upserts

Verify existing records don't create duplicates:

def test_with_existing_records(enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test upsert behavior."""
    existing_record = override(data.EXPECTED_OUTPUT[0], some_field="OLD_VALUE")
    existing_df = spark.createDataFrame([existing_record], test_data.output.schema)

    MyTable(...).run(..., existing_data=existing_df)

    result = enriched_test_mocks.result_df
    # Should not duplicate records
    assert result.count() == len(data.EXPECTED_OUTPUT)

Rollout Template

When creating tests for a new table, follow this template:

Step 1: Create Test Data File

# biocloudcore/enriched/example_layer/tests/data/test_example_table_data.py
import datetime

# Fixed timestamp for deterministic testing
FIXED_TEST_TIMESTAMP = datetime.datetime(1989, 11, 9)

# Raw input data
RAW_INPUT = [
    {
        "field1": "value1",
        "field2": "value2",
        # ... all fields
    },
]

# Lookup table data
LOOKUP_TABLE_1 = [
    {
        "id": 1,
        "name": "lookup_value",
    },
]

# Expected output
EXPECTED_OUTPUT = [
    {
        "primary_key_id": 1,
        "foreign_key_id": 1,
        "field1": "value1",
        "inserted_ts_utc": FIXED_TEST_TIMESTAMP,
        "updated_ts_utc": FIXED_TEST_TIMESTAMP,
    },
]

Step 2: Create Test File

# biocloudcore/enriched/example_layer/tests/test_example_table.py
"""Unit tests for example_table table transformation.

Test Coverage:
    ✓ Happy path: Normal processing with valid lookups
    ✓ Missing lookups: Empty lookup tables (null foreign keys)
    ✓ Empty input: Processing with zero raw records
    ✓ Upserts: Handling existing records without duplication

Test Data:
    All test data constants are defined in test_example_table_data.py
    Use the override() helper from test_functions for creating variations
"""
from dataclasses import dataclass

import pytest

import biocloudcore.enriched.example_layer.tests.data.test_example_table_data as data
from biocloudcore.conftest import TestData
from biocloudcore.enriched.example_layer.example_table import ExampleTable
from biocloudcore.utils.test_tools.test_functions import assert_test_output, override


@pytest.fixture
def test_data(spark, mock_data_lake):
    """Load contracts, schemas, and input data for example_table tests."""

    @dataclass
    class ExampleTableTestData:
        output: TestData
        raw_input: TestData
        lookup_1: TestData
        # Add more lookups as needed

    return ExampleTableTestData(
        output=TestData(spark, data=data.EXPECTED_OUTPUT,
                        contract_path="enriched/example_layer/example_table_odcs.yaml"),
        raw_input=TestData(spark, data=data.RAW_INPUT),
        lookup_1=TestData(spark, data=data.LOOKUP_TABLE_1, contract_path="enriched/example_layer/lookup_1_odcs.yaml"),
    )


@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.example_layer.example_table")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_example_table_happy_path(enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test normal example_table processing with valid input data."""
    ExampleTable(
        spark=spark,
        data_lake=mock_data_lake,
        data_contract=test_data.output.contract,
        source="example_source",
        table_name="example_table",
    ).run(
        raw_input=test_data.raw_input.df,
        lookup_1=test_data.lookup_1.df,
        existing_data=spark.createDataFrame([], test_data.output.schema),
    )

    assert_test_output(
        spark=spark,
        actual=enriched_test_mocks.result_df,
        expected=data.EXPECTED_OUTPUT,
        schema=test_data.output.schema,
    )


@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.example_layer.example_table")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
@pytest.mark.parametrize(
    "missing_table, null_field",
    [
        ("lookup_1", "foreign_key_id"),
        # Add more lookup tables
    ],
)
def test_example_table_missing_lookup(missing_table, null_field, enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test example_table processing when lookup tables are empty."""
    empty_dataset = getattr(test_data, missing_table)
    empty_df = spark.createDataFrame([], empty_dataset.schema)

    inputs = {
        "raw_input": test_data.raw_input.df,
        "lookup_1": test_data.lookup_1.df,
        "existing_data": spark.createDataFrame([], test_data.output.schema),
    }
    inputs[missing_table] = empty_df

    ExampleTable(
        spark=spark,
        data_lake=mock_data_lake,
        data_contract=test_data.output.contract,
        source="example_source",
        table_name="example_table",
    ).run(**inputs)

    result = enriched_test_mocks.result_df
    actual_count = result.count()
    assert actual_count == len(data.RAW_INPUT), f"Expected {len(data.RAW_INPUT)} records but got {actual_count}"

    null_count = result.filter(result[null_field].isNull()).count()
    assert null_count == len(data.RAW_INPUT), (
        f"Expected all {len(data.RAW_INPUT)} records to have null {null_field}, but only {null_count} were null"
    )


@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.example_layer.example_table")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_example_table_with_empty_input(enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test handling of completely empty raw input."""
    empty_raw = spark.createDataFrame([], test_data.raw_input.schema)

    ExampleTable(
        spark=spark,
        data_lake=mock_data_lake,
        data_contract=test_data.output.contract,
        source="example_source",
        table_name="example_table",
    ).run(
        raw_input=empty_raw,
        lookup_1=test_data.lookup_1.df,
        existing_data=spark.createDataFrame([], test_data.output.schema),
    )

    result = enriched_test_mocks.result_df
    actual_count = result.count()
    assert actual_count == 0, f"Expected 0 records with empty input but got {actual_count}"


@pytest.mark.unit
@pytest.mark.patch_module("biocloudcore.enriched.example_layer.example_table")
@pytest.mark.fixed_timestamp(data.FIXED_TEST_TIMESTAMP)
def test_example_table_with_existing_records(enriched_test_mocks, spark, mock_data_lake, test_data):
    """Test example_table processing with existing records (upsert behavior)."""
    existing_record = override(data.EXPECTED_OUTPUT[0], field1="OLD_VALUE")
    existing_data_df = spark.createDataFrame([existing_record], test_data.output.schema)

    ExampleTable(
        spark=spark,
        data_lake=mock_data_lake,
        data_contract=test_data.output.contract,
        source="example_source",
        table_name="example_table",
    ).run(
        raw_input=test_data.raw_input.df,
        lookup_1=test_data.lookup_1.df,
        existing_data=existing_data_df,
    )

    result = enriched_test_mocks.result_df
    actual_count = result.count()
    expected_count = len(data.EXPECTED_OUTPUT)
    assert actual_count == expected_count, (
        f"Expected {expected_count} records after upsert but got {actual_count} (check for duplicates)"
    )

    # Verify primary key was reused
    primary_keys = [row["primary_key_id"] for row in result.collect()]
    assert 1 in primary_keys, f"Expected primary key 1 to exist after upsert but got: {primary_keys}"

Step 3: Checklist

Before committing:

  • Test data file created with all constants
  • Test file created with all 4 core tests (happy path, missing lookups, empty input, upserts)
  • Module docstring added explaining test coverage
  • All tests pass: pytest test_example_table.py -v
  • Test with coverage: pytest test_example_table.py --cov=biocloudcore.enriched.example_layer.example_table

Resources