2.1 单词排序开发实例

在FastDCS的源代码中的/src/demo/目录下有一个完整的开发样例,该样例演示了如何使用FastDCS对/src/demo/dict.txt文件中的英文单词按照出现的次数进行排序,该样例仅用于FastDCS开发讲解并无实际应用价值;

2.1.1 开发思路

  • 首先,我们需要按照任务分解的思想,将dict.txt文件中按照一行(或几行)作为一个计算任务单元,将整个工作拆分成N份计算任务,将所有的计算任务存储到一个外部存储系统中,Demo中使用了mysql作为外部存储系统;
  • 你需要按照/src/demo/dict.sql文件中提供的SQL脚本,在mysql中建立用于保存每个计算任务的dict_task表以及用于保存单词排序结果的dict_word表;
  • Demo中提供了dict_mock程序帮助你将计算任务单元保存到了mysql中的dict_task表。
  • 我们需要开发管理节点Master程序,我们将它命名为 DictMaster,用来从dict_task表中获取未处理的计算任务单元导入到FastDCS服务集群中,还负责将计算结果保存到dict_word表中;
  • 我们需要开发工作节点Worker程序,我们将它命名为 DictWorker,用来接受DictMaster节点分配的计算任务单元,根据计算任务单元的ID从dict_task表中获取对应的文字内容,然后按照单词进行拆分计算,计算的结果由FastDCS自动发送给DictMaster节点进行保存;
  • 通过运行一段时间完成计算后,dict_word 表中将保存了 dict.txt 文本中所有单词排序后的结果;

2.1.2 DictMaster开发说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// 1.将自定义类'DictMaster'派生在'Master'之上
class DictMaster : public Master {
public:
// 2.Master节点程序启动后,会调用这个方法
void InitialTracker(struct settings_s settings) {
// 初始化自定义变量 mysql_connect_,连接mysql数据库
mysql_connect_.Connect(settings.mysql_database,
settings.mysql_host,
settings.mysql_user,
settings.mysql_passwd,
settings.mysql_port);
// 必须调用基类中的InitialTracker函数,初始化Master类
Master::InitialTracker(settings);
};
// 3.Master节点程序退出时,会调用这个方法
void FinalizeTracker() {
// 由于自定义变量 mysql_connect_ 是自动释放资源的,所以没有自定义变量需要在这里释放
// 必须调用基类中的FinalizeTracker函数,释放Master类
Master::FinalizeTracker();
};
// 4.FastDCS空闲的时候会主动调用这个方法,将新的计算任务导入到服务集群
// 该方法只会由选举为'Primary master'的节点进行调用
// 开发思路:批量获取'dict_task'表中的任务状态字段'task_status'等于'1'未处理的计算任务
// 将它们导入到FastDCS,同时将这批数据的任务状态字段'task_status'设置为'2'(已处理)
bool ImportTaskUDF(vector<FdcsTask> &tasks) {
LOG(INFO) << "DictMaster::ImportTaskUDF()";
std::string select_sql, update_sql, update_where;
update_sql = "UPDATE dict_task SET task_status = 2 WHERE ";
// 根据配置文件master.conf中的'preload_tasks'所配置的参数批量获取
// task_status等于'1'(未处理)的的计算任务
std::string format = "SELECT task_id FROM dict_task WHERE task_status = 1 LIMIT %d;";
SStringPrintf(&select_sql, format.data(), fdcs_env_.PreloadTasks());
LOG(INFO) << select_sql;
MysqlQuery query(&mysql_connect_);
if (!query.Execute(select_sql.data())) {
LOG(ERROR) << "select task from mysql failduer!";
return false;
}
MysqlResult *sql_result = query.Store();
int rows_size = sql_result->RowsSize();
LOG(INFO) << "sql_result->RowsSize() = " << sql_result->RowsSize();
if (0 == rows_size) return false;
// 获取到新的计算任务后,为了避免重复处理
// 将数据库中本次获取的计算任务的'task_status'设置为'2'(正在处理状态)
FdcsTask task;
for (int i = 0; i < rows_size; ++i) {
MysqlRow row = sql_result->FetchRow();
std::string task_id = (char*)row[0];
LOG(INFO) << "task_id = " << task_id;
task.set_task_id(task_id);
tasks.push_back(task);
if (update_where.empty()) {
update_where = "task_id = '" + task_id + "'";
} else {
update_where = update_where + " or task_id = '" + task_id + "'";
}
}
query.FreeResult(sql_result);
update_sql = update_sql + update_where + ";";
// 执行更新task_status状态的SQL语句
if (!query.TryExecute(update_sql.data())) {
LOG(ERROR) << "update task status faildure!";
return false;
}
return true;
};
// 5.FastDCS会主动调用这个方法,将任务的计算结果导出到外部存储系统
// 该方法只会由选举为'Primary master'的节点进行调用
// 开发思路:根据FastDCS导出的计算结果,将单词转换成hash码作为唯一标示,将结果进行保存
// 如果数据库中已经存在该单词所对应的的hash码,则进行累加处理,如果不存在直接进行保存处理
bool ExportTaskUDF(vector<FdcsTask> tasks) {
// 变量tasks中保存了数据结构FdcsTask的数组,变量中的数据结构类似于
// tasks = {[
// FdcsTask = {task_id=id1, key_values_pairs=[{单词1,count1},{单词11,count11}, ...]},
// FdcsTask = {task_id=id2, key_values_pairs=[{单词2,count2},{单词22,count22}, ...]},
// ...
// ]}
// 初始化mysql查询对象
MysqlQuery query(&mysql_connect_);
// 循环获取tasks数组中的每一个计算结果
for (int i = 0; i < tasks.size(); ++i) {
// 打印计算任务的ID
LOG(INFO) << "task_id = " << tasks[i].task_id();
std::vector<KeyValuePair> key_values;
// 每一个计算任务的结果保存在一对多键值对'key_values_pairs'变量中
for (int ii = 0; ii < tasks[i].key_values_pairs_size(); ++ii) {
// key_values_pair 中的数据结构类似于
// FdcsTask = {task_id=id1, key_values_pairs=[{单词1,count1},{单词11,count11}, ...]}
KeyValuesPair key_values_pair = tasks[i].key_values_pairs(ii);
// 通过'ReadRecord'函数将'key_values_pairs'变量中的内容读取到一对一键值对'key_values'数组中
ReadRecord(&key_values_pair, &key_values);
for (int iii = 0; iii < key_values.size(); ++iii) {
// 每一个计算任务结果中通过键值对保存了当前任务的
// 每一个单词的名称和出现的次数
// 数据结果类似[key=单词1, value=count1]
string word = key_values[iii].key();
word = StringReplace(word, "\"", "\\\"");
word = StringReplace(word, "'", "\\\'");
int count = KeyToInt32(key_values[iii].value());
// 将单词转换成Hash编码
uint32 word_id = BKDRHash(word);
// 查找'dict_word'表中已有的数据中是否存在该单词,
// 如果有进行累加处理,如果没有直接进行保存
string select_sql;
SStringPrintf(&select_sql,
"SELECT count FROM dict_word WHERE word_id = %lu;", word_id);
MysqlResult *sql_result = NULL;
if (!query.Execute(select_sql.data())) {
LOG(ERROR) << "SELECT count FROM dict_word faildure!";
continue;
}
sql_result = query.Store();
int old_count = 0;
if (sql_result->RowsSize() > 0) {
MysqlRow row = sql_result->FetchRow();
old_count = (int)row[0];
}
query.FreeResult(sql_result);
// 拼装SQL语句,如果 dict_word 表中已存在该单词则进行累加处理,否则直接进行保存
string insert_sql;
if (0 == old_count) {
insert_sql = StringPrintf("INSERT INTO dict_word \
(word_id, word, count, create_time) VALUES (%lu, '%s', %d, now())", \
word_id, word.data(), count);
} else {
insert_sql = StringPrintf("UPDATE dict_word SET count = %d, \
update_time = now() WHERE word_id = %lu", \
old_count + count, word_id);
}
if ((old_count + count) < 0 || old_count < 0 || count < 0) {
LOG(INFO) << "old_count = " << old_count;
LOG(INFO) << "count = " << count;
LOG(INFO) << "key_values[iii] = " << key_values[iii].value();
LOG(INFO) << "insert_sql = " << insert_sql;
continue;
}
// 执行SQL语句
if (!query.TryExecute(insert_sql.data())) {
LOG(ERROR) << "INSERT dict_word[" << word_id
<< ", word = " << word << "] faildure!";
continue;
}
}
}
}
return true;
};
private:
// 使用FastDCS提供的mysql封装类
MysqlConnection mysql_connect_;
};
// 6.注册宏必须填写正确,'DictMaster'是你自定义的Master类名称
REGISTER_FASTDCS_TRACKER(DictMaster);

2.1.3 DictWorker开发说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// 1.将自定义类'DictWorker'派生在'Worker'之上
class DictWorker : public Worker {
public:
// 2.Worker节点启动后,会调用这个方法
void InitialTracker(struct settings_s settings) {
// 初始化自定义变量 mysql_connect_,连接mysql数据库
mysql_connect_.Connect(settings.mysql_database,
settings.mysql_host,
settings.mysql_user,
settings.mysql_passwd,
settings.mysql_port);
// 必须调用基类中的InitialTracker函数,初始化Worker类
Worker::InitialTracker(settings);
}
// 3.Worker节点退出时,会调用这个方法
void FinalizeTracker() {
// 由于自定义变量 mysql_connect_ 是自动释放资源的,所以没有自定义变量需要在这里释放
// 必须调用基类中的FinalizeTracker函数,释放Worker类
Worker::FinalizeTracker();
}
// 4.FastDCS中有需要计算的任务的时候会主动调用这个方法,开发者自行实现自定义的计算方法
// 开发思路:根据FastDCS分发的 task 数据中的任务ID,从外部存储系统中获取该任务对应的文本
// 将文本内容根据单词之间的空格进行拆分,并计算每个单词在该任务的文本中出现的次数
// 计算结果完成后,FastDCS会自动将这个任务分发给 DictMaster 节点,进行数据更新
bool ComputingUDF(FdcsTask &task) {
// task 是由 DictMaster 分发给 DictWorker 的计算任务,task 中的数据结构类似 :
// task = {task_id=id1, key_values_pairs=[{单词1,count1},{单词11,count11}, ...]}
// 根据计算任务ID从外部存储系统mysql中获取相应的文本内容
std::string select_sql, text;
SStringPrintf(&select_sql, "SELECT text FROM dict_task WHERE task_id = %s;",
task.task_id().data());
LOG(INFO) << select_sql;
MysqlQuery query(&mysql_connect_);
if (!query.Execute(select_sql.data())) {
LOG(ERROR) << "select text from mysql failduer!";
return false;
}
MysqlResult *sql_result = query.Store();
LOG(INFO) << "sql_result->RowsSize() = " << sql_result->RowsSize();
if (sql_result->RowsSize() > 0) {
MysqlRow row = sql_result->FetchRow();
text.append((char*)row[0]);
}
query.FreeResult(sql_result);
// 根据文本之间的空格,拆分文本,将每个单词和出现的次数保存到'word_count'变量中
std::map<string /*word*/, int /*count*/> word_count;
std::vector<std::string> words;
SplitStringUsing(text, " ", &words);
for (int i = 0; i < words.size(); ++i) {
if (true == words[i].empty()) continue;
if (word_count.end() == word_count.find(words[i])) {
word_count[words[i]] = 1;
} else {
word_count[words[i]] = word_count[words[i]] + 1;
}
}
// 将每个单词和出现的次数保存到'task'中的一对多键值对'KeyValuePair'变量中
KeyValuesPair *key_values_pair = task.add_key_values_pairs();
for (WordCountIter it = word_count.begin(); it != word_count.end(); ++it) {
// 将每个单词和出现的次数保存到一对一键值对'key_value'中
KeyValuePair key_value;
key_value.set_key(it->first);
string count = Int32ToKey(it->second);
if (true == count.empty() || it->second < 0 || KeyToInt32(count) < 0) {
LOG(INFO) << "it" << it->first << ", " << it->second << ", " << KeyToInt32(count);
abort();
}
key_value.set_value(count);
// 将 key_value 变量保存到'task'中的一对多键值对'KeyValuePair'变量中
WriteRecord(key_values_pair, key_value);
}
// 你可以通过休眠5秒,来模拟较复杂的计算
// sleep(5);
// 'task'变量已经保存了你的计算结果,准备发送到'DictMaster'节点中
return true;
}
private:
// 使用FastDCS提供的mysql封装类
MysqlConnection mysql_connect_;
typedef std::map<string /*word*/, int /*count*/>::iterator WordCountIter;
};
// 5.注册宏必须填写正确,'DictWorker'是你自定义的Worker类名称
REGISTER_FASTDCS_TRACKER(DictWorker);

2.1.4 DictMock开发说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// 创建开发实例中的表结构
void CreateDictTable();
// 将dict.txt文件中的内容拆分成计算任务保存到数据库中
void CreateDictData();
// 清空开发实例中的表中的所有数据
void TruncateDictData();
// 清除开发实例中的表结构和数据
void DropDictTable();
// 使用FastDCS提供的mysql封装类
MysqlConnection connection;
int main(int argc, char const *argv[]) {
// 你可以修改数据库名称、服务IP、用户名、用户密码和数据库端口连接你的mysql数据库
bool conn = connection.Connect("FastDCS", "localhost", "FastDCS", "fastdcs", 3306);
if (!conn)
LOG(ERROR) << "connect mysql faildure!";
// 接受用户的输入,进行相关的操作
char cmd[20];
printf("----- 0:quit! -----\n");
printf("----- 1:create dict table -----\n");
printf("----- 2:create dict data -----\n");
printf("----- 3:truncate dict data -----\n");
printf("----- 4:drop dict table -----\n");
while (strncmp("0", cmd, 1) != 0) {
fgets(cmd, 20, stdin);
if (strncmp("1", cmd, 1) == 0) {
CreateDictTable();
} else if (strncmp("2", cmd, 1) == 0) {
CreateDictData();
} else if (strncmp("3", cmd, 1) == 0) {
TruncateDictData();
} else if (strncmp("4", cmd, 1) == 0) {
DropDictTable();
}
}
return 0;
}
// 创建开发实例中的表结构
void CreateDictTable() {
MysqlQuery query(&connection);
if (!query.Execute("SELECT * FROM dict_task;")) {
query.Execute("CREATE TABLE dict_task ( \
task_id VARCHAR(32) NOT NULL PRIMARY KEY, \
task_status INTEGER default 0, \
text VARCHAR(20480), \
create_time DATETIME, \
update_time DATETIME \
)ENGINE=InnoDB DEFAULT CHARSET=utf8;");
}
LOG(INFO) << "CREATE TABLE dict_task";
if (!query.Execute("SELECT * FROM dict_word;")) {
query.Execute("CREATE TABLE dict_word ( \
word_id INTEGER UNSIGNED NOT NULL PRIMARY KEY, \
word VARCHAR(256) NOT NULL, \
count INTEGER default 0, \
create_time DATETIME, \
update_time DATETIME \
)ENGINE=InnoDB DEFAULT CHARSET=utf8;");
}
LOG(INFO) << "CREATE TABLE dict_word";
}
// 清除开发实例中的表结构和数据
void DropDictTable() {
MysqlQuery query(&connection);
if (!query.TryExecute("DROP TABLE dict_task;")) {
LOG(ERROR) << "DROP TABLE dict_task faildure!";
} else {
LOG(INFO) << "DROP TABLE dict_task success!";
}
if (!query.TryExecute("DROP TABLE dict_word;")) {
LOG(ERROR) << "DROP TABLE dict_word faildure!";
} else {
LOG(INFO) << "DROP TABLE dict_word success!";
}
}
// 将dict.txt文件中的内容拆分成计算任务保存到数据库中
void CreateDictData() {
float time_use = 0;
struct timeval start, end;
gettimeofday(&start, NULL);
LOG(INFO) << "start.tv_sec:" << start.tv_sec;
LOG(INFO) << "start.tv_usec:" << start.tv_usec;
FILE* file = NULL;
char* line = NULL;
size_t len = 0;
size_t read;
file = fopen("dict.txt", "r");
CHECK(file);
int32_t count = 1;
std::string sql, line_text;
MysqlQuery query(&connection);
while ((read = getline(&line, &len, file)) != -1) {
line_text = line;
line_text = StringReplace(line_text, "\"", "\\\"");
line_text = StringReplace(line_text, "'", "\\\'");
sql = "INSERT INTO dict_task(task_id, task_status, text, create_time) VALUES ('";
sql += Int32ToKey(count);
sql += "' , 1, '";
sql += line_text;
sql += "', now())";
if (!query.TryExecute(sql.data())) {
LOG(ERROR) << "INSERT INTO dict_task[" << count << "] faildure!";
fclose(file);
return;
}
count ++;
}
LOG(INFO) << "file line count = " << count;
if (line)
free(line);
fclose(file);
gettimeofday(&end, NULL);
LOG(INFO) << "end.tv_sec:" << end.tv_sec;
LOG(INFO) << "end.tv_usec:" << end.tv_usec;
time_use = (end.tv_sec-start.tv_sec)*1000000+(end.tv_usec-start.tv_usec); // ms
LOG(INFO) << "time_use is " << time_use/1000000 << "s";
}
// 清空开发实例中的表中的所有数据
void TruncateDictData() {
MysqlQuery query(&connection);
if (!query.TryExecute("TRUNCATE TABLE dict_task;")) {
LOG(ERROR) << "TRUNCATE TABLE dict_task faildure!";
} else {
LOG(INFO) << "TRUNCATE TABLE dict_task success.";
}
if (!query.TryExecute("TRUNCATE TABLE dict_word;")) {
LOG(ERROR) << "TRUNCATE TABLE dict_word faildure!";
} else {
LOG(INFO) << "TRUNCATE TABLE dict_word success.";
}
}
// 查看开发实例数据的一些常用脚本
// SELECT * FROM dict_word order by count desc limit 0, 10;
// SELECT count(1), MAX(count) FROM dict_word;
// SELECT count(1), task_status FROM dict_task GROUP BY task_status;

2.2.1 实例部署

  1. 按照/src/demo/dict.sql文件中提供的SQL脚本,在mysql中建立用于保存每个计算任务的dict_task表以及用于保存单词排序结果的dict_word表;
  2. 参照《1.6.1 编译和安装说明》将单词排序实例在Linux环境下进行编译和安装,实例程序将会安装到/home/liuxun/FastDCS/demo中,其中liuxun可以修改成你的工作目录;
  3. 通过运行dict_mock程序将dict.txt文件拆分成计算任务,保存到mysql数据库中;
  4. 你可以在本机上运行多个 master 和 worker 节点程序来模拟一个FastDCS服务集群,或者将 master 和 worker 节点程序复制到不同的Linux服务器上;
  5. 在编译安装的目录中,可以通过修改配置文件,让FastDCS系统以应用程序的方式运行并将日志输出到控制台

需要修改的配置项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
run_by_daemon = false
max_core_file = true
info_log =
warn_log =
err_log =
// 服务集群的IP地址,可以配置一个或者多个,必须和你的master节点的tracker_server相对应
tracker_group = 127.0.0.1:32301;127.0.0.1:32302;127.0.0.1:32303
// 当前节点的IP地址,如果在同一台Linux服务器上需要配置不同的端口
tracker_server = 127.0.0.1:32301
// mysql参数,需要根据你的运行环境进行配置
mysql_database = FastDCS
mysql_host = localhost
mysql_user = FastDCS
mysql_passwd = fastdcs
mysql_port = 3306

使用多个控制台分别运行master和worker节点程序

1
2
3
4
5
6
7
$ ./master_32301.sh
$ ./master_32302.sh
$ ./master_32303.sh
$ ./worker.sh

如果配置正确,应该就可以看见其中一台 master 节点不停的获取计算任务,分配给其他的 master 节点,worker 节点接受计算任务的调度进行计算,你可以通过/src/demo/dict.sql中提供的SQL工具脚本在mysql中查看数据的运行情况。

2.2 如何进行实际运用

通过对单词排序应用实例的讲解,将分布式计算系统FastDCS中的计算任务分解、计算任务导入、任务的计算、计算结果保存到外部存储系统等方面进行了阐述,在将FastDCS运用于实际业务的开发中时和对单词排序应用实例的开发流程完全一致,所需要自定义的功能模块有以下区别:

  1. DictMock程序在实际运用中一般都是计算任务的输入源,可能是一个固定大小和时间长度的计算任务,也可能是一个在线业务永不停息进行输入的数据流,例如:2008-2012年瑞读网每天大量的用户上传的文件;
  2. 外部存储系统一般分为二个部分,一个是用于保存计算任务的数据库,如:mysql、hbase等。另一个是用于保存计算数据的存储系统,如:FastDFS、hdfs等;
  3. DictMaster程序中的计算任务的导入和导出功能的具体实现,需要根据实际运用的具体情况进行自定义开发;
  4. DictWorker程序中的计算功能的具体实现,需要根据实际运用的具体情况进行自定义开发;

3.1 应用案例

2013年FastDCS应用于某城市的智能交通项目,FastDCS部署在14台服务器上,每日进行400万次的交通数据处理和10亿数据量的分析工作;