작업 간의 협력
작업이 서로 협력할 때 가장 핵심적인 이슈는 두 작업간에 교환하는 신호이다(핸드쉐이킹이라고 한다.)wait()와 notifyAll()
wait()를 사용하여 외부 조건이 변경되기를 기다릴 수 있다. 일반적으로 순환문을 돌면서 조건을 체크하는 일명 바쁜 대기는 비효율적으로 CPU 시간을 사용한다.wait()는 변경을 기다리는 작업을 잠시 중단시킨 뒤 notify() 또는 notifyAll()이 호출되면 다시 작업을 한다.sleep()이나 yield()는 객체의 락을 해제하지 않는다. 반면 wait()를 호출하면 해당 스레드의 실행이 중지되고 락 역시 해제된다. 즉, wait()를 호출하는 것은 '지금 당장 제가 할 수 있는 일은 다 했으니 이 상태에서 기다리겠습니다. 그러나 다른 분들이 동기화된 오퍼레이션을 사용하는 것은 허용하겠습니다.'라고 말하는 것과 같다. 0.001초 단위의 시간을 지정하지 않으면 notify(), notifyAll()을 호출할 때까지 무한정 기다린다.
wait(), notify(), notifyAll()은 Thread가 아닌 Object의 메소드이지만 동기화되지 않은 메소드에서 호출하면 IllegalMonitorStateException이 발생한다. wait(), notify(), notifyAll()를 호출하는 작업은 호출 전에 반드시 해당 객체의 락을 소유하고 있어야 한다는 뜻이다.
class Car { private boolean waxOn = false; public synchronized void waxed() { waxOn = true; // 광택 작업 대기 notifyAll(); } public synchronized void buffed() { waxOn = false; // 왁스 칠 대기 notifyAll(); } public synchronized void waitForWaxing() throws InterruptedException { while(waxOn == false) { wait(); } } public synchronized void waitForBuffing() throws InterruptedException { while(waxOn == true) { wait(); } } } class WaxOn implements Runnable { private Car car; public WaxOn(Car car) { this.car = car; } @Override public void run() { try { while(!Thread.interrupted()) { printnb("Wax On! "); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { print("Exiting via interrupt"); } print("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; public WaxOff(Car car) { this.car = car; } @Override public void run() { try { while(!Thread.interrupted()) { car.waitForWaxing(); printnb("Wax Off! "); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch (InterruptedException e) { print("Exiting via interrupt"); } print("Ending Wax Off task"); } } public class WaxOMatic { public static void main(String[] args) throws InterruptedException { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }이 예제에서 강조하는 점은 wait()를 while 문으로 감싸 대상이 되는 조건을 체크해야 한다는 것이다. 이는 다름과 같은 이유 때문에 중요하다.
- 동일한 목적을 가지고 동일한 락을 대기하는 다수의 작업이 존재할 수 있으며 깨어나는 첫번째 작업이 이런 조건을 변경할 수 있다. 이런 경우가 발생하면 해당 작업은 다시 조건이 변경될 동안 중지된다.
- 작업이 wait()로부터 깨어날 때까지 작업이 수행될 수 없거나 당시에는 오퍼레이션의 수행에 관계가 없는 것들을 다른 작업이 변경할 수도 있다. 다시 wait()를 호출하여 중지한다.
- 다른 이유로 개체 락이 대기하고 있을 수도 있다. 이 경우 원하던 이유로 깨어 났는지 확인한 후 그렇지 않다면 다시 wait()를 호출한다.
notify()와 notifyAll()
notify()를 사용하면 가능한 다수의 작업 중 오직 하나의 작업만이 깨어나게 되므로 notify()를 사용할 때에는 적한한 작업이 깨어나도록 조심해야한다.notifyAll()이 모든 대기 중인 작업을 깨운다는 것은 사실이 아니다. 사실 특정 락에 대한 notifyAll()은 바로 그 락에 대한 대기를 하고 있는 작업만을 깨운다.
class Blocker { synchronized void waitingCall() { try { while(!Thread.interrupted()) { wait(); printnb(Thread.currentThread() + " "); } } catch (InterruptedException e) { // 이와 같은 방식의 탈출은 OK } } synchronized void prod() { notify(); } synchronized void prodAll() { notifyAll(); } } class Task implements Runnable { static Blocker blocker = new Blocker(); @Override public void run() { blocker.waitingCall(); } } class Task2 implements Runnable { static Blocker blocker = new Blocker(); @Override public void run() { blocker.waitingCall(); } } public class NotifyVsNotifyAll { public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0; i < 5; i++) { exec.execute(new Task()); } exec.execute(new Task2()); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { boolean prod = true; @Override public void run() { if (prod) { printnb("\nnotify() "); Task.blocker.prod(); prod = false; } else { printnb("\nnotifyAll() "); Task.blocker.prodAll(); prod = true; } } }, 400, 400); // 매 4/10 초마다 실행 TimeUnit.SECONDS.sleep(5); timer.cancel(); print("\nTimer canceled"); TimeUnit.MICROSECONDS.sleep(500); printnb("Task2.blocker.prodAll() "); Task2.blocker.prodAll(); TimeUnit.MICROSECONDS.sleep(500); print("\nShutting down"); exec.shutdown(); } }실행 결과를 보면 Task2 객체가 Task2.blocker에 의해 블럭되어 있더라도 Task.blocker의 notify()나 notifyAll()이 Task2를 깨우지 않는다.
생산자와 소비자
class Meal { private final int orderNum; public Meal(int orderNum) { this.orderNum = orderNum; } @Override public String toString() { return "Meal " + orderNum; } } class WaitPerson implements Runnable { private Restaurant restaurant; public WaitPerson(Restaurant r) { this.restaurant = r; } @Override public void run() { try { while(!Thread.interrupted()) { synchronized (this) { while (restaurant.meal == null) { wait(); } print("Waitperson get " + restaurant.meal); synchronized (restaurant.chef) { restaurant.meal = null; restaurant.chef.notifyAll(); } } } } catch (InterruptedException e) { print("WaitPerson interrupted"); } } } class Chef implements Runnable { private Restaurant restaurant; private int count = 0; public Chef(Restaurant r) { this.restaurant = r; } @Override public void run() { try { while(!Thread.interrupted()) { synchronized (this) { while(restaurant.meal != null) { wait(); } } if (++count == 10) { print("Out of food, closing"); restaurant.exec.shutdownNow(); } printnb("Order up! "); synchronized (restaurant.waitPerson) { restaurant.meal = new Meal(count); restaurant.waitPerson.notifyAll(); } TimeUnit.MICROSECONDS.sleep(100); } } catch (InterruptedException e) { print("Chef interrupted"); } } } public class Restaurant { Meal meal; WaitPerson waitPerson = new WaitPerson(this); Chef chef = new Chef(this); ExecutorService exec = Executors.newCachedThreadPool(); public Restaurant() { exec.execute(chef); exec.execute(waitPerson); } public static void main(String[] args) { new Restaurant(); } }Chef가 음식을 준비한 뒤 이를 WaitPerson에게 알리면 Chef는 WaitPerson이 음식을 가져간 뒤 Chef에게 알릴 때까지 다음 요리 준비하고 대기한다.
wait()가 음식이 대기중인 것을 테스트하는 while()문 안에 위치한다는 것을 유의해야 한다. 이상해보일 수 있지만 음식을 기다리고 있으므로 깨어나면 음식이 있어야 한다. 다만 동시성 프로그램에서는 WaitPerson이 깨어나는 동안 다른 작업이 끼어들어 음식을 가져갈 수 있다. 이를 안전하게 피할 수 있는 유일한 방법은 wait()를 항당 다음과 같이 사용하는 것이다.
while(conditionNotMet) wait();Chef의 경우 shutdownNow()를 호출해도 바로 종료되지 않는데, 이는 Interrupted Exception이 발생해도 바로 감지되는게 아니라, 인터럽드 가능 오퍼레이션(sleep 같은)을 호출하려고 할 때 감지할 수 있다. 아니면 while문의 Thread.interrupted()를 통해야 한다.
생산자-소비자와 큐
많은 경우 Synchronized queue를 사용하면 추상화를 한단계 높여 작업 협력의 문제를 해결할 수 있다. 비어있는 상태에서 요소를 꺼내 가려고 하는 소비자 작업을 중지시키며, 요소가 추가되면 다시 실행시킨다.표준 구현 클래스java.util.concurrent.BlockingQueue를 제공하며, 크기 제한이 없는 LinkedBlockingQueue, ArrayBlockingQueue는 크기가 고정되어 있다.
이러한 큐를 사용하면 wait(), notifyAll()보다 더 신뢰성 있고 간단한 방법으로 상당수 문제들을 해결할 수 있다.
class LiftOffRunner implements Runnable { private BlockingQueuerockets; public LiftOffRunner(BlockingQueue queue) { rockets = queue; } public void add(LiftOff rocket) { try { rockets.put(rocket); } catch (InterruptedException e) { print("Interrupted during put()"); } } @Override public void run() { try { while(!Thread.interrupted()) { LiftOff rocket = rockets.take(); rocket.run(); } } catch (InterruptedException e) { print("Interrupted Waking from take()"); } print("Exiting LiftOffRunner"); } } public class TestBlockingQueue { static void getKey() { try { new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch(IOException e) { throw new RuntimeException(); } } static void getKey(String message) { print(message); getKey(); } static void test(String msg, BlockingQueue queue) { print(msg); LiftOffRunner runner = new LiftOffRunner(queue); Thread t = new Thread(runner); t.start(); for(int i = 0; i < 5; i++) { runner.add(new LiftOff(5)); } getKey("Press 'Enter' (" + msg + ")"); t.interrupt(); print("Finished " + msg + " test"); } public static void main(String[] args) { test("LinkedBlockingQueue", new LinkedBlockingDeque ()); test("ArrayBlockingQueue", new ArrayBlockingQueue (3)); test("SynchronousQueue", new SynchronousQueue ()); } }
BlockingQueue를 사용한 예제로 Taost를 만들고 버터를 바르고, 잼을 바르는 작업을 다음과 같이 만들 수 있다.
class Toast { public enum Status { DRY, BUTTERED, JAMMED } private Status status = Status.DRY; private final int id; Toast(int id) { this.id = id; } public void butter() {status = Status.BUTTERED; } public void jam() {status = Status.JAMMED; } public Status getStatus() { return status; } public int getId() { return id; } @Override public String toString() { return "Toast " + id + ": " + status; } } class ToastQueue extends LinkedBlockingQueue큐를 사용하여 동기화를 명시적으로 관리하고 각 토스트 조각은 한번에 하나의 작업에서만 조작되는 시스템이다. Lock이나 synchronized 키워드가 전혀 없으며, 큐가 프로세스 중지와 재실행을 조절해 준다. BlockingQueue가 간결한 코드로 바꾸어 준다.{} class Toaster implements Runnable { private ToastQueue toastQueue; private int count = 0; private Random random = new Random(47); Toaster(ToastQueue toastQueue) { this.toastQueue = toastQueue; } @Override public void run() { try { while(!Thread.interrupted()) { TimeUnit.MICROSECONDS.sleep(100 + random.nextInt(500)); Toast t = new Toast(count++); print(t); toastQueue.put(t); } } catch (InterruptedException e) { print("Toast interrupted"); } print("Toast off"); } } class Butterer implements Runnable { private ToastQueue dryQueue, butterQueue; Butterer(ToastQueue dryQueue, ToastQueue butterQueue) { this.dryQueue = dryQueue; this.butterQueue = butterQueue; } @Override public void run() { try { while(!Thread.interrupted()) { Toast t = dryQueue.take(); t.butter(); print(t); butterQueue.put(t); } } catch (InterruptedException e) { print("Butterer interrupted"); } print("Butterer off"); } } class Jammer implements Runnable { private ToastQueue butteredQueue, finishedQueue; Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) { this.butteredQueue = butteredQueue; this.finishedQueue = finishedQueue; } @Override public void run() { try { while(!Thread.interrupted()) { Toast t = butteredQueue.take(); t.jam(); print(t); finishedQueue.put(t); } } catch (InterruptedException e) { print("Jammer interrupted"); } print("Jammer off"); } } class Eater implements Runnable { private ToastQueue finishedQueue; private int counter = 0; Eater(ToastQueue finishedQueue) { this.finishedQueue = finishedQueue; } @Override public void run() { try { while(!Thread.interrupted()) { Toast t = finishedQueue.take(); if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) { print(">>> Error: " + t); System.exit(1); } else { print("Chomp! " + t); } } } catch (InterruptedException e) { print("Eater interrupted"); } print("Eater off"); } } public class ToastOMatic { public static void main(String[] args) throws InterruptedException { ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Toaster(dryQueue)); exec.execute(new Butterer(dryQueue, butteredQueue)); exec.execute(new Jammer(butteredQueue, finishedQueue)); exec.execute(new Eater(finishedQueue)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }
파이프를 사용한 I/O
java.io.PipedReader, java.io.PipedWriter를 통해 작업간 I/O를 지원할 수 있다. BlockingQueue가 지원되기 전에 자바에 존재하던 Blocking Queue라고 할 수 있다.class Sender implements Runnable { private Random rand = new Random(47); private PipedWriter out = new PipedWriter(); public PipedWriter getPipedWriter() { return out; } @Override public void run() { try { while(!Thread.interrupted()) { for(char c = 'A'; c <= 'z'; c++) { out.write(c); TimeUnit.MICROSECONDS.sleep(rand.nextInt(500)); } } } catch (IOException | InterruptedException e) { print(e + " Sender writer exception"); } } } class Reciever implements Runnable { private PipedReader in; public Reciever(Sender sender) throws IOException { in = new PipedReader(sender.getPipedWriter()); } @Override public void run() { try { while(!Thread.interrupted()) { printnb("Read: " + (char)in.read() + ", "); } } catch (IOException e) { print(e + " Reciever read exception"); } } } public class PipedIO { public static void main(String[] args) throws IOException, InterruptedException { Sender sender = new Sender(); Reciever reciever = new Reciever(sender); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(sender); exec.execute(reciever); TimeUnit.SECONDS.sleep(4); exec.shutdownNow(); } }Reader는 sleep()이나 wait()를 하지 않는다. 읽을 데이터가 없으면 자동으로 블록 상태가 된다. PipedReader는 인터럽트 가능하지만 일반적인 I/O는 인터럽트 되지 않는다.
댓글 없음:
댓글 쓰기