개발 일지/주간 개발 일지

[03월 4주차] OkHttp 예제, Thread와 ExecutorService

계란💕 2023. 3. 26. 14:55

OkHttp 구현 방법

  • OkUtils 클래스 안에 개발자가 직접 정의한 GET 메서드를 활용해서 사용자가 굳이 url을 입력하지 않아도 개발자가 원하는 때에 수신 서버에 대한 특정 Http 요청이 실행되게끔 구현 가능하다.

 

 

  Ex) OkHttp  POST 구현

  • OkHttpUtil
    • Util 클래스의 post() 안에서 HTTP Request 를 보내고 상대방 서버에서 보낸 Response를 바로 받을 수 있다.
    • 서버에서  WorkflowId를 @PathVariable로 받을 수 있도록 URL에 넣어서 보내준다. 
    • MediaType은 json 형태로 주고 받으니까 body에 다음과 같이 세팅한다. 
    • 아래 코드는 URL이  {POST 요청하는 서버 IP 주소} : port 번호/{workflowId} 인 경우에 POST 요청하는 것과 같다. 
      • 즉, 내 서버에서 상대방 서버의 POST를 이용하기 위해서 상대방 서버의 컨트롤러 안에  POST 메서드가 구현되어 있어야한다. 
      • (OkHttp 로직이 이해 안 가서 내 컨트롤러에도 무언가를 구현해야되나 싶었으나 내 컨트롤러는 뭔가 만들 필요가 없다.)
    • Request 구문이 원래 try 위에 있었는데 현재  수신 서버를 띄워놓은 상태가 아니므로 에러가 난다. 
      • 그래서 try{} 안에 함께 넣어줬다. 
    • body는 필요에 따라 데이터를 넣어주도록 한다. 
  •  
<hide/>
public static void post(String workflowId, String data, Service service, String historyId) {

    OkHttpClient client = new OkHttpClient();
    Map<String, Object> body = new HashMap<>();
    body.put("a", true);
    body.put("b", "listener");
    body.put("c", data);
    RequestBody requestBody = RequestBody.create(new Gson().toJson(body), MediaType.parse("application/json; charset=utf-8"));

    try {
        Request request = new Request.Builder()
            .url(BASE_URL + DETAILED_URL + workflowId).post(requestBody).build();
        Response response = client.newCall(request).execute();
        service.setCallYn(response, historyId);

    } catch (IOException e) {
        log.error(e.getMessage(), e);
        throw new RuntimeException(e);
    }
}

 

 

  • MyThread 클래스 
    • record.value() 는 kafka에 저장된 메시지이다.
 while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            ...
            OkHttpUtils.post(workflowId, record.value(), service, historyId);
        }
}

 

  • OkHttp를 이용하면 이런 식으로  다른 서버 {차장님이 만들고 계신 수신 서버}로 Request를 보내고 Response 까지 받아볼 수 있어서 정말 편리하다. 
    • 이 때, Executor 서버 안에  POST mapping 메서드가 구현되어있어야 그 메서드가 수행된다. 
    • 그 응답이 위 코드의 Response으로 받을 수 있다. 
  • OkHttp는 MSA 방식으로 프로젝트를 설계하는 경우, 회사 내에 다른 서버끼리 Http 요청, 응답을 받기에 편리한 기능이다. 

 

 

  Ex) 네이버 페이지 - GET

  • GET 의 경우는 POST와 다르게 매개변수로 URL만 넣어주면 되므로 아주 간단하다. 
System.out.println(OkHttpUtils.get("http://www.naver.com"));

 

 

  Note) 실행 결과

  • 다음과 같이 네이버 메인 화면을 html 형태로 가져온다. 

 


 Java String <=> PostgreSQL TEXT 

  • PostgreSQL에서 TEXT 형태로 쓰고 싶은 데이터가 있는 경우를 생각해보자. 
  • 먼저 엔티티 안의 필드에 "String 변수명" 이렇게 설정한다. 
  • PostgreSQL 에서 해당 필드를 보면 VARCHAR()로 생성됐을 것이다. 
  • 해당 필드 타입을 콤보 박스에서 TEXT를 선택해주면 자연스럽게 호환이 된다. 
  • Json 객체를 필드로 넣을 때에는 복잡한 애너테이션이 몇 개 붙었지만 이 경우에는 별다른 설정이 필요없다. 

ExecutorService 인터페이스 vs Thread 클래스 차이점

  • 차장님 말씀에 따라서 스레드 관리를 위해 ExecutorService 라는 인터페이스를 이용했다. 
  • 처음 알게 된 인터페이스라서 이와 관련한 다른 라이브러리에 대해서도 알아봤다.  

 

ExecutorService

  • 병렬 작업 여러 개의 작업을 효율적으로 사용하기 위한 라이브러리, Thread Pool을 생성해서 사용할 수 있는 인터페이스이다.
  • 내부적으로 스레드 풀을 관리하고 task를 지정해주면 알아서 Thread Pool을 이용해서 task 실행하고 관리한다.
  • Queue 방식으로 실행 관리한다. 작업 큐를 이용 => 작업 처리를 보장
  • 스레드 관리가 편리하고 안정적으로 실행된다.
  • 스레드 풀을 내부에서 관리하므로 스레드 개수를 효율적으로 관리,  애플리케이션 성능을 최적화
  • 작업 처리, 스레드 관리를 분리하므로 개발자가 작업 처리에 집중할 수 있다. 
  • 작업 상태, 결과를 추적 가능
  • Thread  클래스 보다는 ExecutorService  사용을 권장한다. 

 

Thread

  •  개발자가 직접 스레드를 생성하고 실행해야한다.
  • 개발자가 직접 스레드 개수, 우선순위를 설정하고 스레드 생명주기를 관리해야한다.
  • 스레드 생성하고 종료에 따른 오버헤드가 발생하므로 많은 스레들르 생성하면 성능에 영향을 준다. 
  • start(), sleep(), stop(), interrupt() 같은 메서드가 있다. 

참고) https://velog.io/@cotchan/Executor-Service%EC%99%80-Thread-Pool

 

 

 

ExecutorService  주요 메서드

  • 작업 실행 메서드
    • submit(): task를 할당하고 Future 타입의 객체를 반환한다. 
      • Executors.execute(Runnable)의 기본 메서드를 확장해서 실행 취소, 및 또는 완료 대기에 사용할 수 있는 Future를 반환한다. 
      • (Executor, Future는 아래 내용 참고)
    • execute: submit()와 같은 기능을하지만 반환 타입이 void이다. 
    • invokeAny(), invokeAll(): 대량 실행의 가장 흔한 처리 방식으로 수행하고, 작업 컬렉션을 실행한 다음 하나 이상 또는 모든 작업이 완료될 때까지 대기한다. task를 Collection에 인자로 넣어서 넘겨준다. 
      • invokeAny(): 실행에 성공한 task에 대한 결과만 반환한다. 실패한 것들은 작업 취소된다. 
      • invokeAll(): 모든 task에 대한 반환값을 List<Future>형태로 반환한다. 
  • 상태 확인 메서드
    •  isShudown(): 종료 메서드인 shutdown()이 한 번이라도 호출되었는지 확인한다. 즉, shutdown() 요청이 호출됐는지만 판단한다. 
    • isTerminated(): shutdown 된 다음에 모든 작업이 완료되고 모든 스레드가 종료됐는지를 나타낸다. 
  • 종료 메서드
    • shutdown(): 현재 작업중인 스레드까지만 실행하고 중지한다. (더이상 스레드 풀에 작업 추가 불가능)
    • shudownNow(): 현재 작업 중이던 스레드를 모두 중지한다. 그러나 모두 중지시키는 걸 보장하지는 않으며 중지하지 못한 task를 반환한다. 

참고)  https://oranthy.tistory.com/510

 

cf) 관련 클래스 

 

Executor 

  • 인터페이스 Executor는 함수형 인터페이스이다 . 
  • 메서드 execute() 만 딸랑 하나 제공한다. 
  • Executor는 개발자가 요청한 task를 수행하는 주체이다. 
  • ExecutorService는 Executor를 확장한 인터페이스이면서 스레드 풀 관리하는 메서드까지 제공한다. (shutdown())

 

 

Future

  • 비동기 작업의 결과를 나타내는 인터페이스이다. 
  • 작업이 아직 완료되지 않았을 때, get()를 호출해서 결과를 기다릴 수 있다. 
  • 다른 작업을 수행하면서 결과가 준비 될 때까지 대기할 수 있다. 
  • cancel() 메서드를 사용해서 작업을 취소해서 isDone() 를 사용해서 작업이 완료 됐는지 확인 가능하다. 
  • 작업의 상태와 결과를 추척할 수 있도록 해준다. 
  • 즉, 다수의 작업을 동시에 실행하면서 작업의 진행 상황과 결과를 확인 가능하다. 

 

 

  Ex) Thread 관리하기

  • Executors.newSingleThreadExecutor(): 오직 하나의 스레드로 처리하고 다른 스레드 생성 요청이 오면 현재 스레드가 종료될 때까지 대기한 후에 생성된다.
<hide/>
ExecutorService executorService = null;
for (User user  : availableListenerList) {
    MyThread<Listener> thread = new MyThread<>(user, userService);
    executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return thread;
        }
    });
    executorService.submit(thread);
}
executorService.shutdown();

 

참고) https://simyeju.tistory.com/m/119


Spring Cloud Eureka(스프링 클라우드 유레카)

  • 유레카는 자가 등록, 동적 발견 및 부하 분산을 담당하여 서비스 디스커버리 패턴을 구현할 수 있도록 도와준다. 
  • Micro Service 들의 정보를 레지스트리에 등록할 수 있도록 하고 Micro Service 들의 동적인 탐색 로드밸런싱을 제공한다. 
  • Eureka는 Eureka Server(서버 컴포넌트)와 Eureka Client(클라이언트 컴포넌트)로 구성된다. 
  • Eureka Server
    • 모든 마이크로 서비스가 자신의 가용성을 등록하는 레지스트리
    •  
    • Eureka Client의 상태 정보가 들어있다. 
    • 유레카 서버로 지정하기 위해  Main 클래스 위에 @EnableEurekaServer를 붙인다. 
  • Eureka Client
    • 등록된 마이크로 서비스를 호출해서 사용할 때 유레카 클라이언트를 이용해서 필요한 서비스를 찾는다.
    • 유레카 클라이언트로 지정하기 위해  Main 클래스 위에 @EnableEurekaClient를 붙인다.

스레드 상태 코드( Thread.getStatus())

  • NEW: 새로 만들어졌지만 시작되지 않은 스레드
  • RUNNABLE: 실행 가능한 스레드, 자원 할당 받기 위해 기다려야할 수도 있다. 
  • BLOCKED: monitor lock을 기다리는 상태
    • monitor lock이 동기화된 영역 안으로 들어갈 때까지 기다린다.  
    • 또는 calling 후에 동기화 영역 안으로 다시 들어가도록 기다리는 상황이다.
  • WAITING: 다른 스레드가 특정 행동을 취할 때까지 기다리는 스레드를 말한다. 
  • TIMED_WAITING: 주어진 시간동안 기다린다. 
  • TERMINATED: 종료

Note

  • overhead(오버헤드): 작업을 수행하기 위해 간접적으로 들어가는 메모리, 시간 등을 말한다.
  • Java에 스레드 관련한 라이브러리 종류가 많으니 관련된 기능을 찾아보고 적용해야겠다. 
  • 컬렉션과 NullPointerException
    • Map을 전역 변수로 선언했는데 null로 선언하고 뒤에서  Map에 값을 넣어줬다. 
    • 그랬더니 오류 발생
    • 컬렉션에 null 을 초기값으로 넣으면 컬렉션에 null을 포함시키기 떄문에 이후에 컬렉션을 사용할 때 null 값을 처리하는 코드를 추가해야한다.  => 만약 null 처리를 누락하면 예외가 터진다. 
    • 다음으로는 컬렉션 내부 구현에서 null 값을 허용하지 않을 수도 있다. 
      • ex) ArrayList의 경우는 내부적으로 배열을 사용해서 요소를 저장한다. 
      • 만약 null을 넣으면 배열의 인덱스를 찾을 때 NullPointerException이 발생할 수 있다. 
    • 따라서, 컬렉션을 초기화할 때 null을 쓰지 않고 빈 컬렉션을 선언하는 게 좋다. 
    • ex) new ArrayList<>(); 
  • Java에서 동기화 이슈를 해결 방법
    • 동기화란? 공유 리소스에 여러 스레드에 대해 제어하는 기능이다.
    • 1) locking 방식: 잠금(lock)을 이용하는 방식이다. 
      • 상호 배제(Mutual Exclusion)를 보장한다. 
        • 상호 배제: 동시에 공유 자원에 접근하는 것을 막기 위해 한 스레드가 점유 중일 때, 다른 스레드가 접근하지 못하도록 하는 것을 뜻한다. 
      • mutex(뮤텍스), semaphoere(세마포어), monitor lock
      • synchronized 키워드를 이용해서 locking 방식을 구현한다. 
      • 공유 자원을 점유 중인 스레드는 lock을 가지고 있다.
      • 문제점: 데드락(deadlock),  스레드간 경쟁으로 인한 성능 저하가 발생한다. 
        • => 최소한의 잠금을 사용한다. 
    • 2) non locking 방식: 한 스레드가 공유 자원에 대한 lock을 획득하지 않고 공유 자원에 접근 가능한 방식이다. 
    • 3) non blocking 방식:  하나의 스레드가 블로킹(대기)하지 않고 다른 스레드와 병행으로 실행되도록 하는 방식이다. 
      • 다른 스레드가 공유자원을 사용해도 현재 스레드의 진행을 방해하지 않는다. 즉, 현재 스레드의 진행을 방해하지 않는다.
  • monitor lock(모니터 락)
    • 멀티스레드 환경에서 동기화를 지원하기 위한 기초적인 장치로 모니터락을 지원한다. 
    • lock(락), intrinsic lock(고유락) 이라고도 한다. 
    • 세마포어 방식의 단점을 보완하여 등장한 것이 바로 모니터 락이다.
    • 개발자는 synchronized 키워드를 이용해서 특정 객체의 고유락을 이용해서 여러 스레드를 동기화시킬 수 있다. 
    • 방법 1) 메서드 영역 전체에 synchronized  키워드를 붙이는 경우
    • 방법 2) 어떤 영역 앞에 synchronized(객체의 참조변수)를 붙이고 {} 로 해당 영역을 감싸준다.

<hide/>
public class Test {
    private int a = 0;
    private int b = 0;
    public void increment() {
        synchronized(this) {
            this.a++;
            this.b++;
        }
    }
}

 

 

 

참고)

https://sunrise-min.tistory.com/entry/%EC%9E%90%EB%B0%94-%EB%8F%99%EA%B8%B0%ED%99%94-SynchronizedMonitor-Atomic-Type

 https://oranthy.tistory.com/483

https://hbase.tistory.com/311


To Do List

  • 멀티스레드 동기화
    • 현재 MyThread 클래스 안에  Listener를 멤버변수로 선언해서 실행 중인 해당 MyThread에 대한 Listener 정보를 알 수 있다. 
    • 그런데 지금 Listener에 대한 PK인ListenerId를 URL에 파라미터로 넣어서 해당 리스너를 수행중인 스레드를 찾아서  스레드 정보를 조회하고 싶다.
    • 다음 주에 해당 내용을 어떻게 구현할지 생각해봐야겠다.