Skip to content

Commit bfaa8b3

Browse files
committed
Support for informers
1 parent 7d17fba commit bfaa8b3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+4302
-35
lines changed

.dockerignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
**/bin/
2+
**/obj/
3+
**/out/
4+
**/layer/
5+
**Dockerfile*
6+
*/*.md

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ bin/
1313
# JetBrains Rider
1414
.idea/
1515
*.sln.iml
16+
examples/informers/Dockerfile

examples/httpClientFactory/Program.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public static async Task Main(string[] args)
3232
serviceProvider.GetRequiredService<KubernetesClientConfiguration>(),
3333
httpClient);
3434
})
35-
.ConfigurePrimaryHttpMessageHandler(config.CreateDefaultHttpClientHandler);
35+
.ConfigurePrimaryHttpMessageHandler(config.CreateDefaultHttpClientHandler)
36+
.AddHttpMessageHandler(KubernetesClientConfiguration.CreateWatchHandler);
3637

3738
// Add the class that uses the client
3839
services.AddHostedService<PodListHostedService>();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using Microsoft.Extensions.Hosting;
7+
8+
namespace informers
9+
{
10+
public class ControllerService : BackgroundService
11+
{
12+
private readonly IServiceProvider _serviceProvider;
13+
14+
public ControllerService(IServiceProvider serviceProvider)
15+
{
16+
_serviceProvider = serviceProvider;
17+
}
18+
19+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
20+
{
21+
var controllers = _serviceProvider.GetServices<IController>();
22+
await Task.WhenAll(controllers.Select(x => x.Initialize(stoppingToken)));
23+
}
24+
25+
26+
}
27+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Reactive.Disposables;
5+
using System.Reactive.Linq;
6+
using System.Text;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using System.Threading.Tasks.Dataflow;
10+
using k8s;
11+
using k8s.Informers;
12+
using k8s.Informers.Notifications;
13+
using k8s.Models;
14+
using KellermanSoftware.CompareNetObjects;
15+
using Microsoft.Extensions.Logging;
16+
using Newtonsoft.Json;
17+
using Newtonsoft.Json.Converters;
18+
19+
namespace informers
20+
{
21+
// this sample demos both informer and controller
22+
// there are two loggers:
23+
// _informerLogger lets you see raw data coming out of informer stream
24+
// _reconcilerLogger lets you see batches of object transitions that object went through since last time we did work on it
25+
// reconciler is purposely slowed down to show accumulation of events between worker actions
26+
27+
// try creating and deleting some pods in "default" namespace and watch the output
28+
29+
public class DeltaChangesQueryingController : IController
30+
{
31+
private readonly IKubernetesInformer<V1Pod> _podInformer;
32+
private readonly ILogger _reconcilerLogger;
33+
private ILogger _informerLogger;
34+
CompareLogic _objectCompare = new CompareLogic();
35+
// private readonly ActionBlock<List<ResourceEvent<V1Pod>>> _reconciler;
36+
private readonly CompositeDisposable _subscription = new CompositeDisposable();
37+
38+
public DeltaChangesQueryingController(IKubernetesInformer<V1Pod> podInformer, ILoggerFactory loggerFactory)
39+
{
40+
_podInformer = podInformer;
41+
_reconcilerLogger = loggerFactory.CreateLogger("Reconciler");
42+
_informerLogger = loggerFactory.CreateLogger("Informer");
43+
_objectCompare.Config.MaxDifferences = 100;
44+
// the commented sections show how to use advanced syntax to work with TPL dataflows. most scenarios can get away with .ProcessWith helper method
45+
46+
// _reconciler = new ActionBlock<List<ResourceEvent<V1Pod>>>(Reconcile,
47+
// new ExecutionDataflowBlockOptions()
48+
// {
49+
// BoundedCapacity = 2,
50+
// MaxDegreeOfParallelism = 2,
51+
// });
52+
}
53+
54+
// public Task Completion => _reconciler.Completion;
55+
56+
public Task Initialize(CancellationToken cancellationToken)
57+
{
58+
var workerQueue = _podInformer
59+
.GetResource(ResourceStreamType.ListWatch, new KubernetesInformerOptions() { Namespace = "default"})
60+
// .Resync(TimeSpan.FromSeconds(5))
61+
.Do(item =>
62+
{
63+
_informerLogger.LogInformation($"\n EventType: {item.EventFlags} \n Name: {item.Value.Metadata.Name} \n Version: {item.Value.Metadata.ResourceVersion}");
64+
})
65+
.Catch<ResourceEvent<V1Pod>,Exception>(e =>
66+
{
67+
_informerLogger.LogCritical(e, e.Message);
68+
return Observable.Throw<ResourceEvent<V1Pod>>(e);
69+
})
70+
.ToResourceEventDeltaBlock(x => x.Metadata.Name, out var informerSubscription);
71+
informerSubscription.DisposeWith(_subscription);
72+
workerQueue
73+
//.LinkTo(_reconciler) // working with action blocks directly for fine grained control
74+
.ProcessWith(Reconcile, _reconcilerLogger) // simplified syntax
75+
.DisposeWith(_subscription);
76+
return Task.CompletedTask;
77+
}
78+
79+
private async Task Reconcile(List<ResourceEvent<V1Pod>> changes)
80+
{
81+
// invoke reconcilation here
82+
83+
var obj = changes.First().Value;
84+
var sb = new StringBuilder();
85+
// sb.AppendLine($"Received changes for object with ID {KubernetesObject.KeySelector(obj)} with {changes.Count} items");
86+
sb.AppendLine($"Received changes for object with ID {KubernetesObject.KeySelector(obj)} with {changes.Count} items");
87+
sb.AppendLine($"Last known state was {changes.Last().EventFlags}");
88+
foreach (var item in changes)
89+
{
90+
sb.AppendLine($"==={item.EventFlags}===");
91+
sb.AppendLine($"Name: {item.Value.Metadata.Name}");
92+
sb.AppendLine($"Version: {item.Value.Metadata.ResourceVersion}");
93+
if (item.EventFlags.HasFlag(EventTypeFlags.Modify))
94+
{
95+
var updateDelta = _objectCompare.Compare(item.OldValue, item.Value);
96+
foreach (var difference in updateDelta.Differences)
97+
{
98+
sb.AppendLine($"{difference.PropertyName}: {difference.Object1} -> {difference.Object2}");
99+
}
100+
101+
}
102+
// sb.AppendLine(JsonConvert.SerializeObject(item, Formatting.Indented, new StringEnumConverter()));
103+
}
104+
_reconcilerLogger.LogInformation(sb.ToString());
105+
106+
await Task.Delay(TimeSpan.FromSeconds(10)); // simulate
107+
await Task.CompletedTask;
108+
109+
}
110+
}
111+
}

examples/informers/IController.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace informers
5+
{
6+
/// <summary>
7+
/// Base interface for implementing controllers
8+
/// </summary>
9+
public interface IController
10+
{
11+
/// <summary>
12+
/// Signals that controller is done processing all the work and no more work will ever be processed.
13+
/// Mainly useful in testing
14+
/// </summary>
15+
public Task Initialize(CancellationToken cancellationToken);
16+
}
17+
}

examples/informers/Program.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using Microsoft.Extensions.Hosting;
2+
using Microsoft.Extensions.Logging;
3+
4+
namespace informers
5+
{
6+
public class Program
7+
{
8+
public static void Main(string[] args)
9+
{
10+
CreateHostBuilder(args).Build().Run();
11+
}
12+
13+
public static IHostBuilder CreateHostBuilder(string[] args) =>
14+
Host.CreateDefaultBuilder(args)
15+
.ConfigureLogging(x => x.AddConsole())
16+
.ConfigureServices((hostContext, services) =>
17+
{
18+
services.AddKubernetes();
19+
});
20+
21+
}
22+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"profiles": {
3+
"K8SControllerExample": {
4+
"commandName": "Project",
5+
"environmentVariables": {
6+
"DOTNET_ENVIRONMENT": "Development"
7+
}
8+
}
9+
}
10+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using System;
2+
using System.Linq;
3+
using k8s;
4+
using k8s.Informers;
5+
using k8s.Informers.Cache;
6+
using Microsoft.Extensions.DependencyInjection;
7+
using Microsoft.Rest;
8+
using Microsoft.Rest.TransientFaultHandling;
9+
10+
namespace informers
11+
{
12+
public static class Extensions
13+
{
14+
public static IServiceCollection AddKubernetes(this IServiceCollection services)
15+
{
16+
services.AddHostedService<ControllerService>();
17+
var controllers = AppDomain.CurrentDomain.GetAssemblies()
18+
.SelectMany(x => x.DefinedTypes)
19+
.Where(x => x.IsClass && !x.IsAbstract && typeof(IController).IsAssignableFrom(x))
20+
.ToList();
21+
foreach (var controller in controllers)
22+
{
23+
services.AddSingleton(typeof(IController), controller);
24+
}
25+
var config = KubernetesClientConfiguration.BuildDefaultConfig();
26+
27+
services.AddSingleton(config);
28+
services.AddSingleton<RetryDelegatingHandler>();
29+
services.AddHttpClient("DefaultName")
30+
.AddTypedClient<IKubernetes>((httpClient, serviceProvider) =>
31+
new Kubernetes(
32+
serviceProvider.GetRequiredService<KubernetesClientConfiguration>(),
33+
httpClient))
34+
.AddHttpMessageHandler(() => new RetryDelegatingHandler()
35+
{
36+
RetryPolicy = new RetryPolicy<HttpStatusCodeErrorDetectionStrategy>(new ExponentialBackoffRetryStrategy())
37+
})
38+
.AddHttpMessageHandler(KubernetesClientConfiguration.CreateWatchHandler)
39+
.ConfigurePrimaryHttpMessageHandler(config.CreateDefaultHttpClientHandler);
40+
var kubernetesResources = AppDomain.CurrentDomain.GetAssemblies()
41+
.SelectMany(x => x.DefinedTypes)
42+
.Where(x => typeof(IKubernetesObject).IsAssignableFrom(x))
43+
.ToList();
44+
services.AddTransient(typeof(KubernetesInformer<>));
45+
services.AddSingleton(typeof(IKubernetesInformer<>), typeof(SharedKubernetesInformer<>));
46+
return services;
47+
}
48+
}
49+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Information",
5+
"Microsoft": "Warning",
6+
"Microsoft.Hosting.Lifetime": "Information"
7+
}
8+
}
9+
}

examples/informers/appsettings.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Information",
5+
"Microsoft": "Warning",
6+
"Microsoft.Hosting.Lifetime": "Information"
7+
}
8+
}
9+
}

examples/informers/informers.csproj

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<Project Sdk="Microsoft.NET.Sdk.Worker">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netcoreapp3.1</TargetFramework>
5+
<SignAssembly>true</SignAssembly>
6+
<AssemblyOriginatorKeyFile>..\..\src\KubernetesClient\kubernetes-client.snk</AssemblyOriginatorKeyFile>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="CompareNETObjects" Version="4.65.0" />
11+
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.2" />
12+
<PackageReference Include="Microsoft.Extensions.Http" Version="3.1.2" />
13+
<PackageReference Include="Microsoft.Extensions.Http" Version="3.1.2" />
14+
<PackageReference Include="NMica" Version="2.0.2" />
15+
</ItemGroup>
16+
17+
<ItemGroup>
18+
<ProjectReference Include="..\..\src\KubernetesClient\KubernetesClient.csproj" />
19+
</ItemGroup>
20+
</Project>

kubernetes-client.sln

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "patch", "examples\patch\pat
3535
EndProject
3636
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "httpClientFactory", "examples\httpClientFactory\httpClientFactory.csproj", "{A07314A0-02E8-4F36-B233-726D59D28F08}"
3737
EndProject
38+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "informers", "examples\informers\informers.csproj", "{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}"
39+
EndProject
3840
Global
3941
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4042
Debug|Any CPU = Debug|Any CPU
@@ -189,6 +191,18 @@ Global
189191
{A07314A0-02E8-4F36-B233-726D59D28F08}.Release|x64.Build.0 = Release|Any CPU
190192
{A07314A0-02E8-4F36-B233-726D59D28F08}.Release|x86.ActiveCfg = Release|Any CPU
191193
{A07314A0-02E8-4F36-B233-726D59D28F08}.Release|x86.Build.0 = Release|Any CPU
194+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
195+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
196+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|x64.ActiveCfg = Debug|Any CPU
197+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|x64.Build.0 = Debug|Any CPU
198+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|x86.ActiveCfg = Debug|Any CPU
199+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Debug|x86.Build.0 = Debug|Any CPU
200+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
201+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|Any CPU.Build.0 = Release|Any CPU
202+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|x64.ActiveCfg = Release|Any CPU
203+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|x64.Build.0 = Release|Any CPU
204+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|x86.ActiveCfg = Release|Any CPU
205+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2}.Release|x86.Build.0 = Release|Any CPU
192206
EndGlobalSection
193207
GlobalSection(SolutionProperties) = preSolution
194208
HideSolutionNode = FALSE
@@ -206,6 +220,7 @@ Global
206220
{542DC30E-FDF7-4A35-B026-6C21F435E8B1} = {879F8787-C3BB-43F3-A92D-6D4C7D3A5285}
207221
{04DE2C84-117D-4E21-8B45-B7AE627697BD} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
208222
{A07314A0-02E8-4F36-B233-726D59D28F08} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
223+
{A24E81F3-EFB1-4AFE-8C87-BDF2D3A0C0C2} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
209224
EndGlobalSection
210225
GlobalSection(ExtensibilityGlobals) = postSolution
211226
SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7}

0 commit comments

Comments
 (0)