Ruby Concurrent Programing
Thread
Threads用于实现Ruby并发编程模型,使用Thread Class是开发者的完美候选比如,我们可以创建一个与主线程分离的新线程,通过::new
thr = Thread.new { puts "What's the big deal" }
然后我们可以停止主线程执行,等待新线程结束,通过::join
thr.join #=> "What's the big deal"
如果不在主线程结束前使用thr.join,那么包括thr在内的所有线程都会被kill
此外,可以通过数组储存线程实例
threads = []
threads << Thread.new { puts "What's the big deal" }
threads << Thread.new { 3.times { puts "Threads are fun!" } }
然后陆续的等它们完成
threads.each { |thr| thr.join }
Thread的初始化
为了创建新线程,Ruby提供了::new,::start,::fork,所有的方法必须提供block,否则会导致出现ThreadError当继承Thread Class时,::start,::fork将会忽视子类的initialize方法,此外,确保子类initialize中调用super方法
Thread的终止
对于终止线程,Ruby提供了多种方法实现它 class method ::kill意味着kill掉给出的进程thr = Thread.new { ... }
Thread.kill(thr)
同样可以使用其实例方法#exit或其别名#kill或#terminate
Thread的状态
Ruby提供了几个方法查询线程的状态,为了得到以字符串表示的状态,使用#statusthr = Thread.new { sleep }
thr.status #=> sleep
thr.exit
thr.status #=> false
也可以使用#alive?方法辨别线程是否运行或睡眠中,#stop?辨别线程是否dead或睡眠中
Thread变量及作用域
因为创建线程伴随着block,所以对于block的规则,同样应用于线程,block内的变量只属于线程Fiber-local vs. Thread-local
每个Fiber拥有自己的Thread#[]储存,当设置一个Fiber-local,只能在Fiber内访问,为了证明它:def main()
Thread.new {
Thread.current[:foo] = "bar"
Fiber.new {
p Thread.current[:foo] # => nil
}.resume
}.join
end
main()
这个例子使用#[]来得到和#[]=使用fiber-locals,也可以使用#keys列出给出线程的fiber-local和使用#key?来检查fiber-local是否存在,当来源于thread-locals时,线程能够完全访问作用域
def main()
Thread.new {
Thread.current.thread_variable_set(:foo, 1)
p Thread.current.thread_variable_get(:foo) # => 1
Fiber.new {
Thread.current.thread_variable_set(:foo, 2)
p Thread.current.thread_variable_get(:foo) # => 2
}.resume
p Thread.current.thread_variable_get(:foo) # => 2
}.join
end
main()
可以看到thread-local:foo转到fiber变成2在线程结束,这个例子使用#thread_variable_set创建新的thread-local,而thread_variable_get得到其引用
这里也有thread_variables列出存在的thread-local,也有thread_variable?判断变量是否存在
异常处理
任何线程都能引发异常,使用实例方法#raise,它和Kenerl#raise是相似的,但是注意到除主线程外异常发生取决于#abort_on_exception,这个值默认是false,意味着,任何未处理的异常将会造成线程静默的终止当被#join或#value等待,可以设置abort_on_exception = true或$DEBUG = true,加上类方法::handle_interrupt,可以异步处理线程的异常调度
Ruby提供了几种方法调度线程,第一种方法是使用::stop,使得当前线程sleep,使得另一个线程得到调度 一旦线程被睡眠,可以调用#wakeup使得线程能够得到合理的调度,也可以使用::pass,使得跳过当前线程执行另一个线程,但是这取决于操作系统,运行中的线程是否转换,还有#priority可以提示调度器线程调度的优先级,同样是平台相关的,可能会被忽略Mutex
Mutex实现了简单的信号量正确访问共享的多线程变量def main
semaphore = Mutex.new
a = Thread.new {
semaphore.synchronize do
100.times { |index| puts "Thread a: #{index}" }
end
}
b = Thread.new {
semaphore.synchronize do
100.times { |index| puts "Thread b: #{index}" }
end
}
a.join
b.join
end
main()
实例方法 | 描述 |
---|---|
lock | 试图获得锁并等待如果不能提供,当前线程重复lock会引发ThreadError |
locked? | 判断Mutex是否某个线程lock了 |
owned? | 如果当前线程lock了,返回true |
try_lock | 试图获得锁并立即返回,如果获得锁返回true |
unlock | 解锁,如果当前线程不持有锁,将会引发ThreadError |
synchronize | 获得锁,并且运行block,block完成后释放锁 |
sleep(timeout = nil) | 释放锁,并sleep timeout,下次唤醒时将重新要求锁 |
ConditionVariable
条件变量参数是Mutex对象,使用条件变量可以停止等待正忙的关键部分,直到资源可供实例方法 | 描述 |
---|---|
wait(mutex, timeout = nil) | 释放mutex的锁并且等待,重新唤醒时获得锁(超时会重新获得锁) |
signal | 唤醒第一个等待的thread |
broadcast | 唤醒所有等待的线程 |
def main
mutex = Mutex.new
condition = ConditionVariable.new
a = Thread.new do
mutex.synchronize do
2.times { puts "Thread a first two times"}
condition.wait(mutex)
10.times { |index| puts "Thread a: seq #{index} is random" }
end
end
b = Thread.new do
Thread.pass # let a first
mutex.synchronize do
puts "start to wakeup thread a"
condition.signal
10.times { |index| puts "Thread b: seq #{index} is random" }
end
end
a.join
b.join
end
main()
Next Post