refactor: 🎉refactor code

main
luxuemin 3 years ago
parent 552a8f170a
commit be21338725

@ -16,32 +16,29 @@ from pprint import pprint
import pandas
from db.connect import connect
from fund_info.crawler import FundSpider
from fund_info.api import FundApier
from fund_info.csv import FundCSV
from lib.mysnowflake import IdWorker
from utils.login import login_morning_star
from sql_model.fund_query import FundQuery
from sql_model.fund_insert import FundInsert
connect_instance = connect()
cursor = connect_instance.cursor()
lock = Lock()
# 利用api获取同类基金的资产
def generate_insert_sql(target_dict, table_name, ignore_list):
keys = ','.join(target_dict.keys())
values = ','.join(['%s'] * len(target_dict))
update_values = ''
for key in target_dict.keys():
if key in ignore_list:
continue
update_values = update_values + '{0}=VALUES({0}),'.format(key)
sql_insert = "INSERT INTO {table} ({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE {update_values}; ".format(
table=table_name,
keys=keys,
values=values,
update_values=update_values[0:-1]
)
return sql_insert
def get_total_asset(fund_code, platform):
each_fund = FundApier(fund_code, '2021-05-07', platform)
total_asset = each_fund.get_total_asset()
# 如果在爱基金平台找不到,则到展恒基金找
if total_asset == None and platform == 'ai_fund':
print("fund_code", total_asset, fund_code)
each_fund = FundApier(fund_code, '2021-05-10', 'zh_fund')
total_asset = each_fund.get_total_asset()
if __name__ == '__main__':
@ -62,7 +59,7 @@ if __name__ == '__main__':
fund_csv.write_abnormal_url_fund(True)
# df = pandas.read_csv(
# result_dir + 'fund_morning_season_error.csv', usecols=[0, 2, 4])
# result_dir + 'fund_morning_quarter_error.csv', usecols=[0, 2, 4])
# fund_list = df.values.tolist()
# print(len(d[d['代码'].astype(str).str.contains('10535')]))
# print(df[df['代码'].astype(str).str.contains('10535')]
@ -76,10 +73,8 @@ if __name__ == '__main__':
while(page_start < end):
results = each_fund_query.select_quarter_fund(
page_start, page_limit)
print('results', results)
for record in results:
sleep(1)
print(current_thread().getName(), 'record-->', record)
each_fund = FundSpider(
record[0], record[1], record[2], chrome_driver)
is_normal = each_fund.go_fund_url()
@ -94,20 +89,15 @@ if __name__ == '__main__':
continue
# 开始爬取数据
quarter_index = each_fund.get_quarter_index() # 数据更新时间
if quarter_index == each_fund.quarter_index:
quarter_index = each_fund.get_quarter_index() # 数据更新时间,如果不一致,不爬取下面数据
if quarter_index != each_fund.quarter_index:
print('quarter_index', quarter_index)
if each_fund.fund_name.endswith('A'):
print('fund_name', each_fund.fund_name[0:-1])
similar_name = each_fund.fund_name[0:-1]
results = each_fund_query.select_similar_fund(
similar_name) # 获取查询的所有记录
print('results', results)
continue
each_fund.get_fund_season_info() # 基本季度性数据
each_fund.get_fund_manager_info() # 基金经理模块
each_fund.get_fund_morning_rating() # 基金晨星评级
each_fund.get_fund_qt_rating() # 基金风险评级
print('each_fund.total_asset', each_fund.total_asset)
# 判断是否有股票持仓,有则爬取
if each_fund.stock_position['total'] != '0.00' and each_fund.total_asset != None:
each_fund.get_asset_composition_info()
@ -122,8 +112,8 @@ if __name__ == '__main__':
lock.acquire()
snow_flake_id = IdWorker.get_id()
lock.release()
continue
# 开始存入数据
fund_insert = FundInsert()
# 基金经理
if each_fund.manager.get('id'):
manager_dict = {
@ -132,15 +122,9 @@ if __name__ == '__main__':
'name': each_fund.manager.get('name'),
'brife': each_fund.manager.get('brife')
}
manager_sql_insert = generate_insert_sql(
manager_dict, 'fund_morning_manager', ['id', 'manager_id', 'name'])
lock.acquire()
# cursor.execute(manager_sql_insert,
# tuple(manager_dict.values()))
# connect_instance.commit()
lock.release()
fund_insert.insert_fund_manger_info(manager_dict)
# 季度信息 TODO: 对比数据更新时间field
season_dict = {
quarterly_dict = {
'id': snow_flake_id,
'quarter_index': each_fund.quarter_index,
'fund_code': each_fund.fund_code,
@ -168,13 +152,7 @@ if __name__ == '__main__':
'morning_star_rating_5': each_fund.morning_star_rating.get(5),
'morning_star_rating_10': each_fund.morning_star_rating.get(10),
}
season_sql_insert = generate_insert_sql(
season_dict, 'fund_morning_season', ['id', 'quarter_index', 'fund_code'])
lock.acquire()
# cursor.execute(season_sql_insert,
# tuple(season_dict.values()))
# connect_instance.commit()
lock.release()
fund_insert.fund_quarterly_info(quarterly_dict)
# 入库十大股票持仓
stock_position_total = each_fund.stock_position.get(
'total', '0.00')
@ -196,14 +174,24 @@ if __name__ == '__main__':
stock_dict[portion_key] = temp_stock['stock_portion']
market_key = prefix + 'market'
stock_dict[market_key] = temp_stock['stock_market']
stock_sql_insert = generate_insert_sql(
stock_dict, 'fund_morning_stock_info', ['id', 'quarter_index', 'fund_code'])
lock.acquire()
# print('stock_sql_insert', stock_sql_insert)
# cursor.execute(stock_sql_insert,
# tuple(stock_dict.values()))
# connect_instance.commit()
lock.release()
fund_insert.fund_stock_info(stock_dict)
# 获取同类基金,再获取同类基金的总资产
if each_fund.fund_name.endswith('A'):
similar_name = each_fund.fund_name[0:-1]
results = each_fund_query.select_similar_fund(
similar_name) # 获取查询的所有记录
platform = 'zh_fund' if '封闭' in similar_name else 'ai_fund'
for i in range(0, len(results)):
item = results[i]
item_code = item[0]
total_asset = get_total_asset(item_code, platform)
quarterly_dict['fund_code'] = item_code
quarterly_dict['total_asset'] = total_asset
quarterly_dict['id'] = snow_flake_id + i + 1
fund_insert.fund_quarterly_info(quarterly_dict)
stock_dict['fund_code'] = item_code
stock_dict['id'] = snow_flake_id + i + 1
fund_insert.fund_stock_info(stock_dict)
# pprint(fundDict)
page_start = page_start + page_limit
print(current_thread().getName(), 'page_start', page_start)
@ -213,25 +201,25 @@ if __name__ == '__main__':
start_time = time()
step_num = 10
steps = [{
"start": 800,
"end": 2500
"start": 0,
"end": 1500
}, {
"start": 2740,
"end": 5000
"start": 1500,
"end": 3000
}, {
"start": 5100,
"end": 7500
"start": 3000,
"end": 4500
}, {
"start": 8300,
"start": 4500,
"end": record_total
}]
for i in range(0):
skip_num = 10
# print(i * step_num + skip_num, (i+1) * step_num)
# start = steps[i]['start']
# end = steps[i]['end']
start = i * step_num
end = (i + 1) * step_num
for i in range(4):
skip_num = 0
# print(i * step_num + skip_num, (i + 1) * step_num)
start = steps[i]['start']
end = steps[i]['end']
# start = i * step_num
# end = (i + 1) * step_num
t = Thread(target=crawlData, args=(start, end))
t.setDaemon(True)
threaders.append(t)
@ -239,6 +227,6 @@ if __name__ == '__main__':
for threader in threaders:
threader.join()
end_time = time()
print('run time is %s' % (end_time - start_time))
print(record_total, 'run time is %s' % (end_time - start_time))
# print('error_funds', error_funds)
exit()

@ -304,10 +304,17 @@ class FundSpider:
def get_quarter_index(self):
# 总资产 TODO: 增加一个数据更新时间field
self.update_date = self.get_element_text_by_class_name(
update_date = self.get_element_text_by_class_name(
"date4", 'aspnetForm')
print('self.update_date ', self.fund_code, self.update_date)
if(update_date == None):
self._chrome_driver.refresh()
print('wait:fund_code', self.fund_code)
sleep(9)
update_date = self.get_element_text_by_class_name(
"date4", 'aspnetForm')
if update_date == None:
return
self.update_date = update_date
split_dates = self.update_date.split('-', 1)
quarter_index = get_season_index(split_dates[1])
print("self.update_date", split_dates[0] + '-Q' + str(quarter_index))
return split_dates[0] + '-Q' + str(quarter_index)

@ -14,26 +14,25 @@ class FundCSV:
def __init__(self, output_dir):
self.output_dir = output_dir
self.lock = Lock()
# 爬取过程中异常基金
# 爬取过程中异常基金
def write_season_catch_fund(self, is_init=False, data=None):
head_data = '代码' + ',' + '晨星专属号' + ',' + '名称' + ',' + \
'类型' + ',' + '股票总仓位' + ',' + '页码' + ',' + '备注' + '\n'
mode = 'w+' if is_init else 'a'
write_data = head_data if is_init else data
self.lock.acquire()
with open(self.output_dir + 'fund_morning_season_catch.csv', mode) as csv_file:
with open(self.output_dir + 'fund_morning_quarter_catch.csv', mode) as csv_file:
csv_file.write(write_data)
self.lock.release()
# 基金url跳转异常基金
def write_abnormal_url_fund(self, is_init=False, data=None):
head_data = '代码' + ',' + '晨星专属号' + ',' + '名称' + ',' + \
'类型' + ',' + '页码' + ',' + '备注' + '\n'
mode = 'w+' if is_init else 'a'
write_data = head_data if is_init else data
self.lock.acquire()
with open(self.output_dir + 'fund_morning_season_abnormal.csv', mode) as csv_file:
with open(self.output_dir + 'fund_morning_quarter_abnormal.csv', mode) as csv_file:
csv_file.write(write_data)
self.lock.release()

@ -33,3 +33,30 @@ class FundInsert:
update_values=update_values[0:-1]
)
return sql_insert
def insert_fund_manger_info(self, manager_dict):
self.lock.acquire()
manager_sql_insert = self.generate_insert_sql(
manager_dict, 'fund_morning_manager', ['id', 'manager_id', 'name'])
self.cursor.execute(manager_sql_insert,
tuple(manager_dict.values()))
self.connect_instance.commit()
self.lock.release()
def fund_quarterly_info(self, quarterly_dict):
self.lock.acquire()
quarterly_sql_insert = self.generate_insert_sql(
quarterly_dict, 'fund_morning_quarter', ['id', 'quarter_index', 'fund_code'])
self.cursor.execute(quarterly_sql_insert,
tuple(quarterly_dict.values()))
self.connect_instance.commit()
self.lock.release()
def fund_stock_info(self, stock_dict):
self.lock.acquire()
stock_sql_insert = self.generate_insert_sql(
stock_dict, 'fund_morning_stock_info', ['id', 'quarter_index', 'fund_code'])
self.cursor.execute(stock_sql_insert,
tuple(stock_dict.values()))
self.connect_instance.commit()
self.lock.release()

@ -19,6 +19,7 @@ if __name__ == '__main__':
page_start = 3600
page_limit = 10000
fund_query = FundQuery()
# 获取所有的A类基金
all_a_results = fund_query.select_all_a_class_fund(
page_start, page_limit) # 获取查询的所有记录
for i in range(0, len(all_a_results)):

@ -8,7 +8,7 @@ def get_star_count(morning_star_url):
import numpy as np
import requests
from PIL import Image
module_path = os.path.dirname(__file__)
module_path = os.getcwd() + '/src'
temp_star_url = module_path + '/assets/star/tmp.gif'
r = requests.get(morning_star_url)
with open(temp_star_url, "wb") as f:

Loading…
Cancel
Save