Common Functions in the Raw Layer
The common functions in the raw layer are mostly used for validation, cleaning and S3 interactions.
biocloudcore.raw.common_validator_functions
This file contains common functions for validating incoming raw data.
column_has_type(df, column, expected_type)
Checks if the given column has the given type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The DataFrame to check. | required |
| column | str | Column name to check. | required |
| expected_type | DataType \| str | Expected data type. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the column has the expected type, False otherwise. |
column_has_unique_values(column_name, df, exception_value=None)
Checks whether all the values in the given column are unique, ignoring a specific exception value if provided.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| column_name | str | Column name to check. | required |
| df | DataFrame | The DataFrame to check. | required |
| exception_value | str | Value to ignore in the uniqueness check. | None |
Returns:
| Type | Description |
|---|---|
| bool | True if all values are unique, False otherwise. |
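To make the rule concrete, here is a minimal plain-Python sketch of the same check on a list of values. It is not the library's PySpark implementation; the helper name `values_are_unique` is hypothetical, and it assumes that when `exception_value` is None no value is ignored.

```python
from typing import Hashable, Iterable, Optional


def values_are_unique(values: Iterable[Hashable], exception_value: Optional[Hashable] = None) -> bool:
    """Hypothetical helper: True if no value repeats, ignoring exception_value."""
    # Drop the exception value (e.g. a placeholder such as "N/A") before checking.
    kept = [v for v in values if exception_value is None or v != exception_value]
    # A list has no duplicates exactly when its length equals the size of its set.
    return len(kept) == len(set(kept))
```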
day_possible_in_month(day, month)
Checks whether the combination of day and month is possible.
For example, November 31 is rejected. This is especially useful when validating sheet data, where dates are often entered as free text.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| day | int | Day of the month. | required |
| month | int | Month number (1-12). | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the day is possible in the given month, False otherwise. |
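A minimal sketch of the day/month rule, assuming February 29 is accepted because no year is passed in. The name `day_possible_in_month_sketch` is hypothetical; how the library itself treats leap days is not documented here.

```python
# Maximum day per month. February allows 29 because no year is given
# (assumption: leap days are accepted).
MAX_DAYS_IN_MONTH = {
    1: 31, 2: 29, 3: 31, 4: 30, 5: 31, 6: 30,
    7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31,
}


def day_possible_in_month_sketch(day: int, month: int) -> bool:
    """Hypothetical helper: True if `day` can occur in `month` (1-12)."""
    # Months outside 1-12 are rejected outright.
    if month not in MAX_DAYS_IN_MONTH:
        return False
    return 1 <= day <= MAX_DAYS_IN_MONTH[month]
```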
filter_list_from_dataframe_column(spark, df, column_name, filter_data)
Filters a DataFrame based on whether values in a specific column exist in a provided list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | The SparkSession instance. | required |
| df | DataFrame | The input DataFrame to filter. | required |
| column_name | str | The name of the column to check against the list. | required |
| filter_data | list \| DataFrame | A DataFrame or list of values to filter by. | required |
Returns:
| Type | Description |
|---|---|
| tuple[DataFrame, DataFrame] | A tuple of (outside_filter_df, within_filter_df): the rows whose value is not in filter_data, followed by the rows whose value is. |
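The tuple order (rows outside the filter first, rows within it second) is easy to get backwards. The following plain-Python sketch models rows as dictionaries and uses a hypothetical helper name; it is an illustration of the split, not the library's code.

```python
from typing import Any, Iterable


def split_by_membership(rows: list[dict[str, Any]], column_name: str, filter_data: Iterable[Any]):
    """Hypothetical helper: return (outside_filter, within_filter), in the documented order."""
    allowed = set(filter_data)
    # Rows whose value is NOT in the filter list come first ...
    outside_filter = [row for row in rows if row[column_name] not in allowed]
    # ... followed by the rows whose value IS in the filter list.
    within_filter = [row for row in rows if row[column_name] in allowed]
    return outside_filter, within_filter
```

On a PySpark DataFrame the same split can be expressed with `Column.isin`, e.g. `df.filter(~F.col(column_name).isin(values))` and `df.filter(F.col(column_name).isin(values))`.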
find_non_unique_values(column_name, df)
Finds all non-unique values in the given column of the given dataframe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| column_name | str | Column name to check. | required |
| df | DataFrame | The DataFrame to check. | required |
Returns:
| Type | Description |
|---|---|
| list | List of non-unique values. |
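A plain-Python sketch of finding non-unique values with `collections.Counter`. The helper name is hypothetical, and returning the duplicates in first-seen order is an assumption, not documented library behaviour.

```python
from collections import Counter
from typing import Hashable, Iterable


def non_unique_values(values: Iterable[Hashable]) -> list:
    """Hypothetical helper: return each value that occurs more than once."""
    counts = Counter(values)
    # Counter keeps insertion order, so duplicates come out in first-seen order.
    return [value for value, count in counts.items() if count > 1]
```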
find_overlapping_values(df1, df2, column1, column2)
Finds the values in df1[column1] that also appear in df2[column2].
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df1 | DataFrame | DataFrame whose values are checked against df2. | required |
| df2 | DataFrame | DataFrame checked for values that also occur in df1. | required |
| column1 | str | Name of the column of df1 to check. | required |
| column2 | str | Name of the column of df2 to check. | required |
Returns:
| Type | Description |
|---|---|
| list | List of overlapping values. |
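Conceptually, the overlap is a set intersection. A minimal plain-Python sketch, with a hypothetical helper name; returning the distinct overlapping values in sorted order is an assumption.

```python
from typing import Hashable, Iterable


def overlapping_values(values1: Iterable[Hashable], values2: Iterable[Hashable]) -> list:
    """Hypothetical helper: distinct values of values1 that also appear in values2, sorted."""
    # Set intersection keeps each shared value once.
    return sorted(set(values1) & set(values2))
```

On PySpark DataFrames, a similar result could be obtained with `df1.select(column1).intersect(df2.select(column2))`, which also returns distinct values; `intersect` matches columns by position, so differing column names are not a problem.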
is_subset(list1, list2)
Checks if the first list or set is a subset of the second list or set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| list1 | list \| set | First list or set. | required |
| list2 | list \| set | Second list or set. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if list1 is a subset of list2, False otherwise. |
is_valid_be_number(df, column)
Checks whether the given column contains valid BE numbers and returns a list of the invalid ones.
BE numbers must follow this format:
- Start with BE
- Followed by a dot
- End with 7 digits

For example: BE.1234567
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The DataFrame to check. | required |
| column | str | Name of the column that contains the BE numbers. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | List of all the reasons why the current dataset is not processable. |
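The format rule translates directly into a regular expression. This sketch assumes the check is case-sensitive and that non-string values (such as nulls) count as invalid. It returns the offending values rather than the library's error messages, and the helper name is hypothetical.

```python
import re
from typing import Any, Iterable

# "BE", a literal dot, then exactly seven ASCII digits, e.g. BE.1234567.
BE_NUMBER_PATTERN = re.compile(r"BE\.[0-9]{7}")


def invalid_be_numbers(values: Iterable[Any]) -> list:
    """Hypothetical helper: return the values that are not valid BE numbers."""
    # fullmatch anchors the pattern at both ends, so extra characters are rejected.
    # Matching is case-sensitive (assumption), and non-strings such as None are invalid.
    return [v for v in values if not (isinstance(v, str) and BE_NUMBER_PATTERN.fullmatch(v))]
```

In PySpark, a comparable column-level test is `F.col(column).rlike(r"^BE\.[0-9]{7}$")`; the `^` and `$` anchors are needed because `rlike` matches anywhere in the string.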
validate_free_text_date_and_time_columns(df, date_columns=None, time_columns=None)
Validates date and time columns in a DataFrame by checking their format and validity. This is especially useful when validating sheet data where dates and times are free text input.
For dates, the function verifies that values conform to 'dd/mm/yyyy' and that the day and month values are within their valid ranges.
For times, it verifies that values conform to 'h:mm', 'hh:mm', 'h:mm:ss', or 'hh:mm:ss' and that the hours, minutes, and seconds are within their valid ranges.
If any issues are found, error messages are generated specifying the exact problem.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The input DataFrame containing the data to be validated. | required |
| date_columns | list[str] | Names of the columns that represent date fields to be validated. | None |
| time_columns | list[str] | Names of the columns that represent time fields to be validated. | None |
Returns:
| Type | Description |
|---|---|
| list[str] | Error messages describing any issues found in the format or validity of the date and/or time values in the specified columns. Each message includes the column name and the problematic value. |
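A per-value sketch of these checks, using regular expressions for the accepted formats. The helper names and the message wording are hypothetical; the sketch only checks that days are in 1-31 and months in 1-12, and does not claim to reproduce the library's exact rules.

```python
import re

# dd/mm/yyyy: two-digit day and month, four-digit year.
DATE_PATTERN = re.compile(r"([0-9]{2})/([0-9]{2})/([0-9]{4})")
# h:mm, hh:mm, h:mm:ss or hh:mm:ss (the seconds group is optional).
TIME_PATTERN = re.compile(r"([0-9]{1,2}):([0-9]{2})(?::([0-9]{2}))?")


def date_errors(column: str, value: str) -> list[str]:
    """Hypothetical helper: error messages for one free-text date value."""
    match = DATE_PATTERN.fullmatch(value)
    if match is None:
        return [f"{column}: '{value}' is not in dd/mm/yyyy format"]
    day, month = int(match.group(1)), int(match.group(2))
    errors = []
    if not 1 <= month <= 12:
        errors.append(f"{column}: month {month} out of range in '{value}'")
    if not 1 <= day <= 31:
        errors.append(f"{column}: day {day} out of range in '{value}'")
    return errors


def time_errors(column: str, value: str) -> list[str]:
    """Hypothetical helper: error messages for one free-text time value."""
    match = TIME_PATTERN.fullmatch(value)
    if match is None:
        return [f"{column}: '{value}' is not in h:mm, hh:mm, h:mm:ss or hh:mm:ss format"]
    hours, minutes = int(match.group(1)), int(match.group(2))
    # Missing seconds are treated as 0.
    seconds = int(match.group(3)) if match.group(3) is not None else 0
    errors = []
    if not 0 <= hours <= 23:
        errors.append(f"{column}: hour {hours} out of range in '{value}'")
    if not 0 <= minutes <= 59:
        errors.append(f"{column}: minute {minutes} out of range in '{value}'")
    if not 0 <= seconds <= 59:
        errors.append(f"{column}: second {seconds} out of range in '{value}'")
    return errors
```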
biocloudcore.raw.common_cleaning_functions
This file contains common cleaning functions for cleaning raw data.
columns_to_snake_case(df)
Converts all column names in a DataFrame to snake case.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The input PySpark DataFrame to be converted. Column names can be in camelCase, PascalCase, kebab-case, dot.case, space case, or combinations of these. | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | The DataFrame with column names formatted in snake case. |
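One way to implement such a conversion is with a few regular-expression passes over each column name. This is a sketch under assumptions (hypothetical helper `to_snake_case`; acronyms such as `ID` are kept together), not the library's implementation.

```python
import re


def to_snake_case(name: str) -> str:
    """Hypothetical helper: convert one column name to snake_case."""
    # kebab-case, dot.case and space case: turn separators into underscores.
    s = re.sub(r"[\s.\-]+", "_", name.strip())
    # Acronym followed by a capitalised word: "HTTPServer" -> "HTTP_Server".
    s = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", s)
    # Lowercase letter or digit followed by a capital: "firstName" -> "first_Name".
    s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s)
    # Collapse repeated underscores and lowercase everything.
    return re.sub(r"_+", "_", s).lower()
```

Applied to a PySpark DataFrame, the renamed columns could be set with `df.toDF(*[to_snake_case(c) for c in df.columns])`.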
flatten_nested_structs(df)
Flatten a DataFrame by expanding nested StructType columns into separate columns.
This function recursively flattens all the nested struct columns in the given DataFrame. For each struct column, each field inside the struct is extracted into its own column. The resulting DataFrame will have no nested structs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The input PySpark DataFrame with potentially nested StructType columns. | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | A new DataFrame where all nested structs are flattened into individual columns. |
Example
Given the following example DataFrame schema:
```
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- contact: struct (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- phone: struct (nullable = true)
 |    |    |-- home: string (nullable = true)
 |    |    |-- mobile: string (nullable = true)
```
Input DataFrame:
```
+---+----+--------------+--------------------------------------+
| id|name|       address|                               contact|
+---+----+--------------+--------------------------------------+
|  1|John|{New York, NY}|{john@example.com, {123-456, 987-654}}|
+---+----+--------------+--------------------------------------+
```
After applying the function flatten_nested_structs(df), the DataFrame will be:
Output DataFrame:
```
+---+----+------------+-------------+----------------+------------------+--------------------+
| id|name|address_city|address_state|   contact_email|contact_phone_home|contact_phone_mobile|
+---+----+------------+-------------+----------------+------------------+--------------------+
|  1|John|    New York|           NY|john@example.com|           123-456|             987-654|
+---+----+------------+-------------+----------------+------------------+--------------------+
```
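The naming rule in this example (parent and child field names joined with `_`) can be sketched on a single row represented as nested dictionaries. This is an illustration with a hypothetical helper, not the library's PySpark code.

```python
from typing import Any


def flatten_record(record: dict[str, Any], parent: str = "", sep: str = "_") -> dict[str, Any]:
    """Hypothetical helper: recursively flatten nested dicts, joining keys with `sep`."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # A nested struct: recurse and prefix its fields with the parent name.
            flat.update(flatten_record(value, name, sep))
        else:
            flat[name] = value
    return flat
```

On a real DataFrame, one common approach walks `df.schema` recursively and selects each leaf field as, for example, `F.col("contact.phone.home").alias("contact_phone_home")`.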
flatten_nested_structs_and_arrays(df)
Similar to flatten_nested_structs, but also flattens nested arrays of dictionaries.
Returns:
| Type | Description |
|---|---|
| DataFrame | The flattened DataFrame. |
biocloudcore.raw.common_bucket_functions
This file contains common bucket functions used for interacting with S3 buckets.
find_metadata_file_path(s3_client, bucket_name, file_prefix, entity)
Find a metadata file for a given entity in the specified S3 bucket and prefix and return the S3 path to the file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_client | S3Client | The S3 service client used to list objects. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| file_prefix | str | The prefix path within the S3 bucket. | required |
| entity | str | The entity name to search for in the metadata files. | required |
Returns:
| Type | Description |
|---|---|
| str | The S3 path to the metadata file. |
Raises:
| Type | Description |
|---|---|
| Exception | If the metadata file for the specified entity is not found. |
find_metadata_files(s3_client, bucket_name, file_prefix)
Find all metadata files in the specified S3 bucket and prefix and return the S3 paths to the files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_client | S3Client | The S3 service client used to list objects. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| file_prefix | str | The prefix path within the S3 bucket. | required |
Returns:
| Type | Description |
|---|---|
| list | List of S3 paths to the metadata files. |
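A sketch of how metadata files could be located with boto3's `list_objects_v2` paginator. The `_metadata.json` suffix, the `s3://bucket/key` path format, and the helper names are hypothetical assumptions; the library's actual naming convention is not documented here. `FileNotFoundError` is used as a concrete subclass of the documented `Exception`.

```python
from typing import Any

# Hypothetical naming convention: metadata files are named "<entity>_metadata.json".
METADATA_SUFFIX = "_metadata.json"


def list_metadata_paths(s3_client: Any, bucket_name: str, file_prefix: str) -> list[str]:
    """Hypothetical helper: s3:// paths of all metadata files under the prefix."""
    paths = []
    # A single list_objects_v2 call returns at most 1,000 keys, so use the paginator.
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=file_prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(METADATA_SUFFIX):
                paths.append(f"s3://{bucket_name}/{obj['Key']}")
    return paths


def metadata_path_for_entity(s3_client: Any, bucket_name: str, file_prefix: str, entity: str) -> str:
    """Hypothetical helper: metadata path for `entity`, raising if none is found."""
    expected_name = f"{entity}{METADATA_SUFFIX}"
    for path in list_metadata_paths(s3_client, bucket_name, file_prefix):
        if path.rsplit("/", 1)[-1] == expected_name:
            return path
    raise FileNotFoundError(f"No metadata file for entity '{entity}' under s3://{bucket_name}/{file_prefix}")
```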
get_latest_dump_directory(s3_client, landing_zone_bucket, dump_folder)
Get the latest timestamped folder from a dump folder.
Every time we retrieve a dump from a source, we save that dump to a timestamped folder. This function will find the folder with the latest timestamp and return that as a string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_client | S3Client | S3 client to interact with S3. | required |
| landing_zone_bucket | str | Name of the bucket where the dump is stored. | required |
| dump_folder | str | Folder where the dump is stored. | required |
Returns:
| Type | Description |
|---|---|
| str | The absolute path to the latest timestamped folder. |
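A sketch of picking the newest folder from a delimiter listing. It assumes the response comes from `s3_client.list_objects_v2(Bucket=landing_zone_bucket, Prefix=dump_folder, Delimiter="/")` (a single call returns at most 1,000 entries) and that folder names use a hypothetical `%Y%m%d_%H%M%S` timestamp format; the helper name is also hypothetical.

```python
from datetime import datetime
from typing import Any, Optional

# Hypothetical folder naming, e.g. "20240501_120000".
TIMESTAMP_FORMAT = "%Y%m%d_%H%M%S"


def latest_timestamped_prefix(list_response: dict[str, Any], timestamp_format: str = TIMESTAMP_FORMAT) -> Optional[str]:
    """Hypothetical helper: the CommonPrefixes entry with the newest timestamp, or None."""
    latest_prefix, latest_time = None, None
    for entry in list_response.get("CommonPrefixes", []):
        prefix = entry["Prefix"]
        # "dumps/src/20240501_120000/" -> "20240501_120000"
        folder = prefix.rstrip("/").rsplit("/", 1)[-1]
        try:
            timestamp = datetime.strptime(folder, timestamp_format)
        except ValueError:
            continue  # skip folders that are not timestamped
        if latest_time is None or timestamp > latest_time:
            latest_prefix, latest_time = prefix, timestamp
    return latest_prefix
```

Parsing the timestamps, rather than taking the lexicographic maximum of the folder names, keeps the choice correct for formats such as `dd-mm-yyyy` that do not sort correctly as strings.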
biocloudcore.raw.common_s3_functions
This file contains common S3 functions used for interacting with data on S3.
find_metadata_file_path(s3_client, bucket_name, file_prefix, entity)
Find a metadata file for a given entity in the specified S3 bucket and prefix and return the S3 path to the file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_client | S3Client | The S3 service client used to list objects. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| file_prefix | str | The prefix path within the S3 bucket. | required |
| entity | str | The entity name to search for in the metadata files. | required |
Returns:
| Type | Description |
|---|---|
| str | The S3 path to the metadata file. |
Raises:
| Type | Description |
|---|---|
| Exception | If the metadata file for the specified entity is not found. |
find_metadata_files(s3_client, bucket_name, file_prefix)
Find all metadata files in the specified S3 bucket and prefix and return the S3 paths to the files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_client | S3Client | The S3 service client used to list objects. | required |
| bucket_name | str | The name of the S3 bucket. | required |
| file_prefix | str | The prefix path within the S3 bucket. | required |
Returns:
| Type | Description |
|---|---|
| list | List of S3 paths to the metadata files. |
get_device_folders_with_payload(s3_client, source, bucket, prefix, device_limit)
Retrieve the device folders containing payload data to process from the specified S3 bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_client | S3Client | The S3 client instance used to interact with the S3 service. | required |
| source | str | A descriptive source name, used for logging. | required |
| bucket | str | The name of the S3 bucket where the device folders are located. | required |
| prefix | str | The S3 prefix to filter folders by (e.g., a base folder path). | required |
| device_limit | int \| None | The maximum number of device folders to process. If None, all folders are processed. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of device folder paths to process. |
move_files_to_processed_folder(s3_client, max_upload_workers, source_bucket_name, uploaded_files_df)
Move uploaded files to the 'processed' folder in the same source bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_client | S3Client | The S3 client instance used to interact with the S3 service. | required |
| max_upload_workers | int | The maximum number of threads that can upload simultaneously. | required |
| source_bucket_name | str | The name of the source S3 bucket where the files are stored. | required |
| uploaded_files_df | DataFrame | A DataFrame of uploaded file metadata (including source paths). | required |
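S3 has no rename operation, so a move is a copy followed by a delete. A sketch of doing this in parallel, assuming the object keys have already been collected from `uploaded_files_df` and that the destination is a hypothetical `processed/` prefix in the same bucket; the helper name is also hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable

# Hypothetical destination prefix inside the same bucket.
PROCESSED_PREFIX = "processed/"


def move_to_processed(s3_client: Any, max_workers: int, bucket: str, keys: Iterable[str]) -> list[str]:
    """Hypothetical helper: move each key to PROCESSED_PREFIX + key; return the new keys."""

    def move_one(key: str) -> str:
        new_key = f"{PROCESSED_PREFIX}{key}"
        # Copy the object to its new key, then delete the original.
        s3_client.copy_object(Bucket=bucket, Key=new_key, CopySource={"Bucket": bucket, "Key": key})
        s3_client.delete_object(Bucket=bucket, Key=key)
        return new_key

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map returns results in input order; collecting them re-raises any failed move.
        return list(pool.map(move_one, keys))
```

boto3 low-level clients can be shared across threads, which is why a single `s3_client` is passed to the pool. `copy_object` handles objects up to 5 GB; larger files would need a multipart copy, for example via the client's managed `copy` method.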