Class | Delayed::Backend::DataMapper::Job |
In: |
lib/delayed/backend/data_mapper.rb
|
Parent: | Object |
When a worker is exiting, make sure we don‘t have any locked jobs.
# File lib/delayed/backend/data_mapper.rb, line 65 65: def self.clear_locks!(worker_name) 66: all(:locked_by => worker_name).update(:locked_at => nil, :locked_by => nil) 67: end
these are common to the other backends, so we provide an implementation
# File lib/delayed/backend/data_mapper.rb, line 96 96: def self.delete_all 97: Delayed::Job.auto_migrate! 98: end
# File lib/delayed/backend/data_mapper.rb, line 44 44: def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) 45: 46: simple_conditions = { :run_at.lte => db_time_now, :limit => limit, :failed_at => nil, :order => [:priority.asc, :run_at.asc] } 47: 48: # respect priorities 49: simple_conditions[:priority.gte] = Worker.min_priority if Worker.min_priority 50: simple_conditions[:priority.lte] = Worker.max_priority if Worker.max_priority 51: 52: # lockable 53: lockable = ( 54: # not locked or past the max time 55: ( all(:locked_at => nil ) | all(:locked_at.lt => db_time_now - max_run_time)) | 56: 57: # OR locked by our worker 58: all(:locked_by => worker_name)) 59: 60: # plus some other boring junk 61: (lockable).all( simple_conditions ) 62: end
Lock this job for this worker. Returns true if we have the lock, false otherwise.
# File lib/delayed/backend/data_mapper.rb, line 71 71: def lock_exclusively!(max_run_time, worker = worker_name) 72: 73: now = self.class.db_time_now 74: overtime = now - max_run_time 75: 76: # FIXME - this is a bit gross 77: # DM doesn't give us the number of rows affected by a collection update 78: # so we have to circumvent some niceness in DM::Collection here 79: collection = locked_by != worker ? 80: (self.class.all(:id => id, :run_at.lte => now) & ( self.class.all(:locked_at => nil) | self.class.all(:locked_at.lt => overtime) ) ) : 81: self.class.all(:id => id, :locked_by => worker) 82: 83: attributes = collection.model.new(:locked_at => now, :locked_by => worker).dirty_attributes 84: affected_rows = self.repository.update(attributes, collection) 85: 86: if affected_rows == 1 87: self.locked_at = now 88: self.locked_by = worker 89: return true 90: else 91: return false 92: end 93: end