#!/usr/bin/env python
# -*- coding: utf-8 -*-
import click
import urllib.request
import json
import os
import glob
import csv
import datetime
import io
import xlwt
import sys
import yaml
import shutil
import hashlib
import zlib
from lxml.html import fromstring, etree
from tabulate import tabulate
from pymongo import MongoClient, DESCENDING, ASCENDING
from urllib.parse import urljoin
import requests
# --- Remote endpoints -------------------------------------------------------
# Site root; scraped (possibly relative) hrefs are resolved against it.
NALOG_URL = 'https://www.nalog.ru'
# Path fragment of statistical form pages (not referenced in the visible code).
FORMS_POSTFIX = '/related_activities/statistics_and_analytics/forms/'
# Page whose report table collectfiles() scrapes.
STATS_URL = 'https://www.nalog.ru/rn77/related_activities/statistics_and_analytics/regstats/'

# --- Local directories ------------------------------------------------------
# Root directory for downloaded report files.
STORAGE_PATH = 'files'
# Directory holding the CSV lists of report URLs.
DATA_PATH = 'data'
# Directory for per-list forms CSVs and cached form pages.
FORMS_PATH = 'forms'

# --- MongoDB ----------------------------------------------------------------
# Database and collection names (the functions in this file currently spell
# these out as string literals instead of using the constants).
NALOG_DB = 'fns'
NALOG_STAT_COLL = 'statreports'
NALOG_STAT_FILES_COLL = 'statfiles'
# Connection parameters passed to MongoClient.
MONGO_SERVER = '127.0.0.1'
MONGO_PORT = 27017
def get_tag_data(tag):
    """Return a flat dict describing one element of a parsed tree.

    Args:
        tag: an element exposing ``tag``, ``text``, ``tail`` and a
            mapping-like ``attrib`` (lxml or ``xml.etree`` elements).

    Returns:
        dict with the keys ``tag``, ``text``, ``tail``, ``id`` and ``class``.
        ``text``, ``tail``, ``id`` and ``class`` are UTF-8 encoded ``bytes``
        when the value is present and the empty ``str`` otherwise (kept as
        in the original implementation).
    """
    item = {
        'tag': tag.tag,
        'text': tag.text.encode('utf8') if tag.text else '',
        'tail': tag.tail.encode('utf8') if tag.tail else '',
    }
    for key in ('id', 'class'):
        # ``in`` works for every mapping type; ``has_key`` is a Python 2
        # leftover that plain dict-based ``attrib`` objects do not provide.
        item[key] = tag.attrib[key].encode('utf8') if key in tag.attrib else ""
    return item
def dump_tag_tree(tag, tagpath=None, level=1, data=None):
    """Flatten an element tree into a list of per-element dicts (pre-order).

    Args:
        tag: root element of the subtree to dump.
        tagpath: path string of ``tag``; defaults to ``tag.tag``.
        level: depth of ``tag``; stored as a string in each item.
        data: list to append to; a new list is created when ``None``.

    Returns:
        The list ``data`` holding one ``get_tag_data`` dict per element,
        extended with ``tagpath`` and ``level``. Child paths look like
        ``parent/child[i]`` where ``i`` is the zero-based index among
        siblings sharing the same tag.
    """
    if data is None:
        data = []
    if tagpath is None:
        tagpath = tag.tag
    item = get_tag_data(tag)
    item['tagpath'] = tagpath
    item['level'] = str(level)
    data.append(item)

    sibling_index = {}
    # Iterating the element yields its direct children; ``getchildren()`` is
    # deprecated.
    for child in tag:
        index = sibling_index.get(child.tag, -1) + 1
        sibling_index[child.tag] = index
        child_path = '%s/%s[%d]' % (tagpath, child.tag, index)
        data = dump_tag_tree(child, child_path, level + 1, data)
    return data
def _get_text(tag):
texts = []
for t in tag.itertext():
t = t.replace('\n', ' ').strip()
if len(t) > 0:
texts.append(t)
return ' '.join(texts)
def collectfiles():
    """Scrape the report table at STATS_URL into ``DATA_PATH/regstats.csv``.

    The table is located by trying, in order: a table with class
    ``border_table``, a table with a specific inline style, and finally the
    third ``<table>`` of the page. Rows with one or two cells set the current
    category; rows with more than three cells describe a report whose fourth
    cell holds one link per year. One CSV row is written per link with the
    columns ``id, category, reportname, reportkey, year, url``.

    The CSV (header included) is written even when no table or link is found.
    Returns ``None``.
    """
    statsurl = STATS_URL
    outputfilename = os.path.join(DATA_PATH, 'regstats.csv')
    print(statsurl)
    data = requests.get(statsurl).text
    # Parse the page once and reuse the tree for every lookup.
    root = etree.HTML(data)

    table = None
    jslist = root.xpath("//table[@class='border_table']")
    colltable = root.xpath("//table[@style='border-collapse: collapse; width: 960px;']")
    if jslist:
        table = jslist[0]
    elif colltable:
        table = colltable[0]
    else:
        tables = root.xpath("//table")
        if len(tables) > 2:
            table = tables[2]

    # Relative locations, inside the fourth cell, where year links were seen.
    link_paths = (
        'a', 'div/a', 'div/div/a', 'p/a', 'div/span/span/a',
        'div/span/span/span/a', 'span/span/span/a', 'p/span/span/a',
        'span/span/a', 'span/a',
    )
    category = None
    records = []
    # Compare against None explicitly: the truth value of an lxml element
    # reflects whether it has children, not whether it exists.
    if table is not None:
        for tr in table.xpath('tbody/tr'):
            tds = tr.xpath('td')
            if len(tds) == 1:
                category = _get_text(tds[0])
            elif len(tds) == 2:
                category = _get_text(tds[1])
            elif len(tds) > 3:
                report_id = _get_text(tds[0])
                reportname = _get_text(tds[1])
                reportkey = _get_text(tds[2]).strip()
                hrefs = []
                for path in link_paths:
                    hrefs.extend(tds[3].xpath(path))
                for link in hrefs:
                    if 'href' not in link.attrib:
                        continue
                    year = _get_text(link)
                    records.append({
                        'id': report_id,
                        'year': year.strip(';').strip().strip(';'),
                        'url': urljoin(NALOG_URL, link.attrib['href']),
                        'reportkey': reportkey,
                        'reportname': reportname,
                        'category': category,
                    })

    os.makedirs(DATA_PATH, exist_ok=True)
    # newline='' is what the csv module expects for files it writes; the
    # context manager guarantees the data is flushed and the handle closed.
    with open(outputfilename, 'w', encoding='utf8', newline='') as out:
        writer = csv.DictWriter(out, fieldnames=['id', 'category', 'reportname', 'reportkey', 'year', 'url'])
        writer.writeheader()
        writer.writerows(records)
def _download_to_file(url, fileoutput, filename):
    """Download ``url`` into ``fileoutput`` unless that path already exists.

    Progress and errors are reported with ``print``; failures never raise so
    that one bad URL does not abort the whole run.

    Returns:
        True when the file was downloaded and written, False when it was
        skipped or the download failed.
    """
    if os.path.exists(fileoutput):
        print('Skipped. Already downloaded %s' %(url))
        return False
    print('Filename %s downloading' % (url))
    try:
        response = requests.get(url, allow_redirects=True)
        with open(fileoutput, 'wb') as out:
            out.write(response.content)
    except (requests.exceptions.RequestException, OSError):
        # Best effort: report and move on (KeyboardInterrupt still propagates).
        print('Error processing %s' % (url))
        return False
    print('Filename %s saved' % (filename))
    return True


def download_all():
    """Download every file referenced by the CSV lists in DATA_PATH.

    For each regular file ``<name>.<ext>`` in DATA_PATH:

    * rows whose URL contains ``statistics_and_analytics/forms/`` are treated
      as form pages: while ``FORMS_PATH/<name>_forms.csv`` does not exist,
      their attachment links are collected with ``_extract_forms``;
    * every other row is downloaded into ``STORAGE_PATH/<name>/``;
    * the collected attachment links are written to the forms CSV (only if
      it did not exist before) and then downloaded as well.

    Files already present on disk are skipped. Returns ``None``.
    """
    client = MongoClient(MONGO_SERVER, MONGO_PORT)
    forms_coll = client['fns']['statforms']
    os.makedirs(FORMS_PATH, exist_ok=True)

    for name in os.listdir(DATA_PATH):
        datafile = os.path.join(DATA_PATH, name)
        # DATA_PATH may also contain directories (``mongodump -o data``
        # writes there); only regular files can be CSV lists.
        if not os.path.isfile(datafile):
            continue
        basename = name.split('.', 1)[0]
        outputfilename = os.path.join(FORMS_PATH, basename + '_forms.csv')
        filedir = os.path.join(STORAGE_PATH, basename)
        os.makedirs(filedir, exist_ok=True)

        # The forms CSV is only written after the loop below, so its
        # existence cannot change while the rows are being processed.
        forms_known = os.path.exists(outputfilename)
        forms_data = []
        with open(datafile, 'r', encoding='utf8', newline='') as src:
            for record in csv.DictReader(src):
                url = record['url']
                if 'statistics_and_analytics/forms/' in url and not forms_known:
                    print('Got forms, processing %s' %(url))
                    forms_data.extend(_extract_forms(forms_coll, record))
                    print('---')
                    continue
                filename = url.rsplit('/', 1)[-1]
                _download_to_file(url, os.path.join(filedir, filename), filename)

        if not forms_known:
            # Report names are non-ASCII; write UTF-8 explicitly instead of
            # depending on the platform default encoding.
            with open(outputfilename, 'w', encoding='utf8', newline='') as out:
                writer = csv.DictWriter(out, fieldnames=['repname', 'repurl', 'url'])
                writer.writeheader()
                writer.writerows(forms_data)

        for form in forms_data:
            filename = form['repurl'].rsplit('/', 1)[-1]
            _download_to_file(form['repurl'], os.path.join(filedir, filename), filename)
def _store_file(files_coll, filedir, url):
    """Download ``url`` into ``filedir`` once and record it in ``files_coll``.

    Nothing happens when ``files_coll`` already holds a document for ``url``
    (whatever its status). Otherwise the file is downloaded; if a file with
    the same name already exists in ``filedir`` the new one is stored under
    a hash of the URL plus the original extension. A document with the keys
    ``url``, ``filename``, ``status`` (``'downloaded'`` or ``'error'``) and,
    on success, ``filesize`` is then saved.

    Args:
        files_coll: pymongo collection of file records.
        filedir: target directory, created if missing.
        url: absolute URL of the file.
    """
    os.makedirs(filedir, exist_ok=True)
    filename = (' '.join(url.rsplit('/', 1)[-1].split())).strip('"')
    fileoutput = os.path.join(filedir, filename)
    if files_coll.find_one({'url': url}):
        return

    if os.path.exists(fileoutput):
        # Another URL already produced this file name: derive a unique one.
        try:
            h = hashlib.new('ripemd160')
        except ValueError:
            # RIPEMD-160 is unavailable on some OpenSSL 3 builds; SHA-1 gives
            # a digest of the same length.
            h = hashlib.sha1()
        h.update(url.encode('utf8'))
        finfilename = h.hexdigest() + '.' + filename.rsplit('.', 1)[-1]
        print('Dublicate file %s found, replace with %s' % (filename, finfilename))
        filename = finfilename
        fileoutput = os.path.join(filedir, filename)

    print(' - downloading file %s ' %(filename))
    # Same exception types the original handled, one branch instead of five.
    download_errors = (
        urllib.error.HTTPError,
        requests.exceptions.ConnectionError,
        requests.exceptions.InvalidURL,
        requests.exceptions.MissingSchema,
        requests.exceptions.InvalidSchema,
    )
    extra = {}
    try:
        response = requests.get(url, allow_redirects=True)
        with open(fileoutput, 'wb') as out:
            out.write(response.content)
        extra['filesize'] = os.path.getsize(fileoutput)
    except download_errors:
        print('Error processing %s' % (url))
        status = 'error'
    else:
        print('- filename %s downloaded' % (filename))
        status = 'downloaded'

    # Re-read the record in case it appeared while the download was running.
    filedata = files_coll.find_one({'url': url})
    if not filedata:
        filedata = {'url' : url, 'filename' : filename, 'status' : status}
        filedata.update(extra)
    else:
        filedata['status'] = status

    # Collection.save() was removed in PyMongo 4; this is its documented
    # replacement (replace by _id when present, insert otherwise).
    if '_id' in filedata:
        files_coll.replace_one({'_id': filedata['_id']}, filedata, upsert=True)
    else:
        files_coll.insert_one(filedata)
def _cache_form(coll, files_coll, filedir, region, url):
    """Fetch a form page once, cache its body on disk and record it in ``coll``.

    When ``coll`` already has a document for ``url`` it is returned as is
    (a missing ``status`` is set to ``'ok'`` and saved first). Otherwise the
    URL is opened; on an HTTP error a document with status ``'error'`` is
    saved and returned. On success the response body is written to
    ``FORMS_PATH/<region>/<filename>`` and the document (``status``,
    ``isdocument``, ``filename``, ``fullpath`` and, when the response carries
    a ``Location`` header, ``realurl``) is saved.

    Args:
        coll: pymongo collection of cached form pages.
        files_coll: pymongo collection handed to ``_store_file``.
        filedir: directory handed to ``_store_file``.
        region: sub-directory name under FORMS_PATH, also stored in the record.
        url: form page URL; its second-to-last ``/`` segment is the form id.

    Returns:
        The form document, or ``None`` if interrupted with Ctrl-C.
    """
    def _save(doc):
        # Collection.save() was removed in PyMongo 4; this is its documented
        # replacement (replace by _id when present, insert otherwise).
        if '_id' in doc:
            coll.replace_one({'_id': doc['_id']}, doc, upsert=True)
        else:
            coll.insert_one(doc)

    item = coll.find_one({'url' : url})
    if item:
        if 'status' not in item:
            item['status'] = 'ok'
            _save(item)
        return item

    form_id = url.rsplit('/')[-2]
    item = {'url' : url, 'region' : region, 'form_id' : form_id}
    cache_dir = os.path.join(FORMS_PATH, region)
    os.makedirs(cache_dir, exist_ok=True)
    try:
        try:
            response = urllib.request.urlopen(url)
        except urllib.error.HTTPError:
            item['status'] = 'error'
            _save(item)
            return item
        # The context manager closes the HTTP response on every path.
        with response:
            item['status'] = 'ok'
            info = response.info()
            if "Location" in info.keys():
                item['isdocument'] = True
                newurl = info['Location']
                item['filename'] = newurl.rsplit('/', 1)[-1]
                absurl = urljoin(NALOG_URL, info['Location'])
                item['realurl'] = newurl
                _store_file(files_coll, filedir, absurl)
            else:
                item['isdocument'] = False
                item['filename'] = '%s.html' % (form_id)
            item['fullpath'] = os.path.join(cache_dir, item['filename'])
            data = response.read()
        with open(item['fullpath'], 'wb') as out:
            out.write(data)
        # Save only after the cache file exists, so a stored 'ok' record
        # never points at a file that was not written.
        _save(item)
    except KeyboardInterrupt:
        return None
    return item
def _extract_forms(coll, record):
    """Collect links to spreadsheet/archive attachments from a form page.

    The page body is read from the cache file recorded in ``coll`` when a
    document for ``record['url']`` exists with status ``'ok'`` and a
    ``fullpath``; otherwise the page is downloaded.

    Args:
        coll: pymongo collection of cached form pages.
        record: mapping with at least the key ``url``.

    Returns:
        A list of dicts ``{'repname', 'repurl', 'url'}``, one per ``<a>``
        whose href ends in ``xls``, ``xlsx``, ``rar`` or ``zip``
        (case-insensitive). Empty when the page cannot be downloaded or
        parsed.
    """
    exts = ('xls', 'xlsx', 'rar', 'zip')
    forms = []
    item = coll.find_one({'url' : record['url']})
    try:
        # .get() tolerates records that lack 'status' or 'fullpath'.
        if item is not None and item.get('status') == 'ok' and item.get('fullpath'):
            with open(item['fullpath'], 'rb') as cached:
                data = cached.read()
        else:
            data = requests.get(record['url']).text
    except (urllib.error.HTTPError, requests.exceptions.RequestException):
        # requests reports failures through its own exception hierarchy, so
        # catching only urllib's HTTPError never handled a failed download.
        print('Error processing %s' % (record['url']))
        return forms
    if not data:
        return forms
    root = etree.HTML(data)
    if root is None:
        return forms

    for a in root.xpath("//a"):
        href = a.attrib.get('href')
        if not href:
            continue
        ext = href.rsplit('.', 1)[-1].lower()
        if ext not in exts:
            continue
        absurl = urljoin(NALOG_URL, href)
        print(absurl)
        forms.append({'repname' : _get_text(a), 'repurl' : absurl, 'url' : record['url']})
    return forms
def loaddata():
    """Load the CSV lists from DATA_PATH into MongoDB and download the files.

    For every row of every regular file in DATA_PATH (the region is the file
    name up to the first dot, minus its first two characters):

    * if the URL is a form page (``.../statistics_and_analytics/forms/<digits>/``)
      the page is cached, its attachments are extracted, each attachment is
      downloaded with ``_store_file`` and saved to ``statreports`` as its own
      document (``parent_url`` points at the form page), and the number of
      attachments is stored in the row's ``files`` field;
    * otherwise the file is downloaded with ``_store_file`` unless the stored
      document is already marked ``processed``.

    The row itself is saved to ``statreports`` with ``processed = True``.
    Returns ``None``.
    """
    client = MongoClient(MONGO_SERVER, MONGO_PORT)
    db = client['fns']
    reports_coll = db['statreports']
    files_coll = db['statfiles']
    forms_coll = db['statforms']

    def _save_report(doc):
        # Collection.save() was removed in PyMongo 4; this is its documented
        # replacement (replace by _id when present, insert otherwise).
        if '_id' in doc:
            reports_coll.replace_one({'_id': doc['_id']}, doc, upsert=True)
        else:
            reports_coll.insert_one(doc)

    for name in os.listdir(DATA_PATH):
        datafile = os.path.join(DATA_PATH, name)
        # DATA_PATH may also contain directories (``mongodump -o data``
        # writes there); only regular files can be CSV lists.
        if not os.path.isfile(datafile):
            continue
        print('Processing %s' % (name))
        region = name.split('.', 1)[0][2:]
        filedir = os.path.join(STORAGE_PATH, region)
        with open(datafile, 'r', encoding='utf8', newline='') as src:
            for record in csv.DictReader(src):
                o = reports_coll.find_one({'url': record['url']})
                if o:
                    # NOTE(review): the original also assigned 'reportkey'
                    # and 'region' on the CSV row right before discarding it
                    # in favour of the stored document; those writes had no
                    # effect and were dropped. Confirm whether they were
                    # meant to update the stored document instead.
                    record = o
                parts = record['url'].rsplit('/', 4)
                is_form_page = (
                    len(parts) >= 4
                    and parts[-4] == 'statistics_and_analytics'
                    and parts[-3] == 'forms'
                    and parts[-2].isdigit()
                )
                if is_form_page:
                    _cache_form(forms_coll, files_coll, filedir, region, record['url'])
                    print('Region %s formID %s cached' % (region, parts[-2]))
                    forms = _extract_forms(forms_coll, record)
                    rnum = 0
                    for form in forms:
                        if form['repurl'] == 'http://':
                            continue
                        form_o = reports_coll.find_one({'url': form['repurl']})
                        item = {'region' : region}
                        for k in record:
                            # Do not inherit the parent's identity or its
                            # attachment counter: copying '_id' made every
                            # attachment overwrite the parent document.
                            if k not in ('_id', 'files'):
                                item[k] = record[k]
                        item['url'] = form['repurl']
                        item['parent_url'] = record['url']
                        item['reportname'] = form['repname']
                        if form_o:
                            # Update the existing attachment document rather
                            # than inserting a duplicate on re-runs.
                            item['_id'] = form_o['_id']
                        _store_file(files_coll, filedir, form['repurl'])
                        item['processed'] = True
                        _save_report(item)
                        rnum += 1
                    record['files'] = rnum
                else:
                    if o and o.get('processed'):
                        record['region'] = region
                        _save_report(record)
                        continue
                    _store_file(files_coll, filedir, record['url'])
                record['processed'] = True
                _save_report(record)
# Single-command group; merged into the public ``cli`` collection below.
@click.group()
def cli2():
    pass


@cli2.command()
def collect():
    """Collect urls from nalog.ru"""
    # collectfiles() writes its CSV itself; it has no result worth keeping.
    collectfiles()
# Single-command group; merged into the public ``cli`` collection below.
@click.group()
def cli3():
    pass


@cli3.command()
def download():
    """Download all files"""
    download_all()
# Single-command group; merged into the public ``cli`` collection below.
@click.group()
def cli4():
    pass


@cli4.command()
def load():
    """Loads data to MongoDB and download all files"""
    loaddata()
# Single-command group; merged into the public ``cli`` collection below.
@click.group()
def cli5():
    pass


@cli5.command()
def dump():
    """Dumps tables from MongoDB"""
    # Runs the external ``mongodump`` tool through the shell; its exit
    # status is ignored.
    dump_command = 'mongodump -v -d fns -o data'
    os.system(dump_command)
# Expose the commands of all four groups through one command-line entry point.
cli = click.CommandCollection(
    sources=[cli2, cli3, cli4, cli5],
)

if __name__ == '__main__':
    cli()