1 /**
2  * A wrapper around streams, which do not actually implement the `Stream` interface
3  *
4  * Author:
5  * Tomáš Chaloupka <chalucha@gmail.com>
6  *
7  * License:
8  * Boost Software License 1.0 (BSL-1.0)
9  *
10  * Permission is hereby granted, free of charge, to any person or organization obtaining a copy
11  * of the software and accompanying documentation covered by this license (the "Software") to use,
12  * reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative
13  * works of the Software, and to permit third-parties to whom the Software is furnished to do so,
14  * all subject to the following:
15  *
16  * The copyright notices in the Software and this entire statement, including the above license
17  * grant, this restriction and the following disclaimer, must be included in all copies of the Software,
18  * in whole or in part, and all derivative works of the Software, unless such copies or derivative works
19  * are solely in the form of machine-executable object code generated by a source language processor.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
22  * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23  * PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE
24  * DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT,
25  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26  * OTHER DEALINGS IN THE SOFTWARE.
27  */
28 
29 module mqttd.stream_wrapper;
30 
31 version (Have_vibe_core) import vibe.core.stream : blocking, IOMode, isStream, Stream;
32 else
33 {
34 	import vibe.core.stream : IOMode, isStream, Stream;
35 
36 	// workaround for the original vibe-d:core which doesn't have the blocking UDA defined
37 	// See https://github.com/vibe-d/vibe-core/blob/6ca6dd16d4f89c1a8d3bb89fa58d56cf466d70e6/source/vibe/core/stream.d#L79 for details
38 	struct blocking {}
39 }
40 
41 import vibe.core.net;
42 
43 
/**
 * A wrapper around stream types that satisfy `isStream!T` but do not actually
 * implement the `Stream` interface (e.g. `TCPConnection`).
 *
 * Every member simply forwards to the wrapped stream, which lets such a value
 * be passed wherever a `Stream` object is expected.
 *
 * Params:
 *   T = type of the wrapped stream; must satisfy `isStream!T`
 */
class StreamWrapper(T) : Stream if (isStream!T)
{
private:
	/// The wrapped stream; reset to `T.init` by `finalize`.
	T _stream;
public:

@safe:
	@disable this();

	/**
	 * Creates a wrapper that forwards all calls to `stream`.
	 *
	 * Params:
	 *   stream = the stream to wrap
	 */
	this(T stream)
	{
		this._stream = stream;
	}

	/// Forwards to the wrapped stream's `empty`.
	@property bool empty() @blocking
	{
		return _stream.empty();
	}

	/// Forwards to the wrapped stream's `leastSize`.
	@property ulong leastSize() @blocking
	{
		return _stream.leastSize();
	}

	/// Forwards to the wrapped stream's `dataAvailableForRead`.
	@property bool dataAvailableForRead()
	{
		return _stream.dataAvailableForRead();
	}

	/// Forwards to the wrapped stream's `peek`.
	const(ubyte)[] peek()
	{
		return _stream.peek();
	}

	/**
	 * Forwards to the wrapped stream's `read`.
	 *
	 * Params:
	 *   dst = buffer to read into
	 *   mode = I/O mode passed through to the wrapped stream
	 *
	 * Returns: whatever the wrapped stream's `read` returns.
	 */
	size_t read(scope ubyte[] dst, IOMode mode) @blocking
	{
		return _stream.read(dst, mode);
	}

	// The overload above would otherwise hide the convenience overloads that
	// the `Stream` interface declares under the same name; pull them back into
	// this class's overload set so they stay callable on a `StreamWrapper`.
	alias read = Stream.read;

	/**
	 * Forwards to the wrapped stream's `write`.
	 *
	 * Params:
	 *   bytes = data to write
	 *   mode = I/O mode passed through to the wrapped stream
	 *
	 * Returns: whatever the wrapped stream's `write` returns.
	 */
	size_t write(in ubyte[] bytes, IOMode mode) @blocking
	{
		return _stream.write(bytes, mode);
	}

	// Same reasoning as for `read`: keep the interface's other `write`
	// overloads visible through the wrapper type.
	alias write = Stream.write;

	/// Forwards to the wrapped stream's `flush`.
	void flush() @blocking
	{
		_stream.flush();
	}

	/**
	 * Finalizes the wrapped stream and then drops it.
	 *
	 * After this call the wrapper holds `T.init` instead of the original
	 * stream.
	 */
	void finalize() @blocking
	{
		_stream.finalize();
		_stream = T.init; // make sure we don't keep any reference to the wrapped connection
	}
}