-
Notifications
You must be signed in to change notification settings - Fork 81
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
sal
committed
Jan 16, 2025
1 parent
26debc3
commit b4e66d1
Showing
13 changed files
with
327 additions
and
101 deletions.
There are no files selected for viewing
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
|
||
import commune as c | ||
|
||
class Test: | ||
def test_encryption(self, values = [10, 'fam', 'hello world']): | ||
cls = c.module('key') | ||
for value in values: | ||
value = str(value) | ||
key = cls.new_key() | ||
enc = key.encrypt(value) | ||
dec = key.decrypt(enc) | ||
assert dec == value, f'encryption failed, {dec} != {value}' | ||
return {'encrypted':enc, 'decrypted': dec} | ||
|
||
def test_encryption_with_password(self, value = 10, password = 'fam'): | ||
cls = c.module('key') | ||
value = str(value) | ||
key = cls.new_key() | ||
enc = key.encrypt(value, password=password) | ||
dec = key.decrypt(enc, password=password) | ||
assert dec == value, f'encryption failed, {dec} != {value}' | ||
return {'encrypted':enc, 'decrypted': dec} | ||
|
||
def test_key_encryption(self, test_key='test.key'): | ||
self = c.module('key') | ||
key = self.add_key(test_key, refresh=True) | ||
og_key = self.get_key(test_key) | ||
r = self.encrypt_key(test_key) | ||
self.decrypt_key(test_key, password=r['password']) | ||
key = self.get_key(test_key) | ||
assert key.ss58_address == og_key.ss58_address, f'key encryption failed, {key.ss58_address} != {self.ss58_address}' | ||
return {'success': True, 'msg': 'test_key_encryption passed'} | ||
|
||
def test_key_management(self, key1='test.key' , key2='test2.key'): | ||
self = c.module('key') | ||
if self.key_exists(key1): | ||
self.rm_key(key1) | ||
if self.key_exists(key2): | ||
self.rm_key(key2) | ||
self.add_key(key1) | ||
k1 = self.get_key(key1) | ||
assert self.key_exists(key1), f'Key management failed, key still exists' | ||
self.mv_key(key1, key2) | ||
k2 = self.get_key(key2) | ||
assert k1.ss58_address == k2.ss58_address, f'Key management failed, {k1.ss58_address} != {k2.ss58_address}' | ||
assert self.key_exists(key2), f'Key management failed, key does not exist' | ||
assert not self.key_exists(key1), f'Key management failed, key still exists' | ||
self.mv_key(key2, key1) | ||
assert self.key_exists(key1), f'Key management failed, key does not exist' | ||
assert not self.key_exists(key2), f'Key management failed, key still exists' | ||
self.rm_key(key1) | ||
# self.rm_key(key2) | ||
assert not self.key_exists(key1), f'Key management failed, key still exists' | ||
assert not self.key_exists(key2), f'Key management failed, key still exists' | ||
return {'success': True, 'msg': 'test_key_management passed'} | ||
|
||
|
||
def test_signing(self): | ||
self = c.module('key')() | ||
sig = self.sign('test') | ||
assert self.verify('test',sig, self.public_key) | ||
return {'success':True} | ||
|
||
def test_key_encryption(self, password='1234'): | ||
cls = c.module('key') | ||
path = 'test.enc' | ||
cls.add_key('test.enc', refresh=True) | ||
assert cls.is_key_encrypted(path) == False, f'file {path} is encrypted' | ||
cls.encrypt_key(path, password=password) | ||
assert cls.is_key_encrypted(path) == True, f'file {path} is not encrypted' | ||
cls.decrypt_key(path, password=password) | ||
assert cls.is_key_encrypted(path) == False, f'file {path} is encrypted' | ||
cls.rm(path) | ||
print('file deleted', path, c.exists, 'fam') | ||
assert not c.exists(path), f'file {path} not deleted' | ||
return {'success': True, 'msg': 'test_key_encryption passed'} | ||
|
||
def test_move_key(self): | ||
self = c.module('key')() | ||
self.add_key('testfrom') | ||
assert self.key_exists('testfrom') | ||
og_key = self.get_key('testfrom') | ||
self.mv_key('testfrom', 'testto') | ||
assert self.key_exists('testto') | ||
assert not self.key_exists('testfrom') | ||
new_key = self.get_key('testto') | ||
assert og_key.ss58_address == new_key.ss58_address | ||
self.rm_key('testto') | ||
assert not self.key_exists('testto') | ||
return {'success':True, 'msg':'test_move_key passed', 'key':new_key.ss58_address} | ||
|
||
|
||
def test_ss58_encoding(self): | ||
self = c.module('key') | ||
keypair = self.create_from_uri('//Alice') | ||
ss58_address = keypair.ss58_address | ||
public_key = keypair.public_key | ||
assert keypair.ss58_address == self.ss58_encode(public_key, ss58_format=42) | ||
assert keypair.ss58_address == self.ss58_encode(public_key, ss58_format=42) | ||
assert keypair.public_key.hex() == self.ss58_decode(ss58_address) | ||
assert keypair.public_key.hex() == self.ss58_decode(ss58_address) | ||
return {'success':True} | ||
|
||
def test(self): | ||
return c.run_test(self) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import commune as c | ||
|
||
|
||
class Test: | ||
|
||
def test_global_params(self): | ||
self = c.module('subspace')() | ||
global_params = self.global_params() | ||
assert isinstance(global_params, dict) | ||
return {'msg': 'global_params test passed', 'success': True} | ||
|
||
def test_subnet_params(self, subnet=0): | ||
self = c.module('subspace')() | ||
subnet_params = self.subnet_params(subnet=subnet) | ||
assert isinstance(subnet_params, dict), f'{subnet_params} is not a dict' | ||
return {'msg': 'subnet_params test passed', 'success': True} | ||
|
||
|
||
def test_module_params(self, keys=['dividends', 'incentive'], subnet=0): | ||
self = c.module('subspace')() | ||
key = self.keys(subnet)[0] | ||
module_info = self.get_module(key, subnet=subnet) | ||
assert isinstance(module_info, dict) | ||
for k in keys: | ||
assert k in module_info, f'{k} not in {module_info}' | ||
|
||
return {'msg': 'module_params test passed', 'success': True, 'module_info': module_info} | ||
|
||
|
||
def test_substrate(self): | ||
self = c.module('subspace')() | ||
for i in range(3): | ||
t1 = c.time() | ||
c.print(self.substrate) | ||
t2 = c.time() | ||
c.print(f'{t2-t1:.2f} seconds') | ||
return {'msg': 'substrate test passed', 'success': True} | ||
|
||
|
||
def test(self): | ||
fns = [f for f in dir(self) if 'test_' in f] | ||
fn2result = {} | ||
fn2error = {} | ||
for fn in fns: | ||
try: | ||
result = getattr(self, fn)() | ||
fn2result[fn] = result | ||
except Exception as e: | ||
fn2error[fn] = {'success': False, 'msg': str(e)} | ||
if len(fn2error) > 0: | ||
raise Exception(f'Errors: {fn2error}') | ||
|
||
return fn2result | ||
|
||
|
||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
|
||
|
||
import commune as c | ||
|
||
class Test: | ||
def test_serializer(self): | ||
self = c.module('serializer')() | ||
import torch, time | ||
data_list = [ | ||
torch.ones(1000), | ||
torch.zeros(1000), | ||
torch.rand(1000), | ||
[1,2,3,4,5], | ||
{'a':1, 'b':2, 'c':3}, | ||
'hello world', | ||
c.df([{'name': 'joe', 'fam': 1}]), | ||
1, | ||
1.0, | ||
True, | ||
False, | ||
None | ||
|
||
] | ||
for data in data_list: | ||
t1 = time.time() | ||
ser_data = self.serialize(data) | ||
des_data = self.deserialize(ser_data) | ||
des_ser_data = self.serialize(des_data) | ||
t2 = time.time() | ||
|
||
latency = t2 - t1 | ||
emoji = '✅' if str(des_ser_data) == str(ser_data) else '❌' | ||
print(type(data),emoji) | ||
return {'msg': 'PASSED test_serialize_deserialize'} | ||
|
||
|
||
def test_basics(self) -> dict: | ||
servers = c.servers() | ||
c.print(servers) | ||
name = f'module::test' | ||
c.serve(name) | ||
c.kill(name) | ||
assert name not in c.servers() | ||
return {'success': True, 'msg': 'server test passed'} | ||
|
||
def test_serving(self, name = 'module::test'): | ||
module = c.serve(name) | ||
module = c.connect(name) | ||
r = module.info() | ||
assert 'name' in r, f"get failed {r}" | ||
c.kill(name) | ||
assert name not in c.servers(update=1) | ||
return {'success': True, 'msg': 'server test passed'} | ||
|
||
def test_serving_with_different_key(self, module = 'module', timeout=10): | ||
tag = 'test_serving_with_different_key' | ||
key_name = module + '::'+ tag | ||
module_name = module + '::'+ tag + '_b' | ||
if not c.key_exists(key_name): | ||
key = c.add_key(key_name) | ||
c.print(c.serve(module_name, key=key_name)) | ||
key = c.get_key(key_name) | ||
c.sleep(2) | ||
info = c.call(f'{module_name}/info', timeout=2) | ||
assert info.get('key', None) == key.ss58_address , f" {info}" | ||
c.kill(module_name) | ||
c.rm_key(key_name) | ||
assert not c.key_exists(key_name) | ||
assert not c.server_exists(module_name) | ||
return {'success': True, 'msg': 'server test passed'} | ||
|
||
def test(self): | ||
return c.run_test(self) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import commune as c | ||
class Test: | ||
def test( | ||
self, | ||
n=2, | ||
tag = 'vali_test_net', | ||
miner='module', | ||
trials = 5, | ||
tempo = 4, | ||
update=True, | ||
path = '/tmp/commune/vali_test', | ||
network='local' | ||
): | ||
test_miners = [f'{miner}::{tag}{i}' for i in range(n)] | ||
modules = test_miners | ||
search = tag | ||
assert len(modules) == n, f'Number of miners not equal to n {len(modules)} != {n}' | ||
for m in modules: | ||
c.serve(m) | ||
namespace = c.namespace() | ||
for m in modules: | ||
assert m in namespace, f'Miner not in namespace {m}' | ||
|
||
vali = c.module('vali')(network=network, search=search, path=path, update=update, tempo=tempo, run_loop=False) | ||
print(vali.modules) | ||
scoreboard = [] | ||
while len(scoreboard) < n: | ||
c.sleep(1) | ||
scoreboard = vali.epoch() | ||
trials -= 1 | ||
assert trials > 0, f'Trials exhausted {trials}' | ||
for miner in modules: | ||
c.print(c.kill(miner)) | ||
return {'success': True, 'msg': 'subnet test passed'} |
Oops, something went wrong.