kapsikkum-unmanic – Rev 1

Subversion Repositories:
Rev:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
    unmanic.session.py

    Written by:               Josh.5 <jsunnex@gmail.com>
    Date:                     10 Mar 2021, (5:20 PM)

    Copyright:
           Copyright (C) Josh Sunnex - All Rights Reserved

           Permission is hereby granted, free of charge, to any person obtaining a copy
           of this software and associated documentation files (the "Software"), to deal
           in the Software without restriction, including without limitation the rights
           to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
           copies of the Software, and to permit persons to whom the Software is
           furnished to do so, subject to the following conditions:

           The above copyright notice and this permission notice shall be included in all
           copies or substantial portions of the Software.

           THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
           EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
           MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
           IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
           DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
           OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
           OR OTHER DEALINGS IN THE SOFTWARE.

"""
import base64
import pickle
import random
import time
import requests

from unmanic import config
from unmanic.libs import common, unlogger
from unmanic.libs.singleton import SingletonType
from unmanic.libs.unmodels import Installation


class Session(object, metaclass=SingletonType):
    """
    Session

    Manages the Unmanic application's session for unlocking
    features and fetching data from the Unmanic site API.

    """

    """
    level - The user auth level
    Set to 5 by default here (a reset session uses level 0)
    """
    level = 5

    """
    non supporter library count
    """
    library_count = 1000

    """
    non supporter linked installations count
    """
    link_count = 1000

    """
    picture_uri - The user avatar
    """
    picture_uri = ''

    """
    name - The user's name
    """
    name = ''

    """
    email - The user's email
    """
    email = ''

    """
    created - The timestamp when the session was created
    """
    created = None

    """
    last_check - The timestamp when the session was last checked
    """
    last_check = None

    """
    uuid - This installation's UUID
    """
    uuid = None

    """
    user_access_token - The access token to authenticate requests with the Unmanic API
    """
    user_access_token = None

    """
    session_cookies - A stored copy of the session cookies to persist between restarts
    """
    session_cookies = None

    def __init__(self, *args, **kwargs):
        """
        Prepare the request defaults, the shared HTTP session and the logger.

        :param args: Unused positional arguments
        :param kwargs: Optional settings. 'dev_local_api' (default False) switches
                       the API URLs to the local development host.
        """
        # Plain configuration values
        self.dev_local_api = kwargs.get('dev_local_api', False)
        # Passed as the 'timeout' argument of every request made by this class
        self.timeout = 30
        # One requests session is reused for all API calls
        self.requests_session = requests.Session()
        # Logger named after this class
        log_provider = unlogger.UnmanicLogger.__call__()
        self.logger = log_provider.get_logger(__class__.__name__)
        self.logger.info('Initialising new session object')

    def __created_older_than_x_days(self, days=1):
        """
        Report whether the session "created" timestamp has reached the given age.

        :param days: Age threshold in days
        :return: False while the current time is still before "created" plus
                 the threshold, otherwise True
        """
        # 86400 seconds = 24 hours
        seconds_per_day = 86400
        now = time.time()
        expires_at = self.created + (days * seconds_per_day)
        return not (now < expires_at)

    def __check_session_valid(self):
        """
        Decide whether the current session can be kept as it is.

        A session is only valid for a limited amount of time.
        After it expires, it should be re-acquired.

        :return: True if the session may be kept, otherwise False
        """
        # Throttle: when the last check ran less than 40 seconds ago, keep the current session.
        # This only exists to prevent spamming requests when the site API is unreachable.
        # NOTE(review): earlier comments described 40/45 minute windows (2400/2700 s) while the
        #   active thresholds are 40/45 seconds - confirm which is intended.
        if self.last_check:
            elapsed = time.time() - self.last_check
            # The original chained test "45 > elapsed < 40" reduces to this single bound
            if elapsed < 40:
                return True
        # No creation timestamp means the session was never created
        if not self.created:
            return False
        # A session whose creation timestamp is 2 or more days old is no longer valid
        if self.__created_older_than_x_days(days=2):
            self.logger.debug('Session no longer valid')
            return False
        return True

    def __update_created_timestamp(self):
        """
        Refresh the session "created" timestamp.

        The stored value is the current time plus a random offset; the
        debug log reports the actual time of the update (without the offset).

        :return:
        """
        from datetime import datetime

        now = time.time()
        # Random offset of 300 to 899 seconds (5 mins to just under 15 mins) so that
        #   installations do not all register together if the site goes down
        jitter = random.randint(300, 899)
        self.created = now + jitter
        # Only the accurate update time goes to the debug log
        self.logger.debug('Updated session at %s', str(datetime.fromtimestamp(now)))

    def __fetch_installation_data(self):
        """
        Load the installation record from the DB into this session.

        When the first record cannot be read, delete().execute() is run on
        the model and a new record is created with its default values.

        :return:
        """
        installation_model = Installation()
        try:
            # get() raises (DoesNotExist) when the query yields no row
            record = installation_model.select().order_by(Installation.id.asc()).limit(1).get()
        except Exception:
            # No usable record - start again from a record built with defaults
            self.logger.debug('Unmanic session does not yet exist... Creating.')
            installation_model.delete().execute()
            record = installation_model.create()

        # Copy the persisted values onto the session
        self.uuid = str(record.uuid)
        self.level = int(record.level)
        for text_field in ('picture_uri', 'name', 'email'):
            setattr(self, text_field, str(getattr(record, text_field)))
        self.created = record.created

        # Restore the request auth header and cookies from the stored values
        self.__update_session_auth(access_token=record.user_access_token,
                                   session_cookies=record.session_cookies)

    def __store_installation_data(self, force_save_access_token=False):
        """
        Store installation data in DB to persist reboot

        Nothing is written when this session has no UUID yet, or when no
        installation record matches the UUID.

        :param force_save_access_token: When True, the access token is written even if it
                                        is empty. This is how a cleared token is persisted;
                                        by default an empty token never overwrites the
                                        stored one.
        :return:
        """
        if not self.uuid:
            return
        db_installation = Installation.get_or_none(uuid=self.uuid)
        if db_installation is None:
            # get_or_none() returns None when no row matches - there is nothing to update
            self.logger.error('Unable to store session data. No installation record found for UUID %s', self.uuid)
            return
        db_installation.level = self.level
        db_installation.picture_uri = self.picture_uri
        db_installation.name = self.name
        db_installation.email = self.email
        db_installation.created = self.created
        if self.user_access_token or force_save_access_token:
            # NOTE(review): forcing an empty token assumes the user_access_token column
            #   accepts NULL - confirm against the Installation model
            db_installation.user_access_token = self.user_access_token
        if self.session_cookies:
            db_installation.session_cookies = self.session_cookies
        db_installation.save()

    def __reset_session_installation_data(self):
        """
        Reset stored session data

        Clears the user details and the access token on this session and
        persists the result, including the cleared token. Without forcing
        the token write, the old token would stay in the DB and be loaded
        again the next time the installation data is fetched.

        :return:
        """
        self.logger.debug('Resetting session installation data.')
        self.level = 0
        self.picture_uri = ''
        self.name = ''
        self.email = ''
        self.created = time.time()
        self.user_access_token = None
        self.__store_installation_data(force_save_access_token=True)

    def __update_session_auth(self, access_token=None, session_cookies=None):
        """
        Apply an access token and/or stored cookies to the requests session.

        :param access_token: Token to remember and send as the 'Authorization' header (ignored if empty)
        :param session_cookies: Base64 encoded pickle of a cookie jar (ignored if empty)
        :return:
        """
        # Update session headers
        if access_token:
            self.user_access_token = access_token
            self.requests_session.headers.update({'Authorization': self.user_access_token})
        # Nothing more to do without cookies
        if not session_cookies:
            return
        # Update session cookies
        self.session_cookies = session_cookies
        try:
            # NOTE(review): pickle.loads can execute arbitrary code on crafted input.
            #   Confirm that the stored cookie blob can only come from a trusted source.
            restored_cookies = pickle.loads(base64.b64decode(session_cookies))
            self.requests_session.cookies = restored_cookies
        except Exception as err:
            self.logger.error('Error trying to reload session cookies - %s', str(err))

    def get_installation_uuid(self):
        """
        Return this installation's UUID as a string.
        When it is not yet loaded, the installation data is fetched first
        (which creates a record if none exists).

        :return:
        """
        if self.uuid:
            return self.uuid
        self.__fetch_installation_data()
        return self.uuid

    def get_supporter_level(self):
        """
        Return the supporter level.
        While the level is falsy (e.g. 0), the installation data is
        fetched again before the level is returned.

        :return:
        """
        if self.level:
            return self.level
        self.__fetch_installation_data()
        return self.level

    def get_site_url(self):
        """
        Build the base URL of the Unmanic site API.

        :return: "http://api.unmanic.localhost" when dev_local_api is set,
                 otherwise "https://api.unmanic.app"
        """
        if self.dev_local_api:
            scheme, host = "http", "api.unmanic.localhost"
        else:
            scheme, host = "https", "api.unmanic.app"
        return "{0}://{1}".format(scheme, host)

    def set_full_api_url(self, api_prefix, api_version, api_path):
        """
        Build the full URL of an API endpoint:
        <site url>/<api_prefix>/v<api_version>/<api_path>

        :param api_prefix: Name of the API (first path segment)
        :param api_version: API version number (rendered as "v<version>")
        :param api_path: Endpoint path below the versioned prefix
        :return: The full URL as a string
        """
        versioned_prefix = "{}/v{}".format(api_prefix, api_version)
        site_url = self.get_site_url()
        return "{0}/{1}/{2}".format(site_url, versioned_prefix, api_path)

    def api_get(self, api_prefix, api_version, api_path):
        """
        Build and execute a GET request against the site API.

        On a 401 response the token is verified (and refreshed if needed);
        when that succeeds the request is sent a second time.

        :param api_prefix:
        :param api_version:
        :param api_path:
        :return: Tuple of (decoded JSON body, HTTP status code) of the last response
        """
        url = self.set_full_api_url(api_prefix, api_version, api_path)
        response = self.requests_session.get(url, timeout=self.timeout)
        if response.status_code > 403:
            # There is an issue with the remote API
            self.logger.debug(
                "Sorry! There seems to be an issue with the remote servers. Please try GET request again later. Status code %s",
                response.status_code)
        if response.status_code != 401:
            return response.json(), response.status_code

        # Unauthorised - verify the token, refreshing it as required
        if not self.verify_token():
            self.logger.debug('Failed to verify auth (api_get)')
            return response.json(), response.status_code

        # Token is good again, retry the request once
        response = self.requests_session.get(url, timeout=self.timeout)
        if response.status_code > 403:
            # There is an issue with the remote API
            self.logger.debug(
                "Sorry! There seems to be an issue with the remote servers on retry. Please try GET request again later. Status code %s",
                response.status_code)
        return response.json(), response.status_code

    def api_post(self, api_prefix, api_version, api_path, data):
        """
        Build and execute a POST request against the site API.

        On a 401 response the token is verified (and refreshed if needed);
        when that succeeds the request is sent a second time.

        :param api_prefix:
        :param api_version:
        :param api_path:
        :param data: Payload sent as the JSON body of the request
        :return: Tuple of (decoded JSON body, HTTP status code) of the last response
        """
        url = self.set_full_api_url(api_prefix, api_version, api_path)
        response = self.requests_session.post(url, json=data, timeout=self.timeout)
        if response.status_code > 403:
            # There is an issue with the remote API
            self.logger.debug(
                "Sorry! There seems to be an issue with the remote servers. Please try POST request again later. Status code %s",
                response.status_code)
        if response.status_code != 401:
            return response.json(), response.status_code

        # Unauthorised - verify the token, refreshing it as required
        if not self.verify_token():
            self.logger.debug('Failed to verify auth (api_post)')
            return response.json(), response.status_code

        # Token is good again, retry the request once
        response = self.requests_session.post(url, json=data, timeout=self.timeout)
        if response.status_code > 403:
            # There is an issue with the remote API
            self.logger.debug(
                "Sorry! There seems to be an issue with the remote servers on retry. Please try POST request again later. Status code %s",
                response.status_code)
        return response.json(), response.status_code

    def verify_token(self):
        """
        Check the stored access token with the auth API and refresh it if required.

        :return: True when the token was verified or refreshed, and also when the auth
                 servers answer with a status code above 403 (so a server fault does not
                 lower the level). False when no token is stored or the refresh failed.
        """
        # Without a token there is nothing to verify
        if not self.user_access_token:
            return False

        # Step 1 - ask the API whether the access token is valid
        verify_url = self.set_full_api_url('support-auth-api', 1, 'user_auth/verify_token')
        verify_response = self.requests_session.get(verify_url, timeout=self.timeout)
        if verify_response.status_code == 202:
            # Token is valid
            return True
        if verify_response.status_code > 403:
            # Server side issue. Carry on with the current access token, it cannot be fixed here.
            self.logger.debug(
                "Sorry! There seems to be an issue with the token auth servers. Please try again later. Status code %s",
                verify_response.status_code)
            # Reporting True prevents the app from lowering the level
            return True

        # Step 2 - the token is not valid, try to refresh it
        self.logger.debug('Unable to verify authentication token. Refreshing...')
        refresh_url = self.set_full_api_url('support-auth-api', 1, 'user_auth/refresh_token')
        refresh_response = self.requests_session.get(refresh_url, timeout=self.timeout)
        refresh_status = refresh_response.status_code
        if refresh_status == 202:
            # Token refreshed - keep the new access token
            payload = refresh_response.json()
            self.__update_session_auth(access_token=payload.get('data', {}).get('accessToken'))
            # Keep a serialised copy of the session cookies and persist everything
            self.session_cookies = base64.b64encode(pickle.dumps(self.requests_session.cookies)).decode('utf-8')
            self.__store_installation_data()
            return True
        if refresh_status > 403:
            # Server side issue. Carry on with the current access token, it cannot be fixed here.
            self.logger.debug(
                "Sorry! There seems to be an issue with the auth servers. Please try again later. Status code %s",
                refresh_status)
            # Reporting True prevents the app from lowering the level
            return True
        if refresh_status == 403:
            # Record the failure and any messages from the API in the debug logs
            self.logger.debug('Failed to refresh access token.')
            for message in refresh_response.json().get('messages', []):
                self.logger.debug(message)

        # Only the class attribute is blanked. The user will appear to be logged out.
        self.user_access_token = None
        return False

    def auth_user_account(self, force_checkin=False):
        """
        Verify the user's auth and update the user details and level on this session.

        The level is set to 0 unless the token is verified and the user info
        is fetched successfully. The level is left untouched when nothing is
        done (no token and no forced check-in) or when fetching the user info
        returns a status code of 500 or above.

        :param force_checkin: When True, run even without a stored token and try to
                              retrieve an app token if verification fails
        :return:
        """
        # Don't bother if the user has never logged in
        if not (self.user_access_token or force_checkin):
            self.logger.debug('The user access token is not set add we are not being forced to refresh for one.')
            return

        # Start by verifying the token
        token_verified = self.verify_token()
        if force_checkin and not token_verified:
            # This may be the initial login - try to fetch a token for this installation
            token_request = {"uuid": self.get_installation_uuid()}
            token_response, token_status = self.api_post('support-auth-api', 1, 'app_auth/retrieve_app_token',
                                                         token_request)
            if token_status in [200, 201, 202] and token_response.get('success'):
                self.__update_session_auth(access_token=token_response.get('data', {}).get('accessToken'))
                token_verified = self.verify_token()

        # The level falls back to 0 unless user info says otherwise
        new_level = 0
        if token_verified:
            info_response, info_status = self.api_get('support-auth-api', 1, 'user_auth/user_info')
            if info_status >= 500:
                # Failed to fetch data from server. Ignore this for now. Will try again later.
                return
            user_data = None
            if info_status in [200, 201, 202] and info_response.get('success'):
                user_data = info_response.get('data', {}).get('user')
            if user_data:
                # Name, avatar and email come from the user data (with fallbacks)
                self.name = user_data.get("name", "Valued Supporter")
                self.picture_uri = user_data.get("picture_uri", "/assets/global/img/avatar/avatar_placeholder.png")
                self.email = user_data.get("email", "")
                # Level from the user data (default back to 0)
                new_level = int(user_data.get("supporter_level", 0))
        self.level = new_level

    def register_unmanic(self, force=False):
        """
        Register Unmanic with site.
        This sends information about the system that Unmanic is running on.
        It also sends a unique ID.

        Based on the return information, this will set the session level.

        Return success status.

        NOTE: The method currently returns True before doing any of the
        above, so no request is sent and the session is left unchanged.

        :param force: Skip the "session still valid" check and force a user auth check-in
                      (only relevant to the registration flow below the early return)
        :return: True (always, while the early return is in place)
        """
        # Registration is short-circuited: report success without contacting the site API.
        # NOTE(review): this looks like a deliberate switch-off of the remote registration -
        #   confirm. Everything below this return is unreachable; it is the original
        #   registration flow and becomes active again if the return is removed.
        return True

        # First check if the current session is still valid
        if not force and self.__check_session_valid():
            return True

        # Set now as the last time this was run (before it was actually run
        self.last_check = time.time()

        # Update the session
        settings = config.Config()
        try:
            # Fetch the installation data prior to running a session update
            self.__fetch_installation_data()

            # Build post data
            from unmanic.libs.system import System
            system = System()
            system_info = system.info()
            platform_info = system_info.get("platform", None)
            if platform_info:
                platform_info = " * ".join(platform_info)
            post_data = {
                "uuid":           self.get_installation_uuid(),
                "version":        settings.read_version(),
                "python_version": system_info.get("python", ''),
                "system":         {
                    "platform": platform_info,
                    "devices":  system_info.get("devices", {}),
                }
            }

            # Refresh user auth
            self.auth_user_account(force_checkin=force)

            # Register Unmanic
            registration_response, status_code = self.api_post('unmanic-api', 1, 'installation_auth/register', post_data)

            # Save data
            if status_code in [200, 201, 202] and registration_response.get("success"):
                self.__update_created_timestamp()
                # Persist session in DB
                self.__store_installation_data()
                return True

            # Allow an extension for the session for 7 days without an internet connection
            if self.__created_older_than_x_days(days=7):
                # Reset the session - Unmanic should phone home once every 7 days
                #self.__reset_session_installation_data()
                pass
            return True
        except Exception as e:
            self.logger.debug("Exception while registering Unmanic: %s", e, exc_info=True)
            if self.__check_session_valid():
                # If the session is still valid, just return true. Perhaps the internet is down and it timed out?
                return True
            return False

    def sign_out(self):
        """
        Remove any user auth.

        Asks the site API to remove this installation's registration, logs
        the response and then resets the session data held locally.

        :return: True
        """
        installation_uuid = self.get_installation_uuid()
        post_data = {"uuid": installation_uuid}
        api_path = 'installation_auth/remove_installation_registration'
        response, status_code = self.api_post('unmanic-api', 1, api_path, post_data)
        # Record what the remote side answered, then clear the local session data
        self.logger.debug("Remote registry logout response - Code: %s, Body: %s", status_code, response)
        self.__reset_session_installation_data()
        return True

    def get_sign_out_url(self):
        """
        Build the application sign out URL from the site URL

        :return: The sign out URL as a string
        """
        site_url = self.get_site_url()
        return "{0}/unmanic-api/v1/installation_auth/logout".format(site_url)

    def get_patreon_login_url(self):
        """
        Build the Patreon login URL from the site URL

        :return: The Patreon login URL as a string
        """
        site_url = self.get_site_url()
        return "{0}/support-auth-api/v1/login_patreon/login".format(site_url)

    def get_github_login_url(self):
        """
        Build the GitHub login URL from the site URL

        :return: The GitHub login URL as a string
        """
        site_url = self.get_site_url()
        return "{0}/support-auth-api/v1/login_github/login".format(site_url)

    def get_discord_login_url(self):
        """
        Build the Discord login URL from the site URL

        :return: The Discord login URL as a string
        """
        site_url = self.get_site_url()
        return "{0}/support-auth-api/v1/login_discord/login".format(site_url)

    def get_patreon_sponsor_page(self):
        """
        Fetch the Patreon sponsor page data from the Unmanic site API

        :return: The "data" value of a successful API response, otherwise False
                 (also when the request raises an exception)
        """
        sponsor_page = False
        try:
            response, status_code = self.api_get('unmanic-api', 1, 'links/unmanic_patreon_sponsor_page')
            request_succeeded = status_code in [200, 201, 202] and response.get("success")
            if request_succeeded:
                sponsor_page = response.get("data")
        except Exception as e:
            # Best effort only - log the problem and fall back to False
            self.logger.debug('Exception while fetching Patreon sponsor page - %s', e)
        return sponsor_page