Skip to content

Instantly share code, notes, and snippets.

@erichoco
Last active May 1, 2017 04:43
Show Gist options
  • Save erichoco/8501e3b5c6f803a7a2291e036d633e23 to your computer and use it in GitHub Desktop.
Save erichoco/8501e3b5c6f803a7a2291e036d633e23 to your computer and use it in GitHub Desktop.
import os
import sys
import signal
import time
import csv
import re
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
def setup_info(url):
    """Build the initial crawl-state dictionary for a live room.

    Args:
        url: URL of the broadcasting room. The room id is the run of digits
            found between ``live/`` and ``?rf`` when that pattern is present.

    Returns:
        A dict with empty ``"messages"`` and ``"gifts"`` lists, the given URL
        under ``"room_url"``, and the extracted id under ``"room_id"`` (an
        empty string when the URL does not match the pattern).
    """
    # Raw string: "\?" inside a plain string literal is an invalid escape
    # sequence and triggers a warning on recent Python versions.
    room_id = re.search(r"live/([0-9]+)\?rf", url)
    return {
        "messages": [],
        "gifts": [],
        "room_url": url,
        "room_id": room_id.group(1) if room_id else "",
    }
def init_driver():
    """Start a muted Chrome session and attach an explicit wait to it.

    Returns:
        The Chrome webdriver launched through the ``./chromedriver``
        executable, carrying a ``wait`` attribute that holds a
        ``WebDriverWait`` with a 5-second timeout.
    """
    options = webdriver.ChromeOptions()
    options.add_argument("--mute-audio")
    browser = webdriver.Chrome('./chromedriver', chrome_options=options)
    # browser.wait.until(...) waits up to 5s for an element to load before
    # raising an exception.
    browser.wait = WebDriverWait(browser, 5)
    return browser
def load_chat_box(driver, url, class_name):
    """Open the room page and wait for the live chat box to be present.

    Args:
        driver: Webdriver exposing ``get`` and a ``wait`` object with ``until``.
        url: Address of the room page to load.
        class_name: CSS class name identifying the chat box element.

    Returns:
        True when the element appears before the wait times out, otherwise
        False (after printing a hint about the class name).
    """
    driver.get(url)
    chat_box_present = EC.presence_of_element_located((By.CLASS_NAME, class_name))
    try:
        driver.wait.until(chat_box_present)
    except TimeoutException:
        print("Live chat box not found.")
        print("Check CSS class name of the chat box.")
        return False
    return True
def crawl_messages(messages):
    """Collect chat messages that appeared since the previous crawl.

    Reads the ``li.live-chat-msg`` elements from the module-level ``driver``.

    Args:
        messages: Previously collected message dicts with ``"time"``,
            ``"name"`` and ``"message"`` keys.

    Returns:
        A new list: *messages* followed by the newly found messages in page
        order. Each new entry is stamped with the current UTC time.
    """
    fresh = []
    # Walk newest-to-oldest so the scan can stop at the last stored message.
    for item in reversed(driver.find_elements_by_css_selector("li.live-chat-msg")):
        try:
            sender = item.find_element_by_class_name("name").text
            text = item.find_element_by_class_name("content").text
            if not sender or not text:
                continue
            if messages:
                latest = messages[-1]
                if latest["name"] == sender and latest["message"] == text:
                    # Everything older than this one was already recorded.
                    break
            fresh.append({
                "time": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
                "name": sender,
                "message": text})
        except (NoSuchElementException, StaleElementReferenceException):
            # Element missing or replaced while reading it: skip this item.
            continue
        except BaseException:
            print("[Messages] Exception:", sys.exc_info()[0])
            raise
    # Items were gathered newest-first; restore page order.
    fresh.reverse()
    if fresh:
        print("==== New Messages ====", fresh)
    return messages + fresh
def crawl_gifts(gifts):
    """Collect gift effects currently shown on the page and merge them in.

    Reads the ``li.liveGiftEffectItem`` elements from the module-level
    ``driver``. A gift whose sender and gift name match one of the last three
    entries of *gifts* updates that entry's ``"count"`` in place; any other
    gift is recorded as a new entry stamped with the current UTC time.

    Args:
        gifts: Previously collected gift dicts with ``"time"``, ``"name"``,
            ``"gift"`` and ``"count"`` keys. Matching entries are mutated.

    Returns:
        A new list: *gifts* followed by the newly recorded gifts.
    """
    new_items = driver.find_elements_by_css_selector("li.liveGiftEffectItem")
    new_gifts = []
    for it in new_items:
        try:
            div = it.find_element_by_css_selector("div.content")
            name = div.find_element_by_class_name("name").text
            content = div.find_element_by_tag_name("span").text
            count_text = it.find_element_by_class_name("giftCount").text
            # The first character is dropped before parsing (presumably a
            # multiplier prefix such as "x" -- confirm against the page), so
            # at least two characters are needed.
            if len(count_text) < 2:
                continue
            count = int(count_text[1:])
            if not (name and content and count > 0):
                continue
            # Check last 3 gifts crawled (will only show <= 3 gifts on webpage)
            # and update the count of the same gift if found.
            for g in gifts[-3:]:
                # Compare the fields separately: comparing the concatenation
                # name + gift would treat ("ab", "c") and ("a", "bc") as equal.
                if g["name"] == name and g["gift"] == content:
                    g["count"] = count
                    break
            else:
                # No recent entry matched: this is a new gift.
                new_gifts.append({
                    "time": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
                    "name": name,
                    "gift": content,
                    "count": count})
        except (NoSuchElementException, StaleElementReferenceException):
            # No gift now
            pass
        except BaseException:
            print("[Gifts] Exception:", sys.exc_info()[0])
            raise
    if new_gifts:
        print("==== New Gifts ====")
        print_gifts(new_gifts)
    return gifts + new_gifts
def crawl_info(info):
    """Refresh the messages and gifts stored in *info* with one crawl pass.

    Args:
        info: Crawl-state dict holding ``"messages"`` and ``"gifts"`` lists.

    Returns:
        The same *info* dict, with both lists replaced by the crawl results.
        Any exception raised by the crawlers propagates to the caller.
    """
    updated_messages = crawl_messages(info["messages"])
    info["messages"] = updated_messages
    updated_gifts = crawl_gifts(info["gifts"])
    info["gifts"] = updated_gifts
    return info
def print_gifts(gifts):
    """Print one summary line per gift.

    Args:
        gifts: Iterable of dicts with ``"name"``, ``"gift"`` and ``"count"``
            keys.
    """
    template = "name: {} gift: {} count: {}"
    for gift in gifts:
        sender, item, amount = gift["name"], gift["gift"], gift["count"]
        print(template.format(sender, item, amount))
def dump_csv(info):
    """Write the collected messages and gifts to timestamped CSV files.

    The files are created in ``./data`` (or ``./data-<room_id>`` when the room
    id is non-empty), named ``messages_<time>.csv`` and ``gifts_<time>.csv``
    where ``<time>`` is the current UTC time.

    Args:
        info: Crawl-state dict with ``"room_id"``, ``"messages"`` and
            ``"gifts"`` entries.

    Raises:
        OSError: If the folder or the files cannot be created.
    """
    # create subfolder
    path = "./data"
    if info["room_id"]:
        path += "-" + str(info["room_id"])
    # exist_ok makes this safe when the folder already exists or is created
    # concurrently; the previous guard referenced errno without importing it.
    os.makedirs(path, exist_ok=True)
    # NOTE(review): the timestamp contains ":" which some filesystems
    # (e.g. on Windows) reject in file names.
    cur_time = time.strftime("%m-%d_%H:%M:%S", time.gmtime())
    # save messages
    with open(path + "/messages_" + cur_time + ".csv", "w", newline="", encoding="utf-8") as csvfile:
        fieldnames = ["time", "name", "message"]  # should be same as the keys in info["messages"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(info["messages"])
    # save gifts
    with open(path + "/gifts_" + cur_time + ".csv", "w", newline="", encoding="utf-8") as csvfile:
        fieldnames = ["time", "name", "gift", "count"]  # should be same as the keys in info["gifts"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(info["gifts"])
if __name__ == "__main__":
    # Script entry point: open the room page, poll chat messages and gifts
    # until interrupted, then dump everything collected to CSV files.
    room_url = "https://web.immomo.com/live/358053554?rf=683"
    chat_box_class_name = "live-msg-list"
    crawl_timeout = 1  # seconds to sleep between two crawl passes
    info = setup_info(room_url)
    # Must stay a module-level name: the crawl functions read it as a global.
    driver = init_driver()
    try:
        if not load_chat_box(driver, room_url, chat_box_class_name):
            # sys.exit instead of the site-provided exit(); SystemExit is not
            # caught below, so nothing is dumped when the page fails to load.
            sys.exit(0)
        # start crawling chatroom info
        while True:
            time.sleep(crawl_timeout)
            info = crawl_info(info)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop: save what was collected.
        dump_csv(info)
    except Exception as e:
        dump_csv(info)
        print(e)
    finally:
        # Always close the browser, even when dump_csv itself fails.
        driver.quit()
@erichoco
Copy link
Author

erichoco commented May 1, 2017

Installation

  • pip3 install selenium
  • Download chromedriver and put it in the working directory.

Usage

  1. Find URL of the broadcasting room
  2. Replace the value of room_url with the URL
  3. python3 scraper_momo.py
  4. Hit Ctrl-C when done; the scraped CSV files will be in the folder data-[room id]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment