老赵
发布于 2025-09-25 / 6 阅读
0
0

响应式开发笔记

在工作过程中,需要导入大量数据,并且对数据进行过滤、转换、导入等动作。为了高效的完成操作,我们可以使用响应式来处理。

    implementation('io.projectreactor:reactor-core:3.7.6')

相关概念介绍

Flux/Mono 流

响应式中最重要的概念之一,用来表示一条流水线。其中Flux支持流水线上有0到多个元素,而Mono只表示0到1个数据。

流水线上可以配置多个操作,让流入流水线上的每一个元素都能被处理。

创建一条流水线

  • 当我有一个或多个数据,可以直接使用Mono.just或Flux.just

  • 当我有一个迭代数据:

    • 一个集合或iterable:可以使用Flux.fromIterable方法

    • 一个数组:Flux.fromArray

    • 一个range:Flux.range

    • 一个Stream:Flux.fromStream

  • 一个参数给出的源:

    • 一个Supplier<T>:Mono.fromSupplier

    • 一个任务:Mono.fromCallable,Mono.fromRunnable

    • 一个CompletableFuture<T>: Mono.fromFuture

  • 一个可回收的资源:Flux.using或Mono.using

  • 可编程地生成事件(可以使用状态):

    • 同步且逐个的:Flux.generate

    • 异步(也可同步)的,每次尽可能多发出元素:Flux.createMono.create 也是异步的,只不过只能发一个)

类型转换

  • 1对1转换:

    • 任意转换 map

    • 类型转换 cast

    • 带序号的 index

  • 1对N转换: flatMap

  • 自由转换:handle,通过SynchronousSink可以发射

    • 0个元素:相当于过滤数据

    • 1个元素:相当于map

    • 多个元素:相当于拆分

    • 错误信号

    • 终止信号

数据过滤

  • 过滤一个序列

    • 条件过滤:filter

    • 异步地进行判断:filterWhen

    • 指定的类型对象:ofType

    • 忽略所有元素:ignoreElements

    • 去重:distinct

  • 只要一部分元素

    • 从第一个开始取n个:take

    • 从第一个开始取一段时间内的:take

    • 只要第一个元素:next

    • 取最后一个元素:takeLast

    • 直到满足某个条件:takeUntil包含,takeWhile不包含

    • 按序号取:elementAt

    • 跳过N个:skip

    • 跳过一段时间:skip

    • 跳过最后几个:skipLast

    • 按条件跳过:skipUntil包含,skipWhile不包含

  • 采样:

    • 给定周期:sample

窥视元素

  • 发了元素:doOnNext

  • 序列完成:doOnComplete,doOnSucess

  • 错误:doOnError

  • 取消:doOnCancel

  • 订阅时:doOnSubscribe

  • 请求时:doOnRequest

  • 结束:doFinally

拆分Flux

  • 以个数为界:window

  • 拆分为List:buffer

回到同步状态

  • 拿到第一个元素前阻塞:blockFirst

  • 拿到最后一个元素前阻塞:blockLast

  • 同步转换为迭代器:toIterable

  • 同步转换为流:toStream

异步处理

parallel(N):将流划分到多个线路并发处理

runOn(Schedulers.newParallel("filter", M),N):后续处理放到新的线路池中处理,并且定义M个线程,每次预取N个数据,其中M不能大于并发线路parallel(N),否则多于的无效

Sinks 水槽

支持两种模式:Sinks.Many,Sinks.One分别用于Flux与Mono

转换成流水线:asFlux,asMono

many().multicast():可以广播到多个订阅者

many().unicast():通知到单个订阅者

many().replay():支持重播

发射数据:

  • 同步发射一个数据:replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);

  • 异步发射一个数据:EmitResult result=replaySink.tryEmitNext(4);

  • 发射一个结束符:sinks.emitComplete();

发射状态:

  • OK:成功

  • FAIL_TERMINATED:因为sink已经完成而失败

  • FAIL_OVERFLOW:超过缓冲容量而失败

  • FAIL_CANCELLED:因被取消而失败

  • FAIL_NON_SERIALIZED:没有被序列化访问而失败,不能两个线程同时发射

  • FAIL_ZERO_SUBSCRIBER:没有任何订阅者而失败

案例介绍

package org.example;


import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;

public class Main {


    public static void main(String[] args) throws IOException {

        ExcelFilter excelFilter = new ExcelFilter();
        Converter converter = new Converter();
        CsvWriter csvWriter = new CsvWriter();
        csvWriter.initCsvWriter();

        Sinks.Many<ExcelData> sinks = Sinks.many().multicast().onBackpressureBuffer(1000, true);
        sinks.asFlux()
                .parallel(10000)
                .runOn(Schedulers.newParallel("filter", 10000))
                .filter(excelData -> excelFilter.test(excelData))
                .runOn(Schedulers.newParallel("map", 10000))
                .map(excelData -> converter.convert(excelData))
                .sequential()

                .doOnNext(csvData -> {
                    csvWriter.writeToCsv(csvData);
                })

                .doOnComplete(() -> {
                    System.out.println(Thread.currentThread().getName() + " write success");
                    csvWriter.close();
                })
                .subscribe();
        // 读excel数据,并写入sinks
        ExcelReader excelReader = new ExcelReader();
        excelReader.startExcelReader("processed.xlsx", sinks);

        System.in.read();

    }


}

这是一个导入数据的处理案例,从processed.xlsx文件中读取数据,然后进行过滤、转换后写入新的csv文件的过程。

package org.example;

import com.alibaba.excel.EasyExcel;
import reactor.core.publisher.Sinks;

public class ExcelReader {
    public Thread startExcelReader(String excelPath, Sinks.Many<ExcelData> sink) {
        Thread thread = new Thread(() -> {
            try {
                EasyExcel.read(excelPath, ExcelData.class, new ReactiveReadListener(sink))
                        .autoCloseStream(true)
                        .sheet()
                        .doRead();
            } catch (Exception e) {
                sink.tryEmitError(new RuntimeException("Excel read failed", e));
            }
        });
        thread.setName("reader");

        EasyExcel.read(excelPath, ExcelData.class, new ReactiveReadListener(sink))
                .autoCloseStream(true)
                .sheet()
                .doRead();


        thread.start();
        return thread;

    }
}
package org.example;

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import reactor.core.publisher.Sinks;

public class ReactiveReadListener implements ReadListener<ExcelData> {

    long i = 0;
    private final Sinks.Many<ExcelData> sink;

    public ReactiveReadListener(Sinks.Many<ExcelData> sink) {
        this.sink = sink;
    }

    @Override
    public void invoke(ExcelData excelData, AnalysisContext analysisContext) {
        i++;
        if (i % 10000 == 0) {
            System.out.println(Thread.currentThread().getName() + " read " + i + " records");
        }
        Sinks.EmitResult emitResult = sink.tryEmitNext(excelData);
        while (emitResult.isFailure()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + " emit failed, retry after 1s" + i);
            emitResult = sink.tryEmitNext(excelData);
        }
    }

    @Override
    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
        sink.tryEmitComplete();
    }
}
package org.example;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

public class CsvWriter {
    public void initCsvWriter() throws IOException {
        csvWriter = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream("csv.csv"), StandardCharsets.UTF_8),
                1024 * 1024 * 4
        );
    }

    private BufferedWriter csvWriter;
    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    public synchronized void writeToCsv(CsvData data) {

        int count = atomicInteger.getAndIncrement();
        try {

            csvWriter.write(data.toCsvString());

            // 每1000条刷新一次(平衡性能和安全性)
            if (count % 10000 == 0) {
                csvWriter.flush();
                System.out.println(Thread.currentThread().getName() + " wrote " + count + " ");
            }

        } catch (
                IOException e) {
            throw new RuntimeException("CSV write failed", e);
        }
    }

    public void close() {
        System.out.println(Thread.currentThread().getName() + " close");
        try {
            csvWriter.flush();
            csvWriter.close();
        } catch (IOException e) {
            throw new RuntimeException("CSV close failed", e);
        }
    }
}


评论