Major rework on distributed mode

This commit is contained in:
louwrentius 2011-12-25 01:03:42 +00:00
parent a908682185
commit dbd8f51590
3 changed files with 127 additions and 207 deletions

265
ppss
View File

@ -26,7 +26,7 @@
trap 'kill_process' SIGINT
SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.90"
SCRIPT_VERSION="2.95"
#
# The first argument to this script can be a mode.
@ -151,16 +151,35 @@ showusage_short () {
#
# Print the regular help text: the header, the basic options and
# the basic usage example, in that order.
#
# The return status is that of the last section printed.
#
showusage_normal () {
    local help_section
    for help_section in showusage_head showusage_basic showusage_basic_example
    do
        "$help_section"
    done
}
#
# Print the extended help text: the extended header, the basic
# options and the extended body (options for distributed execution),
# in that order.
#
# The return status is that of the last section printed.
#
showusage_long () {
    local help_section
    for help_section in showusage_extended_head showusage_basic showusage_extended_body
    do
        "$help_section"
    done
}
#
# Print the banner and the short introduction that open the regular
# (stand-alone) help text.
#
# Reads the globals SCRIPT_NAME and SCRIPT_VERSION for the banner line.
# Writes to stdout only.
#
showusage_head () {
    echo
    echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
    echo
    # The introduction is a single sentence spread over three lines;
    # its first line must be printed exactly once.
    echo "PPSS is a Bash shell script that executes commands in parallel on a set"
    echo "of items, such as files in a directory, or lines in a file. The purpose"
    echo "of PPSS is to make it simple to benefit from multiple CPUs or CPU cores."
    echo
    echo "This short summary only discusses options for stand-alone mode. For a"
    echo "full listing of all options, run PPSS with the options --help"
    echo
}
showusage_basic () {
echo "Usage $0 [ options ]"
echo
echo -e "--command | -c Command to execute. Syntax: '<command> ' including the single quotes."
@ -205,6 +224,10 @@ showusage_normal () {
echo -e "--debug Enable debugging output to the |P|P|S|S| log file."
echo
echo -e "--help Extended help, including options for distributed mode."
}
showusage_basic_example () {
echo
echo -e "Example: encoding some wav files to mp3 using lame:"
echo
@ -214,12 +237,12 @@ showusage_normal () {
echo
}
showusage_long () {
showusage_extended_head () {
echo
echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
echo
echo "|P|P|S|S| is a Bash shell script that executes commands in parallel on a set "
echo "PPSS is a Bash shell script that executes commands in parallel on a set "
echo "of items, such as files in a directory, or lines in a file."
echo
echo "Usage: $0 [ MODE ] [ options ]"
@ -235,66 +258,10 @@ showusage_long () {
echo " stop Stopping PPSS on all nodes."
echo " continue Continuing PPSS on all nodes."
echo " node Running PPSS as a node, requires additional options."
echo
echo "Options are:"
echo
echo -e "--command | -c Command to execute. Syntax: '<command> ' including the single quotes."
echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item "
echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'."
echo
echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files"
echo -e " are fed as an argument to the command that has been specified with -c."
echo
echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the"
echo -e " command that has been specified with -c. Instead of a file, stdin can"
echo -e " be specified like \"-f -\" in order to 'pipe' items to ppss."
echo -e " Example: cat file | ppss -f - -c 'echo '"
echo
echo -e "--config | -C If the mode is config, a config file with the specified name will be"
echo -e " generated based on all the options specified. In the other modes".
echo -e " this option will result in PPSS reading the config file and start"
echo -e " processing items based on the settings of this file."
echo
echo -e "--disable-ht | -j Disable hyper threading. Is enabled by default."
echo
echo -e "--log | -l Sets the name of the log file. The default is ppss-log-<pid>.txt."
echo
echo -e "--processes | -p Start the specified number of processes. Ignore the number of available"
echo -e " CPUs."
echo
echo -e "--quiet | -q Shows no output except for a progress indication using percents."
echo
echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread"
echo -e " the load. The delay is only used at the start of all 'threads'."
echo
echo -e "--daemon Do not exit after items are professed, but keep looking for new items"
echo -e " and process them. Read the manual how to use this!"
echo
echo -e "--interval Specifies the polling interval when running in daemon mode. Polls every"
echo -e " x seconds for new items to process."
echo
echo -e "--file-age When not using inotify, specify how many seconds must have passed before"
echo -e " a file may be processed to prevent files being processed while being "
echo -e " written to."
echo
echo -e "--disable-inotify If for some reason, inotify must not be used, use this option to disable"
echo -e " usage of inotify. Regular polling will be used."
echo
echo -e "--enable-input-lock When PPSS is run in daemon mode, create a directory INPUT_LOCK to"
echo -e " signal that items are processed and may not be touched by PPSS."
echo -e " Once this directory is removed, PPSS will start processing items."
echo
echo -e "--no-recursion|-r By default, recursion of directories is enabled when the -d option is "
echo -e " used. If this is not prefered, this can be disabled with this option "
echo -e " Only files within the specified directory will be processed."
echo
echo -e "--md5|-M Use MD5 to create unique file names for locking and log file names."
echo -e " PPSS strips al non [:alnum:] characters of an item string and this may"
echo -e " cause collisions. String ABC!@# and ABC^&* will become both ABC___"
echo
echo -e "--debug Enable debugging output to the |P|P|S|S| log file."
echo
echo
}
showusage_extended_body () {
echo
echo -e "The following options are used for distributed execution of PPSS."
echo
@ -339,8 +306,8 @@ showusage_long () {
echo -e "--script | -S Script to run on the node. PPSS must copy this script to the node."
echo
echo -e "--randomize | -R Randomise which items to process by the client in distributed mode."
echo -e " This makes sure that with many nodes, some clients spend their time"
echo -e " trying to get a lock on an item."
echo -e " This makes sure that with many nodes, it is prevented that some"
echo -e " clients spend all their time trying to get a lock on an item."
echo
echo -e "--no-check Do not check if items are already processed before processing them."
echo -e " This is usefull with a large number of items, since performing this"
@ -499,7 +466,7 @@ is_var_empty () {
if [ -z "$1" ]
then
showusage_normal
showusage_
cleanup
exit 1
fi
@ -895,11 +862,6 @@ get_time_in_seconds () {
set_md5 () {
if [ "$USE_MD5" == "1" ]
then
log DEBUG "MD5 is used."
case $ARCH in
"Darwin") MD5=md5 ;;
"FreeBSD") MD5=md5 ;;
@ -915,9 +877,6 @@ set_md5 () {
else
return 0
fi
else
log DEBUG "MD5 is not used."
fi
}
set_stat () {
@ -1066,7 +1025,7 @@ init_vars () {
log DSPLY "CPU: $MODEL $SPEED"
elif [ "$ARCH" == "SunOS" ]
then
CPU=`psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1`
CPU=`/usr/sbin/psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1`
log DSPLY "$CPU"
else
log DSPLY "CPU: Cannot determine. Provide a patch for your arch!"
@ -1143,11 +1102,9 @@ init_vars () {
upload_status () {
#log DEBUG "scp $SSH_OPTS $SSH_KEY $NODE_STATUS_FILE $USER@$SSH_SERVER:$PPSS_HOME_DIR/$PPSS_NODE_STATUS/"
# scp -v $SSH_OPTS $SSH_KEY $NODE_STATUS_FILE $USER@$SSH_SERVER:$PPSS_HOME_DIR/$PPSS_NODE_STATUS/ >> scp.tmp 2>&1
if [ -e "$NODE_STATUS_FILE" ]
then
scp -vv -o GlobalKnownHostsFile=./known_hosts -i ppss-key.dsa $NODE_STATUS_FILE $USER@$SSH_SERVER:$PPSS_HOME_DIR/$PPSS_NODE_STATUS/ >> scp.tmp 2>&1
scp -q -o GlobalKnownHostsFile=./known_hosts -i ppss-key.dsa $NODE_STATUS_FILE $USER@$SSH_SERVER:$PPSS_HOME_DIR/$PPSS_NODE_STATUS/
if [ "$?" == "0" ]
then
log DEBUG "Uploaded status to server ok."
@ -1642,6 +1599,7 @@ download_item () {
ITEM="$1"
VIRTUAL="0"
ERR_STATE="0"
HASH=""
if [ "$RECURSION" = "1" ]
then
@ -1649,7 +1607,8 @@ download_item () {
does_file_exist "$ITEM_ESCAPED"
ERR_STATE="$?"
DOWNLOAD_ITEM="$ITEM"
LOCAL_DIR=`dirname "$DOWNLOAD_ITEM"`
HASH=`echo "$DOWNLOAD_ITEM" | $MD5 | awk '{ print $1 }'`
LOCAL_DIR="$HASH"
else
escape_item "$ITEM"
does_file_exist "$SRC_DIR/$ITEM_ESCAPED"
@ -1678,13 +1637,12 @@ download_item () {
mkdir -p "$PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
log DEBUG "$SSH_SERVER:$ITEM_ESCAPED $PPSS_LOCAL_TMPDIR/$LOCAL_DIR"
scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" ./$PPSS_LOCAL_TMPDIR/"$LOCAL_DIR"
log DEBUG "Exit code of remote transfer is $?"
else
escape_item "$DOWNLOAD_ITEM"
log DEBUG "$SSH_SERVER:$ITEM_ESCAPED $PPSS_LOCAL_TMPDIR"
scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" $PPSS_LOCAL_TMPDIR
log DEBUG "Exit code of remote transfer is $?"
fi
log DEBUG "Exit code of remote download transfer is $?"
else
cp "$ITEM" $PPSS_LOCAL_TMPDIR
log DEBUG "Exit code of local transfer is $?"
@ -1701,37 +1659,52 @@ upload_item () {
log DEBUG "Upload to server is disabled."
return 1
fi
log DEBUG "Upload from node to server - $1 to $2"
OUTPUT_ITEM="$1"
ITEMDIR="$2"
LOCAL_SRC_DIR="$1" # the local source directory where the file resides
REMOTE_DEST_DIR="$2" # to recreate the directory structure
DESTINATION=""
log DEBUG "Uploading item $OUTPUT_ITEM."
if [ "$SECURE_COPY" == "1" ]
then
if [ "$RECURSION" = "1" ]
then
escape_item "$REMOTE_OUTPUT_DIR$ITEMDIR"
log DEBUG "Recursive copy is enabled."
log DEBUG "REMOTE_OUTPUT_DIR = $REMOTE_OUTPUT_DIR"
log DEBUG "REMOTE DEST DIR = $REMOTE_DEST_DIR"
TMP_DESTINATION="$REMOTE_OUTPUT_DIR/$REMOTE_DEST_DIR"
else
escape_item "$REMOTE_OUTPUT_DIR"
log DEBUG "Recursive copy is disabled."
TMP_DESTINATION="$REMOTE_OUTPUT_DIR"
fi
DIR_ESCAPED="$ITEM_ESCAPED"
exec_cmd "mkdir -p $DIR_ESCAPED"
scp -q $SSH_OPTS $SSH_KEY "$OUTPUT_ITEM"/* $USER@$SSH_SERVER:"$DIR_ESCAPED"
exec_cmd "mkdir -p \"$TMP_DESTINATION\""
escape_item "$TMP_DESTINATION"
DESTINATION="$ITEM_ESCAPED"
log DEBUG "Destination = $DESTINATION"
log DEBUG "Uploading item $LOCAL_SRC_DIR."
if [ "$SECURE_COPY" == "1" ]
then
log DEBUG "Secure copy is enabled."
log DEBUG "Copy contents of $LOCAL_SRC_DIR to server at $DESTINATION"
log DEBUG "scp $SSH_OPTS $SSH_KEY $LOCAL_SRC_DIR/* $USER@$SSH_SERVER:$DESTINATION"
scp $SSH_OPTS $SSH_KEY $LOCAL_SRC_DIR/* $USER@$SSH_SERVER:"$DESTINATION" >> scplog.txt 2>&1
ERROR="$?"
if [ ! "$ERROR" == "0" ]
then
log ERROR "Uploading of $OUTPUT_ITEM via SCP failed."
log ERROR "Uploading of $LOCAL_SRC_DIR via SCP to $DIR_ESCAPED failed."
else
log DEBUG "Upload of item $OUTPUT_ITEM success"
rm -rf ./"$OUTPUT_ITEM"
log DEBUG "Upload of item $LOCAL_SRC_DIR to $DIR_ESCAPED success"
rm -rf ./"$LOCAL_SRC_DIR"
fi
else
cp "$OUTPUT_ITEM" "$ITEMDIR"
ERROR="$?"
if [ ! "$ERROR" == "0" ]
log DEBUG "Secure copy is disabled."
log DEBUG `pwd`
cp "$LOCAL_SRC_DIR"/* "$TMP_DESTINATION" >> scplog.txt 2>&1
log DEBUG "cp $LOCAL_SRC_DIR/* $DESTINATION >> scplog.txt 2>&1"
if [ ! "$?" == "0" ]
then
log DEBUG "ERROR - uploading of $OUTPUT_ITEM vi CP failed."
log DEBUG "ERROR - uploading of $LOCAL_SRC_DIR via CP to $DESTINATION failed."
fi
fi
}
@ -1810,8 +1783,6 @@ remove_processed_items_from_input_file () {
# Processed items have a lock dir in the PPSS_ITEM_LOCK_DIR.
#
log DSPLY "Removing processed items from list..."
if [ -e "$LIST_OF_PROCESSED_ITEMS" ]
then
PROCESSED_ITEMS=`cat $LIST_OF_PROCESSED_ITEMS`
@ -1857,8 +1828,6 @@ remove_processed_items_from_input_file () {
get_all_items () {
log DSPLY "Creating a list of all items to process..."
if [ "$DAEMON" == "1" ] && [ "$INOTIFY" = "0" ] && [ "$ENABLE_INPUT_LOCK" = "1" ]
then
get_input_lock
@ -2060,7 +2029,6 @@ get_item () {
get_item
else
log DEBUG "Got lock on $ITEM"
download_item "$ITEM"
return 0
fi
else
@ -2144,46 +2112,55 @@ commando () {
# Example: a file is physical, a URL is virtual.
#
ITEM="$1"
ORIG_ITEM="$ITEM"
#log DEBUG "$FUNCNAME is processing item $ITEM"
if [ "$RECURSION" == "1" ]
then
escape_item "$ITEM"
does_file_exist "$ITEM_ESCAPED"
does_file_exist "$ITEM_ESCAPED" # The item contains the full path.
ERR_STATE="$?"
else
escape_item "$ITEM"
does_file_exist "$SRC_DIR/$ITEM_ESCAPED"
does_file_exist "$SRC_DIR/$ITEM_ESCAPED" # The item is only the file name itself.
ERR_STATE="$?"
fi
#
# If recursion is used, a file name of an item may not be unique.
# The same filename can be used for files in different directories.
# Therefore, the output directory must reflect the original directory
# structure. If recursion is not used, this is not necessary.
#
# Furthermore, if the item is not a real-world object but a string of
# some sort, the item is considered virtual. No recursion.
#
if [ "$ERR_STATE" == "0" ]
then
VIRTUAL="0"
if [ "$RECURSION" == "1" ]
then
DIR_NAME=`dirname "$ITEM"`
ITEM_NO_PATH=`basename "$ITEM"`
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$DIR_NAME"
ITEM_DIR_NAME=`dirname "$ITEM" | sed s:$SRC_DIR::g`
ITEM_BASE_NAME=`basename "$ITEM"`
HASH=`echo "$ITEM" | $MD5 | awk '{ print $1 }'`
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$HASH"
else
DIR_NAME="$SRC_DIR"
ITEM_NO_PATH="$ITEM"
OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
ITEM_DIR_NAME="$SRC_DIR"
ITEM_BASE_NAME="$ITEM"
escape_item "$ITEM_BASE_NAME"
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
fi
else
VIRTUAL="1"
DIR_NAME=""
ITEM_NO_PATH="$ITEM"
escape_item "$ITEM_NO_PATH"
ITEM_DIR_NAME=""
ITEM_BASE_NAME="$ITEM"
escape_item "$ITEM_BASE_NAME"
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
fi
OUTPUT_FILE="$ITEM_NO_PATH"
OUTPUT_FILE="$ITEM_BASE_NAME"
#
# Decide if an item must be transferred from server to the node.
@ -2204,11 +2181,12 @@ commando () {
fi
fi
else
download_item "$ITEM"
if [ "$RECURSION" == "1" ]
then
ITEM="$PPSS_LOCAL_TMPDIR/$ITEM"
ITEM="$PPSS_LOCAL_TMPDIR/$HASH/$ITEM_BASE_NAME"
else
ITEM="$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH"
ITEM="$PPSS_LOCAL_TMPDIR/$ITEM_BASE_NAME"
fi
fi
@ -2231,32 +2209,12 @@ commando () {
return 0
fi
#
# Create the output directory that will contain the output of the command.
# Example: When converting wav to mp3, the mp3 will be put in this directory.
#
if [ "$VIRTUAL" == "0" ]
then
if [ "$RECURSION" == "1" ]
then
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$DIR_NAME"/
else
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH"
fi
else
#
# If the item is virtual, the item can contain special characters.
# These characters are stripped from the log file name, so this is used.
#
OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
fi
#
# Create the local output directory.
#
if [ ! -z "$OUTPUT_DIR" ]
then
log DEBUG "Local output dir is $OUTPUT_DIR"
log DEBUG "Creating local output dir $OUTPUT_DIR"
mkdir -p "$OUTPUT_DIR"
fi
@ -2319,26 +2277,21 @@ commando () {
fi
#
# Create remote output dir and transfer output to server.
#
escape_item "$DIR_NAME"
ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED"
if [ "$DOWNLOAD_TO_NODE" == "0" ]
then
log DEBUG "Download to node is disabled."
else
if [ "$DIR_NAME" == "." ]
then
DIR_NAME=""
fi
fi
#
# Upload the output file back to the server.
#
upload_item "$OUTPUT_DIR" "$DIR_NAME"
if [ "$UPLOAD_TO_SERVER" == "1" ]
then
log DEBUG "Upload $OUTPUT_DIR and ITEM DIR NAME IS $ITEM_DIR_NAME"
if [ "$ITEM_DIR_NAME" == "." ]
then
ITEM_DIR_NAME=""
fi
upload_item "$OUTPUT_DIR" "$ITEM_DIR_NAME" # local output directory / original item dir path
else
log DEBUG "Uploading disabled."
fi
#
# Upload the log file to the server.

View File

@ -1,14 +1,14 @@
REMOTE_OUTPUT_DIR=/storage/mp3
REMOTE_OUTPUT_DIR=/mnt/mp3
SSH_KEY=ppss-key.dsa
SSH_KNOWN_HOSTS=known_hosts
SRC_DIR=/storage/wav
COMMAND='./wav2mp3.sh "$ITEM" /storage/mp3'
SRC_DIR=/mnt/wav
COMMAND='./wav2mp3.sh "$ITEM" "$OUTPUT_DIR"'
NODES_FILE=nodes.txt
SSH_SERVER=10.0.1.110
USER=ppss
SCRIPT=wav2mp3.sh
RANDOMIZE=1
DOWNLOAD_TO_NODE=1
DOWNLOAD_TO_NODE=0
UPLOAD_TO_SERVER=1
SECURE_COPY=0
PPSS_DEBUG=1
PPSS_LOCAL_TMPDIR=ppss_dir/PPSS_LOCAL_TMPDIR
PPSS_LOCAL_OUTPUT=ppss_dir/PPSS_LOCAL_OUTPUT

View File

@ -1,31 +1,10 @@
#!/usr/bin/env bash
INPUT="$1"
SRC="$1"
DEST="$2"
LAMEOPTS=""
#
# Print a short usage message, surrounded by blank lines, to stdout
# and terminate the script with exit status 1.
#
usage () {
    printf '\nUsage: %s <wav file name>\n\n' "$0"
    exit 1
}
if [ -z "$INPUT" ]
then
usage
fi
if [ ! -e "$INPUT" ]
then
echo "File $INPUT does not exist!"
exit 1
fi
TYPE=`file -b "$INPUT"`
TYPE=`file -b "$SRC"`
RES=`echo "$TYPE" | grep "WAVE audio"`
if [ ! "$?" == "0" ]
then
@ -34,19 +13,7 @@ then
exit 0
fi
#
# Convert a single wav file to mp3 using lame.
#
# Arguments: $1 - path of the wav file to convert.
# Globals:   DEST (read) - root directory that receives the mp3 file.
#            The directory part of $1 is recreated below DEST.
# Returns:   the exit status of lame, or the exit status of mkdir when
#            the output directory could not be created.
#
function convert () {
    local wav_file="$1"
    # Swap the trailing "wav" for "mp3" with parameter expansion only.
    # Passing the name through an unquoted echo would collapse runs of
    # whitespace and expand glob characters that are part of the name.
    local mp3_file="${wav_file%wav}mp3"
    local mp3_subdir
    local mp3_name
    mp3_subdir=$(dirname "$mp3_file")
    mp3_name=$(basename "$mp3_file")
    local target_dir="$DEST/$mp3_subdir"

    # Without the output directory lame cannot write its result.
    mkdir -p "$target_dir" || return
    lame --quiet --preset insane "$wav_file" "$target_dir/$mp3_name"
}
convert "$INPUT"
BASENAME=`basename "$SRC"`
MP3FILE="`echo ${BASENAME%wav}mp3`"
lame --quiet --preset insane "$SRC" "$DEST/$MP3FILE"
exit "$?"