KeepSessionAliveStrategy.java

package fr.sii.ogham.sms.sender.impl.cloudhopper.session;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudhopper.smpp.SmppSession;
import com.cloudhopper.smpp.pdu.EnquireLink;

import fr.sii.ogham.core.exception.MessageException;
import fr.sii.ogham.core.retry.RetryExecutor;
import fr.sii.ogham.core.service.MessagingService;
import fr.sii.ogham.sms.builder.cloudhopper.SmppClientSupplier;
import fr.sii.ogham.sms.builder.cloudhopper.SmppSessionHandlerSupplier;
import fr.sii.ogham.sms.message.Sms;
import fr.sii.ogham.sms.sender.impl.cloudhopper.ExtendedSmppSessionConfiguration;
import fr.sii.ogham.sms.sender.impl.cloudhopper.exception.SmppException;

/**
 * Strategy that regularly sends {@link EnquireLink} requests to keep session
 * alive.
 * 
 * <p>
 * A session is initiated either at startup or when the first message is about
 * to be sent. Once the session is initialized, several messages can be sent
 * using the same session. Then it may take some time before a message is sent
 * again. In order to avoid initializing a new SMPP session again, the
 * connection with the server is maintained. To do so, a task is started in the
 * background in order to regularly send {@link EnquireLink} requests to the
 * server. This way the connection is maintained actively.
 * 
 * <p>
 * The opened session is automatically closed when the {@link MessagingService}
 * is released or when a cleanup is explicitly requested.
 * 
 * <p>
 * If the {@link EnquireLinkTask} fails, the error is analyzed using an
 * {@link ErrorAnalyzer}. This analyzer indicates if the triggered error is
 * raised because the connection with the server seems to be lost. In this case,
 * the current connection must be closed and a new session is created.
 * 
 * <p>
 * If a message can't be sent, the raised error is also analyzed to check
 * whether a new session has to be created. The failed SMS is not re-sent.
 * Instead the original error is thrown. This way, the global retry handling can
 * be used. A new session is requested in background.
 * 
 * <p>
 * If the reconnection fails (for example, if the server is down). The
 * reconnection process will be attempted only once (however there may have
 * several bind requests using the {@link RetryExecutor}). When a new message
 * has to be sent, the reconnection will be attempted at this moment.
 * 
 * @author Aurélien Baudet
 *
 */
public class KeepSessionAliveStrategy extends BaseSessionHandlingStrategy implements ErrorHandler {
	private static final Logger LOG = LoggerFactory.getLogger(KeepSessionAliveStrategy.class);

	private final Supplier<ScheduledExecutorService> timerSupplier;
	private final ErrorAnalyzer errorAnalyzer;
	private final ErrorHandler reconnectionErrorHandler;
	private ScheduledExecutorService currentTimer;
	private ScheduledFuture<?> enquireLinkTask;
	private volatile boolean reconnecting;

	public KeepSessionAliveStrategy(ExtendedSmppSessionConfiguration configuration, SmppClientSupplier clientSupplier, SmppSessionHandlerSupplier smppSessionHandlerSupplier, RetryExecutor retry,
			Supplier<ScheduledExecutorService> timerSupplier, ErrorAnalyzer errorAnalyzer, ErrorHandler reconnectionErrorHandler) {
		super(LOG, configuration, clientSupplier, smppSessionHandlerSupplier, retry);
		this.timerSupplier = timerSupplier;
		this.errorAnalyzer = errorAnalyzer;
		this.reconnectionErrorHandler = reconnectionErrorHandler;
		if (configuration.getKeepAlive().isConnectAtStartup(false)) {
			tryConnect();
		}
	}

	@Override
	public SmppSession getSession() throws SmppException {
		initSessionIfNeeded();
		return currentSession;
	}

	@Override
	public void messageSent(Sms sms) {
		// nothing to do
	}

	@Override
	public void messageNotSent(Sms sms, SmppException e) throws MessageException {
		if (errorAnalyzer.requiresNewConnection(e)) {
			reconnectInBackground(e);
			// Throw the original exception so that message may be handled
			// (maybe send will be retried)
			// Adds some contextual information if possible
			throw new MessageException("Failed to send SMS because it seems that the current session is broken. A new SMPP session is requested in background", sms, e);
		}
		throw new MessageException("Failed to send SMS", sms, e);
	}

	@Override
	public void handleFailure(Throwable e) {
		if (errorAnalyzer.requiresNewConnection(e)) {
			reconnectInBackground(e);
		}
	}

	@Override
	public void messageProcessed(Sms sms) {
		// nothing to do
	}

	@Override
	public void clean() {
		stopEnquiredLinkTask();
		destroySession();
		destroyClient();
	}

	private void tryConnect() {
		try {
			initSessionIfNeeded();
		} catch (SmppException e) {
			LOG.warn("Connection at startup was requested but couldn't be achived. Connection will be re-attempted when first message is sent", e);
		}
	}

	private void initSessionIfNeeded() throws SmppException {
		initClient();
		initSession();
		startEnquiredLinkTask();
	}

	private synchronized void startEnquiredLinkTask() {
		if (currentSession == null || isEnquireLinkTaskRunning()) {
			return;
		}
		// TODO: only run when no activity ? take the date of the last sent
		// message ?
		long enquireLinkRequestTimeout = configuration.getKeepAlive().getEnquireLinkTimeout();
		long enquireLinkInterval = configuration.getKeepAlive().getEnquireLinkInterval();
		LOG.debug("Start sending EnquireLink requests every {}ms", enquireLinkInterval);
		currentTimer = timerSupplier.get();
		enquireLinkTask = currentTimer.scheduleWithFixedDelay(new EnquireLinkTask(currentSession, this, enquireLinkRequestTimeout), enquireLinkInterval, enquireLinkInterval, MILLISECONDS);
	}

	private boolean isEnquireLinkTaskRunning() {
		return enquireLinkTask != null && !enquireLinkTask.isCancelled() && !enquireLinkTask.isDone();
	}

	private void stopEnquiredLinkTask() {
		if (currentTimer != null) {
			currentTimer.shutdownNow();
			currentTimer = null;
		}
		if (enquireLinkTask != null) {
			LOG.debug("Stop sending EnquireLink requests");
			enquireLinkTask.cancel(true);
			enquireLinkTask = null;
		}
	}

	private boolean tryReconnect(Throwable failureRequiringReconnection) {
		try {
			LOG.debug("Reconnecting due to error that requires a new session", failureRequiringReconnection);
			reconnect();
			LOG.debug("Reconnected");
			return true;
		} catch (SmppException se) {
			LOG.debug("Failed to reconnect. Stopping active keep alive... Reconnection and active keep alive will be attempted later (either using another strategy or when sending next message)",
					failureRequiringReconnection);
			clean();
			reconnectionErrorHandler.handleFailure(se);
			LOG.debug("Active keep alive stopped...", se);
		}
		return false;
	}

	private void reconnect() throws SmppException {
		clean();
		initSessionIfNeeded();
	}

	private boolean alreadyReconnecting() {
		return reconnecting;
	}

	private void reconnectInBackground(Throwable failureRequiringReconnection) {
		if (alreadyReconnecting()) {
			return;
		}
		ExecutorService executor = Executors.newSingleThreadExecutor();
		executor.execute(new ReconnectionTask(failureRequiringReconnection));
		executor.shutdown();
	}

	private class ReconnectionTask implements Runnable {
		private final Throwable failureRequiringReconnection;

		public ReconnectionTask(Throwable failureRequiringReconnection) {
			super();
			this.failureRequiringReconnection = failureRequiringReconnection;
		}

		@Override
		public void run() {
			reconnecting = true;
			tryReconnect(failureRequiringReconnection);
			reconnecting = false;
		}

	}
}