
Common Functions in the Raw Layer

The common functions in the raw layer are mostly used for validation, cleaning and S3 interactions.

biocloudcore.raw.common_validator_functions

This file contains common validator functions used when validating raw data coming in.

column_has_type(df, column, expected_type)

Checks if the given column has the given type.

Parameters:

- df (DataFrame, required): The DataFrame to check.
- column (str, required): Name of the column to check.
- expected_type (DataType | str, required): Expected data type.

Returns:

- bool: True if the column has the expected type, False otherwise.

column_has_unique_values(column_name, df, exception_value=None)

Checks whether all the values in the given column are unique, ignoring a specific exception value if provided.

Parameters:

- column_name (str, required): Name of the column to check.
- df (DataFrame, required): The DataFrame to check.
- exception_value (str, default None): Value to ignore in the uniqueness check.

Returns:

- bool: True if all values are unique, False otherwise.
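The uniqueness rule with an exception value can be sketched in plain Python over a list of column values. This is an illustration only: `values_are_unique` is a hypothetical helper, not the library's implementation, and the real function works on a PySpark DataFrame column rather than a list.

```python
from collections.abc import Hashable, Iterable
from typing import Optional


def values_are_unique(values: Iterable[Hashable], exception_value: Optional[str] = None) -> bool:
    """Hypothetical sketch: True if every value except exception_value occurs at most once."""
    seen = set()
    for value in values:
        # The exception value (e.g. a placeholder such as "N/A") may repeat freely
        if exception_value is not None and value == exception_value:
            continue
        if value in seen:
            return False
        seen.add(value)
    return True
```

For example, `values_are_unique(["N/A", "a", "N/A"], exception_value="N/A")` is True, while the same list without an exception value is not unique.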

day_possible_in_month(day, month)

Checks whether the combination of day and month is possible.

For example, November 31 is rejected. This is especially useful when validating sheet data, where dates are often entered as free text.

Parameters:

- day (int, required): Day of the month.
- month (int, required): Month number (1-12).

Returns:

- bool: True if the day is possible in the given month, False otherwise.
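A minimal sketch of this check using the standard library's `calendar.monthrange`. Because the function receives no year, the sketch assumes February 29 should be accepted and therefore counts days against a leap year (2000); whether the real function does the same is an assumption.

```python
import calendar


def day_possible_in_month(day: int, month: int) -> bool:
    """Sketch: True if `day` can occur in `month` in at least one year."""
    if not 1 <= month <= 12:
        return False
    # Assumption: no year is given, so use a leap year to allow February 29
    _, days_in_month = calendar.monthrange(2000, month)
    return 1 <= day <= days_in_month
```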

filter_list_from_dataframe_column(spark, df, column_name, filter_data)

Filters a DataFrame based on whether values in a specific column exist in a provided list.

Parameters:

- spark (SparkSession, required): The SparkSession instance.
- df (DataFrame, required): The input DataFrame to filter.
- column_name (str, required): The name of the column to check against the list.
- filter_data (list | DataFrame, required): A DataFrame or list of values to filter by.

Returns:

- tuple[DataFrame, DataFrame]: A tuple of (outside_filter_df, within_filter_df).
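The split into two result sets can be illustrated in plain Python, with rows as dictionaries. `split_rows_by_values` is a hypothetical helper; in PySpark, one plausible equivalent is `df.filter(F.col(column_name).isin(values))` and its negation for a list, or `left_semi` and `left_anti` joins when `filter_data` is a DataFrame, but the library's actual approach is not documented here.

```python
from typing import Any, Iterable


def split_rows_by_values(
    rows: list[dict[str, Any]], column_name: str, filter_values: Iterable[Any]
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Hypothetical sketch: split rows into (outside_filter, within_filter)."""
    allowed = set(filter_values)
    outside_filter, within_filter = [], []
    for row in rows:
        # Same tuple order as the documented return value: outside first, within second
        if row[column_name] in allowed:
            within_filter.append(row)
        else:
            outside_filter.append(row)
    return outside_filter, within_filter
```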

find_non_unique_values(column_name, df)

Finds all non-unique values in the given column of the given dataframe.

Parameters:

- column_name (str, required): Name of the column to check.
- df (DataFrame, required): The DataFrame to check.

Returns:

- list: List of non-unique values.
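The idea of "non-unique values" can be sketched with `collections.Counter` on a plain list. This is illustrative only; on a Spark DataFrame a comparable result could come from `df.groupBy(column_name).count().filter("count > 1")`, and the ordering of the real function's output is not specified.

```python
from collections import Counter
from collections.abc import Hashable, Iterable


def find_non_unique_values(values: Iterable[Hashable]) -> list:
    """Sketch: return each value that appears more than once, in first-seen order."""
    counts = Counter(values)
    # Counter keeps insertion order, so duplicates come back in first-seen order
    return [value for value, count in counts.items() if count > 1]
```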

find_overlapping_values(df1, df2, column1, column2)

Checks whether any value in df1[column1] appears in df2[column2].

Parameters:

- df1 (DataFrame, required): DataFrame whose values are checked against df2.
- df2 (DataFrame, required): DataFrame that is searched for values that also occur in df1.
- column1 (str, required): Name of the column of df1 to check.
- column2 (str, required): Name of the column of df2 to check.

Returns:

- list: List of overlapping values.
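The overlap check amounts to a set intersection between the two columns. A sketch on plain lists follows; on Spark DataFrames a similar result could be obtained with `df1.select(column1).intersect(df2.select(column2))`. The helper below is hypothetical and returns each overlapping value once, in the order it first appears in the first list.

```python
from collections.abc import Hashable, Iterable


def find_overlapping_values(values1: Iterable[Hashable], values2: Iterable[Hashable]) -> list:
    """Sketch: values from values1 that also occur in values2, each listed once."""
    lookup = set(values2)
    overlap, seen = [], set()
    for value in values1:
        if value in lookup and value not in seen:
            overlap.append(value)
            seen.add(value)
    return overlap
```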

is_subset(list1, list2)

Checks if the first list or set is a subset of the second list or set.

Parameters:

- list1 (list | set, required): First list or set.
- list2 (list | set, required): Second list or set.

Returns:

- bool: True if list1 is a subset of list2, False otherwise.
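A one-line sketch of the subset test. Converting to sets means duplicates and ordering are ignored; whether the real function treats lists with duplicates the same way is an assumption.

```python
from collections.abc import Hashable, Iterable


def is_subset(list1: Iterable[Hashable], list2: Iterable[Hashable]) -> bool:
    # Sketch: set semantics, so duplicates and order in either input are ignored
    return set(list1).issubset(list2)
```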

is_valid_be_number(df, column)

Checks whether the given column contains valid BE numbers and returns a list describing the invalid ones.

BE numbers must follow this format:

- Start with BE
- Followed by a dot
- End with 7 digits

For example: BE.1234567

Parameters:

- df (DataFrame, required): The DataFrame to check.
- column (str, required): Name of the column that contains the BE numbers.

Returns:

- list[str]: List of all the reasons why the current dataset is not processable.
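The documented format translates directly into a regular expression. The sketch below applies it to a plain list; the message wording and the treatment of missing values (`None` counted as invalid) are assumptions. In PySpark, the same pattern could be applied with `F.col(column).rlike("^BE\\.[0-9]{7}$")`.

```python
import re

# "BE", a literal dot, then exactly 7 ASCII digits ([0-9] avoids matching non-ASCII digits)
BE_NUMBER_PATTERN = re.compile(r"BE\.[0-9]{7}")


def find_invalid_be_numbers(values: list) -> list[str]:
    """Sketch: return one (hypothetical) error message per invalid BE number."""
    errors = []
    for value in values:
        # Assumption: missing values are reported as invalid
        if value is None or not BE_NUMBER_PATTERN.fullmatch(value):
            errors.append(f"Invalid BE number: {value!r}")
    return errors
```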

validate_free_text_date_and_time_columns(df, date_columns=None, time_columns=None)

Validates date and time columns in a DataFrame by checking their format and validity. This is especially useful when validating sheet data where dates and times are free text input.

For dates, the function checks that values conform to 'dd/mm/yyyy' and that the day and month values are within their valid ranges. For times, it checks that values conform to 'h:mm', 'hh:mm', 'h:mm:ss', or 'hh:mm:ss' and that the hours, minutes, and seconds are within their valid ranges.

If any issues are found, error messages are generated specifying the exact problem.

Parameters:

- df (DataFrame, required): The input DataFrame containing the data to be validated.
- date_columns (list[str], default None): Names of the columns that represent date fields to be validated.
- time_columns (list[str], default None): Names of the columns that represent time fields to be validated.

Returns:

- list[str]: Error messages describing any issues found in the format or validity of the date and/or time values in the specified columns. Each message includes the column name and the problematic value.
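A plain-Python sketch of the format and range checks described above, with columns passed as a `{column_name: [values]}` mapping instead of a DataFrame. Several details are assumptions: dates require exactly two-digit day and month, days are range-checked as 1-31 (not per month), hours as 0-23, minutes and seconds as 0-59, and the message wording is hypothetical.

```python
import re

DATE_PATTERN = re.compile(r"([0-9]{2})/([0-9]{2})/([0-9]{4})")            # dd/mm/yyyy
TIME_PATTERN = re.compile(r"([0-9]{1,2}):([0-9]{2})(?::([0-9]{2}))?")      # h:mm[:ss] or hh:mm[:ss]


def validate_date_and_time_values(columns, date_columns=None, time_columns=None):
    """Sketch: return error messages for invalid free-text dates and times."""
    errors = []
    for column in date_columns or []:
        for value in columns[column]:
            match = DATE_PATTERN.fullmatch(value)
            if not match:
                errors.append(f"{column}: '{value}' is not in dd/mm/yyyy format")
                continue
            day, month = int(match.group(1)), int(match.group(2))
            if not 1 <= day <= 31:
                errors.append(f"{column}: '{value}' has an invalid day")
            if not 1 <= month <= 12:
                errors.append(f"{column}: '{value}' has an invalid month")
    for column in time_columns or []:
        for value in columns[column]:
            match = TIME_PATTERN.fullmatch(value)
            if not match:
                errors.append(f"{column}: '{value}' is not in h:mm, hh:mm, h:mm:ss or hh:mm:ss format")
                continue
            hours, minutes = int(match.group(1)), int(match.group(2))
            seconds = int(match.group(3)) if match.group(3) else 0
            if not 0 <= hours <= 23:
                errors.append(f"{column}: '{value}' has invalid hours")
            if not 0 <= minutes <= 59:
                errors.append(f"{column}: '{value}' has invalid minutes")
            if not 0 <= seconds <= 59:
                errors.append(f"{column}: '{value}' has invalid seconds")
    return errors
```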

biocloudcore.raw.common_cleaning_functions

This file contains common cleaning functions for cleaning raw data.

columns_to_snake_case(df)

Converts all column names in a DataFrame to snake_case.

Parameters:

- df (DataFrame, required): The input PySpark DataFrame to be converted. Can contain columns in camelCase, PascalCase, kebab-case, dot.case, space case, or combinations of these.

Returns:

- DataFrame: The DataFrame with column names formatted in snake_case.
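The renaming of a single column name can be sketched with a few regular expressions that cover the listed input styles. `to_snake_case` is a hypothetical helper; the library's exact rules (for example, around digits or acronyms) may differ.

```python
import re


def to_snake_case(name: str) -> str:
    """Sketch: convert camelCase, PascalCase, kebab-case, dot.case or space case to snake_case."""
    # Replace runs of spaces, dashes and dots with a single underscore
    name = re.sub(r"[\s\-.]+", "_", name.strip())
    # Split an acronym from a following capitalized word: "HTTPResponse" -> "HTTP_Response"
    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    # Split a lowercase letter or digit from a following capital: "camelCase" -> "camel_Case"
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    # Collapse repeated underscores and lowercase everything
    return re.sub(r"_+", "_", name).lower()
```

Applied to a PySpark DataFrame, the renaming could then be done in one pass with `df.toDF(*[to_snake_case(c) for c in df.columns])`.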

flatten_nested_structs(df)

Flatten a DataFrame by expanding nested StructType columns into separate columns.

This function recursively flattens all the nested struct columns in the given DataFrame. For each struct column, each field inside the struct is extracted into its own column. The resulting DataFrame will have no nested structs.

Parameters:

- df (DataFrame, required): The input PySpark DataFrame with potentially nested StructType columns.

Returns:

- DataFrame: A new DataFrame where all nested structs are flattened into individual columns.

Example

Given a DataFrame with the following schema:

    root
     |-- id: integer (nullable = true)
     |-- name: string (nullable = true)
     |-- address: struct (nullable = true)
     |    |-- city: string (nullable = true)
     |    |-- state: string (nullable = true)
     |-- contact: struct (nullable = true)
     |    |-- email: string (nullable = true)
     |    |-- phone: struct (nullable = true)
     |    |    |-- home: string (nullable = true)
     |    |    |-- mobile: string (nullable = true)

Input DataFrame:

    +---+----+--------------+--------------------------------------+
    | id|name|       address|                               contact|
    +---+----+--------------+--------------------------------------+
    |  1|John|{New York, NY}|{john@example.com, {123-456, 987-654}}|
    +---+----+--------------+--------------------------------------+

After applying flatten_nested_structs(df), the DataFrame will be:

Output DataFrame:

    +---+----+------------+-------------+----------------+------------------+--------------------+
    | id|name|address_city|address_state|   contact_email|contact_phone_home|contact_phone_mobile|
    +---+----+------------+-------------+----------------+------------------+--------------------+
    |  1|John|    New York|           NY|john@example.com|           123-456|             987-654|
    +---+----+------------+-------------+----------------+------------------+--------------------+
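The column naming in the example (parent and child field names joined with underscores, recursively) can be reproduced on a plain Python dictionary. This is only an illustration of the naming scheme; `flatten_record` is hypothetical, and the real function works on the Spark schema rather than on row values.

```python
from typing import Any


def flatten_record(record: dict[str, Any], parent_key: str = "", sep: str = "_") -> dict[str, Any]:
    """Sketch: flatten nested dicts into one level, joining keys with `sep`."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        column = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested structs, e.g. contact -> phone -> home
            flat.update(flatten_record(value, column, sep))
        else:
            flat[column] = value
    return flat
```

Called on the example row (`{"id": 1, "name": "John", "address": {...}, "contact": {...}}`), it yields the seven columns shown in the output DataFrame.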

flatten_nested_structs_and_arrays(df)

This function is similar to flatten_nested_structs, but it also flattens nested arrays of dictionaries.

Returns:

- DataFrame: The flattened DataFrame.

biocloudcore.raw.common_bucket_functions

This file contains common bucket functions used for interacting with S3 buckets.

find_metadata_file_path(s3_client, bucket_name, file_prefix, entity)

Find a metadata file for a given entity in the specified S3 bucket and prefix and return the S3 path to the file.

Parameters:

- s3_client (S3Client, required): The S3 service client used to list objects.
- bucket_name (str, required): The name of the S3 bucket.
- file_prefix (str, required): The prefix path within the S3 bucket.
- entity (str, required): The entity name to search for in the metadata files.

Returns:

- str: The S3 path to the metadata file.

Raises:

- Exception: If the metadata file for the specified entity is not found.

find_metadata_files(s3_client, bucket_name, file_prefix)

Find all metadata files in the specified S3 bucket and prefix and return the S3 paths to the files.

Parameters:

- s3_client (S3Client, required): The S3 service client used to list objects.
- bucket_name (str, required): The name of the S3 bucket.
- file_prefix (str, required): The prefix path within the S3 bucket.

Returns:

- list: List of S3 paths to the metadata files.
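Both metadata lookups can be sketched on top of boto3's `list_objects_v2` paginator. The file naming convention used here (`<entity>_metadata.json`) is a hypothetical assumption, since the actual convention is not documented; only the pagination calls and the "raise if not found" behavior follow from the descriptions above.

```python
from collections.abc import Iterator

# Hypothetical naming convention: metadata files are called "<entity>_metadata.json"
METADATA_SUFFIX = "_metadata.json"


def _list_keys(s3_client, bucket_name: str, file_prefix: str) -> Iterator[str]:
    """Yield every object key under file_prefix, following pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=file_prefix):
        for obj in page.get("Contents", []):
            yield obj["Key"]


def find_metadata_files(s3_client, bucket_name: str, file_prefix: str) -> list[str]:
    """Sketch: S3 paths of all objects under the prefix that look like metadata files."""
    return [
        f"s3://{bucket_name}/{key}"
        for key in _list_keys(s3_client, bucket_name, file_prefix)
        if key.endswith(METADATA_SUFFIX)
    ]


def find_metadata_file_path(s3_client, bucket_name: str, file_prefix: str, entity: str) -> str:
    """Sketch: S3 path of the metadata file for one entity, or raise if absent."""
    for path in find_metadata_files(s3_client, bucket_name, file_prefix):
        # Hypothetical rule: the file name is exactly "<entity>_metadata.json"
        if path.rsplit("/", 1)[-1] == f"{entity}{METADATA_SUFFIX}":
            return path
    raise Exception(f"Metadata file for entity '{entity}' not found in s3://{bucket_name}/{file_prefix}")
```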

get_latest_dump_directory(s3_client, landing_zone_bucket, dump_folder)

Get the latest timestamped folder from a dump folder.

Each dump retrieved from a source is saved to its own timestamped folder. This function finds the folder with the latest timestamp and returns its path as a string.

Parameters:

- s3_client (S3Client, required): S3 client used to interact with S3.
- landing_zone_bucket (str, required): Name of the bucket where the dump is stored.
- dump_folder (str, required): Folder where the dump is stored.

Returns:

- str: The absolute path to the latest timestamped folder.
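A sketch of how the latest dump folder could be found: list the immediate sub-folders of `dump_folder` using `Delimiter="/"` (S3 then returns them as `CommonPrefixes`) and take the maximum. This assumes the timestamps sort lexicographically in chronological order (e.g. ISO 8601 style), that "absolute path" means an `s3://` URI, and that raising `ValueError` when no folder exists is acceptable; none of these details are confirmed by the documentation.

```python
def get_latest_dump_directory(s3_client, landing_zone_bucket: str, dump_folder: str) -> str:
    """Sketch: s3:// path of the newest timestamped folder under dump_folder."""
    prefix = dump_folder.rstrip("/") + "/"
    paginator = s3_client.get_paginator("list_objects_v2")
    folders: list[str] = []
    # Delimiter="/" makes S3 return immediate sub-folders as CommonPrefixes
    for page in paginator.paginate(Bucket=landing_zone_bucket, Prefix=prefix, Delimiter="/"):
        folders.extend(p["Prefix"] for p in page.get("CommonPrefixes", []))
    if not folders:
        # Assumed error handling for an empty dump folder
        raise ValueError(f"No timestamped folders found under s3://{landing_zone_bucket}/{prefix}")
    # Assumes timestamps sort lexicographically, e.g. "2024-03-01T08-00-00"
    return f"s3://{landing_zone_bucket}/{max(folders)}"
```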

biocloudcore.raw.common_s3_functions

This file contains common S3 functions used for interacting with data on S3.

find_metadata_file_path(s3_client, bucket_name, file_prefix, entity)

Find a metadata file for a given entity in the specified S3 bucket and prefix and return the S3 path to the file.

Parameters:

- s3_client (S3Client, required): The S3 service client used to list objects.
- bucket_name (str, required): The name of the S3 bucket.
- file_prefix (str, required): The prefix path within the S3 bucket.
- entity (str, required): The entity name to search for in the metadata files.

Returns:

- str: The S3 path to the metadata file.

Raises:

- Exception: If the metadata file for the specified entity is not found.

find_metadata_files(s3_client, bucket_name, file_prefix)

Find all metadata files in the specified S3 bucket and prefix and return the S3 paths to the files.

Parameters:

- s3_client (S3Client, required): The S3 service client used to list objects.
- bucket_name (str, required): The name of the S3 bucket.
- file_prefix (str, required): The prefix path within the S3 bucket.

Returns:

- list: List of S3 paths to the metadata files.

get_device_folders_with_payload(s3_client, source, bucket, prefix, device_limit)

Retrieve the device folders containing payload data to process from the specified S3 bucket.

Parameters:

- s3_client (S3Client, required): The S3 client instance used for interacting with the S3 service.
- source (str, required): A descriptive source name, used for logging.
- bucket (str, required): The name of the S3 bucket where the device folders are located.
- prefix (str, required): The S3 prefix to filter folders by (e.g., a base folder path).
- device_limit (int | None, required): The maximum number of device folders to process. If None, all folders are processed.

Returns:

- list[str]: A list of device folder paths to process.
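A hedged sketch of the folder selection. It assumes each device folder is an immediate sub-prefix of `prefix`, and that a folder "has payload" if its first listing page contains at least one object that is not a zero-byte folder marker (a key ending in "/"). Both rules are assumptions; the sketch mainly shows how `device_limit=None` can mean "no limit" and where `source` fits in as a logging label.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def get_device_folders_with_payload(
    s3_client, source: str, bucket: str, prefix: str, device_limit: Optional[int]
) -> list[str]:
    """Sketch: up to device_limit device folder prefixes that contain payload objects."""
    paginator = s3_client.get_paginator("list_objects_v2")
    device_folders: list[str] = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        device_folders.extend(p["Prefix"] for p in page.get("CommonPrefixes", []))

    with_payload: list[str] = []
    for folder in device_folders:
        # Stop once the limit is reached (device_limit=None means no limit)
        if device_limit is not None and len(with_payload) >= device_limit:
            break
        # Assumption: only the first listing page (up to 1000 keys) is inspected
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=folder)
        if any(not obj["Key"].endswith("/") for obj in response.get("Contents", [])):
            with_payload.append(folder)

    logger.info("%s: %d device folder(s) with payload to process", source, len(with_payload))
    return with_payload
```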

move_files_to_processed_folder(s3_client, max_upload_workers, source_bucket_name, uploaded_files_df)

Move uploaded files to the 'processed' folder in the same source bucket.

Parameters:

- s3_client (S3Client, required): The S3 client instance used for interacting with the S3 service.
- max_upload_workers (int, required): The maximum number of threads that can upload simultaneously.
- source_bucket_name (str, required): The name of the source S3 bucket where the files are stored.
- uploaded_files_df (DataFrame, required): A DataFrame of uploaded file metadata (including source paths).
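S3 has no rename operation, so a "move" is typically a `copy_object` followed by a `delete_object`, which can be parallelized with a thread pool bounded by `max_upload_workers`. The sketch below takes a plain list of source keys; in practice they might be collected from `uploaded_files_df`, e.g. with `[row["source_path"] for row in uploaded_files_df.select("source_path").collect()]`, where `source_path` is a hypothetical column name. The `processed/<key>` destination layout is likewise an assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def move_keys_to_processed(s3_client, max_upload_workers: int, source_bucket_name: str, source_keys: list[str]) -> list[str]:
    """Sketch: copy each key to processed/<key> in the same bucket, then delete the original."""

    def move(key: str) -> str:
        destination = f"processed/{key}"  # hypothetical destination layout
        # copy_object handles objects up to 5 GB; larger files need a multipart copy
        s3_client.copy_object(
            Bucket=source_bucket_name,
            Key=destination,
            CopySource={"Bucket": source_bucket_name, "Key": key},
        )
        s3_client.delete_object(Bucket=source_bucket_name, Key=key)
        return destination

    with ThreadPoolExecutor(max_workers=max_upload_workers) as executor:
        # list() waits for all moves and re-raises the first exception, if any
        return list(executor.map(move, source_keys))
```

Deleting only after a successful copy means a failed copy leaves the original file in place, so a rerun can pick it up again.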