diff --git a/ppss b/ppss index 2304646..8f1e89c 100755 --- a/ppss +++ b/ppss @@ -75,7 +75,6 @@ IFS_BACKUP="$IFS" CPUINFO="/proc/cpuinfo" PROCESSORS="" START_KEY="$RANDOM$RANDOM$RANDOM" -STOP_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to the listener to stop. KILL_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill QUEUE="" INOTIFY=0 @@ -85,6 +84,9 @@ STOP_PPSS="" SIZE_OF_INPUT="" LOCAL_LOCKING="1" PROCESSED_ITEMS="" +UNPROCESSED_ITEMS="" +ACTIVE_WORKERS="0" +DAEMON_POLLING_INTERVAL="10" SSH_SERVER="" # Remote server or 'master'. SSH_KEY="" # SSH key for ssh account. @@ -346,20 +348,20 @@ exec_cmd () { then if [ -z "$NOMP" ] then - log DEBUG "REMOTE EXEC" - log DEBUG "$USER@$SSH_SERVER $CMD" +# log DEBUG "REMOTE EXEC" +# log DEBUG "$USER@$SSH_SERVER $CMD" ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD STATUS=$? elif [ "$NOMP" == "1" ] then - log DEBUG "REMOTE EXEC NO MP" +# log DEBUG "REMOTE EXEC NO MP" ssh $SSH_OPTS_NOMP $SSH_KEY $USER@$SSH_SERVER $CMD STATUS=$? fi else eval "$CMD" STATUS=$? - log DEBUG "LOCAL EXEC - status is $STATUS" +# log DEBUG "LOCAL EXEC - status is $STATUS" fi return $STATUS } @@ -373,10 +375,10 @@ does_file_exist () { RES=`exec_cmd "ls -1 $FILE" 2>&1` if [ "$?" = "0" ] then - log DEBUG "$FILE does exist" + #log DEBUG "$FILE does exist" return 0 else - log DEBUG "$FILE does not exist" + #log DEBUG "$FILE does not exist" return 1 fi } @@ -514,38 +516,30 @@ process_arguments () { exit fi fi - shift 2 - ;; + shift 2 ;; --working-dir|-w ) PPSS_DIR="$2" add_var_to_config PPSS_DIR "$PPSS_DIR" - shift 2 - ;; - + shift 2 ;; --node|-n ) NODES_FILE="$2" add_var_to_config NODES_FILE "$NODES_FILE" - shift 2 - ;; - + shift 2 ;; --sourcefile|-f ) INPUT_FILE="$2" is_var_empty "$INPUT_FILE" add_var_to_config INPUT_FILE "$INPUT_FILE" - shift 2 - ;; + shift 2 ;; --sourcedir|-d ) SRC_DIR="$2" is_var_empty "$SRC_DIR" add_var_to_config SRC_DIR "$SRC_DIR" - shift 2 - ;; + shift 2 ;; --delay|-D) MAX_DELAY="$2" add_var_to_config MAX_DELAY "$MAX_DELAY" - shift 2 - ;; + shift 2 ;; --daemon) DAEMON="1" QUIET="1" @@ -553,34 +547,32 @@ process_arguments () { add_var_to_config DAEMON "$DAEMON" add_var_to_config QUIET "$QUIET" add_var_to_config INOTIFY "$INOTIFY" - shift 1 - ;; + shift 1 ;; + --interval) + is_var_empty "$2" + DAEMON_POLLING_INTERVAL="$2" + add_var_to_config DAEMON_POLLING_INTERVAL "$DAEMON_POLLING_INTERVAL" + shift 2 ;; --awskeypair|-P) AWS_KEYPAIR="$2" add_var_to_config AWS_KEYPAIR "$AWS_KEYPAIR" - shift 2 - ;; + shift 2 ;; --AMI|-A) AMI_ID="$2" add_var_to_config AMI_ID "$AMI_ID" - shift 2 - ;; + shift 2 ;; --type|-T) INSTANCE_TYPE="$2" add_var_to_config INSTANCE_TYPE "$INSTANCE_TYPE" - shift 2 - ;; - + shift 2 ;; --security|-G) SECURITY_GROUP="$2" add_var_to_config SECURITY_GROUP "$SECURITY_GROUP" - shift 2 - ;; + shift 2 ;; --instances|-I) NUM_NODES="$2" add_var_to_config NUM_NODES "$NUM_NODES" - shift 2 - ;; + shift 2 ;; --command|-c ) COMMAND="$2" is_var_empty "$COMMAND" @@ -589,44 +581,35 @@ process_arguments () { COMMAND=\'$COMMAND\' add_var_to_config COMMAND "$COMMAND" fi - shift 2 - ;; + shift 2 ;; -h ) showusage_normal - exit 1;; + exit 1 ;; --help) showusage_long - exit 1;; + exit 1 ;; --homedir|-H ) - if [ ! -z "$2" ] - then - PPSS_HOME_DIR="$2" - add_var_to_config PPSS_DIR $PPSS_HOME_DIR - shift 2 - fi - ;; - + is_var_empty "$2" + PPSS_HOME_DIR="$2" + add_var_to_config PPSS_DIR $PPSS_HOME_DIR + shift 2 ;; --disable-ht|-j ) HYPERTHREADING=no add_var_to_config HYPERTHREADING $HYPERTHREADING - shift 1 - ;; + shift 1 ;; --log|-l ) LOGFILE="$2" add_var_to_config LOGFILE "$LOGFILE" - shift 2 - ;; + shift 2 ;; --no-recursion|-r ) RECURSION="0" add_var_to_config LOGFILE "$RECURSION" - shift 1 - ;; + shift 1 ;; --workingdir|-w ) WORKINGDIR="$2" add_var_to_config WORKINGDIR "$WORKINGDIR" - shift 2 - ;; + shift 2 ;; --key|-k ) SSH_KEY="$2" is_var_empty "$SSH_KEY" @@ -635,48 +618,37 @@ process_arguments () { then SSH_KEY="-i $SSH_KEY" fi - shift 2 - ;; + shift 2 ;; --known-hosts | -K ) SSH_KNOWN_HOSTS="$2" add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS" - shift 2 - ;; + shift 2 ;; --no-scp |-b ) SECURE_COPY=0 add_var_to_config SECURE_COPY "$SECURE_COPY" - shift 1 - ;; + shift 1 ;; --outputdir|-o ) REMOTE_OUTPUT_DIR="$2" add_var_to_config REMOTE_OUTPUT_DIR "$REMOTE_OUTPUT_DIR" - shift 2 - ;; + shift 2 ;; --processes|-p ) - TMP="$2" - if [ ! -z "$TMP" ] - then - MAX_NO_OF_RUNNING_JOBS="$TMP" - add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS" - shift 2 - fi - ;; + is_var_empty "$2" + MAX_NO_OF_RUNNING_JOBS="$2" + add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS" + shift 2 ;; --master|-m ) SSH_SERVER="$2" add_var_to_config SSH_SERVER "$SSH_SERVER" - shift 2 - ;; + shift 2 ;; --script|-S ) SCRIPT="$2" add_var_to_config SCRIPT "$SCRIPT" - shift 2 - ;; + shift 2 ;; --download) DOWNLOAD_TO_NODE="1" add_var_to_config DOWNLOAD_TO_NODE "$DOWNLOAD_TO_NODE" - shift 1 - ;; + shift 1 ;; --upload) if [ -z "$REMOTE_OUTPUT_DIR" ] then @@ -685,32 +657,28 @@ process_arguments () { fi UPLOAD_TO_SERVER="1" add_var_to_config UPLOAD_TO_SERVER "$UPLOAD_TO_SERVER" - shift 1 - ;; + shift 1 ;; --quiet|-q ) QUIET="1" add_var_to_config QUIET "$QUIET" - shift 1 - ;; + shift 1 ;; --user|-u ) USER="$2" add_var_to_config USER "$USER" - shift 2 - ;; + shift 2 ;; --version|-v ) echo "" echo "$SCRIPT_NAME version $SCRIPT_VERSION" echo "" - exit 0 - ;; + exit 0 ;; * ) showusage_short echo echo "Unknown option $1 " echo - exit 1;; + exit 1 ;; esac done @@ -1063,7 +1031,32 @@ stack_push () { fi } +unprocessed_stack_push () { + + line="$1" + + if [ -z "$PROCESSED_ITEMS" ] + then + UNPROCESSED_ITEMS="$line" + else + UNPROCESSED_ITEMS="$line"$'\n'"$UNPROCESSED_ITEMS" + fi +} + +processed_stack_push () { + + line="$1" + + if [ -z "$PROCESSED_ITEMS" ] + then + PROCESSED_ITEMS="$line" + else + PROCESSED_ITEMS="$line"$'\n'"$PROCESSED_ITEMS" + fi +} + stack_pop () { + TMP_STACK="" i=0 tmp="" @@ -1612,24 +1605,36 @@ remove_processed_items_from_input_file () { # This function removes all items that have already been processed. # Processed items have a lock dir in the PPPSS_ITEM_LOCK_DIR. # + UNPROCESSED_ITEMS="" if [ "$MODE" = "status" ] then return 1 fi + log DEBUG "Running $FUNCNAME...." + if [ ! -e "$LISTOFITEMS" ] + then + echo "$LISTOFITEMS does not exist!" + return 1 + else + SIZE=`wc -l "$LISTOFITEMS"` + if [ "$SIZE" = "0" ] + then + echo "$LISTOFITEMS exists but is empty." + return 1 + fi + fi INPUTFILES=`list_all_input_items` - RES=`exec_cmd "ls -1 $ITEM_LOCK_DIR"` - rm "$LISTOFITEMS" + log DEBUG "$PROCESSED_ITEMS" for x in $INPUTFILES do FILE_IS_PROCESSED=0 - for y in $RES + for y in $PROCESSED_ITEMS do - TMP=`echo "$x" | $MD5 | awk '{ print $1 }'` - if [ "$y" = "$TMP" ] + if [ "$y" = "$x" ] then FILE_IS_PROCESSED=1 fi @@ -1637,22 +1642,20 @@ remove_processed_items_from_input_file () { if [ "$FILE_IS_PROCESSED" = "0" ] then - echo "$x" >> "$LISTOFITEMS" + log DEBUG "ITEM $x is not processed." + unprocessed_stack_push "$x" else - log DEBUG "ITEM $x is already processed!" + log DEBUG "ITEM $x is already processed!." fi done -} - -get_list_of_new_items () { - - echo + echo "$UNPROCESSED_ITEMS" > "$LISTOFITEMS" } get_all_items () { if [ "$DAEMON" == "1" ] then + GLOBAL_COUNTER=1 get_input_lock fi @@ -1677,8 +1680,6 @@ get_all_items () { check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." fi - remove_processed_items_from_input_file - else if [ -e "$SRC_DIR" ] then @@ -1744,6 +1745,7 @@ get_all_items () { cleanup exit 1 fi + remove_processed_items_from_input_file } get_item () { @@ -1759,7 +1761,7 @@ get_item () { # # Return error if list size is empty. # - if [ -z "$SIZE_OF_INPUT" ] + if [ -z "$SIZE_OF_INPUT" ] then log DEBUG "Got no size of input..." return 1 @@ -1770,14 +1772,7 @@ get_item () { # if [ "$SIZE_OF_INPUT" -le "0" ] then - SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }') - if [ "$?" = "0" ] - then - return 0 - else - log DEBUG "Size of input 0 or less." - return 1 - fi + return 1 fi # @@ -1805,9 +1800,6 @@ get_item () { if [ ! "$LOCK" = "0" ] then log DEBUG "Item $ITEM is locked." - # - # Recursion, get_ttem calls itself, until all items are done. - # get_item else log DEBUG "Got lock on $ITEM" @@ -1820,21 +1812,15 @@ get_item () { fi } -start_single_worker () { +start_new_worker () { # # This function kicks the listener to start a worker process. # if ! are_we_sourced then - if [ "$DAEMON" = "1" ] && [ "$INOTIFY" = "1" ] - then - echo "$STOP_KEY" >> "$FIFO" - return $? - else echo "$START_KEY" >> "$FIFO" return $? - fi fi } @@ -1867,12 +1853,12 @@ commando () { # This function will start a chain reaction of events. # # The commando executes a command on an item and, when finished, - # executes the start_single_worker. This function selects a new + # executes the start_new_worker. This function selects a new # item and sends it to the fifo. The listener process receives # the item and excutes this commando function on the item. # So in essence, the commando function keeps calling itself # indirectly until no items are left. This will form a single - # working queue. By executing multiple start_single_worker + # working queue. By executing multiple start_new_worker # functions based on the CPU cores available, parallel processing # is achieved, with a queue for each core. # @@ -1962,7 +1948,7 @@ commando () { if [ -e "$ITEM_LOG_FILE" ] && [ "$DISABLE_SKIPPING" = "0" ] then log DEBUG "Item is already processed, skipping..." - start_single_worker + start_new_worker return 0 fi @@ -2089,7 +2075,7 @@ commando () { fi fi - start_single_worker + start_new_worker } infanticide () { @@ -2128,18 +2114,34 @@ infanticide () { run_command () { - if [ ! -d "$1" ] && [ ! -z "$1" ] + INPUT="$1" + + log DEBUG "Current active workers is $ACTIVE_WORKERS" + + if [ "$ACTIVE_WORKERS" -le "$MAX_NO_OF_RUNNING_JOBS" ] then - commando "$1" & - MYPID="$!" - disown - PIDS="$PIDS $MYPID" - ((ACTIVE_WORKERS++)) - log DEBUG "Increasing active workers to $ACTIVE_WORKERS" - return 0 - else - return 1 - fi + if [ -z "$INPUT" ] + then + stack_pop + INPUT="$REGISTER" + fi + + if [ ! -d "$INPUT" ] && [ ! -z "$INPUT" ] + then + commando "$INPUT" & + MYPID="$!" + disown + PIDS="$PIDS $MYPID" + ((ACTIVE_WORKERS++)) + log DEBUG "Increasing active workers to $ACTIVE_WORKERS" + return 0 + else + log DEBUG "Item is a directory or empty." + return 1 + fi + else + log DEBUG "Strange, this message should never be displaed." + fi } display_jobs_remaining () { @@ -2157,15 +2159,6 @@ display_jobs_remaining () { fi } -start_as_daemon () { - - ACTIVE_WORKERS=0 - get_all_items - log DEBUG "Found $SIZE_OF_INPUT items as daemon." - start_all_workers - sleep 10 -} - display_progress () { if [ "$DAEMON" = "0" ] @@ -2196,6 +2189,8 @@ display_progress () { terminate_listener () { + log DEBUG "Terminating listener." + if [ ! -z "$SSH_MASTER_PID" ] then log DEBUG "SSH master PID is $SSH_MASTER_PID" @@ -2212,7 +2207,7 @@ terminate_listener () { echo stop-ppss log DSPLY "Finished. Consult $JOB_LOG_DIR for job output." - log DSPLY "Press ENTER to continue." + #log DSPLY "Press ENTER to continue." else echo stop-ppss @@ -2234,79 +2229,142 @@ inotify_listener () { done } -start_inotify_listener () { +is_item_unprocessed () { - ACTIVE_WORKERS=0 - inotify_listener & - MYPID="$!" - disown - PIDS="$PIDS $MYPID" + VAR="$1" + STATUS=0 + + if [ -z "$VAR" ] + then + log DEBUG "$FUNCNAME: something is wrong, no argument received." + return 1 + fi + + for x in $PROCESSED_ITEMS + do + if [ "$x" = "$VAR" ] + then + STATUS=1 + fi + done + + log DEBUG "Is item $VAR unprocessed: $STATUS" + + return $STATUS } -listen_for_job () { +daemon_listener () { - FINISHED=0 - ACTIVE_WORKERS=$MAX_NO_OF_RUNNING_JOBS - PIDS="" - log DEBUG "Listener started." + while true + do + get_all_items + while get_item + do + if is_item_unprocessed "$ITEM" + then + log DEBUG "Daemon sending item $ITEM to fifo." + echo "$ITEM" >> "$FIFO" + processed_stack_push "$ITEM" + log DEBUG "Processed items is $PROCESSED_ITEMS" + fi + done + sleep "$DAEMON_POLLING_INTERVAL" + done +} + +start_daemon_listener () { + + daemon_listener & + MYPID="$!" + disown + PIDS="$PIDS $MYPID" +} + +start_inotify_listener () { + + ACTIVE_WORKERS=0 + inotify_listener & + MYPID="$!" + disown + PIDS="$PIDS $MYPID" +} + +start_as_daemon () { if [ "$DAEMON" = "1" ] then + log DEBUG "Daemon mode enabled." if [ "$INOTIFY" = "1" ] then log INFO "Linux inotify enabled." start_inotify_listener else + start_daemon_listener log INFO "Linux inotify disabled." fi + else + log DEBUG "Daemon mode disabled." fi +} + +decrease_active_workers () { + + if [ "$ACTIVE_WORKERS" -gt "0" ] + then + ((ACTIVE_WORKERS--)) + fi +} + +listen_for_job () { + + FINISHED=0 + ACTIVE_WORKERS="$MAX_NO_OF_RUNNING_JOBS" + PIDS="" + log DEBUG "Listener started." + + start_as_daemon while read event <& 42 do + log DEBUG "Current active workers is $ACTIVE_WORKERS" + + decrease_active_workers + if [ "$event" = "$START_KEY" ] then log DEBUG "Got a 'start-key' event" - if get_item + + if [ "$DAEMON" = "0" ] then - run_command "$ITEM" - else - ((ACTIVE_WORKERS--)) - if [ "$DAEMON" == "1" ] && [ "$INOTIFY" = "0" ] + if get_item then - start_as_daemon + log DEBUG "Got an item, running command..." + run_command "$ITEM" else - break + log DEBUG "No more new items..." + if [ "$ACTIVE_WORKERS" = "0" ] + then + break + else + display_jobs_remaining + fi fi - display_jobs_remaining - fi - elif [ "$event" == "$STOP_KEY" ] - then - ((ACTIVE_WORKERS--)) - log DEBUG "Decrease active workers to $ACTIVE_WORKERS" - if [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ] - then - stack_pop - run_command "$REGISTER" + else + log DEBUG "Daemon mode: a worker finished..." + decrease_active_workers + run_command fi + elif [ "$event" == "$KILL_KEY" ] then infanticide break else - if [ -e "$event" ] - then - log DEBUG "Event is an item!" - log DEBUG "$ACTIVE_WORKERS - $MAX_NO_OF_RUNNING_JOBS" - stack_push "$event" - if [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ] - then - stack_pop - run_command "$REGISTER" - log DEBUG "Active workers: $ACTIVE_WORKERS" - fi - fi + log DEBUG "Event is an item!" + stack_push "$event" + log DEBUG "stack after push is $STACK" + run_command fi - display_progress done @@ -2321,6 +2379,7 @@ start_all_workers () { else log DSPLY "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers." fi + if [ "$DAEMON" == "0" ] then log DSPLY "---------------------------------------------------------" @@ -2332,7 +2391,7 @@ start_all_workers () { i=0 while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ] do - start_single_worker + start_new_worker log DEBUG "Starting worker $i" ((i++))