Backup, fixing distributed processing. Major rework.

This commit is contained in:
Louwrentius 2010-03-07 15:51:18 +00:00
parent 0f8dced209
commit 54c898f202

308
ppss
View File

@ -42,7 +42,7 @@ done
# export PPSS_DIR=/path/to/workingdir # export PPSS_DIR=/path/to/workingdir
if [ -z "$PPSS_DIR" ] if [ -z "$PPSS_DIR" ]
then then
PPSS_DIR="./ppss_dir" PPSS_DIR="ppss_dir"
fi fi
CONFIG="" CONFIG=""
@ -88,7 +88,7 @@ SSH_OPTS_NOMP="-o BatchMode=yes -o GlobalKnownHostsFile=./known_hosts \
# Blowfish is faster but still secure. # Blowfish is faster but still secure.
SSH_MASTER_PID="" SSH_MASTER_PID=""
ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking. ITEM_LOCK_DIR="$PPSS_HOME_DIR/$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking.
PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing. PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing.
PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output. PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output.
TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp. TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp.
@ -287,6 +287,7 @@ exec_cmd () {
if [ -z "$NOMP" ] if [ -z "$NOMP" ]
then then
log DEBUG "REMOTE EXEC" log DEBUG "REMOTE EXEC"
log DEBUG "$USER@$SSH_SERVER $CMD"
ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD
STATUS=$? STATUS=$?
elif [ "$NOMP" == "1" ] elif [ "$NOMP" == "1" ]
@ -617,6 +618,56 @@ create_working_directory () {
fi fi
} }
expand_str () {
STR=$1
LENGTH=$TYPE_LENGTH
SPACE=" "
while [ "${#STR}" -lt "$LENGTH" ]
do
STR=$STR$SPACE
done
echo "$STR"
}
log () {
# Type 'INFO' is logged to the screen
# Any other log-type is only logged to the logfile.
TYPE="$1"
MESG="$2"
TYPE_LENGTH=5
TYPE_EXP=`expand_str "$TYPE"`
DATE=`date +%b\ %d\ %H:%M:%S`
PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}"
PREFIX_SMALL="$DATE: "
LOG_MSG="$PREFIX $MESG"
ECHO_MSG="$PREFIX_SMALL $MESG"
if [ ! -z "$PPSSDEBUG" ] && [ ! "$PPSSDEBUG" == "0" ]
then
echo -e "$LOG_MSG" >> "$LOGFILE"
elif [ "$TYPE" == "INFO" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ] || [ "$TYPE" == "DSPLY" ]
then
echo -e "$LOG_MSG" >> "$LOGFILE"
fi
if [ "$TYPE" == "DSPLY" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ]
then
echo -e "$ECHO_MSG"
fi
}
# Init all vars # Init all vars
init_vars () { init_vars () {
@ -684,13 +735,18 @@ init_vars () {
get_no_of_cpus $HYPERTHREADING get_no_of_cpus $HYPERTHREADING
fi fi
does_file_exist "$JOB_LOG_DIR" does_file_exist "$PPSS_HOME_DIR"
if [ ! "$?" == "0" ] if [ ! "$?" == "0" ] && [ ! -z "$SSH_SERVER" ]
then then
log DEBUG "Job log directory $JOB_lOG_DIR does not exist. Creating." log DEBUG "Remote PPSS home directory $PPSS_HOME_DIR does not exist. Creating."
exec_cmd "mkdir -p $JOB_LOG_DIR" exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_DIR"
else fi
log DEBUG "Job log directory $JOB_LOG_DIR exists."
does_file_exist "$PPSS_HOME_DIR/$JOB_LOG_DIR"
if [ ! "$?" == "0" ] && [ ! -z "$SSH_SERVER" ]
then
log DEBUG "Remote Job log directory $JOB_lOG_DIR does not exist. Creating."
exec_cmd "mkdir $PPSS_HOME_DIR/$JOB_LOG_DIR"
fi fi
does_file_exist "$ITEM_LOCK_DIR" does_file_exist "$ITEM_LOCK_DIR"
@ -706,7 +762,7 @@ init_vars () {
fi fi
does_file_exist "$REMOTE_OUTPUT_DIR" does_file_exist "$REMOTE_OUTPUT_DIR"
if [ ! "$?" == "0" ] if [ ! "$?" == "0" ] && [ ! -z "$SSH_SERVER" ]
then then
log DEBUG "Remote output dir $REMOTE_OUTPUT_DIR does not exist." log DEBUG "Remote output dir $REMOTE_OUTPUT_DIR does not exist."
exec_cmd "mkdir $REMOTE_OUTPUT_DIR" exec_cmd "mkdir $REMOTE_OUTPUT_DIR"
@ -736,54 +792,6 @@ set_status () {
} }
expand_str () {
STR=$1
LENGTH=$TYPE_LENGTH
SPACE=" "
while [ "${#STR}" -lt "$LENGTH" ]
do
STR=$STR$SPACE
done
echo "$STR"
}
log () {
# Type 'INFO' is logged to the screen
# Any other log-type is only logged to the logfile.
TYPE="$1"
MESG="$2"
TYPE_LENGTH=5
TYPE_EXP=`expand_str "$TYPE"`
DATE=`date +%b\ %d\ %H:%M:%S`
PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}"
PREFIX_SMALL="$DATE: "
LOG_MSG="$PREFIX $MESG"
ECHO_MSG="$PREFIX_SMALL $MESG"
if [ ! -z "$PPSSDEBUG" ] && [ ! "$PPSSDEBUG" == "0" ]
then
echo -e "$LOG_MSG" >> "$LOGFILE"
elif [ "$TYPE" == "INFO" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ] || [ "$TYPE" == "DSPLY" ]
then
echo -e "$LOG_MSG" >> "$LOGFILE"
fi
if [ "$TYPE" == "DSPLY" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ]
then
echo -e "$ECHO_MSG"
fi
}
check_status () { check_status () {
ERROR="$1" ERROR="$1"
@ -960,7 +968,7 @@ deploy_ppss () {
log DEBUG "SSH SERVER $SSH_SERVER is also a node." log DEBUG "SSH SERVER $SSH_SERVER is also a node."
INSTALLED_ON_SSH_SERVER=1 INSTALLED_ON_SSH_SERVER=1
exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR" exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR"
exec_cmd "mkdir -p $ITEM_LOCK_DIR" exec_cmd "mkdir -p $PPSS_HOME_DIR/$ITEM_LOCK_DIR"
fi fi
done done
if [ "$INSTALLED_ON_SSH_SERVER" == "0" ] if [ "$INSTALLED_ON_SSH_SERVER" == "0" ]
@ -968,7 +976,7 @@ deploy_ppss () {
log DEBUG "SSH SERVER $SSH_SERVER is not a node." log DEBUG "SSH SERVER $SSH_SERVER is not a node."
deploy "$SSH_SERVER" deploy "$SSH_SERVER"
exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR" exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR"
exec_cmd "mkdir -p $ITEM_LOCK_DIR" exec_cmd "mkdir -p $PPSS_HOME_DIR/$ITEM_LOCK_DIR"
fi fi
fi fi
} }
@ -988,6 +996,7 @@ test_server () {
exec_cmd "date >> /dev/null" exec_cmd "date >> /dev/null"
check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached" check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached"
ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER & ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER &
SSH_MASTER_PID="$!" SSH_MASTER_PID="$!"
log DEBUG "SSH Master pid is $SSH_MASTER_PID" log DEBUG "SSH Master pid is $SSH_MASTER_PID"
@ -1176,77 +1185,82 @@ escape_item () {
ITEM_ESCAPED=`echo "$TMP" | \ ITEM_ESCAPED=`echo "$TMP" | \
sed s/\\ /\\\\\\\\\\\\\\ /g | \ sed s/\\ /\\\\\\\\\\\\\\ /g | \
sed s/\\'/\\\\\\\\\\\\\\'/g | \ sed s/\\'/\\\\\\\\\\\\\\'/g | \
sed s/\\\`/\\\\\\\\\\\\\\\`/g | \
sed s/\\|/\\\\\\\\\\\\\\|/g | \ sed s/\\|/\\\\\\\\\\\\\\|/g | \
sed s/\&/\\\\\\\\\\\\\\&/g | \ sed s/\&/\\\\\\\\\\\\\\&/g | \
sed s/\;/\\\\\\\\\\\\\\;/g | \ sed s/\;/\\\\\\\\\\\\\\;/g | \
sed s/\\\//\\\\\\\\\\\\\\ /g | \
sed s/\(/\\\\\\\\\\(/g | \ sed s/\(/\\\\\\\\\\(/g | \
sed s/\)/\\\\\\\\\\)/g ` sed s/\)/\\\\\\\\\\)/g `
} }
#sed s/\\\`/\\\\\\\\\\\\\\\`/g | \
#sed s/\\\//\\\\\\\\\\\\\\ /g | \
download_item () { download_item () {
ITEM="$1" ITEM="$1"
if [ -e "$ITEM" ] VIRTUAL="0"
if [ "$RECURSION" = "1" ]
then then
ITEM_NO_PATH=`basename "$ITEM"` does_file_exist "$ITEM"
DOWNLOAD_ITEM="$ITEM"
else else
escape_item "$ITEM" does_file_exist "$SRC_DIR/$ITEM"
ITEM_NO_PATH="$ITEM_ESCAPED" DOWNLOAD_ITEM="$SRC_DIR/$ITEM"
fi fi
if [ "$TRANSFER_TO_SLAVE" == "1" ] if [ "$?" == "0" ]
then then
log DEBUG "Transfering item $ITEM_NO_PATH from source to local disk." log DEBUG "$FUNCNAME Remote item $ITEM exists"
VIRTUAL=0
else
log DEBUG "$FUNCNAME Remote item $ITEM does NOT exist"
VIRTUAL=1
fi
if [ "$TRANSFER_TO_SLAVE" == "1" ] && [ "$VIRTUAL" == "0" ]
then
log DEBUG "Transfering item $ITEM from source to local disk."
if [ "$SECURE_COPY" == "1" ] && [ ! -z "$SSH_SERVER" ] if [ "$SECURE_COPY" == "1" ] && [ ! -z "$SSH_SERVER" ]
then then
if [ ! -z "$SRC_DIR" ] escape_item "$DOWNLOAD_ITEM"
then log DEBUG "$SSH_SERVER:$ITEM_ESCAPED $PPSS_LOCAL_TMPDIR"
ITEM_PATH="$SRC_DIR/$ITEM" scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" $PPSS_LOCAL_TMPDIR
else
ITEM_PATH="$ITEM"
fi
escape_item "$ITEM_PATH"
scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" ./$PPSS_LOCAL_TMPDIR
log DEBUG "Exit code of remote transfer is $?" log DEBUG "Exit code of remote transfer is $?"
else else
cp "$ITEM" ./$PPSS_LOCAL_TMPDIR cp "$ITEM" $PPSS_LOCAL_TMPDIR
log DEBUG "Exit code of local transfer is $?" log DEBUG "Exit code of local transfer is $?"
fi fi
else else
log DEBUG "No transfer of item $ITEM_NO_PATH to local workpath." log DEBUG "No transfer of item $ITEM to local workpath."
fi fi
} }
upload_item () { upload_item () {
ITEM="$1" OUTPUT_ITEM="$1"
ITEMDIR="$2" ITEMDIR="$2"
log DEBUG "Uploading item $ITEM." log DEBUG "Uploading item $OUTPUT_ITEM."
if [ "$SECURE_COPY" == "1" ] if [ "$SECURE_COPY" == "1" ]
then then
escape_item "$REMOTE_OUTPUT_DIR$ITEMDIR" escape_item "$REMOTE_OUTPUT_DIR$ITEMDIR"
DIR_ESCAPED="$ITEM_ESCAPED" DIR_ESCAPED="$ITEM_ESCAPED"
scp -q $SSH_OPTS $SSH_KEY "$ITEM"/* $USER@$SSH_SERVER:"$DIR_ESCAPED" scp -q $SSH_OPTS $SSH_KEY "$OUTPUT_ITEM"/* $USER@$SSH_SERVER:"$DIR_ESCAPED"
ERROR="$?" ERROR="$?"
if [ ! "$ERROR" == "0" ] if [ ! "$ERROR" == "0" ]
then then
log ERROR "Uploading of $ITEM via SCP failed." log ERROR "Uploading of $OUTPUT_ITEM via SCP failed."
else else
log DEBUG "Upload of item $ITEM success" log DEBUG "Upload of item $OUTPUT_ITEM success"
rm -rf ./"$ITEM" rm -rf ./"$OUTPUT_ITEM"
fi fi
else else
cp "$ITEM" "$REMOTE_OUTPUT_DIR" cp "$OUTPUT_ITEM" "$ITEMDIR"
ERROR="$?" ERROR="$?"
if [ ! "$ERROR" == "0" ] if [ ! "$ERROR" == "0" ]
then then
log DEBUG "ERROR - uploading of $ITEM vi CP failed." log DEBUG "ERROR - uploading of $OUTPUT_ITEM vi CP failed."
fi fi
fi fi
} }
@ -1471,6 +1485,8 @@ elapsed () {
commando () { commando () {
log DEBUG "-------------------------------------"
# #
# This function will start a chain reaction of events. # This function will start a chain reaction of events.
# #
@ -1484,27 +1500,64 @@ commando () {
# functions based on the CPU cores available, parallel processing # functions based on the CPU cores available, parallel processing
# is achieved, with a queue for each core. # is achieved, with a queue for each core.
# #
ERR_STATE=0
VIRTUAL=0
#
# This code tests if the item exist (is physical or virtuel)
# Example: a file is physical, a URL is virtual.
#
ITEM="$1" ITEM="$1"
if [ -e "$ITEM" ] if [ "$RECURSION" == "1" ]
then then
DIRNAME=`dirname "$ITEM"` does_file_exist "$ITEM"
ITEM_NO_PATH=`basename "$ITEM"` ERR_STATE="$?"
escape_item "$ITEM_NO_PATH"
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
# ^
# | This VAR can be used in scripts or command lines.
#
OUTPUT_FILE="$ITEM_ESCAPED"
else else
DIRNAME="" does_file_exist "$SRC_DIR/$ITEM"
escape_item "$ITEM" ERR_STATE="$?"
ITEM_NO_PATH="$ITEM_ESCAPED"
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH"
fi fi
#
# If recursion is used, a file name of an item may not be unique.
# The same filename can be used for files in differen directories.
# Therefore, the output directory must reflect the original directory
# structure. If recursion is not used, this is not necessary.
#
if [ "$ERR_STATE" == "0" ]
then
VIRTUAL="0"
if [ "$RECURSION" == "1" ]
then
DIR_NAME=`dirname "$ITEM"`
ITEM_NO_PATH=`basename "$ITEM"`
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$DIR_NAME"
else
DIR_NAME="$SRC_DIR"
ITEM_NO_PATH="$ITEM"
OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
fi
#
# OUTPUT_DIR can be used in scripts or command lines.
#
else
VIRTUAL="1"
DIR_NAME=""
ITEM_NO_PATH="$ITEM"
escape_item "$ITEM_NO_PATH"
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
fi
OUTPUT_FILE="$ITEM_NO_PATH"
log DEBUG "Processing item: $ITEM" log DEBUG "Processing item: $ITEM"
log DEBUG "ITEM_NO_PATH is $ITEM_NO_PATH"
log DEBUG "Dirname is $DIR_NAME"
log DEBUG "OUTPUT DIR IS $OUTPUT_DIR"
log DEBUG "Virtual is $VIRTUAL"
log DEBUG "OUTPUT FILE is $OUTPUT_FILE"
# #
# Decide if an item must be transfered from server to the node. # Decide if an item must be transfered from server to the node.
# or be processed in-place (NFS / SMB mount?) # or be processed in-place (NFS / SMB mount?)
@ -1518,16 +1571,44 @@ commando () {
ITEM="$ITEM" ITEM="$ITEM"
fi fi
else else
ITEM="./$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH" if [ "$RECURSION" == "1" ]
then
ITEM="$PPSS_LOCAL_TMPDIR/$DIR_NAME/$ITEM_NO_PATH"
else
ITEM="$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH"
fi
fi fi
#
# Create the log file containing the output of the command.
#
LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g | sed s/\\ /_/g` LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g | sed s/\\ /_/g`
ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME" ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$LOG_FILE_NAME" #
# Create the output directory that will contain the output of the command.
# Example: When converting wav to mp3, the mp3 will be put in this directory.
#
if [ "$VIRTUAL" == "0" ]
then
if [ "$RECURSION" == "1" ]
then
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$DIR_NAME"/"$ITEM_NO_PATH"
else
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH"
fi
else
#
# If the item is virtual, the item can contain special characters.
# These characters are stripted from the log file name, so this is used.
#
OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$LOG_FILE_NAME"
fi
log DEBUG "Local output dir is $OUTPUT_DIR"
mkdir -p "$OUTPUT_DIR" mkdir -p "$OUTPUT_DIR"
does_file_exist "$ITEM_LOG_FILE" does_file_exist "$ITEM_LOG_FILE"
if [ "$?" == "0" ] if [ "$?" == "0" ]
then then
@ -1557,7 +1638,7 @@ commando () {
ERROR="$?" ERROR="$?"
MYPID="$!" MYPID="$!"
else else
eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1' eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1'
ERROR="$?" ERROR="$?"
MYPID="$!" MYPID="$!"
fi fi
@ -1581,14 +1662,17 @@ commando () {
then then
if [ -e "$ITEM" ] if [ -e "$ITEM" ]
then then
rm "$ITEM" rm -rf "$ITEM"
else else
log DEBUG "ERROR Something went wrong removing item $ITEM from local work dir." log DEBUG "There is no local file to remove.. strange..."
fi fi
fi fi
escape_item "$DIRNAME" #
# Create remote output dir and transfer output to server.
#
escape_item "$DIR_NAME"
ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED" ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED"
if [ "$TRANSFER_TO_SLAVE" == "0" ] if [ "$TRANSFER_TO_SLAVE" == "0" ]
@ -1596,11 +1680,11 @@ commando () {
log DEBUG "File transfer is disabled." log DEBUG "File transfer is disabled."
else else
exec_cmd "mkdir -p $ITEM_OUTPUT_DIR" exec_cmd "mkdir -p $ITEM_OUTPUT_DIR"
if [ "$DIRNAME" == "." ] if [ "$DIR_NAME" == "." ]
then then
DIRNAME="" DIR_NAME=""
fi fi
upload_item "$PPSS_LOCAL_OUTPUT/$ITEM_NO_PATH" "$DIRNAME" upload_item "$OUTPUT_DIR" "$DIR_NAME"
fi fi
@ -1609,8 +1693,8 @@ commando () {
if [ ! -z "$SSH_SERVER" ] if [ ! -z "$SSH_SERVER" ]
then then
log DEBUG "Uploading item log file $ITEM_LOG_FILE to master ~/$PPSS_HOME_DIR/$JOB_LOG_DIR" log DEBUG "Uploading item log file $ITEM_LOG_FILE to master $PPSS_HOME_DIR/$JOB_LOG_DIR"
scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:~/$PPSS_HOME_DIR/$JOB_LOG_DIR scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:$PPSS_HOME_DIR/$JOB_LOG_DIR
if [ ! "$?" == "0" ] if [ ! "$?" == "0" ]
then then
log DEBUG "Uploading of item log file failed." log DEBUG "Uploading of item log file failed."
@ -1840,8 +1924,8 @@ main () {
case $MODE in case $MODE in
node ) node )
init_vars
test_server test_server
init_vars
get_all_items get_all_items
listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
LISTENER_PID=$! LISTENER_PID=$!