"""
----------------------------------------------------------------------------
METADATA:
File: base.py
Project: paperap
Created: 2025-03-04
Version: 0.0.9
Author: Jess Mann
Email: jess@jmann.me
Copyright (c) 2025 Jess Mann
----------------------------------------------------------------------------
LAST MODIFIED:
2025-03-04 By Jess Mann
"""
from __future__ import annotations
import concurrent.futures
import logging
import threading
import time
import types
from abc import ABC, abstractmethod
from datetime import datetime
from decimal import Decimal
from enum import StrEnum
from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal, Self, TypedDict, cast, override
import pydantic
from pydantic import Field, PrivateAttr
from typing_extensions import TypeVar
from paperap.const import FilteringStrategies, ModelStatus
from paperap.exceptions import APIError, ConfigurationError, ReadOnlyFieldError, RequestError, ResourceNotFoundError
from paperap.models.abstract.meta import StatusContext
from paperap.signals import registry
if TYPE_CHECKING:
    # Needed only for annotations; this branch is never executed at runtime.
    from paperap.client import PaperlessClient
    from paperap.resources.base import BaseResource, StandardResource

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ModelConfigType(TypedDict):
    """
    Shape of the configuration dictionary stored in ``BASE_MODEL_CONFIG``.

    Every key is required; ``extra`` only admits the literal ``"ignore"``.
    """

    populate_by_name: bool
    validate_assignment: bool
    validate_default: bool
    use_enum_values: bool
    extra: Literal["ignore"]
    arbitrary_types_allowed: bool
# Shared configuration values; BaseModel unpacks this dictionary into
# ``pydantic.ConfigDict`` to build its ``model_config``.
BASE_MODEL_CONFIG: ModelConfigType = {
    "populate_by_name": True,
    "validate_assignment": True,
    "validate_default": True,
    "use_enum_values": True,
    "extra": "ignore",
    "arbitrary_types_allowed": True,
}
class BaseModel(pydantic.BaseModel, ABC):
    """
    Base model for all Paperless-ngx API objects.

    Provides automatic serialization, deserialization, and API interactions
    with minimal configuration needed.

    Attributes:
        _meta: Metadata for the model, including filtering and resource information.
        _save_lock: Lock for saving operations.
        _pending_save: Future object for pending save operations.
        _save_executor: Lazily created thread pool used for background saves.
        _status: Current lifecycle status of the model.
        _original_data: Snapshot of the last data retrieved from the db.
        _saved_data: Snapshot of the last data sent to the db to be saved.
        _resource: The resource used to talk to the API for this model.

    Raises:
        ValueError: If resource is not provided.

    """

    _meta: ClassVar["Meta[Self]"]
    _save_lock: threading.RLock = PrivateAttr(default_factory=threading.RLock)
    _pending_save: concurrent.futures.Future[Any] | None = PrivateAttr(default=None)
    _save_executor: concurrent.futures.ThreadPoolExecutor | None = None
    # While the status is not READY, updating attributes will not trigger save()
    _status: ModelStatus = ModelStatus.INITIALIZING
    # The last data we retrieved from the db
    # this is used to calculate if the model is dirty
    _original_data: dict[str, Any] = {}
    # The last data we sent to the db to save
    # This is used to determine if the model has been changed in the time it took to perform a save
    _saved_data: dict[str, Any] = {}
    _resource: "BaseResource[Self]"

    @override
    def __init_subclass__(cls, **kwargs: Any) -> None:
        """
        Initialize subclass and set up metadata.

        Ensures the subclass has its own ``Meta`` class, merges the collection
        options declared on the direct bases' ``Meta`` classes into it, and
        instantiates ``cls._meta``.

        Args:
            **kwargs: Additional keyword arguments.

        Raises:
            ConfigurationError: If no ``Meta`` class is found on any ancestor.

        """
        super().__init_subclass__(**kwargs)

        # Ensure the subclass has its own Meta definition.
        # If the subclass hasn't defined its own Meta, auto-generate one
        # inheriting from the nearest ancestor that explicitly defines Meta.
        if "Meta" not in cls.__dict__:
            top_meta: type[BaseModel.Meta[Self]] | None = None
            # Walk the MRO (skipping cls itself) and stop at the first explicit Meta.
            for base in cls.__mro__[1:]:
                if "Meta" in base.__dict__:
                    top_meta = cast("type[BaseModel.Meta[Self]]", base.Meta)
                    break

            if top_meta is None:
                # This should never happen.
                raise ConfigurationError(f"Meta class not found in {cls.__name__} or its bases")

            # Create a new Meta class that inherits from that Meta.
            meta_attrs = {
                k: v
                for k, v in vars(top_meta).items()
                if not k.startswith("_")  # Avoid special attributes like __parameters__
            }
            cls.Meta = type("Meta", (top_meta,), meta_attrs)  # type: ignore # mypy complains about setting to a type
            logger.debug(
                "Auto-generated Meta for %s inheriting from %s",
                cls.__name__,
                top_meta.__name__,
            )

        # Append read_only_fields from all parents to Meta
        # Same with filtering_disabled
        # Retrieve filtering_fields from the attributes of the class
        read_only_fields = (cls.Meta.read_only_fields or set[str]()).copy()
        filtering_disabled = (cls.Meta.filtering_disabled or set[str]()).copy()
        filtering_fields = set(cls.__annotations__)
        # Work on copies: an auto-generated Meta shares these container objects
        # with the parent Meta, so updating them in place would leak values
        # from one class hierarchy into another.
        supported_filtering_params = cls.Meta.supported_filtering_params.copy()
        blacklist_filtering_params = cls.Meta.blacklist_filtering_params.copy()
        field_map = cls.Meta.field_map.copy()

        for base in cls.__bases__:
            _meta: BaseModel.Meta[Self] | None
            if _meta := getattr(base, "Meta", None):  # type: ignore # we are confident this is BaseModel.Meta
                if hasattr(_meta, "read_only_fields"):
                    read_only_fields.update(_meta.read_only_fields)
                if hasattr(_meta, "filtering_disabled"):
                    filtering_disabled.update(_meta.filtering_disabled)
                if hasattr(_meta, "filtering_fields"):
                    filtering_fields.update(_meta.filtering_fields)
                if hasattr(_meta, "supported_filtering_params"):
                    supported_filtering_params.update(_meta.supported_filtering_params)
                if hasattr(_meta, "blacklist_filtering_params"):
                    blacklist_filtering_params.update(_meta.blacklist_filtering_params)
                if hasattr(_meta, "field_map"):
                    field_map.update(_meta.field_map)

        cls.Meta.read_only_fields = read_only_fields
        cls.Meta.filtering_disabled = filtering_disabled
        # excluding filtering_disabled from filtering_fields
        cls.Meta.filtering_fields = filtering_fields - filtering_disabled
        cls.Meta.supported_filtering_params = supported_filtering_params
        cls.Meta.blacklist_filtering_params = blacklist_filtering_params
        cls.Meta.field_map = field_map

        # Instantiate _meta
        cls._meta = cls.Meta(cls)  # type: ignore # due to a mypy bug in version 1.15.0 (issue #18776)

        # Set name defaults
        if not hasattr(cls._meta, "name"):
            cls._meta.name = cls.__name__.lower()

    # Configure Pydantic behavior
    # type ignore because mypy complains about non-required keys
    model_config = pydantic.ConfigDict(**BASE_MODEL_CONFIG)  # type: ignore

    def __init__(self, **data: Any) -> None:
        """
        Initialize the model with data.

        The resource is not passed as an argument; the ``_resource`` attribute
        must already be available when a model is instantiated.

        Args:
            **data: Data to initialize the model.

        Raises:
            ValueError: If the model has no ``_resource`` attribute.

        """
        super().__init__(**data)

        if not hasattr(self, "_resource"):
            raise ValueError(f"Resource required. Initialize resource for {self.__class__.__name__} before instantiating models.")

    @property
    def _client(self) -> "PaperlessClient":
        """
        Get the client associated with this model.

        Returns:
            The PaperlessClient instance.

        """
        return self._resource.client

    @property
    def resource(self) -> "BaseResource[Self]":
        """
        Get the resource associated with this model.

        Returns:
            The resource stored in ``_resource``.

        """
        return self._resource

    @property
    def save_executor(self) -> concurrent.futures.ThreadPoolExecutor:
        """
        Get the thread pool used for background saves, creating it on first use.

        Returns:
            The ThreadPoolExecutor stored in ``_save_executor``.

        """
        if not self._save_executor:
            self._save_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5, thread_name_prefix="model_save_worker")
        return self._save_executor

    def cleanup(self) -> None:
        """Shut down the save executor (waiting for submitted work) and discard it."""
        if self._save_executor:
            self._save_executor.shutdown(wait=True)
            self._save_executor = None

    @override
    def model_post_init(self, __context: Any) -> None:
        """
        Record the initial field values and mark the model as ready.

        Args:
            __context: Context object forwarded to the parent implementation.

        """
        # Call the parent hook exactly once, before the snapshot is taken and
        # before the status becomes READY.
        super().model_post_init(__context)

        # Save original_data to support dirty fields
        self._original_data = self.model_dump()

        # Allow updating attributes to trigger save() automatically
        self._status = ModelStatus.READY

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Self:
        """
        Create a model instance from API response data.

        Args:
            data: Dictionary containing the API response data.

        Returns:
            A model instance initialized with the provided data.

        Examples:
            # Create a Document instance from API data
            doc = Document.from_dict(api_data)

        """
        return cls._resource.parse_to_model(data)

    def to_dict(
        self,
        *,
        include_read_only: bool = True,
        exclude_none: bool = False,
        exclude_unset: bool = True,
    ) -> dict[str, Any]:
        """
        Convert the model to a dictionary for API requests.

        Args:
            include_read_only: Whether to include read-only fields.
            exclude_none: Whether to exclude fields with None values.
            exclude_unset: Whether to exclude fields that are not set.

        Returns:
            A dictionary with model data ready for API submission.

        Examples:
            # Convert a Document instance to a dictionary
            data = doc.to_dict()

        """
        exclude: set[str] = set() if include_read_only else set(self._meta.read_only_fields)

        return self.model_dump(
            exclude=exclude,
            exclude_none=exclude_none,
            exclude_unset=exclude_unset,
        )

    def dirty_fields(self, comparison: Literal["saved", "db", "both"] = "both") -> dict[str, tuple[Any, Any]]:
        """
        Show which fields differ from a reference snapshot of the model.

        Args:
            comparison:
                Specify the data to compare ('saved', 'db' or 'both').
                Db is the last data retrieved from Paperless NGX
                Saved is the last data sent to Paperless NGX to be saved
                Both merges the two snapshots, preferring the db value
                for any field present in both.

        Returns:
            A dictionary {field: (reference_value, current_value)} of fields whose
            current value differs from the reference snapshot. ``id`` is never reported.

        """
        current_data = self.model_dump()
        current_data.pop("id", None)

        if comparison == "saved":
            compare_dict = self._saved_data
        elif comparison == "db":
            compare_dict = self._original_data
        else:
            # For 'both', merge the two snapshots.
            # Prefer original data (from DB) over saved data when both exist.
            # ID cannot change, and is not set before first save sometimes.
            compare_dict = {
                field: self._original_data.get(field, self._saved_data.get(field))
                for field in self._original_data.keys() | self._saved_data.keys()
                if field != "id"
            }

        return {
            field: (compare_dict.get(field), value)
            for field, value in current_data.items()
            if compare_dict.get(field) != value
        }

    def is_dirty(self, comparison: Literal["saved", "db", "both"] = "both") -> bool:
        """
        Check if any field has changed relative to a reference snapshot.

        Args:
            comparison:
                Specify the data to compare ('saved', 'db' or 'both').
                Db is the last data retrieved from Paperless NGX
                Saved is the last data sent to Paperless NGX to be saved

        Returns:
            True if the model is new or any field has changed.

        """
        if self.is_new():
            return True
        return bool(self.dirty_fields(comparison=comparison))

    @classmethod
    def create(cls, **kwargs: Any) -> Self:
        """
        Create a new model instance.

        Args:
            **kwargs: Field values to set.

        Returns:
            A new model instance.

        Examples:
            # Create a new Document instance
            doc = Document.create(filename="example.pdf", contents=b"PDF data")

        """
        return cls._resource.create(**kwargs)

    def delete(self) -> None:
        """Delete this model by delegating to its resource."""
        return self._resource.delete(self)

    def update_locally(self, *, from_db: bool | None = None, skip_changed_fields: bool = False, **kwargs: Any) -> None:
        """
        Update model attributes without triggering automatic save.

        Args:
            from_db: Whether the values come from the server. When true, read-only
                fields may be changed and the db snapshot is refreshed afterwards.
                None is treated as False.
            skip_changed_fields: When true, fields that differ from the last saved
                data are left untouched.
            **kwargs: Field values to update

        Raises:
            ReadOnlyFieldError: If ``from_db`` is false and a read-only field is
                given a value different from the db snapshot.

        """
        from_db = bool(from_db)

        # Avoid infinite saving loops
        with StatusContext(self, ModelStatus.UPDATING):
            # Ensure read-only fields were not changed
            if not from_db:
                for field in self._meta.read_only_fields:
                    if field in kwargs and kwargs[field] != self._original_data.get(field, None):
                        raise ReadOnlyFieldError(f"Cannot change read-only field {field}")

            # If the field contains unsaved changes, skip updating it
            # Determine unsaved changes based on the dirty fields before we last called save
            if skip_changed_fields:
                unsaved_changes = self.dirty_fields(comparison="saved")
                kwargs = {k: v for k, v in kwargs.items() if k not in unsaved_changes}

            for name, value in kwargs.items():
                setattr(self, name, value)

            # Dirty has been reset
            if from_db:
                self._original_data = self.model_dump()

    def update(self, **kwargs: Any) -> None:
        """
        Update this model with new values.

        Subclasses implement this with auto-saving features.
        However, base BaseModel instances simply call update_locally.

        Args:
            **kwargs: New field values.

        Examples:
            # Update a Document instance
            doc.update(filename="new_example.pdf")

        """
        # Since we have no id, we can't save. Therefore, all updates are silent updates
        # subclasses may implement this.
        self.update_locally(**kwargs)

    @abstractmethod
    def is_new(self) -> bool:
        """
        Check if this model represents a new (unsaved) object.

        Returns:
            True if the model is new, False otherwise.

        Examples:
            # Check if a Document instance is new
            is_new = doc.is_new()

        """

    def should_save_on_write(self) -> bool:
        """
        Check if the model should save on attribute write, factoring in the client settings.

        Returns:
            The model's own ``save_on_write`` meta option when it is not None,
            otherwise the client's ``save_on_write`` setting.

        """
        if self._meta.save_on_write is not None:
            return self._meta.save_on_write
        return self._resource.client.settings.save_on_write

    def enable_save_on_write(self) -> None:
        """
        Enable automatic saving on attribute write.

        NOTE(review): this writes to ``_meta``, which is assigned on the class in
        ``__init_subclass__``, so it presumably affects every instance of the
        class rather than only this one -- confirm.
        """
        self._meta.save_on_write = True

    def disable_save_on_write(self) -> None:
        """
        Disable automatic saving on attribute write.

        NOTE(review): this writes to ``_meta``, which is assigned on the class in
        ``__init_subclass__``, so it presumably affects every instance of the
        class rather than only this one -- confirm.
        """
        self._meta.save_on_write = False

    def matches_dict(self, data: dict[str, Any]) -> bool:
        """
        Check if the model matches the provided data.

        Args:
            data: Dictionary containing the data to compare.

        Returns:
            True if ``to_dict()`` equals the data, False otherwise.

        Examples:
            # Check if a Document instance matches API data
            matches = doc.matches_dict(api_data)

        """
        return self.to_dict() == data

    @override
    def __str__(self) -> str:
        """
        Human-readable string representation.

        Returns:
            The capitalized model name taken from the model metadata.

        """
        return f"{self._meta.name.capitalize()}"
class StandardModel(BaseModel, ABC):
    """
    Standard model for Paperless-ngx API objects with an ID field.

    Attributes:
        id: Unique identifier for the model. The default of 0 marks a model
            that has not been saved yet (see ``is_new``).

    """

    id: int = Field(description="Unique identifier from Paperless NGX", default=0)
    _resource: "StandardResource[Self]"  # type: ignore # override

    @property
    def resource(self) -> "StandardResource[Self]":  # type: ignore
        """
        Get the resource associated with this model.

        Returns:
            The resource stored in ``_resource``.

        """
        return self._resource

    @override
    def update(self, **kwargs: Any) -> None:
        """
        Update this model with new values and save changes.

        NOTE: new instances will not be saved automatically.
        (I'm not sure if that's the right design decision or not)

        Args:
            **kwargs: New field values.

        """
        # Hold off on saving until all updates are complete
        self.update_locally(**kwargs)
        if not self.is_new():
            self.save()

    def refresh(self) -> bool:
        """
        Refresh the model with the latest data from the server.

        Returns:
            True if the model data changes, False if the data does not change.

        Raises:
            ResourceNotFoundError: If the model has no id yet, or (presumably, raised
                by the resource) if it is not found on Paperless, e.g. it was deleted remotely.

        """
        if self.is_new():
            raise ResourceNotFoundError("Model does not have an id, so cannot be refreshed. Save first.")

        new_model = self._resource.get(self.id)

        if self == new_model:
            return False

        self.update_locally(from_db=True, **new_model.to_dict())
        return True

    def save(self, *, force: bool = False) -> bool:
        """
        Save this model instance; currently an alias for ``save_sync``.

        Args:
            force: Forwarded to ``save_sync``.

        Returns:
            The result of ``save_sync``.

        """
        return self.save_sync(force=force)

    def save_sync(self, *, force: bool = False) -> bool:
        """
        Save this model instance synchronously.

        Changes are sent to the server immediately, and the model is updated
        when the server responds. A new model (no id yet) is created through
        the resource instead of updated.

        Args:
            force: When true, skip the "already saving" and "not dirty" checks.

        Returns:
            True if the save was performed, False if it was skipped or the
            server result was unusable.

        Raises:
            ResourceNotFoundError: If the resource doesn't exist on the server
            RequestError: If there's a communication error with the server
            PermissionError: If the user doesn't have permission to update the resource

        """
        if self.is_new():
            model = self.create(**self.to_dict())
            self.update_locally(from_db=True, **model.to_dict())
            return True

        if not force:
            if self._status == ModelStatus.SAVING:
                logger.warning("Model is already saving, skipping save")
                return False

            # Only start a save if there are changes
            if not self.is_dirty():
                logger.warning("Model is not dirty, skipping save")
                return False

        with StatusContext(self, ModelStatus.SAVING):
            # Prepare and send the update to the server
            current_data = self.to_dict(include_read_only=False, exclude_none=False, exclude_unset=True)
            self._saved_data = {**current_data}

            registry.emit(
                "model.save:before",
                "Fired before the model data is sent to paperless ngx to be saved.",
                kwargs={"model": self, "current_data": current_data},
            )

            # NOTE(review): this call sits outside the try block below, so errors
            # raised by the request itself propagate to the caller without a
            # "model.save:error" signal -- confirm that is intended.
            new_model = self._resource.update(self)  # type: ignore # basedmypy complaining about self

            if not new_model:
                logger.warning(f"Result of save was none for model id {self.id}")
                return False

            if not isinstance(new_model, StandardModel):
                # This should never happen
                logger.error("Result of save was not a StandardModel instance")
                return False

            try:
                # Update the model with the server response
                new_data = new_model.to_dict()
                self.update_locally(from_db=True, **new_data)

                registry.emit(
                    "model.save:after",
                    "Fired after the model data is saved in paperless ngx.",
                    kwargs={"model": self, "updated_data": new_data},
                )

            except APIError as e:
                # Logged and signalled, but not re-raised; execution continues
                # to the final ``return True``.
                logger.error(f"API error during save of {self}: {e}")
                registry.emit(
                    "model.save:error",
                    "Fired when a network error occurs during save.",
                    kwargs={"model": self, "error": e},
                )

            except Exception as e:
                # Log unexpected errors but don't swallow them
                logger.exception(f"Unexpected error during save of {self}")
                registry.emit(
                    "model.save:error",
                    "Fired when an unexpected error occurs during save.",
                    kwargs={"model": self, "error": e},
                )
                # Re-raise so the executor can handle it properly
                raise

        return True

    def save_async(self, *, force: bool = False) -> bool:
        """
        Save this model instance asynchronously.

        Changes are sent to the server in a background thread, and the model
        is updated when the server responds.

        Args:
            force: When true, skip the "already saving", "not dirty" and
                "pending save" checks.

        Returns:
            True if the save was successfully submitted async, False otherwise.

        """
        if not force:
            if self._status == ModelStatus.SAVING:
                return False

            # Only start a save if there are changes
            if not self.is_dirty():
                if hasattr(self, "_save_lock") and self._save_lock._is_owned():  # type: ignore # temporary TODO
                    self._save_lock.release()
                return False

            # If there's a pending save, skip saving until it finishes
            if self._pending_save is not None and not self._pending_save.done():
                return False

        self._status = ModelStatus.SAVING
        # NOTE(review): the result of acquire() is ignored, so the save proceeds
        # even if the lock could not be obtained within 30 seconds. The lock is
        # acquired here but released in the done-callback, which may run on a
        # different thread than this one -- confirm the RLock ownership rules
        # are satisfied.
        self._save_lock.acquire(timeout=30)

        # Start a new save operation
        executor = self.save_executor
        future = executor.submit(self._perform_save_async)
        self._pending_save = future
        future.add_done_callback(self._handle_save_result_async)
        return True

    def _perform_save_async(self) -> Self | None:
        """
        Perform the actual save operation.

        Returns:
            Whatever the resource's ``update`` returns: the updated model from
            the server, or None.

        Raises:
            ResourceNotFoundError: If the resource doesn't exist on the server
            RequestError: If there's a communication error with the server
            PermissionError: If the user doesn't have permission to update the resource

        """
        # Prepare and send the update to the server
        current_data = self.to_dict(include_read_only=False, exclude_none=False, exclude_unset=True)
        self._saved_data = {**current_data}

        registry.emit(
            "model.save:before",
            "Fired before the model data is sent to paperless ngx to be saved.",
            kwargs={"model": self, "current_data": current_data},
        )

        return self._resource.update(self)

    def _handle_save_result_async(self, future: concurrent.futures.Future[Any]) -> bool:
        """
        Handle the result of an asynchronous save operation.

        Copies the server response onto this model (skipping fields that differ
        from the data that was sent), emits the matching signal, then always
        clears the pending save, releases the save lock, resets the status and
        re-saves if the model still differs from the data that was sent.

        Args:
            future: The completed Future object containing the save result.

        Returns:
            False if the save result was empty or of an unexpected type, True otherwise.

        """
        try:
            # Get the result with a timeout
            new_model: Self = future.result(timeout=self._meta.save_timeout)

            if not new_model:
                logger.warning(f"Result of save was none for model id {self.id}")
                return False

            if not isinstance(new_model, StandardModel):
                # This should never happen
                logger.error("Result of save was not a StandardModel instance")
                return False

            # Update the model with the server response
            new_data = new_model.to_dict()
            # Use direct attribute setting instead of update_locally to avoid mocking issues
            with StatusContext(self, ModelStatus.UPDATING):
                for name, value in new_data.items():
                    # A field listed in dirty_fields("saved") already implies the
                    # model is dirty, so a single membership check is sufficient.
                    if name in self.dirty_fields("saved"):
                        continue  # Skip fields changed during save
                    setattr(self, name, value)
                # Mark as from DB
                self._original_data = self.model_dump()

            registry.emit(
                "model.save:after",
                "Fired after the model data is saved in paperless ngx.",
                kwargs={"model": self, "updated_data": new_data},
            )

        except concurrent.futures.TimeoutError:
            logger.error(f"Save operation timed out for {self}")
            registry.emit(
                "model.save:error",
                "Fired when a save operation times out.",
                kwargs={"model": self, "error": "Timeout"},
            )

        except APIError as e:
            logger.error(f"API error during save of {self}: {e}")
            registry.emit(
                "model.save:error",
                "Fired when a network error occurs during save.",
                kwargs={"model": self, "error": e},
            )

        except Exception as e:
            # Log unexpected errors but don't swallow them
            logger.exception(f"Unexpected error during save of {self}")
            registry.emit(
                "model.save:error",
                "Fired when an unexpected error occurs during save.",
                kwargs={"model": self, "error": e},
            )
            # Re-raise so the executor can handle it properly
            raise

        finally:
            self._pending_save = None
            try:
                self._save_lock.release()
            except RuntimeError:
                logger.debug("Save lock already released")
            self._status = ModelStatus.READY

            # If the model was changed while the save was in progress,
            # we need to save again
            if self.is_dirty("saved"):
                # Small delay to avoid hammering the server
                time.sleep(0.1)
                # Save, and reset unsaved data
                self.save()

        return True

    @override
    def is_new(self) -> bool:
        """
        Check if this model represents a new (unsaved) object.

        Returns:
            True if the model's id is 0, False otherwise.

        Examples:
            # Check if a Document instance is new
            is_new = doc.is_new()

        """
        return self.id == 0

    def _autosave(self) -> None:
        """Save the model unless it is new, save-on-write is off, or nothing changed."""
        # Skip autosave for:
        # - New models (not yet saved)
        # - When auto-save is disabled
        # - Models without changes
        if self.is_new() or self.should_save_on_write() is False or not self.is_dirty():
            return

        self.save()

    @override
    def __setattr__(self, name: str, value: Any) -> None:
        """
        Override attribute setting to automatically trigger a save.

        Args:
            name: Attribute name
            value: New attribute value

        """
        # Set the new value
        super().__setattr__(name, value)

        # Autosave logic below; only runs while the model is READY
        if self._status != ModelStatus.READY:
            return

        # Skip autosave for private fields
        if not name.startswith("_"):
            self._autosave()

    @override
    def __str__(self) -> str:
        """
        Human-readable string representation.

        Returns:
            The capitalized model name followed by ``#`` and the model id.

        """
        return f"{self._meta.name.capitalize()} #{self.id}"