1 什麼是 Active Job?
Active Job 是一個框架,用於宣告工作並使其在各種佇列後端上執行。這些工作可以是從定期排程的清理到帳單收費再到郵寄的任何事項。任何可以切分成小工作單元並平行執行的事物。
2 Active Job 的目的
主要重點是確保所有 Rails 應用程式都有一個就緒的工作基礎設施。然後,我們可以讓框架功能和其他 gem 建構在其之上,而不必擔心各種工作執行器(例如 Delayed Job 和 Resque)之間的 API 差異。選擇您的佇列後端就變成更偏向操作上的考量。而且您將可以在它們之間切換,而無需重寫您的工作。
Rails 預設帶有一個非同步佇列實作,它使用處理中執行緒集區來執行工作。工作將會非同步執行,但是佇列中的任何工作都會在重新啟動時被捨棄。
3 建立並排入工作
本節將提供建立工作並將其排入佇列的逐步指南。
3.1 建立工作
Active Job 提供了一個 Rails 產生器來建立工作。以下指令將在 app/jobs
中建立一個工作(並在 test/jobs
下附加一個測試案例)
$ bin/rails generate job guests_cleanup
invoke test_unit
create test/jobs/guests_cleanup_job_test.rb
create app/jobs/guests_cleanup_job.rb
您也可以建立一個將在特定佇列上執行的工作
$ bin/rails generate job guests_cleanup --queue urgent
如果您不想使用產生器,您可以在 app/jobs
內建立自己的檔案,只需確保它繼承自 ApplicationJob
即可。
以下是一個工作的樣子
class GuestsCleanupJob < ApplicationJob
queue_as :default
def perform(*guests)
# Do something later
end
end
請注意,您可以使用任意數量的參數來定義 perform
。
如果您已經有一個抽象類別,且其名稱與 ApplicationJob
不同,您可以傳遞 --parent
選項來指出您想要不同的抽象類別
$ bin/rails generate job process_payment --parent=payment_job
class ProcessPaymentJob < PaymentJob
queue_as :default
def perform(*args)
# Do something later
end
end
3.2 將工作排入佇列
使用 perform_later
以及(選擇性地)set
來將工作排入佇列。像這樣
# Enqueue a job to be performed as soon as the queuing system is
# free.
GuestsCleanupJob.perform_later guest
# Enqueue a job to be performed tomorrow at noon.
GuestsCleanupJob.set(wait_until: Date.tomorrow.noon).perform_later(guest)
# Enqueue a job to be performed 1 week from now.
GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
# `perform_now` and `perform_later` will call `perform` under the hood so
# you can pass as many arguments as defined in the latter.
GuestsCleanupJob.perform_later(guest1, guest2, filter: "some_filter")
就是這樣!
3.3 批量將工作排入佇列
您可以使用 perform_all_later
一次排入多個工作。如需更多詳細資訊,請參閱 批量排入佇列。
4 工作執行
若要在生產環境中將工作排入佇列和執行,您需要設定佇列後端,也就是說,您需要決定 Rails 應該使用哪個第三方佇列程式庫。Rails 本身僅提供一個處理中佇列系統,該系統僅將工作保留在 RAM 中。如果處理程序當機或機器重新啟動,則所有未完成的工作都會隨著預設的非同步後端遺失。這對於較小的應用程式或非關鍵工作來說可能沒問題,但是大多數生產應用程式將需要選擇一個持久性後端。
4.1 後端
Active Job 具有多個佇列後端(Sidekiq、Resque、Delayed Job 和其他)的內建配接器。若要取得最新的配接器清單,請參閱 ActiveJob::QueueAdapters
的 API 文件。
4.2 設定後端
您可以使用 config.active_job.queue_adapter
輕鬆設定您的佇列後端
# config/application.rb
module YourApp
class Application < Rails::Application
# Be sure to have the adapter's gem in your Gemfile
# and follow the adapter's specific installation
# and deployment instructions.
config.active_job.queue_adapter = :sidekiq
end
end
您也可以針對每個工作設定您的後端
class GuestsCleanupJob < ApplicationJob
self.queue_adapter = :resque
# ...
end
# Now your job will use `resque` as its backend queue adapter, overriding what
# was configured in `config.active_job.queue_adapter`.
4.3 啟動後端
由於工作與您的 Rails 應用程式平行執行,因此大多數佇列程式庫都需要您啟動一個程式庫特定的佇列服務(除了啟動您的 Rails 應用程式之外)才能使工作處理正常運作。有關啟動佇列後端的說明,請參閱程式庫文件。
以下是一個不完整的文檔清單
5 佇列
大多數配接器都支援多個佇列。使用 Active Job,您可以使用 queue_as
來排程工作以在特定佇列上執行
class GuestsCleanupJob < ApplicationJob
queue_as :low_priority
# ...
end
您可以使用 application.rb
中的 config.active_job.queue_name_prefix
為所有工作加上佇列名稱前綴
# config/application.rb
module YourApp
class Application < Rails::Application
config.active_job.queue_name_prefix = Rails.env
end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
queue_as :low_priority
# ...
end
# Now your job will run on queue production_low_priority on your
# production environment and on staging_low_priority
# on your staging environment
您也可以針對每個工作設定前綴。
class GuestsCleanupJob < ApplicationJob
queue_as :low_priority
self.queue_name_prefix = nil
# ...
end
# Now your job's queue won't be prefixed, overriding what
# was configured in `config.active_job.queue_name_prefix`.
預設的佇列名稱前綴分隔符號是 '_'。您可以透過在 application.rb
中設定 config.active_job.queue_name_delimiter
來變更此設定
# config/application.rb
module YourApp
class Application < Rails::Application
config.active_job.queue_name_prefix = Rails.env
config.active_job.queue_name_delimiter = "."
end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
queue_as :low_priority
# ...
end
# Now your job will run on queue production.low_priority on your
# production environment and on staging.low_priority
# on your staging environment
若要從工作層級控制佇列,您可以將區塊傳遞至 queue_as
。該區塊將會在工作內容中執行(因此它可以存取 self.arguments
),而且它必須傳回佇列名稱
class ProcessVideoJob < ApplicationJob
queue_as do
video = self.arguments.first
if video.owner.premium?
:premium_videojobs
else
:videojobs
end
end
def perform(video)
# Do process video
end
end
ProcessVideoJob.perform_later(Video.last)
如果您想要更精細地控制工作將在哪個佇列中執行,您可以將 :queue
選項傳遞至 set
MyJob.set(queue: :another_queue).perform_later(record)
請確保您的佇列後端在您的佇列名稱上「監聽」。對於某些後端,您需要指定要監聽的佇列。
6 優先順序
某些配接器支援工作層級的優先順序,其中工作可以相對於佇列中或跨所有佇列的其他工作進行優先排序。
您可以使用 queue_with_priority
來排程工作,以特定優先順序執行
class GuestsCleanupJob < ApplicationJob
queue_with_priority 10
# ...
end
請注意,這對於不支援優先順序的配接器將不會有任何影響。
與 queue_as
類似,您也可以將區塊傳遞至 queue_with_priority
,以便在工作內容中評估
class ProcessVideoJob < ApplicationJob
queue_with_priority do
video = self.arguments.first
if video.owner.premium?
0
else
10
end
end
def perform(video)
# Process video
end
end
ProcessVideoJob.perform_later(Video.last)
您也可以將 :priority
選項傳遞至 set
MyJob.set(priority: 50).perform_later(record)
較低優先級的數字在較高優先級數字之前或之後執行,取決於适配器的實作方式。請參閱您後端的文檔以獲取更多資訊。建議适配器作者將較低的數字視為更重要。
7 回呼函數
Active Job 提供了在任務生命週期中觸發邏輯的鉤子。與 Rails 中的其他回呼函數類似,您可以將回呼函數實作為普通方法,並使用巨集樣式的類別方法將它們註冊為回呼函數。
class GuestsCleanupJob < ApplicationJob
queue_as :default
around_perform :around_cleanup
def perform
# Do something later
end
private
def around_cleanup
# Do something before perform
yield
# Do something after perform
end
end
巨集樣式的類別方法也可以接收一個區塊。如果區塊內的程式碼非常簡短,可以放在一行中,請考慮使用這種樣式。例如,您可以為每個加入佇列的任務傳送指標。
class ApplicationJob < ActiveJob::Base
before_enqueue { |job| $statsd.increment "#{job.class.name.underscore}.enqueue" }
end
7.1 可用的回呼函數
請注意,當使用 perform_all_later
批量加入任務時,例如 around_enqueue
等回呼函數將不會在個別任務上觸發。請參閱批量加入佇列回呼函數。
8 批量加入佇列
您可以使用 perform_all_later
一次加入多個任務。批量加入佇列減少了與佇列資料儲存(如 Redis 或資料庫)的往返次數,使其比單獨加入相同任務的效能更高。
perform_all_later
是 Active Job 的頂級 API。它接受實例化的任務作為引數(請注意,這與 perform_later
不同)。perform_all_later
實際上會呼叫 perform
。傳遞給 new
的引數將在最終呼叫時傳遞給 perform
。
以下是一個使用 GuestCleanupJob
實例呼叫 perform_all_later
的範例
# Create jobs to pass to `perform_all_later`.
# The arguments to `new` are passed on to `perform`
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest) }
# Will enqueue a separate job for each instance of `GuestCleanupJob`
ActiveJob.perform_all_later(guest_cleanup_jobs)
# Can also use `set` method to configure options before bulk enqueuing jobs.
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest).set(wait: 1.day) }
ActiveJob.perform_all_later(guest_cleanup_jobs)
perform_all_later
會記錄成功加入佇列的任務數量,例如,如果上面的 Guest.all.map
產生了 3 個 guest_cleanup_jobs
,它將會記錄 Enqueued 3 jobs to Async (3 GuestsCleanupJob)
(假設所有任務都已加入佇列)。
perform_all_later
的回傳值為 nil
。請注意,這與 perform_later
不同,後者會回傳已加入佇列的任務類別的實例。
8.1 加入多個 Active Job 類別
使用 perform_all_later
,也可以在同一次呼叫中加入不同的 Active Job 類別實例。例如
class ExportDataJob < ApplicationJob
def perform(*args)
# Export data
end
end
class NotifyGuestsJob < ApplicationJob
def perform(*guests)
# Email guests
end
end
# Instantiate job instances
cleanup_job = GuestsCleanupJob.new(guest)
export_job = ExportDataJob.new(data)
notify_job = NotifyGuestsJob.new(guest)
# Enqueues job instances from multiple classes at once
ActiveJob.perform_all_later(cleanup_job, export_job, notify_job)
8.2 批量加入佇列回呼函數
當使用 perform_all_later
批量加入任務時,例如 around_enqueue
等回呼函數將不會在個別任務上觸發。此行為與其他 Active Record 批量方法一致。由於回呼函數在個別任務上執行,因此它們無法利用此方法的批量特性。
但是,perform_all_later
方法會觸發一個 enqueue_all.active_job
事件,您可以使用 ActiveSupport::Notifications
訂閱此事件。
方法 successfully_enqueued?
可以用來找出給定的任務是否已成功加入佇列。
8.3 佇列後端支援
對於 perform_all_later
,批量加入佇列需要由 佇列後端 支援。
例如,Sidekiq 有一個 push_bulk
方法,可以將大量任務推送到 Redis,並防止往返網路延遲。GoodJob 也支援使用 GoodJob::Bulk.enqueue
方法進行批量加入佇列。新的佇列後端 Solid Queue
也已新增對批量加入佇列的支援。
如果佇列後端 *不* 支援批量加入佇列,perform_all_later
將會逐一加入任務。
9 Action Mailer
現代 Web 應用程式中最常見的任務之一是在請求-回應週期之外傳送電子郵件,這樣使用者就不必等待它。Active Job 與 Action Mailer 集成,因此您可以輕鬆地非同步傳送電子郵件
# If you want to send the email now use #deliver_now
UserMailer.welcome(@user).deliver_now
# If you want to send the email through Active Job use #deliver_later
UserMailer.welcome(@user).deliver_later
通常,從 Rake 任務使用非同步佇列(例如,使用 .deliver_later
傳送電子郵件)將無法正常運作,因為 Rake 很可能會結束,導致在處理任何/所有 .deliver_later
電子郵件之前刪除程序內執行緒集區。為了避免這個問題,請使用 .deliver_now
或在開發中執行持續佇列。
10 國際化
每個任務都使用建立任務時設定的 I18n.locale
。如果您非同步傳送電子郵件,這會很有用
I18n.locale = :eo
UserMailer.welcome(@user).deliver_later # Email will be localized to Esperanto.
11 引數支援的類型
ActiveJob 預設支援以下類型的引數
- 基本類型(
NilClass
、String
、Integer
、Float
、BigDecimal
、TrueClass
、FalseClass
) Symbol
Date
Time
DateTime
ActiveSupport::TimeWithZone
ActiveSupport::Duration
Hash
(鍵應為String
或Symbol
類型)ActiveSupport::HashWithIndifferentAccess
Array
Range
Module
Class
11.1 GlobalID
Active Job 支援參數的 GlobalID。這使得將即時 Active Record 物件傳遞給您的任務成為可能,而不是類別/ID 對,然後您必須手動反序列化。以前,任務看起來像這樣
class TrashableCleanupJob < ApplicationJob
def perform(trashable_class, trashable_id, depth)
trashable = trashable_class.constantize.find(trashable_id)
trashable.cleanup(depth)
end
end
現在您可以簡單地執行
class TrashableCleanupJob < ApplicationJob
def perform(trashable, depth)
trashable.cleanup(depth)
end
end
這適用於任何混合了 GlobalID::Identification
的類別,預設情況下,Active Record 類別已混合了該類別。
11.2 序列化程式
您可以擴充支援的引數類型列表。您只需要定義自己的序列化程式
# app/serializers/money_serializer.rb
class MoneySerializer < ActiveJob::Serializers::ObjectSerializer
# Converts an object to a simpler representative using supported object types.
# The recommended representative is a Hash with a specific key. Keys can be of basic types only.
# You should call `super` to add the custom serializer type to the hash.
def serialize(money)
super(
"amount" => money.amount,
"currency" => money.currency
)
end
# Converts serialized value into a proper object.
def deserialize(hash)
Money.new(hash["amount"], hash["currency"])
end
private
# Checks if an argument should be serialized by this serializer.
def klass
Money
end
end
並將此序列化程式新增至列表
# config/initializers/custom_serializers.rb
Rails.application.config.active_job.custom_serializers << MoneySerializer
請注意,不支援在初始化期間自動載入可重新載入的程式碼。因此,建議設定僅載入一次的序列化程式,例如,透過修改 config/application.rb
,如下所示
# config/application.rb
module YourApp
class Application < Rails::Application
config.autoload_once_paths << "#{root}/app/serializers"
end
end
12 例外
可以在任務執行期間引發的例外可以使用 rescue_from
處理
class GuestsCleanupJob < ApplicationJob
queue_as :default
rescue_from(ActiveRecord::RecordNotFound) do |exception|
# Do something with the exception
end
def perform
# Do something later
end
end
如果沒有從任務中捕獲例外,則該任務會被視為「失敗」。
12.1 重試或捨棄失敗的任務
除非另行配置,否則不會重試失敗的任務。
可以使用 retry_on
或 discard_on
分別重試或捨棄失敗的任務。例如
class RemoteServiceJob < ApplicationJob
retry_on CustomAppException # defaults to 3s wait, 5 attempts
discard_on ActiveJob::DeserializationError
def perform(*args)
# Might raise CustomAppException or ActiveJob::DeserializationError
end
end
12.2 反序列化
GlobalID 允許序列化傳遞給 #perform
的完整 Active Record 物件。
如果在任務加入佇列後但在呼叫 #perform
方法之前刪除了傳遞的記錄,Active Job 將會引發 ActiveJob::DeserializationError
例外。
13 任務測試
您可以在測試指南中找到有關如何測試任務的詳細說明。
14 除錯
如果您需要幫助找出任務的來源,可以啟用詳細記錄。