PPSS can now be sourced for unit testing.

This commit is contained in:
Louwrentius 2010-06-06 18:02:05 +00:00
parent 40b3ed8228
commit 708803688f
1 changed files with 433 additions and 359 deletions

302
ppss
View File

@ -20,14 +20,17 @@
# "Patches or other contributions are always welcome!"
#
#
# Handling control-c for a clean shutdown.
#
trap 'kill_process' SIGINT
# Setting some vars.
SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.70"
#
# The first argument to this script can be a mode.
#
MODES="node start config stop pause continue deploy status erase kill ec2"
for x in $MODES
do
@ -39,32 +42,20 @@ do
fi
done
#
# PPSS keeps its working files in $PPSS_DIR. A different location can be
# chosen before starting PPSS with
#   export PPSS_DIR=/path/to/workingdir
# When the variable is unset or empty the default "ppss_dir" is used.
#
: "${PPSS_DIR:=ppss_dir}"
#
# Print the current time in seconds (the value of `date +%s`) on stdout.
# Reads the global ARCH; the value is also left in the global THE_TIME.
#
get_time_in_seconds () {
    case "$ARCH" in
        SunOS)
            #
            # Dirty hack: date on this platform does not support +%s, so the
            # value is picked from the time line that truss reports for date.
            #
            THE_TIME=$(truss /usr/bin/date 2>&1 | grep ^time | awk '{ print $3 }')
            ;;
        *)
            THE_TIME=$(date +%s)
            ;;
    esac
    echo "$THE_TIME"
}
#
# Identity of this PPSS instance: host, platform, how the script was
# invoked ($0) and the process id of this shell.
#
CONFIG=""
HOSTNAME="$(hostname)"
ARCH="$(uname)"
PPSS_HOME_DIR="ppss-home"
SOURCED="$0"
PID="$$"
# Global lock file used by local PPSS instance.
GLOBAL_LOCK="${PPSS_DIR}/PPSS-GLOBAL-LOCK-${PID}"
@ -88,7 +79,7 @@ PROCESSORS=""
STOP_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to the listener to stop.
KILL_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill
RECURSION="1" # all running processes.
START_PPSS=`get_time_in_seconds`
START_PPSS=""
STOP_PPSS=""
SIZE_OF_INPUT=""
@ -120,19 +111,6 @@ ITEM_ESCAPED=""
NODE_STATUS="$PPSS_DIR/status.txt"
DAEMON=0
#
# Pick the MD5 command line that belongs to this platform. On a platform
# that is not listed, MD5 keeps whatever value it already had.
#
case "$ARCH" in
    Darwin|FreeBSD) MD5=md5 ;;
    SunOS)          MD5="digest -a md5" ;;
    Linux)          MD5=md5sum ;;
    *)              MD5=$MD5 ;;
esac
showusage_short () {
@ -208,12 +186,6 @@ showusage_normal () {
echo
}
#
# Called without any argument: show the short usage text and stop.
#
case "$#" in
    0)
        showusage_short
        exit 1
        ;;
esac
showusage_long () {
echo
@ -359,7 +331,7 @@ kill_process () {
exec_cmd () {
STATUS="0"
STATUS=""
CMD="$1"
NOMP="$2" # Disable multiplexing.
@ -378,9 +350,9 @@ exec_cmd () {
STATUS=$?
fi
else
log DEBUG "LOCAL EXEC"
eval "$CMD"
STATUS=$?
log DEBUG "LOCAL EXEC - status is $STATUS"
fi
return $STATUS
}
@ -391,11 +363,13 @@ does_file_exist () {
# this function makes remote or local checking of existence of items transparent.
#
FILE="$1"
`exec_cmd "ls -1 $FILE" >> /dev/null 2>&1`
if [ "$?" == "0" ]
RES=`exec_cmd "ls -1 $FILE" 2>&1`
if [ "$?" = "0" ]
then
log DEBUG "$FILE does exist - $RES"
return 0
else
log DEBUG "$FILE does not exist - $RES"
return 1
fi
}
@ -405,9 +379,8 @@ check_for_interrupt () {
#
# PPSS can be interrupted with a stop or pause command.
#
does_file_exist "$STOP_SIGNAL"
if [ "$?" == "0" ]
if [ "$?" = "0" ]
then
set_status "STOPPED"
log INFO "STOPPING job. Stop signal found."
@ -416,7 +389,7 @@ check_for_interrupt () {
fi
does_file_exist "$PAUSE_SIGNAL"
if [ "$?" == "0" ]
if [ "$?" = "0" ]
then
set_status "PAUZED"
log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS."
@ -465,13 +438,35 @@ add_var_to_config () {
fi
}
#
# Guard for mandatory option values.
#   $1 - the value to check.
# A non-empty value returns 0. An empty value shows the normal usage text,
# runs cleanup and terminates the script with exit status 1.
#
is_var_empty () {
    [ -n "$1" ] && return 0

    showusage_normal
    cleanup
    exit 1
}
process_arguments () {
#
# Process any command-line options that are specified.
#
if [ "$#" = "0" ]
then
showusage_short
exit 1
fi
while [ $# -gt 0 ]
do
case $1 in
--config|-C )
CONFIG="$2"
is_var_empty "$CONFIG"
if [ "$MODE" == "config" ]
then
@ -530,11 +525,13 @@ do
--sourcefile|-f )
INPUT_FILE="$2"
is_var_empty "$INPUT_FILE"
add_var_to_config INPUT_FILE "$INPUT_FILE"
shift 2
;;
--sourcedir|-d )
SRC_DIR="$2"
is_var_empty "$SRC_DIR"
add_var_to_config SRC_DIR "$SRC_DIR"
shift 2
;;
@ -578,6 +575,7 @@ do
;;
--command|-c )
COMMAND="$2"
is_var_empty "$COMMAND"
if [ "$MODE" == "config" ]
then
COMMAND=\'$COMMAND\'
@ -623,6 +621,7 @@ do
;;
--key|-k )
SSH_KEY="$2"
is_var_empty "$SSH_KEY"
add_var_to_config SSH_KEY "$SSH_KEY"
if [ ! -z "$SSH_KEY" ]
then
@ -707,14 +706,24 @@ do
esac
done
if [ -z "$SRC_DIR" ] && [ -z "$INPUT_FILE" ]
then
showusage_short
echo
log ERROR "No source file or directory specified with -f or -d."
cleanup
exit 1
fi
if [ "$DAEMON" == "1" ] && [ -z "$SRC_DIR" ]
then
showusage_short
echo
echo "Daemon mode requires an argument to the -d option as a place to put the lock dir."
echo "Read the on-line manual for more information."
exit
exit 1
fi
}
display_header () {
@ -749,6 +758,52 @@ expand_str () {
echo "$STR"
}
#
# Tell whether this file was sourced instead of executed.
# Reads the global SOURCED and treats the values "-bash", "bash" and "dash"
# as the name of a shell that sourced the file.
# Returns 0 when sourced, 1 otherwise.
#
are_we_sourced () {
    case "$SOURCED" in
        -bash|bash|dash)
            log DEBUG "This script is sourced."
            return 0
            ;;
    esac

    log DEBUG "This script is not sourced."
    return 1
}
#
# Print the current time in seconds (the value of `date +%s`) on stdout.
# Reads the global ARCH; the value is also left in the global THE_TIME.
#
get_time_in_seconds () {
    case "$ARCH" in
        SunOS)
            #
            # Dirty hack: date on this platform does not support +%s, so the
            # value is picked from the time line that truss reports for date.
            #
            THE_TIME=$(truss /usr/bin/date 2>&1 | grep ^time | awk '{ print $3 }')
            ;;
        *)
            THE_TIME=$(date +%s)
            ;;
    esac
    echo "$THE_TIME"
}
#
# Select the MD5 command for this platform and verify that it works.
#
# Reads:   ARCH (platform name as printed by uname)
# Writes:  MD5  (command line used to hash data read from stdin)
# Returns: 0 when the selected MD5 command runs successfully,
#          1 when no command is known for the platform or it cannot be run.
#
# On a platform that is not listed, a value already present in MD5 is kept.
#
set_md5 () {

    case "$ARCH" in
        Darwin|FreeBSD) MD5=md5 ;;
        SunOS)          MD5="digest -a md5" ;;
        Linux)          MD5=md5sum ;;
    esac

    #
    # An empty MD5 would turn the probe below into a bare redirection that
    # always succeeds, so it has to be rejected explicitly.
    #
    if [ -z "$MD5" ]
    then
        log ERROR "ERROR - PPSS does not know which MD5 tool to use on platform '$ARCH'."
        return 1
    fi

    #
    # $MD5 is left unquoted on purpose: it may hold a command plus arguments.
    # The exit status of the pipeline is tested directly; a test such as
    # [ ! "$?" ] only checks for an empty string and can never be true.
    #
    if ! echo "test" | $MD5 > /dev/null 2>&1
    then
        log ERROR "ERROR - PPSS requires $MD5. It may not be within the path or installed."
        return 1
    fi

    return 0
}
log () {
#
@ -759,6 +814,15 @@ log () {
MESG="$2"
TYPE_LENGTH=5
#
# Performance hack. Don't go through all the code if not required.
#
if [ "$TYPE" = "DEBUG" ] && [ "$PPSS_DEBUG" == "0" ]
then
return
fi
TYPE_EXP=`expand_str "$TYPE"`
DATE=`date +%b\ %d\ %H:%M:%S`
@ -780,6 +844,7 @@ log () {
if [ "$TYPE" == "DSPLY" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ] && [ "$QUIET" == "0" ]
then
echo -e "$ECHO_MSG"
elif [ "$TYPE" == "ERROR" ] && [ "$QUIET" == "1" ]
then
echo -e "$ECHO_MSG"
@ -794,17 +859,28 @@ log () {
# Init all vars
init_vars () {
echo "test" | $MD5 > /dev/null 2>&1
if [ ! "$?" == "0" ]
then
echo "ERROR - PPSS requires $MD5. It may not be within the path or installed."
fi
#
# Get start time to measure how long PPSS has been running.
#
START_PPSS=`get_time_in_seconds`
#
# Check if MD5(SUM) is present on the system.
#
set_md5
#
# Is PPSS run as a daemon? Then use input locking, which is not required otherwise.
#
if [ "$DAEMON" == "1" ]
then
INPUT_LOCK="$SRC_DIR/INPUT_LOCK"
fi
#
# For some strange reason, this value differs between operating systems due to
# different behaviour of the ps utility across operating systems.
#
if [ "$ARCH" == "Darwin" ]
then
MIN_JOBS=4
@ -813,29 +889,16 @@ init_vars () {
MIN_JOBS=3
fi
if [ -e "$LOGFILE" ]
then
rm $LOGFILE
fi
#
# Create a remote homedir for PPSS
#
does_file_exist "$PPSS_HOME_DIR"
if [ ! "$?" == "0" ] && [ ! -z "$SSH_SERVER" ]
if [ ! "$?" = "0" ] && [ ! -z "$SSH_SERVER" ]
then
log DEBUG "Remote PPSS home directory $PPSS_HOME_DIR does not exist. Creating."
exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_DIR"
fi
display_header
if [ -z "$COMMAND" ]
then
echo
log ERROR "No command specified."
echo
showusage_normal
cleanup
exit 1
fi
echo 1 > $GLOBAL_COUNTER_FILE
@ -876,7 +939,7 @@ init_vars () {
if [ ! -z "$SSH_SERVER" ]
then
does_file_exist "$PPSS_HOME_DIR/$JOB_LOG_DIR"
if [ ! "$?" == "0" ]
if [ ! "$?" = "0" ]
then
log DEBUG "Remote Job log directory $PPSS_HOME_DIR/$JOB_lOG_DIR does not exist. Creating."
exec_cmd "mkdir $PPSS_HOME_DIR/$JOB_LOG_DIR"
@ -894,7 +957,7 @@ init_vars () {
fi
does_file_exist "$ITEM_LOCK_DIR"
if [ ! "$?" == "0" ]
if [ ! "$?" = "0" ]
then
if [ ! -z "$SSH_SERVER" ]
then
@ -903,7 +966,7 @@ init_vars () {
log DEBUG "Creating local item lock dir."
fi
exec_cmd "mkdir $ITEM_LOCK_DIR"
if [ ! "$?" == "0" ]
if [ ! "$?" ]
then
log DEBUG "Failed to create item lock dir."
fi
@ -912,7 +975,7 @@ init_vars () {
if [ ! -z "$SSH_SERVER" ]
then
does_file_exist "$REMOTE_OUTPUT_DIR"
if [ ! "$?" == "0" ]
if [ ! "$?" = "0" ]
then
log DEBUG "Remote output dir $REMOTE_OUTPUT_DIR does not exist."
exec_cmd "mkdir $REMOTE_OUTPUT_DIR"
@ -1180,7 +1243,7 @@ get_no_of_cpus () {
then
if [ "$ARCH" == "Linux" ]
then
NUMBER=`grep ^processor $CPUINFO | wc -l`
NUMBER=`grep -c ^processor $CPUINFO`
got_cpu_info "$?"
elif [ "$ARCH" == "Darwin" ]
@ -1195,12 +1258,12 @@ get_no_of_cpus () {
elif [ "$ARCH" == "SunOS" ]
then
NUMBER=`psrinfo | grep on-line | wc -l`
NUMBER=`psrinfo | grep -c on-line`
got_cpu_info "$?"
else
if [ -e "$CPUINFO" ]
then
NUMBER=`grep ^processor $CPUINFO | wc -l`
NUMBER=`grep -c ^processor $CPUINFO`
got_cpu_info "$?"
fi
fi
@ -1217,7 +1280,7 @@ get_no_of_cpus () {
if [ "$ARCH" == "Linux" ]
then
PHYSICAL=`grep 'physical id' $CPUINFO`
if [ "$?" == "0" ]
if [ "$?" ]
then
PHYSICAL=`grep 'physical id' $CPUINFO | sort | uniq | wc -l`
if [ "$PHYSICAL" == "1" ]
@ -1228,7 +1291,7 @@ get_no_of_cpus () {
fi
TMP=`grep 'core id' $CPUINFO`
if [ "$?" == "0" ]
if [ "$?" ]
then
log DEBUG "Starting job only for each physical core on all physical CPU(s)."
NUMBER=`grep 'core id' $CPUINFO | sort | uniq | wc -l`
@ -1240,7 +1303,7 @@ get_no_of_cpus () {
fi
else
log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus."
NUMBER=`grep ^processor $CPUINFO | wc -l`
NUMBER=`grep -c ^processor $CPUINFO`
got_cpu_info "$?"
fi
elif [ "$ARCH" == "Darwin" ]
@ -1312,7 +1375,7 @@ get_global_lock () {
ERROR="$?"
if [ ! "$ERROR" == "0" ]
then
random_delay $MAX_LOCK_DELAY
#random_delay $MAX_LOCK_DELAY
continue
else
break
@ -1457,24 +1520,16 @@ lock_item () {
ITEM="$1"
LOCK_FILE_NAME=`echo "$ITEM" | $MD5 | awk '{ print $1 }'`
ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME"
log DEBUG "Trying to lock item $ITEM - $ITEM_LOCK_FILE."
exec_cmd "mkdir $ITEM_LOCK_FILE >> /dev/null 2>&1"
ERROR="$?"
if [ "$ERROR" == "$?" ]
then
exec_cmd "touch $ITEM_LOCK_FILE/$HOSTNAME" # Record that item is claimed by node x.
fi
return "$ERROR"
return "$?"
}
get_input_lock () {
while true
do
exec_cmd "mkdir $INPUT_LOCK >> /dev/null 2>&1 "
if [ "$?" == "0" ]
if [ "$?" ]
then
log DEBUG "Input lock is obtained..."
break
@ -1488,7 +1543,7 @@ get_input_lock () {
release_input_lock () {
exec_cmd "rm -rf $INPUT_LOCK"
if [ "$?" == "0" ]
if [ "$?" ]
then
log DEBUG "Input lock was released..."
return 0
@ -1544,6 +1599,7 @@ get_all_items () {
cleanup
exit 1
fi
else
ITEMS=""
fi
@ -1636,8 +1692,13 @@ get_item () {
else
((GLOBAL_COUNTER++))
echo $GLOBAL_COUNTER > $GLOBAL_COUNTER_FILE
if [ "$DISABLE_ITEM_LOCK" == "0" ]
then
lock_item "$ITEM"
if [ ! "$?" == "0" ]
else
log DEBUG "Item lock disabled."
fi
if [ ! "$?" ]
then
log DEBUG "Item $ITEM is locked."
release_global_lock
@ -1701,16 +1762,11 @@ elapsed () {
SECS="$(expr $REMAINDER % 60)"
MINS="$(expr $(expr $REMAINDER - $SECS) / 60)"
RES=`printf 'Total processing time (hh:mm:ss): %02d:%02d:%02d' $HOURS $MINS $SECS`
RES=$(printf "Total processing time (hh:mm:ss): %02d:%02d:%02d" $HOURS $MINS $SECS)
log DSPLY "$RES"
}
commando () {
log DEBUG "-------------------------------------"
if [ "$DAEMON" == "1" ]
then
log INFO "Processing item: $1 in DAEMON MODE"
fi
#
# This function will start a chain reaction of events.
@ -1734,7 +1790,6 @@ commando () {
#
ITEM="$1"
if [ "$RECURSION" == "1" ]
then
escape_item "$ITEM"
@ -1766,11 +1821,7 @@ commando () {
DIR_NAME="$SRC_DIR"
ITEM_NO_PATH="$ITEM"
OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
fi
#
# OUTPUT_DIR can be used in scripts or command lines.
#
else
VIRTUAL="1"
DIR_NAME=""
@ -1781,12 +1832,17 @@ commando () {
OUTPUT_FILE="$ITEM_NO_PATH"
log DEBUG "Processing item: $ITEM"
log DEBUG "ITEM_NO_PATH is $ITEM_NO_PATH"
log DEBUG "Dirname is $DIR_NAME"
log DEBUG "OUTPUT DIR IS $OUTPUT_DIR"
log DEBUG "Virtual is $VIRTUAL"
log DEBUG "OUTPUT FILE is $OUTPUT_FILE"
#
# The following lines should only be enabled for debugging.
#
#log DEBUG "Processing item: $ITEM"
#log DEBUG "ITEM_NO_PATH is $ITEM_NO_PATH"
#log DEBUG "Dirname is $DIR_NAME"
#log DEBUG "OUTPUT DIR IS $OUTPUT_DIR"
#log DEBUG "Virtual is $VIRTUAL"
#log DEBUG "OUTPUT FILE is $OUTPUT_FILE"
#
#
# Decide if an item must be transferred from the server to the node
# or be processed in-place (NFS / SMB mount?)
@ -1818,7 +1874,6 @@ commando () {
#
# Create the log file containing the output of the command.
#
#LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g | sed s/\\ /_/g`
LOG_FILE_NAME=`echo "$ITEM" | $MD5 | awk '{ print $1 }'`
ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"
@ -1839,11 +1894,17 @@ commando () {
# If the item is virtual, the item can contain special characters.
# These characters are stripped from the log file name, so this is used.
#
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$LOG_FILE_NAME"
OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
fi
log DEBUG "Local output dir is $OUTPUT_DIR"
#
# FIXME!
#
if [ "$PPSS_OUTPUT" == "1" ]
then
mkdir -p "$OUTPUT_DIR"
fi
ERROR=""
#
@ -1863,7 +1924,7 @@ commando () {
#
BEFORE=`get_time_in_seconds`
TMP=`echo $COMMAND | grep -i '$ITEM'`
if [ "$?" == "0" ]
if [ "$?" ]
then
eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1
ERROR="$?"
@ -1933,7 +1994,7 @@ commando () {
then
log DEBUG "Uploading item log file $ITEM_LOG_FILE to master $PPSS_HOME_DIR/$JOB_LOG_DIR"
scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:$PPSS_HOME_DIR/$JOB_LOG_DIR
if [ ! "$?" == "0" ]
if [ ! "$?" ]
then
log DEBUG "Uploading of item log file failed."
fi
@ -1983,7 +2044,7 @@ listen_for_job () {
RES=$((MAX_NO_OF_RUNNING_JOBS-DIED))
if [ "$RES" == "1" ] && [ "$QUIET" == "0" ]
then
log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. \n"
log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. "
elif [ "$QUIET" == "0" ]
then
if [ "$DIED" == "1" ]
@ -2004,7 +2065,6 @@ listen_for_job () {
# spawned, although disowned or backgrounded will be killed...
#
PROCLIST=`ps a -o pid,pgid,ppid,command | grep [0-9] | grep $PID | grep -v -i grep`
#echo "$PROCLIST" > proclist.txt
oldIFS=$IFS # save the field separator
IFS=$'\n' # new field separator, the end of line
for x in `echo "$PROCLIST"`
@ -2177,7 +2237,7 @@ show_status () {
then
STATUS=`get_status_of_node "$x" | awk '{ print $2 }'`
RES=`exec_cmd "grep -i $NODE ~/$PPSS_HOME_DIR/$JOB_LOG_DIR/* 2>/dev/null | wc -l " 1`
if [ ! "$?" == "0" ] || [ -z "$RES" ]
if [ ! "$?" ] || [ -z "$RES" ]
then
RES=0
fi
@ -2316,6 +2376,7 @@ main () {
* )
create_working_directory
display_header
init_vars
get_all_items
listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
@ -2324,12 +2385,25 @@ main () {
;;
esac
}
#
# Entry point. Nothing below runs when this file is sourced, so that the
# functions defined above can be loaded on their own (e.g. for unit tests).
#
# When the file is executed:
#   1. process all command-line arguments,
#   2. call main, which sets the whole framework in motion,
#   3. wait until all background processes have finished.
#
are_we_sourced || {
    process_arguments "$@"
    main
    wait
}