mirror of
https://github.com/CZ-NIC/pz.git
synced 2022-02-13 01:03:07 +03:00
470 lines
22 KiB
Python
Executable File
470 lines
22 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
#
|
||
import argparse
|
||
import logging
|
||
import re
|
||
import sys
|
||
# noinspection PyUnresolvedReferences
|
||
from collections import defaultdict, Counter
|
||
from collections.abc import Iterable, Callable
|
||
from contextlib import contextmanager
|
||
from itertools import islice, count as count_from, repeat
|
||
# noinspection PyUnresolvedReferences
|
||
from math import *
|
||
from re import *
|
||
|
||
# logging
# Messages are emitted bare (no level or logger prefix) to STDERR; STDOUT is left for the processed data.
logging.basicConfig(stream=sys.stderr, format='%(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
|
||
|
||
# names that can be imported
# Maps the name of an auto-importable object to the module that provides it.
available_names = {
    "Path": "pathlib",
    "datetime": "datetime",
    "sleep": "time",
    "time": "time",
    "randint": "random",
    "get": "requests",
    "b64encode": "base64",
    "b64decode": "base64",
    "glob": "glob",
    "iglob": "glob",
    "ZipFile": "zipfile",
}
# Modules whose name equals one of the object names above are not offered as modules,
# ex: do not import glob in favour of glob.glob
_module_clash = {module for name, module in available_names.items() if name == module}
# Modules that may be auto-imported as a whole: the providers above (minus the clashing ones) plus a few extras.
available_modules = set(available_names.values()) - _module_clash
available_modules |= {"csv", "jsonpickle", "humanize", "webbrowser", "collections", "itertools"}
|
||
|
||
# Program description shown in the argparse help; the lists of auto-importable names are generated
# from `available_names` and `available_modules` so that the help never gets out of sync with them.
__doc__ = "\n".join((
    "Launch your tiny Python script on a piped in contents and pipe it out",
    "",
    "Available without import:",
    " Loaded: re.* (match, search, findall), math.* (sqrt,...), defaultdict",
    f" Auto-imported functions: {', '.join(sorted(available_names.keys(), key=str.casefold))}",
    f" Auto-imported modules: {', '.join(sorted(available_modules))}",
    "",
    "Available variables:",
    " * s – current line",
    " * n – current line converted to an `int` (or `float`) if possible",
    " * b – current line as a byte-string",
    " * text – whole text, all lines together",
    " * lines – list of lines so far processed",
    " * numbers – list of numbers so far processed",
    " * skip – omit line if True",
    " * i=0, S=set(), L=list(), D=dict(), C=Counter() – other global variables",
))
|
||
|
||
# parse arguments
# RawTextHelpFormatter keeps the line breaks of the description (`__doc__`) intact.
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("-v", "--verbose", help='Show automatic imports and internal command modification',
                    action="store_true")
parser.add_argument("-q", "--quiet", help='Suppress command exceptions', action='store_true')

# clauses: the Python snippets supplied by the user
group1 = parser.add_argument_group("Command clauses")
group1.add_argument("main", help='Any Python script executed on every line (multiple statements allowed)',
                    metavar="COMMAND", nargs="?")
group1.add_argument("-S", "--setup", help='Any Python script, executed before processing.'
                                          ' Useful for variable initializing.', metavar="COMMAND")
group1.add_argument("-E", "--end", help='Any Python script, executed after processing.'
                                        ' Useful for final output.', metavar="COMMAND")
group1.add_argument("-F", "--filter", help='Line is piped out unchanged, however only if evaluated to True.',
                    action='store_true')
group1.add_argument("-f", "--format", help='Main and end clauses are considered f-strings.', action='store_true')

group2 = parser.add_argument_group("Populating variables")
# the hint about the -1 flag belongs here: it is `--whole` that benefits from processing a single line only
group2.add_argument("-w", "--whole", help='Wait till whole text fetched and then process.'
                                          ' Variable `text` is available containing whole text.'
                                          ' You may want to add -1 flag.', action='store_true')

group3 = parser.add_argument_group("Input / output")
group3.add_argument("-n", help='Process only such number of lines.', type=int, metavar="NUM")
# `-1` is a shortcut for `-n 1` (same destination); the flag providing `text` is called --whole
group3.add_argument("-1", help='Process just first line. Useful in combination with --whole.',
                    action='store_const', dest="n", const=1)
group3.add_argument("-0", help='Skip all lines output. (Useful in combination with --end.)', action='store_true',
                    dest="zero")
group3.add_argument("--empty", help='Output empty lines. (By default skipped.)', action='store_true')
# `-g` without a value yields the const 5; when the flag is absent the value stays None
group3.add_argument("-g", "--generate", help='Generate lines while ignoring the input pipe.'
                                             ' Line will correspond to the iteration cycle count.'
                                             ' By default `NUM=5`; `NUM=0` means infinity.',
                    nargs="?", type=int, metavar="NUM", const=5)
group3.add_argument("--stderr", help='Print commands output to the STDERR,'
                                     ' while letting the original line piped to STDOUT intact.', action='store_true')
group3.add_argument("--overflow-safe", help='Prevent `lines`, `numbers`, `text` variables to be available.'
                                            ' Useful when handling an infinite input.', action='store_true')

# regular expression shortcuts: the main COMMAND is treated as a pattern
group4 = parser.add_argument_group("Regular output")
group4.add_argument("--search", help='Equivalent to `search(COMMAND, s)`', action='store_true')
group4.add_argument("--match", help='Equivalent to `match(COMMAND, s)`', action='store_true')
group4.add_argument("--findall", help='Equivalent to `findall(COMMAND, s)`', action='store_true')
group4.add_argument("--sub", help='Equivalent to `sub(COMMAND, SUBSTITUTION, s)`', metavar="SUBSTITUTION")
|
||
|
||
# evaluate command line arguments
args = parser.parse_args()
# --quiet wins over --verbose; by default only warnings and errors are shown
logger.setLevel(logging.ERROR if args.quiet else (logging.DEBUG if args.verbose else logging.WARNING))
write_pipe = sys.stderr if args.stderr else sys.stdout  # stream the clauses' output is written to
skip_all = args.zero  # -0: no line is output unless the user explicitly sets `skip = False`
if args.generate is not None and args.n:
    # It does not make sense to generate more lines than we plan to process.
    # `--generate=0` stands for infinity, so in that case the `-n` limit alone determines the count.
    args.generate = min(args.generate, args.n) if args.generate else args.n
|
||
|
||
# custom functions
# state shared by the helper functions below
whole_hint_printed = False  # set once a hint about a missing variable was logged (see auto_import)
flush = None  # by default, we do not change flushing behaviour
# type of the objects returned by `re.match` & co., recognised by `output`
match_class = Match if sys.version_info >= (3, 7) else match('', '').__class__  # drop with Python3.6
|
||
|
||
|
||
def write(v):
    """ Print either bytes or string. Bytes are not printed in the Python b-form: b'string' but raw.

    Strings (and any other objects) go through `print` to `write_pipe`; bytes are written, followed by
    a newline, directly to the binary buffer underneath `write_pipe`.
    The module level `flush` value is honoured for both kinds of output.
    """
    if isinstance(v, bytes):
        # The text layer keeps its own pending data. Unless it is pushed down first, raw bytes written
        # to the underlying buffer would overtake the strings printed earlier.
        write_pipe.flush()
        write_pipe.buffer.write(v + b'\n')
        if flush:
            write_pipe.buffer.flush()
    else:
        print(v, file=write_pipe, flush=flush)
|
||
|
||
|
||
@contextmanager
def auto_import():
    """ Run the wrapped code and, if it fails with a NameError, try to supply the missing name.

    * a name listed in `available_names` or `available_modules` is imported into the global scope
      and the exception is swallowed – the caller is expected to run the code again
    * `text`, `lines` and `numbers` are never imported; a hint is logged (once) and the error re-raised
    * any other NameError is re-raised untouched
    """
    global whole_hint_printed
    try:
        yield
    except NameError as e:
        if isinstance(e, UnboundLocalError):
            # a local variable used before assignment, importing into globals cannot help
            raise
        # Python 3.10+ exposes the missing name directly, older versions only through the message
        name = getattr(e, "name", None)
        if not name:
            found = re.match(r"name '(.*?)' is not defined", str(e))
            name = found[1] if found else None
        if not name:
            # unknown wording, never hide the original exception
            raise

        # Import anything on the fly (saved performance when loaded)
        if name == "text":
            if not whole_hint_printed:
                logger.warning("Did you not forget to use --whole to access `text`?")
                whole_hint_printed = True
            raise
        elif name in ("numbers", "lines"):
            if not whole_hint_printed and args.overflow_safe:
                logger.warning("The flag --overflow-safe suppress `lines` and `numbers`.")
                whole_hint_printed = True
            raise
        elif name in available_names:
            module = available_names[name]
            logger.info(f"Importing {name} from {module}")
            # ex sleep = getattr(module "time", "sleep")
            globals()[name] = getattr(__import__(module), name)
        elif name in available_modules:
            logger.info(f"Importing {name}")
            globals()[name] = __import__(name)
        else:
            raise
|
||
|
||
|
||
def try_argument(callable_, argument, var, cmd="main"):
    """ Call `callable_(argument)` and output its result.

    :param callable_: the callable the clause evaluated to
    :param argument: value handed over to the callable
    :param var: expression the `argument` came from (ex: `s`), appended to the stored clause on success
    :param cmd: key of the clause in `command` ("main" or "end")
    :return: True when the call went through, False if a TypeError was raised
    """
    description = f"attempt to use `{var}` as the callable parameter in the {cmd} clause: {command[cmd]}({argument})"
    try:
        output(callable_(argument))
        # remember the working form, ex: `sqrt` becomes `sqrt(n)`
        command[cmd] += f"({var})"
    except TypeError as e:
        logger.debug(f"Failed {description} with: {e}")
        succeeded = False
    else:
        logger.debug(f"Successful {description}")
        succeeded = True
    return succeeded
|
||
|
||
|
||
def _argument_candidates(func, final_round):
    """ List what is worth passing to a callable clause that could not be called without arguments.

    Returns a list of tuples `(value, expression)` or `(value, expression, clause)`
    suitable for being unpacked into `try_argument`.
    """
    if final_round:
        # we are in the `--end` clause, original_line is empty, we use `lines` or `numbers` instead
        attempts = []
        if len(numbers) == len(lines):
            # ex: echo -e "1\n2\n3\n4" | pz --end sum
            attempts.append((numbers, "numbers", "end"))
        # ex: echo -e "1\n2\n3\n4" | pz --end "' - '.join" -> 1 - 2 - 3 - 4
        attempts.append((lines, "lines", "end"))
        return attempts

    attempts = [(original_line, "s")]
    if args.lines:
        attempts.append((numbers, "numbers"))
    else:
        logger.debug("Since `--lines` flag is off, we will not try `numbers`.")
    if n is not None:
        # ex: echo 5 | pz sqrt | pz round
        attempts.append((n, "n"))

    import inspect
    try:
        first_param = next(iter(inspect.signature(func).parameters.values()), None)
    except (ValueError, TypeError):
        # ex: `set.add` raises no signature found; unsupported objects raise TypeError
        first_param = None
    # a callable without any parameter has no first parameter at all
    if first_param is not None and first_param.name != "iterable":
        # ex: pz b64encode += (s.encode('utf-8'))
        logger.debug("Let's try `s.encode('utf-8')` automatically too.")
        attempts.append((original_line.encode("utf-8"), "s.encode('utf-8')"))
    return attempts


def output(line, final_round=None):
    """ Output one or more lines.

    * match object – replaced by the tuple of its groups or by the whole matched string
    * str, bytes – written as is
    * list – every element is output on its own line
    * other iterable (tuple, generator, ...) – elements joined by ", " into a single line
    * callable – called; if it refuses to be called without arguments, several arguments are tried
    * falsy value – nothing is output, unless `--empty` is set or the value is a real zero

    :param line: the value to be output
    :param final_round: True when called for the `--end` clause
    """
    global tried_to_correct_callable
    if isinstance(line, match_class):
        # replace with the tuple of the groups or whole matched string (if no group matched)
        line = line.groups() or line.group(0)

    if not line:  # empty string makes no output
        # ... unless asked for; zero is a legitimate value (as opposed to False)
        if args.empty or (line == 0 and line is not False):
            write(line)
        return

    if isinstance(line, (str, bytes)):
        write(line)
    elif isinstance(line, list):  # list is output as multiple lines
        for el in line:
            output(el)
    elif isinstance(line, Iterable):  # tuple or generator (but not a string) gets joined
        write(", ".join(str(el) for el in line))
    elif isinstance(line, Callable):  # the clause evaluated to a function, we output what it returns
        try:
            output(line())
        except TypeError:
            if tried_to_correct_callable and not final_round:
                # this it not the first line of the main command nor the `--end` clause,
                # we have already been there without success
                raise
            tried_to_correct_callable = True
            # ex: `| pz webbrowser.open` -> `| pz webbrowser.open(s)`
            # ex: `sqrt() takes exactly one argument (0 given)`
            # ex: `open() missing required argument 'file' (pos 1)'` (build-in open)
            # ex: `open() missing 1 required positional argument: 'url'` (webbrowser.open)
            # Unfortunately, there is no certain way to determine the wanted type.
            # The wording of TypeError exceptions specifying the type vary.
            # The best we have is to use the inspect module to get the annotation or the parameter name.
            # We content to try it multiple things to pass as an argument.
            if not any(try_argument(line, *x) for x in _argument_candidates(line, final_round)):
                raise
    else:  # ex: int, str
        write(line)
|
||
|
||
|
||
def get_number(v):
    """ Convert the value to a number if possible.

    :param v: typically the current line (str)
    :return: `int` when the value represents a whole number written without a decimal part,
        `float` for other numeric values (ex: "10.0", "1e3"), None if the value is not numeric
    """
    try:
        num = float(v)
    except (ValueError, TypeError):
        return None
    try:
        # we prefer having int over float because adding values '5' + '5' as '10'
        # looks better than '10.0' in most use cases
        as_int = int(v)  # "10.0" -> int conversion fails and the float is kept
    except (ValueError, TypeError, OverflowError):  # OverflowError ex: float('inf')
        return num
    if isinstance(v, (str, bytes, bytearray)) or as_int == num:
        return as_int
    # a numeric (non-text) input whose int conversion would drop the fractional part, ex: 5.7
    return num
|
||
|
||
|
||
def prepare_command(name):
    """ Normalise the clause given on the command line and store it into `command[name]`.

    * with `--format`, the clause is wrapped into an f-string
    * a single expression gets `s = ` (or `skip = not ` with `--filter`) prepended,
      so that its value becomes the output
    * the main clause gets adjusted for `--generate` and `--stderr`

    :param name: "main" or "end"; the clause is read from the attribute of `args` of the same name
    """
    original = cmd = (getattr(args, name) or "").strip()  # read from args.main, args.end

    if args.format and cmd:  # a clause that was not given must stay empty
        cmd = "f'''" + cmd + "'''"

    if name == "main" and regular_command:
        # prepending `line = ` is not needed, the string is treated as a `match` parameter
        pass
    elif (len(cmd.splitlines()) == 1  # check if there is only a single line with a missing assignment
          and not re.search(r"(s|skip)\s?[^=]?=[^=]", cmd)  # ex: `s = s+= 1` would fail
          # do not assign to reserved keywords (ex: `s = if s == 1: ...` would fail),
          # the word boundary keeps names like `format(n)` eligible
          and not re.match(r"(if|while|for)\b", cmd)
          and ";" not in cmd
          and "lines." not in cmd):
        # "s = 1" - will not pass
        # "s += 1" - will not pass
        # "s + 1" - will pass
        # "s == 1" - will pass
        # "if s == 1: print(s)" - will not pass
        cmd = ("skip = not " if args.filter else "s = ") + cmd

    if name == "main":
        if not cmd and args.generate is not None:
            cmd = "# generator"  # force args.run to be True by having a command
        if args.stderr:
            if not cmd:
                # it is not intended to pipe everything to STDERR while everything is piped unchanged to STDOUT too
                cmd = 's = None'
            if not args.zero:  # when using the --stderr flag, we pass the original content to the STDOUT
                cmd += '\nsys.stdout.write(original_line+"\\n")'

    if original != cmd:  # verbose output
        logger.debug(f"Changing the {name} clause to: {cmd.strip()}")
    command[name] = cmd
|
||
|
||
|
||
if __name__ == "__main__":
    # The whole processing deliberately stays at the module level: the user clauses are run by `exec`
    # and need to read and assign the variables below (`s`, `n`, `lines`, ...) as plain globals.

    # determine args.run and possibly turn on args.lines
    args.run = True  # True = run whole processing (output), 1 = partial run (populate `lines`), False = do not run
    args.lines = not args.overflow_safe  # whether to populate variables like: `lines`, `numbers`

    if all(x is None for x in (args.main, args.end, args.generate)):
        logger.error("You have to specify either main COMMAND or --end COMMAND.")
        sys.exit()  # `quit()` is meant for the interactive shell only

    # prepare commands (prepend `line =` if needed)
    command = {"main": "", "end": ""}
    regular_command = None  # prepare regular modifications
    reg_ex = None
    # an empty substitution is legitimate (ex: `--sub ""` removes the matched parts)
    if args.match or args.findall or args.search or args.sub is not None:
        try:
            reg_ex = re.compile(args.main)
        except re.error as exc:
            logger.error(f"re.error: Compilation failed with {exc}")
            sys.exit()
        if args.match:
            regular_command = reg_ex.match
        elif args.search:
            regular_command = reg_ex.search
        elif args.findall:
            regular_command = reg_ex.findall
        elif args.sub is not None:
            regular_command = lambda line: reg_ex.sub(args.sub, line)
    for name in command:
        prepare_command(name)

    if not command["main"]:
        # no main clause specified -> we may limit or turn off processing (and output)
        # if the user needs the program to pipe out continuously, they might want to use `s` as the main clause
        # running can be skipped or at least run partially to fill `lines`
        args.run = False if args.overflow_safe else 1

    # empty variables available in the user scope
    i = 0
    S = set()
    L = list()
    D = dict()
    C = Counter()
    skip = None  # if user sets to False, the line will not be output

    # prepare text processing (either fetch whole or line by line)
    # Note: do not initialize the `text` to None. We want to be able to catch
    # `<class 'NameError'> name 'text' is not defined` while not turning `--whole` on
    text: str
    if args.generate is not None:
        # `--generate=5` → 1,2,3,4,5
        # `--generate=0` → 1 .. infinity
        # `--generate=0 --overflow_safe` → 1 × infinity
        loop = (str(i).encode() for i in (range(1, args.generate + 1) if args.generate else  # finite generate
                                          (repeat(1) if args.overflow_safe else count_from(1))))  # infinite generator
        logger.debug("Generating s = 1 .. " +
                     (str(args.generate) if args.generate else ("" if args.overflow_safe else "∞")))

        # turn on flushing
        # Ex: it took a lot of time before buffer flushed out when flushing to another pz instance
        # (and not to the console) in the command: `pz -g0 "s = randint(1,100); sleep(0.01)" | pz s
        flush = True
    elif args.whole:
        # fetch whole text
        try:
            b = sys.stdin.buffer.read().rstrip()  # we strip the last newline
        except KeyboardInterrupt:
            logger.error("KeyboardInterrupt: Loading `text` interrupted.")
            b = b""
        loop = iter(b.splitlines()[:args.n])
        try:
            text = b.decode()
        except UnicodeError:
            logger.warning("Cannot parse the text variable correctly")
            text = b.decode(errors="replace")
    else:
        # load lines one by one (while taking at most N lines)
        loop = islice(sys.stdin.buffer, args.n)

    # filled-in variables available in the user scope
    b: bytes = None
    s: str = None
    n = None
    # the following ones intentionally stay undefined with --overflow-safe (see auto_import)
    lines: list
    numbers: list
    count: int  # itertools.count are imported as count_from ← more common to use this variable over the other

    if args.lines:
        lines = []
        numbers = []
        count = 0

    # internal processing variables
    tried_to_correct_callable = False
    original_line: str = None

    # run the setup clause
    if args.setup:
        while True:  # loop until all on the fly imports are done
            with auto_import():
                exec(args.setup)
                break

    # run processing
    if args.run:  # speed up when there is no main clause
        while True:
            try:
                try:
                    b = next(loop).rstrip()
                    original_line = s = b.decode()
                except UnicodeError:
                    logger.warning(f"Cannot parse line correctly: {b}")
                    original_line = s = b.decode(errors="replace")
                except StopIteration:
                    break
                n = get_number(s)
                if args.lines:
                    # these variables might be undefined by purpose (and user should not see it)
                    # noinspection PyUnboundLocalVariable
                    count += 1
                    # noinspection PyUnboundLocalVariable
                    lines.append(s)
                    if n is not None:  # zero is a number too, a mere truthiness test would drop it
                        # noinspection PyUnboundLocalVariable
                        numbers.append(n)

                if args.run is not True:  # speed up, further processing not needed
                    continue

                while True:
                    with auto_import():
                        # loop until all on the fly imports are done
                        # A clause interrupted by a missing import may have already changed `s`,
                        # every attempt has to start with the original line.
                        s = original_line
                        skip = None
                        # we process either a regular expression or a custom command
                        if regular_command:
                            try:
                                s = regular_command(s)
                            except re.error as exc:
                                logger.error(f"{exc}, regular expression: {args.main} on line: {s}")
                                break
                        else:  # resolving custom command
                            # note that exec will not affect local field, hence we cannot easily put this in a method
                            exec(command["main"])
                        if skip or (skip_all and skip is not False):  # user chooses to filter out the line
                            break
                        output(s)
                        break
            except BrokenPipeError:
                # do not continue processing when pipe is broken
                # ex: process we pipe into is killed
                # There is a chance we want to pipe something to the STDERR in the --end clause, hence we do not quit.
                s = args.main if regular_command else command['main']
                logger.debug(f"BrokenPipeError: No output pipe when processing the main clause '{s}'")
                break
            except KeyboardInterrupt:  # useful for ending up an infinite generator
                # Break but continue when SIGINT caught in case we have an END clause.
                # No problem when output to the terminal or to the STDERR. If output to another process via STDOUT,
                # since whole process group received SIGINT, the output is lost if the process is stopping right now,
                # or BrokenPipeError is raised if the process has already stopped.
                break
            except Exception as exc:
                # a failing line is reported and the processing goes on with the next one
                logger.warning(f'Exception: {type(exc)} {exc} on line: {s}')
                continue

    # run final script
    if command["end"]:
        if not args.whole and args.lines:
            # --whole was off so we did not wait whole input to be piped in before processing.
            # The variable `text` was not available before but there is no obstacle in letting it
            # to be automatically available at the end – we have everything needed in the `lines` variable.
            text = "\n".join(lines)

        original_line = s = n = b = None  # there is no current line in the --end clause
        try:
            while True:  # loop until all on the fly imports are done
                with auto_import():
                    exec(command["end"])
                    output(s, True)
                    break
        except BrokenPipeError:
            logger.debug(f"BrokenPipeError: No output pipe in the --end clause '{command['end']}'")
        except Exception as exc:
            logger.warning(f'Exception: {type(exc)} {exc} in the --end clause')

    # Gratuitous exit
    # Ex: The middle command in `pz -g0 s | pz -E "sleep(1)" s | xargs echo` would often end up with
    #   Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>
    #   BrokenPipeError: [Errno 32] Broken pipe
    # because `xargs` having received a SIGINT has already stopped.
    # We prevent this situation by manually closing the STDOUT.
    try:
        sys.stdout.close()
    except BrokenPipeError:
        pass
|