Skip to content

Commit 70236ba

Browse files
askyrieSongZhen0704
authored andcommitted
feat: k8s add configmap
1 parent 4d2ed43 commit 70236ba

11 files changed

Lines changed: 397 additions & 130 deletions

File tree

cli/ctl/common/const.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ var RESOURCE_TYPES = []string{
3030
"NATGateways", "NATRules", "NATVMConnections", "LBs", "LBListeners", "LBTargetServers",
3131
"LBVMConnections", "PeerConnections", "CENs", "RedisInstances", "RDSInstances", "VInterfaces",
3232
"IPs", "FloatingIPs", "PodClusters", "PodNodes", "VMPodNodeConnections", "PodNamespaces",
33-
"PodGroups", "PodReplicaSets", "Pods", "PodServices", "PodServicePorts", "PodGroupPorts",
34-
"PodIngresses", "PodIngressRules", "PodIngressRuleBackends", "Processes", "PrometheusTargets", "VIPs",
33+
"PodGroups", "PodGroupConfigMapConnections", "PodReplicaSets", "ConfigMaps", "Pods", "PodServices",
34+
"PodServicePorts", "PodGroupPorts", "PodIngresses", "PodIngressRules", "PodIngressRuleBackends",
35+
"Processes", "PrometheusTargets", "VIPs",
3536
}
3637

3738
//go:generate stringer -type=DomainType -trimprefix=DOMAIN_TYPE_ -linecomment

server/controller/cloud/common/utils.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package common
1818

1919
import (
2020
"bufio"
21+
"crypto/md5"
2122
"encoding/binary"
2223
"encoding/csv"
24+
"encoding/hex"
2325
"errors"
2426
"fmt"
2527
"io"
@@ -751,3 +753,11 @@ func GetVTapSubDomainMappingByDomain(domain string, db *gorm.DB) (map[int]string
751753

752754
return vtapIDToSubDomain, nil
753755
}
756+
757+
func GenerateMD5Sum(data string) string {
758+
if data == "" {
759+
return ""
760+
}
761+
hash := md5.Sum([]byte(data))
762+
return hex.EncodeToString(hash[:])
763+
}

server/controller/cloud/kubernetes.go

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -120,30 +120,32 @@ func (c *Cloud) getKubernetesData() model.Resource {
120120
}
121121

122122
return model.Resource{
123-
Verified: true,
124-
SyncAt: time.Now(),
125-
AZs: []model.AZ{kubernetesGatherResource.AZ},
126-
VPCs: []model.VPC{kubernetesGatherResource.VPC},
127-
PodClusters: []model.PodCluster{kubernetesGatherResource.PodCluster},
128-
ErrorState: kubernetesGatherResource.ErrorState,
129-
ErrorMessage: kubernetesGatherResource.ErrorMessage,
130-
PodNodes: kubernetesGatherResource.PodNodes,
131-
PodServices: kubernetesGatherResource.PodServices,
132-
PodNamespaces: kubernetesGatherResource.PodNamespaces,
133-
Pods: kubernetesGatherResource.Pods,
134-
PodGroups: kubernetesGatherResource.PodGroups,
135-
PodIngresses: kubernetesGatherResource.PodIngresses,
136-
PodGroupPorts: kubernetesGatherResource.PodGroupPorts,
137-
PodReplicaSets: kubernetesGatherResource.PodReplicaSets,
138-
PodServicePorts: kubernetesGatherResource.PodServicePorts,
139-
PodIngressRules: kubernetesGatherResource.PodIngressRules,
140-
PodIngressRuleBackends: kubernetesGatherResource.PodIngressRuleBackends,
141-
IPs: ips,
142-
VMs: vms,
143-
Regions: regions,
144-
Subnets: subnets,
145-
Networks: networks,
146-
VInterfaces: vinterfaces,
147-
VMPodNodeConnections: vmPodNodeConnections,
123+
Verified: true,
124+
SyncAt: time.Now(),
125+
AZs: []model.AZ{kubernetesGatherResource.AZ},
126+
VPCs: []model.VPC{kubernetesGatherResource.VPC},
127+
PodClusters: []model.PodCluster{kubernetesGatherResource.PodCluster},
128+
ErrorState: kubernetesGatherResource.ErrorState,
129+
ErrorMessage: kubernetesGatherResource.ErrorMessage,
130+
PodNodes: kubernetesGatherResource.PodNodes,
131+
PodServices: kubernetesGatherResource.PodServices,
132+
PodNamespaces: kubernetesGatherResource.PodNamespaces,
133+
Pods: kubernetesGatherResource.Pods,
134+
PodGroups: kubernetesGatherResource.PodGroups,
135+
ConfigMaps: kubernetesGatherResource.ConfigMaps,
136+
PodIngresses: kubernetesGatherResource.PodIngresses,
137+
PodGroupPorts: kubernetesGatherResource.PodGroupPorts,
138+
PodReplicaSets: kubernetesGatherResource.PodReplicaSets,
139+
PodServicePorts: kubernetesGatherResource.PodServicePorts,
140+
PodIngressRules: kubernetesGatherResource.PodIngressRules,
141+
PodIngressRuleBackends: kubernetesGatherResource.PodIngressRuleBackends,
142+
PodGroupConfigMapConnections: kubernetesGatherResource.PodGroupConfigMapConnections,
143+
IPs: ips,
144+
VMs: vms,
145+
Regions: regions,
146+
Subnets: subnets,
147+
Networks: networks,
148+
VInterfaces: vinterfaces,
149+
VMPodNodeConnections: vmPodNodeConnections,
148150
}
149151
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (c) 2024 Yunshan Networks
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kubernetes_gather
18+
19+
import (
20+
"time"
21+
22+
"github.com/bitly/go-simplejson"
23+
cloudcommon "github.com/deepflowio/deepflow/server/controller/cloud/common"
24+
"github.com/deepflowio/deepflow/server/controller/cloud/model"
25+
"github.com/deepflowio/deepflow/server/controller/common"
26+
"github.com/deepflowio/deepflow/server/libs/logger"
27+
)
28+
29+
func (k *KubernetesGather) getConfigMaps() (configMaps []model.ConfigMap, err error) {
30+
log.Debug("get configmaps starting", logger.NewORGPrefix(k.orgID))
31+
for _, c := range k.k8sInfo["*v1.ConfigMap"] {
32+
cData, cErr := simplejson.NewJson([]byte(c))
33+
if cErr != nil {
34+
err = cErr
35+
log.Errorf("configmap initialization simplejson error: (%s)", cErr.Error(), logger.NewORGPrefix(k.orgID))
36+
return
37+
}
38+
metaData, ok := cData.CheckGet("metadata")
39+
if !ok {
40+
log.Info("configmap metadata not found", logger.NewORGPrefix(k.orgID))
41+
continue
42+
}
43+
uID := metaData.Get("uid").MustString()
44+
if uID == "" {
45+
log.Info("configmap uid not found", logger.NewORGPrefix(k.orgID))
46+
continue
47+
}
48+
name := metaData.Get("name").MustString()
49+
if name == "" {
50+
log.Infof("configmap (%s) name not found", uID, logger.NewORGPrefix(k.orgID))
51+
continue
52+
}
53+
uLcuuid := common.IDGenerateUUID(k.orgID, uID)
54+
namespace := metaData.Get("namespace").MustString()
55+
namespaceLcuuid, ok := k.namespaceToLcuuid[namespace]
56+
if !ok {
57+
log.Infof("configmap (%s) namespace not found", name, logger.NewORGPrefix(k.orgID))
58+
continue
59+
}
60+
var created time.Time
61+
cTime := metaData.Get("creationTimestamp").MustString()
62+
if cTime != "" {
63+
localTime, err := time.Parse(time.RFC3339, cTime)
64+
if err == nil {
65+
created = localTime.Local()
66+
}
67+
}
68+
configMaps = append(configMaps, model.ConfigMap{
69+
Data: c,
70+
DataHash: cloudcommon.GenerateMD5Sum(k.simpleJsonMarshal(cData)),
71+
Lcuuid: uLcuuid,
72+
Name: name,
73+
PodNamespaceLcuuid: namespaceLcuuid,
74+
CreatedAt: created,
75+
VPCLcuuid: k.VPCUUID,
76+
AZLcuuid: k.azLcuuid,
77+
RegionLcuuid: k.RegionUUID,
78+
PodClusterLcuuid: k.podClusterLcuuid,
79+
})
80+
k.configMapToLcuuid[[2]string{namespace, name}] = uLcuuid
81+
}
82+
log.Debug("get configmaps complete", logger.NewORGPrefix(k.orgID))
83+
return
84+
}

server/controller/cloud/kubernetes_gather/kubernetes_gather.go

Lines changed: 113 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
cloudcommon "github.com/deepflowio/deepflow/server/controller/cloud/common"
2929
"github.com/deepflowio/deepflow/server/controller/cloud/config"
3030
"github.com/deepflowio/deepflow/server/controller/cloud/kubernetes_gather/model"
31+
cloudmodel "github.com/deepflowio/deepflow/server/controller/cloud/model"
3132
"github.com/deepflowio/deepflow/server/controller/common"
3233
"github.com/deepflowio/deepflow/server/controller/db/metadb"
3334
metadbmodel "github.com/deepflowio/deepflow/server/controller/db/metadb/model"
@@ -68,6 +69,7 @@ type KubernetesGather struct {
6869
serviceLcuuidToIngressLcuuid map[string]string
6970
k8sInfo map[string][]string
7071
pgLcuuidToPSLcuuids map[string][]string
72+
configMapToLcuuid map[[2]string]string
7173
podLcuuidToPGInfo map[string][2]string
7274
nsLabelToGroupLcuuids map[string]mapset.Set
7375
pgLcuuidTopodTargetPorts map[string]map[string]int
@@ -207,6 +209,7 @@ func NewKubernetesGather(db *metadb.DB, domain *metadbmodel.Domain, subDomain *m
207209
serviceLcuuidToIngressLcuuid: map[string]string{},
208210
k8sInfo: map[string][]string{},
209211
pgLcuuidToPSLcuuids: map[string][]string{},
212+
configMapToLcuuid: map[[2]string]string{},
210213
podLcuuidToPGInfo: map[string][2]string{},
211214
nsLabelToGroupLcuuids: map[string]mapset.Set{},
212215
pgLcuuidTopodTargetPorts: map[string]map[string]int{},
@@ -249,6 +252,76 @@ func (k *KubernetesGather) GetLabel(labelMap map[string]interface{}) string {
249252
return strings.Join(labelSlice, ", ")
250253
}
251254

255+
func (k *KubernetesGather) simpleJsonMarshal(json *simplejson.Json) string {
256+
bytes, err := json.MarshalJSON()
257+
if err != nil {
258+
log.Infof("simplejson (%s) marshal failed: %s", json, err.Error(), logger.NewORGPrefix(k.orgID))
259+
return ""
260+
}
261+
return string(bytes)
262+
}
263+
264+
func (k *KubernetesGather) pgSpecGenerateConnections(nsName, pgName, pgLcuuid string, mainSpec *simplejson.Json) []cloudmodel.PodGroupConfigMapConnection {
265+
var connections []cloudmodel.PodGroupConfigMapConnection
266+
267+
existSet := map[string]bool{}
268+
spec := mainSpec.GetPath("template", "spec")
269+
containers := spec.Get("containers")
270+
for c := range containers.MustArray() {
271+
envs := containers.GetIndex(c).Get("env")
272+
for e := range envs.MustArray() {
273+
env := envs.GetIndex(e)
274+
ref, ok := env.Get("valueFrom").CheckGet("configMapKeyRef")
275+
if !ok {
276+
continue
277+
}
278+
cmName := ref.Get("Name").MustString()
279+
cmLcuuid, ok := k.configMapToLcuuid[[2]string{nsName, cmName}]
280+
if !ok {
281+
log.Infof("pod group (%s) imported env config map (%s) not found", pgName, cmName, logger.NewORGPrefix(k.orgID))
282+
continue
283+
}
284+
if _, ok := existSet[pgLcuuid+cmLcuuid]; ok {
285+
log.Debugf("env pod group (%s) and config map (%s) connections already exists", pgName, cmName, logger.NewORGPrefix(k.orgID))
286+
continue
287+
}
288+
connections = append(connections, cloudmodel.PodGroupConfigMapConnection{
289+
Lcuuid: common.GetUUIDByOrgID(k.orgID, pgLcuuid+cmLcuuid),
290+
PodGroupLcuuid: pgLcuuid,
291+
ConfigMapLcuuid: cmLcuuid,
292+
})
293+
existSet[pgLcuuid+cmLcuuid] = false
294+
}
295+
}
296+
297+
volumes := spec.Get("volumes")
298+
for v := range volumes.MustArray() {
299+
volume := volumes.GetIndex(v)
300+
cm, ok := volume.CheckGet("configMap")
301+
if !ok {
302+
continue
303+
}
304+
cmName := cm.Get("name").MustString()
305+
cmLcuuid, ok := k.configMapToLcuuid[[2]string{nsName, cmName}]
306+
if !ok {
307+
log.Infof("pod group (%s) imported volumes config map (%s) not found", pgName, cmName, logger.NewORGPrefix(k.orgID))
308+
continue
309+
}
310+
if _, ok := existSet[pgLcuuid+cmLcuuid]; ok {
311+
log.Debugf("volumes pod group (%s) and config map (%s) connections already exists", pgName, cmName, logger.NewORGPrefix(k.orgID))
312+
continue
313+
}
314+
connections = append(connections, cloudmodel.PodGroupConfigMapConnection{
315+
Lcuuid: common.GetUUIDByOrgID(k.orgID, pgLcuuid+cmLcuuid),
316+
PodGroupLcuuid: pgLcuuid,
317+
ConfigMapLcuuid: cmLcuuid,
318+
})
319+
existSet[pgLcuuid+cmLcuuid] = false
320+
}
321+
322+
return connections
323+
}
324+
252325
func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherResource, error) {
253326
// 任务循环的是同一个实例,所以这里要对关联关系进行初始化
254327
k.azLcuuid = ""
@@ -263,6 +336,7 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso
263336
k.serviceLcuuidToIngressLcuuid = map[string]string{}
264337
k.nsLabelToGroupLcuuids = map[string]mapset.Set{}
265338
k.pgLcuuidToPSLcuuids = map[string][]string{}
339+
k.configMapToLcuuid = map[[2]string]string{}
266340
k.podLcuuidToPGInfo = map[string][2]string{}
267341
k.pgLcuuidTopodTargetPorts = map[string]map[string]int{}
268342
k.namespaceToExLabels = map[string]map[string]interface{}{}
@@ -309,24 +383,31 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso
309383
return model.KubernetesGatherResource{}, err
310384
}
311385

312-
podGroups, err := k.getPodGroups()
386+
configMaps, err := k.getConfigMaps()
387+
if err != nil {
388+
return model.KubernetesGatherResource{}, err
389+
}
390+
391+
podGroups, podGroupConfigMapConnections, err := k.getPodGroups()
313392
if err != nil {
314393
return model.KubernetesGatherResource{}, err
315394
}
316395

317-
podRCs, err := k.getPodReplicationControllers()
396+
podRCs, podRCsConfigMapConnections, err := k.getPodReplicationControllers()
318397
if err != nil {
319398
return model.KubernetesGatherResource{}, err
320399
}
321400

322401
podGroups = append(podGroups, podRCs...)
402+
podGroupConfigMapConnections = append(podGroupConfigMapConnections, podRCsConfigMapConnections...)
323403

324-
replicaSets, podRSCs, err := k.getReplicaSetsAndReplicaSetControllers()
404+
replicaSets, podRSCs, podRSCsConfigMapConnections, err := k.getReplicaSetsAndReplicaSetControllers()
325405
if err != nil {
326406
return model.KubernetesGatherResource{}, err
327407
}
328408

329409
podGroups = append(podGroups, podRSCs...)
410+
podGroupConfigMapConnections = append(podGroupConfigMapConnections, podRSCsConfigMapConnections...)
330411

331412
podServices, servicePorts, podGroupPorts, serviceNetworks, serviceSubnets, serviceVinterfaces, serviceIPs, err := k.getPodServices()
332413
if err != nil {
@@ -354,33 +435,35 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso
354435
}
355436

356437
resource := model.KubernetesGatherResource{
357-
Region: region,
358-
AZ: az,
359-
VPC: vpc,
360-
PodNodes: podNodes,
361-
PodCluster: podCluster,
362-
PodServices: podServices,
363-
PodNamespaces: podNamespaces,
364-
PodNetwork: podNetwork,
365-
PodSubnets: podSubnets,
366-
PodVInterfaces: podVInterfaces,
367-
PodIPs: podIPs,
368-
PodNodeNetwork: nodeNetwork,
369-
PodNodeSubnets: nodeSubnets,
370-
PodNodeVInterfaces: nodeVInterfaces,
371-
PodNodeIPs: nodeIPs,
372-
PodServiceNetwork: serviceNetworks,
373-
PodServiceSubnets: serviceSubnets,
374-
PodServiceVInterfaces: serviceVinterfaces,
375-
PodServiceIPs: serviceIPs,
376-
PodServicePorts: servicePorts,
377-
PodGroupPorts: podGroupPorts,
378-
PodIngresses: ingresses,
379-
PodIngressRules: ingressRules,
380-
PodIngressRuleBackends: ingressRuleBackends,
381-
PodReplicaSets: replicaSets,
382-
PodGroups: podGroups,
383-
Pods: pods,
438+
Region: region,
439+
AZ: az,
440+
VPC: vpc,
441+
PodNodes: podNodes,
442+
PodCluster: podCluster,
443+
PodServices: podServices,
444+
PodNamespaces: podNamespaces,
445+
PodNetwork: podNetwork,
446+
PodSubnets: podSubnets,
447+
PodVInterfaces: podVInterfaces,
448+
PodIPs: podIPs,
449+
PodNodeNetwork: nodeNetwork,
450+
PodNodeSubnets: nodeSubnets,
451+
PodNodeVInterfaces: nodeVInterfaces,
452+
PodNodeIPs: nodeIPs,
453+
PodServiceNetwork: serviceNetworks,
454+
PodServiceSubnets: serviceSubnets,
455+
PodServiceVInterfaces: serviceVinterfaces,
456+
PodServiceIPs: serviceIPs,
457+
PodServicePorts: servicePorts,
458+
PodGroupPorts: podGroupPorts,
459+
PodGroupConfigMapConnections: podGroupConfigMapConnections,
460+
PodIngresses: ingresses,
461+
PodIngressRules: ingressRules,
462+
PodIngressRuleBackends: ingressRuleBackends,
463+
PodReplicaSets: replicaSets,
464+
PodGroups: podGroups,
465+
ConfigMaps: configMaps,
466+
Pods: pods,
384467
}
385468

386469
k.cloudStatsd.ResCount = statsd.GetResCount(resource)

0 commit comments

Comments
 (0)