git / code.ach.gov.ru / scarping / stcmd
commit 251e9fd0fcff27a7666b57d3a1cd54e0dd1d5ab1
author Иван Бегтин <ibegtin@gmail.com>
date 2019-05-28 06:50:38 +0300
parents root commit
message
Первоначальный импорт
files
| file | add | del |
|---|---|---|
| README.md | +48 | -0 |
| stcmd.py | +372 | -0 |
| test.py | +7 | -0 |
patch
diff --git a/README.md b/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..c2820c866aca604c6f9ac48bc0ff3d521eff3fb7
--- /dev/null
+++ b/README.md
@@ -0,0 +1,48 @@
+# subcmd
+Утилиты командной строки для работы с данными страт планирования в рамках ГАС "Управление"
+
+## Команды
+
+Получить статистику
+```
+python stcmd.py stats
+```
+
+Выгрузить все данные из gasu.gov.ru
+```
+python stcmd.py download
+```
+
+Обработать данные по документам стратпланирования - загружает все JSON'ы в MongoDB
+```
+python stcmd.py process
+```
+
+Построить индексы по таблицам и создать таблицу документов для последующей выгрузки документов
+```
+python stcmd.py reindex
+```
+Выкачать все файлы с gasu.gov.ru локально
+```
+python stcmd.py download_docs
+```
+
+# Условия
+
+Выгрузка файлов работает с помощью утилиты aria2c с с командой --enable-rpc, это позволяет существенно ускорить их выгрузку. Утилита ставится с помощью
+apt install aria2 и аналогичных команд развёртывания пакетов в других ОС
+```
+aria2c --enable-rpc
+```
+
+
+## Доступ к данным
+Доступ через сервер MongoDB по порту 27017.
+База **minec** коллекции:
+* **splan** - база с описаниями реестра стратпланирования
+* **splandocs** - база документов включая статус их обработки
+* **splanfiles** - выкачанные файлы
+* **splanstats** - статистика
+
+
+
diff --git a/stcmd.py b/stcmd.py
new file mode 100644
index 0000000000000000000000000000000000000000..104f561f05e01a91430f7502bfc32970a191e81e
--- /dev/null
+++ b/stcmd.py
@@ -0,0 +1,372 @@
+#!/usr/bin/env python
+
+import click
+import urllib
+import json
+import os
+import hashlib
+import zlib
+import time
+import csv
+import logging
+import datetime
+import io
+import sys
+import requests
+
+from urllib.request import urlopen, urlretrieve
+from pprint import pprint
+from pymongo import MongoClient, DESCENDING, ASCENDING
+
+logging.getLogger().addHandler(logging.StreamHandler())
+
+PAGE_SIZE = 10
+
+PINGURL = 'http://gasu.gov.ru/gwt/stratplanning/rest/stratplanning/registry/list'
+URLPAT = 'http://gasu.gov.ru/gwt/stratplanning/rest/stratplanning/registry/list'
+FILE_URL_PAT = 'http://gasu.gov.ru/GASUServicesSpring/rest/document/downloadDoc/stratplanning/%s/%s'
+SINGLEITEM_PAT = 'http://gasu.gov.ru/gwt/stratdocuments/rest/stratplanning/registry/getDocumentByRequestId/%s'
+POSTTEMPLATE = '{"sortInfo":[{"sortField":"id", "sortDir":"DESC"}], "limit":%d, "offset":%d, "disjunction":false, "filters":[]}'
+
+DATAPATH = '/mnt/c/data/stratplan'
+#DATAPATH = 'C:\\data\\stratplan'
+CACHEPATH = os.path.join(DATAPATH, 'cache')
+DOCUMENTSPATH = os.path.join(DATAPATH, 'documents')
+FILESPATH = os.path.join(DATAPATH, 'files')
+
+DBHOST = 'localhost'
+DBPORT = 27017
+
+
+def calc_hash_crc(filename):
+ """Calculate hash and crc32 of selected file"""
+ data = open(filename, 'rb').read()
+ fhash = hashlib.sha256(data).hexdigest()
+ fcrc = zlib.crc32(data)
+ return {'sha256': fhash, 'crc32' : fcrc}
+
+class DataManager:
+ def __init__(self):
+ self.conn = MongoClient(DBHOST, DBPORT)
+ self.db = self.conn['minec']
+ self.rs_coll = self.db['splan']
+ self.rsdoc_coll = self.db['splandocs']
+ self.rsfiles_coll = self.db['splanfiles']
+ self.rsstats_coll = self.db['splanstats']
+# self.rsindex_coll = self.db['rsindex']
+
+ def ping(self):
+ """Pings server"""
+ postdata = POSTTEMPLATE % (PAGE_SIZE, 0)
+ r = requests.post(URLPAT, postdata)
+ data = r.text
+ js = json.loads(data)
+ total = js['totalLength']
+ return True, total
+
+
+ def download_all(self, force=False):
+ """Download and cache"""
+ isalive, cnt = self.ping()
+ total_pages = int((cnt / PAGE_SIZE) + 2)
+ print('Total count %d, total pages %d' % (cnt, total_pages))
+ pages = range(1, total_pages, 1)
+ for n in pages:
+ print('Page %d of %d' % (n, total_pages))
+ fname = os.path.join(CACHEPATH, 'page_%d.json' % n)
+ if not force:
+ if os.path.exists(fname):
+ print('Already cached page. Skip')
+ continue
+ postdata = POSTTEMPLATE % (PAGE_SIZE, (n-1)*PAGE_SIZE)
+ r = requests.post(URLPAT, postdata)
+ data = r.text
+ f = open(fname, 'w')
+ f.write(data)
+ f.close()
+
+ def update(self):
+ """Update local cache"""
+ pass
+
+ def reload_requests(self):
+ """Reload requests"""
+
+
+ def full_reload(self, reset=True):
+ """Full reload of data"""
+ if reset:
+ self.rs_coll.drop()
+ self.rs_coll = self.db['splan']
+ num = 1
+ files = os.listdir(CACHEPATH)
+ for fname in files:
+ f = open(os.path.join(CACHEPATH, fname), 'r')
+ print ('page', fname)
+ js = json.load(f)
+ try:
+ if 'data' not in js.keys(): continue
+ except KeyboardInterrupt:
+ print ('Error processing page', fname)
+ continue
+ print ('length', len(js['data']))
+ for r in js['data']:
+ print(r['id'])
+ r['_sysdata'] = {'docsaved' : False, 'filessaved': False}
+ try:
+ self.rs_coll.save(r)
+ except:
+ print ('Error processing record', r['id'])
+ continue
+ print(num, fname)
+ num += 1
+
+ def build_indexes(self):
+ """Build indexes for all collections"""
+ logging.info('Building indexes for rs collection')
+ self.db['splan'].ensure_index([('id', DESCENDING)])
+ self.db['splan'].ensure_index([('regNumber', DESCENDING)])
+ self.db['splan'].ensure_index([('docname', DESCENDING)])
+ logging.info('Building indexes for splanreq collection')
+ self.db['splanreq'].ensure_index([('id', ASCENDING)])
+ self.db['splanreq'].ensure_index([('regNumber', DESCENDING)])
+ self.db['splanreq'].ensure_index([('docname', DESCENDING)])
+ logging.info('Building indexes for rsdoc collection')
+ self.db['splanfiles'].ensure_index([('id', ASCENDING)])
+ self.db['splanfiles'].ensure_index([('docid', ASCENDING)])
+ self.db['splanfiles'].ensure_index([('fileEntryId', ASCENDING)])
+ pass
+
+
+
+ def index_documents(self, full_reload=False):
+ total = self.rs_coll.find().count()
+ print(total)
+ objects = self.rs_coll.find({'_sysdata.docsaved' : False})
+ i = 0
+ n = 0
+ print(objects.count())
+ for o in objects:
+ i += 1
+ if i % 100 == 0:
+ print('Indexed documents %d, file %d' % (i, n))
+ logging.info('Indexed documents %d of %d' % (i, total))
+ alldocs = []
+ # pprint(jd)
+ files = o['request']['files']
+ for doc in files:
+ alldocs.append(doc)
+ for doc in alldocs:
+ n += 1
+ docobj = self.rsfiles_coll.find_one({'docid': doc['docid'], 'id' : doc['id']} )
+ if not docobj:
+ docobj = doc
+# docobj['docid'] = o['id']
+ docobj['cached'] = False
+ docobj['url'] = FILE_URL_PAT % (docobj['docid'], docobj['code'])
+# docobj['filesize'] = os.stat(filename)[6]
+# docobj.update(calc_hash_crc(filename))
+# pprint(docobj)
+ self.rsfiles_coll.save(docobj)
+ else:
+ print('Already stored %d' % (doc['docid']))
+
+ print(self.rsfiles_coll.count())
+
+
+ def prepare_download(self):
+ """Prepare download"""
+ f = open('files.txt', 'w')
+ documents = []
+ allobjects = self.rsfiles_coll.find({'cached' : False} )
+ for doc in allobjects:
+ documents.append(doc)
+ i = 0
+ for docobj in documents:
+ i += 1
+ if i % 100 == 0:
+ print('Processed %d of %d' % (i, total))
+ filename = os.path.join(FILESPATH, '%s\\%s' % (docobj['docid'], docobj['name']))
+ os.makedirs(os.path.join(FILESPATH, str(docobj['docid'])))
+ f.write(docobj['url'])
+
+
+
+ def download_docs(self, update=True, indexexisting=True):
+ """Download all documents"""
+ uniqnames = []
+ total = self.rsfiles_coll.find({'cached' : False}).count()
+ print(total)
+ documents = []
+ allobjects = self.rsfiles_coll.find({'cached' : False} )
+ for doc in allobjects:
+ documents.append(doc)
+ i = 0
+ for docobj in documents:
+ i += 1
+ if i % 100 == 0:
+ print('Processed %d of %d' % (i, total))
+ filename = 'files/%s/%s' % (docobj['docid'], docobj['name'])
+ if indexexisting and os.path.exists(filename):
+ print('- file already exists', docobj['name'].encode('utf8'))
+ else:
+ try:
+ os.makedirs(os.path.join(FILESPATH, str(docobj['docid'])))
+ except FileExistsError:
+ pass
+ try:
+ print(docobj['url'])
+ jsonreq = json.dumps({'jsonrpc':'2.0', 'id':'qwer',
+ 'method':'aria2.addUri',
+ 'params':[[docobj['url']], dict(out=filename)]})
+ print(jsonreq)
+ c = requests.post('http://localhost:6800/jsonrpc', jsonreq)
+# urlretrieve(docobj['url'], )
+ except KeyboardInterrupt:
+ logging.error("Keyboard interrupt occured")
+ break
+# except IOError:
+# logging.error('Error retrieving file %s' % (docobj['name'].encode('utf8')))
+# continue
+# pass
+ logging.info('File %s retrieved' % (docobj['name'].encode('utf8')))
+ print('File %s retrieved' % (docobj['name'].encode('utf8')))
+ docobj['cached'] = True
+ docobj['filename'] = filename
+# docobj['filesize'] = os.stat(filename)[6]
+# docobj.update(calc_hash_crc(filename))
+ self.rsfiles_coll.save(docobj)
+
+
+ def improve_data(self):
+ docs = self.rsdoc_coll.find()
+ formats = {}
+ for doc in docs:
+ format = doc['name'].rsplit('.', 1)[-1].lower()
+ if format not in formats:
+ formats[format] = 1
+ else:
+ formats[format] += 1
+ for format, amount in formats.items():
+ print('Ext:', format, amount)
+
+ def stats(self, extreport=False):
+# """Calc collections stats"""
+ import tabulate
+ keys = ['indicator', 'value']
+ table = []
+ filessize = self.rsdoc_coll.aggregate([{"$group": { "_id" : None, "sum" : { "$sum": "$filesize" } } }]);
+ rsstats = self.db.command("collstats", "splandocs")
+ docstats = self.db.command("collstats", "splanfiles")
+# print
+ print('Collecting stats')
+ table.append(['documents count', rsstats['count']])
+ table.append(['documents size', rsstats['size']/(10024*1024)])
+ table.append(['files count', docstats['count']])
+ table.append(['files size', docstats['size']/(10024*1024)])
+ table.append(['cached files count', self.rsfiles_coll.find({'cached' : True}).count()])
+ table.append(['cached files size', filessize['result'][0]['sum']/(1024*1024)])
+
+ print(tabulate.tabulate(table, headers=keys))
+
+ if extreport:
+ contentTypes = self.rsdoc_coll.aggregate([{"$group": { "_id" : "$contentType", "sum" : { "$sum": 1 } } }]);
+ table = []
+ keys = ['contentType', 'count']
+ for c in contentTypes['result']:
+ table.append([c['_id'], c['sum']])
+ print(tabulate.tabulate(table, headers=keys))
+
+
+@click.group()
+def cli1():
+ pass
+
+@cli1.command()
+def download():
+ """Download data"""
+ manager = DataManager()
+ manager.download_all()
+ pass
+
+
+@click.group()
+def cli2():
+ pass
+
+@cli2.command()
+def download_docs():
+ """Download all documents"""
+ manager = DataManager()
+ manager.download_docs()
+ pass
+
+@click.group()
+def cli3():
+ pass
+
+@cli3.command()
+def loaddata():
+ """Load cached JSON data into MongoDB"""
+ manager = DataManager()
+ manager.full_reload()
+ pass
+
+@click.group()
+def cli4():
+ pass
+
+@cli4.command()
+def stats():
+ """Return actual statistics"""
+ manager = DataManager()
+ manager.stats()
+ pass
+
+@click.group()
+def cli5():
+ pass
+
+@cli5.command()
+def ping():
+ """Ping source"""
+ manager = DataManager()
+ isalive, cnt = manager.ping()
+ if isalive:
+ print('Alive. Record count is', cnt)
+ else:
+ print('Not available')
+
+ pass
+
+@click.group()
+def cli6():
+ pass
+
+@cli6.command()
+def reindex():
+ """Create database indexes"""
+ manager = DataManager()
+ manager.build_indexes()
+ manager.index_documents()
+
+
+@click.group()
+def cli7():
+ pass
+
+@cli7.command()
+def improvedata():
+ """Improve data"""
+ manager = DataManager()
+ manager.improve_data()
+
+cli = click.CommandCollection(sources=[cli1, cli2, cli3, cli4, cli5, cli6, cli7])
+
+if __name__ == '__main__':
+ logging.basicConfig(filename='subcmd.log',level=logging.DEBUG)
+ cli()
+
+# download_docs()
+# process()
+# download()
diff --git a/test.py b/test.py
new file mode 100644
index 0000000000000000000000000000000000000000..c6b80db2b6c75778fa124ff70bec54f7e1ebfdb7
--- /dev/null
+++ b/test.py
@@ -0,0 +1,7 @@
+import requests, json
+jsonreq = json.dumps({'jsonrpc':'2.0', 'id':'qwer',
+ 'method':'aria2.addUri',
+ 'params':[['http://example.org'], dict(out='/mnt/c/data/stratplan/files/28252351/739 \u043e\u0442 18.12.2015 \u041e\u0434\u043e\u0431\u0440\u0435\u043d\u0438\u0435 \u043f\u0440\u043e\u0433\u043d\u043e\u0437\u0430.doc')]})
+c = requests.post('http://localhost:6800/jsonrpc', jsonreq)
+data = c.text
+print(data)
\ No newline at end of file