@@ -419,15 +419,21 @@ def evaluateMapState(self, function_input, key, metadata, sapi):
         self._logger.debug("[StateUtils] evaluateMapState, maxConcurrency: " + str(maxConcurrency))
         self._logger.debug("[StateUtils] evaluateMapState metadata: " + str(metadata))

+        self._logger.info("[StateUtils] evaluateMapState, maxConcurrency: " + str(maxConcurrency))
+        self._logger.info("[StateUtils] evaluateMapState metadata: " + str(metadata))
+
         counter_name_topic = self.functionstatename + "-" + self.sandboxid

         total_branch_count = len(function_input) # all branches executed concurrently
-
+        #sapi.put(name_prefix + "_" + "mapInputCount", str(len(function_input)))
+
         klist = [total_branch_count]

         self.parsedfunctionstateinfo["BranchCount"] = int(total_branch_count) # overwrite parsed BranchCount with new value
         self._logger.debug("[StateUtils] evaluateMapState, total_branch_count: " + str(total_branch_count))

+        self._logger.info("[StateUtils] evaluateMapState, total_branch_count: " + str(total_branch_count))
+
         # prepare counter metadata
         counter_metadata = {}
         counter_metadata["__state_action"] = "post_map_processing"
@@ -459,6 +465,7 @@ def evaluateMapState(self, function_input, key, metadata, sapi):
         counter_name_value_metadata["__state_action"] = "post_map_processing"
         counter_name_value_metadata["state_counter"] = metadata["state_counter"]
         self._logger.debug("[StateUtils] evaluateMapState, metadata[state_counter]: " + str(metadata["state_counter"]))
+        self._logger.info("[StateUtils] evaluateMapState, metadata[state_counter]: " + str(metadata["state_counter"]))
         self.mapStateCounter = int(metadata["state_counter"])

         counter_name_value = {"__mfnmetadata": counter_name_value_metadata, "__mfnuserdata": '{}'}
@@ -506,6 +513,8 @@ def evaluateMapState(self, function_input, key, metadata, sapi):
         assert py3utils.is_string(workflow_instance_metadata_storage_key)
         self._logger.debug("[StateUtils] full_metadata_encoded put key: " + str(workflow_instance_metadata_storage_key))

+        self._logger.info("[StateUtils] full_metadata_encoded put key: " + str(workflow_instance_metadata_storage_key))
+
         sapi.put(workflow_instance_metadata_storage_key, json.dumps(metadata))

         # Now provide each branch with its own input
@@ -523,9 +532,14 @@ def evaluateMapState(self, function_input, key, metadata, sapi):
             self._logger.debug("\t Map State StartAt:" + startat)
             self._logger.debug("\t Map State input:" + str(function_input[i]))

+            self._logger.info("\t Map State StartAt:" + startat)
+            self._logger.info("\t Map State input:" + str(function_input[i]))
+
         return function_input, metadata

     def evaluatePostMap(self, function_input, key, metadata, sapi):
+        self._logger.info("\t inside evaluatePostMap: " + str(function_input) + " " + str(metadata) + " " + str(sapi))
+

         name_prefix = self.functiontopic + "_" + key

@@ -542,11 +556,15 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):

         self._logger.debug("\t metadata:" + json.dumps(metadata))

+        self._logger.info("\t metadata:" + json.dumps(metadata))
+
         workflow_instance_metadata_storage_key = str(function_input["WorkflowInstanceMetadataStorageKey"])
         assert py3utils.is_string(workflow_instance_metadata_storage_key)
         full_metadata_encoded = sapi.get(workflow_instance_metadata_storage_key)
         self._logger.debug("[StateUtils] full_metadata_encoded get: " + str(full_metadata_encoded))

+        self._logger.info("[StateUtils] full_metadata_encoded get: " + str(full_metadata_encoded))
+
         full_metadata = json.loads(full_metadata_encoded)
         full_metadata["state_counter"] = state_counter

@@ -557,6 +575,8 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
         branchOutputKeysSet = sapi.retrieveSet(branchOutputKeysSetKey)
         self._logger.debug("\t branchOutputKeysSet: " + str(branchOutputKeysSet))

+        self._logger.info("\t branchOutputKeysSet: " + str(branchOutputKeysSet))
+
         if not branchOutputKeysSet:
             self._logger.error("[StateUtils] branchOutputKeysSet is empty")
             raise Exception("[StateUtils] branchOutputKeysSet is empty")
@@ -576,13 +596,17 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
         NumBranchesFinished = abs(counterValue)
         self._logger.debug("\t NumBranchesFinished:" + str(NumBranchesFinished))

+        self._logger.info("\t NumBranchesFinished:" + str(NumBranchesFinished))
+
         do_cleanup = False

         if klist[-1] == NumBranchesFinished:
             do_cleanup = True

         self._logger.debug("\t do_cleanup:" + str(do_cleanup))

+        self._logger.info("\t do_cleanup:" + str(do_cleanup))
+
         counterName = str(mapInfo["CounterName"])
         counter_metadata_key_name = counterName + "_metadata"
         assert py3utils.is_string(counterName)
@@ -610,6 +634,10 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
610
634
611
635
self ._logger .debug ("\t mapInfo_BranchOutputKeys length: " + str (len (mapInfo ["BranchOutputKeys" ])))
612
636
637
+ self ._logger .info ("\t mapInfo_BranchOutputKeys:" + str (mapInfo ["BranchOutputKeys" ]))
638
+
639
+ self ._logger .info ("\t mapInfo_BranchOutputKeys length: " + str (len (mapInfo ["BranchOutputKeys" ])))
640
+
613
641
for outputkey in mapInfo ["BranchOutputKeys" ]:
614
642
outputkey = str (outputkey )
615
643
if outputkey in branchOutputKeysSet : # mapInfo["BranchOutputKeys"]:
@@ -623,15 +651,23 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
                 self._logger.debug("\t branchOutput:" + branchOutput)
                 self._logger.debug("\t branchOutput_decoded(type):" + str(type(branchOutput_decoded)))
                 self._logger.debug("\t branchOutput_decoded:" + str(branchOutput_decoded))
+                self._logger.info("\t branchOutput(type):" + str(type(branchOutput)))
+                self._logger.info("\t branchOutput:" + branchOutput)
+                self._logger.info("\t branchOutput_decoded(type):" + str(type(branchOutput_decoded)))
+                self._logger.info("\t branchOutput_decoded:" + str(branchOutput_decoded))
                 post_map_output_values = post_map_output_values + [branchOutput_decoded]
                 if do_cleanup:
                     sapi.delete(outputkey) # cleanup the key from data layer
                     self._logger.debug("\t cleaned output key:" + outputkey)
+                    self._logger.info("\t cleaned output key:" + outputkey)
             else:
                 post_map_output_values = post_map_output_values + [None]
                 self._logger.debug("\t this_BranchOutputKeys is not contained: " + str(outputkey))

+        self._logger.info("\t this_BranchOutputKeys is not contained: " + str(outputkey))
+
         self._logger.debug("\t post_map_output_values:" + str(post_map_output_values))
+        self._logger.info("\t post_map_output_values:" + str(post_map_output_values))
         while (sapi.get(name_prefix + "_" + "mapStatePartialResult")) == "":
             time.sleep(0.1) # wait until value is available

@@ -640,15 +676,25 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
         mapStatePartialResult += post_map_output_values
         sapi.put(name_prefix + "_" + "mapStatePartialResult", str(mapStatePartialResult))

+        time.sleep(5.0)
+
         # now apply ResultPath and OutputPath
         if do_cleanup:

             sapi.deleteSet(branchOutputKeysSetKey)

+        while (sapi.get(name_prefix + "_" + "mapInputCount") == ""):
+            time.sleep(0.1) # wait until value is available
+
         if ast.literal_eval(sapi.get(name_prefix + "_" + "mapInputCount")) == len(mapStatePartialResult):
+            #time.sleep(0.5)

             # we are ready to publish but need to honour ResultPath and OutputPath
+            while (sapi.get(name_prefix + "_" + "mapStatePartialResult") == ""):
+                time.sleep(0.1)
             res_raw = ast.literal_eval(sapi.get(name_prefix + "_" + "mapStatePartialResult"))
+            self._logger.info("[StateUtils] evaluatePostMap: res_raw" + str(res_raw) + " vs. " + sapi.get(name_prefix + "_" + "mapInputCount"))
+

             # remove unwanted keys from input before publishing
             function_input = {}
@@ -668,6 +714,9 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
             sapi.delete(name_prefix + "_" + "mapStatePartialResult")
             sapi.delete(name_prefix + "_" + "tobeProcessedlater")
             post_map_output_values = function_input_post_output
+        else:
+            #raise Exception("mapInputCount" + str(sapi.get(name_prefix + "_" + "mapInputCount")) + " does not match mapStatePartialResult: " + str(mapStatePartialResult))
+            print("mapInputCount" + str(sapi.get(name_prefix + "_" + "mapInputCount")) + " does not match mapStatePartialResult: " + str(mapStatePartialResult))
         return post_map_output_values, full_metadata

     def evaluateParallelState(self, function_input, key, metadata, sapi):
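The added loops above busy-wait on the data layer: repeated `sapi.get(...)` calls with `time.sleep(0.1)` between them, plus a fixed `time.sleep(5.0)`. Below is a minimal sketch of that pattern factored into one helper; the helper name `wait_for_key`, the `timeout` parameter, and the `TimeoutError` are illustrative additions rather than part of this patch, and `sapi.get()` is assumed to return `""` while a key is still unset, as the patch itself assumes.

```python
import time

def wait_for_key(sapi, key, interval=0.1, timeout=60.0):
    # Poll the data layer until 'key' holds a value; mirrors the
    # open-coded while/sleep loops in the patch, but with a bound.
    deadline = time.time() + timeout
    value = sapi.get(key)
    while value == "":
        if time.time() >= deadline:
            raise TimeoutError("data layer key not available: " + key)
        time.sleep(interval)  # wait until value is available
        value = sapi.get(key)
    return value
```

A bounded timeout avoids the unbounded spin the open-coded loops would otherwise allow if a producer never writes the key.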
@@ -964,7 +1013,7 @@ def evaluatePostParallel(self, function_input, key, metadata, sapi):

     def evaluateNonTaskState(self, function_input, key, metadata, sapi):
         # 3. Evaluate Non Task states
-        # self._logger.debug("[StateUtils] NonTask state type: " + str(self.functionstatetype))
+        self._logger.info("[StateUtils] NonTask state type: " + str(self.functionstatetype))
         #self._logger.debug("[StateUtils] Welcome to evaluateNonTaskState! Current key:" + str(key))
         function_output = None
         if self.functionstatetype == StateUtils.choiceStateType:
@@ -1090,6 +1139,9 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi):
             self._logger.debug("[StateUtils] Map state maxConcurrency: " + str(maxConcurrency))
             self._logger.debug("[StateUtils] Map state handling")

+            self._logger.info("[StateUtils] Map state maxConcurrency: " + str(maxConcurrency))
+            self._logger.info("[StateUtils] Map state handling metadata: " + str(metadata))
+
             if "__state_action" not in metadata or metadata["__state_action"] != "post_map_processing":
                 # here we start the iteration process on a first batch
                 if maxConcurrency != 0:
@@ -1099,26 +1151,41 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi):
                     tobeProcessednow = function_input
                     tobeProcessedlater = []
                 self._logger.debug("[StateUtils] Map state function_input split:" + str(tobeProcessednow) + " " + str(tobeProcessedlater))
+                self._logger.info("[StateUtils] Map state function_input split:" + str(tobeProcessednow) + " " + str(tobeProcessedlater))
                 sapi.put(name_prefix + "_" + "tobeProcessedlater", str(tobeProcessedlater)) # store elements to be processed on DL
                 sapi.put(name_prefix + "_" + "mapStatePartialResult", "[]") # initialise the collector variable
                 sapi.put(name_prefix + "_" + "mapInputCount", str(len(function_input)))
+                #metadata["__state_action"] = ""

                 function_output, metadata = self.evaluateMapState(tobeProcessednow, key, metadata, sapi)
+                #metadata["__state_action"] = ""
+

             elif metadata["__state_action"] == "post_map_processing":
                 tobeProcessedlater = ast.literal_eval(sapi.get(name_prefix + "_" + "tobeProcessedlater")) # get all elements that have not yet been processed
                 self._logger.debug("[StateUtils] Map state post_map processing input:" + str(tobeProcessedlater))
+                self._logger.info("[StateUtils] Map state post_map processing input:" + str(tobeProcessedlater))
                 # we need to decide at this point if there is a need for more batches. if so:

                 if len(tobeProcessedlater) > 0: # we need to start another batch
+                    self._logger.info("[StateUtils] tobeProcessedlater: " + str(tobeProcessedlater) + ", function_input: " + str(function_input))
                     function_output, metadata2 = self.evaluatePostMap(function_input, key, metadata, sapi) # take care not to overwrite metadata
+                    self._logger.info("[StateUtils] after evaluatePostMap: " + str(function_output))
                     function_output, metadata = self.evaluateMapState(tobeProcessedlater[:maxConcurrency], key, metadata, sapi) # start a new batch
+                    self._logger.info("[StateUtils] after evaluateMapState:" + str(function_output))
+                    self._logger.info("[StateUtils] after evaluateMapState, metadata: " + str(metadata))
                     sapi.put(name_prefix + "_" + "tobeProcessedlater", str(tobeProcessedlater[maxConcurrency:])) # store remaining elements to be processed on DL
+                    self._logger.info("[StateUtils] after sapi.put: " + str(tobeProcessedlater[maxConcurrency:]))
+

                 else: # no more batches required. we are at the iteration end, publish the final result
                     self._logger.debug("[StateUtils] Map state input final stage: " + str(function_input))
+                    self._logger.info("[StateUtils] Map state input final stage: " + str(function_input))
                     function_output, metadata = self.evaluatePostMap(function_input, key, metadata, sapi)

+            elif metadata["__state_action"] == '':
+                raise Exception("Unknown state action in map state")
+
             else:
                 raise Exception("Unknown action type in map state")
