@@ -115,337 +115,3 @@ where o_orderstatus in (
115115
116116 Ok ( ( ) )
117117}
118-
119- #[ tokio:: test]
120- async fn tpch_q2_correlated ( ) -> Result < ( ) > {
121- let ctx = SessionContext :: new ( ) ;
122- register_tpch_csv ( & ctx, "part" ) . await ?;
123- register_tpch_csv ( & ctx, "supplier" ) . await ?;
124- register_tpch_csv ( & ctx, "partsupp" ) . await ?;
125- register_tpch_csv ( & ctx, "nation" ) . await ?;
126- register_tpch_csv ( & ctx, "region" ) . await ?;
127-
128- let sql = r#"select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
129- from part, supplier, partsupp, nation, region
130- where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 15 and p_type like '%BRASS'
131- and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'EUROPE'
132- and ps_supplycost = (
133- select min(ps_supplycost) from partsupp, supplier, nation, region
134- where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey
135- and n_regionkey = r_regionkey and r_name = 'EUROPE'
136- )
137- order by s_acctbal desc, n_name, s_name, p_partkey;"# ;
138-
139- // assert plan
140- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
141- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
142- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
143- let expected = r#"Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
144- Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
145- Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name
146- Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value
147- Inner Join: nation.n_regionkey = region.r_regionkey
148- Inner Join: supplier.s_nationkey = nation.n_nationkey
149- Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
150- Inner Join: part.p_partkey = partsupp.ps_partkey
151- Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS")
152- TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")]
153- TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
154- TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
155- TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
156- Filter: region.r_name = Utf8("EUROPE")
157- TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]
158- SubqueryAlias: __scalar_sq_1
159- Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value
160- Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
161- Inner Join: nation.n_regionkey = region.r_regionkey
162- Inner Join: supplier.s_nationkey = nation.n_nationkey
163- Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
164- TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
165- TableScan: supplier projection=[s_suppkey, s_nationkey]
166- TableScan: nation projection=[n_nationkey, n_regionkey]
167- Filter: region.r_name = Utf8("EUROPE")
168- TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"# ;
169- assert_eq ! ( actual, expected) ;
170-
171- // assert data
172- let results = execute_to_batches ( & ctx, sql) . await ;
173- let expected = vec ! [ "++" , "++" ] ;
174- assert_batches_eq ! ( expected, & results) ;
175-
176- Ok ( ( ) )
177- }
178-
179- #[ tokio:: test]
180- async fn tpch_q4_correlated ( ) -> Result < ( ) > {
181- let orders = r#"4,13678,O,53829.87,1995-10-11,5-LOW,Clerk#000000124,0,
182- 35,12760,O,192885.43,1995-10-23,4-NOT SPECIFIED,Clerk#000000259,0,
183- 65,1627,P,99763.79,1995-03-18,1-URGENT,Clerk#000000632,0,
184- "# ;
185- let lineitems = r#"4,8804,579,1,30,51384,0.03,0.08,N,O,1996-01-10,1995-12-14,1996-01-18,DELIVER IN PERSON,REG AIR,
186- 35,45,296,1,24,22680.96,0.02,0,N,O,1996-02-21,1996-01-03,1996-03-18,TAKE BACK RETURN,FOB,
187- 65,5970,481,1,26,48775.22,0.03,0.03,A,F,1995-04-20,1995-04-25,1995-05-13,NONE,TRUCK,
188- "# ;
189-
190- let ctx = SessionContext :: new ( ) ;
191- register_tpch_csv_data ( & ctx, "orders" , orders) . await ?;
192- register_tpch_csv_data ( & ctx, "lineitem" , lineitems) . await ?;
193-
194- let sql = r#"
195- select o_orderpriority, count(*) as order_count
196- from orders
197- where exists (
198- select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate)
199- group by o_orderpriority
200- order by o_orderpriority;
201- "# ;
202-
203- // assert plan
204- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
205- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
206- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
207- let expected = r#"Sort: orders.o_orderpriority ASC NULLS LAST
208- Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count
209- Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
210- LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey
211- TableScan: orders projection=[o_orderkey, o_orderpriority]
212- Filter: lineitem.l_commitdate < lineitem.l_receiptdate
213- TableScan: lineitem projection=[l_orderkey, l_commitdate, l_receiptdate]"#
214- . to_string ( ) ;
215- assert_eq ! ( actual, expected) ;
216-
217- // assert data
218- let results = execute_to_batches ( & ctx, sql) . await ;
219- let expected = vec ! [
220- "+-----------------+-------------+" ,
221- "| o_orderpriority | order_count |" ,
222- "+-----------------+-------------+" ,
223- "| 1-URGENT | 1 |" ,
224- "| 4-NOT SPECIFIED | 1 |" ,
225- "| 5-LOW | 1 |" ,
226- "+-----------------+-------------+" ,
227- ] ;
228- assert_batches_eq ! ( expected, & results) ;
229-
230- Ok ( ( ) )
231- }
232-
233- #[ tokio:: test]
234- async fn tpch_q17_correlated ( ) -> Result < ( ) > {
235- let parts = r#"63700,goldenrod lavender spring chocolate lace,Manufacturer#1,Brand#23,PROMO BURNISHED COPPER,7,MED BOX,901.00,ly. slyly ironi
236- "# ;
237- let lineitems = r#"1,63700,7311,2,36.0,45983.16,0.09,0.06,N,O,1996-04-12,1996-02-28,1996-04-20,TAKE BACK RETURN,MAIL,ly final dependencies: slyly bold
238- 1,63700,3701,3,1.0,13309.6,0.1,0.02,N,O,1996-01-29,1996-03-05,1996-01-31,TAKE BACK RETURN,REG AIR,"riously. regular, express dep"
239- "# ;
240-
241- let ctx = SessionContext :: new ( ) ;
242- register_tpch_csv_data ( & ctx, "part" , parts) . await ?;
243- register_tpch_csv_data ( & ctx, "lineitem" , lineitems) . await ?;
244-
245- let sql = r#"select sum(l_extendedprice) / 7.0 as avg_yearly
246- from lineitem, part
247- where p_partkey = l_partkey and p_brand = 'Brand#23' and p_container = 'MED BOX'
248- and l_quantity < (
249- select 0.2 * avg(l_quantity)
250- from lineitem where l_partkey = p_partkey
251- );"# ;
252-
253- // assert plan
254- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
255- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
256- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
257- let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly
258- Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]
259- Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))
260- Inner Join: part.p_partkey = __scalar_sq_1.l_partkey, lineitem.l_partkey = __scalar_sq_1.l_partkey
261- Inner Join: lineitem.l_partkey = part.p_partkey
262- TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]
263- Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")
264- TableScan: part projection=[p_partkey, p_brand, p_container]
265- SubqueryAlias: __scalar_sq_1
266- Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value
267- Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
268- TableScan: lineitem projection=[l_partkey, l_quantity]"#
269- . to_string ( ) ;
270- assert_eq ! ( actual, expected) ;
271-
272- // assert data
273- let results = execute_to_batches ( & ctx, sql) . await ;
274- let expected = vec ! [
275- "+--------------------+" ,
276- "| avg_yearly |" ,
277- "+--------------------+" ,
278- "| 190.13714285714286 |" ,
279- "+--------------------+" ,
280- ] ;
281- assert_batches_eq ! ( expected, & results) ;
282-
283- Ok ( ( ) )
284- }
285-
286- #[ tokio:: test]
287- async fn tpch_q20_correlated ( ) -> Result < ( ) > {
288- let ctx = SessionContext :: new ( ) ;
289- register_tpch_csv ( & ctx, "supplier" ) . await ?;
290- register_tpch_csv ( & ctx, "nation" ) . await ?;
291- register_tpch_csv ( & ctx, "partsupp" ) . await ?;
292- register_tpch_csv ( & ctx, "part" ) . await ?;
293- register_tpch_csv ( & ctx, "lineitem" ) . await ?;
294-
295- let sql = r#"select s_name, s_address
296- from supplier, nation
297- where s_suppkey in (
298- select ps_suppkey from partsupp
299- where ps_partkey in ( select p_partkey from part where p_name like 'forest%' )
300- and ps_availqty > ( select 0.5 * sum(l_quantity) from lineitem
301- where l_partkey = ps_partkey and l_suppkey = ps_suppkey and l_shipdate >= date '1994-01-01'
302- )
303- )
304- and s_nationkey = n_nationkey and n_name = 'CANADA'
305- order by s_name;
306- "# ;
307-
308- // assert plan
309- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
310- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
311- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
312- let expected = r#"Sort: supplier.s_name ASC NULLS LAST
313- Projection: supplier.s_name, supplier.s_address
314- LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey
315- Inner Join: supplier.s_nationkey = nation.n_nationkey
316- TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
317- Filter: nation.n_name = Utf8("CANADA")
318- TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("CANADA")]
319- SubqueryAlias: __correlated_sq_1
320- Projection: partsupp.ps_suppkey AS ps_suppkey
321- Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value
322- Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey
323- LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey
324- TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
325- SubqueryAlias: __correlated_sq_2
326- Projection: part.p_partkey AS p_partkey
327- Filter: part.p_name LIKE Utf8("forest%")
328- TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8("forest%")]
329- SubqueryAlias: __scalar_sq_1
330- Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value
331- Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
332- Filter: lineitem.l_shipdate >= Date32("8766")
333- TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"# ;
334- assert_eq ! ( actual, expected) ;
335-
336- // assert data
337- let results = execute_to_batches ( & ctx, sql) . await ;
338- let expected = vec ! [ "++" , "++" ] ;
339- assert_batches_eq ! ( expected, & results) ;
340-
341- Ok ( ( ) )
342- }
343-
344- #[ tokio:: test]
345- async fn tpch_q22_correlated ( ) -> Result < ( ) > {
346- let ctx = SessionContext :: new ( ) ;
347- register_tpch_csv ( & ctx, "customer" ) . await ?;
348- register_tpch_csv ( & ctx, "orders" ) . await ?;
349-
350- let sql = r#"select cntrycode, count(*) as numcust, sum(c_acctbal) as totacctbal
351- from (
352- select substring(c_phone from 1 for 2) as cntrycode, c_acctbal from customer
353- where substring(c_phone from 1 for 2) in ('13', '31', '23', '29', '30', '18', '17')
354- and c_acctbal > (
355- select avg(c_acctbal) from customer where c_acctbal > 0.00
356- and substring(c_phone from 1 for 2) in ('13', '31', '23', '29', '30', '18', '17')
357- )
358- and not exists ( select * from orders where o_custkey = c_custkey )
359- ) as custsale
360- group by cntrycode
361- order by cntrycode;"# ;
362-
363- // assert plan
364- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
365- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
366- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
367- let expected = r#"Sort: custsale.cntrycode ASC NULLS LAST
368- Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal
369- Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
370- SubqueryAlias: custsale
371- Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
372- Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_1.__value
373- CrossJoin:
374- LeftAnti Join: customer.c_custkey = orders.o_custkey
375- Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
376- TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])]
377- TableScan: orders projection=[o_custkey]
378- SubqueryAlias: __scalar_sq_1
379- Projection: AVG(customer.c_acctbal) AS __value
380- Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
381- Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
382- TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"# ;
383- assert_eq ! ( expected, actual) ;
384-
385- // assert data
386- let results = execute_to_batches ( & ctx, sql) . await ;
387- let expected = vec ! [
388- "+-----------+---------+------------+" ,
389- "| cntrycode | numcust | totacctbal |" ,
390- "+-----------+---------+------------+" ,
391- "| 18 | 1 | 8324.07 |" ,
392- "| 30 | 1 | 7638.57 |" ,
393- "+-----------+---------+------------+" ,
394- ] ;
395- assert_batches_eq ! ( expected, & results) ;
396-
397- Ok ( ( ) )
398- }
399-
400- #[ tokio:: test]
401- async fn tpch_q11_correlated ( ) -> Result < ( ) > {
402- let ctx = SessionContext :: new ( ) ;
403- register_tpch_csv ( & ctx, "partsupp" ) . await ?;
404- register_tpch_csv ( & ctx, "supplier" ) . await ?;
405- register_tpch_csv ( & ctx, "nation" ) . await ?;
406-
407- let sql = r#"select ps_partkey, sum(ps_supplycost * ps_availqty) as value
408- from partsupp, supplier, nation
409- where ps_suppkey = s_suppkey and s_nationkey = n_nationkey and n_name = 'GERMANY'
410- group by ps_partkey having
411- sum(ps_supplycost * ps_availqty) > (
412- select sum(ps_supplycost * ps_availqty) * 0.0001
413- from partsupp, supplier, nation
414- where ps_suppkey = s_suppkey and s_nationkey = n_nationkey and n_name = 'GERMANY'
415- )
416- order by value desc;
417- "# ;
418-
419- // assert plan
420- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
421- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
422- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
423- let expected = r#"Sort: value DESC NULLS FIRST
424- Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
425- Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__scalar_sq_1.__value AS Decimal128(38, 15))
426- CrossJoin:
427- Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
428- Inner Join: supplier.s_nationkey = nation.n_nationkey
429- Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
430- TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
431- TableScan: supplier projection=[s_suppkey, s_nationkey]
432- Filter: nation.n_name = Utf8("GERMANY")
433- TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
434- SubqueryAlias: __scalar_sq_1
435- Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value
436- Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
437- Inner Join: supplier.s_nationkey = nation.n_nationkey
438- Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
439- TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]
440- TableScan: supplier projection=[s_suppkey, s_nationkey]
441- Filter: nation.n_name = Utf8("GERMANY")
442- TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]"# ;
443- assert_eq ! ( actual, expected) ;
444-
445- // assert data
446- let results = execute_to_batches ( & ctx, sql) . await ;
447- let expected = vec ! [ "++" , "++" ] ;
448- assert_batches_eq ! ( expected, & results) ;
449-
450- Ok ( ( ) )
451- }
0 commit comments