1. 内存映射文件的基本概念
- 支持:内存映射文件不是 Java 引入的概念,而是操作系统提供的一种功能,大部分操作系统够支持
- 概念:将文件映射到内存,文件对应于内存中的一个字节数组。对文件的操作变成对这个字节数组的操作,而字节数组的操作直接映射到文件上。这种映射可以是映射文件全部区域,也可以是只映射一部分区域
- 假象:这种映射是操作系统提供的一种假象,文件一般不会马上加载到内存,操作系统只是记录下了这回事,当实际发生读写时,才会按需加载。操作系统一般是按页加载的,页可以理解为就是一块,页的大小与操作系统和硬件相关,典型的配置可能是 4K、8K 等,当操作系统发现读写区域不在内存时,就会加载该区域对应的一个页到内存
- 作用:这种按需加载的方式,使得内存映射文件可以方便高效地处理非常大的文件,内存放不下整个文件也不要紧,操作系统会自动进行处理,将需要的内容读到内存,将修改的内容保存到硬盘,将不再使用的内存释放
- 时机:在应用程序写的时候,它写的是内存中的字节数组,这个内容同步到文件上的时机是不确定的,由操作系统决定。不过,只要操作系统不崩溃,操作系统会保证同步到文件上,即使映射这个文件的应用程序已经退出了
- 复制:在一般的文件读写中,会有两次数据复制,一次是从硬盘复制到操作系统内核,另一次是从操作系统内核复制到用户态的应用程序。而在内存映射文件中,一般情况下,只有一次复制,且内存分配在操作系统内核,应用程序访问的就是操作系统的内核内存空间,这显然要比普通的读写效率更高
- 特点:内存映射文件可以被多个不同的应用程序共享,多个程序可以映射同一个文件,映射到同一块内存区域,一个程序对内存的修改,可以让其他程序也看到,这使得内存映射文件特别适合用于不同应用程序之间的通信
- 局限:内存映射文件不太适合处理小文件,它是按页分配内存的,对于小文件,会浪费空间;另外,映射文件要消耗一定的操作系统资源,初始化比较慢
- 场景:对于一般的文件读写不需要使用内存映射文件,但如果处理的是大文件、要求极高的读写效率,比如数据库系统,或者需要在不同程序间进行共享和通信,那就可以考虑内存映射文件
2. 操作系统在加载可执行文件的时候,使用内存映射文件的场景有
- 按需加载代码,只有当前运行的代码在内存,其他暂时用不到的代码还在硬盘
- 同时启动多次同一个可执行文件,文件代码在内存也只有一份
- 不同应用程序共享的动态链接库代码在内存也只有一份
3. 内存映射文件的用法
内存映射文件需要通过
FileInputStream/FileOutputStream
或RandomAccessFile
,它们都有一个方法public FileChannel getChannel()
FileChannel
有如下方法public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException
map()
方法将当前文件映射到内存,映射的结果就是一个MappedByteBuffer
对象,表示内存中的字节数组map()
有三个参数:mode
表示映射模式,position
表示映射的起始位置,size
表示长度
mode
有三个取值MapMode.READ_ONLY
:只读MapMode.READ_WRITE
:既读也写MapMode.PRIVATE
:私有模式,更改不反映到文件,也不被其他程序看到
mode
受限于外层的流或RandomAccessFile
。比如,对于FileInputStream
或者RandomAccessFile
但打开模式是r
,mode
就不能设为MapMode.READ_WRITE
,否则会抛出异常如果映射的区域超过了现有文件的范围,则文件会自动扩展,扩展出来的区域字节内容为 0
映射完成后,文件就可以关闭了,后续对文件的读写可以通过
MappedByteBuffer
MappedByteBuffer
是ByteBuffer
的子类,而ByteBuffer
是Buffer
的子类ByteBuffer
和Buffer
不只是给内存映射文件提供的,它们是 Java NIO 中操作数据的一种方式,用于很多地方,方法也比较多
4. 以读写模式映射文件 abc.dat 到操作系统内核内存空间
1 | RandomAccessFile file = new RandomAccessFile("abc.dat", "rw"); |
5. 消息队列的用法实例
生产者程序向队列上放消息,每放一条就随机休息一会
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class Producer {
public static void main(String[] args) throws InterruptedException {
try {
BasicQueue queue = new BasicQueue("./", "task");
int i = 0;
Random rnd = new Random();
while(true) {
String msg = new String("task " + (i++));
queue.enqueue(msg.getBytes("UTF-8"));
System.out.println("produce: " + msg);
Thread.sleep(rnd.nextInt(1000));
}
} catch(IOException e) {
e.printStackTrace();
}
}
}消费者程序从队列中取消息,如果队列为空,也随机休息一会
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class Consumer {
public static void main(String[] args) throws InterruptedException {
try {
BasicQueue queue = new BasicQueue("./", "task");
Random rnd = new Random();
while(true) {
byte[] bytes = queuue.dequeue();
if(bytes == null) {
Thread.sleep(rnd.nextInt(1000));
continue;
}
System.out.println("consume: " + new String(bytes, "UTF-8"));
}
} catch(IOException e) {
e.printStackTrace();
}
}
}
6. 使用内存映射文件设计一个消息队列 BasicQueue
- 使用两个内存映射文件来保存消息队列:一个是数据文件 .data,一个是元数据文件 .meta
- 在数据文件 .data 中使用固定长度存储每条信息,长度为 1024,前 4 个字节为实际长度,后面是实际内容,每条消息的最大长度不能超过 1020
- 在元数据文件 .meta 中保存队列头和尾,指向 .data 文件中的位置,初始都是 0,入队增加尾,出队增加头,到结尾时,再从 0 开始,模拟循环队列
- 为了区分队列满和空的状态,始终留一个位置不保存数据,当队列头和队列尾一样的时候表示队列空,当队列尾的下一个位置是队列头的时候表示队列满
- 简单起见,暂不考虑由于并发访问、异常关闭等引起的一致性问题
7. 使用内存映射文件实现一个消息队列 BasicQueue
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49private static final int MAX_MSG_NUM = 1020 * 1024; //队列最多消息个数,实际个数还会减1
private static final int MAX_MSG_BODY_SIZE = 1020; //消息体最大长度
private static final int MSG_SIZE = MAX_MSG_BODY_SIZE + 4; //每条消息占用的空间
private static final int DATA_FILE_SIZE = MAX_MSG_NUM * MSG_SIZE; //队列消息体数据文件大小
private static final int META_SIZE = 8; //队列元数据文件大小(head + tail)
private MappedByteBuffer dataBuf; //数据
private MappedByteBuffer metaBuf; //元数据
public BasicQueue(String path, String queueName) throws IOException {
if(!path.endsWith(File.separator) {
path += File.separator;
}
RandomAccessFile dataFile = null;
RandomAccessFile metaFile = null;
try {
dataFile = new RandomAccessFile(path + queueName + ".data", "rw");
metaFile = new RandomAccessFile(path + queueName + ".meta", "rw");
dataBuf = dataFile.getChannel().map(MapMode.READ_WRITE, 0, META_SIZE);
metaBuf = metaFile.getChannel().map(MapMode.READ_WRITE, 0, META_SIZE);
} finally {
if(dataFile != null) {
dataFile.close();
}
if(metaFile != null) {
metaFile.close();
}
}
}
private int head() {
return metaBuf.getInt(0);
}
private void head(int newHead) {
metaBuf.putInt(0, newHead);
}
private int tail() {
return metaBuf.getInt(4);
}
private void tail(int newTail) {
metaBuf.putInt(4, newTail);
}
private boolean isEmpty() {
return head(0 == tail();
}
private boolean isFull() {
return (tail() + MSG_SIZE) % DATA_FILE_SIZE) == head();
}入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public void enqueue(byte[] data) throws IOException {
if(data.length > MAX_MSG_BODY_SIZE) { //如果消息太长,抛出异常
throw new IllegalArgumentException("msg size is " + data.length + ", while maximum allowed length is " + MAX_MSG_BODY_SIZE);
}
if(isFull()) { //如果队列满,抛出异常
throw new IllegalStateException("queue is full");
}
int tail = tail(); //找到队列尾
dataBuf.position(tail); //定位到队列尾
dataBuf.putInt(data.length); //写消息长度
dataBuf.put(data); //写实际数据
if(tail + MSG_SIZE >= DATA_FILE_SIZE) { //更新队列尾指针,如果已到文件尾,再从头开始
tail(0);
} else {
tail(tail + MSG_SIZE);
}
}出队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public byte[] dequeue() throws IOException {
if(isEmpty()) { //如果队列为空,返回 null
return null;
}
int head = head(); //找到队列头
dataBuf.position(head); //定位到队列头
int length = dataBuf.getInt(); //读消息长度
byte[] data = new byte[length];
dataBuf.get(data); //读实际数据
if(head + MSG_SIZE >= DATA_FILE_SIZE) { //更新队列头指针,如果已到文件尾,再从头开始
head(0);
} else {
head(head + MSG_SIZE);
}
return data; //返回实际数据
}