#!/usr/bin/env python
"""Initiate SAML logout from service.

This script serves as an example of how to initiate a logout from the service.
It is expected to be run by the Perforce server as an auth-invalidate trigger.
The one positional command line argument is the Perforce user name or email
address.

"""
# Copyright and license info is available in the LICENSE file.

import argparse
import base64
import gzip
import io
import json
import logging
import os
import sys
import time
import zlib

import requests
import six
from six.moves.html_parser import HTMLParser
from six.moves.urllib import parse as urlparse

from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.constants import OneLogin_Saml2_Constants
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser
from onelogin.saml2.logout_response import OneLogin_Saml2_Logout_Response
from onelogin.saml2.logout_request import OneLogin_Saml2_Logout_Request
from onelogin.saml2.settings import OneLogin_Saml2_Settings
from onelogin.saml2.utils import OneLogin_Saml2_Utils

XML_NAMESPACES = {
    'saml': OneLogin_Saml2_Constants.NS_SAML,
    'samlp': OneLogin_Saml2_Constants.NS_SAMLP
}


def encode_nameid(name_id):
    """Produce a URL-safe base64 encoded version of name_id.

    The result is used as the file name of the per-user cookie and session
    files. Text input is encoded as UTF-8 first; byte strings are used as-is.

    :param name_id: user identifier (text or bytes).
    :return: native ``str`` holding the URL-safe base64 encoding.
    """
    raw = name_id.encode('utf8') if isinstance(name_id, six.text_type) else name_id
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.decode('utf8') if six.PY3 else encoded


def _fetch_idp_metadata(idp_url, attempts=3, delay=1):
    """Fetch and parse the IdP metadata, retrying on failure.

    :param idp_url: URL of the IdP metadata document.
    :param attempts: total number of tries before giving up.
    :param delay: seconds to wait between tries.
    :return: settings dictionary produced by the metadata parser.
    :raises RuntimeError: if every attempt failed.
    """
    last_error = None
    for attempt in range(attempts):
        if attempt:
            time.sleep(delay)
        try:
            return OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_url)
        except Exception as err:  # the parser may raise network or XML errors
            last_error = err
            logging.warning('IdP metadata fetch attempt %d failed: %s', attempt + 1, err)
    raise RuntimeError(
        'unable to fetch IdP metadata from {0}: {1}'.format(idp_url, last_error))


def load_settings(idp_url):
    """Build out the settings for SAML library.

    The IdP metadata (when a URL is given) forms the base of the settings,
    which are then overlaid with ``settings.json`` and
    ``advanced_settings.json`` found next to this script.

    :param idp_url: URL of the IdP metadata, or a false value to skip it.
    :return: a ``OneLogin_Saml2_Settings`` instance.
    :raises RuntimeError: if the IdP metadata could not be retrieved.
    """
    # Elaborate process to construct settings that have everything python3-saml
    # is expecting, and has the correct IdP certificate.
    settings = _fetch_idp_metadata(idp_url) if idp_url else dict()
    base_path = os.path.dirname(sys.argv[0])
    for filename in ('settings.json', 'advanced_settings.json'):
        with open(os.path.join(base_path, filename)) as fobj:
            settings.update(json.load(fobj))
    # Set the path for finding the 'certs' directory, otherwise it defaults
    # to the installation location of the pysaml module.
    return OneLogin_Saml2_Settings(settings, custom_base_path=base_path)


def make_fake_request(settings):
    """Build a fake request object for python3-saml to use.

    The values are derived from the service provider's single logout
    service URL. Exits the process with status 2 if that URL is not
    configured.

    :param settings: the ``OneLogin_Saml2_Settings`` in effect.
    :return: dictionary describing the (fabricated) incoming request.
    """
    sp = settings.get_sp_data()
    # Use .get() so a missing key reaches the friendly error below instead
    # of raising KeyError.
    sls_url = (sp.get('singleLogoutService') or {}).get('url')
    if not sls_url:
        sys.stderr.write('missing SP SLS URL in configuration\n')
        sys.exit(2)
    result = urlparse.urlparse(sls_url)
    return {
        'server_port': result.port,
        'script_name': result.path,
        'path_info': '',
        'server_name': 'example.com',
        'http_host': result.hostname,
        'https': 'off' if result.scheme == 'http' else 'on'
    }


def _load_cookie_records(base_dir, name_id):
    """Return the cookie records saved for the user, or an empty list.

    :param base_dir: base directory that holds the ``cookies`` folder.
    :param name_id: user identifier whose cookie file is wanted.
    :return: the decoded JSON content of the cookie file, or ``[]`` when
        the file does not exist.
    """
    cookies_dir = os.path.expanduser(os.path.join(base_dir, 'cookies'))
    cookie_path = os.path.join(cookies_dir, encode_nameid(name_id))
    if not os.path.exists(cookie_path):
        return []
    with open(cookie_path) as fobj:
        return json.load(fobj)


def read_cookies(base_dir, name_id):
    """Read the saved cookies for the user into a RequestsCookieJar.

    :param base_dir: base directory that holds the ``cookies`` folder.
    :param name_id: user identifier whose cookies are wanted.
    :return: a ``RequestsCookieJar``, empty if no cookies were saved.
    """
    jar = requests.cookies.RequestsCookieJar()
    for cookie in _load_cookie_records(base_dir, name_id):
        jar.set(cookie['name'], cookie['value'],
                domain=cookie.get('domain'), path=cookie.get('path'))
    return jar


def read_session_index(base_path, name_id):
    """Read the session index value for the named user.

    :param base_path: base directory that holds the ``sessions`` folder.
    :param name_id: user identifier whose session index is wanted.
    :return: the content of the session file, or None if there is no file.
    """
    session_dir = os.path.expanduser(os.path.join(base_path, 'sessions'))
    session_path = os.path.join(session_dir, encode_nameid(name_id))
    if not os.path.exists(session_path):
        return None
    with open(session_path) as fobj:
        return fobj.read()


def get_attr(attrs, name, fallback=None):
    """Extract the named attribute, if any, or return fallback.

    :param attrs: iterable of (key, value) pairs as given by HTMLParser.
    :param name: attribute name to look for.
    :param fallback: value returned when the attribute is absent.
    :return: value of the first matching attribute (which may be None for
        an attribute written without a value), or fallback.
    """
    for (key, value) in attrs:
        if key == name:
            return value
    return fallback


class IdHTMLParser(HTMLParser):
    """Parser for HTML response from identity provider.

    Collects the action URL of a POST form and the name/value pairs of the
    hidden inputs found inside it.
    """

    def __init__(self):
        """Initialize the parser."""
        HTMLParser.__init__(self)
        # True while positioned inside a POST form
        self.get_inputs = False
        # hidden input name -> value; None until a POST form is seen
        self.inputs = None
        self.form_action = None

    def handle_starttag(self, tag, attrs):
        """Start of a tag."""
        if self.get_inputs and tag == 'input':
            # Attributes without a value are reported as None by HTMLParser.
            input_type = get_attr(attrs, 'type') or ''
            name = get_attr(attrs, 'name')
            # Inputs without a name carry no submittable value; skip them.
            if input_type.lower() == 'hidden' and name is not None:
                self.inputs[name] = get_attr(attrs, 'value')
        if tag == 'form':
            method = get_attr(attrs, 'method') or ''
            if method.lower() == 'post':
                self.form_action = get_attr(attrs, 'action')
                self.get_inputs = True
                self.inputs = dict()

    def handle_endtag(self, tag):
        """End of a tag."""
        if tag == 'form' and self.get_inputs:
            self.get_inputs = False

    def handle_data(self, data):
        """Content within a tag."""
        pass


def extract_form_values(resp):
    """Extract the hidden input name/values pairs from the form in the HTML response.

    :param resp: HTML text of the response.
    :return: dictionary of hidden input values, or None if the document
        contains no POST form.
    """
    parser = IdHTMLParser()
    parser.feed(resp)
    parser.close()
    return parser.inputs


def logout_request(url, cookies):
    """Send the logout request to the IdP and return the form values.

    :param url: URL to request with HTTP GET.
    :param cookies: cookie jar to send along with the request.
    :return: dictionary of hidden form values from the response with the
        ``SAMLResponse`` entry deflated, or None if no form was found.
    """
    r = requests.get(url, cookies=cookies)
    logging.debug('response headers: %s', r.headers)
    form_values = extract_form_values(r.text)
    if form_values is None:
        logging.warning('no form values in response: %s', r.text)
    else:
        logging.debug('response form values: %s', form_values)
    ensure_deflated(form_values, 'SAMLResponse')
    return form_values


def ensure_deflated(values, name):
    """If not already deflated, deflate the named value in-place.

    :param values: dictionary of form values (may be None or empty).
    :param name: key of the base64 encoded entry to examine.
    """
    if not values or not values.get(name):
        return
    decoded = base64.b64decode(values[name])
    try:
        # negative bit width to indicate there is no header to the compressed data
        zlib.decompress(decoded, -15)
    except zlib.error:
        # b64decode already yields a byte string on both Python 2 and 3, so
        # compress it directly; drop the 2-byte zlib header and the 4-byte
        # checksum trailer to leave a raw deflate stream.
        deflated = zlib.compress(decoded)[2:-4]
        values[name] = base64.b64encode(deflated)


def get_logout_status(settings, get_data):
    """Extract the status of the logout response.

    :param settings: the ``OneLogin_Saml2_Settings`` in effect.
    :param get_data: dictionary containing the ``SAMLResponse`` value.
    :return: the status reported by the logout response.
    """
    logout_response = OneLogin_Saml2_Logout_Response(settings, get_data['SAMLResponse'])
    return logout_response.get_status()


def configure_logging(base_dir, debug=False):
    """Configure logging for the trigger.

    With debug enabled, everything is written to ``logs/logout.log`` under
    base_dir (truncated on each run); otherwise log output is discarded.

    :param base_dir: base directory that holds the ``logs`` folder.
    :param debug: True to enable logging to file.
    """
    if debug:
        logs_path = os.path.expanduser(os.path.join(base_dir, 'logs'))
        if not os.path.exists(logs_path):
            os.makedirs(logs_path)
        log_filename = os.path.join(logs_path, 'logout.log')
        logging.basicConfig(filename=log_filename, filemode='w', level=logging.DEBUG)
    else:
        # Discard all output without holding an open handle on os.devnull.
        root = logging.getLogger()
        root.addHandler(logging.NullHandler())
        root.setLevel(logging.CRITICAL)


def run_get_request(args):
    """Perform SAML logout request using HTTP GET method.

    Exits the process with status 1 if the logout fails.

    :param args: parsed command line arguments.
    """
    try:
        settings = load_settings(args.idpUrl)
        #
        # Make the initial logout request to get things started.
        #
        fake_request = make_fake_request(settings)
        auth = OneLogin_Saml2_Auth(fake_request, settings)
        session_index = read_session_index(args.base, args.identifier)
        url = auth.logout(name_id=args.identifier, session_index=session_index)
        request_id = auth.get_last_request_id()
        cookies = read_cookies(args.base, args.identifier)
        fake_request['get_data'] = logout_request(url, cookies)
        #
        # Send the followup requests until we reach the end.
        #
        while True:
            auth = OneLogin_Saml2_Auth(fake_request, settings)
            url = auth.process_slo(keep_local_session=True, request_id=request_id)
            errors = auth.get_errors()
            if not errors:
                if url is None:
                    # successfully logged out
                    logging.info('logout successful')
                    break
                fake_request['get_data'] = logout_request(url, cookies)
                continue
            status = get_logout_status(settings, fake_request['get_data'])
            if status == OneLogin_Saml2_Constants.STATUS_PARTIAL_LOGOUT:
                # partially logged out
                logging.info('partial logout')
                break
            reason = auth.get_last_error_reason()
            logging.error('\n'.join(errors))
            logging.error('error reason: %s', reason)
            # terminate the error list so the reason starts on its own line
            sys.stderr.write('\n'.join(errors) + '\n')
            sys.stderr.write('error reason: {}\n'.format(reason))
            sys.exit(1)
    except Exception as e:
        logging.exception(e)
        sys.stderr.write('{err}\n'.format(err=e))
        sys.exit(1)


def run_post_request(args):
    """Perform SAML logout request using HTTP POST method.

    Exits the process with status 1 if an error is raised.

    :param args: parsed command line arguments.
    """
    try:
        settings = load_settings(args.idpUrl)
        fake_request = make_fake_request(settings)
        auth = OneLogin_Saml2_Auth(fake_request, settings)
        session_index = read_session_index(args.base, args.identifier)
        saml_request = OneLogin_Saml2_Logout_Request(
            settings, name_id=args.identifier, session_index=session_index)
        request_xml = saml_request.get_xml()
        security = settings.get_security_data()
        signed_xml = OneLogin_Saml2_Utils.add_sign(
            request_xml, settings.get_sp_key(), settings.get_sp_cert(), debug=args.debug,
            sign_algorithm=security['signatureAlgorithm'],
            digest_algorithm=security['digestAlgorithm'])
        params = {'SAMLRequest': signed_xml}
        headers = {
            'Content-type': 'application/x-www-form-urlencoded',
            'Accept': 'text/plain'
        }
        jar = read_cookies(args.base, args.identifier)
        # NOTE(review): params= places SAMLRequest in the query string while
        # the Content-type header announces a form body, and the value is
        # not base64 encoded -- confirm against the target IdP before
        # switching to data= / base64 as the SAML HTTP-POST binding expects.
        r = requests.post(auth.get_slo_url(), headers=headers, cookies=jar, params=params)
        logging.info('response: %s', r.status_code)
        logging.info('headers: %s', r.headers)
        logging.info('body: %s', r.text)
    except Exception as e:
        logging.exception(e)
        sys.stderr.write('{err}\n'.format(err=e))
        sys.exit(1)


def run_okta_request(args):
    """Perform logout using the Okta web API.

    Exits the process with status 2 if no IdP URL was given, or status 1
    if an error is raised while making the request.

    :param args: parsed command line arguments.
    """
    #
    # Use the Okta API to perform the logout, since their SAML backend
    # does not support this workflow.
    # c.f. https://developer.okta.com/docs/api/resources/sessions
    #
    if not args.idpUrl:
        sys.stderr.write('missing --idpUrl, needed to locate the Okta API\n')
        sys.exit(2)
    try:
        parsed = urlparse.urlparse(args.idpUrl)
        api_url = '{0}://{1}/api/v1/sessions/me'.format(parsed.scheme, parsed.netloc)
        # Cookie domains never include a port, so build from the host name.
        domain = '.' + (parsed.hostname or '')
        # The cookie jar treats the port as a string, not an integer.
        api_port = str(parsed.port) if parsed.port else None
        jar = requests.cookies.RequestsCookieJar()
        for cookie in _load_cookie_records(args.base, args.identifier):
            # only need the session cookies for logout
            if cookie.get('session'):
                jar.set(cookie['name'], cookie['value'], port=api_port,
                        domain=domain, path=cookie.get('path'))
        headers = {
            'accept': 'application/json'
        }
        r = requests.delete(api_url, headers=headers, cookies=jar)
        if r.status_code == 204:
            logging.info('logout successful for user {}'.format(args.identifier))
        else:
            logging.error('logout failed for user {}'.format(args.identifier))
    except Exception as e:
        logging.exception(e)
        sys.stderr.write('{err}\n'.format(err=e))
        sys.exit(1)


def main():
    """Parse the command line and initiate the logout for the given user."""
    parser = argparse.ArgumentParser(description='Initiate a SAML logout request.')
    parser.add_argument('--debug', action='store_true',
                        help='enable logging to file')
    parser.add_argument('--post', action='store_true',
                        help='use HTTP POST method vs GET')
    parser.add_argument('--okta', action='store_true',
                        help='use Okta API to clear session')
    parser.add_argument('--idpUrl',
                        help='URL of IdP metadata')
    parser.add_argument('--base', default='~/saml',
                        help='base path for trigger to store files')
    parser.add_argument('identifier',
                        help='Perforce user User or Email field value')
    args = parser.parse_args()
    configure_logging(args.base, debug=args.debug)
    # --post takes precedence over --okta when both are given
    if args.post:
        run_post_request(args)
    elif args.okta:
        run_okta_request(args)
    else:
        run_get_request(args)


if __name__ == "__main__":
    main()