Source code for chaintool.command

# -*- coding: utf-8 -*-
#
# Copyright 2021 Joel Baxter
#
# This file is part of chaintool.
#
# chaintool is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# chaintool is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with chaintool.  If not, see <https://www.gnu.org/licenses/>.

"""Top-level logic for "cmd" operations.

Called from cli module. Handles locking and shortcuts/completions; delegates
to command_impl_* modules for most of the work.

Note that most locks acquired here are released only when the program exits.
Operations are meant to be invoked one per program instance, using the CLI.

"""


# Public API of this module: one entry point per "cmd" operation. Per the
# module docstring, these are the functions called from the cli module.
__all__ = [
    "cli_list",
    "cli_set",
    "cli_edit",
    "cli_print",
    "cli_print_all",
    "cli_del",
    "cli_run",
    "cli_vals",
    "cli_vals_all",
]


import atexit
import copy
import os
import tempfile

from colorama import Fore

from . import command_impl_core
from . import command_impl_op
from . import command_impl_print
from . import completions
from . import sequence_impl_core
from . import shared
from . import shortcuts
from . import locks


def cli_list(column):
    """Print the names of all currently defined commands.

    No locks are taken; this only reads a directory listing and prints it.

    :param column: if True, print one command name per line; otherwise print
                   all names on a single space-separated line
    :type column:  bool

    :returns: exit status code; currently always returns 0
    :rtype:   int

    """
    print()
    names = command_impl_core.all_names()
    if not names:
        # Nothing to show beyond the leading blank line.
        return 0
    separator = "\n" if column else " "
    print(separator.join(names))
    print()
    return 0
def cli_set(cmd, cmdline, overwrite, print_after_set):
    """Create a command, or update an existing one, from a commandline.

    Takes the seq inventory readlock, the cmd inventory writelock, and the
    cmd item writelock. When the command does not exist yet, the name is
    first checked against existing sequences and the operation is rejected
    on a clash. The actual create/update is delegated to
    :func:`.command_impl_op.define`.

    If a new command was created successfully, its shortcut
    (:func:`.shortcuts.create_cmd_shortcut`) and autocompletion behavior
    (:func:`.completions.create_completion`) are set up as well.

    :param cmd:             name of command to create/update
    :type cmd:              str
    :param cmdline:         commandline
    :type cmdline:          str
    :param overwrite:       whether to allow if command already exists
    :type overwrite:        bool
    :param print_after_set: whether to automatically trigger "print"
                            operation at the end
    :type print_after_set:  bool

    :returns: exit status code (0 for success, nonzero for error)
    :rtype:   int

    """
    locks.inventory_lock("seq", locks.LockType.READ)
    locks.inventory_lock("cmd", locks.LockType.WRITE)
    locks.item_lock("cmd", cmd, locks.LockType.WRITE)

    is_new = not command_impl_core.exists(cmd)
    # Only a brand-new command can clash with a sequence name.
    if is_new and sequence_impl_core.exists(cmd):
        print()
        shared.errprint(
            "Command '{}' cannot be created because a sequence exists with"
            " the same name.".format(cmd)
        )
        print()
        return 1

    status = command_impl_op.define(
        cmd, cmdline, overwrite, print_after_set, False
    )
    if status:
        return status
    if is_new:
        # Shortcut and completion are only set up on first creation.
        shortcuts.create_cmd_shortcut(cmd)
        completions.create_completion(cmd)
    return status
def cli_edit(cmd, print_after_set):
    """Interactively create or update a command.

    Takes the seq inventory readlock, the cmd inventory writelock, and the
    cmd item writelock, then reads the command's current commandline if the
    command exists. For a new command, the name is checked against existing
    sequences (rejected on a clash) and a temporary empty command is created
    to edit.

    The inventory locks are then released and the user edits the commandline
    interactively. The result is passed to :func:`.command_impl_op.define`
    to create/update the command.

    If a new command was created successfully, its shortcut
    (:func:`.shortcuts.create_cmd_shortcut`) and autocompletion behavior
    (:func:`.completions.create_completion`) are set up as well.

    :param cmd:             name of command to create/update
    :type cmd:              str
    :param print_after_set: whether to automatically trigger "print"
                            operation at the end
    :type print_after_set:  bool

    :returns: exit status code (0 for success, nonzero for error)
    :rtype:   int

    """
    locks.inventory_lock("seq", locks.LockType.READ)
    locks.inventory_lock("cmd", locks.LockType.WRITE)
    locks.item_lock("cmd", cmd, locks.LockType.WRITE)

    # Stays None when editing an existing command; otherwise it is the
    # callable that removes the temporary placeholder command.
    discard_temp = None
    try:
        old_cmdline = command_impl_core.read_dict(cmd)["cmdline"]
    except FileNotFoundError:
        # New command: make sure no sequence already uses this name.
        if sequence_impl_core.exists(cmd):
            print()
            shared.errprint(
                "Command '{}' cannot be created because a sequence exists with"
                " the same name.".format(cmd)
            )
            print()
            return 1
        # The inventory locks are released before the interactive edit.
        # A temp/empty command created here makes that safe: a concurrent
        # seq creation will see it when checking for name conflicts.
        old_cmdline = ""

        def discard_temp():
            command_impl_op.delete(cmd, True)

        # Registered before the temp command is created so that it is
        # removed even if the program exits during the edit.
        atexit.register(discard_temp)
        command_impl_core.create_temp(cmd)

    locks.release_inventory_lock("cmd", locks.LockType.WRITE)
    locks.release_inventory_lock("seq", locks.LockType.READ)

    print()
    new_cmdline = shared.editline("commandline: ", old_cmdline)
    status = command_impl_op.define(
        cmd, new_cmdline, True, print_after_set, False
    )
    if discard_temp is None:
        # Existing command was updated; nothing further to set up or undo.
        return status

    if status:
        # Don't leave the temp/empty command lying around in the error case.
        discard_temp()
    else:
        shortcuts.create_cmd_shortcut(cmd)
        completions.create_completion(cmd)
    atexit.unregister(discard_temp)
    return status
def cli_print(cmd):
    """Pretty-print the info for a single command.

    The work is delegated to :func:`.command_impl_print.print_one`.

    :param cmd: name of command to print
    :type cmd:  str

    :returns: exit status code (0 for success, nonzero for error)
    :rtype:   int

    """
    # No lock is taken: this only reads one cmd yaml file and formats it.
    # If that file is being deleted concurrently, the read lands either
    # before or after the delete, and both outcomes are acceptable.
    print()
    status = command_impl_print.print_one(cmd)
    return status
def cli_print_all():
    """Pretty-print the info for every command.

    Takes the cmd inventory readlock, fetches the list of all command names,
    readlocks each of those commands, and delegates to
    :func:`.command_impl_print.print_multi` to print them.

    :returns: exit status code (0 for success, nonzero for error)
    :rtype:   int

    """
    read_lock = locks.LockType.READ
    locks.inventory_lock("cmd", read_lock)
    names = command_impl_core.all_names()
    locks.multi_item_lock("cmd", names, read_lock)
    print()
    status = command_impl_print.print_multi(names, True)
    return status
def cli_del(delcmds, ignore_seq_usage):
    """Delete one or more commands.

    Unless ``ignore_seq_usage`` is set, the seq inventory readlock and item
    readlocks on all sequences are taken first. The cmd inventory writelock
    and item writelocks on the commands to delete are always taken.

    Unless ``ignore_seq_usage`` is set, every given command is checked
    against every sequence; if any command is in use, each such use is
    reported and nothing is deleted.

    Each command is then deleted via :func:`.command_impl_op.delete`, and its
    shortcut (:func:`.shortcuts.delete_cmd_shortcut`) and autocompletion
    behavior (:func:`.completions.delete_completion`) are torn down. A
    command that does not exist is reported and skipped.

    :param delcmds:          names of commands to delete
    :type delcmds:           list[str]
    :param ignore_seq_usage: if True, don't validate that commands are unused
                             by current sequences
    :type ignore_seq_usage:  bool

    :returns: exit status code (0 for success, nonzero for error)
    :rtype:   int

    """
    check_usage = not ignore_seq_usage

    if check_usage:
        locks.inventory_lock("seq", locks.LockType.READ)
        sequence_names = sequence_impl_core.all_names()
        locks.multi_item_lock("seq", sequence_names, locks.LockType.READ)
    locks.inventory_lock("cmd", locks.LockType.WRITE)
    locks.multi_item_lock("cmd", delcmds, locks.LockType.WRITE)
    print()

    if check_usage:
        # Load every sequence that can still be read, paired with its name.
        loaded = []
        for seq_name in sequence_names:
            try:
                contents = sequence_impl_core.read_dict(seq_name)
            except FileNotFoundError:
                continue
            loaded.append((seq_name, contents))

        # Report every (command, sequence) usage rather than only the first.
        in_use = False
        for cmd in delcmds:
            for seq_name, contents in loaded:
                if cmd not in contents["commands"]:
                    continue
                in_use = True
                shared.errprint(
                    "Command {} is used by sequence {}.".format(cmd, seq_name)
                )
        if in_use:
            print()
            return 1

    for cmd in delcmds:
        try:
            command_impl_op.delete(cmd, False)
        except FileNotFoundError:
            print("Command '{}' does not exist.".format(cmd))
        else:
            print("Command '{}' deleted.".format(cmd))
            shortcuts.delete_cmd_shortcut(cmd)
            completions.delete_completion(cmd)
    print()
    return 0
def cli_run(cmd, quiet, args):
    """Run a command.

    Takes the cmd item readlock, then creates a temporary directory with a
    context manager. While that directory exists, its path (with a trailing
    path separator) is used as the value of the "tempdir" reserved
    placeholder and :func:`.command_impl_op.run` executes the command.
    Afterwards, a warning is printed if any of the given placeholder args
    were irrelevant for this command.

    Note that :func:`.command_impl_op.run` may modify ``args`` (for use with
    subsequent commands in the sequence).

    :param cmd:   name of command to run
    :type cmd:    str
    :param quiet: whether to print only the command output
    :type quiet:  bool
    :param args:  placeholder arguments for this run; to modify
    :type args:   list[str]

    :returns: exit status code (0 for success, nonzero for error)
    :rtype:   int

    """
    # Locking is arguably unnecessary here. It is kept for consistency with
    # the seq run case, where cmds stay locked until the run is over; and
    # blocking edit/delete of a cmd while it runs is not too surprising.
    locks.item_lock("cmd", cmd, locks.LockType.READ)

    leftover_args = copy.deepcopy(args)
    reserved = command_impl_op.ReservedPlaceholdersCtx()
    with tempfile.TemporaryDirectory() as tempdir_path:
        reserved.tempdir = tempdir_path + os.sep
        status = command_impl_op.run(
            cmd, quiet, args, leftover_args, reserved
        )

    if not leftover_args:
        return status
    warning = (
        shared.MSG_WARN_PREFIX
        + " the following args don't apply to this commandline:"
    )
    print(warning, " ".join(leftover_args))
    print()
    return status
def cli_vals(cmd, args, print_after_set):
    """Update placeholder values for a single command.

    Takes the cmd item writelock and delegates to
    :func:`.command_impl_op.vals` to update the command. On success, a
    warning is printed if any of the given placeholder args were irrelevant
    for this command.

    :param cmd:             name of command to update
    :type cmd:              str
    :param args:            new placeholder value settings
    :type args:             list[str]
    :param print_after_set: whether to automatically trigger "print"
                            operation at the end
    :type print_after_set:  bool

    :returns: exit status code (0 for success, nonzero for error)
    :rtype:   int

    """
    locks.item_lock("cmd", cmd, locks.LockType.WRITE)

    leftover_args = copy.deepcopy(args)
    status = command_impl_op.vals(
        cmd, args, leftover_args, print_after_set, False
    )
    if status:
        # The update failed; skip the unused-args warning.
        return status

    if leftover_args:
        warning = (
            shared.MSG_WARN_PREFIX
            + " the following args don't apply to this commandline:"
        )
        print(warning, " ".join(leftover_args))
        print()
    return 0
def cli_vals_all(placeholder_args):
    """Update placeholder values for all commands.

    Acquire the cmd inventory readlock and get the list of all commands.
    Writelock those commands and delegate to :func:`.command_impl_op.vals`
    to update each command. Every command is attempted even if an earlier
    one fails. Finally, print a warning if any of the given placeholder args
    were left unused after all commands were processed.

    :param placeholder_args: new placeholder value settings
    :type placeholder_args:  list[str]

    :returns: exit status code (0 if every command updated successfully,
              1 if any update reported an error)
    :rtype:   int

    """
    locks.inventory_lock("cmd", locks.LockType.READ)
    command_names = command_impl_core.all_names()
    locks.multi_item_lock("cmd", command_names, locks.LockType.WRITE)
    print()
    # The same list is passed to every vals call below and is only
    # inspected after the loop.
    # NOTE(review): presumably vals removes the args it applies, leaving
    # only those no command used -- confirm against command_impl_op.vals.
    unused_args = copy.deepcopy(placeholder_args)
    print(Fore.MAGENTA + "* updating all commands" + Fore.RESET)
    print()
    error = False
    for cmd in command_names:
        # The fourth argument is the slot cli_vals uses for print_after_set;
        # it is always False here. The meaning of the final flag is defined
        # by command_impl_op.vals.
        status = command_impl_op.vals(
            cmd, placeholder_args, unused_args, False, True
        )
        if status:
            # Remember the failure but keep updating the other commands.
            error = True
    if unused_args:
        print(
            shared.MSG_WARN_PREFIX
            + " the following args don't apply to any commandline:",
            " ".join(unused_args),
        )
        print()
    return 1 if error else 0