Skip to content

Commit 692e6ad

Browse files
rdicroceluben
authored andcommitted
ZstdCompressCtx: add method to get frame progression
When performing compression in a streaming fashion, the position of the input buffer does not accurately indicate how much input has actually been compressed, because zstd has its own internal buffers. The getFrameProgression() function allows the user to get accurate numbers.
1 parent bb156dc commit 692e6ad

File tree

4 files changed

+99
-0
lines changed

4 files changed

+99
-0
lines changed

src/main/java/com/github/luben/zstd/ZstdCompressCtx.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,16 @@ private void ensureOpen() {
197197
}
198198
}
199199

200+
/**
201+
* Tells how much data has been ingested (read from input),
202+
* consumed (input actually compressed) and produced (output) for current frame.
203+
*/
204+
public ZstdFrameProgression getFrameProgression() {
205+
ensureOpen();
206+
return getFrameProgression0();
207+
}
208+
private native ZstdFrameProgression getFrameProgression0();
209+
200210
/**
201211
* Clear all state and parameters from the compression context. This leaves the object in a
202212
* state identical to a newly created compression context.
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.github.luben.zstd;
2+
3+
public class ZstdFrameProgression {
4+
5+
private long ingested;
6+
private long consumed;
7+
private long produced;
8+
private long flushed;
9+
private int currentJobID;
10+
private int nbActiveWorkers;
11+
12+
public ZstdFrameProgression(long ingested, long consumed, long produced, long flushed, int currentJobID,
13+
int nbActiveWorkers) {
14+
this.ingested = ingested;
15+
this.consumed = consumed;
16+
this.produced = produced;
17+
this.flushed = flushed;
18+
this.currentJobID = currentJobID;
19+
this.nbActiveWorkers = nbActiveWorkers;
20+
}
21+
22+
/**
23+
* The number of input bytes read and buffered.
24+
*/
25+
public long getIngested() {
26+
return ingested;
27+
}
28+
29+
/**
30+
* The number of input bytes actually compressed.
31+
* Note: ingested - consumed = amount of input data buffered internally, not yet compressed.
32+
*/
33+
public long getConsumed() {
34+
return consumed;
35+
}
36+
37+
/**
38+
* The number of compressed bytes generated and buffered.
39+
*/
40+
public long getProduced() {
41+
return produced;
42+
}
43+
44+
/**
45+
* The number of compressed bytes flushed.
46+
*/
47+
public long getFlushed() {
48+
return flushed;
49+
}
50+
51+
/**
52+
* The last started job number. Only applicable if multi-threading is enabled.
53+
*/
54+
public int getCurrentJobID() {
55+
return currentJobID;
56+
}
57+
58+
/**
59+
* The number of workers actively compressing. Only applicable if multi-threading is enabled.
60+
*/
61+
public int getNbActiveWorkers() {
62+
return nbActiveWorkers;
63+
}
64+
65+
}

src/main/native/jni_fast_zstd.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#ifndef ZSTD_STATIC_LINKING_ONLY
2+
#define ZSTD_STATIC_LINKING_ONLY
3+
#endif
14
#include <jni.h>
25
#include <zstd.h>
36
#include <zstd_errors.h>
@@ -330,6 +333,19 @@ JNIEXPORT jlong JNICALL Java_com_github_luben_zstd_ZstdCompressCtx_reset0
330333
return ZSTD_CCtx_reset(cctx, ZSTD_reset_session_and_parameters);
331334
}
332335

336+
JNIEXPORT jobject JNICALL Java_com_github_luben_zstd_ZstdCompressCtx_getFrameProgression0
337+
(JNIEnv *env, jclass jctx) {
338+
ZSTD_CCtx* cctx = (ZSTD_CCtx*)(intptr_t)(*env)->GetLongField(env, jctx, compress_ctx_nativePtr);
339+
ZSTD_frameProgression native_progression = ZSTD_getFrameProgression(cctx);
340+
341+
jclass frame_progression_class = (*env)->FindClass(env, "com/github/luben/zstd/ZstdFrameProgression");
342+
jmethodID frame_progression_constructor = (*env)->GetMethodID(env, frame_progression_class, "<init>", "(JJJJII)V");
343+
return (*env)->NewObject(
344+
env, frame_progression_class, frame_progression_constructor, native_progression.ingested,
345+
native_progression.consumed, native_progression.produced, native_progression.flushed,
346+
native_progression.currentJobID, native_progression.nbActiveWorkers);
347+
}
348+
333349
JNIEXPORT jlong JNICALL Java_com_github_luben_zstd_ZstdCompressCtx_setPledgedSrcSize0
334350
(JNIEnv *env, jclass jctx, jlong src_size) {
335351
if (src_size < 0) {

src/test/scala/Zstd.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,9 +895,17 @@ class ZstdSpec extends AnyFlatSpec with ScalaCheckPropertyChecks {
895895
compressedBuffer.limit(compressedBuffer.position() + 1)
896896
cctx.compressDirectByteBufferStream(compressedBuffer, inputBuffer, EndDirective.CONTINUE)
897897
}
898+
899+
var frameProgression = cctx.getFrameProgression()
900+
assert(frameProgression.getIngested() == size)
901+
assert(frameProgression.getFlushed() == compressedBuffer.position())
902+
898903
compressedBuffer.limit(compressedBuffer.capacity())
899904
val done = cctx.compressDirectByteBufferStream(compressedBuffer, inputBuffer, EndDirective.END)
900905
assert(done)
906+
907+
frameProgression = cctx.getFrameProgression()
908+
assert(frameProgression.getConsumed() == size)
901909

902910
compressedBuffer.flip()
903911
val decompressedBuffer = ByteBuffer.allocateDirect(size)

0 commit comments

Comments
 (0)