Class | Delayed::Worker |
In: |
lib/delayed/worker.rb
|
Parent: | Object |
name_prefix | [RW] | name_prefix is ignored if name is set directly |
# File lib/delayed/worker.rb, line 28 28: def self.backend=(backend) 29: if backend.is_a? Symbol 30: require "delayed/backend/#{backend}" 31: backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize 32: end 33: @@backend = backend 34: silence_warnings { ::Delayed.const_set(:Job, backend) } 35: end
# File lib/delayed/worker.rb, line 37 37: def self.guess_backend 38: self.backend ||= if defined?(ActiveRecord) 39: :active_record 40: elsif defined?(MongoMapper) 41: :mongo_mapper 42: else 43: logger.warn "Could not decide on a backend, defaulting to active_record" 44: :active_record 45: end 46: end
# File lib/delayed/worker.rb, line 48 48: def initialize(options={}) 49: @quiet = options[:quiet] 50: self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority) 51: self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority) 52: self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay) 53: end
# File lib/delayed/worker.rb, line 157 157: def max_attempts(job) 158: job.max_attempts || self.class.max_attempts 159: end
Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker retarts: Workers can# safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
# File lib/delayed/worker.rb, line 59 59: def name 60: return @name unless @name.nil? 61: "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}" 62: end
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/worker.rb, line 134 134: def reschedule(job, time = nil) 135: if (job.attempts += 1) < max_attempts(job) 136: job.run_at = time || job.reschedule_at 137: job.unlock 138: job.save! 139: else 140: say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO 141: 142: if job.payload_object.respond_to? :on_permanent_failure 143: say "Running on_permanent_failure hook" 144: job.payload_object.on_permanent_failure 145: end 146: 147: self.class.destroy_failed_jobs ? job.destroy : job.update_attributes(:failed_at => Delayed::Job.db_time_now) 148: end 149: end
# File lib/delayed/worker.rb, line 120 120: def run(job) 121: runtime = Benchmark.realtime do 122: Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job } 123: job.destroy 124: end 125: say "#{job.name} completed after %.4f" % runtime 126: return true # did work 127: rescue Exception => e 128: handle_failed_job(job, e) 129: return false # work failed 130: end
# File lib/delayed/worker.rb, line 151 151: def say(text, level = Logger::INFO) 152: text = "[Worker(#{name})] #{text}" 153: puts text unless @quiet 154: logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger 155: end
# File lib/delayed/worker.rb, line 70 70: def start 71: say "Starting job worker" 72: 73: trap('TERM') { say 'Exiting...'; $exit = true } 74: trap('INT') { say 'Exiting...'; $exit = true } 75: 76: loop do 77: result = nil 78: 79: realtime = Benchmark.realtime do 80: result = work_off 81: end 82: 83: count = result.sum 84: 85: break if $exit 86: 87: if count.zero? 88: sleep(self.class.sleep_delay) 89: else 90: say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last] 91: end 92: 93: break if $exit 94: end 95: 96: ensure 97: Delayed::Job.clear_locks!(name) 98: end
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/worker.rb, line 102 102: def work_off(num = 100) 103: success, failure = 0, 0 104: 105: num.times do 106: case reserve_and_run_one_job 107: when true 108: success += 1 109: when false 110: failure += 1 111: else 112: break # leave if no work could be done 113: end 114: break if $exit # leave if we're exiting 115: end 116: 117: return [success, failure] 118: end
# File lib/delayed/worker.rb, line 163 163: def handle_failed_job(job, error) 164: job.last_error = error.message + "\n" + error.backtrace.join("\n") 165: say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR 166: reschedule(job) 167: end