-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathWorkflowCollection.py
102 lines (88 loc) · 4.4 KB
/
WorkflowCollection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import io
import json
import os
import glob
from qgis.gui import QgsMessageBar
from processing.core.ProcessingLog import ProcessingLog
from processing.core.ProcessingConfig import ProcessingConfig, Setting
from processing_workflow.WorkflowProviderBase import WorkflowProviderBase
from processing_workflow.WrongWorkflowException import WrongWorkflowException
class WorkflowCollection(WorkflowProviderBase):
    """Workflow provider backed by a collection directory.

    A collection is described by a JSON description file. The directory that
    contains this file (``baseDir``) is also where the collection's
    ``*.workflow`` files, icon and CSS file are looked up.
    """

    def __init__(self, iface, descriptionFile, workflowProvider, activate=False):
        """Parse the description file and, when iface is given, register settings.

        :param iface: QGIS interface object. May be falsy when the instance is
            only used to parse a description file; the base class is then not
            initialised.
        :param descriptionFile: path to the JSON description file.
        :param workflowProvider: provider whose ``longName()`` is used as the
            settings group for this collection's settings.
        :param activate: passed to ``WorkflowProviderBase.__init__``.
        :raises WrongWorkflowException: if the description file is not valid
            JSON (propagated from ``processDescriptionFile``).
        """
        self.iface = iface
        self.workflowProvider = workflowProvider
        # If we just want to parse the description file (e.g. when creating
        # new collection) then iface is not provided and we don't want to do
        # proper initialization
        if iface:
            WorkflowProviderBase.__init__(self, activate)
        else:
            self.css = ""

        # Read properties from configuration file
        self.descriptionFile = descriptionFile
        self.baseDir = os.path.dirname(descriptionFile)
        self.configured = self.processDescriptionFile()

        # The activate collection setting is in the Workflow provider settings group
        if self.configured and iface:
            name = self.getActivateSetting()
            # NOTE(review): self.activate is presumably set by
            # WorkflowProviderBase.__init__ - confirm against the base class.
            activateSetting = Setting(self.workflowProvider.longName(), name,
                                      self.tr('Activate %s collection') % self.name(),
                                      self.activate)
            ProcessingConfig.addSetting(activateSetting)
            ProcessingConfig.addSetting(Setting(self.workflowProvider.longName(),
                                                self.getTaskbarButtonSetting(),
                                                "Show "+self.name()+" collection icon on taskbar",
                                                True))

    def unload(self):
        """Unload the provider, remove its settings and its toolbar icon."""
        WorkflowProviderBase.unload(self)
        ProcessingConfig.removeSetting(self.getActivateSetting())
        ProcessingConfig.removeSetting(self.getTaskbarButtonSetting())
        self.iface.removeToolBarIcon(self.action)

    def isActive(self):
        """Return the stored activate setting, or False when it has no value."""
        active = ProcessingConfig.getSetting(self.getActivateSetting())
        if active is None:
            active = False
        return active

    def setActive(self, active):
        """Store *active* as the value of this collection's activate setting."""
        ProcessingConfig.setSettingValue(self.getActivateSetting(), active)

    def _reportLoadProblem(self, msg):
        """Show *msg* as a message bar warning (if iface is set) and log it as an error."""
        if self.iface:
            self.iface.messageBar().pushMessage(self.tr("Warning"), msg,
                                                QgsMessageBar.WARNING, 3)
        ProcessingLog.addToLog(ProcessingLog.LOG_ERROR, msg)

    # Read the JSON description file
    def processDescriptionFile(self):
        """Load collection properties from the JSON description file.

        Sets ``description``, ``_name``, ``iconPath``, ``aboutHTML`` and
        ``css`` from the "description", "name", "icon", "aboutHTML" and "css"
        fields. Icon and CSS paths are resolved relative to ``baseDir``.

        :returns: True if all fields were read, False if a field is missing
            (attributes assigned before the missing field keep their values).
        :raises WrongWorkflowException: if the file content is not valid JSON.
        """
        with io.open(self.descriptionFile, "r", encoding="utf-8-sig") as f:
            try:
                settings = json.load(f)
                self.description = settings["description"]
                self._name = settings["name"]
                self.iconPath = os.path.join(self.baseDir, settings["icon"])
                self.aboutHTML = "".join(settings["aboutHTML"])
                self.css = os.path.join(self.baseDir, settings["css"])
            except ValueError:
                # Translate the template first and format afterwards. The
                # previous code applied % to only the last string fragment
                # (which has no placeholder) and raised TypeError here.
                msg = self.tr("Workflow collection %s could not be loaded due to invalid JSON "
                              "collection.conf file") % self.baseDir
                self._reportLoadProblem(msg)
                raise WrongWorkflowException
            except KeyError as e:
                msg = self.tr("Workflow collection %s could not be fully loaded "
                              "due to missing %s field in JSON collection.conf file") % (self.baseDir, e)
                self._reportLoadProblem(msg)
                return False
        return True

    def id(self):
        """Return the base provider id suffixed with the collection name (spaces as underscores)."""
        return WorkflowProviderBase.id(self)+"_"+self.name().replace(' ', '_')

    # Load all the workflows saved in the workflow folder
    def createAlgsList(self):
        """Call ``loadWorkflow`` for every ``*.workflow`` file in ``baseDir``.

        :returns: ``self.preloadedAlgs``, reset to an empty list before loading.
            NOTE(review): presumably ``loadWorkflow`` appends to it - confirm
            against the base class.
        """
        self.preloadedAlgs = []
        for descriptionFile in glob.glob(os.path.join(self.baseDir, "*.workflow")):
            self.loadWorkflow(descriptionFile)
        return self.preloadedAlgs

    def load(self):
        """Add the toolbar icon and load the provider if the collection is configured.

        :returns: the result of ``WorkflowProviderBase.load`` when configured,
            otherwise False.
        """
        if self.configured:
            self._addToolbarIcon()
            return WorkflowProviderBase.load(self)
        else:
            return False