#!/usr/bin/env python import time import os import threading import optparse import datetime import sys import select import signal from kazoo.client import KazooClient debug = False def _dump_stat( stat ): if stat is None: print("\tStat is None") else: createTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stat.ctime / 1000.0)) modTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stat.mtime / 1000.0)) print("\tStat is:") print("\t\tacl_version: %d" % stat.aversion ) print("\t\tversion: %s" % stat.version ) print("\t\tcreated: %s" % createTime ) print("\t\tcreation_transaction_id: %d" % stat.czxid ) print("\t\tlast_modified: %s" % modTime ) print("\t\tlast_modified_transaction_id: %d" % stat.mzxid) print("\t\tdata_length: %d" % stat.dataLength ) print("\t\tchildren_count: %d" % stat.numChildren ) def _dump_event( event ): if event is None: print("\tEvent is None") else: print( event ) print('Event Type %s' % event.type ) print('Event State %s' % event.state ) if event.path is None: print("\tNode Path is None") else: print("\tNode Path is: %s" % event.path) def dump_child_event( parent_node, children, event ): global debug if debug == False: return; print("CHILDREN EVENT OCCURS: parent is %s." % parent_node ) if event is None: print("\tEvent is None") else: _dump_event( event ) if children is not None: i = 0; for child in children: i += 1 print( "\tChild node %d is %s" % (i, child) ) def dump_node_event( node, data, stat, event ): global debug if debug == False: return; print("NODE EVENT OCCURS: node is %s." % node ) _dump_event( event ) _dump_stat( stat ) print("\tNodeData is: \"%s\"" % data ) parser = optparse.OptionParser( description='Snoop on Zookeeper events for a DCS cluster.' ) parser.add_option( '--debug', dest='debugOn', action='store_true', default=False, help='print out verbose debugging information' ) parser.add_option( '--zkport', dest='zkport', default="localhost:2181", help='CSV string containing ZK host:port pairs' ) default_user = os.path.basename(os.environ['HOME']) parser.add_option( '--cluster', dest='cluster', default=default_user + "_cluster", help='cluster id of the cluster to monitor' ) opts, args = parser.parse_args() if opts.zkport and opts.cluster: cluster = opts.cluster zkportpairs = opts.zkport else: print("Must either specify a config file or both zkport and cluster") print( parser.print_help() ) sys.exit(0) memberNode = '/perforce/cluster/' + cluster + '/members' clusterRootNode = '/perforce/cluster/' + cluster electionNode = clusterRootNode + '/election' workspaceNode = clusterRootNode + '/workspace' masterNode = clusterRootNode + '/master' routerNode = clusterRootNode + '/router' # Global Locked Access Variables list_lock = threading.RLock() member_children = [] member_set = set(member_children) election_children = [] election_set = set(election_children) router_children = [] router_set = set(router_children) workspace_children = [] workspace_set = set(workspace_children) masterData = '' zk = KazooClient(hosts=zkportpairs, read_only=True ) zk.start() zk.ensure_path(electionNode) zk.ensure_path(workspaceNode) zk.ensure_path(routerNode) zk.ensure_path(memberNode) # connection watch def my_listener(state): if state == KazooState.LOST: print("ERROR - SESSION WAS LOST!!!") elif state == KazooState.SUSPENDED: print("ERROR - DISCONNECTED!!!") else: print("ERROR - DIS & RE-CONNECTED!!!") zk.add_listener( my_listener ) # utility routine def show_status(): print("CURRENT ZOOKEEPER STATUS") print("----------------------------------------------------------------------------------") if zk.exists( masterNode ): (data, stat) = zk.get( masterNode ) print("MASTER is UP: %s" % data) else: print("MASTER is DOWN") print("ELECTION NODES") children = zk.get_children(electionNode) for child in children: childNode = electionNode + '/' + child (data, stat) = zk.get(childNode) print("\tNode %s has data: %s" % (child, data)) print("WORKSPACE NODES") children = zk.get_children(workspaceNode) for child in children: childNode = workspaceNode + '/' + child (data, stat) = zk.get(childNode) print("\tNode %s has data: %s" % (child, data)) print("ROUTER NODES") children = zk.get_children(routerNode) for child in children: childNode = routerNode + '/' + child (data, stat) = zk.get(childNode) print("\tNode %s has data: %s" % (child, data)) print("MEMBER NODES") children = zk.get_children(memberNode) for child in children: childNode = memberNode + '/' + child (data, stat) = zk.get(childNode) print("\tNode %s has data: %s" % (child, data)) print("----------------------------------------------------------------------------------") print("") # data watches def election_data_changed(data, stat, event): dump_node_event( 'Election node', data, stat, event ) with list_lock: if event is None: if data: print("+++++++++++++++ Child of %s data is now at version: %s, data: %s" % (electionNode, stat.version, data.decode("utf-8"))) else: print("+++++++++++++++ %s/%s Event type: %s (state %)" % (electionNode, event.path, event.type, event.state)) if data is not None and data: print("+++++++++++++++ %s/%s data is now at version: %s, data: %s" % ( electionNode, event.path, stat.version, data.decode("utf-8"))) def wksp_data_changed(data, stat, event): dump_node_event( 'Workspace node', data, stat, event ) with list_lock: if event is None: if data: print("+++++++++++++++ Child of %s data is now at version: %s, data: %s" % (workspaceNode, stat.version, data.decode("utf-8"))) else: print("+++++++++++++++ %s/%s Event type: %s (state %)" % (workspaceNode, event.path, event.type, event.state)) if data is not None and data: print("+++++++++++++++ %s/%s data is now at version: %s, data: %s" % ( workspaceNode, event.path, stat.version, data.decode("utf-8"))) def router_data_changed(data, stat, event): dump_node_event( 'Router node', data, stat, event ) with list_lock: if event is None: if data: print("+++++++++++++++ Child of %s data is now at version: %s, data: %s" % (routerNode, stat.version, data.decode("utf-8"))) else: print("+++++++++++++++ %s/%s Event type: %s (state %)" % (routerNode, event.path, event.type, event.state)) if data is not None and data: print("+++++++++++++++ %s/%s data is now at version: %s, data: %s" % ( routerNode, event.path, stat.version, data.decode("utf-8"))) def member_data_changed(data, stat, event): dump_node_event( 'Member node', data, stat, event ) with list_lock: if event is None: if data: print("+++++++++++++++ Child of %s data is now at version: %s, data: %s" % (memberNode, stat.version, data.decode("utf-8"))) else: print("+++++++++++++++ %s/%s Event type: %s (state %)" % (memberNode, event.path, event.type, event.state)) if data is not None and data: print("+++++++++++++++ %s/%s data is now at version: %s, data: %s" % ( memberNode, event.path, stat.version, data.decode("utf-8"))) # setup watches on master node def print_master_status( masterExists ): if masterExists: @zk.DataWatch( masterNode ) def master_data_changed(data, stat): with list_lock: if data: print("+++++++++++++++ %s data is now at version: %s, data: %s" % (masterNode, stat.version, data.decode("utf-8"))) else: print("!!!!!!! MASTER DOES NOT EXIST") def master_changed( event ): print("MASTER NODE STATUS") if event is not None: if(debug == False): print("-------- %s Event type: %s (state: %s)" % (masterNode, event.type, event.state)) else: print("-------- %s Event type: Zookeeper Monitor Startup" % masterNode ) masterExists = zk.exists( masterNode, watch=master_changed ) print_master_status( masterExists ) masterExists = zk.exists( masterNode, watch=master_changed ) print("MASTER NODE STATUS") print("-------- %s Event type: Zookeeper Monitor Startup" % masterNode ) print_master_status( masterExists ) @zk.ChildrenWatch(electionNode, send_event=True) def elect_child_changed(children, event): global election_children print(' ') print("ELECTION CHILDREN are now: %s" % children) with list_lock: dump_child_event( 'election children', children, event ) if event is not None: if(debug == False): print("-------- %s Event type: %s (state %)" % (event.path, event.type, event.state)) children_set = set(children) for child in children: if child not in election_set: childNode = electionNode + '/' + child election_children.append(child) print("-------- Election Node %s Created" % childNode ) zk.DataWatch( childNode, func=election_data_changed, send_event=True ) for child in election_children: if child not in children_set: childNode = electionNode + '/' + child election_children.remove(child) print("-------- Election Node %s Deleted" % childNode ) return True @zk.ChildrenWatch(workspaceNode, send_event=True) def wksp_child_changed(children, event): global workspace_children print(' ') print("WORKSPACE CHILDREN are now: %s" % children) with list_lock: dump_child_event( 'workspace children', children, event ) if event is not None: if(debug == False): print("-------- %s Event type: %s (state %)" % (event.path, event.type, event.state)) children_set = set(children) for child in children: if child not in workspace_set: childNode = workspaceNode + '/' + child workspace_children.append(child) print("-------- Workspace Node %s Created" % childNode ) zk.DataWatch( childNode, func=wksp_data_changed, send_event=True ) for child in workspace_children: if child not in children_set: childNode = workspaceNode + '/' + child workspace_children.remove(child) print("-------- Workspace Node %s Deleted" % childNode ) return True @zk.ChildrenWatch(routerNode, send_event=True) def router_child_changed(children, event): global router_children print(' ') print("ROUTER CHILDREN are now: %s" % children) with list_lock: dump_child_event( 'router children', children, event ) if event is not None: if(debug == False): print("-------- %s Event type: %s (state %)" % (event.path, event.type, event.state)) children_set = set(children) for child in children: if child not in router_set: childNode = routerNode + '/' + child router_children.append(child) print("-------- Router Node %s Created" % childNode ) zk.DataWatch( childNode, func=router_data_changed, send_event=True ) for child in router_children: if child not in children_set: childNode = routerNode + '/' + child router_children.remove(child) print("-------- Router Node %s Deleted" % childNode ) return True @zk.ChildrenWatch(memberNode, send_event=True) def member_child_changed(children, event): global member_children print(' ') print("MEMBER CHILDREN are now: %s" % children) with list_lock: dump_child_event( 'member children', children, event ) if event is not None: if(debug == False): print("-------- %s Event type: %s (state %)" % (event.path, event.type, event.state)) children_set = set(children) for child in children: if child not in member_set: childNode = memberNode + '/' + child member_children.append(child) print("-------- member Node %s Created" % childNode ) zk.DataWatch( childNode, func=member_data_changed, send_event=True ) for child in member_children: if child not in children_set: childNode = memberNode + '/' + child member_children.remove(child) print("-------- member Node %s Deleted" % childNode ) return True def handler(signum, frame): print ('Stopping zkMonitor with signal %d' % signum) zk.stop() sys.exit(0) # Set the signal handlers signal.signal(signal.SIGHUP, handler) signal.signal(signal.SIGINT, handler) while True: sys.stdin in select.select([sys.stdin], [], [])[0] input = sys.stdin.readline() print("What do you want to do: quit [q], show status[s], nothing[n] or any other input.") input = raw_input('Your choice [q,s,N]: ').strip().lower() if input == 'q': zk.stop() sys.exit(0) elif input == 's': show_status()