-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnabto.py
153 lines (125 loc) · 3.98 KB
/
nabto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
Helper module for working with Nabto communication client
More info on Nabto can be found here:
https://www.nabto.com/developer
https://downloads.nabto.com/assets/docs/TEN025 Writing a Nabto API client application.pdf
"""
import sys
import os
import json
from ctypes import *
class Client:
"""
Simple wrapper for Nabto client library (currently only limited session/RPC functionality)
"""
def __init__(self, home):
if sys.platform == 'win32':
library = 'nabto_client_api.dll'
else:
library = 'libnabto_client_api.so'
package_dir = os.path.dirname(os.path.abspath(__file__))
self.client = cdll.LoadLibrary(os.path.join(package_dir, 'libs', library))
# NABTO_DECL_PREFIX nabto_status_t NABTOAPI nabtoStartup(const char* nabtoHomeDir);
self.client.nabtoStartup(home.encode())
self.client.nabtoInstallDefaultStaticResources(None)
self.client.nabtoSetOption(b'urlPortalHostName', b'lscontrol')
def __del__(self):
# NABTO_DECL_PREFIX nabto_status_t NABTOAPI nabtoShutdown(void);
self.client.nabtoShutdown()
def GetLocalDevices(self):
"""
Enumerate local Nabto devices
Returns
-------
list
Found devices
"""
devices = pointer(c_char_p())
count = c_int(0)
# NABTO_DECL_PREFIX nabto_status_t NABTOAPI nabtoGetLocalDevices(char*** devices, int* numberOfDevices);
self.client.nabtoGetLocalDevices(pointer(devices), pointer(count))
if (count.value != 0):
return [devices.contents.value]
return []
def CreateProfile(self, user, pwd):
"""
Create profile that can be used to establish a session to device
Parameters
----------
user : str
User name of account associated with device
pwd : str
Password for given account
Returns
-------
int
Nabto status (0=success)
"""
# NABTO_DECL_PREFIX nabto_status_t NABTOAPI nabtoCreateProfile(const char* email, const char* password);
return self.client.nabtoCreateProfile(user.encode(), pwd.encode())
def OpenSession(self, user, pwd):
"""
Open session to device
Parameters
----------
user : str
User name of account associated with device
pwd : str
Password for given account
Returns
-------
Client.Session
Session object
"""
return self.Session(self.client, user, pwd)
class Session:
"""
A class that represents opened session
"""
def __init__(self, client, user, pwd):
self.client = client
session = c_void_p()
# NABTO_DECL_PREFIX nabto_status_t NABTOAPI nabtoOpenSession(nabto_handle_t* session, const char* id, const char* password);
status = self.client.nabtoOpenSession(pointer(session), user.encode(), pwd.encode())
if status == 5:
status = self.client.nabtoCreateProfile(user.encode(), pwd.encode())
if status != 0:
print('nabtoCreateProfile error (%d)' % status)
session = c_void_p()
status = self.client.nabtoOpenSession(pointer(session), user.encode(), pwd.encode())
if status != 0:
print('nabtoOpenSession error (%d)' % status)
self.session = session
def __del__(self):
self.client.nabtoCloseSession(self.session)
def RpcSetDefaultInterface(self, interfaceDefinition):
"""
Assign RPC interface definition to session
Parameters
----------
interfaceDefinition : str
XML with RPC interface definition
"""
err = c_char_p()
# NABTO_DECL_PREFIX nabto_status_t NABTOAPI nabtoRpcSetDefaultInterface(nabto_handle_t session, const char* interfaceDefinition, char** errorMessage);
if self.client.nabtoRpcSetDefaultInterface(self.session, interfaceDefinition.encode(), pointer(err)) != 0:
print('nabtoRpcSetDefaultInterface error: %s' % err)
def RpcInvoke(self, nabtoUrl):
"""
Invoke RPC command
Parameters
----------
nabtoUrl : str
URL that contains RPC command along with command parameters
Returns
-------
dict
RPC response
"""
out = c_char_p()
self.client.nabtoRpcInvoke(self.session, nabtoUrl.encode(), pointer(out))
if out:
response = out.value
self.client.nabtoFree(out)
return json.loads(response)
return []