Making SQLite Upserts Snappy (with Python and SQLAlchemy)

Chris Roberts is a Software Engineer 2 on FlightAware’s Backend team. He spends a lot of time thinking about how to make FlightAware’s APIs more pleasant to use.

I recently had the opportunity to optimize some code at FlightAware and learned a great deal along the way.

In this post, we explore some of the tradeoffs of performance optimization and where to draw the line in the never-ending quest for speedy code.

Background

I was tasked with developing an open-source reference service we could provide to users of our Firehose flight data feed. It would receive, process, and store messages from the feed, making it easier for users to integrate FlightAware’s flight data with their existing infrastructure. Very simply put, Firehose is a socket connection which sends you newline-separated JSON messages. These messages represent real-time updates to flight statuses (departures, arrivals, cancellations, etc.). Also, the messages are sparse: one message for a flight may provide its route and an ETA while the next may contain just the flight’s callsign.
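
To make that sparseness concrete, here’s a minimal sketch of the kind of message pair you might see for a single flight. The field names other than "id" and "ident" are invented for illustration; the real schema lives in the Firehose documentation.

import json

# Illustrative only: two hypothetical, newline-delimited messages about the same flight.
# The first carries destination and ETA fields; the second only the flight's callsign.
first = json.loads('{"id": "SWA123-1600000000-airline-0001", "dest": "KDAL", "eta": "1604246400"}')
second = json.loads('{"id": "SWA123-1600000000-airline-0001", "ident": "SWA123"}')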

We decided to call the application Firestarter (get it?).

Firestarter would use the Firehose messages to maintain a simple database table representing the current state of all flights coming through the feed. It follows a classic upsert model: new flight? Insert it. Existing flight? Update the relevant table row with any updated fields.

Our primary goal with Firestarter was to make it simple for users to interact with Firehose. To keep the application entirely self-contained, we chose SQLite as our database engine. Ideally, a user could just run the application and then do what they want with the resulting table. We understood that this wouldn’t always be feasible, though, and wanted to ensure that users could still benefit from reading Firestarter’s code. In service of this goal, we chose to implement Firestarter in Python due to its widespread adoption and reputation for clarity.

Performance was a consideration as well. At a minimum, Firestarter needs to be capable of handling around 150 messages per second (msg/s). That target represents keeping up with the average real-time rate of worldwide flight events on a busy day. Ideally, there’d be a healthy buffer on top of that to allow for traffic spikes or catching up to real-time if Firestarter needs to reconnect to Firehose. 300 msg/s seemed like a reasonable target then.

Optimizing Firestarter’s Performance

What follows is a reproduction of the rough progression of Firestarter’s performance from a measly 20 msg/s to well beyond our target. We’ll be looking at quite a bit of code, but none of it is terribly complicated.

And of course, it’s not a performance post without some benchmarking! I fed a few thousand Firehose messages into a text file to serve as test data.

All benchmarks were run on a Dell server with a Xeon E5-2630 @ 2.3 GHz and a spinning disk. Software versions:

  • Ubuntu 20.04.1 (linux kernel 5.4.0-42)
  • Python 3.9.0
  • SQLAlchemy 1.3.20
  • SQLite 3.31.1

Finally, we need a harness to aid us in benchmarking our code. For brevity, I’ve left out the full table definition and standard library imports (and will continue to do so in all subsequent code samples). Don’t worry, though, the full code used for these benchmarks is on our GitHub. If that only piques your curiosity further, you can find the actual Firestarter code here. Try not to spoil the post for yourself, though!

import sqlalchemy as sa

meta = sa.MetaData()
table = sa.Table(
    "flights",
    meta,
    sa.Column("id", sa.String, primary_key=True),
    sa.Column("ident", sa.String),
    sa.Column("reg", sa.String),
    ...
    sa.Column("predicted_off", sa.String),
    sa.Column("predicted_on", sa.String),
    sa.Column("predicted_in", sa.String),
)

try:
    os.remove("flights.db")
except FileNotFoundError:
    pass
engine = sa.create_engine("sqlite:///flights.db")

meta.create_all(engine)

def run(fn):
    lines = int(sys.argv[1])
    start = time.time()
    fn(lines)
    total_time = time.time() - start
    print(f"{lines} messages processed in {total_time:.2f} seconds: {lines / total_time:.1f}msg/s")

In the harness, we set up our database and provide a function to easily run some code and measure its performance. Separating out the harness like this helps us stay focused on the code we’re actually benchmarking, and it ensures we’re performing identical setup across the multiple test cases we’ll be developing.

v1: The basest of baselines

Now for the good stuff. We’ll start simple, really simple, so simple you probably already know this isn’t going to go well. Nevertheless, we must start somewhere!

from harness import table, engine, run

def write_to_db(message):
    with engine.connect() as conn:
        existing_flight = conn.execute(table.select().where(table.c.id == message["id"])).first()
        if existing_flight:
            conn.execute(table.update().where(table.c.id == message["id"]), message)
        else:
            conn.execute(table.insert(), message)

def main(lines):
    for line in itertools.islice(sys.stdin.readlines(), lines):
        write_to_db(json.loads(line))

if __name__ == "__main__":
    run(main)

We read a line from stdin, parse it, and check the table for its id. If it exists already, we update the row; otherwise, we insert it. Our sole goal right now is clarity, so we made the code straightforward.

How’s the performance? Pretty abysmal, it turns out.

$ python v1.py 100 < messages.jsonl
100 messages processed in 5.02 seconds: 19.9msg/s

We’ve made a classic blunder, one common enough that it was worth including in SQLite’s own FAQ. We’re using a new transaction for every single query. Well, technically SQLite is using a new transaction for each query; we’re just not telling it to do otherwise. Unfortunately, transactions are quite slow: their performance is tied directly to the speed of your disk (likely the slowest component in your computer). In return for this poor performance, though, transactions provide us with data durability.

v2: Tweaking some options

What can we do about this? SQLite’s FAQ has a few pieces of advice:

  • Disable synchronous database writes with PRAGMA synchronous=OFF;
  • Perform more queries per transaction

The first tip risks database corruption, which is not acceptable. However, further research shows that the less extreme PRAGMA synchronous=NORMAL; can offer modest performance benefits without risk of corruption. Let’s give it a try.

It’s a bit strange to enable through SQLAlchemy. We have to hook the engine connection:

from sqlalchemy import event

@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA synchronous=NORMAL")
    cursor.close()

The rest of the code is identical to V1 above.

$ python v2.py 100 < messages.jsonl
100 messages processed in 3.91 seconds: 25.6msg/s

A 28% improvement. Not bad, but still nowhere near what we need.

v3: Speed at all costs

What about the second option? We can easily reduce the number of transactions: just wrap the whole script in one!

def write_to_db(conn, message):
    existing_flight = conn.execute(table.select().where(table.c.id == message["id"])).first()
    if existing_flight:
        conn.execute(table.update().where(table.c.id == message["id"]), message)
    else:
        conn.execute(table.insert(), message)

def main(lines):
    with engine.begin() as conn:
        for line in islice(sys.stdin.readlines(), lines):
            write_to_db(conn, json.loads(line))

Only the modified code has been included. We’ve pulled our context manager out to our main function and called engine.begin() to start a transaction that won’t be committed until we’re done processing lines.

$ python v3.py 100 < messages.jsonl
100 messages processed in 0.44 seconds: 225.4msg/s

Whoa! Now there’s the performance we need! In fact, it ran quickly enough that we should really re-run it with more messages to ensure we’re seeing legitimate numbers.

$ python v3.py 2000 < messages.jsonl
2000 messages processed in 3.08 seconds: 649.5msg/s

This easily exceeds our original performance target. There’s a problem, though: we’ve traded away all our durability for pure performance. By putting all our queries into one transaction, any interruption to our application will result in the loss of all data! Worse still, the above approach won’t even work in the real-world application of Firestarter where there’s no end to the data it receives.

v4: Finding a sweet spot

Resolving this isn’t very difficult, but it raises the question: how much data are we willing to lose if something goes wrong? If there’s power loss or a crash in the middle of a transaction, the data written as part of that transaction never ends up in the table. With our V1 code, that meant losing — at worst — one message. With the new speedy option, we’ll lose everything. We need to pick a point somewhere in the middle of that sliding scale of durability vs. performance. To do so, we can rewrite our loop to perform a transaction periodically:

def main(lines):
    connection = engine.connect()
    transaction = connection.begin()
    start_time = time.time()
    for line in islice(sys.stdin.readlines(), lines):
        write_to_db(connection, json.loads(line))
        if time.time() > start_time + <period>:
            transaction.commit()
            transaction = connection.begin()
            start_time = time.time()
    transaction.commit()

It’s not as clean as the prior code. We’ve had to do away with our context manager, and there’s timing code in the loop now, but it’s all for a noble pursuit: better performance. Now we’re just left to pick the period. Fortunately, there are many correct answers here. I somewhat arbitrarily settled on 1 second. Its impact on performance wasn’t too severe.

$ python v4.py 2000 < messages.jsonl
2000 messages processed in 3.42 seconds: 585.3msg/s
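
If you’d rather not bury that number in the loop, one small refinement (a sketch, not something the benchmark scripts above actually do) is to pull the period out into a configurable constant:

import os

# Hypothetical knob: commit interval in seconds, defaulting to the 1 second chosen above.
COMMIT_PERIOD = float(os.environ.get("COMMIT_PERIOD", "1"))

The check in the loop then becomes if time.time() > start_time + COMMIT_PERIOD: with no other changes.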

v5: Taking things too far

We could stop here, satisfied with almost 2x our initial performance target, but that’s not what I did when I was developing Firestarter; I was just starting to have some fun! I had been bitten by the optimization bug. How fast could we make it? It became an exercise in pride rather than satisfying any business requirements.

Our initial jump in performance came from a classic optimization technique: batching our work. Instead of making many small writes to disk, we make one larger write and reap the performance benefits. Were there other opportunities for batching in the code?

What about batching the SQL queries themselves? There wasn’t really a need for us to execute a SELECT for every message. What if we accumulated the messages for a bit and then checked the table for all the ids at once using the IN operator?

cache = {}

def add_to_cache(message):
    cache.setdefault(message["id"], {}).update(message)

def write_to_db(conn):
    existing_flights = conn.execute(table.select().where(table.c.id.in_(cache)))
    for flight in existing_flights:
        conn.execute(table.update().where(table.c.id == flight.id), cache.pop(flight.id))
    # We popped the updates, so anything left must be an insert.
    for flight in cache.values():
        conn.execute(table.insert(), flight)
    cache.clear()

def main(lines):
    start_time = time.time()
    for line in islice(sys.stdin.readlines(), lines):
        add_to_cache(json.loads(line))
        if time.time() > start_time + 1:
            with engine.begin() as connection:
                write_to_db(connection)
            start_time = time.time()
    with engine.begin() as connection:
        write_to_db(connection)

Now we’re getting more complicated; we’ve added a cache to the mix. Instead of committing a transaction every second, we flush the cache. In write_to_db(), we now only need to execute SELECT once. Let’s see what that does for us:

$ python v5.py 2000 < messages.jsonl
2000 messages processed in 0.83 seconds: 2417.2msg/s

The endorphins start to rush. We just quadrupled our performance! Let’s bump the line count up again.

$ python v5.py 20000 < messages.jsonl
20000 messages processed in 4.32 seconds: 4631.7msg/s

It just gets better and better.

What about the UPDATEs and INSERTs? Surely those could be batched as well?

v6: ⚡️⚡️⚡️

It didn’t take long to stumble across SQLAlchemy’s documentation about executing multiple statements, though adapting the code was trickier than expected:

from sqlalchemy.sql import bindparam

cache = {}
BASE_MESSAGE = dict.fromkeys(c.name for c in table.c)

def add_to_cache(message):
    cache.setdefault(message["id"], {}).update(message)

def write_to_db(conn):
    existing_flights = conn.execute(table.select().where(table.c.id.in_(cache)))
    updates = [dict(**flight, _id=flight.id) | cache.pop(flight.id) for flight in existing_flights]
    if updates:
        # sqlalchemy reserves the "id" bindparam for its own use
        conn.execute(table.update().where(table.c.id == bindparam("_id")), *updates)
    # We popped the updates, so anything left must be an insert.
    # All inserted dicts must have same structure.
    inserts = [BASE_MESSAGE | val for val in cache.values()]
    if inserts:
        conn.execute(table.insert(), *inserts)
    cache.clear()

Updates and inserts could be batched, but all the dictionaries we passed needed to contain the same keys. This required some additional code in the update section since we essentially needed to merge the current and updated fields in Python rather than letting the query do it. It took me a few tries to get that working correctly. Where does our performance stand now?

$ python v6.py 20000 < messages.jsonl
20000 messages processed in 2.53 seconds: 7899.9msg/s

This is around when I came to my senses while working on Firestarter. Having exceeded our initial performance target by 25x, I was really just stroking my ego at this point. Sure, having all this headroom meant we could buy back some durability, but there are other tradeoffs at play here, too.

Tradeoffs

Remember the primary goals I mentioned at the beginning of this post? We wanted this code to be easy to read and understand. In the pursuit of maximum performance, we sacrificed that. And what about correctness? The trickier the techniques that we use, the more likely we are to introduce bugs. As I already said, it took me a few tries to actually get V6 of the code working correctly.
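
To give one concrete example of why: execute() compiles the batched statement from the keys of the first parameter dictionary, so (as of SQLAlchemy 1.3) a later dictionary missing one of those keys raises an error rather than falling back to a column default. A minimal sketch, reusing the engine and table from the harness:

from harness import engine, table

with engine.begin() as conn:
    conn.execute(
        table.insert(),
        {"id": "flight-1", "reg": "N12345"},
        {"id": "flight-2"},  # missing "reg": SQLAlchemy raises an error rather than inserting a partial row
    )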

I also haven’t even mentioned the fact that conn.execute(table.select().where(table.c.id.in_(cache))) will fail once the cache holds more than 999 ids if SQLite was compiled with its default flags (999 is the default SQLITE_MAX_VARIABLE_NUMBER limit in the SQLite version we’re using). Many distributions (Debian, Alpine, Arch) tweak the compile flags so that they are not subject to this issue, but we don’t want Firestarter to be limited to just one of those distributions. We could work around the issue, but that would result in further complicating our code.
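
For the curious, the workaround would look something like the sketch below (this is not what Firestarter does): split the cached ids into chunks small enough to stay under that default bound-parameter limit before running the SELECT.

from harness import table

# Chunk size chosen to sit comfortably under the 999-parameter default.
CHUNK_SIZE = 500

def select_existing(conn, ids):
    ids = list(ids)
    for i in range(0, len(ids), CHUNK_SIZE):
        chunk = ids[i : i + CHUNK_SIZE]
        yield from conn.execute(table.select().where(table.c.id.in_(chunk)))

write_to_db() would then iterate over select_existing(conn, cache) instead of running a single SELECT, at the cost of yet more code guarding against an edge case.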

Lessons Learned

Striving for performance involves both obvious and not-so-obvious costs. As satisfying as it can be to watch the benchmark numbers tick up and up, we must always keep our primary requirements at the forefront of our minds. Only optimize as much as is needed, and no more.
