In Files

Namespace

Class Index [+]

Quicksearch

Rask

Authors:mewlist / Hidenori Doi
Copyright:Copyright (C) 2010 mewlist / Hidenori Doi
License:The MIT License

Rask is a terminatable task engine.

sample code

 require 'rubygems'
 require 'rask'

 # Task that counts up to 10 and then destroys itself.
 class CountupTask < Rask::Task
   # Define the state machine's states.
   define_state :start,   :initial => true       # initial state
   define_state :running                         # running state
   define_state :finish,  :from    => [:running] # finish (reachable only from :running)

   def start # handler method has the same name as the state given to define_state
     @count = 0
     p "start"
     transition_to_running # transition to :running
   end

   def running
     p "running count => #{@count+=1}"
     transition_to_finish if @count>=10 # transition to :finish once 10 is reached
   end

   def finish
     p "finished"
     destroy # the task destroys itself
   end
 end

 Rask.insert CountupTask.new # insert the task

 Rask.daemon # run as a daemon

Public Class Methods

base_directory() click to toggle source

Get the base storage directory.

# File lib/rask.rb, line 148
  # Returns the base storage directory currently held in @@base_dir.
  def self.base_directory
    @@base_dir
  end
base_directory=(new_directory) click to toggle source

Set the base storage directory (read it back with base_directory).

Default: /tmp/rask
# File lib/rask.rb, line 144
  # Sets the base storage directory by storing new_directory in @@base_dir.
  # Task files, the pid file and the "suspended" directory are all derived from it.
  def self.base_directory=(new_directory)
    @@base_dir = new_directory
  end
daemon(options = {:class=>nil, :group=>nil, :sleep=>0.1, :worker_sleep=>0.1, :process_name=>nil}) click to toggle source

Start a daemon process

options
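class :: Only instances of the specified class are processed.
group :: Only instances of the specified group are processed. See also Task::initialize.
sleep :: Polling interval of the daemon process, in seconds (default 0.1).
worker_sleep :: Sleep interval of each worker thread, in seconds (default 0.1).
process_name :: Name used for the pid file; defaults to the name of the running script.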
# File lib/rask.rb, line 273
  # Start a daemon process that polls for task files and hands them to worker threads.
  #
  # options
  # :class        :: Passed on to task_ids; only tasks of this class are queued.
  # :group        :: Passed on to task_ids; only tasks of this group are queued.
  # :sleep        :: Polling interval of the main loop in seconds (default 0.1).
  # :worker_sleep :: Sleep interval handed to every worker thread (default 0.1).
  # :process_name :: Name used for the pid file (see pid_path); defaults to the script name.
  #
  # The calling process exits right after the fork; everything below runs in the child.
  # If the pid file already exists, a notice is printed and the method returns
  # without starting any worker.
  def self.daemon(options = {:class=>nil, :group=>nil, :sleep=>0.1, :worker_sleep=>0.1, :process_name=>nil})
    options = { :sleep=>0.1, :worker_sleep=>0.1, :process_name=>nil, :threading=>true }.merge(options)
    print "daemon start\n"
    exit if fork
    Process.setsid

    initialize_storage
    pid_file = pid_path(options[:process_name])
    if File.exist?(pid_file)
      print "already running rask process. #{File.basename($0)}"
      return
    end
    File.open(pid_file, "w") { |f| f.write Process.pid }

    # create worker threads; the index is passed as a thread argument so that
    # each thread keeps its own id
    (1..@@thread_max_count).each do |i|
      Thread.new(i) { |thread_id| thread_proc(thread_id, options[:worker_sleep]) }
    end

    # The handler only raises the termination flag. Running safe_exit inside the
    # handler could interrupt the main thread while it holds @@locker; workers
    # would then block on the lock while safe_exit waits for them forever.
    Signal.trap(:TERM) { @@terminated = true }

    until @@terminated
      Rask.task_ids(options).each do |d|
        @@locker.synchronize do
          # queue each task only once until a worker has finished with it
          unless @@processing.include?(d)
            @@queue.push d
            @@processing.push d
          end
        end
      end
      sleep(options[:sleep])
    end

    # Outside of the signal handler: wait for the workers, remove the pid file, exit.
    safe_exit(options[:process_name])
  end
destroy(task) click to toggle source

Forcibly destroy the task.

# File lib/rask.rb, line 262
  # Forcibly destroy the task by deleting its task file, if one is present.
  #
  # task :: object responding to task_id; the file removed is task_path(task.task_id).
  def self.destroy(task)
    path = task_path(task.task_id)
    # File.exists? was removed in Ruby 3.2; File.exist? is the supported spelling.
    FileUtils.rm(path) if File.exist?(path)
  end
insert(task) click to toggle source

Insert a new task. The task will be controlled under Rask daemon process.

sample code

 Rask::insert NewTask.new
# File lib/rask.rb, line 176
  # Insert a new task. The task is serialized with Marshal into a task file below
  # the base directory.
  #
  # task :: object providing #group and #task_id=; it is assigned a task id of the
  #         form "<safe class name>-<group>-<epoch seconds>-<microseconds>".
  #
  # Returns nil.
  def self.insert(task)
    initialize_storage
    now = Time.now # sample the clock once so both id parts describe the same instant
    task_id = "#{safe_class_name(task.class.name)}-#{task.group.to_s}-#{now.to_i}-#{now.usec}"
    task.task_id = task_id

    final_path = task_path(task_id)
    # Serialize into a staging file first and publish it with an atomic rename.
    # A reader of "*.task" files therefore never sees an empty or half-written
    # task file.
    staging_path = final_path + ".tmp"
    begin
      File.open(staging_path, 'w') { |f| Marshal.dump(task, f) }
      File.rename(staging_path, final_path)
    ensure
      # only still present when dumping or renaming failed
      File.delete(staging_path) if File.exist?(staging_path)
    end
    nil
  end
pid_path(process_name = nil) click to toggle source
# File lib/rask.rb, line 166
  # Build the path of the pid file inside the base directory.
  # When no process_name is given, the basename of the running script ($0) is used.
  def self.pid_path(process_name = nil)
    name = process_name.nil? ? File.basename($0) : process_name
    @@base_dir + "/#{name}.pid"
  end
read(task_id) click to toggle source

Get the task instance to observe.

The returned instance is marked read-only; use it for observation only.

# File lib/rask.rb, line 224
  # Get the task instance stored under task_id for observation.
  #
  # Returns the restored task with read_only set to true, or nil when the task
  # file cannot be opened.
  #
  # NOTE(review): Marshal.restore instantiates whatever the file contains; the
  # base directory must not be writable by untrusted users.
  def self.read(task_id)
    begin
      f = File.open(task_path(task_id), 'r+')
    rescue StandardError
      return
    end

    begin
      f.flock(File::LOCK_EX)
      task = Marshal.restore(f)
    ensure
      # closing the file also releases the lock, even when restoring raised
      f.close
    end
    task.read_only = true
    task
  end
run(task_id) click to toggle source
# File lib/rask.rb, line 190
  # Load the task stored under task_id, execute one step and write it back.
  #
  # With a block, the task is yielded to the block; otherwise task.run is called.
  # Afterwards the task is re-serialized into its file, and the file is removed
  # when task.destroy? is true. Returns without doing anything when the task file
  # cannot be opened.
  #
  # On any StandardError (including a task file that cannot be restored) the
  # error and backtrace are printed and the task file is moved to the
  # "suspended" directory below the base directory.
  def self.run(task_id)
    begin
      f = File.open(task_path(task_id), 'r+')
    rescue StandardError
      return
    end

    begin
      f.flock(File::LOCK_EX)
      # restoring happens inside the guarded section so that a corrupt file is
      # suspended instead of leaking the handle and raising into the caller
      task = Marshal.restore(f)
      if block_given?
        yield task
      else
        task.run
      end
      f.truncate(0)
      f.pos = 0
      Marshal.dump(task, f)
      f.flock(File::LOCK_UN)
      f.close
      FileUtils.rm(task_path(task_id)) if task.destroy?
    rescue
      p $!
      print $@.join("\n") + "\n--------------------------------------------\n"
      # the file is already closed when the failure happened after f.close
      unless f.closed?
        f.flock(File::LOCK_UN)
        f.close
      end
      # the file may already be gone (e.g. removed concurrently)
      FileUtils.mv(task_path(task_id), @@base_dir+"/suspended/") if File.exist?(task_path(task_id))
    end
  end
run_all(options = { :class=>nil, :group=>nil }) click to toggle source
# File lib/rask.rb, line 216
  # Run every task selected by options once, in the order task_ids returns them.
  # options are handed to task_ids unchanged (:class and :group filters).
  def self.run_all(options = { :class=>nil, :group=>nil })
    ids = Rask.task_ids(options)
    ids.each do |id|
      run(id)
    end
  end
task_ids(options = { :class=>nil, :group=>nil }) click to toggle source

Get task_id list.

options
class :: Only instances of the specified class are listed.
group :: Only instances of the specified group are listed. See also Task::initialize.
# File lib/rask.rb, line 240
  # Get the list of task ids found in the base directory.
  #
  # options
  # :class :: Only tasks whose id starts with this class name (after safe_class_name).
  # :group :: Only tasks whose id carries this group right after the class name.
  #
  # Returns an Array of task ids (file basenames without the ".task" extension).
  def self.task_ids(options = { :class=>nil, :group=>nil })
    # Literal parts are escaped so that regex metacharacters in the base
    # directory, class name or group cannot change the match.
    pattern = '\A' + Regexp.escape(@@base_dir + '/')
    pattern += options[:class] ? Regexp.escape(safe_class_name(options[:class])) : '[^-]+'
    if options[:group]
      pattern += "-#{Regexp.escape(options[:group].to_s)}-"
    elsif options[:class]
      # close the class name with its delimiter; otherwise "Foo" would also
      # select tasks of class "FooBar"
      pattern += '-'
    end
    matcher = Regexp.new(pattern)

    Dir.glob(@@base_dir+"/*.task").
      select { |path| matcher =~ path }.
      map    { |path| File.basename(path, ".*") }
  end
task_path(task_id) click to toggle source
# File lib/rask.rb, line 161
  # Path of the file that stores the task with the given id:
  # "<base directory>/<task_id>.task".
  def self.task_path(task_id)
    @@base_dir + "/" + task_id.to_s + ".task"
  end
thread_max_count=(count) click to toggle source

Set the maximum number of worker threads.

Default: 5
# File lib/rask.rb, line 156
  # Sets the maximum number of worker threads by storing count in @@thread_max_count.
  def self.thread_max_count=(count)
    @@thread_max_count = count
  end

Private Class Methods

initialize_storage() click to toggle source
# File lib/rask.rb, line 310
  # Create the base directory and its "suspended" subdirectory when they are missing.
  def self.initialize_storage
    [@@base_dir, @@base_dir+"/suspended"].each do |dir|
      # File.exists? was removed in Ruby 3.2; File.exist? is the supported spelling.
      FileUtils.makedirs(dir) unless File.exist?(dir)
    end
  end
safe_class_name(c) click to toggle source
# File lib/rask.rb, line 316
  # Turn a class name into a form usable inside a file name by replacing every
  # ':' with '@' (e.g. "Foo::Bar" becomes "Foo@@Bar").
  #
  # c :: the class name as a String; a Class or Symbol is accepted as well and
  #      converted with to_s first.
  #
  # Returns a new String.
  def self.safe_class_name(c)
    c.to_s.tr(':', '@')
  end
safe_exit(process_name) click to toggle source
# File lib/rask.rb, line 344
  # Shut the daemon down: raise the termination flag, wait until the worker
  # thread counter has dropped to zero, remove the pid file and exit the process.
  #
  # process_name :: name handed to pid_path to locate the pid file.
  def self.safe_exit(process_name)
    @@terminated = true
    # poll until no worker thread is counted any more
    sleep(0.1) while @@thread_count > 0

    pid_file = pid_path(process_name)
    FileUtils.rm(pid_file) if File.exist?(pid_file)
    print "[Rask]  safely daemon terminated. \n"
    exit
  end
thread_proc(thread_id, worker_sleep = 0.1) click to toggle source
# File lib/rask.rb, line 321
  # Body of a worker thread: repeatedly take one task id from @@queue, run it,
  # and remove it from @@processing, until @@terminated is set.
  #
  # thread_id    :: id used in the start/exit log lines.
  # worker_sleep :: seconds to sleep after every loop iteration.
  def self.thread_proc(thread_id, worker_sleep = 0.1)
    @@thread_count += 1
    begin
      print "[Rask] Thread Start ID:#{thread_id}\n"
      until @@terminated
        d = nil
        @@locker.synchronize do
          d = @@queue.pop unless @@queue.empty?
        end
        unless d.nil?
          run(d)
          @@locker.synchronize do
            @@processing.delete(d)
          end
        end
        sleep(worker_sleep)
      end
      print "[Rask] Thread Exit ID:#{thread_id}\n"
    ensure
      # Always give the slot back, even when run raised. safe_exit waits for
      # this counter to reach zero and would otherwise wait forever.
      @@thread_count -= 1
    end
  end

Disabled; run with --debug to generate this.

[Validate]

Generated with the Darkfish Rdoc Generator 1.1.6.