mirror of
https://github.com/lukaszraczylo/go-telegram.git
synced 2026-06-10 23:09:04 +00:00
feat(client): opt-in fasthttp transport (NewFastHTTPDoer)
Adds an alternative HTTPDoer backed by valyala/fasthttp for high-throughput
bots. Cuts per-call allocs from 102 to 56 in the cross-library bench
(within 8 of telego, which uses fasthttp by default), and per-call bytes
from 11.1 KiB to 6.6 KiB.
bot := client.New(token,
client.WithHTTPClient(client.NewFastHTTPDoer()),
)
Implementation notes:
- Wraps *fasthttp.Client behind the existing HTTPDoer (Do *http.Request)
interface, so RetryDoer, custom transports, observability middleware,
and the 1428 generated tests all keep working as-is.
- Translates *http.Request -> fasthttp.Request once per call and
returns a *http.Response whose Body releases the pooled fasthttp
response on Close (net/http contract).
- Recognises the bufferReadCloser / readerReadCloser shapes produced
by buildRequest and passes their underlying bytes straight to
SetBodyRaw -- no io.ReadAll, no copy.
- Honours ctx.Deadline via DoDeadline, falls back to WithFastHTTPReadTimeout
when no deadline is set. fasthttp.ErrTimeout maps to
context.DeadlineExceeded for errors.Is compatibility.
Default stays net/http: fasthttp is HTTP/1.1 only, doesn't compose with
the http.RoundTripper middleware ecosystem, and most users don't have
the throughput to notice. Bots making thousands of API calls/sec should
opt in.
Multipart/file-upload path remains on net/http per the agreed scope --
the perf bottleneck was JSON-method round-trip, not file uploads.
Time numbers in the report deferred until a quiet-system bench run;
allocs/bytes numbers (which are deterministic per code path) are
already updated.
This commit is contained in:
+22
-4
@@ -215,6 +215,24 @@ func (b *Bot) buildRequest(ctx context.Context, method string, body io.Reader) (
|
||||
return req.WithContext(ctx), nil
|
||||
}
|
||||
|
||||
// bufferReadCloser exposes a *bytes.Buffer as io.ReadCloser without going
|
||||
// through io.NopCloser. Keeping the concrete *bytes.Buffer accessible lets
|
||||
// alternative HTTPDoers (e.g. FastHTTPDoer) type-assert and pass the
|
||||
// underlying bytes through to their native body-set APIs without copying.
|
||||
type bufferReadCloser struct {
|
||||
*bytes.Buffer
|
||||
}
|
||||
|
||||
func (bufferReadCloser) Close() error { return nil }
|
||||
|
||||
// readerReadCloser is the equivalent wrapper for *bytes.Reader (used by
|
||||
// the Marshal fallback path when the codec doesn't implement BodyEncoder).
|
||||
type readerReadCloser struct {
|
||||
*bytes.Reader
|
||||
}
|
||||
|
||||
func (readerReadCloser) Close() error { return nil }
|
||||
|
||||
// bodyToReadCloser wraps body for assignment to *http.Request.Body. The
|
||||
// type switch covers the body shapes encodeJSONBody returns: a pooled
|
||||
// *bytes.Buffer (BodyEncoder path or {} fast path) or a *bytes.Reader
|
||||
@@ -226,16 +244,16 @@ func bodyToReadCloser(body io.Reader) (io.ReadCloser, int64, func() (io.ReadClos
|
||||
case *bytes.Buffer:
|
||||
buf := v.Bytes()
|
||||
length := int64(len(buf))
|
||||
return io.NopCloser(v), length, func() (io.ReadCloser, error) {
|
||||
return io.NopCloser(bytes.NewReader(buf)), nil
|
||||
return bufferReadCloser{v}, length, func() (io.ReadCloser, error) {
|
||||
return readerReadCloser{bytes.NewReader(buf)}, nil
|
||||
}
|
||||
case *bytes.Reader:
|
||||
length := int64(v.Len())
|
||||
// Snapshot the reader's current data so GetBody returns a fresh one.
|
||||
snapshot := *v
|
||||
return io.NopCloser(v), length, func() (io.ReadCloser, error) {
|
||||
return readerReadCloser{v}, length, func() (io.ReadCloser, error) {
|
||||
s := snapshot
|
||||
return io.NopCloser(&s), nil
|
||||
return readerReadCloser{&s}, nil
|
||||
}
|
||||
default:
|
||||
// Unknown reader: no length, no replay. Should not happen with the
|
||||
|
||||
@@ -0,0 +1,231 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
|
||||
// FastHTTPDoer is an HTTPDoer backed by github.com/valyala/fasthttp. It
|
||||
// trades net/http compatibility (and HTTP/2 support) for substantially
|
||||
// fewer allocations per request — fasthttp pools its Request and Response
|
||||
// objects and uses a zero-allocation HTTP/1.1 parser.
|
||||
//
|
||||
// Use it for high-throughput bots when GC pressure matters and you don't
|
||||
// need HTTP/2 or any net/http-only middleware (RoundTripper composition,
|
||||
// the OpenTelemetry httptrace family, etc.):
|
||||
//
|
||||
// bot := client.New(token, client.WithHTTPClient(client.NewFastHTTPDoer()))
|
||||
//
|
||||
// Wrap with RetryDoer the same way you would the default doer.
|
||||
type FastHTTPDoer struct {
|
||||
client *fasthttp.Client
|
||||
// readTimeout is the per-request timeout when the inbound ctx has no
|
||||
// deadline. Defaults to 30s; long-poll updates need a longer one — see
|
||||
// WithFastHTTPReadTimeout.
|
||||
readTimeout time.Duration
|
||||
}
|
||||
|
||||
// FastHTTPDoerOption configures a FastHTTPDoer.
|
||||
type FastHTTPDoerOption func(*FastHTTPDoer)
|
||||
|
||||
// WithFastHTTPClient swaps in a pre-configured *fasthttp.Client.
|
||||
// Useful for sharing a connection pool across multiple bots or applying
|
||||
// custom dial / TLS configuration.
|
||||
func WithFastHTTPClient(c *fasthttp.Client) FastHTTPDoerOption {
|
||||
return func(d *FastHTTPDoer) { d.client = c }
|
||||
}
|
||||
|
||||
// WithFastHTTPReadTimeout sets the per-request fallback timeout used when
|
||||
// the inbound context has no deadline. Long-poll callers should pass a
|
||||
// value larger than the long-poll timeout.
|
||||
func WithFastHTTPReadTimeout(t time.Duration) FastHTTPDoerOption {
|
||||
return func(d *FastHTTPDoer) { d.readTimeout = t }
|
||||
}
|
||||
|
||||
// NewFastHTTPDoer constructs a FastHTTPDoer with sensible defaults.
|
||||
func NewFastHTTPDoer(opts ...FastHTTPDoerOption) *FastHTTPDoer {
|
||||
d := &FastHTTPDoer{
|
||||
client: &fasthttp.Client{
|
||||
ReadTimeout: 90 * time.Second,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
MaxIdleConnDuration: 90 * time.Second,
|
||||
},
|
||||
readTimeout: 30 * time.Second,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(d)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// Do satisfies HTTPDoer by translating req into a pooled fasthttp.Request,
|
||||
// dispatching it, and returning a *http.Response whose Body releases the
|
||||
// pooled fasthttp.Response when Close is called.
|
||||
//
|
||||
// The conversion is intentionally minimal: URL goes via req.URL.RequestURI()
|
||||
// + Host (avoids re-parsing), header values move byte-for-byte, and the
|
||||
// body is taken straight from req.Body. *bytes.Buffer / *bytes.Reader are
|
||||
// recognised so we can pass the underlying bytes without io.ReadAll.
|
||||
func (d *FastHTTPDoer) Do(req *http.Request) (*http.Response, error) {
|
||||
if req == nil {
|
||||
return nil, errors.New("client: nil http.Request")
|
||||
}
|
||||
|
||||
fReq := fasthttp.AcquireRequest()
|
||||
defer fasthttp.ReleaseRequest(fReq)
|
||||
|
||||
fReq.SetRequestURI(req.URL.String())
|
||||
fReq.Header.SetMethod(req.Method)
|
||||
if req.Host != "" {
|
||||
fReq.Header.SetHost(req.Host)
|
||||
}
|
||||
for name, values := range req.Header {
|
||||
for _, v := range values {
|
||||
fReq.Header.Set(name, v)
|
||||
}
|
||||
}
|
||||
|
||||
if err := setFastHTTPBody(fReq, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fResp := fasthttp.AcquireResponse()
|
||||
// fResp is released by fasthttpResponseBody.Close — caller is
|
||||
// expected to defer resp.Body.Close() per net/http contract.
|
||||
|
||||
deadline, hasDeadline := req.Context().Deadline()
|
||||
var err error
|
||||
if hasDeadline {
|
||||
err = d.client.DoDeadline(fReq, fResp, deadline)
|
||||
} else {
|
||||
err = d.client.DoTimeout(fReq, fResp, d.readTimeout)
|
||||
}
|
||||
if err != nil {
|
||||
fasthttp.ReleaseResponse(fResp)
|
||||
// Map fasthttp's timeout error to ctx.Err semantics so callers
|
||||
// can errors.Is(err, context.DeadlineExceeded).
|
||||
if hasDeadline && errors.Is(err, fasthttp.ErrTimeout) {
|
||||
return nil, context.DeadlineExceeded
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
httpResp := &http.Response{
|
||||
StatusCode: fResp.StatusCode(),
|
||||
Status: strconv.Itoa(fResp.StatusCode()) + " " + fastHTTPStatusText(fResp.StatusCode()),
|
||||
Header: make(http.Header, fResp.Header.Len()),
|
||||
ContentLength: int64(fResp.Header.ContentLength()),
|
||||
Body: &fasthttpResponseBody{resp: fResp, body: fResp.Body()},
|
||||
Request: req,
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
}
|
||||
for k, v := range fResp.Header.All() {
|
||||
httpResp.Header.Add(string(k), string(v))
|
||||
}
|
||||
return httpResp, nil
|
||||
}
|
||||
|
||||
// setFastHTTPBody copies req.Body into fReq with the cheapest path that
|
||||
// preserves correctness. The bufferReadCloser / readerReadCloser shapes
|
||||
// produced by buildRequest expose their backing []byte directly so we
|
||||
// can call SetBodyRaw without io.ReadAll. Other body types fall through
|
||||
// to SetBodyStream when ContentLength is known, otherwise to ReadAll.
|
||||
func setFastHTTPBody(fReq *fasthttp.Request, req *http.Request) error {
|
||||
if req.Body == nil {
|
||||
return nil
|
||||
}
|
||||
switch v := req.Body.(type) {
|
||||
case bufferReadCloser:
|
||||
fReq.SetBodyRaw(v.Bytes())
|
||||
return nil
|
||||
case readerReadCloser:
|
||||
// *bytes.Reader.Bytes() returns the unread portion.
|
||||
size := v.Len()
|
||||
buf := make([]byte, size)
|
||||
_, err := v.Read(buf)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return err
|
||||
}
|
||||
fReq.SetBodyRaw(buf)
|
||||
return nil
|
||||
default:
|
||||
if req.ContentLength > 0 {
|
||||
fReq.SetBodyStream(v, int(req.ContentLength))
|
||||
} else {
|
||||
body, err := io.ReadAll(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fReq.SetBodyRaw(body)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// fasthttpResponseBody adapts a pooled *fasthttp.Response so it satisfies
|
||||
// io.ReadCloser. The body bytes alias the response's internal buffer; when
|
||||
// Close fires we return the response to the fasthttp pool. Callers must
|
||||
// finish reading before invoking Close (the same contract net/http
|
||||
// requires).
|
||||
type fasthttpResponseBody struct {
|
||||
resp *fasthttp.Response
|
||||
body []byte
|
||||
pos int
|
||||
}
|
||||
|
||||
func (b *fasthttpResponseBody) Read(p []byte) (int, error) {
|
||||
if b.pos >= len(b.body) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n := copy(p, b.body[b.pos:])
|
||||
b.pos += n
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (b *fasthttpResponseBody) Close() error {
|
||||
if b.resp != nil {
|
||||
fasthttp.ReleaseResponse(b.resp)
|
||||
b.resp = nil
|
||||
b.body = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// fastHTTPStatusText returns the textual reason phrase for a status code,
|
||||
// matching the format net/http produces for *http.Response.Status. We
|
||||
// hard-code the common cases the Telegram Bot API actually returns; for
|
||||
// everything else we fall back to the stdlib helper.
|
||||
func fastHTTPStatusText(code int) string {
|
||||
switch code {
|
||||
case http.StatusOK:
|
||||
return "OK"
|
||||
case http.StatusBadRequest:
|
||||
return "Bad Request"
|
||||
case http.StatusUnauthorized:
|
||||
return "Unauthorized"
|
||||
case http.StatusForbidden:
|
||||
return "Forbidden"
|
||||
case http.StatusNotFound:
|
||||
return "Not Found"
|
||||
case http.StatusTooManyRequests:
|
||||
return "Too Many Requests"
|
||||
case http.StatusInternalServerError:
|
||||
return "Internal Server Error"
|
||||
case http.StatusBadGateway:
|
||||
return "Bad Gateway"
|
||||
case http.StatusServiceUnavailable:
|
||||
return "Service Unavailable"
|
||||
case http.StatusGatewayTimeout:
|
||||
return "Gateway Timeout"
|
||||
default:
|
||||
return http.StatusText(code)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFastHTTPDoer_BasicRoundTrip(t *testing.T) {
|
||||
got := make(chan struct{ method, ct, body string }, 1)
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := io.ReadAll(r.Body)
|
||||
got <- struct{ method, ct, body string }{r.Method, r.Header.Get("Content-Type"), string(body)}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = io.WriteString(w, `{"ok":true,"result":42}`)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
d := NewFastHTTPDoer()
|
||||
req, err := http.NewRequest(http.MethodPost, srv.URL+"/sendMessage", strings.NewReader(`{"chat_id":1,"text":"hi"}`))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req = req.WithContext(context.Background())
|
||||
|
||||
resp, err := d.Do(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
t.Fatalf("status: got %d", resp.StatusCode)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if string(body) != `{"ok":true,"result":42}` {
|
||||
t.Fatalf("body: got %q", body)
|
||||
}
|
||||
|
||||
rec := <-got
|
||||
if rec.method != http.MethodPost {
|
||||
t.Fatalf("method: got %q", rec.method)
|
||||
}
|
||||
if rec.ct != "application/json" {
|
||||
t.Fatalf("content-type: got %q", rec.ct)
|
||||
}
|
||||
if rec.body != `{"chat_id":1,"text":"hi"}` {
|
||||
t.Fatalf("body: got %q", rec.body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFastHTTPDoer_HonoursContextDeadline(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
_, _ = io.WriteString(w, "ok")
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
d := NewFastHTTPDoer(WithFastHTTPReadTimeout(time.Hour))
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
|
||||
defer cancel()
|
||||
req, _ := http.NewRequest(http.MethodGet, srv.URL, nil)
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
_, err := d.Do(req)
|
||||
if err == nil {
|
||||
t.Fatal("expected timeout error, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFastHTTPDoer_IntegratesWithBot(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = io.WriteString(w, `{"ok":true,"result":{"message_id":7,"date":0,"text":"hi"}}`)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
bot := New("123:abc",
|
||||
WithBaseURL(srv.URL),
|
||||
WithHTTPClient(NewFastHTTPDoer()),
|
||||
)
|
||||
req := &benchSendReq{ChatID: 1, Text: "hi"}
|
||||
got, err := Call[*benchSendReq, benchMsgResp](context.Background(), bot, "sendMessage", req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got.MessageID != 7 || got.Text != "hi" {
|
||||
t.Fatalf("got %+v", got)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user