/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
internals
/
Upload File
HOME
import asyncio class DeadlockError(Exception): """Error raised if DeadlockDetectingLock detects deadlock""" class DeadlockDetectingLock: """ Lock that detects deadlock when it is about to be acquired by the same task that already holds it. """ def __init__(self): self._lock = asyncio.Lock() self._owner = None def locked(self): return self._lock.locked() async def __aenter__(self): curr_task = asyncio.current_task() if self._owner == curr_task: raise DeadlockError() await self._lock.acquire() self._owner = curr_task return self async def __aexit__(self, exc_type, exc, tb): self._owner = None self._lock.release()