Class | Delayed::Backend::MongoMapper::Job |
In: |
lib/delayed/backend/mongo_mapper.rb
|
Parent: | Object |
# File lib/delayed/backend/mongo_mapper.rb, line 45 45: def self.after_fork 46: ::MongoMapper.connect(RAILS_ENV) 47: end
# File lib/delayed/backend/mongo_mapper.rb, line 41 41: def self.before_fork 42: ::MongoMapper.connection.close 43: end
When a worker is exiting, make sure we don‘t have any locked jobs.
# File lib/delayed/backend/mongo_mapper.rb, line 74 74: def self.clear_locks!(worker_name) 75: collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true) 76: end
# File lib/delayed/backend/mongo_mapper.rb, line 49 49: def self.db_time_now 50: Time.now.utc 51: end
# File lib/delayed/backend/mongo_mapper.rb, line 53 53: def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) 54: right_now = db_time_now 55: 56: conditions = { 57: :run_at => {"$lte" => right_now}, 58: :limit => -limit, # In mongo, positive limits are 'soft' and negative are 'hard' 59: :failed_at => nil, 60: :sort => [['priority', 1], ['run_at', 1]] 61: } 62: 63: where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}" 64: 65: (conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority 66: (conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority 67: 68: results = all(conditions.merge(:locked_by => worker_name)) 69: results += all(conditions.merge('$where' => where)) if results.size < limit 70: results 71: end
Lock this job for this worker. Returns true if we have the lock, false otherwise.
# File lib/delayed/backend/mongo_mapper.rb, line 80 80: def lock_exclusively!(max_run_time, worker = worker_name) 81: right_now = self.class.db_time_now 82: overtime = right_now - max_run_time.to_i 83: 84: query = "this.locked_at == null || this.locked_at < #{make_date(overtime)} || this.locked_by == #{worker.to_json}" 85: conditions = {:_id => id, :run_at => {"$lte" => right_now}, "$where" => query} 86: 87: collection.update(conditions, {"$set" => {:locked_at => right_now, :locked_by => worker}}) 88: affected_rows = collection.find({:_id => id, :locked_by => worker}).count 89: if affected_rows == 1 90: self.locked_at = right_now 91: self.locked_by = worker 92: return true 93: else 94: return false 95: end 96: end