Compare commits

..

No commits in common. "6fde09ea71bd53e8a09f56c1fd635a335ccbab18" and "9c82f847ee314911eedf1c2c95ef62431d6cf080" have entirely different histories.

2 changed files with 60 additions and 64 deletions

View file

@ -22,8 +22,6 @@ log_limit = 1073741824
error_on_limit = false error_on_limit = false
# If true, timestamps are in localtime. If false, timestamps are UTC. # If true, timestamps are in localtime. If false, timestamps are UTC.
datetime_in_local_time = true datetime_in_local_time = true
# If true, all builds will be done in a tmpfs. Recommended to have a lot of RAM and/or swap.
tmpfs = false
########## END OF MANDATORY VARIABLES ########## END OF MANDATORY VARIABLES
# Each [[entry]] needs a "name". # Each [[entry]] needs a "name".

122
update.py
View file

@ -18,7 +18,6 @@ import threading
from pathlib import Path from pathlib import Path
from typing import Any, Union from typing import Any, Union
import signal import signal
import pwd
# SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) # SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
SUDO_PROC = False SUDO_PROC = False
@ -259,7 +258,7 @@ def log_print(*args, **kwargs):
def ensure_pkg_dir_exists( def ensure_pkg_dir_exists(
pkg: str, pkg: str,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Any], other_state: dict[str, Union[None, str]],
): ):
"""Ensures that an AUR-pkg-dir exists, returning False on failure. """Ensures that an AUR-pkg-dir exists, returning False on failure.
@ -325,7 +324,7 @@ def ensure_pkg_dir_exists(
def update_pkg_dir( def update_pkg_dir(
pkg: str, pkg: str,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Any], other_state: dict[str, Union[None, str]],
): ):
"""Updates the pkg by invoking "git pull". """Updates the pkg by invoking "git pull".
@ -509,7 +508,7 @@ def update_pkg_dir(
def check_pkg_build( def check_pkg_build(
pkg: str, pkg: str,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Any], other_state: dict[str, Union[None, str]],
editor: str, editor: str,
): ):
"""Opens the PKGBUILD in the editor, then prompts the user for an action. """Opens the PKGBUILD in the editor, then prompts the user for an action.
@ -565,7 +564,7 @@ def check_pkg_version(
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
repo: str, repo: str,
force_check_srcinfo: bool, force_check_srcinfo: bool,
other_state: dict[str, Any], other_state: dict[str, Union[None, str]],
): ):
"""Gets the installed version and pkg version and checks them. """Gets the installed version and pkg version and checks them.
@ -573,7 +572,7 @@ def check_pkg_version(
(installed pkg is up to date).""" (installed pkg is up to date)."""
status, current_epoch, current_version = get_pkg_current_version( status, current_epoch, current_version = get_pkg_current_version(
pkg, pkg_state, repo, other_state pkg, pkg_state, repo
) )
if status != "fetched": if status != "fetched":
return status return status
@ -604,7 +603,7 @@ def check_pkg_version(
) )
def get_srcinfo_version(pkg: str, other_state: dict[str, Any]): def get_srcinfo_version(pkg: str, other_state: dict[str, Union[None, str]]):
"""Parses .SRCINFO for verison information. """Parses .SRCINFO for verison information.
Returns (success_bool, pkgepoch, pkgver, pkgrel) Returns (success_bool, pkgepoch, pkgver, pkgrel)
@ -650,7 +649,7 @@ def get_pkgbuild_version(
pkg: str, pkg: str,
force_check_srcinfo: bool, force_check_srcinfo: bool,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Any], other_state: dict[str, Union[None, str]],
): ):
"""Gets the version of the pkg from .SRCINFO or PKGBUILD. """Gets the version of the pkg from .SRCINFO or PKGBUILD.
@ -801,7 +800,7 @@ def get_srcinfo_check_result(
pkg: str, pkg: str,
force_check_srcinfo: bool, force_check_srcinfo: bool,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Any], other_state: dict[str, Union[None, str]],
): ):
"""Checks the version of the pkg against the currently installed version. """Checks the version of the pkg against the currently installed version.
@ -874,9 +873,7 @@ def get_srcinfo_check_result(
return "fail" return "fail"
def get_pkg_current_version( def get_pkg_current_version(pkg: str, pkg_state: dict[str, Any], repo: str):
pkg: str, pkg_state: dict[str, Any], repo: str, other_state: dict[str, Any]
):
"""Fetches the version info and returns status of fetching and the version. """Fetches the version info and returns status of fetching and the version.
Returns (status, epoch, version) Returns (status, epoch, version)
@ -942,7 +939,7 @@ def get_pkg_current_version(
return "fetched", current_epoch, current_version return "fetched", current_epoch, current_version
def get_sudo_privileges(other_state: dict[str, Any]): def get_sudo_privileges():
"""Starts a bash loop that ensures sudo privileges are ready while this """Starts a bash loop that ensures sudo privileges are ready while this
script is active.""" script is active."""
@ -1196,7 +1193,7 @@ def handle_output_stream(
def update_pkg_list( def update_pkg_list(
pkgs: list[str], pkgs: list[str],
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Any], other_state: dict[str, Union[None, str]],
signing_gpg_dir: str, signing_gpg_dir: str,
signing_gpg_key_fp: str, signing_gpg_key_fp: str,
signing_gpg_pass: str, signing_gpg_pass: str,
@ -1205,17 +1202,13 @@ def update_pkg_list(
"""For each package to build: builds it, signs it, and moves it to """For each package to build: builds it, signs it, and moves it to
"pkg_out_dir".""" "pkg_out_dir"."""
atexit.register(build_print_pkg_info, pkgs, pkg_state, other_state) if not get_sudo_privileges():
if not get_sudo_privileges(other_state):
log_print( log_print(
"ERROR: Failed to get sudo privileges", other_state=other_state "ERROR: Failed to get sudo privileges", other_state=other_state
) )
pkg_state[pkg]["build_status"] = "get_sudo_fail" pkg_state[pkg]["build_status"] = "get_sudo_fail"
sys.exit(1) sys.exit(1)
for pkg in pkgs: for pkg in pkgs:
if other_state["stop_building"]:
sys.exit(0)
pkgdir = os.path.join(other_state["clones_dir"], pkg) pkgdir = os.path.join(other_state["clones_dir"], pkg)
if "ccache_dir" in pkg_state[pkg]: if "ccache_dir" in pkg_state[pkg]:
cleanup_sccache( cleanup_sccache(
@ -1565,6 +1558,17 @@ def update_pkg_list(
), ),
) )
max_name_len = 1
for pkg in pkgs:
if len(pkg) + 1 > max_name_len:
max_name_len = len(pkg) + 1
for pkg in pkgs:
name_space = " " * (max_name_len - len(pkg))
log_print(
f'"{pkg}"{name_space}status: {pkg_state[pkg]["build_status"]}',
other_state=other_state,
)
def get_latest_pkg(pkg: str, cache_dir: str): def get_latest_pkg(pkg: str, cache_dir: str):
"""Gets the latest pkg from the specified "cache_dir" and return its """Gets the latest pkg from the specified "cache_dir" and return its
@ -1587,7 +1591,7 @@ def get_latest_pkg(pkg: str, cache_dir: str):
return None return None
def confirm_result(pkg: str, state_result: str, other_state: dict[str, Any]): def confirm_result(pkg: str, state_result: str):
"""Prompts the user the action to take for a pkg after checking its """Prompts the user the action to take for a pkg after checking its
PKGBUILD. PKGBUILD.
@ -1651,27 +1655,8 @@ def print_state_info_and_get_update_list(
return to_update return to_update
def build_print_pkg_info(
pkgs: tuple[str], pkg_state: dict[str, Any], other_state: dict[str, Any]
):
"""Prints the current "build" state of the given pkgs."""
max_name_len = 1
for pkg in pkgs:
if len(pkg) + 1 > max_name_len:
max_name_len = len(pkg) + 1
for pkg in pkgs:
name_space = " " * (max_name_len - len(pkg))
log_print(
f'"{pkg}"{name_space}status: {pkg_state[pkg]["build_status"]}',
other_state=other_state,
)
def test_gpg_passphrase( def test_gpg_passphrase(
signing_gpg_dir: str, signing_gpg_dir: str, signing_key_fp: str, passphrase: str
signing_key_fp: str,
passphrase: str,
other_state: dict[str, Any],
): ):
"""Checks if the given gpg passphrase works with the gpg signing key.""" """Checks if the given gpg passphrase works with the gpg signing key."""
@ -1724,7 +1709,7 @@ def test_gpg_passphrase(
return True return True
def validate_and_verify_paths(other_state: dict[str, Any]): def validate_and_verify_paths(other_state: dict[str, Union[None, str]]):
"""Checks and validates/ensures that certain directories exist.""" """Checks and validates/ensures that certain directories exist."""
if not os.path.exists(other_state["chroot"]): if not os.path.exists(other_state["chroot"]):
@ -1773,16 +1758,13 @@ def signal_handler(sig, frame):
print_state_info_and_get_update_list(OTHER_STATE, PKG_STATE) print_state_info_and_get_update_list(OTHER_STATE, PKG_STATE)
if signal.Signals(sig) is not signal.SIGINT: if signal.Signals(sig) is not signal.SIGINT:
return return
OTHER_STATE["stop_building"] = True
sys.exit(0) sys.exit(0)
if signal.Signals(sig) is not signal.SIGINT: if signal.Signals(sig) is not signal.SIGINT:
return return
OTHER_STATE["stop_building"] = True
sys.exit(1) sys.exit(1)
def main(): if __name__ == "__main__":
"""The main function."""
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler) signal.signal(signal.SIGUSR1, signal_handler)
editor = None editor = None
@ -1849,12 +1831,9 @@ def main():
pkg_state = {} pkg_state = {}
other_state = {} other_state = {}
global PKG_STATE, OTHER_STATE
PKG_STATE = pkg_state PKG_STATE = pkg_state
OTHER_STATE = other_state OTHER_STATE = other_state
other_state["USER"] = os.environ["USER"] other_state["USER"] = os.environ["USER"]
other_state["UID"] = pwd.getpwnam(other_state["USER"]).pw_uid
other_state["stop_building"] = False
other_state["logs_dir"] = None other_state["logs_dir"] = None
other_state["log_limit"] = 1024 * 1024 * 1024 other_state["log_limit"] = 1024 * 1024 * 1024
other_state["error_on_limit"] = False other_state["error_on_limit"] = False
@ -1893,7 +1872,6 @@ def main():
other_state["signing_gpg_dir"], other_state["signing_gpg_dir"],
other_state["signing_gpg_key_fp"], other_state["signing_gpg_key_fp"],
other_state["signing_gpg_pass"], other_state["signing_gpg_pass"],
other_state,
): ):
sys.exit(1) sys.exit(1)
elif args.config: elif args.config:
@ -1995,7 +1973,6 @@ def main():
other_state["signing_gpg_dir"], other_state["signing_gpg_dir"],
other_state["signing_gpg_key_fp"], other_state["signing_gpg_key_fp"],
other_state["signing_gpg_pass"], other_state["signing_gpg_pass"],
other_state,
): ):
sys.exit(1) sys.exit(1)
if "editor" in d: if "editor" in d:
@ -2056,7 +2033,7 @@ def main():
os.path.dirname(os.path.realpath(other_state["chroot"])), os.path.dirname(os.path.realpath(other_state["chroot"])),
"tmpfs_chroot", "tmpfs_chroot",
) )
get_sudo_privileges(other_state) get_sudo_privileges()
try: try:
old_umask = os.umask(0o077) old_umask = os.umask(0o077)
log_print( log_print(
@ -2080,7 +2057,7 @@ def main():
"-t", "-t",
"tmpfs", "tmpfs",
"-o", "-o",
f"size=90%,mode=0700,uid={other_state['UID']}", "size=90%",
"tmpfs", "tmpfs",
other_state["tmpfs_chroot"], other_state["tmpfs_chroot"],
), ),
@ -2091,13 +2068,40 @@ def main():
( (
"/usr/bin/env", "/usr/bin/env",
"sudo", "sudo",
"bash", "umount",
"-c", tmpfs_path,
f"for ((i=0; i<5; ++i)); do if umount {tmpfs_path}; then break; fi; sleep 1; done",
) )
), ),
other_state["tmpfs_chroot"], other_state["tmpfs_chroot"],
) )
log_print(
"Setting tmpfs dir permissions...", other_state=other_state
)
subprocess.run(
(
"/usr/bin/env",
"sudo",
"chmod",
"700",
other_state["tmpfs_chroot"],
),
check=True,
)
log_print(
"Giving self user owner of tmpfs dir...",
other_state=other_state,
)
subprocess.run(
(
"/usr/bin/env",
"sudo",
"chown",
"-R",
other_state["USER"],
other_state["tmpfs_chroot"],
),
check=True,
)
os.umask(old_umask) os.umask(old_umask)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
log_print("ERROR: Failed to set up tmpfs!") log_print("ERROR: Failed to set up tmpfs!")
@ -2257,9 +2261,7 @@ def main():
False, False,
other_state, other_state,
) )
confirm_result_result = confirm_result( confirm_result_result = confirm_result(pkg_list[i], state_result)
pkg_list[i], state_result, other_state
)
if confirm_result_result == "continue": if confirm_result_result == "continue":
pkg_state[pkg_list[i]]["state"] = state_result pkg_state[pkg_list[i]]["state"] = state_result
pkg_state[pkg_list[i]]["build_status"] = ( pkg_state[pkg_list[i]]["build_status"] = (
@ -2320,7 +2322,3 @@ def main():
log_print("Canceled.", other_state=other_state) log_print("Canceled.", other_state=other_state)
else: else:
log_print("No packages to update, done.", other_state=other_state) log_print("No packages to update, done.", other_state=other_state)
if __name__ == "__main__":
main()