Fixed distributed ppss mechanism.

This commit is contained in:
Louwrentius 2009-11-08 15:16:49 +00:00
parent 05801b6773
commit cd4545d0c3
1 changed file with 42 additions and 25 deletions

67
ppss.sh
View File

@ -41,7 +41,7 @@ SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.36" SCRIPT_VERSION="2.36"
# The first argument to this script can be a mode. # The first argument to this script can be a mode.
MODES="start config stop pause continue deploy status erase kill" MODES="node start config stop pause continue deploy status erase kill"
for x in $MODES for x in $MODES
do do
if [ "$x" == "$1" ] if [ "$x" == "$1" ]
@ -84,6 +84,7 @@ IFS_BACKUP="$IFS"
CPUINFO=/proc/cpuinfo CPUINFO=/proc/cpuinfo
PROCESSORS="" PROCESSORS=""
STOP_KEY=$RANDOM$RANDOM$RANDOM STOP_KEY=$RANDOM$RANDOM$RANDOM
KILL_KEY=$RANDOM$RANDOM$RANDOM
SSH_SERVER="" # Remote server or 'master'. SSH_SERVER="" # Remote server or 'master'.
SSH_KEY="" # SSH key for ssh account. SSH_KEY="" # SSH key for ssh account.
@ -114,14 +115,14 @@ showusage_short () {
echo echo
echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION"
echo echo
echo "usage: $0 [ -d <sourcedir> | -f <sourcefile> ] [ -c '<command> \"$ITEM\"' ]" echo "usage: $0 [ -d <sourcedir> | -f <sourcefile> ] [ -c '<command> \"\$ITEM\"' ]"
echo " [ -C <configfile> ] [ -j ] [ -l <logfile> ] [ -p <# jobs> ]" echo " [ -C <configfile> ] [ -j ] [ -l <logfile> ] [ -p <# jobs> ]"
echo " [ -D <delay> ]" echo " [ -D <delay> ]"
echo echo
echo "Examples:" echo "Examples:"
echo " $0 -d /dir/with/some/files -c 'gzip '" echo " $0 -d /dir/with/some/files -c 'gzip '"
echo " $0 -d /dir/with/some/files -c 'gzip \"$ITEM\"' -D 5" echo " $0 -d /dir/with/some/files -c 'gzip \"\$ITEM\"' -D 5"
echo " $0 -d /dir/with/some/files -c 'cp \"$ITEM\" /tmp' -p 2" echo " $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2"
} }
showusage_normal () { showusage_normal () {
@ -277,18 +278,8 @@ showusage_long () {
kill_process () { kill_process () {
echo "$KILL_KEY" >> "$FIFO"
kill $LISTENER_PID > /dev/null 2>&1 }
sleep 1
cleanup
sleep 1
if [ ! -z "$SSH_MASTER_PID" ]
then
kill -9 "$SSH_MASTER_PID" >> /dev/null 2>&1
fi
sleep 1
log INFO "Finished. Consult ./$JOB_LOG_DIR for job output."
}
exec_cmd () { exec_cmd () {
@ -755,7 +746,6 @@ erase_ppss () {
for NODE in `cat $NODES_FILE` for NODE in `cat $NODES_FILE`
do do
log INFO "Erasing PPSS homedir $PPSS_DIR from node $NODE." log INFO "Erasing PPSS homedir $PPSS_DIR from node $NODE."
ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "./$PPSS_HOME_DIR/$0 kill"
ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR" ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR"
done done
else else
@ -798,7 +788,7 @@ deploy () {
sleep 1.1 sleep 1.1
ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "mkdir $PPSS_HOME_DIR >> /dev/null 2>&1" ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "cd ~ && mkdir $PPSS_HOME_DIR >> /dev/null 2>&1"
scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR
set_error $? set_error $?
scp -q $SSH_OPTS_NODE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR scp -q $SSH_OPTS_NODE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR
@ -879,6 +869,7 @@ deploy_ppss () {
then then
log DEBUG "SSH SERVER $SSH_SERVER is not a node." log DEBUG "SSH SERVER $SSH_SERVER is not a node."
deploy "$SSH_SERVER" deploy "$SSH_SERVER"
exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR"
fi fi
fi fi
} }
@ -888,7 +879,7 @@ start_ppss_on_node () {
NODE="$1" NODE="$1"
log INFO "Starting PPSS on node $NODE." log INFO "Starting PPSS on node $NODE."
ssh $SSH_KEY $USER@$NODE "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS $0 node --config $CONFIG" ssh $SSH_KEY $USER@$NODE "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 --config ~/$PPSS_HOME_DIR/$CONFIG"
} }
@ -1452,8 +1443,8 @@ commando () {
if [ ! -z "$SSH_SERVER" ] if [ ! -z "$SSH_SERVER" ]
then then
log DEBUG "Uploading item log file $ITEM_LOG_FILE to master ~/$JOB_LOG" log DEBUG "Uploading item log file $ITEM_LOG_FILE to master ~/$PPSS_HOME_DIR/$JOB_LOG_DIR"
scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:~/$JOB_LOG_DIR scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:~/$PPSS_HOME_DIR/$JOB_LOG_DIR
if [ ! "$?" == "0" ] if [ ! "$?" == "0" ]
then then
log DEBUG "Uploading of item log file failed." log DEBUG "Uploading of item log file failed."
@ -1471,6 +1462,7 @@ commando () {
listen_for_job () { listen_for_job () {
FINISHED=0 FINISHED=0
DIED=0 DIED=0
PIDS=""
log DEBUG "Listener started." log DEBUG "Listener started."
while read event <& 42 while read event <& 42
do do
@ -1480,6 +1472,7 @@ listen_for_job () {
# This mechanism makes PPSS asynchronous. # This mechanism makes PPSS asynchronous.
# Gives a status update on the current progress. # Gives a status update on the current progress.
echo "$event" >> event.txt
if [ "$event" == "$STOP_KEY" ] if [ "$event" == "$STOP_KEY" ]
then then
@ -1496,8 +1489,24 @@ listen_for_job () {
log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining." log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining."
echo -en "\033[1A" echo -en "\033[1A"
fi fi
elif [ "$event" == "$KILL_KEY" ]
then
for x in $PIDS
do
log DEBUG "Killing pid $x..."
kill $x >> /dev/null 2>&1
done
cleanup
if [ ! -z "$SSH_MASTER_PID" ]
then
kill -9 "$SSH_MASTER_PID" >> /dev/null 2>&1
fi
log INFO "Finished. Consult ./$JOB_LOG_DIR for job output."
break
else else
commando "$event" & commando "$event" &
PIDS="$PIDS $!"
disown
fi fi
SIZE_OF_ARRAY="${#ARRAY[@]}" SIZE_OF_ARRAY="${#ARRAY[@]}"
@ -1514,9 +1523,9 @@ listen_for_job () {
fi fi
fi fi
done done
kill_process
set_status STOPPED set_status STOPPED
log DEBUG "Listener stopped." log DEBUG "Listener stopped."
exit
} }
# This starts a number of parallel workers based on the # of parallel jobs allowed. # This starts a number of parallel workers based on the # of parallel jobs allowed.
@ -1546,7 +1555,7 @@ start_all_workers () {
get_status_of_node () { get_status_of_node () {
NODE="$1" NODE="$1"
STATUS=`ssh -o ConnectTimeout=10 $SSH_KEY $USER@$NODE cat "$NODE_STATUS"` STATUS=`ssh -o ConnectTimeout=10 $SSH_KEY $USER@$NODE cat "$PPSS_HOME_DIR/$NODE_STATUS"`
ERROR="$?" ERROR="$?"
if [ ! "$ERROR" == "0" ] if [ ! "$ERROR" == "0" ]
then then
@ -1591,7 +1600,7 @@ show_status () {
for x in `cat $NODES_FILE` for x in `cat $NODES_FILE`
do do
NODE=`get_status_of_node "$x" | awk '{ print $1 }'` NODE=`get_status_of_node "$x" | awk '{ print $1 }'`
RES=`exec_cmd "grep -i $NODE ~/$JOB_LOG_DIR/* | wc -l "` RES=`exec_cmd "grep -i $NODE ~/$PPSS_HOME_DIR/$JOB_LOG_DIR/* | wc -l "`
if [ ! "$?" == "0" ] if [ ! "$?" == "0" ]
then then
RES=0 RES=0
@ -1612,7 +1621,15 @@ show_status () {
main () { main () {
case $MODE in case $MODE in
start ) node )
init_vars
test_server
get_all_items
listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null
LISTENER_PID=$!
start_all_workers
;;
start )
# This option only starts all nodes. # This option only starts all nodes.
display_header display_header
if [ ! -e "$NODES_FILE" ] if [ ! -e "$NODES_FILE" ]