From 24bdb33583f192fe15d3bba9672bad0fcb0a4c67 Mon Sep 17 00:00:00 2001 From: Louwrentius Date: Sat, 26 Sep 2009 18:24:54 +0000 Subject: [PATCH] Major rework, PPSS is now fully asynchronous. There are no while loops polling stuff at an interval. --- ppss.sh | 114 ++++++++++++++++++++++++-------------------------------- 1 file changed, 48 insertions(+), 66 deletions(-) diff --git a/ppss.sh b/ppss.sh index 9e476b7..6bf2733 100755 --- a/ppss.sh +++ b/ppss.sh @@ -38,7 +38,7 @@ trap 'kill_process; ' INT # Setting some vars. Do not change. SCRIPT_NAME="Distributed Parallel Processing Shell Script" -SCRIPT_VERSION="2.22" +SCRIPT_VERSION="2.30" # The first argument to this script is always the 'mode'. MODE="$1" @@ -77,6 +77,7 @@ IFS_BACKUP="$IFS" INTERVAL="30" # Polling interval to check if there are running jobs. CPUINFO=/proc/cpuinfo PROCESSORS="" +STOP_KEY=$RANDOM$RANDOM$RANDOM MIN_JOBS=3 SSH_SERVER="" # Remote server or 'master'. @@ -94,8 +95,8 @@ SSH_MASTER_PID="" PPSS_HOME_DIR="ppss" ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking. -PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_TMPDIR" # Local directory on slave for local processing. -PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_OUTPUT" # Local directory on slave for local output. +PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing. +PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output. TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp. SECURE_COPY="1" # If set, use SCP, Otherwise, use cp. REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded. @@ -209,33 +210,17 @@ showusage () { kill_process () { - kill $LISTENER_PID >> /dev/null 2>&1 - while true - do - JOBS=`ps aux | grep $USER | grep -v grep | grep -v -i screen | grep ppss.sh | grep -i bash | wc -l` - if [ "$JOBS" -gt "2" ] - then - for x in `ps aux | grep $USER | grep -v grep | grep -v -i screen | grep ppss.sh | grep -i bash | awk '{ print $1 }'` - do - if [ ! "$x" == "$PID" ] && [ ! "$x" == "$$" ] - then - kill -9 $x >> /dev/null 2>&1 - fi - done - sleep 5 - else - cleanup - echo -en "\033[1B" - # The master SSH connection should be killed. - if [ ! -z "$SSH_MASTER_PID" ] - then - kill -9 "$SSH_MASTER_PID" - fi - echo "" - exit 0 - fi - done - + + kill $LISTENER_PID > /dev/null 2&>1 + sleep 1 + cleanup + sleep 1 + if [ ! -z "$SSH_MASTER_PID" ] + then + kill -9 "$SSH_MASTER_PID" + fi + sleep 1 + log INFO "Finished." } exec_cmd () { @@ -752,9 +737,15 @@ erase_ppss () { then for NODE in `cat $NODES_FILE` do + does_file_exist "ppss" + if [ "$?" == "0" ] + then log INFO "Erasing PPSS homedir $PPSS_HOME_DIR from node $NODE." ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "./$PPSS_HOME_DIR/$0 kill" ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR" + else + log INFO "PPSS was not present on node $NODE." + fi done else log INFO "Aborting.." @@ -1294,7 +1285,13 @@ start_single_worker () { ERROR=$? if [ ! "$ERROR" == "0" ] then - log DEBUG "Item empty, we are probably almost finished." + # If no more items are available, the listener should be + # informed that a worker just finished / died. + # Tis allows the listener to determine if all processes + # are finished and it is time to stop. + echo + log INFO "Waiting for remaining jobs to finish..." + echo "$STOP_KEY" > $FIFO return 1 else get_global_lock @@ -1434,13 +1431,30 @@ commando () { # This is the listener service. It listens on the pipe for events. # A job is executed for every event received. +# This listener enables fully asynchronous processing. listen_for_job () { + DIED=0 log DEBUG "Listener started." while read event <& 42 do - commando "$event" & + # The start_single_worker method sends a special signal to + # inform the listener that a worker is finished. + # If all workers are finished, it is time to stop. + if [ "$event" == "$STOP_KEY" ] + then + ((DIED++)) + if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ] + then + break + fi + log DEBUG "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining." + else + commando "$event" & + fi done + kill_process + log DEBUG "Listener stopped." } # This starts an number of parallel workers based on the # of parallel jobs allowed. @@ -1540,7 +1554,7 @@ main () { init_vars test_server get_all_items - listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & + listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null LISTENER_PID=$! start_all_workers ;; @@ -1597,7 +1611,7 @@ main () { exec_cmd "rm -f $PAUSE_SIGNAL" fi cleanup - exit + exit 0 ;; deploy ) display_header @@ -1612,7 +1626,6 @@ main () { show_status cleanup exit 0 - # some show command ;; erase ) display_header @@ -1640,36 +1653,5 @@ main () { # This command starts the that sets the whole framework in motion. main -# Either start new jobs or exit, sleep in the meantime. -while true -do - sleep 5 - JOBS=`ps aux | grep $USER | grep -v grep | grep -v -i screen | grep ppss.sh | wc -l` - log DEBUG "There are $JOBS running processes. " - - get_min_jobs - - if [ "$JOBS" -gt "$MIN_JOBS" ] - then - log DEBUG "Sleeping $INTERVAL seconds." - sleep $INTERVAL - else - if [ "$STOP" == "1" ] || [ ! "$PERCENT" == "100" ] - then - set_status "STOPPED" - elif [ "$PERCENT" == "100" ] - then - set_status "FINISHED" - fi - - echo -en "\033[1B" - log INFO "There are no more running jobs, so we must be finished." - echo -en "\033[1B" - log INFO "Killing listener and remainig processes." - log INFO "Dying processes may display an error message." - kill_process -fi -done - # Exit after all processes have finished. wait