Backup, major overhaul of daemon mode and listener process...

This commit is contained in:
Louwrentius 2010-07-17 19:49:19 +00:00
parent caa6519d0f
commit ecc9d3da3b
1 changed files with 244 additions and 185 deletions

429
ppss
View File

@ -75,7 +75,6 @@ IFS_BACKUP="$IFS"
CPUINFO="/proc/cpuinfo"
PROCESSORS=""
START_KEY="$RANDOM$RANDOM$RANDOM"
STOP_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to the listener to stop.
KILL_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill
QUEUE=""
INOTIFY=0
@ -85,6 +84,9 @@ STOP_PPSS=""
SIZE_OF_INPUT=""
LOCAL_LOCKING="1"
PROCESSED_ITEMS=""
UNPROCESSED_ITEMS=""
ACTIVE_WORKERS="0"
DAEMON_POLLING_INTERVAL="10"
SSH_SERVER="" # Remote server or 'master'.
SSH_KEY="" # SSH key for ssh account.
@ -346,20 +348,20 @@ exec_cmd () {
then
if [ -z "$NOMP" ]
then
log DEBUG "REMOTE EXEC"
log DEBUG "$USER@$SSH_SERVER $CMD"
# log DEBUG "REMOTE EXEC"
# log DEBUG "$USER@$SSH_SERVER $CMD"
ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD
STATUS=$?
elif [ "$NOMP" == "1" ]
then
log DEBUG "REMOTE EXEC NO MP"
# log DEBUG "REMOTE EXEC NO MP"
ssh $SSH_OPTS_NOMP $SSH_KEY $USER@$SSH_SERVER $CMD
STATUS=$?
fi
else
eval "$CMD"
STATUS=$?
log DEBUG "LOCAL EXEC - status is $STATUS"
# log DEBUG "LOCAL EXEC - status is $STATUS"
fi
return $STATUS
}
@ -373,10 +375,10 @@ does_file_exist () {
RES=`exec_cmd "ls -1 $FILE" 2>&1`
if [ "$?" = "0" ]
then
log DEBUG "$FILE does exist"
#log DEBUG "$FILE does exist"
return 0
else
log DEBUG "$FILE does not exist"
#log DEBUG "$FILE does not exist"
return 1
fi
}
@ -514,38 +516,30 @@ process_arguments () {
exit
fi
fi
shift 2
;;
shift 2 ;;
--working-dir|-w )
PPSS_DIR="$2"
add_var_to_config PPSS_DIR "$PPSS_DIR"
shift 2
;;
shift 2 ;;
--node|-n )
NODES_FILE="$2"
add_var_to_config NODES_FILE "$NODES_FILE"
shift 2
;;
shift 2 ;;
--sourcefile|-f )
INPUT_FILE="$2"
is_var_empty "$INPUT_FILE"
add_var_to_config INPUT_FILE "$INPUT_FILE"
shift 2
;;
shift 2 ;;
--sourcedir|-d )
SRC_DIR="$2"
is_var_empty "$SRC_DIR"
add_var_to_config SRC_DIR "$SRC_DIR"
shift 2
;;
shift 2 ;;
--delay|-D)
MAX_DELAY="$2"
add_var_to_config MAX_DELAY "$MAX_DELAY"
shift 2
;;
shift 2 ;;
--daemon)
DAEMON="1"
QUIET="1"
@ -553,34 +547,32 @@ process_arguments () {
add_var_to_config DAEMON "$DAEMON"
add_var_to_config QUIET "$QUIET"
add_var_to_config INOTIFY "$INOTIFY"
shift 1
;;
shift 1 ;;
--interval)
is_var_empty "$2"
DAEMON_POLLING_INTERVAL="$2"
add_var_to_config DAEMON_POLLING_INTERVAL "$DAEMON_POLLING_INTERVAL"
shift 2 ;;
--awskeypair|-P)
AWS_KEYPAIR="$2"
add_var_to_config AWS_KEYPAIR "$AWS_KEYPAIR"
shift 2
;;
shift 2 ;;
--AMI|-A)
AMI_ID="$2"
add_var_to_config AMI_ID "$AMI_ID"
shift 2
;;
shift 2 ;;
--type|-T)
INSTANCE_TYPE="$2"
add_var_to_config INSTANCE_TYPE "$INSTANCE_TYPE"
shift 2
;;
shift 2 ;;
--security|-G)
SECURITY_GROUP="$2"
add_var_to_config SECURITY_GROUP "$SECURITY_GROUP"
shift 2
;;
shift 2 ;;
--instances|-I)
NUM_NODES="$2"
add_var_to_config NUM_NODES "$NUM_NODES"
shift 2
;;
shift 2 ;;
--command|-c )
COMMAND="$2"
is_var_empty "$COMMAND"
@ -589,44 +581,35 @@ process_arguments () {
COMMAND=\'$COMMAND\'
add_var_to_config COMMAND "$COMMAND"
fi
shift 2
;;
shift 2 ;;
-h )
showusage_normal
exit 1;;
exit 1 ;;
--help)
showusage_long
exit 1;;
exit 1 ;;
--homedir|-H )
if [ ! -z "$2" ]
then
PPSS_HOME_DIR="$2"
add_var_to_config PPSS_DIR $PPSS_HOME_DIR
shift 2
fi
;;
is_var_empty "$2"
PPSS_HOME_DIR="$2"
add_var_to_config PPSS_DIR $PPSS_HOME_DIR
shift 2 ;;
--disable-ht|-j )
HYPERTHREADING=no
add_var_to_config HYPERTHREADING $HYPERTHREADING
shift 1
;;
shift 1 ;;
--log|-l )
LOGFILE="$2"
add_var_to_config LOGFILE "$LOGFILE"
shift 2
;;
shift 2 ;;
--no-recursion|-r )
RECURSION="0"
add_var_to_config LOGFILE "$RECURSION"
shift 1
;;
shift 1 ;;
--workingdir|-w )
WORKINGDIR="$2"
add_var_to_config WORKINGDIR "$WORKINGDIR"
shift 2
;;
shift 2 ;;
--key|-k )
SSH_KEY="$2"
is_var_empty "$SSH_KEY"
@ -635,48 +618,37 @@ process_arguments () {
then
SSH_KEY="-i $SSH_KEY"
fi
shift 2
;;
shift 2 ;;
--known-hosts | -K )
SSH_KNOWN_HOSTS="$2"
add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS"
shift 2
;;
shift 2 ;;
--no-scp |-b )
SECURE_COPY=0
add_var_to_config SECURE_COPY "$SECURE_COPY"
shift 1
;;
shift 1 ;;
--outputdir|-o )
REMOTE_OUTPUT_DIR="$2"
add_var_to_config REMOTE_OUTPUT_DIR "$REMOTE_OUTPUT_DIR"
shift 2
;;
shift 2 ;;
--processes|-p )
TMP="$2"
if [ ! -z "$TMP" ]
then
MAX_NO_OF_RUNNING_JOBS="$TMP"
add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS"
shift 2
fi
;;
is_var_empty "$2"
MAX_NO_OF_RUNNING_JOBS="$2"
add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS"
shift 2 ;;
--master|-m )
SSH_SERVER="$2"
add_var_to_config SSH_SERVER "$SSH_SERVER"
shift 2
;;
shift 2 ;;
--script|-S )
SCRIPT="$2"
add_var_to_config SCRIPT "$SCRIPT"
shift 2
;;
shift 2 ;;
--download)
DOWNLOAD_TO_NODE="1"
add_var_to_config DOWNLOAD_TO_NODE "$DOWNLOAD_TO_NODE"
shift 1
;;
shift 1 ;;
--upload)
if [ -z "$REMOTE_OUTPUT_DIR" ]
then
@ -685,32 +657,28 @@ process_arguments () {
fi
UPLOAD_TO_SERVER="1"
add_var_to_config UPLOAD_TO_SERVER "$UPLOAD_TO_SERVER"
shift 1
;;
shift 1 ;;
--quiet|-q )
QUIET="1"
add_var_to_config QUIET "$QUIET"
shift 1
;;
shift 1 ;;
--user|-u )
USER="$2"
add_var_to_config USER "$USER"
shift 2
;;
shift 2 ;;
--version|-v )
echo ""
echo "$SCRIPT_NAME version $SCRIPT_VERSION"
echo ""
exit 0
;;
exit 0 ;;
* )
showusage_short
echo
echo "Unknown option $1 "
echo
exit 1;;
exit 1 ;;
esac
done
@ -1063,7 +1031,32 @@ stack_push () {
fi
}
unprocessed_stack_push () {
line="$1"
if [ -z "$PROCESSED_ITEMS" ]
then
UNPROCESSED_ITEMS="$line"
else
UNPROCESSED_ITEMS="$line"$'\n'"$UNPROCESSED_ITEMS"
fi
}
processed_stack_push () {
line="$1"
if [ -z "$PROCESSED_ITEMS" ]
then
PROCESSED_ITEMS="$line"
else
PROCESSED_ITEMS="$line"$'\n'"$PROCESSED_ITEMS"
fi
}
stack_pop () {
TMP_STACK=""
i=0
tmp=""
@ -1612,24 +1605,36 @@ remove_processed_items_from_input_file () {
# This function removes all items that have already been processed.
# Processed items have a lock dir in the PPPSS_ITEM_LOCK_DIR.
#
UNPROCESSED_ITEMS=""
if [ "$MODE" = "status" ]
then
return 1
fi
log DEBUG "Running $FUNCNAME...."
if [ ! -e "$LISTOFITEMS" ]
then
echo "$LISTOFITEMS does not exist!"
return 1
else
SIZE=`wc -l "$LISTOFITEMS"`
if [ "$SIZE" = "0" ]
then
echo "$LISTOFITEMS exists but is empty."
return 1
fi
fi
INPUTFILES=`list_all_input_items`
RES=`exec_cmd "ls -1 $ITEM_LOCK_DIR"`
rm "$LISTOFITEMS"
log DEBUG "$PROCESSED_ITEMS"
for x in $INPUTFILES
do
FILE_IS_PROCESSED=0
for y in $RES
for y in $PROCESSED_ITEMS
do
TMP=`echo "$x" | $MD5 | awk '{ print $1 }'`
if [ "$y" = "$TMP" ]
if [ "$y" = "$x" ]
then
FILE_IS_PROCESSED=1
fi
@ -1637,22 +1642,20 @@ remove_processed_items_from_input_file () {
if [ "$FILE_IS_PROCESSED" = "0" ]
then
echo "$x" >> "$LISTOFITEMS"
log DEBUG "ITEM $x is not processed."
unprocessed_stack_push "$x"
else
log DEBUG "ITEM $x is already processed!"
log DEBUG "ITEM $x is already processed!."
fi
done
}
get_list_of_new_items () {
echo
echo "$UNPROCESSED_ITEMS" > "$LISTOFITEMS"
}
get_all_items () {
if [ "$DAEMON" == "1" ]
then
GLOBAL_COUNTER=1
get_input_lock
fi
@ -1677,8 +1680,6 @@ get_all_items () {
check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
fi
remove_processed_items_from_input_file
else
if [ -e "$SRC_DIR" ]
then
@ -1744,6 +1745,7 @@ get_all_items () {
cleanup
exit 1
fi
remove_processed_items_from_input_file
}
get_item () {
@ -1759,7 +1761,7 @@ get_item () {
#
# Return error if list size is empty.
#
if [ -z "$SIZE_OF_INPUT" ]
if [ -z "$SIZE_OF_INPUT" ]
then
log DEBUG "Got no size of input..."
return 1
@ -1770,14 +1772,7 @@ get_item () {
#
if [ "$SIZE_OF_INPUT" -le "0" ]
then
SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
if [ "$?" = "0" ]
then
return 0
else
log DEBUG "Size of input 0 or less."
return 1
fi
return 1
fi
#
@ -1805,9 +1800,6 @@ get_item () {
if [ ! "$LOCK" = "0" ]
then
log DEBUG "Item $ITEM is locked."
#
# Recursion, get_ttem calls itself, until all items are done.
#
get_item
else
log DEBUG "Got lock on $ITEM"
@ -1820,21 +1812,15 @@ get_item () {
fi
}
start_single_worker () {
start_new_worker () {
#
# This function kicks the listener to start a worker process.
#
if ! are_we_sourced
then
if [ "$DAEMON" = "1" ] && [ "$INOTIFY" = "1" ]
then
echo "$STOP_KEY" >> "$FIFO"
return $?
else
echo "$START_KEY" >> "$FIFO"
return $?
fi
fi
}
@ -1867,12 +1853,12 @@ commando () {
# This function will start a chain reaction of events.
#
# The commando executes a command on an item and, when finished,
# executes the start_single_worker. This function selects a new
# executes the start_new_worker. This function selects a new
# item and sends it to the fifo. The listener process receives
# the item and excutes this commando function on the item.
# So in essence, the commando function keeps calling itself
# indirectly until no items are left. This will form a single
# working queue. By executing multiple start_single_worker
# working queue. By executing multiple start_new_worker
# functions based on the CPU cores available, parallel processing
# is achieved, with a queue for each core.
#
@ -1962,7 +1948,7 @@ commando () {
if [ -e "$ITEM_LOG_FILE" ] && [ "$DISABLE_SKIPPING" = "0" ]
then
log DEBUG "Item is already processed, skipping..."
start_single_worker
start_new_worker
return 0
fi
@ -2089,7 +2075,7 @@ commando () {
fi
fi
start_single_worker
start_new_worker
}
infanticide () {
@ -2128,18 +2114,34 @@ infanticide () {
run_command () {
if [ ! -d "$1" ] && [ ! -z "$1" ]
INPUT="$1"
log DEBUG "Current active workers is $ACTIVE_WORKERS"
if [ "$ACTIVE_WORKERS" -le "$MAX_NO_OF_RUNNING_JOBS" ]
then
commando "$1" &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
((ACTIVE_WORKERS++))
log DEBUG "Increasing active workers to $ACTIVE_WORKERS"
return 0
else
return 1
fi
if [ -z "$INPUT" ]
then
stack_pop
INPUT="$REGISTER"
fi
if [ ! -d "$INPUT" ] && [ ! -z "$INPUT" ]
then
commando "$INPUT" &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
((ACTIVE_WORKERS++))
log DEBUG "Increasing active workers to $ACTIVE_WORKERS"
return 0
else
log DEBUG "Item is a directory or empty."
return 1
fi
else
log DEBUG "Strange, this message should never be displaed."
fi
}
display_jobs_remaining () {
@ -2157,15 +2159,6 @@ display_jobs_remaining () {
fi
}
start_as_daemon () {
ACTIVE_WORKERS=0
get_all_items
log DEBUG "Found $SIZE_OF_INPUT items as daemon."
start_all_workers
sleep 10
}
display_progress () {
if [ "$DAEMON" = "0" ]
@ -2196,6 +2189,8 @@ display_progress () {
terminate_listener () {
log DEBUG "Terminating listener."
if [ ! -z "$SSH_MASTER_PID" ]
then
log DEBUG "SSH master PID is $SSH_MASTER_PID"
@ -2212,7 +2207,7 @@ terminate_listener () {
echo
stop-ppss
log DSPLY "Finished. Consult $JOB_LOG_DIR for job output."
log DSPLY "Press ENTER to continue."
#log DSPLY "Press ENTER to continue."
else
echo
stop-ppss
@ -2234,79 +2229,142 @@ inotify_listener () {
done
}
start_inotify_listener () {
is_item_unprocessed () {
ACTIVE_WORKERS=0
inotify_listener &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
VAR="$1"
STATUS=0
if [ -z "$VAR" ]
then
log DEBUG "$FUNCNAME: something is wrong, no argument received."
return 1
fi
for x in $PROCESSED_ITEMS
do
if [ "$x" = "$VAR" ]
then
STATUS=1
fi
done
log DEBUG "Is item $VAR unprocessed: $STATUS"
return $STATUS
}
listen_for_job () {
daemon_listener () {
FINISHED=0
ACTIVE_WORKERS=$MAX_NO_OF_RUNNING_JOBS
PIDS=""
log DEBUG "Listener started."
while true
do
get_all_items
while get_item
do
if is_item_unprocessed "$ITEM"
then
log DEBUG "Daemon sending item $ITEM to fifo."
echo "$ITEM" >> "$FIFO"
processed_stack_push "$ITEM"
log DEBUG "Processed items is $PROCESSED_ITEMS"
fi
done
sleep "$DAEMON_POLLING_INTERVAL"
done
}
start_daemon_listener () {
daemon_listener &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
}
start_inotify_listener () {
ACTIVE_WORKERS=0
inotify_listener &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
}
start_as_daemon () {
if [ "$DAEMON" = "1" ]
then
log DEBUG "Daemon mode enabled."
if [ "$INOTIFY" = "1" ]
then
log INFO "Linux inotify enabled."
start_inotify_listener
else
start_daemon_listener
log INFO "Linux inotify disabled."
fi
else
log DEBUG "Daemon mode disabled."
fi
}
decrease_active_workers () {
if [ "$ACTIVE_WORKERS" -gt "0" ]
then
((ACTIVE_WORKERS--))
fi
}
listen_for_job () {
FINISHED=0
ACTIVE_WORKERS="$MAX_NO_OF_RUNNING_JOBS"
PIDS=""
log DEBUG "Listener started."
start_as_daemon
while read event <& 42
do
log DEBUG "Current active workers is $ACTIVE_WORKERS"
decrease_active_workers
if [ "$event" = "$START_KEY" ]
then
log DEBUG "Got a 'start-key' event"
if get_item
if [ "$DAEMON" = "0" ]
then
run_command "$ITEM"
else
((ACTIVE_WORKERS--))
if [ "$DAEMON" == "1" ] && [ "$INOTIFY" = "0" ]
if get_item
then
start_as_daemon
log DEBUG "Got an item, running command..."
run_command "$ITEM"
else
break
log DEBUG "No more new items..."
if [ "$ACTIVE_WORKERS" = "0" ]
then
break
else
display_jobs_remaining
fi
fi
display_jobs_remaining
fi
elif [ "$event" == "$STOP_KEY" ]
then
((ACTIVE_WORKERS--))
log DEBUG "Decrease active workers to $ACTIVE_WORKERS"
if [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
then
stack_pop
run_command "$REGISTER"
else
log DEBUG "Daemon mode: a worker finished..."
decrease_active_workers
run_command
fi
elif [ "$event" == "$KILL_KEY" ]
then
infanticide
break
else
if [ -e "$event" ]
then
log DEBUG "Event is an item!"
log DEBUG "$ACTIVE_WORKERS - $MAX_NO_OF_RUNNING_JOBS"
stack_push "$event"
if [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
then
stack_pop
run_command "$REGISTER"
log DEBUG "Active workers: $ACTIVE_WORKERS"
fi
fi
log DEBUG "Event is an item!"
stack_push "$event"
log DEBUG "stack after push is $STACK"
run_command
fi
display_progress
done
@ -2321,6 +2379,7 @@ start_all_workers () {
else
log DSPLY "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers."
fi
if [ "$DAEMON" == "0" ]
then
log DSPLY "---------------------------------------------------------"
@ -2332,7 +2391,7 @@ start_all_workers () {
i=0
while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
do
start_single_worker
start_new_worker
log DEBUG "Starting worker $i"
((i++))