반응형
2019/02/26 - [JAVA] - java Blocking Queue
ArrayBlockingQueue
지정된 용량 및 기본 액세스 정책을 사용하여 ArrayBlockinQueue를 만든다.
일반 Queue 와 BlockingQueue
Queue는 크키가 정해져 있기때문에 무한히 아이템을 추가할 수 없다. 추가되는 아이템은 순서가 있으며 FIFO(First in First Out) 법칙을 따른다.
BlockingQueue는 Queue에서 아이템을 꺼내올때 비어 있으면 null 을 리턴하지 않고 아이템이 추가될때까지 기다린다고 한다. 반대로 아이템을 추가할때 가득차있다면 공간이 생길때 까지 기다린다.
ArrayBlockingQueue는 멀티 쓰레드 황경에서 사용하기 위해 구현된 queue 이며, 동시성이 보장되는 클레스로 synchronized 구문 없이도 사용해도 된다.
ArrayBlockingQueue 생성
사이즈가 10으로 제한된 ArrayBlockingQueue를 생성한다.
int capacity = 10;
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
Publisher / Subscriber 로 구성한 예제
public class ExecutorService2Test {
public static void main(String[] args) throws Exception {
// Producer 5개 생성
final ParallelService service = new ParallelService();
service.jobCreation("job_id_1");
service.jobCreation("job_id_2");
service.jobCreation("job_id_3");
service.jobCreation("job_id_4");
service.jobCreation("job_id_5");
// 여러개의 (5명의) Consumer 가 온것처럼 가상의 호출 시행
// (1) 클라이언트가 여러개라는 가정으로 테스트 한것으로, Customer 에서 take() 함으로, 컨슈머를 여러개 생성할경우 Thread 가 무한대기 상태에 빠지지 않도록 한다.
// (2) 컨슈머를 Producer 만큼 생성하거나, take() 를 pool() 로 변경하여 사용한다.
IntStream.rangeClosed(0, 7).forEach(value -> service.run());
// 중지 요청( 이미 시작된 Task 는 실행하고, 새로운 작업은 받지 않는다.)
ParallelService.consumer.shutdown();
ParallelService.producer.shutdown();
do {
// 종료 요청이 있었는지 체크
if (!ParallelService.consumer.isShutdown()) ParallelService.consumer.shutdown();
if (!ParallelService.producer.isShutdown()) ParallelService.producer.shutdown();
System.out.println(ParallelService.producer.awaitTermination(1, TimeUnit.MILLISECONDS));
System.out.println(ParallelService.consumer.awaitTermination(1, TimeUnit.MILLISECONDS));
} while (!ParallelService.consumer.awaitTermination(1, TimeUnit.SECONDS));
}
public static class ParallelService {
protected static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);//new ArrayBlockingQueue<>(100);
protected static final ExecutorService producer = Executors.newWorkStealingPool();
protected static final ExecutorService consumer = Executors.newWorkStealingPool();
//protected static ThreadPoolExecutor producer = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
//protected static ThreadPoolExecutor consumer = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
public void jobCreation(String data) {
producer.execute(() -> {
String threadName = Thread.currentThread().getName();
try {
queue.put(data);
// 큐 사이즈 출력
System.out.printf("PUT , Thread - [%s] , DATA = [%s], SIZE = [%d] \n", threadName, data, queue.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
public void run() {
consumer.execute(() -> {
String job = "";
// 해당 Collection 에 요소가 없는경우 true를 반환
while (!queue.isEmpty()) {
try {
job = queue.take();
Thread.sleep(100);
System.out.printf("TAKE - [%s] , THREAD - [%s] \n", job, Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
});
}
/**
* 현재 실행중인 모든 작업을 종료하고, 실행 대기 중인 목록을 반환한하기
*/
public void closeShutdownNow() {
List<Runnable> unfinishedTasks = producer.shutdownNow();
if (!unfinishedTasks.isEmpty()) {
System.out.println("Not all tasks finished before calling close: " + unfinishedTasks.size());
}
}
}
}
로그를 확인해보면 생성될때와 소비될때 Thread Name 이 각각 다른것을 확인해볼수 있다.
PUT , Thread - [ForkJoinPool-1-worker-5] , DATA = [job_id_2], SIZE = [1]
PUT , Thread - [ForkJoinPool-1-worker-11] , DATA = [job_id_5], SIZE = [5]
PUT , Thread - [ForkJoinPool-1-worker-9] , DATA = [job_id_4], SIZE = [4]
PUT , Thread - [ForkJoinPool-1-worker-7] , DATA = [job_id_3], SIZE = [3]
PUT , Thread - [ForkJoinPool-1-worker-3] , DATA = [job_id_1], SIZE = [2]
TAKE - [job_id_2] , THREAD - [ForkJoinPool-2-worker-3]
TAKE - [job_id_5] , THREAD - [ForkJoinPool-2-worker-11]
TAKE - [job_id_1] , THREAD - [ForkJoinPool-2-worker-5]
TAKE - [job_id_3] , THREAD - [ForkJoinPool-2-worker-7]
TAKE - [job_id_4] , THREAD - [ForkJoinPool-2-worker-9]
Process finished with exit code 0
반응형
'JAVA' 카테고리의 다른 글
remote debuging intelliJ (0) | 2021.04.29 |
---|---|
FetureTask (0) | 2021.04.02 |
QueryDSL Null 인경우 다른 값으로 채우기 coalesce (IFNULL) (0) | 2020.04.08 |
QueryDsl Mysql DATE_ADD, ADDDATE (0) | 2020.04.08 |
Logback filter (slf4j) (0) | 2019.12.09 |