From 55583de5f894fcc61331fc5966b3d205217a437d Mon Sep 17 00:00:00 2001 From: Louwrentius Date: Sun, 11 Jul 2010 22:51:31 +0000 Subject: [PATCH] PPSS now supports inotify file system events. --- ppss | 252 ++++++++++++++++++++++++++++++++++++++++----------- ppss-test.sh | 2 +- 2 files changed, 198 insertions(+), 56 deletions(-) diff --git a/ppss b/ppss index 8b39097..e924cd4 100755 --- a/ppss +++ b/ppss @@ -77,6 +77,8 @@ PROCESSORS="" START_KEY="$RANDOM$RANDOM$RANDOM" STOP_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to the listener to stop. KILL_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill +QUEUE="" +INOTIFY=0 RECURSION="1" # all running processes. START_PPSS="" STOP_PPSS="" @@ -112,6 +114,9 @@ DISABLE_SKIPPING=0 NODE_STATUS="$PPSS_DIR/status.txt" DAEMON=0 +REGISTER="" # For STACK +STACK="" +TMP_STACK="" showusage_short () { @@ -439,6 +444,15 @@ is_var_empty () { fi } + +detect_inotify () { + + if [ -e /usr/bin/inotifywait ] + then + INOTIFY=1 + fi +} + process_arguments () { # @@ -534,8 +548,10 @@ process_arguments () { --daemon) DAEMON="1" QUIET="1" + detect_inotify add_var_to_config DAEMON "$DAEMON" add_var_to_config QUIET "$QUIET" + add_var_to_config INOTIFY "$INOTIFY" shift 1 ;; --awskeypair|-P) @@ -1022,6 +1038,50 @@ erase_ppss () { fi } +stack_push_tmp () { + + TMP1="$1" + + if [ -z "$TMP_STACK" ] + then + TMP_STACK="$TMP1" + else + TMP_STACK="$TMP_STACK"$'\n'"$TMP1" + fi +} + +stack_push () { + + line="$1" + + if [ -z "$STACK" ] + then + STACK="$line" + else + STACK="$line"$'\n'"$STACK" + fi +} + + +stack_pop () { + TMP_STACK="" + i=0 + tmp="" + for x in $STACK + do + if [ "$i" = "0" ] + then + tmp="$x" + else + stack_push_tmp "$x" + fi + ((i++)) + done + STACK="$TMP_STACK" + REGISTER="$tmp" + log DEBUG "Stack is $STACK" +} + ec2_get_pending_nodes() { # # This function has naver been tested by the author of PPSS. @@ -1592,7 +1652,7 @@ get_all_items () { SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }') - if [ "$SIZE_OF_INPUT" -le "0" ] + if [ "$SIZE_OF_INPUT" -le "0" ] && [ "$DAEMON" = "0" ] then log ERROR "Source file/dir seems to be empty." set_status STOPPED @@ -1682,8 +1742,14 @@ start_single_worker () { # if ! are_we_sourced then - echo "$START_KEY" >> "$FIFO" - return $? + if [ "$DAEMON" = "1" ] + then + echo "$STOP_KEY" >> "$FIFO" + return $? + else + echo "$START_KEY" >> "$FIFO" + return $? + fi fi } @@ -1975,63 +2041,53 @@ infanticide () { } -listen_for_job () { +run_command () { - FINISHED=0 - DIED=0 - PIDS="" - log DEBUG "Listener started." - while read event <& 42 - do - if [ "$event" = "$START_KEY" ] + if [ ! -d "$1" ] && [ ! -z "$1" ] + then + commando "$1" & + MYPID="$!" + disown + PIDS="$PIDS $MYPID" + ((ACTIVE_WORKERS++)) + log DEBUG "Increasing active workers to $ACTIVE_WORKERS" + return 0 + else + return 1 + fi +} + +display_jobs_remaining () { + + if [ "$ACTIVE_WORKERS" == "1" ] && [ "$QUIET" == "0" ] + then + log PRCNT "One job is remaining. " + elif [ "$QUIET" == "0" ] + then + if [ "$ACTIVE_WORKERS" == "1" ] then - if get_item - then - commando "$ITEM" & - MYPID="$!" - disown - PIDS="$PIDS $MYPID" - else - ((DIED++)) - if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ] - then - if [ "$DAEMON" == "1" ] - then - # - # In daemon mode, start all over again. - # - DIED=0 - get_all_items - log DEBUG "Found $SIZE_OF_INPUT items." - start_all_workers - sleep 10 - else - break - fi - else - RES=$((MAX_NO_OF_RUNNING_JOBS-DIED)) - if [ "$RES" == "1" ] && [ "$QUIET" == "0" ] - then - log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. " - elif [ "$QUIET" == "0" ] - then - if [ "$DIED" == "1" ] - then - echo -en "\n" - fi - log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining. " - fi - fi - fi - elif [ "$event" == "$KILL_KEY" ] - then - infanticide - break + echo -en "\n" fi + log PRCNT "$((ACTIVE_WORKERS)) jobs are remaining. " + fi +} +start_as_daemon () { + + ACTIVE_WORKERS=0 + get_all_items + log DEBUG "Found $SIZE_OF_INPUT items as daemon." + start_all_workers + sleep 10 +} + +display_progress () { + + if [ "$DAEMON" = "0" ] + then SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }') PERCENT=$((100 * $GLOBAL_COUNTER / $SIZE_OF_INPUT )) - if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ] + if [ ! "$ACTIVE_WORKERS" == "0" ] && [ "$FINISHED" == "0" ] then if [ "$QUIET" == "0" ] then @@ -2050,8 +2106,10 @@ listen_for_job () { FINISHED=1 fi fi - done + fi +} +terminate_listener () { if [ ! -z "$SSH_MASTER_PID" ] then @@ -2082,6 +2140,86 @@ listen_for_job () { cleanup } +inotify_listener () { + + inotifywait "$SRC_DIR" -m -e close -q --format '%w%f' | \ + while read -r line + do + echo "$line" > "$FIFO" + done +} + +listen_for_job () { + + FINISHED=0 + ACTIVE_WORKERS=$MAX_NO_OF_RUNNING_JOBS + PIDS="" + log DEBUG "Listener started." + + if [ "$DAEMON" = "1" ] && [ "$INOTIFY" = "1" ] + then + ACTIVE_WORKERS=0 + inotify_listener & + MYPID="$!" + disown + PIDS="$PIDS $MYPID" + fi + + while read event <& 42 + do + if [ "$event" = "$START_KEY" ] + then + if get_item + then + run_command "$ITEM" + else + ((ACTIVE_WORKERS--)) + if [ "$ACTIVE_WORKERS" -le "0" ] + then + if [ "$DAEMON" == "1" ] && [ "$INOTIFY" = "0" ] + then + start_as_daemon + else + break + fi + else + display_jobs_remaining + fi + fi + elif [ "$event" == "$STOP_KEY" ] + then + ((ACTIVE_WORKERS--)) + log DEBUG "Decrease active workers to $ACTIVE_WORKERS" + if [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ] + then + stack_pop + run_command "$REGISTER" + fi + elif [ "$event" == "$KILL_KEY" ] + then + infanticide + break + else + if [ -e "$event" ] + then + log DEBUG "Event is an item!" + log DEBUG "$ACTIVE_WORKERS - $MAX_NO_OF_RUNNING_JOBS" + stack_push "$event" + if [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ] + then + stack_pop + run_command "$REGISTER" + log DEBUG "Active workers: $ACTIVE_WORKERS" + fi + fi + fi + + display_progress + done + + terminate_listener +} + start_all_workers () { if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ] @@ -2093,12 +2231,16 @@ start_all_workers () { if [ "$DAEMON" == "0" ] then log DSPLY "---------------------------------------------------------" + elif [ "$INOTIFY" = "1" ] + then + return 0 fi i=0 while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ] do start_single_worker + log DEBUG "Starting worker $i" ((i++)) if [ ! "$MAX_DELAY" == "0" ] diff --git a/ppss-test.sh b/ppss-test.sh index e3bc887..4d0dad4 100755 --- a/ppss-test.sh +++ b/ppss-test.sh @@ -1,7 +1,7 @@ #!/bin/bash DEBUG="$1" -VERSION="2.80" +VERSION="2.81" TMP_DIR="/tmp/ppss" PPSS=./ppss PPSS_DIR=ppss_dir