kapsikkum-unmanic – Rev 1
?pathlinks?
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
unmanic.plugins.py
Written by: Josh.5 <jsunnex@gmail.com>
Date: 03 Mar 2021, (3:52 PM)
Copyright:
Copyright (C) Josh Sunnex - All Rights Reserved
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
OR OTHER DEALINGS IN THE SOFTWARE.
"""
import base64
import hashlib
import json
import os
import shutil
import zipfile
from operator import attrgetter
import requests
from unmanic import config
from unmanic.libs import common, unlogger
from unmanic.libs.library import Library
from unmanic.libs.session import Session
from unmanic.libs.singleton import SingletonType
from unmanic.libs.unmodels import EnabledPlugins, LibraryPluginFlow, Plugins, PluginRepos
from unmanic.libs.unplugins import PluginExecutor
class PluginsHandler(object, metaclass=SingletonType):
"""
Set plugin version.
Plugins must be compatible with this version to be installed.
"""
version: int = 2
def __init__(self, *args, **kwargs):
self.settings = config.Config()
unmanic_logging = unlogger.UnmanicLogger.__call__()
self.logger = unmanic_logging.get_logger(__class__.__name__)
def _log(self, message, message2='', level="info"):
message = common.format_message(message, message2)
getattr(self.logger, level)(message)
@staticmethod
def get_plugin_repo_id(repo_path):
return int(hashlib.md5(repo_path.encode('utf8')).hexdigest(), 16)
def get_repo_cache_file(self, repo_id):
plugins_directory = self.settings.get_plugins_path()
if not os.path.exists(plugins_directory):
os.makedirs(plugins_directory)
return os.path.join(plugins_directory, "repo-{}.json".format(repo_id))
def get_plugin_path(self, plugin_id):
plugin_directory = os.path.join(self.settings.get_plugins_path(), plugin_id)
if not os.path.exists(plugin_directory):
os.makedirs(plugin_directory)
return plugin_directory
def get_plugin_download_cache_path(self, plugin_id, plugin_version):
plugin_directory = self.settings.get_plugins_path()
return os.path.join(plugin_directory, "{}-{}.zip".format(plugin_id, plugin_version))
@staticmethod
def get_default_repo():
return "default"
def get_plugin_repos(self):
"""
Returns a list of plugin repos
:return:
"""
default_repo = self.get_default_repo()
repo_list = [
{
"path": default_repo
}
]
repos = PluginRepos.select().order_by(PluginRepos.id.asc())
for repo in repos:
repo_dict = repo.model_to_dict()
if repo_dict.get('path') == default_repo:
continue
repo_list.append(repo_dict)
return repo_list
def set_plugin_repos(self, repo_list):
# Ensure list of repo URLs is valid
for repo_path in repo_list:
repo_data = self.fetch_remote_repo_data(repo_path)
if not repo_data:
return False
# Remove all existing repos
PluginRepos.delete().execute()
# Add new repos
data = []
for repo_path in repo_list:
data.append({"path": repo_path})
PluginRepos.insert_many(data).execute()
return True
def fetch_remote_repo_data(self, repo_path):
# Fetch remote JSON file
session = Session()
uuid = session.get_installation_uuid()
level = session.get_supporter_level()
repo = base64.b64encode(repo_path.encode('utf-8')).decode('utf-8')
api_path = f'plugin_repos/get_repo_data/uuid/{uuid}/level/{level}/repo/{repo}'
data, status_code = session.api_get(
'unmanic-api',
1,
api_path,
)
if status_code >= 500:
self._log(f"Failed to fetch plugin repo from '{api_path}'. Code:{status_code}", level="debug")
return data
def update_plugin_repos(self):
"""
Updates the local cached data of plugin repos
:return:
"""
plugins_directory = self.settings.get_plugins_path()
if not os.path.exists(plugins_directory):
os.makedirs(plugins_directory)
current_repos_list = self.get_plugin_repos()
for repo in current_repos_list:
repo_path = repo.get('path')
repo_id = self.get_plugin_repo_id(repo_path)
# Fetch remote JSON file
repo_data = self.fetch_remote_repo_data(repo_path)
# Dumb object to local JSON file
repo_cache = self.get_repo_cache_file(repo_id)
self._log("Repo cache file '{}'.".format(repo_cache), level="info")
try:
with open(repo_cache, 'w') as f:
json.dump(repo_data, f, indent=4)
except json.JSONDecodeError as e:
self._log("Unable to update plugin repo '{}'.".format(repo_path), str(e), level="error")
return True
def get_settings_of_all_installed_plugins(self):
all_settings = {}
# First fetch all enabled plugins
order = [
{
"column": 'name',
"dir": 'asc',
},
]
installed_plugins = self.get_plugin_list_filtered_and_sorted(order=order)
# Fetch settings for each plugin
plugin_executor = PluginExecutor()
for plugin in installed_plugins:
plugin_settings, plugin_settings_meta = plugin_executor.get_plugin_settings(plugin.get('plugin_id'))
all_settings[plugin.get('plugin_id')] = plugin_settings
# Return modules
return all_settings
def read_repo_data(self, repo_id):
repo_cache = self.get_repo_cache_file(repo_id)
if os.path.exists(repo_cache):
with open(repo_cache) as f:
repo_data = json.load(f)
return repo_data
return {}
def get_plugin_info(self, plugin_id):
plugin_info = {}
plugin_directory = os.path.join(self.settings.get_plugins_path(), plugin_id)
info_file = os.path.join(plugin_directory, 'info.json')
if os.path.exists(info_file):
# Read plugin info.json
with open(info_file) as json_file:
plugin_info = json.load(json_file)
return plugin_info
def get_plugins_in_repo_data(self, repo_data):
return_list = []
if 'repo' in repo_data and 'plugins' in repo_data:
# Get URLs for plugin downloads
repo_meta = repo_data.get("repo")
repo_data_directory = repo_meta.get("repo_data_directory")
if not repo_data_directory:
return return_list
repo_data_directory = repo_data_directory.rstrip('/')
# if not repo_data_directory.endswith("/"):
# repo_data_directory = repo_data_directory + "/"
# Loop over
for plugin in repo_data.get("plugins", []):
# Only show plugins that are compatible with this version
# Plugins will require a 'compatibility' entry in their info.json file.
# This must list the plugin handler versions that it is compatible with
if self.version not in plugin.get('compatibility', []):
continue
plugin_package_url = "{0}/{1}/{1}-{2}.zip".format(repo_data_directory, plugin.get('id'), plugin.get('version'))
plugin_changelog_url = "{0}/{1}/changelog.md".format(repo_data_directory, plugin.get('id'))
# Check if plugin is already installed:
plugin_status = {
'installed': False,
}
plugin_info = self.get_plugin_info(plugin.get('id'))
if plugin_info:
local_version = plugin_info.get('version')
# Parse the currently installed version number and check if it matches
remote_version = plugin.get('version')
if local_version == remote_version:
plugin_status = {
'installed': True,
'update_available': False,
}
else:
# There is an update available
self.flag_plugin_for_update_by_id(plugin.get("id"))
plugin_status = {
'installed': True,
'update_available': True,
}
return_list.append(
{
'plugin_id': plugin.get('id'),
'name': plugin.get('name'),
'author': plugin.get('author'),
'description': plugin.get('description'),
'version': plugin.get('version'),
'icon': plugin.get('icon', ''),
'tags': plugin.get('tags'),
'status': plugin_status,
'package_url': plugin_package_url,
'changelog_url': plugin_changelog_url,
'repo_name': repo_meta.get('name'),
}
)
return return_list
def get_installable_plugins_list(self, filter_repo_id=None):
"""
Return a list of plugins that can be installed
Optionally filter by repo
:param filter_repo_id:
:return:
"""
return_list = []
# First fetch a list of available repos
current_repos_list = self.get_plugin_repos()
for repo in current_repos_list:
repo_path = repo.get('path')
repo_id = self.get_plugin_repo_id(repo_path)
if filter_repo_id and repo_id != int(filter_repo_id):
# Filtering by repo ID and this one does not match
continue
repo_data = self.read_repo_data(repo_id)
plugins_in_repo = self.get_plugins_in_repo_data(repo_data)
for plugin_data in plugins_in_repo:
plugin_data['repo_id'] = str(repo_id)
return_list += plugins_in_repo
return return_list
def read_remote_changelog_file(self, changelog_url):
r = requests.get(changelog_url, timeout=1)
if r.status_code == 200:
return r.text
return ''
def notify_site_of_plugin_install(self, plugin):
"""
Notify the unmanic.app site API of the installation.
This is used for metric stats so that we can get a count of plugin downloads.
:param plugin:
:return:
"""
# Post
session = Session()
uuid = session.get_installation_uuid()
level = session.get_supporter_level()
post_data = {
"uuid": uuid,
"level": level,
"plugin_id": plugin.get("plugin_id"),
"author": plugin.get("author"),
"version": plugin.get("version"),
}
try:
repo_data, status_code = session.api_post('unmanic-api', 1, 'plugin_repos/record_install', post_data)
if not repo_data.get('success'):
session.register_unmanic()
except Exception as e:
self._log("Exception while logging plugin install.", str(e), level="debug")
return False
def install_plugin_by_id(self, plugin_id, repo_id=None):
"""
Find the matching plugin info for the given plugin ID.
Download the plugin if it is found and return the result.
If it is not found, return False.
:param plugin_id:
:param repo_id:
:return:
"""
plugin_list = self.get_installable_plugins_list(filter_repo_id=repo_id)
for plugin in plugin_list:
if plugin.get('plugin_id') == plugin_id:
success = self.download_and_install_plugin(plugin)
if success:
try:
# Write the plugin info to the DB
plugin_directory = self.get_plugin_path(plugin.get("plugin_id"))
result = self.write_plugin_data_to_db(plugin, plugin_directory)
if result:
self._log("Installed plugin '{}'".format(plugin_id), level="info")
# Ensure the plugin module is reloaded (if it was previously loaded)
plugin_executor = PluginExecutor()
plugin_executor.reload_plugin_module(plugin.get('plugin_id'))
return result
except Exception as e:
self._log("Exception while installing plugin '{}'.".format(plugin), str(e), level="exception")
return False
def install_plugin_from_path_on_disk(self, abspath):
"""
Install a plugin from a ZIP file on disk
:param abspath:
:return:
"""
# TODO: Ensure that this is a zip file
try:
plugin_info = self.install_plugin(abspath)
# Set the plugin_id variable used when writing data to DB.
# The returned 'plugin_info' is just a readout of the info.json file which has this set to 'id'
plugin_info['plugin_id'] = plugin_info.get('id')
# Cleanup zip file
if os.path.isfile(abspath):
os.remove(abspath)
# Write the plugin info to the DB
plugin_directory = self.get_plugin_path(plugin_info.get("plugin_id"))
result = self.write_plugin_data_to_db(plugin_info, plugin_directory)
if result:
self._log("Installed plugin '{}'".format(plugin_info.get("plugin_id")), level="info")
# Ensure the plugin module is reloaded (if it was previously loaded)
plugin_executor = PluginExecutor()
plugin_executor.reload_plugin_module(plugin_info.get('plugin_id'))
return result
except Exception as e:
self._log("Exception while installing plugin from zip '{}'.".format(abspath), str(e), level="exception")
return False
def download_and_install_plugin(self, plugin):
"""
Download and install a given plugin
:param plugin:
:return:
"""
self._log("Installing plugin '{}'".format(plugin.get("name")), level='debug')
# Try to fetch URL
try:
# Fetch remote zip file
destination = self.download_plugin(plugin)
# Install downloaded plugin
self.install_plugin(destination, plugin.get("plugin_id"))
# Cleanup zip file
if os.path.isfile(destination):
os.remove(destination)
self.notify_site_of_plugin_install(plugin)
return True
except Exception as e:
success = False
self._log("Exception while installing plugin '{}'.".format(plugin), str(e), level="exception")
return False
def download_plugin(self, plugin):
"""
Download a given plugin to a temp directory
:param plugin:
:return:
"""
# Fetch remote zip file
destination = self.get_plugin_download_cache_path(plugin.get("plugin_id"), plugin.get("version"))
self._log("Downloading plugin '{}' to '{}'".format(plugin.get("package_url"), destination), level='debug')
with requests.get(plugin.get("package_url"), stream=True, allow_redirects=True) as r:
r.raise_for_status()
with open(destination, 'wb') as f:
for chunk in r.iter_content(chunk_size=128):
f.write(chunk)
return destination
def install_plugin(self, zip_file, plugin_id=None):
"""
Install a given plugin from a zip file
:param zip_file:
:param plugin_id:
:return:
"""
# Read plugin ID from zip contents info.json if no plugin_id was provided
if not plugin_id:
with zipfile.ZipFile(zip_file, "r") as zip_ref:
plugin_info = json.loads(zip_ref.read('info.json'))
plugin_id = plugin_info.get('id')
# Create plugin destination directory based on plugin ID
plugin_directory = self.get_plugin_path(plugin_id)
# Prevent installation if destination has a git repository. This plugin is probably under development
self._log(os.path.join(str(plugin_directory), '.git'))
if os.path.exists(os.path.join(str(plugin_directory), '.git')):
raise Exception("Plugin directory contains a git repository. Uninstall this source version before installing.")
# Extract zip file contents
self._log("Extracting plugin to '{}'".format(plugin_directory), level='debug')
with zipfile.ZipFile(zip_file, "r") as zip_ref:
zip_ref.extractall(str(plugin_directory))
# Return installed plugin info
return self.get_plugin_info(plugin_id)
@staticmethod
def write_plugin_data_to_db(plugin, plugin_directory):
# Add installed plugin to database
plugin_data = {
Plugins.plugin_id: plugin.get("plugin_id"),
Plugins.name: plugin.get("name"),
Plugins.author: plugin.get("author"),
Plugins.version: plugin.get("version"),
Plugins.tags: plugin.get("tags"),
Plugins.description: plugin.get("description"),
Plugins.icon: plugin.get("icon"),
Plugins.local_path: plugin_directory,
Plugins.update_available: False,
}
plugin_entry = Plugins.get_or_none(plugin_id=plugin.get("plugin_id"))
if plugin_entry is not None:
# Update the existing entry
update_query = (Plugins
.update(plugin_data)
.where(Plugins.plugin_id == plugin.get("plugin_id")))
update_query.execute()
else:
# Insert a new entry
Plugins.insert(plugin_data).execute()
return True
def get_total_plugin_list_count(self):
task_query = Plugins.select().order_by(Plugins.id.desc())
return task_query.count()
def get_plugin_list_filtered_and_sorted(self, order=None, start=0, length=None, search_value=None, id_list=None,
enabled=None, plugin_id=None, plugin_type=None, library_id=None):
try:
query = (Plugins.select())
if plugin_type:
if library_id is not None:
join_condition = (
(LibraryPluginFlow.plugin_id == Plugins.id) & (LibraryPluginFlow.plugin_type == plugin_type) & (
LibraryPluginFlow.library_id == library_id))
else:
join_condition = (
(LibraryPluginFlow.plugin_id == Plugins.id) & (LibraryPluginFlow.plugin_type == plugin_type))
query = query.join(LibraryPluginFlow, join_type='LEFT OUTER JOIN', on=join_condition)
if id_list:
query = query.where(Plugins.id.in_(id_list))
if search_value:
query = query.where((Plugins.name.contains(search_value)) | (Plugins.author.contains(search_value)) | (
Plugins.tags.contains(search_value)))
if plugin_id is not None:
query = query.where(Plugins.plugin_id.in_([plugin_id]))
# Deprecate this "enabled" status as plugins are now enabled when the are assigned to a library
if enabled is not None:
raise Exception("Fetching plugins by 'enabled' status is deprecated")
if library_id is not None:
join_condition = (
(EnabledPlugins.plugin_id == Plugins.id) & (EnabledPlugins.library_id == library_id))
query = query.join(EnabledPlugins, join_type='LEFT OUTER JOIN', on=join_condition)
query = query.where(EnabledPlugins.plugin_id != None)
# Get order by
if order:
for o in order:
if o.get("model"):
model = o.get("model")
else:
model = Plugins
if o.get("dir") == "asc":
order_by = attrgetter(o.get("column"))(model).asc()
else:
order_by = attrgetter(o.get("column"))(model).desc()
query = query.order_by_extend(order_by)
if length:
query = query.limit(length).offset(start)
return query.dicts()
except Plugins.DoesNotExist:
# No plugin entries exist yet
self._log("No plugins exist yet.", level="warning")
def flag_plugin_for_update_by_id(self, plugin_id):
self._log("Flagging update available for installed plugin '{}'".format(plugin_id), level='debug')
# Disable the matching entries in the table
Plugins.update(update_available=True).where(Plugins.plugin_id == plugin_id).execute()
# Fetch records
records = self.get_plugin_list_filtered_and_sorted(plugin_id=plugin_id)
# Ensure they are now disabled
for record in records:
if record.get('update_available'):
continue
self._log("Failed to flag plugin for update '{}'".format(record.get('plugin_id')), level='debug')
return False
return True
def uninstall_plugins_by_db_table_id(self, plugin_table_ids: list):
"""
Remove a Plugin by it's DB table ID column.
This will also remove the Plugin directory and all it's contents.
:param plugin_table_ids:
:return:
"""
self._log("Uninstall plugins '{}'".format(plugin_table_ids), level='debug')
# Fetch records
records_by_id = self.get_plugin_list_filtered_and_sorted(id_list=plugin_table_ids)
# Remove each plugin from disk
for record in records_by_id:
# Unload plugin modules
try:
PluginExecutor.unload_plugin_module(record.get('plugin_id'))
except Exception as e:
self._log("Exception while unloading python module {}:".format(record.get('plugin_id')), message2=str(e),
level="exception")
# Remove from disk
plugin_directory = self.get_plugin_path(record.get('plugin_id'))
self._log("Removing plugin files from disk '{}'".format(plugin_directory), level='debug')
try:
# Delete the info file first to prevent any other process trying to read the plugin.
# Without the info file, the plugin is effectivly uninstalled
info_file = os.path.join(plugin_directory, 'info.json')
if os.path.exists(info_file):
os.remove(info_file)
# Cleanup the rest of the plugin directory
shutil.rmtree(plugin_directory)
except Exception as e:
self._log("Exception while removing directory {}:".format(plugin_directory), message2=str(e),
level="exception")
# Unlink from library by ID in DB
EnabledPlugins.delete().where(EnabledPlugins.plugin_id.in_(plugin_table_ids)).execute()
# Delete by ID in DB
if not Plugins.delete().where(Plugins.id.in_(plugin_table_ids)).execute():
return False
return True
def update_plugins_by_db_table_id(self, plugin_table_ids):
self._log("Update plugins '{}'".format(plugin_table_ids), level='debug')
# Fetch records
records_by_id = self.get_plugin_list_filtered_and_sorted(id_list=plugin_table_ids)
# Update each plugin in turn
for record in records_by_id:
if self.install_plugin_by_id(record.get('plugin_id')):
continue
self._log("Failed to update plugin '{}'".format(record.get('plugin_id')), level='debug')
return False
return True
def set_plugin_flow(self, plugin_type, library_id, flow):
"""
Update the plugin flow for all plugins in a given plugin type
:param plugin_type:
:param library_id:
:param flow:
:return:
"""
# Delete all current flow data for this plugin type
delete_query = LibraryPluginFlow.delete().where(
(LibraryPluginFlow.plugin_type == plugin_type) & (LibraryPluginFlow.library_id == library_id))
delete_query.execute()
success = True
priority = 1
for plugin in flow:
plugin_id = plugin.get('plugin_id')
# Fetch the plugin info
plugin_info = Plugins.select().where(Plugins.plugin_id == plugin_id).first()
if not plugin_info:
continue
# Save the plugin flow
plugin_flow = self.set_plugin_flow_position_for_single_plugin(plugin_info, plugin_type, library_id, priority)
priority += 1
if not plugin_flow:
success = False
return success
@staticmethod
def set_plugin_flow_position_for_single_plugin(plugin_info: Plugins, plugin_type: str, library_id: int, priority: int):
"""
Update the plugin flow for a single plugin and type with the provided priority.
:param plugin_info:
:param plugin_type:
:param library_id:
:param priority:
:return:
"""
pass
# Save the plugin flow
flow_dict = {
'plugin_id': plugin_info.id,
'library_id': library_id,
'plugin_name': plugin_info.plugin_id,
'plugin_type': plugin_type,
'position': priority,
}
plugin_flow = LibraryPluginFlow.create(**flow_dict)
return plugin_flow
def get_enabled_plugin_modules_by_type(self, plugin_type, library_id=None):
"""
Return a list of enabled plugin modules when given a plugin type
Runners are filtered by the given 'plugin_type' and sorted by
configured order of execution.
If no library ID is provided, this will return all installed plugins for that type.
This case should only be used for plugin runner types that are not associated with a library.
:param plugin_type:
:param library_id:
:return:
"""
# Refresh session
s = Session()
s.register_unmanic()
# First fetch all enabled plugins
order = [
{
"model": LibraryPluginFlow,
"column": 'position',
"dir": 'asc',
},
{
"column": 'name',
"dir": 'asc',
},
]
enabled_plugins = self.get_plugin_list_filtered_and_sorted(order=order, plugin_type=plugin_type, library_id=library_id)
# Fetch all plugin modules from the given list of enabled plugins
plugin_executor = PluginExecutor()
plugin_data = plugin_executor.get_plugin_data_by_type(enabled_plugins, plugin_type)
# Return modules
return plugin_data
def exec_plugin_runner(self, data, plugin_id, plugin_type):
"""
Execute a plugin runner
:param data:
:param plugin_id:
:param plugin_type:
:return:
"""
plugin_executor = PluginExecutor()
return plugin_executor.execute_plugin_runner(data, plugin_id, plugin_type)
def get_incompatible_enabled_plugins(self, frontend_messages=None):
"""
Ensure that the currently installed plugins are compatible with this PluginsHandler version
:param frontend_messages:
:return:
:rtype:
"""
# Fetch all libraries
all_libraries = Library.get_all_libraries()
def add_frontend_message(plugin_id, name):
# If the frontend messages queue was included in request, append a message
if frontend_messages:
frontend_messages.put(
{
'id': 'incompatiblePlugin_{}'.format(plugin_id),
'type': 'error',
'code': 'incompatiblePlugin',
'message': name,
'timeout': 0
}
)
# Fetch all enabled plugins
incompatible_list = []
for library in all_libraries:
enabled_plugins = self.get_plugin_list_filtered_and_sorted(library_id=library.get('id'))
# Ensure only compatible plugins are enabled
# If all enabled plugins are compatible, then return true
for record in enabled_plugins:
try:
# Ensure plugin is compatible
plugin_info = self.get_plugin_info(record.get('plugin_id'))
except Exception as e:
plugin_info = None
self._log("Exception while fetching plugin info for {}:".format(record.get('plugin_id')), message2=str(e),
level="exception")
if plugin_info:
# Plugins will require a 'compatibility' entry in their info.json file.
# This must list the plugin handler versions that it is compatible with
if self.version in plugin_info.get('compatibility', []):
continue
incompatible_list.append(
{
'plugin_id': record.get('plugin_id'),
'name': record.get('name'),
}
)
add_frontend_message(record.get('plugin_id'), record.get('name'))
return incompatible_list
@staticmethod
def get_plugin_types_with_flows():
"""
Returns a list of all available plugin types
:return:
"""
return_plugin_types = []
plugin_ex = PluginExecutor()
types_list = plugin_ex.get_all_plugin_types()
# Filter out the types without flows
for plugin_type in types_list:
if plugin_type.get('has_flow'):
return_plugin_types.append(plugin_type.get('id'))
return return_plugin_types
def get_enabled_plugin_flows_for_plugin_type(self, plugin_type, library_id):
"""
Fetch all enabled plugin flows for a plugin type
:param plugin_type:
:param library_id:
:return:
"""
return_plugin_flow = []
for plugin_module in self.get_enabled_plugin_modules_by_type(plugin_type, library_id=library_id):
return_plugin_flow.append(
{
"plugin_id": plugin_module.get("plugin_id"),
"name": plugin_module.get("name", ""),
"author": plugin_module.get("author", ""),
"description": plugin_module.get("description", ""),
"version": plugin_module.get("version", ""),
"icon": plugin_module.get("icon", ""),
}
)
return return_plugin_flow