#!/usr/bin/env python """Initiate SAML logout from service. This script serves as an example of how to initiate a logout from the service. It is expected to be run by the Perforce server as an auth-invalidate trigger. The one command line argument is the Perforce user name or email address. """ # Copyright and license info is available in the LICENSE file. import argparse import base64 import gzip import io import json import logging import os import sys import zlib import six import six.moves.http_client as httplib from six.moves.html_parser import HTMLParser import six.moves.http_cookiejar as cookielib import six.moves.urllib as urllib from six.moves.urllib import parse as urlparse from onelogin.saml2.auth import OneLogin_Saml2_Auth from onelogin.saml2.constants import OneLogin_Saml2_Constants from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser from onelogin.saml2.logout_response import OneLogin_Saml2_Logout_Response from onelogin.saml2.logout_request import OneLogin_Saml2_Logout_Request from onelogin.saml2.settings import OneLogin_Saml2_Settings from onelogin.saml2.utils import OneLogin_Saml2_Utils XML_NAMESPACES = { 'saml': OneLogin_Saml2_Constants.NS_SAML, 'samlp': OneLogin_Saml2_Constants.NS_SAMLP } def encode_nameid(name_id): """Produce a base64 encoded version of name_id.""" if six.PY3: return base64.urlsafe_b64encode(name_id.encode('utf8')).decode('utf8') else: return base64.urlsafe_b64encode(name_id) def load_settings(idp_url): """Build out the settings for SAML library.""" # Elaborate process to construct settings that have everything python3-saml # is expecting, and has the correct IdP certificate. if idp_url: settings = OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_url) else: settings = dict() base_path = os.path.dirname(sys.argv[0]) with open(os.path.join(base_path, 'settings.json')) as fobj: settings.update(json.load(fobj)) with open(os.path.join(base_path, 'advanced_settings.json')) as fobj: settings.update(json.load(fobj)) # Set the path for finding the 'certs' directory, otherwise it defaults # to the installation location of the pysaml module. return OneLogin_Saml2_Settings(settings, custom_base_path=base_path) def make_fake_request(settings): """Build a fake request object for python3-saml to use.""" sp = settings.get_sp_data() sls_url = sp['singleLogoutService']['url'] if sls_url is None: sys.stderr.write('missing SP SLS URL in configuration\n') sys.exit(2) result = urlparse.urlparse(sls_url) return { 'server_port': result.port, 'script_name': result.path, 'path_info': '', 'server_name': 'example.com', 'http_host': result.hostname, 'https': 'off' if result.scheme == 'http' else 'on' } class MyCookiePolicy(cookielib.CookiePolicy): """Very liberal cookie policy.""" def __init__(self): """Initialize MyCookiePolicy.""" self.rfc2965 = False self.netscape = True self.rfc2109_as_netscape = None self.hide_cookie2 = False self.strict_domain = False self.strict_rfc2965_unverifiable = True self.strict_ns_unverifiable = False self.strict_ns_domain = 0 self.strict_ns_set_initial_dollar = False self.strict_ns_set_path = False def set_ok(self, cookie, request): """Return true if (and only if) cookie should be accepted from server.""" return True def return_ok(self, cookie, request): """Return true if (and only if) cookie should be returned to server.""" return True def make_cookie(cookie, port): """Construct a cookielib.Cookie instance.""" return cookielib.Cookie( 0, # version cookie['name'], cookie['value'], str(port) if port else None, port is not None, cookie['domain'], True, # domain_specified cookie['domain'][0] == '.', cookie.get('path'), 'path' in cookie, cookie['secure'], cookie['expirationDate'], False, # discard None, # comment None, # comment_url dict() # rest ) def read_cookies(idp_url, base_dir, name_id): """Read the cookies.txt file into a cookiejar instance.""" # cookies should have a specific port, use the IdP URL port = urlparse.urlparse(idp_url).port or None cookies = cookielib.CookieJar() cookies.set_policy(MyCookiePolicy()) cookies_dir = os.path.expanduser(os.path.join(base_dir, 'cookies')) cookie_path = os.path.join(cookies_dir, encode_nameid(name_id)) if os.path.exists(cookie_path): with open(cookie_path) as fobj: loaded = json.load(fobj) for cookie in loaded: cookies.set_cookie(make_cookie(cookie, port)) return cookies def read_session_index(base_path, name_id): """Read the session index value for the named user.""" session_dir = os.path.expanduser(os.path.join(base_path, 'sessions')) session_path = os.path.join(session_dir, encode_nameid(name_id)) if os.path.exists(session_path): with open(session_path) as fobj: value = fobj.read() return value return None def get_attr(attrs, name, fallback=None): """Extract the named attribute, if any, or return fallback.""" for (key, value) in attrs: if key == name: return value return fallback class IdHTMLParser(HTMLParser): """Parser for HTML response from identity provider.""" def __init__(self): """Initialize the parser.""" HTMLParser.__init__(self) self.get_inputs = False self.inputs = None self.form_action = None def handle_starttag(self, tag, attrs): """Start of a tag.""" if self.get_inputs and tag == 'input' and get_attr(attrs, 'type', '').lower() == 'hidden': name = get_attr(attrs, 'name') value = get_attr(attrs, 'value') self.inputs[name] = value if tag == 'form' and get_attr(attrs, 'method', '').lower() == 'post': self.form_action = get_attr(attrs, 'action') self.get_inputs = True self.inputs = dict() def handle_endtag(self, tag): """End of a tag.""" if tag == 'form' and self.get_inputs: self.get_inputs = False def handle_data(self, data): """Content within a tag.""" pass def extract_form_values(resp): """Extract the hidden input name/values pairs from the form in the HTML response.""" parser = IdHTMLParser() parser.feed(resp) parser.close() return parser.inputs def logout_request(url, cookies): """Send the logout request to the IdP and return the form values.""" opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookies)) logging.debug('logout request URL: %s', url) response = opener.open(url) data = response.read() if logging.getLogger().isEnabledFor(logging.DEBUG): logging.debug('logout response headers: %s', response.info().headers) encoding = response.info().get('Content-Encoding') if encoding and 'gzip' in encoding: fobj = gzip.GzipFile(mode='r', fileobj=io.BytesIO(data)) data = fobj.read() if six.PY3: data = data.decode('utf8') form_values = extract_form_values(data) if form_values is None: logging.warning('no form values in response: %s', data) else: logging.debug('response form values: %s', form_values) ensure_deflated(form_values, 'SAMLResponse') return form_values def ensure_deflated(values, name): """If not already deflated, deflate the named value in-place.""" if not values: return if name not in values: return value = values[name] decoded = base64.b64decode(value) try: # negative bit width to indicate there is no header to the compressed data zlib.decompress(decoded, -15) except zlib.error: data = decoded.encode('utf8') if six.PY2 else decoded deflated = zlib.compress(data)[2:-4] values[name] = base64.b64encode(deflated) def get_logout_status(settings, get_data): """Extract the status of the logout response.""" logout_response = OneLogin_Saml2_Logout_Response(settings, get_data['SAMLResponse']) return logout_response.get_status() def configure_logging(base_dir, debug=False): """Configure logging for the trigger.""" if debug: logs_path = os.path.expanduser(os.path.join(base_dir, 'logs')) if not os.path.exists(logs_path): os.makedirs(logs_path) log_filename = os.path.join(logs_path, 'logout.log') logging.basicConfig(filename=log_filename, filemode='w', level=logging.DEBUG) else: logging.basicConfig(stream=open(os.devnull, 'w'), level=logging.CRITICAL) def main(): """Read the SAML response and verify user match.""" parser = argparse.ArgumentParser(description='Initiate a SAML logout request.') parser.add_argument('--debug', action='store_true', help='enable logging to file') parser.add_argument('--idpUrl', help='URL of IdP metadata') parser.add_argument('--base', default='~/saml', help='base path for trigger to store files') parser.add_argument('identifier', help='Perforce user User or Email field value') args = parser.parse_args() configure_logging(args.base, debug=args.debug) settings = load_settings(args.idpUrl) # # How to send the logout request using POST method. # # try: # fake_request = make_fake_request(settings) # auth = OneLogin_Saml2_Auth(fake_request, settings) # session_index = read_session_index(args.base, args.identifier) # logout_request = OneLogin_Saml2_Logout_Request( # settings, name_id=args.identifier, session_index=session_index) # request_xml = logout_request.get_xml() # signed_xml = OneLogin_Saml2_Utils.add_sign( # request_xml, # settings.get_sp_key(), # settings.get_sp_cert(), # debug=args.debug, # sign_algorithm=settings.get_security_data()['signatureAlgorithm'], # digest_algorithm=settings.get_security_data()['digestAlgorithm']) # params = urllib.parse.urlencode({'SAMLRequest': signed_xml}) # headers = { # "Content-type": "application/x-www-form-urlencoded", # "Accept": "text/plain" # } # url_parts = urlparse.urlparse(auth.get_slo_url()) # if url_parts.scheme == 'http': # conn = httplib.HTTPConnection(url_parts.netloc, url_parts.port or 80) # elif url_parts.scheme == 'https': # conn = httplib.HTTPSConnection(url_parts.netloc, url_parts.port or 443) # else: # raise Exception('Unsupported scheme: {0}'.format(url_parts.scheme)) # conn.request("POST", url_parts.path, params, headers) # response = conn.getresponse() # logging.info("response: %s %s", response.status, response.reason) # logging.info("headers: %s", response.getheaders()) # logging.info("body: %s", response.read()) # conn.close() # except Exception as e: # logging.exception(e) # sys.stderr.write('{err}\n'.format(err=e)) # sys.exit(1) try: # # Make the initial logout request to get things started. # fake_request = make_fake_request(settings) auth = OneLogin_Saml2_Auth(fake_request, settings) session_index = read_session_index(args.base, args.identifier) url = auth.logout(name_id=args.identifier, session_index=session_index) request_id = auth.get_last_request_id() idp_url = settings.get_idp_data()['singleLogoutService']['url'] cookies = read_cookies(idp_url, args.base, args.identifier) fake_request['get_data'] = logout_request(url, cookies) # # Send the followup requests until we reach the end. # while True: auth = OneLogin_Saml2_Auth(fake_request, settings) url = auth.process_slo(keep_local_session=True, request_id=request_id) if len(auth.get_errors()) == 0: if url is not None: fake_request['get_data'] = logout_request(url, cookies) else: # successfully logged out logging.info('logout successful') break else: status = get_logout_status(settings, fake_request['get_data']) if status == OneLogin_Saml2_Constants.STATUS_PARTIAL_LOGOUT: # partially logged out logging.info('partial logout') break logging.error('\n'.join(auth.get_errors())) logging.error('error reason: %s', auth.get_last_error_reason()) sys.stderr.write('\n'.join(auth.get_errors())) sys.stderr.write('error reason: {}\n'.format(auth.get_last_error_reason())) sys.exit(1) except Exception as e: logging.exception(e) sys.stderr.write('{err}\n'.format(err=e)) sys.exit(1) if __name__ == "__main__": main()