#!/usr/bin/env python
import click
import urllib
import json
import os
import hashlib
import zlib
import time
import csv
import logging
import datetime
import io
import sys
import requests
from urllib.request import urlopen, urlretrieve
from pprint import pprint
from pymongo import MongoClient, DESCENDING, ASCENDING
# Echo log records to the console (StreamHandler defaults to stderr).
# NOTE: because this attaches a handler to the root logger at import time,
# any later logging.basicConfig() call without force=True is a no-op.
logging.getLogger().addHandler(logging.StreamHandler())
# Number of registry records requested per page (the "limit" in POSTTEMPLATE).
PAGE_SIZE = 10
# Same endpoint as URLPAT; not referenced by the code visible in this file.
PINGURL = 'http://gasu.gov.ru/gwt/stratplanning/rest/stratplanning/registry/list'
# Registry listing endpoint; queried with a POSTTEMPLATE body via HTTP POST.
URLPAT = 'http://gasu.gov.ru/gwt/stratplanning/rest/stratplanning/registry/list'
# File download URL; formatted with (docid, code) of a file record.
FILE_URL_PAT = 'http://gasu.gov.ru/GASUServicesSpring/rest/document/downloadDoc/stratplanning/%s/%s'
# Single-item endpoint taking one %s argument; not referenced by the code
# visible in this file.
SINGLEITEM_PAT = 'http://gasu.gov.ru/gwt/stratdocuments/rest/stratplanning/registry/getDocumentByRequestId/%s'
# JSON body of a listing request; formatted with (limit, offset).
POSTTEMPLATE = '{"sortInfo":[{"sortField":"id", "sortDir":"DESC"}], "limit":%d, "offset":%d, "disjunction":false, "filters":[]}'
# Root directory for all locally stored data.
DATAPATH = '/mnt/c/data/stratplan'
# Alternative root when running natively on Windows:
#DATAPATH = 'C:\\data\\stratplan'
# Cached listing pages are stored here as page_<n>.json.
CACHEPATH = os.path.join(DATAPATH, 'cache')
# Not referenced by the code visible in this file.
DOCUMENTSPATH = os.path.join(DATAPATH, 'documents')
# One sub-directory per document id is created here for downloaded files.
FILESPATH = os.path.join(DATAPATH, 'files')
# MongoDB connection parameters used by DataManager.
DBHOST = 'localhost'
DBPORT = 27017
def calc_hash_crc(filename):
    """Calculate the SHA-256 digest and CRC32 checksum of a file.

    The file is read in fixed-size chunks so that large files are not loaded
    into memory at once, and the handle is always closed.

    :param filename: path of the file to read (opened in binary mode).
    :return: dict with keys ``'sha256'`` (hex digest string) and ``'crc32'``
        (integer checksum as returned by ``zlib.crc32``).
    :raises OSError: if the file cannot be opened or read.
    """
    sha = hashlib.sha256()
    crc = 0
    with open(filename, 'rb') as fobj:
        # iter() with a sentinel stops on the empty bytes object at EOF.
        for chunk in iter(lambda: fobj.read(1 << 20), b''):
            sha.update(chunk)
            # Feeding the running value back in yields the CRC of the whole file.
            crc = zlib.crc32(chunk, crc)
    return {'sha256': sha.hexdigest(), 'crc32': crc}
class DataManager:
    """Downloads the strategic planning registry and mirrors it into MongoDB.

    Listing pages are cached as JSON files under CACHEPATH, loaded into the
    ``splan`` collection, and the file records they reference are tracked in
    ``splanfiles``.

    NOTE(review): the MongoDB calls use the legacy pymongo API (``save``,
    ``ensure_index``, ``cursor.count()``, ``aggregate()['result']``) that the
    file was written against; they are kept as-is so the installed driver
    keeps working. Confirm the pymongo version before modernising them.
    """

    def __init__(self):
        """Connect to MongoDB at DBHOST:DBPORT and bind the collections."""
        self.conn = MongoClient(DBHOST, DBPORT)
        self.db = self.conn['minec']
        self.rs_coll = self.db['splan']
        self.rsdoc_coll = self.db['splandocs']
        self.rsfiles_coll = self.db['splanfiles']
        self.rsstats_coll = self.db['splanstats']
        # self.rsindex_coll = self.db['rsindex']

    def ping(self):
        """Ping the registry server.

        :return: tuple ``(isalive, total)``; ``total`` is the ``totalLength``
            reported by the server, or 0 when the server could not be reached
            or returned an unusable answer.
        """
        postdata = POSTTEMPLATE % (PAGE_SIZE, 0)
        try:
            r = requests.post(URLPAT, postdata)
            r.raise_for_status()
            total = json.loads(r.text)['totalLength']
        except (requests.RequestException, ValueError, KeyError, TypeError):
            # Report unavailability to the caller instead of crashing, so the
            # "isalive" flag actually carries information.
            logging.exception('Ping of %s failed', URLPAT)
            return False, 0
        return True, total

    def download_all(self, force=False):
        """Download every listing page into the local cache.

        :param force: when True, pages that are already cached are fetched
            again and overwritten; otherwise they are skipped.
        :raises requests.HTTPError: if a page request returns an error status
            (the error body is not written to the cache).
        """
        isalive, cnt = self.ping()
        if not isalive:
            print('Source is not available, nothing downloaded')
            return
        # Ceiling division: a partial last page still needs its own request,
        # but an exact multiple of PAGE_SIZE must not add an empty page.
        total_pages = (cnt + PAGE_SIZE - 1) // PAGE_SIZE
        print('Total count %d, total pages %d' % (cnt, total_pages))
        os.makedirs(CACHEPATH, exist_ok=True)
        for n in range(1, total_pages + 1):
            print('Page %d of %d' % (n, total_pages))
            fname = os.path.join(CACHEPATH, 'page_%d.json' % n)
            if not force and os.path.exists(fname):
                print('Already cached page. Skip')
                continue
            postdata = POSTTEMPLATE % (PAGE_SIZE, (n - 1) * PAGE_SIZE)
            r = requests.post(URLPAT, postdata)
            # A cached error page would be skipped forever on later runs.
            r.raise_for_status()
            with open(fname, 'w') as f:
                f.write(r.text)

    def update(self):
        """Update local cache (not implemented)."""
        pass

    def reload_requests(self):
        """Reload requests (not implemented)."""

    def full_reload(self, reset=True):
        """Load all cached listing pages into the ``splan`` collection.

        :param reset: when True, the collection is dropped before loading.

        Pages that cannot be parsed as JSON or have no ``data`` key are
        skipped; records that fail to save are reported and skipped.
        """
        if reset:
            self.rs_coll.drop()
            self.rs_coll = self.db['splan']
        num = 1
        for fname in os.listdir(CACHEPATH):
            print ('page', fname)
            try:
                with open(os.path.join(CACHEPATH, fname), 'r') as f:
                    js = json.load(f)
            except ValueError:
                # Not valid JSON (e.g. a truncated download).
                print ('Error processing page', fname)
                continue
            if 'data' not in js:
                continue
            print ('length', len(js['data']))
            for r in js['data']:
                print(r['id'])
                # Processing flags attached to every stored record.
                r['_sysdata'] = {'docsaved' : False, 'filessaved': False}
                try:
                    self.rs_coll.save(r)
                except Exception:
                    # Best effort: report the record and carry on. Unlike a
                    # bare except this lets Ctrl-C stop the load.
                    print ('Error processing record', r['id'])
                    continue
                print(num, fname)
                num += 1

    def build_indexes(self):
        """Build indexes for all collections."""
        logging.info('Building indexes for rs collection')
        self.db['splan'].ensure_index([('id', DESCENDING)])
        self.db['splan'].ensure_index([('regNumber', DESCENDING)])
        self.db['splan'].ensure_index([('docname', DESCENDING)])
        logging.info('Building indexes for splanreq collection')
        self.db['splanreq'].ensure_index([('id', ASCENDING)])
        self.db['splanreq'].ensure_index([('regNumber', DESCENDING)])
        self.db['splanreq'].ensure_index([('docname', DESCENDING)])
        logging.info('Building indexes for rsdoc collection')
        self.db['splanfiles'].ensure_index([('id', ASCENDING)])
        self.db['splanfiles'].ensure_index([('docid', ASCENDING)])
        self.db['splanfiles'].ensure_index([('fileEntryId', ASCENDING)])

    def index_documents(self, full_reload=False):
        """Register the files of not-yet-processed records in ``splanfiles``.

        Every entry of ``request.files`` of a record whose
        ``_sysdata.docsaved`` flag is False is stored with ``cached=False``
        and its download URL, unless an entry with the same ``docid`` and
        ``id`` is already stored.

        :param full_reload: accepted for compatibility; currently unused.
        """
        total = self.rs_coll.find().count()
        print(total)
        objects = self.rs_coll.find({'_sysdata.docsaved' : False})
        i = 0
        n = 0
        print(objects.count())
        for o in objects:
            i += 1
            if i % 100 == 0:
                print('Indexed documents %d, file %d' % (i, n))
                logging.info('Indexed documents %d of %d' % (i, total))
            for doc in o['request']['files']:
                n += 1
                docobj = self.rsfiles_coll.find_one({'docid': doc['docid'], 'id' : doc['id']} )
                if not docobj:
                    docobj = doc
                    docobj['cached'] = False
                    docobj['url'] = FILE_URL_PAT % (docobj['docid'], docobj['code'])
                    self.rsfiles_coll.save(docobj)
                else:
                    print('Already stored %d' % (doc['docid']))
        print(self.rsfiles_coll.count())

    def prepare_download(self):
        """Prepare download.

        Creates the per-document directory under FILESPATH for every file
        record that is not cached yet and writes the download URLs, one per
        line, to ``files.txt`` in the current directory.
        """
        documents = list(self.rsfiles_coll.find({'cached' : False} ))
        total = len(documents)
        with open('files.txt', 'w') as f:
            for i, docobj in enumerate(documents, 1):
                if i % 100 == 0:
                    print('Processed %d of %d' % (i, total))
                # Several files share one document directory, so an existing
                # directory is expected and must not abort the run.
                os.makedirs(os.path.join(FILESPATH, str(docobj['docid'])), exist_ok=True)
                f.write(docobj['url'] + '\n')

    def download_docs(self, update=True, indexexisting=True):
        """Download all documents.

        Every file record that is not cached yet is handed to an aria2
        JSON-RPC server on localhost:6800 and then marked as cached.

        :param update: accepted for compatibility; currently unused.
        :param indexexisting: when True, files that already exist locally are
            not queued again but are still marked as cached.

        NOTE(review): a record is marked cached as soon as the request has
        been sent; the aria2 response is not inspected.
        """
        total = self.rsfiles_coll.find({'cached' : False}).count()
        print(total)
        # Materialise the cursor first: records are saved while iterating.
        documents = list(self.rsfiles_coll.find({'cached' : False} ))
        for i, docobj in enumerate(documents, 1):
            if i % 100 == 0:
                print('Processed %d of %d' % (i, total))
            filename = 'files/%s/%s' % (docobj['docid'], docobj['name'])
            if indexexisting and os.path.exists(filename):
                print('- file already exists', docobj['name'])
            else:
                os.makedirs(os.path.join(FILESPATH, str(docobj['docid'])), exist_ok=True)
                try:
                    print(docobj['url'])
                    jsonreq = json.dumps({'jsonrpc':'2.0', 'id':'qwer',
                                          'method':'aria2.addUri',
                                          'params':[[docobj['url']], dict(out=filename)]})
                    print(jsonreq)
                    requests.post('http://localhost:6800/jsonrpc', jsonreq)
                except KeyboardInterrupt:
                    logging.error("Keyboard interrupt occured")
                    break
                # Names are str already; encoding them would print b'...'.
                logging.info('File %s retrieved' % (docobj['name']))
                print('File %s retrieved' % (docobj['name']))
            docobj['cached'] = True
            docobj['filename'] = filename
            self.rsfiles_coll.save(docobj)

    def improve_data(self):
        """Print how many records of ``splandocs`` use each file extension.

        The extension is the lower-cased text after the last dot of the
        record's ``name``; extensions are listed in order of first appearance.
        """
        formats = {}
        for doc in self.rsdoc_coll.find():
            ext = doc['name'].rsplit('.', 1)[-1].lower()
            formats[ext] = formats.get(ext, 0) + 1
        for ext, amount in formats.items():
            print('Ext:', ext, amount)

    def stats(self, extreport=False):
        """Calc collections stats and print them as a table.

        :param extreport: when True, also print the number of records per
            ``contentType``.
        """
        import tabulate
        mib = 1024 * 1024
        keys = ['indicator', 'value']
        table = []
        filessize = self.rsdoc_coll.aggregate([{"$group": { "_id" : None, "sum" : { "$sum": "$filesize" } } }])
        rsstats = self.db.command("collstats", "splandocs")
        docstats = self.db.command("collstats", "splanfiles")
        print('Collecting stats')
        table.append(['documents count', rsstats['count']])
        # Sizes are divided by 1024*1024; the former 10024 was a typo.
        table.append(['documents size', rsstats['size']/mib])
        table.append(['files count', docstats['count']])
        table.append(['files size', docstats['size']/mib])
        table.append(['cached files count', self.rsfiles_coll.find({'cached' : True}).count()])
        table.append(['cached files size', filessize['result'][0]['sum']/mib])
        print(tabulate.tabulate(table, headers=keys))
        if extreport:
            contentTypes = self.rsdoc_coll.aggregate([{"$group": { "_id" : "$contentType", "sum" : { "$sum": 1 } } }])
            table = []
            keys = ['contentType', 'count']
            for c in contentTypes['result']:
                table.append([c['_id'], c['sum']])
            print(tabulate.tabulate(table, headers=keys))
@click.group()
def cli1():
    # Empty group; it only hosts the `download` command.
    pass


@cli1.command()
def download():
    """Download data"""
    # Fetch all listing pages into the local cache.
    DataManager().download_all()
@click.group()
def cli2():
    # Empty group; it only hosts the `download_docs` command.
    pass


@cli2.command()
def download_docs():
    """Download all documents"""
    # Queue every not-yet-cached file for download.
    DataManager().download_docs()
@click.group()
def cli3():
    # Empty group; it only hosts the `loaddata` command.
    pass


@cli3.command()
def loaddata():
    """Load cached JSON data into MongoDB"""
    # Uses the default reset=True of full_reload().
    DataManager().full_reload()
@click.group()
def cli4():
    # Empty group; it only hosts the `stats` command.
    pass


@cli4.command()
def stats():
    """Return actual statistics"""
    # Prints the summary table without the extended report.
    DataManager().stats()
@click.group()
def cli5():
    # Empty group; it only hosts the `ping` command.
    pass


@cli5.command()
def ping():
    """Ping source"""
    alive, record_count = DataManager().ping()
    # Guard clause: report the failure first, then the normal case.
    if not alive:
        print('Not available')
        return
    print('Alive. Record count is', record_count)
@click.group()
def cli6():
    # Empty group; it only hosts the `reindex` command.
    pass


@cli6.command()
def reindex():
    """Create database indexes"""
    # One manager (one MongoDB connection) serves both steps, in this order:
    # build the collection indexes, then register the files of stored records.
    indexer = DataManager()
    for step in (indexer.build_indexes, indexer.index_documents):
        step()
@click.group()
def cli7():
    # Empty group; it only hosts the `improvedata` command.
    pass


@cli7.command()
def improvedata():
    """Improve data"""
    # Delegates to DataManager.improve_data().
    DataManager().improve_data()
# Single entry point that merges the commands of all seven groups above, so
# each command can be invoked directly from this one CLI object.
cli = click.CommandCollection(sources=[cli1, cli2, cli3, cli4, cli5, cli6, cli7])
if __name__ == '__main__':
    # logging.basicConfig() does nothing once the root logger has a handler,
    # and this module attaches a StreamHandler to it at import time. The log
    # file and the DEBUG level therefore have to be configured explicitly,
    # otherwise subcmd.log is never written and info messages are dropped.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    file_handler = logging.FileHandler('subcmd.log')
    # Same record layout basicConfig() would have used.
    file_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
    root_logger.addHandler(file_handler)
    cli()