更多資訊請參考 rubyonrails.org:

Active Job 基礎

本指南提供您開始建立、排入佇列和執行背景工作的所需一切資訊。

閱讀本指南後,您將了解

  • 如何建立工作。
  • 如何將工作排入佇列。
  • 如何在背景執行工作。
  • 如何從應用程式非同步發送電子郵件。

1 什麼是 Active Job?

Active Job 是一個框架,用於宣告工作並使其在各種佇列後端上執行。這些工作可以是從定期排程的清理到帳單收費再到郵寄的任何事項。任何可以切分成小工作單元並平行執行的事物。

2 Active Job 的目的

主要重點是確保所有 Rails 應用程式都有一個就緒的工作基礎設施。然後,我們可以讓框架功能和其他 gem 建構在其之上,而不必擔心各種工作執行器(例如 Delayed Job 和 Resque)之間的 API 差異。選擇您的佇列後端就變成更偏向操作上的考量。而且您將可以在它們之間切換,而無需重寫您的工作。

Rails 預設帶有一個非同步佇列實作,它使用處理中執行緒集區來執行工作。工作將會非同步執行,但是佇列中的任何工作都會在重新啟動時被捨棄。

3 建立並排入工作

本節將提供建立工作並將其排入佇列的逐步指南。

3.1 建立工作

Active Job 提供了一個 Rails 產生器來建立工作。以下指令將在 app/jobs 中建立一個工作(並在 test/jobs 下附加一個測試案例)

$ bin/rails generate job guests_cleanup
invoke  test_unit
create    test/jobs/guests_cleanup_job_test.rb
create  app/jobs/guests_cleanup_job.rb

您也可以建立一個將在特定佇列上執行的工作

$ bin/rails generate job guests_cleanup --queue urgent

如果您不想使用產生器,您可以在 app/jobs 內建立自己的檔案,只需確保它繼承自 ApplicationJob 即可。

以下是一個工作的樣子

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  def perform(*guests)
    # Do something later
  end
end

請注意,您可以使用任意數量的參數來定義 perform

如果您已經有一個抽象類別,且其名稱與 ApplicationJob 不同,您可以傳遞 --parent 選項來指出您想要不同的抽象類別

$ bin/rails generate job process_payment --parent=payment_job
class ProcessPaymentJob < PaymentJob
  queue_as :default

  def perform(*args)
    # Do something later
  end
end

3.2 將工作排入佇列

使用 perform_later 以及(選擇性地)set 來將工作排入佇列。像這樣

# Enqueue a job to be performed as soon as the queuing system is
# free.
GuestsCleanupJob.perform_later guest
# Enqueue a job to be performed tomorrow at noon.
GuestsCleanupJob.set(wait_until: Date.tomorrow.noon).perform_later(guest)
# Enqueue a job to be performed 1 week from now.
GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
# `perform_now` and `perform_later` will call `perform` under the hood so
# you can pass as many arguments as defined in the latter.
GuestsCleanupJob.perform_later(guest1, guest2, filter: "some_filter")

就是這樣!

3.3 批量將工作排入佇列

您可以使用 perform_all_later 一次排入多個工作。如需更多詳細資訊,請參閱 批量排入佇列

4 工作執行

若要在生產環境中將工作排入佇列和執行,您需要設定佇列後端,也就是說,您需要決定 Rails 應該使用哪個第三方佇列程式庫。Rails 本身僅提供一個處理中佇列系統,該系統僅將工作保留在 RAM 中。如果處理程序當機或機器重新啟動,則所有未完成的工作都會隨著預設的非同步後端遺失。這對於較小的應用程式或非關鍵工作來說可能沒問題,但是大多數生產應用程式將需要選擇一個持久性後端。

4.1 後端

Active Job 具有多個佇列後端(Sidekiq、Resque、Delayed Job 和其他)的內建配接器。若要取得最新的配接器清單,請參閱 ActiveJob::QueueAdapters 的 API 文件。

4.2 設定後端

您可以使用 config.active_job.queue_adapter 輕鬆設定您的佇列後端

# config/application.rb
module YourApp
  class Application < Rails::Application
    # Be sure to have the adapter's gem in your Gemfile
    # and follow the adapter's specific installation
    # and deployment instructions.
    config.active_job.queue_adapter = :sidekiq
  end
end

您也可以針對每個工作設定您的後端

class GuestsCleanupJob < ApplicationJob
  self.queue_adapter = :resque
  # ...
end

# Now your job will use `resque` as its backend queue adapter, overriding what
# was configured in `config.active_job.queue_adapter`.

4.3 啟動後端

由於工作與您的 Rails 應用程式平行執行,因此大多數佇列程式庫都需要您啟動一個程式庫特定的佇列服務(除了啟動您的 Rails 應用程式之外)才能使工作處理正常運作。有關啟動佇列後端的說明,請參閱程式庫文件。

以下是一個不完整的文檔清單

5 佇列

大多數配接器都支援多個佇列。使用 Active Job,您可以使用 queue_as 來排程工作以在特定佇列上執行

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

您可以使用 application.rb 中的 config.active_job.queue_name_prefix 為所有工作加上佇列名稱前綴

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
  end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

# Now your job will run on queue production_low_priority on your
# production environment and on staging_low_priority
# on your staging environment

您也可以針對每個工作設定前綴。

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  self.queue_name_prefix = nil
  # ...
end

# Now your job's queue won't be prefixed, overriding what
# was configured in `config.active_job.queue_name_prefix`.

預設的佇列名稱前綴分隔符號是 '_'。您可以透過在 application.rb 中設定 config.active_job.queue_name_delimiter 來變更此設定

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
    config.active_job.queue_name_delimiter = "."
  end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

# Now your job will run on queue production.low_priority on your
# production environment and on staging.low_priority
# on your staging environment

若要從工作層級控制佇列,您可以將區塊傳遞至 queue_as。該區塊將會在工作內容中執行(因此它可以存取 self.arguments),而且它必須傳回佇列名稱

class ProcessVideoJob < ApplicationJob
  queue_as do
    video = self.arguments.first
    if video.owner.premium?
      :premium_videojobs
    else
      :videojobs
    end
  end

  def perform(video)
    # Do process video
  end
end
ProcessVideoJob.perform_later(Video.last)

如果您想要更精細地控制工作將在哪個佇列中執行,您可以將 :queue 選項傳遞至 set

MyJob.set(queue: :another_queue).perform_later(record)

請確保您的佇列後端在您的佇列名稱上「監聽」。對於某些後端,您需要指定要監聽的佇列。

6 優先順序

某些配接器支援工作層級的優先順序,其中工作可以相對於佇列中或跨所有佇列的其他工作進行優先排序。

您可以使用 queue_with_priority 來排程工作,以特定優先順序執行

class GuestsCleanupJob < ApplicationJob
  queue_with_priority 10
  # ...
end

請注意,這對於不支援優先順序的配接器將不會有任何影響。

queue_as 類似,您也可以將區塊傳遞至 queue_with_priority,以便在工作內容中評估

class ProcessVideoJob < ApplicationJob
  queue_with_priority do
    video = self.arguments.first
    if video.owner.premium?
      0
    else
      10
    end
  end

  def perform(video)
    # Process video
  end
end
ProcessVideoJob.perform_later(Video.last)

您也可以將 :priority 選項傳遞至 set

MyJob.set(priority: 50).perform_later(record)

較低優先級的數字在較高優先級數字之前或之後執行,取決於适配器的實作方式。請參閱您後端的文檔以獲取更多資訊。建議适配器作者將較低的數字視為更重要。

7 回呼函數

Active Job 提供了在任務生命週期中觸發邏輯的鉤子。與 Rails 中的其他回呼函數類似,您可以將回呼函數實作為普通方法,並使用巨集樣式的類別方法將它們註冊為回呼函數。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  around_perform :around_cleanup

  def perform
    # Do something later
  end

  private
    def around_cleanup
      # Do something before perform
      yield
      # Do something after perform
    end
end

巨集樣式的類別方法也可以接收一個區塊。如果區塊內的程式碼非常簡短,可以放在一行中,請考慮使用這種樣式。例如,您可以為每個加入佇列的任務傳送指標。

class ApplicationJob < ActiveJob::Base
  before_enqueue { |job| $statsd.increment "#{job.class.name.underscore}.enqueue" }
end

7.1 可用的回呼函數

請注意,當使用 perform_all_later 批量加入任務時,例如 around_enqueue 等回呼函數將不會在個別任務上觸發。請參閱批量加入佇列回呼函數

8 批量加入佇列

您可以使用 perform_all_later 一次加入多個任務。批量加入佇列減少了與佇列資料儲存(如 Redis 或資料庫)的往返次數,使其比單獨加入相同任務的效能更高。

perform_all_later 是 Active Job 的頂級 API。它接受實例化的任務作為引數(請注意,這與 perform_later 不同)。perform_all_later 實際上會呼叫 perform。傳遞給 new 的引數將在最終呼叫時傳遞給 perform

以下是一個使用 GuestCleanupJob 實例呼叫 perform_all_later 的範例

# Create jobs to pass to `perform_all_later`.
# The arguments to `new` are passed on to `perform`
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest) }

# Will enqueue a separate job for each instance of `GuestCleanupJob`
ActiveJob.perform_all_later(guest_cleanup_jobs)

# Can also use `set` method to configure options before bulk enqueuing jobs.
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest).set(wait: 1.day) }

ActiveJob.perform_all_later(guest_cleanup_jobs)

perform_all_later 會記錄成功加入佇列的任務數量,例如,如果上面的 Guest.all.map 產生了 3 個 guest_cleanup_jobs,它將會記錄 Enqueued 3 jobs to Async (3 GuestsCleanupJob)(假設所有任務都已加入佇列)。

perform_all_later 的回傳值為 nil。請注意,這與 perform_later 不同,後者會回傳已加入佇列的任務類別的實例。

8.1 加入多個 Active Job 類別

使用 perform_all_later,也可以在同一次呼叫中加入不同的 Active Job 類別實例。例如

class ExportDataJob < ApplicationJob
  def perform(*args)
    # Export data
  end
end

class NotifyGuestsJob < ApplicationJob
  def perform(*guests)
    # Email guests
  end
end

# Instantiate job instances
cleanup_job = GuestsCleanupJob.new(guest)
export_job = ExportDataJob.new(data)
notify_job = NotifyGuestsJob.new(guest)

# Enqueues job instances from multiple classes at once
ActiveJob.perform_all_later(cleanup_job, export_job, notify_job)

8.2 批量加入佇列回呼函數

當使用 perform_all_later 批量加入任務時,例如 around_enqueue 等回呼函數將不會在個別任務上觸發。此行為與其他 Active Record 批量方法一致。由於回呼函數在個別任務上執行,因此它們無法利用此方法的批量特性。

但是,perform_all_later 方法會觸發一個 enqueue_all.active_job 事件,您可以使用 ActiveSupport::Notifications 訂閱此事件。

方法 successfully_enqueued? 可以用來找出給定的任務是否已成功加入佇列。

8.3 佇列後端支援

對於 perform_all_later,批量加入佇列需要由 佇列後端 支援。

例如,Sidekiq 有一個 push_bulk 方法,可以將大量任務推送到 Redis,並防止往返網路延遲。GoodJob 也支援使用 GoodJob::Bulk.enqueue 方法進行批量加入佇列。新的佇列後端 Solid Queue 也已新增對批量加入佇列的支援。

如果佇列後端 *不* 支援批量加入佇列,perform_all_later 將會逐一加入任務。

9 Action Mailer

現代 Web 應用程式中最常見的任務之一是在請求-回應週期之外傳送電子郵件,這樣使用者就不必等待它。Active Job 與 Action Mailer 集成,因此您可以輕鬆地非同步傳送電子郵件

# If you want to send the email now use #deliver_now
UserMailer.welcome(@user).deliver_now

# If you want to send the email through Active Job use #deliver_later
UserMailer.welcome(@user).deliver_later

通常,從 Rake 任務使用非同步佇列(例如,使用 .deliver_later 傳送電子郵件)將無法正常運作,因為 Rake 很可能會結束,導致在處理任何/所有 .deliver_later 電子郵件之前刪除程序內執行緒集區。為了避免這個問題,請使用 .deliver_now 或在開發中執行持續佇列。

10 國際化

每個任務都使用建立任務時設定的 I18n.locale。如果您非同步傳送電子郵件,這會很有用

I18n.locale = :eo

UserMailer.welcome(@user).deliver_later # Email will be localized to Esperanto.

11 引數支援的類型

ActiveJob 預設支援以下類型的引數

  • 基本類型(NilClassStringIntegerFloatBigDecimalTrueClassFalseClass
  • Symbol
  • Date
  • Time
  • DateTime
  • ActiveSupport::TimeWithZone
  • ActiveSupport::Duration
  • Hash(鍵應為 StringSymbol 類型)
  • ActiveSupport::HashWithIndifferentAccess
  • Array
  • Range
  • Module
  • Class

11.1 GlobalID

Active Job 支援參數的 GlobalID。這使得將即時 Active Record 物件傳遞給您的任務成為可能,而不是類別/ID 對,然後您必須手動反序列化。以前,任務看起來像這樣

class TrashableCleanupJob < ApplicationJob
  def perform(trashable_class, trashable_id, depth)
    trashable = trashable_class.constantize.find(trashable_id)
    trashable.cleanup(depth)
  end
end

現在您可以簡單地執行

class TrashableCleanupJob < ApplicationJob
  def perform(trashable, depth)
    trashable.cleanup(depth)
  end
end

這適用於任何混合了 GlobalID::Identification 的類別,預設情況下,Active Record 類別已混合了該類別。

11.2 序列化程式

您可以擴充支援的引數類型列表。您只需要定義自己的序列化程式

# app/serializers/money_serializer.rb
class MoneySerializer < ActiveJob::Serializers::ObjectSerializer
  # Converts an object to a simpler representative using supported object types.
  # The recommended representative is a Hash with a specific key. Keys can be of basic types only.
  # You should call `super` to add the custom serializer type to the hash.
  def serialize(money)
    super(
      "amount" => money.amount,
      "currency" => money.currency
    )
  end

  # Converts serialized value into a proper object.
  def deserialize(hash)
    Money.new(hash["amount"], hash["currency"])
  end

  private
    # Checks if an argument should be serialized by this serializer.
    def klass
      Money
    end
end

並將此序列化程式新增至列表

# config/initializers/custom_serializers.rb
Rails.application.config.active_job.custom_serializers << MoneySerializer

請注意,不支援在初始化期間自動載入可重新載入的程式碼。因此,建議設定僅載入一次的序列化程式,例如,透過修改 config/application.rb,如下所示

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.autoload_once_paths << "#{root}/app/serializers"
  end
end

12 例外

可以在任務執行期間引發的例外可以使用 rescue_from 處理

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  rescue_from(ActiveRecord::RecordNotFound) do |exception|
    # Do something with the exception
  end

  def perform
    # Do something later
  end
end

如果沒有從任務中捕獲例外,則該任務會被視為「失敗」。

12.1 重試或捨棄失敗的任務

除非另行配置,否則不會重試失敗的任務。

可以使用 retry_ondiscard_on 分別重試或捨棄失敗的任務。例如

class RemoteServiceJob < ApplicationJob
  retry_on CustomAppException # defaults to 3s wait, 5 attempts

  discard_on ActiveJob::DeserializationError

  def perform(*args)
    # Might raise CustomAppException or ActiveJob::DeserializationError
  end
end

12.2 反序列化

GlobalID 允許序列化傳遞給 #perform 的完整 Active Record 物件。

如果在任務加入佇列後但在呼叫 #perform 方法之前刪除了傳遞的記錄,Active Job 將會引發 ActiveJob::DeserializationError 例外。

13 任務測試

您可以在測試指南中找到有關如何測試任務的詳細說明。

14 除錯

如果您需要幫助找出任務的來源,可以啟用詳細記錄



回到頂部