def log_print(*args, **kwargs):
+ """Prints to stdout, then logs to GLOBAL_LOG_FILE."""
+
if "file" in kwargs:
kwargs["file"] = sys.stdout
print(*args, **kwargs)
def ensure_pkg_dir_exists(pkg, pkg_state, other_state):
+ """Ensures that an AUR-pkg-dir exists, returning False on failure.
+
+ If no such directory exists, this script attempts to clone it from the AUR.
+ True is returned on successful cloning.
+
+ If a file exists with the same name, returns False.
+
+ If a directory exists with the same name, returns True.
+
+ If "repo_path" is specified and the directory doesn't exist, returns False.
+
+ If "NO_REPO" is specified as "repo_path" and the directory doesn't exist,
+ returns False.
+ """
+
log_print('Checking that dir for "{}" exists...'.format(pkg))
pkgdir = os.path.join(other_state["clones_dir"], pkg)
if os.path.isdir(pkgdir):
elif pkg_state[pkg]["repo_path"] == "NO_REPO":
log_print('"{}" does not exist, but NO_REPO specified for repo_path')
return False
+ return False
def update_pkg_dir(pkg, pkg_state, other_state):
+ """Updates the pkg by invoking "git pull".
+
+ If "git pull" failes, it is retried after invoking "git restore .".
+
+ If the local working directory fails to update via "git pull" (or if some
+ other handling of the local repository fails), then this function returns
+ (False, False).
+
+ (True, True) is returned if "skip_branch_up_to_date" is True for the package
+ and the local repository is up to date.
+
+ (True, False) is returned by default (successful "git pull"; regardless of
+ if an update was fetched).
+ """
+
log_print('Making sure pkg dir for "{}" is up to date...'.format(pkg))
pkgdir = os.path.join(other_state["clones_dir"], pkg)
return False, False
# get remote that current branch is tracking
- selected_remote = ""
+ selected_remote = None
try:
result = subprocess.run(
["git", "status", "-sb", "--porcelain"],
capture_output=True,
encoding="UTF-8",
)
- for remote in remotes:
- if (
- len(remote.strip()) > 0
- and result.stdout.find(remote.strip()) != -1
- ):
- selected_remote = remote.strip()
- break
+ result_lines = result.stdout.split(sep="\n")
+ for matching_line in filter(lambda s: s.startswith("##"), result_lines):
+ for remote in map(lambda r: r.strip(), remotes):
+ if matching_line.find(remote) != -1:
+ selected_remote = remote
+ break
except subprocess.CalledProcessError:
log_print(
f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).'
)
return False, False
- if len(selected_remote) == 0:
+ if selected_remote is None or not isinstance(selected_remote, str):
log_print(
f'ERROR: Failed to update pkg dir of "{pkg}" (getting branch\'s remote).'
)
return False, False
# get hash of current branch
- current_branch_hash = ""
+ current_branch_hash = None
try:
result = subprocess.run(
["git", "log", "-1", "--format=format:%H"],
f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).'
)
return False, False
- if len(current_branch_hash.strip()) == 0:
+ if current_branch_hash is None or not isinstance(current_branch_hash, str):
log_print(
f'ERROR: Failed to update pkg dir of "{pkg}" (getting current branch\'s hash).'
)
return False, False
# get hash of remote branch
- remote_branch_hash = ""
+ remote_branch_hash = None
try:
result = subprocess.run(
["git", "log", "-1", "--format=format:%H", selected_remote],
f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).'
)
return False, False
- if len(remote_branch_hash.strip()) == 0:
+ if remote_branch_hash is None or not isinstance(remote_branch_hash, str):
log_print(
f'ERROR: Failed to update pkg dir of "{pkg}" (getting remote branch\'s hash).'
)
except subprocess.CalledProcessError:
try:
subprocess.run(
- ["git", "checkout", "--", "*"],
+ ["git", "restore", "."],
check=True,
cwd=pkgdir,
)
def check_pkg_build(pkg, pkg_state, other_state, editor):
- """Returns "ok", "not_ok", "abort", or "force_build"."""
+ """Opens the PKGBUILD in the editor, then prompts the user for an action.
+
+ Returns "ok", "not_ok", "abort", or "force_build"."""
+
pkgdir = os.path.join(other_state["clones_dir"], pkg)
log_print('Checking PKGBUILD for "{}"...'.format(pkg))
try:
def check_pkg_version(pkg, pkg_state, repo, force_check_srcinfo, other_state):
- """Returns "fail", "install", or "done"."""
+ """Gets the installed version and pkg version and checks them.
+
+ Returns "fail" (on failure), "install" (pkg is newer), or "done"
+ (installed pkg is up to date)."""
+
status, current_epoch, current_version = get_pkg_current_version(
pkg, pkg_state, repo
)
def get_srcinfo_version(pkg, other_state):
- """Returns (success_bool, pkgepoch, pkgver, pkgrel)"""
+ """Parses .SRCINFO for verison information.
+
+ Returns (success_bool, pkgepoch, pkgver, pkgrel)
+
+ When "success_bool" is False, all other values are None.
+
+ Otherwise, all other values are str (or None if not found)."""
+
if not os.path.exists(
os.path.join(other_state["clones_dir"], pkg, ".SRCINFO")
):
pkgver_reprog = re.compile("^\\s*pkgver\\s*=\\s*([a-zA-Z0-9._+-]+)\\s*$")
pkgrel_reprog = re.compile("^\\s*pkgrel\\s*=\\s*([0-9.]+)\\s*$")
pkgepoch_reprog = re.compile("^\\s*epoch\\s*=\\s*([0-9]+)\\s*$")
- pkgver = ""
- pkgrel = ""
- pkgepoch = ""
+ pkgver = None
+ pkgrel = None
+ pkgepoch = None
with open(
os.path.join(other_state["clones_dir"], pkg, ".SRCINFO"),
encoding="UTF-8",
def get_pkgbuild_version(pkg, force_check_srcinfo, pkg_state, other_state):
- """Returns (success, epoch, version, release)"""
+ """Gets the version of the pkg from .SRCINFO or PKGBUILD.
+
+ Returns (success, epoch, version, release).
+
+ If "success" is False, then all other values are None.
+
+ Otherwise, version and release should be a str type, but epoch may be
+ None."""
+
pkgdir = os.path.join(other_state["clones_dir"], pkg)
log_print(f'Getting version of "{pkg}"...')
while True and not force_check_srcinfo:
if os.path.exists(os.path.join(pkgdir, "src")):
shutil.rmtree(os.path.join(pkgdir, "src"))
- pkgepoch = ""
- pkgver = ""
- pkgrel = ""
+ pkgepoch = None
+ pkgver = None
+ pkgrel = None
# TODO maybe sandbox sourcing the PKGBUILD
pkgbuild_output = subprocess.run(
log_print("ERROR: Unreachable code")
return False, None, None, None
- if len(pkgepoch) == 0:
- pkgepoch = None
- if len(pkgver) == 0:
- pkgver = None
- if len(pkgrel) == 0:
- pkgrel = None
-
if pkgver is not None and pkgrel is not None:
return True, pkgepoch, pkgver, pkgrel
else:
pkg_state,
other_state,
):
+ """Checks the version of the pkg against the currently installed version.
+
+ Returns "install" if the version of the pkg is newer.
+
+ Otherwise returns "done" if the version is not newer.
+
+ Returns "fail" on error."""
+
ver_success, pkgepoch, pkgver, pkgrel = get_pkgbuild_version(
pkg, force_check_srcinfo, pkg_state, other_state
)
def get_pkg_current_version(pkg, pkg_state, repo):
- """Returns (status, epoch, version)"""
+ """Fetches the version info and returns status of fetching and the version.
+
+ Returns (status, epoch, version)
+
+ "status" may be one of:
+ "fail", "install", "fetched"
+
+ "epoch" may be None.
+
+ "version" must be a str if "status" is "fetched". Otherwise, it is None if
+ "status" is "fail" or "install".
+ """
+
log_print(
'Checking version of installed pkg "{}"...'.format(
pkg_state[pkg]["pkg_name"]
def get_sudo_privileges():
+ """Starts a bash loop that ensures sudo privileges are ready while this
+ script is active."""
+
global SUDO_PROC
if not SUDO_PROC:
log_print("sudo -v")
def cleanup_sudo(sudo_proc):
+ """Stops the bash loop keeping sudo privileges."""
+
sudo_proc.terminate()
def create_executable_script(dest_filename, script_contents):
+ """Creates a script to be run with sudo to set up a custom command.
+
+ This is currently used to set up sccache by placing custom commands in
+ "/usr/local/bin" for gcc and friends."""
+
tempf_name = "unknown"
with tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", delete=False
def setup_ccache(chroot):
+ """Sets up the chroot for ccache."""
+
# set up ccache stuff
try:
subprocess.run(
def cleanup_ccache(chroot):
+ """Unsets up the chroot for ccache."""
+
# cleanup ccache stuff
try:
subprocess.run(
def setup_sccache(chroot):
+ """Sets up sccache for the chroot."""
+
sccache_script = """#!/usr/bin/env sh
export PATH=${PATH/:\/usr\/local\/bin/}
/usr/bin/env sccache $(basename "$0") "$@"
def cleanup_sccache(chroot):
+ """Unsets up sccache for the chroot."""
+
# cleanup sccache stuff
try:
subprocess.run(
signing_gpg_pass,
no_store,
):
+ """For each package to build: builds it, signs it, and moves it to
+ "pkg_out_dir"."""
+
if not get_sudo_privileges():
log_print("ERROR: Failed to get sudo privileges")
sys.exit(1)
def get_latest_pkg(pkg, cache_dir):
+ """Gets the latest pkg from the specified "cache_dir" and return its
+ filename."""
+
globbed = glob.glob(os.path.join(cache_dir, pkg + "*"))
if len(globbed) > 0:
globbed.sort()
def confirm_result(pkg, state_result):
- """Returns "continue", "recheck", "force_build", or "abort"."""
+ """Prompts the user the action to take for a pkg after checking its
+ PKGBUILD.
+
+ Returns "continue", "recheck", "force_build", "skip", "back", or "abort"."""
+
while True:
log_print(
'Got "{}" for pkg "{}", action: [C(ontinue), r(echeck), f(orce build),\
def print_state_info_and_get_update_list(pkg_state):
+ """Prints the current "checked" state of all pkgs in the config."""
+
to_update = []
log_print("package state:")
for (pkg_name, pkg_dict) in pkg_state.items():
def test_gpg_passphrase(signing_gpg_dir, signing_key_fp, passphrase):
+ """Checks if the given gpg passphrase works with the gpg signing key."""
+
with tempfile.NamedTemporaryFile() as tempnf:
tempnf.write(b"Test file content")
tempnf.flush()
def validate_and_verify_paths(other_state):
+ """Checks and validates/ensures that certain directories exist."""
+
if not os.path.exists(other_state["chroot"]):
log_print(
f"ERROR: chroot at \"{other_state['chroot']}\" does not exist"