Skip to content

Commit

Permalink
Add eviction-in-progress annotation during evacuation
Browse files Browse the repository at this point in the history
Add the `descheduler.alpha.kubernetes.io/eviction-in-progress` annotation
to virt-launcher pods during evacuation migration.
This annotation indicates pods whose eviction was initiated by an external component.

ref: kubevirt/community#258

Signed-off-by: fossedihelm <[email protected]>
  • Loading branch information
fossedihelm committed Jun 7, 2024
1 parent fb00b05 commit 21fc46e
Showing 1 changed file with 101 additions and 1 deletion.
102 changes: 101 additions & 1 deletion pkg/virt-controller/watch/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ const defaultFinalizedMigrationGarbageCollectionBuffer = 5
// cause the migration to fail when it could have reasonably succeeded.
const defaultCatchAllPendingTimeoutSeconds = int64(60 * 15)

const deschedulerEvictionInProgressAnnotation = "descheduler.alpha.kubernetes.io/eviction-in-progress"

var migrationBackoffError = errors.New(MigrationBackoffReason)

type MigrationController struct {
Expand Down Expand Up @@ -434,10 +436,16 @@ func (c *MigrationController) updateStatus(migration *virtv1.VirtualMachineInsta
// 3. Begin progressing migration state based on VMI's MigrationState status.
} else if vmi == nil {
migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
c.recorder.Eventf(migration, k8sv1.EventTypeWarning, FailedMigrationReason, "Migration failed because vmi does not exist.")
log.Log.Object(migration).Error("vmi does not exist")
} else if vmi.IsFinal() {
migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
c.recorder.Eventf(migration, k8sv1.EventTypeWarning, FailedMigrationReason, "Migration failed vmi shutdown during migration.")
log.Log.Object(migration).Error("Unable to migrate vmi because vmi is shutdown.")
} else if migration.DeletionTimestamp != nil && !c.isMigrationHandedOff(migration, vmi) {
Expand All @@ -451,30 +459,48 @@ func (c *MigrationController) updateStatus(migration *virtv1.VirtualMachineInsta
migrationCopy.Status.Conditions = append(migrationCopy.Status.Conditions, condition)
}
migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
} else if podExists && podIsDown(pod) {
migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
c.recorder.Eventf(migration, k8sv1.EventTypeWarning, FailedMigrationReason, "Migration failed because target pod shutdown during migration")
log.Log.Object(migration).Errorf("target pod %s/%s shutdown during migration", pod.Namespace, pod.Name)
} else if migration.TargetIsCreated() && !podExists {
migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
c.recorder.Eventf(migration, k8sv1.EventTypeWarning, FailedMigrationReason, "Migration target pod was removed during active migration.")
log.Log.Object(migration).Error("target pod disappeared during migration")
} else if migration.TargetIsHandedOff() && vmi.Status.MigrationState == nil {
migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
c.recorder.Eventf(migration, k8sv1.EventTypeWarning, FailedMigrationReason, "VMI's migration state was cleared during the active migration.")
log.Log.Object(migration).Error("vmi migration state cleared during migration")
} else if migration.TargetIsHandedOff() &&
vmi.Status.MigrationState != nil &&
vmi.Status.MigrationState.MigrationUID != migration.UID {

migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
c.recorder.Eventf(migration, k8sv1.EventTypeWarning, FailedMigrationReason, "VMI's migration state was taken over by another migration job during active migration.")
log.Log.Object(migration).Error("vmi's migration state was taken over by another migration object")
} else if vmi.Status.MigrationState != nil &&
vmi.Status.MigrationState.MigrationUID == migration.UID &&
vmi.Status.MigrationState.Failed {

migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
c.recorder.Eventf(migration, k8sv1.EventTypeWarning, FailedMigrationReason, "source node reported migration failed")
log.Log.Object(migration).Errorf("VMI %s/%s reported migration failed", vmi.Namespace, vmi.Name)

Expand All @@ -488,6 +514,9 @@ func (c *MigrationController) updateStatus(migration *virtv1.VirtualMachineInsta
migrationCopy.Status.Conditions = append(migrationCopy.Status.Conditions, condition)
} else if attachmentPodExists && podIsDown(attachmentPod) {
migrationCopy.Status.Phase = virtv1.MigrationFailed
if err := c.removeDeschedulerAnnotationFromSourcePod(migrationCopy); err != nil {
return err
}
c.recorder.Eventf(migration, k8sv1.EventTypeWarning, FailedMigrationReason, "Migration failed because target attachment pod shutdown during migration")
log.Log.Object(migration).Errorf("target attachment pod %s/%s shutdown during migration", attachmentPod.Namespace, attachmentPod.Name)
} else {
Expand Down Expand Up @@ -1277,6 +1306,12 @@ func (c *MigrationController) sync(key string, migration *virtv1.VirtualMachineI
return nil
}

if _, exists := migration.GetAnnotations()[virtv1.EvacuationMigrationAnnotation]; exists {
if err = c.addDeschedulerAnnotationToSourcePod(sourcePod); err != nil {
return err
}
}

// patch VMI annotations and set RuntimeUser in preparation for target pod creation
patches := c.setupVMIRuntimeUser(vmi)
if !patches.IsEmpty() {
Expand Down Expand Up @@ -1332,8 +1367,17 @@ func (c *MigrationController) sync(key string, migration *virtv1.VirtualMachineI
!vmi.Status.MigrationState.Failed &&
!vmi.Status.MigrationState.Completed {

return c.handleMarkMigrationFailedOnVMI(migration, vmi)
err = c.handleMarkMigrationFailedOnVMI(migration, vmi)
if err != nil {
return err
}
}

if migration.Status.Phase != virtv1.MigrationFailed {
return nil
}

return c.removeDeschedulerAnnotationFromSourcePod(migration)
case virtv1.MigrationRunning:
if migration.DeletionTimestamp != nil && vmi.Status.MigrationState != nil {
err = c.markMigrationAbortInVmiStatus(migration, vmi)
Expand All @@ -1346,6 +1390,62 @@ func (c *MigrationController) sync(key string, migration *virtv1.VirtualMachineI
return nil
}

// addDeschedulerAnnotationToSourcePod sets the descheduler
// eviction-in-progress annotation (with an empty value) on sourcePod.
//
// When the pod already carries the annotation nothing is sent to the API
// server and nil is returned. Otherwise a JSON patch is issued against the
// pod; a failure to build the payload or to apply the patch is returned to
// the caller (the patch failure is also logged).
func (c *MigrationController) addDeschedulerAnnotationToSourcePod(sourcePod *k8sv1.Pod) error {
	// Nothing to do when the annotation is already present on the pod.
	if _, alreadySet := sourcePod.GetAnnotations()[deschedulerEvictionInProgressAnnotation]; alreadySet {
		return nil
	}

	// The annotation key contains '/' characters, so it is escaped before
	// being embedded in the JSON pointer path.
	annotationPath := fmt.Sprintf("/metadata/annotations/%s", patch.EscapeJSONPointer(deschedulerEvictionInProgressAnnotation))
	payload, err := patch.New(patch.WithAdd(annotationPath, "")).GeneratePayload()
	if err != nil {
		return err
	}

	podClient := c.clientset.CoreV1().Pods(sourcePod.Namespace)
	if _, err = podClient.Patch(context.Background(), sourcePod.Name, types.JSONPatchType, payload, v1.PatchOptions{}); err != nil {
		log.Log.Object(sourcePod).Errorf("failed to add %s pod annotation: %v", deschedulerEvictionInProgressAnnotation, err)
		return err
	}

	return nil
}

// removeDeschedulerAnnotationFromSourcePod removes the descheduler
// eviction-in-progress annotation from the source pod recorded in the
// migration's status.
//
// It returns nil without contacting the API server when:
//   - the migration has no MigrationState or no SourcePod name recorded,
//   - the source pod is not found in the pod indexer, or
//   - the source pod does not carry the annotation.
//
// An error is returned when the indexer lookup fails, when the patch payload
// cannot be generated, or when the patch request itself fails.
func (c *MigrationController) removeDeschedulerAnnotationFromSourcePod(migration *virtv1.VirtualMachineInstanceMigration) error {
	if migration.Status.MigrationState == nil || migration.Status.MigrationState.SourcePod == "" {
		return nil
	}

	sourcePodName := migration.Status.MigrationState.SourcePod
	podKey := controller.NamespacedKey(migration.Namespace, sourcePodName)
	obj, exists, err := c.podIndexer.GetByKey(podKey)
	// Inspect the lookup error before the existence flag: a failed lookup
	// may also report exists == false, and treating it as "pod is gone"
	// would silently drop the error and leave the annotation in place.
	if err != nil {
		log.Log.Object(migration).Reason(err).Errorf("Failed to fetch source pod %s for namespace from cache.", sourcePodName)
		return err
	}
	if !exists {
		// A missing source pod has no annotation left to clean up.
		log.Log.Object(migration).Errorf("source pod %s does not exist", sourcePodName)
		return nil
	}

	sourcePod := obj.(*k8sv1.Pod)
	if _, annotated := sourcePod.GetAnnotations()[deschedulerEvictionInProgressAnnotation]; !annotated {
		return nil
	}

	// The annotation key contains '/' characters, so it is escaped before
	// being embedded in the JSON pointer path.
	annotationPath := fmt.Sprintf("/metadata/annotations/%s", patch.EscapeJSONPointer(deschedulerEvictionInProgressAnnotation))
	// NOTE(review): presumably WithTest emits a JSON-patch "test" op, so the
	// removal only applies while the annotation still holds the empty value
	// written by addDeschedulerAnnotationToSourcePod — confirm against the
	// patch package.
	patchSet := patch.New(
		patch.WithTest(annotationPath, ""),
		patch.WithRemove(annotationPath),
	)
	patchBytes, err := patchSet.GeneratePayload()
	if err != nil {
		return err
	}

	_, err = c.clientset.CoreV1().Pods(sourcePod.Namespace).Patch(context.Background(), sourcePod.Name, types.JSONPatchType, patchBytes, v1.PatchOptions{})
	if err != nil {
		log.Log.Object(sourcePod).Errorf("failed to remove %s pod annotation : %v", deschedulerEvictionInProgressAnnotation, err)
		return err
	}

	return nil
}

func (c *MigrationController) setupVMIRuntimeUser(vmi *virtv1.VirtualMachineInstance) *patch.PatchSet {
patchSet := patch.New()
if !c.clusterConfig.RootEnabled() {
Expand Down

0 comments on commit 21fc46e

Please sign in to comment.