This section takes about 10 minutes to complete.
Data Testing
By now, you have probably noticed that a number of tests were used throughout the previous exercises. They took the shape of either a (1) unit test or (2) an E2E (end-to-end) test. Just as in traditional software development, there is a time and place for each kind of test. Tests increase the readability and accountability of your transformations, help your colleagues (or your future self!) understand what was implemented in the past, and ensure that existing functionality is not broken.
We often use both to ensure the full functionality of a data application. Unit tests verify the behaviour of a small piece of code, making deliberate assumptions about the starting shape of the data, whereas E2E tests exercise a flow from start to finish to ensure that the individual pieces, when put together, produce the expected behaviour. Per the testing pyramid, there are usually many more unit tests than E2E tests, since E2E tests are more expensive and slower to run.
Unit Tests
Take, for example, the following transformation code. It first filters a DataFrame on action and message type, unpacks a JSON string column into a struct, selects/drops certain columns, and then casts a timestamp column (currently a String) into a timestamp type. For explanatory purposes, this is not too complex, but it exhibits a variety of behaviours.
from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

input_df.filter((input_df.action == "StartTransaction") & (input_df.message_type == 2)). \
    withColumn("new_body", from_json(col("body"), StructType([
        StructField("connector_id", IntegerType(), True),
        StructField("id_tag", StringType(), True),
        StructField("meter_start", IntegerType(), True),
        StructField("timestamp", StringType(), True),
        StructField("reservation_id", IntegerType(), True),
    ]))). \
    withColumn("connector_id", col("new_body.connector_id")). \
    withColumn("id_tag", col("new_body.id_tag")). \
    withColumn("meter_start", col("new_body.meter_start")). \
    withColumn("timestamp", col("new_body.timestamp")). \
    withColumn("reservation_id", col("new_body.reservation_id")). \
    drop("new_body"). \
    drop("body"). \
    withColumn("timestamp", to_timestamp(col("timestamp")))
Would you be able to write a single, simple unit test that covers the entirety of this logic?
If you answered no, you're correct. While you could hypothetically write one giant unit test, it would be terribly long, would miss edge cases, and would be a nightmare to debug. The better approach is to break the logic up and write a variety of unit tests against the pieces. Doing so ensures that when new transformations are added, it is easy to understand what each one does (a self-describing function name helps) and to debug when something goes wrong.
We would need to break down this logic into several testable parts. For example, we'll first extract the filtering logic into its own function:
from pyspark.sql import DataFrame

def filter_by_start_transaction_and_request(input_df: DataFrame) -> DataFrame:
    return input_df.filter((input_df.action == "StartTransaction") & (input_df.message_type == 2))
Note that the input AND output are both DataFrames.
A unit test provides a mock of the incoming data and tests ONLY that the expected data is returned, namely the number of resulting records and the values of significant columns.
The following test is written in the format used in the previous Databricks exercises, which might look different from invocations via testing frameworks like Pytest.
import pandas as pd
from typing import Callable
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def test_filter_by_start_transaction_and_request_unit(spark, f: Callable):
    input_pandas = pd.DataFrame([
        {
            "action": "StartTransaction",
            "message_type": 2,
            "charge_point_id": "123"
        },
        {
            "action": "StartTransaction",
            "message_type": 3,
            "charge_point_id": "123"
        },
    ])
    input_df = spark.createDataFrame(
        input_pandas,
        StructType([
            StructField("action", StringType()),
            StructField("message_type", IntegerType()),
            StructField("charge_point_id", StringType()),
        ])
    )

    result = input_df.transform(f)
    print("Transformed DF")
    result.show()

    result_count = result.count()
    expected_count = 1
    assert result_count == expected_count, f"Expected {expected_count}, but got {result_count}"

    result_data = [(x.action, x.message_type) for x in result.collect()]
    expected_data = [("StartTransaction", 2)]
    assert result_data == expected_data, f"Expected {expected_data}, but got {result_data}"

    print("All tests pass! :)")

test_filter_by_start_transaction_and_request_unit(spark, filter_by_start_transaction_and_request)
#=> ^^^ This call could be replaced by a testing framework, such as Pytest.
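For illustration, here is a minimal sketch of what a Pytest equivalent might look like. The spark fixture and layout below are assumptions (the fixture would typically live in a conftest.py), not part of the exercises:

import pytest
from pyspark.sql import SparkSession

# A hypothetical Pytest version (a sketch under assumptions, not the exercises' code)
@pytest.fixture(scope="session")
def spark():
    # Build a small local SparkSession shared across the test session
    return SparkSession.builder.master("local[1]").appName("unit-tests").getOrCreate()

def test_filter_by_start_transaction_and_request(spark):
    # Pytest injects the spark fixture and discovers this test by its name.
    # Note: the helper above would need renaming (e.g. check_filter_...) so
    # that Pytest does not try to collect it as a test itself.
    test_filter_by_start_transaction_and_request_unit(spark, filter_by_start_transaction_and_request)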
In our production code, we can call our custom function via the transform function.
df.transform(filter_by_start_transaction_and_request)
The transform function is powerful in that it allows us to chain custom DataFrame functions (which are, in turn, independent and testable).
df.transform(filter_by_start_transaction_and_request).transform(some_other_awesome_function)
We would then proceed with testing the remainder of the logic in a similar fashion.
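For instance, the JSON-unpacking step could be extracted into its own function. The function name below is hypothetical, but the schema is taken from the original transformation:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# A hypothetical extraction of the next step (a sketch, not the exercises' exact code)
def unpack_json_from_body(input_df: DataFrame) -> DataFrame:
    # Parse the raw JSON string in `body` into a typed struct column
    body_schema = StructType([
        StructField("connector_id", IntegerType(), True),
        StructField("id_tag", StringType(), True),
        StructField("meter_start", IntegerType(), True),
        StructField("timestamp", StringType(), True),
        StructField("reservation_id", IntegerType(), True),
    ])
    return input_df.withColumn("new_body", from_json(col("body"), body_schema))

Its unit test would follow the same pattern as above: mock a DataFrame containing a body JSON string and assert on the unpacked values.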
E2E Tests
An E2E test works similarly, but instead of a single transformation function (as with unit tests), we test the output of the entire transformation journey, from the start to a designated target/end. Two important aspects differentiate E2E tests from unit tests:
- E2E tests make an assertion on the output of multiple functions called together, for example df.transform(filter_by_start_transaction_and_request).transform(some_other_awesome_function), as opposed to unit tests, which test a single function.
- In order to test the "full journey", including all transformation functions, the dataset under test might need to be more comprehensive and shaped closer to production data to capture its nuances, as opposed to unit test data, which is heavily mocked to test exactly one aspect of the logic.
For E2E tests, some Data Engineers choose to use real production data. This is dangerous, because production data can contain confidential and personal information (which would then be checked into version control). On the other hand, testing data that is only somewhat production-like could result in inaccurate or missing logic because it misses the nuances of production data. The sensible default is to first truly understand the data and its nature, and then create a production-like representation that contains those nuances. Constructing such a representation takes time: the longer you work with the data, the more you learn about it. Even so, it is orders of magnitude better practice (and safer!) than using real production data for your E2E tests.
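As an illustrative sketch, such production-like data might be hand-crafted with fabricated values (every value below is invented for demonstration, merely mimicking the shape of real records):

import pandas as pd

# A sketch of hand-crafted, production-like test data; all values are fabricated
e2e_input_pandas = pd.DataFrame([
    {
        "action": "StartTransaction",
        "message_type": 2,
        "charge_point_id": "430ca2a2-54a6-4247-9adf-d86300231c62",  # fabricated ID
        "body": '{"connector_id": 1, "id_tag": "ea068c10", "meter_start": 0, '
                '"timestamp": "2022-01-01T08:00:00+00:00", "reservation_id": null}',
    },
    # ... further records covering the edge cases observed in production
])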
Let's say we'd like to test the journey, with our E2E data under test stored in df:
df.transform(filter_by_start_transaction_and_request).transform(some_other_awesome_function)
The following test is written in the format used in the previous Databricks exercises, which might look different from invocations via testing frameworks like Pytest.
Our E2E test might look like:
def test_e2e(result_df: DataFrame):
    result_count = result_df.count()
    expected_count = 2599
    assert result_count == expected_count, f"Expected {expected_count}, but got {result_count}"

    result_action = [x.action for x in result_df.select("action").distinct().collect()]
    expected_action = ["StartTransaction"]
    assert result_action == expected_action, f"Expected {expected_action}, but got {result_action}"

    result_message_type = [x.message_type for x in result_df.select("message_type").distinct().collect()]
    expected_message_type = [2]
    assert result_message_type == expected_message_type, f"Expected {expected_message_type}, but got {result_message_type}"

    print("All tests pass! :)")
test_e2e(df.transform(filter_by_start_transaction_and_request).transform(some_other_awesome_function))
#=> ^^^ This call could be replaced by a testing framework, such as Pytest.
Our E2E test takes the full transformation of our production-like dataset and validates the expected count and the expected significant values in the DataFrame (similar to the unit test, the difference being that the unit test uses mocked data, devoid of most other columns). Because E2E tests are often expensive (in time and compute) to run, they are used sparingly. In the previous exercises, we ran E2E tests after every function definition and unit test for the benefit of learning, but in reality, we might abstain from running them at that frequency.