/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.dirigible.runtime.listener.message;

import java.util.HashMap;
import org.eclipse.dirigible.repository.datasource.DataSourceFacade;
import org.eclipse.dirigible.repository.ext.messaging.EMessagingException;
import org.eclipse.dirigible.repository.ext.messaging.MessageDefinition;
import org.eclipse.dirigible.repository.ext.messaging.MessageHub;
import org.eclipse.dirigible.repository.logging.Logger;
import org.eclipse.dirigible.runtime.listener.IListenerEventProcessor;
import org.eclipse.dirigible.runtime.listener.Listener;
import org.eclipse.dirigible.runtime.listener.ListenerProcessor;
import org.eclipse.dirigible.runtime.listener.message.MessageListenerManager;

/**
 * Listener event processor backed by a {@link MessageHub}.
 *
 * <p>On {@link #start(Listener)} the processor reads the {@code client} and
 * {@code topic} parameters of the listener, subscribes that client to the
 * topic and registers itself with the {@link MessageListenerManager}. On
 * {@link #stop()} it reverses both steps. {@link #processMessages()} hands
 * every message received for the client to the listener's module.
 */
public class MessageListenerEventProcessor
implements IListenerEventProcessor {
    private static final String PARAM_CLIENT = "client";
    private static final String PARAM_TOPIC = "topic";
    private static final String PARAM_MESSAGE = "message";
    private static final Logger logger = Logger.getLogger(MessageListenerEventProcessor.class);
    private String client;
    private String topic;
    private Listener listener;
    // Created once per processor and never reassigned.
    private final MessageHub messageHub = new MessageHub(DataSourceFacade.getInstance().getDataSource(null), null);

    /**
     * Subscribes the listener's client to its topic and registers this
     * processor with the {@link MessageListenerManager}.
     *
     * <p>A messaging failure is logged, not propagated; in that case the
     * processor is not registered.
     *
     * @param listener the listener definition supplying the {@code client}
     *                 and {@code topic} parameters
     */
    @Override
    public void start(Listener listener) {
        this.listener = listener;
        this.client = listener.getParams().get(PARAM_CLIENT);
        this.topic = listener.getParams().get(PARAM_TOPIC);
        try {
            this.messageHub.subscribe(this.client, this.topic);
            MessageListenerManager.getInstance().registerProcessor(this);
        }
        catch (EMessagingException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Unsubscribes the client from the topic and unregisters this processor
     * from the {@link MessageListenerManager}.
     *
     * <p>The processor is unregistered even when unsubscribing fails, so a
     * stopped processor does not stay registered with the manager. A
     * messaging failure is logged, not propagated.
     */
    @Override
    public void stop() {
        try {
            try {
                this.messageHub.unsubscribe(this.client, this.topic);
            }
            finally {
                // Must run even if unsubscribe throws; otherwise the stopped
                // processor would remain registered.
                MessageListenerManager.getInstance().unregisterProcessor(this);
            }
        }
        catch (EMessagingException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Receives the messages available for the client and executes the
     * listener's module once per message. The message is passed in the
     * execution context under the key {@code "message"}.
     *
     * @throws EMessagingException if a messaging call made here fails
     */
    public void processMessages() throws EMessagingException {
        for (MessageDefinition messageDefinition : this.messageHub.receive(this.client)) {
            // A fresh context per message so entries never carry over between executions.
            HashMap<Object, Object> executionContext = new HashMap<Object, Object>();
            executionContext.put(PARAM_MESSAGE, messageDefinition);
            ListenerProcessor.executeByEngineType(this.listener.getModule(), executionContext, this.listener);
        }
    }
}

