
Pattern for reading from database in batches

I’ve been having an awesome time playing with the direct Postgres connection, but oftentimes the bottleneck during visualization experimentation is pulling data. I’m thinking I could iterate a lot faster if I streamed results from the database in small batches, so I could at least start looking at the visualization while it loads instead of waiting ~10 seconds for it to finish…

As an example, my private version of https://observablehq.com/@meekohi/deck-gl-scenethink-venues-demo replaces the file attachment with venues = client.query("SELECT latitude, longitude FROM venues LIMIT 1000")

I feel like there is some slick way to solve this with a generator, but I’m not sure how to “hold onto” the previous results…

venues = {
  let offset = 0
  while (true) {
    // each yield replaces the cell's value with only the newest batch
    yield client.query(`SELECT latitude, longitude FROM venues LIMIT 50 OFFSET ${offset}`)
    offset += 50
  }
}

but then I end up creating a new layer each time and throwing away the old ones… the right pattern isn’t coming to me. Anyone have a nice solution they could show?


Came to me at last… curious if there are better approaches:

venues = {
  let offset = 0
  let allVenues = []
  while (true) {
    yield client.query(`SELECT latitude, longitude FROM venues LIMIT 200 OFFSET ${offset}`)
      .then(newVenues => {
        allVenues = [...allVenues, ...newVenues] // hold onto the earlier batches
        offset += 200
        return allVenues // the cell's value is the accumulated array
      })
  }
}
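
For context: when a generator cell yields a promise, Observable resolves it and uses the resolved value as the cell’s value, so every cell that references venues re-runs with the growing array after each batch. A minimal downstream cell to watch it load (html is Observable’s standard-library tagged template; the total of 1000 is borrowed from the LIMIT in the original query and is otherwise an assumption):

progress = html`<progress value=${venues.length} max=1000></progress> ${venues.length} venues loaded`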
Observable cells can be async generators, so you can await each query right in the loop body. This version accumulates the rows the same way, but also stops querying once a short batch signals the end of the table:

venues = {
  const batchSize = 200
  let offset = 0
  let venues = []
  while (true) {
    const newVenues = await client.query(`SELECT latitude, longitude FROM venues LIMIT ${batchSize} OFFSET ${offset}`)
    venues = venues.concat(newVenues)
    offset += batchSize
    yield venues
    if (newVenues.length < batchSize) break // stop when the query is exhausted
  }
}

Alternatively, this could be made into a helper function:

async function* streamQuery(client, query, batchSize = 200) {
  let offset = 0
  let rows = []
  while (true) {
    const newRows = await client.query(`${query} LIMIT ${batchSize} OFFSET ${offset}`)
    rows = rows.concat(newRows)
    offset += batchSize
    yield rows
    if (newRows.length < batchSize) break // stop when the query is exhausted
  }
}

venues = streamQuery(client, `SELECT latitude, longitude FROM venues`)

I haven’t tested this though.
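
To close the loop with the deck.gl notebook from the original question, a downstream cell can rebuild the layer from the accumulated array each time a batch arrives, rather than stacking up new layers. A hypothetical sketch, assuming deckgl is the Deck instance and deck is the deck.gl namespace from that notebook:

update = deckgl.setProps({
  layers: [
    new deck.ScatterplotLayer({
      id: 'venues',                                // reusing the same id replaces the old layer
      data: venues,                                // grows with each yielded batch
      getPosition: d => [d.longitude, d.latitude], // assumes these column names
      radiusMinPixels: 2
    })
  ]
})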


Very cool!

If anyone comes up with a useful little set of these sorts of Postgres helper functions, generic over whatever query you’re trying to run, I could see them being immensely popular as an importable helper notebook.

A while ago I created these two helper functions to make it easier to write queries:

query = (strings, ...interpolations) => {
  // join the literal parts with ? placeholders and pass the values separately
  return client.query(strings.join('?'), interpolations)
}
queryRow = (strings, ...interpolations) => {
  return client.queryRow(strings.join('?'), interpolations)
}

Usage:

query`select * from users where username = ${userName}`

I ran it on MySQL, so the syntax might be different for Postgres, but the idea is that it replaces the interpolations with placeholders, so injection isn’t possible while keeping the intuitiveness of string concatenation.
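
For Postgres specifically, node-postgres-style clients expect numbered $1, $2, … placeholders rather than ?, so a sketch of the same idea under that assumption might look like this (queryPg is a hypothetical name; the two-argument client.query(sql, params) form is the same one used above):

queryPg = (strings, ...interpolations) => {
  // rebuild the SQL with $1, $2, ... between the literal parts
  const text = strings.reduce((sql, part, i) => sql + (i ? `$${i}` : '') + part, '')
  return client.query(text, interpolations)
}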

This could also be extended to cover the explain method, as sketched below. Would it make sense to incorporate this feature into the native API?
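
A sketch of that wrapper, assuming the client exposes an explain(sql, params) method with the same signature as query:

explain = (strings, ...interpolations) => {
  return client.explain(strings.join('?'), interpolations)
}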
