ppss/ppss
2011-08-29 01:09:02 +00:00

3026 lines
86 KiB
Bash
Executable File

#!/usr/bin/env bash
#
# PPSS, the Parallel Processing Shell Script
#
# Copyright (c) 2010, Louwrentius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# See <http://www.gnu.org/licenses/>
# for a copy of the GNU General Public License
#
# "Patches or other contributions are always welcome!"
#
#
# Handling control-c for a clean shutdown.
# kill_process (the SIGINT handler) writes $KILL_KEY to the control FIFO;
# per the KILL_KEY comment below, the listener treats that key as a signal to
# stop immediately and kill PPSS.
#
trap 'kill_process' SIGINT
SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.90"
#
# The first argument to this script can be a mode.
# If $1 equals one of the words in $MODES it is stored in MODE and shifted
# off, so that the remaining arguments are plain options. This loop runs at
# top level on purpose: 'shift' must act on the script's own positional
# parameters. As a side effect $x is left set (to the matched mode, or to the
# last word of $MODES when nothing matched).
#
MODES="node start config stop pause continue deploy status erase kill"
for x in $MODES
do
if [ "$x" == "$1" ]
then
MODE="$1"
shift
break
fi
done
#
# The working directory of PPSS can be set with
# export PPSS_DIR=/path/to/workingdir
# When PPSS_DIR is unset or empty, ./ppss_dir is used.
#
PPSS_DIR="${PPSS_DIR:-ppss_dir}"
CONFIG="" # Config file given with -C/--config (written in 'config' mode, sourced otherwise).
HOSTNAME="`hostname`"
ARCH="`uname`" # Kernel name; selects OS-specific commands (Linux, Darwin, FreeBSD, SunOS).
PPSS_HOME_DIR="ppss-home" # Install directory of PPSS on a node (-H/--homedir).
SOURCED="$0" # Inspected by are_we_sourced.
PID="$$"
# NOTE(review): the paths below that embed PPSS_DIR / PPSS_HOME_DIR are expanded
# here, before process_arguments runs, so a later -w or -H does not appear to
# update them in the code shown -- confirm whether that is intended.
PAUSE_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/pause_signal" # Pause processing if this file is present.
PAUSE_DELAY="60" # Seconds to sleep between checks while paused (1 minute by default).
STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal" # Stop processing if this file is present.
GLOBAL_COUNTER=1
LISTOFITEMS="$PPSS_DIR/INPUT_FILE-$PID"
JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items.
LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file (-l/--log). Contains lots of info.
FAILED_ITEMS_COUNTER=0
QUIET="0" # 1 = only errors and progress on screen (-q; implied by --daemon).
STOP="0" # Set to 1 by check_for_interrupt when the stop signal file is found.
MAX_DELAY="0" # Maximum random start delay of the parallel jobs (-D/--delay).
MAX_LOCK_DELAY="9" # NOTE(review): not used in this section; presumably a lock retry delay -- confirm.
PERCENT="0"
LISTENER_PID=""
IFS_BACKUP="$IFS" # Presumably saved so IFS can be restored after being changed elsewhere.
CPUINFO="/proc/cpuinfo" # Read by init_vars (when present) to report the CPU model.
PROCESSORS=""
# Control keys sent through the FIFOs; the random suffix presumably keeps an
# item string from ever being mistaken for a control message.
START_KEY="start-$RANDOM$RANDOM$RANDOM$RANDOM" # If this key is received by listener, start a new process
FAIL_KEY="fail-$RANDOM$RANDOM$RANDOM$RANDOM" # if this key is received by listener, increase error count
KILL_KEY="kill-$RANDOM$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill PPSS
QUEUE=""
INOTIFY="" # Set to 1/0 by detect_inotify; --disable-inotify forces 0.
RECURSION="1" # 1 = descend into subdirectories of SRC_DIR; -r/--no-recursion sets 0.
START_PPSS="" # Start time in epoch seconds, set by init_vars.
STOP_PPSS=""
SIZE_OF_INPUT=""
LOCAL_LOCKING="1"
LIST_OF_PROCESSED_ITEMS="$PPSS_DIR/LIST_OF_PROCESSED_ITEMS" # Its line count is reported by set_status.
PROCESSED_ITEMS=""
UNPROCESSED_ITEMS=""
ACTIVE_WORKERS="0"
DAEMON_POLLING_INTERVAL="10" # Daemon polling interval in seconds (--interval).
STAT="" # stat command used for file modification times; chosen by set_stat.
DAEMON_FILE_AGE="4" # Seconds a file must be old before it is processed without inotify (--file-age).
ENABLE_INPUT_LOCK="0" # --enable-input-lock.
PROCESSING_TIME=""
NODE_ID="NODE_ID" # File name under $PPSS_DIR holding this node's id (read by set_status).
USE_MD5="0" # -M/--md5: use MD5 for lock and log file names.
RANDOMIZE="0" # -R/--randomize.
SSH_SERVER="" # Remote server or 'master'.
SSH_KEY="" # SSH key for ssh account; rewritten to "-i <file>" once set.
SSH_KNOWN_HOSTS="" # -K: copied to ./known_hosts (by the -C handling) when that file is missing.
SSH_SOCKET="$PPSS_DIR/ppss_ssh_socket-$$" # Multiplex multiple SSH connections over 1 master.
# SSH_OPTS multiplexes connections over $SSH_SOCKET (ControlMaster=auto);
# SSH_OPTS_NOMP is the non-multiplexed variant (exec_cmd "<cmd>" 1).
# Both trust only the hosts listed in ./known_hosts.
SSH_OPTS="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
-o GlobalKnownHostsFile=./known_hosts \
-o ControlMaster=auto \
-o Cipher=blowfish \
-o ConnectTimeout=10 "
SSH_OPTS_NOMP="-o BatchMode=yes -o GlobalKnownHostsFile=./known_hosts \
-o Cipher=blowfish \
-o ConnectTimeout=10 "
# Blowfish is faster but still secure.
# NOTE(review): the 'Cipher' keyword only applied to SSH protocol 1, which
# current OpenSSH no longer supports -- verify it is still accepted/needed.
SSH_MASTER_PID="" # When set, cleanup kills this PID.
ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking.
PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing.
PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output.
DOWNLOAD_TO_NODE="0" # Transfer item to slave via (s)cp.
UPLOAD_TO_SERVER="0" # Transfer output back to server via (s)cp.
SECURE_COPY="1" # If set, use SCP, Otherwise, use cp.
REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded.
SCRIPT="" # Custom user script that is executed by ppss.
ITEM_ESCAPED=""
DISABLE_SKIPPING=0
PPSS_NODE_STATUS="$PPSS_DIR/NODE_STATUS" # Node status dir; uploaded under $PPSS_HOME_DIR on the master.
NODE_STATUS_FILE="$PPSS_NODE_STATUS/$HOSTNAME-status.txt" # Written by set_status, sent by upload_status.
DAEMON=0 # 1 = --daemon mode.
EMAIL="" # -e/--email.
REGISTER="" # For STACK
STACK=""
TMP_STACK=""
showusage_short () {
    #
    # Print the one-screen usage summary. Used when PPSS is started without
    # arguments, with an unknown option, or without a valid source.
    # One printf call: the format is reused for every argument, so each
    # argument becomes one output line.
    #
    printf '%s\n' \
        "" \
        "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" \
        "" \
        "usage: $0 [ -d <sourcedir> | -f <sourcefile> ] [ -c '<command> \"\$ITEM\"' ]" \
        " [ -C <configfile> ] [ -j ] [ -l <logfile> ] [ -p <# jobs> ]" \
        " [ -q ] [ -D <delay> ] [ -h ] [ --help ] [ -r ] [ --daemon ]" \
        "" \
        "Examples:" \
        " $0 -d /dir/with/some/files -c 'gzip '" \
        " $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2" \
        " $0 -f <file> -c 'wget -q -P /destination/directory \"\$ITEM\"' -p 10" \
        ""
}
showusage_normal () {
    #
    # Print the short help text. Used for -h and when is_var_empty rejects a
    # missing option argument. Only the stand-alone options are listed;
    # --help prints the complete text (showusage_long).
    #
    echo
    echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
    echo
    echo "PPSS is a Bash shell script that executes commands in parallel on a set"
    echo "of items, such as files in a directory, or lines in a file. The purpose"
    echo "of PPSS is to make it simple to benefit from multiple CPUs or CPU cores."
    echo
    echo "This short summary only discusses options for stand-alone mode. For a"
    echo "full listing of all options, run PPSS with the option --help"
    echo
    echo "Usage $0 [ options ]"
    echo
    echo -e "--command | -c Command to execute. Syntax: '<command> ' including the single quotes."
    echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item "
    echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'."
    echo
    echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files"
    echo -e " are fed as an argument to the command that has been specified with -c."
    echo
    echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the"
    echo -e " command that has been specified with -c. Read input from stdin with"
    echo -e " -f -"
    echo
    echo -e "--config | -C If the mode is config, a config file with the specified name will be"
    echo -e " generated based on all the options specified. In the other modes,"
    echo -e " this option will result in PPSS reading the config file and starting"
    echo -e " processing items based on the settings of this file."
    echo
    echo -e "--disable-ht | -j Disable hyper threading. Is enabled by default."
    echo
    # The default name matches LOGFILE ($PPSS_DIR/ppss-log-$PID.txt).
    echo -e "--log | -l Sets the name of the log file. The default is ppss-log-<pid>.txt."
    echo
    echo -e "--processes | -p Start the specified number of processes. Ignore the number of available"
    echo -e " CPUs."
    echo
    echo -e "--quiet | -q Shows no output except for a progress indication using percents."
    echo
    echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread"
    echo -e " the load. The delay (seconds) is only used at the start of all 'threads'."
    echo
    echo -e "--daemon Daemon mode. Do not exit after items are processed, but keep looking "
    echo -e " for new items and process them. Read the manual how to use this!"
    echo -e " See --help for important additional options regarding daemon mode."
    echo
    echo -e "--no-recursion|-r By default, recursion of directories is enabled when the -d option is "
    echo -e " used. If this is not preferred, this can be disabled with this option "
    echo -e " Only files within the specified directory will be processed."
    echo
    echo -e "--email | -e PPSS sends an e-mail if PPSS has finished. It is also used if processing"
    echo -e " of an item has failed (configurable, see -h). "
    echo
    echo -e "--help Extended help, including options for distributed mode."
    echo
    echo -e "Example: encoding some wav files to mp3 using lame:"
    echo
    echo -e "$0 -d /path/to/wavfiles -c 'lame '"
    echo
    echo -e "Extended usage: use --help"
    echo
}
showusage_long () {
    #
    # Print the full help text (--help): all modes plus the options for
    # stand-alone and distributed (multi-node) operation.
    #
    echo
    echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
    echo
    echo "PPSS is a Bash shell script that executes commands in parallel on a set "
    echo "of items, such as files in a directory, or lines in a file."
    echo
    echo "Usage: $0 [ MODE ] [ options ]"
    echo
    echo "Modes are optional and mainly used for running in distributed mode. Modes are:"
    echo
    echo " config Generate a config file based on the supplied option parameters."
    echo " deploy Deploy PPSS and related files on the specified nodes."
    echo " erase Erase PPSS and related files from the specified nodes."
    echo
    echo " start Starting PPSS on nodes."
    echo " pause Pausing PPSS on all nodes."
    echo " stop Stopping PPSS on all nodes."
    echo " continue Continuing PPSS on all nodes."
    echo " node Running PPSS as a node, requires additional options."
    echo
    echo "Options are:"
    echo
    echo -e "--command | -c Command to execute. Syntax: '<command> ' including the single quotes."
    echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item "
    echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'."
    echo
    echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files"
    echo -e " are fed as an argument to the command that has been specified with -c."
    echo
    echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the"
    echo -e " command that has been specified with -c. Instead of a file, stdin can"
    echo -e " be specified like \"-f -\" in order to 'pipe' items to ppss."
    echo -e " Example: cat file | ppss -f - -c 'echo '"
    echo
    echo -e "--config | -C If the mode is config, a config file with the specified name will be"
    echo -e " generated based on all the options specified. In the other modes,"
    echo -e " this option will result in PPSS reading the config file and starting"
    echo -e " processing items based on the settings of this file."
    echo
    echo -e "--disable-ht | -j Disable hyper threading. Is enabled by default."
    echo
    echo -e "--log | -l Sets the name of the log file. The default is ppss-log-<pid>.txt."
    echo
    echo -e "--processes | -p Start the specified number of processes. Ignore the number of available"
    echo -e " CPUs."
    echo
    echo -e "--quiet | -q Shows no output except for a progress indication using percents."
    echo
    echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread"
    echo -e " the load. The delay is only used at the start of all 'threads'."
    echo
    echo -e "--daemon Do not exit after items are processed, but keep looking for new items"
    echo -e " and process them. Read the manual how to use this!"
    echo
    echo -e "--interval Specifies the polling interval when running in daemon mode. Polls every"
    echo -e " x seconds for new items to process."
    echo
    echo -e "--file-age When not using inotify, specify how many seconds must have passed before"
    echo -e " a file may be processed to prevent files being processed while being "
    echo -e " written to."
    echo
    echo -e "--disable-inotify If for some reason, inotify must not be used, use this option to disable"
    echo -e " usage of inotify. Regular polling will be used."
    echo
    echo -e "--enable-input-lock When PPSS is run in daemon mode, create a directory INPUT_LOCK to"
    echo -e " signal that items are processed and may not be touched by PPSS."
    echo -e " Once this directory is removed, PPSS will start processing items."
    echo
    echo -e "--no-recursion|-r By default, recursion of directories is enabled when the -d option is "
    echo -e " used. If this is not preferred, this can be disabled with this option "
    echo -e " Only files within the specified directory will be processed."
    echo
    echo -e "--md5|-M Use MD5 to create unique file names for locking and log file names."
    echo -e " PPSS strips all non [:alnum:] characters from an item string and this may"
    echo -e " cause collisions. String ABC!@# and ABC^&* will both become ABC___"
    echo
    echo -e "The following options are used for distributed execution of PPSS."
    echo
    echo -e "--master | -m Specifies the SSH server that is used for communication between nodes."
    echo -e " Using SSH, file locks are created, informing other nodes that an item "
    echo -e " is locked. If items are files that must be processed, they must reside"
    echo -e " on this host. SCP is used to transfer files from this host to nodes"
    echo -e " for local processing."
    echo
    echo -e "--node | -n File containing a list of nodes that act as PPSS clients. One IP / DNS"
    echo -e " name per line."
    echo
    echo -e "--key | -k The SSH key that a node uses to connect to the master."
    echo
    echo -e "--known-hosts | -K The file that contains the server public key. Can often be found on "
    echo -e " hosts that already once connected to the server. See the file "
    echo -e " ~/.ssh/known_hosts or else, manually connect once and check this file."
    echo
    echo -e "--user | -u The SSH user name that is used by the node when logging in to the"
    echo -e " master SSH server."
    echo
    # Single entry for --script (it used to be listed twice).
    echo -e "--script | -S Specifies the script/program that must be copied to the nodes for"
    echo -e " execution through PPSS. Only used in the deploy mode."
    echo -e " This option should be specified if necessary when generating a config."
    echo
    echo -e "--download This option specifies that an item will be downloaded by the node"
    echo -e " from the server or share to the local node for processing."
    echo
    # process_arguments rejects --upload unless -o was already given.
    echo -e "--upload This option specifies that the output file will be copied back to"
    echo -e " the server, the --outputdir option is mandatory (specify it before --upload)."
    echo
    echo -e "--no-scp | -b Do not use scp for downloading items. Use cp instead. Assumes that a"
    echo -e " network file system (NFS/SMB) is mounted under a local mount point."
    echo
    echo -e "--outputdir | -o Directory on server where processed files are put. If the result of "
    echo -e " encoding a wav file is an mp3 file, the mp3 file is put in the "
    echo -e " directory specified with this option."
    echo
    echo -e "--homedir | -H Directory in which PPSS is installed on the node."
    echo -e " Default is '$PPSS_HOME_DIR'."
    echo
    echo -e "--randomize | -R Randomize which items to process by the client in distributed mode."
    echo -e " This makes sure that with many nodes, clients do not all spend their"
    echo -e " time trying to get a lock on the same item."
    echo
    echo -e "Example: encoding some wav files to mp3 using lame:"
    echo
    echo -e "$0 -c 'lame ' -d /path/to/wavfiles -j "
    echo
    echo -e "Running PPSS based on a configuration file."
    echo
    echo -e "$0 -C config.cfg"
    echo
    echo -e "Generating a configuration file. Wavs are converted to mp3. SCP is used for data transfer."
    echo
    echo -e "$0 config -C ppss-config.cfg -d /some/dir -o output --download --upload -K known_hosts \\"
    echo -e "-k ppss-key.dsa -n nodes.txt -m 10.0.0.100 \\"
    echo -e "-c 'lame --quiet \"\$ITEM\" -o \"\$OUTPUT_DIR/\$OUTPUT_FILE\".mp3' "
    echo
    echo -e "Running PPSS on a client as part of a cluster."
    echo
    echo -e "$0 node -d /somedir -c 'cp \"\$ITEM\" /some/destination' -m 10.0.0.50 -u ppss -k ppss-key.key"
    echo
}
kill_process () {
    #
    # SIGINT (Ctrl-C) handler installed by the trap at the top of the file.
    # Writes $KILL_KEY into the control FIFO so the listener stops PPSS.
    #
    # Before init_vars has created the FIFO (option parsing, y/n prompts)
    # nobody is listening. The old handler then failed on the redirection
    # and Ctrl-C was silently ignored. In that case exit with the
    # conventional 128+SIGINT status instead.
    #
    if [ -n "$FIFO" ] && [ -p "$FIFO" ]
    then
        echo "$KILL_KEY" >> "$FIFO"
    else
        exit 130
    fi
}
exec_cmd () {
    #
    # Run a command on the master ($SSH_SERVER, over ssh) when one is
    # configured, or locally via eval otherwise, so callers need not care
    # where it runs.
    #
    # $1 - command string. It is word-split for ssh and eval'ed locally, so
    #      any quoting inside it is the caller's responsibility.
    # $2 - empty (default): ssh with $SSH_OPTS (connection multiplexing);
    #      "1": ssh with $SSH_OPTS_NOMP (no multiplexing). For any other
    #      value nothing is run remotely.
    # Returns the command's exit status, which is also left in STATUS.
    #
    STATUS=""
    CMD="$1"
    NOMP="$2" # Disable multiplexing.
    if [ ! -z "$SSH_SERVER" ]
    then
        if [[ "$ARCH" == "FreeBSD" ]]
        then
            # NOTE(review): "bash <cmd>" makes bash treat the first word of
            # the command as a script file, which fails for binaries such as
            # ls/mkdir; "bash -c" with proper quoting was probably intended.
            # Left unchanged for the remote case pending review.
            CMD="bash $CMD"
        fi
        if [ -z "$NOMP" ]
        then
            #log DEBUG "REMOTE EXEC"
            ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD
            STATUS=$?
        elif [ "$NOMP" == "1" ]
        then
            #log DEBUG "REMOTE EXEC NO MP"
            ssh $SSH_OPTS_NOMP $SSH_KEY $USER@$SSH_SERVER $CMD
            STATUS=$?
        fi
    else
        # Local commands already run inside this bash process. The FreeBSD
        # "bash " prefix used to be applied here too, which turned e.g.
        # "ls -1 x" into "bash ls -1 x" and made every local call fail.
        eval "$CMD"
        STATUS=$?
        log DEBUG "LOCAL EXEC - status is $STATUS"
    fi
    return $STATUS
}
does_file_exist () {
    #
    # Succeed (0) when "ls -1 $1" succeeds and fail (1) otherwise. The check
    # goes through exec_cmd, so it runs on the master when SSH_SERVER is set
    # and locally otherwise. The ls output (stdout and stderr) is captured in
    # RES and not used.
    #
    FILE="$1"
    if RES=$(exec_cmd "ls -1 $FILE" 2>&1)
    then
        return 0
    fi
    return 1
}
check_for_interrupt () {
    #
    # Honour the stop and pause signal files (STOP_SIGNAL / PAUSE_SIGNAL).
    # Existence is checked with does_file_exist, so on the master when
    # SSH_SERVER is set.
    #
    #   stop file present  -> status STOPPED, STOP=1, return 1
    #   pause file present -> status PAUSED, sleep PAUSE_DELAY seconds, check
    #                         again (a stop file is still honoured while paused)
    #   neither            -> status RUNNING, return set_status's status
    #
    # Implemented as a loop. The previous version called itself after every
    # pause interval, adding one bash call frame per PAUSE_DELAY for as long
    # as the job stayed paused.
    #
    while true
    do
        if does_file_exist "$STOP_SIGNAL"
        then
            set_status "STOPPED" "$FAILED_ITEMS_COUNTER"
            log INFO "STOPPING job. Stop signal found."
            STOP="1"
            return 1
        fi
        if ! does_file_exist "$PAUSE_SIGNAL"
        then
            set_status "RUNNING" "$FAILED_ITEMS_COUNTER"
            return
        fi
        set_status "PAUSED" "$FAILED_ITEMS_COUNTER"
        log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS."
        sleep $PAUSE_DELAY
    done
}
cleanup () {
    #
    # Remove what this run created: the working directory (for every known
    # MODE except "node"; kept when MODE is empty), both FIFOs, the SSH
    # control socket, and the SSH master process when SSH_MASTER_PID is set.
    # $1 is only appended to the DEBUG log line.
    #
    log DEBUG "$FUNCNAME - Cleaning up all temp files and processes. $1"
    for x in $MODES
    do
        [ "$x" == "$MODE" ] && [ "$x" != "node" ] && rm -rf "$PPSS_DIR"
    done
    local f
    for f in "$FIFO" "$FIFO_LISTENER"
    do
        [ -e "$f" ] && rm "$f"
    done
    [ -e "$SSH_SOCKET" ] && rm -rf "$SSH_SOCKET"
    if [ ! -z "$SSH_MASTER_PID" ]
    then
        kill "$SSH_MASTER_PID"
    fi
}
add_var_to_config () {
    #
    # In "config" mode, append the line "$1=$2" to the config file $CONFIG.
    # The file is later loaded with 'source' (-C), so callers must quote
    # values that need it (as the -c handling does). Does nothing in other
    # modes.
    #
    # printf writes the value verbatim. The previous 'echo -e' expanded
    # backslash sequences such as \n inside values, and the unquoted $CONFIG
    # redirect broke on file names containing spaces.
    #
    if [ "$MODE" == "config" ]
    then
        VAR="$1"
        VALUE="$2"
        printf '%s=%s\n' "$VAR" "$VALUE" >> "$CONFIG"
    fi
}
is_var_empty () {
    #
    # Validate a required option argument. If $1 is empty, print the short
    # help, clean up and exit 1; otherwise return 0.
    #
    if [ -n "$1" ]
    then
        return 0
    fi
    showusage_normal
    cleanup
    exit 1
}
detect_source_dir_nfs_exported () {
    #
    # Decide whether $SRC_DIR lies inside an NFS export. inotify does not play
    # well with NFS, so detect_inotify disables it when this returns 1.
    #
    # $1 - optional exports file to inspect (default: /etc/exports).
    # Sets NFS to 1 (exported) or 0 (not exported, or no exports file).
    # Returns 1 when SRC_DIR itself or one of its parent directories is an
    # exported path, 0 otherwise.
    #
    local exports_file="${1:-/etc/exports}"
    local exported_dirs exported_dir candidate source_dir
    log DEBUG "Executing $FUNCNAME"
    NFS=0
    if [ -e "$exports_file" ]
    then
        log DEBUG "NFS $exports_file found."
        # Exported paths are the first field of lines starting with "/".
        exported_dirs=$(awk '/^\// { print $1 }' "$exports_file")
        # Normalise "/data/" to "/data" so it compares equal to the export.
        source_dir="$SRC_DIR"
        while [ "${#source_dir}" -gt 1 ] && [ "${source_dir%/}" != "$source_dir" ]
        do
            source_dir="${source_dir%/}"
        done
        for exported_dir in $exported_dirs
        do
            # Start at SRC_DIR itself (the old code began at its parent and
            # missed a directly exported SRC_DIR), then walk up; "/" and "."
            # end the walk.
            candidate="$source_dir"
            while [ -n "$candidate" ] && [ "$candidate" != "/" ] && [ "$candidate" != "." ]
            do
                if [ "$exported_dir" = "$candidate" ]
                then
                    NFS=1
                    break 2
                fi
                candidate=$(dirname "$candidate")
            done
        done
    fi
    if [ "$NFS" = "1" ]
    then
        log INFO "Source directory is NFS exported. Disabling inotify."
        return 1
    else
        log INFO "Source directory is NOT NFS exported. Enabling inotify."
        return 0
    fi
}
detect_inotify () {
    #
    # Set INOTIFY=1 only when all of these hold:
    #   - /usr/bin/inotifywait exists;
    #   - inotify has not been disabled (INOTIFY=0);
    #   - detect_source_dir_nfs_exported reports SRC_DIR is not NFS exported.
    # Otherwise set INOTIFY=0. The checks run in that order and stop at the
    # first failing one.
    #
    if [ ! -e /usr/bin/inotifywait ] || [ "$INOTIFY" = "0" ] || ! detect_source_dir_nfs_exported
    then
        INOTIFY=0
    else
        INOTIFY=1
    fi
}
process_arguments () {
    #
    # Parse the command-line options into global settings.
    #
    # The optional leading MODE word has already been shifted off at top
    # level. Every recognised option sets a global variable. In "config" mode
    # most options are also appended to $CONFIG via add_var_to_config, so the
    # file can later be loaded with -C.
    #
    # Exits 1 on usage errors (no arguments, unknown option, no usable
    # source) or when a confirmation prompt is declined; exits 0 after
    # --version.
    #
    # Options that take a value use "shift 2 || shift". When such an option
    # is the last argument, "shift 2" fails without shifting, and the loop
    # used to spin forever on it.
    #
    if [ "$#" = "0" ]
    then
        showusage_short
        exit 1
    fi
    while [ $# -gt 0 ]
    do
        case $1 in
            --config|-C )
                CONFIG="$2"
                is_var_empty "$CONFIG"
                if [ "$MODE" == "config" ]
                then
                    if [ -e "$CONFIG" ]
                    then
                        echo "Do you want to overwrite existing config file? [y/n]"
                        read -r yn
                        if [ "$yn" == "y" ] || [ "$yn" == "yes" ]
                        then
                            rm "$CONFIG"
                        else
                            echo "Aborting..."
                            cleanup
                            exit 1
                        fi
                    fi
                fi
                if [ ! "$MODE" == "config" ]
                then
                    # For a bare file name, bash's 'source' searches $PATH
                    # before the current directory; force a path so the
                    # local file is loaded.
                    case "$CONFIG" in
                        */*) source "$CONFIG" ;;
                        *) source "./$CONFIG" ;;
                    esac
                fi
                if [ ! -z "$SSH_KEY" ]
                then
                    SSH_KEY="-i $SSH_KEY"
                fi
                # Seed ./known_hosts only when a known-hosts file is actually
                # configured. With an empty SSH_KNOWN_HOSTS the old unquoted
                # test ([ -e ]) was always true and 'cat' read stdin.
                if [ ! -e "./known_hosts" ] && [ -n "$SSH_KNOWN_HOSTS" ]
                then
                    if [ -e "$SSH_KNOWN_HOSTS" ]
                    then
                        if [ ! "$SSH_KNOWN_HOSTS" == "known_hosts" ]
                        then
                            cat "$SSH_KNOWN_HOSTS" > ./known_hosts
                        fi
                    else
                        echo "File $SSH_KNOWN_HOSTS does not exist."
                        exit 1
                    fi
                fi
                shift 2 || shift ;;
            --working-dir|-w )
                PPSS_DIR="$2"
                add_var_to_config PPSS_DIR "$PPSS_DIR"
                shift 2 || shift ;;
            --node|-n )
                NODES_FILE="$2"
                add_var_to_config NODES_FILE "$NODES_FILE"
                shift 2 || shift ;;
            --sourcefile|-f )
                INPUT_FILE="$2"
                is_var_empty "$INPUT_FILE"
                add_var_to_config INPUT_FILE "$INPUT_FILE"
                shift 2 || shift ;;
            --sourcedir|-d )
                SRC_DIR="$2"
                is_var_empty "$SRC_DIR"
                add_var_to_config SRC_DIR "$SRC_DIR"
                shift 2 || shift ;;
            --delay|-D)
                MAX_DELAY="$2"
                add_var_to_config MAX_DELAY "$MAX_DELAY"
                shift 2 || shift ;;
            --disable-inotify)
                INOTIFY=0
                add_var_to_config INOTIFY "$INOTIFY"
                shift 1 ;;
            --enable-input-lock)
                ENABLE_INPUT_LOCK=1
                add_var_to_config ENABLE_INPUT_LOCK "$ENABLE_INPUT_LOCK"
                shift 1 ;;
            --daemon)
                DAEMON="1"
                QUIET="1"
                # NOTE(review): runs before any later -d is parsed, so SRC_DIR
                # may still be empty here -- consider detecting after parsing.
                detect_inotify
                add_var_to_config DAEMON "$DAEMON"
                add_var_to_config QUIET "$QUIET"
                add_var_to_config INOTIFY "$INOTIFY"
                shift 1 ;;
            --interval)
                is_var_empty "$2"
                DAEMON_POLLING_INTERVAL="$2"
                add_var_to_config DAEMON_POLLING_INTERVAL "$DAEMON_POLLING_INTERVAL"
                shift 2 || shift ;;
            --file-age)
                is_var_empty "$2"
                # The argument used to be ignored: only the default was stored.
                DAEMON_FILE_AGE="$2"
                add_var_to_config DAEMON_FILE_AGE "$DAEMON_FILE_AGE"
                shift 2 || shift ;;
            --email|-e)
                is_var_empty "$2"
                EMAIL="$2"
                add_var_to_config EMAIL "$EMAIL"
                shift 2 || shift ;;
            --command|-c )
                COMMAND="$2"
                is_var_empty "$COMMAND"
                if [ "$MODE" == "config" ]
                then
                    # Single-quote the command so that it survives being
                    # sourced back from the config file.
                    COMMAND=\'$COMMAND\'
                    add_var_to_config COMMAND "$COMMAND"
                fi
                shift 2 || shift ;;
            -h )
                showusage_normal
                exit 1 ;;
            --help)
                showusage_long
                exit 1 ;;
            --homedir|-H )
                is_var_empty "$2"
                PPSS_HOME_DIR="$2"
                # Was stored as PPSS_DIR, which clobbered the working directory.
                add_var_to_config PPSS_HOME_DIR "$PPSS_HOME_DIR"
                shift 2 || shift ;;
            --disable-ht|-j )
                HYPERTHREADING=no
                add_var_to_config HYPERTHREADING "$HYPERTHREADING"
                shift 1 ;;
            --log|-l )
                LOGFILE="$2"
                add_var_to_config LOGFILE "$LOGFILE"
                shift 2 || shift ;;
            --no-recursion|-r )
                RECURSION="0"
                # Was stored as LOGFILE=0.
                add_var_to_config RECURSION "$RECURSION"
                shift 1 ;;
            --workingdir|-w )
                # NOTE(review): -w is already matched by --working-dir above, so
                # only the spelling --workingdir reaches this branch.
                WORKINGDIR="$2"
                add_var_to_config WORKINGDIR "$WORKINGDIR"
                shift 2 || shift ;;
            --md5|-M )
                USE_MD5="1"
                add_var_to_config USE_MD5 "$USE_MD5"
                shift 1 ;;
            --key|-k )
                SSH_KEY="$2"
                is_var_empty "$SSH_KEY"
                add_var_to_config SSH_KEY "$SSH_KEY"
                if [ ! -z "$SSH_KEY" ]
                then
                    SSH_KEY="-i $SSH_KEY"
                fi
                shift 2 || shift ;;
            --known-hosts | -K )
                SSH_KNOWN_HOSTS="$2"
                add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS"
                shift 2 || shift ;;
            --no-scp |-b )
                SECURE_COPY=0
                add_var_to_config SECURE_COPY "$SECURE_COPY"
                shift 1 ;;
            --randomize |-R )
                RANDOMIZE=1
                add_var_to_config RANDOMIZE "$RANDOMIZE"
                shift 1 ;;
            --outputdir|-o )
                REMOTE_OUTPUT_DIR="$2"
                add_var_to_config REMOTE_OUTPUT_DIR "$REMOTE_OUTPUT_DIR"
                shift 2 || shift ;;
            --processes|-p )
                is_var_empty "$2"
                MAX_NO_OF_RUNNING_JOBS="$2"
                add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS"
                shift 2 || shift ;;
            --master|-m )
                SSH_SERVER="$2"
                add_var_to_config SSH_SERVER "$SSH_SERVER"
                shift 2 || shift ;;
            --script|-S )
                SCRIPT="$2"
                add_var_to_config SCRIPT "$SCRIPT"
                shift 2 || shift ;;
            --download)
                DOWNLOAD_TO_NODE="1"
                add_var_to_config DOWNLOAD_TO_NODE "$DOWNLOAD_TO_NODE"
                shift 1 ;;
            --upload)
                if [ -z "$REMOTE_OUTPUT_DIR" ]
                then
                    echo "ERROR: no server-side output directory specified with -o"
                    exit 1
                fi
                UPLOAD_TO_SERVER="1"
                add_var_to_config UPLOAD_TO_SERVER "$UPLOAD_TO_SERVER"
                shift 1 ;;
            --quiet|-q )
                QUIET="1"
                add_var_to_config QUIET "$QUIET"
                shift 1 ;;
            --user|-u )
                USER="$2"
                add_var_to_config USER "$USER"
                shift 2 || shift ;;
            --version|-v )
                echo ""
                echo "$SCRIPT_NAME version $SCRIPT_VERSION"
                echo ""
                exit 0 ;;
            * )
                showusage_short
                echo
                echo "Unknown option $1 "
                echo
                exit 1 ;;
        esac
    done
    if [ -z "$SRC_DIR" ] && [ -z "$INPUT_FILE" ]
    then
        showusage_short
        echo
        echo "No source file or directory specified with -f or -d."
        exit 1
    fi
    if [ ! -e "$SRC_DIR" ] && [ -z "$MODE" ] && [ -z "$INPUT_FILE" ]
    then
        showusage_short
        echo
        echo "Source directory $SRC_DIR does not exist."
        exit 1
    fi
    if [ "$SRC_DIR" == "." ]
    then
        echo
        echo "PPSS is not designed to process items from within the directory"
        echo "it is being run. PPSS will start to process its own files from"
        echo "its working directory $PPSS_DIR which is probably not what you"
        echo "want. Are you sure you want to continue?"
        echo
        read -r YN
        # Continue only on an explicit y/Y. The old "||" test was always true,
        # so PPSS exited even after a "y".
        if [ ! "$YN" == "y" ] && [ ! "$YN" == "Y" ]
        then
            exit 1
        fi
    fi
    if [ "$DAEMON" == "1" ] && [ -z "$SRC_DIR" ]
    then
        showusage_short
        echo
        echo "Daemon monitors a specified directory (with the -d option) for files to process."
        echo "Read the on-line manual for more information."
        exit 1
    fi
}
display_header () {
    #
    # Emit the start-up banner through log DSPLY. log writes it to $LOGFILE
    # and shows it on screen unless QUIET=1. The "\t" sequences are expanded
    # by log's echo -e.
    #
    local rule="========================================================="
    log DSPLY ""
    log DSPLY "$rule"
    log DSPLY " |P|P|S|S| "
    log DSPLY "$SCRIPT_NAME vers. $SCRIPT_VERSION"
    log DSPLY "$rule"
    log DSPLY "Hostname:\t\t$HOSTNAME"
    log DSPLY "---------------------------------------------------------"
}
create_working_directory () {
    # Ensure the local working directory $PPSS_DIR exists, creating missing
    # parent directories too (mkdir -p).
    [ -e "$PPSS_DIR" ] || mkdir -p "$PPSS_DIR"
}
expand_str () {
    #
    # Print $1 right-padded with spaces to $TYPE_LENGTH characters. Longer
    # strings are printed unchanged. Used by log to align the type column.
    # Leaves STR, LENGTH and SPACE set as globals.
    #
    # printf is used instead of echo: 'echo "$STR"' printed nothing (or
    # mangled the text) when the string looked like an echo option, e.g. "-n".
    #
    STR=$1
    LENGTH=$TYPE_LENGTH
    SPACE=" "
    while [ "${#STR}" -lt "$LENGTH" ]
    do
        STR="$STR$SPACE"
    done
    printf '%s\n' "$STR"
}
are_we_sourced () {
    #
    # Return 1 when $SOURCED (the script's $0) names the 'ppss' script itself,
    # i.e. it is being run directly. Return 0 otherwise, e.g. when this file
    # is sourced and $0 belongs to the calling shell.
    #
    # Parameter expansion replaces 'basename $SOURCED'. The unquoted call
    # split paths containing spaces ("/my dir/ppss" became "my"), and
    # basename rejected login-shell names such as "-bash" as options.
    #
    RES="${SOURCED%/}"
    RES="${RES##*/}"
    if [ "$RES" = "ppss" ]
    then
        return 1
    else
        return 0
    fi
}
get_time_in_seconds () {
    #
    # Print the current Unix time (seconds since the epoch) and keep it in
    # THE_TIME.
    #
    case "$ARCH" in
        SunOS)
            # Dirty hack: this old date(1) lacks +%s, so take the value from
            # the "time" line of truss's system-call trace of date.
            THE_TIME=$(truss /usr/bin/date 2>&1 | grep ^time | awk '{ print $3 }')
            ;;
        *)
            THE_TIME=$(date +%s)
            ;;
    esac
    echo "$THE_TIME"
}
set_md5 () {
    #
    # When USE_MD5=1, pick this platform's MD5 command into MD5 and check
    # that it runs.
    # Returns 1, after logging an error, when no MD5 command is known for
    # $ARCH (and none was preset) or the command fails; returns 0 otherwise.
    # When USE_MD5 is not 1, only a DEBUG message is logged.
    #
    # Two fixes: the old check '[ ! "$?" ]' was always false, because "$?"
    # is never an empty string. It also called the non-existent 'LOG'
    # instead of 'log'.
    #
    if [ "$USE_MD5" == "1" ]
    then
        log DEBUG "MD5 is used."
        case $ARCH in
            "Darwin") MD5=md5 ;;
            "FreeBSD") MD5=md5 ;;
            "SunOS") MD5="digest -a md5" ;;
            "Linux") MD5=md5sum ;;
        esac
        if [ -z "$MD5" ] || ! echo "test" | $MD5 > /dev/null 2>&1
        then
            log ERROR "ERROR - PPSS requires ${MD5:-an md5 utility}. It may not be within the path or installed."
            return 1
        fi
        return 0
    else
        log DEBUG "MD5 is not used."
    fi
}
set_stat () {
    #
    # In daemon mode without inotify (DAEMON=1, INOTIFY=0), pick this
    # platform's stat command into STAT. The command prints a file's
    # modification time in epoch seconds; check that it works.
    # Returns 1, after logging an error, when no stat command is known for
    # $ARCH (and none was preset) or it fails; returns 0 otherwise, including
    # when stat is not needed.
    #
    # Two fixes: the old check '[ ! "$?" ]' was always false, and it called
    # 'LOG' instead of 'log'.
    #
    if [ "$DAEMON" = "1" ] && [ "$INOTIFY" = "0" ]
    then
        case $ARCH in
            "Darwin") STAT="stat -f%m" ;;
            "FreeBSD") STAT="stat -f%m" ;;
            "SunOS") STAT="gstat -c%Y" ;;
            "Linux") STAT="stat -c%Y" ;;
        esac
        if [ -z "$STAT" ] || ! $STAT . > /dev/null 2>&1
        then
            log ERROR "ERROR - PPSS daemon mode requires stat. It may not be within the path or installed."
            return 1
        fi
    fi
    return 0
}
log () {
    #
    # Log a message.
    #   $1 - type: DEBUG, INFO, WARN, ERROR, DSPLY or PRCNT.
    #   $2 - message; backslash escapes (e.g. \t) are expanded by echo -e.
    #
    # Log file ($LOGFILE):
    #   - INFO, WARN, ERROR and DSPLY are always written;
    #   - every type, DEBUG included, is written when PPSS_DEBUG is set to a
    #     value other than "0".
    # Screen:
    #   - DSPLY, WARN and ERROR when QUIET=0;
    #   - only ERROR when QUIET=1;
    #   - PRCNT always, rewriting the current terminal line (progress).
    #
    TYPE="$1"
    MESG="$2"
    TYPE_LENGTH=5
    #
    # Performance hack: a DEBUG message is only ever output when PPSS_DEBUG
    # is non-empty and not "0", so skip the date/padding subshells
    # otherwise. The old test missed the unset/empty case and still paid
    # for them on every DEBUG call.
    #
    if [ "$TYPE" = "DEBUG" ] && { [ -z "$PPSS_DEBUG" ] || [ "$PPSS_DEBUG" == "0" ]; }
    then
        return
    fi
    TYPE_EXP=`expand_str "$TYPE"`
    DATE=`date +%b\ %d\ %H:%M:%S`
    PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}"
    PREFIX_SMALL="$DATE: "
    if [ ! "$TYPE" = "ERROR" ]
    then
        ECHO_MSG="$PREFIX_SMALL $MESG"
    else
        ECHO_MSG="$PREFIX_SMALL [ERROR] $MESG"
    fi
    LOG_MSG="$PREFIX $MESG"
    if [ ! -z "$PPSS_DEBUG" ] && [ ! "$PPSS_DEBUG" == "0" ]
    then
        echo -e "$LOG_MSG" >> "$LOGFILE"
    elif [ "$TYPE" == "INFO" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ] || [ "$TYPE" == "DSPLY" ]
    then
        echo -e "$LOG_MSG" >> "$LOGFILE"
    fi
    # Explicit grouping of the original left-associative "a || b || c && d".
    if { [ "$TYPE" == "DSPLY" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ]; } && [ "$QUIET" == "0" ]
    then
        echo -e "$ECHO_MSG"
    elif [ "$TYPE" == "ERROR" ] && [ "$QUIET" == "1" ]
    then
        echo -e "$ECHO_MSG"
    fi
    if [ "$TYPE" == "PRCNT" ]
    then
        echo -en "\r$ECHO_MSG"
        #echo "$ECHO_MSG" # for debugging.
    fi
}
init_vars () {
    #
    # One-time initialisation before processing starts:
    #   - record the start time and check the md5/stat helpers;
    #   - create the two control FIFOs and open them read/write on fds 42
    #     and 43;
    #   - report the CPU and determine the number of parallel jobs;
    #   - create the working directories, plus the remote ones on the master
    #     when SSH_SERVER is set.
    #
    # Get start time to measure how long PPSS has been running.
    #
    START_PPSS=`get_time_in_seconds`
    #
    # Check if MD5(SUM) is present on the system.
    #
    set_md5
    #
    # Check if stat is present and works on the system if daemon mode is enabled.
    #
    set_stat
    #
    # Is PPSS run as a daemon? Then use input locking, which is not required otherwise.
    #
    if [ "$DAEMON" == "1" ]
    then
        INPUT_LOCK="$SRC_DIR/INPUT_LOCK"
    fi
    #
    # For some strange reason, this value differs between operating systems due to
    # different behaviour of the ps utility across operating systems.
    # NOTE(review): MIN_JOBS stays unset on other platforms.
    #
    if [ "$ARCH" == "Darwin" ]
    then
        MIN_JOBS=4
    elif [ "$ARCH" == "Linux" ]
    then
        MIN_JOBS=3
    fi
    FIFO="$PPSS_DIR"/ppss-fifo-$RANDOM-$RANDOM
    FIFO_LISTENER="$PPSS_DIR"/ppss-fifo-listener-$RANDOM-$RANDOM
    if [ ! -e "$FIFO" ]
    then
        mkfifo -m 600 "$FIFO"
    fi
    if [ ! -e "$FIFO_LISTENER" ]
    then
        mkfifo -m 600 "$FIFO_LISTENER"
    fi
    # Open read/write so writers never block waiting for a reader.
    exec 42<> "$FIFO"
    exec 43<> "$FIFO_LISTENER"
    set_status "RUNNING"
    if [ -e "$CPUINFO" ]
    then
        CPU=`grep 'model name' "$CPUINFO" | cut -d ":" -f 2 | sed -e s/^\ //g | sort | uniq`
        log DSPLY "CPU: $CPU"
    elif [ "$ARCH" == "Darwin" ]
    then
        MODEL=`system_profiler SPHardwareDataType | grep "Processor Name" | cut -d ":" -f 2`
        SPEED=`system_profiler SPHardwareDataType | grep "Processor Speed" | cut -d ":" -f 2`
        log DSPLY "CPU: $MODEL $SPEED"
    elif [ "$ARCH" == "SunOS" ]
    then
        CPU=`psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1`
        log DSPLY "$CPU"
    else
        log DSPLY "CPU: Cannot determine. Provide a patch for your arch!"
        log DSPLY "Arch is $ARCH"
    fi
    if [ -z "$MAX_NO_OF_RUNNING_JOBS" ]
    then
        get_no_of_cpus $HYPERTHREADING
    fi
    if [ ! -z "$SSH_SERVER" ]
    then
        does_file_exist "$PPSS_HOME_DIR/$JOB_LOG_DIR"
        if [ ! "$?" = "0" ]
        then
            # Fixed variable name ($JOB_lOG_DIR was always empty).
            log DEBUG "Remote Job log directory $PPSS_HOME_DIR/$JOB_LOG_DIR does not exist. Creating."
            exec_cmd "mkdir $PPSS_HOME_DIR/$JOB_LOG_DIR"
        fi
    fi
    if [ ! -e "$JOB_LOG_DIR" ]
    then
        mkdir -p "$JOB_LOG_DIR"
    fi
    if [ ! -z "$SSH_SERVER" ]
    then
        ITEM_LOCK_DIR="$PPSS_HOME_DIR/$ITEM_LOCK_DIR"
    fi
    does_file_exist "$ITEM_LOCK_DIR"
    if [ ! "$?" = "0" ]
    then
        if [ ! -z "$SSH_SERVER" ]
        then
            log DEBUG "Creating remote item lock dir."
        else
            log DEBUG "Creating local item lock dir."
        fi
        exec_cmd "mkdir $ITEM_LOCK_DIR"
        # Check the real exit status; '[ ! "$?" ]' was always false.
        if [ "$?" != "0" ]
        then
            log DEBUG "Failed to create item lock dir."
        fi
    fi
    if [ ! -z "$SSH_SERVER" ]
    then
        does_file_exist "$REMOTE_OUTPUT_DIR"
        if [ ! "$?" = "0" ]
        then
            log DEBUG "Remote output dir $REMOTE_OUTPUT_DIR does not exist."
            exec_cmd "mkdir $REMOTE_OUTPUT_DIR"
        fi
    fi
    if [ ! -e "$PPSS_LOCAL_TMPDIR" ]
    then
        mkdir "$PPSS_LOCAL_TMPDIR"
    fi
    if [ ! -e "$PPSS_LOCAL_OUTPUT" ]
    then
        mkdir "$PPSS_LOCAL_OUTPUT"
    fi
    if [ ! -e "$PPSS_NODE_STATUS" ]
    then
        mkdir -p "$PPSS_NODE_STATUS"
    fi
}
upload_status () {
    #
    # Copy this node's status file ($NODE_STATUS_FILE) with scp into the
    # node-status directory under $PPSS_HOME_DIR on the master.
    # scp's verbose output is appended to ./scp.tmp; success or failure is
    # only reported through DEBUG logging.
    #
    #log DEBUG "scp $SSH_OPTS $SSH_KEY $NODE_STATUS_FILE $USER@$SSH_SERVER:$PPSS_HOME_DIR/$PPSS_NODE_STATUS/"
    if [ -e "$NODE_STATUS_FILE" ]
    then
        # Use the key configured with -k/--key. SSH_KEY already holds
        # "-i <file>" or is empty, and stays unquoted on purpose. Previously a
        # hard-coded "-i ppss-key.dsa" was used regardless of the configured
        # key.
        # NOTE(review): -vv and ./scp.tmp look like debugging leftovers.
        scp -vv -o GlobalKnownHostsFile=./known_hosts $SSH_KEY "$NODE_STATUS_FILE" "$USER@$SSH_SERVER:$PPSS_HOME_DIR/$PPSS_NODE_STATUS/" >> scp.tmp 2>&1
        if [ "$?" == "0" ]
        then
            log DEBUG "Uploaded status to server ok."
        else
            log DEBUG "Uploaded status to server failed."
        fi
    else
        log DEBUG "Status file not found thus not uploaded."
    fi
}
set_status () {
    #
    # Nodes only (SSH_SERVER set): write
    #   "<node> <hostname> <status> <processed> <failed>"
    # to NODE_STATUS_FILE and upload it to the server.
    # $1 = status word, $2 = failed-item count (defaults to 0).
    #
    if [ -z "$SSH_SERVER" ]
    then
        return 0
    fi
    STATUS="$1"
    FAILED="${2:-0}"
    NODE=$(cat $PPSS_DIR/$NODE_ID)
    NO_PROCESSED="0"
    if [ -e "$LIST_OF_PROCESSED_ITEMS" ]
    then
        NO_PROCESSED=$(wc -l "$LIST_OF_PROCESSED_ITEMS" | awk '{ print $1 }' )
    fi
    printf '%s %s %s %s %s\n' "$NODE" "$HOSTNAME" "$STATUS" "$NO_PROCESSED" "$FAILED" > "$NODE_STATUS_FILE"
    upload_status
}
check_status () {
    #
    # Abort PPSS when a previous command failed.
    # $1 = exit code to test, $2 = reporting function name, $3 = message.
    # On a non-zero code: log "$2 - $3", mark the node as ERROR, run
    # cleanup and exit with that code. Returns 0 otherwise.
    #
    ERROR="$1"
    FUNCTION="$2"
    MESSAGE="$3"
    if [ "$ERROR" == "0" ]
    then
        return 0
    fi
    log DSPLY "$FUNCTION - $MESSAGE"
    set_status ERROR
    cleanup
    exit "$ERROR"
}
erase_ppss () {
    #
    # After interactive confirmation (yes/YES), remove the PPSS home
    # directory from every node listed in NODES_FILE over SSH.
    #
    SSH_SOCKET="ppss_ssh_socket-$NODE"
    SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET -o GlobalKnownHostsFile=./known_hosts -o ControlMaster=auto -o Cipher=blowfish -o ConnectTimeout=5 "
    echo "Are you realy sure you want to erase PPSS from all nodes!? (YES/NO)"
    read YN
    case "$YN" in
        yes|YES)
            for NODE in $(cat $NODES_FILE)
            do
                log DSPLY "Erasing PPSS homedir $PPSS_HOME_DIR from node $NODE."
                ssh -q $SSH_KEY $SSH_OPTS_NODE $USER@$NODE "rm -rf $PPSS_HOME_DIR"
            done
            ;;
        *)
            log DSPLY "Aborting.."
            ;;
    esac
}
stack_push_tmp () {
    # Append $1 as a new line at the END of TMP_STACK (used to rebuild a stack).
    local nl=$'\n'
    TMP1="$1"
    TMP_STACK="${TMP_STACK:+$TMP_STACK$nl}$TMP1"
}
stack_push () {
    # Push $1 onto the top (first line) of the newline-separated STACK.
    local nl=$'\n'
    line="$1"
    STACK="$line${STACK:+$nl$STACK}"
}
unprocessed_stack_push () {
    #
    # Prepend $1 (one item per line) to UNPROCESSED_ITEMS.
    # The emptiness test must look at UNPROCESSED_ITEMS itself: checking
    # PROCESSED_ITEMS (as before) left a trailing empty line after the
    # first push, which later shows up as a blank item in LISTOFITEMS.
    #
    local nl=$'\n'
    line="$1"
    UNPROCESSED_ITEMS="$line${UNPROCESSED_ITEMS:+$nl$UNPROCESSED_ITEMS}"
}
processed_stack_push () {
    # Prepend $1 (one item per line) to PROCESSED_ITEMS.
    local nl=$'\n'
    line="$1"
    PROCESSED_ITEMS="$line${PROCESSED_ITEMS:+$nl$PROCESSED_ITEMS}"
}
stack_pop () {
    #
    # Pop the top (first non-empty line) of STACK into REGISTER.
    # Items are newline-delimited; the previous implementation iterated
    # over an unquoted $STACK, which split items on spaces and expanded
    # glob characters, so e.g. "my file.wav" came back as two items.
    # Returns 0 when an item was popped, 1 when the stack was empty.
    #
    local nl=$'\n'
    REGISTER=""
    while [ -z "$REGISTER" ] && [ -n "$STACK" ]
    do
        case "$STACK" in
            *"$nl"*)
                REGISTER="${STACK%%"$nl"*}"
                STACK="${STACK#*"$nl"}"
                ;;
            *)
                REGISTER="$STACK"
                STACK=""
                ;;
        esac
    done
    # Kept in sync for code that inspects the scratch stack.
    TMP_STACK="$STACK"
    if [ -z "$REGISTER" ]
    then
        return 1
    fi
    return 0
}
is_screen_installed () {
    #
    # Check that 'screen' can be run on node $1 over SSH (it is needed to
    # start PPSS detached on the node). Skipped when DISABLE_SCREEN_TEST=1.
    # Returns 1 when the remote test command fails.
    #
    if [ "$DISABLE_SCREEN_TEST" == "1" ]
    then
        return 0
    fi
    NODE="$1"
    if ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "screen -m -D -S test ls" > /dev/null 2>&1
    then
        log DEBUG "'Screen' is installed on node $NODE."
    else
        log ERROR "The 'Screen' command may not be installed on node $NODE."
        log ERROR "Or some other SSH related error occurred."
        return 1
    fi
}
deploy () {
    #
    # Install PPSS on node $1 (run in the background by deploy_ppss):
    # create the remote working directories, record the node id, and copy
    # this script, the SSH key, the config, known_hosts and (when set)
    # SCRIPT / INPUT_FILE into ~/$PPSS_HOME_DIR on the node.
    # Logs whether every step succeeded.
    #
    NODE="$1"
    SSH_SOCKET="ppss_ssh_socket-$NODE"
    SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
        -o GlobalKnownHostsFile=./known_hosts \
        -o ControlMaster=auto \
        -o Cipher=blowfish \
        -o ConnectTimeout=5 "
    SSH_OPTS_SLAVE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
        -o GlobalKnownHostsFile=./known_hosts \
        -o ControlMaster=no \
        -o Cipher=blowfish \
        -o ConnectTimeout=5 "
    ERROR=0
    # Only set when this call starts its own SSH master connection.
    SSH_PID=""
    # Sticky error flag: once any step fails, ERROR stays 1.
    set_error () {
        if [ "$ERROR" == "1" ]
        then
            ERROR=1
        elif [ ! "$1" == "0" ]
        then
            ERROR=1
        fi
    }
    if [ ! -e "$SSH_SOCKET" ]
    then
        ssh -q -N $SSH_OPTS_NODE $SSH_KEY $USER@$NODE &
        SSH_PID=$!
    fi
    is_screen_installed "$NODE"
    KEY=`echo $SSH_KEY | cut -d " " -f 2`
    # Note the '$': the item lock dir variable, not a literal "ITEM_LOCK_DIR".
    ssh -q $SSH_OPTS_SLAVE $SSH_KEY $USER@$NODE "cd ~ && mkdir -p $PPSS_HOME_DIR && mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR && mkdir -p $PPSS_HOME_DIR/$ITEM_LOCK_DIR >> /dev/null 2>&1"
    set_error $?
    ssh -q $SSH_OPTS_SLAVE $SSH_KEY $USER@$NODE "cd ~ && cd $PPSS_HOME_DIR && cd $PPSS_DIR && echo $NODE > $NODE_ID"
    set_error $?
    scp -q $SSH_OPTS_SLAVE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR
    set_error $?
    scp -q $SSH_OPTS_SLAVE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR
    set_error $?
    scp -q $SSH_OPTS_SLAVE $SSH_KEY $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR
    set_error $?
    scp -q $SSH_OPTS_SLAVE $SSH_KEY known_hosts $USER@$NODE:~/$PPSS_HOME_DIR
    set_error $?
    if [ ! -z "$SCRIPT" ]
    then
        scp -q $SSH_OPTS_SLAVE $SSH_KEY $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR
        set_error $?
    fi
    if [ ! -z "$INPUT_FILE" ]
    then
        scp -q $SSH_OPTS_SLAVE $SSH_KEY $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR
        set_error $?
    fi
    if [ "$ERROR" == "0" ]
    then
        log DSPLY "PPSS installed on node $NODE."
    else
        log DSPLY "PPSS failed to install on $NODE."
    fi
    # Only tear down the master connection we started ourselves; with an
    # existing socket SSH_PID is empty and a bare 'kill' would just error.
    if [ -n "$SSH_PID" ]
    then
        kill "$SSH_PID"
    fi
}
deploy_ppss () {
    #
    # Validate the deploy prerequisites (nodes file, SSH private key,
    # optional SCRIPT), create the node status directory on the server,
    # then start a background 'deploy' for every node in NODES_FILE.
    # Exits the script on a missing prerequisite.
    #
    if [ -z "$NODES_FILE" ] || [ ! -e "$NODES_FILE" ]
    then
        log ERROR "No file containing list of nodes missing / not specified."
        set_status ERROR
        cleanup
        exit 1
    fi
    exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_NODE_STATUS"
    # SSH_KEY has the form "-i <keyfile>"; the second field is the key file.
    KEY=$(echo $SSH_KEY | cut -d " " -f 2)
    if [ -z "$KEY" ] || [ ! -e "$KEY" ]
    then
        log ERROR "Private SSH key $KEY not found."
        set_status "ERROR"
        cleanup
        exit 1
    fi
    if [ -n "$SCRIPT" ] && [ ! -e "$SCRIPT" ]
    then
        log ERROR "Script $SCRIPT not found."
        set_status "ERROR"
        cleanup
        exit 1
    fi
    # Stagger the parallel deploys slightly (SunOS gets whole seconds).
    local stagger=0.1
    if [ "$ARCH" == "SunOS" ]
    then
        stagger=1
    fi
    INSTALLED_ON_SSH_SERVER=0
    for NODE in $(cat $NODES_FILE)
    do
        deploy "$NODE" &
        sleep $stagger
        if [ "$NODE" == "$SSH_SERVER" ]
        then
            INSTALLED_ON_SSH_SERVER=1
        fi
    done
    if [ "$INSTALLED_ON_SSH_SERVER" == "1" ]
    then
        log DEBUG "SSH SERVER $SSH_SERVER is also a node."
    else
        log DEBUG "SSH SERVER $SSH_SERVER is not a node."
    fi
}
start_ppss_on_node () {
    #
    # Launch PPSS in node mode on node $1, detached inside a 'screen'
    # session named PPSS. Logs an error if the SSH command fails.
    #
    NODE="$1"
    log DSPLY "Starting PPSS on node $NODE."
    if ! ssh $SSH_KEY $USER@$NODE -o ConnectTimeout=5 -o GlobalKnownHostsFile=./known_hosts "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 node --config ~/$PPSS_HOME_DIR/$CONFIG"
    then
        log ERROR "PPSS failed to start on node $NODE."
    fi
}
init_ssh_server_socket () {
    # Make sure the directory that will hold the SSH control socket exists.
    # Nothing to do when the socket itself is already present.
    if [ -e "$SSH_SOCKET" ]
    then
        return 0
    fi
    DIR=$(dirname $SSH_SOCKET)
    mkdir -p "$DIR"
}
test_server () {
    #
    # When a remote server is configured: verify it is reachable (aborts
    # via check_status otherwise), start a background SSH master
    # connection (its pid goes to SSH_MASTER_PID) and make sure the remote
    # PPSS working directory exists. Without SSH_SERVER PPSS runs
    # stand-alone and nothing is done.
    #
    if [ -z "$SSH_SERVER" ]
    then
        log DEBUG "No remote server specified, assuming stand-alone mode."
        return
    fi
    init_ssh_server_socket
    exec_cmd "date >> /dev/null"
    check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached"
    ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER &
    SSH_MASTER_PID="$!"
    log DEBUG "SSH Master pid is $SSH_MASTER_PID"
    log INFO "Connected to server: $SSH_SERVER"
    if ! does_file_exist "$PPSS_HOME_DIR/$PPSS_DIR"
    then
        log DEBUG "Remote PPSS home directory $PPSS_HOME_DIR/$PPSS_DIR does not exist. Creating."
        exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_DIR"
    fi
}
get_no_of_cpus () {
    #
    # Set MAX_NO_OF_RUNNING_JOBS to the number of CPUs of this host.
    # $1 (HYPERTHREADING): "yes" (default) counts logical processors,
    # "no" counts physical cores only. Exits with an error when no count
    # can be determined (the user should then pass -p).
    #
    HPT=$1
    NUMBER=""
    if [ -z "$HPT" ]
    then
        HPT=yes
    fi
    got_cpu_info () {
        ERROR="$1"
        check_status "$ERROR" "$FUNCNAME" "cannot determine number of cpu cores. Specify with -p."
    }
    if [ "$HPT" == "yes" ]
    then
        if [ "$ARCH" == "Linux" ]
        then
            NUMBER=`grep -c ^processor $CPUINFO`
            got_cpu_info "$?"
        elif [ "$ARCH" == "Darwin" ]
        then
            NUMBER=`sysctl -a hw | grep -w logicalcpu | awk '{ print $2 }'`
            got_cpu_info "$?"
        elif [ "$ARCH" == "FreeBSD" ]
        then
            NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'`
            got_cpu_info "$?"
        elif [ "$ARCH" == "SunOS" ]
        then
            NUMBER=`psrinfo | grep -c on-line`
            got_cpu_info "$?"
        else
            if [ -e "$CPUINFO" ]
            then
                NUMBER=`grep -c ^processor $CPUINFO`
                got_cpu_info "$?"
            fi
        fi
        if [ ! -z "$NUMBER" ]
        then
            log DSPLY "Found $NUMBER logic processors."
        fi
    elif [ "$HPT" == "no" ]
    then
        log DSPLY "Hyperthreading is disabled."
        if [ "$ARCH" == "Linux" ]
        then
            # Test grep's exit status directly; the former '[ "$?" ]' is
            # always true, so the fallbacks below were never taken and old
            # CPUs ended up with 0 jobs.
            if grep -q 'physical id' "$CPUINFO"
            then
                PHYSICAL=$(awk -F: '/^physical id/ { seen[$2] = 1 } END { n = 0; for (k in seen) n++; print n }' "$CPUINFO")
                if [ "$PHYSICAL" == "1" ]
                then
                    log DSPLY "Found $PHYSICAL physical CPU."
                else
                    log DSPLY "Found $PHYSICAL physical CPUs."
                fi
                if grep -q 'core id' "$CPUINFO"
                then
                    log DEBUG "Starting job only for each physical core on all physical CPU(s)."
                    # core id values can repeat across physical packages,
                    # so count unique (physical id, core id) pairs.
                    NUMBER=$(awk -F: '/^physical id/ { p = $2 } /^core id/ { seen[p ":" $2] = 1 } END { n = 0; for (k in seen) n++; print n }' "$CPUINFO")
                    log DSPLY "Found $NUMBER physical cores."
                else
                    log DSPLY "Single core processor(s) detected."
                    log DSPLY "Starting job for each physical CPU."
                    NUMBER=$PHYSICAL
                fi
            else
                log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus."
                NUMBER=`grep -c ^processor $CPUINFO`
                got_cpu_info "$?"
            fi
        elif [ "$ARCH" == "Darwin" ]
        then
            NUMBER=`sysctl -a hw | grep -w physicalcpu | awk '{ print $2 }'`
            got_cpu_info "$?"
        elif [ "$ARCH" == "FreeBSD" ]
        then
            NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'`
            got_cpu_info "$?"
        else
            NUMBER=`cat $CPUINFO | grep "cpu cores" | cut -d ":" -f 2 | uniq | sed -e s/\ //g`
            got_cpu_info "$?"
        fi
    fi
    if [ ! -z "$NUMBER" ]
    then
        MAX_NO_OF_RUNNING_JOBS=$NUMBER
    else
        log ERROR "Number of CPUs not obtained."
        log ERROR "Please specify manually with -p."
        set_status "ERROR"
        exit 1
    fi
}
random_delay () {
    #
    # Sleep a random whole number of seconds in [0, $1). The chosen delay
    # is left in NUMBER. Exits with an error when no bound is given.
    #
    ARGS="$1"
    if [ -z "$ARGS" ]
    then
        log ERROR "$FUNCNAME Function random delay, no argument specified."
        set_status "ERROR"
        exit 1
    fi
    NUMBER=$(( RANDOM % ARGS ))
    sleep "$NUMBER"
}
escape_item () {
#
# Backslash-escape shell metacharacters in $1 and store the result in
# ITEM_ESCAPED (TMP is used as scratch). The escaped value is passed to
# does_file_exist, exec_cmd and as scp remote paths elsewhere in this
# script, presumably so the path survives a second round of shell parsing
# on the remote side.
#
# Characters handled: space ' | & ; > < ( )
# The long backslash runs are consumed in stages (backtick quoting, then
# shell unquoting of the unquoted sed argument, then sed's replacement
# escaping); by my reading each sed call ends up inserting ONE backslash
# before the character. NOTE(review): verify before touching any of the
# backslash counts below.
# Not escaped: backslash, double quote, $, backtick, glob characters, tabs.
#
TMP="$1"
ITEM_ESCAPED=`echo "$TMP" | \
sed s/\\ /\\\\\\\\\\\\\\ /g | \
sed s/\\'/\\\\\\\\\\\\\\'/g | \
sed s/\\|/\\\\\\\\\\\\\\|/g | \
sed s/\&/\\\\\\\\\\\\\\&/g | \
sed s/\;/\\\\\\\\\\\\\\;/g | \
sed s/\>/\\\\\\\\\\>/g | \
sed s/\</\\\\\\\\\\</g | \
sed s/\(/\\\\\\\\\\(/g | \
sed s/\)/\\\\\\\\\\)/g `
}
download_item () {
    #
    # When DOWNLOAD_TO_NODE=1, copy item $1 from the source (server via scp,
    # or the local filesystem via cp) into PPSS_LOCAL_TMPDIR so it can be
    # processed locally. Items that do not exist as files are "virtual"
    # (VIRTUAL=1, e.g. URLs) and are not copied.
    # Returns 1 immediately when downloading is disabled.
    #
    # The early exit used to also test the *previous* item's VIRTUAL flag,
    # so after one virtual item no later item was ever downloaded again.
    #
    if [ ! "$DOWNLOAD_TO_NODE" == "1" ]
    then
        return 1
    fi
    ITEM="$1"
    VIRTUAL="0"
    ERR_STATE="0"
    if [ "$RECURSION" = "1" ]
    then
        escape_item "$ITEM"
        does_file_exist "$ITEM_ESCAPED"
        ERR_STATE="$?"
        DOWNLOAD_ITEM="$ITEM"
        LOCAL_DIR=`dirname "$DOWNLOAD_ITEM"`
    else
        escape_item "$ITEM"
        does_file_exist "$SRC_DIR/$ITEM_ESCAPED"
        ERR_STATE="$?"
        DOWNLOAD_ITEM="$SRC_DIR/$ITEM"
    fi
    if [ "$ERR_STATE" == "0" ]
    then
        log DEBUG "$FUNCNAME Remote item $ITEM exists"
        VIRTUAL=0
    else
        log DEBUG "$FUNCNAME Remote item $ITEM does NOT exist"
        VIRTUAL=1
    fi
    if [ "$DOWNLOAD_TO_NODE" == "1" ] && [ "$VIRTUAL" == "0" ]
    then
        log DEBUG "Transfering item $ITEM from source to local disk."
        if [ "$SECURE_COPY" == "1" ] && [ ! -z "$SSH_SERVER" ]
        then
            if [ "$RECURSION" == "1" ]
            then
                escape_item "$DOWNLOAD_ITEM"
                mkdir -p "$PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
                log DEBUG "$SSH_SERVER:$ITEM_ESCAPED $PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
                scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" ./$PPSS_LOCAL_TMPDIR/"$LOCAL_DIR"
                log DEBUG "Exit code of remote transfer is $?"
            else
                escape_item "$DOWNLOAD_ITEM"
                log DEBUG "$SSH_SERVER:$ITEM_ESCAPED $PPSS_LOCAL_TMPDIR"
                scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" $PPSS_LOCAL_TMPDIR
                log DEBUG "Exit code of remote transfer is $?"
            fi
        else
            # Copy the same path whose existence was checked above, and use
            # the same local layout as the scp branch (commando expects the
            # item under $PPSS_LOCAL_TMPDIR/<item path> when recursing).
            if [ "$RECURSION" == "1" ]
            then
                mkdir -p "$PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
                cp "$DOWNLOAD_ITEM" "$PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
            else
                cp "$DOWNLOAD_ITEM" "$PPSS_LOCAL_TMPDIR"
            fi
            log DEBUG "Exit code of local transfer is $?"
        fi
    else
        log DEBUG "No transfer of item $ITEM to local workpath."
    fi
}
upload_item () {
    #
    # Upload the local output of one item back to the server.
    # $1 = local output directory of the item, $2 = the item's directory
    # (appended to REMOTE_OUTPUT_DIR when RECURSION=1 so the remote output
    # mirrors the source tree). With SECURE_COPY=1 the contents are sent via
    # scp and the local copy is removed on success; otherwise cp is used.
    # Returns 1 without doing anything when UPLOAD_TO_SERVER is not 1.
    #
    if [ ! "$UPLOAD_TO_SERVER" == "1" ]
    then
        log DEBUG "Upload to server is disabled."
        return 1
    fi
    OUTPUT_ITEM="$1"
    ITEMDIR="$2"
    log DEBUG "Uploading item $OUTPUT_ITEM."
    if [ "$SECURE_COPY" == "1" ]
    then
        if [ "$RECURSION" = "1" ]
        then
            escape_item "$REMOTE_OUTPUT_DIR$ITEMDIR"
        else
            escape_item "$REMOTE_OUTPUT_DIR"
        fi
        DIR_ESCAPED="$ITEM_ESCAPED"
        exec_cmd "mkdir -p $DIR_ESCAPED"
        scp -q $SSH_OPTS $SSH_KEY "$OUTPUT_ITEM"/* $USER@$SSH_SERVER:"$DIR_ESCAPED"
        ERROR="$?"
        if [ ! "$ERROR" == "0" ]
        then
            log ERROR "Uploading of $OUTPUT_ITEM via SCP failed."
        else
            log DEBUG "Upload of item $OUTPUT_ITEM success"
            rm -rf ./"$OUTPUT_ITEM"
        fi
    else
        # NOTE(review): OUTPUT_ITEM is a directory when called from commando
        # and cp has no -r, so this copy may always fail; confirm the
        # intended target and semantics before relying on it.
        cp "$OUTPUT_ITEM" "$ITEMDIR"
        ERROR="$?"
        if [ ! "$ERROR" == "0" ]
        then
            log ERROR "Uploading of $OUTPUT_ITEM via CP failed."
        fi
    fi
}
lock_item () {
    #
    # Claim item $1 by creating a lock directory for it in ITEM_LOCK_DIR
    # (remotely via exec_cmd when a server is used). mkdir fails when the
    # directory already exists, so a non-zero return means "already taken".
    # The lock name is the item's MD5 (USE_MD5=1) or the item with every
    # non-alphanumeric character replaced by '_'.
    #
    # In inotify daemon mode no two processes try to process the same item,
    # so locking is skipped.
    if [ "$INOTIFY" = "1" ] && [ "$DAEMON" = "1" ]
    then
        return 0
    fi
    ITEM="$1"
    if [ "$USE_MD5" == "1" ]
    then
        LOCK_FILE_NAME=$(echo "$ITEM" | $MD5 | awk '{ print $1 }')
    else
        LOCK_FILE_NAME=$(echo "$ITEM" | sed s/[^[:alnum:]]/_/g)
    fi
    ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME"
    log DEBUG "Locking item $ITEM_LOCK_FILE"
    exec_cmd "mkdir $ITEM_LOCK_FILE >> /dev/null 2>&1"
    ERROR="$?"
    return "$ERROR"
}
get_input_lock () {
    #
    # Block until this process owns INPUT_LOCK. The lock is a directory:
    # mkdir fails while another process holds it, so retry every 5 seconds.
    # The result of exec_cmd is tested directly; the former
    # 'if [ "$?" ]' was always true, so the lock never actually waited.
    #
    while true
    do
        if exec_cmd "mkdir $INPUT_LOCK >> /dev/null 2>&1 "
        then
            log DEBUG "Input lock is obtained..."
            break
        else
            log DEBUG "Input lock is present...sleeping.."
            sleep 5
        fi
    done
}
release_input_lock () {
    #
    # Remove the INPUT_LOCK directory. Returns 0 on success and 1 when the
    # removal command fails. The exit status of exec_cmd is tested
    # directly; the former 'if [ "$?" ]' was always true.
    #
    if exec_cmd "rm -rf $INPUT_LOCK"
    then
        log DEBUG "Input lock was released..."
        return 0
    else
        log ERROR "Input lock could not be removed...this should never happen..."
        return 1
    fi
}
list_all_input_items () {
    #
    # Print every line of LISTOFITEMS verbatim, one item per line.
    # read -r keeps backslashes, IFS= keeps leading/trailing blanks,
    # the '|| [ -n ]' keeps a final line that lacks a newline, and printf
    # prints items such as "-n" that echo would swallow.
    #
    local line
    while IFS= read -r line || [ -n "$line" ]
    do
        printf '%s\n' "$line"
    done < "$LISTOFITEMS"
}
remove_processed_items_from_input_file () {
    #
    # Rewrite LISTOFITEMS so it contains only items that are not listed in
    # LIST_OF_PROCESSED_ITEMS (or, failing that, the current
    # PROCESSED_ITEMS). Returns 1 and leaves LISTOFITEMS untouched when there
    # is nothing to filter: no processed items, status mode, or a
    # missing or empty input list.
    #
    UNPROCESSED_ITEMS=""
    if [ -e "$LIST_OF_PROCESSED_ITEMS" ]
    then
        PROCESSED_ITEMS=`cat "$LIST_OF_PROCESSED_ITEMS"`
    fi
    log DEBUG "Running $FUNCNAME"
    if [ -z "$PROCESSED_ITEMS" ]
    then
        log DEBUG "Variable processed_items is empty."
        return 1
    fi
    if [ "$MODE" = "status" ]
    then
        log DEBUG "Mode is status."
        return 1
    fi
    if [ ! -e "$LISTOFITEMS" ]
    then
        echo "$LISTOFITEMS does not exist!"
        return 1
    fi
    # Test the file size directly: 'wc -l file' prints "N file", which never
    # equalled "0", so this check used to be dead. Logged rather than echoed
    # because daemon mode reaches it on every poll of an empty source.
    if [ ! -s "$LISTOFITEMS" ]
    then
        log DEBUG "$LISTOFITEMS exists but is empty."
        return 1
    fi
    INPUTFILES=`list_all_input_items`
    oldIFS=$IFS # save the field separator
    IFS=$'\n' # new field separator, the end of line
    log DEBUG "Now removing processed items from input."
    for x in $INPUTFILES
    do
        FILE_IS_PROCESSED=0
        for y in $PROCESSED_ITEMS
        do
            if [ "$y" = "$x" ]
            then
                FILE_IS_PROCESSED=1
            fi
        done
        if [ "$FILE_IS_PROCESSED" = "0" ]
        then
            log DEBUG "ITEM $x is not processed."
            unprocessed_stack_push "$x"
        else
            log DEBUG "ITEM $x is already processed!."
        fi
    done
    IFS="$oldIFS"
    echo "$UNPROCESSED_ITEMS" > "$LISTOFITEMS"
}
get_all_items () {
    #
    # (Re)build LISTOFITEMS, the list of items to process. Sources, in
    # order: the remote or local SRC_DIR (recursive or only the top
    # level), INPUT_FILE, or stdin when INPUT_FILE is "-". Optionally
    # shuffles the list, drops already-processed items and sets
    # SIZE_OF_INPUT. Resets GLOBAL_COUNTER to 1. Exits when the input is
    # missing or, outside daemon mode, empty.
    #
    local use_input_lock=0
    if [ "$DAEMON" == "1" ] && [ "$INOTIFY" = "0" ] && [ "$ENABLE_INPUT_LOCK" = "1" ]
    then
        use_input_lock=1
        get_input_lock
    fi
    GLOBAL_COUNTER=1
    if [ -e "$LISTOFITEMS" ]
    then
        rm "$LISTOFITEMS"
    fi
    count=0
    if [ -z "$INPUT_FILE" ]
    then
        # -maxdepth 1 is understood by GNU and BSD find alike; the former
        # '-depth 1' is BSD-only and makes GNU find fail.
        if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a node?"
        then
            if [ "$RECURSION" == "1" ]
            then
                exec_cmd "find $SRC_DIR/ ! -type d" > "$LISTOFITEMS"
                check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
            else
                log DEBUG "Recursion is disabled."
                exec_cmd "find $SRC_DIR/ -maxdepth 1 ! -type d" > "$LISTOFITEMS"
                check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
            fi
        else
            if [ -e "$SRC_DIR" ]
            then
                if [ "$RECURSION" == "1" ]
                then
                    log DEBUG "Recursion is enabled."
                    find "$SRC_DIR"/ ! -type d >> "$LISTOFITEMS"
                    check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
                else
                    log DEBUG "Recursion is disabled."
                    find "$SRC_DIR"/ -maxdepth 1 ! -type d >> "$LISTOFITEMS"
                    check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
                fi
                if [ ! -e "$LISTOFITEMS" ]
                then
                    log ERROR "Local input file is not created, something is wrong. Bug?"
                    set_status "ERROR"
                    cleanup
                    exit 1
                fi
            else
                ITEMS=""
            fi
        fi
    else # Using an input file as the source of our items or STDIN.
        if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?"
        then
            log DEBUG "Running as node, input file has been pushed (hopefully)."
        fi
        if [ ! -e "$INPUT_FILE" ] && [ ! "$INPUT_FILE" == "-" ]
        then
            log ERROR "Input file $INPUT_FILE does not exist."
            set_status "ERROR"
            cleanup
            exit 1
        fi
        if [ ! "$INPUT_FILE" == "-" ]
        then
            cp "$INPUT_FILE" "$LISTOFITEMS"
            check_status "$?" "$FUNCNAME" "Copy of input file failed!"
        else
            log DEBUG "Reading from stdin.."
            # Keep items verbatim (backslashes, blanks, last unterminated line).
            while IFS= read -r LINE || [ -n "$LINE" ]
            do
                printf '%s\n' "$LINE" >> "$LISTOFITEMS"
            done
        fi
        if [ ! -e "$LISTOFITEMS" ]
        then
            log ERROR "Input is empty."
            infanticide
            terminate_listener
            cleanup
            # Nothing to process; continuing would only fail on the
            # missing LISTOFITEMS below.
            exit 1
        fi
    fi
    if [ "$RANDOMIZE" == "1" ] && [ "$MODE" != "status" ]
    then
        log DEBUG "Randomizing input file."
        IFS_BACK="$IFS"
        IFS=$'\n'
        TMP_FILE="$PPSS_DIR/TMP-$RANDOM$RANDOM.txt"
        for i in `cat $LISTOFITEMS`; do echo "$RANDOM $i"; done | sort | sed -E 's/^[0-9]+ //' > "$TMP_FILE"
        mv "$TMP_FILE" "$LISTOFITEMS"
        IFS="$IFS_BACK"
    else
        log DEBUG "Randomisation of input file disabled."
    fi
    remove_processed_items_from_input_file
    # Only release a lock this call actually took; otherwise we could
    # delete a lock held by another process.
    if [ "$use_input_lock" = "1" ]
    then
        release_input_lock
    fi
    SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
    if [ "$SIZE_OF_INPUT" -le "0" ] && [ "$DAEMON" = "0" ]
    then
        log ERROR "Source file/dir seems to be empty."
        set_status "STOPPED"
        cleanup
        exit 1
    fi
}
are_all_items_locked () {
    #
    # Succeed when the number of entries in ITEM_LOCK_DIR (read via
    # exec_cmd) is at least $1, the number of input items. The count is
    # left in NUMBER.
    #
    SIZE="$1"
    NUMBER=$(exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l")
    log DEBUG "$NUMBER of $SIZE items are locked."
    if [ "$NUMBER" -ge "$SIZE" ]
    then
        return 0
    fi
    return 1
}
get_item () {
    #
    # Select the next item to process into ITEM and advance GLOBAL_COUNTER.
    # Empty lines are skipped. Items that are already locked are skipped
    # when locking is active (remote server or LOCAL_LOCKING=1); a newly
    # locked item is downloaded to the node. Returns 1 when stopped or when
    # no items are left.
    #
    # Implemented as a loop. Each skipped item used to trigger a recursive
    # call, so the shell call depth grew with the number of skipped items.
    #
    while true
    do
        check_for_interrupt
        if [ "$STOP" == "1" ]
        then
            log DEBUG "Found stop signal."
            return 1
        fi
        # Return error if list size is unknown or the list is empty.
        if [ -z "$SIZE_OF_INPUT" ]
        then
            log DEBUG "Got no size of input..."
            return 1
        fi
        if [ "$SIZE_OF_INPUT" -le "0" ]
        then
            return 1
        fi
        # Check if all items have been processed.
        if [ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]
        then
            log DEBUG "Counter $GLOBAL_COUNTER is greater than sizeof input $SIZE_OF_INPUT."
            return 1
        fi
        # Quit if all items have been locked.
        if are_all_items_locked "$SIZE_OF_INPUT"
        then
            log DEBUG "All items have been locked."
            return 1
        fi
        log DEBUG "There are still unlocked items."
        ITEM="$(sed -n "${GLOBAL_COUNTER}p" "$LISTOFITEMS")"
        if [ -z "$ITEM" ]
        then
            log DEBUG "Item was emtpy..."
            ((GLOBAL_COUNTER++))
            continue
        fi
        ((GLOBAL_COUNTER++))
        if [ -z "$SSH_SERVER" ] && [ ! "$LOCAL_LOCKING" = "1" ]
        then
            return 0
        fi
        lock_item "$ITEM"
        LOCK="$?"
        if [ ! "$LOCK" = "0" ]
        then
            log DEBUG "Item $ITEM is locked."
            continue
        fi
        log DEBUG "Got lock on $ITEM"
        download_item "$ITEM"
        return 0
    done
}
start_new_worker () {
    #
    # Kick the listener (by writing START_KEY to the FIFO) so it starts
    # another worker. Does nothing when the script is sourced (unit tests).
    #
    if are_we_sourced
    then
        return 0
    fi
    echo "$START_KEY" >> "$FIFO"
}
stop-ppss () {
    # Record the stop time and log the total processing time since START_PPSS.
    local finished_at
    finished_at=$(get_time_in_seconds)
    STOP_PPSS="$finished_at"
    elapsed "$START_PPSS" "$STOP_PPSS"
    log DSPLY "$PROCESSING_TIME"
}
elapsed () {
    #
    # Format the time between $1 and $2 (epoch seconds) as
    # "Total processing time (hh:mm:ss): HH:MM:SS" into PROCESSING_TIME.
    # Also sets ELAPSED, REMAINDER, HOURS, MINS and SECS.
    #
    BEFORE="$1"
    AFTER="$2"
    ELAPSED=$(( AFTER - BEFORE ))
    REMAINDER=$(( ELAPSED % 3600 ))
    HOURS=$(( (ELAPSED - REMAINDER) / 3600 ))
    SECS=$(( REMAINDER % 60 ))
    MINS=$(( (REMAINDER - SECS) / 60 ))
    printf -v PROCESSING_TIME "Total processing time (hh:mm:ss): %02d:%02d:%02d" $HOURS $MINS $SECS
}
mail_on_error () {
    #
    # When MAIL_ON_ERROR=1, mail the item log file $2 to EMAIL.
    # $1 is the failed item; it is accepted for interface compatibility
    # but not included in the mail.
    # Uses a local for the log path. The old code assigned it to the
    # global LOGFILE (the general PPSS log file defined at the top of the
    # script), so the worker's later log output went to the wrong place.
    #
    local item_log="$2"
    if [ "$MAIL_ON_ERROR" = "1" ]
    then
        if mail -s "$HOSTNAME - PPSS: processing failed for item." "$EMAIL" < "$item_log"
        then
            log DEBUG "Error mail sent."
        else
            log ERROR "Sending of error email failed."
        fi
    fi
}
commando () {
    #
    # Process one item ($1); runs as a background worker (see run_command).
    #
    # This starts a chain reaction. The commando executes COMMAND on an
    # item. When finished it calls start_new_worker, which sends a
    # start-key to the FIFO. The listener then picks a new item and runs
    # commando on it. Each commando therefore keeps a queue going until no
    # items are left. Starting one start_new_worker per CPU core yields
    # parallel processing, with one queue per core.
    #
    # Steps: classify the item (physical file vs. virtual, e.g. a URL),
    # skip it if an item log already exists, run COMMAND with output
    # appended to the item log, report failures (FAIL_KEY on the FIFO,
    # optional mail), then upload output and item log and start the next
    # worker.
    #
    ERR_STATE=0
    VIRTUAL=0
    #
    # Test whether the item exists (physical) or not (virtual).
    #
    ITEM="$1"
    if [ "$RECURSION" == "1" ]
    then
        escape_item "$ITEM"
        does_file_exist "$ITEM_ESCAPED"
        ERR_STATE="$?"
    else
        escape_item "$ITEM"
        does_file_exist "$SRC_DIR/$ITEM_ESCAPED"
        ERR_STATE="$?"
    fi
    #
    # With recursion the same file name can occur in different directories,
    # so the output directory must mirror the original directory structure.
    # (OUTPUT_DIR is recomputed further down before it is used.)
    #
    if [ "$ERR_STATE" == "0" ]
    then
        VIRTUAL="0"
        if [ "$RECURSION" == "1" ]
        then
            DIR_NAME=`dirname "$ITEM"`
            ITEM_NO_PATH=`basename "$ITEM"`
            OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$DIR_NAME"
        else
            DIR_NAME="$SRC_DIR"
            ITEM_NO_PATH="$ITEM"
            OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
        fi
    else
        VIRTUAL="1"
        DIR_NAME=""
        ITEM_NO_PATH="$ITEM"
        escape_item "$ITEM_NO_PATH"
        OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
    fi
    OUTPUT_FILE="$ITEM_NO_PATH"
    #
    # Decide whether the item was transferred from the server to the node
    # or is processed in place (e.g. an NFS / SMB mount).
    #
    if [ "$DOWNLOAD_TO_NODE" == "0" ]
    then
        if [ "$VIRTUAL" == "1" ]
        then
            log DEBUG "Item is virtual, thus not downloading."
        else
            log DEBUG "Using item straight from the server, no copy."
            if [ "$RECURSION" == "0" ]
            then
                ITEM="$SRC_DIR/$ITEM"
            else
                ITEM="$ITEM"
            fi
        fi
    else
        if [ "$RECURSION" == "1" ]
        then
            ITEM="$PPSS_LOCAL_TMPDIR/$ITEM"
        else
            ITEM="$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH"
        fi
    fi
    #
    # Name of the log file containing the output of the command.
    #
    if [ "$USE_MD5" == "1" ]
    then
        LOG_FILE_NAME=`echo "$ITEM" | $MD5 | awk '{ print $1 }'`
    else
        LOG_FILE_NAME=`echo "$ITEM" | sed s/[^[:alnum:]]/_/g`
    fi
    ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"
    if [ -e "$ITEM_LOG_FILE" ] && [ "$DISABLE_SKIPPING" = "0" ]
    then
        log DEBUG "Item is already processed, skipping..."
        start_new_worker
        return 0
    fi
    #
    # Output directory that will contain the output of the command.
    # Example: when converting wav to mp3, the mp3 is put in this directory.
    #
    if [ "$VIRTUAL" == "0" ]
    then
        if [ "$RECURSION" == "1" ]
        then
            OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$DIR_NAME"/
        else
            OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH"
        fi
    else
        #
        # A virtual item can contain special characters, so the plain
        # local output directory is used.
        #
        OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
    fi
    #
    # Create the local output directory.
    #
    if [ ! -z "$OUTPUT_DIR" ]
    then
        log DEBUG "Local output dir is $OUTPUT_DIR"
        mkdir -p "$OUTPUT_DIR"
    fi
    ERROR=""
    #
    # Item log file header.
    #
    DATE=`date +%b\ %d\ %H:%M:%S`
    echo "===== PPSS Item Log File =====" > "$ITEM_LOG_FILE"
    echo -e "Host:\t\t$HOSTNAME" >> "$ITEM_LOG_FILE"
    echo -e "Process:\t$PID" >> "$ITEM_LOG_FILE"
    echo -e "Item:\t\t$ITEM" >> "$ITEM_LOG_FILE"
    echo -e "Start date:\t$DATE" >> "$ITEM_LOG_FILE"
    echo -e "" >> "$ITEM_LOG_FILE"
    #-->#
    #--># The actual execution of the command as specified by
    #--># the -c option. If COMMAND references $ITEM itself (any case),
    #--># it is evaluated as-is; otherwise the quoted item is appended.
    #-->#
    BEFORE=`get_time_in_seconds`
    if printf '%s\n' "$COMMAND" | grep -F -i -q -- '$ITEM'
    then
        eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1
        ERROR="$?"
    else
        eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1'
        ERROR="$?"
    fi
    AFTER=`get_time_in_seconds`
    echo -e "" >> "$ITEM_LOG_FILE"
    # Some error logging. Success or fail.
    if [ ! "$ERROR" == "0" ]
    then
        mail_on_error "$ITEM" "$ITEM_LOG_FILE"
        log DEBUG "Processing Item $ITEM failed."
        echo "$FAIL_KEY" >> "$FIFO"
        echo -e "Status:\t\tFAILURE" >> "$ITEM_LOG_FILE"
    else
        echo -e "Status:\t\tSUCCESS" >> "$ITEM_LOG_FILE"
    fi
    #
    # If the item was downloaded to this node, remove the local copy after
    # processing so it does not fill up disk space.
    #
    if [ "$DOWNLOAD_TO_NODE" == "1" ]
    then
        if [ -e "$ITEM" ]
        then
            rm -rf "$ITEM"
        else
            log DEBUG "There is no local file to remove.. strange..."
        fi
    fi
    #
    # Remote output dir for this item.
    #
    escape_item "$DIR_NAME"
    ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED"
    if [ "$DOWNLOAD_TO_NODE" == "0" ]
    then
        log DEBUG "Download to node is disabled."
    else
        if [ "$DIR_NAME" == "." ]
        then
            DIR_NAME=""
        fi
    fi
    #
    # Upload the output back to the server.
    #
    upload_item "$OUTPUT_DIR" "$DIR_NAME"
    #
    # Finish the item log and upload it to the server.
    #
    elapsed "$BEFORE" "$AFTER"
    echo "$PROCESSING_TIME" >> "$ITEM_LOG_FILE"
    echo -e "" >> "$ITEM_LOG_FILE"
    if [ ! -z "$SSH_SERVER" ]
    then
        log DEBUG "Uploading item log file $ITEM_LOG_FILE to master $PPSS_HOME_DIR/$JOB_LOG_DIR"
        scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:$PPSS_HOME_DIR/$JOB_LOG_DIR
        # Compare the status to 0; the old '[ ! "$?" ]' was never true.
        if [ ! "$?" == "0" ]
        then
            log DEBUG "Uploading of item log file failed."
        fi
    fi
    start_new_worker
}
infanticide () {
    log DEBUG "Running $FUNCNAME"
    #
    # Kill the processes spawned by this PPSS instance (run on ctrl+c or
    # on a kill event) so no children keep running after the parent is gone.
    #
    # Candidates are processes whose pid, process group or parent pid is
    # exactly $PID. Of those, the master itself and processes whose parent
    # is $PID or 1 are left alone; everything else is killed.
    #
    # The exact column match replaces 'grep $PID', a substring match that
    # also caught unrelated processes (e.g. pid 1234 when PID is 123).
    #
    PROCLIST=$(ps a -o pid,pgid,ppid,command | awk -v p="$PID" '$1 == p || $2 == p || $3 == p')
    while read -r MYPID _ MYPPID _
    do
        if [ -z "$MYPID" ]
        then
            continue
        fi
        if [ ! "$MYPPID" == "$PID" ] && [ ! "$MYPPID" == "1" ]
        then
            if [ ! "$MYPID" == "$PID" ]
            then
                log DEBUG "Killing process $MYPID"
                kill "$MYPID" >> /dev/null 2>&1
            else
                log DEBUG "Not killing master process..$MYPID.."
            fi
        else
            log DEBUG "Not killing listener process. $MYPID.."
        fi
    done <<< "$PROCLIST"
}
run_command () {
    #
    # Start a background commando worker for item $1, or for the item
    # popped from STACK when $1 is empty, unless MAX_NO_OF_RUNNING_JOBS
    # workers are already active. Empty items and directories are skipped.
    # A started item is appended to LIST_OF_PROCESSED_ITEMS and its worker
    # pid to PIDS.
    #
    INPUT="$1"
    log DEBUG "Current active workers is $ACTIVE_WORKERS"
    if ! [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
    then
        log DEBUG "Maximum number of workers are bussy, no more additional workers..."
        return
    fi
    if [ -z "$INPUT" ]
    then
        stack_pop
        INPUT="$REGISTER"
    fi
    log INFO "Now processing $INPUT"
    if [ -z "$INPUT" ] || [ -d "$INPUT" ]
    then
        log DEBUG "Item is a directory or is empty."
        return 0
    fi
    commando "$INPUT" &
    MYPID="$!"
    disown
    PIDS="$PIDS $MYPID"
    ACTIVE_WORKERS=$(( ACTIVE_WORKERS + 1 ))
    log DEBUG "Increasing active workers to $ACTIVE_WORKERS"
    echo "$INPUT" >> "$LIST_OF_PROCESSED_ITEMS"
    return 0
}
display_jobs_remaining () {
    # Show how many jobs are still running (not in quiet mode).
    if [ ! "$QUIET" == "0" ]
    then
        return 0
    fi
    if [ "$ACTIVE_WORKERS" == "1" ]
    then
        log PRCNT "One job is remaining. "
    else
        log PRCNT "$((ACTIVE_WORKERS)) jobs are remaining. "
    fi
}
show_eta () {
    #
    # On every 5th item, print an estimated completion time. The estimate
    # is a linear extrapolation of the average time per finished item since
    # START_PPSS. Silent in quiet mode and until enough items have finished.
    #
    CURRENT_PROCESSED=$((GLOBAL_COUNTER-MAX_NO_OF_RUNNING_JOBS))
    TOTAL="$SIZE_OF_INPUT"
    START_TIME=$START_PPSS
    NOW=$(get_time_in_seconds)
    MODULO=$((GLOBAL_COUNTER % 5 ))
    if [ "$QUIET" = "1" ] || [ "$CURRENT_PROCESSED" -le "0" ] || [ ! "$MODULO" = "0" ]
    then
        return 0
    fi
    RUNNING_TIME=$((NOW-START_TIME))
    if [ "$RUNNING_TIME" -gt "0" ] && [ "$CURRENT_PROCESSED" -gt "$MAX_NO_OF_RUNNING_JOBS" ]
    then
        TIME_PER_ITEM=$(( RUNNING_TIME / ( CURRENT_PROCESSED - MAX_NO_OF_RUNNING_JOBS ) ))
        log DEBUG "Time per item is $TIME_PER_ITEM seconds."
        TOTAL_TIME=$(( TIME_PER_ITEM * SIZE_OF_INPUT + TIME_PER_ITEM ))
        TOTAL_TIME_IN_SECONDS=$(( START_TIME + TOTAL_TIME ))
        case "$ARCH" in
            Darwin)
                DATE=$(date -r $TOTAL_TIME_IN_SECONDS)
                ;;
            *)
                DATE=$(date -d @$TOTAL_TIME_IN_SECONDS)
                ;;
        esac
        echo
        log DSPLY "ETA: $DATE"
        # Move the cursor back up over the ETA lines.
        echo -en "\033[2A"
    fi
}
display_progress () {
    #
    # Outside daemon mode, refresh the progress line (percentage, processed,
    # failed) based on GLOBAL_COUNTER and the current size of LISTOFITEMS.
    # Sets SIZE_OF_INPUT and PERCENT, and FINISHED=1 at 100%.
    #
    if [ "$DAEMON" = "0" ]
    then
        SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
        # An empty or unreadable input list would make the percentage
        # calculation below divide by zero.
        if [ -z "$SIZE_OF_INPUT" ] || [ "$SIZE_OF_INPUT" -le "0" ]
        then
            return 0
        fi
        GC=0
        if [ ! "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]
        then
            GC="$GLOBAL_COUNTER"
        else
            GC="$SIZE_OF_INPUT"
        fi
        PERCENT=$((100 * $GC / $SIZE_OF_INPUT ))
        if [ ! "$ACTIVE_WORKERS" == "0" ] # && [ "$FINISHED" == "0" ]
        then
            if [ "$QUIET" == "0" ]
            then
                log PRCNT "$PERCENT% complete. Processed $GC of $SIZE_OF_INPUT. Failed $FAILED_ITEMS_COUNTER/$SIZE_OF_INPUT."
                show_eta
            elif [ "$DAEMON" == "0" ]
            then
                echo -en "\r$PERCENT% --"
            fi
            if [ "$PERCENT" == "100" ]
            then
                if [ "$QUIET" == "1" ]
                then
                    echo
                fi
                FINISHED=1
            fi
        fi
    fi
}
terminate_listener () {
    #
    # Shut the listener down: stop the SSH master connection (if any),
    # mark the node STOPPED with the failed count $1, print the summary,
    # optionally mail EMAIL, and finally write the failed count to
    # FIFO_LISTENER, where the main process reads it as its exit code.
    #
    GLOBAL_FAILED_COUNTER="$1"
    log DEBUG "Running $FUNCNAME"
    if [ ! -z "$SSH_MASTER_PID" ]
    then
        kill "$SSH_MASTER_PID"
    else
        log DEBUG "SSH master PID is empty."
    fi
    set_status "STOPPED" "$GLOBAL_FAILED_COUNTER"
    log DEBUG "Listener stopped."
    if [ ! "$PERCENT" == "100" ]
    then
        echo
        stop-ppss
        log DSPLY "$FAILED_ITEMS_COUNTER failed items."
        log DSPLY "Finished. Consult $JOB_LOG_DIR for job output."
    else
        echo
        stop-ppss
        log DSPLY "Finished. Consult $JOB_LOG_DIR for job output."
    fi
    if [ "$QUIET" == "1" ]
    then
        echo
    fi
    if [ ! -z "$EMAIL" ]
    then
        echo "PPSS job finished." | mail -s "$HOSTNAME - PPSS has finished." "$EMAIL"
        if [ ! "$?" = "0" ]
        then
            log ERROR "Sending of status mail failed."
        fi
    fi
    echo "$GLOBAL_FAILED_COUNTER" >> "$FIFO_LISTENER"
}
inotify_listener () {
    #
    # Watch SRC_DIR with inotifywait (monitor mode, 'close' events) and
    # write the path of every closed non-directory entry to the FIFO.
    #
    local path
    inotifywait "$SRC_DIR" -m -e close -q --format '%w%f' |
        while read -r path
        do
            [ -d "$path" ] || echo "$path" > "$FIFO"
        done
}
is_item_unprocessed () {
    #
    # Succeed (return 0) when item $1 is NOT a line in PROCESSED_ITEMS.
    # STATUS holds the result (0 = unprocessed, 1 = processed).
    # Returns 1 for an empty argument.
    #
    # Whole lines are compared. The old word-split loop broke items
    # containing spaces into pieces, so such items never matched and were
    # queued again on every daemon poll.
    #
    local nl=$'\n'
    VAR="$1"
    STATUS=0
    if [ -z "$VAR" ]
    then
        log DEBUG "$FUNCNAME: something is wrong, no argument received."
        return 1
    fi
    case "$nl$PROCESSED_ITEMS$nl" in
        *"$nl$VAR$nl"*)
            STATUS=1
            ;;
    esac
    log DEBUG "Is item $VAR unprocessed: $STATUS"
    return $STATUS
}
is_item_file_and_unmodified () {
    #
    # Succeed when item $1 is either not a file (nothing to wait for) or a
    # file whose age according to $STAT exceeds DAEMON_FILE_AGE seconds.
    # Returns 1 for files that are still too young (possibly being written).
    #
    ITEM="$1"
    if [ ! -e "$ITEM" ]
    then
        log DEBUG "$FUNCNAME: file does not exist."
        return 0
    fi
    NOW=$(date +%s)
    FILEDATE=$($STAT "$ITEM")
    ELAPSED="$(expr $NOW - $FILEDATE)"
    if [ "$ELAPSED" -gt "$DAEMON_FILE_AGE" ]
    then
        log DEBUG "$FUNCNAME File $ITEM is aged $ELAPSED"
        return 0
    fi
    log DEBUG "$FUNCNAME File $ITEM too young $ELAPSED"
    return 1
}
process_item_as_daemon () {
    #
    # Daemon mode: queue item $1 for processing if it is new. Items that
    # are ready are sent to the FIFO and remembered as processed; items
    # that are still too young are pushed on STACK for a later retry.
    #
    ITEM="$1"
    if ! is_item_unprocessed "$ITEM"
    then
        return 0
    fi
    if ! is_item_file_and_unmodified "$ITEM"
    then
        stack_push "$ITEM"
        return
    fi
    echo "$ITEM" >> "$FIFO"
    processed_stack_push "$ITEM"
}
daemon_listener () {
    #
    # Polling daemon (never returns): every DAEMON_POLLING_INTERVAL seconds
    # rebuild the item list, offer each new item, then retry the items that
    # were deferred on STACK.
    # NOTE(review): process_item_as_daemon pushes still-young items back
    # onto STACK, so the second loop may keep re-popping them until they
    # age; confirm this busy wait is intended.
    #
    for (( ; ; ))
    do
        get_all_items
        while get_item
        do
            process_item_as_daemon "$ITEM"
        done
        while stack_pop
        do
            process_item_as_daemon "$REGISTER"
        done
        sleep "$DAEMON_POLLING_INTERVAL"
    done
}
start_daemon_listener () {
    # Run the polling daemon in the background and record its pid in PIDS.
    daemon_listener &
    MYPID=$!
    disown
    PIDS+=" $MYPID"
}
start_inotify_listener () {
    # Reset the worker count, run the inotify watcher in the background
    # and record its pid in PIDS.
    ACTIVE_WORKERS=0
    inotify_listener &
    MYPID=$!
    disown
    PIDS+=" $MYPID"
}
start_as_daemon () {
    # In daemon mode, start the inotify watcher (INOTIFY=1) or the polling
    # listener; otherwise do nothing.
    if [ ! "$DAEMON" = "1" ]
    then
        log DEBUG "Daemon mode disabled."
        return
    fi
    log DEBUG "Daemon mode enabled."
    if [ "$INOTIFY" = "1" ]
    then
        log INFO "Linux inotify enabled."
        start_inotify_listener
    else
        start_daemon_listener
        log INFO "Linux inotify disabled."
    fi
}
decrease_active_workers () {
    # Decrement ACTIVE_WORKERS, never going below zero.
    if [ "$ACTIVE_WORKERS" -gt "0" ]
    then
        ACTIVE_WORKERS=$(( ACTIVE_WORKERS - 1 ))
    fi
}
listen_for_job () {
    #
    # Main event loop. Reads events from the FIFO (fd 42):
    #   START_KEY - a worker slot became free: start the next item, or
    #               stop once no items are left and no workers are active
    #   FAIL_KEY  - an item failed: count it
    #   KILL_KEY  - kill all children and stop
    #   anything else is treated as an item (daemon / inotify mode).
    # ACTIVE_WORKERS starts at MAX_NO_OF_RUNNING_JOBS because every initial
    # start_new_worker call arrives as a START_KEY that decrements it.
    #
    FINISHED=0
    ACTIVE_WORKERS="$MAX_NO_OF_RUNNING_JOBS"
    PIDS=""
    log DEBUG "Listener started."
    start_as_daemon
    while read event <& 42
    do
        log INFO "Current active workers is $ACTIVE_WORKERS"
        case "$event" in
            "$START_KEY")
                decrease_active_workers
                log DEBUG "Got a 'start-key' event"
                if [ ! "$DAEMON" == "0" ]
                then
                    log DEBUG "Daemon mode: a worker finished..."
                    run_command
                elif get_item
                then
                    log DEBUG "Got an item, running command..."
                    run_command "$ITEM"
                else
                    log DEBUG "No more new items..."
                    if [ "$ACTIVE_WORKERS" = "0" ]
                    then
                        display_progress
                        break
                    fi
                    display_jobs_remaining
                fi
                ;;
            "$FAIL_KEY")
                FAILED_ITEMS_COUNTER=$(( FAILED_ITEMS_COUNTER + 1 ))
                log DEBUG "An item failed to process. $FAILED_ITEMS_COUNTER"
                ;;
            "$KILL_KEY")
                infanticide
                break
                ;;
            *)
                log DEBUG "Event is an item."
                stack_push "$event"
                run_command
                ;;
        esac
        display_progress
        set_status "RUNNING" "$FAILED_ITEMS_COUNTER"
    done
    terminate_listener "$FAILED_ITEMS_COUNTER"
}
start_all_workers () {
    #
    # Kick off MAX_NO_OF_RUNNING_JOBS workers, optionally spaced by a random
    # delay of up to MAX_DELAY seconds. In inotify daemon mode no workers
    # are started here (items arrive through the inotify watcher).
    #
    if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ]
    then
        log DSPLY "Starting one (1) single worker."
    else
        log DSPLY "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers."
    fi
    if [ "$DAEMON" == "0" ]
    then
        log DSPLY "---------------------------------------------------------"
    elif [ "$INOTIFY" = "1" ]
    then
        return 0
    fi
    for (( i = 0; i < MAX_NO_OF_RUNNING_JOBS; i++ ))
    do
        start_new_worker
        log DEBUG "Starting worker $i"
        if [ ! "$MAX_DELAY" == "0" ]
        then
            random_delay "$MAX_DELAY"
        fi
    done
}
get_status_of_nodes () {
    #
    # Fetch all node status lines from the server into RESULT_FILE ($1),
    # print one formatted row per node plus a processed/failed total, then
    # delete RESULT_FILE. Adds to the caller's PROCESSED counter and sets
    # FAILED. Returns 1 when the status files cannot be read.
    #
    # Status lines are written by set_status as
    #   "<node> <hostname> <status> <processed> <failed>".
    # Lines are read one at a time instead of setting IFS globally; the old
    # code never restored IFS, which affected the rest of the script.
    #
    RESULT_FILE="$1"
    FAILED=0
    ssh -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER cat "$PPSS_HOME_DIR/$PPSS_NODE_STATUS/*" > "$RESULT_FILE" 2>&1
    if [ ! "$?" == "0" ]
    then
        log DSPLY "PPSS has not been started yet on nodes."
        return 1
    fi
    while IFS= read -r x || [ -n "$x" ]
    do
        if [ -z "$x" ]
        then
            continue
        fi
        read -r IP HOST STATUS RES FAIL _ <<< "$x"
        # Missing counters count as 0. An empty FAIL used to break the
        # arithmetic and shift the table columns.
        RES="${RES:-0}"
        FAIL="${FAIL:-0}"
        PROCESSED=$((PROCESSED+RES))
        FAILED=$((FAILED+FAIL))
        LINE=`echo "$IP $HOST $RES $FAIL $STATUS" | awk '{ printf ("%-16s %-16s % 8s %6s %7s\n",$1,$2,$3,$4,$5) }'`
        log DSPLY "$LINE"
    done < "$RESULT_FILE"
    log DSPLY "---------------------------------------------------------"
    LINE=`echo $PROCESSED $FAILED | awk '{ printf ("Total processed/failed: %18s %6s \n",$1,$2) }'`
    log DSPLY "$LINE"
    rm "$RESULT_FILE"
}
show_status () {
    #
    # 'status' mode: load the config, print overall progress (number of
    # item locks on the server vs. number of items), the node count, and a
    # per-node table via get_status_of_nodes.
    #
    . "$CONFIG"
    if [ ! -z "$SSH_KEY" ]
    then
        SSH_KEY="-i $SSH_KEY"
    fi
    get_all_items
    ITEMS=`wc -l $LISTOFITEMS | awk '{ print $1 }'`
    if [ ! -z "$ITEMS" ] && [ ! "$ITEMS" == "0" ]
    then
        PROCESSED=`exec_cmd "ls -1 $PPSS_HOME_DIR/$ITEM_LOCK_DIR 2>/dev/null | wc -l" 1` 2>&1 >> /dev/null
        # check_status takes (exit code, function name, message).
        check_status "$?" "$FUNCNAME" "Could not get number of processed items."
        TMP_STATUS=$((100 * $PROCESSED / $ITEMS))
        log DSPLY "Status:\t\t$TMP_STATUS percent complete."
    else
        log DSPLY "Status: UNKNOWN - is PPSS deployed on nodes?"
    fi
    if [ ! -z "$NODES_FILE" ]
    then
        TMP_NO=`cat "$NODES_FILE" | wc -l`
        log DSPLY "Nodes:\t $TMP_NO"
    fi
    log DSPLY "Items:\t\t$ITEMS"
    log DSPLY "---------------------------------------------------------"
    HEADER=`echo IP-address Hostname Processed Failed Status | awk '{ printf ("%-16s %-15s % 2s %2s %2s\n",$1,$2,$3,$4,$5) }'`
    log DSPLY "$HEADER"
    log DSPLY "---------------------------------------------------------"
    PROCESSED=0
    get_status_of_nodes "RESULT_FILE"
}
main () {
    #
    # Dispatch on MODE, which is taken from the first command-line argument
    # when it matches one of the MODES at the top of the script. Any other
    # value (including none) takes the default branch, which starts the
    # listener and workers without calling test_server.
    #
    case $MODE in
        node )
            create_working_directory
            test_server
            init_vars
            get_all_items
            # The listener reads events from the FIFO in the background and
            # its output must stay visible, so it is not redirected.
            listen_for_job "$MAX_NO_OF_RUNNING_JOBS" &
            LISTENER_PID=$!
            start_all_workers
            ;;
        start )
            # This option only starts all nodes.
            LOGFILE=/dev/null
            display_header
            if [ ! -e "$NODES_FILE" ]
            then
                log ERROR "File $NODES_FILE with list of nodes does not exist."
                set_status "STOPPED"
                cleanup
                exit 1
            else
                for NODE in `cat $NODES_FILE`
                do
                    start_ppss_on_node "$NODE" &
                done
                # Let every remote start finish (and report) before cleanup.
                wait
            fi
            cleanup
            ;;
        config )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Generating configuration file $CONFIG"
            add_var_to_config PPSS_LOCAL_TMPDIR "$PPSS_LOCAL_TMPDIR"
            add_var_to_config PPSS_LOCAL_OUTPUT "$PPSS_LOCAL_OUTPUT"
            cleanup
            ;;
        stop )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Stopping PPSS on all nodes."
            test_server
            exec_cmd "touch $STOP_SIGNAL"
            cleanup
            ;;
        pause )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Pausing PPSS on all nodes."
            exec_cmd "touch $PAUSE_SIGNAL"
            cleanup
            ;;
        continue )
            LOGFILE=/dev/null
            display_header
            if does_file_exist "$STOP_SIGNAL"
            then
                log DSPLY "Continuing processing, please use $0 start to start PPSS on al nodes."
                exec_cmd "rm -f $STOP_SIGNAL"
            fi
            if does_file_exist "$PAUSE_SIGNAL"
            then
                log DSPLY "Continuing PPSS on all nodes."
                exec_cmd "rm -f $PAUSE_SIGNAL"
            fi
            cleanup
            ;;
        deploy )
            LOGFILE=ppss-deploy.txt
            if [ -e "$LOGFILE" ]
            then
                rm "$LOGFILE"
            fi
            init_ssh_server_socket
            display_header
            log DSPLY "Deploying PPSS on nodes. See ppss-deploy.txt for details."
            deploy_ppss
            wait
            cleanup
            ;;
        status )
            LOGFILE=/dev/null
            display_header
            test_server
            show_status
            cleanup
            ;;
        erase )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Erasing PPSS from all nodes."
            erase_ppss
            cleanup
            ;;
        kill )
            LOGFILE=/dev/null
            for x in `ps ux | grep ppss | grep -v grep | grep bash | awk '{ print $2 }'`
            do
                # This process matches the pattern too; skip it so it is
                # not killed before cleanup has run.
                if [ ! "$x" = "$$" ]
                then
                    kill "$x"
                fi
            done
            cleanup
            ;;
        * )
            create_working_directory
            display_header
            init_vars
            get_all_items
            listen_for_job "$MAX_NO_OF_RUNNING_JOBS" &
            LISTENER_PID=$!
            start_all_workers
            ;;
    esac
}
#
# Entry point. PPSS can be sourced (mainly for unit tests); in that case
# nothing below runs.
#
if ! are_we_sourced
then
    # Parse the command line, then run the selected mode.
    process_arguments "$@"
    main
    # The listener reports its failed-item count through FIFO_LISTENER
    # (opened as fd 43). Relay that count as this script's exit status,
    # after cleanup.
    if [ -e "$FIFO_LISTENER" ] && read event <& 43
    then
        cleanup
        exit "$event"
    fi
fi