From 2356f6bb0a7035446165be8323f4a6f39dc28fed Mon Sep 17 00:00:00 2001 From: Louwrentius Date: Sun, 31 Jan 2010 22:31:22 +0000 Subject: [PATCH] Merge distributed PPSS back into trunk --- ppss | 1839 ++++++++++++++++++++++++++++++++++++++++++++++++++ ppss-test.sh | 223 ++++++ ppss.sh | 1116 +++++++++++++++++++++--------- 3 files changed, 2865 insertions(+), 313 deletions(-) create mode 100755 ppss create mode 100755 ppss-test.sh diff --git a/ppss b/ppss new file mode 100755 index 0000000..88abbe3 --- /dev/null +++ b/ppss @@ -0,0 +1,1839 @@ +#!/usr/bin/env bash +# +# PPSS, the Parallel Processing Shell Script +# +# Copyright (c) 2010, Louwrentius +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# See +# for a copy of the GNU General Public License +# +# "Patches or other contributions are always welcome!" +# + +# Handling control-c for a clean shutdown. +trap 'kill_process' SIGINT + +# Setting some vars. +SCRIPT_NAME="Distributed Parallel Processing Shell Script" +SCRIPT_VERSION="2.55" + +# The first argument to this script can be a mode. +MODES="node start config stop pause continue deploy status erase kill" +for x in $MODES +do + if [ "$x" == "$1" ] + then + MODE="$1" + shift + fi +done + +# The working directory of PPSS can be set with +# export PPSS_DIR=/path/to/workingdir +if [ -z "$PPSS_DIR" ] +then + PPSS_DIR="./ppss_dir" +fi + +CONFIG="" +HOSTNAME=`hostname` +ARCH=`uname` + +PID="$$" +GLOBAL_LOCK="$PPSS_DIR/PPSS-GLOBAL-LOCK-$PID" # Global lock file used by local PPSS instance. +PAUSE_SIGNAL="$PPSS_DIR/pause_signal" # Pause processing if this file is present. +PAUSE_DELAY=60 # Polling every 1 minutes by default. +STOP_SIGNAL="$PPSS_DIR/stop_signal" # Stop processing if this file is present. +ARRAY_POINTER_FILE="$PPSS_DIR/ppss-array-pointer-$PID" # Pointer for keeping track of processed items. +JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items. +LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file. Contains lots of info. +STOP=0 # STOP job. +MAX_DELAY=0 # MAX DELAY between jobs. +MAX_LOCK_DELAY=9 # +PERCENT="0" +LISTENER_PID="" +IFS_BACKUP="$IFS" +CPUINFO=/proc/cpuinfo +PROCESSORS="" +STOP_KEY=$RANDOM$RANDOM$RANDOM +KILL_KEY=$RANDOM$RANDOM$RANDOM + +SSH_SERVER="" # Remote server or 'master'. +SSH_KEY="" # SSH key for ssh account. +SSH_KNOWN_HOSTS="" +SSH_SOCKET="/tmp/ppss_ssh_socket" # Multiplex multiple SSH connections over 1 master. +SSH_OPTS="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \ + -o GlobalKnownHostsFile=./known_hosts \ + -o ControlMaster=auto \ + -o Cipher=blowfish \ + -o ConnectTimeout=10 " + # Blowfish is faster but still secure. +SSH_MASTER_PID="" + +PPSS_HOME_DIR=ppss +ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking. +PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing. +PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output. +TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp. +SECURE_COPY="1" # If set, use SCP, Otherwise, use cp. +REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded. +SCRIPT="" # Custom user script that is executed by ppss. +ITEM_ESCAPED="" +NODE_STATUS="$PPSS_DIR/status.txt" + +showusage_short () { + + echo + echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" + echo + echo "usage: $0 [ -d | -f ] [ -c ' \"\$ITEM\"' ]" + echo " [ -C ] [ -j ] [ -l ] [ -p <# jobs> ]" + echo " [ -D ] [ -h ] [ --help ]" + echo + echo "Examples:" + echo " $0 -d /dir/with/some/files -c 'gzip '" + echo " $0 -d /dir/with/some/files -c 'gzip \"\$ITEM\"' -D 5" + echo " $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2" +} + +showusage_normal () { + + echo + echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" + echo + echo "PPSS is a Bash shell script that executes commands in parallel on a set " + echo "of items, such as files in a directory, or lines in a file." + echo + echo "This short summary only discusses options for stand-alone mode. for a " + echo "complete listing of all options, run PPSS with the options --help" + echo + echo "Usage $0 [ options ]" + echo + echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes." + echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item " + echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'." + echo + echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files" + echo -e " are fed as an argument to the command that has been specified with -c." + echo + echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the" + echo -e " command that has been specified with -c." + echo + echo -e "--config | -C If the mode is config, a config file with the specified name will be" + echo -e " generated based on all the options specified. In the other modes". + echo -e " this option will result in PPSS reading the config file and start" + echo -e " processing items based on the settings of this file." + echo + echo -e "--enable-ht | -j Enable hyperthreading. Is disabled by default." + echo + echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt." + echo + echo -e "--processes | -p Start the specified number of processes. Ignore the number of available" + echo -e " CPUs." + echo + echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread" + echo -e " the load. The delay is only used at the start of all 'threads'." + echo + echo -e "Example: encoding some wav files to mp3 using lame:" + echo + echo -e "$0 -d /path/to/wavfiles -c 'lame '" + echo + echo -e "Extended usage: use --help" + echo +} + +if [ "$#" == "0" ] +then + showusage_short + exit 1 +fi + +showusage_long () { + + echo + echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" + echo + echo "PPSS is a Bash shell script that executes commands in parallel on a set " + echo "of items, such as files in a directory, or lines in a file." + echo + echo "Usage: $0 [ MODE ] [ options ]" + echo + echo "Modes are optional and mainly used for running in distributed mode. Modes are:" + echo + echo " config Generate a config file based on the supplied option parameters." + echo " deploy Deploy PPSS and related files on the specified nodes." + echo " erase Erase PPSS and related files from the specified nodes." + echo + echo " start Starting PPSS on nodes." + echo " pause Pausing PPSS on all nodes." + echo " stop Stopping PPSS on all nodes." + echo " node Running PPSS as a node, requires additional options." + echo + echo "Options are:" + echo + echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes." + echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item " + echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'." + echo + echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files" + echo -e " are fed as an argument to the command that has been specified with -c." + echo + echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the" + echo -e " command that has been specified with -c." + echo + echo -e "--config | -C If the mode is config, a config file with the specified name will be" + echo -e " generated based on all the options specified. In the other modes". + echo -e " this option will result in PPSS reading the config file and start" + echo -e " processing items based on the settings of this file." + echo + echo -e "--enable-ht | -j Enable hyperthreading. Is disabled by default." + echo + echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt." + echo + echo -e "--processes | -p Start the specified number of processes. Ignore the number of available" + echo -e " CPUs." + echo + echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread" + echo -e " the load. The delay is only used at the start of all 'threads'." + echo + echo -e "The following options are used for distributed execution of PPSS." + echo + echo -e "--master | -m Specifies the SSH server that is used for communication between nodes." + echo -e " Using SSH, file locks are created, informing other nodes that an item " + echo -e " is locked. Also, often items, such as files, reside on this host. SCP " + echo -e " is used to transfer files from this host to nodes for local procesing." + echo + echo -e "--node | -n File containig a list of nodes that act as PPSS clients. One IP / DNS " + echo -e " name per line." + echo + echo -e "--key | -k The SSH key that a node uses to connect to the master." + echo + echo -e "--known-hosts | -K The file that contains the server public key. Can often be found on " + echo -e " hosts that already once connected to the server. See the file " + echo -e " ~/.ssh/known_hosts or else, manualy connect once and check this file." + echo + echo -e "--user | -u The SSH user name that is used when logging in into the master SSH" + echo -e " server." + echo + echo -e "--script | -S Specifies the script/program that must be copied to the nodes for " + echo -e " execution through PPSS. Only used in the deploy mode." + echo -e " This option should be specified if necessary when generating a config." + echo + echo -e "--transfer | -t This option specifies that an item will be downloaded by the node " + echo -e " from the server or share to the local node for processing." + echo + echo -e "--no-scp | -b Do not use scp for downloading items. Use cp instead. Assumes that a" + echo -e " network file system (NFS/SMB) is mounted under a local mountpoint." + echo + echo -e "--outputdir | -o Directory on server where processed files are put. If the result of " + echo -e " encoding a wav file is an mp3 file, the mp3 file is put in the " + echo -e " directory specified with this option." + echo + echo -e "--homedir | -H Directory in which directory PPSS is installed on the node." + echo -e " Default is 'ppss'." + echo + echo -e "Example: encoding some wav files to mp3 using lame:" + echo + echo -e "$0 -c 'lame ' -d /path/to/wavfiles -j " + echo + echo -e "Running PPSS based on a configuration file." + echo + echo -e "$0 -C config.cfg" + echo + echo -e "Running PPSS on a client as part of a cluster." + echo + echo -e "$0 -d /somedir -c 'cp "$ITEM" /some/destination' -s 10.0.0.50 -u ppss -t -k ppss-key.key" + echo +} + +kill_process () { + + echo "$KILL_KEY" >> "$FIFO" +} + +exec_cmd () { + + + CMD="$1" + + if [ ! -z "$SSH_SERVER" ] + then + ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD + return $? + else + eval "$CMD" + return $? + fi +} + +# this function makes remote or local checking of existence of items transparent. +does_file_exist () { + + FILE="$1" + `exec_cmd "ls -1 $FILE" >> /dev/null 2>&1` + if [ "$?" == "0" ] + then + return 0 + else + return 1 + fi +} + +check_for_interrupt () { + + does_file_exist "$STOP_SIGNAL" + if [ "$?" == "0" ] + then + set_status "STOPPED" + log INFO "STOPPING job. Stop signal found." + STOP="1" + return 1 + fi + + does_file_exist "$PAUSE_SIGNAL" + if [ "$?" == "0" ] + then + set_status "PAUZED" + log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS." + sleep 0.$PAUSE_DELAY + check_for_interrupt + else + set_status "RUNNING" + fi +} + +cleanup () { + + #log DEBUG "$FUNCNAME - Cleaning up all temp files and processes." + + if [ -e "$FIFO" ] + then + rm "$FIFO" + fi + + if [ -e "$ARRAY_POINTER_FILE" ] + then + rm "$ARRAY_POINTER_FILE" + fi + + if [ -e "$GLOBAL_LOCK" ] + then + rm -rf "$GLOBAL_LOCK" + fi + + if [ -e "$SSH_SOCKET" ] + then + rm -rf "$SSH_SOCKET" + fi + +} + +add_var_to_config () { + + if [ "$MODE" == "config" ] + then + + VAR="$1" + VALUE="$2" + + echo -e "$VAR=$VALUE" >> $CONFIG + fi +} + +# Process any command-line options that are specified." +while [ $# -gt 0 ] +do + case $1 in + + --config|-C ) + CONFIG="$2" + + if [ "$MODE" == "config" ] + then + if [ -e "$CONFIG" ] + then + echo "Do want to overwrite existing config file? [y/n]" + read yn + if [ "$yn" == "y" ] || [ "$yn" == "yes" ] + then + rm "$CONFIG" + else + echo "Aborting..." + cleanup + exit + fi + fi + fi + + if [ ! "$MODE" == "config" ] + then + source $CONFIG + fi + + if [ ! -z "$SSH_KEY" ] + then + SSH_KEY="-i $SSH_KEY" + fi + + if [ ! -e "./known_hosts" ] + then + if [ -e $SSH_KNOWN_HOSTS ] + then + if [ ! "$SSH_KNOWN_HOSTS" == "known_hosts" ] + then + cat $SSH_KNOWN_HOSTS > ./known_hosts + fi + else + echo "File $SSH_KNOWN_HOSTS does not exist." + exit + fi + fi + shift 2 + ;; + + --working-dir|-w ) + PPSS_DIR="$2" + add_var_to_config PPSS_DIR "$PPSS_DIR" + shift 2 + ;; + + --node|-n ) + NODES_FILE="$2" + add_var_to_config NODES_FILE "$NODES_FILE" + shift 2 + ;; + + --sourcefile|-f ) + INPUT_FILE="$2" + add_var_to_config INPUT_FILE "$INPUT_FILE" + shift 2 + ;; + --sourcedir|-d ) + SRC_DIR="$2" + add_var_to_config SRC_DIR "$SRC_DIR" + shift 2 + ;; + --delay|-D) + MAX_DELAY="$2" + add_var_to_config MAX_DELAY "$MAX_DELAY" + shift 2 + ;; + --command|-c ) + COMMAND="$2" + if [ "$MODE" == "config" ] + then + COMMAND=\'$COMMAND\' + add_var_to_config COMMAND "$COMMAND" + fi + shift 2 + ;; + + -h ) + showusage_normal + exit 1;; + --help) + showusage_long + exit 1;; + --homedir|-H ) + if [ ! -z "$2" ] + then + PPSS_HOME_DIR="$2" + add_var_to_config PPSS_DIR $PPSS_HOME_DIR + shift 2 + fi + ;; + + --disable-ht|-j ) + HYPERTHREADING=no + add_var_to_config HYPERTHREADING $HYPERTHREADING + shift 1 + ;; + --log|-l ) + LOGFILE="$2" + add_var_to_config LOGFILE "$LOGFILE" + shift 2 + ;; + --workingdir|-w ) + WORKINGDIR="$2" + add_var_to_config WORKINGDIR "$WORKINGDIR" + shift 2 + ;; + --key|-k ) + SSH_KEY="$2" + add_var_to_config SSH_KEY "$SSH_KEY" + if [ ! -z "$SSH_KEY" ] + then + SSH_KEY="-i $SSH_KEY" + fi + shift 2 + ;; + --known-hosts | -K ) + SSH_KNOWN_HOSTS="$2" + add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS" + shift 2 + ;; + + --no-scp |-b ) + SECURE_COPY=0 + add_var_to_config SECURE_COPY "$SECURE_COPY" + shift 1 + ;; + --outputdir|-o ) + REMOTE_OUTPUT_DIR="$2" + add_var_to_config REMOTE_OUTPUT_DIR "$REMOTE_OUTPUT_DIR" + shift 2 + ;; + --processes|-p ) + TMP="$2" + if [ ! -z "$TMP" ] + then + MAX_NO_OF_RUNNING_JOBS="$TMP" + add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS" + shift 2 + fi + ;; + --master|-m ) + SSH_SERVER="$2" + add_var_to_config SSH_SERVER "$SSH_SERVER" + shift 2 + ;; + --script|-S ) + SCRIPT="$2" + add_var_to_config SCRIPT "$SCRIPT" + shift 2 + ;; + --transfer|-t ) + TRANSFER_TO_SLAVE="1" + add_var_to_config TRANSFER_TO_SLAVE "$TRANSFER_TO_SLAVE" + shift 1 + ;; + --user|-u ) + USER="$2" + add_var_to_config USER "$USER" + shift 2 + ;; + + --version|-v ) + echo "" + echo "$SCRIPT_NAME version $SCRIPT_VERSION" + echo "" + exit 0 + ;; + * ) + + showusage_short + echo + echo "Unknown option $1 " + echo + exit 1;; + esac +done + + +display_header () { + + log info "" + log INFO "=========================================================" + log INFO " |P|P|S|S| " + log INFO "$SCRIPT_NAME version $SCRIPT_VERSION" + log INFO "=========================================================" + log INFO "Hostname:\t\t$HOSTNAME" + log INFO "---------------------------------------------------------" +} + +create_working_directory () { + + if [ ! -e "$PPSS_DIR" ] + then + mkdir -p "$PPSS_DIR" + fi +} + +# Init all vars +init_vars () { + + create_working_directory + + if [ "$ARCH" == "Darwin" ] + then + MIN_JOBS=4 + elif [ "$ARCH" == "Linux" ] + then + MIN_JOBS=3 + fi + + if [ -e "$LOGFILE" ] + then + rm $LOGFILE + fi + + display_header + + if [ -z "$COMMAND" ] + then + echo + log ERROR "No command specified." + echo + showusage_normal + cleanup + exit 1 + fi + + echo 0 > $ARRAY_POINTER_FILE + + FIFO=/tmp/ppss-fifo-$RANDOM-$RANDOM + + if [ ! -e "$FIFO" ] + then + mkfifo -m 600 $FIFO + fi + + exec 42<> $FIFO + + set_status "RUNNING" + + if [ -e "$CPUINFO" ] + then + CPU=`cat /proc/cpuinfo | grep 'model name' | cut -d ":" -f 2 | sed -e s/^\ //g | sort | uniq` + log INFO "CPU: $CPU" + elif [ "$ARCH" == "Darwin" ] + then + MODEL=`system_profiler SPHardwareDataType | grep "Processor Name" | cut -d ":" -f 2` + SPEED=`system_profiler SPHardwareDataType | grep "Processor Speed" | cut -d ":" -f 2` + log INFO "CPU: $MODEL $SPEED" + elif [ "$ARCH" == "SunOS" ] + then + CPU=`psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1` + + log INFO "$CPU" + else + log INFO "CPU: Cannot determine. Provide a patch for your arch!" + log INFO "Arch is $ARCH" + fi + + if [ -z "$MAX_NO_OF_RUNNING_JOBS" ] + then + get_no_of_cpus $HYPERTHREADING + fi + + does_file_exist "$JOB_LOG_DIR" + if [ ! "$?" == "0" ] + then + log DEBUG "Job log directory $JOB_lOG_DIR does not exist. Creating." + exec_cmd "mkdir -p $JOB_LOG_DIR" + else + log DEBUG "Job log directory $JOB_LOG_DIR exists." + fi + + does_file_exist "$ITEM_LOCK_DIR" + if [ ! "$?" == "0" ] && [ ! -z "$SSH_SERVER" ] + then + log DEBUG "Creating remote item lock dir." + exec_cmd "mkdir $ITEM_LOCK_DIR" + fi + + if [ ! -e "$JOB_LOG_DIR" ] + then + mkdir -p "$JOB_LOG_DIR" + fi + + does_file_exist "$REMOTE_OUTPUT_DIR" + if [ ! "$?" == "0" ] + then + log ERROR "Remote output dir $REMOTE_OUTPUT_DIR does not exist." + set_status STOPPED + cleanup + exit + fi + + if [ ! -e "$PPSS_LOCAL_TMPDIR" ] + then + mkdir "$PPSS_LOCAL_TMPDIR" + fi + + if [ ! -e "$PPSS_LOCAL_OUTPUT" ] + then + mkdir "$PPSS_LOCAL_OUTPUT" + fi +} + +get_status () { + + STATUS=`cat "$NODE_SATUS"` + echo "$STATUS" +} + +set_status () { + + STATUS="$1" + echo "$HOSTNAME $STATUS" > "$NODE_STATUS" +} + + +expand_str () { + + STR=$1 + LENGTH=$TYPE_LENGTH + SPACE=" " + + while [ "${#STR}" -lt "$LENGTH" ] + do + STR=$STR$SPACE + done + + echo "$STR" +} + +log () { + + # Type 'INFO' is logged to the screen + # Any other log-type is only logged to the logfile. + + TYPE="$1" + MESG="$2" + TYPE_LENGTH=5 + + TYPE_EXP=`expand_str "$TYPE"` + + DATE=`date +%b\ %d\ %H:%M:%S` + PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}" + PREFIX_SMALL="$DATE: " + + LOG_MSG="$PREFIX $MESG" + ECHO_MSG="$PREFIX_SMALL $MESG" + + echo -e "$LOG_MSG" >> "$LOGFILE" + + if [ "$TYPE" == "INFO" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ] + then + echo -e "$ECHO_MSG" + fi + +} + +check_status () { + + ERROR="$1" + FUNCTION="$2" + MESSAGE="$3" + + if [ ! "$ERROR" == "0" ] + then + log INFO "$FUNCTION - $MESSAGE" + set_status STOPPED + cleanup + exit 1 + fi + +} + +erase_ppss () { + + + echo "Are you realy sure you want to erase PPSS from all nodes!? (YES/NO)" + read YN + + if [ "$YN" == "yes" ] || [ "$YN" == "YES" ] + then + for NODE in `cat $NODES_FILE` + do + log INFO "Erasing PPSS homedir $PPSS_DIR from node $NODE." + ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR" + done + else + log INFO "Aborting.." + fi + sleep 1 +} + +deploy () { + + NODE="$1" + + SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=socket-%h \ + -o GlobalKnownHostsFile=./known_hosts \ + -o ControlMaster=auto \ + -o Cipher=blowfish \ + -o ConnectTimeout=5 " + + ERROR=0 + set_error () { + + if [ ! "$1" == "0" ] + then + ERROR=1 + fi + } + + ssh -q -o ConnectTimeout=5 $SSH_KEY $USER@$NODE exit 0 + set_error "$?" + if [ ! "$ERROR" == "0" ] + then + log ERROR "Cannot connect to node $NODE." + return + fi + + ssh -N -M $SSH_OPTS_NODE $SSH_KEY $USER@$NODE & + SSH_PID=$! + + KEY=`echo $SSH_KEY | cut -d " " -f 2` + + sleep 1 + + ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "cd ~ && mkdir $PPSS_HOME_DIR >> /dev/null 2>&1" + scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + scp -q $SSH_OPTS_NODE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + scp -q $SSH_OPTS_NODE $SSH_KEY $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + scp -q $SSH_OPTS_NODE $SSH_KEY known_hosts $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + if [ ! -z "$SCRIPT" ] + then + scp -q $SSH_OPTS_NODE $SSH_KEY $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + fi + + if [ ! -z "$INPUT_FILE" ] + then + scp -q $SSH_OPTS_NODE $SSH_KEY $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + fi + + if [ "$ERROR" == "0" ] + then + log INFO "PPSS installed on node $NODE." + else + log INFO "PPSS failed to install on $NODE." + fi + + kill $SSH_PID +} + +deploy_ppss () { + + + if [ -z "$NODES_FILE" ] + then + log INFO "ERROR - are you using the right option? -C ?" + set_status ERROR + cleanup + exit 1 + fi + + KEY=`echo $SSH_KEY | cut -d " " -f 2` + if [ -z "$KEY" ] || [ ! -e "$KEY" ] + then + log ERROR "Nodes require a key file." + cleanup + set_status "ERROR" + exit 1 + fi + + if [ ! -e "$SCRIPT" ] && [ ! -z "$SCRIPT" ] + then + log ERROR "Script $SCRIPT not found." + set_status "ERROR" + cleanup + exit 1 + fi + + INSTALLED_ON_SSH_SERVER=0 + if [ ! -e "$NODES_FILE" ] + then + log ERROR "File $NODES with list of nodes does not exist." + set_status ERROR + cleanup + exit 1 + else + for NODE in `cat $NODES_FILE` + do + deploy "$NODE" & + sleep 0.1 + if [ "$NODE" == "$SSH_SERVER" ] + then + log DEBUG "SSH SERVER $SSH_SERVER is also a node." + INSTALLED_ON_SSH_SERVER=1 + exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR" + exec_cmd "mkdir -p $ITEM_LOCK_DIR" + fi + done + if [ "$INSTALLED_ON_SSH_SERVER" == "0" ] + then + log DEBUG "SSH SERVER $SSH_SERVER is not a node." + deploy "$SSH_SERVER" + exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR" + exec_cmd "mkdir -p $ITEM_LOCK_DIR" + fi + fi +} + +start_ppss_on_node () { + + NODE="$1" + log INFO "Starting PPSS on node $NODE." + ssh $SSH_KEY $USER@$NODE -o ConnectTimeout=5 "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 node --config ~/$PPSS_HOME_DIR/$CONFIG" +} + +test_server () { + + # Testing if the remote server works as expected. + if [ ! -z "$SSH_SERVER" ] + then + exec_cmd "date >> /dev/null" + check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached" + + ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER & + SSH_MASTER_PID="$!" + log DEBUG "SSH Master pid is $SSH_MASTER_PID" + else + log DEBUG "No remote server specified, assuming stand-alone mode." + fi +} + +get_no_of_cpus () { + + # Use hyperthreading or not? + HPT=$1 + NUMBER="" + + if [ -z "$HPT" ] + then + HPT=yes + fi + + got_cpu_info () { + + ERROR="$1" + check_status "$ERROR" "$FUNCNAME" "cannot determine number of cpu cores. Specify with -p." + + } + + if [ "$HPT" == "yes" ] + then + if [ "$ARCH" == "Linux" ] + then + NUMBER=`grep ^processor $CPUINFO | wc -l` + got_cpu_info "$?" + + elif [ "$ARCH" == "Darwin" ] + then + NUMBER=`sysctl -a hw | grep -w logicalcpu | awk '{ print $2 }'` + got_cpu_info "$?" + + elif [ "$ARCH" == "FreeBSD" ] + then + NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'` + got_cpu_info "$?" + + elif [ "$ARCH" == "SunOS" ] + then + NUMBER=`psrinfo | grep on-line | wc -l` + got_cpu_info "$?" + else + if [ -e "$CPUINFO" ] + then + NUMBER=`grep ^processor $CPUINFO | wc -l` + got_cpu_info "$?" + fi + fi + + if [ ! -z "$NUMBER" ] + then + log INFO "Found $NUMBER logic processors." + fi + + elif [ "$HPT" == "no" ] + then + log INFO "Hyperthreading is disabled." + + if [ "$ARCH" == "Linux" ] + then + PHYSICAL=`grep 'physical id' $CPUINFO` + if [ "$?" == "0" ] + then + PHYSICAL=`grep 'physical id' $CPUINFO | sort | uniq | wc -l` + if [ "$PHYSICAL" == "1" ] + then + log INFO "Found $PHYSICAL physical CPU." + else + log INFO "Found $PHYSICAL physical CPUs." + fi + + TMP=`grep 'core id' $CPUINFO` + if [ "$?" == "0" ] + then + log DEBUG "Starting job only for each physical core on all physical CPU(s)." + NUMBER=`grep 'core id' $CPUINFO | sort | uniq | wc -l` + log INFO "Found $NUMBER physical cores." + else + log INFO "Single core processor(s) detected." + log INFO "Starting job for each physical CPU." + NUMBER=$PHYSICAL + fi + else + log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus." + NUMBER=`grep ^processor $CPUINFO | wc -l` + got_cpu_info "$?" + fi + elif [ "$ARCH" == "Darwin" ] + then + NUMBER=`sysctl -a hw | grep -w physicalcpu | awk '{ print $2 }'` + got_cpu_info "$?" + elif [ "$ARCH" == "FreeBSD" ] + then + NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'` + got_cpu_info "$?" + else + NUMBER=`cat $CPUINFO | grep "cpu cores" | cut -d ":" -f 2 | uniq | sed -e s/\ //g` + got_cpu_info "$?" + fi + + fi + + if [ ! -z "$NUMBER" ] + then + MAX_NO_OF_RUNNING_JOBS=$NUMBER + else + log ERROR "Number of CPUs not obtained." + log ERROR "Please specify manually with -p." + set_status "ERROR" + exit 1 + fi +} + +random_delay () { + + ARGS="$1" + + if [ -z "$ARGS" ] + then + log ERROR "$FUNCNAME Function random delay, no argument specified." + set_status ERROR + exit 1 + fi + + NUMBER=$RANDOM + let "NUMBER %= $ARGS" + sleep "0.$NUMBER" +} + + +global_lock () { + + mkdir $GLOBAL_LOCK > /dev/null 2>&1 + ERROR="$?" + + if [ ! "$ERROR" == "0" ] + then + return 1 + else + return 0 + fi +} + +get_global_lock () { + + while true + do + global_lock + ERROR="$?" + if [ ! "$ERROR" == "0" ] + then + random_delay $MAX_LOCK_DELAY + continue + else + break + fi + done +} + +release_global_lock () { + + rm -rf "$GLOBAL_LOCK" +} + +are_jobs_running () { + + NUMBER_OF_PROCS=`jobs | wc -l` + if [ "$NUMBER_OF_PROCS" -gt "1" ] + then + return 0 + else + return 1 + fi +} + +escape_item () { + + TMP="$1" + + ITEM_ESCAPED=`echo "$TMP" | \ + sed s/\\ /\\\\\\\\\\\\\\ /g | \ + sed s/\\'/\\\\\\\\\\\\\\'/g | \ + sed s/\\\`/\\\\\\\\\\\\\\\`/g | \ + sed s/\\|/\\\\\\\\\\\\\\|/g | \ + sed s/\&/\\\\\\\\\\\\\\&/g | \ + sed s/\;/\\\\\\\\\\\\\\;/g | \ + sed s/\\\//\\\\\\\\\\\\\\ /g | \ + sed s/\(/\\\\\\\\\\(/g | \ + sed s/\)/\\\\\\\\\\)/g ` +} + +download_item () { + + ITEM="$1" + if [ -e "$ITEM" ] + then + ITEM_NO_PATH=`basename "$ITEM"` + else + escape_item "$ITEM" + ITEM_NO_PATH="$ITEM_ESCAPED" + fi + + if [ "$TRANSFER_TO_SLAVE" == "1" ] + then + log DEBUG "Transfering item $ITEM_NO_PATH to local disk." + if [ "$SECURE_COPY" == "1" ] && [ ! -z "$SSH_SERVER" ] + then + if [ ! -z "$SRC_DIR" ] + then + ITEM_PATH="$SRC_DIR/$ITEM" + else + ITEM_PATH="$ITEM" + fi + + escape_item "$ITEM_PATH" + + scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" ./$PPSS_LOCAL_TMPDIR + log DEBUG "Exit code of remote transfer is $?" + else + cp "$ITEM" ./$PPSS_LOCAL_TMPDIR + log DEBUG "Exit code of local transfer is $?" + fi + else + log DEBUG "No transfer of item $ITEM_NO_PATH to local workpath." + fi +} + +upload_item () { + + + ITEM="$1" + ITEMDIR="$2" + + if [ "$TRANSFER_TO_SLAVE" == "0" ] + then + log DEBUG "File transfer is disabled." + return 0 + fi + + log DEBUG "Uploading item $ITEM." + if [ "$SECURE_COPY" == "1" ] + then + escape_item "$REMOTE_OUTPUT_DIR$ITEMDIR" + DIR_ESCAPED="$ITEM_ESCAPED" + + scp -q $SSH_OPTS $SSH_KEY "$ITEM"/* $USER@$SSH_SERVER:"$DIR_ESCAPED" + ERROR="$?" + if [ ! "$ERROR" == "0" ] + then + log ERROR "Uploading of $ITEM via SCP failed." + else + log DEBUG "Upload of item $ITEM success" + rm -rf ./"$ITEM" + fi + else + cp "$ITEM" "$REMOTE_OUTPUT_DIR" + ERROR="$?" + if [ ! "$ERROR" == "0" ] + then + log DEBUG "ERROR - uploading of $ITEM vi CP failed." + fi + fi +} + +lock_item () { + + if [ ! -z "$SSH_SERVER" ] + then + ITEM="$1" + + LOCK_FILE_NAME=`echo "$ITEM" | \ + sed s/^\\\.//g | \ + sed s/^\\\.\\\.//g | \ + sed s/^\\\///g | \ + sed s/\\\//\\\\\\ /g | \ + sed s/\\ /\\\\\\\\\\\\\\ /g | \ + sed s/\\'/\\\\\\\\\\\\\\'/g | \ + sed s/\\\//\\\\\\\\\\\\\\ /g | \ + sed s/\&/\\\\\\\\\\\\\\&/g | \ + sed s/\;/\\\\\\\\\\\\\\;/g | \ + sed s/\(/\\\\\\\\\\(/g | \ + sed s/\)/\\\\\\\\\\)/g ` + + ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME" + log DEBUG "Trying to lock item $ITEM - $ITEM_LOCK_FILE." + exec_cmd "mkdir $ITEM_LOCK_FILE >> /dev/null 2>&1" + ERROR="$?" + + if [ "$ERROR" == "$?" ] + then + exec_cmd "touch $ITEM_LOCK_FILE/$HOSTNAME" # Record that item is claimed by node x. + fi + + return "$ERROR" + fi +} + +get_all_items () { + + count=0 + + if [ -z "$INPUT_FILE" ] + then + if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?" + then + ITEMS=`exec_cmd "ls -1 $SRC_DIR"` + check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." + else + if [ -e "$SRC_DIR" ] + then + ITEMS=`ls -1 $SRC_DIR` + else + ITEMS="" + fi + fi + IFS=$'\n' + + for x in $ITEMS + do + ARRAY[$count]="$x" + ((count++)) + done + IFS=$IFS_BACKUP + else + if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?" + then + log DEBUG "Running as slave, input file has been pushed (hopefully)." + fi + if [ ! -e "$INPUT_FILE" ] + then + log ERROR "Input file $INPUT_FILE does not exist." + set_status "ERROR" + cleanup + exit 1 + fi + + exec 10<"$INPUT_FILE" + + while read LINE <&10 + do + ARRAY[$count]=$LINE + ((count++)) + done + + exec 10>&- + + fi + + SIZE_OF_ARRAY="${#ARRAY[@]}" + if [ "$SIZE_OF_ARRAY" -le "0" ] + then + log ERROR "Source file/dir seems to be empty." + set_status STOPPED + cleanup + exit 1 + fi +} + +get_item () { + + check_for_interrupt + + if [ "$STOP" == "1" ] + then + return 1 + fi + + get_global_lock + + SIZE_OF_ARRAY="${#ARRAY[@]}" + + # Return error if the array is empty. + if [ "$SIZE_OF_ARRAY" -le "0" ] + then + release_global_lock + return 1 + fi + + # This variable is used to walk thtough all array items. + ARRAY_POINTER=`cat $ARRAY_POINTER_FILE` + + # Check if all items have been processed. + if [ "$ARRAY_POINTER" -ge "$SIZE_OF_ARRAY" ] + then + release_global_lock + #echo -en "\033[1A" + return 1 + fi + + # Select an item. + ITEM="${ARRAY[$ARRAY_POINTER]}" + if [ -z "$ITEM" ] + then + ((ARRAY_POINTER++)) + echo $ARRAY_POINTER > $ARRAY_POINTER_FILE + release_global_lock + get_item + else + ((ARRAY_POINTER++)) + echo $ARRAY_POINTER > $ARRAY_POINTER_FILE + lock_item "$ITEM" + if [ ! "$?" == "0" ] + then + log DEBUG "Item $ITEM is locked." + release_global_lock + get_item + else + log DEBUG "Got lock on $ITEM, processing." + release_global_lock + download_item "$ITEM" + return 0 + fi + fi +} + +start_single_worker () { + + # + # This function sends an item to the fifo. This signals + # the listener process to execute a 'worker' on this + # item, using the 'commando' function. + # + + get_item + ERROR=$? + if [ ! "$ERROR" == "0" ] + then + # + # If no more items are available, the listener should be + # informed that a worker just finished / died. + # This allows the listener to determine if all processes + # are finished and it is time to stop. + # + echo "$STOP_KEY" > $FIFO + return 1 + else + get_global_lock + echo "$ITEM" > $FIFO + release_global_lock + return 0 + fi +} + + +elapsed () { + + BEFORE="$1" + AFTER="$2" + + ELAPSED="$(expr $AFTER - $BEFORE)" + + REMAINDER="$(expr $ELAPSED % 3600)" + HOURS="$(expr $(expr $ELAPSED - $REMAINDER) / 3600)" + + SECS="$(expr $REMAINDER % 60)" + MINS="$(expr $(expr $REMAINDER - $SECS) / 60)" + + echo "Elapsed time (h:m:s): $HOURS:$MINS:$SECS" +} + +commando () { + + # + # This function will start a chain reaction of events. + # + # The commando executes a command on an item and, when finished, + # executes the start_single_worker. This function selects a new + # item and sends it to the fifo. The listener process receives + # the item and excutes this commando function on the item. + # So in essence, the commando function keeps calling itself + # indirectly until no items are left. This will form a single + # working queue. By executing multiple start_single_worker + # functions based on the CPU cores available, parallel processing + # is achieved, with a queue for each core. + # + + ITEM="$1" + + if [ -e "$ITEM" ] + then + DIRNAME=`dirname "$ITEM"` + ITEM_NO_PATH=`basename "$ITEM"` + escape_item "$ITEM_NO_PATH" + OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED" + # ^ + # | This VAR can be used in scripts or command lines. + # + OUTPUT_FILE="$ITEM_ESCAPED" + else + DIRNAME="" + escape_item "$ITEM" + ITEM_NO_PATH="$ITEM_ESCAPED" + OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH" + fi + + log DEBUG "Processing item $ITEM" + # + # Decide if an item must be transfered from server to the node. + # or be processed in-place (NFS / SMB mount?) + # + if [ "$TRANSFER_TO_SLAVE" == "0" ] + then + if [ -z "$SRC_DIR" ] && [ ! -z "$INPUT_FILE" ] + then + log DEBUG "Using item straight from the server." + else + ITEM="$SRC_DIR/$ITEM" + fi + else + ITEM="./$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH" + fi + + LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g | sed s/\\ /_/g` + ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME" + + OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$LOG_FILE_NAME" + + mkdir -p "$OUTPUT_DIR" + + does_file_exist "$ITEM_LOG_FILE" + if [ "$?" == "0" ] + then + log DEBUG "Skipping item $ITEM - already processed." + else + ERROR="" + # + # Some formatting of item log files. + # + DATE=`date +%b\ %d\ %H:%M:%S` + echo "===== PPSS Item Log File =====" > "$ITEM_LOG_FILE" + echo -e "Host:\t\t$HOSTNAME" >> "$ITEM_LOG_FILE" + echo -e "Process:\t$PID" >> "$ITEM_LOG_FILE" + echo -e "Item:\t\t$ITEM" >> "$ITEM_LOG_FILE" + echo -e "Start date:\t$DATE" >> "$ITEM_LOG_FILE" + echo -e "" >> "$ITEM_LOG_FILE" + + # + # The actual execution of the command as specified by + # the -c option. + # + BEFORE="$(date +%s)" + TMP=`echo $COMMAND | grep -i '$ITEM'` + if [ "$?" == "0" ] + then + eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1 + ERROR="$?" + MYPID="$!" + else + eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1' + ERROR="$?" + MYPID="$!" + fi + AFTER="$(date +%s)" + + echo -e "" >> "$ITEM_LOG_FILE" + + # Some error logging. Success or fail. + if [ ! "$ERROR" == "0" ] + then + echo -e "Status:\t\tFAILURE" >> "$ITEM_LOG_FILE" + else + echo -e "Status:\t\tSUCCESS" >> "$ITEM_LOG_FILE" + fi + + # + # If part of a cluster, remove the downloaded item after + # it has been processed and uploaded as not to fill up disk space. + # + if [ "$TRANSFER_TO_SLAVE" == "1" ] + then + if [ -e "$ITEM" ] + then + rm "$ITEM" + else + log DEBUG "ERROR Something went wrong removing item $ITEM from local work dir." + fi + + fi + + escape_item "$DIRNAME" + ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED" + + exec_cmd "mkdir -p $ITEM_OUTPUT_DIR" + if [ "$DIRNAME" == "." ] + then + DIRNAME="" + fi + upload_item "$PPSS_LOCAL_OUTPUT/$ITEM_NO_PATH" "$DIRNAME" + + elapsed "$BEFORE" "$AFTER" >> "$ITEM_LOG_FILE" + echo -e "" >> "$ITEM_LOG_FILE" + + if [ ! -z "$SSH_SERVER" ] + then + log DEBUG "Uploading item log file $ITEM_LOG_FILE to master ~/$PPSS_HOME_DIR/$JOB_LOG_DIR" + scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:~/$PPSS_HOME_DIR/$JOB_LOG_DIR + if [ ! "$?" == "0" ] + then + log DEBUG "Uploading of item log file failed." + fi + fi + fi + + start_single_worker + return $? +} + +# +# This is the listener service. It listens on the pipe for events. +# A job is executed for every event received. +# This listener enables fully asynchronous processing. +# +listen_for_job () { + FINISHED=0 + DIED=0 + PIDS="" + log DEBUG "Listener started." + while read event <& 42 + do + if [ "$event" == "$STOP_KEY" ] + then + # + # The start_single_worker method sends a special signal to + # inform the listener that a worker is finished. + # If all workers are finished, it is time to stop. + # This mechanism makes PPSS asynchronous. + # + ((DIED++)) + if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ] + then + #kill_process + break + else + RES=$((MAX_NO_OF_RUNNING_JOBS-DIED)) + if [ "$RES" == "1" ] + then + log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. " + else + log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining." + echo -en "\033[1A" + fi + fi + elif [ "$event" == "$KILL_KEY" ] + then + # + # If all workers are finished, it is time to kill all remaining + # processes, terminate ssh processes and the listener itself. + # + # This command kills all processes that are related to the master + # process as defined by $PID. All processes that have ever been + # spawned, although disowned or backgrounded will be killed... + # + PROCLIST=`ps a -o pid,pgid,ppid,command | grep [0-9] | grep $PID | grep -v -i grep` + #echo "$PROCLIST" > proclist.txt + oldIFS=$IFS # save the field separator + IFS=$'\n' # new field separator, the end of line + for x in `echo "$PROCLIST"` + do + MYPPID=`echo $x | awk '{ print $3 }'` + MYPID=`echo $x | awk '{ print $1 }'` + if [ ! "$MYPPID" == "$PID" ] && [ ! "$MYPPID" == "1" ] + then + if [ ! "$MYPID" == "$PID" ] + then + log DEBUG "Killing process $MYPID" + kill $MYPID >> /dev/null 2>&1 + else + log DEBUG "Not killing master process..$MYPID.." + fi + else + log DEBUG "Not killing listener process. $MYPID.." + fi + done + IFS=$oldIFS + + if [ ! -z "$SSH_MASTER_PID" ] + then + kill "$SSH_MASTER_PID" + fi + break + else + commando "$event" & + MYPID="$!" + disown + PIDS="$PIDS $MYPID" + #log DEBUG "Event $event has pid $MYPID" + fi + + get_global_lock + SIZE_OF_ARRAY="${#ARRAY[@]}" + ARRAY_POINTER=`cat $ARRAY_POINTER_FILE` + release_global_lock + PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY )) + if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ] + then + log INFO "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items." + if [ "$PERCENT" == "100" ] + then + FINISHED=1 + else + echo -en "\033[1A" + fi + fi + done + + set_status STOPPED + log DEBUG "Listener stopped." + if [ ! "$PERCENT" == "100" ] + then + echo + log INFO "Finished. Consult $JOB_LOG_DIR for job output." + log INFO "Press ENTER to continue." + else + log INFO "Finished. Consult $JOB_LOG_DIR for job output." + fi + cleanup +} + +# This starts an number of parallel workers based on the # of parallel jobs allowed. +start_all_workers () { + + if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ] + then + log INFO "Starting $MAX_NO_OF_RUNNING_JOBS single worker." + else + log INFO "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers." + fi + log INFO "---------------------------------------------------------" + + i=0 + while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ] + do + start_single_worker + ((i++)) + + if [ ! "$MAX_DELAY" == "0" ] + then + random_delay "$MAX_DELAY" + fi + done +} + +get_status_of_node () { + + NODE="$1" + STATUS=`ssh -o ConnectTimeout=10 $SSH_KEY $USER@$NODE cat "$PPSS_HOME_DIR/$NODE_STATUS" 2>/dev/null` + ERROR="$?" + if [ ! "$ERROR" == "0" ] + then + STATUS="UNKNOWN" + fi + echo "$STATUS" +} + +show_status () { + + source $CONFIG + if [ ! -z "$SSH_KEY" ] + then + SSH_KEY="-i $SSH_KEY" + fi + + if [ -z "$INPUT_FILE" ] + then + ITEMS=`exec_cmd "ls -1 $SRC_DIR | wc -l"` + else + ITEMS=`exec_cmd "cat $PPSS_DIR/$INPUT_FILE | wc -l"` + fi + + PROCESSED=`exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l"` 2>&1 >> /dev/null + TMP_STATUS=$((100 * $PROCESSED / $ITEMS)) + + log INFO "Status:\t\t$TMP_STATUS percent complete." + + if [ ! -z $NODES_FILE ] + then + TMP_NO=`cat $NODES_FILE | wc -l` + log INFO "Nodes:\t $TMP_NO" + fi + log INFO "Items:\t\t$ITEMS" + + + log INFO "---------------------------------------------------------" + HEADER=`echo IP-address Hostname Processed Status | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'` + log INFO "$HEADER" + log INFO "---------------------------------------------------------" + PROCESSED=0 + for x in `cat $NODES_FILE` + do + NODE=`get_status_of_node "$x" | awk '{ print $1 }'` + if [ ! "$NODE" == "UNKNOWN" ] + then + STATUS=`get_status_of_node "$x" | awk '{ print $2 }'` + RES=`exec_cmd "grep -i $NODE ~/$PPSS_HOME_DIR/$JOB_LOG_DIR/* 2>/dev/null | wc -l "` + if [ ! "$?" == "0" ] + then + RES=0 + fi + else + STATUS="UNKNOWN" + RES=0 + fi + let PROCESSED=$PROCESSED+$RES + LINE=`echo "$x $NODE $RES $STATUS" | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'` + log INFO "$LINE" + done + log INFO "---------------------------------------------------------" + LINE=`echo $PROCESSED | awk '{ printf ("Total processed: % 29s\n",$1) }'` + log INFO "$LINE" +} + + +# If this is called, the whole framework will execute. +main () { + + case $MODE in + node ) + init_vars + test_server + get_all_items + listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null + LISTENER_PID=$! + start_all_workers + ;; + start ) + # This option only starts all nodes. + LOGFILE=/dev/null + display_header + if [ ! -e "$NODES_FILE" ] + then + log ERROR "File $NODES with list of nodes does not exist." + set_status STOPPED + cleanup + exit 1 + else + for NODE in `cat $NODES_FILE` + do + start_ppss_on_node "$NODE" + done + fi + cleanup + exit 0 + ;; + config ) + LOGFILE=/dev/null + display_header + log INFO "Generating configuration file $CONFIG" + add_var_to_config PPSS_LOCAL_TMPDIR "$PPSS_LOCAL_TMPDIR" + add_var_to_config PPSS_LOCAL_OUTPUT "$PPSS_LOCAL_OUTPUT" + cleanup + exit 0 + ;; + + stop ) + LOGFILE=/dev/null + display_header + log INFO "Stopping PPSS on all nodes." + exec_cmd "touch $STOP_SIGNAL" + cleanup + exit + ;; + pause ) + LOGFILE=/dev/null + display_header + log INFO "Pausing PPSS on all nodes." + exec_cmd "touch $PAUSE_SIGNAL" + cleanup + exit + ;; + continue ) + LOGFILE=/dev/null + display_header + if does_file_exist "$STOP_SIGNAL" + then + log INFO "Continuing processing, please use $0 start to start PPSS on al nodes." + exec_cmd "rm -f $STOP_SIGNAL" + fi + if does_file_exist "$PAUSE_SIGNAL" + then + log INFO "Continuing PPSS on all nodes." + exec_cmd "rm -f $PAUSE_SIGNAL" + fi + cleanup + exit 0 + ;; + deploy ) + LOGFILE=/dev/null + display_header + log INFO "Deploying PPSS on nodes." + deploy_ppss + wait + cleanup + exit 0 + ;; + status ) + LOGFILE=/dev/null + display_header + show_status + cleanup + exit 0 + ;; + erase ) + LOGFILE=/dev/null + display_header + log INFO "Erasing PPSS from all nodes." + erase_ppss + cleanup + exit 0 + ;; + kill ) + LOGFILE=/dev/null + for x in `ps ux | grep ppss | grep -v grep | grep bash | awk '{ print $2 }'` + do + kill "$x" + done + cleanup + exit 0 + ;; + + * ) + init_vars + get_all_items + listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null + LISTENER_PID=$! + #log DEBUG "Master PID is $PID." + #log DEBUG "Listener PID is $LISTENER_PID." + start_all_workers + ;; + + esac + +} +# This command starts the that sets the whole framework in motion. +main + +# Exit after all processes have finished. +wait diff --git a/ppss-test.sh b/ppss-test.sh new file mode 100755 index 0000000..08a706a --- /dev/null +++ b/ppss-test.sh @@ -0,0 +1,223 @@ +#!/bin/bash + +DEBUG="$1" +VERSION=2.55 +TMP_DIR="ppss" +PPSS=ppss +PPSS_DIR=ppss_dir + +cleanup () { + + for x in $REMOVEFILES + do + if [ -e ./$x ] + then + rm -r ./$x + fi + done +} + +parseJobStatus () { + + TMP_FILE="$1" + + RES=`grep "Status:" "$JOBLOG/$TMP_FILE"` + STATUS=`echo "$RES" | awk '{ print $2 }'` + echo "$STATUS" + +} + +oneTimeSetUp () { + + NORMALTESTFILES=`echo test-{a..z}` + SPECIALTESTFILES="\'file-!@#$%^&*()_+=-0987654321~\' \'file-/\<>?:;'{}[]\' file-/\/\:\/!@#$%^&*()_+=-0987654321~ file-/\<>?:;'{}[] http://www.google.nl ftp://storage.nl" + JOBLOG=./$PPSS_DIR/job_log + INPUTFILENORMAL=test-normal.input + INPUTFILESPECIAL=test-special.input + LOCALOUTPUT=ppss_dir/PPSS_LOCAL_OUTPUT + + REMOVEFILES="$INPUTFILENORMAL $INPUTFILESPECIAL $PPSS_DIR test-ppss-*" + + cleanup + + for x in $NORMALTESTFILES + do + echo "$x" >> "$INPUTFILENORMAL" + done + + for x in $SPECIALTESTFILES + do + echo $x >> "$INPUTFILESPECIAL" + done +} + +testVersion () { + + RES=`./$PPSS -v` + + for x in $RES + do + echo "$x" | grep [0-9] >> /dev/null + if [ "$?" == "0" ] + then + assertEquals "Version mismatch!" "$VERSION" "$x" + fi + done +} + +rename-ppss-dir () { + + TEST="$1" + + if [ -e "$PPSS_DIR" ] && [ -d "$PPSS_DIR" ] && [ ! -z "$TEST" ] + then + mv "$PPSS_DIR" test-ppss-"$TEST" + fi +} + +oneTimeTearDown () { + + if [ ! "$DEBUG" == "debug" ] + then + cleanup + fi +} + +createDirectoryWithSomeFiles () { + + A="File with Spaces" + B="File\With\Slashes" + + mkdir "/tmp/$TMP_DIR" + for x in "$A" "$B" + do + TMP_FILE="/tmp/$TMP_DIR/$x" + touch "$TMP_FILE" + done +} + +testSpacesInFilenames () { + + createDirectoryWithSomeFiles + + RES=$( { ./$PPSS -d /tmp/$TMP_DIR -c 'ls -alh ' >> /dev/null ; } 2>&1 ) + assertEquals "PPSS did not execute properly." 0 "$?" + + assertNull "PPSS retured some errors..." "$RES" + if [ ! "$?" == "0" ] + then + echo "RES IS $RES" + fi + + grep "SUCCESS" $JOBLOG/* >> /dev/null 2>&1 + assertEquals "Found error with space in filename $TMP_FILE" "0" "$?" + + rm -rf "/tmp/$TMP_DIR" + rename-ppss-dir $FUNCNAME +} + +testSpecialCharacterHandling () { + + RES=$( { ./$PPSS -f "$INPUTFILESPECIAL" -c 'echo ' >> /dev/null ; } 2>&1 ) + assertEquals "PPSS did not execute properly." 0 "$?" + + assertNull "PPSS retured some errors..." "$RES" + if [ ! "$?" == "0" ] + then + echo "RES IS $RES" + fi + + RES=`find ppss_dir/PPSS_LOCAL_OUTPUT | wc -l | sed 's/\ //g'` + assertEquals "To many lock files..." "7" "$RES" + + RES1=`ls -1 $JOBLOG` + RES2=`ls -1 $LOCALOUTPUT` + + assertEquals "RES1 $RES1 is not the same as RES2 $RES2" "$RES1" "$RES2" + + rename-ppss-dir $FUNCNAME +} + +testSkippingOfProcessedItems () { + + createDirectoryWithSomeFiles + + RES=$( { ./$PPSS -d /tmp/$TMP_DIR -c 'echo ' >> /dev/null ; } 2>&1 ) + assertEquals "PPSS did not execute properly." 0 "$?" + assertNull "PPSS retured some errors..." "$RES" + + RES=$( { ./$PPSS -d /tmp/$TMP_DIR -c 'echo ' >> /dev/null ; } 2>&1 ) + assertEquals "PPSS did not execute properly." 0 "$?" + assertNull "PPSS retured some errors..." "$RES" + + grep -i skip ./$PPSS_dir/* >> /dev/null 2>&1 + assertEquals "Skipping of items went wrong." 0 "$?" + + rename-ppss-dir $FUNCNAME-1 + + RES=$( { ./$PPSS -f $INPUTFILESPECIAL -c 'echo ' >> /dev/null ; } 2>&1 ) + assertEquals "PPSS did not execute properly." 0 "$?" + assertNull "PPSS retured some errors..." "$RES" + + RES=$( { ./$PPSS -f $INPUTFILESPECIAL -c 'echo ' >> /dev/null ; } 2>&1 ) + assertEquals "PPSS did not execute properly." 0 "$?" + assertNull "PPSS retured some errors..." "$RES" + + grep -i skip ./$PPSS_dir/* >> /dev/null 2>&1 + assertEquals "Skipping of items went wrong." 0 "$?" + + rm -rf "/tmp/$TMP_DIR" + rename-ppss-dir $FUNCNAME-2 +} + +testExistLogFiles () { + + ./$PPSS -f "$INPUTFILENORMAL" -c 'echo "$ITEM"' >> /dev/null + assertEquals "PPSS did not execute properly." 0 "$?" + + for x in $NORMALTESTFILES + do + assertTrue "[ -e $JOBLOG/$x ]" + done + + rename-ppss-dir $FUNCNAME +} + +getStatusOfJob () { + + EXPECTED="$1" + + if [ "$EXPECTED" == "SUCCESS" ] + then + ./$PPSS -f "$INPUTFILENORMAL" -c 'echo ' >> /dev/null + assertEquals "PPSS did not execute properly." 0 "$?" + elif [ "$EXPECTED" == "FAILURE" ] + then + ./$PPSS -f "$INPUTFILENORMAL" -c 'thiscommandfails ' >> /dev/null + assertEquals "PPSS did not execute properly." 0 "$?" + fi + + for x in $NORMALTESTFILES + do + STATUS=`parseJobStatus "$x"` + assertEquals "FAILED WITH STATUS $STATUS." "$EXPECTED" "$STATUS" + done + + rename-ppss-dir "$FUNCNAME-$EXPECTED" +} + + +testErrorHandlingOK () { + + getStatusOfJob SUCCESS +} + +testErrorHandlingFAIL () { + + getStatusOfJob FAILURE +} + + + + +. ./shunit2 diff --git a/ppss.sh b/ppss.sh index 701a4c5..7dc51d0 100755 --- a/ppss.sh +++ b/ppss.sh @@ -34,73 +34,165 @@ #------------------------------------------------------------------------------ # Handling control-c for a clean shutdown. -trap 'kill_process; ' INT +trap 'kill_process' SIGINT -# Setting some vars. Do not change. +# Setting some vars. SCRIPT_NAME="Distributed Parallel Processing Shell Script" -SCRIPT_VERSION="2.0" +SCRIPT_VERSION="2.50" -# The first argument to this script is always the 'mode'. -MODE="$1" -shift +# The first argument to this script can be a mode. +MODES="node start config stop pause continue deploy status erase kill" +for x in $MODES +do + if [ "$x" == "$1" ] + then + MODE="$1" + shift + fi +done -ARGS=$@ -CONFIG="config.cfg" +# The working directory of PPSS can be set with +# export PPSS_DIR=/path/to/workingdir +if [ -z "$PPSS_DIR" ] +then + PPSS_DIR="./ppss" +fi + +if [ ! -e "$PPSS_DIR" ] +then + mkdir -p "$PPSS_DIR" +fi + +CONFIG="" HOSTNAME=`hostname` ARCH=`uname` -RUNNING_SIGNAL="$0_is_running" # Prevents running mutiple instances of PPSS.. -GLOBAL_LOCK="PPSS-GLOBAL-LOCK" # Global lock file used by local PPSS instance. -PAUSE_SIGNAL="pause_signal" # Not implemented yet (pause processing). -PAUSE_DELAY=300 -STOP_SIGNAL="stop_signal" -ARRAY_POINTER_FILE="ppss-array-pointer" # -JOB_LOG_DIR="JOB_LOG" # Directory containing log files of processed items. -LOGFILE="ppss-log.txt" # General PPSS log file. Contains lots of info. -STOP=9 # STOP job. -MAX_DELAY=2 -PERCENT="0" + PID="$$" +GLOBAL_LOCK="$PPSS_DIR/PPSS-GLOBAL-LOCK-$PID" # Global lock file used by local PPSS instance. +PAUSE_SIGNAL="$PPSS_DIR/pause_signal" # Pause processing if this file is present. +PAUSE_DELAY=60 # Polling every 1 minutes by default. +STOP_SIGNAL="$PPSS_DIR/stop_signal" # Stop processing if this file is present. +ARRAY_POINTER_FILE="$PPSS_DIR/ppss-array-pointer-$PID" # Pointer for keeping track of processed items. +JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items. +LOGFILE="$PPSS_DIR/ppss-log-$$.txt" # General PPSS log file. Contains lots of info. +STOP=0 # STOP job. +MAX_DELAY=0 # MAX DELAY between jobs. +MAX_LOCK_DELAY=9 # +PERCENT="0" LISTENER_PID="" IFS_BACKUP="$IFS" -INTERVAL="30" # Polling interval to check if there are running jobs. CPUINFO=/proc/cpuinfo +PROCESSORS="" +STOP_KEY=$RANDOM$RANDOM$RANDOM +KILL_KEY=$RANDOM$RANDOM$RANDOM SSH_SERVER="" # Remote server or 'master'. SSH_KEY="" # SSH key for ssh account. -SSH_SOCKET="/tmp/PPSS-ssh-socket" # Multiplex multiple SSH connections over 1 master. +SSH_KNOWN_HOSTS="" +SSH_SOCKET="$PPSS_DIR/PPSS_SSH_SOCKET" # Multiplex multiple SSH connections over 1 master. SSH_OPTS="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \ -o GlobalKnownHostsFile=./known_hosts \ -o ControlMaster=auto \ - -o ConnectTimeout=5" + -o Cipher=blowfish \ + -o ConnectTimeout=10 " + + # Blowfish is faster but still secure. SSH_MASTER_PID="" -PPSS_HOME_DIR="ppss" -ITEM_LOCK_DIR="PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking. -PPSS_LOCAL_TMPDIR="PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing. -PPSS_LOCAL_OUTPUT="PPSS_LOCAL_OUTPUT" # Local directory on slave for local output. +PPSS_HOME_DIR=ppss +ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking. +PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing. +PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output. TRANSFER_TO_SLAVE="0" # Transfer item to slave via (s)cp. SECURE_COPY="1" # If set, use SCP, Otherwise, use cp. REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded. SCRIPT="" # Custom user script that is executed by ppss. +ITEM_ESCAPED="" +NODE_STATUS="$PPSS_DIR/status.txt" +showusage_short () { + + echo + echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" + echo + echo "usage: $0 [ -d | -f ] [ -c ' \"\$ITEM\"' ]" + echo " [ -C ] [ -j ] [ -l ] [ -p <# jobs> ]" + echo " [ -D ] [ -h ] [ --help ]" + echo + echo "Examples:" + echo " $0 -d /dir/with/some/files -c 'gzip '" + echo " $0 -d /dir/with/some/files -c 'gzip \"\$ITEM\"' -D 5" + echo " $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2" +} + +showusage_normal () { -showusage () { - echo - echo "$SCRIPT_NAME" - echo "Version: $SCRIPT_VERSION" + echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" echo echo "PPSS is a Bash shell script that executes commands in parallel on a set " - echo "of items, such as files, or lines in a file." + echo "of items, such as files in a directory, or lines in a file." echo - echo "Usage: $0 MODE [ options ]" - echo " or " - echo "Usage: $0 MODE -c " + echo "This short summary only discusses options for stand-alone mode. for a " + echo "complete listing of all options, run PPSS with the options --help" + echo + echo "Usage $0 [ options ]" + echo + echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes." + echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item " + echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'." echo - echo "Modes are:" + echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files" + echo -e " are fed as an argument to the command that has been specified with -c." + echo + echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the" + echo -e " command that has been specified with -c." + echo + echo -e "--config | -C If the mode is config, a config file with the specified name will be" + echo -e " generated based on all the options specified. In the other modes". + echo -e " this option will result in PPSS reading the config file and start" + echo -e " processing items based on the settings of this file." + echo + echo -e "--enable-ht | -j Enable hyperthreading. Is disabled by default." + echo + echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt." + echo + echo -e "--processes | -p Start the specified number of processes. Ignore the number of available" + echo -e " CPUs." + echo + echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread" + echo -e " the load. The delay is only used at the start of all 'threads'." + echo + echo -e "Example: encoding some wav files to mp3 using lame:" + echo + echo -e "$0 -d /path/to/wavfiles -c 'lame '" + echo + echo -e "Extended usage: use --help" + echo +} + +if [ "$#" == "0" ] +then + showusage_short + if [ -e "$PPSS_DIR" ] + then + rm -rf "$PPSS_DIR" + fi + exit 1 +fi + +showusage_long () { + + echo + echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" + echo + echo "PPSS is a Bash shell script that executes commands in parallel on a set " + echo "of items, such as files in a directory, or lines in a file." + echo + echo "Usage: $0 [ MODE ] [ options ]" + echo + echo "Modes are optional and mainly used for running in distributed mode. Modes are:" echo - echo " standalone For execution of PPSS on a single host." - echo " node For execution of PPSS on a node, that is part of a 'cluster'." echo " config Generate a config file based on the supplied option parameters." echo " deploy Deploy PPSS and related files on the specified nodes." echo " erase Erase PPSS and related files from the specified nodes." @@ -108,6 +200,7 @@ showusage () { echo " start Starting PPSS on nodes." echo " pause Pausing PPSS on all nodes." echo " stop Stopping PPSS on all nodes." + echo " node Running PPSS as a node, requires additional options." echo echo "Options are:" echo @@ -121,7 +214,7 @@ showusage () { echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the" echo -e " command that has been specified with -c." echo - echo -e "--config | -c If the mode is config, a config file with the specified name will be" + echo -e "--config | -C If the mode is config, a config file with the specified name will be" echo -e " generated based on all the options specified. In the other modes". echo -e " this option will result in PPSS reading the config file and start" echo -e " processing items based on the settings of this file." @@ -131,11 +224,14 @@ showusage () { echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt." echo echo -e "--processes | -p Start the specified number of processes. Ignore the number of available" - echo -e " CPU's." + echo -e " CPUs." echo + echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread" + echo -e " the load. The delay is only used at the start of all 'threads'." + echo echo -e "The following options are used for distributed execution of PPSS." echo - echo -e "--server | -s Specifies the SSH server that is used for communication between nodes." + echo -e "--master | -m Specifies the SSH server that is used for communication between nodes." echo -e " Using SSH, file locks are created, informing other nodes that an item " echo -e " is locked. Also, often items, such as files, reside on this host. SCP " echo -e " is used to transfer files from this host to nodes for local procesing." @@ -143,12 +239,16 @@ showusage () { echo -e "--node | -n File containig a list of nodes that act as PPSS clients. One IP / DNS " echo -e " name per line." echo - echo -e "--key | -k The SSH key that a node uses to connect to the server." + echo -e "--key | -k The SSH key that a node uses to connect to the master." + echo + echo -e "--known-hosts | -K The file that contains the server public key. Can often be found on " + echo -e " hosts that already once connected to the server. See the file " + echo -e " ~/.ssh/known_hosts or else, manualy connect once and check this file." echo echo -e "--user | -u The SSH user name that is used when logging in into the master SSH" echo -e " server." echo - echo -e "--script | -s Specifies the script/program that must be copied to the nodes for " + echo -e "--script | -S Specifies the script/program that must be copied to the nodes for " echo -e " execution through PPSS. Only used in the deploy mode." echo -e " This option should be specified if necessary when generating a config." echo @@ -162,60 +262,40 @@ showusage () { echo -e " encoding a wav file is an mp3 file, the mp3 file is put in the " echo -e " directory specified with this option." echo + echo -e "--homedir | -H Directory in which directory PPSS is installed on the node." + echo -e " Default is 'ppss'." + echo echo -e "Example: encoding some wav files to mp3 using lame:" echo - echo -e "$0 standalone -c 'lame ' -d /path/to/wavfiles -j " + echo -e "$0 -c 'lame ' -d /path/to/wavfiles -j " echo echo -e "Running PPSS based on a configuration file." echo - echo -e "$0 node -C config.cfg" + echo -e "$0 -C config.cfg" echo echo -e "Running PPSS on a client as part of a cluster." echo - echo -e "$0 node -d /somedir -c 'cp "$ITEM" /some/destination' -s 10.0.0.50 -u ppss -t -k ppss-key.key" + echo -e "$0 -d /somedir -c 'cp "$ITEM" /some/destination' -s 10.0.0.50 -u ppss -t -k ppss-key.key" echo } kill_process () { - kill $LISTENER_PID >> /dev/null 2>&1 - while true - do - JOBS=`ps ax | grep -v grep | grep -v -i screen | grep ppss.sh | grep -i bash | wc -l` - if [ "$JOBS" -gt "2" ] - then - for x in `ps ax | grep -v grep | grep -v -i screen | grep ppss.sh | grep -i bash | awk '{ print $1 }'` - do - if [ ! "$x" == "$PID" ] && [ ! "$x" == "$$" ] - then - kill -9 $x >> /dev/null 2>&1 - fi - done - sleep 5 - else - cleanup - echo -en "\033[1B" - # The master SSH connection should be killed. - if [ ! -z "$SSH_MASTER_PID" ] - then - kill -9 "$SSH_MASTER_PID" - fi - echo "" - exit 0 - fi - done - + echo "$KILL_KEY" >> "$FIFO" } exec_cmd () { + CMD="$1" - if [ ! -z "$SSH_SERVER" ] && [ "$SECURE_COPY" == "1" ] + if [ ! -z "$SSH_SERVER" ] then ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD + return $? else eval "$CMD" + return $? fi } @@ -237,16 +317,21 @@ check_for_interrupt () { does_file_exist "$STOP_SIGNAL" if [ "$?" == "0" ] then + set_status "STOPPED" log INFO "STOPPING job. Stop signal found." STOP="1" + return 1 fi does_file_exist "$PAUSE_SIGNAL" if [ "$?" == "0" ] then + set_status "PAUZED" log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS." - sleep $PAUSE_DELAY + sleep 0.$PAUSE_DELAY check_for_interrupt + else + set_status "RUNNING" fi } @@ -256,22 +341,17 @@ cleanup () { if [ -e "$FIFO" ] then - rm $FIFO + rm "$FIFO" fi if [ -e "$ARRAY_POINTER_FILE" ] then - rm $ARRAY_POINTER_FILE + rm "$ARRAY_POINTER_FILE" fi if [ -e "$GLOBAL_LOCK" ] then - rm -rf $GLOBAL_LOCK - fi - - if [ -e "$RUNNING_SIGNAL" ] - then - rm "$RUNNING_SIGNAL" + rm -rf "$GLOBAL_LOCK" fi if [ -e "$SSH_SOCKET" ] @@ -281,19 +361,6 @@ cleanup () { } -# check if ppss is already running. -is_running () { - - if [ -e "$RUNNING_SIGNAL" ] - then - echo - log INFO "$0 is already running (lock file exists)." - echo - exit 1 - fi -} - - add_var_to_config () { if [ "$MODE" == "config" ] @@ -317,9 +384,9 @@ do then if [ -e "$CONFIG" ] then - echo "Do want to overwrite existing config file?" + echo "Do want to overwrite existing config file? [y/n]" read yn - if [ "$yn" == "y" ] + if [ "$yn" == "y" ] || [ "$yn" == "yes" ] then rm "$CONFIG" else @@ -340,15 +407,26 @@ do SSH_KEY="-i $SSH_KEY" fi + if [ ! -e "./known_hosts" ] + then + if [ -e $SSH_KNOWN_HOSTS ] + then + cat $SSH_KNOWN_HOSTS > ./known_hosts + else + echo "File $SSH_KNOWN_HOSTS does not exist." + exit + fi + fi + shift 2 ;; - --node|-n ) + --node|-n ) NODES_FILE="$2" add_var_to_config NODES_FILE "$NODES_FILE" shift 2 ;; - --sourcefile|-f ) + --sourcefile|-f ) INPUT_FILE="$2" add_var_to_config INPUT_FILE "$INPUT_FILE" shift 2 @@ -358,8 +436,13 @@ do add_var_to_config SRC_DIR "$SRC_DIR" shift 2 ;; - --command|-c ) - COMMAND=$2 + --delay|-D) + MAX_DELAY="$2" + add_var_to_config MAX_DELAY "$MAX_DELAY" + shift 2 + ;; + --command|-c ) + COMMAND="$2" if [ "$MODE" == "config" ] then COMMAND=\'$COMMAND\' @@ -368,29 +451,45 @@ do shift 2 ;; - --help|-h ) - showusage + -h ) + showusage_normal + if [ -e "$PPSS_DIR" ] + then + rm -rf "$PPSS_DIR" + fi exit 1;; - --homedir|-H) + --help) + showusage_long + if [ -e "$PPSS_DIR" ] + then + rm -rf "$PPSS_DIR" + fi + exit 1;; + --homedir|-H ) if [ ! -z "$2" ] then PPSS_HOME_DIR="$2" - add_var_to_config PPSS_HOME_DIR $PPSS_HOME_DIR + add_var_to_config PPSS_DIR $PPSS_HOME_DIR shift 2 fi ;; - --disable-ht|-j ) + --disable-ht|-j ) HYPERTHREADING=no add_var_to_config HYPERTHREADING $HYPERTHREADING shift 1 ;; - --log|-l ) + --log|-l ) LOGFILE="$2" add_var_to_config LOGFILE "$LOGFILE" shift 2 ;; - --key|-k ) + --workingdir|-w ) + WORKINGDIR="$2" + add_var_to_config WORKINGDIR "$WORKINGDIR" + shift 2 + ;; + --key|-k ) SSH_KEY="$2" add_var_to_config SSH_KEY "$SSH_KEY" if [ ! -z "$SSH_KEY" ] @@ -399,7 +498,13 @@ do fi shift 2 ;; - --no-scp |-b ) + --known-hosts | -K ) + SSH_KNOWN_HOSTS="$2" + add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS" + shift 2 + ;; + + --no-scp |-b ) SECURE_COPY=0 add_var_to_config SECURE_COPY "$SECURE_COPY" shift 1 @@ -418,7 +523,7 @@ do shift 2 fi ;; - --server|-s ) + --master|-m ) SSH_SERVER="$2" add_var_to_config SSH_SERVER "$SSH_SERVER" shift 2 @@ -446,32 +551,60 @@ do exit 0 ;; * ) - showusage + + showusage_short + echo + echo "Unknown option $1 " + echo exit 1;; esac done + +display_header () { + + log info "" + log INFO "=========================================================" + log INFO " |P|P|S|S| " + log INFO "$SCRIPT_NAME version $SCRIPT_VERSION" + log INFO "=========================================================" + log INFO "Hostname:\t\t$HOSTNAME" + log INFO "---------------------------------------------------------" +} + + # Init all vars init_vars () { + + if [ "$ARCH" == "Darwin" ] + then + MIN_JOBS=4 + elif [ "$ARCH" == "Linux" ] + then + MIN_JOBS=3 + fi + if [ -e "$LOGFILE" ] then rm $LOGFILE fi + + display_header if [ -z "$COMMAND" ] then echo - echo "ERROR - no command specified." + log ERROR "No command specified." echo - showusage + showusage_normal cleanup exit 1 fi echo 0 > $ARRAY_POINTER_FILE - FIFO=$(pwd)/fifo-$RANDOM-$RANDOM + FIFO=/tmp/ppss-fifo-$RANDOM-$RANDOM if [ ! -e "$FIFO" ] then @@ -480,20 +613,39 @@ init_vars () { exec 42<> $FIFO - touch $RUNNING_SIGNAL + set_status "RUNNING" + + if [ -e "$CPUINFO" ] + then + CPU=`cat /proc/cpuinfo | grep 'model name' | cut -d ":" -f 2 | sed -e s/^\ //g | sort | uniq` + log INFO "CPU: $CPU" + elif [ "$ARCH" == "Darwin" ] + then + MODEL=`system_profiler SPHardwareDataType | grep "Processor Name" | cut -d ":" -f 2` + SPEED=`system_profiler SPHardwareDataType | grep "Processor Speed" | cut -d ":" -f 2` + log INFO "CPU: $MODEL $SPEED" + elif [ "$ARCH" == "SunOS" ] + then + CPU=`psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1` + + log INFO "$CPU" + else + log INFO "CPU: Cannot determine. Provide a patch for your arch!" + log INFO "Arch is $ARCH" + fi if [ -z "$MAX_NO_OF_RUNNING_JOBS" ] then - MAX_NO_OF_RUNNING_JOBS=`get_no_of_cpus $HYPERTHREADING` + get_no_of_cpus $HYPERTHREADING fi does_file_exist "$JOB_LOG_DIR" if [ ! "$?" == "0" ] then - log INFO "Job log directory $JOB_lOG_DIR does not exist. Creating." - exec_cmd "mkdir $JOB_LOG_DIR" + log DEBUG "Job log directory $JOB_lOG_DIR does not exist. Creating." + exec_cmd "mkdir -p $JOB_LOG_DIR" else - log INFO "Job log directory $JOB_LOG_DIR exists, skipping items for which logs are present." + log DEBUG "Job log directory $JOB_LOG_DIR exists." fi does_file_exist "$ITEM_LOCK_DIR" @@ -505,28 +657,42 @@ init_vars () { if [ ! -e "$JOB_LOG_DIR" ] then - mkdir "$JOB_LOG_DIR" + mkdir -p "$JOB_LOG_DIR" fi does_file_exist "$REMOTE_OUTPUT_DIR" if [ ! "$?" == "0" ] then - echo "ERROR: remote output dir $REMOTE_OUTPUT_DIR does not exist." + log ERROR "Remote output dir $REMOTE_OUTPUT_DIR does not exist." + set_status STOPPED cleanup exit fi - if [ ! -e "$PPSS_LOCAL_TMPDIR" ] && [ ! -z "$SSH_SERVER" ] + if [ ! -e "$PPSS_LOCAL_TMPDIR" ] then mkdir "$PPSS_LOCAL_TMPDIR" fi - if [ ! -e "$PPSS_LOCAL_OUTPUT" ] && [ ! -z "$SSH_SERVER" ] + if [ ! -e "$PPSS_LOCAL_OUTPUT" ] then mkdir "$PPSS_LOCAL_OUTPUT" fi } +get_status () { + + STATUS=`cat "$NODE_SATUS"` + echo "$STATUS" +} + +set_status () { + + STATUS="$1" + echo "$HOSTNAME $STATUS" > "$NODE_STATUS" +} + + expand_str () { STR=$1 @@ -542,30 +708,32 @@ expand_str () { } log () { + + # Type 'INFO' is logged to the screen + # Any other log-type is only logged to the logfile. TYPE="$1" MESG="$2" - TMP_LOG="" - TYPE_LENGTH=6 + TYPE_LENGTH=5 TYPE_EXP=`expand_str "$TYPE"` DATE=`date +%b\ %d\ %H:%M:%S` - PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH} -" + PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}" + PREFIX_SMALL="$DATE: " LOG_MSG="$PREFIX $MESG" + ECHO_MSG="$PREFIX_SMALL $MESG" echo -e "$LOG_MSG" >> "$LOGFILE" - if [ "$TYPE" == "INFO" ] + if [ "$TYPE" == "INFO" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ] then - echo -e "$LOG_MSG" + echo -e "$ECHO_MSG" fi } -log INFO "$0 $@" - check_status () { ERROR="$1" @@ -575,6 +743,7 @@ check_status () { if [ ! "$ERROR" == "0" ] then log INFO "$FUNCTION - $MESSAGE" + set_status STOPPED cleanup exit 1 fi @@ -583,33 +752,95 @@ check_status () { erase_ppss () { - echo "Are you realy sure you want to erase PPSS from all nades!?" + + echo "Are you realy sure you want to erase PPSS from all nodes!? (YES/NO)" read YN - if [ "$YN" == "y" ] + if [ "$YN" == "yes" ] || [ "$YN" == "YES" ] then for NODE in `cat $NODES_FILE` do - log INFO "Erasing PPSS homedir $PPSS_HOME_DIR from node $NODE." - ssh $USER@$NODE "rm -rf $PPSS_HOME_DIR" + log INFO "Erasing PPSS homedir $PPSS_DIR from node $NODE." + ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR" done + else + log INFO "Aborting.." fi + sleep 1 } -deploy_ppss () { +deploy () { + + NODE="$1" + + SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=socket-%h \ + -o GlobalKnownHostsFile=./known_hosts \ + -o ControlMaster=auto \ + -o Cipher=blowfish \ + -o ConnectTimeout=5 " ERROR=0 set_error () { if [ ! "$1" == "0" ] then - ERROR=$1 + ERROR=1 fi } + ssh -q -o ConnectTimeout=5 $SSH_KEY $USER@$NODE exit 0 + set_error "$?" + if [ ! "$ERROR" == "0" ] + then + log ERROR "Cannot connect to node $NODE." + return + fi + + ssh -N -M $SSH_OPTS_NODE $SSH_KEY $USER@$NODE & + SSH_PID=$! + + KEY=`echo $SSH_KEY | cut -d " " -f 2` + + sleep 1 + + ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "cd ~ && mkdir $PPSS_HOME_DIR >> /dev/null 2>&1" + scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + scp -q $SSH_OPTS_NODE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + scp -q $SSH_OPTS_NODE $SSH_KEY $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + scp -q $SSH_OPTS_NODE $SSH_KEY known_hosts $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + if [ ! -z "$SCRIPT" ] + then + scp -q $SSH_OPTS_NODE $SSH_KEY $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + fi + + if [ ! -z "$INPUT_FILE" ] + then + scp -q $SSH_OPTS_NODE $SSH_KEY $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR + set_error $? + fi + + if [ "$ERROR" == "0" ] + then + log INFO "PPSS installed on node $NODE." + else + log INFO "PPSS failed to install on $NODE." + fi + + kill $SSH_PID +} + +deploy_ppss () { + + if [ -z "$NODES_FILE" ] then log INFO "ERROR - are you using the right option? -C ?" + set_status ERROR cleanup exit 1 fi @@ -617,50 +848,47 @@ deploy_ppss () { KEY=`echo $SSH_KEY | cut -d " " -f 2` if [ -z "$KEY" ] || [ ! -e "$KEY" ] then - log INFO "ERROR - nodes require a key file." + log ERROR "Nodes require a key file." cleanup + set_status "ERROR" exit 1 fi - if [ ! -e "$SCRIPT" ] + if [ ! -e "$SCRIPT" ] && [ ! -z "$SCRIPT" ] then - log INFO "ERROR - script $SCRIPT not found." + log ERROR "Script $SCRIPT not found." + set_status "ERROR" cleanup exit 1 fi + INSTALLED_ON_SSH_SERVER=0 if [ ! -e "$NODES_FILE" ] then - log INFO "ERROR file $NODES with list of nodes does not exist." + log ERROR "File $NODES with list of nodes does not exist." + set_status ERROR cleanup exit 1 else for NODE in `cat $NODES_FILE` do - ssh -q $USER@$NODE "mkdir $PPSS_HOME_DIR >> /dev/null 2>&1" - scp -q $SSH_OPTS $0 $USER@$NODE:~/$PPSS_HOME_DIR - set_error $? - scp -q $KEY $USER@$NODE:~/$PPSS_HOME_DIR - set_error $? - scp -q $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR - set_error $? - scp -q known_hosts $USER@$NODE:~/$PPSS_HOME_DIR - set_error $? - scp -q $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR - set_error $? - if [ ! -z "$INPUT_FILE" ] - then - scp -q $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR - set_error $? - fi - - if [ "$ERROR" == "0" ] - then - log INFO "PPSS installed on node $NODE." - else - log INFO "PPSS failed to install on $NODE." + deploy "$NODE" & + sleep 0.1 + if [ "$NODE" == "$SSH_SERVER" ] + then + log DEBUG "SSH SERVER $SSH_SERVER is also a node." + INSTALLED_ON_SSH_SERVER=1 + exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR" + exec_cmd "mkdir -p $ITEM_LOCK_DIR" fi done + if [ "$INSTALLED_ON_SSH_SERVER" == "0" ] + then + log DEBUG "SSH SERVER $SSH_SERVER is not a node." + deploy "$SSH_SERVER" + exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR" + exec_cmd "mkdir -p $ITEM_LOCK_DIR" + fi fi } @@ -669,20 +897,20 @@ start_ppss_on_node () { NODE="$1" log INFO "Starting PPSS on node $NODE." - ssh $USER@$NODE "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ./ppss.sh node --config $CONFIG" + ssh $SSH_KEY $USER@$NODE -o ConnectTimeout=5 "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 node --config ~/$PPSS_HOME_DIR/$CONFIG" } - test_server () { # Testing if the remote server works as expected. if [ ! -z "$SSH_SERVER" ] - then + then exec_cmd "date >> /dev/null" check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached" ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER & SSH_MASTER_PID="$!" + log DEBUG "SSH Master pid is $SSH_MASTER_PID" else log DEBUG "No remote server specified, assuming stand-alone mode." fi @@ -712,46 +940,67 @@ get_no_of_cpus () { then NUMBER=`grep ^processor $CPUINFO | wc -l` got_cpu_info "$?" - + elif [ "$ARCH" == "Darwin" ] then NUMBER=`sysctl -a hw | grep -w logicalcpu | awk '{ print $2 }'` got_cpu_info "$?" + elif [ "$ARCH" == "FreeBSD" ] then NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'` got_cpu_info "$?" - else - NUMBER=`grep ^processor $CPUINFO | wc -l` + + elif [ "$ARCH" == "SunOS" ] + then + NUMBER=`psrinfo | grep on-line | wc -l` got_cpu_info "$?" + else + if [ -e "$CPUINFO" ] + then + NUMBER=`grep ^processor $CPUINFO | wc -l` + got_cpu_info "$?" + fi fi + + if [ ! -z "$NUMBER" ] + then + log INFO "Found $NUMBER logic processors." + fi + elif [ "$HPT" == "no" ] then - log DEBUG "Hyperthreading is disabled." + log INFO "Hyperthreading is disabled." + if [ "$ARCH" == "Linux" ] then PHYSICAL=`grep 'physical id' $CPUINFO` if [ "$?" == "0" ] then PHYSICAL=`grep 'physical id' $CPUINFO | sort | uniq | wc -l` - log DEBUG "Detected $PHYSICAL CPU(s)" - TMP=`grep 'cpu cores' $CPUINFO` + if [ "$PHYSICAL" == "1" ] + then + log INFO "Found $PHYSICAL physical CPU." + else + log INFO "Found $PHYSICAL physical CPUs." + fi + + TMP=`grep 'core id' $CPUINFO` if [ "$?" == "0" ] then - MULTICORE=`grep 'cpu cores' $CPUINFO | sort | uniq | cut -d ":" -f 2 | sed s/\ //g` - log DEBUG "Detected $MULTICORE cores per CPU." - NUMBER=$(($PHYSICAL*$MULTICORE)) + log DEBUG "Starting job only for each physical core on all physical CPU(s)." + NUMBER=`grep 'core id' $CPUINFO | sort | uniq | wc -l` + log INFO "Found $NUMBER physical cores." else - log DEBUG "Starting job only for each physical CPU." + log INFO "Single core processor(s) detected." + log INFO "Starting job for each physical CPU." NUMBER=$PHYSICAL fi else - log DEBUG "No 'physical id' section found in $CPUINFO." + log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus." NUMBER=`grep ^processor $CPUINFO | wc -l` got_cpu_info "$?" fi - - elif [ "$ARCH" == "Darwin" ] then NUMBER=`sysctl -a hw | grep -w physicalcpu | awk '{ print $2 }'` @@ -767,16 +1016,17 @@ get_no_of_cpus () { fi - if [ ! -z "$NUMBER" ] + if [ ! -z "$NUMBER" ] then - echo "$NUMBER" + MAX_NO_OF_RUNNING_JOBS=$NUMBER else - log INFO "$FUNCNAME ERROR - number of CPUs not obtained." + log ERROR "Number of CPUs not obtained." + log ERROR "Please specify manually with -p." + set_status "ERROR" exit 1 fi } - random_delay () { ARGS="$1" @@ -784,12 +1034,13 @@ random_delay () { if [ -z "$ARGS" ] then log ERROR "$FUNCNAME Function random delay, no argument specified." + set_status ERROR exit 1 fi NUMBER=$RANDOM let "NUMBER %= $ARGS" - sleep "$NUMBER" + sleep "0.$NUMBER" } @@ -814,7 +1065,7 @@ get_global_lock () { ERROR="$?" if [ ! "$ERROR" == "0" ] then - random_delay $MAX_DELAY + random_delay $MAX_LOCK_DELAY continue else break @@ -838,47 +1089,91 @@ are_jobs_running () { fi } +escape_item () { + + TMP="$1" + + ITEM_ESCAPED=`echo "$TMP" | \ + sed s/\\ /\\\\\\\\\\\\\\ /g | \ + sed s/\\'/\\\\\\\\\\\\\\'/g | \ + sed s/\\\`/\\\\\\\\\\\\\\\`/g | \ + sed s/\\|/\\\\\\\\\\\\\\|/g | \ + sed s/\&/\\\\\\\\\\\\\\&/g | \ + sed s/\;/\\\\\\\\\\\\\\;/g | \ + sed s/\\\//\\\\\\\\\\\\\\ /g | \ + sed s/\(/\\\\\\\\\\(/g | \ + sed s/\)/\\\\\\\\\\)/g ` +} + download_item () { ITEM="$1" - ITEM_WITH_PATH="$SRC_DIR/$ITEM" + if [ -e "$ITEM" ] + then + ITEM_NO_PATH=`basename "$ITEM"` + else + escape_item "$ITEM" + ITEM_NO_PATH="$ITEM_ESCAPED" + fi if [ "$TRANSFER_TO_SLAVE" == "1" ] then - log DEBUG "Transfering item $ITEM to local disk." - if [ "$SECURE_COPY" == "1" ] + log DEBUG "Transfering item $ITEM_NO_PATH to local disk." + if [ "$SECURE_COPY" == "1" ] && [ ! -z "$SSH_SERVER" ] then - scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_WITH_PATH" $PPSS_LOCAL_TMPDIR - log DEBUG "Exit code of transfer is $?" + if [ ! -z "$SRC_DIR" ] + then + ITEM_PATH="$SRC_DIR/$ITEM" + else + ITEM_PATH="$ITEM" + fi + + escape_item "$ITEM_PATH" + + scp -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER:"$ITEM_ESCAPED" ./$PPSS_LOCAL_TMPDIR + log DEBUG "Exit code of remote transfer is $?" else - cp "$ITEM_WITH_PATH" $PPSS_LOCAL_TMPDIR - log DEBUG "Exit code of transfer is $?" + cp "$ITEM" ./$PPSS_LOCAL_TMPDIR + log DEBUG "Exit code of local transfer is $?" fi + else + log DEBUG "No transfer of item $ITEM_NO_PATH to local workpath." fi } upload_item () { + ITEM="$1" + ITEMDIR="$2" + + if [ "$TRANSFER_TO_SLAVE" == "0" ] + then + log DEBUG "File transfer is disabled." + return 0 + fi log DEBUG "Uploading item $ITEM." if [ "$SECURE_COPY" == "1" ] then - scp -q $SSH_OPTS $SSH_KEY "$ITEM" $USER@$SSH_SERVER:$REMOTE_OUTPUT_DIR + escape_item "$REMOTE_OUTPUT_DIR$ITEMDIR" + DIR_ESCAPED="$ITEM_ESCAPED" + + scp -q $SSH_OPTS $SSH_KEY "$ITEM"/* $USER@$SSH_SERVER:"$DIR_ESCAPED" ERROR="$?" if [ ! "$ERROR" == "0" ] then - log DEBUG "ERROR - uploading of $ITEM failed." + log ERROR "Uploading of $ITEM via SCP failed." else log DEBUG "Upload of item $ITEM success" - rm $ITEM + rm -rf ./"$ITEM" fi else - cp "$ITEM" $REMOTE_OUTPUT_DIR + cp "$ITEM" "$REMOTE_OUTPUT_DIR" ERROR="$?" if [ ! "$ERROR" == "0" ] then - log DEBUG "ERROR - uploading of $ITEM failed." + log DEBUG "ERROR - uploading of $ITEM vi CP failed." fi fi } @@ -888,29 +1183,34 @@ lock_item () { if [ ! -z "$SSH_SERVER" ] then ITEM="$1" - LOCK_FILE_NAME=`echo $ITEM | sed s/^\\\.//g |sed s/^\\\.\\\.//g | sed s/\\\///g` + + LOCK_FILE_NAME=`echo "$ITEM" | \ + sed s/^\\\.//g | \ + sed s/^\\\.\\\.//g | \ + sed s/^\\\///g | \ + sed s/\\\//\\\\\\ /g | \ + sed s/\\ /\\\\\\\\\\\\\\ /g | \ + sed s/\\'/\\\\\\\\\\\\\\'/g | \ + sed s/\\\//\\\\\\\\\\\\\\ /g | \ + sed s/\&/\\\\\\\\\\\\\\&/g | \ + sed s/\;/\\\\\\\\\\\\\\;/g | \ + sed s/\(/\\\\\\\\\\(/g | \ + sed s/\)/\\\\\\\\\\)/g ` + ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME" - log DEBUG "Trying to lock item $ITEM." + log DEBUG "Trying to lock item $ITEM - $ITEM_LOCK_FILE." exec_cmd "mkdir $ITEM_LOCK_FILE >> /dev/null 2>&1" ERROR="$?" - if [ -e "$ITEM_LOCK_FILE" ] + + if [ "$ERROR" == "$?" ] then - exec_cmd "touch $ITEM_LOCK_FILE/$HOSTNAME" + exec_cmd "touch $ITEM_LOCK_FILE/$HOSTNAME" # Record that item is claimed by node x. fi + return "$ERROR" fi } -release_item () { - - ITEM="$1" - - LOCK_FILE_NAME=`echo $ITEM` # | sed s/^\\.//g | sed s/^\\.\\.//g | sed s/\\\///g` - ITEM_LOCK_FILE="$ITEM_LOCK_DIR/$LOCK_FILE_NAME" - - exec_cmd "rm -rf ./$ITEM_LOCK_FILE" -} - get_all_items () { count=0 @@ -922,7 +1222,12 @@ get_all_items () { ITEMS=`exec_cmd "ls -1 $SRC_DIR"` check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." else - ITEMS=`ls -1 $SRC_DIR` + if [ -e "$SRC_DIR" ] + then + ITEMS=`ls -1 $SRC_DIR` + else + ITEMS="" + fi fi IFS=$'\n' @@ -936,29 +1241,32 @@ get_all_items () { if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?" then log DEBUG "Running as slave, input file has been pushed (hopefully)." - if [ ! -e "$INPUT_FILE" ] - then - log INFO "ERROR - input file $INPUT_FILE does not exist." - cleanup - exit 1 - fi + fi + if [ ! -e "$INPUT_FILE" ] + then + log ERROR "Input file $INPUT_FILE does not exist." + set_status "ERROR" + cleanup + exit 1 fi - exec 10<$INPUT_FILE + exec 10<"$INPUT_FILE" while read LINE <&10 do ARRAY[$count]=$LINE ((count++)) done + + exec 10>&- fi - exec 10>&- SIZE_OF_ARRAY="${#ARRAY[@]}" if [ "$SIZE_OF_ARRAY" -le "0" ] then - log INFO "ERROR: source file/dir seems to be empty." + log ERROR "Source file/dir seems to be empty." + set_status STOPPED cleanup exit 1 fi @@ -972,7 +1280,7 @@ get_item () { then return 1 fi - + get_global_lock SIZE_OF_ARRAY="${#ARRAY[@]}" @@ -986,20 +1294,16 @@ get_item () { # This variable is used to walk thtough all array items. ARRAY_POINTER=`cat $ARRAY_POINTER_FILE` - - # Gives a status update on the current progress.. - PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY )) - log INFO "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items." - echo -en "\033[1A" - + # Check if all items have been processed. if [ "$ARRAY_POINTER" -ge "$SIZE_OF_ARRAY" ] then release_global_lock - return 2 + #echo -en "\033[1A" + return 1 fi - # Select an item. + # Select an item. ITEM="${ARRAY[$ARRAY_POINTER]}" if [ -z "$ITEM" ] then @@ -1031,7 +1335,13 @@ start_single_worker () { ERROR=$? if [ ! "$ERROR" == "0" ] then - log DEBUG "Item empty, we are probably almost finished." + # + # If no more items are available, the listener should be + # informed that a worker just finished / died. + # This allows the listener to determine if all processes + # are finished and it is time to stop. + # + echo "$STOP_KEY" > $FIFO return 1 else get_global_lock @@ -1061,87 +1371,129 @@ elapsed () { commando () { ITEM="$1" - ITEM_NO_PATH="$1" - log DEBUG "Processing item $ITEM" - - if [ -z "$INPUT_FILE" ] && [ "$TRANSFER_TO_SLAVE" == "0" ] + if [ -e "$ITEM" ] then - ITEM="$SRC_DIR/$ITEM" + DIRNAME=`dirname "$ITEM"` + ITEM_NO_PATH=`basename "$ITEM"` + escape_item "$ITEM_NO_PATH" + OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED" + # ^ + # | This VAR can be used in scripts or command lines. + # + OUTPUT_FILE="$ITEM_ESCAPED" else - ITEM="$PPSS_LOCAL_TMPDIR/$ITEM" + DIRNAME="" + escape_item "$ITEM" + ITEM_NO_PATH="$ITEM_ESCAPED" + OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH" fi - LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g` + log DEBUG "Processing item $ITEM" + # + # Decide if an item must be transfered from server to the node. + # or be processed in-place (NFS / SMB mount?) + # + if [ "$TRANSFER_TO_SLAVE" == "0" ] + then + if [ -z "$SRC_DIR" ] && [ ! -z "$INPUT_FILE" ] + then + log DEBUG "Using item straight from the server." + else + ITEM="$SRC_DIR/$ITEM" + fi + else + ITEM="./$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH" + fi + + LOG_FILE_NAME=`echo "$ITEM" | sed s/^\\\.//g | sed s/^\\\.\\\.//g | sed s/\\\///g | sed s/\\ /_/g` ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME" - mkdir -p $PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH" + OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$LOG_FILE_NAME" + + mkdir -p "$OUTPUT_DIR" does_file_exist "$ITEM_LOG_FILE" if [ "$?" == "0" ] then log DEBUG "Skipping item $ITEM - already processed." else - ERROR="" - + # # Some formatting of item log files. + # DATE=`date +%b\ %d\ %H:%M:%S` echo "===== PPSS Item Log File =====" > "$ITEM_LOG_FILE" echo -e "Host:\t\t$HOSTNAME" >> "$ITEM_LOG_FILE" + echo -e "Process:\t$PID" >> "$ITEM_LOG_FILE" echo -e "Item:\t\t$ITEM" >> "$ITEM_LOG_FILE" echo -e "Start date:\t$DATE" >> "$ITEM_LOG_FILE" echo -e "" >> "$ITEM_LOG_FILE" - # The actual execution of the command. + # + # The actual execution of the command as specified by + # the -c option. + # + BEFORE="$(date +%s)" TMP=`echo $COMMAND | grep -i '$ITEM'` if [ "$?" == "0" ] then - BEFORE="$(date +%s)" eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1 ERROR="$?" - AFTER="$(date +%s)" + MYPID="$!" else - EXECME='$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1' - BEFORE="$(date +%s)" - eval "$EXECME" + eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1' ERROR="$?" - AFTER="$(date +%s)" + MYPID="$!" fi + AFTER="$(date +%s)" echo -e "" >> "$ITEM_LOG_FILE" # Some error logging. Success or fail. if [ ! "$ERROR" == "0" ] then - echo -e "Status:\t\tError - something went wrong." >> "$ITEM_LOG_FILE" + echo -e "Status:\t\tFAILURE" >> "$ITEM_LOG_FILE" else - echo -e "Status:\t\tSucces - item has been processed." >> "$ITEM_LOG_FILE" + echo -e "Status:\t\tSUCCESS" >> "$ITEM_LOG_FILE" fi + # + # If part of a cluster, remove the downloaded item after + # it has been processed and uploaded as not to fill up disk space. + # if [ "$TRANSFER_TO_SLAVE" == "1" ] then if [ -e "$ITEM" ] then - rm $ITEM + rm "$ITEM" else log DEBUG "ERROR Something went wrong removing item $ITEM from local work dir." fi fi - if [ ! -z "$REMOTE_OUTPUT_DIR" ] && [ ! -z "$SSH_SERVER" ] + escape_item "$DIRNAME" + ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED" + + exec_cmd "mkdir -p $ITEM_OUTPUT_DIR" + if [ "$DIRNAME" == "." ] then - upload_item "$PPSS_LOCAL_OUTPUT/$ITEM_NO_PATH/*" + DIRNAME="" fi + upload_item "$PPSS_LOCAL_OUTPUT/$ITEM_NO_PATH" "$DIRNAME" elapsed "$BEFORE" "$AFTER" >> "$ITEM_LOG_FILE" echo -e "" >> "$ITEM_LOG_FILE" if [ ! -z "$SSH_SERVER" ] then - log DEBUG "Uploading item log file $ITEM_LOG_FILE to master." - scp -q $SSH_OPTS $SSH_KEY $ITEM_LOG_FILE $USER@$SSH_SERVER:~/$JOB_LOG_DIR/ + log DEBUG "Uploading item log file $ITEM_LOG_FILE to master ~/$PPSS_HOME_DIR/$JOB_LOG_DIR" + scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:~/$PPSS_HOME_DIR/$JOB_LOG_DIR + if [ ! "$?" == "0" ] + then + log DEBUG "Uploading of item log file failed." + fi fi fi @@ -1149,15 +1501,115 @@ commando () { return $? } +# # This is the listener service. It listens on the pipe for events. # A job is executed for every event received. +# This listener enables fully asynchronous processing. +# listen_for_job () { - - log INFO "Listener started." + FINISHED=0 + DIED=0 + PIDS="" + log DEBUG "Listener started." while read event <& 42 do - commando "$event" & + if [ "$event" == "$STOP_KEY" ] + then + # + # The start_single_worker method sends a special signal to + # inform the listener that a worker is finished. + # If all workers are finished, it is time to stop. + # This mechanism makes PPSS asynchronous. + # + ((DIED++)) + if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ] + then + #kill_process + break + else + RES=$((MAX_NO_OF_RUNNING_JOBS-DIED)) + if [ "$RES" == "1" ] + then + log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. " + else + log INFO "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining." + echo -en "\033[1A" + fi + fi + elif [ "$event" == "$KILL_KEY" ] + then + # + # If all workers are finished, it is time to kill all remaining + # processes, terminate ssh processes and the listener itself. + # + # This command kills all processes that are related to the master + # process as defined by $PID. All processes that have ever been + # spawned, although disowned or backgrounded will be killed... + # + PROCLIST=`ps a -o pid,pgid,ppid,command | grep [0-9] | grep $PID | grep -v -i grep` + #echo "$PROCLIST" > proclist.txt + oldIFS=$IFS # save the field separator + IFS=$'\n' # new field separator, the end of line + for x in `echo "$PROCLIST"` + do + MYPPID=`echo $x | awk '{ print $3 }'` + MYPID=`echo $x | awk '{ print $1 }'` + if [ ! "$MYPPID" == "$PID" ] && [ ! "$MYPPID" == "1" ] + then + if [ ! "$MYPID" == "$PID" ] + then + log DEBUG "Killing process $MYPID" + kill $MYPID >> /dev/null 2>&1 + else + log DEBUG "Not killing master process..$MYPID.." + fi + else + log DEBUG "Not killing listener process. $MYPID.." + fi + done + IFS=$oldIFS + + if [ ! -z "$SSH_MASTER_PID" ] + then + kill "$SSH_MASTER_PID" + fi + break + else + commando "$event" & + MYPID="$!" + disown + PIDS="$PIDS $MYPID" + #log DEBUG "Event $event has pid $MYPID" + fi + + get_global_lock + SIZE_OF_ARRAY="${#ARRAY[@]}" + ARRAY_POINTER=`cat $ARRAY_POINTER_FILE` + release_global_lock + PERCENT=$((100 * $ARRAY_POINTER / $SIZE_OF_ARRAY )) + if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ] + then + log INFO "Currently $PERCENT percent complete. Processed $ARRAY_POINTER of $SIZE_OF_ARRAY items." + if [ "$PERCENT" == "100" ] + then + FINISHED=1 + else + echo -en "\033[1A" + fi + fi done + + set_status STOPPED + log DEBUG "Listener stopped." + if [ ! "$PERCENT" == "100" ] + then + echo + log INFO "Finished. Consult $JOB_LOG_DIR for job output." + log INFO "Press ENTER to continue." + else + log INFO "Finished. Consult $JOB_LOG_DIR for job output." + fi + cleanup } # This starts an number of parallel workers based on the # of parallel jobs allowed. @@ -1165,19 +1617,37 @@ start_all_workers () { if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ] then - log INFO "Starting $MAX_NO_OF_RUNNING_JOBS worker." + log INFO "Starting $MAX_NO_OF_RUNNING_JOBS single worker." else - log INFO "Starting $MAX_NO_OF_RUNNING_JOBS workers." + log INFO "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers." fi + log INFO "---------------------------------------------------------" i=0 while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ] do start_single_worker ((i++)) + + if [ ! "$MAX_DELAY" == "0" ] + then + random_delay "$MAX_DELAY" + fi done } +get_status_of_node () { + + NODE="$1" + STATUS=`ssh -o ConnectTimeout=10 $SSH_KEY $USER@$NODE cat "$PPSS_HOME_DIR/$NODE_STATUS" 2>/dev/null` + ERROR="$?" + if [ ! "$ERROR" == "0" ] + then + STATUS="UNKNOWN" + fi + echo "$STATUS" +} + show_status () { source $CONFIG @@ -1188,43 +1658,73 @@ show_status () { if [ -z "$INPUT_FILE" ] then - ITEMS=`exec_cmd "ls -1 $SRC_DIR | wc -l"` + ITEMS=`exec_cmd "ls -1 $SRC_DIR | wc -l"` else - ITEMS=`exec_cmd "cat $INPUT_FILE | wc -l"` + ITEMS=`exec_cmd "cat $PPSS_DIR/$INPUT_FILE | wc -l"` fi - PROCESSED=`exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l"` - STATUS=$((100 * $PROCESSED / $ITEMS)) + PROCESSED=`exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l"` 2>&1 >> /dev/null + TMP_STATUS=$((100 * $PROCESSED / $ITEMS)) - log INFO "$STATUS percent complete." + log INFO "Status:\t\t$TMP_STATUS percent complete." + if [ ! -z $NODES_FILE ] + then + TMP_NO=`cat $NODES_FILE | wc -l` + log INFO "Nodes:\t $TMP_NO" + fi + log INFO "Items:\t\t$ITEMS" + + + log INFO "---------------------------------------------------------" + HEADER=`echo IP-address Hostname Processed Status | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'` + log INFO "$HEADER" + log INFO "---------------------------------------------------------" + PROCESSED=0 + for x in `cat $NODES_FILE` + do + NODE=`get_status_of_node "$x" | awk '{ print $1 }'` + if [ ! "$NODE" == "UNKNOWN" ] + then + STATUS=`get_status_of_node "$x" | awk '{ print $2 }'` + RES=`exec_cmd "grep -i $NODE ~/$PPSS_HOME_DIR/$JOB_LOG_DIR/* 2>/dev/null | wc -l "` + if [ ! "$?" == "0" ] + then + RES=0 + fi + else + STATUS="UNKNOWN" + RES=0 + fi + let PROCESSED=$PROCESSED+$RES + LINE=`echo "$x $NODE $RES $STATUS" | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'` + log INFO "$LINE" + done + log INFO "---------------------------------------------------------" + LINE=`echo $PROCESSED | awk '{ printf ("Total processed: % 29s\n",$1) }'` + log INFO "$LINE" } # If this is called, the whole framework will execute. main () { - is_running - case $MODE in - node|standalone ) - log DEBUG "---------------- START ---------------------" - log INFO "$SCRIPT_NAME version $SCRIPT_VERSION" - log INFO `hostname` + node ) init_vars test_server get_all_items - listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & + listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null LISTENER_PID=$! start_all_workers ;; - start ) + start ) # This option only starts all nodes. - init_vars - + display_header if [ ! -e "$NODES_FILE" ] then - log INFO "ERROR file $NODES with list of nodes does not exist." + log ERROR "File $NODES with list of nodes does not exist." + set_status STOPPED cleanup exit 1 else @@ -1237,7 +1737,7 @@ main () { exit 0 ;; config ) - + display_header log INFO "Generating configuration file $CONFIG" add_var_to_config PPSS_LOCAL_TMPDIR "$PPSS_LOCAL_TMPDIR" add_var_to_config PPSS_LOCAL_OUTPUT "$PPSS_LOCAL_OUTPUT" @@ -1246,18 +1746,21 @@ main () { ;; stop ) + display_header log INFO "Stopping PPSS on all nodes." exec_cmd "touch $STOP_SIGNAL" cleanup exit ;; pause ) + display_header log INFO "Pausing PPSS on all nodes." exec_cmd "touch $PAUSE_SIGNAL" cleanup exit ;; continue ) + display_header if does_file_exist "$STOP_SIGNAL" then log INFO "Continuing processing, please use $0 start to start PPSS on al nodes." @@ -1269,66 +1772,53 @@ main () { exec_cmd "rm -f $PAUSE_SIGNAL" fi cleanup - exit + exit 0 ;; deploy ) + display_header log INFO "Deploying PPSS on nodes." deploy_ppss + wait cleanup exit 0 ;; status ) + display_header show_status cleanup exit 0 - # some show command ;; erase ) + display_header log INFO "Erasing PPSS from all nodes." erase_ppss cleanup exit 0 ;; - * ) - showusage - exit 1 + kill ) + for x in `ps ux | grep ppss | grep -v grep | grep bash | awk '{ print $2 }'` + do + kill "$x" + done + cleanup + exit 0 ;; + + * ) + init_vars + get_all_items + listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null + LISTENER_PID=$! + #log DEBUG "Master PID is $PID." + #log DEBUG "Listener PID is $LISTENER_PID." + start_all_workers + ;; + esac } # This command starts the that sets the whole framework in motion. main -# Either start new jobs or exit, sleep in the meantime. -while true -do - sleep 5 - JOBS=`ps ax | grep -v grep | grep -v -i screen | grep ppss.sh | wc -l` - log INFO "There are $JOBS running processes. " - - MIN_JOBS=3 - - if [ "$ARCH" == "Darwin" ] - then - MIN_JOBS=4 - elif [ "$ARCH" == "Linux" ] - then - MIN_JOBS=3 - fi - - if [ "$JOBS" -gt "$MIN_JOBS" ] - then - log INFO "Sleeping $INTERVAL seconds." - sleep $INTERVAL - else - echo -en "\033[1B" - log INFO "There are no more running jobs, so we must be finished." - echo -en "\033[1B" - log INFO "Killing listener and remainig processes." - log INFO "Dying processes may display an error message." - kill_process - fi -done - # Exit after all processes have finished. -wait +wait