Examples

hitch-b24 edited this page Mar 9, 2018 · 2 revisions

Below is a quick example of using picosapi to write a test script for a pico-based system. The system models a collection of Wovyn temperature sensors, each of which sends "heartbeat" temperature readings to a pico. The script is written in Python 3:

import picosapi as picos
import time
import random

def mock_heartbeat(temperature):
  """
  A real Wovyn temperature sensor produces a heartbeat that contains
  lots of additional information, but since we are only working with
  temperature readings, we only need to reproduce that part of the
  JSON object.
  """
  return { "genericThing":{"data":{"temperature":
            [{"temperatureF": float(format(temperature, '.2f'))}]
         }}}

def reset_pico_based_systems_test(manager_eci):
  """
  This will delete any existing temperature sensor picos registered
  with the sensor manager pico, essentially resetting the state of
  the engine so we can begin another test with a clean slate.
  """
  print("Cleaning up from last test...")
  # construct an EventURL object
  url = picos.event_url(manager_eci, "remove-sensor", 
                              "sensor", "unneeded_sensor")
  for i in range(1,6):
    print("Removing pico sensor %d" % i)
    # post an event to remove the sensor
    ok, r = picos.post(url, data={"sensor_id": i})
    # if we run into an error, report it and keep removing the rest
    if not ok:
      print(r)
  return True

def run_pico_based_systems_test(manager_eci, reset=True):
  """
  Here is the bulk of the testing script. We will create 5 new
  temperature picos, populate them with mock heartbeat data,
  and then randomly delete one of them to make sure that the
  sensor manager is functioning properly.
  """
  if reset:
    # Undo the last test
    reset_pico_based_systems_test(manager_eci)

  # Create 5 picos, generating a series of random heartbeats
  print("Generating 5 picos...")
  # create the event url
  add_url = picos.event_url(manager_eci, "add-sensor", "sensor", 
                            "new_sensor")
  for i in range(1,6):
    # post to the url to create sensor picos
    ok, r = picos.post(add_url, data={"sensor_id": i})
    if not ok:
      print(r)
      return False
    # we need to get the ECI of the newly created pico so that
    # we can populate it with heartbeat data
    eci = r.content["directives"][0]["options"]["pico"]["eci"]
    if eci is None:
      return False

    print("Populating %d with heartbeat data" % i)
    # create the url for heartbeat events
    heartbeat_url = picos.event_url(eci, "mock-heartbeat", 
                                    "wovyn", "heartbeat")
    for _ in range(10):
      # post the heartbeat
      ok, r = picos.post(heartbeat_url, 
                   data=mock_heartbeat(random.uniform(1.0, 70.0)))
      if not ok:
        print(r)
        return False

    print("Sending threshold violating heartbeat data to %d" % i)
    # now that we have 10 valid heartbeat values, we can test the
    # threshold violation functionality of the sensor pico to
    # make sure it will notify us if the temperature is too high
    ok, r = picos.post(heartbeat_url, data=mock_heartbeat(100.0))
    if not ok:
      print(r)
      return False

  # Wait for a bit
  print("Check results!")
  time.sleep(10)

  # Delete a sensor pico
  print("Deleting sensor pico randomly...")
  # another url for generating an event to delete a sensor
  remove_url = picos.event_url(manager_eci, "remove-sensor", 
                               "sensor", "unneeded_sensor")
  ok, r = picos.post(remove_url, 
               data={"sensor_id": random.randint(1,5)})
  if not ok:
    print(r)
    return False

  # Wait for a bit
  print("Check results!")
  time.sleep(10)

  # Get the temperature values for all sensors
  # this url is using the Sky Cloud API to call a shared
  # resource that the manager pico is providing
  temperature_url = picos.api_url(manager_eci, 
                          "manage_sensors", "temperatures")
  ok, r = picos.get(temperature_url)
  print(r)
  # report success or failure like the earlier steps do
  return ok

def main():
  # Pico ECIs should not be disclosed, so I don't show one
  # here. For the purposes of this example we will assume that
  # the ECI is valid
  pico_eci = ...
  run_pico_based_systems_test(pico_eci)

if __name__ == "__main__":
  main()
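
The heartbeat payload built by `mock_heartbeat` can be checked on its own, without a pico engine or picosapi. The following is a minimal sketch that repeats the function from the script above, serializes the payload to JSON, and reads the temperature back out. The `read_temperature` helper is hypothetical and exists only for this illustration; it is not part of picosapi.

```python
import json

def mock_heartbeat(temperature):
    """Same payload shape as in the test script above."""
    return {"genericThing": {"data": {"temperature":
              [{"temperatureF": float(format(temperature, '.2f'))}]
           }}}

def read_temperature(heartbeat):
    """Hypothetical helper: pull the Fahrenheit reading out of a heartbeat."""
    return heartbeat["genericThing"]["data"]["temperature"][0]["temperatureF"]

# build a payload; the reading is rounded to two decimal places
payload = mock_heartbeat(71.23456)

# round-trip through JSON, which is how the payload would travel in a request body
body = json.dumps(payload)
print(body)
print(read_temperature(json.loads(body)))
```

Running a check like this first makes it easier to tell a malformed payload apart from a problem in the sensor pico's ruleset.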

Picos API Documentation

A Note About Picos

Picos are useful for building Internet of Things (IoT) technology that preserves personal freedom. They are currently being developed under the direction of Dr. Phil Windley by Pico Labs at Brigham Young University. Together, picos form an actor-based programming system that supports people-centric, reactive programming on the Internet of Things.

For more information about Picos, visit Pico Labs or see Dr. Windley's article on Reactive Programming with Picos.
