2013년 8월 29일 목요일

동시성 (1)

병렬 프로그래밍을 해야할 일이 거의 없기 때문에 아주 단편적인 내용만 알고 있었으나 좀 더 자세히 알 필요가 있을 듯 해서
Thinking In JAVA의 21장 동시성 부분을 다시 보면서 정리해본다.


동시성의 다양한 면

더 빠른 실행

프로그램을 더 빠르게 하고 싶다면 동시성을 사용하여 여분의 프로세서를 활용하는 방법을 알아야 한다.
언뜻 생각하면 프로그램의 각 부분을 순차적으로 실행하는 것보다 동시에 병렬적으로 실행할 경우 작업 간 전환이라는 컨텍스트-스위칭까지 처리해야 하기 때문에 더 많은 부하가 걸린다고 생각할 수 있다. 하지만 Block이 발생할 경우 해당 문제가 해결되기 전까지 전체 프로그램이 멈추게 된다.
동시성을 적용한다면 하나의 작업이 정지되더라도 나머지 작업은 진행할 수 있기에 프로그램은 계속적으로 진행이 되는 것이다. Block을 고려하지 않는다면 싱글 프로세서 환경에서 동시성이 가져오는 성능상의 이점은 없다.

코드 디자인의 향상

동시성은 복잡성과 같은 비용이 들어가는 기술이다. 그러나 프로그램 디자인, 자원의 분배 그리고 사용의 편의성 등의 장점을 고려하면 이러한 비용은 지불할 만하다. 또한 스레드를 사용하면 느슨하게 연결된 코드를 디자인할 수 있다.


기본적인 스레드 기법

작업의 정의

스레드는 작업을 처리하므로, 작업을 기술하는 방법이 필요하다. Runnable 인터페이스가 바로 이 작업을 정의한다. Runnable 인터페이스를 구현하고 run() 메소드를 작성하면 된다.
class LiftOff implements Runnable {
    protected int countDown = 10;
    private static int taskCount = 0;
    private final int id = taskCount++;

    public LiftOff() {
    }

    public LiftOff(int countDown) {

        this.countDown = countDown;
    }

    @Override
    public void run() {
        while(countDown-- > 0) {
            printnb(status());
            Thread.yield();  // 중요한 작업은 끝났다는 표시
        }
    }

    private String status() {
        return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), ";
    }
}

public class MainThread {
    public static void main(String[] args) {
        LiftOff launch = new LiftOff();
        launch.run();
    }
}
이것은 스레드 기능을 생성하지 않으므로 단순히 run() 메소드를 실행한 것에 지나지 않는다. 스레드 기능을 이용하기 위해서는 위에서 정의한 작업을 스레드에 올려야 한다.

Thread 클래스

전통적인 방법은 Thread 생성자로 작업을 넘겨 처리하는 것이다.
public class MainThread {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new LiftOff());
            t.start();
        }
        print("Waiting for Liftoff");
    }
}
Thread 생성자는 Runnable 객체만을 필요로 한다. start() 메소드 호출은 Runnable의 run() 메소드를 호출하여 새로운 스레드에서 작업을 시작하도록 한다.
main() 메소드가 스레드 객체를 생성할 때, 생성한 객체에 대한 참조를 저장하지 않는다. 일반적인 객체라면 가비지 컬렉터에 의해 제거되겠지만, 스레드는 다르다. 각 스레드는 자체적으로 특정 위치에 참조를 등록하기 때문에 작업이 run()에서 빠져 나와 종료하기 전에는 가비지 컬렉터가 이를 제거할 수 없다.

Executor의 사용

java.util.concurrent.Executors는 스레드 객체를 관리하여 동시성 프로그래밍을 간단하게 만들어 준다.
public class MainThread {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i &lt 5; i++) {
            exec.execute(new LiftOff());
        }
        exec.shutdown();
        print("Waiting for Liftoff");
    }
}
shutdown()의 호출은 Executor가 더 이상의 작업을 할당하지 못하게 설정한다. shutdown()의 호출 이전에 Executor에 할당한 모든 작업이 완료되면 현재 스레드(이번 경우는 main() 스레드)는 종료된다.

CachedThreadPool은 다른 유형의 Executor로 대체 가능하다.
FixedThreadPool은 제한된 개수의 작업만 할당 가능하다.
SingleThreadExecutor는 크기가 1인 FixedThreadPool과 유사하다. 만약 1개 이상의 작업이 등록되면 순차적으로 처리된다.

작업의 결과 반환하기

Runnable은 return 값이 없다. return 값이 필요하다면 Runnable이 아닌 Callable 인터페이스를 사용해야 하며, 이는 run()이 아닌 call() 메소드를 가지고, call()의 반환값을 표현하는 타입 파라미터를 사용하는 제너릭이다. ExecutorService.submit() 메소드를 통해서만 실행해야 한다.
class TakeWithResult implements Callable%ltString&gt {
    private int id;
    public TakeWithResult(int id) {
        this.id = id;
    }

    @Override
    public String call() throws Exception {
        return "result of TaskWithResult " + id;
    }
}

public class CallableDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        ArrayList&ltFuture&ltString&gt&gt results = new ArrayList&ltFuture&ltString&gt&gt();

        for (int i = 0; i < 10; i++) {
            results.add(exec.submit(new TakeWithResult(i)));
        }

        for (Future&ltString&gt fs : results) {
            try {
                // get()은 종료시까지 Block 된다.
                print(fs.get());
            } catch (InterruptedException e) {
            } catch (ExecutionException e) {
            } finally {
                exec.shutdown();
            }
        }
    }
}
Callable에 의해 반환되는 결과는 파라미터화 된 Future 객체다.
Future의 isDone()으로 종료 여부를 확인 할 수 있으며, 그냥 get()을 호출하면 결과가 준비될 때까지 Block된다. get()에 타임아웃을 설정할 수도 있다.

Sleeping

일정시간 작업을 멈추게 하는 Sleep()을 호출하여 변화를 줄 수 있다. yield()를 Sleep()으로 대체하면 다음과 같다
public class SleepingTask extends LiftOff {
    @Override
    public void run() {
        while(countDown-- > 0) {
            printnb(status());
            try {
                // 예전스타일
                // Thread.sleep(100);
                // 자바 SE 5/6 스타일
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new SleepingTask());
        }
        exec.shutdown();
    }
}
Sleep()의 호출은 InterruptedException을 발생시킬 수 있다. main()으로 전달 되지 않기 때문에 반드시 잡아야 한다.
각 작업에서 Sleep()이 호출되기 때문에 균등하게 분산된 순서로 처리 될 수 있으나 전적으로 신뢰해서는 안된다.

우선 순위

CPU가 스레드를 실행하는 순서를 결정할 순 없지만 동일한 조건이라면 우선순위가 높은 스레드를 실행하고 낮은 스레드는 대기시킨다. 우선순위가 낮은 스레드가 실행되지 않음을 의지하는 것은 아니다.
우선순위를 조절하는 것은 큰 위험을 수반하므로 신중하게 사용해야 한다.
getPriority(), setPriority()를 사용하여 언제든 변경 가능하다.
public class SimplePriorities implements  Runnable {
    private int priority;
    private volatile double d;
    private int countDown = 5;

    public SimplePriorities(int priority) {
        this.priority = priority;
    }

    @Override
    public String toString() {
        return Thread.currentThread() + ": " + countDown;
    }

    @Override
    public void run() {
        Thread.currentThread().setPriority(priority);
        while(true) {
            for (int i = 1; i < 100000; i++) {
                d += (Math.PI + Math.E) / (double) i;
                if(i % 1000 == 0)
                    Thread.yield();
            }
            print(this);
            if(--countDown == 0) return;
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++) {
            exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
        }
        exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
        exec.shutdown();
    }
}
스레드는 의미없는 double형 계산을 수행하고 있다. 마지막에 실행한 스레드가 가장 높은 우선순위로 설정되었다.
결과는 아래와 같다. 높은 우선순위의 스레드가 더 선호되는 것은 확인되지만, 무조건 최우선으로 처리 되진 않았다.
Thread[pool-1-thread-3,1,main]: 5
Thread[pool-1-thread-6,10,main]: 5
Thread[pool-1-thread-5,1,main]: 5
Thread[pool-1-thread-4,1,main]: 5
Thread[pool-1-thread-3,1,main]: 4
Thread[pool-1-thread-5,1,main]: 4
Thread[pool-1-thread-6,10,main]: 4
Thread[pool-1-thread-2,1,main]: 5
Thread[pool-1-thread-1,1,main]: 5
Thread[pool-1-thread-5,1,main]: 3
Thread[pool-1-thread-3,1,main]: 3
Thread[pool-1-thread-5,1,main]: 2
Thread[pool-1-thread-3,1,main]: 2
Thread[pool-1-thread-6,10,main]: 3
Thread[pool-1-thread-4,1,main]: 4
Thread[pool-1-thread-2,1,main]: 4
Thread[pool-1-thread-1,1,main]: 4
Thread[pool-1-thread-5,1,main]: 1
Thread[pool-1-thread-3,1,main]: 1
Thread[pool-1-thread-4,1,main]: 3
Thread[pool-1-thread-6,10,main]: 2
Thread[pool-1-thread-2,1,main]: 3
Thread[pool-1-thread-1,1,main]: 3
Thread[pool-1-thread-4,1,main]: 2
Thread[pool-1-thread-6,10,main]: 1
Thread[pool-1-thread-2,1,main]: 2
Thread[pool-1-thread-1,1,main]: 2
Thread[pool-1-thread-4,1,main]: 1
Thread[pool-1-thread-2,1,main]: 1
Thread[pool-1-thread-1,1,main]: 1
JAVA에서는 10단계의 우선순위를 제공하지만 OS와 고정적으로 매핑되어 있진 않다. 이식성을 위해서 MAX_PRIORITY, MIN_PRIORITY를 권장한다.

양보하기

run() 메소드에서 작업이 완료되었다고 판단이 되면 yield() 메소드를 통해 스케줄러에게 다른 스레드에게 CPU를 할당 해도 된다고 알려 줄 수 있다.
단지 동일한 우선순위를 가지는 다른 스레드가 실행되어도 된다는 것을 알려주는 것일 뿐 이다. 튜닝이나, 민감한 제어가 yield()에 의존해서는 안된다.

데몬 스레드

데몬 스레드는 주된 기능은 아니지만 프로그램이 실행되는 동안 백그라운드로 실행되는 서비스를 제공하기 위해 사용된다.
데몬 스레드는 일반 스레드와는 다르게 비-데몬스레드가 종료되면 실행 중이더라도 모든 데몬 스레드가 종료된다.
public class SimpleDaemons implements Runnable {
    @Override
    public void run() {
        try {
            while(true) {
                TimeUnit.MILLISECONDS.sleep(500);
                print(Thread.currentThread() + " " + this);
            }
        } catch (InterruptedException e) {
            print("sleep() interrupted");
        }
    }
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread daemon = new Thread(new SimpleDaemons());
            daemon.setDaemon(true);  // start() 이전에 호출되어야 함
            daemon.start();
        }
        print("All daemons started");
        TimeUnit.MILLISECONDS.sleep(600);
    }
}
심지어 데몬 스레드의 finally 구문도 실행되지 않고 강제 종료된다.
이것은 스레드를 정상적으로 종료할 수 없다는 의미이므로 바람직한 사용은 아니다.


다양한 코딩 방법

작업을 Runnable로 구현하지 않고 Thread로부터 직접 상속받을 수도 있다.
public class SimpleThread extends Thread {
    private static int threadCount = 0;
    private int countDown = 5;
    public SimpleThread() {
        super(Integer.toString(++threadCount));
        start();
    }
    
    @Override
    public String toString() {
        return getName() + "{" + countDown + "}, ";
    }

    @Override
    public void run() {
        while(true) {
            printnb(this);
            if(--countDown == 0)
                return;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new SimpleThread();
        }
    }
}
Thread를 상속하면 다른 클래스를 상속할 수 없다는 단점이 있지만, Runnable 인터페이스를 구현하면 그런 제한이 사라진다.
위 예제는 생성자에서 start() 메소드를 호출하고 있다. 이는 매우 단순한 예제이기 때문에 안전하지만, 생성자에서 스레드를 시작하는 것은 매우 위험하다. 생성자가 완료되기 전에 또 다른 작업이 시작되어 불완전한 객체에 접근할 수도 있기 때문이다.
class ThreadMethod {
    private int countDown = 5;
    private Thread t;
    private String name;
    public ThreadMethod(String name) {
        this.name = name;
    }
    public void runTask() {
        if(t == null) {
            t = new Thread(name) {
                public void run() {
                    try {
                        while(true) {
                            print(this);
                            if(--countDown == 0) return;
                            sleep(10);
                        }
                    } catch (InterruptedException e) {
                        print("sleep() interrupted");
                    }
                }
                public String toString() {
                    return getName() + ": " + countDown;
                }
            };
            t.start();
        }
    }
}
ThreadMethod 클래스는 메소드 안에서 스레드 생성을 보여준다.

용어 정리

실행되는 작업과 작업을 구동하는 스레드를 구별해야 한다. 일단 작업을 생성(Runnable)하고 생성한 작업을 스레드(Thread)에 탑재해야 한다. 작업과 스레드가 혼용되어 혼란을 시킬 수 있다.

조인

하나의 스레드는 다른 스레드에 대하여 join()을 호출하여 그 스레드가 종료될 때까지 대기 할 수 있다.
class Sleeper extends Thread {
    private int duration;
    public Sleeper(String name, int sleepTime) {
        super(name);
        this.duration = sleepTime;
        start();
    }
    public void run() {
        try {
            sleep(duration);
        } catch (InterruptedException e) {
            print(getName() + " was interrupted. " + "isInterrupted(): " + isInterrupted());
            return;
        }
        print(getName() + " has awakened");
    }
}

class Joiner extends Thread {
    private Sleeper sleeper;

    Joiner(String name, Sleeper sleeper) {
        super(name);
        this.sleeper = sleeper;
        start();
    }
    public void run() {
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            print("Interrupted");
        }
        print(getName() + " join complete");
    }
}

public class Joining {
    public static void main(String[] args) {
        Sleeper sleepy = new Sleeper("Sleepy", 1500);
        Sleeper grumpy = new Sleeper("Grumpy", 1500);

        Joiner dopey = new Joiner("Dopey", sleepy);
        Joiner doc = new Joiner("Doc", grumpy);

        grumpy.interrupt();
    }
}
Joiner는 Sleeperj이 정상 종료되거나 인터럽트 될 때까지 대기한다. 자바 SE5에는 join()보다 더 충실한 기능을 제공하는 CyclicBarrier같은 툴도 제공한다.

사용자 인터페이스 만들기

class UnresponsiveUI {
    private volatile double d = 1;

    UnresponsiveUI() throws IOException {
        while (d > 0) {
            d = d + (Math.PI + Math.E) / d;
            System.in.read(); // 이 라인은 실행되지 않는다.
        }
    }
}
public class ResponsiveUI extends Thread {
    private static volatile double d = 1;

    public ResponsiveUI() {
        setDaemon(true);
        start();
    }
    public void run() {
        while(true) {
            d = d + (Math.PI + Math.E) / d;
        }
    }
    public static void main(String[] args) throws IOException {
        //! new UnresponsiveUI(); // kill로만 종료할 수 있다.
        new ResponsiveUI();
        System.in.read();
        print(d); // 진행상태 출력
    }
}
UnresponsiveUI는 무한 while문에서 연산을 수행하므로 콘솔 입력을 처리할 수 없다. 프로그램의 반응성을 향상시키기 위해 이와 같은 연산을 run() 메소드 안에 위치시켜 백그라운드로 실행하는 것이다.

예외 감지

스레드의 속성 때문에 스레드로부터의 예외는 감지할 수 없다. 예외가 run() 메소드를 벗어나면 이 예외를 감지하기 위한 특별한 처리를 하지 않을 경우 콘솔로까지 확대된다.
자바 SE5 이전에는 스레드 그룹을 사용했으나 이제는 Executor로 해결할 수 있다.
스레드에 처리되지 않은 예외 때문에 중단될 상황이 되면 Thread.UncaughtExceptionHandler.uncaughtException()이 자동으로 호출된다. JAVA SE5에서는 새로 제공하는 Thread.UncaughtExceptionHandler 인터페이스를 통해 이를 활용할 수 있다.
다음은 Executors에서 스레드를 생성할 때 setUncaughtExceptionHandler() 메소드를 통해 별도로 처리하고 있다.
class ExceptionThread2 implements Runnable {
    @Override
    public void run() {
        Thread t = Thread.currentThread();
        print("run() by " + t);
        print("eh = " + t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        print("caught " + e);
    }
}
class HandlerThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        print(this + " creating new Thread");
        Thread t = new Thread(r);
        print("created " + t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        print("eh = " + t.getUncaughtExceptionHandler());
        return t;
    }
}
public class CaptureUncaughtException {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
        exec.execute(new ExceptionThread2());
        exec.shutdown();
    }
}
스레드에 UncaughtExceptionHandler가 없으면 defaultUncaughtExceptionHandler를 호출한다.
static 메소드인 Thread.setDefaultUncaughtExceptionHandler로 변경 가능하다.