Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,18 @@ ensure the lifetime of the updatable instance extends beyond the time needed for

See `irods/test/data_obj_test.py` for examples of these and other subtleties of progress bar usage.

Replica access and sorting
--------------------------

The `replicas` member of an instance of `iRODSDataObject` allows a view into the various replicas that exist under a
given logical path. For an example of this, jump forward to [working with data objects](#working-with-data-objects-files).

Note that a `replica_sort_function` option exists in the `iRODSDataObject` constructor and in
`<session_object>.data_objects.get`. If `replica_sort_function` is not specified, the replicas
list contained in the returned object will be sorted according to replica number
(REPLICA_NUMBER_SORT_KEY_FN) in PRC < v4 or by general fitness to be considered as a
current data copy (REPLICA_FITNESS_SORT_KEY_FN) in PRC >= v4. For those definitions, see `irods/data_object.py`.
Comment on lines +465 to +466
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(REPLICA_NUMBER_SORT_KEY_FN) in PRC < v4 or by general fitness to be considered as a
current data copy (REPLICA_FITNESS_SORT_KEY_FN). in PRC >= v4. For those definitions, see `irods/data_object.py`.
(REPLICA_NUMBER_SORT_KEY_FN) in PRC < v4 or by general fitness to be considered a
good representation of that object (REPLICA_FITNESS_SORT_KEY_FN) in PRC >= v4. For those definitions, see `irods/data_object.py`.


Working with collections (directories)
--------------------------------------

Expand Down
75 changes: 66 additions & 9 deletions irods/data_object.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
"""
Interface for iRODS data objects.

Provides high level abstraction and POSIX-like facilities (create, open,
read/write) allowing clients to manipulate data objects very much as if they
were local files.
"""

import ast
import enum
import io
import sys
import logging
import os
import ast
import sys
from datetime import datetime, timezone

from irods.models import DataObject
from irods.meta import iRODSMetaCollection
import irods.keywords as kw
from irods.api_number import api_number
from irods.message import JSON_Message, iRODSMessage
from irods.meta import iRODSMetaCollection
from irods.models import DataObject

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,11 +51,57 @@ def __repr__(self):
return "<{}.{} {}>".format(self.__class__.__module__, self.__class__.__name__, self.resource_name)


class _repl_status(enum.Enum): # noqa: N801
STALE_REPLICA, GOOD_REPLICA, INTERMEDIATE_REPLICA, READ_LOCKED, WRITE_LOCKED = range(5)


# Replica status values listed from most fit to least fit for use; the
# position of a value in this tuple serves as its fitness rank (0 is best).
_REPL_STATUSES = (
    _repl_status.GOOD_REPLICA.value,
    _repl_status.STALE_REPLICA.value,
    _repl_status.INTERMEDIATE_REPLICA.value,
    _repl_status.READ_LOCKED.value,
    _repl_status.WRITE_LOCKED.value,
)

# An appropriate reference datetime value for gauging replica age as part of
# the default sort key in PRC4 and onward.
_REFERENCE_DATETIME = datetime.fromtimestamp(0, timezone.utc)

# ruff: noqa: D103 off

# Key functions to dictate how replica row results will be sorted within an iRODSDataObject.


def REPLICA_NUMBER_SORT_KEY_FN(row):  # noqa: N802
    """Return the replica number stored in a replica query row, for use as a sort key."""
    replica_number = row[DataObject.replica_number]
    return replica_number


def REPLICA_FITNESS_SORT_KEY_FN(row):  # noqa: N802
    """Return a sort key that ranks a replica query row by fitness for use.

    The key is a 2-tuple:

    * the position of the row's replica status in ``_REPL_STATUSES`` (lower is
      fitter), or ``sys.maxsize`` when the status is not listed there, so that
      unrecognized statuses sort after every known one;
    * ``_REFERENCE_DATETIME`` minus the row's modify time, which is smaller for
      later modify times, so that among rows of equal status rank the most
      recently modified sorts first under an ascending sort.

    Args:
        row: a replica query result indexable by ``DataObject`` columns.

    Returns:
        tuple: ``(status_rank, timedelta)`` suitable as a ``sorted`` key.
    """
    repl_status = int(row[DataObject.replica_status])

    # Single lookup; ValueError signals a status outside the known ordering.
    try:
        repl_status_rank = _REPL_STATUSES.index(repl_status)
    except ValueError:
        repl_status_rank = sys.maxsize

    # NOTE(review): assumes modify_time is a timezone-aware datetime; subtracting
    # a naive value from the aware reference would raise TypeError -- confirm.
    return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time])


# ruff: noqa: D103 on

# Key function used to order replica rows when no replica_sort_function is supplied.
_DEFAULT_SORT_KEY_FN = REPLICA_NUMBER_SORT_KEY_FN
Comment thread
d-w-moore marked this conversation as resolved.


class iRODSDataObject:
def __init__(self, manager, parent=None, results=None):
# iRODSDataObject's constructor is not usually directly accessed by iRODS client applications. See the main README.
# ruff: noqa: D107 off

def __init__(self, manager, parent=None, results=None, replica_sort_function=None):
self.manager = manager
if parent and results:
self.collection = parent
results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN))
for attr, value in DataObject.__dict__.items():
if not attr.startswith("_"):
try:
Expand All @@ -54,9 +110,8 @@ def __init__(self, manager, parent=None, results=None):
# backward compatibility with older schema versions
pass
self.path = self.collection.path + "/" + self.name
replicas = sorted(results, key=lambda r: r[DataObject.replica_number])

# The status quo before iRODS 5
# Copy pre-iRODS 5 fields

replica_args = [
(
Expand All @@ -75,18 +130,20 @@ def __init__(self, manager, parent=None, results=None):
modify_time=r[DataObject.modify_time],
),
)
for r in replicas
for r in results
]

# Adjust for adding access_time in the iRODS 5 case.

if self.manager.sess.server_version >= (5,):
for n, r in enumerate(replicas):
for n, r in enumerate(results):
replica_args[n][1]['access_time'] = r[DataObject.access_time]
self.replicas = [iRODSReplica(*a, **k) for a, k in replica_args]

self._meta = None

# ruff: noqa: D107 on

def __repr__(self):
return f"<iRODSDataObject {self.id} {self.name}>"

Expand Down
119 changes: 75 additions & 44 deletions irods/manager/data_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,37 @@
import os
import weakref
from typing import Any, List, Type
from irods.models import DataObject, Collection
from irods.manager import Manager
from irods.manager._internal import _api_impl, _logical_path
from irods.message import (
iRODSMessage,
FileOpenRequest,
ObjCopyRequest,
StringStringMap,
DataObjInfo_for_session,
ModDataObjMeta_for_session,
DataObjChksumRequest,
DataObjChksumResponse,
RErrorStack,
STR_PI,
)

import irods.client_configuration as client_config
import irods.exception as ex
import irods.keywords as kw
import irods.parallel as parallel

Check failure on line 13 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff PLR0402

PLR0402: Use `from irods import parallel` in lieu of alias [Pylint:manual-from-import]
from irods.api_number import api_number
from irods.collection import iRODSCollection
from irods.data_object import (
iRODSDataObject,
iRODSDataObjectFileRaw,
chunks,
irods_dirname,
irods_basename,
irods_dirname,
iRODSDataObject,
iRODSDataObjectFileRaw,
)
import irods.client_configuration as client_config
import irods.keywords as kw
import irods.parallel as parallel
from irods.manager import Manager
from irods.manager._internal import _api_impl, _logical_path
from irods.message import (
STR_PI,
DataObjChksumRequest,
DataObjChksumResponse,
DataObjInfo_for_session,
FileOpenRequest,
ModDataObjMeta_for_session,
ObjCopyRequest,
RErrorStack,

Check failure on line 33 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff F401

F401: `irods.message.RErrorStack` imported but unused [Pyflakes:unused-import]
StringStringMap,
iRODSMessage,
)
from irods.models import Collection, DataObject
from irods.parallel import deferred_call


logger = logging.getLogger(__name__)

_update_types: List[Type] = []
Expand Down Expand Up @@ -218,27 +218,29 @@
if size is not None and isinstance(open_options, dict):
open_options[kw.DATA_SIZE_KW] = size

def _download(self, obj, local_path, num_threads, updatables=(), **options):
def _download(self, obj_path, local_path, num_threads, updatables=(), **options):
"""Transfer the contents of a data object to a local file.

Called from get() when a local path is named.
"""
if os.path.isdir(local_path):
local_file = os.path.join(local_path, irods_basename(obj))
else:
local_file = local_path

local_file = (
os.path.join(local_path, irods_basename(obj_path)) # noqa: PTH118
if os.path.isdir(local_path) # noqa: PTH112
else local_path
)

# Check for force flag if local_file exists
if os.path.exists(local_file) and kw.FORCE_FLAG_KW not in options:
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG

data_open_returned_values_ = {}
with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o:
with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o:
if self.should_parallelize_transfer(num_threads, o, open_options=options.items()):
error = RuntimeError("parallel get failed")
try:
if not self.parallel_get(
(obj, o),
(obj_path, o),
local_file,
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
Expand All @@ -256,13 +258,38 @@
f.write(chunk)
do_progress_updates(updatables, len(chunk))

def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, updatables=(), **options):
def get(self,
path,
local_path=None,
num_threads=DEFAULT_NUMBER_OF_THREADS,
updatables=(),
replica_sort_function=None,
**options):

Check failure on line 267 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-format

Ruff format

Improper formatting
"""
Get a reference to the data object at the specified `path'.

Only download the object if the local_path is a string (specifying
a path in the local filesystem to use as a destination file).
Create an iRODSDataObject instance representing the data object at the specified path. If local_path
is not None, it names a local file to which the content of the data object will be downloaded.

Args:
path: an absolute logical path where the data object may be found.
local_path: a filename within the local filesystem, the target for download ("GET") of the data
object if one is requested. Directory components in the path must already exist.
num_threads: in the case of a parallel data transfer (for large files), specifies how many transfer
control threads should be used.
updatables: a tuple or list in which each element is a tqdm-like progress bar object, or the equivalent:
an instance on which func.update(n) can be called with n being the number of bytes successfully
copied in the data transfer increment just completed. If not a tuple or list, this argument
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
copied in the the data transfer increment just completed. If not a tuple or list, this argument
copied in the data transfer increment just completed. If not a tuple or list, this argument

is interpreted as a single updatable.
replica_sort_function: a sort key function dictating the order of replica query results in 'self.replicas'
Comment on lines +274 to +282
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters have default arguments which are not explained or mentioned. Please document what the behavior is when the defaults are used.

**options: keyword arguments to be passed onto the _download function; A combination of possible options
to be relayed to the iRODS data object open() call.

Returns:
an iRODSDataObject representing the object at the given path.

Raises:
DataObjectDoesNotExist: if the specified path does not exist, or exists as a collection rather than a data
object.
"""

Check failure on line 292 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff D205

D205: 1 blank line required between summary line and description [pydocstyle:missing-blank-line-after-summary]
parent = self.sess.collections.get(irods_dirname(path))

# TODO: optimize
Expand All @@ -284,7 +311,7 @@
results = query.all() # get up to max_rows replicas
if len(results) <= 0:
raise ex.DataObjectDoesNotExist()
return iRODSDataObject(self, parent, results)
return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function)

@staticmethod
def _resolve_force_put_option(options, default_setting=None, true_value=""):
Expand Down Expand Up @@ -317,23 +344,25 @@
self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default)

if self.sess.collections.exists(irods_path):
obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) # noqa: PTH119
else:
obj = irods_path
if kw.FORCE_FLAG_KW not in options and self.exists(obj):
obj_path = irods_path
if kw.FORCE_FLAG_KW not in options and self.exists(obj_path):
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
options.pop(kw.FORCE_FLAG_KW, None)

replica_sort_function = options.pop('replica_sort_function', None)

with open(local_path, "rb") as f:
sizelist = []
if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options):
o = deferred_call(self.open, (obj, "w"), options)
o = deferred_call(self.open, (obj_path, "w"), options)
f.close()
error = RuntimeError("parallel put failed")
try:
if not self.parallel_put(
local_path,
(obj, o),
(obj_path, o),
total_bytes=sizelist[0],
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""),
Expand All @@ -346,7 +375,7 @@
except BaseException as e:
raise error from e
else:
with self.open(obj, "w", **options) as o:
with self.open(obj_path, "w", **options) as o:
# Set operation type to trigger acPostProcForPut
if kw.OPR_TYPE_KW not in options:
options[kw.OPR_TYPE_KW] = 1 # PUT_OPR
Expand All @@ -360,10 +389,11 @@
# Requested to register checksum without verifying, but source replica has a checksum. This can result
# in multiple replicas being marked good with different checksums, which is an inconsistency.
del repl_options[kw.REG_CHKSUM_KW]
self.replicate(obj, **repl_options)
self.replicate(obj_path, **repl_options)

if return_data_object:
return self.get(obj)
return self.get(obj_path, replica_sort_function=replica_sort_function)
return None

def chksum(self, path, **options):
"""
Expand Down Expand Up @@ -480,6 +510,7 @@
raise ex.DataObjectExistsAtLogicalPath

options = {**options, kw.DATA_TYPE_KW: "generic"}
replica_sort_function = options.pop('replica_sort_function', None)

if resource:
options[kw.DEST_RESC_NAME_KW] = resource
Expand Down Expand Up @@ -508,7 +539,7 @@
desc = response.int_info
conn.close_file(desc)

return self.get(path)
return self.get(path, replica_sort_function=replica_sort_function)

def open_with_FileRaw(self, *arg, **kw_options):
holder = []
Expand Down
Loading
Loading