
File-Queue: Design and Development

Background

In production development we often need to process large volumes of messages. A distributed queue adds system complexity and can noticeably hurt throughput. An in-memory queue, on the other hand, blows up the heap as soon as consumers fall behind producers, and can even cause an OOM.

What I wanted was a queue that persists messages to files instead of holding them on the heap. Some searching turned up ChronicleQueue. In my tests it was indeed very fast, writing 10 million records in under a second, but consumed messages are not cleaned up automatically, so the files grow quickly and soon exhausted the server's disk. Automatic file cleanup is only available in the paid edition, which is what prompted me to build my own wheel.

Requirements

First, I gave this file queue a very straightforward name: file-queue. It should have the following properties:

  1. Supports multi-threaded producers and consumers

  2. Exactly one instance per file queue (multiple instances must not be created, to avoid consistency problems between instances)

  3. A single queue can serve multiple consumers

  4. High read/write throughput with low memory and disk usage

  5. Multiple, pluggable file-cleanup strategies

  6. Extensible object serialization

Design

  1. To guarantee that a file queue has only one instance (even across processes), a file lock is acquired when the instance is created; an in-process lock cannot prevent another process from creating one

  2. For high read/write throughput, files are accessed through NIO memory-mapped buffers

  3. To keep memory usage low, writes are processed as a stream

  4. The file-cleanup strategy and the serialization tool are abstracted behind interfaces

  5. The file layout is designed carefully (see the sketch after this list):

    1. Index (status) file: [4:segmentSize][8:writePosition][8:writeOffset][8:readPosition1][8:readPosition2][...]. segmentSize is the number of messages stored per data file, writePosition is the queue's current write position, writeOffset is the file offset of the write point, and readPosition1 through readPosition16 hold the current consumption positions of the 16 consumers.

    2. Data (segment) file: [header 1: [index:8][body offset:8][body length:4]] * segmentSize + [data: [data1][data2]...[dataN]]. Each header records the message's index, its offset within the current file, and its body length. The size of the header area is derived from segmentSize, and the message bodies follow immediately after it.
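
Before the code, here is the arithmetic this layout implies, as a small runnable sketch. The constants and numbers are illustrative; the real implementation derives them from HeadInfo.SIZE and the configured segmentSize.

public class LayoutMath {
    // Illustrative constant: header = [index:8][body offset:8][body length:4]
    static final int HEAD_SIZE = 8 + 8 + 4;

    public static void main(String[] args) {
        int segmentSize = 1_000_000;    // messages per segment file (configurable)
        long position = 2_500_123L;     // a global queue position

        long segmentNumber = position / segmentSize;             // which segment file -> 2
        int index = (int) (position % segmentSize);              // slot inside that file -> 500123
        long headerOffset = (long) index * HEAD_SIZE;            // where this message's header sits
        long firstBodyOffset = (long) HEAD_SIZE * segmentSize;   // message bodies start right after the header area

        System.out.printf("segment=%d index=%d headerOffset=%d bodiesStartAt=%d%n",
                segmentNumber, index, headerOffset, firstBodyOffset);
    }
}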

Implementation

Define the file queue interface FileQueue

package com.zhaofujun.filequeue;

import java.io.Closeable;
import java.io.IOException;

public interface FileQueue<T> extends Closeable {
    void pushWithSerialize(T t) throws IOException;

    void push(T t) throws IOException;

    T pop(int no) throws IOException;

    void clear() throws IOException;

    long size() throws IOException;

    long unReadMessages(int no) throws IOException;

    T get(long position) throws IOException;

    T getTop(int no) throws IOException;

    void deleteSegmentFile();
}

Abstract the segment-file cleanup strategy DeleteSegmentStrategy

package com.zhaofujun.filequeue;

/**
 * Strategy for deleting segment files
 */
public abstract class DeleteSegmentStrategy {
    protected FileQueue<?> fileQueue;

    public void setFileQueue(FileQueue<?> fileQueue) {
        this.fileQueue = fileQueue;
    }

    public abstract void start();
    public abstract void stop();

    protected void execute() {
        System.out.println("start delete segment file:");
        fileQueue.deleteSegmentFile();
    }

}
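
Any other cleanup policy only needs to implement start/stop and decide when to call execute(). As an illustration (this class is an assumption of mine, not part of the library), a strategy that cleans up only when explicitly asked could look like this:

package com.zhaofujun.filequeue;

/**
 * Illustrative sketch: a strategy that deletes finished segment files only when the caller asks.
 * The class name and behavior are assumptions, not part of the original library.
 */
public class ManualDeleteSegmentStrategy extends DeleteSegmentStrategy {

    @Override
    public void start() {
        // Nothing to schedule; cleanup happens only on demand
    }

    @Override
    public void stop() {
        // Nothing to shut down
    }

    // Invoke whenever old segment files should be removed
    public void cleanupNow() {
        execute();
    }
}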

Define the serializer interface Serializer

package com.zhaofujun.filequeue;

public interface Serializer<T> {
    byte[] serialize(T t);

    T deserialize(byte[] bytes);
}
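
For simple payloads a serializer is only a few lines. For example, a UTF-8 string serializer might look like this (illustrative only; the built-in KryoSerializerV2 appears later in the post):

package com.zhaofujun.filequeue;

import java.nio.charset.StandardCharsets;

/**
 * Illustrative sketch: a Serializer for plain strings using UTF-8.
 */
public class StringSerializer implements Serializer<String> {

    @Override
    public byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}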

Describe a message's location in a segment file: SegmentInfo

package com.zhaofujun.filequeue;

import java.io.File;


public class SegmentInfo {

    /**
     * Position in the overall queue
     */
    private final long position;
    /**
     * Index within the segment file
     */
    private final int index;
    /**
     * Segment file number
     */
    private final long segmentNumber;
    /**
     * Segment file
     */
    private final File segmentFile;

    public SegmentInfo(long position, int index, long segmentNumber, File segmentFile) {
        this.position = position;
        this.index = index;
        this.segmentNumber = segmentNumber;
        this.segmentFile = segmentFile;
    }

    public long getPosition() {
        return position;
    }

    public int getIndex() {
        return index;
    }

    public long getSegmentNumber() {
        return segmentNumber;
    }

    public File getSegmentFile() {
        return segmentFile;
    }


    @Override
    public String toString() {
        return "SegmentInfo{" +
                "position=" + position +
                ", index=" + index +
                ", segmentNumber=" + segmentNumber +
                ", segmentFile=" + segmentFile +
                '}';
    }
}

Define the message header HeadInfo

package com.zhaofujun.filequeue;

import java.nio.ByteBuffer;

public class HeadInfo {
    private final long position;
    /**
     * Offset of the message body within the segment file
     */
    private final long bodyOffset;
    /**
     * Size of the message body in bytes
     */
    private final int bodySize;

    public long getBodyOffset() {
        return bodyOffset;
    }

    public int getBodySize() {
        return bodySize;
    }

    public long getPosition() {
        return position;
    }

    public HeadInfo(long position, long bodyOffset, int bodySize) {
        this.position = position;
        this.bodyOffset = bodyOffset;
        this.bodySize = bodySize;
    }

    public static final int SIZE = Long.BYTES + Long.BYTES + Integer.BYTES;

    public byte[] toBytes() {
        byte[] bytes = new byte[SIZE];
        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
        byteBuffer.putLong(position);
        byteBuffer.putLong(bodyOffset);
        byteBuffer.putInt(bodySize);
        return bytes;
    }

    public static HeadInfo fromBytes(byte[] bytes) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
        long position = byteBuffer.getLong();
        long offset = byteBuffer.getLong();
        int size = byteBuffer.getInt();
        return new HeadInfo(position, offset, size);
    }
    @Override
    public String toString() {
        return "HeadInfo{" +
                "position=" + position +
                ", bodyOffset=" + bodyOffset +
                ", bodySize=" + bodySize +
                '}';
    }
}
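
A quick round trip through the fixed 20-byte header (8 bytes position + 8 bytes bodyOffset + 4 bytes bodySize); the values are made up for illustration:

package com.zhaofujun.filequeue;

public class HeadInfoDemo {
    public static void main(String[] args) {
        // Encode and decode one header record
        HeadInfo head = new HeadInfo(42L, 2_000_000L, 128);
        byte[] encoded = head.toBytes();               // encoded.length == HeadInfo.SIZE == 20
        HeadInfo decoded = HeadInfo.fromBytes(encoded);
        System.out.println(decoded);                   // HeadInfo{position=42, bodyOffset=2000000, bodySize=128}
    }
}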

The complete message record DataInfo

package com.zhaofujun.filequeue;


public class DataInfo {
    private byte[] data;
    private HeadInfo headInfo;
    private SegmentInfo segmentInfo;

    public byte[] getData() {
        return data;
    }

    public void setData(byte[] data) {
        this.data = data;
    }

    public HeadInfo getHeadInfo() {
        return headInfo;
    }

    public void setHeadInfo(HeadInfo headInfo) {
        this.headInfo = headInfo;
    }

    public SegmentInfo getSegmentInfo() {
        return segmentInfo;
    }

    public void setSegmentInfo(SegmentInfo segmentInfo) {
        this.segmentInfo = segmentInfo;
    }

    @Override
    public String toString() {
        return "DataInfo{" +
                "headInfo=" + headInfo +
                ", segmentInfo=" + segmentInfo +
                '}';
    }
}

Information recorded when writing messages: WriteInfo and WriteNumRef

package com.zhaofujun.filequeue;

public class WriteInfo {

    private  long num;
    private  long offset;

    public WriteInfo(long num, long offset) {
        this.num = num;
        this.offset = offset;
    }

    public long getNum() {
        return num;
    }

    public long getOffset() {
        return offset;
    }


    public WriteInfo newWrite(long dataLength) {
        return new WriteInfo(num + 1, offset + dataLength);
    }

    @Override
    public String toString() {
        return "WriteInfo{" +
                "num=" + num +
                ", offset=" + offset +
                '}';
    }
}

package com.zhaofujun.filequeue;

import java.util.concurrent.atomic.AtomicReference;

public class WriteNumRef {
    private final AtomicReference<WriteInfo> writeNumAtomicReference = new AtomicReference<>();


    public WriteNumRef(long num, long offset) {
        this.writeNumAtomicReference.set(new WriteInfo(num, offset));
    }

    public WriteInfo getAndSet(int bytesLength, SegmentContext segmentContext) {
        WriteInfo writeInfo = writeNumAtomicReference.get();
        WriteInfo newWriteInfo;

        SegmentInfo segmentInfo = segmentContext.calculateSegmentInfo(writeInfo.getNum() + 1);
        if (segmentInfo.getIndex() == 0) {
            newWriteInfo = new WriteInfo(segmentInfo.getPosition(), segmentContext.calculateFirstOffset());
        } else {
            newWriteInfo = new WriteInfo(segmentInfo.getPosition(), writeInfo.getOffset() + bytesLength);
        }
        if (writeNumAtomicReference.compareAndSet(writeInfo, newWriteInfo)) {
            return writeInfo;
        }
        return getAndSet(bytesLength, segmentContext);
    }

    public WriteInfo getNumRef() {
        return writeNumAtomicReference.get();
    }

    public long getNum() {
        return writeNumAtomicReference.get().getNum();
    }

}

The getAndSet method above is the key piece: it loops on compareAndSet, so no two threads are ever handed the same write slot (position and file offset) and therefore can never write over each other in a file. The SegmentContext type it accepts is not shown in this post; a sketch of what it presumably looks like follows.
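
Judging from the calls it makes, SegmentContext is presumably a thin abstraction over the segment arithmetic that BaseFileMapper (shown later) provides. A minimal sketch, assuming it is an interface that BaseFileMapper implements (the listing below omits the implements clause):

package com.zhaofujun.filequeue;

/**
 * Hypothetical sketch: this type is referenced by WriteNumRef but not shown in the post.
 * It only needs the two calculations getAndSet relies on, both of which BaseFileMapper provides.
 */
public interface SegmentContext {
    // Map a global queue position to its segment file, segment number and in-file index
    SegmentInfo calculateSegmentInfo(long position);

    // Offset of the first message body, i.e. the size of the header area
    long calculateFirstOffset();
}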

Information recorded when reading messages: ReadNums

package com.zhaofujun.filequeue;

import java.util.concurrent.atomic.AtomicLong;

public class ReadNums {

    public static final int SIZE = 16;
    private final AtomicLong[] nums = new AtomicLong[SIZE];

    public ReadNums(long[] nums) {
        for (int i = 0; i < SIZE; i++) {
            this.nums[i] = new AtomicLong(nums[i]);
        }
    }

    public long[] getNums() {
        long[] positions = new long[SIZE];
        for (int i = 0; i < SIZE; i++) {
            positions[i] = this.nums[i].get();
        }
        return positions;
    }

    public boolean compareAndSet(int no, long expect) {
        return nums[no].compareAndSet(expect, expect + 1);
    }

    public long get(int no) {
        return nums[no].get();
    }
}

The base file mapper BaseFileMapper

package com.zhaofujun.filequeue;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public abstract class BaseFileMapper {
    protected final String baseDir;
    protected int segmentSize;
    protected final Method unMapMethod;

    public BaseFileMapper(String baseDir, int segmentSize) throws IOException {
        this.baseDir = baseDir;
        this.segmentSize = segmentSize;

        Files.createDirectories(Paths.get(baseDir));

        try {
            Class<?> fileChannelImplClass = Class.forName("sun.nio.ch.FileChannelImpl");
            this.unMapMethod = fileChannelImplClass.getDeclaredMethod("unmap", MappedByteBuffer.class);
            this.unMapMethod.setAccessible(true);
        } catch (Exception e) {
            throw new RuntimeException("初始化 unmap 方法失败", e);
        }
    }


    protected void release(MappedByteBuffer mappedByteBuffer) {
        if (mappedByteBuffer != null) {
            try {
                this.unMapMethod.invoke(null, mappedByteBuffer);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new RuntimeException(e);
            }
        }
    }

    // Compute which segment file a queue position belongs to, and its index within that file
    protected SegmentInfo calculateSegmentInfo(long position) {
        long segNumber = position / segmentSize;
        int index = (int) (position % segmentSize);

        Path segmentFile = getSegmentFile(segNumber);
        return new SegmentInfo(position, index, segNumber, segmentFile.toFile());
    }

    protected Path getSegmentFile(long segNumber) {
        return Paths.get(baseDir, "segment_" + segNumber + ".txt");
    }

    public abstract void release() throws InvocationTargetException, IllegalAccessException;

    public int getSegmentSize() {
        return segmentSize;
    }

    public long calculateFirstOffset() {
        return (long) HeadInfo.SIZE * segmentSize;
    }
}
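
One caveat with BaseFileMapper: resolving sun.nio.ch.FileChannelImpl.unmap reflectively works on Java 8, but that private method is not available on newer JDKs. If the queue ever needs to run on Java 9+, a commonly used alternative is sun.misc.Unsafe.invokeCleaner; a sketch (not part of the original code, and depending on the JDK setup it may emit reflective-access warnings):

package com.zhaofujun.filequeue;

import java.lang.reflect.Field;
import java.nio.MappedByteBuffer;

/**
 * Hypothetical alternative to the Java 8 unmap trick, for Java 9+ (requires the jdk.unsupported module).
 */
final class UnmapHelper {
    static void unmap(MappedByteBuffer buffer) {
        if (buffer == null) return;
        try {
            Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
            Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            Object unsafe = theUnsafe.get(null);
            // invokeCleaner(ByteBuffer) has been available since Java 9
            unsafeClass.getMethod("invokeCleaner", java.nio.ByteBuffer.class).invoke(unsafe, buffer);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("failed to unmap buffer", e);
        }
    }
}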

The status file mapper StatusFileMapper

package com.zhaofujun.filequeue;


import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;

public class StatusFileMapper extends BaseFileMapper {

    private final WriteNumRef writeNumRef;
    private final ReadNums readNums;


    private final MappedByteBuffer statusMappedByteBuffer;


    public int SIZE = Integer.BYTES + Long.BYTES + Long.BYTES + Long.BYTES * 16;

    private MappedByteBuffer getStatusMappedByteBuffer(String baseDir) throws IOException {
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(getStatusMappedFile(baseDir), "rw");
             FileChannel channel = randomAccessFile.getChannel()) {
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, SIZE);
        }
    }

    private File getStatusMappedFile(String baseDir) {
        return Paths.get(baseDir, "status.rw").toFile();
    }

    public StatusFileMapper(String baseDir, int segmentSize) throws IOException {
        super(baseDir, segmentSize);

        //The status file pre-allocates 16 readPositions.
        //Layout: [4:segmentSize][8:writePosition][8:writeOffset][8:readPosition1][8:readPosition2]... one slot per readPosition


        if (getStatusMappedFile(baseDir).exists()) {
            //The file exists: map it and read the stored status
            this.statusMappedByteBuffer = getStatusMappedByteBuffer(baseDir);

            //Use the segmentSize stored in the file and ignore the one passed in
            this.segmentSize = statusMappedByteBuffer.getInt(0);

        } else {
            //The file does not exist: create and map it
            this.statusMappedByteBuffer = getStatusMappedByteBuffer(baseDir);

            //Store the initial segmentSize and writeOffset
            statusMappedByteBuffer.putInt(0, segmentSize);
            statusMappedByteBuffer.putLong(12, calculateFirstOffset());

        }
        //Initialize the write pointer:
        //read writePosition (long at offset 4) and writeOffset (long at offset 12)
        long writePosition = statusMappedByteBuffer.getLong(4);
        long writeOffset = statusMappedByteBuffer.getLong(12);

        //Wrap the write pointer read from the file
        this.writeNumRef = new WriteNumRef(writePosition, writeOffset);


        //Initialize readNums:
        //the 16 readPositions start at offset 20
        long[] readPositions = new long[16];
        for (int i = 0; i < 16; i++) {
            readPositions[i] = this.statusMappedByteBuffer.getLong(20 + i * Long.BYTES);
        }
        this.readNums = new ReadNums(readPositions);


    }


    public DataInfo preallocate(byte[] bytes, SegmentFileMapper segmentContext) {
        DataInfo dataInfo = new DataInfo();

        //Wrap the payload
        dataInfo.setData(bytes);


        //Build the header: claim a write slot (position + offset) via the CAS in WriteNumRef
        WriteInfo writeInfo = writeNumRef.getAndSet(bytes.length, segmentContext);
        dataInfo.setHeadInfo(new HeadInfo(writeInfo.getNum(), writeInfo.getOffset(), dataInfo.getData().length));
        //Attach the segment info for that position
        SegmentInfo segmentInfo = segmentContext.calculateSegmentInfo(writeInfo.getNum());
        dataInfo.setSegmentInfo(segmentInfo);


        return dataInfo;
    }

    public void release() {
        release(statusMappedByteBuffer);
    }

    public void updateWritePosition() {
        WriteInfo writeInfo = writeNumRef.getNumRef();
        statusMappedByteBuffer.putLong(4, writeInfo.getNum());
        statusMappedByteBuffer.putLong(12, writeInfo.getOffset());
    }

    public void updateReadPosition(int no) {
        long readNum = readNums.get(no);
        statusMappedByteBuffer.putLong(20 + no * Long.BYTES, readNum);
    }


    public WriteNumRef getWriteNumRef() {
        return writeNumRef;
    }

    public ReadNums getReadNums() {
        return readNums;
    }
}

The segment file mapper SegmentFileMapper

package com.zhaofujun.filequeue;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.InvocationTargetException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class SegmentFileMapper extends BaseFileMapper {

    protected int elementSize;

    private final RandomAccessFile lockRandomAccessFile;
    private final FileChannel lockFileChannel;
    private FileLock lock;

    private Path getLockFilePath() {
        return Paths.get(baseDir, ".lock");
    }


    public SegmentFileMapper(String baseDir, int segmentSize) throws IOException {
        //Data file layout: [header 1: [index:8][body offset:8][body length:4]] * segmentSize + [data: [data1][data2]...[dataN]]

        super(baseDir, segmentSize);

        this.lockRandomAccessFile = new RandomAccessFile(getLockFilePath().toFile(), "rw");
        this.lockFileChannel = this.lockRandomAccessFile.getChannel();
        this.lock = lockFileChannel.lock();

    }


    private final Map<Long, MappedByteBuffer> bufferMap = new HashMap<>();



    private MappedByteBuffer getSegmentBuffer(SegmentInfo segmentInfo, FileMode mode) {
        MappedByteBuffer mappedByteBuffer = bufferMap.get(segmentInfo.getSegmentNumber());
        if (mappedByteBuffer != null)
            return mappedByteBuffer;

        if (!segmentInfo.getSegmentFile().exists()) {
            //The file does not exist: handle according to the access mode
            if (mode == FileMode.WRITE) {
                long size = calculateFirstOffset() + elementSize * segmentSize * 2L;
                try (RandomAccessFile randomAccessFile = new RandomAccessFile(segmentInfo.getSegmentFile(), "rw");
                     FileChannel channel = randomAccessFile.getChannel()) {
                    mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
                    bufferMap.put(segmentInfo.getSegmentNumber(), mappedByteBuffer);
                } catch (IOException e) {
                    throw new RuntimeException("create segment file failed in mode " + mode, e);
                }
            } else {
                throw new RuntimeException("File not exist in mode " + mode + " " + segmentInfo.getSegmentFile());
            }
        } else {
            //The file exists: map it into memory
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(segmentInfo.getSegmentFile(), "rw");
                 FileChannel channel = randomAccessFile.getChannel()) {
                mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, channel.size());
                bufferMap.put(segmentInfo.getSegmentNumber(), mappedByteBuffer);
            } catch (IOException e) {
                throw new RuntimeException("map segment file failed in mode " + mode, e);
            }
        }
        return mappedByteBuffer;
    }

    //Grow the mapped segment file
    private void expandSegmentBuffer(SegmentInfo segmentInfo) {
        //Estimate the extra room needed from the ratio of segmentSize to the current write index, on top of the current size
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(segmentInfo.getSegmentFile(), "rw");
             FileChannel channel = randomAccessFile.getChannel()) {
            long fileSize = (long) (segmentSize / segmentInfo.getIndex()) * elementSize * 2 + channel.size();
            MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            bufferMap.put(segmentInfo.getSegmentNumber(), mappedByteBuffer);
        } catch (IOException e) {
            throw new RuntimeException("expand segment file failed", e);
        }


    }

    public long put(DataInfo info) {

        SegmentInfo segmentInfo = info.getSegmentInfo();

        // Record the size of this body; it is used when sizing new or expanded segment files
        this.elementSize = info.getHeadInfo().getBodySize();

        MappedByteBuffer mappedByteBuffer = getSegmentBuffer(segmentInfo, FileMode.WRITE);

        //Check whether the current buffer is big enough for this write
        if (info.getHeadInfo().getBodyOffset() + info.getHeadInfo().getBodySize() > mappedByteBuffer.capacity()) {
            // Not enough room: grow the file
            expandSegmentBuffer(segmentInfo);
            mappedByteBuffer = getSegmentBuffer(segmentInfo, FileMode.WRITE);
        }

        // Write the header
        int headerOffset = segmentInfo.getIndex() * HeadInfo.SIZE;
        mappedByteBuffer.put(headerOffset, info.getHeadInfo().toBytes());
        // Write the body
        mappedByteBuffer.put((int) (info.getHeadInfo().getBodyOffset()), info.getData());


        return info.getHeadInfo().getPosition();
    }

    public byte[] get(SegmentInfo segmentInfo) {
        MappedByteBuffer mappedByteBuffer = getSegmentBuffer(segmentInfo, FileMode.READ);

        int headerOffset = segmentInfo.getIndex() * HeadInfo.SIZE;

        //Read the header first
        byte[] headBytes = new byte[HeadInfo.SIZE];
        mappedByteBuffer.get(headerOffset, headBytes);
        HeadInfo headInfo = HeadInfo.fromBytes(headBytes);

        //Then read the body
        byte[] dataBytes = new byte[headInfo.getBodySize()];
        mappedByteBuffer.get((int) headInfo.getBodyOffset(), dataBytes);
        return dataBytes;
    }


    public void release() throws InvocationTargetException, IllegalAccessException {


        try {
            this.lock.release();
            this.lockFileChannel.close();
            this.lockRandomAccessFile.close();
        } catch (IOException e) {
            //Already closed
            System.out.println("lock file already closed");
        }


        for (MappedByteBuffer mappedByteBuffer : bufferMap.values()) {
            release(mappedByteBuffer);
        }
        bufferMap.clear();

        //(the cleanup strategy itself is stopped separately, in FileQueueWithReactor.close())

    }

    public void deleteSegmentFile(long segmentNumber) throws IOException {

        //Release the buffer mapped for this segment number
        MappedByteBuffer mappedByteBuffer = bufferMap.get(segmentNumber);
        if (mappedByteBuffer != null) {
            release(mappedByteBuffer);
            bufferMap.remove(segmentNumber);
        }

        Path segmentFile = getSegmentFile(segmentNumber);
        if (Files.exists(segmentFile)) {
            Files.delete(segmentFile);
        }
    }


}
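
The FileMode enum used by getSegmentBuffer is likewise not included in the post. It is presumably a two-value enum that tells the mapper whether a missing segment file should be created (WRITE) or treated as an error (READ); a minimal sketch:

package com.zhaofujun.filequeue;

/**
 * Hypothetical sketch: not shown in the post; getSegmentBuffer only uses it to decide
 * how to handle a segment file that does not exist yet.
 */
public enum FileMode {
    READ,
    WRITE
}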

The file queue implementation FileQueueWithReactor

package com.zhaofujun.filequeue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;


public class FileQueueWithReactor<T> implements FileQueue<T> {

    Logger logger = LoggerFactory.getLogger(FileQueueWithReactor.class);

    private final Sinks.Many<T> sinks = Sinks.many().unicast().onBackpressureBuffer();
    private final StatusFileMapper globalContext;
    private final Serializer<T> serializer;
    private final SegmentFileMapper segmentContext;
    private final DeleteSegmentStrategy deleteSegmentStrategy;
    private final Runnable onClose;


    FileQueueWithReactor(String baseDir, int segmentSize, DeleteSegmentStrategy deleteSegmentStrategy, Serializer<T> serializer, Runnable onClose) throws IOException {

        this.segmentContext = new SegmentFileMapper(baseDir, segmentSize);
        this.globalContext = new StatusFileMapper(baseDir, segmentSize);
        this.deleteSegmentStrategy = deleteSegmentStrategy;
        this.deleteSegmentStrategy.setFileQueue(this);
        this.deleteSegmentStrategy.start();

        this.serializer = serializer;
        this.onClose = onClose;


        this.sinks.asFlux()

                //Serialize the payload
                .map(p -> {
                    return serializer.serialize(p);
                })
                //Attach the headInfo and segmentInfo
                .map(bytes -> {
                    return globalContext.preallocate(bytes, segmentContext);
                })
                //Write to the segment file
                .map(segmentContext::put)
                //Persist the write position to the status file
                .doOnNext(position -> {
                    globalContext.updateWritePosition();
                })
                .doOnError(throwable -> {
                    logger.error("write error", throwable);
                })
                .subscribe(da -> {
                }, throwable -> {
                    logger.error("write error", throwable);
                }, () -> {
                    logger.info("write complete");
                });

    }

    @Override
    public void pushWithSerialize(T t) throws IOException {

        Schedulers.single().schedule(() -> {
                    Sinks.EmitResult emitResult = sinks.tryEmitNext(t);

                    if (emitResult != Sinks.EmitResult.OK) {
                        try {
                            Thread.sleep(100);
                            push(t);
                        } catch (InterruptedException | IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
        );


    }

    @Override
    public void push(T t) throws IOException {
        Sinks.EmitResult emitResult = sinks.tryEmitNext(t);

        if (emitResult != Sinks.EmitResult.OK) {
            try {
                Thread.sleep(100);
                push(t);
            } catch (InterruptedException | IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public T pop(int no) throws IOException {
        // Read the current read pointer and write pointer from the status mapper
        long readNum = globalContext.getReadNums().get(no);
        long writeNum = globalContext.getWriteNumRef().getNum();
        if (readNum >= writeNum) {
            return null;
        }

        // Try to claim this position by advancing the read pointer with a CAS
        if (globalContext.getReadNums().compareAndSet(no, readNum)) {
            // The CAS succeeded, so this thread owns the position: read it

            T t = get(readNum);
            //Persist the new read position to the status file
            globalContext.updateReadPosition(no);
            return t;
        } else {
            return pop(no);
        }
    }

    @Override
    public void clear() throws IOException {

    }

    @Override
    public long size() throws IOException {
        return globalContext.getWriteNumRef().getNumRef().getNum() + 1;
    }

    @Override
    public long unReadMessages(int no) throws IOException {
        long writeNum = globalContext.getWriteNumRef().getNumRef().getNum();
        long readNum = globalContext.getReadNums().get(no);
        return writeNum - readNum;
    }

    @Override
    public T get(long position) throws IOException {
        SegmentInfo segmentInfo = segmentContext.calculateSegmentInfo(position);
        byte[] bytes = segmentContext.get(segmentInfo);
        return serializer.deserialize(bytes);
    }

    @Override
    public T getTop(int no) throws IOException {
        // Read the current read pointer for this consumer
        long readPosition = globalContext.getReadNums().get(no);
        // Read the message at that position
        T t = get(readPosition);

        return t;
    }

    @Override
    public void deleteSegmentFile() {
        //Use the read pointers to find segment files that every consumer has finished with, and delete them
        long[] readPositions = globalContext.getReadNums().getNums();
        // Find the smallest non-zero read position
        Arrays.stream(readPositions).filter(num -> num != 0).min().ifPresent(min -> {
            // Delete every segment file that comes before the one containing that position

            SegmentInfo segmentInfo = segmentContext.calculateSegmentInfo(min);
            long readSegmentIndex = segmentInfo.getSegmentNumber();
            for (long i = 0; i < readSegmentIndex; i++) {
                try {
                    segmentContext.deleteSegmentFile(i);
                } catch (IOException e) {
                    logger.warn("delete segment file failed", e);
                }
            }
        });


    }

    @Override
    public void close() throws IOException {

        //Stop the cleanup strategy
        deleteSegmentStrategy.stop();

        try {
            segmentContext.release();
            globalContext.release();
        } catch (InvocationTargetException | IllegalAccessException e) {
            throw new RuntimeException(e);
        }

        deleteSegmentFile();
        // Run the close callback
        if (onClose != null) {
            onClose.run();
        }
    }
}
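
Because pop(no) returns null once consumer no has caught up with the writer, a consumer is typically a small polling loop. An illustrative sketch (class name and back-off interval are mine, not part of the library):

package com.zhaofujun.filequeue;

import java.io.IOException;

/**
 * Illustrative sketch of a consumer thread.
 */
public class PollingConsumer<T> implements Runnable {
    private final FileQueue<T> queue;
    private final int consumerNo;   // one of the 16 reserved read positions

    public PollingConsumer(FileQueue<T> queue, int consumerNo) {
        this.queue = queue;
        this.consumerNo = consumerNo;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                T message = queue.pop(consumerNo);
                if (message == null) {
                    Thread.sleep(10);   // nothing to read yet, back off briefly
                    continue;
                }
                System.out.println("consumed: " + message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}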

Built-in serializer and periodic cleanup strategy: KryoSerializerV2 and PeriodDeleteSegmentStrategy

package com.zhaofujun.filequeue;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
 * Kryo-based serializer (written for Java 8)
 * Note: Kryo instances are not thread-safe, so a ThreadLocal keeps one instance per thread
 */
public class KryoSerializerV2<T> implements Serializer<T> {

    private final Class<T> clazz;

    // Thread-local: each thread holds its own Kryo instance
    private final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            // Do not require explicit class registration; the target class is still registered below
            kryo.setRegistrationRequired(false);
            kryo.register(clazz);
            return kryo;
        }
    };

    public KryoSerializerV2(Class<T> clazz) {
        this.clazz = clazz;
    }

    /**
     * Serialize: convert an object into a byte array
     *
     * @param obj the object to serialize (must not be null)
     * @return the serialized bytes
     */
    @Override
    public byte[] serialize(T obj) {
        if (obj == null) {
            throw new IllegalArgumentException("object to serialize must not be null");
        }

        // Write the bytes through Kryo's Output stream
        try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
             Output output = new Output(byteOut)) {
            // Use the current thread's Kryo instance
            Kryo kryo = KRYO_THREAD_LOCAL.get();
            // Write the object (the class is supplied again when deserializing)
            kryo.writeObject(output, obj);
            output.flush();
            return byteOut.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException("Kryo serialization failed", e);
        }
    }

    /**
     * Deserialize: convert a byte array back into an object
     *
     * @param data the serialized bytes (must not be null or empty)
     * @return the deserialized object
     */
    @Override
    public T deserialize(byte[] data) {
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("bytes to deserialize must not be null or empty");
        }
        if (clazz == null) {
            throw new IllegalArgumentException("target class must not be null");
        }

        // Read the bytes through Kryo's Input stream
        try (ByteArrayInputStream byteIn = new ByteArrayInputStream(data);
             Input input = new Input(byteIn)) {
            // Use the current thread's Kryo instance
            Kryo kryo = KRYO_THREAD_LOCAL.get();
            // Read the object as the target class
            return kryo.readObject(input, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Kryo deserialization failed", e);
        }
    }


    // POJO class used for testing
    static class User {
        private Long id;
        private String name;
        private int age;
        private String email;

        // A no-arg constructor is required for Kryo deserialization
        public User() {
        }

        public User(Long id, String name, int age, String email) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.email = email;
        }

        @Override
        public String toString() {
            return "User{id=" + id + ", name='" + name + "', age=" + age + ", email='" + email + "'}";
        }

        // Getters (optional; Kryo can access fields directly)
        public Long getId() {
            return id;
        }

        public String getName() {
            return name;
        }

        public int getAge() {
            return age;
        }

        public String getEmail() {
            return email;
        }
    }
}

package com.zhaofujun.filequeue;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Strategy that deletes segment files on a fixed schedule
 */
public class PeriodDeleteSegmentStrategy extends DeleteSegmentStrategy {
    private final int period;
    private final TimeUnit timeUnit;

    public PeriodDeleteSegmentStrategy(int period, TimeUnit timeUnit) {
        this.period = period;
        this.timeUnit = timeUnit;
    }

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    @Override
    public void start() {
        scheduler.scheduleAtFixedRate(this::execute, 0, period, timeUnit);
    }

    @Override
    public void stop() {
        scheduler.shutdown();
    }


}

Define a factory for obtaining instances: FileQueueFactory

package com.zhaofujun.filequeue;


import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public final class FileQueueFactory {
    private static final Map<String, FileQueue<?>> fileQueueMap = new ConcurrentHashMap<>();

    public static <U> FileQueue<U> createFileQueue(String basePath, int segmentSize, Serializer<U> serializer, DeleteSegmentStrategy deleteSegmentStrategy) throws IOException {
        FileQueue<U> fileQueue = new FileQueueWithReactor<>(basePath, segmentSize, deleteSegmentStrategy, serializer, () -> removeFileQueue(basePath));
        fileQueueMap.put(basePath, fileQueue);
        return fileQueue;
    }

    public static <U> FileQueue<U> createFileQueue(String basePath, int segmentSize, Serializer<U> serializer) throws IOException {
        FileQueue<U> fileQueue = new FileQueueWithReactor<>(basePath, segmentSize, new PeriodDeleteSegmentStrategy(1, TimeUnit.HOURS), serializer, () -> removeFileQueue(basePath));
        fileQueueMap.put(basePath, fileQueue);
        return fileQueue;
    }

    public static <U> FileQueue<U> load(String basePath) throws IOException {
        FileQueue<U> fileQueue = (FileQueue<U>) fileQueueMap.get(basePath);
        if (fileQueue == null) {
            throw new IOException("file queue not found");
        }
        return fileQueue;
    }

    private static void removeFileQueue(String basePath) {
        fileQueueMap.remove(basePath);
    }
}
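
Putting it all together, typical usage looks roughly like this. The directory, segment size, wait and message type are made up for the example; note that pushes are handed to the Reactor pipeline asynchronously, hence the short pause before reading:

package com.zhaofujun.filequeue;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class FileQueueExample {
    public static void main(String[] args) throws IOException, InterruptedException {
        FileQueue<String> queue = FileQueueFactory.createFileQueue(
                "/tmp/file-queue-demo",                                 // base directory for the queue's files
                1_000_000,                                              // messages per segment file
                new KryoSerializerV2<>(String.class),                   // built-in Kryo serializer
                new PeriodDeleteSegmentStrategy(10, TimeUnit.MINUTES)); // clean up every 10 minutes

        // Producer side
        for (int i = 0; i < 100; i++) {
            queue.push("message-" + i);
        }

        // Consumer 0: drain whatever has been written so far
        Thread.sleep(200);   // give the asynchronous write pipeline a moment
        String msg;
        while ((msg = queue.pop(0)) != null) {
            System.out.println("consumed: " + msg);
        }

        queue.close();
    }
}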

