|
6 | 6 | import os |
7 | 7 | import sys |
8 | 8 | import time |
9 | | -from senzing import G2BadInputException, G2Engine, G2Exception, G2RetryableException, G2UnrecoverableException |
| 9 | +from senzing import ( |
| 10 | + G2BadInputException, |
| 11 | + G2Engine, |
| 12 | + G2Exception, |
| 13 | + G2RetryableException, |
| 14 | + G2UnrecoverableException, |
| 15 | +) |
10 | 16 |
|
# Engine configuration JSON taken from the environment; None when the
# SENZING_ENGINE_CONFIGURATION_JSON variable is not set.
engine_config_json = os.environ.get("SENZING_ENGINE_CONFIGURATION_JSON")
12 | 18 |
|
13 | 19 |
|
def mock_logger(level, exception, error_rec=None):
    """Write a log-style message for *exception* to standard error.

    The message is preceded by a blank line and has the form
    ``<level>: <exception>``. When *error_rec* is truthy it is printed on
    its own line directly after the message.
    """
    lines = [f"\n{level}: {exception}"]
    if error_rec:
        lines.append(f"{error_rec}")
    for line in lines:
        print(line, file=sys.stderr)
18 | 24 |
|
19 | 25 |
|
def del_record(engine, rec_to_del):
    """Delete one record and return the engine's with-info response.

    *rec_to_del* is a JSON document; its ``DATA_SOURCE`` and ``RECORD_ID``
    values (``None`` when a key is absent) are handed to
    ``engine.deleteRecordWithInfo`` together with a fresh ``bytearray``.
    The contents of that buffer after the call are decoded and returned
    as a string.

    Raises:
        json.JSONDecodeError: if *rec_to_del* is not valid JSON.
    """
    parsed = json.loads(rec_to_del)
    response = bytearray()
    engine.deleteRecordWithInfo(
        parsed.get("DATA_SOURCE"),
        parsed.get("RECORD_ID"),
        response,
    )
    return response.decode()
27 | 33 |
|
28 | 34 |
|
def engine_stats(engine):
    """Print the engine statistics to standard output.

    ``engine.stats`` is called with an empty ``bytearray``; the decoded
    buffer is printed with a blank line before and after it.

    A ``G2RetryableException`` is logged as WARN and swallowed. Any other
    ``G2Exception`` is logged as CRITICAL and re-raised.
    """
    stats_buffer = bytearray()
    try:
        engine.stats(stats_buffer)
    except G2RetryableException as err:
        mock_logger("WARN", err)
    except G2Exception as err:
        mock_logger("CRITICAL", err)
        raise
    else:
        # Only reached when the engine call succeeded.
        print(f"\n{stats_buffer.decode()}\n")
39 | 45 |
|
40 | 46 |
|
def record_stats(success, error, prev_time):
    """Print a progress line and return the timestamp for the next interval.

    The reported rate is ``1000 / elapsed``, i.e. it assumes the function
    is called once per 1000 successfully processed records.

    Args:
        success: Number of records deleted successfully so far.
        error: Number of records that failed so far.
        prev_time: ``time.time()`` value taken at the start of the interval
            being reported.

    Returns:
        The ``time.time()`` value read by this call, to be passed back in
        as *prev_time* next time.
    """
    now = time.time()
    elapsed = now - prev_time
    # time.time() is a wall clock: it can have coarse resolution and can be
    # stepped backwards, so the interval may be zero or negative. Report a
    # rate of 0 in that case instead of raising ZeroDivisionError or
    # printing a negative rate.
    rate = int(1000 / elapsed) if elapsed > 0 else 0
    print(
        f"Processed {success:,} deletes,"
        f" {rate:,} records per second,"
        f" {error} errors"
    )
    return now
44 | 54 |
|
45 | 55 |
|
def futures_del(engine, input_file, output_file):
    """Delete every record listed in *input_file* using a thread pool.

    Each line of *input_file* is submitted to ``del_record``. The response
    of every successful delete is written to *output_file*, one per line.
    Progress is printed every 1000 successful deletes and engine statistics
    every 10000. A summary is printed once the input is exhausted.

    Args:
        engine: Engine object passed through to ``del_record`` and
            ``engine_stats``.
        input_file: Path of the file holding one JSON record per line.
        output_file: Path of the file the with-info responses are written
            to; it is truncated if it already exists.

    Raises:
        G2Exception: re-raised, after being logged as CRITICAL, when a
            delete fails with an engine error that is neither bad input
            nor retryable.
    """
    prev_time = time.time()
    success_recs = error_recs = 0

    with open(output_file, "w") as out_file, open(input_file, "r") as in_file:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Prime the pool with one record per worker. ThreadPoolExecutor
            # has no public accessor for its worker count, hence the
            # private attribute.
            futures = {
                executor.submit(del_record, engine, record): record
                for record in itertools.islice(in_file, executor._max_workers)
            }

            while futures:
                done, _ = concurrent.futures.wait(
                    futures, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for f in done:
                    try:
                        result = f.result()
                    except (G2BadInputException, json.JSONDecodeError) as err:
                        mock_logger("ERROR", err, futures[f])
                        error_recs += 1
                    except G2RetryableException as err:
                        mock_logger("WARN", err, futures[f])
                        error_recs += 1
                    except (G2UnrecoverableException, G2Exception) as err:
                        mock_logger("CRITICAL", err, futures[f])
                        raise
                    else:
                        out_file.write(f"{result}\n")

                        success_recs += 1
                        if success_recs % 1000 == 0:
                            prev_time = record_stats(
                                success_recs, error_recs, prev_time
                            )

                        if success_recs % 10000 == 0:
                            engine_stats(engine)
                    finally:
                        del futures[f]

                    # Replace every completed future, failed or not, with
                    # the next record. Refilling only after a success would
                    # shrink the in-flight set on each error and could end
                    # the loop with part of the input file unprocessed.
                    record = in_file.readline()
                    if record:
                        futures[executor.submit(del_record, engine, record)] = (
                            record
                        )

            print(
                f"Successfully deleted {success_recs:,} records, with"
                f" {error_recs:,} errors"
            )
            print(f"With info responses written to {output_file}")
89 | 109 |
|
90 | 110 |
|
# Script body: create and initialise the engine, run the threaded delete
# over the sample data file, then release the engine. Any G2Exception
# raised along the way is reported through mock_logger as CRITICAL.
try:
    g2_engine = G2Engine()
    g2_engine.init("G2Engine", engine_config_json, False)
    futures_del(
        engine=g2_engine,
        input_file="../../../Resources/Data/del-10K.json",
        output_file="../../../Resources/Output/Del_File_WithInfo.json",
    )
    g2_engine.destroy()
except G2Exception as err:
    mock_logger("CRITICAL", err)
0 commit comments