Commit 79a79d65 authored by Jonne Haß's avatar Jonne Haß

Bye Resque. Ohai Sidekiq.

* Dropped all references to Resque
* Moved all jobs under app/workers since that's the Sidekiq convention
* Renamed Jobs module to Worker to match new location
* Adapted all jobs to Sidekiq
* Replaced all enqueue calls with perform_async
* Dropped Resque hacks from specs and features, replaced with
  sidekig/testing in RSpec and sidekig/testing/inline in Cucumber
* Updated scripts to start a Sidekiq server
* Inline Sidekiq sinatra app
* Let Sidekiq create the actual Redis instance
* Workaround already initialized constant warnings in service models
* Resolved ToDo in one job definition by creating proper exception clases
  for some errors in receiving posts
* Added sidekiq section to configuration to make it completly
  configurable to the user
* Add Sidekiq middleware for clean backtraces
* Delay HttpMulti retry to give offline pods a chance to come back up
* Do not retry on GUID already taken and alike errors
* Be graceful about deleted posts in GatherOEmbedData
parent 3fc3b249
port: 3000
formation: web=1,worker=0
formation: web=1,sidekiq=0
# Head
## Refactor
### Replaced Resque with Sidekiq - Migration guide - [#3993](https://github.com/diaspora/diaspora/pull/3993)
We replaced our queue system with Sidekiq. You might know that Resque needs Redis.
Sidekiq does too, so don't remove it, it's still required. Sidekiq uses a threaded
model so you'll need far less processes than with Resque to do the same amount
of work.
To update do the following:
1. Before updating (even before the `git pull`!) stop your application
server (Unicorn by default, started through Foreman).
2. In case you did already run `git pull` checkout v0.0.3.2:
```
git fetch origin
git checkout v0.0.3.2
bundle
```
3. Start Resque web (you'll need temporary access to port 5678, check
your Firewall if needed!):
```
bundle exec resque-web
```
In case you need it you can adjust the port with the `-p` flag.
4. One last time, start a Resque worker:
```
RAILS_ENV=production QUEUE=* bundle exec rake resque:work
```
Visit Resque web via http://your_host:5678, wait until all queues but the
failed one are empty (show 0 jobs).
5. Kill the Resque worker by hitting Ctrl+C. Kill Resque web with:
```
bundle exec resque-web -k
```
Don't forget to close the port on the Firewall again, if you had to open it.
6. In case you needed to do step 2., run:
```
git checkout master
bundle
```
7. Proceed with the update as normal (migrate database, precompile assets).
8. Before starting Diaspora again ensure that you reviewed the new
`environment.sidekiq` section in `config/diaspora.yml.example` and,
if wanted, transfered it to your `config/diaspora.yml` and made any
needed changes. In particular increase the `environment.sidekiq.concurrency`
setting on any medium sized pod. If you do change that value, edit
your `config/database.yml` and add a matching `pool: n` to your database
configuration. n should be equal or higher than the amount of
threads per Sidekiq worker. This sets how many concurrent
connections to the database ActiveRecord allows.
If you aren't using `script/server` but for example passenger, you no
longer need to start a Resque worker, but a Sidekiq worker now. The
command for that is:
```
bundle exec sidekiq
```
#### Heroku
The only gotcha for Heroku single gear setups is that the setting name
to spawn a background worker from the unicorn process changed. Run
```
heroku config:remove SERVER_EMBED_RESQUE_WORKER
heroku config:set SERVER_EMBED_SIDEKIQ_WORKER=true
```
We're automatically adjusting the ActiveRecord connection pool size for you.
Larger Heroku setups should have enough expertise to figure out what to do
by them self.
### Other
* Cleaned up requires of our own libraries [#3993](https://github.com/diaspora/diaspora/pull/3993)
* Refactor people_controller#show and photos_controller#index [#4002](https://github.com/diaspora/diaspora/issues/4002)
* Modularize layout [#3944](https://github.com/diaspora/diaspora/pull/3944)
* Add header to the sign up page [#3944](https://github.com/diaspora/diaspora/pull/3944)
* Add a configuration entry to set max-age header to Amazon S3 resources. [#4048](https://github.com/diaspora/diaspora/pull/4048)
* Load images via sprites [#4039](https://github.com/diaspora/diaspora/pull/4039)
* Delete unnecessary javascript views. [#4059](https://github.com/diaspora/diaspora/pull/4059)
## Bug fixes
* reset comment box height after posting a comment. [#4030](https://github.com/diaspora/diaspora/issues/4030)
......@@ -13,15 +110,6 @@
* Fix mobile view of deleted reshares. [#4063](https://github.com/diaspora/diaspora/issues/4063)
* Hide comment button in the mobile view when not signed in. [#4065](https://github.com/diaspora/diaspora/issues/4065)
## Refactor
* Delete unnecessary javascript views. [#4059] (https://github.com/diaspora/diaspora/pull/4059)
* Add a configuration entry to set max-age header to Amazon S3 resources. [#4048](https://github.com/diaspora/diaspora/pull/4048)
* Refactor people_controller#show and photos_controller#index [#4002](https://github.com/diaspora/diaspora/issues/4002)
* Modularize layout [#3944](https://github.com/diaspora/diaspora/pull/3944)
* Add header to the sign up page [#3944](https://github.com/diaspora/diaspora/pull/3944)
* Load images via sprites [#4039](https://github.com/diaspora/diaspora/pull/4039)
## Features
* Deleting a post that was shared to Facebook now deletes it from Facebook too [#3980]( https://github.com/diaspora/diaspora/pull/3980)
......
......@@ -17,8 +17,9 @@ gem 'devise', '2.1.3'
# Background processing
gem 'resque', '1.23.0'
gem 'resque-timeout', '1.0.0'
gem 'sidekiq', '2.7.5'
gem 'sinatra', '1.3.3'
gem 'slim', '1.3.6'
# Configuration
......
......@@ -62,6 +62,9 @@ GEM
carrierwave (0.8.0)
activemodel (>= 3.2.0)
activesupport (>= 3.2.0)
celluloid (0.12.4)
facter (>= 1.6.12)
timers (>= 1.0.0)
childprocess (0.3.9)
ffi (~> 1.0, >= 1.0.11)
chunky_png (1.2.7)
......@@ -81,6 +84,7 @@ GEM
compass-rails (1.0.3)
compass (>= 0.12.2, < 0.14)
configurate (0.0.2)
connection_pool (1.0.0)
crack (0.3.2)
cucumber (1.2.3)
builder (>= 2.1.2)
......@@ -103,6 +107,7 @@ GEM
excon (0.20.1)
execjs (1.4.0)
multi_json (~> 1.0)
facter (1.6.17)
factory_girl (4.2.0)
activesupport (>= 3.0.0)
factory_girl_rails (4.2.1)
......@@ -321,13 +326,6 @@ GEM
redis-namespace (1.2.1)
redis (~> 3.0.0)
remotipart (1.0.5)
resque (1.23.0)
multi_json (~> 1.0)
redis-namespace (~> 1.0)
sinatra (>= 0.9.2)
vegas (~> 0.1.2)
resque-timeout (1.0.0)
resque (~> 1.0)
rmagick (2.13.2)
roxml (3.1.6)
activesupport (>= 2.3.0)
......@@ -363,11 +361,20 @@ GEM
multi_json (~> 1.0)
rubyzip
websocket (~> 1.0.4)
sidekiq (2.7.5)
celluloid (~> 0.12.0)
connection_pool (~> 1.0)
multi_json (~> 1)
redis (~> 3)
redis-namespace
simple_oauth (0.2.0)
sinatra (1.3.3)
rack (~> 1.3, >= 1.3.6)
rack-protection (~> 1.2)
tilt (~> 1.3, >= 1.3.3)
slim (1.3.6)
temple (~> 0.5.5)
tilt (~> 1.3.3)
slop (3.4.4)
spork (1.0.0rc3)
sprockets (2.2.2)
......@@ -376,10 +383,12 @@ GEM
rack (~> 1.0)
tilt (~> 1.1, != 1.3.0)
subexec (0.2.2)
temple (0.5.5)
terminal-table (1.4.5)
thor (0.17.0)
tilt (1.3.6)
timecop (0.6.1)
timers (1.1.0)
treetop (1.4.12)
polyglot
polyglot (>= 0.3.1)
......@@ -397,8 +406,6 @@ GEM
kgio (~> 2.6)
rack
raindrops (~> 0.7)
vegas (0.1.11)
rack (>= 1.0.0)
warden (1.2.1)
rack (>= 1.0)
webmock (1.8.11)
......@@ -473,8 +480,6 @@ DEPENDENCIES
rb-inotify (= 0.9.0)
redcarpet (= 2.2.2)
remotipart (= 1.0.5)
resque (= 1.23.0)
resque-timeout (= 1.0.0)
rmagick (= 2.13.2)
roxml (= 3.1.6)
rspec-instafail (= 0.2.4)
......@@ -482,6 +487,9 @@ DEPENDENCIES
ruby-oembed (= 0.8.8)
sass-rails (= 3.2.6)
selenium-webdriver (= 2.31.0)
sidekiq (= 2.7.5)
sinatra (= 1.3.3)
slim (= 1.3.6)
spork (= 1.0.0rc3)
timecop (= 0.6.1)
twitter (= 4.6.2)
......
web: bundle exec unicorn_rails -c config/unicorn.rb -p $PORT
worker: env QUEUE=* bundle exec rake resque:work
sidekiq: bundle exec sidekiq
......@@ -7,7 +7,6 @@
require File.expand_path('../config/application', __FILE__)
require 'rake'
require 'resque/tasks'
# for rake 0.9.0
module Diaspora
......
......@@ -49,7 +49,7 @@ class PublicsController < ApplicationController
def receive_public
FEDERATION_LOGGER.info("recieved a public message")
Resque.enqueue(Jobs::ReceiveUnencryptedSalmon, CGI::unescape(params[:xml]))
Workers::ReceiveUnencryptedSalmon.perform_async(CGI::unescape(params[:xml]))
render :nothing => true, :status => :ok
end
......@@ -65,7 +65,7 @@ class PublicsController < ApplicationController
@user = person.owner
FEDERATION_LOGGER.info("recieved a private message for user:#{@user.id}")
Resque.enqueue(Jobs::ReceiveEncryptedSalmon, @user.id, CGI::unescape(params[:xml]))
Workers::ReceiveEncryptedSalmon.perform_async(@user.id, CGI::unescape(params[:xml]))
render :nothing => true, :status => 202
end
......
......@@ -36,7 +36,7 @@ class ServicesController < ApplicationController
fetch_photo = current_user.profile[:image_url].blank?
current_user.update_profile(current_user.profile.from_omniauth_hash(user))
Resque.enqueue(Jobs::FetchProfilePhoto, current_user.id, service.id, user["image"]) if fetch_photo
Workers::FetchProfilePhoto.perform_async(current_user.id, service.id, user["image"]) if fetch_photo
flash[:notice] = I18n.t 'services.create.success'
else
......
......@@ -26,7 +26,7 @@ class AccountDeletion < ActiveRecord::Base
end
def queue_delete_account
Resque.enqueue(Jobs::DeleteAccount, self.id)
Workers::DeleteAccount.perform_async(self.id)
end
def perform!
......
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Jobs
class Base
Dir[Rails.root.join('app', 'models', 'jobs', 'mail', '*.rb')].each {|file| require file }
#TODO these should be subclassed real exceptions
DUMB_ERROR_MESSAGES = [
"Contact required unless request",
"Relayable object, but no parent object found" ]
def self.suppress_annoying_errors(&block)
begin
yield
rescue => e
Rails.logger.info("error in job: #{e.message}")
unless DUMB_ERROR_MESSAGES.include?(e.message)
raise e
end
end
end
end
end
\ No newline at end of file
class Notifications::AlsoCommented < Notification
def mail_job
Jobs::Mail::AlsoCommented
Workers::Mail::AlsoCommented
end
def popup_translation_key
......
class Notifications::CommentOnPost < Notification
def mail_job
Jobs::Mail::CommentOnPost
Workers::Mail::CommentOnPost
end
def popup_translation_key
......
class Notifications::Liked < Notification
def mail_job
Jobs::Mail::Liked
Workers::Mail::Liked
end
def popup_translation_key
......
class Notifications::Mentioned < Notification
def mail_job
Jobs::Mail::Mentioned
Workers::Mail::Mentioned
end
def popup_translation_key
......
class Notifications::PrivateMessage < Notification
def mail_job
Jobs::Mail::PrivateMessage
Workers::Mail::PrivateMessage
end
def popup_translation_key
'notifications.private_message'
......
class Notifications::RequestAccepted < Notification
def mail_job
Jobs::Mail::RequestAcceptance
Workers::Mail::RequestAcceptance
end
def popup_translation_key
'notifications.request_accepted'
......
class Notifications::Reshared < Notification
def mail_job
Jobs::Mail::Reshared
#Jobs::Mail::Liked
Workers::Mail::Reshared
end
def popup_translation_key
......
class Notifications::StartedSharing < Notification
def mail_job
Jobs::Mail::StartedSharing
Workers::Mail::StartedSharing
end
def popup_translation_key
......
......@@ -127,7 +127,7 @@ class Photo < ActiveRecord::Base
end
def queue_processing_job
Resque.enqueue(Jobs::ProcessPhoto, self.id)
Workers::ProcessPhoto.perform_async(self.id)
end
def mutable?
......
......@@ -155,7 +155,7 @@ class StatusMessage < Post
end
def queue_gather_oembed_data
Resque.enqueue(Jobs::GatherOEmbedData, self.id, self.oembed_url)
Workers::GatherOEmbedData.perform_async(self.id, self.oembed_url)
end
def contains_oembed_url_in_text?
......
......@@ -164,7 +164,7 @@ class User < ActiveRecord::Base
def send_reset_password_instructions
generate_reset_password_token! if should_generate_reset_token?
Resque.enqueue(Jobs::ResetPassword, self.id)
Workers::ResetPassword.perform_async(self.id)
end
def update_user_preferences(pref_hash)
......@@ -299,15 +299,15 @@ class User < ActiveRecord::Base
######### Mailer #######################
def mail(job, *args)
pref = job.to_s.gsub('Jobs::Mail::', '').underscore
pref = job.to_s.gsub('Workers::Mail::', '').underscore
if(self.disable_mail == false && !self.user_preferences.exists?(:email_type => pref))
Resque.enqueue(job, *args)
job.perform_async(*args)
end
end
def mail_confirm_email
return false if unconfirmed_email.blank?
Resque.enqueue(Jobs::Mail::ConfirmEmail, id)
Workers::Mail::ConfirmEmail.perform_async(id)
true
end
......
......@@ -6,6 +6,5 @@
%li= link_to t('.weekly_user_stats'), weekly_user_stats_path
%li= link_to t('.pod_stats'), pod_stats_path
%li= link_to t('.correlations'), correlations_path
- if AppConfig.admins.inline_resque_web?
%li= link_to t('.resque_overview'), resque_web_path
%li= link_to t('.sidekiq_monitor'), sidekiq_path
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Workers
class Base
include Sidekiq::Worker
sidekiq_options timeout: AppConfig.environment.sidekiq.timeout.to_i,
backtrace: ((bt = AppConfig.environment.sidekiq.backtrace.get) && bt.to_i),
retry: AppConfig.environment.sidekiq.retry.to_i
# In the long term we need to eliminate the cause of these
def suppress_annoying_errors(&block)
yield
rescue Diaspora::ContactRequiredUnlessRequest,
Diaspora::RelayableObjectWithoutParent => e
Rails.logger.info("error on receive: #{e.class}")
rescue ActiveRecord::RecordInvalid => e
Rails.logger.info("failed to save received object: #{e.record.errors.full_messages}")
raise e unless e.message.match(/already been taken/)
end
end
end
......@@ -2,11 +2,11 @@
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Jobs
module Workers
class DeferredDispatch < Base
@queue = :dispatch
sidekiq_options queue: :dispatch
def self.perform(user_id, object_class_name, object_id, opts)
def perform(user_id, object_class_name, object_id, opts)
user = User.find(user_id)
object = object_class_name.constantize.find(object_id)
opts = HashWithIndifferentAccess.new(opts)
......
......@@ -3,10 +3,11 @@
# the COPYRIGHT file.
module Jobs
module Workers
class DeleteAccount < Base
@queue = :delete_account
def self.perform(account_deletion_id)
sidekiq_options queue: :delete_account
def perform(account_deletion_id)
account_deletion = AccountDeletion.find(account_deletion_id)
account_deletion.perform!
end
......
......@@ -2,11 +2,11 @@
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
#
module Jobs
module Workers
class DeletePostFromService < Base
@queue = :http_service
sidekiq_options queue: :http_service
def self.perform(service_id, service_post_id)
def perform(service_id, service_post_id)
service = Service.find_by_id(service_id)
service.delete_post(service_post_id)
end
......
......@@ -3,10 +3,11 @@
# the COPYRIGHT file.
module Jobs
module Workers
class FetchProfilePhoto < Base
@queue = :photos
def self.perform(user_id, service_id, fallback_image_url = nil)
sidekiq_options queue: :photos
def perform(user_id, service_id, fallback_image_url = nil)
service = Service.find(service_id)
image_url = service.profile_photo_url
......
......@@ -2,14 +2,12 @@
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Jobs
module Workers
class FetchPublicPosts < Base
@queue = :http_service
sidekiq_options queue: :http_service
def self.perform(diaspora_id)
require Rails.root.join('lib','diaspora','fetcher','public')
PublicFetcher.new.fetch!(diaspora_id)
def perform(diaspora_id)
Diaspora::Fetcher::Public.new.fetch!(diaspora_id)
end
end
end
......@@ -2,15 +2,15 @@
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Jobs
module Workers
class FetchWebfinger < Base
@queue = :socket_webfinger
sidekiq_options queue: :socket_webfinger
def self.perform(account)
def perform(account)
person = Webfinger.new(account).fetch
# also, schedule to fetch a few public posts from that person
Resque.enqueue(Jobs::FetchPublicPosts, person.diaspora_handle) unless person.nil?
Workers::FetchPublicPosts.perform_async(person.diaspora_handle) unless person.nil?
end
end
end
......@@ -3,14 +3,20 @@
# the COPYRIGHT file.
#
module Jobs
module Workers
class GatherOEmbedData < Base
@queue = :http_service
sidekiq_options queue: :http_service
def self.perform(post_id, url)
def perform(post_id, url, retry_count=1)
post = Post.find(post_id)
post.o_embed_cache = OEmbedCache.find_or_create_by_url(url)
post.save
rescue ActiveRecord::RecordNotFound
# User created a post and deleted it right afterwards before we
# we had a chance to run the job.
# On the other hand sometimes the job runs before the Post is
# fully persisted. So we just reduce the amount of retries.
GatherOEmbedData.perform_in(1.minute, post_id, url, retry_count+1) unless retry_count > 3
end
end
end
......@@ -2,29 +2,25 @@
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
require 'uri'
require Rails.root.join('lib', 'hydra_wrapper')
module Jobs
module Workers
class HttpMulti < Base
@queue = :http
sidekiq_options queue: :http
MAX_RETRIES = 3
def self.perform(user_id, encoded_object_xml, person_ids, dispatcher_class_as_string, retry_count=0)
def perform(user_id, encoded_object_xml, person_ids, dispatcher_class_as_string, retry_count=0)
user = User.find(user_id)
people = Person.where(:id => person_ids)
dispatcher = dispatcher_class_as_string.constantize
hydra = ::HydraWrapper.new(user, people, encoded_object_xml, dispatcher)
hydra = HydraWrapper.new(user, people, encoded_object_xml, dispatcher)
hydra.enqueue_batch
hydra.run
unless hydra.failed_people.empty?
if retry_count < MAX_RETRIES
Resque.enqueue(Jobs::HttpMulti, user_id, encoded_object_xml, hydra.failed_people, dispatcher_class_as_string, retry_count + 1 )
Workers::HttpMulti.perform_in(1.hour, user_id, encoded_object_xml, hydra.failed_people, dispatcher_class_as_string, retry_count + 1)
else
Rails.logger.info("event=http_multi_abandon sender_id=#{user_id} failed_recipient_ids='[#{person_ids.join(', ')}] '")
end
......
module Jobs
module Workers
module Mail
class AlsoCommented < Base
@queue = :mail
def self.perform(recipient_id, sender_id, comment_id)
sidekiq_options queue: :mail
def perform(recipient_id, sender_id, comment_id)
if email = Notifier.also_commented(recipient_id, sender_id, comment_id)
email.deliver
end
......
module Jobs
module Workers
module Mail
class CommentOnPost < Base
@queue = :mail
def self.perform(recipient_id, sender_id, comment_id)
sidekiq_options queue: :mail
def perform(recipient_id, sender_id, comment_id)
Notifier.comment_on_post(recipient_id, sender_id, comment_id).deliver
end
end
......
module Jobs
module Workers
module Mail
class ConfirmEmail < Base
@queue = :mail
def self.perform(user_id)
sidekiq_options queue: :mail
def perform(user_id)
Notifier.confirm_email(user_id).deliver
end
end
......
......@@ -2,12 +2,12 @@
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Jobs
module Workers
module Mail
class InviteUserByEmail < Base
@queue = :mail
def self.perform(invite_id)
sidekiq_options queue: :mail
def perform(invite_id)
invite = Invitation.find(invite_id)
I18n.with_locale(invite.language) do
invite.send!
......
module Jobs
module Workers
module Mail
class Liked < Base
@queue = :mail
def self.perform(recipient_id, sender_id, like_id)
sidekiq_options queue: :mail
def perform(recipient_id, sender_id, like_id)
Notifier.liked(recipient_id, sender_id, like_id).deliver
end
end
......