|
12 | 12 | */ |
13 | 13 | package io.kubernetes.client; |
14 | 14 |
|
| 15 | +import com.google.common.io.CharStreams; |
15 | 16 | import com.google.gson.reflect.TypeToken; |
16 | 17 | import io.kubernetes.client.models.V1Pod; |
17 | 18 | import io.kubernetes.client.models.V1Status; |
|
21 | 22 | import io.kubernetes.client.util.WebSockets; |
22 | 23 | import java.io.IOException; |
23 | 24 | import java.io.InputStream; |
| 25 | +import java.io.InputStreamReader; |
24 | 26 | import java.io.OutputStream; |
| 27 | +import java.io.Reader; |
25 | 28 | import java.lang.reflect.Type; |
26 | 29 | import java.util.HashMap; |
27 | 30 | import java.util.List; |
@@ -192,16 +195,12 @@ public Process exec( |
192 | 195 |
|
193 | 196 | static int parseExitCode(ApiClient client, InputStream inputStream) { |
194 | 197 | try { |
195 | | - int available = inputStream.available(); |
196 | | - |
197 | | - // Kubernetes returns no content when the exit code is 0 |
198 | | - if (available == 0) return 0; |
199 | | - |
200 | | - byte[] b = new byte[available]; |
201 | | - inputStream.read(b); |
202 | | - String body = new String(b, "UTF-8"); |
203 | | - |
204 | 198 | Type returnType = new TypeToken<V1Status>() {}.getType(); |
| 199 | + String body; |
| 200 | + try (final Reader reader = new InputStreamReader(inputStream)) { |
| 201 | + body = CharStreams.toString(reader); |
| 202 | + } |
| 203 | + |
205 | 204 | V1Status status = client.getJSON().deserialize(body, returnType); |
206 | 205 | if ("Success".equals(status.getStatus())) return 0; |
207 | 206 |
|
@@ -232,21 +231,34 @@ static int parseExitCode(ApiClient client, InputStream inputStream) { |
232 | 231 |
|
233 | 232 | private class ExecProcess extends Process { |
234 | 233 | private final WebSocketStreamHandler streamHandler; |
235 | | - private volatile int statusCode; |
| 234 | + private int statusCode = -1; |
| 235 | + private boolean isAlive = true; |
236 | 236 | private final Map<Integer, InputStream> input = new HashMap<>(); |
237 | 237 |
|
238 | 238 | public ExecProcess() throws IOException { |
239 | | - this.statusCode = -1; |
240 | 239 | this.streamHandler = |
241 | 240 | new WebSocketStreamHandler() { |
242 | 241 | @Override |
243 | | - public void close() { |
244 | | - if (statusCode == -1) { |
245 | | - int exitCode = parseExitCode(apiClient, ExecProcess.this.getInputStream(3)); |
| 242 | + protected void handleMessage(int stream, InputStream inStream) { |
| 243 | + if (stream == 3) { |
| 244 | + int exitCode = parseExitCode(apiClient, inStream); |
| 245 | + if (exitCode >= 0) { |
| 246 | + // notify of process completion |
| 247 | + synchronized (ExecProcess.this) { |
| 248 | + statusCode = exitCode; |
| 249 | + isAlive = false; |
| 250 | + ExecProcess.this.notifyAll(); |
| 251 | + } |
| 252 | + } |
| 253 | + } else super.handleMessage(stream, inStream); |
| 254 | + } |
246 | 255 |
|
247 | | - // notify of process completion |
248 | | - synchronized (ExecProcess.this) { |
249 | | - statusCode = exitCode; |
| 256 | + @Override |
| 257 | + public void close() { |
| 258 | + // notify of process completion |
| 259 | + synchronized (ExecProcess.this) { |
| 260 | + if (isAlive) { |
| 261 | + isAlive = false; |
250 | 262 | ExecProcess.this.notifyAll(); |
251 | 263 | } |
252 | 264 | } |
@@ -286,24 +298,29 @@ private synchronized InputStream getInputStream(int stream) { |
286 | 298 | public int waitFor() throws InterruptedException { |
287 | 299 | synchronized (this) { |
288 | 300 | this.wait(); |
| 301 | + return statusCode; |
289 | 302 | } |
290 | | - return statusCode; |
291 | 303 | } |
292 | 304 |
|
293 | 305 | @Override |
294 | 306 | public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException { |
295 | 307 | synchronized (this) { |
296 | 308 | this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit)); |
| 309 | + return !isAlive(); |
297 | 310 | } |
298 | | - return !isAlive(); |
299 | 311 | } |
300 | 312 |
|
301 | 313 | @Override |
302 | | - public int exitValue() { |
303 | | - if (statusCode == -1) throw new IllegalThreadStateException(); |
| 314 | + public synchronized int exitValue() { |
| 315 | + if (isAlive) throw new IllegalThreadStateException(); |
304 | 316 | return statusCode; |
305 | 317 | } |
306 | 318 |
|
| 319 | + @Override |
| 320 | + public synchronized boolean isAlive() { |
| 321 | + return isAlive; |
| 322 | + } |
| 323 | + |
307 | 324 | @Override |
308 | 325 | public void destroy() { |
309 | 326 | streamHandler.close(); |
|
0 commit comments