Skip to content

Utilities

utils

Cookie

Cookie(
    domain,
    path,
    secure,
    expires,
    name,
    value,
    http_only=True,
    include_subdomains=False,
)

Represents a single HTTP cookie.

Source code in src/griddy/core/utils/cookies.py
def __init__(
    self,
    domain: str,
    path: str,
    secure: bool,
    expires: int | None,
    name: str,
    value: str,
    http_only: bool = True,
    include_subdomains: bool = False,
):
    """Initialize a cookie with domain, path, security, expiration, and name/value.

    Attributes are stored exactly as given; no normalization is applied here.
    """
    # Identity of the cookie.
    self.name = name
    self.value = value
    # Scope and security attributes.
    self.domain = domain
    self.path = path
    self.secure = secure
    self.expires = expires
    self.http_only = http_only
    self.include_subdomains = include_subdomains

is_expired property

is_expired

Check if the cookie is expired.

matches_domain

matches_domain(domain)

Check if this cookie matches the given domain.

Source code in src/griddy/core/utils/cookies.py
def matches_domain(self, domain: str) -> bool:
    """Check if this cookie matches the given domain."""
    # Strip the leading dot so ".example.com" and "example.com" compare equal.
    base = self.domain.lstrip(".").lower()
    target = domain.lower()

    # Exact (case-insensitive) host match.
    if base == target:
        return True

    # Host-only cookies never match subdomains; a dot-prefixed domain (or an
    # explicit include_subdomains flag) allows any subdomain of the base.
    if not (self.domain.startswith(".") or self.include_subdomains):
        return False
    return target.endswith("." + base)

matches_path

matches_path(path)

Check if this cookie matches the given path.

Source code in src/griddy/core/utils/cookies.py
def matches_path(self, path: str) -> bool:
    """Check if this cookie matches the given path."""
    # The root path matches every request; otherwise a simple prefix test.
    return self.path == "/" or path.startswith(self.path)

to_dict

to_dict()

Convert cookie to dictionary format.

Source code in src/griddy/core/utils/cookies.py
def to_dict(self) -> dict[str, str]:
    """Convert cookie to dictionary format (a single name→value pair)."""
    return dict([(self.name, self.value)])

to_header_string

to_header_string()

Convert cookie to HTTP header format.

Source code in src/griddy/core/utils/cookies.py
def to_header_string(self) -> str:
    """Convert cookie to HTTP header format ("name=value")."""
    return "{}={}".format(self.name, self.value)

OpenEnumMeta

Bases: EnumMeta

Enum metaclass that returns the raw value instead of raising ValueError for unknown members.

HarEntryPathManager

HarEntryPathManager(path, filename_prefix)

Aggregates HAR entries for a single API path, collecting headers and query params.

Source code in src/griddy/core/utils/har.py
def __init__(self, path: str, filename_prefix: str):
    """Initialize with an API path and filename prefix."""
    self.path = path
    self.filename_prefix = filename_prefix
    self.headers = {}
    # Each query param name maps to the set of values observed for it.
    self.query_params = defaultdict(set)
    # Not sure if we'll use this
    self.cookies = {}

    # Can we get away with just one?
    # if not, do we need to map params to the response?
    self.response_example = None

filename property

filename

Generate a filename from the API path.

as_dict

as_dict(exclude)

Return a dict representation, excluding specified attributes.

Source code in src/griddy/core/utils/har.py
def as_dict(self, exclude: List[str]) -> Dict:
    """Return a dict representation, excluding specified attributes."""
    result = {}
    for name in ("path", "headers", "query_params", "response_example"):
        if name in exclude:
            continue
        attr_value = getattr(self, name)
        # Sets aren't JSON-serializable, so expand query-param sets to lists.
        if name == "query_params":
            attr_value = {k: list(vals) for k, vals in attr_value.items()}
        result[name] = attr_value
    return result

add_entry

add_entry(entry)

Merge a HAR entry's headers, query params, and response into this manager.

Source code in src/griddy/core/utils/har.py
def add_entry(self, entry: Dict) -> None:
    """Merge a HAR entry's headers, query params, and response into this manager."""
    request = entry["request"]
    if request["path"] != self.path:
        raise ValueError(
            f"Entry path {entry['request']['path']} "
            f"does not match the class path {self.path}"
        )

    # Later entries overwrite earlier values for the same header name.
    for item in request["headers"]:
        self.headers[item["name"]] = item["value"]

    # Accumulate every distinct value seen for each query parameter.
    for item in request["queryString"]:
        self.query_params[item["name"]].add(item["value"])

    # Keep the most recent truthy response as the example.
    if entry["response"]:
        self.response_example = entry["response"]

Logger

Bases: Protocol

Protocol for SDK debug loggers.

debug

debug(msg, *args, **kwargs)

Log a debug-level message.

Source code in src/griddy/core/utils/logger.py
def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
    """Log a debug-level message."""
    # Protocol stub: concrete logger implementations supply the behavior.
    pass

warning

warning(msg, *args, **kwargs)

Log a warning-level message.

Source code in src/griddy/core/utils/logger.py
def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
    """Log a warning-level message."""
    # Protocol stub: concrete logger implementations supply the behavior.
    pass

FieldMetadata

FieldMetadata(
    security=None,
    path=None,
    query=None,
    header=None,
    request=None,
    form=None,
    multipart=None,
)

Aggregates all parameter metadata types for a single Pydantic field.

Source code in src/griddy/core/utils/metadata.py
def __init__(
    self,
    security: Optional[SecurityMetadata] = None,
    path: Optional[Union[PathParamMetadata, bool]] = None,
    query: Optional[Union[QueryParamMetadata, bool]] = None,
    header: Optional[Union[HeaderMetadata, bool]] = None,
    request: Optional[Union[RequestMetadata, bool]] = None,
    form: Optional[Union[FormMetadata, bool]] = None,
    multipart: Optional[Union[MultipartFormMetadata, bool]] = None,
) -> None:
    """Initialize field metadata, converting bool shortcuts to default instances."""

    def coerce(value, metadata_cls):
        # A bare bool is a shortcut for "use the default metadata instance".
        return metadata_cls() if isinstance(value, bool) else value

    self.security = security
    self.path = coerce(path, PathParamMetadata)
    self.query = coerce(query, QueryParamMetadata)
    self.header = coerce(header, HeaderMetadata)
    self.request = coerce(request, RequestMetadata)
    self.form = coerce(form, FormMetadata)
    self.multipart = coerce(multipart, MultipartFormMetadata)

FormMetadata dataclass

FormMetadata(json=False, style='form', explode=True)

Metadata for URL-encoded form fields.

HeaderMetadata dataclass

HeaderMetadata(
    serialization=None, style="simple", explode=False
)

Bases: ParamMetadata

Metadata for HTTP header parameters.

MultipartFormMetadata dataclass

MultipartFormMetadata(
    file=False, content=False, json=False
)

Metadata for multipart form fields.

PathParamMetadata dataclass

PathParamMetadata(
    serialization=None, style="simple", explode=False
)

Bases: ParamMetadata

Metadata for URL path parameters.

QueryParamMetadata dataclass

QueryParamMetadata(
    serialization=None, style="form", explode=True
)

Bases: ParamMetadata

Metadata for URL query parameters.

RequestMetadata dataclass

RequestMetadata(media_type='application/octet-stream')

Metadata for request body media type.

SecurityMetadata dataclass

SecurityMetadata(
    option=False,
    scheme=False,
    scheme_type=None,
    sub_type=None,
    field_name=None,
)

Metadata describing a security field (API key, OAuth2, HTTP auth, etc.).

get_field_name

get_field_name(default)

Return the field name, falling back to default if not set.

Source code in src/griddy/core/utils/metadata.py
def get_field_name(self, default: str) -> str:
    """Return the field name, falling back to default if not set (None or empty)."""
    if self.field_name:
        return self.field_name
    return default

SerializedRequestBody dataclass

SerializedRequestBody(
    media_type=None, content=None, data=None, files=None
)

Container for a serialized HTTP request body with media type and content.

BackoffStrategy

BackoffStrategy(
    initial_interval,
    max_interval,
    exponent,
    max_elapsed_time,
)

Exponential backoff configuration with initial/max intervals and exponent.

Source code in src/griddy/core/utils/retries.py
def __init__(
    self,
    initial_interval: int,
    max_interval: int,
    exponent: float,
    max_elapsed_time: int,
):
    """Initialize backoff parameters.

    Values are stored as given; no validation or unit conversion happens here.
    """
    self.max_elapsed_time = max_elapsed_time
    self.exponent = exponent
    self.max_interval = max_interval
    self.initial_interval = initial_interval

Retries

Retries(config, status_codes)

Retry state combining a RetryConfig with applicable status codes.

Source code in src/griddy/core/utils/retries.py
def __init__(self, config: RetryConfig, status_codes: List[str]):
    """Initialize with retry config and status codes.

    status_codes lists the HTTP statuses (as strings) that trigger a retry.
    """
    self.status_codes = status_codes
    self.config = config

RetryConfig

RetryConfig(strategy, backoff, retry_connection_errors)

Retry strategy configuration with backoff settings.

Source code in src/griddy/core/utils/retries.py
def __init__(
    self, strategy: str, backoff: BackoffStrategy, retry_connection_errors: bool
):
    """Initialize retry configuration with a strategy name and backoff settings."""
    self.retry_connection_errors = retry_connection_errors
    self.backoff = backoff
    self.strategy = strategy

YAMLConsolidator

YAMLConsolidator(spec_dir, pattern)

Consolidates multiple OpenAPI YAML spec files into a single spec, tracking diffs.

Source code in src/griddy/core/utils/yaml_consolidator.py
def __init__(self, spec_dir: str, pattern: str):
    """Initialize with a spec directory and glob pattern for YAML files."""
    # load_specs reads self.spec_dir and fills self.specs, so both must
    # exist before it is called.
    self.spec_dir = Path(spec_dir)
    self.specs = {}
    self.load_specs(pattern=pattern)

    # Top-level attributes of the consolidated OpenAPI document.
    self.open_api = None
    self.info = {}
    self.servers = []
    self.security = []
    self.components = {}
    self.paths = {}
    self.tags = []

    # Diff bookkeeping: per-attribute conflict counts, the spec currently
    # being merged, recorded diff entries, and which spec first provided
    # each key.
    self.diff_counts = defaultdict(int)
    self.cur_spec_name = None
    self.diffs = {
        "components": {"schemas": [], "securitySchemes": []},
        "paths": [],
        "tags": [],
    }
    self.original_entry_source = {}

    # Populated by combine_all_specs().
    self.combined_spec = {}

add_diff_entry

add_diff_entry(attr, key, old, new)

Record a diff when a key's value changes between specs.

Source code in src/griddy/core/utils/yaml_consolidator.py
def add_diff_entry(self, attr: str, key: str, old: Any, new: Any) -> None:
    """Record a diff when a key's value changes between specs."""
    entry = {
        "key": key,
        "existing_value": json.dumps(old, indent=4),
        "existing_source": self.original_entry_source[f"{attr}.{key}"],
        "new_value": json.dumps(new, indent=4),
        "new_source": self.cur_spec_name,
    }
    # Augment with the rendered HTML diff table and similarity score.
    entry.update(self.compute_diff_info(diff_entry=entry))
    self.diffs[attr].append(entry)

add_spec

add_spec(spec_path)

Load and register a YAML spec file.

Source code in src/griddy/core/utils/yaml_consolidator.py
def add_spec(self, spec_path: str | Path) -> None:
    """Load and register a YAML spec file, keyed by its file stem."""
    spec_path = Path(spec_path) if isinstance(spec_path, str) else spec_path
    if not spec_path.exists():
        raise FileNotFoundError(f"{spec_path} does not exist.")
    if spec_path.stem in self.specs:
        raise ValueError(f"{spec_path.stem} is already in self.specs.")

    # Parse the YAML and store the spec in sorted form.
    with spec_path.open() as infile:
        loaded = yaml.full_load(infile)
    self.specs[spec_path.stem] = self.get_sorted_spec(loaded)

combine_all_specs

combine_all_specs()

Integrate all loaded specs into a single combined spec.

Source code in src/griddy/core/utils/yaml_consolidator.py
def combine_all_specs(self) -> None:
    """Integrate all loaded specs into a single combined spec."""
    for name, spec in self.specs.items():
        print(name)
        # Track which spec is being merged so diffs attribute blame correctly.
        self.cur_spec_name = name
        self.integrate_spec(spec=spec)

    # Assemble the final document from the accumulated attributes.
    self.combined_spec = dict(
        openapi=self.open_api,
        info=self.info,
        servers=self.servers,
        security=self.security,
        tags=self.tags,
        paths=self.paths,
        components=self.components,
    )

compute_diff_info

compute_diff_info(diff_entry)

Compute HTML diff and similarity ratio for a diff entry.

Source code in src/griddy/core/utils/yaml_consolidator.py
def compute_diff_info(self, diff_entry: Dict) -> Dict:
    """Compute HTML diff and similarity ratio for a diff entry."""
    try:
        old_lines = diff_entry["existing_value"].splitlines()
        new_lines = diff_entry["new_value"].splitlines()
    except AttributeError as e:
        # Dump the offending (non-string) values before re-raising.
        from pprint import pprint

        pprint(diff_entry["existing_value"], indent=4)
        pprint(diff_entry["new_value"], indent=4)
        raise e

    table = difflib.HtmlDiff().make_table(old_lines, new_lines)
    ratio = difflib.SequenceMatcher(None, old_lines, new_lines).ratio()
    return {"html": table, "similarity": ratio}

create_full_html_string

create_full_html_string(diffs_list)

Build an HTML string from a list of diff entries.

Source code in src/griddy/core/utils/yaml_consolidator.py
def create_full_html_string(self, diffs_list: List[Dict]) -> str:
    """Build an HTML string from a list of diff entries.

    Args:
        diffs_list: Diff entries carrying "key", "existing_source",
            "new_source", and a pre-rendered "html" diff table.

    Returns:
        A <div>-wrapped HTML fragment with one section per diff entry.
    """
    # Collect parts and join once instead of quadratic += concatenation.
    parts = ["<div>\n"]
    for diff_entry in diffs_list:
        parts.append(
            f"<br>\n"
            f"<h3>{diff_entry['key']}</h3>"
            f"<p>Original Source: {diff_entry['existing_source']}</p>"
            f"<p>New Source: {diff_entry['new_source']}</p>"
            f"{diff_entry['html']}\n"
            f"<br>\n"
        )
    # Plain literal: the original f-string had no placeholders.
    parts.append("</div>")
    return "".join(parts)

get_open_api_attr

get_open_api_attr(attr)

Get a per-spec attribute map, loading lazily if needed.

Source code in src/griddy/core/utils/yaml_consolidator.py
def get_open_api_attr(self, attr: str) -> Any:
    """Get a per-spec attribute map, loading lazily if needed."""
    value = getattr(self, attr)
    if value is None:
        # First access: populate the attribute, then re-read it.
        self._set_openapi_attr(attr=attr)
        value = getattr(self, attr)
    return value

get_sorted_spec

get_sorted_spec(spec)

Sort a spec's paths, components, and tags alphabetically.

Source code in src/griddy/core/utils/yaml_consolidator.py
def get_sorted_spec(self, spec: Dict) -> Dict:
    """Sort a spec's paths, components, and tags alphabetically."""
    # Scalar / top-level sections are copied through unchanged.
    result = {key: spec[key] for key in ("openapi", "info", "servers", "security")}

    # Collection sections are replaced by their sorted counterparts.
    for section in ("tags", "paths", "components"):
        result[section] = self.sort_entries_for_attr(spec=spec, attr=section)

    return result

handle_component_diffs

handle_component_diffs()

Build HTML diff strings for schema and security scheme diffs.

Source code in src/griddy/core/utils/yaml_consolidator.py
def handle_component_diffs(self) -> tuple[str, str]:
    """Build HTML diff strings for schema and security scheme diffs."""
    component_diffs = self.diffs["components"]
    return (
        self.create_full_html_string(component_diffs["schemas"]),
        self.create_full_html_string(component_diffs["securitySchemes"]),
    )

integrate_attr

integrate_attr(spec, attr)

Merge a single top-level spec attribute into the consolidated spec.

Source code in src/griddy/core/utils/yaml_consolidator.py
def integrate_attr(self, spec: Dict, attr: str) -> None:
    """Merge a single top-level spec attribute into the consolidated spec.

    Args:
        spec: Full spec dict for the current source file.
        attr: Top-level attribute name ("tags", "paths", or "components").
    """
    # Components and tags have dedicated merge logic.
    if attr == "components":
        self.integrate_components(spec[attr])
        return
    elif attr == "tags":
        self.integrate_tags(tags=spec.get(attr, []))
        return

    existing_entries = getattr(self, attr)

    for key, new_entry in spec[attr].items():
        if key in existing_entries:
            if new_entry == (old_entry := existing_entries[key]):
                # Identical entry already merged; nothing to record.
                continue
            else:
                # Conflict: record a diff but keep the existing entry.
                self.add_diff_entry(
                    attr=attr, key=key, old=old_entry, new=new_entry
                )
                self.diff_counts[attr] += 1
        else:
            # New key: remember which spec first provided it.
            existing_entries[key] = new_entry
            self.original_entry_source[f"{attr}.{key}"] = self.cur_spec_name

    setattr(self, attr, existing_entries)

integrate_components

integrate_components(components)

Merge component schemas and security schemes into the consolidated spec.

Source code in src/griddy/core/utils/yaml_consolidator.py
def integrate_components(self, components: Dict) -> None:
    """Merge component schemas and security schemes into the consolidated spec.

    Args:
        components: The "components" section of the spec being merged;
            expected to contain "schemas" and "securitySchemes" keys.
    """
    new_components = {}

    for sub_component in ["schemas", "securitySchemes"]:
        existing = self.components.get(sub_component, {})
        for key, new_entry in components[sub_component].items():
            if key in existing:
                if new_entry == (old_entry := existing[key]):
                    # Identical entry already merged; nothing to record.
                    continue
                else:
                    # Conflict: record a diff but keep the existing entry.
                    original_source = self.original_entry_source[
                        f"components.{sub_component}.{key}"
                    ]
                    diff_entry = {
                        "key": key,
                        "existing_value": json.dumps(old_entry, indent=4),
                        "existing_source": original_source,
                        "new_value": json.dumps(new_entry, indent=4),
                        "new_source": self.cur_spec_name,
                    }
                    diff_entry.update(self.compute_diff_info(diff_entry=diff_entry))

                    self.diffs["components"][sub_component].append(diff_entry)
                    self.diff_counts[f"components.{sub_component}"] += 1
            else:
                # New key: remember which spec first provided it.
                existing[key] = new_entry
                self.original_entry_source[f"components.{sub_component}.{key}"] = (
                    self.cur_spec_name
                )
        new_components[sub_component] = existing

    self.components = new_components

integrate_spec

integrate_spec(spec)

Merge all attributes of a single spec into the consolidated spec.

Source code in src/griddy/core/utils/yaml_consolidator.py
def integrate_spec(self, spec: Dict) -> None:
    """Merge all attributes of a single spec into the consolidated spec."""
    # Each section merges independently; integrate_attr dispatches per type.
    for section in ("tags", "paths", "components"):
        self.integrate_attr(spec, section)

integrate_tags

integrate_tags(tags)

Merge tags into the consolidated spec, recording diffs.

Source code in src/griddy/core/utils/yaml_consolidator.py
def integrate_tags(self, tags: List) -> None:
    """Merge tags into the consolidated spec, recording diffs.

    Args:
        tags: Tag dicts with "name" and "description" keys.
    """
    # Snapshot current tags as name -> description for conflict detection.
    existing_tags = {t["name"]: t["description"] for t in self.tags}

    for t in tags:
        if t["name"] in existing_tags:
            # Same tag name: only record a diff if the description changed.
            if (new_value := t["description"]) != (
                old_value := existing_tags[t["name"]]
            ):
                diff_entry = {
                    "key": t["name"],
                    "existing_value": json.dumps(old_value, indent=4),
                    "existing_source": self.original_entry_source[
                        f"tags.{t['name']}"
                    ],
                    "new_value": json.dumps(new_value, indent=4),
                    "new_source": self.cur_spec_name,
                }
                diff_entry.update(self.compute_diff_info(diff_entry=diff_entry))

                self.diffs["tags"].append(diff_entry)
                self.diff_counts["tags"] += 1
        else:
            # New tag: append and remember which spec first provided it.
            self.tags.append(t)
            self.original_entry_source[f"tags.{t['name']}"] = self.cur_spec_name

load_specs

load_specs(pattern)

Load all specs matching the pattern from the spec directory.

Source code in src/griddy/core/utils/yaml_consolidator.py
def load_specs(self, pattern: str) -> Dict:
    """Load all specs matching the pattern from the spec directory.

    Args:
        pattern: Glob pattern (relative to ``self.spec_dir``) matching the
            YAML files to load.

    Returns:
        The mapping of spec name to loaded spec (``self.specs``).
    """
    for spec_file in self.spec_dir.glob(pattern=pattern):
        self.add_spec(spec_path=spec_file)

    # Bug fix: the original returned a never-populated local dict;
    # add_spec stores results on self.specs, so return that instead.
    return self.specs

output_diff

output_diff()

Generate and write an HTML diff report for all spec differences.

Source code in src/griddy/core/utils/yaml_consolidator.py
def output_diff(self) -> None:
    """Generate and write an HTML diff report for all spec differences."""
    # html_template is a module-level HTML prefix — presumably the opening
    # <html>/<head>/<body> markup; confirm against the module source.
    full_html = html_template
    for diff_type in self.diffs:
        if diff_type == "components":
            # Components are split into schema and security-scheme sections.
            schemas_html, security_schemes_html = self.handle_component_diffs()
            full_html += schemas_html
            full_html += security_schemes_html
        elif diff_type == "paths":
            paths_html = self.create_full_html_string(self.diffs["paths"])
            full_html += paths_html
        elif diff_type == "tags":
            tags_html = self.create_full_html_string(self.diffs["tags"])
            full_html += tags_html

    # Close the document opened by html_template.
    full_html += "<br>\n\t</body>\n</html>"
    from pprint import pprint

    # Print a summary of how many conflicts were found per attribute.
    pprint(self.diff_counts, indent=4)
    self.write_to_html(file_name="pro-reg-combined.html", html_text=full_html)

set_common_info

set_common_info(*args, **kwargs)

Set common OpenAPI attributes (info, servers, security) via keyword arguments.

Source code in src/griddy/core/utils/yaml_consolidator.py
def set_common_info(self, *args: Any, **kwargs: Any) -> None:
    """Set common OpenAPI attributes (info, servers, security) via keyword arguments.

    Positional arguments are accepted but ignored; only keywords are applied.
    """
    for name in kwargs:
        setattr(self, name, kwargs[name])

sort_all_specs

sort_all_specs()

Sort all loaded specs in place.

Source code in src/griddy/core/utils/yaml_consolidator.py
def sort_all_specs(self) -> None:
    """Sort all loaded specs in place."""
    # Replace each loaded spec with its sorted counterpart.
    for name in list(self.specs):
        self.specs[name] = self.get_sorted_spec(spec=self.specs[name])

sort_entries_for_attr

sort_entries_for_attr(spec, attr)

Sort entries for a given spec attribute (paths, components, or tags).

Source code in src/griddy/core/utils/yaml_consolidator.py
def sort_entries_for_attr(self, spec: Dict, attr: str) -> Dict | List | None:
    """Sort entries for a given spec attribute (paths, components, or tags)."""
    sorted_attr_entry = None

    if attr not in spec:
        return sorted_attr_entry

    if attr == "paths":
        sorted_attr_entry = dict(sorted(spec["paths"].items()))
    elif attr == "components":
        schemas_sorted = dict(sorted(spec["components"]["schemas"].items()))
        security_schemes_sorted = dict(
            sorted(spec["components"]["securitySchemes"].items())
        )
        sorted_attr_entry = {
            "schemas": schemas_sorted,
            "securitySchemes": security_schemes_sorted,
        }
    elif attr == "tags":
        sorted_attr_entry = sorted(spec["tags"], key=lambda entry: entry["name"])

    return sorted_attr_entry

write_spec_to_disk

write_spec_to_disk(file_name, spec)

Write a single spec dict to a YAML file.

Source code in src/griddy/core/utils/yaml_consolidator.py
def write_spec_to_disk(self, file_name: str, spec: Dict) -> None:
    """Write a single spec dict to a YAML file, overwriting any existing file."""
    print(f"Writing spec to {file_name}")

    with open(file_name, "w") as outfile:
        yaml.dump(spec, outfile)
        # Confirmation printed only if dump completed without raising.
        print("Success")

write_to_disk

write_to_disk(directory=None, suffix='')

Write all loaded specs to YAML files in the given directory.

Source code in src/griddy/core/utils/yaml_consolidator.py
def write_to_disk(self, directory: str | None = None, suffix: str = "") -> None:
    """Write all loaded specs to YAML files in the given directory.

    Args:
        directory: Output directory; defaults to the current working directory.
        suffix: Optional suffix appended to each spec's filename stem.
    """
    if directory is None:
        directory = os.getcwd()

    print(f"Writing all specs to directory: {directory}")
    for name, spec in self.specs.items():
        # One <name><suffix>.yaml file per loaded spec.
        file_name = f"{directory}/{name}{suffix}.yaml"
        self.write_spec_to_disk(file_name=file_name, spec=spec)

write_to_html

write_to_html(file_name, html_text)

Write an HTML string to a file.

Source code in src/griddy/core/utils/yaml_consolidator.py
def write_to_html(self, file_name: str, html_text: str) -> None:
    """Write an HTML string to a file, overwriting any existing content."""
    with open(file_name, "w") as report_file:
        report_file.write(html_text)

get_discriminator

get_discriminator(model, fieldname, key)

Recursively search for the discriminator attribute in a model.

Parameters:

Name Type Description Default
model Any

The model to search within.

required
fieldname str

The name of the field to search for.

required
key str

The key to search for in dictionaries.

required

Returns:

Name Type Description
str str

The name of the discriminator attribute.

Raises:

Type Description
ValueError

If the discriminator attribute is not found.

Source code in src/griddy/core/utils/annotations.py
def get_discriminator(model: Any, fieldname: str, key: str) -> str:
    """
    Recursively search for the discriminator attribute in a model.

    Args:
        model (Any): The model to search within.
        fieldname (str): The name of the field to search for.
        key (str): The key to search for in dictionaries.

    Returns:
        str: The name of the discriminator attribute.

    Raises:
        ValueError: If the discriminator attribute is not found.
    """
    # Also probe the all-caps variant of the field name (e.g. "type" / "TYPE").
    upper_fieldname = fieldname.upper()

    def get_field_discriminator(field: Any) -> Optional[str]:
        """Search for the discriminator attribute in a given field."""

        # Precedence: dict key lookup first, then attribute access by
        # fieldname, then by the upper-cased fieldname.
        if isinstance(field, dict):
            if key in field:
                return f"{field[key]}"

        if hasattr(field, fieldname):
            attr = getattr(field, fieldname)
            # Enums unwrap to their value; everything else stringifies.
            if isinstance(attr, Enum):
                return f"{attr.value}"
            return f"{attr}"

        if hasattr(field, upper_fieldname):
            attr = getattr(field, upper_fieldname)
            if isinstance(attr, Enum):
                return f"{attr.value}"
            return f"{attr}"

        return None

    def search_nested_discriminator(obj: Any) -> Optional[str]:
        """Recursively search for discriminator in nested structures."""
        # First try direct field lookup
        discriminator = get_field_discriminator(obj)
        if discriminator is not None:
            return discriminator

        # If it's a dict, search in nested values
        if isinstance(obj, dict):
            for value in obj.values():
                if isinstance(value, list):
                    # Search in list items
                    for item in value:
                        nested_discriminator = search_nested_discriminator(item)
                        if nested_discriminator is not None:
                            return nested_discriminator
                elif isinstance(value, dict):
                    # Search in nested dict
                    nested_discriminator = search_nested_discriminator(value)
                    if nested_discriminator is not None:
                        return nested_discriminator

        return None

    # A list model is searched element by element, first match wins.
    if isinstance(model, list):
        for field in model:
            discriminator = search_nested_discriminator(field)
            if discriminator is not None:
                return discriminator

    discriminator = search_nested_discriminator(model)
    if discriminator is not None:
        return discriminator

    raise ValueError(f"Could not find discriminator field {fieldname} in {model}")

clean_text

clean_text(text)

Clean and normalize text data.

Parameters:

Name Type Description Default
text str | None

Text to clean

required

Returns:

Type Description
str | None

Cleaned text or None

Source code in src/griddy/core/utils/converters.py
def clean_text(text: str | None) -> str | None:
    """
    Clean and normalize text data.

    Args:
        text: Text to clean

    Returns:
        Cleaned text or None
    """
    if not text:
        return None

    # Strip whitespace and normalize
    cleaned = text.strip()
    if not cleaned:
        return None

    return cleaned

safe_float

safe_float(value)

Safely convert value to float.

Parameters:

Name Type Description Default
value Any

Value to convert

required

Returns:

Type Description
float | None

Float value or None

Source code in src/griddy/core/utils/converters.py
def safe_float(value: Any) -> float | None:
    """
    Safely convert value to float.

    Args:
        value: Value to convert

    Returns:
        Float value or None
    """
    if value is None:
        return None

    try:
        return float(value)
    except ValueError, TypeError:
        return None

safe_int

safe_int(value)

Safely convert value to integer.

Parameters:

Name Type Description Default
value Any

Value to convert

required

Returns:

Type Description
int | None

Integer value or None

Source code in src/griddy/core/utils/converters.py
def safe_int(value: Any) -> int | None:
    """
    Safely convert value to integer.

    Args:
        value: Value to convert

    Returns:
        Integer value or None
    """
    if value is None:
        return None

    try:
        return int(value)
    except ValueError, TypeError:
        return None

safe_numberify

safe_numberify(value)

Attempt to convert a string value to int, then float, falling back to the original string if neither conversion succeeds.

Parameters:

Name Type Description Default
value str | None

String value to convert

required

Returns:

Type Description
int | float | str | None

int, float, original string, or None

Source code in src/griddy/core/utils/converters.py
def safe_numberify(value: str | None) -> int | float | str | None:
    """
    Attempt to convert a string value to int, then float, falling back to
    the original string if neither conversion succeeds.

    Args:
        value: String value to convert

    Returns:
        int, float, original string, or None
    """
    if value is None:
        return None

    for convert in (int, float):
        try:
            return convert(value)
        except ValueError, TypeError:
            continue

    return value

cookies_to_dict

cookies_to_dict(cookies)

Convert a list of cookies to a dictionary format.

Parameters:

Name Type Description Default
cookies list[Cookie]

List of Cookie objects

required

Returns:

Type Description
dict[str, str]

Dictionary with cookie names as keys and values as values

Source code in src/griddy/core/utils/cookies.py
def cookies_to_dict(cookies: list[Cookie]) -> dict[str, str]:
    """
    Convert a list of cookies to a dictionary format.

    Args:
        cookies: List of Cookie objects

    Returns:
        Dictionary with cookie names as keys and values as values
    """
    result = {}
    for cookie in cookies:
        result[cookie.name] = cookie.value
    return result

cookies_to_header

cookies_to_header(cookies)

Convert a list of cookies to a Cookie header string.

Parameters:

Name Type Description Default
cookies list[Cookie]

List of Cookie objects

required

Returns:

Type Description
str

Cookie header string (e.g., "name1=value1; name2=value2")

Source code in src/griddy/core/utils/cookies.py
def cookies_to_header(cookies: list[Cookie]) -> str:
    """
    Convert a list of cookies to a Cookie header string.

    Args:
        cookies: List of Cookie objects

    Returns:
        Cookie header string (e.g., "name1=value1; name2=value2")
    """
    if not cookies:
        return ""

    return "; ".join(cookie.to_header_string() for cookie in cookies)

extract_cookies_as_dict

extract_cookies_as_dict(
    cookies_file, target_url, include_expired=False
)

Extract cookies for a URL and return as a dictionary.

Parameters:

Name Type Description Default
cookies_file str | Path

Path to the cookies.txt file

required
target_url str

URL to match cookies against

required
include_expired bool

Whether to include expired cookies

False

Returns:

Type Description
dict[str, str]

Dictionary with cookie names as keys and values as values

Source code in src/griddy/core/utils/cookies.py
def extract_cookies_as_dict(
    cookies_file: str | Path, target_url: str, include_expired: bool = False
) -> dict[str, str]:
    """
    Extract cookies for a URL and return as a dictionary.

    Args:
        cookies_file: Path to the cookies.txt file
        target_url: URL to match cookies against
        include_expired: Whether to include expired cookies

    Returns:
        Dictionary with cookie names as keys and values as values
    """
    return cookies_to_dict(
        extract_cookies_for_url(cookies_file, target_url, include_expired)
    )

extract_cookies_as_header

extract_cookies_as_header(
    cookies_file, target_url, include_expired=False
)

Extract cookies for a URL and return as a Cookie header string.

Parameters:

Name Type Description Default
cookies_file str | Path

Path to the cookies.txt file

required
target_url str

URL to match cookies against

required
include_expired bool

Whether to include expired cookies

False

Returns:

Type Description
str

Cookie header string suitable for HTTP requests

Source code in src/griddy/core/utils/cookies.py
def extract_cookies_as_header(
    cookies_file: str | Path, target_url: str, include_expired: bool = False
) -> str:
    """
    Extract cookies for a URL and return as a Cookie header string.

    Args:
        cookies_file: Path to the cookies.txt file
        target_url: URL to match cookies against
        include_expired: Whether to include expired cookies

    Returns:
        Cookie header string suitable for HTTP requests
    """
    return cookies_to_header(
        extract_cookies_for_url(cookies_file, target_url, include_expired)
    )

extract_cookies_for_url

extract_cookies_for_url(
    cookies_file, target_url, include_expired=False
)

Extract cookies that match a specific URL from a cookies.txt file.

Parameters:

Name Type Description Default
cookies_file str | Path

Path to the cookies.txt file

required
target_url str

URL to match cookies against

required
include_expired bool

Whether to include expired cookies

False

Returns:

Type Description
list[Cookie]

List of matching Cookie objects

Raises:

Type Description
FileNotFoundError

If the cookies file doesn't exist

ValueError

If the URL or file format is invalid

Source code in src/griddy/core/utils/cookies.py
def extract_cookies_for_url(
    cookies_file: str | Path, target_url: str, include_expired: bool = False
) -> list[Cookie]:
    """
    Extract cookies that match a specific URL from a cookies.txt file.

    Args:
        cookies_file: Path to the cookies.txt file
        target_url: URL to match cookies against
        include_expired: Whether to include expired cookies

    Returns:
        List of matching Cookie objects

    Raises:
        FileNotFoundError: If the cookies file doesn't exist
        ValueError: If the URL or file format is invalid
    """
    # Parse the target URL. Keep the try body minimal (only urlparse can
    # raise here) and chain the original error for easier debugging.
    try:
        parsed_url = urlparse(target_url)
    except Exception as exc:
        raise ValueError(f"Invalid URL: {target_url}") from exc

    domain = parsed_url.netloc.lower()
    path = parsed_url.path or "/"
    is_https = parsed_url.scheme.lower() == "https"

    # Parse all cookies from file
    all_cookies = parse_cookies_txt(cookies_file)

    # Filter cookies that match the URL
    matching_cookies = []

    for cookie in all_cookies:
        # Skip expired cookies unless requested
        if not include_expired and cookie.is_expired:
            continue

        # Secure cookies must only be sent over HTTPS
        if cookie.secure and not is_https:
            continue

        # Check domain match
        if not cookie.matches_domain(domain):
            continue

        # Check path match
        if not cookie.matches_path(path):
            continue

        matching_cookies.append(cookie)

    return matching_cookies

parse_cookies_txt

parse_cookies_txt(file_path)

Parse a cookies.txt file and return a list of Cookie objects.

Parameters:

Name Type Description Default
file_path str | Path

Path to the cookies.txt file

required

Returns:

Type Description
list[Cookie]

List of Cookie objects

Raises:

Type Description
FileNotFoundError

If the cookies file doesn't exist

ValueError

If the file format is invalid

Source code in src/griddy/core/utils/cookies.py
def parse_cookies_txt(file_path: str | Path) -> list[Cookie]:
    """
    Parse a cookies.txt (Netscape format) file and return a list of Cookie objects.

    Args:
        file_path: Path to the cookies.txt file

    Returns:
        List of Cookie objects

    Raises:
        FileNotFoundError: If the cookies file doesn't exist
        ValueError: If the file encoding is invalid
    """
    file_path = Path(file_path)

    if not file_path.exists():
        raise FileNotFoundError(f"Cookies file not found: {file_path}")

    cookies: list[Cookie] = []

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()

                # Skip empty lines and comments
                if not line or line.startswith("#"):
                    continue

                # Netscape format: domain, include_subdomains, path, secure,
                # expires, name, value -- exactly 7 tab-separated fields.
                parts = line.split("\t")
                if len(parts) != 7:
                    continue

                try:
                    domain = parts[0]
                    include_subdomains = parts[1].upper() == "TRUE"
                    path = parts[2]
                    secure = parts[3].upper() == "TRUE"
                    expires_str = parts[4]
                    name = parts[5]
                    value = parts[6]

                    # "0" (or empty) marks a session cookie: no expiry.
                    expires = None
                    if expires_str and expires_str != "0":
                        try:
                            expires = int(expires_str)
                        except ValueError:
                            expires = None

                    cookies.append(
                        Cookie(
                            domain=domain,
                            path=path,
                            secure=secure,
                            expires=expires,
                            name=name,
                            value=value,
                            include_subdomains=include_subdomains,
                        )
                    )
                except (IndexError, ValueError):
                    # Malformed line: skip it but keep parsing the rest.
                    continue

    except UnicodeDecodeError as exc:
        # Chain the decode error so callers can see the original cause.
        raise ValueError(f"Invalid file encoding in {file_path}") from exc

    return cookies

parse_date

parse_date(date_str)

Parse date string into datetime object.

Parameters:

Name Type Description Default
date_str str | None

Date string in various formats

required

Returns:

Type Description
datetime | None

Parsed datetime object or None

Source code in src/griddy/core/utils/datetimes.py
def parse_date(date_str: str | None) -> datetime | None:
    """
    Parse date string into datetime object.

    Args:
        date_str: Date string in various formats

    Returns:
        Parsed datetime object or None
    """
    if not date_str:
        return None

    # Common date formats to try
    formats = [
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S.%fZ",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
        "%m/%d/%Y",
        "%m/%d/%Y %H:%M:%S",
    ]

    for fmt in formats:
        try:
            dt = datetime.strptime(date_str, fmt)
            # Add timezone info if not present
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except ValueError:
            continue

    # If no format matches, return None
    return None

parse_datetime

parse_datetime(datetime_string)

Convert an RFC 3339 / ISO 8601 formatted string into a datetime object. Python versions 3.11 and later support parsing RFC 3339 directly with datetime.fromisoformat(), but for earlier versions, this function encapsulates the necessary extra logic.

Source code in src/griddy/core/utils/datetimes.py
def parse_datetime(datetime_string: str) -> datetime:
    """
    Convert an RFC 3339 / ISO 8601 formatted string into a datetime object.

    Python 3.11+ parses RFC 3339 natively via datetime.fromisoformat();
    on earlier versions the trailing "Z" suffix must be rewritten first.
    """
    if sys.version_info < (3, 11):
        # Pre-3.11 fromisoformat() rejects a trailing "Z"; normalize it
        # to an explicit UTC offset before parsing.
        if datetime_string.endswith("Z"):
            datetime_string = datetime_string[:-1] + "+00:00"

    return datetime.fromisoformat(datetime_string)

consolidate_minified_entries

consolidate_minified_entries(entries)

Group minified HAR entries by API path into HarEntryPathManager instances.

Source code in src/griddy/core/utils/har.py
def consolidate_minified_entries(entries: List[Dict]) -> Dict[str, HarEntryPathManager]:
    """Group minified HAR entries by API path into HarEntryPathManager instances."""
    grouped: Dict[str, HarEntryPathManager] = {}

    for entry in entries:
        api_path = entry["request"]["path"]
        manager = grouped.get(api_path)
        if manager is None:
            # First entry for this path: create its manager lazily.
            manager = HarEntryPathManager(
                path=api_path, filename_prefix="FiddlerSnapshots/ProNFL"
            )
            grouped[api_path] = manager
        manager.add_entry(entry=entry)

    return grouped

extract_minified_har_entry

extract_minified_har_entry(har_entry)

Extract a minimal request/response pair from a HAR entry.

Source code in src/griddy/core/utils/har.py
def extract_minified_har_entry(har_entry: Dict) -> Dict:
    """Extract a minimal request/response pair from a HAR entry."""
    # Keep only the request fields we care about.
    wanted_keys = ("url", "headers", "queryString", "method", "path")
    request_info = {
        key: value
        for key, value in har_entry["request"].items()
        if key in wanted_keys
    }

    # Reduce the full URL down to just its path: drop scheme/host and
    # anything after the query-string separator.
    full_url = request_info.pop("url")
    request_info["path"] = full_url.split(".com")[-1].split("?")[0]

    response_json = json.loads(har_entry["response"]["content"]["text"])

    return {"request": request_info, "response": response_json}

minify_har

minify_har(har_file)

Read a HAR file and return a list of minified request/response entries.

Source code in src/griddy/core/utils/har.py
def minify_har(har_file: str) -> List[Dict]:
    """Read a HAR file and return a list of minified request/response entries."""
    # utf-8-sig strips the BOM that some HAR exporters prepend.
    with open(har_file, mode="r", encoding="utf-8-sig") as infile:
        entries = json.load(infile)["log"]["entries"]

    return [extract_minified_har_entry(entry) for entry in entries]

write_consolidated_to_files

write_consolidated_to_files(consolidated, exclude)

Write each consolidated path manager to a JSON file.

Source code in src/griddy/core/utils/har.py
def write_consolidated_to_files(
    consolidated: Dict[str, HarEntryPathManager], exclude: List[str]
) -> None:
    """Write each consolidated path manager to a JSON file.

    Args:
        consolidated: Mapping of API path to its HarEntryPathManager.
        exclude: Keys to omit when serializing each manager.
    """
    # Iterate values only -- the mapping key was unused in the loop body.
    for manager in consolidated.values():
        # Pin the encoding rather than relying on the platform default.
        with open(manager.filename, "w", encoding="utf-8") as outfile:
            json.dump(manager.as_dict(exclude=exclude), outfile, indent=4)

get_headers

get_headers(headers_params, gbls=None)

Extract HTTP headers from request params and globals.

Source code in src/griddy/core/utils/headers.py
def get_headers(headers_params: Any, gbls: Optional[Any] = None) -> Dict[str, str]:
    """Extract HTTP headers from request params and globals."""
    headers: Dict[str, str] = {}
    consumed_globals: list = []

    if _is_set(headers_params):
        consumed_globals = _populate_headers(headers_params, gbls, headers, [])
    if _is_set(gbls):
        # NOTE(review): the fourth argument appears to list fields already
        # taken from globals so they are not applied twice -- confirm.
        _populate_headers(gbls, None, headers, consumed_globals)

    return headers

get_response_headers

get_response_headers(headers)

Convert httpx Headers to a dict mapping header names to lists of values.

Source code in src/griddy/core/utils/headers.py
def get_response_headers(headers: Headers) -> Dict[str, List[str]]:
    """Convert httpx Headers to a dict mapping header names to lists of values.

    Args:
        headers: httpx Headers (repeated header names yield multiple items).

    Returns:
        Mapping of header name to the list of its values, in iteration order.
    """
    res: Dict[str, List[str]] = {}
    for k, v in headers.items():
        # setdefault replaces the non-idiomatic "if not k in res" guard.
        res.setdefault(k, []).append(v)
    return res

get_body_content

get_body_content(req)

Return the request body content as a string, or a placeholder for streaming.

Source code in src/griddy/core/utils/logger.py
def get_body_content(req: httpx.Request) -> str:
    """Return the request body content as a string, or a placeholder for streaming."""
    # A request whose content was never read has no _content attribute yet.
    if hasattr(req, "_content"):
        return str(req.content)
    return "<streaming body>"

get_default_logger

get_default_logger(env_var='GRIDDY_DEBUG')

Get a default logger, optionally enabled via an environment variable.

Parameters:

Name Type Description Default
env_var str

Environment variable name to check for debug logging.

'GRIDDY_DEBUG'
Source code in src/griddy/core/utils/logger.py
def get_default_logger(env_var: str = "GRIDDY_DEBUG") -> Logger:
    """Get a default logger, optionally enabled via an environment variable.

    Args:
        env_var: Environment variable name to check for debug logging.
    """
    # Debug logging stays off unless the environment variable is set.
    if not os.getenv(env_var):
        return NoOpLogger()

    logging.basicConfig(level=logging.DEBUG)
    return logging.getLogger("griddy")

find_metadata

find_metadata(field_info, metadata_type)

Find the first metadata annotation of the given type on a field.

Source code in src/griddy/core/utils/metadata.py
def find_metadata(field_info: FieldInfo, metadata_type: Type[T]) -> Optional[T]:
    """Find the first metadata annotation of the given type on a field."""
    # next() with a default collapses the explicit loop-and-return.
    return next(
        (md for md in field_info.metadata or [] if isinstance(md, metadata_type)),
        None,
    )

get_query_params

get_query_params(query_params, gbls=None)

Extract query parameters from request params and globals.

Source code in src/griddy/core/utils/queryparams.py
def get_query_params(
    query_params: Any,
    gbls: Optional[Any] = None,
) -> Dict[str, List[str]]:
    """Extract query parameters from request params and globals."""
    params: Dict[str, List[str]] = {}

    consumed_globals = _populate_query_params(query_params, gbls, params, [])
    if _is_set(gbls):
        # NOTE(review): the fourth argument appears to list fields already
        # taken from globals so they are not applied twice -- confirm.
        _populate_query_params(gbls, None, params, consumed_globals)

    return params

serialize_request_body

serialize_request_body(
    request_body,
    nullable,
    optional,
    serialization_method,
    request_body_type,
)

Serialize a request body based on the given serialization method (json, form, multipart, etc.).

Source code in src/griddy/core/utils/requestbodies.py
def serialize_request_body(
    request_body: Any,
    nullable: bool,
    optional: bool,
    serialization_method: str,
    request_body_type: Any,
) -> Optional[SerializedRequestBody]:
    """Serialize a request body based on the given serialization method.

    Args:
        request_body: The value to serialize.
        nullable: Whether the body may be explicitly null.
        optional: Whether the body may be omitted entirely.
        serialization_method: Key into SERIALIZATION_METHOD_TO_CONTENT_TYPE.
        request_body_type: Declared type used for JSON marshalling.

    Returns:
        A SerializedRequestBody, or None when an optional non-nullable
        body was omitted.

    Raises:
        TypeError: If the body type cannot be serialized for the media type.
    """
    # An omitted optional (non-nullable) body serializes to nothing at all.
    if request_body is None and not nullable and optional:
        return None

    media_type = SERIALIZATION_METHOD_TO_CONTENT_TYPE[serialization_method]
    result = SerializedRequestBody(media_type)

    # Order matters: structured media types are matched before the raw
    # bytes/string fallbacks.
    if re.match(r"(application|text)\/.*?\+*json.*", media_type) is not None:
        result.content = marshal_json(request_body, request_body_type)
    elif re.match(r"multipart\/.*", media_type) is not None:
        (
            result.media_type,
            result.data,
            result.files,
        ) = serialize_multipart_form(media_type, request_body)
    elif re.match(r"application\/x-www-form-urlencoded.*", media_type) is not None:
        result.data = serialize_form_data(request_body)
    elif isinstance(request_body, (bytes, bytearray, io.BytesIO, io.BufferedReader)):
        result.content = request_body
    elif isinstance(request_body, str):
        result.content = request_body
    else:
        raise TypeError(
            f"invalid request body type {type(request_body)} for mediaType {media_type}"
        )

    return result

retry

retry(func, retries)

Execute func with optional backoff retry logic based on retries config.

Source code in src/griddy/core/utils/retries.py
def retry(func: Callable[[], httpx.Response], retries: Retries) -> httpx.Response:
    """Execute func with optional backoff retry logic based on retries config.

    Args:
        func: Zero-argument callable that performs one HTTP request.
        retries: Retry configuration. Only the "backoff" strategy enables
            retrying; any other strategy calls ``func`` exactly once.

    Returns:
        The HTTP response from the last attempt.
    """
    if retries.config.strategy == "backoff":

        def do_request() -> httpx.Response:
            # One attempt: classify the outcome as retryable (TemporaryError)
            # or final (PermanentError / a returned response).
            res: httpx.Response
            try:
                res = func()

                for code in retries.status_codes:
                    if "X" in code.upper():
                        # Wildcard form such as "5XX": retry on any status in
                        # that hundred-range (500-599 for "5XX").
                        code_range = int(code[0])

                        # Float division keeps e.g. 503 inside [5, 6).
                        status_major = res.status_code / 100

                        if code_range <= status_major < code_range + 1:
                            raise TemporaryError(res)
                    else:
                        # Exact status code match, e.g. "429".
                        parsed_code = int(code)

                        if res.status_code == parsed_code:
                            raise TemporaryError(res)
            except httpx.ConnectError as exception:
                if retries.config.retry_connection_errors:
                    # NOTE(review): re-raised unchanged; presumably
                    # retry_with_backoff treats this as retryable -- confirm.
                    raise

                raise PermanentError(exception) from exception
            except httpx.TimeoutException as exception:
                if retries.config.retry_connection_errors:
                    raise

                raise PermanentError(exception) from exception
            except TemporaryError:
                # Already classified as retryable; propagate as-is.
                raise
            except Exception as exception:
                # Anything unexpected is terminal.
                raise PermanentError(exception) from exception

            return res

        return retry_with_backoff(
            do_request,
            retries.config.backoff.initial_interval,
            retries.config.backoff.max_interval,
            retries.config.backoff.exponent,
            retries.config.backoff.max_elapsed_time,
        )

    return func()

retry_async async

retry_async(func, retries)

Async variant of retry with optional backoff retry logic.

Source code in src/griddy/core/utils/retries.py
async def retry_async(func: Callable[[], Any], retries: Retries) -> httpx.Response:
    """Async variant of retry with optional backoff retry logic.

    Args:
        func: Zero-argument coroutine function that performs one HTTP request.
        retries: Retry configuration. Only the "backoff" strategy enables
            retrying; any other strategy awaits ``func`` exactly once.

    Returns:
        The HTTP response from the last attempt.
    """
    if retries.config.strategy == "backoff":

        async def do_request() -> httpx.Response:
            # One attempt: classify the outcome as retryable (TemporaryError)
            # or final (PermanentError / a returned response).
            res: httpx.Response
            try:
                res = await func()

                for code in retries.status_codes:
                    if "X" in code.upper():
                        # Wildcard form such as "5XX": retry on any status in
                        # that hundred-range (500-599 for "5XX").
                        code_range = int(code[0])

                        # Float division keeps e.g. 503 inside [5, 6).
                        status_major = res.status_code / 100

                        if code_range <= status_major < code_range + 1:
                            raise TemporaryError(res)
                    else:
                        # Exact status code match, e.g. "429".
                        parsed_code = int(code)

                        if res.status_code == parsed_code:
                            raise TemporaryError(res)
            except httpx.ConnectError as exception:
                if retries.config.retry_connection_errors:
                    # NOTE(review): re-raised unchanged; presumably
                    # retry_with_backoff_async treats this as retryable -- confirm.
                    raise

                raise PermanentError(exception) from exception
            except httpx.TimeoutException as exception:
                if retries.config.retry_connection_errors:
                    raise

                raise PermanentError(exception) from exception
            except TemporaryError:
                # Already classified as retryable; propagate as-is.
                raise
            except Exception as exception:
                # Anything unexpected is terminal.
                raise PermanentError(exception) from exception

            return res

        return await retry_with_backoff_async(
            do_request,
            retries.config.backoff.initial_interval,
            retries.config.backoff.max_interval,
            retries.config.backoff.exponent,
            retries.config.backoff.max_elapsed_time,
        )

    return await func()

retry_on_rate_limit

retry_on_rate_limit(max_retries=3, backoff_factor=1.0)

Decorator to retry function calls on rate limit errors.

Parameters:

Name Type Description Default
max_retries int

Maximum number of retry attempts

3
backoff_factor float

Factor for exponential backoff

1.0
Source code in src/griddy/core/utils/retries.py
def retry_on_rate_limit(max_retries: int = 3, backoff_factor: float = 1.0) -> Callable:
    """
    Decorator to retry function calls on rate limit errors.

    Args:
        max_retries: Maximum number of retry attempts
        backoff_factor: Factor for exponential backoff
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            # Imported lazily to avoid a circular import at module load.
            from griddy.core.exceptions import RateLimitError

            attempt = 0
            while attempt <= max_retries:
                try:
                    return func(*args, **kwargs)
                except RateLimitError as err:
                    if attempt == max_retries:
                        raise

                    # Exponential backoff, never shorter than the
                    # server-suggested retry-after interval.
                    delay = backoff_factor * (2**attempt)
                    if err.retry_after:
                        delay = max(delay, err.retry_after)

                    time.sleep(delay)
                attempt += 1

            return func(*args, **kwargs)  # This should never be reached

        return wrapper

    return decorator

get_security

get_security(security)

Extract security headers and query params from a Pydantic security model.

Source code in src/griddy/core/utils/security.py
def get_security(security: Any) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
    """Extract security headers and query params from a Pydantic security model."""
    headers: Dict[str, str] = {}
    query_params: Dict[str, List[str]] = {}

    if security is None:
        return headers, query_params

    if not isinstance(security, BaseModel):
        raise TypeError("security must be a pydantic model")

    model_fields: Dict[str, FieldInfo] = security.__class__.model_fields
    for field_name, field_info in model_fields.items():
        value = getattr(security, field_name)
        if value is None:
            continue

        metadata = find_field_metadata(field_info, SecurityMetadata)
        if metadata is None:
            continue
        if metadata.option:
            # A security "option" short-circuits: parse it and stop.
            _parse_security_option(headers, query_params, value)
            return headers, query_params
        if metadata.scheme:
            # Special case for basic/custom auth, which may arrive as a
            # flattened model rather than a nested one.
            if metadata.sub_type in ["basic", "custom"] and not isinstance(
                value, BaseModel
            ):
                _parse_security_scheme(
                    headers, query_params, metadata, field_name, security
                )
            else:
                _parse_security_scheme(
                    headers, query_params, metadata, field_name, value
                )

    return headers, query_params

get_security_from_env

get_security_from_env(
    security, security_class, env_mapping=None
)

Resolve security from environment variables if not already set.

Parameters:

Name Type Description Default
security Any

Existing security instance, or None.

required
security_class Any

The Pydantic security model class.

required
env_mapping Optional[Dict[str, str]]

Dict mapping field names to environment variable names. If None, no env vars are checked.

None
Source code in src/griddy/core/utils/security.py
def get_security_from_env(
    security: Any,
    security_class: Any,
    env_mapping: Optional[Dict[str, str]] = None,
) -> Optional[BaseModel]:
    """Resolve security from environment variables if not already set.

    Args:
        security: Existing security instance, or None.
        security_class: The Pydantic security model class.
        env_mapping: Dict mapping field names to environment variable names.
            If None, no env vars are checked.
    """
    # An explicitly provided security object always wins.
    if security is not None:
        return security

    if not issubclass(security_class, BaseModel):
        raise TypeError("security_class must be a pydantic model class")

    if env_mapping is None:
        return None

    import os

    collected: Dict[str, str] = {}
    for field_name, env_var in env_mapping.items():
        env_value = os.getenv(env_var)
        if env_value:  # skips unset and empty values alike
            collected[field_name] = env_value

    if not collected:
        return None
    return security_class(**collected)

get_pydantic_model

get_pydantic_model(data, typ)

Return data as-is if it already contains Pydantic models, otherwise unmarshal.

Source code in src/griddy/core/utils/serializers.py
def get_pydantic_model(data: Any, typ: Any) -> Any:
    """Return data as-is if it already contains Pydantic models, otherwise unmarshal."""
    if _contains_pydantic_model(data):
        return data
    return unmarshal(data, typ)

marshal_json

marshal_json(val, typ)

Serialize a value to a JSON string using a Pydantic model wrapper.

Source code in src/griddy/core/utils/serializers.py
def marshal_json(val: Any, typ: Any) -> str:
    """Serialize a value to a JSON string using a Pydantic model wrapper."""
    if is_nullable(typ) and val is None:
        return "null"

    # Wrap the value in a throwaway model so Pydantic drives serialization.
    wrapper = create_model(
        "Marshaller",
        body=(typ, ...),
        __config__=ConfigDict(populate_by_name=True, arbitrary_types_allowed=True),
    )
    dumped = wrapper(body=val).model_dump(by_alias=True, mode="json", exclude_none=True)

    if not dumped:
        return ""

    # The dump has a single wrapper key; emit its value compactly.
    return json.dumps(dumped[next(iter(dumped))], separators=(",", ":"))

serialize_decimal

serialize_decimal(as_str)

Return a Pydantic serializer for Decimal values.

Source code in src/griddy/core/utils/serializers.py
def serialize_decimal(as_str: bool) -> Callable:
    """Return a Pydantic serializer for Decimal values.

    Args:
        as_str: When True serialize as a string, otherwise as a float.
    """

    def serialize(d):
        # Optional[T] is a Union[T, None]: pass a genuine None through.
        if is_union(type(d)) and type(None) in get_args(type(d)) and d is None:
            return None
        # Leave Unset sentinels untouched for downstream handling.
        if isinstance(d, Unset):
            return d

        if not isinstance(d, Decimal):
            raise ValueError("Expected Decimal object")

        if as_str:
            return str(d)
        return float(d)

    return serialize

serialize_float

serialize_float(as_str)

Return a Pydantic serializer for float values.

Source code in src/griddy/core/utils/serializers.py
def serialize_float(as_str: bool) -> Callable:
    """Return a Pydantic serializer for float values.

    Args:
        as_str: When True serialize as a string, otherwise unchanged.
    """

    def serialize(f):
        # Optional[T] is a Union[T, None]: pass a genuine None through.
        if is_union(type(f)) and type(None) in get_args(type(f)) and f is None:
            return None
        # Leave Unset sentinels untouched for downstream handling.
        if isinstance(f, Unset):
            return f

        if not isinstance(f, float):
            raise ValueError("Expected float")

        if as_str:
            return str(f)
        return f

    return serialize

serialize_int

serialize_int(as_str)

Return a Pydantic serializer for int values.

Source code in src/griddy/core/utils/serializers.py
def serialize_int(as_str: bool) -> Callable:
    """Return a Pydantic serializer for int values.

    Args:
        as_str: When True serialize as a string, otherwise unchanged.
    """

    def serialize(i):
        # Optional[T] is a Union[T, None]: pass a genuine None through.
        if is_union(type(i)) and type(None) in get_args(type(i)) and i is None:
            return None
        # Leave Unset sentinels untouched for downstream handling.
        if isinstance(i, Unset):
            return i

        if not isinstance(i, int):
            raise ValueError("Expected int")

        if as_str:
            return str(i)
        return i

    return serialize

stream_to_bytes

stream_to_bytes(stream)

Read a streaming HTTP response into bytes.

Source code in src/griddy/core/utils/serializers.py
def stream_to_bytes(stream: httpx.Response) -> bytes:
    """Read a streaming HTTP response into bytes."""
    body = stream.content
    return body

stream_to_bytes_async async

stream_to_bytes_async(stream)

Async read a streaming HTTP response into bytes.

Source code in src/griddy/core/utils/serializers.py
async def stream_to_bytes_async(stream: httpx.Response) -> bytes:
    """Async read a streaming HTTP response into bytes."""
    body = await stream.aread()
    return body

stream_to_text

stream_to_text(stream)

Read a streaming HTTP response into a single text string.

Source code in src/griddy/core/utils/serializers.py
def stream_to_text(stream: httpx.Response) -> str:
    """Read a streaming HTTP response into a single text string."""
    chunks = stream.iter_text()
    return "".join(chunks)

stream_to_text_async async

stream_to_text_async(stream)

Async read a streaming HTTP response into a single text string.

Source code in src/griddy/core/utils/serializers.py
async def stream_to_text_async(stream: httpx.Response) -> str:
    """Async read a streaming HTTP response into a single text string."""
    parts = []
    async for chunk in stream.aiter_text():
        parts.append(chunk)
    return "".join(parts)

unmarshal

unmarshal(val, typ)

Coerce a parsed value into the given Pydantic-compatible type.

Source code in src/griddy/core/utils/serializers.py
def unmarshal(val: Any, typ: Any) -> Any:
    """Coerce a parsed value into the given Pydantic-compatible type."""
    # A throwaway model lets Pydantic perform the validation/coercion.
    wrapper = create_model(
        "Unmarshaller",
        body=(typ, ...),
        __config__=ConfigDict(populate_by_name=True, arbitrary_types_allowed=True),
    )
    return wrapper(body=val).body  # type: ignore[attr-defined]

unmarshal_json

unmarshal_json(raw, typ)

Deserialize a raw JSON bytes/string into the given type.

Source code in src/griddy/core/utils/serializers.py
def unmarshal_json(raw: bytes | str, typ: Any) -> Any:
    """Deserialize a raw JSON bytes/string into the given type."""
    parsed = from_json(raw)
    return unmarshal(parsed, typ)

validate_const

validate_const(v)

Return a validator that enforces a constant value.

Source code in src/griddy/core/utils/serializers.py
def validate_const(v: Any) -> Callable:
    """Return a validator that enforces a constant value.

    Args:
        v: The only value (besides an Optional None) the validator accepts.
    """

    def validate(c):
        # Optional[T] is a Union[T, None]: pass a genuine None through.
        if is_union(type(c)) and type(None) in get_args(type(c)) and c is None:
            return None

        if v != c:
            raise ValueError(f"Expected {v}")
        return c

    return validate

validate_decimal

validate_decimal(d)

Validate and coerce a value to Decimal.

Source code in src/griddy/core/utils/serializers.py
def validate_decimal(d: Any) -> Decimal | None:
    """Validate and coerce a value to Decimal."""
    if d is None:
        return None

    # Already a Decimal (or an Unset sentinel): nothing to do.
    if isinstance(d, (Decimal, Unset)):
        return d

    if isinstance(d, (str, int, float)):
        # Round-trip through str so floats keep their printed precision.
        return Decimal(str(d))

    raise ValueError("Expected string, int or float")

validate_float

validate_float(f)

Validate and coerce a value to float.

Source code in src/griddy/core/utils/serializers.py
def validate_float(f: Any) -> float | None:
    """Validate and coerce a value to float."""
    if f is None:
        return None

    # Already a float (or an Unset sentinel): nothing to do.
    if isinstance(f, (float, Unset)):
        return f

    if isinstance(f, str):
        return float(f)

    raise ValueError("Expected string")

validate_int

validate_int(b)

Validate and coerce a value to int.

Source code in src/griddy/core/utils/serializers.py
def validate_int(b: Any) -> int | None:
    """Validate and coerce a value to int."""
    if b is None:
        return None

    # Already an int (or an Unset sentinel): nothing to do.
    if isinstance(b, (int, Unset)):
        return b

    if isinstance(b, str):
        return int(b)

    raise ValueError("Expected string")

validate_open_enum

validate_open_enum(is_int)

Return a validator that accepts int or str enum values.

Source code in src/griddy/core/utils/serializers.py
def validate_open_enum(is_int: bool) -> Callable:
    """Return a validator that accepts int or str enum values.

    Args:
        is_int: When True the validator expects ints, otherwise strs.

    Returns:
        A validator callable that passes None/Unset through and raises
        ValueError when the value is not of the expected type.
    """
    # Resolve the expected type (and its error text) once, outside the closure.
    expected = int if is_int else str
    error_msg = "Expected int" if is_int else "Expected string"

    def validate(e: Any) -> Any:
        if e is None:
            return None

        if isinstance(e, Unset):
            return e

        if not isinstance(e, expected):
            raise ValueError(error_msg)

        return e

    return validate

build_url

build_url(base_url, path, params=None)

Build URL from base URL, path, and parameters.

Parameters:

Name Type Description Default
base_url str

Base URL

required
path str

URL path

required
params dict[str, Any] | None

Query parameters

None

Returns:

Type Description
str

Complete URL

Source code in src/griddy/core/utils/url.py
def build_url(base_url: str, path: str, params: dict[str, any] | None = None) -> str:
    """
    Build URL from base URL, path, and parameters.

    Args:
        base_url: Base URL
        path: URL path
        params: Query parameters

    Returns:
        Complete URL
    """
    # Ensure base_url doesn't end with slash and path starts without slash
    base_url = base_url.rstrip("/")
    path = path.lstrip("/")

    url = f"{base_url}/{path}" if path else base_url

    if params:
        # Filter out None values
        filtered_params = {k: v for k, v in params.items() if v is not None}
        if filtered_params:
            from urllib.parse import urlencode

            url += f"?{urlencode(filtered_params)}"

    return url

generate_url

generate_url(server_url, path, path_params, gbls=None)

Generate a full URL by interpolating path params into the path template.

Source code in src/griddy/core/utils/url.py
def generate_url(
    server_url: str,
    path: str,
    path_params: Any,
    gbls: Optional[Any] = None,
) -> str:
    """Generate a full URL by interpolating path params into the path template.

    Params from ``path_params`` take precedence; globals fill in only the
    params not already populated.
    """
    resolved: Dict[str, str] = {}

    # Collect params from the request object first; remember which names the
    # globals should no longer override.
    consumed = _populate_path_params(path_params, gbls, resolved, [])
    if _is_set(gbls):
        _populate_path_params(gbls, None, resolved, consumed)

    for name, val in resolved.items():
        # Replace only the first occurrence of each placeholder.
        path = path.replace("{" + name + "}", val, 1)

    return remove_suffix(server_url, "/") + path

remove_suffix

remove_suffix(input_string, suffix)

Remove a suffix from a string if present.

Source code in src/griddy/core/utils/url.py
def remove_suffix(input_string: str, suffix: str) -> str:
    """Remove a suffix from a string if present.

    Args:
        input_string: The string to trim.
        suffix: The suffix to strip when present.

    Returns:
        The string without the suffix, or unchanged if the suffix is
        empty or absent.
    """
    # str.removesuffix (3.9+) is a no-op for an empty or absent suffix,
    # matching the previous guard-based implementation exactly.
    return input_string.removesuffix(suffix)

template_url

template_url(url_with_params, params)

Replace {key} placeholders in a URL with the given param values.

Source code in src/griddy/core/utils/url.py
def template_url(url_with_params: str, params: Dict[str, str]) -> str:
    """Replace {key} placeholders in a URL with the given param values.

    Every occurrence of each placeholder is substituted.
    """
    result = url_with_params
    for name, val in params.items():
        result = result.replace("{" + name + "}", val)
    return result

cast_partial

cast_partial(typ)

Return a partial of typing.cast for the given type.

Source code in src/griddy/core/utils/values.py
def cast_partial(typ: Any) -> Callable:
    """Return a callable that applies typing.cast with the given type.

    The returned callable is a pure pass-through at runtime, since
    ``typing.cast`` only informs static type checkers.
    """

    def _cast(value: Any) -> Any:
        return cast(typ, value)

    return _cast

get_global_from_env

get_global_from_env(value, env_key, type_cast)

Return value if set, otherwise read from environment and cast.

Source code in src/griddy/core/utils/values.py
def get_global_from_env(
    value: Optional[T], env_key: str, type_cast: Callable[[str], T]
) -> Optional[T]:
    """Return value if set, otherwise read from environment and cast.

    A malformed environment value (ValueError during cast) is treated
    the same as an unset variable: None is returned.
    """
    if value is not None:
        return value

    raw = os.getenv(env_key)
    if raw is None:
        return None

    try:
        return type_cast(raw)
    except ValueError:
        # Unparseable env value — fall through to the "not configured" result.
        return None

match_content_type

match_content_type(content_type, pattern)

Check if a content type matches a pattern (exact, wildcard, or partial).

Source code in src/griddy/core/utils/values.py
def match_content_type(content_type: str, pattern: str) -> bool:
    """Check if a content type matches a pattern (exact, wildcard, or partial).

    Accepts exact matches, the global wildcards "*" and "*/*", and one-sided
    wildcards like "application/*" or "*/json". Media-type parameters
    (e.g. "; charset=utf-8") are ignored during comparison.
    """
    # Cheap checks first: exact raw match or a global wildcard pattern.
    if pattern in (content_type, "*", "*/*"):
        return True

    # Parse through email.message to normalize and drop parameters.
    msg = Message()
    msg["content-type"] = content_type
    media_type = msg.get_content_type()

    if media_type == pattern:
        return True

    main_sub = media_type.split("/")
    return len(main_sub) == 2 and pattern in (f"{main_sub[0]}/*", f"*/{main_sub[1]}")

match_response

match_response(response, code, content_type)

Check if an HTTP response matches the given status codes and content type.

Source code in src/griddy/core/utils/values.py
def match_response(
    response: Response, code: Union[str, List[str]], content_type: str
) -> bool:
    """Check if an HTTP response matches the given status codes and content type.

    A single code is treated as a one-element list. When the response carries
    no content-type header, "application/octet-stream" is assumed.
    """
    if isinstance(code, list):
        codes = code
    else:
        codes = [code]

    if not match_status_codes(codes, response.status_code):
        return False

    actual_type = response.headers.get("content-type", "application/octet-stream")
    return match_content_type(actual_type, content_type)

match_status_codes

match_status_codes(status_codes, status_code)

Check if a status code matches any of the given patterns (exact or wildcard like 4XX).

Source code in src/griddy/core/utils/values.py
def match_status_codes(status_codes: List[str], status_code: int) -> bool:
    """Check if a status code matches any of the given patterns (exact or wildcard like 4XX)."""
    # "default" acts as a catch-all pattern.
    if "default" in status_codes:
        return True

    actual = str(status_code)
    return any(
        pattern == actual
        or (pattern.endswith("XX") and pattern.startswith(actual[:1]))
        for pattern in status_codes
    )