Queue

queue组件是elves的队列任务组件,可以管理elves的队列任务,且队列任务支持任务间依赖,根据队列任务内容向scheduler发起异步任务。

安装过程

编译

cd elves-scheduler
chmod +x ./control
./control build                                                 #二进制版本可以忽略编译过程

配置

mv conf/conf.properties.example conf/conf.properties            #复制配置文件
vim conf/conf.properties                                        #编辑配置文件

./conf/conf.properties

#Zookeeper Config
zookeeper.host=127.0.0.1        #Zookeeper地址
zookeeper.outTime=10000         #Zookeeper超时时间
zookeeper.root=/elves           #Zookeeper ROOT地址 

#MQ Basic Config
mq.ip       = 127.0.0.1         #RabbitMQ IP
mq.port     = 5672              #RabbitMQ 端口
mq.user     = admin             #RABBITMQ 账号
mq.password =                   #RABBITMQ 密码
mq.exchange = elves             #Exchange 名称  

#jdbc conf
jdbc.type=mysql
jdbc.driver=com.mysql.jdbc.Driver
jdbc.pool.init=1
jdbc.pool.minIdle=3
jdbc.pool.maxActive=20
jdbc.testSql=SELECT 'x' FROM DUAL
jdbc.url=jdbc\:mysql\://192.168.0.1\:3306/elves_queue?characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&zeroDateTimeBehavior=convertToNull
jdbc.username=mysql
jdbc.password=mysql

脚本参数

./control

build|pack|start|stop|restart|status|version

build   : 运行后将执行mvn pakcge , 最终构建成至 bin
pack    : 将本模块打包(不包含配置文件与日志文件)
start   : 以nohup形式启动elves-{module}
stop    : 关闭elves-{module}
restart : 执行 stop & start
status  : 查看elves-{module}的运行状态
version : 查看当前模块的版本

队列流程

Mysql数据库结构

queue模块计划任务的存储使用mysql实现,数据库结构如下:

queue表:
CREATE TABLE `queue` (
  `queue_id` varchar(16) NOT NULL COMMENT '队列ID',
  `app` varchar(25) DEFAULT NULL COMMENT 'APP',
  `createtime` datetime DEFAULT NULL COMMENT '创建时间',
  `committime` datetime DEFAULT NULL COMMENT '提交时间',
  `status` enum('pendding','running','stoped') DEFAULT 'pendding' COMMENT '队列状态',
  PRIMARY KEY (`queue_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
task_list表
CREATE TABLE `task_list` (
  `task_id` varchar(16) NOT NULL COMMENT '任务ID',
  `queue_id` varchar(16) NOT NULL COMMENT '队列ID',
  `ip` varchar(15) NOT NULL COMMENT 'AgentIP',
  `mode` enum('np','n') NOT NULL COMMENT '模式',
  `app` varchar(32) NOT NULL COMMENT '模块',
  `func` varchar(32) NOT NULL COMMENT '指令',
  `param` text COMMENT '参数',
  `timeout` int(11) DEFAULT '0' COMMENT '超时时间',
  `proxy` varchar(35) DEFAULT NULL COMMENT '代理器',
  `depend_task_id` varchar(16) NOT NULL DEFAULT 'null' COMMENT '依赖的任务id',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `flag` int(11) DEFAULT NULL COMMENT '返回值',
  `error` text COMMENT '返回错误',
  `worker_flag` int(11) DEFAULT NULL COMMENT 'worker执行状态',
  `worker_message` text COMMENT 'worker执行结果内容',
  `worker_costtime` int(11) DEFAULT NULL COMMENT 'worker执行耗时',
  `exec_finish_time` datetime DEFAULT NULL COMMENT '结果回收时间',
  `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态(0:等待,1:运行,2:结束)',
  PRIMARY KEY (`task_id`),
  KEY `queue_id` (`queue_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='队列任务表'

组件服务

queue模块主要对elves提供队列任务的操作接口,具体如下:

RoutingKey : *.queue

服务提供列表

服务 类型 注释
createQueue rpc.call 创建队列
addTask rpc.call 添加任务项
commitQueue rpc.call 提交队列
stopQueue rpc.call 停止队列
queueResult rpc.call 获取队列执行结果
taskResult rpc.cast 队列任务直接结果处理

服务提供详情

createQueue:
接收消息:
{
    "mqkey":"{组件}.queue.createQueue",
    "mqtype":"call.EC0EF718FCC4130",
    "mqbody":{
        "app":"testApp"
    }
}

回复消息:"发送RoutingKey:EC0EF718FCC4130"
{
    "mqkey":"queue.{组件}",
    "mqtype":"cast",
    "mqbody":{
        "flag":"true",
        "error":"",
        "id":"12d6af3b2e5d4c2e"
    }
}
addTask:
接收消息:
{
    "mqkey":"{组件}.queue.addTask",
    "mqtype":"call.EC0EF718FCC41307",
    "mqbody":{
        "id":"12d6af3b2e5d4c2e",
        "ip":"192.168.1.1",
        "mode":"NP",
        "func":"test",
        "param":"",
        "timeout":20,
        "proxy":"",
        "depend_task_id":""
    }
}

回复消息:"发送RoutingKey:EC0EF718FCC41307"
{
    "mqkey":"queue.{组件}",
    "mqtype":"cast",
    "mqbody":{
        "flag":"true",
        "error":"",
        "id":"ddd6af3b2e5d4c2e"
    }
}
commitQueue:
接收消息:
{
    "mqkey":"{组件}.queue.commitQueue",
    "mqtype":"call.EC0EF718FCC41388",
    "mqbody":{
        "id":"12d6af3b2e5d4c2e"
    }
}

回复消息:"发送RoutingKey:EC0EF718FCC41388"
{
    "mqkey":"queue.{组件}",
    "mqtype":"cast",
    "mqbody":{
        "flag": "true",
        "error": ""
    }
}
stopQueue:
接收消息:
{
    "mqkey":"{组件}.queue.stopQueue",
    "mqtype":"call.AAAEF718FCC41307",
    "mqbody":{
        "id":"12d6af3b2e5d4c2e"
    }
}

回复消息:"发送RoutingKey:AAAEF718FCC41307"
{
    "mqkey":"queue.{组件}",
    "mqtype":"cast",
    "mqbody":{
        "flag": "true",
        "error": "",
        "result": {
            "BF0EE718FCC41307": "finish",
            "AF0EE718FCC4130C": "execing",
            "EF0EE718FSEC4130": "stoped"
        }
    }
}
queueResult
接收消息:
{
    "mqkey":"{组件}.queue.queueResult",
    "mqtype":"call.EC0EF718FCC41BBB",
    "mqbody":{
        "id":"12d6af3b2e5d4c2e"
    }
}

回复消息:"发送RoutingKey:EC0EF718FCC41BBB"
{
    "mqkey":"queue.{组件}",
    "mqtype":"cast",
    "mqbody":{
        "flag": "true",
        "error": "",
        "result":{
            "BF0EE718FCC41307": {
                "status":"finish",
                "depend_task_id":"",
                "flag": "true",
                "error": "",
                "id": "BF0EE718FCC41307",
                "worker_flag": "1",
                "worker_message": "hello word!",
                "worker_costtime": "74"
            },
            "AF0EE718FCC41301": {
                "status":"execing",
                "depend_task_id":"BF0EE718FCC41307",
                "flag": "",
                "error": "",
                "id": "AF0EE718FCC41301",
                "worker_flag": "",
                "worker_message": "",
                "worker_costtime": ""
            },
            "CF0EE718FCC41302": {
                "status":"pendding",
                "depend_task_id":"BF0EE718FCC41307",
                "flag": "",
                "error": "",
                "id": "CF0EE718FCC41302",
                "worker_flag": "",
                "worker_message": "",
                "worker_costtime": ""
            }
        }
    }
}
taskResult:
 {
    "mqkey":"{组件}.queue.taskResult",
    "mqtype":"cast",
    "mqbody":{
        "flag":"true"
        "error":""
        "result":{
            "id":"9ad6af3b2e5d4c2d",
            "worker_flag":"1",
            "worker_message":"hello word!",
            "worker_costtime":"74"
        }
    }
}

服务使用列表

组件 服务 类型 注释
scheduler asyncJob cast 发起异步任务

results matching ""

    No results matching ""