在工作过程中,需要导入大量数据,并且对数据进行过滤、转换、导入等动作。为了高效的完成操作,我们可以使用响应式来处理。
implementation('io.projectreactor:reactor-core:3.7.6')相关概念介绍
Flux/Mono 流
响应式中最重要的概念之一,用来表示一条流水线。其中Flux支持流水线上有0到多个元素,而Mono只表示0到1个数据。
流水线上可以配置多个操作,让流入流水线上的每一个元素都能被处理。
创建一条流水线
当我有一个或多个数据,可以直接使用Mono.just或Flux.just
当我有一个迭代数据:
一个集合或iterable:可以使用Flux.fromIterable方法
一个数组:Flux.fromArray
一个range:Flux.range
一个Stream:Flux.fromStream
一个参数给出的源:
一个Supplier<T>:Mono.fromSupplier
一个任务:Mono.fromCallable,Mono.fromRunnable
一个CompletableFuture<T>: Mono.fromFuture
一个可回收的资源:Flux.using或Mono.using
可编程地生成事件(可以使用状态):
同步且逐个的:
Flux.generate异步(也可同步)的,每次尽可能多发出元素:
Flux.create(Mono.create也是异步的,只不过只能发一个)
类型转换
1对1转换:
任意转换 map
类型转换 cast
带序号的 index
1对N转换: flatMap
自由转换:handle,通过SynchronousSink可以发射
0个元素:相当于过滤数据
1个元素:相当于map
多个元素:相当于拆分
错误信号
终止信号
数据过滤
过滤一个序列
条件过滤:filter
异步地进行判断:filterWhen
指定的类型对象:ofType
忽略所有元素:ignoreElements
去重:distinct
只要一部分元素
从第一个开始取n个:take
从第一个开始取一段时间内的:take
只要第一个元素:next
取最后一个元素:takeLast
直到满足某个条件:takeUntil包含,takeWhile不包含
按序号取:elementAt
跳过N个:skip
跳过一段时间:skip
跳过最后几个:skipLast
按条件跳过:skipUntil包含,skipWhile不包含
采样:
给定周期:sample
窥视元素
发了元素:doOnNext
序列完成:doOnComplete,doOnSucess
错误:doOnError
取消:doOnCancel
订阅时:doOnSubscribe
请求时:doOnRequest
结束:doFinally
拆分Flux
以个数为界:window
拆分为List:buffer
回到同步状态
拿到第一个元素前阻塞:blockFirst
拿到最后一个元素前阻塞:blockLast
同步转换为迭代器:toIterable
同步转换为流:toStream
异步处理
parallel(N):将流划分到多个线路并发处理
runOn(Schedulers.newParallel("filter", M),N):后续处理放到新的线路池中处理,并且定义M个线程,每次预取N个数据,其中M不能大于并发线路parallel(N),否则多于的无效
Sinks 水槽
支持两种模式:Sinks.Many,Sinks.One分别用于Flux与Mono
转换成流水线:asFlux,asMono
many().multicast():可以广播到多个订阅者
many().unicast():通知到单个订阅者
many().replay():支持重播
发射数据:
同步发射一个数据:replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);
异步发射一个数据:EmitResult result=replaySink.tryEmitNext(4);
发射一个结束符:sinks.emitComplete();
发射状态:
OK:成功
FAIL_TERMINATED:因为sink已经完成而失败
FAIL_OVERFLOW:超过缓冲容量而失败
FAIL_CANCELLED:因被取消而失败
FAIL_NON_SERIALIZED:没有被序列化访问而失败,不能两个线程同时发射
FAIL_ZERO_SUBSCRIBER:没有任何订阅者而失败
案例介绍
package org.example;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
public class Main {
public static void main(String[] args) throws IOException {
ExcelFilter excelFilter = new ExcelFilter();
Converter converter = new Converter();
CsvWriter csvWriter = new CsvWriter();
csvWriter.initCsvWriter();
Sinks.Many<ExcelData> sinks = Sinks.many().multicast().onBackpressureBuffer(1000, true);
sinks.asFlux()
.parallel(10000)
.runOn(Schedulers.newParallel("filter", 10000))
.filter(excelData -> excelFilter.test(excelData))
.runOn(Schedulers.newParallel("map", 10000))
.map(excelData -> converter.convert(excelData))
.sequential()
.doOnNext(csvData -> {
csvWriter.writeToCsv(csvData);
})
.doOnComplete(() -> {
System.out.println(Thread.currentThread().getName() + " write success");
csvWriter.close();
})
.subscribe();
// 读excel数据,并写入sinks
ExcelReader excelReader = new ExcelReader();
excelReader.startExcelReader("processed.xlsx", sinks);
System.in.read();
}
}
这是一个导入数据的处理案例,从processed.xlsx文件中读取数据,然后进行过滤、转换后写入新的csv文件的过程。
package org.example;
import com.alibaba.excel.EasyExcel;
import reactor.core.publisher.Sinks;
public class ExcelReader {
public Thread startExcelReader(String excelPath, Sinks.Many<ExcelData> sink) {
Thread thread = new Thread(() -> {
try {
EasyExcel.read(excelPath, ExcelData.class, new ReactiveReadListener(sink))
.autoCloseStream(true)
.sheet()
.doRead();
} catch (Exception e) {
sink.tryEmitError(new RuntimeException("Excel read failed", e));
}
});
thread.setName("reader");
EasyExcel.read(excelPath, ExcelData.class, new ReactiveReadListener(sink))
.autoCloseStream(true)
.sheet()
.doRead();
thread.start();
return thread;
}
}
package org.example;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import reactor.core.publisher.Sinks;
public class ReactiveReadListener implements ReadListener<ExcelData> {
long i = 0;
private final Sinks.Many<ExcelData> sink;
public ReactiveReadListener(Sinks.Many<ExcelData> sink) {
this.sink = sink;
}
@Override
public void invoke(ExcelData excelData, AnalysisContext analysisContext) {
i++;
if (i % 10000 == 0) {
System.out.println(Thread.currentThread().getName() + " read " + i + " records");
}
Sinks.EmitResult emitResult = sink.tryEmitNext(excelData);
while (emitResult.isFailure()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " emit failed, retry after 1s" + i);
emitResult = sink.tryEmitNext(excelData);
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
sink.tryEmitComplete();
}
}
package org.example;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
public class CsvWriter {
public void initCsvWriter() throws IOException {
csvWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream("csv.csv"), StandardCharsets.UTF_8),
1024 * 1024 * 4
);
}
private BufferedWriter csvWriter;
private final AtomicInteger atomicInteger = new AtomicInteger(0);
public synchronized void writeToCsv(CsvData data) {
int count = atomicInteger.getAndIncrement();
try {
csvWriter.write(data.toCsvString());
// 每1000条刷新一次(平衡性能和安全性)
if (count % 10000 == 0) {
csvWriter.flush();
System.out.println(Thread.currentThread().getName() + " wrote " + count + " ");
}
} catch (
IOException e) {
throw new RuntimeException("CSV write failed", e);
}
}
public void close() {
System.out.println(Thread.currentThread().getName() + " close");
try {
csvWriter.flush();
csvWriter.close();
} catch (IOException e) {
throw new RuntimeException("CSV close failed", e);
}
}
}