| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- """The QueueManager class."""
- from __future__ import annotations
- import asyncio
- import time
- from typing import Coroutine
- from homeassistant.core import HomeAssistant
- from ..exceptions import HacsExecutionStillInProgress
- from .logger import LOGGER
- _LOGGER = LOGGER
- class QueueManager:
- """The QueueManager class."""
- def __init__(self, hass: HomeAssistant) -> None:
- self.hass = hass
- self.queue: list[Coroutine] = []
- self.running = False
- @property
- def pending_tasks(self) -> int:
- """Return a count of pending tasks in the queue."""
- return len(self.queue)
- @property
- def has_pending_tasks(self) -> bool:
- """Return a count of pending tasks in the queue."""
- return self.pending_tasks != 0
- def clear(self) -> None:
- """Clear the queue."""
- self.queue = []
- def add(self, task: Coroutine) -> None:
- """Add a task to the queue."""
- self.queue.append(task)
- async def execute(self, number_of_tasks: int | None = None) -> None:
- """Execute the tasks in the queue."""
- if self.running:
- _LOGGER.debug("<QueueManager> Execution is already running")
- raise HacsExecutionStillInProgress
- if len(self.queue) == 0:
- _LOGGER.debug("<QueueManager> The queue is empty")
- return
- self.running = True
- _LOGGER.debug("<QueueManager> Checking out tasks to execute")
- local_queue = []
- if number_of_tasks:
- for task in self.queue[:number_of_tasks]:
- local_queue.append(task)
- else:
- for task in self.queue:
- local_queue.append(task)
- for task in local_queue:
- self.queue.remove(task)
- _LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", len(local_queue))
- start = time.time()
- result = await asyncio.gather(*local_queue, return_exceptions=True)
- for entry in result:
- if isinstance(entry, Exception):
- _LOGGER.error("<QueueManager> %s", entry)
- end = time.time() - start
- _LOGGER.debug(
- "<QueueManager> Queue execution finished for %s tasks finished in %.2f seconds",
- len(local_queue),
- end,
- )
- if self.has_pending_tasks:
- _LOGGER.debug("<QueueManager> %s tasks remaining in the queue", len(self.queue))
- self.running = False
|