0%

Java 文件高级技术(三):内存映射文件

1. 内存映射文件的基本概念

  • 支持:内存映射文件不是 Java 引入的概念,而是操作系统提供的一种功能,大部分操作系统够支持
  • 概念:将文件映射到内存,文件对应于内存中的一个字节数组。对文件的操作变成对这个字节数组的操作,而字节数组的操作直接映射到文件上。这种映射可以是映射文件全部区域,也可以是只映射一部分区域
  • 假象:这种映射是操作系统提供的一种假象,文件一般不会马上加载到内存,操作系统只是记录下了这回事,当实际发生读写时,才会按需加载。操作系统一般是按页加载的,页可以理解为就是一块,页的大小与操作系统和硬件相关,典型的配置可能是 4K、8K 等,当操作系统发现读写区域不在内存时,就会加载该区域对应的一个页到内存
  • 作用:这种按需加载的方式,使得内存映射文件可以方便高效地处理非常大的文件,内存放不下整个文件也不要紧,操作系统会自动进行处理,将需要的内容读到内存,将修改的内容保存到硬盘,将不再使用的内存释放
  • 时机:在应用程序写的时候,它写的是内存中的字节数组,这个内容同步到文件上的时机是不确定的,由操作系统决定。不过,只要操作系统不崩溃,操作系统会保证同步到文件上,即使映射这个文件的应用程序已经退出了
  • 复制:在一般的文件读写中,会有两次数据复制,一次是从硬盘复制到操作系统内核,另一次是从操作系统内核复制到用户态的应用程序。而在内存映射文件中,一般情况下,只有一次复制,且内存分配在操作系统内核,应用程序访问的就是操作系统的内核内存空间,这显然要比普通的读写效率更高
  • 特点:内存映射文件可以被多个不同的应用程序共享,多个程序可以映射同一个文件,映射到同一块内存区域,一个程序对内存的修改,可以让其他程序也看到,这使得内存映射文件特别适合用于不同应用程序之间的通信
  • 局限:内存映射文件不太适合处理小文件,它是按页分配内存的,对于小文件,会浪费空间;另外,映射文件要消耗一定的操作系统资源,初始化比较慢
  • 场景:对于一般的文件读写不需要使用内存映射文件,但如果处理的是大文件、要求极高的读写效率,比如数据库系统,或者需要在不同程序间进行共享和通信,那就可以考虑内存映射文件

2. 操作系统在加载可执行文件的时候,使用内存映射文件的场景有

  • 按需加载代码,只有当前运行的代码在内存,其他暂时用不到的代码还在硬盘
  • 同时启动多次同一个可执行文件,文件代码在内存也只有一份
  • 不同应用程序共享的动态链接库代码在内存也只有一份

3. 内存映射文件的用法

  • 内存映射文件需要通过 FileInputStream/FileOutputStreamRandomAccessFile,它们都有一个方法 public FileChannel getChannel()

    • FileChannel 有如下方法 public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException
    • map() 方法将当前文件映射到内存,映射的结果就是一个 MappedByteBuffer 对象,表示内存中的字节数组
    • map() 有三个参数:mode 表示映射模式,position 表示映射的起始位置,size 表示长度
  • mode 有三个取值

    • MapMode.READ_ONLY:只读
    • MapMode.READ_WRITE:既读也写
    • MapMode.PRIVATE私有模式,更改不反映到文件,也不被其他程序看到
  • mode 受限于外层的流或 RandomAccessFile。比如,对于 FileInputStream 或者 RandomAccessFile 但打开模式是 rmode 就不能设为 MapMode.READ_WRITE,否则会抛出异常

  • 如果映射的区域超过了现有文件的范围,则文件会自动扩展,扩展出来的区域字节内容为 0

  • 映射完成后,文件就可以关闭了,后续对文件的读写可以通过 MappedByteBuffer

    • MappedByteBufferByteBuffer 的子类,而 ByteBufferBuffer 的子类
    • ByteBufferBuffer 不只是给内存映射文件提供的,它们是 Java NIO 中操作数据的一种方式,用于很多地方,方法也比较多

4. 以读写模式映射文件 abc.dat 到操作系统内核内存空间

1
2
3
4
5
6
7
8
9
RandomAccessFile file = new RandomAccessFile("abc.dat", "rw");
try {
MappedByteBuffer buf = file.getChannel().map(Map.Mode.READ_WRITE, 0, file.length());
//使用 buf
} catch(IOException e) {
e.printStackTrace();
} finally {
file.close();
}

5. 消息队列的用法实例

  • 生产者程序向队列上放消息,每放一条就随机休息一会

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class Producer {
    public static void main(String[] args) throws InterruptedException {
    try {
    BasicQueue queue = new BasicQueue("./", "task");
    int i = 0;
    Random rnd = new Random();
    while(true) {
    String msg = new String("task " + (i++));
    queue.enqueue(msg.getBytes("UTF-8"));
    System.out.println("produce: " + msg);
    Thread.sleep(rnd.nextInt(1000));
    }
    } catch(IOException e) {
    e.printStackTrace();
    }
    }
    }
  • 消费者程序从队列中取消息,如果队列为空,也随机休息一会

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class Consumer {
    public static void main(String[] args) throws InterruptedException {
    try {
    BasicQueue queue = new BasicQueue("./", "task");
    Random rnd = new Random();
    while(true) {
    byte[] bytes = queuue.dequeue();
    if(bytes == null) {
    Thread.sleep(rnd.nextInt(1000));
    continue;
    }
    System.out.println("consume: " + new String(bytes, "UTF-8"));
    }
    } catch(IOException e) {
    e.printStackTrace();
    }
    }
    }

6. 使用内存映射文件设计一个消息队列 BasicQueue

  • 使用两个内存映射文件来保存消息队列:一个是数据文件 .data,一个是元数据文件 .meta
  • 在数据文件 .data 中使用固定长度存储每条信息,长度为 1024,前 4 个字节为实际长度,后面是实际内容,每条消息的最大长度不能超过 1020
  • 在元数据文件 .meta 中保存队列头和尾,指向 .data 文件中的位置,初始都是 0,入队增加尾,出队增加头,到结尾时,再从 0 开始,模拟循环队列
  • 为了区分队列满和空的状态,始终留一个位置不保存数据,当队列头和队列尾一样的时候表示队列空,当队列尾的下一个位置是队列头的时候表示队列满
  • 简单起见,暂不考虑由于并发访问异常关闭等引起的一致性问题

7. 使用内存映射文件实现一个消息队列 BasicQueue

  • 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    private static final int MAX_MSG_NUM = 1020 * 1024; //队列最多消息个数,实际个数还会减1
    private static final int MAX_MSG_BODY_SIZE = 1020; //消息体最大长度
    private static final int MSG_SIZE = MAX_MSG_BODY_SIZE + 4; //每条消息占用的空间
    private static final int DATA_FILE_SIZE = MAX_MSG_NUM * MSG_SIZE; //队列消息体数据文件大小
    private static final int META_SIZE = 8; //队列元数据文件大小(head + tail)

    private MappedByteBuffer dataBuf; //数据
    private MappedByteBuffer metaBuf; //元数据

    public BasicQueue(String path, String queueName) throws IOException {
    if(!path.endsWith(File.separator) {
    path += File.separator;
    }
    RandomAccessFile dataFile = null;
    RandomAccessFile metaFile = null;
    try {
    dataFile = new RandomAccessFile(path + queueName + ".data", "rw");
    metaFile = new RandomAccessFile(path + queueName + ".meta", "rw");
    dataBuf = dataFile.getChannel().map(MapMode.READ_WRITE, 0, META_SIZE);
    metaBuf = metaFile.getChannel().map(MapMode.READ_WRITE, 0, META_SIZE);
    } finally {
    if(dataFile != null) {
    dataFile.close();
    }
    if(metaFile != null) {
    metaFile.close();
    }
    }
    }

    private int head() {
    return metaBuf.getInt(0);
    }
    private void head(int newHead) {
    metaBuf.putInt(0, newHead);
    }
    private int tail() {
    return metaBuf.getInt(4);
    }
    private void tail(int newTail) {
    metaBuf.putInt(4, newTail);
    }

    private boolean isEmpty() {
    return head(0 == tail();
    }
    private boolean isFull() {
    return (tail() + MSG_SIZE) % DATA_FILE_SIZE) == head();
    }
  • 入队

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public void enqueue(byte[] data) throws IOException {
    if(data.length > MAX_MSG_BODY_SIZE) { //如果消息太长,抛出异常
    throw new IllegalArgumentException("msg size is " + data.length + ", while maximum allowed length is " + MAX_MSG_BODY_SIZE);
    }
    if(isFull()) { //如果队列满,抛出异常
    throw new IllegalStateException("queue is full");
    }
    int tail = tail(); //找到队列尾
    dataBuf.position(tail); //定位到队列尾
    dataBuf.putInt(data.length); //写消息长度
    dataBuf.put(data); //写实际数据
    if(tail + MSG_SIZE >= DATA_FILE_SIZE) { //更新队列尾指针,如果已到文件尾,再从头开始
    tail(0);
    } else {
    tail(tail + MSG_SIZE);
    }
    }
  • 出队

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public byte[] dequeue() throws IOException {
    if(isEmpty()) { //如果队列为空,返回 null
    return null;
    }
    int head = head(); //找到队列头
    dataBuf.position(head); //定位到队列头
    int length = dataBuf.getInt(); //读消息长度
    byte[] data = new byte[length];
    dataBuf.get(data); //读实际数据
    if(head + MSG_SIZE >= DATA_FILE_SIZE) { //更新队列头指针,如果已到文件尾,再从头开始
    head(0);
    } else {
    head(head + MSG_SIZE);
    }
    return data; //返回实际数据
    }
-------------------- 本文结束感谢您的阅读 --------------------