Skip to content
This repository was archived by the owner on Jun 1, 2024. It is now read-only.

Commit b5bd109

Browse files
Add ElasticsearchSinkOptions.BufferFileRollingInterval option (#416)
* Add `ElasticsearchSinkOptions.BufferFileRollingInterval` option - Using this option we can customize buffer file rolling interval. The default is `RollingInterval.Day` (so no changes here). In some cases higher granularity may be needed. - Changed regular expression for FileSet to get buffer files to support different rolling interval file name formats - from Infinite to Minute. All of them are different amount of digits representing date - 0(Infinite), 4(Year),..., 12 (Minute). So replaced expression part for day format `(?<date>\\d{8})` with the expression for all interval date `(?<date>\\d{0,12})`. * Add tests for the desired functionality, which fail now. - Return code to support only Daily rolling interval - Add RollingIntervalExtensions.cs (origin: Serilog.Sinks.File) with InternalVisible attribute to be able to test - Add InternalVisible to FileSet.cs to be able to test it - Sign tests assembly the same way as Serilog.Sinks.Elasticsearch for InternalVisible to work * Support different rolling intervals for DurableElasticsearchSink rolling files. - Make support only for intervals like Day, Hour, Minute. As for less frequent intervals we cannot get specific date (specific day) for passing to _getIndexForEvent in `ElasticsearchPayloadReader`. - Support handling rolling files for different intervals in `ElasticsearchPayloadReader` and `FileSet` by using corresponding formats and search patterns. - Add tests of changed code - for `ElasticsearchPayloadReader` and `FileSet` * Remove redundant spaces
1 parent 7591772 commit b5bd109

File tree

11 files changed

+267
-16
lines changed

11 files changed

+267
-16
lines changed

src/Serilog.Sinks.Elasticsearch/Serilog.Sinks.Elasticsearch.csproj

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,10 @@
5555
<PackageReference Include="Microsoft.CSharp" Version="4.6.0" />
5656
</ItemGroup>
5757

58+
<ItemGroup>
59+
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleTo">
60+
<_Parameter1>Serilog.Sinks.Elasticsearch.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fb8d13fd344a1c6fe0fe83ef33c1080bf30690765bc6eb0df26ebfdf8f21670c64265b30db09f73a0dea5b3db4c9d18dbf6d5a25af5ce9016f281014d79dc3b4201ac646c451830fc7e61a2dfd633d34c39f87b81894191652df5ac63cc40c77f3542f702bda692e6e8a9158353df189007a49da0f3cfd55eb250066b19485ec</_Parameter1>
61+
</AssemblyAttribute>
62+
</ItemGroup>
63+
5864
</Project>

src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/DurableElasticsearchSink.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,17 @@ public DurableElasticsearchSink(ElasticsearchSinkOptions options)
3939
throw new ArgumentException("Cannot create the durable ElasticSearch sink without a buffer base file name!");
4040
}
4141

42-
4342
_sink = new LoggerConfiguration()
4443
.MinimumLevel.Verbose()
4544
.WriteTo.File(_state.DurableFormatter,
4645
options.BufferBaseFilename + FileNameSuffix,
47-
rollingInterval: RollingInterval.Day,
46+
rollingInterval: options.BufferFileRollingInterval,
4847
fileSizeLimitBytes: options.BufferFileSizeLimitBytes,
4948
rollOnFileSizeLimit: true,
5049
retainedFileCountLimit: options.BufferFileCountLimit,
5150
levelSwitch: _state.Options.LevelSwitch,
5251
encoding: Encoding.UTF8)
5352
.CreateLogger();
54-
5553

5654
var elasticSearchLogClient = new ElasticsearchLogClient(
5755
elasticLowLevelClient: _state.Client,
@@ -63,7 +61,8 @@ public DurableElasticsearchSink(ElasticsearchSinkOptions options)
6361
typeName:_state.Options.TypeName,
6462
serialize:_state.Serialize,
6563
getIndexForEvent: _state.GetBufferedIndexForEvent,
66-
elasticOpType: _state.Options.BatchAction);
64+
elasticOpType: _state.Options.BatchAction,
65+
rollingInterval: options.BufferFileRollingInterval);
6766

6867
_shipper = new ElasticsearchLogShipper(
6968
bufferBaseFilename: _state.Options.BufferBaseFilename,
@@ -75,7 +74,8 @@ public DurableElasticsearchSink(ElasticsearchSinkOptions options)
7574
payloadReader: payloadReader,
7675
retainedInvalidPayloadsLimitBytes: _state.Options.BufferRetainedInvalidPayloadsLimitBytes,
7776
bufferSizeLimitBytes: _state.Options.BufferFileSizeLimitBytes,
78-
registerTemplateIfNeeded: _state.RegisterTemplateIfNeeded);
77+
registerTemplateIfNeeded: _state.RegisterTemplateIfNeeded,
78+
rollingInterval: options.BufferFileRollingInterval);
7979

8080
}
8181

src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/ElasticsearchLogShipper.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ public class ElasticsearchLogShipper : LogShipper<List<string>>
3030
/// <param name="retainedInvalidPayloadsLimitBytes"></param>
3131
/// <param name="bufferSizeLimitBytes"></param>
3232
/// <param name="registerTemplateIfNeeded"></param>
33+
/// <param name="rollingInterval"></param>
3334
public ElasticsearchLogShipper(string bufferBaseFilename, int batchPostingLimit, TimeSpan period,
3435
long? eventBodyLimitBytes, LoggingLevelSwitch levelControlSwitch, ILogClient<List<string>> logClient,
3536
IPayloadReader<List<string>> payloadReader, long? retainedInvalidPayloadsLimitBytes,
36-
long? bufferSizeLimitBytes, Action registerTemplateIfNeeded)
37-
: base(bufferBaseFilename, batchPostingLimit, period, eventBodyLimitBytes,
38-
levelControlSwitch, logClient, payloadReader, retainedInvalidPayloadsLimitBytes, bufferSizeLimitBytes)
37+
long? bufferSizeLimitBytes, Action registerTemplateIfNeeded, RollingInterval rollingInterval)
38+
: base(bufferBaseFilename, batchPostingLimit, period, eventBodyLimitBytes, levelControlSwitch, logClient,
39+
payloadReader, retainedInvalidPayloadsLimitBytes, bufferSizeLimitBytes, rollingInterval)
3940
{
4041
_registerTemplateIfNeeded = registerTemplateIfNeeded;
4142
}

src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/ElasticsearchPayloadReader.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class ElasticsearchPayloadReader: APayloadReader<List<string>>
1717
private readonly Func<object, string> _serialize;
1818
private readonly Func<string, DateTime,string> _getIndexForEvent;
1919
private readonly ElasticOpType _elasticOpType;
20+
private readonly RollingInterval _rollingInterval;
2021
private List<string> _payload;
2122
private int _count;
2223
private DateTime _date;
@@ -29,14 +30,21 @@ public class ElasticsearchPayloadReader: APayloadReader<List<string>>
2930
/// <param name="serialize"></param>
3031
/// <param name="getIndexForEvent"></param>
3132
/// <param name="elasticOpType"></param>
33+
/// <param name="rollingInterval"></param>
3234
public ElasticsearchPayloadReader(string pipelineName, string typeName, Func<object, string> serialize,
33-
Func<string, DateTime, string> getIndexForEvent, ElasticOpType elasticOpType)
35+
Func<string, DateTime, string> getIndexForEvent, ElasticOpType elasticOpType, RollingInterval rollingInterval)
3436
{
37+
if ((int)rollingInterval < (int)RollingInterval.Day)
38+
{
39+
throw new ArgumentException("Rolling intervals less frequent than RollingInterval.Day are not supported");
40+
}
41+
3542
_pipelineName = pipelineName;
3643
_typeName = typeName;
3744
_serialize = serialize;
3845
_getIndexForEvent = getIndexForEvent;
3946
_elasticOpType = elasticOpType;
47+
_rollingInterval = rollingInterval;
4048
}
4149

4250
/// <summary>
@@ -64,8 +72,9 @@ protected override void InitPayLoad(string filename)
6472
throw new FormatException(string.Format("The file name '{0}' does not seem to follow the right file pattern - it must be named [whatever]-{{Date}}[_n].json", Path.GetFileName(filename)));
6573
}
6674

67-
var dateString = lastToken.Substring(0, 8);
68-
_date = DateTime.ParseExact(dateString, "yyyyMMdd", CultureInfo.InvariantCulture);
75+
var dateFormat = _rollingInterval.GetFormat();
76+
var dateString = lastToken.Substring(0, dateFormat.Length);
77+
_date = DateTime.ParseExact(dateString, dateFormat, CultureInfo.InvariantCulture);
6978
}
7079
/// <summary>
7180
///
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using System;
2+
using System.Runtime.CompilerServices;
3+
4+
[assembly:
5+
InternalsVisibleTo(
6+
"Serilog.Sinks.Elasticsearch.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fb8d13fd344a1c6fe0fe83ef33c1080bf30690765bc6eb0df26ebfdf8f21670c64265b30db09f73a0dea5b3db4c9d18dbf6d5a25af5ce9016f281014d79dc3b4201ac646c451830fc7e61a2dfd633d34c39f87b81894191652df5ac63cc40c77f3542f702bda692e6e8a9158353df189007a49da0f3cfd55eb250066b19485ec")]
7+
8+
namespace Serilog.Sinks.Elasticsearch.Durable
9+
{
10+
internal static class RollingIntervalExtensions
11+
{
12+
// From https://github.com/serilog/serilog-sinks-file/blob/dev/src/Serilog.Sinks.File/Sinks/File/RollingIntervalExtensions.cs#L19
13+
public static string GetFormat(this RollingInterval interval)
14+
{
15+
switch (interval)
16+
{
17+
case RollingInterval.Infinite:
18+
return "";
19+
case RollingInterval.Year:
20+
return "yyyy";
21+
case RollingInterval.Month:
22+
return "yyyyMM";
23+
case RollingInterval.Day:
24+
return "yyyyMMdd";
25+
case RollingInterval.Hour:
26+
return "yyyyMMddHH";
27+
case RollingInterval.Minute:
28+
return "yyyyMMddHHmm";
29+
default:
30+
throw new ArgumentException("Invalid rolling interval");
31+
}
32+
}
33+
34+
public static string GetMatchingDateRegularExpressionPart(this RollingInterval interval)
35+
{
36+
switch (interval)
37+
{
38+
case RollingInterval.Infinite:
39+
return "";
40+
case RollingInterval.Year:
41+
return "\\d{4}";
42+
case RollingInterval.Month:
43+
return "\\d{6}";
44+
case RollingInterval.Day:
45+
return "\\d{8}";
46+
case RollingInterval.Hour:
47+
return "\\d{10}";
48+
case RollingInterval.Minute:
49+
return "\\d{12}";
50+
default:
51+
throw new ArgumentException("Invalid rolling interval");
52+
}
53+
}
54+
}
55+
}

src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/FileSet.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
using System.IO;
2020
using System.Linq;
2121
using System.Net;
22+
using System.Runtime.CompilerServices;
2223
using System.Text.RegularExpressions;
2324
using Serilog.Debugging;
24-
25+
[assembly:
26+
InternalsVisibleTo(
27+
"Serilog.Sinks.Elasticsearch.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fb8d13fd344a1c6fe0fe83ef33c1080bf30690765bc6eb0df26ebfdf8f21670c64265b30db09f73a0dea5b3db4c9d18dbf6d5a25af5ce9016f281014d79dc3b4201ac646c451830fc7e61a2dfd633d34c39f87b81894191652df5ac63cc40c77f3542f702bda692e6e8a9158353df189007a49da0f3cfd55eb250066b19485ec")]
2528
namespace Serilog.Sinks.Elasticsearch.Durable
2629
{
2730
/// <summary>
@@ -36,14 +39,16 @@ class FileSet
3639

3740
const string InvalidPayloadFilePrefix = "invalid-";
3841

39-
public FileSet(string bufferBaseFilename)
42+
public FileSet(string bufferBaseFilename, RollingInterval rollingInterval)
4043
{
4144
if (bufferBaseFilename == null) throw new ArgumentNullException(nameof(bufferBaseFilename));
4245

4346
_bookmarkFilename = Path.GetFullPath(bufferBaseFilename + ".bookmark");
4447
_logFolder = Path.GetDirectoryName(_bookmarkFilename);
4548
_candidateSearchPath = Path.GetFileName(bufferBaseFilename) + "-*.json";
46-
_filenameMatcher = new Regex("^" + Regex.Escape(Path.GetFileName(bufferBaseFilename)) + "-(?<date>\\d{8})(?<sequence>_[0-9]{3,}){0,1}\\.json$");
49+
var dateRegularExpressionPart = rollingInterval.GetMatchingDateRegularExpressionPart();
50+
_filenameMatcher = new Regex("^" + Regex.Escape(Path.GetFileName(bufferBaseFilename)) + "-(?<date>"
51+
+ dateRegularExpressionPart + ")(?<sequence>_[0-9]{3,}){0,1}\\.json$");
4752
}
4853

4954
public BookmarkFile OpenBookmarkFile()

src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/LogShipper.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class LogShipper<TPayload> : IDisposable
7676
/// <param name="payloadReader"></param>
7777
/// <param name="retainedInvalidPayloadsLimitBytes"></param>
7878
/// <param name="bufferSizeLimitBytes"></param>
79+
/// <param name="rollingInterval"></param>
7980
public LogShipper(
8081
string bufferBaseFilename,
8182
int batchPostingLimit,
@@ -85,7 +86,8 @@ public LogShipper(
8586
ILogClient<TPayload> logClient,
8687
IPayloadReader<TPayload> payloadReader,
8788
long? retainedInvalidPayloadsLimitBytes,
88-
long? bufferSizeLimitBytes)
89+
long? bufferSizeLimitBytes,
90+
RollingInterval rollingInterval = RollingInterval.Day)
8991
{
9092
_batchPostingLimit = batchPostingLimit;
9193
_eventBodyLimitBytes = eventBodyLimitBytes;
@@ -95,7 +97,7 @@ public LogShipper(
9597
_connectionSchedule = new ExponentialBackoffConnectionSchedule(period);
9698
_retainedInvalidPayloadsLimitBytes = retainedInvalidPayloadsLimitBytes;
9799
_bufferSizeLimitBytes = bufferSizeLimitBytes;
98-
_fileSet = new FileSet(bufferBaseFilename);
100+
_fileSet = new FileSet(bufferBaseFilename, rollingInterval);
99101
_timer = new PortableTimer(c => OnTick());
100102
SetTimer();
101103

src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,13 @@ public int QueueSizeLimit
273273
/// When set to true splits the StackTrace by new line and writes it as a an array of strings.
274274
/// </summary>
275275
public bool FormatStackTraceAsArray { get; set; }
276+
277+
/// <summary>
278+
/// The interval at which buffer log files will roll over to a new file. The default is <see cref="RollingInterval.Day"/>.
279+
/// Less frequent intervals like <see cref="RollingInterval.Infinite"/>, <see cref="RollingInterval.Year"/>,
280+
/// <see cref="RollingInterval.Month"/> are not supported.
281+
/// </summary>
282+
public RollingInterval BufferFileRollingInterval { get; set; }
276283

277284
/// <summary>
278285
/// Configures the elasticsearch sink defaults
@@ -294,6 +301,7 @@ public ElasticsearchSinkOptions()
294301
this.BufferFileSizeLimitBytes = 100L * 1024 * 1024;
295302
this.FormatStackTraceAsArray = false;
296303
this.ConnectionPool = new SingleNodeConnectionPool(_defaultNode);
304+
this.BufferFileRollingInterval = RollingInterval.Day;
297305
}
298306

299307
/// <summary>
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using System;
2+
using System.IO;
3+
using System.Text;
4+
using FluentAssertions;
5+
using Serilog.Sinks.Elasticsearch.Durable;
6+
using Xunit;
7+
8+
namespace Serilog.Sinks.Elasticsearch.Tests;
9+
10+
public class ElasticsearchPayloadReaderTests : IDisposable
11+
{
12+
private readonly string _tempFileFullPathTemplate;
13+
private string _bufferFileName;
14+
15+
public ElasticsearchPayloadReaderTests()
16+
{
17+
_tempFileFullPathTemplate = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N")) + "-{0}.json";
18+
}
19+
20+
public void Dispose()
21+
{
22+
if (!string.IsNullOrEmpty(_bufferFileName))
23+
{
24+
System.IO.File.Delete(_bufferFileName);
25+
}
26+
}
27+
28+
[Theory]
29+
[InlineData(RollingInterval.Day)]
30+
[InlineData(RollingInterval.Hour)]
31+
[InlineData(RollingInterval.Minute)]
32+
public void ReadPayload_ShouldReadSpecifiedTypesOfRollingFile(RollingInterval rollingInterval)
33+
{
34+
// Arrange
35+
var format = rollingInterval.GetFormat();
36+
var payloadReader = new ElasticsearchPayloadReader("testPipelineName",
37+
"TestTypeName",
38+
null,
39+
(_, _) => "TestIndex",
40+
ElasticOpType.Index,
41+
rollingInterval);
42+
var lines = new[]
43+
{
44+
rollingInterval.ToString()
45+
};
46+
_bufferFileName = string.Format(_tempFileFullPathTemplate,
47+
string.IsNullOrEmpty(format) ? string.Empty : new DateTime(2000, 1, 1).ToString(format));
48+
// Important to use UTF8 with BOM if we are starting from 0 position
49+
System.IO.File.WriteAllLines(_bufferFileName, lines, new UTF8Encoding(true));
50+
51+
// Act
52+
var fileSetPosition = new FileSetPosition(0, _bufferFileName);
53+
var count = 0;
54+
var payload = payloadReader.ReadPayload(int.MaxValue,
55+
null,
56+
ref fileSetPosition,
57+
ref count,
58+
_bufferFileName);
59+
60+
// Assert
61+
// Thus we ensure that file was properly handled by PayloadReader
62+
payload.Count.Should().Be(lines.Length * 2);
63+
payload[1].Should().Be(lines[0]);
64+
}
65+
66+
[Theory]
67+
[InlineData(RollingInterval.Infinite)]
68+
[InlineData(RollingInterval.Year)]
69+
[InlineData(RollingInterval.Month)]
70+
public void ElasticsearchPayloadReader_CannotUseRollingIntervalLessFrequentThanDay(RollingInterval rollingInterval)
71+
{
72+
// Arrange
73+
74+
// Act
75+
Action act = () => new ElasticsearchPayloadReader("testPipelineName",
76+
"TestTypeName",
77+
null,
78+
(_, _) => "TestIndex",
79+
ElasticOpType.Index,
80+
rollingInterval);
81+
82+
// Assert
83+
act.ShouldThrow<ArgumentException>()
84+
.WithMessage("Rolling intervals less frequent than RollingInterval.Day are not supported");
85+
}
86+
}

0 commit comments

Comments
 (0)