码迷,mamicode.com
首页 > 其他好文 > 详细

【并发技术13】条件阻塞Condition的应用

时间:2020-11-10 11:39:02      阅读:25      评论:0      收藏:0      [点我收藏+]

标签:rac   完全   效果   final   http   感悟   服务架构   read   als   

阅读本文大概需要6分钟

今天周六,该休息休息,该浪浪,武哥还是来聊聊技术吧,如题。

Condition 将 Object 监听器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用, Condition 替代了 Object 监视器方法的使用。

1. Condition的基本使用

由于 Condition 可以用来替代 wait、notify 等方法,所以可以对比着之前写过的传统线程同步通信技术的代码来看,先来回顾一下原来的问题:

有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次……如此往返执行50次。

之前用 wait 和notify 来实现的,现在用 Condition 来改写一下,代码如下:

public

class

ConditionCommunication

{

public

static

void
 main
(
String
[]
 args
)

{

Business
 bussiness 
=

new

Business
();

new

Thread
(
new

Runnable
()

{
// 开启一个子线程

@Override

public

void
 run
()

{

for

(
int
 i 
=

1
;
 i 
<=

50
;
 i
++)

{
                            bussiness
.
sub
(
i
);

}

}

}).
start
();

// main方法主线程

for

(
int
 i 
=

1
;
 i 
<=

50
;
 i
++)

{
            bussiness
.
main
(
i
);

}

}

}
class

Business

{

Lock
 lock 
=

new

ReentrantLock
();

Condition
 condition 
=
 lock
.
newCondition
();

//Condition是在具体的lock之上的

private

boolean
 bShouldSub 
=

true
;

public

void
 sub
(
int
 i
)

{
        lock
.
lock
();

try

{

while

(!
bShouldSub
)

{

try

{
                    condition
.
await
();

//用condition来调用await方法

}

catch

(
Exception
 e
)

{

// TODO Auto-generated catch block
                    e
.
printStackTrace
();

}

}

for

(
int
 j 
=

1
;
 j 
<=

10
;
 j
++)

{

System
.
out
.
println
(
"sub thread sequence of "

+
 j

+

", loop of "

+
 i
);

}
            bShouldSub 
=

false
;
            condition
.
signal
();

//用condition来发出唤醒信号,唤醒某一个

}

finally

{
            lock
.
unlock
();

}

}

public

void
 main
(
int
 i
)

{
        lock
.
lock
();

try

{

while

(
bShouldSub
)

{

try

{
                    condition
.
await
();

//用condition来调用await方法

}

catch

(
Exception
 e
)

{

// TODO Auto-generated catch block
                    e
.
printStackTrace
();

}

}

for

(
int
 j 
=

1
;
 j 
<=

10
;
 j
++)

{

System
.
out
.
println
(
"main thread sequence of "

+
 j

+

", loop of "

+
 i
);

}
            bShouldSub 
=

true
;
            condition
.
signal
();

//用condition来发出唤醒信号么,唤醒某一个

}

finally

{
            lock
.
unlock
();

}

}
}

从代码来看,Condition 的使用是和 Lock 一起的,没有 Lock 就没法使用 Condition,因为 Condition 是通过 Lock 来 new 出来的,这种用法很简单,只要掌握了 synchronized 和 wait、notify 的使用,完全可以掌握 Lock 和 Condition 的使用。

2. Condition的拔高

2.1 缓冲区的阻塞队列

上面使用 Lock 和 Condition 来代替 synchronized 和 Object 监视器方法实现了两个线程之间的通信,现在再来写个稍微高级点的应用:模拟缓冲区的阻塞队列。

什么叫缓冲区呢?举个例子,现在有很多人要发消息,我是中转站,我要帮别人把消息发出去,那么现在我就需要做两件事,一件事是接受用户发过来的消息,并按照顺序放到缓冲区,另一件事是从缓冲区按顺序取出用户发过来的消息,并发送出去。

现在把这个实际的问题抽象一下:缓冲区即一个数组,我们可以向数组种写入数据,也可以从数组种把数据取走,我要做的两件事就是开启两个线程,一个存数据,一个取数据。但是问题来了,如果缓冲区满了,说明接收的消息太多了,即发送过来的消息太快了,我另一个线程还来不及发完,导致现在的缓冲区没地方放了,那么此时就得阻塞存数据这个线程,让其等待;相反,如果我转发的太快,现在缓冲区所有内容都被我发完了,还没有用户发新的消息来,那么此时就得阻塞取数据这个线程。

好了,分析完了这个缓冲区的阻塞队列,下面就用 Condition 技术来实现一下。

class

Buffer

{

final

Lock
 lock 
=

new

ReentrantLock
();

//定义一个锁

final

Condition
 notFull 
=
 lock
.
newCondition
();

//定义阻塞队列满了的Condition

final

Condition
 notEmpty 
=
 lock
.
newCondition
();
//定义阻塞队列空了的Condition

final

Object
[]
 items 
=

new

Object
[
10
];

//为了下面模拟,设置阻塞队列的大小为10,不要设太大

int
 putptr
,
 takeptr
,
 count
;

//数组下标,用来标定位置的

//往队列中存数据

public

void
 put
(
Object
 x
)

throws

InterruptedException

{
        lock
.
lock
();

//上锁

try

{

while

(
count 
==
 items
.
length
)

{

System
.
out
.
println
(
Thread
.
currentThread
().
getName
()

+

" 被阻塞了,暂时无法存数据!"
);
                notFull
.
await
();

//如果队列满了,那么阻塞存数据这个线程,等待被唤醒

}

//如果没满,按顺序往数组中存
            items
[
putptr
]

=
 x
;

if

(++
putptr 
==
 items
.
length
)

//这是到达数组末端的判断,如果到了,再回到始端
                putptr 
=

0
;

++
count
;

//消息数量

System
.
out
.
println
(
Thread
.
currentThread
().
getName
()

+

" 存好了值: "

+
 x
);
            notEmpty
.
signal
();

//好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦

}

finally

{
            lock
.
unlock
();

//放锁

}

}

//从队列中取数据

public

Object
 take
()

throws

InterruptedException

{
        lock
.
lock
();

//上锁

try

{

while

(
count 
==

0
)

{

System
.
out
.
println
(
Thread
.
currentThread
().
getName
()

+

" 被阻塞了,暂时无法取数据!"
);
                notEmpty
.
await
();

//如果队列是空,那么阻塞取数据这个线程,等待被唤醒

}

//如果没空,按顺序从数组中取

Object
 x 
=
 items
[
takeptr
];

if

(++
takeptr 
==
 items
.
length
)

//判断是否到达末端,如果到了,再回到始端
                takeptr 
=

0
;

--
count
;

//消息数量

System
.
out
.
println
(
Thread
.
currentThread
().
getName
()

+

" 取出了值: "

+
 x
);
            notFull
.
signal
();

//好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦

return
 x
;

}

finally

{
            lock
.
unlock
();

//放锁

}

}
}

这个程序很经典,我是从官方 JDK 文档中拿出来的,然后加了注释。程序中定义了来两个 Condition,分别针对两个线程,等待和唤醒分别用不同的 Condition 来执行,思路很清晰,程序也很健壮。

可以考虑一个问题,为啥要用两个 Condition 呢?之所以这么设计肯定是有原因的:如果用一个 Condition,现在假设队列满了,但是有 2 个线程 A 和 B 同时存数据,那么都进入了睡眠,好,现在另一个线程取走一个了,然后唤醒了其中一个线程 A,那么 A 可以存了,存完后, A 又唤醒一个线程,如果 B 被唤醒了,那就出问题了,因为此时队列是满的,B 不能存的,B存的话就会覆盖原来还没被取走的只,这就是一个 Condition 带来的问题。

来测试一下上面的阻塞队列的效果。

public

class

BoundedBuffer

{

public

static

void
 main
(
String
[]
 args
)

{

Buffer
 buffer 
=

new

Buffer
();

for
(
int
 i 
=

0
;
 i 
<

5
;
 i 
++)

{

//开启5个线程往缓冲区存数据

new

Thread
(
new

Runnable
()

{

@Override

public

void
 run
()

{

try

{
                        buffer
.
put
(
new

Random
().
nextInt
(
1000
));

//随机存数据

}

catch

(
InterruptedException
 e
)

{
                        e
.
printStackTrace
();

}

}

}).
start
();

}

for
(
int
 i 
=

0
;
 i 
<

10
;
 i 
++)

{

//开启10个线程从缓冲区中取数据

new

Thread
(
new

Runnable
()

{

@Override

public

void
 run
()

{

try

{
                        buffer
.
take
();

//从缓冲区取数据

}

catch

(
InterruptedException
 e
)

{
                        e
.
printStackTrace
();

}

}

}).
start
();

}

}
}

我故意只开启 5 个线程存数据,10 个线程取数据,就是想让它出现取数据被阻塞的情况发生,看运行的结果。

Thread-5 被阻塞了,暂时无法取数据!
Thread-10 被阻塞了,暂时无法取数据!
Thread-1 存好了值: 755
Thread-0 存好了值: 206
Thread-2 存好了值: 741
Thread-3 存好了值: 381
Thread-14 取出了值: 755
Thread-4 存好了值: 783
Thread-6 取出了值: 206
Thread-7 取出了值: 741
Thread-8 取出了值: 381
Thread-9 取出了值: 783
Thread-5 被阻塞了,暂时无法取数据!
Thread-11 被阻塞了,暂时无法取数据!
Thread-12 被阻塞了,暂时无法取数据!
Thread-10 被阻塞了,暂时无法取数据!
Thread-13 被阻塞了,暂时无法取数据!

从结果中可以看出,线程 5 和10 抢先执行,发现队列中没有,于是就被阻塞了,睡在那了,知道队列中有新的值存入才可以取,但是他们两运气不好,存的数据又被其他线程前线取走了……可以多运行几次。如果想要看到存数据被阻塞,可以将取数据的线程设置少一点,这里我就不设了。

2.2 两个以上线程之间的唤醒

还是原来那个题目,现在让三个线程来执行,看一下题目:

有三个线程,子线程1先执行10次,然后子线程2执行10次,然后主线程执行5次,然后再切换到子线程1执行10次,子线程2执行10次,主线程执行5次……如此往返执行50次。

如果不用 Condition,还真不好弄,但是用 Condition 来做的话,就非常方便了,原理很简单,定义三个 Condition,子线程 1 执行完唤醒子线程 2,子线程 2 执行完唤醒主线程,主线程执行完唤醒子线程1。唤醒机制和上面那个缓冲区道理差不多,下面附上代码。

public

class

ThreeConditionCommunication

{

public

static

void
 main
(
String
[]
 args
)

{

Business
 bussiness 
=

new

Business
();

new

Thread
(
new

Runnable
()

{
// 开启一个子线程

@Override

public

void
 run
()

{

for

(
int
 i 
=

1
;
 i 
<=

50
;
 i
++)

{
                            bussiness
.
sub1
(
i
);

}

}

}).
start
();

new

Thread
(
new

Runnable
()

{
// 开启另一个子线程

@Override

public

void
 run
()

{

for

(
int
 i 
=

1
;
 i 
<=

50
;
 i
++)

{
                    bussiness
.
sub2
(
i
);

}

}

}).
start
();

// main方法主线程

for

(
int
 i 
=

1
;
 i 
<=

50
;
 i
++)

{
            bussiness
.
main
(
i
);

}

}

static

class

Business

{

Lock
 lock 
=

new

ReentrantLock
();

Condition
 condition1 
=
 lock
.
newCondition
();

//Condition是在具体的lock之上的

Condition
 condition2 
=
 lock
.
newCondition
();

Condition
 conditionMain 
=
 lock
.
newCondition
();

private

int
 bShouldSub 
=

0
;

public

void
 sub1
(
int
 i
)

{
            lock
.
lock
();

try

{

while

(
bShouldSub 
!=

0
)

{

try

{
                        condition1
.
await
();

//用condition来调用await方法

}

catch

(
Exception
 e
)

{

// TODO Auto-generated catch block
                        e
.
printStackTrace
();

}

}

for

(
int
 j 
=

1
;
 j 
<=

10
;
 j
++)

{

System
.
out
.
println
(
"sub1 thread sequence of "

+
 j

+

", loop of "

+
 i
);

}
                bShouldSub 
=

1
;
                condition2
.
signal
();

//让线程2执行

}

finally

{
                lock
.
unlock
();

}

}

public

void
 sub2
(
int
 i
)

{
            lock
.
lock
();

try

{

while

(
bShouldSub 
!=

1
)

{

try

{
                        condition2
.
await
();

//用condition来调用await方法

}

catch

(
Exception
 e
)

{

// TODO Auto-generated catch block
                        e
.
printStackTrace
();

}

}

for

(
int
 j 
=

1
;
 j 
<=

10
;
 j
++)

{

System
.
out
.
println
(
"sub2 thread sequence of "

+
 j

+

", loop of "

+
 i
);

}
                bShouldSub 
=

2
;
                conditionMain
.
signal
();

//让主线程执行

}

finally

{
                lock
.
unlock
();

}

}

public

void
 main
(
int
 i
)

{
            lock
.
lock
();

try

{

while

(
bShouldSub 
!=

2
)

{

try

{
                        conditionMain
.
await
();

//用condition来调用await方法

}

catch

(
Exception
 e
)

{

// TODO Auto-generated catch block
                        e
.
printStackTrace
();

}

}

for

(
int
 j 
=

1
;
 j 
<=

5
;
 j
++)

{

System
.
out
.
println
(
"main thread sequence of "

+
 j

+

", loop of "

+
 i
);

}
                bShouldSub 
=

0
;
                condition1
.
signal
();

//让线程1执行

}

finally

{
                lock
.
unlock
();

}

}

}
}

关于线程中 Condition 技术就分享这么多吧。

这里有技术、有段子、有生活、有感悟、有资源,要不然怎么叫『程序员私房菜』呢?什么?你不是程序员?这有关系吗?来吧,还等什么~
技术图片

更多推荐阅读:

微服务架构盛行的时代,你需要了解点 Spring Boot
读一篇故事,交一个朋友~
【并发技术10】线程并发库的使用
【并发技术11】Callable与Future的应用
【并发技术12】线程锁技术的使用
技术图片

【并发技术13】条件阻塞Condition的应用

标签:rac   完全   效果   final   http   感悟   服务架构   read   als   

原文地址:https://blog.51cto.com/14989612/2548415

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!