#!/usr/bin/env bash
#
# PPSS, the Parallel Processing Shell Script
#
# Copyright (c) 2010, Louwrentius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# See <http://www.gnu.org/licenses/>
# for a copy of the GNU General Public License
#
# "Patches or other contributions are always welcome!"
#

#
# Ctrl-C (SIGINT) is routed to kill_process for a clean shutdown.
# The trap string is only evaluated when the signal arrives, so
# kill_process may be defined further down in the script.
#
trap 'kill_process' SIGINT

SCRIPT_NAME="Distributed Parallel Processing Shell Script"
SCRIPT_VERSION="2.70"

#
# An optional first argument selects the mode. If it equals one of the
# words in MODES it is stored in MODE and removed from the arguments.
#
MODES="node start config stop pause continue deploy status erase kill ec2"
for x in $MODES; do
    [[ "$x" == "$1" ]] || continue
    MODE="$1"
    shift
    break
done

#
# The working directory can be overridden from the environment:
#     export PPSS_DIR=/path/to/workingdir
# An unset or empty value falls back to "ppss_dir".
#
PPSS_DIR="${PPSS_DIR:-ppss_dir}"

CONFIG=""
HOSTNAME="$(hostname)"
ARCH="$(uname)"
PPSS_HOME_DIR="ppss-home"
SOURCED="$0"
PID="$$"

#
# NOTE(review): the paths below are derived once, right here, from
# PPSS_DIR, PPSS_HOME_DIR and PID. Command-line options parsed later
# (-w sets PPSS_DIR, -H sets PPSS_HOME_DIR) do not recompute them --
# confirm that this is intended.
#
GLOBAL_LOCK="$PPSS_DIR/PPSS-GLOBAL-LOCK-$PID"            # Global lock used by the local PPSS instance.
PAUSE_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/pause_signal"     # Processing pauses while this file exists.
PAUSE_DELAY="60"                                         # Seconds to sleep between pause checks (once a minute).
STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal"       # Processing stops once this file exists.
GLOBAL_COUNTER=""
GLOBAL_COUNTER_FILE="$PPSS_DIR/ppss-input-counter-$PID"  # Line number of the next input item to hand out.
LOCAL_INPUT_FILE="$PPSS_DIR/INPUT_FILE-$PID"             # Local list of items, one per line.
JOB_LOG_DIR="$PPSS_DIR/job_log"                          # Log files of processed items.
LOGFILE="$PPSS_DIR/ppss-log-$PID.txt"                    # General PPSS log file; contains lots of info.
QUIET="0"
STOP="0"                                                 # Becomes 1 once a stop signal has been seen.
MAX_DELAY="0"                       # Upper bound for the initial random job delay (-D / --delay).
MAX_LOCK_DELAY="9"                  # NOTE(review): only visible use is a commented-out random_delay call in get_global_lock.
PERCENT="0"
LISTENER_PID=""
IFS_BACKUP="$IFS"                   # Saved copy of the default IFS.
CPUINFO="/proc/cpuinfo"
PROCESSORS=""

#
# Random tokens written to the FIFO as control messages for the listener:
#   STOP_KEY - a worker found no more items (it finished / died).
#   KILL_KEY - sent by kill_process on SIGINT: stop immediately and kill
#              all running processes.
#
STOP_KEY="$RANDOM$RANDOM$RANDOM"
KILL_KEY="$RANDOM$RANDOM$RANDOM"

RECURSION="1"                       # 1 = descend into subdirectories of the -d dir; -r / --no-recursion sets 0.
START_PPSS=""
STOP_PPSS=""
SIZE_OF_INPUT=""

SSH_SERVER=""                       # Remote server or 'master'.
SSH_KEY=""                          # SSH key for ssh account.
SSH_KNOWN_HOSTS=""
SSH_SOCKET="/tmp/ppss_ssh_socket-$$"    # Multiplex multiple SSH connections over 1 master.

#
# SSH option strings. They are expanded unquoted by the callers, so they
# are split into separate words. A backslash-newline inside double quotes
# is a line continuation, so none of it ends up in the value.
#
# The former "-o Cipher=blowfish" was removed: the Cipher keyword only
# selects the cipher for SSH protocol 1. SSH-2 connections ignore it, and
# OpenSSH 7.6+ (which dropped protocol 1) accepts it only as a deprecated
# no-op. Use the "Ciphers" keyword if a specific SSH-2 cipher is ever needed.
#
SSH_OPTS="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \
          -o GlobalKnownHostsFile=./known_hosts \
          -o ControlMaster=auto \
          -o ConnectTimeout=10 "
SSH_OPTS_NOMP="-o BatchMode=yes -o GlobalKnownHostsFile=./known_hosts \
               -o ConnectTimeout=10 "   # Same as SSH_OPTS but without connection multiplexing.
SSH_MASTER_PID=""

ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR"        # Item lock directory (placed under $PPSS_HOME_DIR on the master when SSH_SERVER is set).
PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR"     # Local directory on slave for local processing.
PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT"     # Local directory on slave for local output.
DOWNLOAD_TO_NODE="0"                # Transfer item to slave via (s)cp.
UPLOAD_TO_SERVER="0"                # Transfer output back to server via (s)cp.
SECURE_COPY="1"                     # If set, use scp; otherwise, use cp.
REMOTE_OUTPUT_DIR=""                # Remote directory to which output must be uploaded.
SCRIPT=""                           # Custom user script that is executed by ppss.
ITEM_ESCAPED="" NODE_STATUS="$PPSS_DIR/status.txt" DAEMON=0 showusage_short () { echo echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" echo echo "usage: $0 [ -d | -f ] [ -c ' \"\$ITEM\"' ]" echo " [ -C ] [ -j ] [ -l ] [ -p <# jobs> ]" echo " [ -q ] [ -D ] [ -h ] [ --help ] [ -r ] [ --daemon ]" echo echo "Examples:" echo " $0 -d /dir/with/some/files -c 'gzip '" echo " $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2" echo " $0 -f -c 'wget -q -P /destination/directory \"\$ITEM\"' -p 10" } showusage_normal () { echo echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" echo echo "PPSS is a Bash shell script that executes commands in parallel on a set" echo "of items, such as files in a directory, or lines in a file. The purpose" echo "of PPSS is to make it simple to benefit from multiple CPUs or CPU cores." echo echo "This short summary only discusses options for stand-alone mode. For a" echo "full listing of all options, run PPSS with the options --help" echo echo "Usage $0 [ options ]" echo echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes." echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item " echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'." echo echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files" echo -e " are fed as an argument to the command that has been specified with -c." echo echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the" echo -e " command that has been specified with -c. Read input from stdin with" echo -e " -f -" echo echo -e "--config | -C If the mode is config, a config file with the specified name will be" echo -e " generated based on all the options specified. In the other modes". echo -e " this option will result in PPSS reading the config file and start" echo -e " processing items based on the settings of this file." echo echo -e "--disable-ht | -j Disable hyper threading. 
Is enabled by default." echo echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt." echo echo -e "--processes | -p Start the specified number of processes. Ignore the number of available" echo -e " CPUs." echo echo -e "--quiet | -q Shows no output except for a progress indication using percents." echo echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread" echo -e " the load. The delay is only used at the start of all 'threads'." echo echo -e "--daemon Do not exit after items are professed, but keep looking for new items" echo -e " and process them. Read the manual how to use this!" echo echo -e "--no-recursion|-r By default, recursion of directories is enabled when the -d option is " echo -e " used. If this is not prefered, this can be disabled with this option " echo -e " Only files within the specified directory will be processed." echo echo -e "--help Extended help, including options for distributed mode and Amazon EC2." echo echo -e "Example: encoding some wav files to mp3 using lame:" echo echo -e "$0 -d /path/to/wavfiles -c 'lame '" echo echo -e "Extended usage: use --help" echo } showusage_long () { echo echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" echo echo "PPSS is a Bash shell script that executes commands in parallel on a set " echo "of items, such as files in a directory, or lines in a file." echo echo "Usage: $0 [ MODE ] [ options ]" echo echo "Modes are optional and mainly used for running in distributed mode. Modes are:" echo echo " config Generate a config file based on the supplied option parameters." echo " deploy Deploy PPSS and related files on the specified nodes." echo " erase Erase PPSS and related files from the specified nodes." echo " ec2 Start up Amazon EC2 instances and deploy PPSS on nodes." echo echo " start Starting PPSS on nodes." echo " pause Pausing PPSS on all nodes." echo " stop Stopping PPSS on all nodes." echo " continue Continuing PPSS on all nodes." 
echo " node Running PPSS as a node, requires additional options." echo echo "Options are:" echo echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes." echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item " echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'." echo echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files" echo -e " are fed as an argument to the command that has been specified with -c." echo echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the" echo -e " command that has been specified with -c. Instead of a file, stdin can" echo -e " be specified like \"-f -\" in order to 'pipe' items to ppss." echo -e " Example: cat file | ppss -f - -c 'echo '" echo echo -e "--config | -C If the mode is config, a config file with the specified name will be" echo -e " generated based on all the options specified. In the other modes". echo -e " this option will result in PPSS reading the config file and start" echo -e " processing items based on the settings of this file." echo echo -e "--disable-ht | -j Disable hyper threading. Is enabled by default." echo echo -e "--log | -l Sets the name of the log file. The default is ppss-log-.txt." echo echo -e "--processes | -p Start the specified number of processes. Ignore the number of available" echo -e " CPUs." echo echo -e "--quiet | -q Shows no output except for a progress indication using percents." echo echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread" echo -e " the load. The delay is only used at the start of all 'threads'." echo echo -e "--daemon Do not exit after items are professed, but keep looking for new items" echo -e " and process them. Read the manual how to use this!" echo echo -e "--no-recursion|-r By default, recursion of directories is enabled when the -d option is " echo -e " used. 
If this is not prefered, this can be disabled with this option." echo -e " Only files within the specified directory will be processed." echo echo -e "The following options are used for distributed execution of PPSS." echo echo -e "--master | -m Specifies the SSH server that is used for communication between nodes." echo -e " Using SSH, file locks are created, informing other nodes that an item " echo -e " is locked. If items are files that must be processed, they must reside" echo -e " on this host. SCP is used to transfer files from this host to nodes" echo -e " for local procesing." echo echo -e "--node | -n File containig a list of nodes that act as PPSS clients. One IP / DNS" echo -e " name per line." echo echo -e "--key | -k The SSH key that a node uses to connect to the master." echo echo -e "--known-hosts | -K The file that contains the server public key. Can often be found on " echo -e " hosts that already once connected to the server. See the file " echo -e " ~/.ssh/known_hosts or else, manualy connect once and check this file." echo echo -e "--user | -u The SSH user name that is used by the node when logging in into the" echo -e " master SSH server." echo echo -e "--script | -S Specifies the script/program that must be copied to the nodes for" echo -e " execution through PPSS. Only used in the deploy mode." echo -e " This option should be specified if necessary when generating a config." echo echo -e "--download This option specifies that an item will be downloaded by the node" echo -e " from the server or share to the local node for processing." echo echo -e "--upload This option specifies that the output file will be copied back to" echo -e " the server, the --outputdir option is mandatory." echo echo -e "--no-scp | -b Do not use scp for downloading items. Use cp instead. Assumes that a" echo -e " network file system (NFS/SMB) is mounted under a local mount point." echo echo -e "--outputdir | -o Directory on server where processed files are put. 
If the result of " echo -e " encoding a wav file is an mp3 file, the mp3 file is put in the " echo -e " directory specified with this option." echo echo -e "--homedir | -H Directory in which PPSS is installed on the node." echo -e " Default is '$PPSS_HOME_DIR'." echo echo -e "--script | -S Script to run on the node. PPSS must copy this script to the node." echo echo -e "Amazon EC2 platform specific options:" echo echo -e "--awskeypair | -P The Amazon EC2 SSH keypair that new instances should use." echo echo -e "--AMI | -A The Amazon Machine Image that should be used to create new" echo -e " running instances." echo echo -e "--type | -T The type of EC2 instance that should be created." echo -e " Example: c1.xlarge or m1.medium" echo echo -e "--security | -G The security group that should be used for networking access." echo echo -e "--instances | -I The number of instances that should be started." echo echo echo -e "Example: encoding some wav files to mp3 using lame:" echo echo -e "$0 -c 'lame ' -d /path/to/wavfiles -j " echo echo -e "Running PPSS based on a configuration file." echo echo -e "$0 -C config.cfg" echo echo -e "Generating a configuration file. Wavs are converted to mp3. SCP is used for data transfer." echo echo -e "$0 config -C ppss-config.cfg -d /some/dir -o output --download --upload -K known_hosts \\" echo -e "-k ppss-key.dsa -n nodes.txt -m 10.0.0.100 \\" echo -e "-c 'lame --quiet \"\$ITEM\" -o \"\$OUTPUT_DIR/\$OUTPUT_FILE\".mp3' " echo echo -e "Running PPSS on a client as part of a cluster." echo echo -e "$0 node -d /somedir -c 'cp \"\$ITEM\" /some/destination' -m 10.0.0.50 -u ppss -k ppss-key.key" echo } kill_process () { echo "$KILL_KEY" >> "$FIFO" } exec_cmd () { STATUS="" CMD="$1" NOMP="$2" # Disable multiplexing. if [ ! -z "$SSH_SERVER" ] then if [ -z "$NOMP" ] then log DEBUG "REMOTE EXEC" log DEBUG "$USER@$SSH_SERVER $CMD" ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD STATUS=$? 
elif [ "$NOMP" == "1" ] then log DEBUG "REMOTE EXEC NO MP" ssh $SSH_OPTS_NOMP $SSH_KEY $USER@$SSH_SERVER $CMD STATUS=$? fi else eval "$CMD" STATUS=$? log DEBUG "LOCAL EXEC - status is $STATUS" fi return $STATUS } does_file_exist () { # # this function makes remote or local checking of existence of items transparent. # FILE="$1" RES=`exec_cmd "ls -1 $FILE" 2>&1` if [ "$?" = "0" ] then log DEBUG "$FILE does exist - $RES" return 0 else log DEBUG "$FILE does not exist - $RES" return 1 fi } check_for_interrupt () { # # PPSS can be interupted with a stop or pause command. # does_file_exist "$STOP_SIGNAL" if [ "$?" = "0" ] then set_status "STOPPED" log INFO "STOPPING job. Stop signal found." STOP="1" return 1 fi does_file_exist "$PAUSE_SIGNAL" if [ "$?" = "0" ] then set_status "PAUZED" log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS." sleep $PAUSE_DELAY check_for_interrupt else set_status "RUNNING" fi } cleanup () { log DEBUG "$FUNCNAME - Cleaning up all temp files and processes." if [ -e "$FIFO" ] then rm "$FIFO" fi if [ -e "$GLOBAL_COUNTER_FILE" ] then rm "$GLOBAL_COUNTER_FILE" fi if [ -e "$GLOBAL_LOCK" ] then rm -rf "$GLOBAL_LOCK" fi if [ -e "$SSH_SOCKET" ] then rm -rf "$SSH_SOCKET" fi } add_var_to_config () { if [ "$MODE" == "config" ] then VAR="$1" VALUE="$2" echo -e "$VAR=$VALUE" >> $CONFIG fi } is_var_empty () { if [ -z "$1" ] then showusage_normal cleanup exit 1 fi } process_arguments () { # # Process any command-line options that are specified." # if [ "$#" = "0" ] then showusage_short exit 1 fi while [ $# -gt 0 ] do case $1 in --config|-C ) CONFIG="$2" is_var_empty "$CONFIG" if [ "$MODE" == "config" ] then if [ -e "$CONFIG" ] then echo "Do want to overwrite existing config file? [y/n]" read yn if [ "$yn" == "y" ] || [ "$yn" == "yes" ] then rm "$CONFIG" else echo "Aborting..." cleanup exit fi fi fi if [ ! "$MODE" == "config" ] then source $CONFIG fi if [ ! -z "$SSH_KEY" ] then SSH_KEY="-i $SSH_KEY" fi if [ ! 
-e "./known_hosts" ] then if [ -e $SSH_KNOWN_HOSTS ] then if [ ! "$SSH_KNOWN_HOSTS" == "known_hosts" ] then cat $SSH_KNOWN_HOSTS > ./known_hosts fi else echo "File $SSH_KNOWN_HOSTS does not exist." exit fi fi shift 2 ;; --working-dir|-w ) PPSS_DIR="$2" add_var_to_config PPSS_DIR "$PPSS_DIR" shift 2 ;; --node|-n ) NODES_FILE="$2" add_var_to_config NODES_FILE "$NODES_FILE" shift 2 ;; --sourcefile|-f ) INPUT_FILE="$2" is_var_empty "$INPUT_FILE" add_var_to_config INPUT_FILE "$INPUT_FILE" shift 2 ;; --sourcedir|-d ) SRC_DIR="$2" is_var_empty "$SRC_DIR" add_var_to_config SRC_DIR "$SRC_DIR" shift 2 ;; --delay|-D) MAX_DELAY="$2" add_var_to_config MAX_DELAY "$MAX_DELAY" shift 2 ;; --daemon) DAEMON="1" QUIET="1" add_var_to_config DAEMON "$DAEMON" add_var_to_config QUIET "$QUIET" shift 1 ;; --awskeypair|-P) AWS_KEYPAIR="$2" add_var_to_config AWS_KEYPAIR "$AWS_KEYPAIR" shift 2 ;; --AMI|-A) AMI_ID="$2" add_var_to_config AMI_ID "$AMI_ID" shift 2 ;; --type|-T) INSTANCE_TYPE="$2" add_var_to_config INSTANCE_TYPE "$INSTANCE_TYPE" shift 2 ;; --security|-G) SECURITY_GROUP="$2" add_var_to_config SECURITY_GROUP "$SECURITY_GROUP" shift 2 ;; --instances|-I) NUM_NODES="$2" add_var_to_config NUM_NODES "$NUM_NODES" shift 2 ;; --command|-c ) COMMAND="$2" is_var_empty "$COMMAND" if [ "$MODE" == "config" ] then COMMAND=\'$COMMAND\' add_var_to_config COMMAND "$COMMAND" fi shift 2 ;; -h ) showusage_normal exit 1;; --help) showusage_long exit 1;; --homedir|-H ) if [ ! -z "$2" ] then PPSS_HOME_DIR="$2" add_var_to_config PPSS_DIR $PPSS_HOME_DIR shift 2 fi ;; --disable-ht|-j ) HYPERTHREADING=no add_var_to_config HYPERTHREADING $HYPERTHREADING shift 1 ;; --log|-l ) LOGFILE="$2" add_var_to_config LOGFILE "$LOGFILE" shift 2 ;; --no-recursion|-r ) RECURSION="0" add_var_to_config LOGFILE "$RECURSION" shift 1 ;; --workingdir|-w ) WORKINGDIR="$2" add_var_to_config WORKINGDIR "$WORKINGDIR" shift 2 ;; --key|-k ) SSH_KEY="$2" is_var_empty "$SSH_KEY" add_var_to_config SSH_KEY "$SSH_KEY" if [ ! 
-z "$SSH_KEY" ] then SSH_KEY="-i $SSH_KEY" fi shift 2 ;; --known-hosts | -K ) SSH_KNOWN_HOSTS="$2" add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS" shift 2 ;; --no-scp |-b ) SECURE_COPY=0 add_var_to_config SECURE_COPY "$SECURE_COPY" shift 1 ;; --outputdir|-o ) REMOTE_OUTPUT_DIR="$2" add_var_to_config REMOTE_OUTPUT_DIR "$REMOTE_OUTPUT_DIR" shift 2 ;; --processes|-p ) TMP="$2" if [ ! -z "$TMP" ] then MAX_NO_OF_RUNNING_JOBS="$TMP" add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS" shift 2 fi ;; --master|-m ) SSH_SERVER="$2" add_var_to_config SSH_SERVER "$SSH_SERVER" shift 2 ;; --script|-S ) SCRIPT="$2" add_var_to_config SCRIPT "$SCRIPT" shift 2 ;; --download) DOWNLOAD_TO_NODE="1" add_var_to_config DOWNLOAD_TO_NODE "$DOWNLOAD_TO_NODE" shift 1 ;; --upload) if [ -z "$REMOTE_OUTPUT_DIR" ] then echo "ERROR: no server-side output directory specified with -o" exit 1 fi UPLOAD_TO_SERVER="1" add_var_to_config UPLOAD_TO_SERVER "$UPLOAD_TO_SERVER" shift 1 ;; --quiet|-q ) QUIET="1" add_var_to_config QUIET "$QUIET" shift 1 ;; --user|-u ) USER="$2" add_var_to_config USER "$USER" shift 2 ;; --version|-v ) echo "" echo "$SCRIPT_NAME version $SCRIPT_VERSION" echo "" exit 0 ;; * ) showusage_short echo echo "Unknown option $1 " echo exit 1;; esac done if [ -z "$SRC_DIR" ] && [ -z "$INPUT_FILE" ] then showusage_short echo log ERROR "No source file or directory specified with -f or -d." cleanup exit 1 fi if [ "$DAEMON" == "1" ] && [ -z "$SRC_DIR" ] then showusage_short echo echo "Daemon mode requires an argument to the -d option as a place to put the lock dir." echo "Read the on-line manual for more information." exit 1 fi } display_header () { log DSPLY "" log DSPLY "=========================================================" log DSPLY " |P|P|S|S| " log DSPLY "$SCRIPT_NAME vers. 
$SCRIPT_VERSION" log DSPLY "=========================================================" log DSPLY "Hostname:\t\t$HOSTNAME" log DSPLY "---------------------------------------------------------" } create_working_directory () { if [ ! -e "$PPSS_DIR" ] then mkdir -p "$PPSS_DIR" fi } expand_str () { STR=$1 LENGTH=$TYPE_LENGTH SPACE=" " while [ "${#STR}" -lt "$LENGTH" ] do STR=$STR$SPACE done echo "$STR" } are_we_sourced () { if [ ! "$SOURCED" == "./ppss" ] then return 0 else return 1 fi } get_time_in_seconds () { if [ "$ARCH" == "SunOS" ] then # # Dirty hack because this ancient operating system does not support +%s... # THE_TIME=`truss /usr/bin/date 2>&1 | grep ^time | awk '{ print $3 }'` else THE_TIME="$(date +%s)" fi echo "$THE_TIME" } set_md5 () { case $ARCH in "Darwin") MD5=md5 ;; "FreeBSD") MD5=md5 ;; "SunOS") MD5="digest -a md5" ;; "Linux") MD5=md5sum ;; esac echo "test" | $MD5 > /dev/null 2>&1 if [ ! "$?" ] then LOG ERROR "ERROR - PPSS requires $MD5. It may not be within the path or installed." return 1 else return 0 fi } log () { # # Type 'DSPLY ERROR and WARN' is logged to the screen # Any other log-type is only logged to the logfile. # TYPE="$1" MESG="$2" TYPE_LENGTH=5 # # Performance hack. Don't go through all the code if not required. # if [ "$TYPE" = "DEBUG" ] && [ "$PPSS_DEBUG" == "0" ] then return fi TYPE_EXP=`expand_str "$TYPE"` DATE=`date +%b\ %d\ %H:%M:%S` PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}" PREFIX_SMALL="$DATE: " LOG_MSG="$PREFIX $MESG" ECHO_MSG="$PREFIX_SMALL $MESG" if [ ! -z "$PPSS_DEBUG" ] && [ ! 
"$PPSS_DEBUG" == "0" ] then echo -e "$LOG_MSG" >> "$LOGFILE" elif [ "$TYPE" == "INFO" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ] || [ "$TYPE" == "DSPLY" ] then echo -e "$LOG_MSG" >> "$LOGFILE" fi if [ "$TYPE" == "DSPLY" ] || [ "$TYPE" == "ERROR" ] || [ "$TYPE" == "WARN" ] && [ "$QUIET" == "0" ] then echo -e "$ECHO_MSG" elif [ "$TYPE" == "ERROR" ] && [ "$QUIET" == "1" ] then echo -e "$ECHO_MSG" fi if [ "$TYPE" == "PRCNT" ] then echo -en "\r$ECHO_MSG" fi } # Init all vars init_vars () { # # Get start time to measure how long PPSS has been running. # START_PPSS=`get_time_in_seconds` # # Check if MD5(SUM) is present on the system. # set_md5 # # Is PPSS run as a daemon? Then use input locking, which is not required otherwise. # if [ "$DAEMON" == "1" ] then INPUT_LOCK="$SRC_DIR/INPUT_LOCK" fi # # For some strange reason, this value differ on different operating systems due to # different behaviour betwen the ps utilily acros operating systems. # if [ "$ARCH" == "Darwin" ] then MIN_JOBS=4 elif [ "$ARCH" == "Linux" ] then MIN_JOBS=3 fi # # Create a remote homedir for PPSS # does_file_exist "$PPSS_HOME_DIR" if [ ! "$?" = "0" ] && [ ! -z "$SSH_SERVER" ] then log DEBUG "Remote PPSS home directory $PPSS_HOME_DIR does not exist. Creating." exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_DIR" fi echo 1 > $GLOBAL_COUNTER_FILE FIFO=/tmp/ppss-fifo-$RANDOM-$RANDOM if [ ! 
-e "$FIFO" ] then mkfifo -m 600 $FIFO fi exec 42<> $FIFO set_status "RUNNING" if [ -e "$CPUINFO" ] then CPU=`cat $CPUINFO | grep 'model name' | cut -d ":" -f 2 | sed -e s/^\ //g | sort | uniq` log DSPLY "CPU: $CPU" elif [ "$ARCH" == "Darwin" ] then MODEL=`system_profiler SPHardwareDataType | grep "Processor Name" | cut -d ":" -f 2` SPEED=`system_profiler SPHardwareDataType | grep "Processor Speed" | cut -d ":" -f 2` log DSPLY "CPU: $MODEL $SPEED" elif [ "$ARCH" == "SunOS" ] then CPU=`psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1` log DSPLY "$CPU" else log DSPLY "CPU: Cannot determine. Provide a patch for your arch!" log DSPLY "Arch is $ARCH" fi if [ -z "$MAX_NO_OF_RUNNING_JOBS" ] then get_no_of_cpus $HYPERTHREADING fi if [ ! -z "$SSH_SERVER" ] then does_file_exist "$PPSS_HOME_DIR/$JOB_LOG_DIR" if [ ! "$?" = "0" ] then log DEBUG "Remote Job log directory $PPSS_HOME_DIR/$JOB_lOG_DIR does not exist. Creating." exec_cmd "mkdir $PPSS_HOME_DIR/$JOB_LOG_DIR" fi fi if [ ! -e "$JOB_LOG_DIR" ] then mkdir -p "$JOB_LOG_DIR" fi if [ ! -z "$SSH_SERVER" ] then ITEM_LOCK_DIR="$PPSS_HOME_DIR/$ITEM_LOCK_DIR" fi does_file_exist "$ITEM_LOCK_DIR" if [ ! "$?" = "0" ] then if [ ! -z "$SSH_SERVER" ] then log DEBUG "Creating remote item lock dir." else log DEBUG "Creating local item lock dir." fi exec_cmd "mkdir $ITEM_LOCK_DIR" if [ ! "$?" ] then log DEBUG "Failed to create item lock dir." fi fi if [ ! -z "$SSH_SERVER" ] then does_file_exist "$REMOTE_OUTPUT_DIR" if [ ! "$?" = "0" ] then log DEBUG "Remote output dir $REMOTE_OUTPUT_DIR does not exist." exec_cmd "mkdir $REMOTE_OUTPUT_DIR" fi fi if [ ! -e "$PPSS_LOCAL_TMPDIR" ] then mkdir "$PPSS_LOCAL_TMPDIR" fi if [ ! 
-e "$PPSS_LOCAL_OUTPUT" ] then mkdir "$PPSS_LOCAL_OUTPUT" fi } get_status () { STATUS=`cat "$NODE_SATUS"` echo "$STATUS" } set_status () { STATUS="$1" echo "$HOSTNAME $STATUS" > "$NODE_STATUS" } check_status () { ERROR="$1" FUNCTION="$2" MESSAGE="$3" if [ ! "$ERROR" == "0" ] then log DSPLY "$FUNCTION - $MESSAGE" set_status ERROR cleanup exit 1 fi } erase_ppss () { echo "Are you realy sure you want to erase PPSS from all nodes!? (YES/NO)" read YN if [ "$YN" == "yes" ] || [ "$YN" == "YES" ] then for NODE in `cat $NODES_FILE` do log DSPLY "Erasing PPSS homedir $PPSS_DIR from node $NODE." ssh -q $SSH_KEY $SSH_OPTS $USER@$NODE "rm -rf $PPSS_HOME_DIR" done else log DSPLY "Aborting.." fi } ec2_get_pending_nodes() { RES="$(ec2-describe-instances | grep 'INSTANCE' | awk '{print $4}'| grep pending)" echo "$RES" } ec2_launch_nodes() { ec2run $AMI_ID -n $NUM_NODES -t $INSTANCE_TYPE -k $AWS_KEYPAIR -g $SECURITY_GROUP # # Loop until all nodes are started # STARTING="`ec2_get_pending_nodes`" while [ ! -z "$STARTING" ] do sleep 10 STARTING="`ec2_get_pending_nodes`" log DSPLY "$STARTING" done # # Write all instances / nodes to the nodes file. # ec2-describe-instances | grep 'INSTANCE' | awk '{print $4}' | sed '/terminated/d' | sed '/pending/d' >> $NODES_FILE NO_OF_NODES="`wc -l $NODES_FILE | awk '{ print $1 }'`" log DSPLY "Number of nodes / instances: $NO_OF_NODES" } deploy () { NODE="$1" SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=socket-%h \ -o GlobalKnownHostsFile=./known_hosts \ -o ControlMaster=auto \ -o Cipher=blowfish \ -o ConnectTimeout=5 " ERROR=0 set_error () { if [ ! "$1" == "0" ] then ERROR=1 fi } ssh -q -o ConnectTimeout=5 $SSH_KEY $USER@$NODE exit 0 set_error "$?" if [ ! "$ERROR" == "0" ] then log ERROR "Cannot connect to node $NODE." return fi ssh -N -M $SSH_OPTS_NODE $SSH_KEY $USER@$NODE & SSH_PID=$! 
KEY=`echo $SSH_KEY | cut -d " " -f 2` ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "cd ~ && mkdir $PPSS_HOME_DIR >> /dev/null 2>&1" scp -q $SSH_OPTS_NODE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR set_error $? scp -q $SSH_OPTS_NODE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR set_error $? scp -q $SSH_OPTS_NODE $SSH_KEY $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR set_error $? scp -q $SSH_OPTS_NODE $SSH_KEY known_hosts $USER@$NODE:~/$PPSS_HOME_DIR set_error $? if [ ! -z "$SCRIPT" ] then scp -q $SSH_OPTS_NODE $SSH_KEY $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR set_error $? fi if [ ! -z "$INPUT_FILE" ] then scp -q $SSH_OPTS_NODE $SSH_KEY $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR set_error $? fi if [ "$ERROR" == "0" ] then log DSPLY "PPSS installed on node $NODE." else log DSPLY "PPSS failed to install on $NODE." fi kill $SSH_PID } deploy_ppss () { if [ -z "$NODES_FILE" ] then log ERROR "ERROR - are you using the right option? -C ?" set_status ERROR cleanup exit 1 fi KEY=`echo $SSH_KEY | cut -d " " -f 2` if [ -z "$KEY" ] || [ ! -e "$KEY" ] then log ERROR "Nodes require a key file." cleanup set_status "ERROR" exit 1 fi if [ ! -e "$SCRIPT" ] && [ ! -z "$SCRIPT" ] then log ERROR "Script $SCRIPT not found." set_status "ERROR" cleanup exit 1 fi INSTALLED_ON_SSH_SERVER=0 if [ ! -e "$NODES_FILE" ] then log ERROR "File $NODES with list of nodes does not exist." set_status ERROR cleanup exit 1 else if [ "$EC2" == "1" ] then ec2_launch_nodes fi for NODE in `cat $NODES_FILE` do deploy "$NODE" & if [ "$ARCH" == "SunOS" ] then sleep 1 else sleep 0.1 fi if [ "$NODE" == "$SSH_SERVER" ] then log DEBUG "SSH SERVER $SSH_SERVER is also a node." INSTALLED_ON_SSH_SERVER=1 exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR" exec_cmd "mkdir -p $PPSS_HOME_DIR/$ITEM_LOCK_DIR" fi done if [ "$INSTALLED_ON_SSH_SERVER" == "0" ] then log DEBUG "SSH SERVER $SSH_SERVER is not a node." 
deploy "$SSH_SERVER" exec_cmd "mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR" exec_cmd "mkdir -p $PPSS_HOME_DIR/$ITEM_LOCK_DIR" fi fi } start_ppss_on_node () { NODE="$1" log DSPLY "Starting PPSS on node $NODE." ssh $SSH_KEY $USER@$NODE -o ConnectTimeout=5 "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 node --config ~/$PPSS_HOME_DIR/$CONFIG" } test_server () { # Testing if the remote server works as expected. if [ ! -z "$SSH_SERVER" ] then exec_cmd "date >> /dev/null" check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached" ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER & SSH_MASTER_PID="$!" log DEBUG "SSH Master pid is $SSH_MASTER_PID" log DSPLY "Connected to server: $SSH_SERVER" else log DEBUG "No remote server specified, assuming stand-alone mode." fi } get_no_of_cpus () { # Use hyperthreading or not? HPT=$1 NUMBER="" if [ -z "$HPT" ] then HPT=yes fi got_cpu_info () { ERROR="$1" check_status "$ERROR" "$FUNCNAME" "cannot determine number of cpu cores. Specify with -p." } if [ "$HPT" == "yes" ] then if [ "$ARCH" == "Linux" ] then NUMBER=`grep -c ^processor $CPUINFO` got_cpu_info "$?" elif [ "$ARCH" == "Darwin" ] then NUMBER=`sysctl -a hw | grep -w logicalcpu | awk '{ print $2 }'` got_cpu_info "$?" elif [ "$ARCH" == "FreeBSD" ] then NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'` got_cpu_info "$?" elif [ "$ARCH" == "SunOS" ] then NUMBER=`psrinfo | grep -c on-line` got_cpu_info "$?" else if [ -e "$CPUINFO" ] then NUMBER=`grep -c ^processor $CPUINFO` got_cpu_info "$?" fi fi if [ ! -z "$NUMBER" ] then log DSPLY "Found $NUMBER logic processors." fi elif [ "$HPT" == "no" ] then log DSPLY "Hyperthreading is disabled." if [ "$ARCH" == "Linux" ] then PHYSICAL=`grep 'physical id' $CPUINFO` if [ "$?" ] then PHYSICAL=`grep 'physical id' $CPUINFO | sort | uniq | wc -l` if [ "$PHYSICAL" == "1" ] then log DSPLY "Found $PHYSICAL physical CPU." else log DSPLY "Found $PHYSICAL physical CPUs." fi TMP=`grep 'core id' $CPUINFO` if [ "$?" 
] then log DEBUG "Starting job only for each physical core on all physical CPU(s)." NUMBER=`grep 'core id' $CPUINFO | sort | uniq | wc -l` log DSPLY "Found $NUMBER physical cores." else log DSPLY "Single core processor(s) detected." log DSPLY "Starting job for each physical CPU." NUMBER=$PHYSICAL fi else log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus." NUMBER=`grep -c ^processor $CPUINFO` got_cpu_info "$?" fi elif [ "$ARCH" == "Darwin" ] then NUMBER=`sysctl -a hw | grep -w physicalcpu | awk '{ print $2 }'` got_cpu_info "$?" elif [ "$ARCH" == "FreeBSD" ] then NUMBER=`sysctl hw.ncpu | awk '{ print $2 }'` got_cpu_info "$?" else NUMBER=`cat $CPUINFO | grep "cpu cores" | cut -d ":" -f 2 | uniq | sed -e s/\ //g` got_cpu_info "$?" fi fi if [ ! -z "$NUMBER" ] then MAX_NO_OF_RUNNING_JOBS=$NUMBER else log ERROR "Number of CPUs not obtained." log ERROR "Please specify manually with -p." set_status "ERROR" exit 1 fi } random_delay () { ARGS="$1" if [ -z "$ARGS" ] then log ERROR "$FUNCNAME Function random delay, no argument specified." set_status ERROR exit 1 fi NUMBER=$RANDOM let "NUMBER %= $ARGS" if [ "$ARCH" == "SunOS" ] then sleep "$NUMBER" else sleep "0.$NUMBER" fi } global_lock () { mkdir $GLOBAL_LOCK > /dev/null 2>&1 ERROR="$?" if [ ! "$ERROR" == "0" ] then return 1 else return 0 fi } get_global_lock () { while true do global_lock ERROR="$?" if [ ! "$ERROR" == "0" ] then #random_delay $MAX_LOCK_DELAY continue else break fi done } release_global_lock () { rm -rf "$GLOBAL_LOCK" } are_jobs_running () { NUMBER_OF_PROCS=`jobs | wc -l` if [ "$NUMBER_OF_PROCS" -gt "1" ] then return 0 else return 1 fi } escape_item () { TMP="$1" ITEM_ESCAPED=`echo "$TMP" | \ sed s/\\ /\\\\\\\\\\\\\\ /g | \ sed s/\\'/\\\\\\\\\\\\\\'/g | \ sed s/\\|/\\\\\\\\\\\\\\|/g | \ sed s/\&/\\\\\\\\\\\\\\&/g | \ sed s/\;/\\\\\\\\\\\\\\;/g | \ sed s/\>/\\\\\\\\\\>/g | \ sed s/\> /dev/null 2>&1" return "$?" 
}

#
# get_input_lock: block until the input lock directory $INPUT_LOCK can be
# created through exec_cmd. mkdir fails while the directory already exists,
# so the loop polls every 5 seconds until the current holder removes it.
# NOTE(review): assumes exec_cmd returns the exit status of the command it
# runs (does_file_exist relies on the same behaviour).
#
get_input_lock () {
    while true
    do
        # Test the command's status directly; the old `[ "$?" ]` tested a
        # non-empty string and was therefore always true.
        if exec_cmd "mkdir $INPUT_LOCK >> /dev/null 2>&1 "
        then
            log DEBUG "Input lock is obtained..."
            break
        else
            log DEBUG "Input lock is present...sleeping.."
            sleep 5
        fi
    done
}

#
# release_input_lock: remove the input lock directory via exec_cmd.
# Returns 0 when the removal command succeeds, 1 (and logs an error) otherwise.
#
release_input_lock () {
    if exec_cmd "rm -rf $INPUT_LOCK"
    then
        log DEBUG "Input lock was released..."
        return 0
    else
        log ERROR "Input lock was already gone...this should never happen..."
        return 1
    fi
}

#
# get_all_items: build $LOCAL_INPUT_FILE, one item per line, from one of:
#   - a (remote when $SSH_SERVER is set, else local) listing of $SRC_DIR,
#     recursive when RECURSION=1, top-level non-directories only otherwise;
#   - a copy of $INPUT_FILE;
#   - stdin, when INPUT_FILE is "-".
# In daemon mode the work is done while holding the input lock.
# Sets SIZE_OF_INPUT; logs, cleans up and exits 1 when no items were found.
#
get_all_items () {
    if [ "$DAEMON" == "1" ]
    then
        get_input_lock
    fi

    count=0

    if [ -z "$INPUT_FILE" ]
    then
        if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a node?
        then
            if [ "$RECURSION" == "1" ]
            then
                exec_cmd "find $SRC_DIR/ ! -type d" > "$LOCAL_INPUT_FILE"
                check_status "$?" "$FUNCNAME" "Could not list files within remote source directory."
            else
                log DEBUG "Recursion is disabled."
                # Without recursion, commando expects bare file names
                # (it prefixes $SRC_DIR itself), so strip the directory part.
                # The old `find ... -d 1` was rejected by both GNU and BSD find.
                exec_cmd "find $SRC_DIR/ -maxdepth 1 ! -type d" | sed 's|.*/||' > "$LOCAL_INPUT_FILE"
                check_status "${PIPESTATUS[0]}" "$FUNCNAME" "Could not list files within remote source directory."
            fi
        else
            if [ -e "$SRC_DIR" ]
            then
                if [ "$RECURSION" == "1" ]
                then
                    log DEBUG "Recursion is enabled."
                    find "$SRC_DIR/" ! -type d >> "$LOCAL_INPUT_FILE"
                    check_status "$?" "$FUNCNAME" "Could not list files within local source directory."
                else
                    log DEBUG "Recursion is disabled."
                    find "$SRC_DIR/" -maxdepth 1 ! -type d | sed 's|.*/||' >> "$LOCAL_INPUT_FILE"
                    check_status "${PIPESTATUS[0]}" "$FUNCNAME" "Could not list files within local source directory."
                fi

                if [ ! -e "$LOCAL_INPUT_FILE" ]
                then
                    log ERROR "Local input file is not created, something is wrong. Bug?"
                    set_status "ERROR"
                    cleanup
                    exit 1
                fi
            else
                ITEMS=""
            fi
        fi
    else
        if [ ! -z "$SSH_SERVER" ] # Are we running stand-alone or as a slave?
        then
            log DEBUG "Running as node, input file has been pushed (hopefully)."
        fi

        if [ ! -e "$INPUT_FILE" ] && [ ! "$INPUT_FILE" == "-" ]
        then
            log ERROR "Input file $INPUT_FILE does not exist."
            set_status "ERROR"
            cleanup
            exit 1
        fi

        if [ ! "$INPUT_FILE" == "-" ]
        then
            cp "$INPUT_FILE" "$LOCAL_INPUT_FILE"
            check_status "$?" "$FUNCNAME" "Copy of input file failed!"
        else
            log DEBUG "Reading from stdin.."
            # IFS= / -r keep whitespace and backslashes in items intact; the
            # extra test keeps a final line that has no trailing newline.
            while IFS= read -r LINE || [ -n "$LINE" ]
            do
                printf '%s\n' "$LINE" >> "$LOCAL_INPUT_FILE"
            done
        fi
    fi

    if [ "$DAEMON" == "1" ]
    then
        release_input_lock
    fi

    # A missing input file (e.g. SRC_DIR does not exist) counts as empty;
    # previously wc failed, SIZE_OF_INPUT was empty and the test errored out.
    SIZE_OF_INPUT=0
    if [ -e "$LOCAL_INPUT_FILE" ]
    then
        SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }')
    fi

    if [ -z "$SIZE_OF_INPUT" ] || [ "$SIZE_OF_INPUT" -le "0" ]
    then
        log ERROR "Source file/dir seems to be empty."
        set_status STOPPED
        cleanup
        exit 1
    fi
}

#
# get_item: select the next unprocessed item from $LOCAL_INPUT_FILE.
# Under the global lock, reads the 1-based line index from
# $GLOBAL_COUNTER_FILE, takes that line as ITEM and stores index+1 back.
# Empty lines and items whose lock_item call fails are skipped by recursing.
# On success sets ITEM, calls download_item and returns 0; returns 1 when
# stopping, the list is empty, or all items have been handed out.
#
get_item () {
    local lock_status

    check_for_interrupt

    if [ "$STOP" == "1" ]
    then
        return 1
    fi

    get_global_lock

    SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }')

    #
    # Return error if the list is empty (or the input file is missing).
    #
    if [ -z "$SIZE_OF_INPUT" ] || [ "$SIZE_OF_INPUT" -le "0" ]
    then
        release_global_lock
        return 1
    fi

    #
    # This variable is used to walk through all input file items.
    #
    GLOBAL_COUNTER=$(cat "$GLOBAL_COUNTER_FILE")

    #
    # Check if all items have been processed.
    #
    if [ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]
    then
        release_global_lock
        return 1
    fi

    ITEM="$(sed -n "${GLOBAL_COUNTER}p" "$LOCAL_INPUT_FILE")"

    if [ -z "$ITEM" ]
    then
        ((GLOBAL_COUNTER++))
        log DEBUG "Item was emtpy..."
        echo "$GLOBAL_COUNTER" > "$GLOBAL_COUNTER_FILE"
        release_global_lock
        get_item
    else
        ((GLOBAL_COUNTER++))
        echo "$GLOBAL_COUNTER" > "$GLOBAL_COUNTER_FILE"

        # Capture lock_item's status explicitly: the old `[ ! "$?" ]` was
        # always false, so items locked by another node were never skipped.
        lock_status=0
        if [ "$DISABLE_ITEM_LOCK" == "0" ]
        then
            lock_item "$ITEM"
            lock_status=$?
        else
            log DEBUG "Item lock disabled."
        fi

        if [ ! "$lock_status" == "0" ]
        then
            log DEBUG "Item $ITEM is locked."
            release_global_lock
            #
            # Recursion, get_item calls itself, until all items are done.
            #
            get_item
        else
            log DEBUG "Got lock on $ITEM, processing."
            release_global_lock
            download_item "$ITEM"
            return 0
        fi
    fi
}

#
# start_single_worker: fetch the next item and write it to $FIFO, which makes
# the listener run 'commando' on it. When no item is available, $STOP_KEY is
# written instead so the listener can count finished workers.
# Returns 0 when an item was queued, 1 otherwise.
#
start_single_worker () {
    get_item
    ERROR=$?
    if [ ! "$ERROR" == "0" ]
    then
        #
        # If no more items are available, the listener should be
        # informed that a worker just finished / died.
        # This allows the listener to determine if all processes
        # are finished and it is time to stop.
        #
        echo "$STOP_KEY" > "$FIFO"
        return 1
    else
        get_global_lock
        echo "$ITEM" > "$FIFO"
        release_global_lock
        return 0
    fi
}

#
# stop-ppss: record the stop time and log the total run time.
#
stop-ppss () {
    STOP_PPSS=`get_time_in_seconds`
    elapsed "$START_PPSS" "$STOP_PPSS"
}

#
# elapsed: log the difference between two second counts ($1 = before,
# $2 = after) as hh:mm:ss.
#
elapsed () {
    BEFORE="$1"
    AFTER="$2"
    ELAPSED="$(expr $AFTER - $BEFORE)"
    REMAINDER="$(expr $ELAPSED % 3600)"
    HOURS="$(expr $(expr $ELAPSED - $REMAINDER) / 3600)"
    SECS="$(expr $REMAINDER % 60)"
    MINS="$(expr $(expr $REMAINDER - $SECS) / 60)"
    RES=$(printf "Total processing time (hh:mm:ss): %02d:%02d:%02d" $HOURS $MINS $SECS)
    log DSPLY "$RES"
}

#
# commando: run $COMMAND on one item ($1), writing an item log file under
# $JOB_LOG_DIR, then queue the next item via start_single_worker.
#
# This function starts a chain reaction: commando runs the command, then
# start_single_worker selects a new item and sends it to the fifo; the
# listener receives it and runs commando again. Each of the
# MAX_NO_OF_RUNNING_JOBS initial workers thus forms its own queue, which is
# how parallel processing is achieved.
#
# If $COMMAND contains the literal text $ITEM it is eval'ed as-is (so $ITEM
# expands inside it); otherwise the item is appended to the command.
#
commando () {
    ERR_STATE=0
    VIRTUAL=0

    #
    # Test if the item exists (is physical or virtual).
    # Example: a file is physical, a URL is virtual.
    #
    ITEM="$1"
    if [ "$RECURSION" == "1" ]
    then
        escape_item "$ITEM"
        does_file_exist "$ITEM_ESCAPED"
        ERR_STATE="$?"
    else
        escape_item "$ITEM"
        does_file_exist "$SRC_DIR/$ITEM_ESCAPED"
        ERR_STATE="$?"
    fi

    #
    # If recursion is used, a file name of an item may not be unique.
    # The same filename can be used for files in different directories.
    # Therefore, the output directory must reflect the original directory
    # structure. If recursion is not used, this is not necessary.
    #
    if [ "$ERR_STATE" == "0" ]
    then
        VIRTUAL="0"
        if [ "$RECURSION" == "1" ]
        then
            DIR_NAME=`dirname "$ITEM"`
            ITEM_NO_PATH=`basename "$ITEM"`
            OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$DIR_NAME"
        else
            DIR_NAME="$SRC_DIR"
            ITEM_NO_PATH="$ITEM"
            OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
        fi
    else
        VIRTUAL="1"
        DIR_NAME=""
        ITEM_NO_PATH="$ITEM"
        escape_item "$ITEM_NO_PATH"
        OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED"
    fi

    OUTPUT_FILE="$ITEM_NO_PATH"

    #
    # The following lines should only be enabled for debugging.
    #
    #log DEBUG "Processing item: $ITEM"
    #log DEBUG "ITEM_NO_PATH is $ITEM_NO_PATH"
    #log DEBUG "Dirname is $DIR_NAME"
    #log DEBUG "OUTPUT DIR IS $OUTPUT_DIR"
    #log DEBUG "Virtual is $VIRTUAL"
    #log DEBUG "OUTPUT FILE is $OUTPUT_FILE"

    #
    # Decide if an item must be transferred from server to the node,
    # or be processed in-place (NFS / SMB mount?).
    #
    if [ "$DOWNLOAD_TO_NODE" == "0" ]
    then
        if [ "$VIRTUAL" == "1" ]
        then
            log DEBUG "Item is virtual, thus not downloading."
        else
            log DEBUG "Using item straight from the server, no copy."
            if [ "$RECURSION" == "0" ]
            then
                ITEM="$SRC_DIR/$ITEM"
            else
                ITEM="$ITEM"
            fi
        fi
    else
        if [ "$RECURSION" == "1" ]
        then
            ITEM="$PPSS_LOCAL_TMPDIR/$ITEM"
        else
            ITEM="$PPSS_LOCAL_TMPDIR/$ITEM_NO_PATH"
        fi
    fi

    #
    # The item log file is named after the $MD5 hash of the item path.
    #
    LOG_FILE_NAME=`echo "$ITEM" | $MD5 | awk '{ print $1 }'`
    ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME"

    #
    # Determine the output directory that will contain the output of the command.
    # Example: When converting wav to mp3, the mp3 will be put in this directory.
    #
    if [ "$VIRTUAL" == "0" ]
    then
        if [ "$RECURSION" == "1" ]
        then
            OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$DIR_NAME"/"$ITEM_NO_PATH"
        else
            OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_NO_PATH"
        fi
    else
        #
        # A virtual item may contain special characters, so the shared
        # local output directory is used instead of a per-item one.
        #
        OUTPUT_DIR="$PPSS_LOCAL_OUTPUT"
    fi
    log DEBUG "Local output dir is $OUTPUT_DIR"

    #
    # FIXME!
    #
    if [ "$PPSS_OUTPUT" == "1" ]
    then
        mkdir -p "$OUTPUT_DIR"
    fi

    ERROR=""

    #
    # Some formatting of item log files.
    #
    DATE=`date +%b\ %d\ %H:%M:%S`
    echo "===== PPSS Item Log File =====" > "$ITEM_LOG_FILE"
    echo -e "Host:\t\t$HOSTNAME" >> "$ITEM_LOG_FILE"
    echo -e "Process:\t$PID" >> "$ITEM_LOG_FILE"
    echo -e "Item:\t\t$ITEM" >> "$ITEM_LOG_FILE"
    echo -e "Start date:\t$DATE" >> "$ITEM_LOG_FILE"
    echo -e "" >> "$ITEM_LOG_FILE"

    #
    # The actual execution of the command as specified by the -c option.
    # The old `TMP=...; if [ "$?" ]` test was always true, so the item was
    # never appended to commands that do not reference $ITEM themselves.
    #
    BEFORE=`get_time_in_seconds`
    if printf '%s\n' "$COMMAND" | grep -q -i -F -- '$ITEM'
    then
        eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1
        ERROR="$?"
        MYPID="$!"
    else
        eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1'
        ERROR="$?"
        MYPID="$!"
    fi
    AFTER=`get_time_in_seconds`
    echo -e "" >> "$ITEM_LOG_FILE"

    # Some error logging. Success or fail.
    if [ ! "$ERROR" == "0" ]
    then
        echo -e "Status:\t\tFAILURE" >> "$ITEM_LOG_FILE"
    else
        echo -e "Status:\t\tSUCCESS" >> "$ITEM_LOG_FILE"
    fi

    #
    # If part of a cluster, remove the downloaded item after
    # it has been processed and uploaded as not to fill up disk space.
    #
    if [ "$DOWNLOAD_TO_NODE" == "1" ]
    then
        if [ -e "$ITEM" ]
        then
            rm -rf "$ITEM"
        else
            log DEBUG "There is no local file to remove.. strange..."
        fi
    fi

    #
    # Create remote output dir and transfer output to server.
    #
    escape_item "$DIR_NAME"
    ITEM_OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_ESCAPED"

    if [ "$DOWNLOAD_TO_NODE" == "0" ]
    then
        log DEBUG "Download to node is disabled."
    else
        if [ "$DIR_NAME" == "." ]
        then
            DIR_NAME=""
        fi
    fi

    #
    # Upload the output file back to the server.
    #
    upload_item "$OUTPUT_DIR" "$DIR_NAME"

    #
    # Append the run time, then upload the log file to the server.
    #
    elapsed "$BEFORE" "$AFTER" >> "$ITEM_LOG_FILE"
    echo -e "" >> "$ITEM_LOG_FILE"

    if [ ! -z "$SSH_SERVER" ]
    then
        log DEBUG "Uploading item log file $ITEM_LOG_FILE to master $PPSS_HOME_DIR/$JOB_LOG_DIR"
        # $SSH_OPTS / $SSH_KEY are intentionally unquoted: they hold
        # several whitespace-separated options.
        if ! scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:$PPSS_HOME_DIR/$JOB_LOG_DIR
        then
            log DEBUG "Uploading of item log file failed."
        fi
    fi

    start_single_worker
    return $?
}

#
# This is the listener service. It listens on the pipe (fd 42) for events.
# A job is executed for every event received.
# This listener enables fully asynchronous processing.
#
# Events: $STOP_KEY (a worker found no more items), $KILL_KEY (kill all
# related processes and stop), anything else is an item to run commando on.
#
listen_for_job () {
    FINISHED=0
    DIED=0
    PIDS=""
    log DEBUG "Listener started."
    # IFS= / -r: deliver item names with backslashes and surrounding
    # whitespace unchanged.
    while IFS= read -r event <& 42
    do
        if [ "$event" == "$STOP_KEY" ]
        then
            #
            # The start_single_worker method sends a special signal to
            # inform the listener that a worker is finished.
            # If all workers are finished, it is time to stop.
            # This mechanism makes PPSS asynchronous.
            #
            ((DIED++))
            if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ]
            then
                if [ "$DAEMON" == "1" ]
                then
                    #
                    # In daemon mode, start all over again.
                    #
                    DIED=0
                    get_all_items
                    log DEBUG "Found $SIZE_OF_INPUT items."
                    start_all_workers
                    sleep 10
                else
                    break
                fi
            else
                RES=$((MAX_NO_OF_RUNNING_JOBS-DIED))
                if [ "$RES" == "1" ] && [ "$QUIET" == "0" ]
                then
                    log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. "
                elif [ "$QUIET" == "0" ]
                then
                    if [ "$DIED" == "1" ]
                    then
                        echo -en "\n"
                    fi
                    log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining. "
                fi
            fi
        elif [ "$event" == "$KILL_KEY" ]
        then
            #
            # Kill all remaining processes, terminate ssh processes and the
            # listener itself.
            #
            # This kills every process whose ps line mentions $PID, except
            # the master ($PID itself) and processes whose parent is $PID
            # or 1.
            #
            PROCLIST=`ps a -o pid,pgid,ppid,command | grep '[0-9]' | grep $PID | grep -v -i grep`
            oldIFS=$IFS     # save the field separator
            IFS=$'\n'       # new field separator, the end of line
            for x in `echo "$PROCLIST"`
            do
                MYPPID=`echo $x | awk '{ print $3 }'`
                MYPID=`echo $x | awk '{ print $1 }'`
                if [ ! "$MYPPID" == "$PID" ] && [ ! "$MYPPID" == "1" ]
                then
                    if [ ! "$MYPID" == "$PID" ]
                    then
                        log DEBUG "Killing process $MYPID"
                        kill $MYPID >> /dev/null 2>&1
                    else
                        log DEBUG "Not killing master process..$MYPID.."
                    fi
                else
                    log DEBUG "Not killing listener process. $MYPID.."
                fi
            done
            IFS=$oldIFS
            break
        else
            commando "$event" &
            MYPID="$!"
            disown
            PIDS="$PIDS $MYPID"
        fi

        get_global_lock
        SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }')
        GLOBAL_COUNTER=$(cat "$GLOBAL_COUNTER_FILE")
        release_global_lock

        # Guard against an empty/zero size or counter (division by zero or
        # an arithmetic syntax error would abort the progress report).
        if [ -n "$SIZE_OF_INPUT" ] && [ "$SIZE_OF_INPUT" -gt "0" ]
        then
            PERCENT=$((100 * ${GLOBAL_COUNTER:-0} / SIZE_OF_INPUT))
        else
            PERCENT=0
        fi

        if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ]
        then
            if [ "$QUIET" == "0" ]
            then
                log PRCNT "Currently $PERCENT percent complete. Processed $GLOBAL_COUNTER of $SIZE_OF_INPUT items."
            elif [ "$DAEMON" == "0" ]
            then
                echo -en "\r$PERCENT%"
            fi

            if [ "$PERCENT" == "100" ]
            then
                if [ "$QUIET" == "1" ]
                then
                    echo
                fi
                FINISHED=1
            fi
        fi
    done

    if [ ! -z "$SSH_MASTER_PID" ]
    then
        log DEBUG "SSH master PID is $SSH_MASTER_PID"
        kill "$SSH_MASTER_PID"
    else
        log DEBUG "SSH master PID is empty."
    fi

    set_status STOPPED
    log DEBUG "Listener stopped."

    if [ ! "$PERCENT" == "100" ]
    then
        echo
        stop-ppss
        log DSPLY "Finished. Consult $JOB_LOG_DIR for job output."
        log DSPLY "Press ENTER to continue."
    else
        stop-ppss
        log DSPLY "Finished. Consult $JOB_LOG_DIR for job output."
    fi

    if [ "$QUIET" == "1" ]
    then
        echo
    fi

    cleanup
}

#
# start_all_workers: start MAX_NO_OF_RUNNING_JOBS workers (one
# start_single_worker call each), with an optional random delay of up to
# MAX_DELAY between them.
#
start_all_workers () {
    if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ]
    then
        log DSPLY "Starting one (1) single worker."
    else
        log DSPLY "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers."
    fi

    if [ "$DAEMON" == "0" ]
    then
        log DSPLY "---------------------------------------------------------"
    fi

    i=0
    while [ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ]
    do
        start_single_worker
        ((i++))

        if [ ! "$MAX_DELAY" == "0" ]
        then
            random_delay "$MAX_DELAY"
        fi
    done
}

#
# get_status_of_node: print the contents of the node status file on node $1
# (fetched over ssh), or "UNKNOWN" when ssh/cat fails.
#
get_status_of_node () {
    NODE="$1"
    STATUS=`ssh -o ConnectTimeout=10 $SSH_KEY $USER@$NODE cat "$PPSS_HOME_DIR/$NODE_STATUS" 2>/dev/null`
    ERROR="$?"
    if [ ! "$ERROR" == "0" ]
    then
        STATUS="UNKNOWN"
    fi
    echo "$STATUS"
}

#
# show_status: print overall progress plus a per-node table (address,
# hostname, processed count, status) for every node in $NODES_FILE.
#
show_status () {
    source $CONFIG
    if [ ! -z "$SSH_KEY" ]
    then
        SSH_KEY="-i $SSH_KEY"
    fi

    if [ -z "$INPUT_FILE" ]
    then
        if [ "$RECURSION" == "1" ]
        then
            ITEMS=`exec_cmd "find $SRC_DIR ! -type d 2>/dev/null | wc -l" 1`
        else
            ITEMS=`exec_cmd "ls -1 $SRC_DIR 2>/dev/null | wc -l" 1`
        fi
    else
        ITEMS=`exec_cmd "cat $PPSS_HOME_DIR/$LOCAL_INPUT_FILE 2>/dev/null | wc -l" 1`
    fi

    ITEMS=`echo $ITEMS | sed s/\ //g`

    if [ ! -z "$ITEMS" ] && [ ! "$ITEMS" == "0" ]
    then
        PROCESSED=`exec_cmd "ls -1 $PPSS_HOME_DIR/$ITEM_LOCK_DIR 2>/dev/null | wc -l" 1`
        # An empty count would make the arithmetic below a syntax error.
        TMP_STATUS=$((100 * ${PROCESSED:-0} / ITEMS))
        log DSPLY "Status:\t\t$TMP_STATUS percent complete."
    else
        log DSPLY "Status: UNKNOWN - is PPSS deployed on nodes?"
    fi

    if [ ! -z "$NODES_FILE" ]
    then
        TMP_NO=`cat "$NODES_FILE" | wc -l`
        log DSPLY "Nodes:\t $TMP_NO"
    fi
    log DSPLY "Items:\t\t$ITEMS"

    log DSPLY "---------------------------------------------------------"
    HEADER=`echo IP-address Hostname Processed Status | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'`
    log DSPLY "$HEADER"
    log DSPLY "---------------------------------------------------------"

    PROCESSED=0
    for x in `cat $NODES_FILE`
    do
        RES=0
        NODE=`get_status_of_node "$x" | awk '{ print $1 }'`
        if [ ! "$NODE" == "UNKNOWN" ]
        then
            STATUS=`get_status_of_node "$x" | awk '{ print $2 }'`
            RES=`exec_cmd "grep -i $NODE ~/$PPSS_HOME_DIR/$JOB_LOG_DIR/* 2>/dev/null | wc -l " 1`
            # $? here is the status of the command substitution above; the
            # old `[ ! "$?" ]` was always false.
            if [ ! "$?" == "0" ] || [ -z "$RES" ]
            then
                RES=0
            fi
        else
            STATUS="UNKNOWN"
            RES=0
        fi
        let PROCESSED="$PROCESSED+$RES"
        LINE=`echo "$x $NODE $RES $STATUS" | awk '{ printf ("%-16s %-18s % 10s %10s\n",$1,$2,$3,$4) }'`
        log DSPLY "$LINE"
    done
    log DSPLY "---------------------------------------------------------"
    LINE=`echo $PROCESSED | awk '{ printf ("Total processed: % 29s\n",$1) }'`
    log DSPLY "$LINE"
}

#
# main: dispatch on $MODE (see MODES). 'node' and the default mode start the
# listener in the background and then the workers; the other modes perform a
# one-shot management action and exit.
#
main () {
    case $MODE in
        node )
            create_working_directory
            test_server
            init_vars
            get_all_items
            listen_for_job "$MAX_NO_OF_RUNNING_JOBS" &
            LISTENER_PID=$!
            start_all_workers
            ;;
        start )
            # This option only starts all nodes.
            LOGFILE=/dev/null
            display_header
            if [ ! -e "$NODES_FILE" ]
            then
                log ERROR "File $NODES_FILE with list of nodes does not exist."
                set_status STOPPED
                cleanup
                exit 1
            else
                for NODE in `cat $NODES_FILE`
                do
                    start_ppss_on_node "$NODE"
                done
            fi
            cleanup
            exit 0
            ;;
        config )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Generating configuration file $CONFIG"
            add_var_to_config PPSS_LOCAL_TMPDIR "$PPSS_LOCAL_TMPDIR"
            add_var_to_config PPSS_LOCAL_OUTPUT "$PPSS_LOCAL_OUTPUT"
            cleanup
            exit 0
            ;;
        stop )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Stopping PPSS on all nodes."
            exec_cmd "touch $STOP_SIGNAL"
            cleanup
            exit
            ;;
        pause )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Pausing PPSS on all nodes."
            exec_cmd "touch $PAUSE_SIGNAL"
            cleanup
            exit
            ;;
        continue )
            LOGFILE=/dev/null
            display_header
            if does_file_exist "$STOP_SIGNAL"
            then
                log DSPLY "Continuing processing, please use $0 start to start PPSS on all nodes."
                exec_cmd "rm -f $STOP_SIGNAL"
            fi
            if does_file_exist "$PAUSE_SIGNAL"
            then
                log DSPLY "Continuing PPSS on all nodes."
                exec_cmd "rm -f $PAUSE_SIGNAL"
            fi
            cleanup
            exit 0
            ;;
        deploy )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Deploying PPSS on nodes."
            deploy_ppss
            wait
            cleanup
            exit 0
            ;;
        ec2 )
            EC2=1
            LOGFILE=/dev/null
            display_header
            log INFO "Deploying PPSS on EC2 nodes."
            ec2_launch_nodes
            deploy_ppss
            wait
            cleanup
            exit 0
            ;;
        status )
            LOGFILE=/dev/null
            display_header
            get_all_items
            show_status
            exit 0
            ;;
        erase )
            LOGFILE=/dev/null
            display_header
            log DSPLY "Erasing PPSS from all nodes."
            erase_ppss
            cleanup
            exit 0
            ;;
        kill )
            LOGFILE=/dev/null
            for x in `ps ux | grep ppss | grep -v grep | grep bash | awk '{ print $2 }'`
            do
                kill "$x"
            done
            cleanup
            exit 0
            ;;
        * )
            create_working_directory
            display_header
            init_vars
            get_all_items
            listen_for_job "$MAX_NO_OF_RUNNING_JOBS" &
            LISTENER_PID=$!
            start_all_workers
            ;;
    esac
}

if ! are_we_sourced
then
    #
    # First step: process all command-line arguments.
    #
    process_arguments "$@"

    #
    # This call sets the whole framework in motion.
    # But only if the file is not sourced.
    #
    main

    #
    # Exit after all processes have finished.
    #
    wait
fi