21 using v8::FunctionCallbackInfo;
22 using v8::HandleScope;
30 template int StreamBase::WriteString<ASCII>(
31 const FunctionCallbackInfo<Value>& args);
32 template int StreamBase::WriteString<UTF8>(
33 const FunctionCallbackInfo<Value>& args);
34 template int StreamBase::WriteString<UCS2>(
35 const FunctionCallbackInfo<Value>& args);
36 template int StreamBase::WriteString<LATIN1>(
37 const FunctionCallbackInfo<Value>& args);
40 int StreamBase::ReadStart(
const FunctionCallbackInfo<Value>& args) {
45 int StreamBase::ReadStop(
const FunctionCallbackInfo<Value>& args) {
50 int StreamBase::Shutdown(
const FunctionCallbackInfo<Value>& args) {
51 Environment* env = Environment::GetCurrent(args);
53 CHECK(args[0]->IsObject());
54 Local<Object> req_wrap_obj = args[0].As<Object>();
56 AsyncWrap*
wrap = GetAsyncWrap();
57 CHECK_NE(wrap,
nullptr);
58 env->set_init_trigger_id(wrap->get_id());
59 ShutdownWrap* req_wrap =
new ShutdownWrap(env,
64 int err = DoShutdown(req_wrap);
71 void StreamBase::AfterShutdown(ShutdownWrap* req_wrap,
int status) {
72 StreamBase* wrap = req_wrap->wrap();
73 Environment* env = req_wrap->env();
76 CHECK_EQ(req_wrap->persistent().IsEmpty(),
false);
78 HandleScope handle_scope(env->isolate());
79 Context::Scope context_scope(env->context());
81 Local<Object> req_wrap_obj = req_wrap->object();
82 Local<Value> argv[3] = {
88 if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
89 req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
95 int StreamBase::Writev(
const FunctionCallbackInfo<Value>& args) {
96 Environment* env = Environment::GetCurrent(args);
98 CHECK(args[0]->IsObject());
99 CHECK(args[1]->IsArray());
101 Local<Object> req_wrap_obj = args[0].As<Object>();
102 Local<Array> chunks = args[1].As<Array>();
103 bool all_buffers = args[2]->IsTrue();
107 count = chunks->Length();
109 count = chunks->Length() >> 1;
111 MaybeStackBuffer<uv_buf_t, 16> bufs(count);
112 uv_buf_t* buf_list = *bufs;
114 size_t storage_size = 0;
123 for (
size_t i = 0; i < count; i++) {
124 storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
126 Local<Value> chunk = chunks->Get(i * 2);
133 Local<String>
string = chunk->ToString(env->isolate());
135 chunks->Get(i * 2 + 1));
137 if (encoding ==
UTF8 && string->Length() > 65535)
138 chunk_size = StringBytes::Size(env->isolate(), string,
encoding);
140 chunk_size = StringBytes::StorageSize(env->isolate(), string,
encoding);
142 storage_size += chunk_size;
145 if (storage_size > INT_MAX)
148 for (
size_t i = 0; i < count; i++) {
149 Local<Value> chunk = chunks->Get(i);
152 bytes += bufs[i].len;
156 err = DoTryWrite(&buf_list, &count);
157 if (err != 0 || count == 0)
161 wrap = GetAsyncWrap();
162 CHECK_NE(wrap,
nullptr);
163 env->set_init_trigger_id(wrap->get_id());
164 req_wrap =
WriteWrap::New(env, req_wrap_obj,
this, AfterWrite, storage_size);
168 for (
size_t i = 0; i < count; i++) {
169 Local<Value> chunk = chunks->Get(i * 2);
175 bytes += bufs[i].len;
180 offset = ROUND_UP(offset, WriteWrap::kAlignSize);
181 CHECK_LE(offset, storage_size);
182 char* str_storage = req_wrap->Extra(offset);
183 size_t str_size = storage_size - offset;
185 Local<String>
string = chunk->ToString(env->isolate());
187 chunks->Get(i * 2 + 1));
188 str_size = StringBytes::Write(env->isolate(),
193 bufs[i].base = str_storage;
194 bufs[i].len = str_size;
200 err = DoWrite(req_wrap, buf_list, count,
nullptr);
201 req_wrap_obj->Set(env->async(), True(env->isolate()));
207 const char* msg = Error();
208 if (msg !=
nullptr) {
209 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
212 req_wrap_obj->Set(env->bytes_string(),
Number::New(env->isolate(), bytes));
220 int StreamBase::WriteBuffer(
const FunctionCallbackInfo<Value>& args) {
221 CHECK(args[0]->IsObject());
223 Environment* env = Environment::GetCurrent(args);
225 if (!args[1]->IsUint8Array()) {
226 env->ThrowTypeError(
"Second argument must be a buffer");
230 Local<Object> req_wrap_obj = args[0].As<Object>();
237 buf.base =
const_cast<char*
>(
data);
241 uv_buf_t* bufs = &
buf;
243 int err = DoTryWrite(&bufs, &count);
250 wrap = GetAsyncWrap();
252 env->set_init_trigger_id(wrap->get_id());
256 err = DoWrite(req_wrap, bufs, count,
nullptr);
257 req_wrap_obj->Set(env->async(), True(env->isolate()));
258 req_wrap_obj->Set(env->buffer_string(), args[1]);
264 const char* msg = Error();
265 if (msg !=
nullptr) {
266 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
269 req_wrap_obj->Set(env->bytes_string(),
270 Integer::NewFromUnsigned(env->isolate(), length));
275 template <enum encoding enc>
276 int StreamBase::WriteString(
const FunctionCallbackInfo<Value>& args) {
277 Environment* env = Environment::GetCurrent(args);
278 CHECK(args[0]->IsObject());
279 CHECK(args[1]->IsString());
281 Local<Object> req_wrap_obj = args[0].As<Object>();
282 Local<String>
string = args[1].As<String>();
283 Local<Object> send_handle_obj;
285 if (args[2]->IsObject())
286 send_handle_obj = args[2].As<Object>();
294 if (enc ==
UTF8 && string->Length() > 65535)
295 storage_size = StringBytes::Size(env->isolate(), string, enc);
297 storage_size = StringBytes::StorageSize(env->isolate(), string, enc);
299 if (storage_size > INT_MAX)
305 char stack_storage[16384];
309 bool try_write = storage_size <=
sizeof(stack_storage) &&
310 (!IsIPCPipe() || send_handle_obj.IsEmpty());
312 data_size = StringBytes::Write(env->isolate(),
317 buf = uv_buf_init(stack_storage, data_size);
319 uv_buf_t* bufs = &
buf;
321 err = DoTryWrite(&bufs, &count);
335 wrap = GetAsyncWrap();
337 env->set_init_trigger_id(wrap->get_id());
338 req_wrap =
WriteWrap::New(env, req_wrap_obj,
this, AfterWrite, storage_size);
340 data = req_wrap->Extra();
344 memcpy(data, buf.base, buf.len);
348 data_size = StringBytes::Write(env->isolate(),
355 CHECK_LE(data_size, storage_size);
357 buf = uv_buf_init(data, data_size);
360 err = DoWrite(req_wrap, &buf, 1,
nullptr);
362 uv_handle_t* send_handle =
nullptr;
364 if (!send_handle_obj.IsEmpty()) {
366 ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
367 send_handle = wrap->GetHandle();
370 CHECK_EQ(
false, req_wrap->persistent().IsEmpty());
371 req_wrap_obj->Set(env->handle_string(), send_handle_obj);
378 reinterpret_cast<uv_stream_t*>(send_handle));
381 req_wrap_obj->Set(env->async(), True(env->isolate()));
387 const char* msg = Error();
388 if (msg !=
nullptr) {
389 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
392 req_wrap_obj->Set(env->bytes_string(),
393 Integer::NewFromUnsigned(env->isolate(), data_size));
398 void StreamBase::AfterWrite(WriteWrap* req_wrap,
int status) {
399 StreamBase* wrap = req_wrap->wrap();
400 Environment* env = req_wrap->env();
402 HandleScope handle_scope(env->isolate());
403 Context::Scope context_scope(env->context());
406 CHECK_EQ(req_wrap->persistent().IsEmpty(),
false);
409 Local<Object> req_wrap_obj = req_wrap->object();
410 req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
411 wrap->OnAfterWrite(req_wrap);
413 Local<Value> argv[] = {
417 Undefined(env->isolate())
420 const char* msg = wrap->Error();
421 if (msg !=
nullptr) {
422 argv[3] = OneByteString(env->isolate(), msg);
426 if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
427 req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
433 void StreamBase::EmitData(ssize_t nread,
435 Local<Object> handle) {
436 Environment* env =
env_;
438 Local<Value> argv[] = {
444 if (argv[1].IsEmpty())
445 argv[1] = Undefined(env->isolate());
447 if (argv[2].IsEmpty())
448 argv[2] = Undefined(env->isolate());
450 AsyncWrap* wrap = GetAsyncWrap();
451 CHECK_NE(wrap,
nullptr);
452 wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
456 bool StreamBase::IsIPCPipe() {
461 int StreamBase::GetFD() {
466 Local<Object> StreamBase::GetObject() {
467 return GetAsyncWrap()->object();
471 int StreamResource::DoTryWrite(uv_buf_t** bufs,
size_t* count) {
477 const char* StreamResource::Error()
const {
482 void StreamResource::ClearError() {
bool HasInstance(Local< Value > val)
union node::cares_wrap::@8::CaresAsyncData::@0 data
size_t Length(Local< Value > val)
char * Data(Local< Value > val)
enum encoding ParseEncoding(const char *encoding, enum encoding default_encoding)
MaybeLocal< Object > New(Isolate *isolate, Local< String > string, enum encoding enc)