Coverage for src/pyensae/sql/sql_interface_database.py: 98%

43 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-03 02:16 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Abstract class to connect to a SQL server using various way. 

5It will be used to implement magic functions 

6""" 

7import pandas 

8from pyquickhelper.loghelper import noLOG 

9from .database_main import Database 

10from .sql_interface import InterfaceSQL, InterfaceSQLException 

11 

12 

13class InterfaceSQLDatabase(InterfaceSQL): 

14 

15 """ 

16 Abstract class to connect to a SQL server using various way. 

17 It will be used to implement magic functions 

18 """ 

19 

20 def __init__(self, filename): 

21 """ 

22 @param filename str or :class:`Database <pyensae.sql.database_main.Database>` 

23 

24 If *filename* is a :class:`Database <pyensae.sql.database_main.Database>`, 

25 the object is kept as is. 

26 

27 .. versionchanged:: 1.1 

28 Parameter *filename* can be a database. 

29 """ 

30 if isinstance(filename, Database): 

31 self.obj = filename 

32 else: 

33 self.obj = Database(filename, LOG=noLOG) 

34 

35 def connect(self): 

36 """ 

37 connection to the database 

38 """ 

39 self.obj.connect() 

40 self.populate_completion() 

41 

42 def close(self): 

43 """ 

44 close the connection to the database 

45 """ 

46 self.obj.close() 

47 

48 def get_table_list(self): 

49 """ 

50 returns the list of tables in the database 

51 

52 @return list of strings 

53 """ 

54 return self.obj.get_table_list() 

55 

56 def get_table_columns(self, table_name, as_dict=True): 

57 """ 

58 returns the list of columns in a table 

59 

60 @param table_name table name 

61 @param as_dict True, as dictionary, as a list otherwise 

62 @return dictionary { "column": (position, type) } 

63 """ 

64 return self.obj.get_table_columns(table_name, dictionary=as_dict) 

65 

66 def execute_clean_query(self, sql_query): 

67 """ 

68 return the resuls of a SQL query 

69 

70 @param sql_query query to execute 

71 @return pandas DataFrame 

72 """ 

73 con = self.obj._connection 

74 try: 

75 return pandas.read_sql(sql_query, con) 

76 except pandas.io.sql.DatabaseError: 

77 try: 

78 self.obj.execute_view(sql_query) 

79 except Exception as ee: 

80 raise InterfaceSQLException(str(ee)) from ee 

81 

82 def import_flat_file(self, filename, table_name): 

83 """ 

84 import a flat file as a table, we assume the columns 

85 separator is ``\\t`` and the file name contains a header 

86 

87 @param filename filename 

88 @param table table name 

89 @return the number of added rows 

90 """ 

91 r = self.obj.import_table_from_flat_file( 

92 filename, 

93 table_name, 

94 columns=None, 

95 header=True) 

96 self.populate_completion() 

97 return r 

98 

99 def drop_table(self, table_name): 

100 """ 

101 drops a table 

102 

103 @param table table name 

104 """ 

105 self.obj.remove_table(table_name) 

106 self.populate_completion() 

107 

108 def add_function(self, code_function): 

109 """ 

110 add a function to the database which can be called in a SELECT statement 

111 

112 @param code_function pointer to the function 

113 """ 

114 name = code_function.__name__ 

115 nbp = code_function.__code__.co_argcount 

116 self.obj.add_function(name, nbp, code_function) 

117 

118 def import_dataframe(self, tablename, df): 

119 """ 

120 import a dataframe into the database 

121 

122 @param tablename name of the table 

123 @param df dataframe 

124 @return the number of added rows 

125 """ 

126 df.to_sql(tablename, self.obj._connection) 

127 self.populate_completion() 

128 return len(df)