/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.forecast.transport;

import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.forecast.indices.ForecastIndex;
import org.opensearch.forecast.indices.ForecastIndexManagement;
import org.opensearch.forecast.ml.ForecastModelManager;
import org.opensearch.forecast.model.ForecastResult;
import org.opensearch.forecast.model.ForecastTask;
import org.opensearch.forecast.model.ForecastTaskType;
import org.opensearch.forecast.model.Forecaster;
import org.opensearch.forecast.settings.ForecastEnabledSetting;
import org.opensearch.forecast.settings.ForecastNumericSetting;
import org.opensearch.forecast.settings.ForecastSettings;
import org.opensearch.forecast.stats.ForecastStats;
import org.opensearch.forecast.task.ForecastTaskManager;
import org.opensearch.forecast.transport.EntityForecastResultAction;
import org.opensearch.forecast.transport.ForecastResultProcessor;
import org.opensearch.forecast.transport.ForecastResultRequest;
import org.opensearch.forecast.transport.ForecastResultResponse;
import org.opensearch.forecast.transport.ForecastRunOnceAction;
import org.opensearch.forecast.transport.ForecastRunOnceProfileAction;
import org.opensearch.forecast.transport.ForecastRunOnceProfileRequest;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.ExistsQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;
import org.opensearch.timeseries.cluster.HashRing;
import org.opensearch.timeseries.feature.FeatureManager;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.model.TaskState;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.task.TaskCacheManager;
import org.opensearch.timeseries.transport.ResultProcessor;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.client.Client;

public class ForecastRunOnceTransportAction
extends HandledTransportAction<ForecastResultRequest, ForecastResultResponse> {
    private static final Logger LOG = LogManager.getLogger(ForecastRunOnceTransportAction.class);
    private static final int MAX_RETRIES = 3;
    private static final long BASE_DELAY_MS = 1000L;
    private static final int POLL_FREQ = 10;
    private static final int MAX_WAIT_TIMES = 100;
    private ResultProcessor<ForecastResultRequest, ForecastResult, ForecastResultResponse, TaskCacheManager, ForecastTaskType, ForecastTask, ForecastIndex, ForecastIndexManagement, ForecastTaskManager> resultProcessor = null;
    private final Client client;
    private CircuitBreakerService circuitBreakerService;
    private final NodeStateManager nodeStateManager;
    private final Settings settings;
    private final ClusterService clusterService;
    private final ThreadPool threadPool;
    private final HashRing hashRing;
    private final TransportService transportService;
    private final ForecastTaskManager taskManager;
    private final NamedXContentRegistry xContentRegistry;
    private final SecurityClientUtil clientUtil;
    private final IndexNameExpressionResolver indexNameExpressionResolver;
    private final FeatureManager featureManager;
    private final ForecastStats forecastStats;
    private volatile Boolean filterByEnabled;
    protected volatile Integer maxSingleStreamForecasters;
    protected volatile Integer maxHCForecasters;
    protected volatile Integer maxForecastFeatures;
    protected volatile Integer maxCategoricalFields;

    @Inject
    public ForecastRunOnceTransportAction(ActionFilters actionFilters, TransportService transportService, Settings settings, Client client, SecurityClientUtil clientUtil, NodeStateManager nodeStateManager, FeatureManager featureManager, ForecastModelManager modelManager, HashRing hashRing, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, CircuitBreakerService circuitBreakerService, ForecastStats forecastStats, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, ForecastTaskManager realTimeTaskManager) {
        super(ForecastRunOnceAction.NAME, transportService, actionFilters, ForecastResultRequest::new);
        this.settings = settings;
        this.clusterService = clusterService;
        this.threadPool = threadPool;
        this.hashRing = hashRing;
        this.transportService = transportService;
        this.taskManager = realTimeTaskManager;
        this.xContentRegistry = xContentRegistry;
        this.clientUtil = clientUtil;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.featureManager = featureManager;
        this.forecastStats = forecastStats;
        this.client = client;
        this.circuitBreakerService = circuitBreakerService;
        this.nodeStateManager = nodeStateManager;
        this.filterByEnabled = (Boolean)ForecastSettings.FORECAST_FILTER_BY_BACKEND_ROLES.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(ForecastSettings.FORECAST_FILTER_BY_BACKEND_ROLES, it -> {
            this.filterByEnabled = it;
        });
        this.maxSingleStreamForecasters = (Integer)ForecastSettings.MAX_SINGLE_STREAM_FORECASTERS.get(settings);
        this.maxHCForecasters = (Integer)ForecastSettings.MAX_HC_FORECASTERS.get(settings);
        this.maxForecastFeatures = 1;
        this.maxCategoricalFields = ForecastNumericSetting.maxCategoricalFields();
        clusterService.getClusterSettings().addSettingsUpdateConsumer(ForecastSettings.MAX_SINGLE_STREAM_FORECASTERS, it -> {
            this.maxSingleStreamForecasters = it;
        });
        clusterService.getClusterSettings().addSettingsUpdateConsumer(ForecastSettings.MAX_HC_FORECASTERS, it -> {
            this.maxHCForecasters = it;
        });
    }

    protected void doExecute(Task task, ForecastResultRequest request, ActionListener<ForecastResultResponse> listener) {
        String forecastID = request.getConfigId();
        User user = ParseUtils.getUserContext(this.client);
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            ParseUtils.verifyResourceAccessAndProcessRequest(this.settings, args -> this.executeRunOnce(forecastID, request, listener), new Object[0], fallbackArgs -> ParseUtils.resolveUserAndExecute(user, forecastID, this.filterByEnabled, listener, forecaster -> this.executeRunOnce(forecastID, request, listener), this.client, this.clusterService, this.xContentRegistry, Forecaster.class), new Object[0]);
        }
        catch (Exception e) {
            LOG.error((Object)e);
            listener.onFailure((Exception)new OpenSearchStatusException("Failed to run once forecaster " + forecastID, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
        }
    }

    private void executeRunOnce(String forecastID, ForecastResultRequest request, ActionListener<ForecastResultResponse> listener) {
        if (!ForecastEnabledSetting.isForecastEnabled()) {
            listener.onFailure((Exception)new OpenSearchStatusException("Forecast functionality is disabled. To enable update plugins.forecast.enabled to true", RestStatus.FORBIDDEN, new Object[0]));
            return;
        }
        if (this.circuitBreakerService.isOpen().booleanValue()) {
            listener.onFailure((Exception)new OpenSearchStatusException("The total OpenSearch memory usage exceeds our threshold, opening the memory circuit.", RestStatus.SERVICE_UNAVAILABLE, new Object[0]));
            return;
        }
        this.client.execute((ActionType)ForecastRunOnceProfileAction.INSTANCE, (ActionRequest)new ForecastRunOnceProfileRequest(forecastID, new String[0]), ActionListener.wrap(r -> {
            if (r.isAnswerTrue()) {
                listener.onFailure((Exception)new OpenSearchStatusException("cannot start a new test for " + forecastID + " since current test hasn't finished.", RestStatus.CONFLICT, new Object[0]));
            } else {
                this.nodeStateManager.getJob(forecastID, (ActionListener<Optional<Job>>)ActionListener.wrap(jobOptional -> {
                    if (jobOptional.isPresent() && ((Job)jobOptional.get()).isEnabled()) {
                        listener.onFailure((Exception)new OpenSearchStatusException("Cannot run once " + forecastID + " when real time job is running.", RestStatus.CONFLICT, new Object[0]));
                        return;
                    }
                    this.triggerRunOnce(forecastID, request, listener);
                }, e -> {
                    if (e instanceof IndexNotFoundException) {
                        this.triggerRunOnce(forecastID, request, listener);
                    } else {
                        LOG.error(e);
                        listener.onFailure((Exception)new OpenSearchStatusException("Fail to verify if job " + forecastID + " starts or not.", RestStatus.CONFLICT, new Object[0]));
                    }
                }));
            }
        }, e -> {
            LOG.error(e);
            listener.onFailure((Exception)new OpenSearchStatusException("Failed to run once forecaster " + forecastID, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
        }));
    }

    private void checkIfRunOnceFinished(String forecastID, String taskId, AtomicInteger waitTimes) {
        this.client.execute((ActionType)ForecastRunOnceProfileAction.INSTANCE, (ActionRequest)new ForecastRunOnceProfileRequest(forecastID, new String[0]), ActionListener.wrap(r -> {
            if (r.isAnswerTrue()) {
                this.handleRunOnceNotFinished(forecastID, taskId, waitTimes, r.getExceptionMsg());
            } else {
                this.handleRunOnceFinished(forecastID, taskId, r.getExceptionMsg());
            }
        }, e -> {
            LOG.error("Failed to profile run once of forecaster " + forecastID, (Throwable)e);
            this.handleRunOnceNotFinished(forecastID, taskId, waitTimes, ExceptionUtil.getErrorMessage(e));
        }));
    }

    private void handleRunOnceNotFinished(String forecastID, String taskId, AtomicInteger waitTimes, String exceptionMsg) {
        if (waitTimes.get() < 100) {
            waitTimes.addAndGet(1);
            this.threadPool.schedule(() -> this.checkIfRunOnceFinished(forecastID, taskId, waitTimes), new TimeValue(10L, TimeUnit.SECONDS), "forecast-threadpool");
            if (!Strings.isEmpty((CharSequence)exceptionMsg)) {
                this.updateTaskError(forecastID, taskId, exceptionMsg);
            }
        } else {
            LOG.warn("Timed out run once of forecaster {}", (Object)forecastID);
            this.updateTaskState(forecastID, taskId, TaskState.INACTIVE);
        }
    }

    private void handleRunOnceFinished(String forecastID, String taskId, String exceptionMsg) {
        LOG.info("Run once of forecaster {} finished", (Object)forecastID);
        this.nodeStateManager.getConfig(forecastID, AnalysisType.FORECAST, false, (ActionListener<Optional<? extends Config>>)ActionListener.wrap(configOptional -> {
            if (configOptional.isEmpty()) {
                this.updateTaskState(forecastID, taskId, TaskState.INACTIVE);
                return;
            }
            this.checkForecastResults(forecastID, taskId, (Config)configOptional.get(), exceptionMsg);
        }, e -> {
            LOG.error("Fail to get config", (Throwable)e);
            this.updateTaskState(forecastID, taskId, TaskState.INACTIVE);
        }));
    }

    private void checkForecastResults(String forecastID, String taskId, Config config, String exceptionMsg) {
        BoolQueryBuilder filterQuery = new BoolQueryBuilder();
        filterQuery.filter((QueryBuilder)QueryBuilders.termQuery((String)"forecaster_id", (String)forecastID));
        ExistsQueryBuilder forecastsExistFilter = QueryBuilders.existsQuery((String)"forecast_value");
        filterQuery.must((QueryBuilder)forecastsExistFilter);
        filterQuery.filter((QueryBuilder)QueryBuilders.termQuery((String)"task_id", (String)taskId));
        SearchSourceBuilder source = new SearchSourceBuilder().query((QueryBuilder)filterQuery).size(1);
        SearchRequest request = new SearchRequest(new String[]{"opensearch-forecast-results*"});
        request.source(source);
        if (config.getCustomResultIndexOrAlias() != null) {
            request.indices(new String[]{config.getCustomResultIndexPattern()});
        }
        this.performSearchWithRetry(forecastID, taskId, request, 0, exceptionMsg);
    }

    private void updateTaskError(String forecastID, String taskId, String exceptionMsg) {
        this.updateTask(forecastID, taskId, null, exceptionMsg);
    }

    private void updateTaskState(String forecastID, String taskId, TaskState state) {
        this.updateTask(forecastID, taskId, state, null);
    }

    private void updateTask(String forecastID, String taskId, TaskState state, String exceptionMsg) {
        HashMap<String, Object> updatedFields = new HashMap<String, Object>();
        if (state != null) {
            updatedFields.put("state", state.name());
        }
        if (!Strings.isEmpty((CharSequence)exceptionMsg)) {
            updatedFields.put("error", exceptionMsg);
        }
        this.taskManager.updateTask(taskId, updatedFields, (ActionListener<UpdateResponse>)ActionListener.wrap(updateResponse -> LOG.info("Updated forecaster task {} for forecaster {}: {}", (Object)taskId, (Object)forecastID, (Object)updatedFields), e -> LOG.error("Failed to update forecaster task: {} for forecaster: {}", (Object)taskId, (Object)forecastID, e)));
    }

    private void triggerRunOnce(String forecastID, ForecastResultRequest request, ActionListener<ForecastResultResponse> listener) {
        try {
            this.resultProcessor = new ForecastResultProcessor(ForecastSettings.FORECAST_REQUEST_TIMEOUT, EntityForecastResultAction.NAME, StatNames.FORECAST_HC_EXECUTE_REQUEST_COUNT, this.settings, this.clusterService, this.threadPool, this.hashRing, this.nodeStateManager, this.transportService, this.forecastStats, this.taskManager, this.xContentRegistry, this.client, this.clientUtil, this.indexNameExpressionResolver, ForecastResultResponse.class, this.featureManager, AnalysisType.FORECAST, true);
            ActionListener wrappedListener = ActionListener.wrap(r -> {
                AtomicInteger waitTimes = new AtomicInteger(0);
                this.threadPool.schedule(() -> this.checkIfRunOnceFinished(forecastID, r.getTaskId(), waitTimes), new TimeValue(10L, TimeUnit.SECONDS), "forecast-threadpool");
                listener.onResponse((Object)r);
            }, e -> {
                LOG.error("Failed to finish run once of forecaster " + forecastID, (Throwable)e);
                listener.onFailure((Exception)new OpenSearchStatusException("Failed to run once forecaster " + forecastID, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
            });
            this.nodeStateManager.getConfig(forecastID, AnalysisType.FORECAST, false, this.resultProcessor.onGetConfig((ActionListener<ForecastResultResponse>)wrappedListener, forecastID, request, Optional.empty()));
        }
        catch (Exception ex) {
            ResultProcessor.handleExecuteException(ex, listener, forecastID);
        }
    }

    private void performSearchWithRetry(String forecastID, String taskId, SearchRequest request, int attempt, String exceptionMsg) {
        this.client.search(request, ActionListener.wrap(searchResponse -> {
            SearchHits hits = searchResponse.getHits();
            if (hits.getTotalHits().value() > 0L) {
                this.updateTaskState(forecastID, taskId, TaskState.TEST_COMPLETE);
            } else if (attempt < 3) {
                long delayMillis = 1000L * (1L << attempt);
                LOG.info("No hits found. Retrying search in {} ms (attempt {}/{})...", (Object)delayMillis, (Object)(attempt + 1), (Object)3);
                this.threadPool.schedule(() -> this.performSearchWithRetry(forecastID, taskId, request, attempt + 1, exceptionMsg), TimeValue.timeValueMillis((long)delayMillis), "forecast-threadpool");
            } else {
                this.taskManager.getTask(taskId, ActionListener.wrap(r -> {
                    if (r.isPresent()) {
                        String state = ((ForecastTask)r.get()).getState();
                        if (Strings.isEmpty((CharSequence)state) || TaskState.NOT_ENDED_STATES.contains(state)) {
                            this.updateTask(forecastID, taskId, TaskState.INIT_TEST_FAILED, exceptionMsg);
                        }
                    } else {
                        this.updateTask(forecastID, taskId, TaskState.INIT_TEST_FAILED, exceptionMsg);
                    }
                }, e -> this.updateTask(forecastID, taskId, TaskState.INIT_TEST_FAILED, ExceptionUtil.getErrorMessage(e))));
            }
        }, e -> {
            LOG.error("Fail to search result on attempt {}/{}. Retrying...", (Object)(attempt + 1), (Object)3, e);
            if (attempt < 3) {
                long delayMillis = 1000L * (1L << attempt);
                LOG.info("Retrying search in {} ms due to failure (attempt {}/{})...", (Object)delayMillis, (Object)(attempt + 1), (Object)3);
                this.threadPool.schedule(() -> this.performSearchWithRetry(forecastID, taskId, request, attempt + 1, ExceptionUtil.getErrorMessage(e)), TimeValue.timeValueMillis((long)delayMillis), "forecast-threadpool");
            } else {
                this.updateTask(forecastID, taskId, TaskState.INIT_TEST_FAILED, ExceptionUtil.getErrorMessage(e));
            }
        }));
    }
}

