WebSockets and Python asyncio

WebSockets and Python asyncio can be complicated, especially for the novice user. In this post I will show best practices and give you a better understanding of how to use the data stream websocket, so that you don't fall into the common pitfalls.

First we must understand what asyncio and the event loop are. A very thorough guide can be found here, but try to think of the event loop as a multitasking manager that switches between tasks so that many things can make progress at once.

So we have a single thread that knows when to do a context switch and run a task from a completely different function/class/part of your code. This is especially useful when your task is waiting for a different entity to do something.

For instance, when you request something from an online service, there's no reason to hold the event loop for yourself while you wait for the response; give the CPU to functions that need it. When the response is received, control returns to your task and you can process the response, while a different part of the code may wait for you to complete the processing.
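
To make the idea concrete, here is a minimal sketch that uses only the standard library. The fetch coroutine and its delays are made up for illustration, and asyncio.sleep stands in for waiting on a real network response.

```python
import asyncio


async def fetch(name, delay, log):
    # asyncio.sleep stands in for waiting on a network response
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # control returns to the event loop here
    log.append(f"{name} done")
    return name


async def main():
    log = []
    # Both coroutines run on the same thread; the event loop switches
    # between them whenever one of them awaits.
    results = await asyncio.gather(
        fetch("slow", 0.2, log),
        fetch("fast", 0.1, log),
    )
    return results, log


if __name__ == "__main__":
    results, log = asyncio.run(main())
    print(results)
    print(log)
```

Both requests start before either one finishes, and the faster one completes first even though it was started second. The results still come back in the order the coroutines were passed to gather.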

But let us go back to our context. We want to use the Python SDK to get data streams from the API service.

Please note that this post is in line with alpaca_trade_api v0.53.0.

The very basic example would be this:

from alpaca_trade_api import StreamConn
from alpaca_trade_api.common import URL
 
ALPACA_API_KEY = "<YOUR-API-KEY>"
ALPACA_SECRET_KEY = "<YOUR-SECRET-KEY>"
 
 
if __name__ == '__main__':
    conn = StreamConn(
        ALPACA_API_KEY,
        ALPACA_SECRET_KEY,
        base_url=URL('https://paper-api.alpaca.markets'),
        data_url=URL('https://data.alpaca.markets'),
        data_stream='alpacadatav1'
    )

    @conn.on(r'Q\..+')
    async def on_quotes(conn, channel, quote):
        print('quote', quote)
        
    conn.run(['alpacadatav1/Q.GOOG'])

In this example we create a StreamConn object that subscribes to Google (GOOG) quotes and prints them as they are received.

So why did I mention the Python event loop and asyncio? Under the hood, the StreamConn object uses the websockets Python module, which is based on asyncio.

What does it mean?

  • It has an event loop that schedules asyncio tasks. Instead of blocking while waiting for outside entities, execution moves between tasks.
  • When a quote is received from the servers, the asyncio callback we registered (the on_quotes function) is called to print it. This means we can register callbacks to let our code decide what to do with the data.
  • Most important: when we call conn.run() it blocks. Code written after that point will not execute while the websocket connection is running. You may ask yourself "how do I stop the connection or change the subscription?". Keep reading.
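
The callback idea can be sketched without any network connection. MiniDispatcher below is a hypothetical stand-in written for this post, not the SDK's implementation; it only assumes that handlers are async functions registered against a regular expression and matched from the start of the channel name.

```python
import asyncio
import re


class MiniDispatcher:
    """Hypothetical stand-in for a stream connection's callback registry."""

    def __init__(self):
        self._handlers = []  # list of (compiled pattern, coroutine function)

    def on(self, pattern):
        # Decorator that registers an async handler for matching channels
        def decorator(func):
            self._handlers.append((re.compile(pattern), func))
            return func
        return decorator

    async def dispatch(self, channel, data):
        # Await every handler whose pattern matches the channel name
        for regex, handler in self._handlers:
            if regex.match(channel):
                await handler(self, channel, data)


received = []
dispatcher = MiniDispatcher()


@dispatcher.on(r'Q\..+')
async def on_quotes(conn, channel, quote):
    received.append(('quote', channel, quote))


@dispatcher.on(r'^AM\..+$')
async def on_minute_bars(conn, channel, bar):
    received.append(('bar', channel, bar))


async def main():
    await dispatcher.dispatch('Q.GOOG', {'bidprice': 100.0})
    await dispatcher.dispatch('AM.TSLA', {'close': 200.0})
    await dispatcher.dispatch('T.AAPL', {'price': 50.0})  # no handler: ignored


if __name__ == '__main__':
    asyncio.run(main())
    print(received)
```

The handlers take the same three arguments as the callbacks in this post (connection, channel, payload), so the sketch can be used to try out patterns before wiring them to a real stream.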

To learn more about the different data streams available in the Alpaca API check this link.

Advanced usage

So far we haven't seen anything new (most Alpaca users will know how to run the basic example I've just shown). Let's talk about some other features we may want from this data stream connection. We may want to:

  • Make sure it runs forever (or as long as we want) and, when it disconnects, takes care of the reconnection on its own.
  • Be able to pause/resume the connection. This can be very handy at night, when the stream is not active, or when we want to upgrade our environment.
  • Change the subscription of the websocket without restarting it. For instance, if we are subscribed to Google quotes and want to switch to Apple quotes.

So first of all, we have complete example code for each of these scenarios in our GitHub repo and you should definitely start your implementation with those. They are located here.

Now let's look at each scenario.

Websocket Reconnection

The StreamConn object has a reconnection mechanism built in, but we can wrap it in the following pattern as an extra safeguard in case conn.run() exits or raises anyway. A full code example is available here.

import time

from alpaca_trade_api import StreamConn
from alpaca_trade_api.common import URL

ALPACA_API_KEY = "<YOUR-API-KEY>"
ALPACA_SECRET_KEY = "<YOUR-SECRET-KEY>"


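def run_connection(conn, channels):
    # Reconnect in a loop rather than by recursing from `finally`,
    # which would deepen the call stack on every reconnect.
    while True:
        try:
            conn.run(channels)  # blocks until the connection ends
        except Exception as e:
            print(f'Exception from websocket connection: {e}')
        print("Trying to re-establish connection")
        time.sleep(3)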

if __name__ == '__main__':
    channels = ['alpacadatav1/Q.GOOG']

    conn = StreamConn(
        ALPACA_API_KEY,
        ALPACA_SECRET_KEY,
        base_url=URL('https://paper-api.alpaca.markets'),
        data_url=URL('https://data.alpaca.markets'),
        data_stream='alpacadatav1'
    )

    @conn.on(r'Q\..+')
    async def on_quotes(conn, channel, quote):
        print('quote', quote)

    run_connection(conn, channels)
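
If a fixed three-second pause is too aggressive, one common variation is to back off exponentially and give up after a number of attempts. The sketch below is an assumption-laden illustration, not part of the SDK: run_with_backoff and its parameters are hypothetical, and connect can be any blocking callable, such as a lambda wrapping conn.run(channels).

```python
import time


def run_with_backoff(connect, max_attempts=5, base_delay=1.0, max_delay=30.0,
                     sleep=time.sleep):
    """Call connect() until it returns cleanly or the attempts run out.

    connect is any blocking callable (hypothetical here); sleep is
    injectable so the loop can be exercised without real waiting.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            connect()
            return attempt  # clean exit: stop retrying
        except Exception as e:
            print(f'attempt {attempt} failed: {e}')
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, capped


if __name__ == '__main__':
    calls = {'n': 0}

    def flaky_connect():
        # Fails twice, then succeeds; stands in for conn.run(channels)
        calls['n'] += 1
        if calls['n'] < 3:
            raise ConnectionError('dropped')

    waits = []
    print(run_with_backoff(flaky_connect, sleep=waits.append), waits)
```

Unlike the loop above, this version stops once connect() returns without an error, so it suits cases where a clean exit means "we are done" rather than "reconnect".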

Pause/Resume Data Stream

This is a newly added feature of v0.53.0, and full example code is available here. As mentioned previously, the conn.run() method is blocking, meaning nothing after it executes while the connection is running. So, in order to communicate with our conn object, we need to use another thread. We then use the stop_ws() method, which signals the connection to stop. Let's see:

import threading
import asyncio
import time

from alpaca_trade_api import StreamConn
from alpaca_trade_api.common import URL

ALPACA_API_KEY = "<YOUR-API-KEY>"
ALPACA_SECRET_KEY = "<YOUR-SECRET-KEY>"

conn: StreamConn = None

def consumer_thread():
    try:
        # make sure we have an event loop, if not create a new one
        loop = asyncio.get_event_loop()
        loop.set_debug(True)
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

    global conn
    conn = StreamConn(
        ALPACA_API_KEY,
        ALPACA_SECRET_KEY,
        base_url=URL('https://paper-api.alpaca.markets'),
        data_url=URL('https://data.alpaca.markets'),
        data_stream='alpacadatav1'
    )
    @conn.on(r'Q\..+')
    async def on_quotes(conn, channel, quote):
        print('quote', quote)

    conn.run(['alpacadatav1/Q.GOOG'])

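if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    while True:
        # run the stream in a background thread, since conn.run() blocks
        threading.Thread(target=consumer_thread).start()
        time.sleep(5)   # let the stream run for a few seconds
        # pause: signal the connection to stop from the main thread
        loop.run_until_complete(conn.stop_ws())
        time.sleep(20)  # stay paused; the next iteration resumes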
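
The underlying technique, one thread running an event loop while another thread controls it, can be tried in isolation with the standard library alone. The sketch below does not use StreamConn at all; consumer, start_background_loop and demo are hypothetical names, and a threading.Event plays the role of the stop signal.

```python
import asyncio
import threading
import time


def start_background_loop():
    """Start an event loop in a daemon thread and return (loop, thread)."""
    loop = asyncio.new_event_loop()

    def runner():
        asyncio.set_event_loop(loop)
        loop.run_forever()  # blocks this thread, much like conn.run()

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    return loop, thread


async def consumer(ticks, stop_requested):
    # Stand-in for a stream consumer: records ticks until asked to stop
    while not stop_requested.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)
    return len(ticks)


def demo():
    loop, thread = start_background_loop()
    ticks, stop_requested = [], threading.Event()
    # Schedule the coroutine on the other thread's loop, thread-safely
    future = asyncio.run_coroutine_threadsafe(
        consumer(ticks, stop_requested), loop)
    time.sleep(0.1)          # the main thread is free to do other work
    stop_requested.set()     # "pause": ask the consumer to finish
    count = future.result(timeout=1)
    loop.call_soon_threadsafe(loop.stop)  # stop the loop from outside
    thread.join(timeout=1)
    loop.close()
    return count, thread.is_alive()


if __name__ == '__main__':
    print(demo())
```

The important calls are asyncio.run_coroutine_threadsafe and loop.call_soon_threadsafe: they are the documented ways to hand work to a loop that is running in a different thread.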

Change the Data Subscription

Our last example shows how to change the data subscription without closing the current connection. We again need another thread: the connection runs in one thread, and from the main thread we call the subscribe method to change the subscription. The full example code is in the GitHub repo.

import threading
import asyncio
import time

from alpaca_trade_api import StreamConn
from alpaca_trade_api.common import URL

ALPACA_API_KEY = "<YOUR-API-KEY>"
ALPACA_SECRET_KEY = "<YOUR-SECRET-KEY>"

conn: StreamConn = None

def consumer_thread():
    try:
        # make sure we have an event loop, if not create a new one
        loop = asyncio.get_event_loop()
        loop.set_debug(True)
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

    global conn
    conn = StreamConn(
        ALPACA_API_KEY,
        ALPACA_SECRET_KEY,
        base_url=URL('https://paper-api.alpaca.markets'),
        data_url=URL('https://data.alpaca.markets'),
        data_stream='alpacadatav1'
    )

    @conn.on(r'^AM\..+$')
    async def on_minute_bars(conn, channel, bar):
        print('bars', bar)


    @conn.on(r'Q\..+')
    async def on_quotes(conn, channel, quote):
        print('quote', quote)


    @conn.on(r'T\..+')
    async def on_trades(conn, channel, trade):
        print('trade', trade)

    conn.run(['alpacadatav1/Q.GOOG'])

if __name__ == '__main__':
    threading.Thread(target=consumer_thread).start()

    loop = asyncio.get_event_loop()

    time.sleep(5)  # give the initial connection time to be established
    subscriptions = [['alpacadatav1/AM.TSLA'], ['alpacadatav1/Q.GOOG'],
                     ['alpacadatav1/T.AAPL']]

    while 1:
        for channels in subscriptions:
            loop.run_until_complete(conn.subscribe(channels))
            if "AM." in channels[0]:
                time.sleep(60)  # aggs are once every minute. give it time
            else:
                time.sleep(20)
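
When rotating between channel lists like this, it can help to work out in plain Python what actually changes and how long to stay on each list. The helpers below are hypothetical and independent of the SDK; they only assume channel names shaped like the ones used in this post.

```python
def plan_subscription_change(current, desired):
    """Return (to_drop, to_add) as sorted lists of channel names."""
    current, desired = set(current), set(desired)
    return sorted(current - desired), sorted(desired - current)


def hold_seconds(channels, bar_wait=60, default_wait=20):
    # Minute bars (AM.) arrive once a minute, so wait longer for them.
    # Checks every channel, not just the first one in the list.
    has_bars = any(ch.split('/')[-1].startswith('AM.') for ch in channels)
    return bar_wait if has_bars else default_wait


if __name__ == '__main__':
    before = ['alpacadatav1/Q.GOOG', 'alpacadatav1/T.AAPL']
    after = ['alpacadatav1/T.AAPL', 'alpacadatav1/AM.TSLA']
    print(plan_subscription_change(before, after))
    print(hold_seconds(after))
```

Whether a new subscribe call replaces or extends the previous subscription is something to confirm against the SDK version you run; the sketch makes no claim either way.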

You may want to combine these examples into one script if your requirements call for it. Understand them, and build your own robust data stream.


Resources:

  • Async IO in Python: A Complete Walkthrough (Real Python)
  • Market Data Streaming documentation (Alpaca)
  • aaugustin/websockets: library for building WebSocket servers and clients in Python
  • alpacahq/alpaca-trade-api-python: Python client for Alpaca's trade API


Brokerage services are provided by Alpaca Securities LLC ("Alpaca"), member FINRA/SIPC, a wholly-owned subsidiary of AlpacaDB, Inc. Technology and services are offered by AlpacaDB, Inc.