diff --git a/trunk/ppss b/trunk/ppss
new file mode 100755
index 0000000..88abbe3
--- /dev/null
+++ b/trunk/ppss
@@ -0,0 +1,1839 @@
+#!/usr/bin/env bash
+#
+# PPSS, the Parallel Processing Shell Script
+#
+# Copyright (c) 2010, Louwrentius
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# See
+# for a copy of the GNU General Public License
+#
+# "Patches or other contributions are always welcome!"
+#
+
+# Handling control-c for a clean shutdown.
+trap 'kill_process' SIGINT
+
+# Setting some vars.
+SCRIPT_NAME="Distributed Parallel Processing Shell Script"
+SCRIPT_VERSION="2.55"
+
+# The first argument to this script can be a mode.
+MODES="node start config stop pause continue deploy status erase kill"
+for x in $MODES
+do
+ if [ "$x" == "$1" ]
+ then
+ MODE="$1"
+ shift
+ fi
+done
+
+# The working directory of PPSS can be set with
+# export PPSS_DIR=/path/to/workingdir
+if [ -z "$PPSS_DIR" ]
+then
+ PPSS_DIR="./ppss_dir"
+fi
+
+CONFIG=""
+HOSTNAME=`hostname`
+ARCH=`uname`
+
+PID="$$"
+GLOBAL_LOCK="$PPSS_DIR/PPSS-GLOBAL-LOCK-$PID" # Global lock file used by local PPSS instance.
+PAUSE_SIGNAL="$PPSS_DIR/pause_signal" # Pause processing if this file is present.
+PAUSE_DELAY=60 # Polling every 1 minutes by default.
+STOP_SIGNAL="$PPSS_DIR/stop_signal" # Stop processing if this file is present.
+ARRAY_POINTER_FILE="$PPSS_DIR/ppss-array-pointer-$PID" # Pointer for keeping track of processed items.
+JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items.
+LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file. Contains lots of info.
+STOP=0 # STOP job.
+MAX_DELAY=0 # MAX DELAY between jobs.
+MAX_LOCK_DELAY=9 #
+PERCENT="0"
+LISTENER_PID=""
+IFS_BACKUP="$IFS"
+CPUINFO=/proc/cpuinfo
+PROCESSORS=""
+STOP_KEY=$RANDOM$RANDOM$RANDOM
+KILL_KEY=$RANDOM$RANDOM$RANDOM
+
+SSH_SERVER="" # Remote server or 'master'.
+SSH_KEY="" # SSH key for ssh account.
+SSH_KNOWN_HOSTS=""
+SSH_SOCKET="/tmp/ppss_ssh_socket" # Multiplex multiple SSH connections over 1 master.
+SSH_OPTS="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
+ -o GlobalKnownHostsFile=./known_hosts \
+ -o ControlMaster=auto \
+ -o Cipher=blowfish \
+ -o ConnectTimeout=10 "
+ # Blowfish is faster but still secure.
+SSH_MASTER_PID=""
+
+PPSS_HOME_DIR=ppss
+ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking.
+PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing.
+PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output.
+TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp.
+SECURE_COPY="1" # If set, use SCP, Otherwise, use cp.
+REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded.
+SCRIPT="" # Custom user script that is executed by ppss.
+ITEM_ESCAPED=""
+NODE_STATUS="$PPSS_DIR/status.txt"
+
+showusage_short () {
+
+ echo
+ echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
+ echo
+ echo "usage: $0 [ -d | -f ] [ -c ' \"\$ITEM\"' ]"
+ echo " [ -C ] [ -j ] [ -l ] [ -p <# jobs> ]"
+ echo " [ -D ] [ -h ] [ --help ]"
+ echo
+ echo "Examples:"
+ echo " $0 -d /dir/with/some/files -c 'gzip '"
+ echo " $0 -d /dir/with/some/files -c 'gzip \"\$ITEM\"' -D 5"
+ echo " $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2"
+}
+
+showusage_normal () {
+
+ echo
+ echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
+ echo
+ echo "PPSS is a Bash shell script that executes commands in parallel on a set "
+ echo "of items, such as files in a directory, or lines in a file."
+ echo
+ echo "This short summary only discusses options for stand-alone mode. for a "
+ echo "complete listing of all options, run PPSS with the options --help"
+ echo
+ echo "Usage $0 [ options ]"
+ echo
+ echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes."
+ echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item "
+ echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'."
+ echo
+ echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files"
+ echo -e " are fed as an argument to the command that has been specified with -c."
+ echo
+ echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the"
+ echo -e " command that has been specified with -c."
+ echo
+ echo -e "--config | -C If the mode is config, a config file with the specified name will be"
+ echo -e " generated based on all the options specified. In the other modes".
+ echo -e " this option will result in PPSS reading the config file and start"
+ echo -e " processing items based on the settings of this file."
+ echo
+ echo -e "--enable-ht | -j Enable hyperthreading. Is disabled by default."
+ echo
+ echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt."
+ echo
+ echo -e "--processes | -p Start the specified number of processes. Ignore the number of available"
+ echo -e " CPUs."
+ echo
+ echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread"
+ echo -e " the load. The delay is only used at the start of all 'threads'."
+ echo
+ echo -e "Example: encoding some wav files to mp3 using lame:"
+ echo
+ echo -e "$0 -d /path/to/wavfiles -c 'lame '"
+ echo
+ echo -e "Extended usage: use --help"
+ echo
+}
+
+if [ "$#" == "0" ]
+then
+ showusage_short
+ exit 1
+fi
+
+showusage_long () {
+
+ echo
+ echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
+ echo
+ echo "PPSS is a Bash shell script that executes commands in parallel on a set "
+ echo "of items, such as files in a directory, or lines in a file."
+ echo
+ echo "Usage: $0 [ MODE ] [ options ]"
+ echo
+ echo "Modes are optional and mainly used for running in distributed mode. Modes are:"
+ echo
+ echo " config Generate a config file based on the supplied option parameters."
+ echo " deploy Deploy PPSS and related files on the specified nodes."
+ echo " erase Erase PPSS and related files from the specified nodes."
+ echo
+ echo " start Starting PPSS on nodes."
+ echo " pause Pausing PPSS on all nodes."
+ echo " stop Stopping PPSS on all nodes."
+ echo " node Running PPSS as a node, requires additional options."
+ echo
+ echo "Options are:"
+ echo
+ echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes."
+ echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item "
+ echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'."
+ echo
+ echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files"
+ echo -e " are fed as an argument to the command that has been specified with -c."
+ echo
+ echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the"
+ echo -e " command that has been specified with -c."
+ echo
+ echo -e "--config | -C If the mode is config, a config file with the specified name will be"
+ echo -e " generated based on all the options specified. In the other modes".
+ echo -e " this option will result in PPSS reading the config file and start"
+ echo -e " processing items based on the settings of this file."
+ echo
+ echo -e "--enable-ht | -j Enable hyperthreading. Is disabled by default."
+ echo
+ echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt."
+ echo
+ echo -e "--processes | -p Start the specified number of processes. Ignore the number of available"
+ echo -e " CPUs."
+ echo
+ echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread"
+ echo -e " the load. The delay is only used at the start of all 'threads'."
+ echo
+ echo -e "The following options are used for distributed execution of PPSS."
+ echo
+ echo -e "--master | -m Specifies the SSH server that is used for communication between nodes."
+ echo -e " Using SSH, file locks are created, informing other nodes that an item "
+ echo -e " is locked. Also, often items, such as files, reside on this host. SCP "
+ echo -e " is used to transfer files from this host to nodes for local procesing."
+ echo
+ echo -e "--node | -n File containig a list of nodes that act as PPSS clients. One IP / DNS "
+ echo -e " name per line."
+ echo
+ echo -e "--key | -k The SSH key that a node uses to connect to the master."
+ echo
+ echo -e "--known-hosts | -K The file that contains the server public key. Can often be found on "
+ echo -e " hosts that already once connected to the server. See the file "
+ echo -e " ~/.ssh/known_hosts or else, manualy connect once and check this file."
+ echo
+ echo -e "--user | -u The SSH user name that is used when logging in into the master SSH"
+ echo -e " server."
+ echo
+ echo -e "--script | -S Specifies the script/program that must be copied to the nodes for "
+ echo -e " execution through PPSS. Only used in the deploy mode."
+ echo -e " This option should be specified if necessary when generating a config."
+ echo
+ echo -e "--transfer | -t This option specifies that an item will be downloaded by the node "
+ echo -e " from the server or share to the local node for processing."
+ echo
+ echo -e "--no-scp | -b Do not use scp for downloading items. Use cp instead. Assumes that a"
+ echo -e " network file system (NFS/SMB) is mounted under a local mountpoint."
+ echo
+ echo -e "--outputdir | -o Directory on server where processed files are put. If the result of "
+ echo -e " encoding a wav file is an mp3 file, the mp3 file is put in the "
+ echo -e " directory specified with this option."
+ echo
+ echo -e "--homedir | -H Directory in which directory PPSS is installed on the node."
+ echo -e " Default is 'ppss'."
+ echo
+ echo -e "Example: encoding some wav files to mp3 using lame:"
+ echo
+ echo -e "$0 -c 'lame ' -d /path/to/wavfiles -j "
+ echo
+ echo -e "Running PPSS based on a configuration file."
+ echo
+ echo -e "$0 -C config.cfg"
+ echo
+ echo -e "Running PPSS on a client as part of a cluster."
+ echo
+ echo -e "$0 -d /somedir -c 'cp "$ITEM" /some/destination' -s 10.0.0.50 -u ppss -t -k ppss-key.key"
+ echo
+}
+
+kill_process () {
+
+ echo "$KILL_KEY" >> "$FIFO"
+}
+
+exec_cmd () {
+
+
+ CMD="$1"
+
+ if [ ! -z "$SSH_SERVER" ]
+ then
+ ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD
+ return $?
+ else
+ eval "$CMD"
+ return $?
+ fi
+}
+
+# this function makes remote or local checking of existence of items transparent.
+does_file_exist () {
+
+ FILE="$1"
+ `exec_cmd "ls -1 $FILE" >> /dev/null 2>&1`
+ if [ "$?" == "0" ]
+ then
+ return 0
+ else
+ return 1
+ fi
+}
+
+check_for_interrupt () {
+
+ does_file_exist "$STOP_SIGNAL"
+ if [ "$?" == "0" ]
+ then
+ set_status "STOPPED"
+ log INFO "STOPPING job. Stop signal found."
+ STOP="1"
+ return 1
+ fi
+
+ does_file_exist "$PAUSE_SIGNAL"
+ if [ "$?" == "0" ]
+ then
+ set_status "PAUZED"
+ log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS."
+ sleep 0.$PAUSE_DELAY
+ check_for_interrupt
+ else
+ set_status "RUNNING"
+ fi
+}
+
+cleanup () {
+
+ #log DEBUG "$FUNCNAME - Cleaning up all temp files and processes."
+
+ if [ -e "$FIFO" ]
+ then
+ rm "$FIFO"
+ fi
+
+ if [ -e "$ARRAY_POINTER_FILE" ]
+ then
+ rm "$ARRAY_POINTER_FILE"
+ fi
+
+ if [ -e "$GLOBAL_LOCK" ]
+ then
+ rm -rf "$GLOBAL_LOCK"
+ fi
+
+ if [ -e "$SSH_SOCKET" ]
+ then
+ rm -rf "$SSH_SOCKET"
+ fi
+
+}
+
+add_var_to_config () {
+
+ if [ "$MODE" == "config" ]
+ then
+
+ VAR="$1"
+ VALUE="$2"
+
+ echo -e "$VAR=$VALUE" >> $CONFIG
+ fi
+}
+
+# Process any command-line options that are specified."
+while [ $# -gt 0 ]
+do
+ case $1 in
+
+ --config|-C )
+ CONFIG="$2"
+
+ if [ "$MODE" == "config" ]
+ then
+ if [ -e "$CONFIG" ]
+ then
+ echo "Do want to overwrite existing config file? [y/n]"
+ read yn
+ if [ "$yn" == "y" ] || [ "$yn" == "yes" ]
+ then
+ rm "$CONFIG"
+ else
+ echo "Aborting..."
+ cleanup
+ exit
+ fi
+ fi
+ fi
+
+ if [ ! "$MODE" == "config" ]
+ then
+ source $CONFIG
+ fi
+
+ if [ ! -z "$SSH_KEY" ]
+ then
+ SSH_KEY="-i $SSH_KEY"
+ fi
+
+ if [ ! -e "./known_hosts" ]
+ then
+ if [ -e $SSH_KNOWN_HOSTS ]
+ then
+ if [ ! "$SSH_KNOWN_HOSTS" == "known_hosts" ]
+ then
+ cat $SSH_KNOWN_HOSTS > ./known_hosts
+ fi
+ else
+ echo "File $SSH_KNOWN_HOSTS does not exist."
+ exit
+ fi
+ fi
+ shift 2
+ ;;
+
+ --working-dir|-w )
+ PPSS_DIR="$2"
+ add_var_to_config PPSS_DIR "$PPSS_DIR"
+ shift 2
+ ;;
+
+ --node|-n )
+ NODES_FILE="$2"
+ add_var_to_config NODES_FILE "$NODES_FILE"
+ shift 2
+ ;;
+
+ --sourcefile|-f )
+ INPUT_FILE="$2"
+ add_var_to_config INPUT_FILE "$INPUT_FILE"
+ shift 2
+ ;;
+ --sourcedir|-d )
+ SRC_DIR="$2"
+ add_var_to_config SRC_DIR "$SRC_DIR"
+ shift 2
+ ;;
+ --delay|-D)
+ MAX_DELAY="$2"
+ add_var_to_config MAX_DELAY "$MAX_DELAY"
+ shift 2
+ ;;
+ --command|-c )
+ COMMAND="$2"
+ if [ "$MODE" == "config" ]
+ then
+ COMMAND=\'$COMMAND\'
+ add_var_to_config COMMAND "$COMMAND"
+ fi
+ shift 2
+ ;;
+
+ -h )
+ showusage_normal
+ exit 1;;
+ --help)
+ showusage_long
+ exit 1;;
+ --homedir|-H )
+ if [ ! -z "$2" ]
+ then
+ PPSS_HOME_DIR="$2"
+ add_var_to_config PPSS_DIR $PPSS_HOME_DIR
+ shift 2
+ fi
+ ;;
+
+ --disable-ht|-j )
+ HYPERTHREADING=no
+ add_var_to_config HYPERTHREADING $HYPERTHREADING
+ shift 1
+ ;;
+ --log|-l )
+ LOGFILE="$2"
+ add_var_to_config LOGFILE "$LOGFILE"
+ shift 2
+ ;;
+ --workingdir|-w )
+ WORKINGDIR="$2"
+ add_var_to_config WORKINGDIR "$WORKINGDIR"
+ shift 2
+ ;;
+ --key|-k )
+ SSH_KEY="$2"
+ add_var_to_config SSH_KEY "$SSH_KEY"
+ if [ ! -z "$SSH_KEY" ]
+ then
+ SSH_KEY="-i $SSH_KEY"
+ fi
+ shift 2
+ ;;
+ --known-hosts | -K )
+ SSH_KNOWN_HOSTS="$2"
+ add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS"
+ shift 2
+ ;;
+
+ --no-scp |-b )
+ SECURE_COPY=0
+ add_var_to_config SECURE_COPY "$SECURE_COPY"
+ shift 1
+ ;;
+ --outputdir|-o )
+ REMOTE_OUTPUT_DIR="$2"
+ add_var_to_config REMOTE_OUTPUT_DIR "$REMOTE_OUTPUT_DIR"
+ shift 2
+ ;;
+ --processes|-p )
+ TMP="$2"
+ if [ ! -z "$TMP" ]
+ then
+ MAX_NO_OF_RUNNING_JOBS="$TMP"
+ add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS"
+ shift 2
+ fi
+ ;;
+ --master|-m )
+ SSH_SERVER="$2"
+ add_var_to_config SSH_SERVER "$SSH_SERVER"
+ shift 2
+ ;;
+ --script|-S )
+ SCRIPT="$2"
+ add_var_to_config SCRIPT "$SCRIPT"
+ shift 2
+ ;;
+ --transfer|-t )
+ TRANSFER_TO_SLAVE="1"
+ add_var_to_config TRANSFER_TO_SLAVE "$TRANSFER_TO_SLAVE"
+ shift 1
+ ;;
+ --user|-u )
+ USER="$2"
+ add_var_to_config USER "$USER"
+ shift 2
+ ;;
+
+ --version|-v )
+ echo ""
+ echo "$SCRIPT_NAME version $SCRIPT_VERSION"
+ echo ""
+ exit 0
+ ;;
+ * )
+
+ showusage_short
+ echo
+ echo "Unknown option $1 "
+ echo
+ exit 1;;
+ esac
+done
+
+
+display_header () {
+
+ log info ""
+ log INFO "========================================================="
+ log INFO " |P|P|S|S| "
+ log INFO "$SCRIPT_NAME version $SCRIPT_VERSION"
+ log INFO "========================================================="
+ log INFO "Hostname:\t\t$HOSTNAME"
+ log INFO "---------------------------------------------------------"
+}
+
+create_working_directory () {
+
+ if [ ! -e "$PPSS_DIR" ]
+ then
+ mkdir -p "$PPSS_DIR"
+ fi
+}
+
+# Init all vars
+init_vars () {
+
+ create_working_directory
+
+ if [ "$ARCH" == "Darwin" ]
+ then
+ MIN_JOBS=4
+ elif [ "$ARCH" == "Linux" ]
+ then
+ MIN_JOBS=3
+ fi
+
+ if [ -e "$LOGFILE" ]
+ then
+ rm $LOGFILE
+ fi
+
+ display_header
+
+ if [ -z "$COMMAND" ]
+ then
+ echo
+ log ERROR "No command specified."
+ echo
+ showusage_normal
+ cleanup
+ exit 1
+ fi
+
+ echo 0 > $ARRAY_POINTER_FILE
+
+ FIFO=/tmp/ppss-fifo-$RANDOM-$RANDOM
+
+ if [ ! -e "$FIFO" ]
+ then
+ mkfifo -m 600 $FIFO
+ fi
+
+ exec 42<> $FIFO
+
+ set_status "RUNNING"
+
+ if [ -e "$CPUINFO" ]
+ then
+ CPU=`cat /proc/cpuinfo | grep 'model name' | cut -d ":" -f 2 | sed -e s/^\ //g | sort | uniq`
+ log INFO "CPU: $CPU"
+ elif [ "$ARCH" == "Darwin" ]
+ then
+ MODEL=`system_profiler SPHardwareDataType | grep "Processor Name" | cut -d ":" -f 2`
+ SPEED=`system_profiler SPHardwareDataType | grep "Processor Speed" | cut -d ":" -f 2`
+ log INFO "CPU: $MODEL $SPEED"
+ elif [ "$ARCH" == "SunOS" ]
+ then
+ CPU=`psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1`
+
+ log INFO "$CPU"
+ else
+ log INFO "CPU: Cannot determine. Provide a patch for your arch!"
+ log INFO "Arch is $ARCH"
+ fi
+
+ if [ -z "$MAX_NO_OF_RUNNING_JOBS" ]
+ then
+ get_no_of_cpus $HYPERTHREADING
+ fi
+
+ does_file_exist "$JOB_LOG_DIR"
+ if [ ! "$?" == "0" ]
+ then
+ log DEBUG "Job log directory $JOB_lOG_DIR does not exist. Creating."
+ exec_cmd "mkdir -p $JOB_LOG_DIR"
+ else
+ log DEBUG "Job log directory $JOB_LOG_DIR exists."
+ fi
+
+ does_file_exist "$ITEM_LOCK_DIR"
+ if [ ! "$?" == "0" ] && [ ! -z "$SSH_SERVER" ]
+ then
+ log DEBUG "Creating remote item lock dir."
+ exec_cmd "mkdir $ITEM_LOCK_DIR"
+ fi
+
+ if [ ! -e "$JOB_LOG_DIR" ]
+ then
+ mkdir -p "$JOB_LOG_DIR"
+ fi
+
+ does_file_exist "$REMOTE_OUTPUT_DIR"
+ if [ ! "$?" == "0" ]
+ then
+ log ERROR "Remote output dir $REMOTE_OUTPUT_DIR does not exist."
+ set_status STOPPED
+ cleanup
+ exit
+ fi
+
+ if [ ! -e "$PPSS_LOCAL_TMPDIR" ]
+ then
+ mkdir "$PPSS_LOCAL_TMPDIR"
+ fi
+
+ if [ ! -e "$PPSS_LOCAL_OUTPUT" ]
+ then
+ mkdir "$PPSS_LOCAL_OUTPUT"
+ fi
+}
+
+get_status () {
+
+ STATUS=`cat "$NODE_SATUS"`
+ echo "$STATUS"
+}
+
+set_status () {
+
+ STATUS="$1"
+ echo "$HOSTNAME $STATUS" > "$NODE_STATUS"
+}
+
+
+expand_str () {
+
+ STR=$1
+ LENGTH=$TYPE_LENGTH
+ SPACE=" "
+
+ while [ "${#STR}" -lt "$LENGTH" ]
+ do
+ STR=$STR$SPACE
+ done
+
+ echo "$STR"
+}
+
+log () {
+
+ # Type 'INFO' is logged to the screen
+ # Any other log-type is only logged to the logfile.
+
+ TYPE="$1"
+ MESG="$2"
+ TYPE_LENGTH=5
+
+ TYPE_EXP=`expand_str "$TYPE"`
+
+ DATE=`date +%b\ %d\ %H:%M:%S`
+ PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}"
+ PREFIX_SMALL="$DATE: "
+
+ LOG_MSG="$PREFIX $MESG"
+ ECHO_MSG="$PREFIX_SMALL $MESG"
+
+ echo -e "$LOG_MSG" >> "$LOGFILE"
+
+ if [ "$TYPE" == "INFO" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ]
+ then
+ echo -e "$ECHO_MSG"
+ fi
+
+}
+
+check_status () {
+
+ ERROR="$1"
+ FUNCTION="$2"
+ MESSAGE="$3"
+
+ if [ ! "$ERROR" == "0" ]
+ then
+ log INFO "$FUNCTION - $MESSAGE"
+ set_status STOPPED
+ cleanup
+ exit 1
+ fi
+
+}
+
+erase_ppss () {
+
+
+ echo "Are you realy sure you want to erase PPSS from all nodes!? (YES/NO)"
+ read YN
+
+ if [ "$YN" == "yes" ] || [ "$YN" == "YES" ]
+ then
+ for NODE in `cat $NODES_FILE`
+ do
+ log INFO "Erasing PPSS homedir $PPSS_DIR from node $NODE."
+ ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR"
+ done
+ else
+ log INFO "Aborting.."
+ fi
+ sleep 1
+}
+
+deploy () {
+
+ NODE="$1"
+
+ SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=socket-%h \
+ -o GlobalKnownHostsFile=./known_hosts \
+ -o ControlMaster=auto \
+ -o Cipher=blowfish \
+ -o ConnectTimeout=5 "
+
+ ERROR=0
+ set_error () {
+
+ if [ ! "$1" == "0" ]
+ then
+ ERROR=1
+ fi
+ }
+
+ ssh -q -o ConnectTimeout=5 $SSH_KEY $USER@$NODE exit 0
+ set_error "$?"
+ if [ ! "$ERROR" == "0" ]
+ then
+ log ERROR "Cannot connect to node $NODE."
+ return
+ fi
+
+ ssh -N -M $SSH_OPTS_NODE $SSH_KEY $USER@$NODE &
+ SSH_PID=$!
+
+ KEY=`echo $SSH_KEY | cut -d " " -f 2`
+
+ sleep 1
+
+ ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "cd ~ && mkdir $PPSS_HOME_DIR >> /dev/null 2>&1"
+ scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ scp -q $SSH_OPTS_NODE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ scp -q $SSH_OPTS_NODE $SSH_KEY $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ scp -q $SSH_OPTS_NODE $SSH_KEY known_hosts $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ if [ ! -z "$SCRIPT" ]
+ then
+ scp -q $SSH_OPTS_NODE $SSH_KEY $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ fi
+
+ if [ ! -z "$INPUT_FILE" ]
+ then
+ scp -q $SSH_OPTS_NODE $SSH_KEY $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ fi
+
+ if [ "$ERROR" == "0" ]
+ then
+ log INFO "PPSS installed on node $NODE."
+ else
+ log INFO "PPSS failed to install on $NODE."
+ fi
+
+ kill $SSH_PID
+}
+
+deploy_ppss () {
+
+
+ if [ -z "$NODES_FILE" ]
+ then
+ log INFO "ERROR - are you using the right option? -C ?"
+ set_status ERROR
+ cleanup
+ exit 1
+ fi
+
+ KEY=`echo $SSH_KEY | cut -d " " -f 2`
+ if [ -z "$KEY" ] || [ ! -e "$KEY" ]
+ then
+ log ERROR "Nodes require a key file."
+ cleanup
+ set_status "ERROR"
+ exit 1
+ fi
+
+ if [ ! -e "$SCRIPT" ] && [ ! -z "$SCRIPT" ]
+ then
+ log ERROR "Script $SCRIPT not found."
+ set_status "ERROR"
+ cleanup
+ exit 1
+ fi
+
+ INSTALLED_ON_SSH_SERVER=0
+ if [ ! -e "$NODES_FILE" ]
+ then
+ log ERROR "File $NODES with list of nodes does not exist."
+ set_status ERROR
+ cleanup
+ exit 1
+ else
+ for NODE in `cat $NODES_FILE`
+ do
+ deploy "$NODE" &
+ sleep 0.1
+ if [ "$NODE" == "$SSH_SERVER" ]
+ then
+ log DEBUG "SSH SERVER $SSH_SERVER is also a node."
+ INSTALLED_ON_SSH_SERVER=1
+ exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR"
+ exec_cmd "mkdir -p $ITEM_LOCK_DIR"
+ fi
+ done
+ if [ "$INSTALLED_ON_SSH_SERVER" == "0" ]
+ then
+ log DEBUG "SSH SERVER $SSH_SERVER is not a node."
+ deploy "$SSH_SERVER"
+ exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR"
+ exec_cmd "mkdir -p $ITEM_LOCK_DIR"
+ fi
+ fi
+}
+
+start_ppss_on_node () {
+
+ NODE="$1"
+ log INFO "Starting PPSS on node $NODE."
+ ssh $SSH_KEY $USER@$NODE -o ConnectTimeout=5 "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 node --config ~/$PPSS_HOME_DIR/$CONFIG"
+}
+
+test_server () {
+
+ # Testing if the remote server works as expected.
+ if [ ! -z "$SSH_SERVER" ]
+ then
+ exec_cmd "date >> /dev/null"
+ check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached"
+
+ ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER &
+ SSH_MASTER_PID="$!"
+ log DEBUG "SSH Master pid is $SSH_MASTER_PID"
+ else
+ log DEBUG "No remote server specified, assuming stand-alone mode."
+ fi
+}
+
+get_no_of_cpus () {
+
+ # Use hyperthreading or not?
+ HPT=$1
+ NUMBER=""
+
+ if [ -z "$HPT" ]
+ then
+ HPT=yes
+ fi
+
+ got_cpu_info () {
+
+ ERROR="$1"
+ check_status "$ERROR" "$FUNCNAME" "cannot determine number of cpu cores. Specify with -p."
+
+ }
+
+ if [ "$HPT" == "yes" ]
+ then
+ if [ "$ARCH" == "Linux" ]
+ then
+ NUMBER=`grep ^processor $CPUINFO | wc -l`
+ got_cpu_info "$?"
+
+ elif [ "$ARCH" == "Darwin" ]
+ then
+ NUMBER=`sysctl -a hw | grep -w logicalcpu | awk '{ print $2 }'`
+ got_cpu_info "$?"
+
+ elif [ "$ARCH" == "FreeBSD" ]
+ then
+ NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'`
+ got_cpu_info "$?"
+
+ elif [ "$ARCH" == "SunOS" ]
+ then
+ NUMBER=`psrinfo | grep on-line | wc -l`
+ got_cpu_info "$?"
+ else
+ if [ -e "$CPUINFO" ]
+ then
+ NUMBER=`grep ^processor $CPUINFO | wc -l`
+ got_cpu_info "$?"
+ fi
+ fi
+
+ if [ ! -z "$NUMBER" ]
+ then
+ log INFO "Found $NUMBER logic processors."
+ fi
+
+ elif [ "$HPT" == "no" ]
+ then
+ log INFO "Hyperthreading is disabled."
+
+ if [ "$ARCH" == "Linux" ]
+ then
+ PHYSICAL=`grep 'physical id' $CPUINFO`
+ if [ "$?" == "0" ]
+ then
+ PHYSICAL=`grep 'physical id' $CPUINFO | sort | uniq | wc -l`
+ if [ "$PHYSICAL" == "1" ]
+ then
+ log INFO "Found $PHYSICAL physical CPU."
+ else
+ log INFO "Found $PHYSICAL physical CPUs."
+ fi
+
+ TMP=`grep 'core id' $CPUINFO`
+ if [ "$?" == "0" ]
+ then
+ log DEBUG "Starting job only for each physical core on all physical CPU(s)."
+ NUMBER=`grep 'core id' $CPUINFO | sort | uniq | wc -l`
+ log INFO "Found $NUMBER physical cores."
+ else
+ log INFO "Single core processor(s) detected."
+ log INFO "Starting job for each physical CPU."
+ NUMBER=$PHYSICAL
+ fi
+ else
+ log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus."
+ NUMBER=`grep ^processor $CPUINFO | wc -l`
+ got_cpu_info "$?"
+ fi
+ elif [ "$ARCH" == "Darwin" ]
+ then
+ NUMBER=`sysctl -a hw | grep -w physicalcpu | awk '{ print $2 }'`
+ got_cpu_info "$?"
+ elif [ "$ARCH" == "FreeBSD" ]
+ then
+ NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'`
+ got_cpu_info "$?"
+ else
+ NUMBER=`cat $CPUINFO | grep "cpu cores" | cut -d ":" -f 2 | uniq | sed -e s/\ //g`
+ got_cpu_info "$?"
+ fi
+
+ fi
+
+ if [ ! -z "$NUMBER" ]
+ then
+ MAX_NO_OF_RUNNING_JOBS=$NUMBER
+ else
+ log ERROR "Number of CPUs not obtained."
+ log ERROR "Please specify manually with -p."
+ set_status "ERROR"
+ exit 1
+ fi
+}
+
+random_delay () {
+
+ ARGS="$1"
+
+ if [ -z "$ARGS" ]
+ then
+ log ERROR "$FUNCNAME Function random delay, no argument specified."
+ set_status ERROR
+ exit 1
+ fi
+
+ NUMBER=$RANDOM
+ let "NUMBER %= $ARGS"
+ sleep "0.$NUMBER"
+}
+
+
+global_lock () {
+
+ mkdir $GLOBAL_LOCK > /dev/null 2>&1
+ ERROR="$?"
+
+ if [ ! "$ERROR" == "0" ]
+ then
+ return 1
+ else
+ return 0
+ fi
+}
+
+get_global_lock () {
+
+ while true
+ do
+ global_lock
+ ERROR="$?"
+ if [ ! "$ERROR" == "0" ]
+ then
+ random_delay $MAX_LOCK_DELAY
+ continue
+ else
+ break
+ fi
+ done
+}
+
+release_global_lock () {
+
+ rm -rf "$GLOBAL_LOCK"
+}
+
+are_jobs_running () {
+
+ NUMBER_OF_PROCS=`jobs | wc -l`
+ if [ "$NUMBER_OF_PROCS" -gt "1" ]
+ then
+ return 0
+ else
+ return 1
+ fi
+}
+
+escape_item () {
+
+ TMP="$1"
+
+ ITEM_ESCAPED=`echo "$TMP" | \
+ sed s/\\ /\\\\\\\\\\\\\\ /g | \
+ sed s/\\'/\\\\\\\\\\\\\\'/g | \
+ sed s/\\\`/\\\\\\\\\\\\\\\`/g | \
+ sed s/\\|/\\\\\\\\\\\\\\|/g | \
+ sed s/\&/\\\\\\\\\\\\\\&/g | \
+ sed s/\;/\\\\\\\\\\\\\\;/g | \
+ sed s/\\\//\\\\\\\\\\\\\\ /g | \
+ sed s/\(/\\\\\\\\\\(/g | \
+ sed s/\)/\\\\\\\\\\)/g `
+}
+
+download_item () {
+
+ ITEM="$1"
+ if [ -e "$ITEM" ]
+ then
+ ITEM_NO_PATH=`basename "$ITEM"`
+ else
+ escape_item "$ITEM"
+ ITEM_NO_PATH="$ITEM_ESCAPED"
+ fi
+
+ if [ "$TRANSFER_TO_SLAVE" == "1" ]
+ then
+ log DEBUG "Transfering item $ITEM_NO_PATH to local disk."
+ if [ "$SECURE_COPY" == "1" ] && [ ! -z "$SSH_SERVER" ]
+ then
+ if [ ! -z "$SRC_DIR" ]
+ then
+ ITEM_PATH="$SRC_DIR/$ITEM"
+ else
+ ITEM_PATH="$ITEM"
+ fi
+
+ escape_item "$ITEM_PATH"
+
+ scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" ./$PPSS_LOCAL_TMPDIR
+ log DEBUG "Exit code of remote transfer is $?"
+ else
+ cp "$ITEM" ./$PPSS_LOCAL_TMPDIR
+ log DEBUG "Exit code of local transfer is $?"
+ fi
+ else
+ log DEBUG "No transfer of item $ITEM_NO_PATH to local workpath."
+ fi
+}
+
+upload_item () {
+
+
+ ITEM="$1"
+ ITEMDIR="$2"
+
+ if [ "$TRANSFER_TO_SLAVE" == "0" ]
+ then
+ log DEBUG "File transfer is disabled."
+ return 0
+ fi
+
+ log DEBUG "Uploading item $ITEM."
+ if [ "$SECURE_COPY" == "1" ]
+ then
+ escape_item "$REMOTE_OUTPUT_DIR$ITEMDIR"
+ DIR_ESCAPED="$ITEM_ESCAPED"
+
+ scp -q $SSH_OPTS $SSH_KEY "$ITEM"/* $USER@$SSH_SERVER:"$DIR_ESCAPED"
+ ERROR="$?"
+ if [ ! "$ERROR" == "0" ]
+ then
+ log ERROR "Uploading of $ITEM via SCP failed."
+ else
+ log DEBUG "Upload of item $ITEM success"
+ rm -rf ./"$ITEM"
+ fi
+ else
+ cp "$ITEM" "$REMOTE_OUTPUT_DIR"
+ ERROR="$?"
+ if [ ! "$ERROR" == "0" ]
+ then
+ log DEBUG "ERROR - uploading of $ITEM vi CP failed."
+ fi
+ fi
+}
+
+lock_item () {
+
+ if [ ! -z "$SSH_SERVER" ]
+ then
+ ITEM="$1"
+
+ LOCK_FILE_NAME=`echo "$ITEM" | \
+ sed s/^\\\.//g | \
+ sed s/^\\\.\\\.//g | \
+ sed s/^\\\///g | \
+ sed s/\\\//\\\\\\ /g | \
+ sed s/\\ /\\\\\\\\\\\\\\ /g | \
+ sed s/\\'/\\\\\\\\\\\\\\'/g | \
+ sed s/\\\//\\\\\\\\\\\\\\ /g | \
+ sed s/\&/\\\\\\\\\\\\\\&/g | \
+ sed s/\;/\\\\\\\\\\\\\\;/g | \
+ sed s/\(/\\\\\\\\\\(/g | \
+ sed s/\)/\\\\\\\\\\)/g `
+
+ ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME"
+ log DEBUG "Trying to lock item $ITEM - $ITEM_LOCK_FILE."
+ exec_cmd "mkdir $ITEM_LOCK_FILE >> /dev/null 2>&1"
+ ERROR="$?"
+
+ if [ "$ERROR" == "$?" ]
+ then
+ exec_cmd "touch $ITEM_LOCK_FILE/$HOSTNAME" # Record that item is claimed by node x.
+ fi
+
+ return "$ERROR"
+ fi
+}
+
+get_all_items () {
+
+ count=0
+
+ if [ -z "$INPUT_FILE" ]
+ then
+ if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?"
+ then
+ ITEMS=`exec_cmd "ls -1 $SRC_DIR"`
+ check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
+ else
+ if [ -e "$SRC_DIR" ]
+ then
+ ITEMS=`ls -1 $SRC_DIR`
+ else
+ ITEMS=""
+ fi
+ fi
+ IFS=$'\n'
+
+ for x in $ITEMS
+ do
+ ARRAY[$count]="$x"
+ ((count++))
+ done
+ IFS=$IFS_BACKUP
+ else
+ if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?"
+ then
+ log DEBUG "Running as slave, input file has been pushed (hopefully)."
+ fi
+ if [ ! -e "$INPUT_FILE" ]
+ then
+ log ERROR "Input file $INPUT_FILE does not exist."
+ set_status "ERROR"
+ cleanup
+ exit 1
+ fi
+
+ exec 10<"$INPUT_FILE"
+
+ while read LINE <&10
+ do
+ ARRAY[$count]=$LINE
+ ((count++))
+ done
+
+ exec 10>&-
+
+ fi
+
+ SIZE_OF_ARRAY="${#ARRAY[@]}"
+ if [ "$SIZE_OF_ARRAY" -le "0" ]
+ then
+ log ERROR "Source file/dir seems to be empty."
+ set_status STOPPED
+ cleanup
+ exit 1
+ fi
+}
+
+get_item () {
+
+ check_for_interrupt
+
+ if [ "$STOP" == "1" ]
+ then
+ return 1
+ fi
+
+ get_global_lock
+
+ SIZE_OF_ARRAY="${#ARRAY[@]}"
+
+ # Return error if the array is empty.
+ if [ "$SIZE_OF_ARRAY" -le "0" ]
+ then
+ release_global_lock
+ return 1
+ fi
+
+ # This variable is used to walk thtough all array items.
+ ARRAY_POINTER=`cat $ARRAY_POINTER_FILE`
+
+ # Check if all items have been processed.
+ if [ "$ARRAY_POINTER" -ge "$SIZE_OF_ARRAY" ]
+ then
+ release_global_lock
+ #echo -en "\033[1A"
+ return 1
+ fi
+
+ # Select an item.
+ ITEM="${ARRAY[$ARRAY_POINTER]}"
+ if [ -z "$ITEM" ]
+ then
+ ((ARRAY_POINTER++))
+ echo $ARRAY_POINTER > $ARRAY_POINTER_FILE
+ release_global_lock
+ get_item
+ else
+ ((ARRAY_POINTER++))
+ echo $ARRAY_POINTER > $ARRAY_POINTER_FILE
+ lock_item "$ITEM"
+ if [ ! "$?" == "0" ]
+ then
+ log DEBUG "Item $ITEM is locked."
+ release_global_lock
+ get_item
+ else
+ log DEBUG "Got lock on $ITEM, processing."
+ release_global_lock
+ download_item "$ITEM"
+ return 0
+ fi
+ fi
+}
+
+start_single_worker () {
+
+ #
+ # This function sends an item to the fifo. This signals
+ # the listener process to execute a 'worker' on this
+ # item, using the 'commando' function.
+ #
+
+ get_item
+ ERROR=$?
+ if [ ! "$ERROR" == "0" ]
+ then
+ #
+ # If no more items are available, the listener should be
+ # informed that a worker just finished / died.
+ # This allows the listener to determine if all processes
+ # are finished and it is time to stop.
+ #
+ echo "$STOP_KEY" > $FIFO
+ return 1
+ else
+ get_global_lock
+ echo "$ITEM" > $FIFO
+ release_global_lock
+ return 0
+ fi
+}
+
+
+elapsed () {
+
+ BEFORE="$1"
+ AFTER="$2"
+
+ ELAPSED="$(expr $AFTER - $BEFORE)"
+
+ REMAINDER="$(expr $ELAPSED % 3600)"
+ HOURS="$(expr $(expr $ELAPSED - $REMAINDER) / 3600)"
+
+ SECS="$(expr $REMAINDER % 60)"
+ MINS="$(expr $(expr $REMAINDER - $SECS) / 60)"
+
+ echo "Elapsed time (h:m:s): $HOURS:$MINS:$SECS"
+}
+
+commando () {
+
+ #
+ # This function will start a chain reaction of events.
+ #
+ # The commando executes a command on an item and, when finished,
+ # executes the start_single_worker. This function selects a new
+ # item and sends it to the fifo. The listener process receives
+ # the item and excutes this commando function on the item.
+ # So in essence, the commando function keeps calling itself
+ # indirectly until no items are left. This will form a single
+ # working queue. By executing multiple start_single_worker
+ # functions based on the CPU cores available, parallel processing
+ # is achieved, with a queue for each core.
+ #
+
+ ITEM="$1"
+
+ if [ -e "$ITEM" ]
+ then
+ DIRNAME=`dirname "$ITEM"`
+ ITEM_NO_PATH=`basename "$ITEM"`
+ escape_item "$ITEM_NO_PATH"
+ OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
+ # ^
+ # | This VAR can be used in scripts or command lines.
+ #
+ OUTPUT_FILE="$ITEM_ESCAPED"
+ else
+ DIRNAME=""
+ escape_item "$ITEM"
+ ITEM_NO_PATH="$ITEM_ESCAPED"
+ OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH"
+ fi
+
+ log DEBUG "Processing item $ITEM"
+ #
+ # Decide if an item must be transfered from server to the node.
+ # or be processed in-place (NFS / SMB mount?)
+ #
+ if [ "$TRANSFER_TO_SLAVE" == "0" ]
+ then
+ if [ -z "$SRC_DIR" ] && [ ! -z "$INPUT_FILE" ]
+ then
+ log DEBUG "Using item straight from the server."
+ else
+ ITEM="$SRC_DIR/$ITEM"
+ fi
+ else
+ ITEM="./$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH"
+ fi
+
+ LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g | sed s/\\ /_/g`
+ ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"
+
+ OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$LOG_FILE_NAME"
+
+ mkdir -p "$OUTPUT_DIR"
+
+ does_file_exist "$ITEM_LOG_FILE"
+ if [ "$?" == "0" ]
+ then
+ log DEBUG "Skipping item $ITEM - already processed."
+ else
+ ERROR=""
+ #
+ # Some formatting of item log files.
+ #
+ DATE=`date +%b\ %d\ %H:%M:%S`
+ echo "===== PPSS Item Log File =====" > "$ITEM_LOG_FILE"
+ echo -e "Host:\t\t$HOSTNAME" >> "$ITEM_LOG_FILE"
+ echo -e "Process:\t$PID" >> "$ITEM_LOG_FILE"
+ echo -e "Item:\t\t$ITEM" >> "$ITEM_LOG_FILE"
+ echo -e "Start date:\t$DATE" >> "$ITEM_LOG_FILE"
+ echo -e "" >> "$ITEM_LOG_FILE"
+
+ #
+ # The actual execution of the command as specified by
+ # the -c option.
+ #
+ BEFORE="$(date +%s)"
+ TMP=`echo $COMMAND | grep -i '$ITEM'`
+ if [ "$?" == "0" ]
+ then
+ eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1
+ ERROR="$?"
+ MYPID="$!"
+ else
+ eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1'
+ ERROR="$?"
+ MYPID="$!"
+ fi
+ AFTER="$(date +%s)"
+
+ echo -e "" >> "$ITEM_LOG_FILE"
+
+ # Some error logging. Success or fail.
+ if [ ! "$ERROR" == "0" ]
+ then
+ echo -e "Status:\t\tFAILURE" >> "$ITEM_LOG_FILE"
+ else
+ echo -e "Status:\t\tSUCCESS" >> "$ITEM_LOG_FILE"
+ fi
+
+ #
+ # If part of a cluster, remove the downloaded item after
+ # it has been processed and uploaded as not to fill up disk space.
+ #
+ if [ "$TRANSFER_TO_SLAVE" == "1" ]
+ then
+ if [ -e "$ITEM" ]
+ then
+ rm "$ITEM"
+ else
+ log DEBUG "ERROR Something went wrong removing item $ITEM from local work dir."
+ fi
+
+ fi
+
+ escape_item "$DIRNAME"
+ ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED"
+
+ exec_cmd "mkdir -p $ITEM_OUTPUT_DIR"
+ if [ "$DIRNAME" == "." ]
+ then
+ DIRNAME=""
+ fi
+ upload_item "$PPSS_LOCAL_OUTPUT/$ITEM_NO_PATH" "$DIRNAME"
+
+ elapsed "$BEFORE" "$AFTER" >> "$ITEM_LOG_FILE"
+ echo -e "" >> "$ITEM_LOG_FILE"
+
+ if [ ! -z "$SSH_SERVER" ]
+ then
+ log DEBUG "Uploading item log file $ITEM_LOG_FILE to master ~/$PPSS_HOME_DIR/$JOB_LOG_DIR"
+ scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:~/$PPSS_HOME_DIR/$JOB_LOG_DIR
+ if [ ! "$?" == "0" ]
+ then
+ log DEBUG "Uploading of item log file failed."
+ fi
+ fi
+ fi
+
+ start_single_worker
+ return $?
+}
+
+#
+# This is the listener service. It listens on the pipe for events.
+# A job is executed for every event received.
+# This listener enables fully asynchronous processing.
+#
+listen_for_job () {
+ FINISHED=0
+ DIED=0
+ PIDS=""
+ log DEBUG "Listener started."
+ while read event <& 42
+ do
+ if [ "$event" == "$STOP_KEY" ]
+ then
+ #
+ # The start_single_worker method sends a special signal to
+ # inform the listener that a worker is finished.
+ # If all workers are finished, it is time to stop.
+ # This mechanism makes PPSS asynchronous.
+ #
+ ((DIED++))
+ if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ]
+ then
+ #kill_process
+ break
+ else
+ RES=$((MAX_NO_OF_RUNNING_JOBS-DIED))
+ if [ "$RES" == "1" ]
+ then
+ log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. "
+ else
+ log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining."
+ echo -en "\033[1A"
+ fi
+ fi
+ elif [ "$event" == "$KILL_KEY" ]
+ then
+ #
+ # If all workers are finished, it is time to kill all remaining
+ # processes, terminate ssh processes and the listener itself.
+ #
+ # This command kills all processes that are related to the master
+ # process as defined by $PID. All processes that have ever been
+ # spawned, although disowned or backgrounded will be killed...
+ #
+ PROCLIST=`ps a -o pid,pgid,ppid,command | grep [0-9] | grep $PID | grep -v -i grep`
+ #echo "$PROCLIST" > proclist.txt
+ oldIFS=$IFS # save the field separator
+ IFS=$'\n' # new field separator, the end of line
+ for x in `echo "$PROCLIST"`
+ do
+ MYPPID=`echo $x | awk '{ print $3 }'`
+ MYPID=`echo $x | awk '{ print $1 }'`
+ if [ ! "$MYPPID" == "$PID" ] && [ ! "$MYPPID" == "1" ]
+ then
+ if [ ! "$MYPID" == "$PID" ]
+ then
+ log DEBUG "Killing process $MYPID"
+ kill $MYPID >> /dev/null 2>&1
+ else
+ log DEBUG "Not killing master process..$MYPID.."
+ fi
+ else
+ log DEBUG "Not killing listener process. $MYPID.."
+ fi
+ done
+ IFS=$oldIFS
+
+ if [ ! -z "$SSH_MASTER_PID" ]
+ then
+ kill "$SSH_MASTER_PID"
+ fi
+ break
+ else
+ commando "$event" &
+ MYPID="$!"
+ disown
+ PIDS="$PIDS $MYPID"
+ #log DEBUG "Event $event has pid $MYPID"
+ fi
+
+ get_global_lock
+ SIZE_OF_ARRAY="${#ARRAY[@]}"
+ ARRAY_POINTER=`cat $ARRAY_POINTER_FILE`
+ release_global_lock
+ PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY ))
+ if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ]
+ then
+ log INFO "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items."
+ if [ "$PERCENT" == "100" ]
+ then
+ FINISHED=1
+ else
+ echo -en "\033[1A"
+ fi
+ fi
+ done
+
+ set_status STOPPED
+ log DEBUG "Listener stopped."
+ if [ ! "$PERCENT" == "100" ]
+ then
+ echo
+ log INFO "Finished. Consult $JOB_LOG_DIR for job output."
+ log INFO "Press ENTER to continue."
+ else
+ log INFO "Finished. Consult $JOB_LOG_DIR for job output."
+ fi
+ cleanup
+}
+
+# This starts an number of parallel workers based on the # of parallel jobs allowed.
+start_all_workers () {
+
+ if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ]
+ then
+ log INFO "Starting $MAX_NO_OF_RUNNING_JOBS single worker."
+ else
+ log INFO "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers."
+ fi
+ log INFO "---------------------------------------------------------"
+
+ i=0
+ while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
+ do
+ start_single_worker
+ ((i++))
+
+ if [ ! "$MAX_DELAY" == "0" ]
+ then
+ random_delay "$MAX_DELAY"
+ fi
+ done
+}
+
+get_status_of_node () {
+
+ NODE="$1"
+ STATUS=`ssh -o ConnectTimeout=10 $SSH_KEY $USER@$NODE cat "$PPSS_HOME_DIR/$NODE_STATUS" 2>/dev/null`
+ ERROR="$?"
+ if [ ! "$ERROR" == "0" ]
+ then
+ STATUS="UNKNOWN"
+ fi
+ echo "$STATUS"
+}
+
+show_status () {
+
+ source $CONFIG
+ if [ ! -z "$SSH_KEY" ]
+ then
+ SSH_KEY="-i $SSH_KEY"
+ fi
+
+ if [ -z "$INPUT_FILE" ]
+ then
+ ITEMS=`exec_cmd "ls -1 $SRC_DIR | wc -l"`
+ else
+ ITEMS=`exec_cmd "cat $PPSS_DIR/$INPUT_FILE | wc -l"`
+ fi
+
+ PROCESSED=`exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l"` 2>&1 >> /dev/null
+ TMP_STATUS=$((100 * $PROCESSED / $ITEMS))
+
+ log INFO "Status:\t\t$TMP_STATUS percent complete."
+
+ if [ ! -z $NODES_FILE ]
+ then
+ TMP_NO=`cat $NODES_FILE | wc -l`
+ log INFO "Nodes:\t $TMP_NO"
+ fi
+ log INFO "Items:\t\t$ITEMS"
+
+
+ log INFO "---------------------------------------------------------"
+ HEADER=`echo IP-address Hostname Processed Status | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'`
+ log INFO "$HEADER"
+ log INFO "---------------------------------------------------------"
+ PROCESSED=0
+ for x in `cat $NODES_FILE`
+ do
+ NODE=`get_status_of_node "$x" | awk '{ print $1 }'`
+ if [ ! "$NODE" == "UNKNOWN" ]
+ then
+ STATUS=`get_status_of_node "$x" | awk '{ print $2 }'`
+ RES=`exec_cmd "grep -i $NODE ~/$PPSS_HOME_DIR/$JOB_LOG_DIR/* 2>/dev/null | wc -l "`
+ if [ ! "$?" == "0" ]
+ then
+ RES=0
+ fi
+ else
+ STATUS="UNKNOWN"
+ RES=0
+ fi
+ let PROCESSED=$PROCESSED+$RES
+ LINE=`echo "$x $NODE $RES $STATUS" | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'`
+ log INFO "$LINE"
+ done
+ log INFO "---------------------------------------------------------"
+ LINE=`echo $PROCESSED | awk '{ printf ("Total processed: % 29s\n",$1) }'`
+ log INFO "$LINE"
+}
+
+
+# If this is called, the whole framework will execute.
+main () {
+
+ case $MODE in
+ node )
+ init_vars
+ test_server
+ get_all_items
+ listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
+ LISTENER_PID=$!
+ start_all_workers
+ ;;
+ start )
+ # This option only starts all nodes.
+ LOGFILE=/dev/null
+ display_header
+ if [ ! -e "$NODES_FILE" ]
+ then
+ log ERROR "File $NODES with list of nodes does not exist."
+ set_status STOPPED
+ cleanup
+ exit 1
+ else
+ for NODE in `cat $NODES_FILE`
+ do
+ start_ppss_on_node "$NODE"
+ done
+ fi
+ cleanup
+ exit 0
+ ;;
+ config )
+ LOGFILE=/dev/null
+ display_header
+ log INFO "Generating configuration file $CONFIG"
+ add_var_to_config PPSS_LOCAL_TMPDIR "$PPSS_LOCAL_TMPDIR"
+ add_var_to_config PPSS_LOCAL_OUTPUT "$PPSS_LOCAL_OUTPUT"
+ cleanup
+ exit 0
+ ;;
+
+ stop )
+ LOGFILE=/dev/null
+ display_header
+ log INFO "Stopping PPSS on all nodes."
+ exec_cmd "touch $STOP_SIGNAL"
+ cleanup
+ exit
+ ;;
+ pause )
+ LOGFILE=/dev/null
+ display_header
+ log INFO "Pausing PPSS on all nodes."
+ exec_cmd "touch $PAUSE_SIGNAL"
+ cleanup
+ exit
+ ;;
+ continue )
+ LOGFILE=/dev/null
+ display_header
+ if does_file_exist "$STOP_SIGNAL"
+ then
+ log INFO "Continuing processing, please use $0 start to start PPSS on al nodes."
+ exec_cmd "rm -f $STOP_SIGNAL"
+ fi
+ if does_file_exist "$PAUSE_SIGNAL"
+ then
+ log INFO "Continuing PPSS on all nodes."
+ exec_cmd "rm -f $PAUSE_SIGNAL"
+ fi
+ cleanup
+ exit 0
+ ;;
+ deploy )
+ LOGFILE=/dev/null
+ display_header
+ log INFO "Deploying PPSS on nodes."
+ deploy_ppss
+ wait
+ cleanup
+ exit 0
+ ;;
+ status )
+ LOGFILE=/dev/null
+ display_header
+ show_status
+ cleanup
+ exit 0
+ ;;
+ erase )
+ LOGFILE=/dev/null
+ display_header
+ log INFO "Erasing PPSS from all nodes."
+ erase_ppss
+ cleanup
+ exit 0
+ ;;
+ kill )
+ LOGFILE=/dev/null
+ for x in `ps ux | grep ppss | grep -v grep | grep bash | awk '{ print $2 }'`
+ do
+ kill "$x"
+ done
+ cleanup
+ exit 0
+ ;;
+
+ * )
+ init_vars
+ get_all_items
+ listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
+ LISTENER_PID=$!
+ #log DEBUG "Master PID is $PID."
+ #log DEBUG "Listener PID is $LISTENER_PID."
+ start_all_workers
+ ;;
+
+ esac
+
+}
+# This command starts the that sets the whole framework in motion.
+main
+
+# Exit after all processes have finished.
+wait
diff --git a/trunk/ppss-test.sh b/trunk/ppss-test.sh
new file mode 100755
index 0000000..08a706a
--- /dev/null
+++ b/trunk/ppss-test.sh
@@ -0,0 +1,223 @@
+#!/bin/bash
+
+DEBUG="$1"
+VERSION=2.55
+TMP_DIR="ppss"
+PPSS=ppss
+PPSS_DIR=ppss_dir
+
+cleanup () {
+
+ for x in $REMOVEFILES
+ do
+ if [ -e ./$x ]
+ then
+ rm -r ./$x
+ fi
+ done
+}
+
+parseJobStatus () {
+
+ TMP_FILE="$1"
+
+ RES=`grep "Status:" "$JOBLOG/$TMP_FILE"`
+ STATUS=`echo "$RES" | awk '{ print $2 }'`
+ echo "$STATUS"
+
+}
+
+oneTimeSetUp () {
+
+ NORMALTESTFILES=`echo test-{a..z}`
+ SPECIALTESTFILES="\'file-!@#$%^&*()_+=-0987654321~\' \'file-/\<>?:;'{}[]\' file-/\/\:\/!@#$%^&*()_+=-0987654321~ file-/\<>?:;'{}[] http://www.google.nl ftp://storage.nl"
+ JOBLOG=./$PPSS_DIR/job_log
+ INPUTFILENORMAL=test-normal.input
+ INPUTFILESPECIAL=test-special.input
+ LOCALOUTPUT=ppss_dir/PPSS_LOCAL_OUTPUT
+
+ REMOVEFILES="$INPUTFILENORMAL $INPUTFILESPECIAL $PPSS_DIR test-ppss-*"
+
+ cleanup
+
+ for x in $NORMALTESTFILES
+ do
+ echo "$x" >> "$INPUTFILENORMAL"
+ done
+
+ for x in $SPECIALTESTFILES
+ do
+ echo $x >> "$INPUTFILESPECIAL"
+ done
+}
+
+testVersion () {
+
+ RES=`./$PPSS -v`
+
+ for x in $RES
+ do
+ echo "$x" | grep [0-9] >> /dev/null
+ if [ "$?" == "0" ]
+ then
+ assertEquals "Version mismatch!" "$VERSION" "$x"
+ fi
+ done
+}
+
+rename-ppss-dir () {
+
+ TEST="$1"
+
+ if [ -e "$PPSS_DIR" ] && [ -d "$PPSS_DIR" ] && [ ! -z "$TEST" ]
+ then
+ mv "$PPSS_DIR" test-ppss-"$TEST"
+ fi
+}
+
+oneTimeTearDown () {
+
+ if [ ! "$DEBUG" == "debug" ]
+ then
+ cleanup
+ fi
+}
+
+createDirectoryWithSomeFiles () {
+
+ A="File with Spaces"
+ B="File\With\Slashes"
+
+ mkdir "/tmp/$TMP_DIR"
+ for x in "$A" "$B"
+ do
+ TMP_FILE="/tmp/$TMP_DIR/$x"
+ touch "$TMP_FILE"
+ done
+}
+
+testSpacesInFilenames () {
+
+ createDirectoryWithSomeFiles
+
+ RES=$( { ./$PPSS -d /tmp/$TMP_DIR -c 'ls -alh ' >> /dev/null ; } 2>&1 )
+ assertEquals "PPSS did not execute properly." 0 "$?"
+
+ assertNull "PPSS retured some errors..." "$RES"
+ if [ ! "$?" == "0" ]
+ then
+ echo "RES IS $RES"
+ fi
+
+ grep "SUCCESS" $JOBLOG/* >> /dev/null 2>&1
+ assertEquals "Found error with space in filename $TMP_FILE" "0" "$?"
+
+ rm -rf "/tmp/$TMP_DIR"
+ rename-ppss-dir $FUNCNAME
+}
+
+testSpecialCharacterHandling () {
+
+ RES=$( { ./$PPSS -f "$INPUTFILESPECIAL" -c 'echo ' >> /dev/null ; } 2>&1 )
+ assertEquals "PPSS did not execute properly." 0 "$?"
+
+ assertNull "PPSS retured some errors..." "$RES"
+ if [ ! "$?" == "0" ]
+ then
+ echo "RES IS $RES"
+ fi
+
+ RES=`find ppss_dir/PPSS_LOCAL_OUTPUT | wc -l | sed 's/\ //g'`
+ assertEquals "To many lock files..." "7" "$RES"
+
+ RES1=`ls -1 $JOBLOG`
+ RES2=`ls -1 $LOCALOUTPUT`
+
+ assertEquals "RES1 $RES1 is not the same as RES2 $RES2" "$RES1" "$RES2"
+
+ rename-ppss-dir $FUNCNAME
+}
+
+testSkippingOfProcessedItems () {
+
+ createDirectoryWithSomeFiles
+
+ RES=$( { ./$PPSS -d /tmp/$TMP_DIR -c 'echo ' >> /dev/null ; } 2>&1 )
+ assertEquals "PPSS did not execute properly." 0 "$?"
+ assertNull "PPSS retured some errors..." "$RES"
+
+ RES=$( { ./$PPSS -d /tmp/$TMP_DIR -c 'echo ' >> /dev/null ; } 2>&1 )
+ assertEquals "PPSS did not execute properly." 0 "$?"
+ assertNull "PPSS retured some errors..." "$RES"
+
+ grep -i skip ./$PPSS_dir/* >> /dev/null 2>&1
+ assertEquals "Skipping of items went wrong." 0 "$?"
+
+ rename-ppss-dir $FUNCNAME-1
+
+ RES=$( { ./$PPSS -f $INPUTFILESPECIAL -c 'echo ' >> /dev/null ; } 2>&1 )
+ assertEquals "PPSS did not execute properly." 0 "$?"
+ assertNull "PPSS retured some errors..." "$RES"
+
+ RES=$( { ./$PPSS -f $INPUTFILESPECIAL -c 'echo ' >> /dev/null ; } 2>&1 )
+ assertEquals "PPSS did not execute properly." 0 "$?"
+ assertNull "PPSS retured some errors..." "$RES"
+
+ grep -i skip ./$PPSS_dir/* >> /dev/null 2>&1
+ assertEquals "Skipping of items went wrong." 0 "$?"
+
+ rm -rf "/tmp/$TMP_DIR"
+ rename-ppss-dir $FUNCNAME-2
+}
+
+testExistLogFiles () {
+
+ ./$PPSS -f "$INPUTFILENORMAL" -c 'echo "$ITEM"' >> /dev/null
+ assertEquals "PPSS did not execute properly." 0 "$?"
+
+ for x in $NORMALTESTFILES
+ do
+ assertTrue "[ -e $JOBLOG/$x ]"
+ done
+
+ rename-ppss-dir $FUNCNAME
+}
+
+getStatusOfJob () {
+
+ EXPECTED="$1"
+
+ if [ "$EXPECTED" == "SUCCESS" ]
+ then
+ ./$PPSS -f "$INPUTFILENORMAL" -c 'echo ' >> /dev/null
+ assertEquals "PPSS did not execute properly." 0 "$?"
+ elif [ "$EXPECTED" == "FAILURE" ]
+ then
+ ./$PPSS -f "$INPUTFILENORMAL" -c 'thiscommandfails ' >> /dev/null
+ assertEquals "PPSS did not execute properly." 0 "$?"
+ fi
+
+ for x in $NORMALTESTFILES
+ do
+ STATUS=`parseJobStatus "$x"`
+ assertEquals "FAILED WITH STATUS $STATUS." "$EXPECTED" "$STATUS"
+ done
+
+ rename-ppss-dir "$FUNCNAME-$EXPECTED"
+}
+
+
+testErrorHandlingOK () {
+
+ getStatusOfJob SUCCESS
+}
+
+testErrorHandlingFAIL () {
+
+ getStatusOfJob FAILURE
+}
+
+
+
+
+. ./shunit2
diff --git a/trunk/ppss.sh b/trunk/ppss.sh
index 701a4c5..7dc51d0 100755
--- a/trunk/ppss.sh
+++ b/trunk/ppss.sh
@@ -34,73 +34,165 @@
#------------------------------------------------------------------------------
# Handling control-c for a clean shutdown.
-trap 'kill_process; ' INT
+trap 'kill_process' SIGINT
-# Setting some vars. Do not change.
+# Setting some vars.
SCRIPT_NAME="Distributed Parallel Processing Shell Script"
-SCRIPT_VERSION="2.0"
+SCRIPT_VERSION="2.50"
-# The first argument to this script is always the 'mode'.
-MODE="$1"
-shift
+# The first argument to this script can be a mode.
+MODES="node start config stop pause continue deploy status erase kill"
+for x in $MODES
+do
+ if [ "$x" == "$1" ]
+ then
+ MODE="$1"
+ shift
+ fi
+done
-ARGS=$@
-CONFIG="config.cfg"
+# The working directory of PPSS can be set with
+# export PPSS_DIR=/path/to/workingdir
+if [ -z "$PPSS_DIR" ]
+then
+ PPSS_DIR="./ppss"
+fi
+
+if [ ! -e "$PPSS_DIR" ]
+then
+ mkdir -p "$PPSS_DIR"
+fi
+
+CONFIG=""
HOSTNAME=`hostname`
ARCH=`uname`
-RUNNING_SIGNAL="$0_is_running" # Prevents running mutiple instances of PPSS..
-GLOBAL_LOCK="PPSS-GLOBAL-LOCK" # Global lock file used by local PPSS instance.
-PAUSE_SIGNAL="pause_signal" # Not implemented yet (pause processing).
-PAUSE_DELAY=300
-STOP_SIGNAL="stop_signal"
-ARRAY_POINTER_FILE="ppss-array-pointer" #
-JOB_LOG_DIR="JOB_LOG" # Directory containing log files of processed items.
-LOGFILE="ppss-log.txt" # General PPSS log file. Contains lots of info.
-STOP=9 # STOP job.
-MAX_DELAY=2
-PERCENT="0"
+
PID="$$"
+GLOBAL_LOCK="$PPSS_DIR/PPSS-GLOBAL-LOCK-$PID" # Global lock file used by local PPSS instance.
+PAUSE_SIGNAL="$PPSS_DIR/pause_signal" # Pause processing if this file is present.
+PAUSE_DELAY=60 # Polling every 1 minutes by default.
+STOP_SIGNAL="$PPSS_DIR/stop_signal" # Stop processing if this file is present.
+ARRAY_POINTER_FILE="$PPSS_DIR/ppss-array-pointer-$PID" # Pointer for keeping track of processed items.
+JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items.
+LOGFILE="$PPSS_DIR/ppss-log-$$.txt" # General PPSS log file. Contains lots of info.
+STOP=0 # STOP job.
+MAX_DELAY=0 # MAX DELAY between jobs.
+MAX_LOCK_DELAY=9 #
+PERCENT="0"
LISTENER_PID=""
IFS_BACKUP="$IFS"
-INTERVAL="30" # Polling interval to check if there are running jobs.
CPUINFO=/proc/cpuinfo
+PROCESSORS=""
+STOP_KEY=$RANDOM$RANDOM$RANDOM
+KILL_KEY=$RANDOM$RANDOM$RANDOM
SSH_SERVER="" # Remote server or 'master'.
SSH_KEY="" # SSH key for ssh account.
-SSH_SOCKET="/tmp/PPSS-ssh-socket" # Multiplex multiple SSH connections over 1 master.
+SSH_KNOWN_HOSTS=""
+SSH_SOCKET="$PPSS_DIR/PPSS_SSH_SOCKET" # Multiplex multiple SSH connections over 1 master.
SSH_OPTS="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
-o GlobalKnownHostsFile=./known_hosts \
-o ControlMaster=auto \
- -o ConnectTimeout=5"
+ -o Cipher=blowfish \
+ -o ConnectTimeout=10 "
+
+ # Blowfish is faster but still secure.
SSH_MASTER_PID=""
-PPSS_HOME_DIR="ppss"
-ITEM_LOCK_DIR="PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking.
-PPSS_LOCAL_TMPDIR="PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing.
-PPSS_LOCAL_OUTPUT="PPSS_LOCAL_OUTPUT" # Local directory on slave for local output.
+PPSS_HOME_DIR=ppss
+ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking.
+PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing.
+PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output.
TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp.
SECURE_COPY="1" # If set, use SCP, Otherwise, use cp.
REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded.
SCRIPT="" # Custom user script that is executed by ppss.
+ITEM_ESCAPED=""
+NODE_STATUS="$PPSS_DIR/status.txt"
+showusage_short () {
+
+ echo
+ echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
+ echo
+ echo "usage: $0 [ -d | -f ] [ -c ' \"\$ITEM\"' ]"
+ echo " [ -C ] [ -j ] [ -l ] [ -p <# jobs> ]"
+ echo " [ -D ] [ -h ] [ --help ]"
+ echo
+ echo "Examples:"
+ echo " $0 -d /dir/with/some/files -c 'gzip '"
+ echo " $0 -d /dir/with/some/files -c 'gzip \"\$ITEM\"' -D 5"
+ echo " $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2"
+}
+
+showusage_normal () {
-showusage () {
-
echo
- echo "$SCRIPT_NAME"
- echo "Version: $SCRIPT_VERSION"
+ echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
echo
echo "PPSS is a Bash shell script that executes commands in parallel on a set "
- echo "of items, such as files, or lines in a file."
+ echo "of items, such as files in a directory, or lines in a file."
echo
- echo "Usage: $0 MODE [ options ]"
- echo " or "
- echo "Usage: $0 MODE -c "
+ echo "This short summary only discusses options for stand-alone mode. for a "
+ echo "complete listing of all options, run PPSS with the options --help"
+ echo
+ echo "Usage $0 [ options ]"
+ echo
+ echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes."
+ echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item "
+ echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'."
echo
- echo "Modes are:"
+ echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files"
+ echo -e " are fed as an argument to the command that has been specified with -c."
+ echo
+ echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the"
+ echo -e " command that has been specified with -c."
+ echo
+ echo -e "--config | -C If the mode is config, a config file with the specified name will be"
+ echo -e " generated based on all the options specified. In the other modes".
+ echo -e " this option will result in PPSS reading the config file and start"
+ echo -e " processing items based on the settings of this file."
+ echo
+ echo -e "--enable-ht | -j Enable hyperthreading. Is disabled by default."
+ echo
+ echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt."
+ echo
+ echo -e "--processes | -p Start the specified number of processes. Ignore the number of available"
+ echo -e " CPUs."
+ echo
+ echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread"
+ echo -e " the load. The delay is only used at the start of all 'threads'."
+ echo
+ echo -e "Example: encoding some wav files to mp3 using lame:"
+ echo
+ echo -e "$0 -d /path/to/wavfiles -c 'lame '"
+ echo
+ echo -e "Extended usage: use --help"
+ echo
+}
+
+if [ "$#" == "0" ]
+then
+ showusage_short
+ if [ -e "$PPSS_DIR" ]
+ then
+ rm -rf "$PPSS_DIR"
+ fi
+ exit 1
+fi
+
+showusage_long () {
+
+ echo
+ echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
+ echo
+ echo "PPSS is a Bash shell script that executes commands in parallel on a set "
+ echo "of items, such as files in a directory, or lines in a file."
+ echo
+ echo "Usage: $0 [ MODE ] [ options ]"
+ echo
+ echo "Modes are optional and mainly used for running in distributed mode. Modes are:"
echo
- echo " standalone For execution of PPSS on a single host."
- echo " node For execution of PPSS on a node, that is part of a 'cluster'."
echo " config Generate a config file based on the supplied option parameters."
echo " deploy Deploy PPSS and related files on the specified nodes."
echo " erase Erase PPSS and related files from the specified nodes."
@@ -108,6 +200,7 @@ showusage () {
echo " start Starting PPSS on nodes."
echo " pause Pausing PPSS on all nodes."
echo " stop Stopping PPSS on all nodes."
+ echo " node Running PPSS as a node, requires additional options."
echo
echo "Options are:"
echo
@@ -121,7 +214,7 @@ showusage () {
echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the"
echo -e " command that has been specified with -c."
echo
- echo -e "--config | -c If the mode is config, a config file with the specified name will be"
+ echo -e "--config | -C If the mode is config, a config file with the specified name will be"
echo -e " generated based on all the options specified. In the other modes".
echo -e " this option will result in PPSS reading the config file and start"
echo -e " processing items based on the settings of this file."
@@ -131,11 +224,14 @@ showusage () {
echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt."
echo
echo -e "--processes | -p Start the specified number of processes. Ignore the number of available"
- echo -e " CPU's."
+ echo -e " CPUs."
echo
+ echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread"
+ echo -e " the load. The delay is only used at the start of all 'threads'."
+ echo
echo -e "The following options are used for distributed execution of PPSS."
echo
- echo -e "--server | -s Specifies the SSH server that is used for communication between nodes."
+ echo -e "--master | -m Specifies the SSH server that is used for communication between nodes."
echo -e " Using SSH, file locks are created, informing other nodes that an item "
echo -e " is locked. Also, often items, such as files, reside on this host. SCP "
echo -e " is used to transfer files from this host to nodes for local procesing."
@@ -143,12 +239,16 @@ showusage () {
echo -e "--node | -n File containig a list of nodes that act as PPSS clients. One IP / DNS "
echo -e " name per line."
echo
- echo -e "--key | -k The SSH key that a node uses to connect to the server."
+ echo -e "--key | -k The SSH key that a node uses to connect to the master."
+ echo
+ echo -e "--known-hosts | -K The file that contains the server public key. Can often be found on "
+ echo -e " hosts that already once connected to the server. See the file "
+ echo -e " ~/.ssh/known_hosts or else, manualy connect once and check this file."
echo
echo -e "--user | -u The SSH user name that is used when logging in into the master SSH"
echo -e " server."
echo
- echo -e "--script | -s Specifies the script/program that must be copied to the nodes for "
+ echo -e "--script | -S Specifies the script/program that must be copied to the nodes for "
echo -e " execution through PPSS. Only used in the deploy mode."
echo -e " This option should be specified if necessary when generating a config."
echo
@@ -162,60 +262,40 @@ showusage () {
echo -e " encoding a wav file is an mp3 file, the mp3 file is put in the "
echo -e " directory specified with this option."
echo
+ echo -e "--homedir | -H Directory in which directory PPSS is installed on the node."
+ echo -e " Default is 'ppss'."
+ echo
echo -e "Example: encoding some wav files to mp3 using lame:"
echo
- echo -e "$0 standalone -c 'lame ' -d /path/to/wavfiles -j "
+ echo -e "$0 -c 'lame ' -d /path/to/wavfiles -j "
echo
echo -e "Running PPSS based on a configuration file."
echo
- echo -e "$0 node -C config.cfg"
+ echo -e "$0 -C config.cfg"
echo
echo -e "Running PPSS on a client as part of a cluster."
echo
- echo -e "$0 node -d /somedir -c 'cp "$ITEM" /some/destination' -s 10.0.0.50 -u ppss -t -k ppss-key.key"
+ echo -e "$0 -d /somedir -c 'cp "$ITEM" /some/destination' -s 10.0.0.50 -u ppss -t -k ppss-key.key"
echo
}
kill_process () {
- kill $LISTENER_PID >> /dev/null 2>&1
- while true
- do
- JOBS=`ps ax | grep -v grep | grep -v -i screen | grep ppss.sh | grep -i bash | wc -l`
- if [ "$JOBS" -gt "2" ]
- then
- for x in `ps ax | grep -v grep | grep -v -i screen | grep ppss.sh | grep -i bash | awk '{ print $1 }'`
- do
- if [ ! "$x" == "$PID" ] && [ ! "$x" == "$$" ]
- then
- kill -9 $x >> /dev/null 2>&1
- fi
- done
- sleep 5
- else
- cleanup
- echo -en "\033[1B"
- # The master SSH connection should be killed.
- if [ ! -z "$SSH_MASTER_PID" ]
- then
- kill -9 "$SSH_MASTER_PID"
- fi
- echo ""
- exit 0
- fi
- done
-
+ echo "$KILL_KEY" >> "$FIFO"
}
exec_cmd () {
+
CMD="$1"
- if [ ! -z "$SSH_SERVER" ] && [ "$SECURE_COPY" == "1" ]
+ if [ ! -z "$SSH_SERVER" ]
then
ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD
+ return $?
else
eval "$CMD"
+ return $?
fi
}
@@ -237,16 +317,21 @@ check_for_interrupt () {
does_file_exist "$STOP_SIGNAL"
if [ "$?" == "0" ]
then
+ set_status "STOPPED"
log INFO "STOPPING job. Stop signal found."
STOP="1"
+ return 1
fi
does_file_exist "$PAUSE_SIGNAL"
if [ "$?" == "0" ]
then
+ set_status "PAUZED"
log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS."
- sleep $PAUSE_DELAY
+ sleep 0.$PAUSE_DELAY
check_for_interrupt
+ else
+ set_status "RUNNING"
fi
}
@@ -256,22 +341,17 @@ cleanup () {
if [ -e "$FIFO" ]
then
- rm $FIFO
+ rm "$FIFO"
fi
if [ -e "$ARRAY_POINTER_FILE" ]
then
- rm $ARRAY_POINTER_FILE
+ rm "$ARRAY_POINTER_FILE"
fi
if [ -e "$GLOBAL_LOCK" ]
then
- rm -rf $GLOBAL_LOCK
- fi
-
- if [ -e "$RUNNING_SIGNAL" ]
- then
- rm "$RUNNING_SIGNAL"
+ rm -rf "$GLOBAL_LOCK"
fi
if [ -e "$SSH_SOCKET" ]
@@ -281,19 +361,6 @@ cleanup () {
}
-# check if ppss is already running.
-is_running () {
-
- if [ -e "$RUNNING_SIGNAL" ]
- then
- echo
- log INFO "$0 is already running (lock file exists)."
- echo
- exit 1
- fi
-}
-
-
add_var_to_config () {
if [ "$MODE" == "config" ]
@@ -317,9 +384,9 @@ do
then
if [ -e "$CONFIG" ]
then
- echo "Do want to overwrite existing config file?"
+ echo "Do want to overwrite existing config file? [y/n]"
read yn
- if [ "$yn" == "y" ]
+ if [ "$yn" == "y" ] || [ "$yn" == "yes" ]
then
rm "$CONFIG"
else
@@ -340,15 +407,26 @@ do
SSH_KEY="-i $SSH_KEY"
fi
+ if [ ! -e "./known_hosts" ]
+ then
+ if [ -e $SSH_KNOWN_HOSTS ]
+ then
+ cat $SSH_KNOWN_HOSTS > ./known_hosts
+ else
+ echo "File $SSH_KNOWN_HOSTS does not exist."
+ exit
+ fi
+ fi
+
shift 2
;;
- --node|-n )
+ --node|-n )
NODES_FILE="$2"
add_var_to_config NODES_FILE "$NODES_FILE"
shift 2
;;
- --sourcefile|-f )
+ --sourcefile|-f )
INPUT_FILE="$2"
add_var_to_config INPUT_FILE "$INPUT_FILE"
shift 2
@@ -358,8 +436,13 @@ do
add_var_to_config SRC_DIR "$SRC_DIR"
shift 2
;;
- --command|-c )
- COMMAND=$2
+ --delay|-D)
+ MAX_DELAY="$2"
+ add_var_to_config MAX_DELAY "$MAX_DELAY"
+ shift 2
+ ;;
+ --command|-c )
+ COMMAND="$2"
if [ "$MODE" == "config" ]
then
COMMAND=\'$COMMAND\'
@@ -368,29 +451,45 @@ do
shift 2
;;
- --help|-h )
- showusage
+ -h )
+ showusage_normal
+ if [ -e "$PPSS_DIR" ]
+ then
+ rm -rf "$PPSS_DIR"
+ fi
exit 1;;
- --homedir|-H)
+ --help)
+ showusage_long
+ if [ -e "$PPSS_DIR" ]
+ then
+ rm -rf "$PPSS_DIR"
+ fi
+ exit 1;;
+ --homedir|-H )
if [ ! -z "$2" ]
then
PPSS_HOME_DIR="$2"
- add_var_to_config PPSS_HOME_DIR $PPSS_HOME_DIR
+ add_var_to_config PPSS_DIR $PPSS_HOME_DIR
shift 2
fi
;;
- --disable-ht|-j )
+ --disable-ht|-j )
HYPERTHREADING=no
add_var_to_config HYPERTHREADING $HYPERTHREADING
shift 1
;;
- --log|-l )
+ --log|-l )
LOGFILE="$2"
add_var_to_config LOGFILE "$LOGFILE"
shift 2
;;
- --key|-k )
+ --workingdir|-w )
+ WORKINGDIR="$2"
+ add_var_to_config WORKINGDIR "$WORKINGDIR"
+ shift 2
+ ;;
+ --key|-k )
SSH_KEY="$2"
add_var_to_config SSH_KEY "$SSH_KEY"
if [ ! -z "$SSH_KEY" ]
@@ -399,7 +498,13 @@ do
fi
shift 2
;;
- --no-scp |-b )
+ --known-hosts | -K )
+ SSH_KNOWN_HOSTS="$2"
+ add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS"
+ shift 2
+ ;;
+
+ --no-scp |-b )
SECURE_COPY=0
add_var_to_config SECURE_COPY "$SECURE_COPY"
shift 1
@@ -418,7 +523,7 @@ do
shift 2
fi
;;
- --server|-s )
+ --master|-m )
SSH_SERVER="$2"
add_var_to_config SSH_SERVER "$SSH_SERVER"
shift 2
@@ -446,32 +551,60 @@ do
exit 0
;;
* )
- showusage
+
+ showusage_short
+ echo
+ echo "Unknown option $1 "
+ echo
exit 1;;
esac
done
+
+display_header () {
+
+ log info ""
+ log INFO "========================================================="
+ log INFO " |P|P|S|S| "
+ log INFO "$SCRIPT_NAME version $SCRIPT_VERSION"
+ log INFO "========================================================="
+ log INFO "Hostname:\t\t$HOSTNAME"
+ log INFO "---------------------------------------------------------"
+}
+
+
# Init all vars
init_vars () {
+
+ if [ "$ARCH" == "Darwin" ]
+ then
+ MIN_JOBS=4
+ elif [ "$ARCH" == "Linux" ]
+ then
+ MIN_JOBS=3
+ fi
+
if [ -e "$LOGFILE" ]
then
rm $LOGFILE
fi
+
+ display_header
if [ -z "$COMMAND" ]
then
echo
- echo "ERROR - no command specified."
+ log ERROR "No command specified."
echo
- showusage
+ showusage_normal
cleanup
exit 1
fi
echo 0 > $ARRAY_POINTER_FILE
- FIFO=$(pwd)/fifo-$RANDOM-$RANDOM
+ FIFO=/tmp/ppss-fifo-$RANDOM-$RANDOM
if [ ! -e "$FIFO" ]
then
@@ -480,20 +613,39 @@ init_vars () {
exec 42<> $FIFO
- touch $RUNNING_SIGNAL
+ set_status "RUNNING"
+
+ if [ -e "$CPUINFO" ]
+ then
+ CPU=`cat /proc/cpuinfo | grep 'model name' | cut -d ":" -f 2 | sed -e s/^\ //g | sort | uniq`
+ log INFO "CPU: $CPU"
+ elif [ "$ARCH" == "Darwin" ]
+ then
+ MODEL=`system_profiler SPHardwareDataType | grep "Processor Name" | cut -d ":" -f 2`
+ SPEED=`system_profiler SPHardwareDataType | grep "Processor Speed" | cut -d ":" -f 2`
+ log INFO "CPU: $MODEL $SPEED"
+ elif [ "$ARCH" == "SunOS" ]
+ then
+ CPU=`psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1`
+
+ log INFO "$CPU"
+ else
+ log INFO "CPU: Cannot determine. Provide a patch for your arch!"
+ log INFO "Arch is $ARCH"
+ fi
if [ -z "$MAX_NO_OF_RUNNING_JOBS" ]
then
- MAX_NO_OF_RUNNING_JOBS=`get_no_of_cpus $HYPERTHREADING`
+ get_no_of_cpus $HYPERTHREADING
fi
does_file_exist "$JOB_LOG_DIR"
if [ ! "$?" == "0" ]
then
- log INFO "Job log directory $JOB_lOG_DIR does not exist. Creating."
- exec_cmd "mkdir $JOB_LOG_DIR"
+ log DEBUG "Job log directory $JOB_lOG_DIR does not exist. Creating."
+ exec_cmd "mkdir -p $JOB_LOG_DIR"
else
- log INFO "Job log directory $JOB_LOG_DIR exists, skipping items for which logs are present."
+ log DEBUG "Job log directory $JOB_LOG_DIR exists."
fi
does_file_exist "$ITEM_LOCK_DIR"
@@ -505,28 +657,42 @@ init_vars () {
if [ ! -e "$JOB_LOG_DIR" ]
then
- mkdir "$JOB_LOG_DIR"
+ mkdir -p "$JOB_LOG_DIR"
fi
does_file_exist "$REMOTE_OUTPUT_DIR"
if [ ! "$?" == "0" ]
then
- echo "ERROR: remote output dir $REMOTE_OUTPUT_DIR does not exist."
+ log ERROR "Remote output dir $REMOTE_OUTPUT_DIR does not exist."
+ set_status STOPPED
cleanup
exit
fi
- if [ ! -e "$PPSS_LOCAL_TMPDIR" ] && [ ! -z "$SSH_SERVER" ]
+ if [ ! -e "$PPSS_LOCAL_TMPDIR" ]
then
mkdir "$PPSS_LOCAL_TMPDIR"
fi
- if [ ! -e "$PPSS_LOCAL_OUTPUT" ] && [ ! -z "$SSH_SERVER" ]
+ if [ ! -e "$PPSS_LOCAL_OUTPUT" ]
then
mkdir "$PPSS_LOCAL_OUTPUT"
fi
}
+get_status () {
+
+ STATUS=`cat "$NODE_SATUS"`
+ echo "$STATUS"
+}
+
+set_status () {
+
+ STATUS="$1"
+ echo "$HOSTNAME $STATUS" > "$NODE_STATUS"
+}
+
+
expand_str () {
STR=$1
@@ -542,30 +708,32 @@ expand_str () {
}
log () {
+
+ # Type 'INFO' is logged to the screen
+ # Any other log-type is only logged to the logfile.
TYPE="$1"
MESG="$2"
- TMP_LOG=""
- TYPE_LENGTH=6
+ TYPE_LENGTH=5
TYPE_EXP=`expand_str "$TYPE"`
DATE=`date +%b\ %d\ %H:%M:%S`
- PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH} -"
+ PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}"
+ PREFIX_SMALL="$DATE: "
LOG_MSG="$PREFIX $MESG"
+ ECHO_MSG="$PREFIX_SMALL $MESG"
echo -e "$LOG_MSG" >> "$LOGFILE"
- if [ "$TYPE" == "INFO" ]
+ if [ "$TYPE" == "INFO" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ]
then
- echo -e "$LOG_MSG"
+ echo -e "$ECHO_MSG"
fi
}
-log INFO "$0 $@"
-
check_status () {
ERROR="$1"
@@ -575,6 +743,7 @@ check_status () {
if [ ! "$ERROR" == "0" ]
then
log INFO "$FUNCTION - $MESSAGE"
+ set_status STOPPED
cleanup
exit 1
fi
@@ -583,33 +752,95 @@ check_status () {
erase_ppss () {
- echo "Are you realy sure you want to erase PPSS from all nades!?"
+
+ echo "Are you realy sure you want to erase PPSS from all nodes!? (YES/NO)"
read YN
- if [ "$YN" == "y" ]
+ if [ "$YN" == "yes" ] || [ "$YN" == "YES" ]
then
for NODE in `cat $NODES_FILE`
do
- log INFO "Erasing PPSS homedir $PPSS_HOME_DIR from node $NODE."
- ssh $USER@$NODE "rm -rf $PPSS_HOME_DIR"
+ log INFO "Erasing PPSS homedir $PPSS_DIR from node $NODE."
+ ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR"
done
+ else
+ log INFO "Aborting.."
fi
+ sleep 1
}
-deploy_ppss () {
+deploy () {
+
+ NODE="$1"
+
+ SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=socket-%h \
+ -o GlobalKnownHostsFile=./known_hosts \
+ -o ControlMaster=auto \
+ -o Cipher=blowfish \
+ -o ConnectTimeout=5 "
ERROR=0
set_error () {
if [ ! "$1" == "0" ]
then
- ERROR=$1
+ ERROR=1
fi
}
+ ssh -q -o ConnectTimeout=5 $SSH_KEY $USER@$NODE exit 0
+ set_error "$?"
+ if [ ! "$ERROR" == "0" ]
+ then
+ log ERROR "Cannot connect to node $NODE."
+ return
+ fi
+
+ ssh -N -M $SSH_OPTS_NODE $SSH_KEY $USER@$NODE &
+ SSH_PID=$!
+
+ KEY=`echo $SSH_KEY | cut -d " " -f 2`
+
+ sleep 1
+
+ ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "cd ~ && mkdir $PPSS_HOME_DIR >> /dev/null 2>&1"
+ scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ scp -q $SSH_OPTS_NODE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ scp -q $SSH_OPTS_NODE $SSH_KEY $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ scp -q $SSH_OPTS_NODE $SSH_KEY known_hosts $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ if [ ! -z "$SCRIPT" ]
+ then
+ scp -q $SSH_OPTS_NODE $SSH_KEY $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ fi
+
+ if [ ! -z "$INPUT_FILE" ]
+ then
+ scp -q $SSH_OPTS_NODE $SSH_KEY $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR
+ set_error $?
+ fi
+
+ if [ "$ERROR" == "0" ]
+ then
+ log INFO "PPSS installed on node $NODE."
+ else
+ log INFO "PPSS failed to install on $NODE."
+ fi
+
+ kill $SSH_PID
+}
+
+deploy_ppss () {
+
+
if [ -z "$NODES_FILE" ]
then
log INFO "ERROR - are you using the right option? -C ?"
+ set_status ERROR
cleanup
exit 1
fi
@@ -617,50 +848,47 @@ deploy_ppss () {
KEY=`echo $SSH_KEY | cut -d " " -f 2`
if [ -z "$KEY" ] || [ ! -e "$KEY" ]
then
- log INFO "ERROR - nodes require a key file."
+ log ERROR "Nodes require a key file."
cleanup
+ set_status "ERROR"
exit 1
fi
- if [ ! -e "$SCRIPT" ]
+ if [ ! -e "$SCRIPT" ] && [ ! -z "$SCRIPT" ]
then
- log INFO "ERROR - script $SCRIPT not found."
+ log ERROR "Script $SCRIPT not found."
+ set_status "ERROR"
cleanup
exit 1
fi
+ INSTALLED_ON_SSH_SERVER=0
if [ ! -e "$NODES_FILE" ]
then
- log INFO "ERROR file $NODES with list of nodes does not exist."
+ log ERROR "File $NODES with list of nodes does not exist."
+ set_status ERROR
cleanup
exit 1
else
for NODE in `cat $NODES_FILE`
do
- ssh -q $USER@$NODE "mkdir $PPSS_HOME_DIR >> /dev/null 2>&1"
- scp -q $SSH_OPTS $0 $USER@$NODE:~/$PPSS_HOME_DIR
- set_error $?
- scp -q $KEY $USER@$NODE:~/$PPSS_HOME_DIR
- set_error $?
- scp -q $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR
- set_error $?
- scp -q known_hosts $USER@$NODE:~/$PPSS_HOME_DIR
- set_error $?
- scp -q $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR
- set_error $?
- if [ ! -z "$INPUT_FILE" ]
- then
- scp -q $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR
- set_error $?
- fi
-
- if [ "$ERROR" == "0" ]
- then
- log INFO "PPSS installed on node $NODE."
- else
- log INFO "PPSS failed to install on $NODE."
+ deploy "$NODE" &
+ sleep 0.1
+ if [ "$NODE" == "$SSH_SERVER" ]
+ then
+ log DEBUG "SSH SERVER $SSH_SERVER is also a node."
+ INSTALLED_ON_SSH_SERVER=1
+ exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR"
+ exec_cmd "mkdir -p $ITEM_LOCK_DIR"
fi
done
+ if [ "$INSTALLED_ON_SSH_SERVER" == "0" ]
+ then
+ log DEBUG "SSH SERVER $SSH_SERVER is not a node."
+ deploy "$SSH_SERVER"
+ exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR"
+ exec_cmd "mkdir -p $ITEM_LOCK_DIR"
+ fi
fi
}
@@ -669,20 +897,20 @@ start_ppss_on_node () {
NODE="$1"
log INFO "Starting PPSS on node $NODE."
- ssh $USER@$NODE "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ./ppss.sh node --config $CONFIG"
+ ssh $SSH_KEY $USER@$NODE -o ConnectTimeout=5 "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 node --config ~/$PPSS_HOME_DIR/$CONFIG"
}
-
test_server () {
# Testing if the remote server works as expected.
if [ ! -z "$SSH_SERVER" ]
- then
+ then
exec_cmd "date >> /dev/null"
check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached"
ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER &
SSH_MASTER_PID="$!"
+ log DEBUG "SSH Master pid is $SSH_MASTER_PID"
else
log DEBUG "No remote server specified, assuming stand-alone mode."
fi
@@ -712,46 +940,67 @@ get_no_of_cpus () {
then
NUMBER=`grep ^processor $CPUINFO | wc -l`
got_cpu_info "$?"
-
+
elif [ "$ARCH" == "Darwin" ]
then
NUMBER=`sysctl -a hw | grep -w logicalcpu | awk '{ print $2 }'`
got_cpu_info "$?"
+
elif [ "$ARCH" == "FreeBSD" ]
then
NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'`
got_cpu_info "$?"
- else
- NUMBER=`grep ^processor $CPUINFO | wc -l`
+
+ elif [ "$ARCH" == "SunOS" ]
+ then
+ NUMBER=`psrinfo | grep on-line | wc -l`
got_cpu_info "$?"
+ else
+ if [ -e "$CPUINFO" ]
+ then
+ NUMBER=`grep ^processor $CPUINFO | wc -l`
+ got_cpu_info "$?"
+ fi
fi
+
+ if [ ! -z "$NUMBER" ]
+ then
+ log INFO "Found $NUMBER logic processors."
+ fi
+
elif [ "$HPT" == "no" ]
then
- log DEBUG "Hyperthreading is disabled."
+ log INFO "Hyperthreading is disabled."
+
if [ "$ARCH" == "Linux" ]
then
PHYSICAL=`grep 'physical id' $CPUINFO`
if [ "$?" == "0" ]
then
PHYSICAL=`grep 'physical id' $CPUINFO | sort | uniq | wc -l`
- log DEBUG "Detected $PHYSICAL CPU(s)"
- TMP=`grep 'cpu cores' $CPUINFO`
+ if [ "$PHYSICAL" == "1" ]
+ then
+ log INFO "Found $PHYSICAL physical CPU."
+ else
+ log INFO "Found $PHYSICAL physical CPUs."
+ fi
+
+ TMP=`grep 'core id' $CPUINFO`
if [ "$?" == "0" ]
then
- MULTICORE=`grep 'cpu cores' $CPUINFO | sort | uniq | cut -d ":" -f 2 | sed s/\ //g`
- log DEBUG "Detected $MULTICORE cores per CPU."
- NUMBER=$(($PHYSICAL*$MULTICORE))
+ log DEBUG "Starting job only for each physical core on all physical CPU(s)."
+ NUMBER=`grep 'core id' $CPUINFO | sort | uniq | wc -l`
+ log INFO "Found $NUMBER physical cores."
else
- log DEBUG "Starting job only for each physical CPU."
+ log INFO "Single core processor(s) detected."
+ log INFO "Starting job for each physical CPU."
NUMBER=$PHYSICAL
fi
else
- log DEBUG "No 'physical id' section found in $CPUINFO."
+ log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus."
NUMBER=`grep ^processor $CPUINFO | wc -l`
got_cpu_info "$?"
fi
-
-
elif [ "$ARCH" == "Darwin" ]
then
NUMBER=`sysctl -a hw | grep -w physicalcpu | awk '{ print $2 }'`
@@ -767,16 +1016,17 @@ get_no_of_cpus () {
fi
- if [ ! -z "$NUMBER" ]
+ if [ ! -z "$NUMBER" ]
then
- echo "$NUMBER"
+ MAX_NO_OF_RUNNING_JOBS=$NUMBER
else
- log INFO "$FUNCNAME ERROR - number of CPUs not obtained."
+ log ERROR "Number of CPUs not obtained."
+ log ERROR "Please specify manually with -p."
+ set_status "ERROR"
exit 1
fi
}
-
random_delay () {
ARGS="$1"
@@ -784,12 +1034,13 @@ random_delay () {
if [ -z "$ARGS" ]
then
log ERROR "$FUNCNAME Function random delay, no argument specified."
+ set_status ERROR
exit 1
fi
NUMBER=$RANDOM
let "NUMBER %= $ARGS"
- sleep "$NUMBER"
+ sleep "0.$NUMBER"
}
@@ -814,7 +1065,7 @@ get_global_lock () {
ERROR="$?"
if [ ! "$ERROR" == "0" ]
then
- random_delay $MAX_DELAY
+ random_delay $MAX_LOCK_DELAY
continue
else
break
@@ -838,47 +1089,91 @@ are_jobs_running () {
fi
}
+escape_item () {
+
+ TMP="$1"
+
+ ITEM_ESCAPED=`echo "$TMP" | \
+ sed s/\\ /\\\\\\\\\\\\\\ /g | \
+ sed s/\\'/\\\\\\\\\\\\\\'/g | \
+ sed s/\\\`/\\\\\\\\\\\\\\\`/g | \
+ sed s/\\|/\\\\\\\\\\\\\\|/g | \
+ sed s/\&/\\\\\\\\\\\\\\&/g | \
+ sed s/\;/\\\\\\\\\\\\\\;/g | \
+ sed s/\\\//\\\\\\\\\\\\\\ /g | \
+ sed s/\(/\\\\\\\\\\(/g | \
+ sed s/\)/\\\\\\\\\\)/g `
+}
+
download_item () {
ITEM="$1"
- ITEM_WITH_PATH="$SRC_DIR/$ITEM"
+ if [ -e "$ITEM" ]
+ then
+ ITEM_NO_PATH=`basename "$ITEM"`
+ else
+ escape_item "$ITEM"
+ ITEM_NO_PATH="$ITEM_ESCAPED"
+ fi
if [ "$TRANSFER_TO_SLAVE" == "1" ]
then
- log DEBUG "Transfering item $ITEM to local disk."
- if [ "$SECURE_COPY" == "1" ]
+ log DEBUG "Transfering item $ITEM_NO_PATH to local disk."
+ if [ "$SECURE_COPY" == "1" ] && [ ! -z "$SSH_SERVER" ]
then
- scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_WITH_PATH" $PPSS_LOCAL_TMPDIR
- log DEBUG "Exit code of transfer is $?"
+ if [ ! -z "$SRC_DIR" ]
+ then
+ ITEM_PATH="$SRC_DIR/$ITEM"
+ else
+ ITEM_PATH="$ITEM"
+ fi
+
+ escape_item "$ITEM_PATH"
+
+ scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" ./$PPSS_LOCAL_TMPDIR
+ log DEBUG "Exit code of remote transfer is $?"
else
- cp "$ITEM_WITH_PATH" $PPSS_LOCAL_TMPDIR
- log DEBUG "Exit code of transfer is $?"
+ cp "$ITEM" ./$PPSS_LOCAL_TMPDIR
+ log DEBUG "Exit code of local transfer is $?"
fi
+ else
+ log DEBUG "No transfer of item $ITEM_NO_PATH to local workpath."
fi
}
upload_item () {
+
ITEM="$1"
+ ITEMDIR="$2"
+
+ if [ "$TRANSFER_TO_SLAVE" == "0" ]
+ then
+ log DEBUG "File transfer is disabled."
+ return 0
+ fi
log DEBUG "Uploading item $ITEM."
if [ "$SECURE_COPY" == "1" ]
then
- scp -q $SSH_OPTS $SSH_KEY "$ITEM" $USER@$SSH_SERVER:$REMOTE_OUTPUT_DIR
+ escape_item "$REMOTE_OUTPUT_DIR$ITEMDIR"
+ DIR_ESCAPED="$ITEM_ESCAPED"
+
+ scp -q $SSH_OPTS $SSH_KEY "$ITEM"/* $USER@$SSH_SERVER:"$DIR_ESCAPED"
ERROR="$?"
if [ ! "$ERROR" == "0" ]
then
- log DEBUG "ERROR - uploading of $ITEM failed."
+ log ERROR "Uploading of $ITEM via SCP failed."
else
log DEBUG "Upload of item $ITEM success"
- rm $ITEM
+ rm -rf ./"$ITEM"
fi
else
- cp "$ITEM" $REMOTE_OUTPUT_DIR
+ cp "$ITEM" "$REMOTE_OUTPUT_DIR"
ERROR="$?"
if [ ! "$ERROR" == "0" ]
then
- log DEBUG "ERROR - uploading of $ITEM failed."
+ log DEBUG "ERROR - uploading of $ITEM vi CP failed."
fi
fi
}
@@ -888,29 +1183,34 @@ lock_item () {
if [ ! -z "$SSH_SERVER" ]
then
ITEM="$1"
- LOCK_FILE_NAME=`echo $ITEM | sed s/^\\\.//g |sed s/^\\\.\\\.//g | sed s/\\\///g`
+
+ LOCK_FILE_NAME=`echo "$ITEM" | \
+ sed s/^\\\.//g | \
+ sed s/^\\\.\\\.//g | \
+ sed s/^\\\///g | \
+ sed s/\\\//\\\\\\ /g | \
+ sed s/\\ /\\\\\\\\\\\\\\ /g | \
+ sed s/\\'/\\\\\\\\\\\\\\'/g | \
+ sed s/\\\//\\\\\\\\\\\\\\ /g | \
+ sed s/\&/\\\\\\\\\\\\\\&/g | \
+ sed s/\;/\\\\\\\\\\\\\\;/g | \
+ sed s/\(/\\\\\\\\\\(/g | \
+ sed s/\)/\\\\\\\\\\)/g `
+
ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME"
- log DEBUG "Trying to lock item $ITEM."
+ log DEBUG "Trying to lock item $ITEM - $ITEM_LOCK_FILE."
exec_cmd "mkdir $ITEM_LOCK_FILE >> /dev/null 2>&1"
ERROR="$?"
- if [ -e "$ITEM_LOCK_FILE" ]
+
+ if [ "$ERROR" == "$?" ]
then
- exec_cmd "touch $ITEM_LOCK_FILE/$HOSTNAME"
+ exec_cmd "touch $ITEM_LOCK_FILE/$HOSTNAME" # Record that item is claimed by node x.
fi
+
return "$ERROR"
fi
}
-release_item () {
-
- ITEM="$1"
-
- LOCK_FILE_NAME=`echo $ITEM` # | sed s/^\\.//g | sed s/^\\.\\.//g | sed s/\\\///g`
- ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME"
-
- exec_cmd "rm -rf ./$ITEM_LOCK_FILE"
-}
-
get_all_items () {
count=0
@@ -922,7 +1222,12 @@ get_all_items () {
ITEMS=`exec_cmd "ls -1 $SRC_DIR"`
check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
else
- ITEMS=`ls -1 $SRC_DIR`
+ if [ -e "$SRC_DIR" ]
+ then
+ ITEMS=`ls -1 $SRC_DIR`
+ else
+ ITEMS=""
+ fi
fi
IFS=$'\n'
@@ -936,29 +1241,32 @@ get_all_items () {
if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?"
then
log DEBUG "Running as slave, input file has been pushed (hopefully)."
- if [ ! -e "$INPUT_FILE" ]
- then
- log INFO "ERROR - input file $INPUT_FILE does not exist."
- cleanup
- exit 1
- fi
+ fi
+ if [ ! -e "$INPUT_FILE" ]
+ then
+ log ERROR "Input file $INPUT_FILE does not exist."
+ set_status "ERROR"
+ cleanup
+ exit 1
fi
- exec 10<$INPUT_FILE
+ exec 10<"$INPUT_FILE"
while read LINE <&10
do
ARRAY[$count]=$LINE
((count++))
done
+
+ exec 10>&-
fi
- exec 10>&-
SIZE_OF_ARRAY="${#ARRAY[@]}"
if [ "$SIZE_OF_ARRAY" -le "0" ]
then
- log INFO "ERROR: source file/dir seems to be empty."
+ log ERROR "Source file/dir seems to be empty."
+ set_status STOPPED
cleanup
exit 1
fi
@@ -972,7 +1280,7 @@ get_item () {
then
return 1
fi
-
+
get_global_lock
SIZE_OF_ARRAY="${#ARRAY[@]}"
@@ -986,20 +1294,16 @@ get_item () {
# This variable is used to walk thtough all array items.
ARRAY_POINTER=`cat $ARRAY_POINTER_FILE`
-
- # Gives a status update on the current progress..
- PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY ))
- log INFO "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items."
- echo -en "\033[1A"
-
+
# Check if all items have been processed.
if [ "$ARRAY_POINTER" -ge "$SIZE_OF_ARRAY" ]
then
release_global_lock
- return 2
+ #echo -en "\033[1A"
+ return 1
fi
- # Select an item.
+ # Select an item.
ITEM="${ARRAY[$ARRAY_POINTER]}"
if [ -z "$ITEM" ]
then
@@ -1031,7 +1335,13 @@ start_single_worker () {
ERROR=$?
if [ ! "$ERROR" == "0" ]
then
- log DEBUG "Item empty, we are probably almost finished."
+ #
+ # If no more items are available, the listener should be
+ # informed that a worker just finished / died.
+ # This allows the listener to determine if all processes
+ # are finished and it is time to stop.
+ #
+ echo "$STOP_KEY" > $FIFO
return 1
else
get_global_lock
@@ -1061,87 +1371,129 @@ elapsed () {
commando () {
ITEM="$1"
- ITEM_NO_PATH="$1"
- log DEBUG "Processing item $ITEM"
-
- if [ -z "$INPUT_FILE" ] && [ "$TRANSFER_TO_SLAVE" == "0" ]
+ if [ -e "$ITEM" ]
then
- ITEM="$SRC_DIR/$ITEM"
+ DIRNAME=`dirname "$ITEM"`
+ ITEM_NO_PATH=`basename "$ITEM"`
+ escape_item "$ITEM_NO_PATH"
+ OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
+ # ^
+ # | This VAR can be used in scripts or command lines.
+ #
+ OUTPUT_FILE="$ITEM_ESCAPED"
else
- ITEM="$PPSS_LOCAL_TMPDIR/$ITEM"
+ DIRNAME=""
+ escape_item "$ITEM"
+ ITEM_NO_PATH="$ITEM_ESCAPED"
+ OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH"
fi
- LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g`
+ log DEBUG "Processing item $ITEM"
+ #
+ # Decide if an item must be transfered from server to the node.
+ # or be processed in-place (NFS / SMB mount?)
+ #
+ if [ "$TRANSFER_TO_SLAVE" == "0" ]
+ then
+ if [ -z "$SRC_DIR" ] && [ ! -z "$INPUT_FILE" ]
+ then
+ log DEBUG "Using item straight from the server."
+ else
+ ITEM="$SRC_DIR/$ITEM"
+ fi
+ else
+ ITEM="./$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH"
+ fi
+
+ LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g | sed s/\\ /_/g`
ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"
- mkdir -p $PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH"
+ OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$LOG_FILE_NAME"
+
+ mkdir -p "$OUTPUT_DIR"
does_file_exist "$ITEM_LOG_FILE"
if [ "$?" == "0" ]
then
log DEBUG "Skipping item $ITEM - already processed."
else
-
ERROR=""
-
+ #
# Some formatting of item log files.
+ #
DATE=`date +%b\ %d\ %H:%M:%S`
echo "===== PPSS Item Log File =====" > "$ITEM_LOG_FILE"
echo -e "Host:\t\t$HOSTNAME" >> "$ITEM_LOG_FILE"
+ echo -e "Process:\t$PID" >> "$ITEM_LOG_FILE"
echo -e "Item:\t\t$ITEM" >> "$ITEM_LOG_FILE"
echo -e "Start date:\t$DATE" >> "$ITEM_LOG_FILE"
echo -e "" >> "$ITEM_LOG_FILE"
- # The actual execution of the command.
+ #
+ # The actual execution of the command as specified by
+ # the -c option.
+ #
+ BEFORE="$(date +%s)"
TMP=`echo $COMMAND | grep -i '$ITEM'`
if [ "$?" == "0" ]
then
- BEFORE="$(date +%s)"
eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1
ERROR="$?"
- AFTER="$(date +%s)"
+ MYPID="$!"
else
- EXECME='$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1'
- BEFORE="$(date +%s)"
- eval "$EXECME"
+ eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1'
ERROR="$?"
- AFTER="$(date +%s)"
+ MYPID="$!"
fi
+ AFTER="$(date +%s)"
echo -e "" >> "$ITEM_LOG_FILE"
# Some error logging. Success or fail.
if [ ! "$ERROR" == "0" ]
then
- echo -e "Status:\t\tError - something went wrong." >> "$ITEM_LOG_FILE"
+ echo -e "Status:\t\tFAILURE" >> "$ITEM_LOG_FILE"
else
- echo -e "Status:\t\tSucces - item has been processed." >> "$ITEM_LOG_FILE"
+ echo -e "Status:\t\tSUCCESS" >> "$ITEM_LOG_FILE"
fi
+ #
+ # If part of a cluster, remove the downloaded item after
+ # it has been processed and uploaded as not to fill up disk space.
+ #
if [ "$TRANSFER_TO_SLAVE" == "1" ]
then
if [ -e "$ITEM" ]
then
- rm $ITEM
+ rm "$ITEM"
else
log DEBUG "ERROR Something went wrong removing item $ITEM from local work dir."
fi
fi
- if [ ! -z "$REMOTE_OUTPUT_DIR" ] && [ ! -z "$SSH_SERVER" ]
+ escape_item "$DIRNAME"
+ ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED"
+
+ exec_cmd "mkdir -p $ITEM_OUTPUT_DIR"
+ if [ "$DIRNAME" == "." ]
then
- upload_item "$PPSS_LOCAL_OUTPUT/$ITEM_NO_PATH/*"
+ DIRNAME=""
fi
+ upload_item "$PPSS_LOCAL_OUTPUT/$ITEM_NO_PATH" "$DIRNAME"
elapsed "$BEFORE" "$AFTER" >> "$ITEM_LOG_FILE"
echo -e "" >> "$ITEM_LOG_FILE"
if [ ! -z "$SSH_SERVER" ]
then
- log DEBUG "Uploading item log file $ITEM_LOG_FILE to master."
- scp -q $SSH_OPTS $SSH_KEY $ITEM_LOG_FILE $USER@$SSH_SERVER:~/$JOB_LOG_DIR/
+ log DEBUG "Uploading item log file $ITEM_LOG_FILE to master ~/$PPSS_HOME_DIR/$JOB_LOG_DIR"
+ scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:~/$PPSS_HOME_DIR/$JOB_LOG_DIR
+ if [ ! "$?" == "0" ]
+ then
+ log DEBUG "Uploading of item log file failed."
+ fi
fi
fi
@@ -1149,15 +1501,115 @@ commando () {
return $?
}
+#
# This is the listener service. It listens on the pipe for events.
# A job is executed for every event received.
+# This listener enables fully asynchronous processing.
+#
listen_for_job () {
-
- log INFO "Listener started."
+ FINISHED=0
+ DIED=0
+ PIDS=""
+ log DEBUG "Listener started."
while read event <& 42
do
- commando "$event" &
+ if [ "$event" == "$STOP_KEY" ]
+ then
+ #
+ # The start_single_worker method sends a special signal to
+ # inform the listener that a worker is finished.
+ # If all workers are finished, it is time to stop.
+ # This mechanism makes PPSS asynchronous.
+ #
+ ((DIED++))
+ if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ]
+ then
+ #kill_process
+ break
+ else
+ RES=$((MAX_NO_OF_RUNNING_JOBS-DIED))
+ if [ "$RES" == "1" ]
+ then
+ log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. "
+ else
+ log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining."
+ echo -en "\033[1A"
+ fi
+ fi
+ elif [ "$event" == "$KILL_KEY" ]
+ then
+ #
+ # If all workers are finished, it is time to kill all remaining
+ # processes, terminate ssh processes and the listener itself.
+ #
+ # This command kills all processes that are related to the master
+ # process as defined by $PID. All processes that have ever been
+ # spawned, although disowned or backgrounded will be killed...
+ #
+ PROCLIST=`ps a -o pid,pgid,ppid,command | grep [0-9] | grep $PID | grep -v -i grep`
+ #echo "$PROCLIST" > proclist.txt
+ oldIFS=$IFS # save the field separator
+ IFS=$'\n' # new field separator, the end of line
+ for x in `echo "$PROCLIST"`
+ do
+ MYPPID=`echo $x | awk '{ print $3 }'`
+ MYPID=`echo $x | awk '{ print $1 }'`
+ if [ ! "$MYPPID" == "$PID" ] && [ ! "$MYPPID" == "1" ]
+ then
+ if [ ! "$MYPID" == "$PID" ]
+ then
+ log DEBUG "Killing process $MYPID"
+ kill $MYPID >> /dev/null 2>&1
+ else
+ log DEBUG "Not killing master process..$MYPID.."
+ fi
+ else
+ log DEBUG "Not killing listener process. $MYPID.."
+ fi
+ done
+ IFS=$oldIFS
+
+ if [ ! -z "$SSH_MASTER_PID" ]
+ then
+ kill "$SSH_MASTER_PID"
+ fi
+ break
+ else
+ commando "$event" &
+ MYPID="$!"
+ disown
+ PIDS="$PIDS $MYPID"
+ #log DEBUG "Event $event has pid $MYPID"
+ fi
+
+ get_global_lock
+ SIZE_OF_ARRAY="${#ARRAY[@]}"
+ ARRAY_POINTER=`cat $ARRAY_POINTER_FILE`
+ release_global_lock
+ PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY ))
+ if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ]
+ then
+ log INFO "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items."
+ if [ "$PERCENT" == "100" ]
+ then
+ FINISHED=1
+ else
+ echo -en "\033[1A"
+ fi
+ fi
done
+
+ set_status STOPPED
+ log DEBUG "Listener stopped."
+ if [ ! "$PERCENT" == "100" ]
+ then
+ echo
+ log INFO "Finished. Consult $JOB_LOG_DIR for job output."
+ log INFO "Press ENTER to continue."
+ else
+ log INFO "Finished. Consult $JOB_LOG_DIR for job output."
+ fi
+ cleanup
}
# This starts an number of parallel workers based on the # of parallel jobs allowed.
@@ -1165,19 +1617,37 @@ start_all_workers () {
if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ]
then
- log INFO "Starting $MAX_NO_OF_RUNNING_JOBS worker."
+ log INFO "Starting $MAX_NO_OF_RUNNING_JOBS single worker."
else
- log INFO "Starting $MAX_NO_OF_RUNNING_JOBS workers."
+ log INFO "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers."
fi
+ log INFO "---------------------------------------------------------"
i=0
while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
do
start_single_worker
((i++))
+
+ if [ ! "$MAX_DELAY" == "0" ]
+ then
+ random_delay "$MAX_DELAY"
+ fi
done
}
+get_status_of_node () {
+
+ NODE="$1"
+ STATUS=`ssh -o ConnectTimeout=10 $SSH_KEY $USER@$NODE cat "$PPSS_HOME_DIR/$NODE_STATUS" 2>/dev/null`
+ ERROR="$?"
+ if [ ! "$ERROR" == "0" ]
+ then
+ STATUS="UNKNOWN"
+ fi
+ echo "$STATUS"
+}
+
show_status () {
source $CONFIG
@@ -1188,43 +1658,73 @@ show_status () {
if [ -z "$INPUT_FILE" ]
then
- ITEMS=`exec_cmd "ls -1 $SRC_DIR | wc -l"`
+ ITEMS=`exec_cmd "ls -1 $SRC_DIR | wc -l"`
else
- ITEMS=`exec_cmd "cat $INPUT_FILE | wc -l"`
+ ITEMS=`exec_cmd "cat $PPSS_DIR/$INPUT_FILE | wc -l"`
fi
- PROCESSED=`exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l"`
- STATUS=$((100 * $PROCESSED / $ITEMS))
+ PROCESSED=`exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l"` 2>&1 >> /dev/null
+ TMP_STATUS=$((100 * $PROCESSED / $ITEMS))
- log INFO "$STATUS percent complete."
+ log INFO "Status:\t\t$TMP_STATUS percent complete."
+ if [ ! -z $NODES_FILE ]
+ then
+ TMP_NO=`cat $NODES_FILE | wc -l`
+ log INFO "Nodes:\t $TMP_NO"
+ fi
+ log INFO "Items:\t\t$ITEMS"
+
+
+ log INFO "---------------------------------------------------------"
+ HEADER=`echo IP-address Hostname Processed Status | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'`
+ log INFO "$HEADER"
+ log INFO "---------------------------------------------------------"
+ PROCESSED=0
+ for x in `cat $NODES_FILE`
+ do
+ NODE=`get_status_of_node "$x" | awk '{ print $1 }'`
+ if [ ! "$NODE" == "UNKNOWN" ]
+ then
+ STATUS=`get_status_of_node "$x" | awk '{ print $2 }'`
+ RES=`exec_cmd "grep -i $NODE ~/$PPSS_HOME_DIR/$JOB_LOG_DIR/* 2>/dev/null | wc -l "`
+ if [ ! "$?" == "0" ]
+ then
+ RES=0
+ fi
+ else
+ STATUS="UNKNOWN"
+ RES=0
+ fi
+ let PROCESSED=$PROCESSED+$RES
+ LINE=`echo "$x $NODE $RES $STATUS" | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'`
+ log INFO "$LINE"
+ done
+ log INFO "---------------------------------------------------------"
+ LINE=`echo $PROCESSED | awk '{ printf ("Total processed: % 29s\n",$1) }'`
+ log INFO "$LINE"
}
# If this is called, the whole framework will execute.
main () {
- is_running
-
case $MODE in
- node|standalone )
- log DEBUG "---------------- START ---------------------"
- log INFO "$SCRIPT_NAME version $SCRIPT_VERSION"
- log INFO `hostname`
+ node )
init_vars
test_server
get_all_items
- listen_for_job "$MAX_NO_OF_RUNNING_JOBS" &
+ listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
LISTENER_PID=$!
start_all_workers
;;
- start )
+ start )
# This option only starts all nodes.
- init_vars
-
+ display_header
if [ ! -e "$NODES_FILE" ]
then
- log INFO "ERROR file $NODES with list of nodes does not exist."
+ log ERROR "File $NODES with list of nodes does not exist."
+ set_status STOPPED
cleanup
exit 1
else
@@ -1237,7 +1737,7 @@ main () {
exit 0
;;
config )
-
+ display_header
log INFO "Generating configuration file $CONFIG"
add_var_to_config PPSS_LOCAL_TMPDIR "$PPSS_LOCAL_TMPDIR"
add_var_to_config PPSS_LOCAL_OUTPUT "$PPSS_LOCAL_OUTPUT"
@@ -1246,18 +1746,21 @@ main () {
;;
stop )
+ display_header
log INFO "Stopping PPSS on all nodes."
exec_cmd "touch $STOP_SIGNAL"
cleanup
exit
;;
pause )
+ display_header
log INFO "Pausing PPSS on all nodes."
exec_cmd "touch $PAUSE_SIGNAL"
cleanup
exit
;;
continue )
+ display_header
if does_file_exist "$STOP_SIGNAL"
then
log INFO "Continuing processing, please use $0 start to start PPSS on al nodes."
@@ -1269,66 +1772,53 @@ main () {
exec_cmd "rm -f $PAUSE_SIGNAL"
fi
cleanup
- exit
+ exit 0
;;
deploy )
+ display_header
log INFO "Deploying PPSS on nodes."
deploy_ppss
+ wait
cleanup
exit 0
;;
status )
+ display_header
show_status
cleanup
exit 0
- # some show command
;;
erase )
+ display_header
log INFO "Erasing PPSS from all nodes."
erase_ppss
cleanup
exit 0
;;
- * )
- showusage
- exit 1
+ kill )
+ for x in `ps ux | grep ppss | grep -v grep | grep bash | awk '{ print $2 }'`
+ do
+ kill "$x"
+ done
+ cleanup
+ exit 0
;;
+
+ * )
+ init_vars
+ get_all_items
+ listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
+ LISTENER_PID=$!
+ #log DEBUG "Master PID is $PID."
+ #log DEBUG "Listener PID is $LISTENER_PID."
+ start_all_workers
+ ;;
+
esac
}
# This command starts the that sets the whole framework in motion.
main
-# Either start new jobs or exit, sleep in the meantime.
-while true
-do
- sleep 5
- JOBS=`ps ax | grep -v grep | grep -v -i screen | grep ppss.sh | wc -l`
- log INFO "There are $JOBS running processes. "
-
- MIN_JOBS=3
-
- if [ "$ARCH" == "Darwin" ]
- then
- MIN_JOBS=4
- elif [ "$ARCH" == "Linux" ]
- then
- MIN_JOBS=3
- fi
-
- if [ "$JOBS" -gt "$MIN_JOBS" ]
- then
- log INFO "Sleeping $INTERVAL seconds."
- sleep $INTERVAL
- else
- echo -en "\033[1B"
- log INFO "There are no more running jobs, so we must be finished."
- echo -en "\033[1B"
- log INFO "Killing listener and remainig processes."
- log INFO "Dying processes may display an error message."
- kill_process
- fi
-done
-
# Exit after all processes have finished.
-wait
+wait