classEncryptedSession(SessionABC):"""Encrypted wrapper for Session implementations with TTL-based expiration. This class wraps any SessionABC implementation to provide transparent encryption/decryption of stored items using Fernet encryption with per-session key derivation and automatic expiration of old data. When items expire (exceed TTL), they are silently skipped during retrieval. Note: Expired tokens are rejected based on the system clock of the application server. To avoid valid tokens being rejected due to clock drift, ensure all servers in your environment are synchronized using NTP. """def__init__(self,session_id:str,underlying_session:SessionABC,encryption_key:str,ttl:int=600,):""" Args: session_id: ID for this session underlying_session: The real session store (e.g. SQLiteSession, SQLAlchemySession) encryption_key: Master key (Fernet key or raw secret) ttl: Token time-to-live in seconds (default 10 min) """self.session_id=session_idself.underlying_session=underlying_sessionself.ttl=ttlmaster=_ensure_fernet_key_bytes(encryption_key)self.cipher=_derive_session_fernet_key(master,session_id)self._kid="hkdf-v1"self._ver=1def__getattr__(self,name):returngetattr(self.underlying_session,name)def_wrap(self,item:TResponseInputItem)->EncryptedEnvelope:ifisinstance(item,dict):payload=itemelifhasattr(item,"model_dump"):payload=item.model_dump()elifhasattr(item,"__dict__"):payload=item.__dict__else:payload=dict(item)token=self.cipher.encrypt(_to_json_bytes(payload)).decode("utf-8")return{"__enc__":1,"v":self._ver,"kid":self._kid,"payload":token}def_unwrap(self,item:TResponseInputItem|EncryptedEnvelope)->TResponseInputItem|None:ifnot_is_encrypted_envelope(item):returncast(TResponseInputItem,item)try:token=item["payload"].encode("utf-8")plaintext=self.cipher.decrypt(token,ttl=self.ttl)returncast(TResponseInputItem,_from_json_bytes(plaintext))except(InvalidToken,KeyError):returnNoneasyncdefget_items(self,limit:int|None=None)->list[TResponseInputItem]:encrypted_items=awaitself.underlying_session.get_items(limit)valid_items:list[TResponseInputItem]=[]forencinencrypted_items:item=self._unwrap(enc)ifitemisnotNone:valid_items.append(item)returnvalid_itemsasyncdefadd_items(self,items:list[TResponseInputItem])->None:wrapped:list[EncryptedEnvelope]=[self._wrap(it)foritinitems]awaitself.underlying_session.add_items(cast(list[TResponseInputItem],wrapped))asyncdefpop_item(self)->TResponseInputItem|None:whileTrue:enc=awaitself.underlying_session.pop_item()ifnotenc:returnNoneitem=self._unwrap(enc)ifitemisnotNone:returnitemasyncdefclear_session(self)->None:awaitself.underlying_session.clear_session()