Dumps of Twitch data

This example is based on data capturing statistics and properties of popular Twitch channels. The setup is such that we have two data sets 'of the same kind' but from different points in time.

In other words, a 'version' of the data set represents a temporal notion. For example, version 1 might stem from the end of March and version 2 from the end of April. Moreover, we will assume that version 1 has been vetted and approved with the help of manual investigation and domain knowledge. Version 2 has just been made available. We would like to use it but can't be sure of its validity just yet. As a consequence, we would like to assess the quality of the data in version 2.

In order to have a Postgres database instance to begin with, it might be useful to use our script, which spins up a dockerized Postgres database:

./start_postgres.sh

The original data set can be found on Kaggle. For the sake of this tutorial, we slightly process it and provide two versions of it. One can either recreate this by executing this processing script oneself on the original data or download our processed files (version 1 and version 2) right away.

Once both versions of the data exist, they can be uploaded to the database. We provide an uploading script that creates and populates one table per version of the data in a Postgres database. It resembles the following:

import os

import sqlalchemy as sa

# df_v1 and df_v2 are the pandas DataFrames holding the two processed versions.
address = os.environ.get("DB_ADDR", "localhost")
connection_string = f"postgresql://datajudge:datajudge@{address}:5432/datajudge"
engine = sa.create_engine(connection_string)
df_v2.to_sql("twitch_v2", engine, schema="public", if_exists="replace")
df_v1.to_sql("twitch_v1", engine, schema="public", if_exists="replace")

Once the tables are stored in a database, we can actually write a datajudge specification against them. But first, we'll have a look at what the data roughly looks like by investigating a random sample of four rows:

A sample of the data

channel   watch_time  stream_time  peak_viewers  average_viewers  followers  followers_gained  views_gained  partnered  mature  language
xQcOW     6196161750  215250       222720        27716            3246298    1734810           93036735      True       False   English
summit1g  6091677300  211845       310998        25610            5310163    1374810           89705964      True       False   English
Gaules    5644590915  515280       387315        10976            1767635    1023779           102611607     True       True    Portuguese
ESL_CSGO  3970318140  517740       300575        7714             3944850    703986            106546942     True       False   English

Note that we expect both version 1 and version 2 to follow this structure. Since they were assembled at different points in time, only their rows should differ.

Now let's write an actual specification expressing our expectations against the data. First, we need to make sure a connection to the database can be established at test execution time. How exactly this is done depends on how you set up your database. When using our default setup with the dockerized Postgres database from above, this would look as follows:

import os
import pytest
import sqlalchemy as sa


@pytest.fixture(scope="module")
def datajudge_engine():
    address = os.environ.get("DB_ADDR", "localhost")
    connection_string = f"postgresql://datajudge:datajudge@{address}:5432/datajudge"
    return sa.create_engine(connection_string)
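
The fixture reads the database host from the DB_ADDR environment variable and falls back to localhost. As a small illustration of that fallback, the following sketch isolates the string assembly in a helper. The name build_connection_string and the host db.example.internal are hypothetical and not part of datajudge or of the scripts mentioned above.

```python
import os


def build_connection_string(default_host: str = "localhost") -> str:
    """Assemble the Postgres URL used in this example.

    The host is read from the DB_ADDR environment variable and falls back
    to `default_host` when the variable is unset.
    """
    address = os.environ.get("DB_ADDR", default_host)
    return f"postgresql://datajudge:datajudge@{address}:5432/datajudge"


# Without DB_ADDR set, the local default is used.
os.environ.pop("DB_ADDR", None)
local_url = build_connection_string()

# With DB_ADDR set, the same code targets another host.
os.environ["DB_ADDR"] = "db.example.internal"  # hypothetical host name
remote_url = build_connection_string()
os.environ.pop("DB_ADDR", None)  # clean up so later code is unaffected
```

This keeps the specification itself unchanged between a local run and a run against a database on another host.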

Once a way to connect to the database is defined, we want to declare our data sources and express expectations against them. In this example, we have two tables in the same database - one table per version of the Twitch data.

Let's start with a straightforward example that only uses version 2. Our domain knowledge tells us that the values of the language column should contain only letters and have a length strictly greater than 0.

from datajudge import WithinRequirement


# Postgres' default database.
db_name = "tempdb"
# Postgres' default schema.
schema_name = "public"

within_requirement = WithinRequirement.from_table(
    table_name="twitch_v2",
    schema_name=schema_name,
    db_name=db_name,
)
within_requirement.add_varchar_regex_constraint(
    column="language",
    regex="^[a-zA-Z]+$",
)
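
To get a feeling for what this regular expression accepts, it can be tried out in plain Python on a handful of values. This is only a sketch: it uses Python's re module rather than the database's regex engine, the helper violating_fraction is made up for illustration, and the sample values are hypothetical apart from being modelled on the language column.

```python
import re

# Same pattern as in the constraint: one or more ASCII letters, nothing else.
LANGUAGE_PATTERN = re.compile(r"^[a-zA-Z]+$")


def violating_fraction(values):
    """Return the share of values breaking the pattern and the offenders."""
    violations = [v for v in values if LANGUAGE_PATTERN.match(v) is None]
    return len(violations) / len(values), violations


# Hypothetical sample: two valid names, one with digits, one empty string.
sample = ["English", "Portuguese", "Sw3d1zh", ""]
fraction, counterexamples = violating_fraction(sample)
```

Here the value containing digits and the empty string both fail, the latter because + requires at least one character.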

Done! Now onto comparisons between the table representing the approved version 1 of the data and the to be assessed version 2 of the data.

from datajudge import BetweenRequirement, Condition

between_requirement_version = BetweenRequirement.from_tables(
    db_name1=db_name,
    db_name2=db_name,
    schema_name1=schema_name,
    schema_name2=schema_name,
    table_name1="twitch_v1",
    table_name2="twitch_v2",
)
between_requirement_version.add_column_subset_constraint()
between_requirement_version.add_column_superset_constraint()
columns = ["channel", "partnered", "mature"]
between_requirement_version.add_row_subset_constraint(
    columns1=columns, columns2=columns, constant_max_missing_fraction=0
)
between_requirement_version.add_row_matching_equality_constraint(
    matching_columns1=["channel"],
    matching_columns2=["channel"],
    comparison_columns1=["language"],
    comparison_columns2=["language"],
    max_missing_fraction=0,
)

between_requirement_version.add_ks_2sample_constraint(
    column1="average_viewers",
    column2="average_viewers",
    significance_level=0.05,
)
between_requirement_version.add_uniques_equality_constraint(
    columns1=["language"],
    columns2=["language"],
)
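
The add_ks_2sample_constraint call asks whether average_viewers follows the same distribution in both versions. As a rough illustration of the idea behind a two-sample Kolmogorov-Smirnov test, and not of how datajudge computes it, the sketch below measures the largest gap between two empirical distribution functions and compares it to a large-sample rejection threshold. The helper names are made up, the shifted sample is hypothetical, and the threshold approximation is crude for samples as small as these.

```python
import math
from bisect import bisect_right


def ks_statistic(sample1, sample2):
    """Largest absolute gap between the two empirical CDFs."""
    sorted1, sorted2 = sorted(sample1), sorted(sample2)
    n, m = len(sorted1), len(sorted2)
    gap = 0.0
    # The gap can only change at observed values, so checking those suffices.
    for x in set(sorted1) | set(sorted2):
        cdf1 = bisect_right(sorted1, x) / n
        cdf2 = bisect_right(sorted2, x) / m
        gap = max(gap, abs(cdf1 - cdf2))
    return gap


def ks_critical_value(n, m, significance_level):
    """Asymptotic threshold above which equality of distributions is rejected."""
    c_alpha = math.sqrt(-math.log(significance_level / 2) / 2)
    return c_alpha * math.sqrt((n + m) / (n * m))


# average_viewers from the four sample rows shown earlier.
viewers_v1 = [27716, 25610, 10976, 7714]
# Hypothetical second version in which every channel gained 100000 viewers.
viewers_shifted = [v + 100_000 for v in viewers_v1]

same = ks_statistic(viewers_v1, viewers_v1)
shifted = ks_statistic(viewers_v1, viewers_shifted)
threshold = ks_critical_value(len(viewers_v1), len(viewers_shifted), 0.05)
rejected = shifted > threshold
```

Identical samples give a statistic of 0, while the fully shifted sample gives the maximum of 1 and exceeds the threshold at a significance level of 0.05.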

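Having compared the 'same kind of data' between version 1 and version 2, we may as well compare 'different kinds of data' within version 2 as a sanity check. This sanity check consists of checking that the mean average_viewers value of mature channels deviates by at most 10% from the overall mean. Note that the tolerance is passed as max_absolute_deviation=0.1, which, going by the parameter name and the failure message further below, bounds the absolute difference between the two means.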

between_requirement_columns = BetweenRequirement.from_tables(
    db_name1=db_name,
    db_name2=db_name,
    schema_name1=schema_name,
    schema_name2=schema_name,
    table_name1="twitch_v2",
    table_name2="twitch_v2",
)

between_requirement_columns.add_numeric_mean_constraint(
    column1="average_viewers",
    column2="average_viewers",
    condition1=None,
    condition2=Condition(raw_string="mature IS TRUE"),
    max_absolute_deviation=0.1,
)
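
What the two conditions select can be illustrated in memory with the four sample rows shown earlier. This is a sketch of the comparison only, not of the SQL that datajudge issues: condition1=None corresponds to all rows, and the mature IS TRUE condition corresponds to the filtered subset.

```python
from statistics import mean

# The four sample rows from above, reduced to the relevant columns.
rows = [
    {"channel": "xQcOW", "average_viewers": 27716, "mature": False},
    {"channel": "summit1g", "average_viewers": 25610, "mature": False},
    {"channel": "Gaules", "average_viewers": 10976, "mature": True},
    {"channel": "ESL_CSGO", "average_viewers": 7714, "mature": False},
]

# No condition: mean over every row.
overall_mean = mean(r["average_viewers"] for r in rows)
# Condition "mature IS TRUE": mean over the mature channels only.
mature_mean = mean(r["average_viewers"] for r in rows if r["mature"])

absolute_deviation = abs(overall_mean - mature_mean)
```

On these four rows the two means already differ by several thousand viewers, so a tolerance of 0.1 on the absolute difference would not be met.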

Lastly, we need to collect all of our requirements in a list and make sure pytest can find them by calling collect_data_tests.

from datajudge.pytest_integration import collect_data_tests

requirements = [
    within_requirement,
    between_requirement_version,
    between_requirement_columns,
]
test_func = collect_data_tests(requirements)

If we then test these expectations against the data by running

pytest specification.py

where specification.py contains all of the code outlined before (you can find it here), we see that the new version of the data is not quite on par with what we'd expect:

$ pytest twitch_specification.py
================================== test session starts ===================================
platform darwin -- Python 3.10.5, pytest-7.1.2, pluggy-1.0.0
rootdir: /Users/kevin/Code/datajudge/docs/source/examples
plugins: html-3.1.1, cov-3.0.0, metadata-2.0.2
collected 8 items

twitch_specification.py F.....FF                                                   [100%]

======================================== FAILURES ========================================
____________________ test_func[VarCharRegex::tempdb.public.twitch_v2] ____________________

constraint = <datajudge.constraints.varchar.VarCharRegex object at 0x10855da20>
datajudge_engine = Engine(postgresql://datajudge:***@localhost:5432/datajudge)

@pytest.mark.parametrize(
    "constraint", all_constraints, ids=Constraint.get_description
)
def test_constraint(constraint, datajudge_engine):
    test_result = constraint.test(datajudge_engine)
>       assert test_result.outcome, test_result.failure_message
E       AssertionError: tempdb.public.twitch_v2's column(s) 'language' breaks regex
        '^[a-zA-Z]+$' in 0.045454545454545456 > 0.0 of the cases. In absolute terms, 1
    of the 22 samples violated the regex. Some counterexamples consist of the
    following: ['Sw3d1zh'].

../../../src/datajudge/pytest_integration.py:25: AssertionError
____________ test_func[UniquesEquality::public.twitch_v1 | public.twitch_v2] _____________

constraint = <datajudge.constraints.uniques.UniquesEquality object at 0x10855d270>
datajudge_engine = Engine(postgresql://datajudge:***@localhost:5432/datajudge)

@pytest.mark.parametrize(
    "constraint", all_constraints, ids=Constraint.get_description
)
def test_constraint(constraint, datajudge_engine):
    test_result = constraint.test(datajudge_engine)
>       assert test_result.outcome, test_result.failure_message
E       AssertionError: tempdb.public.twitch_v1's column(s) 'language' doesn't have
        the element(s) '{'Sw3d1zh'}' when compared with the reference values.

../../../src/datajudge/pytest_integration.py:25: AssertionError
______________ test_func[NumericMean::public.twitch_v2 | public.twitch_v2] _______________

constraint = <datajudge.constraints.numeric.NumericMean object at 0x1084e1810>
datajudge_engine = Engine(postgresql://datajudge:***@localhost:5432/datajudge)

@pytest.mark.parametrize(
    "constraint", all_constraints, ids=Constraint.get_description
)
def test_constraint(constraint, datajudge_engine):
    test_result = constraint.test(datajudge_engine)
>       assert test_result.outcome, test_result.failure_message
E       AssertionError: tempdb.public.twitch_v2's column(s) 'average_viewers' has
        mean 4734.9780000000000000, deviating more than 0.1 from
    tempdb.public.twitch_v2's column(s) 'average_viewers''s
    3599.9826086956521739. Condition on second table: WHERE mature IS TRUE

../../../src/datajudge/pytest_integration.py:25: AssertionError
================================ short test summary info =================================
FAILED twitch_specification.py::test_func[VarCharRegex::tempdb.public.twitch_v2] - Asse...
FAILED twitch_specification.py::test_func[UniquesEquality::public.twitch_v1 | public.twitch_v2]
FAILED twitch_specification.py::test_func[NumericMean::public.twitch_v2 | public.twitch_v2]
============================== 3 failed, 5 passed in 1.52s ===============================

Alternatively, you can look at these test results in this HTML report generated by pytest-html.

Hence we see that we might not want to blindly trust version 2 of the data as is. Rather, we might need to investigate what is wrong with the data, what caused it and how to fix it.

Concretely, what exactly do we learn from the error messages?

  • The column language now has a row with the value 'Sw3d1zh'. This breaks two of our constraints. The VarCharRegex constraint compared the column's values to a regular expression. The UniquesEquality constraint expected the unique values of the language column not to have changed between version 1 and version 2.
  • The mean value of average_viewers of mature channels is substantially lower than the global mean, by more than our 10% tolerance.
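
The figures quoted in the failure messages can be checked by hand. The short calculation below uses only numbers copied from the pytest output above; the variable names are chosen for illustration.

```python
# Means reported in the NumericMean failure message.
overall_mean = 4734.978
mature_mean = 3599.9826086956521739

# Gap between the two means, in viewers and relative to the overall mean.
absolute_deviation = abs(overall_mean - mature_mean)
relative_deviation = absolute_deviation / overall_mean

# VarCharRegex failure: 1 of the 22 sampled values violated the regex.
regex_violation_fraction = 1 / 22

print(f"absolute deviation: {absolute_deviation:.2f}")
print(f"relative deviation: {relative_deviation:.1%}")
print(f"regex violations:   {regex_violation_fraction:.4f}")
```

The mature mean lies roughly 1135 viewers, or about 24%, below the overall mean, and the regex violation fraction matches the 0.0454... shown in the output.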