const std = @import("std");
const c_allocator = std.heap.c_allocator;

const debug = @import("debug.zig");
const extractor = @import("extractor.zig");
const uv = @import("uv.zig");
const tlsuv = @import("tlsuv.zig");
const nghttp2 = @import("nghttp2.zig");

const config = @import("config");
const user_agent: [:0]const u8 = "ant/" ++ config.version;

pub const FetchError = error{
  ConnectionFailed,
  TlsError,
  Http2Error,
  Timeout,
  InvalidUrl,
  ResponseError,
  OutOfMemory,
};

pub const ParsedUrl = struct {
  scheme: []const u8,
  host: []const u8,
  port: u16,
  path: []const u8,

  pub fn parse(url: []const u8) !ParsedUrl {
    var remaining = url;
    const scheme_end = std.mem.indexOf(u8, remaining, "://") orelse return error.InvalidUrl;
    const scheme = remaining[0..scheme_end];
    remaining = remaining[scheme_end + 3 ..];

    const path_start = std.mem.indexOf(u8, remaining, "/") orelse remaining.len;
    const host_port = remaining[0..path_start];
    remaining = if (path_start < remaining.len) remaining[path_start..] else "/";

    var host: []const u8 = host_port;
    var port: u16 = if (std.mem.eql(u8, scheme, "https")) 443 else 80;

    if (std.mem.indexOf(u8, host_port, ":")) |colon| {
      host = host_port[0..colon];
      port = std.fmt.parseInt(u16, host_port[colon + 1 ..], 10) catch return error.InvalidUrl;
    }

    return .{ .scheme = scheme, .host = host, .port = port, .path = remaining };
  }
};

pub const StreamHandler = struct {
  on_data: *const fn ([]const u8, ?*anyopaque) void,
  on_complete: *const fn (u16, ?*anyopaque) void,
  on_error: *const fn (FetchError, ?*anyopaque) void,
  user_data: ?*anyopaque,
  
  pub fn init(
    on_data: *const fn ([]const u8, ?*anyopaque) void,
    on_complete: *const fn (u16, ?*anyopaque) void,
    on_error: *const fn (FetchError, ?*anyopaque) void,
    user_data: ?*anyopaque,
  ) StreamHandler {
    return .{ .on_data = on_data, .on_complete = on_complete, .on_error = on_error, .user_data = user_data };
  }
};

const PendingRequest = struct {
  url: []const u8,
  handler: ?StreamHandler,
};

const MAX_PENDING_REQUESTS = 20;
const NUM_CONNECTIONS = 6;
const NUM_META_CONNECTIONS = 3;
const META_SLOW_LOG_MS: u64 = 250;

const Http2Client = struct {
  allocator: std.mem.Allocator,
  loop: *uv.loop_t,
  tls: tlsuv.stream_t,
  h2_session: ?*nghttp2.session,
  host: [:0]const u8,
  use_tls: bool,
  connected: i32,
  connect_pending: bool,
  closing: bool,
  write_buf: std.ArrayListUnmanaged(u8),
  requests: [MAX_PENDING_REQUESTS]RequestState,
  request_count: usize,
  requests_done: usize,
  last_response_status_code: u16,

  const RequestState = struct {
    stream_id: i32,
    path: ?[:0]const u8,
    on_data: ?*const fn ([]const u8, ?*anyopaque) void,
    on_complete: ?*const fn (u16, ?*anyopaque) void,
    on_error: ?*const fn (FetchError, ?*anyopaque) void,
    userdata: ?*anyopaque,
    response_body: std.ArrayListUnmanaged(u8),
    status_code: u16,
    done: bool,
    has_error: bool,
    start_ns: u64,
    end_ns: u64,
    bytes: usize,
    content_encoding: ContentEncoding,
  };

  const ContentEncoding = enum {
    identity,
    gzip,
  };

  const alpn_protocols = [_][*:0]const u8{ "h2", "http/1.1" };

  pub fn init(allocator: std.mem.Allocator, host: []const u8, use_tls: bool) !*Http2Client {
    const client = try allocator.create(Http2Client);
    errdefer allocator.destroy(client);

    const host_z = try allocator.dupeZ(u8, host);
    errdefer allocator.free(host_z);

    client.* = .{
      .allocator = allocator,
      .loop = uv.uv_default_loop(),
      .tls = .{},
      .h2_session = null,
      .host = host_z,
      .use_tls = use_tls,
      .connected = 0,
      .connect_pending = false,
      .closing = false,
      .write_buf = .{},
      .requests = undefined,
      .request_count = 0,
      .requests_done = 0,
      .last_response_status_code = 0,
    };

    for (&client.requests) |*req| {
      req.* = .{
        .stream_id = 0,
        .path = null,
        .on_data = null,
        .on_complete = null,
        .on_error = null,
        .userdata = null,
        .response_body = .{},
        .status_code = 0,
        .done = false,
        .has_error = false,
        .start_ns = 0,
        .end_ns = 0,
        .bytes = 0,
        .content_encoding = .identity,
      };
    }

    if (tlsuv.tlsuv_stream_init(client.loop, &client.tls, null) != 0) {
      allocator.free(host_z);
      allocator.destroy(client);
      return error.ConnectionFailed;
    }

    _ = tlsuv.tlsuv_stream_set_hostname(&client.tls, host_z.ptr);
    _ = tlsuv.tlsuv_stream_set_protocols(&client.tls, 2, &alpn_protocols);

    return client;
  }

  pub fn deinit(self: *Http2Client) void {
    self.closing = true;
    self.connect_pending = false;

    for (&self.requests) |*req| {
      req.on_data = null;
      req.on_complete = null;
      req.on_error = null;
      req.userdata = null;
    }

    if (self.connected > 0) {
      self.tls.data = self;
      _ = tlsuv.tlsuv_stream_close(&self.tls, onStreamClose);
      while (self.connected > 0) _ = uv.uv_run(self.loop, uv.RUN_ONCE);
    }

    if (self.h2_session) |session| nghttp2.nghttp2_session_del(session);

    for (&self.requests) |*req| {
      if (req.stream_id != -1) {
        if (req.path) |p| self.allocator.free(p);
        req.response_body.deinit(self.allocator);
      }
    }
    
    self.write_buf.deinit(self.allocator);
    self.allocator.free(self.host);
    self.allocator.destroy(self);
  }

  pub fn resetRequests(self: *Http2Client) void {
    for (self.requests[0..self.request_count]) |*req| {
      if (req.stream_id != -1) {
        if (req.path) |p| self.allocator.free(p);
        req.response_body.deinit(self.allocator);
      }
      req.* = .{
        .stream_id = 0,
        .path = null,
        .on_data = null,
        .on_complete = null,
        .on_error = null,
        .userdata = null,
        .response_body = .{},
        .status_code = 0,
        .done = false,
        .has_error = false,
        .start_ns = 0,
        .end_ns = 0,
        .bytes = 0,
        .content_encoding = .identity,
      };
    }
    self.request_count = 0;
    self.requests_done = 0;
  }

  pub fn hasCapacity(self: *const Http2Client) bool {
    for (self.requests[0..self.request_count]) |req| {
      if (req.stream_id == -1) return true;
    }
    return self.request_count < MAX_PENDING_REQUESTS - 1;
  }

  pub fn recycleCompletedRequests(self: *Http2Client) void {
    if (self.requests_done == 0) return;

    for (self.requests[0..self.request_count]) |*req| {
      if (req.done and req.stream_id != -1) {
        if (req.path) |p| self.allocator.free(p);
        req.response_body.deinit(self.allocator);
        req.path = null;
        req.response_body = .{};
        req.stream_id = -1;
      }
    }
  }

  fn findOrAllocSlot(self: *Http2Client) ?*RequestState {
    for (self.requests[0..self.request_count]) |*req| {
      if (req.stream_id == -1) return req;
    }
    if (self.request_count < MAX_PENDING_REQUESTS) {
      const req = &self.requests[self.request_count];
      self.request_count += 1;
      return req;
    }
    return null;
  }

  fn onStreamClose(handle: *uv.handle_t) callconv(.c) void {
    const tls: *tlsuv.stream_t = @ptrCast(@alignCast(handle));
    const client: *Http2Client = @ptrCast(@alignCast(tls.data));
    client.connected = -2;
    client.connect_pending = false;
  }

  fn findRequest(self: *Http2Client, stream_id: i32) ?*RequestState {
    for (self.requests[0..self.request_count]) |*req| if (req.stream_id == stream_id) return req;
    return null;
  }

  fn h2Send(_: ?*nghttp2.session, data: [*c]const u8, len: usize, _: c_int, ud: ?*anyopaque) callconv(.c) isize {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    client.write_buf.appendSlice(client.allocator, data[0..len]) catch return nghttp2.ERR_NOMEM;
    return @intCast(len);
  }

  fn h2FrameRecv(_: ?*nghttp2.session, frame: *const nghttp2.frame, ud: ?*anyopaque) callconv(.c) c_int {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    if (frame.hd.flags & nghttp2.FLAG_END_STREAM != 0) {
      if (client.findRequest(frame.hd.stream_id)) |req| {
        if (!req.done) {
          req.done = true;
          req.end_ns = @intCast(std.time.nanoTimestamp());
          client.requests_done += 1;
          if (req.on_complete) |cb| cb(req.status_code, req.userdata);
        }
      }
    }
    return 0;
  }

  fn h2DataChunk(session: ?*nghttp2.session, _: u8, stream_id: i32, data: [*c]const u8, len: usize, ud: ?*anyopaque) callconv(.c) c_int {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    const req = client.findRequest(stream_id) orelse return 0;
    if (req.on_data) |cb| cb(data[0..len], req.userdata) else req.response_body.appendSlice(client.allocator, data[0..len]) catch {
      req.has_error = true;
    }; req.bytes += len;
    if (session) |s| _ = nghttp2.nghttp2_session_consume(s, stream_id, len);

    return 0;
  }

  fn h2Header(_: ?*nghttp2.session, frame: *const nghttp2.frame, name: [*c]const u8, namelen: usize, value: [*c]const u8, valuelen: usize, _: u8, ud: ?*anyopaque) callconv(.c) c_int {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    if (frame.hd.type != nghttp2.HEADERS) return 0;
    const req = client.findRequest(frame.hd.stream_id) orelse return 0;
    if (namelen == 7 and std.mem.eql(u8, name[0..7], ":status"))
      req.status_code = std.fmt.parseInt(u16, value[0..valuelen], 10) catch 0;
    if (std.mem.eql(u8, name[0..namelen], "content-encoding")) {
      if (std.mem.startsWith(u8, value[0..valuelen], "gzip")) {
        req.content_encoding = .gzip;
      }
    }
    return 0;
  }

  fn h2StreamClose(_: ?*nghttp2.session, stream_id: i32, error_code: u32, ud: ?*anyopaque) callconv(.c) c_int {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    const req = client.findRequest(stream_id) orelse return 0;
    if (!req.done) {
      req.done = true;
      req.end_ns = @intCast(std.time.nanoTimestamp());
      client.requests_done += 1;
      if (error_code != 0) {
        req.has_error = true;
        if (req.on_error) |cb| cb(FetchError.Http2Error, req.userdata);
      } else if (req.on_complete) |cb| cb(req.status_code, req.userdata);
    }
    return 0;
  }

  fn initH2(self: *Http2Client) !void {
    var callbacks: *nghttp2.session_callbacks = undefined;
    if (nghttp2.nghttp2_session_callbacks_new(&callbacks) != 0) return error.Http2Error;
    defer nghttp2.nghttp2_session_callbacks_del(callbacks);

   nghttp2.nghttp2_session_callbacks_set_send_callback2(callbacks, h2Send);
   nghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, h2FrameRecv);
   nghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, h2DataChunk);
   nghttp2.nghttp2_session_callbacks_set_on_header_callback(callbacks, h2Header);
   nghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, h2StreamClose);

    var session: *nghttp2.session = undefined;
    if (nghttp2.nghttp2_session_client_new(&session, callbacks, self) != 0) return error.Http2Error;
    self.h2_session = session;

    var settings = [_]nghttp2.settings_entry{
      .{ .settings_id = nghttp2.SETTINGS_MAX_CONCURRENT_STREAMS, .value = MAX_PENDING_REQUESTS },
      .{ .settings_id = nghttp2.SETTINGS_INITIAL_WINDOW_SIZE, .value = 16 * 1024 * 1024 },
    };
    if (nghttp2.nghttp2_submit_settings(self.h2_session.?, nghttp2.FLAG_NONE, &settings, settings.len) != 0) return error.Http2Error;

    const conn_window_increase: i32 = (16 * 1024 * 1024) - 65535;
    _ = nghttp2.nghttp2_submit_window_update(self.h2_session.?, nghttp2.FLAG_NONE, 0, conn_window_increase);
  }

  fn flush(self: *Http2Client) !void {
    if (self.closing) return error.ConnectionFailed;
    if (self.h2_session) |session| while (nghttp2.nghttp2_session_want_write(session) != 0) if (nghttp2.nghttp2_session_send(session) != 0) break;
    if (self.write_buf.items.len > 0) {
      const data = try self.allocator.dupe(u8, self.write_buf.items);
      self.write_buf.clearRetainingCapacity();
      const wr = try self.allocator.create(uv.write_t);
      wr.data = data.ptr;
      var buf = uv.buf_t{ .base = data.ptr, .len = data.len };
      if (tlsuv.tlsuv_stream_write(wr, &self.tls, &buf, onWrite) != 0) {
        self.allocator.free(data);
        self.allocator.destroy(wr);
        return error.ConnectionFailed;
      }
    }
  }

  fn onWrite(wr: *uv.write_t, _: c_int) callconv(.c) void {
    const data_ptr: [*]u8 = @ptrCast(wr.data);
    std.c.free(data_ptr);
    std.c.free(@ptrCast(wr));
  }

  fn allocBuf(_: *uv.handle_t, size: usize, buf: *uv.buf_t) callconv(.c) void {
    buf.base = @ptrCast(std.c.malloc(size) orelse return);
    buf.len = size;
  }

  fn onRead(stream: *uv.stream_t, nread: isize, buf: *const uv.buf_t) callconv(.c) void {
    const tls: *tlsuv.stream_t = @ptrCast(@alignCast(stream));
    const client: *Http2Client = @ptrCast(@alignCast(tls.data));
    defer if (buf.base) |b| std.c.free(b);
    if (client.closing) return;
    if (nread < 0) {
      for (client.requests[0..client.request_count]) |*req| if (!req.done) {
        req.done = true;
        req.has_error = true;
        client.requests_done += 1;
        if (req.on_error) |cb| cb(FetchError.ConnectionFailed, req.userdata);
      };
      return;
    }
    if (nread > 0 and client.h2_session != null) {
      _ = nghttp2.nghttp2_session_mem_recv(client.h2_session.?, @ptrCast(buf.base), @intCast(nread));
      client.flush() catch {};
    }
  }

  fn onConnect(req: *uv.connect_t, status: c_int) callconv(.c) void {
    const ctx: *ConnectCtx = @ptrCast(@alignCast(req.data));
    defer ctx.client.allocator.destroy(ctx);
    ctx.client.connect_pending = false;
    if (ctx.client.closing) {
      ctx.client.connected = -1;
      _ = tlsuv.tlsuv_stream_close(&ctx.client.tls, onStreamClose);
      return;
    }
    if (status < 0) {
      ctx.client.connected = -1;
      return;
    }
    ctx.client.connected = 1;
    ctx.client.tls.data = ctx.client;
    ctx.client.initH2() catch {
      ctx.client.connected = -1;
      return;
    };
    _ = tlsuv.tlsuv_stream_read_start(&ctx.client.tls, allocBuf, onRead);
    ctx.client.flush() catch {};
  }

  const ConnectCtx = struct { client: *Http2Client, req: uv.connect_t };

  fn ensureConnected(self: *Http2Client) !void {
    if (self.closing) return error.ConnectionFailed;
    if (self.connected > 0) return;
    if (self.connected < 0) return error.ConnectionFailed;

    var conn_start: u64 = @intCast(std.time.nanoTimestamp());

    if (!self.connect_pending) {
      const ctx = try self.allocator.create(ConnectCtx);
      ctx.* = .{ .client = self, .req = .{} };
      ctx.req.data = ctx;
      if (tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect) != 0) {
        self.allocator.destroy(ctx);
        return error.ConnectionFailed;
      }
      self.connect_pending = true;
    }

    var loop_count: u32 = 0;
    while (self.connected == 0) {
      _ = uv.uv_run(self.loop, uv.RUN_ONCE);
      loop_count += 1;
    }
    conn_start = debug.timer("    h2: tls connect", conn_start);
    debug.log("    h2: connect loop iterations={d}", .{loop_count});
    if (self.connected < 0) return error.ConnectionFailed;
  }

  pub fn initiateConnectAsync(self: *Http2Client) !void {
    if (self.closing) return error.ConnectionFailed;
    if (self.connected > 0) return;
    if (self.connected < 0) return error.ConnectionFailed;
    if (self.connect_pending) return;

    const ctx = try self.allocator.create(ConnectCtx);
    ctx.* = .{ .client = self, .req = .{} };
    ctx.req.data = ctx;
    if (tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect) != 0) {
      self.allocator.destroy(ctx);
      return error.ConnectionFailed;
    }
    self.connect_pending = true;
  }

  fn makeNv(name: [:0]const u8, value: [:0]const u8) nghttp2.nv {
    return .{ 
      .name = @constCast(name.ptr),
      .value = @constCast(value.ptr),
      .namelen = name.len, .valuelen = value.len, .flags = nghttp2.NV_FLAG_NONE 
    };
  }

  pub fn get(self: *Http2Client, path: []const u8, allocator: std.mem.Allocator) ![]u8 {
    return self.getWithAccept(path, "application/json", allocator);
  }

  pub fn getWithAccept(self: *Http2Client, path: []const u8, accept: [:0]const u8, allocator: std.mem.Allocator) ![]u8 {
    try self.ensureConnected();
    if (self.request_count >= MAX_PENDING_REQUESTS) self.resetRequests();
    const req = &self.requests[self.request_count]; self.request_count += 1;
    
    req.* = .{
      .stream_id = 0,
      .path = try self.allocator.dupeZ(u8, path),
      .on_data = null,
      .on_complete = null,
      .on_error = null,
      .userdata = null,
      .response_body = .{},
      .status_code = 0,
      .done = false,
      .has_error = false,
      .start_ns = @intCast(std.time.nanoTimestamp()),
      .end_ns = 0,
      .bytes = 0,
      .content_encoding = .identity,
    };
    
    const session = self.h2_session orelse return error.Http2Error;
    
    var hdrs = [_]nghttp2.nv{ 
      makeNv(":method", "GET"),
      makeNv(":path", req.path.?),
      makeNv(":scheme", "https"),
      makeNv(":authority", self.host),
      makeNv("accept", accept),
      makeNv("user-agent", user_agent) 
    };
    
    const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
    if (sid < 0) {
      self.request_count -= 1;
      if (req.path) |p| self.allocator.free(p);
      return error.Http2Error;
    }
    
    req.stream_id = sid;
    try self.flush();
    while (!req.done) {
      _ = uv.uv_run(self.loop, uv.RUN_ONCE);
      try self.flush();
    }
    
    self.last_response_status_code = req.status_code;
    if (req.has_error or req.status_code != 200) return error.ResponseError;
    return try allocator.dupe(u8, req.response_body.items);
  }

  pub fn getStream(self: *Http2Client, path: []const u8, on_data: *const fn ([]const u8, ?*anyopaque) void, on_complete: *const fn (u16, ?*anyopaque) void, on_error: *const fn (FetchError, ?*anyopaque) void, userdata: ?*anyopaque) !void {
    try self.ensureConnected();
    const req = self.findOrAllocSlot() orelse return error.OutOfMemory;
    
    req.* = .{ 
      .stream_id = 0,
      .path = try self.allocator.dupeZ(u8, path),
      .on_data = on_data,
      .on_complete = on_complete,
      .on_error = on_error,
      .userdata = userdata,
      .response_body = .{},
      .status_code = 0,
      .done = false,
      .has_error = false,
      .start_ns = @intCast(std.time.nanoTimestamp()),
      .end_ns = 0,
      .bytes = 0,
      .content_encoding = .identity 
    };

    const session = self.h2_session orelse return error.Http2Error;
    
    var hdrs = [_]nghttp2.nv{ 
      makeNv(":method", "GET"),
      makeNv(":path", req.path.?),
      makeNv(":scheme", "https"),
      makeNv(":authority", self.host),
      makeNv("accept", "*/*"),
      makeNv("user-agent", user_agent)
    };
    
    const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
    if (sid < 0) {
      if (req.path) |p| self.allocator.free(p);
      req.stream_id = -1;
      return error.Http2Error;
    }
    
    req.stream_id = sid;
    try self.flush();
  }

  pub fn run(self: *Http2Client) !void {
    const run_start: u64 = @intCast(std.time.nanoTimestamp());
    var loop_count: u32 = 0;
    var last_done: usize = 0;
    var last_report: u64 = run_start;

    while (self.requests_done < self.request_count) {
      if (uv.uv_run(self.loop, uv.RUN_ONCE) == 0) break;
      try self.flush();
      loop_count += 1;

      const now: u64 = @intCast(std.time.nanoTimestamp());
      if (now - last_report > 1_000_000_000) {
        const done_delta = self.requests_done - last_done;
        debug.log("    h2: progress {d}/{d} (+{d} in last 1s) loops={d}", .{
          self.requests_done, self.request_count,
          done_delta, loop_count,
        });
        last_done = self.requests_done;
        last_report = now;
      }
    }

    const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
    const elapsed_ms = elapsed_ns / 1_000_000;
    debug.log("    h2: run complete in {d}ms, {d} loops, {d}/{d} done", .{
      elapsed_ms, loop_count,
      self.requests_done, self.request_count,
    });

    var error_count: usize = 0;
    for (self.requests[0..self.request_count]) |req| {
      if (req.has_error) error_count += 1;
    }
    if (error_count > 0) {
      debug.log("    h2: {d} requests had errors", .{error_count});
      return error.ResponseError;
    }
  }
};

pub const TarballCtx = struct {
  handler: StreamHandler,
  done: bool,
  has_error: bool,
  url: []const u8,
  start_ns: u64,
  bytes: usize,
};

const TarballStats = struct {
  url: []const u8,
  bytes: usize,
  elapsed_ms: u64,
};

const TarballCallbacks = struct {
  fn onData(data: []const u8, ud: ?*anyopaque) void {
    const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
    ctx.bytes += data.len;
    ctx.handler.on_data(data, ctx.handler.user_data);
  }
  
  fn onComplete(status: u16, ud: ?*anyopaque) void {
    const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
    ctx.handler.on_complete(status, ctx.handler.user_data);
    ctx.done = true;
  }
  
  fn onError(err: FetchError, ud: ?*anyopaque) void {
    const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
    ctx.handler.on_error(err, ctx.handler.user_data);
    ctx.done = true;
    ctx.has_error = true;
  }
};

pub const Fetcher = struct {
  allocator: std.mem.Allocator,
  registry_host: []const u8,
  meta_clients: [NUM_META_CONNECTIONS]?*Http2Client,
  meta_clients_initialized: bool,
  pending: std.ArrayListUnmanaged(PendingRequest),
  tarball_clients: [NUM_CONNECTIONS]?*Http2Client,
  tarball_clients_initialized: bool,
  tarball_contexts: std.ArrayListUnmanaged(*TarballCtx),
  tarball_round_robin: usize,
  tarball_stats: std.ArrayListUnmanaged(TarballStats),
  last_http_error_url: ?[]u8,
  last_http_error_status: u16,

  pub const HttpErrorInfo = struct {
    url: []const u8,
    status: u16,
  };

  pub fn init(allocator: std.mem.Allocator, registry_host: []const u8) !*Fetcher {
    const f = try allocator.create(Fetcher);
    f.* = .{
      .allocator = allocator,
      .registry_host = try allocator.dupe(u8, registry_host),
      .meta_clients = [_]?*Http2Client{null} ** NUM_META_CONNECTIONS,
      .meta_clients_initialized = false,
      .pending = .{},
      .tarball_clients = [_]?*Http2Client{null} ** NUM_CONNECTIONS,
      .tarball_clients_initialized = false,
      .tarball_contexts = .{},
      .tarball_round_robin = 0,
      .tarball_stats = .{},
      .last_http_error_url = null,
      .last_http_error_status = 0,
    };
    return f;
  }

  pub fn deinit(self: *Fetcher) void {
    if (self.last_http_error_url) |url| self.allocator.free(url);
    for (&self.meta_clients) |*maybe_client| {
      if (maybe_client.*) |c| { c.deinit();  maybe_client.* = null; }
    }
    for (self.pending.items) |req| self.allocator.free(req.url);
    self.pending.deinit(self.allocator);
    for (&self.tarball_clients) |*maybe_client| {
      if (maybe_client.*) |c| { c.deinit(); maybe_client.* = null; }
    }
    for (self.tarball_contexts.items) |ctx| {
      self.allocator.free(ctx.url);
      self.allocator.destroy(ctx);
    }
    self.tarball_contexts.deinit(self.allocator);
    for (self.tarball_stats.items) |stat| self.allocator.free(stat.url);
    self.tarball_stats.deinit(self.allocator);
    self.allocator.free(self.registry_host);
    self.allocator.destroy(self);
  }

  fn clearLastHttpError(self: *Fetcher) void {
    if (self.last_http_error_url) |url| self.allocator.free(url);
    self.last_http_error_url = null;
    self.last_http_error_status = 0;
  }

  fn setLastHttpError(self: *Fetcher, url: []const u8, status: u16) void {
    self.clearLastHttpError();
    self.last_http_error_url = self.allocator.dupe(u8, url) catch null;
    self.last_http_error_status = status;
  }

  pub fn getLastHttpError(self: *const Fetcher) ?HttpErrorInfo {
    const url = self.last_http_error_url orelse return null;
    return .{ .url = url, .status = self.last_http_error_status };
  }

  fn ensureMetaClients(self: *Fetcher) !void {
    if (self.meta_clients_initialized) return;

    for (&self.meta_clients, 0..) |*slot, i| {
      const client = Http2Client.init(self.allocator, self.registry_host, true) catch |err| {
        debug.log("fetcher: failed to init meta connection {d}: {}", .{ i, err });
        continue;
      };
      client.ensureConnected() catch |err| {
        debug.log("fetcher: failed to connect meta {d}: {}", .{ i, err });
        client.deinit();
        continue;
      };
      slot.* = client;
    }

    var any_connected = false;
    for (self.meta_clients) |slot| {
      if (slot != null) { any_connected = true; break; }
    }
    
    if (!any_connected) return error.ConnectionFailed;
    self.meta_clients_initialized = true;
  }

  pub fn resetMetaClients(self: *Fetcher) void {
    self.clearLastHttpError();
    for (&self.meta_clients) |*slot| {
      if (slot.*) |client| { client.deinit(); slot.* = null; }
    }
    self.meta_clients_initialized = false;
  }

  fn ensureTarballClients(self: *Fetcher) !void {
    if (self.tarball_clients_initialized) return;

    debug.log("fetcher: initializing {d} persistent connections", .{NUM_CONNECTIONS});
    const init_start: u64 = @intCast(std.time.nanoTimestamp());

    for (&self.tarball_clients, 0..) |*slot, i| {
      const client = Http2Client.init(self.allocator, self.registry_host, true) catch |err| {
        debug.log("fetcher: failed to init connection {d}: {}", .{ i, err });
        continue;
      };
      client.ensureConnected() catch |err| {
        debug.log("fetcher: failed to connect {d}: {}", .{ i, err });
        client.deinit();
        continue;
      };
      slot.* = client;
    }

    var any_connected = false;
    for (self.tarball_clients) |slot| {
      if (slot != null) { any_connected = true; break; }
    }

    if (!any_connected) return error.ConnectionFailed;
    self.tarball_clients_initialized = true;
    
    _ = debug.timer("fetcher: connection pool init", init_start);
  }
  
  fn findAvailableClient(self: *Fetcher) ?struct { client: *Http2Client, idx: usize } {
    var attempts: usize = 0;
    while (attempts < NUM_CONNECTIONS) : (attempts += 1) {
      const idx = (self.tarball_round_robin + attempts) % NUM_CONNECTIONS;
      if (self.tarball_clients[idx]) |client| { if (client.hasCapacity()) return .{ .client = client, .idx = idx }; }
    }
    return null;
  }

  pub fn initiateTarballConnectionsAsync(self: *Fetcher) void {
    if (self.tarball_clients_initialized) return;
    debug.log("fetcher: initiating {d} tarball connections (async)", .{NUM_CONNECTIONS});

    for (&self.tarball_clients, 0..) |*slot, i| {
      const client = Http2Client.init(self.allocator, self.registry_host, true) catch {
        continue;
      };
      client.initiateConnectAsync() catch {
        client.deinit(); continue;
      };
      slot.* = client; _ = i;
    }

    var any_connected = false;
    for (self.tarball_clients) |slot| {
      if (slot != null) { any_connected = true; break; }
    }

    if (any_connected) self.tarball_clients_initialized = true;
  }

  pub fn queueTarballAsync(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
    try self.ensureTarballClients();
    const parsed = try ParsedUrl.parse(url);
    
    const available = self.findAvailableClient() orelse {
      try self.pending.append(self.allocator, .{
        .url = try self.allocator.dupe(u8, url),
        .handler = handler,
      }); return;
    };
    
    const ctx = try self.allocator.create(TarballCtx);
    ctx.* = .{
      .handler = handler,
      .done = false,
      .has_error = false,
      .url = try self.allocator.dupe(u8, url),
      .start_ns = @intCast(std.time.nanoTimestamp()),
      .bytes = 0,
    };
    
    try self.tarball_contexts.append(
      self.allocator,
      ctx
    );

    try available.client.getStream(
      parsed.path,
      TarballCallbacks.onData,
      TarballCallbacks.onComplete,
      TarballCallbacks.onError,
      ctx,
    );
    
    self.tarball_round_robin = (available.idx + 1) % NUM_CONNECTIONS;
  }

  pub fn tick(self: *Fetcher) usize {
    self.ensureTarballClients() catch return 0;
    const loop = uv.uv_default_loop();
    
    for (&self.tarball_clients) |maybe_client| {
      if (maybe_client) |c| c.flush() catch {};
    }
    _ = uv.uv_run(loop, uv.RUN_NOWAIT);
    
    for (&self.tarball_clients) |maybe_client| {
      if (maybe_client) |c| c.recycleCompletedRequests();
    }
    
    const completed = self.cleanupCompletedContexts();
    self.dispatchPending();
    
    return completed;
  }
  
  fn cleanupCompletedContexts(self: *Fetcher) usize {
    var completed: usize = 0; var i: usize = 0;
    while (i < self.tarball_contexts.items.len) {
      const ctx = self.tarball_contexts.items[i];
      if (ctx.done) {
        completed += 1;
        self.allocator.free(ctx.url);
        self.allocator.destroy(ctx);
        _ = self.tarball_contexts.swapRemove(i);
      } else i += 1;
    }
    return completed;
  }
  
  fn dispatchPending(self: *Fetcher) void {
    while (self.pending.items.len > 0) {
      const available = self.findAvailableClient() orelse break;
      const req = self.pending.pop() orelse break;
      
      const handler = req.handler orelse {
        self.allocator.free(req.url); continue;
      };
      
      self.dispatchRequest(available.client, req.url, handler) catch |err| {
        handler.on_error(errToFetchError(err), handler.user_data);
        self.allocator.free(req.url); continue;
      };
    }
  }
  
  fn dispatchRequest(self: *Fetcher, client: *Http2Client, url: []const u8, handler: StreamHandler) !void {
    const parsed = try ParsedUrl.parse(url);
    const ctx = try self.allocator.create(TarballCtx);
    
    ctx.* = .{
      .handler = handler,
      .done = false,
      .has_error = false,
      .url = url,
      .start_ns = @intCast(std.time.nanoTimestamp()),
      .bytes = 0,
    };
    
    errdefer self.allocator.destroy(ctx);
    try self.tarball_contexts.append(self.allocator, ctx);
    
    try client.getStream(
      parsed.path,
      TarballCallbacks.onData,
      TarballCallbacks.onComplete,
      TarballCallbacks.onError,
      ctx,
    );
  }
  
  fn errToFetchError(err: anyerror) FetchError {
    return switch (err) {
      error.InvalidUrl => FetchError.InvalidUrl,
      error.OutOfMemory => FetchError.OutOfMemory,
      else => FetchError.Http2Error,
    };
  }
  
  pub fn pendingTarballCount(self: *Fetcher) usize {
    return self.tarball_contexts.items.len;
  }

  pub fn finishTarballs(self: *Fetcher) void {
    const loop = uv.uv_default_loop();
    var last_report: u64 = @intCast(std.time.nanoTimestamp());
    var loops: usize = 0;
    var completed: usize = 0;
    const start = last_report;

    while (self.tarball_contexts.items.len > 0 or self.pending.items.len > 0) {
      for (&self.tarball_clients) |maybe_client| {
        if (maybe_client) |c| c.flush() catch {};
      }

      if (uv.uv_run(loop, uv.RUN_ONCE) == 0 and self.pending.items.len == 0 and self.tarball_contexts.items.len == 0) break;
      loops += 1;

      for (&self.tarball_clients) |maybe_client| {
        if (maybe_client) |c| c.recycleCompletedRequests();
      }

      var i: usize = 0;
      while (i < self.tarball_contexts.items.len) {
        const ctx = self.tarball_contexts.items[i];
        if (ctx.done) {
          if (!ctx.has_error) {
            const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - ctx.start_ns) / 1_000_000);
            const url_copy = self.allocator.dupe(u8, ctx.url) catch null;
            if (url_copy) |url| {
              self.tarball_stats.append(self.allocator, .{ .url = url, .bytes = ctx.bytes, .elapsed_ms = elapsed_ms }) catch {};
            }
          }
          self.allocator.free(ctx.url);
          self.allocator.destroy(ctx);
          _ = self.tarball_contexts.swapRemove(i);
          completed += 1;
        } else {
          i += 1;
        }
      }

      while (self.pending.items.len > 0) {
        var queued = false;
        for (&self.tarball_clients, 0..) |maybe_client, conn_idx| {
          if (maybe_client) |client| {
            if (client.hasCapacity()) {
              const maybe_req = self.pending.pop();
              const req = maybe_req orelse break;
              if (req.handler) |handler| {
                const parsed = ParsedUrl.parse(req.url) catch {
                  handler.on_error(FetchError.InvalidUrl, handler.user_data);
                  self.allocator.free(req.url);
                  continue;
                };

                const ctx = self.allocator.create(TarballCtx) catch {
                  handler.on_error(FetchError.OutOfMemory, handler.user_data);
                  self.allocator.free(req.url);
                  continue;
                };
                ctx.* = .{
                  .handler = handler,
                  .done = false,
                  .has_error = false,
                  .url = req.url,
                  .start_ns = @intCast(std.time.nanoTimestamp()),
                  .bytes = 0,
                };
                self.tarball_contexts.append(self.allocator, ctx) catch {
                  self.allocator.destroy(ctx);
                  self.allocator.free(req.url);
                  continue;
                };

                client.getStream(
                  parsed.path,
                  struct {
                    fn onData(data: []const u8, ud: ?*anyopaque) void {
                      const c: *TarballCtx = @ptrCast(@alignCast(ud));
                      c.bytes += data.len;
                      c.handler.on_data(data, c.handler.user_data);
                    }
                  }.onData,
                  struct {
                    fn onComplete(status: u16, ud: ?*anyopaque) void {
                      const c: *TarballCtx = @ptrCast(@alignCast(ud));
                      c.handler.on_complete(status, c.handler.user_data);
                      if (debug.enabled) {
                        const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
                        debug.log("    tarball: done {s} {d}ms {d} bytes status={d}", .{ c.url, elapsed_ms, c.bytes, status });
                      }
                      c.done = true;
                    }
                  }.onComplete,
                  struct {
                    fn onError(err: FetchError, ud: ?*anyopaque) void {
                      const c: *TarballCtx = @ptrCast(@alignCast(ud));
                      c.handler.on_error(err, c.handler.user_data);
                      if (debug.enabled) {
                        const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
                        debug.log("    tarball: error {s} {d}ms {d} bytes", .{ c.url, elapsed_ms, c.bytes });
                      }
                      c.done = true;
                      c.has_error = true;
                    }
                  }.onError,
                  ctx,
                ) catch {
                  handler.on_error(FetchError.Http2Error, handler.user_data);
                  ctx.done = true;
                };
                queued = true;
                _ = conn_idx;
              } else {
                self.allocator.free(req.url);
              }
              break;
            }
          }
        }
        if (!queued) break;
      }

      const now: u64 = @intCast(std.time.nanoTimestamp());
      if (now - last_report > 1_000_000_000) {
        var total_bytes: usize = 0;
        for (self.tarball_contexts.items) |ctx| {
          total_bytes += ctx.bytes;
        }
        debug.log("    h2: {d} in-flight, {d} pending, {d} completed, {d} loops", .{
          self.tarball_contexts.items.len,
          self.pending.items.len,
          completed,
          loops,
        });
        debug.log("    h2: tarball progress in-flight bytes={d}", .{ total_bytes });
        last_report = now;
      }
    }

    const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, start));
    debug.log("fetcher: finishTarballs completed in {d}ms, {d} loops, {d} completed", .{
      elapsed_ns / 1_000_000,
      loops,
      completed,
    });
    if (debug.enabled and self.tarball_stats.items.len > 0) {
      var top_time: [5]?TarballStats = .{null} ** 5;
      var top_size: [5]?TarballStats = .{null} ** 5;

      for (self.tarball_stats.items) |stat| {
        var idx_time: usize = top_time.len;
        for (top_time, 0..) |slot, i| {
          if (slot == null or stat.elapsed_ms > slot.?.elapsed_ms) {
            idx_time = i;
            break;
          }
        }
        if (idx_time < top_time.len) {
          var carry = stat;
          var j = idx_time;
          while (j < top_time.len) : (j += 1) {
            const next = top_time[j];
            top_time[j] = carry;
            if (next) |n| {
              carry = n;
            } else {
              break;
            }
          }
        }

        var idx_size: usize = top_size.len;
        for (top_size, 0..) |slot, i| {
          if (slot == null or stat.bytes > slot.?.bytes) {
            idx_size = i;
            break;
          }
        }
        if (idx_size < top_size.len) {
          var carry_size = stat;
          var k = idx_size;
          while (k < top_size.len) : (k += 1) {
            const next_size = top_size[k];
            top_size[k] = carry_size;
            if (next_size) |n| {
              carry_size = n;
            } else {
              break;
            }
          }
        }
      }

      debug.log("fetcher: top tarballs by time", .{});
      for (top_time, 0..) |maybe_stat, i| {
        if (maybe_stat) |stat| {
          debug.log("  {d}. {s} {d}ms {d} bytes", .{ i + 1, stat.url, stat.elapsed_ms, stat.bytes });
        }
      }
      debug.log("fetcher: top tarballs by size", .{});
      for (top_size, 0..) |maybe_stat, i| {
        if (maybe_stat) |stat| {
          debug.log("  {d}. {s} {d} bytes {d}ms", .{ i + 1, stat.url, stat.bytes, stat.elapsed_ms });
        }
      }
    }
  }

  pub fn fetchMetadata(self: *Fetcher, package_name: []const u8, allocator: std.mem.Allocator) ![]u8 {
    return self.fetchMetadataFull(package_name, false, allocator);
  }

  const DecodedMetadata = struct {
    data: []u8,
    compressed: bool,
  };

  fn metaClientCanQueue(c: *const Http2Client) bool {
    return c.h2_session != null and c.connected == 1 and c.request_count < MAX_PENDING_REQUESTS - 1;
  }

  fn nextMetaClient(self: *Fetcher, conn_idx: *usize) ?*Http2Client {
    var attempts: usize = 0;
    while (attempts < NUM_META_CONNECTIONS) : (attempts += 1) {
      if (self.meta_clients[conn_idx.*]) |c| {
        if (metaClientCanQueue(c)) return c;
      }
      conn_idx.* = (conn_idx.* + 1) % NUM_META_CONNECTIONS;
    }
    return null;
  }

  fn flushMetaClients(self: *Fetcher) void {
    for (self.meta_clients) |maybe_client| {
      if (maybe_client) |c| c.flush() catch {};
    }
  }

  fn metaRequestsComplete(self: *Fetcher) bool {
    for (self.meta_clients) |maybe_client| {
      if (maybe_client) |c| {
        for (c.requests[0..c.request_count]) |*req| {
          if (!req.done and !req.has_error) return false;
        }
      }
    }
    return true;
  }

  fn decodeMetadataOwned(
    req: *Http2Client.RequestState,
    allocator: std.mem.Allocator,
    decompress_buf: *std.ArrayListUnmanaged(u8),
  ) ?DecodedMetadata {
    if (req.has_error or req.status_code != 200) return null;

    if (req.content_encoding != .gzip) {
      const data = allocator.dupe(u8, req.response_body.items) catch return null;
      return .{ .data = data, .compressed = false };
    }

    decompress_buf.clearRetainingCapacity();
    const decomp = extractor.GzipDecompressor.init(allocator) catch return null;
    defer decomp.deinit();

    _ = decomp.decompress(req.response_body.items, struct {
      fn onChunk(data: []const u8, ctx: ?*anyopaque) anyerror!void {
        const buf: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
        try buf.appendSlice(c_allocator, data);
      }
    }.onChunk, decompress_buf) catch return null;

    const data = allocator.dupe(u8, decompress_buf.items) catch return null;
    return .{ .data = data, .compressed = true };
  }

  pub fn fetchMetadataFull(self: *Fetcher, package_name: []const u8, full: bool, allocator: std.mem.Allocator) ![]u8 {
    try self.ensureMetaClients();
    self.clearLastHttpError();
    for (self.meta_clients) |maybe_client| {
      if (maybe_client) |client| {
        var path_buf: [512]u8 = undefined;
        const path_slice = std.fmt.bufPrint(&path_buf, "/{s}", .{package_name}) catch return error.OutOfMemory;
        const accept: [:0]const u8 = if (full) "application/json" else "application/vnd.npm.install-v1+json";
        return client.getWithAccept(path_slice, accept, allocator) catch |err| {
          if (err == error.ResponseError) {
            var url_buf: [1024]u8 = undefined;
            const url = std.fmt.bufPrint(&url_buf, "https://{s}/{s}", .{ self.registry_host, package_name }) catch "";
            self.setLastHttpError(url, client.last_response_status_code);
          }
          return err;
        };
      }
    }
    return error.ConnectionFailed;
  }

  pub const MetadataResult = struct {
    name: []const u8,
    data: ?[]u8,
    compressed: bool,
    has_error: bool,
  };

  fn storeMetadataBatchResult(
    req: *Http2Client.RequestState,
    result: *MetadataResult,
    allocator: std.mem.Allocator,
    decompress_buf: *std.ArrayListUnmanaged(u8),
  ) bool {
    const decoded = decodeMetadataOwned(req, allocator, decompress_buf) orelse {
      result.has_error = true;
      return false;
    };
    result.data = decoded.data;
    result.compressed = decoded.compressed;
    return true;
  }

  pub fn fetchMetadataBatch(self: *Fetcher, names: []const []const u8, allocator: std.mem.Allocator) ![]MetadataResult {
    if (names.len == 0) return &[_]MetadataResult{};

    var total_start: u64 = @intCast(std.time.nanoTimestamp());
    try self.ensureMetaClients();
    total_start = debug.timer("  meta: get clients", total_start);

    var active_connections: usize = 0;
    for (self.meta_clients) |maybe_client| {
      if (maybe_client != null) active_connections += 1;
    } if (active_connections == 0) return error.ConnectionFailed;

    debug.log("  meta: batch {d} packages across {d} connections", .{ names.len, active_connections });

    var results = try allocator.alloc(MetadataResult, names.len);
    for (results, 0..) |*r, i| {
      r.* = .{ .name = names[i], .data = null, .compressed = false, .has_error = false };
    }

    const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
    var offset: usize = 0;
    var batch_num: usize = 0;

    var decompress_buf = std.ArrayListUnmanaged(u8){};
    defer decompress_buf.deinit(c_allocator);

    while (offset < names.len) {
      const end = @min(offset + total_capacity, names.len);
      var batch_start: u64 = @intCast(std.time.nanoTimestamp());
      debug.log("  meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });

      var queued: usize = 0;
      var conn_idx: usize = 0;
      for (offset..end) |i| {
        const result = &results[i];
        const name = names[i];

        const c = self.nextMetaClient(&conn_idx) orelse {
          result.has_error = true; continue;
        };
        
        const session = c.h2_session orelse {
          result.has_error = true; continue;
        };
        
        var path_buf: [512]u8 = undefined;
        const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch {
          result.has_error = true;
          continue;
        };

        var hdrs = [_]nghttp2.nv{
          Http2Client.makeNv(":method", "GET"),
          Http2Client.makeNv(":path", c.allocator.dupeZ(u8, path) catch {
            result.has_error = true; continue;
          }),
          Http2Client.makeNv(":scheme", "https"),
          Http2Client.makeNv(":authority", c.host),
          Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
          Http2Client.makeNv("accept-encoding", "gzip"),
          Http2Client.makeNv("user-agent", user_agent),
        };

        const req = &c.requests[c.request_count];
        c.request_count += 1;
        req.* = .{
          .stream_id = 0,
          .path = hdrs[1].value[0..hdrs[1].valuelen :0],
          .on_data = null,
          .on_complete = null,
          .on_error = null,
          .userdata = result,
          .response_body = .{},
          .status_code = 0,
          .done = false,
          .has_error = false,
          .start_ns = @intCast(std.time.nanoTimestamp()),
          .end_ns = 0,
          .bytes = 0,
          .content_encoding = .identity,
        };

        const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
        if (sid < 0) {
          c.request_count -= 1;
          if (req.path) |p| c.allocator.free(p);
          result.has_error = true;
          continue;
        }
        req.stream_id = sid;
        queued += 1;
        conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
      }
      
      batch_start = debug.timer("  meta: queue requests", batch_start);
      self.flushMetaClients();

      const loop = uv.uv_default_loop();
      var all_done = false;
      var loops: usize = 0;
      const run_start: u64 = @intCast(std.time.nanoTimestamp());

      while (!all_done) {
        _ = uv.uv_run(loop, uv.RUN_ONCE);
        loops += 1;
        all_done = self.metaRequestsComplete();
      }

      const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
      debug.log("    h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });
      batch_start = debug.timer("  meta: run h2 loop", batch_start);

      var slow_count: usize = 0;
      var max_req_ms: u64 = 0;
      var max_req_name: []const u8 = "";
      var total_bytes: usize = 0;
      for (self.meta_clients) |maybe_client| {
        if (maybe_client) |c| {
          for (c.requests[0..c.request_count]) |*req| {
            const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
            const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
            const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
            total_bytes += req.response_body.items.len;
            if (duration_ms > max_req_ms) {
              max_req_ms = duration_ms;
              max_req_name = result.name;
            }
            if (duration_ms >= META_SLOW_LOG_MS) {
              slow_count += 1;
              debug.log("    meta: slow {s} {d}ms {d} bytes status={d}", .{
                result.name,
                duration_ms,
                req.response_body.items.len,
                req.status_code,
              });
            }
          }
        }
      }
      debug.log("    meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });

      var success: usize = 0;
      for (self.meta_clients) |maybe_client| {
        if (maybe_client) |c| {
          for (c.requests[0..c.request_count]) |*req| {
            const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
            if (storeMetadataBatchResult(req, result, allocator, &decompress_buf)) success += 1;
          }
          c.resetRequests();
        }
      }
      _ = debug.timer("  meta: copy results", batch_start);
      debug.log("  meta: queued={d} success={d}", .{ queued, success });

      offset = end;
      batch_num += 1;
    }

    return results;
  }

  pub const MetadataCallback = *const fn (
    name: []const u8,
    data: ?[]const u8,
    has_error: bool,
    userdata: ?*anyopaque
  ) void;

  const MetadataStreamTracker = struct {
    name: []const u8,
    index: usize,
  };

  fn emitMetadataStreamingResult(
    req: *Http2Client.RequestState,
    name: []const u8,
    allocator: std.mem.Allocator,
    decompress_buf: *std.ArrayListUnmanaged(u8),
    callback: MetadataCallback,
    userdata: ?*anyopaque,
  ) void {
    if (req.has_error or req.status_code != 200) {
      callback(name, null, true, userdata);
      return;
    }

    if (req.content_encoding != .gzip) {
      callback(name, req.response_body.items, false, userdata);
      return;
    }

    decompress_buf.clearRetainingCapacity();
    const decomp = extractor.GzipDecompressor.init(allocator) catch {
      callback(name, null, true, userdata);
      return;
    };
    defer decomp.deinit();

    _ = decomp.decompress(req.response_body.items, struct {
      fn onChunk(data: []const u8, ctx: ?*anyopaque) anyerror!void {
        const buf: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
        try buf.appendSlice(c_allocator, data);
      }
    }.onChunk, decompress_buf) catch {
      callback(name, null, true, userdata);
      return;
    };

    callback(name, decompress_buf.items, false, userdata);
  }

  fn emitCompletedStreamingMetadataCallbacks(
    self: *Fetcher,
    processed: []bool,
    allocator: std.mem.Allocator,
    decompress_buf: *std.ArrayListUnmanaged(u8),
    callback: MetadataCallback,
    userdata: ?*anyopaque,
  ) void {
    for (self.meta_clients) |maybe_client| {
      const c = maybe_client orelse continue;
      for (c.requests[0..c.request_count]) |*req| {
        if (!req.done and !req.has_error) continue;

        const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
        if (processed[tracker.index]) continue;

        processed[tracker.index] = true;
        emitMetadataStreamingResult(req, tracker.name, allocator, decompress_buf, callback, userdata);
      }
    }
  }

  pub fn fetchMetadataStreaming(
    self: *Fetcher,
    names: []const []const u8,
    allocator: std.mem.Allocator,
    callback: MetadataCallback,
    userdata: ?*anyopaque,
  ) !void {
    if (names.len == 0) return;

    var total_start: u64 = @intCast(std.time.nanoTimestamp());
    try self.ensureMetaClients();
    total_start = debug.timer("  meta: get clients", total_start);

    var active_connections: usize = 0;
    for (self.meta_clients) |maybe_client| {
      if (maybe_client != null) active_connections += 1;
    }
    
    if (active_connections == 0) return error.ConnectionFailed;
    debug.log("  meta: streaming {d} packages across {d} connections", .{ names.len, active_connections });

    const processed = try allocator.alloc(bool, names.len);
    defer allocator.free(processed);
    @memset(processed, false);

    var trackers = try allocator.alloc(MetadataStreamTracker, names.len);
    defer allocator.free(trackers);
    for (names, 0..) |name, i| {
      trackers[i] = .{ .name = name, .index = i };
    }

    const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
    var offset: usize = 0;
    var batch_num: usize = 0;

    var decompress_buf = std.ArrayListUnmanaged(u8){};
    defer decompress_buf.deinit(c_allocator);

    while (offset < names.len) {
      const end = @min(offset + total_capacity, names.len);
      var batch_start: u64 = @intCast(std.time.nanoTimestamp());

      debug.log("  meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });

      var queued: usize = 0;
      var conn_idx: usize = 0;
      for (offset..end) |i| {
        const tracker = &trackers[i];
        const name = names[i];

        const c = self.nextMetaClient(&conn_idx) orelse continue;
        const session = c.h2_session orelse continue;
        
        var path_buf: [512]u8 = undefined;
        const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch continue;

        var hdrs = [_]nghttp2.nv{
          Http2Client.makeNv(":method", "GET"),
          Http2Client.makeNv(":path", c.allocator.dupeZ(u8, path) catch continue),
          Http2Client.makeNv(":scheme", "https"),
          Http2Client.makeNv(":authority", c.host),
          Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
          Http2Client.makeNv("accept-encoding", "gzip"),
          Http2Client.makeNv("user-agent", user_agent),
        };

        const req = &c.requests[c.request_count];
        c.request_count += 1;
        req.* = .{
          .stream_id = 0,
          .path = hdrs[1].value[0..hdrs[1].valuelen :0],
          .on_data = null,
          .on_complete = null,
          .on_error = null,
          .userdata = tracker,
          .response_body = .{},
          .status_code = 0,
          .done = false,
          .has_error = false,
          .start_ns = 0,
          .end_ns = 0,
          .bytes = 0,
          .content_encoding = .identity,
        };
        req.start_ns = @intCast(std.time.nanoTimestamp());

        const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
        if (sid < 0) {
          c.request_count -= 1;
          if (req.path) |p| c.allocator.free(p);
          continue;
        }
        req.stream_id = sid;
        queued += 1;
        conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
      }
      
      batch_start = debug.timer("  meta: queue requests", batch_start);
      self.flushMetaClients();

      const loop = uv.uv_default_loop();
      var all_done = false;
      var loops: usize = 0;
      const run_start: u64 = @intCast(std.time.nanoTimestamp());

      while (!all_done) {
        _ = uv.uv_run(loop, uv.RUN_ONCE);
        loops += 1;
        self.emitCompletedStreamingMetadataCallbacks(processed, allocator, &decompress_buf, callback, userdata);

        all_done = self.metaRequestsComplete();
      }

      const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
      debug.log("    h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });

      var slow_count: usize = 0;
      var max_req_ms: u64 = 0;
      var max_req_name: []const u8 = "";
      var total_bytes: usize = 0;
      for (self.meta_clients) |maybe_client| {
        if (maybe_client) |c| {
          for (c.requests[0..c.request_count]) |*req| {
            const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
            const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
            const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
            total_bytes += req.response_body.items.len;
            if (duration_ms > max_req_ms) {
              max_req_ms = duration_ms;
              max_req_name = tracker.name;
            }
            if (duration_ms >= META_SLOW_LOG_MS) {
              slow_count += 1;
              debug.log("    meta: slow {s} {d}ms {d} bytes status={d}", .{
                tracker.name,
                duration_ms,
                req.response_body.items.len,
                req.status_code,
              });
            }
          }
        }
      }
      debug.log("    meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });

      for (self.meta_clients) |maybe_client| {
        if (maybe_client) |c| c.resetRequests();
      }

      offset = end;
      batch_num += 1;
    }
  }

  pub fn fetchTarball(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
    try self.pending.append(self.allocator, .{ .url = try self.allocator.dupe(u8, url), .handler = handler });
  }

  pub fn run(self: *Fetcher) !void {
    if (self.pending.items.len == 0 and self.tarball_contexts.items.len == 0) return;

    const run_start: u64 = @intCast(std.time.nanoTimestamp());
    const total_requests = self.pending.items.len + self.tarball_contexts.items.len;
    
    debug.log("fetcher: {d} tarballs to download (pending={d}, in-flight={d})", .{
      total_requests,
      self.pending.items.len,
      self.tarball_contexts.items.len,
    });

    try self.ensureTarballClients();
    self.finishTarballs();

    const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
    debug.log("fetcher: {d} tarballs complete in {d}ms", .{ total_requests, elapsed_ns / 1_000_000 });
  }
};
