Removed all file locking, except for remote item locking.

This commit is contained in:
Louwrentius 2010-06-20 21:22:19 +00:00
parent b922bd40e5
commit 6836784b54

253
ppss
View File

@ -64,7 +64,7 @@ PAUSE_DELAY="60" # Polling every 1 minu
STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal" # Stop processing if this file is present. STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal" # Stop processing if this file is present.
GLOBAL_COUNTER="" GLOBAL_COUNTER=""
GLOBAL_COUNTER_FILE="$PPSS_DIR/ppss-input-counter-$PID" GLOBAL_COUNTER_FILE="$PPSS_DIR/ppss-input-counter-$PID"
LOCAL_INPUT_FILE="$PPSS_DIR/INPUT_FILE-$PID" LISTOFITEMS="$PPSS_DIR/INPUT_FILE-$PID"
JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items. JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items.
LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file. Contains lots of info. LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file. Contains lots of info.
QUIET="0" QUIET="0"
@ -76,6 +76,7 @@ LISTENER_PID=""
IFS_BACKUP="$IFS" IFS_BACKUP="$IFS"
CPUINFO="/proc/cpuinfo" CPUINFO="/proc/cpuinfo"
PROCESSORS="" PROCESSORS=""
START_KEY="$RANDOM$RANDOM$RANDOM"
STOP_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to the listener to stop. STOP_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to the listener to stop.
KILL_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill KILL_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill
RECURSION="1" # all running processes. RECURSION="1" # all running processes.
@ -108,6 +109,7 @@ SECURE_COPY="1" # If set, use SCP, Otherwise, use cp.
REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded. REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded.
SCRIPT="" # Custom user script that is executed by ppss. SCRIPT="" # Custom user script that is executed by ppss.
ITEM_ESCAPED="" ITEM_ESCAPED=""
DISABLE_SKIPPING=0
NODE_STATUS="$PPSS_DIR/status.txt" NODE_STATUS="$PPSS_DIR/status.txt"
DAEMON=0 DAEMON=0
@ -710,16 +712,15 @@ process_arguments () {
then then
showusage_short showusage_short
echo echo
log ERROR "No source file or directory specified with -f or -d." echo "No source file or directory specified with -f or -d."
cleanup
exit 1 exit 1
fi fi
if [ ! -e "$SRC_DIR" ] && [ -z "$MODE" ] if [ ! -e "$SRC_DIR" ] && [ -z "$MODE" ] && [ -z "$INPUT_FILE" ]
then then
showusage_short showusage_short
echo echo
log ERROR "Source directory $SRC_DIR does not exist." echo "Source directory $SRC_DIR does not exist."
exit 1 exit 1
fi fi
@ -863,8 +864,6 @@ log () {
fi fi
} }
# Init all vars
init_vars () { init_vars () {
# #
@ -1006,7 +1005,6 @@ set_status () {
echo "$HOSTNAME $STATUS" > "$NODE_STATUS" echo "$HOSTNAME $STATUS" > "$NODE_STATUS"
} }
check_status () { check_status () {
ERROR="$1" ERROR="$1"
@ -1362,52 +1360,6 @@ random_delay () {
fi fi
} }
global_lock () {
mkdir $GLOBAL_LOCK > /dev/null 2>&1
ERROR="$?"
if [ ! "$ERROR" == "0" ]
then
return 1
else
return 0
fi
}
get_global_lock () {
while true
do
global_lock
ERROR="$?"
if [ ! "$ERROR" == "0" ]
then
#random_delay $MAX_LOCK_DELAY
continue
else
break
fi
done
}
release_global_lock () {
rm -rf "$GLOBAL_LOCK"
}
are_jobs_running () {
NUMBER_OF_PROCS=`jobs | wc -l`
if [ "$NUMBER_OF_PROCS" -gt "1" ]
then
return 0
else
return 1
fi
}
escape_item () { escape_item () {
TMP="$1" TMP="$1"
@ -1574,9 +1526,9 @@ get_all_items () {
get_input_lock get_input_lock
fi fi
if [ -e "$LOCAL_INPUT_FILE" ] && are_we_sourced if [ -e "$LISTOFITEMS" ] && are_we_sourced
then then
rm "$LOCAL_INPUT_FILE" rm "$LISTOFITEMS"
fi fi
count=0 count=0
@ -1587,11 +1539,11 @@ get_all_items () {
then then
if [ "$RECURSION" == "1" ] if [ "$RECURSION" == "1" ]
then then
`exec_cmd "find $SRC_DIR/ ! -type d" > "$LOCAL_INPUT_FILE"` `exec_cmd "find $SRC_DIR/ ! -type d" > "$LISTOFITEMS"`
check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
else else
log DEBUG "Recursion is disabled." log DEBUG "Recursion is disabled."
`exec_cmd "find $SRC_DIR/ -d 1 ! -type d" > "$LOCAL_INPUT_FILE"` `exec_cmd "find $SRC_DIR/ -d 1 ! -type d" > "$LISTOFITEMS"`
check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
fi fi
else else
@ -1600,14 +1552,14 @@ get_all_items () {
if [ "$RECURSION" == "1" ] if [ "$RECURSION" == "1" ]
then then
log DEBUG "Recursion is enabled." log DEBUG "Recursion is enabled."
`find "$SRC_DIR"/ ! -type d >> "$LOCAL_INPUT_FILE"` `find "$SRC_DIR"/ ! -type d >> "$LISTOFITEMS"`
check_status "$?" "$FUNCNAME" "Could not list files within local source directory." check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
else else
log DEBUG "Recursion is disabled." log DEBUG "Recursion is disabled."
`find "$SRC_DIR"/ -d 1 ! -type d >> "$LOCAL_INPUT_FILE"` `find "$SRC_DIR"/ -d 1 ! -type d >> "$LISTOFITEMS"`
check_status "$?" "$FUNCNAME" "Could not list files within local source directory." check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
fi fi
if [ ! -e "$LOCAL_INPUT_FILE" ] if [ ! -e "$LISTOFITEMS" ]
then then
log ERROR "Local input file is not created, something is wrong. Bug?" log ERROR "Local input file is not created, something is wrong. Bug?"
set_status "ERROR" set_status "ERROR"
@ -1634,13 +1586,13 @@ get_all_items () {
if [ ! "$INPUT_FILE" == "-" ] if [ ! "$INPUT_FILE" == "-" ]
then then
cp "$INPUT_FILE" "$LOCAL_INPUT_FILE" cp "$INPUT_FILE" "$LISTOFITEMS"
check_status "$?" "$FUNCNAME" "Copy of input file failed!" check_status "$?" "$FUNCNAME" "Copy of input file failed!"
else else
log DEBUG "Reading from stdin.." log DEBUG "Reading from stdin.."
while read LINE while read LINE
do do
echo "$LINE" >> "$LOCAL_INPUT_FILE" echo "$LINE" >> "$LISTOFITEMS"
done done
fi fi
fi fi
@ -1650,7 +1602,7 @@ get_all_items () {
release_input_lock release_input_lock
fi fi
SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }') SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
if [ "$SIZE_OF_INPUT" -le "0" ] if [ "$SIZE_OF_INPUT" -le "0" ]
then then
log ERROR "Source file/dir seems to be empty." log ERROR "Source file/dir seems to be empty."
@ -1658,6 +1610,7 @@ get_all_items () {
cleanup cleanup
exit 1 exit 1
fi fi
} }
get_item () { get_item () {
@ -1669,17 +1622,14 @@ get_item () {
return 1 return 1
fi fi
get_global_lock SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }')
# #
# Return error if the list is empty. # Return error if the list is empty.
# #
if [ "$SIZE_OF_INPUT" -le "0" ] #if [ "$SIZE_OF_INPUT" -le "0" ]
then #then
release_global_lock # return 1
return 1 #fi
fi
# #
# This variable is used to walk thtough all input file items. # This variable is used to walk thtough all input file items.
@ -1689,21 +1639,17 @@ get_item () {
# #
# Check if all items have been processed. # Check if all items have been processed.
# #
if [ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ] #if [ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]
then #then
release_global_lock # return 1
return 1 #fi
fi
ITEM="$(sed -n $GLOBAL_COUNTER\p $LOCAL_INPUT_FILE)" ITEM="$(sed -n $GLOBAL_COUNTER\p $LISTOFITEMS)"
if [ -z "$ITEM" ] if [ -z "$ITEM" ]
then then
((GLOBAL_COUNTER++))
log DEBUG "Item was emtpy..." log DEBUG "Item was emtpy..."
echo $GLOBAL_COUNTER > $GLOBAL_COUNTER_FILE return 1
release_global_lock
get_item
else else
((GLOBAL_COUNTER++)) ((GLOBAL_COUNTER++))
echo $GLOBAL_COUNTER > $GLOBAL_COUNTER_FILE echo $GLOBAL_COUNTER > $GLOBAL_COUNTER_FILE
@ -1713,20 +1659,18 @@ get_item () {
lock_item "$ITEM" lock_item "$ITEM"
LOCK="$?" LOCK="$?"
else else
log DEBUG "Item lock disabled." log DEBUG "Item lock disabled - no remote server configured."
LOCK=0 LOCK=0
fi fi
if [ ! "$LOCK" = "0" ] if [ ! "$LOCK" = "0" ]
then then
log DEBUG "Item $ITEM is locked." log DEBUG "Item $ITEM is locked."
release_global_lock
# #
# Recursion, get_ttem calls itself, until all items are done. # Recursion, get_ttem calls itself, until all items are done.
# #
get_item get_item
else else
log DEBUG "Got lock on $ITEM, processing." log DEBUG "Got lock on $ITEM, processing."
release_global_lock
download_item "$ITEM" download_item "$ITEM"
return 0 return 0
fi fi
@ -1736,27 +1680,12 @@ get_item () {
start_single_worker () { start_single_worker () {
# #
# This function sends an item to the fifo. This signals # This function kicks the listener to start a worker process.
# the listener process to execute a 'worker' on this
# item, using the 'commando' function.
# #
get_item if ! are_we_sourced
ERROR=$?
if [ ! "$ERROR" == "0" ]
then then
# echo "$START_KEY" >> "$FIFO"
# If no more items are available, the listener should be return $?
# informed that a worker just finished / died.
# This allows the listener to determine if all processes
# are finished and it is time to stop.
#
echo "$STOP_KEY" > $FIFO
return 1
else
get_global_lock
echo "$ITEM" > $FIFO
release_global_lock
return 0
fi fi
} }
@ -1766,7 +1695,6 @@ stop-ppss () {
elapsed "$START_PPSS" "$STOP_PPSS" elapsed "$START_PPSS" "$STOP_PPSS"
} }
elapsed () { elapsed () {
BEFORE="$1" BEFORE="$1"
@ -1825,8 +1753,6 @@ commando () {
# Therefore, the output directory must reflect the original directory # Therefore, the output directory must reflect the original directory
# structure. If recursion is not used, this is not necessary. # structure. If recursion is not used, this is not necessary.
# #
if [ "$ERR_STATE" == "0" ] if [ "$ERR_STATE" == "0" ]
then then
VIRTUAL="0" VIRTUAL="0"
@ -1850,17 +1776,6 @@ commando () {
OUTPUT_FILE="$ITEM_NO_PATH" OUTPUT_FILE="$ITEM_NO_PATH"
#
# The following lines should only be enabled for debugging.
#
#log DEBUG "Processing item: $ITEM"
#log DEBUG "ITEM_NO_PATH is $ITEM_NO_PATH"
#log DEBUG "Dirname is $DIR_NAME"
#log DEBUG "OUTPUT DIR IS $OUTPUT_DIR"
#log DEBUG "Virtual is $VIRTUAL"
#log DEBUG "OUTPUT FILE is $OUTPUT_FILE"
#
# #
# Decide if an item must be transfered from server to the node. # Decide if an item must be transfered from server to the node.
# or be processed in-place (NFS / SMB mount?) # or be processed in-place (NFS / SMB mount?)
@ -1895,6 +1810,13 @@ commando () {
LOG_FILE_NAME=`echo "$ITEM" | $MD5 | awk '{ print $1 }'` LOG_FILE_NAME=`echo "$ITEM" | $MD5 | awk '{ print $1 }'`
ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME" ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"
if [ -e "$ITEM_LOG_FILE" ] && [ "$DISABLE_SKIPPING" = "0" ]
then
log DEBUG "Item is already processed, skipping..."
start_single_worker
return 0
fi
# #
# Create the output directory that will contain the output of the command. # Create the output directory that will contain the output of the command.
# Example: When converting wav to mp3, the mp3 will be put in this directory. # Example: When converting wav to mp3, the mp3 will be put in this directory.
@ -1941,9 +1863,11 @@ commando () {
# the -c option. # the -c option.
# #
BEFORE=`get_time_in_seconds` BEFORE=`get_time_in_seconds`
TMP=`echo $COMMAND | grep -i '$ITEM'` `echo $COMMAND | grep -i '$ITEM' >> /dev/null 2>&1`
if [ "$?" ] RETVAL="$?"
if [ "$RETVAL" = "0" ]
then then
echo "$TMP - $RETVAL" >> /tmp/hoeba
eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1 eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1
ERROR="$?" ERROR="$?"
MYPID="$!" MYPID="$!"
@ -1998,7 +1922,6 @@ commando () {
# #
# Upload the output file back to the server. # Upload the output file back to the server.
# #
upload_item "$OUTPUT_DIR" "$DIR_NAME" upload_item "$OUTPUT_DIR" "$DIR_NAME"
# #
@ -2018,61 +1941,55 @@ commando () {
fi fi
fi fi
if ! are_we_sourced start_single_worker
then
start_single_worker
fi
return $?
} }
#
# This is the listener service. It listens on the pipe for events.
# A job is executed for every event received.
# This listener enables fully asynchronous processing.
#
listen_for_job () { listen_for_job () {
FINISHED=0 FINISHED=0
DIED=0 DIED=0
PIDS="" PIDS=""
log DEBUG "Listener started." log DEBUG "Listener started."
while read event <& 42 while read event <& 42
do do
if [ "$event" == "$STOP_KEY" ] if [ "$event" = "$START_KEY" ]
then then
# if get_item
# The start_single_worker method sends a special signal to
# inform the listener that a worker is finished.
# If all workers are finished, it is time to stop.
# This mechanism makes PPSS asynchronous.
#
((DIED++))
if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ]
then then
if [ "$DAEMON" == "1" ] commando "$ITEM" &
then MYPID="$!"
# disown
# In daemon mode, start all over again. PIDS="$PIDS $MYPID"
#
DIED=0
get_all_items
log DEBUG "Found $SIZE_OF_INPUT items."
start_all_workers
sleep 10
else
break
fi
else else
RES=$((MAX_NO_OF_RUNNING_JOBS-DIED)) ((DIED++))
if [ "$RES" == "1" ] && [ "$QUIET" == "0" ] if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ]
then then
log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. " if [ "$DAEMON" == "1" ]
elif [ "$QUIET" == "0" ]
then
if [ "$DIED" == "1" ]
then then
echo -en "\n" #
# In daemon mode, start all over again.
#
DIED=0
get_all_items
log DEBUG "Found $SIZE_OF_INPUT items."
start_all_workers
sleep 10
else
break
fi
else
RES=$((MAX_NO_OF_RUNNING_JOBS-DIED))
if [ "$RES" == "1" ] && [ "$QUIET" == "0" ]
then
log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. "
elif [ "$QUIET" == "0" ]
then
if [ "$DIED" == "1" ]
then
echo -en "\n"
fi
log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining. "
fi fi
log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining. "
fi fi
fi fi
elif [ "$event" == "$KILL_KEY" ] elif [ "$event" == "$KILL_KEY" ]
@ -2107,17 +2024,10 @@ listen_for_job () {
done done
IFS=$oldIFS IFS=$oldIFS
break break
else
commando "$event" &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
fi fi
get_global_lock SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }')
GLOBAL_COUNTER=$(cat $GLOBAL_COUNTER_FILE) GLOBAL_COUNTER=$(cat $GLOBAL_COUNTER_FILE)
release_global_lock
PERCENT=$((100 * $GLOBAL_COUNTER / $SIZE_OF_INPUT )) PERCENT=$((100 * $GLOBAL_COUNTER / $SIZE_OF_INPUT ))
if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ] if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ]
then then
@ -2168,7 +2078,6 @@ listen_for_job () {
cleanup cleanup
} }
# This starts an number of parallel workers based on the # of parallel jobs allowed.
start_all_workers () { start_all_workers () {
if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ] if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ]
@ -2217,7 +2126,7 @@ show_status () {
get_all_items get_all_items
ITEMS=`wc -l $LOCAL_INPUT_FILE | awk '{ print $1 }'` ITEMS=`wc -l $LISTOFITEMS | awk '{ print $1 }'`
if [ ! -z "$ITEMS" ] && [ ! "$ITEMS" == "0" ] if [ ! -z "$ITEMS" ] && [ ! "$ITEMS" == "0" ]
then then
@ -2266,8 +2175,6 @@ show_status () {
log DSPLY "$LINE" log DSPLY "$LINE"
} }
# If this is called, the whole framework will execute.
main () { main () {
case $MODE in case $MODE in