#!/usr/bin/env python
"""Initiate SAML logout from service.
This script serves as an example of how to initiate a logout from the
service. It is expected to be run by the Perforce server as an
auth-invalidate trigger. The one command line argument is the Perforce user
name or email address.
"""
# Copyright (c) 2018 Perforce Software. All rights reserved.
import argparse
import base64
import gzip
import io
import json
import logging
import os
import sys
import zlib
import six
import six.moves.http_client as httplib
from six.moves.html_parser import HTMLParser
import six.moves.http_cookiejar as cookielib
import six.moves.urllib as urllib
from six.moves.urllib import parse as urlparse
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.constants import OneLogin_Saml2_Constants
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser
from onelogin.saml2.logout_response import OneLogin_Saml2_Logout_Response
from onelogin.saml2.logout_request import OneLogin_Saml2_Logout_Request
from onelogin.saml2.settings import OneLogin_Saml2_Settings
from onelogin.saml2.utils import OneLogin_Saml2_Utils
XML_NAMESPACES = {
'saml': OneLogin_Saml2_Constants.NS_SAML,
'samlp': OneLogin_Saml2_Constants.NS_SAMLP
}
def encode_nameid(name_id):
"""Produce a base64 encoded version of name_id."""
if six.PY3:
return base64.urlsafe_b64encode(name_id.encode('utf8')).decode('utf8')
else:
return base64.urlsafe_b64encode(name_id)
def load_settings(idp_url):
"""Build out the settings for SAML library."""
# Elaborate process to construct settings that have everything python3-saml
# is expecting, and has the correct IdP certificate.
if idp_url:
settings = OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_url)
else:
settings = dict()
base_path = os.path.dirname(sys.argv[0])
with open(os.path.join(base_path, 'settings.json')) as fobj:
settings.update(json.load(fobj))
with open(os.path.join(base_path, 'advanced_settings.json')) as fobj:
settings.update(json.load(fobj))
# Set the path for finding the 'certs' directory, otherwise it defaults
# to the installation location of the pysaml module.
return OneLogin_Saml2_Settings(settings, custom_base_path=base_path)
def make_fake_request(settings):
"""Build a fake request object for python3-saml to use."""
sp = settings.get_sp_data()
sls_url = sp['singleLogoutService']['url']
if sls_url is None:
sys.stderr.write('missing SP SLS URL in configuration\n')
sys.exit(2)
result = urlparse.urlparse(sls_url)
return {
'server_port': result.port,
'script_name': result.path,
'path_info': '',
'server_name': 'example.com',
'http_host': result.hostname,
'https': 'off' if result.scheme == 'http' else 'on'
}
class MyCookiePolicy(cookielib.CookiePolicy):
"""Very liberal cookie policy."""
def __init__(self):
"""Initialize MyCookiePolicy."""
self.rfc2965 = False
self.netscape = True
self.rfc2109_as_netscape = None
self.hide_cookie2 = False
self.strict_domain = False
self.strict_rfc2965_unverifiable = True
self.strict_ns_unverifiable = False
self.strict_ns_domain = 0
self.strict_ns_set_initial_dollar = False
self.strict_ns_set_path = False
def set_ok(self, cookie, request):
"""Return true if (and only if) cookie should be accepted from server."""
return True
def return_ok(self, cookie, request):
"""Return true if (and only if) cookie should be returned to server."""
return True
def make_cookie(cookie, port):
"""Construct a cookielib.Cookie instance."""
return cookielib.Cookie(
0, # version
cookie['name'],
cookie['value'],
str(port) if port else None,
port is not None,
cookie['domain'],
True, # domain_specified
cookie['domain'][0] == '.',
cookie.get('path'),
'path' in cookie,
cookie['secure'],
cookie['expirationDate'],
False, # discard
None, # comment
None, # comment_url
dict() # rest
)
def read_cookies(idp_url, base_dir, name_id):
"""Read the cookies.txt file into a cookiejar instance."""
# cookies should have a specific port, use the IdP URL
port = urlparse.urlparse(idp_url).port or None
cookies = cookielib.CookieJar()
cookies.set_policy(MyCookiePolicy())
cookies_dir = os.path.expanduser(os.path.join(base_dir, 'cookies'))
cookie_path = os.path.join(cookies_dir, encode_nameid(name_id))
if os.path.exists(cookie_path):
with open(cookie_path) as fobj:
loaded = json.load(fobj)
for cookie in loaded:
cookies.set_cookie(make_cookie(cookie, port))
return cookies
def read_session_index(base_path, name_id):
"""Read the session index value for the named user."""
session_dir = os.path.expanduser(os.path.join(base_path, 'sessions'))
session_path = os.path.join(session_dir, encode_nameid(name_id))
if os.path.exists(session_path):
with open(session_path) as fobj:
value = fobj.read()
return value
return None
def get_attr(attrs, name, fallback=None):
"""Extract the named attribute, if any, or return fallback."""
for (key, value) in attrs:
if key == name:
return value
return fallback
class IdHTMLParser(HTMLParser):
"""Parser for HTML response from identity provider."""
def __init__(self):
"""Initialize the parser."""
HTMLParser.__init__(self)
self.get_inputs = False
self.inputs = None
self.form_action = None
def handle_starttag(self, tag, attrs):
"""Start of a tag."""
if self.get_inputs and tag == 'input' and get_attr(attrs, 'type', '').lower() == 'hidden':
name = get_attr(attrs, 'name')
value = get_attr(attrs, 'value')
self.inputs[name] = value
if tag == 'form' and get_attr(attrs, 'method', '').lower() == 'post':
self.form_action = get_attr(attrs, 'action')
self.get_inputs = True
self.inputs = dict()
def handle_endtag(self, tag):
"""End of a tag."""
if tag == 'form' and self.get_inputs:
self.get_inputs = False
def handle_data(self, data):
"""Content within a tag."""
pass
def extract_form_values(resp):
"""Extract the hidden input name/values pairs from the form in the HTML response."""
parser = IdHTMLParser()
parser.feed(resp)
parser.close()
return parser.inputs
def logout_request(url, cookies):
"""Send the logout request to the IdP and return the form values."""
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookies))
logging.debug('logout request URL: %s', url)
response = opener.open(url)
data = response.read()
if logging.getLogger().isEnabledFor(logging.DEBUG):
logging.debug('logout response headers: %s', response.info().headers)
encoding = response.info().get('Content-Encoding')
if encoding and 'gzip' in encoding:
fobj = gzip.GzipFile(mode='r', fileobj=io.BytesIO(data))
data = fobj.read()
if six.PY3:
data = data.decode('utf8')
form_values = extract_form_values(data)
if form_values is None:
logging.warning('no form values in response: %s', data)
else:
logging.debug('response form values: %s', form_values)
ensure_deflated(form_values, 'SAMLResponse')
return form_values
def ensure_deflated(values, name):
"""If not already deflated, deflate the named value in-place."""
if not values:
return
if name not in values:
return
value = values[name]
decoded = base64.b64decode(value)
try:
# negative bit width to indicate there is no header to the compressed data
zlib.decompress(decoded, -15)
except zlib.error:
data = decoded.encode('utf8') if six.PY2 else decoded
deflated = zlib.compress(data)[2:-4]
values[name] = base64.b64encode(deflated)
def get_logout_status(settings, get_data):
"""Extract the status of the logout response."""
logout_response = OneLogin_Saml2_Logout_Response(settings, get_data['SAMLResponse'])
return logout_response.get_status()
def configure_logging(base_dir, debug=False):
"""Configure logging for the trigger."""
if debug:
logs_path = os.path.expanduser(os.path.join(base_dir, 'logs'))
if not os.path.exists(logs_path):
os.makedirs(logs_path)
log_filename = os.path.join(logs_path, 'logout.log')
logging.basicConfig(filename=log_filename, filemode='w', level=logging.DEBUG)
else:
logging.basicConfig(stream=open(os.devnull, 'w'), level=logging.CRITICAL)
def main():
"""Read the SAML response and verify user match."""
parser = argparse.ArgumentParser(description='Initiate a SAML logout request.')
parser.add_argument('--debug', action='store_true', help='enable logging to file')
parser.add_argument('--idpUrl', help='URL of IdP metadata')
parser.add_argument('--base', default='~/saml', help='base path for trigger to store files')
parser.add_argument('identifier', help='Perforce user User or Email field value')
args = parser.parse_args()
configure_logging(args.base, debug=args.debug)
settings = load_settings(args.idpUrl)
#
# How to send the logout request using POST method.
#
# auth = OneLogin_Saml2_Auth(FAKE_REQUEST, settings)
# session_index = read_session_index(args.identifier)
# logout_request = OneLogin_Saml2_Logout_Request(
# settings, name_id=args.identifier, session_index=session_index)
# request_xml = logout_request.get_xml()
# signed_xml = OneLogin_Saml2_Utils.add_sign(
# request_xml,
# settings.get_sp_key(),
# settings.get_sp_cert(),
# debug=args.debug,
# sign_algorithm=settings.get_security_data()['signatureAlgorithm'],
# digest_algorithm=settings.get_security_data()['digestAlgorithm'])
# params = urllib.urlencode({'SAMLRequest': signed_xml})
# headers = {
# "Content-type": "application/x-www-form-urlencoded",
# "Accept": "text/plain"
# }
# url_parts = urlparse.urlparse(auth.get_slo_url())
# if url_parts.scheme == 'http':
# conn = httplib.HTTPConnection(url_parts.netloc, url_parts.port or 80)
# elif url_parts.scheme == 'https':
# conn = httplib.HTTPSConnection(url_parts.netloc, url_parts.port or 443)
# else:
# raise Exception('Unsupported scheme: {0}'.format(url_parts.scheme))
# conn.request("POST", url_parts.path, params, headers)
# response = conn.getresponse()
# print(response.status, response.reason)
# print(response.getheaders())
# print(response.read())
# conn.close()
try:
#
# Make the initial logout request to get things started.
#
fake_request = make_fake_request(settings)
auth = OneLogin_Saml2_Auth(fake_request, settings)
session_index = read_session_index(args.base, args.identifier)
url = auth.logout(name_id=args.identifier, session_index=session_index)
request_id = auth.get_last_request_id()
idp_url = settings.get_idp_data()['singleLogoutService']['url']
cookies = read_cookies(idp_url, args.base, args.identifier)
fake_request['get_data'] = logout_request(url, cookies)
#
# Send the followup requests until we reach the end.
#
while True:
auth = OneLogin_Saml2_Auth(fake_request, settings)
url = auth.process_slo(keep_local_session=True, request_id=request_id)
if len(auth.get_errors()) == 0:
if url is not None:
fake_request['get_data'] = logout_request(url, cookies)
else:
# successfully logged out
logging.info('logout successful')
break
else:
status = get_logout_status(settings, fake_request['get_data'])
if status == OneLogin_Saml2_Constants.STATUS_PARTIAL_LOGOUT:
# partially logged out
logging.info('partial logout')
break
logging.error('\n'.join(auth.get_errors()))
logging.error('error reason: %s', auth.get_last_error_reason())
sys.stderr.write('\n'.join(auth.get_errors()))
sys.stderr.write('error reason: {}\n'.format(auth.get_last_error_reason()))
sys.exit(1)
except Exception as e:
logging.exception(e)
sys.stderr.write('{err}\n'.format(err=e))
sys.exit(1)
if __name__ == "__main__":
main()
| # | Change | User | Description | Committed | |
|---|---|---|---|---|---|
| #6 | 25196 | Nathan Fiedler | Retry and then raise the error if unsuccesful. | ||
| #5 | 25195 | Nathan Fiedler |
Retry IdP metadata fetch a couple of times. To address empheral errors when retrieving the IdP metadata from the remote side, retry the request 3 times with a 1 second pause inbetween. |
||
| #4 | 24988 | Nathan Fiedler |
Add proper logout support for Okta. Use the Okta API to perform the logout, as that works much better given our unusual scenario of operating outside of the web browser. Added the requests library as a dependency as that permits us to send a DELETE request with cookies, while the Python standard library does not. |
||
| #3 | 24952 | Nathan Fiedler | Update the POST version of the logout. | ||
| #2 | 24933 | Nathan Fiedler | Add copyright and license details for SAML triggers. | ||
| #1 | 24785 | Nathan Fiedler | Add SAML triggers. |