Skip to content

Commit c173e27

Browse files
committed
improve archive index reading performance
* use a streaming parser to find a file in the local index * switch to memmap as reader * use zero-copy deserialization for the key, change key data type
1 parent e52460f commit c173e27

File tree

4 files changed

+172
-63
lines changed

4 files changed

+172
-63
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ rusoto_credential = "0.46.0"
7777
# Data serialization and deserialization
7878
serde = { version = "1.0", features = ["derive"] }
7979
serde_json = "1.0"
80+
memmap = "0.7.0"
8081

8182
# iron dependencies
8283
iron = "0.6"

src/storage/archive_index.rs

Lines changed: 151 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
use crate::error::Result;
22
use crate::storage::{compression::CompressionAlgorithm, FileRange};
33
use anyhow::{bail, Context as _};
4-
use serde::{Deserialize, Serialize};
4+
use memmap::MmapOptions;
5+
use serde::de::DeserializeSeed;
6+
use serde::de::{IgnoredAny, MapAccess, Visitor};
7+
use serde::{Deserialize, Deserializer, Serialize};
58
use std::collections::HashMap;
6-
use std::io;
7-
use std::path::{Path, PathBuf};
9+
use std::fmt;
10+
use std::path::Path;
11+
use std::{fs, io};
812

913
#[derive(Deserialize, Serialize)]
1014
pub(crate) struct FileInfo {
@@ -21,51 +25,155 @@ impl FileInfo {
2125
}
2226
}
2327

24-
#[derive(Deserialize, Serialize)]
25-
pub(crate) struct Index {
26-
files: HashMap<PathBuf, FileInfo>,
28+
#[derive(Serialize)]
29+
struct Index {
30+
files: HashMap<String, FileInfo>,
2731
}
2832

29-
impl Index {
30-
pub(crate) fn load(reader: impl io::Read) -> Result<Index> {
31-
serde_cbor::from_reader(reader).context("deserialization error")
33+
pub(crate) fn create<R: io::Read + io::Seek, W: io::Write>(
34+
zipfile: &mut R,
35+
writer: &mut W,
36+
) -> Result<()> {
37+
let mut archive = zip::ZipArchive::new(zipfile)?;
38+
39+
// get file locations
40+
let mut files: HashMap<String, FileInfo> = HashMap::with_capacity(archive.len());
41+
for i in 0..archive.len() {
42+
let zf = archive.by_index(i)?;
43+
44+
files.insert(
45+
zf.name().to_string(),
46+
FileInfo {
47+
range: FileRange::new(zf.data_start(), zf.data_start() + zf.compressed_size() - 1),
48+
compression: match zf.compression() {
49+
zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
50+
c => bail!("unsupported compression algorithm {} in zip-file", c),
51+
},
52+
},
53+
);
3254
}
3355

34-
pub(crate) fn save(&self, writer: impl io::Write) -> Result<()> {
35-
serde_cbor::to_writer(writer, self).context("serialization error")
56+
serde_cbor::to_writer(writer, &Index { files }).context("serialization error")
57+
}
58+
59+
pub(crate) fn find_in_slice(bytes: &[u8], search_for: &str) -> Result<Option<FileInfo>> {
60+
let mut deserializer = serde_cbor::Deserializer::from_slice(bytes);
61+
62+
/// This visitor will just find the `files` element in the top-level map.
63+
/// Then it will call the `FindFileVisitor` that should find the actual
64+
/// FileInfo for the path we are searching for.
65+
struct FindFileListVisitor {
66+
search_for: String,
3667
}
3768

38-
pub(crate) fn new_from_zip<R: io::Read + io::Seek>(zipfile: &mut R) -> Result<Index> {
39-
let mut archive = zip::ZipArchive::new(zipfile)?;
40-
41-
// get file locations
42-
let mut files: HashMap<PathBuf, FileInfo> = HashMap::with_capacity(archive.len());
43-
for i in 0..archive.len() {
44-
let zf = archive.by_index(i)?;
45-
46-
files.insert(
47-
PathBuf::from(zf.name()),
48-
FileInfo {
49-
range: FileRange::new(
50-
zf.data_start(),
51-
zf.data_start() + zf.compressed_size() - 1,
52-
),
53-
compression: match zf.compression() {
54-
zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
55-
c => bail!("unsupported compression algorithm {} in zip-file", c),
56-
},
57-
},
58-
);
69+
impl FindFileListVisitor {
70+
pub fn new(path: String) -> Self {
71+
FindFileListVisitor { search_for: path }
5972
}
73+
}
6074

61-
Ok(Index { files })
75+
impl<'de> Visitor<'de> for FindFileListVisitor {
76+
type Value = Option<FileInfo>;
77+
78+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
79+
write!(formatter, "a map with a 'files' key")
80+
}
81+
82+
fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
83+
where
84+
V: MapAccess<'de>,
85+
{
86+
/// This visitor will walk the full `files` map and search for
87+
/// the path we want to have.
88+
/// Return value is just the `FileInfo` we want to have, or
89+
/// `None`.
90+
struct FindFileVisitor {
91+
search_for: String,
92+
}
93+
94+
impl FindFileVisitor {
95+
pub fn new(search_for: String) -> Self {
96+
FindFileVisitor { search_for }
97+
}
98+
}
99+
100+
impl<'de> DeserializeSeed<'de> for FindFileVisitor {
101+
type Value = Option<FileInfo>;
102+
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
103+
where
104+
D: Deserializer<'de>,
105+
{
106+
deserializer.deserialize_map(self)
107+
}
108+
}
109+
110+
impl<'de> Visitor<'de> for FindFileVisitor {
111+
type Value = Option<FileInfo>;
112+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
113+
write!(
114+
formatter,
115+
"a map with path => FileInfo, searching for path {:?}",
116+
self.search_for
117+
)
118+
}
119+
fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
120+
where
121+
V: MapAccess<'de>,
122+
{
123+
while let Some(key) = map.next_key::<&str>()? {
124+
if key == self.search_for {
125+
let value = map.next_value::<FileInfo>()?;
126+
// skip over the rest of the data without really parsing it.
127+
// If we don't do this the serde_cbor deserializer fails because not
128+
// the whole map is consumed.
129+
while map.next_entry::<IgnoredAny, IgnoredAny>()?.is_some() {}
130+
return Ok(Some(value));
131+
} else {
132+
// skip parsing the FileInfo structure when the key doesn't match.
133+
map.next_value::<IgnoredAny>()?;
134+
}
135+
}
136+
137+
Ok(None)
138+
}
139+
}
140+
141+
while let Some(key) = map.next_key::<&str>()? {
142+
if key == "files" {
143+
return map.next_value_seed(FindFileVisitor::new(self.search_for));
144+
}
145+
}
146+
147+
Ok(None)
148+
}
62149
}
63150

64-
pub(crate) fn find_file<P: AsRef<Path>>(&self, path: P) -> Result<&FileInfo> {
65-
self.files
66-
.get(path.as_ref())
67-
.ok_or_else(|| super::PathNotFoundError.into())
151+
impl<'de> DeserializeSeed<'de> for FindFileListVisitor {
152+
type Value = Option<FileInfo>;
153+
154+
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
155+
where
156+
D: Deserializer<'de>,
157+
{
158+
deserializer.deserialize_map(self)
159+
}
68160
}
161+
162+
Ok(FindFileListVisitor::new(search_for.to_string()).deserialize(&mut deserializer)?)
163+
}
164+
165+
pub(crate) fn find_in_file<P: AsRef<Path>>(
166+
archive_index_path: P,
167+
search_for: &str,
168+
) -> Result<Option<FileInfo>> {
169+
let file = fs::File::open(archive_index_path).context("could not open file")?;
170+
let mmap = unsafe {
171+
MmapOptions::new()
172+
.map(&file)
173+
.context("could not create memory map")?
174+
};
175+
176+
find_in_slice(&mmap, search_for)
69177
}
70178

71179
#[cfg(test)]
@@ -74,14 +182,6 @@ mod tests {
74182
use std::io::Write;
75183
use zip::write::FileOptions;
76184

77-
fn validate_index(index: &Index) {
78-
assert_eq!(index.files.len(), 1);
79-
80-
let fi = index.files.get(&PathBuf::from("testfile1")).unwrap();
81-
assert_eq!(fi.range, FileRange::new(39, 459));
82-
assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
83-
}
84-
85185
#[test]
86186
fn index_create_save_load() {
87187
let mut tf = tempfile::tempfile().unwrap();
@@ -98,13 +198,13 @@ mod tests {
98198
archive.write_all(&objectcontent).unwrap();
99199
tf = archive.finish().unwrap();
100200

101-
let index = Index::new_from_zip(&mut tf).unwrap();
102-
validate_index(&index);
103-
104201
let mut buf = Vec::new();
105-
index.save(&mut buf).unwrap();
202+
create(&mut tf, &mut buf).unwrap();
203+
204+
let fi = find_in_slice(&buf, "testfile1").unwrap().unwrap();
205+
assert_eq!(fi.range, FileRange::new(39, 459));
206+
assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);
106207

107-
let new_index = Index::load(io::Cursor::new(&buf)).unwrap();
108-
validate_index(&new_index);
208+
assert!(find_in_slice(&buf, "some_other_file").unwrap().is_none());
109209
}
110210
}

src/storage/mod.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,8 @@ impl Storage {
200200
}
201201

202202
pub(crate) fn exists_in_archive(&self, archive_path: &str, path: &str) -> Result<bool> {
203-
match self.get_index_for(archive_path) {
204-
Ok(index) => Ok(index.find_file(path).is_ok()),
203+
match self.get_index_filename(archive_path) {
204+
Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)?.is_some()),
205205
Err(err) => {
206206
if err.downcast_ref::<PathNotFoundError>().is_some() {
207207
Ok(false)
@@ -245,17 +245,15 @@ impl Storage {
245245
Ok(blob)
246246
}
247247

248-
fn get_index_for(&self, archive_path: &str) -> Result<archive_index::Index> {
248+
fn get_index_filename(&self, archive_path: &str) -> Result<PathBuf> {
249249
// remote/folder/and/x.zip.index
250250
let remote_index_path = format!("{}.index", archive_path);
251251
let local_index_path = self
252252
.config
253253
.local_archive_cache_path
254254
.join(&remote_index_path);
255255

256-
if local_index_path.exists() {
257-
archive_index::Index::load(io::BufReader::new(fs::File::open(local_index_path)?))
258-
} else {
256+
if !local_index_path.exists() {
259257
let index_content = self.get(&remote_index_path, std::usize::MAX)?.content;
260258

261259
fs::create_dir_all(
@@ -265,9 +263,9 @@ impl Storage {
265263
)?;
266264
let mut file = fs::File::create(&local_index_path)?;
267265
file.write_all(&index_content)?;
268-
269-
archive_index::Index::load(&mut &index_content[..])
270266
}
267+
268+
Ok(local_index_path)
271269
}
272270

273271
pub(crate) fn get_from_archive(
@@ -280,8 +278,8 @@ impl Storage {
280278
if let Some(ref mut t) = fetch_time {
281279
t.step("find path in index");
282280
}
283-
let index = self.get_index_for(archive_path)?;
284-
let info = index.find_file(path)?;
281+
let info = archive_index::find_in_file(self.get_index_filename(archive_path)?, path)?
282+
.ok_or(PathNotFoundError)?;
285283

286284
if let Some(t) = fetch_time {
287285
t.step("range request");
@@ -336,9 +334,8 @@ impl Storage {
336334
}
337335

338336
let mut zip_content = zip.finish()?.into_inner();
339-
let index = archive_index::Index::new_from_zip(&mut io::Cursor::new(&mut zip_content))?;
340337
let mut index_content = vec![];
341-
index.save(&mut index_content)?;
338+
archive_index::create(&mut io::Cursor::new(&mut zip_content), &mut index_content)?;
342339
let alg = CompressionAlgorithm::default();
343340
let compressed_index_content = compress(&index_content[..], alg)?;
344341

0 commit comments

Comments
 (0)