| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- """Support for Gardena mower."""
- import asyncio
- import logging
- from datetime import timedelta
- from homeassistant.core import callback
- from homeassistant.components.vacuum import (
- StateVacuumEntity,
- SUPPORT_BATTERY,
- SUPPORT_RETURN_HOME,
- SUPPORT_STATE,
- SUPPORT_STOP,
- SUPPORT_START,
- STATE_PAUSED,
- STATE_CLEANING,
- STATE_DOCKED,
- STATE_RETURNING,
- STATE_ERROR,
- ATTR_BATTERY_LEVEL,
- )
- from .const import (
- ATTR_ACTIVITY,
- ATTR_BATTERY_STATE,
- ATTR_NAME,
- ATTR_OPERATING_HOURS,
- ATTR_RF_LINK_LEVEL,
- ATTR_RF_LINK_STATE,
- ATTR_SERIAL,
- ATTR_LAST_ERROR,
- ATTR_ERROR,
- ATTR_STATE,
- CONF_MOWER_DURATION,
- DEFAULT_MOWER_DURATION,
- DOMAIN,
- GARDENA_LOCATION,
- )
- from .sensor import GardenaSensor
# Module-scoped logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# One-minute interval constant.
# NOTE(review): the mower entity in this module reports should_poll = False,
# so this interval presumably has no effect on it - confirm before relying on it.
SCAN_INTERVAL = timedelta(minutes=1)

# Bitmask of the vacuum features advertised for Gardena mowers.
SUPPORT_GARDENA = (
    SUPPORT_BATTERY
    | SUPPORT_RETURN_HOME
    | SUPPORT_STOP
    | SUPPORT_START
    | SUPPORT_STATE
)
async def async_setup_entry(hass, config_entry, async_add_entities):
    """Set up the Gardena smart mower system.

    Looks up the Gardena location stored under ``hass.data[DOMAIN]``, wraps
    every device of type ``"MOWER"`` in a ``GardenaSmartMower`` entity
    (passing along the config entry options) and hands the entities to
    ``async_add_entities`` with an initial update requested.
    """
    location = hass.data[DOMAIN][GARDENA_LOCATION]
    mowers = [
        GardenaSmartMower(hass, device, config_entry.options)
        for device in location.find_device_by_type("MOWER")
    ]
    _LOGGER.debug("Adding mower as vacuums: %s", mowers)
    async_add_entities(mowers, True)
class GardenaSmartMower(StateVacuumEntity):
    """Representation of a Gardena Connected Mower.

    The mower is exposed to Home Assistant as a vacuum entity. State is
    derived from the wrapped device's ``state`` and ``activity`` values and
    refreshed whenever the device invokes the registered update callback.
    """

    # Gardena activity value -> vacuum state reported to Home Assistant.
    # Activities not listed here (including "NONE") map to no state (None).
    _ACTIVITY_TO_STATE = {
        "PAUSED": STATE_PAUSED,
        "OK_CUTTING": STATE_CLEANING,
        "OK_CUTTING_TIMER_OVERRIDDEN": STATE_CLEANING,
        "OK_LEAVING": STATE_CLEANING,
        "OK_SEARCHING": STATE_RETURNING,
        "OK_CHARGING": STATE_DOCKED,
        "PARKED_TIMER": STATE_DOCKED,
        "PARKED_PARK_SELECTED": STATE_DOCKED,
        "PARKED_AUTOTIMER": STATE_DOCKED,
    }

    def __init__(self, hass, mower, options):
        """Initialize the Gardena Connected Mower.

        Args:
            hass: Home Assistant instance; its event loop is used to run
                the device's command coroutines.
            mower: Gardena mower device object wrapped by this entity.
            options: Mapping of integration options (read for the mowing
                duration).
        """
        self.hass = hass
        self._device = mower
        self._options = options
        self._name = f"{self._device.name}"
        self._unique_id = f"{self._device.serial}-mower"
        self._state = None
        self._error_message = ""

    async def async_added_to_hass(self):
        """Subscribe to events."""
        self._device.add_callback(self.update_callback)

    @property
    def should_poll(self) -> bool:
        """No polling needed for a vacuum."""
        return False

    def update_callback(self, device):
        """Call update for Home Assistant when the device is updated."""
        # True requests a refresh of the entity before the state is written.
        self.schedule_update_ha_state(True)

    async def async_update(self):
        """Update the states of Gardena devices.

        A device state of WARNING, ERROR or UNAVAILABLE is reported as an
        error and the device's last error code is remembered. Otherwise the
        device activity is translated through ``_ACTIVITY_TO_STATE``.
        """
        _LOGGER.debug("Running Gardena update")
        # Managing state
        state = self._device.state
        _LOGGER.debug("Mower has state %s", state)
        if state in ("WARNING", "ERROR", "UNAVAILABLE"):
            _LOGGER.debug("Mower has an error")
            self._state = STATE_ERROR
            self._error_message = self._device.last_error_code
            return

        _LOGGER.debug("Getting mower state")
        activity = self._device.activity
        _LOGGER.debug("Mower has activity %s", activity)
        # Always overwrite the state: keeping the previous value for an
        # activity we do not know would leave a stale state (for example a
        # lingering error) on the entity.
        self._state = self._ACTIVITY_TO_STATE.get(activity)
        if activity == "NONE":
            _LOGGER.debug("Mower has no activity")
        elif self._state is None:
            _LOGGER.debug("Mower has unmapped activity %s", activity)

    @property
    def name(self):
        """Return the name of the device."""
        return self._device.name

    @property
    def supported_features(self):
        """Flag lawn mower robot features that are supported."""
        return SUPPORT_GARDENA

    @property
    def battery_level(self):
        """Return the battery level of the lawn mower."""
        return self._device.battery_level

    @property
    def state(self):
        """Return the status of the lawn mower."""
        return self._state

    @property
    def available(self):
        """Return True if the device is available."""
        return self._device.state != "UNAVAILABLE"

    def error(self):
        """Return the error message.

        This is a regular method (not a property). It returns the error
        code stored by the last update while the entity is in the error
        state, and an empty string otherwise.
        """
        if self._state == STATE_ERROR:
            return self._error_message
        return ""

    @property
    def extra_state_attributes(self):
        """Return the state attributes of the lawn mower."""
        activity = self._device.activity
        last_error = self._device.last_error_code
        # With no activity ("NONE") the last error code stands in for both
        # the error and the state attribute.
        has_activity = activity != "NONE"
        return {
            ATTR_ACTIVITY: activity,
            ATTR_BATTERY_LEVEL: self._device.battery_level,
            ATTR_BATTERY_STATE: self._device.battery_state,
            ATTR_RF_LINK_LEVEL: self._device.rf_link_level,
            ATTR_RF_LINK_STATE: self._device.rf_link_state,
            ATTR_OPERATING_HOURS: self._device.operating_hours,
            ATTR_LAST_ERROR: last_error,
            ATTR_ERROR: "NONE" if has_activity else last_error,
            ATTR_STATE: activity if has_activity else last_error,
        }

    @property
    def option_mower_duration(self) -> int:
        """Return the configured mowing duration.

        Read from the integration options, falling back to
        ``DEFAULT_MOWER_DURATION``. ``start`` multiplies this value by 60
        before sending it to the device as seconds.
        """
        return self._options.get(CONF_MOWER_DURATION, DEFAULT_MOWER_DURATION)

    def _run_on_loop(self, coro):
        """Run a device coroutine on the Home Assistant loop and wait for it.

        Blocks the calling thread until the coroutine finishes and returns
        its result. Must not be called from the event loop thread itself,
        because waiting on the result there would block the loop.
        """
        return asyncio.run_coroutine_threadsafe(coro, self.hass.loop).result()

    def start(self, **kwargs):
        """Start the mower using Gardena API command START_SECONDS_TO_OVERRIDE. Duration is read from integration options."""
        duration = self.option_mower_duration * 60
        _LOGGER.debug("Mower command: vacuum.start => START_SECONDS_TO_OVERRIDE, %s", duration)
        return self._run_on_loop(self._device.start_seconds_to_override(duration))

    def stop(self, **kwargs):
        """Stop the mower using Gardena API command PARK_UNTIL_FURTHER_NOTICE."""
        _LOGGER.debug("Mower command: vacuum.stop => PARK_UNTIL_FURTHER_NOTICE")
        self._run_on_loop(self._device.park_until_further_notice())

    def turn_on(self, **kwargs):
        """Start the mower using Gardena API command START_DONT_OVERRIDE."""
        _LOGGER.debug("Mower command: vacuum.turn_on => START_DONT_OVERRIDE")
        self._run_on_loop(self._device.start_dont_override())

    def turn_off(self, **kwargs):
        """Stop the mower using Gardena API command PARK_UNTIL_FURTHER_NOTICE."""
        _LOGGER.debug("Mower command: vacuum.turn_off => PARK_UNTIL_FURTHER_NOTICE")
        self._run_on_loop(self._device.park_until_further_notice())

    def return_to_base(self, **kwargs):
        """Stop the mower using Gardena API command PARK_UNTIL_NEXT_TASK."""
        _LOGGER.debug("Mower command: vacuum.return_to_base => PARK_UNTIL_NEXT_TASK")
        self._run_on_loop(self._device.park_until_next_task())

    @property
    def unique_id(self) -> str:
        """Return a unique ID."""
        return self._unique_id

    @property
    def device_info(self):
        """Return the device registry description of the mower."""
        return {
            "identifiers": {
                # Serial numbers are unique identifiers within a specific domain
                (DOMAIN, self._device.serial)
            },
            "name": self._device.name,
            "manufacturer": "Gardena",
            "model": self._device.model_type,
        }
|