Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions mssql_python/pybind/ddbc_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2439,6 +2439,30 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, const py::list& columnwise_params,
bufferLength = sizeof(SQLGUID);
break;
}
case SQL_C_DEFAULT: {
// Handle NULL parameters - all values in this column should be NULL
// The upstream Python type detection (via _compute_column_type) ensures
// SQL_C_DEFAULT is only used when all values are None
LOG("BindParameterArray: Binding SQL_C_DEFAULT (NULL) array - param_index=%d, "
"count=%zu",
paramIndex, paramSetSize);

// For NULL parameters, we need to allocate a minimal buffer and set all
// indicators to SQL_NULL_DATA Use SQL_C_CHAR as a safe default C type for NULL
// values
char* nullBuffer = AllocateParamBufferArray<char>(tempBuffers, paramSetSize);
strLenOrIndArray = AllocateParamBufferArray<SQLLEN>(tempBuffers, paramSetSize);

for (size_t i = 0; i < paramSetSize; ++i) {
nullBuffer[i] = 0;
strLenOrIndArray[i] = SQL_NULL_DATA;
}

dataPtr = nullBuffer;
bufferLength = 1;
LOG("BindParameterArray: SQL_C_DEFAULT bound - param_index=%d", paramIndex);
break;
}
default: {
LOG("BindParameterArray: Unsupported C type - "
"param_index=%d, C_type=%d",
Expand Down
242 changes: 242 additions & 0 deletions tests/test_004_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,248 @@ def test_executemany_empty_parameter_list(cursor, db_connection):
db_connection.commit()


def test_executemany_mixed_null_and_typed_values(cursor, db_connection):
"""Test executemany with randomly mixed NULL and non-NULL values across multiple columns and rows (50 rows, 10 columns)."""
try:
# Create table with 10 columns of various types
cursor.execute(
"""
CREATE TABLE #pytest_empty_params (
col1 INT,
col2 VARCHAR(50),
col3 FLOAT,
col4 BIT,
col5 DATETIME,
col6 DECIMAL(10, 2),
col7 NVARCHAR(100),
col8 BIGINT,
col9 DATE,
col10 REAL
)
"""
)

# Generate 50 rows with randomly mixed NULL and non-NULL values across 10 columns
data = []
for i in range(50):
row = (
i if i % 3 != 0 else None, # col1: NULL every 3rd row
f"text_{i}" if i % 2 == 0 else None, # col2: NULL on odd rows
float(i * 1.5) if i % 4 != 0 else None, # col3: NULL every 4th row
True if i % 5 == 0 else (False if i % 5 == 1 else None), # col4: NULL on some rows
datetime(2025, 1, 1, 12, 0, 0) if i % 6 != 0 else None, # col5: NULL every 6th row
decimal.Decimal(f"{i}.99") if i % 3 != 0 else None, # col6: NULL every 3rd row
f"desc_{i}" if i % 7 != 0 else None, # col7: NULL every 7th row
i * 100 if i % 8 != 0 else None, # col8: NULL every 8th row
date(2025, 1, 1) if i % 9 != 0 else None, # col9: NULL every 9th row
float(i / 2.0) if i % 10 != 0 else None, # col10: NULL every 10th row
)
data.append(row)

cursor.executemany(
"INSERT INTO #pytest_empty_params VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", data
)
db_connection.commit()

# Verify all 50 rows were inserted
cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params")
count = cursor.fetchone()[0]
assert count == 50, f"Expected 50 rows, got {count}"

# Verify NULL counts for specific columns
cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col1 IS NULL")
null_count_col1 = cursor.fetchone()[0]
assert (
null_count_col1 == 17
), f"Expected 17 NULLs in col1 (every 3rd row), got {null_count_col1}"

cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col2 IS NULL")
null_count_col2 = cursor.fetchone()[0]
assert null_count_col2 == 25, f"Expected 25 NULLs in col2 (odd rows), got {null_count_col2}"

cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col3 IS NULL")
null_count_col3 = cursor.fetchone()[0]
assert (
null_count_col3 == 13
), f"Expected 13 NULLs in col3 (every 4th row), got {null_count_col3}"

# Verify some non-NULL values exist
cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col1 IS NOT NULL")
non_null_count = cursor.fetchone()[0]
assert non_null_count > 0, "Expected some non-NULL values in col1"

cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col2 IS NOT NULL")
non_null_count = cursor.fetchone()[0]
assert non_null_count > 0, "Expected some non-NULL values in col2"

finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_empty_params")
db_connection.commit()


def test_executemany_multi_column_null_arrays(cursor, db_connection):
"""Test executemany with multi-column NULL arrays (50 records, 8 columns)."""
try:
# Create table with 8 columns of various types
cursor.execute(
"""
CREATE TABLE #pytest_null_arrays (
col1 INT,
col2 VARCHAR(100),
col3 FLOAT,
col4 DATETIME,
col5 DECIMAL(18, 4),
col6 NVARCHAR(200),
col7 BIGINT,
col8 DATE
)
"""
)

# Generate 50 rows with all NULL values across 8 columns
data = [(None, None, None, None, None, None, None, None) for _ in range(50)]

cursor.executemany("INSERT INTO #pytest_null_arrays VALUES (?, ?, ?, ?, ?, ?, ?, ?)", data)
db_connection.commit()

# Verify all 50 rows were inserted
cursor.execute("SELECT COUNT(*) FROM #pytest_null_arrays")
count = cursor.fetchone()[0]
assert count == 50, f"Expected 50 rows, got {count}"

# Verify all values are NULL for each column
for col_num in range(1, 9):
cursor.execute(f"SELECT COUNT(*) FROM #pytest_null_arrays WHERE col{col_num} IS NULL")
null_count = cursor.fetchone()[0]
assert null_count == 50, f"Expected 50 NULLs in col{col_num}, got {null_count}"

# Verify no non-NULL values exist
cursor.execute(
"""
SELECT COUNT(*) FROM #pytest_null_arrays
WHERE col1 IS NOT NULL OR col2 IS NOT NULL OR col3 IS NOT NULL
OR col4 IS NOT NULL OR col5 IS NOT NULL OR col6 IS NOT NULL
OR col7 IS NOT NULL OR col8 IS NOT NULL
"""
)
non_null_count = cursor.fetchone()[0]
assert non_null_count == 0, f"Expected 0 non-NULL values, got {non_null_count}"

finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_null_arrays")
db_connection.commit()


def test_executemany_MIX_NONE_parameter_list(cursor, db_connection):
"""Test executemany with an NONE parameter list."""
try:
cursor.execute("CREATE TABLE #pytest_empty_params (val VARCHAR(50))")
data = [(None,), ("Test",), (None,)]
cursor.executemany("INSERT INTO #pytest_empty_params VALUES (?)", data)
db_connection.commit()

cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params")
count = cursor.fetchone()[0]
assert count == 3
finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_empty_params")
db_connection.commit()


def test_executemany_concurrent_null_parameters(db_connection):
"""Test concurrent executemany calls with NULL parameters for thread safety."""
import threading
import time

# Create table
with db_connection.cursor() as cursor:
cursor.execute(
"""
CREATE TABLE #pytest_concurrent_nulls (
thread_id INT,
col1 INT,
col2 VARCHAR(100),
col3 FLOAT,
col4 DATETIME
)
"""
)
db_connection.commit()

errors = []
lock = threading.Lock()

def insert_nulls(thread_id):
"""Worker function to insert NULL data from a thread."""
try:
with db_connection.cursor() as cursor:
# Generate test data with NULLs for this thread
data = []
for i in range(20):
row = (
thread_id,
i if i % 2 == 0 else None, # Mix of values and NULLs
f"thread_{thread_id}_row_{i}" if i % 3 != 0 else None,
float(i * 1.5) if i % 4 != 0 else None,
datetime(2025, 1, 1, 12, 0, 0) if i % 5 != 0 else None,
)
data.append(row)

cursor.executemany(
"INSERT INTO #pytest_concurrent_nulls VALUES (?, ?, ?, ?, ?)", data
)
db_connection.commit()
except Exception as e:
with lock:
errors.append((thread_id, str(e)))

# Create and start multiple threads
threads = []
num_threads = 5

for i in range(num_threads):
thread = threading.Thread(target=insert_nulls, args=(i,))
threads.append(thread)
thread.start()

# Wait for all threads to complete
for thread in threads:
thread.join()

# Check for errors
assert len(errors) == 0, f"Errors occurred in threads: {errors}"

# Verify data was inserted correctly
with db_connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM #pytest_concurrent_nulls")
total_count = cursor.fetchone()[0]
assert (
total_count == num_threads * 20
), f"Expected {num_threads * 20} rows, got {total_count}"

# Verify each thread's data
for thread_id in range(num_threads):
cursor.execute(
"SELECT COUNT(*) FROM #pytest_concurrent_nulls WHERE thread_id = ?", [thread_id]
)
thread_count = cursor.fetchone()[0]
assert thread_count == 20, f"Thread {thread_id} expected 20 rows, got {thread_count}"

# Verify NULL counts for this thread
cursor.execute(
"SELECT COUNT(*) FROM #pytest_concurrent_nulls WHERE thread_id = ? AND col1 IS NULL",
[thread_id],
)
null_count = cursor.fetchone()[0]
assert (
null_count == 10
), f"Thread {thread_id} expected 10 NULLs in col1, got {null_count}"

# Cleanup
cursor.execute("DROP TABLE IF EXISTS #pytest_concurrent_nulls")
db_connection.commit()


def test_executemany_Decimal_list(cursor, db_connection):
"""Test executemany with an decimal parameter list."""
try:
Expand Down
Loading