11import json
2+ import os
23import pickle
34from abc import abstractmethod
45from typing import Any , Optional , Type
6+ from urllib .parse import urljoin
57
68import dagster ._check as check
79import pandas as pd
1012 DagsterInvariantViolationError ,
1113 InitResourceContext ,
1214 InputContext ,
15+ MetadataValue ,
1316 OutputContext ,
1417)
1518from dagster ._core .storage .upath_io_manager import UPathIOManager
1619from dagster ._utils import PICKLE_PROTOCOL
20+ from django .urls import reverse
1721from pydantic .fields import Field
1822from upath import UPath
1923
2024
21- class PickleFilesystemIOManager (UPathIOManager ):
25+ class DownloadAssetMixin :
26+ """
27+ Mixin that adds download URL of the asset to the metadata.
28+ """
29+
30+ def get_download_url (self , context : OutputContext ) -> str :
31+ if not context or not getattr (context , "has_asset_key" , False ) or not context .has_asset_key :
32+ return ""
33+
34+ asset_name = context .asset_key .path [- 1 ]
35+ root_url = os .getenv ("BASELINE_EXPLORER_API_ROOT_URL" , "http://localhost:8000" )
36+
37+ kwargs = {"asset_name" : asset_name }
38+ if context .has_partition_key :
39+ kwargs ["partition_name" ] = context .partition_key
40+ relative_url = reverse ("asset_download_partitioned" , kwargs = kwargs )
41+ else :
42+ relative_url = reverse ("asset_download" , kwargs = kwargs )
43+
44+ return urljoin (root_url , relative_url )
45+
46+ def handle_output (self , context : OutputContext , obj ):
47+ download_url = self .get_download_url (context )
48+ if download_url :
49+ context .add_output_metadata (
50+ {
51+ "download" : MetadataValue .url (download_url ),
52+ }
53+ )
54+ super ().handle_output (context , obj )
55+
56+
57+ class PickleFilesystemIOManager (DownloadAssetMixin , UPathIOManager ):
2258 """
2359 Dagster I/O Manager that serializes Python objects to files using Pickle.
2460
@@ -57,7 +93,7 @@ def load_from_path(self, context: InputContext, path: "UPath") -> Any:
5793 return pickle .load (file )
5894
5995
60- class JSONFilesystemIOManager (UPathIOManager ):
96+ class JSONFilesystemIOManager (DownloadAssetMixin , UPathIOManager ):
6197 """
6298 Dagster I/O Manager that serializes Python objects to JSON files.
6399 """
@@ -77,7 +113,7 @@ def load_from_path(self, context: InputContext, path: "UPath") -> Any:
77113 return json .loads (file .read ())
78114
79115
80- class DataFrameCSVFilesystemIOManager (UPathIOManager ):
116+ class DataFrameCSVFilesystemIOManager (DownloadAssetMixin , UPathIOManager ):
81117 """
82118 Dagster I/O Manager that serializes DataFrames to CSV files.
83119 """
@@ -95,7 +131,7 @@ def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
95131 return pd .read_csv (path )
96132
97133
98- class DataFrameExcelFilesystemIOManager (UPathIOManager ):
134+ class DataFrameExcelFilesystemIOManager (DownloadAssetMixin , UPathIOManager ):
99135 """
100136 Dagster I/O Manager that serializes DataFrames to Excel .xlsx using the OpenPyXL engine.
101137
0 commit comments