
Getting Started with Autopilot Part 2 - Implementing the Workers

In part 2 of the Getting Started tutorial for Autopilot, we’ll implement the business logic for the operator in the form of “workers” that process the AutoRoute CRD.

To see the completed code for this tutorial, check out https://github.com/solo-io/autorouter

About Autopilot Workers

Workers are generated by Autopilot for each phase defined in your autopilot.yaml.

Each worker is responsible for handling a different phase of the top-level CRD.

Autopilot executes its main reconcile function whenever a top-level Custom Resource is created or updated. It can also be called on deletions by setting enableFinalizer: true in your autopilot.yaml. The main reconcile function will be called whenever an input or output resource is modified as well, to ensure dependencies between the top-level CRD and other resources are respected.

When the main reconcile function is called, it will defer to a user-defined Worker which corresponds to the current Phase of the top-level Custom Resource. The main reconcile function constructs a one-off instance of the worker whose Sync function is invoked. This allows Autopilot users to write workers as stateless one-off “serverless-style” functions.
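Conceptually, the phase-based dispatch works like the following sketch. This is a simplified illustration with hypothetical type names, not Autopilot's actual generated scheduler: the real generated code wires the CRD's current phase to the corresponding worker's Sync method.

```go
package main

import "fmt"

// Phase is a simplified stand-in for the generated AutoRoutePhase type.
type Phase string

const (
	PhaseInitializing Phase = "Initializing"
	PhaseSyncing      Phase = "Syncing"
	PhaseReady        Phase = "Ready"
)

// SyncFunc is a simplified stand-in for a worker's Sync method:
// it processes the resource and returns the next phase.
type SyncFunc func() (Phase, error)

// reconcile dispatches to the worker registered for the current phase
// and returns the phase that should be persisted on the resource.
func reconcile(current Phase, workers map[Phase]SyncFunc) (Phase, error) {
	worker, ok := workers[current]
	if !ok {
		return current, fmt.Errorf("no worker for phase %v", current)
	}
	return worker()
}

func main() {
	workers := map[Phase]SyncFunc{
		PhaseInitializing: func() (Phase, error) { return PhaseSyncing, nil },
		PhaseSyncing:      func() (Phase, error) { return PhaseReady, nil },
	}
	// each reconcile constructs a fresh, one-off worker invocation
	next, err := reconcile(PhaseInitializing, workers)
	fmt.Println(next, err)
}
```

Because each Sync call is constructed fresh and returns only a phase (plus status), workers carry no state of their own between invocations.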

Workers hand off to one another (or requeue themselves) by advancing the CRD to its next phase (or persisting it in its current phase).

Workers are intended to share state via the Status field on their CRD. In addition to returning the next Phase, the worker Sync function may return an optional StatusInfo, defined in the user spec.go. This allows passing arbitrary data between workers, while maintaining stateless workers.
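The pattern can be sketched like this, using simplified stand-ins for the generated types (the names here are hypothetical; the real types live in the generated pkg/apis package):

```go
package main

import "fmt"

// Phase is a simplified stand-in for the generated phase type.
type Phase string

// StatusInfo is a simplified stand-in for the user-defined StatusInfo in spec.go.
type StatusInfo struct {
	SyncedDeployments []string
}

// Status combines the current phase with the user-defined StatusInfo.
type Status struct {
	Phase Phase
	StatusInfo
}

// AutoRoute is a simplified stand-in for the top-level CRD.
type AutoRoute struct {
	Status Status
}

// A syncing-style worker records what it synced in the StatusInfo it returns...
func syncWorker(route *AutoRoute) (Phase, *StatusInfo, error) {
	return "Ready", &StatusInfo{SyncedDeployments: []string{"foo", "bar"}}, nil
}

// ...and a later worker reads that state back off the resource's Status,
// so the workers themselves stay stateless.
func readyWorker(route *AutoRoute) (Phase, *StatusInfo, error) {
	fmt.Println("previously synced:", route.Status.SyncedDeployments)
	return "Ready", nil, nil
}

func main() {
	route := &AutoRoute{}
	phase, info, _ := syncWorker(route)
	// Autopilot persists the returned phase and status onto the CRD between syncs.
	route.Status.Phase, route.Status.StatusInfo = phase, *info
	readyWorker(route)
}
```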

In this example we have 3 workers to implement:

- Initializing worker: simply marks the CRD with the Syncing phase to indicate to the user that processing has started.
- Syncing worker: ensures all the necessary Kubernetes and Istio configuration is applied to match the AutoRoute desired state.
- Ready worker: indicates the AutoRoute is ready to serve traffic. It checks that the Sync result matches the desired state of the cluster; if the cluster is out-of-sync, it sends us back to the Syncing phase for a re-sync.

Tutorial

Prerequisites

Update the API Spec

To begin our implementation, we’ll define the API our operator will serve. Generated code for the top-level Kubernetes CRD lives in <project root>/pkg/apis/<plural lowercase kind>/<version>

Users are expected to modify spec.go and any other non-generated files in this directory. The doc, registry, types, phases, and zz_deepcopy .go files will all be overwritten on regeneration.

We’ll define our API as Go structs in pkg/apis/autoroutes/v1/spec.go. Opening it up, here’s how it currently looks:

package v1

// EDIT THIS FILE!  This file should contain the definitions for your API Spec and Status!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
// Important: Run "autopilot generate" to regenerate code after modifying this file

// AutoRouteSpec defines the desired state of AutoRoute
// +k8s:openapi-gen=true
type AutoRouteSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
}

// AutoRouteStatusInfo defines an observed condition of AutoRoute
// +k8s:openapi-gen=true
type AutoRouteStatusInfo struct {
	// INSERT ADDITIONAL STATUS FIELDS - observed state of cluster
}

Fields the user should configure (reflecting the desired configuration) should go in the Spec struct.

Fields set by the controller for purposes of observability / persisting state should go in the StatusInfo struct.

Let’s update our Spec and StatusInfo. Paste the following into pkg/apis/autoroutes/v1/spec.go:

package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// AutoRouteSpec defines the desired state of AutoRoute
// +k8s:openapi-gen=true
type AutoRouteSpec struct {
	// Users will provide us with a LabelSelector that tells us which deployments to select
	DeploymentSelector metav1.LabelSelector `json:"deploymentSelector"`
}

// AutoRouteStatusInfo defines an observed condition of AutoRoute
// +k8s:openapi-gen=true
type AutoRouteStatusInfo struct {
	// The Operator will record all the deployments that have routes.
	// If this falls out of sync with the set of routes, the operator will resync
	SyncedDeployments []string `json:"syncedDeployments"`
}

Once we’ve updated the spec.go, we need to re-run our deepcopy-gen:

ap generate --deepcopy-only
INFO[0000] Generating Deepcopy types for autorouter.examples.io/pkg/apis/autoroutes/v1
INFO[0000] Generating Deepcopy code for API: &args.GeneratorArgs{InputDirs:[]string{"./pkg/apis/autoroutes/v1"}, OutputBase:"", OutputPackagePath:"./pkg/apis/autoroutes/v1", OutputFileBaseName:"zz_generated.deepcopy", GoHeaderFilePath:"hack/boilerplate/boilerplate.go.txt", GeneratedByCommentTemplate:"// Code generated by GENERATOR_NAME. DO NOT EDIT.", VerifyOnly:false, IncludeTestFiles:false, GeneratedBuildTag:"ignore_autogenerated", CustomArgs:(*generators.CustomArgs)(0xc000355600), defaultCommandLineFlags:false}
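The regenerated zz_generated.deepcopy.go gives our types the DeepCopy methods Kubernetes controllers rely on. The generated code follows a pattern like the sketch below (simplified and using a stand-in struct; the real file is produced by deepcopy-gen, not written by hand):

```go
package main

import "fmt"

// StatusInfo is a simplified stand-in for AutoRouteStatusInfo.
type StatusInfo struct {
	SyncedDeployments []string
}

// DeepCopyInto copies the receiver into out, cloning reference fields
// (like slices) so the copy shares no memory with the original.
func (in *StatusInfo) DeepCopyInto(out *StatusInfo) {
	*out = *in
	if in.SyncedDeployments != nil {
		out.SyncedDeployments = make([]string, len(in.SyncedDeployments))
		copy(out.SyncedDeployments, in.SyncedDeployments)
	}
}

// DeepCopy returns a new, fully independent copy of the receiver.
func (in *StatusInfo) DeepCopy() *StatusInfo {
	out := new(StatusInfo)
	in.DeepCopyInto(out)
	return out
}

func main() {
	orig := &StatusInfo{SyncedDeployments: []string{"foo"}}
	cp := orig.DeepCopy()
	cp.SyncedDeployments[0] = "bar"
	// mutating the copy leaves the original untouched
	fmt.Println(orig.SyncedDeployments[0])
}
```

This is why any new field with a slice, map, or pointer type requires re-running the generator: a plain struct assignment alone would leave the copy aliasing the original's memory.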

Great! Now let’s implement our workers.

Update the Initializing Worker

Take a look at the generated worker file in pkg/workers/initializing/worker.go:

package initializing

import (
	"context"

	"github.com/go-logr/logr"
	"github.com/solo-io/autopilot/pkg/ezkube"

	v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!

type Worker struct {
	Client ezkube.Client
	Logger logr.Logger
}

func (w *Worker) Sync(ctx context.Context, autoRoute *v1.AutoRoute) (v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {
	panic("implement me!")
}

This file shows the generated stubs for implementing the Initializing worker. The input and output parameters for Sync are defined in the autopilot.yaml. Once we modify this file, it will no longer be regenerated for us - ap will skip it when performing file generation.

Let’s replace the content of pkg/workers/initializing/worker.go with the following:

package initializing

import (
	"context"

	"github.com/go-logr/logr"
	"github.com/solo-io/autopilot/pkg/ezkube"

	v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
)

// The initializing worker's job is simply to set the phase to "Syncing"
// This tells the user that processing has started
type Worker struct {
	Client ezkube.Client
	Logger logr.Logger
}

func (w *Worker) Sync(ctx context.Context, autoRoute *v1.AutoRoute) (v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {
	// advance to the Syncing state to let the user know the auto route has been processed
	return v1.AutoRoutePhaseSyncing, nil, nil
}

This worker is pretty simple. Let’s move on to the next one. This will be the meat of our operator.

Update the Syncing Worker

The syncing worker generates the Istio Gateway, Istio Virtual Services, and Kubernetes Services required to route ingress traffic to all the deployments selected by a single AutoRoute.

Paste the following implementation into pkg/workers/syncing/worker.go:

package syncing

import (
	"context"

	"autorouter.examples.io/pkg/parameters"
	v1alpha3spec "istio.io/api/networking/v1alpha3"
	"istio.io/client-go/pkg/apis/networking/v1alpha3"
	corev1 "k8s.io/api/core/v1"

	"github.com/go-logr/logr"
	"github.com/solo-io/autopilot/pkg/ezkube"

	v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// The Syncing worker will resync the existing services, gateways, and virtual services to ensure that a route
// is exposed on the Istio ingress gateway
type Worker struct {
	Client ezkube.Client
	Logger logr.Logger
}

func (w *Worker) Sync(ctx context.Context, route *v1.AutoRoute, inputs Inputs) (Outputs, v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {
	// get all matching deployments
	inputDeployments, err := w.getMatchingDeployments(route, inputs.Deployments.Items)
	if err != nil {
		return Outputs{}, "", nil, err
	}

	// construct a k8s service and istio vservice for each deployment
	kubeServices, virtualServices, err := w.makeOutputKubeServices(route, inputDeployments)
	if err != nil {
		return Outputs{}, "", nil, err
	}

	// construct one gateway for the vservices
	// will match on hostname <*.route name>
	desiredGateway := v1alpha3.Gateway{
		ObjectMeta: metav1.ObjectMeta{
			Name:      route.Name,
			Namespace: route.Namespace,
		},
		// attach to the default istio ingressgateway
		Spec: v1alpha3spec.Gateway{
			Selector: map[string]string{"istio": "ingressgateway"},

			Servers: []*v1alpha3spec.Server{{
				Port: &v1alpha3spec.Port{
					Number:   80,
					Name:     "http",
					Protocol: "HTTP",
				},
				Hosts: []string{"*." + route.Name},
			}},
		},
	}

	// construct some status info for the operation we performed
	status := &v1.AutoRouteStatusInfo{
		SyncedDeployments: deploymentNames(inputDeployments),
	}

	w.Logger.WithValues(
		"status", status,
		"gateway", desiredGateway.Name,
		"virtual services", len(virtualServices),
		"kube services", len(kubeServices),
	).Info("ensuring the gateway, services and virtual service outputs are created")

	// construct the set of outputs we will return
	outputs := Outputs{
		Services: parameters.Services{
			Items: kubeServices,
		},
		VirtualServices: parameters.VirtualServices{
			Items: virtualServices,
		},
		Gateways: parameters.Gateways{
			Items: []v1alpha3.Gateway{
				desiredGateway,
			},
		},
	}

	return outputs, v1.AutoRoutePhaseReady, status, nil
}

func deploymentNames(deployments []appsv1.Deployment) []string {
	var names []string
	for _, deployment := range deployments {
		names = append(names, deployment.Name)
	}
	return names
}

func (w *Worker) getMatchingDeployments(route *v1.AutoRoute, deployments []appsv1.Deployment) ([]appsv1.Deployment, error) {
	// get the selector from the autoRoute
	// we'll use this to match the deployments from our inputs
	selector, err := metav1.LabelSelectorAsSelector(&route.Spec.DeploymentSelector)
	if err != nil {
		return deployments, err
	}

	w.Logger.Info("cycle through each deployment and check that the labels match our selector")
	var matchingDeployments []appsv1.Deployment
	for _, deployment := range deployments {
		labelsToMatch := labels.Set(deployment.Labels)
		if deployment.Namespace == route.Namespace && selector.Matches(labelsToMatch) {
			w.Logger.Info("found matching deployment", "deployment", deployment.Name)
			matchingDeployments = append(matchingDeployments, deployment)
		}
	}

	return matchingDeployments, nil
}

func (w *Worker) makeOutputKubeServices(route *v1.AutoRoute, deployments []appsv1.Deployment) ([]corev1.Service, []v1alpha3.VirtualService, error) {
	var desiredKubeServices []corev1.Service
	var desiredVirtualServices []v1alpha3.VirtualService

	for _, deployment := range deployments {
		logger := w.Logger.WithValues("deployment", deployment)

		hostname := deployment.Name + "." + route.Name
		logger.Info("adding hostname", "hostname", hostname)

		logger.Info("get the target port from the Deployment's first ContainerPort")
		var targetPort int32
	getFirstPort:
		for _, container := range deployment.Spec.Template.Spec.Containers {
			for _, port := range container.Ports {
				targetPort = port.ContainerPort
				break getFirstPort
			}
		}

		if targetPort == 0 {
			logger.Info("no ports to route to")
			continue
		}

		deploymentSelector := deployment.Spec.Selector

		w.Logger.Info("create a service for the deployment")
		desiredKubeServices = append(desiredKubeServices, corev1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Name:      deployment.Name,
				Namespace: route.Namespace,
			},
			Spec: corev1.ServiceSpec{
				Ports: []corev1.ServicePort{{
					Name: "http",
					Port: targetPort,
				}},
				Selector: deploymentSelector.MatchLabels,
			},
		})

		logger.Info("attach to the gateway we will create for the AutoRoute")
		logger.Info("create a simple route to the service created for the deployment")
		desiredVirtualServices = append(desiredVirtualServices, v1alpha3.VirtualService{
			ObjectMeta: metav1.ObjectMeta{
				Name:      hostname,
				Namespace: route.Namespace,
			},
			Spec: v1alpha3spec.VirtualService{
				Hosts:    []string{hostname},
				Gateways: []string{route.Name},
				Http: []*v1alpha3spec.HTTPRoute{{
					Match: []*v1alpha3spec.HTTPMatchRequest{{
						Uri: &v1alpha3spec.StringMatch{
							MatchType: &v1alpha3spec.StringMatch_Prefix{
								Prefix: "/",
							},
						},
					}},
					Route: []*v1alpha3spec.HTTPRouteDestination{{
						Destination: &v1alpha3spec.Destination{
							Port: &v1alpha3spec.PortSelector{
								Number: uint32(targetPort),
							},
							Host: deployment.Name,
						},
					}},
				}},
			},
		})
	}

	return desiredKubeServices, desiredVirtualServices, nil
}

The above code returns a set of Outputs which includes the resources that need to be applied in the cluster to ensure we can reach our deployments.
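To make that concrete: for a hypothetical AutoRoute named example selecting a deployment named foo whose first container port is 8080, the worker above would emit a VirtualService along these lines (an approximate YAML rendering for illustration, not generator output):

```yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: foo.example          # hostname = <deployment name>.<route name>
  namespace: default         # the AutoRoute's namespace
spec:
  hosts:
  - foo.example
  gateways:
  - example                  # the Gateway created for the AutoRoute
  http:
  - match:
    - uri:
        prefix: /
    route:
    - destination:
        host: foo            # the Kubernetes Service created for the deployment
        port:
          number: 8080       # the deployment's first container port
```

The accompanying Gateway matches hosts under *.example on port 80, so each selected deployment becomes reachable through the Istio ingress gateway at its own subdomain.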

Update the Ready Worker

The ready worker has the job of watching the deployments and ensuring our AutoRoute has routes for all of them. If not, it will send us back to the Syncing worker for another resync.

Paste the following into pkg/workers/ready/worker.go:

package ready

import (
	"context"
	"sort"

	"k8s.io/apimachinery/pkg/labels"

	"github.com/go-logr/logr"
	"github.com/solo-io/autopilot/pkg/ezkube"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
)

type Worker struct {
	Client ezkube.Client
	Logger logr.Logger
}

func (w *Worker) Sync(ctx context.Context, route *v1.AutoRoute, inputs Inputs) (v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {

	// check if we need resync
	needsResync, err := w.needsResync(route, inputs.Deployments.Items)
	if err != nil {
		// errors can be returned; the worker will be retried (with backoff)
		return "", nil, err
	}

	if needsResync {
		// if we need resync, return to the resync phase so that worker can
		// restore us to Ready
		return v1.AutoRoutePhaseSyncing, nil, nil
	} else {
		// otherwise continue in Ready phase
		// worker will be called at work interval
		return v1.AutoRoutePhaseReady, nil, nil
	}
}

func (w *Worker) needsResync(route *v1.AutoRoute, deployments []appsv1.Deployment) (bool, error) {
	expectedDeployments := route.Status.SyncedDeployments
	actualDeployments, err := w.getMatchingDeployments(route, deployments)
	if err != nil {
		return false, err
	}
	sort.Strings(expectedDeployments)
	sort.Strings(actualDeployments)

	// if the expected routes exist, do nothing (resync on interval)
	return !stringSlicesEqual(expectedDeployments, actualDeployments), nil
}

func stringSlicesEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i, v := range a {
		if v != b[i] {
			return false
		}
	}
	return true
}

func (w *Worker) getMatchingDeployments(route *v1.AutoRoute, deployments []appsv1.Deployment) ([]string, error) {
	// get the selector from the autoRoute
	// we'll use this to match the deployments from our inputs
	selector, err := metav1.LabelSelectorAsSelector(&route.Spec.DeploymentSelector)
	if err != nil {
		return nil, err
	}

	w.Logger.Info("cycle through each deployment and check that the labels match our selector")
	var matchingDeployments []string
	for _, deployment := range deployments {
		labelsToMatch := labels.Set(deployment.Labels)
		if deployment.Namespace == route.Namespace && selector.Matches(labelsToMatch) {
			w.Logger.Info("found matching deployment", "deployment", deployment.Name)
			matchingDeployments = append(matchingDeployments, deployment.Name)
		}
	}

	return matchingDeployments, nil
}

Once our 3 workers are implemented, we’re done with the Operator!

Move on to part 3 of our tutorial, where we’ll build and deploy our changes, and test them using a real AutoRoute custom resource.