1 /** 2 * A wrapper around streams, which do not actually implement the `Stream` interface 3 * 4 * Author: 5 * Tomáš Chaloupka <chalucha@gmail.com> 6 * 7 * License: 8 * Boost Software License 1.0 (BSL-1.0) 9 * 10 * Permission is hereby granted, free of charge, to any person or organization obtaining a copy 11 * of the software and accompanying documentation covered by this license (the "Software") to use, 12 * reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative 13 * works of the Software, and to permit third-parties to whom the Software is furnished to do so, 14 * all subject to the following: 15 * 16 * The copyright notices in the Software and this entire statement, including the above license 17 * grant, this restriction and the following disclaimer, must be included in all copies of the Software, 18 * in whole or in part, and all derivative works of the Software, unless such copies or derivative works 19 * are solely in the form of machine-executable object code generated by a source language processor. 20 * 21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, 22 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 23 * PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE 24 * DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, 25 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 26 * OTHER DEALINGS IN THE SOFTWARE. 27 */ 28 29 module mqttd.stream_wrapper; 30 31 version (Have_vibe_core) import vibe.core.stream : blocking, IOMode, isStream, Stream; 32 else 33 { 34 import vibe.core.stream : IOMode, isStream, Stream; 35 36 // workaround for the original vibe-d:core which doesn't have the blocking UDA defined 37 // See https://github.com/vibe-d/vibe-core/blob/6ca6dd16d4f89c1a8d3bb89fa58d56cf466d70e6/source/vibe/core/stream.d#L79 for details 38 struct blocking {} 39 } 40 41 import vibe.core.net; 42 43 44 /** 45 * A wrapper around streams, which do not actually implement the `Stream` 46 * interface (e.g. `TCPConnection`). 47 */ 48 class StreamWrapper(T) : Stream if (isStream!T) 49 { 50 private: 51 T _stream; 52 public: 53 54 @safe: 55 @disable this(); 56 57 this(T stream) 58 { 59 this._stream = stream; 60 } 61 62 @property bool empty() @blocking 63 { 64 return _stream.empty(); 65 } 66 67 @property ulong leastSize() @blocking 68 { 69 return _stream.leastSize(); 70 } 71 72 @property bool dataAvailableForRead() 73 { 74 return _stream.dataAvailableForRead(); 75 } 76 77 const(ubyte)[] peek() 78 { 79 return _stream.peek(); 80 } 81 82 size_t read(scope ubyte[] dst, IOMode mode) @blocking 83 { 84 return _stream.read(dst, mode); 85 } 86 87 size_t write(in ubyte[] bytes, IOMode mode) @blocking 88 { 89 return _stream.write(bytes, mode); 90 } 91 92 void flush() @blocking 93 { 94 return _stream.flush(); 95 } 96 97 void finalize() @blocking 98 { 99 _stream.finalize(); 100 _stream = T.init; // make sure that we don't hold any reference on wrapped connection 101 } 102 }