Add documentation and do some refactoring

This commit is contained in:
Stephen Seo 2022-09-06 12:12:11 +09:00
parent 56e6ec3deb
commit ad1f4f7ad5

167
update.py
View file

@ -26,6 +26,8 @@ DEFAULT_EDITOR = "/usr/bin/nano"
def log_print(*args, **kwargs): def log_print(*args, **kwargs):
"""Prints to stdout, then logs to GLOBAL_LOG_FILE."""
if "file" in kwargs: if "file" in kwargs:
kwargs["file"] = sys.stdout kwargs["file"] = sys.stdout
print(*args, **kwargs) print(*args, **kwargs)
@ -35,6 +37,21 @@ def log_print(*args, **kwargs):
def ensure_pkg_dir_exists(pkg, pkg_state, other_state): def ensure_pkg_dir_exists(pkg, pkg_state, other_state):
"""Ensures that an AUR-pkg-dir exists, returning False on failure.
If no such directory exists, this script attempts to clone it from the AUR.
True is returned on successful cloning.
If a file exists with the same name, returns False.
If a directory exists with the same name, returns True.
If "repo_path" is specified and the directory doesn't exist, returns False.
If "NO_REPO" is specified as "repo_path" and the directory doesn't exist,
returns False.
"""
log_print('Checking that dir for "{}" exists...'.format(pkg)) log_print('Checking that dir for "{}" exists...'.format(pkg))
pkgdir = os.path.join(other_state["clones_dir"], pkg) pkgdir = os.path.join(other_state["clones_dir"], pkg)
if os.path.isdir(pkgdir): if os.path.isdir(pkgdir):
@ -62,9 +79,25 @@ def ensure_pkg_dir_exists(pkg, pkg_state, other_state):
elif pkg_state[pkg]["repo_path"] == "NO_REPO": elif pkg_state[pkg]["repo_path"] == "NO_REPO":
log_print('"{}" does not exist, but NO_REPO specified for repo_path') log_print('"{}" does not exist, but NO_REPO specified for repo_path')
return False return False
return False
def update_pkg_dir(pkg, pkg_state, other_state): def update_pkg_dir(pkg, pkg_state, other_state):
"""Updates the pkg by invoking "git pull".
If "git pull" failes, it is retried after invoking "git restore .".
If the local working directory fails to update via "git pull" (or if some
other handling of the local repository fails), then this function returns
(False, False).
(True, True) is returned if "skip_branch_up_to_date" is True for the package
and the local repository is up to date.
(True, False) is returned by default (successful "git pull"; regardless of
if an update was fetched).
"""
log_print('Making sure pkg dir for "{}" is up to date...'.format(pkg)) log_print('Making sure pkg dir for "{}" is up to date...'.format(pkg))
pkgdir = os.path.join(other_state["clones_dir"], pkg) pkgdir = os.path.join(other_state["clones_dir"], pkg)
@ -109,7 +142,7 @@ def update_pkg_dir(pkg, pkg_state, other_state):
return False, False return False, False
# get remote that current branch is tracking # get remote that current branch is tracking
selected_remote = "" selected_remote = None
try: try:
result = subprocess.run( result = subprocess.run(
["git", "status", "-sb", "--porcelain"], ["git", "status", "-sb", "--porcelain"],
@ -118,26 +151,25 @@ def update_pkg_dir(pkg, pkg_state, other_state):
capture_output=True, capture_output=True,
encoding="UTF-8", encoding="UTF-8",
) )
for remote in remotes: result_lines = result.stdout.split(sep="\n")
if ( for matching_line in filter(lambda s: s.startswith("##"), result_lines):
len(remote.strip()) > 0 for remote in map(lambda r: r.strip(), remotes):
and result.stdout.find(remote.strip()) != -1 if matching_line.find(remote) != -1:
): selected_remote = remote
selected_remote = remote.strip() break
break
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
log_print( log_print(
f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).' f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).'
) )
return False, False return False, False
if len(selected_remote) == 0: if selected_remote is None or not isinstance(selected_remote, str):
log_print( log_print(
f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).' f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).'
) )
return False, False return False, False
# get hash of current branch # get hash of current branch
current_branch_hash = "" current_branch_hash = None
try: try:
result = subprocess.run( result = subprocess.run(
["git", "log", "-1", "--format=format:%H"], ["git", "log", "-1", "--format=format:%H"],
@ -152,14 +184,14 @@ def update_pkg_dir(pkg, pkg_state, other_state):
f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).' f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).'
) )
return False, False return False, False
if len(current_branch_hash.strip()) == 0: if current_branch_hash is None or not isinstance(current_branch_hash, str):
log_print( log_print(
f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).' f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).'
) )
return False, False return False, False
# get hash of remote branch # get hash of remote branch
remote_branch_hash = "" remote_branch_hash = None
try: try:
result = subprocess.run( result = subprocess.run(
["git", "log", "-1", "--format=format:%H", selected_remote], ["git", "log", "-1", "--format=format:%H", selected_remote],
@ -174,7 +206,7 @@ def update_pkg_dir(pkg, pkg_state, other_state):
f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).' f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).'
) )
return False, False return False, False
if len(remote_branch_hash.strip()) == 0: if remote_branch_hash is None or not isinstance(remote_branch_hash, str):
log_print( log_print(
f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).' f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).'
) )
@ -187,7 +219,7 @@ def update_pkg_dir(pkg, pkg_state, other_state):
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
try: try:
subprocess.run( subprocess.run(
["git", "checkout", "--", "*"], ["git", "restore", "."],
check=True, check=True,
cwd=pkgdir, cwd=pkgdir,
) )
@ -209,7 +241,10 @@ def update_pkg_dir(pkg, pkg_state, other_state):
def check_pkg_build(pkg, pkg_state, other_state, editor): def check_pkg_build(pkg, pkg_state, other_state, editor):
"""Returns "ok", "not_ok", "abort", or "force_build".""" """Opens the PKGBUILD in the editor, then prompts the user for an action.
Returns "ok", "not_ok", "abort", or "force_build"."""
pkgdir = os.path.join(other_state["clones_dir"], pkg) pkgdir = os.path.join(other_state["clones_dir"], pkg)
log_print('Checking PKGBUILD for "{}"...'.format(pkg)) log_print('Checking PKGBUILD for "{}"...'.format(pkg))
try: try:
@ -243,7 +278,11 @@ def check_pkg_build(pkg, pkg_state, other_state, editor):
def check_pkg_version(pkg, pkg_state, repo, force_check_srcinfo, other_state): def check_pkg_version(pkg, pkg_state, repo, force_check_srcinfo, other_state):
"""Returns "fail", "install", or "done".""" """Gets the installed version and pkg version and checks them.
Returns "fail" (on failure), "install" (pkg is newer), or "done"
(installed pkg is up to date)."""
status, current_epoch, current_version = get_pkg_current_version( status, current_epoch, current_version = get_pkg_current_version(
pkg, pkg_state, repo pkg, pkg_state, repo
) )
@ -275,7 +314,14 @@ def check_pkg_version(pkg, pkg_state, repo, force_check_srcinfo, other_state):
def get_srcinfo_version(pkg, other_state): def get_srcinfo_version(pkg, other_state):
"""Returns (success_bool, pkgepoch, pkgver, pkgrel)""" """Parses .SRCINFO for verison information.
Returns (success_bool, pkgepoch, pkgver, pkgrel)
When "success_bool" is False, all other values are None.
Otherwise, all other values are str (or None if not found)."""
if not os.path.exists( if not os.path.exists(
os.path.join(other_state["clones_dir"], pkg, ".SRCINFO") os.path.join(other_state["clones_dir"], pkg, ".SRCINFO")
): ):
@ -284,9 +330,9 @@ def get_srcinfo_version(pkg, other_state):
pkgver_reprog = re.compile("^\\s*pkgver\\s*=\\s*([a-zA-Z0-9._+-]+)\\s*$") pkgver_reprog = re.compile("^\\s*pkgver\\s*=\\s*([a-zA-Z0-9._+-]+)\\s*$")
pkgrel_reprog = re.compile("^\\s*pkgrel\\s*=\\s*([0-9.]+)\\s*$") pkgrel_reprog = re.compile("^\\s*pkgrel\\s*=\\s*([0-9.]+)\\s*$")
pkgepoch_reprog = re.compile("^\\s*epoch\\s*=\\s*([0-9]+)\\s*$") pkgepoch_reprog = re.compile("^\\s*epoch\\s*=\\s*([0-9]+)\\s*$")
pkgver = "" pkgver = None
pkgrel = "" pkgrel = None
pkgepoch = "" pkgepoch = None
with open( with open(
os.path.join(other_state["clones_dir"], pkg, ".SRCINFO"), os.path.join(other_state["clones_dir"], pkg, ".SRCINFO"),
encoding="UTF-8", encoding="UTF-8",
@ -307,7 +353,15 @@ def get_srcinfo_version(pkg, other_state):
def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state): def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state):
"""Returns (success, epoch, version, release)""" """Gets the version of the pkg from .SRCINFO or PKGBUILD.
Returns (success, epoch, version, release).
If "success" is False, then all other values are None.
Otherwise, version and release should be a str type, but epoch may be
None."""
pkgdir = os.path.join(other_state["clones_dir"], pkg) pkgdir = os.path.join(other_state["clones_dir"], pkg)
log_print(f'Getting version of "{pkg}"...') log_print(f'Getting version of "{pkg}"...')
while True and not force_check_srcinfo: while True and not force_check_srcinfo:
@ -368,9 +422,9 @@ def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state):
if os.path.exists(os.path.join(pkgdir, "src")): if os.path.exists(os.path.join(pkgdir, "src")):
shutil.rmtree(os.path.join(pkgdir, "src")) shutil.rmtree(os.path.join(pkgdir, "src"))
pkgepoch = "" pkgepoch = None
pkgver = "" pkgver = None
pkgrel = "" pkgrel = None
# TODO maybe sandbox sourcing the PKGBUILD # TODO maybe sandbox sourcing the PKGBUILD
pkgbuild_output = subprocess.run( pkgbuild_output = subprocess.run(
@ -401,13 +455,6 @@ def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state):
log_print("ERROR: Unreachable code") log_print("ERROR: Unreachable code")
return False, None, None, None return False, None, None, None
if len(pkgepoch) == 0:
pkgepoch = None
if len(pkgver) == 0:
pkgver = None
if len(pkgrel) == 0:
pkgrel = None
if pkgver is not None and pkgrel is not None: if pkgver is not None and pkgrel is not None:
return True, pkgepoch, pkgver, pkgrel return True, pkgepoch, pkgver, pkgrel
else: else:
@ -423,6 +470,14 @@ def get_srcinfo_check_result(
pkg_state, pkg_state,
other_state, other_state,
): ):
"""Checks the version of the pkg against the currently installed version.
Returns "install" if the version of the pkg is newer.
Otherwise returns "done" if the version is not newer.
Returns "fail" on error."""
ver_success, pkgepoch, pkgver, pkgrel = get_pkgbuild_version( ver_success, pkgepoch, pkgver, pkgrel = get_pkgbuild_version(
pkg, force_check_srcinfo, pkg_state, other_state pkg, force_check_srcinfo, pkg_state, other_state
) )
@ -481,7 +536,19 @@ def get_srcinfo_check_result(
def get_pkg_current_version(pkg, pkg_state, repo): def get_pkg_current_version(pkg, pkg_state, repo):
"""Returns (status, epoch, version)""" """Fetches the version info and returns status of fetching and the version.
Returns (status, epoch, version)
"status" may be one of:
"fail", "install", "fetched"
"epoch" may be None.
"version" must be a str if "status" is "fetched". Otherwise, it is None if
"status" is "fail" or "install".
"""
log_print( log_print(
'Checking version of installed pkg "{}"...'.format( 'Checking version of installed pkg "{}"...'.format(
pkg_state[pkg]["pkg_name"] pkg_state[pkg]["pkg_name"]
@ -526,6 +593,9 @@ def get_pkg_current_version(pkg, pkg_state, repo):
def get_sudo_privileges(): def get_sudo_privileges():
"""Starts a bash loop that ensures sudo privileges are ready while this
script is active."""
global SUDO_PROC global SUDO_PROC
if not SUDO_PROC: if not SUDO_PROC:
log_print("sudo -v") log_print("sudo -v")
@ -542,10 +612,17 @@ def get_sudo_privileges():
def cleanup_sudo(sudo_proc): def cleanup_sudo(sudo_proc):
"""Stops the bash loop keeping sudo privileges."""
sudo_proc.terminate() sudo_proc.terminate()
def create_executable_script(dest_filename, script_contents): def create_executable_script(dest_filename, script_contents):
"""Creates a script to be run with sudo to set up a custom command.
This is currently used to set up sccache by placing custom commands in
"/usr/local/bin" for gcc and friends."""
tempf_name = "unknown" tempf_name = "unknown"
with tempfile.NamedTemporaryFile( with tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", delete=False mode="w", encoding="utf-8", delete=False
@ -596,6 +673,8 @@ if __name__ == '__main__':
def setup_ccache(chroot): def setup_ccache(chroot):
"""Sets up the chroot for ccache."""
# set up ccache stuff # set up ccache stuff
try: try:
subprocess.run( subprocess.run(
@ -614,6 +693,8 @@ def setup_ccache(chroot):
def cleanup_ccache(chroot): def cleanup_ccache(chroot):
"""Unsets up the chroot for ccache."""
# cleanup ccache stuff # cleanup ccache stuff
try: try:
subprocess.run( subprocess.run(
@ -632,6 +713,8 @@ def cleanup_ccache(chroot):
def setup_sccache(chroot): def setup_sccache(chroot):
"""Sets up sccache for the chroot."""
sccache_script = """#!/usr/bin/env sh sccache_script = """#!/usr/bin/env sh
export PATH=${PATH/:\/usr\/local\/bin/} export PATH=${PATH/:\/usr\/local\/bin/}
/usr/bin/env sccache $(basename "$0") "$@" /usr/bin/env sccache $(basename "$0") "$@"
@ -658,6 +741,8 @@ export PATH=${PATH/:\/usr\/local\/bin/}
def cleanup_sccache(chroot): def cleanup_sccache(chroot):
"""Unsets up sccache for the chroot."""
# cleanup sccache stuff # cleanup sccache stuff
try: try:
subprocess.run( subprocess.run(
@ -686,6 +771,9 @@ def update_pkg_list(
signing_gpg_pass, signing_gpg_pass,
no_store, no_store,
): ):
"""For each package to build: builds it, signs it, and moves it to
"pkg_out_dir"."""
if not get_sudo_privileges(): if not get_sudo_privileges():
log_print("ERROR: Failed to get sudo privileges") log_print("ERROR: Failed to get sudo privileges")
sys.exit(1) sys.exit(1)
@ -898,6 +986,9 @@ def update_pkg_list(
def get_latest_pkg(pkg, cache_dir): def get_latest_pkg(pkg, cache_dir):
"""Gets the latest pkg from the specified "cache_dir" and return its
filename."""
globbed = glob.glob(os.path.join(cache_dir, pkg + "*")) globbed = glob.glob(os.path.join(cache_dir, pkg + "*"))
if len(globbed) > 0: if len(globbed) > 0:
globbed.sort() globbed.sort()
@ -916,7 +1007,11 @@ def get_latest_pkg(pkg, cache_dir):
def confirm_result(pkg, state_result): def confirm_result(pkg, state_result):
"""Returns "continue", "recheck", "force_build", or "abort".""" """Prompts the user the action to take for a pkg after checking its
PKGBUILD.
Returns "continue", "recheck", "force_build", "skip", "back", or "abort"."""
while True: while True:
log_print( log_print(
'Got "{}" for pkg "{}", action: [C(ontinue), r(echeck), f(orce build),\ 'Got "{}" for pkg "{}", action: [C(ontinue), r(echeck), f(orce build),\
@ -943,6 +1038,8 @@ def confirm_result(pkg, state_result):
def print_state_info_and_get_update_list(pkg_state): def print_state_info_and_get_update_list(pkg_state):
"""Prints the current "checked" state of all pkgs in the config."""
to_update = [] to_update = []
log_print("package state:") log_print("package state:")
for (pkg_name, pkg_dict) in pkg_state.items(): for (pkg_name, pkg_dict) in pkg_state.items():
@ -956,6 +1053,8 @@ def print_state_info_and_get_update_list(pkg_state):
def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase): def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase):
"""Checks if the given gpg passphrase works with the gpg signing key."""
with tempfile.NamedTemporaryFile() as tempnf: with tempfile.NamedTemporaryFile() as tempnf:
tempnf.write(b"Test file content") tempnf.write(b"Test file content")
tempnf.flush() tempnf.flush()
@ -987,6 +1086,8 @@ def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase):
def validate_and_verify_paths(other_state): def validate_and_verify_paths(other_state):
"""Checks and validates/ensures that certain directories exist."""
if not os.path.exists(other_state["chroot"]): if not os.path.exists(other_state["chroot"]):
log_print( log_print(
f"ERROR: chroot at \"{other_state['chroot']}\" does not exist" f"ERROR: chroot at \"{other_state['chroot']}\" does not exist"