Паттерн start stop на golang

Рейтинг: 0Ответов: 2Опубликовано: 25.07.2023

Есть ряд компонентов у которых общий функционал запуск и остановка сервиса (с возможностью перезапуска). Надо этот общий функционал выделить в отдельный компонент. Поддержка конкурентности. Описание требований к интерфейсу.

start_stop_pattern

import (
    "context"

    "github.com/pkg/errors"
)

var (
    ErrAlreadyRunning = errors.New("already running")
    ErrNothingStopped = errors.New("nothing stopped")
)

// New
// f функция пользователя с контекстом, через который
// функция может узнать, что был вызван Stop.
// Пользовательская функция не обязана использовать ctx.
func New(f func(ctx context.Context) error) *Service {
    return &Service{}
}

type Service struct{}

// Start
// Запускает переданную f функцию пользователя  и блокируется до тех пор, пока f не будет выполнена.
// Если другой поток вызывает Start и она уже запущена, то возвращает ErrAlreadyRunning.
func (s *Service) Start() error {}

// Stop
// Останавливает выполнение f пользовательской функции, если Start запущен и ждет ее завершения.
// Если Start не работает, то возвращает ошибку ErrNothingStopped.
// После вызова Stop можно снова запускать Start.
func (s *Service) Stop() error {}

Тесты

package start_stop_pattern

import (
    "context"
    "log"
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestService(t *testing.T) {
    t.Run("race test", func(t *testing.T) {
        srv := New(func(ctx context.Context) error {
            for {
                select {
                case <-ctx.Done():
                    return nil
                case <-time.NewTicker(1 * time.Millisecond).C:
                }
            }
        }, time.Millisecond*10)

        wg := sync.WaitGroup{}
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go func() {
                _ = srv.Start()
                wg.Done()
            }()

            wg.Add(1)
            go func() {
                srv.Stop()
                wg.Done()
            }()
        }

        wg.Wait()
    })
    t.Run("running with done", func(t *testing.T) {
        srv := New(func(ctx context.Context) error {
            for {
                select {
                case <-ctx.Done():
                    log.Println("service stop")
                    return nil
                case <-time.NewTicker(100 * time.Millisecond).C:
                    log.Println("some do")
                }
            }
        }, time.Millisecond*100)

        done := make(chan struct{})
        go func() {
            time.Sleep(1 * time.Second)
            require.ErrorIs(t, srv.Start(), ErrAlreadyRunning)
            require.NoError(t, srv.Stop())

            time.Sleep(1 * time.Second)
            require.ErrorIs(t, srv.Stop(), ErrNothingToStop)

            close(done)
        }()

        require.NoError(t, srv.Start())

        <-done
    })
    t.Run("running without done", func(t *testing.T) {
        srv := New(func(ctx context.Context) error {
            time.Sleep(1 * time.Second)
            return nil
        }, time.Millisecond*10)

        done := make(chan struct{})
        go func() {
            time.Sleep(100 * time.Millisecond)
            require.ErrorIs(t, srv.Start(), ErrAlreadyRunning)
            require.ErrorIs(t, srv.Stop(), ErrNothingStopped)

            time.Sleep(100 * time.Millisecond)
            require.ErrorIs(t, srv.Stop(), ErrNothingStopped)

            close(done)
        }()

        require.NoError(t, srv.Start())

        <-done

        srv.Stop()
    })
}

Не получается реализовать функционал, удовлетворящий всем требованиям.

Ответы

▲ 0

Я бы делал так:

package start_stop_pattern

import (
    "context"
    "sync"

    "github.com/pkg/errors"
)

var (
    ErrAlreadyRunning = errors.New("already running")
    ErrNothingStopped = errors.New("nothing stopped")
)

type TheFunc func(context.Context) error

type Service struct {
    mx        sync.Mutex
    f         TheFunc
    running   bool
    cancel    context.CancelFunc
    wg        *sync.WaitGroup
    lastError error
}

// New
// f функция пользователя с контекстом, через который
// функция может узнать, что был вызван Stop.
// Пользовательская функция не обязана использовать ctx.
func New(f TheFunc) *Service {
    return &Service{

        f: f,
    }
}

// Start
// Запускает переданную f функцию пользователя.
// Если Start уже запущен, то возвращает ErrAlreadyRunning.
func (s *Service) Start(ctx context.Context) error {
    s.mx.Lock()
    defer s.mx.Unlock()
    return s.start(ctx)
}

// TryStart
// Пытается запустить переданную f функцию пользователя.
// Если в текущий момент времени в другой горутине выполняется Start или Stop, возвращает `false, nil`
// Если функция уже выполняется, то возвращает true, ErrAlreadyRunning.
// В случае успешного запуска возвращает `true, nil`
func (s *Service) TryStart(ctx context.Context) (bool, error) {
    ok := s.mx.TryLock()
    if !ok {
        return false, nil
    }
    defer s.mx.Unlock()
    return true, s.start(ctx)
}

// Служебная функция для запуска
func (s *Service) start(ctx context.Context) error {
    if s.running {
        return ErrAlreadyRunning
    }
    s.running = true
    s.lastError = nil
    s.wg = &sync.WaitGroup{}
    ctx, s.cancel = context.WithCancel(ctx)

    s.wg.Add(1)
    go runner(s.f, ctx, s.wg, &s.lastError)
    return nil
}

// запускает функцию и ждёт завершение.
// Записывает результат по указателю `p_error`
// Уменьшает счётчик группы синхронизации
func runner(f TheFunc, ctx context.Context, wg *sync.WaitGroup, p_error *error) {
    defer wg.Done()
    err := f(ctx)
    if p_error != nil {
        *p_error = err
    }
}

// Stop
// Останавливает выполнение f пользовательской функции, если Start запущен и ждет ее завершения.
// Если Start не работает, то возвращает ошибку ErrNothingStopped.
// После вызова Stop можно снова запускать Start.
func (s *Service) Stop() error {
    s.mx.Lock()
    defer s.mx.Unlock()

    if !s.running {
        return ErrNothingStopped
    }
    s.cancel()
    s.wg.Wait()
    s.running = false
    return nil
}

// Возвращает `true` если функция запущена.
func (s *Service) IsRunning() bool {
    return s.running
}

// Возвращает ошибку последнего запуска
func (s *Service) LastError() error {
    return s.lastError
}

Никаких гонок, так как все записи в состояние происходят под защитой мутекса. Добавлен метод TryStart для пробного запуска, и методы IsRunning и LastError для проверки запуска и для доступа к ошибке, возвращённой функцией f.

▲ 0

Ммм... вы уверены, что у вас правильные тесты?

Решение:

package start_stop_pattern

import (
    "context"
    "errors"
    "sync"
    "time"
)

var (
    ErrAlreadyRunning = errors.New("already running")
    ErrNothingStopped = errors.New("nothing stopped")
)

// New
// f функция пользователя с контекстом, через который
// функция может узнать, что был вызван Stop.
// Пользовательская функция не обязана использовать ctx.
// func New(f func(ctx context.Context) error) *Service {
func New(f func(ctx context.Context) error, timeout time.Duration) *Service {
    done := make(chan struct{})
    close(done)
    return &Service{f: f, done: done, timeout: timeout}
}

type Service struct {
    f       func(ctx context.Context) error
    cancel  context.CancelFunc
    timeout time.Duration
    mu      sync.RWMutex  // защищаем изменение Service, устраняем гонки
    rmu     sync.Mutex    // гарантируем уникальность выполнения f
    done    chan struct{} // закрыт, когда Service is not running
}

// Start
// Запускает переданную f функцию пользователя  и блокируется до тех пор, пока f не будет выполнена.
// Если другой поток вызывает Start и она уже запущена, то возвращает ErrAlreadyRunning.
func (s *Service) Start() error {
    done := make(chan struct{})
    defer close(done)

    if !s.rmu.TryLock() {
        return ErrAlreadyRunning
    }
    defer s.rmu.Unlock()

    ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
    defer cancel()

    s.mu.Lock()
    s.done = done
    s.cancel = cancel
    s.mu.Unlock()

    if err := s.f(ctx); err != nil {
        return err
    }

    return nil
}

// Stop
// Останавливает выполнение f пользовательской функции, если Start запущен и ждет ее завершения.
// Если Start не работает, то возвращает ошибку ErrNothingStopped.
// После вызова Stop можно снова запускать Start.
func (s *Service) Stop() error {
    s.mu.RLock()
    defer s.mu.RUnlock()

    select {
    case <-s.done:
        return ErrNothingStopped
    default:
    }

    s.cancel()
    <-s.done

    return nil
}

Тесты:

package start_stop_pattern

import (
    "context"
    "log"
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestService(t *testing.T) {
    t.Run("race test", func(t *testing.T) {
        srv := New(func(ctx context.Context) error {
            for {
                select {
                case <-ctx.Done():
                    return nil
                case <-time.NewTicker(1 * time.Millisecond).C:
                }
            }
        }, time.Millisecond*10)

        wg := sync.WaitGroup{}
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go func() {
                _ = srv.Start()
                wg.Done()
            }()

            wg.Add(1)
            go func() {
                srv.Stop()
                wg.Done()
            }()
        }

        wg.Wait()
    })
    t.Run("running with done", func(t *testing.T) {
        srv := New(func(ctx context.Context) error {
            for {
                select {
                case <-ctx.Done():
                    log.Println("service stop")
                    return nil
                case <-time.NewTicker(100 * time.Millisecond).C:
                    log.Println("some do")
                }
            }
        }, time.Millisecond*1000)
        //}, time.Millisecond*100)

        done := make(chan struct{})
        go func() {
            //time.Sleep(1 * time.Second)
            time.Sleep(150 * time.Millisecond)
            require.ErrorIs(t, srv.Start(), ErrAlreadyRunning)
            require.NoError(t, srv.Stop())

            //time.Sleep(1 * time.Second)
            time.Sleep(100 * time.Millisecond)
            //require.ErrorIs(t, srv.Stop(), ErrNothingToStop)
            require.ErrorIs(t, srv.Stop(), ErrNothingStopped)

            close(done)
        }()

        require.ErrorIs(t, srv.Stop(), ErrNothingStopped)
        require.NoError(t, srv.Start())

        <-done
    })
    t.Run("running without done", func(t *testing.T) {
        srv := New(func(ctx context.Context) error {
            time.Sleep(1 * time.Second)
            return nil
        }, time.Millisecond*10)

        done := make(chan struct{})
        go func() {
            time.Sleep(100 * time.Millisecond)
            require.ErrorIs(t, srv.Start(), ErrAlreadyRunning)
            //require.ErrorIs(t, srv.Stop(), ErrNothingStopped)
            require.NoError(t, srv.Stop())

            time.Sleep(100 * time.Millisecond)
            require.ErrorIs(t, srv.Stop(), ErrNothingStopped)

            close(done)
        }()

        require.ErrorIs(t, srv.Stop(), ErrNothingStopped)
        require.NoError(t, srv.Start())

        <-done

        //srv.Stop()
    })
}