1 #ifndef SRC_STREAM_BASE_H_ 2 #define SRC_STREAM_BASE_H_ 4 #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 23 typedef void (*DoneCb)(Req*
req,
int status);
25 explicit StreamReq(DoneCb cb) : cb_(cb) {
28 inline void Done(
int status,
const char* error_str =
nullptr) {
29 Req* req =
static_cast<Req*
>(
this);
30 Environment* env = req->env();
31 if (error_str !=
nullptr) {
32 req->object()->Set(env->error_string(),
33 OneByteString(env->isolate(), error_str));
43 class ShutdownWrap :
public ReqWrap<uv_shutdown_t>,
44 public StreamReq<ShutdownWrap> {
46 ShutdownWrap(Environment* env,
47 v8::Local<v8::Object> req_wrap_obj,
50 : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
51 StreamReq<ShutdownWrap>(cb),
53 Wrap(req_wrap_obj,
this);
60 static ShutdownWrap* from_req(uv_shutdown_t* req) {
61 return ContainerOf(&ShutdownWrap::req_, req);
// Accessor: returns the StreamBase pointer held in wrap_.
64 inline StreamBase*
wrap()
const {
return wrap_; }
// Overrides a base-class virtual; reports sizeof(*this) as the object's size.
65 size_t self_size()
const override {
return sizeof(*this); }
// Stream pointer exposed through wrap(). The pointer itself is const, so it
// cannot be reseated once the object has been constructed.
68 StreamBase*
const wrap_;
71 class WriteWrap:
public ReqWrap<uv_write_t>,
72 public StreamReq<WriteWrap> {
74 static inline WriteWrap*
New(Environment* env,
75 v8::Local<v8::Object> obj,
// Declared inline; the definition is not part of this listing.
79 inline void Dispose();
// Returns a char*; `offset` defaults to 0.
// NOTE(review): presumably a pointer into extra storage that accompanies the
// object -- confirm against the definition.
80 inline char* Extra(
size_t offset = 0);
// Accessor: returns the StreamBase pointer held in wrap_.
82 inline StreamBase*
wrap()
const {
return wrap_; }
// Overrides a base-class virtual; reports storage_size_ rather than
// sizeof(*this).
84 size_t self_size()
const override {
return storage_size_; }
86 static WriteWrap* from_req(uv_write_t* req) {
87 return ContainerOf(&WriteWrap::req_, req);
// NOTE(review): presumably the byte alignment applied to the storage that
// backs a WriteWrap -- confirm at the use sites.
90 static const size_t kAlignSize = 16;
92 WriteWrap(Environment* env,
93 v8::Local<v8::Object> obj,
96 : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
97 StreamReq<WriteWrap>(cb),
104 WriteWrap(Environment* env,
105 v8::Local<v8::Object> obj,
109 : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
110 StreamReq<WriteWrap>(cb),
112 storage_size_(storage_size) {
// Plain `new` is deleted, so a WriteWrap cannot be created with an ordinary
// new-expression.
120 void*
operator new(
size_t size) =
delete;
// Placement form: performs no allocation and simply hands back the
// caller-supplied `storage` pointer.
121 void*
operator new(
size_t size,
char* storage) {
return storage; }
// Placement-delete counterpart of operator new(size_t, char*); reaching it
// triggers UNREACHABLE().
125 void operator delete(
void* ptr,
char* storage) { UNREACHABLE(); }
// Ordinary delete also triggers UNREACHABLE().
// NOTE(review): presumably instances are released through Dispose() instead
// of `delete` -- confirm.
130 void operator delete(
void* ptr) { UNREACHABLE(); }
// Stream pointer returned by wrap(); the pointer itself is const.
132 StreamBase*
const wrap_;
// Size value reported by self_size().
133 const size_t storage_size_;
136 class StreamResource {
// Stores the function `fn` and the opaque context pointer `ctx`.
141 Callback(T fn,
void*
ctx) : fn(fn), ctx(ctx) {}
// Copy construction uses the compiler-generated member-wise copy.
142 Callback(
const Callback&) =
default;
// True when fn is null.
144 inline bool is_empty() {
return fn ==
nullptr; }
145 inline void clear() {
// Signature of the callback invoked by OnAfterWrite(): receives the WriteWrap
// and the context pointer stored alongside the callback.
154 typedef void (*AfterWriteCb)(WriteWrap* w,
void*
ctx);
// Signature of the callback invoked by OnAlloc(): receives the size, the
// uv_buf_t to operate on, and the stored context pointer.
155 typedef void (*AllocCb)(
size_t size, uv_buf_t*
buf,
void*
ctx);
156 typedef void (*ReadCb)(ssize_t nread,
158 uv_handle_type pending,
// Signature of the callback invoked from ~StreamResource(); receives only the
// stored context pointer.
160 typedef void (*DestructCb)(
void*
ctx);
162 StreamResource() : bytes_read_(0) {
164 virtual ~StreamResource() {
165 if (!destruct_cb_.is_empty())
166 destruct_cb_.fn(destruct_cb_.ctx);
// Pure virtual: every concrete stream resource must implement it.
169 virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
// Virtual but not pure; the definition is not part of this listing. Takes a
// pointer to the buffer-array pointer and a pointer to the buffer count.
// NOTE(review): presumably both are updated to describe what is left
// unwritten -- confirm against the definition.
170 virtual int DoTryWrite(uv_buf_t** bufs,
size_t* count);
171 virtual int DoWrite(WriteWrap* w,
174 uv_stream_t* send_handle) = 0;
// Virtual, not pure; definitions are not part of this listing.
// NOTE(review): Error() presumably yields a message for the last failure (or
// null) and ClearError() resets it -- confirm against the definitions.
175 virtual const char* Error()
const;
176 virtual void ClearError();
179 inline void OnAfterWrite(WriteWrap* w) {
180 if (!after_write_cb_.is_empty())
181 after_write_cb_.fn(w, after_write_cb_.ctx);
184 inline void OnAlloc(
size_t size, uv_buf_t* buf) {
185 if (!alloc_cb_.is_empty())
186 alloc_cb_.fn(size, buf, alloc_cb_.ctx);
189 inline void OnRead(ssize_t nread,
191 uv_handle_type pending = UV_UNKNOWN_HANDLE) {
193 bytes_read_ +=
static_cast<uint64_t
>(nread);
194 if (!read_cb_.is_empty())
195 read_cb_.fn(nread, buf, pending, read_cb_.ctx);
198 inline void set_after_write_cb(Callback<AfterWriteCb> c) {
// Setters: each overwrites the corresponding stored callback (function plus
// context) with the by-value argument.
202 inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
203 inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
204 inline void set_destruct_cb(Callback<DestructCb> c) { destruct_cb_ = c; }
// Getters: each returns a copy of the corresponding stored callback, so
// changes made to the returned value do not affect the stream.
206 inline Callback<AfterWriteCb> after_write_cb() {
return after_write_cb_; }
207 inline Callback<AllocCb> alloc_cb() {
return alloc_cb_; }
208 inline Callback<ReadCb> read_cb() {
return read_cb_; }
209 inline Callback<DestructCb> destruct_cb() {
return destruct_cb_; }
// Stored callbacks; the On*() helpers and the destructor call them only when
// they are non-empty.
212 Callback<AfterWriteCb> after_write_cb_;
213 Callback<AllocCb> alloc_cb_;
214 Callback<ReadCb> read_cb_;
215 Callback<DestructCb> destruct_cb_;
// Running total accumulated in OnRead(); starts at 0 (see the constructor).
216 uint64_t bytes_read_;
// Grants StreamBase access to the non-public members of this class.
218 friend class StreamBase;
221 class StreamBase :
public StreamResource {
225 kFlagHasWritev = 0x1,
226 kFlagNoShutdown = 0x2
// Static template helper; only the declaration is visible here. `flags`
// defaults to kFlagNone.
// NOTE(review): presumably installs the stream methods on `target` for the
// wrapper type Base, with `flags` a bitmask of kFlagHasWritev /
// kFlagNoShutdown -- confirm against the definition.
229 template <
class Base>
230 static inline void AddMethods(Environment* env,
231 v8::Local<v8::FunctionTemplate> target,
232 int flags = kFlagNone);
// Pure virtual: returns an untyped pointer that the Cast<Outer>() template
// static_casts to the requested type.
234 virtual void* Cast() = 0;
// Pure-virtual state queries that every concrete stream must implement.
235 virtual bool IsAlive() = 0;
236 virtual bool IsClosing() = 0;
// Virtual, not pure; the definition is not part of this listing.
237 virtual bool IsIPCPipe();
// Pure-virtual, argument-less read controls returning an int status.
240 virtual int ReadStart() = 0;
241 virtual int ReadStop() = 0;
243 inline void Consume() {
244 CHECK_EQ(consumed_,
false);
248 inline void Unconsume() {
249 CHECK_EQ(consumed_,
true);
// Typed convenience wrapper: static_casts the void* returned by the virtual
// Cast() to Outer*. The cast is unchecked, so the caller must name the
// correct type.
253 template <
class Outer>
254 inline Outer* Cast() {
return static_cast<Outer*
>(Cast()); }
// Declaration only; the definition is not part of this listing.
// NOTE(review): presumably delivers a read result (`nread`, `buf`, and an
// optional pending `handle`) to the JS side -- confirm against the definition.
256 void EmitData(ssize_t nread,
257 v8::Local<v8::Object> buf,
258 v8::Local<v8::Object> handle);
261 explicit StreamBase(Environment* env) :
env_(env), consumed_(false) {
// Virtual destructor with the compiler-generated body.
264 virtual ~StreamBase() =
default;
// Pure virtual: concrete streams supply their AsyncWrap.
267 virtual AsyncWrap* GetAsyncWrap() = 0;
// Virtual, not pure; the definition is not part of this listing.
268 virtual v8::Local<v8::Object> GetObject();
// Static completion handlers taking the request wrapper and an int status.
// Their signatures match StreamReq's DoneCb for ShutdownWrap and WriteWrap
// respectively; definitions are not part of this listing.
271 static void AfterShutdown(ShutdownWrap* req,
int status);
272 static void AfterWrite(WriteWrap* req,
int status);
// Overloads/methods that take the V8 call info and return an int. Their
// signature matches the `Method` template parameter of JSMethod, so they can
// be passed to it. Definitions are not part of this listing.
275 int ReadStart(
const v8::FunctionCallbackInfo<v8::Value>& args);
276 int ReadStop(
const v8::FunctionCallbackInfo<v8::Value>& args);
277 int Shutdown(
const v8::FunctionCallbackInfo<v8::Value>& args);
278 int Writev(
const v8::FunctionCallbackInfo<v8::Value>& args);
279 int WriteBuffer(
const v8::FunctionCallbackInfo<v8::Value>& args);
// Same shape, parameterised at compile time on the string encoding.
280 template <enum encoding enc>
281 int WriteString(
const v8::FunctionCallbackInfo<v8::Value>& args);
// Static property-getter callbacks, templated on the wrapper type Base.
// Declarations only; definitions are not part of this listing.
283 template <
class Base>
284 static void GetFD(v8::Local<v8::String> key,
285 const v8::PropertyCallbackInfo<v8::Value>& args);
287 template <
class Base>
288 static void GetExternal(v8::Local<v8::String> key,
289 const v8::PropertyCallbackInfo<v8::Value>& args);
291 template <
class Base>
292 static void GetBytesRead(v8::Local<v8::String> key,
293 const v8::PropertyCallbackInfo<v8::Value>& args);
// Static function-callback trampoline parameterised on the wrapper type and
// on a pointer to a StreamBase member function with the
// int(const FunctionCallbackInfo&) signature.
// NOTE(review): presumably unwraps the Base instance from `args` and forwards
// to Method -- confirm against the definition.
295 template <
class Base,
296 int (StreamBase::*Method)(
297 const v8::FunctionCallbackInfo<v8::Value>& args)>
298 static void JSMethod(
const v8::FunctionCallbackInfo<v8::Value>& args);
307 #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS 309 #endif // SRC_STREAM_BASE_H_
MaybeLocal< Object > New(Isolate *isolate, Local< String > string, enum encoding enc)