HIVE-27224: Enhance drop table/partition command #5851
base: master
Conversation
(resolved review thread on ...store/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java)
| } catch (InvalidOperationException e) {
|   // This means there are tables of something in the database
|   throw new InvalidOperationException("There are still objects in the default " +
|       "database for catalog " + catName);
nit: should we also include the stack trace?
IMO the message tells enough about the exception; we don't need to pollute the log file.
| public void drop_database_req(final DropDatabaseRequest req)
| public AsyncOperationResp drop_database_req(final DropDatabaseRequest req)
|     throws NoSuchObjectException, InvalidOperationException, MetaException {
|   startFunction("drop_database", ": " + req.getName());
If the operation is async, the recorded API time will not reflect how long the operation actually took to finish. Should we record it in the handler?
Nice catch, will try moving the timing into the handler.
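A minimal sketch (with illustrative names, not the patch's actual metric hooks) of recording the elapsed time inside the handler so that it covers the whole operation rather than just the API call:

```java
// Sketch only: measuring the real cost of an async drop inside the handler,
// so the recorded time covers the whole operation, not just the API call.
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public final class TimedOperation {
  /** Runs the operation body and reports how long it actually took. */
  public static <R> R runTimed(String opName, Supplier<R> body) {
    long start = System.nanoTime();
    try {
      return body.get();
    } finally {
      long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      // In the real handler this could feed a metric instead of a log line.
      System.out.println(opName + " finished in " + elapsedMs + " ms");
    }
  }
}
```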
| LOG.warn("Error from isWritable", ex);
| throw new MetaException("Table partition not deleted since " + dir.getParent()
|     + " access cannot be checked: " + ex.getMessage());
| return resp;
If the partitions returned in resp are empty, the client cannot tell whether the operation is finished or still running.
For add partitions it's not asynchronous; this method won't return until the handler finishes the job.
Got it.
| part_vals = getPartValsFromName(t, dropPartitionReq.getPartName());
| }
| partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part_vals));
Suggested change:
| part_vals = getPartValsFromName(t, dropPartitionReq.getPartName());
| }
| partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part_vals));
to:
| partNames.add(dropPartitionReq.getPartName());
| } else {
|   partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part_vals));
| }
(resolved review threads on ...lone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java)
| AbstractOperationHandler<DropDatabaseRequest, DropDatabaseHandler.DropDatabaseResult> dropDatabaseOp =
|     AbstractOperationHandler.offer(this, req);
| AbstractOperationHandler.OperationStatus status = dropDatabaseOp.getOperationStatus();
| if (status.isFinished() && dropDatabaseOp.getResult() != null && dropDatabaseOp.getResult().isSuccess()) {
dropDatabaseOp.getResult() != null is redundant and can be removed.
If the handler is canceled, dropDatabaseOp.getResult() might be null; will add a comment.
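A generic, self-contained illustration of why the null check stays, using a plain Future rather than the patch's classes: a canceled operation never produces a result.

```java
// Sketch (generic, not the patch's classes): a canceled future never yields a
// result, which is why a "result != null" guard is kept alongside isFinished().
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class CancelExample {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<String> future = pool.submit(() -> {
      Thread.sleep(10_000);
      return "done";
    });
    future.cancel(true);                      // simulate a canceled operation
    String result = null;
    if (future.isDone() && !future.isCancelled()) {
      result = future.get();                  // only safe when not canceled
    }
    System.out.println("result = " + result); // prints: result = null
    pool.shutdownNow();
  }
}
```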
| // a copy is required to allow incremental replication to work correctly.
| if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) {
|   for (ResourceUri uri : func.getResourceUris()) {
|     if (uri.getUri().toLowerCase().startsWith("hdfs:") && needsCm) {
This function can return early if needsCm is false.
nice catch, will do
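A minimal sketch of the suggested early return; the wrapping class and method name are illustrative, while the fields used (getResourceUris, getUri, needsCm) come from the snippet above:

```java
// Sketch of the early return: skip the per-URI loop entirely when change
// management (needsCm) is off.
import java.util.List;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.ResourceUri;

final class CmCopySketch {
  static void copyResourcesIfNeeded(Function func, boolean needsCm) {
    if (!needsCm) {
      return; // fast path: change management disabled, nothing to copy
    }
    List<ResourceUri> uris = func.getResourceUris();
    if (uris == null || uris.isEmpty()) {
      return;
    }
    for (ResourceUri uri : uris) {
      if (uri.getUri().toLowerCase().startsWith("hdfs:")) {
        // copy the hdfs resource so incremental replication keeps working
      }
    }
  }
}
```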
| private A result;
| private boolean async;
| private Future<A> future;
| private ExecutorService executor;
Is it better to use a shared thread pool for the operation handler? In the current implementation, the number of threads is not bounded, which could lead to resource exhaustion or even crashes.
The number of handler threads is limited by the maximum number of threads the thrift server can spawn, which is set by hive.metastore.server.max.threads.
In production, I don't think we would see that many drop database/table operations happening at nearly the same time; usually the database is the bottleneck before the Metastore hits the limit. If that is the case, we can tune down hive.metastore.server.max.threads.
In async mode, a thread may trigger multiple operation handlers, so hive.metastore.server.max.threads cannot limit the total number of threads here. If we configure a fixed-size pool for the async operations, it can help limit service traffic to some extent.
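A sketch of the shared, bounded pool suggested here; the pool size, class name, and thread names are assumptions:

```java
// Sketch: one shared, bounded pool for all async operation handlers, so the
// number of background threads cannot grow with the number of async requests.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncOpExecutor {
  private static final int MAX_ASYNC_OP_THREADS = 16; // would come from a config key

  private static final ThreadFactory FACTORY = new ThreadFactory() {
    private final AtomicInteger count = new AtomicInteger();
    @Override public Thread newThread(Runnable r) {
      Thread t = new Thread(r, "async-op-handler-" + count.incrementAndGet());
      t.setDaemon(true);
      return t;
    }
  };

  // All handlers submit here instead of each creating its own executor.
  static final ExecutorService POOL = Executors.newFixedThreadPool(MAX_ASYNC_OP_THREADS, FACTORY);

  private AsyncOpExecutor() {}
}
```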
Usually a new handler runs inside the same thread as its parent; for example, a DropDatabaseHandler involves multiple DropTableHandlers, and those DropTableHandlers run inside the same thread as the DropDatabaseHandler.
Oh, I mean that for an async request, the HMSHandler thread could create an operation handler whose executor starts a new thread. The HMSHandler thread then finishes the request immediately and could handle another request, where it may produce yet another new thread.
If such async requests are frequent, it may lead to an explosion in the number of threads.
Though the Metastore handles the async request in the background, the client doesn't; it pings the server for the status until the end:
Lines 1524 to 1529 in 79f63b6
| while (!resp.isFinished() && !Thread.currentThread().isInterrupted()) {
|   resp = client.drop_database_req(req);
|   if (resp.getMessage() != null) {
|     LOG.info(resp.getMessage());
|   }
| }
The client will know whether the request succeeded at the end as usual, and the Metastore needs a handler thread to answer the request.
Regarding "the HMSHandler thread finishes this request immediately":
Line 179 in 79f63b6
result = async ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
Now it will wait up to 5 seconds before answering the API for a long-running drop; if the request is satisfied within this timeout, we get the result directly.
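A sketch of that server-side pattern, with illustrative names: wait briefly on the future, and treat a timeout as "still running" so the client keeps polling:

```java
// Sketch: bounded wait on the operation's future; a timeout means "still running",
// so the client keeps polling until isFinished() becomes true.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWaitSketch {
  static <A> String waitBriefly(Future<A> future, long timeoutMs)
      throws ExecutionException, InterruptedException {
    try {
      A result = future.get(timeoutMs, TimeUnit.MILLISECONDS); // e.g. 5000 ms
      return "finished: " + result;
    } catch (TimeoutException e) {
      return "still running"; // the API answers, the handler keeps working
    }
  }
}
```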
There are customized clients based on ThriftHiveMetastore.Iface, which may not guarantee such behavior.
asyncDrop defaults to false; if a customized client explicitly sets it to true, that client should take care of this case itself.
| dropRequest.setEnvContext(context);
| // Drop the table but not its data
| dropRequest.setDeleteData(false);
| dropRequest.setDropPartitions(true);
Better to explicitly disable asyncDrop here:
dropRequest.setAsyncDrop(false);

| AbstractOperationHandler<DropPartitionsRequest, DropPartitionsHandler.DropPartitionsResult> dropPartsOp =
|     AbstractOperationHandler.offer(this, request);
Such calls can be changed as follows, including other related APIs:
Suggested change:
| AbstractOperationHandler<DropPartitionsRequest, DropPartitionsHandler.DropPartitionsResult> dropPartsOp =
|     AbstractOperationHandler.offer(this, request);
to:
| DropPartitionsHandler dropPartsOp = (DropPartitionsHandler) AbstractOperationHandler.offer(this, request);
A couple of test failures seem to be related to this patch.
| wh.addToChangeManagement(funcCmPath);
| }
| if (req.isDeleteData()) {
|   // Moving the data deletion out of the async handler.
I think we should move this into the operation handler, because if a thrift client only calls this API once in async mode, such cleanup code would never run.
In async mode, the client still needs to ping the server for the operation status until the end; the client needs to know whether the request failed or not.
The main reason is that the TUGIBasedProcessor/TUGIAssumingProcessor might close the shared FileSystem behind the scenes, causing a "java.io.IOException: Filesystem closed" for the handler running in the background.
We still need to address this "Filesystem closed" issue, as we don't know whether there are FileSystem operations in the Metastore listeners.
FileSystem.closeAllForUGI(clientUgi); in TUGIAssumingProcessor seems like a bug: if two requests with the same UGI handle the same path URI concurrently, they may also hit the "Filesystem closed" issue.
This is indeed a tricky problem; I'm not sure whether removing the cache only for inactive UGIs would solve it. And for this thread, there is still an issue if the client crashes between two pings before the operation handler finishes: the cleanup code will not take effect either.
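One possible mitigation for the "Filesystem closed" issue discussed above, offered as an assumption rather than something this patch does: give the background handler a non-cached FileSystem instance so that FileSystem.closeAllForUGI cannot close it underneath the handler.

```java
// Sketch (assumption, not from the patch): use a non-cached FileSystem for the
// background deletion so closing the UGI's cached FileSystems doesn't break it.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class BackgroundDeleteSketch {
  static void deleteInBackground(Path dir, Configuration conf) throws IOException {
    // newInstance() bypasses the FileSystem cache keyed by (scheme, authority, ugi),
    // so FileSystem.closeAllForUGI(clientUgi) in the thrift processor won't close it.
    try (FileSystem fs = FileSystem.newInstance(dir.toUri(), conf)) {
      fs.delete(dir, true);
    }
  }
}
```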
|   return execute();
| } finally {
|   afterExecute();
|   OPID_CLEANER.schedule(() -> OPID_TO_HANDLER.remove(id), 1, TimeUnit.HOURS);
Should we consider adding an OPID_TO_STATUS map? A thrift client should be able to get the result of an operation id even after the operation is finished, but now everything is removed once the operation finishes.
The result is stored in the future:
Line 179 in 79f63b6
| result = async ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
The status can be determined from the status of this future, which is cleared 1 hour later.
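A sketch of the status-and-cleanup pattern being described; OPID_TO_HANDLER and OPID_CLEANER mirror the quoted snippet, while the surrounding class and status() helper are illustrative:

```java
// Sketch: the operation's status is derived from its Future, and the id->handler
// mapping is only dropped by a scheduled cleaner some time after completion.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class OperationRegistrySketch {
  private static final ConcurrentMap<String, Future<?>> OPID_TO_HANDLER = new ConcurrentHashMap<>();
  private static final ScheduledExecutorService OPID_CLEANER = Executors.newSingleThreadScheduledExecutor();

  static void onOperationFinished(String id) {
    // Keep the entry around for a while so late polls can still read the outcome.
    OPID_CLEANER.schedule(() -> OPID_TO_HANDLER.remove(id), 1, TimeUnit.HOURS);
  }

  static String status(String id) {
    Future<?> future = OPID_TO_HANDLER.get(id);
    if (future == null) {
      return "unknown (already cleaned up)";
    }
    return future.isDone() ? "finished" : "running"; // cancellation also counts as done
  }
}
```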



What changes were proposed in this pull request?
Why are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?