| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- """Backup."""
- from __future__ import annotations
- import os
- import shutil
- import tempfile
- from time import sleep
- from typing import TYPE_CHECKING
- from .path import is_safe
- if TYPE_CHECKING:
- from ..base import HacsBase
- from ..repositories.base import HacsRepository
- DEFAULT_BACKUP_PATH = f"{tempfile.gettempdir()}/hacs_backup/"
- class Backup:
- """Backup."""
- def __init__(
- self,
- hacs: HacsBase,
- local_path: str | None = None,
- backup_path: str = DEFAULT_BACKUP_PATH,
- repository: HacsRepository | None = None,
- ) -> None:
- """initialize."""
- self.hacs = hacs
- self.repository = repository
- self.local_path = local_path or repository.content.path.local
- self.backup_path = backup_path
- if repository:
- self.backup_path = (
- tempfile.gettempdir()
- + f"/hacs_persistent_{repository.data.category}/"
- + repository.data.name
- )
- self.backup_path_full = f"{self.backup_path}{self.local_path.split('/')[-1]}"
- def _init_backup_dir(self) -> bool:
- """Init backup dir."""
- if not os.path.exists(self.local_path):
- return False
- if not is_safe(self.hacs, self.local_path):
- return False
- if os.path.exists(self.backup_path):
- shutil.rmtree(self.backup_path)
- # Wait for the folder to be removed
- while os.path.exists(self.backup_path):
- sleep(0.1)
- os.makedirs(self.backup_path, exist_ok=True)
- return True
- def create(self) -> None:
- """Create a backup in /tmp"""
- if not self._init_backup_dir():
- return
- try:
- if os.path.isfile(self.local_path):
- shutil.copyfile(self.local_path, self.backup_path_full)
- os.remove(self.local_path)
- else:
- shutil.copytree(self.local_path, self.backup_path_full)
- shutil.rmtree(self.local_path)
- while os.path.exists(self.local_path):
- sleep(0.1)
- self.hacs.log.debug(
- "Backup for %s, created in %s",
- self.local_path,
- self.backup_path_full,
- )
- except BaseException as exception: # lgtm [py/catch-base-exception] pylint: disable=broad-except
- self.hacs.log.warning("Could not create backup: %s", exception)
- def restore(self) -> None:
- """Restore from backup."""
- if not os.path.exists(self.backup_path_full):
- return
- if os.path.isfile(self.backup_path_full):
- if os.path.exists(self.local_path):
- os.remove(self.local_path)
- shutil.copyfile(self.backup_path_full, self.local_path)
- else:
- if os.path.exists(self.local_path):
- shutil.rmtree(self.local_path)
- while os.path.exists(self.local_path):
- sleep(0.1)
- shutil.copytree(self.backup_path_full, self.local_path)
- self.hacs.log.debug("Restored %s, from backup %s", self.local_path, self.backup_path_full)
- def cleanup(self) -> None:
- """Cleanup backup files."""
- if not os.path.exists(self.backup_path):
- return
- shutil.rmtree(self.backup_path)
- # Wait for the folder to be removed
- while os.path.exists(self.backup_path):
- sleep(0.1)
- self.hacs.log.debug("Backup dir %s cleared", self.backup_path)
- class BackupNetDaemon(Backup):
- """BackupNetDaemon."""
- def create(self) -> None:
- """Create a backup in /tmp"""
- if not self._init_backup_dir():
- return
- for filename in os.listdir(self.repository.content.path.local):
- if not filename.endswith(".yaml"):
- continue
- source_file_name = f"{self.repository.content.path.local}/{filename}"
- target_file_name = f"{self.backup_path}/{filename}"
- shutil.copyfile(source_file_name, target_file_name)
- def restore(self) -> None:
- """Create a backup in /tmp"""
- if not os.path.exists(self.backup_path):
- return
- for filename in os.listdir(self.backup_path):
- if not filename.endswith(".yaml"):
- continue
- source_file_name = f"{self.backup_path}/{filename}"
- target_file_name = f"{self.repository.content.path.local}/{filename}"
- shutil.copyfile(source_file_name, target_file_name)
|