Class Delayed::Worker
In: lib/delayed/worker.rb
Parent: Object

Methods

Attributes

name_prefix  [RW]  name_prefix is ignored if name is set directly

Public Class methods

[Source]

    # File lib/delayed/worker.rb, line 28
28:     def self.backend=(backend)
29:       if backend.is_a? Symbol
30:         require "delayed/backend/#{backend}"
31:         backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
32:       end
33:       @@backend = backend
34:       silence_warnings { ::Delayed.const_set(:Job, backend) }
35:     end

[Source]

    # File lib/delayed/worker.rb, line 37
37:     def self.guess_backend
38:       self.backend ||= if defined?(ActiveRecord)
39:         :active_record
40:       elsif defined?(MongoMapper)
41:         :mongo_mapper
42:       else
43:         logger.warn "Could not decide on a backend, defaulting to active_record"
44:         :active_record
45:       end
46:     end

[Source]

    # File lib/delayed/worker.rb, line 48
48:     def initialize(options={})
49:       @quiet = options[:quiet]
50:       self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
51:       self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
52:       self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
53:     end

Public Instance methods

[Source]

     # File lib/delayed/worker.rb, line 157
157:     def max_attempts(job)
158:       job.max_attempts || self.class.max_attempts
159:     end

Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.

[Source]

    # File lib/delayed/worker.rb, line 59
59:     def name
60:       return @name unless @name.nil?
61:       "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
62:     end

Sets the name of the worker. Setting the name to nil will reset the default worker name.

[Source]

    # File lib/delayed/worker.rb, line 66
66:     def name=(val)
67:       @name = val
68:     end

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.

[Source]

     # File lib/delayed/worker.rb, line 134
134:     def reschedule(job, time = nil)
135:       if (job.attempts += 1) < max_attempts(job)
136:         job.run_at = time || job.reschedule_at
137:         job.unlock
138:         job.save!
139:       else
140:         say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
141: 
142:         if job.payload_object.respond_to? :on_permanent_failure
143:           say "Running on_permanent_failure hook"
144:           job.payload_object.on_permanent_failure
145:         end
146: 
147:         self.class.destroy_failed_jobs ? job.destroy : job.update_attributes(:failed_at => Delayed::Job.db_time_now)
148:       end
149:     end

[Source]

     # File lib/delayed/worker.rb, line 120
120:     def run(job)
121:       runtime =  Benchmark.realtime do
122:         Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
123:         job.destroy
124:       end
125:       say "#{job.name} completed after %.4f" % runtime
126:       return true  # did work
127:     rescue Exception => e
128:       handle_failed_job(job, e)
129:       return false  # work failed
130:     end

[Source]

     # File lib/delayed/worker.rb, line 151
151:     def say(text, level = Logger::INFO)
152:       text = "[Worker(#{name})] #{text}"
153:       puts text unless @quiet
154:       logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
155:     end

[Source]

    # File lib/delayed/worker.rb, line 70
70:     def start
71:       say "Starting job worker"
72: 
73:       trap('TERM') { say 'Exiting...'; $exit = true }
74:       trap('INT')  { say 'Exiting...'; $exit = true }
75: 
76:       loop do
77:         result = nil
78: 
79:         realtime = Benchmark.realtime do
80:           result = work_off
81:         end
82: 
83:         count = result.sum
84: 
85:         break if $exit
86: 
87:         if count.zero?
88:           sleep(self.class.sleep_delay)
89:         else
90:           say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
91:         end
92: 
93:         break if $exit
94:       end
95: 
96:     ensure
97:       Delayed::Job.clear_locks!(name)
98:     end

Do num jobs and return stats on success/failure. Exit early if interrupted.

[Source]

     # File lib/delayed/worker.rb, line 102
102:     def work_off(num = 100)
103:       success, failure = 0, 0
104: 
105:       num.times do
106:         case reserve_and_run_one_job
107:         when true
108:             success += 1
109:         when false
110:             failure += 1
111:         else
112:           break  # leave if no work could be done
113:         end
114:         break if $exit # leave if we're exiting
115:       end
116: 
117:       return [success, failure]
118:     end

Protected Instance methods

[Source]

     # File lib/delayed/worker.rb, line 163
163:     def handle_failed_job(job, error)
164:       job.last_error = error.message + "\n" + error.backtrace.join("\n")
165:       say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
166:       reschedule(job)
167:     end

Run the next job we can get an exclusive lock on. If no jobs are left, we return nil.

[Source]

     # File lib/delayed/worker.rb, line 171
171:     def reserve_and_run_one_job
172:       job = Delayed::Job.reserve(self)
173:       run(job) if job
174:     end

[Validate]