2025/2/17

Queue in Java

Queue is an interface in the Collection framework of the java.util package. It defines a first-in-first-out (FIFO) queue. In addition to the basic Collection operations, Queue provides its own insert, remove, and examine methods.

           throws exception    returns false/null
insert     add(e)              offer(e)
remove     remove()            poll()
examine    element()           peek()

    @Test
    public void queue_test1() {
        Queue<String> queue = new LinkedList<>();
        queue.add("one");
        queue.add("two");
        queue.add("three");
        assertEquals("[one, two, three]", queue.toString());

        queue.remove("two");
        assertEquals("[one, three]", queue.toString());

        String element = queue.element();
        assertEquals("one", element);
        assertEquals("[one, three]", queue.toString());

        // To empty the queue
        queue.clear();
        queue.offer("one");
        queue.offer("two");
        queue.offer("three");
        assertEquals("[one, two, three]", queue.toString());

        // poll retrieves and removes the first element of the queue
        String pollElement = queue.poll();
        assertEquals("one", pollElement);
        assertEquals("[two, three]", queue.toString());

        // peek retrieves the first element of the queue without removing it
        String peekElement = queue.peek();
        assertEquals("two", peekElement);
        assertEquals("[two, three]", queue.toString());
    }

    @Test
    public void queue_test2() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        queue.add("one");
        queue.add("two");
        // add throws an exception when the insert exceeds the queue capacity
        IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
            queue.add("three");
        });

        // offer does not throw an exception when the insert exceeds the queue capacity
        // it only returns a true/false flag that tells whether the insert succeeded
        queue.clear();
        queue.offer("one");
        assertTrue( queue.offer("two") );
        assertFalse( queue.offer("three") );
        assertEquals("[one, two]", queue.toString());

        queue.clear();
        // remove and element throw an exception when the queue is empty
        NoSuchElementException exception2 = assertThrows(NoSuchElementException.class, () -> {
            queue.remove();
        });
        NoSuchElementException exception3 = assertThrows(NoSuchElementException.class, () -> {
            queue.element();
        });
        // poll and peek return null when the queue is empty
        assertNull(queue.poll());
        assertNull(queue.peek());
    }

sub interfaces

Queue has three main sub-interfaces: BlockingQueue, TransferQueue, and Deque

Blocking Queue

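Adds methods that make threads wait on the queue. For example, a retrieval can block until the queue contains an element, and an insertion can block until the queue has space.

Implementations of BlockingQueue include LinkedBlockingQueue, SynchronousQueue, and ArrayBlockingQueue

Besides the existing add() and offer(), insertion also has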

  • put()

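    Inserts an element, waiting until the queue has space

  • offer(E e, long timeout, TimeUnit unit)

    Inserts an element, waiting until the queue has space, but gives up once the timeout elapses

For removal, besides the existing remove() and poll(), there are also

  • take()

    Retrieves and removes the first element. When the queue is empty, it blocks the thread until an element is available

  • poll(long timeout, TimeUnit unit)

    Retrieves and removes the first element. When the queue is empty, it blocks the thread until an element is available, but gives up once the timeout elapses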
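
The timed variants are easy to try in a single thread. The following is a minimal sketch, not part of the original tests: the class name TimedQueueDemo, the capacity of 1, and the 100 ms timeout are all chosen just for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names and the timeout value are assumptions.
class TimedQueueDemo {

    // Offers into a full queue: waits up to 100 ms, then gives up and returns false.
    static boolean offerToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("one"); // the queue is now full
            return queue.offer("two", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Polls an empty queue: waits up to 100 ms, then gives up and returns null.
    static String pollFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue());   // false
        System.out.println(pollFromEmptyQueue()); // null
    }
}
```

With put() or take() in place of the timed calls, both methods would block forever, because no other thread ever frees space or adds an element.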

Producer.java

import java.util.Random;
import java.util.concurrent.BlockingQueue;

class Producer extends Thread {
    protected BlockingQueue<Integer> blockingQueue;
    private int limit;

    Producer(BlockingQueue<Integer> blockingQueue, int limit) {
        this.blockingQueue = blockingQueue;
        this.limit = limit;
    }

    public void run() {
        Random random = new Random();
        for(int i = 1; i <= limit; i++) {
            try {
                // randomly put 1 or 2 integers
                int randomProducer = random.nextInt(2);
//                System.out.println("randomProducer=" + randomProducer);
                for(int j = 0; j <= randomProducer; j++) {
                    System.out.println("Producer put " + (i+j));
                    blockingQueue.put((i+j)); // to produce data
                }
                i = i+randomProducer;
                // produce data with an interval of 0.5 sec
                Thread.sleep(500);
            } catch (InterruptedException exp) {
                System.out.println("An interruption occurred at Producer");
            }
        }
    }
}

Consumer.java

import java.util.concurrent.BlockingQueue;

class Consumer extends Thread {
    protected BlockingQueue<Integer> blockingQueue;
    Consumer(BlockingQueue<Integer> blockingQueue) { // constructor
        this.blockingQueue = blockingQueue;
    }
    public void run() { // overriding run method
        try {
            while (true) {
                Integer elem = blockingQueue.take(); // to consume data
                System.out.println("Consumer take " + elem);
            }
        }
        // to handle exception
        catch (InterruptedException exp) {
            System.out.println("An interruption occurred at Consumer");
        }
    }
}

CPTest.java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CPTest {
    public static void main(String[] args) throws InterruptedException {
        // create an object of BlockingQueue
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5);

        // passing object of BlockingQueue as arguments
        Producer threadProd = new Producer(blockingQueue, 20);
        Consumer threadCon = new Consumer(blockingQueue);

        // to start the process
        threadProd.start();
        threadCon.start();

        // to exit the process after 2 sec
        Thread.sleep(2000);
        System.exit(0);
    }
}

Output

Producer put 1
Producer put 2
Consumer take 1
Consumer take 2
Producer put 3
Consumer take 3
Producer put 4
Consumer take 4
Producer put 5
Consumer take 5

Transfer Queue

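TransferQueue extends the BlockingQueue interface and applies the producer-consumer pattern. It lets you control how fast data flows from the producer to the consumer.

Implementations of TransferQueue include LinkedTransferQueue.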
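
Before the full producer/consumer example, the hand-off behaviour can be seen in isolation. This is a small sketch with a hypothetical class name (TransferDemo); it relies only on tryTransfer(e), transfer(e), and take() from java.util.concurrent.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical demo class, written only to illustrate the hand-off semantics.
class TransferDemo {

    // With no consumer waiting, tryTransfer(e) returns false immediately
    // and the element is not enqueued.
    static boolean tryTransferWithoutConsumer() {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        return queue.tryTransfer(1);
    }

    // transfer(e) blocks the producer until a consumer has received the element.
    static int handOff(int value) {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        int[] received = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.transfer(value); // returns only after the consumer took the value
            consumer.join();       // join makes the consumer's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(tryTransferWithoutConsumer()); // false
        System.out.println(handOff(42));                  // 42
    }
}
```

Because the producer waits for the consumer, it cannot run ahead of it, which is how a TransferQueue throttles the data flow.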

Producer.java

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class Producer extends Thread {
    protected TransferQueue<Integer> transferQueue;
    private int limit;

    Producer(TransferQueue<Integer> transferQueue, int limit) {
        this.transferQueue = transferQueue;
        this.limit = limit;
    }

    public void run() {
        for(int i = 1; i <= limit; i++) {
            try {
                System.out.println("Producer put " + i);
                boolean added = transferQueue.tryTransfer(i, 4000, TimeUnit.MILLISECONDS);
                if( !added ) {
                    i = i-1;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Consumer.java

import java.util.concurrent.TransferQueue;

class Consumer extends Thread {
    protected TransferQueue<Integer> transferQueue;
    Consumer(TransferQueue<Integer> transferQueue) { // constructor
        this.transferQueue = transferQueue;
    }
    public void run() {
        try {
            while (true) {
                Integer elem = transferQueue.take(); // to consume data
                System.out.println("Consumer take " + elem);
            }
        } catch (InterruptedException exp) {
            System.out.println("An interruption occurred at Consumer");
        }
    }
}

TransferQueueTest.java

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueueTest {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<Integer> transferQueue = new LinkedTransferQueue<>();

        // passing object of TransferQueue as arguments
        Producer threadProd = new Producer(transferQueue, 5);
        Consumer threadCon = new Consumer(transferQueue);

        // to start the process
        threadProd.start();
        threadCon.start();

        // to exit the process after 2 sec
        Thread.sleep(2000);
        System.exit(0);
    }
}

Deque

Deque is short for Double-Ended Queue: a queue that lets you insert and retrieve data at both the head and the tail

Implementations of Deque include ArrayDeque.

Operation              Method (returns false/null)   Method throwing Exception
Insertion from Head    offerFirst(e)                 addFirst(e)
Removal from Head      pollFirst()                   removeFirst()
Retrieval from Head    peekFirst()                   getFirst()
Insertion from Tail    offerLast(e)                  addLast(e)
Removal from Tail      pollLast()                    removeLast()
Retrieval from Tail    peekLast()                    getLast()
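
The two columns of the table can be compared directly. This is a short sketch; the class name DequeEndsDemo and the sample values are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

// Hypothetical demo class contrasting the two method families of Deque.
class DequeEndsDemo {

    // Builds [zero, one, two] by inserting at both ends.
    static Deque<String> build() {
        Deque<String> deque = new ArrayDeque<>();
        deque.offerLast("one");
        deque.offerFirst("zero");
        deque.offerLast("two");
        return deque;
    }

    // Returns true if removeFirst() throws on an empty deque.
    static boolean removeFirstThrowsWhenEmpty() {
        try {
            new ArrayDeque<String>().removeFirst();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Deque<String> deque = build();
        System.out.println(deque);             // [zero, one, two]
        System.out.println(deque.peekFirst()); // zero
        System.out.println(deque.pollLast());  // two
        System.out.println(new ArrayDeque<String>().pollFirst()); // null
        System.out.println(removeFirstThrowsWhenEmpty());         // true
    }
}
```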

Test program

    @Test
    public void deque_test() {
        // Deque as Stack
        Deque<String> stack = new ArrayDeque<>();
        stack.push("one");
        stack.push("two");
        assertEquals("two", stack.getFirst());
        assertEquals("two", stack.pop());
        stack.pop();
        NoSuchElementException exception = assertThrows(NoSuchElementException.class, () -> {
            stack.pop();
        });

        // Deque as Queue
        Deque<String> queue = new ArrayDeque<>();
        queue.offer("one");
        queue.offer("two");
        assertEquals("two", queue.getLast());
        assertEquals("one", queue.poll());
        queue.poll();
        assertNull(queue.poll());
    }

Priority Queue

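When elements are added to a PriorityQueue, they are ordered by their natural order or by the Comparator supplied at construction, so poll() always returns the smallest remaining element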

    @Test
    public void priority_queue_test() {
        PriorityQueue<String> stringQueue = new PriorityQueue<>();

        stringQueue.add("one");
        stringQueue.add("two");
        stringQueue.add("three");

        String first = stringQueue.poll();
        String second = stringQueue.poll();
        String third = stringQueue.poll();

        assertEquals("one", first);
        assertEquals("three", second);
        assertEquals("two", third);
    }
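
The test above relies on the natural (alphabetical) order of String. As a complementary sketch, the same words can be ordered with a Comparator. The class name ComparatorQueueDemo and the "length first, then alphabetical" rule are assumptions chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical demo class showing a PriorityQueue with a custom Comparator.
class ComparatorQueueDemo {

    // Polls every element from a queue ordered by length, then alphabetically.
    static List<String> drainByLength(String... words) {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        Comparator<String> byLengthThenAlphabet = byLength.thenComparing(Comparator.naturalOrder());

        PriorityQueue<String> queue = new PriorityQueue<>(byLengthThenAlphabet);
        for (String word : words) {
            queue.add(word);
        }

        List<String> drained = new ArrayList<>();
        while (!queue.isEmpty()) {
            drained.add(queue.poll()); // poll() returns the smallest remaining element
        }
        return drained;
    }

    public static void main(String[] args) {
        System.out.println(drainByLength("three", "two", "one")); // [one, two, three]
    }
}
```

Only the polling order is guaranteed. Iterating over a PriorityQueue or printing it with toString() does not necessarily show the elements in sorted order.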

Reference

Guide to the Java Queue Interface | Baeldung

Java Queue – Queue in Java | DigitalOcean

2025/1/20

afick

afick (another file integrity checker) is a file-checking tool that monitors whether files have been modified.
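
To make the idea of an integrity check concrete, here is a conceptual sketch in Java. It is not how afick is implemented (afick is a Perl tool with its own database format and tracks more attributes than a content hash). The class name IntegritySketch, the choice of SHA-256, and the report wording are all assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Conceptual sketch only: snapshot file hashes, then compare two snapshots.
class IntegritySketch {

    // Records a SHA-256 digest for every regular file under root.
    static Map<Path, String> snapshot(Path root) {
        Map<Path, String> digests = new TreeMap<>();
        try (Stream<Path> paths = Files.walk(root)) {
            List<Path> files = paths.filter(Files::isRegularFile).collect(Collectors.toList());
            for (Path file : files) {
                digests.put(file, digest(file));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return digests;
    }

    static String digest(Path file) throws IOException {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return new BigInteger(1, md.digest(Files.readAllBytes(file))).toString(16);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Lists files that were deleted, changed, or added between two snapshots.
    static List<String> compare(Map<Path, String> before, Map<Path, String> after) {
        List<String> report = new ArrayList<>();
        for (Map.Entry<Path, String> entry : before.entrySet()) {
            String now = after.get(entry.getKey());
            if (now == null) {
                report.add("deleted file : " + entry.getKey());
            } else if (!now.equals(entry.getValue())) {
                report.add("changed file : " + entry.getKey());
            }
        }
        for (Path path : after.keySet()) {
            if (!before.containsKey(path)) {
                report.add("new file : " + path);
            }
        }
        return report;
    }

    // Creates a temp directory, changes one file, and returns the comparison report.
    static List<String> demo() {
        try {
            Path root = Files.createTempDirectory("integrity");
            Path css = root.resolve("index.css");
            Files.write(css, "body {}".getBytes(StandardCharsets.UTF_8));
            Map<Path, String> before = snapshot(root);
            Files.write(css, "body { margin: 0 }".getBytes(StandardCharsets.UTF_8));
            return compare(before, snapshot(root));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```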

install

afick is written in Perl, so install Perl before installing afick

dnf -y install perl

According to the installation document (afick installation), the package or the source code can be downloaded from "another file integrity checker - Browse /afick/3.8.0 at SourceForge.net".

On Rocky Linux, install the rpm file

rpm -ivh afick-3.8.0-1.noarch.rpm

It can also be built and installed from source. Following the INSTALL document in the source code, the steps are

tar xvfz afick*.tgz
cd  afick*
perl Makefile.pl
make install

To install it as a service, use the following steps instead. A cron job is placed in the /etc/cron.daily directory

tar xvfz afick*.tgz
cd  afick*
perl Makefile.pl Makefile_sys.in
make install

Configuration

The configuration file is /etc/afick.conf. Append the directories to monitor at the end of the file, for example

/var/www/html DIR

Usage

# initialize afick
afick -c /etc/afick.conf -init

# compare files against the database
afick -c /etc/afick.conf -k

# check files and update the database
afick -c afick.conf --update

Initialization took quite a while: about 10 minutes for roughly 250,000 files

afick -c /etc/afick.conf -init

# #################################################################
# MD5 hash of /var/lib/afick/afick => LXVxgXA/BosPirqMhDpowg

# Hash database created successfully. 251267 files entered.
# user time : 88.79; system time : 32.05; real time : 625

After modifying the contents of two files, run a check.

The check also covers afick.conf itself. Because the default exclude_suffix excludes html and htm, a css file is used for the test.

afick -c /etc/afick.conf -k

# archive:=/var/lib/afick/archive
# database:=/var/lib/afick/afick
# exclude_suffix:=log LOG html htm HTM txt TXT xml hlp pod chm tmp old bak fon ttf TTF bmp BMP jpg JPG gif png ico wav WAV mp3 avi pyc
# history:=/var/lib/afick/history
# max_checksum_size:=10000000
# running_files:=1
# timing:=1
# dbm:=Storable
# last run on 2024/08/12 10:17:35 with afick version 3.8.0
WARNING: (control) afick internal change : /etc/afick.conf (see below)

# summary changes
deleted directory : /
    number of deleted files         : 1
changed file : /etc/afick.conf
changed file : /var/www/html/index.css

# detailed changes
deleted directory : /
    parent_date         : Thu Aug  1 14:23:12 2024
    number of deleted files         : 1
changed file : /etc/afick.conf
    md5         : 877b96dc1be6083fd4589a96a2767006    f604ce2893a4bda0750b6564c84020b9
    filesize         : 7268    7269
changed file : /var/www/html/index.css
    inode         : 402693690    402705600
# #################################################################
# MD5 hash of /var/lib/afick/afick => ddgOifAlUpJfbRakzwY9tQ

# Hash database : 251265 files scanned, 4 changed (new : 0; delete : 2; changed : 2; dangling : 16; exclude_suffix : 26665; exclude_prefix : 0; exclude_re : 0; masked : 0; degraded : 244)
# user time : 104.85; system time : 20.72; real time : 383

afick_cron

In /etc/cron.daily/afick_cron, the ACTION parameter decides whether the cron job updates the database after checking the files

# the default action is "update" (-u), you can also use "compare" (-k)
ACTION="-u"

References

不只是資安: [工具介紹] Linux 下的檔案完整性偵測工具 - afick

CentOS 7 安裝 AFICK – 檔案安全監控 (更新內容 – 2018/12/12) – Ken Wu

2025/1/13

Netty in Java 2

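How to handle a stream-based transport

In a stream-based transport such as TCP/IP, received data is stored in the socket receive buffer. That buffer holds a stream of bytes, not a queue of packets. Even if two messages are sent as two independent packets, the OS treats them as just a run of bytes, so there is no guarantee that what you read is exactly what the remote peer wrote.

Suppose the TCP/IP stack of the OS received these three packets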

ABC   DEF   GHI

The receiving application might read them as the following fragments

AB    CDEFG    H   I

The receiving side must therefore reassemble the bytes itself and restore the original frames before it can interpret the data correctly

ABC   DEF   GHI
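
The reassembly can be sketched without Netty. The example below assumes every message is exactly 3 characters long, as in the illustration above. The class name FixedLengthFramer is hypothetical, and a real implementation would work on bytes rather than on a StringBuilder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: accumulate fragments and cut them into fixed-length frames.
class FixedLengthFramer {
    private final int frameLength;
    private final StringBuilder pending = new StringBuilder();

    FixedLengthFramer(int frameLength) {
        this.frameLength = frameLength;
    }

    // Appends a received chunk and returns every complete frame now available.
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> frames = new ArrayList<>();
        while (pending.length() >= frameLength) {
            frames.add(pending.substring(0, frameLength));
            pending.delete(0, frameLength);
        }
        return frames;
    }

    // Feeds all chunks in order and collects the frames that come out.
    static List<String> reassemble(int frameLength, String... chunks) {
        FixedLengthFramer framer = new FixedLengthFramer(frameLength);
        List<String> all = new ArrayList<>();
        for (String chunk : chunks) {
            all.addAll(framer.feed(chunk));
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(reassemble(3, "AB", "CDEFG", "H", "I")); // [ABC, DEF, GHI]
    }
}
```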

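Solution 1

In the Time client example, the 32-bit integer is a very small amount of data and is unlikely to be fragmented. Fragmentation can still happen, though, and becomes more likely as traffic increases.

The simplest approach is to create an internal buffer and wait until 4 bytes have accumulated before processing.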

TimeClientHandler1.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler1 extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // when the handler is added, create a 4-byte buffer
        buf = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        // when the handler is removed, release the buffer
        buf.release();
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        // append all received bytes to the internal buffer
        buf.writeBytes(m);
        m.release();

        // check whether 4 bytes have accumulated
        if (buf.readableBytes() >= 4) {
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
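
The arithmetic in channelRead can be checked on its own. The handler reads a 32-bit unsigned count of seconds since 1900-01-01 and subtracts 2208988800, the number of seconds between 1900-01-01 and 1970-01-01, to get Unix time. The sketch below uses java.nio.ByteBuffer instead of Netty's ByteBuf, and the class name TimeProtocolMath is made up for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the conversion done in TimeClientHandler1.
class TimeProtocolMath {
    // Seconds between 1900-01-01 and 1970-01-01 (the constant used in the handler).
    static final long SECONDS_1900_TO_1970 = 2208988800L;

    // Converts the 4-byte big-endian payload to Unix epoch milliseconds.
    static long toEpochMillis(byte[] fourBytes) {
        // getInt() is signed; masking with 0xFFFFFFFFL gives the unsigned value,
        // which is what ByteBuf.readUnsignedInt() returns.
        long secondsSince1900 = ByteBuffer.wrap(fourBytes).getInt() & 0xFFFFFFFFL;
        return (secondsSince1900 - SECONDS_1900_TO_1970) * 1000L;
    }

    public static void main(String[] args) {
        // 2208988800 == 0x83AA7E80, i.e. exactly the Unix epoch
        byte[] epoch = {(byte) 0x83, (byte) 0xAA, (byte) 0x7E, (byte) 0x80};
        System.out.println(toEpochMillis(epoch)); // 0

        byte[] oneSecondLater = {(byte) 0x83, (byte) 0xAA, (byte) 0x7E, (byte) 0x81};
        System.out.println(toEpochMillis(oneSecondLater)); // 1000
    }
}
```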

Solution 2

Split the message-assembly part out and move it into a decoder

TimeDecoder.java

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

// handles the fragmentation issue
public class TimeDecoder extends ByteToMessageDecoder {

    // called with the internally maintained cumulative buffer whenever new data is received
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // when in has fewer than 4 bytes, add nothing to out
        if (in.readableBytes() < 4) {
            return;
        }
        // when in has at least 4 bytes
        out.add(in.readBytes(4));
    }
}

TimeDecoder1.java

TimeDecoder can also be implemented in a simplified way

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

// using ReplayingDecoder simplifies the TimeDecoder code
public class TimeDecoder1 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

TimeClient.java

Modify the ChannelInitializer part to add the decoder

package netty.time.stream;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        if (args.length >= 2) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // a client uses Bootstrap, unlike the server's ServerBootstrap
            Bootstrap b = new Bootstrap();
            // a client needs only one worker EventLoopGroup
            b.group(workerGroup);
            // a client uses NioSocketChannel, unlike the server's NioServerSocketChannel
            b.channel(NioSocketChannel.class);
            // use option() here, not childOption()
            b.option(ChannelOption.SO_KEEPALIVE, true);
//            b.handler(new ChannelInitializer<SocketChannel>() {
//                @Override
//                public void initChannel(SocketChannel ch) throws Exception {
//                    ch.pipeline().addLast(new TimeClientHandler());
//                }
//            });
            // add TimeDecoder before TimeClientHandler
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            // Start the client.
            // call connect, not bind
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

TimeClientHandler.java

The code is unchanged

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // the received data is a ByteBuf
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

POJO

Switch to a POJO Java class so the protocol is easier to read

UnixTime.java

package netty.time.pojo;

import java.util.Date;

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

Modify TimeDecoder

TimeDecoder.java

package netty.time.pojo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

// handles the fragmentation issue
public class TimeDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }

        out.add(new UnixTime(in.readUnsignedInt()));
    }
}

TimeClientHandler.java

package netty.time.pojo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeClient.java

package netty.time.pojo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        if (args.length >= 2) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // a client uses Bootstrap, unlike the server's ServerBootstrap
            Bootstrap b = new Bootstrap();
            // a client needs only one worker EventLoopGroup
            b.group(workerGroup);
            // a client uses NioSocketChannel, unlike the server's NioServerSocketChannel
            b.channel(NioSocketChannel.class);
            // use option() here, not childOption()
            b.option(ChannelOption.SO_KEEPALIVE, true);
//            b.handler(new ChannelInitializer<SocketChannel>() {
//                @Override
//                public void initChannel(SocketChannel ch) throws Exception {
//                    ch.pipeline().addLast(new TimeClientHandler());
//                }
//            });
            // add TimeDecoder before TimeClientHandler
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            // Start the client.
            // call connect, not bind
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

POJO Server

UnixTime.java

package netty.time.pojoserver;

import java.util.Date;

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

TimeServerHandler.java

package netty.time.pojoserver;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeEncoder.java

package netty.time.pojoserver;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        // pass the original ChannelPromise along so Netty can mark it as success or failure
        ctx.write(encoded, promise);
    }
}

TimeEncoder1.java

Simplified version

package netty.time.pojoserver;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class TimeEncoder1 extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}
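
Both encoders cast the long value to int before writing it. That looks lossy, because the value (seconds since 1900) is larger than Integer.MAX_VALUE, but the 32 bits written are the same and the decoder reads them back as unsigned. A small sketch with java.nio.ByteBuffer standing in for ByteBuf (the class name UnsignedIntRoundTrip and the sample value are assumptions):

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the encode/decode round trip for a 32-bit unsigned value.
class UnsignedIntRoundTrip {

    // Mirrors out.writeInt((int) msg.value()) in the encoder.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(4).putInt((int) value).array();
    }

    // Mirrors in.readUnsignedInt() in the decoder.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt() & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long value = 3_900_000_000L;               // larger than Integer.MAX_VALUE
        System.out.println((int) value);           // -394967296
        System.out.println(decode(encode(value))); // 3900000000
    }
}
```

The round trip only holds for values that fit in 32 bits, that is, below 4294967296.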

TimeServer.java

package netty.time.pojoserver;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup is a multithreaded event loop that handles I/O operations
        // the first NioEventLoopGroup accepts incoming connections and is called boss
        // the second NioEventLoopGroup handles the traffic of accepted connections and is called worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is a helper class that sets up a server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // use NioServerSocketChannel to accept incoming connections
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
                        }
                    })
                    // set ChannelOption values; ChannelOption and ChannelConfig list the supported parameters
                    // option configures the NioServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption configures the accepted NioSocketChannel instances
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // bind the TCP port
            ChannelFuture f = b.bind(port).sync();

            // wait here until the server socket is closed
            // that does not happen in this example,
            // but this is the call to use for shutting the server down gracefully
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new TimeServer(port).run();
    }
}

References

Netty.docs: User guide for 4.x

2025/1/6

Netty in Java 1

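Netty is a non-blocking I/O socket framework used mainly for developing network applications. Its asynchronous, event-driven framework and tools simplify development. Netty was originally developed by JBoss and is now developed and maintained by the Netty project community. Netty also supports HTTP, HTTP/2, DNS, and other protocols, as well as WebSockets, Google Protocol Buffers, SSL/TLS, SPDY, and message compression.

Earlier Java socket applications often used the Mina library. Because Netty and Mina share the same author and Mina has not been maintained for a long time, most articles now recommend Netty. One major difference that comes up in discussions is UDP handling: Mina provides a high-level wrapper that makes connectionless UDP feel connection-oriented, while Netty stays closer to raw UDP and keeps its connectionless nature.

Before using Netty, check which version you are using. According to the discussion in "Remove master branch · Issue #4466 · netty/netty · GitHub", development of Netty 5 showed that the new approach increased code complexity without a clear performance gain, so Netty 5 is currently abandoned. Version 4.1 is still the recommended one; its user guide is "Netty.docs: User guide for 4.x".

The Netty library is divided into the following parts

Core is the core. Protocol Support covers the protocols layered on top of the socket. Transport Support is the data transport, namely Socket/Datagram, HTTP Tunnel, and In-VM Pipe, which are the implementations that actually move data.

maven

The simplest way to use Netty is to pull in all of its libraries through netty-all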

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.111.Final</version>
</dependency>

DISCARD

The simplest network protocol is DISCARD: whatever the client sends, the server side throws it away and sends no response.

DiscardServer.java

package netty.discard;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Discards any incoming data.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup is a multithreaded event loop that handles I/O operations
        // the first NioEventLoopGroup accepts incoming connections and is called boss
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // the second NioEventLoopGroup handles the traffic of accepted connections and is called worker
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is a helper class that sets up a server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // use NioServerSocketChannel to accept incoming connections
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // ChannelInitializer configures each new Channel
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    // set ChannelOption values; ChannelOption and ChannelConfig list the supported parameters
                    // option configures the NioServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption configures the accepted NioSocketChannel instances
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // bind the TCP port
            ChannelFuture f = b.bind(port).sync();

            // wait here until the server socket is closed
            // that does not happen in this example,
            // but this is the call to use for shutting the server down gracefully
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}

DiscardServerHandler.java

package netty.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

// ChannelInboundHandlerAdapter implements every method of the ChannelInboundHandler interface,
// so DiscardServerHandler only needs to extend ChannelInboundHandlerAdapter
// and override the methods it needs
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {

    // channelRead is called whenever a message is received
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Discard the received data silently.
//        ((ByteBuf) msg).release(); // msg must be released to free its memory
//
//        channelRead is usually implemented in the following way
//        try {
//            // Do something with msg
//        } finally {
//            ReferenceCountUtil.release(msg);
//        }
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) {
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

//    // the while loop above can be replaced with the following
//    @Override
//    public void channelRead(ChannelHandlerContext ctx, Object msg) {
//        ByteBuf in = (ByteBuf) msg;
//        System.out.print(in.toString(io.netty.util.CharsetUtil.US_ASCII));
//        in.release();
//    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        // This method handles exceptions raised by an I/O error or by a bug in a handler implementation
        cause.printStackTrace();
        ctx.close();
    }
}

Test

After starting DiscardServer, test it with telnet or nc.

# Equivalent to: telnet 127.0.0.1 8080
nc -nvv 127.0.0.1 8080

ECHO

The echo protocol is a small modification of discard: when the echo server receives data from a client, it sends the same data straight back to that client.

EchoServer.java

package netty.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {

    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup is a multithreaded event loop that handles I/O operations
        // The first NioEventLoopGroup accepts incoming connections and is called the boss
        // The second NioEventLoopGroup handles the traffic of accepted connections and is called the worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is a helper class for setting up a server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // use NioServerSocketChannel to accept incoming connections
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })
                    // Set ChannelOptions; see the ChannelOption and ChannelConfig API docs for the available parameters
                    // option() configures the NioServerSocketChannel that accepts connections
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption() configures the NioSocketChannel of each accepted connection
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind the TCP port and start accepting incoming connections.
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed.
            // That never happens in this example,
            // but this is how to wait for a graceful server shutdown.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new EchoServer(port).run();
    }
}

EchoServerHandler.java

package netty.echo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // ChannelHandlerContext ctx lets the handler trigger I/O operations
        // Write the received msg back to the channel through ctx; msg does not need to be released here
        // because Netty releases it once it has been written out
        ctx.write(msg);
        ctx.flush();
        // ctx.write does not put the data on the wire directly; it is buffered until flush is called
    }

//    // writeAndFlush can be used instead
//    @Override
//    public void channelRead(ChannelHandlerContext ctx, Object msg) {
//        ctx.writeAndFlush(msg);
//    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

Test

Test it with nc, as before.

nc -nvv 127.0.0.1 8080

TIME

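A TIME protocol server sends a 32-bit integer as soon as a client connects and then closes the connection.

Because the server ignores any data the client sends and writes as soon as the connection is established, the handler overrides channelActive() instead of channelRead().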
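
The 32-bit value counts seconds since 1900-01-01 00:00:00 UTC, which is why the handlers in this section add or subtract 2208988800 (the number of seconds between 1900 and the Unix epoch). The following is a minimal sketch of that conversion using only the JDK, with no Netty involved; the class name Rfc868Time and the methods encode/decode are made up for illustration.

```java
import java.nio.ByteBuffer;

final class Rfc868Time {
    // Seconds between 1900-01-01T00:00:00Z (TIME protocol epoch) and 1970-01-01T00:00:00Z (Unix epoch)
    static final long EPOCH_OFFSET_SECONDS = 2208988800L;

    // Converts Unix milliseconds into the 4-byte big-endian payload a TIME server sends.
    static byte[] encode(long unixMillis) {
        long seconds = unixMillis / 1000L + EPOCH_OFFSET_SECONDS;
        // The cast keeps the low 32 bits; the value is meant to be read back as unsigned
        return ByteBuffer.allocate(4).putInt((int) seconds).array();
    }

    // Converts a 4-byte payload back into Unix milliseconds (sub-second precision is lost).
    static long decode(byte[] payload) {
        // Mask with 0xFFFFFFFFL to treat the int as unsigned
        long seconds = ByteBuffer.wrap(payload).getInt() & 0xFFFFFFFFL;
        return (seconds - EPOCH_OFFSET_SECONDS) * 1000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long roundTrip = decode(encode(now));
        // The round trip only drops the milliseconds
        System.out.println(roundTrip == now / 1000L * 1000L);
    }
}
```

The unsigned read matters because current timestamps already exceed Integer.MAX_VALUE once the 1900 offset is added; the 32-bit counter itself wraps around in 2036.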

TimeServer.java

package netty.time;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup is a multithreaded event loop that handles I/O operations
        // The first NioEventLoopGroup accepts incoming connections and is called the boss
        // The second NioEventLoopGroup handles the traffic of accepted connections and is called the worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is a helper class for setting up a server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // use NioServerSocketChannel to accept incoming connections
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    // Set ChannelOptions; see the ChannelOption and ChannelConfig API docs for the available parameters
                    // option() configures the NioServerSocketChannel that accepts connections
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption() configures the NioSocketChannel of each accepted connection
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind the TCP port and start accepting incoming connections.
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed.
            // That never happens in this example,
            // but this is how to wait for a graceful server shutdown.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new TimeServer(port).run();
    }
}

TimeServerHandler.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        // channelActive is called as soon as the connection is established
        // Allocate a ByteBuf that holds a 32-bit integer (4 bytes) and write the current time into it
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        // Unlike NIO's ByteBuffer, there is no need to call java.nio.ByteBuffer.flip() before sending,
        // because ByteBuf keeps separate reader and writer indexes
        final ChannelFuture f = ctx.writeAndFlush(time);
        // ChannelHandlerContext.write() and writeAndFlush() return a ChannelFuture
        // because Netty's I/O operations are asynchronous,
        // so the context may only be closed after the future reports that the write has completed
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        });
        // The listener above can be shortened to:
        // f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
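
The comment in channelActive about flip() is easier to appreciate next to what java.nio.ByteBuffer does on its own. The sketch below uses only the JDK; the class name FlipDemo and its two methods are hypothetical. java.nio.ByteBuffer has a single position shared by reads and writes, so reading right after writing without flip() picks up the unwritten bytes that follow the data.

```java
import java.nio.ByteBuffer;

final class FlipDemo {
    // Reads right after writing, without flip(): the position is already past the written bytes.
    static int readWithoutFlip() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(42);       // position moves from 0 to 4
        return buf.getInt();  // reads bytes 4..7, which are still zero
    }

    // flip() sets limit to the current position and rewinds position to 0 before reading.
    static int readWithFlip() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(42);
        buf.flip();           // limit = 4, position = 0
        return buf.getInt();  // reads the value that was written
    }

    public static void main(String[] args) {
        System.out.println(readWithoutFlip()); // prints 0
        System.out.println(readWithFlip());    // prints 42
    }
}
```

Netty's ByteBuf avoids this step by tracking a reader index and a writer index separately, which is why the handler can pass the buffer to writeAndFlush() right after writeInt().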

TimeClient.java

package netty.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        // Both host and port must be given; reading args[1] with a single argument would fail
        if (args.length >= 2) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // A client uses Bootstrap rather than ServerBootstrap
            Bootstrap b = new Bootstrap();
            // A client needs only one EventLoopGroup, the worker
            b.group(workerGroup);
            // A client uses NioSocketChannel rather than NioServerSocketChannel
            b.channel(NioSocketChannel.class);
            // Use option() here, not childOption()
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            // Call connect() instead of bind()
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

TimeClientHandler.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // The received data is a ByteBuf
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

References

Netty.docs: User guide for 4.x

netty/example/src/main/java/io/netty/example at 4.1 · netty/netty · GitHub

2024/12/23

Mockito: mock static final method

These notes record how Mockito handles static methods and final classes/methods.

Preparation

First, prepare a class to be mocked.

import java.util.UUID;

public class DataUtils {
    public static String getUuid() {
//        return UUID.randomUUID().toString();
        return "UUID";
    }

    public static String concat(String a, String b) {
        return a+":"+b;
    }

    public final int finalMethod() {
        return 1;
    }
    public String multiply(String a, int times) {
        return a.repeat( times );
    }
}

mock static method

  • static method without parameters

Mockito.mockStatic(Class<T> classToMock)

Mind the scope of MockedStatic: it must be created in a try-with-resources statement.

    @Test
    // How to mock a static method without parameters
    public void mock_test1() {
        assertEquals("UUID", DataUtils.getUuid());

        // Mockito.mockStatic(Class<T> classToMock)
        // Mind the scope of MockedStatic: create it in a try-with-resources statement
        // so that it is closed automatically when the block ends
        try (MockedStatic<DataUtils> utils = mockStatic(DataUtils.class)) {
            utils.when(DataUtils::getUuid).thenReturn("Custom UUID");
            assertEquals("Custom UUID", DataUtils.getUuid());
        }

        assertEquals("UUID", DataUtils.getUuid());
    }
  • static method with parameters
    @Test
    // How to mock a static method that takes parameters
    public void mock_test2() {
        assertEquals("A:B", DataUtils.concat("A", "B"));

        // Mockito.mockStatic(Class<T> classToMock)
        try (MockedStatic<DataUtils> utils = mockStatic(DataUtils.class)) {
            utils.when(() -> DataUtils.concat("A", "B"))
                    .thenReturn( "A-B" );
            assertEquals("A-B", DataUtils.concat("A", "B"));
        }

        assertEquals("A:B", DataUtils.concat("A", "B"));
    }
  • MockitoException

Registering a second static mock for the same class while the first one is still open throws a MockitoException ("static mocking is already registered in the current thread").

    @Test
    public void mock_test3() {
        // Calling mockStatic for DataUtils again while the first MockedStatic is still open throws:
//        org.mockito.exceptions.base.MockitoException:
//        For mock.DataUtils, static mocking is already registered in the current thread
//        To create a new mock, the existing static mock registration must be deregistered
        assertThrows(MockitoException.class, () -> {
            // try-with-resources closes the first registration so it does not leak into other tests
            try (MockedStatic<DataUtils> utils = mockStatic(DataUtils.class)) {
                mockStatic(DataUtils.class);
            }
        });
    }
  • JUnit @Before @After

Use JUnit's @Before and @After to open and close the MockedStatic around every @Test.

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mockStatic;

public class DataUtilsMockitoTest2 {
    private MockedStatic<DataUtils> mockStatic;

    // @Before runs before every @Test
    @Before
    public void before() {
        // Register a static mock for DataUtils before each test
        mockStatic = mockStatic(DataUtils.class);
    }

    // @After runs after every @Test
    @After
    public void after() {
        // Close the static mock after each test
        mockStatic.close();
    }

    @Test
    public void mock_test4_1() {
        assertTrue(Mockito.mockingDetails(DataUtils.class).isMock());
        mockStatic.when(() -> DataUtils.concat("A", "B"))
                .thenReturn( "A-B" );
        assertEquals("A-B", DataUtils.concat("A", "B"));
    }

    @Test
    public void mock_test4_2() {
        assertTrue(Mockito.mockingDetails(DataUtils.class).isMock());
        mockStatic.when(() -> DataUtils.concat("A", "B"))
                .thenReturn( "A*B" );
        assertEquals("A*B", DataUtils.concat("A", "B"));
    }
}

mock final class/method

First, prepare a final class.

public final class DataUtilsFinal extends DataUtils {
    public static String concat(String a, String b) {
        return a+":"+b;
    }

    public String multiply(String a, int times) {
        return a.repeat( times-1 );
    }
}
  • final method
    @Test
    public void mock_test1() {
        // A plain mock() is enough to stub finalMethod
        // (this relies on the inline mock maker, which is the default since Mockito 5)
        DataUtils dataUtils = new DataUtils();
        DataUtils dataUtilsMock = mock(DataUtils.class);
        when(dataUtilsMock.finalMethod()).thenReturn(0);

        assertEquals(1, dataUtils.finalMethod());
        assertEquals(0, dataUtilsMock.finalMethod());
    }
  • final class
    @Test
    public void mock_test2() {
        // final class
        DataUtilsFinal mock = mock(DataUtilsFinal.class);
        when(mock.multiply("a", 2)).thenReturn("a");

        assertEquals("a", mock.multiply("a", 2));
    }

References

Mocking Static Methods With Mockito | Baeldung

Mock Final Classes and Methods with Mockito | Baeldung