Skip to content

Commit b3203ca

Browse files
committed
Add progress bar to hca dss upload (closes #117)
Adds a progress bar to `hca dss upload`. This introduces a new dependency, `tqdm` (which already exists as an indirect development requirement). The progress bar will show by default unless one of the following conditions is true:

* `hca` has been invoked non-interactively (i.e. stdout is not a TTY)
* the logging level is higher than INFO
* the `--no-progress` flag is passed

The progress bar displays the amount of data uploaded (instead of files uploaded) and can estimate how much time remains in the upload.
1 parent a93f30a commit b3203ca

5 files changed

Lines changed: 104 additions & 9 deletions

File tree

hca/config.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
import os, logging
1+
# -*- coding: utf-8 -*-
2+
import logging
3+
import os
24

5+
import tqdm
36
from tweak import Config as _Config
47

8+
59
class HCAConfig(_Config):
610
default_config_file = os.path.join(os.path.dirname(__file__), "default_config.json")
711

@@ -18,11 +22,22 @@ def user_config_dir(self):
1822

1923

2024
_config = None
25+
26+
2127
def get_config():
    """Return the process-wide :class:`HCAConfig`, creating it on first use."""
    global _config
    if _config is not None:
        return _config
    _config = HCAConfig()
    return _config
2632

2733

34+
class ProgressBarStreamHandler(object):
    """
    File-like log target that cooperates with a :mod:`tqdm` progress bar.

    Routing log output through :func:`tqdm.tqdm.write` lets messages be
    printed without visually corrupting an active progress bar.
    """
    @staticmethod
    def write(msg):
        tqdm.tqdm.write(msg, end='')
41+
42+
2843
logger = logging.getLogger("hca")

hca/dss/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def create_version(self):
8989
def _create_version(self):
9090
return datetime.utcnow().strftime("%Y-%m-%dT%H%M%S.%fZ")
9191

92-
def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
92+
def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200, no_progress=False):
9393
"""
9494
Upload a directory of files from the local filesystem and create a bundle containing the uploaded files.
9595
@@ -98,6 +98,9 @@ def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
9898
`gcp` for Google Cloud Platform. [aws, gcp]
9999
:param str staging_bucket: a client controlled AWS S3 storage bucket to upload from.
100100
:param int timeout_seconds: the time to wait for a file to upload to replica.
101+
:param bool no_progress: if set, will not report upload progress. Note that even if this flag
102+
is not set, progress will not be reported if the logging level is higher
103+
than INFO or if the session is not interactive.
101104
102105
Upload a directory of files from the local filesystem and create a bundle containing the uploaded files.
103106
This method requires the use of a client-controlled object storage bucket to stage the data for upload.
@@ -112,7 +115,7 @@ def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
112115

113116
logger.info("Uploading %i files from %s to %s", len(files_to_upload), src_dir, staging_bucket)
114117
file_uuids, uploaded_keys, abs_file_paths = upload_to_cloud(files_to_upload, staging_bucket=staging_bucket,
115-
replica=replica, from_cloud=False)
118+
replica=replica, from_cloud=False, log_progress=not no_progress)
116119
for file_handle in files_to_upload:
117120
file_handle.close()
118121
filenames = [object_name_builder(p, src_dir) for p in abs_file_paths]

hca/dss/upload_to_cloud.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66
import logging
77
import mimetypes
88
import os
9+
import sys
910
import uuid
1011

1112
import boto3
1213
from boto3.s3.transfer import TransferConfig
14+
import tqdm
1315

16+
from ..config import logger, ProgressBarStreamHandler
1417
from dcplib import s3_multipart
1518
from dcplib.checksumming_io import ChecksummingBufferedReader
1619

@@ -50,25 +53,37 @@ def _copy_from_s3(path, s3):
5053
return file_uuids, key_names
5154

5255

53-
def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False):
56+
def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False, log_progress=False):
5457
"""
5558
Upload files to cloud.
5659
5760
:param file_handles: If from_cloud, file_handles is a aws s3 directory path to files with appropriate
5861
metadata uploaded. Else, a list of binary file_handles to upload.
5962
:param staging_bucket: The aws bucket to upload the files to.
6063
:param replica: The cloud replica to write to. One of 'aws', 'gc', or 'azure'. No functionality now.
64+
:param bool log_progress: set to True to log progress to stdout. Progress bar will reflect bytes
65+
uploaded (and not files uploaded). This is off by default,
66+
as direct calls to this function are assumed to be programmatic.
67+
In addition, even if this is set to True, a progress bar will not
68+
be shown if (a) the logging level is not INFO or lower or (b) an
69+
interactive session is not detected.
6170
:return: a list of file uuids, key-names, and absolute file paths (local) for uploaded files
6271
"""
6372
s3 = boto3.resource("s3")
6473
file_uuids = []
6574
key_names = []
6675
abs_file_paths = []
76+
log_progress = all((logger.getEffectiveLevel() <= logging.INFO, sys.stdout.isatty(), log_progress))
6777

6878
if from_cloud:
6979
file_uuids, key_names = _copy_from_s3(file_handles[0], s3)
7080
else:
7181
destination_bucket = s3.Bucket(staging_bucket)
82+
if log_progress:
83+
total_upload_size = sum(os.fstat(f.fileno()).st_size for f in file_handles)
84+
logger.addHandler(ProgressBarStreamHandler())
85+
progress = tqdm.tqdm(total=total_upload_size, desc="Uploading to " + replica,
86+
unit="B", unit_scale=True, unit_divisor=1024)
7287
for raw_fh in file_handles:
7388
file_size = os.path.getsize(raw_fh.name)
7489
multipart_chunksize = s3_multipart.get_s3_multipart_chunk_size(file_size)
@@ -81,6 +96,7 @@ def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False):
8196
fh,
8297
key_name,
8398
Config=tx_cfg,
99+
Callback=lambda x: progress.update(x) if log_progress else None,
84100
ExtraArgs={
85101
'ContentType': _mime_type(fh.raw.name),
86102
}
@@ -98,5 +114,7 @@ def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False):
98114
file_uuids.append(file_uuid)
99115
key_names.append(key_name)
100116
abs_file_paths.append(fh.raw.name)
101-
117+
if log_progress:
118+
logger.handlers = [l for l in logger.handlers if not isinstance(l, ProgressBarStreamHandler)]
119+
progress.close()
102120
return file_uuids, key_names, abs_file_paths

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ requests >= 2.20.0, < 3
1818
rsa<=3.5.0,>=3.1.2
1919
s3transfer<0.3.0,>=0.2.0
2020
tenacity >=5.0.2, < 5.1
21+
tqdm >=4.33.0, < 5
2122
tweak >= 1.0.2, < 2

test/integration/dss/test_dss_cli.py

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import filecmp
55
import json
66
import os
7+
import pty
78
import sys
89
import unittest
910
import uuid
@@ -114,6 +115,63 @@ def _put_test_col(args, uuid=str(uuid.uuid4()), replica='aws'):
114115
with CapturingIO('stdout'):
115116
hca.cli.main(args=base_args)
116117

118+
def test_upload_progress_bar(self):
    """The progress bar must stay hidden when stdout is not a terminal."""
    # arbitrary and small
    src_dir = os.path.join(TEST_DIR, 'tutorial', 'data')
    upload_args = ['dss', 'upload', '--src-dir', src_dir, '--replica',
                   'aws', '--staging-bucket', 'org-humancellatlas-dss-cli-test']

    with self.subTest("Suppress progress bar if not interactive"):
        with CapturingIO('stdout') as stdout:
            hca.cli.main(args=upload_args)
        # Under CapturingIO stdout is not a TTY, so `hca dss upload` should
        # detect the non-interactive session and omit the progress bar —
        # leaving nothing on stdout but the JSON result.
        self.assertTrue(json.loads(stdout.captured()))
131+
@unittest.skipIf(os.name == 'nt', 'No pty support on Windows')
def test_upload_progress_bar_interactive(self):
    """Tests upload progress bar with a simulated interactive session."""
    dirpath = os.path.join(TEST_DIR, 'tutorial', 'data')  # arbitrary and small
    put_args = ['dss', 'upload', '--src-dir', dirpath, '--replica',
                'aws', '--staging-bucket', 'org-humancellatlas-dss-cli-test']

    # In an interactive session, we should see a progress bar if we
    # don't pass `--no-progress`.
    with self.subTest("Show progress bar if interactive"):
        output = self._run_cli_in_pty(put_args)
        self.assertIn('Uploading to aws', output)

    with self.subTest("Don't show progress bar if interactive and not logging INFO"):
        output = self._run_cli_in_pty(['--log-level', 'WARNING'] + put_args)
        self.assertNotIn('Uploading to aws', output)

    with self.subTest("Don't show progress bar if interactive and --no-progress"):
        output = self._run_cli_in_pty(put_args + ['--no-progress'])
        self.assertNotIn('Uploading to aws', output)

def _run_cli_in_pty(self, args):
    """Run ``hca.cli.main(args)`` in a pty-backed child and return its output."""
    child_pid, fd = pty.fork()
    if child_pid == 0:
        # Child process: run the CLI, then hard-exit so that a raised
        # exception can never leak the forked child back into the test
        # runner (which would re-run the remaining test suite).
        try:
            hca.cli.main(args=args)
        finally:
            os._exit(0)
    return self._get_child_output(child_pid, fd)
163+
164+
@staticmethod
165+
def _get_child_output(child_pid, fd):
166+
output = ''
167+
while not os.waitpid(child_pid, os.WNOHANG)[0]:
168+
try:
169+
output += os.read(fd, 128).decode()
170+
except OSError:
171+
break
172+
os.close(fd)
173+
return output
174+
117175
@staticmethod
118176
@contextlib.contextmanager
119177
def _put_test_bdl(dirpath=os.path.join(TEST_DIR, 'res', 'bundle'),
@@ -130,8 +188,8 @@ def _put_test_bdl(dirpath=os.path.join(TEST_DIR, 'res', 'bundle'),
130188
:rtype: dict
131189
:returns: upload response object
132190
"""
133-
put_args = ['dss', 'upload', '--src-dir', dirpath, '--replica',
134-
replica, '--staging-bucket', staging_bucket]
191+
put_args = ['dss', 'upload', '--src-dir', dirpath, '--replica',
192+
replica, '--staging-bucket', staging_bucket, '--no-progress']
135193
with CapturingIO('stdout') as stdout:
136194
hca.cli.main(args=put_args)
137195
rv = json.loads(stdout.captured())
@@ -168,8 +226,8 @@ def test_collection_download(self):
168226
'--name', 'collection_test:%s' % bdl['bundle_uuid']
169227
]
170228
with self._put_test_col(put_col_args) as col, \
171-
tempfile.TemporaryDirectory() as t1, \
172-
tempfile.TemporaryDirectory() as t2:
229+
tempfile.TemporaryDirectory() as t1, \
230+
tempfile.TemporaryDirectory() as t2:
173231
dl_col_args = ['dss', 'download-collection', '--uuid', col['uuid'],
174232
'--replica', 'aws', '--download-dir', t1]
175233
hca.cli.main(args=dl_col_args)

0 commit comments

Comments
 (0)