Skip to content

Instantly share code, notes, and snippets.

@versae
Created March 5, 2018 20:52
Show Gist options
  • Save versae/b4631fda466670206f240b7eb2712d45 to your computer and use it in GitHub Desktop.
Save versae/b4631fda466670206f240b7eb2712d45 to your computer and use it in GitHub Desktop.
NREGA scraping
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# coding: utf-8
# # NREGA Scraping
#
# ## Setup the driver
# In[1]:
import itertools
import time
import os
import numpy as np
from selenium import webdriver
from selenium.webdriver.support.ui import Select
from selenium.webdriver.chrome.options import Options
from tqdm import tqdm
# Directory that Chrome is configured to save downloads into; the rest of the
# script looks for the downloaded report here.
download_directory = os.path.join(os.path.expanduser("~"), "Downloads", "nrega")

options = webdriver.ChromeOptions()
# Pass the flag directly instead of calling ``options.set_headless(...)``:
# that helper is deprecated in Selenium 3 and no longer available in
# Selenium 4, whereas the plain ``--headless`` argument works everywhere.
options.add_argument("--headless")
options.add_argument("--incognito")
options.add_argument("--disable-extensions")
# Chrome preferences: save into ``download_directory`` without prompting.
options.add_experimental_option("prefs", {
    "download.default_directory": download_directory,
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True,
})
# FirefoxProfile fxProfile = new FirefoxProfile();
# fxProfile.setPreference("browser.download.folderList",2);
# fxProfile.setPreference("browser.download.manager.showWhenStarting",false);
# fxProfile.setPreference("browser.download.dir","c:\\mydownloads");
# fxProfile.setPreference("browser.helperApps.neverAsk.saveToDisk","text/csv");
# In[2]:
def click_all(elements, delay=0):
    """Click every element in *elements*, skipping the ones that fail.

    Args:
        elements: Iterable of objects exposing the
            ``location_once_scrolled_into_view`` property and a ``click()``
            method. It is consumed lazily, one element at a time.
        delay: Seconds to sleep after handling each element.

    Returns:
        None.
    """
    for element in elements:
        try:
            # Reading this property also scrolls to the element.
            element.location_once_scrolled_into_view
            element.click()
        except Exception:
            # Best effort: an element that cannot be clicked is skipped.
            # ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt/SystemExit stop a long-running scrape.
            pass
        # NOTE(review): the source indentation was lost; the pause is assumed
        # to follow every element rather than only failures or the whole loop.
        time.sleep(delay)
# In[3]:
def expand_click(lis, delay=0):
    """Click the ``.accordion`` toggles found inside each item of *lis*.

    Toggles whose stripped text is ``"-"`` are left alone
    (presumably they are already expanded -- TODO confirm against the page).

    Args:
        lis: Iterable of container elements to search.
        delay: Seconds passed to ``click_all`` as the pause after each click.
    """
    def _clickable(toggles):
        # Lazy on purpose: each toggle's text is read right before its click.
        for toggle in toggles:
            if toggle.text.strip() != "-":
                yield toggle

    for item in lis:
        accordions = item.find_elements_by_css_selector(".accordion")
        click_all(_clickable(accordions), delay)
# In[4]:
def select_by_id(element_id, value=None, text=None):
    """Pick an option in the ``<select>`` element with id *element_id*.

    Uses the module-level ``browser``. A truthy *value* selects by option
    value and a truthy *text* selects by visible text; both are converted
    with ``str``. When neither is truthy the element is only scrolled to.

    Args:
        element_id: DOM id of the select element.
        value: Option value to select, or a falsy value to skip.
        text: Visible option text to select, or a falsy value to skip.
    """
    dropdown = browser.find_element_by_id(element_id)
    # Reading this property scrolls the element into view.
    dropdown.location_once_scrolled_into_view
    if value:
        by_value = Select(dropdown)
        by_value.select_by_value(str(value))
    if text:
        by_text = Select(dropdown)
        by_text.select_by_visible_text(str(text))
# In[5]:
def enable_download_in_headless_chrome(browser, download_dir):
    """Send ``Page.setDownloadBehavior`` so *browser* allows downloads.

    Workaround taken from
    https://bugs.chromium.org/p/chromium/issues/detail?id=696481#c86

    Args:
        browser: Driver whose ``command_executor`` gets the extra
            ``send_command`` entry and whose ``execute`` sends it.
        download_dir: Path passed as the ``downloadPath`` parameter.
    """
    # Register the chromium "send_command" endpoint the driver is missing.
    endpoint = ("POST", '/session/$sessionId/chromium/send_command')
    browser.command_executor._commands["send_command"] = endpoint

    behavior = {'behavior': 'allow', 'downloadPath': download_dir}
    payload = {'cmd': 'Page.setDownloadBehavior', 'params': behavior}
    browser.execute("send_command", payload)
# In[6]:
# Start the Chrome session with the options configured above. The helper
# functions below read this module-level ``browser`` directly.
browser = webdriver.Chrome(chrome_options=options)
# ## Regions
# In[7]:
def select_regions(state_names=None, delay=0):
    """Set the region level to "GP", expand the region tree and tick states.

    Uses the module-level ``browser``.

    Args:
        state_names: Container of state names to keep. A state box matches
            when its text minus the first two characters is in it. A falsy
            value keeps every state box.
        delay: Seconds passed to ``expand_click`` as the pause between clicks.
    """
    select_by_id("regionselect", text="GP")
    container = browser.find_element_by_id("middlecontainer")

    state_boxes = container.find_elements_by_class_name("statebox")
    if state_names:
        # NOTE(review): ``text[2:]`` presumably skips a two-character
        # expand marker in front of the state name -- confirm on the page.
        states = [box for box in state_boxes if box.text[2:] in state_names]
    else:
        states = list(state_boxes)

    expand_click(states, delay)
    # Run each level twice since sometimes the requests fail.
    for level_class in ("districtbox", "blockbox"):
        for _attempt in range(2):
            expand_click(container.find_elements_by_class_name(level_class), delay)

    # Finally click the <input> inside every selected state box.
    for box in states:
        box.find_element_by_tag_name("input").click()
# ## Indicators, years, and download
# In[8]:
def select_indicators(person_days=None, age_range=None, month=None):
    """Click all indicator inputs, then apply the optional filters.

    Uses the module-level ``browser``. For each filter, a truthy argument is
    selected by visible text in the matching ``DdlstTxtBox*`` dropdown(s).
    A falsy argument clicks the matching ``TxtBox*`` element(s) instead.

    Args:
        person_days: Option text for ``DdlstTxtBox1``, or falsy.
        age_range: Option text for ``DdlstTxtBox9``, or falsy.
        month: Option text for ``DdlstTxtBox5`` to ``DdlstTxtBox8``, or falsy.
    """
    for panel_id in ("UpdatePanelmiddle", "UpdatePanelleft", "UpdatePanelright"):
        panel = browser.find_element_by_id(panel_id)
        click_all(panel.find_elements_by_css_selector("input"))

    # (chosen option, ((dropdown id, fallback element id), ...)), listed in
    # the order in which the form fields are processed.
    filters = (
        (person_days, (("DdlstTxtBox1", "TxtBox1"),)),
        (age_range, (("DdlstTxtBox9", "TxtBox9"),)),
        (month, (
            ("DdlstTxtBox5", "TxtBox5"),
            ("DdlstTxtBox6", "TxtBox6"),
            ("DdlstTxtBox7", "TxtBox7"),
            ("DdlstTxtBox8", "TxtBox8"),
        )),
    )
    for choice, id_pairs in filters:
        for dropdown_id, fallback_id in id_pairs:
            if choice:
                select_by_id(dropdown_id, text=choice)
            else:
                click_all(browser.find_elements_by_id(fallback_id))
# In[9]:
def select_year(year):
    """Select *year*, by option value, in the ``DdlstFinYear`` dropdown."""
    select_by_id(element_id="DdlstFinYear", value=year)
# ## Main
# In[10]:
# Report form that every submission starts from.
url = "http://nregarep2.nic.in/netnrega/dynamic2/DynamicReport_new4.aspx"

# Values for each form dimension. In the last three tuples the empty string
# is the "no filter selected" entry (it is falsy in ``select_indicators``).
state_names = (
    'Andaman and Nicobar', 'Andhra Pradesh', 'Arunachal Pradesh', 'Assam',
    'Bihar', 'Chhattisgarh', 'Dadra & Nagar Haveli', 'Daman & Diu', 'Goa',
    'Gujarat', 'Haryana', 'Himachal Pradesh', 'Jammu and Kashmir',
    'Jharkhand', 'Karnataka', 'Kerala', 'Lakshadweep', 'Madhya Pradesh',
    'Maharashtra', 'Manipur', 'Meghalaya', 'Mizoram', 'Nagaland', 'Odisha',
    'Puducherry', 'Punjab', 'Rajasthan', 'Sikkim', 'Tamil Nadu', 'Tripura',
    'Uttar Pradesh', 'Uttarakhand', 'West Bengal',
)
# NOTE: per the original author, 2011 doesn't quite work.
years = list(map(str, range(2011, 2018)))
persons_days = (
    "", ">14", "1-10", "11-20", "21-30", "31-40", "41-50",
    "51-60", "61-70", "71-80", "81-99", "100", ">100",
)
age_ranges = ("", "18-30 yrs", "30-40 yrs", "40-50 yrs", "50-60 yrs", ">60 yrs")
months = (
    "", "April", "May", "June", "July", "August", "September",
    "October", "November", "December", "January", "February", "March",
)

# The order here fixes the field order of every tuple yielded by
# ``indicators``: (state, year, month, persons_days, age_range).
indicator_list = [state_names, years, months, persons_days, age_ranges]
# One-shot lazy iterator over every combination of the values above.
indicators = itertools.product(*indicator_list)
def main(indicators, url, total=None, click_delay=0.5, download_delay=5):
    """Submit the report form once per indicator combination and save the file.

    For every combination the page at *url* is reloaded, the form is filled
    in, the download is triggered and, if ``report.xls`` shows up in the
    module-level ``download_directory``, it is renamed after the combination.
    Progress is printed to stdout. Relies on the module-level ``browser``.

    Args:
        indicators: Iterable of tuples whose first five items are
            ``(state_name, year, month, persons_day, age_range)``; any extra
            items are ignored for form filling.
        url: Address of the report form.
        total: Number of submissions, only used for the initial message.
        click_delay: Seconds to pause between clicks while expanding regions.
        download_delay: Seconds to wait for the download after triggering it.
    """
    print(f"Total form submissions to make: {total or 0}")
    # Name the script expects the downloaded report to arrive under.
    filepath = os.path.join(download_directory, "report.xls")

    for count, indicator in enumerate(indicators, start=1):
        # Messages already reported for this submission (avoids repeats).
        log_set = set()
        browser.get(url)
        state_name, year, month, persons_day, age_range, *_ = indicator
        enable_download_in_headless_chrome(browser, download_directory)
        select_year(year)
        select_indicators(persons_day, age_range, month)
        header = "{count:5}. {indicator}... ".format(
            count=count,
            # Empty ("no filter") entries are left out of the label.
            indicator=", ".join(filter(bool, indicator))
        )
        print(header, end="")
        select_regions([state_name], delay=click_delay)

        # Report SEVERE browser-console messages, ignoring favicon noise.
        is_severe = False
        for log in browser.get_log("browser") or ():
            message = log["message"]
            if (log["level"] == "SEVERE"
                    and "favicon" not in message
                    and message not in log_set):
                is_severe = True
                print(f"\n\tERROR: {message}", end="")
                log_set.add(message)
        if is_severe:
            print("\n\t", "-" * len(header), sep="")

        # A leftover report (from an earlier run or a download that finished
        # after its wait expired) would otherwise be renamed below under
        # this combination's name, so remove it before downloading.
        try:
            os.remove(filepath)
        except FileNotFoundError:
            pass

        # download
        browser.find_element_by_id("dwnldDummy").click()
        time.sleep(download_delay)
        if os.path.isfile(filepath):
            # ``header[:-4]`` drops the trailing "... "; strip() drops the
            # left padding added by the ``{count:5}`` format.
            os.rename(filepath, os.path.join(download_directory, f"{header[:-4].strip()}.xls"))
            print("OK")
        elif not is_severe:
            print("FAIL")
        # NOTE(review): source indentation was lost; this blank line is
        # assumed to be printed once per submission, not once after the loop.
        print()
try:
    main(
        indicators,
        url,
        # Size of the cartesian product: the product of the dimension lengths.
        total=np.prod([len(values) for values in indicator_list]),
        click_delay=0.5,
        download_delay=5,
    )
finally:
    # Always shut the Chrome session down, even when the scrape aborts with
    # an exception; otherwise the headless browser process is left running.
    browser.quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment