Source code for marshmallow_sqlalchemy.convert

from __future__ import annotations

import functools
import inspect
import uuid
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Literal,
    Union,
    cast,
    overload,
)

# Remove when dropping Python 3.9
try:
    from typing import TypeAlias, TypeGuard
except ImportError:
    from typing_extensions import TypeAlias, TypeGuard

import marshmallow as ma
import sqlalchemy as sa
from marshmallow import fields, validate
from sqlalchemy.dialects import mssql, mysql, postgresql
from sqlalchemy.orm import SynonymProperty

from .exceptions import ModelConversionError
from .fields import Related, RelatedList

if TYPE_CHECKING:
    from collections.abc import Iterable

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from sqlalchemy.orm import MapperProperty
    from sqlalchemy.types import TypeEngine

    PropertyOrColumn: TypeAlias = MapperProperty | sa.Column

_FieldPartial: TypeAlias = Callable[[], fields.Field]
# TODO: Use more specific type for second argument
_FieldClassFactory: TypeAlias = Callable[
    ["ModelConverter", Any], Union[type[fields.Field], _FieldPartial]
]


def _is_field(value: Any) -> TypeGuard[type[fields.Field]]:
    return isinstance(value, type) and issubclass(value, fields.Field)


def _base_column(column):
    """Unwrap proxied columns"""
    if column not in column.base_columns and len(column.base_columns) == 1:
        [base] = column.base_columns
        return base
    return column


def _has_default(column) -> bool:
    return (
        column.default is not None
        or column.server_default is not None
        or _is_auto_increment(column)
    )


def _is_auto_increment(column) -> bool:
    return column.table is not None and column is column.table._autoincrement_column


def _list_field_factory(
    converter: ModelConverter, data_type: postgresql.ARRAY
) -> Callable[[], fields.List]:
    FieldClass = converter._get_field_class_for_data_type(data_type.item_type)
    inner = FieldClass()
    if not data_type.dimensions or data_type.dimensions == 1:
        return functools.partial(fields.List, inner)

    # For multi-dimensional arrays, nest the Lists
    dimensions = data_type.dimensions
    for _ in range(dimensions - 1):
        inner = fields.List(inner)

    return functools.partial(fields.List, inner)


def _enum_field_factory(
    converter: ModelConverter, data_type: sa.Enum
) -> Callable[[], fields.Field]:
    return (
        functools.partial(fields.Enum, enum=data_type.enum_class)
        if data_type.enum_class
        else fields.Raw
    )


[docs] class ModelConverter: """Converts a SQLAlchemy model into a dictionary of corresponding marshmallow `Fields <marshmallow.fields.Field>`. """ SQLA_TYPE_MAPPING: dict[ type[TypeEngine], type[fields.Field] | _FieldClassFactory ] = { sa.Enum: _enum_field_factory, sa.JSON: fields.Raw, sa.ARRAY: _list_field_factory, sa.PickleType: fields.Raw, postgresql.BIT: fields.Integer, postgresql.OID: fields.Integer, postgresql.UUID: fields.UUID, postgresql.MACADDR: fields.String, postgresql.INET: fields.String, postgresql.CIDR: fields.String, postgresql.JSON: fields.Raw, postgresql.JSONB: fields.Raw, postgresql.HSTORE: fields.Raw, postgresql.ARRAY: _list_field_factory, postgresql.MONEY: fields.Decimal, postgresql.DATE: fields.Date, postgresql.TIME: fields.Time, mysql.BIT: fields.Integer, mysql.YEAR: fields.Integer, mysql.SET: fields.List, mysql.ENUM: fields.Field, mysql.INTEGER: fields.Integer, mysql.DATETIME: fields.DateTime, mssql.BIT: fields.Integer, mssql.UNIQUEIDENTIFIER: fields.UUID, } DIRECTION_MAPPING = {"MANYTOONE": False, "MANYTOMANY": True, "ONETOMANY": True} def __init__(self, schema_cls: type[ma.Schema] | None = None): self.schema_cls = schema_cls @property def type_mapping(self) -> dict[type, type[fields.Field]]: if self.schema_cls: return self.schema_cls.TYPE_MAPPING return ma.Schema.TYPE_MAPPING
[docs] def fields_for_model( self, model: type[DeclarativeMeta], *, include_fk: bool = False, include_relationships: bool = False, fields: Iterable[str] | None = None, exclude: Iterable[str] | None = None, base_fields: dict | None = None, dict_cls: type[dict] = dict, ) -> dict[str, fields.Field]: """Generate a dict of field_name: `marshmallow.fields.Field` pairs for the given model. Note: SynonymProperties are ignored. Use an explicit field if you want to include a synonym. :param model: The SQLAlchemy model :param bool include_fk: Whether to include foreign key fields in the output. :param bool include_relationships: Whether to include relationships fields in the output. :return: dict of field_name: Field instance pairs """ result = dict_cls() base_fields = base_fields or {} for prop in sa.inspect(model).attrs: # type: ignore[union-attr] key = self._get_field_name(prop) if self._should_exclude_field(prop, fields=fields, exclude=exclude): # Allow marshmallow to validate and exclude the field key. result[key] = None continue if isinstance(prop, SynonymProperty): continue if hasattr(prop, "columns"): if not include_fk: # Only skip a column if there is no overriden column # which does not have a Foreign Key. for column in prop.columns: if not column.foreign_keys: break else: continue if not include_relationships and hasattr(prop, "direction"): continue field = base_fields.get(key) or self.property2field(prop) if field: result[key] = field return result
def fields_for_table( self, table: sa.Table, *, include_fk: bool = False, fields: Iterable[str] | None = None, exclude: Iterable[str] | None = None, base_fields: dict | None = None, dict_cls: type[dict] = dict, ) -> dict[str, fields.Field]: result = dict_cls() base_fields = base_fields or {} for column in table.columns: key = self._get_field_name(column) if self._should_exclude_field(column, fields=fields, exclude=exclude): # Allow marshmallow to validate and exclude the field key. result[key] = None continue if not include_fk and column.foreign_keys: continue # Overridden fields are specified relative to key generated by # self._get_key_for_column(...), rather than keys in source model field = base_fields.get(key) or self.column2field(column) if field: result[key] = field return result @overload def property2field( self, prop: MapperProperty, *, instance: Literal[True] = ..., field_class: type[fields.Field] | None = ..., **kwargs, ) -> fields.Field: ... @overload def property2field( self, prop: MapperProperty, *, instance: Literal[False] = ..., field_class: type[fields.Field] | None = ..., **kwargs, ) -> type[fields.Field]: ...
[docs] def property2field( self, prop: MapperProperty, *, instance: bool = True, field_class: type[fields.Field] | None = None, **kwargs, ) -> fields.Field | type[fields.Field]: """Convert a SQLAlchemy `Property` to a field instance or class. :param Property prop: SQLAlchemy Property. :param bool instance: If `True`, return `Field` instance, computing relevant kwargs from the given property. If `False`, return the `Field` class. :param kwargs: Additional keyword arguments to pass to the field constructor. :return: A `marshmallow.fields.Field` class or instance. """ # handle synonyms # Attribute renamed "_proxied_object" in 1.4 for attr in ("_proxied_property", "_proxied_object"): proxied_obj = getattr(prop, attr, None) if proxied_obj is not None: prop = proxied_obj field_class = field_class or self._get_field_class_for_property(prop) if not instance: return field_class field_kwargs = self._get_field_kwargs_for_property(prop) field_kwargs.update(kwargs) ret = field_class(**field_kwargs) if ( hasattr(prop, "direction") and self.DIRECTION_MAPPING[prop.direction.name] and prop.uselist is True ): ret = RelatedList(ret, **{**self.get_base_kwargs(), **kwargs}) return ret
@overload def column2field( self, column, *, instance: Literal[True] = ..., **kwargs ) -> fields.Field: ... @overload def column2field( self, column, *, instance: Literal[False] = ..., **kwargs ) -> type[fields.Field]: ...
[docs] def column2field( self, column, *, instance: bool = True, **kwargs ) -> fields.Field | type[fields.Field]: """Convert a SQLAlchemy `Column <sqlalchemy.schema.Column>` to a field instance or class. :param sqlalchemy.schema.Column column: SQLAlchemy Column. :param bool instance: If `True`, return `Field` instance, computing relevant kwargs from the given property. If `False`, return the `Field` class. :return: A `marshmallow.fields.Field` class or instance. """ field_class = self._get_field_class_for_column(column) if not instance: return field_class field_kwargs = self.get_base_kwargs() self._add_column_kwargs(field_kwargs, column) return field_class(**{**field_kwargs, **kwargs})
@overload def field_for( self, model: type[DeclarativeMeta], property_name: str, *, instance: Literal[True] = ..., field_class: type[fields.Field] | None = ..., **kwargs, ) -> fields.Field: ... @overload def field_for( self, model: type[DeclarativeMeta], property_name: str, *, instance: Literal[False] = ..., field_class: type[fields.Field] | None = None, **kwargs, ) -> type[fields.Field]: ...
[docs] def field_for( self, model: type[DeclarativeMeta], property_name: str, *, instance: bool = True, field_class: type[fields.Field] | None = None, **kwargs, ) -> fields.Field | type[fields.Field]: """Convert a property for a mapped SQLAlchemy class to a marshmallow `Field`. Example: :: date_created = field_for(Author, "date_created", dump_only=True) author = field_for(Book, "author") :param type model: A SQLAlchemy mapped class. :param str property_name: The name of the property to convert. :param kwargs: Extra keyword arguments to pass to `property2field` :return: A `marshmallow.fields.Field` class or instance. """ target_model = model prop_name = property_name attr = getattr(model, property_name) remote_with_local_multiplicity = False if hasattr(attr, "remote_attr"): target_model = attr.target_class prop_name = attr.value_attr remote_with_local_multiplicity = attr.local_attr.prop.uselist prop: MapperProperty = sa.inspect(target_model).attrs.get(prop_name) # type: ignore[union-attr] converted_prop = self.property2field( prop, # To satisfy type checking, need to pass a literal bool instance=True if instance else False, # noqa: SIM210 field_class=field_class, **kwargs, ) if remote_with_local_multiplicity: return RelatedList(converted_prop, **{**self.get_base_kwargs(), **kwargs}) return converted_prop
def _get_field_name(self, prop_or_column: PropertyOrColumn) -> str: return prop_or_column.key def _get_field_class_for_column(self, column: sa.Column) -> type[fields.Field]: return self._get_field_class_for_data_type(column.type) def _get_field_class_for_data_type( self, data_type: TypeEngine ) -> type[fields.Field]: field_cls: type[fields.Field] | _FieldPartial | None = None types = inspect.getmro(type(data_type)) # First search for a field class from self.SQLA_TYPE_MAPPING for col_type in types: if col_type in self.SQLA_TYPE_MAPPING: field_or_factory = self.SQLA_TYPE_MAPPING[col_type] if _is_field(field_or_factory): field_cls = field_or_factory else: field_cls = cast(_FieldClassFactory, field_or_factory)( self, data_type ) break else: # Try to find a field class based on the column's python_type try: python_type = data_type.python_type except NotImplementedError: python_type = None if python_type in self.type_mapping: field_cls = self.type_mapping[python_type] else: if hasattr(data_type, "impl"): return self._get_field_class_for_data_type(data_type.impl) raise ModelConversionError( f"Could not find field column of type {types[0]}." ) return cast(type[fields.Field], field_cls) def _get_field_class_for_property(self, prop) -> type[fields.Field]: field_cls: type[fields.Field] if hasattr(prop, "direction"): field_cls = Related else: column = _base_column(prop.columns[0]) field_cls = self._get_field_class_for_column(column) return field_cls def _get_field_kwargs_for_property(self, prop: PropertyOrColumn) -> dict[str, Any]: kwargs = self.get_base_kwargs() if hasattr(prop, "columns"): column = _base_column(prop.columns[0]) self._add_column_kwargs(kwargs, column) prop = column if hasattr(prop, "direction"): # Relationship property self._add_relationship_kwargs(kwargs, prop) if getattr(prop, "doc", None): # Useful for documentation generation kwargs["metadata"]["description"] = prop.doc return kwargs def _add_column_kwargs(self, kwargs: dict[str, Any], column: sa.Column) -> None: """Add keyword arguments to kwargs (in-place) based on the passed in `Column <sqlalchemy.schema.Column>`. """ if hasattr(column, "nullable"): if column.nullable: kwargs["allow_none"] = True kwargs["required"] = not column.nullable and not _has_default(column) # If there is no nullable attribute, we are dealing with a property # that does not derive from the Column class. Mark as dump_only. else: kwargs["dump_only"] = True if hasattr(column.type, "enum_class") and column.type.enum_class is not None: kwargs["enum"] = column.type.enum_class elif hasattr(column.type, "enums") and not kwargs.get("dump_only"): kwargs["validate"].append(validate.OneOf(choices=column.type.enums)) # Add a length validator if a max length is set on the column # Skip UUID columns # (see https://github.com/marshmallow-code/marshmallow-sqlalchemy/issues/54) if hasattr(column.type, "length") and not kwargs.get("dump_only"): column_length = column.type.length if column_length is not None: try: python_type = column.type.python_type except (AttributeError, NotImplementedError): python_type = None if not python_type or not issubclass(python_type, uuid.UUID): kwargs["validate"].append(validate.Length(max=column_length)) if getattr(column.type, "asdecimal", False): kwargs["places"] = getattr(column.type, "scale", None) def _add_relationship_kwargs( self, kwargs: dict[str, Any], prop: PropertyOrColumn ) -> None: """Add keyword arguments to kwargs (in-place) based on the passed in relationship `Property`. """ nullable = True for pair in prop.local_remote_pairs: if not pair[0].nullable: if ( prop.uselist is True or self.DIRECTION_MAPPING[prop.direction.name] is False ): nullable = False break kwargs.update({"allow_none": nullable, "required": not nullable}) def _should_exclude_field( self, column: PropertyOrColumn, fields: Iterable[str] | None = None, exclude: Iterable[str] | None = None, ) -> bool: key = self._get_field_name(column) if fields and key not in fields: return True return bool(exclude and key in exclude) def get_base_kwargs(self): return {"validate": [], "metadata": {}}
default_converter = ModelConverter() fields_for_model = default_converter.fields_for_model property2field = default_converter.property2field column2field = default_converter.column2field field_for = default_converter.field_for