JSMPPServerSimulator.java

package fr.sii.ogham.testing.sms.simulator.jsmpp;

import static fr.sii.ogham.testing.sms.simulator.decode.MessageDecoder.decode;
import static java.util.Collections.unmodifiableList;
import static org.jsmpp.bean.SMSCDeliveryReceipt.SUCCESS_FAILURE;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jsmpp.bean.CancelSm;
import org.jsmpp.bean.DataSm;
import org.jsmpp.bean.QuerySm;
import org.jsmpp.bean.ReplaceSm;
import org.jsmpp.bean.SubmitMulti;
import org.jsmpp.bean.SubmitMultiResult;
import org.jsmpp.bean.SubmitSm;
import org.jsmpp.extra.ProcessRequestException;
import org.jsmpp.session.DataSmResult;
import org.jsmpp.session.QuerySmResult;
import org.jsmpp.session.SMPPServerSession;
import org.jsmpp.session.SMPPServerSessionListener;
import org.jsmpp.session.ServerMessageReceiverListener;
import org.jsmpp.session.ServerResponseDeliveryAdapter;
import org.jsmpp.session.Session;
import org.jsmpp.util.MessageIDGenerator;
import org.jsmpp.util.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import fr.sii.ogham.testing.sms.simulator.config.SimulatorConfiguration;

/**
 * @author uudashr
 * @author Aurélien Baudet
 *
 */
public class JSMPPServerSimulator extends ServerResponseDeliveryAdapter implements Runnable, ServerMessageReceiverListener {
	private static final int BIND_THREAD_POOL_SIZE = 5;
	private static final int RECEIPT_THREAD_POOL_SIZE = 100;

	private static final Logger LOG = LoggerFactory.getLogger(JSMPPServerSimulator.class);

	private ExecutorService execService;
	private final ExecutorService execServiceDelReceipt = Executors.newFixedThreadPool(RECEIPT_THREAD_POOL_SIZE);
	private final MessageIDGenerator messageIDGenerator = new UnsecureRandomMessageIDGenerator();
	private int port;
	private boolean stopped;
	private List<SubmitSm> receivedMessages = new ArrayList<>();
	private SMPPServerSessionListener sessionListener;
	private SMPPServerSession serverSession;
	private final Object startupMonitor = new Object();
	private volatile boolean running = false;
	private final SimulatorConfiguration config;
	private ServerStartupException startupFailure;

	public JSMPPServerSimulator(int port, SimulatorConfiguration config) {
		this.port = port;
		this.config = config;
	}

	public void run() {
		try {
			if (!stopped) {
				sessionListener = createServerSessionListener();
				execService = Executors.newFixedThreadPool(BIND_THREAD_POOL_SIZE);
				running = true;
				LOG.info("Listening on port {}", port);
				synchronized (startupMonitor) {
					startupMonitor.notifyAll();
				}
			}
			while (!stopped) {
				serverSession = sessionListener.accept();
				LOG.info("Accepting connection for session {}", serverSession.getSessionId());
				serverSession.setMessageReceiverListener(this);
				serverSession.setResponseDeliveryListener(this);
				execService.execute(new WaitBindTask(serverSession, config.getCredentials()));
			}
		} catch (IOException e) {
			if (!stopped) { // NOSONAR
				LOG.trace("Failed to initialize SMPP server simulator", e);
				startupFailure = new ServerStartupException("Server failed to start on port "+port, e);
				close();
			}
		} finally {
			// Notify everybody that we're ready to accept connections or failed
			// to start.
			// Otherwise will run into startup timeout, see
			// #waitTillRunning(long).
			synchronized (startupMonitor) {
				startupMonitor.notifyAll();
			}
		}
	}

	private SMPPServerSessionListener createServerSessionListener() throws IOException {
		return new ConfigurableSMPPServerSessionListener(port, config.getServerDelays());
	}

	public synchronized void reset() {
		stopped = false;
		if (!config.isKeepMessages()) {
			receivedMessages.clear();
		}
	}

	public synchronized void stop() {
		LOG.info("Stopping SMPP simulator");
		running = false;
		stopped = true;
		if (execService != null) {
			LOG.trace("Stopping executor service");
			execService.shutdownNow();
			execService = null;
		}
		close();
		LOG.info("SMPP simulator stopped");
	}

	private void close() {
		if (serverSession != null) {
			LOG.trace("Closing server session");
			serverSession.close();
			LOG.trace("Server session closed");
			serverSession = null;
		}
		if (sessionListener != null) {
			try {
				LOG.trace("Closing session listener");
				sessionListener.close();
				LOG.trace("Session listener closed");
				sessionListener = null;
			} catch (IOException e) {
				// nothing to do
				LOG.trace("Failed to close session listener", e);
			}
		}
	}

	public void waitTillRunning(long timeoutInMs) throws ServerStartupException {
		try {
			long t = System.currentTimeMillis();
			synchronized (startupMonitor) {
				// Loop to avoid spurious wake ups, see
				// https://www.securecoding.cert.org/confluence/display/java/THI03-J.+Always+invoke+wait%28%29+and+await%28%29+methods+inside+a+loop
				while (!running && startupFailure == null && System.currentTimeMillis() - t < timeoutInMs) {
					startupMonitor.wait(timeoutInMs);
				}
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new ServerStartupException("Server failed to start (interrupted)", e);
		}
		
		if (startupFailure != null) {
			throw startupFailure;
		}

		if (!running) {
			close();
			throw new ServerStartupException("Server bouldn't be started after "+timeoutInMs+"ms");
		}
	}

	public QuerySmResult onAcceptQuerySm(QuerySm querySm, SMPPServerSession source) throws ProcessRequestException {
		LOG.info("Accepting query sm, but not implemented");
		return null;
	}

	public MessageId onAcceptSubmitSm(SubmitSm submitSm, SMPPServerSession source) throws ProcessRequestException {
		MessageId messageId = messageIDGenerator.newMessageId();
		if (LOG.isDebugEnabled()) {
			LOG.debug("Receiving submit_sm '{}', and return message id {}", decode(new SubmitSmAdapter(submitSm)), messageId);
		}
		receivedMessages.add(submitSm);
		if (SUCCESS_FAILURE.containedIn(submitSm.getRegisteredDelivery())) {
			execServiceDelReceipt.execute(new DeliveryReceiptTask(source, submitSm, messageId));
		}
		return messageId;
	}

	@Override
	public void onSubmitSmRespSent(MessageId messageId, SMPPServerSession source) {
		LOG.debug("submit_sm_resp with message_id {} has been sent", messageId);
	}

	public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti, SMPPServerSession source) throws ProcessRequestException {
		MessageId messageId = messageIDGenerator.newMessageId();
		if (LOG.isDebugEnabled()) {
			LOG.debug("Receiving submit_multi_sm '{}', and return message id {}", submitMulti, messageId);
		}
		if (SUCCESS_FAILURE.containedIn(submitMulti.getRegisteredDelivery())) {
			execServiceDelReceipt.execute(new DeliveryReceiptTask(source, submitMulti, messageId));
		}

		return new SubmitMultiResult(messageId.getValue());
	}

	public DataSmResult onAcceptDataSm(DataSm dataSm, Session source) throws ProcessRequestException {
		LOG.debug("onAcceptDataSm '{}'", dataSm);
		return null;
	}

	@Override
	public void onAcceptCancelSm(CancelSm cancelSm, SMPPServerSession source) throws ProcessRequestException {
		// nothing to do
	}

	@Override
	public void onAcceptReplaceSm(ReplaceSm replaceSm, SMPPServerSession source) throws ProcessRequestException {
		// nothing to do
	}

	public List<SubmitSm> getReceivedMessages() {
		return unmodifiableList(new ArrayList<>(receivedMessages));
	}

	public int getPort() {
		return port;
	}
}