1+ import multiprocessing
2+ import os
3+ import logging
4+ from pathlib import Path
5+ import concurrent .futures
6+ from typing import List , TYPE_CHECKING , Union , cast , Optional
7+
8+ from humanloop .types import FileType , PromptResponse , AgentResponse , ToolResponse , DatasetResponse , EvaluatorResponse , FlowResponse
9+ from humanloop .core .api_error import ApiError
10+
11+ if TYPE_CHECKING :
12+ from humanloop .base_client import BaseHumanloop
13+
# Module-level logger: plain messages (no timestamps/levels) on the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(message)s")
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
# Attach the handler only if none is configured yet, so re-importing this
# module (or an application-configured logger) doesn't duplicate output.
if not logger.hasHandlers():
    logger.addHandler(console_handler)
22+
class SyncClient:
    """Client for managing synchronization between local filesystem and Humanloop."""

    def __init__(
        self,
        client: "BaseHumanloop",
        base_dir: str = "humanloop",
        max_workers: Optional[int] = None,
    ):
        """
        Parameters
        ----------
        client: Humanloop client instance
        base_dir: Base directory for synced files (default: "humanloop")
        max_workers: Maximum number of worker threads (default: CPU count * 2)
        """
        self.client = client
        self.base_dir = Path(base_dir)
        # Pulling is I/O-bound (HTTP + disk writes), so oversubscribe threads
        # relative to the core count.
        self.max_workers = max_workers or multiprocessing.cpu_count() * 2

    def _save_serialized_file(self, serialized_content: str, file_path: str, file_type: FileType) -> None:
        """Save serialized file to local filesystem.

        Args:
            serialized_content: The content to save
            file_path: The path (relative to ``base_dir``) where to save the file
            file_type: The type of file (prompt or agent), used as the extension

        Raises:
            OSError: If the directory or file cannot be created or written.
        """
        try:
            # Create full path including base_dir prefix
            full_path = self.base_dir / file_path
            # Create directory if it doesn't exist
            full_path.parent.mkdir(parents=True, exist_ok=True)

            # Replace any existing extension with the file-type extension,
            # e.g. "team/welcome" -> "team/welcome.prompt".
            target_path = full_path.parent / f"{full_path.stem}.{file_type}"

            # Fix: write UTF-8 explicitly rather than the platform default
            # encoding, so non-ASCII serialized content round-trips everywhere.
            target_path.write_text(serialized_content, encoding="utf-8")
            logger.info("Syncing %s %s", file_type, file_path)
        except Exception as e:
            logger.error("Failed to sync %s %s: %s", file_type, file_path, e)
            raise

    def _process_file(
        self,
        file: Union[PromptResponse, AgentResponse, ToolResponse, DatasetResponse, EvaluatorResponse, FlowResponse],
    ) -> None:
        """Process a single file by serializing and saving it.

        Only "prompt" and "agent" files are handled; other types are skipped
        with a warning instead of raising.

        Args:
            file: The file to process (must be a PromptResponse or AgentResponse)

        Raises:
            Exception: Re-raised from serialization/saving failures so the
                caller can record this file as failed.
        """
        try:
            # Skip if not a prompt or agent
            if file.type not in ("prompt", "agent"):
                logger.warning("Skipping unsupported file type: %s", file.type)
                return

            # Narrow the union for the type checker; casts have no runtime effect.
            if file.type == "prompt":
                file = cast(PromptResponse, file)
            else:
                file = cast(AgentResponse, file)

            # Serialize the file based on its type. The guard above already
            # restricted the type, so a two-way dispatch is sufficient.
            try:
                if file.type == "prompt":
                    serialized = self.client.prompts.serialize(id=file.id)
                else:
                    serialized = self.client.agents.serialize(id=file.id)
            except ApiError as e:
                # The SDK returns the YAML content in the error body when it
                # can't parse the response as JSON, so a "200 error" is success.
                if e.status_code == 200:
                    serialized = e.body
                else:
                    raise
            except Exception as e:
                logger.error("Failed to serialize %s %s: %s", file.type, file.id, e)
                raise

            # Save to local filesystem
            self._save_serialized_file(serialized, file.path, file.type)

        except Exception as e:
            logger.error("Error processing file %s: %s", file.path, e)
            raise

    def pull(self) -> List[str]:
        """Sync prompt and agent files from Humanloop to local filesystem.

        Pages through all prompt/agent files in the workspace and processes
        them concurrently on a thread pool. A failure on one file does not
        stop the others.

        Returns:
            List of successfully processed file paths
        """
        successful_files: List[str] = []
        failed_files: List[str] = []

        # Thread pool: each file is serialized+saved independently.
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            page = 1

            # Fetch pages sequentially, fanning each page's records out to the pool.
            while True:
                try:
                    response = self.client.files.list_files(type=["prompt", "agent"], page=page)
                    if not response.records:
                        break

                    # Submit each file for processing
                    for file in response.records:
                        futures.append((file.path, executor.submit(self._process_file, file)))

                    page += 1
                except Exception as e:
                    # Stop paging on error but still drain already-submitted work.
                    logger.error("Failed to fetch page %s: %s", page, e)
                    break

            # Wait for all tasks to complete and partition paths by outcome.
            for file_path, future in futures:
                try:
                    future.result()
                    successful_files.append(file_path)
                except Exception as e:
                    failed_files.append(file_path)
                    logger.error("Task failed for %s: %s", file_path, e)

        # Log summary
        if successful_files:
            logger.info("\nSynced %d files", len(successful_files))
        if failed_files:
            logger.error("Failed to sync %d files", len(failed_files))

        return successful_files