■MapReduce の決まり
・map()でemit()した値の形式とreduce()の戻り値の形式が一致していること
reduce関数は一度のMapReduce内で何度も呼ばれ、全てのreduceの戻り値をreduceした結果がMapReduceの戻り値となる。
→最終的な結果 = reduce(key, [values_3, reduce(key, [values_2, reduce(key, [values_1])])])
また、map()内で1件しかemit()されなかったkeyと値は、reduce()で一度も処理されない!!
すべてreduceした後の値を処理したい場合は、finalize functionを利用できる。
mongoの公式マニュアルが詳しい。
MapReduce - Docs-Japanese - 10gen Confluence
(mongoは集計できるので)mongoでは使いどころは限られると思うが、簡単にjavascriptでMapReduceできるなんてMongoDBやっぱいいね(・∀・)
■ MapReduceでやる意味(長所)
・集計対象のデータが巨大になっても負荷分散できる。
・oracleのマテビューのように結果を保存できる。(merge, replaceも可能)
・そもそもcassandraやhbaseでは集計関数やキー以外の条件検索ができないが、MapReduceなら集計できる。
そのため集計用のテーブルなどを別に作成せず、ユーザーのトランザクションデータを直接集計できる。
■ データの初期化
// データの初期化
db.userActivity.remove()
// 登録
db.userActivity.insert({name: 'ruffy', type: 'login', at: new Date('2012/05/06 1:00:00')});
db.userActivity.insert({name: 'zoro', type: 'login', at: new Date('2012/05/06 2:00:00')});
db.userActivity.insert({name: 'usopp', type: 'login', at: new Date('2012/05/06 3:00:00')});
db.userActivity.insert({name: 'nami', type: 'login', at: new Date('2012/05/06 4:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'login', at: new Date('2012/05/05 5:00:00')});
db.userActivity.insert({name: 'zoro', type: 'login', at: new Date('2012/05/05 6:00:00')});
db.userActivity.insert({name: 'nami', type: 'login', at: new Date('2012/05/05 7:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'login', at: new Date('2012/05/04 8:00:00')});
db.userActivity.insert({name: 'usopp', type: 'login', at: new Date('2012/05/04 9:00:00')});
db.userActivity.insert({name: 'nami', type: 'login', at: new Date('2012/05/04 10:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'login', at: new Date('2012/05/03 11:00:00')});
db.userActivity.insert({name: 'zoro', type: 'login', at: new Date('2012/05/03 12:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'login', at: new Date('2012/05/02 13:00:00')});
db.userActivity.insert({name: 'zoro', type: 'login', at: new Date('2012/05/02 14:00:00')});
db.userActivity.insert({name: 'usopp', type: 'login', at: new Date('2012/05/02 15:00:00')});
db.userActivity.insert({name: 'nami', type: 'login', at: new Date('2012/05/02 16:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'login', at: new Date('2012/05/01 17:00:00')});
db.userActivity.insert({name: 'usopp', type: 'login', at: new Date('2012/05/01 18:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'event_login', at: new Date('2012/05/06 19:00:00')});
db.userActivity.insert({name: 'nami', type: 'event_login', at: new Date('2012/05/06 20:00:00')});
db.userActivity.insert({name: 'zoro', type: 'event_login', at: new Date('2012/05/05 21:00:00')});
db.userActivity.insert({name: 'nami', type: 'event_login', at: new Date('2012/05/05 22:00:00')});
db.userActivity.insert({name: 'usopp', type: 'event_login', at: new Date('2012/05/04 23:00:00')});
db.userActivity.insert({name: 'nami', type: 'event_login', at: new Date('2012/05/04 1:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'event_login', at: new Date('2012/05/03 2:00:00')});
db.userActivity.insert({name: 'zoro', type: 'event_login', at: new Date('2012/05/03 3:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'event_login', at: new Date('2012/05/02 4:00:00')});
db.userActivity.insert({name: 'nami', type: 'event_login', at: new Date('2012/05/02 5:00:00')});
db.userActivity.insert({name: 'ruffy', type: 'event_login', at: new Date('2012/05/01 6:00:00')});
// 検索
db.userActivity.find({name: 'ruffy'}).sort({at:-1});
db.userActivity.find({name: 'zoro'}).sort({at:-1});
■ 誤った方法
MapReduceの決まりに沿っていないため、動きはするがデータ量が増えるとNG
決まり:map()でemit()した値の形式とreduce()の戻り値の形式が一致していること
// ------------------------------------------------
// ユーザー軸の集計
map = function () {
// 'ruffy', {type: login, dt: 2012-05-05}
emit(this.name, {'type': this.type, 'dt': this.at});
}
reduce = function(key, values) {
var result = {login: [], event_login: []};
values.forEach(function(value) {
type = value.type
dt = value.dt
// javascriptの日付操作、なんとかならんかの。
dt.setHours(0);dt.setMinutes(0);dt.setSeconds(0);dt.setMilliseconds(0);
result[type].push(dt)
});
return result;
}
// 実行(inlineオプションは、結果を保存せず画面出力するオプション)
db.userActivity.mapReduce(map, reduce, {out: { inline : 1}});
// とはいえ、これはユーザーでorderすれば同じようなことができる
db.userActivity.find().sort({name: 1, at: -1, type:1})
// ------------------------------------------------
// 日付軸の集計
map = function () {
dt = this.at;
dt.setHours(0);dt.setMinutes(0);dt.setSeconds(0);dt.setMilliseconds(0);
// 2012-05-05, {type: login, name: 'ruffy'}
emit(dt, {'type': this.type, 'name': this.name});
}
reduce = function(key, values) {
var result = {login: [], event_login: []};
values.forEach(function(value) {
type = value.type
result[type].push(value.name)
});
return result;
}
// 実行(inlineオプションは、結果を保存せず画面出力するオプション)
db.userActivity.mapReduce(map, reduce, {out: { inline : 1}});
// sqlだとdate型のカラムがない状態で集計は面倒だけど楽にできちゃうヽ(・∀・)ノ
// 対象データをfilterして集計とかもできちゃう
db.userActivity.mapReduce(map, reduce, {query: {at: {$gte: new Date('2012/05/03')}}, out: {inline : 1}});
// ------------------------------------------------
// 活動軸の集計
map = function () {
// login, {name: 'ruffy', dt: 2012-05-05}
emit(this.type, {'name': this.name, 'dt': this.at});
}
reduce = function(key, values) {
var result = {};
values.forEach(function(value) {
dt = value.dt
dt.setHours(0);dt.setMinutes(0);dt.setSeconds(0);dt.setMilliseconds(0);
if (typeof result[dt] == 'undefined') {
result[dt] = 0
}
result[dt] += 1
});
return result;
}
// 実行(inlineオプションは、結果を保存せず画面出力するオプション)
db.userActivity.mapReduce(map, reduce, {out: { inline : 1}});
db.userActivity.mapReduce(map, reduce, {query: {type: 'login'}, out: { inline : 1}});
■ 正しい方法
// ------------------------------------------------
// ユーザー軸の集計
map = function () {
// 'ruffy', {type: login, dt: 2012-05-05}
dt = this.at
dt.setHours(0);dt.setMinutes(0);dt.setSeconds(0);dt.setMilliseconds(0);
value = {}
value[this.type] = [dt]
emit(this.name, value);
}
reduce = function(key, values) {
var result = {};
values.forEach(function(value) {
for (type in value) {
for (i in value[type]) {
if (typeof result[type] == 'undefined') { result[type] = []; }
dt = value[type][i]
result[type].push(dt)
}
}
});
return result;
}
// 実行(inlineオプションは、結果を保存せず画面出力するオプション)
db.userActivity.mapReduce(map, reduce, {out: { inline : 1}});
// ------------------------------------------------
// 日付軸の集計
map = function () {
dt = this.at
dt.setHours(0);dt.setMinutes(0);dt.setSeconds(0);dt.setMilliseconds(0);
value = {}
value[this.type] = [this.name]
emit(dt, value);
}
reduce = function(key, values) {
var result = {};
values.forEach(function(value) {
for (type in value) {
for (i in value[type]) {
if (typeof result[type] == 'undefined') { result[type] = []; }
dt = value[type][i]
result[type].push(dt)
}
}
});
return result;
}
// 実行(inlineオプションは、結果を保存せず画面出力するオプション)
db.userActivity.mapReduce(map, reduce, {out: { inline : 1}});
// ------------------------------------------------
// 活動軸の集計
map = function () {
dt = this.at
dt.setHours(0);dt.setMinutes(0);dt.setSeconds(0);dt.setMilliseconds(0);
value = {}
value[dt] = 1
emit(this.type, value);
}
reduce = function(key, values) {
var result = {};
values.forEach(function(value) {
for (dt in value) {
if (typeof result[dt] == 'undefined') { result[dt] = 0; }
count = value[dt];
result[dt] += parseInt(count)
}
});
return result;
}
// 実行(inlineオプションは、結果を保存せず画面出力するオプション)
db.userActivity.mapReduce(map, reduce, {out: { inline : 1}});
■ 参考サイト
MongoDBのMapReduceは、その他にも様々なオプションでいろいろできる!
MapReduce - Docs-Japanese - 10gen Confluence - MongoDB