authCheckTrigger.py #1

  • //
  • guest/
  • perforce_software/
  • sdp/
  • dev/
  • Server/
  • Unix/
  • p4/
  • common/
  • bin/
  • triggers/
  • authCheckTrigger.py
  • View
  • Commits
  • Open Download .zip Download (10 KB)
#!/usr/bin/env python
# Copyright (c) 2014 Perforce Software, Inc.  Provided for use as defined in
# the Perforce Consulting Services Agreement.

import argparse
import bcrypt # install from https://code.google.com/p/py-bcrypt/
import ConfigParser
import sys
import getpass
import ldap
import logging
import marshal
import os
from os import remove, close
from shutil import move
from subprocess import Popen, PIPE, STDOUT
from tempfile import mkstemp

CONFIG = ConfigParser.ConfigParser()

# Configuration values
TIMEOUT = 10
LOCAL_PASSWORD_FILE = None
P4 = None
P4PORT = None
P4USER = None
P4TICKETS = None

# p4MarshalCmd
#     executes the p4 command, results sent to a list
def p4MarshalCmd(cmd,quiet=False):
	logging.debug("p4MarshalCmd()")
	env = os.environ.copy()
	# set the P4TICKETS environment variable if supplied
	if(P4TICKETS is not None):
		logging.debug("Setting P4TICKETS={0}".format(P4TICKETS))
		env['P4TICKETS'] = P4TICKETS
	else:
		logging.debug("P4TICKETS unset")

	if not quiet:
		logging.debug("p4 {0}".format(" ".join([P4, "-p", P4PORT, "-u", P4USER, "-G"] + cmd)))
	list = []
	pipe = Popen([P4, "-p", P4PORT, "-u", P4USER, "-G"] + cmd, env=env, stdout=PIPE).stdout
	try:
		while 1:
			record = marshal.load(pipe)
			list.append(record)
	except EOFError:
		pass
	pipe.close()
	return list

# updateLocalPassword
#     adds an entry in the local password file for the specified user account
def updateLocalPassword(username):
	logging.debug("updateLocalPassword()")
	# get the password from the user
	password = getpass.getpass("Enter password (will not appear) > ")
	reenter = getpass.getpass("Type it again (will not appear) > ")
	# check for match
	if(not password == reenter):
		print("Passwords do not match")
		sys.exit(1)
	# encrypt and hash the password
	hashed = bcrypt.hashpw(password, bcrypt.gensalt(10))
	passfileLine = "%s:%s\n"%(username,hashed)
	fh, abs_path = mkstemp()
	new_file = open(abs_path,'w')
	if (not os.path.exists(LOCAL_PASSWORD_FILE)):
		open(LOCAL_PASSWORD_FILE, 'a').close()
	old_file = open(LOCAL_PASSWORD_FILE)
	foundOldEntry = False
	for line in old_file:
		if len(line.strip()) == 0:
			continue
		if line.startswith("%s:"%username):
			new_file.write(passfileLine)
			foundOldEntry = True
		else:
			new_file.write(line)
	if not foundOldEntry:
		new_file.write(passfileLine)
	# close temp file
	new_file.close()
	close(fh)
	old_file.close()
	# Remove original password file
	remove(LOCAL_PASSWORD_FILE)
	# Move new file
	move(abs_path, LOCAL_PASSWORD_FILE)

# localUserExists
#      checks to see if the user exists in the local password file. returns True or False
def localUserExists(username):
	logging.debug("localUserExists()")
	exists = False
	if LOCAL_PASSWORD_FILE is not None and os.path.isfile(LOCAL_PASSWORD_FILE):
		f = open(LOCAL_PASSWORD_FILE)
		for line in f:
			if line.startswith("%s:"%username):
				exists = True
				break
		f.close()
	return exists

# getLocalPassword
#      retrieves the local password entry (if there is one) for the specified user
def getLocalPassword(username):
	logging.debug("getLocalPassword()")
	password = None
	if LOCAL_PASSWORD_FILE is not None and os.path.isfile(LOCAL_PASSWORD_FILE):
		f = open(LOCAL_PASSWORD_FILE)
		for line in f:
			line = line.strip()
			if line.startswith("%s:"%username):
				parts = line.split(":",2)
				password = parts[1]
				break
		f.close()
	return password

# getUserGroups
#      retrieves the list of groups for a user using the p4 groups username command
def getUserGroups(userid):
	logging.debug("getUserGroups()")
	groups = []
	results = p4MarshalCmd(['groups',userid])
	for r in results:
		if b'group' in r:
			groups.append(r.get(b'group').decode("utf-8"))
	return groups

# checkLDAPPassword
#      checks the user and password against the LDAP server
def checkLDAPPassword(userid, password):
	auth_result = 1
	logging.debug("checkLDAPPassword()")
	
	# retrieve the groups to which this user belongs
	groups = getUserGroups(userid)
	if(len(groups) == 0):
		logging.debug("no groups found for {0}... using default".format(userid))
		groups.append(CONFIG.get('globals', 'default.perforce.group'))
	logging.debug("{0} groups: ".format(userid) + ' '.join(groups))
	# the following is needed to allow Python to accept a non-CA cert
	ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
	
	for s in CONFIG.sections():
		if(not s == "globals"):
			try:
				group = CONFIG.get(s, "perforce.group")
				if(group in groups):
					serverURL = CONFIG.get(s, "server.url")
					logging.debug("connecting to server: {0}".format(serverURL))
					acctDomain = CONFIG.get(s, "account.domain")
					# initialize the LDAP connection
					con = ldap.initialize(serverURL)
					# set the timeout value (just in case the server is not responding)
					con.set_option(ldap.OPT_TIMEOUT, TIMEOUT)
					try:
						# try to use a simple bind to connect to the server. The benefit
						# of using a simple bind is that we don't need a special search
						# account to retrieve the user's DN
						domainAcct = userid + acctDomain
						# Note, this line will normally work with an AD server if you set
						# acctDomain to the Active Directory Domain Name
						# domainAcct = acctDomain + "\\" + userid
						logging.debug("attempting to bind with {0}".format(domainAcct))
						con.simple_bind_s(domainAcct, password)
						auth_result = 0
					except Exception as e:
						# if bind throws an exception, then access is DENIED
						auth_result = 1
					break
			except:
				continue
	return auth_result

# doAuthenticate
#    main routine for performing the authentication logic
def doAuthenticate(userid):
	logging.debug("doAuthenticate()")
	# this is the main routine for doing the authentication logic

	password = None
	# default behavior is to deny access (return code 1) so let's set 
	# that here and then see if we can prove it wrong
	result = 1
	
	# let's see if we're running in test mode
	if(args.test):
		# since we are in test mode, let's prompt the user for a password
		password = getpass.getpass("Enter password (will not appear) > ")
	else:
		# since we are not in test mode, we read password from STDIN
		# The password is passed to the auth-check trigger by a perforce client when 
		# the user performs a 'p4 login' and is prompted for a password
		password = sys.stdin.read().rstrip() 

	# check to see if the password is empty. since we are using simple LDAP binding
	# we do not want to allow empty passwords, but we really should not allow empty
	# passwords ever anyway.
	if(len(password.strip()) == 0):
		logging.debug("empty password -- rejected")
		return result

	# check to see if the user account exists in the local password file
	# if user exists in local password file, we will not consult LDAP
	if(localUserExists(userid)):
		# get the local password, if there is one
		localPassword = getLocalPassword(userid)
		# user exists in local password file, now check the password by
		# encrypting the supplied password and comparing it to the one
		# retrieved from the local password file
		if(bcrypt.hashpw(password, localPassword) == localPassword):
			result = 0
	else:
		# user doesn't exist in the local password file, so check user in LDAP
		result = checkLDAPPassword(userid, password)
		
	# ok, we're done... return the result
	return result

####### MAIN METHOD #
# this is the main entry point for the script. This is where the magic happens!
if __name__ == '__main__':
	# set up the argument parser and parse the command-line arguments
	parser = argparse.ArgumentParser(description='auth-check trigger implementation.')
	parser.add_argument('-u', '--user', dest='username', default=None, help='the username to authenticate')
	parser.add_argument('-c', '--config', dest='configfile', required=True, help='the configuration file')
	parser.add_argument('-e', '--edit', dest='edit', action='store_true', help='edit the local password file')
	parser.add_argument('-t', '--test', dest='test', action='store_true', help='run in test mode')
	parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='override configuration and set logging level to DEBUG')
	args = parser.parse_args()
	
	# set the userid from the command-line arguments
	userid = args.username

	# read the config file specified on the command line
	CONFIG.read(args.configfile)
	
	# read in the information from the configuration file
	LOCAL_PASSWORD_FILE = CONFIG.get('globals', 'passwd.file')
	P4 = CONFIG.get('globals', 'p4.path')
	P4PORT = CONFIG.get('globals', 'p4.port')
	P4USER = CONFIG.get('globals', 'p4.user')
	TIMEOUT = CONFIG.getint('globals', 'timeout')
	try:
		P4TICKETS = CONFIG.get('globals', 'p4.tickets')
	except:
		pass

	# configure the logging level
	logLevel = logging.WARN
	try:
		logLevelStr = CONFIG.get('globals', 'log.level')
		if(logLevelStr == "DEBUG"):
			logLevel = logging.DEBUG
		elif(logLevelStr == "ERROR"):
			logLevel = logging.ERROR
	except:
		pass

	# configure the log file
	logFile = None
	try:
		logFile = CONFIG.get('globals', 'log.file')
	except:
		pass

	if logFile is not None and not args.test:
		logging.basicConfig(filename=logFile, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logLevel)
	else:
		logging.basicConfig(format='[%(levelname)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logLevel)

	if(args.test):
		# in test mode, we are checking the ability for the script
		# to authenticate a user (or not)
		print("---- TEST MODE ----")
		if(args.username == None):
			userid = raw_input("enter username: ")
		result = doAuthenticate(userid)
		if(result):
			print("Invalid credentials. Access to Perforce would be denied.")
		else:
			print("Credentials were valid. Access to Perforce would be allowed")
	elif(args.edit):
		print("---- EDIT MODE ----")
		if(args.username == None):
			userid = raw_input("enter username: ")
		updateLocalPassword(userid)
	else:
		# doing the authentication for real!
		result = doAuthenticate(userid)
		# now check the result, returning "authentication failed" error if result is not 0
		if (result):
			logging.warning("login failure for user: {0}".format(userid))
			msg = CONFIG.get('globals', 'auth.failed.message')
			print(msg)
		# exit with the result code
		#    0 = access granted
		#    1 = access denied
		sys.exit(result)

	sys.exit(0)
# Change User Description Committed
#4 16784 C. Thomas Tyler Routine Merge Down to dev from main using:
p4 -s merge -n -b perforce_software-sdp-dev
#3 16029 C. Thomas Tyler Routine merge to dev from main using:
p4 merge -b perforce_software-sdp-dev
#2 12923 C. Thomas Tyler Routine merge down from main to dev.
Resolved with 'p4 resolve -as', no merges or conflicts.
#1 12245 C. Thomas Tyler Routine merge down from main.
//guest/perforce_software/sdp/main/Server/Unix/p4/common/bin/triggers/authCheckTrigger.py
#1 12173 Russell C. Jackson (Rusty) Python ldap/ad authentication script.
This uses simple bind, so it is easier
 to use than the Perl version.