mirror of
https://github.com/lukaszraczylo/go-telegram.git
synced 2026-06-05 22:43:59 +00:00
62c76e7e4e
Both SetWebhookParams.AllowedUpdates and GetUpdatesParams.AllowedUpdates
(plus the WebhookInfo.AllowedUpdates field on the response side) were
typed []string, forcing callers to cast every typed constant:
AllowedUpdates: []string{
string(api.UpdateMessage),
string(api.UpdateMyChatMember),
...
}
Switch to []UpdateType so the same call site is typo-safe end-to-end:
AllowedUpdates: []UpdateType{
api.UpdateMessage,
api.UpdateMyChatMember,
...
}
Wire format is unchanged — UpdateType is type UpdateType string, marshals
identically as JSON strings. The MultipartFields()/runtime.go encoding
paths likewise continue to work via json.Marshal on the typed slice.
Implementation note: api/methods.gen.go and api/types.gen.go are
generated by cmd/genapi from internal/spec/api.json, where the field is
described as Array of String. The Telegram docs do not enumerate the
allowed_updates values inline, so the scraper cannot synthesise the
enum and UpdateType lives hand-curated in api/enums.go (see existing
doc comment there). The retype is therefore done as a small pinned
override inside cmd/genapi/emitter.go's goField — keyed on the wire
field name allowed_updates with elem type string — so the change
survives a future regeneration of the .gen.go files. transport/longpoll
drops the now-unnecessary []string conversion.
Backward incompatibility: callers passing untyped []string variables
will need to convert; callers using untyped string literals inside the
slice ARE fine because Go's untyped-literal rule auto-converts.
126 lines
2.9 KiB
Go
126 lines
2.9 KiB
Go
package transport
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/lukaszraczylo/go-telegram/api"
|
|
"github.com/lukaszraczylo/go-telegram/client"
|
|
)
|
|
|
|
// LongPoller pulls updates via Bot.GetUpdates in a loop, advancing the
|
|
// offset cursor after each batch. It applies BackoffStrategy on transient
|
|
// errors (network failures, 5xx, 429).
|
|
//
|
|
// At-least-once semantics on shutdown: when ctx is cancelled or Stop is
|
|
// called mid-batch, any updates already fetched but not yet dispatched are
|
|
// dropped without advancing the offset. On the next restart those updates
|
|
// will be re-delivered by Telegram.
|
|
type LongPoller struct {
|
|
Bot *client.Bot
|
|
Timeout int // seconds, default 30
|
|
Limit int // 1..100, default 100
|
|
AllowedTypes []api.UpdateType
|
|
Backoff BackoffStrategy
|
|
|
|
out chan api.Update
|
|
once sync.Once
|
|
stop chan struct{}
|
|
}
|
|
|
|
// NewLongPoller constructs a LongPoller with sensible defaults.
|
|
func NewLongPoller(b *client.Bot) *LongPoller {
|
|
return &LongPoller{
|
|
Bot: b,
|
|
Timeout: 30,
|
|
Limit: 100,
|
|
Backoff: DefaultBackoff(),
|
|
out: make(chan api.Update, 64),
|
|
stop: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Updates implements Updater.
|
|
func (p *LongPoller) Updates() <-chan api.Update { return p.out }
|
|
|
|
// Run implements Updater. It blocks until ctx is cancelled, Stop is
|
|
// called, or a fatal error occurs (e.g. unauthorized). See LongPoller
|
|
// for at-least-once delivery semantics on shutdown.
|
|
func (p *LongPoller) Run(ctx context.Context) error {
|
|
defer close(p.out)
|
|
|
|
var offset int64
|
|
failures := 0
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-p.stop:
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
params := &api.GetUpdatesParams{Offset: &offset}
|
|
if p.Limit > 0 {
|
|
lim := int64(p.Limit)
|
|
params.Limit = &lim
|
|
}
|
|
if p.Timeout > 0 {
|
|
to := int64(p.Timeout)
|
|
params.Timeout = &to
|
|
}
|
|
if len(p.AllowedTypes) > 0 {
|
|
params.AllowedUpdates = p.AllowedTypes
|
|
}
|
|
ups, err := api.GetUpdates(ctx, p.Bot, params)
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
|
return err
|
|
}
|
|
// Fatal: unauthorized -> bail.
|
|
if errors.Is(err, client.ErrUnauthorized) {
|
|
return err
|
|
}
|
|
var ae *client.APIError
|
|
var delay time.Duration
|
|
if errors.As(err, &ae) && ae.RetryAfter() > 0 {
|
|
delay = ae.RetryAfter()
|
|
// Don't escalate failures count — Telegram is dictating the wait.
|
|
} else {
|
|
failures++
|
|
delay = p.Backoff.NextDelay(failures)
|
|
}
|
|
select {
|
|
case <-time.After(delay):
|
|
continue
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-p.stop:
|
|
return nil
|
|
}
|
|
}
|
|
failures = 0
|
|
|
|
for _, u := range ups {
|
|
select {
|
|
case p.out <- u:
|
|
if u.UpdateID >= offset {
|
|
offset = u.UpdateID + 1
|
|
}
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-p.stop:
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop implements Updater.
|
|
func (p *LongPoller) Stop(ctx context.Context) error {
|
|
p.once.Do(func() { close(p.stop) })
|
|
return nil
|
|
}
|