Skip to content

Commit 572739d

Browse files
authored
Refine some db codes (#2751)
* refine some db code
* polish
* add docstring
1 parent 7a55e89 commit 572739d

7 files changed

Lines changed: 60 additions & 17 deletions

File tree

python/runtime/db.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import numpy as np
1818
import runtime.db_writer as db_writer
1919
import six
20-
from odps import ODPS, tunnel
2120

2221

2322
def parseMySQLDSN(dsn):

python/runtime/db_writer/hive.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ def _column_list(self):
5858
return result
5959

6060
def _indexing_table_schema(self, table_schema):
61-
cursor = self.conn.cursor()
6261
column_list = self._column_list()
6362

6463
schema_idx = []
@@ -77,7 +76,7 @@ def _indexing_table_schema(self, table_schema):
7776

7877
def _ordered_row_data(self, row):
7978
# Use NULL as the default value for hive columns
80-
row_data = ["NULL" for i in range(len(self.table_schema))]
79+
row_data = ["NULL" for _ in range(len(self.table_schema))]
8180
for idx, element in enumerate(row):
8281
row_data[self.schema_idx[idx]] = str(element)
8382
return CSV_DELIMITER.join(row_data)

python/runtime/db_writer/maxcompute.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,38 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313

14-
from odps import ODPS, tunnel
1514
from runtime.db_writer.base import BufferedDBWriter
1615

1716

1817
class MaxComputeDBWriter(BufferedDBWriter):
18+
"""
19+
MaxComputeDBWriter is used to write the Python row data into
20+
the MaxCompute table.
21+
22+
Args:
23+
conn: the database connection object.
24+
table_name (str): the MaxCompute table name.
25+
table_schema (list[str]): the column names of the MaxCompute table.
26+
buff_size (int): the buffer size to be flushed.
27+
"""
1928
def __init__(self, conn, table_name, table_schema, buff_size):
20-
return super(MaxComputeDBWriter,
21-
self).__init__(conn, table_name, table_schema, buff_size)
29+
super(MaxComputeDBWriter, self).__init__(conn, table_name,
30+
table_schema, buff_size)
31+
32+
# NOTE: import odps here instead of in the front of this file,
33+
# so that we do not need the odps package installed in the Docker
34+
# image if we do not use MaxComputeDBWriter.
35+
from odps import tunnel
36+
self.compress = tunnel.CompressOption.CompressAlgorithm.ODPS_ZLIB
2237

2338
def flush(self):
24-
compress = tunnel.CompressOption.CompressAlgorithm.ODPS_ZLIB
39+
"""
40+
Flush the row data into the MaxCompute table.
41+
42+
Returns:
43+
None
44+
"""
2545
self.conn.write_table(self.table_name,
2646
self.rows,
27-
compress_option=compress)
47+
compress_option=self.compress)
2848
self.rows = []

python/runtime/db_writer/mysql.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,32 @@
1515

1616

1717
class MySQLDBWriter(BufferedDBWriter):
18-
def __init__(self, conn, table_name, table_schema, buff_size):
19-
return super().__init__(conn, table_name, table_schema, buff_size)
18+
"""
19+
MySQLDBWriter is used to write the Python row data into
20+
the MySQL table.
2021
21-
def flush(self):
22-
statement = '''insert into {} ({}) values({})'''.format(
22+
Args:
23+
conn: the database connection object.
24+
table_name (str): the MySQL table name.
25+
table_schema (list[str]): the column names of the MySQL table.
26+
buff_size (int): the buffer size to be flushed.
27+
"""
28+
def __init__(self, conn, table_name, table_schema, buff_size):
29+
super().__init__(conn, table_name, table_schema, buff_size)
30+
self.statement = '''insert into {} ({}) values({})'''.format(
2331
self.table_name, ", ".join(self.table_schema),
2432
", ".join(["%s"] * len(self.table_schema)))
33+
34+
def flush(self):
35+
"""
36+
Flush the row data into the MySQL table.
37+
38+
Returns:
39+
None
40+
"""
2541
cursor = self.conn.cursor()
2642
try:
27-
cursor.executemany(statement, self.rows)
43+
cursor.executemany(self.statement, self.rows)
2844
self.conn.commit()
2945
finally:
3046
cursor.close()

python/runtime/diagnostics.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
13-
import copy
1413
import inspect
1514
import os
1615
import re

python/runtime/maxcompute.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,19 @@
1919
# Here we use the sdk to operate the database
2020
class MaxCompute:
2121
@staticmethod
22-
def connect(database, user, password, host, auth=""):
22+
def connect(database, user, password, host):
23+
"""
24+
Create a MaxCompute database connection object.
25+
26+
Args:
27+
database: the MaxCompute project name.
28+
user: the MaxCompute AK.
29+
password: the MaxCompute SK.
30+
host: the MaxCompute endpoint address.
31+
32+
Returns:
33+
A MaxCompute database connection object.
34+
"""
2335
return ODPS(user, password, project=database, endpoint=host)
2436

2537
@staticmethod

python/runtime/oss.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313

14-
import io
1514
import os
1615
import pickle
17-
import tarfile
1816

1917
import oss2
2018
import tensorflow as tf

0 commit comments

Comments (0)