package Msg; BEGIN{push @LIB, "lib"; push @LIB, "."} use strict; use IO::Select; use IO::Socket; use Win32; use Win32::Daemon; use Carp; use vars qw(%rd_callbacks %wt_callbacks $rd_handles $wt_handles); # # Required for shutdown command when usng windows # use Sys::Hostname; use winsys; my $hostname = hostname(); $hostname =~ s/\.[a-zA-Z0-9]+//g; require "$hostname.pm"; my $os = new winsys; my $config = new $hostname; %rd_callbacks = (); %wt_callbacks = (); $rd_handles = IO::Select->new(); $wt_handles = IO::Select->new(); my $blocking_supported = 0; my $remote; sub connect { my ($pkg, $to_host, $to_port,$rcvd_notification_proc) = @_; # Create a new internet socket my $sock = IO::Socket::INET->new ( PeerAddr => $to_host, PeerPort => $to_port, Proto => 'tcp'); return undef unless $sock; $remote = $sock; # Create a connection end-point object my $conn = bless { sock => $sock, rcvd_notification_proc => $rcvd_notification_proc, }, $pkg; if ($rcvd_notification_proc) { # Bundle _rcv and $conn together in a closure my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } $conn; } sub disconnect { my $conn = shift; my $sock = delete $conn->{sock}; return unless defined($sock); set_event_handler ($sock, "read" => undef, "write" => undef); close($sock); undef $!; # Should ideally process errors from close } sub send_now { my ($conn, $msg) = @_; _enqueue ($conn, $msg); $conn->_send (1); # 1 ==> flush } sub send_later { my ($conn, $msg) = @_; _enqueue($conn, $msg); my $sock = $conn->{sock}; return unless defined($sock); set_event_handler ($sock, "write" => sub {$conn->_send(0)}); } sub _enqueue { my ($conn, $msg) = @_; # prepend length (encoded as network long) my $len = length($msg); $msg = pack ('N', $len) . $msg; push (@{$conn->{queue}}, $msg); } sub _send { my ($conn, $flush) = @_; my $sock = $conn->{sock}; return unless defined($sock); my ($rq) = $conn->{queue}; # rq -> ref. to queue. # If $flush is set, set the socket to blocking, and send all # messages in the queue - return only if there's an error # If $flush is 0 (deferred mode) make the socket non-blocking, and # return to the event loop only after every message, or if it # is likely to block in the middle of a message. $flush ? $conn->set_blocking() : $conn->set_non_blocking(); my $offset = (exists $conn->{send_offset}) ? $conn->{send_offset} : 0; while (@$rq) { my $msg = $rq->[0]; my $bytes_to_write = length($msg) - $offset; my $bytes_written = 0; while ($bytes_to_write) { $bytes_written = syswrite ($sock, $msg, $bytes_to_write, $offset); if (!defined($bytes_written)) { if (_err_will_block($!)) { # Should happen only in deferred mode. Record how # much we have already sent. $conn->{send_offset} = $offset; # Event handler should already be set, so we will # be called back eventually, and will resume sending return 1; } else { # Uh, oh $conn->handle_send_err($!); return 0; # fail. Message remains in queue .. } } $offset += $bytes_written; $bytes_to_write -= $bytes_written; } delete $conn->{send_offset}; $offset = 0; shift @$rq; last unless $flush; # Go back to select and wait # for it to fire again. } # Call me back if queue has not been drained. if (@$rq) { set_event_handler ($sock, "write" => sub {$conn->_send(0)}); } else { set_event_handler ($sock, "write" => undef); } 1; # Success } sub handle_send_err { # For more meaningful handling of send errors, subclass Msg and # rebless $conn. my ($conn, $err_msg) = @_; warn "Error while sending: $err_msg \n"; set_event_handler ($conn->{sock}, "write" => undef); } my ($g_login_proc, $g_pkg); # The prefix g_ stands for global my $main_socket = 0; sub new_server { @_ == 4 || die "new_server (myhost, myport, login_proc)\n"; my ($pkg, $my_host, $my_port, $login_proc) = @_; $main_socket = IO::Socket::INET->new ( LocalAddr => $my_host, LocalPort => $my_port, Listen => 5, Proto => 'tcp', Reuse => 1); die "Could not create socket: $! \n" unless $main_socket; set_event_handler ($main_socket, "read" => \&_new_client); $g_login_proc = $login_proc; $g_pkg = $pkg; } sub _new_client { my $sock = $main_socket->accept(); my $conn = bless { 'sock' => $sock, 'state' => 'connected' }, $g_pkg; my $rcvd_notification_proc = &$g_login_proc ($conn); if ($rcvd_notification_proc) { $conn->{rcvd_notification_proc} = $rcvd_notification_proc; my $callback = sub {_rcv($conn)}; set_event_handler ($sock, "read" => $callback); } else { # Login failed $conn->disconnect(); } } sub _rcv { # Complement to _send my ($conn, $rcv_now) = @_; # $rcv_now complement of $flush # Find out how much has already been received, if at all my ($msg, $offset, $bytes_to_read, $bytes_read); my $sock = $conn->{sock}; return unless defined($sock); if (exists $conn->{msg}) { $msg = $conn->{msg}; delete $conn->{'msg'}; # Have made a copy. $offset = length($msg) - 1; # sysread appends to it. $bytes_to_read = $conn->{bytes_to_read}; } else { # The typical case ... $msg = ""; # Otherwise -w complains $offset = 0 ; $bytes_to_read = 0 ; # Will get set soon } # We want to read the message length in blocking mode. Quite # unlikely that we'll get blocked too long reading 4 bytes if (!$bytes_to_read) { # Get new length my $buf; $conn->set_blocking(); $bytes_read = sysread($sock, $buf, 4); if ($! || ($bytes_read != 4)) { goto FINISH; } $bytes_to_read = unpack ('N', $buf); # Eric # print "bytes_to_read = $bytes_to_read\n"; # Eric if ($bytes_to_read > (1024 * 512)) { goto FINISH; } } $conn->set_non_blocking() unless $rcv_now; while ($bytes_to_read) { $bytes_read = sysread ($sock, $msg, $bytes_to_read, $offset); if (defined ($bytes_read)) { if ($bytes_read == 0) { last; } $bytes_to_read -= $bytes_read; $offset += $bytes_read; } else { if (_err_will_block($!)) { # Should come here only in non-blocking mode $conn->{msg} = $msg; $conn->{bytes_to_read} = $bytes_to_read; return ; # .. to event loop; _rcv will be called # later when socket is readable again. } else { last; } } } # Message successfully read. FINISH: if (length($msg) == 0) { $conn->disconnect(); } if ($rcv_now) { return ($msg, $!); } else { &{$conn->{rcvd_notification_proc}}($conn, $msg, $!); } } sub rcv_now { my ($conn) = @_; my ($msg, $err) = _rcv ($conn, 1); # 1 means receive immediately return wantarray ? ($msg, $err) : $msg; } BEGIN { eval { require POSIX; POSIX->import(qw(F_SETFL O_NONBLOCK EAGAIN)); }; $blocking_supported = 1 unless $@; } sub _err_will_block { if ($blocking_supported) { return ($_[0] == EAGAIN()); } return 0; } sub set_non_blocking { if ($blocking_supported) { # preserve other fcntl flags my $flags = fcntl ($_[0], F_GETFL(), 0); fcntl ($_[0], F_SETFL(), $flags | O_NONBLOCK()); } } sub set_blocking { if ($blocking_supported) { my $flags = fcntl ($_[0], F_GETFL(), 0); $flags &= ~O_NONBLOCK(); # Clear blocking, but preserve others fcntl ($_[0], F_SETFL(), $flags); } } sub set_event_handler { shift unless ref($_[0]); # shift if first arg is package name my ($handle, %args) = @_; my $callback; if (exists $args{'write'}) { $callback = $args{'write'}; if ($callback) { $wt_callbacks{$handle} = $callback; $wt_handles->add($handle); } else { delete $wt_callbacks{$handle}; $wt_handles->remove($handle); } } if (exists $args{'read'}) { $callback = $args{'read'}; if ($callback) { $rd_callbacks{$handle} = $callback; $rd_handles->add($handle); } else { delete $rd_callbacks{$handle}; $rd_handles->remove($handle); } } } sub event_loop { my $SERVICE_SLEEP_TIME = 100; my $State; my $PrevState; my ($pkg, $loop_count) = @_; my ($conn, $r, $w, $rset, $wset); my $sqlquery; my @sqlarray; while (($State = Win32::Daemon::State()) != SERVICE_STOPPED ) { if( SERVICE_START_PENDING == $State ) { # Initialization code Win32::Daemon::State( SERVICE_RUNNING ); $PrevState = SERVICE_RUNNING; } elsif ( SERVICE_STOP_PENDING == $State ) { Win32::Daemon::State( SERVICE_STOPPED ); } elsif ( SERVICE_PAUSE_PENDING == $State ) { # "Pausing..." Win32::Daemon::State( SERVICE_PAUSED ); $PrevState = SERVICE_PAUSED; next; } elsif ( SERVICE_CONTINUE_PENDING == $State ) { # "Resuming..." Win32::Daemon::State( SERVICE_RUNNING ); $PrevState = SERVICE_RUNNING; next; } elsif ( SERVICE_STOP_PENDING == $State ) { # "Stopping..." Win32::Daemon::State( SERVICE_STOPPED ); $PrevState = SERVICE_STOPPED; next; } elsif( SERVICE_RUNNING == $State ) { # The service is running as normal... # ...add the main code here... # Quit the loop if no handles left to process last unless ($rd_handles->count() || $wt_handles->count()); ($rset, $wset) = IO::Select->select ($rd_handles, $wt_handles, undef, 3); foreach $r (@$rset) { &{$rd_callbacks{$r}} ($r) if exists $rd_callbacks{$r}; } foreach $w (@$wset) { &{$wt_callbacks{$w}}($w) if exists $wt_callbacks{$w}; } if (defined($loop_count)) { last unless $loop_count < 0; } } # Check for any outstanding commands. Pass in a non zero value # and it resets the Last Message to SERVICE_CONTROL_NONE. if( SERVICE_CONTROL_NONE != ( my $Message = Win32::Daemon::QueryLastMessage( 1 ) ) ) { if( SERVICE_CONTROL_INTERROGATE == $Message ) { # Got here if the Service Control Manager is requesting # the current state of the service. This can happen for # a variety of reasons. Report the last state we set. Win32::Daemon::State( $PrevState ); } elsif ( SERVICE_CONTROL_SHUTDOWN == $Message ) { # Yikes! The system is shutting down. We had better clean up # and stop. # Tell the SCM that we are preparing to shutdown and that we expect # it to take 25 seconds (so don't terminate us for at least 25 seconds)... Win32::Daemon::State( SERVICE_STOP_PENDING, 25000 ); } else { # Got an unhandled control message. Set the state to # whatever the previous state was. Win32::Daemon::State( $PrevState ); } } # Snooze for awhile so we don't suck up cpu time... Win32::Sleep( $SERVICE_SLEEP_TIME ); } # # Tell SQL that the server is down # if ($am) { $sqlquery = "update buildservers set status=\"0\" where ". "binary server=\"$hostname\""; @sqlarray = $os->run_sql_query($sqlquery, ";"); } # We are done so close down... Win32::Daemon::StopService(); } 1;