JAVA多线程系列-7-同步互斥及模型实现

2016/06/03 Java

在上一章锁与信号量的基础上,本章详细讲解同步互斥的意义与区别。通过3个经典同步模型,来讲解同步的实现。

3个经典同步模型分别为:

  • 1:生产者消费者模型
  • 2:读者写着模型
  • 3:理发师模型

同步与互斥

同步与互斥定义

互斥:互斥的目的是为了保护临界资源,确保多个线程,在同一时刻最多只能有一个线程访问临界资源,具有排他性,但是互斥并不要求每个线程的访问顺序。

同步:同步是利用信号量机制实现线程对临界资源的有序访问。大多数情况同步是在互斥的基础上,少数情况允许多个线程同时访问临界资源。

实现方式

  • 使用锁机制实现线程互斥
  • 使用信号量实现线程互斥
  • 使用信号量实现线程同步

同步问题常见模型:

生产者消费者模型

生产者消费者问题,也叫有限缓冲问题,是一种比较常见的并发问题。比如操作系统进程通信的方式:消息队列,就是使用的生产者消费者模型

public class ProblemProducerCustomer {

	public static void main(String[] args) {
		WareHouse warehouse = new WareHouse();
		ExecutorService pool = Executors.newCachedThreadPool();
		pool.execute(new Customer(50, warehouse));
		pool.execute(new Producer(20, warehouse));
		pool.execute(new Producer(30, warehouse));
		pool.execute(new Producer(20, warehouse));

		pool.shutdown();
	}

	public static class WareHouse {
		int num = 0;
		final int maxnum = 100;

		// 当前线程不含有当前对象的锁资源的时候,调用obj.wait()方法;调用obj.notify()方法。调用obj.notifyAll()方法。会有异常
		public synchronized void product(int x) {
			while (num + x > maxnum) {
				try {
					System.out.println("超出容量,等待消费~");
					wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			num = num + x;
			System.out.println("已生产~");
			notifyAll();
		}

		public synchronized void consume(int x) {
			while (num - x < 0) {
				try {
					System.out.println("容量不足,等待生产~");
					wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			num = num - x;
			System.out.println("已消费~");
			notifyAll();
		}

		public int getnum() {
			return num;
		}
	}

	public static class Producer implements Runnable {
		int speed;
		WareHouse warehouse;

		public Producer(int speed, WareHouse warehouse) {
			this.speed = speed;
			this.warehouse = warehouse;
		}

		@Override
		public void run() {
			warehouse.product(speed);
		}
	}

	public static class Customer implements Runnable {
		int speed;
		WareHouse warehouse;

		public Customer(int speed, WareHouse warehouse) {
			this.speed = speed;
			this.warehouse = warehouse;
		}
		@Override
		public void run() {
			warehouse.consume(speed);
		}

	}
}

生产者消费者模型

从图中可以看出来,消费者先运行,可是资源不足,等待生产,等到生产者生产足够,通知消费者消费~

贪睡的理发师模型

理发师问题可以理解为一种特殊的生产者、消费者问题。理发师问题的特殊之处在于

  • 使用了信号量以及互斥量机制。
  • 互斥量使多线程可以互斥第访问信号量
  • 双信号量,记录理发师状态,及顾客数量

条件:1名理发师,1个理发座椅,n个等待座椅

逻辑:

  • 若无人理发,则理发师睡觉;
  • 若有人进入理发店,理发师在睡觉,则叫醒理发师,进行理发;
  • 若有人进入理发店,理发师在理发,且有空的等待座椅,则入座等待;
  • 若有人进入理发店,理发师在理发,且无空的等待座椅,则离开。

Java实现

public class Sleep{
public static void main(String[] args) {
    System.out.println("贪睡的理发师,睡觉中。。。");
    while(true);
}

当然上面是开玩笑的,只会睡觉的理发师是会破产的,下面才是多线程代码:


public class ProblemBarber {

	public static void main(String[] args) {

		Semaphore signal = new Semaphore(10);// 10个空椅子
		Semaphore sleep = new Semaphore(0);// 初始理发师在睡觉(这里的睡觉相当于挂起,而不是执行睡觉)
		Thread barber = new Thread(new Barber(signal, sleep));

		barber.start();
		try {
			Thread.sleep(1000);

			while (true) {
				Thread customer = new Thread(new Customer(signal, sleep));
				customer.start();
				Thread.sleep(100);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static class Barber implements Runnable {
		Semaphore signal;
		Integer num;
		Semaphore sleep;

		public Barber(Semaphore signal, Semaphore sleep) {
			this.signal = signal;
			this.sleep = sleep;
		}

		@Override
		public void run() {
			synchronized (this) {
				try {
					sleep.acquire();
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
				while (signal.availablePermits() > 0) {
					try {
						Thread.sleep(1000);// 1s 服务一名顾客
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("1:一名顾客理完发!");
					signal.release();
				}
			}
		}
	}

	public static class Customer implements Runnable {
		Semaphore signal;
		Semaphore sleep;

		public Customer(Semaphore signal, Semaphore sleep) {
			this.signal = signal;
			this.sleep = sleep;
		}
		@Override
		public void run() {

			synchronized (this) {
				if (signal.tryAcquire())// 获得信号量
				{
					System.out.println("2:新添一名顾客!" + signal.availablePermits());
					if (signal.availablePermits() == 9)
						sleep.release();
				} else {
					System.out.println("2:等待座椅不够,顾客离开!");
				}

			}
		}
	}
}

理发师模型

从图中可以看出,理发师被叫醒后一直理发,完成一位,立刻又进入一位顾客。

读者-写者问题

前提:(写互斥)

  • (1)允许多个读者同时执行读操作;
  • (2)不允许读者、写者同时操作;
  • (3)不允许多个写者同时操作。

读者-写着模型,分3种情况:

  • 1:读者优先 (读操作占有写锁,使得后续的写操作挂起,读操作优先执行)
  • 2:顺序操作 (写操作占有读锁,使得所有在写操作后面的线程均在写操作后执行)
  • 3:写着优先 (增加互斥量write_mutex,使得读操作的等待队列中,最多只有一个获得读锁,
  • 从而确保写操作优先获得读锁,延迟其他读操作的进行,确保写操作优先!是最常见的模型)

public class ProblemReaderWriter{

	public static enum Mode
	{
		READFIRST,
		WRITEFRIST,
		NORMAL;
	}
	
	
	public static void main(String[] args)
	{
		Semaphore readerNum = new Semaphore(0);//信号量:读者总数
		Semaphore writerNum = new Semaphore(1);//信号量:写着总数
		Semaphore mutexReader = new Semaphore(1);//读者互斥锁
		Semaphore mutexWriter = new Semaphore(1);//写者互斥锁
		
		Mode mode = Mode.READFIRST; //模式选择
		
		Lock lock = new ReentrantLock();
		
		
		ExecutorService pool = Executors.newCachedThreadPool();

		//3种模式如果同时运行,因存在同步问题,会有意想不到的结果
		//最好还是分开运行

		switch(mode)
		{
			case READFIRST:
				System.out.println("读优先:");
				pool.execute(new Reader1(readerNum,writerNum,lock));
				pool.execute(new Writer1(readerNum,writerNum,lock));
				pool.execute(new Reader1(readerNum,writerNum,lock));
				pool.execute(new Reader1(readerNum,writerNum,lock));
				break;
			case NORMAL:
				System.out.println("顺序操作:");
				pool.execute(new Reader2(readerNum,writerNum,mutexReader));
				pool.execute(new Writer2(readerNum,writerNum,mutexReader));
				pool.execute(new Reader2(readerNum,writerNum,mutexReader));
				pool.execute(new Reader2(readerNum,writerNum,mutexReader));
				break;
			case WRITEFRIST:	
				System.out.println("写优先:");
				pool.execute(new Reader3(readerNum,writerNum,mutexReader,mutexWriter));
				pool.execute(new Reader3(readerNum,writerNum,mutexReader,mutexWriter));
				pool.execute(new Reader3(readerNum,writerNum,mutexReader,mutexWriter));
				pool.execute(new Writer3(readerNum,writerNum,mutexReader,mutexWriter));
				break;
		}
		
		
		pool.shutdown();
		
	}
	
	
	static class Reader1 implements Runnable
	{
		Semaphore readerNum;
		Semaphore writerNum;
		Lock lock;
		
		public Reader1(Semaphore readerNum,Semaphore writerNum,Lock lock) {
			this.readerNum=readerNum;
			this.writerNum=writerNum;
			this.lock=lock;
		}
		
		@Override
		public void run() {
			
			synchronized (readerNum) {
				readerNum.release();//读者+1
				if(readerNum.availablePermits()==1)
					try {
						writerNum.acquire();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
			}
			System.out.println("我在读"+readerNum.availablePermits());
			try {
				Thread.sleep(1000);
				synchronized (readerNum) {
					readerNum.acquire() ;//有可能挂起
					if(readerNum.availablePermits()==0)
						writerNum.release();
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}
	
	static class Writer1 implements Runnable
	{		
		Semaphore readerNum;
		Semaphore writerNum;
		Lock lock;
		
		public Writer1(Semaphore readerNum,Semaphore writerNum,Lock lock) {
			this.readerNum=readerNum;
			this.writerNum=writerNum;
			this.lock=lock;
		}
		
		@Override
		public void run() {
			try {
				writerNum.acquire();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
				System.out.println("我在写");
				writerNum.release();
		}
	}
	
	
	
	
	
	
	
	//顺序操作,添加了一个信号量:读锁
	static class Reader2 implements Runnable
	{
		Semaphore readerNum;
		Semaphore writerNum;
		Semaphore mutexReader;
		
		public Reader2(Semaphore readerNum,Semaphore writerNum,Semaphore lock) {
			this.readerNum=readerNum;
			this.writerNum=writerNum;
			this.mutexReader=lock;
		}
		
		@Override
		public void run() {

			try {
				mutexReader.acquire();
			} catch (InterruptedException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
			synchronized (readerNum) {
				readerNum.release();//读者+1
				if(readerNum.availablePermits()==1)
					try {
						writerNum.acquire();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
			}
			System.out.println("我在读"+readerNum.availablePermits());
			try {
				Thread.sleep(1000);
				synchronized (readerNum) {
					readerNum.acquire() ;//有可能挂起
					if(readerNum.availablePermits()==0)
					{
						writerNum.release();
					}
				}
				mutexReader.release();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}
	
	static class Writer2 implements Runnable
	{		
		Semaphore readerNum;
		Semaphore writerNum;
		Semaphore mutexReader;
		
		public Writer2(Semaphore readerNum,Semaphore writerNum,Semaphore lock) {
			this.readerNum=readerNum;
			this.writerNum=writerNum;
			this.mutexReader=lock;
		}
		
		@Override
		public void run() {
			try {
				mutexReader.acquire();
				writerNum.acquire();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
				System.out.println("我在写");
				writerNum.release();
				mutexReader.release();
		}
	}
	
	
	
	
	
	
	
	
	
	
	//写优先操作,添加了2个信号量:读锁,写锁
		static class Reader3 implements Runnable
		{
			Semaphore readerNum;
			Semaphore writerNum;
			Semaphore mutexReader;
			Semaphore mutexWriter;
			
			public Reader3(Semaphore readerNum,Semaphore writerNum,Semaphore lock,Semaphore mutexWriter) {
				this.readerNum=readerNum;
				this.writerNum=writerNum;
				this.mutexReader=lock;
				this.mutexWriter=mutexWriter;
			}
			
			@Override
			public void run() {

				try {
					mutexWriter.acquire();
					mutexReader.acquire();
				} catch (InterruptedException e1) {
					// TODO Auto-generated catch block
					e1.printStackTrace();
				}
				synchronized (readerNum) {
					readerNum.release();//读者+1
					if(readerNum.availablePermits()==1)
						try {
							writerNum.acquire();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
				}
				System.out.println("我在读"+readerNum.availablePermits());
				try {
					Thread.sleep(1000);
					synchronized (readerNum) {
						readerNum.acquire() ;//有可能挂起
						if(readerNum.availablePermits()==0)
						{
							writerNum.release();
						}
					}
					mutexReader.release();
					mutexWriter.release();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
		}
		
		static class Writer3 implements Runnable
		{		
			Semaphore readerNum;
			Semaphore writerNum;
			Semaphore mutexReader;
			Semaphore mutexWriter;
			
			public Writer3(Semaphore readerNum,Semaphore writerNum,Semaphore lock,Semaphore mutexWriter) {
				this.readerNum=readerNum;
				this.writerNum=writerNum;
				this.mutexReader=lock;
				this.mutexWriter=mutexWriter;
			}
			
			@Override
			public void run() {
				try {
					mutexReader.acquire();
					writerNum.acquire();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
					System.out.println("我在写");
					writerNum.release();
					mutexReader.release();
			}
		}
}


读者写者模型

上一章: 锁与信号量的2+2

下一章: 死锁

Search

    Post Directory