--- /dev/null
+#!/usr/bin/env python3
+
+import os
+import stat
+import sys
+import argparse
+import subprocess
+import re
+from packaging import version
+import atexit
+import glob
+import toml
+import datetime
+import time
+import shutil
+import getpass
+import tempfile
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+SUDO_PROC = False
+AUR_GIT_REPO_PATH = "https://aur.archlinux.org"
+AUR_GIT_REPO_PATH_TEMPLATE = AUR_GIT_REPO_PATH + "/{}.git"
+global GLOBAL_LOG_FILE
+GLOBAL_LOG_FILE = "log.txt"
+
+
+def log_print(string):
+ print(string)
+ with open(GLOBAL_LOG_FILE, "a", encoding="utf-8") as lf:
+ print(string, file=lf)
+
+
+def ensure_pkg_dir_exists(pkg, pkg_state):
+ log_print('Checking that dir for "{}" exists...'.format(pkg))
+ pkg_dir = os.path.join(SCRIPT_DIR, pkg)
+ if os.path.isdir(pkg_dir):
+ log_print('Dir for "{}" exists.'.format(pkg))
+ return True
+ elif os.path.exists(pkg_dir):
+ log_print('"{}" exists but is not a dir'.format(pkg_dir))
+ return False
+ elif "repo_path" not in pkg_state[pkg]:
+ pkg_state[pkg]["repo_path"] = AUR_GIT_REPO_PATH_TEMPLATE.format(pkg)
+ try:
+ subprocess.run(
+ ["git", "clone", pkg_state[pkg]["repo_path"], pkg],
+ check=True,
+ cwd=SCRIPT_DIR,
+ )
+ except subprocess.CalledProcessError:
+ print(
+ 'ERROR: Failed to git clone "{}" (tried repo path "{}")'.format(
+ pkg_dir, pkg_state[pkg]["repo_path"]
+ )
+ )
+ return False
+ log_print('Created dir for "{}".'.format(pkg))
+ return True
+ elif pkg_state[pkg]["repo_path"] == "NO_REPO":
+ log_print('"{}" does not exist, but NO_REPO specified for repo_path')
+ return False
+
+
+def update_pkg_dir(pkg, state):
+ log_print('Making sure pkg dir for "{}" is up to date...'.format(pkg))
+
+ # fetch all
+ try:
+ subprocess.run(
+ ["git", "fetch", "-p", "--all"],
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ )
+ except subprocess.CalledProcessError:
+ log_print(
+ 'ERROR: Failed to update pkg dir of "{}" (fetching).'.format(pkg)
+ )
+ return False, False
+
+ # get remotes
+ remotes = []
+ try:
+ result = subprocess.run(
+ ["git", "remote"],
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ capture_output=True,
+ encoding="UTF-8",
+ )
+ remotes = result.stdout.split(sep="\n")
+ except subprocess.CalledProcessError:
+ log_print(
+ 'ERROR: Failed to update pkg dir of "{}" (getting remotes).'.format(
+ pkg
+ )
+ )
+ return False, False
+ remotes = list(filter(lambda s: len(s) > 0, remotes))
+ if len(remotes) == 0:
+ log_print(
+ 'ERROR: Failed to update pkg dir of "{}" (getting remotes).'.format(
+ pkg
+ )
+ )
+ return False, False
+
+ # get remote that current branch is tracking
+ selected_remote = ""
+ try:
+ result = subprocess.run(
+ ["git", "status", "-sb", "--porcelain"],
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ capture_output=True,
+ encoding="UTF-8",
+ )
+ for remote in remotes:
+ if (
+ len(remote.strip()) > 0
+ and result.stdout.find(remote.strip()) != -1
+ ):
+ selected_remote = remote.strip()
+ break
+ except subprocess.CalledProcessError:
+ print(
+ 'ERROR: Failed to update pkg dir of "{}" (getting branch\'s remote).'.format(
+ pkg
+ )
+ )
+ return False, False
+ if len(selected_remote) == 0:
+ print(
+ 'ERROR: Failed to update pkg dir of "{}" (getting branch\'s remote).'.format(
+ pkg
+ )
+ )
+ return False, False
+
+ # get hash of current branch
+ current_branch_hash = ""
+ try:
+ result = subprocess.run(
+ ["git", "log", "-1", "--format=format:%H"],
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ capture_output=True,
+ encoding="UTF-8",
+ )
+ current_branch_hash = result.stdout.strip()
+ except subprocess.CalledProcessError:
+ print(
+ 'ERROR: Failed to update pkg dir of "{}" (getting current branch\'s hash).'.format(
+ pkg
+ )
+ )
+ return False, False
+ if len(current_branch_hash.strip()) == 0:
+ print(
+ 'ERROR: Failed to update pkg dir of "{}" (getting current branch\'s hash).'.format(
+ pkg
+ )
+ )
+ return False, False
+
+ # get hash of remote branch
+ remote_branch_hash = ""
+ try:
+ result = subprocess.run(
+ ["git", "log", "-1", "--format=format:%H", selected_remote],
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ capture_output=True,
+ encoding="UTF-8",
+ )
+ remote_branch_hash = result.stdout.strip()
+ except subprocess.CalledProcessError:
+ print(
+ 'ERROR: Failed to update pkg dir of "{}" (getting remote branch\'s hash).'.format(
+ pkg
+ )
+ )
+ return False, False
+ if len(remote_branch_hash.strip()) == 0:
+ print(
+ 'ERROR: Failed to update pkg dir of "{}" (getting remote branch\'s hash).'.format(
+ pkg
+ )
+ )
+ return False, False
+
+ # update current branch if not same commit
+ if current_branch_hash != remote_branch_hash:
+ try:
+ subprocess.run(
+ ["git", "pull"], check=True, cwd=os.path.join(SCRIPT_DIR, pkg)
+ )
+ except subprocess.CalledProcessError:
+ try:
+ subprocess.run(
+ ["git", "checkout", "--", "*"],
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ )
+ subprocess.run(
+ ["git", "pull"],
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ )
+ except subprocess.CalledProcessError:
+ log_print(
+ 'ERROR: Failed to update pkg dir of "{}".'.format(pkg)
+ )
+ return False, False
+ elif state[pkg]["skip_branch_up_to_date"]:
+ log_print(f'"{pkg}" is up to date')
+ return True, True
+ log_print('Updated pkg dir for "{}"'.format(pkg))
+ return True, False
+
+
+def check_pkg_build(pkg, editor):
+ """Returns "ok", "not_ok", "abort", or "force_build"."""
+ log_print('Checking PKGBUILD for "{}"...'.format(pkg))
+ try:
+ subprocess.run(
+ [editor, os.path.join(pkg, "PKGBUILD")], check=True, cwd=SCRIPT_DIR
+ )
+ except subprocess.CalledProcessError:
+ log_print('ERROR: Failed checking PKGBUILD for "{}"'.format(pkg))
+ return "abort"
+ while True:
+ log_print(
+ "PKGBUILD okay? [Y/n/c(heck again)/a(bort)/f(orce build)/b(ack)]"
+ )
+ user_input = sys.stdin.buffer.readline().decode().strip().lower()
+ if user_input == "y" or len(user_input) == 0:
+ log_print("User decided PKGBUILD is ok")
+ return "ok"
+ elif user_input == "n":
+ log_print("User decided PKGBUILD is not ok")
+ return "not_ok"
+ elif user_input == "c":
+ log_print("User will check PKGBUILD again")
+ return check_pkg_build(pkg, editor)
+ elif user_input == "a":
+ return "abort"
+ elif user_input == "f":
+ return "force_build"
+ elif user_input == "b":
+ return "back"
+ else:
+ log_print("ERROR: User gave invalid input...")
+ continue
+
+
+def check_pkg_version(pkgdir, pkg_state, repo, force_check_srcinfo):
+ """Returns "fail", "install", or "done"."""
+ status, current_epoch, current_version = get_pkg_current_version(pkgdir, pkg_state, repo)
+ if status != "fetched":
+ return status
+ elif current_version is None:
+ print(
+ 'ERROR: Failed to get version from package "{}".'.format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ return "fail"
+ print(
+ 'Got version "{}:{}" for installed pkg "{}"'.format(
+ current_epoch if current_epoch is not None else "0",
+ current_version,
+ pkg_state[pkgdir]["pkg_name"],
+ )
+ )
+
+ return get_srcinfo_check_result(current_epoch, current_version, pkgdir, force_check_srcinfo)
+
+
+def get_srcinfo_version(pkgdir):
+ """Returns (success_bool, pkgepoch, pkgver, pkgrel)"""
+ if not os.path.exists(os.path.join(SCRIPT_DIR, pkgdir, ".SRCINFO")):
+ log_print(f'ERROR: .SRCINFO does not exist for pkg "{pkgdir}"')
+ return False, None, None, None
+ pkgver_reprog = re.compile(
+ "^\\s*pkgver\\s*=\\s*([a-zA-Z0-9._+-]+)\\s*$"
+ )
+ pkgrel_reprog = re.compile("^\\s*pkgrel\\s*=\\s*([0-9.]+)\\s*$")
+ pkgepoch_reprog = re.compile("^\\s*epoch\\s*=\\s*([0-9]+)\\s*$")
+ pkgver = ""
+ pkgrel = ""
+ pkgepoch = ""
+ with open(
+ os.path.join(SCRIPT_DIR, pkgdir, ".SRCINFO"), encoding="UTF-8"
+ ) as fo:
+ line = fo.readline()
+ while len(line) > 0:
+ pkgver_result = pkgver_reprog.match(line)
+ pkgrel_result = pkgrel_reprog.match(line)
+ pkgepoch_result = pkgepoch_reprog.match(line)
+ if pkgver_result:
+ pkgver = pkgver_result.group(1)
+ elif pkgrel_result:
+ pkgrel = pkgrel_result.group(1)
+ elif pkgepoch_result:
+ pkgepoch = pkgepoch_result.group(1)
+ line = fo.readline()
+ return True, pkgepoch, pkgver, pkgrel
+
+
+def get_pkgbuild_version(pkgdir, force_check_srcinfo):
+ """Returns (success, epoch, version, release)"""
+ log_print('Getting PKGBUILD version of "{}"...'.format(pkgdir))
+ while True and not force_check_srcinfo:
+ log_print("Use .SRCINFO or directly parse PKGBUILD?")
+ user_input = input("1 for .SRCINFO, 2 for PKGBUILD > ")
+ if user_input == "1" or user_input == "2":
+ break
+ # TODO support split packages
+ if force_check_srcinfo or user_input == "1":
+ srcinfo_fetch_success, pkgepoch, pkgver, pkgrel = get_srcinfo_version(pkgdir)
+ if not srcinfo_fetch_success:
+ log_print('ERROR: Failed to get pkg info from .SRCINFO')
+ return False, None, None, None
+ elif user_input == "2":
+ try:
+ log_print(
+ 'Running "makepkg --nobuild" to ensure pkgver in PKGBUILD is updated...'
+ )
+ subprocess.run(
+ ["makepkg", "-c", "--nobuild", "-s", "-r"],
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkgdir),
+ )
+ except subprocess.CalledProcessError:
+ log_print(
+ 'ERROR: Failed to run "makepkg --nobuild" in "{}".'.format(
+ pkgdir
+ )
+ )
+ if os.path.exists(os.path.join(SCRIPT_DIR, pkgdir, "src")):
+ shutil.rmtree(os.path.join(SCRIPT_DIR, pkgdir, "src"))
+ return False, None, None, None
+
+ if os.path.exists(os.path.join(SCRIPT_DIR, pkgdir, "src")):
+ shutil.rmtree(os.path.join(SCRIPT_DIR, pkgdir, "src"))
+ pkgepoch = ""
+ pkgver = ""
+ pkgrel = ""
+
+ # TODO maybe sandbox sourcing the PKGBUILD
+ pkgbuild_output = subprocess.run(
+ [
+ "bash",
+ "-c",
+ f"source {os.path.join(SCRIPT_DIR, pkgdir, 'PKGBUILD')}; echo \"pkgver=$pkgver\"; echo \"pkgrel=$pkgrel\"; echo \"epoch=$epoch\"",
+ ],
+ capture_output=True,
+ text=True,
+ )
+ output_ver_re = re.compile(
+ "^pkgver=([a-zA-Z0-9._+-]+)\\s*$", flags=re.M
+ )
+ output_rel_re = re.compile("^pkgrel=([0-9.]+)\\s*$", flags=re.M)
+ output_epoch_re = re.compile("^epoch=([0-9]+)\\s*$", flags=re.M)
+
+ match = output_ver_re.search(pkgbuild_output.stdout)
+ if match:
+ pkgver = match.group(1)
+ match = output_rel_re.search(pkgbuild_output.stdout)
+ if match:
+ pkgrel = match.group(1)
+ match = output_epoch_re.search(pkgbuild_output.stdout)
+ if match:
+ pkgepoch = match.group(1)
+ else:
+ log_print("ERROR: Unreachable code")
+ return False, None, None, None
+
+ if len(pkgepoch) == 0:
+ pkgepoch = None
+ if len(pkgver) == 0:
+ pkgver = None
+ if len(pkgrel) == 0:
+ pkgrel = None
+
+ if pkgver is not None and pkgrel is not None:
+ return True, pkgepoch, pkgver, pkgrel
+ else:
+ log_print(
+ 'ERROR: Failed to get PKGBUILD version of "{}".'.format(pkgdir)
+ )
+ return False, None, None, None
+
+
+def get_srcinfo_check_result(current_epoch, current_version, pkgdir, force_check_srcinfo):
+ ver_success, pkgepoch, pkgver, pkgrel = get_pkgbuild_version(pkgdir, force_check_srcinfo)
+ if ver_success:
+ if current_epoch is None and pkgepoch is not None:
+ print(
+ 'Current installed version of "{}" is out of date (missing epoch).'.format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ return "install"
+ elif current_epoch is not None and pkgepoch is None:
+ print(
+ 'Current installed version of "{}" is up to date (has epoch).'.format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ return "done"
+ elif current_epoch is not None and pkgepoch is not None and int(current_epoch) < int(pkgepoch):
+ print(
+ 'Current installed version of "{}" is out of date (older epoch).'.format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ return "install"
+ elif pkgver is not None and pkgrel is not None and version.parse(current_version) < version.parse(pkgver + "-" + pkgrel):
+ print(
+ 'Current installed version of "{}" is out of date (older version).'.format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ return "install"
+ else:
+ print(
+ 'Current installed version of "{}" is up to date.'.format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ return "done"
+ else:
+ print(
+ 'ERROR: Failed to get pkg_version of "{}"'.format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ return "fail"
+
+def get_pkg_current_version(pkgdir, pkg_state, repo):
+ """Returns (status, epoch, version)"""
+ print(
+ 'Checking version of installed pkg "{}"...'.format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ current_epoch = None
+ current_version = None
+ try:
+ result = subprocess.run(
+ "tar -tf {} | grep '{}.*/$'".format(
+ repo, pkg_state[pkgdir]["pkg_name"]
+ ),
+ check=True,
+ capture_output=True,
+ encoding="UTF-8",
+ shell=True,
+ )
+ reprog = re.compile(
+ "^{}-(?P<epoch>[0-9]+:)?(?P<version>[^-/: ]*-[0-9]+)/$".format(
+ pkg_state[pkgdir]["pkg_name"]
+ ),
+ flags=re.MULTILINE,
+ )
+ reresult = reprog.search(result.stdout)
+ if reresult:
+ result_dict = reresult.groupdict()
+ if not result_dict["epoch"] is None:
+ current_epoch = result_dict["epoch"][:-1]
+ if not result_dict["version"] is None:
+ current_version = result_dict["version"]
+ else:
+ print(
+ "ERROR: Failed to get current version from repo for package {}".format(
+ pkg_state[pkgdir]["pkg_name"]
+ )
+ )
+ return "fail", None, None
+ except subprocess.CalledProcessError:
+ log_print("Package not found, assuming building first time.")
+ return "install", None, None
+ return "fetched", current_epoch, current_version
+
+
+def get_sudo_privileges():
+ global SUDO_PROC
+ if not SUDO_PROC:
+ log_print("sudo -v")
+ try:
+ subprocess.run(["sudo", "-v"], check=True)
+ except subprocess.CalledProcessError:
+ return False
+ SUDO_PROC = subprocess.Popen(
+ ["while true; do sudo -v; sleep 2m; done"], shell=True
+ )
+ atexit.register(cleanup_sudo, sudo_proc=SUDO_PROC)
+ return True
+ return True
+
+
+def cleanup_sudo(sudo_proc):
+ sudo_proc.terminate()
+
+
+def create_executable_script(dest_filename, script_contents):
+ tempf_name = "unknown"
+ with tempfile.NamedTemporaryFile(
+ mode="w", encoding="utf-8", delete=False
+ ) as tempf:
+ print(
+ """#!/usr/bin/env python3
+import os
+import stat
+import argparse
+
+def create_executable_script(dest_filename, script_contents):
+ with open(dest_filename, mode='w', encoding='utf-8') as f:
+ f.write(script_contents)
+ os.chmod(dest_filename, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
+ | stat.S_IRGRP | stat.S_IXGRP
+ | stat.S_IROTH | stat.S_IXOTH)
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description="Set new file with execute permissions")
+ parser.add_argument("--dest_filename")
+ parser.add_argument("--script_contents")
+ args = parser.parse_args()
+
+ create_executable_script(args.dest_filename, args.script_contents)
+""",
+ file=tempf,
+ )
+ tempf_name = tempf.name
+ try:
+ subprocess.run(
+ [
+ "sudo",
+ "/usr/bin/env",
+ "python3",
+ tempf_name,
+ "--dest_filename",
+ dest_filename,
+ "--script_contents",
+ script_contents,
+ ],
+ check=True,
+ )
+ except subprocess.CalledProcessError:
+ log_print(
+ f'ERROR: Failed to create executable script "{dest_filename}"'
+ )
+ return False
+ return True
+
+
+def setup_ccache(chroot):
+ # set up ccache stuff
+ try:
+ subprocess.run(
+ [
+ "sudo",
+ "sed",
+ "-i",
+ "/^BUILDENV=/s/!ccache/ccache/",
+ f"{chroot}/root/etc/makepkg.conf",
+ ],
+ check=True,
+ cwd=SCRIPT_DIR,
+ )
+ except subprocess.CalledProcessError:
+ log_print("ERROR: Failed to enable ccache in makepkg.conf")
+ sys.exit(1)
+
+
+def cleanup_ccache(chroot):
+ # cleanup ccache stuff
+ try:
+ subprocess.run(
+ [
+ "sudo",
+ "sed",
+ "-i",
+ "/^BUILDENV=/s/ ccache/ !ccache/",
+ f"{chroot}/root/etc/makepkg.conf",
+ ],
+ check=True,
+ cwd=SCRIPT_DIR,
+ )
+ except subprocess.CalledProcessError:
+ log_print("ERROR: Failed to disable ccache in makepkg.conf")
+ sys.exit(1)
+
+
+def setup_sccache(chroot):
+ sccache_script = """#!/usr/bin/env sh
+export PATH=${PATH/:\/usr\/local\/bin/}
+/usr/bin/env sccache $(basename "$0") "$@"
+"""
+ if (
+ not create_executable_script(
+ f"{chroot}/root/usr/local/bin/gcc", sccache_script
+ )
+ or not create_executable_script(
+ f"{chroot}/root/usr/local/bin/g++", sccache_script
+ )
+ or not create_executable_script(
+ f"{chroot}/root/usr/local/bin/clang", sccache_script
+ )
+ or not create_executable_script(
+ f"{chroot}/root/usr/local/bin/clang++", sccache_script
+ )
+ or not create_executable_script(
+ f"{chroot}/root/usr/local/bin/rustc", sccache_script
+ )
+ ):
+ log_print("ERROR: Failed to set up sccache wrapper scripts")
+ sys.exit(1)
+
+
+def cleanup_sccache(chroot):
+ # cleanup sccache stuff
+ try:
+ subprocess.run(
+ [
+ "sudo",
+ "rm",
+ "-f",
+ f"{chroot}/root/usr/local/bin/gcc",
+ f"{chroot}/root/usr/local/bin/g++",
+ f"{chroot}/root/usr/local/bin/clang",
+ f"{chroot}/root/usr/local/bin/clang++",
+ f"{chroot}/root/usr/local/bin/rustc",
+ ],
+ check=False,
+ cwd=SCRIPT_DIR,
+ )
+ except BaseException:
+ log_print("WARNING: Failed to cleanup sccache files")
+
+
+def update_pkg_list(
+ pkgs,
+ pkg_state,
+ chroot,
+ pkg_out_dir,
+ repo,
+ logs_dir,
+ no_update,
+ signing_gpg_dir,
+ signing_gpg_key_fp,
+ signing_gpg_pass,
+ no_store,
+):
+ if not get_sudo_privileges():
+ log_print("ERROR: Failed to get sudo privileges")
+ sys.exit(1)
+ if not no_update:
+ log_print("Updating the chroot...")
+ try:
+ subprocess.run(
+ ["arch-nspawn", "{}/root".format(chroot), "pacman", "-Syu"],
+ check=True,
+ )
+ except subprocess.CalledProcessError:
+ log_print("ERROR: Failed to update the chroot")
+ sys.exit(1)
+ for pkg in pkgs:
+ log_print(f'Building "{pkg}"...')
+ if "ccache_dir" in pkg_state[pkg]:
+ cleanup_sccache(chroot)
+ setup_ccache(chroot)
+ else:
+ cleanup_ccache(chroot)
+ if "sccache_dir" in pkg_state[pkg]:
+ setup_sccache(chroot)
+ else:
+ cleanup_sccache(chroot)
+
+ command_list = [
+ "makechrootpkg",
+ "-c",
+ "-r",
+ chroot,
+ ]
+ post_command_list = [
+ "--",
+ "--syncdeps",
+ "--noconfirm",
+ "--log",
+ "--holdver",
+ ]
+ for dep in pkg_state[pkg]["other_deps"]:
+ dep_fullpath = get_latest_pkg(dep, "/var/cache/pacman/pkg")
+ if not dep_fullpath:
+ log_print('ERROR: Failed to get dep "{}"'.format(dep))
+ sys.exit(1)
+ command_list.insert(1, "-I")
+ command_list.insert(2, dep_fullpath)
+ for aur_dep in pkg_state[pkg]["aur_deps"]:
+ aur_dep_fullpath = get_latest_pkg(aur_dep, pkg_out_dir)
+ if not aur_dep_fullpath:
+ log_print('ERROR: Failed to get aur_dep "{}"'.format(aur_dep))
+ sys.exit(1)
+ command_list.insert(1, "-I")
+ command_list.insert(2, aur_dep_fullpath)
+ if "ccache_dir" in pkg_state[pkg]:
+ command_list.insert(1, "-d")
+ command_list.insert(2, f'{pkg_state[pkg]["ccache_dir"]}:/ccache')
+ post_command_list.insert(1, "CCACHE_DIR=/ccache")
+ elif "sccache_dir" in pkg_state[pkg]:
+ command_list.insert(1, "-d")
+ command_list.insert(2, f'{pkg_state[pkg]["sccache_dir"]}:/sccache')
+ post_command_list.insert(1, "SCCACHE_DIR=/sccache")
+ post_command_list.insert(
+ 2, f'SCCACHE_CACHE_SIZE={pkg_state[pkg]["sccache_cache_size"]}'
+ )
+ post_command_list.insert(3, "RUSTC_WRAPPER=/usr/bin/sccache")
+ nowstring = datetime.datetime.now(datetime.timezone.utc).strftime(
+ "%Y-%m-%d_%H-%M-%S_%Z"
+ )
+ # log_print(f"Using command list: {command_list + post_command_list}") # DEBUG
+ with open(
+ os.path.join(logs_dir, "{}_stdout_{}".format(pkg, nowstring)),
+ "w",
+ ) as log_stdout, open(
+ os.path.join(logs_dir, "{}_stderr_{}".format(pkg, nowstring)),
+ "w",
+ ) as log_stderr:
+ try:
+ subprocess.run(
+ command_list + post_command_list,
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ stdout=log_stdout,
+ stderr=log_stderr,
+ )
+ except subprocess.CalledProcessError:
+ log_print(
+ 'ERROR: Failed to build pkg "{}" in chroot'.format(pkg)
+ )
+ pkg_state[pkg]["build_status"] = "fail"
+ continue
+
+ if no_store:
+ pkg_state[pkg]["build_status"] = "success"
+ continue
+
+ pkg_list = glob.glob(os.path.join(SCRIPT_DIR, pkg, "*.pkg.tar*"))
+
+ log_print("Signing package...")
+ for gpkg in pkg_list:
+ try:
+ command_list = [
+ "gpg",
+ "--batch",
+ "--passphrase-fd",
+ "0",
+ "--pinentry-mode",
+ "loopback",
+ "--default-key",
+ signing_gpg_key_fp,
+ "--detach-sign",
+ gpkg,
+ ]
+ subprocess.run(
+ command_list,
+ check=True,
+ cwd=os.path.join(SCRIPT_DIR, pkg),
+ input=signing_gpg_pass,
+ text=True,
+ env={"GNUPGHOME": signing_gpg_dir},
+ )
+ except subprocess.CalledProcessError:
+ log_print(f'ERROR: Failed to sign pkg "{pkg}"')
+
+ log_print("Adding built pkgs to repo...")
+ try:
+ command_list = ["repo-add", repo]
+ for gpkg in pkg_list:
+ command_list.append(gpkg)
+ subprocess.run(command_list, check=True)
+ except subprocess.CalledProcessError:
+ log_print(
+ 'ERROR: Failed to add built pkg(s) "{}" to repo.'.format(pkg)
+ )
+ pkg_state[pkg]["build_status"] = "add_fail"
+ continue
+
+ log_print('Signing "custom.db"...')
+ try:
+ subprocess.run(
+ [
+ "/usr/bin/rm",
+ "-f",
+ str(os.path.join(pkg_out_dir, "custom.db.sig")),
+ ]
+ )
+ subprocess.run(
+ [
+ "/usr/bin/gpg",
+ "--batch",
+ "--passphrase-fd",
+ "0",
+ "--pinentry-mode",
+ "loopback",
+ "--default-key",
+ signing_gpg_key_fp,
+ "--detach-sign",
+ str(os.path.join(pkg_out_dir, "custom.db")),
+ ],
+ check=True,
+ input=signing_gpg_pass,
+ text=True,
+ env={"GNUPGHOME": signing_gpg_dir},
+ )
+ except subprocess.CalledProcessError:
+ log_print('WARNING: Failed to sign "custom.db"')
+
+ pkg_state[pkg]["build_status"] = "success"
+
+ log_print("Moving pkg to pkgs directory...")
+ for f in pkg_list:
+ log_print(f'Moving "{f}"...')
+ os.rename(f, os.path.join(pkg_out_dir, os.path.basename(f)))
+ sig_name = f + ".sig"
+ if os.path.exists(sig_name):
+ log_print(f'Moving "{sig_name}"...')
+ os.rename(
+ sig_name,
+ os.path.join(pkg_out_dir, os.path.basename(sig_name)),
+ )
+
+ for pkg in pkgs:
+ log_print(f'"{pkg}" status: {pkg_state[pkg]["build_status"]}')
+
+
+def get_latest_pkg(pkg, cache_dir):
+ globbed = glob.glob(os.path.join(cache_dir, pkg + "*"))
+ if len(globbed) > 0:
+ globbed.sort()
+ reprog = re.compile(
+ ".*"
+ + pkg
+ + "-[0-9a-zA-Z.+_:]+-[0-9a-zA-Z.+_]+-(any|x86_64).pkg.tar.(xz|gz|zst)$"
+ )
+ result = list(filter(lambda x: reprog.match(x), globbed))
+ if len(result) == 0:
+ return None
+ else:
+ return result[-1]
+ else:
+ return None
+
+
+def confirm_result(pkg, state_result):
+ """Returns "continue", "recheck", "force_build", or "abort"."""
+ while True:
+ print(
+ 'Got "{}" for pkg "{}", action: [C(ontinue), r(echeck), f(orce build),\
+ s(kip), b(ack) a(abort)]'.format(
+ state_result, pkg
+ )
+ )
+ user_input = sys.stdin.buffer.readline().decode().strip().lower()
+ if user_input == "c" or len(user_input) == 0:
+ return "continue"
+ elif user_input == "r":
+ return "recheck"
+ elif user_input == "f":
+ return "force_build"
+ elif user_input == "s":
+ return "skip"
+ elif user_input == "b":
+ return "back"
+ elif user_input == "a":
+ return "abort"
+ else:
+ log_print("Got invalid input")
+ continue
+
+
+def print_state_info_and_get_update_list(pkg_state):
+ to_update = []
+ log_print("package state:")
+ for (pkg_name, pkg_dict) in pkg_state.items():
+ if "state" in pkg_dict:
+ log_print(f" {pkg_name:40}: {pkg_dict['state']}")
+ if pkg_dict["state"] == "install":
+ to_update.append(pkg_name)
+ else:
+ log_print(f" {pkg_name:40}: not reached")
+ return to_update
+
+
+def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase):
+ with tempfile.NamedTemporaryFile() as tempnf:
+ tempnf.write(b"Test file content")
+ tempnf.flush()
+ try:
+ subprocess.run(
+ [
+ "gpg",
+ "--batch",
+ "--passphrase-fd",
+ "0",
+ "--pinentry-mode",
+ "loopback",
+ "--default-key",
+ signing_key_fp,
+ "--detach-sign",
+ tempnf.name,
+ ],
+ check=True,
+ input=passphrase,
+ text=True,
+ env={"GNUPGHOME": signing_gpg_dir},
+ )
+ os.remove(tempnf.name + ".sig")
+ except subprocess.CalledProcessError:
+ log_print("ERROR: Failed to sign test file with gpg")
+ return False
+ log_print("Verified passphrase works by signing dummy test file")
+ return True
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description="Update AUR pkgs")
+ parser.add_argument(
+ "--config", help="Info and pkg(s) to update in a .toml config"
+ )
+ parser.add_argument(
+ "-p", "--pkg", action="append", help="Pkg(s) to update", metavar="pkg"
+ )
+ parser.add_argument(
+ "--no-skip",
+ action="append",
+ help="Pkg(s) to not skip if up to date",
+ metavar="noskip",
+ )
+ parser.add_argument(
+ "-e",
+ "--editor",
+ default="vim",
+ help="editor to use when viewing PKGBUILDs",
+ metavar="editor",
+ )
+ parser.add_argument("--chroot", help="Chroot to build in")
+ parser.add_argument("--pkg-dir", help="Destination for built pkgs")
+ parser.add_argument("--repo", help="repository tar file")
+ parser.add_argument("--gpg-dir", help="gpg home for checking signatures")
+ parser.add_argument("--logs-dir", help="dir to put logs")
+ parser.add_argument(
+ "--no-update", help="Do not update chroot", action="store_true"
+ )
+ parser.add_argument("--signing-gpg-dir", help="gpg home for signing key")
+ parser.add_argument(
+ "--signing-gpg-key-fp", help="gpg fingerprint for signing key"
+ )
+ parser.add_argument(
+ "--no-store",
+ action="store_true",
+ help="Don't sign built package and add to repo",
+ )
+ args = parser.parse_args()
+
+ if (
+ args.pkg
+ and not args.config
+ and (
+ not args.chroot
+ or not args.pkg_dir
+ or not args.repo
+ or not args.gpg_dir
+ or not args.logs_dir
+ )
+ ):
+ print(
+ "ERROR: --pkg requires also --chroot, --pkg_dir, --repo, --gpg_dir, and --logs_dir"
+ )
+ sys.exit(1)
+
+ pkg_state = {}
+ if args.pkg and not args.config:
+ for pkg in args.pkg:
+ pkg_state[pkg] = {}
+ pkg_state[pkg]["aur_deps"] = []
+ args_chroot = args.chroot
+ args_pkg_dir = args.pkg_dir
+ args_repo = args.repo
+ args_gpg_home = args.gpg_dir
+ args_logs_dir = args.logs_dir
+ if args_logs_dir is not None:
+ GLOBAL_LOG_FILE = args_logs_dir + "/update.py_logs"
+ log_print(
+ f"{datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M %Z')}"
+ )
+ log_print(f"Set GLOBAL_LOG_FILE to {GLOBAL_LOG_FILE}")
+ args_signing_gpg_dir = args.signing_gpg_dir
+ args_signing_gpg_key_fp = args.signing_gpg_key_fp
+ if args_signing_gpg_key_fp is None:
+ log_print(
+ 'ERROR: Signing key fingerprint "signing_gpg_key_fp" not present in config'
+ )
+ sys.exit(1)
+ if args_signing_gpg_dir is not None and not args.no_store:
+ args_signing_gpg_pass = getpass.getpass("gpg signing key pass: ")
+ if not test_gpg_passphrase(
+ args_signing_gpg_dir,
+ args_signing_gpg_key_fp,
+ args_signing_gpg_pass,
+ ):
+ sys.exit(1)
+ elif args.config:
+ d = toml.load(args.config)
+ for entry in d["entry"]:
+ pkg_state[entry["name"]] = {}
+ if "aur_deps" in entry:
+ pkg_state[entry["name"]]["aur_deps"] = entry["aur_deps"]
+ else:
+ pkg_state[entry["name"]]["aur_deps"] = []
+ if "repo_path" in entry:
+ pkg_state[entry["name"]]["repo_path"] = entry["repo_path"]
+ if "pkg_name" in entry:
+ pkg_state[entry["name"]]["pkg_name"] = entry["pkg_name"]
+ else:
+ pkg_state[entry["name"]]["pkg_name"] = entry["name"]
+ if "ccache_dir" in entry:
+ pkg_state[entry["name"]]["ccache_dir"] = entry["ccache_dir"]
+ elif "sccache_dir" in entry:
+ pkg_state[entry["name"]]["sccache_dir"] = entry["sccache_dir"]
+ if "sccache_cache_size" in entry:
+ pkg_state[entry["name"]]["sccache_cache_size"] = entry[
+ "sccache_cache_size"
+ ]
+ else:
+ pkg_state[entry["name"]]["sccache_cache_size"] = "5G"
+ if "other_deps" in entry:
+ pkg_state[entry["name"]]["other_deps"] = entry["other_deps"]
+ else:
+ pkg_state[entry["name"]]["other_deps"] = []
+ if "skip_branch_up_to_date" in entry and not (
+ not args.no_skip is None and entry["name"] in args.no_skip
+ ):
+ pkg_state[entry["name"]]["skip_branch_up_to_date"] = True
+ else:
+ pkg_state[entry["name"]]["skip_branch_up_to_date"] = False
+ args_chroot = d["chroot"]
+ args_pkg_dir = d["pkg_dir"]
+ args_repo = d["repo"]
+ args_gpg_home = d["gpg_dir"]
+ args_logs_dir = d["logs_dir"]
+ if args_logs_dir is not None:
+ GLOBAL_LOG_FILE = args_logs_dir + "/update.py_logs"
+ log_print(
+ f"{datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M %Z')}"
+ )
+ log_print(f"Set GLOBAL_LOG_FILE to {GLOBAL_LOG_FILE}")
+ if args.pkg:
+ to_keep = [args_pkg for args_pkg in args.pkg]
+ removal = []
+ for existing in pkg_state.keys():
+ if existing in to_keep:
+ pass
+ else:
+ removal.append(existing)
+ for to_remove in removal:
+ del pkg_state[to_remove]
+
+ if "signing_gpg_dir" in d and not args.no_store:
+ args_signing_gpg_dir = d["signing_gpg_dir"]
+ args_signing_gpg_key_fp = d["signing_gpg_key_fp"]
+ args_signing_gpg_pass = getpass.getpass("gpg signing key pass: ")
+ if not test_gpg_passphrase(
+ args_signing_gpg_dir,
+ args_signing_gpg_key_fp,
+ args_signing_gpg_pass,
+ ):
+ sys.exit(1)
+ else:
+ log_print('ERROR: At least "--config" or "--pkg" must be specified')
+ sys.exit(1)
+
+ os.putenv("CHROOT", os.path.realpath(args_chroot))
+ os.putenv("GNUPGHOME", os.path.realpath(args_gpg_home))
+ if not os.path.exists(args_logs_dir):
+ os.makedirs(args_logs_dir)
+ elif not os.path.isdir(args_logs_dir):
+ log_print(
+ 'ERROR: logs_dir "{}" must be a directory'.format(args_logs_dir)
+ )
+ sys.exit(1)
+ pkg_list = [temp_pkg_name for temp_pkg_name in pkg_state.keys()]
+ i = 0
+ while i < len(pkg_list):
+ going_back = False
+ if not ensure_pkg_dir_exists(pkg_list[i], pkg_state):
+ print_state_info_and_get_update_list(pkg_state)
+ sys.exit(1)
+ skip = False
+ if (
+ "repo_path" not in pkg_state[pkg_list[i]]
+ or pkg_state[pkg_list[i]]["repo_path"] != "NO_REPO"
+ ):
+ update_pkg_dir_count = 0
+ update_pkg_dir_success = False
+ while update_pkg_dir_count < 5:
+ (success, skip_on_same_ver) = update_pkg_dir(pkg_list[i], pkg_state)
+ if success:
+ update_pkg_dir_success = True
+ break
+ else:
+ time.sleep(1)
+ update_pkg_dir_count += 1
+ if not update_pkg_dir_success:
+ log_print('Failed to update pkg dir for "{}"', pkg_list[i])
+ print_state_info_and_get_update_list(pkg_state)
+ sys.exit(1)
+ if skip_on_same_ver:
+ check_pkg_version_result = check_pkg_version(pkg_list[i], pkg_state, args_repo, True)
+ if check_pkg_version_result != "install":
+ log_print(f'Pkg {pkg_list[i]} is up to date, skipping...')
+ pkg_state[pkg_list[i]]["state"] = "up to date"
+ i += 1
+ continue
+ else:
+ check_pkg_build_result = check_pkg_build(pkg_list[i], args.editor)
+ if check_pkg_build_result == "ok":
+ pass
+ elif check_pkg_build_result == "not_ok":
+ pkg_state[pkg_list[i]]["state"] = "skip"
+ i += 1
+ continue
+ elif check_pkg_build_result == "force_build":
+ pkg_state[pkg_list[i]]["state"] = "install"
+ i += 1
+ continue
+ elif check_pkg_build_result == "invalid":
+ continue
+ elif check_pkg_build_result == "back":
+ if i > 0:
+ i -= 1
+ continue
+ else: # check_pkg_build_result == "abort":
+ print_state_info_and_get_update_list(pkg_state)
+ sys.exit(1)
+ while True:
+ if skip_on_same_ver and check_pkg_version_result is not None:
+ state_result = check_pkg_version_result
+ else:
+ state_result = check_pkg_version(pkg_list[i], pkg_state, args_repo, False)
+ confirm_result_result = confirm_result(pkg_list[i], state_result)
+ if confirm_result_result == "continue":
+ pkg_state[pkg_list[i]]["state"] = state_result
+ break
+ elif confirm_result_result == "recheck":
+ check_pkg_version_result = None
+ continue
+ elif confirm_result_result == "force_build":
+ pkg_state[pkg_list[i]]["state"] = "install"
+ break
+ elif confirm_result_result == "skip":
+ pkg_state[pkg_list[i]]["state"] = "skip"
+ break
+ elif confirm_result_result == "back":
+ if i > 0:
+ i -= 1
+ going_back = True
+ break
+ else: # confirm_result_result == "abort"
+ print_state_info_and_get_update_list(pkg_state)
+ sys.exit(1)
+ if going_back:
+ pass
+ else:
+ i += 1
+
+ log_print("Showing current actions:")
+ pkgs_to_update = print_state_info_and_get_update_list(pkg_state)
+ if len(pkgs_to_update) > 0:
+ log_print("Continue? [Y/n]")
+ user_input = sys.stdin.buffer.readline().decode().strip().lower()
+ if user_input == "y" or len(user_input) == 0:
+ if args.no_update:
+ log_print("Updating (without updating chroot)...")
+ else:
+ log_print("Updating...")
+ update_pkg_list(
+ pkgs_to_update,
+ pkg_state,
+ os.path.realpath(args_chroot),
+ os.path.realpath(args_pkg_dir),
+ os.path.realpath(args_repo),
+ os.path.realpath(args_logs_dir),
+ args.no_update,
+ "" if args.no_store else args_signing_gpg_dir,
+ "" if args.no_store else args_signing_gpg_key_fp,
+ "" if args.no_store else args_signing_gpg_pass,
+ args.no_store,
+ )
+ else:
+ log_print("Canceled.")
+ else:
+ log_print("No packages to update, done.")