Compare commits

..

No commits in common. "9ac2a1e934b28b85899c785a92a121e73d92b98a" and "9d6378e97d81b43d49e41522b6c020083e19bd74" have entirely different histories.

2 changed files with 46 additions and 255 deletions

View file

@ -22,8 +22,6 @@ log_limit = 1073741824
error_on_limit = false
# If true, timestamps are in localtime. If false, timestamps are UTC.
datetime_in_local_time = true
# If true, all builds will be done in a tmpfs. Recommended to have a lot of RAM and/or swap.
tmpfs = false
########## END OF MANDATORY VARIABLES
# Each [[entry]] needs a "name".

299
update.py
View file

@ -18,7 +18,6 @@ import threading
from pathlib import Path
from typing import Any, Union
import signal
import pwd
# SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
SUDO_PROC = False
@ -259,7 +258,7 @@ def log_print(*args, **kwargs):
def ensure_pkg_dir_exists(
pkg: str,
pkg_state: dict[str, Any],
other_state: dict[str, Any],
other_state: dict[str, Union[None, str]],
):
"""Ensures that an AUR-pkg-dir exists, returning False on failure.
@ -325,7 +324,7 @@ def ensure_pkg_dir_exists(
def update_pkg_dir(
pkg: str,
pkg_state: dict[str, Any],
other_state: dict[str, Any],
other_state: dict[str, Union[None, str]],
):
"""Updates the pkg by invoking "git pull".
@ -509,7 +508,7 @@ def update_pkg_dir(
def check_pkg_build(
pkg: str,
pkg_state: dict[str, Any],
other_state: dict[str, Any],
other_state: dict[str, Union[None, str]],
editor: str,
):
"""Opens the PKGBUILD in the editor, then prompts the user for an action.
@ -565,7 +564,7 @@ def check_pkg_version(
pkg_state: dict[str, Any],
repo: str,
force_check_srcinfo: bool,
other_state: dict[str, Any],
other_state: dict[str, Union[None, str]],
):
"""Gets the installed version and pkg version and checks them.
@ -573,7 +572,7 @@ def check_pkg_version(
(installed pkg is up to date)."""
status, current_epoch, current_version = get_pkg_current_version(
pkg, pkg_state, repo, other_state
pkg, pkg_state, repo
)
if status != "fetched":
return status
@ -604,7 +603,7 @@ def check_pkg_version(
)
def get_srcinfo_version(pkg: str, other_state: dict[str, Any]):
def get_srcinfo_version(pkg: str, other_state: dict[str, Union[None, str]]):
"""Parses .SRCINFO for verison information.
Returns (success_bool, pkgepoch, pkgver, pkgrel)
@ -650,7 +649,7 @@ def get_pkgbuild_version(
pkg: str,
force_check_srcinfo: bool,
pkg_state: dict[str, Any],
other_state: dict[str, Any],
other_state: dict[str, Union[None, str]],
):
"""Gets the version of the pkg from .SRCINFO or PKGBUILD.
@ -688,18 +687,13 @@ def get_pkgbuild_version(
other_state=other_state,
)
# Ensure ccache isn't enabled for this check.
if other_state["tmpfs"]:
cleanup_ccache(other_state["tmpfs_chroot"])
else:
cleanup_ccache(other_state["chroot"])
cleanup_ccache(other_state["chroot"])
command_list = [
"/usr/bin/env",
"makechrootpkg",
"-c",
"-r",
other_state["tmpfs_chroot"]
if other_state["tmpfs"]
else other_state["chroot"],
other_state["chroot"],
]
post_command_list = ["--", "-s", "-r", "-c", "--nobuild"]
if "link_cargo_registry" in pkg_state[pkg]:
@ -755,73 +749,17 @@ def get_pkgbuild_version(
pkgver = None
pkgrel = None
# Setup checking the PKGBUILD from within the chroot.
chroot_user_path = os.path.join(
other_state["tmpfs_chroot"]
if other_state["tmpfs"]
else other_state["chroot"],
other_state["USER"],
# TODO maybe sandbox sourcing the PKGBUILD
pkgbuild_output = subprocess.run(
(
"/usr/bin/env",
"bash",
"-c",
f"source {os.path.join(pkgdir, 'PKGBUILD')}; echo \"pkgver=$pkgver\"; echo \"pkgrel=$pkgrel\"; echo \"epoch=$epoch\"",
),
capture_output=True,
text=True,
)
chroot_build_path = os.path.join(chroot_user_path, "build")
chroot_check_pkgbuild_path = os.path.join(chroot_build_path, "PKGBUILD")
chroot_check_sh_path = os.path.join(chroot_build_path, "check.sh")
try:
subprocess.run(
(
"/usr/bin/cp",
os.path.join(pkgdir, "PKGBUILD"),
chroot_check_pkgbuild_path,
),
check=True,
)
except subprocess.CalledProcessError:
log_print(
f'ERROR: Failed to check PKGBUILD (moving PKGBUILD to chroot) for "{pkg}"!',
other_state=other_state,
)
return False, None, None, None
check_pkgbuild_script = """#!/usr/bin/env bash
set -e
source "/build/PKGBUILD"
echo "pkgver=$pkgver"
echo "pkgrel=$pkgrel"
echo "epoch=$epoch"
"""
if not create_executable_script(
chroot_check_sh_path, check_pkgbuild_script
):
log_print(
f'ERROR: Failed to check PKGBUILD (check PKGBUILD setup) for "{pkg}"!',
other_state=other_state,
)
return False, None, None, None
pkgbuild_output = str()
try:
pkgbuild_output = subprocess.run(
(
"/usr/bin/env",
"sudo",
"arch-nspawn",
chroot_user_path,
"/build/check.sh",
),
check=True,
capture_output=True,
text=True,
)
except subprocess.CalledProcessError:
log_print(
f'ERROR: Failed to check PKGBUILD (checking PKGBUILD) for "{pkg}"!',
other_state=other_state,
)
return False, None, None, None
output_ver_re = re.compile(
"^pkgver=([a-zA-Z0-9._+-]+)\\s*$", flags=re.M
)
@ -857,7 +795,7 @@ def get_srcinfo_check_result(
pkg: str,
force_check_srcinfo: bool,
pkg_state: dict[str, Any],
other_state: dict[str, Any],
other_state: dict[str, Union[None, str]],
):
"""Checks the version of the pkg against the currently installed version.
@ -930,9 +868,7 @@ def get_srcinfo_check_result(
return "fail"
def get_pkg_current_version(
pkg: str, pkg_state: dict[str, Any], repo: str, other_state: dict[str, Any]
):
def get_pkg_current_version(pkg: str, pkg_state: dict[str, Any], repo: str):
"""Fetches the version info and returns status of fetching and the version.
Returns (status, epoch, version)
@ -998,7 +934,7 @@ def get_pkg_current_version(
return "fetched", current_epoch, current_version
def get_sudo_privileges(other_state: dict[str, Any]):
def get_sudo_privileges():
"""Starts a bash loop that ensures sudo privileges are ready while this
script is active."""
@ -1252,7 +1188,7 @@ def handle_output_stream(
def update_pkg_list(
pkgs: list[str],
pkg_state: dict[str, Any],
other_state: dict[str, Any],
other_state: dict[str, Union[None, str]],
signing_gpg_dir: str,
signing_gpg_key_fp: str,
signing_gpg_pass: str,
@ -1261,50 +1197,26 @@ def update_pkg_list(
"""For each package to build: builds it, signs it, and moves it to
"pkg_out_dir"."""
atexit.register(build_print_pkg_info, pkgs, pkg_state, other_state)
if not get_sudo_privileges(other_state):
if not get_sudo_privileges():
log_print(
"ERROR: Failed to get sudo privileges", other_state=other_state
)
pkg_state[pkg]["build_status"] = "get_sudo_fail"
sys.exit(1)
for pkg in pkgs:
if other_state["stop_building"]:
sys.exit(0)
pkgdir = os.path.join(other_state["clones_dir"], pkg)
if "ccache_dir" in pkg_state[pkg]:
cleanup_sccache(
other_state["tmpfs_chroot"]
if other_state["tmpfs"]
else other_state["chroot"]
)
setup_ccache(
other_state["tmpfs_chroot"]
if other_state["tmpfs"]
else other_state["chroot"]
)
cleanup_sccache(other_state["chroot"])
setup_ccache(other_state["chroot"])
else:
cleanup_ccache(
other_state["tmpfs_chroot"]
if other_state["tmpfs"]
else other_state["chroot"]
)
cleanup_ccache(other_state["chroot"])
if (
"sccache_dir" in pkg_state[pkg]
and not pkg_state[pkg]["sccache_rust_only"]
):
setup_sccache(
other_state["tmpfs_chroot"]
if other_state["tmpfs"]
else other_state["chroot"]
)
setup_sccache(other_state["chroot"])
else:
cleanup_sccache(
other_state["tmpfs_chroot"]
if other_state["tmpfs"]
else other_state["chroot"]
)
cleanup_sccache(other_state["chroot"])
# check integrity
log_print(
@ -1330,9 +1242,7 @@ def update_pkg_list(
"makechrootpkg",
"-c",
"-r",
other_state["tmpfs_chroot"]
if other_state["tmpfs"]
else other_state["chroot"],
other_state["chroot"],
]
post_command_list = [
"--",
@ -1621,6 +1531,17 @@ def update_pkg_list(
),
)
max_name_len = 1
for pkg in pkgs:
if len(pkg) + 1 > max_name_len:
max_name_len = len(pkg) + 1
for pkg in pkgs:
name_space = " " * (max_name_len - len(pkg))
log_print(
f'"{pkg}"{name_space}status: {pkg_state[pkg]["build_status"]}',
other_state=other_state,
)
def get_latest_pkg(pkg: str, cache_dir: str):
"""Gets the latest pkg from the specified "cache_dir" and return its
@ -1643,7 +1564,7 @@ def get_latest_pkg(pkg: str, cache_dir: str):
return None
def confirm_result(pkg: str, state_result: str, other_state: dict[str, Any]):
def confirm_result(pkg: str, state_result: str):
"""Prompts the user the action to take for a pkg after checking its
PKGBUILD.
@ -1707,29 +1628,8 @@ def print_state_info_and_get_update_list(
return to_update
def build_print_pkg_info(
pkgs: tuple[str, ...],
pkg_state: dict[str, Any],
other_state: dict[str, Any],
):
"""Prints the current "build" state of the given pkgs."""
max_name_len = 1
for pkg in pkgs:
if len(pkg) + 1 > max_name_len:
max_name_len = len(pkg) + 1
for pkg in pkgs:
name_space = " " * (max_name_len - len(pkg))
log_print(
f'"{pkg}"{name_space}status: {pkg_state[pkg]["build_status"]}',
other_state=other_state,
)
def test_gpg_passphrase(
signing_gpg_dir: str,
signing_key_fp: str,
passphrase: str,
other_state: dict[str, Any],
signing_gpg_dir: str, signing_key_fp: str, passphrase: str
):
"""Checks if the given gpg passphrase works with the gpg signing key."""
@ -1782,7 +1682,7 @@ def test_gpg_passphrase(
return True
def validate_and_verify_paths(other_state: dict[str, Any]):
def validate_and_verify_paths(other_state: dict[str, Union[None, str]]):
"""Checks and validates/ensures that certain directories exist."""
if not os.path.exists(other_state["chroot"]):
@ -1831,16 +1731,13 @@ def signal_handler(sig, frame):
print_state_info_and_get_update_list(OTHER_STATE, PKG_STATE)
if signal.Signals(sig) is not signal.SIGINT:
return
OTHER_STATE["stop_building"] = True
sys.exit(0)
if signal.Signals(sig) is not signal.SIGINT:
return
OTHER_STATE["stop_building"] = True
sys.exit(1)
def main():
"""The main function."""
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler)
editor = None
@ -1881,11 +1778,6 @@ def main():
action="store_true",
help="Don't sign built package and add to repo",
)
parser.add_argument(
"--tmpfs",
action="store_true",
help="Build in tmpfs",
)
args = parser.parse_args()
if (
@ -1907,12 +1799,8 @@ def main():
pkg_state = {}
other_state = {}
global PKG_STATE, OTHER_STATE
PKG_STATE = pkg_state
OTHER_STATE = other_state
other_state["USER"] = os.environ["USER"]
other_state["UID"] = pwd.getpwnam(other_state["USER"]).pw_uid
other_state["stop_building"] = False
other_state["logs_dir"] = None
other_state["log_limit"] = 1024 * 1024 * 1024
other_state["error_on_limit"] = False
@ -1951,7 +1839,6 @@ def main():
other_state["signing_gpg_dir"],
other_state["signing_gpg_key_fp"],
other_state["signing_gpg_pass"],
other_state,
):
sys.exit(1)
elif args.config:
@ -2053,7 +1940,6 @@ def main():
other_state["signing_gpg_dir"],
other_state["signing_gpg_key_fp"],
other_state["signing_gpg_pass"],
other_state,
):
sys.exit(1)
if "editor" in d:
@ -2092,10 +1978,6 @@ def main():
other_state["error_on_limit"]
)
)
if "tmpfs" in d and type(d["tmpfs"]) is bool and d["tmpfs"]:
other_state["tmpfs"] = True
else:
other_state["tmpfs"] = False
else:
log_print(
'ERROR: At least "--config" or "--pkg" must be specified',
@ -2103,64 +1985,6 @@ def main():
)
sys.exit(1)
while len(other_state["chroot"]) > 1 and other_state["chroot"][-1] == "/":
other_state["chroot"] = other_state["chroot"][:-1]
if args.tmpfs:
other_state["tmpfs"] = True
if other_state["tmpfs"]:
other_state["tmpfs_chroot"] = os.path.join(
os.path.dirname(os.path.realpath(other_state["chroot"])),
"tmpfs_chroot",
)
get_sudo_privileges(other_state)
try:
old_umask = os.umask(0o077)
log_print(
"Ensuring tmpfs_chroot dir exists...", other_state=other_state
)
subprocess.run(
(
"/usr/bin/env",
"mkdir",
"-p",
other_state["tmpfs_chroot"],
),
check=True,
)
log_print("Creating tmpfs dir...", other_state=other_state)
subprocess.run(
(
"/usr/bin/env",
"sudo",
"mount",
"-t",
"tmpfs",
"-o",
f"size=90%,mode=0700,uid={other_state['UID']}",
"tmpfs",
other_state["tmpfs_chroot"],
),
check=True,
)
atexit.register(
lambda tmpfs_path: subprocess.run(
(
"/usr/bin/env",
"sudo",
"bash",
"-c",
f"for ((i=0; i<5; ++i)); do if umount {tmpfs_path}; then break; fi; sleep 1; done",
)
),
other_state["tmpfs_chroot"],
)
os.umask(old_umask)
except subprocess.CalledProcessError:
log_print("ERROR: Failed to set up tmpfs!")
sys.exit(1)
validate_and_verify_paths(other_state)
if args.editor is not None:
@ -2201,31 +2025,6 @@ def main():
)
sys.exit(1)
if other_state["tmpfs"]:
try:
log_print(
'Copying "chroot"/root to tmpfs_chroot/root...',
other_state=other_state,
)
subprocess.run(
(
"/usr/bin/env",
"sudo",
"cp",
"-a",
f'{other_state["chroot"]}/root',
f'{other_state["tmpfs_chroot"]}/root',
),
check=True,
)
except subprocess.CalledProcessError:
log_print(
'ERROR: Failed to copy "chroot"/root to tmpfs_chroot/root!',
other_state=other_state,
)
sys.exit(1)
os.putenv("CHROOT", os.path.realpath(other_state["tmpfs_chroot"]))
pkg_list = [temp_pkg_name for temp_pkg_name in pkg_state.keys()]
# ensure build_status is populated.
for pkg_name in pkg_list:
@ -2315,9 +2114,7 @@ def main():
False,
other_state,
)
confirm_result_result = confirm_result(
pkg_list[i], state_result, other_state
)
confirm_result_result = confirm_result(pkg_list[i], state_result)
if confirm_result_result == "continue":
pkg_state[pkg_list[i]]["state"] = state_result
pkg_state[pkg_list[i]]["build_status"] = (
@ -2378,7 +2175,3 @@ def main():
log_print("Canceled.", other_state=other_state)
else:
log_print("No packages to update, done.", other_state=other_state)
if __name__ == "__main__":
main()