1
0
mirror of https://github.com/CZ-NIC/pz.git synced 2022-02-13 01:03:07 +03:00
Files
Edvard Rejthar 5f9b0a63be raw bytes support
2021-03-19 17:10:40 +01:00

470 lines
22 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
#
import argparse
import logging
import re
import sys
# noinspection PyUnresolvedReferences
from collections import defaultdict, Counter
from collections.abc import Iterable, Callable
from contextlib import contextmanager
from itertools import islice, count as count_from, repeat
# noinspection PyUnresolvedReferences
from math import *
from re import *
# Logging: bare messages are sent to STDERR, STDOUT is left for the processed data.
logging.basicConfig(stream=sys.stderr, format='%(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# Names importable on the fly: each key is looked up in the module named by its value.
available_names = {
    "Path": "pathlib",
    "datetime": "datetime",
    "sleep": "time",
    "time": "time",
    "randint": "random",
    "get": "requests",
    "b64encode": "base64",
    "b64decode": "base64",
    "glob": "glob",
    "iglob": "glob",
    "ZipFile": "zipfile",
}

# Modules sharing their name with one of the importable names above.
# ex: do not import glob in favour of glob.glob
_module_clash = {name for name, module in available_names.items() if name == module}

# Modules importable on the fly as a whole: the non-clashing ones from above plus a few extras.
available_modules = (set(available_names.values()) - _module_clash) | {
    "csv", "jsonpickle", "humanize", "webbrowser", "collections", "itertools",
}
# Help text shown by argparse as the program description; every tuple item is one output line.
__doc__ = "\n".join((
    "Launch your tiny Python script on a piped in contents and pipe it out",
    "",
    "Available without import:",
    " Loaded: re.* (match, search, findall), math.* (sqrt,...), defaultdict",
    " Auto-imported functions: " + ', '.join(sorted(available_names, key=str.casefold)),
    " Auto-imported modules: " + ', '.join(sorted(available_modules)),
    "",
    "Available variables:",
    " * s current line",
    " * n current line converted to an `int` (or `float`) if possible",
    " * b current line as a byte-string",
    " * text whole text, all lines together",
    " * lines list of lines so far processed",
    " * numbers list of numbers so far processed",
    " * skip omit line if True",
    " * i=0, S=set(), L=list(), D=dict(), C=Counter() other global variables",
))
# parse arguments
# RawTextHelpFormatter keeps the line breaks of the description (the module docstring) intact.
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("-v", "--verbose", help='Show automatic imports and internal command modification',
                    action="store_true")
parser.add_argument("-q", "--quiet", help='Suppress command exceptions', action='store_true')

group1 = parser.add_argument_group("Command clauses")
group1.add_argument("main", help='Any Python script executed on every line (multiple statements allowed)',
                    metavar="COMMAND", nargs="?")
group1.add_argument("-S", "--setup", help='Any Python script, executed before processing.'
                                          ' Useful for variable initializing.', metavar="COMMAND")
group1.add_argument("-E", "--end", help='Any Python script, executed after processing.'
                                        ' Useful for final output.', metavar="COMMAND")
group1.add_argument("-F", "--filter", help='Line is piped out unchanged, however only if evaluated to True.',
                    action='store_true')
group1.add_argument("-f", "--format", help='Main and end clauses are considered f-strings.', action='store_true')

group2 = parser.add_argument_group("Populating variables")
# The hint about the -1 flag belongs here: it used to sit in the help of the -1 flag itself.
group2.add_argument("-w", "--whole", help='Wait till whole text fetched and then process.'
                                          ' Variable `text` is available containing whole text.'
                                          ' You may want to add -1 flag.', action='store_true')

group3 = parser.add_argument_group("Input / output")
group3.add_argument("-n", help='Process only such number of lines.', type=int, metavar="NUM")
# -1 is a shortcut for `-n 1`, it stores into the same destination
# (the help used to mention a `--text` flag which this parser does not define)
group3.add_argument("-1", help='Process just first line. Useful in combination with --whole.',
                    action='store_const', dest="n", const=1)
group3.add_argument("-0", help='Skip all lines output. (Useful in combination with --end.)', action='store_true',
                    dest="zero")
group3.add_argument("--empty", help='Output empty lines. (By default skipped.)', action='store_true')
# `-g` without a value falls back to const=5; when the flag is missing at all, the value stays None
group3.add_argument("-g", "--generate", help='Generate lines while ignoring the input pipe.'
                                             ' Line will correspond to the iteration cycle count.'
                                             ' By default `NUM=5`; `NUM=0` means infinity.',
                    nargs="?", type=int, metavar="NUM", const=5)
group3.add_argument("--stderr", help='Print commands output to the STDERR,'
                                     ' while letting the original line piped to STDOUT intact.', action='store_true')
group3.add_argument("--overflow-safe", help='Prevent `lines`, `numbers`, `text` variables to be available.'
                                            ' Useful when handling an infinite input.', action='store_true')

group4 = parser.add_argument_group("Regular output")
group4.add_argument("--search", help='Equivalent to `search(COMMAND, s)`', action='store_true')
group4.add_argument("--match", help='Equivalent to `match(COMMAND, s)`', action='store_true')
group4.add_argument("--findall", help='Equivalent to `findall(COMMAND, s)`', action='store_true')
group4.add_argument("--sub", help='Equivalent to `sub(COMMAND, SUBSTITUTION, s)`', metavar="SUBSTITUTION")

# evaluate command line arguments
args = parser.parse_args()
# --quiet wins over --verbose
logger.setLevel(logging.ERROR if args.quiet else (logging.DEBUG if args.verbose else logging.WARNING))
write_pipe = sys.stderr if args.stderr else sys.stdout
skip_all = args.zero
if args.generate and args.n:  # it does not make sense to generate more lines than we plan to process
    args.generate = min(args.generate, args.n)
# State shared by the custom functions
flush = None  # by default, we do not change flushing behaviour
whole_hint_printed = False  # set once a hint about an unavailable variable has been logged
# class of the regular expression match objects, taken from a real match so that
# no Python version check is needed
match_class = type(match('', ''))
def write(v):
    """ Print either bytes or string. Bytes are not printed in the Python b-form: b'string' but raw.

    The target is the module-level `write_pipe`. The module-level `flush` is honoured
    for both kinds of output: when it is truthy, the line leaves the process immediately.

    :param v: value to be written; `bytes` go raw to the binary buffer, anything else through print()
    """
    if isinstance(v, bytes):
        raw = write_pipe.buffer
        raw.write(v + b'\n')
        if flush:
            # print() below gets the same flag; without this, raw bytes would stay buffered
            # even though flushing has been explicitly turned on
            raw.flush()
        # NOTE(review): text written earlier via print() may still be pending in the text layer
        # of `write_pipe` when `flush` is off, so mixed str/bytes output could get reordered.
        # Confirm whether a `write_pipe.flush()` before the raw write is worth its cost.
    else:
        print(v, file=write_pipe, flush=flush)
@contextmanager
def auto_import():
    """ If the wrapped code fails with a NameError, try to make the missing name available.

    When the missing name is listed in `available_names` or `available_modules`, it is imported
    into the globals and the exception is suppressed, so that the caller may run the code again.
    In every other case the original exception is re-raised. For the variables `text`, `numbers`
    and `lines`, a hint is logged before re-raising (at most once per run).
    """
    global whole_hint_printed
    try:
        yield
    except NameError as e:
        found = re.match(r"name '(.*?)' is not defined", str(e))
        if not found or not found[1]:
            # Not the plain "name is not defined" wording, ex: UnboundLocalError is a NameError too.
            # There is nothing to import, let the caller see the original exception.
            raise
        name = found[1]
        # Import anything on the fly (saved performance when loaded)
        if name == "text":
            if not whole_hint_printed:
                logger.warning("Did you not forget to use --whole to access `text`?")
                whole_hint_printed = True
            raise
        elif name in ("numbers", "lines"):
            if not whole_hint_printed and args.overflow_safe:
                logger.warning("The flag --overflow-safe suppress `lines` and `numbers`.")
                whole_hint_printed = True
            raise
        elif name in available_names:
            module = available_names[name]
            logger.info(f"Importing {name} from {module}")
            # ex sleep = getattr(module "time", "sleep")
            globals()[name] = getattr(__import__(module), name)
        elif name in available_modules:
            logger.info(f"Importing {name}")
            globals()[name] = __import__(name)
        else:
            raise
def try_argument(callable_, argument, var, cmd="main"):
    """ Call `callable_(argument)`, output the result and remember the fix in the clause source.

    :param callable_: the callable to be called
    :param argument: value passed as its only positional argument
    :param var: name of the variable holding `argument`; `(var)` is appended to the clause on success
    :param cmd: key of the clause in `command`
    :return: False if TypeError happened, True otherwise
    """
    # composed before the clause gets modified, so both messages show its original form
    description = (f"attempt to use `{var}` as the callable parameter"
                   f" in the {cmd} clause: {command[cmd]}({argument})")
    try:
        output(callable_(argument))
        command[cmd] += f"({var})"
    except TypeError as exc:
        logger.debug(f"Failed {description} with: {exc}")
        return False
    logger.debug(f"Successful {description}")
    return True
def output(line, final_round=None):
    """ Output one or more lines.

    * a regular expression match is replaced by its groups (or by the whole match if there are none)
    * str / bytes are written as they are
    * a list is output element by element, every element on its own line
    * any other iterable is joined by ", " into a single line
    * a callable is called and its result is output; if the call raises TypeError,
      a few likely arguments are tried (see `try_argument`)
    * falsy values are skipped unless `--empty` is on; a zero (but not False) is written though

    :param line: the value to be output
    :param final_round: truthy when called from the `--end` clause
    :raises TypeError: when a callable cannot be called, neither bare nor with any tried argument
    """
    global tried_to_correct_callable
    if isinstance(line, match_class):
        # replace with the tuple of the groups or whole matched string (if no group matched)
        line = line.groups() or line.group(0)
    if line:  # empty string makes no output
        if isinstance(line, (str, bytes)):
            write(line)
        elif isinstance(line, list):  # list is output as multiple lines
            for el in line:
                output(el)
        elif isinstance(line, Iterable):  # tuple or generator (but not a string) gets joined
            write(", ".join(str(el) for el in line))
        elif isinstance(line, Callable):  # the callable gets called, its result is output
            try:
                output(line())
            except TypeError:
                if tried_to_correct_callable and not final_round:
                    # this it not the first line of the main command nor the `--end` clause,
                    # we have already been there without success
                    raise
                tried_to_correct_callable = True
                # ex: `| pz webbrowser.open` -> `| pz webbrowser.open(s)`
                # ex: `sqrt() takes exactly one argument (0 given)`
                # ex: `open() missing required argument 'file' (pos 1)'` (build-in open)
                # ex: `open() missing 1 required positional argument: 'url'` (webbrowser.open)
                # Unfortunately, there is no certain way to determine the wanted type.
                # The wording of TypeError exceptions specifying the type vary.
                # The best we have is to use the inspect module to get the annotation or the parameter name.
                # We content to try it multiple things to pass as an argument.
                attempts = []  # tuples of (argument, variable name[, clause name]) for `try_argument`
                if not final_round:
                    attempts.append((original_line, "s"))
                    if args.lines:
                        attempts.append((numbers, "numbers"))
                    else:
                        logger.debug("Since `--lines` flag is off, we will not try `numbers`.")
                    if n is not None:
                        # ex: echo 5 | pz sqrt | pz round
                        attempts.append((n, "n"))
                    import inspect
                    try:
                        param = list(inspect.signature(line).parameters.values())[0]
                        if param.name != "iterable":
                            # ex: pz b64encode += (s.encode('utf-8'))
                            logger.debug("Let's try `s.encode('utf-8')` automatically too.")
                            attempts.append((original_line.encode("utf-8"), "s.encode('utf-8')"))
                    except (ValueError, IndexError):
                        # ValueError ex: `set.add` raises no signature found
                        # IndexError: the callable takes no parameter at all; we must not let it
                        # replace the TypeError that is going to be re-raised below
                        pass
                else:  # we are in the `--end` clause, original_line is empty, we use `lines` or `numbers` instead
                    if len(numbers) == len(lines):
                        # ex: echo -e "1\n2\n3\n4" | pz --end sum
                        attempts.append((numbers, "numbers", "end"))
                    # ex: echo -e "1\n2\n3\n4" | pz --end "' - '.join" -> 1 - 2 - 3 - 4
                    attempts.append((lines, "lines", "end"))
                # any() stops at the first successful attempt
                if not any(try_argument(line, *x) for x in attempts):
                    raise  # re-raises the TypeError of the bare call
        else:  # ex: int, str
            write(line)
    else:
        if args.empty or (line == 0 and line is not False):
            write(line)
def get_number(v):
    """ Convert the value to a number if possible.

    :param v: value to be converted
    :return: `int` if the value can be read as one, else `float`, else None
    """
    try:
        as_float = float(v)
    except (ValueError, TypeError):
        return None  # not a number at all
    try:
        # we prefer having int over float because adding values '5' + '5' as '10'
        # looks better than '10.0' in most use cases
        return int(v)
    except (ValueError, TypeError):
        return as_float  # "10.0" -> int conversion fails, the float is kept
def prepare_command(name):
    """ Read a clause from the command line arguments, adapt it and store it to `command[name]`.

    * with `--format`, a non-empty clause is wrapped into an f-string
    * a single-line clause that does not look like an assignment or a compound statement
      gets an assignment prepended: `s = ` (or `skip = not ` with `--filter`)
    * the main clause additionally gets a placeholder when generating without any command,
      and with `--stderr` it is extended to pipe the original line to STDOUT

    :param name: "main" or "end", the attribute of `args` and the key of `command`
    """
    original = cmd = (getattr(args, name) or "").strip()  # read from args.main, args.end
    if args.format and cmd:
        # An empty clause must stay empty: wrapping it would produce the non-empty `f''''''`
        # which would be treated as a real command further on.
        cmd = "f'''" + cmd + "'''"
    if name == "main" and regular_command:
        # prepending `line = ` is not needed, the string is treated as a `match` parameter
        pass
    elif (len(cmd.splitlines()) == 1  # check if there is only a single line with a missing assignment
          and not search(r"(s|skip)\s?[^=]?=[^=]", cmd)  # ex: `s = s+= 1` would fail
          # do not assign to reserved keywords (ex: `s = if s == 1: ...` would fail);
          # the word boundary keeps identifiers like `format` or `iffy` out of this check
          and not re.match(r"(if|while|for)\b", cmd)
          and ";" not in cmd
          and "lines." not in cmd):
        # "s = 1" - will not pass
        # "s += 1" - will not pass
        # "s + 1" - will pass
        # "s == 1" - will pass
        # "if s == 1: print(s)" - will not pass
        cmd = ("skip = not " if args.filter else "s = ") + cmd
    if name == "main":
        if not cmd and args.generate is not None:
            cmd = "# generator"  # force args.run to be True by having a command
        if args.stderr:
            if not cmd:
                # it is not intended to pipe everything to STDERR while everything is piped unchanged to STDOUT too
                cmd = 's = None'
            if not args.zero:  # when using the --stderr flag, we pass the original content to the STDOUT
                cmd += '\nsys.stdout.write(original_line+"\\n")'
    if original != cmd:  # verbose output
        logger.debug(f"Changing the {name} clause to: {cmd.strip()}")
    command[name] = cmd
if __name__ == "__main__":
    # Everything below runs at the module level on purpose: the user clauses are executed by `exec`
    # in this very scope, so the variables defined here are the ones the user reads and writes.

    # determine args.run and possibly turn on args.lines
    args.run = True  # True = run whole processing (output), 1 = partial run (populate `lines`), False = do not run
    args.lines = not args.overflow_safe  # whether to populate variables like: `lines`, `numbers`
    if all(x is None for x in (args.main, args.end, args.generate)):
        logger.error("You have to specify either main COMMAND or --end COMMAND.")
        quit()

    # prepare commands (prepend `line =` if needed)
    command = {"main": "", "end": ""}
    regular_command = None  # prepare regular modifications
    reg_ex = None
    if args.match or args.findall or args.search or args.sub:
        try:
            reg_ex = re.compile(args.main)
        except re.error as exc:
            logger.error(f"re.error: Compilation failed with {exc}")
            quit()
        if args.match:
            regular_command = reg_ex.match
        elif args.search:
            regular_command = reg_ex.search
        elif args.findall:
            regular_command = reg_ex.findall
        elif args.sub:
            regular_command = lambda line: reg_ex.sub(args.sub, line)
    # explicit calls, no helper variable leaks into the scope shared with the user clauses
    prepare_command("main")
    prepare_command("end")
    if not command["main"]:
        # no main clause specified -> we may limit or turn off processing (and output)
        # if the user needs the program to pipe out continuously, they might want to use `s` as the main clause
        # running can be skipped or at least run partially to fill `lines`
        args.run = False if args.overflow_safe else 1

    # empty variables available in the user scope
    i = 0
    S = set()
    L = list()
    D = dict()
    C = Counter()
    skip = None  # if user sets to False, the line will not be output

    # prepare text processing (either fetch whole or line by line)
    # Note: do not initialize the `text` to None. We want to be able to catch
    # `<class 'NameError'> name 'text' is not defined` while not turning `--whole` on
    text: str
    if args.generate is not None:
        # `--generate=5` → 1,2,3,4,5
        # `--generate=0` → 1 .. infinity
        # `--generate=0 --overflow_safe` → 1 × infinity
        loop = (str(i).encode() for i in (range(1, args.generate + 1) if args.generate else  # finite generate
                                          (repeat(1) if args.overflow_safe else count_from(1))))  # infinite generator
        logger.debug("Generating s = 1 .. " +
                     (str(args.generate) if args.generate else ("" if args.overflow_safe else "")))
        # turn on flushing
        # Ex: it took a lot of time before buffer flushed out when flushing to another pz instance
        # (and not to the console) in the command: `pz -g0 "s = randint(1,100); sleep(0.01)" | pz s
        flush = True
    elif args.whole:
        # fetch whole text
        try:
            b = sys.stdin.buffer.read().rstrip()  # we strip the last newline
        except KeyboardInterrupt:
            logger.error("KeyboardInterrupt: Loading `text` interrupted.")
            b = b""
        loop = (line for line in b.splitlines()[:args.n])
        try:
            text = b.decode()
        except UnicodeError:
            logger.warning(f"Cannot parse the text variable correctly")
            text = b.decode(errors="replace")
    else:
        # load lines one by one (while taking at most N lines)
        loop = islice(sys.stdin.buffer, args.n)

    # filled-in variables available in the user scope
    b: bytes = None
    s: str = None
    n = None
    lines: list
    numbers: list
    count: int  # itertools.count are imported as count_from ← more common to use this variable over the other
    if args.lines:
        lines = []
        numbers = []
        count = 0

    # internal processing variables
    tried_to_correct_callable = False
    original_line: str = None

    # run the setup clause
    if args.setup:
        while True:
            # repeated until no on-the-fly import is needed any more
            with auto_import():
                exec(args.setup)
                break

    # run processing
    if args.run:  # speed up when there is no main clause
        while True:
            try:
                try:
                    b = next(loop).rstrip()
                    original_line = s = b.decode()
                except UnicodeError:
                    logger.warning(f"Cannot parse line correctly: {b}")
                    original_line = s = b.decode(errors="replace")
                except StopIteration:
                    break
                n = get_number(s)
                if args.lines:
                    # these variables might be undefined by purpose (and user should not see it)
                    # noinspection PyUnboundLocalVariable
                    count += 1
                    # noinspection PyUnboundLocalVariable
                    lines.append(s)
                    if n is not None:  # a plain truthiness test would drop the number 0
                        # noinspection PyUnboundLocalVariable
                        numbers.append(n)
                if args.run is not True:  # speed up, further processing not needed
                    continue
                while True:
                    with auto_import():
                        # loop until all on the fly imports are done
                        skip = None
                        # we process either a regular expression or a custom command
                        if regular_command:
                            try:
                                s = regular_command(s)
                            except re.error as exc:
                                logger.error(f"{exc}, regular expression: {args.main} on line: {s}")
                                break
                        else:  # resolving custom command
                            # note that exec will not affect local field, hence we cannot easily put this in a method
                            exec(command["main"])
                        if skip or (skip_all and skip is not False):  # user chooses to filter out the line
                            break
                        output(s)
                        break
            except BrokenPipeError:
                # do not continue processing when pipe is broken
                # ex: process we pipe into is killed
                # There is a chance we want to pipe something to the STDERR in the --end clause, hence we do not quit.
                s = args.main if regular_command else command['main']
                logger.debug(f"BrokenPipeError: No output pipe when processing the main clause '{s}'")
                break
            except KeyboardInterrupt:  # useful for ending up an infinite generator
                # Break but continue when SIGINT caught in case we have an END clause.
                # No problem when output to the terminal or to the STDERR. If output to another process via STDOUT,
                # since whole process group received SIGINT, the output is lost if the process is stopping right now,
                # or BrokenPipeError is raised if the process has already stopped.
                break
            except Exception as exc:
                logger.warning(f'Exception: {type(exc)} {exc} on line: {s}')
                continue

    # run final script
    if command["end"]:
        if not args.whole and args.lines:
            # --whole was off so we did not wait whole input to be piped in before processing.
            # The variable `text` was not available before but there is no obstacle in letting it
            # to be automatically available at the end we have everything needed in the `lines` variable.
            text = "\n".join(lines)
        original_line = s = n = b = None
        try:
            while True:
                with auto_import():
                    exec(command["end"])
                    output(s, True)
                    break
        except BrokenPipeError:
            logger.debug(f"BrokenPipeError: No output pipe in the --end clause '{command['end']}'")
        except Exception as exc:
            logger.warning(f'Exception: {type(exc)} {exc} in the --end clause')

    # Gratuitous exit
    # Ex: The middle command in `pz -g0 s | pz -E "sleep(1)" s | xargs echo` would often end up with
    # Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>
    # BrokenPipeError: [Errno 32] Broken pipe
    # because `xargs` having received a SIGINT has already stopped.
    # We prevent this situation by manually closing the STDOUT.
    try:
        sys.stdout.close()
    except BrokenPipeError:
        pass