Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/core/src/preagg/PreAggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ function preaggregateInfo(clientQuery, active, preaggCols, schema) {
// generate creation query string and hash id
const create = query.toString();
const id = (fnv_hash(create) >>> 0).toString(16);
const table = `${schema}.preagg_${id}`;

// Simplification of Ibis experiment by ensuring all tables land in the default schema
// const table = `${schema}.preagg_${id}`;
const table = `preagg_${id}`;

// generate preaggregate select query
const select = Query
Expand Down
44 changes: 40 additions & 4 deletions packages/duckdb-server/pkg/__main__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys

import duckdb
import ibis
from diskcache import Cache

from pkg.server import server
Expand All @@ -10,12 +10,48 @@
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


class MosaicConnection:
    """Connection shim that presents a DuckDB-flavored SQL interface on top of
    an arbitrary Ibis backend.

    A local in-memory DuckDB instance (``self.duckdb``) serves two roles:
    it answers DuckDB-specific statements (DESCRIBE) natively, and it acts
    as the catalog/staging area for tables that are mirrored into the
    target backend after every ``execute`` call.
    """

    def __init__(self, backend: ibis.BaseBackend):
        # Target backend all read queries are translated to (via Ibis).
        self.backend = backend
        # Local in-memory DuckDB used for DESCRIBE and as the table catalog.
        self.duckdb = ibis.duckdb.connect()

    def query(self, query: str):
        """Run a read query and return an Ibis table expression.

        NOTE(review): the DESCRIBE check is a prefix match on the raw string;
        leading whitespace before DESCRIBE would bypass it — confirm callers
        never send padded statements.
        """
        # We query duckdb natively when encountering a describe statement or when backend is set to duckdb
        # SQLGlot parses DESCRIBE statements under the hood, but packages/core/src/util/field-info.js expects responses in duckdb's format
        if query.lower().startswith("describe") or self.backend.name == "duckdb":
            # `.con` is the raw duckdb connection underneath the Ibis backend;
            # result is materialized as a PyArrow table.
            res = self.duckdb.con.query(query).arrow()
            # We still return an ibis.Table object to keep the interface consistent
            return ibis.memtable(res)

        # In all other cases we go through the Ibis backend's SQL API
        # DuckDB query -> Ibis internal query representation -> Backend-specific query
        logger.debug(f'Executing query on backend: {self.backend.name}')
        table = self.backend.sql(query, dialect="duckdb")
        logger.debug(f'Ibis internal representation: {table}')
        return table

    def execute(self, query: str):
        """Run a DDL/DML statement on the local DuckDB, then mirror every
        table to the target backend.

        NOTE(review): this re-copies *all* tables on every execute — fine for
        the experiment, but O(total data) per statement.
        """
        # For this experiment we create all tables in DuckDB first, then sync them to the backend
        # (many backends do not support read_parquet, do not have access to local files, etc.)
        execute_res = self.duckdb.con.execute(query)
        self._sync_tables()
        return execute_res

    def _sync_tables(self):
        """Copy every table in the local DuckDB catalog to the target backend."""
        # We use the DuckDB backend instance as our catalog of tables and copy their contents to the target backend
        for table_name in self.duckdb.list_tables():
            table = self.duckdb.table(table_name)
            # Full overwrite each sync: the local DuckDB copy is the source of truth.
            self.backend.create_table(table_name, table.to_pyarrow(), overwrite=True)


def serve():
db_path = sys.argv[1] if len(sys.argv) >= 2 else ":memory:" # noqa: PLR2004
backend_resource = sys.argv[1] if len(sys.argv) >= 2 else "duckdb://:memory:" # noqa: PLR2004

logger.info(f"Using backend resource {backend_resource}")
backend = ibis.connect(backend_resource)

logger.info(f"Using DuckDB {db_path}")
con = MosaicConnection(backend)

con = duckdb.connect(db_path)
cache = Cache()

logger.info(f"Caching in {cache.directory}")
Expand Down
4 changes: 2 additions & 2 deletions packages/duckdb-server/pkg/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def retrieve(cache, query, get):


def get_arrow(con, sql):
    """Execute *sql* on *con* and return the result as a PyArrow table.

    `con.query` returns an Ibis table expression; `to_pyarrow()` executes
    it and materializes the rows. (The pre-Ibis `.arrow()` call left in the
    diff is removed — it made the intended return unreachable.)
    """
    return con.query(sql).to_pyarrow()


def arrow_to_bytes(arrow):
Expand All @@ -42,5 +42,5 @@ def get_arrow_bytes(con, sql):


def get_json(con, sql):
    """Execute *sql* on *con* and return the rows as a JSON string.

    The Ibis expression is materialized to a pandas DataFrame and serialized
    as a JSON array of record objects. (The stale pre-Ibis `.df()` assignment
    left in the diff is removed — it was immediately overwritten.)
    """
    result = con.query(sql).to_pandas()
    return result.to_json(orient="records")
3 changes: 2 additions & 1 deletion packages/duckdb-server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ requires-python = ">=3.10"
dependencies = [
"diskcache",
"duckdb>=1.1.3",
"ibis-framework[duckdb,polars,clickhouse,postgres,datafusion]",
"pandas",
"pyarrow",
"socketify",
"ujson",
]

[project.scripts]
Expand Down
Loading