Subscribing to Data Events with the CFBD GraphQL API

Over the weekend, I announced the new and experimental CFBD GraphQL API. I already broke down most of the benefits of using GraphQL, which include more dynamic querying and granular control over the data. One benefit is so big that it merits its own post: GraphQL subscriptions.

Subscriptions do exactly what they say. They allow you to subscribe to data updates. If you're a Patreon subscriber, you may already be familiar with the live endpoints in the CFBD REST API (e.g. /scoreboard). While these endpoints present live data, they also require you, the user, to implement some sort of polling mechanism to re-trigger the endpoint on a cycle. And what's more, the data returned by the endpoint may or may not have changed. It's up to the user to figure out if it has.
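To make that polling burden concrete, here is a minimal sketch of the kind of loop you end up writing against a live REST endpoint. The fetch argument is a hypothetical callable standing in for whatever code calls the endpoint and returns parsed JSON. The hashing approach is just one assumed way to detect changes, not something the CFBD API prescribes.

```python
import hashlib
import json
import time


def poll_for_changes(fetch, interval_seconds=30.0, max_polls=None):
    """Call fetch() on a cycle and yield the payload only when it has changed."""
    last_digest = None
    polls = 0
    while max_polls is None or polls < max_polls:
        payload = fetch()
        # Hash a canonical JSON dump so identical payloads compare equal.
        digest = hashlib.sha256(
            json.dumps(payload, sort_keys=True).encode("utf-8")
        ).hexdigest()
        if digest != last_digest:
            last_digest = digest
            yield payload
        polls += 1
        # Wait before the next call, unless this was the final poll.
        if max_polls is None or polls < max_polls:
            time.sleep(interval_seconds)
```

Every pass through the loop costs an API call, whether or not anything changed, and the change detection is entirely your responsibility.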

In GraphQL, however, subscriptions are event-based. You specify a GraphQL query as a subscription and, instead of polling the data source repeatedly, the query auto-triggers each time that data has actually updated. Instead of making a bunch of calls, you specify one operation and then the data is pushed directly to your code whenever it changes in the CFBD database.

Subscriptions are pretty simple. Let's take a regular GraphQL query, one that queries betting lines from a specific sportsbook for all future games:


query bettingQuery {
	game(
		where: {
			status: { _eq: "scheduled" }
			lines: { provider: { name: { _eq: "Bovada" } } }
			_or: [
				{ homeClassification: { _eq: "fbs" } }
				{ awayClassification: { _eq: "fbs" } }
			]
		}
	) {
		homeTeam
		awayTeam
		lines(where: { provider: { name: { _eq: "Bovada" } } }) {
			spread
			overUnder
			provider {
				name
			}
		}
	}
}

Pretty standard query, right? If we wanted, we could call this query regularly, parsing the response to see if any of the data has changed. Much simpler would be turning it into a subscription:


subscription bettingSubscription {
	game(
		where: {
			status: { _eq: "scheduled" }
			lines: { provider: { name: { _eq: "Bovada" } } }
			_or: [
				{ homeClassification: { _eq: "fbs" } }
				{ awayClassification: { _eq: "fbs" } }
			]
		}
	) {
		homeTeam
		awayTeam
		lines(where: { provider: { name: { _eq: "Bovada" } } }) {
			spread
			overUnder
			provider {
				name
			}
		}
	}
}

That was simple! The only change I made was changing the query operation to a subscription operation (I also changed the arbitrary operation name to bettingSubscription). Now, whenever the data returned by this query changes in CFBD, I will get an update pushed directly to me. No more polling over and over again. No more trying to figure out if anything has actually changed.

If you want to get pushed an update whenever a game's status changes to "completed" so you know that it's time to pull play or box score data, you can do that. If you want to be alerted as above when a sportsbook spread has changed, you can do that. Want to be pushed an update when recruiting data changes? You can now do that, too.

Creating a Subscription in Python

One important thing to note: Insomnia does not support GraphQL subscriptions. However, I still recommend designing all of your GraphQL operations in Insomnia, since you can take advantage of its autocomplete and interactive GraphQL docs. You would just build the subscription as a query and then change it to a subscription when putting it into your Python code.
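If you would rather not edit the keyword by hand each time, the swap can be automated. This is a small sketch of a hypothetical helper (to_subscription is my own name, not part of gql) that assumes the operation text starts with the query keyword, as it does when written out in full in Insomnia.

```python
import re


def to_subscription(operation: str) -> str:
    """Swap a leading `query` keyword for `subscription`, leaving the rest untouched."""
    converted, count = re.subn(
        r"^(\s*)query\b", r"\1subscription", operation, count=1
    )
    if count == 0:
        # Anonymous shorthand operations such as "{ game { ... } }" land here.
        raise ValueError("operation does not start with the 'query' keyword")
    return converted
```

The operation name is left alone, so a query named bettingQuery keeps that name unless you rename it yourself.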

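We're going to be working with three packages: gql, asyncio, and backoff. The gql and backoff packages come from PyPI, while asyncio ships with Python's standard library and does not need to be installed. Depending on your gql version, the WebSocket transport may need to be installed as an extra (e.g. gql[websockets]). So make sure to have all of these available in your environment.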

We're going to walk through two different examples. Here is the first example and it's pretty simple:


from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport

transport = WebsocketsTransport(
    url="wss://graphql.collegefootballdata.com/v1/graphql",
    headers={ "Authorization": "Bearer YOUR_API_KEY"}
)

client = Client(
    transport=transport,
    fetch_schema_from_transport=True,
)

query = gql('''
    subscription bettingSubscription {
        game(
            where: {
                status: { _eq: "scheduled" }
                lines: { provider: { name: { _eq: "Bovada" } } }
                _or: [
                    { homeClassification: { _eq: "fbs" } }
                    { awayClassification: { _eq: "fbs" } }
                ]
            }
        ) {
            homeTeam
            awayTeam
            lines(where: { provider: { name: { _eq: "Bovada" } } }) {
                spread
                overUnder
                provider {
                    name
                }
            }
        }
    }
''')

for result in client.subscribe(query):
    # put your logic here
    print(result)

Let's walk through what this code is doing. On line 4, we are creating a WebsocketsTransport. You'll note this is different from what we did in the previous post for making GraphQL queries. If you remember, queries and mutations are just HTTP POST requests. If you look at line 5, we are instead using a wss:// protocol. Instead of making an HTTP request, we are working over a WebSocket. Unlike the HTTP protocol, WebSockets establish a persistent connection that allows for two-way communication. This is how GraphQL subscriptions are possible. A persistent connection is opened over a WebSocket. The client submits the subscription to the GraphQL server and then the GraphQL server pushes a message out to the client whenever there is an update relevant to that subscription.

On line 6, be sure to replace YOUR_API_KEY with the same API key you use to access the CFBD REST API.

Starting at line 14, we build out a GraphQL operation that will be submitted to the GraphQL server as a subscription. This is the same subscription we outlined at the start of this post which subscribes to updates to the spreads and totals from a specific sportsbook (Bovada) for upcoming games.

On line 39, we begin looping through subscription updates. The GraphQL server will return an initial data set pertaining to the subscription query. Whenever there are updates to the data set, more results will appear in the loop and our code will act upon it. In the example above, we are merely printing the results to the console, but this is where you would put the logic that you want to be executed whenever there is a data update, such as pushing the updated data to your own data store.
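As one idea for what that logic could look like, here is a rough sketch that compares each pushed result with the previous one and reports only the games whose lines moved. It assumes each result is a dict keyed by "game" that mirrors the fields selected in the subscription. The function names are my own and the values in the usage example are made up for illustration.

```python
def lines_by_game(result):
    """Map (homeTeam, awayTeam) to a tuple of (spread, overUnder) pairs."""
    snapshot = {}
    for game in result.get("game", []):
        key = (game["homeTeam"], game["awayTeam"])
        snapshot[key] = tuple(
            (line["spread"], line["overUnder"]) for line in game.get("lines", [])
        )
    return snapshot


def changed_games(previous, current):
    """Return the games in `current` whose lines are new or differ from `previous`."""
    return {
        key: lines
        for key, lines in current.items()
        if previous.get(key) != lines
    }


# Made-up sample payloads, shaped like the subscription's selection set.
first = {"game": [{"homeTeam": "Team A", "awayTeam": "Team B",
                   "lines": [{"spread": -3.5, "overUnder": 48.5,
                              "provider": {"name": "Bovada"}}]}]}
second = {"game": [{"homeTeam": "Team A", "awayTeam": "Team B",
                    "lines": [{"spread": -4.0, "overUnder": 48.5,
                               "provider": {"name": "Bovada"}}]}]}

moved = changed_games(lines_by_game(first), lines_by_game(second))
```

Inside the loop you would keep the last snapshot in a variable, call changed_games on each new result, and then replace the stored snapshot.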

I mentioned that we would be walking through two different examples. There is one potential issue with the example above: WebSocket connections, while incredibly useful, can be very brittle. The persistent connection can be interrupted for any number of reasons: network outage on your end, network outage on the GraphQL server's end, the GraphQL server going down temporarily for maintenance, etc.

Luckily, there are ways to address this. This is where we will be using the asyncio and backoff packages. Let's start with some imports:


import asyncio
import backoff

from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport

Next, we are going to extract the GraphQL operation into its own async function. We will take a session as a parameter, which will be used to subscribe over a WebSocket session we will create later. This is basically a copy and paste from the previous example:


async def subscribe(session):
    query = gql('''
        subscription bettingSubscription {
            game(
                where: {
                    status: { _eq: "scheduled" }
                    lines: { provider: { name: { _eq: "Bovada" } } }
                    _or: [
                        { homeClassification: { _eq: "fbs" } }
                        { awayClassification: { _eq: "fbs" } }
                    ]
                }
            ) {
                homeTeam
                awayTeam
                lines(where: { provider: { name: { _eq: "Bovada" } } }) {
                    spread
                    overUnder
                    provider {
                        name
                    }
                }
            }
        }
    ''')

    async for result in session.subscribe(query):
        # put your logic here
        print(result)

We will now create another function for managing the WebSocket connection and calling our subscription function:


@backoff.on_exception(backoff.expo, Exception, max_time=60)
async def graphql_connection():
    transport = WebsocketsTransport(
        url="wss://graphql.collegefootballdata.com/v1/graphql",
        headers={ "Authorization": "Bearer YOUR_API_KEY"}
    )

    client = Client(
        transport=transport,
        fetch_schema_from_transport=True,
    )
    
    async with client as session:
        task = asyncio.create_task(subscribe(session))
        
        await asyncio.gather(task)

The backoff module is used on line 1. This establishes some retry logic with an exponential backoff. In other words, if the WebSocket connection gets interrupted for any reason, backoff will call this function again, with an exponentially increasing wait between retries. The max_time=60 argument caps the total time spent trying at 60 seconds, after which backoff gives up and re-raises the exception.
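To get a feel for what an exponential wait schedule looks like, here is a standalone sketch. It does not use the backoff package and is not its internal code. It only mirrors the general shape of the idea: each wait is factor * base ** n, and a random "full jitter" step (which, as far as I know, is what backoff applies by default) picks the actual wait somewhere between zero and that bound.

```python
import itertools
import random


def expo_waits(base=2, factor=1, max_value=None):
    """Yield factor * base**n for n = 0, 1, 2, ..., optionally capped at max_value."""
    for n in itertools.count():
        wait = factor * base ** n
        if max_value is not None and wait > max_value:
            wait = max_value
        yield wait


def full_jitter(wait, rng=random):
    """Pick a random wait between 0 and the given upper bound."""
    return rng.uniform(0, wait)


# Upper bounds for the first five retries with the default settings.
first_five = list(itertools.islice(expo_waits(), 5))
```

The jitter keeps many clients that were disconnected at the same moment from all reconnecting in lockstep.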

Starting on line 3, we have some more code copied and pasted from the previous example. Be sure to enter your CFBD API key on line 5.

The last four lines deal with calling the subscribe function using the WebSocket session that was established on the previous lines. What's interesting is that we are calling the subscribe function inside of a task. We could take advantage of this to run several subscriptions at once if we had more than one. This would enable them all to share the same WebSocket connection. The modified code would look similar to this:


# Each function must be async so it can be wrapped in a task.
async def subscribe1(session):
    # GraphQL subscription here
    ...

async def subscribe2(session):
    # GraphQL subscription here
    ...

async def subscribe3(session):
    # GraphQL subscription here
    ...

async def subscribe4(session):
    # GraphQL subscription here
    ...

@backoff.on_exception(backoff.expo, Exception, max_time=60)
async def graphql_connection():
    transport = WebsocketsTransport(
        url="wss://graphql.collegefootballdata.com/v1/graphql",
        headers={ "Authorization": "Bearer YOUR_API_KEY"}
    )

    client = Client(
        transport=transport,
        fetch_schema_from_transport=True,
    )
    
    async with client as session:
        task1 = asyncio.create_task(subscribe1(session))
        task2 = asyncio.create_task(subscribe2(session))
        task3 = asyncio.create_task(subscribe3(session))
        task4 = asyncio.create_task(subscribe4(session))
        
        await asyncio.gather(task1, task2, task3, task4)

This modification has four different subscriptions to track, each encapsulated by its own function.
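If you want to see the concurrency pattern in isolation, without a network connection or an API key, the sketch below swaps the real gql session for a hypothetical FakeSession class that yields canned values. Everything in it is made up for illustration. Only the asyncio.gather pattern carries over to the real code.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a gql session; yields canned results."""

    def __init__(self, results_by_query):
        self._results_by_query = results_by_query

    async def subscribe(self, query):
        for result in self._results_by_query[query]:
            await asyncio.sleep(0)  # hand control to the other subscriptions
            yield result


async def collect(session, query, received):
    """Consume one subscription, recording (query, result) pairs as they arrive."""
    async for result in session.subscribe(query):
        received.append((query, result))


async def main():
    session = FakeSession({"lines": [1, 2], "status": ["a", "b"]})
    received = []
    # Both consumers run concurrently and share the same session object.
    await asyncio.gather(
        collect(session, "lines", received),
        collect(session, "status", received),
    )
    return received


received = asyncio.run(main())
```

Note that asyncio.gather accepts coroutines directly and wraps them in tasks itself, so the explicit asyncio.create_task calls are optional here.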

The last thing we need to do is call the graphql_connection function, and this is where the asyncio package comes into play:


asyncio.run(graphql_connection())

Putting everything together, your final code should look similar to this:


import asyncio
import backoff

from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport

async def subscribe(session):
    query = gql('''
        subscription bettingSubscription {
            game(
                where: {
                    status: { _eq: "scheduled" }
                    lines: { provider: { name: { _eq: "Bovada" } } }
                    _or: [
                        { homeClassification: { _eq: "fbs" } }
                        { awayClassification: { _eq: "fbs" } }
                    ]
                }
            ) {
                homeTeam
                awayTeam
                lines(where: { provider: { name: { _eq: "Bovada" } } }) {
                    spread
                    overUnder
                    provider {
                        name
                    }
                }
            }
        }
    ''')

    async for result in session.subscribe(query):
        # put your logic here
        print(result)
        
@backoff.on_exception(backoff.expo, Exception, max_time=60)
async def graphql_connection():
    transport = WebsocketsTransport(
        url="wss://graphql.collegefootballdata.com/v1/graphql",
        headers={ "Authorization": "Bearer YOUR_API_KEY"}
    )

    client = Client(
        transport=transport,
        fetch_schema_from_transport=True,
    )
    
    async with client as session:
        task = asyncio.create_task(subscribe(session))
        
        await asyncio.gather(task)
        
asyncio.run(graphql_connection())

Conclusion

GraphQL subscriptions are an efficient mechanism for subscribing to data updates. Whether you are looking to cut back on your API calls, simplify your code, or just find out the moment data changes, they are a great option. The experimental CFBD GraphQL API is available to Patreon subscribers at Tier 3. Join today if you would like to check it out. Also, check out my previous post to see more examples of what the GraphQL API can do for you. As always, let me know what you think!