chore: 添加虚拟环境到仓库

- 添加 backend_service/venv 虚拟环境
- 包含所有Python依赖包
- 注意:虚拟环境约393MB,包含12655个文件
This commit is contained in:
2025-12-03 10:19:25 +08:00
parent a6c2027caa
commit c4f851d387
12655 changed files with 3009376 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from .constants import JSONReturnType
from .json_repair import from_file, load, loads, repair_json
__all__ = ["from_file", "load", "loads", "repair_json", "JSONReturnType"]

View File

@@ -0,0 +1,4 @@
from .json_repair import cli

# Entry point for `python -m json_repair [args]`: delegate straight to the CLI.
if __name__ == "__main__":
    cli()

View File

@@ -0,0 +1,4 @@
from typing import Any
JSONReturnType = dict[str, Any] | list[Any] | str | float | int | bool | None
STRING_DELIMITERS: list[str] = ['"', "'", "", ""]

View File

@@ -0,0 +1,42 @@
from enum import Enum, auto
class ContextValues(Enum):
    """Kinds of JSON construct the parser can currently be inside of."""

    OBJECT_KEY = auto()
    OBJECT_VALUE = auto()
    ARRAY = auto()


class JsonContext:
    """A small stack that tracks which JSON construct is being parsed."""

    def __init__(self) -> None:
        # `context` is the stack of active contexts; `current` mirrors its
        # top element and `empty` whether the stack has no entries left.
        self.context: list[ContextValues] = []
        self.current: ContextValues | None = None
        self.empty: bool = True

    def set(self, value: ContextValues) -> None:
        """
        Push *value* onto the context stack and make it the current one.

        Args:
            value (ContextValues): The context value to be added.

        Returns:
            None
        """
        self.context.append(value)
        self.current = value
        self.empty = False

    def reset(self) -> None:
        """
        Drop the most recent context value, restoring the previous one
        (or None when the stack runs dry).

        Returns:
            None
        """
        if self.context:
            self.context.pop()
        if self.context:
            self.current = self.context[-1]
        else:
            self.current = None
            self.empty = True

View File

@@ -0,0 +1,198 @@
from typing import Literal, TextIO
from .constants import STRING_DELIMITERS, JSONReturnType
from .json_context import JsonContext
from .object_comparer import ObjectComparer
from .parse_array import parse_array as _parse_array
from .parse_boolean_or_null import parse_boolean_or_null as _parse_boolean_or_null
from .parse_comment import parse_comment as _parse_comment
from .parse_number import parse_number as _parse_number
from .parse_object import parse_object as _parse_object
from .parse_string import parse_string as _parse_string
from .string_file_wrapper import StringFileWrapper
class JSONParser:
    """
    A JSON parser that repairs broken JSON while decoding it.

    The heavy per-construct implementations live in sibling modules
    (parse_array.py, parse_object.py, ...); this class wires them together
    and owns the shared state: the input text, the scan index, the parsing
    context stack and the optional repair log.
    """

    # Split the parse methods into separate files because this one was like 3000 lines
    def parse_array(self, *args, **kwargs):
        # Thin delegate to the module-level implementation in parse_array.py.
        return _parse_array(self, *args, **kwargs)

    def parse_boolean_or_null(self, *args, **kwargs):
        # Thin delegate to parse_boolean_or_null.py.
        return _parse_boolean_or_null(self, *args, **kwargs)

    def parse_comment(self, *args, **kwargs):
        # Thin delegate to parse_comment.py.
        return _parse_comment(self, *args, **kwargs)

    def parse_number(self, *args, **kwargs):
        # Thin delegate to parse_number.py.
        return _parse_number(self, *args, **kwargs)

    def parse_object(self, *args, **kwargs):
        # Thin delegate to parse_object.py.
        return _parse_object(self, *args, **kwargs)

    def parse_string(self, *args, **kwargs):
        # Thin delegate to parse_string.py.
        return _parse_string(self, *args, **kwargs)

    def __init__(
        self,
        json_str: str | StringFileWrapper,
        json_fd: TextIO | None,
        logging: bool | None,
        json_fd_chunk_length: int = 0,
        stream_stable: bool = False,
    ) -> None:
        """
        Args:
            json_str: The text to parse (or an already wrapped file).
            json_fd: Optional file descriptor; when given it takes precedence
                over json_str and is wrapped in a StringFileWrapper.
            logging: When truthy, every repair action is recorded in
                self.logger via self._log.
            json_fd_chunk_length: Chunk size (bytes) used when reading json_fd.
            stream_stable: Keep repairs stable for JSON accumulated from a
                stream (see comment below for examples).
        """
        # The string to parse
        self.json_str: str | StringFileWrapper = json_str
        # Alternatively, the file description with a json file in it
        if json_fd:
            # This is a trick we do to treat the file wrapper as an array
            self.json_str = StringFileWrapper(json_fd, json_fd_chunk_length)
        # Index is our iterator that will keep track of which character we are looking at right now
        self.index: int = 0
        # This is used in the object member parsing to manage the special cases of missing quotes in key or value
        self.context = JsonContext()
        # Use this to log the activity, but only if logging is active
        # This is a trick but a beautiful one. We call self.log in the code over and over even if it's not needed.
        # We could add a guard in the code for each call but that would make this code unreadable, so here's this neat trick
        # Replace self.log with a noop
        self.logging = logging
        if logging:
            self.logger: list[dict[str, str]] = []
            self.log = self._log
        else:
            # No-op
            self.log = lambda *args, **kwargs: None  # noqa: ARG005
        # When the json to be repaired is the accumulation of streaming json at a certain moment.
        # e.g. json obtained from llm response.
        # If this parameter to True will keep the repair results stable. For example:
        # case 1: '{"key": "val\\' => '{"key": "val"}'
        # case 2: '{"key": "val\\n' => '{"key": "val\\n"}'
        # case 3: '{"key": "val\\n123,`key2:value2' => '{"key": "val\\n123,`key2:value2"}'
        # case 4: '{"key": "val\\n123,`key2:value2`"}' => '{"key": "val\\n123,`key2:value2`"}'
        self.stream_stable = stream_stable

    def parse(
        self,
    ) -> JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]]:
        """
        Parse the whole input and return the decoded value.

        If more top-level elements follow the first one, they are collected
        into a list (structurally-equal consecutive elements replace each
        other, keeping only the latest). When logging is enabled the value is
        returned together with the repair log.
        """
        json = self.parse_json()
        if self.index < len(self.json_str):
            self.log(
                "The parser returned early, checking if there's more json elements",
            )
            json = [json]
            while self.index < len(self.json_str):
                self.context.reset()
                j = self.parse_json()
                if j != "":
                    if ObjectComparer.is_same_object(json[-1], j):
                        # replace the last entry with the new one since the new one seems an update
                        json.pop()
                    json.append(j)
                else:
                    # this was a bust, move the index
                    self.index += 1
            # If nothing extra was found, don't return an array
            if len(json) == 1:
                self.log(
                    "There were no more elements, returning the element without the array",
                )
                json = json[0]
        if self.logging:
            return json, self.logger
        else:
            return json

    def parse_json(
        self,
    ) -> JSONReturnType:
        """
        Parse a single JSON element starting at self.index.

        Dispatches on the first meaningful character; anything unrecognized
        is skipped one character at a time until something parsable (or the
        end of input) is found.
        """
        while True:
            char = self.get_char_at()
            # False means that we are at the end of the string provided
            if char is False:
                return ""
            # <object> starts with '{'
            elif char == "{":
                self.index += 1
                return self.parse_object()
            # <array> starts with '['
            elif char == "[":
                self.index += 1
                return self.parse_array()
            # <string> starts with a quote
            elif not self.context.empty and (char in STRING_DELIMITERS or char.isalpha()):
                return self.parse_string()
            # <number> starts with [0-9] or minus
            elif not self.context.empty and (char.isdigit() or char == "-" or char == "."):
                return self.parse_number()
            elif char in ["#", "/"]:
                return self.parse_comment()
            # If everything else fails, we just ignore and move on
            else:
                self.index += 1

    def get_char_at(self, count: int = 0) -> str | Literal[False]:
        """Return the character at self.index + count, or False past the end."""
        # Why not use something simpler? Because try/except in python is a faster alternative to an "if" statement that is often True
        try:
            return self.json_str[self.index + count]
        except IndexError:
            return False

    def skip_whitespaces_at(self, idx: int = 0, move_main_index=True) -> int:
        """
        This function quickly iterates on whitespaces, syntactic sugar to make the code more concise.

        Starting at self.index + idx it walks past whitespace, either by
        advancing self.index (default) or, with move_main_index=False, by
        growing and returning the relative offset idx only.
        """
        try:
            char = self.json_str[self.index + idx]
        except IndexError:
            return idx
        while char.isspace():
            if move_main_index:
                self.index += 1
            else:
                idx += 1
            try:
                char = self.json_str[self.index + idx]
            except IndexError:
                return idx
        return idx

    def skip_to_character(self, character: str | list[str], idx: int = 0) -> int:
        """
        Advance from (self.index + idx) until we hit an *unescaped* target character.
        Returns the offset (idx) from self.index to that position, or the distance to the end if not found.
        """
        targets = set(character) if isinstance(character, list) else {character}
        i = self.index + idx
        n = len(self.json_str)
        backslashes = 0  # count of consecutive '\' immediately before current char
        while i < n:
            ch = self.json_str[i]
            if ch == "\\":
                backslashes += 1
                i += 1
                continue
            # ch is not a backslash; if it's a target and not escaped (even backslashes), we're done
            if ch in targets and (backslashes % 2 == 0):
                return i - self.index
            # reset backslash run when we see a non-backslash
            backslashes = 0
            i += 1
        # not found; return distance to end
        return n - self.index

    def _log(self, text: str) -> None:
        """Record a repair action together with a small window of nearby text."""
        window: int = 10
        start: int = max(self.index - window, 0)
        end: int = min(self.index + window, len(self.json_str))
        context: str = self.json_str[start:end]
        self.logger.append(
            {
                "text": text,
                "context": context,
            }
        )

View File

@@ -0,0 +1,279 @@
"""
This module will parse the JSON file following the BNF definition:
<json> ::= <container>
<primitive> ::= <number> | <string> | <boolean>
; Where:
; <number> is a valid real number expressed in one of a number of given formats
; <string> is a string of valid characters enclosed in quotes
; <boolean> is one of the literal strings 'true', 'false', or 'null' (unquoted)
<container> ::= <object> | <array>
<array> ::= '[' [ <json> *(', ' <json>) ] ']' ; A sequence of JSON values separated by commas
<object> ::= '{' [ <member> *(', ' <member>) ] '}' ; A sequence of 'members'
<member> ::= <string> ': ' <json> ; A pair consisting of a name, and a JSON value
If something is wrong (a missing parentheses or quotes for example) it will use a few simple heuristics to fix the JSON string:
- Add the missing parentheses if the parser believes that the array or object should be closed
- Quote strings or add missing single quotes
- Adjust whitespaces and remove line breaks
All supported use cases are in the unit tests
"""
import argparse
import json
import sys
from typing import Literal, TextIO, overload
from .constants import JSONReturnType
from .json_parser import JSONParser
# typing.overload stubs for repair_json: with return_objects=False (the
# default) callers get a JSON *string* back; with return_objects=True they
# get the decoded object, optionally paired with the repair log when
# logging=True. The implementation follows below.
@overload
def repair_json(
    json_str: str = "",
    return_objects: Literal[False] = False,
    skip_json_loads: bool = False,
    logging: bool = False,
    json_fd: TextIO | None = None,
    chunk_length: int = 0,
    stream_stable: bool = False,
    **json_dumps_args,
) -> str: ...


@overload
def repair_json(
    json_str: str = "",
    return_objects: Literal[True] = True,
    skip_json_loads: bool = False,
    logging: bool = False,
    json_fd: TextIO | None = None,
    chunk_length: int = 0,
    stream_stable: bool = False,
    **json_dumps_args,
) -> JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]]: ...
def repair_json(
    json_str: str = "",
    return_objects: bool = False,
    skip_json_loads: bool = False,
    logging: bool = False,
    json_fd: TextIO | None = None,
    chunk_length: int = 0,
    stream_stable: bool = False,
    **json_dumps_args,
) -> JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]] | tuple[JSONReturnType, list]:
    """
    Decode a JSON formatted string, falling back to the repairing parser
    when standard decoding fails.

    Args:
        json_str (str, optional): The JSON string to repair. Defaults to an empty string.
        return_objects (bool, optional): If True, return the decoded data structure. Defaults to False.
        skip_json_loads (bool, optional): If True, skip calling the built-in json.loads() function to verify that the json is valid before attempting to repair. Defaults to False.
        logging (bool, optional): If True, return a tuple with the repaired json and a log of all repair actions. Defaults to False. When no repairs were required, the repair log will be an empty list.
        json_fd (Optional[TextIO], optional): File descriptor for JSON input. Do not use! Use `from_file` or `load` instead. Defaults to None.
        chunk_length (int, optional): Size in bytes of the file chunks to read at once. Ignored if `json_fd` is None. Do not use! Use `from_file` or `load` instead. Defaults to 1MB.
        stream_stable (bool, optional): When the json to be repaired is the accumulation of streaming json at a certain moment, setting this to True keeps the repair results stable.
        **json_dumps_args: Forwarded to json.dumps() (e.g. ensure_ascii, indent). Ignored when objects are returned.

    Returns:
        The repaired JSON string, or the decoded object (paired with the
        repair log when logging is True).
    """
    parser = JSONParser(json_str, json_fd, logging, chunk_length, stream_stable)
    if not skip_json_loads:
        # Fast path: let the stdlib decoder try first, repair only on failure.
        try:
            parsed_json = json.load(json_fd) if json_fd else json.loads(json_str)
        except json.JSONDecodeError:
            parsed_json = parser.parse()
    else:
        parsed_json = parser.parse()
    # It's useful to return the actual object instead of the json string,
    # it allows this lib to be a replacement of the json library
    if return_objects or logging:
        # A logging caller expects a (value, log) tuple; when the stdlib
        # decoder succeeded no repairs happened, hence the empty log.
        if logging and not isinstance(parsed_json, tuple):
            return parsed_json, []
        return parsed_json
    # Avoid returning only a pair of quotes if it's an empty string
    if parsed_json == "":
        return ""
    return json.dumps(parsed_json, **json_dumps_args)
def loads(
    json_str: str,
    skip_json_loads: bool = False,
    logging: bool = False,
    stream_stable: bool = False,
) -> JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]] | str:
    """
    Drop-in replacement for `json.loads()` that repairs the input on the fly.

    Thin wrapper around `repair_json()` with `return_objects=True`.

    Args:
        json_str (str): The JSON string to load and repair.
        skip_json_loads (bool, optional): If True, skip the initial validity check via the built-in json.loads(). Defaults to False.
        logging (bool, optional): If True, also return the list of repair actions. Defaults to False.
        stream_stable (bool, optional): Keep repairs stable for JSON accumulated from a stream. Defaults to False.

    Returns:
        The repaired JSON object, or a tuple of (object, repair log) when logging is True.
    """
    return repair_json(
        json_str=json_str,
        return_objects=True,
        logging=logging,
        skip_json_loads=skip_json_loads,
        stream_stable=stream_stable,
    )
def load(
    fd: TextIO,
    skip_json_loads: bool = False,
    logging: bool = False,
    chunk_length: int = 0,
) -> JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]]:
    """
    Drop-in replacement for `json.load()` that repairs the JSON read from *fd*.

    Thin wrapper around `repair_json()` with `json_fd=fd` and `return_objects=True`.

    Args:
        fd (TextIO): File descriptor for JSON input.
        skip_json_loads (bool, optional): If True, skip the initial validity check via the built-in json.load(). Defaults to False.
        logging (bool, optional): If True, also return the list of repair actions. Defaults to False.
        chunk_length (int, optional): Size in bytes of the file chunks to read at once. Defaults to 1MB.

    Returns:
        The repaired JSON object, or a tuple of (object, repair log) when logging is True.
    """
    return repair_json(
        json_fd=fd,
        return_objects=True,
        logging=logging,
        skip_json_loads=skip_json_loads,
        chunk_length=chunk_length,
    )
def from_file(
    filename: str,
    skip_json_loads: bool = False,
    logging: bool = False,
    chunk_length: int = 0,
) -> JSONReturnType | tuple[JSONReturnType, list[dict[str, str]]]:
    """
    Convenience wrapper around `load()` taking a filename instead of an
    open file descriptor.

    Args:
        filename (str): The name of the file containing JSON data to load and repair.
        skip_json_loads (bool, optional): If True, skip the initial validity check via the built-in json.load(). Defaults to False.
        logging (bool, optional): If True, also return the list of repair actions. Defaults to False.
        chunk_length (int, optional): Size in bytes of the file chunks to read at once. Defaults to 1MB.

    Returns:
        The repaired JSON object, or a tuple of (object, repair log) when logging is True.
    """
    # Parsing happens eagerly inside load(), so the file can be closed on return.
    with open(filename) as fd:
        return load(
            fd=fd,
            skip_json_loads=skip_json_loads,
            logging=logging,
            chunk_length=chunk_length,
        )
def cli(inline_args: list[str] | None = None) -> int:
    """
    Command-line interface for repairing and parsing JSON files.

    Args:
        inline_args (Optional[List[str]]): List of command-line arguments for testing purposes. Defaults to None.
            - filename (str): The JSON file to repair. If omitted, the JSON is read from stdin.
            - -i, --inline (bool): Replace the file inline instead of returning the output to stdout.
            - -o, --output TARGET (str): If specified, the output will be written to TARGET filename instead of stdout.
            - --ensure_ascii (bool): Pass ensure_ascii=True to json.dumps(). Will pass False otherwise.
            - --indent INDENT (int): Number of spaces for indentation (Default 2).

    Returns:
        int: Exit code of the CLI operation (0 on success, 1 on error).

    Example:
        >>> cli(['example.json', '--indent', '4'])
        >>> cat json.txt | json_repair
    """
    parser = argparse.ArgumentParser(description="Repair and parse JSON files.")
    # Make the filename argument optional; if omitted, we will read from stdin.
    parser.add_argument(
        "filename",
        nargs="?",
        help="The JSON file to repair (if omitted, reads from stdin)",
    )
    parser.add_argument(
        "-i",
        "--inline",
        action="store_true",
        help="Replace the file inline instead of returning the output to stdout",
    )
    parser.add_argument(
        "-o",
        "--output",
        metavar="TARGET",
        help="If specified, the output will be written to TARGET filename instead of stdout",
    )
    parser.add_argument(
        "--ensure_ascii",
        action="store_true",
        help="Pass ensure_ascii=True to json.dumps()",
    )
    parser.add_argument(
        "--indent",
        type=int,
        default=2,
        help="Number of spaces for indentation (Default 2)",
    )
    args = parser.parse_args() if inline_args is None else parser.parse_args(inline_args)
    # Inline mode requires a filename, so error out if none was provided.
    if args.inline and not args.filename:  # pragma: no cover
        print("Error: Inline mode requires a filename", file=sys.stderr)
        sys.exit(1)
    if args.inline and args.output:  # pragma: no cover
        print("Error: You cannot pass both --inline and --output", file=sys.stderr)
        sys.exit(1)
    # store_true already yields a bool, so no manual flag juggling is needed.
    ensure_ascii = args.ensure_ascii
    try:
        # Use from_file if a filename is provided; otherwise read from stdin.
        if args.filename:
            result = from_file(args.filename)
        else:
            data = sys.stdin.read()
            result = loads(data)
        if args.inline or args.output:
            with open(args.output or args.filename, mode="w") as fd:
                json.dump(result, fd, indent=args.indent, ensure_ascii=ensure_ascii)
        else:
            print(json.dumps(result, indent=args.indent, ensure_ascii=ensure_ascii))
    except Exception as e:  # pragma: no cover
        print(f"Error: {e}", file=sys.stderr)
        return 1
    return 0  # Success


if __name__ == "__main__":  # pragma: no cover
    sys.exit(cli())

View File

@@ -0,0 +1,47 @@
from typing import Any
class ObjectComparer: # pragma: no cover
def __init__(self) -> None:
pass # No operation performed in the constructor
@staticmethod
def is_same_object(obj1: Any, obj2: Any) -> bool:
"""
Recursively compares two objects and ensures that:
- Their types match
- Their keys/structure match
"""
if type(obj1) is not type(obj2):
# Fail immediately if the types don't match
return False
if isinstance(obj1, dict):
# Check that both are dicts and same length
if not isinstance(obj2, dict) or len(obj1) != len(obj2):
return False
for key in obj1:
if key not in obj2:
return False
# Recursively compare each value
if not ObjectComparer.is_same_object(obj1[key], obj2[key]):
return False
return True
elif isinstance(obj1, list):
# Check that both are lists and same length
if not isinstance(obj2, list) or len(obj1) != len(obj2):
return False
# Recursively compare each item
return all(ObjectComparer.is_same_object(obj1[i], obj2[i]) for i in range(len(obj1)))
# For atomic values: types already match, so return True
return True
@staticmethod
def is_strictly_empty(value: Any) -> bool:
"""
Returns True if value is an empty container (str, list, dict, set, tuple).
Returns False for non-containers like None, 0, False, etc.
"""
return isinstance(value, str | list | dict | set | tuple) and len(value) == 0

View File

@@ -0,0 +1,56 @@
from typing import TYPE_CHECKING
from .constants import STRING_DELIMITERS, JSONReturnType
from .json_context import ContextValues
from .object_comparer import ObjectComparer
if TYPE_CHECKING:
from .json_parser import JSONParser
def parse_array(self: "JSONParser") -> list[JSONReturnType]:
    """
    Parse a JSON array starting just after the opening '['.

    <array> ::= '[' [ <json> *(', ' <json>) ] ']' ; A sequence of JSON values separated by commas
    Tolerates a missing closing bracket and strings that actually start a
    mis-quoted object.
    """
    arr = []
    self.context.set(ContextValues.ARRAY)
    # Stop when you either find the closing parentheses or you have iterated over the entire string
    char = self.get_char_at()
    while char and char not in ["]", "}"]:
        self.skip_whitespaces_at()
        value: JSONReturnType = ""
        if char in STRING_DELIMITERS:
            # Sometimes it can happen that LLMs forget to start an object and then you think it's a string in an array
            # So we are going to check if this string is followed by a : or not
            # And either parse the string or parse the object
            i = 1
            i = self.skip_to_character(char, i)
            i = self.skip_whitespaces_at(idx=i + 1, move_main_index=False)
            value = self.parse_object() if self.get_char_at(i) == ":" else self.parse_string()
        else:
            value = self.parse_json()
        # It is possible that parse_json() returns nothing valid, so we increase by 1
        if ObjectComparer.is_strictly_empty(value):
            self.index += 1
        elif value == "..." and self.get_char_at(-1) == ".":
            self.log(
                "While parsing an array, found a stray '...'; ignoring it",
            )
        else:
            arr.append(value)
        # skip over whitespace after a value but before closing ]
        char = self.get_char_at()
        while char and char != "]" and (char.isspace() or char == ","):
            self.index += 1
            char = self.get_char_at()
    # Especially at the end of an LLM generated json you might miss the last "]"
    if char and char != "]":
        self.log(
            "While parsing an array we missed the closing ], ignoring it",
        )
        self.index += 1
    self.context.reset()
    return arr

View File

@@ -0,0 +1,30 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .json_parser import JSONParser
def parse_boolean_or_null(self: "JSONParser") -> bool | str | None:
# <boolean> is one of the literal strings 'true', 'false', or 'null' (unquoted)
starting_index = self.index
char = (self.get_char_at() or "").lower()
value: tuple[str, bool | None] | None = None
if char == "t":
value = ("true", True)
elif char == "f":
value = ("false", False)
elif char == "n":
value = ("null", None)
if value:
i = 0
while char and i < len(value[0]) and char == value[0][i]:
i += 1
self.index += 1
char = (self.get_char_at() or "").lower()
if i == len(value[0]):
return value[1]
# If nothing works reset the index before returning
self.index = starting_index
return ""

View File

@@ -0,0 +1,71 @@
from typing import TYPE_CHECKING
from .constants import JSONReturnType
from .json_context import ContextValues
if TYPE_CHECKING:
from .json_parser import JSONParser
def parse_comment(self: "JSONParser") -> JSONReturnType:
    """
    Parse code-like comments:
    - "# comment": A line comment that continues until a newline.
    - "// comment": A line comment that continues until a newline.
    - "/* comment */": A block comment that continues until the closing delimiter "*/".
    The comment is skipped over and an empty string is returned so that comments do not interfere
    with the actual JSON elements.
    """
    char = self.get_char_at()
    # Line comments also stop at the delimiter that would close the
    # construct we are currently inside of.
    termination_characters = ["\n", "\r"]
    if ContextValues.ARRAY in self.context.context:
        termination_characters.append("]")
    if ContextValues.OBJECT_VALUE in self.context.context:
        termination_characters.append("}")
    if ContextValues.OBJECT_KEY in self.context.context:
        termination_characters.append(":")
    # Line comment starting with #
    if char == "#":
        comment = ""
        while char and char not in termination_characters:
            comment += char
            self.index += 1
            char = self.get_char_at()
        self.log(f"Found line comment: {comment}, ignoring")
    # Comments starting with '/'
    elif char == "/":
        next_char = self.get_char_at(1)
        # Handle line comment starting with //
        if next_char == "/":
            comment = "//"
            self.index += 2  # Skip both slashes.
            char = self.get_char_at()
            while char and char not in termination_characters:
                comment += char
                self.index += 1
                char = self.get_char_at()
            self.log(f"Found line comment: {comment}, ignoring")
        # Handle block comment starting with /*
        elif next_char == "*":
            comment = "/*"
            self.index += 2  # Skip '/*'
            while True:
                char = self.get_char_at()
                if not char:
                    self.log("Reached end-of-string while parsing block comment; unclosed block comment.")
                    break
                comment += char
                self.index += 1
                if comment.endswith("*/"):
                    break
            self.log(f"Found block comment: {comment}, ignoring")
        else:
            # Skip standalone '/' characters that are not part of a comment
            # to avoid getting stuck in an infinite loop
            self.index += 1
    # At the top level a comment may precede the actual payload, so keep parsing.
    if self.context.empty:
        return self.parse_json()
    else:
        return ""

View File

@@ -0,0 +1,37 @@
from typing import TYPE_CHECKING
from .json_context import ContextValues
NUMBER_CHARS: set[str] = set("0123456789-.eE/,")
if TYPE_CHECKING:
from .json_parser import JSONParser
def parse_number(self: "JSONParser") -> float | int | str | bool | None:
    """
    Parse a number starting at the current index.

    <number> is a valid real number expressed in one of a number of given formats.
    Falls back to re-parsing as a string (or returning the raw text) when
    the collected characters do not form a valid number.
    """
    number_str = ""
    char = self.get_char_at()
    # Inside an array a comma terminates the number instead of belonging to it.
    is_array = self.context.current == ContextValues.ARRAY
    while char and char in NUMBER_CHARS and (not is_array or char != ","):
        number_str += char
        self.index += 1
        char = self.get_char_at()
    if number_str and number_str[-1] in "-eE/,":
        # The number ends with a non valid character for a number/currency, rolling back one
        number_str = number_str[:-1]
        self.index -= 1
    elif (self.get_char_at() or "").isalpha():
        # this was a string instead, sorry
        self.index -= len(number_str)
        return self.parse_string()
    try:
        if "," in number_str:
            # Contains separators (e.g. thousands): keep the text verbatim.
            return str(number_str)
        if "." in number_str or "e" in number_str or "E" in number_str:
            return float(number_str)
        else:
            return int(number_str)
    except ValueError:
        return number_str

View File

@@ -0,0 +1,143 @@
from typing import TYPE_CHECKING
from .constants import STRING_DELIMITERS, JSONReturnType
from .json_context import ContextValues
if TYPE_CHECKING:
from .json_parser import JSONParser
def parse_object(self: "JSONParser") -> JSONReturnType:
    """
    Parse a JSON object starting just after the opening '{'.

    <object> ::= '{' [ <member> *(', ' <member>) ] '}' ; A sequence of 'members'
    Applies repair heuristics for missing colons, duplicate keys, stray
    commas, arrays split across members, and key-value pairs continuing
    after the closing brace.
    """
    obj: dict[str, JSONReturnType] = {}
    start_index = self.index
    # Stop when you either find the closing parentheses or you have iterated over the entire string
    while (self.get_char_at() or "}") != "}":
        # This is what we expect to find:
        # <member> ::= <string> ': ' <json>
        # Skip filler whitespaces
        self.skip_whitespaces_at()
        # Sometimes LLMs do weird things, if we find a ":" so early, we'll change it to "," and move on
        if (self.get_char_at() or "") == ":":
            self.log(
                "While parsing an object we found a : before a key, ignoring",
            )
            self.index += 1
        # We are now searching for they string key
        # Context is used in the string parser to manage the lack of quotes
        self.context.set(ContextValues.OBJECT_KEY)
        # Save this index in case we need find a duplicate key
        rollback_index = self.index
        # <member> starts with a <string>
        key = ""
        while self.get_char_at():
            # The rollback index needs to be updated here in case the key is empty
            rollback_index = self.index
            if self.get_char_at() == "[" and key == "":
                # Is this an array?
                # Need to check if the previous parsed value contained in obj is an array and in that case parse and merge the two
                prev_key = list(obj.keys())[-1] if obj else None
                if prev_key and isinstance(obj[prev_key], list):
                    # If the previous key's value is an array, parse the new array and merge
                    self.index += 1
                    new_array = self.parse_array()
                    if isinstance(new_array, list):
                        # Merge and flatten the arrays
                        prev_value = obj[prev_key]
                        if isinstance(prev_value, list):
                            prev_value.extend(
                                new_array[0] if len(new_array) == 1 and isinstance(new_array[0], list) else new_array
                            )
                        self.skip_whitespaces_at()
                        if self.get_char_at() == ",":
                            self.index += 1
                        self.skip_whitespaces_at()
                    continue
            key = str(self.parse_string())
            if key == "":
                self.skip_whitespaces_at()
            if key != "" or (key == "" and self.get_char_at() in [":", "}"]):
                # If the string is empty but there is a object divider, we are done here
                break
        if ContextValues.ARRAY in self.context.context and key in obj:
            self.log(
                "While parsing an object we found a duplicate key, closing the object here and rolling back the index",
            )
            self.index = rollback_index - 1
            # add an opening curly brace to make this work
            self.json_str = self.json_str[: self.index + 1] + "{" + self.json_str[self.index + 1 :]
            break
        # Skip filler whitespaces
        self.skip_whitespaces_at()
        # We reached the end here
        if (self.get_char_at() or "}") == "}":
            continue
        self.skip_whitespaces_at()
        # An extreme case of missing ":" after a key
        if (self.get_char_at() or "") != ":":
            self.log(
                "While parsing an object we missed a : after a key",
            )
        self.index += 1
        self.context.reset()
        self.context.set(ContextValues.OBJECT_VALUE)
        # The value can be any valid json
        self.skip_whitespaces_at()
        # Corner case, a lone comma
        value: JSONReturnType = ""
        if (self.get_char_at() or "") in [",", "}"]:
            self.log(
                "While parsing an object value we found a stray , ignoring it",
            )
        else:
            value = self.parse_json()
        # Reset context since our job is done
        self.context.reset()
        obj[key] = value
        if (self.get_char_at() or "") in [",", "'", '"']:
            self.index += 1
        # Remove trailing spaces
        self.skip_whitespaces_at()
    # Step over the closing brace (real or assumed).
    self.index += 1
    # If the object is empty but also isn't just {}
    if not obj and self.index - start_index > 2:
        self.log("Parsed object is empty, we will try to parse this as an array instead")
        self.index = start_index
        return self.parse_array()
    # Check if there are more key-value pairs after the closing brace
    # This handles cases like '{"key": "value"}, "key2": "value2"}'
    # But only if we're not in a nested context
    if not self.context.empty:
        return obj
    self.skip_whitespaces_at()
    if (self.get_char_at() or "") != ",":
        return obj
    self.index += 1
    self.skip_whitespaces_at()
    if (self.get_char_at() or "") not in STRING_DELIMITERS:
        return obj
    self.log(
        "Found a comma and string delimiter after object closing brace, checking for additional key-value pairs",
    )
    additional_obj = self.parse_object()
    if isinstance(additional_obj, dict):
        obj.update(additional_obj)
    return obj

View File

@@ -0,0 +1,482 @@
from typing import TYPE_CHECKING
from .constants import STRING_DELIMITERS, JSONReturnType
from .json_context import ContextValues
from .parse_string_helpers.parse_json_llm_block import parse_json_llm_block
if TYPE_CHECKING:
from .json_parser import JSONParser
def parse_string(self: "JSONParser") -> JSONReturnType:
# <string> is a string of valid characters enclosed in quotes
# i.e. { name: "John" }
# Somehow all weird cases in an invalid JSON happen to be resolved in this function, so be careful here
# Flag to manage corner cases related to missing starting quote
missing_quotes = False
doubled_quotes = False
lstring_delimiter = rstring_delimiter = '"'
char = self.get_char_at()
if char in ["#", "/"]:
return self.parse_comment()
# A valid string can only start with a valid quote or, in our case, with a literal
while char and char not in STRING_DELIMITERS and not char.isalnum():
self.index += 1
char = self.get_char_at()
if not char:
# This is an empty string
return ""
# Ensuring we use the right delimiter
if char == "'":
lstring_delimiter = rstring_delimiter = "'"
elif char == "":
lstring_delimiter = ""
rstring_delimiter = ""
elif char.isalnum():
# This could be a <boolean> and not a string. Because (T)rue or (F)alse or (N)ull are valid
# But remember, object keys are only of type string
if char.lower() in ["t", "f", "n"] and self.context.current != ContextValues.OBJECT_KEY:
value = self.parse_boolean_or_null()
if value != "":
return value
self.log(
"While parsing a string, we found a literal instead of a quote",
)
missing_quotes = True
if not missing_quotes:
self.index += 1
if self.get_char_at() == "`":
ret_val = parse_json_llm_block(self)
# If we found a valid JSON block, return it, otherwise continue parsing the string
if ret_val is not False:
return ret_val
self.log(
"While parsing a string, we found code fences but they did not enclose valid JSON, continuing parsing the string",
)
# There is sometimes a weird case of doubled quotes, we manage this also later in the while loop
if self.get_char_at() in STRING_DELIMITERS and self.get_char_at() == lstring_delimiter:
# If it's an empty key, this was easy
if (self.context.current == ContextValues.OBJECT_KEY and self.get_char_at(1) == ":") or (
self.context.current == ContextValues.OBJECT_VALUE and self.get_char_at(1) in [",", "}"]
):
self.index += 1
return ""
elif self.get_char_at(1) == lstring_delimiter:
# There's something fishy about this, we found doubled quotes and then again quotes
self.log(
"While parsing a string, we found a doubled quote and then a quote again, ignoring it",
)
return ""
# Find the next delimiter
i = self.skip_to_character(character=rstring_delimiter, idx=1)
next_c = self.get_char_at(i)
# Now check that the next character is also a delimiter to ensure that we have "".....""
# In that case we ignore this rstring delimiter
if next_c and (self.get_char_at(i + 1) or "") == rstring_delimiter:
self.log(
"While parsing a string, we found a valid starting doubled quote",
)
doubled_quotes = True
self.index += 1
else:
# Ok this is not a doubled quote, check if this is an empty string or not
i = self.skip_whitespaces_at(idx=1, move_main_index=False)
next_c = self.get_char_at(i)
if next_c in STRING_DELIMITERS + ["{", "["]:
# something fishy is going on here
self.log(
"While parsing a string, we found a doubled quote but also another quote afterwards, ignoring it",
)
self.index += 1
return ""
elif next_c not in [",", "]", "}"]:
self.log(
"While parsing a string, we found a doubled quote but it was a mistake, removing one quote",
)
self.index += 1
# Initialize our return value
string_acc = ""
# Here things get a bit hairy because a string missing the final quote can also be a key or a value in an object
# In that case we need to use the ":|,|}" characters as terminators of the string
# So this will stop if:
# * It finds a closing quote
# * It iterated over the entire sequence
# * If we are fixing missing quotes in an object, when it finds the special terminators
char = self.get_char_at()
unmatched_delimiter = False
while char and char != rstring_delimiter:
if missing_quotes:
if self.context.current == ContextValues.OBJECT_KEY and (char == ":" or char.isspace()):
self.log(
"While parsing a string missing the left delimiter in object key context, we found a :, stopping here",
)
break
elif self.context.current == ContextValues.ARRAY and char in ["]", ","]:
self.log(
"While parsing a string missing the left delimiter in array context, we found a ] or ,, stopping here",
)
break
if (
not self.stream_stable
and self.context.current == ContextValues.OBJECT_VALUE
and char
in [
",",
"}",
]
and (not string_acc or string_acc[-1] != rstring_delimiter)
):
rstring_delimiter_missing = True
# check if this is a case in which the closing comma is NOT missing instead
self.skip_whitespaces_at()
if self.get_char_at(1) == "\\":
# Ok this is a quoted string, skip
rstring_delimiter_missing = False
i = self.skip_to_character(character=rstring_delimiter, idx=1)
next_c = self.get_char_at(i)
if next_c:
i += 1
# found a delimiter, now we need to check that is followed strictly by a comma or brace
# or the string ended
i = self.skip_whitespaces_at(idx=i, move_main_index=False)
next_c = self.get_char_at(i)
if not next_c or next_c in [",", "}"]:
rstring_delimiter_missing = False
else:
# OK but this could still be some garbage at the end of the string
# So we need to check if we find a new lstring_delimiter afterwards
# If we do, maybe this is a missing delimiter
i = self.skip_to_character(character=lstring_delimiter, idx=i)
next_c = self.get_char_at(i)
if not next_c:
rstring_delimiter_missing = False
else:
# But again, this could just be something a bit stupid like "lorem, "ipsum" sic"
# Check if we find a : afterwards (skipping space)
i = self.skip_whitespaces_at(idx=i + 1, move_main_index=False)
next_c = self.get_char_at(i)
if next_c and next_c != ":":
rstring_delimiter_missing = False
else:
# There could be a case in which even the next key:value is missing delimeters
# because it might be a systemic issue with the output
# So let's check if we can find a : in the string instead
i = self.skip_to_character(character=":", idx=1)
next_c = self.get_char_at(i)
if next_c:
# OK then this is a systemic issue with the output
break
else:
# skip any whitespace first
i = self.skip_whitespaces_at(idx=1, move_main_index=False)
# We couldn't find any rstring_delimeter before the end of the string
# check if this is the last string of an object and therefore we can keep going
# make an exception if this is the last char before the closing brace
j = self.skip_to_character(character="}", idx=i)
if j - i > 1:
# Ok it's not right after the comma
# Let's ignore
rstring_delimiter_missing = False
# Check that j was not out of bound
elif self.get_char_at(j):
# Check for an unmatched opening brace in string_acc
for c in reversed(string_acc):
if c == "{":
# Ok then this is part of the string
rstring_delimiter_missing = False
break
if rstring_delimiter_missing:
self.log(
"While parsing a string missing the left delimiter in object value context, we found a , or } and we couldn't determine that a right delimiter was present. Stopping here",
)
break
if (
not self.stream_stable
and char == "]"
and ContextValues.ARRAY in self.context.context
and (not string_acc or string_acc[-1] != rstring_delimiter)
):
# We found the end of an array and we are in array context
# So let's check if we find a rstring_delimiter forward otherwise end early
i = self.skip_to_character(rstring_delimiter)
if not self.get_char_at(i):
# No delimiter found
break
if self.context.current == ContextValues.OBJECT_VALUE and char == "}":
# We found the end of an object while parsing a value
# Check if the object is really over, to avoid doubling the closing brace
i = self.skip_whitespaces_at(idx=1, move_main_index=False)
next_c = self.get_char_at(i)
if next_c and next_c == "`":
# This could be a special case in which the LLM added code fences after the object
# So we need to check if there are another two ` after this one`
next_c = self.get_char_at(i + 1)
if next_c and next_c == "`":
next_c = self.get_char_at(i + 2)
if next_c and next_c == "`":
self.log(
"While parsing a string in object value context, we found a } that closes the object before code fences, stopping here",
)
break
if not next_c:
self.log(
"While parsing a string in object value context, we found a } that closes the object, stopping here",
)
break
string_acc += char
self.index += 1
char = self.get_char_at()
# Unclosed string ends with a \ character. This character is ignored if stream_stable = True.
if self.stream_stable and not char and string_acc and string_acc[-1] == "\\":
string_acc = string_acc[:-1]
if char and string_acc and string_acc[-1] == "\\":
# This is a special case, if people use real strings this might happen
self.log("Found a stray escape sequence, normalizing it")
if char in [rstring_delimiter, "t", "n", "r", "b", "\\"]:
string_acc = string_acc[:-1]
escape_seqs = {"t": "\t", "n": "\n", "r": "\r", "b": "\b"}
string_acc += escape_seqs.get(char, char)
self.index += 1
char = self.get_char_at()
while char and string_acc and string_acc[-1] == "\\" and char in [rstring_delimiter, "\\"]:
# this is a bit of a special case, if I don't do this it will close the loop or create a train of \\
# I don't love it though
string_acc = string_acc[:-1] + char
self.index += 1
char = self.get_char_at()
continue
elif char in ["u", "x"]:
# If we find a unicode escape sequence, normalize it
num_chars = 4 if char == "u" else 2
next_chars = self.json_str[self.index + 1 : self.index + 1 + num_chars]
if len(next_chars) == num_chars and all(c in "0123456789abcdefABCDEF" for c in next_chars):
self.log("Found a unicode escape sequence, normalizing it")
string_acc = string_acc[:-1] + chr(int(next_chars, 16))
self.index += 1 + num_chars
char = self.get_char_at()
continue
elif char in STRING_DELIMITERS and char != rstring_delimiter:
self.log("Found a delimiter that was escaped but shouldn't be escaped, removing the escape")
string_acc = string_acc[:-1] + char
self.index += 1
char = self.get_char_at()
continue
# If we are in object key context and we find a colon, it could be a missing right quote
if char == ":" and not missing_quotes and self.context.current == ContextValues.OBJECT_KEY:
# Ok now we need to check if this is followed by a value like "..."
i = self.skip_to_character(character=lstring_delimiter, idx=1)
next_c = self.get_char_at(i)
if next_c:
i += 1
# found the first delimiter
i = self.skip_to_character(character=rstring_delimiter, idx=i)
next_c = self.get_char_at(i)
if next_c:
# found a second delimiter
i += 1
# Skip spaces
i = self.skip_whitespaces_at(idx=i, move_main_index=False)
next_c = self.get_char_at(i)
if next_c and next_c in [",", "}"]:
# Ok then this is a missing right quote
self.log(
"While parsing a string missing the right delimiter in object key context, we found a :, stopping here",
)
break
else:
# The string ended without finding a lstring_delimiter, I will assume this is a missing right quote
self.log(
"While parsing a string missing the right delimiter in object key context, we found a :, stopping here",
)
break
# ChatGPT sometimes forget to quote stuff in html tags or markdown, so we do this whole thing here
if char == rstring_delimiter and string_acc and string_acc[-1] != "\\":
# Special case here, in case of double quotes one after another
if doubled_quotes and self.get_char_at(1) == rstring_delimiter:
self.log("While parsing a string, we found a doubled quote, ignoring it")
self.index += 1
elif missing_quotes and self.context.current == ContextValues.OBJECT_VALUE:
# In case of missing starting quote I need to check if the delimeter is the end or the beginning of a key
i = 1
next_c = self.get_char_at(i)
while next_c and next_c not in [
rstring_delimiter,
lstring_delimiter,
]:
i += 1
next_c = self.get_char_at(i)
if next_c:
# We found a quote, now let's make sure there's a ":" following
i += 1
# found a delimiter, now we need to check that is followed strictly by a comma or brace
i = self.skip_whitespaces_at(idx=i, move_main_index=False)
next_c = self.get_char_at(i)
if next_c and next_c == ":":
# Reset the cursor
self.index -= 1
char = self.get_char_at()
self.log(
"In a string with missing quotes and object value context, I found a delimeter but it turns out it was the beginning on the next key. Stopping here.",
)
break
elif unmatched_delimiter:
unmatched_delimiter = False
string_acc += str(char)
self.index += 1
char = self.get_char_at()
else:
# Check if eventually there is a rstring delimiter, otherwise we bail
i = 1
next_c = self.get_char_at(i)
check_comma_in_object_value = True
while next_c and next_c not in [
rstring_delimiter,
lstring_delimiter,
]:
# This is a bit of a weird workaround, essentially in object_value context we don't always break on commas
# This is because the routine after will make sure to correct any bad guess and this solves a corner case
if check_comma_in_object_value and next_c.isalpha():
check_comma_in_object_value = False
# If we are in an object context, let's check for the right delimiters
if (
(ContextValues.OBJECT_KEY in self.context.context and next_c in [":", "}"])
or (ContextValues.OBJECT_VALUE in self.context.context and next_c == "}")
or (ContextValues.ARRAY in self.context.context and next_c in ["]", ","])
or (
check_comma_in_object_value
and self.context.current == ContextValues.OBJECT_VALUE
and next_c == ","
)
):
break
i += 1
next_c = self.get_char_at(i)
# If we stopped for a comma in object_value context, let's check if find a "} at the end of the string
if next_c == "," and self.context.current == ContextValues.OBJECT_VALUE:
i += 1
i = self.skip_to_character(character=rstring_delimiter, idx=i)
next_c = self.get_char_at(i)
# Ok now I found a delimiter, let's skip whitespaces and see if next we find a } or a ,
i += 1
i = self.skip_whitespaces_at(idx=i, move_main_index=False)
next_c = self.get_char_at(i)
if next_c in ["}", ","]:
self.log(
"While parsing a string, we a misplaced quote that would have closed the string but has a different meaning here, ignoring it",
)
string_acc += str(char)
self.index += 1
char = self.get_char_at()
continue
elif next_c == rstring_delimiter and self.get_char_at(i - 1) != "\\":
# Check if self.index:self.index+i is only whitespaces, break if that's the case
if all(str(self.get_char_at(j)).isspace() for j in range(1, i) if self.get_char_at(j)):
break
if self.context.current == ContextValues.OBJECT_VALUE:
i = self.skip_whitespaces_at(idx=i + 1, move_main_index=False)
if self.get_char_at(i) == ",":
# So we found a comma, this could be a case of a single quote like "va"lue",
# Search if it's followed by another key, starting with the first delimeter
i = self.skip_to_character(character=lstring_delimiter, idx=i + 1)
i += 1
i = self.skip_to_character(character=rstring_delimiter, idx=i + 1)
i += 1
i = self.skip_whitespaces_at(idx=i, move_main_index=False)
next_c = self.get_char_at(i)
if next_c == ":":
self.log(
"While parsing a string, we a misplaced quote that would have closed the string but has a different meaning here, ignoring it",
)
string_acc += str(char)
self.index += 1
char = self.get_char_at()
continue
# We found a delimiter and we need to check if this is a key
# so find a rstring_delimiter and a colon after
i = self.skip_to_character(character=rstring_delimiter, idx=i + 1)
i += 1
next_c = self.get_char_at(i)
while next_c and next_c != ":":
if next_c in [",", "]", "}"] or (
next_c == rstring_delimiter and self.get_char_at(i - 1) != "\\"
):
break
i += 1
next_c = self.get_char_at(i)
# Only if we fail to find a ':' then we know this is misplaced quote
if next_c != ":":
self.log(
"While parsing a string, we a misplaced quote that would have closed the string but has a different meaning here, ignoring it",
)
unmatched_delimiter = not unmatched_delimiter
string_acc += str(char)
self.index += 1
char = self.get_char_at()
elif self.context.current == ContextValues.ARRAY:
# So here we can have a few valid cases:
# ["bla bla bla "puppy" bla bla bla "kitty" bla bla"]
# ["value1" value2", "value3"]
# The basic idea is that if we find an even number of delimiters after this delimiter
# we ignore this delimiter as it should be fine
even_delimiters = next_c == rstring_delimiter
while next_c == rstring_delimiter:
i = self.skip_to_character(character=[rstring_delimiter, "]"], idx=i + 1)
next_c = self.get_char_at(i)
if next_c != rstring_delimiter:
even_delimiters = False
break
i = self.skip_to_character(character=[rstring_delimiter, "]"], idx=i + 1)
next_c = self.get_char_at(i)
if even_delimiters:
# If we got up to here it means that this is a situation like this:
# ["bla bla bla "puppy" bla bla bla "kitty" bla bla"]
# So we need to ignore this quote
self.log(
"While parsing a string in Array context, we detected a quoted section that would have closed the string but has a different meaning here, ignoring it",
)
unmatched_delimiter = not unmatched_delimiter
string_acc += str(char)
self.index += 1
char = self.get_char_at()
else:
break
elif self.context.current == ContextValues.OBJECT_KEY:
# In this case we just ignore this and move on
self.log(
"While parsing a string in Object Key context, we detected a quoted section that would have closed the string but has a different meaning here, ignoring it",
)
string_acc += str(char)
self.index += 1
char = self.get_char_at()
if char and missing_quotes and self.context.current == ContextValues.OBJECT_KEY and char.isspace():
self.log(
"While parsing a string, handling an extreme corner case in which the LLM added a comment instead of valid string, invalidate the string and return an empty value",
)
self.skip_whitespaces_at()
if self.get_char_at() not in [":", ","]:
return ""
# A fallout of the previous special case in the while loop,
# we need to update the index only if we had a closing quote
if char != rstring_delimiter:
# if stream_stable = True, unclosed strings do not trim trailing whitespace characters
if not self.stream_stable:
self.log(
"While parsing a string, we missed the closing quote, ignoring",
)
string_acc = string_acc.rstrip()
else:
self.index += 1
if not self.stream_stable and (missing_quotes or (string_acc and string_acc[-1] == "\n")):
# Clean the whitespaces for some corner cases
string_acc = string_acc.rstrip()
return string_acc

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from ..constants import JSONReturnType # noqa: TID252
if TYPE_CHECKING:
from ..json_parser import JSONParser # noqa: TID252
def parse_json_llm_block(self: "JSONParser") -> JSONReturnType:
"""
Extracts and normalizes JSON enclosed in ```json ... ``` blocks.
"""
# Try to find a ```json ... ``` block
if self.json_str[self.index : self.index + 7] == "```json":
i = self.skip_to_character("`", idx=7)
if self.json_str[self.index + i : self.index + i + 3] == "```":
self.index += 7 # Move past ```json
return self.parse_json()
return False

View File

@@ -0,0 +1,108 @@
import os
from typing import TextIO
class StringFileWrapper:
# This is a trick to simplify the code, transform the filedescriptor handling into a string handling
def __init__(self, fd: TextIO, chunk_length: int) -> None:
"""
Initialize the StringFileWrapper with a file descriptor and chunk length.
Args:
fd (TextIO): The file descriptor to wrap.
CHUNK_LENGTH (int): The length of each chunk to read from the file.
Attributes:
fd (TextIO): The wrapped file descriptor.
length (int): The total length of the file content.
buffers (dict[int, str]): Dictionary to store chunks of file content.
buffer_length (int): The length of each buffer chunk.
"""
self.fd = fd
self.length: int = 0
# Buffers are 1MB strings that are read from the file
# and kept in memory to keep reads low
self.buffers: dict[int, str] = {}
# chunk_length is in bytes
if not chunk_length or chunk_length < 2:
chunk_length = 1_000_000
self.buffer_length = chunk_length
def get_buffer(self, index: int) -> str:
"""
Retrieve or load a buffer chunk from the file.
Args:
index (int): The index of the buffer chunk to retrieve.
Returns:
str: The buffer chunk at the specified index.
"""
if self.buffers.get(index) is None:
self.fd.seek(index * self.buffer_length)
self.buffers[index] = self.fd.read(self.buffer_length)
# Save memory by keeping max 2MB buffer chunks and min 2 chunks
if len(self.buffers) > max(2, 2_000_000 / self.buffer_length):
oldest_key = next(iter(self.buffers))
if oldest_key != index:
self.buffers.pop(oldest_key)
return self.buffers[index]
def __getitem__(self, index: int | slice) -> str:
"""
Retrieve a character or a slice of characters from the file.
Args:
index (Union[int, slice]): The index or slice of characters to retrieve.
Returns:
str: The character(s) at the specified index or slice.
"""
# The buffer is an array that is seek like a RAM:
# self.buffers[index]: the row in the array of length 1MB, index is `i` modulo CHUNK_LENGTH
# self.buffures[index][j]: the column of the row that is `i` remainder CHUNK_LENGTH
if isinstance(index, slice):
buffer_index = index.start // self.buffer_length
buffer_end = index.stop // self.buffer_length
if buffer_index == buffer_end:
return self.get_buffer(buffer_index)[index.start % self.buffer_length : index.stop % self.buffer_length]
else:
start_slice = self.get_buffer(buffer_index)[index.start % self.buffer_length :]
end_slice = self.get_buffer(buffer_end)[: index.stop % self.buffer_length]
middle_slices = [self.get_buffer(i) for i in range(buffer_index + 1, buffer_end)]
return start_slice + "".join(middle_slices) + end_slice
else:
buffer_index = index // self.buffer_length
return self.get_buffer(buffer_index)[index % self.buffer_length]
def __len__(self) -> int:
"""
Get the total length of the file.
Returns:
int: The total number of characters in the file.
"""
if self.length < 1:
current_position = self.fd.tell()
self.fd.seek(0, os.SEEK_END)
self.length = self.fd.tell()
self.fd.seek(current_position)
return self.length
def __setitem__(self, index: int | slice, value: str) -> None: # pragma: no cover
"""
Set a character or a slice of characters in the file.
Args:
index (slice): The slice of characters to set.
value (str): The value to set at the specified index or slice.
"""
start = index.start or 0 if isinstance(index, slice) else index or 0
if start < 0:
start += len(self)
current_position = self.fd.tell()
self.fd.seek(start)
self.fd.write(value)
self.fd.seek(current_position)