Skip to content

Commit 1248ac8

Browse files
committed
Support for informers
1 parent 01a52c9 commit 1248ac8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+5353
-34
lines changed

.dockerignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
**/bin/
2+
**/obj/
3+
**/out/
4+
**/layer/
5+
**Dockerfile*
6+
*/*.md

.github/workflows/dotnet.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,25 @@ jobs:
1515
uses: actions/setup-dotnet@v1
1616
with:
1717
dotnet-version: 2.1.202
18-
- name: Setup dotnet runtime 2.1.17
18+
- name: Setup dotnet runtime 2.1.0
1919
uses: actions/setup-dotnet@v1
2020
with:
21-
dotnet-version: 2.1.805
21+
dotnet-version: 2.1.300
2222
- name: Setup dotnet SDK 3.1
2323
uses: actions/setup-dotnet@v1
2424
with:
2525
dotnet-version: 3.1.201
2626
- name: Setup SxS dotnet
2727
if: matrix.os == 'windows-latest'
2828
run: |
29-
(robocopy %DOTNET_ROOT:3.1.201=2.1.805% %DOTNET_ROOT% /E /XC /XN /XO /NFL /NDL /NJH /NJS /NP) ^& if %ERRORLEVEL% geq 8 exit /B %ERRORLEVEL%
29+
(robocopy %DOTNET_ROOT:3.1.201=2.1.300% %DOTNET_ROOT% /E /XC /XN /XO /NFL /NDL /NJH /NJS /NP) ^& if %ERRORLEVEL% geq 8 exit /B %ERRORLEVEL%
3030
(robocopy %DOTNET_ROOT:3.1.201=2.1.202% %DOTNET_ROOT% /E /XC /XN /XO /NFL /NDL /NJH /NJS /NP) ^& if %ERRORLEVEL% geq 8 exit /B %ERRORLEVEL%
3131
exit /B 0
3232
shell: cmd
3333
- name: Setup SxS dotnet
3434
if: matrix.os != 'windows-latest'
3535
run: |
36-
rsync --archive --ignore-existing ${DOTNET_ROOT/3.1.201/2.1.805}/ ${DOTNET_ROOT/3.1.201/2.1.202}/ $DOTNET_ROOT
36+
rsync --archive --ignore-existing ${DOTNET_ROOT/3.1.201/2.1.300}/ ${DOTNET_ROOT/3.1.201/2.1.202}/ $DOTNET_ROOT
3737
- name: Build
3838
run: dotnet build --configuration Release
3939
- name: Test

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ bin/
1313
# JetBrains Rider
1414
.idea/
1515
*.sln.iml
16+
examples/informers/Dockerfile
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using Microsoft.Extensions.Hosting;
7+
8+
namespace informers
9+
{
10+
public class ControllerService : BackgroundService
11+
{
12+
private readonly IServiceProvider _serviceProvider;
13+
14+
public ControllerService(IServiceProvider serviceProvider)
15+
{
16+
_serviceProvider = serviceProvider;
17+
}
18+
19+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
20+
{
21+
var controllers = _serviceProvider.GetServices<IController>();
22+
await Task.WhenAll(controllers.Select(x => x.Initialize(stoppingToken)));
23+
}
24+
25+
26+
}
27+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Reactive.Disposables;
5+
using System.Reactive.Linq;
6+
using System.Text;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using System.Threading.Tasks.Dataflow;
10+
using k8s;
11+
using k8s.Informers;
12+
using k8s.Informers.Notifications;
13+
using k8s.Models;
14+
using KellermanSoftware.CompareNetObjects;
15+
using Microsoft.Extensions.Logging;
16+
using Newtonsoft.Json;
17+
using Newtonsoft.Json.Converters;
18+
19+
namespace informers
20+
{
21+
// this sample demos both informer and controller
22+
// there are two loggers:
23+
// _informerLogger lets you see raw data coming out of informer stream
24+
// _reconcilerLogger lets you see batches of object transitions that object went through since last time we did work on it
25+
// reconciler is purposely slowed down to show accumulation of events between worker actions
26+
27+
// try creating and deleting some pods in "default" namespace and watch the output
28+
29+
public class DeltaChangesQueryingController : IController
30+
{
31+
private readonly IKubernetesInformer<V1Pod> _podInformer;
32+
private readonly ILogger _reconcilerLogger;
33+
private ILogger _informerLogger;
34+
CompareLogic _objectCompare = new CompareLogic();
35+
// private readonly ActionBlock<List<ResourceEvent<V1Pod>>> _reconciler;
36+
private readonly CompositeDisposable _subscription = new CompositeDisposable();
37+
38+
public DeltaChangesQueryingController(IKubernetesInformer<V1Pod> podInformer, ILoggerFactory loggerFactory)
39+
{
40+
_podInformer = podInformer;
41+
_reconcilerLogger = loggerFactory.CreateLogger("Reconciler");
42+
_informerLogger = loggerFactory.CreateLogger("Informer");
43+
_objectCompare.Config.MaxDifferences = 100;
44+
// the commented sections show how to use advanced syntax to work with TPL dataflows. most scenarios can get away with .ProcessWith helper method
45+
46+
// _reconciler = new ActionBlock<List<ResourceEvent<V1Pod>>>(Reconcile,
47+
// new ExecutionDataflowBlockOptions()
48+
// {
49+
// BoundedCapacity = 2,
50+
// MaxDegreeOfParallelism = 2,
51+
// });
52+
}
53+
54+
// public Task Completion => _reconciler.Completion;
55+
56+
public Task Initialize(CancellationToken cancellationToken)
57+
{
58+
var workerQueue = _podInformer
59+
.GetResource(ResourceStreamType.ListWatch, new KubernetesInformerOptions() { Namespace = "default"})
60+
// .Resync(TimeSpan.FromSeconds(5))
61+
.Do(item =>
62+
{
63+
_informerLogger.LogInformation($"\n EventType: {item.EventFlags} \n Name: {item.Value.Metadata.Name} \n Version: {item.Value.Metadata.ResourceVersion}");
64+
})
65+
.Catch<ResourceEvent<V1Pod>,Exception>(e =>
66+
{
67+
_informerLogger.LogCritical(e, e.Message);
68+
return Observable.Throw<ResourceEvent<V1Pod>>(e);
69+
})
70+
.ToResourceEventDeltaBlock(x => x.Metadata.Name, out var informerSubscription);
71+
informerSubscription.DisposeWith(_subscription);
72+
workerQueue
73+
//.LinkTo(_reconciler) // working with action blocks directly for fine grained control
74+
.ProcessWith(Reconcile, _reconcilerLogger) // simplified syntax
75+
.DisposeWith(_subscription);
76+
return Task.CompletedTask;
77+
}
78+
79+
private async Task Reconcile(List<ResourceEvent<V1Pod>> changes)
80+
{
81+
// invoke reconcilation here
82+
83+
var obj = changes.First().Value;
84+
var sb = new StringBuilder();
85+
// sb.AppendLine($"Received changes for object with ID {KubernetesObject.KeySelector(obj)} with {changes.Count} items");
86+
sb.AppendLine($"Received changes for object with ID {KubernetesObject.KeySelector(obj)} with {changes.Count} items");
87+
sb.AppendLine($"Last known state was {changes.Last().EventFlags}");
88+
foreach (var item in changes)
89+
{
90+
sb.AppendLine($"==={item.EventFlags}===");
91+
sb.AppendLine($"Name: {item.Value.Metadata.Name}");
92+
sb.AppendLine($"Version: {item.Value.Metadata.ResourceVersion}");
93+
if (item.EventFlags.HasFlag(EventTypeFlags.Modify))
94+
{
95+
var updateDelta = _objectCompare.Compare(item.OldValue, item.Value);
96+
foreach (var difference in updateDelta.Differences)
97+
{
98+
sb.AppendLine($"{difference.PropertyName}: {difference.Object1} -> {difference.Object2}");
99+
}
100+
101+
}
102+
// sb.AppendLine(JsonConvert.SerializeObject(item, Formatting.Indented, new StringEnumConverter()));
103+
}
104+
_reconcilerLogger.LogInformation(sb.ToString());
105+
106+
await Task.Delay(TimeSpan.FromSeconds(10)); // simulate
107+
await Task.CompletedTask;
108+
109+
}
110+
}
111+
}

examples/informers/IController.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace informers
5+
{
6+
/// <summary>
7+
/// Base interface for implementing controllers
8+
/// </summary>
9+
public interface IController
10+
{
11+
/// <summary>
12+
/// Signals that controller is done processing all the work and no more work will ever be processed.
13+
/// Mainly useful in testing
14+
/// </summary>
15+
public Task Initialize(CancellationToken cancellationToken);
16+
}
17+
}

examples/informers/Program.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using Microsoft.Extensions.Hosting;
2+
using Microsoft.Extensions.Logging;
3+
4+
namespace informers
5+
{
6+
public class Program
7+
{
8+
public static void Main(string[] args)
9+
{
10+
CreateHostBuilder(args).Build().Run();
11+
}
12+
13+
public static IHostBuilder CreateHostBuilder(string[] args) =>
14+
Host.CreateDefaultBuilder(args)
15+
.ConfigureLogging(x => x.AddConsole())
16+
.ConfigureServices((hostContext, services) =>
17+
{
18+
services.AddKubernetes();
19+
});
20+
21+
}
22+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"profiles": {
3+
"K8SControllerExample": {
4+
"commandName": "Project",
5+
"environmentVariables": {
6+
"DOTNET_ENVIRONMENT": "Development"
7+
}
8+
}
9+
}
10+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading;
4+
using k8s;
5+
using k8s.Informers;
6+
using k8s.Informers.Cache;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Microsoft.Rest;
9+
using Microsoft.Rest.TransientFaultHandling;
10+
11+
namespace informers
12+
{
13+
public static class Extensions
14+
{
15+
public static IServiceCollection AddKubernetes(this IServiceCollection services)
16+
{
17+
services.AddHostedService<ControllerService>();
18+
var controllers = AppDomain.CurrentDomain.GetAssemblies()
19+
.SelectMany(x => x.DefinedTypes)
20+
.Where(x => x.IsClass && !x.IsAbstract && typeof(IController).IsAssignableFrom(x))
21+
.ToList();
22+
foreach (var controller in controllers)
23+
{
24+
services.AddSingleton(typeof(IController), controller);
25+
}
26+
var config = KubernetesClientConfiguration.BuildDefaultConfig();
27+
28+
services.AddSingleton(config);
29+
services.AddSingleton<RetryDelegatingHandler>();
30+
services.AddHttpClient("DefaultName")
31+
.AddTypedClient<IKubernetes>((httpClient, serviceProvider) =>
32+
{
33+
httpClient.Timeout = Timeout.InfiniteTimeSpan;
34+
return new Kubernetes(
35+
serviceProvider.GetRequiredService<KubernetesClientConfiguration>(),
36+
httpClient);
37+
})
38+
.AddHttpMessageHandler(() => new TimeoutHandler(TimeSpan.FromSeconds(5)))
39+
.AddHttpMessageHandler(() => new RetryDelegatingHandler()
40+
{
41+
RetryPolicy = new RetryPolicy<HttpStatusCodeErrorDetectionStrategy>(new ExponentialBackoffRetryStrategy())
42+
})
43+
.AddHttpMessageHandler(KubernetesClientConfiguration.CreateWatchHandler)
44+
.ConfigurePrimaryHttpMessageHandler(config.CreateDefaultHttpClientHandler);
45+
var kubernetesResources = AppDomain.CurrentDomain.GetAssemblies()
46+
.SelectMany(x => x.DefinedTypes)
47+
.Where(x => typeof(IKubernetesObject).IsAssignableFrom(x))
48+
.ToList();
49+
services.AddTransient(typeof(KubernetesInformer<>));
50+
services.AddSingleton(typeof(IKubernetesInformer<>), typeof(SharedKubernetesInformer<>));
51+
return services;
52+
}
53+
}
54+
}

examples/informers/TimeoutHandler.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Linq;
3+
using System.Net.Http;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.AspNetCore.WebUtilities;
7+
8+
namespace informers
9+
{
10+
internal class TimeoutHandler : DelegatingHandler
11+
{
12+
private readonly TimeSpan _timeout;
13+
14+
public TimeoutHandler(TimeSpan timeout)
15+
{
16+
_timeout = timeout;
17+
}
18+
19+
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
20+
{
21+
var query = QueryHelpers.ParseQuery(request.RequestUri.Query);
22+
23+
if (!(query.TryGetValue("watch", out var values) && values.Any(v => v == "true")))
24+
{
25+
var cts = new CancellationTokenSource(_timeout);
26+
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token).Token;
27+
}
28+
var originResponse = await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
29+
30+
return originResponse;
31+
}
32+
33+
}
34+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Information",
5+
"Microsoft": "Warning",
6+
"Microsoft.Hosting.Lifetime": "Information"
7+
}
8+
}
9+
}

examples/informers/appsettings.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Information",
5+
"Microsoft": "Warning",
6+
"Microsoft.Hosting.Lifetime": "Information"
7+
}
8+
}
9+
}

examples/informers/informers.csproj

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<Project Sdk="Microsoft.NET.Sdk.Worker">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netcoreapp3.1</TargetFramework>
5+
<SignAssembly>true</SignAssembly>
6+
<AssemblyOriginatorKeyFile>..\..\src\KubernetesClient\kubernetes-client.snk</AssemblyOriginatorKeyFile>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="CompareNETObjects" Version="4.65.0" />
11+
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.2" />
12+
<PackageReference Include="Microsoft.Extensions.Http" Version="3.1.2" />
13+
<PackageReference Include="Microsoft.Extensions.Http" Version="3.1.2" />
14+
<PackageReference Include="NMica" Version="2.0.2" />
15+
</ItemGroup>
16+
17+
<ItemGroup>
18+
<ProjectReference Include="..\..\src\KubernetesClient\KubernetesClient.csproj" />
19+
</ItemGroup>
20+
</Project>

0 commit comments

Comments
 (0)