#include <stdlib.h>
#include <string.h>

#include "ant.h"
#include "ptr.h"
#include "internal.h"
#include "silver/engine.h"
#include "esm/loader.h"
#include "gc/roots.h"

#include "modules/assert.h"
#include "modules/buffer.h"
#include "modules/events.h"
#include "modules/stream.h"
#include "modules/symbol.h"
#include "modules/string_decoder.h"

enum { STREAM_NATIVE_TAG = 0x5354524Du }; // STRM

static ant_value_t g_stream_proto = 0;
static ant_value_t g_stream_ctor  = 0;

static ant_value_t g_readable_proto = 0;
static ant_value_t g_readable_ctor  = 0;

static ant_value_t g_writable_proto = 0;
static ant_value_t g_writable_ctor  = 0;

static ant_value_t g_duplex_proto = 0;
static ant_value_t g_duplex_ctor  = 0;

static ant_value_t g_transform_proto = 0;
static ant_value_t g_transform_ctor  = 0;

static ant_value_t g_passthrough_proto = 0;
static ant_value_t g_passthrough_ctor  = 0;

static double g_default_high_water_mark = 16384.0;
static double g_default_object_high_water_mark = 16.0;

static ant_value_t stream_noop(ant_t *js, ant_value_t *args, int nargs) {
  return js_mkundef();
}

static bool stream_is_instance(ant_value_t value) {
  return is_object_type(value) && js_check_native_tag(value, STREAM_NATIVE_TAG);
}

static inline void stream_set_end_callback(ant_t *js, ant_value_t stream_obj, ant_value_t callback) {
  js_set_slot_wb(js, stream_obj, SLOT_AUX, callback);
}

static stream_private_state_t *stream_private_state(ant_value_t stream_obj) {
  if (!stream_is_instance(stream_obj)) return NULL;
  return (stream_private_state_t *)js_get_native_ptr(stream_obj);
}

static ant_value_t stream_require_this(ant_t *js, ant_value_t value, const char *label) {
  if (!stream_is_instance(value))
    return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid %s", label);
  return value;
}

static ant_value_t stream_truthy_or_object(ant_t *js, ant_value_t value) {
  return js_truthy(js, value) ? value : js_mkobj(js);
}

static ant_value_t stream_readable_state(ant_t *js, ant_value_t stream_obj) {
  return js_get(js, stream_obj, "_readableState");
}

static ant_value_t stream_writable_state(ant_t *js, ant_value_t stream_obj) {
  return js_get(js, stream_obj, "_writableState");
}

static ant_value_t stream_pipes(ant_t *js, ant_value_t stream_obj) {
  return js_get(js, stream_obj, "_pipes");
}

static bool stream_key_is_cstr(ant_t *js, ant_value_t value, const char *expected) {
  size_t len = 0;
  const char *s = NULL;
  if (vtype(value) != T_STR) return false;
  s = js_getstr(js, value, &len);
  return s && len == strlen(expected) && memcmp(s, expected, len) == 0;
}

static ant_value_t stream_event_key(ant_t *js, ant_value_t value) {
  uint8_t t = vtype(value);
  if (t == T_STR || t == T_SYMBOL) return value;
  return js_mkerr(js, "event must be a string or Symbol");
}

void *stream_get_attached_state(ant_value_t stream_obj) {
  stream_private_state_t *priv = stream_private_state(stream_obj);
  return priv ? priv->attached_state : NULL;
}

void stream_set_attached_state(
  ant_value_t stream_obj,
  void *state,
  stream_finalize_fn finalize
) {
  stream_private_state_t *priv = stream_private_state(stream_obj);
  if (!priv) return;
  priv->attached_state = state;
  priv->attached_state_finalize = finalize;
}

void stream_clear_attached_state(ant_value_t stream_obj) {
  stream_private_state_t *priv = stream_private_state(stream_obj);
  if (!priv) return;
  priv->attached_state = NULL;
  priv->attached_state_finalize = NULL;
}

static void stream_finalize(ant_t *js, ant_object_t *obj) {
  ant_value_t stream_obj = js_obj_from_ptr(obj);
  stream_private_state_t *priv = stream_private_state(stream_obj);
  if (!priv) return;
  js_set_native_ptr(stream_obj, NULL);
  if (priv->attached_state && priv->attached_state_finalize)
    priv->attached_state_finalize(js, stream_obj, priv->attached_state);
  free(priv);
}

static ant_value_t stream_call(
  ant_t *js,
  ant_value_t fn,
  ant_value_t this_val,
  ant_value_t *args,
  int nargs,
  bool is_ctor
) {
  if (!is_callable(fn)) return js_mkundef();
  if (sv_check_c_stack_overflow(js))
    return js_mkerr_typed(js, JS_ERR_RANGE | JS_ERR_NO_STACK, "Maximum call stack size exceeded");

  sv_call_mode_t mode = is_ctor ? SV_CALL_MODE_CONSTRUCT : SV_CALL_MODE_NORMAL;
  sv_call_plan_t plan;
  ant_value_t err = sv_prepare_call(js->vm, js, fn, this_val, args, nargs, NULL, mode, &plan);
  if (is_err(err)) return err;

  return sv_execute_call_plan(js->vm, js, &plan, NULL);
}

static ant_value_t stream_call_prop(
  ant_t *js,
  ant_value_t target,
  const char *name,
  ant_value_t *args,
  int nargs
) {
  ant_value_t fn = js_getprop_fallback(js, target, name);
  if (is_err(fn) || !is_callable(fn)) return js_mkundef();
  return stream_call(js, fn, target, args, nargs, false);
}

static void stream_call_callback(ant_t *js, ant_value_t fn, ant_value_t *args, int nargs) {
  if (!is_callable(fn)) return;
  stream_call(js, fn, js_mkundef(), args, nargs, false);
}

static void stream_schedule_microtask(ant_t *js, ant_cfunc_t fn, ant_value_t data) {
  ant_value_t promise = js_mkpromise(js);
  ant_value_t cb = js_heavy_mkfun(js, fn, data);
  ant_value_t then_result = 0;

  js_resolve_promise(js, promise, js_mkundef());
  then_result = js_promise_then(js, promise, cb, js_mkundef());
  promise_mark_handled(then_result);
}

static ant_value_t stream_buffer_ctor(ant_t *js) {
  ant_value_t ns = js_esm_import_sync_cstr(js, "buffer", 6);
  if (is_err(ns)) return ns;
  return js_get(js, ns, "Buffer");
}

static ant_value_t stream_readable_decoder(ant_t *js, ant_value_t stream_obj) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  if (!is_object_type(state)) return js_mkundef();
  return js_get(js, state, "decoder");
}

static ant_value_t stream_readable_decode_chunk(
  ant_t *js, ant_value_t stream_obj,
  ant_value_t chunk, bool flush
) {
  ant_value_t decoder = stream_readable_decoder(js, stream_obj);
  if (!is_object_type(decoder)) return chunk;
  return string_decoder_decode_value(js, decoder, chunk, flush);
}

static bool stream_value_is_empty_string(ant_t *js, ant_value_t value) {
  size_t len = 0;
  if (vtype(value) != T_STR) return false;
  (void)js_getstr(js, value, &len);
  return len == 0;
}

static ant_value_t stream_make_buffer(ant_t *js, ant_value_t value, ant_value_t encoding) {
  ant_value_t buffer_ctor = stream_buffer_ctor(js);
  ant_value_t from_fn = 0;
  ant_value_t args[2];

  if (is_err(buffer_ctor)) return buffer_ctor;
  from_fn = js_get(js, buffer_ctor, "from");
  if (is_err(from_fn) || !is_callable(from_fn))
    return js_mkerr(js, "Buffer.from is not available");
    
  args[0] = value;
  args[1] = encoding;
  return stream_call(js, from_fn, buffer_ctor, args, 2, false);
}

static ant_value_t stream_normalize_chunk(
  ant_t *js,
  ant_value_t chunk,
  bool object_mode,
  ant_value_t encoding
) {
  ant_value_t str_val = 0;

  if (
    object_mode || is_null(chunk) || is_undefined(chunk) ||
    vtype(chunk) == T_TYPEDARRAY || buffer_is_binary_source(chunk)
  ) return chunk;

  if (vtype(chunk) == T_STR) return stream_make_buffer(js, chunk, encoding);

  str_val = js_tostring_val(js, chunk);
  if (is_err(str_val)) return str_val;
  
  return stream_make_buffer(js, str_val, encoding);
}

static ant_value_t stream_readable_buffer(ant_t *js, ant_value_t stream_obj) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  if (!is_object_type(state)) return js_mkundef();
  return js_get(js, state, "buffer");
}

static ant_offset_t stream_readable_buffer_head(ant_t *js, ant_value_t stream_obj) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  ant_value_t head = is_object_type(state) ? js_get(js, state, "bufferHead") : js_mkundef();
  return vtype(head) == T_NUM ? (ant_offset_t)js_getnum(head) : 0;
}

static void stream_set_readable_buffer_head(ant_t *js, ant_value_t stream_obj, ant_offset_t head) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  if (is_object_type(state)) js_set(js, state, "bufferHead", js_mknum((double)head));
}

static ant_offset_t stream_readable_buffer_len(ant_t *js, ant_value_t stream_obj) {
  ant_value_t buffer = stream_readable_buffer(js, stream_obj);
  ant_offset_t head = stream_readable_buffer_head(js, stream_obj);
  ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0;
  return len > head ? len - head : 0;
}

static void stream_compact_readable_buffer(ant_t *js, ant_value_t stream_obj) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  ant_value_t buffer = stream_readable_buffer(js, stream_obj);
  ant_offset_t head = stream_readable_buffer_head(js, stream_obj);
  ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0;
  ant_value_t compact = 0;

  if (!is_object_type(state) || vtype(buffer) != T_ARR) return;
  if (head == 0) return;
  
  if (head >= len) {
    compact = js_mkarr(js);
    js_set(js, state, "buffer", compact);
    js_set(js, state, "bufferHead", js_mknum(0));
    return;
  }

  if (head <= 32 && head * 2 < len) return;
  compact = js_mkarr(js);
  for (ant_offset_t i = head; i < len; i++) js_arr_push(js, compact, js_arr_get(js, buffer, i));
  
  js_set(js, state, "buffer", compact);
  js_set(js, state, "bufferHead", js_mknum(0));
}

static void stream_buffer_push(ant_t *js, ant_value_t stream_obj, ant_value_t value) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  ant_value_t buffer = stream_readable_buffer(js, stream_obj);

  if (!is_object_type(state) || vtype(buffer) != T_ARR) return;
  js_arr_push(js, buffer, value);
}

static ant_value_t stream_buffer_shift(ant_t *js, ant_value_t stream_obj) {
  ant_value_t buffer = stream_readable_buffer(js, stream_obj);
  ant_offset_t head = stream_readable_buffer_head(js, stream_obj);
  ant_offset_t len = vtype(buffer) == T_ARR ? js_arr_len(js, buffer) : 0;
  ant_value_t value = js_mkundef();

  if (vtype(buffer) != T_ARR || head >= len) return js_mkundef();
  value = js_arr_get(js, buffer, head);
  stream_set_readable_buffer_head(js, stream_obj, head + 1);
  stream_compact_readable_buffer(js, stream_obj);
  return value;
}

static bool stream_listener_count_positive(ant_t *js, ant_value_t target, const char *event_name) {
  ant_value_t args[1];
  ant_value_t result = 0;

  args[0] = js_mkstr(js, event_name, strlen(event_name));
  result = stream_call_prop(js, target, "listenerCount", args, 1);
  return vtype(result) == T_NUM && js_getnum(result) > 0;
}

static void stream_remove_listener(
  ant_t *js,
  ant_value_t target,
  const char *event_name,
  ant_value_t listener
) {
  ant_value_t args[2];
  args[0] = js_mkstr(js, event_name, strlen(event_name));
  args[1] = listener;
  stream_call_prop(js, target, "removeListener", args, 2);
}

static ant_value_t stream_get_option(ant_t *js, ant_value_t options, const char *name) {
  if (!is_object_type(options)) return js_mkundef();
  return js_get(js, options, name);
}

static double stream_default_high_water_mark(bool object_mode) {
  return object_mode ? g_default_object_high_water_mark : g_default_high_water_mark;
}

static double stream_high_water_mark_from_options(ant_t *js, ant_value_t options, bool object_mode) {
  ant_value_t hwm = stream_get_option(js, options, "highWaterMark");
  return (vtype(hwm) == T_NUM && js_getnum(hwm) > 0)
    ? js_getnum(hwm)
    : stream_default_high_water_mark(object_mode);
}

static ant_value_t stream_make_base_object(ant_t *js, ant_value_t proto) {
  ant_value_t obj = js_mkobj(js);
  stream_private_state_t *priv = calloc(1, sizeof(*priv));
  
  if (is_object_type(proto)) js_set_proto_init(obj, proto);
  js_set_native_tag(obj, STREAM_NATIVE_TAG);
  
  if (priv) js_set_native_ptr(obj, priv);
  js_set_slot(obj, SLOT_AUX, js_mkundef());
  js_set_finalizer(obj, stream_finalize);
  
  return obj;
}

static void stream_init_base(ant_t *js, ant_value_t obj, ant_value_t raw_options) {
  ant_value_t pipes = js_mkarr(js);
  js_set(js, obj, "readable", js_true);
  js_set(js, obj, "writable", js_true);
  js_set(js, obj, "destroyed", js_false);
  js_set(js, obj, "_paused", js_false);
  js_set(js, obj, "_pipes", pipes);
  js_set(js, obj, "_streamOptions", stream_truthy_or_object(js, raw_options));
}

static void stream_init_readable(ant_t *js, ant_value_t obj, ant_value_t raw_options) {
  ant_value_t options = is_object_type(raw_options) ? raw_options : js_mkobj(js);
  ant_value_t state = js_mkobj(js);
  ant_value_t read_fn = stream_get_option(js, options, "read");

  bool object_mode = js_truthy(js, stream_get_option(js, options, "objectMode"));
  double high_water_mark = stream_high_water_mark_from_options(js, options, object_mode);

  stream_init_base(js, obj, raw_options);
  js_set(js, obj, "readable", js_true);
  js_set(js, obj, "writable", js_false);
  js_set(js, obj, "readableEnded", js_false);

  js_set(js, state, "objectMode", js_bool(object_mode));
  js_set(js, state, "ended", js_false);
  js_set(js, state, "endEmitted", js_false);
  js_set(js, state, "flowing", js_false);
  js_set(js, state, "flowingReadScheduled", js_false);
  js_set(js, state, "reading", js_false);
  js_set(js, state, "highWaterMark", js_mknum(high_water_mark));
  js_set(js, state, "buffer", js_mkarr(js));
  js_set(js, state, "bufferHead", js_mknum(0));
  js_set(js, obj, "_readableState", state);

  if (is_callable(read_fn)) js_set(js, obj, "_read", read_fn);
}

static void stream_init_writable(ant_t *js, ant_value_t obj, ant_value_t raw_options) {
  ant_value_t options = is_object_type(raw_options) ? raw_options : js_mkobj(js);
  ant_value_t state = js_mkobj(js);
  bool object_mode = js_truthy(js, stream_get_option(js, options, "objectMode"))
    || js_truthy(js, stream_get_option(js, options, "writableObjectMode"));
  ant_value_t write_fn = stream_get_option(js, options, "write");
  ant_value_t final_fn = stream_get_option(js, options, "final");
  ant_value_t destroy_fn = stream_get_option(js, options, "destroy");

  stream_init_base(js, obj, raw_options);
  js_set(js, obj, "readable", js_false);
  js_set(js, obj, "writable", js_true);
  js_set(js, obj, "writableEnded", js_false);
  js_set(js, obj, "writableFinished", js_false);

  js_set(js, state, "objectMode", js_bool(object_mode));
  js_set(js, state, "finished", js_false);
  js_set(js, state, "ended", js_false);
  js_set(js, obj, "_writableState", state);

  if (is_callable(write_fn)) js_set(js, obj, "_write", write_fn);
  if (is_callable(final_fn)) js_set(js, obj, "_final", final_fn);
  if (is_callable(destroy_fn)) js_set(js, obj, "_destroy", destroy_fn);
}

static ant_value_t stream_construct(
  ant_t *js,
  ant_value_t base_proto,
  ant_value_t raw_options,
  void (*init_fn)(ant_t *, ant_value_t, ant_value_t)
) {
  ant_value_t proto = js_instance_proto_from_new_target(js, base_proto);
  ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : base_proto);
  init_fn(js, obj, raw_options);
  return obj;
}

static ant_value_t stream_emit_named(ant_t *js, ant_value_t stream_obj, const char *event_name) {
  return js_bool(eventemitter_emit_args(js, stream_obj, event_name, NULL, 0));
}

static void stream_emit_error(ant_t *js, ant_value_t stream_obj, ant_value_t error) {
  ant_value_t args[1];
  args[0] = error;
  eventemitter_emit_args(js, stream_obj, "error", args, 1);
}

static void stream_readable_schedule_continue_flowing(ant_t *js, ant_value_t stream_obj) {
  ant_value_t state = stream_readable_state(js, stream_obj);

  if (!is_object_type(state)) return;
  if (!js_truthy(js, js_get(js, state, "flowing"))) return;
  if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return;
  if (js_truthy(js, js_get(js, state, "ended"))) return;
  if (stream_readable_buffer_len(js, stream_obj) > 0) return;
  if (js_truthy(js, js_get(js, state, "flowingReadScheduled"))) return;

  js_set(js, state, "flowingReadScheduled", js_true);
  stream_schedule_microtask(js, stream_readable_continue_flowing, stream_obj);
}

static ant_value_t js_stream_pause(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream");
  if (is_err(stream_obj)) return stream_obj;

  js_set(js, stream_obj, "_paused", js_true);
  stream_emit_named(js, stream_obj, "pause");
  return stream_obj;
}

static ant_value_t js_stream_resume(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream");
  if (is_err(stream_obj)) return stream_obj;

  js_set(js, stream_obj, "_paused", js_false);
  stream_emit_named(js, stream_obj, "resume");
  return stream_obj;
}

static ant_value_t js_stream_is_paused(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream");
  ant_value_t paused = 0;
  if (is_err(stream_obj)) return stream_obj;
  paused = js_get(js, stream_obj, "_paused");
  return js_bool(js_truthy(js, paused));
}

static void stream_pipe_remove_state(ant_t *js, ant_value_t source, ant_value_t state_obj) {
  ant_value_t pipes = stream_pipes(js, source);
  ant_offset_t len = vtype(pipes) == T_ARR ? js_arr_len(js, pipes) : 0;
  ant_value_t next = js_mkarr(js);

  for (ant_offset_t i = 0; i < len; i++) {
    ant_value_t item = js_arr_get(js, pipes, i);
    if (item != state_obj) js_arr_push(js, next, item);
  }

  js_set(js, source, "_pipes", next);
}

static void stream_pipe_cleanup(ant_t *js, ant_value_t state_obj) {
  ant_value_t cleaned = js_get(js, state_obj, "cleaned");
  ant_value_t source = js_get(js, state_obj, "source");
  ant_value_t dest = js_get(js, state_obj, "dest");
  ant_value_t on_data = js_get(js, state_obj, "onData");
  ant_value_t on_drain = js_get(js, state_obj, "onDrain");
  ant_value_t on_end = js_get(js, state_obj, "onEnd");
  ant_value_t on_close = js_get(js, state_obj, "onClose");
  ant_value_t on_error = js_get(js, state_obj, "onError");

  if (js_truthy(js, cleaned)) return;
  js_set(js, state_obj, "cleaned", js_true);

  if (stream_is_instance(source)) {
    stream_remove_listener(js, source, "data", on_data);
    stream_remove_listener(js, source, "end", on_end);
    stream_remove_listener(js, source, "close", on_close);
    stream_remove_listener(js, source, "error", on_error);
    stream_pipe_remove_state(js, source, state_obj);
  }

  if (is_object_type(dest))
    stream_remove_listener(js, dest, "drain", on_drain);
}

static ant_value_t stream_pipe_on_data(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t source = js_get(js, state_obj, "source");
  ant_value_t dest = js_get(js, state_obj, "dest");
  ant_value_t result = js_mkundef();

  if (!is_object_type(dest)) return js_mkundef();
  result = stream_call_prop(js, dest, "write", nargs > 0 ? &args[0] : NULL, nargs > 0 ? 1 : 0);
  if (is_err(result)) return result;

  if (result == js_false) stream_call_prop(js, source, "pause", NULL, 0);
  return js_mkundef();
}

static ant_value_t stream_pipe_on_drain(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t source = js_get(js, state_obj, "source");
  stream_call_prop(js, source, "resume", NULL, 0);
  return js_mkundef();
}

static ant_value_t stream_pipe_on_end(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t dest = js_get(js, state_obj, "dest");
  bool end_dest = js_truthy(js, js_get(js, state_obj, "end"));
  stream_pipe_cleanup(js, state_obj);
  if (end_dest) stream_call_prop(js, dest, "end", NULL, 0);
  return js_mkundef();
}

static ant_value_t stream_pipe_on_close(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  stream_pipe_cleanup(js, state_obj);
  return js_mkundef();
}

static ant_value_t stream_pipe_on_error(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t dest = js_get(js, state_obj, "dest");
  stream_pipe_cleanup(js, state_obj);
  if (is_object_type(dest) && stream_listener_count_positive(js, dest, "error") && nargs > 0)
    eventemitter_emit_args(js, dest, "error", &args[0], 1);
  return js_mkundef();
}

static ant_value_t js_stream_pipe(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t source = stream_require_this(js, js_getthis(js), "stream");
  ant_value_t options = nargs > 1 ? args[1] : js_mkundef();
  ant_value_t state_obj = 0;
  ant_value_t readable_state = 0;
  bool end_dest = true;

  if (is_err(source)) return source;
  if (nargs < 1 || !is_object_type(args[0])) return js_mkerr(js, "pipe requires a destination stream");
  if (is_object_type(options)) {
    ant_value_t end_val = js_get(js, options, "end");
    if (!is_undefined(end_val)) end_dest = end_val != js_false;
  }

  state_obj = js_mkobj(js);
  js_set(js, state_obj, "source", source);
  js_set(js, state_obj, "dest", args[0]);
  js_set(js, state_obj, "end", js_bool(end_dest));
  js_set(js, state_obj, "cleaned", js_false);
  js_set(js, state_obj, "onData", js_heavy_mkfun(js, stream_pipe_on_data, state_obj));
  js_set(js, state_obj, "onDrain", js_heavy_mkfun(js, stream_pipe_on_drain, state_obj));
  js_set(js, state_obj, "onEnd", js_heavy_mkfun(js, stream_pipe_on_end, state_obj));
  js_set(js, state_obj, "onClose", js_heavy_mkfun(js, stream_pipe_on_close, state_obj));
  js_set(js, state_obj, "onError", js_heavy_mkfun(js, stream_pipe_on_error, state_obj));

  js_arr_push(js, stream_pipes(js, source), state_obj);
  eventemitter_add_listener(js, source, "data", js_get(js, state_obj, "onData"), false);
  eventemitter_add_listener(js, source, "end", js_get(js, state_obj, "onEnd"), true);
  eventemitter_add_listener(js, source, "close", js_get(js, state_obj, "onClose"), true);
  eventemitter_add_listener(js, source, "error", js_get(js, state_obj, "onError"), false);
  eventemitter_add_listener(js, args[0], "drain", js_get(js, state_obj, "onDrain"), false);
  eventemitter_emit_args(js, args[0], "pipe", &source, 1);
  readable_state = stream_readable_state(js, source);
  if (is_object_type(readable_state)) js_set(js, readable_state, "flowing", js_true);
  stream_call_prop(js, source, "resume", NULL, 0);

  return args[0];
}

static ant_value_t js_stream_unpipe(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t source = stream_require_this(js, js_getthis(js), "stream");
  ant_value_t pipes = 0;
  ant_value_t matches = 0;
  ant_offset_t len = 0;
  ant_value_t dest = nargs > 0 ? args[0] : js_mkundef();

  if (is_err(source)) return source;
  pipes = stream_pipes(js, source);
  if (vtype(pipes) != T_ARR) return source;

  matches = js_mkarr(js);
  len = js_arr_len(js, pipes);
  for (ant_offset_t i = 0; i < len; i++) {
    ant_value_t state_obj = js_arr_get(js, pipes, i);
    ant_value_t entry_dest = js_get(js, state_obj, "dest");
    if (!is_object_type(dest) || entry_dest == dest) js_arr_push(js, matches, state_obj);
  }

  len = js_arr_len(js, matches);
  for (ant_offset_t i = 0; i < len; i++) stream_pipe_cleanup(js, js_arr_get(js, matches, i));
  return source;
}

static ant_value_t stream_destroy_done(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  ant_value_t destroyed_err = (nargs > 0) ? args[0] : js_mkundef();
  if (!is_null(destroyed_err) && !is_undefined(destroyed_err)) stream_emit_error(js, stream_obj, destroyed_err);
  stream_emit_named(js, stream_obj, "close");
  return js_mkundef();
}

static ant_value_t stream_once_call(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t fn = js_get(js, state_obj, "fn");
  ant_value_t this_val = js_get(js, state_obj, "thisVal");
  ant_value_t called = js_get(js, state_obj, "called");

  if (js_truthy(js, called)) return js_mkundef();
  js_set(js, state_obj, "called", js_true);
  return stream_call(js, fn, this_val, args, nargs, false);
}

static ant_value_t stream_make_once(ant_t *js, ant_value_t fn, ant_value_t this_val) {
  ant_value_t state_obj = js_mkobj(js);
  js_set(js, state_obj, "fn", fn);
  js_set(js, state_obj, "thisVal", this_val);
  js_set(js, state_obj, "called", js_false);
  return js_heavy_mkfun(js, stream_once_call, state_obj);
}

static ant_value_t js_stream_destroy(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream");
  ant_value_t destroy_fn = 0;
  ant_value_t done_state = 0;
  ant_value_t done = 0;
  ant_value_t destroy_args[2];
  ant_value_t error = nargs > 0 ? args[0] : js_mkundef();
  ant_value_t result = 0;

  if (is_err(stream_obj)) return stream_obj;
  if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return stream_obj;

  js_set(js, stream_obj, "destroyed", js_true);

  done_state = js_mkobj(js);
  js_set(js, done_state, "stream", stream_obj);
  done = stream_make_once(js, js_heavy_mkfun(js, stream_destroy_done, done_state), js_mkundef());
  destroy_fn = js_getprop_fallback(js, stream_obj, "_destroy");

  if (is_callable(destroy_fn)) {
    destroy_args[0] = is_undefined(error) ? js_mknull() : error;
    destroy_args[1] = done;
    result = stream_call(js, destroy_fn, stream_obj, destroy_args, 2, false);
    return is_err(result) ? result : stream_obj;
  }

  destroy_args[0] = is_undefined(error) ? js_mknull() : error;
  stream_call_callback(js, done, destroy_args, 1);
  return stream_obj;
}

static ant_value_t js_readable__read(ant_t *js, ant_value_t *args, int nargs) {
  return js_mkundef();
}

static ant_value_t stream_readable_start_flowing(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  return stream_readable_begin_flowing(js, stream_obj);
}

ant_value_t stream_readable_continue_flowing(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t state = stream_readable_state(js, stream_obj);

  if (!is_object_type(state)) return js_mkundef();
  js_set(js, state, "flowingReadScheduled", js_false);

  if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef();
  if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef();
  if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef();

  stream_readable_maybe_read(js, stream_obj);
  stream_readable_flush(js, stream_obj);
  
  return js_mkundef();
}

ant_value_t stream_readable_begin_flowing(ant_t *js, ant_value_t stream_obj) {
  ant_value_t state = stream_readable_state(js, stream_obj);

  if (!is_object_type(state)) return js_mkundef();
  if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef();

  {
    ant_value_t saved_this = js->this_val;
    js->this_val = stream_obj;
    js_stream_resume(js, NULL, 0);
    js->this_val = saved_this;
  }

  stream_readable_maybe_read(js, stream_obj);
  stream_readable_flush(js, stream_obj);
  
  return js_mkundef();
}

ant_value_t stream_readable_flush(ant_t *js, ant_value_t stream_obj) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  bool emitted_data = false;

  if (!is_object_type(state)) return js_mkundef();

  while (js_truthy(js, js_get(js, state, "flowing")) && stream_readable_buffer_len(js, stream_obj) > 0) {
    ant_value_t chunk = stream_buffer_shift(js, stream_obj);
    chunk = stream_readable_decode_chunk(js, stream_obj, chunk, false);
    if (is_err(chunk)) return chunk;
    emitted_data = true;
    eventemitter_emit_args(js, stream_obj, "data", &chunk, 1);
  }

  if (
    js_truthy(js, js_get(js, state, "ended")) &&
    stream_readable_buffer_len(js, stream_obj) == 0 &&
    !js_truthy(js, js_get(js, state, "endEmitted"))
  ) {
    ant_value_t tail = stream_readable_decode_chunk(js, stream_obj, js_mkundef(), true);
    if (is_err(tail)) return tail;
    if (!is_undefined(tail) && !stream_value_is_empty_string(js, tail)) {
      emitted_data = true;
      eventemitter_emit_args(js, stream_obj, "data", &tail, 1);
    }
    js_set(js, state, "endEmitted", js_true);
    js_set(js, stream_obj, "readableEnded", js_true);
    stream_emit_named(js, stream_obj, "end");
    stream_emit_named(js, stream_obj, "close");
  } else if (emitted_data) stream_readable_schedule_continue_flowing(js, stream_obj);

  return js_mkundef();
}

ant_value_t stream_readable_maybe_read(ant_t *js, ant_value_t stream_obj) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  ant_value_t read_fn = 0;
  ant_value_t args[1];

  if (!is_object_type(state)) return js_mkundef();
  if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef();
  if (js_truthy(js, js_get(js, state, "reading"))) return js_mkundef();
  if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef();
  if (stream_readable_buffer_len(js, stream_obj) > 0) return js_mkundef();

  read_fn = js_getprop_fallback(js, stream_obj, "_read");
  js_set(js, state, "reading", js_true);
  args[0] = js_get(js, state, "highWaterMark");
  
  if (is_callable(read_fn)) stream_call(js, read_fn, stream_obj, args, 1, false);
  js_set(js, state, "reading", js_false);
  
  return js_mkundef();
}

ant_value_t stream_readable_push_value(
  ant_t *js,
  ant_value_t stream_obj,
  ant_value_t chunk,
  ant_value_t encoding
) {
  ant_value_t state = stream_readable_state(js, stream_obj);
  ant_value_t normalized = 0;

  if (!is_object_type(state)) return js_false;
  if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_false;

  if (is_null(chunk)) {
    js_set(js, state, "ended", js_true);
    stream_readable_flush(js, stream_obj);
    return js_false;
  }

  normalized = stream_normalize_chunk(
    js, chunk,
    js_truthy(js, js_get(js, state, "objectMode")),
    is_undefined(encoding) ? js_mkstr(js, "utf8", 4) : encoding
  );
  if (is_err(normalized)) return normalized;

  stream_buffer_push(js, stream_obj, normalized);
  if (js_truthy(js, js_get(js, state, "flowing"))) stream_readable_flush(js, stream_obj);
  
  return js_bool(js_truthy(js, js_get(js, state, "flowing")));
}

static ant_value_t js_readable_push(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
  ant_value_t chunk = nargs > 0 ? args[0] : js_mkundef();
  ant_value_t encoding = nargs > 1 ? args[1] : js_mkundef();
  if (is_err(stream_obj)) return stream_obj;
  return stream_readable_push_value(js, stream_obj, chunk, encoding);
}

static ant_value_t js_readable_read(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
  ant_value_t state = 0;
  ant_value_t chunk = 0;

  if (is_err(stream_obj)) return stream_obj;
  state = stream_readable_state(js, stream_obj);
  if (!is_object_type(state)) return js_mknull();

  if (stream_readable_buffer_len(js, stream_obj) == 0) stream_readable_maybe_read(js, stream_obj);
  if (stream_readable_buffer_len(js, stream_obj) == 0) return js_mknull();

  chunk = stream_buffer_shift(js, stream_obj);
  chunk = stream_readable_decode_chunk(js, stream_obj, chunk, false);
  if (is_err(chunk)) return chunk;
  if (js_truthy(js, js_get(js, state, "flowing"))) stream_readable_flush(js, stream_obj);
  
  return chunk;
}

static ant_value_t js_readable_set_encoding(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
  ant_value_t state = 0; ant_value_t decoder = 0;

  ant_value_t encoding = nargs > 0 && !is_undefined(args[0]) ? args[0] : js_mkstr(js, "utf8", 4);
  ant_value_t encoding_str = 0;

  if (is_err(stream_obj)) return stream_obj;
  state = stream_readable_state(js, stream_obj);
  if (!is_object_type(state)) return stream_obj;

  decoder = string_decoder_create(js, encoding);
  if (is_err(decoder)) return decoder;
  encoding_str = js_tostring_val(js, encoding);
  if (is_err(encoding_str)) return encoding_str;

  js_set(js, state, "decoder", decoder);
  js_set(js, stream_obj, "encoding", encoding_str);
  js_set(js, stream_obj, "readableEncoding", encoding_str);
  
  return stream_obj;
}

static ant_value_t js_readable_on(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
  ant_value_t key = 0;
  ant_value_t state = 0;

  if (is_err(stream_obj)) return stream_obj;
  if (nargs < 2) return js_mkerr(js, "on requires 2 arguments (event, listener)");
  key = stream_event_key(js, args[0]);
  
  if (is_err(key)) return key;
  if (!eventemitter_add_listener_val(js, stream_obj, key, args[1], false))
    return js_mkerr(js, "listener must be a function");
    
  if (stream_key_is_cstr(js, key, "data")) {
    state = stream_readable_state(js, stream_obj);
    if (is_object_type(state)) js_set(js, state, "flowing", js_true);
    stream_schedule_microtask(js, stream_readable_start_flowing, stream_obj);
  }

  return stream_obj;
}

static ant_value_t js_readable_resume(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
  ant_value_t state = 0;
  if (is_err(stream_obj)) return stream_obj;

  state = stream_readable_state(js, stream_obj);
  if (is_object_type(state)) js_set(js, state, "flowing", js_true);
  
  js_stream_resume(js, NULL, 0);
  stream_readable_maybe_read(js, stream_obj);
  stream_readable_flush(js, stream_obj);
  
  return stream_obj;
}

static ant_value_t js_readable_pause(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Readable");
  ant_value_t state = 0;
  if (is_err(stream_obj)) return stream_obj;

  state = stream_readable_state(js, stream_obj);
  if (is_object_type(state)) js_set(js, state, "flowing", js_false);
  return js_stream_pause(js, args, nargs);
}

static ant_value_t js_writable__write(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t callback = nargs > 2 ? args[2] : js_mkundef();
  stream_call_callback(js, callback, NULL, 0);
  return js_mkundef();
}

static ant_value_t js_writable__final(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t callback = nargs > 0 ? args[0] : js_mkundef();
  stream_call_callback(js, callback, NULL, 0);
  return js_mkundef();
}

static ant_value_t stream_writable_write_done(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  ant_value_t callback = js_get(js, state_obj, "callback");
  
  stream_private_state_t *priv = stream_private_state(stream_obj);
  ant_value_t err = nargs > 0 ? args[0] : js_mkundef();

  if (priv) priv->writing = false;

  if (!is_undefined(err) && !is_null(err)) {
    ant_value_t destroy_args[1] = { err };
    js_set(js, state_obj, "done", js_true);
    if (priv) priv->pending_final = false; {
      ant_value_t saved_this = js->this_val;
      js->this_val = stream_obj;
      js_stream_destroy(js, destroy_args, 1);
      js->this_val = saved_this;
    }
    
    if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
    return js_mkundef();
  }

  if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0);
  stream_emit_named(js, stream_obj, "drain");
  
  if (priv && priv->pending_final && !priv->final_started) {
    ant_value_t end_callback = js_get_slot(stream_obj, SLOT_AUX);
    priv->pending_final = false;
    stream_set_end_callback(js, stream_obj, js_mkundef());
    return stream_writable_begin_end(js, stream_obj, end_callback);
  }
  
  return js_mkundef();
}

static ant_value_t stream_writable_write_impl(
  ant_t *js,
  ant_value_t stream_obj,
  ant_value_t chunk,
  ant_value_t encoding,
  ant_value_t callback,
  bool allow_after_end
) {
  ant_value_t state = 0;
  ant_value_t normalized = 0;
  ant_value_t write_fn = 0;
  ant_value_t done_state = 0;
  ant_value_t done = 0;
  ant_value_t write_args[3];

  state = stream_writable_state(js, stream_obj);
  if (!is_object_type(state)) return js_false;

  if (
    (!allow_after_end && js_truthy(js, js_get(js, stream_obj, "writableEnded"))) ||
    js_truthy(js, js_get(js, stream_obj, "destroyed"))
  ) {
    ant_value_t err = js_mkerr(js, "write after end");
    if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
    else stream_emit_error(js, stream_obj, err);
    return js_false;
  }

  normalized = stream_normalize_chunk(
    js, chunk,
    js_truthy(js, js_get(js, state, "objectMode")),
    encoding
  );
  
  if (is_err(normalized)) return normalized;
  done_state = js_mkobj(js);
  
  js_set(js, done_state, "stream", stream_obj);
  js_set(js, done_state, "callback", callback);
  js_set(js, done_state, "done", js_false);
  
  done = stream_make_once(js, js_heavy_mkfun(js, stream_writable_write_done, done_state), js_mkundef());
  write_fn = js_getprop_fallback(js, stream_obj, "_write");
  stream_private_state_t *priv = stream_private_state(stream_obj);
  if (priv) priv->writing = true;

  write_args[0] = normalized;
  write_args[1] = encoding;
  write_args[2] = done;
  
  if (is_callable(write_fn)) {
  ant_value_t result = stream_call(js, write_fn, stream_obj, write_args, 3, false);
  if (is_err(result)) {
    ant_value_t err_args[1] = { result };
    stream_call_callback(js, done, err_args, 1);
    return js_false;
  }}

  return js_bool(!js_truthy(js, js_get(js, stream_obj, "destroyed")));
}

static ant_value_t js_writable_write(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Writable");
  ant_value_t callback = js_mkundef();
  ant_value_t encoding = js_mkstr(js, "utf8", 4);

  if (is_err(stream_obj)) return stream_obj;

  if (nargs > 1 && is_callable(args[1])) callback = args[1];
  else if (nargs > 1 && !is_undefined(args[1])) encoding = args[1];
  if (nargs > 2 && is_callable(args[2])) callback = args[2];

  return stream_writable_write_impl(
    js, stream_obj,
    nargs > 0 ? args[0] : js_mkundef(),
    encoding, callback, false
  );
}

static ant_value_t stream_writable_end_done(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  ant_value_t callback = js_get(js, state_obj, "callback");
  ant_value_t err = nargs > 0 ? args[0] : js_mkundef();

  if (!is_undefined(err) && !is_null(err)) {
    ant_value_t destroy_args[1] = { err };
    ant_value_t saved_this = js->this_val;
    js->this_val = stream_obj;
    js_stream_destroy(js, destroy_args, 1);
    js->this_val = saved_this;
    if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
    return js_mkundef();
  }

  js_set(js, stream_obj, "writableFinished", js_true);
  js_set(js, stream_writable_state(js, stream_obj), "finished", js_true);
  stream_emit_named(js, stream_obj, "finish");
  
  if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0);
  if (!js_truthy(js, js_get(js, stream_obj, "readable"))) stream_emit_named(js, stream_obj, "close");
  
  return js_mkundef();
}

ant_value_t stream_writable_begin_end(ant_t *js, ant_value_t stream_obj, ant_value_t callback) {
  ant_value_t final_fn = 0;
  ant_value_t final_args[1];
  ant_value_t done_state = 0;
  ant_value_t done = 0;
  stream_private_state_t *priv = stream_private_state(stream_obj);

  done_state = js_mkobj(js);
  js_set(js, done_state, "stream", stream_obj);
  js_set(js, done_state, "callback", callback);
  
  done = stream_make_once(js, js_heavy_mkfun(js, stream_writable_end_done, done_state), js_mkundef());
  if (priv) {
    priv->final_started = true;
    priv->pending_final = false;
  }
  
  stream_set_end_callback(js, stream_obj, js_mkundef());
  final_fn = js_getprop_fallback(js, stream_obj, "_final");
  final_args[0] = done;
  
  if (is_callable(final_fn)) stream_call(js, final_fn, stream_obj, final_args, 1, false);
  else stream_call_callback(js, done, NULL, 0);

  return stream_obj;
}

static ant_value_t stream_writable_end_after_write(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  ant_value_t callback = js_get(js, state_obj, "callback");
  stream_private_state_t *priv = stream_private_state(stream_obj);
  ant_value_t err = nargs > 0 ? args[0] : js_mkundef();

  if (!is_undefined(err) && !is_null(err)) {
    ant_value_t destroy_args[1] = { err };
    ant_value_t saved_this = js->this_val;
    js->this_val = stream_obj;
    js_stream_destroy(js, destroy_args, 1);
    js->this_val = saved_this;
    if (is_callable(callback)) stream_call_callback(js, callback, &err, 1);
    return js_mkundef();
  }

  if (priv) priv->pending_final = false;
  stream_set_end_callback(js, stream_obj, js_mkundef());

  return stream_writable_begin_end(js, stream_obj, callback);
}

static ant_value_t js_writable_end(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Writable");
  ant_value_t callback = js_mkundef();
  ant_value_t chunk = js_mkundef();
  ant_value_t encoding = js_mkundef();
  ant_value_t after_write_state = 0;
  ant_value_t after_write = 0;
  stream_private_state_t *priv = NULL;

  if (is_err(stream_obj)) return stream_obj;
  if (nargs > 0 && is_callable(args[0])) callback = args[0];
  else {
    if (nargs > 0) chunk = args[0];
    if (nargs > 1 && is_callable(args[1])) callback = args[1];
    else if (nargs > 1) encoding = args[1];
    if (nargs > 2 && is_callable(args[2])) callback = args[2];
  }

  if (js_truthy(js, js_get(js, stream_obj, "writableEnded"))) {
    if (is_callable(callback)) stream_call_callback(js, callback, NULL, 0);
    return stream_obj;
  }

  js_set(js, stream_obj, "writableEnded", js_true);
  js_set(js, stream_writable_state(js, stream_obj), "ended", js_true);
  priv = stream_private_state(stream_obj);

  if (!is_undefined(chunk) && !is_null(chunk)) {
    after_write_state = js_mkobj(js);
    js_set(js, after_write_state, "stream", stream_obj);
    js_set(js, after_write_state, "callback", callback);
    
    after_write = stream_make_once(
      js, js_heavy_mkfun(js, stream_writable_end_after_write, after_write_state),
      js_mkundef()
    );
    
    stream_writable_write_impl(js, stream_obj, chunk, encoding, after_write, true);
    return stream_obj;
  }

  if (priv && priv->writing && !priv->final_started) {
    priv->pending_final = true;
    stream_set_end_callback(js, stream_obj, callback);
    return stream_obj;
  }

  return stream_writable_begin_end(js, stream_obj, callback);
}

static ant_value_t js_transform__transform(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t callback = nargs > 2 ? args[2] : js_mkundef();
  ant_value_t cb_args[2];
  cb_args[0] = js_mknull();
  cb_args[1] = nargs > 0 ? args[0] : js_mkundef();
  stream_call_callback(js, callback, cb_args, 2);
  return js_mkundef();
}

static ant_value_t stream_transform_write_callback(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  ant_value_t outer_callback = js_get(js, state_obj, "callback");

  if (nargs > 0 && !is_null(args[0]) && !is_undefined(args[0])) {
    if (is_callable(outer_callback)) stream_call(js, outer_callback, stream_obj, &args[0], 1, false);
    return js_mkundef();
  }

  if (nargs > 1 && !is_null(args[1]) && !is_undefined(args[1]))
    stream_readable_push_value(js, stream_obj, args[1], js_mkundef());

  if (is_callable(outer_callback)) stream_call(js, outer_callback, stream_obj, NULL, 0, false);
  return js_mkundef();
}

static ant_value_t js_transform__write(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Transform");
  ant_value_t transform_fn = 0;
  ant_value_t cb_state = 0;
  ant_value_t cb = 0;
  ant_value_t call_args[3];

  if (is_err(stream_obj)) return stream_obj;
  transform_fn = js_getprop_fallback(js, stream_obj, "_transform");

  cb_state = js_mkobj(js);
  js_set(js, cb_state, "stream", stream_obj);
  js_set(js, cb_state, "callback", nargs > 2 ? args[2] : js_mkundef());
  cb = js_heavy_mkfun(js, stream_transform_write_callback, cb_state);

  call_args[0] = nargs > 0 ? args[0] : js_mkundef();
  call_args[1] = nargs > 1 ? args[1] : js_mkstr(js, "utf8", 4);
  call_args[2] = cb;

  return stream_call(js, transform_fn, stream_obj, call_args, 3, false);
}

static ant_value_t stream_transform_final_callback(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  ant_value_t callback = js_get(js, state_obj, "callback");

  if (nargs > 0 && !is_null(args[0]) && !is_undefined(args[0])) {
    if (is_callable(callback)) stream_call(js, callback, stream_obj, &args[0], 1, false);
    return js_mkundef();
  }

  if (nargs > 1 && !is_null(args[1]) && !is_undefined(args[1]))
    stream_readable_push_value(js, stream_obj, args[1], js_mkundef());
  stream_readable_push_value(js, stream_obj, js_mknull(), js_mkundef());
  if (is_callable(callback)) stream_call(js, callback, stream_obj, NULL, 0, false);
  
  return js_mkundef();
}

static ant_value_t js_transform__final(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "Transform");
  ant_value_t flush_fn = 0;
  ant_value_t cb_state = 0;
  ant_value_t cb = 0;
  ant_value_t call_args[1];

  if (is_err(stream_obj)) return stream_obj;
  flush_fn = js_getprop_fallback(js, stream_obj, "_flush");
  
  if (!is_callable(flush_fn)) {
    stream_readable_push_value(js, stream_obj, js_mknull(), js_mkundef());
    stream_call_callback(js, nargs > 0 ? args[0] : js_mkundef(), NULL, 0);
    return js_mkundef();
  }

  cb_state = js_mkobj(js);
  js_set(js, cb_state, "stream", stream_obj);
  js_set(js, cb_state, "callback", nargs > 0 ? args[0] : js_mkundef());
  cb = js_heavy_mkfun(js, stream_transform_final_callback, cb_state);
  call_args[0] = cb;
  return stream_call(js, flush_fn, stream_obj, call_args, 1, false);
}

static ant_value_t js_passthrough__transform(ant_t *js, ant_value_t *args, int nargs) {
  return js_transform__transform(js, args, nargs);
}

static ant_value_t stream_finished_cleanup(ant_t *js, ant_value_t state_obj) {
  ant_value_t stream_obj = js_get(js, state_obj, "stream");
  if (stream_is_instance(stream_obj) || is_object_type(stream_obj)) {
    stream_remove_listener(js, stream_obj, "end", js_get(js, state_obj, "onFinish"));
    stream_remove_listener(js, stream_obj, "finish", js_get(js, state_obj, "onFinish"));
    stream_remove_listener(js, stream_obj, "close", js_get(js, state_obj, "onFinish"));
    stream_remove_listener(js, stream_obj, "error", js_get(js, state_obj, "onError"));
  }
  return js_mkundef();
}

static ant_value_t stream_finished_fire(ant_t *js, ant_value_t state_obj, ant_value_t error) {
  ant_value_t called = js_get(js, state_obj, "called");
  ant_value_t callback = js_get(js, state_obj, "callback");
  ant_value_t cb_args[1];

  if (js_truthy(js, called)) return js_mkundef();
  js_set(js, state_obj, "called", js_true);
  stream_finished_cleanup(js, state_obj);

  if (is_undefined(error)) stream_call_callback(js, callback, NULL, 0);
  else {
    cb_args[0] = error;
    stream_call_callback(js, callback, cb_args, 1);
  }
  return js_mkundef();
}

static ant_value_t stream_finished_on_finish(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  return stream_finished_fire(js, state_obj, js_mkundef());
}

static ant_value_t stream_finished_on_error(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t error = nargs > 0 ? args[0] : js_mkundef();
  return stream_finished_fire(js, state_obj, error);
}

static ant_value_t stream_finished_register(ant_t *js, ant_value_t stream_obj, ant_value_t callback) {
  ant_value_t state_obj = js_mkobj(js);
  ant_value_t on_finish = 0;
  ant_value_t on_error = 0;

  if (!is_callable(callback)) callback = js_mkfun(stream_noop);

  js_set(js, state_obj, "stream", stream_obj);
  js_set(js, state_obj, "callback", callback);
  js_set(js, state_obj, "called", js_false);
  on_finish = js_heavy_mkfun(js, stream_finished_on_finish, state_obj);
  on_error = js_heavy_mkfun(js, stream_finished_on_error, state_obj);
  js_set(js, state_obj, "onFinish", on_finish);
  js_set(js, state_obj, "onError", on_error);

  eventemitter_add_listener(js, stream_obj, "end", on_finish, false);
  eventemitter_add_listener(js, stream_obj, "finish", on_finish, false);
  eventemitter_add_listener(js, stream_obj, "close", on_finish, false);
  eventemitter_add_listener(js, stream_obj, "error", on_error, false);
  return stream_obj;
}

static ant_value_t js_stream_finished(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t callback = nargs > 1 ? args[1] : js_mkundef();
  if (nargs < 1 || !is_object_type(args[0])) return js_mkerr(js, "finished requires a stream");
  return stream_finished_register(js, args[0], callback);
}

static ant_value_t stream_pipeline_done(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t callback = js_get(js, state_obj, "callback");
  ant_value_t called = js_get(js, state_obj, "called");
  ant_value_t cb_args[1];

  if (js_truthy(js, called)) return js_mkundef();
  js_set(js, state_obj, "called", js_true);

  if (nargs > 0 && !is_undefined(args[0])) {
    cb_args[0] = args[0];
    stream_call_callback(js, callback, cb_args, 1);
  } else stream_call_callback(js, callback, NULL, 0);
  return js_mkundef();
}

static ant_value_t stream_pipeline_error(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t done = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  if (nargs > 0 && !is_undefined(args[0])) stream_call_callback(js, done, &args[0], 1);
  return js_mkundef();
}

static ant_value_t stream_pipeline_schedule_done(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t done = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  stream_call_callback(js, done, NULL, 0);
  return js_mkundef();
}

static ant_value_t js_stream_pipeline(ant_t *js, ant_value_t *args, int nargs) {
  int stream_count = nargs;
  ant_value_t callback = js_mkundef();
  ant_value_t done_state = 0;
  ant_value_t done = 0;

  if (nargs > 0 && is_callable(args[nargs - 1])) {
    callback = args[nargs - 1];
    stream_count--;
  }
  
  if (!is_callable(callback)) callback = js_mkfun(stream_noop);
  if (stream_count <= 0) return js_mkundef();

  done_state = js_mkobj(js);
  js_set(js, done_state, "callback", callback);
  js_set(js, done_state, "called", js_false);
  done = js_heavy_mkfun(js, stream_pipeline_done, done_state);

  if (stream_count < 2) {
    stream_schedule_microtask(js, stream_pipeline_schedule_done, done);
    return args[0];
  }

  for (int i = 0; i < stream_count - 1; i++) {
    ant_value_t error_cb = js_heavy_mkfun(js, stream_pipeline_error, done);
    ant_value_t finished_args[2];
    
    finished_args[0] = args[i];
    finished_args[1] = error_cb;
    js_stream_finished(js, finished_args, 2);
    
    ant_value_t pipe_args[2];
    pipe_args[0] = args[i + 1];
    pipe_args[1] = js_mkundef();
    stream_call_prop(js, args[i], "pipe", pipe_args, 2);
  }

  {
    ant_value_t finished_args[2];
    finished_args[0] = args[stream_count - 1];
    finished_args[1] = done;
    js_stream_finished(js, finished_args, 2);
  }

  return args[stream_count - 1];
}

static ant_value_t stream_promise_callback(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t promise = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  if (nargs > 0 && !is_undefined(args[0])) js_reject_promise(js, promise, args[0]);
  else js_resolve_promise(js, promise, js_mkundef());
  return js_mkundef();
}

static ant_value_t js_stream_promises_finished(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t promise = js_mkpromise(js);
  ant_value_t finished_args[2];
  if (nargs < 1 || !is_object_type(args[0])) {
    js_reject_promise(js, promise, js_mkerr(js, "finished requires a stream"));
    return promise;
  }
  finished_args[0] = args[0];
  finished_args[1] = js_heavy_mkfun(js, stream_promise_callback, promise);
  js_stream_finished(js, finished_args, 2);
  return promise;
}

static ant_value_t js_stream_promises_pipeline(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t promise = js_mkpromise(js);
  ant_value_t *call_args = NULL;

  if (nargs <= 0) {
    js_resolve_promise(js, promise, js_mkundef());
    return promise;
  }

  call_args = malloc((size_t)(nargs + 1) * sizeof(*call_args));
  if (!call_args) {
    js_reject_promise(js, promise, js_mkerr(js, "out of memory"));
    return promise;
  }

  for (int i = 0; i < nargs; i++) call_args[i] = args[i];
  call_args[nargs] = js_heavy_mkfun(js, stream_promise_callback, promise);
  js_stream_pipeline(js, call_args, nargs + 1);
  free(call_args);
  return promise;
}

static void stream_release_reader(ant_t *js, ant_value_t state_obj) {
  ant_value_t reader = js_get(js, state_obj, "reader");
  if (!is_object_type(reader)) return;
  stream_call_prop(js, reader, "releaseLock", NULL, 0);
}

static ant_value_t stream_readable_from_step(ant_t *js, ant_value_t *args, int nargs);

static void stream_readable_from_schedule(ant_t *js, ant_value_t state_obj) {
  stream_schedule_microtask(js, stream_readable_from_step, state_obj);
}

static ant_value_t stream_readable_from_fail(ant_t *js, ant_value_t state_obj, ant_value_t error) {
  ant_value_t readable = js_get(js, state_obj, "readable");
  stream_release_reader(js, state_obj);
  if (stream_is_instance(readable)) {
    ant_value_t destroy_args[1] = { error };
    ant_value_t saved_this = js->this_val;
    js->this_val = readable;
    js_stream_destroy(js, destroy_args, 1);
    js->this_val = saved_this;
  }
  return js_mkundef();
}

static ant_value_t stream_readable_from_handle_result(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t readable = js_get(js, state_obj, "readable");
  ant_value_t result = nargs > 0 ? args[0] : js_mkundef();
  ant_value_t done = 0;
  ant_value_t value = 0;

  if (js_truthy(js, js_get(js, readable, "destroyed"))) {
    stream_release_reader(js, state_obj);
    return js_mkundef();
  }

  if (!is_object_type(result)) return stream_readable_from_fail(js, state_obj, js_mkerr(js, "iterator step must be an object"));
  done = js_get(js, result, "done");
  value = js_get(js, result, "value");
  if (js_truthy(js, done)) {
    stream_release_reader(js, state_obj);
    stream_readable_push_value(js, readable, js_mknull(), js_mkundef());
    return js_mkundef();
  }

  stream_readable_push_value(js, readable, value, js_mkundef());
  if (!js_truthy(js, js_get(js, readable, "destroyed"))) stream_readable_from_schedule(js, state_obj);
  return js_mkundef();
}

static ant_value_t stream_readable_from_reject(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t error = nargs > 0 ? args[0] : js_mkerr(js, "stream iteration failed");
  return stream_readable_from_fail(js, state_obj, error);
}

static ant_value_t stream_readable_from_step(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t readable = js_get(js, state_obj, "readable");
  ant_value_t mode = js_get(js, state_obj, "mode");
  ant_value_t iterator = js_get(js, state_obj, "iterator");
  
  ant_value_t next_result = js_mkundef();
  ant_value_t on_resolve = js_heavy_mkfun(js, stream_readable_from_handle_result, state_obj);
  ant_value_t on_reject = js_heavy_mkfun(js, stream_readable_from_reject, state_obj);
  ant_value_t then_result = 0;

  if (js_truthy(js, js_get(js, readable, "destroyed"))) {
    stream_release_reader(js, state_obj);
    return js_mkundef();
  }

  if (stream_key_is_cstr(js, mode, "reader")) next_result = stream_call_prop(js, iterator, "read", NULL, 0);
  else next_result = stream_call_prop(js, iterator, "next", NULL, 0);

  if (is_err(next_result)) return stream_readable_from_fail(js, state_obj, next_result);
  if (vtype(next_result) == T_PROMISE) {
    then_result = js_promise_then(js, next_result, on_resolve, on_reject);
    promise_mark_handled(then_result);
    return js_mkundef();
  }

  ant_value_t one_arg[1] = { next_result };
  return stream_readable_from_handle_result(js, one_arg, 1);
}

static ant_value_t stream_readable_from_start(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t state_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA);
  ant_value_t readable = js_get(js, state_obj, "readable");
  ant_value_t source = js_get(js, state_obj, "source");
  
  ant_value_t async_iter_fn = 0;
  ant_value_t reader_fn = 0;
  js_iter_t it;

  if (js_truthy(js, js_get(js, readable, "destroyed"))) return js_mkundef();

  async_iter_fn = is_object_type(source) ? js_get_sym(js, source, get_asyncIterator_sym()) : js_mkundef();
  if (is_callable(async_iter_fn)) {
    ant_value_t iterator = stream_call(js, async_iter_fn, source, NULL, 0, false);
    if (is_err(iterator)) return stream_readable_from_fail(js, state_obj, iterator);
    js_set(js, state_obj, "iterator", iterator);
    js_set(js, state_obj, "mode", js_mkstr(js, "async", 5));
    stream_readable_from_schedule(js, state_obj);
    return js_mkundef();
  }

  if (js_iter_open(js, source, &it)) {
    ant_value_t value = 0;
    while (js_iter_next(js, &it, &value)) {
      if (js_truthy(js, js_get(js, readable, "destroyed"))) break;
      stream_readable_push_value(js, readable, value, js_mkundef());
    }
    js_iter_close(js, &it);
    if (!js_truthy(js, js_get(js, readable, "destroyed")))
      stream_readable_push_value(js, readable, js_mknull(), js_mkundef());
    return js_mkundef();
  }

  reader_fn = is_object_type(source) ? js_get(js, source, "getReader") : js_mkundef();
  if (is_callable(reader_fn)) {
    ant_value_t reader = stream_call(js, reader_fn, source, NULL, 0, false);
    if (is_err(reader)) return stream_readable_from_fail(js, state_obj, reader);
    js_set(js, state_obj, "reader", reader);
    js_set(js, state_obj, "iterator", reader);
    js_set(js, state_obj, "mode", js_mkstr(js, "reader", 6));
    stream_readable_from_schedule(js, state_obj);
    return js_mkundef();
  }

  if (!is_undefined(source)) stream_readable_push_value(js, readable, source, js_mkundef());
  stream_readable_push_value(js, readable, js_mknull(), js_mkundef());
  
  return js_mkundef();
}

static ant_value_t js_readable_from(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t ctor_args[1];
  ant_value_t readable = 0;
  ant_value_t state_obj = 0;

  ctor_args[0] = nargs > 1 ? args[1] : js_mkundef();
  readable = stream_construct(js, g_readable_proto, ctor_args[0], stream_init_readable);
  if (is_err(readable)) return readable;

  state_obj = js_mkobj(js);
  js_set(js, state_obj, "readable", readable);
  js_set(js, state_obj, "source", nargs > 0 ? args[0] : js_mkundef());
  js_set(js, state_obj, "iterator", js_mkundef());
  js_set(js, state_obj, "reader", js_mkundef());
  js_set(js, state_obj, "mode", js_mkundef());
  stream_schedule_microtask(js, stream_readable_from_start, state_obj);
  
  return readable;
}

static ant_value_t js_readable_from_web(ant_t *js, ant_value_t *args, int nargs) {
  return js_readable_from(js, args, nargs);
}

static ant_value_t js_stream_ctor(ant_t *js, ant_value_t *args, int nargs) {
  return stream_construct(js, g_stream_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_base);
}

static ant_value_t js_readable_ctor(ant_t *js, ant_value_t *args, int nargs) {
  return stream_construct(js, g_readable_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_readable);
}

static ant_value_t js_writable_ctor(ant_t *js, ant_value_t *args, int nargs) {
  return stream_construct(js, g_writable_proto, nargs > 0 ? args[0] : js_mkundef(), stream_init_writable);
}

static ant_value_t js_duplex_ctor(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t proto = js_instance_proto_from_new_target(js, g_duplex_proto);
  ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_duplex_proto);
  ant_value_t options = nargs > 0 ? args[0] : js_mkundef();
  ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js);

  stream_init_readable(js, obj, options);
  stream_init_writable(js, obj, options);
  
  js_set(js, obj, "readable", js_true);
  js_set(js, obj, "writable", js_true);
  js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false));
  
  return obj;
}

static ant_value_t js_transform_ctor(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t proto = js_instance_proto_from_new_target(js, g_transform_proto);
  ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_transform_proto);
  ant_value_t options = nargs > 0 ? args[0] : js_mkundef();
  ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js);
  
  ant_value_t transform_fn = 0;
  ant_value_t flush_fn = 0;

  stream_init_readable(js, obj, options);
  stream_init_writable(js, obj, options);
  
  js_set(js, obj, "readable", js_true);
  js_set(js, obj, "writable", js_true);
  js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false));

  transform_fn = stream_get_option(js, options_obj, "transform");
  flush_fn = stream_get_option(js, options_obj, "flush");
  
  if (is_callable(transform_fn)) js_set(js, obj, "_transform", transform_fn);
  if (is_callable(flush_fn)) js_set(js, obj, "_flush", flush_fn);
  
  return obj;
}

static ant_value_t js_passthrough_ctor(ant_t *js, ant_value_t *args, int nargs) {
  ant_value_t proto = js_instance_proto_from_new_target(js, g_passthrough_proto);
  ant_value_t obj = stream_make_base_object(js, is_object_type(proto) ? proto : g_passthrough_proto);
  ant_value_t options = nargs > 0 ? args[0] : js_mkundef();
  ant_value_t options_obj = is_object_type(options) ? options : js_mkobj(js);

  stream_init_readable(js, obj, options);
  stream_init_writable(js, obj, options);
  
  js_set(js, obj, "readable", js_true);
  js_set(js, obj, "writable", js_true);
  js_set(js, obj, "allowHalfOpen", js_bool(stream_get_option(js, options_obj, "allowHalfOpen") != js_false));
  
  return obj;
}

void stream_init_constructors(ant_t *js) {
  ant_value_t events = 0;
  ant_value_t ee_ctor = 0;
  ant_value_t ee_proto = 0;

  if (g_stream_ctor) return;

  events = events_library(js);
  ee_ctor = js_get(js, events, "EventEmitter");
  ee_proto = js_get(js, ee_ctor, "prototype");

  g_stream_proto = js_mkobj(js);
  js_set_proto_init(g_stream_proto, ee_proto);
  js_set(js, g_stream_proto, "pipe", js_mkfun(js_stream_pipe));
  js_set(js, g_stream_proto, "unpipe", js_mkfun(js_stream_unpipe));
  js_set(js, g_stream_proto, "pause", js_mkfun(js_stream_pause));
  js_set(js, g_stream_proto, "resume", js_mkfun(js_stream_resume));
  js_set(js, g_stream_proto, "isPaused", js_mkfun(js_stream_is_paused));
  js_set(js, g_stream_proto, "destroy", js_mkfun(js_stream_destroy));
  js_set_sym(js, g_stream_proto, get_toStringTag_sym(), js_mkstr(js, "Stream", 6));
  g_stream_ctor = js_make_ctor(js, js_stream_ctor, g_stream_proto, "Stream", 6);

  g_readable_proto = js_mkobj(js);
  js_set_proto_init(g_readable_proto, g_stream_proto);
  js_set(js, g_readable_proto, "_read", js_mkfun(js_readable__read));
  js_set(js, g_readable_proto, "push", js_mkfun(js_readable_push));
  js_set(js, g_readable_proto, "read", js_mkfun(js_readable_read));
  js_set(js, g_readable_proto, "setEncoding", js_mkfun(js_readable_set_encoding));
  js_set(js, g_readable_proto, "on", js_mkfun(js_readable_on));
  js_set(js, g_readable_proto, "resume", js_mkfun(js_readable_resume));
  js_set(js, g_readable_proto, "pause", js_mkfun(js_readable_pause));
  js_set_sym(js, g_readable_proto, get_toStringTag_sym(), js_mkstr(js, "Readable", 8));
  g_readable_ctor = js_make_ctor(js, js_readable_ctor, g_readable_proto, "Readable", 8);
  js_set(js, g_readable_ctor, "from", js_mkfun(js_readable_from));
  js_set(js, g_readable_ctor, "fromWeb", js_mkfun(js_readable_from_web));

  g_writable_proto = js_mkobj(js);
  js_set_proto_init(g_writable_proto, g_stream_proto);
  js_set(js, g_writable_proto, "_write", js_mkfun(js_writable__write));
  js_set(js, g_writable_proto, "_final", js_mkfun(js_writable__final));
  js_set(js, g_writable_proto, "write", js_mkfun(js_writable_write));
  js_set(js, g_writable_proto, "end", js_mkfun(js_writable_end));
  js_set(js, g_writable_proto, "cork", js_mkfun(stream_noop));
  js_set(js, g_writable_proto, "uncork", js_mkfun(stream_noop));
  js_set_sym(js, g_writable_proto, get_toStringTag_sym(), js_mkstr(js, "Writable", 8));
  g_writable_ctor = js_make_ctor(js, js_writable_ctor, g_writable_proto, "Writable", 8);

  g_duplex_proto = js_mkobj(js);
  js_set_proto_init(g_duplex_proto, g_readable_proto);
  js_set(js, g_duplex_proto, "_write", js_mkfun(js_writable__write));
  js_set(js, g_duplex_proto, "_final", js_mkfun(js_writable__final));
  js_set(js, g_duplex_proto, "write", js_mkfun(js_writable_write));
  js_set(js, g_duplex_proto, "end", js_mkfun(js_writable_end));
  js_set(js, g_duplex_proto, "cork", js_mkfun(stream_noop));
  js_set(js, g_duplex_proto, "uncork", js_mkfun(stream_noop));
  js_set_sym(js, g_duplex_proto, get_toStringTag_sym(), js_mkstr(js, "Duplex", 6));
  g_duplex_ctor = js_make_ctor(js, js_duplex_ctor, g_duplex_proto, "Duplex", 6);

  g_transform_proto = js_mkobj(js);
  js_set_proto_init(g_transform_proto, g_duplex_proto);
  js_set(js, g_transform_proto, "_transform", js_mkfun(js_transform__transform));
  js_set(js, g_transform_proto, "_write", js_mkfun(js_transform__write));
  js_set(js, g_transform_proto, "_final", js_mkfun(js_transform__final));
  js_set_sym(js, g_transform_proto, get_toStringTag_sym(), js_mkstr(js, "Transform", 9));
  g_transform_ctor = js_make_ctor(js, js_transform_ctor, g_transform_proto, "Transform", 9);

  g_passthrough_proto = js_mkobj(js);
  js_set_proto_init(g_passthrough_proto, g_transform_proto);
  js_set(js, g_passthrough_proto, "_transform", js_mkfun(js_passthrough__transform));
  js_set_sym(js, g_passthrough_proto, get_toStringTag_sym(), js_mkstr(js, "PassThrough", 11));
  g_passthrough_ctor = js_make_ctor(js, js_passthrough_ctor, g_passthrough_proto, "PassThrough", 11);

  gc_register_root(&g_stream_proto);
  gc_register_root(&g_stream_ctor);
  gc_register_root(&g_readable_proto);
  gc_register_root(&g_readable_ctor);
  gc_register_root(&g_writable_proto);
  gc_register_root(&g_writable_ctor);
  gc_register_root(&g_duplex_proto);
  gc_register_root(&g_duplex_ctor);
  gc_register_root(&g_transform_proto);
  gc_register_root(&g_transform_ctor);
  gc_register_root(&g_passthrough_proto);
  gc_register_root(&g_passthrough_ctor);
}

ant_value_t stream_readable_constructor(ant_t *js) {
  stream_init_constructors(js);
  return g_readable_ctor;
}

ant_value_t stream_writable_constructor(ant_t *js) {
  stream_init_constructors(js);
  return g_writable_ctor;
}

ant_value_t stream_readable_prototype(ant_t *js) {
  stream_init_constructors(js);
  return g_readable_proto;
}

ant_value_t stream_writable_prototype(ant_t *js) {
  stream_init_constructors(js);
  return g_writable_proto;
}

ant_value_t stream_duplex_prototype(ant_t *js) {
  stream_init_constructors(js);
  return g_duplex_proto;
}

ant_value_t stream_construct_readable(ant_t *js, ant_value_t base_proto, ant_value_t options) {
  stream_init_constructors(js);
  return stream_construct(js, base_proto, options, stream_init_readable);
}

ant_value_t stream_construct_writable(ant_t *js, ant_value_t base_proto, ant_value_t options) {
  stream_init_constructors(js);
  return stream_construct(js, base_proto, options, stream_init_writable);
}

void stream_init_readable_object(ant_t *js, ant_value_t obj, ant_value_t options) {
  stream_init_constructors(js);
  if (!is_object_type(obj)) return;
  js_set_native_tag(obj, STREAM_NATIVE_TAG);
  stream_init_readable(js, obj, options);
}

void stream_init_writable_object(ant_t *js, ant_value_t obj, ant_value_t options) {
  stream_init_constructors(js);
  if (!is_object_type(obj)) return;
  js_set_native_tag(obj, STREAM_NATIVE_TAG);
  stream_init_writable(js, obj, options);
}

void stream_init_duplex_object(ant_t *js, ant_value_t obj, ant_value_t options) {
  stream_init_constructors(js);
  if (!is_object_type(obj)) return;
  js_set_native_tag(obj, STREAM_NATIVE_TAG);
  stream_init_readable(js, obj, options);
  stream_init_writable(js, obj, options);
}

ant_value_t stream_readable_push(ant_t *js, ant_value_t stream_obj, ant_value_t chunk, ant_value_t encoding) {
  stream_init_constructors(js);
  return stream_readable_push_value(js, stream_obj, chunk, encoding);
}

static ant_value_t js_stream_get_default_high_water_mark(ant_t *js, ant_value_t *args, int nargs) {
  bool object_mode = nargs > 0 && js_truthy(js, args[0]);
  return js_mknum(stream_default_high_water_mark(object_mode));
}

static ant_value_t js_stream_set_default_high_water_mark(ant_t *js, ant_value_t *args, int nargs) {
  if (nargs < 2 || vtype(args[1]) != T_NUM || js_getnum(args[1]) < 0)
    return js_mkerr_typed(js, JS_ERR_RANGE, "setDefaultHighWaterMark requires a non-negative number");

  bool object_mode = js_truthy(js, args[0]);
  if (object_mode) g_default_object_high_water_mark = js_getnum(args[1]);
  else g_default_high_water_mark = js_getnum(args[1]);

  return js_mkundef();
}

static void stream_web_copy_global(ant_t *js, ant_value_t obj, const char *name) {
  ant_value_t value = js_get(js, js->global, name);
  if (is_err(value)) return;
  js_set(js, obj, name, value);
}

// TODO: remove copy-on-start
static void stream_web_define_common(ant_t *js, ant_value_t obj) {
  stream_web_copy_global(js, obj, "ReadableStream");
  stream_web_copy_global(js, obj, "ReadableStreamDefaultReader");
  stream_web_copy_global(js, obj, "ReadableStreamDefaultController");
  stream_web_copy_global(js, obj, "WritableStream");
  stream_web_copy_global(js, obj, "WritableStreamDefaultWriter");
  stream_web_copy_global(js, obj, "WritableStreamDefaultController");
  stream_web_copy_global(js, obj, "TransformStream");
  stream_web_copy_global(js, obj, "TransformStreamDefaultController");
  stream_web_copy_global(js, obj, "ByteLengthQueuingStrategy");
  stream_web_copy_global(js, obj, "CountQueuingStrategy");
  stream_web_copy_global(js, obj, "TextEncoderStream");
  stream_web_copy_global(js, obj, "TextDecoderStream");
  stream_web_copy_global(js, obj, "CompressionStream");
  stream_web_copy_global(js, obj, "DecompressionStream");
}

ant_value_t stream_library(ant_t *js) {
  ant_value_t lib = js_mkobj(js);
  ant_value_t promises = js_mkobj(js);
  stream_init_constructors(js);

  js_set(js, promises, "pipeline", js_mkfun(js_stream_promises_pipeline));
  js_set(js, promises, "finished", js_mkfun(js_stream_promises_finished));

  js_set_module_default(js, lib, g_stream_ctor, "Stream");
  js_set(js, lib, "Readable", g_readable_ctor);
  js_set(js, lib, "Writable", g_writable_ctor);
  js_set(js, lib, "Duplex", g_duplex_ctor);
  js_set(js, lib, "Transform", g_transform_ctor);
  js_set(js, lib, "PassThrough", g_passthrough_ctor);
  js_set(js, lib, "pipeline", js_mkfun(js_stream_pipeline));
  js_set(js, lib, "finished", js_mkfun(js_stream_finished));
  js_set(js, lib, "getDefaultHighWaterMark", js_mkfun(js_stream_get_default_high_water_mark));
  js_set(js, lib, "setDefaultHighWaterMark", js_mkfun(js_stream_set_default_high_water_mark));
  js_set(js, lib, "promises", promises);

  js_set(js, g_stream_ctor, "Readable", g_readable_ctor);
  js_set(js, g_stream_ctor, "Writable", g_writable_ctor);
  js_set(js, g_stream_ctor, "Duplex", g_duplex_ctor);
  js_set(js, g_stream_ctor, "Transform", g_transform_ctor);
  js_set(js, g_stream_ctor, "PassThrough", g_passthrough_ctor);
  js_set(js, g_stream_ctor, "pipeline", js_get(js, lib, "pipeline"));
  js_set(js, g_stream_ctor, "finished", js_get(js, lib, "finished"));
  js_set(js, g_stream_ctor, "getDefaultHighWaterMark", js_get(js, lib, "getDefaultHighWaterMark"));
  js_set(js, g_stream_ctor, "setDefaultHighWaterMark", js_get(js, lib, "setDefaultHighWaterMark"));
  js_set(js, g_stream_ctor, "promises", promises);

  js_set(js, promises, "default", promises);
  js_set_slot_wb(js, promises, SLOT_DEFAULT, promises);
  js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "stream", 6));
  
  return lib;
}

ant_value_t stream_promises_library(ant_t *js) {
  ant_value_t stream_ns = js_esm_import_sync_cstr(js, "stream", 6);
  ant_value_t promises = 0;
  
  if (is_err(stream_ns)) return stream_ns;
  promises = js_get(js, stream_ns, "promises");
  
  if (!is_object_type(promises)) return js_mkerr(js, "stream.promises is not available");
  js_set(js, promises, "default", promises);
  js_set_slot_wb(js, promises, SLOT_DEFAULT, promises);
  
  return promises;
}

ant_value_t stream_web_library(ant_t *js) {
  ant_value_t lib = js_mkobj(js);

  stream_web_define_common(js, lib);
  js_set(js, lib, "default", lib);
  js_set_slot_wb(js, lib, SLOT_DEFAULT, lib);
  js_set_sym(js, lib, get_toStringTag_sym(), js_mkstr(js, "stream/web", 10));

  return lib;
}
