2025/1/6

Netty in Java 1

Netty是一個 non-blokcing I/O socket framework,主要用於開發網路應用程式。非同步事件驅動的框架和工具可簡化程式開發。Netty最初由JBoss開發,現在由Netty項目社區開發和維護。Netty還支援了HTTP、HTTP2、DNS及其他協定,WebSockets、Google Protocol Buffers、支援 SSL/TLS 以及支援用於SPDY協定和訊息壓縮。

早期 Java Socket framework 會使用 Mina 這個 library,但因為 netty 跟 mina 作者相同,且 mina 已經很久都沒有在維護了,所以目前大部分的文章都建議要使用 netty。不過討論中有提到一個最大的不同點,在 UDP 的處理部分,mina 跟 netty 有不同的作法,mina 有高階的封裝,可讓 connection less 的 UDP 連線,使用起來很像有連線的狀況,netty 是比較貼近原本的 UDP,保持了 connection less 的特性。

另外在使用 netty 之前,要注意使用了哪一個版本的 netty。根據 Remove master branch · Issue #4466 · netty/netty · GitHub 的討論,由於 netty 5 開發時,發現新的作法增加了城市的複雜度,但卻沒有帶來明顯的效能提升,所以 netty 5 目前是被放棄的狀態,建議還是要使用 4.1 版,4.1 版的 user guide 在 Netty.docs: User guide for 4.x

netty 的 libary 切割為以下這些部分

Core 是核心,Protocol Support 是在 socket 的上層的通訊協定,Transport Support 則是資料傳輸,也就是 Socket/Datagram、HTTP Tunnel 或 In-VM Pipe,這幾個都是實際傳輸資料的實作。

maven

使用netty最簡單的方式是引用所有 netty 的 libary,可引用 netty-all

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.111.Final</version>
</dependency>

DISCARD

網路協定最簡單的是 DISCARD,server side 不管 client 送什麼資料,都會直接丟棄,不做任何回應。

DiscardServer.java

package netty.discard;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Discards any incoming data.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup 是 multithread event loop,處理 IO operation
        // 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 helper class,可設定 server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 接受 incoming connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // ChannelInitializer 用來設定新的 Channel
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    // 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
                    // option 是設定 NioServerSocketChannel 的參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption 是設定 NioSocketChannel 的參數
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // 綁定 TCP Port
            ChannelFuture f = b.bind(port).sync();

            // 這邊開始會等待所有 server socket 關閉
            // 但這個例子不會發生這種狀況
            // 如果要以正常方式關閉 server,可呼叫以下 method
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}

DiscardServerHandler.java

package netty.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

// ChannelInboundHandlerAdapter 實作了 ChannelInboundHandler 介面所有 methods
// DiscardServerHandler 只需要 繼承 ChannelInboundHandlerAdapter
// 就可以只 override 必要的 methods
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {

    // channelRead 會在收到 message 時被呼叫
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Discard the received data silently.
//        ((ByteBuf) msg).release(); // msg 必須要呼叫 release 釋放記憶體
//
//        channelRead 通常會用以下的方式實作內容
//        try {
//            // Do something with msg
//        } finally {
//            ReferenceCountUtil.release(msg);
//        }
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) {
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }

        // 上面的 while loop 可替代為以下寫法
        System.out.print(in.toString(io.netty.util.CharsetUtil.US_ASCII));
        in.release();
    }

//    // 上面的 while loop 可替代為以下寫法
//    @Override
//    public void channelRead(ChannelHandlerContext ctx, Object msg) {
//        ByteBuf in = (ByteBuf) msg;
//        System.out.print(in.toString(io.netty.util.CharsetUtil.US_ASCII));
//        in.release();
//    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        // 發生 IO error 或 handler 實作有問題時,會產生 exception,由這個 method 處理
        cause.printStackTrace();
        ctx.close();
    }
}

測試

啟動 DiscardServer 後,可用 telnet/nc 測試

# 等同 telnet 127.0.0.1 8080
nc -nvv 127.0.0.1 8080

ECHO

echo 協定基於 discard 做一些修改,在 echo server 收到 client 發送的資料後,會直接將收到的資料回傳給 client。

EchoServer.java

package netty.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {

    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup 是 multithread event loop,處理 IO operation
        // 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
        // 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 helper class,可設定 server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel 接受 incoming connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })
                    // 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
                    // option 是設定 NioServerSocketChannel 的參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption 是設定 NioSocketChannel 的參數
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // 綁定 TCP Port
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 這邊開始會等待所有 server socket 關閉
            // 但這個例子不會發生這種狀況
            // 如果要以正常方式關閉 server,可呼叫以下 method
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new EchoServer(port).run();
    }
}

EchoServerHandler.java

package netty.echo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Handles a server-side channel.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // ChannelHandlerContext ctx 可驅動 I/O
        // 這邊將收到的 msg 透過 ctx 寫回 channel,但不需要 release msg
        // 因為 netty 會在寫入時,自動 release msg
        ctx.write(msg);
        ctx.flush();
        // ctx.write 不是直接寫到網路上,而是先放到 buffer,然後再 flush
    }

//    // 可改用 writeAndFlush
//    @Override
//    public void channelRead(ChannelHandlerContext ctx, Object msg) {
//        ctx.writeAndFlush(msg);
//    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

測試

一樣用 nc 去測試

nc -nvv 127.0.0.1 8080

TIME

TIME protocol 的 server 會在 client 連線後發送一個 32 bits 整數,然後就直接關閉連線。

因為 server 必須忽略 client 發送的所有資料,所以不能用 channelRead() 要改用 channelActive()

TimeServer.java

package netty.time;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup 是 multithread event loop,處理 IO operation
        // 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
        // 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 helper class,可設定 server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel 接受 incoming connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    // 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
                    // option 是設定 NioServerSocketChannel 的參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption 是設定 NioSocketChannel 的參數
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // 綁定 TCP Port
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 這邊開始會等待所有 server socket 關閉
            // 但這個例子不會發生這種狀況
            // 如果要以正常方式關閉 server,可呼叫以下 method
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new TimeServer(port).run();
    }
}

TimeServerHandler.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        // channelActive 會在 connection 建立後,就被呼叫
        // 先取得一塊 32 bits integer (4 bytes) 的 ByteBuf,然後將現在的時間填進去
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        // 這邊不需要跟 NIO 的 ByteBuffer 一樣,在發送前,呼叫 java.nio.ByteBuffer.flip()
        // 因為 ByteBuf 已經自動處理了 read/write,會使用兩種不同的 pointer (index)
        final ChannelFuture f = ctx.writeAndFlush(time);
        // ChannelHandlerContext.write() 跟 writeAndFlush 會回傳 ChannelFuture
        // 代表 Netty 的 IO operation 是非同步的
        // 必須用非同步的方式,確認 future 已經完成,才能將 context 關閉
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        });  // 這邊可以簡化為這種寫法
        // f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeClient.java

package netty.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        if (args.length > 0) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // client 要使用 Bootstrap,跟 ServerBootstrap 不同
            Bootstrap b = new Bootstrap();
            // client 只需要一個 worker EventLoopGroup
            b.group(workerGroup);
            // client 要改用 NioSocketChannel,跟 NioServerSocketChannel 不同
            b.channel(NioSocketChannel.class);
            // 不要使用 childOption
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            // 呼叫 connect 而不是 bind
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

TimeClientHandler.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 收到的資料是 ByteBuf
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

References

Netty.docs: User guide for 4.x

netty/example/src/main/java/io/netty/example at 4.1 · netty/netty · GitHub

2024/12/23

Mockito: mock static final method

以下記錄 Mockito 在處理 static method,以及 final class/method 的方法。

準備

先準備一個要被 mock 的類別

import java.util.UUID;

public class DataUtils {
    public static String getUuid() {
//        return UUID.randomUUID().toString();
        return "UUID";
    }

    public static String concat(String a, String b) {
        return a+":"+b;
    }

    public final int finalMethod() {
        return 1;
    }
    public String multiply(String a, int times) {
        return a.repeat( times );
    }
}

mock static method

  • 沒有參數的 static method

Mockito.mockStatic(Class classToMock)

注意 MockedStatic 的 scope 問題,必須要放在 try-with-resources 裡面

    @Test
    // mock 沒有參數的 static method 的方法
    public void mock_test1() {
        assertEquals("UUID", DataUtils.getUuid());

        // Mockito.mockStatic(Class<T> classToMock)
        // 注意 MockedStatic 的 scope 問題,必須要放在 try-with-resources 裡面
        // MockedStatic 必須要在 block 結束時,自動回收
        try (MockedStatic<DataUtils> utils = mockStatic(DataUtils.class)) {
            utils.when(DataUtils::getUuid).thenReturn("Custom UUID");
            assertEquals("Custom UUID", DataUtils.getUuid());
        }

        assertEquals("UUID", DataUtils.getUuid());
    }
  • mock 有參數的 static method
    @Test
    // mock 有參數的 static method
    public void mock_test2() {
        assertEquals("A:B", DataUtils.concat("A", "B"));

        // Mockito.mockStatic(Class<T> classToMock)
        try (MockedStatic<DataUtils> utils = mockStatic(DataUtils.class)) {
            utils.when(() -> DataUtils.concat("A", "B"))
                    .thenReturn( "A-B" );
            assertEquals("A-B", DataUtils.concat("A", "B"));
        }

        assertEquals("A:B", DataUtils.concat("A", "B"));
    }
  • MockitoException

static mocking is already registered in the current thread 的 exception

    @Test
    public void mock_test3() {
        // 將 utils 再一次 建立一個 mockStatic 物件時,會發生 exception
//        org.mockito.exceptions.base.MockitoException:
//        For mock.DataUtils, static mocking is already registered in the current thread
//        To create a new mock, the existing static mock registration must be deregistered
        assertThrows(MockitoException.class, () -> {
            MockedStatic<DataUtils> utils = mockStatic(DataUtils.class);
            utils = mockStatic(DataUtils.class);
        });
    }
  • JUnit @Before @After

利用 JUnit 的 @Before @After,在每一次執行 @Test 的前後,處理 MockedStatic

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mockStatic;

public class DataUtilsMockitoTest2 {
    private MockedStatic<DataUtils> mockStatic;

    // @Before 會在每一次 @Test 之前被執行
    @Before
    public void before() {
        // Registering a static mock for UserService before each test
        mockStatic = mockStatic(DataUtils.class);
    }

    // @After 會在每一次 @Test 之後被執行
    @After
    public void after() {
        // Closing the mockStatic after each test
        mockStatic.close();
    }

    @Test
    public void mock_test4_1() {
        assertTrue(Mockito.mockingDetails(DataUtils.class).isMock());
        mockStatic.when(() -> DataUtils.concat("A", "B"))
                .thenReturn( "A-B" );
        assertEquals("A-B", DataUtils.concat("A", "B"));
    }

    @Test
    public void mock_test4_2() {
        assertTrue(Mockito.mockingDetails(DataUtils.class).isMock());
        mockStatic.when(() -> DataUtils.concat("A", "B"))
                .thenReturn( "A*B" );
        assertEquals("A*B", DataUtils.concat("A", "B"));
    }
}

mock final class/method

先準備一個 final class

public final class DataUtilsFinal extends DataUtils {
    public static String concat(String a, String b) {
        return a+":"+b;
    }

    public String multiply(String a, int times) {
        return a.repeat( times-1 );
    }
}
  • final method
    @Test
    public void mock_test1() {
        // 直接用 mock 就可以使用 finalMethod
        DataUtils dataUtils = new DataUtils();
        DataUtils dataUtilsMock = mock(DataUtils.class);
        when(dataUtilsMock.finalMethod()).thenReturn(0);

        assertEquals(1, dataUtils.finalMethod());
        assertEquals(0, dataUtilsMock.finalMethod());
    }
  • final class
    @Test
    public void mock_test2() {
        // final class
        DataUtilsFinal mock = mock(DataUtilsFinal.class);
        when(mock.multiply("a", 2)).thenReturn("a");

        assertEquals("a", mock.multiply("a", 2));
    }

References

Mocking Static Methods With Mockito | Baeldung

Mock Final Classes and Methods with Mockito | Baeldung

2024/12/9

Mockito 簡介

在做 Java 專案測試時,常見的方法是使用 JUnit 測試框架。但在實際專案測試時,最常遇到的問題是,準備測試資料。專案程式通常都會連接資料庫,然後透過商業邏輯或一些運算,再將結果存回資料庫。

這在測試時就會遇到很多問題,例如測試案例需要某些特殊的使用者在某個特別的狀態,例如未付款的狀態,然後程式經過了某些付款程序,讓資料變成已付款狀態,又存回資料庫,更新了這個使用者的狀態。這裡遇到的問題是,沒辦法一直產生未付款狀態的使用者,這就導致每一次要測試付款程序時,就要重新準備一次測試資料。

如果該資料跟其他的測試有相依性,又讓這個問題更複雜了。

在使用 JUnit 時,常常會遇到測試的對象裡面因為包含了其他類別的物件,需要先準備/建立這些前置的物件,才能真正地去測試現在想要測試的對象。例如專案中的商業邏輯程式需要使用到資料庫的連線,就必須確實準備一個資料庫,並建立該 DAO 物件,然後才能進行測試。

這跟剛剛提到的測試資料是類似的問題,就是外部資料相依性問題。

假設目前的類別關係如下

flowchart LR
    A --> B --> D & E
    A --> C

因為 A 相依於 B 與 C,故這時候需要製作 B 與 C 的 mock 物件,用來作 A 的測試

flowchart LR
    A --> B[mock of B]
    A --> C[mock of C]

# TDD/BDD and Test Double 這篇文章提到 unit test 必須要

  • 是最小的測試單位

  • 一個案例只能測試一個方法

  • 測試案例之間沒有相依性

  • 沒有外部相依性

  • 不具備邏輯

Mockito 就是用來輔助 JUnit,製作 mock 物件,也就是達成上面所說的沒有外部相依性,也可以讓測試案例之間沒有相依性。Mock 模擬對象 是一種模擬真實對象行為的假的物件,這個假物件可以用來測試其他程式的行為。

單元測試之 mock/stub/spy/fake ? 這邊提到了幾個名詞的差異

  • mock

    模擬的假物件,可讓程式使用 mock 物件驗證商業邏輯或是互動是否正確。mock 有可能會造成測試失敗

  • stub

    也是假物件,但有點替身的意思,跟 mock 類似,是取代真實物件的假物件,使用時,該替身不會造成測試失敗

  • fake

    完全不做事情的假物件,測試僅僅會經過這些物件,但不會做任何驗證,不會造成測試失敗,也就是 stub 的意思

  • dummy

    空物件,只用來填補缺少的參數,或是其他已經測試完成的物件,測試僅僅會經過這些物件,但不會做任何驗證,不會造成測試失敗,也就是 stub 的意思

  • spy

    通常 mock 是製作整個假物件,而 spy 只會偽造類別裡面的某些 method,如果針對該偽造的方法有做驗證測試,就將 spy 視為 mock。如果沒有驗證,那就視為 stub

Mockito

使用 mockito 基本需要了解這三個部分

  • mock

    static method,用來產生 mock 物件

  • when/then

    為剛剛用 mock 產生的物件,自訂其行為,也就是自訂某些 method 回傳的資料

  • verify

    用來檢查 mock 物件的使用狀況

測試準備

測試前,先製作一個要被 mock 的類別

public class DataDAO {
    public String getDataById(String id) {
        return id;
    }

    public int getDataSize() {
        return 0;
    }

    public boolean add(String data) {
        return true;
    }

    public void clear() {
    }
}

引用 libary

        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-core -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>5.12.0</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-inline -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-inline</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
            <version>5.9.1</version>
            <scope>compile</scope>
        </dependency>

mock

  • 用類別定義產生 mock object

    public static <T> T mock(Class<T> classToMock)

  • 用類別名稱產生 mock object 後,指定這個 mock object 的名稱

    public static <T> T mock(Class<T> classToMock, String name)

  • 產生 mock object,自訂 Answer

    public static <T> T mock(Class<T> classToMock, Answer defaultAnswer)

  • 產生 MockSettings 自訂 Answer

    private String randomId() {
        return UUID.randomUUID().toString();
    }
    @Test
    public void mock_test1() {
        // public static <T> T mock(Class<T> classToMock)
        // 用類別定義產生 mock object

        // 利用 mock 產生 DataDAO 的 mock object
        DataDAO dataDAOmock = mock(DataDAO.class);
        // 當透過這個 mock object 呼叫 add method 時,永遠回傳 false
        when(dataDAOmock.add(anyString())).thenReturn(false);

        boolean added = dataDAOmock.add( randomId() );

        // verify 可檢查是否有呼叫 add method
        verify(dataDAOmock).add(anyString());
        // 以 JUnit 檢查 add method 的 return value
        assertFalse(added);
    }

    @Test
    public void mock_test2_name() {
        // public static <T> T mock(Class<T> classToMock, String name)
        // 用類別名稱產生 mock object 後,指定這個 mock object 的名稱

        // 利用 mock 產生 DataDAO 的 mock object
        DataDAO dataDAOmock = mock(DataDAO.class, "test2DataDAOMock");
        // 當透過這個 mock object 呼叫 add method 時,永遠回傳 false
        when(dataDAOmock.add(anyString())).thenReturn(false);

        boolean added = dataDAOmock.add( randomId() );

        TooFewActualInvocations exception = assertThrows(TooFewActualInvocations.class, () -> {
            verify(dataDAOmock, times(2)).add(anyString());
        });
//        exception.printStackTrace();
//        rg.mockito.exceptions.verification.TooFewActualInvocations:
//        test2DataDAOMock.add(<any string>);
//        Wanted 2 times:
//-> at mock.DataDAO.add(DataDAO.java:13)
//        But was 1 time:
//-> at mock.DataDAOMockitoTest.test2(DataDAOMockitoTest.java:41)
//        ......
        assertTrue(exception.getMessage().contains("test2DataDAOMock.add"));
    }

    static class CustomAnswer implements Answer<Boolean> {
        @Override
        public Boolean answer(InvocationOnMock invocation) throws Throwable {
            return false;
        }
    }
    @Test
    public void mock_test3_answer() {
        // public static <T> T mock(Class<T> classToMock, Answer defaultAnswer)
        // 產生 mock object,自訂 Answer
        DataDAO dataDAOmock = mock(DataDAO.class, new CustomAnswer());

        boolean added = dataDAOmock.add( randomId() );

        verify(dataDAOmock).add(anyString());
        assertFalse(added);
    }

    @Test
    public void mock_test4_MockSettings() {
        // 產生 MockSettings 自訂 Answer
        MockSettings customSettings = withSettings().defaultAnswer(new CustomAnswer());

        DataDAO dataDAOmock = mock(DataDAO.class, customSettings);
        boolean added = dataDAOmock.add( randomId() );
        verify(dataDAOmock).add(anyString());
        assertFalse(added);
    }

when/then

  • 當透過這個 mock object 呼叫 method 時,永遠回傳某個值

  • 用 doReturn 方式設定 return 的結果

  • 設定 method 呼叫時,會 throw Excpetion

  • 設定 void return 的 method,會 throw Exception

  • 設定 method 多次呼叫時,有不同的 return 結果

  • 設定 spy 的行為,spy 是對一部分的 method 做 mock

  • 設定呼叫 mock 的某個 method 要呼叫真實的物件的 method

  • 自訂 Answer

    @Test
    public void when_test1() {
        // 利用 mock 產生 DataDAO 的 mock object
        DataDAO dataDAOmock = mock(DataDAO.class);
        // 當透過這個 mock object 呼叫 add method 時,永遠回傳 false
        when(dataDAOmock.add(anyString())).thenReturn(false);

        boolean added = dataDAOmock.add( randomId() );
        assertFalse(added);

        // 用另一種方式設定 return 的結果
        doReturn(false).when(dataDAOmock).add(anyString());
        boolean added2 = dataDAOmock.add( randomId() );
        assertFalse(added2);

        // 設定 method 呼叫時,會 throw Excpetion
        when(dataDAOmock.add(anyString())).thenThrow(IllegalStateException.class);
        IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
            dataDAOmock.add( randomId() );
        });

        // 設定 void return 的 method,會 throw Exception
        doThrow(NullPointerException.class).when(dataDAOmock).clear();
        assertThrows(NullPointerException.class, () -> dataDAOmock.clear());

        // 設定 method 多次呼叫時,有不同的 return 結果
        DataDAO dataDAOmock2 = mock(DataDAO.class);
        when(dataDAOmock2.add(anyString()))
                .thenReturn(false)
                .thenThrow(IllegalStateException.class);
        assertThrows(IllegalStateException.class, () -> {
            dataDAOmock2.add( randomId() );
            dataDAOmock2.add( randomId() );
        });

        // 設定 spy 的行為
        // mock 是接管所有物件的 method,但 spy 則是對一部分的 method 做 mock
        DataDAO dataDAO = new DataDAO();
        DataDAO spy = spy(dataDAO);
        doThrow(NullPointerException.class).when(spy).getDataSize();
        assertThrows(NullPointerException.class, () -> spy.getDataSize());
        assertEquals("test", spy.getDataById("test"));

        // 設定呼叫 mock 的某個 method 要呼叫真實的物件的 method
        DataDAO dataDAOmock3 = mock(DataDAO.class);
        when(dataDAOmock3.getDataSize()).thenCallRealMethod();
        assertEquals( 0, dataDAOmock3.getDataSize());

        // 自訂 Answer
        doAnswer(invocation -> "Always the same").when(dataDAOmock3).getDataById(anyString());
        String data = dataDAOmock3.getDataById("1");
        assertEquals("Always the same", data);
    }

verify

  • 檢查是否有呼叫某個 method

  • 檢查呼叫某個 method 的次數

  • 檢查是否沒有使用 mock object

  • 檢查是不是沒有呼叫某個 method

  • 檢查是不是沒有非預期的操作互動 verifyNoMoreInteractions

  • 檢查 呼叫 method 操作的順序

  • 檢查是不是沒有呼叫某個 method

  • 檢查呼叫 method 的次數,至少 或是 最多 幾次

  • 檢查是否有使用某個參數呼叫 method

  • 檢查是否有使用任意參數呼叫 method

  • 利用 argument capture 檢查

    @Test
    public void verify_test1() {
        DataDAO dataDAOmock = mock(DataDAO.class);
        dataDAOmock.getDataSize();
        // 檢查是否有呼叫某個 method
        verify(dataDAOmock).getDataSize();

        // 檢查呼叫某個 method 的次數
        verify(dataDAOmock, times(1)).getDataSize();

        DataDAO dataDAOmock2 = mock(DataDAO.class);
        // 檢查是否沒有使用 mock object
        verifyNoInteractions(dataDAOmock2);
        // 檢查是不是沒有呼叫某個 method
        verify(dataDAOmock2, times(0)).getDataSize();

        // 檢查是不是沒有非預期的操作互動 verifyNoMoreInteractions
        DataDAO dataDAOmock3 = mock(DataDAO.class);
        dataDAOmock3.getDataSize();
        dataDAOmock3.clear();
        verify(dataDAOmock3).getDataSize();
        assertThrows(NoInteractionsWanted.class, () -> verifyNoMoreInteractions(dataDAOmock3));

        // 檢查 呼叫 method 操作的順序
        DataDAO dataDAOmock4 = mock(DataDAO.class);
        dataDAOmock4.getDataSize();
        dataDAOmock4.add("a parameter");
        dataDAOmock4.clear();

        InOrder inOrder = Mockito.inOrder(dataDAOmock4);
        inOrder.verify(dataDAOmock4).getDataSize();
        inOrder.verify(dataDAOmock4).add("a parameter");
        inOrder.verify(dataDAOmock4).clear();

        // 檢查是不是沒有呼叫某個 method
        verify(dataDAOmock4, never()).getDataById("");

        // 檢查呼叫 method 的次數,至少 或是 最多 幾次
        DataDAO dataDAOmock5 = mock(DataDAO.class);
        dataDAOmock5.clear();
        dataDAOmock5.clear();
        dataDAOmock5.clear();
        verify(dataDAOmock5, atLeast(1)).clear();
        verify(dataDAOmock5, atMost(5)).clear();

        // 檢查是否有使用某個參數呼叫 method
        DataDAO dataDAOmock6 = mock(DataDAO.class);
        dataDAOmock6.getDataById("test1");
        verify(dataDAOmock6).getDataById("test1");
        assertThrows(WantedButNotInvoked.class, () -> verify(dataDAOmock6).getDataById("test"));
        // 檢查是否有使用任意參數呼叫 method
        verify(dataDAOmock6).getDataById(anyString());

        // 利用 argument capture 檢查
        DataDAO dataDAOmock7 = mock(DataDAO.class);
        dataDAOmock7.getDataById("someElement");

        ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
        verify(dataDAOmock7).getDataById(argumentCaptor.capture());

        String capturedArgument = argumentCaptor.getValue();
        assertEquals( "someElement", capturedArgument);
    }

Note

在執行測試時,有遇到這樣的錯誤資訊

OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended

原因是 VM warning: Sharing is only supported for boot loader classes · Issue #3111 · mockito/mockito · GitHub

CDS: Class Data Sharing 將一組類別預處理為共享存檔文件,然後可以在運行時進行內存映射以減少啓動時間。主要目的是減少啓動時間。應用程式對於它使用的核心類別的數量越小,節省的啓動時間部分就越大。自 JDK 12 開始,就預先打包了一份預設的 CDS 檔案。

解決方式是加上 JVM 執行參數

-Xshare:off

Reference

Mockito - 維基百科,自由的百科全書

Mockito - mockito-core 5.12.0 javadoc

# SpringBoot - 單元測試工具 Mockito

Mockito 简明教程| waylau.com

Mockito's Mock Methods | Baeldung

Mockito When/Then Cookbook | Baeldung

Mockito Verify Cookbook | Baeldung

2024/12/2

Retrofit RxJava Adapter

Retrofit 的 RxJava Adapter 用於將 Retrofit 的服務呼叫結果轉換為 RxJava 的 Observable、Single、Maybe 或 Completable 等反應式類別。在進行網路請求時,能充分利用 RxJava 的功能,如流控制、非同步處理、錯誤處理等。

https://reqres.in/ 有一個 List User 的 GET method API,以下以這個 API 實作

pom.xml

廚了 Retrofit2 集 RxJava3 以外,增加 adapter-rxjava3

        <!--retrofit-->
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>retrofit</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>converter-gson</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.squareup.retrofit2/adapter-rxjava3 -->
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>adapter-rxjava3</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>

Retrofit

透過網頁服務的 API 定義 service 及發送,回傳的資料

UserData.java

public class UserData {
    private long id;
    private String first_name;
    private String last_name;
    private String email;
// getter, setter, toString
}

UserList.java

public class UserList {
    private int page;
    private int per_page;
    private int total;
    private int total_pages;

    private List<UserData> data;
// getter, setter, toString
}

UserListService.java

注意,這邊的回傳結果改為 Observable<UserList>

public interface UserListService {
    @GET("/api/users")
    Observable<UserList> listUser(@Query("page") int pageno);
}

Retrofit RxJava 測試

RetrofitRxJavaTest.java

public class RetrofitRxJavaTest {
    public static void sync() {
        int maxIdleConnections = 10;
        int keepAliveDurationMills = 1000;
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(maxIdleConnections, keepAliveDurationMills, TimeUnit.MILLISECONDS));
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
                .client(httpClient.build())
                .build();

        UserListService service = retrofit.create(UserListService.class);

        Observable<UserList> observable = service.listUser(2);
        observable.subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.single())
//                    .subscribe(
//                            System.out::println
//                    );
                    .subscribe(new DefaultObserver<UserList>() {
                        @Override
                        public void onNext(UserList response) {
                            System.out.println(response);
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {

                        }

                        @Override
                        public void onComplete() {

                        }
                    });
    }

    public static void main(String[] args) {
        System.out.println(new Date());
        sync();

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

如果程式放在 Android APP,則可以改為在主執行緒,也就是主畫面的執行緒上觀察結果

service.listUser(2).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())

2024/11/25

How to use RxJava

測試如何使用 RxJava

maven pom.xml

加上 rxjava3 的 library

        <!-- rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>

Observable

Observable 可從

這邊是產生 Observable,然後 observer 用 subscribe 註冊 Observable sequence。該 sequence 會發送 items 到 observer,一次發送一個

    public static void test1() {
        // just 會將 item 轉成 Observable
        Observable<String> observerable = Observable.just("Hello", "World");
        // subscribe 註冊 Observable,列印每一個 item
        observerable.subscribe(System.out::println);
    }

observer 有三個 interface

  • OnNext

    當 observerable 發布一個新的 event (item),observer 收到該 item 然後能做一些操作

  • OnComplete

    當 sequence of events 完成時,代表不會再呼叫 OnNext

  • OnError

    如果在 RxJava framework 或是 OnNext 裡面發生 error

    public static void test2() {
        String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
        Observable<String> observable = Observable.fromArray(letters);

        // Java 的 String 是 immutable class,且是 pass by reference
        // 所以這邊如果要儲存結果,不能直接用 String 物件,改用 array 記錄一直改變的 String reference
        String[] result = {""};
        observable.subscribe(
                i -> result[0] += i,  //OnNext
                Throwable::printStackTrace, //OnError
                () -> result[0] += "_Completed" //OnCompleted
        );
        System.out.println(" result[0]="+ result[0]);

        // 也可以用 StringBuilder,這樣就可以一直改變 string 內容
        StringBuilder sb = new StringBuilder();
        observable.subscribe(
                i -> sb.append(i),  //OnNext
                Throwable::printStackTrace, //OnError
                () -> sb.append("_Completed") //OnCompleted
        );
        System.out.println(" sb="+ sb.toString());
        // 結果 abcdefg_Completed

        // 測試 OnError
        // 當處理到 "a",因為無法轉成 integer 會發生 error
        // error 處理為 列印 _err 文字
        String[] letters2 = {"1", "a", "2"};
        Observable<String> observable2 = Observable.fromArray(letters2);
        StringBuilder sb2 = new StringBuilder();
        observable2.subscribe(
                i -> sb2.append(Integer.parseInt(i)),  //OnNext
//                Throwable::printStackTrace, //OnError
                err -> sb2.append("_err"),
                () -> sb2.append("_Completed") //OnCompleted
        );
        System.out.println(" sb2="+ sb2.toString());
        // 結果 1_err
    }

operator

map, flatMap

map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many

    public static void operator_test_map_flatmap() {
        // map: 對每一個 item 執行某個 operation,會回傳任意物件
        String[] letters = {"a", "b", "c", "d"};
        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .map(String::toUpperCase)
                .subscribe(letter -> sb.append(letter));
        System.out.println("operator_test_1 sb="+ sb.toString());

        // flatMap 會回傳結果的 Observable
        // flatMap 不保證 sequence 順序
        // map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many
        Observable<String> observable = Observable.just("Hello", "World");
        observable
            .flatMap(item -> Observable.fromArray(item.split("")))
            .subscribe(System.out::println);
    }

filter

filter 會過濾 item,符合條件的才會通過

    public static void operator_test_filter() {
        // filter 會過濾 item,符合條件的才會通過
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        observable.filter(item -> item % 2 == 0)
                .subscribe(System.out::println);
        // 結果是
        // 2
        // 4
    }

scan

scan 會將每次計算的結果,發送給下一個 步驟

    public static void operator_test_scan() {

        // scan 會將每次計算的結果,發送給下一個 步驟
        String[] letters = {"a", "b", "c"};

        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .scan(new StringBuilder(), StringBuilder::append)
                .subscribe(total -> sb.append(total.toString()+" ") );

        System.out.println("operator_test_scan sb="+ sb.toString());
        // a ab abc

        // 累加 1~5
        StringBuilder sb2 = new StringBuilder();
        Observable.range(1,5)
                .scan( (Integer res1, Integer res2) -> res1+res2  )
                .subscribe(total -> sb2.append(total.toString()+" ") );
        System.out.println("operator_test_scan sb="+ sb2.toString());
        // 1 3 6 10 15
    }

groupBy

將 event 分類

ex: 將數字依照單雙數分類

    public static void operator_test_groupby() {
        String[] nums = {"", ""};
        Observable.range(1,5)
                .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
                .subscribe(group ->
                        group.subscribe((number) -> {
                            if (group.getKey().toString().equals("EVEN")) {
                                nums[0] += number+" ";
                            } else {
                                nums[1] += number+" ";
                            }
                        })
                );
        System.out.println("operator_test_groupby nums="+ Arrays.deepToString(nums));
        // 結果  operator_test_groupby nums=[2 4 , 1 3 5 ]
    }

conditional

defaultIfEmpty

first

takeWhile

    public static void operator_test_conditional() {
        // empty() 可產生一個 空的 Observable
        // defaultIfEmpty() 在 observable 空的時候有作用
        Observable.empty()
                .defaultIfEmpty("Observable is empty")
                .subscribe(System.out::println);

        // first  發出第一個來源資料,如果沒有則發送預設值。
        String[] letters = {"a", "b", "c"};
        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .defaultIfEmpty("Observable is empty")
                .first("default first")
                .subscribe( res -> sb.append(res) );
        System.out.println("operator_test_conditional sb="+sb);
        // operator_test_conditional sb=a

        StringBuilder sb2 = new StringBuilder();
        Observable.empty()
                .defaultIfEmpty("Observable is empty")
                .first("default first")
                .subscribe( res -> sb2.append(res) );
        System.out.println("operator_test_conditional empty sb2="+sb2);
        // operator_test_conditional empty sb2=Observable is empty

        // takeWhile 只在滿足條件時,取用此 item,否則就丟棄該 item
        int[] sum = {0};
        Observable.range(1,10)
                .takeWhile(i -> i < 5)
                .subscribe(s -> sum[0] += s);
        System.out.println("operator_test_conditional takeWhile sum="+sum[0]);
        // operator_test_conditional takeWhile sum=10
    }

combining

    public static void operator_test_combining() {
        // merge: 將多個 Observables 合併成一個
        Observable<String> observable1 = Observable.just("Hello");
        Observable<String> observable2 = Observable.just("World");
        Observable.merge(observable1, observable2)
                .subscribe(System.out::println);
        //Hello
        //World

        // zip: 以某個方式,合併多個 Observables
        Observable.zip(observable1, observable2, (item1, item2) -> item1 + " " + item2)
                .subscribe(System.out::println);
        //Hello World

        // startWith: 將指定的 Observable 合併到另一個 的開頭
        Observable<String> names = Observable.just("Project", "zero");
        Observable<String> otherNames = Observable.just("Git", "Code");
        names.startWith(otherNames).subscribe(System.out::println);
        //Git
        //Code
        //Project
        //zero
    }

Operators Category

ReactiveX - Operators 這邊可查詢到所有 ReactiveX 的 operators 定義

Alphabetical List of Observable Operators · ReactiveX/RxJava Wiki · GitHub 這邊是 RxJava 的 operators

依照 ReactiveX 的分類有

  • Creating

  • Transforming

  • Filtering

  • Combining

  • Observable Utility

  • Conditional and Boolean

  • Mathematical and Aggregate

  • Backpressure

  • Connectable Observable

  • Operators to Convert

Scheduler

subscribeOn: 指定 Observable 的執行緒,只能寫一次,影響 Observable 的執行。會影響從訂閱開始到第一個 observeOn 的所有操作

observeOn: 在哪個執行緒上觀察 Observable 的結果,可以寫很多次切換執行緒。

    public static void test_scheduler() {
        printCurrentThread("start");
        Observable<Object> observable = Observable.just("Hello", "World");
        observable
                .map(s -> {
                    printCurrentThread("first map: "+s);
                    return s;
                })
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.io())
                .map(s -> {
                    printCurrentThread("second map: "+s);
                    return s;
                })
                .observeOn(Schedulers.single())
                .subscribe(
                    s -> {
                        printCurrentThread("subscribe: "+s);
                    }
                );

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
//        Thread: main, start
//        Thread: RxComputationThreadPool-1, first map: Hello
//        Thread: RxCachedThreadScheduler-1, second map: Hello
//        Thread: RxComputationThreadPool-1, first map: World
//        Thread: RxCachedThreadScheduler-1, second map: World
//        Thread: RxSingleScheduler-1, subscribe: Hello
//        Thread: RxSingleScheduler-1, subscribe: World
    }

    private static void printCurrentThread(String message) {
        System.out.format("Thread: %s, %s%n", Thread.currentThread().getName(), message);
    }

執行結果

Thread: main, start
Thread: RxComputationThreadPool-1, first map: Hello
Thread: RxCachedThreadScheduler-1, second map: Hello
Thread: RxComputationThreadPool-1, first map: World
Thread: RxCachedThreadScheduler-1, second map: World
Thread: RxSingleScheduler-1, subscribe: Hello
Thread: RxSingleScheduler-1, subscribe: World

Error Handling

observer 有三個 interface: OnNext, OnError, OnComplete

    public static void test_error() {
        Observable<String> observable = Observable.just("Hello", "World")
                .map(item -> {
                    if (item.equals("World")) {
                        throw new RuntimeException("Error occurred");
                    }
                    return item;
                });

        observable.subscribe(
                System.out::println,
                error -> System.err.println("Error: " + error.getMessage()),
                () -> System.out.println("Completed!")
        );
        //Hello
        //Error: Error occurred
    }

References

Introduction to RxJava | Baeldung

Guide to RxJava in Java

RxJava3 用法 - petercao - 博客园