47 using v8::EscapableHandleScope;
48 using v8::FunctionCallbackInfo;
49 using v8::FunctionTemplate;
50 using v8::HandleScope;
// Body of the binding initializer (the leading signature lines are not
// visible here). It exposes two constructor functions on `target`,
// "ShutdownWrap" and "WriteWrap", and records the WriteWrap constructor on
// the Environment.
59 Local<Context> context) {
60 Environment* env = Environment::GetCurrent(context);
// Callback that insists on being invoked as a constructor and then calls
// ClearWrap() on the receiver.
// NOTE(review): presumably this is the callback handed to both templates
// below; their initializer expressions are not visible - confirm.
62 auto is_construct_call_callback =
63 [](
const FunctionCallbackInfo<Value>& args) {
64 CHECK(args.IsConstructCall());
65 ClearWrap(args.This());
// ShutdownWrap: one internal field, class name "ShutdownWrap", AsyncWrap
// methods added, then exported on `target` under that name.
67 Local<FunctionTemplate> sw =
69 sw->InstanceTemplate()->SetInternalFieldCount(1);
70 Local<String> wrapString =
71 FIXED_ONE_BYTE_STRING(env->isolate(),
"ShutdownWrap");
72 sw->SetClassName(wrapString);
73 AsyncWrap::AddWrapMethods(env, sw);
74 target->Set(wrapString, sw->GetFunction());
// WriteWrap: same setup as above under the name "WriteWrap".
76 Local<FunctionTemplate> ww =
78 ww->InstanceTemplate()->SetInternalFieldCount(1);
79 Local<String> writeWrapString =
80 FIXED_ONE_BYTE_STRING(env->isolate(),
"WriteWrap");
81 ww->SetClassName(writeWrapString);
82 AsyncWrap::AddWrapMethods(env, ww);
83 target->Set(writeWrapString, ww->GetFunction());
// Also stash the WriteWrap constructor on the Environment.
84 env->set_write_wrap_constructor_function(ww->GetFunction());
// Constructor. The visible part casts `stream` to a uv_handle_t* (as an
// argument whose enclosing expression is not shown) and installs the three
// stream callbacks, each paired with `this` so the static *Impl functions
// can recover the StreamWrap from their `void* ctx` argument.
88 StreamWrap::StreamWrap(Environment* env,
91 AsyncWrap::ProviderType provider)
94 reinterpret_cast<uv_handle_t*>(stream),
98 set_after_write_cb({ OnAfterWriteImpl,
this });
99 set_alloc_cb({ OnAllocImpl,
this });
100 set_read_cb({ OnReadImpl,
this });
// Registers the prototype method "setBlocking" (bound to SetBlocking) on
// `target`, then delegates to StreamBase::AddMethods<StreamWrap> with the
// same env, target and flags.
104 void StreamWrap::AddMethods(Environment* env,
105 v8::Local<v8::FunctionTemplate> target,
107 env->SetProtoMethod(target,
"setBlocking", SetBlocking);
108 StreamBase::AddMethods<StreamWrap>(env, target, flags);
// Asks libuv (uv_fileno) for the descriptor of the underlying handle and
// writes it into `fd`; the query is skipped when stream() is null.
// NOTE(review): the declaration/initial value of `fd` and the return
// statement are not visible here.
112 int StreamWrap::GetFD() {
115 if (stream() !=
nullptr)
116 uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
// Forwards to HandleWrap::IsAlive() for this object.
122 bool StreamWrap::IsAlive() {
123 return HandleWrap::IsAlive(
this);
// Returns the result of uv_is_closing() on the stream's handle, converted
// to bool.
127 bool StreamWrap::IsClosing() {
128 return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
// Returns `this` as an untyped pointer.
132 void* StreamWrap::Cast() {
133 return reinterpret_cast<void*
>(
this);
// Returns `this` converted to AsyncWrap*.
137 AsyncWrap* StreamWrap::GetAsyncWrap() {
138 return static_cast<AsyncWrap*
>(
this);
// Returns whatever is_named_pipe_ipc() reports for this wrap.
142 bool StreamWrap::IsIPCPipe() {
143 return is_named_pipe_ipc();
// Mirrors the libuv stream's current write_queue_size onto the JS object,
// under the property key env()->write_queue_size_string().
147 void StreamWrap::UpdateWriteQueueSize() {
// A handle scope is opened for the Local created below.
148 HandleScope scope(env()->isolate());
149 Local<Integer> write_queue_size =
150 Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
151 object()->Set(env()->write_queue_size_string(), write_queue_size);
// Starts reading on the libuv stream with OnAlloc / OnRead as the
// allocation and read callbacks; returns uv_read_start()'s result code.
155 int StreamWrap::ReadStart() {
156 return uv_read_start(stream(), OnAlloc, OnRead);
// Stops reading on the libuv stream; returns uv_read_stop()'s result code.
160 int StreamWrap::ReadStop() {
161 return uv_read_stop(stream());
// Allocation callback passed to uv_read_start() by ReadStart(). Recovers the
// StreamWrap from handle->data, enters a handle scope and the environment's
// context, verifies the handle is this wrap's stream, and forwards the
// request to StreamBase::OnAlloc().
165 void StreamWrap::OnAlloc(uv_handle_t* handle,
166 size_t suggested_size,
168 StreamWrap*
wrap =
static_cast<StreamWrap*
>(handle->data);
169 HandleScope scope(wrap->env()->isolate());
170 Context::Scope context_scope(wrap->env()->context());
// Sanity check: the callback must be for the stream owned by this wrap.
172 CHECK_EQ(wrap->stream(),
reinterpret_cast<uv_stream_t*
>(handle));
174 return static_cast<StreamBase*
>(
wrap)->OnAlloc(suggested_size, buf);
// Alloc callback installed by the constructor via set_alloc_cb(). Allocates
// `size` bytes with node::Malloc and stores the pointer in buf->base.
// NOTE(review): any assignment to buf->len is not visible here.
178 void StreamWrap::OnAllocImpl(
size_t size, uv_buf_t* buf,
void*
ctx) {
179 buf->base = node::Malloc(size);
// File-local helper used by OnReadImpl to accept a pending handle.
// Creates a JS object via WrapType::Instantiate(env, parent), unwraps it,
// and calls uv_accept() from parent's stream onto the new wrap's UV handle.
// Returns an empty Local when instantiation yields an empty object or when
// unwrapping fails; otherwise the new object is escaped from the local
// EscapableHandleScope and returned.
// NOTE(review): the declarations of `wrap` / `handle` and the statement
// executed when uv_accept() returns non-zero are not visible here.
184 template <
class WrapType,
class UVType>
185 static Local<Object> AcceptHandle(Environment* env, StreamWrap* parent) {
186 EscapableHandleScope scope(env->isolate());
187 Local<Object> wrap_obj;
190 wrap_obj = WrapType::Instantiate(env, parent);
191 if (wrap_obj.IsEmpty())
192 return Local<Object>();
195 ASSIGN_OR_RETURN_UNWRAP(&wrap, wrap_obj, Local<Object>());
196 handle = wrap->UVHandle();
198 if (uv_accept(parent->stream(),
reinterpret_cast<uv_stream_t*
>(handle)))
201 return scope.Escape(wrap_obj);
// Read callback installed by the constructor via set_read_cb(). `ctx` is the
// StreamWrap registered alongside the callback. The visible logic emits
// either an empty result or a Buffer holding the `nread` bytes read, plus an
// accepted pending handle when `pending` names a supported handle type.
// NOTE(review): the conditions guarding the early EmitData() call and what
// happens inside the `buf->base != nullptr` branches are not visible here.
205 void StreamWrap::OnReadImpl(ssize_t nread,
207 uv_handle_type pending,
209 StreamWrap* wrap =
static_cast<StreamWrap*
>(
ctx);
210 Environment* env = wrap->env();
211 HandleScope handle_scope(env->isolate());
212 Context::Scope context_scope(env->context());
214 Local<Object> pending_obj;
217 if (buf->base !=
nullptr)
// Emits with an empty data object (and the still-empty pending_obj).
219 wrap->EmitData(nread, Local<Object>(), pending_obj);
224 if (buf->base !=
nullptr)
// The number of bytes read must fit in the buffer that was handed out.
229 CHECK_LE(static_cast<size_t>(nread), buf->len);
// Resize the allocation to the nread bytes actually read.
230 char* base = node::Realloc(buf->base, nread);
// Accept the pending handle with the wrapper class matching its type.
232 if (pending == UV_TCP) {
233 pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env,
wrap);
234 }
else if (pending == UV_NAMED_PIPE) {
235 pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env,
wrap);
236 }
else if (pending == UV_UDP) {
237 pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env,
wrap);
// NOTE(review): presumably the fall-through case - any other type must be
// UV_UNKNOWN_HANDLE; the enclosing `else` is not visible.
239 CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
// Wrap the bytes in a Buffer (ToLocalChecked: failure is fatal) and emit.
242 Local<Object> obj =
Buffer::New(env, base, nread).ToLocalChecked();
243 wrap->EmitData(nread, obj, pending_obj);
// Read callback passed to uv_read_start() by ReadStart(). Recovers the
// StreamWrap from handle->data, determines the type of any pending handle,
// and forwards to StreamBase::OnRead(nread, buf, type).
247 void StreamWrap::OnRead(uv_stream_t* handle,
249 const uv_buf_t* buf) {
250 StreamWrap* wrap =
static_cast<StreamWrap*
>(handle->data);
251 HandleScope scope(wrap->env()->isolate());
252 Context::Scope context_scope(wrap->env()->context());
253 uv_handle_type type = UV_UNKNOWN_HANDLE;
// Only an IPC named pipe with at least one pending handle gets a type other
// than UV_UNKNOWN_HANDLE.
255 if (wrap->is_named_pipe_ipc() &&
256 uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
257 type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
// The wrap's persistent handle must not be empty at this point.
262 CHECK_EQ(wrap->persistent().IsEmpty(),
false);
// NOTE(review): the bodies of the TCP / named-pipe branches are not
// visible here.
265 if (wrap->is_tcp()) {
267 }
else if (wrap->is_named_pipe()) {
272 static_cast<StreamBase*
>(
wrap)->OnRead(nread, buf, type);
// JS binding registered as "setBlocking" in AddMethods. Unwraps the
// StreamWrap from args.Holder() and requires at least one argument (CHECK).
// Sets the return value to UV_EINVAL when the wrap is not alive, otherwise
// to the result of uv_stream_set_blocking().
276 void StreamWrap::SetBlocking(
const FunctionCallbackInfo<Value>& args) {
278 ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
280 CHECK_GT(args.Length(), 0);
281 if (!wrap->IsAlive())
282 return args.GetReturnValue().Set(UV_EINVAL);
// Blocking mode is enabled only when args[0]->IsTrue() holds.
284 bool enable = args[0]->IsTrue();
285 args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
// Issues uv_shutdown() on the stream using req_wrap's request, with
// AfterShutdown as the completion callback, then calls
// req_wrap->Dispatched().
// NOTE(review): the declaration of `err` and the return statement are not
// visible here.
289 int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
291 err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown);
292 req_wrap->Dispatched();
// Completion callback given to uv_shutdown() in DoShutdown. Recovers the
// ShutdownWrap from the raw request (must be non-null), enters a handle
// scope and the environment's context, and reports `status` through
// req_wrap->Done().
297 void StreamWrap::AfterShutdown(uv_shutdown_t*
req,
int status) {
298 ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
299 CHECK_NE(req_wrap,
nullptr);
300 HandleScope scope(req_wrap->env()->isolate());
301 Context::Scope context_scope(req_wrap->env()->context());
302 req_wrap->Done(status);
// Attempts a write with uv_try_write() on the buffers described by
// *bufs / *count, then walks the buffer list accounting for the bytes that
// were written.
// NOTE(review): the declarations of `err` / `written`, the handling of the
// UV_ENOSYS / UV_EAGAIN case, the updates to *bufs / *count and the return
// statements are not visible here.
310 int StreamWrap::DoTryWrite(uv_buf_t** bufs,
size_t* count) {
313 uv_buf_t* vbufs = *bufs;
314 size_t vcount = *count;
316 err = uv_try_write(stream(), vbufs, vcount);
317 if (err == UV_ENOSYS || err == UV_EAGAIN)
// Consume `written` bytes from the front of the buffer list.
325 for (; vcount > 0; vbufs++, vcount--) {
// Buffer only partly consumed: advance its base and shrink its length in
// place by the remaining `written` count.
327 if (vbufs[0].
len > written) {
328 vbufs[0].base += written;
329 vbufs[0].len -= written;
// Buffer fully consumed: deduct its whole length from `written`.
335 written -= vbufs[0].len;
// Queues a write on the stream with AfterWrite as the completion callback:
// uv_write() when no send_handle is supplied, uv_write2() (which also passes
// send_handle) otherwise. The buffer lengths are summed into `bytes`, and the
// JS-visible write queue size is refreshed.
// NOTE(review): the declarations of `r` / `bytes`, the bodies of the
// per-stream-type branches and the return statement are not visible here.
346 int StreamWrap::DoWrite(WriteWrap* w,
349 uv_stream_t* send_handle) {
351 if (send_handle ==
nullptr) {
352 r = uv_write(w->req(), stream(), bufs, count, AfterWrite);
354 r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite);
// Total number of bytes across all buffers of this write.
359 for (
size_t i = 0; i < count; i++)
360 bytes += bufs[i].
len;
361 if (stream()->type == UV_TCP) {
363 }
else if (stream()->type == UV_NAMED_PIPE) {
369 UpdateWriteQueueSize();
// Completion callback given to uv_write() / uv_write2() in DoWrite. Recovers
// the WriteWrap from the raw request (must be non-null), enters a handle
// scope and the environment's context, and reports `status` through
// req_wrap->Done().
375 void StreamWrap::AfterWrite(uv_write_t* req,
int status) {
376 WriteWrap* req_wrap = WriteWrap::from_req(req);
377 CHECK_NE(req_wrap,
nullptr);
378 HandleScope scope(req_wrap->env()->isolate());
379 Context::Scope context_scope(req_wrap->env()->context());
380 req_wrap->Done(status);
// After-write callback installed by the constructor via
// set_after_write_cb(). `ctx` is the StreamWrap registered with the
// callback; its JS-visible write queue size is refreshed. `w` is not used in
// the visible lines.
384 void StreamWrap::OnAfterWriteImpl(WriteWrap* w,
void* ctx) {
385 StreamWrap* wrap =
static_cast<StreamWrap*
>(
ctx);
386 wrap->UpdateWriteQueueSize();
void NODE_COUNT_NET_BYTES_SENT(int bytes)
NODE_MODULE_CONTEXT_AWARE_BUILTIN(inspector, node::inspector::Agent::InitInspector)
void NODE_COUNT_PIPE_BYTES_SENT(int bytes)
void NODE_COUNT_NET_BYTES_RECV(int bytes)
void Initialize(Local< Object > target, Local< Value > unused, Local< Context > context, void *priv)
void NODE_COUNT_PIPE_BYTES_RECV(int bytes)
MaybeLocal< Object > New(Isolate *isolate, Local< String > string, enum encoding enc)