Major rework: arrays are no longer used. Arrays don't scale and require enormous amounts of memory when processing large input files. Will be released as 2.70.

This commit is contained in:
Louwrentius 2010-05-14 22:10:29 +00:00
parent 9feea3fb59
commit 0db3781021
2 changed files with 48 additions and 47 deletions

93
ppss
View File

@ -25,7 +25,7 @@ trap 'kill_process' SIGINT
# Setting some vars. # Setting some vars.
SCRIPT_NAME="Distributed Parallel Processing Shell Script" SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.65" SCRIPT_VERSION="2.70"
# The first argument to this script can be a mode. # The first argument to this script can be a mode.
MODES="node start config stop pause continue deploy status erase kill ec2" MODES="node start config stop pause continue deploy status erase kill ec2"
@ -73,6 +73,9 @@ PAUSE_DELAY="60" # Polling every 1 minu
STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal" # Stop processing if this file is present. STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal" # Stop processing if this file is present.
ARRAY_POINTER_FILE="$PPSS_DIR/ppss-array-pointer-$PID" # Pointer for keeping track of processed items. ARRAY_POINTER_FILE="$PPSS_DIR/ppss-array-pointer-$PID" # Pointer for keeping track of processed items.
ARRAY="" ARRAY=""
GLOBAL_COUNTER=""
GLOBAL_COUNTER_FILE="$PPSS_DIR/ppss-input-counter-$PID"
LOCAL_INPUT_FILE="$PPSS_DIR/INPUT_FILE-$PID"
JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items. JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items.
LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file. Contains lots of info. LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file. Contains lots of info.
QUIET="0" QUIET="0"
@ -435,9 +438,9 @@ cleanup () {
rm "$FIFO" rm "$FIFO"
fi fi
if [ -e "$ARRAY_POINTER_FILE" ] if [ -e "$GLOBAL_COUNTER_FILE" ]
then then
rm "$ARRAY_POINTER_FILE" rm "$GLOBAL_COUNTER_FILE"
fi fi
if [ -e "$GLOBAL_LOCK" ] if [ -e "$GLOBAL_LOCK" ]
@ -836,7 +839,7 @@ init_vars () {
exit 1 exit 1
fi fi
echo 0 > $ARRAY_POINTER_FILE echo 1 > $GLOBAL_COUNTER_FILE
FIFO=/tmp/ppss-fifo-$RANDOM-$RANDOM FIFO=/tmp/ppss-fifo-$RANDOM-$RANDOM
@ -1508,15 +1511,17 @@ get_all_items () {
if [ -z "$INPUT_FILE" ] if [ -z "$INPUT_FILE" ]
then then
if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?" if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a node?"
then then
if [ "$RECURSION" == "1" ] if [ "$RECURSION" == "1" ]
then then
ITEMS=`exec_cmd "find $SRC_DIR/ ! -type d"` #ITEMS=`exec_cmd "find $SRC_DIR/ ! -type d"`
`exec_cmd "find $SRC_DIR/ ! -type d"` >> "$LOCAL_INPUT_FILE"
check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
else else
log DEBUG "Recursion is disabled." log DEBUG "Recursion is disabled."
ITEMS=`exec_cmd "ls -1 $SRC_DIR"` #ITEMS=`exec_cmd "ls -1 $SRC_DIR"`
`exec_cmd "ls -1 $SRC_DIR"` >> "$LOCAL_INPUT_FILE"
check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
fi fi
else else
@ -1524,29 +1529,31 @@ get_all_items () {
then then
if [ "$RECURSION" == "1" ] if [ "$RECURSION" == "1" ]
then then
ITEMS=`find "$SRC_DIR/" ! -type d` log DEBUG "Recursion is enabled."
#ITEMS=`find "$SRC_DIR/" ! -type d`
`find "$SRC_DIR/" ! -type d >> "$LOCAL_INPUT_FILE"`
check_status "$?" "$FUNCNAME" "Could not list files within local source directory." check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
else else
log DEBUG "Recursion is disabled." log DEBUG "Recursion is disabled."
ITEMS=`ls -1 "$SRC_DIR"` #ITEMS=`ls -1 "$SRC_DIR"`
`ls -1 "$SRC_DIR" >> "$LOCAL_INPUT_FILE"`
check_status "$?" "$FUNCNAME" "Could not list files within local source directory." check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
fi fi
if [ ! -e "$LOCAL_INPUT_FILE" ]
then
log ERROR "Local input file is not created, something is wrong. Bug?"
set_status "ERROR"
cleanup
exit 1
fi
else else
ITEMS="" ITEMS=""
fi fi
fi fi
IFS=$'\n'
for x in $ITEMS
do
ARRAY[$count]="$x"
((count++))
done
IFS=$IFS_BACKUP
else else
if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?" if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?"
then then
log DEBUG "Running as slave, input file has been pushed (hopefully)." log DEBUG "Running as node, input file has been pushed (hopefully)."
fi fi
if [ ! -e "$INPUT_FILE" ] && [ ! "$INPUT_FILE" == "-" ] if [ ! -e "$INPUT_FILE" ] && [ ! "$INPUT_FILE" == "-" ]
then then
@ -1558,22 +1565,13 @@ get_all_items () {
if [ ! "$INPUT_FILE" == "-" ] if [ ! "$INPUT_FILE" == "-" ]
then then
cp "$INPUT_FILE" "$LOCAL_INPUT_FILE"
exec 10<"$INPUT_FILE" check_status "$?" "$FUNCNAME" "Copy of input file failed!"
while read LINE <&10
do
ARRAY[$count]=$LINE
((count++))
done
exec 10>&-
else else
log DEBUG "Reading from stdin.." log DEBUG "Reading from stdin.."
while read LINE while read LINE
do do
ARRAY[$count]=$LINE echo "$LINE" >> "$LOCAL_INPUT_FILE"
((count++))
done done
fi fi
fi fi
@ -1583,8 +1581,8 @@ get_all_items () {
release_input_lock release_input_lock
fi fi
SIZE_OF_ARRAY="${#ARRAY[@]}" SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }')
if [ "$SIZE_OF_ARRAY" -le "0" ] if [ "$SIZE_OF_INPUT" -le "0" ]
then then
log ERROR "Source file/dir seems to be empty." log ERROR "Source file/dir seems to be empty."
set_status STOPPED set_status STOPPED
@ -1604,12 +1602,12 @@ get_item () {
get_global_lock get_global_lock
SIZE_OF_ARRAY="${#ARRAY[@]}" SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }')
log DEBUG "sizeofinput $SIZE_OF_INPUT"
# #
# Return error if the array is empty. # Return error if the array is empty.
# #
if [ "$SIZE_OF_ARRAY" -le "0" ] if [ "$SIZE_OF_INPUT" -le "0" ]
then then
release_global_lock release_global_lock
return 1 return 1
@ -1618,28 +1616,31 @@ get_item () {
# #
# This variable is used to walk thtough all array items. # This variable is used to walk thtough all array items.
# #
ARRAY_POINTER=`cat $ARRAY_POINTER_FILE` GLOBAL_COUNTER=$(cat $GLOBAL_COUNTER_FILE)
log DEBUG "globalcounter $GLOBAL_COUNTER"
# #
# Check if all items have been processed. # Check if all items have been processed.
# #
if [ "$ARRAY_POINTER" -ge "$SIZE_OF_ARRAY" ] if [ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]
then then
release_global_lock release_global_lock
return 1 return 1
fi fi
ITEM="${ARRAY[$ARRAY_POINTER]}" ITEM="$(sed -n $GLOBAL_COUNTER\p $LOCAL_INPUT_FILE)"
log DEBUG "item dus is $ITEM"
if [ -z "$ITEM" ] if [ -z "$ITEM" ]
then then
((ARRAY_POINTER++)) ((GLOBAL_COUNTER++))
echo $ARRAY_POINTER > $ARRAY_POINTER_FILE log DEBUG "Item was emtpy..."
echo $GLOBAL_COUNTER > $GLOBAL_COUNTER_FILE
release_global_lock release_global_lock
get_item get_item
else else
((ARRAY_POINTER++)) ((GLOBAL_COUNTER++))
echo $ARRAY_POINTER > $ARRAY_POINTER_FILE echo $GLOBAL_COUNTER > $GLOBAL_COUNTER_FILE
lock_item "$ITEM" lock_item "$ITEM"
if [ ! "$?" == "0" ] if [ ! "$?" == "0" ]
then then
@ -2039,15 +2040,15 @@ listen_for_job () {
fi fi
get_global_lock get_global_lock
SIZE_OF_ARRAY="${#ARRAY[@]}" SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }')
ARRAY_POINTER=`cat $ARRAY_POINTER_FILE` GLOBAL_COUNTER=$(cat $GLOBAL_COUNTER_FILE)
release_global_lock release_global_lock
PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY )) PERCENT=$((100 * $GLOBAL_COUNTER / $SIZE_OF_INPUT ))
if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ] if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ]
then then
if [ "$QUIET" == "0" ] if [ "$QUIET" == "0" ]
then then
log PRCNT "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items." log PRCNT "Currently $PERCENT percent complete. Processed $GLOBAL_COUNTER of $SIZE_OF_INPUT items."
elif [ "$DAEMON" == "0" ] elif [ "$DAEMON" == "0" ]
then then
echo -en "\r$PERCENT%" echo -en "\r$PERCENT%"

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
DEBUG="$1" DEBUG="$1"
VERSION="2.65" VERSION="2.70"
TMP_DIR="ppss" TMP_DIR="ppss"
PPSS=./ppss PPSS=./ppss
PPSS_DIR=ppss_dir PPSS_DIR=ppss_dir