-
Notifications
You must be signed in to change notification settings - Fork 73
[#746] establish default order for replicas listed by an iRODSDataObject
#815
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -6,37 +6,37 @@ | |||||
| import os | ||||||
| import weakref | ||||||
| from typing import Any, List, Type | ||||||
| from irods.models import DataObject, Collection | ||||||
| from irods.manager import Manager | ||||||
| from irods.manager._internal import _api_impl, _logical_path | ||||||
| from irods.message import ( | ||||||
| iRODSMessage, | ||||||
| FileOpenRequest, | ||||||
| ObjCopyRequest, | ||||||
| StringStringMap, | ||||||
| DataObjInfo_for_session, | ||||||
| ModDataObjMeta_for_session, | ||||||
| DataObjChksumRequest, | ||||||
| DataObjChksumResponse, | ||||||
| RErrorStack, | ||||||
| STR_PI, | ||||||
| ) | ||||||
|
|
||||||
| import irods.client_configuration as client_config | ||||||
| import irods.exception as ex | ||||||
| import irods.keywords as kw | ||||||
| import irods.parallel as parallel | ||||||
| from irods.api_number import api_number | ||||||
| from irods.collection import iRODSCollection | ||||||
| from irods.data_object import ( | ||||||
| iRODSDataObject, | ||||||
| iRODSDataObjectFileRaw, | ||||||
| chunks, | ||||||
| irods_dirname, | ||||||
| irods_basename, | ||||||
| irods_dirname, | ||||||
| iRODSDataObject, | ||||||
| iRODSDataObjectFileRaw, | ||||||
| ) | ||||||
| import irods.client_configuration as client_config | ||||||
| import irods.keywords as kw | ||||||
| import irods.parallel as parallel | ||||||
| from irods.manager import Manager | ||||||
| from irods.manager._internal import _api_impl, _logical_path | ||||||
| from irods.message import ( | ||||||
| STR_PI, | ||||||
| DataObjChksumRequest, | ||||||
| DataObjChksumResponse, | ||||||
| DataObjInfo_for_session, | ||||||
| FileOpenRequest, | ||||||
| ModDataObjMeta_for_session, | ||||||
| ObjCopyRequest, | ||||||
| RErrorStack, | ||||||
| StringStringMap, | ||||||
| iRODSMessage, | ||||||
| ) | ||||||
| from irods.models import Collection, DataObject | ||||||
| from irods.parallel import deferred_call | ||||||
|
|
||||||
|
|
||||||
| logger = logging.getLogger(__name__) | ||||||
|
|
||||||
| _update_types: List[Type] = [] | ||||||
|
|
@@ -218,27 +218,29 @@ | |||||
| if size is not None and isinstance(open_options, dict): | ||||||
| open_options[kw.DATA_SIZE_KW] = size | ||||||
|
|
||||||
| def _download(self, obj, local_path, num_threads, updatables=(), **options): | ||||||
| def _download(self, obj_path, local_path, num_threads, updatables=(), **options): | ||||||
| """Transfer the contents of a data object to a local file. | ||||||
|
|
||||||
| Called from get() when a local path is named. | ||||||
| """ | ||||||
| if os.path.isdir(local_path): | ||||||
| local_file = os.path.join(local_path, irods_basename(obj)) | ||||||
| else: | ||||||
| local_file = local_path | ||||||
|
|
||||||
| local_file = ( | ||||||
| os.path.join(local_path, irods_basename(obj_path)) # noqa: PTH118 | ||||||
| if os.path.isdir(local_path) # noqa: PTH112 | ||||||
| else local_path | ||||||
| ) | ||||||
|
|
||||||
| # Check for force flag if local_file exists | ||||||
| if os.path.exists(local_file) and kw.FORCE_FLAG_KW not in options: | ||||||
| raise ex.OVERWRITE_WITHOUT_FORCE_FLAG | ||||||
|
|
||||||
| data_open_returned_values_ = {} | ||||||
| with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o: | ||||||
| with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o: | ||||||
| if self.should_parallelize_transfer(num_threads, o, open_options=options.items()): | ||||||
| error = RuntimeError("parallel get failed") | ||||||
| try: | ||||||
| if not self.parallel_get( | ||||||
| (obj, o), | ||||||
| (obj_path, o), | ||||||
| local_file, | ||||||
| num_threads=num_threads, | ||||||
| target_resource_name=options.get(kw.RESC_NAME_KW, ""), | ||||||
|
|
@@ -256,13 +258,38 @@ | |||||
| f.write(chunk) | ||||||
| do_progress_updates(updatables, len(chunk)) | ||||||
|
|
||||||
| def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, updatables=(), **options): | ||||||
| def get(self, | ||||||
| path, | ||||||
| local_path=None, | ||||||
| num_threads=DEFAULT_NUMBER_OF_THREADS, | ||||||
| updatables=(), | ||||||
| replica_sort_function=None, | ||||||
| **options): | ||||||
| """ | ||||||
| Get a reference to the data object at the specified `path'. | ||||||
|
|
||||||
| Only download the object if the local_path is a string (specifying | ||||||
| a path in the local filesystem to use as a destination file). | ||||||
| Create an iRODSDataObject instance representing the data object at the specified path. If local_path | ||||||
| is not None, it names a local file to which the content of the data object will be downloaded. | ||||||
|
|
||||||
| Args: | ||||||
| path: an absolute logical path where the data object may be found. | ||||||
| local_path: a filename within the local filesystem, the target for download ("GET") of the data | ||||||
| object if one is requested. Directory components in the path must already exist. | ||||||
| num_threads: in the case of a parallel data transfer (for large files), specifies how many transfer | ||||||
| control threads should be used. | ||||||
| updatables: a tuple or list in which each element is a tqdm-like progress bar object, or the equivalent: | ||||||
| an instance on which func.update(n) can be called with n being the number of bytes successfully | ||||||
| copied in the data transfer increment just completed. If not a tuple or list, this argument | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| is interpreted as a single updatable. | ||||||
| replica_sort_function: a sort key function dictating the order of replica query results in 'self.replicas' | ||||||
|
Comment on lines
+274
to
+282
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These parameters have default arguments which are not explained or mentioned. Please document what the behavior is when the defaults are used. |
||||||
| **options: keyword arguments to be passed onto the _download function; A combination of possible options | ||||||
| to be relayed to the iRODS data object open() call. | ||||||
|
|
||||||
| Returns: | ||||||
| an iRODSDataObject representing the object at the given path. | ||||||
|
|
||||||
| Raises: | ||||||
| DataObjectDoesNotExist: if the specified path does not exist, or exists as a collection rather than a data | ||||||
| object. | ||||||
| """ | ||||||
| parent = self.sess.collections.get(irods_dirname(path)) | ||||||
|
|
||||||
| # TODO: optimize | ||||||
|
|
@@ -284,7 +311,7 @@ | |||||
| results = query.all() # get up to max_rows replicas | ||||||
| if len(results) <= 0: | ||||||
| raise ex.DataObjectDoesNotExist() | ||||||
| return iRODSDataObject(self, parent, results) | ||||||
| return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function) | ||||||
|
|
||||||
| @staticmethod | ||||||
| def _resolve_force_put_option(options, default_setting=None, true_value=""): | ||||||
|
|
@@ -317,23 +344,25 @@ | |||||
| self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default) | ||||||
|
|
||||||
| if self.sess.collections.exists(irods_path): | ||||||
| obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) | ||||||
| obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) # noqa: PTH119 | ||||||
| else: | ||||||
| obj = irods_path | ||||||
| if kw.FORCE_FLAG_KW not in options and self.exists(obj): | ||||||
| obj_path = irods_path | ||||||
| if kw.FORCE_FLAG_KW not in options and self.exists(obj_path): | ||||||
| raise ex.OVERWRITE_WITHOUT_FORCE_FLAG | ||||||
| options.pop(kw.FORCE_FLAG_KW, None) | ||||||
|
|
||||||
| replica_sort_function = options.pop('replica_sort_function', None) | ||||||
|
|
||||||
| with open(local_path, "rb") as f: | ||||||
| sizelist = [] | ||||||
| if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options): | ||||||
| o = deferred_call(self.open, (obj, "w"), options) | ||||||
| o = deferred_call(self.open, (obj_path, "w"), options) | ||||||
| f.close() | ||||||
| error = RuntimeError("parallel put failed") | ||||||
| try: | ||||||
| if not self.parallel_put( | ||||||
| local_path, | ||||||
| (obj, o), | ||||||
| (obj_path, o), | ||||||
| total_bytes=sizelist[0], | ||||||
| num_threads=num_threads, | ||||||
| target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""), | ||||||
|
|
@@ -346,7 +375,7 @@ | |||||
| except BaseException as e: | ||||||
| raise error from e | ||||||
| else: | ||||||
| with self.open(obj, "w", **options) as o: | ||||||
| with self.open(obj_path, "w", **options) as o: | ||||||
| # Set operation type to trigger acPostProcForPut | ||||||
| if kw.OPR_TYPE_KW not in options: | ||||||
| options[kw.OPR_TYPE_KW] = 1 # PUT_OPR | ||||||
|
|
@@ -360,10 +389,11 @@ | |||||
| # Requested to register checksum without verifying, but source replica has a checksum. This can result | ||||||
| # in multiple replicas being marked good with different checksums, which is an inconsistency. | ||||||
| del repl_options[kw.REG_CHKSUM_KW] | ||||||
| self.replicate(obj, **repl_options) | ||||||
| self.replicate(obj_path, **repl_options) | ||||||
|
|
||||||
| if return_data_object: | ||||||
| return self.get(obj) | ||||||
| return self.get(obj_path, replica_sort_function=replica_sort_function) | ||||||
| return None | ||||||
|
|
||||||
| def chksum(self, path, **options): | ||||||
| """ | ||||||
|
|
@@ -480,6 +510,7 @@ | |||||
| raise ex.DataObjectExistsAtLogicalPath | ||||||
|
|
||||||
| options = {**options, kw.DATA_TYPE_KW: "generic"} | ||||||
| replica_sort_function = options.pop('replica_sort_function', None) | ||||||
|
|
||||||
| if resource: | ||||||
| options[kw.DEST_RESC_NAME_KW] = resource | ||||||
|
|
@@ -508,7 +539,7 @@ | |||||
| desc = response.int_info | ||||||
| conn.close_file(desc) | ||||||
|
|
||||||
| return self.get(path) | ||||||
| return self.get(path, replica_sort_function=replica_sort_function) | ||||||
|
|
||||||
| def open_with_FileRaw(self, *arg, **kw_options): | ||||||
| holder = [] | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.