|
1 | 1 | from django.urls import reverse
|
2 | 2 | from django.db import models
|
3 | 3 | from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
|
| 4 | +from django.core.exceptions import FieldError |
4 | 5 | from django.conf import settings
|
5 | 6 |
|
6 | 7 | from taggit.managers import TaggableManager
|
7 | 8 |
|
8 | 9 | from utilities.choices import ChoiceSet
|
9 | 10 | from netbox.models import NetBoxModel
|
10 | 11 | from netbox.models.features import ChangeLoggingMixin
|
| 12 | +from ipam.models import Prefix |
11 | 13 |
|
12 | 14 |
|
13 | 15 | class ASNStatusChoices(ChoiceSet):
|
@@ -38,6 +40,14 @@ class SessionStatusChoices(ChoiceSet):
|
38 | 40 | )
|
39 | 41 |
|
40 | 42 |
|
| 43 | +class ActionChoices(ChoiceSet): |
| 44 | + |
| 45 | + CHOICES = [ |
| 46 | + ('permit', 'Permit', 'green'), |
| 47 | + ('deny', 'Deny', 'red'), |
| 48 | + ] |
| 49 | + |
| 50 | + |
41 | 51 | class ASNGroup(ChangeLoggingMixin, models.Model):
|
42 | 52 | """
|
43 | 53 | """
|
@@ -306,3 +316,97 @@ def get_status_color(self):
|
306 | 316 |
|
307 | 317 | def get_absolute_url(self):
|
308 | 318 | return reverse('plugins:netbox_bgp:bgpsession', args=[self.pk])
|
| 319 | + |
| 320 | + |
| 321 | +class RoutingPolicyRule(NetBoxModel): |
| 322 | + routing_policy = models.ForeignKey( |
| 323 | + to=RoutingPolicy, |
| 324 | + on_delete=models.CASCADE, |
| 325 | + related_name='rules' |
| 326 | + ) |
| 327 | + index = models.PositiveIntegerField() |
| 328 | + action = models.CharField( |
| 329 | + max_length=30, |
| 330 | + choices=ActionChoices |
| 331 | + ) |
| 332 | + description = models.CharField( |
| 333 | + max_length=500, |
| 334 | + blank=True |
| 335 | + ) |
| 336 | + match_community = models.ManyToManyField( |
| 337 | + to=Community, |
| 338 | + blank=True, |
| 339 | + related_name='+' |
| 340 | + ) |
| 341 | + match_ip = models.ManyToManyField( |
| 342 | + to='ipam.Prefix', |
| 343 | + blank=True, |
| 344 | + related_name='+', |
| 345 | + ) |
| 346 | + match_ip_cond = models.JSONField( |
| 347 | + blank=True, |
| 348 | + null=True, |
| 349 | + ) |
| 350 | + match_custom = models.JSONField( |
| 351 | + blank=True, |
| 352 | + null=True, |
| 353 | + ) |
| 354 | + set_actions = models.JSONField( |
| 355 | + blank=True, |
| 356 | + null=True, |
| 357 | + ) |
| 358 | + |
| 359 | + class Meta: |
| 360 | + ordering = ('routing_policy', 'index') |
| 361 | + unique_together = ('routing_policy', 'index') |
| 362 | + |
| 363 | + def __str__(self): |
| 364 | + return f'{self.routing_policy}: Rule {self.index}' |
| 365 | + |
| 366 | + def get_absolute_url(self): |
| 367 | + return reverse('plugins:netbox_bgp:routingpolicyrule', args=[self.pk]) |
| 368 | + |
| 369 | + def get_action_color(self): |
| 370 | + return ActionChoices.colors.get(self.action) |
| 371 | + |
| 372 | + def get_ip_conditions(self): |
| 373 | + queryset = Prefix.objects.none() |
| 374 | + if self.match_ip_cond and self.match_ip_cond != {}: |
| 375 | + try: |
| 376 | + queryset = Prefix.objects.filter(**self.match_ip_cond) |
| 377 | + except FieldError: |
| 378 | + pass |
| 379 | + return queryset |
| 380 | + |
| 381 | + def get_match_custom(self): |
| 382 | + # some kind of ckeck? |
| 383 | + result = {} |
| 384 | + if self.match_custom: |
| 385 | + result = self.match_custom |
| 386 | + return result |
| 387 | + |
| 388 | + @property |
| 389 | + def match_statements(self): |
| 390 | + result = {} |
| 391 | + # add communities |
| 392 | + result.update( |
| 393 | + {'community': list(self.match_community.all().values_list('value', flat=True))} |
| 394 | + ) |
| 395 | + result.update( |
| 396 | + {'ip address': [str(prefix) for prefix in self.match_ip.all().values_list('prefix', flat=True)]} |
| 397 | + ) |
| 398 | + matched_ip = self.get_ip_conditions() |
| 399 | + result['ip address'].extend([str(prefix) for prefix in matched_ip.values_list('prefix', flat=True)]) |
| 400 | + custom_match = self.get_match_custom() |
| 401 | + # update community from custom |
| 402 | + result['community'].extend(custom_match.get('community', [])) |
| 403 | + result['ip address'].extend(custom_match.get('ip address', [])) |
| 404 | + # remove empty matches |
| 405 | + result = {k: v for k, v in result.items() if v} |
| 406 | + return result |
| 407 | + |
| 408 | + @property |
| 409 | + def set_statements(self): |
| 410 | + if self.set_actions: |
| 411 | + return self.set_actions |
| 412 | + return {} |
0 commit comments