|
15 | 15 | import uuid |
16 | 16 | import datetime |
17 | 17 | import warnings |
18 | | -from typing import List, Union, Any, Optional, Tuple, Sequence, TYPE_CHECKING |
| 18 | +from typing import List, Union, Any, Optional, Tuple, Sequence, TYPE_CHECKING, Iterable |
19 | 19 | from mssql_python.constants import ConstantsDDBC as ddbc_sql_const, SQLTypes |
20 | 20 | from mssql_python.helpers import check_error |
21 | 21 | from mssql_python.logging import logger |
@@ -2451,6 +2451,190 @@ def nextset(self) -> Union[bool, None]: |
2451 | 2451 | ) |
2452 | 2452 | return True |
2453 | 2453 |
|
def _bulkcopy(
    self, table_name: str, data: Iterable[Union[Tuple, List]], **kwargs
):  # pragma: no cover
    """
    Perform bulk copy operation for high-performance data loading.

    The method opens a separate ``mssql_py_core`` connection built from this
    cursor's connection string, forwards ``data`` and ``kwargs`` to that
    library's ``bulkcopy`` call, and always closes the temporary cursor and
    connection afterwards.

    Args:
        table_name: Target table name (can include schema, e.g., 'dbo.MyTable').
            The table must exist and the user must have INSERT permissions.

        data: Iterable of tuples or lists containing row data to be inserted.

            Data Format Requirements:
            - Each element in the iterable represents one row
            - Each row should be a tuple or list of column values
            - Column order must match the target table's column order (by ordinal
              position), unless column_mappings is specified
            - The number of values in each row must match the number of columns
              in the target table

        **kwargs: Additional bulk copy options. They are forwarded unchanged to
            the underlying library; only ``batch_size`` and ``timeout`` are
            validated here.

            batch_size (int, optional):
                Must be a positive whole number when provided. Booleans and
                fractional/non-finite floats are rejected.

            timeout (int | float, optional):
                Must be a positive number when provided. When omitted, a value
                of 30 is used for validation only; it is NOT injected into the
                forwarded options.

            column_mappings (List[Tuple[int, str]], optional):
                Maps source data column indices to target table column names.
                Each tuple is (source_index, target_column_name) where:
                - source_index: 0-based index of the column in the source data
                - target_column_name: Name of the target column in the database table

                When omitted: Columns are mapped by ordinal position (first data
                column → first table column, second → second, etc.)

                When specified: Only the mapped columns are inserted; unmapped
                source columns are ignored, and unmapped target columns must
                have default values or allow NULL.

    Returns:
        Whatever the underlying ``bulkcopy`` call returns. Per the library
        contract this is a dictionary with bulk copy results including:
        - rows_copied: Number of rows successfully copied
        - batch_count: Number of batches processed
        - elapsed_time: Time taken for the operation

    Raises:
        ImportError: If mssql_py_core library is not installed
        TypeError: If data is None, not iterable, or is a string/bytes, or if
            batch_size/timeout have an invalid type
        ValueError: If table_name is empty, batch_size/timeout are not positive,
            or SERVER/DATABASE are missing from the connection string
        RuntimeError: If connection string is not available, or if a failure
            from the underlying library cannot be re-created with its original
            exception type
    """
    try:
        import mssql_py_core
    except ImportError as exc:
        raise ImportError(
            "Bulk copy requires the mssql_py_core library which is not installed. "
            "To install, run: pip install mssql_py_core "
        ) from exc

    # Validate inputs. The isinstance check runs first so that objects with
    # unusual truthiness are never evaluated in a boolean context.
    if not isinstance(table_name, str) or not table_name:
        raise ValueError("table_name must be a non-empty string")

    # Validate that data is iterable (but not a string or bytes, which are technically iterable)
    if data is None:
        raise TypeError("data must be an iterable of tuples or lists, got None")
    if isinstance(data, (str, bytes)):
        raise TypeError(
            f"data must be an iterable of tuples or lists, got {type(data).__name__}. "
            "Strings and bytes are not valid row collections."
        )
    if not hasattr(data, "__iter__"):
        raise TypeError(
            f"data must be an iterable of tuples or lists, got non-iterable {type(data).__name__}"
        )

    # Extract the options validated here. kwargs itself is forwarded untouched,
    # so the timeout default below only feeds the validation.
    batch_size = kwargs.get("batch_size")
    timeout = kwargs.get("timeout", 30)

    # Validate batch_size type and value (only if explicitly provided).
    # bool is a subclass of int, so it has to be excluded explicitly.
    if batch_size is not None:
        if isinstance(batch_size, bool) or not isinstance(batch_size, (int, float)):
            raise TypeError(
                f"batch_size must be a positive integer, got {type(batch_size).__name__}"
            )
        # is_integer() is False for fractional values as well as NaN/inf.
        if isinstance(batch_size, float) and not batch_size.is_integer():
            raise ValueError(f"batch_size must be a whole number, got {batch_size}")
        if not batch_size > 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

    # Validate timeout type and value. "not timeout > 0" (rather than
    # "timeout <= 0") also rejects NaN, for which every comparison is False.
    if isinstance(timeout, bool) or not isinstance(timeout, (int, float)):
        raise TypeError(f"timeout must be a positive number, got {type(timeout).__name__}")
    if not timeout > 0:
        raise ValueError(f"timeout must be positive, got {timeout}")

    # Get and parse connection string. A present-but-empty attribute is
    # treated the same as a missing one.
    connection_str = getattr(self.connection, "connection_str", None)
    if not connection_str:
        raise RuntimeError("Connection string not available for bulk copy")

    # Use the proper connection string parser that handles braced values
    from mssql_python.connection_string_parser import _ConnectionStringParser

    parser = _ConnectionStringParser(validate_keywords=False)
    params = parser._parse(connection_str)

    if not params.get("server"):
        raise ValueError("SERVER parameter is required in connection string")

    if not params.get("database"):
        raise ValueError(
            "DATABASE parameter is required in connection string for bulk copy. "
            "Specify the target database explicitly to avoid accidentally writing to system databases."
        )

    # NOTE(review): when TrustServerCertificate is absent this evaluates to
    # True. Presumably that relaxes certificate validation in the underlying
    # library -- confirm this default is intended.
    trust_cert = params.get("trustservercertificate", "yes").strip().lower() in (
        "yes",
        "true",
    )

    # Parse encryption setting from connection string
    encrypt_param = params.get("encrypt")
    if encrypt_param is None:
        encryption = "Optional"
    else:
        encrypt_value = encrypt_param.strip().lower()
        if encrypt_value in ("yes", "true", "mandatory", "required"):
            encryption = "Required"
        elif encrypt_value in ("no", "false", "optional"):
            encryption = "Optional"
        else:
            # Pass through unrecognized values (e.g., "Strict") to the underlying driver
            encryption = encrypt_param

    # Connection context handed to the bulk copy library. It carries the
    # credentials, so it must never be logged; the entries are blanked in the
    # finally block below.
    pycore_context = {
        "server": params.get("server"),
        "database": params.get("database"),
        "user_name": params.get("uid", ""),
        "trust_server_certificate": trust_cert,
        "encryption": encryption,
        "password": params.get("pwd", ""),
    }

    pycore_connection = None
    pycore_cursor = None
    try:
        pycore_connection = mssql_py_core.PyCoreConnection(pycore_context)
        pycore_cursor = pycore_connection.cursor()

        return pycore_cursor.bulkcopy(table_name, iter(data), **kwargs)

    except Exception as e:
        # Log the error for debugging (without exposing credentials)
        logger.debug(
            "Bulk copy operation failed for table '%s': %s: %s",
            table_name,
            type(e).__name__,
            str(e),
        )
        # Re-raise a fresh exception of the same type so the connection context
        # is not carried along in the explicit exception chain. Not every
        # exception class accepts a single message argument (for example
        # UnicodeDecodeError), so fall back to RuntimeError instead of letting
        # the constructor failure mask the real error.
        try:
            sanitized = type(e)(str(e))
        except Exception:  # constructor signature is not (message,)
            sanitized = RuntimeError(f"{type(e).__name__}: {e}")
        raise sanitized from None

    finally:
        # Drop the credential references held by the context dict. Python
        # strings are immutable, so this removes references rather than
        # wiping memory.
        pycore_context["password"] = ""
        pycore_context["user_name"] = ""
        # Clean up bulk copy resources (cursor first, then connection)
        for resource in (pycore_cursor, pycore_connection):
            if resource and hasattr(resource, "close"):
                try:
                    resource.close()
                except Exception as cleanup_error:
                    # Log cleanup errors at debug level to aid troubleshooting
                    # without masking the original exception
                    logger.debug(
                        "Failed to close bulk copy resource %s: %s",
                        type(resource).__name__,
                        cleanup_error,
                    )
2454 | 2638 | def __enter__(self): |
2455 | 2639 | """ |
2456 | 2640 | Enter the runtime context for the cursor. |
|
0 commit comments