#!/usr/bin/env python
"""Initiate SAML logout from service.

This script serves as an example of how to initiate a logout from the service.
It is expected to be run by the Perforce server as an auth-invalidate trigger.
The one command line argument is the Perforce user name or email address.

"""
# Copyright (c) 2018 Perforce Software. All rights reserved.

import argparse
import base64
import gzip
import io
import json
import logging
import os
import sys
import zlib

import six
import six.moves.http_client as httplib
from six.moves.html_parser import HTMLParser
import six.moves.http_cookiejar as cookielib
import six.moves.urllib as urllib
from six.moves.urllib import parse as urlparse
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.constants import OneLogin_Saml2_Constants
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser
from onelogin.saml2.logout_response import OneLogin_Saml2_Logout_Response
from onelogin.saml2.logout_request import OneLogin_Saml2_Logout_Request
from onelogin.saml2.settings import OneLogin_Saml2_Settings
from onelogin.saml2.utils import OneLogin_Saml2_Utils

XML_NAMESPACES = {
    'saml': OneLogin_Saml2_Constants.NS_SAML,
    'samlp': OneLogin_Saml2_Constants.NS_SAMLP
}


def encode_nameid(name_id):
    """Produce a base64 encoded version of name_id.

    The URL-safe alphabet is used; the result serves as a file name for the
    per-user cookie and session files read by this script.
    """
    if six.PY3:
        return base64.urlsafe_b64encode(name_id.encode('utf8')).decode('utf8')
    return base64.urlsafe_b64encode(name_id)


def load_settings(idp_url):
    """Build out the settings for SAML library.

    :param idp_url: URL of the IdP metadata, or a false value to skip the
                    remote metadata fetch.
    :return: an OneLogin_Saml2_Settings instance.

    The settings.json and advanced_settings.json files are read from the
    directory containing this script (as given by sys.argv[0]).
    """
    # Elaborate process to construct settings that have everything python3-saml
    # is expecting, and has the correct IdP certificate.
    if idp_url:
        settings = OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_url)
    else:
        settings = dict()
    base_path = os.path.dirname(sys.argv[0])
    # Top-level keys from the local files override those from the metadata.
    for filename in ('settings.json', 'advanced_settings.json'):
        with open(os.path.join(base_path, filename)) as fobj:
            settings.update(json.load(fobj))
    # Set the path for finding the 'certs' directory, otherwise it defaults
    # to the installation location of the pysaml module.
    return OneLogin_Saml2_Settings(settings, custom_base_path=base_path)


def make_fake_request(settings):
    """Build a fake request object for python3-saml to use.

    :param settings: object providing get_sp_data().
    :return: dict describing a request made to the SP single logout URL.

    Exits the process with status 2 if the SP single logout service URL is
    missing or empty in the configuration.
    """
    sp = settings.get_sp_data()
    # Use .get() so that an absent key yields the message below rather than
    # an opaque KeyError.
    sls_url = sp.get('singleLogoutService', {}).get('url')
    if not sls_url:
        sys.stderr.write('missing SP SLS URL in configuration\n')
        sys.exit(2)
    result = urlparse.urlparse(sls_url)
    return {
        'server_port': result.port,
        'script_name': result.path,
        'path_info': '',
        'server_name': 'example.com',
        'http_host': result.hostname,
        'https': 'off' if result.scheme == 'http' else 'on'
    }


class MyCookiePolicy(cookielib.CookiePolicy):
    """Very liberal cookie policy."""

    def __init__(self):
        """Initialize MyCookiePolicy."""
        self.rfc2965 = False
        self.netscape = True
        self.rfc2109_as_netscape = None
        self.hide_cookie2 = False
        self.strict_domain = False
        self.strict_rfc2965_unverifiable = True
        self.strict_ns_unverifiable = False
        self.strict_ns_domain = 0
        self.strict_ns_set_initial_dollar = False
        self.strict_ns_set_path = False

    def set_ok(self, cookie, request):
        """Return true if (and only if) cookie should be accepted from server."""
        return True

    def return_ok(self, cookie, request):
        """Return true if (and only if) cookie should be returned to server."""
        return True


def make_cookie(cookie, port):
    """Construct a cookielib.Cookie instance.

    :param cookie: dict with 'name', 'value', 'domain', 'secure' and
                   'expirationDate' entries, and an optional 'path'.
    :param port: port number to associate with the cookie, or None.
    :return: cookielib.Cookie built from the given values.
    """
    domain = cookie['domain']
    return cookielib.Cookie(
        0,  # version
        cookie['name'],
        cookie['value'],
        str(port) if port else None,
        port is not None,
        domain,
        True,  # domain_specified
        # startswith() tolerates an empty domain, unlike indexing [0]
        domain.startswith('.'),
        cookie.get('path'),
        'path' in cookie,
        cookie['secure'],
        cookie['expirationDate'],
        False,  # discard
        None,  # comment
        None,  # comment_url
        dict()  # rest
    )


def read_cookies(idp_url, base_dir, name_id):
    """Read the cookies.txt file into a cookiejar instance.

    :param idp_url: IdP URL whose port is applied to every cookie.
    :param base_dir: base directory holding the 'cookies' directory.
    :param name_id: user identifier; its encoded form is the file name.
    :return: cookielib.CookieJar, empty if the user has no cookie file.
    """
    # cookies should have a specific port, use the IdP URL
    port = urlparse.urlparse(idp_url).port or None
    cookies = cookielib.CookieJar()
    cookies.set_policy(MyCookiePolicy())
    cookies_dir = os.path.expanduser(os.path.join(base_dir, 'cookies'))
    cookie_path = os.path.join(cookies_dir, encode_nameid(name_id))
    if os.path.exists(cookie_path):
        with open(cookie_path) as fobj:
            loaded = json.load(fobj)
        for cookie in loaded:
            cookies.set_cookie(make_cookie(cookie, port))
    return cookies


def read_session_index(base_path, name_id):
    """Read the session index value for the named user.

    :param base_path: base directory holding the 'sessions' directory.
    :param name_id: user identifier; its encoded form is the file name.
    :return: contents of the session file, or None if there is no such file.
    """
    session_dir = os.path.expanduser(os.path.join(base_path, 'sessions'))
    session_path = os.path.join(session_dir, encode_nameid(name_id))
    if not os.path.exists(session_path):
        return None
    with open(session_path) as fobj:
        return fobj.read()


def get_attr(attrs, name, fallback=None):
    """Extract the named attribute, if any, or return fallback.

    :param attrs: iterable of (key, value) pairs.
    :param name: key to look for; the first match wins.
    :param fallback: value returned when no key matches.
    """
    for (key, value) in attrs:
        if key == name:
            return value
    return fallback


class IdHTMLParser(HTMLParser):
    """Parser for HTML response from identity provider.

    Collects the hidden input fields of POST forms into ``inputs`` and the
    form's action into ``form_action``. Each POST form start tag resets
    ``inputs``, so the values reflect the last POST form seen.
    """

    def __init__(self):
        """Initialize the parser."""
        HTMLParser.__init__(self)
        self.get_inputs = False
        self.inputs = None
        self.form_action = None

    def handle_starttag(self, tag, attrs):
        """Start of a tag."""
        # Attributes given without a value are reported as None, hence the
        # "or ''" before lower-casing.
        if self.get_inputs and tag == 'input':
            if (get_attr(attrs, 'type') or '').lower() == 'hidden':
                name = get_attr(attrs, 'name')
                value = get_attr(attrs, 'value')
                self.inputs[name] = value
        if tag == 'form' and (get_attr(attrs, 'method') or '').lower() == 'post':
            self.form_action = get_attr(attrs, 'action')
            self.get_inputs = True
            self.inputs = dict()

    def handle_endtag(self, tag):
        """End of a tag."""
        if tag == 'form' and self.get_inputs:
            self.get_inputs = False

    def handle_data(self, data):
        """Content within a tag."""
        pass


def extract_form_values(resp):
    """Extract the hidden input name/values pairs from the form in the HTML response.

    :param resp: HTML text of the response.
    :return: dict of hidden input names to values, or None if the document
             contains no POST form.
    """
    parser = IdHTMLParser()
    parser.feed(resp)
    parser.close()
    return parser.inputs


def logout_request(url, cookies):
    """Send the logout request to the IdP and return the form values.

    :param url: URL to request.
    :param cookies: cookie jar to send along with (and update from) the request.
    :return: dict of hidden form values found in the response, or None if the
             response has no POST form. A 'SAMLResponse' value, if present, is
             passed through ensure_deflated().
    """
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookies))
    logging.debug('logout request URL: %s', url)
    response = opener.open(url)
    data = response.read()
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        # items() is available on the message object in both Python 2 and 3,
        # whereas the 'headers' attribute exists only in Python 2.
        logging.debug('logout response headers: %s', response.info().items())
    encoding = response.info().get('Content-Encoding')
    if encoding and 'gzip' in encoding:
        fobj = gzip.GzipFile(mode='r', fileobj=io.BytesIO(data))
        data = fobj.read()
    if six.PY3:
        data = data.decode('utf8')
    form_values = extract_form_values(data)
    if form_values is None:
        logging.warning('no form values in response: %s', data)
    else:
        logging.debug('response form values: %s', form_values)
    ensure_deflated(form_values, 'SAMLResponse')
    return form_values


def ensure_deflated(values, name):
    """If not already deflated, deflate the named value in-place.

    :param values: dict of form values (may be None or empty).
    :param name: key of the base64 encoded value to examine.

    When the decoded value cannot be inflated as raw deflate data, it is
    compressed and values[name] is replaced with the base64 encoding of the
    result (as returned by base64.b64encode).
    """
    if not values or name not in values:
        return
    decoded = base64.b64decode(values[name])
    try:
        # negative bit width to indicate there is no header to the compressed data
        zlib.decompress(decoded, -15)
    except zlib.error:
        # b64decode yields a byte string on both Python 2 and 3, so it can be
        # compressed as-is; strip the zlib header (2 bytes) and trailing
        # checksum (4 bytes) to leave the raw deflate stream.
        deflated = zlib.compress(decoded)[2:-4]
        values[name] = base64.b64encode(deflated)


def get_logout_status(settings, get_data):
    """Extract the status of the logout response.

    :param settings: SAML settings object.
    :param get_data: dict of form values from the IdP response (may be None).
    :return: status from the logout response, or None if get_data has no
             'SAMLResponse' entry.
    """
    if not get_data or 'SAMLResponse' not in get_data:
        # Nothing to parse; let the caller report the errors it already has.
        return None
    logout_response = OneLogin_Saml2_Logout_Response(settings, get_data['SAMLResponse'])
    return logout_response.get_status()


def configure_logging(base_dir, debug=False):
    """Configure logging for the trigger.

    :param base_dir: base directory under which the 'logs' directory is made.
    :param debug: if true, log at DEBUG level to logs/logout.log (truncated on
                  each run); otherwise send CRITICAL-only logging to the null
                  device.
    """
    if debug:
        logs_path = os.path.expanduser(os.path.join(base_dir, 'logs'))
        if not os.path.exists(logs_path):
            os.makedirs(logs_path)
        log_filename = os.path.join(logs_path, 'logout.log')
        logging.basicConfig(filename=log_filename, filemode='w', level=logging.DEBUG)
    else:
        # The null device stream is intentionally left open for the life of
        # the process.
        logging.basicConfig(stream=open(os.devnull, 'w'), level=logging.CRITICAL)


def main():
    """Initiate the SAML logout for the user named on the command line.

    Sends the initial logout request to the IdP, then keeps processing the
    responses until the logout completes (fully or partially). Exits with
    status 1 on any error, after writing the details to stderr.
    """
    parser = argparse.ArgumentParser(description='Initiate a SAML logout request.')
    parser.add_argument('--debug', action='store_true',
                        help='enable logging to file')
    parser.add_argument('--idpUrl',
                        help='URL of IdP metadata')
    parser.add_argument('--base', default='~/saml',
                        help='base path for trigger to store files')
    parser.add_argument('identifier',
                        help='Perforce user User or Email field value')
    args = parser.parse_args()
    configure_logging(args.base, debug=args.debug)
    settings = load_settings(args.idpUrl)

    #
    # How to send the logout request using POST method.
    # NOTE(review): illustrative only, never executed; the names and call
    # signatures below may not match the functions in this file.
    #
    # auth = OneLogin_Saml2_Auth(FAKE_REQUEST, settings)
    # session_index = read_session_index(args.identifier)
    # logout_request = OneLogin_Saml2_Logout_Request(
    #     settings, name_id=args.identifier, session_index=session_index)
    # request_xml = logout_request.get_xml()
    # signed_xml = OneLogin_Saml2_Utils.add_sign(
    #     request_xml,
    #     settings.get_sp_key(),
    #     settings.get_sp_cert(),
    #     debug=args.debug,
    #     sign_algorithm=settings.get_security_data()['signatureAlgorithm'],
    #     digest_algorithm=settings.get_security_data()['digestAlgorithm'])
    # params = urllib.urlencode({'SAMLRequest': signed_xml})
    # headers = {
    #     "Content-type": "application/x-www-form-urlencoded",
    #     "Accept": "text/plain"
    # }
    # url_parts = urlparse.urlparse(auth.get_slo_url())
    # if url_parts.scheme == 'http':
    #     conn = httplib.HTTPConnection(url_parts.netloc, url_parts.port or 80)
    # elif url_parts.scheme == 'https':
    #     conn = httplib.HTTPSConnection(url_parts.netloc, url_parts.port or 443)
    # else:
    #     raise Exception('Unsupported scheme: {0}'.format(url_parts.scheme))
    # conn.request("POST", url_parts.path, params, headers)
    # response = conn.getresponse()
    # print(response.status, response.reason)
    # print(response.getheaders())
    # print(response.read())
    # conn.close()

    try:
        #
        # Make the initial logout request to get things started.
        #
        fake_request = make_fake_request(settings)
        auth = OneLogin_Saml2_Auth(fake_request, settings)
        session_index = read_session_index(args.base, args.identifier)
        url = auth.logout(name_id=args.identifier, session_index=session_index)
        request_id = auth.get_last_request_id()
        idp_url = settings.get_idp_data()['singleLogoutService']['url']
        cookies = read_cookies(idp_url, args.base, args.identifier)
        fake_request['get_data'] = logout_request(url, cookies)
        #
        # Send the followup requests until we reach the end.
        #
        while True:
            auth = OneLogin_Saml2_Auth(fake_request, settings)
            url = auth.process_slo(keep_local_session=True, request_id=request_id)
            errors = auth.get_errors()
            if not errors:
                if url is None:
                    # successfully logged out
                    logging.info('logout successful')
                    break
                fake_request['get_data'] = logout_request(url, cookies)
                continue
            status = get_logout_status(settings, fake_request['get_data'])
            if status == OneLogin_Saml2_Constants.STATUS_PARTIAL_LOGOUT:
                # partially logged out
                logging.info('partial logout')
                break
            reason = auth.get_last_error_reason()
            logging.error('\n'.join(errors))
            logging.error('error reason: %s', reason)
            # Terminate the error list with a newline so the reason starts
            # on its own line.
            sys.stderr.write('\n'.join(errors) + '\n')
            sys.stderr.write('error reason: {}\n'.format(reason))
            sys.exit(1)
    except Exception as e:
        # Top-level boundary: record the traceback and report failure to the
        # caller via the exit status.
        logging.exception(e)
        sys.stderr.write('{err}\n'.format(err=e))
        sys.exit(1)


if __name__ == "__main__":
    main()
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#6 | 25196 | Nathan Fiedler | Retry and then raise the error if unsuccessful. | ||
#5 | 25195 | Nathan Fiedler |
Retry IdP metadata fetch a couple of times. To address ephemeral errors when retrieving the IdP metadata from the remote side, retry the request 3 times with a 1 second pause in between. |
||
#4 | 24988 | Nathan Fiedler |
Add proper logout support for Okta. Use the Okta API to perform the logout, as that works much better given our unusual scenario of operating outside of the web browser. Added the requests library as a dependency as that permits us to send a DELETE request with cookies, while the Python standard library does not. |
||
#3 | 24952 | Nathan Fiedler | Update the POST version of the logout. | ||
#2 | 24933 | Nathan Fiedler | Add copyright and license details for SAML triggers. | ||
#1 | 24785 | Nathan Fiedler | Add SAML triggers. |