2025-05-28 16:19:21 +08:00

398 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/emsoft/HospitalPay-Go/internal/config"
"github.com/emsoft/HospitalPay-Go/internal/model"
"github.com/emsoft/HospitalPay-Go/internal/pkg/errcode"
"github.com/emsoft/HospitalPay-Go/internal/pkg/logger"
"github.com/emsoft/HospitalPay-Go/internal/pkg/metrics"
"github.com/emsoft/HospitalPay-Go/internal/service"
)
// SocketServer Socket 服务器
type SocketServer struct {
criminalService service.ICriminalService
listener net.Listener
connLimit chan struct{}
wg sync.WaitGroup
shutdown chan struct{}
}
// NewSocketServer 创建 Socket 服务器
func NewSocketServer() *SocketServer {
return &SocketServer{
criminalService: service.NewCriminalService(),
connLimit: make(chan struct{}, config.GlobalConfig.Server.MaxConnNum),
shutdown: make(chan struct{}),
}
}
// Start 启动服务器
func (s *SocketServer) Start() error {
var err error
s.listener, err = net.Listen("tcp", ":"+config.GlobalConfig.Server.Port)
if err != nil {
return fmt.Errorf("failed to start socket server: %v", err)
}
// 注册信号处理
go s.handleSignal()
logger.Infof("Socket server started on port %s", config.GlobalConfig.Server.Port)
// 开始接受连接
go s.acceptConnections()
// 等待关闭
<-s.shutdown
return nil
}
// 接受连接
func (s *SocketServer) acceptConnections() {
for {
conn, err := s.listener.Accept()
if err != nil {
select {
case <-s.shutdown:
return
default:
logger.Errorf("Failed to accept connection: %v", err)
continue
}
}
// 记录指标
metrics.ActiveConnections.Inc()
// 并发控制
select {
case s.connLimit <- struct{}{}:
s.wg.Add(1)
go func() {
defer func() {
<-s.connLimit
metrics.ActiveConnections.Dec()
s.wg.Done()
}()
s.handleConnection(conn)
}()
default:
logger.Warnf("Connection limit reached, rejecting connection from %s", conn.RemoteAddr())
conn.Close()
}
}
}
// 处理连接
func (s *SocketServer) handleConnection(conn net.Conn) {
defer conn.Close()
// 设置超时
if err := conn.SetDeadline(time.Now().Add(config.GlobalConfig.Server.ReadTimeout)); err != nil {
logger.Errorf("Failed to set deadline: %v", err)
return
}
// 记录连接信息
remoteAddr := conn.RemoteAddr().String()
logger.Infof("New connection from %s", remoteAddr)
buffer := make([]byte, 4096)
n, err := conn.Read(buffer)
if err != nil {
logger.Errorf("Failed to read from connection: %v", err)
return
}
message := string(buffer[:n])
if len(message) < 31 { // 最小消息长度4(长度) + 4(功能码) + 4(医院编码) + 19(时间戳)
logger.Errorf("Invalid message format from %s: %s", remoteAddr, message)
return
}
// 解析消息
length := message[:4]
functionCode := message[4:8]
hospitalCode := message[8:12]
timestamp := message[12:31]
data := message[31:]
logger.Infof("Received message from %s: length=%s, functionCode=%s, hospitalCode=%s, timestamp=%s, data=%s",
remoteAddr, length, functionCode, hospitalCode, timestamp, data)
// 请求计数
metrics.RequestCounter.WithLabelValues(functionCode).Inc()
start := time.Now()
// 验证医院编码
if hospitalCode != config.GlobalConfig.Server.HospitalCode {
logger.Warnf("Invalid hospital code from %s: %s", remoteAddr, hospitalCode)
s.sendResponse(conn, &model.CriminalResponse{
ResultCode: errcode.InvalidParams.Code,
ResultMsg: "无效的医院编码",
})
return
}
// 创建上下文
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 处理请求
var response *model.CriminalResponse
switch functionCode {
case "0001":
response = s.handleCriminalIn(ctx, data)
case "0002":
response = s.handleConsumeQuota(ctx, data)
case "0003":
response = s.handleCriminalOut(ctx, data)
case "0004":
response = s.handleConsumeRecord(ctx, data)
case "0005":
response = s.handleRealTimeBalance(ctx, data)
case "0006":
response = s.handleInvoiceSync(ctx, data)
default:
response = &model.CriminalResponse{
ResultCode: errcode.ServerError.Code,
ResultMsg: "未知的功能码",
}
}
// 记录处理时间
duration := time.Since(start).Seconds()
metrics.RequestDuration.WithLabelValues(functionCode).Observe(duration)
// 发送响应
s.sendResponse(conn, response)
}
// 处理入院登记
func (s *SocketServer) handleCriminalIn(ctx context.Context, data string) *model.CriminalResponse {
var req model.CriminalRequest
if err := json.Unmarshal([]byte(data), &req); err != nil {
logger.Errorf("Failed to unmarshal CriminalIn request: %v", err)
metrics.ErrorCounter.WithLabelValues("0001", errcode.InvalidParams.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.InvalidParams.Code,
ResultMsg: "请求数据格式错误",
}
}
resp, err := s.criminalService.CriminalIn(ctx, req.FCode)
if err != nil {
logger.Errorf("Failed to process CriminalIn: %v", err)
metrics.ErrorCounter.WithLabelValues("0001", errcode.ServerError.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.ServerError.Code,
ResultMsg: err.Error(),
}
}
return resp
}
// 处理消费额度查询
func (s *SocketServer) handleConsumeQuota(ctx context.Context, data string) *model.CriminalResponse {
var req model.CriminalRequest
if err := json.Unmarshal([]byte(data), &req); err != nil {
logger.Errorf("Failed to unmarshal ConsumeQuota request: %v", err)
metrics.ErrorCounter.WithLabelValues("0002", errcode.InvalidParams.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.InvalidParams.Code,
ResultMsg: "请求数据格式错误",
}
}
resp, err := s.criminalService.ConsumeQuota(ctx, req.FCode)
if err != nil {
logger.Errorf("Failed to process ConsumeQuota: %v", err)
metrics.ErrorCounter.WithLabelValues("0002", errcode.ServerError.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.ServerError.Code,
ResultMsg: err.Error(),
}
}
return resp
}
// 处理出院
func (s *SocketServer) handleCriminalOut(ctx context.Context, data string) *model.CriminalResponse {
var req model.CriminalRequest
if err := json.Unmarshal([]byte(data), &req); err != nil {
logger.Errorf("Failed to unmarshal CriminalOut request: %v", err)
metrics.ErrorCounter.WithLabelValues("0003", errcode.InvalidParams.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.InvalidParams.Code,
ResultMsg: "请求数据格式错误",
}
}
resp, err := s.criminalService.CriminalOut(ctx, req.FCode)
if err != nil {
logger.Errorf("Failed to process CriminalOut: %v", err)
metrics.ErrorCounter.WithLabelValues("0003", errcode.ServerError.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.ServerError.Code,
ResultMsg: err.Error(),
}
}
return resp
}
// 处理消费记录
func (s *SocketServer) handleConsumeRecord(ctx context.Context, data string) *model.CriminalResponse {
var record model.ConsumeRecord
if err := json.Unmarshal([]byte(data), &record); err != nil {
logger.Errorf("Failed to unmarshal ConsumeRecord request: %v", err)
metrics.ErrorCounter.WithLabelValues("0004", errcode.InvalidParams.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.InvalidParams.Code,
ResultMsg: "请求数据格式错误",
}
}
resp, err := s.criminalService.ConsumeRecord(ctx, &record)
if err != nil {
logger.Errorf("Failed to process ConsumeRecord: %v", err)
metrics.ErrorCounter.WithLabelValues("0004", errcode.ServerError.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.ServerError.Code,
ResultMsg: err.Error(),
}
}
return resp
}
// 处理实时余额查询
func (s *SocketServer) handleRealTimeBalance(ctx context.Context, data string) *model.CriminalResponse {
var req model.CriminalRequest
if err := json.Unmarshal([]byte(data), &req); err != nil {
logger.Errorf("Failed to unmarshal RealTimeBalance request: %v", err)
metrics.ErrorCounter.WithLabelValues("0005", errcode.InvalidParams.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.InvalidParams.Code,
ResultMsg: "请求数据格式错误",
}
}
resp, err := s.criminalService.RealTimeBalance(ctx, req.FCode)
if err != nil {
logger.Errorf("Failed to process RealTimeBalance: %v", err)
metrics.ErrorCounter.WithLabelValues("0005", errcode.ServerError.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.ServerError.Code,
ResultMsg: err.Error(),
}
}
return resp
}
// 处理发票同步
func (s *SocketServer) handleInvoiceSync(ctx context.Context, data string) *model.CriminalResponse {
var req model.InvoiceSync
if err := json.Unmarshal([]byte(data), &req); err != nil {
logger.Errorf("Failed to unmarshal InvoiceSync request: %v", err)
metrics.ErrorCounter.WithLabelValues("0006", errcode.InvalidParams.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.InvalidParams.Code,
ResultMsg: "请求数据格式错误",
}
}
resp, err := s.criminalService.InvoiceSync(ctx, req.InvoiceList)
if err != nil {
logger.Errorf("Failed to process InvoiceSync: %v", err)
metrics.ErrorCounter.WithLabelValues("0006", errcode.ServerError.Code).Inc()
return &model.CriminalResponse{
ResultCode: errcode.ServerError.Code,
ResultMsg: err.Error(),
}
}
return resp
}
// 发送响应
func (s *SocketServer) sendResponse(conn net.Conn, response *model.CriminalResponse) {
jsonData, err := json.Marshal(response)
if err != nil {
logger.Errorf("Failed to marshal response: %v", err)
return
}
// 设置写超时
if err := conn.SetWriteDeadline(time.Now().Add(config.GlobalConfig.Server.WriteTimeout)); err != nil {
logger.Errorf("Failed to set write deadline: %v", err)
return
}
// 构造响应消息:长度(4位) + JSON数据
message := fmt.Sprintf("%04d%s", len(jsonData), string(jsonData))
if _, err := conn.Write([]byte(message)); err != nil {
logger.Errorf("Failed to send response: %v", err)
return
}
logger.Infof("Response sent to %s: %s", conn.RemoteAddr().String(), message)
}
// 优雅关闭
func (s *SocketServer) Stop() {
logger.Info("Stopping socket server...")
// 关闭监听器,停止接受新连接
if s.listener != nil {
s.listener.Close()
}
// 等待所有连接处理完成
logger.Info("Waiting for all connections to finish...")
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
// 设置超时
select {
case <-done:
logger.Info("All connections finished")
case <-time.After(10 * time.Second):
logger.Warn("Timeout waiting for connections to finish")
}
// 通知关闭完成
close(s.shutdown)
}
// 处理信号
func (s *SocketServer) handleSignal() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
// 等待信号
sig := <-c
logger.Infof("Received signal: %v", sig)
// 停止服务器
s.Stop()
}