1 /**
2  * MQTT client implementation
3  *
4  * Author:
5  * Tomáš Chaloupka <chalucha@gmail.com>
6  *
7  * License:
8  * Boost Software License 1.0 (BSL-1.0)
9  *
10  * Permission is hereby granted, free of charge, to any person or organization obtaining a copy
11  * of the software and accompanying documentation covered by this license (the "Software") to use,
12  * reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative
13  * works of the Software, and to permit third-parties to whom the Software is furnished to do so,
14  * all subject to the following:
15  *
16  * The copyright notices in the Software and this entire statement, including the above license
17  * grant, this restriction and the following disclaimer, must be included in all copies of the Software,
18  * in whole or in part, and all derivative works of the Software, unless such copies or derivative works
19  * are solely in the form of machine-executable object code generated by a source language processor.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
22  * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23  * PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE
24  * DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT,
25  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26  * OTHER DEALINGS IN THE SOFTWARE.
27  */
28 module mqttd.client;
29 
30 import mqttd.messages;
31 import mqttd.serialization;
32 import mqttd.stream_wrapper;
33 import mqttd.traits;
34 
35 import std.algorithm : any, map;
36 import std.array : array;
37 import std.datetime;
38 import std.exception;
39 debug import std.stdio;
40 import std..string : format, representation;
41 import std.traits;
42 import std.typecons : Flag, No, Yes;
43 
44 import vibe.core.concurrency;
45 import vibe.core.log;
46 import vibe.core.net: TCPConnection;
47 import vibe.core.stream;
48 import vibe.core.sync;
49 import vibe.core.task;
50 import vibe.stream.tls;
51 import vibe.utils.array : FixedRingBuffer;
52 
53 // constants
enum MQTT_MAX_PACKET_ID = ushort.max; /// maximal packet id (65535); ids handed out by PacketIdGenerator are 1..65535, id 0 is never used - defined by MQTT protocol

// default settings
enum MQTT_DEFAULT_BROKER_PORT = 1883u; /// default mqtt broker port
enum MQTT_DEFAULT_BROKER_SSL_PORT = 8883u; /// default mqtt broker ssl port
enum MQTT_DEFAULT_SENDQUEUE_SIZE = 1000u; /// maximal number of packets stored in queue to send
enum MQTT_DEFAULT_INFLIGHTQUEUE_SIZE = 10u; /// maximal number of packets which can be processed at the same time
enum MQTT_DEFAULT_CLIENT_ID = "vibe-mqtt"; /// default client identifier
enum MQTT_DEFAULT_RETRY_DELAY = 10_000u; /// retry interval to resend publish (QoS 1 or 2), subscribe and unsubscribe messages [ms]
enum MQTT_DEFAULT_RETRY_ATTEMPTS = 3u; /// max publish, subscribe and unsubscribe retry for QoS Level 1 or 2
64 
65 // aliases
/// Fixed capacity ring buffer of message contexts - storage used by the session queues
alias SessionContainer = FixedRingBuffer!(MessageContext);
67 
/**
 * Delegate type of a client event callback.
 *
 * Instantiated without arguments it is a delegate receiving only the client,
 * instantiated with one packet type it additionally receives that packet.
 * Any other number of arguments is rejected at compile time.
 */
template Callback(T...)
{
	static if (T.length == 0)
	{
		alias Callback = void delegate(scope MqttClient ctx) @safe;
	}
	else static if (T.length == 1)
	{
		alias Callback = void delegate(scope MqttClient ctx, in T packet) @safe;
	}
	else
	{
		static assert(0, "Invalid callback");
	}
}
74 
/// MqttClient settings - connection parameters, queue sizes and event callbacks of one client
struct Settings
{
	string host = "127.0.0.1"; /// message broker address
	ushort port = MQTT_DEFAULT_BROKER_PORT; /// message broker port
	string clientId = MQTT_DEFAULT_CLIENT_ID; /// Client Id to identify within message broker (must be unique). If empty, the client uses the local host name instead.
	string userName = null; /// optional user name to login with
	string password = null; /// user password (sent only when userName is set)
	int retryDelay = MQTT_DEFAULT_RETRY_DELAY; /// retry interval to resend publish QoS 1 and 2 messages [ms]
	int retryAttempts = MQTT_DEFAULT_RETRY_ATTEMPTS; /// how many times will client try to resend QoS1 and QoS2 messages
	bool cleanSession = true; /// clean client and server session state on connect
	size_t sendQueueSize = MQTT_DEFAULT_SENDQUEUE_SIZE; /// maximal number of packets stored in queue to send
	size_t inflightQueueSize = MQTT_DEFAULT_INFLIGHTQUEUE_SIZE; /// maximal number of packets which can be processed at the same time
	ushort keepAlive; /// Interval [s] in which the client sends PINGREQ packets to the broker once connected; 1.5 times this value is announced to the broker as Keep Alive. It's used to determine that the network and broker are working. If set to 0, no control packets are sent automatically (default).
	Duration reconnect; /// Delay after which the client tries to reconnect to the broker if disconnected. If zero, auto reconnect is disabled (default)
	bool useSsl = false; /// use SSL/TLS for the connection
	string trustedCertificateFile = null; /// file with the list of trusted certificates for verifying peer certificates
	TLSPeerValidationMode peerValidationMode = TLSPeerValidationMode.none; /// mode for verifying peer certificates
	string[] alpn; /// list of ALPN extension protocols

	// callbacks - each one is optional and is invoked by the handler of the same name in MqttClient
	Callback!ConnAck onConnAck;
	Callback!PingResp onPingResp;
	Callback!PubAck onPubAck;
	Callback!PubRec onPubRec;
	Callback!PubComp onPubComp;
	Callback!PubRel onPubRel;
	Callback!Publish onPublish;
	Callback!SubAck onSubAck;
	Callback!UnsubAck onUnsubAck;
	Callback!() onDisconnect;
}
107 
/**
 * Packet ID generator
 * Holds the status of ID usage and generates the next ones.
 *
 * Usage flags are stored in a bitmap with one bit per possible packet id.
 * Id 0 is marked as used on construction, so it is never handed out.
 *
 * The generator is a lazily created singleton shared by the whole process (see `get`).
 * Only the creation of the instance is synchronized; the bookkeeping methods take no lock.
 */
class PacketIdGenerator
{
private:
	this()
	{
		_event = createManualEvent();
		setUsed(0);
	}

	// TLS flag, each thread has its own
	static bool instantiated_;

	// "True" global
	__gshared PacketIdGenerator instance_;

	/// Number of usage flags stored in one word of the bitmap
	enum bitsPerWord = size_t.sizeof * 8;

	/// Index of the bitmap word holding the flag of the id
	pragma(inline, true) auto getIdx(ushort id) { return id / bitsPerWord; }
	/// Mask selecting the flag of the id within its bitmap word
	pragma(inline, true) auto getIdxValue(ushort id) { return cast(size_t)(1uL << (id % bitsPerWord)); }

	LocalManualEvent _event; // emitted on every change of the usage flags
	ushort _packetId = 0u; // last id handed out
	size_t[(MQTT_MAX_PACKET_ID+1)/bitsPerWord] _idUsage; // one flag per id (0..65535)

public:

	/// Instance of Packet ID generator
	@property static PacketIdGenerator get() @trusted
	{
		// Since every thread has its own instantiated_ variable,
		// there is no need for synchronization here.
		if (!instantiated_)
		{
			synchronized (PacketIdGenerator.classinfo)
			{
				if (!instance_)
				{
					instance_ = new PacketIdGenerator();
				}
				instantiated_ = true;
			}
		}
		return instance_;
	}

@safe:

	/**
	 * Gets next packet id and marks it as used.
	 * If all ids are in use it won't return till one of them is released again.
	 *
	 * Returns: packet id in range 1..MQTT_MAX_PACKET_ID
	 */
	@property auto nextPacketId()
	out (result)
	{
		assert(result, "packet id can't be 0!");
		assert(_event);
	}
	body
	{
		do
		{
			if (!_idUsage[].any!(a => a != size_t.max)())
			{
				version (MqttDebug) logDiagnostic("MQTT all packet ids in use - waiting");
				this._event.wait();
				continue; // re-evaluates the loop condition with the current candidate
			}

			//packet id can't be 0! - wraps from MQTT_MAX_PACKET_ID back to 1
			_packetId = cast(ushort)((_packetId % MQTT_MAX_PACKET_ID) != 0 ? _packetId + 1 : 1);
		}
		while (isUsed(_packetId));

		version (MqttDebug) if (_packetId == 1) logDiagnostic("ID Overflow");

		setUsed(_packetId);

		return _packetId;
	}

	/// Is packet id currently used?
	pragma(inline) auto isUsed(ushort id) { return (_idUsage[getIdx(id)] & getIdxValue(id)) != 0; }

	/// Sets packet id as used
	pragma(inline) void setUsed(ushort id)
	{
		assert(!isUsed(id));
		assert(_event);

		_idUsage[getIdx(id)] |= getIdxValue(id);
		version (unittest) {} //HACK: For some reason unittest will segfault when emiting
		else _event.emit();
	}

	/// Sets packet id as unused
	pragma(inline) void setUnused(ushort id)
	{
		assert(id != 0);
		assert(isUsed(id));
		assert(_event);

		// Clear the flag instead of toggling it: with asserts compiled out a repeated
		// release must not mark the id as used again.
		_idUsage[getIdx(id)] &= ~getIdxValue(id);

		version (unittest) {} //HACK: For some reason unittest will segfault when emiting
		else _event.emit();
	}
}
216 
// Checks the bitmap arithmetic and the used/unused bookkeeping of PacketIdGenerator.
// Written against the word size of the target so it holds on 32-bit and 64-bit builds.
unittest
{
	enum bits = size_t.sizeof * 8; // usage flags per bitmap word

	auto gen = PacketIdGenerator.get;
	assert(gen.getIdx(1) == 0);
	assert(gen.getIdx(cast(ushort)bits) == 1);
	assert(gen.getIdxValue(1) == gen.getIdxValue(cast(ushort)(bits + 1)));
	assert(gen.getIdxValue(cast(ushort)(bits - 1)) == (cast(size_t)1 << (bits - 1)));
	assert(gen.getIdx(cast(ushort)(2 * bits)) == 2);
	gen.setUsed(1);
	assert(gen.isUsed(1));
	gen.setUsed(cast(ushort)bits);
	assert(gen.isUsed(cast(ushort)bits));
	gen.setUnused(cast(ushort)bits);
	assert(!gen.isUsed(cast(ushort)bits));
	assert(gen.isUsed(1));
	gen.setUnused(1);

	// id 0 is taken by the constructor, so filling the rest completes the first word
	foreach (i; 1 .. bits) gen.setUsed(cast(ushort)i);
	assert(gen._idUsage[0] == size_t.max);

	// release the ids again - the generator is a shared singleton used by other tests too
	foreach (i; 1 .. bits) gen.setUnused(cast(ushort)i);
	assert(gen._idUsage[0] == 1);
}
237 
/// MQTT packet state
///
/// The `queued*` states are accepted only by the send queue and the `waitFor*` states
/// only by the inflight queue (checked by the contract of `SessionQueue.add`).
enum PacketState
{
	queuedQos0, /// QOS = 0, Publish message queued
	queuedQos1, /// QOS = 1, Publish message queued
	queuedQos2, /// QOS = 2, Publish message queued

	waitForPuback, /// QOS = 1, PUBLISH sent, wait for PUBACK
	waitForPubrec, /// QOS = 2, PUBLISH sent, wait for PUBREC
	waitForPubrel, /// QOS = 2, PUBREC sent, wait for PUBREL
	waitForPubcomp, /// QOS = 2, PUBREL sent, wait for PUBCOMP
}
250 
/// Origin of the stored packet
///
/// Only packets originated from this client get their packet id from PacketIdGenerator.
enum PacketOrigin
{
	client, /// originated from this client
	broker /// originated from broker
}
257 
/**
 * Context for MQTT packet stored in Session
 *
 * Copies of one context share a reference counter. When the last copy is destroyed and
 * the message originated from this client with a packet id assigned, the id is handed
 * back to the PacketIdGenerator.
 */
private @safe struct MessageContext
{
	PacketState state; /// message state
	uint attempt; /// Attempt (for retry)
	SysTime timestamp; /// Timestamp (for retry)
	PacketOrigin origin; /// message origin
	Publish message; /// message itself

	alias message this;

	/**
	 * Creates the context for the message.
	 * Client messages in other than queuedQos0 state get a new packet id assigned, which
	 * can block until an id is available.
	 */
	this(Publish message, PacketState state, PacketOrigin origin = PacketOrigin.client)
	{
		assert(refcount is null);
		refcount = new int(1);

		this.message = message;
		this.origin = origin;
		this.state = state;
		this.timestamp = Clock.currTime;

		immutable needsId = origin == PacketOrigin.client && state != PacketState.queuedQos0;
		if (needsId)
		{
			this.message.packetId = PacketIdGenerator.get.nextPacketId();
		}
	}

	/// Each copy counts as one more owner
	this (this)
	{
		if (refcount is null) return;
		*refcount += 1;
	}

	~this()
	{
		decRef();
	}

private:
	int* refcount;

	/// Drops one owner; the last one releases the packet id of a client message
	void decRef()
	{
		if (refcount is null) return;

		*refcount -= 1;
		if (*refcount != 0) return;

		refcount = null;
		if (this.origin == PacketOrigin.client && this.packetId)
		{
			PacketIdGenerator.get.setUnused(this.packetId);
		}
	}
}
308 
/**
 * Queue storage helper for session
 *
 * Couples a fixed capacity ring buffer of message contexts with an event which is
 * emitted on every modification, so tasks can wait for the content to change.
 *
 * Params:
 * 		send = Yes.send for the queue of messages to be sent (capacity `Settings.sendQueueSize`),
 * 		       No.send for the queue of messages in flight (capacity `Settings.inflightQueueSize`)
 */
private @safe struct SessionQueue(Flag!"send" send)
{
	this(Settings settings)
	{
		static if (send) immutable capacity = settings.sendQueueSize;
		else immutable capacity = settings.inflightQueueSize;

		_event = createManualEvent();
		_packets = SessionContainer(capacity);
	}

	@disable this();
	@disable this(this) {}

	/**
	 * Adds packet to Session
	 * If the session is full the call will be blocked until there is space again.
	 * The exception is a QoS0 message for a full send queue, which is dropped instead.
	 * Also if there is no free packetId to use, it will be blocked until it is.
	 *
	 * Params:
	 * 		packet = packet to be stored
	 * 		state = initial packet state
	 * 		origin = origin of the packet (session stores control packets from broker too)
	 *
	 * Returns:
	 * 		Assigned packetId (0 if QoS0 is set). If message originates from broker, it keeps the original..
	 */
	ushort add(Publish packet, PacketState state, PacketOrigin origin = PacketOrigin.client)
	{
		auto context = MessageContext(packet, state, origin);
		return add(context);
	}

	/// ditto
	ushort add(MessageContext ctx)
	in
	{
		assert(ctx.packetId || ctx.state == PacketState.queuedQos0, "PacketId must be set");
		static if (send) assert(ctx.origin == PacketOrigin.client, "Only client messages can be added to send queue");

		// queued states belong to the send queue only, wait states to the inflight queue only
		with (PacketState)
		{
			final switch (ctx.state)
			{
				case queuedQos0:
				case queuedQos1:
				case queuedQos2:
					static if (!send) assert(0, "Invalid packet state");
					else break;
				case waitForPuback:
				case waitForPubcomp:
				case waitForPubrec:
				case waitForPubrel:
					static if (send) assert(0, "Invalid packet state");
					else break;
			}
		}
	}
	body
	{
		while (_packets.full)
		{
			static if (send)
			{
				if (ctx.state != PacketState.queuedQos0)
				{
					version (MqttDebug)
					{
						logDebug("MQTT SendQueueFull ([%s] %s) - waiting", ctx.packetId, ctx.state);
						scope (exit) logDebug("MQTT SendQueueFull after wait ([%s] %s)", ctx.packetId, ctx.state);
					}
					_event.wait();
				}
				else
				{
					// nobody waits for QoS0 messages, there is just no room for this one
					version (MqttDebug) logDebug("MQTT SendQueueFull - dropping QoS0 publish msg");
					return cast(ushort)0;
				}
			}
			else
			{
				version (MqttDebug)
				{
					logDebug("MQTT InflightQueueFull ([%s] %s) - waiting", ctx.packetId, ctx.state);
					scope (exit) logDebug("MQTT InflightQueueFull after wait ([%s] %s)", ctx.packetId, ctx.state);
				}
				_event.wait();
			}
		}

		static if (send) assert(!_packets.full, format("SEND %s", ctx));
		else assert(!_packets.full, format("WAIT %s", ctx));

		_packets.put(ctx);
		_event.emit();

		return ctx.packetId;
	}

	/// Waits until the session state is changed
	auto wait()
	{
		return _event.wait();
	}

	/// Waits until the session state is changed or timeout is reached
	auto wait(Duration timeout)
	{
		return _event.wait(timeout, _event.emitCount);
	}

	/// Manually emit session state change to all listeners
	auto emit()
	{
		return _event.emit();
	}

	/// Removes the stored context at the given position and signals the change
	void removeAt(size_t idx)
	{
		assert(idx < this.length);

		_packets.removeAt(_packets[idx..idx+1]);
		_event.emit();
	}

	/**
	 * Finds package context stored in session
	 *
	 * Params:
	 * 		packetId = id of the packet to look for
	 * 		idx = set to the position of the first match
	 * 		state = if given, the stored packet must also be in one of these states
	 *
	 * Returns: true if a matching packet was found
	 */
	auto canFind(ushort packetId, out size_t idx, PacketState[] state...)
	{
		import alg = std.algorithm : canFind;
		foreach (pos, ref stored; _packets)
		{
			if (stored.packetId != packetId) continue;
			if (state.length && !alg.canFind!(a => a == stored.state)(state)) continue;

			idx = pos;
			return true;
		}

		return false;
	}

nothrow:

	/// Stored context at the given position
	ref MessageContext opIndex(size_t idx) @nogc pure
	{
		assert(idx < this.length);
		return _packets[idx];
	}

	/// The oldest stored context
	@property ref MessageContext front() @nogc pure
	{
		return _packets.front();
	}

	/// Removes the oldest stored context and signals the change
	void popFront()
	{
		assert(!_packets.empty);
		_packets.popFront();
		_event.emit();
	}

	/// Is there nothing stored?
	@property bool empty() const @nogc pure
	{
		return _packets.empty;
	}

	/// Is the capacity used up?
	@property bool full() const @nogc pure
	{
		return _packets.full;
	}

	/// Number of packets to process
	@property auto length() const @nogc pure
	{
		return _packets.length;
	}

	/// Clears cached messages
	void clear()
	{
		_packets.clear();
		_event.emit();
	}

private:
	LocalManualEvent _event;
	SessionContainer _packets;
}
497 
/// MQTT session status holder - owns the queue of messages to send and the queue of messages in flight
private @safe struct Session
{
	alias SendQueue = SessionQueue!(Yes.send);
	alias InflightQueue = SessionQueue!(No.send);

	/// Creates both queues from the settings
	this(Settings settings)
	{
		_sendQueue = SendQueue(settings);
		_inflightQueue = InflightQueue(settings);
	}

nothrow:

	@disable this(this) {}

	/// Queue of messages waiting to be sent
	@property auto ref sendQueue()
	{
		return _sendQueue;
	}

	/// Queue of messages being processed
	@property auto ref inflightQueue()
	{
		return _inflightQueue;
	}

	/// Drops the content of both queues
	void clear()
	{
		_inflightQueue.clear();
		_sendQueue.clear();
	}

private:
	/// Packets to handle
	InflightQueue _inflightQueue;
	SendQueue _sendQueue;
}
535 
// Stores one QoS1 message in the send queue, looks it up by its id and removes it again
unittest
{
	auto session = Session(Settings());

	auto msg = Publish();
	msg.header.qos = QoSLevel.QoS1;
	immutable packetId = session.sendQueue.add(msg, PacketState.queuedQos1);

	assert(session.sendQueue.length == 1);
	assert(packetId != 0);

	// the lookup must not change the queue
	size_t pos;
	assert(session.sendQueue.canFind(packetId, pos));
	assert(pos == 0);
	assert(session.sendQueue.length == 1);

	auto stored = session.sendQueue[pos];

	assert(stored.state == PacketState.queuedQos1);
	assert(stored.attempt == 0);
	assert(stored.message != Publish.init);
	assert(stored.timestamp != SysTime.init);

	session.sendQueue.removeAt(pos);
	assert(session.sendQueue.length == 0);
}
562 
563 /// MQTT Client implementation
564 @safe class MqttClient
565 {
566 	import std.array : Appender;
567 	import vibe.core.core : createTimer, setTimer, Timer;
568 
569 	this(Settings settings)
570 	{
571 		import std.socket : Socket;
572 
573 		_readMutex = new RecursiveTaskMutex();
574 		_writeMutex = new RecursiveTaskMutex();
575 
576 		_settings = settings;
577 		if (_settings.clientId.length == 0) // set clientId if not provided
578 			_settings.clientId = Socket.hostName;
579 
580 		_readBuffer.capacity = 4 * 1024;
581 		_session = Session(settings);
582 		_conAckTimer = createTimer(
583 			() @safe nothrow
584 			{
585 				logWarn("MQTT ConAck not received, disconnecting");
586 				this.disconnectImpl(false);
587 			});
588 	}
589 
590 	/// Connects to the specified broker and sends it the Connect packet
591 	void connect() @safe nothrow
592 	body
593 	{
594 		import vibe.core.core: runTask;
595 		import vibe.core.net: connectTCP;
596 
597 		if (_conAckTimer.pending)
598 		{
599 			version(MqttDebug) logDebug("MQTT Broker already Connecting");
600 			return;
601 		}
602 
603 		if (this.connected)
604 		{
605 			version(MqttDebug) logDebug("MQTT already Connected to Broker");
606 			return;
607 		}
608 
609 		version(MqttDebug) logDebug("MQTT Connect");
610 
611 		//cleanup before reconnect
612 		_readBuffer.clear();
613 		if (_settings.cleanSession ) _session.clear();
614 		_disconnecting = false;
615 
616 		try
617 		{
618 			_con = connectTCP(_settings.host, _settings.port);
619 			if(_settings.useSsl)
620 			{
621 				auto sslctx = createTLSContext(TLSContextKind.client);
622 				sslctx.peerValidationMode = TLSPeerValidationMode.none;
623 				if(_settings.trustedCertificateFile !is null)
624 				{
625 					sslctx.useTrustedCertificateFile(_settings.trustedCertificateFile);
626 				}
627 				if (_settings.alpn.length)
628 					sslctx.setClientALPN(_settings.alpn);
629 				_stream = createTLSStream(_con, sslctx);
630 			}
631 			else
632 			{
633 				_stream = new StreamWrapper!TCPConnection(_con);
634 			}
635 			_listener = runTask(&listener);
636 			_dispatcher = runTask(&dispatcher);
637 
638 			version(MqttDebug) logDebug("MQTT Broker Connecting");
639 
640 			auto con = Connect();
641 			con.clientIdentifier = _settings.clientId;
642 			con.flags.cleanSession = _settings.cleanSession;
643 			con.keepAlive = cast(ushort)((_settings.keepAlive * 3) / 2);
644 			if (_settings.userName.length > 0)
645 			{
646 				con.flags.userName = true;
647 				con.userName = _settings.userName;
648 				if (_settings.password.length > 0)
649 				{
650 					con.flags.password = true;
651 					con.password = _settings.password;
652 				}
653 			}
654 
655 			this.send(con);
656 			_conAckTimer.rearm(5.seconds);
657 		}
658 		catch (Exception ex)
659 		{
660 			() @trusted {logError("MQTT Error connecting to the broker: %s", ex);}();
661 			disconnectImpl(false);
662 		}
663 	}
664 
665 	/// Sends Disconnect packet to the broker and closes the underlying connection
666 	void disconnect() nothrow
667 	{
668 		if (this.connected)
669 		{
670 			version(MqttDebug) logDebug("MQTT Disconnecting from Broker");
671 
672 			this.send(Disconnect());
673 			disconnectImpl(true);
674 
675 			if(Task.getThis !is _listener)
676 				try _listener.join; catch (Exception) {}
677 		}
678 		else
679 		{
680 			version(MqttDebug) logDebug("MQTT Already Disconnected from Broker");
681 			disconnectImpl(true); // to make sure we stop any reconnect attempts too
682 		}
683 	}
684 
685 	final
686 	{
687 		/**
688 		 * Return true, if client is in a connected state
689 		 */
690 		@property bool connected() const nothrow
691 		{
692 			return !_disconnecting && _con && _con.connected;
693 			}
694 
695 		/**
696 		 * Publishes the message on the specified topic
697 		 *
698 		 * Params:
699 		 *     topic = Topic to send message to
700 		 *     payload = Content of the message
701 		 *     qos = Required QoSLevel to handle message (default is QoSLevel.AtMostOnce)
702 		 *     retain = If true, the server must store the message so that it can be delivered to future subscribers
703 		 *
704 		 */
705 		void publish(T)(in string topic, in T payload, QoSLevel qos = QoSLevel.QoS0, bool retain = false)
706 			if (isSomeString!T || (isArray!T && is(ForeachType!T : ubyte)))
707 		{
708 			auto pub = Publish();
709 			pub.header.qos = qos;
710 			pub.header.retain = retain;
711 			pub.topic = topic;
712 			static if (isSomeString!T) pub.payload = payload.representation.dup;
713 			else pub.payload = payload.dup;
714 
715 			//TODO: Maybe send QoS0 directly? Use settings parameter for it?
716 			_session.sendQueue.add(pub, qos == QoSLevel.QoS0 ?
717 				PacketState.queuedQos0 :
718 				(qos == QoSLevel.QoS1 ? PacketState.queuedQos1 : PacketState.queuedQos2));
719 		}
720 
721 		/**
722 		 * Subscribes to the specified topics
723 		 *
724 		 * Params:
725 		 *      topics = Array of topic filters to subscribe to
726 		 *      qos = This gives the maximum QoS level at which the Server can send Application Messages to the Client.
727 		 *
728 		 */
729 		void subscribe(const string[] topics, QoSLevel qos = QoSLevel.QoS0)
730 		{
731 			auto sub = Subscribe();
732 			sub.packetId = _subId = PacketIdGenerator.get.nextPacketId();
733 			sub.topics = topics.map!(a => Topic(a, qos)).array;
734 
735 			if (this.send(sub))
736 			{
737 				_subAckTimer = setTimer(dur!"msecs"(1_000),
738 					() @safe nothrow
739 					{
740 						logError("MQTT Server didn't respond with SUBACK - disconnecting");
741 						this.disconnectImpl(false);
742 					});
743 			}
744 		}
745 
746 		/**
747 		 * Unsubscribes from the specified topics
748 		 *
749 		 * Params:
750 		 *      topics = Array of topic filters to unsubscribe from
751 		 *
752 		 */
753 		void unsubscribe(const string[] topics...)
754 		{
755 			auto unsub = Unsubscribe();
756 			unsub.packetId = _unsubId = PacketIdGenerator.get.nextPacketId();
757 			unsub.topics = topics.dup;
758 
759 			if (this.send(unsub))
760 			{
761 				_unsubAckTimer = setTimer(dur!"msecs"(1_000),
762 					() @safe nothrow
763 					{
764 						logError("MQTT Server didn't respond with UNSUBACK - disconnecting");
765 						this.disconnectImpl(false);
766 					});
767 			}
768 		}
769 	}
770 
771 	/// Response to connection request
772 	void onConnAck(ConnAck packet)
773 	{
774 		version(MqttDebug) logDebug("MQTT onConnAck - %s", packet);
775 
776 		if (_settings.onConnAck) _settings.onConnAck(this, packet);
777 
778 		if (packet.returnCode == ConnectReturnCode.ConnectionAccepted)
779 		{
780 			version(MqttDebug) logDebug("MQTT Connection accepted");
781 			_conAckTimer.stop();
782 			if (_settings.keepAlive)
783 			{
784 				_pingReqTimer = setTimer(dur!"seconds"(_settings.keepAlive),
785 					() @safe nothrow
786 					{
787 						if (this.send(PingReq()))
788 						{
789 							if (_pingRespTimer && _pingRespTimer.pending) return;
790 
791 							auto timeout = () @safe nothrow
792 							{
793 								logError("MQTT no PINGRESP received - disconnecting");
794 								this.disconnectImpl(false);
795 							};
796 
797 							_pingRespTimer = setTimer(dur!"seconds"(10), timeout, false);
798 							}
799 					}, true);
800 			}
801 			_session.sendQueue.emit();
802 		}
803 		else
804 		{
805 			logError("Connection refused: %s", packet.returnCode);
806 			disconnectImpl(false);
807 		}
808 	}
809 
810 	/// Response to PingReq
811 	void onPingResp(PingResp packet)
812 	{
813 		version(MqttDebug) logDebug("MQTT Received PINGRESP - %s", packet);
814 
815 		if (_pingRespTimer && _pingRespTimer.pending) _pingRespTimer.stop;
816 		if (_settings.onPingResp) _settings.onPingResp(this, packet);
817 	}
818 
819 	// QoS1 handling
820 
821 	/// Publish request acknowledged - QoS1
822 	void onPubAck(PubAck packet)
823 	{
824 		size_t idx;
825 		immutable found = _session.inflightQueue.canFind(packet.packetId, idx, PacketState.waitForPuback);
826 
827 		if (found)
828 		{
829 			version(MqttDebug) logDebug("MQTT Received PUBACK - %s", packet);
830 			//treat the PUBLISH Packet as “unacknowledged” until corresponding PUBACK received
831 			_session.inflightQueue.removeAt(idx);
832 		}
833 		else logWarn("MQTT Received PUBACK with unknown ID - %s", packet);
834 
835 		if (_settings.onPubAck) _settings.onPubAck(this, packet);
836 	}
837 
838 	// QoS2 handling - S:Publish, R: PubRec, S: PubRel, R: PubComp
839 
840 	/// Publish request acknowledged - QoS2
841 	void onPubRec(PubRec packet)
842 	{
843 		size_t idx;
844 		immutable found = _session.inflightQueue.canFind(packet.packetId, idx,
845 			PacketState.waitForPubrec, PacketState.waitForPubcomp); // Both states to handle possible resends of unanswered PubRec packets
846 
847 		//MUST send a PUBREL packet when it receives a PUBREC packet from the receiver.
848 		this.send(PubRel(packet.packetId)); //send directly to avoid lock on filled sendQueue
849 
850 		if (found)
851 		{
852 			version(MqttDebug) logDebug("MQTT Received PUBREC - %s", packet);
853 			_session.inflightQueue[idx].state = PacketState.waitForPubcomp;
854 			_session.inflightQueue.emit();
855 		}
856 		else logWarn("MQTT Received PUBREC with unknown ID - %s", packet);
857 
858 		if (_settings.onPubRec) _settings.onPubRec(this, packet);
859 	}
860 
861 	/// Confirmation that message was succesfully delivered (Sender side)
862 	void onPubComp(PubComp packet)
863 	{
864 		size_t idx;
865 		immutable found = _session.inflightQueue.canFind(packet.packetId, idx, PacketState.waitForPubcomp);
866 
867 		if (found)
868 		{
869 			version(MqttDebug) logDebug("MQTT Received PUBCOMP - %s", packet);
870 			//treat the PUBREL packet as “unacknowledged” until it has received the corresponding PUBCOMP packet from the receiver.
871 			_session.inflightQueue.removeAt(idx);
872 		}
873 		else logWarn("MQTT Received PUBCOMP with unknown ID - %s", packet);
874 
875 		if (_settings.onPubComp) _settings.onPubComp(this, packet);
876 	}
877 
878 	void onPubRel(PubRel packet)
879 	{
880 		size_t idx;
881 		immutable found = _session.inflightQueue.canFind(packet.packetId, idx, PacketState.waitForPubrel);
882 
883 		if (found)
884 		{
885 			version(MqttDebug) logDebug("MQTT Received PUBREL - %s", packet);
886 			_session.inflightQueue.removeAt(idx);
887 		}
888 		else logWarn("MQTT Received PUBREL with unknown ID - %s", packet);
889 
890 		//MUST respond to a PUBREL packet by sending a PUBCOMP packet containing the same Packet Identifier as the PUBREL.
891 		this.send(PubComp(packet.packetId)); //send directly to avoid lock on filled sendQueue
892 
893 		if (_settings.onPubRel) _settings.onPubRel(this, packet);
894 	}
895 
896 	/// Message was received from broker
897 	void onPublish(Publish packet)
898 	{
899 		version(MqttDebug) logDebug("MQTT Received PUBLISH - %s", packet);
900 
901 		if (packet.header.qos == QoSLevel.QoS1)
902 		{
903 			//MUST respond with a PUBACK Packet containing the Packet Identifier from the incoming PUBLISH Packet
904 			this.send(PubAck(packet.packetId));
905 		}
906 		else if (packet.header.qos == QoSLevel.QoS2)
907 		{
908 			//MUST respond with a PUBREC containing the Packet Identifier from the incoming PUBLISH Packet, having accepted ownership of the Application Message.
909 			this.send(PubRec(packet.packetId));
910 			_session.inflightQueue.add(packet, PacketState.waitForPubrel, PacketOrigin.broker);
911 		}
912 
913 		if (_settings.onPublish) _settings.onPublish(this, packet);
914 	}
915 
916 	/// Message was succesfully delivered to broker
917 	void onSubAck(SubAck packet)
918 	{
919 		if (packet.packetId == _subId)
920 		{
921 			assert(_subId != 0);
922 			version(MqttDebug) logDebug("MQTT Received SUBACK - %s", packet);
923 			_subAckTimer.stop();
924 			PacketIdGenerator.get.setUnused(_subId);
925 			_subId = 0;
926 		}
927 		else logWarn("MQTT Received SUBACK with unknown ID - %s", packet);
928 
929 		if (_settings.onSubAck) _settings.onSubAck(this, packet);
930 	}
931 
932 	/// Confirmation that unsubscribe request was successfully delivered to broker
933 	void onUnsubAck(UnsubAck packet)
934 	{
935 		if (packet.packetId == _unsubId)
936 		{
937 			assert(_unsubId != 0);
938 			version(MqttDebug) logDebug("MQTT Received UNSUBACK - %s", packet);
939 			_unsubAckTimer.stop();
940 			PacketIdGenerator.get.setUnused(_unsubId);
941 			_unsubId = 0;
942 		}
943 		else logWarn("MQTT Received UNSUBACK with unknown ID - %s", packet);
944 
945 		if (_settings.onUnsubAck) _settings.onUnsubAck(this, packet);
946 	}
947 
948 	/// Client was disconnected from broker
949 	void onDisconnect() nothrow
950 	{
951 		version (MqttDebug) logDebug("MQTT onDisconnect, connected: %s", this.connected);
952 		if (_settings.onDisconnect)
953 		{
954 			try _settings.onDisconnect(this);
955 			catch (Exception ex) logError("Error calling onDisconnect: %s", ex.msg);
956 		}
957 	}
958 
959 private:
	// client configuration (copy of the settings passed to constructor)
	Settings _settings;
	// TCP connection to the broker, opened in connect() and closed in disconnectImpl()
	TCPConnection _con;
	// stream created in connect(): TLS stream when useSsl is set, otherwise a wrapper of _con
	Stream _stream;
	// send and inflight queues
	Session _session;
	// tasks started in connect()
	Task _listener, _dispatcher;
	// NOTE(review): presumably the buffer packets are serialized to before sending - usage is outside of this part of the file
	Serializer!(Appender!(ubyte[])) _sendBuffer;
	// received bytes not processed yet (filled in proccessData)
	FixedRingBuffer!ubyte _readBuffer;
	// bytes of the one whole packet currently being handled (see proccessData)
	ubyte[] _packetBuffer;
	// set while disconnecting, reset in connect()
	bool _disconnecting;
	// timeouts of broker responses, the periodic ping timer and the delayed reconnect timer
	Timer _conAckTimer, _subAckTimer, _unsubAckTimer, _pingReqTimer, _pingRespTimer, _reconnectTimer;
	// packet ids of the pending subscribe / unsubscribe request (0 = none)
	ushort _subId, _unsubId;
	// created in constructor; NOTE(review): usage is outside of this part of the file
	RecursiveTaskMutex _readMutex, _writeMutex;
972 
973 final:
974 
975 	void disconnectImpl(bool force) nothrow
976 	{
977 		// we need to stop reconnect timer even if we are already disconnected
978 		if (force) stopTimer(_reconnectTimer);
979 
980 		if (_disconnecting) return; // already in process of disconnecting (aditional calls caused by cleanups)
981 		_disconnecting = true;
982 		scope (exit) onDisconnect(); // call onDisconnect after cleanup
983 
984 		version (MqttDebug) logDebug("MQTT disconnectImpl, connected: %s, force: %s", this.connected, force);
985 
986 		// cleanup connection
987 		if (_con)
988 		{
989 			_con.close();
990 			_con = TCPConnection.init;
991 		}
992 
993 		// terminate dispatcher
994 		_session.inflightQueue.emit();
995 		_session.sendQueue.emit();
996 
997 		// stop ping timers
998 		stopTimer(_pingReqTimer);
999 		stopTimer(_pingRespTimer);
1000 
1001 		// stop ConAck timer (if running)
1002 		stopTimer(_conAckTimer);
1003 
1004 		// set reconnect if not forced to disconnect
1005 		if (!force && _settings.reconnect)
1006 		{
1007 			version (MqttDebug) logDebug("MQTT setting reconnect timer");
1008 
1009 			auto recon = () @safe nothrow
1010 				{
1011 					logDiagnostic("MQTT reconnecting");
1012 					this.connect();
1013 				};
1014 
1015 			_reconnectTimer = setTimer(_settings.reconnect, recon, false);
1016 		}
1017 	}
1018 
1019 	// nothrow wrapper to ensure timer is stopped
1020 	static void stopTimer(ref Timer timer) nothrow
1021 	{
1022 		if (timer && timer.pending) timer.stop();
1023 	}
1024 
1025 	/// Processes data in read buffer. If whole packet is presented, it delegates it to handler
1026 	void proccessData(in ubyte[] data)
1027 	{
1028 		import mqttd.serialization;
1029 		import std.range;
1030 
1031 		version(MqttDebug) logTrace("MQTT IN: 0x%(%02x%)", data);
1032 
1033 		if (_readBuffer.freeSpace < data.length) // ensure all fits to the buffer
1034 			_readBuffer.capacity = _readBuffer.capacity + data.length;
1035 		_readBuffer.put(data);
1036 
1037 		while (_readBuffer.length > 0 && this.connected) // break message processing when disconnected
1038 		{
1039 			// try read packet header
1040 			FixedHeader header = _readBuffer[0]; // type + flags
1041 
1042 			// try read remaining length
1043 			uint pos;
1044 			uint multiplier = 1;
1045 			ubyte digit;
1046 			do
1047 			{
1048 				if (++pos >= _readBuffer.length) return; // not enough data
1049 				digit = _readBuffer[pos];
1050 				header.length += ((digit & 127) * multiplier);
1051 				multiplier *= 128;
1052 				if (multiplier > 128*128*128) throw new PacketFormatException("Malformed remaining length");
1053 			} while ((digit & 128) != 0);
1054 
1055 			if (_readBuffer.length < header.length + pos + 1) return; // not enough data
1056 
1057 			// we've got the whole packet to handle
1058 			_packetBuffer.length = 1 + pos + header.length; // packet type byte + remaining size bytes + remaining size
1059 			_readBuffer.read(_packetBuffer); // read whole packet from read buffer
1060 
1061 			with (PacketType)
1062 			{
1063 				final switch (header.type)
1064 				{
1065 					case CONNACK:
1066 						onConnAck(_packetBuffer.deserialize!ConnAck());
1067 						break;
1068 					case PINGRESP:
1069 						onPingResp(_packetBuffer.deserialize!PingResp());
1070 						break;
1071 					case PUBACK:
1072 						onPubAck(_packetBuffer.deserialize!PubAck());
1073 						break;
1074 					case PUBREC:
1075 						onPubRec(_packetBuffer.deserialize!PubRec());
1076 						break;
1077 					case PUBREL:
1078 						onPubRel(_packetBuffer.deserialize!PubRel());
1079 						break;
1080 					case PUBCOMP:
1081 						onPubComp(_packetBuffer.deserialize!PubComp());
1082 						break;
1083 					case PUBLISH:
1084 						onPublish(_packetBuffer.deserialize!Publish());
1085 						break;
1086 					case SUBACK:
1087 						onSubAck(_packetBuffer.deserialize!SubAck());
1088 						break;
1089 					case UNSUBACK:
1090 						onUnsubAck(_packetBuffer.deserialize!UnsubAck());
1091 						break;
1092 					case CONNECT:
1093 					case SUBSCRIBE:
1094 					case UNSUBSCRIBE:
1095 					case PINGREQ:
1096 					case DISCONNECT:
1097 					case RESERVED1:
1098 					case RESERVED2:
1099 						throw new Exception(format("Unexpected packet type '%s'", header.type));
1100 				}
1101 			}
1102 		}
1103 	}
1104 
1105 	/// loop to receive packets
1106 	void listener()
1107 	{
1108 		version (MqttDebug)
1109 		{
1110 			() @trusted { logDebug("MQTT Entering listening loop - TID:%s", thisTid); }();
1111 			scope (exit) logDebug("MQTT Exiting listening loop");
1112 		}
1113 
1114 		scope (exit)
1115 		{
1116 			_listener = Task.init;
1117 			if (!_dispatcher && _stream) { _stream.finalize(); _stream = null; }
1118 		}
1119 
1120 		auto buffer = new ubyte[4096];
1121 
1122 		size_t size;
1123 		while (this.connected)
1124 		{
1125 			{
1126 				auto lock = scopedMutexLock(_readMutex);
1127 				if (_stream.empty) break;
1128 				size = cast(size_t) _stream.leastSize;
1129 				if (size == 0) break;
1130 				if (size > buffer.length) size = buffer.length;
1131 				_stream.read(buffer[0..size]);
1132 			}
1133 			proccessData(buffer[0..size]);
1134 		}
1135 
1136 		disconnectImpl(false);
1137 	}
1138 
1139 	/// loop to dispatch in session stored packets
1140 	void dispatcher()
1141 	{
1142 		version (MqttDebug)
1143 		{
1144 			() @trusted { logDebug("MQTT Entering dispatch loop - TID:%s", thisTid); }();
1145 			scope (exit) logDebug("MQTT Exiting dispatch loop");
1146 		}
1147 
1148 		scope (exit)
1149 		{
1150 			_dispatcher = Task.init;
1151 			if (!_listener && _stream) { _stream.finalize(); _stream = null; }
1152 		}
1153 
1154 		while (this.connected)
1155 		{
1156 			// wait for session state change
1157 			_session.sendQueue.wait();
1158 
1159 			if (!this.connected) break;
1160 			if (_conAckTimer.pending) continue; //wait for ConAck before sending any messages
1161 
1162 			while (_session.sendQueue.length)
1163 			{
1164 				// wait for space in inflight queue
1165 				while (_session.inflightQueue.full)
1166 				{
1167 					version (MqttDebug) logDebug("MQTT InflightQueue full, wait before sending next message");
1168 					_session.inflightQueue.wait();
1169 					if (!this.connected) goto dispatcherFin; // we can be disconnected in between
1170 				}
1171 
1172 				version (MqttDebug) logDebugV("MQTT Packets in session: send=%s, wait=%s", _session.sendQueue.length, _session.inflightQueue.length);
1173 				auto ctx = _session.sendQueue.front;
1174 				final switch (ctx.state)
1175 				{
1176 					// QoS0 handling - S:Publish, S:forget
1177 					case PacketState.queuedQos0: // just send it
1178 						//Sender request QoS0
1179 						assert(ctx.origin == PacketOrigin.client);
1180 						this.send(ctx.message);
1181 						break;
1182 
1183 					// QoS1 handling - S:Publish, R:PubAck
1184 					case PacketState.queuedQos1:
1185 						//Sender request QoS1
1186 						//treat the Packet as “unacknowledged” until the corresponding PUBACK packet received
1187 						assert(ctx.header.qos == QoSLevel.QoS1);
1188 						assert(ctx.origin == PacketOrigin.client);
1189 						this.send(ctx.message);
1190 						ctx.state = PacketState.waitForPuback;
1191 						_session.inflightQueue.add(ctx);
1192 						break;
1193 
1194 					// QoS2 handling - S:Publish, R: PubRec, S: PubRel, R: PubComp
1195 					case PacketState.queuedQos2:
1196 						//Sender request QoS2
1197 						//treat the PUBLISH packet as “unacknowledged” until it has received the corresponding PUBREC packet from the receiver.
1198 						assert(ctx.header.qos == QoSLevel.QoS2);
1199 						assert(ctx.origin == PacketOrigin.client);
1200 						this.send(ctx.message);
1201 						ctx.state = PacketState.waitForPubrec;
1202 						_session.inflightQueue.add(ctx);
1203 						break;
1204 
1205 					case PacketState.waitForPuback:
1206 					case PacketState.waitForPubrec:
1207 					case PacketState.waitForPubcomp:
1208 					case PacketState.waitForPubrel:
1209 						assert(0, "Invalid state");
1210 				}
1211 
1212 				//remove from sendQueue
1213 				_session.sendQueue.popFront;
1214 			}
1215 		}
1216 
1217 dispatcherFin:
1218 		disconnectImpl(false);
1219 	}
1220 
1221 	auto send(T)(auto ref T msg) nothrow if (isMqttPacket!T)
1222 	{
1223 		static if (is (T == Publish))
1224 		{
1225 			version (MqttDebug)
1226 			{
1227 				static SysTime last;
1228 				static size_t messages;
1229 
1230 				try
1231 				{
1232 					if (last == SysTime.init) last = Clock.currTime;
1233 					messages++;
1234 
1235 					auto diff = Clock.currTime - last;
1236 					if (diff.total!"msecs" >= 1_000)
1237 					{
1238 						logDiagnostic("MQTT %s messages/s", cast(double)(1_000 * messages)/diff.total!"msecs");
1239 						messages = 0;
1240 						last = Clock.currTime;
1241 					}
1242 				}
1243 				catch (Exception) {}
1244 			}
1245 		}
1246 
1247 		_sendBuffer.clear(); // clear to write new
1248 		try _sendBuffer.serialize(msg); catch (Exception ex) { assert(false, ex.msg); }
1249 
1250 		if (this.connected)
1251 		{
1252 			version(MqttDebug)
1253 			{
1254 				logDebug("MQTT OUT: %s", msg);
1255 				logTrace("MQTT OUT: 0x%(%02x%)", _sendBuffer.data);
1256 			}
1257 			try
1258 			{
1259 				auto lock = scopedMutexLock(_writeMutex);
1260 				_stream.write(_sendBuffer.data);
1261 				return true;
1262 			}
1263 			catch (Exception)
1264 			{
1265 				static if (!is(T == Disconnect)) this.disconnectImpl(false);
1266 				return false;
1267 			}
1268 		}
1269 		else return false;
1270 	}
1271 }