git / code.ach.gov.ru / gavrin / epbs_loader

commit 76767b3508711b1d4bbaaf832bc125e2b41ce57c

author gavrin <whintu@gmail.com>

date 2020-02-10 16:24:18 +0300

parents root commit

browse tree at this commit

message

init commit

files

fileadddel
.gitignore+49-0
epbs_data_loader.py+103-0
js_pg_loader.py+34-0

patch

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..2873e01c7aed4ae631d9df64d054607ff3f769de
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,49 @@
+# Temporary and binary files
+*~
+*.py[cod]
+*.so
+*.cfg
+!.isort.cfg
+!setup.cfg
+*.orig
+*.log
+*.pot
+__pycache__/*
+.cache/*
+.*.swp
+*/.ipynb_checkpoints/*
+
+# Project files
+.ropeproject
+.project
+.pydevproject
+.settings
+.idea
+tags
+
+# Package files
+*.egg
+*.eggs/
+.installed.cfg
+*.egg-info
+
+# Unittest and coverage
+htmlcov/*
+.coverage
+.tox
+junit.xml
+coverage.xml
+.pytest_cache/
+
+# Build and docs folder/files
+build/*
+dist/*
+sdist/*
+docs/api/*
+docs/_rst/*
+docs/_build/*
+cover/*
+MANIFEST
+
+# Per-project virtualenvs
+.venv*/
diff --git a/epbs_data_loader.py b/epbs_data_loader.py
new file mode 100644
index 0000000000000000000000000000000000000000..1ac1457c331de475bf8f0b171a6a9ae7ba27287a
--- /dev/null
+++ b/epbs_data_loader.py
@@ -0,0 +1,103 @@
+'''
+Loader for data from the API of the EPBS (ЕПБС) system.
+'''
+import asyncio
+import aiohttp
+import json
+import tqdm
+import click
+import logging
+import sys
+
+# NOTE(review): `conf` is not referenced anywhere in the visible code —
+# possibly a leftover; confirm before removing.
+conf = {'tq': None}
+# Endpoint template; `{name}` is filled in by main() via URL.format(name=uri).
+URL = 'http://budget.gov.ru/epbs/registry/{name}/data'
+
+# Logging is configured at import time: INFO and above goes to
+# 'agreement_loader.log' (a relative path, so it lands in the current directory).
+logging.basicConfig(level=logging.INFO, filename='agreement_loader.log')
+logger = logging.getLogger(name='agreement_loader')
+
+
+def json_writer(fileobj, per_row=False):
+    """Generator-based sink that writes already-encoded JSON rows to *fileobj*.
+
+    Protocol, as used by ``main``: prime the generator with ``send(None)``,
+    push every row with ``send(row)`` and finish the output with
+    ``throw(StopIteration)``.  After the finishing ``throw`` the generator
+    suspends again, so that call returns normally instead of raising.
+
+    Args:
+        fileobj: binary file-like object; only ``write`` is called on it.
+        per_row: when true, write one row per line with no enclosing array;
+            otherwise write a JSON array with one indented row per line.
+
+    Rows must be ``bytes`` (they are written next to ``bytes`` literals), each
+    holding one serialised JSON value.
+    """
+    if not per_row:
+        fileobj.write(b'[\n')
+
+    wrote_any = False
+    while True:
+        try:
+            data = yield
+        except StopIteration:
+            # End of stream.  Only the array form needs a terminator; no row
+            # data is touched here, so an empty stream produces a valid
+            # `[` + `]` pair and the last row is written exactly once.
+            if not per_row:
+                if wrote_any:
+                    fileobj.write(b'\n')
+                fileobj.write(b']')
+            continue
+
+        if per_row:
+            fileobj.write(data)
+            fileobj.write(b'\n')
+            continue
+
+        # The separator goes *before* every row but the first, which is what
+        # keeps the array free of a trailing comma.
+        if wrote_any:
+            fileobj.write(b',\n')
+        fileobj.write(b'  ')
+        fileobj.write(data)
+        wrote_any = True
+
+
+async def fetch_simple(url, session, tq, out):
+    """Download one page from *url* and either return it or forward its rows.
+
+    Args:
+        url: full page URL, query string included.
+        session: object whose ``get(url)`` is used as an async context manager
+            (``fetch_chunk`` passes an ``aiohttp.ClientSession``).
+        tq: progress bar, or ``None``.  ``None`` selects "probe" mode: the
+            decoded JSON document is returned untouched and nothing is written.
+        out: primed generator receiving each row via ``send``, or ``None`` to
+            write the rows to standard output instead.
+
+    Returns:
+        The decoded JSON document when ``tq`` is ``None``; otherwise ``None``.
+
+    Raises:
+        Whatever ``response.raise_for_status()`` raises for an error status.
+    """
+    async with session.get(url) as response:
+        # raise_for_status() does nothing for a successful status, so no
+        # explicit status comparison is needed around it.
+        response.raise_for_status()
+        data = await response.json()
+        logger.info('got data from url %s', url)
+
+        # Compare with None explicitly: the truth value of a progress bar
+        # depends on its total, not on whether one was supplied.
+        if tq is None:
+            return data
+        tq.update(1)
+
+        for row in data['data']:
+            encoded = json.dumps(row, ensure_ascii=False).encode('utf8')
+            if out is not None:
+                out.send(encoded)
+            else:
+                sys.stdout.buffer.write(encoded)
+                sys.stdout.buffer.write(b'\r\n')
+
+
+async def fetch_chunk(base_url, page_size=1, counter=1, tq=None, out=None):
+    """Fetch pages 1..*counter* of *base_url* concurrently through one session.
+
+    Every page becomes a ``fetch_simple`` task; ``tq`` and ``out`` are passed
+    through to it unchanged.  The session uses a connector created with
+    ``limit=20``, a total timeout of 600 and a dummy cookie jar.
+
+    Returns:
+        List of the ``fetch_simple`` results, in page order.
+    """
+    session_options = {
+        'connector': aiohttp.TCPConnector(limit=20),
+        'timeout': aiohttp.ClientTimeout(total=600),
+        'cookie_jar': aiohttp.DummyCookieJar(),
+    }
+    async with aiohttp.ClientSession(**session_options) as session:
+        pending = []
+        for page_num in range(1, counter + 1):
+            page_url = base_url + '?pageSize=%s&pageNum=%s' % (page_size, page_num)
+            pending.append(asyncio.create_task(fetch_simple(page_url, session, tq, out)))
+        return await asyncio.gather(*pending)
+
+
+@click.command()
+@click.option('-u', '--uri', default='grants', help='название раздела в API ЕПБС (например "grants")')
+@click.option('-s', '--page-size', default=100, help='размер страницы при выкачивании данных (до 1000)')
+@click.option('-c', '--page-count', type=int, help='количество страниц')
+@click.option('-f', '--result-filename', default='result.json', help='имя выходного файла')
+@click.option('-r', '--per-row', default=False, is_flag=True, help='построчный вывод в файл')
+def main(uri, page_size, page_count, result_filename, per_row):
+    # Command-line entry point: download a registry section page by page.
+    #
+    # Documented with comments rather than a docstring on purpose: click
+    # shows a command's docstring as its --help text, which would change the
+    # user-visible output.
+    #
+    #   uri             registry section name substituted into URL
+    #   page_size       number of records requested per page
+    #   page_count      pages to fetch; when empty it is derived from the API
+    #   result_filename output file for the JSON-array mode
+    #   per_row         emit one JSON object per line instead of an array
+    url = URL.format(name=uri)
+    if page_count:
+        counter = page_count
+    else:
+        # Probe request: fetch_chunk's defaults ask for a single one-record
+        # page and, without a progress bar, return the decoded document.
+        page = asyncio.run(fetch_chunk(url))[0]
+        del page['data']
+        logger.info('download simple page. Got data: %s', page)
+        # Ceiling division, so an exact multiple of page_size does not cost an
+        # extra, empty page.  At least one page is always requested.
+        total_records = int(page['recordCount'])
+        counter = max(1, (total_records + page_size - 1) // page_size)
+
+    tq = tqdm.tqdm(total=counter, desc='json load')
+    try:
+        if per_row:
+            # NOTE(review): with out=None the rows go to standard output and
+            # result_filename is not used, although the option help mentions
+            # a file — confirm the intended behaviour.
+            asyncio.run(fetch_chunk(url, page_size, counter, tq=tq, out=None))
+        else:
+            with open(result_filename, 'wb') as f:
+                writer = json_writer(f)
+                writer.send(None)  # prime the generator up to its first yield
+                asyncio.run(fetch_chunk(url, page_size, counter, tq=tq, out=writer))
+                writer.throw(StopIteration)  # tells the writer to finish the array
+    finally:
+        # Close the progress bar even when a download fails.
+        tq.close()
+
+    # NOTE(review): purpose of this empty event-loop run is not evident from
+    # the code; kept as in the original.
+    asyncio.run(asyncio.sleep(0))
+
+
+# Run the click command only when the file is executed as a script.
+if __name__ == '__main__':
+    main()
diff --git a/js_pg_loader.py b/js_pg_loader.py
new file mode 100644
index 0000000000000000000000000000000000000000..174971e77547e30e333f649e8d792e832ce6a9bc
--- /dev/null
+++ b/js_pg_loader.py
@@ -0,0 +1,34 @@
+import sys
+import tqdm
+import click
+import json
+import psycopg2 as pg
+from psycopg2.extras import execute_values
+
+# Default connection string; also the default of the --db option of main().
+# NOTE(review): user, password and host are hard-coded here — consider
+# reading them from the environment instead.
+dsn = 'postgresql://etl:etl@10.0.0.29/epbs'
+# Table that main() truncates and then refills.
+TABLE_NAME = 'public.data'
+# Statement handed to execute_values(); its single %s stands for the row list.
+QUERY = 'INSERT INTO {table}(id, body) VALUES %s'.format(table=TABLE_NAME)
+
+
+def rd(inp, *args):
+    """Turn JSON lines into rows for a bulk insert.
+
+    Args:
+        inp: iterable of text lines, each holding one JSON object.
+        *args: names of the fields to extract from every object.
+
+    Yields:
+        A list with the value of each requested field (``None`` when the
+        field is absent) followed by the original, unmodified line.
+
+    Raises:
+        json.JSONDecodeError: if a non-blank line is not valid JSON.
+    """
+    for row in inp:
+        # Blank lines (e.g. a trailing newline at the end of the file) carry
+        # no record and would otherwise fail to parse.
+        if not row.strip():
+            continue
+        # No `encoding` argument: json.loads() no longer accepts it on
+        # Python 3.9+, and the input is already text.
+        body = json.loads(row)
+        yield [body.get(f) for f in args] + [row]
+
+
+@click.command()
+@click.option('-f', '--input-file', required=True, help='входной файл для загрузки (не может быть пустым)')
+@click.option('-d', '--db', default=dsn, help='строка подключения, по умолчанию "%s"' % dsn)
+@click.option('-k', '--keyfield', default='id', help='поле во входных данных, являющееся ключом')
+def main(input_file, db, keyfield):
+    # Command-line entry point: replace the contents of TABLE_NAME with the
+    # records of a JSON-lines file.
+    #
+    # Documented with comments rather than a docstring on purpose: click
+    # shows a command's docstring as its --help text, which would change the
+    # user-visible output.
+    #
+    #   input_file  path of the file to load, one JSON object per line
+    #   db          connection string passed to psycopg2
+    #   keyfield    field of each object stored in the `id` column
+    #
+    # The input file is opened before anything is sent to the database, so a
+    # missing or unreadable file can no longer leave the table truncated.
+    with open(input_file, encoding='utf8') as inp_file:
+        con = pg.connect(db)
+        try:
+            # `with con` only ends the transaction (commit, or rollback on
+            # error); the connection itself is closed in the finally below.
+            with con:
+                with con.cursor() as cursor:
+                    cursor.execute('TRUNCATE TABLE {table}'.format(table=TABLE_NAME))
+                # The truncation is committed on its own, before the load.
+                con.commit()
+                with con.cursor() as cursor:
+                    rows = tqdm.tqdm(rd(inp_file, keyfield), desc='load data to database')
+                    execute_values(cursor, QUERY, rows)
+        finally:
+            con.close()
+
+
+# Run the click command only when the file is executed as a script.
+if __name__ == "__main__":
+    main()