PPSS now supports inotify file system events.

This commit is contained in:
Louwrentius 2010-07-11 22:51:31 +00:00
parent b6bd8c6c33
commit 55583de5f8
2 changed files with 198 additions and 56 deletions

252
ppss
View File

@ -77,6 +77,8 @@ PROCESSORS=""
START_KEY="$RANDOM$RANDOM$RANDOM"
STOP_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to the listener to stop.
KILL_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill
QUEUE=""
INOTIFY=0
RECURSION="1" # all running processes.
START_PPSS=""
STOP_PPSS=""
@ -112,6 +114,9 @@ DISABLE_SKIPPING=0
NODE_STATUS="$PPSS_DIR/status.txt"
DAEMON=0
REGISTER="" # For STACK
STACK=""
TMP_STACK=""
showusage_short () {
@ -439,6 +444,15 @@ is_var_empty () {
fi
}
detect_inotify () {
if [ -e /usr/bin/inotifywait ]
then
INOTIFY=1
fi
}
process_arguments () {
#
@ -534,8 +548,10 @@ process_arguments () {
--daemon)
DAEMON="1"
QUIET="1"
detect_inotify
add_var_to_config DAEMON "$DAEMON"
add_var_to_config QUIET "$QUIET"
add_var_to_config INOTIFY "$INOTIFY"
shift 1
;;
--awskeypair|-P)
@ -1022,6 +1038,50 @@ erase_ppss () {
fi
}
stack_push_tmp () {
TMP1="$1"
if [ -z "$TMP_STACK" ]
then
TMP_STACK="$TMP1"
else
TMP_STACK="$TMP_STACK"$'\n'"$TMP1"
fi
}
stack_push () {
line="$1"
if [ -z "$STACK" ]
then
STACK="$line"
else
STACK="$line"$'\n'"$STACK"
fi
}
stack_pop () {
TMP_STACK=""
i=0
tmp=""
for x in $STACK
do
if [ "$i" = "0" ]
then
tmp="$x"
else
stack_push_tmp "$x"
fi
((i++))
done
STACK="$TMP_STACK"
REGISTER="$tmp"
log DEBUG "Stack is $STACK"
}
ec2_get_pending_nodes() {
#
# This function has naver been tested by the author of PPSS.
@ -1592,7 +1652,7 @@ get_all_items () {
SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
if [ "$SIZE_OF_INPUT" -le "0" ]
if [ "$SIZE_OF_INPUT" -le "0" ] && [ "$DAEMON" = "0" ]
then
log ERROR "Source file/dir seems to be empty."
set_status STOPPED
@ -1682,8 +1742,14 @@ start_single_worker () {
#
if ! are_we_sourced
then
echo "$START_KEY" >> "$FIFO"
return $?
if [ "$DAEMON" = "1" ]
then
echo "$STOP_KEY" >> "$FIFO"
return $?
else
echo "$START_KEY" >> "$FIFO"
return $?
fi
fi
}
@ -1975,63 +2041,53 @@ infanticide () {
}
listen_for_job () {
run_command () {
FINISHED=0
DIED=0
PIDS=""
log DEBUG "Listener started."
while read event <& 42
do
if [ "$event" = "$START_KEY" ]
if [ ! -d "$1" ] && [ ! -z "$1" ]
then
commando "$1" &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
((ACTIVE_WORKERS++))
log DEBUG "Increasing active workers to $ACTIVE_WORKERS"
return 0
else
return 1
fi
}
display_jobs_remaining () {
if [ "$ACTIVE_WORKERS" == "1" ] && [ "$QUIET" == "0" ]
then
log PRCNT "One job is remaining. "
elif [ "$QUIET" == "0" ]
then
if [ "$ACTIVE_WORKERS" == "1" ]
then
if get_item
then
commando "$ITEM" &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
else
((DIED++))
if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ]
then
if [ "$DAEMON" == "1" ]
then
#
# In daemon mode, start all over again.
#
DIED=0
get_all_items
log DEBUG "Found $SIZE_OF_INPUT items."
start_all_workers
sleep 10
else
break
fi
else
RES=$((MAX_NO_OF_RUNNING_JOBS-DIED))
if [ "$RES" == "1" ] && [ "$QUIET" == "0" ]
then
log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. "
elif [ "$QUIET" == "0" ]
then
if [ "$DIED" == "1" ]
then
echo -en "\n"
fi
log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining. "
fi
fi
fi
elif [ "$event" == "$KILL_KEY" ]
then
infanticide
break
echo -en "\n"
fi
log PRCNT "$((ACTIVE_WORKERS)) jobs are remaining. "
fi
}
start_as_daemon () {
ACTIVE_WORKERS=0
get_all_items
log DEBUG "Found $SIZE_OF_INPUT items as daemon."
start_all_workers
sleep 10
}
display_progress () {
if [ "$DAEMON" = "0" ]
then
SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }')
PERCENT=$((100 * $GLOBAL_COUNTER / $SIZE_OF_INPUT ))
if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ]
if [ ! "$ACTIVE_WORKERS" == "0" ] && [ "$FINISHED" == "0" ]
then
if [ "$QUIET" == "0" ]
then
@ -2050,8 +2106,10 @@ listen_for_job () {
FINISHED=1
fi
fi
done
fi
}
terminate_listener () {
if [ ! -z "$SSH_MASTER_PID" ]
then
@ -2082,6 +2140,86 @@ listen_for_job () {
cleanup
}
inotify_listener () {
inotifywait "$SRC_DIR" -m -e close -q --format '%w%f' | \
while read -r line
do
echo "$line" > "$FIFO"
done
}
listen_for_job () {
FINISHED=0
ACTIVE_WORKERS=$MAX_NO_OF_RUNNING_JOBS
PIDS=""
log DEBUG "Listener started."
if [ "$DAEMON" = "1" ] && [ "$INOTIFY" = "1" ]
then
ACTIVE_WORKERS=0
inotify_listener &
MYPID="$!"
disown
PIDS="$PIDS $MYPID"
fi
while read event <& 42
do
if [ "$event" = "$START_KEY" ]
then
if get_item
then
run_command "$ITEM"
else
((ACTIVE_WORKERS--))
if [ "$ACTIVE_WORKERS" -le "0" ]
then
if [ "$DAEMON" == "1" ] && [ "$INOTIFY" = "0" ]
then
start_as_daemon
else
break
fi
else
display_jobs_remaining
fi
fi
elif [ "$event" == "$STOP_KEY" ]
then
((ACTIVE_WORKERS--))
log DEBUG "Decrease active workers to $ACTIVE_WORKERS"
if [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
then
stack_pop
run_command "$REGISTER"
fi
elif [ "$event" == "$KILL_KEY" ]
then
infanticide
break
else
if [ -e "$event" ]
then
log DEBUG "Event is an item!"
log DEBUG "$ACTIVE_WORKERS - $MAX_NO_OF_RUNNING_JOBS"
stack_push "$event"
if [ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
then
stack_pop
run_command "$REGISTER"
log DEBUG "Active workers: $ACTIVE_WORKERS"
fi
fi
fi
display_progress
done
terminate_listener
}
start_all_workers () {
if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ]
@ -2093,12 +2231,16 @@ start_all_workers () {
if [ "$DAEMON" == "0" ]
then
log DSPLY "---------------------------------------------------------"
elif [ "$INOTIFY" = "1" ]
then
return 0
fi
i=0
while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
do
start_single_worker
log DEBUG "Starting worker $i"
((i++))
if [ ! "$MAX_DELAY" == "0" ]

View File

@ -1,7 +1,7 @@
#!/bin/bash
DEBUG="$1"
VERSION="2.80"
VERSION="2.81"
TMP_DIR="/tmp/ppss"
PPSS=./ppss
PPSS_DIR=ppss_dir