|  | 
|  | 1 | +from collections import UserDict | 
|  | 2 | + | 
|  | 3 | +from .item import Item, InnerList, itemise, AllItemType | 
|  | 4 | +from .list import parse_item_or_inner_list | 
|  | 5 | +from .types import JsonDictType | 
|  | 6 | +from .util import ( | 
|  | 7 | +    StructuredFieldValue, | 
|  | 8 | +    discard_http_ows, | 
|  | 9 | +    ser_key, | 
|  | 10 | +    parse_key, | 
|  | 11 | +) | 
|  | 12 | + | 
|  | 13 | +EQUALS = ord(b"=") | 
|  | 14 | +COMMA = ord(b",") | 
|  | 15 | + | 
|  | 16 | + | 
|  | 17 | +class Dictionary(UserDict, StructuredFieldValue): | 
|  | 18 | +    def parse_content(self, data: bytes) -> int: | 
|  | 19 | +        bytes_consumed = 0 | 
|  | 20 | +        data_len = len(data) | 
|  | 21 | +        try: | 
|  | 22 | +            while True: | 
|  | 23 | +                offset, this_key = parse_key(data[bytes_consumed:]) | 
|  | 24 | +                bytes_consumed += offset | 
|  | 25 | +                try: | 
|  | 26 | +                    is_equals = data[bytes_consumed] == EQUALS | 
|  | 27 | +                except IndexError: | 
|  | 28 | +                    is_equals = False | 
|  | 29 | +                if is_equals: | 
|  | 30 | +                    bytes_consumed += 1  # consume the "=" | 
|  | 31 | +                    offset, member = parse_item_or_inner_list(data[bytes_consumed:]) | 
|  | 32 | +                    bytes_consumed += offset | 
|  | 33 | +                else: | 
|  | 34 | +                    member = Item() | 
|  | 35 | +                    member.value = True | 
|  | 36 | +                    bytes_consumed += member.params.parse(data[bytes_consumed:]) | 
|  | 37 | +                self[this_key] = member | 
|  | 38 | +                bytes_consumed += discard_http_ows(data[bytes_consumed:]) | 
|  | 39 | +                if bytes_consumed == data_len: | 
|  | 40 | +                    return bytes_consumed | 
|  | 41 | +                if data[bytes_consumed] != COMMA: | 
|  | 42 | +                    raise ValueError(f"Dictionary member '{this_key}' has trailing characters") | 
|  | 43 | +                bytes_consumed += 1 | 
|  | 44 | +                bytes_consumed += discard_http_ows(data[bytes_consumed:]) | 
|  | 45 | +                if bytes_consumed == data_len: | 
|  | 46 | +                    raise ValueError("Dictionary has trailing comma") | 
|  | 47 | +        except Exception as why: | 
|  | 48 | +            self.clear() | 
|  | 49 | +            raise ValueError from why | 
|  | 50 | + | 
|  | 51 | +    def __setitem__(self, key: str, value: AllItemType) -> None: | 
|  | 52 | +        self.data[key] = itemise(value) | 
|  | 53 | + | 
|  | 54 | +    def __str__(self) -> str: | 
|  | 55 | +        if len(self) == 0: | 
|  | 56 | +            raise ValueError("No contents; field should not be emitted") | 
|  | 57 | +        return ", ".join( | 
|  | 58 | +            [ | 
|  | 59 | +                f"{ser_key(m)}" | 
|  | 60 | +                f"""{n.params if (isinstance(n, Item) and n.value is True) else f"={n}"}""" | 
|  | 61 | +                for m, n in self.items() | 
|  | 62 | +            ] | 
|  | 63 | +        ) | 
|  | 64 | + | 
|  | 65 | +    def to_json(self) -> JsonDictType: | 
|  | 66 | +        return [(key, val.to_json()) for (key, val) in self.items()] | 
|  | 67 | + | 
|  | 68 | +    def from_json(self, json_data: JsonDictType) -> None: | 
|  | 69 | +        for key, val in json_data: | 
|  | 70 | +            if isinstance(val[0], list): | 
|  | 71 | +                self[key] = InnerList() | 
|  | 72 | +            else: | 
|  | 73 | +                self[key] = Item() | 
|  | 74 | +            self[key].from_json(val) | 
0 commit comments