2013년 9월 3일 화요일

동시성 (2)

자원 공유

단일 스레드 프로그램에선 발생하지 않는 문제이지만, 여러 스레드를 사용하면 동시에 같은 자원을 사용하려 할 때 문제가 발생한다. 두 사람이 동시에 같은 장소에 주차하려는 상황저럼..

자원으로의 잘못된 접근

짝수를 생성하는 작업과 생성된 짝수를 검증하는 두개의 작업을 살펴보자.
public abstract class IntGenerator {
    private volatile boolean canceled = false;
    public abstract int next();
    public void cancel() {canceled = true;}
    public boolean isCanceled() {return canceled;}
}

public class EvenChecker implements Runnable {
    private IntGenerator generator;
    private final int id;

    public EvenChecker(IntGenerator generator, int id) {
        this.generator = generator;
        this.id = id;
    }
    public void run () {
        while(!generator.isCanceled()) {
            int val = generator.next();
            if(val % 2 != 0) {
                print(Thread.currentThread().getName() + " " + val + " not even!");
                generator.cancel();
            }
        }
        print(Thread.currentThread().getName() + " " + "Generator is canceled");
    }

    public static void test(IntGenerator gp, int count) {
        print("Press Control-C to exit");
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < count; i++) {
            exec.execute(new EvenChecker(gp, i));
        }
        exec.shutdown();
    }
    public static void test(IntGenerator gp) {
        test(gp, 10);
    }
}

public class EvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;

    @Override
    public int next() {
        ++currentEvenValue;  // 위험한 부분
        ++currentEvenValue;
        return currentEvenValue;
    }

    public static void main(String[] args) {
        EvenChecker.test(new EvenGenerator());
    }
}
총 10개의 스레드가 같은 Generator 객체를 사용하고 있다. 한 스레드가 next()를 호출하여 currentEvenValue가 증가하는 순간 다른 스레드가 next()를 호출하는 경우는 충분이 발생할 수 있다. 이것은 내부의 값을 부정확한 생태로 빠뜨리게 한다.
Press Control-C to exit
pool-1-thread-1 3707 not even!
pool-1-thread-1 Generator is canceled
pool-1-thread-4 Generator is canceled
pool-1-thread-5 Generator is canceled
pool-1-thread-2 3711 not even!
pool-1-thread-2 Generator is canceled
pool-1-thread-3 3709 not even!
pool-1-thread-3 Generator is canceled
pool-1-thread-6 Generator is canceled
pool-1-thread-7 Generator is canceled
pool-1-thread-4 Generator is canceled
pool-1-thread-3 Generator is canceled
pool-1-thread-6 Generator is canceled
next() 메소드가 복수의 단계를 필요하므로 스레드 메커니즘에 의해 증가 단계의 중간에 작업이 끼어들 수 있다는 것을 명심해야 한다.

공유 자원 경쟁의 해결

동시성이 제대로 동작하려면 두 작업이 동시에 같은 자원에 접근하는 것을 막아야 한다. 이것은 해당 자원을 사용 중일 때 자물쇠를 잠그는 것으로 간단히 해결된다.
자원에 대한 충돌을 방지하기 위하여 자바는 기본적으로 synchronized 키워드를 제공한다. 작업이 synchronized 키워드로 보호되는 코드를 실행하면, 락(lock)이 사용가능한지 확인하고 획득한 다음 코드를 실행하고 완료되면 락을 해제한다.
메소드를 다음과 같이 synchronized 메소드로 선언하여 충돌을 방지할 수 있다.
synchronized void f() { ... }
synchronized void g() { ... }
다만 모든 객체는 모니터라고 불리는 단일 락을 가지고 있다. synchronized 메소드를 호출하면 그 객체가 가지고 있는 다른 모든 synchronized 메소드에 대한 접근이 블럭된다.
public class EvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;

    @Override
    public synchronized int next() {
        ++currentEvenValue;  // 위험한 부분
        ++currentEvenValue;
        return currentEvenValue;
    }

    public static void main(String[] args) {
        EvenChecker.test(new EvenGenerator());
    }
}
클래스에 대한 단일 락도 존재하여 synchronized static 메소드는 클래스 당 static 데이터의 동시접근을 막기 위한 락을 제공한다.

Brian's Rule of Synchronization
이후에 다른 스레드가 읽을 수 있는 변수를 기록할 때나 이전에 다른 스레드가 기록했을 수 있는 변수를 읽을 때
동기화를 사용해야 한다. 추가적으로 읽는 객체나 기록하는 객체 모두 동일한 모니터 락을 사용하여 동기화를
해야 한다.

명시적인 락

java.util.concurrent.locks에 정의된 명시적인 뮤텍스 메커니즘을 제공한다. 락 객체를 명시적으로 생성, 잠그고 해제 한다.
public class MutexEvenGenerator extends IntGenerator {
    private int currentEvenValue = 0;
    private Lock lock = new ReentrantLock();

    @Override
    public int next() {
        lock.lock();
        try {
            ++currentEvenValue;
            ++currentEvenValue;
            return currentEvenValue;
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        EvenChecker.test(new MutexEvenGenerator());
    }
}
MutexEvenGenerator는 lock이라는 뮤텍스를 추가하고 next()안에서 임계영역을 생성하기 위하여 lock(), unlock() 메소드를 사용한다. lock()을 호출한 직후 try-finally 블럭을 사용하고 finally 안에서 unlock()을 호출한다.
try 블럭 안에서 return을 호출하여 unlock()이 너무 빨리 호출되지 않도록 유의한다.
synchronized 키워드 내부에서 예외가 발생하면 클린업을 수행할 기회가 없지만, 명시적인 lock 객체를 사용하면 finally 구문 안에서 사용 가능하다.
synchronized에서는 락 획득을 시도하거나 특정 시간 동안의 락 획득 시도 이후 포기와 같은 것을 할 수 없다. 명시적 lock 객체는 획득과 해제에 있어 synchronized 보다 좀 더 세밀한 제어를 가능하게 한다.

원자성(atomicity)과 휘발성(volatile)

원자성 오퍼레이션으 스레드 스케줄러에 의한 인터럽트가 발생하지 않는 오퍼레이션을 말하는데, 이러한 원자성을 신뢰하는 것은 위험한 것이다. 때때로 원자성 오퍼레이션이 안전해야 할 것 같은 상황에서도 실제로는 안전하지 않을 수 있다.
volatile 키워드는(SE5 이전에는 동작하지 않는다) 해당 필드에 수정이 가해지면 모든 읽기가 변화한다는 것을 의미한다. non-volatile 필드에 대한 원자성 오퍼레이션은 주 메모리에 저장하지 않고 레지스터 캐쉬에 저장될 수 있어 이 필드를 읽는 다른 작업은 새로운 값을 읽어 가지 않을 수도 있다. 복수의 작업이 한 필드에 접근을 한다면 그 필드는 volatile 되어야 한다.

원자(Atomic)클래스

자바 SE5는 AtomicInteger, AtomicLong, AtomicReference와 같이 원자 오퍼레이션을 가진 특별한 원자 클래스를 제공한다.
public class AtomicEvenGenerator extends IntGenerator {
    private AtomicInteger currentEvenValue = new AtomicInteger(0);

    @Override
    public int next() {
        return currentEvenValue.addAndGet(2);
    }
    public static void main(String[] args) {
        EvenChecker.test(new AtomicEvenGenerator());
    }
}
AtomicInteger를 사용하였기 때문에 별도의 동기화 기술이 필요 없다. java.util.concurrent의 클래스를 작성하기 위해 디자인 되었으며, 비록 이것이 문제를 일으키지 않을 것을 확신하더라도 특수한 상황에서만 사용해야 한다. 일반적으로 sychronized 키워드나 명시적 lock을 사용하는 것이 더 안전하다.

임계영역

때때로 메소드 전체가 아닌 일부분에 대하여 다수 스레드의 접근을 제한하고자 할 때가 있다. 이런 영역을 임계영역(Critical Section)이라고 하며 synchronized 키워드를 사용하여 생성한다. 여기서 synchronized는 내부의 코드를 동기화하는데 사용되는 락을 가지고 있는 객체를 지정하기 위해 사용된다.
synchronized(syncObject) {
    // 이 부분은 한번에 하나의 작업만
    // 접근 할 수 있다.
}
이는 동기화 블럭이라고 부르기도 한다. 이 부분을 수행하기 위해서는 우선 syncObject의 락을 획득해야 한다.
다음 코드에서 PairManager2의 락을 거는 시간이 현저히 작다. 이것이 전체 메소드의 동기화가 아닌 동기화 블럭을 사용하는 이유다.
abstract class PairManager {
    AtomicInteger checkCounter = new AtomicInteger(0);
    protected Pair p = new Pair();
    private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>());
    public synchronized Pair getPair() {
        return new Pair(p.getX(), p.getY());
    }
    // 시간을 소비하는 오퍼레이션으로 가정
    protected void store(Pair p) {
        storage.add(p);
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {

        }
    }
    public abstract void increment();
}

// 전체 메소드 동기화
class PairManager1 extends PairManager {
    @Override
    public synchronized void increment() {
        p.incrementX();
        p.incrementY();
        store(getPair());
    }
}

// 임계영역 사용
class PairManager2 extends PairManager {
    @Override
    public void increment() {
        Pair temp;
        synchronized (this) {
            p.incrementX();
            p.incrementY();
            temp = getPair();
        }
        store(temp);
    }
}
시간을 소비하는 store() 메소드는 Pair 객체를 동기화된 ArrayList에 추가하므로 스레드에 안전하다고 할 수 있다. 그러므로 보호될 필요가 없으며 PairManager2의 동기화 블럭 외부에 위치한다.

다른 객체에 대한 동기화

동기화 블럭은 동기화에 사용할 객체를 지정해야 하며, 일반적으로 동기화에 가장 적합한 객체는 해당 메소드가 호출되고 있는 현재의 객체가 된다. 동기화 블럭에서 락을 획득하고 나면, 객체의 다른 동기화 메소드나 임계 영역에 대한 접근은 거부된다. 그러므로 this를 사용하여 동기화 블럭을 지정하면 임계영역으로 인해 동기화의 범위가 줄어든다. 반대로 말하면 동기화시 서로 다른 객체를 사용하면 두 작업이 동시에 접근 가능하다.
class DualSync {
    private Object syncObject = new Object();
    public synchronized void f() {
        for (int i = 0; i < 5; i++) {
            print("f()");
            Thread.yield();
        }
    }
    public void g() {
        synchronized (syncObject) { // this로 바꿀 경우 f(), g() 동시 접근이 불가능 하다.
            for (int i = 0; i < 5; i++) {
                print("g()");
                Thread.yield();
            }
        }
    }
}
public class SyncObject {
    public static void main(String[] args) {
        final DualSync ds = new DualSync();
        new Thread() {
            public void run() {
                ds.f();
            }
        }.start();
        ds.g();
    }
}

스레드 로컬 저장소

공유자원에 대한 작업 충돌을 방지하는 다른 방법은 공유 자체를 제거하는 것이다.객체를 사용하는 서로 다른 스레드마다 동일한 변수에 대하여 자동으로 서로 다른 저장소를 생성하는 메커니즘이다. 스레드 로컬 저장소의 생성 및 관리는 java.lang.ThreadLocal 클래스가 수행한다.
class Accessor implements Runnable {
    private final int id;

    public Accessor(int id) { this.id = id; }
    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()) {
            ThreadLocalVariableHolder.increment();
            print(this);
            Thread.yield();
        }
    }
    @Override
    public String toString() {
        return "Accessor{id=" + id + " : " + ThreadLocalVariableHolder.get() + '}';
    }
}

public class ThreadLocalVariableHolder {
    private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
        private Random rand = new Random(47);
        protected synchronized Integer initialValue() {
            return rand.nextInt(10000);
        }
    };
    public static void increment() { value.set(value.get() + 1); }
    public static int get() { return value.get(); }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new Accessor(i));
        }
        TimeUnit.MILLISECONDS.sleep(100); // 잠시 동안 실행
        exec.shutdownNow();  // 모든 Accessor 종료
    }
}
각 스레드에서 사용되는 ThreadLocal은 초기값을 다르게 설정하도록 되어 있다. 각 스레드에서 같은 static 객체를 참조하고 있지만 실제로는 각 스레드의 ThreadLocal.ThreadLocalMap에서 해당 값을 참조한다. 같은 객체를 참조하는 것처럼 보이지만 각 Thread별 다른 객체를 참조하도록 한다. 쓸데가 있을지는 고민을 해봐야겠다.


작업 종료하기

식물원

각 입구 마다 카운터가 설치되어 입장객의 수를 세고 있으며, 입구의 카운터가 늘어날 때 마다 공유하는 전체 카운터도 같이 증가한다.
class Count {
    private int count = 0;
    private Random rand = new Random(47);
    public synchronized int increment() {
        int temp = count;
        if(rand.nextBoolean())
            Thread.yield();
        return (count = ++temp);
    }
    public synchronized int value() { return count; }
}

class Entrance implements Runnable {
    private static Count count = new Count();
    private static List<Entrance> entrances = new ArrayList<Entrance>();
    private int number = 0;
    private final int id;
    private static volatile boolean canceled = false;
    // 휘발성 필드에 대한 원자성 오퍼레이션
    public static void cancel() { canceled = true; }
    public Entrance(int id) {
        this.id = id;
        // 현재 작업을 리스트에 보관
        // 죽은 작업의 가비지 컬렉션을 방지한다.
        entrances.add(this);
    }
    @Override
    public void run() {
        while(!canceled) {
            synchronized (this) {
                ++number;
            }
            print(this + " Total: " + count.increment());
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                print("sleep interrupted");
            }
        }
        print("stopping " + this);
    }
    public synchronized int getValue() { return number; }
    public String toString() {
        return "Enttance " + id + ": " + getValue();
    }
    public static int getTotalCount() {
        return count.value();
    }
    public static int sumEntrances() {
        int sum = 0;
        for(Entrance entrance : entrances) {
            sum += entrance.getValue();
        }
        return sum;
    }
}
public class OrnamentalGarden {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new Entrance(i));
        }
        TimeUnit.MILLISECONDS.sleep(2000);
        Entrance.cancel();
        exec.shutdown();
        if(!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) {
            print("Some tasks were not terminated!");
        }
        print("Total: " + Entrance.getTotalCount());
        print("Sum of Entrances: " + Entrance.sumEntrances());
    }
}
위 프로그램은 모든 것을 안전하게 종료하는 것에 조금 문제가 있다.

블럭 상태에서의 종료

스레드 상태

  • New : 스레드를 생성하는 순간, 이 후 실행 가능한 Runnable 혹은 Blocked 상태로 된다.
  • Runnable : 이용 가능한 CPU가 있으면 실행 할 수 있다는 것을 의미함, 실행 중이 아닐 수도 있으며 스케줄러가 할당되면 실행하는데 장애가 없음
  • Blocked : 스레드가 실행될 수 있지만 무언가가 막고 있는 상태
    • sleep()이 호출되어 지정된 시간 동안 실행이 중단된 경우
    • wait()을 통하여 스레드 실행이 지연되는 경우, notify() 혹은 notifyAll()을 수신하지 않으면 Runnable로 바뀌지 않는다.
    • 특정 I/O가 완료되기를 기다릴 때
    • 다른 객체의 synchronized 메소드를 호출 시도했으나 다른 작업에 의해 lock 된 경우
  • Dead : 죽거나 완료된 상태, CPU를 할당받지 않으며 작업이 완래되고 더이상 Runnable로 바뀌지 않는다. run()에서 return 되거나, interrupt 되는 경우다.
위의 Blocked 상태에서 작업을 종료하고자 할 때, 작업 내부의 로직이 스스로 종료를 감지하는 것을 기다릴 수 없다면 강제적으로 종료해야 한다.

인터럽트

블럭된 작업의 종료를 위해 Thread 클래스는 interrupt() 메소드를 제공하며 이는 스레드의 인터럽트 상태를 변경한다. 인터럽트 상태가 된 스레드는 이미 블럭되어 있거나 블럭될 오퍼레이션을 사용하려고 할 때 InterruptException을 발생시키며, 예외를 발생시킨 후 또는 작업이 Thread.interrupted()를 호출하면 인터럽트 상태를 다시 초기화한다.
다음 코드는 Executor를 사용하여 스레드를 시작할 때 execute()가 아닌 submit()을 호출하여 얻은 스레드의 컨텍스트를 통해 cancel()을 호출해서 해당 작업에 인터럽트를 전달하고 있다. cancel()에 true를 전달하면 해당 스레드에 interrupt()를 호출하여 중단시킬 권한을 부여하는 것이다.
class SleepBlocked implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            print("InterruptedException");
        }
        print("Exiting SleepBlocked.run()");
    }
}
class IOBlocked implements Runnable {
    private InputStream in;

    IOBlocked(InputStream in) {
        this.in = in;
    }

    @Override
    public void run() {
        try {
            print("Waiting for read()");
            in.read();
        } catch (IOException e) {
            if(Thread.currentThread().isInterrupted()) {
                print("Interrupted from block I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        print("Exiting IOBlocked.run()");
    }
}
class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while(true) {
            Thread.yield();
        }
    }
    public SynchronizedBlocked() {
        new Thread() {
            public void run() {
                f();
            }
        }.start();
    }
    public void run() {
        print("Trying to call f()");
        f();
        print("Exiting SynchronizedBlocked.run()");
    }
}
public class Interrupting {
    private static ExecutorService exec = Executors.newCachedThreadPool();
    static void test(Runnable r) throws InterruptedException {
        Future f = exec.submit(r);
        TimeUnit.MILLISECONDS.sleep(100);
        print("Interrupting " + r.getClass().getName());
        f.cancel(true);  // 실행중이라면 인터럽트
        print("Interrupt sent to " + r.getClass().getName());
    }
    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        print("Aboarting with System.exit(0)");
        System.exit(0); // 두 번의 인터럽트가 실패할 경우
    }
}
SleepBlock, IOBlockd, SynchronizedBlocked 세 종류의 작업이 있다. 결과를 보면 sleep()에 대하여는 인터럽트를 걸 수 있다는 것을 알 수 있다. 하지만 동기화 락이나 I/O를 수행하는 상태의 블럭에 대해서는 인터럽트가 발생하지 않는다. I/O의 문제를 위한 한가지 방법은 해당 락을 유발한 자원을 닫는 것이다.
I/O 장에서 소개된 nio 클래스는 세련된 인터럽트를 제공한다. ReentrantLock에 의해 블록 된 작업은 인터럽트 처리가 가능하다.

인터럽트 감지

스레드에 interrupt()를 호출하면 작업이 블럭 상태에 들어가려고 하거나 이미 들어간 상태에서만 인터럽트가 발생한다.(I/O 및 동기화 메소드를 통한 블럭은 인터럽트가 발생하지 않는다.) 하지만 작업을 멈추기 위해 interrupt()를 호출했다면, 블럭 상황이 아니더라도 빠져 나올 수 있는 방법이 필요하다.
이것은 interrupt() 호출로 변경되는 인터럽트 상태로 해결 될 수 있다. interrupted()를 호출하여 상태를 확인 할 수 있고, 상태를 초기화하는 역할도 한다.(동일한 인터럽트를 두번 받지 않는다.) 즉 인터럽트의 확인은 한 번의 InterruptedException 감지나 한 번의 Thread.interrupted()의 성공적인 반환으로 가능하다.
class NeedsCleanup {
    private final int id;
    NeedsCleanup(int ident) {
        this.id = ident;
        print("NeedsCleanup " + id);
    }
    public void cleanup() {
        print("Cleaning up " + id);
    }
}
class Blocked3 implements Runnable {
    private volatile double d = 0.0;

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // 첫번째 포인트
                NeedsCleanup n1 = new NeedsCleanup(1);
                // n1의 정의 직후 try-finally 구문 시작
                // n1의 정확한 소거를 보장하기 위해서이다
                try {
                    print("Sleeping");
                    TimeUnit.SECONDS.sleep(1);
                    // 두번째 포인트
                    NeedsCleanup n2 = new NeedsCleanup(2);
                    // n2의 정확한 소거 보장
                    try {
                        print("Calculating");
                        // 시간을 소비하지만 블로킹 되지 않은 오퍼레이션
                        for (int i = 1; i < 250000; i++) {
                            d = d + (Math.PI + Math.E) / d;
                        }
                        print("Finished time-consuming operation");
                    } finally {
                        n2.cleanup();
                    }
                } finally {
                    n1.cleanup();
                }
            }
            print("Exiting via while() test");
        } catch (InterruptedException e) {
            print("Exiting via InterruptedException");
        }
    }
}
public class InterruptingIdiom {
    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1) {
            print("Usage: java InterruptingIdiom delay-in-ms");
            System.exit(1);
        }
        Thread t = new Thread(new Blocked3());
        t.start();
        TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
        t.interrupt();
    }
}
예외가 발생하여 순환문을 빠져 나올 때는 적절한 자원 반환(CleanUp)이 꼭 필요하다. interrupt()에 반응하는 클래스를 만들 때에는 객체 생성시 뒤 따르는 try-finally를 통해 객체 소거가 보장되어야 한다.