Last active
October 13, 2021 14:01
-
-
Save bensheldon/5d3add207b7c5e689682c29ebbc506ab to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
require "bundler/inline" | |
gemfile(true) do | |
source "https://rubygems.org" | |
git_source(:github) { |repo| "https://github.com/#{repo}.git" } | |
gem "rails", '~>6.1' | |
end | |
require "active_job" | |
module BatchExtension | |
extend ActiveSupport::Concern | |
included do | |
cattr_accessor :after_batch_config, instance_accessor: false, default: {} | |
attr_accessor :batch_id | |
attr_accessor :batch_key | |
# Disappointing that jobs can't carry their full execution state | |
attr_accessor :error_message | |
end | |
class_methods do | |
def after_batch(key, &block) | |
self.after_batch_config[key] = block | |
end | |
def perform_later_in_batch(key, jobs_or_args = []) | |
jobs = jobs_or_args.map do |job_or_args| | |
job_or_args.is_a?(ActiveJob::Base) ? job_or_args : new(*job_or_args) | |
end | |
queue_adapter.enqueue_batch(key, jobs) | |
end | |
end | |
end | |
module ActiveJob | |
module QueueAdapters | |
class AsyncBatchAdapter < AsyncAdapter | |
def enqueue_batch(key, jobs) | |
# Ignore this implementation; it's purely to make the interface work | |
batch_id = SecureRandom.uuid | |
jobs.each do |job| | |
job.batch_key = key | |
job.batch_id = batch_id | |
end | |
Concurrent::ScheduledTask.execute(0, args: [jobs]) do |jobs| | |
jobs.each do |job| | |
begin | |
job.perform_now | |
rescue => e | |
job.error_message = e.to_s | |
end | |
end | |
jobs.last.class.after_batch_config[key]&.call(jobs) | |
end | |
jobs | |
end | |
end | |
end | |
end | |
class ApplicationJob < ActiveJob::Base | |
self.queue_adapter = :async_batch | |
include BatchExtension | |
end | |
### --- INTERFACE BELOW -- ### | |
class PrepareProduce < ApplicationJob | |
after_batch(:prepare_fruit_salad) do |jobs| | |
prepared_fruits = jobs.reject { |job| job.error_message.present? }.map { |job| job.arguments.first } | |
MakeSalad.perform_later(prepared_fruits) | |
discarded_fruits = jobs.select { |job| job.error_message.present? }.map { |job| job.arguments.first } | |
puts "Discarding #{discarded_fruits.join(', ')}" | |
end | |
def perform(produce, count = 1) | |
if produce.in? ["apple", "banana", "grape"] | |
puts "Preparing #{count} #{produce}" | |
else | |
raise "Ewww, not a #{produce}!" | |
end | |
end | |
end | |
class MakeSalad < ApplicationJob | |
def perform(produces) | |
puts "Making salad with #{produces.join(', ')}" | |
end | |
end | |
batch_objects = [["apple", 4], ["shoe", 2]] | |
batch_objects << PrepareProduce.new( "banana") | |
batch_objects << PrepareProduce.new( "grape", 2) | |
jobs = PrepareProduce.perform_later_in_batch(:prepare_fruit_salad, batch_objects) | |
puts "Enqueued #{jobs.count} jobs with batch_id: #{jobs.first.batch_id}" | |
sleep 2 # wait for batch to finish in the background |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment