diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 94308e0907510..75b7645b039ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -181,10 +181,7 @@ protected void removeQueryFuture(long reqId) { if (req.cancel()) { cancelIds.add(new CancelMessageId(req.id(), sndId)); - if (req.fields()) - removeFieldsQueryResult(sndId, req.id()); - else - removeQueryResult(sndId, req.id()); + removeQueryResult(sndId, req.id()); } else { if (!cancelIds.contains(new CancelMessageId(req.id(), sndId))) { @@ -207,10 +204,7 @@ protected void removeQueryFuture(long reqId) { if (info == null) return; - if (req.fields()) - runFieldsQuery(info); - else - runQuery(info); + runQuery(info); } catch (Throwable e) { U.error(log(), "Failed to run query.", e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java index 5dd75d06fffa4..759aeb13ea39e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java @@ -98,10 +98,7 @@ private class LocalQueryRunnable implements GridPlainRunnable { try { qry.query().validate(); - if (fields()) - cctx.queries().runFieldsQuery(localQueryInfo()); - else - cctx.queries().runQuery(localQueryInfo()); + cctx.queries().runQuery(localQueryInfo()); } catch (Throwable e) { onDone(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 09d51ef644e83..8fe2c925bb10b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.sql.SQLException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -31,7 +30,6 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Queue; @@ -95,7 +93,6 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.GridSpiCloseableIteratorWrapper; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -204,10 +201,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** */ private final ConcurrentMap>> fieldsQryRes = new ConcurrentHashMap<>(); - - /** */ - private volatile ConcurrentMap> qryResCache = new ConcurrentHashMap<>(); - + /** */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); @@ -371,14 +365,6 @@ private boolean isIndexingSpiEnabled() { return cctx.kernalContext().indexing().enabled(); } - /** - * - */ - private void invalidateResultCache() { - if (!qryResCache.isEmpty()) - qryResCache = new ConcurrentHashMap<>(); - } - /** * @param newRow New row. * @param prevRow Previous row. @@ -408,8 +394,6 @@ public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow, qryProc.store(cctx, newRow, prevRow, prevRowAvailable); } finally { - invalidateResultCache(); - leaveBusy(); } } @@ -439,8 +423,6 @@ public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) qryProc.remove(cctx, prevRow); } finally { - invalidateResultCache(); - leaveBusy(); } } @@ -638,65 +620,6 @@ private QueryResult executeQuery(CacheQuery qry, return res; } - /** - * Performs fields query. - * - * @param qry Query. - * @param args Arguments. - * @param loc Local query or not. - * @param taskName Task name. - * @param rcpt ID of the recipient. - * @return Collection of found keys. - * @throws IgniteCheckedException In case of error. - */ - private FieldsResult executeFieldsQuery(CacheQuery qry, @Nullable Object[] args, - boolean loc, @Nullable String taskName, Object rcpt) throws IgniteCheckedException { - assert qry != null; - assert qry.type() == SQL_FIELDS : "Unexpected query type: " + qry.type(); - - if (qry.clause() == null) { - assert !loc; - - throw new IgniteCheckedException("Received next page request after iterator was removed. " + - "Consider increasing maximum number of stored iterators (see " + - "CacheConfiguration.getMaxQueryIteratorsCount() configuration property)."); - } - - if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - cctx.gridEvents().record(new CacheQueryExecutedEvent<>( - cctx.localNode(), - "SQL fields query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL_FIELDS.name(), - cctx.name(), - null, - qry.clause(), - null, - null, - args, - securitySubjectId(cctx), - taskName)); - } - - // Attempt to get result from cache. - T2> resKey = new T2<>(qry.clause(), F.asList(args)); - - FieldsResult res = (FieldsResult)qryResCache.get(resKey); - - if (res != null && res.addRecipient(rcpt)) - return res; // Cached result found. - - res = new FieldsResult(rcpt); - - if (qryResCache.putIfAbsent(resKey, res) != null) - resKey = null; // Failed to cache result. - - if (resKey != null) - qryResCache.remove(resKey, res); - - return res; - } - /** * @param qry Query. * @return Cache set items iterator. @@ -844,185 +767,6 @@ static R injectResources(@Nullable R o, GridCacheContext cctx) throws return o; } - /** - * Processes fields query request. - * - * @param qryInfo Query info. - */ - protected void runFieldsQuery(final GridCacheQueryInfo qryInfo) { - assert qryInfo != null; - - if (!enterBusy()) { - if (cctx.localNodeId().equals(qryInfo.senderId())) - throw new IllegalStateException("Failed to process query request (grid is stopping)."); - - return; // Ignore remote requests when when node is stopping. - } - - try { - if (log.isDebugEnabled()) - log.debug("Running query: " + qryInfo); - - boolean rmvRes = true; - - FieldsResult res = null; - - final boolean statsEnabled = cctx.statisticsEnabled(); - - final boolean readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - - try { - // Preparing query closures. - final IgniteReducer rdc = injectResources((IgniteReducer)qryInfo.reducer(), cctx); - - CacheQuery qry = qryInfo.query(); - - int pageSize = qry.pageSize(); - - Collection data = null; - Collection entities = null; - - boolean isWriteData = qryInfo.local() || rdc != null || cctx.isLocalNode(qryInfo.senderId()); - - if (isWriteData) - data = new ArrayList<>(pageSize); - else - entities = new ArrayList<>(pageSize); - - String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); - - res = qryInfo.local() ? - executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), taskName, - recipient(qryInfo.senderId(), qryInfo.requestId())) : - fieldsQueryResult(qryInfo, taskName); - - GridCloseableIterator it = new GridSpiCloseableIteratorWrapper( - res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId()))); - - if (log.isDebugEnabled()) - log.debug("Received fields iterator [iterHasNext=" + it.hasNext() + ']'); - - if (!it.hasNext()) { - if (rdc != null) - data = Collections.singletonList(rdc.reduce()); - - onFieldsPageReady(qryInfo.local(), qryInfo, entities, data, true, null); - - return; - } - - int cnt = 0; - - while (!Thread.currentThread().isInterrupted() && it.hasNext()) { - long start = statsEnabled ? System.nanoTime() : 0L; - - Object row = it.next(); - - // Query is cancelled. - if (row == null) { - onPageReady(qryInfo.local(), qryInfo, null, null, true, null); - - break; - } - - if (statsEnabled) { - CacheMetricsImpl metrics = cctx.cache().metrics0(); - - metrics.onRead(true); - - metrics.addGetTimeNanos(System.nanoTime() - start); - } - - if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) { - cctx.gridEvents().record(new CacheQueryReadEvent( - cctx.localNode(), - "SQL fields query result set row read.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.SQL_FIELDS.name(), - cctx.name(), - null, - qry.clause(), - null, - null, - qryInfo.arguments(), - securitySubjectId(cctx), - taskName, - null, - null, - null, - row)); - } - - if (isWriteData) { - // Reduce. - if (rdc != null) { - if (!rdc.collect(row)) - break; - } - else - data.add(row); - } - else - entities.add(row); - - if (rdc == null && ((!qryInfo.allPages() && ++cnt == pageSize) || !it.hasNext())) { - onFieldsPageReady(qryInfo.local(), qryInfo, entities, data, !it.hasNext(), null); - - if (it.hasNext()) - rmvRes = false; - - if (!qryInfo.allPages()) - return; - } - } - - if (rdc != null) - onFieldsPageReady(qryInfo.local(), qryInfo, null, Collections.singletonList(rdc.reduce()), true, null); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled() || !e.hasCause(SQLException.class)) - U.error(log, "Failed to run fields query [qry=" + qryInfo + ", node=" + cctx.nodeId() + ']', e); - else { - if (e.hasCause(SQLException.class)) - U.error(log, "Failed to run fields query [node=" + cctx.nodeId() + - ", msg=" + e.getCause(SQLException.class).getMessage() + ']'); - else - U.error(log, "Failed to run fields query [node=" + cctx.nodeId() + - ", msg=" + e.getMessage() + ']'); - } - - onFieldsPageReady(qryInfo.local(), qryInfo, null, null, true, e); - } - catch (Throwable e) { - U.error(log, "Failed to run fields query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e); - - onFieldsPageReady(qryInfo.local(), qryInfo, null, null, true, e); - - if (e instanceof Error) - throw (Error)e; - } - finally { - if (qryInfo.local()) { - // Don't we need to always remove local iterators? - if (rmvRes && res != null) { - try { - res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" + - cctx.nodeId() + "]", e); - } - } - } - else if (rmvRes) - removeFieldsQueryResult(qryInfo.senderId(), qryInfo.requestId()); - } - } - finally { - leaveBusy(); - } - } - /** * Processes cache query request. * @@ -1544,121 +1288,6 @@ private static Object recipient(UUID sndId, long reqId) { return new IgniteBiTuple<>(sndId, reqId); } - /** - * @param qryInfo Info. - * @return Iterator. - * @throws IgniteCheckedException In case of error. - */ - private FieldsResult fieldsQueryResult(GridCacheQueryInfo qryInfo, String taskName) - throws IgniteCheckedException { - final UUID sndId = qryInfo.senderId(); - - assert sndId != null; - - Map> iters = fieldsQryRes.get(sndId); - - if (iters == null) { - iters = new LinkedHashMap>(16, 0.75f, true) { - @Override protected boolean removeEldestEntry(Map.Entry> e) { - boolean rmv = size() > maxIterCnt; - - if (rmv) { - try { - e.getValue().get().closeIfNotShared(recipient(sndId, e.getKey())); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to close fields query iterator.", ex); - } - } - - return rmv; - } - - @Override public boolean equals(Object o) { - return o == this; - } - }; - - Map> old = fieldsQryRes.putIfAbsent(sndId, iters); - - if (old != null) - iters = old; - } - - return fieldsQueryResult(iters, qryInfo, taskName); - } - - /** - * @param resMap Results map. - * @param qryInfo Info. - * @return Fields query result. - * @throws IgniteCheckedException In case of error. - */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - private FieldsResult fieldsQueryResult(Map> resMap, - GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { - assert resMap != null; - assert qryInfo != null; - - GridFutureAdapter fut; - - boolean exec = false; - - synchronized (resMap) { - fut = resMap.get(qryInfo.requestId()); - - if (fut == null) { - resMap.put(qryInfo.requestId(), fut = new GridFutureAdapter<>()); - - exec = true; - } - } - - if (exec) { - try { - fut.onDone(executeFieldsQuery(qryInfo.query(), qryInfo.arguments(), false, - taskName, recipient(qryInfo.senderId(), qryInfo.requestId()))); - } - catch (IgniteCheckedException e) { - fut.onDone(e); - } - } - - return fut.get(); - } - - /** - * @param sndId Sender node ID. - * @param reqId Request ID. - */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - protected void removeFieldsQueryResult(@Nullable UUID sndId, long reqId) { - if (sndId == null) - return; - - Map> futs = fieldsQryRes.get(sndId); - - if (futs != null) { - IgniteInternalFuture fut; - - synchronized (futs) { - fut = futs.remove(reqId); - } - - if (fut != null) { - assert fut.isDone(); - - try { - fut.get().closeIfNotShared(recipient(sndId, reqId)); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close iterator.", e); - } - } - } - } - /** * Called when data for page is ready. *