feat: output crawler abnormal record

main
jackluson 2 years ago
parent 57de7dfe2e
commit 904af59ea3

@@ -22,6 +22,7 @@ from sql_model.fund_insert import FundInsert
from sql_model.fund_query import FundQuery
from utils.driver import create_chrome_driver
from utils.index import bootstrap_thread
from utils.file_op import read_error_code_from_json, write_fund_json_data
from utils.login import login_morning_star
# Fetch the total assets of funds of the same class via the api
@@ -44,13 +45,17 @@ def get_total_asset(fund_code, platform):
def acquire_fund_quarter():
lock = Lock()
each_fund_query = FundQuery()
idWorker = IdWorker()
result_dir = './output/'
fund_csv = FundCSV(result_dir)
fund_csv.write_season_catch_fund(True)
fund_csv.write_abnormal_url_fund(True)
# result_dir = './output/'
# fund_csv = FundCSV(result_dir)
# fund_csv.write_season_catch_fund(True)
# fund_csv.write_abnormal_url_fund(True)
err_info = read_error_code_from_json()
error_funds_with_page = err_info.get('error_funds_with_page')
error_funds_with_found_date = err_info.get('error_funds_with_found_date')
error_funds_with_unmatch = err_info.get('error_funds_with_unmatch')
filename = err_info.get('filename')
file_dir = err_info.get('file_dir')
def crawlData(start, end):
login_url = 'https://www.morningstar.cn/membership/signin.aspx'
chrome_driver = create_chrome_driver()
@@ -62,29 +67,30 @@ def acquire_fund_quarter():
results = each_fund_query.select_quarter_fund(
page_start, page_limit)
for record in results:
sleep(1)
# 0P000179WG
# 001811 中欧明睿新常态混合A
each_fund = FundSpider(
record[0], record[1], record[2], chrome_driver)
fund_code = record[0]
if fund_code in error_funds_with_page or fund_code in error_funds_with_found_date or fund_code in error_funds_with_unmatch:
print('error fund: ', fund_code)
continue
each_fund = FundSpider(fund_code, record[1], record[2], chrome_driver)
each_fund.set_found_data(record[3])
is_error_page = each_fund.go_fund_url()
# Check whether the fund detail page can be reached; if not, record the fund and exit the current iteration
if is_error_page == True:
# error_funds.append(each_fund.fund_code)
fund_infos = [each_fund.fund_code, each_fund.morning_star_code,
each_fund.fund_name, record[3], page_start, '页面跳转有问题']
output_line = ', '.join(str(x)
for x in fund_infos) + '\n'
fund_csv.write_abnormal_url_fund(False, output_line)
# fund_infos = [each_fund.fund_code, each_fund.morning_star_code,
# each_fund.fund_name, record[3], page_start, '页面跳转有问题']
# output_line = ', '.join(str(x)
# for x in fund_infos) + '\n'
# fund_csv.write_abnormal_url_fund(False, output_line)
error_funds_with_page.append(each_fund.fund_code)
continue
# Start crawling the data
quarter_index = each_fund.get_quarter_index() # data update period; if it does not match, skip crawling the data below
if quarter_index != each_fund.quarter_index:
print('quarter_index', quarter_index, each_fund.update_date,
each_fund.fund_code, each_fund.fund_name)
# print('quarter_index', quarter_index, each_fund.update_date,
# each_fund.fund_code, each_fund.fund_name)
error_funds_with_unmatch.append(each_fund.fund_code)
continue
each_fund.get_fund_season_info() # basic quarterly data
@@ -95,14 +101,14 @@ def acquire_fund_quarter():
if each_fund.stock_position['total'] != '0.00' and each_fund.total_asset != None:
each_fund.get_asset_composition_info()
# Whether any exception was caught during crawling; if so, record it in csv
if each_fund._is_trigger_catch == True:
fund_infos = [each_fund.fund_code, each_fund.morning_star_code,
each_fund.fund_name, record[3],
each_fund.stock_position['total'],
page_start, each_fund._catch_detail]
output_line = ', '.join(str(x)
for x in fund_infos) + '\n'
fund_csv.write_season_catch_fund(False, output_line)
# if each_fund._is_trigger_catch == True:
# fund_infos = [each_fund.fund_code, each_fund.morning_star_code,
# each_fund.fund_name, record[3],
# each_fund.stock_position['total'],
# page_start, each_fund._catch_detail]
# output_line = ', '.join(str(x)
# for x in fund_infos) + '\n'
# fund_csv.write_season_catch_fund(False, output_line)
# Insert into the database
lock.acquire()
snow_flake_id = idWorker.get_id()
@@ -157,7 +163,6 @@ def acquire_fund_quarter():
'morning_star_rating_5': each_fund.morning_star_rating.get(5),
'morning_star_rating_10': each_fund.morning_star_rating.get(10),
}
# Insert the top-ten stock positions into the database
stock_position_total = each_fund.stock_position.get(
'total', '0.00')
@@ -192,7 +197,6 @@ def acquire_fund_quarter():
item_code = item[0]
if item_code == each_fund.fund_code:
continue
print("item_code", item_code, platform)
total_asset = get_total_asset(item_code, platform)
if total_asset != None:
init_total_asset = init_total_asset - total_asset
@@ -233,25 +237,22 @@ def acquire_fund_quarter():
raise BaseException
chrome_driver.close()
thread_count = 6
# for count in range(6):
total_start_time = time()
# record_total = each_fund_query.select_quarter_fund_total() # get the number of records
# print("record_total", record_total)
# bootstrap_thread(crawlData, record_total, thread_count)
for i in range(3):
print("i", i)
start_time = time()
record_total = each_fund_query.select_quarter_fund_total() # get the number of records
for i in range(2):
start_time = time()
print('record_total', record_total)
try:
bootstrap_thread(crawlData, record_total, thread_count)
except:
cur_total = each_fund_query.select_quarter_fund_total() # get the number of records
print('crawler item count:', record_total - cur_total)
record_total = cur_total
end_time = time()
print("耗时: {:.2f}".format(end_time - start_time))
end_time = time()
print("耗时: {:.2f}".format(end_time - start_time))
write_fund_json_data({'error_funds_with_page': error_funds_with_page, 'error_funds_with_found_date': error_funds_with_found_date, 'error_funds_with_unmatch': error_funds_with_unmatch}, filename=filename, file_dir=file_dir)
total_end_time = time()
print("total耗时: {:.2f}".format(total_end_time - total_start_time))
exit()
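With this commit, abnormal funds are no longer written to csv; they are collected in three lists and flushed to a quarter-scoped JSON file once the crawl finishes. A minimal sketch of what that record looks like on disk (the fund codes below are made up; the file name and keys follow read_error_code_from_json further down):

import json

# Illustrative shape of ./output/json/error_funds_<quarter>.json after a run.
example_record = {
    'error_funds_with_page': ['001811'],        # detail page failed to load
    'error_funds_with_found_date': ['005960'],  # found date missing ('-' or None)
    'error_funds_with_unmatch': ['003095'],     # quarter index did not match
}
print(json.dumps(example_record, indent=2))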

@@ -11,7 +11,7 @@ Copyright (c) 2022 Camel Lu
import sys
sys.path.append('./src')
from sqlalchemy import and_
from sqlalchemy import and_, or_
from sqlalchemy.orm import Session
from models.fund import FundBase, FundQuarter
@@ -53,9 +53,12 @@ def query_empty_company_and_found_date_fund(start, size):
all_funds = session.query(FundBase).where(FundBase.company == None, FundBase.found_date == None, FundBase.is_archive==0).offset(start).limit(size).all()
return all_funds
def query_empty_company_or_found_date_fund(start, size):
all_funds = session.query(FundBase).where(FundBase.is_archive==0).filter(or_(FundBase.company == None, FundBase.found_date == None)).offset(start).limit(size).all()
return all_funds
if __name__ == '__main__':
quarter_index = '2022-Q2'
# fund_list = query_high_score_funds(quarter_index)
query_empty_company_and_found_date_fund(2, 10)
query_empty_company_or_found_date_fund(0, 5000)
# print("fund_list",fund_list)

@@ -61,14 +61,20 @@ class FundSpider:
self.morning_star_code
self._chrome_driver.get(morning_fund_selector_url) # open the page to crawl
sleep(6)
sleep(5)
# Check whether the page errored out or was redirected; if so, skip it
if self._chrome_driver.current_url == 'https://www.morningstar.cn/errors/defaulterror.html':
return True
while self._chrome_driver.page_source == None:
if 'Value cannot be null' in self._chrome_driver.title:
return True
try_count = 5
while self._chrome_driver.page_source == None and try_count > 0:
self._chrome_driver.refresh()
print('wait:fund_code', self.fund_code)
sleep(9)
try_count -= 1
if self._chrome_driver.page_source == None:
return True
return False
# self._chrome_driver.execute_script('location.reload()')
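The unbounded while loop is replaced by a bounded retry, so a page that never yields a source can no longer hang the crawling thread. The same pattern in isolation (a sketch with hypothetical names, mirroring the refresh-and-wait logic above):

from time import sleep

def wait_for_page_source(driver, max_tries=5, wait_seconds=9):
    # Refresh and wait up to max_tries times; report whether the page ever loaded.
    tries = max_tries
    while driver.page_source is None and tries > 0:
        driver.refresh()
        sleep(wait_seconds)
        tries -= 1
    return driver.page_source is not None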
@@ -140,7 +146,7 @@ class FundSpider:
# Get fund manager info (when several managers are in office, only the first is needed)
def get_fund_manager_info(self):
manager_ele_list = self._chrome_driver.find_element(By.ID,
'qt_manager').find_elements_by_xpath("ul")
'qt_manager').find_elements(By.XPATH, "ul")
for manager_ele in manager_ele_list:
try:
# Fund manager
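The find_elements_by_xpath helper was removed in Selenium 4; find_elements(By.XPATH, ...) is its replacement, which is what this change switches to. The equivalent call in isolation (a sketch; the element id comes from the code above, the function name is made up):

from selenium.webdriver.common.by import By

def get_manager_elements(driver):
    # Equivalent of the removed find_elements_by_xpath("ul") call.
    return driver.find_element(By.ID, 'qt_manager').find_elements(By.XPATH, 'ul')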

@@ -14,11 +14,14 @@ from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from crud.query import query_all_fund, query_empty_company_and_found_date_fund
from models.fund import FundBase
from crud.query import (query_all_fund,
query_empty_company_and_found_date_fund,
query_empty_company_or_found_date_fund)
from fund_info.crawler import FundSpider
from utils.index import bootstrap_thread
from models.fund import FundBase
from utils.driver import create_chrome_driver, text_to_be_present_in_element
from utils.file_op import read_error_code_from_json, write_fund_json_data
from utils.index import bootstrap_thread
from utils.login import login_morning_star
@@ -94,8 +97,13 @@ def sync_fund_base(page_index):
print('end')
def further_complete_base_info():
all_funds = query_empty_company_and_found_date_fund(0, 10000)
error_funds = []
all_funds = query_empty_company_or_found_date_fund(0, 10000)
err_info = read_error_code_from_json()
error_funds_with_page = err_info.get('error_funds_with_page')
error_funds_with_found_date = err_info.get('error_funds_with_found_date')
error_funds_with_unmatch = err_info.get('error_funds_with_unmatch')
filename = err_info.get('filename')
file_dir = err_info.get('file_dir')
def crawlData(start, end):
login_url = 'https://www.morningstar.cn/membership/signin.aspx'
chrome_driver = create_chrome_driver()
@@ -109,19 +117,21 @@ def further_complete_base_info():
# results = query_empty_company_and_found_date_fund(page_start, page_limit)
for record in results:
fund_code = record.fund_code
if fund_code in error_funds_with_page or fund_code in error_funds_with_found_date:
continue
morning_star_code = record.morning_star_code
fund_name = record.fund_name
each_fund = FundSpider(fund_code, morning_star_code, fund_name, chrome_driver)
# Check whether the fund detail page can be reached
is_error_page = each_fund.go_fund_url()
if is_error_page == True:
error_funds.append(each_fund.fund_code)
error_funds_with_page.append(each_fund.fund_code)
continue
each_fund.get_fund_base_info()
# Skip funds without a found date
if each_fund.found_date == '-' or each_fund.found_date == None:
# lock.acquire()
error_funds.append(each_fund.fund_code)
error_funds_with_found_date.append(each_fund.fund_code)
# lock.release()
continue
# Assemble the data needed for the sql
@@ -138,7 +148,12 @@ def further_complete_base_info():
print('page_start', page_start)
page_start = page_start + page_limit
chrome_driver.close()
bootstrap_thread(crawlData, len(all_funds), 3)
try:
bootstrap_thread(crawlData, len(all_funds), 6)
write_fund_json_data({'error_funds_with_page': error_funds_with_page, 'error_funds_with_found_date': error_funds_with_found_date, 'error_funds_with_unmatch': error_funds_with_unmatch}, filename=filename, file_dir=file_dir)
except:
write_fund_json_data({'error_funds_with_page': error_funds_with_page, 'error_funds_with_found_date': error_funds_with_found_date, 'error_funds_with_unmatch': error_funds_with_unmatch}, filename=filename, file_dir=file_dir)
if __name__ == '__main__':
#127, 300, 600-
page_index = 1
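The duplicated write_fund_json_data call in the try/except above ensures the error record is persisted whether or not the crawl raises. A try/finally expresses the same intent without the duplication (a sketch with the same names; note that, unlike the bare except above, finally lets the exception propagate):

try:
    bootstrap_thread(crawlData, len(all_funds), 6)
finally:
    write_fund_json_data(
        {
            'error_funds_with_page': error_funds_with_page,
            'error_funds_with_found_date': error_funds_with_found_date,
            'error_funds_with_unmatch': error_funds_with_unmatch,
        },
        filename=filename,
        file_dir=file_dir,
    )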

@@ -7,12 +7,15 @@ Author: luxuemin2108@gmail.com
-----
Copyright (c) 2021 Camel Lu
'''
import json
import os
import time
import pandas as pd
from openpyxl import load_workbook
from .index import get_last_quarter_str
# Write a json file
def write_fund_json_data(data, filename, file_dir=None):
@@ -30,8 +33,6 @@ def write_fund_json_data(data, filename, file_dir=None):
def read_dir_all_file(path):
return os.listdir(path)
def update_xlsx_file(path, df_data, sheet_name):
try:
if os.path.exists(path):
@@ -110,3 +111,23 @@ def update_xlsx_file_with_insert(path, df_data, sheet_name, index = 0):
except BaseException:
print("path", path)
raise BaseException('更新excel失败')
def read_error_code_from_json():
quarter_index = get_last_quarter_str()
filename = 'error_funds_' + quarter_index + '.json'
file_dir = './output/json/'
error_funds_with_page = []
error_funds_with_unmatch = []
error_funds_with_found_date = []
if os.path.exists(file_dir + filename):
with open(file_dir + filename) as json_file:
my_data = json.load(json_file)
error_funds_with_page = my_data.get('error_funds_with_page')
error_funds_with_found_date = my_data.get('error_funds_with_found_date')
return {
"file_dir": file_dir,
"filename": filename,
'error_funds_with_unmatch': error_funds_with_unmatch,
'error_funds_with_page': error_funds_with_page,
'error_funds_with_found_date': error_funds_with_found_date
}
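Together, the two helpers give each quarter's crawl a persistent error record: read the previous run's codes at startup, skip them, append any new failures, and write the merged lists back at the end. A rough usage sketch (the appended fund code is only an example):

from utils.file_op import read_error_code_from_json, write_fund_json_data

err_info = read_error_code_from_json()
error_funds_with_page = err_info.get('error_funds_with_page')
error_funds_with_page.append('001811')  # e.g. a detail page that failed to load
write_fund_json_data(
    {
        'error_funds_with_page': error_funds_with_page,
        'error_funds_with_found_date': err_info.get('error_funds_with_found_date'),
        'error_funds_with_unmatch': err_info.get('error_funds_with_unmatch'),
    },
    filename=err_info.get('filename'),
    file_dir=err_info.get('file_dir'),
)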
