Skip to content

Commit 43c0c92

Browse files
committed
Rework kubelet plugin code with changes introduced for 1.31
Signed-off-by: Kevin Klues <[email protected]>
1 parent 71019d2 commit 43c0c92

File tree

9 files changed

+122
-286
lines changed

9 files changed

+122
-286
lines changed

cmd/dra-example-kubeletplugin/cdi.go

+15-27
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import (
2222

2323
cdiapi "github.com/container-orchestrated-devices/container-device-interface/pkg/cdi"
2424
cdispec "github.com/container-orchestrated-devices/container-device-interface/specs-go"
25-
26-
gpucrd "sigs.k8s.io/dra-example-driver/api/example.com/resource/gpu/v1alpha1"
2725
)
2826

2927
const (
@@ -89,7 +87,7 @@ func (cdi *CDIHandler) CreateCommonSpecFile() error {
8987
return cdi.registry.SpecDB().WriteSpec(spec, specName)
9088
}
9189

92-
func (cdi *CDIHandler) CreateClaimSpecFile(claimUID string, devices *PreparedDevices) error {
90+
func (cdi *CDIHandler) CreateClaimSpecFile(claimUID string, devices PreparedDevices) error {
9391
specName := cdiapi.GenerateTransientSpecName(cdiVendor, cdiClass, claimUID)
9492

9593
spec := &cdispec.Spec{
@@ -98,22 +96,17 @@ func (cdi *CDIHandler) CreateClaimSpecFile(claimUID string, devices *PreparedDev
9896
}
9997

10098
gpuIdx := 0
101-
switch devices.Type() {
102-
case gpucrd.GpuDeviceType:
103-
for _, device := range devices.Gpu.Devices {
104-
cdiDevice := cdispec.Device{
105-
Name: device.UUID,
106-
ContainerEdits: cdispec.ContainerEdits{
107-
Env: []string{
108-
fmt.Sprintf("GPU_DEVICE_%d=%s", gpuIdx, device.UUID),
109-
},
99+
for _, device := range devices {
100+
cdiDevice := cdispec.Device{
101+
Name: device.DeviceName,
102+
ContainerEdits: cdispec.ContainerEdits{
103+
Env: []string{
104+
fmt.Sprintf("GPU_DEVICE_%d=%s", gpuIdx, device.DeviceName),
110105
},
111-
}
112-
spec.Devices = append(spec.Devices, cdiDevice)
113-
gpuIdx++
106+
},
114107
}
115-
default:
116-
return fmt.Errorf("unknown device type: %v", devices.Type())
108+
spec.Devices = append(spec.Devices, cdiDevice)
109+
gpuIdx++
117110
}
118111

119112
minVersion, err := cdiapi.MinimumRequiredVersion(spec)
@@ -130,20 +123,15 @@ func (cdi *CDIHandler) DeleteClaimSpecFile(claimUID string) error {
130123
return cdi.registry.SpecDB().RemoveSpec(specName)
131124
}
132125

133-
func (cdi *CDIHandler) GetClaimDevices(claimUID string, devices *PreparedDevices) ([]string, error) {
126+
func (cdi *CDIHandler) GetClaimDevices(devices []string) []string {
134127
cdiDevices := []string{
135128
cdiapi.QualifiedName(cdiVendor, cdiClass, cdiCommonDeviceName),
136129
}
137130

138-
switch devices.Type() {
139-
case gpucrd.GpuDeviceType:
140-
for _, device := range devices.Gpu.Devices {
141-
cdiDevice := cdiapi.QualifiedName(cdiVendor, cdiClass, device.UUID)
142-
cdiDevices = append(cdiDevices, cdiDevice)
143-
}
144-
default:
145-
return nil, fmt.Errorf("unknown device type: %v", devices.Type())
131+
for _, device := range devices {
132+
cdiDevice := cdiapi.QualifiedName(cdiVendor, cdiClass, device)
133+
cdiDevices = append(cdiDevices, cdiDevice)
146134
}
147135

148-
return cdiDevices, nil
136+
return cdiDevices
149137
}

cmd/dra-example-kubeletplugin/checkpoint.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func newCheckpoint() *Checkpoint {
1919
pc := &Checkpoint{
2020
Checksum: 0,
2121
V1: &CheckpointV1{
22-
PreparedClaims: make(map[string]*PreparedDevices),
22+
PreparedClaims: make(PreparedClaims),
2323
},
2424
}
2525
return pc

cmd/dra-example-kubeletplugin/discovery.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import (
2020
"math/rand"
2121
"os"
2222

23+
resourceapi "k8s.io/api/resource/v1alpha3"
24+
"k8s.io/utils/ptr"
25+
2326
"github.com/google/uuid"
2427
)
2528

@@ -30,13 +33,17 @@ func enumerateAllPossibleDevices() (AllocatableDevices, error) {
3033

3134
alldevices := make(AllocatableDevices)
3235
for _, uuid := range uuids {
33-
deviceInfo := &AllocatableDeviceInfo{
34-
GpuInfo: &GpuInfo{
35-
UUID: uuid,
36-
model: "LATEST-GPU-MODEL",
36+
device := resourceapi.Device{
37+
Name: uuid,
38+
Basic: &resourceapi.BasicDevice{
39+
Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
40+
"model": {
41+
StringValue: ptr.To("LATEST-GPU-MODEL"),
42+
},
43+
},
3744
},
3845
}
39-
alldevices[uuid] = deviceInfo
46+
alldevices[uuid] = device
4047
}
4148
return alldevices, nil
4249
}
@@ -49,7 +56,7 @@ func generateUUIDs(seed string, count int) []string {
4956
charset := make([]byte, 16)
5057
rand.Read(charset)
5158
uuid, _ := uuid.FromBytes(charset)
52-
uuids[i] = "GPU-" + uuid.String()
59+
uuids[i] = "gpu-" + uuid.String()
5360
}
5461

5562
return uuids

cmd/dra-example-kubeletplugin/driver.go

+44-65
Original file line numberDiff line numberDiff line change
@@ -20,132 +20,111 @@ import (
2020
"context"
2121
"fmt"
2222

23-
resourceapi "k8s.io/api/resource/v1alpha3"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
coreclientset "k8s.io/client-go/kubernetes"
25+
"k8s.io/dynamic-resource-allocation/kubeletplugin"
2426
"k8s.io/klog/v2"
27+
2528
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
2629
)
2730

2831
var _ drapbv1.NodeServer = &driver{}
2932

3033
type driver struct {
31-
doneCh chan struct{}
34+
client coreclientset.Interface
35+
plugin kubeletplugin.DRAPlugin
3236
state *DeviceState
3337
}
3438

3539
func NewDriver(ctx context.Context, config *Config) (*driver, error) {
40+
driver := &driver{
41+
client: config.coreclient,
42+
}
43+
3644
state, err := NewDeviceState(config)
3745
if err != nil {
3846
return nil, err
3947
}
48+
driver.state = state
49+
50+
plugin, err := kubeletplugin.Start(
51+
ctx,
52+
driver,
53+
kubeletplugin.KubeClient(config.coreclient),
54+
kubeletplugin.NodeName(config.flags.nodeName),
55+
kubeletplugin.DriverName(DriverName),
56+
kubeletplugin.RegistrarSocketPath(PluginRegistrationPath),
57+
kubeletplugin.PluginSocketPath(DriverPluginSocketPath),
58+
kubeletplugin.KubeletPluginSocketPath(DriverPluginSocketPath))
59+
if err != nil {
60+
return nil, err
61+
}
62+
driver.plugin = plugin
4063

41-
d := &driver{
42-
state: state,
64+
var resources kubeletplugin.Resources
65+
for _, device := range state.allocatable {
66+
resources.Devices = append(resources.Devices, device)
4367
}
68+
plugin.PublishResources(ctx, resources)
4469

45-
return d, nil
70+
return driver, nil
4671
}
4772

4873
func (d *driver) Shutdown(ctx context.Context) error {
49-
close(d.doneCh)
74+
d.plugin.Stop()
5075
return nil
5176
}
5277

53-
func (d *driver) NodeListAndWatchResources(req *drapbv1.NodeListAndWatchResourcesRequest, stream drapbv1.Node_NodeListAndWatchResourcesServer) error {
54-
model := d.state.getResourceModelFromAllocatableDevices()
55-
resp := &drapbv1.NodeListAndWatchResourcesResponse{
56-
Resources: []*resourceapi.ResourceModel{&model},
57-
}
58-
59-
if err := stream.Send(resp); err != nil {
60-
return err
61-
}
62-
63-
//nolint:all,S1000: should use for range instead of for { select {} } (gosimple)
64-
for {
65-
select {
66-
case <-d.doneCh:
67-
return nil
68-
}
69-
// TODO: Update with case for when GPUs go unhealthy
70-
}
71-
}
72-
7378
func (d *driver) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) {
7479
klog.Infof("NodePrepareResource is called: number of claims: %d", len(req.Claims))
7580
preparedResources := &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{}}
7681

77-
// In production version some common operations of d.nodeUnprepareResources
78-
// should be done outside of the loop, for instance updating the CR could
79-
// be done once after all HW was prepared.
8082
for _, claim := range req.Claims {
81-
preparedResources.Claims[claim.Uid] = d.nodePrepareResource(ctx, claim)
83+
preparedResources.Claims[claim.UID] = d.nodePrepareResource(ctx, claim)
8284
}
8385

8486
return preparedResources, nil
8587
}
8688

8789
func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim) *drapbv1.NodePrepareResourceResponse {
88-
if len(claim.StructuredResourceHandle) == 0 {
89-
return &drapbv1.NodePrepareResourceResponse{
90-
Error: "driver only supports structured parameters",
91-
}
92-
}
93-
94-
allocated, err := d.getAllocatedDevices(ctx, claim)
90+
resourceClaim, err := d.client.ResourceV1alpha3().ResourceClaims(claim.Namespace).Get(
91+
ctx,
92+
claim.Name,
93+
metav1.GetOptions{})
9594
if err != nil {
9695
return &drapbv1.NodePrepareResourceResponse{
97-
Error: fmt.Sprintf("error allocating devices for claim %v: %v", claim.Uid, err),
96+
Error: fmt.Sprintf("failed to fetch ResourceClaim %s in namespace %s", claim.Name, claim.Namespace),
9897
}
9998
}
10099

101-
prepared, err := d.state.Prepare(claim.Uid, allocated)
100+
prepared, err := d.state.Prepare(resourceClaim)
102101
if err != nil {
103102
return &drapbv1.NodePrepareResourceResponse{
104-
Error: fmt.Sprintf("error preparing devices for claim %v: %v", claim.Uid, err),
103+
Error: fmt.Sprintf("error preparing devices for claim %v: %v", claim.UID, err),
105104
}
106105
}
107106

108-
klog.Infof("Returning newly prepared devices for claim '%v': %s", claim.Uid, prepared)
109-
return &drapbv1.NodePrepareResourceResponse{CDIDevices: prepared}
107+
klog.Infof("Returning newly prepared devices for claim '%v': %v", claim.UID, prepared)
108+
return &drapbv1.NodePrepareResourceResponse{Devices: prepared}
110109
}
111110

112111
func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) {
113112
klog.Infof("NodeUnPrepareResource is called: number of claims: %d", len(req.Claims))
114113
unpreparedResources := &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{}}
115114

116115
for _, claim := range req.Claims {
117-
unpreparedResources.Claims[claim.Uid] = d.nodeUnprepareResource(ctx, claim)
116+
unpreparedResources.Claims[claim.UID] = d.nodeUnprepareResource(ctx, claim)
118117
}
119118

120119
return unpreparedResources, nil
121120
}
122121

123122
func (d *driver) nodeUnprepareResource(ctx context.Context, claim *drapbv1.Claim) *drapbv1.NodeUnprepareResourceResponse {
124-
if len(claim.StructuredResourceHandle) == 0 {
125-
return &drapbv1.NodeUnprepareResourceResponse{
126-
Error: "driver only supports structured parameters",
127-
}
128-
}
129-
130-
if err := d.state.Unprepare(claim.Uid); err != nil {
123+
if err := d.state.Unprepare(claim.UID); err != nil {
131124
return &drapbv1.NodeUnprepareResourceResponse{
132-
Error: fmt.Sprintf("error unpreparing devices for claim %v: %v", claim.Uid, err),
125+
Error: fmt.Sprintf("error unpreparing devices for claim %v: %v", claim.UID, err),
133126
}
134127
}
135128

136129
return &drapbv1.NodeUnprepareResourceResponse{}
137130
}
138-
139-
func (d *driver) getAllocatedDevices(ctx context.Context, claim *drapbv1.Claim) (AllocatedDevices, error) {
140-
allocated := AllocatedDevices{
141-
Gpu: &AllocatedGpus{},
142-
}
143-
144-
for _, r := range claim.StructuredResourceHandle[0].Results {
145-
name := r.AllocationResultModel.NamedResources.Name
146-
gpu := fmt.Sprintf("GPU-%s", name[4:])
147-
allocated.Gpu.Devices = append(allocated.Gpu.Devices, gpu)
148-
}
149-
150-
return allocated, nil
151-
}

cmd/dra-example-kubeletplugin/main.go

+15-23
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,14 @@ import (
2525

2626
"github.com/urfave/cli/v2"
2727

28-
plugin "k8s.io/dynamic-resource-allocation/kubeletplugin"
28+
coreclientset "k8s.io/client-go/kubernetes"
2929
"k8s.io/klog/v2"
3030

31-
gpucrd "sigs.k8s.io/dra-example-driver/api/example.com/resource/gpu/v1alpha1"
32-
exampleclientset "sigs.k8s.io/dra-example-driver/pkg/example.com/resource/clientset/versioned"
3331
"sigs.k8s.io/dra-example-driver/pkg/flags"
3432
)
3533

3634
const (
37-
DriverName = gpucrd.GroupName
35+
DriverName = "gpu.example.com"
3836

3937
PluginRegistrationPath = "/var/lib/kubelet/plugins_registry/" + DriverName + ".sock"
4038
DriverPluginPath = "/var/lib/kubelet/plugins/" + DriverName
@@ -44,15 +42,15 @@ const (
4442

4543
type Flags struct {
4644
kubeClientConfig flags.KubeClientConfig
47-
crdConfig flags.CRDConfig
4845
loggingConfig *flags.LoggingConfig
4946

50-
cdiRoot string
47+
nodeName string
48+
cdiRoot string
5149
}
5250

5351
type Config struct {
54-
flags *Flags
55-
exampleclient exampleclientset.Interface
52+
flags *Flags
53+
coreclient coreclientset.Interface
5654
}
5755

5856
func main() {
@@ -67,6 +65,13 @@ func newApp() *cli.App {
6765
loggingConfig: flags.NewLoggingConfig(),
6866
}
6967
cliFlags := []cli.Flag{
68+
&cli.StringFlag{
69+
Name: "node-name",
70+
Usage: "The name of the node to be worked on.",
71+
Required: true,
72+
Destination: &flags.nodeName,
73+
EnvVars: []string{"NODE_NAME"},
74+
},
7075
&cli.StringFlag{
7176
Name: "cdi-root",
7277
Usage: "Absolute path to the directory where CDI files will be generated.",
@@ -76,7 +81,6 @@ func newApp() *cli.App {
7681
},
7782
}
7883
cliFlags = append(cliFlags, flags.kubeClientConfig.Flags()...)
79-
cliFlags = append(cliFlags, flags.crdConfig.Flags()...)
8084
cliFlags = append(cliFlags, flags.loggingConfig.Flags()...)
8185

8286
app := &cli.App{
@@ -99,8 +103,8 @@ func newApp() *cli.App {
99103
}
100104

101105
config := &Config{
102-
flags: flags,
103-
exampleclient: clientSets.Example,
106+
flags: flags,
107+
coreclient: clientSets.Core,
104108
}
105109

106110
return StartPlugin(ctx, config)
@@ -134,22 +138,10 @@ func StartPlugin(ctx context.Context, config *Config) error {
134138
return err
135139
}
136140

137-
dp, err := plugin.Start(
138-
driver,
139-
plugin.DriverName(DriverName),
140-
plugin.RegistrarSocketPath(PluginRegistrationPath),
141-
plugin.PluginSocketPath(DriverPluginSocketPath),
142-
plugin.KubeletPluginSocketPath(DriverPluginSocketPath))
143-
if err != nil {
144-
return err
145-
}
146-
147141
sigc := make(chan os.Signal, 1)
148142
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
149143
<-sigc
150144

151-
dp.Stop()
152-
153145
err = driver.Shutdown(ctx)
154146
if err != nil {
155147
klog.FromContext(ctx).Error(err, "Unable to cleanly shutdown driver")

0 commit comments

Comments
 (0)