feat:🚀fund data crawl

main
jackluson 4 years ago
commit 905b4362f4

.gitignore (+13)

@@ -0,0 +1,13 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/

Six binary image files added (4.5–4.8 KiB each; content not shown).

@@ -0,0 +1,36 @@
# Morningstar Fund List Data Crawler
### Preface
晨星网 (Morningstar China) is the official Chinese site of Morningstar, the internationally recognized rating agency, so its fund ratings are a solid reference. This repository crawls the Morningstar fund screener list, including the fund code, the Morningstar-specific code, the fund category, the three-year rating, the five-year rating, and so on. With this base data in place, it lays the groundwork for crawling fund detail pages, screening funds, and more.
Screenshot of the crawled fund data:
<img src="./screenshot/fund_result.png" style="zoom:50%;" />
### Key techniques
- `selenium` to simulate login and switch pages
- `BeautifulSoup` to parse the HTML
- `pandas` to process the data
- Tooling: `pymysql` for the database, snowflake IDs for the `id` column, and `pytesseract` for captcha recognition
### Crawling workflow
1. Simulated login with `selenium`, in either of two ways:
   - captcha recognition, or
   - setting the cookies of an already logged-in account
2. Parse the HTML with `BeautifulSoup`, extract the fund list of the current page, and write it to MySQL or append it to a CSV file
3. Switch to the next page with `selenium` and repeat steps 2 and 3
4. Once every page has been crawled, quit the browser (a minimal sketch of this loop follows below)
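A minimal sketch of the loop described above, under simplified assumptions (the login step, the next-page locator, and the fixed page count are placeholders, not the exact code in `acquire_fund_list.py`):

```python
# Illustrative only: the login step and the next-page locator are assumptions.
from time import sleep
from bs4 import BeautifulSoup
from selenium import webdriver

driver = webdriver.Chrome()
driver.get("https://www.morningstar.cn/fundselect/default.aspx")
# ... log in here, via captcha recognition or by injecting saved cookies ...

rows = []
page_num, page_total = 1, 306          # page_total is read from the page in the real code
while page_num <= page_total:
    soup = BeautifulSoup(driver.page_source, "lxml")
    for tr in soup.find_all("tr", class_=["gridItem", "gridAlternateItem"]):
        cells = tr.find_all("td", class_="msDataText")
        rows.append([cell.get_text(strip=True) for cell in cells])
    driver.find_element_by_xpath("//a[text()='>']").click()   # hypothetical next-page locator
    sleep(3)
    page_num += 1
driver.quit()
```

The real implementation additionally waits until the highlighted page number equals `page_num` before parsing, so a page is never scraped twice or skipped.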
### Other notes
Some of the details involved:
1. How to handle a mis-recognized captcha
2. How to handle the "next page" click when the current page is already the last one
3. Morningstar ratings are rendered as images; how to convert them into numbers (see the sketch below)
4. How to keep the loop's current page in sync with the browser's current page
All of the above are handled in this repository; if you run into problems, feel free to open an issue, message me directly, or star the repo.
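For item 3, the approach taken in `utils.get_star_count` (included later in this commit) is to download the rating GIF and compare it pixel-by-pixel with locally stored reference images. A rough sketch, assuming reference files `star0.gif` through `star5.gif` under `assets/star/`:

```python
# Rough sketch; the reference GIF paths are assumptions matching assets/star/ in this repo.
from io import BytesIO
import numpy as np
import requests
from PIL import Image

def rating_from_image(img_url, ref_dir="./assets/star"):
    sample = np.array(Image.open(BytesIO(requests.get(img_url).content)))
    for stars in range(6):
        ref = np.array(Image.open(f"{ref_dir}/star{stars}.gif"))
        if sample.shape == ref.shape and (sample == ref).all():
            return stars        # 0-5 stars
    return None                 # unrecognized image
```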

File diff suppressed because it is too large.

Binary image file added (406 KiB; content not shown) — presumably screenshot/fund_result.png referenced in the README.

@@ -0,0 +1,193 @@
'''
Desc: Crawl the Morningstar fund list data; supports saving to CSV or inserting into MySQL
File: /acquire_fund_list.py
Project: src
File Created: Saturday, 26th December 2020 11:48:55 am
Author: luxuemin2108@gmail.com
-----
Copyright (c) 2020 Camel Lu
'''
import re
import math
import os
# import pymysql
from time import sleep
from bs4 import BeautifulSoup
import pandas as pd
from selenium.webdriver.support.ui import WebDriverWait
from lib.mysnowflake import IdWorker
from utils import parse_cookiestr, set_cookies, login_site, get_star_count
# connect = pymysql.connect(host='127.0.0.1', user='root',
# password='rootroot', db='fund_work', charset='utf8')
# cursor = connect.cursor()
'''
Check whether the browser's current page matches the expected page number;
if not, click next/previous page to move towards it.
'''
def text_to_be_present_in_element(locator, text, next_page_locator):
""" An expectation for checking if the given text is present in the
specified element.
locator, text
"""
def _predicate(driver):
try:
element_text = driver.find_element_by_xpath(locator).text
# the current page is smaller than the target page: click next page
if int(element_text) < int(text):
print(element_text, text)
next_page = driver.find_element_by_xpath(
next_page_locator)
# driver.refresh()
next_page.click()
sleep(5)
# the current page is larger than the target page: click previous page
elif int(element_text) > int(text):
print(element_text, text)
prev_page = driver.find_element_by_xpath(
'/html/body/form/div[8]/div/div[4]/div[3]/div[3]/div[1]/a[2]')
# driver.refresh()
prev_page.click()
sleep(5)
return text == element_text
except Exception:
return False
return _predicate
def get_fund_list(cookie_str=None):
from selenium import webdriver
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument("--no-sandbox")
chrome_driver = webdriver.Chrome(options=chrome_options)
chrome_driver.set_page_load_timeout(12000)  # guard against a page that never finishes loading
morning_fund_selector_url = "https://www.morningstar.cn/fundselect/default.aspx"
# "https://cn.morningstar.com/quickrank/default.aspx"
"""
Simulated login, two modes supported:
1. set the cookies of an already logged-in session
2. log in with account, password and captcha (captcha recognition is about 30% accurate; it is retried on failure)
"""
if cookie_str:
set_cookies(chrome_driver, morning_fund_selector_url, cookie_str)
else:
morning_cookies = ""
if morning_cookies == "":
login_status = login_site(chrome_driver, morning_fund_selector_url)
if login_status:
print('login success')
sleep(3)
else:
print('login fail')
exit()
# fetch the site cookies after a successful login
morning_cookies = chrome_driver.get_cookies()
else:
chrome_driver.get(morning_fund_selector_url)  # open the target page again with the cookies applied
print(chrome_driver.get_cookies())  # print the cookies that were successfully set
# starting page number
page_num = 1
page_count = 25
page_num_total = math.ceil(int(chrome_driver.find_element_by_xpath(
'/html/body/form/div[8]/div/div[4]/div[3]/div[2]/span').text) / page_count)
# 306 pages in total at the time of crawling
result_dir = './output/'
output_head = '代码' + ',' + '晨星专属号' + ',' + '名称' + ',' + \
'类型' + ',' + '三年评级' + ',' + '五年评级' + ',' + '今年回报率' + '\n'
# write the header row
if page_num == 1:
with open(result_dir + 'fund_morning_star.csv', 'w+') as csv_file:
csv_file.write(output_head)
while page_num <= page_num_total:
# remainder: pages in the final block of pagination links
remainder = page_num_total % 10
# determine whether we are on the last block of pages (the next-page anchor index differs there)
num = (remainder +
2) if page_num > (page_num_total - remainder) else 12
xpath_str = '/html/body/form/div[8]/div/div[4]/div[3]/div[3]/div[1]/a[%s]' % (
num)
print('page_num', page_num)
# wait until the highlighted current-page indicator equals page_num
WebDriverWait(chrome_driver, timeout=600).until(text_to_be_present_in_element(
"/html/body/form/div[8]/div/div[4]/div[3]/div[3]/div[1]/span[@style='margin-right:5px;font-weight:Bold;color:red;']", str(page_num), xpath_str))
sleep(1)
# lists to hold the crawled data
id_list = []  # snowflake id
code_list = []  # fund code
morning_star_code_list = []  # Morningstar-specific code
name_list = []  # fund name
fund_cat = []  # fund category
fund_rating_3 = []  # Morningstar rating (3-year)
fund_rating_5 = []  # Morningstar rating (5-year)
rate_of_return = []  # year-to-date return (%)
# grab the source of the current page
data = chrome_driver.page_source
# parse the page source with BeautifulSoup
bs = BeautifulSoup(data, 'lxml')
class_list = ['gridItem', 'gridAlternateItem']  # the data rows use these two CSS classes
# walk every row of both classes and append each field to its corresponding list
for i in range(len(class_list)):
for tr in bs.find_all('tr', {'class': class_list[i]}):
# snowflake id
worker = IdWorker()
id_list.append(worker.get_id())
tds_text = tr.find_all('td', {'class': "msDataText"})
tds_nume = tr.find_all('td', {'class': "msDataNumeric"})
# fund code
code_a_element = tds_text[0].find_all('a')[0]
code_list.append(code_a_element.string)
# extract the Morningstar-specific code from the href
current_morning_code = re.findall(
r'(?<=/quicktake/)(\w+)$', code_a_element.get('href')).pop(0)
# Morningstar-specific fund code
morning_star_code_list.append(current_morning_code)
name_list.append(tds_text[1].find_all('a')[0].string)
# fund category
fund_cat.append(tds_text[2].string)
# 3-year rating
rating = get_star_count(tds_text[3].find_all('img')[0]['src'])
fund_rating_3.append(rating)
# 5-year rating
rating = get_star_count(tds_text[4].find_all('img')[0]['src'])
fund_rating_5.append(rating)
# year-to-date return (%)
return_value = tds_nume[3].string if tds_nume[3].string != '-' else None
rate_of_return.append(return_value)
print('Data for this page is ready')
fund_df = pd.DataFrame({'fund_code': code_list, 'morning_star_code': morning_star_code_list, 'fund_name': name_list, 'fund_cat': fund_cat,
'fund_rating_3': fund_rating_3, 'fund_rating_5': fund_rating_5, 'rate_of_return': rate_of_return})
sql_insert = "replace into fund_morning_star(`id`, `fund_code`,`morning_star_code`, `fund_name`, `fund_cat`, `fund_rating_3`, `fund_rating_5`, `rate_of_return`) values(%s, %s, %s, %s, %s, %s, %s, %s)"
fund_list = fund_df.values.tolist()
# cursor.executemany(sql_insert, fund_list)
# connect.commit()
# sql_insert = "insert into fund_morning_star(`fund_code`, `fund_name`, `fund_cat`, `fund_rate_3`, `fund_rate_5`, `rate_of_return`) values(%s, %s, %s, %s, %s, %s)"
# ALTER TABLE fund_morning_star MODIFY COLUMN update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
print('fund_list', fund_list)
with open(result_dir + 'fund_morning_star.csv', 'a') as csv_file:
for fund_item in fund_list:
output_line = ', '.join(str(x) for x in fund_item) + '\n'
csv_file.write(output_line)
# locate the next-page element
next_page = chrome_driver.find_element_by_xpath(
xpath_str)
# click the next page
next_page.click()
page_num += 1
chrome_driver.close()
print('end')
# chrome_driver.close()
if __name__ == "__main__":
cookie_str = 'Hm_lvt_eca85e284f8b74d1200a42c9faa85464=1610788772; user=username=18219112108@163.com&nickname=camel-lu&status=Free&password=KFPJOQuxD1w=; MS_LocalEmailAddr=18219112108@163.com=; ASP.NET_SessionId=0aenwime2ljio155dogxybev; Hm_lvt_eca85e284f8b74d1200a42c9faa85464=; MSCC=GUflpfSQOVM=; authWeb=5220F774042557D9FA31A08FA717CB8DE74F5016A9ADDB25C269FBB69C7DF340D59E6E63061444FE0B93DBB4F5AAFA6B1D21155C3FAA68C79992F39B9986630AEB6F674F242B6B792693ABB6162784CA329333200C2BBDD44021A1F38E80A363F157CD24D4D0E527C3E8F23E3DEA13C5D9950FF5; Hm_lpvt_eca85e284f8b74d1200a42c9faa85464=1613479786'
fund_list = get_fund_list(cookie_str)  # pass the saved cookies so the login step can be skipped

Seven binary image files added (1.1 KiB each; content not shown) — presumably the star-rating reference GIFs under assets/star/.

@@ -0,0 +1,102 @@
import time
# how the 64-bit ID is divided up
WORKER_ID_BITS = 5
DATACENTER_ID_BITS = 5
SEQUENCE_BITS = 12
# maximum values for each field
MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS)  # 2**5-1 0b11111
MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)
# bit-shift offsets
WOKER_ID_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS
# mask for the wrapping per-millisecond sequence number
SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
# custom epoch in milliseconds (a January 2021 timestamp here, not Twitter's original epoch)
TWEPOCH = 1611065536271
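# Resulting layout, from low to high bits (implied by the shift constants above):
#   bits  0-11  per-millisecond sequence
#   bits 12-16  worker_id
#   bits 17-21  datacenter_id
#   bits 22+    millisecond offset from TWEPOCH (41 bits in the standard snowflake layout)
# e.g. 1 ms after TWEPOCH with datacenter 0, worker 0, sequence 0 gives (1 << 22) = 4194304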
class InvalidSystemClock(Exception):
"""
Raised when the system clock has moved backwards
"""
pass
class IdWorker(object):
"""
Generates snowflake IDs
"""
def __init__(self, datacenter_id=0, worker_id=0, did_wid=-1, sequence=0):
"""
Initialize the worker.
:param datacenter_id: datacenter (machine region) ID
:param worker_id: machine ID
:param did_wid: datacenter and worker IDs packed into 10 bits (decimal 0-1023); when given, it is split back into datacenter_id and worker_id
:param sequence: initial sequence number
"""
if did_wid > 0:
datacenter_id = did_wid >> 5
worker_id = did_wid ^ (datacenter_id << 5)
# sanity check
if worker_id > MAX_WORKER_ID or worker_id < 0:
raise ValueError('worker_id out of range')
if datacenter_id > MAX_DATACENTER_ID or datacenter_id < 0:
raise ValueError('datacenter_id out of range')
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.sequence = sequence
self.last_timestamp = -1  # timestamp of the last generated ID
def _gen_timestamp(self):
"""
Current timestamp in milliseconds, as an integer
:return:int timestamp
"""
return int(time.time() * 1000)
def get_ids(self, count):
ids = []
for i in range(count):
ids.append(self.get_id())
return ids
def get_id(self):
"""
Generate a new ID
:return:
"""
timestamp = self._gen_timestamp()
# clock moved backwards
if timestamp < self.last_timestamp:
print('clock is moving backwards. Rejecting requests until {}'.format(
self.last_timestamp))
raise InvalidSystemClock
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & SEQUENCE_MASK
if self.sequence == 0:
timestamp = self._til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
new_id = ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | \
(self.worker_id << WOKER_ID_SHIFT) | self.sequence
return new_id
def _til_next_millis(self, last_timestamp):
"""
Busy-wait until the next millisecond
"""
timestamp = self._gen_timestamp()
while timestamp <= last_timestamp:
timestamp = self._gen_timestamp()
return timestamp
# if __name__ == '__main__':
# worker = IdWorker()
# print(worker.get_id())

@@ -0,0 +1,133 @@
from urllib import parse
import time
import os
def parse_cookiestr(cookie_str, split_str="; "):
cookielist = []
for item in cookie_str.split(split_str):
cookie = {}
itemname = item.split('=')[0]
# split on the first '=' only, so cookie values that themselves contain '=' are preserved
itemvalue = item.split('=', 1)[1]
cookie['name'] = itemname
cookie['value'] = parse.unquote(itemvalue)
cookielist.append(cookie)
return cookielist
def set_cookies(chrome_driver, url, cookie_str):
chrome_driver.get(url)
# 2. the url has to be visited once first, otherwise add_cookie raises an error (a slightly odd Selenium quirk)
cookie_list = parse_cookiestr(cookie_str)
chrome_driver.delete_all_cookies()
for i in cookie_list:
cookie = {}
# 3. per the comments in add_cookie's source, a cookie dict needs name and value fields to represent one cookie (a bit rigid)
cookie['name'] = i['name']
cookie['value'] = i['value']
# 4. any cookie of the same name left over from the earlier page load has to be deleted first, otherwise the cookie we set will not take effect
# chrome_driver.delete_cookie(i['name'])
# add our own cookie
# print('cookie', cookie)
chrome_driver.add_cookie(cookie)
chrome_driver.refresh()
def identify_verification_code(chrome_driver, id="checkcodeImg"):
# date and time strings used for the screenshot folder and file names
picture_time = time.strftime(
"%Y-%m-%d-%H_%M_%S", time.localtime(time.time()))
directory_time = time.strftime("%Y-%m-%d", time.localtime(time.time()))
# get the current working directory and make sure a folder named after directory_time exists, creating it if it does not
try:
file_Path = os.getcwd() + '/' + directory_time + '/'
if not os.path.exists(file_Path):
os.makedirs(file_Path)
print("Directory created: %s" % file_Path)
else:
print("Directory already exists")
except BaseException as msg:
print("Failed to create directory: %s" % msg)
try:
from selenium.webdriver.common.by import By
ele = chrome_driver.find_element(By.ID, id)
code_path = './' + directory_time + '/' + picture_time + '_code.png'
url = ele.screenshot(code_path)
if url:
print("%s :截图成功!!!" % url)
from PIL import Image
image = Image.open(code_path)
# image.show()
import pytesseract
custom_oem_psm_config = '--oem 0 --psm 13 digits'
identify_code = pytesseract.image_to_string(
image, config=custom_oem_psm_config)
code = "".join(identify_code.split())
return code
else:
raise Exception('screenshot failed, could not save')
except Exception as pic_msg:
print("截图失败:%s" % pic_msg)
def get_star_count(morning_star_url):
import numpy as np
import requests
from PIL import Image
module_path = os.path.dirname(__file__)
temp_star_url = module_path + '/assets/star/tmp.gif'
r = requests.get(morning_star_url)
with open(temp_star_url, "wb") as f:
f.write(r.content)
f.close()
path = module_path + '/assets/star/star'
# path = './assets/star/star'
for i in range(6):
p1 = np.array(Image.open(path+str(i)+'.gif'))
p2 = np.array(Image.open(temp_star_url))
if (p1 == p2).all():
return i
def login_site(chrome_driver, site_url):
chrome_driver.get(site_url)
time.sleep(2)
from selenium.webdriver.support import expected_conditions as EC
username = chrome_driver.find_element_by_id('emailTxt')
password = chrome_driver.find_element_by_id('pwdValue')
check_code = chrome_driver.find_element_by_id('txtCheckCode')
username.send_keys('18219112108@163.com')
password.send_keys('w780880')
count = 1
flag = True
while count < 10 and flag:
code = identify_verification_code(chrome_driver)
check_code.clear()
time.sleep(1)
check_code.send_keys(code)
time.sleep(3)
submit = chrome_driver.find_element_by_id('loginGo')
submit.click()
# use the popup message box to tell whether the captcha was accepted
time.sleep(3)
from selenium.webdriver.common.by import By
# message_container = chrome_driver.find_element_by_id('message-container')
try:
message_box = chrome_driver.find_element_by_id(
'message-container')
flag = message_box.is_displayed()
if flag:
close_btn = message_box.find_element(
By.CLASS_NAME, "modal-close")
close_btn.click()
time.sleep(1)
print('flag', flag)
count += 1  # count the failed attempt so the retry loop is bounded
except:
return True
if count >= 10:
return False
return True