refactor: decouple code

main
jackluson 2 years ago
parent 5fc2a7ae4d
commit 2f08f9822e

@ -34,7 +34,7 @@ def main():
10.高分基金\n \
输入")
if input_value == '1':
page_index = 0
page_index = 1
get_fund_list(page_index) # 执行申万行业信息入库
elif input_value == '2':
acquire_fund_base() # 执行行业股票信息入库

@ -8,12 +8,15 @@ Author: luxuemin2108@gmail.com
Copyright (c) 2020 Camel Lu
'''
from threading import Lock
from utils.login import login_morning_star
from utils.index import bootstrap_thread
from fund_info.crawler import FundSpider
from lib.mysnowflake import IdWorker
from sql_model.fund_query import FundQuery
from sql_model.fund_insert import FundInsert
from sql_model.fund_query import FundQuery
from utils.driver import create_chrome_driver
from utils.index import bootstrap_thread
from utils.login import login_morning_star
def acquire_fund_base():
lock = Lock()
@ -30,7 +33,9 @@ def acquire_fund_base():
def crawlData(start, end):
login_url = 'https://www.morningstar.cn/membership/signin.aspx'
chrome_driver = login_morning_star(login_url, False)
chrome_driver = create_chrome_driver()
login_morning_star(chrome_driver, login_url, False)
page_start = start
page_limit = 10
# 遍历从基金列表的单支基金

@ -9,18 +9,20 @@ Author: luxuemin2108@gmail.com
Copyright (c) 2020 Camel Lu
'''
from pprint import pprint
from threading import Lock, current_thread
from time import sleep, time
from pprint import pprint
from fund_info.crawler import FundSpider
from fund_info.api import FundApier
from fund_info.crawler import FundSpider
from fund_info.csv import FundCSV
from lib.mysnowflake import IdWorker
from utils.login import login_morning_star
from utils.index import bootstrap_thread
from sql_model.fund_query import FundQuery
from sql_model.fund_insert import FundInsert
from models.manager import Manager, ManagerAssoc
from sql_model.fund_insert import FundInsert
from sql_model.fund_query import FundQuery
from utils.driver import create_chrome_driver
from utils.index import bootstrap_thread
from utils.login import login_morning_star
# 利用api获取同类基金的资产
@ -48,7 +50,8 @@ def acquire_fund_quarter():
def crawlData(start, end):
login_url = 'https://www.morningstar.cn/membership/signin.aspx'
chrome_driver = login_morning_star(login_url, False)
chrome_driver = create_chrome_driver()
login_morning_star(chrome_driver, login_url, False)
page_start = start
page_limit = 10
try:

@ -19,46 +19,25 @@ from time import sleep
import pandas as pd
from bs4 import BeautifulSoup
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from db.connect import connect
from lib.mysnowflake import IdWorker
from utils.index import get_star_count, bootstrap_thread
from utils.index import get_star_count
from utils.login import login_morning_star
from utils.driver import create_chrome_driver, text_to_be_present_in_element
connect_instance = connect()
cursor = connect_instance.cursor()
def text_to_be_present_in_element(locator, text, next_page_locator):
    """Expected-condition factory for the Morningstar pager.

    Builds a predicate (for WebDriverWait) that checks whether the element
    at *locator* currently shows the page number *text*; when it does not,
    it fires the site's __doPostBack script to jump to that page, waits,
    and reports the (stale) comparison so the wait polls again.
    (locator, text -- 判读是否当前页一致没有的话切换上一页下一页操作)
    """
    def _check(driver):
        try:
            shown = driver.find_element_by_xpath(locator).text
            if int(shown) != int(text):
                # Not on the requested page yet: ask the page to navigate.
                js_content = "javascript:__doPostBack('ctl00$cphMain$AspNetPager1','{}')".format(
                    text)
                execute_return = driver.execute_script(js_content)
                print('execute_return', execute_return)
                sleep(5)
            return text == shown
        except:
            return False
    return _check
def get_fund_list(page_index):
morning_fund_selector_url = "https://www.morningstar.cn/fundselect/default.aspx"
chrome_driver = login_morning_star(morning_fund_selector_url, False)
chrome_driver = create_chrome_driver()
login_morning_star(chrome_driver, morning_fund_selector_url, False)
page_count = 25 # 晨星固定分页数
page_total = math.ceil(int(chrome_driver.find_element_by_xpath(
page_total = math.ceil(int(chrome_driver.find_element(By.XPATH,
'/html/body/form/div[8]/div/div[4]/div[3]/div[2]/span').text) / page_count)
result_dir = './output/'
output_head = '代码' + ',' + '晨星专属号' + ',' + '名称' + ',' + \
@ -121,8 +100,8 @@ def get_fund_list(page_index):
# 基金分类
fund_cat.append(tds_text[2].string)
index = str(tr_index * 2 + 2 + i)
rating_3_img_ele_xpath = chrome_driver.find_element_by_xpath('//*[@id="ctl00_cphMain_gridResult"]/tbody/tr[' + index + ']/td[5]/img')
rating_5_img_ele_xpath = chrome_driver.find_element_by_xpath('//*[@id="ctl00_cphMain_gridResult"]/tbody/tr[' + index + ']/td[6]/img')
rating_3_img_ele_xpath = chrome_driver.find_element(By.XPATH, '//*[@id="ctl00_cphMain_gridResult"]/tbody/tr[' + index + ']/td[5]/img')
rating_5_img_ele_xpath = chrome_driver.find_element(By.XPATH, '//*[@id="ctl00_cphMain_gridResult"]/tbody/tr[' + index + ']/td[6]/img')
# 三年评级 //*[@id="ctl00_cphMain_gridResult"]/tbody/tr[2]/td[7]/img
# rating = None
rating_3_img_ele = tds_text[3].find_all('img')[0]
@ -156,7 +135,7 @@ def get_fund_list(page_index):
csv_file.write(output_line)
# 获取下一页元素
next_page = chrome_driver.find_element_by_xpath(
next_page = chrome_driver.find_element(By.XPATH,
xpath_str)
# 点击下一页
next_page.click()
@ -168,5 +147,5 @@ def get_fund_list(page_index):
if __name__ == "__main__":
page_index = 1
page_index = 127
fund_list = get_fund_list(page_index)

@ -9,9 +9,11 @@ Copyright (c) 2022 Camel Lu
'''
import sys
sys.path.append('./src')
from sqlalchemy.orm import Session
from sqlalchemy import and_
from sqlalchemy.orm import Session
from models.fund import FundBase, FundQuarter
from models.manager import ManagerAssoc
from models.var import engine
@ -35,6 +37,17 @@ def query_high_score_funds(quarter_index):
).filter(rule).all()
return res
def query_all_fund():
    """Load every FundBase row and index its key columns by fund code.

    Returns a dict mapping fund_code -> {fund_code, morning_star_code,
    fund_name, fund_cat}.
    """
    rows = session.query(FundBase).all()
    return {
        row.fund_code: {
            'fund_code': row.fund_code,
            'morning_star_code': row.morning_star_code,
            'fund_name': row.fund_name,
            'fund_cat': row.fund_cat,
        }
        for row in rows
    }
if __name__ == '__main__':
quarter_index = '2022-Q2'

@ -10,8 +10,9 @@ Copyright (c) 2020 Camel Lu
import re
from datetime import datetime, timedelta, date
from time import sleep, time
from time import sleep
from utils.index import get_star_count, get_quarter_index, get_last_quarter_str
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
@ -73,8 +74,8 @@ class FundSpider:
def get_element_text_by_class_name(self, class_name, parent_id):
try:
text = self._chrome_driver.find_element_by_id(
parent_id).find_element_by_class_name(class_name).text
text = self._chrome_driver.find_element(By.ID,
parent_id).find_element(By.CLASS_NAME, class_name).text
return text if text != '-' else None
except NoSuchElementException:
self._is_trigger_catch = True
@ -90,7 +91,7 @@ class FundSpider:
def get_element_text_by_id(self, id):
try:
text = self._chrome_driver.find_element_by_id(
text = self._chrome_driver.find_element(By.ID,
id).text
return text if text != '-' else None
except NoSuchElementException:
@ -108,10 +109,10 @@ class FundSpider:
try:
text = '-'
if parent_el == None:
text = self._chrome_driver.find_element_by_xpath(xpath).text if parent_id == None else self._chrome_driver.find_element_by_id(
parent_id).find_element_by_xpath(xpath).text
text = self._chrome_driver.find_element(By.XPATH, xpath).text if parent_id == None else self._chrome_driver.find_element(By.ID,
parent_id).find_element(By.XPATH, xpath).text
else:
text = parent_el.find_element_by_xpath(xpath).text
text = parent_el.find_element(By.XPATH, xpath).text
return text if text != '-' else None
except NoSuchElementException:
self._is_trigger_catch = True
@ -138,12 +139,12 @@ class FundSpider:
# 获取基金经理信息(多位在任基金经理,只需第一位)
def get_fund_manager_info(self):
manager_ele_list = self._chrome_driver.find_element_by_id(
manager_ele_list = self._chrome_driver.find_element(By.ID,
'qt_manager').find_elements_by_xpath("ul")
for manager_ele in manager_ele_list:
try:
# 基金经理
manager_name = manager_ele.find_element_by_xpath(
manager_name = manager_ele.find_element(By.XPATH,
"li[@class='col1']/a").text
# 仅仅记录目前在职的
if '[离任]' in manager_name:
@ -151,14 +152,14 @@ class FundSpider:
manager = dict()
manager['name'] = manager_name
manager_id = re.findall(
r'(?<=managerid=)(\w+)$', manager_ele.find_element_by_xpath("li[@class='col1']/a").get_attribute('href')).pop(0)
r'(?<=managerid=)(\w+)$', manager_ele.find_element(By.XPATH, "li[@class='col1']/a").get_attribute('href')).pop(0)
if not manager_id:
continue
manager['manager_id'] = manager_id
manager['manager_start_date'] = manager_ele.find_element_by_xpath(
manager['manager_start_date'] = manager_ele.find_element(By.XPATH,
"li[@class='col1']/i").text[0:10]
manager['brife'] = manager_ele.find_element_by_xpath(
manager['brife'] = manager_ele.find_element(By.XPATH,
"li[@class='col2']").text
self.manager_list.append(manager)
@ -172,14 +173,14 @@ class FundSpider:
def get_fund_morning_rating(self):
try:
qt_el = self._chrome_driver.find_element_by_id('qt_star')
rating_3_img_ele = qt_el.find_element_by_xpath(
qt_el = self._chrome_driver.find_element(By.ID, 'qt_star')
rating_3_img_ele = qt_el.find_element(By.XPATH,
"//li[@class='star3']/img")
rating_3_src = rating_3_img_ele.get_attribute('src')
rating_5_img_ele = qt_el.find_element_by_xpath(
rating_5_img_ele = qt_el.find_element(By.XPATH,
"//li[@class='star5']/img")
rating_5_src = rating_5_img_ele.get_attribute('src')
rating_10_img_ele = qt_el.find_element_by_xpath(
rating_10_img_ele = qt_el.find_element(By.XPATH,
"//li[@class='star10']/img")
rating_10_src = rating_10_img_ele.get_attribute('src')
@ -219,14 +220,14 @@ class FundSpider:
def get_fund_qt_rating(self):
try:
qt_el = self._chrome_driver.find_element_by_id('qt_rating')
rating_2_src = qt_el.find_element_by_xpath(
qt_el = self._chrome_driver.find_element(By.ID, 'qt_rating')
rating_2_src = qt_el.find_element(By.XPATH,
"//li[5]/img").get_attribute('src')
rating_3_src = qt_el.find_element_by_xpath(
rating_3_src = qt_el.find_element(By.XPATH,
"li[6]/img").get_attribute('src')
rating_5_src = qt_el.find_element_by_xpath(
rating_5_src = qt_el.find_element(By.XPATH,
"li[7]/img").get_attribute('src')
rating_10_src = qt_el.find_element_by_xpath(
rating_10_src = qt_el.find_element(By.XPATH,
"li[8]/img").get_attribute('src')
# //*[@id="qt_rating"]/li[6]/img
rating_2 = re.findall(
@ -291,8 +292,8 @@ class FundSpider:
self.bond_position["five"] = five_bond_position
# 获取标准差
# standard_deviation = self._chrome_driver.find_element_by_id(
# "qt_risk").find_element_by_xpath('li[16]').text
# standard_deviation = self._chrome_driver.find_element(By.ID,
# "qt_risk").find_element(By.XPATH, 'li[16]').text
standard_deviation = self.get_element_text_by_xpath(
'li[16]', 'qt_risk')
if standard_deviation != None:
@ -321,8 +322,8 @@ class FundSpider:
def get_asset_composition_info(self):
# 判断是否含有股票持仓
li_elements = self._chrome_driver.find_element_by_id(
'qt_stock').find_elements_by_xpath("li")
li_elements = self._chrome_driver.find_element(By.ID,
'qt_stock').find_elements(By.XPATH ,"li")
for index in range(4, len(li_elements) - 1, 4):
temp_stock_info = dict() # 一只股票信息
stock_base = re.split('\.|\s', li_elements[index].text)

@ -9,6 +9,7 @@ Copyright (c) 2021 Camel Lu
'''
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
chrome_options = webdriver.ChromeOptions()
@ -23,7 +24,7 @@ def get_tiantian_fund_list(chrome_driver):
# # print("res", res.text)
chrome_driver.get(fund_list_url)
fund_list_code_str = chrome_driver.find_element_by_tag_name("pre").text
fund_list_code_str = chrome_driver.find_element(By.TAG_NAME, "pre").text
return_value_code_str = ";return {\
fund_list: r \
};"
@ -44,7 +45,7 @@ content_text = chrome_driver.page_source
fund_item_code_str = chrome_driver.find_element_by_tag_name("pre").text
fund_item_code_str = chrome_driver.find_element(By.TAG_NAME, "pre").text
execute_return_item = chrome_driver.execute_script(fund_item_code_str + return_value_code_str)
print("execute_return", execute_return_item)

@ -9,15 +9,20 @@ Author: luxuemin2108@gmail.com
-----
Copyright (c) 2020 Camel Lu
'''
import re
import decimal
import re
from functools import cmp_to_key
from pprint import pprint
import pandas as pd
import numpy as np
import pandas as pd
from fund_info.statistic import FundStatistic
from utils.index import get_last_quarter_str, get_stock_market, find_from_list_of_dict, update_xlsx_file, update_xlsx_file_with_sorted, update_xlsx_file_with_insert
from utils.file_op import read_dir_all_file
from utils.file_op import (read_dir_all_file, update_xlsx_file,
update_xlsx_file_with_insert,
update_xlsx_file_with_sorted)
from utils.index import (find_from_list_of_dict, get_last_quarter_str,
get_stock_market)
def get_fund_code_pool(condition_dict):

@ -7,11 +7,15 @@ Author: luxuemin2108@gmail.com
-----
Copyright (c) 2021 Camel Lu
'''
from datetime import timedelta, date
from utils.index import get_last_quarter_str, update_xlsx_file_with_insert
from crud.query import query_high_score_funds
from datetime import date, timedelta
import pandas as pd
from crud.query import query_high_score_funds
from utils.file_op import update_xlsx_file_with_insert
from utils.index import get_last_quarter_str
def output_high_score_funds(quarter_index=None):
"""
输出高分基金

@ -8,11 +8,13 @@ Author: luxuemin2108@gmail.com
Copyright (c) 2022 Camel Lu
'''
import sys
sys.path.append('./src')
from sqlalchemy import Table
from sqlalchemy.orm import relationship
from sqlalchemy import Table
from models.var import prefix, ORM_Base, engine
from lib.mysnowflake import IdWorker
from models.var import Model, ORM_Base, engine, prefix
fund_base_tablename = prefix + 'base'
fund_quarter_tablename = prefix + 'quarter'
@ -20,9 +22,23 @@ fund_quarter_tablename = prefix + 'quarter'
fund_base_table = Table(fund_base_tablename, ORM_Base.metadata, autoload=True, autoload_with=engine)
fund_quarter_table = Table(fund_quarter_tablename, ORM_Base.metadata, autoload=True, autoload_with=engine)
class FundBase(ORM_Base):
idWorker = IdWorker()
class FundBase(ORM_Base, Model):
__table__ = fund_base_table
def __init__(self, **kwargs):
self.id = idWorker.get_id()
column_keys = self.__table__.columns.keys()
udpate_data = dict()
for key in kwargs.keys():
if key not in column_keys:
continue
else:
udpate_data[key] = kwargs[key]
ORM_Base.__init__(self, **udpate_data)
Model.__init__(self, **kwargs, id = self.id)
def __repr__(self):
return f"Fund Base(id={self.id!r}, name={self.fund_code!r}, manager_id={self.fund_name!r})"

@ -8,13 +8,15 @@ Author: luxuemin2108@gmail.com
Copyright (c) 2022 Camel Lu
'''
import sys
sys.path.append('./src')
from sqlalchemy import (BigInteger, Column, Date, DateTime, ForeignKey,
Integer, String, Table, UniqueConstraint, func, text)
from sqlalchemy.orm import registry, relationship
from sqlalchemy import UniqueConstraint, Table, Column, Integer, BigInteger, String, ForeignKey, text, DateTime, Date, func
from db.engine import get_engine
from models.var import prefix, ORM_Base, engine, Model
from lib.mysnowflake import IdWorker
from models.var import Model, ORM_Base, engine, prefix
manager_table_name = prefix + 'manager'
manager_table = Table(manager_table_name, ORM_Base.metadata, autoload=True, autoload_with=engine)

@ -17,6 +17,7 @@ from models.var import ORM_Base, engine, Model
class Quarter(ORM_Base, Model):
__tablename__ = 'quarter'
__table_args__ = {'extend_existing': True}
id = Column(Integer, primary_key=True)
quarter_index = Column(String(12), nullable=False, unique=True)
start_time = Column(Date(), nullable=False, unique=True)
@ -26,7 +27,16 @@ class Quarter(ORM_Base, Model):
UniqueConstraint(quarter_index, name='uix_1')
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
column_keys = self.__table__.columns.keys()
udpate_data = dict()
for key in kwargs.keys():
if key not in column_keys:
continue
else:
udpate_data[key] = kwargs[key]
ORM_Base.__init__(self, **udpate_data)
Model.__init__(self, **kwargs, id = self.id)
@validates('end_time')
def validate_start_time(self, key, end_time):
@ -39,7 +49,7 @@ class Quarter(ORM_Base, Model):
return end_time
def __repr__(self):
return f"Quarter(id={self.id!r}, name={self.quarter_index!r})"
return f"Quarter(name={self.quarter_index!r})"
def create():
ORM_Base.metadata.create_all(engine)

@ -0,0 +1,47 @@
'''
Desc:
File: /driver.py
File Created: Tuesday, 1st November 2022 10:38:28 pm
Author: luxuemin2108@gmail.com
-----
Copyright (c) 2022 Camel Lu
'''
from time import sleep
from selenium.webdriver.common.by import By
from selenium import webdriver
def create_chrome_driver():
    """Build and return a Chrome WebDriver configured for crawling.

    Runs with --no-sandbox and a very generous page-load timeout; the
    headless and anti-automation-detection flags are kept around, disabled.
    """
    options = webdriver.ChromeOptions()
    options.add_argument("--no-sandbox")
    # options.add_argument('--headless')
    # options.add_argument('--disable-blink-features=AutomationControlled')
    driver = webdriver.Chrome(options=options)
    driver.set_page_load_timeout(12000)
    return driver
def text_to_be_present_in_element(locator, text, next_page_locator):
    """Expected-condition factory for the Morningstar pager.

    Returns a predicate for WebDriverWait: it checks that the element at
    *locator* shows the page number *text*; when it does not, it fires the
    site's __doPostBack script to jump to that page, waits, and returns a
    falsy result so the wait retries.
    (locator, text -- 判读是否当前页一致没有的话切换上一页下一页操作)

    NOTE(review): *next_page_locator* is accepted for interface
    compatibility but is currently unused — confirm before removing.
    """
    def _predicate(driver):
        try:
            element_text = driver.find_element(By.XPATH, locator).text
            if int(element_text) != int(text):
                # 跳转指定的js执行代码 (jump to the requested page via postback)
                js_content = "javascript:__doPostBack('ctl00$cphMain$AspNetPager1','{}')".format(
                    text)
                execute_return = driver.execute_script(js_content)
                print('execute_return', execute_return)
                sleep(5)
            return text == element_text
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # are not swallowed while the wait is polling.
            return False
    return _predicate

@ -7,8 +7,11 @@ Author: luxuemin2108@gmail.com
-----
Copyright (c) 2021 Camel Lu
'''
import time
import os
import time
import pandas as pd
from openpyxl import load_workbook
# 写json文件
@ -26,3 +29,84 @@ def write_fund_json_data(data, filename, file_dir=None):
def read_dir_all_file(path):
return os.listdir(path)
def update_xlsx_file(path, df_data, sheet_name):
    """Write *df_data* to sheet *sheet_name* of the workbook at *path*.

    If the file exists, an existing sheet with the same name is dropped and
    rewritten; if dropping it would leave the workbook empty, the whole file
    is rewritten from scratch. A missing file is simply created.

    Raises BaseException('更新excel失败') on any failure (original contract),
    now chained to the underlying error for debuggability.
    """
    try:
        if os.path.exists(path):
            book = load_workbook(path)
            # 表名重复,删掉,重写 (duplicate sheet: drop it and rewrite)
            if sheet_name in book.sheetnames:
                del book[sheet_name]
            if len(book.sheetnames) == 0:
                # Workbook would be empty: rewrite the whole file directly.
                df_data.to_excel(
                    path, sheet_name=sheet_name)
                return
            # Create the writer only once it is actually needed; the original
            # opened it up front and leaked it on the early-return path above.
            writer = pd.ExcelWriter(path, engine='openpyxl')
            writer.book = book
            df_data.to_excel(
                writer, sheet_name=sheet_name)
            writer.save()
            writer.close()
        else:
            df_data.to_excel(
                path, sheet_name=sheet_name)
    except BaseException as err:
        print("path", path)
        raise BaseException('更新excel失败') from err
def update_xlsx_file_with_sorted(path, df_data, sheet_name, sorted_sheetnames = []):
    """Write *df_data* into *sheet_name* of the workbook at *path*, keeping
    the sheets listed in *sorted_sheetnames* positioned after the new sheet.

    The named sheets are detached from the workbook, the new sheet is
    written, then the detached sheets are re-appended so they end up last.
    Falls back to a plain to_excel() when the file does not exist yet.
    Raises BaseException('更新excel失败') on any failure.
    """
    # NOTE(review): mutable default argument ([]) is shared across calls;
    # harmless here because it is only iterated — confirm before relying on it.
    try:
        if os.path.exists(path):
            writer = pd.ExcelWriter(path, engine='openpyxl')
            workbook = load_workbook(path)
            writer.book = workbook
            # Keep handles to the existing worksheets, keyed by title, so the
            # detached ones can be re-attached after the new sheet is written.
            writer.sheets = {ws.title:ws for ws in workbook.worksheets}
            for sheet_item in sorted_sheetnames:
                del workbook[sheet_item]
            df_data.to_excel(
                writer, sheet_name=sheet_name)
            workbook = writer.book
            for worksheet in sorted_sheetnames:
                # _add_sheet is a private openpyxl API — presumably stable for
                # the pinned version used here; verify when upgrading openpyxl.
                workbook._add_sheet(writer.sheets.get(worksheet))
            writer.book = workbook
            writer.save()
            writer.close()
        else:
            df_data.to_excel(
                path, sheet_name=sheet_name)
    except BaseException:
        print("path", path)
        raise BaseException('更新excel失败')
def update_xlsx_file_with_insert(path, df_data, sheet_name, index = 0):
    """Write *df_data* into *sheet_name* of the workbook at *path*, inserting
    the sheet at position *index* (default: first).

    The sheet is first written normally, then detached and re-inserted at
    the requested position. Falls back to a plain to_excel() when the file
    does not exist yet. Raises BaseException('更新excel失败') on any failure.
    """
    try:
        if os.path.exists(path):
            writer = pd.ExcelWriter(path, engine='openpyxl')
            workbook = load_workbook(path)
            # Drop a pre-existing sheet of the same name so it is rewritten.
            if sheet_name in workbook.sheetnames:
                del workbook[sheet_name]
            writer.book = workbook
            df_data.to_excel(
                writer, sheet_name=sheet_name)
            workbook = writer.book
            writer.sheets = {ws.title:ws for ws in workbook.worksheets}
            # workbook.remove(sheet_name)
            # Detach the freshly written sheet and re-insert it at *index*;
            # _add_sheet is a private openpyxl API — verify on upgrades.
            del workbook[sheet_name]
            workbook._add_sheet(writer.sheets.get(sheet_name), index)
            writer.book = workbook
            writer.save()
            writer.close()
        else:
            df_data.to_excel(
                path, sheet_name=sheet_name)
    except BaseException:
        print("path", path)
        raise BaseException('更新excel失败')

@ -1,17 +1,15 @@
import time
import datetime
import os
import re
import time
from threading import Lock, Thread
import numpy as np
import requests
from PIL import Image
from sewar.full_ref import sam, uqi
from skimage import io
from sewar.full_ref import uqi, sam
import re
from threading import Thread, Lock
import pandas as pd
from openpyxl import load_workbook
requests.adapters.DEFAULT_RETRIES = 10 # 增加重连次数
s = requests.session()
@ -39,6 +37,7 @@ def use_sewar_get_star_level(img_path):
return level
print('res_uqi:', res_uqi, 'res_sam:', res_sam)
raise "img_path 图片比较失败"
def lock_process(func):
lock = Lock()
@ -49,14 +48,12 @@ def lock_process(func):
return result
return wrapper
def debug(func):
    """Decorator: print a debug line on entry to *func*, then delegate.

    The wrapper now forwards positional AND keyword arguments unchanged;
    the original signature `(self, *args)` dropped **kwargs, so keyword
    calls to decorated methods raised TypeError.
    """
    def wrapper(self, *args, **kwargs):  # pass all arguments through unchanged
        print("[DEBUG]: enter {}()".format(func.__name__))
        return func(self, *args, **kwargs)
    return wrapper  # return the wrapped function
def get_star_count_with_sewar(fund_code, img_ele):
picture_time = time.strftime(
"%Y-%m-%d-%H_%M_%S", time.localtime(time.time()))
@ -77,7 +74,6 @@ def get_star_count_with_sewar(fund_code, img_ele):
else:
raise "截图失败"
def get_star_count_with_np(morning_star_url):
module_path = os.getcwd() + '/src'
temp_star_url = module_path + '/assets/star/tmp.gif'
@ -107,8 +103,6 @@ def get_star_count(morning_star_url, fund_code, img_ele=None):
print('图片相似度比较失败')
return get_star_count_with_np(morning_star_url)
def parse_csv(datafile):
data = []
with open(datafile, "r") as f:
@ -126,7 +120,6 @@ def parse_csv(datafile):
return data
def get_quarter_index(input_date):
year = time.strftime("%Y", time.localtime())
boundary_date_list = ['03-31', '06-30', '09-30', '12-31']
@ -198,85 +191,6 @@ def get_stock_market(stock_code):
return '其他'
def update_xlsx_file(path, df_data, sheet_name):
try:
if os.path.exists(path):
writer = pd.ExcelWriter(path, engine='openpyxl')
book = load_workbook(path)
# 表名重复,删掉,重写
if sheet_name in book.sheetnames:
del book[sheet_name]
if len(book.sheetnames) == 0:
df_data.to_excel(
path, sheet_name=sheet_name)
return
else:
writer.book = book
df_data.to_excel(
writer, sheet_name=sheet_name)
writer.save()
writer.close()
else:
df_data.to_excel(
path, sheet_name=sheet_name)
except BaseException:
print("path", path)
raise BaseException('更新excel失败')
def update_xlsx_file_with_sorted(path, df_data, sheet_name, sorted_sheetnames = []):
try:
if os.path.exists(path):
writer = pd.ExcelWriter(path, engine='openpyxl')
workbook = load_workbook(path)
writer.book = workbook
writer.sheets = {ws.title:ws for ws in workbook.worksheets}
for sheet_item in sorted_sheetnames:
del workbook[sheet_item]
df_data.to_excel(
writer, sheet_name=sheet_name)
workbook = writer.book
for worksheet in sorted_sheetnames:
workbook._add_sheet(writer.sheets.get(worksheet))
writer.book = workbook
writer.save()
writer.close()
else:
df_data.to_excel(
path, sheet_name=sheet_name)
except BaseException:
print("path", path)
raise BaseException('更新excel失败')
def update_xlsx_file_with_insert(path, df_data, sheet_name, index = 0):
try:
if os.path.exists(path):
writer = pd.ExcelWriter(path, engine='openpyxl')
workbook = load_workbook(path)
if sheet_name in workbook.sheetnames:
del workbook[sheet_name]
writer.book = workbook
df_data.to_excel(
writer, sheet_name=sheet_name)
workbook = writer.book
writer.sheets = {ws.title:ws for ws in workbook.worksheets}
# workbook.remove(sheet_name)
del workbook[sheet_name]
workbook._add_sheet(writer.sheets.get(sheet_name), index)
writer.book = workbook
writer.save()
writer.close()
else:
df_data.to_excel(
path, sheet_name=sheet_name)
except BaseException:
print("path", path)
raise BaseException('更新excel失败')
def bootstrap_thread(target_fn, total, thread_count=2):
threaders = []
start_time = time.time()

@ -10,7 +10,7 @@ Copyright (c) 2021 Camel Lu
import time
import os
from dotenv import load_dotenv
from selenium.webdriver.common.by import By
from .cookies import set_cookies
load_dotenv()
@ -60,16 +60,17 @@ def mock_login_site(chrome_driver, site_url, redirect_url=None):
'?ReturnUrl=' + redirect_url
chrome_driver.get(site_url)
time.sleep(2)
from selenium.webdriver.support import expected_conditions as EC
username = chrome_driver.find_element_by_id('emailTxt')
password = chrome_driver.find_element_by_id('pwdValue')
username = chrome_driver.find_element(By.ID, 'emailTxt')
password = chrome_driver.find_element(By.ID, 'pwdValue')
# username = chrome_driver.find_element_by_id('emailTxt')
# password = chrome_driver.find_element_by_id('pwdValue')
env_username = os.getenv('morning_star_username')
env_password = os.getenv('morning_star_password')
username.send_keys(env_username)
password.send_keys(env_password)
submit = chrome_driver.find_element_by_id('loginGo')
submit = chrome_driver.find_element(By.ID, 'loginGo')
# submit = chrome_driver.find_element_by_id('loginGo')
submit.click()
# check_code = chrome_driver.find_element_by_id('txtCheckCode')
# count = 1
# flag = True
# while count < 10 and flag:
@ -78,7 +79,6 @@ def mock_login_site(chrome_driver, site_url, redirect_url=None):
# time.sleep(1)
# check_code.send_keys(code)
# time.sleep(3)
# submit = chrome_driver.find_element_by_id('loginGo')
# submit.click()
# # 通过弹窗判断验证码是否正确
# time.sleep(3)
@ -104,13 +104,7 @@ def mock_login_site(chrome_driver, site_url, redirect_url=None):
return True
def login_morning_star(redirect_url, is_cookies_login=False):
from selenium import webdriver
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument('--headless')
chrome_driver = webdriver.Chrome(options=chrome_options)
chrome_driver.set_page_load_timeout(12000)
def login_morning_star(chrome_driver, redirect_url, is_cookies_login=False):
"""
模拟登录,支持两种方式
1. 设置已经登录的cookie
@ -126,7 +120,8 @@ def login_morning_star(redirect_url, is_cookies_login=False):
chrome_driver, login_url, redirect_url)
if login_status:
print('login success')
return True
else:
print('login fail')
exit()
return chrome_driver
return False
# exit()

Loading…
Cancel
Save