]> git.seodisparate.com - AnotherAURHelper/commitdiff
Add documentation and do some refactoring
authorStephen Seo <seo.disparate@gmail.com>
Tue, 6 Sep 2022 03:12:11 +0000 (12:12 +0900)
committerStephen Seo <seo.disparate@gmail.com>
Tue, 6 Sep 2022 03:13:16 +0000 (12:13 +0900)
update.py

index 935d7ff6e3770f6e1c5ed12c4124d9f6fe9fe3f9..4ef0163716fd3c439b6c482b7ea74ec824e44c3a 100755 (executable)
--- a/update.py
+++ b/update.py
@@ -26,6 +26,8 @@ DEFAULT_EDITOR = "/usr/bin/nano"
 
 
 def log_print(*args, **kwargs):
+    """Prints to stdout, then logs to GLOBAL_LOG_FILE."""
+
     if "file" in kwargs:
         kwargs["file"] = sys.stdout
     print(*args, **kwargs)
@@ -35,6 +37,21 @@ def log_print(*args, **kwargs):
 
 
 def ensure_pkg_dir_exists(pkg, pkg_state, other_state):
+    """Ensures that an AUR-pkg-dir exists, returning False on failure.
+
+    If no such directory exists, this script attempts to clone it from the AUR.
+    True is returned on successful cloning.
+
+    If a file exists with the same name, returns False.
+
+    If a directory exists with the same name, returns True.
+
+    If "repo_path" is specified and the directory doesn't exist, returns False.
+
+    If "NO_REPO" is specified as "repo_path" and the directory doesn't exist,
+    returns False.
+    """
+
     log_print('Checking that dir for "{}" exists...'.format(pkg))
     pkgdir = os.path.join(other_state["clones_dir"], pkg)
     if os.path.isdir(pkgdir):
@@ -62,9 +79,25 @@ def ensure_pkg_dir_exists(pkg, pkg_state, other_state):
     elif pkg_state[pkg]["repo_path"] == "NO_REPO":
         log_print('"{}" does not exist, but NO_REPO specified for repo_path')
         return False
+    return False
 
 
 def update_pkg_dir(pkg, pkg_state, other_state):
+    """Updates the pkg by invoking "git pull".
+
+    If "git pull" failes, it is retried after invoking "git restore .".
+
+    If the local working directory fails to update via "git pull" (or if some
+    other handling of the local repository fails), then this function returns
+    (False, False).
+
+    (True, True) is returned if "skip_branch_up_to_date" is True for the package
+    and the local repository is up to date.
+
+    (True, False) is returned by default (successful "git pull"; regardless of
+    if an update was fetched).
+    """
+
     log_print('Making sure pkg dir for "{}" is up to date...'.format(pkg))
 
     pkgdir = os.path.join(other_state["clones_dir"], pkg)
@@ -109,7 +142,7 @@ def update_pkg_dir(pkg, pkg_state, other_state):
         return False, False
 
     # get remote that current branch is tracking
-    selected_remote = ""
+    selected_remote = None
     try:
         result = subprocess.run(
             ["git", "status", "-sb", "--porcelain"],
@@ -118,26 +151,25 @@ def update_pkg_dir(pkg, pkg_state, other_state):
             capture_output=True,
             encoding="UTF-8",
         )
-        for remote in remotes:
-            if (
-                len(remote.strip()) > 0
-                and result.stdout.find(remote.strip()) != -1
-            ):
-                selected_remote = remote.strip()
-                break
+        result_lines = result.stdout.split(sep="\n")
+        for matching_line in filter(lambda s: s.startswith("##"), result_lines):
+            for remote in map(lambda r: r.strip(), remotes):
+                if matching_line.find(remote) != -1:
+                    selected_remote = remote
+                    break
     except subprocess.CalledProcessError:
         log_print(
             f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).'
         )
         return False, False
-    if len(selected_remote) == 0:
+    if selected_remote is None or not isinstance(selected_remote, str):
         log_print(
             f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).'
         )
         return False, False
 
     # get hash of current branch
-    current_branch_hash = ""
+    current_branch_hash = None
     try:
         result = subprocess.run(
             ["git", "log", "-1", "--format=format:%H"],
@@ -152,14 +184,14 @@ def update_pkg_dir(pkg, pkg_state, other_state):
             f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).'
         )
         return False, False
-    if len(current_branch_hash.strip()) == 0:
+    if current_branch_hash is None or not isinstance(current_branch_hash, str):
         log_print(
             f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).'
         )
         return False, False
 
     # get hash of remote branch
-    remote_branch_hash = ""
+    remote_branch_hash = None
     try:
         result = subprocess.run(
             ["git", "log", "-1", "--format=format:%H", selected_remote],
@@ -174,7 +206,7 @@ def update_pkg_dir(pkg, pkg_state, other_state):
             f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).'
         )
         return False, False
-    if len(remote_branch_hash.strip()) == 0:
+    if remote_branch_hash is None or not isinstance(remote_branch_hash, str):
         log_print(
             f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).'
         )
@@ -187,7 +219,7 @@ def update_pkg_dir(pkg, pkg_state, other_state):
         except subprocess.CalledProcessError:
             try:
                 subprocess.run(
-                    ["git", "checkout", "--", "*"],
+                    ["git", "restore", "."],
                     check=True,
                     cwd=pkgdir,
                 )
@@ -209,7 +241,10 @@ def update_pkg_dir(pkg, pkg_state, other_state):
 
 
 def check_pkg_build(pkg, pkg_state, other_state, editor):
-    """Returns "ok", "not_ok", "abort", or "force_build"."""
+    """Opens the PKGBUILD in the editor, then prompts the user for an action.
+
+    Returns "ok", "not_ok", "abort", or "force_build"."""
+
     pkgdir = os.path.join(other_state["clones_dir"], pkg)
     log_print('Checking PKGBUILD for "{}"...'.format(pkg))
     try:
@@ -243,7 +278,11 @@ def check_pkg_build(pkg, pkg_state, other_state, editor):
 
 
 def check_pkg_version(pkg, pkg_state, repo, force_check_srcinfo, other_state):
-    """Returns "fail", "install", or "done"."""
+    """Gets the installed version and pkg version and checks them.
+
+    Returns "fail" (on failure), "install" (pkg is newer), or "done"
+    (installed pkg is up to date)."""
+
     status, current_epoch, current_version = get_pkg_current_version(
         pkg, pkg_state, repo
     )
@@ -275,7 +314,14 @@ def check_pkg_version(pkg, pkg_state, repo, force_check_srcinfo, other_state):
 
 
 def get_srcinfo_version(pkg, other_state):
-    """Returns (success_bool, pkgepoch, pkgver, pkgrel)"""
+    """Parses .SRCINFO for verison information.
+
+    Returns (success_bool, pkgepoch, pkgver, pkgrel)
+
+    When "success_bool" is False, all other values are None.
+
+    Otherwise, all other values are str (or None if not found)."""
+
     if not os.path.exists(
         os.path.join(other_state["clones_dir"], pkg, ".SRCINFO")
     ):
@@ -284,9 +330,9 @@ def get_srcinfo_version(pkg, other_state):
     pkgver_reprog = re.compile("^\\s*pkgver\\s*=\\s*([a-zA-Z0-9._+-]+)\\s*$")
     pkgrel_reprog = re.compile("^\\s*pkgrel\\s*=\\s*([0-9.]+)\\s*$")
     pkgepoch_reprog = re.compile("^\\s*epoch\\s*=\\s*([0-9]+)\\s*$")
-    pkgver = ""
-    pkgrel = ""
-    pkgepoch = ""
+    pkgver = None
+    pkgrel = None
+    pkgepoch = None
     with open(
         os.path.join(other_state["clones_dir"], pkg, ".SRCINFO"),
         encoding="UTF-8",
@@ -307,7 +353,15 @@ def get_srcinfo_version(pkg, other_state):
 
 
 def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state):
-    """Returns (success, epoch, version, release)"""
+    """Gets the version of the pkg from .SRCINFO or PKGBUILD.
+
+    Returns (success, epoch, version, release).
+
+    If "success" is False, then all other values are None.
+
+    Otherwise, version and release should be a str type, but epoch may be
+    None."""
+
     pkgdir = os.path.join(other_state["clones_dir"], pkg)
     log_print(f'Getting version of "{pkg}"...')
     while True and not force_check_srcinfo:
@@ -368,9 +422,9 @@ def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state):
 
         if os.path.exists(os.path.join(pkgdir, "src")):
             shutil.rmtree(os.path.join(pkgdir, "src"))
-        pkgepoch = ""
-        pkgver = ""
-        pkgrel = ""
+        pkgepoch = None
+        pkgver = None
+        pkgrel = None
 
         # TODO maybe sandbox sourcing the PKGBUILD
         pkgbuild_output = subprocess.run(
@@ -401,13 +455,6 @@ def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state):
         log_print("ERROR: Unreachable code")
         return False, None, None, None
 
-    if len(pkgepoch) == 0:
-        pkgepoch = None
-    if len(pkgver) == 0:
-        pkgver = None
-    if len(pkgrel) == 0:
-        pkgrel = None
-
     if pkgver is not None and pkgrel is not None:
         return True, pkgepoch, pkgver, pkgrel
     else:
@@ -423,6 +470,14 @@ def get_srcinfo_check_result(
     pkg_state,
     other_state,
 ):
+    """Checks the version of the pkg against the currently installed version.
+
+    Returns "install" if the version of the pkg is newer.
+
+    Otherwise returns "done" if the version is not newer.
+
+    Returns "fail" on error."""
+
     ver_success, pkgepoch, pkgver, pkgrel = get_pkgbuild_version(
         pkg, force_check_srcinfo, pkg_state, other_state
     )
@@ -481,7 +536,19 @@ def get_srcinfo_check_result(
 
 
 def get_pkg_current_version(pkg, pkg_state, repo):
-    """Returns (status, epoch, version)"""
+    """Fetches the version info and returns status of fetching and the version.
+
+    Returns (status, epoch, version)
+
+    "status" may be one of:
+        "fail", "install", "fetched"
+
+    "epoch" may be None.
+
+    "version" must be a str if "status" is "fetched". Otherwise, it is None if
+    "status" is "fail" or "install".
+    """
+
     log_print(
         'Checking version of installed pkg "{}"...'.format(
             pkg_state[pkg]["pkg_name"]
@@ -526,6 +593,9 @@ def get_pkg_current_version(pkg, pkg_state, repo):
 
 
 def get_sudo_privileges():
+    """Starts a bash loop that ensures sudo privileges are ready while this
+    script is active."""
+
     global SUDO_PROC
     if not SUDO_PROC:
         log_print("sudo -v")
@@ -542,10 +612,17 @@ def get_sudo_privileges():
 
 
 def cleanup_sudo(sudo_proc):
+    """Stops the bash loop keeping sudo privileges."""
+
     sudo_proc.terminate()
 
 
 def create_executable_script(dest_filename, script_contents):
+    """Creates a script to be run with sudo to set up a custom command.
+
+    This is currently used to set up sccache by placing custom commands in
+    "/usr/local/bin" for gcc and friends."""
+
     tempf_name = "unknown"
     with tempfile.NamedTemporaryFile(
         mode="w", encoding="utf-8", delete=False
@@ -596,6 +673,8 @@ if __name__ == '__main__':
 
 
 def setup_ccache(chroot):
+    """Sets up the chroot for ccache."""
+
     # set up ccache stuff
     try:
         subprocess.run(
@@ -614,6 +693,8 @@ def setup_ccache(chroot):
 
 
 def cleanup_ccache(chroot):
+    """Unsets up the chroot for ccache."""
+
     # cleanup ccache stuff
     try:
         subprocess.run(
@@ -632,6 +713,8 @@ def cleanup_ccache(chroot):
 
 
 def setup_sccache(chroot):
+    """Sets up sccache for the chroot."""
+
     sccache_script = """#!/usr/bin/env sh
 export PATH=${PATH/:\/usr\/local\/bin/}
 /usr/bin/env sccache $(basename "$0") "$@"
@@ -658,6 +741,8 @@ export PATH=${PATH/:\/usr\/local\/bin/}
 
 
 def cleanup_sccache(chroot):
+    """Unsets up sccache for the chroot."""
+
     # cleanup sccache stuff
     try:
         subprocess.run(
@@ -686,6 +771,9 @@ def update_pkg_list(
     signing_gpg_pass,
     no_store,
 ):
+    """For each package to build: builds it, signs it, and moves it to
+    "pkg_out_dir"."""
+
     if not get_sudo_privileges():
         log_print("ERROR: Failed to get sudo privileges")
         sys.exit(1)
@@ -898,6 +986,9 @@ def update_pkg_list(
 
 
 def get_latest_pkg(pkg, cache_dir):
+    """Gets the latest pkg from the specified "cache_dir" and return its
+    filename."""
+
     globbed = glob.glob(os.path.join(cache_dir, pkg + "*"))
     if len(globbed) > 0:
         globbed.sort()
@@ -916,7 +1007,11 @@ def get_latest_pkg(pkg, cache_dir):
 
 
 def confirm_result(pkg, state_result):
-    """Returns "continue", "recheck", "force_build", or "abort"."""
+    """Prompts the user the action to take for a pkg after checking its
+    PKGBUILD.
+
+    Returns "continue", "recheck", "force_build", "skip", "back", or "abort"."""
+
     while True:
         log_print(
             'Got "{}" for pkg "{}", action: [C(ontinue), r(echeck), f(orce build),\
@@ -943,6 +1038,8 @@ def confirm_result(pkg, state_result):
 
 
 def print_state_info_and_get_update_list(pkg_state):
+    """Prints the current "checked" state of all pkgs in the config."""
+
     to_update = []
     log_print("package state:")
     for (pkg_name, pkg_dict) in pkg_state.items():
@@ -956,6 +1053,8 @@ def print_state_info_and_get_update_list(pkg_state):
 
 
 def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase):
+    """Checks if the given gpg passphrase works with the gpg signing key."""
+
     with tempfile.NamedTemporaryFile() as tempnf:
         tempnf.write(b"Test file content")
         tempnf.flush()
@@ -987,6 +1086,8 @@ def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase):
 
 
 def validate_and_verify_paths(other_state):
+    """Checks and validates/ensures that certain directories exist."""
+
     if not os.path.exists(other_state["chroot"]):
         log_print(
             f"ERROR: chroot at \"{other_state['chroot']}\" does not exist"