REST API 参考
FlowMQ 提供 REST API 用于管理消息基础设施,包括主题、队列、流和消息的程序化操作。
基础信息
Base URL
http://{your-flowmq-host}:{port}/api/v1认证
在请求头中携带 API Key:
Authorization: Bearer YOUR_API_KEY响应格式
成功响应:
json
{
"success": true,
"data": { ... },
"error": null
}错误响应:
json
{
"success": false,
"data": null,
"error": {
"code": "ERROR_CODE",
"message": "错误描述",
"details": {}
}
}HTTP 状态码
| 状态码 | 说明 |
|---|---|
| 200 | 请求成功 |
| 201 | 资源创建成功 |
| 400 | 请求参数无效 |
| 401 | 未认证或认证失败 |
| 403 | 权限不足 |
| 404 | 资源不存在 |
| 409 | 资源已存在 |
| 500 | 服务器内部错误 |
主题 (Topics)
发布消息
http
POST /topics/{topic_name}/messages请求体:
json
{
"payload": "Hello, FlowMQ!",
"content_type": "text/plain",
"properties": {
"user_id": "12345",
"priority": "high"
},
"partition_key": "user-12345"
}| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| payload | string | 是 | 消息内容 |
| content_type | string | 否 | MIME 类型,默认 application/json |
| properties | object | 否 | 自定义消息属性 |
| partition_key | string | 否 | 分区键 |
响应:
json
{
"success": true,
"data": {
"message_id": "msg_abc123",
"topic": "orders/eu/new",
"timestamp": "2024-01-15T10:30:00Z"
}
}批量发布
http
POST /topics/{topic_name}/messages/batch请求体:
json
{
"messages": [
{ "payload": "Message 1", "properties": {"batch_id": "batch-1"} },
{ "payload": "Message 2", "properties": {"batch_id": "batch-1"} }
]
}队列 (Queues)
创建队列
http
POST /queues请求体:
json
{
"name": "order-processing-queue",
"topic_filter": "orders/#",
"max_size_mb": 1024,
"message_ttl_seconds": 86400,
"max_retries": 3,
"dead_letter_topic": "orders/dead-letter"
}| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| name | string | 是 | 队列名称 |
| topic_filter | string | 是 | 主题过滤器(如 orders/#) |
| max_size_mb | int | 否 | 最大容量(MB),默认 1024 |
| message_ttl_seconds | int | 否 | 消息 TTL(秒),默认 86400 |
| max_retries | int | 否 | 最大重试次数,默认 3 |
| dead_letter_topic | string | 否 | 死信主题 |
列出队列
http
GET /queues?limit=50&offset=0获取队列详情
http
GET /queues/{queue_name}删除队列
http
DELETE /queues/{queue_name}消费消息
http
POST /queues/{queue_name}/messages/consume请求体:
json
{
"max_messages": 10,
"timeout_seconds": 30,
"visibility_timeout_seconds": 300
}响应:
json
{
"success": true,
"data": {
"messages": [
{
"message_id": "msg_abc123",
"payload": "Order #12345",
"content_type": "application/json",
"properties": {"priority": "high"},
"publish_time": "2024-01-15T10:30:00Z",
"receive_count": 1,
"receipt_handle": "receipt_xyz789"
}
],
"count": 1
}
}确认消息
http
POST /queues/{queue_name}/messages/{message_id}/ack请求体:
json
{
"receipt_handle": "receipt_xyz789"
}流 (Streams)
创建流
http
POST /streams请求体:
json
{
"name": "order-events-stream",
"topic_filter": "orders/#",
"retention_days": 30,
"max_size_gb": 100,
"partition_count": 3
}| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| name | string | 是 | 流名称 |
| topic_filter | string | 是 | 主题过滤器 |
| retention_days | int | 否 | 消息保留天数,默认 7 |
| max_size_gb | int | 否 | 最大容量(GB),默认 10 |
| partition_count | int | 否 | 分区数,默认 1 |
列出流
http
GET /streams?limit=50&offset=0获取流详情
http
GET /streams/{stream_name}删除流
http
DELETE /streams/{stream_name}注意:以上 API 接口以企业版实际交付版本为准,具体参数和行为可能因版本而异。