Compare commits

...

12 commits

Author SHA1 Message Date
6fde09ea71 Fix build_print_pkg_info() depending on global 2023-11-17 20:58:49 +09:00
4fd128e27b Fix confirm_result() depending on global 2023-11-17 20:55:17 +09:00
2b2a3c0dc3 Fix get_pkg_current_version() depending on global 2023-11-17 20:53:38 +09:00
b0185e1826 Fix type hints of "other_state" 2023-11-17 20:51:50 +09:00
0914c37345 Fix get_sudo_privileges depending on global
Another function that depended on "other_state" being global was fixed.
2023-11-17 20:51:26 +09:00
f79849340e Fix function missing parameter
By putting main into a function, other_state is no longer a global. Some
functions depended on this global dict, so pass it as a parameter
instead.
2023-11-17 20:46:58 +09:00
c2d15ea593 Encapsulate main into a function 2023-11-17 20:45:47 +09:00
d57adee92c Add comment about tmpfs in example_config.toml 2023-11-17 20:42:45 +09:00
c11d339baa Set 5 attempts to umount tmpfs 2023-11-17 20:40:44 +09:00
d3b8bff839 Set flag to stop building if signal SIGINT 2023-11-17 20:36:43 +09:00
ba1980afbe More robust status print ending build step 2023-11-17 20:32:12 +09:00
02af05e7a4 Use mount options to set tmpfs user/permissions 2023-11-17 20:23:23 +09:00
2 changed files with 64 additions and 60 deletions

View file

@ -22,6 +22,8 @@ log_limit = 1073741824
error_on_limit = false error_on_limit = false
# If true, timestamps are in localtime. If false, timestamps are UTC. # If true, timestamps are in localtime. If false, timestamps are UTC.
datetime_in_local_time = true datetime_in_local_time = true
# If true, all builds will be done in a tmpfs. Recommended to have a lot of RAM and/or swap.
tmpfs = false
########## END OF MANDATORY VARIABLES ########## END OF MANDATORY VARIABLES
# Each [[entry]] needs a "name". # Each [[entry]] needs a "name".

122
update.py
View file

@ -18,6 +18,7 @@ import threading
from pathlib import Path from pathlib import Path
from typing import Any, Union from typing import Any, Union
import signal import signal
import pwd
# SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) # SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
SUDO_PROC = False SUDO_PROC = False
@ -258,7 +259,7 @@ def log_print(*args, **kwargs):
def ensure_pkg_dir_exists( def ensure_pkg_dir_exists(
pkg: str, pkg: str,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Union[None, str]], other_state: dict[str, Any],
): ):
"""Ensures that an AUR-pkg-dir exists, returning False on failure. """Ensures that an AUR-pkg-dir exists, returning False on failure.
@ -324,7 +325,7 @@ def ensure_pkg_dir_exists(
def update_pkg_dir( def update_pkg_dir(
pkg: str, pkg: str,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Union[None, str]], other_state: dict[str, Any],
): ):
"""Updates the pkg by invoking "git pull". """Updates the pkg by invoking "git pull".
@ -508,7 +509,7 @@ def update_pkg_dir(
def check_pkg_build( def check_pkg_build(
pkg: str, pkg: str,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Union[None, str]], other_state: dict[str, Any],
editor: str, editor: str,
): ):
"""Opens the PKGBUILD in the editor, then prompts the user for an action. """Opens the PKGBUILD in the editor, then prompts the user for an action.
@ -564,7 +565,7 @@ def check_pkg_version(
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
repo: str, repo: str,
force_check_srcinfo: bool, force_check_srcinfo: bool,
other_state: dict[str, Union[None, str]], other_state: dict[str, Any],
): ):
"""Gets the installed version and pkg version and checks them. """Gets the installed version and pkg version and checks them.
@ -572,7 +573,7 @@ def check_pkg_version(
(installed pkg is up to date).""" (installed pkg is up to date)."""
status, current_epoch, current_version = get_pkg_current_version( status, current_epoch, current_version = get_pkg_current_version(
pkg, pkg_state, repo pkg, pkg_state, repo, other_state
) )
if status != "fetched": if status != "fetched":
return status return status
@ -603,7 +604,7 @@ def check_pkg_version(
) )
def get_srcinfo_version(pkg: str, other_state: dict[str, Union[None, str]]): def get_srcinfo_version(pkg: str, other_state: dict[str, Any]):
"""Parses .SRCINFO for verison information. """Parses .SRCINFO for verison information.
Returns (success_bool, pkgepoch, pkgver, pkgrel) Returns (success_bool, pkgepoch, pkgver, pkgrel)
@ -649,7 +650,7 @@ def get_pkgbuild_version(
pkg: str, pkg: str,
force_check_srcinfo: bool, force_check_srcinfo: bool,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Union[None, str]], other_state: dict[str, Any],
): ):
"""Gets the version of the pkg from .SRCINFO or PKGBUILD. """Gets the version of the pkg from .SRCINFO or PKGBUILD.
@ -800,7 +801,7 @@ def get_srcinfo_check_result(
pkg: str, pkg: str,
force_check_srcinfo: bool, force_check_srcinfo: bool,
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Union[None, str]], other_state: dict[str, Any],
): ):
"""Checks the version of the pkg against the currently installed version. """Checks the version of the pkg against the currently installed version.
@ -873,7 +874,9 @@ def get_srcinfo_check_result(
return "fail" return "fail"
def get_pkg_current_version(pkg: str, pkg_state: dict[str, Any], repo: str): def get_pkg_current_version(
pkg: str, pkg_state: dict[str, Any], repo: str, other_state: dict[str, Any]
):
"""Fetches the version info and returns status of fetching and the version. """Fetches the version info and returns status of fetching and the version.
Returns (status, epoch, version) Returns (status, epoch, version)
@ -939,7 +942,7 @@ def get_pkg_current_version(pkg: str, pkg_state: dict[str, Any], repo: str):
return "fetched", current_epoch, current_version return "fetched", current_epoch, current_version
def get_sudo_privileges(): def get_sudo_privileges(other_state: dict[str, Any]):
"""Starts a bash loop that ensures sudo privileges are ready while this """Starts a bash loop that ensures sudo privileges are ready while this
script is active.""" script is active."""
@ -1193,7 +1196,7 @@ def handle_output_stream(
def update_pkg_list( def update_pkg_list(
pkgs: list[str], pkgs: list[str],
pkg_state: dict[str, Any], pkg_state: dict[str, Any],
other_state: dict[str, Union[None, str]], other_state: dict[str, Any],
signing_gpg_dir: str, signing_gpg_dir: str,
signing_gpg_key_fp: str, signing_gpg_key_fp: str,
signing_gpg_pass: str, signing_gpg_pass: str,
@ -1202,13 +1205,17 @@ def update_pkg_list(
"""For each package to build: builds it, signs it, and moves it to """For each package to build: builds it, signs it, and moves it to
"pkg_out_dir".""" "pkg_out_dir"."""
if not get_sudo_privileges(): atexit.register(build_print_pkg_info, pkgs, pkg_state, other_state)
if not get_sudo_privileges(other_state):
log_print( log_print(
"ERROR: Failed to get sudo privileges", other_state=other_state "ERROR: Failed to get sudo privileges", other_state=other_state
) )
pkg_state[pkg]["build_status"] = "get_sudo_fail" pkg_state[pkg]["build_status"] = "get_sudo_fail"
sys.exit(1) sys.exit(1)
for pkg in pkgs: for pkg in pkgs:
if other_state["stop_building"]:
sys.exit(0)
pkgdir = os.path.join(other_state["clones_dir"], pkg) pkgdir = os.path.join(other_state["clones_dir"], pkg)
if "ccache_dir" in pkg_state[pkg]: if "ccache_dir" in pkg_state[pkg]:
cleanup_sccache( cleanup_sccache(
@ -1558,17 +1565,6 @@ def update_pkg_list(
), ),
) )
max_name_len = 1
for pkg in pkgs:
if len(pkg) + 1 > max_name_len:
max_name_len = len(pkg) + 1
for pkg in pkgs:
name_space = " " * (max_name_len - len(pkg))
log_print(
f'"{pkg}"{name_space}status: {pkg_state[pkg]["build_status"]}',
other_state=other_state,
)
def get_latest_pkg(pkg: str, cache_dir: str): def get_latest_pkg(pkg: str, cache_dir: str):
"""Gets the latest pkg from the specified "cache_dir" and return its """Gets the latest pkg from the specified "cache_dir" and return its
@ -1591,7 +1587,7 @@ def get_latest_pkg(pkg: str, cache_dir: str):
return None return None
def confirm_result(pkg: str, state_result: str): def confirm_result(pkg: str, state_result: str, other_state: dict[str, Any]):
"""Prompts the user the action to take for a pkg after checking its """Prompts the user the action to take for a pkg after checking its
PKGBUILD. PKGBUILD.
@ -1655,8 +1651,27 @@ def print_state_info_and_get_update_list(
return to_update return to_update
def build_print_pkg_info(
pkgs: tuple[str], pkg_state: dict[str, Any], other_state: dict[str, Any]
):
"""Prints the current "build" state of the given pkgs."""
max_name_len = 1
for pkg in pkgs:
if len(pkg) + 1 > max_name_len:
max_name_len = len(pkg) + 1
for pkg in pkgs:
name_space = " " * (max_name_len - len(pkg))
log_print(
f'"{pkg}"{name_space}status: {pkg_state[pkg]["build_status"]}',
other_state=other_state,
)
def test_gpg_passphrase( def test_gpg_passphrase(
signing_gpg_dir: str, signing_key_fp: str, passphrase: str signing_gpg_dir: str,
signing_key_fp: str,
passphrase: str,
other_state: dict[str, Any],
): ):
"""Checks if the given gpg passphrase works with the gpg signing key.""" """Checks if the given gpg passphrase works with the gpg signing key."""
@ -1709,7 +1724,7 @@ def test_gpg_passphrase(
return True return True
def validate_and_verify_paths(other_state: dict[str, Union[None, str]]): def validate_and_verify_paths(other_state: dict[str, Any]):
"""Checks and validates/ensures that certain directories exist.""" """Checks and validates/ensures that certain directories exist."""
if not os.path.exists(other_state["chroot"]): if not os.path.exists(other_state["chroot"]):
@ -1758,13 +1773,16 @@ def signal_handler(sig, frame):
print_state_info_and_get_update_list(OTHER_STATE, PKG_STATE) print_state_info_and_get_update_list(OTHER_STATE, PKG_STATE)
if signal.Signals(sig) is not signal.SIGINT: if signal.Signals(sig) is not signal.SIGINT:
return return
OTHER_STATE["stop_building"] = True
sys.exit(0) sys.exit(0)
if signal.Signals(sig) is not signal.SIGINT: if signal.Signals(sig) is not signal.SIGINT:
return return
OTHER_STATE["stop_building"] = True
sys.exit(1) sys.exit(1)
if __name__ == "__main__": def main():
"""The main function."""
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler) signal.signal(signal.SIGUSR1, signal_handler)
editor = None editor = None
@ -1831,9 +1849,12 @@ if __name__ == "__main__":
pkg_state = {} pkg_state = {}
other_state = {} other_state = {}
global PKG_STATE, OTHER_STATE
PKG_STATE = pkg_state PKG_STATE = pkg_state
OTHER_STATE = other_state OTHER_STATE = other_state
other_state["USER"] = os.environ["USER"] other_state["USER"] = os.environ["USER"]
other_state["UID"] = pwd.getpwnam(other_state["USER"]).pw_uid
other_state["stop_building"] = False
other_state["logs_dir"] = None other_state["logs_dir"] = None
other_state["log_limit"] = 1024 * 1024 * 1024 other_state["log_limit"] = 1024 * 1024 * 1024
other_state["error_on_limit"] = False other_state["error_on_limit"] = False
@ -1872,6 +1893,7 @@ if __name__ == "__main__":
other_state["signing_gpg_dir"], other_state["signing_gpg_dir"],
other_state["signing_gpg_key_fp"], other_state["signing_gpg_key_fp"],
other_state["signing_gpg_pass"], other_state["signing_gpg_pass"],
other_state,
): ):
sys.exit(1) sys.exit(1)
elif args.config: elif args.config:
@ -1973,6 +1995,7 @@ if __name__ == "__main__":
other_state["signing_gpg_dir"], other_state["signing_gpg_dir"],
other_state["signing_gpg_key_fp"], other_state["signing_gpg_key_fp"],
other_state["signing_gpg_pass"], other_state["signing_gpg_pass"],
other_state,
): ):
sys.exit(1) sys.exit(1)
if "editor" in d: if "editor" in d:
@ -2033,7 +2056,7 @@ if __name__ == "__main__":
os.path.dirname(os.path.realpath(other_state["chroot"])), os.path.dirname(os.path.realpath(other_state["chroot"])),
"tmpfs_chroot", "tmpfs_chroot",
) )
get_sudo_privileges() get_sudo_privileges(other_state)
try: try:
old_umask = os.umask(0o077) old_umask = os.umask(0o077)
log_print( log_print(
@ -2057,7 +2080,7 @@ if __name__ == "__main__":
"-t", "-t",
"tmpfs", "tmpfs",
"-o", "-o",
"size=90%", f"size=90%,mode=0700,uid={other_state['UID']}",
"tmpfs", "tmpfs",
other_state["tmpfs_chroot"], other_state["tmpfs_chroot"],
), ),
@ -2068,40 +2091,13 @@ if __name__ == "__main__":
( (
"/usr/bin/env", "/usr/bin/env",
"sudo", "sudo",
"umount", "bash",
tmpfs_path, "-c",
f"for ((i=0; i<5; ++i)); do if umount {tmpfs_path}; then break; fi; sleep 1; done",
) )
), ),
other_state["tmpfs_chroot"], other_state["tmpfs_chroot"],
) )
log_print(
"Setting tmpfs dir permissions...", other_state=other_state
)
subprocess.run(
(
"/usr/bin/env",
"sudo",
"chmod",
"700",
other_state["tmpfs_chroot"],
),
check=True,
)
log_print(
"Giving self user owner of tmpfs dir...",
other_state=other_state,
)
subprocess.run(
(
"/usr/bin/env",
"sudo",
"chown",
"-R",
other_state["USER"],
other_state["tmpfs_chroot"],
),
check=True,
)
os.umask(old_umask) os.umask(old_umask)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
log_print("ERROR: Failed to set up tmpfs!") log_print("ERROR: Failed to set up tmpfs!")
@ -2261,7 +2257,9 @@ if __name__ == "__main__":
False, False,
other_state, other_state,
) )
confirm_result_result = confirm_result(pkg_list[i], state_result) confirm_result_result = confirm_result(
pkg_list[i], state_result, other_state
)
if confirm_result_result == "continue": if confirm_result_result == "continue":
pkg_state[pkg_list[i]]["state"] = state_result pkg_state[pkg_list[i]]["state"] = state_result
pkg_state[pkg_list[i]]["build_status"] = ( pkg_state[pkg_list[i]]["build_status"] = (
@ -2322,3 +2320,7 @@ if __name__ == "__main__":
log_print("Canceled.", other_state=other_state) log_print("Canceled.", other_state=other_state)
else: else:
log_print("No packages to update, done.", other_state=other_state) log_print("No packages to update, done.", other_state=other_state)
if __name__ == "__main__":
main()