Getting Started with Autopilot Part 2 - Implementing the Workers
In part 2 of the Getting Started tutorial for Autopilot, we'll implement the business logic for our operator in the form of "workers" that process the AutoRoute CRD.
To see the completed code for this tutorial, check out https://github.com/solo-io/autorouter
About Autopilot Workers
Workers are generated by Autopilot for each phase defined in the phases section of your autopilot.yaml. Each worker is responsible for handling a different phase of the top-level CRD.
Autopilot executes its main reconcile function whenever a top-level Custom Resource is created or updated. It can also be called on deletions by setting enableFinalizer: true in your autopilot.yaml. The main reconcile function will also be called whenever an input or output resource is modified, to ensure dependencies between the top-level CRD and other resources are respected.
When the main reconcile function is called, it defers to a user-defined Worker which corresponds to the current Phase of the top-level Custom Resource. The main reconcile function constructs a one-off instance of that worker and invokes its Sync function. This allows Autopilot users to write workers as stateless, one-off, "serverless-style" functions.
Workers hand control to one another (or requeue themselves) by progressing the CRD to its next phase (or leaving it in its current phase).
Workers are intended to share state via the Status field on their CRD. In addition to returning the next Phase, the worker Sync function may return an optional StatusInfo, defined in the user's spec.go. This allows passing arbitrary data between workers while keeping the workers themselves stateless.
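To make this concrete, here is a minimal sketch of that contract, using the AutoRoute types we'll fill in later in this tutorial (the SyncedDeployments field is defined below; the function names here are purely illustrative and not part of the generated code):
package example
import (
	v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
)
// sketch only: a worker returns the next phase plus an optional StatusInfo,
// which Autopilot persists on the AutoRoute's Status field
func finishSyncing(names []string) (v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {
	status := &v1.AutoRouteStatusInfo{SyncedDeployments: names}
	return v1.AutoRoutePhaseReady, status, nil
}
// sketch only: a worker handling a later phase can read that state back
// from the CRD instance passed to its Sync function
func previouslySynced(route *v1.AutoRoute) []string {
	return route.Status.SyncedDeployments
}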
In this example we have 3 workers to implement:
- Initializing worker: the Initializing worker will simply mark the CRD with the Syncing phase to indicate to the user that processing has started.
- Syncing worker: the Syncing worker will ensure all the necessary Kubernetes and Istio configuration is applied to match the AutoRoute desired state.
- Ready worker: the Ready worker indicates the AutoRoute is ready to serve traffic. The Ready worker will check that the sync result matches the desired state of the cluster. If the cluster is out of sync, the Ready worker sends us back to the Syncing phase for a re-sync. (The phase constants these workers use are sketched just after this list.)
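The phases above map to constants that Autopilot generates alongside the API types (in the generated phases file mentioned below). Based on the constants our workers use later in this tutorial, they look roughly like the following sketch (the Initializing constant and the exact string values are assumptions):
package v1
// a rough sketch of the generated phase constants; AutoRoutePhaseSyncing and
// AutoRoutePhaseReady are used by our workers below, while AutoRoutePhaseInitializing
// and the string values shown here are assumptions
type AutoRoutePhase string
const (
	AutoRoutePhaseInitializing AutoRoutePhase = "Initializing"
	AutoRoutePhaseSyncing      AutoRoutePhase = "Syncing"
	AutoRoutePhaseReady        AutoRoutePhase = "Ready"
)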
Tutorial
Prerequisites
- Completed part one of this tutorial series.
Update the API Spec
To begin our implementation, we'll define the API our operator will serve.
Generated code for the top-level Kubernetes CRD lives in <project root>/pkg/apis/<plural lowercase kind>/<version>. Users are expected to modify spec.go and any other non-generated files in this directory. The doc, registry, types, phases, and zz_generated.deepcopy .go files will all be overwritten on re-generation.
We'll define our API as Go structs in pkg/apis/autoroutes/v1/spec.go. Opening it up, here's how it currently looks:
package v1
// EDIT THIS FILE! This file should contain the definitions for your API Spec and Status!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// Important: Run "autopilot generate" to regenerate code after modifying this file
// AutoRouteSpec defines the desired state of AutoRoute
// +k8s:openapi-gen=true
type AutoRouteSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
}
// AutoRouteStatusInfo defines an observed condition of AutoRoute
// +k8s:openapi-gen=true
type AutoRouteStatusInfo struct {
// INSERT ADDITIONAL STATUS FIELDS - observed state of cluster
}
Fields the user should configure (reflecting the desired configuration) should go in the Spec struct. Fields set by the controller for observability or to persist state should go in the StatusInfo struct.
Let's update our Spec and StatusInfo. Paste the following into pkg/apis/autoroutes/v1/spec.go:
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// AutoRouteSpec defines the desired state of AutoRoute
// +k8s:openapi-gen=true
type AutoRouteSpec struct {
// Users will provide us with a LabelSelector that tells us which deployments to select
DeploymentSelector metav1.LabelSelector `json:"deploymentSelector"`
}
// AutoRouteStatusInfo defines an observed condition of AutoRoute
// +k8s:openapi-gen=true
type AutoRouteStatusInfo struct {
// The Operator will record all the deployments that have routes.
// If this falls out of sync with the set of routes, the operator will resync
SyncedDeployments []string `json:"syncedDeployments"`
}
Once we've updated spec.go, we need to re-run deepcopy-gen:
ap generate --deepcopy-only
INFO[0000] Generating Deepcopy types for autorouter.examples.io/pkg/apis/autoroutes/v1
INFO[0000] Generating Deepcopy code for API: &args.GeneratorArgs{InputDirs:[]string{"./pkg/apis/autoroutes/v1"}, OutputBase:"", OutputPackagePath:"./pkg/apis/autoroutes/v1", OutputFileBaseName:"zz_generated.deepcopy", GoHeaderFilePath:"hack/boilerplate/boilerplate.go.txt", GeneratedByCommentTemplate:"// Code generated by GENERATOR_NAME. DO NOT EDIT.", VerifyOnly:false, IncludeTestFiles:false, GeneratedBuildTag:"ignore_autogenerated", CustomArgs:(*generators.CustomArgs)(0xc000355600), defaultCommandLineFlags:false}
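For reference, the regenerated zz_generated.deepcopy.go now contains deepcopy helpers for the new fields. An illustrative excerpt for our status type (the exact output of deepcopy-gen may differ slightly):
// illustrative excerpt of the generated deepcopy code for AutoRouteStatusInfo;
// the real output of deepcopy-gen may differ slightly
func (in *AutoRouteStatusInfo) DeepCopyInto(out *AutoRouteStatusInfo) {
	*out = *in
	if in.SyncedDeployments != nil {
		in, out := &in.SyncedDeployments, &out.SyncedDeployments
		*out = make([]string, len(*in))
		copy(*out, *in)
	}
}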
Great! Now let's implement our workers.
Update the Initializing Worker
Take a look at the generated worker file in pkg/workers/initializing/worker.go:
package initializing
import (
"context"
"github.com/go-logr/logr"
"github.com/solo-io/autopilot/pkg/ezkube"
v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
type Worker struct {
Client ezkube.Client
Logger logr.Logger
}
func (w *Worker) Sync(ctx context.Context, autoRoute *v1.AutoRoute) (v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {
panic("implement me!")
}
This file shows the generated stub for implementing the Initializing worker. The input and output parameters for Sync are defined in the autopilot.yaml. Once we modify this file, it will no longer be regenerated for us; ap will skip it when performing file generation.
Let's replace the contents of pkg/workers/initializing/worker.go with the following:
package initializing
import (
"context"
"github.com/go-logr/logr"
"github.com/solo-io/autopilot/pkg/ezkube"
v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
)
// The initializing worker's job is simply to set the phase to "Syncing"
// This tells the user that processing has started
type Worker struct {
Client ezkube.Client
Logger logr.Logger
}
func (w *Worker) Sync(ctx context.Context, autoRoute *v1.AutoRoute) (v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {
// advance to the Syncing state to let the user know the auto route has been processed
return v1.AutoRoutePhaseSyncing, nil, nil
}
This worker is pretty simple. Let's move on to the next one, which will be the meat of our operator.
Update the Syncing Worker
The Syncing worker generates the Istio Gateway, Istio Virtual Services, and Kubernetes Services required to route ingress traffic to all the deployments selected by a single AutoRoute.
Paste the following implementation into pkg/workers/syncing/worker.go:
package syncing
import (
"context"
"autorouter.examples.io/pkg/parameters"
v1alpha3spec "istio.io/api/networking/v1alpha3"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
corev1 "k8s.io/api/core/v1"
"github.com/go-logr/logr"
"github.com/solo-io/autopilot/pkg/ezkube"
v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
// The Syncing worker will resync the existing services, gateways, and virtual services to ensure that a route
// is exposed on the Istio ingress gateway
type Worker struct {
Client ezkube.Client
Logger logr.Logger
}
func (w *Worker) Sync(ctx context.Context, route *v1.AutoRoute, inputs Inputs) (Outputs, v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {
// get all matching deployments
inputDeployments, err := w.getMatchingDeployments(route, inputs.Deployments.Items)
if err != nil {
return Outputs{}, "", nil, err
}
// construct a k8s service and istio vservice for each deployment
kubeServices, virtualServices, err := w.makeOutputKubeServices(route, inputDeployments)
if err != nil {
return Outputs{}, "", nil, err
}
// construct one gateway for the vservices
// will match on hostname <*.route name>
desiredGateway := v1alpha3.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: route.Name,
Namespace: route.Namespace,
},
// attach to the istio ingressgateway (default)
Spec: v1alpha3spec.Gateway{
Selector: map[string]string{"istio": "ingressgateway"},
Servers: []*v1alpha3spec.Server{{
Port: &v1alpha3spec.Port{
Number: 80,
Name: "http",
Protocol: "HTTP",
},
Hosts: []string{"*." + route.Name},
}},
},
}
// construct some status info for the operation we performed
status := &v1.AutoRouteStatusInfo{
SyncedDeployments: deploymentNames(inputDeployments),
}
w.Logger.WithValues(
"status", status,
"gateway", desiredGateway.Name,
"virtual services", len(virtualServices),
"kube services", len(kubeServices),
).Info("ensuring the gateway, services and virtual service outputs are created")
// construct the set of outputs we will return
outputs := Outputs{
Services: parameters.Services{
Items: kubeServices,
},
VirtualServices: parameters.VirtualServices{
Items: virtualServices,
},
Gateways: parameters.Gateways{
Items: []v1alpha3.Gateway{
desiredGateway,
},
},
}
return outputs, v1.AutoRoutePhaseReady, status, nil
}
func deploymentNames(deployments []appsv1.Deployment) []string {
var names []string
for _, deployment := range deployments {
names = append(names, deployment.Name)
}
return names
}
func (w *Worker) getMatchingDeployments(route *v1.AutoRoute, deployments []appsv1.Deployment) ([]appsv1.Deployment, error) {
// get the selector from the autoRoute
// we'll use this to match the deployments from our inputs
selector, err := metav1.LabelSelectorAsSelector(&route.Spec.DeploymentSelector)
if err != nil {
return deployments, err
}
w.Logger.Info("cycle through each deployment and check that the labels match our selector")
var matchingDeployments []appsv1.Deployment
for _, deployment := range deployments {
labelsToMatch := labels.Set(deployment.Labels)
if deployment.Namespace == route.Namespace && selector.Matches(labelsToMatch) {
w.Logger.Info("found matching deployment", "deployment", deployment.Name)
matchingDeployments = append(matchingDeployments, deployment)
}
}
return matchingDeployments, nil
}
func (w *Worker) makeOutputKubeServices(route *v1.AutoRoute, deployments []appsv1.Deployment) ([]corev1.Service, []v1alpha3.VirtualService, error) {
var desiredKubeServices []corev1.Service
var desiredVirtualServices []v1alpha3.VirtualService
for _, deployment := range deployments {
logger := w.Logger.WithValues("deployment", deployment)
hostname := deployment.Name + "." + route.Name
logger.Info("adding hostname", "hostname", hostname)
logger.Info("get the target port from the Deployment's first ContainerPort")
var targetPort int32
getFirstPort:
for _, container := range deployment.Spec.Template.Spec.Containers {
for _, port := range container.Ports {
targetPort = port.ContainerPort
break getFirstPort
}
}
if targetPort == 0 {
logger.Info("no ports to route to")
continue
}
deploymentSelector := deployment.Spec.Selector
w.Logger.Info("create a service for the deployment")
desiredKubeServices = append(desiredKubeServices, corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: deployment.Name,
Namespace: route.Namespace,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Name: "http",
Port: targetPort,
}},
Selector: deploymentSelector.MatchLabels,
},
})
logger.Info("attach to the gateway we will create for the AutoRoute")
logger.Info("create a simple route to the service created for the deployment")
desiredVirtualServices = append(desiredVirtualServices, v1alpha3.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: hostname,
Namespace: route.Namespace,
},
Spec: v1alpha3spec.VirtualService{
Hosts: []string{hostname},
Gateways: []string{route.Name},
Http: []*v1alpha3spec.HTTPRoute{{
Match: []*v1alpha3spec.HTTPMatchRequest{{
Uri: &v1alpha3spec.StringMatch{
MatchType: &v1alpha3spec.StringMatch_Prefix{
Prefix: "/",
},
},
}},
Route: []*v1alpha3spec.HTTPRouteDestination{{
Destination: &v1alpha3spec.Destination{
Port: &v1alpha3spec.PortSelector{
Number: uint32(targetPort),
},
Host: deployment.Name,
},
}},
}},
},
})
}
return desiredKubeServices, desiredVirtualServices, nil
}
The above code returns a set of Outputs, which includes the resources that need to be applied in the cluster to ensure we can reach our deployments.
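The Inputs and Outputs types used here are generated by Autopilot from the parameters declared in the autopilot.yaml. Inferred from how the Syncing worker uses them above, their shape is roughly the following (a sketch, not the actual generated code; the parameters.Deployments name is an assumption):
package syncing
import (
	"autorouter.examples.io/pkg/parameters"
)
// a rough sketch of the generated input/output wrappers, inferred from how the
// Syncing worker uses them above; the actual generated code may differ
type Inputs struct {
	Deployments parameters.Deployments // Items holds a []appsv1.Deployment
}
type Outputs struct {
	Services        parameters.Services        // Items holds a []corev1.Service
	VirtualServices parameters.VirtualServices // Items holds a []v1alpha3.VirtualService
	Gateways        parameters.Gateways        // Items holds a []v1alpha3.Gateway
}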
Update the Ready Worker
The Ready worker has the job of watching the deployments and ensuring our AutoRoute has routes for all of them. If not, it sends us back to the Syncing worker for another resync.
Paste the following into pkg/workers/ready/worker.go:
package ready
import (
"context"
"sort"
"k8s.io/apimachinery/pkg/labels"
"github.com/go-logr/logr"
"github.com/solo-io/autopilot/pkg/ezkube"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "autorouter.examples.io/pkg/apis/autoroutes/v1"
)
type Worker struct {
Client ezkube.Client
Logger logr.Logger
}
func (w *Worker) Sync(ctx context.Context, route *v1.AutoRoute, inputs Inputs) (v1.AutoRoutePhase, *v1.AutoRouteStatusInfo, error) {
// check if we need resync
needsResync, err := w.needsResync(route, inputs.Deployments.Items)
if err != nil {
// errors can be returned; the worker will be retried (with backoff)
return "", nil, err
}
if needsResync {
// if we need a resync, return to the Syncing phase so that the Syncing worker can
// restore us to Ready
return v1.AutoRoutePhaseSyncing, nil, nil
} else {
// otherwise continue in Ready phase
// worker will be called at work interval
return v1.AutoRoutePhaseReady, nil, nil
}
}
func (w *Worker) needsResync(route *v1.AutoRoute, deployments []appsv1.Deployment) (bool, error) {
expectedDeployments := route.Status.SyncedDeployments
actualDeployments, err := w.getMatchingDeployments(route, deployments)
if err != nil {
return false, err
}
sort.Strings(expectedDeployments)
sort.Strings(actualDeployments)
// if the expected routes exist, do nothing (resync on interval)
return !stringSlicesEqual(expectedDeployments, actualDeployments), nil
}
func stringSlicesEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}
func (w *Worker) getMatchingDeployments(route *v1.AutoRoute, deployments []appsv1.Deployment) ([]string, error) {
// get the selector from the autoRoute
// we'll use this to match the deployments from our inputs
selector, err := metav1.LabelSelectorAsSelector(&route.Spec.DeploymentSelector)
if err != nil {
return nil, err
}
w.Logger.Info("cycle through each deployment and check that the labels match our selector")
var matchingDeployments []string
for _, deployment := range deployments {
labelsToMatch := labels.Set(deployment.Labels)
if deployment.Namespace == route.Namespace && selector.Matches(labelsToMatch) {
w.Logger.Info("found matching deployment", "deployment", deployment.Name)
matchingDeployments = append(matchingDeployments, deployment.Name)
}
}
return matchingDeployments, nil
}
Once our 3 workers are implemented, we're done with the Operator!
Move on to part 3 of our tutorial, where we'll build and deploy our changes and test them using a real AutoRoute custom resource.