saml_logout.py #1

  • //
  • guest/
  • perforce_software/
  • helix-saml/
  • main/
  • saml_logout.py
  • View
  • Commits
  • Open Download .zip Download (13 KB)
#!/usr/bin/env python
"""Initiate SAML logout from service.

This script serves as an example of how to initiate a logout from the
service. It is expected to be run by the Perforce server as an
auth-invalidate trigger. The one command line argument is the Perforce user
name or email address.

"""

# Copyright (c) 2018 Perforce Software. All rights reserved.

import argparse
import base64
import gzip
import io
import json
import logging
import os
import sys
import zlib

import six

import six.moves.http_client as httplib
from six.moves.html_parser import HTMLParser
import six.moves.http_cookiejar as cookielib
import six.moves.urllib as urllib
from six.moves.urllib import parse as urlparse

from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.constants import OneLogin_Saml2_Constants
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser
from onelogin.saml2.logout_response import OneLogin_Saml2_Logout_Response
from onelogin.saml2.logout_request import OneLogin_Saml2_Logout_Request
from onelogin.saml2.settings import OneLogin_Saml2_Settings
from onelogin.saml2.utils import OneLogin_Saml2_Utils

# Prefix-to-URI map for the SAML assertion ('saml') and protocol ('samlp')
# XML namespaces, taken from the constants of the OneLogin SAML library.
XML_NAMESPACES = {
    'saml': OneLogin_Saml2_Constants.NS_SAML,
    'samlp': OneLogin_Saml2_Constants.NS_SAMLP
}


def encode_nameid(name_id):
    """Produce a URL-safe base64 encoded version of name_id.

    The encoded value is used by this script as a file name for the
    per-user cookie and session files.

    name_id may be text or bytes; text is encoded as UTF-8 before being
    base64 encoded. The result is always a native ``str`` on both Python 2
    and Python 3.
    """
    # Normalize to bytes first so that non-ASCII text never reaches the
    # base64 encoder un-encoded (which fails on Python 2 for unicode input
    # and on Python 3 for any str input).
    raw = name_id if isinstance(name_id, bytes) else name_id.encode('utf8')
    encoded = base64.urlsafe_b64encode(raw)
    # Python 2 already yields a native str here; Python 3 yields bytes that
    # must be turned into text. Base64 output is pure ASCII.
    return encoded if isinstance(encoded, str) else encoded.decode('ascii')


def load_settings(idp_url):
    """Assemble the settings object used by the SAML library.

    When idp_url is given, the IdP metadata is fetched from it and used as
    the starting point; otherwise the starting point is an empty dict. The
    contents of 'settings.json' and then 'advanced_settings.json', both
    located next to this script, are merged on top of that.
    """
    if idp_url:
        merged = OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_url)
    else:
        merged = {}
    script_dir = os.path.dirname(sys.argv[0])
    # Later files take precedence over earlier ones (and over the metadata).
    for filename in ('settings.json', 'advanced_settings.json'):
        with open(os.path.join(script_dir, filename)) as handle:
            merged.update(json.load(handle))
    # Pass the script directory as the base path so the 'certs' directory is
    # looked up there rather than in the SAML module's install location.
    return OneLogin_Saml2_Settings(merged, custom_base_path=script_dir)


def make_fake_request(settings):
    """Build a fake request object for python3-saml to use.

    The request is derived from the service provider's single logout
    service (SLS) URL found in settings. If that URL is missing or empty,
    an error is written to stderr and the process exits with status 2.

    Returns a dict with the keys 'server_port', 'script_name', 'path_info',
    'server_name', 'http_host' and 'https'.
    """
    sp = settings.get_sp_data()
    # Use .get() so that an absent 'singleLogoutService' or 'url' entry is
    # reported as a configuration error instead of raising KeyError.
    sls_url = (sp.get('singleLogoutService') or {}).get('url')
    if not sls_url:
        sys.stderr.write('missing SP SLS URL in configuration\n')
        sys.exit(2)
    result = urlparse.urlparse(sls_url)
    return {
        # port is None when the URL does not name one explicitly
        'server_port': result.port,
        'script_name': result.path,
        'path_info': '',
        'server_name': 'example.com',
        'http_host': result.hostname,
        # anything other than plain http is treated as secure
        'https': 'off' if result.scheme == 'http' else 'on'
    }


class MyCookiePolicy(cookielib.CookiePolicy):

    """Permissive cookie policy: every cookie is accepted and returned."""

    def __init__(self):
        """Set the policy flags to their fixed values."""
        # Protocol selection flags.
        self.netscape = True
        self.rfc2965 = False
        self.rfc2109_as_netscape = None
        self.hide_cookie2 = False
        # Strictness flags; only the RFC 2965 unverifiable check is enabled.
        self.strict_rfc2965_unverifiable = True
        self.strict_domain = False
        self.strict_ns_unverifiable = False
        self.strict_ns_set_initial_dollar = False
        self.strict_ns_set_path = False
        self.strict_ns_domain = 0

    def set_ok(self, cookie, request):
        """Accept any cookie offered by the server; always returns True."""
        return True

    def return_ok(self, cookie, request):
        """Send any stored cookie back to the server; always returns True."""
        return True


def make_cookie(cookie, port):
    """Construct a cookielib.Cookie instance.

    cookie is a dict describing one cookie. The 'name', 'value' and
    'domain' keys are required; 'path', 'secure' and 'expirationDate' are
    optional (a missing 'secure' is treated as False and a missing
    'expirationDate' as no expiry).
    NOTE(review): the optional-key handling assumes stored session cookies
    may omit 'expirationDate' -- confirm against whatever writes the file.

    port is the port number to bind the cookie to, or a false value (None
    or 0) for a cookie that is not bound to a port.
    """
    # Derive both port fields from the same test; Cookie rejects the
    # combination port=None with port_specified=True.
    port_text = str(port) if port else None
    domain = cookie['domain']
    return cookielib.Cookie(
        version=0,
        name=cookie['name'],
        value=cookie['value'],
        port=port_text,
        port_specified=port_text is not None,
        domain=domain,
        domain_specified=True,
        # startswith() is safe for an empty domain, unlike indexing [0]
        domain_initial_dot=domain.startswith('.'),
        path=cookie.get('path'),
        path_specified='path' in cookie,
        secure=cookie.get('secure', False),
        expires=cookie.get('expirationDate'),
        discard=False,
        comment=None,
        comment_url=None,
        rest=dict()
    )


def read_cookies(idp_url, base_dir, name_id):
    """Load the stored cookies for name_id into a new cookie jar.

    The cookies are read as JSON from '<base_dir>/cookies/<encoded name_id>'
    and bound to the port named in idp_url, if any. When no such file
    exists the returned jar is empty.
    """
    # Cookies are tied to a specific port; take it from the IdP URL.
    port = urlparse.urlparse(idp_url).port or None
    jar = cookielib.CookieJar()
    jar.set_policy(MyCookiePolicy())
    cookie_path = os.path.join(
        os.path.expanduser(os.path.join(base_dir, 'cookies')),
        encode_nameid(name_id))
    if not os.path.exists(cookie_path):
        return jar
    with open(cookie_path) as handle:
        entries = json.load(handle)
    for entry in entries:
        jar.set_cookie(make_cookie(entry, port))
    return jar


def read_session_index(base_path, name_id):
    """Return the stored session index for the named user, if any.

    The value is the full content of '<base_path>/sessions/<encoded name_id>';
    None is returned when that file does not exist.
    """
    sessions_root = os.path.expanduser(os.path.join(base_path, 'sessions'))
    index_file = os.path.join(sessions_root, encode_nameid(name_id))
    if not os.path.exists(index_file):
        return None
    with open(index_file) as handle:
        return handle.read()


def get_attr(attrs, name, fallback=None):
    """Return the value of the first (key, value) pair in attrs whose key is name.

    If no pair matches, fallback is returned instead.
    """
    matches = (value for key, value in attrs if key == name)
    return next(matches, fallback)


class IdHTMLParser(HTMLParser):

    """Parser for HTML response from identity provider.

    Collects the hidden input fields of a POST form. After feeding a
    document, ``inputs`` holds the name/value pairs of the most recently
    opened POST form (or None if there was no such form) and
    ``form_action`` holds that form's action attribute.
    """

    def __init__(self):
        """Initialize the parser."""
        HTMLParser.__init__(self)
        # True while inside a POST form whose inputs are being collected
        self.get_inputs = False
        self.inputs = None
        self.form_action = None

    def handle_starttag(self, tag, attrs):
        """Start of a tag."""
        # An attribute written without a value (e.g. ``<input type>``) is
        # reported with a value of None, so fall back to '' before lower().
        if self.get_inputs and tag == 'input':
            if (get_attr(attrs, 'type') or '').lower() == 'hidden':
                name = get_attr(attrs, 'name')
                # An input without a name cannot be submitted; skip it
                # rather than storing it under the key None.
                if name is not None:
                    self.inputs[name] = get_attr(attrs, 'value')
        if tag == 'form' and (get_attr(attrs, 'method') or '').lower() == 'post':
            self.form_action = get_attr(attrs, 'action')
            self.get_inputs = True
            self.inputs = dict()

    def handle_endtag(self, tag):
        """End of a tag."""
        if tag == 'form' and self.get_inputs:
            self.get_inputs = False

    def handle_data(self, data):
        """Content within a tag."""
        pass


def extract_form_values(resp):
    """Parse the HTML text in resp and return its form's hidden inputs.

    The result is the ``inputs`` attribute of an IdHTMLParser that has been
    fed the whole of resp.
    """
    html_parser = IdHTMLParser()
    html_parser.feed(resp)
    # close() forces any buffered input to be processed
    html_parser.close()
    hidden_inputs = html_parser.inputs
    return hidden_inputs


def logout_request(url, cookies):
    """Send the logout request to the IdP and return the form values.

    url is fetched using the given cookie jar. The response body is
    gunzipped when the Content-Encoding header mentions gzip, decoded as
    UTF-8 on Python 3, and parsed for the hidden inputs of its form. The
    'SAMLResponse' value, if present, is deflated in place when it is not
    already deflated.

    Returns the dict of form values, or None when the response contained
    no suitable form.
    """
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookies))
    logging.debug('logout request URL: %s', url)
    response = opener.open(url)
    try:
        data = response.read()
        headers = response.info()
    finally:
        # Release the connection even if reading the body fails.
        response.close()
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        # items() is available on the header objects of both Python 2 and
        # Python 3, whereas the 'headers' attribute exists only on Python 2.
        logging.debug('logout response headers: %s', headers.items())
    encoding = headers.get('Content-Encoding')
    if encoding and 'gzip' in encoding:
        fobj = gzip.GzipFile(mode='r', fileobj=io.BytesIO(data))
        data = fobj.read()
    if six.PY3:
        data = data.decode('utf8')
    form_values = extract_form_values(data)
    if form_values is None:
        logging.warning('no form values in response: %s', data)
    else:
        logging.debug('response form values: %s', form_values)
    ensure_deflated(form_values, 'SAMLResponse')
    return form_values


def ensure_deflated(values, name):
    """If not already deflated, deflate the named value in-place.

    values is a dict (or None) and name the key of a base64 encoded entry.
    When the decoded entry is not a raw deflate stream it is replaced by
    the base64 encoding of its deflated form. Nothing happens when values
    is empty or None, or when the entry is missing or empty.
    """
    if not values:
        return
    value = values.get(name)
    if not value:
        return
    decoded = base64.b64decode(value)
    try:
        # negative bit width to indicate there is no header to the compressed data
        zlib.decompress(decoded, -15)
    except zlib.error:
        # b64decode() already yields a byte string on both Python 2 and 3,
        # so it can be compressed as is. Strip the 2-byte zlib header and
        # the 4-byte checksum to leave a raw deflate stream.
        deflated = zlib.compress(decoded)[2:-4]
        values[name] = base64.b64encode(deflated)


def get_logout_status(settings, get_data):
    """Return the status carried by the SAML logout response in get_data.

    The response is taken from the 'SAMLResponse' entry of get_data.
    """
    saml_response = get_data['SAMLResponse']
    return OneLogin_Saml2_Logout_Response(settings, saml_response).get_status()


def configure_logging(base_dir, debug=False):
    """Set up logging for the trigger.

    With debug enabled, messages of level DEBUG and above are written to
    '<base_dir>/logs/logout.log' (the file is overwritten on each run and
    the directory is created if needed). Otherwise logging is directed at
    the null device with a CRITICAL threshold.
    """
    if not debug:
        logging.basicConfig(stream=open(os.devnull, 'w'), level=logging.CRITICAL)
        return
    log_dir = os.path.expanduser(os.path.join(base_dir, 'logs'))
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    logging.basicConfig(
        filename=os.path.join(log_dir, 'logout.log'),
        filemode='w',
        level=logging.DEBUG)


def main():
    """Initiate a SAML logout with the IdP for the given user.

    Parses the command line, loads the SAML settings, sends the initial
    logout request and then keeps processing the IdP's responses until the
    logout completes (fully or partially). Exits with status 1 when the
    IdP reports an error or any exception is raised along the way.
    """
    parser = argparse.ArgumentParser(description='Initiate a SAML logout request.')
    parser.add_argument('--debug', action='store_true', help='enable logging to file')
    parser.add_argument('--idpUrl', help='URL of IdP metadata')
    parser.add_argument('--base', default='~/saml', help='base path for trigger to store files')
    parser.add_argument('identifier', help='Perforce user User or Email field value')
    args = parser.parse_args()
    configure_logging(args.base, debug=args.debug)
    settings = load_settings(args.idpUrl)

    #
    # How to send the logout request using POST method.
    #
    # auth = OneLogin_Saml2_Auth(FAKE_REQUEST, settings)
    # session_index = read_session_index(args.identifier)
    # logout_request = OneLogin_Saml2_Logout_Request(
    #     settings, name_id=args.identifier, session_index=session_index)
    # request_xml = logout_request.get_xml()
    # signed_xml = OneLogin_Saml2_Utils.add_sign(
    #     request_xml,
    #     settings.get_sp_key(),
    #     settings.get_sp_cert(),
    #     debug=args.debug,
    #     sign_algorithm=settings.get_security_data()['signatureAlgorithm'],
    #     digest_algorithm=settings.get_security_data()['digestAlgorithm'])
    # params = urllib.urlencode({'SAMLRequest': signed_xml})
    # headers = {
    #     "Content-type": "application/x-www-form-urlencoded",
    #     "Accept": "text/plain"
    # }
    # url_parts = urlparse.urlparse(auth.get_slo_url())
    # if url_parts.scheme == 'http':
    #     conn = httplib.HTTPConnection(url_parts.netloc, url_parts.port or 80)
    # elif url_parts.scheme == 'https':
    #     conn = httplib.HTTPSConnection(url_parts.netloc, url_parts.port or 443)
    # else:
    #     raise Exception('Unsupported scheme: {0}'.format(url_parts.scheme))
    # conn.request("POST", url_parts.path, params, headers)
    # response = conn.getresponse()
    # print(response.status, response.reason)
    # print(response.getheaders())
    # print(response.read())
    # conn.close()

    try:
        #
        # Make the initial logout request to get things started.
        #
        fake_request = make_fake_request(settings)
        auth = OneLogin_Saml2_Auth(fake_request, settings)
        session_index = read_session_index(args.base, args.identifier)
        url = auth.logout(name_id=args.identifier, session_index=session_index)
        request_id = auth.get_last_request_id()
        idp_url = settings.get_idp_data()['singleLogoutService']['url']
        cookies = read_cookies(idp_url, args.base, args.identifier)
        fake_request['get_data'] = logout_request(url, cookies)
        #
        # Send the followup requests until we reach the end.
        #
        while True:
            # A fresh auth object is built for each response so that it
            # picks up the form values just stored in the fake request.
            auth = OneLogin_Saml2_Auth(fake_request, settings)
            url = auth.process_slo(keep_local_session=True, request_id=request_id)
            errors = auth.get_errors()
            if not errors:
                if url is None:
                    # successfully logged out
                    logging.info('logout successful')
                    break
                fake_request['get_data'] = logout_request(url, cookies)
                continue
            status = get_logout_status(settings, fake_request['get_data'])
            if status == OneLogin_Saml2_Constants.STATUS_PARTIAL_LOGOUT:
                # partially logged out
                logging.info('partial logout')
                break
            reason = auth.get_last_error_reason()
            logging.error('\n'.join(errors))
            logging.error('error reason: %s', reason)
            # Terminate the error list with a newline so that the reason
            # starts on its own line instead of running into the last error.
            sys.stderr.write('\n'.join(errors) + '\n')
            sys.stderr.write('error reason: {}\n'.format(reason))
            sys.exit(1)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not caught here.
        logging.exception(e)
        sys.stderr.write('{err}\n'.format(err=e))
        sys.exit(1)


if __name__ == "__main__":
    main()
# Change User Description Committed
#6 25196 Nathan Fiedler Retry and then raise the error if unsuccessful.
#5 25195 Nathan Fiedler Retry IdP metadata fetch a couple of times.

To address ephemeral errors when retrieving the IdP metadata from the
remote side, retry the request 3 times with a 1 second pause
in between.
#4 24988 Nathan Fiedler Add proper logout support for Okta.

Use the Okta API to perform the logout, as that works much better
given our unusual scenario of operating outside of the web browser.

Added the requests library as a dependency as that permits us to
send a DELETE request with cookies, while the Python standard
library does not.
#3 24952 Nathan Fiedler Update the POST version of the logout.
#2 24933 Nathan Fiedler Add copyright and license details for SAML triggers.
#1 24785 Nathan Fiedler Add SAML triggers.