This commit is contained in:
2025-05-22 21:22:15 +02:00
parent 3d57f842f9
commit 97cb9c8703
156 changed files with 1205 additions and 6603 deletions

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import logging
import operator
import re
import sys
import typing as t
@@ -26,102 +25,12 @@ class _Missing:
_missing = _Missing()
@t.overload
def _make_encode_wrapper(reference: str) -> t.Callable[[str], str]:
...
def _wsgi_decoding_dance(s: str) -> str:
return s.encode("latin1").decode(errors="replace")
@t.overload
def _make_encode_wrapper(reference: bytes) -> t.Callable[[str], bytes]:
...
def _make_encode_wrapper(reference: t.AnyStr) -> t.Callable[[str], t.AnyStr]:
"""Create a function that will be called with a string argument. If
the reference is bytes, values will be encoded to bytes.
"""
if isinstance(reference, str):
return lambda x: x
return operator.methodcaller("encode", "latin1")
def _check_str_tuple(value: tuple[t.AnyStr, ...]) -> None:
"""Ensure tuple items are all strings or all bytes."""
if not value:
return
item_type = str if isinstance(value[0], str) else bytes
if any(not isinstance(item, item_type) for item in value):
raise TypeError(f"Cannot mix str and bytes arguments (got {value!r})")
_default_encoding = sys.getdefaultencoding()
def _to_bytes(
x: str | bytes, charset: str = _default_encoding, errors: str = "strict"
) -> bytes:
if x is None or isinstance(x, bytes):
return x
if isinstance(x, (bytearray, memoryview)):
return bytes(x)
if isinstance(x, str):
return x.encode(charset, errors)
raise TypeError("Expected bytes")
@t.overload
def _to_str( # type: ignore
x: None,
charset: str | None = ...,
errors: str = ...,
allow_none_charset: bool = ...,
) -> None:
...
@t.overload
def _to_str(
x: t.Any,
charset: str | None = ...,
errors: str = ...,
allow_none_charset: bool = ...,
) -> str:
...
def _to_str(
x: t.Any | None,
charset: str | None = _default_encoding,
errors: str = "strict",
allow_none_charset: bool = False,
) -> str | bytes | None:
if x is None or isinstance(x, str):
return x
if not isinstance(x, (bytes, bytearray)):
return str(x)
if charset is None:
if allow_none_charset:
return x
return x.decode(charset, errors) # type: ignore
def _wsgi_decoding_dance(
s: str, charset: str = "utf-8", errors: str = "replace"
) -> str:
return s.encode("latin1").decode(charset, errors)
def _wsgi_encoding_dance(s: str, charset: str = "utf-8", errors: str = "strict") -> str:
return s.encode(charset).decode("latin1", errors)
def _wsgi_encoding_dance(s: str) -> str:
return s.encode().decode("latin1")
def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
@@ -287,31 +196,6 @@ class _DictAccessorProperty(t.Generic[_TAccessorValue]):
return f"<{type(self).__name__} {self.name}>"
def _decode_idna(domain: str) -> str:
try:
data = domain.encode("ascii")
except UnicodeEncodeError:
# If the domain is not ASCII, it's decoded already.
return domain
try:
# Try decoding in one shot.
return data.decode("idna")
except UnicodeDecodeError:
pass
# Decode each part separately, leaving invalid parts as punycode.
parts = []
for part in data.split(b"."):
try:
parts.append(part.decode("idna"))
except UnicodeDecodeError:
parts.append(part.decode("ascii"))
return ".".join(parts)
_plain_int_re = re.compile(r"-?\d+", re.ASCII)