|
1 | 1 | """ |
2 | | -NOTE: - This script is only for updating EXISTING scripts. |
3 | | - - Scripts MUST be uploaded manually first time. (this tool can still be used to do zipping) |
| 2 | +This script zips up all files into an archive and then updates script on cloudshell server |
| 3 | +Usage: |
| 4 | +- Populate cloudshell api credentials |
| 5 | +- Run this script from inside the target script directory, otherwise pass the full path as argument if running from elsewhere |
| 6 | +- A custom script name can be passed, otherwise falls back to current directory name |
| 7 | +
|
| 8 | +Notes: |
| 9 | +- Zip will be generated into a dist folder, and will exclude all unneccesary files |
| 10 | +- This script supports zipping up scripts with nested folders / modules |
| 11 | +- Will throw validation error if __main__.py is not found |
4 | 12 | """ |
5 | | -from cloudshell.api.cloudshell_api import CloudShellAPISession |
6 | | -import os |
7 | | -import credentials # may need to create this module and set the constants for the api session |
8 | | - |
9 | | -# ===== Optional Variables to set ======= |
10 | | - |
11 | | -# To name zip package something other than the default directory name |
12 | | -CUSTOM_SCRIPT_NAME = '' |
13 | | - |
14 | | - |
15 | | -# ======================================= |
| 13 | +import sys |
| 14 | +import zipfile |
| 15 | +from dataclasses import dataclass |
| 16 | +from typing import List |
16 | 17 |
|
| 18 | +from cloudshell.api.cloudshell_api import CloudShellAPISession, CloudShellAPIError |
| 19 | +import os |
| 20 | +from pathlib import Path |
17 | 21 |
|
18 | | -def get_api_session(): |
19 | | - return CloudShellAPISession(host=credentials.SERVER, |
20 | | - username=credentials.USER, |
21 | | - password=credentials.PASSWORD, |
22 | | - domain=credentials.DOMAIN) |
| 22 | +DIRS_TO_EXCLUDE = [".git", "dist", "venv", ".idea", ".lol"] |
| 23 | +FILES_TO_EXCLUDE = ["credentials.py"] |
23 | 24 |
|
24 | 25 |
|
25 | 26 | def error_red(err_str): |
26 | | - """ |
27 | | - for printing errors in red in pycharm. |
28 | | - :param err_str: |
29 | | - :return: |
30 | | - """ |
31 | | - CRED = '\033[91m' |
32 | | - CEND = '\033[0m' |
33 | | - return CRED + err_str + CEND |
34 | | - |
35 | | - |
36 | | -def get_zip_details(): |
37 | | - parent_dir_path = os.path.abspath('.') |
38 | | - parent_dir_name = os.path.basename(parent_dir_path) |
39 | | - script_name = CUSTOM_SCRIPT_NAME or parent_dir_name |
40 | | - zip_file_name = script_name + '.zip' |
41 | | - |
42 | | - return {"parent_dir_path": parent_dir_path, |
43 | | - "parent_dir_name": parent_dir_name, |
44 | | - "script_name": script_name, |
45 | | - "zip_file_name": zip_file_name} |
| 27 | + cred = '\033[91m' |
| 28 | + cend = '\033[0m' |
| 29 | + return cred + err_str + cend |
46 | 30 |
|
47 | 31 |
|
48 | | -def is_whitelisted(f, file_path, files_to_exclude): |
49 | | - is_regular_file = os.path.isfile(file_path) |
50 | | - is_not_excluded = f not in files_to_exclude |
51 | | - is_not_pyc = not f.endswith('.pyc') |
52 | | - return is_regular_file and is_not_excluded and is_not_pyc |
| 32 | +@dataclass |
| 33 | +class ZipDetails: |
| 34 | + script_dir_path: str |
| 35 | + script_dir_name: str |
| 36 | + script_name: str |
| 37 | + zip_file_name: str |
| 38 | + archive_path: str |
53 | 39 |
|
54 | 40 |
|
55 | | -def make_zipfile(output_filename, source_dir, files_to_exclude, dirs_to_exclude): |
56 | | - import zipfile |
57 | | - with zipfile.ZipFile(output_filename, "w", zipfile.ZIP_DEFLATED) as z: |
58 | | - for root, dirs, files in os.walk(source_dir): |
59 | | - dirs[:] = [d for d in dirs if d not in dirs_to_exclude] |
60 | | - for f in files: |
61 | | - file_path = os.path.join(root, f) |
62 | | - if is_whitelisted(f, file_path, files_to_exclude): |
63 | | - arcname = os.path.join(os.path.relpath(root, source_dir), f) |
64 | | - z.write(file_path, arcname) |
65 | | - |
66 | | - |
67 | | -def zip_files(): |
68 | | - zip_details = get_zip_details() |
69 | | - zip_file_name = zip_details["zip_file_name"] |
70 | | - dirs_to_exclude = [".git"] |
71 | | - files_to_exclude = [zip_file_name, "venv", ".idea", "credentials.py"] |
| 41 | +def _get_zip_details(script_dir_path: str, script_name: str): |
| 42 | + zip_file_name = script_name + '.zip' |
| 43 | + archive_path = os.path.join(script_dir_path, "dist", zip_file_name) |
| 44 | + return ZipDetails(script_dir_path=script_dir_path, |
| 45 | + script_dir_name=os.path.basename(script_dir_path), |
| 46 | + script_name=script_name, |
| 47 | + zip_file_name=f"{script_name}.zip", |
| 48 | + archive_path=archive_path) |
| 49 | + |
| 50 | + |
| 51 | +def _is_valid_file(file_name: str, file_path: str, files_to_exclude: List[str]): |
| 52 | + invalid_conditions = [file_name.endswith(".pyc"), |
| 53 | + file_name.endswith(".zip"), |
| 54 | + file_name in files_to_exclude, |
| 55 | + file_name == os.path.basename(__file__), # exclude the updater script |
| 56 | + not os.path.isfile(file_path)] |
| 57 | + if any(invalid_conditions): |
| 58 | + return False |
| 59 | + return True |
| 60 | + |
| 61 | + |
| 62 | +def _create_zip_archive(archive_path: str, src_path: str): |
| 63 | + with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as archive_file: |
| 64 | + for dirpath, dirnames, filenames in os.walk(src_path): |
| 65 | + dir_name = os.path.basename(dirpath) |
| 66 | + |
| 67 | + # validate top level folder that it has a __main__.py entry point |
| 68 | + if dir_name == os.path.basename(src_path) and "__main__.py" not in filenames: |
| 69 | + print(error_red(f"No '__main__.py' file found in '{dir_name}' Directory. Not a valid cloudshell script.")) |
| 70 | + sys.exit(1) |
| 71 | + |
| 72 | + # skip dist and other excluded folders |
| 73 | + if dir_name in DIRS_TO_EXCLUDE: |
| 74 | + continue |
| 75 | + |
| 76 | + # validate files and add to archive |
| 77 | + for filename in filenames: |
| 78 | + file_path = os.path.join(dirpath, filename) |
| 79 | + if not _is_valid_file(filename, file_path, FILES_TO_EXCLUDE): |
| 80 | + continue |
| 81 | + archive_file_path = os.path.relpath(file_path, src_path) |
| 82 | + archive_file.write(file_path, archive_file_path) |
| 83 | + |
| 84 | + # validate generated zip archive |
| 85 | + with zipfile.ZipFile(archive_path, 'r') as archive_file: |
| 86 | + bad_file = zipfile.ZipFile.testzip(archive_file) |
| 87 | + |
| 88 | + if bad_file: |
| 89 | + raise zipfile.BadZipFile('CRC check failed for {} with file {}'.format(archive_path, bad_file)) |
| 90 | + |
| 91 | + |
| 92 | +def _zip_files(zip_details: ZipDetails): |
72 | 93 | try: |
73 | | - make_zipfile(output_filename=zip_file_name, |
74 | | - source_dir=zip_details["parent_dir_path"], |
75 | | - files_to_exclude=files_to_exclude, |
76 | | - dirs_to_exclude=dirs_to_exclude) |
| 94 | + _create_zip_archive(archive_path=zip_details.archive_path, src_path=zip_details.script_dir_path) |
77 | 95 | except Exception as e: |
78 | | - print(error_red("[-] error zipping up file: " + str(e))) |
79 | | - exit(1) |
80 | | - else: |
81 | | - if zip_file_name in os.listdir("."): |
82 | | - print("[+] ZIPPED UP: '{zip_name}'".format(zip_name=zip_file_name)) |
83 | | - else: |
84 | | - print("[-] ZIP FILE NOT PRESENT") |
| 96 | + err_msg = f"Error zipping up file: {type(e).__name__}: {str(e)}" |
| 97 | + raise Exception(err_msg) |
85 | 98 |
|
86 | 99 |
|
87 | | -def update_script_api_wrapper(cs_ses, script_name, zip_address): |
| 100 | +def _update_script(api: CloudShellAPISession, script_name: str, zip_name: str): |
88 | 101 | try: |
89 | | - cs_ses.UpdateScript(script_name, zip_address) |
90 | | - except Exception as e: |
91 | | - print(error_red("[-] ERROR UPDATING SCRIPT IN PORTAL\n" + str(e)) + "\n" |
92 | | - "PLEASE LOAD SCRIPT MANUALLY THE FIRST TIME") |
93 | | - exit(1) |
94 | | - else: |
95 | | - print("[+] '{script}' updated on CloudShell Successfully".format(script=script_name)) |
| 102 | + api.UpdateScript(script_name, zip_name) |
| 103 | + except CloudShellAPIError as e: |
| 104 | + if e.code == "100": |
| 105 | + err_msg = f"[-] '{script_name}' update FAILED. {str(e)}. (Please upload manually first time)." |
| 106 | + print(error_red(err_msg)) |
| 107 | + sys.exit(1) |
| 108 | + raise |
| 109 | + |
| 110 | + |
| 111 | +def _create_dist_folder(script_dir_path: str): |
| 112 | + Path(f"{script_dir_path}/dist").mkdir(exist_ok=True) |
| 113 | + |
96 | 114 |
|
| 115 | +def update_script_on_server(api: CloudShellAPISession, script_name: str = None, script_dir_path: str = None): |
| 116 | + script_dir_path = script_dir_path or os.path.abspath('.') |
| 117 | + script_name = script_name or os.path.basename(script_dir_path) |
97 | 118 |
|
98 | | -def update_script_on_server(): |
99 | | - zip_files() |
100 | | - cs_ses = get_api_session() |
101 | | - zip_details = get_zip_details() |
102 | | - update_script_api_wrapper(cs_ses=cs_ses, |
103 | | - script_name=zip_details["script_name"], |
104 | | - zip_address=zip_details["zip_file_name"]) |
| 119 | + _create_dist_folder(script_dir_path) |
| 120 | + zip_details = _get_zip_details(script_dir_path, script_name) |
| 121 | + _zip_files(zip_details) |
| 122 | + print(" [+] ZIPPED UP: '{zip_name}'".format(zip_name=zip_details.zip_file_name)) |
| 123 | + _update_script(api, script_name=zip_details.script_name, zip_name=zip_details.archive_path) |
| 124 | + print(f"[+] '{script_name}' updated on CloudShell Successfully") |
105 | 125 |
|
106 | 126 |
|
107 | | -update_script_on_server() |
| 127 | +if __name__ == "__main__": |
| 128 | + cs_api = CloudShellAPISession("localhost", "admin", "admin", "Global") |
| 129 | + update_script_on_server(cs_api) |
0 commit comments