Module smartapp.controllers.smartapp
Expand source code
import pydantic
import traceback
from urllib import parse
from typing import Dict
from smartapp import api
from smartapp.api import models, http, types
from smartapp import logger
log = logger.get()
app_ctx = None
class SmartApp(http.RESTClient):
"""SmartApp controller"""
def __init__(self, token=None):
api = self.__class__.config.get('api')
if not token:
token = api.get('token')
super().__init__(api['host'], api['base'], token=token)
def get_ctx(self, evt: models.LifecycleBase) -> types.AppCtx:
return types.AppCtx(
token=evt.authToken,
refresh_token=evt.refreshToken,
location_id=evt.installedApp.locationId,
app_id=evt.installedApp.installedAppId
)
async def handle_confirmation(self, lifecycle: models.AllLifecycles
) -> models.LifecycleResponse:
evt = lifecycle.confirmationData
url = parse.urlparse(evt.confirmationUrl)
client = http.RESTClient(url.netloc, url.path, '', token=self.access_token)
try:
return models.LifecycleResponse.parse_raw(
await client.do('GET', '?{}'.format(url.query))
)
except pydantic.ValidationError as e:
log.error(traceback.format_exception(type(e), e, None))
async def handle_configuration(self, lifecycle: models.AllLifecycles
) -> models.LifecycleResponse:
evt = lifecycle.configurationData
app = await app_ctx.get(evt.installedAppId)
log.info("configuration: phase: {}".format(evt.phase))
if evt.phase == models.smartapp.Phase.initialize:
return models.LifecycleResponse(
configurationData=app.initialize()
)
return models.LifecycleResponse(
configurationData=app.pageId(evt.pageId)
)
async def handle_install(self, lifecycle: models.AllLifecycles
) -> models.LifecycleResponse:
evt = lifecycle.installData
app = await app_ctx.get(evt.installedApp.installedAppId)
app_ctx.ctx(app, self.get_ctx(evt))
handler = getattr(app, 'lifecycle_install')
if handler:
api.AppTask(handler, evt)
return models.LifecycleResponse(
installData={}
)
async def handle_uninstall(self, lifecycle: models.AllLifecycles
) -> models.LifecycleResponse:
evt = lifecycle.uninstallData
app = await app_ctx.get(evt.installedApp.installedAppId)
handler = getattr(app, 'lifecycle_install')
if handler:
api.AppTask(handler, evt)
app = await app_ctx.delete(evt.installedApp.installedAppId)
return models.LifecycleResponse(
uninstallData={}
)
async def handle_update(self, lifecycle: models.AllLifecycles
) -> models.LifecycleResponse:
evt = lifecycle.updateData
app = await app_ctx.get(evt.installedApp.installedAppId)
app_ctx.ctx(app, self.get_ctx(evt))
handler = getattr(app, 'lifecycle_update')
if handler:
api.AppTask(handler, evt)
return models.LifecycleResponse(
updateData={}
)
async def handle_event(self, lifecycle: models.AllLifecycles
) -> models.LifecycleResponse:
evt = lifecycle.eventData
app = await app_ctx.get(evt.installedApp.installedAppId)
handler = getattr(app, 'lifecycle_event')
if handler:
api.AppTask(handler, evt)
return models.LifecycleResponse(
eventData={}
)
async def handle_lifecycle(self, lifecycle: models.AllLifecycles
) -> models.LifecycleResponse:
base = models.LifecycleBase.parse_obj(lifecycle)
log.info("lifecycle: evt: %s", base.json())
handler = "handle_{}".format(lifecycle.lifecycle.lower())
handler = getattr(self, handler)
if handler:
return await handler(lifecycle)
Classes
class SmartApp (token=None)
-
SmartApp controller
Expand source code
class SmartApp(http.RESTClient): """SmartApp controller""" def __init__(self, token=None): api = self.__class__.config.get('api') if not token: token = api.get('token') super().__init__(api['host'], api['base'], token=token) def get_ctx(self, evt: models.LifecycleBase) -> types.AppCtx: return types.AppCtx( token=evt.authToken, refresh_token=evt.refreshToken, location_id=evt.installedApp.locationId, app_id=evt.installedApp.installedAppId ) async def handle_confirmation(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.confirmationData url = parse.urlparse(evt.confirmationUrl) client = http.RESTClient(url.netloc, url.path, '', token=self.access_token) try: return models.LifecycleResponse.parse_raw( await client.do('GET', '?{}'.format(url.query)) ) except pydantic.ValidationError as e: log.error(traceback.format_exception(type(e), e, None)) async def handle_configuration(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.configurationData app = await app_ctx.get(evt.installedAppId) log.info("configuration: phase: {}".format(evt.phase)) if evt.phase == models.smartapp.Phase.initialize: return models.LifecycleResponse( configurationData=app.initialize() ) return models.LifecycleResponse( configurationData=app.pageId(evt.pageId) ) async def handle_install(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.installData app = await app_ctx.get(evt.installedApp.installedAppId) app_ctx.ctx(app, self.get_ctx(evt)) handler = getattr(app, 'lifecycle_install') if handler: api.AppTask(handler, evt) return models.LifecycleResponse( installData={} ) async def handle_uninstall(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.uninstallData app = await app_ctx.get(evt.installedApp.installedAppId) handler = getattr(app, 'lifecycle_install') if handler: api.AppTask(handler, evt) app = await app_ctx.delete(evt.installedApp.installedAppId) return models.LifecycleResponse( uninstallData={} ) async def handle_update(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.updateData app = await app_ctx.get(evt.installedApp.installedAppId) app_ctx.ctx(app, self.get_ctx(evt)) handler = getattr(app, 'lifecycle_update') if handler: api.AppTask(handler, evt) return models.LifecycleResponse( updateData={} ) async def handle_event(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.eventData app = await app_ctx.get(evt.installedApp.installedAppId) handler = getattr(app, 'lifecycle_event') if handler: api.AppTask(handler, evt) return models.LifecycleResponse( eventData={} ) async def handle_lifecycle(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: base = models.LifecycleBase.parse_obj(lifecycle) log.info("lifecycle: evt: %s", base.json()) handler = "handle_{}".format(lifecycle.lifecycle.lower()) handler = getattr(self, handler) if handler: return await handler(lifecycle)
Ancestors
- smartapp.api.http.RESTClient
Methods
def get_ctx(self, evt: LifecycleBase) ‑> AppCtx
-
Expand source code
def get_ctx(self, evt: models.LifecycleBase) -> types.AppCtx: return types.AppCtx( token=evt.authToken, refresh_token=evt.refreshToken, location_id=evt.installedApp.locationId, app_id=evt.installedApp.installedAppId )
async def handle_configuration(self, lifecycle: AllLifecycles) ‑> LifecycleResponse
-
Expand source code
async def handle_configuration(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.configurationData app = await app_ctx.get(evt.installedAppId) log.info("configuration: phase: {}".format(evt.phase)) if evt.phase == models.smartapp.Phase.initialize: return models.LifecycleResponse( configurationData=app.initialize() ) return models.LifecycleResponse( configurationData=app.pageId(evt.pageId) )
async def handle_confirmation(self, lifecycle: AllLifecycles) ‑> LifecycleResponse
-
Expand source code
async def handle_confirmation(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.confirmationData url = parse.urlparse(evt.confirmationUrl) client = http.RESTClient(url.netloc, url.path, '', token=self.access_token) try: return models.LifecycleResponse.parse_raw( await client.do('GET', '?{}'.format(url.query)) ) except pydantic.ValidationError as e: log.error(traceback.format_exception(type(e), e, None))
async def handle_event(self, lifecycle: AllLifecycles) ‑> LifecycleResponse
-
Expand source code
async def handle_event(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.eventData app = await app_ctx.get(evt.installedApp.installedAppId) handler = getattr(app, 'lifecycle_event') if handler: api.AppTask(handler, evt) return models.LifecycleResponse( eventData={} )
async def handle_install(self, lifecycle: AllLifecycles) ‑> LifecycleResponse
-
Expand source code
async def handle_install(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.installData app = await app_ctx.get(evt.installedApp.installedAppId) app_ctx.ctx(app, self.get_ctx(evt)) handler = getattr(app, 'lifecycle_install') if handler: api.AppTask(handler, evt) return models.LifecycleResponse( installData={} )
async def handle_lifecycle(self, lifecycle: AllLifecycles) ‑> LifecycleResponse
-
Expand source code
async def handle_lifecycle(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: base = models.LifecycleBase.parse_obj(lifecycle) log.info("lifecycle: evt: %s", base.json()) handler = "handle_{}".format(lifecycle.lifecycle.lower()) handler = getattr(self, handler) if handler: return await handler(lifecycle)
async def handle_uninstall(self, lifecycle: AllLifecycles) ‑> LifecycleResponse
-
Expand source code
async def handle_uninstall(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.uninstallData app = await app_ctx.get(evt.installedApp.installedAppId) handler = getattr(app, 'lifecycle_install') if handler: api.AppTask(handler, evt) app = await app_ctx.delete(evt.installedApp.installedAppId) return models.LifecycleResponse( uninstallData={} )
async def handle_update(self, lifecycle: AllLifecycles) ‑> LifecycleResponse
-
Expand source code
async def handle_update(self, lifecycle: models.AllLifecycles ) -> models.LifecycleResponse: evt = lifecycle.updateData app = await app_ctx.get(evt.installedApp.installedAppId) app_ctx.ctx(app, self.get_ctx(evt)) handler = getattr(app, 'lifecycle_update') if handler: api.AppTask(handler, evt) return models.LifecycleResponse( updateData={} )