Use bee-queue instead of Kue

This commit is contained in:
syuilo 2018-07-26 08:11:47 +09:00
parent bf7b214cf1
commit b7b4b622a2
8 changed files with 25 additions and 53 deletions

View file

@ -1,11 +1,7 @@
# Management guide # Management guide
## Check the status of the job queue ## Check the status of the job queue
In the directory of Misskey: coming soon
``` shell
node_modules/kue/bin/kue-dashboard -p 3050
```
When you access port 3050, you will see the UI.
## Mark as 'admin' user ## Mark as 'admin' user
``` shell ``` shell

View file

@ -1,11 +1,7 @@
# 運営ガイド # 運営ガイド
## ジョブキューの状態を調べる ## ジョブキューの状態を調べる
Misskeyのディレクトリで: coming soon
``` shell
node_modules/kue/bin/kue-dashboard -p 3050
```
ポート3050にアクセスするとUIが表示されます
## 管理者ユーザーを設定する ## 管理者ユーザーを設定する
``` shell ``` shell

View file

@ -55,7 +55,6 @@
"@types/koa-send": "4.1.1", "@types/koa-send": "4.1.1",
"@types/koa-views": "2.0.3", "@types/koa-views": "2.0.3",
"@types/koa__cors": "2.2.2", "@types/koa__cors": "2.2.2",
"@types/kue": "0.11.9",
"@types/minio": "6.0.2", "@types/minio": "6.0.2",
"@types/mkdirp": "0.5.2", "@types/mkdirp": "0.5.2",
"@types/mocha": "5.2.3", "@types/mocha": "5.2.3",
@ -86,6 +85,7 @@
"autosize": "4.0.2", "autosize": "4.0.2",
"autwh": "0.1.0", "autwh": "0.1.0",
"bcryptjs": "2.4.3", "bcryptjs": "2.4.3",
"bee-queue": "1.2.2",
"bootstrap-vue": "2.0.0-rc.11", "bootstrap-vue": "2.0.0-rc.11",
"cafy": "11.3.0", "cafy": "11.3.0",
"chalk": "2.4.1", "chalk": "2.4.1",
@ -144,7 +144,6 @@
"koa-send": "5.0.0", "koa-send": "5.0.0",
"koa-slow": "2.1.0", "koa-slow": "2.1.0",
"koa-views": "6.1.4", "koa-views": "6.1.4",
"kue": "0.11.6",
"loader-utils": "1.1.0", "loader-utils": "1.1.0",
"mecab-async": "0.1.2", "mecab-async": "0.1.2",
"minio": "6.0.0", "minio": "6.0.0",

View file

@ -31,9 +31,6 @@ if (process.env.NODE_ENV != 'production') {
process.env.DEBUG = 'misskey:*'; process.env.DEBUG = 'misskey:*';
} }
// https://github.com/Automattic/kue/issues/822
require('events').EventEmitter.prototype._maxListeners = 512;
// Start app // Start app
main(); main();

View file

@ -1,52 +1,36 @@
import { createQueue } from 'kue'; import * as Queue from 'bee-queue';
import config from '../config'; import config from '../config';
import http from './processors/http'; import http from './processors/http';
import { ILocalUser } from '../models/user'; import { ILocalUser } from '../models/user';
const queue = createQueue({ const queue = new Queue('misskey', {
redis: { redis: {
port: config.redis.port, port: config.redis.port,
host: config.redis.host, host: config.redis.host,
auth: config.redis.pass password: config.redis.pass
} },
removeOnSuccess: true,
removeOnFailure: true
}); });
process.once('SIGTERM', () => { export function createHttpJob(data: any) {
queue.shutdown(5000, (err: any) => { return queue.createJob(data)
console.log('Kue shutdown: ', err || ''); .retries(4)
process.exit(0); .backoff('exponential', 16384) // 16s
}); .save();
});
export function createHttp(data: any) {
return queue
.create('http', data)
.removeOnComplete(true)
.events(false)
.attempts(8)
.backoff({ delay: 16384, type: 'exponential' });
} }
export function deliver(user: ILocalUser, content: any, to: any) { export function deliver(user: ILocalUser, content: any, to: any) {
createHttp({ createHttpJob({
title: 'deliver',
type: 'deliver', type: 'deliver',
user, user,
content, content,
to to
}).save(); });
} }
export default function() { export default function() {
/* queue.process(8, http);
256 is the default concurrency limit of Mozilla Firefox and Google
Chromium.
a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google
https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff
Network.http.max-connections - MozillaZine Knowledge Base
http://kb.mozillazine.org/Network.http.max-connections
*/
//queue.process('http', 256, http);
queue.process('http', 128, http);
} }

View file

@ -1,8 +1,8 @@
import * as kue from 'kue'; import * as bq from 'bee-queue';
import request from '../../../remote/activitypub/request'; import request from '../../../remote/activitypub/request';
export default async (job: kue.Job, done: any): Promise<void> => { export default async (job: bq.Job, done: any): Promise<void> => {
try { try {
await request(job.data.user, job.data.to, job.data.content); await request(job.data.user, job.data.to, job.data.content);
done(); done();

View file

@ -1,4 +1,4 @@
import * as kue from 'kue'; import * as bq from 'bee-queue';
import * as debug from 'debug'; import * as debug from 'debug';
const httpSignature = require('http-signature'); const httpSignature = require('http-signature');
@ -10,7 +10,7 @@ import { resolvePerson } from '../../../remote/activitypub/models/person';
const log = debug('misskey:queue:inbox'); const log = debug('misskey:queue:inbox');
// ユーザーのinboxにアクティビティが届いた時の処理 // ユーザーのinboxにアクティビティが届いた時の処理
export default async (job: kue.Job, done: any): Promise<void> => { export default async (job: bq.Job, done: any): Promise<void> => {
const signature = job.data.signature; const signature = job.data.signature;
const activity = job.data.activity; const activity = job.data.activity;

View file

@ -3,7 +3,7 @@ import * as Router from 'koa-router';
const json = require('koa-json-body'); const json = require('koa-json-body');
const httpSignature = require('http-signature'); const httpSignature = require('http-signature');
import { createHttp } from '../queue'; import { createHttpJob } from '../queue';
import pack from '../remote/activitypub/renderer'; import pack from '../remote/activitypub/renderer';
import Note from '../models/note'; import Note from '../models/note';
import User, { isLocalUser, ILocalUser, IUser } from '../models/user'; import User, { isLocalUser, ILocalUser, IUser } from '../models/user';
@ -30,11 +30,11 @@ function inbox(ctx: Router.IRouterContext) {
return; return;
} }
createHttp({ createHttpJob({
type: 'processInbox', type: 'processInbox',
activity: ctx.request.body, activity: ctx.request.body,
signature signature
}).save(); });
ctx.status = 202; ctx.status = 202;
} }