diff --git a/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/FixedPipelineFactory.cs b/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/FixedPipelineFactory.cs index 227550a3172..935170032b1 100644 --- a/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/FixedPipelineFactory.cs +++ b/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/FixedPipelineFactory.cs @@ -5,7 +5,7 @@ public class FixedPipelineFactory : IRequestPipelineFactory public FixedPipelineFactory(IConnectionConfigurationValues connectionSettings, IDateTimeProvider dateTimeProvider) { DateTimeProvider = dateTimeProvider; - MemoryStreamFactory = new MemoryStreamFactory(); + MemoryStreamFactory = RecyclableMemoryStreamFactory.Default; Settings = connectionSettings; Pipeline = Create(Settings, DateTimeProvider, MemoryStreamFactory, new SearchRequestParameters()); @@ -16,7 +16,7 @@ public FixedPipelineFactory(IConnectionConfigurationValues connectionSettings, I public IRequestPipeline Pipeline { get; } private IDateTimeProvider DateTimeProvider { get; } - private MemoryStreamFactory MemoryStreamFactory { get; } + private IMemoryStreamFactory MemoryStreamFactory { get; } private IConnectionConfigurationValues Settings { get; } private Transport Transport => diff --git a/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/MockResponses/SniffResponseBytes.cs b/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/MockResponses/SniffResponseBytes.cs index db0a0130236..bb1810f85cf 100644 --- a/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/MockResponses/SniffResponseBytes.cs +++ b/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/MockResponses/SniffResponseBytes.cs @@ -17,17 +17,17 @@ public static byte[] Create(IEnumerable nodes, string elasticsearchVersion cluster_name = ClusterName, nodes = SniffResponseNodes(nodes, elasticsearchVersion, publishAddressOverride, randomFqdn) }; - using (var ms = new MemoryStream()) + using (var ms = RecyclableMemoryStreamFactory.Default.Create()) { - new 
LowLevelRequestResponseSerializer().Serialize(response, ms); + LowLevelRequestResponseSerializer.Instance.Serialize(response, ms); return ms.ToArray(); } } private static IDictionary SniffResponseNodes( - IEnumerable nodes, + IEnumerable nodes, string elasticsearchVersion, - string publishAddressOverride, + string publishAddressOverride, bool randomFqdn ) => (from node in nodes diff --git a/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/Rules/RuleBase.cs b/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/Rules/RuleBase.cs index 132830c6de9..e530704b6a5 100644 --- a/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/Rules/RuleBase.cs +++ b/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/Rules/RuleBase.cs @@ -50,9 +50,9 @@ public TRule ReturnResponse(T response) where T : class { byte[] r; - using (var ms = new MemoryStream()) + using (var ms = RecyclableMemoryStreamFactory.Default.Create()) { - new LowLevelRequestResponseSerializer().Serialize(response, ms); + LowLevelRequestResponseSerializer.Instance.Serialize(response, ms); r = ms.ToArray(); } Self.ReturnResponse = r; diff --git a/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/VirtualClusterConnection.cs b/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/VirtualClusterConnection.cs index 8153a5bfdb7..d28b307320f 100644 --- a/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/VirtualClusterConnection.cs +++ b/src/Auxiliary/Elasticsearch.Net.VirtualizedCluster/VirtualClusterConnection.cs @@ -49,7 +49,7 @@ public static VirtualClusterConnection Success(byte[] response) => .StaticConnectionPool() .AllDefaults() .Connection; - + public static VirtualClusterConnection Error() => VirtualClusterWith .Nodes(1) @@ -100,7 +100,7 @@ public override TResponse Request(RequestData requestData) { if (!_calls.ContainsKey(requestData.Uri.Port)) throw new Exception($"Expected a call to happen on port {requestData.Uri.Port} but received none"); - + try { var state = _calls[requestData.Uri.Port]; @@ -264,9 +264,9 @@ 
private static byte[] CallResponse(TRule rule) if (_defaultResponseBytes != null) return _defaultResponseBytes; var response = DefaultResponse; - using (var ms = new MemoryStream()) + using (var ms = RecyclableMemoryStreamFactory.Default.Create()) { - new LowLevelRequestResponseSerializer().Serialize(response, ms); + LowLevelRequestResponseSerializer.Instance.Serialize(response, ms); _defaultResponseBytes = ms.ToArray(); } return _defaultResponseBytes; diff --git a/src/Elasticsearch.Net/Elasticsearch.Net.csproj b/src/Elasticsearch.Net/Elasticsearch.Net.csproj index a19b158def9..d457ae647d0 100644 --- a/src/Elasticsearch.Net/Elasticsearch.Net.csproj +++ b/src/Elasticsearch.Net/Elasticsearch.Net.csproj @@ -2,7 +2,7 @@ - netstandard2.0;net461 + netstandard2.0;netstandard2.1;net461 true true diff --git a/src/Elasticsearch.Net/Providers/MemoryStreamFactory.cs b/src/Elasticsearch.Net/Providers/MemoryStreamFactory.cs index af49d608aeb..62ecea79f29 100644 --- a/src/Elasticsearch.Net/Providers/MemoryStreamFactory.cs +++ b/src/Elasticsearch.Net/Providers/MemoryStreamFactory.cs @@ -2,6 +2,7 @@ namespace Elasticsearch.Net { + // TODO we use this in some places but its no longer clear to me why need to circumvent RecyclableMemoryStream in some cases /// /// A factory for creating memory streams using instances of /// diff --git a/src/Elasticsearch.Net/Providers/RecyclableMemoryStream.cs b/src/Elasticsearch.Net/Providers/RecyclableMemoryStream.cs index 88744ed4b27..05831904e94 100644 --- a/src/Elasticsearch.Net/Providers/RecyclableMemoryStream.cs +++ b/src/Elasticsearch.Net/Providers/RecyclableMemoryStream.cs @@ -32,31 +32,33 @@ namespace Elasticsearch.Net { /// - /// MemoryStream implementation that deals with pooling and managing memory streams which use potentially large - /// buffers. + /// MemoryStream implementation that deals with pooling and managing memory streams which use potentially large + /// buffers. 
/// /// - /// This class works in tandem with the RecylableMemoryStreamManager to supply MemoryStream - /// objects to callers, while avoiding these specific problems: - /// 1. LOH allocations - since all large buffers are pooled, they will never incur a Gen2 GC - /// 2. Memory waste - A standard memory stream doubles its size when it runs out of room. This - /// leads to continual memory growth as each stream approaches the maximum allowed size. - /// 3. Memory copying - Each time a MemoryStream grows, all the bytes are copied into new buffers. - /// This implementation only copies the bytes when GetBuffer is called. - /// 4. Memory fragmentation - By using homogeneous buffer sizes, it ensures that blocks of memory - /// can be easily reused. - /// The stream is implemented on top of a series of uniformly-sized blocks. As the stream's length grows, - /// additional blocks are retrieved from the memory manager. It is these blocks that are pooled, not the stream - /// object itself. - /// The biggest wrinkle in this implementation is when GetBuffer() is called. This requires a single - /// contiguous buffer. If only a single block is in use, then that block is returned. If multiple blocks - /// are in use, we retrieve a larger buffer from the memory manager. These large buffers are also pooled, - /// split by size--they are multiples of a chunk size (1 MB by default). - /// Once a large buffer is assigned to the stream the blocks are NEVER again used for this stream. All operations take place on the - /// large buffer. The large buffer can be replaced by a larger buffer from the pool as needed. All blocks and large buffers - /// are maintained in the stream until the stream is disposed (unless AggressiveBufferReturn is enabled in the stream manager). + /// This class works in tandem with the RecyclableMemoryStreamManager to supply MemoryStream + /// objects to callers, while avoiding these specific problems: + /// 1. 
LOH allocations - since all large buffers are pooled, they will never incur a Gen2 GC + /// 2. Memory waste - A standard memory stream doubles its size when it runs out of room. This + /// leads to continual memory growth as each stream approaches the maximum allowed size. + /// 3. Memory copying - Each time a MemoryStream grows, all the bytes are copied into new buffers. + /// This implementation only copies the bytes when GetBuffer is called. + /// 4. Memory fragmentation - By using homogeneous buffer sizes, it ensures that blocks of memory + /// can be easily reused. + /// The stream is implemented on top of a series of uniformly-sized blocks. As the stream's length grows, + /// additional blocks are retrieved from the memory manager. It is these blocks that are pooled, not the stream + /// object itself. + /// The biggest wrinkle in this implementation is when GetBuffer() is called. This requires a single + /// contiguous buffer. If only a single block is in use, then that block is returned. If multiple blocks + /// are in use, we retrieve a larger buffer from the memory manager. These large buffers are also pooled, + /// split by size--they are multiples/exponentials of a chunk size (1 MB by default). + /// Once a large buffer is assigned to the stream the blocks are NEVER again used for this stream. All operations take + /// place on the + /// large buffer. The large buffer can be replaced by a larger buffer from the pool as needed. All blocks and large buffers + /// are maintained in the stream until the stream is disposed (unless AggressiveBufferReturn is enabled in the stream + /// manager). 
/// - internal class RecyclableMemoryStream : MemoryStream + internal sealed class RecyclableMemoryStream : MemoryStream { private const long MaxStreamLength = int.MaxValue; @@ -67,6 +69,12 @@ internal class RecyclableMemoryStream : MemoryStream /// private readonly List _blocks = new List(1); + /// + /// This buffer exists so that WriteByte can forward all of its calls to Write + /// without creating a new byte[] buffer on every call. + /// + private readonly byte[] _byteBuffer = new byte[1]; + private readonly Guid _id; private readonly RecyclableMemoryStreamManager _memoryManager; @@ -93,16 +101,73 @@ internal class RecyclableMemoryStream : MemoryStream /// private byte[] _largeBuffer; - private int _length; + /// + /// Unique identifier for this stream across it's entire lifetime + /// + /// Object has been disposed + internal Guid Id + { + get + { + CheckDisposed(); + return _id; + } + } - private int _position; + /// + /// A temporary identifier for the current usage of this stream. + /// + /// Object has been disposed + internal string Tag + { + get + { + CheckDisposed(); + return _tag; + } + } + + /// + /// Gets the memory manager being used by this stream. + /// + /// Object has been disposed + internal RecyclableMemoryStreamManager MemoryManager + { + get + { + CheckDisposed(); + return _memoryManager; + } + } + + /// + /// Callstack of the constructor. It is only set if MemoryManager.GenerateCallStacks is true, + /// which should only be in debugging situations. + /// + internal string AllocationStack { get; } + + /// + /// Callstack of the Dispose call. It is only set if MemoryManager.GenerateCallStacks is true, + /// which should only be in debugging situations. + /// + internal string DisposeStack { get; private set; } + + #region Constructors /// /// Allocate a new RecyclableMemoryStream object. 
/// /// The memory manager public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager) - : this(memoryManager, null, 0, null) { } + : this(memoryManager, Guid.NewGuid(), null, 0, null) { } + + /// + /// Allocate a new RecyclableMemoryStream object. + /// + /// The memory manager + /// A unique identifier which can be used to trace usages of the stream. + public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id) + : this(memoryManager, id, null, 0, null) { } /// /// Allocate a new RecyclableMemoryStream object @@ -110,7 +175,16 @@ public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager) /// The memory manager /// A string identifying this stream for logging and debugging purposes public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, string tag) - : this(memoryManager, tag, 0, null) { } + : this(memoryManager, Guid.NewGuid(), tag, 0, null) { } + + /// + /// Allocate a new RecyclableMemoryStream object + /// + /// The memory manager + /// A unique identifier which can be used to trace usages of the stream. + /// A string identifying this stream for logging and debugging purposes + public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag) + : this(memoryManager, id, tag, 0, null) { } /// /// Allocate a new RecyclableMemoryStream object @@ -119,25 +193,35 @@ public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, strin /// A string identifying this stream for logging and debugging purposes /// The initial requested size to prevent future allocations public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, string tag, int requestedSize) - : this(memoryManager, tag, requestedSize, null) { } + : this(memoryManager, Guid.NewGuid(), tag, requestedSize, null) { } /// /// Allocate a new RecyclableMemoryStream object /// /// The memory manager + /// A unique identifier which can be used to trace usages of the stream. 
+ /// A string identifying this stream for logging and debugging purposes + /// The initial requested size to prevent future allocations + public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag, int requestedSize) + : this(memoryManager, id, tag, requestedSize, null) { } + + /// + /// Allocate a new RecyclableMemoryStream object + /// + /// The memory manager + /// A unique identifier which can be used to trace usages of the stream. /// A string identifying this stream for logging and debugging purposes /// The initial requested size to prevent future allocations /// - /// An initial buffer to use. This buffer will be owned by the stream and returned to the memory manager upon - /// Dispose. + /// An initial buffer to use. This buffer will be owned by the stream and returned to the + /// memory manager upon Dispose. /// - internal RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, string tag, int requestedSize, - byte[] initialLargeBuffer + internal RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag, int requestedSize, byte[] initialLargeBuffer ) : base(EmptyArray) { _memoryManager = memoryManager; - _id = Guid.NewGuid(); + _id = id; _tag = tag; if (requestedSize < memoryManager.BlockSize) requestedSize = memoryManager.BlockSize; @@ -146,27 +230,98 @@ byte[] initialLargeBuffer EnsureCapacity(requestedSize); else _largeBuffer = initialLargeBuffer; + + if (_memoryManager.GenerateCallStacks) AllocationStack = Environment.StackTrace; + + RecyclableMemoryStreamManager.EventsWriter.MemoryStreamCreated(_id, _tag, requestedSize); + _memoryManager.ReportStreamCreated(); } - /// - /// Whether the stream can currently read - /// - public override bool CanRead => !Disposed; + #endregion - /// - /// Whether the stream can currently seek - /// - public override bool CanSeek => !Disposed; + #region Dispose and Finalize + + ~RecyclableMemoryStream() => Dispose(false); /// - /// Always 
false + /// Returns the memory used by this stream back to the pool. /// - public override bool CanTimeout => false; + /// Whether we're disposing (true), or being called by the finalizer (false) + [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly", + Justification = "We have different disposal semantics, so SuppressFinalize is in a different spot.")] + protected override void Dispose(bool disposing) + { + if (Interlocked.CompareExchange(ref _disposedState, 1, 0) != 0) + { + string doubleDisposeStack = null; + if (_memoryManager.GenerateCallStacks) doubleDisposeStack = Environment.StackTrace; + + RecyclableMemoryStreamManager.EventsWriter.MemoryStreamDoubleDispose(_id, _tag, + AllocationStack, + DisposeStack, + doubleDisposeStack); + return; + } + + RecyclableMemoryStreamManager.EventsWriter.MemoryStreamDisposed(_id, _tag); + + if (_memoryManager.GenerateCallStacks) DisposeStack = Environment.StackTrace; + + if (disposing) + { + _memoryManager.ReportStreamDisposed(); + + GC.SuppressFinalize(this); + } + else + { + // We're being finalized. + + RecyclableMemoryStreamManager.EventsWriter.MemoryStreamFinalized(_id, _tag, AllocationStack); + +#if !NETSTANDARD1_4 + if (AppDomain.CurrentDomain.IsFinalizingForUnload()) + { + // If we're being finalized because of a shutdown, don't go any further. + // We have no idea what's already been cleaned up. Triggering events may cause + // a crash. 
+ base.Dispose(disposing); + return; + } +#endif + + _memoryManager.ReportStreamFinalized(); + } + + _memoryManager.ReportStreamLength(_length); + + if (_largeBuffer != null) _memoryManager.ReturnLargeBuffer(_largeBuffer, _tag); + + if (_dirtyBuffers != null) + foreach (var buffer in _dirtyBuffers) + _memoryManager.ReturnLargeBuffer(buffer, _tag); + + _memoryManager.ReturnBlocks(_blocks, _tag); + _blocks.Clear(); + + base.Dispose(disposing); + } /// - /// Whether the stream can currently write + /// Equivalent to Dispose /// - public override bool CanWrite => !Disposed; +#if NETSTANDARD1_4 + public void Close() +#else + public override void Close() +#endif + { + Dispose(true); + } + + #endregion + + #region MemoryStream overrides /// /// Gets or sets the capacity @@ -196,6 +351,8 @@ public override int Capacity } } + private int _length; + /// /// Gets the number of bytes written to this stream. /// @@ -209,6 +366,8 @@ public override long Length } } + private int _position; + /// /// Gets the current position in the stream /// @@ -232,90 +391,24 @@ public override long Position } /// - /// Unique identifier for this stream across it's entire lifetime - /// - /// Object has been disposed - internal Guid Id - { - get - { - CheckDisposed(); - return _id; - } - } - - /// - /// Gets the memory manager being used by this stream. + /// Whether the stream can currently read /// - /// Object has been disposed - internal RecyclableMemoryStreamManager MemoryManager - { - get - { - CheckDisposed(); - return _memoryManager; - } - } + public override bool CanRead => !Disposed; /// - /// A temporary identifier for the current usage of this stream. 
+ /// Whether the stream can currently seek /// - /// Object has been disposed - internal string Tag - { - get - { - CheckDisposed(); - return _tag; - } - } - - private bool Disposed => Interlocked.Read(ref _disposedState) != 0; - - ~RecyclableMemoryStream() => Dispose(false); + public override bool CanSeek => !Disposed; /// - /// Returns the memory used by this stream back to the pool. + /// Always false /// - /// Whether we're disposing (true), or being called by the finalizer (false) - [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly", - Justification = "We have different disposal semantics, so SuppressFinalize is in a different spot.")] - protected override void Dispose(bool disposing) - { - if (Interlocked.CompareExchange(ref _disposedState, 1, 0) != 0) return; - - if (disposing) - GC.SuppressFinalize(this); - else - { -#if !DOTNETCORE - if (AppDomain.CurrentDomain.IsFinalizingForUnload()) - { - // If we're being finalized because of a shutdown, don't go any further. - // We have no idea what's already been cleaned up. Triggering events may cause - // a crash. - base.Dispose(disposing); - return; - } -#endif - } - - if (_largeBuffer != null) _memoryManager.ReturnLargeBuffer(_largeBuffer, _tag); - - if (_dirtyBuffers != null) - foreach (var buffer in _dirtyBuffers) - _memoryManager.ReturnLargeBuffer(buffer, _tag); - - _memoryManager.ReturnBlocks(_blocks, _tag); - _blocks.Clear(); - - base.Dispose(disposing); - } + public override bool CanTimeout => false; /// - /// Equivalent to Dispose + /// Whether the stream can currently write /// - public override void Close() => Dispose(true); + public override bool CanWrite => !Disposed; /// /// Returns a single buffer containing the contents of the stream. @@ -327,7 +420,11 @@ protected override void Dispose(bool disposing) /// until Dispose is called, but the next time GetBuffer() is called, a new buffer from the pool will be required. 
/// /// Object has been disposed +#if NETSTANDARD1_4 + public byte[] GetBuffer() +#else public override byte[] GetBuffer() +#endif { CheckDisposed(); @@ -356,7 +453,29 @@ public override byte[] GetBuffer() } /// - /// Returns a new array with a copy of the buffer's contents. You should almost certainly be using GetBuffer combined with the Length to + /// Returns an ArraySegment that wraps a single buffer containing the contents of the stream. + /// + /// An ArraySegment containing a reference to the underlying bytes. + /// Always returns true. + /// + /// GetBuffer has no failure modes (it always returns something, even if it's an empty buffer), therefore this method + /// always returns a valid ArraySegment to the same buffer returned by GetBuffer. + /// +#if NET40 || NET45 + public bool TryGetBuffer(out ArraySegment buffer) +#else + public override bool TryGetBuffer(out ArraySegment buffer) +#endif + { + CheckDisposed(); + buffer = new ArraySegment(GetBuffer(), 0, (int)Length); + // GetBuffer has no failure modes, so this should always succeed + return true; + } + + /// + /// Returns a new array with a copy of the buffer's contents. You should almost certainly be using GetBuffer combined with + /// the Length to /// access the bytes in this stream. Calling ToArray will destroy the benefits of pooled buffers, but it is included /// for the sake of completeness. /// @@ -369,6 +488,10 @@ public override byte[] ToArray() var newBuffer = new byte[Length]; InternalRead(newBuffer, 0, _length, 0); + var stack = _memoryManager.GenerateCallStacks ? 
Environment.StackTrace : null; + RecyclableMemoryStreamManager.EventsWriter.MemoryStreamToArray(_id, _tag, stack, 0); + _memoryManager.ReportStreamToArray(); + return newBuffer; } #pragma warning restore CS0809 @@ -414,6 +537,32 @@ public int SafeRead(byte[] buffer, int offset, int count, ref int streamPosition return amountRead; } +#if NETCOREAPP2_1 || NETSTANDARD2_1 + /// + /// Reads from the current position into the provided buffer + /// + /// Destination buffer + /// The number of bytes read + /// Object has been disposed + public override int Read(Span buffer) => SafeRead(buffer, ref _position); + + /// + /// Reads from the specified position into the provided buffer + /// + /// Destination buffer + /// Position in the stream to start reading from + /// The number of bytes read + /// Object has been disposed + public int SafeRead(Span buffer, ref int streamPosition) + { + CheckDisposed(); + + var amountRead = InternalRead(buffer, streamPosition); + streamPosition += amountRead; + return amountRead; + } +#endif + /// /// Writes the buffer to the stream /// @@ -442,10 +591,6 @@ public override void Write(byte[] buffer, int offset, int count) // Check for overflow if (end > MaxStreamLength) throw new IOException("Maximum capacity exceeded"); - var requiredBuffers = (end + blockSize - 1) / blockSize; - - if (requiredBuffers * blockSize > MaxStreamLength) throw new IOException("Maximum capacity exceeded"); - EnsureCapacity((int)end); if (_largeBuffer == null) @@ -476,6 +621,55 @@ public override void Write(byte[] buffer, int offset, int count) _length = Math.Max(_position, _length); } +#if NETCOREAPP2_1 || NETSTANDARD2_1 + /// + /// Writes the buffer to the stream + /// + /// Source buffer + /// buffer is null + /// Object has been disposed + public override void Write(ReadOnlySpan source) + { + CheckDisposed(); + + var blockSize = _memoryManager.BlockSize; + var end = (long)_position + source.Length; + // Check for overflow + if (end > MaxStreamLength) + { + 
throw new IOException("Maximum capacity exceeded"); + } + + EnsureCapacity((int)end); + + if (_largeBuffer == null) + { + var blockAndOffset = GetBlockAndRelativeOffset(_position); + + while (source.Length > 0) + { + var currentBlock = _blocks[blockAndOffset.Block]; + var remainingInBlock = blockSize - blockAndOffset.Offset; + var amountToWriteInBlock = Math.Min(remainingInBlock, source.Length); + + source.Slice(0, amountToWriteInBlock) + .CopyTo(currentBlock.AsSpan(blockAndOffset.Offset)); + + source = source.Slice(amountToWriteInBlock); + + ++blockAndOffset.Block; + blockAndOffset.Offset = 0; + } + } + else + { + source.CopyTo(_largeBuffer.AsSpan(_position)); + } + _position = (int)end; + _length = Math.Max(_position, _length); + } +#endif + /// /// Returns a useful string for debugging. This should not normally be called in actual production code. /// @@ -489,25 +683,8 @@ public override void Write(byte[] buffer, int offset, int count) public override void WriteByte(byte value) { CheckDisposed(); - var end = _position + 1; - - // Check for overflow - if (end > MaxStreamLength) throw new IOException("Maximum capacity exceeded"); - - EnsureCapacity(end); - if (_largeBuffer == null) - { - var blockSize = _memoryManager.BlockSize; - var block = _position / blockSize; - var offset = _position - block * blockSize; - var currentBlock = _blocks[block]; - currentBlock[offset] = value; - } - else - _largeBuffer[_position] = value; - - _position = end; - _length = Math.Max(_position, _length); + _byteBuffer[0] = value; + Write(_byteBuffer, 0, 1); } /// @@ -623,6 +800,12 @@ public override void WriteTo(Stream stream) stream.Write(_largeBuffer, 0, _length); } + #endregion + + #region Helper Methods + + private bool Disposed => Interlocked.Read(ref _disposedState) != 0; + private void CheckDisposed() { if (Disposed) throw new ObjectDisposedException($"The stream with Id {_id} and Tag {_tag} is disposed."); @@ -644,7 +827,6 @@ private int InternalRead(byte[] buffer, int 
offset, int count, int fromPosition) { amountToCopy = Math.Min(_blocks[blockAndOffset.Block].Length - blockAndOffset.Offset, bytesRemaining); - Buffer.BlockCopy(_blocks[blockAndOffset.Block], blockAndOffset.Offset, buffer, bytesWritten + offset, amountToCopy); @@ -661,6 +843,55 @@ private int InternalRead(byte[] buffer, int offset, int count, int fromPosition) return amountToCopy; } +#if NETCOREAPP2_1 || NETSTANDARD2_1 + private int InternalRead(Span buffer, int fromPosition) + { + if (_length - fromPosition <= 0) + { + return 0; + } + + int amountToCopy; + + if (_largeBuffer == null) + { + var blockAndOffset = GetBlockAndRelativeOffset(fromPosition); + var bytesWritten = 0; + var bytesRemaining = Math.Min(buffer.Length, _length - fromPosition); + + while (bytesRemaining > 0) + { + amountToCopy = Math.Min(_blocks[blockAndOffset.Block].Length - blockAndOffset.Offset, + bytesRemaining); + _blocks[blockAndOffset.Block].AsSpan(blockAndOffset.Offset, amountToCopy) + .CopyTo(buffer.Slice(bytesWritten)); + + bytesWritten += amountToCopy; + bytesRemaining -= amountToCopy; + + ++blockAndOffset.Block; + blockAndOffset.Offset = 0; + } + return bytesWritten; + } + amountToCopy = Math.Min(buffer.Length, _length - fromPosition); + _largeBuffer.AsSpan(fromPosition, amountToCopy).CopyTo(buffer); + return amountToCopy; + } +#endif + + private struct BlockAndOffset + { + public int Block; + public int Offset; + + public BlockAndOffset(int block, int offset) + { + Block = block; + Offset = offset; + } + } + private BlockAndOffset GetBlockAndRelativeOffset(int offset) { var blockSize = _memoryManager.BlockSize; @@ -670,8 +901,14 @@ private BlockAndOffset GetBlockAndRelativeOffset(int offset) private void EnsureCapacity(int newCapacity) { if (newCapacity > _memoryManager.MaximumStreamCapacity && _memoryManager.MaximumStreamCapacity > 0) + { + RecyclableMemoryStreamManager.EventsWriter.MemoryStreamOverCapacity(newCapacity, + _memoryManager + .MaximumStreamCapacity, _tag, + 
AllocationStack); throw new InvalidOperationException("Requested capacity is too large: " + newCapacity + ". Limit is " + _memoryManager.MaximumStreamCapacity); + } if (_largeBuffer != null) { @@ -697,23 +934,15 @@ private void ReleaseLargeBuffer() _memoryManager.ReturnLargeBuffer(_largeBuffer, _tag); else { - if (_dirtyBuffers == null) _dirtyBuffers = new List(1); + if (_dirtyBuffers == null) + // We most likely will only ever need space for one + _dirtyBuffers = new List(1); _dirtyBuffers.Add(_largeBuffer); } _largeBuffer = null; } - private struct BlockAndOffset - { - public int Block; - public int Offset; - - public BlockAndOffset(int block, int offset) - { - Block = block; - Offset = offset; - } - } + #endregion } } diff --git a/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamFactory.cs b/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamFactory.cs index 82fba01caf9..63fe7d842d3 100644 --- a/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamFactory.cs +++ b/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamFactory.cs @@ -1,4 +1,6 @@ -using System.IO; +using System; +using System.IO; +using System.Threading.Tasks; namespace Elasticsearch.Net { @@ -7,17 +9,32 @@ namespace Elasticsearch.Net /// public class RecyclableMemoryStreamFactory : IMemoryStreamFactory { + private const string TagSource = "Elasticsearch.Net"; private readonly RecyclableMemoryStreamManager _manager; public static RecyclableMemoryStreamFactory Default { get; } = new RecyclableMemoryStreamFactory(); - public RecyclableMemoryStreamFactory() => - _manager = new RecyclableMemoryStreamManager { AggressiveBufferReturn = true }; + public RecyclableMemoryStreamFactory() + { +// var blockSize = 1024; +// var largeBufferMultiple = 1024 * 1024; +// var maxBufferSize = 16 * largeBufferMultiple; +// _manager = new RecyclableMemoryStreamManager(blockSize, largeBufferMultiple, maxBufferSize) +// { +// //AggressiveBufferReturn = true, +// MaximumFreeLargePoolBytes = maxBufferSize * 4, +// 
MaximumFreeSmallPoolBytes = 100 * blockSize +// }; + _manager = new RecyclableMemoryStreamManager() + { + //AggressiveBufferReturn = true, + }; + } - public MemoryStream Create() => _manager.GetStream(); + public MemoryStream Create() => _manager.GetStream(Guid.Empty, TagSource); - public MemoryStream Create(byte[] bytes) => new MemoryStream(bytes); + public MemoryStream Create(byte[] bytes) => _manager.GetStream(bytes); - public MemoryStream Create(byte[] bytes, int index, int count) => _manager.GetStream(string.Empty, bytes, index, count); + public MemoryStream Create(byte[] bytes, int index, int count) => _manager.GetStream(Guid.Empty, TagSource, bytes, index, count); } } diff --git a/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamManager-Events.cs b/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamManager-Events.cs new file mode 100644 index 00000000000..02bfed6f465 --- /dev/null +++ b/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamManager-Events.cs @@ -0,0 +1,204 @@ +// --------------------------------------------------------------------- +// Copyright (c) 2015 Microsoft +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +// --------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics.Tracing; +using System.Threading; + +namespace Elasticsearch.Net +{ +#if !DOTNETCORE + /// + /// Stub for System.Diagnostics.Tracing.EventCounter which is not available on .NET 4.6.1 + /// + internal class EventCounter + { + public EventCounter(string blocks, RecyclableMemoryStreamManager.Events eventsWriter) { } + + public void WriteMetric(long v) { } + } +#endif +#if !NETSTANDARD2_1 + internal class PollingCounter : IDisposable + { + public PollingCounter(string largeBuffers, RecyclableMemoryStreamManager.Events eventsWriter, Func func) { } + + public void Dispose() {} + } +#endif + + internal sealed partial class RecyclableMemoryStreamManager + { + public static readonly Events EventsWriter = new Events(); + private Counters Counter { get; } + + public sealed class Counters : IDisposable + { + private ReadOnlyCollection Polls { get;} + + public Counters(RecyclableMemoryStreamManager instance) + { + PollingCounter Create(string name, Func poll, string description) => + new PollingCounter(name, EventsWriter, poll) + { +#if NETSTANDARD2_1 + DisplayName = description +#endif + }; + + + var polls = new List() + { + { Create("blocks", () => _blocks, "Pooled blocks active")}, + { Create("large-buffers", () => _largeBuffers, "Large buffers active")}, + { Create("large-buffers-free", () => instance.LargeBuffersFree, "Large buffers free")}, + { Create("large-pool-inuse", () => instance.LargePoolInUseSize, "Large pool in use size")}, + { Create("small-pool-free", () => instance.SmallBlocksFree, "Small pool free 
blocks")}, + { Create("small-pool-inuse", () => instance.SmallPoolInUseSize, "Small pool in use size")}, + { Create("small-pool-free", () => instance.SmallPoolFreeSize, "Small pool free size")}, + { Create("small-pool-max", () => instance.MaximumFreeSmallPoolBytes, "Small pool max size")}, + { Create("memory-streams", () => _memoryStreams, "Active memory streams")}, + }; + Polls = new ReadOnlyCollection(polls); + + + } + + private long _blocks = 0; + internal void ReportBlockCreated() => Interlocked.Increment(ref _blocks); + + internal void ReportBlockDiscarded() => Interlocked.Decrement(ref _blocks); + + private long _largeBuffers = 0; + internal void ReportLargeBufferCreated() => Interlocked.Increment(ref _largeBuffers); + + internal void ReportLargeBufferDiscarded() => Interlocked.Decrement(ref _largeBuffers); + + private long _memoryStreams = 0; + internal void ReportStreamCreated() => Interlocked.Increment(ref _memoryStreams); + + internal void ReportStreamDisposed() => Interlocked.Decrement(ref _memoryStreams); + + public void Dispose() + { + foreach(var p in Polls) p.Dispose(); + } + } + + [EventSource(Name = "Elasticsearch-Net-RecyclableMemoryStream", Guid = "{AD44FDAC-D3FC-460A-9EBE-E55A3569A8F6}")] + public sealed class Events : EventSource + { + + public enum MemoryStreamBufferType + { + Small, + Large + } + + public enum MemoryStreamDiscardReason + { + TooLarge, + EnoughFree + } + + [Event(1, Level = EventLevel.Verbose)] + public void MemoryStreamCreated(Guid guid, string tag, int requestedSize) + { + if (IsEnabled(EventLevel.Verbose, EventKeywords.None)) + WriteEvent(1, guid, tag ?? string.Empty, requestedSize); + } + + [Event(2, Level = EventLevel.Verbose)] + public void MemoryStreamDisposed(Guid guid, string tag) + { + if (IsEnabled(EventLevel.Verbose, EventKeywords.None)) + WriteEvent(2, guid, tag ?? 
string.Empty); + } + + [Event(3, Level = EventLevel.Critical)] + public void MemoryStreamDoubleDispose(Guid guid, string tag, string allocationStack, string disposeStack1, string disposeStack2) + { + if (IsEnabled()) + WriteEvent(3, guid, tag ?? string.Empty, allocationStack ?? string.Empty, + disposeStack1 ?? string.Empty, disposeStack2 ?? string.Empty); + } + + [Event(4, Level = EventLevel.Error)] + public void MemoryStreamFinalized(Guid guid, string tag, string allocationStack) + { + if (IsEnabled()) + WriteEvent(4, guid, tag ?? string.Empty, allocationStack ?? string.Empty); + } + + [Event(5, Level = EventLevel.Verbose)] + public void MemoryStreamToArray(Guid guid, string tag, string stack, int size) + { + if (IsEnabled(EventLevel.Verbose, EventKeywords.None)) + WriteEvent(5, guid, tag ?? string.Empty, stack ?? string.Empty, size); + } + + [Event(6, Level = EventLevel.Informational)] + public void MemoryStreamManagerInitialized(int blockSize, int largeBufferMultiple, int maximumBufferSize) + { + if (IsEnabled()) + WriteEvent(6, blockSize, largeBufferMultiple, maximumBufferSize); + } + + [Event(7, Level = EventLevel.Verbose)] + public void MemoryStreamNewBlockCreated(long smallPoolInUseBytes) + { + if (IsEnabled(EventLevel.Verbose, EventKeywords.None)) + WriteEvent(7, smallPoolInUseBytes); + } + + [Event(8, Level = EventLevel.Verbose)] + public void MemoryStreamNewLargeBufferCreated(int requiredSize, long largePoolInUseBytes) + { + if (IsEnabled(EventLevel.Verbose, EventKeywords.None)) + WriteEvent(8, requiredSize, largePoolInUseBytes); + } + + [Event(9, Level = EventLevel.Verbose)] + public void MemoryStreamNonPooledLargeBufferCreated(int requiredSize, string tag, string allocationStack) + { + if (IsEnabled(EventLevel.Verbose, EventKeywords.None)) + WriteEvent(9, requiredSize, tag ?? string.Empty, allocationStack ?? 
string.Empty); + } + + [Event(10, Level = EventLevel.Warning)] + public void MemoryStreamDiscardBuffer(MemoryStreamBufferType bufferType, string tag, MemoryStreamDiscardReason reason) + { + if (IsEnabled()) + WriteEvent(10, bufferType, tag ?? string.Empty, reason); + } + + [Event(11, Level = EventLevel.Error)] + public void MemoryStreamOverCapacity(int requestedCapacity, long maxCapacity, string tag, string allocationStack) + { + if (IsEnabled()) + WriteEvent(11, requestedCapacity, maxCapacity, tag ?? string.Empty, allocationStack ?? string.Empty); + } + } + } +} diff --git a/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamManager.cs b/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamManager.cs index e3522317451..412f619b82a 100644 --- a/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamManager.cs +++ b/src/Elasticsearch.Net/Providers/RecyclableMemoryStreamManager.cs @@ -19,21 +19,55 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. // --------------------------------------------------------------------- -// -// https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStream/blob/master/src/RecyclableMemoryStreamManager.cs -// MIT license: https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStream/blob/master/LICENSE using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; using System.IO; using System.Threading; namespace Elasticsearch.Net { - internal class RecyclableMemoryStreamManager + /// + /// Manages pools of RecyclableMemoryStream objects. + /// + /// + /// There are two pools managed in here. The small pool contains same-sized buffers that are handed to streams + /// as they write more data. + /// For scenarios that need to call GetBuffer(), the large pool contains buffers of various sizes, all + /// multiples/exponentials of LargeBufferMultiple (1 MB by default). 
They are split by size to avoid overly-wasteful buffer + /// usage. There should be far fewer 8 MB buffers than 1 MB buffers, for example. + /// + internal partial class RecyclableMemoryStreamManager { + /// + /// Generic delegate for handling events without any arguments. + /// + public delegate void EventHandler(); + + /// + /// Delegate for handling large buffer discard reports. + /// + /// Reason the buffer was discarded. + public delegate void LargeBufferDiscardedEventHandler(Events.MemoryStreamDiscardReason reason); + + /// + /// Delegate for handling reports of stream size when streams are allocated + /// + /// Bytes allocated. + public delegate void StreamLengthReportHandler(long bytes); + + /// + /// Delegate for handling periodic reporting of memory use statistics. + /// + /// Bytes currently in use in the small pool. + /// Bytes currently free in the small pool. + /// Bytes currently in use in the large pool. + /// Bytes currently free in the large pool. + public delegate void UsageReportEventHandler( + long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes + ); + public const int DefaultBlockSize = 128 * 1024; public const int DefaultLargeBufferMultiple = 1024 * 1024; public const int DefaultMaximumBufferSize = 128 * 1024 * 1024; @@ -41,9 +75,11 @@ internal class RecyclableMemoryStreamManager private readonly long[] _largeBufferFreeSize; private readonly long[] _largeBufferInUseSize; + /// /// pools[0] = 1x largeBufferMultiple buffers /// pools[1] = 2x largeBufferMultiple buffers + /// pools[2] = 3x(multiple)/4x(exponential) largeBufferMultiple buffers /// etc., up to maximumBufferSize /// private readonly ConcurrentStack[] _largePools; @@ -57,7 +93,7 @@ internal class RecyclableMemoryStreamManager /// Initializes the memory manager with the default block/buffer specifications. 
/// public RecyclableMemoryStreamManager() - : this(DefaultBlockSize, DefaultLargeBufferMultiple, DefaultMaximumBufferSize) { } + : this(DefaultBlockSize, DefaultLargeBufferMultiple, DefaultMaximumBufferSize, false) { } /// /// Initializes the memory manager with the given block requiredSize. @@ -66,11 +102,26 @@ public RecyclableMemoryStreamManager() /// Each large buffer will be a multiple of this value. /// Buffers larger than this are not pooled /// - /// blockSize is not a positive number, or largeBufferMultiple is not a positive number, or - /// maximumBufferSize is less than blockSize. + /// blockSize is not a positive number, or largeBufferMultiple is not a + /// positive number, or maximumBufferSize is less than blockSize. /// /// maximumBufferSize is not a multiple of largeBufferMultiple public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize) + : this(blockSize, largeBufferMultiple, maximumBufferSize, false) { } + + /// + /// Initializes the memory manager with the given block requiredSize. + /// + /// Size of each block that is pooled. Must be > 0. + /// Each large buffer will be a multiple/exponential of this value. + /// Buffers larger than this are not pooled + /// Switch to exponential large buffer allocation strategy + /// + /// blockSize is not a positive number, or largeBufferMultiple is not a + /// positive number, or maximumBufferSize is less than blockSize. 
+ /// + /// maximumBufferSize is not a multiple/exponential of largeBufferMultiple + public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize, bool useExponentialLargeBuffer) { if (blockSize <= 0) throw new ArgumentOutOfRangeException(nameof(blockSize), blockSize, "blockSize must be a positive number"); @@ -85,13 +136,17 @@ public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int BlockSize = blockSize; LargeBufferMultiple = largeBufferMultiple; MaximumBufferSize = maximumBufferSize; + UseExponentialLargeBuffer = useExponentialLargeBuffer; - if (!IsLargeBufferMultiple(maximumBufferSize)) - throw new ArgumentException("maximumBufferSize is not a multiple of largeBufferMultiple", + if (!IsLargeBufferSize(maximumBufferSize)) + throw new ArgumentException(string.Format("maximumBufferSize is not {0} of largeBufferMultiple", + UseExponentialLargeBuffer ? "an exponential" : "a multiple"), nameof(maximumBufferSize)); _smallPool = new ConcurrentStack(); - var numLargePools = maximumBufferSize / largeBufferMultiple; + var numLargePools = useExponentialLargeBuffer + ? (int)Math.Log(maximumBufferSize / largeBufferMultiple, 2) + 1 + : maximumBufferSize / largeBufferMultiple; // +1 to store size of bytes in use that are too large to be pooled _largeBufferInUseSize = new long[numLargePools + 1]; @@ -100,6 +155,9 @@ public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int _largePools = new ConcurrentStack[numLargePools]; for (var i = 0; i < _largePools.Length; ++i) _largePools[i] = new ConcurrentStack(); + + Counter = new Counters(this); + EventsWriter.MemoryStreamManagerInitialized(blockSize, largeBufferMultiple, maximumBufferSize); } /// @@ -118,7 +176,13 @@ public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int public int BlockSize { get; } /// - /// All buffers are multiples of this number. It must be set at creation and cannot be changed. 
+ /// Whether to save callstacks for stream allocations. This can help in debugging. + /// It should NEVER be turned on generally in production. + /// + public bool GenerateCallStacks { get; set; } + + /// + /// All buffers are multiples/exponentials of this number. It must be set at creation and cannot be changed. /// public int LargeBufferMultiple { get; } @@ -143,11 +207,7 @@ public long LargePoolFreeSize get { long sum = 0; - for (var index = 0; index < _largeBufferFreeSize.Length; index++) - { - var freeSize = _largeBufferFreeSize[index]; - sum += freeSize; - } + foreach (var freeSize in _largeBufferFreeSize) sum += freeSize; return sum; } @@ -161,18 +221,14 @@ public long LargePoolInUseSize get { long sum = 0; - for (var index = 0; index < _largeBufferInUseSize.Length; index++) - { - var inUseSize = _largeBufferInUseSize[index]; - sum += inUseSize; - } + foreach (var inUseSize in _largeBufferInUseSize) sum += inUseSize; return sum; } } /// - /// Gets or sets the maximum buffer size. + /// Gets the maximum buffer size. /// /// /// Any buffer that is returned to the pool that is larger than this will be @@ -214,14 +270,31 @@ public long LargePoolInUseSize /// public long SmallPoolInUseSize => _smallPoolInUseSize; + /// + /// Use exponential large buffer allocation strategy. It must be set at creation and cannot be changed. + /// + public bool UseExponentialLargeBuffer { get; } + + /// + /// Use multiple large buffer allocation strategy. It must be set at creation and cannot be changed. + /// + public bool UseMultipleLargeBuffer => !UseExponentialLargeBuffer; + /// /// Removes and returns a single block from the pool. 
/// /// A byte[] array internal byte[] GetBlock() { - if (!_smallPool.TryPop(out var block)) + byte[] block; + if (!_smallPool.TryPop(out block)) + { + // We'll add this back to the pool when the stream is disposed + // (unless our free pool is too large) block = new byte[BlockSize]; + EventsWriter.MemoryStreamNewBlockCreated(_smallPoolInUseSize); + ReportBlockCreated(); + } else Interlocked.Add(ref _smallPoolFreeSize, -BlockSize); @@ -231,22 +304,27 @@ internal byte[] GetBlock() /// /// Returns a buffer of arbitrary size from the large buffer pool. This buffer - /// will be at least the requiredSize and always be a multiple of largeBufferMultiple. + /// will be at least the requiredSize and always be a multiple/exponential of largeBufferMultiple. /// /// The minimum length of the buffer /// The tag of the stream returning this buffer, for logging if necessary. /// A buffer of at least the required size. internal byte[] GetLargeBuffer(int requiredSize, string tag) { - requiredSize = RoundToLargeBufferMultiple(requiredSize); + requiredSize = RoundToLargeBufferSize(requiredSize); - var poolIndex = requiredSize / LargeBufferMultiple - 1; + var poolIndex = GetPoolIndex(requiredSize); byte[] buffer; if (poolIndex < _largePools.Length) { if (!_largePools[poolIndex].TryPop(out buffer)) + { buffer = new byte[requiredSize]; + + EventsWriter.MemoryStreamNewLargeBufferCreated(requiredSize, LargePoolInUseSize); + ReportLargeBufferCreated(); + } else Interlocked.Add(ref _largeBufferFreeSize[poolIndex], -buffer.Length); } @@ -260,6 +338,12 @@ internal byte[] GetLargeBuffer(int requiredSize, string tag) // We still want to round up to reduce heap fragmentation. 
buffer = new byte[requiredSize]; + string callStack = null; + if (GenerateCallStacks) + // Grab the stack -- we want to know who requires such large buffers + callStack = Environment.StackTrace; + EventsWriter.MemoryStreamNonPooledLargeBufferCreated(requiredSize, tag, callStack); + ReportLargeBufferCreated(); } Interlocked.Add(ref _largeBufferInUseSize[poolIndex], buffer.Length); @@ -267,10 +351,34 @@ internal byte[] GetLargeBuffer(int requiredSize, string tag) return buffer; } - private int RoundToLargeBufferMultiple(int requiredSize) => - (requiredSize + LargeBufferMultiple - 1) / LargeBufferMultiple * LargeBufferMultiple; + private int RoundToLargeBufferSize(int requiredSize) + { + if (UseExponentialLargeBuffer) + { + var pow = 1; + while (LargeBufferMultiple * pow < requiredSize) pow <<= 1; + return LargeBufferMultiple * pow; + } + else + return (requiredSize + LargeBufferMultiple - 1) / LargeBufferMultiple * LargeBufferMultiple; + } + + private bool IsLargeBufferSize(int value) => + value != 0 && (UseExponentialLargeBuffer + ? value == RoundToLargeBufferSize(value) + : value % LargeBufferMultiple == 0); - private bool IsLargeBufferMultiple(int value) => value != 0 && value % LargeBufferMultiple == 0; + private int GetPoolIndex(int length) + { + if (UseExponentialLargeBuffer) + { + var index = 0; + while (LargeBufferMultiple << index < length) ++index; + return index; + } + else + return length / LargeBufferMultiple - 1; + } /// /// Returns the buffer to the large pool @@ -278,17 +386,21 @@ private int RoundToLargeBufferMultiple(int requiredSize) => /// The buffer to return. /// The tag of the stream returning this buffer, for logging if necessary. 
/// buffer is null - /// buffer.Length is not a multiple of LargeBufferMultiple (it did not originate from this pool) + /// + /// buffer.Length is not a multiple/exponential of LargeBufferMultiple (it did not + /// originate from this pool) + /// internal void ReturnLargeBuffer(byte[] buffer, string tag) { if (buffer == null) throw new ArgumentNullException(nameof(buffer)); - if (!IsLargeBufferMultiple(buffer.Length)) + if (!IsLargeBufferSize(buffer.Length)) throw new ArgumentException( - "buffer did not originate from this memory manager. The size is not a multiple of " + + string.Format("buffer did not originate from this memory manager. The size is not {0} of ", + UseExponentialLargeBuffer ? "an exponential" : "a multiple") + LargeBufferMultiple); - var poolIndex = buffer.Length / LargeBufferMultiple - 1; + var poolIndex = GetPoolIndex(buffer.Length); if (poolIndex < _largePools.Length) { @@ -298,11 +410,28 @@ internal void ReturnLargeBuffer(byte[] buffer, string tag) _largePools[poolIndex].Push(buffer); Interlocked.Add(ref _largeBufferFreeSize[poolIndex], buffer.Length); } + else + { + EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag, + Events.MemoryStreamDiscardReason.EnoughFree); + ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.EnoughFree); + } } else + { + // This is a non-poolable buffer, but we still want to track its size for inuse + // analysis. We have space in the inuse array for this. 
poolIndex = _largeBufferInUseSize.Length - 1; + EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag, + Events.MemoryStreamDiscardReason.TooLarge); + ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.TooLarge); + } + Interlocked.Add(ref _largeBufferInUseSize[poolIndex], -buffer.Length); + + ReportUsageReport(_smallPoolInUseSize, _smallPoolFreeSize, LargePoolInUseSize, + LargePoolFreeSize); } /// @@ -333,10 +462,64 @@ internal void ReturnBlocks(ICollection blocks, string tag) _smallPool.Push(block); } else + { + EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Small, tag, + Events.MemoryStreamDiscardReason.EnoughFree); + ReportBlockDiscarded(); break; + } } + + ReportUsageReport(_smallPoolInUseSize, _smallPoolFreeSize, LargePoolInUseSize, + LargePoolFreeSize); } + internal void ReportBlockCreated() + { + Counter.ReportBlockCreated(); + BlockCreated?.Invoke(); + } + + internal void ReportBlockDiscarded() + { + Counter.ReportBlockDiscarded(); + BlockDiscarded?.Invoke(); + } + + internal void ReportLargeBufferCreated() + { + Counter.ReportLargeBufferCreated(); + LargeBufferCreated?.Invoke(); + } + + internal void ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason reason) + { + Counter.ReportLargeBufferDiscarded(); + LargeBufferDiscarded?.Invoke(reason); + } + + internal void ReportStreamCreated() + { + Counter.ReportStreamCreated(); + StreamCreated?.Invoke(); + } + + internal void ReportStreamDisposed() + { + Counter.ReportStreamDisposed(); + StreamDisposed?.Invoke(); + } + + internal void ReportStreamFinalized() => StreamFinalized?.Invoke(); + + internal void ReportStreamLength(long bytes) => StreamLength?.Invoke(bytes); + + internal void ReportStreamToArray() => StreamConvertedToArray?.Invoke(); + + internal void ReportUsageReport( + long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes + ) => + UsageReport?.Invoke(smallPoolInUseBytes, smallPoolFreeBytes, 
largePoolInUseBytes, largePoolFreeBytes); /// /// Retrieve a new MemoryStream object with no tag and a default initial capacity. @@ -344,6 +527,33 @@ internal void ReturnBlocks(ICollection blocks, string tag) /// A MemoryStream. public MemoryStream GetStream() => new RecyclableMemoryStream(this); + private class ReportingMemoryStream : MemoryStream + { + private readonly RecyclableMemoryStreamManager _instance; + + public ReportingMemoryStream(byte[] bytes, RecyclableMemoryStreamManager instance) : base(bytes) + { + _instance = instance; + _instance.Counter.ReportStreamCreated(); + } + + //NOTE DisposeAsync calls Dispose as well + protected override void Dispose(bool disposing) => _instance.Counter.ReportStreamDisposed(); + } + + /// + /// Shortcut to create a stream that directly wraps bytes but still uses reporting on the stream being created and disposes. + /// Note this does NOT use the pooled memory streams as the bytes have already been allocated + /// + public MemoryStream GetStream(byte[] bytes) => new ReportingMemoryStream(bytes, this); + + /// + /// Retrieve a new MemoryStream object with no tag and a default initial capacity. + /// + /// A unique identifier which can be used to trace usages of the stream. + /// A MemoryStream. + public MemoryStream GetStream(Guid id) => new RecyclableMemoryStream(this, id); + /// /// Retrieve a new MemoryStream object with the given tag and a default initial capacity. /// @@ -351,17 +561,35 @@ internal void ReturnBlocks(ICollection blocks, string tag) /// A MemoryStream. public MemoryStream GetStream(string tag) => new RecyclableMemoryStream(this, tag); + /// + /// Retrieve a new MemoryStream object with the given tag and a default initial capacity. + /// + /// A unique identifier which can be used to trace usages of the stream. + /// A tag which can be used to track the source of the stream. + /// A MemoryStream. 
+ public MemoryStream GetStream(Guid id, string tag) => new RecyclableMemoryStream(this, id, tag); + /// /// Retrieve a new MemoryStream object with the given tag and at least the given capacity. /// + /// A unique identifier which can be used to trace usages of the stream. /// A tag which can be used to track the source of the stream. /// The minimum desired capacity for the stream. /// A MemoryStream. public MemoryStream GetStream(string tag, int requiredSize) => new RecyclableMemoryStream(this, tag, requiredSize); + /// + /// Retrieve a new MemoryStream object with the given tag and at least the given capacity. + /// + /// A unique identifier which can be used to trace usages of the stream. + /// A tag which can be used to track the source of the stream. + /// The minimum desired capacity for the stream. + /// A MemoryStream. + public MemoryStream GetStream(Guid id, string tag, int requiredSize) => new RecyclableMemoryStream(this, id, tag, requiredSize); + /// /// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using - /// a single continugous underlying buffer. + /// a single contiguous underlying buffer. /// /// /// Retrieving a MemoryStream which provides a single contiguous buffer can be useful in situations @@ -369,34 +597,124 @@ internal void ReturnBlocks(ICollection blocks, string tag) /// buffers to a single large one. This is most helpful when you know that you will always call GetBuffer /// on the underlying stream. /// + /// A unique identifier which can be used to trace usages of the stream. /// A tag which can be used to track the source of the stream. /// The minimum desired capacity for the stream. /// Whether to attempt to use a single contiguous buffer. /// A MemoryStream. 
- public MemoryStream GetStream(string tag, int requiredSize, bool asContiguousBuffer) + public MemoryStream GetStream(Guid id, string tag, int requiredSize, bool asContiguousBuffer) { - if (!asContiguousBuffer || requiredSize <= BlockSize) return GetStream(tag, requiredSize); + if (!asContiguousBuffer || requiredSize <= BlockSize) return GetStream(id, tag, requiredSize); - return new RecyclableMemoryStream(this, tag, requiredSize, GetLargeBuffer(requiredSize, tag)); + return new RecyclableMemoryStream(this, id, tag, requiredSize, GetLargeBuffer(requiredSize, tag)); } + /// + /// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using + /// a single contiguous underlying buffer. + /// + /// + /// Retrieving a MemoryStream which provides a single contiguous buffer can be useful in situations + /// where the initial size is known and it is desirable to avoid copying data between the smaller underlying + /// buffers to a single large one. This is most helpful when you know that you will always call GetBuffer + /// on the underlying stream. + /// + /// A tag which can be used to track the source of the stream. + /// The minimum desired capacity for the stream. + /// Whether to attempt to use a single contiguous buffer. + /// A MemoryStream. + public MemoryStream GetStream(string tag, int requiredSize, bool asContiguousBuffer) => + GetStream(Guid.NewGuid(), tag, requiredSize, asContiguousBuffer); + /// /// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided /// buffer. The provided buffer is not wrapped or used after construction. /// /// The new stream's position is set to the beginning of the stream when returned. + /// A unique identifier which can be used to trace usages of the stream. /// A tag which can be used to track the source of the stream. /// The byte buffer to copy data from. /// The offset from the start of the buffer to copy from. 
/// The number of bytes to copy from the buffer. /// A MemoryStream. - [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] - public MemoryStream GetStream(string tag, byte[] buffer, int offset, int count) + public MemoryStream GetStream(Guid id, string tag, byte[] buffer, int offset, int count) { - var stream = new RecyclableMemoryStream(this, tag, count); - stream.Write(buffer, offset, count); - stream.Position = 0; - return stream; + RecyclableMemoryStream stream = null; + try + { + stream = new RecyclableMemoryStream(this, id, tag, count); + stream.Write(buffer, offset, count); + stream.Position = 0; + return stream; + } + catch + { + stream?.Dispose(); + throw; + } } + + /// + /// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided + /// buffer. The provided buffer is not wrapped or used after construction. + /// + /// The new stream's position is set to the beginning of the stream when returned. + /// A tag which can be used to track the source of the stream. + /// The byte buffer to copy data from. + /// The offset from the start of the buffer to copy from. + /// The number of bytes to copy from the buffer. + /// A MemoryStream. + public MemoryStream GetStream(string tag, byte[] buffer, int offset, int count) => GetStream(Guid.NewGuid(), tag, buffer, offset, count); + + /// + /// Triggered when a new block is created. + /// + public event EventHandler BlockCreated; + + /// + /// Triggered when a block is discarded and not returned to the pool. + /// + public event EventHandler BlockDiscarded; + + /// + /// Triggered when a new large buffer is created. + /// + public event EventHandler LargeBufferCreated; + + /// + /// Triggered when a new stream is created. + /// + public event EventHandler StreamCreated; + + /// + /// Triggered when a stream is disposed. + /// + public event EventHandler StreamDisposed; + + /// + /// Triggered when a stream is finalized.
+ /// + public event EventHandler StreamFinalized; + + /// + /// Triggered to report a stream's length, in bytes, when it is allocated. + /// + public event StreamLengthReportHandler StreamLength; + + /// + /// Triggered when a user converts a stream to array. + /// + public event EventHandler StreamConvertedToArray; + + /// + /// Triggered when a large buffer is discarded, along with the reason for the discard. + /// + public event LargeBufferDiscardedEventHandler LargeBufferDiscarded; + + /// + /// Periodically triggered to report usage statistics. + /// + public event UsageReportEventHandler UsageReport; + } } diff --git a/src/Elasticsearch.Net/Serialization/ElasticsearchSerializerExtensions.cs b/src/Elasticsearch.Net/Serialization/ElasticsearchSerializerExtensions.cs index a32d4e8f4ab..e1830f0071f 100644 --- a/src/Elasticsearch.Net/Serialization/ElasticsearchSerializerExtensions.cs +++ b/src/Elasticsearch.Net/Serialization/ElasticsearchSerializerExtensions.cs @@ -27,7 +27,7 @@ public static byte[] SerializeToBytes( SerializationFormatting formatting = SerializationFormatting.None ) { - memoryStreamFactory = memoryStreamFactory ?? RecyclableMemoryStreamFactory.Default; + memoryStreamFactory ??= RecyclableMemoryStreamFactory.Default; using (var ms = memoryStreamFactory.Create()) { serializer.Serialize(data, ms, formatting); @@ -58,7 +58,7 @@ public static string SerializeToString( SerializationFormatting formatting = SerializationFormatting.None ) { - memoryStreamFactory = memoryStreamFactory ??
RecyclableMemoryStreamFactory.Default; + memoryStreamFactory ??= RecyclableMemoryStreamFactory.Default; using (var ms = memoryStreamFactory.Create()) { serializer.Serialize(data, ms, formatting); diff --git a/src/Elasticsearch.Net/Utf8Json/Internal/Emit/ILViewer.cs b/src/Elasticsearch.Net/Utf8Json/Internal/Emit/ILViewer.cs index c145c417ed4..d42072d1fa2 100644 --- a/src/Elasticsearch.Net/Utf8Json/Internal/Emit/ILViewer.cs +++ b/src/Elasticsearch.Net/Utf8Json/Internal/Emit/ILViewer.cs @@ -61,7 +61,7 @@ static ILStreamReader() } public ILStreamReader(byte[] ilByteArray) - : base(new MemoryStream(ilByteArray)) + : base(RecyclableMemoryStreamFactory.Default.Create(ilByteArray)) { this.endPosition = ilByteArray.Length; } diff --git a/src/Elasticsearch.sln b/src/Elasticsearch.sln index 9c8e11cea3b..807cf4bd6af 100644 --- a/src/Elasticsearch.sln +++ b/src/Elasticsearch.sln @@ -81,6 +81,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Tests.YamlRunner", "Tests\T EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Jenkins", "..\.ci\Jenkins.csproj", "{5A9C1B95-9280-433E-8D1B-1F5396126166}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tests.LongRunning", "Tests\Tests.LongRunning\Tests.LongRunning.csproj", "{4F4C7D75-0512-4BA2-915C-E627CD7213A5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -110,6 +112,7 @@ Global {BA4C7FC9-13AD-4632-9A51-FAAD376E70BE} = {14241027-0A92-466D-B024-E0063F338915} {81473437-5722-4829-A5CD-125B17CCA238} = {6C4A2627-AF22-4388-9DF7-7A9AEACFD635} {5A9C1B95-9280-433E-8D1B-1F5396126166} = {432D5575-2347-4D3C-BF8C-3E38410C46CA} + {4F4C7D75-0512-4BA2-915C-E627CD7213A5} = {6C4A2627-AF22-4388-9DF7-7A9AEACFD635} EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {5B393962-7586-49BA-BD99-3B1E35F48E94}.Debug|Any CPU.ActiveCfg = Debug|Any CPU @@ -180,5 +183,9 @@ Global {5A9C1B95-9280-433E-8D1B-1F5396126166}.Debug|Any CPU.Build.0 = 
Debug|Any CPU {5A9C1B95-9280-433E-8D1B-1F5396126166}.Release|Any CPU.ActiveCfg = Release|Any CPU {5A9C1B95-9280-433E-8D1B-1F5396126166}.Release|Any CPU.Build.0 = Release|Any CPU + {4F4C7D75-0512-4BA2-915C-E627CD7213A5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4F4C7D75-0512-4BA2-915C-E627CD7213A5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4F4C7D75-0512-4BA2-915C-E627CD7213A5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4F4C7D75-0512-4BA2-915C-E627CD7213A5}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/src/Nest/Nest.csproj b/src/Nest/Nest.csproj index 1fd58c89d4a..81a07460a31 100644 --- a/src/Nest/Nest.csproj +++ b/src/Nest/Nest.csproj @@ -2,7 +2,7 @@ - netstandard2.0;net461 + netstandard2.0;netstandard2.1;net461 8.0 diff --git a/src/Tests/Tests.LongRunning/BulkIndexer.cs b/src/Tests/Tests.LongRunning/BulkIndexer.cs new file mode 100644 index 00000000000..686a3a45388 --- /dev/null +++ b/src/Tests/Tests.LongRunning/BulkIndexer.cs @@ -0,0 +1,449 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Elasticsearch.Net; +using Nest; +using Tests.LongRunning.Models; +using IndexOptions = Tests.LongRunning.CommandLineArgs.IndexOptions; +using TaskStatus = System.Threading.Tasks.TaskStatus; + +namespace Tests.LongRunning +{ + public class BulkIndexer + { + private const string PostsIndex = "posts"; + private const string UsersIndex = "users"; + private readonly IElasticClient _client; + + public BulkIndexer(IndexOptions opts) + { + var node = new SingleNodeConnectionPool(opts.ElasticsearchUrl); + var settings = new ConnectionSettings(node) + .RequestTimeout(TimeSpan.FromMinutes(10)) + .DefaultMappingFor(new[] + { + new ClrTypeMapping(typeof(Post)) {IndexName = PostsIndex}, + new ClrTypeMapping(typeof(Question)) {IndexName = PostsIndex, RelationName = "question"}, + new ClrTypeMapping(typeof(Answer)) {IndexName = PostsIndex}, + 
new ClrTypeMapping(typeof(User)) {IndexName = UsersIndex} + }) + .OnRequestCompleted(response => + { + if (!response.Success) + Console.Error.WriteLine(response.DebugInformation); + }); + + if (!string.IsNullOrEmpty(opts.UserName) && !string.IsNullOrEmpty(opts.Password)) + settings.BasicAuthentication(opts.UserName, opts.Password); + + if (opts.AllowInsecure) + settings.ServerCertificateValidationCallback(CertificateValidations.AllowAll); + + _client = new ElasticClient(settings); + } + + public void IndexUsers(string usersPath, string badgesPath) + { + CreateUsersIndexIfNotExists(); + + _client.Indices.UpdateSettings(UsersIndex, u => u + .IndexSettings(i => i + .RefreshInterval("-1") + ) + ); + + var size = 1000; + var seenPages = 0; + var indexedDocs = 0; + var totalDocs = 0; + var handle = new ManualResetEvent(false); + + var users = StackOverflowData.GetUsers(usersPath); + var observableBulk = _client.BulkAll(users, f => f + .MaxDegreeOfParallelism(16) + .BackOffTime(TimeSpan.FromSeconds(10)) + .BackOffRetries(2) + .Size(size) + .RefreshOnCompleted() + .Index(UsersIndex) + ); + + Exception exception = null; + var bulkObserver = new BulkAllObserver( + onError: e => + { + exception = e; + handle.Set(); + }, + onCompleted: () => handle.Set(), + onNext: b => + { + Interlocked.Add(ref indexedDocs, b.Items.Count(i => i.IsValid)); + Interlocked.Add(ref totalDocs, b.Items.Count); + Interlocked.Increment(ref seenPages); + Log.WriteLine($"indexed users page {seenPages}, {indexedDocs} out of {totalDocs}"); + } + ); + + var stopWatch = Stopwatch.StartNew(); + observableBulk.Subscribe(bulkObserver); + handle.WaitOne(); + + if (exception != null) + throw exception; + + Log.WriteLine($"Time taken to index users: {stopWatch.Elapsed}"); + + // update user badges + seenPages = 0; + indexedDocs = 0; + totalDocs = 0; + handle = new ManualResetEvent(false); + + var badgeMetas = StackOverflowData.GetBadgeMetas(badgesPath); + + var observableBadgeBulk = _client.BulkAll(badgeMetas, f 
=> f + .Index() + .MaxDegreeOfParallelism(8) + .Size(size) + .BufferToBulk((bulk, badges) => + { + foreach (var badge in badges) + bulk.Update(u => u + .Script(s => s + .Source(@"if (ctx._source.badges == null) { + ctx._source.badges = [params.badge]; + } else if (ctx._source.badges.any(b -> b.name == params.badge.name) == false) { + ctx._source.badges.add(params.badge); + }") + .Params(d => d + .Add("badge", badge.Badge) + ) + ) + .Id(badge.UserId) + .RetriesOnConflict(10) + ); + }) + .RefreshOnCompleted() + ); + + bulkObserver = new BulkAllObserver( + onError: e => + { + exception = e; + handle.Set(); + }, + onCompleted: () => handle.Set(), + onNext: b => + { + Interlocked.Add(ref indexedDocs, b.Items.Count(i => i.IsValid)); + Interlocked.Add(ref totalDocs, b.Items.Count); + Interlocked.Increment(ref seenPages); + Log.WriteLine($"indexed badges page {seenPages}, {indexedDocs} out of {totalDocs}"); + } + ); + + stopWatch.Restart(); + observableBadgeBulk.Subscribe(bulkObserver); + handle.WaitOne(); + + if (exception != null) + throw exception; + + Log.WriteLine($"Time taken to index badges: {stopWatch.Elapsed}"); + + _client.Indices.UpdateSettings(UsersIndex, u => u + .IndexSettings(i => i + .RefreshInterval("30s") + ) + ); + } + + private void CreateUsersIndexIfNotExists() + { + if (!_client.Indices.Exists(UsersIndex).Exists) + { + var createIndexResponse = _client.Indices.Create(UsersIndex, c => c + .Settings(s => s + .NumberOfShards(3) + .NumberOfReplicas(0) + .Analysis(a => a + .Analyzers(an => an + .Custom("html", ca => ca + .CharFilters("html_strip") + .Tokenizer("standard") + .Filters("lowercase", "stop") + ) + ) + ) + ) + .Map(mm => mm + .AutoMap() + .Properties(p => p + .Text(s => s + .Name(n => n.AboutMe) + .Analyzer("html") + .SearchAnalyzer("standard") + ) + .Keyword(s => s + .Name(n => n.ProfileImageUrl) + ) + .Keyword(s => s + .Name(n => n.WebsiteUrl) + ) + .Object(o => o + .Name(n => n.Badges) + .AutoMap() + .Properties(op => op + .Keyword(s => s + 
.Name(n => n.Name) + ) + .Keyword(s => s + .Name(n => n.Class) + ) + ) + ) + ) + ) + ); + + if (!createIndexResponse.IsValid) + throw new Exception( + $"invalid response creating users index: {createIndexResponse.DebugInformation}"); + } + } + + public void UpdateAnswersWithQuestionTags(string path, int size) + { + if (!_client.Indices.Exists(PostsIndex).Exists) + throw new Exception($"{PostsIndex} index does not exist. You must run the 'posts' command to index posts first"); + + var postIdsAndTags = StackOverflowData.GetPostTagsWithAnswers(path); + long totalAnswersUpdated = 0; + var totalQuestions = 0; + var stopWatch = Stopwatch.StartNew(); + + foreach (var batch in postIdsAndTags.Batch(size)) + { + var tasks = batch.Select(b => + { + var (id, tags) = b; + return _client.UpdateByQueryAsync(u => u + .Routing(id) + .Query(q => +q + .ParentId(p => p + .Id(id) + .Type() + ) + ) + .Conflicts(Conflicts.Proceed) + .Index(PostsIndex) + .Timeout(TimeSpan.FromMinutes(1)) + .WaitForCompletion() + .Script(ss => ss + .Source(@"if (ctx._source.tags == null) { + ctx._source.tags = params.tags; + } else { + ctx.op = ""noop""; + }") + .Params(p => p + .Add("tags", tags) + ) + ) + ); + }).ToArray(); + + var task = Task.WhenAll(tasks); + task.Wait(); + + if (task.Status == TaskStatus.Faulted) + throw task.Exception.Flatten(); + + totalQuestions += tasks.Length; + totalAnswersUpdated += tasks.Sum(t => t.Result.Updated); + Log.WriteLine($"Updated {totalAnswersUpdated} answers for {totalQuestions} questions"); + } + + Log.WriteLine($"time taken to update answers: {stopWatch.Elapsed}"); + } + + public void IndexPosts(string path) + { + CreatePostsIndexIfNotExists(); + + _client.Indices.UpdateSettings(PostsIndex, u => u + .IndexSettings(i => i + .RefreshInterval("-1") + ) + ); + + var handle = new ManualResetEvent(false); + var size = 1000; + var posts = StackOverflowData.GetPosts(path); + var observableBulk = _client.BulkAll(posts, f => f + 
.MaxDegreeOfParallelism(Environment.ProcessorCount * 2) + .BackOffTime(TimeSpan.FromSeconds(10)) + .BackOffRetries(2) + .Size(size) + .BufferToBulk((bulk, buffer) => + { + foreach (var post in buffer) + if (post is Question question) + { + var item = new BulkIndexOperation(question); + bulk.AddOperation(item); + } + else + { + var answer = (Answer) post; + var item = new BulkIndexOperation(answer); + bulk.AddOperation(item); + } + }) + .RefreshOnCompleted() + .Index(PostsIndex) + ); + + var seenPages = 0; + var indexedDocs = 0; + var totalDocs = 0; + + Exception exception = null; + var bulkObserver = new BulkAllObserver( + onError: e => + { + exception = e; + handle.Set(); + }, + onCompleted: () => handle.Set(), + onNext: b => + { + Interlocked.Add(ref indexedDocs, b.Items.Count(i => i.IsValid)); + Interlocked.Add(ref totalDocs, b.Items.Count); + Interlocked.Increment(ref seenPages); + Log.WriteLine($"indexed page {seenPages} of questions and answers, {indexedDocs} out of {totalDocs}"); + } + ); + + var stopWatch = Stopwatch.StartNew(); + observableBulk.Subscribe(bulkObserver); + handle.WaitOne(); + + if (exception != null) + throw exception; + + Log.WriteLine($"time taken to index posts: {stopWatch.Elapsed}"); + + _client.Indices.UpdateSettings(PostsIndex, u => u + .IndexSettings(i => i + .RefreshInterval("30s") + ) + ); + } + + private void CreatePostsIndexIfNotExists() + { + if (!_client.Indices.Exists(PostsIndex).Exists) + { + var characterFilterMappings = CreateCharacterFilterMappings(); + + var createIndexResponse = _client.Indices.Create(PostsIndex, c => c + .Settings(s => s + .NumberOfShards(3) + .NumberOfReplicas(0) + .Analysis(a => a + .CharFilters(cf => cf + .Mapping("programming_language", mca => mca + .Mappings(characterFilterMappings) + ) + ) + .Analyzers(an => an + .Custom("html", ca => ca + .CharFilters("html_strip", "programming_language") + .Tokenizer("standard") + .Filters("lowercase", "stop") + ) + .Custom("expand", ca => ca + 
.CharFilters("programming_language") + .Tokenizer("standard") + .Filters("lowercase", "stop") + ) + ) + ) + ) + .Map(u => u + .RoutingField(r => r.Required()) + .AutoMap() + .AutoMap() + .SourceField(s => s + .Excludes(new[] {"titleSuggest"}) + ) + .Properties(p => p + .Join(j => j + .Name(f => f.ParentId) + .Relations(r => r + .Join() + ) + ) + .Text(s => s + .Name(n => n.Title) + .Analyzer("expand") + .Norms(false) + .Fields(f => f + .Keyword(ss => ss + .Name("raw") + ) + ) + ) + .Keyword(s => s + .Name(n => n.OwnerDisplayName) + ) + .Keyword(s => s + .Name(n => n.LastEditorDisplayName) + ) + .Keyword(s => s + .Name(n => n.Tags) + ) + .Keyword(s => s + .Name(n => n.Type) + ) + .Text(s => s + .Name(n => n.Body) + .Analyzer("html") + .SearchAnalyzer("expand") + ) + .Completion(co => co + .Name(n => n.TitleSuggest) + ) + ) + ) + ); + + if (!createIndexResponse.IsValid) + throw new Exception( + $"invalid response creating posts index: {createIndexResponse.DebugInformation}"); + } + } + + private static IList CreateCharacterFilterMappings() + { + var mappings = new List(); + foreach (var c in new[] {"c", "f", "m", "j", "s", "a", "k", "t"}) + { + mappings.Add($"{c}# => {c}sharp"); + mappings.Add($"{c.ToUpperInvariant()}# => {c}sharp"); + } + + foreach (var c in new[] {"g", "m", "c", "s", "a", "d"}) + { + mappings.Add($"{c}++ => {c}plusplus"); + mappings.Add($"{c.ToUpperInvariant()}++ => {c}plusplus"); + } + + return mappings; + } + } +} diff --git a/src/Tests/Tests.LongRunning/CommandLineArgs/IndexOptions.cs b/src/Tests/Tests.LongRunning/CommandLineArgs/IndexOptions.cs new file mode 100644 index 00000000000..20255bbb0dc --- /dev/null +++ b/src/Tests/Tests.LongRunning/CommandLineArgs/IndexOptions.cs @@ -0,0 +1,33 @@ +using System; +using CommandLine; + +namespace Tests.LongRunning.CommandLineArgs +{ + + public abstract class LongRunningAppArgumentsBase + { + [Option("profile", Required = false, + HelpText = + "Prints the current process id and waits on stdin for 
confirmation to start allowing profiles/listeners to be attached")] + public bool ProfileApplication { get; set; } + } + + public class IndexOptions : LongRunningAppArgumentsBase + { + [Option('k', "insecure", Required = false, + HelpText = + "Allow unsecured connections to Elasticsearch. Useful when Elasticsearch is running over HTTPS with a self-signed cert. Not recommended for production, fine for demo purposes")] + public bool AllowInsecure { get; set; } + + [Option('e', "elasticsearch", Required = true, HelpText = "The url to use to connect to Elasticsearch")] + public Uri ElasticsearchUrl { get; set; } + + [Option('p', "password", Required = false, + HelpText = "The password to use to connect to Elasticsearch when the cluster has security enabled")] + public string Password { get; set; } + + [Option('u', "username", Required = false, + HelpText = "The username to use to connect to Elasticsearch when the cluster has security enabled")] + public string UserName { get; set; } + } +} diff --git a/src/Tests/Tests.LongRunning/CommandLineArgs/IndexPostsOptions.cs b/src/Tests/Tests.LongRunning/CommandLineArgs/IndexPostsOptions.cs new file mode 100644 index 00000000000..7d36132d4da --- /dev/null +++ b/src/Tests/Tests.LongRunning/CommandLineArgs/IndexPostsOptions.cs @@ -0,0 +1,11 @@ +using CommandLine; + +namespace Tests.LongRunning.CommandLineArgs +{ + [Verb("posts", HelpText = "Index Stack Overflow posts into Elasticsearch")] + public class IndexPostsOptions : IndexOptions + { + [Option('f', "posts-path", Required = true, HelpText = "The path to the Stack Overflow Posts.xml file")] + public string PostsPath { get; set; } + } +} diff --git a/src/Tests/Tests.LongRunning/CommandLineArgs/IndexUsersOptions.cs b/src/Tests/Tests.LongRunning/CommandLineArgs/IndexUsersOptions.cs new file mode 100644 index 00000000000..83f9c6df0d0 --- /dev/null +++ b/src/Tests/Tests.LongRunning/CommandLineArgs/IndexUsersOptions.cs @@ -0,0 +1,14 @@ +using CommandLine; + +namespace 
Tests.LongRunning.CommandLineArgs +{ + [Verb("users", HelpText = "Index Stack Overflow users and their badges into Elasticsearch")] + public class IndexUsersOptions : IndexOptions + { + [Option('b', "badges-path", Required = true, HelpText = "The path to the Stack Overflow Badges.xml file")] + public string BadgesPath { get; set; } + + [Option('f', "users-path", Required = true, HelpText = "The path to the Stack Overflow Users.xml file")] + public string UsersPath { get; set; } + } +} diff --git a/src/Tests/Tests.LongRunning/CommandLineArgs/UpdateAnswersWithTagsOptions.cs b/src/Tests/Tests.LongRunning/CommandLineArgs/UpdateAnswersWithTagsOptions.cs new file mode 100644 index 00000000000..83e90d0af82 --- /dev/null +++ b/src/Tests/Tests.LongRunning/CommandLineArgs/UpdateAnswersWithTagsOptions.cs @@ -0,0 +1,14 @@ +using CommandLine; + +namespace Tests.LongRunning.CommandLineArgs +{ + [Verb("tags", HelpText = "Updates Stack Overflow answers with the tags of the question in Elasticsearch")] + public class UpdateAnswersWithTagsOptions : IndexOptions + { + [Option('f', "posts-path", Required = true, HelpText = "The path to the Stack Overflow Posts.xml file")] + public string PostsPath { get; set; } + + [Option('s', "size", Required = false, HelpText = "The number of updates to perform per batch", Default = 200)] + public int Size { get; set; } + } +} diff --git a/src/Tests/Tests.LongRunning/EnumerableExtensions.cs b/src/Tests/Tests.LongRunning/EnumerableExtensions.cs new file mode 100644 index 00000000000..b1f30681424 --- /dev/null +++ b/src/Tests/Tests.LongRunning/EnumerableExtensions.cs @@ -0,0 +1,35 @@ +using System.Collections.Generic; +using System.Linq; + +namespace Tests.LongRunning +{ + public static class EnumerableExtensions + { + public static IEnumerable> Batch( + this IEnumerable source, int size + ) + { + T[] bucket = null; + var count = 0; + + foreach (var item in source) + { + if (bucket == null) + bucket = new T[size]; + + bucket[count++] = item; + + if 
(count != size) + continue; + + yield return bucket.Select(x => x); + + bucket = null; + count = 0; + } + + if (bucket != null && count > 0) + yield return bucket.Take(count); + } + } +} diff --git a/src/Tests/Tests.LongRunning/Log.cs b/src/Tests/Tests.LongRunning/Log.cs new file mode 100644 index 00000000000..596c46b4007 --- /dev/null +++ b/src/Tests/Tests.LongRunning/Log.cs @@ -0,0 +1,9 @@ +using System; + +namespace Tests.LongRunning +{ + public static class Log + { + public static void WriteLine(string message) => Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {message}"); + } +} diff --git a/src/Tests/Tests.LongRunning/Models/Answer.cs b/src/Tests/Tests.LongRunning/Models/Answer.cs new file mode 100644 index 00000000000..9938a1a6a6d --- /dev/null +++ b/src/Tests/Tests.LongRunning/Models/Answer.cs @@ -0,0 +1,7 @@ +namespace Tests.LongRunning.Models +{ + public class Answer : Post + { + public override string Type => nameof(Answer); + } +} diff --git a/src/Tests/Tests.LongRunning/Models/Badge.cs b/src/Tests/Tests.LongRunning/Models/Badge.cs new file mode 100644 index 00000000000..1a9b9fdad7a --- /dev/null +++ b/src/Tests/Tests.LongRunning/Models/Badge.cs @@ -0,0 +1,17 @@ +using System; + +namespace Tests.LongRunning.Models +{ + public class Badge + { + public BadgeClass Class { get; set; } + public DateTimeOffset Date { get; set; } + public string Name { get; set; } + } + + public class BadgeMeta + { + public Badge Badge { get; set; } + public int UserId { get; set; } + } +} diff --git a/src/Tests/Tests.LongRunning/Models/BadgeClass.cs b/src/Tests/Tests.LongRunning/Models/BadgeClass.cs new file mode 100644 index 00000000000..343ea9f9942 --- /dev/null +++ b/src/Tests/Tests.LongRunning/Models/BadgeClass.cs @@ -0,0 +1,12 @@ +using Elasticsearch.Net; + +namespace Tests.LongRunning.Models +{ + [StringEnum] + public enum BadgeClass + { + Gold = 1, + Silver = 2, + Bronze = 3 + } +} diff --git a/src/Tests/Tests.LongRunning/Models/Post.cs 
b/src/Tests/Tests.LongRunning/Models/Post.cs new file mode 100644 index 00000000000..2772d975f2e --- /dev/null +++ b/src/Tests/Tests.LongRunning/Models/Post.cs @@ -0,0 +1,23 @@ +using System; +using System.Collections.Generic; +using Nest; + +namespace Tests.LongRunning.Models +{ + public class Post + { + public string Body { get; set; } + public int CommentCount { get; set; } + public DateTimeOffset CreationDate { get; set; } + public int Id { get; set; } + public DateTimeOffset? LastActivityDate { get; set; } + public DateTimeOffset? LastEditDate { get; set; } + public int? LastEditorUserId { get; set; } + public string OwnerDisplayName { get; set; } + public int? OwnerUserId { get; set; } + public JoinField ParentId { get; set; } + public int Score { get; set; } + public List Tags { get; set; } + public virtual string Type => nameof(Post); + } +} diff --git a/src/Tests/Tests.LongRunning/Models/Question.cs b/src/Tests/Tests.LongRunning/Models/Question.cs new file mode 100644 index 00000000000..91ea86b4e85 --- /dev/null +++ b/src/Tests/Tests.LongRunning/Models/Question.cs @@ -0,0 +1,18 @@ +using System; +using Nest; + +namespace Tests.LongRunning.Models +{ + public class Question : Post + { + public int? AcceptedAnswerId { get; set; } + public int AnswerCount { get; set; } + public DateTimeOffset? 
CommunityOwnedDate { get; set; } + public int FavoriteCount { get; set; } + public string LastEditorDisplayName { get; set; } + public string Title { get; set; } + public CompletionField TitleSuggest { get; set; } + public override string Type => nameof(Question); + public int ViewCount { get; set; } + } +} diff --git a/src/Tests/Tests.LongRunning/Models/User.cs b/src/Tests/Tests.LongRunning/Models/User.cs new file mode 100644 index 00000000000..e6254b86866 --- /dev/null +++ b/src/Tests/Tests.LongRunning/Models/User.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; + +namespace Tests.LongRunning.Models +{ + public class User + { + public string AboutMe { get; set; } + public int AccountId { get; set; } + public int? Age { get; set; } + public List Badges { get; set; } + public DateTimeOffset CreationDate { get; set; } + public string DisplayName { get; set; } + public int DownVotes { get; set; } + public int Id { get; set; } + public DateTimeOffset LastAccessDate { get; set; } + public string Location { get; set; } + public string ProfileImageUrl { get; set; } + public int Reputation { get; set; } + public int UpVotes { get; set; } + public int Views { get; set; } + public string WebsiteUrl { get; set; } + } +} diff --git a/src/Tests/Tests.LongRunning/Program.cs b/src/Tests/Tests.LongRunning/Program.cs new file mode 100644 index 00000000000..b5cf93dec71 --- /dev/null +++ b/src/Tests/Tests.LongRunning/Program.cs @@ -0,0 +1,97 @@ +using System; +using System.Diagnostics; +using System.Globalization; +using CommandLine; +using Tests.LongRunning.CommandLineArgs; + +namespace Tests.LongRunning +{ + internal class Program + { + private static void Main(string[] args) => + Parser.Default.ParseArguments(args) + .MapResult( + (IndexPostsOptions opts) => + ProfileWrapper(opts, () => RunIndexPostsAndReturnExitCode(opts)), + (IndexUsersOptions opts) => + ProfileWrapper(opts, () => RunIndexUsersAndReturnExitCode(opts)), + (UpdateAnswersWithTagsOptions opts) 
=> + ProfileWrapper(opts, () => UpdateAnswersWithQuestionTagsAndReturnExitCode(opts)), + errs => 1); + + private static int ProfileWrapper(LongRunningAppArgumentsBase opts, Func run) + { + WaitOnProfileConfirmation(opts); + var exitCode = 0; + do + { + exitCode = run(); + } while (RunAgain(opts)); + return exitCode; + } + + private static void WaitOnProfileConfirmation(LongRunningAppArgumentsBase opts) + { + if (!opts.ProfileApplication) return; + + Log.WriteLine($"Current Process Id: {Process.GetCurrentProcess().Id}"); + Log.WriteLine("Press any key to continue..."); + Console.ReadKey(); + } + + private static bool RunAgain(LongRunningAppArgumentsBase opts) + { + if (!opts.ProfileApplication) return false; + + Log.WriteLine($"Current Process Id: {Process.GetCurrentProcess().Id}"); + Log.WriteLine("Run again [Y/N]:"); + var c = Console.ReadKey(); + return c.KeyChar.ToString().Equals("Y", StringComparison.OrdinalIgnoreCase); + } + + private static int RunIndexUsersAndReturnExitCode(IndexUsersOptions opts) + { + try + { + var indexer = new BulkIndexer(opts); + indexer.IndexUsers(opts.UsersPath, opts.BadgesPath); + return 0; + } + catch (Exception e) + { + Console.Error.WriteLine(e); + return 1; + } + } + + private static int RunIndexPostsAndReturnExitCode(IndexPostsOptions opts) + { + try + { + var indexer = new BulkIndexer(opts); + indexer.IndexPosts(opts.PostsPath); + return 0; + } + catch (Exception e) + { + Console.Error.WriteLine(e); + return 1; + } + } + + private static int UpdateAnswersWithQuestionTagsAndReturnExitCode(UpdateAnswersWithTagsOptions opts) + { + try + { + var indexer = new BulkIndexer(opts); + indexer.UpdateAnswersWithQuestionTags(opts.PostsPath, opts.Size); + return 0; + } + catch (Exception e) + { + Console.Error.WriteLine(e); + return 1; + } + } + } +} diff --git a/src/Tests/Tests.LongRunning/StackOverflowData.cs b/src/Tests/Tests.LongRunning/StackOverflowData.cs new file mode 100644 index 00000000000..d14056353f6 --- /dev/null +++ 
b/src/Tests/Tests.LongRunning/StackOverflowData.cs @@ -0,0 +1,222 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Xml; +using System.Xml.Linq; +using Nest; +using Tests.LongRunning.Models; + +namespace Tests.LongRunning +{ + public static class StackOverflowData + { + public static IEnumerable<(int, List)> GetPostTagsWithAnswers(string path) + { + using (var stream = File.OpenRead(path)) + using (var reader = XmlReader.Create(stream)) + { + reader.ReadToDescendant("posts"); + reader.ReadToDescendant("row"); + do + { + var item = (XElement)XNode.ReadFrom(reader); + var id = int.Parse(item.Attribute("Id").Value, CultureInfo.InvariantCulture); + var postTypeId = int.Parse(item.Attribute("PostTypeId").Value, CultureInfo.InvariantCulture); + + // not a question + if (postTypeId != 1) + continue; + + var answerCount = int.Parse(item.Attribute("AnswerCount").Value, + CultureInfo.InvariantCulture); + + // no answers + if (answerCount == 0) + continue; + + var tags = GetTags(item.Attribute("Tags")); + + // no tags + if (tags == null) + continue; + + yield return (id, tags); + } while (reader.ReadToNextSibling("row")); + } + } + + public static IEnumerable GetPosts(string path) + { + using (var stream = File.OpenRead(path)) + using (var reader = XmlReader.Create(stream)) + { + reader.ReadToDescendant("posts"); + reader.ReadToDescendant("row"); + do + { + var item = (XElement)XNode.ReadFrom(reader); + var id = int.Parse(item.Attribute("Id").Value, CultureInfo.InvariantCulture); + var postTypeId = int.Parse(item.Attribute("PostTypeId").Value, CultureInfo.InvariantCulture); + var score = int.Parse(item.Attribute("Score").Value, CultureInfo.InvariantCulture); + var body = item.Attribute("Body")?.Value; + var creationDate = DateTimeOffset.Parse(item.Attribute("CreationDate").Value, + CultureInfo.InvariantCulture); + var commentCount = int.Parse(item.Attribute("CommentCount").Value, 
CultureInfo.InvariantCulture); + var ownerUserId = item.Attribute("OwnerUserId") != null + ? int.Parse(item.Attribute("OwnerUserId").Value, CultureInfo.InvariantCulture) + : (int?)null; + var ownerDisplayName = item.Attribute("OwnerDisplayName")?.Value; + var lastEditorUserId = item.Attribute("LastEditorUserId") != null + ? int.Parse(item.Attribute("LastEditorUserId").Value, CultureInfo.InvariantCulture) + : (int?)null; + var lastEditDate = item.Attribute("LastEditDate") != null + ? DateTimeOffset.Parse(item.Attribute("LastEditDate").Value, CultureInfo.InvariantCulture) + : (DateTimeOffset?)null; + var lastActivityDate = item.Attribute("LastActivityDate") != null + ? DateTimeOffset.Parse(item.Attribute("LastActivityDate").Value, CultureInfo.InvariantCulture) + : (DateTimeOffset?)null; + + switch (postTypeId) + { + case 1: + var title = item.Attribute("Title").Value; + + var question = new Question + { + Id = id, + ParentId = JoinField.Root(), + AcceptedAnswerId = item.Attribute("AcceptedAnswerId") != null + ? int.Parse(item.Attribute("AcceptedAnswerId").Value, CultureInfo.InvariantCulture) + : (int?)null, + CreationDate = creationDate, + Score = score, + ViewCount = int.Parse(item.Attribute("ViewCount").Value, CultureInfo.InvariantCulture), + Body = body, + OwnerUserId = ownerUserId, + OwnerDisplayName = ownerDisplayName, + LastEditorUserId = lastEditorUserId, + LastEditorDisplayName = item.Attribute("LastEditorDisplayName")?.Value, + LastEditDate = lastEditDate, + LastActivityDate = lastActivityDate, + Title = title, + TitleSuggest = new CompletionField { Input = new[] { title }, Weight = score < 0 ? 0 : score }, + Tags = GetTags(item.Attribute("Tags")), + AnswerCount = int.Parse(item.Attribute("AnswerCount").Value, + CultureInfo.InvariantCulture), + CommentCount = commentCount, + FavoriteCount = item.Attribute("FavoriteCount") != null + ? 
int.Parse(item.Attribute("FavoriteCount").Value, CultureInfo.InvariantCulture) + : 0, + CommunityOwnedDate = item.Attribute("CommunityOwnedDate") != null + ? DateTimeOffset.Parse(item.Attribute("CommunityOwnedDate").Value, + CultureInfo.InvariantCulture) + : (DateTimeOffset?)null + }; + + yield return question; + + break; + case 2: + var answer = new Answer + { + Id = id, + ParentId = JoinField.Link(int.Parse(item.Attribute("ParentId").Value)), + CreationDate = creationDate, + Score = score, + Body = body, + OwnerUserId = ownerUserId, + OwnerDisplayName = ownerDisplayName, + LastEditorUserId = lastEditorUserId, + LastEditDate = lastEditDate, + LastActivityDate = lastActivityDate, + CommentCount = commentCount + }; + + yield return answer; + + break; + } + } while (reader.ReadToNextSibling("row")); + } + } + + public static IEnumerable GetUsers(string path) + { + using (var stream = File.OpenRead(path)) + using (var reader = XmlReader.Create(stream)) + { + reader.ReadToDescendant("users"); + reader.ReadToDescendant("row"); + do + { + var item = (XElement)XNode.ReadFrom(reader); + yield return new User + { + Id = int.Parse(item.Attribute("Id").Value, CultureInfo.InvariantCulture), + Reputation = int.Parse(item.Attribute("Reputation").Value, CultureInfo.InvariantCulture), + CreationDate = DateTimeOffset.Parse(item.Attribute("CreationDate").Value, + CultureInfo.InvariantCulture), + DisplayName = item.Attribute("DisplayName")?.Value, + LastAccessDate = DateTimeOffset.Parse(item.Attribute("LastAccessDate").Value, + CultureInfo.InvariantCulture), + WebsiteUrl = item.Attribute("WebsiteUrl")?.Value, + Location = item.Attribute("Location")?.Value, + AboutMe = item.Attribute("AboutMe") != null ? 
item.Attribute("AboutMe").Value : null, + Views = int.Parse(item.Attribute("Views").Value, CultureInfo.InvariantCulture), + UpVotes = int.Parse(item.Attribute("UpVotes").Value, CultureInfo.InvariantCulture), + DownVotes = int.Parse(item.Attribute("DownVotes").Value, CultureInfo.InvariantCulture), + ProfileImageUrl = item.Attribute("ProfileImageUrl") != null + ? item.Attribute("ProfileImageUrl").Value + : null, + Age = item.Attribute("Age") != null + ? int.Parse(item.Attribute("Age").Value, CultureInfo.InvariantCulture) + : (int?)null, + AccountId = item.Attribute("AccountId") != null + ? int.Parse(item.Attribute("AccountId").Value, CultureInfo.InvariantCulture) + : 0 + }; + } while (reader.ReadToNextSibling("row")); + } + } + + public static IEnumerable GetBadgeMetas(string path) + { + using (var stream = File.OpenRead(path)) + using (var reader = XmlReader.Create(stream)) + { + reader.ReadToDescendant("badges"); + reader.ReadToDescendant("row"); + + do + { + var item = (XElement)XNode.ReadFrom(reader); + + // only interested in tag badges + if (bool.Parse(item.Attribute("TagBased").Value) == false) + continue; + + var badgeClass = (BadgeClass)Enum.Parse(typeof(BadgeClass), item.Attribute("Class").Value); + var name = item.Attribute("Name").Value; + + yield return new BadgeMeta + { + UserId = int.Parse(item.Attribute("UserId").Value, CultureInfo.InvariantCulture), + Badge = new Badge + { + Name = name, + Class = badgeClass, + Date = DateTimeOffset.Parse(item.Attribute("Date").Value, CultureInfo.InvariantCulture) + } + }; + } while (reader.ReadToNextSibling("row")); + } + } + + private static List GetTags(XAttribute attribute) => + attribute?.Value.Replace("<", string.Empty) + .Split(new[] { ">" }, StringSplitOptions.RemoveEmptyEntries) + .ToList(); + } +} diff --git a/src/Tests/Tests.LongRunning/Tests.LongRunning.csproj b/src/Tests/Tests.LongRunning/Tests.LongRunning.csproj new file mode 100644 index 00000000000..f100cab2b95 --- /dev/null +++ 
b/src/Tests/Tests.LongRunning/Tests.LongRunning.csproj @@ -0,0 +1,18 @@ + + + + Exe + netcoreapp3.0 + + + + + + + + + + + + +