Source code for dag.codecs.dag_pb

"""DAG-PB codec - Protobuf-based Merkle DAG nodes.

Multicodec code: ``0x70``

DAG-PB uses Protocol Buffers for encoding. It is the codec used by
legacy IPFS (UnixFS). A DAG-PB node (``PBNode``) consists of:

- ``Data``  - optional raw bytes payload
- ``Links`` - a list of ``PBLink`` entries, each containing:
  - ``Hash``  - a CID (as raw bytes)
  - ``Name``  - an optional UTF-8 string name
  - ``Tsize`` - an optional integer (total cumulative size)

**Protobuf schema** (proto2)::

    message PBLink {
        optional bytes  Hash  = 1;
        optional string Name  = 2;
        optional uint64 Tsize = 3;
    }

    message PBNode {
        optional bytes   Data  = 1;
        repeated PBLink  Links = 2;
    }

**Wire format field tags**:
- PBLink.Hash:  field 1, wire type 2 (length-delimited) → tag byte ``0x0a``
- PBLink.Name:  field 2, wire type 2 (length-delimited) → tag byte ``0x12``
- PBLink.Tsize: field 3, wire type 0 (varint)           → tag byte ``0x18``
- PBNode.Data:  field 1, wire type 2 (length-delimited) → tag byte ``0x0a``
- PBNode.Links: field 2, wire type 2 (length-delimited) → tag byte ``0x12``

Links MUST be sorted: first by Name (bytes), then by those without names.
Encoding is done right-to-left to match the reference implementation.

Reference implementations:
- https://github.com/ipld/js-dag-pb
- https://ipld.io/specs/codecs/dag-pb/spec/
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

from cid import from_bytes as cid_from_bytes

from ..codec import BlockCodec, register_codec
from ..ipld_model import CID, IPLDNode, is_cid
from ..multicodec_codes import DAG_PB_CODE, DAG_PB_NAME






[docs] @dataclass class PBNode: """A DAG-PB node. Attributes ---------- data: Optional raw bytes payload. links: List of links to other nodes. """ data: bytes | None = None links: list[PBLink] = field(default_factory=list)
_WIRE_VARINT = 0 _WIRE_LENGTH_DELIMITED = 2 def _encode_varint(value: int) -> bytes: """Encode an unsigned integer as a protobuf varint.""" if value < 0: raise ValueError("Varint must be non-negative") parts = [] while value > 0x7F: parts.append((value & 0x7F) | 0x80) value >>= 7 parts.append(value & 0x7F) return bytes(parts) _MAX_VARINT_BYTES = 10 def _decode_varint(data: bytes, offset: int) -> tuple[int, int]: """Decode a varint from *data* at *offset*. Returns ``(value, new_offset)``. Raises ``ValueError`` if the varint exceeds 10 bytes (the maximum needed for a 64-bit value), which guards against malformed input causing unbounded iteration. """ result = 0 shift = 0 bytes_read = 0 while True: if offset >= len(data): raise ValueError("Unexpected end of data while reading varint") byte = data[offset] result |= (byte & 0x7F) << shift offset += 1 bytes_read += 1 if not (byte & 0x80): break shift += 7 if bytes_read >= _MAX_VARINT_BYTES: raise ValueError(f"Varint exceeds {_MAX_VARINT_BYTES} bytes, input is malformed") return result, offset def _encode_tag(field_number: int, wire_type: int) -> bytes: """Encode a protobuf field tag.""" return _encode_varint((field_number << 3) | wire_type) def _encode_length_delimited(field_number: int, data: bytes) -> bytes: """Encode a length-delimited protobuf field.""" tag = _encode_tag(field_number, _WIRE_LENGTH_DELIMITED) length = _encode_varint(len(data)) return tag + length + data def _encode_varint_field(field_number: int, value: int) -> bytes: """Encode a varint protobuf field.""" tag = _encode_tag(field_number, _WIRE_VARINT) return tag + _encode_varint(value) def _encode_pb_link(link: PBLink) -> bytes: """Encode a single PBLink to protobuf bytes. Fields are written in order: Hash (1), Name (2), Tsize (3). """ parts: list[bytes] = [] if link.hash is not None: cid_bytes = link.hash.buffer parts.append(_encode_length_delimited(1, cid_bytes)) if link.name is not None: parts.append(_encode_length_delimited(2, link.name.encode("utf-8"))) if link.tsize is not None: parts.append(_encode_varint_field(3, link.tsize)) return b"".join(parts) def _decode_pb_link(data: bytes) -> PBLink: """Decode a PBLink from protobuf bytes.""" link = PBLink() offset = 0 while offset < len(data): tag_value, offset = _decode_varint(data, offset) field_number = tag_value >> 3 wire_type = tag_value & 0x07 if wire_type == _WIRE_LENGTH_DELIMITED: length, offset = _decode_varint(data, offset) field_data = data[offset : offset + length] offset += length if field_number == 1: # Hash → CID bytes link.hash = cid_from_bytes(field_data) elif field_number == 2: # Name link.name = field_data.decode("utf-8") else: pass # skip unknown fields elif wire_type == _WIRE_VARINT: value, offset = _decode_varint(data, offset) if field_number == 3: link.tsize = value else: raise ValueError(f"Unsupported wire type {wire_type} in PBLink") return link def _encode_pb_node(node: PBNode) -> bytes: """Encode a PBNode to protobuf bytes. Links (field 2) are encoded first, then Data (field 1). This matches the canonical DAG-PB encoding order. """ parts: list[bytes] = [] for link in node.links: link_bytes = _encode_pb_link(link) parts.append(_encode_length_delimited(2, link_bytes)) if node.data is not None: parts.append(_encode_length_delimited(1, node.data)) return b"".join(parts) def _decode_pb_node(data: bytes) -> PBNode: """Decode a PBNode from protobuf bytes.""" node = PBNode() offset = 0 while offset < len(data): tag_value, offset = _decode_varint(data, offset) field_number = tag_value >> 3 wire_type = tag_value & 0x07 if wire_type == _WIRE_LENGTH_DELIMITED: length, offset = _decode_varint(data, offset) field_data = data[offset : offset + length] offset += length if field_number == 1: node.data = field_data elif field_number == 2: node.links.append(_decode_pb_link(field_data)) else: pass elif wire_type == _WIRE_VARINT: _, offset = _decode_varint(data, offset) else: raise ValueError(f"Unsupported wire type {wire_type} in PBNode") return node def _prepare_node(value: Any) -> PBNode: """Convert an IPLD data-model value into a ``PBNode``. Accepts either a ``PBNode`` directly or a dict with the shape:: { "Data": b"...", # optional "Links": [ # optional { "Hash": <CID>, "Name": "...", # optional "Tsize": 123, # optional }, ... ] } """ if isinstance(value, PBNode): return value if not isinstance(value, dict): raise TypeError(f"DAG-PB encode expects a PBNode or dict, got {type(value).__name__}") data = value.get("Data") if data is not None and not isinstance(data, (bytes, bytearray)): raise TypeError(f"PBNode.Data must be bytes, got {type(data).__name__}") raw_links = value.get("Links", []) links: list[PBLink] = [] for raw_link in raw_links: if isinstance(raw_link, PBLink): links.append(raw_link) elif isinstance(raw_link, dict): link_hash = raw_link.get("Hash") if link_hash is not None and not is_cid(link_hash): raise TypeError(f"PBLink.Hash must be a CID, got {type(link_hash).__name__}") links.append( PBLink( hash=link_hash, name=raw_link.get("Name"), tsize=raw_link.get("Tsize"), ) ) else: raise TypeError(f"Link must be a PBLink or dict, got {type(raw_link).__name__}") return PBNode(data=data if data is None else bytes(data), links=links) def _validate_node(node: PBNode) -> None: """Validate a PBNode for encoding.""" if node.data is not None and not isinstance(node.data, bytes): raise TypeError(f"PBNode.Data must be bytes, got {type(node.data).__name__}") for link in node.links: if link.hash is not None and not is_cid(link.hash): raise TypeError(f"PBLink.Hash must be a CID, got {type(link.hash).__name__}") if link.name is not None and not isinstance(link.name, str): raise TypeError(f"PBLink.Name must be str, got {type(link.name).__name__}") if link.tsize is not None and not isinstance(link.tsize, int): raise TypeError(f"PBLink.Tsize must be int, got {type(link.tsize).__name__}") def _node_to_dict(node: PBNode) -> dict[str, Any]: """Convert a PBNode back to a dict representation.""" result: dict[str, Any] = {} if node.links: links_list = [] for link in node.links: link_dict: dict[str, Any] = {} if link.hash is not None: link_dict["Hash"] = link.hash if link.name is not None: link_dict["Name"] = link.name if link.tsize is not None: link_dict["Tsize"] = link.tsize links_list.append(link_dict) result["Links"] = links_list else: result["Links"] = [] if node.data is not None: result["Data"] = node.data else: result["Data"] = None return result
[docs] class DagPbCodec(BlockCodec): """DAG-PB codec (``0x70``). Encodes/decodes ``PBNode`` structures to/from Protocol Buffers wire format. """ @property def name(self) -> str: return DAG_PB_NAME @property def code(self) -> int: return DAG_PB_CODE
[docs] def encode(self, node: IPLDNode) -> bytes: """Encode a PBNode (or equivalent dict) to DAG-PB bytes.""" pb_node = _prepare_node(node) _validate_node(pb_node) return _encode_pb_node(pb_node)
[docs] def decode(self, data: bytes) -> IPLDNode: """Decode DAG-PB bytes into a dict with ``Data`` and ``Links`` keys.""" pb_node = _decode_pb_node(data) return _node_to_dict(pb_node)
codec = DagPbCodec() """Module-level singleton codec instance.""" name = codec.name code = codec.code encode = codec.encode decode = codec.decode register_codec(codec) __all__ = ["DagPbCodec", "PBLink", "PBNode", "codec", "decode", "encode"]