Major rework, PPSS is now fully asynchronous. There are no while loops polling stuff at an interval.

This commit is contained in:
Louwrentius 2009-09-26 18:24:54 +00:00
parent d2e6bf7c54
commit 24bdb33583
1 changed files with 48 additions and 66 deletions

112
ppss.sh
View File

@ -38,7 +38,7 @@ trap 'kill_process; ' INT
# Setting some vars. Do not change. # Setting some vars. Do not change.
SCRIPT_NAME="Distributed Parallel Processing Shell Script" SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.22" SCRIPT_VERSION="2.30"
# The first argument to this script is always the 'mode'. # The first argument to this script is always the 'mode'.
MODE="$1" MODE="$1"
@ -77,6 +77,7 @@ IFS_BACKUP="$IFS"
INTERVAL="30" # Polling interval to check if there are running jobs. INTERVAL="30" # Polling interval to check if there are running jobs.
CPUINFO=/proc/cpuinfo CPUINFO=/proc/cpuinfo
PROCESSORS="" PROCESSORS=""
STOP_KEY=$RANDOM$RANDOM$RANDOM
MIN_JOBS=3 MIN_JOBS=3
SSH_SERVER="" # Remote server or 'master'. SSH_SERVER="" # Remote server or 'master'.
@ -94,8 +95,8 @@ SSH_MASTER_PID=""
PPSS_HOME_DIR="ppss" PPSS_HOME_DIR="ppss"
ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking. ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking.
PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_TMPDIR" # Local directory on slave for local processing. PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing.
PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_OUTPUT" # Local directory on slave for local output. PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output.
TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp. TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp.
SECURE_COPY="1" # If set, use SCP, Otherwise, use cp. SECURE_COPY="1" # If set, use SCP, Otherwise, use cp.
REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded. REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded.
@ -209,33 +210,17 @@ showusage () {
kill_process () { kill_process () {
kill $LISTENER_PID >> /dev/null 2>&1
while true
do
JOBS=`ps aux | grep $USER | grep -v grep | grep -v -i screen | grep ppss.sh | grep -i bash | wc -l`
if [ "$JOBS" -gt "2" ]
then
for x in `ps aux | grep $USER | grep -v grep | grep -v -i screen | grep ppss.sh | grep -i bash | awk '{ print $1 }'`
do
if [ ! "$x" == "$PID" ] && [ ! "$x" == "$$" ]
then
kill -9 $x >> /dev/null 2>&1
fi
done
sleep 5
else
cleanup
echo -en "\033[1B"
# The master SSH connection should be killed.
if [ ! -z "$SSH_MASTER_PID" ]
then
kill -9 "$SSH_MASTER_PID"
fi
echo ""
exit 0
fi
done
kill $LISTENER_PID > /dev/null 2&>1
sleep 1
cleanup
sleep 1
if [ ! -z "$SSH_MASTER_PID" ]
then
kill -9 "$SSH_MASTER_PID"
fi
sleep 1
log INFO "Finished."
} }
exec_cmd () { exec_cmd () {
@ -752,9 +737,15 @@ erase_ppss () {
then then
for NODE in `cat $NODES_FILE` for NODE in `cat $NODES_FILE`
do do
does_file_exist "ppss"
if [ "$?" == "0" ]
then
log INFO "Erasing PPSS homedir $PPSS_HOME_DIR from node $NODE." log INFO "Erasing PPSS homedir $PPSS_HOME_DIR from node $NODE."
ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "./$PPSS_HOME_DIR/$0 kill" ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "./$PPSS_HOME_DIR/$0 kill"
ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR" ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR"
else
log INFO "PPSS was not present on node $NODE."
fi
done done
else else
log INFO "Aborting.." log INFO "Aborting.."
@ -1294,7 +1285,13 @@ start_single_worker () {
ERROR=$? ERROR=$?
if [ ! "$ERROR" == "0" ] if [ ! "$ERROR" == "0" ]
then then
log DEBUG "Item empty, we are probably almost finished." # If no more items are available, the listener should be
# informed that a worker just finished / died.
# Tis allows the listener to determine if all processes
# are finished and it is time to stop.
echo
log INFO "Waiting for remaining jobs to finish..."
echo "$STOP_KEY" > $FIFO
return 1 return 1
else else
get_global_lock get_global_lock
@ -1434,13 +1431,30 @@ commando () {
# This is the listener service. It listens on the pipe for events. # This is the listener service. It listens on the pipe for events.
# A job is executed for every event received. # A job is executed for every event received.
# This listener enables fully asynchronous processing.
listen_for_job () { listen_for_job () {
DIED=0
log DEBUG "Listener started." log DEBUG "Listener started."
while read event <& 42 while read event <& 42
do do
commando "$event" & # The start_single_worker method sends a special signal to
# inform the listener that a worker is finished.
# If all workers are finished, it is time to stop.
if [ "$event" == "$STOP_KEY" ]
then
((DIED++))
if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ]
then
break
fi
log DEBUG "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining."
else
commando "$event" &
fi
done done
kill_process
log DEBUG "Listener stopped."
} }
# This starts an number of parallel workers based on the # of parallel jobs allowed. # This starts an number of parallel workers based on the # of parallel jobs allowed.
@ -1540,7 +1554,7 @@ main () {
init_vars init_vars
test_server test_server
get_all_items get_all_items
listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
LISTENER_PID=$! LISTENER_PID=$!
start_all_workers start_all_workers
;; ;;
@ -1597,7 +1611,7 @@ main () {
exec_cmd "rm -f $PAUSE_SIGNAL" exec_cmd "rm -f $PAUSE_SIGNAL"
fi fi
cleanup cleanup
exit exit 0
;; ;;
deploy ) deploy )
display_header display_header
@ -1612,7 +1626,6 @@ main () {
show_status show_status
cleanup cleanup
exit 0 exit 0
# some show command
;; ;;
erase ) erase )
display_header display_header
@ -1640,36 +1653,5 @@ main () {
# This command starts the that sets the whole framework in motion. # This command starts the that sets the whole framework in motion.
main main
# Either start new jobs or exit, sleep in the meantime.
while true
do
sleep 5
JOBS=`ps aux | grep $USER | grep -v grep | grep -v -i screen | grep ppss.sh | wc -l`
log DEBUG "There are $JOBS running processes. "
get_min_jobs
if [ "$JOBS" -gt "$MIN_JOBS" ]
then
log DEBUG "Sleeping $INTERVAL seconds."
sleep $INTERVAL
else
if [ "$STOP" == "1" ] || [ ! "$PERCENT" == "100" ]
then
set_status "STOPPED"
elif [ "$PERCENT" == "100" ]
then
set_status "FINISHED"
fi
echo -en "\033[1B"
log INFO "There are no more running jobs, so we must be finished."
echo -en "\033[1B"
log INFO "Killing listener and remainig processes."
log INFO "Dying processes may display an error message."
kill_process
fi
done
# Exit after all processes have finished. # Exit after all processes have finished.
wait wait