Node.js  v8.x
Node.js is a JavaScript runtime built on Chrome's V8 JavaScript engine
stream_base.h
Go to the documentation of this file.
1 #ifndef SRC_STREAM_BASE_H_
2 #define SRC_STREAM_BASE_H_
3 
4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5 
6 #include "env.h"
7 #include "async-wrap.h"
8 #include "req-wrap.h"
9 #include "req-wrap-inl.h"
10 #include "node.h"
11 #include "util.h"
12 
13 #include "v8.h"
14 
15 namespace node {
16 
17 // Forward declarations
18 class StreamBase;
19 
20 template <class Req>
21 class StreamReq {
22  public:
23  typedef void (*DoneCb)(Req* req, int status);
24 
25  explicit StreamReq(DoneCb cb) : cb_(cb) {
26  }
27 
28  inline void Done(int status, const char* error_str = nullptr) {
29  Req* req = static_cast<Req*>(this);
30  Environment* env = req->env();
31  if (error_str != nullptr) {
32  req->object()->Set(env->error_string(),
33  OneByteString(env->isolate(), error_str));
34  }
35 
36  cb_(req, status);
37  }
38 
39  private:
40  DoneCb cb_;
41 };
42 
43 class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
44  public StreamReq<ShutdownWrap> {
45  public:
46  ShutdownWrap(Environment* env,
47  v8::Local<v8::Object> req_wrap_obj,
48  StreamBase* wrap,
49  DoneCb cb)
50  : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
51  StreamReq<ShutdownWrap>(cb),
52  wrap_(wrap) {
53  Wrap(req_wrap_obj, this);
54  }
55 
56  ~ShutdownWrap() {
57  ClearWrap(object());
58  }
59 
60  static ShutdownWrap* from_req(uv_shutdown_t* req) {
61  return ContainerOf(&ShutdownWrap::req_, req);
62  }
63 
64  inline StreamBase* wrap() const { return wrap_; }
65  size_t self_size() const override { return sizeof(*this); }
66 
67  private:
68  StreamBase* const wrap_;
69 };
70 
71 class WriteWrap: public ReqWrap<uv_write_t>,
72  public StreamReq<WriteWrap> {
73  public:
74  static inline WriteWrap* New(Environment* env,
75  v8::Local<v8::Object> obj,
76  StreamBase* wrap,
77  DoneCb cb,
78  size_t extra = 0);
79  inline void Dispose();
80  inline char* Extra(size_t offset = 0);
81 
82  inline StreamBase* wrap() const { return wrap_; }
83 
84  size_t self_size() const override { return storage_size_; }
85 
86  static WriteWrap* from_req(uv_write_t* req) {
87  return ContainerOf(&WriteWrap::req_, req);
88  }
89 
90  static const size_t kAlignSize = 16;
91 
92  WriteWrap(Environment* env,
93  v8::Local<v8::Object> obj,
94  StreamBase* wrap,
95  DoneCb cb)
96  : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
97  StreamReq<WriteWrap>(cb),
98  wrap_(wrap),
99  storage_size_(0) {
100  Wrap(obj, this);
101  }
102 
103  protected:
104  WriteWrap(Environment* env,
105  v8::Local<v8::Object> obj,
106  StreamBase* wrap,
107  DoneCb cb,
108  size_t storage_size)
109  : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
110  StreamReq<WriteWrap>(cb),
111  wrap_(wrap),
112  storage_size_(storage_size) {
113  Wrap(obj, this);
114  }
115 
116  ~WriteWrap() {
117  ClearWrap(object());
118  }
119 
120  void* operator new(size_t size) = delete;
121  void* operator new(size_t size, char* storage) { return storage; }
122 
123  // This is just to keep the compiler happy. It should never be called, since
124  // we don't use exceptions in node.
125  void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
126 
127  private:
128  // People should not be using the non-placement new and delete operator on a
129  // WriteWrap. Ensure this never happens.
130  void operator delete(void* ptr) { UNREACHABLE(); }
131 
132  StreamBase* const wrap_;
133  const size_t storage_size_;
134 };
135 
136 class StreamResource {
137  public:
138  template <class T>
139  struct Callback {
140  Callback() : fn(nullptr), ctx(nullptr) {}
141  Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
142  Callback(const Callback&) = default;
143 
144  inline bool is_empty() { return fn == nullptr; }
145  inline void clear() {
146  fn = nullptr;
147  ctx = nullptr;
148  }
149 
150  T fn;
151  void* ctx;
152  };
153 
154  typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
155  typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
156  typedef void (*ReadCb)(ssize_t nread,
157  const uv_buf_t* buf,
158  uv_handle_type pending,
159  void* ctx);
160  typedef void (*DestructCb)(void* ctx);
161 
162  StreamResource() : bytes_read_(0) {
163  }
164  virtual ~StreamResource() {
165  if (!destruct_cb_.is_empty())
166  destruct_cb_.fn(destruct_cb_.ctx);
167  }
168 
169  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
170  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
171  virtual int DoWrite(WriteWrap* w,
172  uv_buf_t* bufs,
173  size_t count,
174  uv_stream_t* send_handle) = 0;
175  virtual const char* Error() const;
176  virtual void ClearError();
177 
178  // Events
179  inline void OnAfterWrite(WriteWrap* w) {
180  if (!after_write_cb_.is_empty())
181  after_write_cb_.fn(w, after_write_cb_.ctx);
182  }
183 
184  inline void OnAlloc(size_t size, uv_buf_t* buf) {
185  if (!alloc_cb_.is_empty())
186  alloc_cb_.fn(size, buf, alloc_cb_.ctx);
187  }
188 
189  inline void OnRead(ssize_t nread,
190  const uv_buf_t* buf,
191  uv_handle_type pending = UV_UNKNOWN_HANDLE) {
192  if (nread > 0)
193  bytes_read_ += static_cast<uint64_t>(nread);
194  if (!read_cb_.is_empty())
195  read_cb_.fn(nread, buf, pending, read_cb_.ctx);
196  }
197 
198  inline void set_after_write_cb(Callback<AfterWriteCb> c) {
199  after_write_cb_ = c;
200  }
201 
202  inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
203  inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
204  inline void set_destruct_cb(Callback<DestructCb> c) { destruct_cb_ = c; }
205 
206  inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
207  inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
208  inline Callback<ReadCb> read_cb() { return read_cb_; }
209  inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }
210 
211  private:
212  Callback<AfterWriteCb> after_write_cb_;
213  Callback<AllocCb> alloc_cb_;
214  Callback<ReadCb> read_cb_;
215  Callback<DestructCb> destruct_cb_;
216  uint64_t bytes_read_;
217 
218  friend class StreamBase;
219 };
220 
221 class StreamBase : public StreamResource {
222  public:
223  enum Flags {
224  kFlagNone = 0x0,
225  kFlagHasWritev = 0x1,
226  kFlagNoShutdown = 0x2
227  };
228 
229  template <class Base>
230  static inline void AddMethods(Environment* env,
231  v8::Local<v8::FunctionTemplate> target,
232  int flags = kFlagNone);
233 
234  virtual void* Cast() = 0;
235  virtual bool IsAlive() = 0;
236  virtual bool IsClosing() = 0;
237  virtual bool IsIPCPipe();
238  virtual int GetFD();
239 
240  virtual int ReadStart() = 0;
241  virtual int ReadStop() = 0;
242 
243  inline void Consume() {
244  CHECK_EQ(consumed_, false);
245  consumed_ = true;
246  }
247 
248  inline void Unconsume() {
249  CHECK_EQ(consumed_, true);
250  consumed_ = false;
251  }
252 
253  template <class Outer>
254  inline Outer* Cast() { return static_cast<Outer*>(Cast()); }
255 
256  void EmitData(ssize_t nread,
257  v8::Local<v8::Object> buf,
258  v8::Local<v8::Object> handle);
259 
260  protected:
261  explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
262  }
263 
264  virtual ~StreamBase() = default;
265 
266  // One of these must be implemented
267  virtual AsyncWrap* GetAsyncWrap() = 0;
268  virtual v8::Local<v8::Object> GetObject();
269 
270  // Libuv callbacks
271  static void AfterShutdown(ShutdownWrap* req, int status);
272  static void AfterWrite(WriteWrap* req, int status);
273 
274  // JS Methods
275  int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
276  int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
277  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
278  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
279  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
280  template <enum encoding enc>
281  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
282 
283  template <class Base>
284  static void GetFD(v8::Local<v8::String> key,
285  const v8::PropertyCallbackInfo<v8::Value>& args);
286 
287  template <class Base>
288  static void GetExternal(v8::Local<v8::String> key,
289  const v8::PropertyCallbackInfo<v8::Value>& args);
290 
291  template <class Base>
292  static void GetBytesRead(v8::Local<v8::String> key,
293  const v8::PropertyCallbackInfo<v8::Value>& args);
294 
295  template <class Base,
296  int (StreamBase::*Method)(
297  const v8::FunctionCallbackInfo<v8::Value>& args)>
298  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
299 
300  private:
301  Environment* env_;
302  bool consumed_;
303 };
304 
305 } // namespace node
306 
307 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
308 
309 #endif // SRC_STREAM_BASE_H_
unsigned char * buf
Definition: cares_wrap.cc:483
QueryWrap * wrap
Definition: cares_wrap.cc:478
Environment *const env_
int status
Definition: cares_wrap.cc:479
uv_fs_t req
Definition: node_file.cc:374
MaybeLocal< Object > New(Isolate *isolate, Local< String > string, enum encoding enc)
Definition: node_buffer.cc:241
this ctx
Definition: v8ustack.d:369