Update of distributed PPSS reworked based on item locking
This commit is contained in:
parent
ac7649b7eb
commit
f00ad6202b
|
@ -57,6 +57,7 @@ SSH_SERVER="" # Remote server or 'master'.
|
||||||
SSH_KEY="" # SSH key for ssh account.
|
SSH_KEY="" # SSH key for ssh account.
|
||||||
SSH_OPTS="-o BatchMode=yes -o ControlPath=/tmp/master-%r@%h:%p -o ControlMaster=auto -o ConnectTimeout=5"
|
SSH_OPTS="-o BatchMode=yes -o ControlPath=/tmp/master-%r@%h:%p -o ControlMaster=auto -o ConnectTimeout=5"
|
||||||
SSH_MASTER_PID=""
|
SSH_MASTER_PID=""
|
||||||
|
ITEM_LOCK_DIR="PPSS_ITEM_LOCK_DIR"
|
||||||
|
|
||||||
showusage () {
|
showusage () {
|
||||||
|
|
||||||
|
@ -242,7 +243,7 @@ done
|
||||||
# This function makes local and remote operation transparent.
|
# This function makes local and remote operation transparent.
|
||||||
exec_cmd () {
|
exec_cmd () {
|
||||||
|
|
||||||
CMD="eval $1"
|
CMD="$1"
|
||||||
|
|
||||||
if [ ! -z "$SSH_SERVER" ]
|
if [ ! -z "$SSH_SERVER" ]
|
||||||
then
|
then
|
||||||
|
@ -284,7 +285,7 @@ init_vars () {
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
exec_cmd "echo 0 > $ARRAY_POINTER_FILE"
|
echo 0 > $ARRAY_POINTER_FILE
|
||||||
|
|
||||||
FIFO=$(pwd)/fifo-$RANDOM-$RANDOM
|
FIFO=$(pwd)/fifo-$RANDOM-$RANDOM
|
||||||
|
|
||||||
|
@ -466,7 +467,7 @@ random_delay () {
|
||||||
|
|
||||||
global_lock () {
|
global_lock () {
|
||||||
|
|
||||||
exec_cmd "mkdir $GLOBAL_LOCK > /dev/null 2>&1"
|
mkdir $GLOBAL_LOCK > /dev/null 2>&1
|
||||||
ERROR="$?"
|
ERROR="$?"
|
||||||
|
|
||||||
if [ ! "$ERROR" == "0" ]
|
if [ ! "$ERROR" == "0" ]
|
||||||
|
@ -495,12 +496,7 @@ get_global_lock () {
|
||||||
|
|
||||||
release_global_lock () {
|
release_global_lock () {
|
||||||
|
|
||||||
if [ ! -z "$SSH_SERVER" ]
|
rm -rf "$GLOBAL_LOCK"
|
||||||
then
|
|
||||||
exec_cmd "rm -rf $GLOBAL_LOCK"
|
|
||||||
else
|
|
||||||
rm -rf "$GLOBAL_LOCK"
|
|
||||||
fi
|
|
||||||
}
|
}
|
||||||
|
|
||||||
are_jobs_running () {
|
are_jobs_running () {
|
||||||
|
@ -514,6 +510,27 @@ are_jobs_running () {
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lock_item () {
|
||||||
|
|
||||||
|
ITEM="$1"
|
||||||
|
LOCK_FILE_NAME=`echo $ITEM | sed s/^\\\.//g |sed s/^\\\.\\\.//g | sed s/\\\///g`
|
||||||
|
ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME"
|
||||||
|
|
||||||
|
exec_cmd "mkdir $ITEM_LOCK_FILE >> /dev/null 2>&1"
|
||||||
|
ERROR="$?"
|
||||||
|
return "$ERROR"
|
||||||
|
}
|
||||||
|
|
||||||
|
release_item () {
|
||||||
|
|
||||||
|
ITEM="$1"
|
||||||
|
|
||||||
|
LOCK_FILE_NAME=`echo $ITEM` # | sed s/^\\.//g | sed s/^\\.\\.//g | sed s/\\\///g`
|
||||||
|
ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME"
|
||||||
|
|
||||||
|
exec_cmd "rm -rf ./$ITEM_LOCK_FILE"
|
||||||
|
}
|
||||||
|
|
||||||
get_all_items () {
|
get_all_items () {
|
||||||
|
|
||||||
count=0
|
count=0
|
||||||
|
@ -576,7 +593,7 @@ get_item () {
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# This variable is used to walk thtough all array items.
|
# This variable is used to walk thtough all array items.
|
||||||
ARRAY_POINTER=`exec_cmd "cat $ARRAY_POINTER_FILE"`
|
ARRAY_POINTER=`cat $ARRAY_POINTER_FILE`
|
||||||
|
|
||||||
# Gives a status update on the current progress..
|
# Gives a status update on the current progress..
|
||||||
PERCENT=`echo "100 * $ARRAY_POINTER / $SIZE_OF_ARRAY" | bc`
|
PERCENT=`echo "100 * $ARRAY_POINTER / $SIZE_OF_ARRAY" | bc`
|
||||||
|
@ -595,14 +612,22 @@ get_item () {
|
||||||
if [ -z "$ITEM" ]
|
if [ -z "$ITEM" ]
|
||||||
then
|
then
|
||||||
((ARRAY_POINTER++))
|
((ARRAY_POINTER++))
|
||||||
exec_cmd "echo $ARRAY_POINTER > $ARRAY_POINTER_FILE"
|
echo $ARRAY_POINTER > $ARRAY_POINTER_FILE
|
||||||
release_global_lock
|
release_global_lock
|
||||||
get_item
|
get_item
|
||||||
else
|
else
|
||||||
((ARRAY_POINTER++))
|
((ARRAY_POINTER++))
|
||||||
exec_cmd "echo $ARRAY_POINTER > $ARRAY_POINTER_FILE"
|
echo $ARRAY_POINTER > $ARRAY_POINTER_FILE
|
||||||
release_global_lock
|
lock_item "$ITEM"
|
||||||
return 0
|
if [ ! "$?" == "0" ]
|
||||||
|
then
|
||||||
|
release_global_lock
|
||||||
|
log INFO "ITEM $ITEM is locked, get next"
|
||||||
|
get_item
|
||||||
|
else
|
||||||
|
release_global_lock
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -631,25 +656,26 @@ commando () {
|
||||||
ITEM="$SRC_DIR/$ITEM"
|
ITEM="$SRC_DIR/$ITEM"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
LOG_FILE_NAME=`echo $ITEM | sed s/^\\.//g | sed s/^\\.\\.//g | sed s/\\\///g`
|
LOG_FILE_NAME=`echo $ITEM | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g`
|
||||||
ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"
|
ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"
|
||||||
|
|
||||||
does_file_exist "./$ITEM_LOG_FILE"
|
does_file_exist "$ITEM_LOG_FILE"
|
||||||
if [ "$?" == "0" ]
|
if [ "$?" == "0" ]
|
||||||
then
|
then
|
||||||
log DEBUG "Skipping item $ITEM - already processed."
|
log DEBUG "Skipping item $ITEM - already processed."
|
||||||
else
|
else
|
||||||
|
|
||||||
EXECME='$COMMAND"$ITEM" > "./$ITEM_LOG_FILE" 2>&1'
|
EXECME='$COMMAND"$ITEM" > "$ITEM_LOG_FILE" 2>&1'
|
||||||
eval "$EXECME"
|
eval "$EXECME"
|
||||||
|
|
||||||
|
release_item "$ITEM"
|
||||||
|
|
||||||
|
if [ ! -z "$SSH_SERVER" ]
|
||||||
|
then
|
||||||
|
scp -q $SSH_OPTS $SSH_KEY $ITEM_LOG_FILE $SSH_SERVER:~/$JOB_LOG_DIR
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ ! -z "$SSH_SERVER" ]
|
|
||||||
then
|
|
||||||
#get_global_lock
|
|
||||||
scp -q $SSH_OPTS $SSH_KEY $ITEM_LOG_FILE $SSH_SERVER:~/$JOB_LOG_DIR &
|
|
||||||
#release_global_lock
|
|
||||||
fi
|
|
||||||
|
|
||||||
start_single_worker
|
start_single_worker
|
||||||
return $?
|
return $?
|
||||||
|
|
Loading…
Reference in New Issue