Skip to content

AsyncIO python library to play with Asterisk Manager Interface (AMI)

License

Notifications You must be signed in to change notification settings

VCTLabs/pyami_asterisk

 
 

Repository files navigation

AsyncIO python library with Asterisk AMI

CI Docs Security check - Bandit Release

pre-commit

PyPI PyPI - Downloads PyPI - License

Pyami_asterisk is a library built on Python’s asyncio for working with the Asterisk Manager Interface (AMI).

Install

Install pyami_asterisk

pip install pyami-asterisk

Usage

Asterisk AMI Listen all Events

import asyncio
from pyami_asterisk import AMIClient


def all_events(events):
    """Callback for the "*" subscription: write each event to stdout."""
    print(events)


async def hangup_call(events):
    """Asynchronous callback: wait three seconds, then print the event."""
    delay_seconds = 3
    await asyncio.sleep(delay_seconds)
    print(events)


# Connection settings and credentials for the AMI client (example values).
ami = AMIClient(
    host="127.0.0.1",
    port=5038,
    username="username",
    secret="password",
)
# The "*" pattern subscribes all_events to every event.
ami.register_event(["*"], all_events)
# "Hangup" events additionally go to the asynchronous hangup_call callback.
ami.register_event(["Hangup"], hangup_call)
# NOTE(review): presumably opens the connection and keeps running until
# stopped — confirm against the library documentation.
ami.connect()

Asterisk AMI Listen Events: Registry, ContactStatus, PeerStatus

import asyncio
from pyami_asterisk import AMIClient


def register_multiple_events(events):
    """Shared callback for several event patterns: print the event."""
    print(events)


async def callback_peer_status(events):
    """
    Handle a PeerStatus event, then submit a Ping action.

    Example event:
    {
        'Event': 'PeerStatus',
        'Privilege': 'system,all',
        'ChannelType': 'PJSIP',
        'Peer': 'PJSIP/888',
        'PeerStatus': 'Unreachable',
    }

    When the peer is reported as "Unreachable", the event is printed after
    a two-second pause. Whatever the status, a Ping action is then submitted
    through the module-level ``ami`` client with the nested ``ping``
    coroutine as its callback.
    """
    if events.get("PeerStatus") == "Unreachable":
        await asyncio.sleep(2)
        print(events)

    # The parameter is named ``response`` so it does not shadow the outer
    # ``events`` argument of callback_peer_status.
    async def ping(response):
        """Asynchronous callback: print the Ping result after three seconds."""
        await asyncio.sleep(3)
        print(response)

    ami.create_action({"Action": "Ping"}, ping)


# Connection settings and credentials for the AMI client (example values).
ami = AMIClient(
    host="127.0.0.1",
    port=5038,
    username="username",
    secret="password",
)
# One callback can serve several event patterns; here the arguments are
# passed by keyword (patterns=..., callbacks=...).
ami.register_event(
    patterns=["Registry", "ContactStatus"],
    callbacks=register_multiple_events,
)
# PeerStatus events go to their own asynchronous callback.
ami.register_event(["PeerStatus"], callback_peer_status)
# NOTE(review): presumably opens the connection and keeps running until
# stopped — confirm against the library documentation.
ami.connect()

Asterisk AMI Actions: CoreSettings, CoreStatus

import asyncio
from pyami_asterisk import AMIClient


def core_settings(events):
    """Callback for the CoreSettings action: print what was received."""
    print(events)


async def core_status(events):
    """Asynchronous callback: wait four seconds, then print the result."""
    delay_seconds = 4
    await asyncio.sleep(delay_seconds)
    print(events)


# Connection settings and credentials for the AMI client (example values).
ami = AMIClient(
    host="127.0.0.1",
    port=5038,
    username="username",
    secret="password",
)
# Each action is paired with the callback that should receive its result;
# callbacks may be plain functions (core_settings) or coroutines (core_status).
ami.create_action({"Action": "CoreSettings"}, core_settings)
ami.create_action({"Action": "CoreStatus"}, core_status)
# NOTE(review): presumably opens the connection and keeps running until
# stopped — confirm against the library documentation.
ami.connect()

Asterisk AMI Actions: CoreSettings, CoreStatus (repeat every 3 seconds)

from pyami_asterisk import AMIClient


def core_settings(events):
    """Callback for the CoreSettings action: print what was received."""
    print(events)


def core_status(events):
    """Print the CoreStatus result, then its "CoreCurrentCalls" value.

    The whole result is printed first, so it is still shown when the
    "CoreCurrentCalls" key is missing and the lookup raises KeyError.
    """
    print(events)
    current_calls = events["CoreCurrentCalls"]
    print(current_calls)


# Connection settings and credentials for the AMI client (example values).
ami = AMIClient(
    host="127.0.0.1",
    port=5038,
    username="username",
    secret="password",
)
# CoreSettings is submitted without a repeat argument.
ami.create_action({"Action": "CoreSettings"}, core_settings)
# repeat=3 asks the client to re-send CoreStatus every 3 seconds, so
# core_status is called again for each repetition.
ami.create_action(
    {"Action": "CoreStatus"}, core_status, repeat=3
)
# NOTE(review): presumably opens the connection and keeps running until
# stopped — confirm against the library documentation.
ami.connect()

Asterisk AMI Action Originate

from pyami_asterisk import AMIClient


def callback_originate(events):
    """Callback for the Originate action: print what was received."""
    print(events)


# Connection settings and credentials for the AMI client (example values).
ami = AMIClient(
    host="127.0.0.1",
    port=5038,
    username="username",
    secret="password",
)
# Submit an Originate action; callback_originate is passed as its callback.
ami.create_action(
    {
        "Action": "Originate",
        "Channel": "pjsip/203",
        # NOTE(review): presumably milliseconds, per the AMI Originate
        # documentation — confirm.
        "Timeout": "20000",
        "CallerID": "+37529XXXXXXX <203>",
        "Exten": "+37529XXXXXXX",
        "Context": "from-internal",
        "Async": "true",
        # Raw string, so the backslash before ";" is kept literally.
        "Variable": r"PJSIP_HEADER(add,Call-Info)=\;Answer-After=0",
        # To set several variables, pass a list instead: ['var=1', 'var=2']
        "Priority": "1",
    },
    callback_originate,
)
# NOTE(review): presumably opens the connection and keeps running until
# stopped — confirm against the library documentation.
ami.connect()

Asterisk AMI Listen Events + Action

from pyami_asterisk import AMIClient


def callback_peer_status(events):
    """Print the PeerStatus event, then submit a Ping action.

    The Ping is submitted through the module-level ``ami`` client, with the
    nested ``callback_ping`` function as its callback.
    """
    print("PeerStatus", events)

    def callback_ping(response_ping):
        """Print what the Ping action returned."""
        print("Response Ping", response_ping)

    ami.create_action({"Action": "Ping"}, callback_ping)


# Connection settings and credentials for the AMI client (example values).
ami = AMIClient(
    host="127.0.0.1",
    port=5038,
    username="username",
    secret="password",
)
# callback_peer_status submits a Ping action each time it is called.
ami.register_event(["PeerStatus"], callback_peer_status)
# NOTE(review): presumably opens the connection and keeps running until
# stopped — confirm against the library documentation.
ami.connect()

Create asyncio task

import asyncio
import random
from pyami_asterisk import AMIClient


async def refresh_tokens(timeout=4):
    """Example background task that stands in for a token refresh.

    Loops forever: prints a random integer from 0 to 999, then sleeps for
    ``timeout`` seconds before the next round.
    """
    while True:
        value = random.randrange(0, 1000)
        print(value)
        await asyncio.sleep(timeout)


# Connection settings and credentials for the AMI client (example values).
ami = AMIClient(
    host="127.0.0.1",
    port=5038,
    username="username",
    secret="password",
)
# Hand the refresh_tokens coroutine (2-second interval) to the client.
# NOTE(review): presumably it is scheduled as an asyncio task alongside the
# AMI connection — confirm against the library documentation.
ami.create_asyncio_task(tasks=[refresh_tokens(timeout=2)])
ami.connect()

Run / stop async

import asyncio
from pyami_asterisk import AMIClient


async def all_events(event):
    """Print each event; close the connection on "SuccessfulAuth".

    Any other event is only printed.
    """
    print(event)
    if event.get("Event") != "SuccessfulAuth":
        return
    # Close the connection via the module-level client.
    await ami.connection_close()


async def run_async():
    """Wait two seconds, then await the client's ``connect_ami()``."""
    startup_delay = 2
    await asyncio.sleep(startup_delay)
    await ami.connect_ami()


# Connection settings and credentials for the AMI client (example values).
ami = AMIClient(
    host="127.0.0.1",
    port=5038,
    username="username",
    secret="password",
)
# all_events receives every event and closes the connection once it sees
# "SuccessfulAuth".
ami.register_event(["*"], all_events)
# Here the script starts the event loop itself: asyncio.run drives
# run_async, which awaits ami.connect_ami().
asyncio.run(run_async())

About

AsyncIO python library to play with Asterisk Manager Interface (AMI)

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 100.0%