tft每日頭條

 > 科技

 > python sql 解析

python sql 解析

科技 更新时间:2024-11-18 18:10:23

python sql 解析(用Python給你醍醐灌頂)1

執行流程如下

python sql 解析(用Python給你醍醐灌頂)2

那麼根據流程所需要的功能,需要以下的實例進行支撐:

1.并發實例

2.查詢數據實例

3.執行post請求實例

目标:編寫Http執行POST請求的基本類方法

編寫test03.py查詢mysql相關對應字段數據

1# -*- coding: utf-8 -*- 2 3from tools.MysqlTools import MysqldbHelper 4import pymysql 5from tools.PostTools import PostHelper 6 7if __name__ == "__main__": 8 9 # 定義數據庫訪問參數 10 config = { 11 'host': '******#注釋', 12 'port': 3361, 13 'user': 'root', 14 'passwd': '******#注釋', 15 'charset': 'utf8', 16 'cursorclass': pymysql.cursors.DictCursor 17 } 18 19 # 初始化打開數據庫連接 20 mydb = MysqldbHelper(config) 21 22 # 選擇數據庫 23 DB_NAME = '******#注釋' 24 # mydb.createDataBase(DB_NAME) 25 26 # 選擇數據庫 27 print "========= 選擇數據庫%s ===========" % DB_NAME 28 mydb.selectDataBase(DB_NAME) 29 30 #選擇表 31 TABLE_NAME = '******#注釋' 32 33 # 數據查詢 34 print "========= 數據查詢 ===========" 35 select_fields = [ 36 "id", 37 ******#注釋 38 "1", 39 "1", 40 "null", 41 "NOW()", 42 "last_sync_time", 43 ] 44 select_order = "order by id desc limit 1" 45 select_result = mydb.select(TABLE_NAME, fields=select_fields,order=select_order) 46 47 # 拆分打印的字段 48 for row in select_result: 49 print "id=", row['id'] 50 print "user_id=", row['user_id'] 51 ******#注釋 52 print "1=", row['1'] 53 print "1=", row['1'] 54 print "null=", None # 注意:查詢null的需要寫為None 55 print "NOW()=", row['NOW()'] 56 print "last_sync_time=", row['last_sync_time'] 57 print

私信菜鳥007獲取源文件!

python sql 解析(用Python給你醍醐灌頂)3

編寫model類,抽象查詢的過程方法

python sql 解析(用Python給你醍醐灌頂)4

models.py

我新建了一個core文件夾目錄,然後新建一個models,專門用來處理查詢以及調用API發送請求的業務處理。

models.py代碼如下:

1# -*- coding: utf-8 -*- 2 3from tools.MysqlTools import MysqldbHelper 4 5class ModelHelper(object): # 繼承object類所有方法 6 7 # 初始化數據庫連接 8 def __init__(self , config): 9 self.mydb = MysqldbHelper(config) # 初始化打開數據庫連接 10 11 def selectTable(self,DB_NAME,TABLE_NAME,fields,order): 12 # 選擇數據庫 13 self.mydb.selectDataBase(DB_NAME) 14 # 數據查詢 15 print "========= 數據查詢 ===========" 16 result = self.mydb.select(TABLE_NAME, fields=fields,order=order) 17 # 返回查詢的數據 18 return result

編寫test04.py測試文件,執行測試一下:

1# -*- coding: utf-8 -*- 2 3from tools.MysqlTools import MysqldbHelper 4import pymysql 5from tools.PostTools import PostHelper 6from core.models import ModelHelper 7 8if __name__ == "__main__": 9 10 # 定義數據庫訪問參數 11 config = { 12 'host': '#####注釋####', 13 'port': 3361, 14 'user': 'root', 15 'passwd': '#####注釋####', 16 'charset': 'utf8', 17 'cursorclass': pymysql.cursors.DictCursor 18 } 19 # 初始化數據模型 20 model = ModelHelper(config) 21 # 設置需要查詢的數據庫 22 DB_NAME = '#####注釋####' 23 # 設置需要查詢的表明 24 TABLE_NAME = '#####注釋####' 25 # 數據查詢 26 select_fields = [ 27 "id", 28 #####注釋#### 29 "1", 30 "1", 31 "null", 32 "NOW()", 33 "last_sync_time", 34 ] 35 select_order = "order by id desc limit 1" 36 select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order) 37 38 # 拆分打印的字段 39 for row in select_result: 40 print "id=", row['id'] 41 #####注釋#### 42 print "1=", row['1'] 43 print "1=", row['1'] 44 print "null=", None 45 print "NOW()=", row['NOW()'] 46 print "last_sync_time=", row['last_sync_time'] 47 print

測試結果如下:

1id= 1066511830261694464 2 #####注釋#### 31= 1 41= 1 5null= None 6NOW()= 2018-11-27 11:45:08 7last_sync_time= 2018-11-25 10:00:10

那麼已經抽象了這部分查詢在model中處理了,還有下一步請求API也要寫入到model中自動處理。

那麼下面來繼續寫寫。

将返回的查詢結果轉化為字典類型數據

其中查詢的舊表字段與新表的字段應該要用字典進行一一映射關聯,方便後續調用。

1、定義字典存儲 舊表字段 《==》新表字段的映射關系

2、獲取舊表字段數據,進行數據查詢

3、獲取新表字段對應存儲數據,再次使用API請求新表,灌入數據

1# 設置字段映射字典: 舊表查詢字段 ==》 新表的字段 2 dict_fields = { 3 "id": "id", 4 "user_id": "user_id", 5 ## 注釋部分字段 6 "1": "purpose", 7 "1": "status", 8 "null": "data_source", 9 "NOW()": "create_time", 10 "last_sync_time": "last_modify_time" 11 } 1 # 獲取舊表字段數組 2 select_fields = [] 3 for key, value in dict_fields.items(): 4 print "key = %s , value = %s" % (key,value) 5 select_fields.append(key) 6 7 print "select_fields = " 8 print select_fields

執行結果如下:

1# 映射字典的查詢數據 2key = user_code , value = user_code 3key = null , value = data_source 4###### 注釋部分 ####### 5key = NOW() , value = create_time 6key = 1 , value = status 7key = user_level , value = user_level 8 9# 獲取生成舊表需要查詢的字段 10select_fields = [ 11'census_town', 12###### 注釋部分 ####### 13 'NOW()', '1', 14 'user_level' 15]

1、那麼下面就可以根據獲取的字段數據,進行mysql數據查詢

2、然後生成一個body請求體字典數據,但是此時body的請求體key是舊表的字段,請求API的時候需要新表的字段,那麼就需要進行字段替換

3、再寫一個字段映射字典的循環,生成請求API的new_body

1# 此時已有查詢字段的數組 2print "select_fields = " 3 print select_fields 4 5 select_order = "order by id desc limit 2" 6 select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order) 7 8 # 循環生成每條查詢數據的請求body 9 body = {} 10 for result in select_result: 11 for field in select_fields: 12 if field == "null": 13 body[field] = None 14 else: 15 body[field] = result[field] 16 17 # 打印查看已生成的body數據 18 for field in select_fields: 19 print body[field] 20 21 print body 22 23 # 更新body的字段為新表的字段 24 new_body = {} 25 for key, value in dict_fields.items(): 26 print "key = %s , value = %s" % (key,value) 27 new_body[value] = body[key] 28 29 print "new_body=" 30 print new_body

執行結果如下:

python sql 解析(用Python給你醍醐灌頂)5

那麼上面的過程最好寫在model中,這樣可以方便使用。

編寫model增加生成請求API的body數據相關方法

python sql 解析(用Python給你醍醐灌頂)6

1# -*- coding: utf-8 -*- 2 3from tools.MysqlTools import MysqldbHelper 4from tools.PostTools import PostHelper 5 6class ModelHelper(object): # 繼承object類所有方法 7 8 # 初始化數據庫連接 9 def __init__(self , config): 10 self.mydb = MysqldbHelper(config) # 初始化打開數據庫連接 11 12 # 根據設置的舊表字段,查詢舊庫的數據庫數據 13 def selectTable(self,DB_NAME,TABLE_NAME,fields,order): 14 # 選擇數據庫 15 self.mydb.selectDataBase(DB_NAME) 16 # 數據查詢 17 result = self.mydb.select(TABLE_NAME, fields=fields,order=order) 18 # 返回查詢的數據 19 return result 20 21 # 根據字段映射字典獲取舊表字段數組 22 def getSelectFields(self,dict_fields): 23 # 獲取舊表字段數組 24 select_fields = [] 25 for key, value in dict_fields.items(): 26 # print "key = %s , value = %s" % (key, value) 27 select_fields.append(key) 28 return select_fields 29 30 # 根據查詢的結果以及字段字典,轉化為請求API的body 31 def convertApiBody(self,result,dict_fields): 32 # 循環生成每條查詢數據的請求body 33 body = {} 34 for result in result: 35 for field in result: 36 if field == "null": 37 body[field] = None 38 else: 39 body[field] = result[field] 40 # 更新body的字段為新表的字段 41 new_body = {} 42 for key, value in dict_fields.items(): 43 # print "key = %s , value = %s" % (key, value) 44 if key == "null": 45 new_body[value] = None 46 else: 47 new_body[value] = body[key] 48 return new_body

使用model方法,以及執行結果:

python sql 解析(用Python給你醍醐灌頂)7

那麼下一步就是再編寫一個執行API的方法。

但是在請求API之前,需要将body序列化為json格式,這個存在datetime類型導緻序列化失敗的情況,下一個篇章繼續。

,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关科技资讯推荐

热门科技资讯推荐

网友关注

Copyright 2023-2024 - www.tftnews.com All Rights Reserved