Skip to content

Commit

Permalink
Merge pull request #527 from snetsystems/dev-2.0.0-jinhyeong
Browse files Browse the repository at this point in the history
[CloudHub AI] Refactor Device Deletion Logic &  Fix bug in ML/DL Setting modal to reset isUpdateAfterCreate state
  • Loading branch information
snetsystems authored Aug 23, 2024
2 parents 2181e01 + c762319 commit a890269
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 81 deletions.
160 changes: 79 additions & 81 deletions backend/server/network_devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ type deviceMapByOrg struct {
// State type definition
type State string

// FailedDevice represents a structure for a failed device
type FailedDevice struct {
ID string
Err error
}

// Define constants for the learn states
const (
Ready string = "Ready"
Expand Down Expand Up @@ -388,11 +394,12 @@ func (s *Service) RemoveDevices(w http.ResponseWriter, r *http.Request) {
devicesGroupByOrg := make(map[string][]string)
deviceOrgMap := make(map[string]string)
restartCollectorServers := map[string]string{}
var recordedDevices sync.Map

for _, deviceID := range request.DevicesIDs {
device, err := s.Store.NetworkDevice(ctx).Get(ctx, cloudhub.NetworkDeviceQuery{ID: &deviceID})
if err != nil {
addFailedDevice(failedDevices, &sync.Mutex{}, deviceID, err)
addFailedDevice(failedDevices, deviceID, err)
}
devicesGroupByOrg[device.Organization] = append(devicesGroupByOrg[device.Organization], device.ID)
deviceOrgMap[deviceID] = device.Organization
Expand All @@ -401,17 +408,13 @@ func (s *Service) RemoveDevices(w http.ResponseWriter, r *http.Request) {
activeCollectorKeys := make(map[string]bool)

for orgID, devices := range devicesGroupByOrg {
org, err := s.Store.NetworkDeviceOrg(ctx).Get(ctx, cloudhub.NetworkDeviceOrgQuery{ID: &orgID})
if err != nil {
Error(w, http.StatusUnprocessableEntity, err.Error(), s.Logger)
return
}
if len(org.CollectedDevicesIDs) > 0 {
org, _ := s.Store.NetworkDeviceOrg(ctx).Get(ctx, cloudhub.NetworkDeviceOrgQuery{ID: &orgID})
if org != nil && len(org.CollectedDevicesIDs) > 0 {
var activeCollectorsErr error
_, activeCollectorKeys, activeCollectorsErr = s.getCollectorServers()
if activeCollectorsErr != nil {
for _, id := range devices {
addFailedDevice(failedDevices, &sync.Mutex{}, id, fmt.Errorf("Failed to access active collector-server"))
addFailedDevice(failedDevices, id, fmt.Errorf("Failed to access active collector-server"))
}

response := make(map[string]interface{})
Expand All @@ -423,55 +426,42 @@ func (s *Service) RemoveDevices(w http.ResponseWriter, r *http.Request) {
}

orgsToUpdate, err := removeDeviceIDsFromPreviousOrg(ctx, s, deviceOrgMap)

for orgID, devicesIDs := range devicesGroupByOrg {
org, err := s.Store.NetworkDeviceOrg(ctx).Get(ctx, cloudhub.NetworkDeviceOrgQuery{ID: &orgID})
if err != nil {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, &sync.Mutex{}, id, err)
}
continue
}
org, _ := s.Store.NetworkDeviceOrg(ctx).Get(ctx, cloudhub.NetworkDeviceOrgQuery{ID: &orgID})
if org != nil {
isActive := activeCollectorKeys[org.CollectorServer]
if !isActive && len(org.CollectedDevicesIDs) > 0 {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, id, fmt.Errorf("collector server not active"))
}
continue

isActive := activeCollectorKeys[org.CollectorServer]
if !isActive {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, &sync.Mutex{}, id, fmt.Errorf("collector server not active"))
}
continue
previousLearnedDevicesIDs := org.LearnedDevicesIDs
previousCollectedDevicesIDs := org.CollectedDevicesIDs
org.LearnedDevicesIDs = RemoveElements(previousLearnedDevicesIDs, devicesIDs)
org.CollectedDevicesIDs = RemoveElements(previousCollectedDevicesIDs, devicesIDs)
orgsToUpdate[org.ID] = *org
}

previousLearnedDevicesIDs := org.LearnedDevicesIDs
previousCollectedDevicesIDs := org.CollectedDevicesIDs
org.LearnedDevicesIDs = RemoveElements(previousLearnedDevicesIDs, devicesIDs)
org.CollectedDevicesIDs = RemoveElements(previousCollectedDevicesIDs, devicesIDs)
orgsToUpdate[org.ID] = *org

}

for _, org := range orgsToUpdate {
currentOrg, err := s.Store.NetworkDeviceOrg(ctx).Get(ctx, cloudhub.NetworkDeviceOrgQuery{ID: &org.ID})
if err != nil {
for _, devicesIDs := range devicesGroupByOrg {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, &sync.Mutex{}, id, err)
}
}
continue
}

if !reflect.DeepEqual(org.CollectedDevicesIDs, currentOrg.CollectedDevicesIDs) {
currentOrg, _ := s.Store.NetworkDeviceOrg(ctx).Get(ctx, cloudhub.NetworkDeviceOrgQuery{ID: &org.ID})
if currentOrg != nil && !reflect.DeepEqual(org.CollectedDevicesIDs, currentOrg.CollectedDevicesIDs) {
statusCode, resp, err := s.manageLogstashConfig(ctx, &org)
if err != nil {
for _, devicesIDs := range devicesGroupByOrg {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, &sync.Mutex{}, id, err)
addFailedDevice(failedDevices, id, err)
}
}
continue
} else if statusCode < http.StatusOK || statusCode >= http.StatusMultipleChoices {
for _, devicesIDs := range devicesGroupByOrg {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, &sync.Mutex{}, id, fmt.Errorf(string(resp)))
addFailedDevice(failedDevices, id, fmt.Errorf(string(resp)))
}
}
continue
Expand All @@ -482,31 +472,37 @@ func (s *Service) RemoveDevices(w http.ResponseWriter, r *http.Request) {
if err != nil {
for _, devicesIDs := range devicesGroupByOrg {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, &sync.Mutex{}, id, err)
addFailedDevice(failedDevices, id, err)
}
}
continue
}
}
}

err = s.Store.NetworkDeviceOrg(ctx).Update(ctx, &org)
msg := fmt.Sprintf(MsgNetWorkDeviceOrgModified.String(), org.ID)
s.logRegistration(ctx, "NetWorkDeviceOrg", msg)
if err != nil {
for _, devicesIDs := range devicesGroupByOrg {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, &sync.Mutex{}, id, err)
if currentOrg != nil {
err = s.Store.NetworkDeviceOrg(ctx).Update(ctx, &org)
msg := fmt.Sprintf(MsgNetWorkDeviceOrgModified.String(), org.ID)
s.logRegistration(ctx, "NetWorkDeviceOrg", msg)
if err != nil {
for _, devicesIDs := range devicesGroupByOrg {
for _, id := range devicesIDs {
addFailedDevice(failedDevices, id, err)
}
}
continue
}
continue
}
}

failedDevicesChan := make(chan FailedDevice, len(failedDevices))

for id, err := range failedDevices {
recordedDevices.Store(id, true)
failedDevicesChan <- FailedDevice{ID: id, Err: fmt.Errorf(err)}
}

sem := make(chan struct{}, cloudhub.WorkerLimit)
var wg sync.WaitGroup
var mu sync.Mutex

for i, id := range request.DevicesIDs {
wg.Add(1)
sem <- struct{}{}
Expand All @@ -516,46 +512,44 @@ func (s *Service) RemoveDevices(w http.ResponseWriter, r *http.Request) {
<-sem
}()

mu.Lock()
if _, exists := failedDevices[id]; exists {
mu.Unlock()
if _, exists := recordedDevices.Load(id); exists {
return
}
mu.Unlock()

device, err := s.Store.NetworkDevice(ctx).Get(ctx, cloudhub.NetworkDeviceQuery{ID: &id})
if err != nil {
addFailedDevice(failedDevices, &mu, id, err)
failedDevicesChan <- FailedDevice{ID: id, Err: err}
return
}
if err := s.OrganizationExists(ctx, device.Organization); err != nil {
addFailedDevice(failedDevices, &mu, id, err)
failedDevicesChan <- FailedDevice{ID: id, Err: err}
return
}
serverCtx := serverContext(ctx)
MLRst, _ := s.Store.MLNxRst(serverCtx).Get(serverCtx, cloudhub.MLNxRstQuery{ID: &device.DeviceIP})
if MLRst != nil {
err = s.Store.MLNxRst(serverCtx).Delete(serverCtx, MLRst)
if err != nil {
addFailedDevice(failedDevices, &mu, id, err)
failedDevicesChan <- FailedDevice{ID: id, Err: err}
return
}
}
DLRst, _ := s.Store.DLNxRst(serverCtx).Get(serverCtx, cloudhub.DLNxRstQuery{ID: &device.DeviceIP})
if DLRst != nil {
err = s.Store.DLNxRst(serverCtx).Delete(serverCtx, DLRst)
if err != nil {
addFailedDevice(failedDevices, &mu, id, err)
failedDevicesChan <- FailedDevice{ID: id, Err: err}
return
}
}
err = s.Store.DLNxRstStg(serverCtx).Delete(serverCtx, cloudhub.DLNxRstStgQuery{ID: &device.DeviceIP})
if err != nil {
addFailedDevice(failedDevices, &mu, id, err)
failedDevicesChan <- FailedDevice{ID: id, Err: err}
return
}
err = s.Store.NetworkDevice(ctx).Delete(ctx, device)
if err != nil {
addFailedDevice(failedDevices, &mu, id, err)
failedDevicesChan <- FailedDevice{ID: id, Err: err}
return
}
msg := fmt.Sprintf(MsgNetWorkDeviceDeleted.String(), id)
Expand All @@ -564,6 +558,12 @@ func (s *Service) RemoveDevices(w http.ResponseWriter, r *http.Request) {
}

wg.Wait()
close(failedDevicesChan)

for failedDevice := range failedDevicesChan {
addFailedDevice(failedDevices, failedDevice.ID, failedDevice.Err)
}

response := make(map[string]interface{})
if len(failedDevices) > 0 {
response["failed_devices"] = convertFailedDevicesToArray(failedDevices)
Expand Down Expand Up @@ -1032,9 +1032,7 @@ func convertFailedDevicesToArray(failedDevices map[string]string) []deviceError
return result
}

func addFailedDevice(failedDevices map[string]string, mu *sync.Mutex, id string, err error) {
mu.Lock()
defer mu.Unlock()
func addFailedDevice(failedDevices map[string]string, id string, err error) {
if _, exists := failedDevices[id]; !exists {
failedDevices[id] = err.Error()
}
Expand All @@ -1044,27 +1042,27 @@ func addFailedDevice(failedDevices map[string]string, mu *sync.Mutex, id string,
func removeDeviceIDsFromPreviousOrg(ctx context.Context, s *Service, deviceOrgMap map[string]string) (map[string]cloudhub.NetworkDeviceOrg, error) {
orgsToUpdate := make(map[string]cloudhub.NetworkDeviceOrg)
allOrgs, err := s.Store.NetworkDeviceOrg(ctx).All(ctx)
if err != nil {
return nil, err
}
for _, orgInfo := range allOrgs {
updated := false
for _, deviceID := range orgInfo.LearnedDevicesIDs {
if org, exists := deviceOrgMap[deviceID]; exists && org != orgInfo.ID {
orgInfo.LearnedDevicesIDs = removeDeviceID(orgInfo.LearnedDevicesIDs, deviceID)
updated = true
if err == nil {
for _, orgInfo := range allOrgs {
updated := false
for _, deviceID := range orgInfo.LearnedDevicesIDs {
if org, exists := deviceOrgMap[deviceID]; exists && org != orgInfo.ID {
orgInfo.LearnedDevicesIDs = removeDeviceID(orgInfo.LearnedDevicesIDs, deviceID)
updated = true
}
}
}
for _, deviceID := range orgInfo.CollectedDevicesIDs {
if org, exists := deviceOrgMap[deviceID]; exists && org != orgInfo.ID {
orgInfo.CollectedDevicesIDs = removeDeviceID(orgInfo.CollectedDevicesIDs, deviceID)
updated = true
for _, deviceID := range orgInfo.CollectedDevicesIDs {
if org, exists := deviceOrgMap[deviceID]; exists && org != orgInfo.ID {
orgInfo.CollectedDevicesIDs = removeDeviceID(orgInfo.CollectedDevicesIDs, deviceID)
updated = true
}
}
if updated {
orgsToUpdate[orgInfo.ID] = orgInfo
}
}
if updated {
orgsToUpdate[orgInfo.ID] = orgInfo
}
}

return orgsToUpdate, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ function LearningSettingModal({

useEffect(() => {
if (isVisible) {
setIsUpdateAfterCreate(false)

const organizationID = getOrganizationIdByName(
organizations,
selectedSource?.telegraf
Expand Down Expand Up @@ -230,6 +232,7 @@ function LearningSettingModal({
value: DropdownItem | Source
) => {
if (key === 'organization') {
setIsUpdateAfterCreate(false)
setCurrentTask(DEFAULT_TASK)
setCronSchedule(DEFAULT_CRON_SCHEDULE)
setProcessCount(DEFAULT_PROCESS_COUNT)
Expand Down

0 comments on commit a890269

Please sign in to comment.