Reworked ppss regarding process management..

This commit is contained in:
Louwrentius 2009-12-17 01:52:42 +00:00
parent 7d0815528e
commit 24b368e734
2 changed files with 63 additions and 28 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
DEBUG="$1" DEBUG="$1"
VERSION=2.45 VERSION=2.50
TMP_DIR="ppss" TMP_DIR="ppss"
cleanup () { cleanup () {

87
ppss.sh
View File

@ -34,11 +34,11 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Handling control-c for a clean shutdown. # Handling control-c for a clean shutdown.
trap 'kill_process; ' INT trap 'kill_process' SIGINT
# Setting some vars. # Setting some vars.
SCRIPT_NAME="Distributed Parallel Processing Shell Script" SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.45" SCRIPT_VERSION="2.50"
# The first argument to this script can be a mode. # The first argument to this script can be a mode.
MODES="node start config stop pause continue deploy status erase kill" MODES="node start config stop pause continue deploy status erase kill"
@ -78,7 +78,7 @@ JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing l
LOGFILE="$PPSS_DIR/ppss-log-$$.txt" # General PPSS log file. Contains lots of info. LOGFILE="$PPSS_DIR/ppss-log-$$.txt" # General PPSS log file. Contains lots of info.
STOP=0 # STOP job. STOP=0 # STOP job.
MAX_DELAY=0 # MAX DELAY between jobs. MAX_DELAY=0 # MAX DELAY between jobs.
MAX_LOCK_DELAY=3 # MAX_LOCK_DELAY=9 #
PERCENT="0" PERCENT="0"
LISTENER_PID="" LISTENER_PID=""
IFS_BACKUP="$IFS" IFS_BACKUP="$IFS"
@ -283,7 +283,7 @@ showusage_long () {
kill_process () { kill_process () {
echo "$KILL_KEY" >> "$FIFO" echo "$KILL_KEY" >> "$FIFO"
} }
exec_cmd () { exec_cmd () {
@ -329,7 +329,7 @@ check_for_interrupt () {
then then
set_status "PAUZED" set_status "PAUZED"
log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS." log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS."
sleep $PAUSE_DELAY sleep 0.$PAUSE_DELAY
check_for_interrupt check_for_interrupt
else else
set_status "RUNNING" set_status "RUNNING"
@ -342,17 +342,17 @@ cleanup () {
if [ -e "$FIFO" ] if [ -e "$FIFO" ]
then then
rm $FIFO rm "$FIFO"
fi fi
if [ -e "$ARRAY_POINTER_FILE" ] if [ -e "$ARRAY_POINTER_FILE" ]
then then
rm $ARRAY_POINTER_FILE rm "$ARRAY_POINTER_FILE"
fi fi
if [ -e "$GLOBAL_LOCK" ] if [ -e "$GLOBAL_LOCK" ]
then then
rm -rf $GLOBAL_LOCK rm -rf "$GLOBAL_LOCK"
fi fi
if [ -e "$SSH_SOCKET" ] if [ -e "$SSH_SOCKET" ]
@ -798,7 +798,7 @@ deploy () {
KEY=`echo $SSH_KEY | cut -d " " -f 2` KEY=`echo $SSH_KEY | cut -d " " -f 2`
sleep 1.1 sleep 1
ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "cd ~ && mkdir $PPSS_HOME_DIR >> /dev/null 2>&1" ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "cd ~ && mkdir $PPSS_HOME_DIR >> /dev/null 2>&1"
scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR
@ -1038,7 +1038,7 @@ random_delay () {
NUMBER=$RANDOM NUMBER=$RANDOM
let "NUMBER %= $ARGS" let "NUMBER %= $ARGS"
sleep "$NUMBER" sleep "0.$NUMBER"
} }
@ -1297,7 +1297,7 @@ get_item () {
if [ "$ARRAY_POINTER" -ge "$SIZE_OF_ARRAY" ] if [ "$ARRAY_POINTER" -ge "$SIZE_OF_ARRAY" ]
then then
release_global_lock release_global_lock
echo -en "\033[1A" #echo -en "\033[1A"
return 1 return 1
fi fi
@ -1333,11 +1333,12 @@ start_single_worker () {
ERROR=$? ERROR=$?
if [ ! "$ERROR" == "0" ] if [ ! "$ERROR" == "0" ]
then then
#
# If no more items are available, the listener should be # If no more items are available, the listener should be
# informed that a worker just finished / died. # informed that a worker just finished / died.
# Tis allows the listener to determine if all processes # This allows the listener to determine if all processes
# are finished and it is time to stop. # are finished and it is time to stop.
echo #
echo "$STOP_KEY" > $FIFO echo "$STOP_KEY" > $FIFO
return 1 return 1
else else
@ -1436,9 +1437,11 @@ commando () {
then then
eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1 eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1
ERROR="$?" ERROR="$?"
MYPID="$!"
else else
eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1' eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1'
ERROR="$?" ERROR="$?"
MYPID="$!"
fi fi
AFTER="$(date +%s)" AFTER="$(date +%s)"
@ -1492,9 +1495,11 @@ commando () {
return $? return $?
} }
#
# This is the listener service. It listens on the pipe for events. # This is the listener service. It listens on the pipe for events.
# A job is executed for every event received. # A job is executed for every event received.
# This listener enables fully asynchronous processing. # This listener enables fully asynchronous processing.
#
listen_for_job () { listen_for_job () {
FINISHED=0 FINISHED=0
DIED=0 DIED=0
@ -1502,19 +1507,19 @@ listen_for_job () {
log DEBUG "Listener started." log DEBUG "Listener started."
while read event <& 42 while read event <& 42
do do
if [ "$event" == "$STOP_KEY" ]
then
#
# The start_single_worker method sends a special signal to # The start_single_worker method sends a special signal to
# inform the listener that a worker is finished. # inform the listener that a worker is finished.
# If all workers are finished, it is time to stop. # If all workers are finished, it is time to stop.
# This mechanism makes PPSS asynchronous. # This mechanism makes PPSS asynchronous.
#
# Gives a status update on the current progress..
if [ "$event" == "$STOP_KEY" ]
then
((DIED++)) ((DIED++))
if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ] if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ]
then then
kill_process #kill_process
break
else else
RES=$((MAX_NO_OF_RUNNING_JOBS-DIED)) RES=$((MAX_NO_OF_RUNNING_JOBS-DIED))
if [ "$RES" == "1" ] if [ "$RES" == "1" ]
@ -1527,28 +1532,56 @@ listen_for_job () {
fi fi
elif [ "$event" == "$KILL_KEY" ] elif [ "$event" == "$KILL_KEY" ]
then then
for x in $PIDS #
# If all workers are finished, it is time to kill all remaining
# processes, terminate ssh processes and the listener itself.
#
# This command kills all processes that are related to the master
# process as defined by $PID. All processes that have ever been
# spawned, although disowned or backgrounded will be killed...
#
PROCLIST=`ps a -o pid,pgid,ppid,command | grep [0-9] | grep $PID | grep -v -i grep`
#echo "$PROCLIST" > proclist.txt
oldIFS=$IFS # save the field separator
IFS=$'\n' # new field separator, the end of line
for x in `echo "$PROCLIST"`
do do
kill $x >> /dev/null 2>&1 MYPPID=`echo $x | awk '{ print $3 }'`
MYPID=`echo $x | awk '{ print $1 }'`
if [ ! "$MYPPID" == "$PID" ] && [ ! "$MYPPID" == "1" ]
then
if [ ! "$MYPID" == "$PID" ]
then
log DEBUG "Killing process $MYPID"
kill $MYPID >> /dev/null 2>&1
else
log DEBUG "Not killing master process..$MYPID.."
fi
else
log DEBUG "Not killing listener process. $MYPID.."
fi
done done
IFS=$oldIFS
if [ ! -z "$SSH_MASTER_PID" ] if [ ! -z "$SSH_MASTER_PID" ]
then then
kill "$SSH_MASTER_PID" kill "$SSH_MASTER_PID"
fi fi
echo log INFO "Press ENTER to continue."
log INFO "Finished. Consult ./$JOB_LOG_DIR for job output."
break break
else else
commando "$event" & commando "$event" &
PIDS="$PIDS $!" MYPID="$!"
disown disown
PIDS="$PIDS $MYPID"
#log DEBUG "Event $event has pid $MYPID"
fi fi
get_global_lock get_global_lock
SIZE_OF_ARRAY="${#ARRAY[@]}" SIZE_OF_ARRAY="${#ARRAY[@]}"
ARRAY_POINTER=`cat $ARRAY_POINTER_FILE` ARRAY_POINTER=`cat $ARRAY_POINTER_FILE`
PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY ))
release_global_lock release_global_lock
PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY ))
if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ] if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ]
then then
log INFO "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items." log INFO "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items."
@ -1564,7 +1597,7 @@ listen_for_job () {
set_status STOPPED set_status STOPPED
log DEBUG "Listener stopped." log DEBUG "Listener stopped."
cleanup cleanup
exit log INFO "Finished. Consult $JOB_LOG_DIR for job output."
} }
# This starts an number of parallel workers based on the # of parallel jobs allowed. # This starts an number of parallel workers based on the # of parallel jobs allowed.
@ -1764,6 +1797,8 @@ main () {
get_all_items get_all_items
listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
LISTENER_PID=$! LISTENER_PID=$!
log DEBUG "Master PID is $PID."
log DEBUG "Listener PID is $LISTENER_PID."
start_all_workers start_all_workers
;; ;;