并 发库中的BlockingQueue 是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()和take(),前者将一个对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。
下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中, 另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型 (AtomicInteger),AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。由 于阻塞队列的put和take操作会阻塞,为了使线程退出,特在队列中添加了一个“标识”,算法中也叫“哨兵”,当发现这个哨兵后,写线程就退出。
当然线程池也要显式退出了。
package
concurrent;
import
java.io.File;
import
java.io.FileFilter;
import
java.util.concurrent.BlockingQueue;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
java.util.concurrent.LinkedBlockingQueue;
import
java.util.concurrent.atomic.AtomicInteger;
public class
TestBlockingQueue
{
static
long
randomTime
() {
return
(
long
) (
Math.random
()
*
1000
)
;
}
public static
void
main
(
String
[]
args
) {
// 能容纳100个文件
final
BlockingQueue<File> queue =
new
LinkedBlockingQueue<File>
(
100
)
;
// 线程池
final
ExecutorService exec = Executors.newFixedThreadPool
(
5
)
;
final
File root =
new
File
(
"F:\\JavaLib"
)
;
// 完成标志
final
File exitFile =
new
File
(
""
)
;
// 读个数
final
AtomicInteger rc =
new
AtomicInteger
()
;
// 写个数
final
AtomicInteger wc =
new
AtomicInteger
()
;
// 读线程
Runnable read =
new
Runnable
() {
public
void
run
() {
scanFile
(
root
)
;
scanFile
(
exitFile
)
;
}
public
void
scanFile
(
File file
) {
if
(
file.isDirectory
()) {
File
[]
files = file.listFiles
(
new
FileFilter
() {
public
boolean
accept
(
File pathname
) {
return
pathname.isDirectory
()
|| pathname.getPath
()
.endsWith
(
".java"
)
;
}
})
;
for
(
File one : files
)
scanFile
(
one
)
;
}
else
{
try
{
int
index = rc.incrementAndGet
()
;
System.out.println
(
"Read0: "
+ index +
" "
+ file.getPath
())
;
queue.put
(
file
)
;
}
catch
(
InterruptedException e
) {
}
}
}
}
;
exec.submit
(
read
)
;
// 四个写线程
for
(
int
index =
0
; index <
4
; index++
) {
// write thread
final
int
NO = index;
Runnable write =
new
Runnable
() {
String threadName =
"Write"
+ NO;
public
void
run
() {
while
(
true
) {
try
{
Thread.sleep
(
randomTime
())
;
int
index = wc.incrementAndGet
()
;
File file = queue.take
()
;
// 队列已经无对象
if
(
file == exitFile
) {
// 再次添加"标志",以让其他线程正常退出
queue.put
(
exitFile
)
;
break
;
}
System.out.println
(
threadName +
": "
+ index +
" "
+ file.getPath
())
;
}
catch
(
InterruptedException e
) {
}
}
}
}
;
exec.submit
(
write
)
;
}
exec.shutdown
()
;
}
}
发表评论
- 浏览: 8593 次
- 性别:

- 来自: 广州

- 详细资料
搜索本博客
最近加入圈子
最新评论
-
java中堆和堆栈的区别
引用[u][/u][i][/i][b][/b][img][/img][url ...
-- by yinguohui -
阻塞I/O,非阻塞I/O
1、异步与同步的区别:异步不会引起request进程阻塞。而同步会。2、阻塞IO ...
-- by jokermanager -
阻塞I/O,非阻塞I/O
两个问题: 1. "read操作是否阻塞,阻塞I/O是阻塞的,而异步I ...
-- by FGhost






评论排行榜