背景
在生产开发过程中,经常会遇到一些处理大量消息的情况,如果使用分布式队列,会增加系统复杂性,也会严重影响系统执行效率。如果使用内存队列,一但消费者跟不上生产者的脚步就会导致内存爆增,甚至出现oom的问题。
如果有一种队列能支持将消息持久化到文件中,不占用堆内存就好了,于是在一顿搜索下找到到名为ChronicleQueue的队列。经测试效率确实很高,写入1000万条数据尽然在1秒内完成,但是被消费后的消息不会自动清理将导致文件占用很大。很快就把服务器硬盘消耗殆尽,要自动清理文件需要使用其收费版本,于时但萌生了一个自己造轮子的想法。
需求
先对这个文件队列启了一个非常直接的名字,就叫file-queue。我们期望他具有以下特性:
支持多线程生产或消费
一个文件队列只有一个实例(不能创建多个实例,避免实体间一致性问题)
一个队列可以有多个消费者
读写效率高,占用内存与磁盘空间少
支持多种不同的文件清理策略
支持可扩展的对象序列化
设计
为了支持一个文件队列只能创建一个实例(不管是不是同一个进程内),我们需要在创建实例的时候添加一个文件锁。内存锁不能解决不同进程的创建
要求读写效率高,我们将采用nio的文件映射对象来操作文件
要求占用内存少,我们将采用流式处理来写文件
抽象化文件清理策略和序列化工具
对文件结构进行精心设计:
索引文件:[4:segmentSize][8:writePosition][8:writeOffset][8:readPosition1][8:readPosition2][...]。segmentSize表示每个数据文件存放消息的条数,writePosition表示当前队列的写入位置,writeOffset表示写入点的文件位置,readPosition1-readPosition16存放了16个消费者当前消费的位置。
数据文件:[头信息1:[索引:8][写入位置:8][写入长度:4]]*segmentSize + [数据:[数据1][数据2]...[数据n]]。头信息包括当前数据的索引数、在当前文件的偏移值以及数据的长度。 根据segmentSize来制定头信息的长度。头信息后直接跟上数据内容。
开发
定义文件队列接口FileQueue
package com.zhaofujun.filequeue;
import java.io.Closeable;
import java.io.IOException;
public interface FileQueue<T> extends Closeable {
void pushWithSerialize(T t) throws IOException;
void push(T t) throws IOException;
T pop(int no) throws IOException;
void clear() throws IOException;
long size() throws IOException;
long unReadMessages(int no) throws IOException;
T get(long position) throws IOException;
T getTop(int no) throws IOException;
void deleteSegmentFile() ;
}
抽象清理段文件的策略DeleteSegmentStrategy
package com.zhaofujun.filequeue;
/**
* 删除分段文件的策略
*/
public abstract class DeleteSegmentStrategy {
protected FileQueue<?> fileQueue;
public void setFileQueue(FileQueue<?> fileQueue) {
this.fileQueue = fileQueue;
}
public abstract void start();
public abstract void stop();
protected void execute() {
System.out.println("start delete segment file:");
fileQueue.deleteSegmentFile();
}
}
定义序列化工具接口Serializer
package com.zhaofujun.filequeue;
public interface Serializer<T> {
byte[] serialize(T t);
T deserialize(byte[] bytes);
}
定义消息在段文件的信息SegmentInfo
package com.zhaofujun.filequeue;
import java.io.File;
public class SegmentInfo {
/**
* 在总队列中的位置
*/
private final long position;
/**
* 在段文件中的索引
*/
private final int index;
/**
* 段文件编号
*/
private final long segmentNumber;
/**
* 段文件
*/
private final File segmentFile;
public SegmentInfo(long position, int index, long segmentNumber, File segmentFile) {
this.position = position;
this.index = index;
this.segmentNumber = segmentNumber;
this.segmentFile = segmentFile;
}
public long getPosition() {
return position;
}
public int getIndex() {
return index;
}
public long getSegmentNumber() {
return segmentNumber;
}
public File getSegmentFile() {
return segmentFile;
}
@Override
public String toString() {
return "SegmentInfo{" +
"position=" + position +
", index=" + index +
", segmentNumber=" + segmentNumber +
", segmentFile=" + segmentFile +
'}';
}
}
定义消息头信息HeadInfo
package com.zhaofujun.filequeue;
import java.nio.ByteBuffer;
public class HeadInfo {
private final long position;
/**
* 偏移量
*/
private final long bodyOffset;
/**
* 消息大小
*/
private final int bodySize;
public long getBodyOffset() {
return bodyOffset;
}
public int getBodySize() {
return bodySize;
}
public long getPosition() {
return position;
}
public HeadInfo(long position, long bodyOffset, int bodySize) {
this.position = position;
this.bodyOffset = bodyOffset;
this.bodySize = bodySize;
}
public static final int SIZE = Long.BYTES + Long.BYTES + Integer.BYTES;
public byte[] toBytes() {
byte[] bytes = new byte[SIZE];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
byteBuffer.putLong(position);
byteBuffer.putLong(bodyOffset);
byteBuffer.putInt(bodySize);
return bytes;
}
public static HeadInfo fromBytes(byte[] bytes) {
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
long position = byteBuffer.getLong();
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
return new HeadInfo(position, offset, size);
}
@Override
public String toString() {
return "HeadInfo{" +
"position=" + position +
", bodyOffset=" + bodyOffset +
", bodySize=" + bodySize +
'}';
}
}
消息的完整信息DataInfo
package com.zhaofujun.filequeue;
public class DataInfo {
private byte[] data;
private HeadInfo headInfo;
private SegmentInfo segmentInfo;
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public HeadInfo getHeadInfo() {
return headInfo;
}
public void setHeadInfo(HeadInfo headInfo) {
this.headInfo = headInfo;
}
public SegmentInfo getSegmentInfo() {
return segmentInfo;
}
public void setSegmentInfo(SegmentInfo segmentInfo) {
this.segmentInfo = segmentInfo;
}
@Override
public String toString() {
return "DataInfo{" +
"headInfo=" + headInfo +
", segmentInfo=" + segmentInfo +
'}';
}
}
写消息时需要记录的信息WriteNumRef,WriteInfo
package com.zhaofujun.filequeue;
public class WriteInfo {
private long num;
private long offset;
public WriteInfo(long num, long offset) {
this.num = num;
this.offset = offset;
}
public long getNum() {
return num;
}
public long getOffset() {
return offset;
}
public WriteInfo newWrite(long dataLength) {
// return new WriteInfo(num + 1, offset + dataLength);
return new WriteInfo(num+1, offset + dataLength);
}
@Override
public String toString() {
return "WriteInfo{" +
"num=" + num +
", offset=" + offset +
'}';
}
}
package com.zhaofujun.filequeue;
import java.util.concurrent.atomic.AtomicReference;
public class WriteNumRef {
private final AtomicReference<WriteInfo> writeNumAtomicReference = new AtomicReference<>();
public WriteNumRef(long num, long offset) {
this.writeNumAtomicReference.set(new WriteInfo(num, offset));
}
public WriteInfo getAndSet(int bytesLength, SegmentContext segmentContext) {
WriteInfo writeInfo = writeNumAtomicReference.get();
WriteInfo newWriteInfo;
SegmentInfo segmentInfo = segmentContext.calculateSegmentInfo(writeInfo.getNum() + 1);
if (segmentInfo.getIndex() == 0) {
newWriteInfo = new WriteInfo(segmentInfo.getPosition(), segmentContext.calculateFirstOffset());
} else {
newWriteInfo = new WriteInfo(segmentInfo.getPosition(), writeInfo.getOffset() + bytesLength);
}
if (writeNumAtomicReference.compareAndSet(writeInfo, newWriteInfo)) {
return writeInfo;
}
return getAndSet(bytesLength, segmentContext);
}
public WriteInfo getNumRef() {
return writeNumAtomicReference.get();
}
public long getNum() {
return writeNumAtomicReference.get().getNum();
}
}
其中getAndSet这个方法非常重要,可以防止两个线程写入文件。
读消息时需要记录的信息ReadNums
package com.zhaofujun.filequeue;
import java.util.concurrent.atomic.AtomicLong;
public class ReadNums {
public int SIZE = 16;
private final AtomicLong[] nums = new AtomicLong[SIZE];
public ReadNums(long[] nums) {
for (int i = 0; i < SIZE; i++) {
this.nums[i] = new AtomicLong(nums[i]);
}
}
public long[] getNums() {
long[] positions = new long[SIZE];
for (int i = 0; i < SIZE; i++) {
positions[i] = this.nums[i].get();
}
return positions;
}
public boolean compareAndSet(int no, long expect) {
return nums[no].compareAndSet(expect, expect + 1);
}
public long get(int no) {
return nums[no].get();
}
}
基础文件映射器BaseFileMapper
package com.zhaofujun.filequeue;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public abstract class BaseFileMapper {
protected final String baseDir;
protected int segmentSize;
protected final Method unMapMethod;
public BaseFileMapper(String baseDir, int segmentSize) throws IOException {
this.baseDir = baseDir;
this.segmentSize = segmentSize;
Files.createDirectories(Paths.get(baseDir));
try {
Class<?> fileChannelImplClass = Class.forName("sun.nio.ch.FileChannelImpl");
this.unMapMethod = fileChannelImplClass.getDeclaredMethod("unmap", MappedByteBuffer.class);
this.unMapMethod.setAccessible(true);
} catch (Exception e) {
throw new RuntimeException("初始化 unmap 方法失败", e);
}
}
protected void release(MappedByteBuffer mappedByteBuffer) {
if (mappedByteBuffer != null) {
try {
this.unMapMethod.invoke(null, mappedByteBuffer);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
}
// 计算片段文件
protected SegmentInfo calculateSegmentInfo(long position) {
long segNumber = position / segmentSize;
int index = (int) (position % segmentSize);
Path segmentFile = getSegmentFile(segNumber);
return new SegmentInfo(position, index, segNumber, segmentFile.toFile());
}
protected Path getSegmentFile(long segNumber) {
return Paths.get(baseDir, "segment_" + segNumber + ".txt");
}
public abstract void release() throws InvocationTargetException, IllegalAccessException;
public int getSegmentSize() {
return segmentSize;
}
public long calculateFirstOffset() {
return (long) HeadInfo.SIZE * segmentSize;
}
}
状态文件映射器StatusFileMapper
package com.zhaofujun.filequeue;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
public class StatusFileMapper extends BaseFileMapper {
private final WriteNumRef writeNumRef;
private final ReadNums readNums;
private final MappedByteBuffer statusMappedByteBuffer;
public int SIZE = Integer.BYTES + Long.BYTES + Long.BYTES + Long.BYTES * 16;
private MappedByteBuffer getStatusMappedByteBuffer(String baseDir) throws IOException {
try (RandomAccessFile randomAccessFile = new RandomAccessFile(getStatusMappedFile(baseDir), "rw");
FileChannel channel = randomAccessFile.getChannel()) {
return channel.map(FileChannel.MapMode.READ_WRITE, 0, SIZE);
}
}
private File getStatusMappedFile(String baseDir) {
return Paths.get(baseDir, "status.rw").toFile();
}
public StatusFileMapper(String baseDir, int segmentSize) throws IOException {
super(baseDir, segmentSize);
//系统预置16个readPosition
//位置信息存放的内容:[4:segmentSize][8:writePosition][8:writeOffset][8:readPosition1][8:readPosition2]//如果有多个readPosition,依次类推
if (getStatusMappedFile(baseDir).exists()) {
//文件存在,则读取文件中的WritePosition信息
this.statusMappedByteBuffer = getStatusMappedByteBuffer(baseDir);
//读取文件中的segmentSize信息,忽略传入的segmentSize
this.segmentSize = statusMappedByteBuffer.getInt(0);
} else {
//文件不存在
this.statusMappedByteBuffer = getStatusMappedByteBuffer(baseDir);
//写入segmentSize信息
statusMappedByteBuffer.putInt(0, segmentSize);
statusMappedByteBuffer.putLong(12, calculateFirstOffset());
}
//初始化写指针的信息
//从4开始读,读出两个Long,分别是segmentSize和writePosition
long writePosition = statusMappedByteBuffer.getLong(4);
long writeOffset = statusMappedByteBuffer.getLong(12);
//读取文件中的writePosition信息
this.writeNumRef = new WriteNumRef(writePosition, writeOffset);
//初始化readNums
//从20开始读,读16个readPosition
long[] readPositions = new long[16];
for (int i = 0; i < 16; i++) {
readPositions[i] = this.statusMappedByteBuffer.getLong(20 + i * Long.BYTES);
}
this.readNums = new ReadNums(readPositions);
}
public DataInfo preallocate(byte[] bytes, SegmentFileMapper segmentContext) {
DataInfo dataInfo = new DataInfo();
//包装数据
dataInfo.setData(bytes);
//包装数据信息的头信息
WriteInfo writeInfo = writeNumRef.getAndSet(bytes.length, segmentContext);
dataInfo.setHeadInfo(new HeadInfo(writeInfo.getNum(), writeInfo.getOffset(), dataInfo.getData().length));
//包装数据信息的片段信息
SegmentInfo segmentInfo = segmentContext.calculateSegmentInfo(writeInfo.getNum());
dataInfo.setSegmentInfo(segmentInfo);
return dataInfo;
}
public void release() {
release(statusMappedByteBuffer);
}
public void updateWritePosition() {
WriteInfo writeInfo = writeNumRef.getNumRef();
statusMappedByteBuffer.putLong(4, writeInfo.getNum());
statusMappedByteBuffer.putLong(12, writeInfo.getOffset());
}
public void updateReadPosition(int no) {
long readNum = readNums.get(no);
statusMappedByteBuffer.putLong(20 + no * Long.BYTES, readNum);
}
public WriteNumRef getWriteNumRef() {
return writeNumRef;
}
public ReadNums getReadNums() {
return readNums;
}
}
段文件映射器SegmentFileMapper
package com.zhaofujun.filequeue;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.InvocationTargetException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
public class SegmentFileMapper extends BaseFileMapper {
protected int elementSize;
private final RandomAccessFile lockRandomAccessFile;
private final FileChannel lockFileChannel;
private FileLock lock;
private Path getLockFilePath() {
return Paths.get(baseDir, ".lock");
}
public SegmentFileMapper(String baseDir, int segmentSize) throws IOException {
//数据文件格式[头信息1:[索引:8][写入位置:8][写入长度:4]]*segmentSize + [数据:[数据1][数据2]...[数据n]]
super(baseDir, segmentSize);
this.lockRandomAccessFile = new RandomAccessFile(getLockFilePath().toFile(), "rw");
this.lockFileChannel = this.lockRandomAccessFile.getChannel();
this.lock = lockFileChannel.lock();
}
private final Map<Long, MappedByteBuffer> bufferMap = new HashMap<>();
private MappedByteBuffer getSegmentBuffer(SegmentInfo segmentInfo, FileMode mode) {
MappedByteBuffer mappedByteBuffer = bufferMap.get(segmentInfo.getSegmentNumber());
if (mappedByteBuffer != null)
return mappedByteBuffer;
if (!segmentInfo.getSegmentFile().exists()) {
//检查文件不存在时,根据模式进行处理
if (mode == FileMode.WRITE) {
long size = calculateFirstOffset() + elementSize * segmentSize * 2L;
try (RandomAccessFile randomAccessFile = new RandomAccessFile(segmentInfo.getSegmentFile(), "rw");
FileChannel channel = randomAccessFile.getChannel()) {
mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
bufferMap.put(segmentInfo.getSegmentNumber(), mappedByteBuffer);
} catch (IOException e) {
throw new RuntimeException("create segment file failed in mode " + mode, e);
}
} else {
throw new RuntimeException("File not exist in mode " + mode + " " + segmentInfo.getSegmentFile());
}
} else {
//如果文件存在,则映射文件到内存
try (RandomAccessFile randomAccessFile = new RandomAccessFile(segmentInfo.getSegmentFile(), "rw");
FileChannel channel = randomAccessFile.getChannel()) {
mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, channel.size());
bufferMap.put(segmentInfo.getSegmentNumber(), mappedByteBuffer);
} catch (IOException e) {
throw new RuntimeException("map segment file failed in mode " + mode, e);
}
}
return mappedByteBuffer;
}
//扩容
private void expandSegmentBuffer(SegmentInfo segmentInfo) {
//根据当前文件的大小与写入位置的比例进行扩容
try (RandomAccessFile randomAccessFile = new RandomAccessFile(segmentInfo.getSegmentFile(), "rw");
FileChannel channel = randomAccessFile.getChannel()) {
long fileSize = (long) (segmentSize / segmentInfo.getIndex()) * elementSize * 2 + channel.size();
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
bufferMap.put(segmentInfo.getSegmentNumber(), mappedByteBuffer);
} catch (IOException e) {
throw new RuntimeException("expand segment file failed", e);
}
}
public long put(DataInfo info) {
SegmentInfo segmentInfo = info.getSegmentInfo();
// 从缓冲区中获取片段
this.elementSize = info.getHeadInfo().getBodySize();
MappedByteBuffer mappedByteBuffer = getSegmentBuffer(segmentInfo, FileMode.WRITE);
//检查当前缓冲区是否足够写入数据
if (info.getHeadInfo().getBodyOffset() + info.getHeadInfo().getBodySize() > mappedByteBuffer.capacity()) {
// 缓冲区不足,需要扩容文件
expandSegmentBuffer(segmentInfo);
mappedByteBuffer = getSegmentBuffer(segmentInfo, FileMode.WRITE);
}
// 写入头数据
int headerOffset = segmentInfo.getIndex() * HeadInfo.SIZE;
mappedByteBuffer.put(headerOffset, info.getHeadInfo().toBytes());
// 写入数据
mappedByteBuffer.put((int) (info.getHeadInfo().getBodyOffset()), info.getData());
return info.getHeadInfo().getPosition();
}
public byte[] get(SegmentInfo segmentInfo) {
MappedByteBuffer mappedByteBuffer = getSegmentBuffer(segmentInfo, FileMode.READ);
int headerOffset = segmentInfo.getIndex() * HeadInfo.SIZE;
//先读头数据
byte[] headBytes = new byte[HeadInfo.SIZE];
mappedByteBuffer.get(headerOffset, headBytes);
HeadInfo headInfo = HeadInfo.fromBytes(headBytes);
//读数据
byte[] dataBytes = new byte[headInfo.getBodySize()];
mappedByteBuffer.get((int) headInfo.getBodyOffset(), dataBytes);
return dataBytes;
}
public void release() throws InvocationTargetException, IllegalAccessException {
try {
this.lock.release();
this.lockFileChannel.close();
this.lockRandomAccessFile.close();
} catch (IOException e) {
//已经关闭
System.out.println("lock file already closed");
}
for (MappedByteBuffer mappedByteBuffer : bufferMap.values()) {
release(mappedByteBuffer);
}
bufferMap.clear();
//释放策略
}
public void deleteSegmentFile(long segmentNumber) throws IOException {
//释放segmentNumber对应的缓冲区
MappedByteBuffer mappedByteBuffer = bufferMap.get(segmentNumber);
if (mappedByteBuffer != null) {
release(mappedByteBuffer);
bufferMap.remove(segmentNumber);
}
Path segmentFile = getSegmentFile(segmentNumber);
if (Files.exists(segmentFile)) {
Files.delete(segmentFile);
}
}
}
文件队列实现FileQueueWithReactor
package com.zhaofujun.filequeue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
public class FileQueueWithReactor<T> implements FileQueue<T> {
Logger logger = LoggerFactory.getLogger(FileQueueWithReactor.class);
private final Sinks.Many<T> sinks = Sinks.many().unicast().onBackpressureBuffer();
private final StatusFileMapper globalContext;
private final Serializer<T> serializer;
private final SegmentFileMapper segmentContext;
private final DeleteSegmentStrategy deleteSegmentStrategy;
private final Runnable onClose;
FileQueueWithReactor(String baseDir, int segmentSize, DeleteSegmentStrategy deleteSegmentStrategy, Serializer<T> serializer, Runnable onClose) throws IOException {
this.segmentContext = new SegmentFileMapper(baseDir, segmentSize);
this.globalContext = new StatusFileMapper(baseDir, segmentSize);
this.deleteSegmentStrategy = deleteSegmentStrategy;
this.deleteSegmentStrategy.setFileQueue(this);
this.deleteSegmentStrategy.start();
this.serializer = serializer;
this.onClose = onClose;
this.sinks.asFlux()
//序列化数据
.map(p -> {
return serializer.serialize(p);
})
//配置headInfo和segmentInfo
.map(bytes -> {
return globalContext.preallocate(bytes, segmentContext);
})
//写入文件
.map(segmentContext::put)
//写入位置信息到头文件
.doOnNext(position -> {
globalContext.updateWritePosition();
})
.doOnError(throwable -> {
logger.error("write error", throwable);
})
.subscribe(da -> {
}, throwable -> {
logger.error("write error", throwable);
}, () -> {
logger.info("write complete");
});
}
@Override
public void pushWithSerialize(T t) throws IOException {
Schedulers.single().schedule(() -> {
Sinks.EmitResult emitResult = sinks.tryEmitNext(t);
if (emitResult != Sinks.EmitResult.OK) {
try {
Thread.sleep(100);
push(t);
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
}
}
);
}
@Override
public void push(T t) throws IOException {
Sinks.EmitResult emitResult = sinks.tryEmitNext(t);
if (emitResult != Sinks.EmitResult.OK) {
try {
Thread.sleep(100);
push(t);
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public T pop(int no) throws IOException {
// 从全局上下文获取当前读指针
long readNum = globalContext.getReadNums().get(no);
long writeNum = globalContext.getWriteNumRef().getNum();
if (readNum >= writeNum) {
return null;
}
// 更新读指针
if (globalContext.getReadNums().compareAndSet(no, readNum)) {
// 检查当前位置是否有效,有效则读取,无效则递归调用
T t = get(readNum);
//更新文件中的位置信息
globalContext.updateReadPosition(no);
return t;
} else {
return pop(no);
}
}
@Override
public void clear() throws IOException {
}
@Override
public long size() throws IOException {
return globalContext.getWriteNumRef().getNumRef().getNum() + 1;
}
@Override
public long unReadMessages(int no) throws IOException {
long writeNum = globalContext.getWriteNumRef().getNumRef().getNum();
long readNum = globalContext.getReadNums().get(no);
return writeNum - readNum;
}
@Override
public T get(long position) throws IOException {
SegmentInfo segmentInfo = segmentContext.calculateSegmentInfo(position);
byte[] bytes = segmentContext.get(segmentInfo);
return serializer.deserialize(bytes);
}
@Override
public T getTop(int no) throws IOException {
// 从全局上下文获取当前读指针
long readPosition = globalContext.getReadNums().get(no);
// 从文件中读取数据
T t = get(readPosition);
return t;
}
@Override
public void deleteSegmentFile() {
//根据读写指针,找到已经完成读取的片段文件,将其删除
long[] readPositions = globalContext.getReadNums().getNums();
// 找到除0以外的最小值
Arrays.stream(readPositions).filter(num -> num != 0).min().ifPresent(min -> {
// 删除所有小于min的片段文件
//读取最小位置的片段文件,将其删除
SegmentInfo segmentInfo = segmentContext.calculateSegmentInfo(min);
long readSegmentIndex = segmentInfo.getSegmentNumber();
for (long i = 0; i < readSegmentIndex; i++) {
try {
segmentContext.deleteSegmentFile(i);
} catch (IOException e) {
logger.warn("delete segment file failed:{}", e);
}
}
});
}
@Override
public void close() throws IOException {
//关闭删除策略
deleteSegmentStrategy.stop();
try {
segmentContext.release();
globalContext.release();
} catch (InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
deleteSegmentFile();
// 执行关闭回调
if (onClose != null) {
onClose.run();
}
}
}
内置的序列化工具与周期清除策略KryoSerializerV2,PeriodDeleteSegmentStrategy
package com.zhaofujun.filequeue;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
* Kryo序列化工具类(适用于Java 8)
* 注意:Kryo实例非线程安全,通过ThreadLocal确保线程隔离
*/
public class KryoSerializerV2<T> implements Serializer<T> {
private final Class<T> clazz;
// 线程局部变量:每个线程持有一个Kryo实例
private final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
// 配置:关闭默认的类注册验证(提升性能,Java 8中无需严格验证)
kryo.setRegistrationRequired(false);
kryo.register(clazz);
return kryo;
}
};
public KryoSerializerV2(Class<T> clazz) {
this.clazz = clazz;
}
/**
* 序列化:将对象转换为字节数组
*
* @param obj 待序列化的对象(不能为null)
* @return 序列化后的字节数组
*/
public byte[] serialize(Object obj) {
if (obj == null) {
throw new IllegalArgumentException("序列化对象不能为null");
}
// 使用Kryo的Output流写入字节
try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
Output output = new Output(byteOut)) {
// 获取当前线程的Kryo实例
Kryo kryo = KRYO_THREAD_LOCAL.get();
// 写入对象(自动处理类型信息)
kryo.writeObject(output, obj);
output.flush();
return byteOut.toByteArray();
} catch (Exception e) {
throw new RuntimeException("Kryo序列化失败", e);
}
}
/**
* 反序列化:将字节数组转换为对象
*
* @param data 序列化后的字节数组(不能为null或空)
* @return 反序列化后的对象
*/
@Override
public T deserialize(byte[] data) {
if (data == null || data.length == 0) {
throw new IllegalArgumentException("反序列化字节数组不能为null或空");
}
if (clazz == null) {
throw new IllegalArgumentException("目标类不能为null");
}
// 使用Kryo的Input流读取字节
try (ByteArrayInputStream byteIn = new ByteArrayInputStream(data);
Input input = new Input(byteIn)) {
// 获取当前线程的Kryo实例
Kryo kryo = KRYO_THREAD_LOCAL.get();
// 读取对象(指定目标类)
return kryo.readObject(input, clazz);
} catch (Exception e) {
throw new RuntimeException("Kryo反序列化失败", e);
}
}
// 测试用的POJO类
static class User {
private Long id;
private String name;
private int age;
private String email;
// 必须有默认构造函数(Kryo反序列化需要)
public User() {
}
public User(Long id, String name, int age, String email) {
this.id = id;
this.name = name;
this.age = age;
this.email = email;
}
@Override
public String toString() {
return "User{id=" + id + ", name='" + name + "', age=" + age + ", email='" + email + "'}";
}
// getter和setter(可选,Kryo可直接访问字段)
public Long getId() {
return id;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
public String getEmail() {
return email;
}
}
}
package com.zhaofujun.filequeue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 周期性删除分段文件的策略
*/
public class PeriodDeleteSegmentStrategy extends DeleteSegmentStrategy {
private final int period;
private final TimeUnit timeUnit;
public PeriodDeleteSegmentStrategy(int period, TimeUnit timeUnit) {
this.period = period;
this.timeUnit = timeUnit;
}
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Override
public void start() {
scheduler.scheduleAtFixedRate(this::execute, 0, period, timeUnit);
}
@Override
public void stop() {
scheduler.shutdown();
}
}
定义一个工厂方法来获得实例FileQueueFactory
package com.zhaofujun.filequeue;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public final class FileQueueFactory {
private static final Map<String, FileQueue<?>> fileQueueMap = new ConcurrentHashMap<>();
public static <U> FileQueue<U> createFileQueue(String basePath, int elementSize, Serializer<U> serializer, DeleteSegmentStrategy deleteSegmentStrategy) throws IOException {
FileQueue<U> fileQueue = new FileQueueWithReactor<>(basePath, elementSize, deleteSegmentStrategy, serializer, () -> removeFileQueue(basePath));
fileQueueMap.put(basePath, fileQueue);
return fileQueue;
}
public static <U> FileQueue<U> createFileQueue(String basePath, int elementSize, Serializer<U> serializer) throws IOException {
FileQueue<U> fileQueue = new FileQueueWithReactor<>(basePath, elementSize, new PeriodDeleteSegmentStrategy(1, TimeUnit.HOURS), serializer, () -> removeFileQueue(basePath));
fileQueueMap.put(basePath, fileQueue);
return fileQueue;
}
public static <U> FileQueue<U> load(String basePath) throws IOException {
FileQueue<U> fileQueue = (FileQueue<U>) fileQueueMap.get(basePath);
if (fileQueue == null) {
throw new IOException("file queue not found");
}
return fileQueue;
}
private static void removeFileQueue(String basePath) {
fileQueueMap.remove(basePath);
}
}