'''
загрузчик данных из API системы ЕПБС
'''
import asyncio
import aiohttp
import json
import tqdm
import click
import logging
import sys
# Module-level settings holder; nothing in the visible part of this file reads
# or updates it.
conf = {'tq': None}
# URL template of a registry data endpoint; ``{name}`` is filled in with the
# registry section name passed on the command line (e.g. "grants").
URL = 'http://budget.gov.ru/epbs/registry/{name}/data'
# Log records of level INFO and above go to a file named agreement_loader.log.
logging.basicConfig(level=logging.INFO, filename='agreement_loader.log')
# Logger shared by the functions of this module.
logger = logging.getLogger(name='agreement_loader')
def json_writer(fileobj, per_row=False):
    """Generator-based sink that streams pre-encoded JSON rows into ``fileobj``.

    Driving protocol: prime the generator (``send(None)`` or ``next()``), push
    each row with ``send(row_bytes)``, then finish with
    ``throw(StopIteration)``.  The thrown exception is caught inside the
    generator, the output is finalised, and the generator yields again, so the
    ``throw`` call returns normally in the caller.

    Each row is written exactly once.  In array mode the separator is written
    *before* every row except the first, so the result is valid JSON without a
    trailing comma; finishing with no rows produces an empty array.

    Args:
        fileobj: Binary file-like object; only its ``write`` method is used.
        per_row: If true, write each row followed by a newline and no
            enclosing array.  If false (default), write a JSON array with one
            space-indented row per line.

    Yields:
        None.  The generator only consumes values passed to ``send``.
    """
    if not per_row:
        fileobj.write(b'[\n')
    wrote_any = False   # has at least one row been written in array mode?
    finished = False    # guards against closing the array more than once
    while True:
        try:
            data = yield
        except StopIteration:
            # End-of-stream signal from the caller: close the array once.
            if not per_row and not finished:
                if wrote_any:
                    fileobj.write(b'\n')
                fileobj.write(b']')
            finished = True
            continue
        if per_row:
            fileobj.write(data)
            fileobj.write(b'\n')
            continue
        if wrote_any:
            # Terminate the previous row only now that another one follows.
            fileobj.write(b',\n')
        fileobj.write(b' ')
        fileobj.write(data)
        wrote_any = True
async def fetch_simple(url, session, tq, out):
    """Fetch one page of JSON from ``url`` and pass its rows on.

    When ``tq`` is falsy the decoded payload is returned as is and nothing is
    written.  Otherwise the progress bar is advanced by one step and every
    item of ``payload['data']`` is serialised to UTF-8 encoded JSON and either
    sent into ``out`` (when it is truthy) or written to standard output
    followed by CRLF; in that case the coroutine returns ``None``.

    Args:
        url: Full URL of the page to request.
        session: Object whose ``get(url)`` returns an async context manager
            yielding the response (an ``aiohttp.ClientSession`` when called
            from ``fetch_chunk``).
        tq: Progress bar exposing ``update``, or a falsy value.
        out: Primed generator that accepts rows through ``send``, or a falsy
            value to print rows to standard output.
    """
    async with session.get(url) as resp:
        if resp.status != 200:
            # Delegate error reporting to the response object.
            resp.raise_for_status()
        payload = await resp.json()
        logger.info('got data from url %s', url)

        # No progress bar: the caller only wants the raw page back.
        if not tq:
            return payload

        tq.update(1)
        for record in payload['data']:
            encoded = json.dumps(record, ensure_ascii=False).encode('utf8')
            if not out:
                stdout = sys.stdout.buffer
                stdout.write(encoded)
                stdout.write(b'\r\n')
                continue
            out.send(encoded)
async def fetch_chunk(base_url, page_size=1, counter=1, tq=None, out=None):
    """Request pages 1..``counter`` of ``base_url`` concurrently.

    One ``fetch_simple`` task is scheduled per page number, all sharing a
    single client session configured with a connector limit of 20, a total
    timeout of 600 and a dummy cookie jar.

    Args:
        base_url: Endpoint URL without a query string.
        page_size: Value for the ``pageSize`` query parameter.
        counter: Number of pages to request.
        tq: Progress bar forwarded to ``fetch_simple``.
        out: Row sink forwarded to ``fetch_simple``.

    Returns:
        List with the results of the ``fetch_simple`` calls, in page order.
    """
    session_options = {
        'connector': aiohttp.TCPConnector(limit=20),
        'timeout': aiohttp.ClientTimeout(total=600),
        'cookie_jar': aiohttp.DummyCookieJar(),
    }
    async with aiohttp.ClientSession(**session_options) as session:
        pending = []
        for page_num in range(1, counter + 1):
            page_url = f'{base_url}?pageSize={page_size}&pageNum={page_num}'
            job = fetch_simple(page_url, session, tq, out)
            pending.append(asyncio.create_task(job))
        return await asyncio.gather(*pending)
@click.command()
@click.option('-u', '--uri', default='grants', help='название раздела в API ЕПБС (например "grants")')
@click.option('-s', '--page-size', default=100, help='размер страницы при выкачивании данных (до 1000)')
@click.option('-c', '--page-count', type=int, help='количество страниц')
@click.option('-f', '--result-filename', default='result.json', help='имя выходного файла')
@click.option('-r', '--per-row', default=False, is_flag=True, help='построчный вывод в файл')
def main(uri, page_size, page_count, result_filename, per_row):
    # NOTE: documented with comments instead of a docstring on purpose --
    # click would display a docstring as the command's --help text.
    #
    # Command-line entry point.  Downloads all pages of the registry section
    # ``uri`` and either prints the rows (``per_row``) or stores them as a
    # JSON array in ``result_filename``.
    #
    #   uri             -- registry section name substituted into URL
    #   page_size       -- rows requested per page
    #   page_count      -- pages to download; when falsy it is derived from
    #                      the record count reported by the API
    #   result_filename -- output file used when ``per_row`` is off
    #   per_row         -- print rows to standard output instead of a file
    url = URL.format(name=uri)
    if page_count:
        counter = page_count
    else:
        # Probe with the defaults (one row, first page, no progress bar) so
        # that the raw payload comes back and the record count can be read.
        page = asyncio.run(fetch_chunk(url))[0]
        del page['data']
        logger.info('download simple page. Got data: %s', page)
        record_count = int(page['recordCount'])
        # Ceiling division: an exact multiple of page_size must not trigger an
        # extra, empty page request.  At least one page is always requested.
        counter = max(1, -(-record_count // page_size))
    tq = tqdm.tqdm(total=counter, desc='json load')
    try:
        if per_row:
            asyncio.run(fetch_chunk(url, page_size, counter, tq=tq, out=None))
        else:
            with open(result_filename, 'wb') as f:
                writer = json_writer(f)
                # Prime the generator so it is ready to receive rows.
                writer.send(None)
                asyncio.run(fetch_chunk(url, page_size, counter, tq=tq, out=writer))
                # Signal end of stream so the writer finalises the output.
                writer.throw(StopIteration)
    finally:
        # Release the progress bar even when the download fails.
        tq.close()
    # NOTE(review): runs one more empty event-loop cycle before exiting; the
    # reason is not evident from this file, kept as in the original.
    asyncio.run(asyncio.sleep(0))
# Run the click command only when this file is executed as a script.
if __name__ == '__main__':
    main()