Node.js  v8.x
Node.js is a JavaScript runtime built on Chrome's V8 JavaScript engine
stream_base.cc
Go to the documentation of this file.
1 #include "stream_base.h"
2 #include "stream_base-inl.h"
3 #include "stream_wrap.h"
4 
5 #include "node.h"
6 #include "node_buffer.h"
7 #include "env.h"
8 #include "env-inl.h"
9 #include "js_stream.h"
10 #include "string_bytes.h"
11 #include "util.h"
12 #include "util-inl.h"
13 #include "v8.h"
14 
15 #include <limits.h> // INT_MAX
16 
17 namespace node {
18 
19 using v8::Array;
20 using v8::Context;
21 using v8::FunctionCallbackInfo;
22 using v8::HandleScope;
23 using v8::Integer;
24 using v8::Local;
25 using v8::Number;
26 using v8::Object;
27 using v8::String;
28 using v8::Value;
29 
30 template int StreamBase::WriteString<ASCII>(
31  const FunctionCallbackInfo<Value>& args);
32 template int StreamBase::WriteString<UTF8>(
33  const FunctionCallbackInfo<Value>& args);
34 template int StreamBase::WriteString<UCS2>(
35  const FunctionCallbackInfo<Value>& args);
36 template int StreamBase::WriteString<LATIN1>(
37  const FunctionCallbackInfo<Value>& args);
38 
39 
40 int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
41  return ReadStart();
42 }
43 
44 
45 int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
46  return ReadStop();
47 }
48 
49 
50 int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
51  Environment* env = Environment::GetCurrent(args);
52 
53  CHECK(args[0]->IsObject());
54  Local<Object> req_wrap_obj = args[0].As<Object>();
55 
56  AsyncWrap* wrap = GetAsyncWrap();
57  CHECK_NE(wrap, nullptr);
58  env->set_init_trigger_id(wrap->get_id());
59  ShutdownWrap* req_wrap = new ShutdownWrap(env,
60  req_wrap_obj,
61  this,
62  AfterShutdown);
63 
64  int err = DoShutdown(req_wrap);
65  if (err)
66  delete req_wrap;
67  return err;
68 }
69 
70 
71 void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
72  StreamBase* wrap = req_wrap->wrap();
73  Environment* env = req_wrap->env();
74 
75  // The wrap and request objects should still be there.
76  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
77 
78  HandleScope handle_scope(env->isolate());
79  Context::Scope context_scope(env->context());
80 
81  Local<Object> req_wrap_obj = req_wrap->object();
82  Local<Value> argv[3] = {
83  Integer::New(env->isolate(), status),
84  wrap->GetObject(),
85  req_wrap_obj
86  };
87 
88  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
89  req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
90 
91  delete req_wrap;
92 }
93 
94 
95 int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
96  Environment* env = Environment::GetCurrent(args);
97 
98  CHECK(args[0]->IsObject());
99  CHECK(args[1]->IsArray());
100 
101  Local<Object> req_wrap_obj = args[0].As<Object>();
102  Local<Array> chunks = args[1].As<Array>();
103  bool all_buffers = args[2]->IsTrue();
104 
105  size_t count;
106  if (all_buffers)
107  count = chunks->Length();
108  else
109  count = chunks->Length() >> 1;
110 
111  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
112  uv_buf_t* buf_list = *bufs;
113 
114  size_t storage_size = 0;
115  uint32_t bytes = 0;
116  size_t offset;
117  AsyncWrap* wrap;
118  WriteWrap* req_wrap;
119  int err;
120 
121  if (!all_buffers) {
122  // Determine storage size first
123  for (size_t i = 0; i < count; i++) {
124  storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
125 
126  Local<Value> chunk = chunks->Get(i * 2);
127 
128  if (Buffer::HasInstance(chunk))
129  continue;
130  // Buffer chunk, no additional storage required
131 
132  // String chunk
133  Local<String> string = chunk->ToString(env->isolate());
134  enum encoding encoding = ParseEncoding(env->isolate(),
135  chunks->Get(i * 2 + 1));
136  size_t chunk_size;
137  if (encoding == UTF8 && string->Length() > 65535)
138  chunk_size = StringBytes::Size(env->isolate(), string, encoding);
139  else
140  chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
141 
142  storage_size += chunk_size;
143  }
144 
145  if (storage_size > INT_MAX)
146  return UV_ENOBUFS;
147  } else {
148  for (size_t i = 0; i < count; i++) {
149  Local<Value> chunk = chunks->Get(i);
150  bufs[i].base = Buffer::Data(chunk);
151  bufs[i].len = Buffer::Length(chunk);
152  bytes += bufs[i].len;
153  }
154 
155  // Try writing immediately without allocation
156  err = DoTryWrite(&buf_list, &count);
157  if (err != 0 || count == 0)
158  goto done;
159  }
160 
161  wrap = GetAsyncWrap();
162  CHECK_NE(wrap, nullptr);
163  env->set_init_trigger_id(wrap->get_id());
164  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
165 
166  offset = 0;
167  if (!all_buffers) {
168  for (size_t i = 0; i < count; i++) {
169  Local<Value> chunk = chunks->Get(i * 2);
170 
171  // Write buffer
172  if (Buffer::HasInstance(chunk)) {
173  bufs[i].base = Buffer::Data(chunk);
174  bufs[i].len = Buffer::Length(chunk);
175  bytes += bufs[i].len;
176  continue;
177  }
178 
179  // Write string
180  offset = ROUND_UP(offset, WriteWrap::kAlignSize);
181  CHECK_LE(offset, storage_size);
182  char* str_storage = req_wrap->Extra(offset);
183  size_t str_size = storage_size - offset;
184 
185  Local<String> string = chunk->ToString(env->isolate());
186  enum encoding encoding = ParseEncoding(env->isolate(),
187  chunks->Get(i * 2 + 1));
188  str_size = StringBytes::Write(env->isolate(),
189  str_storage,
190  str_size,
191  string,
192  encoding);
193  bufs[i].base = str_storage;
194  bufs[i].len = str_size;
195  offset += str_size;
196  bytes += str_size;
197  }
198  }
199 
200  err = DoWrite(req_wrap, buf_list, count, nullptr);
201  req_wrap_obj->Set(env->async(), True(env->isolate()));
202 
203  if (err)
204  req_wrap->Dispose();
205 
206  done:
207  const char* msg = Error();
208  if (msg != nullptr) {
209  req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
210  ClearError();
211  }
212  req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));
213 
214  return err;
215 }
216 
217 
218 
219 
220 int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
221  CHECK(args[0]->IsObject());
222 
223  Environment* env = Environment::GetCurrent(args);
224 
225  if (!args[1]->IsUint8Array()) {
226  env->ThrowTypeError("Second argument must be a buffer");
227  return 0;
228  }
229 
230  Local<Object> req_wrap_obj = args[0].As<Object>();
231  const char* data = Buffer::Data(args[1]);
232  size_t length = Buffer::Length(args[1]);
233 
234  AsyncWrap* wrap;
235  WriteWrap* req_wrap;
236  uv_buf_t buf;
237  buf.base = const_cast<char*>(data);
238  buf.len = length;
239 
240  // Try writing immediately without allocation
241  uv_buf_t* bufs = &buf;
242  size_t count = 1;
243  int err = DoTryWrite(&bufs, &count);
244  if (err != 0)
245  goto done;
246  if (count == 0)
247  goto done;
248  CHECK_EQ(count, 1);
249 
250  wrap = GetAsyncWrap();
251  if (wrap != nullptr)
252  env->set_init_trigger_id(wrap->get_id());
253  // Allocate, or write rest
254  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
255 
256  err = DoWrite(req_wrap, bufs, count, nullptr);
257  req_wrap_obj->Set(env->async(), True(env->isolate()));
258  req_wrap_obj->Set(env->buffer_string(), args[1]);
259 
260  if (err)
261  req_wrap->Dispose();
262 
263  done:
264  const char* msg = Error();
265  if (msg != nullptr) {
266  req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
267  ClearError();
268  }
269  req_wrap_obj->Set(env->bytes_string(),
270  Integer::NewFromUnsigned(env->isolate(), length));
271  return err;
272 }
273 
274 
275 template <enum encoding enc>
276 int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
277  Environment* env = Environment::GetCurrent(args);
278  CHECK(args[0]->IsObject());
279  CHECK(args[1]->IsString());
280 
281  Local<Object> req_wrap_obj = args[0].As<Object>();
282  Local<String> string = args[1].As<String>();
283  Local<Object> send_handle_obj;
284  AsyncWrap* wrap;
285  if (args[2]->IsObject())
286  send_handle_obj = args[2].As<Object>();
287 
288  int err;
289 
290  // Compute the size of the storage that the string will be flattened into.
291  // For UTF8 strings that are very long, go ahead and take the hit for
292  // computing their actual size, rather than tripling the storage.
293  size_t storage_size;
294  if (enc == UTF8 && string->Length() > 65535)
295  storage_size = StringBytes::Size(env->isolate(), string, enc);
296  else
297  storage_size = StringBytes::StorageSize(env->isolate(), string, enc);
298 
299  if (storage_size > INT_MAX)
300  return UV_ENOBUFS;
301 
302  // Try writing immediately if write size isn't too big
303  WriteWrap* req_wrap;
304  char* data;
305  char stack_storage[16384]; // 16kb
306  size_t data_size;
307  uv_buf_t buf;
308 
309  bool try_write = storage_size <= sizeof(stack_storage) &&
310  (!IsIPCPipe() || send_handle_obj.IsEmpty());
311  if (try_write) {
312  data_size = StringBytes::Write(env->isolate(),
313  stack_storage,
314  storage_size,
315  string,
316  enc);
317  buf = uv_buf_init(stack_storage, data_size);
318 
319  uv_buf_t* bufs = &buf;
320  size_t count = 1;
321  err = DoTryWrite(&bufs, &count);
322 
323  // Failure
324  if (err != 0)
325  goto done;
326 
327  // Success
328  if (count == 0)
329  goto done;
330 
331  // Partial write
332  CHECK_EQ(count, 1);
333  }
334 
335  wrap = GetAsyncWrap();
336  if (wrap != nullptr)
337  env->set_init_trigger_id(wrap->get_id());
338  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
339 
340  data = req_wrap->Extra();
341 
342  if (try_write) {
343  // Copy partial data
344  memcpy(data, buf.base, buf.len);
345  data_size = buf.len;
346  } else {
347  // Write it
348  data_size = StringBytes::Write(env->isolate(),
349  data,
350  storage_size,
351  string,
352  enc);
353  }
354 
355  CHECK_LE(data_size, storage_size);
356 
357  buf = uv_buf_init(data, data_size);
358 
359  if (!IsIPCPipe()) {
360  err = DoWrite(req_wrap, &buf, 1, nullptr);
361  } else {
362  uv_handle_t* send_handle = nullptr;
363 
364  if (!send_handle_obj.IsEmpty()) {
365  HandleWrap* wrap;
366  ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
367  send_handle = wrap->GetHandle();
368  // Reference StreamWrap instance to prevent it from being garbage
369  // collected before `AfterWrite` is called.
370  CHECK_EQ(false, req_wrap->persistent().IsEmpty());
371  req_wrap_obj->Set(env->handle_string(), send_handle_obj);
372  }
373 
374  err = DoWrite(
375  req_wrap,
376  &buf,
377  1,
378  reinterpret_cast<uv_stream_t*>(send_handle));
379  }
380 
381  req_wrap_obj->Set(env->async(), True(env->isolate()));
382 
383  if (err)
384  req_wrap->Dispose();
385 
386  done:
387  const char* msg = Error();
388  if (msg != nullptr) {
389  req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
390  ClearError();
391  }
392  req_wrap_obj->Set(env->bytes_string(),
393  Integer::NewFromUnsigned(env->isolate(), data_size));
394  return err;
395 }
396 
397 
398 void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
399  StreamBase* wrap = req_wrap->wrap();
400  Environment* env = req_wrap->env();
401 
402  HandleScope handle_scope(env->isolate());
403  Context::Scope context_scope(env->context());
404 
405  // The wrap and request objects should still be there.
406  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
407 
408  // Unref handle property
409  Local<Object> req_wrap_obj = req_wrap->object();
410  req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
411  wrap->OnAfterWrite(req_wrap);
412 
413  Local<Value> argv[] = {
414  Integer::New(env->isolate(), status),
415  wrap->GetObject(),
416  req_wrap_obj,
417  Undefined(env->isolate())
418  };
419 
420  const char* msg = wrap->Error();
421  if (msg != nullptr) {
422  argv[3] = OneByteString(env->isolate(), msg);
423  wrap->ClearError();
424  }
425 
426  if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
427  req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
428 
429  req_wrap->Dispose();
430 }
431 
432 
433 void StreamBase::EmitData(ssize_t nread,
434  Local<Object> buf,
435  Local<Object> handle) {
436  Environment* env = env_;
437 
438  Local<Value> argv[] = {
439  Integer::New(env->isolate(), nread),
440  buf,
441  handle
442  };
443 
444  if (argv[1].IsEmpty())
445  argv[1] = Undefined(env->isolate());
446 
447  if (argv[2].IsEmpty())
448  argv[2] = Undefined(env->isolate());
449 
450  AsyncWrap* wrap = GetAsyncWrap();
451  CHECK_NE(wrap, nullptr);
452  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
453 }
454 
455 
456 bool StreamBase::IsIPCPipe() {
457  return false;
458 }
459 
460 
461 int StreamBase::GetFD() {
462  return -1;
463 }
464 
465 
466 Local<Object> StreamBase::GetObject() {
467  return GetAsyncWrap()->object();
468 }
469 
470 
471 int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
472  // No TryWrite by default
473  return 0;
474 }
475 
476 
477 const char* StreamResource::Error() const {
478  return nullptr;
479 }
480 
481 
482 void StreamResource::ClearError() {
483  // No-op
484 }
485 
486 } // namespace node
bool HasInstance(Local< Value > val)
Definition: node_buffer.cc:201
unsigned char * buf
Definition: cares_wrap.cc:483
QueryWrap * wrap
Definition: cares_wrap.cc:478
Environment *const env_
int status
Definition: cares_wrap.cc:479
union node::cares_wrap::@8::CaresAsyncData::@0 data
size_t Length(Local< Value > val)
Definition: node_buffer.cc:227
this done
Definition: v8ustack.d:366
encoding
Definition: node.h:322
char * Data(Local< Value > val)
Definition: node_buffer.cc:211
enum encoding ParseEncoding(const char *encoding, enum encoding default_encoding)
Definition: node.cc:1485
MaybeLocal< Object > New(Isolate *isolate, Local< String > string, enum encoding enc)
Definition: node_buffer.cc:241