Module smartapp.api.smartapp.context
Expand source code
from __future__ import annotations
import aiohttp
from typing import Generator
from smartapp.api import models, types, smartapp
from smartapp import redis, api, authentication
from smartapp import logger
log = logger.get()
KEY_PREFIX = 'smartapp-context-'
class AppContext(redis.Redis):
_instances = {}
_ctx = {}
new_app = None
key = KEY_PREFIX + 'none'
@classmethod
async def get(cls, app_id: str) -> type[smartapp.SmartApp]:
if app_id in cls._instances:
return cls._instances[app_id]
log.info("no app instance for app_id %s (found: %s)", app_id, cls._instances.keys())
app = cls.new_app()
app.ctx = cls(app_id, app)
app.load_routes()
return app
@classmethod
async def delete(cls, app: smartapp.SmartApp):
app_id = app.app_id
log.info("deleting app context for app_id %s", app_id)
try:
cls._instances.pop(app_id)
cls._ctx.pop(app_id)
except KeyError:
log.info("ctx wasn't loaded for app_id %s", app_id)
pass
return cls().delete(cls.key, app_id)
@classmethod
async def store(cls, app_id: str, ctx: types.AppCtx):
return cls().hset(cls.key, app_id, ctx.json(exclude_none=True))
@classmethod
def load(cls, app_id: str):
return types.AppCtx.parse_raw(cls().hget(cls.key, app_id))
@classmethod
def all(cls) -> Generator[smartapp.SmartApp]:
for app_id, instance in cls._instances:
yield instance.app
@classmethod
async def init(cls) -> None:
cls.key = KEY_PREFIX + cls.new_app().name
for app_id in cls().hkeys(cls.key):
app = await cls.get(app_id.decode())
await app.lifecycle_update(
models.smartapp.InstallData(
installedApp=models.smartapp.InstalledApp(
config={}
)
)
)
@classmethod
def ctx(cls, app: type[smartapp.SmartApp], ctx: types.AppCtx=None,
app_id: str=None) -> types.AppCtx:
if not app_id:
app_id = app.app_id
if not app_id:
raise ValueError('app_id')
if ctx:
log.info("updating context for app_id %s", app_id)
cls._ctx[app_id] = ctx
cls.store(app_id, ctx)
app.ctx.app_ctx = ctx
app.ctx.update_session()
if app_id not in cls._ctx:
log.info("no existing context for app_id %s", app_id)
hkeys = [hkey.decode() for hkey in cls().hkeys(cls.key)]
log.info("looking for match in stored contexts: %s", hkeys)
if app_id in hkeys:
log.info("loading stored context for app_id %s", app_id)
cls._ctx[app_id] = cls.load(app_id)
else:
log.info("creating a new context for app_id %s", app_id)
cls._ctx[app_id] = types.AppCtx(
app_id=app_id,
secret=authentication.AppAuth.gen_secret()
)
ctx = cls._ctx[app_id]
if not ctx.secret:
ctx.secret = authentication.AppAuth.gen_secret()
cls.store(app_id, ctx)
log.info("the api secret for app_id %s is %s", app_id, ctx.secret)
return ctx
@staticmethod
def new_session(token: str) -> type[aiohttp.ClientSession]:
return aiohttp.ClientSession(
headers = {'Authorization': 'Bearer {}'.format(token)}
)
def __init__(self, app_id: str=None, app: smartapp.SmartApp=None):
super().__init__()
if not app_id or not app:
return
self.__class__._instances[app_id] = app
self.app_ctx = self.__class__.ctx(app, app_id=app_id)
self.app = app
self._session = None
self._auth = None
def update_token(self, auth: models.AuthToken) -> type[AppContext]:
self.app_ctx.token = auth.access_token
self.app_ctx.refresh_token = auth.refresh_token
self.__class__.ctx(self.app, ctx=self.app_ctx)
def update_session(self):
if self._session:
log.info("terminating ClientSession for app_id %s", self.app_id)
api.AppTask(self._session.close)
log.info("instantiating new ClientSession for app_id %s", self.app_id)
self._session = self.__class__.new_session(self.token)
@property
def authentication(self):
if not self._auth:
self._auth = authentication.AppAuth(self.secret)
return self._auth
@property
def session(self) -> str:
if not self._session:
self.update_session()
return self._session
@property
def app_id(self) -> str:
return self.app_ctx.app_id
@property
def token(self) -> str:
return self.app_ctx.token
@property
def refresh_token(self) -> str:
return self.app_ctx.refresh_token
@property
def location_id(self) -> str:
return self.app_ctx.location_id
@property
def secret(self) -> str:
return self.app_ctx.secret
Classes
class AppContext (app_id: str = None, app: smartapp.SmartApp = None)
-
Implementation of the Redis protocol.
This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol.
Pipelines derive from this, implementing how the commands are sent and received to the Redis server. Based on configuration, an instance will either use a ConnectionPool, or Connection object to talk to redis.
Initialize a new Redis client. To specify a retry policy for specific errors, first set
retry_on_error
to a list of the error/s to retry on, then setretry
to a validRetry
object. To retry on TimeoutError,retry_on_timeout
can also be set toTrue
.Expand source code
class AppContext(redis.Redis): _instances = {} _ctx = {} new_app = None key = KEY_PREFIX + 'none' @classmethod async def get(cls, app_id: str) -> type[smartapp.SmartApp]: if app_id in cls._instances: return cls._instances[app_id] log.info("no app instance for app_id %s (found: %s)", app_id, cls._instances.keys()) app = cls.new_app() app.ctx = cls(app_id, app) app.load_routes() return app @classmethod async def delete(cls, app: smartapp.SmartApp): app_id = app.app_id log.info("deleting app context for app_id %s", app_id) try: cls._instances.pop(app_id) cls._ctx.pop(app_id) except KeyError: log.info("ctx wasn't loaded for app_id %s", app_id) pass return cls().delete(cls.key, app_id) @classmethod async def store(cls, app_id: str, ctx: types.AppCtx): return cls().hset(cls.key, app_id, ctx.json(exclude_none=True)) @classmethod def load(cls, app_id: str): return types.AppCtx.parse_raw(cls().hget(cls.key, app_id)) @classmethod def all(cls) -> Generator[smartapp.SmartApp]: for app_id, instance in cls._instances: yield instance.app @classmethod async def init(cls) -> None: cls.key = KEY_PREFIX + cls.new_app().name for app_id in cls().hkeys(cls.key): app = await cls.get(app_id.decode()) await app.lifecycle_update( models.smartapp.InstallData( installedApp=models.smartapp.InstalledApp( config={} ) ) ) @classmethod def ctx(cls, app: type[smartapp.SmartApp], ctx: types.AppCtx=None, app_id: str=None) -> types.AppCtx: if not app_id: app_id = app.app_id if not app_id: raise ValueError('app_id') if ctx: log.info("updating context for app_id %s", app_id) cls._ctx[app_id] = ctx cls.store(app_id, ctx) app.ctx.app_ctx = ctx app.ctx.update_session() if app_id not in cls._ctx: log.info("no existing context for app_id %s", app_id) hkeys = [hkey.decode() for hkey in cls().hkeys(cls.key)] log.info("looking for match in stored contexts: %s", hkeys) if app_id in hkeys: log.info("loading stored context for app_id %s", app_id) cls._ctx[app_id] = cls.load(app_id) else: log.info("creating a new context for app_id %s", app_id) cls._ctx[app_id] = types.AppCtx( app_id=app_id, secret=authentication.AppAuth.gen_secret() ) ctx = cls._ctx[app_id] if not ctx.secret: ctx.secret = authentication.AppAuth.gen_secret() cls.store(app_id, ctx) log.info("the api secret for app_id %s is %s", app_id, ctx.secret) return ctx @staticmethod def new_session(token: str) -> type[aiohttp.ClientSession]: return aiohttp.ClientSession( headers = {'Authorization': 'Bearer {}'.format(token)} ) def __init__(self, app_id: str=None, app: smartapp.SmartApp=None): super().__init__() if not app_id or not app: return self.__class__._instances[app_id] = app self.app_ctx = self.__class__.ctx(app, app_id=app_id) self.app = app self._session = None self._auth = None def update_token(self, auth: models.AuthToken) -> type[AppContext]: self.app_ctx.token = auth.access_token self.app_ctx.refresh_token = auth.refresh_token self.__class__.ctx(self.app, ctx=self.app_ctx) def update_session(self): if self._session: log.info("terminating ClientSession for app_id %s", self.app_id) api.AppTask(self._session.close) log.info("instantiating new ClientSession for app_id %s", self.app_id) self._session = self.__class__.new_session(self.token) @property def authentication(self): if not self._auth: self._auth = authentication.AppAuth(self.secret) return self._auth @property def session(self) -> str: if not self._session: self.update_session() return self._session @property def app_id(self) -> str: return self.app_ctx.app_id @property def token(self) -> str: return self.app_ctx.token @property def refresh_token(self) -> str: return self.app_ctx.refresh_token @property def location_id(self) -> str: return self.app_ctx.location_id @property def secret(self) -> str: return self.app_ctx.secret
Ancestors
- smartapp.redis.redis.Redis
- redis.client.Redis
- redis.commands.redismodules.RedisModuleCommands
- redis.commands.core.CoreCommands
- redis.commands.core.ACLCommands
- redis.commands.core.ClusterCommands
- redis.commands.core.DataAccessCommands
- redis.commands.core.BasicKeyCommands
- redis.commands.core.HyperlogCommands
- redis.commands.core.HashCommands
- redis.commands.core.GeoCommands
- redis.commands.core.ListCommands
- redis.commands.core.ScanCommands
- redis.commands.core.SetCommands
- redis.commands.core.StreamCommands
- redis.commands.core.SortedSetCommands
- redis.commands.core.ManagementCommands
- redis.commands.core.ModuleCommands
- redis.commands.core.PubSubCommands
- redis.commands.core.ScriptCommands
- redis.commands.sentinel.SentinelCommands
Class variables
var key
var new_app
Static methods
def all() ‑> Generator[smartapp.SmartApp]
-
Expand source code
@classmethod def all(cls) -> Generator[smartapp.SmartApp]: for app_id, instance in cls._instances: yield instance.app
def ctx(app: type[smartapp.SmartApp], ctx: types.AppCtx = None, app_id: str = None) ‑> types.AppCtx
-
Expand source code
@classmethod def ctx(cls, app: type[smartapp.SmartApp], ctx: types.AppCtx=None, app_id: str=None) -> types.AppCtx: if not app_id: app_id = app.app_id if not app_id: raise ValueError('app_id') if ctx: log.info("updating context for app_id %s", app_id) cls._ctx[app_id] = ctx cls.store(app_id, ctx) app.ctx.app_ctx = ctx app.ctx.update_session() if app_id not in cls._ctx: log.info("no existing context for app_id %s", app_id) hkeys = [hkey.decode() for hkey in cls().hkeys(cls.key)] log.info("looking for match in stored contexts: %s", hkeys) if app_id in hkeys: log.info("loading stored context for app_id %s", app_id) cls._ctx[app_id] = cls.load(app_id) else: log.info("creating a new context for app_id %s", app_id) cls._ctx[app_id] = types.AppCtx( app_id=app_id, secret=authentication.AppAuth.gen_secret() ) ctx = cls._ctx[app_id] if not ctx.secret: ctx.secret = authentication.AppAuth.gen_secret() cls.store(app_id, ctx) log.info("the api secret for app_id %s is %s", app_id, ctx.secret) return ctx
async def delete(app: smartapp.SmartApp)
-
Delete one or more keys specified by
names
Expand source code
@classmethod async def delete(cls, app: smartapp.SmartApp): app_id = app.app_id log.info("deleting app context for app_id %s", app_id) try: cls._instances.pop(app_id) cls._ctx.pop(app_id) except KeyError: log.info("ctx wasn't loaded for app_id %s", app_id) pass return cls().delete(cls.key, app_id)
async def get(app_id: str) ‑> type[smartapp.SmartApp]
-
Return the value at key
name
, or None if the key doesn't existFor more information check https://redis.io/commands/get
Expand source code
@classmethod async def get(cls, app_id: str) -> type[smartapp.SmartApp]: if app_id in cls._instances: return cls._instances[app_id] log.info("no app instance for app_id %s (found: %s)", app_id, cls._instances.keys()) app = cls.new_app() app.ctx = cls(app_id, app) app.load_routes() return app
async def init() ‑> None
-
Expand source code
@classmethod async def init(cls) -> None: cls.key = KEY_PREFIX + cls.new_app().name for app_id in cls().hkeys(cls.key): app = await cls.get(app_id.decode()) await app.lifecycle_update( models.smartapp.InstallData( installedApp=models.smartapp.InstalledApp( config={} ) ) )
def load(app_id: str)
-
Expand source code
@classmethod def load(cls, app_id: str): return types.AppCtx.parse_raw(cls().hget(cls.key, app_id))
def new_session(token: str) ‑> type[aiohttp.ClientSession]
-
Expand source code
@staticmethod def new_session(token: str) -> type[aiohttp.ClientSession]: return aiohttp.ClientSession( headers = {'Authorization': 'Bearer {}'.format(token)} )
async def store(app_id: str, ctx: types.AppCtx)
-
Expand source code
@classmethod async def store(cls, app_id: str, ctx: types.AppCtx): return cls().hset(cls.key, app_id, ctx.json(exclude_none=True))
Instance variables
var app_id : str
-
Expand source code
@property def app_id(self) -> str: return self.app_ctx.app_id
var authentication
-
Expand source code
@property def authentication(self): if not self._auth: self._auth = authentication.AppAuth(self.secret) return self._auth
var location_id : str
-
Expand source code
@property def location_id(self) -> str: return self.app_ctx.location_id
var refresh_token : str
-
Expand source code
@property def refresh_token(self) -> str: return self.app_ctx.refresh_token
var secret : str
-
Expand source code
@property def secret(self) -> str: return self.app_ctx.secret
var session : str
-
Expand source code
@property def session(self) -> str: if not self._session: self.update_session() return self._session
var token : str
-
Expand source code
@property def token(self) -> str: return self.app_ctx.token
Methods
def update_session(self)
-
Expand source code
def update_session(self): if self._session: log.info("terminating ClientSession for app_id %s", self.app_id) api.AppTask(self._session.close) log.info("instantiating new ClientSession for app_id %s", self.app_id) self._session = self.__class__.new_session(self.token)
def update_token(self, auth: models.AuthToken) ‑> type[AppContext]
-
Expand source code
def update_token(self, auth: models.AuthToken) -> type[AppContext]: self.app_ctx.token = auth.access_token self.app_ctx.refresh_token = auth.refresh_token self.__class__.ctx(self.app, ctx=self.app_ctx)