2023/6/26

Java Stream API

A Collection is an in-memory data structure that holds a program's data, while a Stream is a data pipeline that pulls elements from that in-memory data on demand.

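Stream operations come in two kinds: intermediate and terminal. An intermediate operation returns a stream, so several intermediate operations can be chained to process the data step by step. A terminal operation runs the computation and returns a result of some other type.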
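
One property worth making explicit is that intermediate operations are lazy: they only describe the pipeline, and nothing runs until a terminal operation is invoked. The following is a minimal sketch that makes this observable. The class name LazyStreamDemo and its counter fields are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamDemo {

    // Hypothetical counters, used only to observe when the filter runs.
    static int visitedBeforeTerminal;
    static int visitedAfterTerminal;

    static List<String> upperCaseNamesStartingWithA(List<String> names) {
        List<String> visited = new ArrayList<>();

        // Only intermediate operations so far: the pipeline is described, not executed.
        Stream<String> pipeline = names.stream()
                .filter(n -> {
                    visited.add(n); // side effect only to record evaluation
                    return n.startsWith("A");
                })
                .map(String::toUpperCase);
        visitedBeforeTerminal = visited.size();

        // collect() is the terminal operation that actually pulls the elements through.
        List<String> result = pipeline.collect(Collectors.toList());
        visitedAfterTerminal = visited.size();
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Allen", "George", "Anderson", "Danny");
        System.out.println(upperCaseNamesStartingWithA(names)); // [ALLEN, ANDERSON]
        System.out.println("visited before terminal: " + visitedBeforeTerminal); // 0
        System.out.println("visited after terminal: " + visitedAfterTerminal);   // 4
    }
}
```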

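Stream operations can run sequentially or in parallel. Parallel operations can execute on several threads at once, which makes use of multi-core processors and can speed up the computation.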

A typical source for a Stream is a java.util.Collection such as a List or a Set. A Map has no stream() method of its own, but a Stream can be created from its keys, values, or entries.
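
As a sketch of the three Map views mentioned above, the example below streams over keySet(), values(), and entrySet(). The class name MapStreamDemo and the sample stock data are assumptions for illustration. A LinkedHashMap is used so the output order is predictable.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MapStreamDemo {

    // Hypothetical sample data: item name -> quantity in stock.
    static Map<String, Integer> sampleStock() {
        Map<String, Integer> stock = new LinkedHashMap<>();
        stock.put("apple", 3);
        stock.put("banana", 0);
        stock.put("cherry", 7);
        return stock;
    }

    // Stream over the keys.
    static List<String> upperCaseKeys(Map<String, Integer> map) {
        return map.keySet().stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // Stream over the values.
    static int total(Map<String, Integer> map) {
        return map.values().stream()
                .mapToInt(Integer::intValue)
                .sum();
    }

    // Stream over the entries when both key and value are needed.
    static List<String> inStock(Map<String, Integer> map) {
        return map.entrySet().stream()
                .filter(e -> e.getValue() > 0)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> stock = sampleStock();
        System.out.println(upperCaseKeys(stock)); // [APPLE, BANANA, CHERRY]
        System.out.println(total(stock));         // 10
        System.out.println(inStock(stock));       // [apple, cherry]
    }
}
```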

Creating a Stream

Ways to create a Stream from data collections:

    private static void createStream() {
        // Stream.of
        System.out.println("Stream.of");
        Stream<Integer> stream = Stream.of(1,2,3,4,5,6,7,8,9);
        stream.forEach(p -> System.out.print(p + " "));
        System.out.println("");

        // Stream.of(array)
        System.out.println("Stream.of(array)");
        Stream<Integer> stream2 = Stream.of( new Integer[]{1,2,3,4,5,6,7,8,9} );
        stream2.forEach(p -> System.out.print(p + " "));
        System.out.println("");

        // List.stream()
        System.out.println("List.stream()");
        List<Integer> list = new ArrayList<Integer>();
        for(int i = 1; i< 10; i++){
            list.add(i);
        }
        Stream<Integer> stream3 = list.stream();
        stream3.forEach(p -> System.out.print(p + " "));
        System.out.println("");

        // Stream.generate() or Stream.iterate()
        // produce the Stream elements with a generator function
        // use limit() to cap the number of elements
        System.out.println("Stream.generate()");
        Stream<Integer> randomNumbers = Stream
                .generate(() -> (new Random()).nextInt(100));
        randomNumbers.limit(10).forEach(s -> System.out.print(s + " "));
        System.out.println("");

        //  Stream of String chars or tokens
        System.out.println("Stream of String chars");
        IntStream stream5 = "12345_abcdefg".chars();
        stream5.forEach(p -> System.out.print(p + " "));
        System.out.println("");
        System.out.println("Stream of tokens");
        Stream<String> stream6 = Stream.of("A$B$C".split("\\$"));
        stream6.forEach(p -> System.out.print(p + " "));
        System.out.println("");
    }

Stream Collector

A Collector turns the elements of a Stream back into a Collection.

    private static void streamCollector() {
        // convert a List to a Stream
        List<Integer> list = new ArrayList<Integer>();
        for(int i = 1; i< 10; i++){
            list.add(i);
        }
        Stream<Integer> stream = list.stream();

        // use collect to convert the Stream back to a List
        System.out.println("collect");
        List<Integer> evenNumbersList = stream.filter(i -> i%2 == 0)
                .collect(Collectors.toList());
        System.out.print(evenNumbersList);
        System.out.println("");

        // use toArray to convert the Stream to an array
        System.out.println("toArray");
        Stream<Integer> stream2 = list.stream();
        Integer[] evenNumbersArr = stream2.filter(i -> i%2 == 0).toArray(Integer[]::new);
        for (Integer i : evenNumbersArr) {
            System.out.print(i + " ");
        }
        System.out.println("");
    }
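
Collectors offers more than toList(). The sketch below shows three other collectors from java.util.stream.Collectors: joining, toMap, and groupingBy. The class name CollectorDemo and the name list are chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CollectorDemo {

    static final List<String> NAMES = Arrays.asList("Allen", "George", "Anderson", "Danny");

    // joining: concatenate the elements into one String.
    static String joined() {
        return NAMES.stream().collect(Collectors.joining(", "));
    }

    // toMap: build a Map from each name to its length.
    static Map<String, Integer> lengths() {
        return NAMES.stream().collect(Collectors.toMap(n -> n, String::length));
    }

    // groupingBy: group the names by their first character.
    static Map<Character, List<String>> byInitial() {
        return NAMES.stream().collect(Collectors.groupingBy(n -> n.charAt(0)));
    }

    public static void main(String[] args) {
        System.out.println(joined());                  // Allen, George, Anderson, Danny
        System.out.println(lengths().get("Anderson")); // 8
        System.out.println(byInitial().get('A'));      // [Allen, Anderson]
    }
}
```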

Operations

Intermediate Operations

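Method       Description
filter()     keep only the elements that match a condition
map()        apply a function to each element
flatMap()    map each element to a stream and flatten the results into a single stream
distinct()   remove duplicate elements
sorted()     sort the elements
peek()       run an action on each element as it passes through, mainly for debugging
limit()      cap the number of elements returned
skip()       skip the first n elements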

java sample

    private static void intermediateOperation() {
        List<String> names = new ArrayList<>();
        names.add("Allen");
        names.add("George");
        names.add("Anderson");
        names.add("Danny");

        // filter: keep only the elements that match a condition
        System.out.println("filter");
        names.stream().filter((s) -> s.startsWith("A"))
                .forEach(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("");

        // map: apply a function to each element
        System.out.println("map");
        names.stream().filter((s) -> s.startsWith("A"))
                .map(String::toUpperCase)
                .forEach(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("");

        // flatMap: flatten several streams into a single Stream
        List<String> names2 = new ArrayList<>();
        names2.add("Jim");
        System.out.println("flatMap");
        List<String> flatMapResult = Stream.of(names, names2).flatMap(u -> u.stream()).collect(Collectors.toList());
        System.out.println(flatMapResult);
        System.out.println("");
        System.out.println("");

        // distinct
        // remove duplicate elements
        List<String> names3 = new ArrayList<>();
        names3.add("Jim");
        names3.add("Jimmy");
        names3.add("Jim");
        System.out.println("distinct");
        names3.stream()
                .distinct()
                .forEach(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("");

        // sorted: sort the elements
        System.out.println("sorted");
        names.stream()
                .sorted()
                .forEach(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("");

        // peek: mainly for debugging
        System.out.println("peek");
        List<String> res = names.stream().filter((s) -> s.startsWith("A"))
                .peek(e -> System.out.println("filter res: " + e))
                .map(String::toUpperCase)
                .peek(e -> System.out.println("map res: " + e))
                .collect(Collectors.toList());
        System.out.println(res);
        System.out.println("");
        System.out.println("");

        // limit
        // cap the number of elements returned
        System.out.println("limit");
        List<String> res2 = names.stream().filter((s) -> s.startsWith("A"))
                .map(String::toUpperCase)
                .limit(1)
                .collect(Collectors.toList());
        System.out.println(res2);
        System.out.println("");
        System.out.println("");

        // skip
        // skip the first n elements
        System.out.println("skip");
        List<String> res3 = names.stream().filter((s) -> s.startsWith("A"))
                .map(String::toUpperCase)
                .skip(1)
                .collect(Collectors.toList());
        System.out.println(res3);
        System.out.println("");
        System.out.println("");
    }

Terminal Operations

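Method           Description
forEach          perform an action on every element of the stream
forEachOrdered   process the elements in the stream's encounter order, so the order stays fixed even for a parallel stream
toArray          convert to an array
reduce           combine all elements of the stream into a single result with an accumulator function
collect          use a Collector to turn the stream's elements back into a Collection
min              find the smallest element
max              find the largest element
count            number of elements
anyMatch         check whether at least one element satisfies the given condition
allMatch         check whether every element satisfies the given condition
noneMatch        check whether no element satisfies the given condition
findFirst        return the first element of the stream
findAny          return any element of the stream
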
    private static void terminalOperation() {
        List<String> names = new ArrayList<>();
        names.add("Allen");
        names.add("George");
        names.add("Anderson");
        names.add("Danny");

        // forEach
        System.out.println("forEach");
        names.stream().forEach(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("");

        // forEachOrdered
        System.out.println("1. Sequential Stream");
        System.out.println("1.1 forEach prints in the original order");
        names.stream().forEach(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("1.2 forEachOrdered prints in the original order");
        names.stream().forEachOrdered(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("2. Parallel Stream");
        System.out.println("2.1 forEach: output order is not guaranteed");
        names.stream().parallel().forEach(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("2.2 forEachOrdered prints in the original order");
        names.stream().parallel().forEachOrdered(n -> System.out.print(n + " "));
        System.out.println("");
        System.out.println("");

        // toArray
        System.out.println("toArray");
        String[] nameArray = names.stream().toArray(String[]::new);
        for( String n: nameArray) {
            System.out.print(n+" ");
        }
        System.out.println("");
        System.out.println("");

        // reduce: pass an identity value and a BinaryOperator
        System.out.println("reduce");
        int sum = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).reduce(0, (s, n) -> s + n);
        int mul = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).reduce(1, (m, n) -> m * n);
        System.out.println("running sum: sum="+sum);
        System.out.println("running product: mul="+mul);
        System.out.println("");
        System.out.println("");

        // collect
        System.out.println("collect");
        List<String> filterNames = names.stream().filter((s) -> s.startsWith("A"))
                .collect(Collectors.toList());
        for( String n: filterNames) {
            System.out.print(n+" ");
        }
        System.out.println("");
        System.out.println("");

        // min, max
        System.out.println("min, max");
        Optional<String> minName = names.stream()
                .min(Comparator.naturalOrder());
        if( minName.isPresent() ) {
            System.out.println("min = "+minName.get());
        }
        Optional<String> maxName = names.stream()
                .max(Comparator.naturalOrder());
        if( maxName.isPresent() ) {
            System.out.println("max = "+maxName.get());
        }
        System.out.println("");
        System.out.println("");

        // count
        System.out.println("count");
        long count = names.stream().filter((s) -> s.startsWith("A"))
                .count();
        System.out.println("count="+count);
        System.out.println("");
        System.out.println("");

        // anyMatch allMatch noneMatch
        System.out.println("anyMatch allMatch noneMatch");
        boolean anyMatch = names.stream()
                .anyMatch(n -> Character.isUpperCase(n.charAt(0)));
        boolean allMatch = names.stream()
                .allMatch(n -> Character.isUpperCase(n.charAt(0)));
        boolean noneMatch = names.stream()
                .noneMatch(n -> Character.isUpperCase(n.charAt(0)));
        System.out.println("anyMatch="+anyMatch);
        System.out.println("allMatch="+allMatch);
        System.out.println("noneMatch="+noneMatch);
        System.out.println("");
        System.out.println("");

        // findFirst  findAny
        System.out.println("findFirst  findAny");
        Optional<String> findFirst = names.stream().filter((s) -> s.startsWith("A"))
                .findFirst();
        if( findFirst.isPresent() ) {
            System.out.println("findFirst = "+findFirst.get());
        }
        Optional<String> findAny = names.stream().filter((s) -> s.startsWith("A"))
                .findAny();
        if( findAny.isPresent() ) {
            System.out.println("findAny = "+findAny.get());
        }
        System.out.println("");
        System.out.println("");
    }

parallel

Both parallelStream() and stream().parallel() turn a stream into a parallel one.

    private static void parallel() {
        // List.stream()
        // List.parallelStream()
        System.out.println("List.stream()");
        List<Integer> list = new ArrayList<Integer>();
        for (int i = 1; i < 10; i++) {
            list.add(i);
        }
        Stream<Integer> stream3 = list.stream();
        stream3.forEach(p -> System.out.print(p + " "));
        System.out.println("");
        System.out.println("List.parallelStream()");
        Stream<Integer> stream4 = list.parallelStream();
        stream4.forEach(p -> System.out.print(p + " "));
        System.out.println("");

        // stream().parallel()
        System.out.println("stream().parallel()");
        Stream<Integer> stream5 = list.stream().parallel();
        stream5.forEach(p -> System.out.print(p + " "));
        System.out.println("");
    }
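
With a parallel stream, forEach may print in any order, but the result of a reduction or an ordered collect does not change. The sketch below illustrates this under the assumption of a simple list of 1 to 9. The class name ParallelDemo is made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelDemo {

    static List<Integer> numbers() {
        return IntStream.rangeClosed(1, 9).boxed().collect(Collectors.toList());
    }

    // A reduction gives the same answer whether the stream is sequential or parallel.
    static int parallelSum(List<Integer> list) {
        return list.parallelStream().mapToInt(Integer::intValue).sum();
    }

    // collect(toList()) on an ordered source keeps the encounter order, even in parallel.
    static List<Integer> parallelDoubled(List<Integer> list) {
        return list.parallelStream().map(n -> n * 2).collect(Collectors.toList());
    }

    // forEachOrdered visits elements in encounter order, even in parallel.
    static List<Integer> orderedVisit(List<Integer> list) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        list.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> list = numbers();
        System.out.println(parallelSum(list));     // 45
        System.out.println(parallelDoubled(list)); // [2, 4, 6, 8, 10, 12, 14, 16, 18]
        System.out.println(orderedVisit(list));    // [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```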


2023/6/19

Java CompletableFuture

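CompletableFuture is an asynchronous-processing class added to java.util.concurrent in Java 8.

The class has two main methods for running asynchronous work, runAsync and supplyAsync; the difference is whether the task returns a result. When no executor is passed, the default thread pool is used (normally ForkJoinPool.commonPool()).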

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

java sample

    private static void test1() {
        try {
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                System.out.println("supplyAsync START");
                System.out.println("say supplyAsync Hello World");
                return "supplyAsync Hello World";
            });
            System.out.println(hello.get());

            CompletableFuture<Void> world = CompletableFuture.runAsync(() -> {
                System.out.println("runAsync START");
                System.out.println("say runAsync Hello World");
            });
            // wait for the runAsync task so its output appears before the method returns
            world.get();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        // console output
//        supplyAsync START
//        say supplyAsync Hello World
//        supplyAsync Hello World
//        runAsync START
//        say runAsync Hello World

    }

Use Executors to create a custom thread pool

ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("runAsync"), threadPool);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "supplyAsync", threadPool);
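
To check that a task really runs on the custom pool, one option is to give the pool's threads recognizable names. This is only a sketch: the class name CustomPoolDemo and the "my-pool-" thread name prefix are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolDemo {

    // Runs a supplier on a custom pool and returns the name of the thread that executed it.
    static String runOnCustomPool() {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical naming scheme so the pool's threads are easy to recognize.
        ThreadFactory factory = r -> new Thread(r, "my-pool-" + counter.incrementAndGet());
        ExecutorService pool = Executors.newFixedThreadPool(2, factory);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            // Without shutdown() the non-daemon pool threads keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on: " + runOnCustomPool());
    }
}
```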


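Handling exceptions

Once a task completes you can pick up its result, and if an exception is thrown during processing you can intercept it and run specific work.

The relevant methods are: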

CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
// catch exception
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

// use your own thread pool
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

sample code:

// pick up the result once the task completes; if an exception occurs during processing, intercept it and run specific work
    private static void test3() {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            String doSomething = "Hello World";
            return doSomething;
        }, threadPool);

        supplyAsync.whenCompleteAsync((result, ex) -> {
            System.out.println("---supplyAsync---");
            System.out.println("supplyAsync result: " + result);
            System.out.println("exception: " + ex);
        }, threadPool).exceptionally(ex -> {
            System.out.println("exceptionally: " + ex.getMessage());
            return ex.getMessage();
        }).join();

        CompletableFuture<String> supplyAsyncException = CompletableFuture.supplyAsync(() -> {
            throw new CompletionException(new Exception("throw exception"));
        }, threadPool);

        supplyAsyncException.whenCompleteAsync((result, ex) -> {
            System.out.println("---supplyAsyncException---");
            System.out.println("supplyAsyncException result: " + result);
            System.out.println("exception: " + ex);
        }, threadPool).exceptionally(ex -> {
            System.out.println("exceptionally: " + ex.getMessage());
            return ex.getMessage();
        }).join();
        // Output:
//        ---supplyAsync---
//        supplyAsync result: Hello World
//        exception: null
//        ---supplyAsyncException---
//        supplyAsyncException result: null
//        exception: java.util.concurrent.CompletionException: java.lang.Exception: throw exception
//        exceptionally: java.lang.Exception: throw exception
        threadPool.shutdown();
    }

You can also handle the exception with handle; the difference is that handle can return a value.

CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

// use your own thread pool
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

sample code:

    private static void test4() {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            throw new CompletionException(new Exception("throw exception"));
        }, threadPool);

        String res = supplyAsync.handle((result, ex) -> (null != ex) ? ex.getMessage() : result).join();
        System.out.println("res: " + res);

        // Output:
//        res: java.lang.Exception: throw exception
        threadPool.shutdown();
    }
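
To contrast the two recovery styles, the sketch below uses exceptionally to supply a fallback value and handle to map both the success and the failure case to a new value. The class name RecoveryDemo, the parse helper, and the fallback value -1 are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RecoveryDemo {

    // Hypothetical async task: parsing fails with NumberFormatException for bad input.
    static CompletableFuture<Integer> parse(String text) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text));
    }

    // exceptionally: only runs on failure and supplies a replacement value.
    static int withExceptionally(String text) {
        return parse(text).exceptionally(ex -> -1).join();
    }

    // handle: always runs and sees either the value or the exception.
    static String withHandle(String text) {
        return parse(text).handle((value, ex) -> {
            if (ex == null) {
                return "ok:" + value;
            }
            // The exception usually arrives wrapped in a CompletionException.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            return "failed:" + cause.getClass().getSimpleName();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(withExceptionally("42"));  // 42
        System.out.println(withExceptionally("abc")); // -1
        System.out.println(withHandle("42"));         // ok:42
        System.out.println(withHandle("abc"));        // failed:NumberFormatException
    }
}
```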

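Transforming results

Use thenApply to transform a result, thenCompose when the function itself returns another CompletionStage, and thenAccept when no return value is needed.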

CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
// use your own thread pool
CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
//-----------
CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)

CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
//-----------
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// use your own thread pool
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
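
A minimal sketch of the three methods side by side. The class name ChainDemo and the lookupLength helper are hypothetical and stand in for any second asynchronous step.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ChainDemo {

    // Hypothetical second async step that itself returns a CompletableFuture.
    static CompletableFuture<Integer> lookupLength(String word) {
        return CompletableFuture.supplyAsync(() -> word.length());
    }

    // thenApply: transform the value with a plain function.
    static String applied() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenApply(String::toUpperCase)
                .join();
    }

    // thenCompose: chain a function that returns another future, without nesting futures.
    static int composed() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenCompose(ChainDemo::lookupLength)
                .join();
    }

    // thenAccept: consume the value; the resulting future carries no value.
    static List<String> accepted() {
        List<String> sink = new CopyOnWriteArrayList<>();
        CompletableFuture.supplyAsync(() -> "hello")
                .thenAccept(sink::add)
                .join();
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(applied());  // HELLO
        System.out.println(composed()); // 5
        System.out.println(accepted()); // [hello]
    }
}
```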

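Combining the results of two independent CompletableFutures

When two independent CompletableFutures need their results combined, use:

thenCombine merges the results of two independent CompletableFutures into one value

thenAcceptBoth is like thenCombine, but returns no value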

//-----------
CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
// use your own thread pool
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor)

///////
CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,  Runnable action)

// use your own thread pool
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

java sample

    // combine the results of supplyAsync1 and supplyAsync2
    public static void test5() {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
        CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);

        String ans = supplyAsync1.thenCombine(supplyAsync2, (result1, result2) -> result1 + ", " + result2).join();
        System.out.println("ans: " + ans);
        threadPool.shutdown();
        // ans: supplyAsync 1, supplyAsync 2

        //-----------
        // no return value needed
        ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
        CompletableFuture<String> supplyAsync21 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool2);
        CompletableFuture<String> supplyAsync22 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool2);

        supplyAsync21.thenAcceptBothAsync(supplyAsync22, (result1, result2) -> System.out.println(result1 + ", " + result2), threadPool2).join();
        threadPool2.shutdown();

        // supplyAsync 1, supplyAsync 2
    }

Multiple CompletableFutures

With more than two CompletableFutures, use allOf to wait for all of them to complete and then chain a thenRun.

    public static void test6() {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
        CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);
        CompletableFuture<String> supplyAsync3 = CompletableFuture.supplyAsync(() -> "supplyAsync 3", threadPool);

        CompletableFuture.allOf(supplyAsync1, supplyAsync2, supplyAsync3).thenRun(() -> {
            try {
                StringBuffer ans = new StringBuffer();
                ans.append(supplyAsync1.get()).append(", ")
                        .append(supplyAsync2.get()).append(", ")
                        .append(supplyAsync3.get());
                System.out.println("ans: " + ans.toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }).join();

        threadPool.shutdown();
        // ans: supplyAsync 1, supplyAsync 2, supplyAsync 3

    }

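When it is enough for just one of them to finish before moving on, use applyToEither, or acceptEither when no return value is needed; anyOf covers the case of several CompletableFutures.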

CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)

// use your own thread pool
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

///////

CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)

// use your own thread pool
CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)


///////
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
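
A small sketch of applyToEither and anyOf. To keep the outcome deterministic, the "slow" futures here are simply never completed, which is an assumption made for the demo. The class name EitherDemo is made up.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {

    // applyToEither: continue with whichever of the two futures completes first.
    static String firstOfTwo() {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed in this demo
        CompletableFuture<String> ready = CompletableFuture.completedFuture("fast");
        return never.applyToEither(ready, s -> "winner: " + s).join();
    }

    // anyOf: completes as soon as any one of the given futures completes.
    static Object firstOfMany() {
        CompletableFuture<String> a = new CompletableFuture<>(); // never completed in this demo
        CompletableFuture<String> b = new CompletableFuture<>(); // never completed in this demo
        CompletableFuture<String> c = CompletableFuture.completedFuture("c is done");
        return CompletableFuture.anyOf(a, b, c).join();
    }

    public static void main(String[] args) {
        System.out.println(firstOfTwo());  // winner: fast
        System.out.println(firstOfMany()); // c is done
    }
}
```

Note that anyOf returns a CompletableFuture<Object>, so the caller has to cast the result if a specific type is needed.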


2023/6/5

HttpClient in JDK 11

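Up to JDK 8, HTTP clients in Java usually came from third-party libraries, most commonly Apache HttpComponents and OkHttp. JDK 9 added an HTTP client as an incubator module, and since JDK 11 it is part of the standard library as java.net.http.

A program works with three parts: HttpRequest, HttpResponse, and HttpClient.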

HttpRequest

  1. URI: the target address the request is sent to

  2. HTTP Method

    • GET()
    • POST(BodyPublisher body)
    • PUT(BodyPublisher body)
    • DELETE()
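  3. HTTP Protocol Version

    Sets the HTTP version of the request. HTTP/1.1 used to be the common choice; the client now defaults to HTTP/2.

  4. Headers

    Sets the HTTP request headers

  5. Timeout

    Sets how long to wait for the HTTP response before timing out

  6. Request Body

    For methods such as POST and PUT, a BodyPublisher supplies the body content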
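
The six parts above can be put together on one builder and then read back from the built request, without sending anything over the network. This is a sketch: the class name RequestPartsDemo and the 10-second timeout are assumptions, and the URL is the postman-echo endpoint used elsewhere in this post.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

class RequestPartsDemo {

    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder()
                .uri(URI.create("https://postman-echo.com/post"))                 // 1. URI
                .version(HttpClient.Version.HTTP_2)                               // 3. protocol version
                .header("Content-Type", "text/plain;charset=UTF-8")               // 4. headers
                .timeout(Duration.ofSeconds(10))                                  // 5. timeout (assumed value)
                .POST(HttpRequest.BodyPublishers.ofString("Sample request body")) // 2. method + 6. body
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest();
        // The built request is immutable; its parts can be inspected through accessors.
        System.out.println(request.method());
        System.out.println(request.uri());
        System.out.println(request.version().orElse(null));
        System.out.println(request.headers().map());
        System.out.println(request.timeout().orElse(null));
        request.bodyPublisher()
                .ifPresent(p -> System.out.println("body length: " + p.contentLength()));
    }
}
```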

Request Body

The following APIs can be used to supply the body content

  1. StringProcessor

    Builds the body from a String, created with HttpRequest.BodyPublishers.ofString

    HttpRequest request = HttpRequest.newBuilder()
     .uri(new URI("https://postman-echo.com/post"))
     .headers("Content-Type", "text/plain;charset=UTF-8")
     .POST(HttpRequest.BodyPublishers.ofString("Sample request body"))
     .build();
  2. InputStreamProcessor

    Builds the body from an InputStream, created with HttpRequest.BodyPublishers.ofInputStream

    byte[] sampleData = "Sample request body".getBytes();
    HttpRequest request = HttpRequest.newBuilder()
           .uri(new URI("https://postman-echo.com/post"))
           .headers("Content-Type", "text/plain;charset=UTF-8")
           .POST(HttpRequest.BodyPublishers
                   .ofInputStream(() -> new ByteArrayInputStream(sampleData)))
           .build();
  3. ByteArrayProcessor

    Builds the body from a byte array, created with HttpRequest.BodyPublishers.ofByteArray

    byte[] sampleData = "Sample request body".getBytes();
    HttpRequest request = HttpRequest.newBuilder()
           .uri(new URI("https://postman-echo.com/post"))
           .headers("Content-Type", "text/plain;charset=UTF-8")
           .POST(HttpRequest.BodyPublishers.ofByteArray(sampleData))
           .build();
  4. FileProcessor

    Builds the body from a file at a given path, created with HttpRequest.BodyPublishers.ofFile

    HttpRequest request = HttpRequest.newBuilder()
           .uri(new URI("https://postman-echo.com/post"))
           .headers("Content-Type", "text/plain;charset=UTF-8")
           .POST(HttpRequest.BodyPublishers.ofFile(
                   Paths.get("sample.txt")))
           .build();
  5. noBody

    When there is no body content, use HttpRequest.BodyPublishers.noBody()

    HttpRequest request = HttpRequest.newBuilder()
     .uri(new URI("https://postman-echo.com/post"))
     .POST(HttpRequest.BodyPublishers.noBody())
     .build();

HttpClient

  • Create an instance with HttpClient.newBuilder() or HttpClient.newHttpClient()

  • Handle the response body with a BodyHandler

    BodyHandlers.ofByteArray
    BodyHandlers.ofString
    BodyHandlers.ofFile
    BodyHandlers.discarding
    BodyHandlers.replacing
    BodyHandlers.ofLines
    BodyHandlers.fromLineSubscriber
    
    // before JDK 11 (incubator API)
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandler.asString());
    
    // JDK 11 and later
    HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
  • Proxy

    A proxy can be configured

    HttpResponse<String> response = HttpClient
      .newBuilder()
      .proxy(ProxySelector.getDefault())
      .build()
      .send(request, BodyHandlers.ofString());
  • Redirect Policy

    When the response is a 3XX redirect, a redirect policy can make the client follow it automatically

    HttpResponse<String> response = HttpClient.newBuilder()
      .followRedirects(HttpClient.Redirect.ALWAYS)
      .build()
      .send(request, BodyHandlers.ofString());
  • HTTP Authentication

    HttpResponse<String> response = HttpClient.newBuilder()
      .authenticator(new Authenticator() {
        @Override
        protected PasswordAuthentication getPasswordAuthentication() {
          return new PasswordAuthentication(
            "username", 
            "password".toCharArray());
        }
    }).build()
      .send(request, BodyHandlers.ofString());
  • Cookie

    // define a CookieHandler with cookieHandler(CookieHandler cookieHandler)
    // this one accepts no cookies
    HttpClient.newBuilder()
      .cookieHandler(new CookieManager(null, CookiePolicy.ACCEPT_NONE))
      .build();
    
    // get the CookieStore
    ((CookieManager) httpClient.cookieHandler().get()).getCookieStore()
  • SSL Context

    Set an SSLContext on the HttpClient; the one below skips certificate verification

    private static SSLContext disabledSSLContext() throws KeyManagementException, NoSuchAlgorithmException {
            SSLContext sslContext = SSLContext.getInstance("TLS");
            // https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#sslcontext-algorithms
            sslContext.init(
                    null,
                new TrustManager[] {
                    new X509TrustManager() {
                        public X509Certificate[] getAcceptedIssuers() {
                            return new X509Certificate[0];
                        }
    
                        public void checkClientTrusted(X509Certificate[] certs, String authType) {
                        }
    
                        public void checkServerTrusted(X509Certificate[] certs, String authType) {
                        }
                    }
                },
                new SecureRandom()
            );
            return sslContext;
        }

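Sync or Async Calls

HttpClient can send a request either synchronously or asynchronously

  • send: synchronous

    The program blocks here until the response arrives or the timeout fires; the next line can read the response body directly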

    HttpResponse<String> response = HttpClient.newBuilder()
      .build()
      .send(request, BodyHandlers.ofString());
  • sendAsync: asynchronous, non-blocking

    CompletableFuture<HttpResponse<String>> response = HttpClient.newBuilder()
      .build()
      .sendAsync(request, HttpResponse.BodyHandlers.ofString());
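  • An Executor can be supplied to limit the threads

    By default the client creates its own executor (a cached thread pool, as from java.util.concurrent.Executors.newCachedThreadPool(), in the JDK 11 implementation)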

    ExecutorService executorService = Executors.newFixedThreadPool(2);
    
    CompletableFuture<HttpResponse<String>> response1 = HttpClient.newBuilder()
      .executor(executorService)
      .build()
      .sendAsync(request, HttpResponse.BodyHandlers.ofString());
    
    CompletableFuture<HttpResponse<String>> response2 = HttpClient.newBuilder()
      .executor(executorService)
      .build()
      .sendAsync(request, HttpResponse.BodyHandlers.ofString());
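
To try sendAsync without depending on an external site, one option is to start a throwaway server on the loopback interface. The sketch below uses the JDK's built-in com.sun.net.httpserver.HttpServer. The class name LocalAsyncDemo, the /hello path, and the response text are assumptions for illustration, and it presumes the loopback address 127.0.0.1 is usable.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class LocalAsyncDemo {

    static String fetchFromLocalServer() {
        HttpServer server;
        try {
            // Port 0 lets the OS pick a free port.
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        server.createContext("/hello", exchange -> {
            byte[] body = "hello from local server".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        try {
            int port = server.getAddress().getPort();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:" + port + "/hello"))
                    .GET()
                    .build();
            HttpClient client = HttpClient.newBuilder()
                    .version(HttpClient.Version.HTTP_1_1)
                    .build();
            // sendAsync returns immediately; the body is extracted when the response arrives.
            CompletableFuture<String> body = client
                    .sendAsync(request, HttpResponse.BodyHandlers.ofString())
                    .thenApply(HttpResponse::body);
            return body.join(); // block only at the point where the value is needed
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchFromLocalServer());
    }
}
```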

HttpResponse

  • URI

    HttpResponse also has a uri() method. Because a request may be redirected, the URI on the response is the address after the redirect

  • Response Header

    Get the response headers

    HttpResponse<String> response = HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString());
    HttpHeaders responseHeaders = response.headers();
  • Http Version

    The HTTP version the server responded with

    response.version();
  • Response Body

    String body = response.body();

Complete Java Code

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class HttpClientTest {

    public static void main(String[] args) throws Exception {

        // create the HttpClient instance
        HttpClient httpClient = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1) // http 1.1
                .connectTimeout(Duration.ofSeconds(5)) // timeout after 5 seconds
                .sslContext(disabledSSLContext()) // disable SSL verify
                .build();

        // build the HttpRequest
//        HttpRequest request = getMethod();
//        HttpRequest request = postNoBody();
        HttpRequest request = postStringBody();
//        HttpRequest request = postInputStreamBody();
//        HttpRequest request = postByteArrayBody();
//        HttpRequest request = postFileBody();

        // invalid URI, for testing the timeout
//        HttpRequest request = postStringBody_InvalidUri();

        // synchronous call
//        sync(httpClient, request);
        // asynchronous call
        async(httpClient, request);

    }

    private static void async(HttpClient httpClient, HttpRequest request) throws InterruptedException, java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException {
        // asynchronous
        CompletableFuture<HttpResponse<String>> response = httpClient
                .sendAsync(request, HttpResponse.BodyHandlers.ofString());
        String result = response
                .thenApply(HttpResponse::body)
                .exceptionally(t -> {
                    t.printStackTrace();
                    return "fallback";
                })
                .get(10, TimeUnit.SECONDS);
        System.out.println(result);
    }

    private static void sync(HttpClient httpClient, HttpRequest request) throws java.io.IOException, InterruptedException {
        // send the request and receive the response
        HttpResponse<String> response = httpClient.send(
                request, HttpResponse.BodyHandlers.ofString());

        HttpClient.Version version = response.version();
        System.out.println("---response version---");
        System.out.println(version);

        System.out.println("---response headers---");
        HttpHeaders responseHeaders = response.headers();
        Map<String, List<String>> responseHeadersMap = responseHeaders.map();
        for (Map.Entry<String, List<String>> header : responseHeadersMap.entrySet()) {
            System.out.println(header.getKey() + ":" + header.getValue());
        }

        // Get the response body
        String body = response.body();
        System.out.println("---response body---");
        System.out.println(body);
    }

    private static HttpRequest getMethod() {
        // Taiwan Stock Exchange (TWSE) API: daily trading data for stock 0056
        String url = "https://www.twse.com.tw/exchangeReport/STOCK_DAY?response=json&date=20230531&stockNo=0056";

        // Build the HttpRequest using the GET method
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .version(HttpClient.Version.HTTP_2)
                .header("cache-control", "no-cache")
                .GET()
                .build();
        return request;
    }

    private static HttpRequest postNoBody() throws URISyntaxException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(new URI("https://postman-echo.com/post"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        return request;
    }

    private static HttpRequest postStringBody() throws URISyntaxException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(new URI("https://postman-echo.com/post"))
                .headers("Content-Type", "text/plain;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString("Sample request body"))
                .build();
        return request;
    }

    private static HttpRequest postInputStreamBody() throws URISyntaxException {
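        // Encode explicitly as UTF-8 so the bytes match the declared Content-Type charset
        byte[] sampleData = "Sample request body".getBytes(java.nio.charset.StandardCharsets.UTF_8);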
        HttpRequest request = HttpRequest.newBuilder()
                .uri(new URI("https://postman-echo.com/post"))
                .headers("Content-Type", "text/plain;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers
                        .ofInputStream(() -> new ByteArrayInputStream(sampleData)))
                .build();
        return request;
    }

    private static HttpRequest postByteArrayBody() throws URISyntaxException {
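        // Encode explicitly as UTF-8 so the bytes match the declared Content-Type charset
        byte[] sampleData = "Sample request body".getBytes(java.nio.charset.StandardCharsets.UTF_8);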
        HttpRequest request = HttpRequest.newBuilder()
                .uri(new URI("https://postman-echo.com/post"))
                .headers("Content-Type", "text/plain;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofByteArray(sampleData))
                .build();
        return request;
    }

    private static HttpRequest postFileBody() throws URISyntaxException, FileNotFoundException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(new URI("https://postman-echo.com/post"))
                .headers("Content-Type", "text/plain;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofFile(
                        Paths.get("sample.txt")))
                .build();
        return request;
    }

    private static HttpRequest postStringBody_InvalidUri() throws URISyntaxException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(new URI("https://test.com/post"))
                .headers("Content-Type", "text/plain;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString("Sample request body"))
                .build();
        return request;
    }

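    // WARNING: this trust manager accepts every certificate chain without validation.
    // Use it only against test endpoints, never in production.
    private static SSLContext disabledSSLContext() throws KeyManagementException, NoSuchAlgorithmException {
        SSLContext sslContext = SSLContext.getInstance("TLS");
        // https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#sslcontext-algorithms
        sslContext.init(
                null,
                new TrustManager[] {
                    new X509TrustManager() {
                        @Override
                        public X509Certificate[] getAcceptedIssuers() {
                            // The interface contract requires a non-null (possibly empty) array
                            return new X509Certificate[0];
                        }

                        @Override
                        public void checkClientTrusted(X509Certificate[] certs, String authType) {
                            // No-op: every client certificate is trusted
                        }

                        @Override
                        public void checkServerTrusted(X509Certificate[] certs, String authType) {
                            // No-op: every server certificate is trusted
                        }
                    }
                },
                new SecureRandom()
        );
        return sslContext;
    }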

}
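
The async() method above chains thenApply, exceptionally and a timed get(). The following is a minimal offline sketch of how that chain behaves, using plain CompletableFuture values instead of a real HTTP call. The class name AsyncFallbackSketch and the helper resolve are hypothetical names introduced only for this illustration, and String::toUpperCase stands in for HttpResponse::body.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncFallbackSketch {

    // Same chain shape as async(): transform the value, fall back on failure,
    // then wait for the result with a deadline.
    static String resolve(CompletableFuture<String> future, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future
                .thenApply(String::toUpperCase)   // stand-in for HttpResponse::body
                .exceptionally(t -> "fallback")   // used when the future completed with an exception
                .get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Normal completion: the transformed value is returned.
        System.out.println(resolve(CompletableFuture.completedFuture("ok"), 100));

        // Failed future (for example a connection error): the fallback is returned.
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new IOException("connection refused"));
        System.out.println(resolve(failed, 100));

        // A future that never completes: get() throws TimeoutException,
        // which exceptionally() does not see because the future itself has not failed.
        try {
            resolve(new CompletableFuture<>(), 100);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

One consequence worth noting: a failure reported by the client itself (such as a connect timeout) completes the future exceptionally and yields "fallback", while exceeding the deadline passed to get() surfaces as a TimeoutException that the caller has to handle.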

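The request builders above can also be checked without any network access, since an HttpRequest exposes its method, URI, headers and body publisher. The sketch below assumes Java 11 or later. RequestInspectionSketch, buildPost and bodyLength are hypothetical names for illustration, and the URL is the same echo endpoint used in the listing.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class RequestInspectionSketch {

    // Builds the same kind of POST request as postStringBody(), without sending it.
    static HttpRequest buildPost(String body) {
        return HttpRequest.newBuilder()
                .uri(URI.create("https://postman-echo.com/post"))
                .header("Content-Type", "text/plain;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8))
                .build();
    }

    // Returns the declared body length in bytes, or -1 when the request has
    // no body publisher or the publisher does not know its length.
    static long bodyLength(HttpRequest request) {
        return request.bodyPublisher()
                .map(HttpRequest.BodyPublisher::contentLength)
                .orElse(-1L);
    }

    public static void main(String[] args) {
        HttpRequest request = buildPost("Sample request body");
        System.out.println(request.method());   // POST
        System.out.println(request.uri());      // https://postman-echo.com/post
        System.out.println(request.headers().firstValue("Content-Type").orElse("(none)"));
        System.out.println(bodyLength(request)); // 19
    }
}
```

Checks like these are handy in unit tests, where the request construction can be verified separately from the actual send.
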
References

菜鳥工程師 肉豬: Java 11 HttpClient發送請求範例 (Java 11 HttpClient request examples)

Exploring the New HTTP Client in Java | Baeldung