#!/usr/bin/env bash # # PPSS, the Parallel Processing Shell Script # # Copyright (c) 2010, Louwrentius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # See # for a copy of the GNU General Public License # # "Patches or other contributions are always welcome!" # # # Handling control-c for a clean shutdown. # trap 'kill_process' SIGINT SCRIPT_NAME="Distributed Parallel Processing Shell Script" SCRIPT_VERSION="2.98" # # The first argument to this script can be a mode. # MODES="node start config stop pause continue deploy status erase kill" for x in $MODES do if [[ "$x" == "$1" ]] then MODE="$1" shift break fi done # # The working directory of PPSS can be set with # export PPSS_DIR=/path/to/workingdir # if [[ -z "$PPSS_DIR" ]] then PPSS_DIR="ppss_dir" fi CONFIG="" HOSTNAME="$(hostname)" ARCH="$(uname)" PPSS_HOME_DIR="ppss-home" SOURCED="$0" PID="$$" PAUSE_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/pause_signal" # Pause processing if this file is present. PAUSE_DELAY="60" # Polling every 1 minutes by default. STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal" # Stop processing if this file is present. GLOBAL_COUNTER=1 LISTOFITEMS="$PPSS_DIR/INPUT_FILE-$PID" JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items. LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file. Contains lots of info. FAILED_ITEMS_COUNTER=0 if [[ -z "$QUIET" ]] then QUIET="0" fi STOP="0" # STOP job. MAX_DELAY="0" # MAX DELAY between jobs. MAX_LOCK_DELAY="9" # PERCENT="0" LISTENER_PID="" IFS_BACKUP="$IFS" CPUINFO="/proc/cpuinfo" PROCESSORS="" START_KEY="start-$RANDOM$RANDOM$RANDOM$RANDOM" # If this key is received by listener, start a new process FAIL_KEY="fail-$RANDOM$RANDOM$RANDOM$RANDOM" # if this key is received by listener, increase error count KILL_KEY="kill-$RANDOM$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill PPSS QUEUE="" INOTIFY="" TRAVERSAL="1" # all running processes. START_PPSS="" STOP_PPSS="" SIZE_OF_INPUT="" LOCAL_LOCKING="1" LIST_OF_PROCESSED_ITEMS="$PPSS_DIR/LIST_OF_PROCESSED_ITEMS" PROCESSED_ITEMS="" UNPROCESSED_ITEMS="" FAILED_ITEMS="$PPSS_DIR/LIST_OF_FAILED_ITEMS" ACTIVE_WORKERS="0" DAEMON_POLLING_INTERVAL="10" STAT="" DAEMON_FILE_AGE="4" ENABLE_INPUT_LOCK="0" PROCESSING_TIME="" NODE_ID="NODE_ID" USE_MD5="0" RANDOMIZE="0" SSH_SERVER="" # Remote server or 'master'. SSH_KEY="" # SSH key for ssh account. SSH_KNOWN_HOSTS="" SSH_SOCKET="$PPSS_DIR/ppss_ssh_socket-$$" # Multiplex multiple SSH connections over 1 master. SSH_OPTS="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \ -o GlobalKnownHostsFile=./known_hosts \ -o ControlMaster=auto \ -o Cipher=blowfish \ -o ConnectTimeout=10 " SSH_OPTS_NOMP="-o BatchMode=yes -o GlobalKnownHostsFile=./known_hosts \ -o Cipher=blowfish \ -o ConnectTimeout=10 " # Blowfish is faster but still secure. SSH_MASTER_PID="" ITEM_LOCK_DIR="$PPSS_DIR/PPSS_ITEM_LOCK_DIR" # Remote directory on master used for item locking. PPSS_LOCAL_TMPDIR="$PPSS_DIR/PPSS_LOCAL_TMPDIR" # Local directory on slave for local processing. PPSS_LOCAL_OUTPUT="$PPSS_DIR/PPSS_LOCAL_OUTPUT" # Local directory on slave for local output. DOWNLOAD_TO_NODE="0" # Transfer item to slave via (s)cp. UPLOAD_TO_SERVER="0" # Transfer output back to server via (s)cp. SECURE_COPY="1" # If set, use SCP, Otherwise, use cp. REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded. SCRIPT="" # Custom user script that is executed by ppss. ITEM_ESCAPED="" DISABLE_SKIPPING=0 PPSS_NODE_STATUS="$PPSS_DIR/NODE_STATUS" NODE_STATUS_FILE="$PPSS_NODE_STATUS/$HOSTNAME-status.txt" DAEMON=0 EMAIL="" REGISTER="" # For STACK STACK="" TMP_STACK="" showusage_short () { echo echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" echo echo "usage: $0 [[ -d | -f ]] [[ -c ' \"\$ITEM\"' ]] " echo " [[ -C ]] [[ -j ]] [[ -l ]] [[ -p <# jobs> ]] " echo " [[ -q ]] [[ -D ]] [[ -h ]] [[ --help ]] [[ -r ]] [[ --daemon ]] " echo echo "Examples:" echo " $0 -d /dir/with/some/files -c 'gzip '" echo " $0 -d /dir/with/some/files -c 'cp \"\$ITEM\" /tmp' -p 2" echo " $0 -f -c 'wget -q -P /destination/directory \"\$ITEM\"' -p 10" echo } showusage_normal () { showusage_head showusage_basic showusage_basic_example } showusage_long () { showusage_extended_head showusage_basic showusage_extended_body } showusage_head () { echo echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" echo echo "PPSS is a Bash shell script that executes commands in parallel on a set" echo "of items, such as files in a directory, or lines in a file. The purpose" echo "of PPSS is to make it simple to benefit from multiple CPUs or CPU cores." echo echo "This short summary only discusses options for stand-alone mode. For a" echo "full listing of all options, run PPSS with the options --help" echo } showusage_basic () { echo "Usage $0 [[ options ]] " echo echo -e "--command | -c Command to execute. Syntax: ' ' including the single quotes." echo -e " Example: -c 'ls -alh '. It is also possible to specify where an item " echo -e " must be inserted: 'cp \"\$ITEM\" /somedir'." echo echo -e "--sourcedir | -d Directory that contains files that must be processed. Individual files" echo -e " are fed as an argument to the command that has been specified with -c." echo echo -e "--sourcefile | -f Each single line of the supplied file will be fed as an item to the" echo -e " command that has been specified with -c. Read input from stdin with" echo -e " -f -" echo echo -e "--config | -C If the mode is config, a config file with the specified name will be" echo -e " generated based on all the options specified. In the other modes". echo -e " this option will result in PPSS reading the config file and start" echo -e " processing items based on the settings of this file." echo echo -e "--disable-ht | -j Disable hyper threading. Is enabled by default." echo echo -e "--log | -l Sets the name of the log file. The default is ppss-log.txt." echo echo -e "--processes | -p Start the specified number of processes. Ignore the number of available" echo -e " CPUs." echo echo -e "--quiet | -q Shows no output except for a progress indication using percents." echo echo -e "--delay | -D Adds an initial random delay to the start of all parallel jobs to spread" echo -e " the load. The delay (seconds) is only used at the start of all 'threads'." echo echo -e "--daemon Daemon mode. Do not exit after items are professed, but keep looking " echo -e " for new items and process them. Read the manual how to use this!" echo -e " See --help for important additional options regarding daemon mode." echo echo -e "--disable-inotify Linux users can use real-time inotify filesystem events when using" echo -e " daemon mode. Requires inotify-tools. Enabled by default if available." echo -e " Automatically disabled if NFS is used as the daeon source dir." echo echo -e "--no-traversal|-r By default, PPSS uses the regular 'find' command to list all files" echo -e " within the directory specified by the -d option. If you do not wish" echo -e " for PPSS to process files in sub directories, use this option." echo -e " Only files within the specified directory will be processed. Any" echo -e " subdirectories will then be ignored." echo echo -e "--email | -e PPSS sends an e-mail if PPSS has finished. It is also used if processing" echo -e " of an item has failed (configurable, see -h). " echo echo -e "--debug Enable debugging output to the |P|P|S|S| log file." echo echo -e "--help Extended help, including options for distributed mode." } showusage_basic_example () { echo echo -e "Example: encoding some wav files to mp3 using lame:" echo echo -e "$0 -d /path/to/wavfiles -c 'lame '" echo echo -e "Extended usage: use --help" echo } showusage_extended_head () { echo echo "|P|P|S|S| $SCRIPT_NAME $SCRIPT_VERSION" echo echo "PPSS is a Bash shell script that executes commands in parallel on a set " echo "of items, such as files in a directory, or lines in a file." echo echo "Usage: $0 [[ MODE ]] [[ options ]] " echo echo "Modes are optional and mainly used for running in distributed mode. Modes are:" echo echo " config Generate a config file based on the supplied option parameters." echo " deploy Deploy PPSS and related files on the specified nodes." echo " erase Erase PPSS and related files from the specified nodes." echo echo " start Starting PPSS on nodes." echo " pause Pausing PPSS on all nodes." echo " stop Stopping PPSS on all nodes." echo " continue Continuing PPSS on all nodes." echo " node Running PPSS as a node, requires additional options." } showusage_extended_body () { echo echo -e "The following options are used for distributed execution of PPSS." echo echo -e "--master | -m Specifies the SSH server that is used for communication between nodes." echo -e " Using SSH, file locks are created, informing other nodes that an item " echo -e " is locked. If items are files that must be processed, they must reside" echo -e " on this host. SCP is used to transfer files from this host to nodes" echo -e " for local procesing." echo echo -e "--node | -n File containig a list of nodes that act as PPSS clients. One IP / DNS" echo -e " name per line." echo echo -e "--key | -k The SSH key that a node uses to connect to the master." echo echo -e "--known-hosts | -K The file that contains the server public key. Can often be found on " echo -e " hosts that already once connected to the server. See the file " echo -e " ~/.ssh/known_hosts or else, manualy connect once and check this file." echo echo -e "--user | -u The SSH user name that is used by the node when logging in into the" echo -e " master SSH server." echo echo -e "--script | -S Specifies the script/program that must be copied to the nodes for" echo -e " execution through PPSS. Only used in the deploy mode." echo -e " This option should be specified if necessary when generating a config." echo echo -e "--download This option specifies that an item will be downloaded by the node" echo -e " from the server or share to the local node for processing." echo echo -e "--upload This option specifies that the output file will be copied back to" echo -e " the server, the --outputdir option is mandatory." echo echo -e "--no-scp | -b Do not use scp for downloading items. Use cp instead. Assumes that a" echo -e " network file system (NFS/SMB) is mounted under a local mount point." echo echo -e "--outputdir | -o Directory on server where processed files are put. If the result of " echo -e " encoding a wav file is an mp3 file, the mp3 file is put in the " echo -e " directory specified with this option." echo echo -e "--homedir | -H Directory in which PPSS is installed on the node." echo -e " Default is '$PPSS_HOME_DIR'." echo echo -e "--script | -S Script to run on the node. PPSS must copy this script to the node." echo echo -e "--randomize | -R Randomise which items to process by the client in distributed mode." echo -e " This makes sure that with many nodes, it is prevented that some" echo -e " clients spend all their time trying to get a lock on an item." echo echo -e "Example: encoding some wav files to mp3 using lame:" echo echo -e "$0 -c 'lame ' -d /path/to/wavfiles -j " echo echo -e "Running PPSS based on a configuration file." echo echo -e "$0 -C config.cfg" echo echo -e "Generating a configuration file. Wavs are converted to mp3. SCP is used for data transfer." echo echo -e "$0 config -C ppss-config.cfg -d /some/dir -o output --download --upload -K known_hosts \\" echo -e "-k ppss-key.dsa -n nodes.txt -m 10.0.0.100 \\" echo -e "-c 'lame --quiet \"\$ITEM\" -o \"\$OUTPUT_DIR/\$OUTPUT_FILE\".mp3' " echo echo -e "Running PPSS on a client as part of a cluster." echo echo -e "$0 node -d /somedir -c 'cp \"\$ITEM\" /some/destination' -m 10.0.0.50 -u ppss -k ppss-key.key" echo } kill_process () { echo "$KILL_KEY" >> "$FIFO" } exec_cmd () { STATUS="" CMD="$1" NOMP="$2" # Disable multiplexing. if [[ "$ARCH" == "FreeBSD" ]] then CMD="bash $CMD" fi if [[ ! -z "$SSH_SERVER" ]] then if [[ -z "$NOMP" ]] then ssh $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER $CMD STATUS=$? elif [[ "$NOMP" == "1" ]] then ssh $SSH_OPTS_NOMP $SSH_KEY $USER@$SSH_SERVER $CMD STATUS=$? fi else eval "$CMD" STATUS=$? log DEBUG "LOCAL EXEC - status is $STATUS" fi return $STATUS } does_file_exist () { # # this function makes remote or local checking of existence of items transparent. # FILE="$1" RES=$(exec_cmd "ls -1 $FILE" 2>&1) if [[ "$?" == "0" ]] then return 0 else return 1 fi } check_for_interrupt () { # # PPSS can be interupted with a stop or pause command. # does_file_exist "$STOP_SIGNAL" if [[ "$?" == "0" ]] then set_status "STOPPED" "$FAILED_ITEMS_COUNTER" log INFO "STOPPING job. Stop signal found." STOP="1" return 1 fi does_file_exist "$PAUSE_SIGNAL" if [[ "$?" == "0" ]] then set_status "PAUSED" "$FAILED_ITEMS_COUNTER" log INFO "PAUSE: sleeping for $PAUSE_DELAY SECONDS." sleep $PAUSE_DELAY check_for_interrupt else set_status "RUNNING" "$FAILED_ITEMS_COUNTER" fi } cleanup () { log DEBUG "$FUNCNAME - Cleaning up all temp files and processes. $1" for x in $MODES do if [[ "$x" == "$MODE" ]] then if [[ "$x" != "node" ]] then rm -rf "$PPSS_DIR" fi fi done if [[ -e "$FIFO" ]] then rm "$FIFO" fi if [[ -e "$FIFO_LISTENER" ]] then rm "$FIFO_LISTENER" fi if [[ -e "$SSH_SOCKET" ]] then rm -rf "$SSH_SOCKET" fi if [[ ! -z "$SSH_MASTER_PID" ]] then kill "$SSH_MASTER_PID" fi } add_var_to_config () { if [[ "$MODE" == "config" ]] then VAR="$1" VALUE="$2" echo -e "$VAR=$VALUE" >> $CONFIG fi } is_var_empty () { if [[ -z "$1" ]] then showusage_ cleanup exit 1 fi } detect_source_dir_nfs_exported () { #log DEBUG "Executing $FUNCNAME" NFS=0 NFS_MOUNTED_FS=$(mount | grep ":" | cut -d ":" -f 2 | awk '{ print $3 }') for mount in $NFS_MOUNTED_FS do # # If this for loop matches anything, the SRC_DIR is NFS exported. # inotify does not play well with NFS. So it must be disabled. # DIRECTORY="$SRC_DIR" while true do #log DEBUG "NFS TEST: $mount vs. $DIRECTORY" if [[ $DIRECTORY != [/.]* ]] then if [[ "$mount" == "$DIRECTORY" ]] then NFS=1 break fi else break fi DIRECTORY=$(dirname "$DIRECTORY") done done if [[ "$NFS" == "1" ]] then #log DEBUG "Source directory is NFS exported. Disabling inotify." return 1 else #log DEBUG "Source directory is NOT NFS exported. Enabling inotify." return 0 fi } detect_inotify () { if [[ -e /usr/bin/inotifywait && "$INOTIFY" != "0" ]] && detect_source_dir_nfs_exported then INOTIFY=1 else INOTIFY=0 fi } process_arguments () { # # Process any command-line options that are specified." # if [[ "$#" == "0" ]] then showusage_short exit 1 fi while [[ $# -gt 0 ]] do case $1 in --config|-C ) CONFIG="$2" is_var_empty "$CONFIG" if [[ "$MODE" == "config" ]] then if [[ -e "$CONFIG" ]] then echo "Do want to overwrite existing config file? [y/n]" read yn if [[ "$yn" == "y" ]] || [[ "$yn" == "yes" ]] then rm "$CONFIG" else echo "Aborting..." cleanup exit 1 fi fi fi if [[ "$MODE" != "config" ]] then source $CONFIG fi if [[ ! -z "$SSH_KEY" ]] then SSH_KEY="-i $SSH_KEY" fi if [[ ! -e "./known_hosts" ]] then if [[ -e $SSH_KNOWN_HOSTS ]] then if [[ "$SSH_KNOWN_HOSTS" != "known_hosts" ]] then cat $SSH_KNOWN_HOSTS > ./known_hosts fi else echo "File $SSH_KNOWN_HOSTS does not exist." exit 1 fi fi shift 2 ;; --working-dir|-w ) PPSS_DIR="$2" add_var_to_config PPSS_DIR "$PPSS_DIR" shift 2 ;; --node|-n ) NODES_FILE="$2" add_var_to_config NODES_FILE "$NODES_FILE" shift 2 ;; --sourcefile|-f ) INPUT_FILE="$2" is_var_empty "$INPUT_FILE" add_var_to_config INPUT_FILE "$INPUT_FILE" shift 2 ;; --sourcedir|-d ) SRC_DIR="$2" is_var_empty "$SRC_DIR" add_var_to_config SRC_DIR "$SRC_DIR" shift 2 ;; --delay|-D) MAX_DELAY="$2" add_var_to_config MAX_DELAY "$MAX_DELAY" shift 2 ;; --disable-inotify) INOTIFY=0 add_var_to_config INOTIFY "$INOTIFY" shift 1 ;; --enable-input-lock) ENABLE_INPUT_LOCK=1 add_var_to_config ENABLE_INPUT_LOCK "$ENABLE_INPUT_LOCK" shift 1 ;; --daemon) DAEMON="1" QUIET="1" detect_inotify add_var_to_config DAEMON "$DAEMON" add_var_to_config QUIET "$QUIET" add_var_to_config INOTIFY "$INOTIFY" shift 1 ;; --interval) is_var_empty "$2" DAEMON_POLLING_INTERVAL="$2" add_var_to_config DAEMON_POLLING_INTERVAL "$DAEMON_POLLING_INTERVAL" shift 2 ;; --file-age) is_var_empty "$2" add_var_to_config DAEMON_FILE_AGE "$DAEMON_FILE_AGE" shift 2 ;; --email|-e) is_var_empty "$2" EMAIL="$2" add_var_to_config EMAIL "$EMAIL" shift 2 ;; --command|-c ) COMMAND="$2" is_var_empty "$COMMAND" if [[ "$MODE" == "config" ]] then COMMAND=\'$COMMAND\' add_var_to_config COMMAND "$COMMAND" fi shift 2 ;; -h ) showusage_normal exit 1 ;; --help) showusage_long exit 1 ;; --homedir|-H ) is_var_empty "$2" PPSS_HOME_DIR="$2" add_var_to_config PPSS_DIR $PPSS_HOME_DIR shift 2 ;; --disable-ht|-j ) HYPERTHREADING=no add_var_to_config HYPERTHREADING $HYPERTHREADING shift 1 ;; --log|-l ) LOGFILE="$2" add_var_to_config LOGFILE "$LOGFILE" shift 2 ;; --no-traversal|-r ) TRAVERSAL="0" add_var_to_config LOGFILE "$TRAVERSAL" shift 1 ;; --workingdir|-w ) WORKINGDIR="$2" add_var_to_config WORKINGDIR "$WORKINGDIR" shift 2 ;; --md5|-M ) USE_MD5="1" add_var_to_config USE_MD5 "$USE_MD5" shift 1 ;; --key|-k ) SSH_KEY="$2" is_var_empty "$SSH_KEY" add_var_to_config SSH_KEY "$SSH_KEY" if [[ ! -z "$SSH_KEY" ]] then SSH_KEY="-i $SSH_KEY" fi shift 2 ;; --known-hosts | -K ) SSH_KNOWN_HOSTS="$2" add_var_to_config SSH_KNOWN_HOSTS "$SSH_KNOWN_HOSTS" shift 2 ;; --no-scp |-b ) SECURE_COPY=0 add_var_to_config SECURE_COPY "$SECURE_COPY" shift 1 ;; --randomize |-R ) RANDOMIZE=1 add_var_to_config RANDOMIZE "$RANDOMIZE" shift 1 ;; --outputdir|-o ) REMOTE_OUTPUT_DIR="$2" add_var_to_config REMOTE_OUTPUT_DIR "$REMOTE_OUTPUT_DIR" shift 2 ;; --processes|-p ) is_var_empty "$2" MAX_NO_OF_RUNNING_JOBS="$2" add_var_to_config MAX_NO_OF_RUNNING_JOBS "$MAX_NO_OF_RUNNING_JOBS" shift 2 ;; --master|-m ) SSH_SERVER="$2" add_var_to_config SSH_SERVER "$SSH_SERVER" shift 2 ;; --script|-S ) SCRIPT="$2" add_var_to_config SCRIPT "$SCRIPT" shift 2 ;; --debug) PPSS_DEBUG="1" add_var_to_config PPSS_DEBUG "$PPSS_DEBUG" shift 1 ;; --download) DOWNLOAD_TO_NODE="1" add_var_to_config DOWNLOAD_TO_NODE "$DOWNLOAD_TO_NODE" shift 1 ;; --upload) if [[ -z "$REMOTE_OUTPUT_DIR" ]] then echo "ERROR: no server-side output directory specified with -o" exit 1 fi UPLOAD_TO_SERVER="1" add_var_to_config UPLOAD_TO_SERVER "$UPLOAD_TO_SERVER" shift 1 ;; --quiet|-q ) QUIET="1" add_var_to_config QUIET "$QUIET" shift 1 ;; --user|-u ) USER="$2" add_var_to_config USER "$USER" shift 2 ;; --version|-v ) echo "" echo "$SCRIPT_NAME version $SCRIPT_VERSION" echo "" exit 0 ;; * ) showusage_short echo echo "Unknown option $1 " echo exit 1 ;; esac done if [[ -z "$SRC_DIR" && -z "$INPUT_FILE" ]] then showusage_short echo echo "No source file or directory specified with -f or -d." exit 1 fi if [[ ! -e "$SRC_DIR" && -z "$MODE" && -z "$INPUT_FILE" ]] then showusage_short echo echo "Source directory $SRC_DIR does not exist." exit 1 fi if [[ "$SRC_DIR" == "." ]] then echo echo "|P|P|S|S| is not designed to process items from within the directory" echo "it is being run. PPSS will start to process its own files from" echo "its working directory $PPSS_DIR which is probably not wat you" echo "want. Are you sure you want to continue?" echo read YN if [[ "$YN" != "y" && "$YN" != "Y" ]] then exit 1 fi fi if [[ "$DAEMON" == "1" && -z "$SRC_DIR" ]] then showusage_short echo echo "Daemon monitors a specified directory (with the -d option) for files to process." echo "Read the on-line manual for more information." exit 1 fi } display_header () { log DSPLY "" log DSPLY "=========================================================" log DSPLY " |P|P|S|S| " log DSPLY "$SCRIPT_NAME vers. $SCRIPT_VERSION" log DSPLY "=========================================================" log DSPLY "Hostname:\t\t$HOSTNAME" log DSPLY "---------------------------------------------------------" } create_working_directory () { if [[ ! -e "$PPSS_DIR" ]] then mkdir -p "$PPSS_DIR" fi } expand_str () { STR=$1 LENGTH=$TYPE_LENGTH SPACE=" " while [[ "${#STR}" -lt "$LENGTH" ]] do STR=$STR$SPACE done echo "$STR" } are_we_sourced () { RES=$(basename $SOURCED) if [[ "$RES" == "ppss" ]] then return 1 else return 0 fi } get_time_in_seconds () { if [[ "$ARCH" == "SunOS" ]] then # # Dirty hack because this ancient operating system does not support +%s... # # SWB: Clever, but horrible. Use perl since I already created a dependency # there by using it to fix sed and [[:alnum:]] not working. # THE_TIME=$(truss /usr/bin/date 2>&1 | grep ^time | awk '{ print $3 }') THE_TIME=$(perl -e 'print time') else THE_TIME="$(date +%s)" fi echo "$THE_TIME" } set_md5 () { case $ARCH in "Darwin") MD5=md5 ;; "FreeBSD") MD5=md5 ;; "SunOS") MD5="digest -a md5" ;; "Linux") MD5=md5sum ;; esac echo "test" | $MD5 > /dev/null 2>&1 if [[ ! "$?" ]] then LOG ERROR "ERROR - PPSS requires $MD5. It may not be within the path or installed." return 1 else return 0 fi } set_stat () { if [[ "$DAEMON" == "1" && "$INOTIFY" == "0" ]] then case $ARCH in "Darwin") STAT="stat -f%m" ;; "FreeBSD") STAT="stat -f%m" ;; "SunOS") STAT="gstat -c%Y" ;; "Linux") STAT="stat -c%Y" ;; esac $STAT . >> /dev/null 2>&1 if [[ ! "$?" ]] then LOG ERROR "ERROR - PPSS daemon mode requires stat. It may not be within the path or installed." return 1 else return 0 fi else return 0 fi } log () { # # Type 'DSPLY ERROR and WARN' is logged to the screen # Any other log-type is only logged to the logfile. # TYPE="$1" MESG="$2" TYPE_LENGTH=5 # # Performance hack. Don't go through all the code if not required. # if [[ "$TYPE" == "DEBUG" && "$PPSS_DEBUG" == "0" ]] then return fi TYPE_EXP=$(expand_str "$TYPE") DATE=$(date +%b\ %d\ %H:%M:%S) PREFIX="$DATE: ${TYPE_EXP:0:$TYPE_LENGTH}" PREFIX_SMALL="$DATE: " if [[ "$TYPE" != "ERROR" ]] then ECHO_MSG="$PREFIX_SMALL $MESG" else ECHO_MSG="$PREFIX_SMALL [ERROR] $MESG" fi LOG_MSG="$PREFIX $MESG" if [[ ! -z "$PPSS_DEBUG" && "$PPSS_DEBUG" != "0" ]] then echo -e "$LOG_MSG" >> "$LOGFILE" elif [[ "$TYPE" == "INFO" || "$TYPE" == "ERROR" || "$TYPE" == "WARN" || "$TYPE" == "DSPLY" ]] then echo -e "$LOG_MSG" >> "$LOGFILE" fi if [[ "$TYPE" == "DSPLY" || "$TYPE" == "ERROR" || "$TYPE" == "WARN" && "$QUIET" == "0" ]] then echo -e "$ECHO_MSG" elif [[ "$TYPE" == "ERROR" && "$QUIET" == "1" ]] then echo -e "$ECHO_MSG" fi if [[ "$TYPE" == "PRCNT" ]] then echo -en "\r$ECHO_MSG" #echo "$ECHO_MSG" # for debugging. fi } init_vars () { # # Get start time to measure how long PPSS has been running. # START_PPSS=$(get_time_in_seconds) # # Check if MD5(SUM) is present on the system. # set_md5 # # Chec if stat is present and works on the system if daemon mode is enabled. # set_stat # # Is PPSS run as a daemon? Then use input locking, which is not required otherwise. # if [[ "$DAEMON" == "1" ]] then INPUT_LOCK="$SRC_DIR/INPUT_LOCK" fi # # For some strange reason, this value differ on different operating systems due to # different behaviour betwen the ps utilily acros operating systems. # if [[ "$ARCH" == "Darwin" ]] then MIN_JOBS=4 elif [[ "$ARCH" == "Linux" ]] then MIN_JOBS=3 fi FIFO="$PPSS_DIR"/ppss-fifo-$RANDOM-$RANDOM FIFO_LISTENER="$PPSS_DIR"/ppss-fifo-listener-$RANDOM-$RANDOM if [[ ! -e "$FIFO" ]] then mkfifo -m 600 $FIFO fi if [[ ! -e "$FIFO_LISTENER" ]] then mkfifo -m 600 $FIFO_LISTENER fi exec 42<> $FIFO exec 43<> $FIFO_LISTENER set_status "RUNNING" if [[ -e "$CPUINFO" ]] then CPU=$(cat $CPUINFO | grep 'model name' | cut -d ":" -f 2 | sed -e s/^\ //g | sort | uniq) log DSPLY "CPU: $CPU" elif [[ "$ARCH" == "Darwin" ]] then MODEL=$(system_profiler SPHardwareDataType | grep "Processor Name" | cut -d ":" -f 2) SPEED=$(system_profiler SPHardwareDataType | grep "Processor Speed" | cut -d ":" -f 2) log DSPLY "CPU: $MODEL $SPEED" elif [[ "$ARCH" == "SunOS" ]] then CPU=$(/usr/sbin/psrinfo -v | grep MHz | cut -d " " -f 4,8 | awk '{ printf ("Processor architecture: %s @ %s MHz.\n", $1,$2) }' | head -n 1) log DSPLY "$CPU" else log DSPLY "CPU: Cannot determine. Provide a patch for your arch!" log DSPLY "Arch is $ARCH" fi if [[ -z "$MAX_NO_OF_RUNNING_JOBS" ]] then get_no_of_cpus $HYPERTHREADING fi if [[ ! -z "$SSH_SERVER" ]] then does_file_exist "$PPSS_HOME_DIR/$JOB_LOG_DIR" if [[ ! "$?" == "0" ]] then log DEBUG "Remote Job log directory $PPSS_HOME_DIR/$JOB_lOG_DIR does not exist. Creating." exec_cmd "mkdir $PPSS_HOME_DIR/$JOB_LOG_DIR" fi fi if [[ ! -e "$JOB_LOG_DIR" ]] then mkdir -p "$JOB_LOG_DIR" fi if [[ ! -z "$SSH_SERVER" ]] then ITEM_LOCK_DIR="$PPSS_HOME_DIR/$ITEM_LOCK_DIR" fi does_file_exist "$ITEM_LOCK_DIR" if [[ ! "$?" == "0" ]] then if [[ ! -z "$SSH_SERVER" ]] then log DEBUG "Creating remote item lock dir." else log DEBUG "Creating local item lock dir." fi exec_cmd "mkdir $ITEM_LOCK_DIR" if [[ ! "$?" ]] then log DEBUG "Failed to create item lock dir." fi fi if [[ ! -z "$SSH_SERVER" ]] then does_file_exist "$REMOTE_OUTPUT_DIR" if [[ ! "$?" == "0" ]] then log DEBUG "Remote output dir $REMOTE_OUTPUT_DIR does not exist." exec_cmd "mkdir $REMOTE_OUTPUT_DIR" fi fi if [[ ! -e "$PPSS_LOCAL_TMPDIR" ]] then mkdir "$PPSS_LOCAL_TMPDIR" fi if [[ ! -e "$PPSS_LOCAL_OUTPUT" ]] then mkdir "$PPSS_LOCAL_OUTPUT" fi if [[ ! -e "$PPSS_NODE_STATUS" ]] then mkdir -p "$PPSS_NODE_STATUS" fi } upload_status () { if [[ -e "$NODE_STATUS_FILE" ]] then scp -q -o GlobalKnownHostsFile=./known_hosts -i ppss-key.dsa $NODE_STATUS_FILE $USER@$SSH_SERVER:$PPSS_HOME_DIR/$PPSS_NODE_STATUS/ if [[ "$?" == "0" ]] then log DEBUG "Uploaded status to server ok." else log DEBUG "Uploaded status to server failed." fi else log DEBUG "Status file not found thus not uploaded." fi } set_status () { if [[ ! -z "$SSH_SERVER" ]] then STATUS="$1" if [[ -e "$LIST_OF_PROCESSED_ITEMS" ]] then NO_PROCESSED=$(wc -l "$LIST_OF_PROCESSED_ITEMS" | awk '{ print $1 }' ) else NO_PROCESSED="0" fi NODE=$(cat $PPSS_DIR/$NODE_ID) FAILED="$2" if [[ -z "$FAILED" ]] then FAILED=0 fi echo "$NODE $HOSTNAME $STATUS $NO_PROCESSED" "$FAILED" > "$NODE_STATUS_FILE" upload_status fi } check_status () { ERROR="$1" FUNCTION="$2" MESSAGE="$3" if [[ ! "$ERROR" == "0" ]] then log DSPLY "$FUNCTION - $MESSAGE" set_status ERROR cleanup exit "$ERROR" fi } erase_ppss () { SSH_SOCKET="ppss_ssh_socket-$NODE" SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \ -o GlobalKnownHostsFile=./known_hosts \ -o ControlMaster=auto \ -o Cipher=blowfish \ -o ConnectTimeout=5 " echo "Are you realy sure you want to erase PPSS from all nodes!? (YES/NO)" read YN if [[ "$YN" == "yes" ]] || [[ "$YN" == "YES" ]] then for NODE in $(cat $NODES_FILE) do log DSPLY "Erasing PPSS homedir $PPSS_HOME_DIR from node $NODE." ssh -q $SSH_KEY $SSH_OPTS_NODE $USER@$NODE "rm -rf $PPSS_HOME_DIR" done else log DSPLY "Aborting.." fi } stack_push_tmp () { TMP1="$1" if [[ -z "$TMP_STACK" ]] then TMP_STACK="$TMP1" else TMP_STACK="$TMP_STACK"$'\n'"$TMP1" fi } stack_push () { line="$1" if [[ -z "$STACK" ]] then STACK="$line" else STACK="$line"$'\n'"$STACK" fi } unprocessed_stack_push () { line="$1" if [[ -z "$PROCESSED_ITEMS" ]] then UNPROCESSED_ITEMS="$line" else UNPROCESSED_ITEMS="$line"$'\n'"$UNPROCESSED_ITEMS" fi } processed_stack_push () { line="$1" if [[ -z "$PROCESSED_ITEMS" ]] then PROCESSED_ITEMS="$line" else PROCESSED_ITEMS="$line"$'\n'"$PROCESSED_ITEMS" fi } stack_pop () { TMP_STACK="" i=0 tmp="" for x in $STACK do if [[ "$i" == "0" ]] then tmp="$x" else stack_push_tmp "$x" fi ((i++)) done STACK="$TMP_STACK" REGISTER="$tmp" if [[ -z "$REGISTER" ]] then return 1 else return 0 fi } is_screen_installed () { if [[ "$DISABLE_SCREEN_TEST" == "1" ]] then return 0 fi NODE="$1" ssh -q $SSH_OPTS_NODE $SSH_KEY $USER@$NODE "screen -m -D -S test ls" > /dev/null 2>&1 if [[ ! "$?" == "0" ]] then log ERROR "The 'Screen' command may not be installed on node $NODE." log ERROR "Or some other SSH related error occurred." return 1 else log DEBUG "'Screen' is installed on node $NODE." fi } deploy () { NODE="$1" SSH_SOCKET="ppss_ssh_socket-$NODE" SSH_OPTS_NODE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \ -o GlobalKnownHostsFile=./known_hosts \ -o ControlMaster=auto \ -o Cipher=blowfish \ -o ConnectTimeout=5 " SSH_OPTS_SLAVE="-o BatchMode=yes -o ControlPath=$SSH_SOCKET \ -o GlobalKnownHostsFile=./known_hosts \ -o ControlMaster=no \ -o Cipher=blowfish \ -o ConnectTimeout=5 " ERROR=0 set_error () { if [[ "$ERROR" == "1" ]] then ERROR=1 elif [[ ! "$1" == "0" ]] then ERROR=1 fi } if [[ ! -e "$SSH_SOCKET" ]] then ssh -q -N $SSH_OPTS_NODE $SSH_KEY $USER@$NODE & SSH_PID=$! fi is_screen_installed "$NODE" KEY=$(echo $SSH_KEY | cut -d " " -f 2) ssh -q $SSH_OPTS_SLAVE $SSH_KEY $USER@$NODE "cd ~ && mkdir -p $PPSS_HOME_DIR && mkdir -p $PPSS_HOME_DIR/$JOB_LOG_DIR && mkdir -p $PPSS_HOME_DIR/ITEM_LOCK_DIR >> /dev/null 2>&1" set_error $? ssh -q $SSH_OPTS_SLAVE $SSH_KEY $USER@$NODE "cd ~ && cd $PPSS_HOME_DIR && cd $PPSS_DIR && echo $NODE > $NODE_ID" set_error $? scp -q $SSH_OPTS_SLAVE $SSH_KEY $0 $USER@$NODE:~/$PPSS_HOME_DIR set_error $? scp -q $SSH_OPTS_SLAVE $SSH_KEY $KEY $USER@$NODE:~/$PPSS_HOME_DIR set_error $? scp -q $SSH_OPTS_SLAVE $SSH_KEY $CONFIG $USER@$NODE:~/$PPSS_HOME_DIR set_error $? scp -q $SSH_OPTS_SLAVE $SSH_KEY known_hosts $USER@$NODE:~/$PPSS_HOME_DIR set_error $? if [[ ! -z "$SCRIPT" ]] then scp -q $SSH_OPTS_SLAVE $SSH_KEY $SCRIPT $USER@$NODE:~/$PPSS_HOME_DIR set_error $? fi if [[ ! -z "$INPUT_FILE" ]] then scp -q $SSH_OPTS_SLAVE $SSH_KEY $INPUT_FILE $USER@$NODE:~/$PPSS_HOME_DIR set_error $? fi if [[ "$ERROR" == "0" ]] then log DSPLY "PPSS installed on node $NODE." else log DSPLY "PPSS failed to install on $NODE." fi kill $SSH_PID } deploy_ppss () { if [[ -z "$NODES_FILE" ]] || [[ ! -e "$NODES_FILE" ]] then log ERROR "No file containing list of nodes missing / not specified." set_status ERROR cleanup exit 1 fi exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_NODE_STATUS" KEY=$(echo $SSH_KEY | cut -d " " -f 2) if [[ -z "$KEY" ]] || [[ ! -e "$KEY" ]] then log ERROR "Private SSH key $KEY not found." set_status "ERROR" cleanup exit 1 fi if [[ ! -e "$SCRIPT" && ! -z "$SCRIPT" ]] then log ERROR "Script $SCRIPT not found." set_status "ERROR" cleanup exit 1 fi INSTALLED_ON_SSH_SERVER=0 for NODE in $(cat $NODES_FILE) do deploy "$NODE" & if [[ "$ARCH" == "SunOS" ]] then sleep 1 else sleep 0.1 fi if [[ "$NODE" == "$SSH_SERVER" ]] then INSTALLED_ON_SSH_SERVER=1 fi done if [[ "$INSTALLED_ON_SSH_SERVER" == "0" ]] then log DEBUG "SSH SERVER $SSH_SERVER is not a node." else log DEBUG "SSH SERVER $SSH_SERVER is also a node." fi } start_ppss_on_node () { NODE="$1" log DSPLY "Starting PPSS on node $NODE." ssh $SSH_KEY $USER@$NODE -o ConnectTimeout=5 -o GlobalKnownHostsFile=./known_hosts "cd $PPSS_HOME_DIR ; screen -d -m -S PPSS ~/$PPSS_HOME_DIR/$0 node --config ~/$PPSS_HOME_DIR/$CONFIG" if [[ ! "$?" == "0" ]] then log ERROR "|P|P|S|S| failed to start on node $NODE." fi } init_ssh_server_socket () { if [[ ! -e "$SSH_SOCKET" ]] then DIR=$(dirname $SSH_SOCKET) mkdir -p "$DIR" fi } test_server () { # Testing if the remote server works as expected. if [[ ! -z "$SSH_SERVER" ]] then init_ssh_server_socket exec_cmd "date >> /dev/null" check_status "$?" "$FUNCNAME" "Server $SSH_SERVER could not be reached" ssh -N -M $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER & SSH_MASTER_PID="$!" log DEBUG "SSH Master pid is $SSH_MASTER_PID" log INFO "Connected to server: $SSH_SERVER" does_file_exist "$PPSS_HOME_DIR/$PPSS_DIR" if [[ ! "$?" == "0" && ! -z "$SSH_SERVER" ]] then log DEBUG "Remote PPSS home directory $PPSS_HOME_DIR/$PPSS_DIR does not exist. Creating." exec_cmd "mkdir -p $PPSS_HOME_DIR/$PPSS_DIR" fi else log DEBUG "No remote server specified, assuming stand-alone mode." fi } get_no_of_cpus () { # Use hyperthreading or not? HPT=$1 NUMBER="" if [[ -z "$HPT" ]] then HPT=yes fi got_cpu_info () { ERROR="$1" check_status "$ERROR" "$FUNCNAME" "cannot determine number of cpu cores. Specify with -p." } if [[ "$HPT" == "yes" ]] then if [[ "$ARCH" == "Linux" ]] then NUMBER=$(grep -c ^processor $CPUINFO) got_cpu_info "$?" elif [[ "$ARCH" == "Darwin" ]] then NUMBER=$(sysctl -a hw | grep -w logicalcpu | awk '{ print $2 }') got_cpu_info "$?" elif [[ "$ARCH" == "FreeBSD" ]] then NUMBER=$(sysctl hw.ncpu | awk '{ print $2 }') got_cpu_info "$?" elif [[ "$ARCH" == "SunOS" ]] then NUMBER=$(psrinfo | grep -c on-line) got_cpu_info "$?" else if [[ -e "$CPUINFO" ]] then NUMBER=$(grep -c ^processor $CPUINFO) got_cpu_info "$?" fi fi if [[ ! -z "$NUMBER" ]] then log DSPLY "Found $NUMBER logic processors." fi elif [[ "$HPT" == "no" ]] then log DSPLY "Hyperthreading is disabled." if [[ "$ARCH" == "Linux" ]] then PHYSICAL=$(grep 'physical id' $CPUINFO) if [[ "$?" ]] then PHYSICAL=$(grep 'physical id' $CPUINFO | sort | uniq | wc -l) if [[ "$PHYSICAL" == "1" ]] then log DSPLY "Found $PHYSICAL physical CPU." else log DSPLY "Found $PHYSICAL physical CPUs." fi TMP=$(grep 'core id' $CPUINFO) if [[ "$?" ]] then log DEBUG "Starting job only for each physical core on all physical CPU(s)." NUMBER=$(grep 'core id' $CPUINFO | sort | uniq | wc -l) log DSPLY "Found $NUMBER physical cores." else log DSPLY "Single core processor(s) detected." log DSPLY "Starting job for each physical CPU." NUMBER=$PHYSICAL fi else log INFO "No 'physical id' section found in $CPUINFO, typical for older cpus." NUMBER=$(grep -c ^processor $CPUINFO) got_cpu_info "$?" fi elif [[ "$ARCH" == "Darwin" ]] then NUMBER=$(sysctl -a hw | grep -w physicalcpu | awk '{ print $2 }') got_cpu_info "$?" elif [[ "$ARCH" == "FreeBSD" ]] then NUMBER=$(sysctl hw.ncpu | awk '{ print $2 }') got_cpu_info "$?" else NUMBER=$(cat $CPUINFO | grep "cpu cores" | cut -d ":" -f 2 | uniq | sed -e s/\ //g) got_cpu_info "$?" fi fi if [[ ! -z "$NUMBER" ]] then MAX_NO_OF_RUNNING_JOBS=$NUMBER else log ERROR "Number of CPUs not obtained." log ERROR "Please specify manually with -p." set_status "ERROR" exit 1 fi } random_delay () { ARGS="$1" if [[ -z "$ARGS" ]] then log ERROR "$FUNCNAME Function random delay, no argument specified." set_status "ERROR" exit 1 fi NUMBER=$RANDOM let "NUMBER %= $ARGS" sleep "$NUMBER" } escape_item () { TMP="$1" ITEM_ESCAPED=`echo "$TMP" | \ sed s/\\ /\\\\\\\\\\\\\\ /g | \ sed s/\\'/\\\\\\\\\\\\\\'/g | \ sed s/\\|/\\\\\\\\\\\\\\|/g | \ sed s/\&/\\\\\\\\\\\\\\&/g | \ sed s/\;/\\\\\\\\\\\\\\;/g | \ sed s/\>/\\\\\\\\\\>/g | \ sed s/\> /dev/null 2>&1" ERROR="$?" return "$ERROR" fi } get_input_lock () { while true do exec_cmd "mkdir $INPUT_LOCK >> /dev/null 2>&1 " if [[ "$?" ]] then log DEBUG "Input lock is obtained..." break else log DEBUG "Input lock is present...sleeping.." sleep 5 fi done } release_input_lock () { exec_cmd "rm -rf $INPUT_LOCK" if [[ "$?" ]] then log DEBUG "Input lock was released..." return 0 else log ERROR "Input lock was already gone...this should never happen..." return 1 fi } list_all_input_items () { oldIFS=$IFS # save the field separator IFS=$'\n' # new field separator, the end of line while read line do echo "$line" done < "$LISTOFITEMS" IFS="$oldIFS" } remove_processed_items_from_input_file () { # # This function removes all items that have already been processed. # Processed items have a lock dir in the PPPSS_ITEM_LOCK_DIR. # if [[ -e "$LIST_OF_PROCESSED_ITEMS" ]] then PROCESSED_ITEMS=$(cat $LIST_OF_PROCESSED_ITEMS) fi log DEBUG "Running $FUNCNAME" if [[ -z "$PROCESSED_ITEMS" ]] then log DEBUG "Variable processed_items is empty." return 1 fi if [[ "$MODE" == "status" ]] then log DEBUG "Mode is status." return 1 fi if [[ ! -e "$LISTOFITEMS" ]] then echo "$LISTOFITEMS does not exist!" return 1 else SIZE=$(wc -l "$LISTOFITEMS") if [[ "$SIZE" == "0" ]] then echo "$LISTOFITEMS exists but is empty." return 1 fi fi INPUTFILES=$(list_all_input_items) OUT_FILE=$PPSS_DIR/out-file-$RANDOM$RANDOM$RANDOM$RAMDOM LIST_SORTED=$PPSS_DIR/list-sorted-$RANDOM$RANDOM$RANDOM$RANDOM PROCESSED_SORTED=$PPSS_DIR/processed-sorted-$RANDOM$RANDOM$RANDOM$RANDOM cat "$LISTOFITEMS" | sort > "$LIST_SORTED" cat "$LIST_OF_PROCESSED_ITEMS" | sort > "$PROCESSED_SORTED" comm -3 $LIST_SORTED $PROCESSED_SORTED > $OUT_FILE mv "$OUT_FILE" "$LISTOFITEMS" } get_all_items () { if [[ "$DAEMON" == "1" && "$INOTIFY" == "0" ]] && [[ "$ENABLE_INPUT_LOCK" == "1" ]] then get_input_lock fi GLOBAL_COUNTER=1 if [[ -e "$LISTOFITEMS" ]] then rm "$LISTOFITEMS" fi count=0 if [[ -z "$INPUT_FILE" ]] then if [[ ! -z "$SSH_SERVER" ]] # Are we running stand-alone or as a node?" then if [[ "$TRAVERSAL" == "1" ]] then $(exec_cmd "find $SRC_DIR/ ! -type d" > "$LISTOFITEMS") check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." else log DEBUG "Recursion is disabled." if [[ "$ARCH" != "SunOS" ]] then $(exec_cmd "find $SRC_DIR/ -maxdepth 1 ! -type d" > "$LISTOFITEMS") else # SWB: -depth takes no arguments. I think he meant to use # -maxdepth to limit search to just the source directory # (But that won't work on Solaris.) Try the -prune workaround # Use this to work around a problem using -name with an # embedded / when SRC_DIR is more than one level deep. _dir_basename=`basename "$SRC_DIR"` $(exec_cmd "find \"$SRC_DIR\" ( ! -name \"$_dir_basename\" -o -type f ) -prune ! -type d -print" > "$LISTOFITEMS") fi check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." fi else if [[ -e "$SRC_DIR" ]] then if [[ "$TRAVERSAL" == "1" ]] then log DEBUG "Recursion is enabled." $(find "$SRC_DIR"/ ! -type d >> "$LISTOFITEMS") check_status "$?" "$FUNCNAME" "Could not list files within local source directory." else log DEBUG "Recursion is disabled." if [[ "$ARCH" != "SunOS" ]] then $(find "$SRC_DIR"/ -maxdepth 1 ! -type d >> "$LISTOFITEMS") else # # SWB: see above depth vs. maxdepth comment and Solaris # brain-deadness workaround. # Use this to work around a problem using -name with an # embedded / when SRC_DIR is more than one level deep. _dir_basename=`basename "$SRC_DIR"` $(find "$SRC_DIR" \( ! -name "$_dir_basename" -o -type f \) -prune ! -type d -print > "$LISTOFITEMS") fi check_status "$?" "$FUNCNAME" "Could not list files within local source directory." fi if [[ ! -e "$LISTOFITEMS" ]] then log ERROR "Local input file is not created, something is wrong. Bug?" set_status "ERROR" cleanup exit 1 fi else ITEMS="" fi fi else # Using an input file as the source of our items or STDIN. if [[ ! -z "$SSH_SERVER" ]] # Are we running stand-alone or as a slave?" then log DEBUG "Running as node, input file has been pushed (hopefully)." fi if [[ ! -e "$INPUT_FILE" && ! "$INPUT_FILE" == "-" ]] then log ERROR "Input file $INPUT_FILE does not exist." set_status "ERROR" cleanup exit 1 fi if [[ ! "$INPUT_FILE" == "-" ]] then cp "$INPUT_FILE" "$LISTOFITEMS" check_status "$?" "$FUNCNAME" "Copy of input file failed!" else log DEBUG "Reading from stdin.." while read LINE do echo "$LINE" >> "$LISTOFITEMS" done fi if [[ ! -e "$LISTOFITEMS" ]] then log ERROR "Input is empty." infanticide terminate_listener cleanup fi fi if [[ "$RANDOMIZE" == "1" && "$MODE" != "status" ]] then log DEBUG "Randomizing input file." IFS_BACK="$IFS" IFS=$'\n' TMP_FILE="$PPSS_DIR/TMP-$RANDOM$RANDOM.txt" for i in $(cat $LISTOFITEMS); do echo "$RANDOM $i"; done | sort | sed -E 's/^[0-9]+ //' > "$TMP_FILE" mv "$TMP_FILE" "$LISTOFITEMS" IFS="$IFS_BACK" else log DEBUG "Randomisation of input file disabled." fi remove_processed_items_from_input_file if [[ "$DAEMON" == "1" ]] then release_input_lock fi SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }') if [[ "$SIZE_OF_INPUT" -le "0" && "$DAEMON" == "0" ]] then log ERROR "Source file/dir seems to be empty." set_status "STOPPED" cleanup exit 1 fi } are_all_items_locked () { SIZE="$1" NUMBER=$(exec_cmd "ls -1 $ITEM_LOCK_DIR | wc -l") log DEBUG "$NUMBER of $SIZE items are locked." if [[ "$NUMBER" -ge "$SIZE" ]] then return 0 else return 1 fi } get_item () { check_for_interrupt if [[ "$STOP" == "1" ]] then log DEBUG "Found stop signal." return 1 fi # # Return error if list size is empty. # if [[ -z "$SIZE_OF_INPUT" ]] then log DEBUG "Got no size of input..." return 1 fi # # Return error if the list is empty. # if [[ "$SIZE_OF_INPUT" -le "0" ]] then return 1 fi # # Check if all items have been processed. # if [[ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]] then log DEBUG "Counter $GLOBAL_COUNTER is greater than sizeof input $SIZE_OF_INPUT." return 1 fi # # Quit if all items have been locked. # if are_all_items_locked "$SIZE_OF_INPUT" then log DEBUG "All items have been locked." return 1 else log DEBUG "There are still unlocked items." fi ITEM="$(sed -n $GLOBAL_COUNTER\p $LISTOFITEMS)" if [[ -z "$ITEM" ]] then log DEBUG "Item was emtpy..." ((GLOBAL_COUNTER++)) get_item else ((GLOBAL_COUNTER++)) if [[ ! -z "$SSH_SERVER" ]] || [[ "$LOCAL_LOCKING" == "1" ]] then lock_item "$ITEM" LOCK="$?" if [[ ! "$LOCK" == "0" ]] then log DEBUG "Item $ITEM is locked." get_item else log DEBUG "Got lock on $ITEM" return 0 fi else return 0 fi fi } start_new_worker () { # # This function kicks the listener to start a worker process. # if ! are_we_sourced then echo "$START_KEY" >> "$FIFO" return $? fi } stop-ppss () { STOP_PPSS=$(get_time_in_seconds) elapsed "$START_PPSS" "$STOP_PPSS" log DSPLY "$PROCESSING_TIME" } elapsed () { BEFORE="$1" AFTER="$2" ELAPSED="$(expr $AFTER - $BEFORE)" REMAINDER="$(expr $ELAPSED % 3600)" HOURS="$(expr $(expr $ELAPSED - $REMAINDER) / 3600)" SECS="$(expr $REMAINDER % 60)" MINS="$(expr $(expr $REMAINDER - $SECS) / 60)" PROCESSING_TIME=$(printf "Total processing time (hh:mm:ss): %02d:%02d:%02d" $HOURS $MINS $SECS) } mail_on_error () { ITEM="$1" LOGFILE="$2" if [[ "$MAIL_ON_ERROR" == "1" ]] then cat "$LOGFILE" | mail -s "$HOSTNAME - PPSS: procesing failed for item." "$EMAIL" if [[ "$?" == "0" ]] then log DEBUG "Error mail sent." else log ERROR "Sending of error email failed." fi fi } add_item_to_failed_list () { ITEM="$1" get_input_lock echo "$ITEM" >> "$FAILED_ITEMS" release_input_lock } commando () { # # This function will start a chain reaction of events. # # The commando executes a command on an item and, when finished, # executes the start_new_worker. This function selects a new # item and sends it to the fifo. The listener process receives # the item and excutes this commando function on the item. # So in essence, the commando function keeps calling itself # indirectly until no items are left. This will form a single # working queue. By executing multiple start_new_worker # functions based on the CPU cores available, parallel processing # is achieved, with a queue for each core. # ERR_STATE=0 VIRTUAL=0 # # This code tests if the item exist (is physical or virtuel) # Example: a file is physical, a URL is virtual. # ITEM="$1" ORIG_ITEM="$ITEM" #log DEBUG "$FUNCNAME is processing item $ITEM" if [[ "$TRAVERSAL" == "1" ]] then escape_item "$ITEM" does_file_exist "$ITEM_ESCAPED" # The item contains the full path. ERR_STATE="$?" else escape_item "$ITEM" does_file_exist "$SRC_DIR/$ITEM_ESCAPED" # The item is only the file name itself. ERR_STATE="$?" fi # # If recursion is used, a file name of an item may not be unique. # The same filename can be used for files in differen directories. # Therefore, the output directory must reflect the original directory # structure. If recursion is not used, this is not necessary. # # Further more, if the item is not a real world object but a string of sort # the item is considered virtual. No recursion. # if [[ "$ERR_STATE" == "0" ]] then VIRTUAL="0" if [[ "$TRAVERSAL" == "1" ]] then ITEM_DIR_NAME=$(dirname "$ITEM" | sed s:$SRC_DIR::g) ITEM_BASE_NAME=$(basename "$ITEM") HASH=$(echo "$ITEM" | $MD5 | awk '{ print $1 }') if [[ "$UPLOAD_TO_SERVER" == "1" ]] then OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$HASH" else if [[ -z "$REMOTE_OUTPUT_DIR" ]] then OUTPUT_DIR="$PPSS_LOCAL_OUTPUT" else OUTPUT_DIR="$REMOTE_OUTPUT_DIR/$ITEM_DIR_NAME" fi fi else ITEM_DIR_NAME="$SRC_DIR" ITEM_BASE_NAME="$ITEM" escape_item "$ITEM_BASE_NAME" if [[ "$UPLOAD_TO_SERVER" == "1" ]] then OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED" else OUTPUT_DIR="$REMOTE_OUTPUT_DIR" fi fi else VIRTUAL="1" ITEM_DIR_NAME="" ITEM_BASE_NAME="$ITEM" escape_item "$ITEM_BASE_NAME" if [[ "$UPLOAD_TO_SERVER" == "1" ]] then OUTPUT_DIR=$PPSS_LOCAL_OUTPUT/"$ITEM_ESCAPED" else if [[ -z "$REMOTE_OUTPUT_DIR" ]] then OUTPUT_DIR="$PPSS_LOCAL_OUTPUT" else OUTPUT_DIR="$REMOTE_OUTPUT_DIR" fi fi fi OUTPUT_FILE="$ITEM_BASE_NAME" # # Decide if an item must be transfered from server to the node. # or be processed in-place (NFS / SMB mount?) # if [[ "$DOWNLOAD_TO_NODE" == "0" ]] then if [[ "$VIRTUAL" == "1" ]] then log DEBUG "Item is virtual, thus not downloading." else log DEBUG "Using item straight from the server, no copy." if [[ "$TRAVERSAL" == "0" ]] then ITEM="$SRC_DIR/$ITEM" else ITEM="$ITEM" fi fi else download_item "$ITEM" if [[ "$TRAVERSAL" == "1" ]] then ITEM="$PPSS_LOCAL_TMPDIR/$HASH/$ITEM_BASE_NAME" else ITEM="$PPSS_LOCAL_TMPDIR/$ITEM_BASE_NAME" fi fi # # Create the log file containing the output of the command. # if [[ "$USE_MD5" == "1" ]] then LOG_FILE_NAME=$(echo "$ITEM" | $MD5 | awk '{ print $1 }') else if [[ "$ARCH" != "SunOS" ]] then LOG_FILE_NAME=$(echo "$ITEM" | sed s/[^[:alnum:]]/_/g) else LOG_FILE_NAME=$(echo "$ITEM" | perl -p -e 'chomp;s/\W/_/g') fi fi ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME" if [[ -e "$ITEM_LOG_FILE" && "$DISABLE_SKIPPING" == "0" ]] then log DEBUG "Item is already processed, skipping..." start_new_worker return 0 fi # # Create the local output directory. # if [[ ! -z "$OUTPUT_DIR" ]] then log DEBUG "Creating local output dir $OUTPUT_DIR" mkdir -p "$OUTPUT_DIR" fi ERROR="" # # Some formatting of item log files. # DATE=$(date +%b\ %d\ %H:%M:%S) echo "===== PPSS Item Log File =====" > "$ITEM_LOG_FILE" echo -e "Host:\t\t$HOSTNAME" >> "$ITEM_LOG_FILE" echo -e "Process:\t$PID" >> "$ITEM_LOG_FILE" echo -e "Item:\t\t$ITEM" >> "$ITEM_LOG_FILE" echo -e "Start date:\t$DATE" >> "$ITEM_LOG_FILE" echo -e "" >> "$ITEM_LOG_FILE" #--># #--># The actual execution of the command as specified by #--># the -c option. #--># # Check for "$ITEM" or "${ITEM" in the command line. # ${ITEM} allows the usage of string operations. BEFORE=$(get_time_in_seconds) # SWB: Solaris default grep (/usr/bin/grep) doesn't support -E if [[ "$ARCH" == "SunOS" ]] then echo "$COMMAND" | /usr/xpg4/bin/grep -E -i '\$\{?ITEM' >> /dev/null 2>&1 RETVAL="$?" else echo "$COMMAND" | grep -E -i '\$\{?ITEM' >> /dev/null 2>&1 RETVAL="$?" fi if [[ "$RETVAL" == "0" ]] then eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1 ERROR="$?" MYPID="$!" else eval '$COMMAND"$ITEM" >> "$ITEM_LOG_FILE" 2>&1' ERROR="$?" MYPID="$!" fi AFTER=$(get_time_in_seconds) echo -e "" >> "$ITEM_LOG_FILE" # Some error logging. Success or fail. if [[ ! "$ERROR" == "0" ]] then mail_on_error "$ITEM" "$ITEM_LOG_FILE" log DEBUG "Processing Item $ITEM failed." echo "$FAIL_KEY" >> "$FIFO" echo -e "Status:\t\tFAILURE" >> "$ITEM_LOG_FILE" add_item_to_failed_list "$ORIG_ITEM" else echo -e "Status:\t\tSUCCESS" >> "$ITEM_LOG_FILE" fi # # If part of a cluster, remove the downloaded item after # it has been processed and uploaded as not to fill up disk space. # if [[ "$DOWNLOAD_TO_NODE" == "1" ]] then if [[ -e "$ITEM" ]] then rm -rf "$ITEM" else log DEBUG "There is no local file to remove.. strange..." fi fi # # Upload the output file back to the server. # if [[ "$UPLOAD_TO_SERVER" == "1" ]] then log DEBUG "Upload $OUTPUT_DIR and ITEM DIR NAME IS $ITEM_DIR_NAME" if [[ "$ITEM_DIR_NAME" == "." ]] then ITEM_DIR_NAME="" fi upload_item "$OUTPUT_DIR" "$ITEM_DIR_NAME" # local output directory / original item dir path else log DEBUG "Uploading disabled." fi # # Upload the log file to the server. # elapsed "$BEFORE" "$AFTER" echo "$PROCESSING_TIME" >> "$ITEM_LOG_FILE" echo -e "" >> "$ITEM_LOG_FILE" if [[ ! -z "$SSH_SERVER" ]] then scp -q $SSH_OPTS $SSH_KEY "$ITEM_LOG_FILE" $USER@$SSH_SERVER:$PPSS_HOME_DIR/$JOB_LOG_DIR if [[ ! "$?" ]] then log DEBUG "Uploading of item log file failed." fi fi start_new_worker } infanticide () { log DEBUG "Running $FUNCNAME" # # This code is run if ctrl+c is pressed. Very important to prevent # any child processes running after the parent has died. Keeps the system clean. # # This command kills all processes that are related to the master # process as defined by $PID. All processes that have ever been # spawned, although disowned or backgrounded will be killed... # # SWB: Fix ps command on Solaris, uses -e instead of a for all procs # and "comm" instead of "command" in the format specification. if [[ "$ARCH" == "SunOS" ]] then PROCLIST=$(ps -e -o pid,pgid,ppid,comm | grep [0-9] | grep $PID | grep -v -i grep) else PROCLIST=$(ps a -o pid,pgid,ppid,command | grep [0-9] | grep $PID | grep -v -i grep) fi oldIFS=$IFS # save the field separator IFS=$'\n' # new field separator, the end of line for x in $(echo "$PROCLIST") do MYPPID=$(echo $x | awk '{ print $3 }') MYPID=$(echo $x | awk '{ print $1 }') if [[ ! "$MYPPID" == "$PID" && ! "$MYPPID" == "1" ]] then if [[ ! "$MYPID" == "$PID" ]] then log DEBUG "Killing process $MYPID" kill $MYPID >> /dev/null 2>&1 else log DEBUG "Not killing master process..$MYPID.." fi else log DEBUG "Not killing listener process. $MYPID.." fi done IFS=$oldIFS } run_command () { INPUT="$1" log DEBUG "Current active workers is $ACTIVE_WORKERS" if [[ "$ACTIVE_WORKERS" -lt "$MAX_NO_OF_RUNNING_JOBS" ]] then if [[ -z "$INPUT" ]] then stack_pop INPUT="$REGISTER" fi log INFO "Now processing $INPUT" if [[ ! -z "$INPUT" && ! -d "$INPUT" ]] then commando "$INPUT" & MYPID="$!" disown PIDS="$PIDS $MYPID" ((ACTIVE_WORKERS++)) log DEBUG "Increasing active workers to $ACTIVE_WORKERS" echo "$INPUT" >> "$LIST_OF_PROCESSED_ITEMS" return 0 else log DEBUG "Item is a directory or is empty." return 0 fi else log DEBUG "Maximum number of workers are bussy, no more additional workers..." fi } display_jobs_remaining () { if [[ "$ACTIVE_WORKERS" == "1" && "$QUIET" == "0" ]] then log PRCNT "One job is remaining. " elif [[ "$QUIET" == "0" ]] then if [[ "$ACTIVE_WORKERS" == "1" ]] then echo -en "\n" fi log PRCNT "$((ACTIVE_WORKERS)) jobs are remaining. " fi } show_eta () { CURRENT_PROCESSED=$((GLOBAL_COUNTER-MAX_NO_OF_RUNNING_JOBS)) TOTAL="$SIZE_OF_INPUT" START_TIME=$START_PPSS NOW=$(get_time_in_seconds) MODULO=$((GLOBAL_COUNTER % 5 )) if [[ "$QUIET" == "1" ]] then return 0 fi if [[ "$CURRENT_PROCESSED" -le "0" ]] then return 0 else if [[ "$MODULO" == "0" ]] then RUNNING_TIME=$((NOW-START_TIME)) if [[ ! "$RUNNING_TIME" -le "0" && ! "$CURRENT_PROCESSED" == "0" ]] && [[ "$CURRENT_PROCESSED" -gt "$MAX_NO_OF_RUNNING_JOBS" ]] then TIME_PER_ITEM=$(( RUNNING_TIME / ( CURRENT_PROCESSED - MAX_NO_OF_RUNNING_JOBS ) )) log DEBUG "Time per item is $TIME_PER_ITEM seconds." TOTAL_TIME=$(( ($TIME_PER_ITEM * SIZE_OF_INPUT) + $TIME_PER_ITEM )) TOTAL_TIME_IN_SECONDS=$((START_TIME+TOTAL_TIME)) if [[ "$ARCH" == "Darwin" ]] then DATE=$(date -r $TOTAL_TIME_IN_SECONDS) elif [[ "$ARCH" == "SunOS" ]] then # SWB: Hack around Solaris' feeble date command DATE=$(perl -e "print scalar localtime($TOTAL_TIME_IN_SECONDS)") else DATE=$(date -d @$TOTAL_TIME_IN_SECONDS) fi echo log DSPLY "ETA: $DATE" echo -en "\033[2A" fi fi fi } display_progress () { if [[ "$DAEMON" == "0" ]] then SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }') GC=0 if [[ ! "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ]] then GC="$GLOBAL_COUNTER" else GC="$SIZE_OF_INPUT" fi PERCENT=$((100 * $GC / $SIZE_OF_INPUT )) if [[ ! "$ACTIVE_WORKERS" == "0" ]] # && [[ "$FINISHED" == "0" ]] then if [[ "$QUIET" == "0" ]] then log PRCNT "$PERCENT% complete. Processed $GC of $SIZE_OF_INPUT. Failed $FAILED_ITEMS_COUNTER/$SIZE_OF_INPUT." show_eta elif [[ "$DAEMON" == "0" ]] then echo -en "\r$PERCENT% --" fi if [[ "$PERCENT" == "100" ]] then if [[ "$QUIET" == "1" ]] then echo fi FINISHED=1 fi fi fi } terminate_listener () { GLOBAL_FAILED_COUNTER="$1" log DEBUG "Running $FUNCNAME" if [[ ! -z "$SSH_MASTER_PID" ]] then kill "$SSH_MASTER_PID" else log DEBUG "SSH master PID is empty." fi set_status "STOPPED" "$GLOBAL_FAILED_COUNTER" log DEBUG "Listener stopped." if [[ ! "$PERCENT" == "100" ]] then echo stop-ppss log DSPLY "$FAILED_ITEMS_COUNTER failed items." log DSPLY "Finished. Consult $JOB_LOG_DIR for job output." else echo stop-ppss log DSPLY "Finished. Consult $JOB_LOG_DIR for job output." fi if [[ "$QUIET" == "1" ]] then echo fi if [[ ! -z "$EMAIL" ]] then echo "|P|P|S|S| job finished." | mail -s "$HOSTNAME - PPSS has finished." "$EMAIL" if [[ ! "$?" == "0" ]] then log ERROR "Sending os status mail failed." fi fi echo "$GLOBAL_FAILED_COUNTER" >> "$FIFO_LISTENER" } inotify_listener () { inotifywait "$SRC_DIR" -m -r -e close -q --format '%w%f' | \ while read -r line do if [[ ! -d "$line" ]] then echo "$line" > "$FIFO" fi done } is_item_unprocessed () { VAR="$1" STATUS=0 if [[ -z "$VAR" ]] then log DEBUG "$FUNCNAME: something is wrong, no argument received." return 1 fi for x in $PROCESSED_ITEMS do if [[ "$x" == "$VAR" ]] then STATUS=1 fi done log DEBUG "Is item $VAR unprocessed: $STATUS" return $STATUS } is_item_file_and_unmodified () { ITEM="$1" if [[ -e "$ITEM" ]] then NOW=$(date +%s) FILEDATE=$($STAT "$ITEM") ELAPSED="$(expr $NOW - $FILEDATE)" if [[ "$ELAPSED" -gt "$DAEMON_FILE_AGE" ]] then log DEBUG "$FUNCNAME File $ITEM is aged $ELAPSED" return 0 else log DEBUG "$FUNCNAME File $ITEM too young $ELAPSED" return 1 fi else log DEBUG "$FUNCNAME: file does not exist." return 0 fi } process_item_as_daemon () { ITEM="$1" if is_item_unprocessed "$ITEM" then if is_item_file_and_unmodified "$ITEM" then echo "$ITEM" >> "$FIFO" processed_stack_push "$ITEM" else stack_push "$ITEM" fi fi } daemon_listener () { while true do get_all_items while get_item do process_item_as_daemon "$ITEM" done while stack_pop do process_item_as_daemon "$REGISTER" done sleep "$DAEMON_POLLING_INTERVAL" done } start_daemon_listener () { daemon_listener & MYPID="$!" disown PIDS="$PIDS $MYPID" } start_inotify_listener () { ACTIVE_WORKERS=0 inotify_listener & MYPID="$!" disown PIDS="$PIDS $MYPID" } start_as_daemon () { if [[ "$DAEMON" == "1" ]] then log DEBUG "Daemon mode enabled." if [[ "$INOTIFY" == "1" ]] then log INFO "Linux inotify enabled." start_inotify_listener else start_daemon_listener log INFO "Linux inotify disabled." fi else log DEBUG "Daemon mode disabled." fi } decrease_active_workers () { if [[ "$ACTIVE_WORKERS" -gt "0" ]] then ((ACTIVE_WORKERS--)) fi } listen_for_job () { FINISHED=0 ACTIVE_WORKERS="$MAX_NO_OF_RUNNING_JOBS" PIDS="" log DEBUG "Listener started." start_as_daemon while read event <& 42 do log INFO "Current active workers is $ACTIVE_WORKERS" if [[ "$event" == "$START_KEY" ]] then decrease_active_workers log DEBUG "Got a 'start-key' event" if [[ "$DAEMON" == "0" ]] then if get_item then log DEBUG "Got an item, running command..." run_command "$ITEM" else log DEBUG "No more new items..." if [[ "$ACTIVE_WORKERS" == "0" ]] then display_progress break else display_jobs_remaining fi fi else log DEBUG "Daemon mode: a worker finished..." run_command fi elif [[ "$event" == "$FAIL_KEY" ]] then ((FAILED_ITEMS_COUNTER++)) log DEBUG "An item failed to process. $FAILED_ITEMS_COUNTER" elif [[ "$event" == "$KILL_KEY" ]] then infanticide break else log DEBUG "Event is an item." stack_push "$event" run_command fi display_progress set_status "RUNNING" "$FAILED_ITEMS_COUNTER" done terminate_listener "$FAILED_ITEMS_COUNTER" } start_all_workers () { if [[ "$MAX_NO_OF_RUNNING_JOBS" == "1" ]] then log DSPLY "Starting one (1) single worker." else log DSPLY "Starting $MAX_NO_OF_RUNNING_JOBS parallel workers." fi if [[ "$DAEMON" == "0" ]] then log DSPLY "---------------------------------------------------------" elif [[ "$INOTIFY" == "1" ]] then return 0 fi i=0 while [[ "$i" -lt "$MAX_NO_OF_RUNNING_JOBS" ]] do start_new_worker log DEBUG "Starting worker $i" ((i++)) if [[ ! "$MAX_DELAY" == "0" ]] then random_delay "$MAX_DELAY" fi done } get_status_of_nodes () { RESULT_FILE="$1" FAILED=0 ssh -q $SSH_OPTS $SSH_KEY $USER@$SSH_SERVER cat "$PPSS_HOME_DIR/$PPSS_NODE_STATUS/*" > "$RESULT_FILE" 2>&1 if [[ ! "$?" == "0" ]] then log DSPLY "|P|P|S|S| has not been started yet on nodes." return 1 fi IFS=$'\n' for x in $(cat $RESULT_FILE) do IP=$(echo $x | awk '{ print $1 }') HOST=$(echo $x | awk '{ print $2 }') STATUS=$(echo $x | awk '{ print $3 }') RES=$(echo $x | awk '{ print $4 }') FAIL=$(echo $x | awk '{ print $5 }') if [[ -z "$RES" ]] then RES="0" fi PROCESSED=$((PROCESSED+RES)) FAILED=$((FAILED+FAIL)) LINE=$(echo "$IP $HOST $RES $FAIL $STATUS" | awk '{ printf ("%-16s %-16s % 8s %6s %7s\n",$1,$2,$3,$4,$5) }') log DSPLY "$LINE" done log DSPLY "---------------------------------------------------------" LINE=$(echo $PROCESSED $FAILED | awk '{ printf ("Total processed/failed: %18s %6s \n",$1,$2) }') log DSPLY "$LINE" rm "$RESULT_FILE" } show_status () { . $CONFIG if [[ ! -z "$SSH_KEY" ]] then SSH_KEY="-i $SSH_KEY" fi get_all_items ITEMS=$(wc -l $LISTOFITEMS | awk '{ print $1 }') if [[ ! -z "$ITEMS" && ! "$ITEMS" == "0" ]] then PROCESSED=$(exec_cmd "ls -1 $PPSS_HOME_DIR/$ITEM_LOCK_DIR 2>/dev/null | wc -l" 1) 2>&1 >> /dev/null check_status "$?" "Could not get number of processed items." TMP_STATUS=$((100 * $PROCESSED / $ITEMS)) log DSPLY "Status:\t\t$TMP_STATUS percent complete." else log DSPLY "Status: UNKNOWN - is PPSS deployed on nodes?" fi if [[ ! -z $NODES_FILE ]] then TMP_NO=$(cat $NODES_FILE | wc -l) log DSPLY "Nodes:\t $TMP_NO" fi log DSPLY "Items:\t\t$ITEMS" log DSPLY "---------------------------------------------------------" HEADER=$(echo IP-address Hostname Processed Failed Status | awk '{ printf ("%-16s %-15s % 2s %2s %2s\n",$1,$2,$3,$4,$5) }') log DSPLY "$HEADER" log DSPLY "---------------------------------------------------------" PROCESSED=0 get_status_of_nodes "RESULT_FILE" } main () { case $MODE in node ) create_working_directory test_server init_vars get_all_items listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null LISTENER_PID=$! start_all_workers ;; start ) # This option only starts all nodes. LOGFILE=/dev/null display_header if [[ ! -e "$NODES_FILE" ]] then log ERROR "File $NODES with list of nodes does not exist." set_status "STOPPED" cleanup exit 1 else for NODE in $(cat $NODES_FILE) do start_ppss_on_node "$NODE" & done fi cleanup ;; config ) LOGFILE=/dev/null display_header log DSPLY "Generating configuration file $CONFIG" add_var_to_config PPSS_LOCAL_TMPDIR "$PPSS_LOCAL_TMPDIR" add_var_to_config PPSS_LOCAL_OUTPUT "$PPSS_LOCAL_OUTPUT" cleanup ;; stop ) LOGFILE=/dev/null display_header log DSPLY "Stopping PPSS on all nodes." test_server exec_cmd "touch $STOP_SIGNAL" cleanup ;; pause ) LOGFILE=/dev/null display_header log DSPLY "Pausing PPSS on all nodes." exec_cmd "touch $PAUSE_SIGNAL" cleanup ;; continue ) LOGFILE=/dev/null display_header if does_file_exist "$STOP_SIGNAL" then log DSPLY "Continuing processing, please use $0 start to start PPSS on al nodes." exec_cmd "rm -f $STOP_SIGNAL" fi if does_file_exist "$PAUSE_SIGNAL" then log DSPLY "Continuing PPSS on all nodes." exec_cmd "rm -f $PAUSE_SIGNAL" fi cleanup ;; deploy ) LOGFILE=ppss-deploy.txt if [[ -e "$LOGFILE" ]] then rm "$LOGFILE" fi init_ssh_server_socket display_header log DSPLY "Deploying PPSS on nodes. See ppss-deploy.txt for details." deploy_ppss wait cleanup ;; status ) LOGFILE=/dev/null display_header test_server show_status cleanup ;; erase ) LOGFILE=/dev/null display_header log DSPLY "Erasing PPSS from all nodes." erase_ppss cleanup ;; kill ) LOGFILE=/dev/null for x in $(ps ux | grep ppss | grep -v grep | grep bash | awk '{ print $2 }') do kill "$x" done cleanup ;; * ) create_working_directory display_header init_vars get_all_items listen_for_job "$MAX_NO_OF_RUNNING_JOBS" & 2>&1 >> /dev/null LISTENER_PID=$! start_all_workers ;; esac } # # PPSS can be sourced. This is mainly for testing purposes (unit tests). # if ! are_we_sourced then # # First step: process all command-line arguments. # process_arguments "$@" # # This command starts the that sets the whole framework in motion. # But only if the file is not sourced. # main # # Exit after all processes have finished. # #wait if [[ -e "$FIFO_LISTENER" ]] then while read event <& 43 do cleanup exit "$event" done fi fi