Web crawler

A depth-first site crawler built on Selenium: starting from a root URL it keeps clicking links it has not visited yet, records the click path as a tree of nodes, and periodically dumps that tree as a PlantUML mindmap (.puml file).
import logging
from dataclasses import dataclass
from time import sleep
from typing import List

from selenium import webdriver
from selenium.webdriver.common.by import By

# Attach a handler via basicConfig; without one, the DEBUG/INFO records
# logged below would never reach the console.
logging.basicConfig()
log = logging.getLogger('demo')
log.setLevel(logging.DEBUG)
class Node:
    """A node in the click tree: one link (Item) plus its child links."""

    def __init__(self, data=None):
        self.data: Item = data
        self.children: List[Node] = []

    def travel(self, node=None, depth=1, root=True):
        """Depth-first traversal yielding (node, depth) pairs."""
        if root:
            node = self
        if node is None:
            return
        yield node, depth
        for child in node.children:
            yield from self.travel(child, depth + 1, False)

    def mindmap(self):
        """Render the tree as PlantUML mindmap markup."""
        content = ['@startmindmap']
        for node, depth in self.travel():
            content.append(f'{"*" * depth} {node.data}')
        content.append('@endmindmap')
        return '\n'.join(content)

    def __repr__(self):
        return str(self.data)
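
# A minimal sketch exercising Node.travel and Node.mindmap; the names and
# URLs below are placeholders. mindmap() emits one PlantUML line per node,
# with '*' repeated to the node's depth, between @startmindmap/@endmindmap.
def demo_mindmap():
    root = Node(Item(name='home', href='https://example.com'))
    root.children.append(Node(Item(name='about', href='https://example.com/about')))
    print(root.mindmap())  # '* ...' for the root, '** ...' for the child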
class Stack:
    """Stack of visited Nodes, i.e. the current click path from the root."""

    def __init__(self):
        self._data: List[Node] = []

    def push(self, data):
        self._data.append(data)

    def pop(self):
        return self._data.pop()

    def top(self):
        return self._data[-1]

    def __contains__(self, item: Node):
        for _item in self._data:
            for child in _item.children:
                if child.data.uuid() == item.data.uuid():
                    return True
        return False

    def pop_to_item(self, item: Node):
        """If item also hangs off an ancestor higher up the click path,
        pop back to that ancestor and return its matching child; else None."""
        if len(self._data) <= 2:
            return None
        for _item in self._data[0:-2]:
            for child in _item.children:
                if child.data.uuid() == item.data.uuid():
                    log.debug(f'pop to {_item}')
                    while self.top().data.uuid() != _item.data.uuid():
                        self.pop()
                    return child
        return None
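
# A minimal sketch of pop_to_item's contract (placeholder names/hrefs): when
# the target link also exists under a node sitting below the top two entries
# of the path, the stack unwinds to that ancestor and its copy is returned.
def demo_pop_to_item():
    a = Node(Item(name='a', href='/a'))
    b = Node(Item(name='b', href='/b'))
    c = Node(Item(name='c', href='/c'))
    shared = Node(Item(name='x', href='/x'))
    a.children.append(shared)
    stack = Stack()
    for node in (a, b, c):
        stack.push(node)
    found = stack.pop_to_item(Node(Item(name='x', href='/x')))
    assert found is shared and stack.top() is a  # b and c were popped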
@dataclass
class Item:
    """One link on a page; (name, href) identifies it across pages."""
    name: str = None
    href: str = None
    clicked: bool = False
    click_index: int = 0

    def uuid(self):
        return f'{self.name} {self.href}'

    # def __repr__(self):
    #     return f'{self.name} {self.href} {self.clicked}'
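
# Item.uuid() is just "name href" (placeholder values below): two links with
# the same text and target collapse into one node, which is how duplicate
# children are filtered out.
assert Item(name='n', href='/h').uuid() == Item(name='n', href='/h').uuid()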
def test_crawler():
    url = 'https://ceshiren.com'
    current_status = {'index': 0}  # running click counter
    driver = webdriver.Chrome()
    driver.maximize_window()
    driver.get(url)
    root = Node(Item(href=url, name='', clicked=True))
    stack = Stack()

    def update_node(node):
        """Collect the page's visible links into node.children (deduplicated
        by uuid) and return {uuid: live WebElement} for the current page."""
        elements = driver.find_elements(by=By.CSS_SELECTOR, value='a')
        element_map = {}
        for element in elements[0:20]:  # cap the fan-out per page
            try:
                if not element.is_displayed():
                    continue
                href = element.get_property('href')
                name = element.text
                item = Item(name=name, href=href)
                match_elements = [e for e in node.children if e.data.uuid() == item.uuid()]
                element_map[item.uuid()] = element
                if not match_elements:
                    node.children.append(Node(item))
            except Exception as e:
                log.warning(e)
        return element_map

    def crawler(node, element_map=None):
        log.info(node.data)
        stack.push(node)
        # Dump the click tree every ten clicks so progress survives a crash.
        if current_status['index'] % 10 == 0:
            with open(f'{__name__}.puml', 'w') as f:
                f.write(root.mindmap())
        if element_map is None:
            log.info(f'refresh {node}')
            element_map = update_node(node)
            log.info(f'{element_map}')
        # Pick the first child that is unclicked and still present on the page.
        target_node = None
        for child in node.children:
            if not child.data.clicked and child.data.uuid() in element_map:
                target_node = child
                log.info(f'found {target_node}')
                break
        if target_node is not None:
            # If the same link also hangs off an ancestor, unwind the stack
            # and continue from that ancestor's copy instead.
            parent_node = stack.pop_to_item(target_node)
            if parent_node:
                target_node = parent_node
                log.info(f'back to {target_node}')
            else:
                log.info(element_map)
            target_element = element_map[target_node.data.uuid()]
            log.info(f'click {target_node}')
            log.info(f'click {target_element.id}')
            try:
                target_node.data.clicked = True
                if target_element.is_displayed():
                    target_element.click()
                    sleep(1)  # let the page settle after the click
                    current_status['index'] += 1
                    target_node.data.click_index = current_status['index']
                else:
                    log.warning('not displayed skipped')
            except Exception as e:
                log.warning(e)
            crawler(target_node, None)  # recurse; depth grows with each click
        else:
            log.info('all clicked')

    crawler(root)
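
# A sketch of how to run this outside pytest, assuming chromedriver is on
# PATH. The crawl writes '<module name>.puml' every ten clicks; that file
# can then be rendered with PlantUML (e.g. `plantuml <module name>.puml`).
if __name__ == '__main__':
    test_crawler()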