Skip to content

Commit

Permalink
Add a Runner that watches for Kubernetes APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
cbandy committed Nov 25, 2024
1 parent 31bb0fa commit 4910dd4
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 35 deletions.
45 changes: 10 additions & 35 deletions cmd/postgres-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/healthz"

Expand All @@ -28,6 +27,7 @@ import (
"github.com/crunchydata/postgres-operator/internal/controller/standalone_pgadmin"
"github.com/crunchydata/postgres-operator/internal/feature"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/kubernetes"
"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/internal/registration"
Expand Down Expand Up @@ -146,6 +146,10 @@ func main() {
// deprecation warnings when using an older version of a resource for backwards compatibility).
rest.SetDefaultWarningHandler(rest.NoWarnings{})

k8s, err := kubernetes.NewDiscoveryRunner(cfg)
assertNoError(err)
assertNoError(k8s.Read(ctx))

options, err := initManager()
assertNoError(err)

Expand All @@ -159,11 +163,12 @@ func main() {

mgr, err := runtime.NewManager(cfg, options)
assertNoError(err)
assertNoError(mgr.Add(k8s))

openshift := isOpenshift(cfg)
if openshift {
log.Info("detected OpenShift environment")
}
openshift := k8s.Has(kubernetes.API{
Group: "security.openshift.io", Kind: "SecurityContextConstraints",
})
log.Info("Connected to Kubernetes", "api", k8s.Version().String(), "openshift", openshift)

registrar, err := registration.NewRunner(os.Getenv("RSA_KEY"), os.Getenv("TOKEN_PATH"), shutdown)
assertNoError(err)
Expand Down Expand Up @@ -270,33 +275,3 @@ func addControllersToManager(mgr runtime.Manager, openshift bool, log logging.Lo
os.Exit(1)
}
}

func isOpenshift(cfg *rest.Config) bool {
const sccGroupName, sccKind = "security.openshift.io", "SecurityContextConstraints"

client, err := discovery.NewDiscoveryClientForConfig(cfg)
assertNoError(err)

groups, err := client.ServerGroups()
if err != nil {
assertNoError(err)
}
for _, g := range groups.Groups {
if g.Name != sccGroupName {
continue
}
for _, v := range g.Versions {
resourceList, err := client.ServerResourcesForGroupVersion(v.GroupVersion)
if err != nil {
assertNoError(err)
}
for _, r := range resourceList.APIResources {
if r.Kind == sccKind {
return true
}
}
}
}

return false
}
208 changes: 208 additions & 0 deletions internal/kubernetes/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright 2024 Crunchy Data Solutions, Inc.
//
// SPDX-License-Identifier: Apache-2.0

package kubernetes

import (
"context"
"errors"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"

"github.com/crunchydata/postgres-operator/internal/logging"
)

type Version = version.Info

// DiscoveryRunner implements [APIs] by reading from a Kubernetes API client.
// Its methods are safe to call concurrently.
type DiscoveryRunner struct {
// NOTE(tracing): The methods of [discovery.DiscoveryClient] do not take
// a Context so their API calls won't have a parent span.
Client interface {
ServerGroups() (*metav1.APIGroupList, error)
ServerResourcesForGroupVersion(string) (*metav1.APIResourceList, error)
ServerVersion() (*version.Info, error)
}

refresh time.Duration

// relevant is the list of APIs to examine during Read.
// Has, HasAll, and HasAny return false when this is empty.
relevant []API

have struct {
sync.RWMutex
APISet
Version
}
}

// NewDiscoveryRunner creates a [DiscoveryRunner] that periodically reads from
// the Kubernetes at config.
func NewDiscoveryRunner(config *rest.Config) (*DiscoveryRunner, error) {
dc, err := discovery.NewDiscoveryClientForConfig(config)

runner := &DiscoveryRunner{
Client: dc,
refresh: 10 * time.Minute,
relevant: []API{
// https://cert-manager.io/docs/usage/certificate
// https://cert-manager.io/docs/trust/trust-manager
{Group: "cert-manager.io", Kind: "Certificate"},
{Group: "trust.cert-manager.io", Kind: "Bundle"},

// https://gateway-api.sigs.k8s.io/api-types/referencegrant
// https://kep.k8s.io/3766
{Group: "gateway.networking.k8s.io", Kind: "ReferenceGrant"},

// https://docs.openshift.com/container-platform/latest/authentication/managing-security-context-constraints.html
{Group: "security.openshift.io", Kind: "SecurityContextConstraints"},

// https://docs.k8s.io/concepts/storage/volume-snapshots
{Group: "snapshot.storage.k8s.io", Kind: "VolumeSnapshot"},
},
}

return runner, err
}

// Has returns true when api is available in Kuberentes.
func (r *DiscoveryRunner) Has(api API) bool { return r.HasAny(api) }

// HasAll returns true when every api is available in Kubernetes.
func (r *DiscoveryRunner) HasAll(api ...API) bool {
r.have.RLock()
defer r.have.RUnlock()
return r.have.HasAll(api...)
}

// HasAny returns true when at least one api is available in Kubernetes.
func (r *DiscoveryRunner) HasAny(api ...API) bool {
r.have.RLock()
defer r.have.RUnlock()
return r.have.HasAny(api...)
}

// NeedLeaderElection returns false so that r runs on any [manager.Manager],
// regardless of which is elected leader in the Kubernetes namespace.
func (r *DiscoveryRunner) NeedLeaderElection() bool { return false }

// Read fetches available APIs from Kubernetes.
func (r *DiscoveryRunner) Read(ctx context.Context) error {
return errors.Join(r.readAPIs(ctx), r.readVersion())
}

func (r *DiscoveryRunner) readAPIs(ctx context.Context) error {
// Build an index of the APIs we want to know about.
wantAPIs := make(map[string]map[string]sets.Set[string])
for _, want := range r.relevant {
if wantAPIs[want.Group] == nil {
wantAPIs[want.Group] = make(map[string]sets.Set[string])
}
if wantAPIs[want.Group][want.Version] == nil {
wantAPIs[want.Group][want.Version] = sets.New[string]()
}
if want.Kind != "" {
wantAPIs[want.Group][want.Version].Insert(want.Kind)
}
}

// Fetch Groups and Versions from Kubernetes.
groups, err := r.Client.ServerGroups()
if err != nil {
return err
}

// Build an index of the Groups and GVs available in Kubernetes;
// add GK and GVK for resources that we want to know about.
haveAPIs := make(APISet)
for _, apiG := range groups.Groups {
haveG := apiG.Name
haveAPIs.Insert(API{Group: haveG})

for _, apiGV := range apiG.Versions {
haveV := apiGV.Version
haveAPIs.Insert(API{Group: haveG, Version: haveV})

// Only fetch Resources when there are Kinds we want to know about.
if wantAPIs[haveG][""].Len() == 0 && wantAPIs[haveG][haveV].Len() == 0 {
continue
}

resources, err := r.Client.ServerResourcesForGroupVersion(apiGV.GroupVersion)
if err != nil {
return err
}

for _, apiR := range resources.APIResources {
haveK := apiR.Kind
haveAPIs.Insert(
API{Group: haveG, Kind: haveK},
API{Group: haveG, Kind: haveK, Version: haveV},
)
}
}
}

r.have.Lock()
r.have.APISet = haveAPIs
r.have.Unlock()

r.have.RLock()
defer r.have.RUnlock()
logging.FromContext(ctx).V(1).Info("Found APIs", "index_size", r.have.APISet.Len())

return nil
}

func (r *DiscoveryRunner) readVersion() error {
info, err := r.Client.ServerVersion()

if info != nil && err == nil {
r.have.Lock()
r.have.Version = *info
r.have.Unlock()
}

return err
}

// Start periodically reads the Kuberentes API. It blocks until ctx is cancelled.
func (r *DiscoveryRunner) Start(ctx context.Context) error {
ticker := time.NewTicker(r.refresh)
defer ticker.Stop()

log := logging.FromContext(ctx).WithValues("controller", "kubernetes")
ctx = logging.NewContext(ctx, log)

for {
select {
case <-ticker.C:
if err := r.Read(ctx); err != nil {
log.Error(err, "Unable to detect Kubernetes APIs")
}
case <-ctx.Done():
// TODO(controller-runtime): Fixed in v0.19.0
// https://github.com/kubernetes-sigs/controller-runtime/issues/1927
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
return ctx.Err()
}
}
}

// Version returns the detected version of Kubernetes.
func (r *DiscoveryRunner) Version() Version {
r.have.RLock()
defer r.have.RUnlock()
return r.have.Version
}
55 changes: 55 additions & 0 deletions internal/kubernetes/discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 Crunchy Data Solutions, Inc.
//
// SPDX-License-Identifier: Apache-2.0

package kubernetes

import (
"context"
"testing"

"gotest.tools/v3/assert"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/crunchydata/postgres-operator/internal/testing/require"
)

func TestDiscoveryRunnerInterfaces(t *testing.T) {
var _ APIs = new(DiscoveryRunner)
var _ manager.Runnable = new(DiscoveryRunner)

var runnable manager.LeaderElectionRunnable = new(DiscoveryRunner)
assert.Assert(t, false == runnable.NeedLeaderElection())
}

func TestDiscoveryRunnerAPIs(t *testing.T) {
ctx := context.Background()
cfg, _ := require.Kubernetes2(t)
require.ParallelCapacity(t, 0)

runner, err := NewDiscoveryRunner(cfg)
assert.NilError(t, err)

// Search for an API that should always exist.
runner.relevant = append(runner.relevant, API{Kind: "Pod"})
assert.NilError(t, runner.readAPIs(ctx))

assert.Assert(t, runner.Has(API{Kind: "Pod"}))
assert.Assert(t, runner.HasAll(API{Kind: "Pod"}, API{Kind: "Secret"}))
assert.Assert(t, runner.HasAny(API{Kind: "Pod"}, API{Kind: "NotGonnaExist"}))
assert.Assert(t, !runner.Has(API{Kind: "NotGonnaExist"}))
}

func TestDiscoveryRunnerVersion(t *testing.T) {
cfg, _ := require.Kubernetes2(t)
require.ParallelCapacity(t, 0)

runner, err := NewDiscoveryRunner(cfg)
assert.NilError(t, err)
assert.NilError(t, runner.readVersion())

version := runner.Version()
assert.Assert(t, version.Major != "", "got %#v", version)
assert.Assert(t, version.Minor != "", "got %#v", version)
assert.Assert(t, version.String() != "", "got %q", version.String())
}
1 change: 1 addition & 0 deletions internal/registration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func (r *Runner) Start(ctx context.Context) error {
r.changed()
}
case <-ctx.Done():
// TODO(controller-runtime): Fixed in v0.19.0
// https://github.com/kubernetes-sigs/controller-runtime/issues/1927
if errors.Is(ctx.Err(), context.Canceled) {
return nil
Expand Down

0 comments on commit 4910dd4

Please sign in to comment.