/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.amqp;

import com.github.benmanes.caffeine.cache.Cache;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.AbstractServiceClient;
import org.eclipse.hono.client.amqp.RequestResponseClient;
import org.eclipse.hono.client.amqp.config.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.util.CachingClientFactory;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.RequestResponseResult;

/**
 * Base class for service clients that exchange request and response messages
 * via {@link RequestResponseClient} instances and that may optionally cache
 * the responses they receive.
 *
 * @param <T> The type of value that a result of type {@code R} carries.
 * @param <R> The type of result produced for a response message.
 */
public abstract class AbstractRequestResponseServiceClient<T, R extends RequestResponseResult<T>>
        extends AbstractServiceClient {

    /**
     * Status codes of responses that may be cached if the response itself does
     * not contain a cache directive.
     * <p>
     * The values MUST be kept in ascending order because
     * {@link #isCacheableStatusCode(int)} performs a binary search on this array.
     */
    private static final int[] CACHEABLE_STATUS_CODES = new int[]{200, 203, 206, 300, 301, 410};

    /**
     * The factory used for creating and looking up the request/response clients.
     */
    protected final CachingClientFactory<RequestResponseClient<R>> clientFactory;

    /**
     * The cache holding responses, or {@code null} if caching is disabled.
     */
    private final Cache<Object, R> responseCache;

    /**
     * Creates a new client.
     *
     * @param connection The connection that is passed on to the super class.
     * @param samplerFactory The sampler factory that is passed on to the super class.
     * @param clientFactory The factory to use for managing request/response clients.
     * @param responseCache The cache to use for responses or {@code null} if
     *                      responses should not be cached.
     * @throws NullPointerException if the client factory is {@code null}.
     */
    protected AbstractRequestResponseServiceClient(
            final HonoConnection connection,
            final SendMessageSampler.Factory samplerFactory,
            final CachingClientFactory<RequestResponseClient<R>> clientFactory,
            final Cache<Object, R> responseCache) {
        super(connection, samplerFactory);
        this.clientFactory = Objects.requireNonNull(clientFactory);
        // a null cache is explicitly allowed, see isCachingEnabled()
        this.responseCache = responseCache;
    }

    /**
     * Gets the key under which the client factory manages the client for a tenant.
     *
     * @param tenantId The identifier that the key is derived from.
     * @return The key.
     */
    protected abstract String getKey(String tenantId);

    /**
     * Removes the client registered for a tenant from the client factory.
     *
     * @param tenantId The tenant identifier that is passed to {@link #getKey(String)}.
     */
    protected void removeClient(final String tenantId) {
        this.clientFactory.removeClient(this.getKey(tenantId));
    }

    /**
     * Handles a notification about a tenant having timed out.
     * <p>
     * If the message body is a string, it is mapped to a key by means of
     * {@link #getKey(String)} and the client registered under that key is
     * removed from the client factory, passing {@code RequestResponseClient::close}
     * to the factory's {@code removeClient} method. Messages with any other
     * (or no) body are ignored.
     *
     * @param msg The notification; its body is presumably the tenant identifier
     *            (to be confirmed against the sender of the notification).
     */
    protected void handleTenantTimeout(final Message<Object> msg) {
        Optional.ofNullable(msg.body())
                .filter(String.class::isInstance)
                .map(String.class::cast)
                .map(this::getKey)
                .ifPresent(key -> this.clientFactory.removeClient(key, RequestResponseClient::close));
    }

    /**
     * Propagates the loss of the connection to the client factory.
     */
    @Override
    protected void onDisconnect() {
        this.clientFactory.onDisconnect();
    }

    /**
     * Checks whether a response represents a successful outcome that carries a JSON payload.
     *
     * @param status The status code of the response.
     * @param contentType The content type of the response, may be {@code null}.
     * @param payload The payload of the response, may be {@code null}.
     * @return {@code true} if {@link StatusCodeMapper#isSuccessful} accepts the status,
     *         the payload is not {@code null} and the content type equals
     *         {@code application/json} (ignoring case).
     */
    protected final boolean isSuccessResponse(final int status, final String contentType, final Buffer payload) {
        return StatusCodeMapper.isSuccessful(status)
                && payload != null
                && "application/json".equalsIgnoreCase(contentType);
    }

    /**
     * Creates a result object from a response message.
     * <p>
     * The status, cache directive and payload are extracted from the message by means
     * of {@link AmqpUtils} and handed over to
     * {@link #getResult(int, String, Buffer, CacheDirective, ApplicationProperties)}.
     *
     * @param message The response message.
     * @return The result or {@code null} if the message does not contain a status code.
     */
    protected final R getRequestResponseResult(final org.apache.qpid.proton.message.Message message) {
        final Integer status = AmqpUtils.getStatus(message);
        if (status == null) {
            this.log.debug(
                    "response message has no status code application property [reply-to: {}, correlation ID: {}]",
                    message.getReplyTo(),
                    message.getCorrelationId());
            return null;
        }
        final CacheDirective cacheDirective = CacheDirective.from(AmqpUtils.getCacheDirective(message));
        return this.getResult(
                status,
                message.getContentType(),
                AmqpUtils.getPayload(message),
                cacheDirective,
                message.getApplicationProperties());
    }

    /**
     * Creates a result object from the properties of a response message.
     *
     * @param status The status code of the response.
     * @param contentType The content type of the response.
     * @param payload The payload of the response.
     * @param cacheDirective The cache directive derived from the response.
     * @param applicationProperties The application properties of the response.
     * @return The result.
     */
    protected abstract R getResult(
            int status,
            String contentType,
            Buffer payload,
            CacheDirective cacheDirective,
            ApplicationProperties applicationProperties);

    /**
     * Gets the default timeout to use for cached responses.
     *
     * @return The value configured in the connection's configuration if that
     *         configuration is a {@link RequestResponseClientConfigProperties}
     *         instance, {@code 600} otherwise. The unit is defined by
     *         {@code RequestResponseClientConfigProperties} (presumably seconds,
     *         to be confirmed there).
     */
    protected final long getResponseCacheDefaultTimeout() {
        final Object config = this.connection.getConfig();
        if (config instanceof RequestResponseClientConfigProperties) {
            return ((RequestResponseClientConfigProperties) config).getResponseCacheDefaultTimeout();
        }
        return 600L;
    }

    /**
     * Checks whether this client has been created with a response cache.
     *
     * @return {@code true} if a cache has been passed to the constructor.
     */
    protected final boolean isCachingEnabled() {
        return this.responseCache != null;
    }

    /**
     * Checks whether responses with a given status code may be cached by default.
     *
     * @param code The status code to check.
     * @return {@code true} if the code is contained in {@link #CACHEABLE_STATUS_CODES}.
     */
    private boolean isCacheableStatusCode(final int code) {
        return Arrays.binarySearch(CACHEABLE_STATUS_CODES, code) >= 0;
    }

    /**
     * Looks up a response in the cache.
     * <p>
     * The outcome of the lookup is recorded in the given span by means of
     * {@link TracingHelper#TAG_CACHE_HIT}.
     *
     * @param key The key to look up.
     * @param currentSpan The span to record the cache hit/miss in.
     * @return A future that is succeeded with the cached response, or failed
     *         if the cache does not contain a response for the key. If no cache
     *         is configured, the future is failed with an {@link IllegalStateException}.
     * @throws NullPointerException if key is {@code null}.
     */
    protected Future<R> getResponseFromCache(final Object key, final Span currentSpan) {
        Objects.requireNonNull(key);
        if (!this.isCachingEnabled()) {
            TracingHelper.TAG_CACHE_HIT.set(currentSpan, Boolean.FALSE);
            return Future.failedFuture(new IllegalStateException("no cache configured"));
        }
        final R cachedResponse = this.responseCache.getIfPresent(key);
        TracingHelper.TAG_CACHE_HIT.set(currentSpan, cachedResponse != null);
        if (cachedResponse == null) {
            return Future.failedFuture("cache miss");
        }
        return Future.succeededFuture(cachedResponse);
    }

    /**
     * Adds a response to the cache.
     * <p>
     * The response is only put to the cache if its cache directive allows caching or,
     * if the response has no cache directive, if its status code is one of
     * {@link #CACHEABLE_STATUS_CODES}. This method does nothing if no cache is configured.
     *
     * @param key The key to store the response under.
     * @param response The response to cache.
     * @throws NullPointerException if caching is enabled and key or response are {@code null}.
     */
    protected final void addToCache(final Object key, final R response) {
        if (this.isCachingEnabled()) {
            Objects.requireNonNull(key);
            Objects.requireNonNull(response);
            final boolean resultCanBeCached = Optional.ofNullable(response.getCacheDirective())
                    .map(directive -> directive.isCachingAllowed())
                    // fall back to the status code only if there is no directive
                    .orElseGet(() -> this.isCacheableStatusCode(response.getStatus()));
            if (resultCanBeCached) {
                this.responseCache.put(key, response);
            }
        }
    }

    /**
     * Removes the response stored under a key from the cache.
     * <p>
     * This method does nothing if no cache is configured.
     *
     * @param key The key of the response to remove.
     * @throws NullPointerException if caching is enabled and key is {@code null}.
     */
    protected final void removeFromCache(final Object key) {
        if (this.isCachingEnabled()) {
            Objects.requireNonNull(key);
            this.responseCache.invalidate(key);
        }
    }

    /**
     * Removes all responses from the cache whose key matches a predicate.
     * <p>
     * The keys are collected and invalidated in code handed to Vert.x'
     * {@code executeBlocking} method. This method does nothing if no cache is configured.
     *
     * @param keyPredicate The predicate that the keys of the responses to remove need to match.
     * @throws NullPointerException if caching is enabled and the predicate is {@code null}.
     */
    protected final void removeFromCacheByPattern(final Predicate<Object> keyPredicate) {
        if (this.isCachingEnabled()) {
            Objects.requireNonNull(keyPredicate);
            this.connection.getVertx().executeBlocking(p -> {
                final Set<Object> matchingKeys = this.responseCache.asMap().keySet().stream()
                        .filter(keyPredicate)
                        .collect(Collectors.toSet());
                if (!matchingKeys.isEmpty()) {
                    this.log.debug("removing {} responses from the cache", matchingKeys.size());
                    this.responseCache.invalidateAll(matchingKeys);
                }
                // signal that the blocking code has finished so that the
                // future returned by executeBlocking does not stay pending
                p.complete();
            });
        }
    }

    /**
     * Maps the outcome of a service invocation to the value to return to the caller
     * and finishes the given span.
     * <p>
     * If the given future is failed, the status code extracted from the failure cause
     * is set on the span, the error is logged to the span and the returned future is
     * failed with the same cause. Otherwise the span is tagged by means of
     * {@link #setTagsForResult(Span, RequestResponseResult)} and the mapper is applied
     * to the result. The span is finished in either case.
     *
     * @param result The outcome of the service invocation.
     * @param resultMapper The function to use for mapping the result.
     * @param currentSpan The span to tag and finish.
     * @return The mapped outcome.
     */
    protected final Future<T> mapResultAndFinishSpan(
            final Future<R> result,
            final Function<R, T> resultMapper,
            final Span currentSpan) {
        return result
                .recover(t -> {
                    Tags.HTTP_STATUS.set(currentSpan, ServiceInvocationException.extractStatusCode(t));
                    TracingHelper.logError(currentSpan, t);
                    return Future.failedFuture(t);
                })
                .map(resultValue -> {
                    this.setTagsForResult(currentSpan, resultValue);
                    return resultMapper.apply(resultValue);
                })
                .onComplete(o -> currentSpan.finish());
    }

    /**
     * Sets the tags on a span that reflect the outcome of a service invocation.
     * <p>
     * The HTTP status tag is set to the result's status and the error tag is set
     * if the result represents an error. If the result is {@code null}, the HTTP
     * status tag is set to {@code 202}.
     *
     * @param span The span to set the tags on.
     * @param result The result of the invocation, may be {@code null}.
     * @throws NullPointerException if span is {@code null}.
     */
    protected final void setTagsForResult(final Span span, final R result) {
        Objects.requireNonNull(span);
        if (result == null) {
            Tags.HTTP_STATUS.set(span, 202);
            return;
        }
        Tags.HTTP_STATUS.set(span, result.getStatus());
        if (result.isError()) {
            Tags.ERROR.set(span, Boolean.TRUE);
        }
    }

    /**
     * Creates a new, mutable map containing the given device identifier under
     * the {@code device_id} key.
     *
     * @param deviceId The device identifier.
     * @return The properties.
     */
    protected final Map<String, Object> createDeviceIdProperties(final String deviceId) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put("device_id", deviceId);
        return properties;
    }
}

