| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import logging
- import os
- import voluptuous as vol
- from homeassistant.components import websocket_api
- from atomicwrites import AtomicWriter
- DOMAIN = 'config_editor'
- _LOGGER = logging.getLogger(__name__)
- async def async_setup(hass, config):
- hass.components.websocket_api.async_register_command(websocket_create)
- hass.states.async_set(DOMAIN+".version", 4)
- return True
- @websocket_api.require_admin
- @websocket_api.async_response
- @websocket_api.websocket_command(
- {
- vol.Required("type"): DOMAIN+"/ws",
- vol.Required("action"): str,
- vol.Required("file"): str,
- vol.Required("data"): str,
- vol.Required("ext"): str,
- vol.Optional("depth", default=2): int
- }
- )
- async def websocket_create(hass, connection, msg):
- action = msg["action"]
- ext = msg["ext"]
- if ext not in ["yaml","py","json","conf","js","txt","log","css","all"]:
- ext = "yaml"
- def extok(e):
- if len(e)<2:
- return False
- return ( ext == 'all' or e.endswith("."+ext) )
- def rec(p, q):
- r = [
- f for f in os.listdir(p) if os.path.isfile(os.path.join(p, f)) and
- extok(f)
- ]
- for j in r:
- p = j if q == '' else os.path.join(q, j)
- listyaml.append(p)
- def drec(r, s):
- for d in os.listdir(r):
- v = os.path.join(r, d)
- if os.path.isdir(v):
- p = d if s == '' else os.path.join(s, d)
- if(p.count(os.sep) < msg["depth"]) and ( ext == 'all' or p != 'custom_components' ):
- rec(v, p)
- drec(v, p)
- yamlname = msg["file"].replace("../", "/").strip('/')
- if not extok(msg["file"]):
- yamlname = "temptest."+ext
-
- fullpath = hass.config.path(yamlname)
- if (action == 'load'):
- _LOGGER.info('Loading '+fullpath)
- content = ''
- res = 'Loaded'
- try:
- with open(fullpath, encoding="utf-8") as fdesc:
- content = fdesc.read()
- except:
- res = 'Reading Failed'
- _LOGGER.exception("Reading failed: %s", fullpath)
- finally:
- connection.send_result(
- msg["id"],
- {'msg': res+': '+fullpath, 'file': yamlname, 'data': content, 'ext': ext}
- )
- elif (action == 'save'):
- _LOGGER.info('Saving '+fullpath)
- content = msg["data"]
- res = "Saved"
- try:
- dirnm = os.path.dirname(fullpath)
- if not os.path.isdir(dirnm):
- os.makedirs(dirnm, exist_ok=True)
- try:
- mode = os.stat(fullpath).st_mode
- except:
- mode = 0o666
- with AtomicWriter(fullpath, overwrite=True).open() as fdesc:
- fdesc.write(content)
- with open(fullpath, 'a') as fdesc:
- try:
- os.fchmod(fdesc.fileno(), mode)
- except:
- pass
- except:
- res = "Saving Failed"
- _LOGGER.exception(res+": %s", fullpath)
- finally:
- connection.send_result(
- msg["id"],
- {'msg': res+': '+fullpath}
- )
- elif (action == 'list'):
- dirnm = os.path.dirname(hass.config.path(yamlname))
- listyaml = []
- rec(dirnm, '')
- if msg["depth"]>0:
- drec(dirnm, '')
- if (len(listyaml) < 1):
- listyaml = ['list_error.'+ext]
- connection.send_result(
- msg["id"],
- {'msg': str(len(listyaml))+' File(s)', 'file': listyaml, 'ext': ext}
- )
|