DataHub
Dbt
Metadata
Ada Draginda
Mar 21, 2023
Propagating metadata, tags, and owners outside of a DataHub dataset ingestion can be made easy.
At Notion, data powers our business decision-making and our product itself. It is therefore essential for us to (a) understand which data in our warehouse is business critical and (b) ensure our critical data is built on reliable sources. Without this understanding, we cannot effectively respond to incidents or know where to focus our reliability efforts, and our customers and internal stakeholders would lose faith in our data.
We rely on Acryl DataHub to gain these insights and ensure our critical data is reliable. DataHub is the #1 open-source data catalog for data discovery and modern governance, and Acryl's SaaS product takes it to the next level through automation and an emphasis on time-to-value. In this post, we will discuss an integration we have built on top of DataHub to make sure our business-critical datasets are always properly labeled, so we are able to respond to incidents with the most up-to-date information.
At Notion, we use a complicated formula to calculate who should own each dataset and how it should be triaged if its workflows fail. When we excitedly signed up with Acryl Data, we were faced with the challenge of migrating away from our manual processes for tagging and annotating our tables within DataHub. We were able to automate this using DataHub's robust API, which solves the problem of managing entities outside of their ingestion. However, we were met with a lot of boilerplate, so we wrote and open-sourced a small Python package that wraps the API's endpoints.
Let’s look at a simplified example of how we used the package to solve the problem…
Say we have some metadata boolean flags that describe how a dataset is used:
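For example (flag names are the ones used throughout this post; the values are made up), the flags for a single dataset might look like this once parsed into Python:

# two boolean flags describing how a dataset is used (illustrative values)
flags = {
    'is_exposed_externally': True,
    'is_used_to_drive_company_metrics': False,
}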
We want to use those flags to decide the right triage priority and owners for a given table.
But we also need to consider any downstream tables that have a higher priority, so the first step is to propagate metadata by inspecting the lineage. The best place to modify metadata is during ingestion, so we'll need a transformer to do that work for us.
A full example of how to add a metadata transformer can be found in the datahub-tools repository, but the heart of the work is summarized below.
A transformer is a custom class that is called during the ingest of your data into DataHub.
class MyPropertiesResolver(AddDatasetPropertiesResolverBase):
    def get_properties_to_add(self, entity_urn: str) -> dict[str, str]:
        # returns a dictionary of metadata that should be included during
        # ingest for the given entity_urn
        return {}
Steps that we will need to take to fill out this class:
1. Extract our DBT resources from a generated manifest.json
2. Propagate metadata along the lineage
3. Map our resources to DataHub entities
We will combine all of these steps into one function that we can call in the constructor of the class.
1. Extract our DBT Resources
Our metadata are stored within our DBT resources, so the first step is to fetch them all from a generated manifest.json. (For readers who are not familiar with DBT, the manifest contains all of the information on how to run the SQL for a given table, including all of its dependencies and metadata.) Fortunately, the datahub-tools package has a convenient function to do this work for us.
from datahub_tools.dbt import extract_dbt_resources

resources_by_unique_id: dict[str, dict] = extract_dbt_resources(
    manifest_file='/path/to/your/manifest.json'
)
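Each value in resources_by_unique_id is a dictionary of attributes pulled from the manifest. The fields we rely on below look roughly like this (an illustrative, heavily trimmed sketch; real manifest entries carry many more fields, and notion_metadata is the custom metadata store we describe shortly):

# e.g. resources_by_unique_id['model.my_table'] (illustrative only):
{
    'name': 'my_table',
    'config': {
        'notion_metadata': {
            'is_exposed_externally': True,
            'is_used_to_drive_company_metrics': False,
        },
    },
}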
2. Propagate Metadata
Again, the datahub-tools package can help significantly as it offers a convenience function that provides all downstream and upstream resources for a given resource.
from datahub_tools.dbt import get_dbt_dependencies, ModelDependency

dependencies: dict[str, ModelDependency] = get_dbt_dependencies(
    dbt_resources_by_unique_id=resources_by_unique_id
)
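Each ModelDependency lets us walk the lineage for one resource; in particular, get_all_upstream() returns the unique ids of the resources it builds on (the model names below are hypothetical):

# the DBT resources that 'model.my_table' builds on
upstream_ids: set[str] = dependencies['model.my_table'].get_all_upstream()
# e.g. {'model.raw_events', 'model.raw_users'}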
We can now propagate the metadata along:
for unique_id, resource in resources_by_unique_id.items():
    upstream_unique_ids: set[str] = dependencies[unique_id].get_all_upstream()
    # note that notion_metadata is a custom metadata store that we added to our DBT configs
    resource_metadata = resource['config']['notion_metadata']
    for upstream_unique_id in upstream_unique_ids:
        upstream_resource = resources_by_unique_id[upstream_unique_id]
        upstream_metadata = upstream_resource['config']['notion_metadata']
        propagated_metadata = propagate(upstream_metadata, resource_metadata)
        upstream_resource['config']['notion_metadata'] = propagated_metadata
Here, the propagation logic is just combining the two dictionaries of booleans with or:
def propagate(left: dict[str, bool], right: dict[str, bool]) -> dict[str, bool]:
    return {
        'is_exposed_externally':
            left['is_exposed_externally'] or right['is_exposed_externally'],
        'is_used_to_drive_company_metrics':
            left['is_used_to_drive_company_metrics']
            or right['is_used_to_drive_company_metrics'],
    }
This strategy of using or ensures that every resource has True set for a metadata entry if it or any of its downstream dependencies are set to True.
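As a quick illustration with made-up flag values, an upstream resource that isn't exposed externally itself still ends up flagged as exposed if a resource downstream of it is:

upstream = {'is_exposed_externally': False, 'is_used_to_drive_company_metrics': True}
downstream = {'is_exposed_externally': True, 'is_used_to_drive_company_metrics': False}

propagate(upstream, downstream)
# -> {'is_exposed_externally': True, 'is_used_to_drive_company_metrics': True}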
3. Map our Resources to DataHub Entities
We have our propagated metadata all within resources_by_unique_id, but this maps a DBT unique id to the resource, whereas DataHub uses the table name. (A DBT unique id is of the form type.table_name, such as model.my_table, whereas DataHub uses the table's storage location, such as production.my_table.)
metadata_by_table_name: dict[str, dict] = {
    x['name']: x['config']['notion_metadata']
    for x in resources_by_unique_id.values()
}
Putting it all together into the Transformer class:
import datahub.emitter.mce_builder as builder
from datahub.ingestion.transformer.add_dataset_properties import (
    AddDatasetPropertiesResolverBase,
)
from datahub.metadata.schema_classes import DatasetKeyClass
from datahub_tools.dbt import (
    extract_dbt_resources, get_dbt_dependencies, ModelDependency,
)


class MyPropertiesResolver(AddDatasetPropertiesResolverBase):
    def __init__(self):
        super().__init__()
        resources_by_unique_id: dict[str, dict] = extract_dbt_resources(
            manifest_file='/path/to/your/manifest.json'
        )
        dependencies: dict[str, ModelDependency] = get_dbt_dependencies(
            dbt_resources_by_unique_id=resources_by_unique_id
        )

        # propagate metadata (propagate() is the helper defined earlier)
        for unique_id, resource in resources_by_unique_id.items():
            upstream_unique_ids: set[str] = dependencies[unique_id].get_all_upstream()
            # note that notion_metadata is a custom metadata store that we added to our DBT configs
            resource_metadata = resource['config']['notion_metadata']
            for upstream_unique_id in upstream_unique_ids:
                upstream_resource = resources_by_unique_id[upstream_unique_id]
                upstream_metadata = upstream_resource['config']['notion_metadata']
                propagated_metadata = propagate(upstream_metadata, resource_metadata)
                upstream_resource['config']['notion_metadata'] = propagated_metadata

        self.metadata_by_table_name: dict[str, dict] = {
            x['name']: x['config']['notion_metadata']
            for x in resources_by_unique_id.values()
        }

    def get_properties_to_add(self, entity_urn: str) -> dict[str, str]:
        # returns a dictionary of metadata that should be included during
        # ingest for the given entity_urn
        dataset_key: DatasetKeyClass | None = builder.dataset_urn_to_key(entity_urn)
        if dataset_key is None:
            return {}
        return self.metadata_by_table_name.get(dataset_key.name, {})
There is a bit of additional boilerplate required to add this transformer to your ingest, which can be found in the official documentation, but you can now transform your metadata entries during data ingest.
Now that we have propagated and ingested metadata, we can calculate the appropriate owner and priority.
We first define how we will determine the owner and priority. In the interest of brevity, the calculation of the owner and the priority is combined into one function (even though that is not best practice).
import datahub.emitter.mce_builder as builder


def metadata_to_priority_and_owner(metadata: dict[str, str]) -> tuple[str, str | None]:
    def _get_metadata(key: str) -> bool | None:
        value = metadata.get(key)
        return value.lower().strip() == "true" if value else None

    is_exposed_externally = _get_metadata('is_exposed_externally')
    is_used_to_drive_company_metrics = _get_metadata(
        'is_used_to_drive_company_metrics'
    )

    if is_exposed_externally:
        priority_tag_urn = builder.make_tag_urn("P0")
        group_owner_urn = builder.make_group_urn("data_engineering")
    elif is_used_to_drive_company_metrics:
        priority_tag_urn = builder.make_tag_urn("P1")
        group_owner_urn = builder.make_group_urn("data_science")
    else:
        priority_tag_urn = builder.make_tag_urn("P2")
        group_owner_urn = None

    return priority_tag_urn, group_owner_urn
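As a quick check, an entity whose ingested metadata marks it as externally exposed comes back with the P0 tag and the data_engineering group (the URNs are built by DataHub's make_tag_urn and make_group_urn helpers):

metadata_to_priority_and_owner({'is_exposed_externally': 'true'})
# -> ('urn:li:tag:P0', 'urn:li:corpGroup:data_engineering')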
We now just have to fetch the metadata, pass it to the above function, and then set the owners and tags using the DataHub API. Fortunately, the datahub-tools package makes this process easier as well.
from datahub_tools.client import get_datahub_entities, DHEntity
entities: list[DHEntity] = get_datahub_entities()
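Each DHEntity returned here carries the handful of fields we use in the next snippet (shown informally; refer to the datahub-tools source for the full interface):

dh_entity = entities[0]
dh_entity.urn             # the dataset's DataHub URN
dh_entity.metadata        # the custom properties we ingested, as a dict of strings
dh_entity.get_priority()  # the currently assigned priority, or None
dh_entity.get_owner()     # the currently assigned owner, or None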
The API allows batch setting/removing of tags/owners, and so we’ll group our changes as a list of entities that need each operation, reducing the overall number of calls that need to be made.
import datahub.emitter.mce_builder as builder

add_tags: dict[str, list[DHEntity]] = {}
rem_tags: dict[str, list[DHEntity]] = {}
for dh_entity in entities:
    # priority tags are P0, P1, or P2
    # group owners are "data_engineering", "data_science", or None
    calculated_priority, calculated_owner = metadata_to_priority_and_owner(
        dh_entity.metadata
    )
    existing_priority: str | None = dh_entity.get_priority()
    existing_owner: str | None = dh_entity.get_owner()

    # Remove the priority if the new priority is None or is different
    # Add the priority if the existing priority is None or is different
    # (don't do anything if both are the same, including both being None)
    existing_priority_urn = (
        builder.make_tag_urn(existing_priority) if existing_priority else None
    )
    if calculated_priority != existing_priority_urn:
        if existing_priority_urn:
            rem_tags.setdefault(existing_priority_urn, []).append(dh_entity)
        if calculated_priority:
            add_tags.setdefault(calculated_priority, []).append(dh_entity)

    # and similarly for owners
    ...
Finally, we set/remove tags using functions from datahub-tools.
from datahub_tools.client import remove_tags, set_tags

for tag_urn, tagged_entities in rem_tags.items():
    remove_tags(tag_urns=tag_urn, resource_urns=[x.urn for x in tagged_entities])
for tag_urn, tagged_entities in add_tags.items():
    set_tags(tag_urns=tag_urn, resource_urns=[x.urn for x in tagged_entities])
# and similarly for owners, using the set_group_owner and remove_owners
# functions from datahub_tools
Above, we have shown how to inject metadata into our resources during ingestion into DataHub. We also demonstrated how to pull that metadata back down, then calculate and set owner and priority tags. Throughout, the open-source datahub-tools package removed most of the boilerplate.
We hope you found this demonstration useful and best of luck in your data cataloging!