44import os
55import sys
66import time
7+
78from senzing import (
89 G2BadInputException ,
910 G2Engine ,
@@ -71,10 +72,7 @@ def redo_count(engine):
7172
7273
7374def redo_pause (success ):
74- print (
75- "No redo records to process, pausing for 30 seconds. Total processed:"
76- f" { success :,} (CTRL-C to exit)..."
77- )
75+ print ("No redo records to process, pausing for 30 seconds. Total processed:" f" { success :,} (CTRL-C to exit)..." )
7876 time .sleep (30 )
7977
8078
@@ -94,9 +92,7 @@ def futures_redo(engine):
9492 break
9593
9694 while True :
97- done , _ = concurrent .futures .wait (
98- futures , return_when = concurrent .futures .FIRST_COMPLETED
99- )
95+ done , _ = concurrent .futures .wait (futures , return_when = concurrent .futures .FIRST_COMPLETED )
10096 for f in done :
10197 try :
10298 _ = f .result ()
@@ -110,24 +106,19 @@ def futures_redo(engine):
110106 mock_logger ("CRITICAL" , err , futures [f ])
111107 raise
112108 else :
113- record = get_redo_record (engine )
114- if record :
115- futures [
116- executor .submit (process_redo_record , engine , record )
117- ] = record
118- else :
119- redo_paused = True
120-
121109 success_recs += 1
122110 if success_recs % 100 == 0 :
123- print (
124- f"Processed { success_recs :,} redo records, with"
125- f" { error_recs :,} errors"
126- )
111+ print (f"Processed { success_recs :,} redo records, with" f" { error_recs :,} errors" )
127112
128113 if success_recs % 1000 == 0 :
129114 engine_stats (engine )
130115 finally :
116+ record = get_redo_record (engine )
117+ if record :
118+ futures [executor .submit (process_redo_record , engine , record )] = record
119+ else :
120+ redo_paused = True
121+
131122 del futures [f ]
132123
133124 if redo_paused :
@@ -137,9 +128,7 @@ def futures_redo(engine):
137128 while len (futures ) < executor ._max_workers :
138129 record = get_redo_record (engine )
139130 if record :
140- futures [
141- executor .submit (process_redo_record , engine , record )
142- ] = record
131+ futures [executor .submit (process_redo_record , engine , record )] = record
143132
144133
145134try :
0 commit comments