import sys
import tqdm
import click
import json
import psycopg2 as pg
from psycopg2.extras import execute_values
dsn = 'postgresql://etl:etl@localhost/epbs'
TABLE_NAME = 'public.data'
QUERY = 'INSERT INTO {table}(id, body) VALUES %s'.format(table=TABLE_NAME)
def rd(inp, *args):
for row in inp:
body = json.loads(row, encoding='utf8')
yield [body.get(f) for f in args] + [row]
@click.command()
@click.option('-f', '--input-file', help='входной файл для загрузки (не может быть пустым)')
@click.option('-d', '--db', default=dsn, help='строка подключения, по умолчанию "%s"' % dsn)
@click.option('-k', '--keyfield', default='id', help='поле во входных данных, являющееся ключом')
def main(input_file, db, keyfield):
with pg.connect(db) as con:
with con.cursor() as cursor:
cursor.execute('TRUNCATE TABLE {table}'.format(table=TABLE_NAME))
con.commit()
with con.cursor() as cursor:
with open(input_file, encoding='utf8') as inp_file:
execute_values(cursor, QUERY, tqdm.tqdm(rd(inp_file, keyfield), desc='load data to database'))
if __name__ == "__main__":
main()