Skip to content

Commit a2b65cd

Browse files
committed
databricks connector
1 parent ce5a1e3 commit a2b65cd

File tree

4 files changed

+563
-10
lines changed

4 files changed

+563
-10
lines changed

package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
"vite": "^7.0.0"
6464
},
6565
"devDependencies": {
66+
"@databricks/sql": "^1.11.0",
6667
"@duckdb/node-api": "^1.3.2-alpha.26",
6768
"@eslint/js": "^9.29.0",
6869
"@google-cloud/bigquery": "^8.1.1",
@@ -80,12 +81,16 @@
8081
"vitest": "^3.2.4"
8182
},
8283
"peerDependencies": {
84+
"@databricks/sql": "^1.11.0",
8385
"@duckdb/node-api": "^1.3.2-alpha.26",
8486
"@google-cloud/bigquery": "^8.1.1",
8587
"postgres": "^3.4.7",
8688
"snowflake-sdk": "^2.1.3"
8789
},
8890
"peerDependenciesMeta": {
91+
"@databricks/sql": {
92+
"optional": true
93+
},
8994
"@duckdb/node-api": {
9095
"optional": true
9196
},

src/databases/databricks.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import {DBSQLClient, DBSQLLogger, LogLevel, thrift} from "@databricks/sql";
2+
import type {DBSQLSession} from "@databricks/sql";
3+
import type {DatabricksConfig, QueryTemplateFunction} from "./index.js";
4+
import {ColumnSchema} from "../runtime/index.js";
5+
6+
// Driver types are derived structurally from DBSQLSession instead of being imported by name
// (presumably because @databricks/sql does not export them directly — TODO confirm).

// The operation handle that `DBSQLSession.executeStatement` resolves to.
type IOperation = Awaited<ReturnType<DBSQLSession["executeStatement"]>>;
7+
// The resolved value of `IOperation.getSchema`, with null/undefined excluded.
type TTableSchema = NonNullable<Awaited<ReturnType<IOperation["getSchema"]>>>;
8+
// The element type of a table schema's `columns` array.
type TColumnDesc = TTableSchema["columns"][0];
9+
// The `typeDesc` member of a column descriptor.
type TTypeDesc = TColumnDesc["typeDesc"];
10+
// Runtime type-id constants (e.g. TTypeId.BINARY_TYPE) taken from the driver's thrift bindings;
// getColumnType switches on these.
const TTypeId = thrift.TCLIService_types.TTypeId;
11+
12+
/**
 * Creates a query template function that runs SQL through the Databricks SQL driver.
 *
 * Each invocation of the returned function creates its own client, opens a
 * session, and executes one statement. The operation, session, and client are
 * closed in that order (innermost first) whether the query succeeds or throws.
 *
 * The single argument is a `DatabricksConfig`; its `type` discriminator is
 * stripped and every remaining field is forwarded to `DBSQLClient.connect`.
 *
 * @returns A tagged-template query function resolving to `{rows, schema, duration, date}`,
 *   where `date` is taken just before the statement is submitted and `duration`
 *   is the elapsed time in milliseconds from `date` until rows and schema were fetched.
 */
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export default function databricks({type, ...options}: DatabricksConfig): QueryTemplateFunction {
  return async (strings, ...params) => {
    // Only driver errors are logged.
    const logger = new DBSQLLogger({level: LogLevel.error});
    const client = new DBSQLClient({logger});
    await client.connect(options);
    try {
      const session = await client.openSession();
      try {
        const date = new Date();
        // Joining the template chunks with "?" turns each interpolated value into a
        // positional placeholder; the values themselves travel as ordinal parameters
        // and are never spliced into the SQL text.
        const operation = await session.executeStatement(strings.join("?"), {
          runAsync: true,
          ordinalParameters: params,
          maxRows: 10000
        });
        try {
          const rows = (await operation.fetchAll()) as Record<string, unknown>[];
          const schema = await operation.getSchema();
          // getSchema may resolve to null (presumably for statements that produce no
          // result set — confirm against the driver docs). Report an empty column list
          // in that case instead of throwing while destructuring null.
          const columns = schema == null ? [] : getTableSchema(schema);
          return {rows, schema: columns, duration: Date.now() - +date, date};
        } finally {
          await operation.close();
        }
      } finally {
        await session.close();
      }
    } finally {
      await client.close();
    }
  };
}
42+
43+
/**
 * Converts a driver table schema into one `ColumnSchema` per column,
 * preserving the column order.
 */
function getTableSchema(schema: TTableSchema): ColumnSchema[] {
  const result: ColumnSchema[] = [];
  for (const column of schema.columns) {
    result.push(getColumnSchema(column));
  }
  return result;
}
46+
47+
/**
 * Builds the `ColumnSchema` for a single driver column descriptor: the name
 * is the column's `columnName`, the type is derived from its `typeDesc`.
 */
function getColumnSchema(column: TColumnDesc): ColumnSchema {
  const {columnName: name, typeDesc} = column;
  const type = getColumnType(typeDesc);
  return {name, type};
}
50+
51+
/**
 * Maps a driver column type descriptor to a `ColumnSchema` type name.
 *
 * Only the first entry of the descriptor's `types` list is inspected, and only
 * its primitive type id. Anything unrecognized, including a missing entry or
 * a non-primitive entry, falls back to "string".
 */
function getColumnType({types: [type]}: TTypeDesc): ColumnSchema["type"] {
  // Optional chaining on `type` keeps an empty `types` list on the "string"
  // fallback instead of throwing on an undefined first entry.
  switch (type?.primitiveEntry?.type) {
    case TTypeId.BINARY_TYPE:
      return "buffer";
    case TTypeId.BOOLEAN_TYPE:
      return "boolean";
    case TTypeId.BIGINT_TYPE:
    case TTypeId.TINYINT_TYPE:
    case TTypeId.SMALLINT_TYPE:
    case TTypeId.INT_TYPE:
      return "integer";
    // DECIMAL has a scale and can hold fractional values, so it is reported as a
    // general number rather than an integer.
    case TTypeId.DECIMAL_TYPE:
    case TTypeId.DOUBLE_TYPE:
    case TTypeId.FLOAT_TYPE:
      return "number";
    // NOTE(review): intervals are durations rather than points in time — confirm
    // that "date" is the intended schema type for them.
    case TTypeId.DATE_TYPE:
    case TTypeId.TIMESTAMP_TYPE:
    case TTypeId.INTERVAL_DAY_TIME_TYPE:
    case TTypeId.INTERVAL_YEAR_MONTH_TYPE:
      return "date";
    default:
      return "string";
  }
}

src/databases/index.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {ColumnSchema, QueryParam} from "../runtime/index.js";
77

88
export type DatabaseConfig =
99
| BigQueryConfig
10+
| DatabricksConfig
1011
| DuckDBConfig
1112
| SQLiteConfig
1213
| SnowflakeConfig
@@ -20,6 +21,15 @@ export type BigQueryConfig = {
2021
projectId?: string;
2122
};
2223

24+
/**
 * Configuration for a Databricks database.
 *
 * `type` is the discriminator within `DatabaseConfig`. The remaining fields
 * are intersected with one of two authentication variants, selected by `authType`.
 */
export type DatabricksConfig = {
25+
type: "databricks";
26+
// presumably the workspace hostname — confirm against the driver's connection options
host: string;
27+
// presumably the HTTP path of the SQL warehouse or cluster — confirm against the driver's connection options
path: string;
28+
} & (
29+
// Access-token authentication: `authType` may be omitted, `token` is required.
| {authType?: "access-token"; token: string}
30+
// OAuth authentication: `authType` is required, client id and secret are optional.
| {authType: "databricks-oauth"; oauthClientId?: string; oauthClientSecret?: string}
31+
);
32+
2333
export type DuckDBConfig = {
2434
type: "duckdb";
2535
path?: string;
@@ -83,7 +93,7 @@ export async function getDatabaseConfig(
8393
else if (databaseName === "duckdb") config = {type: "duckdb"};
8494
else if (databaseName === "sqlite") config = {type: "sqlite"};
8595
else if (/\.duckdb$/i.test(databaseName)) config = {type: "duckdb", path: databaseName};
86-
else if (/\.db$/i.test(databaseName)) config = {type: "sqlite", path: databaseName}; // TODO disambiguate
96+
else if (/\.db$/i.test(databaseName)) config = {type: "sqlite", path: databaseName};
8797
else throw new Error(`database not found: ${databaseName}`);
8898
}
8999
return config;
@@ -93,10 +103,12 @@ export async function getDatabase(config: DatabaseConfig): Promise<QueryTemplate
93103
switch (config.type) {
94104
case "bigquery":
95105
return (await import("./bigquery.js")).default(config);
106+
case "databricks":
107+
return (await import("./databricks.js")).default(config);
96108
case "duckdb":
97109
return (await import("./duckdb.js")).default(config);
98110
case "sqlite":
99-
return (await import(process.versions.bun ? "./sqlite-bun.js" : "./sqlite-node.js")).default(config);
111+
return (await import(process.versions.bun ? "./sqlite-bun.js" : "./sqlite-node.js")).default(config); // prettier-ignore
100112
case "snowflake":
101113
return (await import("./snowflake.js")).default(config);
102114
case "postgres":

0 commit comments

Comments
 (0)