From 6836784b54e27f12a7b3739dafdd3d8a0fafc081 Mon Sep 17 00:00:00 2001 From: Louwrentius Date: Sun, 20 Jun 2010 21:22:19 +0000 Subject: [PATCH] Removed all file locking, except for remote item locking. --- ppss | 253 +++++++++++++++++++---------------------------------------- 1 file changed, 80 insertions(+), 173 deletions(-) diff --git a/ppss b/ppss index e67691b..423e0ab 100755 --- a/ppss +++ b/ppss @@ -64,7 +64,7 @@ PAUSE_DELAY="60" # Polling every 1 minu STOP_SIGNAL="$PPSS_HOME_DIR/$PPSS_DIR/stop_signal" # Stop processing if this file is present. GLOBAL_COUNTER="" GLOBAL_COUNTER_FILE="$PPSS_DIR/ppss-input-counter-$PID" -LOCAL_INPUT_FILE="$PPSS_DIR/INPUT_FILE-$PID" +LISTOFITEMS="$PPSS_DIR/INPUT_FILE-$PID" JOB_LOG_DIR="$PPSS_DIR/job_log" # Directory containing log files of processed items. LOGFILE="$PPSS_DIR/ppss-log-$PID.txt" # General PPSS log file. Contains lots of info. QUIET="0" @@ -76,6 +76,7 @@ LISTENER_PID="" IFS_BACKUP="$IFS" CPUINFO="/proc/cpuinfo" PROCESSORS="" +START_KEY="$RANDOM$RANDOM$RANDOM" STOP_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to the listener to stop. KILL_KEY="$RANDOM$RANDOM$RANDOM" # This is a signal to stop immediately and kill RECURSION="1" # all running processes. @@ -108,6 +109,7 @@ SECURE_COPY="1" # If set, use SCP, Otherwise, use cp. REMOTE_OUTPUT_DIR="" # Remote directory to which output must be uploaded. SCRIPT="" # Custom user script that is executed by ppss. ITEM_ESCAPED="" +DISABLE_SKIPPING=0 NODE_STATUS="$PPSS_DIR/status.txt" DAEMON=0 @@ -710,16 +712,15 @@ process_arguments () { then showusage_short echo - log ERROR "No source file or directory specified with -f or -d." - cleanup + echo "No source file or directory specified with -f or -d." exit 1 fi - if [ ! -e "$SRC_DIR" ] && [ -z "$MODE" ] + if [ ! -e "$SRC_DIR" ] && [ -z "$MODE" ] && [ -z "$INPUT_FILE" ] then showusage_short echo - log ERROR "Source directory $SRC_DIR does not exist." + echo "Source directory $SRC_DIR does not exist." exit 1 fi @@ -863,8 +864,6 @@ log () { fi } - -# Init all vars init_vars () { # @@ -1006,7 +1005,6 @@ set_status () { echo "$HOSTNAME $STATUS" > "$NODE_STATUS" } - check_status () { ERROR="$1" @@ -1362,52 +1360,6 @@ random_delay () { fi } - -global_lock () { - - mkdir $GLOBAL_LOCK > /dev/null 2>&1 - ERROR="$?" - - if [ ! "$ERROR" == "0" ] - then - return 1 - else - return 0 - fi -} - -get_global_lock () { - - while true - do - global_lock - ERROR="$?" - if [ ! "$ERROR" == "0" ] - then - #random_delay $MAX_LOCK_DELAY - continue - else - break - fi - done -} - -release_global_lock () { - - rm -rf "$GLOBAL_LOCK" -} - -are_jobs_running () { - - NUMBER_OF_PROCS=`jobs | wc -l` - if [ "$NUMBER_OF_PROCS" -gt "1" ] - then - return 0 - else - return 1 - fi -} - escape_item () { TMP="$1" @@ -1574,9 +1526,9 @@ get_all_items () { get_input_lock fi - if [ -e "$LOCAL_INPUT_FILE" ] && are_we_sourced + if [ -e "$LISTOFITEMS" ] && are_we_sourced then - rm "$LOCAL_INPUT_FILE" + rm "$LISTOFITEMS" fi count=0 @@ -1587,11 +1539,11 @@ get_all_items () { then if [ "$RECURSION" == "1" ] then - `exec_cmd "find $SRC_DIR/ ! -type d" > "$LOCAL_INPUT_FILE"` + `exec_cmd "find $SRC_DIR/ ! -type d" > "$LISTOFITEMS"` check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." else log DEBUG "Recursion is disabled." - `exec_cmd "find $SRC_DIR/ -d 1 ! -type d" > "$LOCAL_INPUT_FILE"` + `exec_cmd "find $SRC_DIR/ -d 1 ! -type d" > "$LISTOFITEMS"` check_status "$?" "$FUNCNAME" "Could not list files within remote source directory." fi else @@ -1600,14 +1552,14 @@ get_all_items () { if [ "$RECURSION" == "1" ] then log DEBUG "Recursion is enabled." - `find "$SRC_DIR"/ ! -type d >> "$LOCAL_INPUT_FILE"` + `find "$SRC_DIR"/ ! -type d >> "$LISTOFITEMS"` check_status "$?" "$FUNCNAME" "Could not list files within local source directory." else log DEBUG "Recursion is disabled." - `find "$SRC_DIR"/ -d 1 ! -type d >> "$LOCAL_INPUT_FILE"` + `find "$SRC_DIR"/ -d 1 ! -type d >> "$LISTOFITEMS"` check_status "$?" "$FUNCNAME" "Could not list files within local source directory." fi - if [ ! -e "$LOCAL_INPUT_FILE" ] + if [ ! -e "$LISTOFITEMS" ] then log ERROR "Local input file is not created, something is wrong. Bug?" set_status "ERROR" @@ -1634,13 +1586,13 @@ get_all_items () { if [ ! "$INPUT_FILE" == "-" ] then - cp "$INPUT_FILE" "$LOCAL_INPUT_FILE" + cp "$INPUT_FILE" "$LISTOFITEMS" check_status "$?" "$FUNCNAME" "Copy of input file failed!" else log DEBUG "Reading from stdin.." while read LINE do - echo "$LINE" >> "$LOCAL_INPUT_FILE" + echo "$LINE" >> "$LISTOFITEMS" done fi fi @@ -1650,7 +1602,7 @@ get_all_items () { release_input_lock fi - SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }') + SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }') if [ "$SIZE_OF_INPUT" -le "0" ] then log ERROR "Source file/dir seems to be empty." @@ -1658,6 +1610,7 @@ get_all_items () { cleanup exit 1 fi + } get_item () { @@ -1668,18 +1621,15 @@ get_item () { then return 1 fi - - get_global_lock - SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }') + SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }') # # Return error if the list is empty. # - if [ "$SIZE_OF_INPUT" -le "0" ] - then - release_global_lock - return 1 - fi + #if [ "$SIZE_OF_INPUT" -le "0" ] + #then + # return 1 + #fi # # This variable is used to walk thtough all input file items. @@ -1689,21 +1639,17 @@ get_item () { # # Check if all items have been processed. # - if [ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ] - then - release_global_lock - return 1 - fi + #if [ "$GLOBAL_COUNTER" -gt "$SIZE_OF_INPUT" ] + #then + # return 1 + #fi - ITEM="$(sed -n $GLOBAL_COUNTER\p $LOCAL_INPUT_FILE)" + ITEM="$(sed -n $GLOBAL_COUNTER\p $LISTOFITEMS)" if [ -z "$ITEM" ] then - ((GLOBAL_COUNTER++)) log DEBUG "Item was emtpy..." - echo $GLOBAL_COUNTER > $GLOBAL_COUNTER_FILE - release_global_lock - get_item + return 1 else ((GLOBAL_COUNTER++)) echo $GLOBAL_COUNTER > $GLOBAL_COUNTER_FILE @@ -1713,20 +1659,18 @@ get_item () { lock_item "$ITEM" LOCK="$?" else - log DEBUG "Item lock disabled." + log DEBUG "Item lock disabled - no remote server configured." LOCK=0 fi if [ ! "$LOCK" = "0" ] then log DEBUG "Item $ITEM is locked." - release_global_lock # # Recursion, get_ttem calls itself, until all items are done. # get_item else log DEBUG "Got lock on $ITEM, processing." - release_global_lock download_item "$ITEM" return 0 fi @@ -1736,27 +1680,12 @@ get_item () { start_single_worker () { # - # This function sends an item to the fifo. This signals - # the listener process to execute a 'worker' on this - # item, using the 'commando' function. + # This function kicks the listener to start a worker process. # - get_item - ERROR=$? - if [ ! "$ERROR" == "0" ] + if ! are_we_sourced then - # - # If no more items are available, the listener should be - # informed that a worker just finished / died. - # This allows the listener to determine if all processes - # are finished and it is time to stop. - # - echo "$STOP_KEY" > $FIFO - return 1 - else - get_global_lock - echo "$ITEM" > $FIFO - release_global_lock - return 0 + echo "$START_KEY" >> "$FIFO" + return $? fi } @@ -1766,7 +1695,6 @@ stop-ppss () { elapsed "$START_PPSS" "$STOP_PPSS" } - elapsed () { BEFORE="$1" @@ -1825,8 +1753,6 @@ commando () { # Therefore, the output directory must reflect the original directory # structure. If recursion is not used, this is not necessary. # - - if [ "$ERR_STATE" == "0" ] then VIRTUAL="0" @@ -1850,17 +1776,6 @@ commando () { OUTPUT_FILE="$ITEM_NO_PATH" - # - # The following lines should only be enabled for debugging. - # - #log DEBUG "Processing item: $ITEM" - #log DEBUG "ITEM_NO_PATH is $ITEM_NO_PATH" - #log DEBUG "Dirname is $DIR_NAME" - #log DEBUG "OUTPUT DIR IS $OUTPUT_DIR" - #log DEBUG "Virtual is $VIRTUAL" - #log DEBUG "OUTPUT FILE is $OUTPUT_FILE" - # - # # Decide if an item must be transfered from server to the node. # or be processed in-place (NFS / SMB mount?) @@ -1895,6 +1810,13 @@ commando () { LOG_FILE_NAME=`echo "$ITEM" | $MD5 | awk '{ print $1 }'` ITEM_LOG_FILE="$JOB_LOG_DIR/$LOG_FILE_NAME" + if [ -e "$ITEM_LOG_FILE" ] && [ "$DISABLE_SKIPPING" = "0" ] + then + log DEBUG "Item is already processed, skipping..." + start_single_worker + return 0 + fi + # # Create the output directory that will contain the output of the command. # Example: When converting wav to mp3, the mp3 will be put in this directory. @@ -1941,9 +1863,11 @@ commando () { # the -c option. # BEFORE=`get_time_in_seconds` - TMP=`echo $COMMAND | grep -i '$ITEM'` - if [ "$?" ] + `echo $COMMAND | grep -i '$ITEM' >> /dev/null 2>&1` + RETVAL="$?" + if [ "$RETVAL" = "0" ] then + echo "$TMP - $RETVAL" >> /tmp/hoeba eval "$COMMAND" >> "$ITEM_LOG_FILE" 2>&1 ERROR="$?" MYPID="$!" @@ -1998,7 +1922,6 @@ commando () { # # Upload the output file back to the server. # - upload_item "$OUTPUT_DIR" "$DIR_NAME" # @@ -2018,61 +1941,55 @@ commando () { fi fi - if ! are_we_sourced - then - start_single_worker - fi - return $? + start_single_worker } -# -# This is the listener service. It listens on the pipe for events. -# A job is executed for every event received. -# This listener enables fully asynchronous processing. -# listen_for_job () { + FINISHED=0 DIED=0 PIDS="" log DEBUG "Listener started." while read event <& 42 do - if [ "$event" == "$STOP_KEY" ] + if [ "$event" = "$START_KEY" ] then - # - # The start_single_worker method sends a special signal to - # inform the listener that a worker is finished. - # If all workers are finished, it is time to stop. - # This mechanism makes PPSS asynchronous. - # - ((DIED++)) - if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ] + if get_item then - if [ "$DAEMON" == "1" ] - then - # - # In daemon mode, start all over again. - # - DIED=0 - get_all_items - log DEBUG "Found $SIZE_OF_INPUT items." - start_all_workers - sleep 10 - else - break - fi + commando "$ITEM" & + MYPID="$!" + disown + PIDS="$PIDS $MYPID" else - RES=$((MAX_NO_OF_RUNNING_JOBS-DIED)) - if [ "$RES" == "1" ] && [ "$QUIET" == "0" ] + ((DIED++)) + if [ "$DIED" -ge "$MAX_NO_OF_RUNNING_JOBS" ] then - log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. " - elif [ "$QUIET" == "0" ] - then - if [ "$DIED" == "1" ] + if [ "$DAEMON" == "1" ] then - echo -en "\n" + # + # In daemon mode, start all over again. + # + DIED=0 + get_all_items + log DEBUG "Found $SIZE_OF_INPUT items." + start_all_workers + sleep 10 + else + break + fi + else + RES=$((MAX_NO_OF_RUNNING_JOBS-DIED)) + if [ "$RES" == "1" ] && [ "$QUIET" == "0" ] + then + log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) job is remaining. " + elif [ "$QUIET" == "0" ] + then + if [ "$DIED" == "1" ] + then + echo -en "\n" + fi + log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining. " fi - log PRCNT "$((MAX_NO_OF_RUNNING_JOBS-DIED)) jobs are remaining. " fi fi elif [ "$event" == "$KILL_KEY" ] @@ -2107,17 +2024,10 @@ listen_for_job () { done IFS=$oldIFS break - else - commando "$event" & - MYPID="$!" - disown - PIDS="$PIDS $MYPID" fi - get_global_lock - SIZE_OF_INPUT=$(wc -l "$LOCAL_INPUT_FILE" | awk '{ print $1 }') + SIZE_OF_INPUT=$(wc -l "$LISTOFITEMS" | awk '{ print $1 }') GLOBAL_COUNTER=$(cat $GLOBAL_COUNTER_FILE) - release_global_lock PERCENT=$((100 * $GLOBAL_COUNTER / $SIZE_OF_INPUT )) if [ "$DIED" == "0" ] && [ "$FINISHED" == "0" ] then @@ -2168,7 +2078,6 @@ listen_for_job () { cleanup } -# This starts an number of parallel workers based on the # of parallel jobs allowed. start_all_workers () { if [ "$MAX_NO_OF_RUNNING_JOBS" == "1" ] @@ -2217,7 +2126,7 @@ show_status () { get_all_items - ITEMS=`wc -l $LOCAL_INPUT_FILE | awk '{ print $1 }'` + ITEMS=`wc -l $LISTOFITEMS | awk '{ print $1 }'` if [ ! -z "$ITEMS" ] && [ ! "$ITEMS" == "0" ] then @@ -2266,8 +2175,6 @@ show_status () { log DSPLY "$LINE" } - -# If this is called, the whole framework will execute. main () { case $MODE in