|
12 | 12 | """ |
13 | 13 | import weakref |
14 | 14 | import re |
| 15 | +import codecs |
15 | 16 | from mssql_python.cursor import Cursor |
16 | | -from mssql_python.helpers import add_driver_to_connection_str, sanitize_connection_string, log |
| 17 | +from mssql_python.helpers import add_driver_to_connection_str, sanitize_connection_string, sanitize_user_input, log |
17 | 18 | from mssql_python import ddbc_bindings |
18 | 19 | from mssql_python.pooling import PoolingManager |
| 20 | +from mssql_python.exceptions import InterfaceError, ProgrammingError |
19 | 21 | from mssql_python.auth import process_connection_string |
| 22 | +from mssql_python.constants import ConstantsDDBC |
| 23 | + |
# Pseudo SQL type accepted by setdecoding()/getdecoding() to select the
# decoding used for column names (metadata) instead of column data.
SQL_WMETADATA = -99

# Encoding names for which setencoding()/setdecoding() default the C type
# to SQL_WCHAR when the caller does not pass an explicit ctype.
UTF16_ENCODINGS = frozenset(('utf-16', 'utf-16le', 'utf-16be'))
| 33 | + |
def _validate_encoding(encoding: str) -> bool:
    """
    Check whether ``encoding`` names a codec that Python can look up.

    Args:
        encoding (str): The encoding name to validate.

    Returns:
        bool: True if ``codecs.lookup()`` finds a codec for the name,
            False if the name is unknown or is not a usable string.

    Note:
        This function keeps no cache of its own. The codec registry that
        ``codecs.lookup()`` consults already caches lookups, so repeated
        validation of the same valid name is cheap.
    """
    try:
        codecs.lookup(encoding)
    except (LookupError, TypeError, ValueError):
        # LookupError: the name is not a registered codec.
        # TypeError:   the argument is not a str (e.g. an int or None).
        # ValueError:  the string cannot be handed to the registry at all
        #              (e.g. it cannot be encoded for the lookup).
        return False
    return True
20 | 53 |
|
21 | 54 | # Import all DB-API 2.0 exception classes for Connection attributes |
22 | 55 | from mssql_python.exceptions import ( |
@@ -68,6 +101,9 @@ class Connection: |
68 | 101 | close() -> None: |
69 | 102 | __enter__() -> Connection: |
70 | 103 | __exit__() -> None: |
| 104 | + setencoding(encoding=None, ctype=None) -> None: |
| 105 | + setdecoding(sqltype, encoding=None, ctype=None) -> None: |
| 106 | + getdecoding(sqltype) -> dict: |
71 | 107 | """ |
72 | 108 |
|
73 | 109 | # DB-API 2.0 Exception attributes |
@@ -108,6 +144,29 @@ def __init__(self, connection_str: str = "", autocommit: bool = False, attrs_bef |
108 | 144 | ) |
109 | 145 | self._attrs_before = attrs_before or {} |
110 | 146 |
|
| 147 | + # Initialize encoding settings with defaults for Python 3 |
| 148 | + # Python 3 only has str (which is Unicode), so we use utf-16le by default |
| 149 | + self._encoding_settings = { |
| 150 | + 'encoding': 'utf-16le', |
| 151 | + 'ctype': ConstantsDDBC.SQL_WCHAR.value |
| 152 | + } |
| 153 | + |
| 154 | + # Initialize decoding settings with Python 3 defaults |
| 155 | + self._decoding_settings = { |
| 156 | + ConstantsDDBC.SQL_CHAR.value: { |
| 157 | + 'encoding': 'utf-8', |
| 158 | + 'ctype': ConstantsDDBC.SQL_CHAR.value |
| 159 | + }, |
| 160 | + ConstantsDDBC.SQL_WCHAR.value: { |
| 161 | + 'encoding': 'utf-16le', |
| 162 | + 'ctype': ConstantsDDBC.SQL_WCHAR.value |
| 163 | + }, |
| 164 | + SQL_WMETADATA: { |
| 165 | + 'encoding': 'utf-16le', |
| 166 | + 'ctype': ConstantsDDBC.SQL_WCHAR.value |
| 167 | + } |
| 168 | + } |
| 169 | + |
111 | 170 | # Check if the connection string contains authentication parameters |
112 | 171 | # This is important for processing the connection string correctly. |
113 | 172 | # If authentication is specified, it will be processed to handle |
@@ -204,6 +263,247 @@ def setautocommit(self, value: bool = False) -> None: |
204 | 263 | """ |
205 | 264 | self._conn.set_autocommit(value) |
206 | 265 |
|
def setencoding(self, encoding=None, ctype=None):
    """
    Sets the text encoding for SQL statements and text parameters.

    Since Python 3 only has str (which is Unicode), this method configures
    how text is encoded when sending to the database. The validated
    settings are stored on the connection and can be read back with
    getencoding().

    Args:
        encoding (str, optional): The encoding to use. This must be a valid Python
            encoding name known to codecs.lookup(). If None, defaults to 'utf-16le'.
            The name is stored lowercased.
        ctype (int, optional): The C data type to use when passing data:
            SQL_CHAR or SQL_WCHAR. If not provided, SQL_WCHAR is used when the
            encoding resolves to one of the UTF-16 codecs listed in
            UTF16_ENCODINGS (any alias of them, e.g. 'utf_16_le'); SQL_CHAR is
            used for all other encodings.

    Returns:
        None

    Raises:
        ProgrammingError: If the encoding is not a string, is not supported
            by Python, or if ctype is neither SQL_CHAR nor SQL_WCHAR.
        InterfaceError: If the connection is closed.

    Example:
        # For databases that only communicate with UTF-8
        cnxn.setencoding(encoding='utf-8')

        # For explicitly using SQL_CHAR
        cnxn.setencoding(encoding='utf-8', ctype=mssql_python.SQL_CHAR)
    """
    if self._closed:
        raise InterfaceError(
            driver_error="Connection is closed",
            ddbc_error="Connection is closed",
        )

    # Set default encoding if not provided
    if encoding is None:
        encoding = 'utf-16le'

    # Non-string values are rejected up front so they surface as the
    # documented ProgrammingError rather than a TypeError from the lookup.
    if not isinstance(encoding, str) or not _validate_encoding(encoding):
        # Log the sanitized encoding for security
        log('warning', "Invalid encoding attempted: %s", sanitize_user_input(str(encoding)))
        raise ProgrammingError(
            driver_error=f"Unsupported encoding: {encoding}",
            ddbc_error=f"The encoding '{encoding}' is not supported by Python",
        )

    # Resolve the codec's canonical name before lowercasing so that aliases
    # ('utf_16_le', 'UTF16', ...) are recognised when picking the default ctype.
    canonical_name = codecs.lookup(encoding).name

    # Normalize the stored name to lowercase, matching setdecoding()
    encoding = encoding.lower()

    # Set default ctype based on encoding if not provided
    if ctype is None:
        utf16_canonical = {codecs.lookup(alias).name for alias in UTF16_ENCODINGS}
        if canonical_name in utf16_canonical:
            ctype = ConstantsDDBC.SQL_WCHAR.value
        else:
            ctype = ConstantsDDBC.SQL_CHAR.value

    # Validate ctype
    valid_ctypes = (ConstantsDDBC.SQL_CHAR.value, ConstantsDDBC.SQL_WCHAR.value)
    if ctype not in valid_ctypes:
        # Log the sanitized ctype for security
        log('warning', "Invalid ctype attempted: %s", sanitize_user_input(str(ctype)))
        raise ProgrammingError(
            driver_error=f"Invalid ctype: {ctype}",
            ddbc_error=f"ctype must be SQL_CHAR ({ConstantsDDBC.SQL_CHAR.value}) or SQL_WCHAR ({ConstantsDDBC.SQL_WCHAR.value})",
        )

    # Store the encoding settings (replaced wholesale, never mutated in place)
    self._encoding_settings = {
        'encoding': encoding,
        'ctype': ctype
    }

    # Log with sanitized values for security
    log('info', "Text encoding set to %s with ctype %s",
        sanitize_user_input(encoding), sanitize_user_input(str(ctype)))
| 342 | + |
def getencoding(self):
    """
    Return the text encoding settings currently stored on this connection.

    Returns:
        dict: A new dictionary with the 'encoding' and 'ctype' keys. It is a
            shallow copy, so modifying it does not change the connection.

    Raises:
        InterfaceError: If the connection is closed.

    Example:
        settings = cnxn.getencoding()
        print(f"Current encoding: {settings['encoding']}")
        print(f"Current ctype: {settings['ctype']}")
    """
    if not self._closed:
        # Hand back a copy so callers cannot alter the stored settings.
        return dict(self._encoding_settings)

    raise InterfaceError(
        driver_error="Connection is closed",
        ddbc_error="Connection is closed",
    )
| 365 | + |
def setdecoding(self, sqltype, encoding=None, ctype=None):
    """
    Sets the text decoding used when reading SQL_CHAR and SQL_WCHAR from the database.

    This method configures how text data is decoded when reading from the database.
    In Python 3, all text is Unicode (str), so this primarily affects the encoding
    used to decode bytes from the database. The validated settings are stored
    per sqltype and can be read back with getdecoding().

    Args:
        sqltype (int): The SQL type being configured: SQL_CHAR, SQL_WCHAR, or SQL_WMETADATA.
            SQL_WMETADATA is a special flag for configuring column name decoding.
        encoding (str, optional): The Python encoding to use when decoding the data.
            If None, 'utf-8' is used for SQL_CHAR and 'utf-16le' for SQL_WCHAR
            and SQL_WMETADATA. The name is stored lowercased.
        ctype (int, optional): The C data type to request from SQLGetData:
            SQL_CHAR or SQL_WCHAR. If None, SQL_WCHAR is used when the encoding
            resolves to one of the UTF-16 codecs listed in UTF16_ENCODINGS (any
            alias of them, e.g. 'utf_16_le'); SQL_CHAR is used otherwise.

    Returns:
        None

    Raises:
        ProgrammingError: If the sqltype, encoding, or ctype is invalid
            (including an encoding that is not a string).
        InterfaceError: If the connection is closed.

    Example:
        # Configure SQL_CHAR to use UTF-8 decoding
        cnxn.setdecoding(mssql_python.SQL_CHAR, encoding='utf-8')

        # Configure column metadata decoding
        cnxn.setdecoding(mssql_python.SQL_WMETADATA, encoding='utf-16le')

        # Use explicit ctype
        cnxn.setdecoding(mssql_python.SQL_WCHAR, encoding='utf-16le', ctype=mssql_python.SQL_WCHAR)
    """
    if self._closed:
        raise InterfaceError(
            driver_error="Connection is closed",
            ddbc_error="Connection is closed",
        )

    # Validate sqltype
    valid_sqltypes = (
        ConstantsDDBC.SQL_CHAR.value,
        ConstantsDDBC.SQL_WCHAR.value,
        SQL_WMETADATA,
    )
    if sqltype not in valid_sqltypes:
        log('warning', "Invalid sqltype attempted: %s", sanitize_user_input(str(sqltype)))
        raise ProgrammingError(
            driver_error=f"Invalid sqltype: {sqltype}",
            ddbc_error=f"sqltype must be SQL_CHAR ({ConstantsDDBC.SQL_CHAR.value}), SQL_WCHAR ({ConstantsDDBC.SQL_WCHAR.value}), or SQL_WMETADATA ({SQL_WMETADATA})",
        )

    # Set default encoding based on sqltype if not provided
    if encoding is None:
        if sqltype == ConstantsDDBC.SQL_CHAR.value:
            encoding = 'utf-8'  # Default for SQL_CHAR in Python 3
        else:  # SQL_WCHAR or SQL_WMETADATA
            encoding = 'utf-16le'  # Default for SQL_WCHAR in Python 3

    # Non-string values are rejected up front so they surface as the
    # documented ProgrammingError rather than a TypeError from the lookup.
    if not isinstance(encoding, str) or not _validate_encoding(encoding):
        log('warning', "Invalid encoding attempted: %s", sanitize_user_input(str(encoding)))
        raise ProgrammingError(
            driver_error=f"Unsupported encoding: {encoding}",
            ddbc_error=f"The encoding '{encoding}' is not supported by Python",
        )

    # Resolve the codec's canonical name before lowercasing so that aliases
    # ('utf_16_le', 'UTF16', ...) are recognised when picking the default ctype.
    canonical_name = codecs.lookup(encoding).name

    # Normalize encoding to lowercase for consistency
    encoding = encoding.lower()

    # Set default ctype based on encoding if not provided
    if ctype is None:
        utf16_canonical = {codecs.lookup(alias).name for alias in UTF16_ENCODINGS}
        if canonical_name in utf16_canonical:
            ctype = ConstantsDDBC.SQL_WCHAR.value
        else:
            ctype = ConstantsDDBC.SQL_CHAR.value

    # Validate ctype
    valid_ctypes = (ConstantsDDBC.SQL_CHAR.value, ConstantsDDBC.SQL_WCHAR.value)
    if ctype not in valid_ctypes:
        log('warning', "Invalid ctype attempted: %s", sanitize_user_input(str(ctype)))
        raise ProgrammingError(
            driver_error=f"Invalid ctype: {ctype}",
            ddbc_error=f"ctype must be SQL_CHAR ({ConstantsDDBC.SQL_CHAR.value}) or SQL_WCHAR ({ConstantsDDBC.SQL_WCHAR.value})",
        )

    # Store the decoding settings for the specified sqltype; the other
    # sqltypes keep whatever settings they already had.
    self._decoding_settings[sqltype] = {
        'encoding': encoding,
        'ctype': ctype
    }

    # Log with sanitized values for security; use the symbolic sqltype
    # name so the log line is readable without knowing the numeric values.
    sqltype_name = {
        ConstantsDDBC.SQL_CHAR.value: "SQL_CHAR",
        ConstantsDDBC.SQL_WCHAR.value: "SQL_WCHAR",
        SQL_WMETADATA: "SQL_WMETADATA"
    }.get(sqltype, str(sqltype))

    log('info', "Text decoding set for %s to %s with ctype %s",
        sqltype_name, sanitize_user_input(encoding), sanitize_user_input(str(ctype)))
| 467 | + |
def getdecoding(self, sqltype):
    """
    Return the text decoding settings stored for one SQL type.

    Args:
        sqltype (int): Which settings to fetch: SQL_CHAR, SQL_WCHAR, or SQL_WMETADATA.

    Returns:
        dict: A new dictionary with the 'encoding' and 'ctype' keys for the
            given sqltype. It is a shallow copy of the stored settings.

    Raises:
        ProgrammingError: If the sqltype is not one of the three accepted values.
        InterfaceError: If the connection is closed.

    Example:
        settings = cnxn.getdecoding(mssql_python.SQL_CHAR)
        print(f"SQL_CHAR encoding: {settings['encoding']}")
        print(f"SQL_CHAR ctype: {settings['ctype']}")
    """
    if self._closed:
        raise InterfaceError(
            driver_error="Connection is closed",
            ddbc_error="Connection is closed",
        )

    # Only these three keys exist in the per-connection decoding table.
    accepted = (
        ConstantsDDBC.SQL_CHAR.value,
        ConstantsDDBC.SQL_WCHAR.value,
        SQL_WMETADATA,
    )
    if sqltype in accepted:
        stored = self._decoding_settings[sqltype]
        return stored.copy()

    raise ProgrammingError(
        driver_error=f"Invalid sqltype: {sqltype}",
        ddbc_error=f"sqltype must be SQL_CHAR ({ConstantsDDBC.SQL_CHAR.value}), SQL_WCHAR ({ConstantsDDBC.SQL_WCHAR.value}), or SQL_WMETADATA ({SQL_WMETADATA})",
    )
| 506 | + |
207 | 507 | def cursor(self) -> Cursor: |
208 | 508 | """ |
209 | 509 | Return a new Cursor object using the connection. |
|
0 commit comments