queue_manager.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. """The QueueManager class."""
  2. from __future__ import annotations
  3. import asyncio
  4. import time
  5. from typing import Coroutine
  6. from homeassistant.core import HomeAssistant
  7. from ..exceptions import HacsExecutionStillInProgress
  8. from .logger import LOGGER
  9. _LOGGER = LOGGER
  10. class QueueManager:
  11. """The QueueManager class."""
  12. def __init__(self, hass: HomeAssistant) -> None:
  13. self.hass = hass
  14. self.queue: list[Coroutine] = []
  15. self.running = False
  16. @property
  17. def pending_tasks(self) -> int:
  18. """Return a count of pending tasks in the queue."""
  19. return len(self.queue)
  20. @property
  21. def has_pending_tasks(self) -> bool:
  22. """Return a count of pending tasks in the queue."""
  23. return self.pending_tasks != 0
  24. def clear(self) -> None:
  25. """Clear the queue."""
  26. self.queue = []
  27. def add(self, task: Coroutine) -> None:
  28. """Add a task to the queue."""
  29. self.queue.append(task)
  30. async def execute(self, number_of_tasks: int | None = None) -> None:
  31. """Execute the tasks in the queue."""
  32. if self.running:
  33. _LOGGER.debug("<QueueManager> Execution is already running")
  34. raise HacsExecutionStillInProgress
  35. if len(self.queue) == 0:
  36. _LOGGER.debug("<QueueManager> The queue is empty")
  37. return
  38. self.running = True
  39. _LOGGER.debug("<QueueManager> Checking out tasks to execute")
  40. local_queue = []
  41. if number_of_tasks:
  42. for task in self.queue[:number_of_tasks]:
  43. local_queue.append(task)
  44. else:
  45. for task in self.queue:
  46. local_queue.append(task)
  47. for task in local_queue:
  48. self.queue.remove(task)
  49. _LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", len(local_queue))
  50. start = time.time()
  51. result = await asyncio.gather(*local_queue, return_exceptions=True)
  52. for entry in result:
  53. if isinstance(entry, Exception):
  54. _LOGGER.error("<QueueManager> %s", entry)
  55. end = time.time() - start
  56. _LOGGER.debug(
  57. "<QueueManager> Queue execution finished for %s tasks finished in %.2f seconds",
  58. len(local_queue),
  59. end,
  60. )
  61. if self.has_pending_tasks:
  62. _LOGGER.debug("<QueueManager> %s tasks remaining in the queue", len(self.queue))
  63. self.running = False