# Project: ConfigGuard
# File: configguard/config.py
# Author: ParisNeo with Gemini 2.5
# Date: 2025-05-01 (Updated for nesting)
# Description: Main ConfigGuard class for managing application configurations,
# designed to be handler-agnostic, support versioning/migration,
# and handle nested configuration structures.
import copy
import typing
from collections.abc import Mapping, MutableMapping
from pathlib import Path
from packaging.version import InvalidVersion
from packaging.version import parse as parse_version
from .exceptions import (
EncryptionError,
HandlerError,
SchemaError,
SettingNotFoundError,
ValidationError,
)
from .handlers import get_handler # Factory function
from .handlers.base import LoadResult, StorageHandler
from .handlers.json_handler import JsonHandler # Used only in _load_schema_definition
from .log import log
from .schema import SUPPORTED_TYPES, SettingSchema
from .section import ConfigSection # Import the new ConfigSection class
from .setting import ConfigSetting
# Handle optional cryptography import
try:
from cryptography.fernet import Fernet
except ImportError:
Fernet = None # Define Fernet as None if cryptography is not installed
# --- Type Coercion Helper ---
def _try_coerce(
value: typing.Any, target_type: type, source_type_str: typing.Optional[str] = None
) -> typing.Any:
"""
Attempts basic coercion between compatible types (int, float, str, bool).
Args:
value: The value to coerce.
target_type: The target Python type (e.g., int, str, bool).
source_type_str: Optional string representation of the source type for logging.
Returns:
The coerced value if successful, otherwise the original value.
"""
if isinstance(value, target_type):
return value # Already correct type
original_value_repr = repr(value) # For logging
log.debug(
f"Attempting coercion for {original_value_repr} to {target_type.__name__} (source type approx: {source_type_str or type(value).__name__})"
)
# Bool coercion
if target_type is bool:
if isinstance(value, str):
val_lower = value.lower()
if val_lower == "true":
return True
if val_lower == "false":
return False
if isinstance(value, (int, float)):
if value == 1:
return True
if value == 0:
return False
log.warning(f"Could not coerce {original_value_repr} to bool.")
return value # Return original for validation to fail clearly
# Numeric/String coercion
if target_type in (int, float):
if isinstance(value, (int, float)): # Already numeric, just convert type
try:
return target_type(value)
except Exception:
pass # Should not fail, but be safe
elif isinstance(value, str):
try:
numeric_val = float(value) # Try float first
if target_type is int:
if numeric_val.is_integer():
return int(numeric_val)
else:
log.warning(
f"Cannot coerce string '{value}' to int (not an integer)."
)
return value # Return original string if float needed but int requested
else: # Target is float
return numeric_val
except ValueError:
log.warning(
f"Cannot coerce string '{value}' to numeric type {target_type.__name__}."
)
return value # Return original string if not numeric
# else: Fall through, return original value
elif target_type is str:
if isinstance(value, (int, float, bool)):
return str(value)
# else: Fall through for other types
elif target_type is list:
# TODO: Consider adding optional basic coercion (e.g., comma-separated string to list)
pass
log.debug(
f"No specific coercion rule applied for {original_value_repr} to {target_type.__name__}. Returning original."
)
return value
[docs]
class ConfigGuard(MutableMapping):
"""
Main class for managing application configurations. Agnostic of storage format.
Handles configuration schema definition (including nested sections), validation,
loading/saving via storage handlers, encryption, versioning, and basic migration.
Access settings via attribute (`config.setting` or `config.section.setting`) or
dictionary syntax (`config['setting']` or `config['section']['setting']`).
Access schema details via `config.sc_setting` or `config.sc_section.sc_setting`.
Mapping behaviour: iteration and `len()` cover the top-level setting/section names only.
Deleting items is not supported (raises TypeError), and a section cannot be assigned
to directly; modify the settings inside the section instead.
"""
# Reserved top-level schema key holding the schema version; it is stripped from the
# schema definition used internally.
VERSION_KEY = "__version__" # Schema key for version info
# Value of a schema entry's "type" field that marks the entry as a nested section.
SECTION_TYPE_KEY = "section" # Type identifier for sections in schema
[docs]
def __init__(
    self,
    schema: typing.Union[dict, str, Path],
    config_path: typing.Optional[typing.Union[str, Path]] = None,
    encryption_key: typing.Optional[bytes] = None,
    autosave: bool = False,
) -> None:
    """
    Set up a ConfigGuard instance from a schema and, optionally, a config file.

    Steps, in order: load the schema, determine the instance version, set up
    encryption, resolve a storage handler for `config_path`, build the internal
    settings/sections tree, and finally attempt an initial load of the file.
    Errors during that initial load are logged, not raised.

    Args:
        schema: Schema definition as a dictionary, or a path to a JSON schema file.
                A top-level '__version__' key is expected (defaults to "0.0.0" when
                missing). Nested sections are declared with `"type": "section"`.
        config_path: File used for loading/saving values or full state.
        encryption_key: Optional key (bytes) used to build the Fernet instance that
                        is handed to the storage handler.
        autosave: When True, every setting change (nested ones included) triggers a
                  save in 'values' mode. Defaults to False.
    Raises:
        SchemaError: The schema is invalid or its version string cannot be parsed.
        EncryptionError: A key was given but `cryptography` is missing, or the key
                         was rejected.
    """
    log.info("Initializing ConfigGuard...")
    # Top-level container: each entry is either a ConfigSetting or a ConfigSection.
    self._settings: typing.Dict[str, typing.Union[ConfigSetting, ConfigSection]] = (
        {}
    )
    self._raw_instance_schema: dict = self._load_schema_definition(schema)

    # --- Instance version (taken from the schema, validated with packaging) ---
    try:
        raw_version = self._raw_instance_schema.get(self.VERSION_KEY)
        if raw_version is not None:
            self.version: str = str(raw_version)
            parse_version(self.version)  # Raises InvalidVersion on a bad format
        else:
            log.warning(
                f"Schema definition missing '{self.VERSION_KEY}'. Defaulting instance version to 0.0.0."
            )
            self.version = "0.0.0"
        log.debug(f"ConfigGuard instance version set to: {self.version}")
    except InvalidVersion:
        log.error(
            f"Invalid version format '{raw_version}' found in schema definition."
        )
        raise SchemaError(f"Invalid version format in schema: {raw_version}")

    # --- Working schema: same entries as the raw schema, minus the version key ---
    schema_without_version = dict(self._raw_instance_schema)
    schema_without_version.pop(self.VERSION_KEY, None)
    self._instance_schema_definition: dict = schema_without_version

    if config_path:
        self._config_path: typing.Optional[Path] = Path(config_path)
    else:
        self._config_path = None
    self._handler: typing.Optional[StorageHandler] = None
    self._fernet: typing.Optional[Fernet] = None  # Set below when a key is supplied
    self._autosave: bool = autosave
    # Version string reported by the last loaded file, if any.
    self.loaded_file_version: typing.Optional[str] = None

    # --- Encryption ---
    if encryption_key:
        if Fernet is None:
            log.error(
                "Encryption requires 'cryptography'. Please install it: pip install cryptography"
            )
            raise EncryptionError(
                "Cryptography library not found, but encryption key provided."
            )
        try:
            self._fernet = Fernet(encryption_key)
            log.info("Encryption enabled (Fernet instance created).")
        except Exception as key_error:
            log.error(
                f"Failed to initialize encryption with provided key: {key_error}"
            )
            raise EncryptionError(
                f"Invalid encryption key or Fernet setup failed: {key_error}"
            ) from key_error

    # --- Storage handler (receives the Fernet instance, which may be None) ---
    if self._config_path:
        try:
            self._handler = get_handler(self._config_path, fernet=self._fernet)
            log.info(
                f"Initialized handler '{self._handler.__class__.__name__}' for path: {self._config_path}"
            )
        except HandlerError as handler_error:
            # Not fatal: the instance stays usable, just without persistence.
            log.warning(
                f"{handler_error}. Configuration loading/saving might be disabled for this path."
            )

    # --- Settings/sections tree, built recursively from the working schema ---
    self._build_internal_structure_from_schema(
        self._instance_schema_definition, self._settings, self
    )

    # --- Initial load; every failure is logged and defaults are kept ---
    can_load = self._config_path and self._handler
    if can_load:
        try:
            self.load()
        except FileNotFoundError:
            log.warning(
                f"Configuration file {self._config_path} not found. Initializing with defaults."
            )
        except (HandlerError, EncryptionError, ValidationError, SchemaError) as load_error:
            log.error(
                f"Failed to load initial configuration from {self._config_path}: {load_error}. Continuing with defaults where applicable."
            )
        except Exception as load_error:
            log.error(
                f"Unexpected error loading initial configuration from {self._config_path}: {load_error}",
                exc_info=True,
            )
    else:
        log.info(
            "No valid config_path/handler setup, or file not found. Initializing with default values."
        )
    log.info(
        f"ConfigGuard initialized successfully (Instance Version: {self.version})."
    )
# --- Autosave Trigger Method ---
def _trigger_autosave(self, setting_name: str) -> None:
"""Internal helper to trigger autosave if enabled and possible."""
if self._autosave:
log.debug(
f"Autosaving configuration (values) due to change in '{setting_name}'..."
)
if self._handler and self._config_path:
try:
self.save(mode="values")
except (HandlerError, EncryptionError) as e:
log.error(f"Autosave failed: {e}")
except Exception as e:
log.error(f"Unexpected error during autosave: {e}", exc_info=True)
else:
log.warning(
f"Autosave for '{setting_name}' skipped: No valid handler or config_path."
)
def _load_schema_definition(
self, schema_input: typing.Union[dict, str, Path]
) -> dict:
"""
Loads the raw schema definition from a dictionary or JSON file.
Args:
schema_input: Dictionary or path to the schema definition file (JSON expected).
Returns:
The raw schema dictionary, including the version key if present.
Raises:
SchemaError: If loading fails or the format is invalid.
TypeError: If schema_input is not a dict, str, or Path.
"""
if isinstance(schema_input, dict):
log.debug("Loading schema from dictionary.")
return copy.deepcopy(schema_input) # Return a copy
elif isinstance(schema_input, (str, Path)):
schema_path = Path(schema_input)
log.debug(f"Loading schema definition from file: {schema_path}")
if not schema_path.exists():
raise SchemaError(f"Schema file not found: {schema_path}")
if schema_path.suffix.lower() != ".json":
raise SchemaError(
f"Schema file must be a JSON file (.json extension). Found: {schema_path.suffix}"
)
try:
temp_json_handler = JsonHandler(fernet=None)
load_result = temp_json_handler.load(schema_path)
raw_schema = load_result["values"]
if not isinstance(raw_schema, dict):
raise SchemaError(
f"Schema file {schema_path} does not contain a valid JSON object at the root."
)
log.info(f"Successfully loaded schema definition from {schema_path}")
return raw_schema
except (HandlerError, FileNotFoundError) as e:
raise SchemaError(
f"Failed to load schema definition from {schema_path}: {e}"
) from e
except Exception as e:
raise SchemaError(
f"Unexpected error loading schema from file {schema_path}: {e}"
) from e
else:
raise TypeError(
"Schema input must be a dictionary or a file path (str or Path)."
)
def _build_internal_structure_from_schema(
    self,
    schema_definition: dict,
    target_container: typing.Dict[str, typing.Union[ConfigSetting, ConfigSection]],
    parent: typing.Union["ConfigGuard", ConfigSection],
) -> None:
    """
    Walk one level of the schema and create the matching runtime objects.

    Entries whose type is "section" become a ConfigSection and are processed
    recursively; entries with a supported setting type become a ConfigSetting.

    Args:
        schema_definition: Schema dictionary for the level being processed.
        target_container: Dictionary that receives the created objects
                          (e.g. self._settings or section._settings).
        parent: Owner of the created objects (this ConfigGuard or a ConfigSection);
                passed on to each created setting and section.
    Raises:
        SchemaError: An entry is not a dictionary, a section lacks a 'schema'
                     dictionary, the type is invalid/missing, or any other error
                     occurs while creating an entry.
    """
    log.debug(
        f"Building structure for schema level with keys: {list(schema_definition.keys())}"
    )
    for entry_name, entry_def in schema_definition.items():
        if not isinstance(entry_def, dict):
            raise SchemaError(
                f"Invalid schema format for '{entry_name}'. Definition must be a dictionary."
            )
        declared_type = entry_def.get("type")
        try:
            if declared_type == self.SECTION_TYPE_KEY:
                # Nested section: create it, then descend into its own schema.
                log.debug(f"Creating ConfigSection for '{entry_name}'...")
                section_schema = entry_def.get("schema")
                if not isinstance(section_schema, dict):
                    raise SchemaError(
                        f"Section '{entry_name}' must contain a 'schema' dictionary."
                    )
                new_section = ConfigSection(
                    name=entry_name, schema_definition=section_schema, parent=parent
                )
                target_container[entry_name] = new_section
                self._build_internal_structure_from_schema(
                    section_schema, new_section._settings, new_section
                )
            elif declared_type in SUPPORTED_TYPES:
                # Plain setting; the parent reference is handed to the setting.
                log.debug(f"Creating ConfigSetting for '{entry_name}'...")
                setting_schema = SettingSchema(entry_name, entry_def)
                target_container[entry_name] = ConfigSetting(
                    setting_schema, parent=parent
                )
            else:
                valid_types = list(SUPPORTED_TYPES.keys()) + [self.SECTION_TYPE_KEY]
                raise SchemaError(
                    f"Invalid or missing 'type' ('{declared_type}') for '{entry_name}'. Must be one of {valid_types}."
                )
        except SchemaError as schema_error:
            log.error(f"Schema error processing '{entry_name}': {schema_error}")
            raise  # Schema problems abort initialization
        except Exception as unexpected:
            log.error(
                f"Unexpected error processing schema item '{entry_name}': {unexpected}"
            )
            raise SchemaError(
                f"Unexpected error building structure for '{entry_name}'."
            ) from unexpected
    log.debug("Finished building structure level.")
[docs]
def load(self, filepath: typing.Optional[typing.Union[str, Path]] = None) -> None:
    """
    Loads configuration using the configured handler. Handles versioning, migration, and nesting.

    The version reported by the file is compared with the instance version:
    a newer file is rejected, an older file enables migration (keys no longer in
    the schema are skipped), and an equal version loads normally. Files without
    version information have their values applied directly.

    Args:
        filepath: Optional path to load from. Overrides the instance's config_path.
                  The instance's handler is used regardless of this path.
    Raises:
        FileNotFoundError: Propagated when the handler reports a missing file.
        HandlerError: No path or handler is available, the handler returned invalid
                      'values', or an unexpected error occurred.
        SchemaError: The file version is invalid or newer than the instance version.
        SettingNotFoundError: The file contains keys unknown to the schema and no
                              migration is in effect.
        EncryptionError, ValidationError: Propagated from the handler / value application.
    """
    load_path = Path(filepath) if filepath else self._config_path
    current_handler = self._handler
    if not load_path:
        raise HandlerError("No configuration file path specified for loading.")
    if not current_handler:
        raise HandlerError(
            f"No valid handler available for the configured path: {self._config_path}"
        )
    log.info(
        f"Attempting to load configuration from: {load_path} using {current_handler.__class__.__name__}"
    )
    try:
        loaded_data: LoadResult = current_handler.load(load_path)
        raw_loaded_version = loaded_data.get("version")
        # Normalise to str exactly like the instance version: a handler may hand
        # back a non-string (e.g. the number 1.0), which packaging's parse() does
        # not accept and which would otherwise surface as an "unexpected" error.
        loaded_version_str: typing.Optional[str] = (
            None if raw_loaded_version is None else str(raw_loaded_version)
        )
        loaded_schema_dict = loaded_data.get("schema")
        loaded_values_dict = loaded_data.get("values")
        if loaded_values_dict is None or not isinstance(
            loaded_values_dict, Mapping
        ):
            log.error(
                f"Handler did not return a valid dictionary/mapping for 'values' from {load_path}"
            )
            raise HandlerError(f"Invalid 'values' data loaded from {load_path}")
        self.loaded_file_version = loaded_version_str
        log.info(
            f"File loaded. Version in file: {loaded_version_str or 'N/A'}. Instance version: {self.version}."
        )
        # Version Comparison and Migration Logic
        allow_migration = False
        load_mode_desc = "values only or legacy"
        if loaded_version_str is not None:
            load_mode_desc = f"full state (v{loaded_version_str})"
            try:
                loaded_ver = parse_version(loaded_version_str)
                instance_ver = parse_version(self.version)
                if loaded_ver > instance_ver:
                    log.error(
                        f"Loaded config version ({loaded_ver}) is NEWER than instance version ({instance_ver})."
                    )
                    raise SchemaError(
                        f"Cannot load configuration: File version {loaded_ver} is newer than instance version {instance_ver}."
                    )
                elif loaded_ver < instance_ver:
                    log.warning(
                        f"Loaded config version ({loaded_ver}) is OLDER than instance version ({instance_ver}). Enabling migration logic."
                    )
                    allow_migration = True
                else:
                    log.info(f"Configuration versions match ({instance_ver}).")
                    if (
                        loaded_schema_dict
                        and loaded_schema_dict != self._instance_schema_definition
                    ):
                        log.warning(
                            "Loaded schema definition differs from instance schema definition, but versions match. Using instance schema for validation."
                        )
            except InvalidVersion as e:
                log.error(
                    f"Invalid version format found in loaded file ('{loaded_version_str}'): {e}"
                )
                raise SchemaError(
                    f"Invalid version format in loaded file: {loaded_version_str}"
                ) from e
        else:
            log.warning(
                "Loaded configuration file has no version information. Applying values directly."
            )
        # Apply values using the recursive helper method
        log.info(f"Applying loaded values from '{load_mode_desc}' file...")
        # Pass ignore_unknown=False because load should respect the schema strictly
        # (Migration handles skipping old keys, other unknowns are errors)
        self._apply_and_migrate_values(
            current_container=self._settings,
            current_schema_level=self._instance_schema_definition,
            loaded_values_level=loaded_values_dict,
            loaded_schema_level=loaded_schema_dict,
            allow_migration=allow_migration,
            ignore_unknown=False,  # Load is strict by default
            path_prefix="",
        )
    except FileNotFoundError:
        log.warning(f"Load failed: File not found at {load_path}")
        raise
    except (
        HandlerError,
        EncryptionError,
        ValidationError,
        SchemaError,
        SettingNotFoundError,
    ) as e:
        log.error(f"Failed to load/process configuration from {load_path}: {e}")
        raise
    except Exception as e:
        log.error(
            f"An unexpected error occurred during loading from {load_path}: {e}",
            exc_info=True,
        )
        raise HandlerError(f"Unexpected error loading configuration: {e}") from e
def _apply_and_migrate_values(
self,
current_container: typing.Dict[str, typing.Union[ConfigSetting, ConfigSection]],
current_schema_level: dict,
loaded_values_level: Mapping,
loaded_schema_level: typing.Optional[Mapping],
allow_migration: bool,
ignore_unknown: bool, # Added flag
path_prefix: str = "",
) -> None:
"""
Recursively applies loaded values, handling schema differences, coercion, migration, and unknown keys.
The instance structure drives the walk: every setting/section in `current_container`
is visited, and values are taken from `loaded_values_level` when present. A value that
fails validation is replaced by the setting's schema default. Keys that exist only in
the loaded data are handled after the walk (skipped, ignored, or reported as an error).
Args:
current_container: The dict (_settings) of the current level.
current_schema_level: The schema definition dict for the current level.
loaded_values_level: Dictionary of values loaded from the file for the current level.
loaded_schema_level: Schema loaded from the file (if any) for the current level.
allow_migration: True if loaded version is older than instance version.
ignore_unknown: If False, raise SettingNotFoundError for keys in loaded_values_level
not defined in current_schema_level (unless allow_migration is True).
path_prefix: String prefix for logging the path of the setting/section.
Raises:
SettingNotFoundError: A key in `loaded_values_level` is not part of the instance
structure at this level while both `allow_migration` and `ignore_unknown` are False.
"""
# Per-level counters, used only for the summary log at the end of this call.
applied_count = 0
skipped_validation = 0
skipped_migration = 0
coercion_warnings = 0
# Names handled during the walk; whatever remains in the loaded data is "unknown".
processed_keys = set()
log.debug(
f"Applying values at level '{path_prefix or 'root'}' (Migration: {allow_migration}, IgnoreUnknown: {ignore_unknown}, Loaded Schema: {loaded_schema_level is not None})..."
)
# Iterate through the settings/sections defined in the *current instance* structure
for name, current_item in current_container.items():
full_path = f"{path_prefix}{name}"
if isinstance(current_item, ConfigSetting):
current_schema = current_item.schema
if name in loaded_values_level:
processed_keys.add(name)
loaded_value = loaded_values_level[name]
value_to_validate = loaded_value
source_type_str = None
# Type Coercion Logic
# Coercion is attempted only when the loaded schema declares a type for this
# setting and that type differs from the instance schema's type.
if loaded_schema_level and name in loaded_schema_level:
loaded_item_schema_info = loaded_schema_level.get(name, {})
if isinstance(loaded_item_schema_info, dict):
loaded_type_str = loaded_item_schema_info.get("type")
source_type_str = loaded_type_str
if (
loaded_type_str
and loaded_type_str != current_schema.type_str
):
log.warning(
f"Type mismatch for '{full_path}': Instance expects '{current_schema.type_str}', file had '{loaded_type_str}'. Attempting coercion..."
)
coerced_value = _try_coerce(
loaded_value,
current_schema.type,
source_type_str=loaded_type_str,
)
# _try_coerce returns the original object when it cannot coerce, so an
# identity check tells whether a coercion actually happened.
if coerced_value is not loaded_value:
log.info(
f"Coerced '{full_path}' value from {type(loaded_value).__name__} to {type(coerced_value).__name__}."
)
value_to_validate = coerced_value
else:
log.warning(
f"Coercion from '{loaded_type_str}' to '{current_schema.type_str}' did not succeed for '{full_path}' (value: {loaded_value!r}). Proceeding with original value."
)
coercion_warnings += 1
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# determined here which of the enclosing `if` statements this `else` pairs with.
# It only emits a debug message either way - confirm against the original source.
else:
log.debug(
f"No type information found in loaded schema for '{full_path}'. Skipping coercion."
)
# Validation against *instance* schema
# Assigning to `.value` is what validates; a ValidationError means the value is rejected.
try:
current_item.value = value_to_validate
applied_count += 1
log.debug(
f"Successfully applied value for '{full_path}': {current_item.value!r}"
)
except ValidationError as e:
skipped_validation += 1
log.warning(
f"Validation failed for '{full_path}' with value '{value_to_validate!r}' (original loaded: '{loaded_value!r}'): {e}. RESETTING to instance default."
)
# Fall back to the schema default; if even that is rejected, only log it.
try:
current_item.value = current_schema.default_value
except ValidationError as e_default:
log.error(
f"CRITICAL: Default value for '{full_path}' is invalid: {e_default}."
)
else:
log.debug(
f"'{full_path}' not found in loaded values. Using instance default: {current_item.value!r}"
)
elif isinstance(current_item, ConfigSection):
# A section name is marked as processed even when it is absent from (or malformed
# in) the loaded data, so it is never reported as an unknown key below.
processed_keys.add(name)
nested_loaded_values = loaded_values_level.get(name)
# The loaded schema for the section's children lives under its "schema" key,
# and is used only if the loaded entry for this name is a dict.
nested_loaded_schema = (
loaded_schema_level.get(name, {}).get("schema")
if loaded_schema_level
and isinstance(loaded_schema_level.get(name), dict)
else None
)
if isinstance(nested_loaded_values, Mapping):
log.debug(f"Recursing into section '{full_path}'...")
self._apply_and_migrate_values(
current_container=current_item._settings,
current_schema_level=current_schema_level[name].get(
"schema", {}
),
loaded_values_level=nested_loaded_values,
loaded_schema_level=nested_loaded_schema,
allow_migration=allow_migration,
ignore_unknown=ignore_unknown, # Pass flag down
path_prefix=f"{full_path}.",
)
elif name in loaded_values_level:
log.warning(
f"Expected dictionary/mapping for section '{full_path}' in loaded values, but got {type(nested_loaded_values).__name__}. Skipping section."
)
else:
log.debug(
f"Section '{full_path}' not found in loaded values. Using instance defaults."
)
# Check for keys in loaded_values that are not in the instance structure at this level
unknown_or_migrated_keys = set(loaded_values_level.keys()) - processed_keys
skipped_unknown = 0
# Precedence: migration (skip with a warning) wins over strict mode (raise), which
# wins over lenient mode (ignore with a warning).
for key in unknown_or_migrated_keys:
full_unknown_path = f"{path_prefix}{key}"
if allow_migration:
skipped_migration += 1
log.warning(
f"Migration: Item '{full_unknown_path}' (value: {loaded_values_level[key]!r}) loaded from older version is not present in current instance version ({self.version}). Skipping."
)
elif (
not ignore_unknown
): # Raise error if not ignoring unknowns and not migrating
log.error(
f"Unknown item '{full_unknown_path}' found in input data and ignore_unknown=False."
)
raise SettingNotFoundError(
f"Unknown setting/section encountered: '{full_unknown_path}'"
)
else: # Ignore unknown
skipped_unknown += 1
log.warning(
f"Item '{full_unknown_path}' found in loaded file/data but not defined in current instance schema level. Ignoring (ignore_unknown=True)."
)
# Log summary only if counts are non-zero for clarity
summary_parts = []
if applied_count:
summary_parts.append(f"Applied: {applied_count}")
if skipped_validation:
summary_parts.append(f"Skipped (validation): {skipped_validation}")
if skipped_migration:
summary_parts.append(f"Skipped (migration): {skipped_migration}")
if skipped_unknown:
summary_parts.append(f"Skipped (unknown): {skipped_unknown}")
if coercion_warnings:
summary_parts.append(f"Coercion Warnings: {coercion_warnings}")
if summary_parts:
log.info(
f"Value application finished for level '{path_prefix or 'root'}'. "
+ ", ".join(summary_parts)
+ "."
)
else:
log.debug(
f"Value application finished for level '{path_prefix or 'root'}'. No changes or issues."
)
[docs]
def save(
self,
filepath: typing.Optional[typing.Union[str, Path]] = None,
mode: str = "values",
) -> None:
"""
Saves the configuration using the configured handler and specified mode. Handles nesting.
Args:
filepath: Optional path to save to. Overrides the instance's config_path.
mode: Specifies what to save ('values' or 'full').
Raises:
HandlerError: If saving fails (no path, no handler, serialization, encryption).
EncryptionError: If encryption specifically fails.
ValueError: If an invalid `mode` is provided.
"""
save_path = Path(filepath) if filepath else self._config_path
current_handler = self._handler
if not save_path:
raise HandlerError("No configuration file path specified for saving.")
if not current_handler:
raise HandlerError(
f"No valid handler available for the configured path: {self._config_path}"
)
if mode not in ["values", "full"]:
raise ValueError(
f"Invalid save mode: '{mode}'. Must be 'values' or 'full'."
)
log.info(
f"Saving configuration to: {save_path} using {current_handler.__class__.__name__} (mode: {mode})"
)
try:
# Prepare data payload using recursive methods
config_values = self.get_config_dict() # Recursive method
schema_definition = (
self.get_instance_schema_definition()
) # Already handles nesting
data_payload = {
"instance_version": self.version,
"schema_definition": schema_definition,
"config_values": config_values,
}
# Call handler's save method
current_handler.save(save_path, data=data_payload, mode=mode)
except (HandlerError, EncryptionError, ValueError) as e:
log.error(f"Failed to save configuration to {save_path} (mode={mode}): {e}")
raise
except Exception as e:
log.error(
f"An unexpected error occurred during saving to {save_path} (mode={mode}): {e}",
exc_info=True,
)
raise HandlerError(f"Unexpected error saving configuration: {e}") from e
# --- Public Data Access Methods (Updated for Nesting) ---
[docs]
def get_instance_schema_definition(self) -> dict:
"""Returns the schema definition used by this ConfigGuard instance (excludes version key)."""
# self._instance_schema_definition is already the potentially nested schema
return copy.deepcopy(self._instance_schema_definition)
[docs]
def get_config_dict(self) -> dict:
"""Returns the current configuration values as a potentially nested dictionary."""
config_dict = {}
for name, item in self._settings.items():
if isinstance(item, ConfigSetting):
config_dict[name] = item.value
elif isinstance(item, ConfigSection):
# Recursively get the dictionary for the section
config_dict[name] = item.get_config_dict()
return config_dict
[docs]
def export_schema_with_values(self) -> dict:
"""
Exports the *current* state (instance schema + values, including nested structures)
for external use (e.g., UI).
Returns:
A dictionary containing 'version', 'schema', and 'settings'.
The 'settings' dict maps names to {'schema': ..., 'value': ...}, where
'value' can be a nested dictionary for sections.
"""
log.debug("Recursively exporting current instance schema with values...")
settings_export = {}
# Use internal recursive helper
self._export_level(self._settings, settings_export)
full_export = {
"version": self.version,
"schema": self.get_instance_schema_definition(), # Full nested schema
"settings": settings_export, # Structured settings/values
}
log.debug(f"Generated export for instance version {self.version}.")
return full_export
def _export_level(self, container: dict, export_target: dict) -> None:
"""Recursive helper for export_schema_with_values."""
for name, item in container.items():
if isinstance(item, ConfigSetting):
export_target[name] = {
"schema": item.schema.to_dict(),
"value": item.value, # Export the actual value
}
elif isinstance(item, ConfigSection):
# Get the section's schema and recursively get its values dict
nested_values_dict = item.get_config_dict() # Get nested values
export_target[name] = {
"schema": item.get_schema_dict(), # Section's schema part
"value": nested_values_dict, # Export the nested dictionary of values
}
[docs]
def import_config(self, data: Mapping, ignore_unknown: bool = True) -> None:
    """
    Apply configuration *values* from a (possibly nested) mapping, validating
    them against the instance schema. No migration or type coercion from a
    loaded schema takes place.

    Args:
        data: Mapping of {setting_name: value} and/or {section_name: {sub_setting: value}}.
        ignore_unknown: True (default) skips keys/sections that the instance schema
                        does not define; False raises SettingNotFoundError for them.
    Raises:
        TypeError: `data` is not a mapping.
        SettingNotFoundError: An unknown key was found and `ignore_unknown` is False.
        HandlerError: Any other unexpected error during the import.
    """
    if not isinstance(data, Mapping):
        raise TypeError(
            f"Input data for import_config must be a dictionary or mapping, got {type(data).__name__}"
        )
    log.info(
        f"Importing configuration values from dictionary ({len(data)} top-level items)..."
    )
    try:
        # Same recursive routine as load(), with no loaded schema and no migration.
        self._apply_and_migrate_values(
            current_container=self._settings,
            current_schema_level=self._instance_schema_definition,
            loaded_values_level=data,
            loaded_schema_level=None,
            allow_migration=False,
            ignore_unknown=ignore_unknown,
            path_prefix="",
        )
    except ValidationError as validation_error:
        # Logged only; the import is still reported as finished below.
        log.error(
            f"Validation errors occurred during import. See previous warnings. Last error detail: {validation_error}"
        )
    except SettingNotFoundError as unknown_error:
        log.error(f"Import failed due to unknown setting/section: {unknown_error}")
        raise
    except Exception as unexpected:
        log.error(
            f"Unexpected error during dictionary import: {unexpected}", exc_info=True
        )
        raise HandlerError(
            f"Unexpected error during dictionary import: {unexpected}"
        ) from unexpected
    log.info("Dictionary import finished.")
# --- Magic methods (Updated for Nesting) ---
[docs]
def __getattr__(self, name: str) -> typing.Any:
"""Allows accessing settings/sections like attributes (e.g., config.port, config.section)."""
if name.startswith("_"): # Allow internal attribute access
return super().__getattribute__(name)
is_schema_access = name.startswith("sc_")
actual_name = name[3:] if is_schema_access else name
if actual_name in self._settings:
item = self._settings[actual_name]
if is_schema_access:
if isinstance(item, ConfigSetting):
return item.schema
elif isinstance(item, ConfigSection):
# Return the section's schema definition dictionary
return item.get_schema_dict()
elif isinstance(item, ConfigSetting):
return item.value
elif isinstance(item, ConfigSection):
# Return the section object itself for further attribute access
return item
else:
try:
# Check for actual methods/attributes like 'version', 'load', 'save' etc.
return super().__getattribute__(name)
except AttributeError:
prefix = (
"schema attribute"
if is_schema_access
else "attribute or setting/section"
)
raise AttributeError(
f"'{type(self).__name__}' object has no {prefix} '{name}'"
) from None
[docs]
def __setattr__(self, name: str, value: typing.Any) -> None:
"""Allows setting top-level settings like attributes (e.g., config.port = 8080)."""
# Prevent setting schema attributes directly
if name.startswith("sc_"):
raise AttributeError(
"Cannot set schema attributes directly (use 'config.sc_name' to access)."
)
# Handle internal attributes
known_internals = {
"_settings",
"_raw_instance_schema",
"_instance_schema_definition",
"version",
"_config_path",
"_handler",
"_fernet",
"_autosave",
"loaded_file_version",
}
if name.startswith("_") or name in known_internals or hasattr(type(self), name):
super().__setattr__(name, value)
# Handle setting top-level ConfigSettings
elif name in self._settings and isinstance(self._settings[name], ConfigSetting):
setting = self._settings[name]
try:
# Delegate to ConfigSetting's setter (handles validation, coercion, autosave trigger)
setting.value = value
# Autosave is triggered within ConfigSetting's setter via parent reference
except ValidationError as e:
raise e # Propagate validation errors
# Handle attempting to set a ConfigSection via attribute (Disallow direct assignment)
elif name in self._settings and isinstance(self._settings[name], ConfigSection):
raise AttributeError(
f"Cannot assign directly to section '{name}'. Modify settings within the section (e.g., config.{name}.setting = value)."
)
else:
# Block setting arbitrary (non-internal, non-setting, non-section) attributes
raise AttributeError(
f"Cannot set attribute '{name}'. It's not a defined top-level setting or internal attribute."
)
[docs]
def __getitem__(self, key: str) -> typing.Any:
    """
    Dictionary-style access to settings and sections (e.g. config['port'], config['section']).

    A key prefixed with "sc_" returns schema information (schema object for a
    setting, schema dictionary for a section). A plain key returns a setting's
    value, or the section object so that config['section']['setting'] works.

    Raises:
        SettingNotFoundError: No setting or section matches the key.
    """
    wants_schema = key.startswith("sc_")
    lookup_key = key[3:] if wants_schema else key
    if lookup_key not in self._settings:
        subject = (
            "Schema for setting/section"
            if wants_schema
            else "Setting or section"
        )
        raise SettingNotFoundError(f"{subject} '{lookup_key}' not found.")
    entry = self._settings[lookup_key]
    if wants_schema:
        if isinstance(entry, ConfigSetting):
            return entry.schema
        if isinstance(entry, ConfigSection):
            return entry.get_schema_dict()
        return None
    if isinstance(entry, ConfigSetting):
        return entry.value
    if isinstance(entry, ConfigSection):
        return entry
    return None
[docs]
def __setitem__(self, key: str, value: typing.Any) -> None:
"""Allows setting top-level settings like dictionary items (e.g., config['port'] = 8080)."""
if key.startswith("sc_"):
raise KeyError(
"Cannot set schema items directly (use config['sc_name'] to access)."
)
if key in self._settings:
item = self._settings[key]
if isinstance(item, ConfigSetting):
try:
# Delegate to ConfigSetting's setter (handles validation, coercion, autosave trigger)
item.value = value
except ValidationError as e:
raise e # Propagate validation errors
elif isinstance(item, ConfigSection):
raise TypeError(
f"Cannot assign directly to section '{key}'. Modify settings within the section (e.g., config['{key}']['setting'] = value)."
)
else:
raise SettingNotFoundError(
f"Setting '{key}' not found. Cannot set undefined top-level settings."
)
[docs]
def __delitem__(self, key: str) -> None:
"""Prevent deleting settings or sections."""
raise TypeError("Deleting configuration settings or sections is not supported.")
[docs]
def __iter__(self) -> typing.Iterator[str]:
"""Iterates over the names of the top-level defined settings and sections."""
return iter(self._settings.keys())
[docs]
def __len__(self) -> int:
"""Returns the number of top-level defined settings and sections."""
return len(self._settings)
[docs]
def __contains__(self, key: object) -> bool:
"""Checks if a top-level setting or section name exists."""
return isinstance(key, str) and key in self._settings
[docs]
def __repr__(self) -> str:
"""Returns a developer-friendly representation of the ConfigGuard instance."""
path_str = f"'{self._config_path}'" if self._config_path else "None"
handler_name = self._handler.__class__.__name__ if self._handler else "None"
encrypted_str = ", encrypted" if self._fernet else ""
num_items = len(self._settings)
item_types = {"settings": 0, "sections": 0}
for item in self._settings.values():
if isinstance(item, ConfigSetting):
item_types["settings"] += 1
elif isinstance(item, ConfigSection):
item_types["sections"] += 1
return (
f"ConfigGuard(version='{self.version}', config_path={path_str}, "
f"handler='{handler_name}', top_level_items={num_items} "
f"(settings={item_types['settings']}, sections={item_types['sections']})"
f"{encrypted_str})"
)
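# NOTE: All MutableMapping abstract methods (__getitem__, __setitem__, __delitem__,
# __iter__, __len__) are implemented above, so keys(), items() and values() are
# already inherited as mixin methods from collections.abc.Mapping.
# They cover top-level items only; for a section, items()/values() yield the
# ConfigSection object itself (whatever __getitem__ returns).
# Explicit overrides could still be added for convenience if different handling
# of sections is needed.
# Example:
# def keys(self): return self._settings.keys()
# def items(self): return # Need careful implementation for sections
# def values(self): return # Need careful implementation for sections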