Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Jun 6, 2025. It is now read-only.
/GafferPublic archive

Gh-3322: Cache updates for federated POC#3323

Merged
wb36499 merged 10 commits intodevelopfromgh-3322-cache-updates-federated-poc
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line numberDiff line numberDiff line change
Expand Up@@ -128,6 +128,7 @@ private Collection<Operation> resolveNamedOperations(final Operation operation,
.getOperationChain(namedOperation.getParameters());
// Update the operation inputs and add operation chain to the updated list
OperationHandlerUtil.updateOperationInput(namedOperationChain, namedOperation.getInput());
namedOperationChain.setOptions(namedOperation.getOptions());

// Run again to resolve any nested operations in the chain before adding
namedOperationChain.updateOperations(resolveNamedOperations(namedOperationChain, user, depth + 1));
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -37,6 +37,7 @@
import uk.gov.gchq.gaffer.federated.simple.operation.GetAllGraphIds;
import uk.gov.gchq.gaffer.federated.simple.operation.GetAllGraphInfo;
import uk.gov.gchq.gaffer.federated.simple.operation.RemoveGraph;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.EitherOperationHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOperationHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.FederatedOutputHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.add.AddGraphHandler;
Expand All@@ -46,6 +47,13 @@
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.ChangeGraphIdHandler;
import uk.gov.gchq.gaffer.federated.simple.operation.handler.misc.RemoveGraphHandler;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.named.operation.AddNamedOperation;
import uk.gov.gchq.gaffer.named.operation.DeleteNamedOperation;
import uk.gov.gchq.gaffer.named.operation.GetAllNamedOperations;
import uk.gov.gchq.gaffer.named.operation.NamedOperation;
import uk.gov.gchq.gaffer.named.view.AddNamedView;
import uk.gov.gchq.gaffer.named.view.DeleteNamedView;
import uk.gov.gchq.gaffer.named.view.GetAllNamedViews;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
Expand All@@ -54,6 +62,7 @@
import uk.gov.gchq.gaffer.operation.impl.get.GetAdjacentIds;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetGraphCreatedTime;
import uk.gov.gchq.gaffer.serialisation.Serialiser;
import uk.gov.gchq.gaffer.serialisation.ToBytesSerialiser;
import uk.gov.gchq.gaffer.store.Context;
Expand All@@ -64,9 +73,18 @@
import uk.gov.gchq.gaffer.store.operation.DeleteAllData;
import uk.gov.gchq.gaffer.store.operation.GetSchema;
import uk.gov.gchq.gaffer.store.operation.GetTraits;
import uk.gov.gchq.gaffer.store.operation.handler.GetGraphCreatedTimeHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OperationChainHandler;
import uk.gov.gchq.gaffer.store.operation.OperationChainValidator;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OutputOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.AddNamedOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.AddNamedViewHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.DeleteNamedOperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.DeleteNamedViewHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.GetAllNamedOperationsHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.GetAllNamedViewsHandler;
import uk.gov.gchq.gaffer.store.operation.handler.named.NamedOperationHandler;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.store.schema.ViewValidator;

Expand All@@ -84,6 +102,7 @@
import static uk.gov.gchq.gaffer.accumulostore.utils.TableUtils.renameTable;
import static uk.gov.gchq.gaffer.cache.CacheServiceLoader.DEFAULT_SERVICE_NAME;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_DEFAULT_GRAPH_IDS;
import static uk.gov.gchq.gaffer.federated.simple.FederatedStoreProperties.PROP_GRAPH_CACHE_NAME;

/**
* The federated store implementation. Provides the set up and required
Expand DownExpand Up@@ -293,16 +312,6 @@ public Schema getSchema(final List<GraphSerialisable> graphs) {
}
}

/**
* Access to getting the operations that have handlers specific to this
* store.
*
* @return The Operation classes handled by this store.
*/
public Set<Class<? extends Operation>> getStoreSpecificOperations() {
return storeHandlers.keySet();
}

@Override
public void initialise(final String graphId, final Schema unused, final StoreProperties properties) throws StoreException {
if (unused != null) {
Expand All@@ -311,7 +320,7 @@ public void initialise(final String graphId, final Schema unused, final StorePro
super.initialise(graphId, new Schema(), properties);

// Init the cache for graphs
graphCache = new Cache<>("federatedGraphCache-" + graphId);
graphCache = new Cache<>(properties.get(PROP_GRAPH_CACHE_NAME, "federatedGraphCache_" + graphId));

// Get and set default graph IDs from properties
if (properties.containsKey(PROP_DEFAULT_GRAPH_IDS)) {
Expand DownExpand Up@@ -347,11 +356,34 @@ protected Object doUnhandledOperation(final Operation operation, final Context c
@Override
protected void addAdditionalOperationHandlers() {
storeHandlers.forEach(this::addOperationHandler);

final String namedOpCacheSuffix = getProperties().getCacheServiceNamedOperationSuffix(getGraphId());
final String namedViewCacheSuffix = getProperties().getCacheServiceNamedViewSuffix(getGraphId());
final Boolean nestedNamedOpsAllowed = getProperties().isNestedNamedOperationAllow();

// Add overrides as cache operations could be run locally or on sub graphs
if (getProperties().getNamedOperationEnabled()) {
addOperationHandler(NamedOperation.class, new EitherOperationHandler<>(new NamedOperationHandler()));
addOperationHandler(AddNamedOperation.class, new EitherOperationHandler<>(
new AddNamedOperationHandler(namedOpCacheSuffix, nestedNamedOpsAllowed)));
addOperationHandler(GetAllNamedOperations.class, new EitherOperationHandler<>(new GetAllNamedOperationsHandler(namedOpCacheSuffix)));
addOperationHandler(DeleteNamedOperation.class, new EitherOperationHandler<>(new DeleteNamedOperationHandler(namedOpCacheSuffix)));
}

// Named Views could be either
if (getProperties().getNamedViewEnabled()) {
addOperationHandler(AddNamedView.class, new EitherOperationHandler<>(new AddNamedViewHandler(namedViewCacheSuffix)));
addOperationHandler(GetAllNamedViews.class, new EitherOperationHandler<>(new GetAllNamedViewsHandler(namedViewCacheSuffix)));
addOperationHandler(DeleteNamedView.class, new EitherOperationHandler<>(new DeleteNamedViewHandler(namedViewCacheSuffix)));
}

// Misc operations that could be for sub graphs or not
addOperationHandler(GetGraphCreatedTime.class, new EitherOperationHandler<>(new GetGraphCreatedTimeHandler()));
}

@Override
protected OperationHandler<? extends OperationChain<?>> getOperationChainHandler() {
return newFederatedOperationHandler<>();
return newEitherOperationHandler<>(new OperationChainHandler<>(getOperationChainValidator(), getOperationChainOptimisers()));
}

@Override
Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -34,6 +34,11 @@ public class FederatedStoreProperties extends StoreProperties {
* Property key for setting if public graphs can be added to the store or not
*/
public static final String PROP_ALLOW_PUBLIC_GRAPHS = "gaffer.store.federated.allowPublicGraphs";
/**
* Property key for setting a custom name for the graph cache, by default
* this will be "federatedGraphCache_" followed by the federated graph ID.
*/
public static final String PROP_GRAPH_CACHE_NAME = "gaffer.store.federated.graphCache.name";
/**
* Property key for the class to use when merging number results
*/
Expand Down
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

/**
* Custom handler for operations that could in theory target sub graphs or the
* federated store directly.
*/
public class EitherOperationHandler<O extends Operation> implements OperationHandler<O> {
private static final Logger LOGGER = LoggerFactory.getLogger(EitherOperationHandler.class);

private final OperationHandler<O> standardHandler;

public EitherOperationHandler(final OperationHandler<O> standardHandler) {
this.standardHandler = standardHandler;
}

/**
* If graph IDs are in the options the operation will be handled by a
* {@link FederatedOperationHandler}, otherwise the original handler will be
* used e.g. executed on the federated store directly.
*/
@Override
public Object doOperation(final O operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Checking if Operation should be handled locally or on sub graphs: {}", operation);

// If we have graph IDs then run as a federated operation
if (operation.containsOption(FederatedOperationHandler.OPT_GRAPH_IDS) ||
operation.containsOption(FederatedOperationHandler.OPT_SHORT_GRAPH_IDS) ||
operation.containsOption(FederatedOperationHandler.OPT_EXCLUDE_GRAPH_IDS) ||
operation.containsOption(FederatedOperationHandler.OPT_USE_DFLT_GRAPH_IDS)) {
LOGGER.debug("Operation has specified graph IDs, it will be handled by sub graphs");
return new FederatedOperationHandler<>().doOperation(operation, context, store);
}

// No sub graphs involved just run the handler for this operations on the federated store
return standardHandler.doOperation(operation, context, store);
}
}
Original file line numberDiff line numberDiff line change
Expand Up@@ -25,19 +25,16 @@
import uk.gov.gchq.gaffer.federated.simple.access.GraphAccess;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationChainHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand DownExpand Up@@ -68,51 +65,41 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
public static final String OPT_EXCLUDE_GRAPH_IDS = "federated.excludeGraphIds";

/**
* The boolean operation option to specify if element merging should be applied or not.
* A boolean option to specify to use the default graph IDs. The option is
* not specifically required as default graph IDs will be used as a
* fallback, but if set the whole chain will be forwarded rather than each
* individual operation so can speed things up.
*/
public static final StringOPT_AGGREGATE_ELEMENTS = "federated.aggregateElements";
public static final StringOPT_USE_DFLT_GRAPH_IDS = "federated.useDefaultGraphIds";

/**
*A boolean option to specify ifto forward the whole operation chain to the sub graph or not.
*The booleanoperationoption to specify ifelement merging should be applied or not.
*/
public static final StringOPT_FORWARD_CHAIN = "federated.forwardChain";
public static final StringOPT_AGGREGATE_ELEMENTS = "federated.aggregateElements";

/**
* A boolean option to specify if a graph should be skipped if execution
* fails on it e.g. continue executing on the rest of the graphs
*/
public static final String OPT_SKIP_FAILED_EXECUTE = "federated.skipGraphOnFail";

/**
* A boolean option to specify if the results from each graph should be kept
* separate. If set this will return a map where each key value is the graph
* ID and its respective result.
*/
public static final String OPT_SEPARATE_RESULTS = "federated.separateResults";

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Running operation: {}", operation);
// Check inside operation chains in case there are operations that don't require running on sub graphs
if (operation instanceof OperationChain) {
Set<Class<? extends Operation>> storeSpecificOps = ((FederatedStore) store).getStoreSpecificOperations();
List<Class<? extends Operation>> chainOps = ((OperationChain<?>) operation).flatten().stream()
.map(Operation::getClass)
.collect(Collectors.toList());

// If all the operations in the chain can be handled by the store then execute them.
// Or if told not to forward the whole chain process each operation individually.
if (storeSpecificOps.containsAll(chainOps) ||
(!Boolean.parseBoolean(operation.getOption(OPT_FORWARD_CHAIN, "true")))) {
// Use default handler
return new OperationChainHandler<>(store.getOperationChainValidator(), store.getOperationChainOptimisers())
.doOperation((OperationChain<Object>) operation, context, store);
}

// Check if we have a mix as that is an issue
// It's better to keep federated and non federated separate so error and report back
if (!Collections.disjoint(storeSpecificOps, chainOps)) {
throw new OperationException(
"Chain contains standard Operations alongside federated store specific Operations."
+ " Please submit each type separately or set: '" + OPT_FORWARD_CHAIN + "' to: 'false'.");
}
}

// If the operation has output wrap and return using sub class handler
if (operation instanceof Output) {
// Should we keep the results separate
if (Boolean.parseBoolean(operation.getOption(OPT_SEPARATE_RESULTS, "false"))) {
return new SeparateOutputHandler<>().doOperation((Output) operation, context, store);
}
return new FederatedOutputHandler<>().doOperation((Output) operation, context, store);
}

Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -65,7 +65,7 @@ public O doOperation(final P operation, final Context context, final Store store
}

// Not expecting any output so exit since we've executed
if (operation.getOutputClass().isAssignableFrom(Void.class) || graphResults.isEmpty()) {
if (operation.getOutputClass() ==Void.class || graphResults.isEmpty()) {
return null;
}

Expand Down
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Handler for running federated operations but keeping the results separate
* under a key of the graph ID the results come from.
*/
public class SeparateOutputHandler<P extends Output<O>, O> extends FederatedOperationHandler<P> {
private static final Logger LOGGER = LoggerFactory.getLogger(SeparateOutputHandler.class);

@Override
public Map<String, O> doOperation(final P operation, final Context context, final Store store) throws OperationException {
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

if (graphsToExecute.isEmpty()) {
return new HashMap<>();
}

// Execute the operation chain on each graph
LOGGER.debug("Returning separated graph results");
Map<String, O> results = new HashMap<>();
for (final GraphSerialisable gs : graphsToExecute) {
try {
results.put(gs.getGraphId(), gs.getGraph().execute(operation, context.getUser()));
} catch (final OperationException | UnsupportedOperationException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
if (!Boolean.parseBoolean(operation.getOption(OPT_SKIP_FAILED_EXECUTE, "false"))) {
throw e;
}
}
}

return results;
}
}
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp