Skip to content

Examples

hitch-b24 edited this page Mar 6, 2018 · 2 revisions

Below is a quick example of using the picosapi to create a test script for a pico-based system representing a collection of Wovyn temperature sensors capable of providing "heartbeat" temperature information to a pico (written in Python 3):

import picosapi as picos
import time
import random

def mock_heartbeat(temperature):
  """Build a mock heartbeat payload carrying a single temperature reading.

  The reading is rendered with two decimal places, converted back to a
  float, and stored at genericThing -> data -> temperature[0] -> temperatureF.
  """
  rounded = float("{:.2f}".format(temperature))
  reading = {"temperatureF": rounded}
  return {"genericThing": {"data": {"temperature": [reading]}}}

def reset_pico_based_systems_test(manager_eci):
  """Post a sensor:unneeded_sensor event for sensor ids 1 through 5.

  Each event is sent to the pico identified by manager_eci. A failed post
  is reported by printing the response and is otherwise ignored, so the
  function always returns True.
  """
  print("Cleaning up from last test...")
  removal_url = picos.event_url(
      manager_eci, "remove-sensor", "sensor", "unneeded_sensor")
  for sensor_id in range(1, 6):
    print("Removing pico sensor %d" % sensor_id)
    succeeded, response = picos.post(removal_url,
                                     data={"sensor_id": sensor_id})
    if succeeded:
      continue
    # Best effort: report the failure and carry on with the next sensor.
    print(response)
  return True

def run_pico_based_systems_test(manager_eci, reset=True):
  """Drive the sensor-manager pico through a create/populate/delete cycle.

  Steps, in order:
    1. Optionally undo the previous run (reset_pico_based_systems_test).
    2. For sensor ids 1 through 5: post sensor:new_sensor to the manager,
       read the new pico's ECI from the response, post 10 heartbeats with
       random temperatures in [1.0, 70.0], then one heartbeat of 100.0
       (reported in the output as "threshold violating").
    3. Sleep 10 seconds, then post sensor:unneeded_sensor for one randomly
       chosen sensor id.
    4. Sleep 10 seconds, then GET manage_sensors/temperatures from the
       manager and print the response.

  Args:
    manager_eci: ECI of the pico that receives the sensor events.
    reset: when true, remove sensors 1-5 before starting.

  Returns:
    True when every request succeeded; False as soon as one fails (the
    offending response is printed first).
  """
  if reset:
    # Undo the last test
    reset_pico_based_systems_test(manager_eci)

  # Create 5 picos, generating a series of random heartbeats
  print("Generating 5 picos...")
  add_url = picos.event_url(manager_eci, "add-sensor", "sensor",
                            "new_sensor")
  for i in range(1, 6):
    ok, r = picos.post(add_url, data={"sensor_id": i})
    if not ok:
      print(r)
      return False
    # A response without the expected directive used to raise before the
    # None check could run; treat a missing ECI like any other failure.
    try:
      eci = r.content["directives"][0]["options"]["pico"]["eci"]
    except (KeyError, IndexError, TypeError):
      eci = None
    if eci is None:
      print(r)
      return False

    print("Populating %d with heartbeat data" % i)
    heartbeat_url = picos.event_url(eci, "mock-heartbeat",
                                    "wovyn", "heartbeat")
    for _ in range(10):
      ok, r = picos.post(heartbeat_url,
                         data=mock_heartbeat(random.uniform(1.0, 70.0)))
      if not ok:
        print(r)
        return False

    print("Sending threshold violating heartbeat data to %d" % i)
    ok, r = picos.post(heartbeat_url, data=mock_heartbeat(100.0))
    if not ok:
      print(r)
      return False

  # Wait for a bit
  print("Check results!")
  time.sleep(10)

  # Delete a sensor pico
  print("Deleting sensor pico randomly...")
  remove_url = picos.event_url(manager_eci, "remove-sensor",
                               "sensor", "unneeded_sensor")
  ok, r = picos.post(remove_url,
                     data={"sensor_id": random.randint(1, 5)})
  if not ok:
    print(r)
    return False

  # Wait for a bit
  print("Check results!")
  time.sleep(10)

  # Get the temperature values for all sensors
  temperature_url = picos.api_url(manager_eci,
                                  "manage_sensors", "temperatures")
  ok, r = picos.get(temperature_url)
  print(r)
  # Previously the success path fell off the end and returned None, which
  # callers could not tell apart from the False returned on failure.
  return bool(ok)

def main():
  """Run the pico-based systems test against a manager pico."""
  # Testing: the Ellipsis is a placeholder for the manager pico's ECI.
  manager_eci = ...
  run_pico_based_systems_test(manager_eci)

# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
  main()

Picos API Documentation

A Note About Picos

Picos are useful in building Internet of Things (IoT) technology that preserves personal freedom. They are currently being developed under the direction of Dr. Phil Windley by Pico Labs at Brigham Young University. Picos is an actor-based programming system that supports people-centric, reactive programming on the Internet of Things.

For more information about Picos, visit Pico Labs or see Dr. Windley's article on Reactive Programming with Picos.

Clone this wiki locally