레이블이 JAVA인 게시물을 표시합니다. 모든 게시물 표시
레이블이 JAVA인 게시물을 표시합니다. 모든 게시물 표시

2014년 2월 25일 화요일

동시성 (3)

작업 간의 협력

작업이 서로 협력할 때 가장 핵심적인 이슈는 두 작업간에 교환하는 신호이다(핸드쉐이킹이라고 한다.)

wait()와 notifyAll()

wait()를 사용하여 외부 조건이 변경되기를 기다릴 수 있다. 일반적으로 순환문을 돌면서 조건을 체크하는 일명 바쁜 대기는 비효율적으로 CPU 시간을 사용한다.wait()는 변경을 기다리는 작업을 잠시 중단시킨 뒤 notify() 또는 notifyAll()이 호출되면 다시 작업을 한다.
sleep()이나 yield()는 객체의 락을 해제하지 않는다. 반면 wait()를 호출하면 해당 스레드의 실행이 중지되고 락 역시 해제된다. 즉, wait()를 호출하는 것은 '지금 당장 제가 할 수 있는 일은 다 했으니 이 상태에서 기다리겠습니다. 그러나 다른 분들이 동기화된 오퍼레이션을 사용하는 것은 허용하겠습니다.'라고 말하는 것과 같다. 0.001초 단위의 시간을 지정하지 않으면 notify(), notifyAll()을 호출할 때까지 무한정 기다린다.
wait(), notify(), notifyAll()은 Thread가 아닌 Object의 메소드이지만 동기화되지 않은 메소드에서 호출하면 IllegalMonitorStateException이 발생한다. wait(), notify(), notifyAll()를 호출하는 작업은 호출 전에 반드시 해당 객체의 락을 소유하고 있어야 한다는 뜻이다.
class Car {
    private boolean waxOn = false;
    public synchronized void waxed() {
        waxOn = true; // 광택 작업 대기
        notifyAll();
    }
    public synchronized void buffed() {
        waxOn = false; // 왁스 칠 대기
        notifyAll();
    }
    public synchronized void waitForWaxing() throws InterruptedException {
        while(waxOn == false) {
            wait();
        }

    }
    public synchronized void waitForBuffing() throws InterruptedException {
        while(waxOn == true) {
            wait();
        }
    }
}
class WaxOn implements Runnable {
    private Car car;

    public WaxOn(Car car) { this.car = car; }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                printnb("Wax On! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.waxed();
                car.waitForBuffing();
            }
        } catch (InterruptedException e) {
            print("Exiting via interrupt");
        }
        print("Ending Wax On task");
    }
}
class WaxOff implements Runnable {
    private Car car;

    public WaxOff(Car car) { this.car = car; }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                car.waitForWaxing();
                printnb("Wax Off! ");
                TimeUnit.MILLISECONDS.sleep(200);
                car.buffed();
            }
        } catch (InterruptedException e) {
            print("Exiting via interrupt");
        }
        print("Ending Wax Off task");
    }
}
public class WaxOMatic {
    public static void main(String[] args) throws InterruptedException {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}
이 예제에서 강조하는 점은 wait()를 while 문으로 감싸 대상이 되는 조건을 체크해야 한다는 것이다. 이는 다름과 같은 이유 때문에 중요하다.
  • 동일한 목적을 가지고 동일한 락을 대기하는 다수의 작업이 존재할 수 있으며 깨어나는 첫번째 작업이 이런 조건을 변경할 수 있다. 이런 경우가 발생하면 해당 작업은 다시 조건이 변경될 동안 중지된다.
  • 작업이 wait()로부터 깨어날 때까지 작업이 수행될 수 없거나 당시에는 오퍼레이션의 수행에 관계가 없는 것들을 다른 작업이 변경할 수도 있다. 다시 wait()를 호출하여 중지한다.
  • 다른 이유로 개체 락이 대기하고 있을 수도 있다. 이 경우 원하던 이유로 깨어 났는지 확인한 후 그렇지 않다면 다시 wait()를 호출한다.

notify()와 notifyAll()

notify()를 사용하면 가능한 다수의 작업 중 오직 하나의 작업만이 깨어나게 되므로 notify()를 사용할 때에는 적한한 작업이 깨어나도록 조심해야한다.
notifyAll()이 모든 대기 중인 작업을 깨운다는 것은 사실이 아니다. 사실 특정 락에 대한 notifyAll()은 바로 그 락에 대한 대기를 하고 있는 작업만을 깨운다.
class Blocker {
    synchronized void waitingCall() {
        try {
            while(!Thread.interrupted()) {
                wait();
                printnb(Thread.currentThread() + " ");
            }
        } catch (InterruptedException e) {
            // 이와 같은 방식의 탈출은 OK
        }
    }

    synchronized void prod() { notify(); }
    synchronized void prodAll() { notifyAll(); }
}

class Task implements Runnable {
    static Blocker blocker = new Blocker();

    @Override
    public void run() {
        blocker.waitingCall();
    }
}

class Task2 implements Runnable {
    static Blocker blocker = new Blocker();

    @Override
    public void run() {
        blocker.waitingCall();
    }
}

public class NotifyVsNotifyAll {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++) {
            exec.execute(new Task());
        }

        exec.execute(new Task2());

        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            boolean prod = true;
            @Override
            public void run() {
                if (prod) {
                    printnb("\nnotify() ");
                    Task.blocker.prod();
                    prod = false;
                } else {
                    printnb("\nnotifyAll() ");
                    Task.blocker.prodAll();
                    prod = true;
                }
            }
        }, 400, 400); // 매 4/10 초마다 실행
        TimeUnit.SECONDS.sleep(5);
        timer.cancel();
        print("\nTimer canceled");
        TimeUnit.MICROSECONDS.sleep(500);
        printnb("Task2.blocker.prodAll() ");
        Task2.blocker.prodAll();
        TimeUnit.MICROSECONDS.sleep(500);
        print("\nShutting down");
        exec.shutdown();
    }
}
실행 결과를 보면 Task2 객체가 Task2.blocker에 의해 블럭되어 있더라도 Task.blocker의 notify()나 notifyAll()이 Task2를 깨우지 않는다.

생산자와 소비자

class Meal {
    private final int orderNum;
    public Meal(int orderNum) {
        this.orderNum = orderNum;
    }

    @Override
    public String toString() {
        return "Meal " + orderNum;
    }
}

class WaitPerson implements Runnable {
    private Restaurant restaurant;
    public WaitPerson(Restaurant r) {
        this.restaurant = r;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized (this) {
                    while (restaurant.meal == null) {
                        wait();
                    }
                    print("Waitperson get " + restaurant.meal);
                    synchronized (restaurant.chef) {
                        restaurant.meal = null;
                        restaurant.chef.notifyAll();
                    }
                }
            }
        } catch (InterruptedException e) {
            print("WaitPerson interrupted");
        }
    }
}

class Chef implements Runnable {
    private Restaurant restaurant;
    private int count = 0;
    public Chef(Restaurant r) {
        this.restaurant = r;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized (this) {
                    while(restaurant.meal != null) {
                        wait();
                    }
                }
                if (++count == 10) {
                    print("Out of food, closing");
                    restaurant.exec.shutdownNow();
                }
                printnb("Order up! ");
                synchronized (restaurant.waitPerson) {
                    restaurant.meal = new Meal(count);
                    restaurant.waitPerson.notifyAll();
                }
                TimeUnit.MICROSECONDS.sleep(100);
            }
        } catch (InterruptedException e) {
            print("Chef interrupted");
        }
    }
}
public class Restaurant {
    Meal meal;
    WaitPerson waitPerson = new WaitPerson(this);
    Chef chef = new Chef(this);
    ExecutorService exec = Executors.newCachedThreadPool();

    public Restaurant() {
        exec.execute(chef);
        exec.execute(waitPerson);
    }

    public static void main(String[] args) {
        new Restaurant();
    }
}
Chef가 음식을 준비한 뒤 이를 WaitPerson에게 알리면 Chef는 WaitPerson이 음식을 가져간 뒤 Chef에게 알릴 때까지 다음 요리 준비하고 대기한다.
wait()가 음식이 대기중인 것을 테스트하는 while()문 안에 위치한다는 것을 유의해야 한다. 이상해보일 수 있지만 음식을 기다리고 있으므로 깨어나면 음식이 있어야 한다. 다만 동시성 프로그램에서는 WaitPerson이 깨어나는 동안 다른 작업이 끼어들어 음식을 가져갈 수 있다. 이를 안전하게 피할 수 있는 유일한 방법은 wait()를 항당 다음과 같이 사용하는 것이다.
    while(conditionNotMet)
        wait();
Chef의 경우 shutdownNow()를 호출해도 바로 종료되지 않는데, 이는 Interrupted Exception이 발생해도 바로 감지되는게 아니라, 인터럽드 가능 오퍼레이션(sleep 같은)을 호출하려고 할 때 감지할 수 있다. 아니면 while문의 Thread.interrupted()를 통해야 한다.

생산자-소비자와 큐

많은 경우 Synchronized queue를 사용하면 추상화를 한단계 높여 작업 협력의 문제를 해결할 수 있다. 비어있는 상태에서 요소를 꺼내 가려고 하는 소비자 작업을 중지시키며, 요소가 추가되면 다시 실행시킨다.
표준 구현 클래스java.util.concurrent.BlockingQueue를 제공하며, 크기 제한이 없는 LinkedBlockingQueue, ArrayBlockingQueue는 크기가 고정되어 있다.
이러한 큐를 사용하면 wait(), notifyAll()보다 더 신뢰성 있고 간단한 방법으로 상당수 문제들을 해결할 수 있다.
class LiftOffRunner implements Runnable {
    private BlockingQueue rockets;
    public LiftOffRunner(BlockingQueue queue) {
        rockets = queue;
    }

    public void add(LiftOff rocket) {
        try {
            rockets.put(rocket);
        } catch (InterruptedException e) {
            print("Interrupted during put()");
        }
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                LiftOff rocket = rockets.take();
                rocket.run();
            }
        } catch (InterruptedException e) {
            print("Interrupted Waking from take()");
        }
        print("Exiting LiftOffRunner");
    }
}

public class TestBlockingQueue {
    static void getKey() {
        try {
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } catch(IOException e) {
            throw new RuntimeException();
        }
    }

    static void getKey(String message) {
        print(message);
        getKey();
    }

    static void test(String msg, BlockingQueue queue) {
        print(msg);
        LiftOffRunner runner = new LiftOffRunner(queue);
        Thread t = new Thread(runner);
        t.start();
        for(int i = 0; i < 5; i++) {
            runner.add(new LiftOff(5));
        }
        getKey("Press 'Enter' (" + msg + ")");
        t.interrupt();
        print("Finished " + msg + " test");
    }

    public static void main(String[] args) {
        test("LinkedBlockingQueue", new LinkedBlockingDeque());
        test("ArrayBlockingQueue", new ArrayBlockingQueue(3));
        test("SynchronousQueue", new SynchronousQueue());
    }
}

BlockingQueue를 사용한 예제로 Taost를 만들고 버터를 바르고, 잼을 바르는 작업을 다음과 같이 만들 수 있다.
class Toast {
    public enum Status { DRY, BUTTERED, JAMMED }
    private Status status = Status.DRY;
    private final int id;

    Toast(int id) {
        this.id = id;
    }

    public void butter() {status = Status.BUTTERED; }
    public void jam() {status = Status.JAMMED; }
    public Status getStatus() {
        return status;
    }
    public int getId() { return id; }

    @Override
    public String toString() {
        return "Toast " + id + ": " + status;
    }
}

class ToastQueue extends LinkedBlockingQueue {}

class Toaster implements Runnable {
    private ToastQueue toastQueue;
    private int count = 0;
    private Random random = new Random(47);

    Toaster(ToastQueue toastQueue) {
        this.toastQueue = toastQueue;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                TimeUnit.MICROSECONDS.sleep(100 + random.nextInt(500));
                Toast t = new Toast(count++);
                print(t);
                toastQueue.put(t);
            }
        } catch (InterruptedException e) {
            print("Toast interrupted");
        }
        print("Toast off");
    }
}

class Butterer implements Runnable {
    private ToastQueue dryQueue, butterQueue;

    Butterer(ToastQueue dryQueue, ToastQueue butterQueue) {
        this.dryQueue = dryQueue;
        this.butterQueue = butterQueue;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                Toast t = dryQueue.take();
                t.butter();
                print(t);
                butterQueue.put(t);
            }
        } catch (InterruptedException e) {
            print("Butterer interrupted");
        }
        print("Butterer off");
    }
}

class Jammer implements Runnable {
    private ToastQueue butteredQueue, finishedQueue;

    Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
        this.butteredQueue = butteredQueue;
        this.finishedQueue = finishedQueue;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                Toast t = butteredQueue.take();
                t.jam();
                print(t);
                finishedQueue.put(t);
            }
        } catch (InterruptedException e) {
            print("Jammer interrupted");
        }
        print("Jammer off");
    }
}

class Eater implements Runnable {
    private ToastQueue finishedQueue;
    private int counter = 0;

    Eater(ToastQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                Toast t = finishedQueue.take();
                if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
                    print(">>> Error: " + t);
                    System.exit(1);
                } else {
                    print("Chomp! " + t);
                }
            }
        } catch (InterruptedException e) {
            print("Eater interrupted");
        }
        print("Eater off");
    }
}

public class ToastOMatic {
    public static void main(String[] args) throws InterruptedException {
        ToastQueue dryQueue = new ToastQueue(),
                butteredQueue = new ToastQueue(),
                finishedQueue = new ToastQueue();

        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Butterer(dryQueue, butteredQueue));
        exec.execute(new Jammer(butteredQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));

        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}
큐를 사용하여 동기화를 명시적으로 관리하고 각 토스트 조각은 한번에 하나의 작업에서만 조작되는 시스템이다. Lock이나 synchronized 키워드가 전혀 없으며, 큐가 프로세스 중지와 재실행을 조절해 준다. BlockingQueue가 간결한 코드로 바꾸어 준다.

파이프를 사용한 I/O

java.io.PipedReader, java.io.PipedWriter를 통해 작업간 I/O를 지원할 수 있다. BlockingQueue가 지원되기 전에 자바에 존재하던 Blocking Queue라고 할 수 있다.
class Sender implements Runnable {
    private Random rand = new Random(47);
    private PipedWriter out = new PipedWriter();
    public PipedWriter getPipedWriter() { return out; }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                for(char c = 'A'; c <= 'z'; c++) {
                    out.write(c);
                    TimeUnit.MICROSECONDS.sleep(rand.nextInt(500));
                }
            }
        } catch (IOException | InterruptedException e) {
            print(e + " Sender writer exception");
        }
    }
}

class Reciever implements Runnable {
    private PipedReader in;

    public Reciever(Sender sender) throws IOException {
        in = new PipedReader(sender.getPipedWriter());
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                printnb("Read: " + (char)in.read() + ", ");
            }
        } catch (IOException e) {
            print(e + " Reciever read exception");
        }
    }
}

public class PipedIO {
    public static void main(String[] args) throws IOException, InterruptedException {
        Sender sender = new Sender();
        Reciever reciever = new Reciever(sender);

        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(sender);
        exec.execute(reciever);

        TimeUnit.SECONDS.sleep(4);
        exec.shutdownNow();
    }
}
Reader는 sleep()이나 wait()를 하지 않는다. 읽을 데이터가 없으면 자동으로 블록 상태가 된다. PipedReader는 인터럽트 가능하지만 일반적인 I/O는 인터럽트 되지 않는다.

2013년 9월 3일 화요일

동시성 (2)

자원 공유

단일 스레드 프로그램에선 발생하지 않는 문제이지만, 여러 스레드를 사용하면 동시에 같은 자원을 사용하려 할 때 문제가 발생한다. 두 사람이 동시에 같은 장소에 주차하려는 상황저럼..

자원으로의 잘못된 접근

짝수를 생성하는 작업과 생성된 짝수를 검증하는 두개의 작업을 살펴보자.
public abstract class IntGenerator {
    private volatile boolean canceled = false;
    public abstract int next();
    public void cancel() {canceled = true;}
    public boolean isCanceled() {return canceled;}
}

public class EvenChecker implements Runnable {
    private IntGenerator generator;
    private final int id;

    public EvenChecker(IntGenerator generator, int id) {
        this.generator = generator;
        this.id = id;
    }
    public void run () {
        while(!generator.isCanceled()) {
            int val = generator.next();
            if(val % 2 != 0) {
                print(Thread.currentThread().getName() + " " + val + " not even!");
                generator.cancel();
            }
        }
        print(Thread.currentThread().getName() + " " + "Generator is canceled");
    }

    public static void test(IntGenerator gp, int count) {
        print("Press Control-C to exit");
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < count; i++) {
            exec.execute(new EvenChecker(gp, i));
        }
        exec.shutdown();
    }
    public static void test(IntGenerator gp) {
        test(gp, 10);
    }
}

public class EvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;

    @Override
    public int next() {
        ++currentEvenValue;  // 위험한 부분
        ++currentEvenValue;
        return currentEvenValue;
    }

    public static void main(String[] args) {
        EvenChecker.test(new EvenGenerator());
    }
}
총 10개의 스레드가 같은 Generator 객체를 사용하고 있다. 한 스레드가 next()를 호출하여 currentEvenValue가 증가하는 순간 다른 스레드가 next()를 호출하는 경우는 충분이 발생할 수 있다. 이것은 내부의 값을 부정확한 생태로 빠뜨리게 한다.
Press Control-C to exit
pool-1-thread-1 3707 not even!
pool-1-thread-1 Generator is canceled
pool-1-thread-4 Generator is canceled
pool-1-thread-5 Generator is canceled
pool-1-thread-2 3711 not even!
pool-1-thread-2 Generator is canceled
pool-1-thread-3 3709 not even!
pool-1-thread-3 Generator is canceled
pool-1-thread-6 Generator is canceled
pool-1-thread-7 Generator is canceled
pool-1-thread-4 Generator is canceled
pool-1-thread-3 Generator is canceled
pool-1-thread-6 Generator is canceled
next() 메소드가 복수의 단계를 필요하므로 스레드 메커니즘에 의해 증가 단계의 중간에 작업이 끼어들 수 있다는 것을 명심해야 한다.

공유 자원 경쟁의 해결

동시성이 제대로 동작하려면 두 작업이 동시에 같은 자원에 접근하는 것을 막아야 한다. 이것은 해당 자원을 사용 중일 때 자물쇠를 잠그는 것으로 간단히 해결된다.
자원에 대한 충돌을 방지하기 위하여 자바는 기본적으로 synchronized 키워드를 제공한다. 작업이 synchronized 키워드로 보호되는 코드를 실행하면, 락(lock)이 사용가능한지 확인하고 획득한 다음 코드를 실행하고 완료되면 락을 해제한다.
메소드를 다음과 같이 synchronized 메소드로 선언하여 충돌을 방지할 수 있다.
synchronized void f() { ... }
synchronized void g() { ... }
다만 모든 객체는 모니터라고 불리는 단일 락을 가지고 있다. synchronized 메소드를 호출하면 그 객체가 가지고 있는 다른 모든 synchronized 메소드에 대한 접근이 블럭된다.
public class EvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;

    @Override
    public synchronized int next() {
        ++currentEvenValue;  // 위험한 부분
        ++currentEvenValue;
        return currentEvenValue;
    }

    public static void main(String[] args) {
        EvenChecker.test(new EvenGenerator());
    }
}
클래스에 대한 단일 락도 존재하여 synchronized static 메소드는 클래스 당 static 데이터의 동시접근을 막기 위한 락을 제공한다.

Brian's Rule of Synchronization
이후에 다른 스레드가 읽을 수 있는 변수를 기록할 때나 이전에 다른 스레드가 기록했을 수 있는 변수를 읽을 때
동기화를 사용해야 한다. 추가적으로 읽는 객체나 기록하는 객체 모두 동일한 모니터 락을 사용하여 동기화를
해야 한다.

명시적인 락

java.util.concurrent.locks에 정의된 명시적인 뮤텍스 메커니즘을 제공한다. 락 객체를 명시적으로 생성, 잠그고 해제 한다.
public class MutexEvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    private Lock lock = new ReentrantLock();

    @Override
    public int next() {
        lock.lock();
        try {
            ++currentEvenValue;
            ++currentEvenValue;
            return currentEvenValue;
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        EvenChecker.test(new MutexEvenGenerator());
    }
}
MutexEvenGenerator는 lock이라는 뮤텍스를 추가하고 next()안에서 임계영역을 생성하기 위하여 lock(), unlock() 메소드를 사용한다. lock()을 호출한 직후 try-finally 블럭을 사용하고 finally 안에서 unlock()을 호출한다.
try 블럭 안에서 return을 호출하여 unlock()이 너무 빨리 호출되지 않도록 유의한다.
synchronized 키워드 내부에서 예외가 발생하면 클린업을 수행할 기회가 없지만, 명시적인 lock 객체를 사용하면 finally 구문 안에서 사용 가능하다.
synchronized에서는 락 획득을 시도하거나 특정 시간 동안의 락 획득 시도 이후 포기와 같은 것을 할 수 없다. 명시적 lock 객체는 획득과 해제에 있어 synchronized 보다 좀 더 세밀한 제어를 가능하게 한다.

원자성(atomicity)과 휘발성(volatile)

원자성 오퍼레이션으 스레드 스케줄러에 의한 인터럽트가 발생하지 않는 오퍼레이션을 말하는데, 이러한 원자성을 신뢰하는 것은 위험한 것이다. 때때로 원자성 오퍼레이션이 안전해야 할 것 같은 상황에서도 실제로는 안전하지 않을 수 있다.
volatile 키워드는(SE5 이전에는 동작하지 않는다) 해당 필드에 수정이 가해지면 모든 읽기가 변화한다는 것을 의미한다. non-volatile 필드에 대한 원자성 오퍼레이션은 주 메모리에 저장하지 않고 레지스터 캐쉬에 저장될 수 있어 이 필드를 읽는 다른 작업은 새로운 값을 읽어 가지 않을 수도 있다. 복수의 작업이 한 필드에 접근을 한다면 그 필드는 volatile 되어야 한다.

원자(Atomic)클래스

자바 SE5는 AtomicInteger, AtomicLong, AtomicReference와 같이 원자 오퍼레이션을 가진 특별한 원자 클래스를 제공한다.
public class AtomicEvenGenerator extends IntGenerator {
    private AtomicInteger currentEvenValue = new AtomicInteger(0);

    @Override
    public int next() {
        return currentEvenValue.addAndGet(2);
    }
    public static void main(String[] args) {
        EvenChecker.test(new AtomicEvenGenerator());
    }
}
AtomicInteger를 사용하였기 때문에 별도의 동기화 기술이 필요 없다. java.util.concurrent의 클래스를 작성하기 위해 디자인 되었으며, 비록 이것이 문제를 일으키지 않을 것을 확신하더라도 특수한 상황에서만 사용해야 한다. 일반적으로 sychronized 키워드나 명시적 lock을 사용하는 것이 더 안전하다.

임계영역

때때로 메소드 전체가 아닌 일부분에 대하여 다수 스레드의 접근을 제한하고자 할 때가 있다. 이런 영역을 임계영역(Critical Section)이라고 하며 synchronized 키워드를 사용하여 생성한다. 여기서 synchronized는 내부의 코드를 동기화하는데 사용되는 락을 가지고 있는 객체를 지정하기 위해 사용된다.
synchronized(syncObject) {
    // 이 부분은 한번에 하나의 작업만
    // 접근 할 수 있다.
}
이는 동기화 블럭이라고 부르기도 한다. 이 부분을 수행하기 위해서는 우선 syncObject의 락을 획득해야 한다.
다음 코드에서 PairManager2의 락을 거는 시간이 현저히 작다. 이것이 전체 메소드의 동기화가 아닌 동기화 블럭을 사용하는 이유다.
abstract class PairManager {
    AtomicInteger checkCounter = new AtomicInteger(0);
    protected Pair p = new Pair();
    private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>());
    public synchronized Pair getPair() {
        return new Pair(p.getX(), p.getY());
    }
    // 시간을 소비하는 오퍼레이션으로 가정
    protected void store(Pair p) {
        storage.add(p);
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {

        }
    }
    public abstract void increment();
}

// 전체 메소드 동기화
class PairManager1 extends PairManager {
    @Override
    public synchronized void increment() {
        p.incrementX();
        p.incrementY();
        store(getPair());
    }
}

// 임계영역 사용
class PairManager2 extends PairManager {
    @Override
    public void increment() {
        Pair temp;
        synchronized (this) {
            p.incrementX();
            p.incrementY();
            temp = getPair();
        }
        store(temp);
    }
}
시간을 소비하는 store() 메소드는 Pair 객체를 동기화된 ArrayList에 추가하므로 스레드에 안전하다고 할 수 있다. 그러므로 보호될 필요가 없으며 PairManager2의 동기화 블럭 외부에 위치한다.

다른 객체에 대한 동기화

동기화 블럭은 동기화에 사용할 객체를 지정해야 하며, 일반적으로 동기화에 가장 적합한 객체는 해당 메소드가 호출되고 있는 현재의 객체가 된다. 동기화 블럭에서 락을 획득하고 나면, 객체의 다른 동기화 메소드나 임계 영역에 대한 접근은 거부된다. 그러므로 this를 사용하여 동기화 블럭을 지정하면 임계영역으로 인해 동기화의 범위가 줄어든다. 반대로 말하면 동기화시 서로 다른 객체를 사용하면 두 작업이 동시에 접근 가능하다.
class DualSync {
    private Object syncObject = new Object();
    public synchronized void f() {
        for (int i = 0; i < 5; i++) {
            print("f()");
            Thread.yield();
        }
    }
    public void g() {
        synchronized (syncObject) { // this로 바꿀 경우 f(), g() 동시 접근이 불가능 하다.
            for (int i = 0; i < 5; i++) {
                print("g()");
                Thread.yield();
            }
        }
    }
}
public class SyncObject {
    public static void main(String[] args) {
        final DualSync ds = new DualSync();
        new Thread() {
            public void run() {
                ds.f();
            }
        }.start();
        ds.g();
    }
}

스레드 로컬 저장소

공유자원에 대한 작업 충돌을 방지하는 다른 방법은 공유 자체를 제거하는 것이다.객체를 사용하는 서로 다른 스레드마다 동일한 변수에 대하여 자동으로 서로 다른 저장소를 생성하는 메커니즘이다. 스레드 로컬 저장소의 생성 및 관리는 java.lang.ThreadLocal 클래스가 수행한다.
class Accessor implements Runnable {
    private final int id;

    public Accessor(int id) { this.id = id; }
    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()) {
            ThreadLocalVariableHolder.increment();
            print(this);
            Thread.yield();
        }
    }
    @Override
    public String toString() {
        return "Accessor{id=" + id + " : " + ThreadLocalVariableHolder.get() + '}';
    }
}

public class ThreadLocalVariableHolder {
    private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
        private Random rand = new Random(47);
        protected synchronized Integer initialValue() {
            return rand.nextInt(10000);
        }
    };
    public static void increment() { value.set(value.get() + 1); }
    public static int get() { return value.get(); }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new Accessor(i));
        }
        TimeUnit.MILLISECONDS.sleep(100); // 잠시 동안 실행
        exec.shutdownNow();  // 모든 Accessor 종료
    }
}
각 스레드에서 사용되는 ThreadLocal은 초기값을 다르게 설정하도록 되어 있다. 각 스레드에서 같은 static 객체를 참조하고 있지만 실제로는 각 스레드의 ThreadLocal.ThreadLocalMap에서 해당 값을 참조한다. 같은 객체를 참조하는 것처럼 보이지만 각 Thread별 다른 객체를 참조하도록 한다. 쓸데가 있을지는 고민을 해봐야겠다.


작업 종료하기

식물원

각 입구 마다 카운터가 설치되어 입장객의 수를 세고 있으며, 입구의 카운터가 늘어날 때 마다 공유하는 전체 카운터도 같이 증가한다.
class Count {
    private int count = 0;
    private Random rand = new Random(47);
    public synchronized int increment() {
        int temp = count;
        if(rand.nextBoolean())
            Thread.yield();
        return (count = ++temp);
    }
    public synchronized int value() { return count; }
}

class Entrance implements Runnable {
    private static Count count = new Count();
    private static List<Entrance> entrances = new ArrayList<Entrance>();
    private int number = 0;
    private final int id;
    private static volatile boolean canceled = false;
    // 휘발성 필드에 대한 원자성 오퍼레이션
    public static void cancel() { canceled = true; }
    public Entrance(int id) {
        this.id = id;
        // 현재 작업을 리스트에 보관
        // 죽은 작업의 가비지 컬렉션을 방지한다.
        entrances.add(this);
    }
    @Override
    public void run() {
        while(!canceled) {
            synchronized (this) {
                ++number;
            }
            print(this + " Total: " + count.increment());
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                print("sleep interrupted");
            }
        }
        print("stopping " + this);
    }
    public synchronized int getValue() { return number; }
    public String toString() {
        return "Enttance " + id + ": " + getValue();
    }
    public static int getTotalCount() {
        return count.value();
    }
    public static int sumEntrances() {
        int sum = 0;
        for(Entrance entrance : entrances) {
            sum += entrance.getValue();
        }
        return sum;
    }
}
public class OrnamentalGarden {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new Entrance(i));
        }
        TimeUnit.MILLISECONDS.sleep(2000);
        Entrance.cancel();
        exec.shutdown();
        if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) {
            print("Some tasks were not terminated!");
        }
        print("Total: " + Entrance.getTotalCount());
        print("Sum of Entrances: " + Entrance.sumEntrances());
    }
}
위 프로그램은 모든 것을 안전하게 종료하는 것에 조금 문제가 있다.

블럭 상태에서의 종료

스레드 상태

  • New : 스레드를 생성하는 순간, 이 후 실행 가능한 Runnable 혹은 Blocked 상태로 된다.
  • Runnable : 이용 가능한 CPU가 있으면 실행 할 수 있다는 것을 의미함, 실행 중이 아닐 수도 있으며 스케줄러가 할당되면 실행하는데 장애가 없음
  • Blocked : 스레드가 실행될 수 있지만 무언가가 막고 있는 상태
    • sleep()이 호출되어 지정된 시간 동안 실행이 중단된 경우
    • wait()을 통하여 스레드 실행이 지연되는 경우, notify() 혹은 notifyAll()을 수신하지 않으면 Runnable로 바뀌지 않는다.
    • 특정 I/O가 완료되기를 기다릴 때
    • 다른 객체의 synchronized 메소드를 호출 시도했으나 다른 작업에 의해 lock 된 경우
  • Dead : 죽거나 완료된 상태, CPU를 할당받지 않으며 작업이 완래되고 더이상 Runnable로 바뀌지 않는다. run()에서 return 되거나, interrupt 되는 경우다.
위의 Blocked 상태에서 작업을 종료하고자 할 때, 작업 내부의 로직이 스스로 종료를 감지하는 것을 기다릴 수 없다면 강제적으로 종료해야 한다.

인터럽트

블럭된 작업의 종료를 위해 Thread 클래스는 interrupt() 메소드를 제공하며 이는 스레드의 인터럽트 상태를 변경한다. 인터럽트 상태가 된 스레드는 이미 블럭되어 있거나 블럭될 오퍼레이션을 사용하려고 할 때 InterruptException을 발생시키며, 예외를 발생시킨 후 또는 작업이 Thread.interrupted()를 호출하면 인터럽트 상태를 다시 초기화한다.
다음 코드는 Executor를 사용하여 스레드를 시작할 때 execute()가 아닌 submit()을 호출하여 얻은 스레드의 컨텍스트를 통해 cancel()을 호출해서 해당 작업에 인터럽트를 전달하고 있다. cancel()에 true를 전달하면 해당 스레드에 interrupt()를 호출하여 중단시킬 권한을 부여하는 것이다.
class SleepBlocked implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            print("InterruptedException");
        }
        print("Exiting SleepBlocked.run()");
    }
}
class IOBlocked implements Runnable {
    private InputStream in;

    IOBlocked(InputStream in) {
        this.in = in;
    }

    @Override
    public void run() {
        try {
            print("Waiting for read()");
            in.read();
        } catch (IOException e) {
            if(Thread.currentThread().isInterrupted()) {
                print("Interrupted from block I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        print("Exiting IOBlocked.run()");
    }
}
class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while(true) {
            Thread.yield();
        }
    }
    public SynchronizedBlocked() {
        new Thread() {
            public void run() {
                f();
            }
        }.start();
    }
    public void run() {
        print("Trying to call f()");
        f();
        print("Exiting SynchronizedBlocked.run()");
    }
}
public class Interrupting {
    private static ExecutorService exec = Executors.newCachedThreadPool();
    static void test(Runnable r) throws InterruptedException {
        Future f = exec.submit(r);
        TimeUnit.MILLISECONDS.sleep(100);
        print("Interrupting " + r.getClass().getName());
        f.cancel(true);  // 실행중이라면 인터럽트
        print("Interrupt sent to " + r.getClass().getName());
    }
    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        print("Aboarting with System.exit(0)");
        System.exit(0); // 두 번의 인터럽트가 실패할 경우
    }
}
SleepBlock, IOBlockd, SynchronizedBlocked 세 종류의 작업이 있다. 결과를 보면 sleep()에 대하여는 인터럽트를 걸 수 있다는 것을 알 수 있다. 하지만 동기화 락이나 I/O를 수행하는 상태의 블럭에 대해서는 인터럽트가 발생하지 않는다. I/O의 문제를 위한 한가지 방법은 해당 락을 유발한 자원을 닫는 것이다.
I/O 장에서 소개된 nio 클래스는 세련된 인터럽트를 제공한다. ReentrantLock에 의해 블록 된 작업은 인터럽트 처리가 가능하다.

인터럽트 감지

스레드에 interrupt()를 호출하면 작업이 블럭 상태에 들어가려고 하거나 이미 들어간 상태에서만 인터럽트가 발생한다.(I/O 및 동기화 메소드를 통한 블럭은 인터럽트가 발생하지 않는다.) 하지만 작업을 멈추기 위해 interrupt()를 호출했다면, 블럭 상황이 아니더라도 빠져 나올 수 있는 방법이 필요하다.
이것은 interrupt() 호출로 변경되는 인터럽트 상태로 해결 될 수 있다. interrupted()를 호출하여 상태를 확인 할 수 있고, 상태를 초기화하는 역할도 한다.(동일한 인터럽트를 두번 받지 않는다.) 즉 인터럽트의 확인은 한 번의 InterruptedException 감지나 한 번의 Thread.interrupted()의 성공적인 반환으로 가능하다.
class NeedsCleanup {
    private final int id;
    NeedsCleanup(int ident) {
        this.id = ident;
        print("NeedsCleanup " + id);
    }
    public void cleanup() {
        print("Cleaning up " + id);
    }
}
class Blocked3 implements Runnable {
    private volatile double d = 0.0;

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // 첫번째 포인트
                NeedsCleanup n1 = new NeedsCleanup(1);
                // n1의 정의 직후 try-finally 구문 시작
                // n1의 정확한 소거를 보장하기 위해서이다
                try {
                    print("Sleeping");
                    TimeUnit.SECONDS.sleep(1);
                    // 두번째 포인트
                    NeedsCleanup n2 = new NeedsCleanup(2);
                    // n2의 정확한 소거 보장
                    try {
                        print("Calculating");
                        // 시간을 소비하지만 블로킹 되지 않은 오퍼레이션
                        for (int i = 1; i < 250000; i++) {
                            d = d + (Math.PI + Math.E) / d;
                        }
                        print("Finished time-consuming operation");
                    } finally {
                        n2.cleanup();
                    }
                } finally {
                    n1.cleanup();
                }
            }
            print("Exiting via while() test");
        } catch (InterruptedException e) {
            print("Exiting via InterruptedException");
        }
    }
}
public class InterruptingIdiom {
    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1) {
            print("Usage: java InterruptingIdiom delay-in-ms");
            System.exit(1);
        }
        Thread t = new Thread(new Blocked3());
        t.start();
        TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
        t.interrupt();
    }
}
예외가 발생하여 순환문을 빠져 나올 때는 적절한 자원 반환(CleanUp)이 꼭 필요하다. interrupt()에 반응하는 클래스를 만들 때에는 객체 생성시 뒤 따르는 try-finally를 통해 객체 소거가 보장되어야 한다.

2013년 8월 29일 목요일

동시성 (1)

병렬 프로그래밍을 해야할 일이 거의 없기 때문에 아주 단편적인 내용만 알고 있었으나 좀 더 자세히 알 필요가 있을 듯 해서
Thinking In JAVA의 21장 동시성 부분을 다시 보면서 정리해본다.


동시성의 다양한 면

더 빠른 실행

프로그램을 더 빠르게 하고 싶다면 동시성을 사용하여 여분의 프로세서를 활용하는 방법을 알아야 한다.
언뜻 생각하면 프로그램의 각 부분을 순차적으로 실행하는 것보다 동시에 병렬적으로 실행할 경우 작업 간 전환이라는 컨텍스트-스위칭까지 처리해야 하기 때문에 더 많은 부하가 걸린다고 생각할 수 있다. 하지만 Block이 발생할 경우 해당 문제가 해결되기 전까지 전체 프로그램이 멈추게 된다.
동시성을 적용한다면 하나의 작업이 정지되더라도 나머지 작업은 진행할 수 있기에 프로그램은 계속적으로 진행이 되는 것이다. Block을 고려하지 않는다면 싱글 프로세서 환경에서 동시성이 가져오는 성능상의 이점은 없다.

코드 디자인의 향상

동시성은 복잡성과 같은 비용이 들어가는 기술이다. 그러나 프로그램 디자인, 자원의 분배 그리고 사용의 편의성 등의 장점을 고려하면 이러한 비용은 지불할 만하다. 또한 스레드를 사용하면 느슨하게 연결된 코드를 디자인할 수 있다.


기본적인 스레드 기법

작업의 정의

스레드는 작업을 처리하므로, 작업을 기술하는 방법이 필요하다. Runnable 인터페이스가 바로 이 작업을 정의한다. Runnable 인터페이스를 구현하고 run() 메소드를 작성하면 된다.
class LiftOff implements Runnable {
    protected int countDown = 10;
    private static int taskCount = 0;
    private final int id = taskCount++;

    public LiftOff() {
    }

    public LiftOff(int countDown) {

        this.countDown = countDown;
    }

    @Override
    public void run() {
        while(countDown-- > 0) {
            printnb(status());
            Thread.yield();  // 중요한 작업은 끝났다는 표시
        }
    }

    private String status() {
        return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
    }
}

public class MainThread {
    public static void main(String[] args) {
        LiftOff launch = new LiftOff();
        launch.run();
    }
}
이것은 스레드 기능을 생성하지 않으므로 단순히 run() 메소드를 실행한 것에 지나지 않는다. 스레드 기능을 이용하기 위해서는 위에서 정의한 작업을 스레드에 올려야 한다.

Thread 클래스

전통적인 방법은 Thread 생성자로 작업을 넘겨 처리하는 것이다.
public class MainThread {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new LiftOff());
            t.start();
        }
        print("Waiting for Liftoff");
    }
}
Thread 생성자는 Runnable 객체만을 필요로 한다. start() 메소드 호출은 Runnable의 run() 메소드를 호출하여 새로운 스레드에서 작업을 시작하도록 한다.
main() 메소드가 스레드 객체를 생성할 때, 생성한 객체에 대한 참조를 저장하지 않는다. 일반적인 객체라면 가비지 컬렉터에 의해 제거되겠지만, 스레드는 다르다. 각 스레드는 자체적으로 특정 위치에 참조를 등록하기 때문에 작업이 run()에서 빠져 나와 종료하기 전에는 가비지 컬렉터가 이를 제거할 수 없다.

Executor의 사용

java.util.concurrent.Executors는 스레드 객체를 관리하여 동시성 프로그래밍을 간단하게 만들어 준다.
public class MainThread {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i &lt 5; i++) {
            exec.execute(new LiftOff());
        }
        exec.shutdown();
        print("Waiting for Liftoff");
    }
}
shutdown()의 호출은 Executor가 더 이상의 작업을 할당하지 못하게 설정한다. shutdown()의 호출 이전에 Executor에 할당한 모든 작업이 완료되면 현재 스레드(이번 경우는 main() 스레드)는 종료된다.

CachedThreadPool은 다른 유형의 Executor로 대체 가능하다.
FixedThreadPool은 제한된 개수의 작업만 할당 가능하다.
SingleThreadExecutor는 크기가 1인 FixedThreadPool과 유사하다. 만약 1개 이상의 작업이 등록되면 순차적으로 처리된다.

작업의 결과 반환하기

Runnable은 return 값이 없다. return 값이 필요하다면 Runnable이 아닌 Callable 인터페이스를 사용해야 하며, 이는 run()이 아닌 call() 메소드를 가지고, call()의 반환값을 표현하는 타입 파라미터를 사용하는 제너릭이다. ExecutorService.submit() 메소드를 통해서만 실행해야 한다.
class TakeWithResult implements Callable%ltString&gt {
    private int id;
    public TakeWithResult(int id) {
        this.id = id;
    }

    @Override
    public String call() throws Exception {
        return "result of TaskWithResult " + id;
    }
}

public class CallableDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        ArrayList&ltFuture&ltString&gt&gt results = new ArrayList&ltFuture&ltString&gt&gt();

        for (int i = 0; i < 10; i++) {
            results.add(exec.submit(new TakeWithResult(i)));
        }

        for (Future&ltString&gt fs : results) {
            try {
                // get()은 종료시까지 Block 된다.
                print(fs.get());
            } catch (InterruptedException e) {
            } catch (ExecutionException e) {
            } finally {
                exec.shutdown();
            }
        }
    }
}
Callable에 의해 반환되는 결과는 파라미터화 된 Future 객체다.
Future의 isDone()으로 종료 여부를 확인 할 수 있으며, 그냥 get()을 호출하면 결과가 준비될 때까지 Block된다. get()에 타임아웃을 설정할 수도 있다.

Sleeping

일정시간 작업을 멈추게 하는 Sleep()을 호출하여 변화를 줄 수 있다. yield()를 Sleep()으로 대체하면 다음과 같다
public class SleepingTask extends LiftOff {
    @Override
    public void run() {
        while(countDown-- > 0) {
            printnb(status());
            try {
                // 예전스타일
                // Thread.sleep(100);
                // 자바 SE 5/6 스타일
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new SleepingTask());
        }
        exec.shutdown();
    }
}
Sleep()의 호출은 InterruptedException을 발생시킬 수 있다. main()으로 전달 되지 않기 때문에 반드시 잡아야 한다.
각 작업에서 Sleep()이 호출되기 때문에 균등하게 분산된 순서로 처리 될 수 있으나 전적으로 신뢰해서는 안된다.

우선 순위

CPU가 스레드를 실행하는 순서를 결정할 순 없지만 동일한 조건이라면 우선순위가 높은 스레드를 실행하고 낮은 스레드는 대기시킨다. 우선순위가 낮은 스레드가 실행되지 않음을 의지하는 것은 아니다.
우선순위를 조절하는 것은 큰 위험을 수반하므로 신중하게 사용해야 한다.
getPriority(), setPriority()를 사용하여 언제든 변경 가능하다.
public class SimplePriorities implements  Runnable {
    private int priority;
    private volatile double d;
    private int countDown = 5;

    public SimplePriorities(int priority) {
        this.priority = priority;
    }

    @Override
    public String toString() {
        return Thread.currentThread() + ": " + countDown;
    }

    @Override
    public void run() {
        Thread.currentThread().setPriority(priority);
        while(true) {
            for (int i = 1; i < 100000; i++) {
                d += (Math.PI + Math.E) / (double) i;
                if(i % 1000 == 0)
                    Thread.yield();
            }
            print(this);
            if(--countDown == 0) return;
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++) {
            exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
        }
        exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
        exec.shutdown();
    }
}
스레드는 의미없는 double형 계산을 수행하고 있다. 마지막에 실행한 스레드가 가장 높은 우선순위로 설정되었다.
결과는 아래와 같다. 높은 우선순위의 스레드가 더 선호되는 것은 확인되지만, 무조건 최우선으로 처리 되진 않았다.
Thread[pool-1-thread-3,1,main]: 5
Thread[pool-1-thread-6,10,main]: 5
Thread[pool-1-thread-5,1,main]: 5
Thread[pool-1-thread-4,1,main]: 5
Thread[pool-1-thread-3,1,main]: 4
Thread[pool-1-thread-5,1,main]: 4
Thread[pool-1-thread-6,10,main]: 4
Thread[pool-1-thread-2,1,main]: 5
Thread[pool-1-thread-1,1,main]: 5
Thread[pool-1-thread-5,1,main]: 3
Thread[pool-1-thread-3,1,main]: 3
Thread[pool-1-thread-5,1,main]: 2
Thread[pool-1-thread-3,1,main]: 2
Thread[pool-1-thread-6,10,main]: 3
Thread[pool-1-thread-4,1,main]: 4
Thread[pool-1-thread-2,1,main]: 4
Thread[pool-1-thread-1,1,main]: 4
Thread[pool-1-thread-5,1,main]: 1
Thread[pool-1-thread-3,1,main]: 1
Thread[pool-1-thread-4,1,main]: 3
Thread[pool-1-thread-6,10,main]: 2
Thread[pool-1-thread-2,1,main]: 3
Thread[pool-1-thread-1,1,main]: 3
Thread[pool-1-thread-4,1,main]: 2
Thread[pool-1-thread-6,10,main]: 1
Thread[pool-1-thread-2,1,main]: 2
Thread[pool-1-thread-1,1,main]: 2
Thread[pool-1-thread-4,1,main]: 1
Thread[pool-1-thread-2,1,main]: 1
Thread[pool-1-thread-1,1,main]: 1
JAVA에서는 10단계의 우선순위를 제공하지만 OS와 고정적으로 매핑되어 있진 않다. 이식성을 위해서 MAX_PRIORITY, MIN_PRIORITY를 권장한다.

양보하기

run() 메소드에서 작업이 완료되었다고 판단이 되면 yield() 메소드를 통해 스케줄러에게 다른 스레드에게 CPU를 할당 해도 된다고 알려 줄 수 있다.
단지 동일한 우선순위를 가지는 다른 스레드가 실행되어도 된다는 것을 알려주는 것일 뿐 이다. 튜닝이나, 민감한 제어가 yield()에 의존해서는 안된다.

데몬 스레드

데몬 스레드는 주된 기능은 아니지만 프로그램이 실행되는 동안 백그라운드로 실행되는 서비스를 제공하기 위해 사용된다.
데몬 스레드는 일반 스레드와는 다르게 비-데몬스레드가 종료되면 실행 중이더라도 모든 데몬 스레드가 종료된다.
public class SimpleDaemons implements Runnable {
    @Override
    public void run() {
        try {
            while(true) {
                TimeUnit.MILLISECONDS.sleep(500);
                print(Thread.currentThread() + " " + this);
            }
        } catch (InterruptedException e) {
            print("sleep() interrupted");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread daemon = new Thread(new SimpleDaemons());
            daemon.setDaemon(true);  // start() 이전에 호출되어야 함
            daemon.start();
        }
        print("All daemons started");
        TimeUnit.MILLISECONDS.sleep(600);
    }
}
심지어 데몬 스레드의 finally 구문도 실행되지 않고 강제 종료된다.
이것은 스레드를 정상적으로 종료할 수 없다는 의미이므로 바람직한 사용은 아니다.


다양한 코딩 방법

작업을 Runnable로 구현하지 않고 Thread로부터 직접 상속받을 수도 있다.
public class SimpleThread extends Thread {
    private static int threadCount = 0;
    private int countDown = 5;
    public SimpleThread() {
        super(Integer.toString(++threadCount));
        start();
    }
    
    @Override
    public String toString() {
        return getName() + "{" + countDown + "}, ";
    }

    @Override
    public void run() {
        while(true) {
            printnb(this);
            if(--countDown == 0)
                return;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new SimpleThread();
        }
    }
}
Thread를 상속하면 다른 클래스를 상속할 수 없다는 단점이 있지만, Runnable 인터페이스를 구현하면 그런 제한이 사라진다.
위 예제는 생성자에서 start() 메소드를 호출하고 있다. 이는 매우 단순한 예제이기 때문에 안전하지만, 생성자에서 스레드를 시작하는 것은 매우 위험하다. 생성자가 완료되기 전에 또 다른 작업이 시작되어 불완전한 객체에 접근할 수도 있기 때문이다.
class ThreadMethod {
    private int countDown = 5;
    private Thread t;
    private String name;
    public ThreadMethod(String name) {
        this.name = name;
    }
    public void runTask() {
        if(t == null) {
            t = new Thread(name) {
                public void run() {
                    try {
                        while(true) {
                            print(this);
                            if(--countDown == 0) return;
                            sleep(10);
                        }
                    } catch (InterruptedException e) {
                        print("sleep() interrupted");
                    }
                }
                public String toString() {
                    return getName() + ": " + countDown;
                }
            };
            t.start();
        }
    }
}
ThreadMethod 클래스는 메소드 안에서 스레드 생성을 보여준다.

용어 정리

실행되는 작업과 작업을 구동하는 스레드를 구별해야 한다. 일단 작업을 생성(Runnable)하고 생성한 작업을 스레드(Thread)에 탑재해야 한다. 작업과 스레드가 혼용되어 혼란을 시킬 수 있다.

조인

하나의 스레드는 다른 스레드에 대하여 join()을 호출하여 그 스레드가 종료될 때까지 대기 할 수 있다.
class Sleeper extends Thread {
    private int duration;
    public Sleeper(String name, int sleepTime) {
        super(name);
        this.duration = sleepTime;
        start();
    }
    public void run() {
        try {
            sleep(duration);
        } catch (InterruptedException e) {
            print(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted());
            return;
        }
        print(getName() + " has awakened");
    }
}

class Joiner extends Thread {
    private Sleeper sleeper;

    Joiner(String name, Sleeper sleeper) {
        super(name);
        this.sleeper = sleeper;
        start();
    }
    public void run() {
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            print("Interrupted");
        }
        print(getName() + " join complete");
    }
}

public class Joining {
    public static void main(String[] args) {
        Sleeper sleepy = new Sleeper("Sleepy", 1500);
        Sleeper grumpy = new Sleeper("Grumpy", 1500);

        Joiner dopey = new Joiner("Dopey", sleepy);
        Joiner doc = new Joiner("Doc", grumpy);

        grumpy.interrupt();
    }
}
Joiner는 Sleeperj이 정상 종료되거나 인터럽트 될 때까지 대기한다. 자바 SE5에는 join()보다 더 충실한 기능을 제공하는 CyclicBarrier같은 툴도 제공한다.

사용자 인터페이스 만들기

class UnresponsiveUI {
    private volatile double d = 1;

    UnresponsiveUI() throws IOException {
        while (d > 0) {
            d = d + (Math.PI + Math.E) / d;
            System.in.read(); // 이 라인은 실행되지 않는다.
        }
    }
}
public class ResponsiveUI extends Thread {
    private static volatile double d = 1;

    public ResponsiveUI() {
        setDaemon(true);
        start();
    }
    public void run() {
        while(true) {
            d = d + (Math.PI + Math.E) / d;
        }
    }
    public static void main(String[] args) throws IOException {
        //! new UnresponsiveUI(); // kill로만 종료할 수 있다.
        new ResponsiveUI();
        System.in.read();
        print(d); // 진행상태 출력
    }
}
UnresponsiveUI는 무한 while문에서 연산을 수행하므로 콘솔 입력을 처리할 수 없다. 프로그램의 반응성을 향상시키기 위해 이와 같은 연산을 run() 메소드 안에 위치시켜 백그라운드로 실행하는 것이다.

예외 감지

스레드의 속성 때문에 스레드로부터의 예외는 감지할 수 없다. 예외가 run() 메소드를 벗어나면 이 예외를 감지하기 위한 특별한 처리를 하지 않을 경우 콘솔로까지 확대된다.
자바 SE5 이전에는 스레드 그룹을 사용했으나 이제는 Executor로 해결할 수 있다.
스레드에 처리되지 않은 예외 때문에 중단될 상황이 되면 Thread.UncaughtExceptionHandler.uncaughtException()이 자동으로 호출된다. JAVA SE5에서는 새로 제공하는 Thread.UncaughtExceptionHandler 인터페이스를 통해 이를 활용할 수 있다.
다음은 Executors에서 스레드를 생성할 때 setUncaughtExceptionHandler() 메소드를 통해 별도로 처리하고 있다.
class ExceptionThread2 implements Runnable {
    @Override
    public void run() {
        Thread t = Thread.currentThread();
        print("run() by " + t);
        print("eh = " + t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        print("caught " + e);
    }
}
class HandlerThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        print(this + " creating new Thread");
        Thread t = new Thread(r);
        print("created " + t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        print("eh = " + t.getUncaughtExceptionHandler());
        return t;
    }
}
public class CaptureUncaughtException {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
        exec.execute(new ExceptionThread2());
        exec.shutdown();
    }
}
스레드에 UncaughtExceptionHandler가 없으면 defaultUncaughtExceptionHandler를 호출한다.
static 메소드인 Thread.setDefaultUncaughtExceptionHandler로 변경 가능하다.

2013년 7월 9일 화요일

옵저버 패턴

옵저버 패턴에서는 한 객체의 상태가 바뀌면 그 객체에 의존하는 다른 객체들한테 연락이 가고
자동으로 내용이 갱신되는 방식으로 일대다(one-to-many) 의존성을 정의합니다.
신문 구독으로 간단한 예를들 수 있다.
신문사는 새로운 신문을 발행한다. 신문을 구독 중인 고객들에게는 새 신문이 배달되고, 고객들은 자동으로 구독해지, 신청을 언제든지 할 수 있다.
신문사는 구독 중인 고객들에게 신문을 배달하는 순서는 신경쓰지 않는다.

간단한 기상 스테이션을 Observer 패턴으로 구현해보자.
다음과 같이 subject, observer 인터페이스를 만들어 간단히 구현 가능하다.

public interface Subject {
    public void registerObserver(Observer o);
    public void removeObserver(Observer o);
    public void notifyObservers();
}

public interface Observer {
    public void update(float temp, float humidity, float pressure);
}

public interface DisplayElement {
    public void display();
}
신문사에 해당하는 Subject 인터페이스와 고객인 Observer 인터페이스, 기상 정보를 표현할 DisplayElement 인터페이스로 구성되어 있다.
public class WeatherData implements Subject {
    private final ArrayList<Observer> observers;
    private float pressure;
    private float humidity;
    private float temperature;

    public WeatherData() {
        this.observers = new ArrayList<Observer>();
    }

    public void setMeasurements(int temperature, int humidity, float pressure) {
        this.temperature = temperature;
        this.humidity = humidity;
        this.pressure = pressure;
        measurementsChanged();
    }

    private void measurementsChanged() {
        notifyObservers();
    }

    @Override
    public void registerObserver(Observer o) {
        observers.add(o);
    }

    @Override
    public void removeObserver(Observer o) {
        observers.remove(o);
    }

    @Override
    public void notifyObservers() {
        for(Observer o : observers) {
            o.update(this.temperature, this.humidity, this.pressure);
        }
    }
}

public class CurrentConditionsDisplay implements Observer, DisplayElement {
    private final Subject weatherData;
    private float humidity;
    private float temperature;

    public CurrentConditionsDisplay(WeatherData weatherData) {
        this.weatherData = weatherData;
        weatherData.registerObserver(this);
    }

    @Override
    public void display() {
        System.out.println("Current conditions: " + this.temperature + "F degrees and " + this.humidity + "% humidity");
    }

    @Override
    public void update(float temp, float humidity, float pressure) {
        this.temperature = temp;
        this.humidity = humidity;
        display();
    }
}

WeatherData 클래스는 Observer들을 관리할 Observer 리스트를 가지고 있으며, registerObserver를 통해 등록된 Observer들에게 기상정보가 변경될 때 update메세지를 보낸다.

Observer인 CurrentConditionsDisplay 클래스는 weatherData의 registerObsever를 통해 Subject 클래스로 자신의 참조를 전달하고, Subject에서 update를 호줄하면 예정된 동작을 실행하게 된다.
현재 클래스에서는 생성자에서 register를 했으나 원할 때 remove, register 가능하다.

자바에서 Observer 패턴을 위한 기능이 있다.

java.util.Observable;
java.util.Observer;
위의 WeatherData와 CurrentConditionsDisplay를 아래와 같이 변경가능하다.
public class WeatherData extends Observable {
    private float pressure;
    private float humidity;
    private float temperature;

    public void setMeasurements(int temperature, int humidity, float pressure) {
        this.temperature = temperature;
        this.humidity = humidity;
        this.pressure = pressure;
        measurementsChanged();
    }

    public float getTemperature() {
        return temperature;
    }

    public float getHumidity() {

        return humidity;
    }

    public float getPressure() {

        return pressure;
    }

    private void measurementsChanged() {
        setChanged();
        notifyObservers();
    }
}

public class CurrentConditionsDisplay implements Observer, DisplayElement {
    private final Observable weatherData;
    private float humidity;
    private float temperature;

    public CurrentConditionsDisplay(Observable weatherData) {
        this.weatherData = weatherData;
        weatherData.addObserver(this);
    }

    @Override
    public void display() {
        System.out.println("Current conditions: " + this.temperature + "F degrees and " + this.humidity + "% humidity");
    }

    @Override
    public void update(Observable o, Object arg) {
        if (o instanceof WeatherData) {
            WeatherData weatherData = (WeatherData) o;
            this.temperature = weatherData.getTemperature();
            this.humidity = weatherData.getHumidity();
            display();
        }
    }
}

주의할 점은 Observable이 인터페이스가 아니라 Class라는 점이다. 따라서 Observer를 위한 자료구조를 만들 필요가 없으며, addObserver/deleteObserver 등의 메소드가 이미 구현되어 있다.
implements가 아니라 extends해야 하기 때문에 사용에 제약이 있을 수 있다.
setChanged()를 호출한 후 notifyObservers()를 호출하면 등록된 Observer들의 update가 호출된다.

Observer들은 update(Observable, Object)를 통해 메세지를 받는다.
Observable은 update를 호출한 객체를 전달하고, Object에는 Observable에서 notifyObservers(Object)로 넘겨준 param이 전달된다.(notifyObservers()는 notifyObservers(null)과 같다.)
위 코드에서 처럼 Observable의 getter를 통해 데이터를 가져와도 되고(pull), notifyObservers(Object)로 넘겨줘도 된다.(push)


참고도서 :