# Plugin system
The plugin system is powerful and should cover most use cases.
TIP
If you want to peek at the code that drives the plugin system, it's on GitHub, or just expand it here.
plugin.py
import os
import re
import sys
import json
import inspect
import asyncio
import logging
import pkgutil
from collections import defaultdict
from importlib import import_module
from typing import Dict, Any, List, Callable
from fastapi import FastAPI, APIRouter
from opa.utils import unique, filter_dict_to_function
from opa import config
class BasePlugin:
    """Common root class shared by every plugin type defined in this module."""
class Driver(BasePlugin):
    """Base class for drivers backing an optional component.

    This class calls ``self.connect()`` (sync or async) and, when present,
    ``self.validate()``; neither is defined here, so subclasses are expected
    to supply them.
    """

    name: str
    instance: Any = None
    opts: Dict[str, Any]
    pm: 'PluginManager'

    def __init__(self, opts=None, load='auto'):
        """Remember the driver options (empty dict when falsy) and the load mode."""
        self.opts = opts if opts else {}
        self.load = load

    def _pre_connection_check(self, connectionstatus, load):
        """Decide whether ``validate()`` should run after a connect attempt.

        Returns ``False`` when the connection failed and loading is not
        mandatory, ``True`` when the driver defines ``validate``, and ``None``
        otherwise. The ``load`` argument is not read; ``self.load`` is used.

        Raises:
            Exception: the connection failed and ``self.load`` is ``'yes'``.
        """
        # Equality (not identity) on purpose: mirrors the original comparison.
        host_missing = connectionstatus == False  # noqa: E712
        if host_missing and self.load == 'yes':
            raise Exception(
                f'Connect pre-check failed for {self.name}, as if the host is not there? Options {self.opts}'
            )
        if host_missing:
            return False
        return True if hasattr(self, 'validate') else None

    def initialize(self):
        """Connect synchronously and validate when the pre-check allows it."""
        status = self.connect()
        should_validate = self._pre_connection_check(status, self.load)
        if should_validate:
            self.validate()

    async def initialize_async(self):
        """Awaitable twin of ``initialize`` for drivers with an async ``connect``."""
        status = await self.connect()
        should_validate = self._pre_connection_check(status, self.load)
        if should_validate:
            await self.validate()

    def get_instance(self):
        """Return the ``instance`` attribute (``None`` unless a subclass sets it)."""
        return self.instance
class HookDefinition(BasePlugin):
    """Declares a hook name that ``Hook`` plugins may implement."""

    # When True, PluginManager.post_hook() raises if no hook ended up using it.
    required: bool = False
    # Whether hooks for this definition must expose ``run`` as a coroutine function
    # (enforced in PluginManager.register_hook()).
    is_async: bool = False
    # Flipped to True by PluginManager.post_hook() once a hook is selected.
    _used: bool = False
class Hook(BasePlugin):
    """Base class for hook implementations registered with the plugin manager."""

    # Hook name; PluginManager.register_hook() falls back to the class name
    # when this attribute is missing.
    name: str
    # PluginManager.post_hook() keeps the hook with the highest ``order``.
    order: int = 0
class Setup(BasePlugin):
    """Base class for setup plugins; subclasses are instantiated by ``PluginManager.run_setup``."""
def get_defined_plugins(mod, plugin_types=None):
    """Collect the plugin classes defined directly in module ``mod``.

    Args:
        mod: An imported module to inspect.
        plugin_types: Categories to collect; when falsy, all of
            'hook-definitions', 'hooks', 'drivers' and 'setup' are collected.

    Returns:
        A ``defaultdict(list)`` mapping category name to the matching classes.
    """
    wanted = plugin_types or ['hook-definitions', 'hooks', 'drivers', 'setup']
    # Checked in this order; a class lands in the first category that matches.
    categories = (
        ('hooks', Hook),
        ('hook-definitions', HookDefinition),
        ('drivers', Driver),
        ('setup', Setup),
    )
    found = defaultdict(list)
    for _member_name, candidate in inspect.getmembers(mod, inspect.isclass):
        # Classes merely imported into the module
        # ('from ... import AnotherPlugin') are not plugins of this module.
        if candidate.__module__ != mod.__name__:
            continue
        # The bare base classes themselves are never plugins.
        if any(candidate is base for base in (Hook, Driver, Setup)):
            continue
        for category, base in categories:
            if issubclass(candidate, base) and category in wanted:
                found[category].append(candidate)
                break
    return found
class PluginManager:
    """Registry for drivers, hook definitions and hooks, plus loaded components.

    Typical flow (see ``startup``): register hook definitions, register hooks,
    call ``post_hook()`` to settle on one hook per name, register drivers and
    finally load the configured optional components.
    """

    status: Dict[str, Dict]
    # Registered driver classes, keyed by lower-cased driver name.
    drivers: Dict[str, Driver]
    # Driver instances, keyed by lower-cased component name from the config.
    optional_components: Dict[str, Driver]
    # Before post_hook(): name -> list of hook instances.
    # After post_hook(): name -> the single selected hook instance.
    hooks: Dict[str, List[Hook]]
    # Values are whatever was passed to register_hook_definition().
    hook_definitions: Dict[str, Hook]
    store: Dict[str, Any]

    def __init__(self):
        """Create empty registries; ``store`` starts with no task candidates."""
        self.status = defaultdict(dict)
        self.drivers = {}
        self.optional_components = {}
        self.hooks = defaultdict(list)
        self.hook_definitions = {}
        self.store = {'task_candidates': []}

    def post_hook(self):
        """Reduce the registered hooks to one per name and verify required ones.

        For every hook name the hook with the highest ``order`` is kept; on a
        tie the one registered last wins, because the sort is stable.

        Raises:
            Exception: a hook definition marked ``required`` has no hook.
        """
        final_hooks: Dict[str, Hook] = {}
        for hook_name, hooks in self.hooks.items():
            definition = self.hook_definitions.get(hook_name)
            try:
                hook_to_use = sorted(hooks, key=lambda x: x.order)[-1]
            except IndexError:
                # No hooks registered under this name.
                continue
            if definition:
                definition._used = True
            final_hooks[hook_name] = hook_to_use
        should_be_used = [
            name
            for name, definition in self.hook_definitions.items()
            if definition.required and not definition._used
        ]
        if should_be_used:
            raise Exception(f'Hooks that should be registered: {should_be_used}')
        self.hooks = final_hooks

    def register_driver(self, driver: Driver):
        """Register ``driver`` under its lower-cased ``name``.

        Registering the very same object twice is a no-op.

        Raises:
            Exception: a different driver already uses that name.
        """
        name = driver.name.lower()
        if name in self.drivers:
            if self.drivers[name] is driver:
                # This might happen example if we import a driver-class inside a plugin-file.
                # It should be valid, example if you need it for typing.
                return None
            raise Exception(
                f'Driver with this name ({name}) already exists. CaSe of driver is ignored.'
            )
        logging.debug(f'Registered driver {name}')
        self.drivers[name] = driver

    def _preload_drivers(self):
        """Instantiate a driver for every enabled entry in ``config.OPTIONAL_COMPONENTS``.

        Entries with ``LOAD == 'no'`` are skipped. Each created instance gets
        ``pm`` set to this manager, is stored in ``optional_components`` and is
        then yielded so the caller can initialize it.

        Raises:
            Exception: the configured ``DRIVER`` is not a registered driver.
        """
        for name, values in config.OPTIONAL_COMPONENTS.items():
            name = name.lower()
            load = values.get('LOAD', 'auto')
            if load == 'no':
                continue
            drivername = values.get('DRIVER')
            # register_driver() stores drivers under their lower-cased name, so
            # the configured name must be folded the same way; otherwise a
            # mixed-case DRIVER value can never match. Non-string values (e.g.
            # a missing DRIVER -> None) are looked up unchanged.
            lookup_key = (
                drivername.lower() if isinstance(drivername, str) else drivername
            )
            try:
                driver = self.drivers[lookup_key]
            except KeyError:
                raise Exception(
                    f'Invalid driver specified ({drivername}), no way to handle it'
                )
            driverinstance = driver(opts=values.get('OPTS', {}), load=load)
            driverinstance.pm = self
            self.optional_components[name] = driverinstance
            logging.info(
                f'Connecting to {name} with driver {drivername}, using {driverinstance.opts}'
            )
            yield driverinstance

    async def load_components(self):
        """Initialize every configured component, awaiting async drivers."""
        for driverinstance in self._preload_drivers():
            if asyncio.iscoroutinefunction(driverinstance.connect):
                await driverinstance.initialize_async()
            else:
                driverinstance.initialize()

    def load_sync_components_global(self):
        """Initialize only components whose driver has a synchronous ``connect``.

        Async drivers are still instantiated and stored by
        ``_preload_drivers`` but are not initialized.
        """
        for driverinstance in self._preload_drivers():
            if asyncio.iscoroutinefunction(driverinstance.connect):
                logging.debug(f'Driver {driverinstance.name} is async, wont load')
            else:
                driverinstance.initialize()

    def register_hook_definition(self, obj):
        """Register a hook definition under ``obj.name`` (or its class name).

        Raises:
            Exception: a definition with that name is already registered.
        """
        try:
            name = obj.name
        except AttributeError:
            name = obj.__name__
        if name in self.hook_definitions:
            raise Exception(
                f'There can only be 1 hook-definition per hook-name. "{name}" is already registered'
            )
        self.hook_definitions[name] = obj

    def register_hook(self, obj):
        """Instantiate hook class ``obj`` and add it to the hooks for its name.

        The hook's ``run`` must be a coroutine function exactly when the
        matching hook definition has ``is_async`` set.

        Raises:
            Exception: no definition exists for the hook name, or the
                sync/async nature of ``run`` does not match the definition.
        """
        try:
            name = obj.name
        except AttributeError:
            name = obj.__name__
        try:
            hook_definition = self.hook_definitions[name]
        except KeyError:
            raise Exception(
                f'There are no hook-definition for hook "{name}", are you using the correct name?'
            )
        if hook_definition.is_async:
            if not asyncio.iscoroutinefunction(obj.run):
                raise Exception(
                    f'Hook-definition is marked as async but the function is not ({obj.run}).'
                )
        else:
            if asyncio.iscoroutinefunction(obj.run):
                raise Exception(
                    f'Hook-definition is not marked as async, but is.. Mark it as async or make it sync: {obj.run}'
                )
        self.hooks[name].append(obj())

    def run_setup(self, obj, params):
        """Instantiate setup class ``obj`` and keep the instance in ``status``.

        ``params`` is first passed through ``filter_dict_to_function`` together
        with ``obj.__init__`` (presumably keeping only the keys that
        ``__init__`` accepts -- verify against ``opa.utils``).
        """
        params = filter_dict_to_function(params, obj.__init__)
        name = f'{obj.__module__}.{obj.__name__}'
        self.status[name]['init'] = obj(**params)

    def call(self, name, *args, **kwargs):
        """Run the synchronous hook ``name`` and return its result.

        Expects ``post_hook()`` to have run, so ``hooks[name]`` is one hook.

        Raises:
            Exception: the hook's ``run`` is a coroutine function.
        """
        func = self.hooks[name].run
        if asyncio.iscoroutinefunction(func):
            raise Exception(
                f'The hook function ({func}) is async and should not be called using non-async calls. Call it using "await call_async()"'
            )
        return func(*args, **kwargs)

    async def call_async(self, name, *args, **kwargs):
        """Await the asynchronous hook ``name`` and return its result.

        Expects ``post_hook()`` to have run, so ``hooks[name]`` is one hook.

        Raises:
            Exception: the hook's ``run`` is not a coroutine function.
        """
        func = self.hooks[name].run
        if not asyncio.iscoroutinefunction(func):
            raise Exception(
                f'The hook function ({func}) is not async call it using "call(..)"'
            )
        return await func(*args, **kwargs)
def _get_plugindata():
    """Discover, filter and import plugins from the configured plugin paths.

    Plugins are imported from multiple paths with these rules:
    * The first plugin with a given name wins.
    * There are multiple matchers that ALL must pass. A matcher passes if it
      is NOT set, or if it matches "$plugin_path / $plugin_name" (the tag
      matchers compare against the ``tags`` list of the plugin's metadata):
      * PLUGIN_WHITELIST_RE (regex)
      * PLUGIN_WHITELIST_LIST
      * PLUGIN_WHITELIST_TAGS
      * not in PLUGIN_BLACKLIST_LIST
      * not in PLUGIN_BLACKLIST_RE
      * not in PLUGIN_BLACKLIST_TAGS

    Side effects: the plugin paths are added to ``sys.path`` and every
    accepted plugin module is imported.

    Returns:
        A dict with ``plugins_to_load`` (category -> plugin classes),
        ``task_candidates`` (names of plugin packages containing a
        ``tasks.py``) and ``routers`` (``router`` attributes of the modules).
    """
    PLUGIN_WHITELIST_RE = re.compile(config.PLUGIN_WHITELIST_RE)
    PLUGIN_BLACKLIST_RE = re.compile(config.PLUGIN_BLACKLIST_RE)
    PLUGIN_WHITELIST_TAGS = set(config.PLUGIN_WHITELIST_TAGS)
    PLUGIN_BLACKLIST_TAGS = set(config.PLUGIN_BLACKLIST_TAGS)
    # A plain string is treated as a single path; the internal plugin folder
    # is always appended last.
    PLUGIN_PATHS = unique(
        [config.PLUGIN_PATHS]
        if isinstance(config.PLUGIN_PATHS, str)
        else config.PLUGIN_PATHS
    ) + ['/data/opa/plugins']
    logging.info(
        'Plugin loading settings:'
        f' plugin-paths: {PLUGIN_PATHS}\n'
        f' whitelist-regex: {PLUGIN_WHITELIST_RE}\n'
        f' whitelist-list: {config.PLUGIN_WHITELIST_LIST}\n'
        f' whitelist-tags: {config.PLUGIN_WHITELIST_TAGS}\n'
        f' blacklist-list: {config.PLUGIN_BLACKLIST_LIST}\n'
        f' blacklist-regex: {PLUGIN_BLACKLIST_RE}\n'
        f' blacklist-tags: {PLUGIN_BLACKLIST_TAGS}\n'
    )
    # Plugins are imported by bare module name, so their folders must be
    # importable.
    sys_paths = sys.path + PLUGIN_PATHS
    sys.path = unique(sys_paths)
    plugins_to_load = defaultdict(list)
    task_candidates = []
    routers = []
    for plugin in pkgutil.iter_modules(PLUGIN_PATHS):
        # "$plugin_path/$plugin_name" is what the list/regex matchers see.
        allow_match = os.path.join(plugin.module_finder.path, plugin.name)
        tasks_candidate = False
        if plugin.ispkg:
            metafile = os.path.join(allow_match, 'meta.json')
            if os.path.exists(os.path.join(allow_match, 'tasks.py')):
                tasks_candidate = True
        else:
            metafile = f'{allow_match}-meta.json'
        logging.debug('')
        logging.debug(f'Checking if we should load "{allow_match}"')
        if os.path.exists(metafile):
            logging.debug(f'Found metafile @ {metafile}')
            # Context manager so the file handle is closed right after parsing.
            with open(metafile, 'r') as metafile_handle:
                metadata = json.load(metafile_handle)
        else:
            logging.debug(f'Metafile @ {metafile} does not exist, using empty metadata')
            metadata = {}
        logging.debug(f'Metadata: {metadata}')
        # Only configured matchers add an entry, so an unset matcher can
        # never veto loading.
        load_checks = {}
        if config.PLUGIN_WHITELIST_LIST:
            load_checks['PLUGIN_WHITELIST_LIST'] = (
                allow_match in config.PLUGIN_WHITELIST_LIST
            )
        if PLUGIN_WHITELIST_RE.pattern:
            load_checks['PLUGIN_WHITELIST_RE'] = bool(
                PLUGIN_WHITELIST_RE.match(allow_match)
            )
        if PLUGIN_WHITELIST_TAGS:
            load_checks['PLUGIN_WHITELIST_TAGS'] = bool(
                PLUGIN_WHITELIST_TAGS & set(metadata.get('tags', []))
            )
        if config.PLUGIN_BLACKLIST_LIST:
            load_checks['PLUGIN_BLACKLIST_LIST'] = (
                allow_match not in config.PLUGIN_BLACKLIST_LIST
            )
        if PLUGIN_BLACKLIST_RE.pattern:
            load_checks['PLUGIN_BLACKLIST_RE'] = not bool(
                PLUGIN_BLACKLIST_RE.match(allow_match)
            )
        if PLUGIN_BLACKLIST_TAGS:
            load_checks['PLUGIN_BLACKLIST_TAGS'] = not bool(
                PLUGIN_BLACKLIST_TAGS & set(metadata.get('tags', []))
            )
        load = all(load_checks.values())
        logging.debug(f'Load-checks: {load_checks}, overall({load})')
        if not load:
            continue
        logging.info(f'Loading plugin: {plugin.name}')
        mod = import_module(plugin.name)
        if tasks_candidate:
            task_candidates.append(plugin.name)
        defined_plugins = get_defined_plugins(mod)
        for pt in ['hook-definitions', 'hooks', 'drivers', 'setup']:
            plugins_to_load[pt] += defined_plugins[pt]
        if hasattr(mod, 'router'):
            routers.append(mod.router)
    return {
        'plugins_to_load': plugins_to_load,
        'task_candidates': task_candidates,
        'routers': routers,
    }
# Module-level manager; bound by startup() / startup_simple() and read by the
# accessor helpers in this module. Only declared here, not assigned.
plugin_manager: PluginManager
async def startup(app):
    """Create the global plugin manager and wire every plugin into ``app``.

    Order: hook definitions, hooks, hook finalisation, drivers, component
    loading (sync and async), routers, then setup plugins.
    """
    global plugin_manager
    manager = PluginManager()
    plugin_manager = manager
    manager.store.update(**_get_plugindata())
    to_load = manager.store['plugins_to_load']

    # Definitions must exist before hooks can be registered against them.
    for definition in to_load['hook-definitions']:
        manager.register_hook_definition(definition)
    for hook_cls in to_load['hooks']:
        manager.register_hook(hook_cls)
    manager.post_hook()

    for driver_cls in to_load['drivers']:
        manager.register_driver(driver_cls)
    await manager.load_components()

    for plugin_router in manager.store['routers']:
        app.include_router(plugin_router)
    for setup_cls in to_load['setup']:
        manager.run_setup(setup_cls, {'app': app})
def startup_simple():
    """Synchronous start-up without the FastAPI app.

    Meant for simple sync apps (e.g. celery workers) that want some of the
    core opa-stack functionality. Hooks and drivers are registered as in
    ``startup``, but only components with a synchronous driver are
    initialized, and no routers or setup plugins are processed.
    """
    global plugin_manager
    manager = PluginManager()
    plugin_manager = manager
    manager.store.update(**_get_plugindata())
    to_load = manager.store['plugins_to_load']

    # Definitions must exist before hooks can be registered against them.
    for definition in to_load['hook-definitions']:
        manager.register_hook_definition(definition)
    for hook_cls in to_load['hooks']:
        manager.register_hook(hook_cls)
    manager.post_hook()

    for driver_cls in to_load['drivers']:
        manager.register_driver(driver_cls)
    manager.load_sync_components_global()
async def shutdown():
    """Shutdown counterpart of ``startup``; currently performs no work."""
    return None
def get_plugin_manager() -> PluginManager:
    """Return the module-level ``plugin_manager``.

    Raises NameError if neither ``startup`` nor ``startup_simple`` has bound it yet.
    """
    manager = plugin_manager
    return manager
def get_component(name: str):
    """Return the driver instance stored for component ``name``.

    Component keys are stored lower-cased by the plugin manager and no case
    folding happens here, so ``name`` must already be lower-case.

    Raises:
        KeyError: no component is stored under ``name``.
    """
    components = plugin_manager.optional_components
    return components[name]
def get_instance(name: str):
    """Return the ``instance`` attribute of the component stored under ``name``."""
    component = get_component(name)
    return component.instance
def get_util(name: str):
    """Return the ``util`` attribute of the component stored under ``name``.

    NOTE(review): ``util`` is not defined on the ``Driver`` base class;
    presumably individual drivers provide it -- confirm against the drivers.
    """
    component = get_component(name)
    return component.util
def call_hook(name, *args, **kwargs):
    """Run the synchronous hook ``name`` via the global plugin manager."""
    result = plugin_manager.call(name, *args, **kwargs)
    return result
async def call_hook_async(name, *args, **kwargs):
    """Await the asynchronous hook ``name`` via the global plugin manager."""
    result = await plugin_manager.call_async(name, *args, **kwargs)
    return result
def get_router(*args, **kwargs) -> APIRouter:
    """Create a new ``APIRouter``, forwarding all arguments unchanged."""
    router = APIRouter(*args, **kwargs)
    return router
# Adding plugin-paths
To use a custom plugin folder, populate the configuration named PLUGIN_PATHS
with a comma-separated list of paths.
TIP
The path /data/opa/plugins
will always be appended, because we use plugins internally as well. However, they can easily be overridden if wanted, as they are appended as the last item in the list.
# File-structure
When loading plugins, we use PLUGIN_PATHS
to find possible plugins. We will load both plugins that are single files and packages containing an __init__.py.
WARNING
Make sure the name of your plugin is unique, since we use Python's import system when importing it.
For example, don't call your plugin redis.py
and expect it to work... 😃 You will probably get some errors down the line.
The order in which the plugins are loaded is important. If you want to load the plugin myplugin
and there are 2 files with that name, the first one found in PLUGIN_PATHS
will win, just as with a normal PATH
variable.
We will ONLY care about the first one, even if the first one is going to get ignored by a filter (see below).
# Configuration
PLUGIN_PATHS
: List of paths to potentially load plugins from. Default: []
, i.e. only /data/opa/plugins
(as it is always there). This is an array, and it is overwritten if you define it in multiple places. To merge multiple entries instead, write, for example (notice the dynaconf_merge
) (dynaconf-docs)
default:
PLUGIN_PATHS:
- "/extra_plugins"
- dynaconf_merge
TIP
If you want to define this value using an environment-variable, you can define it as a string (OPA_PLUGIN_PATHS='/plugins'
) or a list, (OPA_PLUGIN_PATHS='["/plugins", "/more_plugins"]'
)
Whether a plugin is loaded or not can be dictated using rules. There are many different filters, and they all allow by default. You should only need 0, 1 or maybe 2 of the settings, but feel free to use as many as you like 😃
- PLUGIN_WHITELIST_LIST (default: []): List of whitelisted plugins to load.
- PLUGIN_WHITELIST_RE (default: ""): Regex of whitelisted plugins to load.
- PLUGIN_WHITELIST_TAGS (default: []): List of whitelisted plugin tags to load. Setting this means that we will ONLY load plugins having one or more of these tags.
- PLUGIN_BLACKLIST_LIST (default: []): List of blacklisted plugins to not load.
- PLUGIN_BLACKLIST_RE (default: ""): Regex of blacklisted plugins to not load.
- PLUGIN_BLACKLIST_TAGS (default: []): List of blacklisted plugin tags to not load.
The default settings mean that ALL available plugins (in the paths) will be loaded by default, which might be just what you want.
TIP
The _LIST
and _RE
matchers will match against the plugin path joined with the module name. Example paths they will need to match against:
/data/opa/demo-plugins/demo_noop
: For the file inside /data/opa/demo-plugins
named demo_noop.py
/data/opa/demo-plugins/demo_model
: For the demo_model
package (a folder with an __init__.py
file)
The _TAGS
matchers will check for metadata