diff --git a/update.py b/update.py index 935d7ff..4ef0163 100755 --- a/update.py +++ b/update.py @@ -26,6 +26,8 @@ DEFAULT_EDITOR = "/usr/bin/nano" def log_print(*args, **kwargs): + """Prints to stdout, then logs to GLOBAL_LOG_FILE.""" + if "file" in kwargs: kwargs["file"] = sys.stdout print(*args, **kwargs) @@ -35,6 +37,21 @@ def log_print(*args, **kwargs): def ensure_pkg_dir_exists(pkg, pkg_state, other_state): + """Ensures that an AUR-pkg-dir exists, returning False on failure. + + If no such directory exists, this script attempts to clone it from the AUR. + True is returned on successful cloning. + + If a file exists with the same name, returns False. + + If a directory exists with the same name, returns True. + + If "repo_path" is specified and the directory doesn't exist, returns False. + + If "NO_REPO" is specified as "repo_path" and the directory doesn't exist, + returns False. + """ + log_print('Checking that dir for "{}" exists...'.format(pkg)) pkgdir = os.path.join(other_state["clones_dir"], pkg) if os.path.isdir(pkgdir): @@ -62,9 +79,25 @@ def ensure_pkg_dir_exists(pkg, pkg_state, other_state): elif pkg_state[pkg]["repo_path"] == "NO_REPO": log_print('"{}" does not exist, but NO_REPO specified for repo_path') return False + return False def update_pkg_dir(pkg, pkg_state, other_state): + """Updates the pkg by invoking "git pull". + + If "git pull" failes, it is retried after invoking "git restore .". + + If the local working directory fails to update via "git pull" (or if some + other handling of the local repository fails), then this function returns + (False, False). + + (True, True) is returned if "skip_branch_up_to_date" is True for the package + and the local repository is up to date. + + (True, False) is returned by default (successful "git pull"; regardless of + if an update was fetched). + """ + log_print('Making sure pkg dir for "{}" is up to date...'.format(pkg)) pkgdir = os.path.join(other_state["clones_dir"], pkg) @@ -109,7 +142,7 @@ def update_pkg_dir(pkg, pkg_state, other_state): return False, False # get remote that current branch is tracking - selected_remote = "" + selected_remote = None try: result = subprocess.run( ["git", "status", "-sb", "--porcelain"], @@ -118,26 +151,25 @@ def update_pkg_dir(pkg, pkg_state, other_state): capture_output=True, encoding="UTF-8", ) - for remote in remotes: - if ( - len(remote.strip()) > 0 - and result.stdout.find(remote.strip()) != -1 - ): - selected_remote = remote.strip() - break + result_lines = result.stdout.split(sep="\n") + for matching_line in filter(lambda s: s.startswith("##"), result_lines): + for remote in map(lambda r: r.strip(), remotes): + if matching_line.find(remote) != -1: + selected_remote = remote + break except subprocess.CalledProcessError: log_print( f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).' ) return False, False - if len(selected_remote) == 0: + if selected_remote is None or not isinstance(selected_remote, str): log_print( f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).' ) return False, False # get hash of current branch - current_branch_hash = "" + current_branch_hash = None try: result = subprocess.run( ["git", "log", "-1", "--format=format:%H"], @@ -152,14 +184,14 @@ def update_pkg_dir(pkg, pkg_state, other_state): f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).' ) return False, False - if len(current_branch_hash.strip()) == 0: + if current_branch_hash is None or not isinstance(current_branch_hash, str): log_print( f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).' ) return False, False # get hash of remote branch - remote_branch_hash = "" + remote_branch_hash = None try: result = subprocess.run( ["git", "log", "-1", "--format=format:%H", selected_remote], @@ -174,7 +206,7 @@ def update_pkg_dir(pkg, pkg_state, other_state): f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).' ) return False, False - if len(remote_branch_hash.strip()) == 0: + if remote_branch_hash is None or not isinstance(remote_branch_hash, str): log_print( f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).' ) @@ -187,7 +219,7 @@ def update_pkg_dir(pkg, pkg_state, other_state): except subprocess.CalledProcessError: try: subprocess.run( - ["git", "checkout", "--", "*"], + ["git", "restore", "."], check=True, cwd=pkgdir, ) @@ -209,7 +241,10 @@ def update_pkg_dir(pkg, pkg_state, other_state): def check_pkg_build(pkg, pkg_state, other_state, editor): - """Returns "ok", "not_ok", "abort", or "force_build".""" + """Opens the PKGBUILD in the editor, then prompts the user for an action. + + Returns "ok", "not_ok", "abort", or "force_build".""" + pkgdir = os.path.join(other_state["clones_dir"], pkg) log_print('Checking PKGBUILD for "{}"...'.format(pkg)) try: @@ -243,7 +278,11 @@ def check_pkg_build(pkg, pkg_state, other_state, editor): def check_pkg_version(pkg, pkg_state, repo, force_check_srcinfo, other_state): - """Returns "fail", "install", or "done".""" + """Gets the installed version and pkg version and checks them. + + Returns "fail" (on failure), "install" (pkg is newer), or "done" + (installed pkg is up to date).""" + status, current_epoch, current_version = get_pkg_current_version( pkg, pkg_state, repo ) @@ -275,7 +314,14 @@ def check_pkg_version(pkg, pkg_state, repo, force_check_srcinfo, other_state): def get_srcinfo_version(pkg, other_state): - """Returns (success_bool, pkgepoch, pkgver, pkgrel)""" + """Parses .SRCINFO for verison information. + + Returns (success_bool, pkgepoch, pkgver, pkgrel) + + When "success_bool" is False, all other values are None. + + Otherwise, all other values are str (or None if not found).""" + if not os.path.exists( os.path.join(other_state["clones_dir"], pkg, ".SRCINFO") ): @@ -284,9 +330,9 @@ def get_srcinfo_version(pkg, other_state): pkgver_reprog = re.compile("^\\s*pkgver\\s*=\\s*([a-zA-Z0-9._+-]+)\\s*$") pkgrel_reprog = re.compile("^\\s*pkgrel\\s*=\\s*([0-9.]+)\\s*$") pkgepoch_reprog = re.compile("^\\s*epoch\\s*=\\s*([0-9]+)\\s*$") - pkgver = "" - pkgrel = "" - pkgepoch = "" + pkgver = None + pkgrel = None + pkgepoch = None with open( os.path.join(other_state["clones_dir"], pkg, ".SRCINFO"), encoding="UTF-8", @@ -307,7 +353,15 @@ def get_srcinfo_version(pkg, other_state): def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state): - """Returns (success, epoch, version, release)""" + """Gets the version of the pkg from .SRCINFO or PKGBUILD. + + Returns (success, epoch, version, release). + + If "success" is False, then all other values are None. + + Otherwise, version and release should be a str type, but epoch may be + None.""" + pkgdir = os.path.join(other_state["clones_dir"], pkg) log_print(f'Getting version of "{pkg}"...') while True and not force_check_srcinfo: @@ -368,9 +422,9 @@ def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state): if os.path.exists(os.path.join(pkgdir, "src")): shutil.rmtree(os.path.join(pkgdir, "src")) - pkgepoch = "" - pkgver = "" - pkgrel = "" + pkgepoch = None + pkgver = None + pkgrel = None # TODO maybe sandbox sourcing the PKGBUILD pkgbuild_output = subprocess.run( @@ -401,13 +455,6 @@ def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state): log_print("ERROR: Unreachable code") return False, None, None, None - if len(pkgepoch) == 0: - pkgepoch = None - if len(pkgver) == 0: - pkgver = None - if len(pkgrel) == 0: - pkgrel = None - if pkgver is not None and pkgrel is not None: return True, pkgepoch, pkgver, pkgrel else: @@ -423,6 +470,14 @@ def get_srcinfo_check_result( pkg_state, other_state, ): + """Checks the version of the pkg against the currently installed version. + + Returns "install" if the version of the pkg is newer. + + Otherwise returns "done" if the version is not newer. + + Returns "fail" on error.""" + ver_success, pkgepoch, pkgver, pkgrel = get_pkgbuild_version( pkg, force_check_srcinfo, pkg_state, other_state ) @@ -481,7 +536,19 @@ def get_srcinfo_check_result( def get_pkg_current_version(pkg, pkg_state, repo): - """Returns (status, epoch, version)""" + """Fetches the version info and returns status of fetching and the version. + + Returns (status, epoch, version) + + "status" may be one of: + "fail", "install", "fetched" + + "epoch" may be None. + + "version" must be a str if "status" is "fetched". Otherwise, it is None if + "status" is "fail" or "install". + """ + log_print( 'Checking version of installed pkg "{}"...'.format( pkg_state[pkg]["pkg_name"] @@ -526,6 +593,9 @@ def get_pkg_current_version(pkg, pkg_state, repo): def get_sudo_privileges(): + """Starts a bash loop that ensures sudo privileges are ready while this + script is active.""" + global SUDO_PROC if not SUDO_PROC: log_print("sudo -v") @@ -542,10 +612,17 @@ def get_sudo_privileges(): def cleanup_sudo(sudo_proc): + """Stops the bash loop keeping sudo privileges.""" + sudo_proc.terminate() def create_executable_script(dest_filename, script_contents): + """Creates a script to be run with sudo to set up a custom command. + + This is currently used to set up sccache by placing custom commands in + "/usr/local/bin" for gcc and friends.""" + tempf_name = "unknown" with tempfile.NamedTemporaryFile( mode="w", encoding="utf-8", delete=False @@ -596,6 +673,8 @@ if __name__ == '__main__': def setup_ccache(chroot): + """Sets up the chroot for ccache.""" + # set up ccache stuff try: subprocess.run( @@ -614,6 +693,8 @@ def setup_ccache(chroot): def cleanup_ccache(chroot): + """Unsets up the chroot for ccache.""" + # cleanup ccache stuff try: subprocess.run( @@ -632,6 +713,8 @@ def cleanup_ccache(chroot): def setup_sccache(chroot): + """Sets up sccache for the chroot.""" + sccache_script = """#!/usr/bin/env sh export PATH=${PATH/:\/usr\/local\/bin/} /usr/bin/env sccache $(basename "$0") "$@" @@ -658,6 +741,8 @@ export PATH=${PATH/:\/usr\/local\/bin/} def cleanup_sccache(chroot): + """Unsets up sccache for the chroot.""" + # cleanup sccache stuff try: subprocess.run( @@ -686,6 +771,9 @@ def update_pkg_list( signing_gpg_pass, no_store, ): + """For each package to build: builds it, signs it, and moves it to + "pkg_out_dir".""" + if not get_sudo_privileges(): log_print("ERROR: Failed to get sudo privileges") sys.exit(1) @@ -898,6 +986,9 @@ def update_pkg_list( def get_latest_pkg(pkg, cache_dir): + """Gets the latest pkg from the specified "cache_dir" and return its + filename.""" + globbed = glob.glob(os.path.join(cache_dir, pkg + "*")) if len(globbed) > 0: globbed.sort() @@ -916,7 +1007,11 @@ def get_latest_pkg(pkg, cache_dir): def confirm_result(pkg, state_result): - """Returns "continue", "recheck", "force_build", or "abort".""" + """Prompts the user the action to take for a pkg after checking its + PKGBUILD. + + Returns "continue", "recheck", "force_build", "skip", "back", or "abort".""" + while True: log_print( 'Got "{}" for pkg "{}", action: [C(ontinue), r(echeck), f(orce build),\ @@ -943,6 +1038,8 @@ def confirm_result(pkg, state_result): def print_state_info_and_get_update_list(pkg_state): + """Prints the current "checked" state of all pkgs in the config.""" + to_update = [] log_print("package state:") for (pkg_name, pkg_dict) in pkg_state.items(): @@ -956,6 +1053,8 @@ def print_state_info_and_get_update_list(pkg_state): def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase): + """Checks if the given gpg passphrase works with the gpg signing key.""" + with tempfile.NamedTemporaryFile() as tempnf: tempnf.write(b"Test file content") tempnf.flush() @@ -987,6 +1086,8 @@ def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase): def validate_and_verify_paths(other_state): + """Checks and validates/ensures that certain directories exist.""" + if not os.path.exists(other_state["chroot"]): log_print( f"ERROR: chroot at \"{other_state['chroot']}\" does not exist"