module R10K::ContentSynchronizer

Public Class Methods

concurrent_accept(modules, visitor, loader, pool_size, logger) click to toggle source

Returns a Queue of the names of modules actually updated

# File lib/r10k/content_synchronizer.rb, line 20
def self.concurrent_accept(modules, visitor, loader, pool_size, logger)
  mods_queue = modules_visit_queue(modules, visitor, loader)
  sync_queue(mods_queue, pool_size, logger)
end
concurrent_sync(modules, pool_size, logger) click to toggle source

Returns a Queue of the names of modules actually updated

# File lib/r10k/content_synchronizer.rb, line 26
def self.concurrent_sync(modules, pool_size, logger)
  mods_queue = modules_sync_queue(modules)
  sync_queue(mods_queue, pool_size, logger)
end
enqueue_modules(queue, modules) click to toggle source
# File lib/r10k/content_synchronizer.rb, line 69
def self.enqueue_modules(queue, modules)
  modules_by_cachedir = modules.group_by { |mod| mod.cachedir }
  modules_without_vcs_cachedir = modules_by_cachedir.delete(:none) || []

  modules_without_vcs_cachedir.each {|mod| queue << Array(mod) }
  modules_by_cachedir.values.each {|mods| queue << mods }
end
modules_sync_queue(modules) click to toggle source
# File lib/r10k/content_synchronizer.rb, line 63
def self.modules_sync_queue(modules)
  Queue.new.tap do |queue|
    enqueue_modules(queue, modules)
  end
end
modules_visit_queue(modules, visitor, loader) click to toggle source
# File lib/r10k/content_synchronizer.rb, line 55
def self.modules_visit_queue(modules, visitor, loader)
  Queue.new.tap do |queue|
    visitor.visit(:puppetfile, loader) do
      enqueue_modules(queue, modules)
    end
  end
end
serial_accept(modules, visitor, loader) click to toggle source
# File lib/r10k/content_synchronizer.rb, line 4
def self.serial_accept(modules, visitor, loader)
  visitor.visit(:puppetfile, loader) do
    serial_sync(modules)
  end
end
serial_sync(modules) click to toggle source
# File lib/r10k/content_synchronizer.rb, line 10
def self.serial_sync(modules)
  updated_modules = []
  modules.each do |mod|
    updated = mod.sync
    updated_modules << mod.name if updated
  end
  updated_modules
end
sync_queue(mods_queue, pool_size, logger) click to toggle source

Returns a Queue of the names of modules actually updated

# File lib/r10k/content_synchronizer.rb, line 32
def self.sync_queue(mods_queue, pool_size, logger)
  logger.debug _("Updating modules with %{pool_size} threads") % {pool_size: pool_size}
  updated_modules = Queue.new
  thread_pool = pool_size.times.map { sync_thread(mods_queue, logger, updated_modules) }
  thread_exception = nil

  # If any threads raise an exception the deployment is considered a failure.
  # In that event clear the queue, wait for other threads to finish their
  # current work, then re-raise the first exception caught.
  begin
    thread_pool.each(&:join)
    # Return the list of all modules that were actually updated
    updated_modules
  rescue => e
    logger.error _("Error during concurrent deploy of a module: %{message}") % {message: e.message}
    mods_queue.clear
    thread_exception ||= e
    retry
  ensure
    raise thread_exception unless thread_exception.nil?
  end
end
sync_thread(mods_queue, logger, updated_modules) click to toggle source
# File lib/r10k/content_synchronizer.rb, line 77
def self.sync_thread(mods_queue, logger, updated_modules)
  Thread.new do
    begin
      while mods = mods_queue.pop(true) do
        mods.each do |mod|
          begin
            updated = mod.sync
            updated_modules << mod.name if updated
          rescue Exception => e
            logger.error _("Module %{mod_name} failed to synchronize due to %{message}") % {mod_name: mod.name, message: e.message}
            raise e
          end
        end
      end
    rescue ThreadError => e
      logger.debug _("Module thread %{id} exiting: %{message}") % {message: e.message, id: Thread.current.object_id}
      Thread.exit
    rescue => e
      Thread.main.raise(e)
    end
  end
end