/**
 * MQTT client implementation
 *
 * Author:
 * Tomáš Chaloupka <chalucha@gmail.com>
 *
 * License:
 * Boost Software License 1.0 (BSL-1.0)
 *
 * Permission is hereby granted, free of charge, to any person or organization obtaining a copy
 * of the software and accompanying documentation covered by this license (the "Software") to use,
 * reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative
 * works of the Software, and to permit third-parties to whom the Software is furnished to do so,
 * all subject to the following:
 *
 * The copyright notices in the Software and this entire statement, including the above license
 * grant, this restriction and the following disclaimer, must be included in all copies of the Software,
 * in whole or in part, and all derivative works of the Software, unless such copies or derivative works
 * are solely in the form of machine-executable object code generated by a source language processor.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 * PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE
 * DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT,
 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 * OTHER DEALINGS IN THE SOFTWARE.
 */
module mqttd.client;

import mqttd.messages;
import mqttd.serialization;
import mqttd.stream_wrapper;
import mqttd.traits;

import std.algorithm : any, map;
import std.array : array;
import std.datetime;
import std.exception;
debug import std.stdio;
import std.string : format, representation;
import std.traits;
import std.typecons : Flag, No, Yes;

import vibe.core.concurrency;
import vibe.core.log;
import vibe.core.net: TCPConnection;
import vibe.core.stream;
import vibe.core.sync;
import vibe.core.task;
import vibe.stream.tls;
import vibe.utils.array : FixedRingBuffer;

// constants
enum MQTT_MAX_PACKET_ID = ushort.max; /// maximal packet id (65535, id 0 is never handed out) - defined by MQTT protocol

// default settings
enum MQTT_DEFAULT_BROKER_PORT = 1883u; /// default mqtt broker port
enum MQTT_DEFAULT_BROKER_SSL_PORT = 8883u; /// default mqtt broker ssl port
enum MQTT_DEFAULT_SENDQUEUE_SIZE = 1000u; /// maximal number of packets stored in queue to send
enum MQTT_DEFAULT_INFLIGHTQUEUE_SIZE = 10u; /// maximal number of packets which can be processed at the same time
enum MQTT_DEFAULT_CLIENT_ID = "vibe-mqtt"; /// default client identifier
enum MQTT_DEFAULT_RETRY_DELAY = 10_000u; /// retry interval to resend publish (QoS 1 or 2), subscribe and unsubscribe messages [ms]
enum MQTT_DEFAULT_RETRY_ATTEMPTS = 3u; /// max publish, subscribe and unsubscribe retry for QoS Level 1 or 2

// aliases
alias SessionContainer = FixedRingBuffer!(MessageContext);

/**
 * Delegate type of the user callbacks stored in Settings.
 *
 * With one type argument the callback receives the client and the packet of that type,
 * with no type argument it receives the client only. Any other arity is rejected at compile time.
 */
template Callback(T...)
{
	static if (T.length == 1) alias Callback = void delegate(scope MqttClient ctx, in T packet) @safe;
	else static if (T.length == 0) alias Callback = void delegate(scope MqttClient ctx) @safe;
	else static assert(0, "Invalid callback");
}

/// MqttClient settings
struct Settings
{
	string host = "127.0.0.1"; /// message broker address
	ushort port = MQTT_DEFAULT_BROKER_PORT; /// message broker port
	string clientId = MQTT_DEFAULT_CLIENT_ID; /// Client Id to identify within message broker (must be unique)
	string userName = null; /// optional user name to login with
	string password = null; /// user password
	int retryDelay = MQTT_DEFAULT_RETRY_DELAY; /// retry interval to resend publish QoS 1 and 2 messages [ms]
	int retryAttempts = MQTT_DEFAULT_RETRY_ATTEMPTS; /// how many times will client try to resend QoS1 and QoS2 messages
	bool cleanSession = true; /// clean client and server session state on connect
	size_t sendQueueSize = MQTT_DEFAULT_SENDQUEUE_SIZE; /// maximal number of packets stored in queue to send
	size_t inflightQueueSize = MQTT_DEFAULT_INFLIGHTQUEUE_SIZE; /// maximal number of packets which can be processed at the same time
	ushort keepAlive; /// The Keep Alive is a time interval [s] to send control packets to server. It's used to determine that the network and broker are working. If set to 0, no control packets are send automatically (default).
	Duration reconnect; /// Time interval in which client tries to reconnect to broker if disconnected. If left at Duration.init (zero), auto reconnect is disabled (default)
	bool useSsl = false; /// use SSL/TLS for the connection
	string trustedCertificateFile = null; /// list of trusted certificates for verifying peer certificates
	TLSPeerValidationMode peerValidationMode = TLSPeerValidationMode.none; /// mode for verifying peer certificates
	string[] alpn; /// list of ALPN extension protocols

	// callbacks
	Callback!ConnAck onConnAck;
	Callback!PingResp onPingResp;
	Callback!PubAck onPubAck;
	Callback!PubRec onPubRec;
	Callback!PubComp onPubComp;
	Callback!PubRel onPubRel;
	Callback!Publish onPublish;
	Callback!SubAck onSubAck;
	Callback!UnsubAck onUnsubAck;
	Callback!() onDisconnect;
}

/**
 * Packet ID generator
 * Holds the status of ID usage and generates the next ones.
 *
 * It's a singleton shared by all threads (the instance is __gshared and its creation is synchronized).
 * Usage is tracked in a bit set with one bit per packet id.
 */
class PacketIdGenerator
{
private:
	this()
	{
		_event = createManualEvent();
		setUsed(0); // id 0 is reserved, so it is never reported as free
	}

	// TLS flag, each thread has its own
	static bool instantiated_;

	// "True" global
	__gshared PacketIdGenerator instance_;

	/// Index of the _idUsage word that holds the flag of the given id
	pragma(inline, true) auto getIdx(ushort id) { return id / (size_t.sizeof*8); }
	/// Bit mask of the given id within its _idUsage word
	pragma(inline, true) auto getIdxValue(ushort id) { return cast(size_t)(1uL << (id % (size_t.sizeof*8))); }

	LocalManualEvent _event; // emitted whenever the usage flags change
	ushort _packetId = 0u; // last id handed out
	size_t[(MQTT_MAX_PACKET_ID+1)/(size_t.sizeof*8)] _idUsage; //1024 * 64 = 65536 => id usage flags storage

public:

	/// Instance of Packet ID generator
	@property static PacketIdGenerator get() @trusted
	{
		// Since every thread has its own instantiated_ variable,
		// there is no need for synchronization here.
		if (!instantiated_)
		{
			synchronized (PacketIdGenerator.classinfo)
			{
				if (!instance_)
				{
					instance_ = new PacketIdGenerator();
				}
				instantiated_ = true;
			}
		}
		return instance_;
	}

@safe:

	/// Gets next packet id. If the session is full it won't return till there is free space again
	@property auto nextPacketId()
	out (result)
	{
		assert(result, "packet id can't be 0!");
		assert(_event);
	}
	body
	{
		do
		{
			// every usage word is saturated => no free id, wait for a change
			if (!_idUsage[].any!(a => a != size_t.max)())
			{
				version (MqttDebug) logDiagnostic("MQTT all packet ids in use - waiting");
				this._event.wait();
				continue;
			}

			//packet id can't be 0!
			_packetId = cast(ushort)((_packetId % MQTT_MAX_PACKET_ID) != 0 ? _packetId + 1 : 1);
		}
		while (isUsed(_packetId));

		version (MqttDebug) if (_packetId == 1) logDiagnostic("ID Overflow");

		setUsed(_packetId);

		return _packetId;
	}

	/// Is packet id currently used?
	pragma(inline) auto isUsed(ushort id) { return (_idUsage[getIdx(id)] & getIdxValue(id)) == getIdxValue(id); }

	/// Sets packet id as used
	pragma(inline) void setUsed(ushort id)
	{
		assert(!isUsed(id));
		assert(_event);

		_idUsage[getIdx(id)] |= getIdxValue(id);
		version (unittest) {} //HACK: For some reason unittest will segfault when emiting
		else _event.emit();
	}

	/// Sets packet id as unused
	pragma(inline) void setUnused(ushort id)
	{
		assert(id != 0);
		assert(isUsed(id));
		assert(_event);

		// the bit is known to be set (asserted above), so xor clears it
		_idUsage[getIdx(id)] ^= getIdxValue(id);

		version (unittest) {} //HACK: For some reason unittest will segfault when emiting
		else _event.emit();
	}
}

// Bit set bookkeeping of PacketIdGenerator.
// NOTE: the index/mask expectations below assume a 64-bit size_t.
unittest
{
	auto gen = PacketIdGenerator.get;
	assert(gen.getIdx(1) == 0);
	assert(gen.getIdx(64) == 1);
	assert(gen.getIdxValue(1) == gen.getIdxValue(65));
	assert(gen.getIdxValue(63) == 0x8000000000000000);
	assert(gen.getIdx(128) == 2);
	gen.setUsed(1);
	assert(gen.isUsed(1));
	gen.setUsed(64);
	assert(gen.isUsed(64));
	gen.setUnused(64);
	assert(!gen.isUsed(64));
	assert(gen.isUsed(1));
	gen.setUnused(1);

	foreach(i; 1..size_t.sizeof*8) gen.setUsed(cast(ushort)i);
	assert(gen._idUsage[0] == size_t.max);

	// the generator is a process-wide singleton - release the ids reserved above
	// so they are not left marked as used for whatever runs afterwards
	foreach(i; 1..size_t.sizeof*8) gen.setUnused(cast(ushort)i);
	assert(!gen.isUsed(1));
}

/// MQTT packet state
enum PacketState
{
	queuedQos0, /// QOS = 0, Publish message queued
	queuedQos1, /// QOS = 1, Publish message queued
	queuedQos2, /// QOS = 2, Publish message queued

	waitForPuback, /// QOS = 1, PUBLISH sent, wait for PUBACK
	waitForPubrec, /// QOS = 2, PUBLISH sent, wait for PUBREC
	waitForPubrel, /// QOS = 2, PUBREC sent, wait for PUBREL
	waitForPubcomp, /// QOS = 2, PUBREL sent, wait for PUBCOMP
}

/// Origin of the stored packet
enum PacketOrigin
{
	client, /// originated from this client
	broker /// originated from broker
}

/// Context for MQTT packet stored in Session
private @safe struct MessageContext
{
	~this()
	{
		decRef();
	}

	/**
	 * Creates the context with a fresh reference counter.
	 * For messages originating from this client in any state but queuedQos0,
	 * a packet id is taken from PacketIdGenerator (which may block until one is free).
	 */
	this(Publish message, PacketState state, PacketOrigin origin = PacketOrigin.client)
	{
		assert(refcount is null);
		refcount = new int(1);

		this.timestamp = Clock.currTime;
		this.state = state;
		this.origin = origin;
		this.message = message;
		if (origin == PacketOrigin.client && state != PacketState.queuedQos0)
			this.message.packetId = PacketIdGenerator.get.nextPacketId();
	}

	// copies share the counter
	this (this)
	{
		if (refcount !is null) *refcount += 1;
	}

	PacketState state; /// message state
	uint attempt; /// Attempt (for retry)
	SysTime timestamp; /// Timestamp (for retry)
	PacketOrigin origin; /// message origin
	Publish message; /// message itself

	alias message this;

private:
	int* refcount;

	// Drops one reference; the last one gives the packet id of a client message back to the generator
	void decRef()
	{
		if (refcount !is null)
		{
			if ((*refcount -= 1) == 0)
			{
				refcount = null;
				if (this.origin == PacketOrigin.client && this.packetId)
					PacketIdGenerator.get.setUnused(this.packetId);
			}
		}
	}
}

/// Queue storage helper for session
private @safe struct SessionQueue(Flag!"send" send)
{
	/// Allocates the storage - sized by sendQueueSize for the send queue, by inflightQueueSize otherwise
	this(Settings settings)
	{
		_event = createManualEvent();

		static if (send) _packets = SessionContainer(settings.sendQueueSize);
		else _packets = SessionContainer(settings.inflightQueueSize);
	}

	@disable this();
	@disable this(this) {}

	/**
	 * Adds packet to Session
	 * If the session is full the call will be blocked until there is space again.
	 * Also if there is no free packetId to use, it will be blocked until it is.
	 *
	 * Params:
	 * packet = packet to be sent (can be Publish, Subscribe or Unsubscribe)
	 * state = initial packet state
	 * origin = origin of the packet (session stores control packets from broker too)
	 *
	 * Returns:
	 * Assigned packetId (0 if QoS0 is set). If message originates from broker, it keeps the original..
	 */
	ushort add(Publish packet, PacketState state, PacketOrigin origin = PacketOrigin.client)
	{
		return add(MessageContext(packet, state, origin));
	}

	/// ditto
	ushort add(MessageContext ctx)
	in
	{
		assert(ctx.packetId || ctx.state == PacketState.queuedQos0, "PacketId must be set");
		static if (send) assert(ctx.origin == PacketOrigin.client, "Only client messages can be added to send queue");

		// queued* states belong to the send queue only, waitFor* states to the inflight queue only
		with (PacketState)
		{
			final switch (ctx.state)
			{
				case queuedQos0:
				case queuedQos1:
				case queuedQos2:
					static if (!send) assert(0, "Invalid packet state");
					else break;
				case waitForPuback:
				case waitForPubcomp:
				case waitForPubrec:
				case waitForPubrel:
					static if (send) assert(0, "Invalid packet state");
					else break;
			}
		}
	}
	body
	{
		while (_packets.full)
		{
			static if (send)
			{
				// a QoS0 message is dropped instead of blocking the caller
				if (ctx.state == PacketState.queuedQos0)
				{
					version (MqttDebug) logDebug("MQTT SendQueueFull - dropping QoS0 publish msg");
					return cast(ushort)0;
				}
				else
				{
					version (MqttDebug)
					{
						logDebug("MQTT SendQueueFull ([%s] %s) - waiting", ctx.packetId, ctx.state);
						scope (exit) logDebug("MQTT SendQueueFull after wait ([%s] %s)", ctx.packetId, ctx.state);
					}
					_event.wait();
				}
			}
			else
			{
				version (MqttDebug)
				{
					logDebug("MQTT InflightQueueFull ([%s] %s) - waiting", ctx.packetId, ctx.state);
					scope (exit) logDebug("MQTT InflightQueueFull after wait ([%s] %s)", ctx.packetId, ctx.state);
				}
				_event.wait();
			}
		}

		static if (send) assert(!_packets.full, format("SEND %s", ctx));
		else assert(!_packets.full, format("WAIT %s", ctx));

		_packets.put(ctx);
		_event.emit();

		return ctx.packetId;
	}

	/// Waits until the session state is changed
	auto wait()
	{
		return _event.wait();
	}

	/// Waits until the session state is changed or timeout is reached
	auto wait(Duration timeout)
	{
		return _event.wait(timeout, _event.emitCount);
	}

	/// Manually emit session state change to all listeners
	auto emit()
	{
		return _event.emit();
	}

	/// Removes the stored PacketContext
	void removeAt(size_t idx)
	{
		assert(idx < this.length);

		_packets.removeAt(_packets[idx..idx+1]);
		_event.emit();
	}

	/**
	 * Finds package context stored in session
	 *
	 * Params:
	 * packetId = id of the packet to look for
	 * idx = set to the index of the first match
	 * state = accepted packet states, any state matches if none is given
	 *
	 * Returns:
	 * true if a matching packet was found
	 */
	auto canFind(ushort packetId, out size_t idx, PacketState[] state...)
	{
		import alg = std.algorithm : canFind;
		foreach (i, ref c; _packets)
		{
			if (c.packetId == packetId && (!state.length || alg.canFind!(a => a == c.state)(state)))
			{
				idx = i;
				return true;
			}
		}

		return false;
	}

nothrow:

	ref MessageContext opIndex(size_t idx) @nogc pure
	{
		assert(idx < this.length);
		return _packets[idx];
	}

	@property ref MessageContext front() @nogc pure
	{
		return _packets.front();
	}

	void popFront()
	{
		assert(!_packets.empty);
		_packets.popFront();
		_event.emit();
	}

	@property bool empty() const @nogc pure
	{
		return _packets.empty;
	}

	@property bool full() const @nogc pure
	{
		return _packets.full;
	}

	/// Number of packets to process
	@property auto length() const @nogc pure
	{
		return _packets.length;
	}

	/// Clears cached messages
	void clear()
	{
		_packets.clear();
		_event.emit();
	}

private:
	LocalManualEvent _event; // emitted on every change of the queue
	SessionContainer _packets;
}

/// MQTT session status holder
private @safe struct Session
{
	alias InflightQueue = SessionQueue!(No.send);
	alias SendQueue = SessionQueue!(Yes.send);

	this(Settings settings)
	{
		_inflightQueue = InflightQueue(settings);
		_sendQueue = SendQueue(settings);
	}

nothrow:

	@disable this(this) {}

	@property auto ref inflightQueue()
	{
		return _inflightQueue;
	}

	@property auto ref sendQueue()
	{
		return _sendQueue;
	}

	/// Clears both queues
	void clear()
	{
		this._inflightQueue.clear();
		this._sendQueue.clear();
	}

private:
	/// Packets to handle
	InflightQueue _inflightQueue;
	SendQueue _sendQueue;
}

// A QoS1 publish can be added to the send queue, found by its id and removed again
unittest
{
	auto s = Session(Settings());

	auto pub = Publish();
	pub.header.qos = QoSLevel.QoS1;
	auto id = s.sendQueue.add(pub, PacketState.queuedQos1);

	assert(s.sendQueue.length == 1);

	size_t idx;
	assert(id != 0);
	assert(s.sendQueue.canFind(id, idx));
	assert(idx == 0);
	assert(s.sendQueue.length == 1);

	auto ctx = s.sendQueue[idx];

	assert(ctx.state == PacketState.queuedQos1);
	assert(ctx.attempt == 0);
	assert(ctx.message != Publish.init);
	assert(ctx.timestamp != SysTime.init);

	s.sendQueue.removeAt(idx);
	assert(s.sendQueue.length == 0);
}

/// MQTT Client implementation
@safe class MqttClient
{
	import std.array : Appender;
	import vibe.core.core : createTimer, setTimer, Timer;

	/**
	 * Creates the client. No connection is opened here, see connect().
	 *
	 * Params:
	 * settings = client settings; an empty clientId is replaced with the host name
	 */
	this(Settings settings)
	{
		import std.socket : Socket;

		_readMutex = new RecursiveTaskMutex();
		_writeMutex = new RecursiveTaskMutex();

		_settings = settings;
		if (_settings.clientId.length == 0) // set clientId if not provided
			_settings.clientId = Socket.hostName;

		_readBuffer.capacity = 4 * 1024;
		_session = Session(settings);
		_conAckTimer = createTimer(
			() @safe nothrow
			{
				logWarn("MQTT ConAck not received, disconnecting");
				this.disconnectImpl(false);
			});
	}

	/**
	 * Connects to the specified broker and sends it the Connect packet
	 *
	 * Does nothing if a connection attempt is already pending or the client is connected.
	 * With Settings.useSsl the connection is wrapped in a TLS stream configured from
	 * Settings.peerValidationMode, Settings.trustedCertificateFile and Settings.alpn.
	 * Any exception is logged and handled by disconnectImpl(false).
	 */
	void connect() @safe nothrow
	body
	{
		import vibe.core.core: runTask;
		import vibe.core.net: connectTCP;

		if (_conAckTimer.pending)
		{
			version(MqttDebug) logDebug("MQTT Broker already Connecting");
			return;
		}

		if (this.connected)
		{
			version(MqttDebug) logDebug("MQTT already Connected to Broker");
			return;
		}

		version(MqttDebug) logDebug("MQTT Connect");

		//cleanup before reconnect
		_readBuffer.clear();
		if (_settings.cleanSession ) _session.clear();
		_disconnecting = false;

		try
		{
			_con = connectTCP(_settings.host, _settings.port);
			if(_settings.useSsl)
			{
				auto sslctx = createTLSContext(TLSContextKind.client);
				// honor the configured validation mode (it was hard-coded to none before,
				// which made Settings.peerValidationMode a no-op)
				sslctx.peerValidationMode = _settings.peerValidationMode;
				if(_settings.trustedCertificateFile !is null)
				{
					sslctx.useTrustedCertificateFile(_settings.trustedCertificateFile);
				}
				if (_settings.alpn.length)
					sslctx.setClientALPN(_settings.alpn);
				_stream = createTLSStream(_con, sslctx);
			}
			else
			{
				_stream = new StreamWrapper!TCPConnection(_con);
			}
			_listener = runTask(&listener);
			_dispatcher = runTask(&dispatcher);

			version(MqttDebug) logDebug("MQTT Broker Connecting");

			auto con = Connect();
			con.clientIdentifier = _settings.clientId;
			con.flags.cleanSession = _settings.cleanSession;
			// the broker is told 1.5x of the interval in which the client sends its pings
			con.keepAlive = cast(ushort)((_settings.keepAlive * 3) / 2);
			if (_settings.userName.length > 0)
			{
				con.flags.userName = true;
				con.userName = _settings.userName;
				if (_settings.password.length > 0)
				{
					con.flags.password = true;
					con.password = _settings.password;
				}
			}

			this.send(con);
			_conAckTimer.rearm(5.seconds);
		}
		catch (Exception ex)
		{
			() @trusted {logError("MQTT Error connecting to the broker: %s", ex);}();
			disconnectImpl(false);
		}
	}

	/// Sends Disconnect packet to the broker and closes the underlying connection
	void disconnect() nothrow
	{
		if (this.connected)
		{
			version(MqttDebug) logDebug("MQTT Disconnecting from Broker");

			this.send(Disconnect());
			disconnectImpl(true);
674 675 if(Task.getThis !is _listener) 676 try _listener.join; catch (Exception) {} 677 } 678 else 679 { 680 version(MqttDebug) logDebug("MQTT Already Disconnected from Broker"); 681 disconnectImpl(true); // to make sure we stop any reconnect attempts too 682 } 683 } 684 685 final 686 { 687 /** 688 * Return true, if client is in a connected state 689 */ 690 @property bool connected() const nothrow 691 { 692 return !_disconnecting && _con && _con.connected; 693 } 694 695 /** 696 * Publishes the message on the specified topic 697 * 698 * Params: 699 * topic = Topic to send message to 700 * payload = Content of the message 701 * qos = Required QoSLevel to handle message (default is QoSLevel.AtMostOnce) 702 * retain = If true, the server must store the message so that it can be delivered to future subscribers 703 * 704 */ 705 void publish(T)(in string topic, in T payload, QoSLevel qos = QoSLevel.QoS0, bool retain = false) 706 if (isSomeString!T || (isArray!T && is(ForeachType!T : ubyte))) 707 { 708 auto pub = Publish(); 709 pub.header.qos = qos; 710 pub.header.retain = retain; 711 pub.topic = topic; 712 static if (isSomeString!T) pub.payload = payload.representation.dup; 713 else pub.payload = payload.dup; 714 715 //TODO: Maybe send QoS0 directly? Use settings parameter for it? 716 _session.sendQueue.add(pub, qos == QoSLevel.QoS0 ? 717 PacketState.queuedQos0 : 718 (qos == QoSLevel.QoS1 ? PacketState.queuedQos1 : PacketState.queuedQos2)); 719 } 720 721 /** 722 * Subscribes to the specified topics 723 * 724 * Params: 725 * topics = Array of topic filters to subscribe to 726 * qos = This gives the maximum QoS level at which the Server can send Application Messages to the Client. 
727 * 728 */ 729 void subscribe(const string[] topics, QoSLevel qos = QoSLevel.QoS0) 730 { 731 auto sub = Subscribe(); 732 sub.packetId = _subId = PacketIdGenerator.get.nextPacketId(); 733 sub.topics = topics.map!(a => Topic(a, qos)).array; 734 735 if (this.send(sub)) 736 { 737 _subAckTimer = setTimer(dur!"msecs"(1_000), 738 () @safe nothrow 739 { 740 logError("MQTT Server didn't respond with SUBACK - disconnecting"); 741 this.disconnectImpl(false); 742 }); 743 } 744 } 745 746 /** 747 * Unsubscribes from the specified topics 748 * 749 * Params: 750 * topics = Array of topic filters to unsubscribe from 751 * 752 */ 753 void unsubscribe(const string[] topics...) 754 { 755 auto unsub = Unsubscribe(); 756 unsub.packetId = _unsubId = PacketIdGenerator.get.nextPacketId(); 757 unsub.topics = topics.dup; 758 759 if (this.send(unsub)) 760 { 761 _unsubAckTimer = setTimer(dur!"msecs"(1_000), 762 () @safe nothrow 763 { 764 logError("MQTT Server didn't respond with UNSUBACK - disconnecting"); 765 this.disconnectImpl(false); 766 }); 767 } 768 } 769 } 770 771 /// Response to connection request 772 void onConnAck(ConnAck packet) 773 { 774 version(MqttDebug) logDebug("MQTT onConnAck - %s", packet); 775 776 if (_settings.onConnAck) _settings.onConnAck(this, packet); 777 778 if (packet.returnCode == ConnectReturnCode.ConnectionAccepted) 779 { 780 version(MqttDebug) logDebug("MQTT Connection accepted"); 781 _conAckTimer.stop(); 782 if (_settings.keepAlive) 783 { 784 _pingReqTimer = setTimer(dur!"seconds"(_settings.keepAlive), 785 () @safe nothrow 786 { 787 if (this.send(PingReq())) 788 { 789 if (_pingRespTimer && _pingRespTimer.pending) return; 790 791 auto timeout = () @safe nothrow 792 { 793 logError("MQTT no PINGRESP received - disconnecting"); 794 this.disconnectImpl(false); 795 }; 796 797 _pingRespTimer = setTimer(dur!"seconds"(10), timeout, false); 798 } 799 }, true); 800 } 801 _session.sendQueue.emit(); 802 } 803 else 804 { 805 logError("Connection refused: %s", 
packet.returnCode); 806 disconnectImpl(false); 807 } 808 } 809 810 /// Response to PingReq 811 void onPingResp(PingResp packet) 812 { 813 version(MqttDebug) logDebug("MQTT Received PINGRESP - %s", packet); 814 815 if (_pingRespTimer && _pingRespTimer.pending) _pingRespTimer.stop; 816 if (_settings.onPingResp) _settings.onPingResp(this, packet); 817 } 818 819 // QoS1 handling 820 821 /// Publish request acknowledged - QoS1 822 void onPubAck(PubAck packet) 823 { 824 size_t idx; 825 immutable found = _session.inflightQueue.canFind(packet.packetId, idx, PacketState.waitForPuback); 826 827 if (found) 828 { 829 version(MqttDebug) logDebug("MQTT Received PUBACK - %s", packet); 830 //treat the PUBLISH Packet as “unacknowledged” until corresponding PUBACK received 831 _session.inflightQueue.removeAt(idx); 832 } 833 else logWarn("MQTT Received PUBACK with unknown ID - %s", packet); 834 835 if (_settings.onPubAck) _settings.onPubAck(this, packet); 836 } 837 838 // QoS2 handling - S:Publish, R: PubRec, S: PubRel, R: PubComp 839 840 /// Publish request acknowledged - QoS2 841 void onPubRec(PubRec packet) 842 { 843 size_t idx; 844 immutable found = _session.inflightQueue.canFind(packet.packetId, idx, 845 PacketState.waitForPubrec, PacketState.waitForPubcomp); // Both states to handle possible resends of unanswered PubRec packets 846 847 //MUST send a PUBREL packet when it receives a PUBREC packet from the receiver. 
848 this.send(PubRel(packet.packetId)); //send directly to avoid lock on filled sendQueue 849 850 if (found) 851 { 852 version(MqttDebug) logDebug("MQTT Received PUBREC - %s", packet); 853 _session.inflightQueue[idx].state = PacketState.waitForPubcomp; 854 _session.inflightQueue.emit(); 855 } 856 else logWarn("MQTT Received PUBREC with unknown ID - %s", packet); 857 858 if (_settings.onPubRec) _settings.onPubRec(this, packet); 859 } 860 861 /// Confirmation that message was succesfully delivered (Sender side) 862 void onPubComp(PubComp packet) 863 { 864 size_t idx; 865 immutable found = _session.inflightQueue.canFind(packet.packetId, idx, PacketState.waitForPubcomp); 866 867 if (found) 868 { 869 version(MqttDebug) logDebug("MQTT Received PUBCOMP - %s", packet); 870 //treat the PUBREL packet as “unacknowledged” until it has received the corresponding PUBCOMP packet from the receiver. 871 _session.inflightQueue.removeAt(idx); 872 } 873 else logWarn("MQTT Received PUBCOMP with unknown ID - %s", packet); 874 875 if (_settings.onPubComp) _settings.onPubComp(this, packet); 876 } 877 878 void onPubRel(PubRel packet) 879 { 880 size_t idx; 881 immutable found = _session.inflightQueue.canFind(packet.packetId, idx, PacketState.waitForPubrel); 882 883 if (found) 884 { 885 version(MqttDebug) logDebug("MQTT Received PUBREL - %s", packet); 886 _session.inflightQueue.removeAt(idx); 887 } 888 else logWarn("MQTT Received PUBREL with unknown ID - %s", packet); 889 890 //MUST respond to a PUBREL packet by sending a PUBCOMP packet containing the same Packet Identifier as the PUBREL. 
891 this.send(PubComp(packet.packetId)); //send directly to avoid lock on filled sendQueue 892 893 if (_settings.onPubRel) _settings.onPubRel(this, packet); 894 } 895 896 /// Message was received from broker 897 void onPublish(Publish packet) 898 { 899 version(MqttDebug) logDebug("MQTT Received PUBLISH - %s", packet); 900 901 if (packet.header.qos == QoSLevel.QoS1) 902 { 903 //MUST respond with a PUBACK Packet containing the Packet Identifier from the incoming PUBLISH Packet 904 this.send(PubAck(packet.packetId)); 905 } 906 else if (packet.header.qos == QoSLevel.QoS2) 907 { 908 //MUST respond with a PUBREC containing the Packet Identifier from the incoming PUBLISH Packet, having accepted ownership of the Application Message. 909 this.send(PubRec(packet.packetId)); 910 _session.inflightQueue.add(packet, PacketState.waitForPubrel, PacketOrigin.broker); 911 } 912 913 if (_settings.onPublish) _settings.onPublish(this, packet); 914 } 915 916 /// Message was succesfully delivered to broker 917 void onSubAck(SubAck packet) 918 { 919 if (packet.packetId == _subId) 920 { 921 assert(_subId != 0); 922 version(MqttDebug) logDebug("MQTT Received SUBACK - %s", packet); 923 _subAckTimer.stop(); 924 PacketIdGenerator.get.setUnused(_subId); 925 _subId = 0; 926 } 927 else logWarn("MQTT Received SUBACK with unknown ID - %s", packet); 928 929 if (_settings.onSubAck) _settings.onSubAck(this, packet); 930 } 931 932 /// Confirmation that unsubscribe request was successfully delivered to broker 933 void onUnsubAck(UnsubAck packet) 934 { 935 if (packet.packetId == _unsubId) 936 { 937 assert(_unsubId != 0); 938 version(MqttDebug) logDebug("MQTT Received UNSUBACK - %s", packet); 939 _unsubAckTimer.stop(); 940 PacketIdGenerator.get.setUnused(_unsubId); 941 _unsubId = 0; 942 } 943 else logWarn("MQTT Received UNSUBACK with unknown ID - %s", packet); 944 945 if (_settings.onUnsubAck) _settings.onUnsubAck(this, packet); 946 } 947 948 /// Client was disconnected from broker 949 void onDisconnect() 
nothrow
    {
        version (MqttDebug) logDebug("MQTT onDisconnect, connected: %s", this.connected);
        if (_settings.onDisconnect)
        {
            // user callback must not break the nothrow contract - log and continue
            try _settings.onDisconnect(this);
            catch (Exception ex) logError("Error calling onDisconnect: %s", ex.msg);
        }
    }

private:
    Settings _settings;
    TCPConnection _con;
    Stream _stream;
    Session _session;
    Task _listener, _dispatcher; // reset to Task.init by listener()/dispatcher() when they exit
    Serializer!(Appender!(ubyte[])) _sendBuffer; // reused by send() for every outgoing packet
    FixedRingBuffer!ubyte _readBuffer; // received bytes not yet consumed as a whole packet
    ubyte[] _packetBuffer; // holds the single packet currently being deserialized
    bool _disconnecting; // guards disconnectImpl() against re-entrant calls
    Timer _conAckTimer, _subAckTimer, _unsubAckTimer, _pingReqTimer, _pingRespTimer, _reconnectTimer;
    ushort _subId, _unsubId;
    RecursiveTaskMutex _readMutex, _writeMutex;

final:

    /**
     * Tears down the connection state and notifies via onDisconnect().
     *
     * Closes the TCP connection, wakes the dispatcher through both session queues,
     * and stops the ping and ConnAck timers. If `_disconnecting` is already set,
     * only the (optional) reconnect timer stop is performed and the call returns.
     *
     * Params:
     *      force = when true the reconnect timer is stopped and no reconnect is scheduled;
     *              when false and `_settings.reconnect` is set, a reconnect timer is started
     */
    void disconnectImpl(bool force) nothrow
    {
        // we need to stop reconnect timer even if we are already disconnected
        if (force) stopTimer(_reconnectTimer);

        if (_disconnecting) return; // already in process of disconnecting (additional calls caused by cleanups)
        _disconnecting = true;
        scope (exit) onDisconnect(); // call onDisconnect after cleanup

        version (MqttDebug) logDebug("MQTT disconnectImpl, connected: %s, force: %s", this.connected, force);

        // cleanup connection
        if (_con)
        {
            _con.close();
            _con = TCPConnection.init;
        }

        // terminate dispatcher - it waits on these queues, so wake it up to let it see the disconnect
        _session.inflightQueue.emit();
        _session.sendQueue.emit();

        // stop ping timers
        stopTimer(_pingReqTimer);
        stopTimer(_pingRespTimer);

        // stop ConAck timer (if running)
        stopTimer(_conAckTimer);

        // set reconnect if not forced to disconnect
        if (!force && _settings.reconnect)
        {
            version (MqttDebug) logDebug("MQTT setting reconnect timer");

            auto recon = () @safe nothrow
            {
                logDiagnostic("MQTT reconnecting");
                this.connect();
            };

            // NOTE(review): last argument is presumably "periodic" (one-shot timer) - confirm against vibe.core
            _reconnectTimer = setTimer(_settings.reconnect, recon, false);
        }
    }

    /// nothrow wrapper to ensure timer is stopped; does nothing for an unset or non-pending timer
    static void stopTimer(ref Timer timer) nothrow
    {
        if (timer && timer.pending) timer.stop();
    }

    /**
     * Processes data in read buffer. If a whole packet is present, it delegates it to its handler.
     *
     * The received bytes are appended to `_readBuffer` (which is grown when needed). Complete
     * packets are then extracted one by one and dispatched by packet type. Processing stops
     * when the buffer holds only a partial packet or the client is no longer connected.
     *
     * Params:
     *      data = bytes just read from the stream
     *
     * Throws:
     *      PacketFormatException when the remaining length field is longer than 4 bytes,
     *      Exception when a packet type that a client must never receive arrives
     */
    void proccessData(in ubyte[] data)
    {
        import mqttd.serialization;
        import std.range;

        // MQTT encodes the remaining length in at most 4 bytes (7 data bits + continuation bit each)
        enum maxLengthBytes = 4;

        version(MqttDebug) logTrace("MQTT IN: 0x%(%02x%)", data);

        if (_readBuffer.freeSpace < data.length) // ensure all fits to the buffer
            _readBuffer.capacity = _readBuffer.capacity + data.length;
        _readBuffer.put(data);

        while (_readBuffer.length > 0 && this.connected) // break message processing when disconnected
        {
            // try read packet header
            FixedHeader header = _readBuffer[0]; // type + flags

            // try read remaining length
            uint pos;
            uint multiplier = 1;
            ubyte digit;
            do
            {
                if (++pos >= _readBuffer.length) return; // not enough data
                digit = _readBuffer[pos];

                // A continuation bit on the 4th length byte would require a 5th one, which is invalid.
                // Checking it here (instead of testing the multiplier after it was scaled) keeps
                // valid 4-byte lengths accepted and rejects malformed input without waiting for more data.
                if (pos == maxLengthBytes && (digit & 128) != 0)
                    throw new PacketFormatException("Malformed remaining length");

                header.length += ((digit & 127) * multiplier);
                multiplier *= 128;
            } while ((digit & 128) != 0);

            if (_readBuffer.length < header.length + pos + 1) return; // not enough data

            // we've got the whole packet to handle
            _packetBuffer.length = 1 + pos + header.length; // packet type byte + remaining size bytes + remaining size
            _readBuffer.read(_packetBuffer); // read whole packet from read buffer

            with (PacketType)
            {
                final switch (header.type)
                {
                    case CONNACK:
                        onConnAck(_packetBuffer.deserialize!ConnAck());
                        break;
                    case PINGRESP:
                        onPingResp(_packetBuffer.deserialize!PingResp());
                        break;
                    case PUBACK:
                        onPubAck(_packetBuffer.deserialize!PubAck());
                        break;
                    case PUBREC:
                        onPubRec(_packetBuffer.deserialize!PubRec());
                        break;
                    case PUBREL:
                        onPubRel(_packetBuffer.deserialize!PubRel());
                        break;
                    case PUBCOMP:
                        onPubComp(_packetBuffer.deserialize!PubComp());
                        break;
                    case PUBLISH:
                        onPublish(_packetBuffer.deserialize!Publish());
                        break;
                    case SUBACK:
                        onSubAck(_packetBuffer.deserialize!SubAck());
                        break;
                    case UNSUBACK:
                        onUnsubAck(_packetBuffer.deserialize!UnsubAck());
                        break;
                    // packet types that are not handled on the receiving side of this client
                    case CONNECT:
                    case SUBSCRIBE:
                    case UNSUBSCRIBE:
                    case PINGREQ:
                    case DISCONNECT:
                    case RESERVED1:
                    case RESERVED2:
                        throw new Exception(format("Unexpected packet type '%s'", header.type));
                }
            }
        }
    }

    /**
     * Loop to receive packets.
     *
     * While connected, reads up to 4096 bytes at a time from the stream (under `_readMutex`)
     * and hands them to proccessData(). When the loop ends normally, disconnectImpl(false)
     * is called. On exit the listener task handle is cleared and, if the dispatcher is
     * already gone, the stream is finalized.
     */
    void listener()
    {
        version (MqttDebug)
        {
            () @trusted { logDebug("MQTT Entering listening loop - TID:%s", thisTid); }();
            scope (exit) logDebug("MQTT Exiting listening loop");
        }

        scope (exit)
        {
            _listener = Task.init;
            // whichever of listener/dispatcher exits last finalizes the shared stream
            if (!_dispatcher && _stream) { _stream.finalize(); _stream = null; }
        }

        auto buffer = new ubyte[4096];

        size_t size;
        while (this.connected)
        {
            {
                // the lock is held only for the read itself, not while packets are being handled
                auto lock = scopedMutexLock(_readMutex);
                if (_stream.empty) break;
                size = cast(size_t) _stream.leastSize;
                if (size == 0) break;
                if (size > buffer.length) size = buffer.length;
                _stream.read(buffer[0..size]);
            }
            proccessData(buffer[0..size]);
        }

        disconnectImpl(false);
    }

    /**
     * Loop to dispatch packets stored in the session.
     *
     * Waits for a change of the send queue and, once no ConnAck is pending, sends the queued
     * packets in order. QoS1 and QoS2 packets are moved to the inflight queue with the state
     * of the acknowledgement they wait for. Sending pauses while the inflight queue is full.
     * When the loop ends, disconnectImpl(false) is called. On exit the dispatcher task handle
     * is cleared and, if the listener is already gone, the stream is finalized.
     */
    void dispatcher()
    {
        version (MqttDebug)
        {
            () @trusted { logDebug("MQTT Entering dispatch loop - TID:%s", thisTid); }();
            scope (exit) logDebug("MQTT Exiting dispatch loop");
        }

        scope (exit)
        {
            _dispatcher = Task.init;
            // whichever of listener/dispatcher exits last finalizes the shared stream
            if (!_listener && _stream) { _stream.finalize(); _stream = null; }
        }

        while (this.connected)
        {
            // wait for session state change
            _session.sendQueue.wait();

            if (!this.connected) break;
            if (_conAckTimer.pending) continue; //wait for ConAck before sending any messages

            while (_session.sendQueue.length)
            {
                // wait for space in inflight queue
                while (_session.inflightQueue.full)
                {
                    version (MqttDebug) logDebug("MQTT InflightQueue full, wait before sending next message");
                    _session.inflightQueue.wait();
                    if (!this.connected) goto dispatcherFin; // we can be disconnected in between
                }

                version (MqttDebug) logDebugV("MQTT Packets in session: send=%s, wait=%s", _session.sendQueue.length, _session.inflightQueue.length);
                auto ctx = _session.sendQueue.front;
                final switch (ctx.state)
                {
                    // QoS0 handling - S:Publish, S:forget
                    case PacketState.queuedQos0: // just send it
                        //Sender request QoS0
                        assert(ctx.origin == PacketOrigin.client);
                        this.send(ctx.message);
                        break;

                    // QoS1 handling - S:Publish, R:PubAck
                    case PacketState.queuedQos1:
                        //Sender request QoS1
                        //treat the Packet as “unacknowledged” until the corresponding PUBACK packet received
                        assert(ctx.header.qos == QoSLevel.QoS1);
                        assert(ctx.origin == PacketOrigin.client);
                        this.send(ctx.message);
                        ctx.state = PacketState.waitForPuback;
                        _session.inflightQueue.add(ctx);
                        break;

                    // QoS2 handling - S:Publish, R: PubRec, S: PubRel, R: PubComp
                    case PacketState.queuedQos2:
                        //Sender request QoS2
                        //treat the PUBLISH packet as “unacknowledged” until it has received the corresponding PUBREC packet from the receiver.
                        assert(ctx.header.qos == QoSLevel.QoS2);
                        assert(ctx.origin == PacketOrigin.client);
                        this.send(ctx.message);
                        ctx.state = PacketState.waitForPubrec;
                        _session.inflightQueue.add(ctx);
                        break;

                    // these states are only handled here as invalid for a packet in the send queue
                    case PacketState.waitForPuback:
                    case PacketState.waitForPubrec:
                    case PacketState.waitForPubcomp:
                    case PacketState.waitForPubrel:
                        assert(0, "Invalid state");
                }

                //remove from sendQueue
                _session.sendQueue.popFront;
            }
        }

    dispatcherFin:
        disconnectImpl(false);
    }

    /**
     * Serializes the packet into the shared send buffer and writes it to the stream.
     *
     * The packet is always serialized, but it is written (under `_writeMutex`) only when the
     * client is connected. A failed write triggers disconnectImpl(false), except when the
     * packet being sent is a Disconnect.
     *
     * Params:
     *      msg = MQTT packet to send
     *
     * Returns: true when the packet was written to the stream, false when the client
     *          is not connected or the write threw
     */
    auto send(T)(auto ref T msg) nothrow if (isMqttPacket!T)
    {
        static if (is (T == Publish))
        {
            version (MqttDebug)
            {
                // debug only statistics: log the rate of published messages about once per second
                static SysTime last;
                static size_t messages;

                try
                {
                    if (last == SysTime.init) last = Clock.currTime;
                    messages++;

                    auto diff = Clock.currTime - last;
                    if (diff.total!"msecs" >= 1_000)
                    {
                        logDiagnostic("MQTT %s messages/s", cast(double)(1_000 * messages)/diff.total!"msecs");
                        messages = 0;
                        last = Clock.currTime;
                    }
                }
                catch (Exception) {}
            }
        }

        _sendBuffer.clear(); // clear to write new
        // a serialization failure is treated as a programming error
        try _sendBuffer.serialize(msg); catch (Exception ex) { assert(false, ex.msg); }

        if (this.connected)
        {
            version(MqttDebug)
            {
                logDebug("MQTT OUT: %s", msg);
                logTrace("MQTT OUT: 0x%(%02x%)", _sendBuffer.data);
            }
            try
            {
                auto lock = scopedMutexLock(_writeMutex);
                _stream.write(_sendBuffer.data);
                return true;
            }
            catch (Exception)
            {
                // no disconnect cleanup is triggered from here when the failed packet is a Disconnect
                static if (!is(T == Disconnect)) this.disconnectImpl(false);
                return false;
            }
        }
        else return false;
    }
}