Skip to content

Commit bf37330

Browse files
committed
working nvarcharmax
1 parent fba0e63 commit bf37330

5 files changed

Lines changed: 158 additions & 48 deletions

File tree

main.py

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,65 @@
55

66
setup_logging('stdout')
77

8-
conn_str = os.getenv("DB_CONNECTION_STRING")
9-
conn = connect(conn_str)
8+
# conn_str = os.getenv("DB_CONNECTION_STRING")
9+
# conn = connect(conn_str)
1010

11-
# conn.autocommit = True
11+
# # conn.autocommit = True
1212

13-
cursor = conn.cursor()
14-
cursor.execute("SELECT database_id, name from sys.databases;")
15-
rows = cursor.fetchall()
13+
# cursor = conn.cursor()
14+
# cursor.execute("SELECT database_id, name from sys.databases;")
15+
# rows = cursor.fetchall()
1616

17-
for row in rows:
18-
print(f"Database ID: {row[0]}, Name: {row[1]}")
17+
# for row in rows:
18+
# print(f"Database ID: {row[0]}, Name: {row[1]}")
1919

20-
cursor.close()
21-
conn.close()
20+
# cursor.close()
21+
# conn.close()
22+
23+
conn_str = "Server=Saumya;DATABASE=master;UID=sa;PWD=HappyPass1234;Trust_Connection=yes;TrustServerCertificate=yes;"
24+
db_connection = connect(conn_str)
25+
db_connection.autocommit = True
26+
values = ["Ω" * 4100, "漢" * 5000]
27+
cursor = db_connection.cursor()
28+
cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))")
29+
db_connection.commit()
30+
31+
# --- use executemany for inserts ---
32+
cursor.executemany(
33+
"INSERT INTO #pytest_nvarcharmax VALUES (?)",
34+
[(v,) for v in values],
35+
)
36+
db_connection.commit()
37+
38+
# --- fetchall ---
39+
cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)")
40+
rows = [r[0] for r in cursor.fetchall()]
41+
assert rows == sorted(values, key=len)
42+
43+
# --- fetchone ---
44+
cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)")
45+
r1 = cursor.fetchone()[0]
46+
r2 = cursor.fetchone()[0]
47+
assert {r1, r2} == set(values)
48+
assert cursor.fetchone() is None
49+
50+
# --- fetchmany ---
51+
cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)")
52+
53+
batch = cursor.fetchmany(1)
54+
assert set(r[0] for r in batch).issubset(set(values))
55+
56+
remaining = []
57+
while True:
58+
rows = cursor.fetchmany(1)
59+
if not rows:
60+
break
61+
remaining.extend(rows)
62+
63+
all_fetched = [r[0] for r in batch + remaining]
64+
assert set(all_fetched) == set(values)
65+
66+
67+
# --- cleanup ---
68+
cursor.execute("DROP TABLE #pytest_nvarcharmax")
69+
db_connection.commit()

mssql_python/cursor.py

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -998,19 +998,19 @@ def execute(
998998
self.is_stmt_prepared = [False]
999999

10001000
log('debug', "Executing query: %s", operation)
1001-
for i, param in enumerate(parameters):
1002-
log('debug',
1003-
"""Parameter number: %s, Parameter: %s,
1004-
Param Python Type: %s, ParamInfo: %s, %s, %s, %s, %s""",
1005-
i + 1,
1006-
param,
1007-
str(type(param)),
1008-
parameters_type[i].paramSQLType,
1009-
parameters_type[i].paramCType,
1010-
parameters_type[i].columnSize,
1011-
parameters_type[i].decimalDigits,
1012-
parameters_type[i].inputOutputType,
1013-
)
1001+
# for i, param in enumerate(parameters):
1002+
# log('debug',
1003+
# """Parameter number: %s, Parameter: %s,
1004+
# Param Python Type: %s, ParamInfo: %s, %s, %s, %s, %s""",
1005+
# i + 1,
1006+
# param,
1007+
# str(type(param)),
1008+
# parameters_type[i].paramSQLType,
1009+
# parameters_type[i].paramCType,
1010+
# parameters_type[i].columnSize,
1011+
# parameters_type[i].decimalDigits,
1012+
# parameters_type[i].inputOutputType,
1013+
# )
10141014

10151015
ret = ddbc_bindings.DDBCSQLExecute(
10161016
self.hstmt,
@@ -1561,22 +1561,14 @@ def _compute_column_type(self, column):
15611561
sample_value = v
15621562

15631563
return sample_value, None, None
1564-
1564+
15651565
def executemany(self, operation: str, seq_of_parameters: list) -> None:
15661566
"""
1567-
Prepare a database operation and execute it against all parameter sequences.
1568-
This version uses column-wise parameter binding and a single batched SQLExecute().
1569-
Args:
1570-
operation: SQL query or command.
1571-
seq_of_parameters: Sequence of sequences or mappings of parameters.
1572-
1573-
Raises:
1574-
Error: If the operation fails.
1567+
Execute a database operation against all parameter sequences.
1568+
Automatically falls back to row-by-row execution if any parameter requires DAE/streaming.
15751569
"""
15761570
self._check_closed()
15771571
self._reset_cursor()
1578-
1579-
# Clear any previous messages
15801572
self.messages = []
15811573

15821574
if not seq_of_parameters:
@@ -1602,6 +1594,7 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None:
16021594
param_count = len(sample_row)
16031595
param_info = ddbc_bindings.ParamInfo
16041596
parameters_type = []
1597+
any_dae = False
16051598

16061599
# Check if we have explicit input sizes set
16071600
if self._inputsizes:
@@ -1705,6 +1698,15 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None:
17051698
paraminfo.columnSize = max(max_binary_size, 1)
17061699

17071700
parameters_type.append(paraminfo)
1701+
if paraminfo.isDAE:
1702+
any_dae = True
1703+
1704+
# If any DAE parameter exists, fall back to row-by-row execution
1705+
if any_dae:
1706+
log('debug', "DAE parameters detected. Falling back to row-by-row execution with streaming.")
1707+
for row in seq_of_parameters:
1708+
self.execute(operation, row)
1709+
return
17081710

17091711
# Process parameters into column-wise format with possible type conversions
17101712
# First, convert any Decimal types as needed for NUMERIC/DECIMAL columns
@@ -1728,8 +1730,7 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None:
17281730
log('debug', "Executing batch query with %d parameter sets:\n%s",
17291731
len(seq_of_parameters), "\n".join(f" {i+1}: {tuple(p) if isinstance(p, (list, tuple)) else p}" for i, p in enumerate(seq_of_parameters[:5])) # Limit to first 5 rows for large batches
17301732
)
1731-
1732-
# Execute batched statement
1733+
17331734
ret = ddbc_bindings.SQLExecuteMany(
17341735
self.hstmt,
17351736
operation,
@@ -1758,6 +1759,7 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None:
17581759
# Reset input sizes after execution
17591760
self._reset_inputsizes()
17601761

1762+
17611763
def fetchone(self) -> Union[None, Row]:
17621764
"""
17631765
Fetch the next row of a query result set.

mssql_python/msvcp140.dll

562 KB
Binary file not shown.

mssql_python/pybind/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ target_compile_definitions(ddbc_bindings PRIVATE
272272

273273
# Add warning level flags for MSVC
274274
if(MSVC)
275-
target_compile_options(ddbc_bindings PRIVATE /W4 /WX)
275+
target_compile_options(ddbc_bindings PRIVATE /W4 )
276276
endif()
277277

278278
# Add macOS-specific string conversion fix

mssql_python/pybind/ddbc_bindings.cpp

Lines changed: 71 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1997,26 +1997,86 @@ SQLRETURN SQLExecuteMany_wrap(const SqlHandlePtr statementHandle,
19971997
const std::wstring& query,
19981998
const py::list& columnwise_params,
19991999
const std::vector<ParamInfo>& paramInfos,
2000-
size_t paramSetSize) {
2000+
size_t paramSetSize)
2001+
{
20012002
SQLHANDLE hStmt = statementHandle->get();
20022003
SQLWCHAR* queryPtr;
2004+
20032005
#if defined(__APPLE__) || defined(__linux__)
20042006
std::vector<SQLWCHAR> queryBuffer = WStringToSQLWCHAR(query);
20052007
queryPtr = queryBuffer.data();
20062008
#else
20072009
queryPtr = const_cast<SQLWCHAR*>(query.c_str());
20082010
#endif
2011+
20092012
RETCODE rc = SQLPrepare_ptr(hStmt, queryPtr, SQL_NTS);
20102013
if (!SQL_SUCCEEDED(rc)) return rc;
2011-
std::vector<std::shared_ptr<void>> paramBuffers;
2012-
rc = BindParameterArray(hStmt, columnwise_params, paramInfos, paramSetSize, paramBuffers);
2013-
if (!SQL_SUCCEEDED(rc)) return rc;
2014-
rc = SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_PARAMSET_SIZE, (SQLPOINTER)paramSetSize, 0);
2015-
if (!SQL_SUCCEEDED(rc)) return rc;
2016-
rc = SQLExecute_ptr(hStmt);
2017-
return rc;
2014+
2015+
// Check if any param is DAE/streaming
2016+
bool hasDAE = false;
2017+
for (const auto& p : paramInfos) {
2018+
if (p.isDAE) {
2019+
hasDAE = true;
2020+
break;
2021+
}
2022+
}
2023+
2024+
if (!hasDAE) {
2025+
LOG("No DAE parameters detected. Proceeding with fast array execution.");
2026+
// Fast path: no streaming, bind entire array
2027+
std::vector<std::shared_ptr<void>> paramBuffers;
2028+
rc = BindParameterArray(hStmt, columnwise_params, paramInfos, paramSetSize, paramBuffers);
2029+
if (!SQL_SUCCEEDED(rc)) return rc;
2030+
2031+
rc = SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_PARAMSET_SIZE, (SQLPOINTER)paramSetSize, 0);
2032+
if (!SQL_SUCCEEDED(rc)) return rc;
2033+
2034+
rc = SQLExecute_ptr(hStmt);
2035+
return rc;
2036+
} else {
2037+
LOG("DAE parameters detected. Falling back to row-by-row execution with streaming.");
2038+
// Fallback: row-by-row execution with streaming
2039+
size_t rowCount = columnwise_params.size();
2040+
for (size_t rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
2041+
py::list rowParams = columnwise_params[rowIndex];
2042+
2043+
std::vector<std::shared_ptr<void>> paramBuffers;
2044+
rc = BindParameters(hStmt, rowParams, const_cast<std::vector<ParamInfo>&>(paramInfos), paramBuffers);
2045+
if (!SQL_SUCCEEDED(rc)) return rc;
2046+
2047+
rc = SQLExecute_ptr(hStmt);
2048+
2049+
// Handle DAE streaming
2050+
while (rc == SQL_NEED_DATA) {
2051+
SQLPOINTER token;
2052+
rc = SQLParamData_ptr(hStmt, &token);
2053+
if (!SQL_SUCCEEDED(rc) && rc != SQL_NEED_DATA) return rc;
2054+
2055+
py::object* py_obj_ptr = reinterpret_cast<py::object*>(token);
2056+
if (!py_obj_ptr) return SQL_ERROR;
2057+
2058+
// Support string/binary streaming
2059+
if (py::isinstance<py::str>(*py_obj_ptr)) {
2060+
std::string data = py_obj_ptr->cast<std::string>();
2061+
SQLLEN data_len = static_cast<SQLLEN>(data.size());
2062+
rc = SQLPutData_ptr(hStmt, (SQLPOINTER)data.c_str(), data_len);
2063+
} else if (py::isinstance<py::bytes>(*py_obj_ptr) || py::isinstance<py::bytearray>(*py_obj_ptr)) {
2064+
std::string data = py_obj_ptr->cast<std::string>();
2065+
SQLLEN data_len = static_cast<SQLLEN>(data.size());
2066+
rc = SQLPutData_ptr(hStmt, (SQLPOINTER)data.c_str(), data_len);
2067+
} else {
2068+
LOG("Unsupported DAE parameter type in row {}", rowIndex);
2069+
return SQL_ERROR;
2070+
}
2071+
}
2072+
2073+
if (!SQL_SUCCEEDED(rc)) return rc;
2074+
}
2075+
return SQL_SUCCESS;
2076+
}
20182077
}
20192078

2079+
20202080
// Wrap SQLNumResultCols
20212081
SQLSMALLINT SQLNumResultCols_wrap(SqlHandlePtr statementHandle) {
20222082
LOG("Get number of columns in result set");
@@ -2213,7 +2273,7 @@ static py::object FetchLobColumnData(SQLHSTMT hStmt,
22132273
LOG("Loop {}: Appended {} bytes", loopCount, bytesRead);
22142274
}
22152275
if (ret == SQL_SUCCESS) {
2216-
LOG("Loop {}: SQL_SUCCESS no more data", loopCount);
2276+
LOG("Loop {}: SQL_SUCCESS, no more data", loopCount);
22172277
break;
22182278
}
22192279
}
@@ -3270,7 +3330,7 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch
32703330

32713331
// If we have LOBs → fall back to row-by-row fetch + SQLGetData_wrap
32723332
if (!lobColumns.empty()) {
3273-
LOG("LOB columns detected using per-row SQLGetData path");
3333+
LOG("LOB columns detected, using per-row SQLGetData path");
32743334
while (true) {
32753335
ret = SQLFetch_ptr(hStmt);
32763336
if (ret == SQL_NO_DATA) break;
@@ -3392,7 +3452,7 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows) {
33923452

33933453
// If we have LOBs → fall back to row-by-row fetch + SQLGetData_wrap
33943454
if (!lobColumns.empty()) {
3395-
LOG("LOB columns detected using per-row SQLGetData path");
3455+
LOG("LOB columns detected, using per-row SQLGetData path");
33963456
while (true) {
33973457
ret = SQLFetch_ptr(hStmt);
33983458
if (ret == SQL_NO_DATA) break;

0 commit comments

Comments
 (0)