Compare commits
12 commits
9c82f847ee
...
6fde09ea71
Author | SHA1 | Date | |
---|---|---|---|
6fde09ea71 | |||
4fd128e27b | |||
2b2a3c0dc3 | |||
b0185e1826 | |||
0914c37345 | |||
f79849340e | |||
c2d15ea593 | |||
d57adee92c | |||
c11d339baa | |||
d3b8bff839 | |||
ba1980afbe | |||
02af05e7a4 |
2 changed files with 64 additions and 60 deletions
|
@ -22,6 +22,8 @@ log_limit = 1073741824
|
|||
error_on_limit = false
|
||||
# If true, timestamps are in localtime. If false, timestamps are UTC.
|
||||
datetime_in_local_time = true
|
||||
# If true, all builds will be done in a tmpfs. Recommended to have a lot of RAM and/or swap.
|
||||
tmpfs = false
|
||||
########## END OF MANDATORY VARIABLES
|
||||
|
||||
# Each [[entry]] needs a "name".
|
||||
|
|
122
update.py
122
update.py
|
@ -18,6 +18,7 @@ import threading
|
|||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
import signal
|
||||
import pwd
|
||||
|
||||
# SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
SUDO_PROC = False
|
||||
|
@ -258,7 +259,7 @@ def log_print(*args, **kwargs):
|
|||
def ensure_pkg_dir_exists(
|
||||
pkg: str,
|
||||
pkg_state: dict[str, Any],
|
||||
other_state: dict[str, Union[None, str]],
|
||||
other_state: dict[str, Any],
|
||||
):
|
||||
"""Ensures that an AUR-pkg-dir exists, returning False on failure.
|
||||
|
||||
|
@ -324,7 +325,7 @@ def ensure_pkg_dir_exists(
|
|||
def update_pkg_dir(
|
||||
pkg: str,
|
||||
pkg_state: dict[str, Any],
|
||||
other_state: dict[str, Union[None, str]],
|
||||
other_state: dict[str, Any],
|
||||
):
|
||||
"""Updates the pkg by invoking "git pull".
|
||||
|
||||
|
@ -508,7 +509,7 @@ def update_pkg_dir(
|
|||
def check_pkg_build(
|
||||
pkg: str,
|
||||
pkg_state: dict[str, Any],
|
||||
other_state: dict[str, Union[None, str]],
|
||||
other_state: dict[str, Any],
|
||||
editor: str,
|
||||
):
|
||||
"""Opens the PKGBUILD in the editor, then prompts the user for an action.
|
||||
|
@ -564,7 +565,7 @@ def check_pkg_version(
|
|||
pkg_state: dict[str, Any],
|
||||
repo: str,
|
||||
force_check_srcinfo: bool,
|
||||
other_state: dict[str, Union[None, str]],
|
||||
other_state: dict[str, Any],
|
||||
):
|
||||
"""Gets the installed version and pkg version and checks them.
|
||||
|
||||
|
@ -572,7 +573,7 @@ def check_pkg_version(
|
|||
(installed pkg is up to date)."""
|
||||
|
||||
status, current_epoch, current_version = get_pkg_current_version(
|
||||
pkg, pkg_state, repo
|
||||
pkg, pkg_state, repo, other_state
|
||||
)
|
||||
if status != "fetched":
|
||||
return status
|
||||
|
@ -603,7 +604,7 @@ def check_pkg_version(
|
|||
)
|
||||
|
||||
|
||||
def get_srcinfo_version(pkg: str, other_state: dict[str, Union[None, str]]):
|
||||
def get_srcinfo_version(pkg: str, other_state: dict[str, Any]):
|
||||
"""Parses .SRCINFO for verison information.
|
||||
|
||||
Returns (success_bool, pkgepoch, pkgver, pkgrel)
|
||||
|
@ -649,7 +650,7 @@ def get_pkgbuild_version(
|
|||
pkg: str,
|
||||
force_check_srcinfo: bool,
|
||||
pkg_state: dict[str, Any],
|
||||
other_state: dict[str, Union[None, str]],
|
||||
other_state: dict[str, Any],
|
||||
):
|
||||
"""Gets the version of the pkg from .SRCINFO or PKGBUILD.
|
||||
|
||||
|
@ -800,7 +801,7 @@ def get_srcinfo_check_result(
|
|||
pkg: str,
|
||||
force_check_srcinfo: bool,
|
||||
pkg_state: dict[str, Any],
|
||||
other_state: dict[str, Union[None, str]],
|
||||
other_state: dict[str, Any],
|
||||
):
|
||||
"""Checks the version of the pkg against the currently installed version.
|
||||
|
||||
|
@ -873,7 +874,9 @@ def get_srcinfo_check_result(
|
|||
return "fail"
|
||||
|
||||
|
||||
def get_pkg_current_version(pkg: str, pkg_state: dict[str, Any], repo: str):
|
||||
def get_pkg_current_version(
|
||||
pkg: str, pkg_state: dict[str, Any], repo: str, other_state: dict[str, Any]
|
||||
):
|
||||
"""Fetches the version info and returns status of fetching and the version.
|
||||
|
||||
Returns (status, epoch, version)
|
||||
|
@ -939,7 +942,7 @@ def get_pkg_current_version(pkg: str, pkg_state: dict[str, Any], repo: str):
|
|||
return "fetched", current_epoch, current_version
|
||||
|
||||
|
||||
def get_sudo_privileges():
|
||||
def get_sudo_privileges(other_state: dict[str, Any]):
|
||||
"""Starts a bash loop that ensures sudo privileges are ready while this
|
||||
script is active."""
|
||||
|
||||
|
@ -1193,7 +1196,7 @@ def handle_output_stream(
|
|||
def update_pkg_list(
|
||||
pkgs: list[str],
|
||||
pkg_state: dict[str, Any],
|
||||
other_state: dict[str, Union[None, str]],
|
||||
other_state: dict[str, Any],
|
||||
signing_gpg_dir: str,
|
||||
signing_gpg_key_fp: str,
|
||||
signing_gpg_pass: str,
|
||||
|
@ -1202,13 +1205,17 @@ def update_pkg_list(
|
|||
"""For each package to build: builds it, signs it, and moves it to
|
||||
"pkg_out_dir"."""
|
||||
|
||||
if not get_sudo_privileges():
|
||||
atexit.register(build_print_pkg_info, pkgs, pkg_state, other_state)
|
||||
|
||||
if not get_sudo_privileges(other_state):
|
||||
log_print(
|
||||
"ERROR: Failed to get sudo privileges", other_state=other_state
|
||||
)
|
||||
pkg_state[pkg]["build_status"] = "get_sudo_fail"
|
||||
sys.exit(1)
|
||||
for pkg in pkgs:
|
||||
if other_state["stop_building"]:
|
||||
sys.exit(0)
|
||||
pkgdir = os.path.join(other_state["clones_dir"], pkg)
|
||||
if "ccache_dir" in pkg_state[pkg]:
|
||||
cleanup_sccache(
|
||||
|
@ -1558,17 +1565,6 @@ def update_pkg_list(
|
|||
),
|
||||
)
|
||||
|
||||
max_name_len = 1
|
||||
for pkg in pkgs:
|
||||
if len(pkg) + 1 > max_name_len:
|
||||
max_name_len = len(pkg) + 1
|
||||
for pkg in pkgs:
|
||||
name_space = " " * (max_name_len - len(pkg))
|
||||
log_print(
|
||||
f'"{pkg}"{name_space}status: {pkg_state[pkg]["build_status"]}',
|
||||
other_state=other_state,
|
||||
)
|
||||
|
||||
|
||||
def get_latest_pkg(pkg: str, cache_dir: str):
|
||||
"""Gets the latest pkg from the specified "cache_dir" and return its
|
||||
|
@ -1591,7 +1587,7 @@ def get_latest_pkg(pkg: str, cache_dir: str):
|
|||
return None
|
||||
|
||||
|
||||
def confirm_result(pkg: str, state_result: str):
|
||||
def confirm_result(pkg: str, state_result: str, other_state: dict[str, Any]):
|
||||
"""Prompts the user the action to take for a pkg after checking its
|
||||
PKGBUILD.
|
||||
|
||||
|
@ -1655,8 +1651,27 @@ def print_state_info_and_get_update_list(
|
|||
return to_update
|
||||
|
||||
|
||||
def build_print_pkg_info(
|
||||
pkgs: tuple[str], pkg_state: dict[str, Any], other_state: dict[str, Any]
|
||||
):
|
||||
"""Prints the current "build" state of the given pkgs."""
|
||||
max_name_len = 1
|
||||
for pkg in pkgs:
|
||||
if len(pkg) + 1 > max_name_len:
|
||||
max_name_len = len(pkg) + 1
|
||||
for pkg in pkgs:
|
||||
name_space = " " * (max_name_len - len(pkg))
|
||||
log_print(
|
||||
f'"{pkg}"{name_space}status: {pkg_state[pkg]["build_status"]}',
|
||||
other_state=other_state,
|
||||
)
|
||||
|
||||
|
||||
def test_gpg_passphrase(
|
||||
signing_gpg_dir: str, signing_key_fp: str, passphrase: str
|
||||
signing_gpg_dir: str,
|
||||
signing_key_fp: str,
|
||||
passphrase: str,
|
||||
other_state: dict[str, Any],
|
||||
):
|
||||
"""Checks if the given gpg passphrase works with the gpg signing key."""
|
||||
|
||||
|
@ -1709,7 +1724,7 @@ def test_gpg_passphrase(
|
|||
return True
|
||||
|
||||
|
||||
def validate_and_verify_paths(other_state: dict[str, Union[None, str]]):
|
||||
def validate_and_verify_paths(other_state: dict[str, Any]):
|
||||
"""Checks and validates/ensures that certain directories exist."""
|
||||
|
||||
if not os.path.exists(other_state["chroot"]):
|
||||
|
@ -1758,13 +1773,16 @@ def signal_handler(sig, frame):
|
|||
print_state_info_and_get_update_list(OTHER_STATE, PKG_STATE)
|
||||
if signal.Signals(sig) is not signal.SIGINT:
|
||||
return
|
||||
OTHER_STATE["stop_building"] = True
|
||||
sys.exit(0)
|
||||
if signal.Signals(sig) is not signal.SIGINT:
|
||||
return
|
||||
OTHER_STATE["stop_building"] = True
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
def main():
|
||||
"""The main function."""
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGUSR1, signal_handler)
|
||||
editor = None
|
||||
|
@ -1831,9 +1849,12 @@ if __name__ == "__main__":
|
|||
|
||||
pkg_state = {}
|
||||
other_state = {}
|
||||
global PKG_STATE, OTHER_STATE
|
||||
PKG_STATE = pkg_state
|
||||
OTHER_STATE = other_state
|
||||
other_state["USER"] = os.environ["USER"]
|
||||
other_state["UID"] = pwd.getpwnam(other_state["USER"]).pw_uid
|
||||
other_state["stop_building"] = False
|
||||
other_state["logs_dir"] = None
|
||||
other_state["log_limit"] = 1024 * 1024 * 1024
|
||||
other_state["error_on_limit"] = False
|
||||
|
@ -1872,6 +1893,7 @@ if __name__ == "__main__":
|
|||
other_state["signing_gpg_dir"],
|
||||
other_state["signing_gpg_key_fp"],
|
||||
other_state["signing_gpg_pass"],
|
||||
other_state,
|
||||
):
|
||||
sys.exit(1)
|
||||
elif args.config:
|
||||
|
@ -1973,6 +1995,7 @@ if __name__ == "__main__":
|
|||
other_state["signing_gpg_dir"],
|
||||
other_state["signing_gpg_key_fp"],
|
||||
other_state["signing_gpg_pass"],
|
||||
other_state,
|
||||
):
|
||||
sys.exit(1)
|
||||
if "editor" in d:
|
||||
|
@ -2033,7 +2056,7 @@ if __name__ == "__main__":
|
|||
os.path.dirname(os.path.realpath(other_state["chroot"])),
|
||||
"tmpfs_chroot",
|
||||
)
|
||||
get_sudo_privileges()
|
||||
get_sudo_privileges(other_state)
|
||||
try:
|
||||
old_umask = os.umask(0o077)
|
||||
log_print(
|
||||
|
@ -2057,7 +2080,7 @@ if __name__ == "__main__":
|
|||
"-t",
|
||||
"tmpfs",
|
||||
"-o",
|
||||
"size=90%",
|
||||
f"size=90%,mode=0700,uid={other_state['UID']}",
|
||||
"tmpfs",
|
||||
other_state["tmpfs_chroot"],
|
||||
),
|
||||
|
@ -2068,40 +2091,13 @@ if __name__ == "__main__":
|
|||
(
|
||||
"/usr/bin/env",
|
||||
"sudo",
|
||||
"umount",
|
||||
tmpfs_path,
|
||||
"bash",
|
||||
"-c",
|
||||
f"for ((i=0; i<5; ++i)); do if umount {tmpfs_path}; then break; fi; sleep 1; done",
|
||||
)
|
||||
),
|
||||
other_state["tmpfs_chroot"],
|
||||
)
|
||||
log_print(
|
||||
"Setting tmpfs dir permissions...", other_state=other_state
|
||||
)
|
||||
subprocess.run(
|
||||
(
|
||||
"/usr/bin/env",
|
||||
"sudo",
|
||||
"chmod",
|
||||
"700",
|
||||
other_state["tmpfs_chroot"],
|
||||
),
|
||||
check=True,
|
||||
)
|
||||
log_print(
|
||||
"Giving self user owner of tmpfs dir...",
|
||||
other_state=other_state,
|
||||
)
|
||||
subprocess.run(
|
||||
(
|
||||
"/usr/bin/env",
|
||||
"sudo",
|
||||
"chown",
|
||||
"-R",
|
||||
other_state["USER"],
|
||||
other_state["tmpfs_chroot"],
|
||||
),
|
||||
check=True,
|
||||
)
|
||||
os.umask(old_umask)
|
||||
except subprocess.CalledProcessError:
|
||||
log_print("ERROR: Failed to set up tmpfs!")
|
||||
|
@ -2261,7 +2257,9 @@ if __name__ == "__main__":
|
|||
False,
|
||||
other_state,
|
||||
)
|
||||
confirm_result_result = confirm_result(pkg_list[i], state_result)
|
||||
confirm_result_result = confirm_result(
|
||||
pkg_list[i], state_result, other_state
|
||||
)
|
||||
if confirm_result_result == "continue":
|
||||
pkg_state[pkg_list[i]]["state"] = state_result
|
||||
pkg_state[pkg_list[i]]["build_status"] = (
|
||||
|
@ -2322,3 +2320,7 @@ if __name__ == "__main__":
|
|||
log_print("Canceled.", other_state=other_state)
|
||||
else:
|
||||
log_print("No packages to update, done.", other_state=other_state)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
Loading…
Reference in a new issue