|
16 | 16 | import zipfile |
17 | 17 | import tempfile |
18 | 18 | from datetime import date, datetime, timedelta |
19 | | -from typing import Dict, Any, List, Tuple |
| 19 | +from typing import Dict, Any, List, Tuple, Final |
20 | 20 | import time |
21 | 21 | from django.db.models import ( |
22 | 22 | F, |
|
39 | 39 | QuerySet, |
40 | 40 | Prefetch, |
41 | 41 | ) |
42 | | -from django.db.models.functions import Greatest, Coalesce |
| 42 | +from django.db.models.functions import Coalesce |
43 | 43 |
|
44 | 44 | from collections import defaultdict |
45 | 45 | import pytz |
|
76 | 76 |
|
77 | 77 |
|
78 | 78 | from django.apps import apps |
79 | | -from django.contrib.auth.models import Permission |
| 79 | +from django.contrib.auth.models import AnonymousUser, Permission |
80 | 80 | from django.conf import settings |
81 | 81 | from django.core.exceptions import FieldDoesNotExist |
82 | 82 | from django.core.files.storage import default_storage |
| 83 | +from django.contrib.auth.base_user import AbstractBaseUser |
83 | 84 |
|
84 | 85 | from django.db import models, transaction |
85 | 86 | from django.forms import ValidationError |
@@ -2434,6 +2435,75 @@ def category(self, request): |
2434 | 2435 | def csf_function(self, request): |
2435 | 2436 | return Response(dict(ReferenceControl.CSF_FUNCTION)) |
2436 | 2437 |
|
| 2438 | + @staticmethod |
| 2439 | + def _get_syncable_applied_controls( |
| 2440 | + reference_control: ReferenceControl, user: AbstractBaseUser | AnonymousUser |
| 2441 | + ) -> list[AppliedControl]: |
| 2442 | + """Return the list of syncable `AppliedControl` objects (meaning they are currently unsynced) the `User` can synchronize (based on his permissions).""" |
| 2443 | + |
| 2444 | + _, changeable_applied_controls, _ = RoleAssignment.get_accessible_object_ids( |
| 2445 | + Folder.get_root_folder(), user, AppliedControl |
| 2446 | + ) |
| 2447 | + |
| 2448 | + syncable_applied_controls = ( |
| 2449 | + reference_control.get_unsynced_applied_controls_queryset().filter( |
| 2450 | + id__in=changeable_applied_controls, |
| 2451 | + ) |
| 2452 | + ) |
| 2453 | + return list(syncable_applied_controls) |
| 2454 | + |
| 2455 | + @action(detail=True, methods=["get"], url_path="syncable-applied-controls") |
| 2456 | + def syncable_applied_controls(self, request, pk): |
| 2457 | + reference_control = self.get_object() |
| 2458 | + syncable_applied_controls = self._get_syncable_applied_controls( |
| 2459 | + reference_control, request.user |
| 2460 | + ) |
| 2461 | + |
| 2462 | + return Response( |
| 2463 | + [ |
| 2464 | + {"id": applied_control.id, "name": applied_control.name} |
| 2465 | + for applied_control in syncable_applied_controls |
| 2466 | + ] |
| 2467 | + ) |
| 2468 | + |
| 2469 | + @action(detail=True, methods=["post"], url_path="sync-applied-controls") |
| 2470 | + def sync_applied_controls(self, request, pk): |
| 2471 | + reference_control = self.get_object() |
| 2472 | + syncable_applied_controls = self._get_syncable_applied_controls( |
| 2473 | + reference_control, request.user |
| 2474 | + ) |
| 2475 | + |
| 2476 | + FIELDS_TO_SYNC: Final[list[str]] = [ |
| 2477 | + "category", |
| 2478 | + "csf_function", |
| 2479 | + ] |
| 2480 | + |
| 2481 | + for syncable_applied_control in syncable_applied_controls: |
| 2482 | + for field_to_sync in FIELDS_TO_SYNC: |
| 2483 | + reference_control_value = getattr(reference_control, field_to_sync) |
| 2484 | + |
| 2485 | + setattr( |
| 2486 | + syncable_applied_control, field_to_sync, reference_control_value |
| 2487 | + ) |
| 2488 | + |
| 2489 | + AppliedControl.objects.bulk_update( |
| 2490 | + syncable_applied_controls, FIELDS_TO_SYNC, batch_size=100 |
| 2491 | + ) |
| 2492 | + |
| 2493 | + skip_sync = all( |
| 2494 | + field_to_sync not in AppliedControl.INTEGRATION_SYNCABLE_FIELDS |
| 2495 | + for field_to_sync in FIELDS_TO_SYNC |
| 2496 | + ) |
| 2497 | + for applied_control in syncable_applied_controls: |
| 2498 | + applied_control.save(skip_sync=skip_sync) |
| 2499 | + |
| 2500 | + return Response( |
| 2501 | + [ |
| 2502 | + {"id": applied_control.id, "name": applied_control.name} |
| 2503 | + for applied_control in syncable_applied_controls |
| 2504 | + ] |
| 2505 | + ) |
| 2506 | + |
2437 | 2507 |
|
2438 | 2508 | class RiskMatrixViewSet(BaseModelViewSet): |
2439 | 2509 | """ |
@@ -5649,6 +5719,41 @@ def build_sunburst_data(data_dict, name="Root", level=0, parent_color=None): |
5649 | 5719 |
|
5650 | 5720 | return Response({"results": sunburst_data}) |
5651 | 5721 |
|
| 5722 | + @action(detail=True, methods=["post"], url_path="sync-to-reference-control") |
| 5723 | + def sync_applied_controls(self, request, pk): |
| 5724 | + dry_run = request.query_params.get("dry_run", True) |
| 5725 | + if dry_run == "false": |
| 5726 | + dry_run = False |
| 5727 | + |
| 5728 | + applied_control = self.get_object() |
| 5729 | + reference_control = applied_control.reference_control |
| 5730 | + changes: list[tuple[str, str]] = [] # List of (old_value, new_value) tuples. |
| 5731 | + |
| 5732 | + FIELDS_TO_SYNC: Final[list[str]] = [ |
| 5733 | + "category", |
| 5734 | + "csf_function", |
| 5735 | + ] |
| 5736 | + |
| 5737 | + for field_to_sync in FIELDS_TO_SYNC: |
| 5738 | + reference_control_value = getattr(reference_control, field_to_sync) |
| 5739 | + |
| 5740 | + applied_control_value = getattr(applied_control, field_to_sync) |
| 5741 | + if reference_control_value != applied_control_value: |
| 5742 | + changes.append((reference_control_value, applied_control_value)) |
| 5743 | + |
| 5744 | + setattr(applied_control, field_to_sync, reference_control_value) |
| 5745 | + |
| 5746 | + if dry_run: |
| 5747 | + return Response(changes) |
| 5748 | + |
| 5749 | + skip_sync = all( |
| 5750 | + field_to_sync not in AppliedControl.INTEGRATION_SYNCABLE_FIELDS |
| 5751 | + for field_to_sync in FIELDS_TO_SYNC |
| 5752 | + ) |
| 5753 | + applied_control.save(update_fields=FIELDS_TO_SYNC, skip_sync=skip_sync) |
| 5754 | + |
| 5755 | + return Response(changes) |
| 5756 | + |
5652 | 5757 |
|
5653 | 5758 | class ActionPlanList(generics.ListAPIView): |
5654 | 5759 | search_fields = ["name", "description", "ref_id"] |
|
0 commit comments