#!/usr/bin/env python """Initiate SAML logout from service. This script serves as an example of how to initiate a logout from the service. It is expected to be run by the Perforce server as an auth-invalidate trigger. The one command line argument is the Perforce user name or email address. """ # Copyright (c) 2018 Perforce Software. All rights reserved. import argparse import base64 import gzip import io import json import logging import os import sys import zlib import six import six.moves.http_client as httplib from six.moves.html_parser import HTMLParser import six.moves.http_cookiejar as cookielib import six.moves.urllib as urllib from six.moves.urllib import parse as urlparse from onelogin.saml2.auth import OneLogin_Saml2_Auth from onelogin.saml2.constants import OneLogin_Saml2_Constants from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser from onelogin.saml2.logout_response import OneLogin_Saml2_Logout_Response from onelogin.saml2.logout_request import OneLogin_Saml2_Logout_Request from onelogin.saml2.settings import OneLogin_Saml2_Settings from onelogin.saml2.utils import OneLogin_Saml2_Utils XML_NAMESPACES = { 'saml': OneLogin_Saml2_Constants.NS_SAML, 'samlp': OneLogin_Saml2_Constants.NS_SAMLP } def encode_nameid(name_id): """Produce a base64 encoded version of name_id.""" if six.PY3: return base64.urlsafe_b64encode(name_id.encode('utf8')).decode('utf8') else: return base64.urlsafe_b64encode(name_id) def load_settings(idp_url): """Build out the settings for SAML library.""" # Elaborate process to construct settings that have everything python3-saml # is expecting, and has the correct IdP certificate. if idp_url: settings = OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_url) else: settings = dict() base_path = os.path.dirname(sys.argv[0]) with open(os.path.join(base_path, 'settings.json')) as fobj: settings.update(json.load(fobj)) with open(os.path.join(base_path, 'advanced_settings.json')) as fobj: settings.update(json.load(fobj)) # Set the path for finding the 'certs' directory, otherwise it defaults # to the installation location of the pysaml module. return OneLogin_Saml2_Settings(settings, custom_base_path=base_path) def make_fake_request(settings): """Build a fake request object for python3-saml to use.""" sp = settings.get_sp_data() sls_url = sp['singleLogoutService']['url'] if sls_url is None: sys.stderr.write('missing SP SLS URL in configuration\n') sys.exit(2) result = urlparse.urlparse(sls_url) return { 'server_port': result.port, 'script_name': result.path, 'path_info': '', 'server_name': 'example.com', 'http_host': result.hostname, 'https': 'off' if result.scheme == 'http' else 'on' } class MyCookiePolicy(cookielib.CookiePolicy): """Very liberal cookie policy.""" def __init__(self): """Initialize MyCookiePolicy.""" self.rfc2965 = False self.netscape = True self.rfc2109_as_netscape = None self.hide_cookie2 = False self.strict_domain = False self.strict_rfc2965_unverifiable = True self.strict_ns_unverifiable = False self.strict_ns_domain = 0 self.strict_ns_set_initial_dollar = False self.strict_ns_set_path = False def set_ok(self, cookie, request): """Return true if (and only if) cookie should be accepted from server.""" return True def return_ok(self, cookie, request): """Return true if (and only if) cookie should be returned to server.""" return True def make_cookie(cookie, port): """Construct a cookielib.Cookie instance.""" return cookielib.Cookie( 0, # version cookie['name'], cookie['value'], str(port) if port else None, port is not None, cookie['domain'], True, # domain_specified cookie['domain'][0] == '.', cookie.get('path'), 'path' in cookie, cookie['secure'], cookie['expirationDate'], False, # discard None, # comment None, # comment_url dict() # rest ) def read_cookies(idp_url, base_dir, name_id): """Read the cookies.txt file into a cookiejar instance.""" # cookies should have a specific port, use the IdP URL port = urlparse.urlparse(idp_url).port or None cookies = cookielib.CookieJar() cookies.set_policy(MyCookiePolicy()) cookies_dir = os.path.expanduser(os.path.join(base_dir, 'cookies')) cookie_path = os.path.join(cookies_dir, encode_nameid(name_id)) if os.path.exists(cookie_path): with open(cookie_path) as fobj: loaded = json.load(fobj) for cookie in loaded: cookies.set_cookie(make_cookie(cookie, port)) return cookies def read_session_index(base_path, name_id): """Read the session index value for the named user.""" session_dir = os.path.expanduser(os.path.join(base_path, 'sessions')) session_path = os.path.join(session_dir, encode_nameid(name_id)) if os.path.exists(session_path): with open(session_path) as fobj: value = fobj.read() return value return None def get_attr(attrs, name, fallback=None): """Extract the named attribute, if any, or return fallback.""" for (key, value) in attrs: if key == name: return value return fallback class IdHTMLParser(HTMLParser): """Parser for HTML response from identity provider.""" def __init__(self): """Initialize the parser.""" HTMLParser.__init__(self) self.get_inputs = False self.inputs = None self.form_action = None def handle_starttag(self, tag, attrs): """Start of a tag.""" if self.get_inputs and tag == 'input' and get_attr(attrs, 'type', '').lower() == 'hidden': name = get_attr(attrs, 'name') value = get_attr(attrs, 'value') self.inputs[name] = value if tag == 'form' and get_attr(attrs, 'method', '').lower() == 'post': self.form_action = get_attr(attrs, 'action') self.get_inputs = True self.inputs = dict() def handle_endtag(self, tag): """End of a tag.""" if tag == 'form' and self.get_inputs: self.get_inputs = False def handle_data(self, data): """Content within a tag.""" pass def extract_form_values(resp): """Extract the hidden input name/values pairs from the form in the HTML response.""" parser = IdHTMLParser() parser.feed(resp) parser.close() return parser.inputs def logout_request(url, cookies): """Send the logout request to the IdP and return the form values.""" opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookies)) logging.debug('logout request URL: %s', url) response = opener.open(url) data = response.read() if logging.getLogger().isEnabledFor(logging.DEBUG): logging.debug('logout response headers: %s', response.info().headers) encoding = response.info().get('Content-Encoding') if encoding and 'gzip' in encoding: fobj = gzip.GzipFile(mode='r', fileobj=io.BytesIO(data)) data = fobj.read() if six.PY3: data = data.decode('utf8') form_values = extract_form_values(data) if form_values is None: logging.warning('no form values in response: %s', data) else: logging.debug('response form values: %s', form_values) ensure_deflated(form_values, 'SAMLResponse') return form_values def ensure_deflated(values, name): """If not already deflated, deflate the named value in-place.""" if not values: return if name not in values: return value = values[name] decoded = base64.b64decode(value) try: # negative bit width to indicate there is no header to the compressed data zlib.decompress(decoded, -15) except zlib.error: data = decoded.encode('utf8') if six.PY2 else decoded deflated = zlib.compress(data)[2:-4] values[name] = base64.b64encode(deflated) def get_logout_status(settings, get_data): """Extract the status of the logout response.""" logout_response = OneLogin_Saml2_Logout_Response(settings, get_data['SAMLResponse']) return logout_response.get_status() def configure_logging(base_dir, debug=False): """Configure logging for the trigger.""" if debug: logs_path = os.path.expanduser(os.path.join(base_dir, 'logs')) if not os.path.exists(logs_path): os.makedirs(logs_path) log_filename = os.path.join(logs_path, 'logout.log') logging.basicConfig(filename=log_filename, filemode='w', level=logging.DEBUG) else: logging.basicConfig(stream=open(os.devnull, 'w'), level=logging.CRITICAL) def main(): """Read the SAML response and verify user match.""" parser = argparse.ArgumentParser(description='Initiate a SAML logout request.') parser.add_argument('--debug', action='store_true', help='enable logging to file') parser.add_argument('--idpUrl', help='URL of IdP metadata') parser.add_argument('--base', default='~/saml', help='base path for trigger to store files') parser.add_argument('identifier', help='Perforce user User or Email field value') args = parser.parse_args() configure_logging(args.base, debug=args.debug) settings = load_settings(args.idpUrl) # # How to send the logout request using POST method. # # auth = OneLogin_Saml2_Auth(FAKE_REQUEST, settings) # session_index = read_session_index(args.identifier) # logout_request = OneLogin_Saml2_Logout_Request( # settings, name_id=args.identifier, session_index=session_index) # request_xml = logout_request.get_xml() # signed_xml = OneLogin_Saml2_Utils.add_sign( # request_xml, # settings.get_sp_key(), # settings.get_sp_cert(), # debug=args.debug, # sign_algorithm=settings.get_security_data()['signatureAlgorithm'], # digest_algorithm=settings.get_security_data()['digestAlgorithm']) # params = urllib.urlencode({'SAMLRequest': signed_xml}) # headers = { # "Content-type": "application/x-www-form-urlencoded", # "Accept": "text/plain" # } # url_parts = urlparse.urlparse(auth.get_slo_url()) # if url_parts.scheme == 'http': # conn = httplib.HTTPConnection(url_parts.netloc, url_parts.port or 80) # elif url_parts.scheme == 'https': # conn = httplib.HTTPSConnection(url_parts.netloc, url_parts.port or 443) # else: # raise Exception('Unsupported scheme: {0}'.format(url_parts.scheme)) # conn.request("POST", url_parts.path, params, headers) # response = conn.getresponse() # print(response.status, response.reason) # print(response.getheaders()) # print(response.read()) # conn.close() try: # # Make the initial logout request to get things started. # fake_request = make_fake_request(settings) auth = OneLogin_Saml2_Auth(fake_request, settings) session_index = read_session_index(args.base, args.identifier) url = auth.logout(name_id=args.identifier, session_index=session_index) request_id = auth.get_last_request_id() idp_url = settings.get_idp_data()['singleLogoutService']['url'] cookies = read_cookies(idp_url, args.base, args.identifier) fake_request['get_data'] = logout_request(url, cookies) # # Send the followup requests until we reach the end. # while True: auth = OneLogin_Saml2_Auth(fake_request, settings) url = auth.process_slo(keep_local_session=True, request_id=request_id) if len(auth.get_errors()) == 0: if url is not None: fake_request['get_data'] = logout_request(url, cookies) else: # successfully logged out logging.info('logout successful') break else: status = get_logout_status(settings, fake_request['get_data']) if status == OneLogin_Saml2_Constants.STATUS_PARTIAL_LOGOUT: # partially logged out logging.info('partial logout') break logging.error('\n'.join(auth.get_errors())) logging.error('error reason: %s', auth.get_last_error_reason()) sys.stderr.write('\n'.join(auth.get_errors())) sys.stderr.write('error reason: {}\n'.format(auth.get_last_error_reason())) sys.exit(1) except Exception as e: logging.exception(e) sys.stderr.write('{err}\n'.format(err=e)) sys.exit(1) if __name__ == "__main__": main()