在上一章锁与信号量的基础上,本章详细讲解同步互斥的意义与区别。通过3个经典同步模型,来讲解同步的实现。
3个经典同步模型分别为:
- 1:生产者消费者模型
- 2:读者写着模型
- 3:理发师模型
同步与互斥
同步与互斥定义
互斥:互斥的目的是为了保护临界资源,确保多个线程,在同一时刻最多只能有一个线程访问临界资源,具有排他性,但是互斥并不要求每个线程的访问顺序。
同步:同步是利用信号量机制实现线程对临界资源的有序访问。大多数情况同步是在互斥的基础上,少数情况允许多个线程同时访问临界资源。
实现方式
- 使用锁机制实现线程互斥
- 使用信号量实现线程互斥
- 使用信号量实现线程同步
同步问题常见模型:
生产者消费者模型
生产者消费者问题,也叫有限缓冲问题,是一种比较常见的并发问题。比如操作系统进程通信的方式:消息队列,就是使用的生产者消费者模型
public class ProblemProducerCustomer {
public static void main(String[] args) {
WareHouse warehouse = new WareHouse();
ExecutorService pool = Executors.newCachedThreadPool();
pool.execute(new Customer(50, warehouse));
pool.execute(new Producer(20, warehouse));
pool.execute(new Producer(30, warehouse));
pool.execute(new Producer(20, warehouse));
pool.shutdown();
}
public static class WareHouse {
int num = 0;
final int maxnum = 100;
// 当前线程不含有当前对象的锁资源的时候,调用obj.wait()方法;调用obj.notify()方法。调用obj.notifyAll()方法。会有异常
public synchronized void product(int x) {
while (num + x > maxnum) {
try {
System.out.println("超出容量,等待消费~");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num = num + x;
System.out.println("已生产~");
notifyAll();
}
public synchronized void consume(int x) {
while (num - x < 0) {
try {
System.out.println("容量不足,等待生产~");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num = num - x;
System.out.println("已消费~");
notifyAll();
}
public int getnum() {
return num;
}
}
public static class Producer implements Runnable {
int speed;
WareHouse warehouse;
public Producer(int speed, WareHouse warehouse) {
this.speed = speed;
this.warehouse = warehouse;
}
@Override
public void run() {
warehouse.product(speed);
}
}
public static class Customer implements Runnable {
int speed;
WareHouse warehouse;
public Customer(int speed, WareHouse warehouse) {
this.speed = speed;
this.warehouse = warehouse;
}
@Override
public void run() {
warehouse.consume(speed);
}
}
}
从图中可以看出来,消费者先运行,可是资源不足,等待生产,等到生产者生产足够,通知消费者消费~
贪睡的理发师模型
理发师问题可以理解为一种特殊的生产者、消费者问题。理发师问题的特殊之处在于
- 使用了信号量以及互斥量机制。
- 互斥量使多线程可以互斥第访问信号量
- 双信号量,记录理发师状态,及顾客数量
条件:1名理发师,1个理发座椅,n个等待座椅
逻辑:
- 若无人理发,则理发师睡觉;
- 若有人进入理发店,理发师在睡觉,则叫醒理发师,进行理发;
- 若有人进入理发店,理发师在理发,且有空的等待座椅,则入座等待;
- 若有人进入理发店,理发师在理发,且无空的等待座椅,则离开。
Java实现
public class Sleep{
public static void main(String[] args) {
System.out.println("贪睡的理发师,睡觉中。。。");
while(true);
}
当然上面是开玩笑的,只会睡觉的理发师是会破产的,下面才是多线程代码:
public class ProblemBarber {
public static void main(String[] args) {
Semaphore signal = new Semaphore(10);// 10个空椅子
Semaphore sleep = new Semaphore(0);// 初始理发师在睡觉(这里的睡觉相当于挂起,而不是执行睡觉)
Thread barber = new Thread(new Barber(signal, sleep));
barber.start();
try {
Thread.sleep(1000);
while (true) {
Thread customer = new Thread(new Customer(signal, sleep));
customer.start();
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static class Barber implements Runnable {
Semaphore signal;
Integer num;
Semaphore sleep;
public Barber(Semaphore signal, Semaphore sleep) {
this.signal = signal;
this.sleep = sleep;
}
@Override
public void run() {
synchronized (this) {
try {
sleep.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
while (signal.availablePermits() > 0) {
try {
Thread.sleep(1000);// 1s 服务一名顾客
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1:一名顾客理完发!");
signal.release();
}
}
}
}
public static class Customer implements Runnable {
Semaphore signal;
Semaphore sleep;
public Customer(Semaphore signal, Semaphore sleep) {
this.signal = signal;
this.sleep = sleep;
}
@Override
public void run() {
synchronized (this) {
if (signal.tryAcquire())// 获得信号量
{
System.out.println("2:新添一名顾客!" + signal.availablePermits());
if (signal.availablePermits() == 9)
sleep.release();
} else {
System.out.println("2:等待座椅不够,顾客离开!");
}
}
}
}
}
从图中可以看出,理发师被叫醒后一直理发,完成一位,立刻又进入一位顾客。
读者-写者问题
前提:(写互斥)
- (1)允许多个读者同时执行读操作;
- (2)不允许读者、写者同时操作;
- (3)不允许多个写者同时操作。
读者-写着模型,分3种情况:
- 1:读者优先 (读操作占有写锁,使得后续的写操作挂起,读操作优先执行)
- 2:顺序操作 (写操作占有读锁,使得所有在写操作后面的线程均在写操作后执行)
- 3:写着优先 (增加互斥量write_mutex,使得读操作的等待队列中,最多只有一个获得读锁,
- 从而确保写操作优先获得读锁,延迟其他读操作的进行,确保写操作优先!是最常见的模型)
public class ProblemReaderWriter{
public static enum Mode
{
READFIRST,
WRITEFRIST,
NORMAL;
}
public static void main(String[] args)
{
Semaphore readerNum = new Semaphore(0);//信号量:读者总数
Semaphore writerNum = new Semaphore(1);//信号量:写着总数
Semaphore mutexReader = new Semaphore(1);//读者互斥锁
Semaphore mutexWriter = new Semaphore(1);//写者互斥锁
Mode mode = Mode.READFIRST; //模式选择
Lock lock = new ReentrantLock();
ExecutorService pool = Executors.newCachedThreadPool();
//3种模式如果同时运行,因存在同步问题,会有意想不到的结果
//最好还是分开运行
switch(mode)
{
case READFIRST:
System.out.println("读优先:");
pool.execute(new Reader1(readerNum,writerNum,lock));
pool.execute(new Writer1(readerNum,writerNum,lock));
pool.execute(new Reader1(readerNum,writerNum,lock));
pool.execute(new Reader1(readerNum,writerNum,lock));
break;
case NORMAL:
System.out.println("顺序操作:");
pool.execute(new Reader2(readerNum,writerNum,mutexReader));
pool.execute(new Writer2(readerNum,writerNum,mutexReader));
pool.execute(new Reader2(readerNum,writerNum,mutexReader));
pool.execute(new Reader2(readerNum,writerNum,mutexReader));
break;
case WRITEFRIST:
System.out.println("写优先:");
pool.execute(new Reader3(readerNum,writerNum,mutexReader,mutexWriter));
pool.execute(new Reader3(readerNum,writerNum,mutexReader,mutexWriter));
pool.execute(new Reader3(readerNum,writerNum,mutexReader,mutexWriter));
pool.execute(new Writer3(readerNum,writerNum,mutexReader,mutexWriter));
break;
}
pool.shutdown();
}
static class Reader1 implements Runnable
{
Semaphore readerNum;
Semaphore writerNum;
Lock lock;
public Reader1(Semaphore readerNum,Semaphore writerNum,Lock lock) {
this.readerNum=readerNum;
this.writerNum=writerNum;
this.lock=lock;
}
@Override
public void run() {
synchronized (readerNum) {
readerNum.release();//读者+1
if(readerNum.availablePermits()==1)
try {
writerNum.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("我在读"+readerNum.availablePermits());
try {
Thread.sleep(1000);
synchronized (readerNum) {
readerNum.acquire() ;//有可能挂起
if(readerNum.availablePermits()==0)
writerNum.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Writer1 implements Runnable
{
Semaphore readerNum;
Semaphore writerNum;
Lock lock;
public Writer1(Semaphore readerNum,Semaphore writerNum,Lock lock) {
this.readerNum=readerNum;
this.writerNum=writerNum;
this.lock=lock;
}
@Override
public void run() {
try {
writerNum.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我在写");
writerNum.release();
}
}
//顺序操作,添加了一个信号量:读锁
static class Reader2 implements Runnable
{
Semaphore readerNum;
Semaphore writerNum;
Semaphore mutexReader;
public Reader2(Semaphore readerNum,Semaphore writerNum,Semaphore lock) {
this.readerNum=readerNum;
this.writerNum=writerNum;
this.mutexReader=lock;
}
@Override
public void run() {
try {
mutexReader.acquire();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
synchronized (readerNum) {
readerNum.release();//读者+1
if(readerNum.availablePermits()==1)
try {
writerNum.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("我在读"+readerNum.availablePermits());
try {
Thread.sleep(1000);
synchronized (readerNum) {
readerNum.acquire() ;//有可能挂起
if(readerNum.availablePermits()==0)
{
writerNum.release();
}
}
mutexReader.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Writer2 implements Runnable
{
Semaphore readerNum;
Semaphore writerNum;
Semaphore mutexReader;
public Writer2(Semaphore readerNum,Semaphore writerNum,Semaphore lock) {
this.readerNum=readerNum;
this.writerNum=writerNum;
this.mutexReader=lock;
}
@Override
public void run() {
try {
mutexReader.acquire();
writerNum.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我在写");
writerNum.release();
mutexReader.release();
}
}
//写优先操作,添加了2个信号量:读锁,写锁
static class Reader3 implements Runnable
{
Semaphore readerNum;
Semaphore writerNum;
Semaphore mutexReader;
Semaphore mutexWriter;
public Reader3(Semaphore readerNum,Semaphore writerNum,Semaphore lock,Semaphore mutexWriter) {
this.readerNum=readerNum;
this.writerNum=writerNum;
this.mutexReader=lock;
this.mutexWriter=mutexWriter;
}
@Override
public void run() {
try {
mutexWriter.acquire();
mutexReader.acquire();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
synchronized (readerNum) {
readerNum.release();//读者+1
if(readerNum.availablePermits()==1)
try {
writerNum.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("我在读"+readerNum.availablePermits());
try {
Thread.sleep(1000);
synchronized (readerNum) {
readerNum.acquire() ;//有可能挂起
if(readerNum.availablePermits()==0)
{
writerNum.release();
}
}
mutexReader.release();
mutexWriter.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Writer3 implements Runnable
{
Semaphore readerNum;
Semaphore writerNum;
Semaphore mutexReader;
Semaphore mutexWriter;
public Writer3(Semaphore readerNum,Semaphore writerNum,Semaphore lock,Semaphore mutexWriter) {
this.readerNum=readerNum;
this.writerNum=writerNum;
this.mutexReader=lock;
this.mutexWriter=mutexWriter;
}
@Override
public void run() {
try {
mutexReader.acquire();
writerNum.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我在写");
writerNum.release();
mutexReader.release();
}
}
}
上一章: 锁与信号量的2+2
下一章: 死锁