-
Notifications
You must be signed in to change notification settings - Fork 597
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a Runner that watches for Kubernetes APIs
- Loading branch information
Showing
4 changed files
with
274 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
// Copyright 2024 Crunchy Data Solutions, Inc. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package kubernetes | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"sync" | ||
"time" | ||
|
||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/util/sets" | ||
"k8s.io/apimachinery/pkg/version" | ||
"k8s.io/client-go/discovery" | ||
"k8s.io/client-go/rest" | ||
|
||
"github.com/crunchydata/postgres-operator/internal/logging" | ||
) | ||
|
||
type Version = version.Info | ||
|
||
// DiscoveryRunner implements [APIs] by reading from a Kubernetes API client. | ||
// Its methods are safe to call concurrently. | ||
type DiscoveryRunner struct { | ||
// NOTE(tracing): The methods of [discovery.DiscoveryClient] do not take | ||
// a Context so their API calls won't have a parent span. | ||
Client interface { | ||
ServerGroups() (*metav1.APIGroupList, error) | ||
ServerResourcesForGroupVersion(string) (*metav1.APIResourceList, error) | ||
ServerVersion() (*version.Info, error) | ||
} | ||
|
||
refresh time.Duration | ||
|
||
// relevant is the list of APIs to examine during Read. | ||
// Has, HasAll, and HasAny return false when this is empty. | ||
relevant []API | ||
|
||
have struct { | ||
sync.RWMutex | ||
APISet | ||
Version | ||
} | ||
} | ||
|
||
// NewDiscoveryRunner creates a [DiscoveryRunner] that periodically reads from | ||
// the Kubernetes at config. | ||
func NewDiscoveryRunner(config *rest.Config) (*DiscoveryRunner, error) { | ||
dc, err := discovery.NewDiscoveryClientForConfig(config) | ||
|
||
runner := &DiscoveryRunner{ | ||
Client: dc, | ||
refresh: 10 * time.Minute, | ||
relevant: []API{ | ||
// https://cert-manager.io/docs/usage/certificate | ||
// https://cert-manager.io/docs/trust/trust-manager | ||
{Group: "cert-manager.io", Kind: "Certificate"}, | ||
{Group: "trust.cert-manager.io", Kind: "Bundle"}, | ||
|
||
// https://gateway-api.sigs.k8s.io/api-types/referencegrant | ||
// https://kep.k8s.io/3766 | ||
{Group: "gateway.networking.k8s.io", Kind: "ReferenceGrant"}, | ||
|
||
// https://docs.openshift.com/container-platform/latest/authentication/managing-security-context-constraints.html | ||
{Group: "security.openshift.io", Kind: "SecurityContextConstraints"}, | ||
|
||
// https://docs.k8s.io/concepts/storage/volume-snapshots | ||
{Group: "snapshot.storage.k8s.io", Kind: "VolumeSnapshot"}, | ||
}, | ||
} | ||
|
||
return runner, err | ||
} | ||
|
||
// Has returns true when api is available in Kuberentes. | ||
func (r *DiscoveryRunner) Has(api API) bool { return r.HasAny(api) } | ||
|
||
// HasAll returns true when every api is available in Kubernetes. | ||
func (r *DiscoveryRunner) HasAll(api ...API) bool { | ||
r.have.RLock() | ||
defer r.have.RUnlock() | ||
return r.have.HasAll(api...) | ||
} | ||
|
||
// HasAny returns true when at least one api is available in Kubernetes. | ||
func (r *DiscoveryRunner) HasAny(api ...API) bool { | ||
r.have.RLock() | ||
defer r.have.RUnlock() | ||
return r.have.HasAny(api...) | ||
} | ||
|
||
// NeedLeaderElection returns false so that r runs on any [manager.Manager], | ||
// regardless of which is elected leader in the Kubernetes namespace. | ||
func (r *DiscoveryRunner) NeedLeaderElection() bool { return false } | ||
|
||
// Read fetches available APIs from Kubernetes. | ||
func (r *DiscoveryRunner) Read(ctx context.Context) error { | ||
return errors.Join(r.readAPIs(ctx), r.readVersion()) | ||
} | ||
|
||
func (r *DiscoveryRunner) readAPIs(ctx context.Context) error { | ||
// Build an index of the APIs we want to know about. | ||
wantAPIs := make(map[string]map[string]sets.Set[string]) | ||
for _, want := range r.relevant { | ||
if wantAPIs[want.Group] == nil { | ||
wantAPIs[want.Group] = make(map[string]sets.Set[string]) | ||
} | ||
if wantAPIs[want.Group][want.Version] == nil { | ||
wantAPIs[want.Group][want.Version] = sets.New[string]() | ||
} | ||
if want.Kind != "" { | ||
wantAPIs[want.Group][want.Version].Insert(want.Kind) | ||
} | ||
} | ||
|
||
// Fetch Groups and Versions from Kubernetes. | ||
groups, err := r.Client.ServerGroups() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Build an index of the Groups and GVs available in Kubernetes; | ||
// add GK and GVK for resources that we want to know about. | ||
haveAPIs := make(APISet) | ||
for _, apiG := range groups.Groups { | ||
haveG := apiG.Name | ||
haveAPIs.Insert(API{Group: haveG}) | ||
|
||
for _, apiGV := range apiG.Versions { | ||
haveV := apiGV.Version | ||
haveAPIs.Insert(API{Group: haveG, Version: haveV}) | ||
|
||
// Only fetch Resources when there are Kinds we want to know about. | ||
if wantAPIs[haveG][""].Len() == 0 && wantAPIs[haveG][haveV].Len() == 0 { | ||
continue | ||
} | ||
|
||
resources, err := r.Client.ServerResourcesForGroupVersion(apiGV.GroupVersion) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, apiR := range resources.APIResources { | ||
haveK := apiR.Kind | ||
haveAPIs.Insert( | ||
API{Group: haveG, Kind: haveK}, | ||
API{Group: haveG, Kind: haveK, Version: haveV}, | ||
) | ||
} | ||
} | ||
} | ||
|
||
r.have.Lock() | ||
r.have.APISet = haveAPIs | ||
r.have.Unlock() | ||
|
||
r.have.RLock() | ||
defer r.have.RUnlock() | ||
logging.FromContext(ctx).V(1).Info("Found APIs", "index_size", r.have.APISet.Len()) | ||
|
||
return nil | ||
} | ||
|
||
func (r *DiscoveryRunner) readVersion() error { | ||
info, err := r.Client.ServerVersion() | ||
|
||
if info != nil && err == nil { | ||
r.have.Lock() | ||
r.have.Version = *info | ||
r.have.Unlock() | ||
} | ||
|
||
return err | ||
} | ||
|
||
// Start periodically reads the Kuberentes API. It blocks until ctx is cancelled. | ||
func (r *DiscoveryRunner) Start(ctx context.Context) error { | ||
ticker := time.NewTicker(r.refresh) | ||
defer ticker.Stop() | ||
|
||
log := logging.FromContext(ctx).WithValues("controller", "kubernetes") | ||
ctx = logging.NewContext(ctx, log) | ||
|
||
for { | ||
select { | ||
case <-ticker.C: | ||
if err := r.Read(ctx); err != nil { | ||
log.Error(err, "Unable to detect Kubernetes APIs") | ||
} | ||
case <-ctx.Done(): | ||
// TODO(controller-runtime): Fixed in v0.19.0 | ||
// https://github.com/kubernetes-sigs/controller-runtime/issues/1927 | ||
if errors.Is(ctx.Err(), context.Canceled) { | ||
return nil | ||
} | ||
return ctx.Err() | ||
} | ||
} | ||
} | ||
|
||
// Version returns the detected version of Kubernetes. | ||
func (r *DiscoveryRunner) Version() Version { | ||
r.have.RLock() | ||
defer r.have.RUnlock() | ||
return r.have.Version | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
// Copyright 2024 Crunchy Data Solutions, Inc. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package kubernetes | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"gotest.tools/v3/assert" | ||
"sigs.k8s.io/controller-runtime/pkg/manager" | ||
|
||
"github.com/crunchydata/postgres-operator/internal/testing/require" | ||
) | ||
|
||
func TestDiscoveryRunnerInterfaces(t *testing.T) { | ||
var _ APIs = new(DiscoveryRunner) | ||
var _ manager.Runnable = new(DiscoveryRunner) | ||
|
||
var runnable manager.LeaderElectionRunnable = new(DiscoveryRunner) | ||
assert.Assert(t, false == runnable.NeedLeaderElection()) | ||
} | ||
|
||
func TestDiscoveryRunnerAPIs(t *testing.T) { | ||
ctx := context.Background() | ||
cfg, _ := require.Kubernetes2(t) | ||
require.ParallelCapacity(t, 0) | ||
|
||
runner, err := NewDiscoveryRunner(cfg) | ||
assert.NilError(t, err) | ||
|
||
// Search for an API that should always exist. | ||
runner.relevant = append(runner.relevant, API{Kind: "Pod"}) | ||
assert.NilError(t, runner.readAPIs(ctx)) | ||
|
||
assert.Assert(t, runner.Has(API{Kind: "Pod"})) | ||
assert.Assert(t, runner.HasAll(API{Kind: "Pod"}, API{Kind: "Secret"})) | ||
assert.Assert(t, runner.HasAny(API{Kind: "Pod"}, API{Kind: "NotGonnaExist"})) | ||
assert.Assert(t, !runner.Has(API{Kind: "NotGonnaExist"})) | ||
} | ||
|
||
func TestDiscoveryRunnerVersion(t *testing.T) { | ||
cfg, _ := require.Kubernetes2(t) | ||
require.ParallelCapacity(t, 0) | ||
|
||
runner, err := NewDiscoveryRunner(cfg) | ||
assert.NilError(t, err) | ||
assert.NilError(t, runner.readVersion()) | ||
|
||
version := runner.Version() | ||
assert.Assert(t, version.Major != "", "got %#v", version) | ||
assert.Assert(t, version.Minor != "", "got %#v", version) | ||
assert.Assert(t, version.String() != "", "got %q", version.String()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters