diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 000000000..21d4f655c
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,6 @@
+**/bin/
+**/obj/
+**/out/
+**/layer/
+**Dockerfile*
+*/*.md
\ No newline at end of file
diff --git a/.github/workflows/nuget.yaml b/.github/workflows/nuget.yaml
index 63d384c25..7d68d3e0b 100644
--- a/.github/workflows/nuget.yaml
+++ b/.github/workflows/nuget.yaml
@@ -35,5 +35,9 @@ jobs:
run: dotnet test
- name: Pack
run: dotnet pack --configuration Release src/KubernetesClient -o pkg
+ - name: Pack
+ run: dotnet pack --configuration Release src/KubernetesClient.Informers -o pkg
+ - name: Pack
+ run: dotnet pack --configuration Release src/KubernetesClient.DependencyInjection -o pkg
- name: Push
run: dotnet nuget push pkg\*.nupkg -s https://www.nuget.org/ -k ${{ secrets.nuget_api_key }}
diff --git a/.gitignore b/.gitignore
index 8244919f8..e40db5d6c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
obj/
bin/
**/TestResults
+pkg/**
# User-specific VS files
*.suo
@@ -13,3 +14,4 @@ bin/
# JetBrains Rider
.idea/
*.sln.iml
+examples/informers/Dockerfile
diff --git a/examples/informers/ControllerService.cs b/examples/informers/ControllerService.cs
new file mode 100644
index 000000000..75d2af01a
--- /dev/null
+++ b/examples/informers/ControllerService.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+namespace informers
+{
+ ///
+ /// Starts all controllers registered in dependency injection container
+ ///
+ public class ControllerService : BackgroundService
+ {
+ private readonly IServiceProvider _serviceProvider;
+
+ public ControllerService(IServiceProvider serviceProvider)
+ {
+ _serviceProvider = serviceProvider;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ var controllers = _serviceProvider.GetServices();
+ await Task.WhenAll(controllers.Select(x => x.Initialize(stoppingToken)));
+ }
+
+ }
+}
diff --git a/examples/informers/DeltaChangesQueryingController.cs b/examples/informers/DeltaChangesQueryingController.cs
new file mode 100644
index 000000000..68f23b8ab
--- /dev/null
+++ b/examples/informers/DeltaChangesQueryingController.cs
@@ -0,0 +1,90 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using k8s;
+using k8s.Informers;
+using k8s.Informers.Notifications;
+using k8s.Models;
+using KellermanSoftware.CompareNetObjects;
+using Microsoft.Extensions.Logging;
+
+namespace informers
+{
+ // this sample demos of informer in a basic controller context.
+ // there are two loggers:
+ // _informerLogger lets you see raw data coming out of informer stream
+
+ // try creating and deleting some pods in "default" namespace and watch the output
+ // current code is not production grade and lacks concurrency guards against modifying same resource
+ public class DeltaChangesQueryingController : IController
+ {
+ private readonly IKubernetesInformer _podInformer;
+ private readonly CompositeDisposable _subscription = new CompositeDisposable();
+ private readonly ILogger _informerLogger;
+ private readonly CompareLogic _objectCompare = new CompareLogic();
+
+ public DeltaChangesQueryingController(IKubernetesInformer podInformer, ILoggerFactory loggerFactory)
+ {
+ _podInformer = podInformer;
+ _informerLogger = loggerFactory.CreateLogger("Informer");
+ _objectCompare.Config.MaxDifferences = 100;
+ }
+
+
+ public Task Initialize(CancellationToken cancellationToken)
+ {
+ _podInformer
+ .GetResource(ResourceStreamType.ListWatch, KubernetesInformerOptions.Builder.NamespaceEquals("default").Build())
+ .Resync(TimeSpan.FromSeconds(10))
+ .Catch, Exception>(e =>
+ {
+ _informerLogger.LogCritical(e, e.Message);
+ return Observable.Throw>(e);
+ })
+ .Buffer(TimeSpan.FromSeconds(5))
+ .Where(x => x.Any())
+ .Do(x =>
+ {
+ var eventsPerResource = x.GroupBy(x => x.Value.Metadata.Name);
+ foreach (var item in eventsPerResource)
+ {
+ PrintChanges(item.ToList());
+ }
+ })
+ .Subscribe()
+ .DisposeWith(_subscription);
+ return Task.CompletedTask;
+ }
+
+ private void PrintChanges(IList> changes)
+ {
+ // it's possible to do reconciliation here, but the current code is not production grade and lacks concurrency guards against modifying same resource
+ var obj = changes.First().Value;
+ var sb = new StringBuilder();
+ sb.AppendLine($"Received changes for object with ID {obj.Metadata.Name} with {changes.Count} items");
+ sb.AppendLine($"Last known state was {changes.Last().EventFlags}");
+ foreach (var item in changes)
+ {
+ sb.AppendLine($"==={item.EventFlags}===");
+ sb.AppendLine($"Name: {item.Value.Metadata.Name}");
+ sb.AppendLine($"Version: {item.Value.Metadata.ResourceVersion}");
+ if (item.EventFlags.HasFlag(EventTypeFlags.Modify))
+ {
+ var updateDelta = _objectCompare.Compare(item.OldValue, item.Value);
+ foreach (var difference in updateDelta.Differences)
+ {
+ sb.AppendLine($"{difference.PropertyName}: {difference.Object1} -> {difference.Object2}");
+ }
+ }
+ }
+
+ _informerLogger.LogInformation(sb.ToString());
+
+ }
+ }
+}
diff --git a/examples/informers/IController.cs b/examples/informers/IController.cs
new file mode 100644
index 000000000..9793b66d4
--- /dev/null
+++ b/examples/informers/IController.cs
@@ -0,0 +1,17 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace informers
+{
+ ///
+ /// Base interface for implementing controllers
+ ///
+ public interface IController
+ {
+ ///
+ /// Signals that controller is done processing all the work and no more work will ever be processed.
+ /// Mainly useful in testing
+ ///
+ public Task Initialize(CancellationToken cancellationToken);
+ }
+}
diff --git a/examples/informers/Program.cs b/examples/informers/Program.cs
new file mode 100644
index 000000000..fb5459fdd
--- /dev/null
+++ b/examples/informers/Program.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Linq;
+using k8s;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace informers
+{
+ public class Program
+ {
+ public static void Main(string[] args)
+ {
+ CreateHostBuilder(args).Build().Run();
+ }
+
+ public static IHostBuilder CreateHostBuilder(string[] args) =>
+ Host.CreateDefaultBuilder(args)
+ .ConfigureLogging(x => x.AddConsole())
+ .ConfigureServices((hostContext, services) =>
+ {
+ services.AddKubernetesClient(KubernetesClientConfiguration.BuildDefaultConfig);
+ services.AddKubernetesInformers();
+
+ services.AddHostedService();
+ services.AddSingleton();
+ });
+ }
+}
diff --git a/examples/informers/Properties/launchSettings.json b/examples/informers/Properties/launchSettings.json
new file mode 100644
index 000000000..9631e8a81
--- /dev/null
+++ b/examples/informers/Properties/launchSettings.json
@@ -0,0 +1,10 @@
+{
+ "profiles": {
+ "K8SControllerExample": {
+ "commandName": "Project",
+ "environmentVariables": {
+ "DOTNET_ENVIRONMENT": "Development"
+ }
+ }
+ }
+}
diff --git a/examples/informers/appsettings.Development.json b/examples/informers/appsettings.Development.json
new file mode 100644
index 000000000..8983e0fc1
--- /dev/null
+++ b/examples/informers/appsettings.Development.json
@@ -0,0 +1,9 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft": "Warning",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ }
+}
diff --git a/examples/informers/appsettings.json b/examples/informers/appsettings.json
new file mode 100644
index 000000000..8983e0fc1
--- /dev/null
+++ b/examples/informers/appsettings.json
@@ -0,0 +1,9 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft": "Warning",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ }
+}
diff --git a/examples/informers/informers.csproj b/examples/informers/informers.csproj
new file mode 100644
index 000000000..17b617a42
--- /dev/null
+++ b/examples/informers/informers.csproj
@@ -0,0 +1,19 @@
+
+
+
+ netcoreapp3.1
+ true
+ ..\..\src\KubernetesClient\kubernetes-client.snk
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/kubernetes-client.sln b/kubernetes-client.sln
index 7627fbd58..abcb952c8 100644
--- a/kubernetes-client.sln
+++ b/kubernetes-client.sln
@@ -35,6 +35,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "patch", "examples\patch\pat
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "httpClientFactory", "examples\httpClientFactory\httpClientFactory.csproj", "{A07314A0-02E8-4F36-B233-726D59D28F08}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "informers", "examples\informers\informers.csproj", "{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KubernetesClient.Informers", "src\KubernetesClient.Informers\KubernetesClient.Informers.csproj", "{7C1B2872-A0E3-4437-9532-0227F419399C}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KubernetesClient.Informers.Tests", "tests\KubernetesClient.Informers.Tests\KubernetesClient.Informers.Tests.csproj", "{5116C7E7-66A8-4523-AE5A-4CF00391C399}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KubernetesClient.DependencyInjection", "src\KubernetesClient.DependencyInjection\KubernetesClient.DependencyInjection.csproj", "{5832A440-3DD9-47A4-8D3C-1FB049978297}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -189,6 +197,54 @@ Global
{A07314A0-02E8-4F36-B233-726D59D28F08}.Release|x64.Build.0 = Release|Any CPU
{A07314A0-02E8-4F36-B233-726D59D28F08}.Release|x86.ActiveCfg = Release|Any CPU
{A07314A0-02E8-4F36-B233-726D59D28F08}.Release|x86.Build.0 = Release|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|x64.Build.0 = Debug|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|x86.Build.0 = Debug|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|x64.ActiveCfg = Release|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|x64.Build.0 = Release|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|x86.ActiveCfg = Release|Any CPU
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|x86.Build.0 = Release|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Debug|x64.Build.0 = Debug|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Debug|x86.Build.0 = Debug|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Release|Any CPU.Build.0 = Release|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Release|x64.ActiveCfg = Release|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Release|x64.Build.0 = Release|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Release|x86.ActiveCfg = Release|Any CPU
+ {7C1B2872-A0E3-4437-9532-0227F419399C}.Release|x86.Build.0 = Release|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Debug|x64.Build.0 = Debug|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Debug|x86.Build.0 = Debug|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Release|x64.ActiveCfg = Release|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Release|x64.Build.0 = Release|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Release|x86.ActiveCfg = Release|Any CPU
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399}.Release|x86.Build.0 = Release|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Debug|x64.Build.0 = Debug|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Debug|x86.Build.0 = Debug|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Release|x64.ActiveCfg = Release|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Release|x64.Build.0 = Release|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Release|x86.ActiveCfg = Release|Any CPU
+ {5832A440-3DD9-47A4-8D3C-1FB049978297}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -206,6 +262,10 @@ Global
{542DC30E-FDF7-4A35-B026-6C21F435E8B1} = {879F8787-C3BB-43F3-A92D-6D4C7D3A5285}
{04DE2C84-117D-4E21-8B45-B7AE627697BD} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
{A07314A0-02E8-4F36-B233-726D59D28F08} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
+ {A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
+ {7C1B2872-A0E3-4437-9532-0227F419399C} = {3D1864AA-1FFC-4512-BB13-46055E410F73}
+ {5116C7E7-66A8-4523-AE5A-4CF00391C399} = {8AF4A5C2-F0CE-47D5-A4C5-FE4AB83CA509}
+ {5832A440-3DD9-47A4-8D3C-1FB049978297} = {3D1864AA-1FFC-4512-BB13-46055E410F73}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7}
diff --git a/src/KubernetesClient.DependencyInjection/KubernetesClient.DependencyInjection.csproj b/src/KubernetesClient.DependencyInjection/KubernetesClient.DependencyInjection.csproj
new file mode 100644
index 000000000..3e1b3959c
--- /dev/null
+++ b/src/KubernetesClient.DependencyInjection/KubernetesClient.DependencyInjection.csproj
@@ -0,0 +1,35 @@
+
+
+
+ The Kubernetes Project Authors
+ 2017 The Kubernetes Project Authors
+ Client library for the Kubernetes open source container orchestrator.
+
+ https://www.apache.org/licenses/LICENSE-2.0
+ https://github.com/kubernetes-client/csharp
+ https://raw.githubusercontent.com/kubernetes/kubernetes/master/logo/logo.png
+ kubernetes;docker;containers;
+ netstandard2.0
+ k8s
+ true
+ ..\KubernetesClient\kubernetes-client.snk
+ true
+ 1701;1702;1591;1570;1572;1573;1574
+
+
+ true
+
+
+ true
+ snupkg
+ 8
+
+
+
+
+
+
+
+
+
+
diff --git a/src/KubernetesClient.DependencyInjection/ServiceCollectionKubernetesExtensions.cs b/src/KubernetesClient.DependencyInjection/ServiceCollectionKubernetesExtensions.cs
new file mode 100644
index 000000000..688800c46
--- /dev/null
+++ b/src/KubernetesClient.DependencyInjection/ServiceCollectionKubernetesExtensions.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Linq;
+using System.Threading;
+using k8s;
+using k8s.Informers;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Rest;
+using Microsoft.Rest.TransientFaultHandling;
+
+namespace informers
+{
+ public static class Extensions
+ {
+ public static IServiceCollection AddKubernetesClient(this IServiceCollection services, Func configProvider)
+ {
+ var config = configProvider();
+ services.AddHttpClient("DefaultName")
+ .AddTypedClient((httpClient, serviceProvider) =>
+ {
+ httpClient.Timeout = Timeout.InfiniteTimeSpan;
+ return new Kubernetes(config, httpClient);
+ })
+ .AddHttpMessageHandler(() => new TimeoutHandler(TimeSpan.FromSeconds(100)))
+ .AddHttpMessageHandler(() => new RetryDelegatingHandler { RetryPolicy = new RetryPolicy(new ExponentialBackoffRetryStrategy()) })
+ .AddHttpMessageHandler(KubernetesClientConfiguration.CreateWatchHandler)
+ .ConfigurePrimaryHttpMessageHandler(config.CreateDefaultHttpClientHandler);
+
+ return services;
+ }
+
+ public static IServiceCollection AddKubernetesInformers(this IServiceCollection services)
+ {
+ services.AddTransient(typeof(KubernetesInformer<>));
+ services.AddSingleton(typeof(IKubernetesInformer<>), typeof(SharedKubernetesInformer<>));
+ return services;
+ }
+ }
+}
diff --git a/src/KubernetesClient.DependencyInjection/version.json b/src/KubernetesClient.DependencyInjection/version.json
new file mode 100644
index 000000000..de893e06e
--- /dev/null
+++ b/src/KubernetesClient.DependencyInjection/version.json
@@ -0,0 +1,8 @@
+{
+ "$schema": "https://raw.githubusercontent.com/AArnott/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
+ "version": "2.0",
+ "publicReleaseRefSpec": [
+ "^refs/heads/master$" // we release out of master
+ ],
+ "pathFilters": [".", "../KubernetesClient","../KubernetesClient.Informers"]
+}
diff --git a/src/KubernetesClient.Informers/Cache/CacheSynchronized.cs b/src/KubernetesClient.Informers/Cache/CacheSynchronized.cs
new file mode 100644
index 000000000..c7d235b30
--- /dev/null
+++ b/src/KubernetesClient.Informers/Cache/CacheSynchronized.cs
@@ -0,0 +1,29 @@
+namespace k8s.Informers.Cache
+{
+ public struct CacheSynchronized
+ {
+ public CacheSynchronized(long messageNumber, long cacheVersion, T value)
+ {
+ MessageNumber = messageNumber;
+ CacheVersion = cacheVersion;
+ Value = value;
+ }
+
+ ///
+ /// Message number in the sequencer
+ ///
+ public long MessageNumber { get; }
+
+ ///
+ /// The version of cache this message was included in
+ ///
+ public long CacheVersion { get; }
+
+ public T Value { get; }
+
+ public override string ToString()
+ {
+ return $"MessageNumber: {MessageNumber}, IncludedInCache: {CacheVersion}: {Value}";
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/Cache/ICache.cs b/src/KubernetesClient.Informers/Cache/ICache.cs
new file mode 100644
index 000000000..67775c672
--- /dev/null
+++ b/src/KubernetesClient.Informers/Cache/ICache.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Collections.Generic;
+
+namespace k8s.Informers.Cache
+{
+ ///
+ /// Maintains cache of objects of type .
+ ///
+ /// The type of key
+ /// The type of resource
+ public interface ICache : IDictionary, IDisposable
+ {
+ ///
+ /// Current version of cache
+ ///
+ long Version { get; set; }
+
+ ///
+ /// Replace all values in cache with new values
+ ///
+ ///
+ void Reset(IDictionary newValues);
+
+ /// Takes a snapshot of the current cache that is version locked
+ ///
+ /// Copy of current cache locked to the version at the time cache is snapshot is taken
+ ICacheSnapshot Snapshot();
+ }
+
+ // A readonly snapshot of cache at a point in time
+ public interface ICacheSnapshot : IReadOnlyDictionary
+ {
+ ///
+ /// Current version of cache
+ ///
+ long Version { get; }
+ }
+}
diff --git a/src/KubernetesClient.Informers/Cache/SimpleCache.cs b/src/KubernetesClient.Informers/Cache/SimpleCache.cs
new file mode 100644
index 000000000..f59c196a0
--- /dev/null
+++ b/src/KubernetesClient.Informers/Cache/SimpleCache.cs
@@ -0,0 +1,213 @@
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace k8s.Informers.Cache
+{
+ public class SimpleCache : ICache
+ {
+ private readonly IDictionary _items;
+ private readonly object _syncRoot = new object();
+
+ public SimpleCache()
+ {
+ _items = new Dictionary();
+ }
+
+ public SimpleCache(IDictionary items, long version)
+ {
+ Version = version;
+ _items = new Dictionary(items);
+ }
+
+ public void Reset(IDictionary newValues)
+ {
+ lock (_syncRoot)
+ {
+ _items.Clear();
+ foreach (var item in newValues)
+ {
+ _items.Add(item.Key, item.Value);
+ }
+ }
+ }
+
+ public ICacheSnapshot Snapshot()
+ {
+ lock (_syncRoot)
+ {
+ return new SimpleCacheSnapshot(this, Version);
+ }
+ }
+
+ public IEnumerator> GetEnumerator()
+ {
+ lock (_syncRoot)
+ {
+ return _items.ToList().GetEnumerator();
+ }
+ }
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ lock (_syncRoot)
+ {
+ return _items.ToList().GetEnumerator();
+ }
+ }
+
+ public void Add(KeyValuePair item)
+ {
+ lock (_syncRoot)
+ {
+ _items.Add(item);
+ }
+ }
+
+ public void Clear()
+ {
+ lock (_syncRoot)
+ {
+ _items.Clear();
+ }
+ }
+
+ public bool Contains(KeyValuePair item)
+ {
+ lock (_syncRoot)
+ {
+ return _items.Contains(item);
+ }
+ }
+
+ public void CopyTo(KeyValuePair[] array, int arrayIndex)
+ {
+ lock (_syncRoot)
+ {
+ _items.CopyTo(array, arrayIndex);
+ }
+ }
+
+ public bool Remove(KeyValuePair item)
+ {
+ lock (_syncRoot)
+ {
+ return _items.Remove(item.Key);
+ }
+ }
+
+ public int Count
+ {
+ get
+ {
+ lock (_syncRoot)
+ {
+ return _items.Count;
+ }
+ }
+ }
+
+ public bool IsReadOnly => false;
+
+ public void Add(TKey key, TResource value)
+ {
+ lock (_syncRoot)
+ {
+ _items.Add(key, value);
+ }
+ }
+
+ public bool ContainsKey(TKey key)
+ {
+ lock (_syncRoot)
+ {
+ return _items.ContainsKey(key);
+ }
+ }
+
+ public bool Remove(TKey key)
+ {
+ lock (_syncRoot)
+ {
+ if (!_items.Remove(key, out var existing))
+ {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ public bool TryGetValue(TKey key, out TResource value)
+ {
+ lock (_syncRoot)
+ {
+ return _items.TryGetValue(key, out value);
+ }
+ }
+
+ public TResource this[TKey key]
+ {
+ get
+ {
+ lock (_syncRoot)
+ {
+ return _items[key];
+ }
+ }
+ set
+ {
+ lock (_syncRoot)
+ {
+ _items[key] = value;
+ }
+ }
+ }
+
+ public ICollection Keys
+ {
+ get
+ {
+ lock (_syncRoot)
+ {
+ return _items.Keys.ToList();
+ }
+ }
+ }
+
+ public ICollection Values
+ {
+ get
+ {
+ lock (_syncRoot)
+ {
+ return _items.Values.ToList();
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ }
+
+ public long Version { get; set; }
+
+ internal sealed class SimpleCacheSnapshot : ICacheSnapshot
+ {
+ private readonly Dictionary _items;
+ public SimpleCacheSnapshot(IDictionary cache, long version)
+ {
+ _items = new Dictionary(cache);
+ Version = version;
+ }
+ public IEnumerator> GetEnumerator() => _items.GetEnumerator();
+ IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
+ public int Count => _items.Count;
+ public bool ContainsKey(TKey key) => _items.ContainsKey(key);
+ public bool TryGetValue(TKey key, out TResource value) => _items.TryGetValue(key, out value);
+ public TResource this[TKey key] => _items[key];
+ public IEnumerable Keys => _items.Keys;
+ public IEnumerable Values => _items.Values;
+ public long Version { get; }
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/Cache/VersionPartitionedSharedCache.cs b/src/KubernetesClient.Informers/Cache/VersionPartitionedSharedCache.cs
new file mode 100644
index 000000000..0fbb4c732
--- /dev/null
+++ b/src/KubernetesClient.Informers/Cache/VersionPartitionedSharedCache.cs
@@ -0,0 +1,304 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace k8s.Informers.Cache
+{
+ ///
+ /// Allows creating cache partitions for objects that have versioning semantics. Each partition will maintain its own view of its tracked objects,
+ /// but any items with same key and version will be shared across multiple cache partitions.
+ ///
+ ///
+ /// The semantics of this class allows for object reuse between informers without compromising each informers ownership of its own cache. Primarily the issue
+ /// it solves is if multiple informers are created with different options, but the data they receive may overlap
+ /// (ex. overlapping labels, or informer scoped to namespace and another scoped globally). Since the master informer (actual connection to physical server) will receive
+ /// same notification over separate channels, we run the risk of informer desynchronization if they share the same cache. However, if each informer maintains it's own
+ /// cache, we may get multiple duplicate objects in memory. This allows any objects that share the same key/version to point to the same reference, while maintaining
+ /// integrity of each cache (dictionary). Note that unlike a regular dictionary, this does not allow updates to same key/version
+ ///
+ /// The type of the key uniquely identifying object
+ /// The type of resource
+ /// The type of version associated with object
+ public class VersionPartitionedSharedCache
+ {
+ private readonly Func _keySelector;
+ private readonly object _lock = new object();
+ private readonly Func _versionSelector;
+
+ private readonly HashSet _views = new HashSet();
+
+ // internal to allow for unit testing
+ internal readonly Dictionary Items = new Dictionary();
+
+ public VersionPartitionedSharedCache(Func keySelector, Func versionSelector)
+ {
+ _keySelector = keySelector;
+ _versionSelector = versionSelector;
+ }
+
+ ///
+ /// Creates a unique cache partition that may share references to objects with same key/versions with other partitions
+ ///
+ /// Partitioned cache
+ public ICache CreatePartition()
+ {
+ lock (_lock)
+ {
+ var view = new CacheView(this);
+ _views.Add(view);
+ return view;
+ }
+ }
+
+
+ private void Remove(TResource resource, CacheView originView)
+ {
+ var versionedKey = GetVersionKeyFor(resource);
+ Remove(versionedKey, originView);
+ }
+
+ private void Remove(VersionResourceKey versionedKey, CacheView originView)
+ {
+ var otherViewsTrackingResource = _views
+ .Except(new[] { originView })
+ .Any(x => x.TryGetValue(versionedKey.Key, out var resource) && _versionSelector(resource).Equals(versionedKey.Version));
+ if (!otherViewsTrackingResource)
+ {
+ Items.Remove(versionedKey);
+ }
+ }
+
+ private TResource GetOrAdd(TResource resource)
+ {
+ var key = GetVersionKeyFor(resource);
+ if (Items.TryGetValue(key, out var existingResource))
+ {
+ return existingResource;
+ }
+ Items.Add(key, resource);
+ return resource;
+ }
+
+ private VersionResourceKey GetVersionKeyFor(TResource resource)
+ {
+ return new VersionResourceKey { Key = _keySelector(resource), Version = _versionSelector(resource) };
+ }
+
+ internal struct VersionResourceKey
+ {
+ public TKey Key;
+ public TVersion Version;
+ }
+
+ private class CacheView : ICache
+ {
+ private readonly Dictionary _items = new Dictionary();
+ private readonly VersionPartitionedSharedCache _parent;
+
+ public CacheView(VersionPartitionedSharedCache parent)
+ {
+ _parent = parent;
+ }
+
+ public long Version { get; set; } // = 1;
+
+ public void Reset(IDictionary newValues)
+ {
+ lock (_parent._lock)
+ {
+ _items.Clear();
+ foreach (var item in newValues)
+ {
+ _items.Add(item.Key, item.Value);
+ }
+ }
+ }
+
+ public ICacheSnapshot Snapshot()
+ {
+ lock (_parent._lock)
+ {
+ return new SimpleCache.SimpleCacheSnapshot(this, Version);
+ }
+ }
+
+ public IEnumerator> GetEnumerator()
+ {
+ lock (_parent._lock)
+ {
+ return _items.ToList().GetEnumerator();
+ }
+ }
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ lock (_parent._lock)
+ {
+ return _items.ToList().GetEnumerator();
+ }
+ }
+
+ public void Add(KeyValuePair item)
+ {
+ lock (_parent._lock)
+ {
+ AssertMatchingKeys(item.Key, item.Value);
+ var cacheItem = _parent.GetOrAdd(item.Value);
+ _items.Add(_parent._keySelector(cacheItem), cacheItem);
+ }
+ }
+
+ public void Clear()
+ {
+ lock (_parent._lock)
+ {
+ foreach (var item in _items)
+ {
+ _parent.Remove(item.Value, this);
+ }
+
+ _items.Clear();
+ }
+ }
+
+ public bool Contains(KeyValuePair item)
+ {
+ lock (_parent._lock)
+ {
+ return _items.Contains(item);
+ }
+ }
+
+ public void CopyTo(KeyValuePair[] array, int arrayIndex)
+ {
+ lock (_parent._lock)
+ {
+ ((IDictionary)_items).CopyTo(array, arrayIndex);
+ }
+ }
+
+ public bool Remove(KeyValuePair item)
+ {
+ lock (_parent._lock)
+ {
+ _parent.Remove(item.Value, this);
+ return _items.Remove(item.Key);
+ }
+ }
+
+ public int Count
+ {
+ get
+ {
+ lock (_parent._lock)
+ {
+ return _items.Count;
+ }
+ }
+ }
+
+ public bool IsReadOnly => false;
+
+ public void Add(TKey key, TResource value)
+ {
+ lock (_parent._lock)
+ {
+ AssertMatchingKeys(key, value);
+ value = _parent.GetOrAdd(value);
+ _items.Add(key, value);
+ }
+ }
+
+ public bool ContainsKey(TKey key)
+ {
+ lock (_parent._lock)
+ {
+ return _items.ContainsKey(key);
+ }
+ }
+
+ public bool Remove(TKey key)
+ {
+ lock (_parent._lock)
+ {
+ if (!_items.Remove(key, out var existing))
+ {
+ return false;
+ }
+ _parent.Remove(existing, this);
+ return true;
+ }
+ }
+
+ public bool TryGetValue(TKey key, out TResource value)
+ {
+ lock (_parent._lock)
+ {
+ return _items.TryGetValue(key, out value);
+ }
+ }
+
+ public TResource this[TKey key]
+ {
+ get
+ {
+ lock (_parent._lock)
+ {
+ return _items[key];
+ }
+ }
+ set
+ {
+ // the semantics of set here are tricky because if the value already exists, it will reuse existing
+ // this means that consumers should not make assumption that the object that was passed as value to set
+ // is the one that got added to collection, and should always do a "get" operation if they plan on modifying it
+ lock (_parent._lock)
+ {
+ AssertMatchingKeys(key, value);
+ var existing = _parent.GetOrAdd(value);
+ _items[key] = existing;
+ }
+ }
+ }
+
+ public ICollection Keys
+ {
+ get
+ {
+ lock (_parent._lock)
+ {
+ return _items.Keys.ToList();
+ }
+ }
+ }
+
+ public ICollection Values
+ {
+ get
+ {
+ lock (_parent._lock)
+ {
+ return _items.Values.ToList();
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ lock (_parent._lock)
+ {
+ _parent._views.Remove(this);
+ }
+ }
+
+ private void AssertMatchingKeys(TKey key, TResource resource)
+ {
+ if (!key.Equals(_parent._keySelector(resource)))
+ {
+ throw new InvalidOperationException("The value of the key specified is not the same as the one inside the resource");
+ }
+ }
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/Extensions.cs b/src/KubernetesClient.Informers/Extensions.cs
new file mode 100644
index 000000000..494a6d640
--- /dev/null
+++ b/src/KubernetesClient.Informers/Extensions.cs
@@ -0,0 +1,148 @@
+using System;
+using System.Collections.Generic;
+using System.Reactive;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Threading.Tasks;
+using k8s.Models;
+using Microsoft.Rest;
+
+namespace k8s.Informers
+{
+ public static class Extensions
+ {
+ ///
+ /// Removes an item from the dictionary
+ ///
+ /// The source dictionary
+ /// The key for which item should be removed
+ /// The value of the object that was removed, or if value was not present in dictionary
+ /// The type of key
+ /// The type of value
+ /// if the object was removed from dictionry, or if the specific key was not present in dictionary
+ internal static bool Remove(this IDictionary source, TKey key, out TValue result)
+ {
+ result = default;
+ if (!source.TryGetValue(key, out result))
+ {
+ return false;
+ }
+
+ source.Remove(key);
+ return true;
+ }
+
+
+ ///
+ /// Creates a for
+ ///
+ /// The source enumerable
+ /// The type of elements
+ /// The produced hashset
+ internal static HashSet ToHashSet(this IEnumerable source)
+ {
+ return source.ToHashSet(null);
+ }
+
+ ///
+ /// Creates a for
+ ///
+ /// The source enumerable
+ /// The comparer to use
+ /// The type of elements
+ /// The produced hashset
+ internal static HashSet ToHashSet(
+ this IEnumerable source,
+ IEqualityComparer comparer)
+ {
+ if (source == null)
+ {
+ throw new ArgumentNullException(nameof(source));
+ }
+
+ return new HashSet(source, comparer);
+ }
+
+
+ ///
+ /// Attaches the source to the target
+ ///
+ /// The original
+ /// The to attach to
+ /// The original disposable passed as
+ public static IDisposable DisposeWith(this IDisposable source, CompositeDisposable composite)
+ {
+ composite.Add(source);
+ return source;
+ }
+
+ ///
+ /// Combines the source disposable with another into a single disposable
+ ///
+ /// The original
+ /// The to combine with
+ /// Composite disposable made up of and
+ public static IDisposable CombineWith(this IDisposable source, IDisposable other)
+ {
+ return new CompositeDisposable(source, other);
+ }
+
+ public static IDisposable Subscribe(this IObservable source, IObserver observer, Action onFinished = null)
+ {
+ return source.Subscribe(observer, _ => { }, x => onFinished(), onFinished);
+ }
+
+ public static IDisposable Subscribe(this IObservable source, IObserver observer, Action onNext = null, Action onError = null, Action onCompleted = null)
+ {
+ onNext ??= obj => { };
+ onError ??= obj => { };
+ onCompleted ??= () => { };
+ return source.Subscribe(x =>
+ {
+ observer.OnNext(x);
+ onNext(x);
+ },
+ error =>
+ {
+ observer.OnError(error);
+ onError(error);
+ },
+ () =>
+ {
+ observer.OnCompleted();
+ onCompleted();
+ });
+ }
+
+
+ public static IObservable> BufferWithThrottle(this IObservable source, int maxAmount, TimeSpan threshold)
+ {
+ return Observable.Create>(obs =>
+ {
+ return source.GroupByUntil(_ => true,
+ g => g.Throttle(threshold).Select(_ => Unit.Default)
+ .Merge(g.Take(maxAmount)
+ .LastAsync()
+ .Select(_ => Unit.Default)))
+ .SelectMany(i => i.ToList())
+ .Subscribe(obs);
+ });
+ }
+ public static IObservable.WatchEvent> Watch(this Task>> responseTask) where T : IKubernetesObject
+ {
+ return Observable.Create.WatchEvent>(observer =>
+ {
+ void OnNext(WatchEventType type, T item) => observer.OnNext(new k8s.Watcher.WatchEvent { Type = type, Object = item });
+ var watcher = responseTask.Watch>(OnNext, observer.OnError, observer.OnCompleted);
+ var eventSubscription = Disposable.Create(() =>
+ {
+ watcher.OnEvent -= OnNext;
+ watcher.OnError -= observer.OnError;
+ watcher.OnClosed -= observer.OnCompleted;
+ });
+ return new CompositeDisposable(watcher, eventSubscription);
+ });
+
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/FaultTolerance/Extensions.cs b/src/KubernetesClient.Informers/FaultTolerance/Extensions.cs
new file mode 100644
index 000000000..43b23f3d5
--- /dev/null
+++ b/src/KubernetesClient.Informers/FaultTolerance/Extensions.cs
@@ -0,0 +1,30 @@
+using System;
+using System.Net;
+using System.Net.Http;
+using Microsoft.Rest.TransientFaultHandling;
+
+namespace k8s.Informers.FaultTolerance
+{
+ public static class Extensions
+ {
+ ///
+ /// Checks if the type of exception is the one that is temporary and will resolve itself over time
+ ///
+ /// Exception to check
+ /// Return if exception is transient, or if it's not
+ public static bool IsTransient(this Exception exception)
+ {
+ if (exception is HttpRequestWithStatusException statusException)
+ {
+ return statusException.StatusCode >= HttpStatusCode.ServiceUnavailable || statusException.StatusCode == HttpStatusCode.RequestTimeout;
+ }
+
+ if (exception is HttpRequestException || exception is KubernetesException)
+ {
+ return true;
+ }
+
+ return false;
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/FaultTolerance/RetryPolicy.cs b/src/KubernetesClient.Informers/FaultTolerance/RetryPolicy.cs
new file mode 100644
index 000000000..ec1c80c16
--- /dev/null
+++ b/src/KubernetesClient.Informers/FaultTolerance/RetryPolicy.cs
@@ -0,0 +1,81 @@
+using System;
+using System.Reactive.Linq;
+using System.Threading.Tasks;
+
+namespace k8s.Informers.FaultTolerance
+{
+ ///
+ /// Specifies retry policy to apply to a Task or Observable
+ ///
+ ///
+ /// This class could potentially be replaced by Polly, but currently Polly doesn't work with observables (need access to policy rules in the builder, which are internal atm).
+ ///
+ public class RetryPolicy
+ {
+ ///
+ /// No retry policy should be applied
+ ///
+ public static readonly RetryPolicy None = new RetryPolicy((_, __) => false, _ => TimeSpan.Zero);
+
+ /// A delegate which accepts exception being handled and retry attempt, and returns if retry should be attempted
+ /// A delegate that accepts retry attempt and returns delay till next retry attempt
+ public RetryPolicy(Func shouldRetry, Func retryDelay)
+ {
+ ShouldRetry = shouldRetry;
+ RetryDelay = retryDelay;
+ }
+
+ internal Func ShouldRetry { get; }
+ internal Func RetryDelay { get; }
+
+ ///
+ /// Executes a given task while applying the specified retry policy
+ ///
+ /// Delegate for the task to execute
+ /// Return type of the Task
+ /// Task result
+ public async Task ExecuteAsync(Func> action)
+ {
+ var retryCount = 1;
+ while (true)
+ try
+ {
+ return await action().ConfigureAwait(false); ;
+ }
+ catch (Exception e)
+ {
+ if (!ShouldRetry(e, retryCount))
+ {
+ throw;
+ }
+ retryCount++;
+ await Task.Delay(RetryDelay(retryCount)).ConfigureAwait(false); ;
+ }
+ }
+ }
+
+ public static class RetryPolicyExtensions
+ {
+ ///
+ /// Catches any exceptions in observable sequence and handles them with the specified retry policy.
+ /// Resubscribes to the observable if the policy determines that retry should be attempted
+ ///
+ /// The source observable
+ /// The retry policy to apply
+ /// The type of the observable
+ /// Original observable wrapped in retry policy
+ public static IObservable WithRetryPolicy(this IObservable observable, RetryPolicy retryPolicy)
+ {
+ var retryCounter = 1;
+ return observable.Catch(exception =>
+ {
+ if (!retryPolicy.ShouldRetry(exception, retryCounter))
+ {
+ return Observable.Throw(exception);
+ }
+ retryCounter++;
+ return observable.DelaySubscription(retryPolicy.RetryDelay(retryCounter));
+ });
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/IInformer.cs b/src/KubernetesClient.Informers/IInformer.cs
new file mode 100644
index 000000000..78bcc7a29
--- /dev/null
+++ b/src/KubernetesClient.Informers/IInformer.cs
@@ -0,0 +1,56 @@
+using System;
+using k8s.Informers.Notifications;
+
+namespace k8s.Informers
+{
+ ///
+ /// Provides observable abstraction over collections of resource of type which support List/Watch semantics
+ ///
+ /// The type of resource
+ public interface IInformer
+ {
+ ///
+ /// Exposes an Observable stream over a resource of a particular type
+ ///
+ ///
+ /// Message stream semantics are as following
+ /// - When subscription is first established and is has flag set
+ /// the first batch of messages that will be sent when subscription is opened the current state of all the objects being monitored.
+ /// This batch is referred to as "resource list reset".
+ /// - Each message in reset event will be of type
+ /// - The boundaries of the reset event will be marked with and
+ /// - If there are no objects in a reset list event, and the has a flag set,
+ /// message with flag is used to mark the end of List operation and start of Watch
+ ///
+ /// Observable type
+ /// Observable stream for resources of a particular type
+ IObservable> GetResource(ResourceStreamType type);
+ }
+
+ ///
+ /// Provides observable abstraction over collections of resource of type which support List/Watch semantics,
+ /// and support subscriptions with type
+ ///
+ /// The type of resource
+ /// The type of options
+ public interface IInformer
+ {
+ ///
+ /// Exposes an Observable stream over a resource of a particular type
+ ///
+ ///
+ /// Message stream semantics are as following
+ /// - When subscription is first established and is has flag set
+ /// the first batch of messages that will be sent when subscription is opened the current state of all the objects being monitored.
+ /// This batch is referred to as "resource list reset".
+ /// - Each message in reset event will be of type
+ /// - The boundaries of the reset event will be marked with and
+ /// - If there are no objects in a reset list event, and the has a flag set,
+ /// message with flag is used to mark the end of List operation and start of Watch
+ ///
+ /// Observable type
+ ///
+ ///
+ IObservable> GetResource(ResourceStreamType type, TOptions options);
+ }
+}
diff --git a/src/KubernetesClient.Informers/IKubernetesInformer.cs b/src/KubernetesClient.Informers/IKubernetesInformer.cs
new file mode 100644
index 000000000..d08243561
--- /dev/null
+++ b/src/KubernetesClient.Informers/IKubernetesInformer.cs
@@ -0,0 +1,10 @@
+namespace k8s.Informers
+{
+ ///
+ /// An informer that serves kubernetes resources
+ ///
+ /// The type of Kubernetes resource
+ public interface IKubernetesInformer : IInformer, IInformer where TResource : IKubernetesObject
+ {
+ }
+}
diff --git a/src/KubernetesClient.Informers/IKubernetesInformerOptionsBuilder.cs b/src/KubernetesClient.Informers/IKubernetesInformerOptionsBuilder.cs
new file mode 100644
index 000000000..e1518611c
--- /dev/null
+++ b/src/KubernetesClient.Informers/IKubernetesInformerOptionsBuilder.cs
@@ -0,0 +1,18 @@
+using System.Collections.Generic;
+
+namespace k8s.Informers
+{
+ public interface IKubernetesInformerOptionsBuilder
+ {
+ KubernetesInformerOptionsBuilder NamespaceEquals(string value);
+ KubernetesInformerOptionsBuilder LabelEquals(string label, string value);
+ KubernetesInformerOptionsBuilder LabelNotEquals(string label, string value);
+ KubernetesInformerOptionsBuilder LabelContains(string label, params string[] values);
+ KubernetesInformerOptionsBuilder LabelContains(string label, ICollection values);
+ KubernetesInformerOptionsBuilder LabelDoesNotContains(string label, params string[] values);
+ KubernetesInformerOptionsBuilder LabelDoesNotContains(string label, ICollection values);
+ KubernetesInformerOptionsBuilder HasLabel(string label);
+ KubernetesInformerOptionsBuilder DoesNotHaveLabel(string label);
+ KubernetesInformerOptions Build();
+ }
+}
diff --git a/src/KubernetesClient.Informers/KubernetesClient.Informers.csproj b/src/KubernetesClient.Informers/KubernetesClient.Informers.csproj
new file mode 100644
index 000000000..d7cf945e0
--- /dev/null
+++ b/src/KubernetesClient.Informers/KubernetesClient.Informers.csproj
@@ -0,0 +1,35 @@
+
+
+
+ The Kubernetes Project Authors
+ 2017 The Kubernetes Project Authors
+ Client library for the Kubernetes open source container orchestrator.
+ https://www.apache.org/licenses/LICENSE-2.0
+ https://github.com/kubernetes-client/csharp
+ https://raw.githubusercontent.com/kubernetes/kubernetes/master/logo/logo.png
+ kubernetes;docker;containers;
+ netstandard2.0
+ k8s.Informers
+ true
+ ..\KubernetesClient\kubernetes-client.snk
+ true
+ 1701;1702;1591;1570;1572;1573;1574
+
+
+ true
+
+
+ true
+ snupkg
+ 8
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/KubernetesClient.Informers/KubernetesInformer.cs b/src/KubernetesClient.Informers/KubernetesInformer.cs
new file mode 100644
index 000000000..d4b9775d9
--- /dev/null
+++ b/src/KubernetesClient.Informers/KubernetesInformer.cs
@@ -0,0 +1,176 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Reactive.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using k8s.Informers.FaultTolerance;
+using k8s.Informers.Notifications;
+using k8s.Models;
+using Microsoft.Rest.TransientFaultHandling;
+using RetryPolicy = k8s.Informers.FaultTolerance.RetryPolicy;
+
+namespace k8s.Informers
+{
+ ///
+ /// An implementation of Kubernetes informer that talks to Kubernetes API Server
+ ///
+ /// The type of Kubernetes resource
+ public class KubernetesInformer : IKubernetesInformer where TResource : IKubernetesObject
+ {
+ private readonly IKubernetes _kubernetes;
+ private readonly Func _restartOnCompletion;
+ private readonly RetryPolicy _retryPolicy;
+
+ public KubernetesInformer(IKubernetes kubernetes, RetryPolicy retryPolicy = null) : this(kubernetes, retryPolicy, () => true)
+ {
+ }
+
+ public KubernetesInformer(IKubernetes kubernetes, RetryPolicy retryPolicy, Func restartOnCompletion)
+ {
+ _kubernetes = kubernetes;
+ _restartOnCompletion = restartOnCompletion;
+ _retryPolicy = retryPolicy ?? DefaultRetryPolicy;
+ }
+
+ private static RetryPolicy DefaultRetryPolicy => new RetryPolicy(
+ (exception, retryAttempt) => exception.IsTransient(),
+ retryAttempt => TimeSpan.FromSeconds(Math.Min(Math.Pow(2, retryAttempt), 30)));
+
+ public IObservable> GetResource(ResourceStreamType type) => GetResource(type, KubernetesInformerOptions.Default);
+
+ public IObservable> GetResource(ResourceStreamType type, KubernetesInformerOptions options)
+ {
+ return new KubernetesInformerEmitter(this, options, type).GetObservable();
+ }
+
+ private class KubernetesInformerEmitter
+ {
+ private readonly KubernetesInformerOptions _options;
+ private readonly KubernetesInformer _parent;
+ private readonly ResourceStreamType _type;
+ private string _resourceVersion;
+
+ public KubernetesInformerEmitter(KubernetesInformer parent, KubernetesInformerOptions options, ResourceStreamType type)
+ {
+ _parent = parent;
+ _options = options;
+ _type = type;
+ }
+
+ public IObservable> GetObservable()
+ {
+ var result = Observable.Empty>();
+ if (_type.HasFlag(ResourceStreamType.List))
+ {
+ result = result.Concat(List());
+ }
+
+ if (_type.HasFlag(ResourceStreamType.Watch))
+ {
+ result = result.Concat(Watch());
+ }
+
+ return result;
+ }
+
+ private IObservable> List()
+ {
+ return Observable.Create>(async (observer, cancellationToken) =>
+ {
+ var response = await _parent._kubernetes.ListWithHttpMessagesAsync(
+ _options.Namespace,
+ resourceVersion: _resourceVersion,
+ labelSelector: _options.LabelSelector,
+ cancellationToken: cancellationToken).ConfigureAwait(false);
+ if (!response.Response.IsSuccessStatusCode)
+ {
+ throw new HttpRequestWithStatusException("Web server replied with error code") { StatusCode = response.Response.StatusCode };
+ }
+
+ var listKubernetesObject = response.Body;
+ _resourceVersion = listKubernetesObject.Metadata.ResourceVersion;
+ var items = listKubernetesObject.Items ?? new List();
+ foreach (var item in items.ToReset(_type == ResourceStreamType.ListWatch))
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ break;
+ }
+ observer.OnNext(item);
+ }
+ })
+ .WithRetryPolicy(_parent._retryPolicy);
+ }
+
+ private IObservable> Watch()
+ {
+ return Observable.Create>(async (observer, cancellationToken) =>
+ {
+ var result = await _parent._kubernetes.ListWithHttpMessagesAsync(
+ _options.Namespace,
+ watch: true,
+ allowWatchBookmarks: true,
+ resourceVersion: _resourceVersion,
+ labelSelector: _options.LabelSelector,
+ cancellationToken: cancellationToken
+ ).ConfigureAwait(false);
+ if (!result.Response.IsSuccessStatusCode)
+ {
+ throw new HttpRequestWithStatusException("Web server replied with error code") { StatusCode = result.Response.StatusCode };
+ }
+ return Task.FromResult(result)
+ .Watch()
+ .SelectMany(x => // this is not a one to one mapping as some events cause side effects but don't propagate, so we need SelectMany
+ {
+ if (x.Object is IMetadata status && status.Metadata.ResourceVersion != null)
+ {
+ _resourceVersion = status.Metadata.ResourceVersion;
+ }
+ switch (x.Type)
+ {
+ case WatchEventType.Added:
+ return new[] { x.Object.ToResourceEvent(EventTypeFlags.Add) };
+ case WatchEventType.Deleted:
+ return new[] { x.Object.ToResourceEvent(EventTypeFlags.Delete) };
+ case WatchEventType.Modified:
+ return new[] { x.Object.ToResourceEvent(EventTypeFlags.Modify) };
+ case WatchEventType.Bookmark:
+ // we're just updating resource version
+ break;
+ case WatchEventType.Error:
+ default:
+ if (x.Object is V1Status error)
+ {
+ throw new KubernetesException(error);
+ }
+
+ throw new KubernetesException($"Received unknown error in watch: {x.Object}");
+ }
+
+ return Enumerable.Empty>();
+ })
+ .Select(x => x)
+ // watch should never "complete" on it's own unless there's a critical exception, except in testing scenarios
+ .Concat(_parent._restartOnCompletion() ? Observable.Defer(Watch) : Observable.Empty>())
+ .Subscribe(observer);
+ })
+ .Catch, Exception>(exception =>
+ {
+ // handle case when we tried rewatching by specifying resource version to resume after disconnect,
+ // but resource is too stale - should try to resubscribe from scratch
+ if (exception is HttpRequestWithStatusException httpException && httpException.StatusCode == HttpStatusCode.Gone && _resourceVersion != null)
+ {
+ // we tried resuming but failed, restart from scratch
+ _resourceVersion = null;
+ return GetObservable();
+ }
+
+ return Observable.Throw>(exception);
+ })
+ .WithRetryPolicy(_parent._retryPolicy);
+ }
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/KubernetesInformerOptions.cs b/src/KubernetesClient.Informers/KubernetesInformerOptions.cs
new file mode 100644
index 000000000..14c160d53
--- /dev/null
+++ b/src/KubernetesClient.Informers/KubernetesInformerOptions.cs
@@ -0,0 +1,77 @@
+using System;
+
+namespace k8s.Informers
+{
+ public class KubernetesInformerOptions // theoretically this could be done with QObservable, but parsing expression trees is too much overhead at this point
+ : IEquatable
+ {
+ public bool Equals(KubernetesInformerOptions other)
+ {
+ if (ReferenceEquals(null, other))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, other))
+ {
+ return true;
+ }
+
+ return Namespace == other.Namespace && LabelSelector == other.LabelSelector;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ if (obj.GetType() != this.GetType())
+ {
+ return false;
+ }
+ return Equals((KubernetesInformerOptions)obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return ((Namespace != null ? Namespace.GetHashCode() : 0) * 397) ^ (LabelSelector != null ? LabelSelector.GetHashCode() : 0);
+ }
+ }
+
+ public static bool operator ==(KubernetesInformerOptions left, KubernetesInformerOptions right)
+ {
+ return Equals(left, right);
+ }
+
+ public static bool operator !=(KubernetesInformerOptions left, KubernetesInformerOptions right)
+ {
+ return !Equals(left, right);
+ }
+
+ public static IKubernetesInformerOptionsBuilder Builder => new KubernetesInformerOptionsBuilder();
+
+ internal KubernetesInformerOptions()
+ {
+ }
+
+ ///
+ /// The default options for kubernetes informer, without any server side filters
+ ///
+ public static KubernetesInformerOptions Default { get; } = new KubernetesInformerOptions();
+
+ ///
+ /// The namespace to which observable stream should be filtered
+ ///
+ public string Namespace { get; internal set; }
+ public string LabelSelector { get; internal set; }
+ }
+}
diff --git a/src/KubernetesClient.Informers/KubernetesInformerOptionsBuilder.cs b/src/KubernetesClient.Informers/KubernetesInformerOptionsBuilder.cs
new file mode 100644
index 000000000..4302fbdb9
--- /dev/null
+++ b/src/KubernetesClient.Informers/KubernetesInformerOptionsBuilder.cs
@@ -0,0 +1,84 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace k8s.Informers
+{
+ public class KubernetesInformerOptionsBuilder : IKubernetesInformerOptionsBuilder
+ {
+ private string _namespace;
+ private readonly List _labelSelectors = new List();
+
+ internal KubernetesInformerOptionsBuilder()
+ {
+ }
+
+ public KubernetesInformerOptionsBuilder NamespaceEquals(string value)
+ {
+ _namespace = value;
+ return this;
+ }
+ public KubernetesInformerOptionsBuilder LabelEquals(string label, string value) => LabelContains(label, new[] { value });
+
+ public KubernetesInformerOptionsBuilder LabelContains(string label, params string[] values) => LabelContains(label, (ICollection)values);
+
+ public KubernetesInformerOptionsBuilder LabelContains(string label, ICollection values)
+ {
+ switch (values.Count)
+ {
+ case 0:
+ throw new InvalidOperationException($"{nameof(values)} cannot be empty");
+ case 1:
+ _labelSelectors.Add($"{label}={values.First()}");
+ break;
+ default:
+ _labelSelectors.Add($"{label} in ({string.Join(",", values)})");
+ break;
+ }
+
+ return this;
+ }
+ public KubernetesInformerOptionsBuilder LabelNotEquals(string label, string value) => LabelDoesNotContains(label, new[] { value });
+
+ public KubernetesInformerOptionsBuilder LabelDoesNotContains(string label, params string[] values) => LabelDoesNotContains(label, (ICollection)values);
+ public KubernetesInformerOptionsBuilder LabelDoesNotContains(string label, ICollection values)
+ {
+ switch (values.Count)
+ {
+ case 0:
+ throw new InvalidOperationException($"{nameof(values)} cannot be empty");
+ case 1:
+ _labelSelectors.Add($"{label}!={values.First()}");
+ break;
+ default:
+ _labelSelectors.Add($"{label} notin ({string.Join(",", values)})");
+ break;
+ }
+
+ return this;
+ }
+ public KubernetesInformerOptionsBuilder HasLabel(string label)
+ {
+ _labelSelectors.Add(label);
+ return this;
+ }
+ public KubernetesInformerOptionsBuilder DoesNotHaveLabel(string label)
+ {
+ _labelSelectors.Add($"!{label}");
+ return this;
+ }
+
+ public KubernetesInformerOptions Build()
+ {
+ string labelSelector = null;
+ if (_labelSelectors.Any())
+ {
+ _labelSelectors.Sort();
+ labelSelector = string.Join(",", _labelSelectors);
+ }
+
+ return new KubernetesInformerOptions { Namespace = _namespace, LabelSelector = labelSelector };
+ }
+
+ }
+}
diff --git a/src/KubernetesClient.Informers/KubernetesKeyVersionEqualityComparer.cs b/src/KubernetesClient.Informers/KubernetesKeyVersionEqualityComparer.cs
new file mode 100644
index 000000000..78281a033
--- /dev/null
+++ b/src/KubernetesClient.Informers/KubernetesKeyVersionEqualityComparer.cs
@@ -0,0 +1,42 @@
+using System;
+using System.Collections.Generic;
+using k8s.Models;
+
+namespace k8s.Informers
+{
+ public class KubernetesNameVersionEqualityComparer : IEqualityComparer where T : IMetadata
+ {
+ private KubernetesNameVersionEqualityComparer()
+ {
+ }
+
+ public static KubernetesNameVersionEqualityComparer Instance => new KubernetesNameVersionEqualityComparer();
+
+ public bool Equals(T x, T y)
+ {
+ if (x?.Metadata?.Name == null || y?.Metadata?.Name == null || x.Metadata.ResourceVersion == null || y.Metadata.ResourceVersion == null)
+ {
+ return false;
+ }
+ return x.Metadata.Name.Equals(y.Metadata.Name) && x.Metadata.ResourceVersion.Equals(y.Metadata.ResourceVersion);
+ }
+
+ public int GetHashCode(T obj)
+ {
+ if (obj == null)
+ {
+ throw new ArgumentNullException(nameof(obj));
+ }
+ unchecked
+ {
+ if (obj.Metadata?.Name == null || obj.Metadata?.ResourceVersion == null)
+ {
+ return 0;
+ }
+ var hashCode = obj.Metadata.Name.GetHashCode();
+ hashCode = (hashCode * 397) ^ obj.Metadata.ResourceVersion.GetHashCode();
+ return hashCode;
+ }
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/Notifications/EventTypeFlags.cs b/src/KubernetesClient.Informers/Notifications/EventTypeFlags.cs
new file mode 100644
index 000000000..2fe091572
--- /dev/null
+++ b/src/KubernetesClient.Informers/Notifications/EventTypeFlags.cs
@@ -0,0 +1,65 @@
+using System;
+
+namespace k8s.Informers.Notifications
+{
+ ///
+ /// Denotes flags that specify how the event in resource observable stream should be interpreted.
+ /// Note that more then one value is usually set - use HasFlag instead of equals
+ ///
+ [Flags]
+ public enum EventTypeFlags
+ {
+ ///
+ /// A resource was added
+ ///
+ Add = 1,
+
+ ///
+ /// A resource was deleted
+ ///
+ Delete = 2,
+
+ ///
+ /// A resource was modified
+ ///
+ Modify = 4,
+
+ ///
+ /// State of the resource has not changed and the intent of the message is inform of current state
+ ///
+ Current = 8,
+
+ ///
+ /// The current state of the resource is published as part of regular synchronization interval
+ ///
+ Sync = 16,
+
+ ///
+ /// The state of the resource has been reset, and all subscribers should reset their existing cache values based on the new
+ ///
+ Reset = 32,
+
+ ///
+ /// The start of a sequence of reset messages, usually used to mark the start of a List operation
+ ///
+ ResetStart = 64,
+
+ ///
+ /// The end of a sequence of reset messages, usually used to mark the start of a List operation
+ ///
+ ResetEnd = 128,
+
+ ///
+ /// Marks the boundary between empty list operation and the start of watch in an observable stream
+ ///
+ ResetEmpty = 256,
+
+ ///
+ /// The event was computed through discrepancy reconciliation with server rather then explicit event.
+ /// This can occur when relisting after reconnect to resource server when there are items in local cache that
+ /// don't match what is in cache, so there must have been updates that were missed. By comparing current state
+ /// and old state (cache), we can compute the kind of events that we missed and emit them with this flag
+ ///
+ Computed = 512
+ }
+}
diff --git a/src/KubernetesClient.Informers/Notifications/ResourceEvent.cs b/src/KubernetesClient.Informers/Notifications/ResourceEvent.cs
new file mode 100644
index 000000000..e2aef2709
--- /dev/null
+++ b/src/KubernetesClient.Informers/Notifications/ResourceEvent.cs
@@ -0,0 +1,146 @@
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Text;
+
+namespace k8s.Informers.Notifications
+{
+ ///
+ ///
+ ///
+ [DebuggerStepThrough]
+ public struct ResourceEvent
+ {
+ public ResourceEvent(EventTypeFlags eventFlags, TResource value, TResource oldValue = default)
+ {
+ if (eventFlags.HasFlag(EventTypeFlags.ResetEmpty) || eventFlags.HasFlag(EventTypeFlags.ResetEmpty))
+ {
+ eventFlags |= EventTypeFlags.ResetStart | EventTypeFlags.ResetEnd;
+ }
+
+ if (eventFlags.HasFlag(EventTypeFlags.ResetEnd) || eventFlags.HasFlag(EventTypeFlags.ResetStart))
+ {
+ eventFlags |= EventTypeFlags.Reset;
+ }
+
+ if (eventFlags.HasFlag(EventTypeFlags.Reset) || eventFlags.HasFlag(EventTypeFlags.Sync))
+ {
+ eventFlags |= EventTypeFlags.Current;
+ }
+
+ Value = value;
+ OldValue = oldValue;
+ EventFlags = eventFlags;
+ }
+
+ public EventTypeFlags EventFlags { get; }
+
+ public TResource OldValue { get; }
+ public TResource Value { get; }
+ public static ResourceEvent ResetEmpty { get; } = new ResourceEvent(EventTypeFlags.ResetEmpty, default);
+
+ public override string ToString()
+ {
+ var includePrefix = Value != null && OldValue != null;
+
+ var sb = new StringBuilder();
+ sb.AppendLine();
+ sb.Append(" ");
+ sb.Append(EventFlags);
+ sb.Append(": [");
+ if (Value != null)
+ {
+ if (includePrefix)
+ {
+ sb.Append(nameof(Value));
+ sb.Append("{ ");
+ }
+
+ sb.Append(Value);
+ if (includePrefix)
+ {
+ sb.Append("} ");
+ }
+ }
+
+ if (OldValue != null)
+ {
+ if (includePrefix)
+ {
+ sb.Append(nameof(OldValue));
+ sb.Append("{ ");
+ }
+
+ sb.Append(OldValue);
+ if (includePrefix)
+ {
+ sb.Append("} ");
+ }
+ }
+
+ sb.Append("]");
+ return sb.ToString();
+ }
+ }
+
+ public static class ResourceEventExtensions
+ {
+ public static ResourceEvent ToResourceEvent(this T obj, EventTypeFlags typeFlags, T oldValue = default)
+ {
+ if (typeFlags.HasFlag(EventTypeFlags.Delete) && oldValue == null)
+ {
+ oldValue = obj;
+ }
+ return new ResourceEvent(typeFlags, obj, oldValue);
+ }
+
+ ///
+ /// Converts a list of objects to a resource reset list event block. Every item is of type ,
+ /// with first and last elements also having and
+ /// set respectively. If is empty and is set,
+ ///
+ /// The source enumerable
+ ///
+ /// If the resulting will contain a single
+ /// with no object value and flag set
+ ///
+ /// The type of resource
+ /// The resulting enumerable of reset events
+ public static IEnumerable> ToReset(this IEnumerable source, bool emitEmpty = false)
+ {
+ var i = 0;
+ using var enumerator = source.GetEnumerator();
+ if (!enumerator.MoveNext())
+ {
+ if (emitEmpty)
+ {
+ yield return new ResourceEvent(EventTypeFlags.ResetEmpty, default);
+ }
+ yield break;
+ }
+
+ var current = enumerator.Current;
+ while (enumerator.MoveNext())
+ {
+ if (i == 0)
+ {
+ yield return current.ToResourceEvent(EventTypeFlags.ResetStart);
+ }
+ else
+ {
+ yield return current.ToResourceEvent(EventTypeFlags.Reset);
+ }
+ current = enumerator.Current;
+ i++;
+ }
+
+ if (i == 0)
+ {
+ yield return current.ToResourceEvent(EventTypeFlags.ResetStart | EventTypeFlags.ResetEnd);
+ }
+ else
+ {
+ yield return current.ToResourceEvent(EventTypeFlags.ResetEnd);
+ }
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/Properties/Assembly.cs b/src/KubernetesClient.Informers/Properties/Assembly.cs
new file mode 100644
index 000000000..bd0cfa062
--- /dev/null
+++ b/src/KubernetesClient.Informers/Properties/Assembly.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("KubernetesClient.Informers.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001004917ad4e106c573cc5dbb3b7456de8b6c07128ae43de292752b339eb423de60f0db6a6c0cb21e6640fc672cc84df4a772db85df1505e5dd08c98d5d115eed7a7b59c67fe1f4b32fa716b7177743a417b3fcf88606861650a81f565ac6614abbf8b6b7710436edb497a83974165f9fe6995b70af13047a110bf63cdbfa45f89ac")]
diff --git a/src/KubernetesClient.Informers/ResourceObservableExtensions.cs b/src/KubernetesClient.Informers/ResourceObservableExtensions.cs
new file mode 100644
index 000000000..434852924
--- /dev/null
+++ b/src/KubernetesClient.Informers/ResourceObservableExtensions.cs
@@ -0,0 +1,292 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using k8s.Informers.Cache;
+using k8s.Informers.Notifications;
+
+namespace k8s.Informers
+{
+ public static class ResourceObservableExtensions
+ {
+
+
+ public static IObservable> WithResets(this IObservable> source, Func>, IEnumerable>> action)
+ {
+ return source.WithResets(action, e => e);
+ }
+
+ public static IObservable WithResets(this IObservable> source, Func>, IEnumerable>> resetSelector, Func, TOut> itemSelector)
+ {
+ return Observable.Create(observer =>
+ {
+ var resetBuffer = new List>();
+
+ void FlushBuffer()
+ {
+ if (!resetBuffer.Any())
+ {
+ return;
+ }
+
+ foreach (var item in resetSelector(resetBuffer))
+ {
+ observer.OnNext(itemSelector(item));
+ }
+ resetBuffer.Clear();
+ }
+
+ void OnComplete()
+ {
+ FlushBuffer();
+ observer.OnCompleted();
+ }
+
+ void OnError(Exception e)
+ {
+ observer.OnError(e);
+ }
+
+ var upstreamSubscription = source
+ .Subscribe(notification =>
+ {
+ if (notification.EventFlags.HasFlag(EventTypeFlags.Reset))
+ {
+ resetBuffer.Add(notification);
+ if (!notification.EventFlags.HasFlag(EventTypeFlags.ResetEnd)) // continue buffering till we reach the end of list window
+ {
+ return;
+ }
+ }
+
+ if (notification.EventFlags.HasFlag(EventTypeFlags.ResetEnd))
+ {
+ FlushBuffer();
+ return;
+ }
+
+ if (!notification.EventFlags.HasFlag(EventTypeFlags.Reset) && resetBuffer.Count > 0)
+ {
+ FlushBuffer();
+ }
+
+ observer.OnNext(itemSelector(notification));
+ }, OnError, OnComplete);
+ return StableCompositeDisposable.Create(upstreamSubscription, Disposable.Create(OnComplete));
+ })
+ .ObserveOn(Scheduler.Immediate);
+ }
+
+ ///
+ /// Synchronizes the specified cache with resource event stream such that cache is maintained up to date.
+ ///
+ /// The source sequence
+ /// The cache to synchronize
+ /// The key selector function
+ /// The type of key
+ /// The type of resource
+ /// Source sequence wrapped into , which allows downstream consumers to synchronize themselves with cache version
+ public static IObservable>> SynchronizeCache(
+ this IObservable> source,
+ ICache cache,
+ Func keySelector,
+ Func mapper = null)
+ {
+ if (mapper == null)
+ {
+ mapper = (oldResource, newResource) => newResource;
+ }
+
+ return Observable.Defer(() =>
+ {
+ long msgNum = 0;
+
+ return source
+ .Do(_ => msgNum++)
+ .WithResets(events =>
+ {
+ var reset = events
+ .Select(x => x.Value)
+ .Where(x => x != null)
+ .ToDictionary(keySelector, x => x);
+
+ cache.Reset(reset);
+ cache.Version += events.Count;
+ return events;
+ }, notification =>
+ {
+ if (notification.EventFlags.HasFlag(EventTypeFlags.Reset))
+ {
+ }
+ else if (!notification.EventFlags.HasFlag(EventTypeFlags.Delete))
+ {
+ cache.Version++;
+ if (notification.Value != null)
+ {
+ var key = keySelector(notification.Value);
+ if (cache.TryGetValue(key, out var existing))
+ {
+ notification = new ResourceEvent(notification.EventFlags, mapper(existing, notification.Value), existing);
+ }
+ cache[keySelector(notification.Value)] = notification.Value;
+ }
+ }
+ else
+ {
+ cache.Remove(keySelector(notification.OldValue));
+ }
+
+ return new CacheSynchronized>(msgNum, cache.Version, notification);
+ });
+ });
+ }
+
+
+ public static IObservable> ComputeMissedEventsBetweenResets(this IObservable> source, Func keySelector, IEqualityComparer comparer)
+ {
+ return Observable.Create>(observer =>
+ {
+ var cache = new SimpleCache();
+ var cacheSynchronized = false;
+ return source
+ .WithResets(resetBuffer =>
+ {
+ if (!cacheSynchronized)
+ {
+ return resetBuffer;
+ }
+
+ var cacheSnapshot = cache.Snapshot();
+ var newKeys = resetBuffer
+ .Where(x => x.Value != null)
+ .Select(x => keySelector(x.Value))
+ .ToHashSet();
+
+ var addedEntities = resetBuffer
+ .Select(x => x.Value)
+ .Where(x => x != null && !cacheSnapshot.ContainsKey(keySelector(x)))
+ .Select(x => x.ToResourceEvent(EventTypeFlags.Add | EventTypeFlags.Computed))
+ .ToList();
+ var addedKeys = addedEntities
+ .Select(x => keySelector(x.Value))
+ .ToHashSet();
+
+ var deletedEntities = cacheSnapshot
+ .Where(x => !newKeys.Contains(x.Key))
+ .Select(x => x.Value.ToResourceEvent(EventTypeFlags.Delete | EventTypeFlags.Computed))
+ .ToList();
+ var deletedKeys = deletedEntities
+ .Select(x => keySelector(x.Value))
+ .ToHashSet();
+
+ // we can only compute updates if we are given a proper comparer to determine equality between objects
+ // if not provided, will be sent downstream as just part of reset
+ var updatedEntities = new List>();
+ if (comparer != null)
+ {
+ var previouslyKnownEntitiesInResetWindowKeys = cacheSnapshot
+ .Keys
+ .Intersect(resetBuffer.Select(x => keySelector(x.Value)));
+
+ updatedEntities = resetBuffer
+ .Where(x => previouslyKnownEntitiesInResetWindowKeys.Contains(keySelector(x.Value)))
+ .Select(x => x.Value) // stuff in buffer that also existed in cache (by key)
+ .Except(cacheSnapshot.Select(x => x.Value), comparer)
+ .Select(x => x.ToResourceEvent(EventTypeFlags.Modify | EventTypeFlags.Computed))
+ .ToList();
+ }
+
+ var updatedKeys = updatedEntities
+ .Select(x => keySelector(x.Value))
+ .ToHashSet();
+
+ var resetEntities = resetBuffer
+ .Select(x => x.Value)
+ .Where(x => x != null &&
+ !addedKeys.Contains(keySelector(x)) &&
+ !deletedKeys.Contains(keySelector(x)) &&
+ !updatedKeys.Contains(keySelector(x)))
+ .ToReset()
+ .ToList();
+
+ return deletedEntities
+ .Union(addedEntities)
+ .Union(updatedEntities)
+ .Union(resetEntities);
+ })
+ .SynchronizeCache(cache, keySelector)
+ .Do(msg => { cacheSynchronized = true; })
+ .Select(x => x.Value)
+ .ObserveOn(Scheduler.Immediate)
+ .Subscribe(observer);
+ });
+ }
+
+ ///
+ /// Injects a of type into the observable for each item produced
+ /// by the operation from
+ ///
+ /// The source sequence that will have sync messages appended
+ /// The timespan interval at which the messages should be produced
+ /// The type of resource
+ /// Original sequence with resync applied
+ public static IObservable> Resync(this IObservable> source, TimeSpan timeSpan, IScheduler scheduler = null)
+ {
+ scheduler ??= DefaultScheduler.Instance;
+ return Observable.Create>(observer =>
+ {
+ var timerSubscription = Observable
+ .Interval(timeSpan, scheduler)
+ .SelectMany(_ => source
+ .TakeUntil(x => x.EventFlags.HasFlag(EventTypeFlags.ResetEnd))
+ .Do(x =>
+ {
+ if (!x.EventFlags.HasFlag(EventTypeFlags.Reset))
+ {
+ throw new InvalidOperationException("Resync was applied to an observable sequence that does not issue a valid List event block when subscribed to");
+ }
+ })
+ .Select(x => x.Value.ToResourceEvent(EventTypeFlags.Sync)))
+ .Subscribe(observer);
+ // this ensures that both timer and upstream subscription is closed when subscriber disconnects
+ var sourceSubscription = source.Subscribe(
+ observer.OnNext,
+ observer.OnError,
+ () =>
+ {
+ observer.OnCompleted();
+ timerSubscription.Dispose();
+ });
+ return StableCompositeDisposable.Create(timerSubscription, sourceSubscription);
+ });
+ }
+
+ ///
+ /// Wraps an instance of as by using the same
+ /// set of for every subscription
+ ///
+ /// The original instance of
+ /// The options to use
+ /// The type of resource
+ ///
+ ///
+ public static IInformer WithOptions(this IInformer optionedInformer, TOptions options) =>
+ new WrappedOptionsInformer(optionedInformer, options);
+
+ private class WrappedOptionsInformer : IInformer
+ {
+ private readonly IInformer _informer;
+ private readonly TOptions _options;
+
+ public WrappedOptionsInformer(IInformer informer, TOptions options)
+ {
+ _informer = informer;
+ _options = options;
+ }
+
+ public IObservable> GetResource(ResourceStreamType type) => _informer.GetResource(type, _options);
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/ResourceStreamType.cs b/src/KubernetesClient.Informers/ResourceStreamType.cs
new file mode 100644
index 000000000..bccf8c4db
--- /dev/null
+++ b/src/KubernetesClient.Informers/ResourceStreamType.cs
@@ -0,0 +1,26 @@
+using System;
+
+namespace k8s.Informers
+{
+ ///
+ /// The type of resource observable stream that specifies whether to return current state of resource, observe changes, or both
+ ///
+ [Flags]
+ public enum ResourceStreamType
+ {
+ ///
+ /// A Cold observable that returns current state of resources and then completes
+ ///
+ List = 1,
+
+ ///
+ /// A Hot observable that publishes a list of changes as they happen
+ ///
+ Watch = 2,
+
+ ///
+ /// A Hot observable that Lists current state of resources followed by watch.
+ ///
+ ListWatch = List | Watch
+ }
+}
diff --git a/src/KubernetesClient.Informers/SharedInformer.cs b/src/KubernetesClient.Informers/SharedInformer.cs
new file mode 100644
index 000000000..4a46875a5
--- /dev/null
+++ b/src/KubernetesClient.Informers/SharedInformer.cs
@@ -0,0 +1,212 @@
+using System;
+using System.Diagnostics;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+using System.Threading;
+using System.Threading.Tasks;
+using k8s.Informers.Cache;
+using k8s.Informers.Notifications;
+using Microsoft.Extensions.Logging;
+
+namespace k8s.Informers
+{
+ ///
+ /// Wraps a single master informer (such as Kubernetes API connection) for rebroadcast to multiple internal subscribers
+ /// and is responsible for managing and synchronizing cache
+ ///
+ ///
+ ///
+ /// Allows rebroadcasting of single informer provided by masterInformer to multiple internal subscribers.
+ /// Lazy loading semantics apply where subscription to master informer is only established when there's at least one attached observer, and it is closed if all observers disconnect
+ ///
+ ///
+ /// is considered the sole owner of managing the cache. Since cache is used as the source of truth for "List" operations of any downstream subscribers,
+ /// any attempt to modify cache externally will result in desynchronization. Shared informer will only start emitting events downstream after cache has been synchronized
+ /// (after first List).
+ ///
+ ///
+ ///
+ public class SharedInformer : IInformer
+ {
+ private readonly ICache _cache;
+ private readonly Func _keySelector;
+ private readonly object _lock = new object();
+ private readonly ILogger _logger;
+ private readonly CountdownEvent _waitingSubscribers = new CountdownEvent(0);
+ private TaskCompletionSource _cacheSynchronized = new TaskCompletionSource();
+ private readonly IConnectableObservable>> _masterObservable;
+ private readonly IScheduler _masterScheduler;
+ private IDisposable _masterSubscription;
+ private int _subscribers;
+
+ public SharedInformer(IInformer masterInformer, ILogger logger, Func keySelector)
+ : this(masterInformer, logger, keySelector, new SimpleCache())
+ {
+ }
+
+ public SharedInformer(IInformer masterInformer, ILogger logger, Func keySelector, ICache cache, IScheduler scheduler = null)
+ {
+ _cache = cache;
+ _masterScheduler = scheduler ?? new EventLoopScheduler();
+ _logger = logger;
+ _keySelector = keySelector;
+ _masterObservable = masterInformer
+ .GetResource(ResourceStreamType.ListWatch)
+ .ObserveOn(_masterScheduler)
+ .Do(x => _logger.LogTrace($"Received message from upstream {x}"))
+ .SynchronizeCache(_cache, _keySelector)
+ .Do(msg =>
+ {
+ // cache is synchronized as soon as we get at least one message past this point
+ _logger.LogTrace($"Cache v{cache.Version} synchronized: {msg} ");
+ _cacheSynchronized.TrySetResult(true);
+ _logger.LogTrace("_cacheSynchronized.TrySetResult(true)");
+ })
+ .Do(_ => YieldToWaitingSubscribers())
+ .ObserveOn(Scheduler.Immediate) // immediate ensures that all caches operations are done atomically
+ .ObserveOn(_masterScheduler)
+ .Catch>, Exception>(e =>
+ {
+ _cacheSynchronized.TrySetException(e);
+ // _cacheSynchronized.OnError(e);
+ return Observable.Throw>>(e);
+ })
+ .Finally(() => _cacheSynchronized.TrySetResult(false))
+ // .SubscribeOn(_masterScheduler)
+ .Publish();
+ }
+
+
+ public IObservable> GetResource(ResourceStreamType type) =>
+ Observable.Using(() => new EventLoopScheduler(), childScheduler =>
+ Observable.Defer(async () =>
+ {
+ AddSubscriber();
+ _logger.LogTrace("Subscriber awaiting cache synchronization before attaching");
+
+ var isCacheSynchronized = await _cacheSynchronized.Task.ConfigureAwait(false);
+ if (!isCacheSynchronized) // really this only happens if the reset is the master completes before first reset, in which case the downstream subscriber gets nothing
+ {
+ return Observable.Empty>();
+ }
+ // we use lock to pause any processing of the broadcaster while we're attaching to the stream so proper alignment can be made
+
+ _logger.LogTrace("Subscriber attaching to broadcaster");
+
+ return Observable.Create>(observer =>
+ {
+ var broadcasterAttachment = Disposable.Empty;
+ var cacheVersion = _cache.Version;
+ if (type.HasFlag(ResourceStreamType.List))
+ {
+ _logger.LogTrace($"Flushing contents of cache version {cacheVersion}");
+ _cache.Values
+ .ToReset(type == ResourceStreamType.ListWatch)
+ .ToObservable()
+ .Concat(Observable.Never>())
+ .ObserveOn(Scheduler.Immediate)
+ .Subscribe(observer);
+ }
+
+ if (type.HasFlag(ResourceStreamType.Watch))
+ broadcasterAttachment = _masterObservable
+ // we could be ahead of broadcaster because we initialized from cache which gets updated before the message are sent to broadcaster
+ // this logic realigns us at the correct point with the broadcaster
+ .Do(x => _logger.LogTrace($"Received from broadcaster {x}"))
+ .SkipWhile(x => x.MessageNumber <= cacheVersion)
+ .Select(x => x.Value)
+ .Do(x => _logger.LogTrace($"Aligned with broadcaster {x}"))
+ .SubscribeOn(_masterScheduler)
+ .ObserveOn(childScheduler)
+ .Subscribe(observer, () =>
+ {
+ _logger.LogTrace("Child OnComplete");
+ RemoveSubscriber();
+ });
+ else
+ {
+ observer.OnCompleted();
+ }
+
+ // let broadcaster know we're done attaching to stream so it can resume it's regular work
+ _logger.LogTrace("Finished attaching to stream - signalling to resume");
+ lock (_lock)
+ {
+ _waitingSubscribers.Signal();
+ }
+
+ return broadcasterAttachment;
+ })
+ .ObserveOn(childScheduler)
+ .SubscribeOn(childScheduler);
+ })
+ .SubscribeOn(childScheduler) // ensures that when we attach master observer it's done on child thread, as we plan on awaiting cache synchronization
+ .Do(_ => _logger.LogTrace($"Shared informer out: {_}")));
+
+
+
+ [DebuggerStepThrough]
+ private void YieldToWaitingSubscribers()
+ {
+ _logger.LogTrace("Waiting for subscribers to attach to stream");
+ while (_waitingSubscribers.CurrentCount > 0)
+ {
+ // give a chance to any joining subscribers to realign with the broadcast stream
+ _waitingSubscribers.Wait(100);
+ }
+
+ _logger.LogTrace("Finished yielding to subscribers");
+ }
+
+ private void AddSubscriber()
+ {
+ // when child subscribers attach they need to be synchronized to the master stream
+ // this is allowed outside of "reset" event boundary.
+ // the broadcaster will yield to any _waitingSubscribers before resuming work
+ var shouldConnectMaster = false;
+ lock (_lock)
+ {
+ // need to do this under lock because we can't just increment if the lock is already set, and there's a
+ // risk of collision of two threads resetting to 1 at the same time
+ if (!_waitingSubscribers.TryAddCount())
+ {
+ _waitingSubscribers.Reset(1);
+ }
+
+ if (_subscribers == 0)
+ {
+ shouldConnectMaster = true;
+ }
+ _subscribers++;
+ }
+
+ if (shouldConnectMaster)
+ {
+ _masterSubscription = _masterObservable.Connect();
+ }
+ }
+
+ private void RemoveSubscriber()
+ {
+ try
+ {
+ _logger.LogTrace("Removing Subscriber!");
+ }
+ catch (Exception) // given the use of Observable.Using, in unit tests this may actually get called AFTER unit test completes when logger is already gone
+ {
+ }
+
+ lock (_lock)
+ {
+ _subscribers--;
+ if (_subscribers == 0)
+ {
+ _cacheSynchronized = new TaskCompletionSource(false);
+ _masterSubscription.Dispose();
+ }
+ }
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/SharedKubernetesInformer.cs b/src/KubernetesClient.Informers/SharedKubernetesInformer.cs
new file mode 100644
index 000000000..d0ee1d259
--- /dev/null
+++ b/src/KubernetesClient.Informers/SharedKubernetesInformer.cs
@@ -0,0 +1,46 @@
+using System;
+using k8s.Informers.Cache;
+using k8s.Informers.Notifications;
+using k8s.Models;
+using Microsoft.Extensions.Logging;
+
+namespace k8s.Informers
+{
+ ///
+ /// Opens a single connection to API server with per unique
+ /// and attaches 1 or more internal subscriber to it. The connection is automatically opened if there is
+ /// at least one subscriber and closes if there are none
+ ///
+ /// The type of resource to monitor
+ public class SharedKubernetesInformer :
+ SharedOptionsInformer,
+ IKubernetesInformer
+ where TResource : IKubernetesObject, IMetadata
+ {
+ public SharedKubernetesInformer(KubernetesInformer masterInformer, ILoggerFactory loggerFactory)
+ : base(masterInformer, SharedKubernetesInformerFactory(loggerFactory, GetVersionPartitionedCacheFactory()))
+ {
+ }
+
+ public SharedKubernetesInformer(KubernetesInformer masterInformer, Func> cacheFactory, ILoggerFactory loggerFactory)
+ : base(masterInformer, SharedKubernetesInformerFactory(loggerFactory, cacheFactory))
+ {
+ }
+
+ ///
+ public IObservable> GetResource(ResourceStreamType type) => base.GetResource(type, KubernetesInformerOptions.Default);
+
+ private static Func> GetVersionPartitionedCacheFactory()
+ {
+ var partitionedSharedCache = new VersionPartitionedSharedCache(x => x.Metadata.Name, x => x.Metadata.ResourceVersion);
+ return () => partitionedSharedCache.CreatePartition();
+ }
+
+ private static Func, IInformer> SharedKubernetesInformerFactory(ILoggerFactory loggerFactory, Func> cacheFactory) =>
+ masterInformer => new SharedInformer(
+ masterInformer,
+ loggerFactory.CreateLogger>>(),
+ x => x.Metadata.Name,
+ cacheFactory());
+ }
+}
diff --git a/src/KubernetesClient.Informers/SharedOptionsInformer.cs b/src/KubernetesClient.Informers/SharedOptionsInformer.cs
new file mode 100644
index 000000000..90e1ab6d4
--- /dev/null
+++ b/src/KubernetesClient.Informers/SharedOptionsInformer.cs
@@ -0,0 +1,33 @@
+using System;
+using System.Collections.Concurrent;
+using k8s.Informers.Notifications;
+
+namespace k8s.Informers
+{
+ ///
+ /// Manages multiple for each unique set of and ensures subscriptions are attached to correct one
+ ///
+ ///
+ ///
+ public class SharedOptionsInformer : IInformer
+ {
+ private readonly IInformer _masterInformer;
+ private readonly Func, IInformer> _sharedInformerFactory;
+ private readonly ConcurrentDictionary> _sharedInformers = new ConcurrentDictionary>();
+
+ public SharedOptionsInformer(
+ IInformer masterInformer,
+ Func, IInformer> sharedInformerFactory)
+ {
+ _masterInformer = masterInformer;
+ _sharedInformerFactory = sharedInformerFactory;
+ }
+
+
+ public IObservable> GetResource(ResourceStreamType type, TOptions options)
+ {
+ var sharedInformer = _sharedInformers.GetOrAdd(options, opt => _sharedInformerFactory(_masterInformer.WithOptions(opt)));
+ return sharedInformer.GetResource(type);
+ }
+ }
+}
diff --git a/src/KubernetesClient.Informers/version.json b/src/KubernetesClient.Informers/version.json
new file mode 100644
index 000000000..8044b6cf2
--- /dev/null
+++ b/src/KubernetesClient.Informers/version.json
@@ -0,0 +1,8 @@
+{
+ "$schema": "https://raw.githubusercontent.com/AArnott/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
+ "version": "2.0",
+ "publicReleaseRefSpec": [
+ "^refs/heads/master$" // we release out of master
+ ],
+ "pathFilters": [".", "../KubernetesClient"]
+}
diff --git a/src/KubernetesClient/Authentication/GcpTokenProvider.cs b/src/KubernetesClient/Authentication/GcpTokenProvider.cs
new file mode 100644
index 000000000..4cd6c54b1
--- /dev/null
+++ b/src/KubernetesClient/Authentication/GcpTokenProvider.cs
@@ -0,0 +1,57 @@
+using System;
+using System.Diagnostics;
+using System.Net.Http.Headers;
+using System.Threading;
+using System.Threading.Tasks;
+using k8s.Exceptions;
+using Microsoft.Rest;
+using Newtonsoft.Json.Linq;
+
+namespace k8s.Authentication
+{
+ public class GcpTokenProvider : ITokenProvider
+ {
+ private readonly string _gcloudCli;
+ private DateTime _expiry;
+ private string _token;
+
+ public GcpTokenProvider(string gcloudCli)
+ {
+ _gcloudCli = gcloudCli;
+ }
+
+ public Task GetAuthenticationHeaderAsync(CancellationToken cancellationToken)
+ {
+ if (DateTime.UtcNow.AddSeconds(30) > _expiry)
+ {
+ RefreshToken();
+ }
+ return Task.FromResult(new AuthenticationHeaderValue("Bearer", _token));
+ }
+
+ private void RefreshToken()
+ {
+ var process = new Process();
+ process.StartInfo.FileName = _gcloudCli;
+ process.StartInfo.Arguments = "config config-helper --format=json";
+ process.StartInfo.UseShellExecute = false;
+ process.StartInfo.RedirectStandardOutput = true;
+ process.StartInfo.RedirectStandardError = true;
+ process.Start();
+ //* Read the output (or the error)
+ var output = process.StandardOutput.ReadToEnd();
+ Console.WriteLine(output);
+ var err = process.StandardError.ReadToEnd();
+ Console.WriteLine(err);
+ process.WaitForExit();
+ if (process.ExitCode != 0)
+ {
+ throw new KubernetesClientException($"Unable to obtain a token via gcloud command. Error code {process.ExitCode}. \n {err}");
+ }
+
+ var json = JToken.Parse(output);
+ _token = json["credential"]["access_token"].Value();
+ _expiry = json["credential"]["token_expiry"].Value();
+ }
+ }
+}
diff --git a/src/KubernetesClient/DryRun.cs b/src/KubernetesClient/DryRun.cs
new file mode 100644
index 000000000..355411e3f
--- /dev/null
+++ b/src/KubernetesClient/DryRun.cs
@@ -0,0 +1,9 @@
+using System.Runtime.Serialization;
+
+namespace k8s
+{
+ public enum DryRun
+ {
+ All
+ }
+}
diff --git a/src/KubernetesClient/Extensions.cs b/src/KubernetesClient/Extensions.cs
new file mode 100644
index 000000000..40e4e8bc5
--- /dev/null
+++ b/src/KubernetesClient/Extensions.cs
@@ -0,0 +1,75 @@
+using System;
+using System.Collections;
+using System.Reflection;
+using System.Text.RegularExpressions;
+using k8s.Models;
+
+namespace k8s
+{
+ public static class Extensions
+ {
+ public static KubernetesEntityAttribute GetKubernetesTypeMetadata(this T obj) where T : IKubernetesObject
+ {
+ return obj.GetType().GetKubernetesTypeMetadata();
+ }
+
+ public static KubernetesEntityAttribute GetKubernetesTypeMetadata(this Type currentType)
+ {
+ var attr = currentType.GetCustomAttribute();
+ if (attr == null)
+ {
+ throw new InvalidOperationException($"Custom resource must have {nameof(KubernetesEntityAttribute)} applied to it");
+ }
+
+ return attr;
+ }
+
+ public static T Initialize(this T obj) where T : IKubernetesObject
+ {
+ var metadata = obj.GetKubernetesTypeMetadata();
+
+ obj.ApiVersion = !string.IsNullOrEmpty(metadata.Group) ? $"{metadata.Group}/{metadata.ApiVersion}" : metadata.ApiVersion;
+ obj.Kind = metadata.Kind ?? obj.GetType().Name;
+ if (obj is IMetadata withMetadata && withMetadata.Metadata == null)
+ {
+ withMetadata.Metadata = new V1ObjectMeta();
+ }
+
+ return obj;
+ }
+
+ internal static bool IsValidKubernetesName(this string value)
+ {
+ return !Regex.IsMatch(value, "^[a-z0-9-]+$");
+ }
+
+ // Convert the string to camel case.
+ public static string ToCamelCase(this string value)
+ {
+ // If there are 0 or 1 characters, just return the string.
+ if (value == null || value.Length < 2)
+ {
+ return value;
+ }
+
+ // Split the string into words.
+ var words = value.Split(
+ new char[0],
+ StringSplitOptions.RemoveEmptyEntries);
+
+ // Combine the words.
+ var result = words[0].ToLower();
+ for (var i = 1; i < words.Length; i++)
+ {
+ result +=
+ words[i].Substring(0, 1).ToUpper() +
+ words[i].Substring(1);
+ }
+
+ return result;
+ }
+
+ public static bool In