1
0
mirror of https://github.com/CZ-NIC/pz.git synced 2022-02-13 01:03:07 +03:00

raw bytes support

This commit is contained in:
Edvard Rejthar
2021-03-19 17:06:01 +01:00
parent 48aad5f6ef
commit 5f9b0a63be
4 changed files with 46 additions and 10 deletions

View File

@@ -9,6 +9,7 @@
- command chaining tuned up
- `--format`, `--stderr`, `--overflow-safe` flags
- `counter` variable
- raw bytes support
## 0.9 (2020-12-02)
- other modules added for auto-import

View File

@@ -33,6 +33,7 @@ wikipedia.com
* [Scope variables](#scope-variables)
+ [`s` current line](#s--current-line)
+ [`n` current line converted to an `int` (or `float`) if possible](#n--current-line-converted-to-an-int-or-float-if-possible)
+ [`b` current line as a byte-string](#b--current-line-as-a-byte-string)
+ [`count` current line number](#count--current-line-number)
+ [`text` whole text, all lines together](#text--whole-text-all-lines-together)
+ [`lines` list of lines so far processed](#lines--list-of-lines-so-far-processed)
@@ -338,6 +339,18 @@ echo 5 | pz n+2 # 7
echo 5.2 | pz n+2 # 7.2
```
### `b` current line as a byte-string
Sometimes the input cannot be easily converted to `str`. In that case a warning is output; however, you can still operate on the raw bytes.
```bash
echo -e '\x80 invalid line' | pz s
Cannot parse line correctly: b'\x80 invalid line'
<EFBFBD> invalid line
# use the `--quiet` flag to suppress the warning, then decode the bytes
echo -e '\x80 invalid line' | pz 'b.decode("cp1250")' --quiet
€ invalid line
```
### `count` current line number
```bash
# display every 1_000th line

30
pz
View File

@@ -43,6 +43,7 @@ __doc__ = (f"Launch your tiny Python script on a piped in contents and pipe it o
f"\nAvailable variables:"
f"\n * s current line"
f"\n * n current line converted to an `int` (or `float`) if possible"
f"\n * b current line as a byte-string"
f"\n * text whole text, all lines together"
f"\n * lines list of lines so far processed"
f"\n * numbers list of numbers so far processed"
@@ -326,8 +327,8 @@ if __name__ == "__main__":
# `--generate=5` → 1,2,3,4,5
# `--generate=0` → 1 .. infinity
# `--generate=0 --overflow-safe` → 1 × infinity
loop = (str(i) for i in (range(1, args.generate + 1) if args.generate # finite generate
else (repeat(1) if args.overflow_safe else count_from(1)))) # infinite generator
loop = (str(i).encode() for i in (range(1, args.generate + 1) if args.generate else # finite generate
(repeat(1) if args.overflow_safe else count_from(1)))) # infinite generator
logger.debug("Generating s = 1 .. " +
(str(args.generate) if args.generate else ("" if args.overflow_safe else "∞")))
@@ -338,16 +339,23 @@ if __name__ == "__main__":
elif args.whole:
# fetch whole text
try:
text = sys.stdin.read().rstrip() # we strip the last newline
b = sys.stdin.buffer.read().rstrip() # we strip the last newline
except KeyboardInterrupt:
logger.error("KeyboardInterrupt: Loading `text` interrupted.")
text = ""
loop = (line for line in text.splitlines()[:args.n])
b = b""
loop = (line for line in b.splitlines()[:args.n])
try:
text = b.decode()
except UnicodeError:
logger.warning(f"Cannot parse the text variable correctly")
text = b.decode(errors="replace")
else:
# load lines one by one (while taking at most N lines)
loop = islice(sys.stdin, args.n)
loop = islice(sys.stdin.buffer, args.n)
# filled-in variables available in the user scope
b: bytes = None
s: str = None
n = None
lines: list
numbers: list
@@ -360,7 +368,7 @@ if __name__ == "__main__":
# internal processing variables
tried_to_correct_callable = False
original_line = s = None
original_line: str = None
# run the setup clause
if args.setup:
@@ -374,7 +382,11 @@ if __name__ == "__main__":
while True:
try:
try:
original_line = s = next(loop).rstrip()
b = next(loop).rstrip()
original_line = s = b.decode()
except UnicodeError:
logger.warning(f"Cannot parse line correctly: {b}")
original_line = s = b.decode(errors="replace")
except StopIteration:
break
n = get_number(s)
@@ -433,7 +445,7 @@ if __name__ == "__main__":
# to be automatically available at the end we have everything needed in the `lines` variable.
text = "\n".join(lines)
original_line = s = n = None
original_line = s = n = b = None
try:
while True:
with auto_import():

View File

@@ -140,13 +140,18 @@ class TestMaster(unittest.TestCase):
continue
if isinstance(expected, list):
expected = b"\n".join(str(x).encode() for x in expected) + b"\n"
if isinstance(expected, str):
expected = expected.encode() + b"\n"
self.assertEqual(expected, pipe)
except AssertionError:
debug = True
raise
finally:
if debug:
s = f"echo {stdin.decode()} | " if stdin else ""
try:
s = f"echo {stdin.decode()} | " if stdin else ""
except UnicodeError: # we are piping in non-unicode bytes
s = f"echo -e {stdin} | " if stdin else ""
print(f"Checking: {s}pz", raw_cmd,
"\nExpected STDOUT:", stdout, "\nExpected STDERR:", stderr, "\nOutput:", output)
@@ -339,6 +344,11 @@ class TestVariables(TestMaster):
("-g -n2", [1, 2])
)]
def test_bytes(self):
stdin, stdout = b'hello\n\x80invalid\nworld', ["hello", "€invalid", "world"]
self.check("'b.decode(\"1250\")'", stdout, r"Cannot parse line correctly: b'\x80invalid'", stdin)
self.check("'b.decode(\"1250\")' -q", stdout, False, stdin)
class TestReturnValues(TestMaster):
""" Correct command prepending etc. """