First version of distributed PPSS - does not work yet - untested

This commit is contained in:
Louwrentius 2009-01-24 22:24:26 +00:00
parent d58e61c08a
commit 54879fdac6

102
ppss.sh
View File

@ -45,7 +45,7 @@ RUNNING_SIGNAL="$0_is_running"
GLOBAL_LOCK="PPSS-$RANDOM-$RANDOM" GLOBAL_LOCK="PPSS-$RANDOM-$RANDOM"
PAUSE_SIGNAL="pause.txt" PAUSE_SIGNAL="pause.txt"
ARRAY_POINTER_FILE="array-pointer-$RANDOM-$RANDOM" ARRAY_POINTER_FILE="array-pointer-$RANDOM-$RANDOM"
JOB_LOG_DIR="job_log" JOB_LOG_DIR="JOB_LOG"
LOGFILE="ppss-log.txt" LOGFILE="ppss-log.txt"
MAX_DELAY=2 MAX_DELAY=2
PERCENT="0" PERCENT="0"
@ -53,6 +53,9 @@ PID="$$"
LISTENER_PID="" LISTENER_PID=""
IFS_BACKUP="$IFS" IFS_BACKUP="$IFS"
SSH_SERVER="" # Remote server or 'master'.
SSH_KEY="" # SSH key for ssh account.
showusage () { showusage () {
echo echo
@ -74,6 +77,11 @@ showusage () {
echo -e "\t- p [ no of parallel processes ] \tOptional: specifies number of simultaneous processes manually." echo -e "\t- p [ no of parallel processes ] \tOptional: specifies number of simultaneous processes manually."
echo -e "\t- j ( enable hyperthreading ) \t\tOptiona: Enable or disable hyperthreading. Enabled by default." echo -e "\t- j ( enable hyperthreading ) \t\tOptiona: Enable or disable hyperthreading. Enabled by default."
echo echo
echo "Options for distributed usage:"
echo
echo -e "\t- s [ username@server ] \t\tUsername and server domain name or IP-address of 'PPSS server'."
echo -e "\t- k [ SSH key file ] \t\tSSH key file used for connection with 'PPSS server'."
echo
echo -e "Example: encoding some wav files to mp3 using lame:" echo -e "Example: encoding some wav files to mp3 using lame:"
echo echo
echo -e "$0 -c 'lame ' -d /path/to/wavfiles -l logfile -j (wach out for the space in -c)" echo -e "$0 -c 'lame ' -d /path/to/wavfiles -l logfile -j (wach out for the space in -c)"
@ -116,12 +124,12 @@ cleanup () {
rm $FIFO rm $FIFO
fi fi
if [ -e "$ARRAY_POINTER_FILE" ] if [ -e "$ARRAY_POINTER_FILE" ] && [ -z "$SSH_SERVER" ]
then then
rm $ARRAY_POINTER_FILE rm $ARRAY_POINTER_FILE
fi fi
if [ -e "$GLOBAL_LOCK" ] if [ -e "$GLOBAL_LOCK" ] && [ -z "$SSH_SERVER" ]
then then
rm -rf "$GLOBAL_LOCK" rm -rf "$GLOBAL_LOCK"
fi fi
@ -163,7 +171,7 @@ then
fi fi
# Process any command-line options that are specified." # Process any command-line options that are specified."
while getopts ":f:c:l:i:vhp:jd:" OPTIONS while getopts ":c:d:f:i:jhk:l:p:s:v" OPTIONS
do do
case $OPTIONS in case $OPTIONS in
f ) f )
@ -186,12 +194,6 @@ do
;; ;;
c ) c )
COMMAND="$OPTARG" COMMAND="$OPTARG"
if [ -z "$COMMAND" ]
then
echo "ERROR: command not specified."
cleanup
exit 1
fi
;; ;;
h ) h )
@ -203,6 +205,9 @@ do
l ) l )
LOGFILE="$OPTARG" LOGFILE="$OPTARG"
;; ;;
k )
SSH_KEY="-i $OPTARG"
;;
p ) p )
TMP="$OPTARG" TMP="$OPTARG"
if [ ! -z "$TMP" ] if [ ! -z "$TMP" ]
@ -210,6 +215,9 @@ do
MAX_NO_OF_RUNNING_JOBS="$TMP" MAX_NO_OF_RUNNING_JOBS="$TMP"
fi fi
;; ;;
s )
SSH_SERVER="$OPTARG"
;;
v ) v )
echo "" echo ""
@ -223,8 +231,21 @@ do
esac esac
done done
# Init log file # This function makes local and remote operation transparent.
exec_cmd () {
CMD="$1"
if [ ! -z "$SSH_SERVER" ]
then
ssh "$SSH_KEY" "$SSH_SERVER" "$CMD"
else
`"$CMD"`
fi
}
# Init all vars
init_vars () { init_vars () {
if [ -e "$LOGFILE" ] if [ -e "$LOGFILE" ]
@ -242,7 +263,7 @@ init_vars () {
exit 1 exit 1
fi fi
echo 0 > "$ARRAY_POINTER_FILE" exec_cmd "echo 0 > $ARRAY_POINTER_FILE"
FIFO=$(pwd)/fifo-$RANDOM-$RANDOM FIFO=$(pwd)/fifo-$RANDOM-$RANDOM
@ -265,7 +286,7 @@ init_vars () {
log INFO "Job log directory $JOB_lOG_DIR does not exist. Creating." log INFO "Job log directory $JOB_lOG_DIR does not exist. Creating."
mkdir "$JOB_LOG_DIR" mkdir "$JOB_LOG_DIR"
else else
log INFO "Job log directory $JOB_LOG_DIR exists, if it contains logs for items, these items will be skiped." log INFO "Job log directory $JOB_LOG_DIR exists, if it contains logs for items, these items will be skipiped."
fi fi
} }
@ -315,11 +336,24 @@ check_status () {
if [ ! "$ERROR" == "0" ] if [ ! "$ERROR" == "0" ]
then then
log INFO "$FUNCTION - $MESSAGE" log INFO "$FUNCTION - $MESSAGE"
cleanup
exit 1 exit 1
fi fi
} }
test_server () {
if [ ! -z "$SSH_SERVER" ]
then
ssh "$SSH_KEY" "$SSH_SERVER" date >> /dev/null 2>&1
check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached."
else
log DEBUG "No remote server specified, assuming stand-alone mode."
fi
}
get_no_of_cpus () { get_no_of_cpus () {
# Use hyperthreading or not? # Use hyperthreading or not?
@ -403,10 +437,12 @@ random_delay () {
sleep "$NUMBER" sleep "$NUMBER"
} }
global_lock () { global_lock () {
mkdir $GLOBAL_LOCK > /dev/null 2>&1 exec_cmd "mkdir $GLOBAL_LOCK > /dev/null 2>&1"
ERROR=$? ERROR="$?"
if [ ! "$ERROR" == "0" ] if [ ! "$ERROR" == "0" ]
then then
return 1 return 1
@ -433,18 +469,14 @@ get_global_lock () {
release_global_lock () { release_global_lock () {
if [ -e "$GLOBAL_LOCK" ] if [ ! -z "$SSH_SERVER" ]
then then
rm -rf "$GLOBAL_LOCK" ssh "$SSH_KEY" "$SSH_SERVER" rm -rf "$GLOBAL_LOCK"
return 0
else else
log ERROR "$FUNCNAME Lock file $GLOBAL_LOCK not present, something is wrong!" rm -rf "$GLOBAL_LOCK"
return 1
exit
fi fi
} }
are_jobs_running () { are_jobs_running () {
NUMBER_OF_PROCS=`jobs | wc -l` NUMBER_OF_PROCS=`jobs | wc -l`
@ -462,7 +494,13 @@ get_all_items () {
if [ -z "$INPUT_FILE" ] if [ -z "$INPUT_FILE" ]
then then
ITEMS=`ls -1 $SRC_DIR` if [ ! -z "SSH_SERVER" ] # Are we running stand-alone or as a slave?"
then
ITEMS=`ssh "$SSH_KEY" "$SSH_SERVER" ls -1 $SRC_DIR`
check_status "$FUNCNAME" "Could not list files within remote source directory."
else
ITEMS=`ls -1 $SRC_DIR`
fi
IFS=" IFS="
" "
for x in $ITEMS for x in $ITEMS
@ -472,6 +510,12 @@ get_all_items () {
done done
IFS=$IFS_BACKUP IFS=$IFS_BACKUP
else else
if [ ! -z "SSH_SERVER" ] # Are we running stand-alone or as a slave?"
then
scp "$SSH_KEY" "$SSH_SERVER:~/$INPUT_FILE" >> /dev/null 2>&!
check_status "$FUNCNAME" "Could not copy input file."
fi
exec 10<$INPUT_FILE exec 10<$INPUT_FILE
while read LINE <&10 while read LINE <&10
@ -506,8 +550,7 @@ get_item () {
fi fi
# This variable is used to walk thtough all array items. # This variable is used to walk thtough all array items.
ARRAY_POINTER=`cat "$ARRAY_POINTER_FILE"` ARRAY_POINTER=`exec_cmd "cat $ARRAY_POINTER_FILE"`
# Gives a status update on the current progress.. # Gives a status update on the current progress..
PERCENT=`echo "100 * $ARRAY_POINTER / $SIZE_OF_ARRAY" | bc` PERCENT=`echo "100 * $ARRAY_POINTER / $SIZE_OF_ARRAY" | bc`
@ -527,12 +570,12 @@ get_item () {
if [ -z "$ITEM" ] if [ -z "$ITEM" ]
then then
((ARRAY_POINTER++)) ((ARRAY_POINTER++))
echo $ARRAY_POINTER > $ARRAY_POINTER_FILE exec_cmd "echo $ARRAY_POINTER > $ARRAY_POINTER_FILE"
release_global_lock release_global_lock
get_item get_item
else else
((ARRAY_POINTER++)) ((ARRAY_POINTER++))
echo $ARRAY_POINTER > $ARRAY_POINTER_FILE exec_cmd "echo $ARRAY_POINTER > $ARRAY_POINTER_FILE"
release_global_lock release_global_lock
return 0 return 0
fi fi
@ -576,6 +619,10 @@ commando () {
eval "$EXECME" eval "$EXECME"
fi fi
get_global_lock
scp "$SSH_KEY" "$ITEM_LOG_FILE" "$SSH_SERVER:~/$JOB_LOG"
release_global_lock
start_single_worker start_single_worker
return $? return $?
} }
@ -611,6 +658,7 @@ main () {
init_vars init_vars
log DEBUG "---------------- START ---------------------" log DEBUG "---------------- START ---------------------"
log INFO "$SCRIPT_NAME version $SCRIPT_VERSION" log INFO "$SCRIPT_NAME version $SCRIPT_VERSION"
test_server
get_all_items get_all_items
listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & listen_for_job "$MAX_NO_OF_RUNNING_JOBS" &
LISTENER_PID=$! LISTENER_PID=$!