Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support connectivity on Android devices with USB Serial #23

Open
wants to merge 1 commit into
base: 1.0-pyuf
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions uf/android/comm/usbserialandroid_ascii.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env python3
# Software License Agreement (BSD License)
#
# Author: Adrian Clark <adrian.clark@canterbury.ac.nz>
# Modifed from file written by: Duke Fong <duke@ufactory.cc>


import _thread, threading

# Import USB 4 Android and USB Serial 4 Android libraries
from usb4a import usb
from usbserial4a import serial4a

# We need to use the select_port function defined in these scripts as the port selection is slightly different
from ..utils.usbserialandroid_select_serial_port import select_port
from ...utils.log import *

class USBSerialAndroidAscii(threading.Thread):
def __init__(self, ufc, node, iomap, dev_port = None, baud = 115200, filters = None):

self.ports = {
'in': {'dir': 'in', 'type': 'topic', 'callback': self.in_cb},
'out': {'dir': 'out', 'type': 'topic'},
'service': {'dir': 'in', 'type': 'service', 'callback': self.service_cb}
}

self.node = node
self.logger = logging.getLogger('uf.' + node.replace('/', '.'))
ufc.node_init(node, self.ports, iomap)

dev_port = select_port(logger = self.logger, dev_port = dev_port, filters = filters)

if not dev_port:
quit(1)

# Use the USB Serial 4 Android Serial Port Function to open
self.com = serial4a.get_serial_port(dev_port.name, baud, 8, 'N', 1)

if not self.com.isOpen():
raise Exception('serial open failed')
threading.Thread.__init__(self)
self.daemon = True
self.alive = True
self.start()

def run(self):
while self.alive:
# Read_Until is the equivalent function to ReadLines
line = self.com.read_until()
if not line:
continue
line = ''.join(map(chr, line)).rstrip()
self.logger.log(logging.VERBOSE, '-> ' + line)
if self.ports['out']['handle']:
self.ports['out']['handle'].publish(line)
self.com.close()

def stop(self):
self.alive = False
self.join()

def in_cb(self, message):
self.logger.log(logging.VERBOSE, '<- ' + message)
self.com.write(bytes(map(ord, message + '\n')))

def service_cb(self, message):
pass


128 changes: 128 additions & 0 deletions uf/android/serial/tools/usbserialandroid_list_ports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#!/usr/bin/env python
#
# This is a module that gathers a list of serial ports including details on
# GNU/Linux systems.
#
# Author: Adrian Clark <adrian.clark@canterbury.ac.nz>
#
# This file is was modified based off of part of pySerial.
# https://github.com/pyserial/pyserial
# (C) 2011-2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import absolute_import

import glob
import os

class USBSerial4A():
"""Wrapper for USBSerial 4 Android library"""

def __init__(self, device):
# Store the USBDevice reference in Device
self.device = device
self.name = device.getDeviceName()

# Haven't translated across the relevant functions yet
# But they aren't necessary for functionality
self.description = 'n/a'
self.hwid = 'n/a'

# USB specific data pulled from USBDevice
self.vid = device.getVendorId()
self.pid = device.getProductId()
self.serial_number = device.getSerialNumber()
self.manufacturer = device.getManufacturerName()
self.product = device.getProductName()

# Haven't translated across the relevant functions yet
# But they aren't necessary for functionality
self.location = None
self.interface = None

self.description = self.usb_description()
self.hwid = self.usb_info()

# Original Code from pySerial - should double check against
# What is being done with USBSerial 4 Android to ensure consistency
#
# self.usb_device_path = None
# if os.path.exists('/sys/class/tty/{}/device'.format(self.name)):
# self.device_path = os.path.realpath('/sys/class/tty/{}/device'.format(self.name))
# self.subsystem = os.path.basename(os.path.realpath(os.path.join(self.device_path, 'subsystem')))
# else:
# self.device_path = None
# self.subsystem = None
# check device type
# if self.subsystem == 'usb-serial':
# self.usb_interface_path = os.path.dirname(self.device_path)
#elif self.subsystem == 'usb':
# self.usb_interface_path = self.device_path
#else:
# self.usb_interface_path = None
# fill-in info for USB devices
#if self.usb_interface_path is not None:
# self.usb_device_path = os.path.dirname(self.usb_interface_path)

# try:
# num_if = int(self.read_line(self.usb_device_path, 'bNumInterfaces'))
# except ValueError:
# num_if = 1

# self.vid = int(self.read_line(self.usb_device_path, 'idVendor'), 16)
# self.pid = int(self.read_line(self.usb_device_path, 'idProduct'), 16)
# self.serial_number = self.read_line(self.usb_device_path, 'serial')
# if num_if > 1: # multi interface devices like FT4232
# self.location = os.path.basename(self.usb_interface_path)
# else:
# self.location = os.path.basename(self.usb_device_path)

#self.manufacturer = self.read_line(self.usb_device_path, 'manufacturer')
#self.product = self.read_line(self.usb_device_path, 'product')
#self.interface = self.read_line(self.usb_interface_path, 'interface')

#if self.subsystem in ('usb', 'usb-serial'):
# self.apply_usb_info()
#~ elif self.subsystem in ('pnp', 'amba'): # PCI based devices, raspi
#elif self.subsystem == 'pnp': # PCI based devices
# self.description = self.name
# self.hwid = self.read_line(self.device_path, 'id')
#elif self.subsystem == 'amba': # raspi
# self.description = self.name
# self.hwid = os.path.basename(self.device_path)

#if is_link:
#self.hwid += ' LINK={}'.format(device)
def usb_info(self):
"""return a string with USB related information about device"""
return 'USB VID:PID={:04X}:{:04X}{}{}'.format(
self.vid or 0,
self.pid or 0,
' SER={}'.format(self.serial_number) if self.serial_number is not None else '',
' LOCATION={}'.format(self.location) if self.location is not None else '')

def usb_description(self):
"""return a short string to name the port based on USB info"""
if self.interface is not None:
return '{} - {}'.format(self.product, self.interface)
elif self.product is not None:
return self.product
else:
return self.name

from usb4a import usb
def comports(include_links=False):
# Get all the USB Devices
usb_device_list = usb.get_usb_device_list()

# Parse them through USBSerial4A class and return as a list
return [info
for info in [USBSerial4A(d) for d in usb_device_list]
] # hide non-present internal serial ports

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
if __name__ == '__main__':
for info in sorted(comports()):
print("{0}: {0.subsystem}".format(info))
109 changes: 109 additions & 0 deletions uf/android/usbserialandroid_swift/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python3
# Software License Agreement (BSD License)
#
# Author: Adrian Clark <adrian.clark@canterbury.ac.nz>
# Modifed from file written by: Duke Fong <duke@ufactory.cc>

from ...utils.module_group import ModuleGroup
from ..comm.usbserialandroid_ascii import USBSerialAndroidAscii
from ...comm.protocol_ascii import ProtocolAscii

# I had to duplicate these files otherwise it pulls in original the
# original Swift files, and then crashes requiring PySerial, which
# we don't actually need
from .swift_body import SwiftBody
from .gripper import Gripper
from .pump import Pump
from .keys import Keys

# A new class was used so we can use the USBSerialAndroidAscii Module
# Rather than the one based on PySerial
class UsbSerialAndroid_Swift(ModuleGroup):
'''\
The top module of swift and swift_pro
default kwargs: dev_port = None, baud = 115200, filters = {'hwid': 'USB VID:PID=2341:0042'}
'''

def __init__(self, ufc, node, iomap, **kwargs):

self.sub_nodes = [
{
# Module here is changed
'module': USBSerialAndroidAscii,
'node': 'serial_ascii',
'args': ['dev_port', 'baud', 'filters'],
'iomap': {
'out': 'inner: pkt_ser2ptc',
'in': 'inner: pkt_ptc2ser'
}
},
{
'module': ProtocolAscii,
'node': 'ptc_ascii',
'args': ['cmd_pend_size'],
'iomap': {
'cmd_async': 'outer: ptc_async',
'cmd_sync': 'outer: ptc_sync',
'report': 'outer: ptc_report',
'service': 'outer: ptc',

'packet_in': 'inner: pkt_ser2ptc',
'packet_out': 'inner: pkt_ptc2ser'
}
},
{
'module': SwiftBody,
'node': 'swift_body',
'iomap': {
'pos_in': 'outer: pos_in',
'pos_out': 'outer: pos_out',
'buzzer': 'outer: buzzer',
'service': 'outer: service',

'cmd_async': 'outer: ptc_async',
'cmd_sync': 'outer: ptc_sync',
'report': 'outer: ptc_report'
}
},
{
'module': Gripper,
'node': 'gripper',
'iomap': {
'service': 'outer: gripper',
'cmd_sync': 'outer: ptc_sync'
}
},
{
'module': Pump,
'node': 'pump',
'iomap': {
'service': 'outer: pump',
'limit_switch': 'outer: limit_switch',
'cmd_sync': 'outer: ptc_sync',
'report': 'outer: ptc_report'
}
},
{
'module': Keys,
'node': 'keys',
'iomap': {
'service': 'outer: keys',
'key0': 'outer: key0',
'key1': 'outer: key1',
'cmd_sync': 'outer: ptc_sync',
'report': 'outer: ptc_report'
}
}
]

if 'dev_port' not in kwargs:
kwargs['dev_port'] = None
if 'baud' not in kwargs:
kwargs['baud'] = 115200
if 'filters' not in kwargs:
kwargs['filters'] = {'hwid': 'USB VID:PID=2341:0042'}
if 'cmd_pend_size' not in kwargs:
kwargs['cmd_pend_size'] = 2
super().__init__(ufc, node, iomap, **kwargs)


64 changes: 64 additions & 0 deletions uf/android/usbserialandroid_swift/gripper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python3
# Software License Agreement (BSD License)
#
# Copyright (c) 2017, UFactory, Inc.
# All rights reserved.
#
# Author: Duke Fong <duke@ufactory.cc>

from time import sleep
from ...utils.log import *


class Gripper():
def __init__(self, ufc, node, iomap):

self.ports = {
'service': {'dir': 'in', 'type': 'service', 'callback': self.service_cb},

'cmd_sync': {'dir': 'out', 'type': 'service'},
}

self.logger = logging.getLogger('uf.' + node.replace('/', '.'))
ufc.node_init(node, self.ports, iomap)

def set_gripper(self, val):
if val == 'on':
self.ports['cmd_sync']['handle'].call('M2232 V1')
else:
self.ports['cmd_sync']['handle'].call('M2232 V0')

# TODO: modify the default timeout time with a service command
for _ in range(20):
ret = self.ports['cmd_sync']['handle'].call('P2232')
if val == 'on':
if ret == 'ok V2': # grabbed
return 'ok'
else:
if ret == 'ok V0': # stop
return 'ok'
sleep(0.5)
return 'err, timeout for {}, last ret: {}'.format(val, ret)

def service_cb(self, msg):
msg = msg.split(' ', 2)

if msg[1] == 'value':
if msg[0] == 'get':
ret = self.ports['cmd_sync']['handle'].call('P2232')
self.logger.debug('get value ret: %s' % ret)

if ret == 'ok V0':
return 'ok, stoped'
elif ret == 'ok V1':
return 'ok, working'
elif ret == 'ok V2':
return 'ok, grabbed'
else:
return 'err, unkown ret: %s' % ret

if msg[0] == 'set':
self.logger.debug('set value: %s' % msg[2])
return self.set_gripper(msg[2])


Loading