Node.js  v8.x
Node.js is a JavaScript runtime built on Chrome's V8 JavaScript engine
stream_wrap.cc
Go to the documentation of this file.
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21 
22 #include "stream_wrap.h"
23 #include "stream_base.h"
24 #include "stream_base-inl.h"
25 
26 #include "env-inl.h"
27 #include "env.h"
28 #include "handle_wrap.h"
29 #include "node_buffer.h"
30 #include "node_counters.h"
31 #include "pipe_wrap.h"
32 #include "req-wrap.h"
33 #include "req-wrap-inl.h"
34 #include "tcp_wrap.h"
35 #include "udp_wrap.h"
36 #include "util.h"
37 #include "util-inl.h"
38 
39 #include <stdlib.h> // abort()
40 #include <string.h> // memcpy()
41 #include <limits.h> // INT_MAX
42 
43 
44 namespace node {
45 
46 using v8::Context;
47 using v8::EscapableHandleScope;
48 using v8::FunctionCallbackInfo;
49 using v8::FunctionTemplate;
50 using v8::HandleScope;
51 using v8::Integer;
52 using v8::Local;
53 using v8::Object;
54 using v8::Value;
55 
56 
57 void StreamWrap::Initialize(Local<Object> target,
58  Local<Value> unused,
59  Local<Context> context) {
60  Environment* env = Environment::GetCurrent(context);
61 
62  auto is_construct_call_callback =
63  [](const FunctionCallbackInfo<Value>& args) {
64  CHECK(args.IsConstructCall());
65  ClearWrap(args.This());
66  };
67  Local<FunctionTemplate> sw =
68  FunctionTemplate::New(env->isolate(), is_construct_call_callback);
69  sw->InstanceTemplate()->SetInternalFieldCount(1);
70  Local<String> wrapString =
71  FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap");
72  sw->SetClassName(wrapString);
73  AsyncWrap::AddWrapMethods(env, sw);
74  target->Set(wrapString, sw->GetFunction());
75 
76  Local<FunctionTemplate> ww =
77  FunctionTemplate::New(env->isolate(), is_construct_call_callback);
78  ww->InstanceTemplate()->SetInternalFieldCount(1);
79  Local<String> writeWrapString =
80  FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
81  ww->SetClassName(writeWrapString);
82  AsyncWrap::AddWrapMethods(env, ww);
83  target->Set(writeWrapString, ww->GetFunction());
84  env->set_write_wrap_constructor_function(ww->GetFunction());
85 }
86 
87 
88 StreamWrap::StreamWrap(Environment* env,
89  Local<Object> object,
90  uv_stream_t* stream,
91  AsyncWrap::ProviderType provider)
92  : HandleWrap(env,
93  object,
94  reinterpret_cast<uv_handle_t*>(stream),
95  provider),
96  StreamBase(env),
97  stream_(stream) {
98  set_after_write_cb({ OnAfterWriteImpl, this });
99  set_alloc_cb({ OnAllocImpl, this });
100  set_read_cb({ OnReadImpl, this });
101 }
102 
103 
104 void StreamWrap::AddMethods(Environment* env,
105  v8::Local<v8::FunctionTemplate> target,
106  int flags) {
107  env->SetProtoMethod(target, "setBlocking", SetBlocking);
108  StreamBase::AddMethods<StreamWrap>(env, target, flags);
109 }
110 
111 
112 int StreamWrap::GetFD() {
113  int fd = -1;
114 #if !defined(_WIN32)
115  if (stream() != nullptr)
116  uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
117 #endif
118  return fd;
119 }
120 
121 
122 bool StreamWrap::IsAlive() {
123  return HandleWrap::IsAlive(this);
124 }
125 
126 
127 bool StreamWrap::IsClosing() {
128  return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
129 }
130 
131 
132 void* StreamWrap::Cast() {
133  return reinterpret_cast<void*>(this);
134 }
135 
136 
137 AsyncWrap* StreamWrap::GetAsyncWrap() {
138  return static_cast<AsyncWrap*>(this);
139 }
140 
141 
142 bool StreamWrap::IsIPCPipe() {
143  return is_named_pipe_ipc();
144 }
145 
146 
147 void StreamWrap::UpdateWriteQueueSize() {
148  HandleScope scope(env()->isolate());
149  Local<Integer> write_queue_size =
150  Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
151  object()->Set(env()->write_queue_size_string(), write_queue_size);
152 }
153 
154 
155 int StreamWrap::ReadStart() {
156  return uv_read_start(stream(), OnAlloc, OnRead);
157 }
158 
159 
160 int StreamWrap::ReadStop() {
161  return uv_read_stop(stream());
162 }
163 
164 
165 void StreamWrap::OnAlloc(uv_handle_t* handle,
166  size_t suggested_size,
167  uv_buf_t* buf) {
168  StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
169  HandleScope scope(wrap->env()->isolate());
170  Context::Scope context_scope(wrap->env()->context());
171 
172  CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
173 
174  return static_cast<StreamBase*>(wrap)->OnAlloc(suggested_size, buf);
175 }
176 
177 
178 void StreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
179  buf->base = node::Malloc(size);
180  buf->len = size;
181 }
182 
183 
184 template <class WrapType, class UVType>
185 static Local<Object> AcceptHandle(Environment* env, StreamWrap* parent) {
186  EscapableHandleScope scope(env->isolate());
187  Local<Object> wrap_obj;
188  UVType* handle;
189 
190  wrap_obj = WrapType::Instantiate(env, parent);
191  if (wrap_obj.IsEmpty())
192  return Local<Object>();
193 
194  WrapType* wrap;
195  ASSIGN_OR_RETURN_UNWRAP(&wrap, wrap_obj, Local<Object>());
196  handle = wrap->UVHandle();
197 
198  if (uv_accept(parent->stream(), reinterpret_cast<uv_stream_t*>(handle)))
199  ABORT();
200 
201  return scope.Escape(wrap_obj);
202 }
203 
204 
205 void StreamWrap::OnReadImpl(ssize_t nread,
206  const uv_buf_t* buf,
207  uv_handle_type pending,
208  void* ctx) {
209  StreamWrap* wrap = static_cast<StreamWrap*>(ctx);
210  Environment* env = wrap->env();
211  HandleScope handle_scope(env->isolate());
212  Context::Scope context_scope(env->context());
213 
214  Local<Object> pending_obj;
215 
216  if (nread < 0) {
217  if (buf->base != nullptr)
218  free(buf->base);
219  wrap->EmitData(nread, Local<Object>(), pending_obj);
220  return;
221  }
222 
223  if (nread == 0) {
224  if (buf->base != nullptr)
225  free(buf->base);
226  return;
227  }
228 
229  CHECK_LE(static_cast<size_t>(nread), buf->len);
230  char* base = node::Realloc(buf->base, nread);
231 
232  if (pending == UV_TCP) {
233  pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, wrap);
234  } else if (pending == UV_NAMED_PIPE) {
235  pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, wrap);
236  } else if (pending == UV_UDP) {
237  pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, wrap);
238  } else {
239  CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
240  }
241 
242  Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
243  wrap->EmitData(nread, obj, pending_obj);
244 }
245 
246 
247 void StreamWrap::OnRead(uv_stream_t* handle,
248  ssize_t nread,
249  const uv_buf_t* buf) {
250  StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
251  HandleScope scope(wrap->env()->isolate());
252  Context::Scope context_scope(wrap->env()->context());
253  uv_handle_type type = UV_UNKNOWN_HANDLE;
254 
255  if (wrap->is_named_pipe_ipc() &&
256  uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
257  type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
258  }
259 
260  // We should not be getting this callback if someone as already called
261  // uv_close() on the handle.
262  CHECK_EQ(wrap->persistent().IsEmpty(), false);
263 
264  if (nread > 0) {
265  if (wrap->is_tcp()) {
267  } else if (wrap->is_named_pipe()) {
269  }
270  }
271 
272  static_cast<StreamBase*>(wrap)->OnRead(nread, buf, type);
273 }
274 
275 
276 void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
277  StreamWrap* wrap;
278  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
279 
280  CHECK_GT(args.Length(), 0);
281  if (!wrap->IsAlive())
282  return args.GetReturnValue().Set(UV_EINVAL);
283 
284  bool enable = args[0]->IsTrue();
285  args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
286 }
287 
288 
289 int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
290  int err;
291  err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown);
292  req_wrap->Dispatched();
293  return err;
294 }
295 
296 
297 void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
298  ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
299  CHECK_NE(req_wrap, nullptr);
300  HandleScope scope(req_wrap->env()->isolate());
301  Context::Scope context_scope(req_wrap->env()->context());
302  req_wrap->Done(status);
303 }
304 
305 
306 // NOTE: Call to this function could change both `buf`'s and `count`'s
307 // values, shifting their base and decrementing their length. This is
308 // required in order to skip the data that was successfully written via
309 // uv_try_write().
310 int StreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
311  int err;
312  size_t written;
313  uv_buf_t* vbufs = *bufs;
314  size_t vcount = *count;
315 
316  err = uv_try_write(stream(), vbufs, vcount);
317  if (err == UV_ENOSYS || err == UV_EAGAIN)
318  return 0;
319  if (err < 0)
320  return err;
321 
322  // Slice off the buffers: skip all written buffers and slice the one that
323  // was partially written.
324  written = err;
325  for (; vcount > 0; vbufs++, vcount--) {
326  // Slice
327  if (vbufs[0].len > written) {
328  vbufs[0].base += written;
329  vbufs[0].len -= written;
330  written = 0;
331  break;
332 
333  // Discard
334  } else {
335  written -= vbufs[0].len;
336  }
337  }
338 
339  *bufs = vbufs;
340  *count = vcount;
341 
342  return 0;
343 }
344 
345 
346 int StreamWrap::DoWrite(WriteWrap* w,
347  uv_buf_t* bufs,
348  size_t count,
349  uv_stream_t* send_handle) {
350  int r;
351  if (send_handle == nullptr) {
352  r = uv_write(w->req(), stream(), bufs, count, AfterWrite);
353  } else {
354  r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite);
355  }
356 
357  if (!r) {
358  size_t bytes = 0;
359  for (size_t i = 0; i < count; i++)
360  bytes += bufs[i].len;
361  if (stream()->type == UV_TCP) {
363  } else if (stream()->type == UV_NAMED_PIPE) {
365  }
366  }
367 
368  w->Dispatched();
369  UpdateWriteQueueSize();
370 
371  return r;
372 }
373 
374 
375 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
376  WriteWrap* req_wrap = WriteWrap::from_req(req);
377  CHECK_NE(req_wrap, nullptr);
378  HandleScope scope(req_wrap->env()->isolate());
379  Context::Scope context_scope(req_wrap->env()->context());
380  req_wrap->Done(status);
381 }
382 
383 
384 void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
385  StreamWrap* wrap = static_cast<StreamWrap*>(ctx);
386  wrap->UpdateWriteQueueSize();
387 }
388 
389 } // namespace node
390 
unsigned char * buf
Definition: cares_wrap.cc:483
void NODE_COUNT_NET_BYTES_SENT(int bytes)
NODE_MODULE_CONTEXT_AWARE_BUILTIN(inspector, node::inspector::Agent::InitInspector)
int len
Definition: cares_wrap.cc:485
void NODE_COUNT_PIPE_BYTES_SENT(int bytes)
QueryWrap * wrap
Definition: cares_wrap.cc:478
int status
Definition: cares_wrap.cc:479
void NODE_COUNT_NET_BYTES_RECV(int bytes)
void Initialize(Local< Object > target, Local< Value > unused, Local< Context > context, void *priv)
Definition: node_http2.cc:1172
uv_fs_t req
Definition: node_file.cc:374
void NODE_COUNT_PIPE_BYTES_RECV(int bytes)
MaybeLocal< Object > New(Isolate *isolate, Local< String > string, enum encoding enc)
Definition: node_buffer.cc:241
this ctx
Definition: v8ustack.d:369