Whamcloud - gitweb
LU-1187 mdt: add out handler for object update
[fs/lustre-release.git] / lustre / mdt / mdt_mds.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2012 Intel Corporation
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_mds.c
32  *
33  * Lustre Metadata Service Layer
34  *
35  * Author: Di Wang <di.wang@whamcloud.com>
36  **/
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 #include <obd_support.h>
43 /* struct ptlrpc_request */
44 #include <lustre_net.h>
45 /* struct obd_export */
46 #include <lustre_export.h>
47 /* struct obd_device */
48 #include <obd.h>
49 /* lu2dt_dev() */
50 #include <dt_object.h>
51 #include <lustre_mds.h>
52 #include <lustre_mdt.h>
53 #include "mdt_internal.h"
54 #ifdef HAVE_QUOTA_SUPPORT
55 # include <lustre_quota.h>
56 #endif
57 #include <lustre_acl.h>
58 #include <lustre_param.h>
59 #include <lustre_fsfilt.h>
60
61 struct mds_device {
62         /* super-class */
63         struct md_device           mds_md_dev;
64         struct ptlrpc_service     *mds_regular_service;
65         struct ptlrpc_service     *mds_readpage_service;
66         struct ptlrpc_service     *mds_out_service;
67         struct ptlrpc_service     *mds_setattr_service;
68         struct ptlrpc_service     *mds_mdsc_service;
69         struct ptlrpc_service     *mds_mdss_service;
70         struct ptlrpc_service     *mds_fld_service;
71 };
72
73 /*
74  *  * Initialized in mdt_mod_init().
75  *   */
76 static unsigned long mdt_num_threads;
77 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
78                 "number of MDS service threads to start "
79                 "(deprecated in favor of mds_num_threads)");
80
81 static unsigned long mds_num_threads;
82 CFS_MODULE_PARM(mds_num_threads, "ul", ulong, 0444,
83                 "number of MDS service threads to start");
84
85 static char *mds_num_cpts;
86 CFS_MODULE_PARM(mds_num_cpts, "c", charp, 0444,
87                 "CPU partitions MDS threads should run on");
88
89 static unsigned long mds_rdpg_num_threads;
90 CFS_MODULE_PARM(mds_rdpg_num_threads, "ul", ulong, 0444,
91                 "number of MDS readpage service threads to start");
92
93 static char *mds_rdpg_num_cpts;
94 CFS_MODULE_PARM(mds_rdpg_num_cpts, "c", charp, 0444,
95                 "CPU partitions MDS readpage threads should run on");
96
97 /* NB: these two should be removed along with setattr service in the future */
98 static unsigned long mds_attr_num_threads;
99 CFS_MODULE_PARM(mds_attr_num_threads, "ul", ulong, 0444,
100                 "number of MDS setattr service threads to start");
101
102 static char *mds_attr_num_cpts;
103 CFS_MODULE_PARM(mds_attr_num_cpts, "c", charp, 0444,
104                 "CPU partitions MDS setattr threads should run on");
105
106 #define DEFINE_RPC_HANDLER(base, flags, opc, fn, fmt)                   \
107 [opc - base] = {                                                        \
108         .mh_name        = #opc,                                         \
109         .mh_fail_id     = OBD_FAIL_ ## opc ## _NET,                     \
110         .mh_opc         = opc,                                          \
111         .mh_flags       = flags,                                        \
112         .mh_act         = fn,                                           \
113         .mh_fmt         = fmt                                           \
114 }
115
116 /* Request with a format known in advance */
117 #define DEF_MDT_HDL(flags, name, fn)                                    \
118         DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, &RQF_ ## name)
119
120 /* Request with a format we do not yet know */
121 #define DEF_MDT_HDL_VAR(flags, name, fn)                                \
122         DEFINE_RPC_HANDLER(MDS_GETATTR, flags, name, fn, NULL)
123
124 /* Map one non-standard request format handler.  This should probably get
125  * a common OBD_SET_INFO RPC opcode instead of this mismatch. */
126 #define RQF_MDS_SET_INFO RQF_OBD_SET_INFO
127
128 static struct mdt_handler mdt_mds_ops[] = {
129 DEF_MDT_HDL(0,                          MDS_CONNECT,      mdt_connect),
130 DEF_MDT_HDL(0,                          MDS_DISCONNECT,   mdt_disconnect),
131 DEF_MDT_HDL(0,                          MDS_SET_INFO,     mdt_set_info),
132 DEF_MDT_HDL(0,                          MDS_GET_INFO,     mdt_get_info),
133 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_GETSTATUS,    mdt_getstatus),
134 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETATTR,      mdt_getattr),
135 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME, mdt_getattr_name),
136 DEF_MDT_HDL(HABEO_CORPUS,               MDS_GETXATTR,     mdt_getxattr),
137 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_STATFS,       mdt_statfs),
138 DEF_MDT_HDL(0           | MUTABOR,      MDS_REINT,        mdt_reint),
139 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
140 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
141 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_PIN,          mdt_pin),
142 DEF_MDT_HDL_VAR(0,                      MDS_SYNC,         mdt_sync),
143 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR,    mdt_is_subdir),
144 DEF_MDT_HDL(0,                          MDS_QUOTACHECK,   mdt_quotacheck),
145 DEF_MDT_HDL(0,                          MDS_QUOTACTL,     mdt_quotactl),
146 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_PROGRESS, mdt_hsm_progress),
147 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_CT_REGISTER,
148                                                 mdt_hsm_ct_register),
149 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_CT_UNREGISTER,
150                                                 mdt_hsm_ct_unregister),
151 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET,
152                                                 mdt_hsm_state_get),
153 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_SET,
154                                                 mdt_hsm_state_set),
155 DEF_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action),
156 DEF_MDT_HDL(0           | HABEO_REFERO, MDS_HSM_REQUEST, mdt_hsm_request),
157 };
158
159 #define DEF_OBD_HDL(flags, name, fn)                                    \
160         DEFINE_RPC_HANDLER(OBD_PING, flags, name, fn, NULL)
161
162 static struct mdt_handler mdt_obd_ops[] = {
163 DEF_OBD_HDL(0,                          OBD_PING,         mdt_obd_ping),
164 DEF_OBD_HDL(0,                          OBD_LOG_CANCEL,   mdt_obd_log_cancel),
165 DEF_OBD_HDL(0,                          OBD_QC_CALLBACK,  mdt_obd_qc_callback),
166 DEF_OBD_HDL(0,                          OBD_IDX_READ,     mdt_obd_idx_read)
167 };
168
169 #define DEF_DLM_HDL_VAR(flags, name, fn)                                \
170         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, NULL)
171 #define DEF_DLM_HDL(flags, name, fn)                                    \
172         DEFINE_RPC_HANDLER(LDLM_ENQUEUE, flags, name, fn, &RQF_ ## name)
173
174 static struct mdt_handler mdt_dlm_ops[] = {
175 DEF_DLM_HDL    (HABEO_CLAVIS,           LDLM_ENQUEUE,     mdt_enqueue),
176 DEF_DLM_HDL_VAR(HABEO_CLAVIS,           LDLM_CONVERT,     mdt_convert),
177 DEF_DLM_HDL_VAR(0,                      LDLM_BL_CALLBACK, mdt_bl_callback),
178 DEF_DLM_HDL_VAR(0,                      LDLM_CP_CALLBACK, mdt_cp_callback)
179 };
180
181 #define DEF_LLOG_HDL(flags, name, fn)                                   \
182         DEFINE_RPC_HANDLER(LLOG_ORIGIN_HANDLE_CREATE, flags, name, fn, NULL)
183
184 static struct mdt_handler mdt_llog_ops[] = {
185 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CREATE,        mdt_llog_create),
186 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_NEXT_BLOCK,    mdt_llog_next_block),
187 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_READ_HEADER,   mdt_llog_read_header),
188 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_WRITE_REC,     NULL),
189 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_CLOSE,         NULL),
190 DEF_LLOG_HDL(0,         LLOG_ORIGIN_CONNECT,              NULL),
191 DEF_LLOG_HDL(0,         LLOG_CATINFO,                     NULL),
192 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_PREV_BLOCK,    mdt_llog_prev_block),
193 DEF_LLOG_HDL(0,         LLOG_ORIGIN_HANDLE_DESTROY,       mdt_llog_destroy),
194 };
195
196 #define DEF_SEC_HDL(flags, name, fn)                                    \
197         DEFINE_RPC_HANDLER(SEC_CTX_INIT, flags, name, fn, NULL)
198
199 static struct mdt_handler mdt_sec_ctx_ops[] = {
200 DEF_SEC_HDL(0,                          SEC_CTX_INIT,     mdt_sec_ctx_handle),
201 DEF_SEC_HDL(0,                          SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
202 DEF_SEC_HDL(0,                          SEC_CTX_FINI,     mdt_sec_ctx_handle)
203 };
204
205 #define DEF_QUOTA_HDL(flags, name, fn)                          \
206         DEFINE_RPC_HANDLER(QUOTA_DQACQ, flags, name, fn, &RQF_ ## name)
207
208 static struct mdt_handler mdt_quota_ops[] = {
209 DEF_QUOTA_HDL(HABEO_REFERO,             QUOTA_DQACQ,      mdt_quota_dqacq),
210 };
211
212 struct mdt_opc_slice mdt_regular_handlers[] = {
213         {
214                 .mos_opc_start  = MDS_GETATTR,
215                 .mos_opc_end    = MDS_LAST_OPC,
216                 .mos_hs         = mdt_mds_ops
217         },
218         {
219                 .mos_opc_start  = OBD_PING,
220                 .mos_opc_end    = OBD_LAST_OPC,
221                 .mos_hs         = mdt_obd_ops
222         },
223         {
224                 .mos_opc_start  = LDLM_ENQUEUE,
225                 .mos_opc_end    = LDLM_LAST_OPC,
226                 .mos_hs         = mdt_dlm_ops
227         },
228         {
229                 .mos_opc_start  = LLOG_ORIGIN_HANDLE_CREATE,
230                 .mos_opc_end    = LLOG_LAST_OPC,
231                 .mos_hs         = mdt_llog_ops
232         },
233         {
234                 .mos_opc_start  = SEC_CTX_INIT,
235                 .mos_opc_end    = SEC_LAST_OPC,
236                 .mos_hs         = mdt_sec_ctx_ops
237         },
238         {
239                 .mos_opc_start  = QUOTA_DQACQ,
240                 .mos_opc_end    = QUOTA_LAST_OPC,
241                 .mos_hs         = mdt_quota_ops
242         },
243         {
244                 .mos_hs         = NULL
245         }
246 };
247
248 /* Readpage/readdir handlers */
249 static struct mdt_handler mdt_readpage_ops[] = {
250 DEF_MDT_HDL(0,                  MDS_CONNECT,  mdt_connect),
251 DEF_MDT_HDL(HABEO_CORPUS | HABEO_REFERO, MDS_READPAGE, mdt_readpage),
252 /* XXX: this is ugly and should be fixed one day, see mdc_close() for
253  * detailed comments. --umka */
254 DEF_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,        mdt_close),
255 DEF_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING, mdt_done_writing),
256 };
257
258 static struct mdt_opc_slice mdt_readpage_handlers[] = {
259         {
260                 .mos_opc_start = MDS_GETATTR,
261                 .mos_opc_end   = MDS_LAST_OPC,
262                 .mos_hs = mdt_readpage_ops
263         },
264         {
265                 .mos_opc_start = OBD_FIRST_OPC,
266                 .mos_opc_end   = OBD_LAST_OPC,
267                 .mos_hs = mdt_obd_ops
268         },
269         {
270                 .mos_hs = NULL
271         }
272 };
273
274 /* Sequence service handlers */
275 #define DEF_SEQ_HDL(flags, name, fn)                                    \
276         DEFINE_RPC_HANDLER(SEQ_QUERY, flags, name, fn, &RQF_ ## name)
277
278 static struct mdt_handler mdt_seq_ops[] = {
279 DEF_SEQ_HDL(0,                          SEQ_QUERY,        (void *)seq_query),
280 };
281
282 struct mdt_opc_slice mdt_seq_handlers[] = {
283         {
284                 .mos_opc_start = SEQ_QUERY,
285                 .mos_opc_end   = SEQ_LAST_OPC,
286                 .mos_hs = mdt_seq_ops
287         },
288         {
289                 .mos_hs = NULL
290         }
291 };
292
293 /* FID Location Database handlers */
294 #define DEF_FLD_HDL(flags, name, fn)                                    \
295         DEFINE_RPC_HANDLER(FLD_QUERY, flags, name, fn, &RQF_ ## name)
296
297 static struct mdt_handler mdt_fld_ops[] = {
298 DEF_FLD_HDL(0,                          FLD_QUERY,        (void *)fld_query),
299 };
300
301 struct mdt_opc_slice mdt_fld_handlers[] = {
302         {
303                 .mos_opc_start = FLD_QUERY,
304                 .mos_opc_end   = FLD_LAST_OPC,
305                 .mos_hs = mdt_fld_ops
306         },
307         {
308                 .mos_hs = NULL
309         }
310 };
311
312 /* Request with a format known in advance */
313 #define DEF_UPDATE_HDL(flags, name, fn)                                 \
314         DEFINE_RPC_HANDLER(UPDATE_OBJ, flags, name, fn, &RQF_ ## name)
315
316 #define target_handler mdt_handler
317 static struct target_handler out_ops[] = {
318         DEF_UPDATE_HDL(MUTABOR,         UPDATE_OBJ,     out_handle),
319 };
320
321 static struct mdt_opc_slice update_handlers[] = {
322         {
323                 .mos_opc_start = MDS_GETATTR,
324                 .mos_opc_end   = MDS_LAST_OPC,
325                 .mos_hs        = mdt_mds_ops
326         },
327         {
328                 .mos_opc_start = OBD_PING,
329                 .mos_opc_end   = OBD_LAST_OPC,
330                 .mos_hs        = mdt_obd_ops
331         },
332         {
333                 .mos_opc_start = LDLM_ENQUEUE,
334                 .mos_opc_end   = LDLM_LAST_OPC,
335                 .mos_hs        = mdt_dlm_ops
336         },
337         {
338                 .mos_opc_start = SEC_CTX_INIT,
339                 .mos_opc_end   = SEC_LAST_OPC,
340                 .mos_hs        = mdt_sec_ctx_ops
341         },
342         {
343                 .mos_opc_start = UPDATE_OBJ,
344                 .mos_opc_end   = UPDATE_LAST_OPC,
345                 .mos_hs        = out_ops
346         },
347         {
348                 .mos_hs        = NULL
349         }
350 };
351
352 static int mds_regular_handle(struct ptlrpc_request *req)
353 {
354         return mdt_handle_common(req, mdt_regular_handlers);
355 }
356
357 static int mds_readpage_handle(struct ptlrpc_request *req)
358 {
359         return mdt_handle_common(req, mdt_readpage_handlers);
360 }
361
362 static int mds_mdsc_handle(struct ptlrpc_request *req)
363 {
364         return mdt_handle_common(req, mdt_seq_handlers);
365 }
366
367 static int mdt_out_handle(struct ptlrpc_request *req)
368 {
369         return mdt_handle_common(req, update_handlers);
370 }
371
372 static int mds_mdss_handle(struct ptlrpc_request *req)
373 {
374         return mdt_handle_common(req, mdt_seq_handlers);
375 }
376
377 static int mds_fld_handle(struct ptlrpc_request *req)
378 {
379         return mdt_handle_common(req, mdt_fld_handlers);
380 }
381
382 /* device init/fini methods */
383 static void mds_stop_ptlrpc_service(struct mds_device *m)
384 {
385         ENTRY;
386         if (m->mds_regular_service != NULL) {
387                 ptlrpc_unregister_service(m->mds_regular_service);
388                 m->mds_regular_service = NULL;
389         }
390         if (m->mds_readpage_service != NULL) {
391                 ptlrpc_unregister_service(m->mds_readpage_service);
392                 m->mds_readpage_service = NULL;
393         }
394         if (m->mds_out_service != NULL) {
395                 ptlrpc_unregister_service(m->mds_out_service);
396                 m->mds_out_service = NULL;
397         }
398         if (m->mds_setattr_service != NULL) {
399                 ptlrpc_unregister_service(m->mds_setattr_service);
400                 m->mds_setattr_service = NULL;
401         }
402         if (m->mds_mdsc_service != NULL) {
403                 ptlrpc_unregister_service(m->mds_mdsc_service);
404                 m->mds_mdsc_service = NULL;
405         }
406         if (m->mds_mdss_service != NULL) {
407                 ptlrpc_unregister_service(m->mds_mdss_service);
408                 m->mds_mdss_service = NULL;
409         }
410         if (m->mds_fld_service != NULL) {
411                 ptlrpc_unregister_service(m->mds_fld_service);
412                 m->mds_fld_service = NULL;
413         }
414         EXIT;
415 }
416
417 static int mds_start_ptlrpc_service(struct mds_device *m)
418 {
419         static struct ptlrpc_service_conf conf;
420         struct obd_device *obd = m->mds_md_dev.md_lu_dev.ld_obd;
421         cfs_proc_dir_entry_t *procfs_entry;
422         int rc = 0;
423         ENTRY;
424
425         procfs_entry = obd->obd_proc_entry;
426         LASSERT(procfs_entry != NULL);
427
428         conf = (typeof(conf)) {
429                 .psc_name               = LUSTRE_MDT_NAME,
430                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
431                 .psc_buf                = {
432                         .bc_nbufs               = MDS_NBUFS,
433                         .bc_buf_size            = MDS_BUFSIZE,
434                         .bc_req_max_size        = MDS_MAXREQSIZE,
435                         .bc_rep_max_size        = MDS_MAXREPSIZE,
436                         .bc_req_portal          = MDS_REQUEST_PORTAL,
437                         .bc_rep_portal          = MDC_REPLY_PORTAL,
438                 },
439                 /*
440                  * We'd like to have a mechanism to set this on a per-device
441                  * basis, but alas...
442                  */
443                 .psc_thr                = {
444                         .tc_thr_name            = LUSTRE_MDT_NAME,
445                         .tc_thr_factor          = MDS_THR_FACTOR,
446                         .tc_nthrs_init          = MDS_NTHRS_INIT,
447                         .tc_nthrs_base          = MDS_NTHRS_BASE,
448                         .tc_nthrs_max           = MDS_NTHRS_MAX,
449                         .tc_nthrs_user          = mds_num_threads,
450                         .tc_cpu_affinity        = 1,
451                         .tc_ctx_tags            = LCT_MD_THREAD,
452                 },
453                 .psc_cpt                = {
454                         .cc_pattern             = mds_num_cpts,
455                 },
456                 .psc_ops                = {
457                         .so_req_handler         = mds_regular_handle,
458                         .so_req_printer         = target_print_req,
459                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
460                 },
461         };
462         m->mds_regular_service = ptlrpc_register_service(&conf, procfs_entry);
463         if (IS_ERR(m->mds_regular_service)) {
464                 rc = PTR_ERR(m->mds_regular_service);
465                 CERROR("failed to start regular mdt service: %d\n", rc);
466                 m->mds_regular_service = NULL;
467
468                 RETURN(rc);
469         }
470
471         /*
472          * readpage service configuration. Parameters have to be adjusted,
473          * ideally.
474          */
475         memset(&conf, 0, sizeof(conf));
476         conf = (typeof(conf)) {
477                 .psc_name               = LUSTRE_MDT_NAME "_readpage",
478                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
479                 .psc_buf                = {
480                         .bc_nbufs               = MDS_NBUFS,
481                         .bc_buf_size            = MDS_BUFSIZE,
482                         .bc_req_max_size        = MDS_MAXREQSIZE,
483                         .bc_rep_max_size        = MDS_MAXREPSIZE,
484                         .bc_req_portal          = MDS_READPAGE_PORTAL,
485                         .bc_rep_portal          = MDC_REPLY_PORTAL,
486                 },
487                 .psc_thr                = {
488                         .tc_thr_name            = LUSTRE_MDT_NAME "_rdpg",
489                         .tc_thr_factor          = MDS_RDPG_THR_FACTOR,
490                         .tc_nthrs_init          = MDS_RDPG_NTHRS_INIT,
491                         .tc_nthrs_base          = MDS_RDPG_NTHRS_BASE,
492                         .tc_nthrs_max           = MDS_RDPG_NTHRS_MAX,
493                         .tc_nthrs_user          = mds_rdpg_num_threads,
494                         .tc_cpu_affinity        = 1,
495                         .tc_ctx_tags            = LCT_MD_THREAD,
496                 },
497                 .psc_cpt                = {
498                         .cc_pattern             = mds_rdpg_num_cpts,
499                 },
500                 .psc_ops                = {
501                         .so_req_handler         = mds_readpage_handle,
502                         .so_req_printer         = target_print_req,
503                 },
504         };
505         m->mds_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
506         if (IS_ERR(m->mds_readpage_service)) {
507                 rc = PTR_ERR(m->mds_readpage_service);
508                 CERROR("failed to start readpage service: %d\n", rc);
509                 m->mds_readpage_service = NULL;
510
511                 GOTO(err_mds_svc, rc);
512         }
513
514         /*
515          * setattr service configuration.
516          *
517          * XXX To keep the compatibility with old client(< 2.2), we need to
518          * preserve this portal for a certain time, it should be removed
519          * eventually. LU-617.
520          */
521         memset(&conf, 0, sizeof(conf));
522         conf = (typeof(conf)) {
523                 .psc_name               = LUSTRE_MDT_NAME "_setattr",
524                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
525                 .psc_buf                = {
526                         .bc_nbufs               = MDS_NBUFS,
527                         .bc_buf_size            = MDS_BUFSIZE,
528                         .bc_req_max_size        = MDS_MAXREQSIZE,
529                         .bc_rep_max_size        = MDS_MAXREPSIZE,
530                         .bc_req_portal          = MDS_SETATTR_PORTAL,
531                         .bc_rep_portal          = MDC_REPLY_PORTAL,
532                 },
533                 .psc_thr                = {
534                         .tc_thr_name            = LUSTRE_MDT_NAME "_attr",
535                         .tc_thr_factor          = MDS_SETA_THR_FACTOR,
536                         .tc_nthrs_init          = MDS_SETA_NTHRS_INIT,
537                         .tc_nthrs_base          = MDS_SETA_NTHRS_BASE,
538                         .tc_nthrs_max           = MDS_SETA_NTHRS_MAX,
539                         .tc_nthrs_user          = mds_attr_num_threads,
540                         .tc_cpu_affinity        = 1,
541                         .tc_ctx_tags            = LCT_MD_THREAD,
542                 },
543                 .psc_cpt                = {
544                         .cc_pattern             = mds_attr_num_cpts,
545                 },
546                 .psc_ops                = {
547                         .so_req_handler         = mds_regular_handle,
548                         .so_req_printer         = target_print_req,
549                         .so_hpreq_handler       = NULL,
550                 },
551         };
552         m->mds_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
553         if (IS_ERR(m->mds_setattr_service)) {
554                 rc = PTR_ERR(m->mds_setattr_service);
555                 CERROR("failed to start setattr service: %d\n", rc);
556                 m->mds_setattr_service = NULL;
557
558                 GOTO(err_mds_svc, rc);
559         }
560
561         /* Object update service */
562         conf = (typeof(conf)) {
563                 .psc_name               = LUSTRE_MDT_NAME "_out",
564                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
565                 .psc_buf                = {
566                         .bc_nbufs               = MDS_NBUFS,
567                         .bc_buf_size            = MDS_BUFSIZE,
568                         .bc_req_max_size        = MDS_MAXREQSIZE,
569                         .bc_rep_max_size        = MDS_MAXREPSIZE,
570                         .bc_req_portal          = MDS_MDS_PORTAL,
571                         .bc_rep_portal          = MDC_REPLY_PORTAL,
572                 },
573                 /*
574                  * We'd like to have a mechanism to set this on a per-device
575                  * basis, but alas...
576                  */
577                 .psc_thr                = {
578                         .tc_thr_name            = LUSTRE_MDT_NAME "_out",
579                         .tc_thr_factor          = MDS_THR_FACTOR,
580                         .tc_nthrs_init          = MDS_NTHRS_INIT,
581                         .tc_nthrs_base          = MDS_NTHRS_BASE,
582                         .tc_nthrs_max           = MDS_NTHRS_MAX,
583                         .tc_nthrs_user          = mds_num_threads,
584                         .tc_cpu_affinity        = 1,
585                         .tc_ctx_tags            = LCT_MD_THREAD,
586                 },
587                 .psc_cpt                = {
588                         .cc_pattern             = mds_num_cpts,
589                 },
590                 .psc_ops                = {
591                         .so_req_handler         = mdt_out_handle,
592                         .so_req_printer         = target_print_req,
593                         .so_hpreq_handler       = NULL,
594                 },
595         };
596         m->mds_out_service = ptlrpc_register_service(&conf, procfs_entry);
597         if (IS_ERR(m->mds_out_service)) {
598                 rc = PTR_ERR(m->mds_out_service);
599                 CERROR("failed to start out service: %d\n", rc);
600                 m->mds_out_service = NULL;
601                 GOTO(err_mds_svc, rc);
602         }
603
604         /*
605          * sequence controller service configuration
606          */
607         memset(&conf, 0, sizeof(conf));
608         conf = (typeof(conf)) {
609                 .psc_name               = LUSTRE_MDT_NAME "_seqs",
610                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
611                 .psc_buf                = {
612                         .bc_nbufs               = MDS_NBUFS,
613                         .bc_buf_size            = MDS_BUFSIZE,
614                         .bc_req_max_size        = SEQ_MAXREQSIZE,
615                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
616                         .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
617                         .bc_rep_portal          = MDC_REPLY_PORTAL,
618                 },
619                 .psc_thr                = {
620                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqs",
621                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
622                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
623                         .tc_ctx_tags            = LCT_MD_THREAD,
624                 },
625                 .psc_ops                = {
626                         .so_req_handler         = mds_mdsc_handle,
627                         .so_req_printer         = target_print_req,
628                         .so_hpreq_handler       = NULL,
629                 },
630         };
631         m->mds_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
632         if (IS_ERR(m->mds_mdsc_service)) {
633                 rc = PTR_ERR(m->mds_mdsc_service);
634                 CERROR("failed to start seq controller service: %d\n", rc);
635                 m->mds_mdsc_service = NULL;
636
637                 GOTO(err_mds_svc, rc);
638         }
639
640         /*
641          * metadata sequence server service configuration
642          */
643         memset(&conf, 0, sizeof(conf));
644         conf = (typeof(conf)) {
645                 .psc_name               = LUSTRE_MDT_NAME "_seqm",
646                 .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
647                 .psc_buf                = {
648                         .bc_nbufs               = MDS_NBUFS,
649                         .bc_buf_size            = MDS_BUFSIZE,
650                         .bc_req_max_size        = SEQ_MAXREQSIZE,
651                         .bc_rep_max_size        = SEQ_MAXREPSIZE,
652                         .bc_req_portal          = SEQ_METADATA_PORTAL,
653                         .bc_rep_portal          = MDC_REPLY_PORTAL,
654                 },
655                 .psc_thr                = {
656                         .tc_thr_name            = LUSTRE_MDT_NAME "_seqm",
657                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
658                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
659                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
660                 },
661                 .psc_ops                = {
662                         .so_req_handler         = mds_mdss_handle,
663                         .so_req_printer         = target_print_req,
664                         .so_hpreq_handler       = NULL,
665                 },
666         };
667         m->mds_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
668         if (IS_ERR(m->mds_mdss_service)) {
669                 rc = PTR_ERR(m->mds_mdss_service);
670                 CERROR("failed to start metadata seq server service: %d\n", rc);
671                 m->mds_mdss_service = NULL;
672
673                 GOTO(err_mds_svc, rc);
674         }
675
676         /* FLD service start */
677         memset(&conf, 0, sizeof(conf));
678         conf = (typeof(conf)) {
679                 .psc_name            = LUSTRE_MDT_NAME "_fld",
680                 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
681                 .psc_buf                = {
682                         .bc_nbufs               = MDS_NBUFS,
683                         .bc_buf_size            = MDS_BUFSIZE,
684                         .bc_req_max_size        = FLD_MAXREQSIZE,
685                         .bc_rep_max_size        = FLD_MAXREPSIZE,
686                         .bc_req_portal          = FLD_REQUEST_PORTAL,
687                         .bc_rep_portal          = MDC_REPLY_PORTAL,
688                 },
689                 .psc_thr                = {
690                         .tc_thr_name            = LUSTRE_MDT_NAME "_fld",
691                         .tc_nthrs_init          = MDS_OTHR_NTHRS_INIT,
692                         .tc_nthrs_max           = MDS_OTHR_NTHRS_MAX,
693                         .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD
694                 },
695                 .psc_ops                = {
696                         .so_req_handler         = mds_fld_handle,
697                         .so_req_printer         = target_print_req,
698                         .so_hpreq_handler       = NULL,
699                 },
700         };
701         m->mds_fld_service = ptlrpc_register_service(&conf, procfs_entry);
702         if (IS_ERR(m->mds_fld_service)) {
703                 rc = PTR_ERR(m->mds_fld_service);
704                 CERROR("failed to start fld service: %d\n", rc);
705                 m->mds_fld_service = NULL;
706
707                 GOTO(err_mds_svc, rc);
708         }
709
710         EXIT;
711 err_mds_svc:
712         if (rc)
713                 mds_stop_ptlrpc_service(m);
714
715         return rc;
716 }
717
718 static inline struct mds_device *mds_dev(struct lu_device *d)
719 {
720         return container_of0(d, struct mds_device, mds_md_dev.md_lu_dev);
721 }
722
723 static struct lu_device *mds_device_fini(const struct lu_env *env,
724                                          struct lu_device *d)
725 {
726         struct mds_device *m = mds_dev(d);
727         struct obd_device *obd = d->ld_obd;
728         ENTRY;
729
730         mds_stop_ptlrpc_service(m);
731         lprocfs_obd_cleanup(obd);
732         RETURN(NULL);
733 }
734
735 static struct lu_device *mds_device_free(const struct lu_env *env,
736                                          struct lu_device *d)
737 {
738         struct mds_device *m = mds_dev(d);
739         ENTRY;
740
741         md_device_fini(&m->mds_md_dev);
742         OBD_FREE_PTR(m);
743         RETURN(NULL);
744 }
745
746 static struct lu_device *mds_device_alloc(const struct lu_env *env,
747                                           struct lu_device_type *t,
748                                           struct lustre_cfg *cfg)
749 {
750         struct mds_device        *m;
751         struct obd_device        *obd;
752         struct lu_device          *l;
753         int rc;
754
755         OBD_ALLOC_PTR(m);
756         if (m == NULL)
757                 return ERR_PTR(-ENOMEM);
758
759         md_device_init(&m->mds_md_dev, t);
760         l = &m->mds_md_dev.md_lu_dev;
761
762         obd = class_name2obd(lustre_cfg_string(cfg, 0));
763         LASSERT(obd != NULL);
764
765         l->ld_obd = obd;
766         /* set this lu_device to obd, because error handling need it */
767         obd->obd_lu_dev = l;
768
769         rc = lprocfs_obd_setup(obd, lprocfs_mds_obd_vars);
770         if (rc != 0) {
771                 mds_device_free(env, l);
772                 l = ERR_PTR(rc);
773                 return l;
774         }
775
776         rc = mds_start_ptlrpc_service(m);
777
778         if (rc != 0) {
779                 mds_device_free(env, l);
780                 l = ERR_PTR(rc);
781                 return l;
782         }
783
784         return l;
785 }
786
787 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
788 LU_TYPE_INIT_FINI(mds, &mdt_thread_key);
789
790 static struct lu_device_type_operations mds_device_type_ops = {
791         .ldto_init = mds_type_init,
792         .ldto_fini = mds_type_fini,
793
794         .ldto_start = mds_type_start,
795         .ldto_stop  = mds_type_stop,
796
797         .ldto_device_alloc = mds_device_alloc,
798         .ldto_device_free  = mds_device_free,
799         .ldto_device_fini  = mds_device_fini
800 };
801
802 static struct lu_device_type mds_device_type = {
803         .ldt_tags     = LU_DEVICE_MD,
804         .ldt_name     = LUSTRE_MDS_NAME,
805         .ldt_ops      = &mds_device_type_ops,
806         .ldt_ctx_tags = LCT_MD_THREAD
807 };
808
809 static struct obd_ops mds_obd_device_ops = {
810         .o_owner           = THIS_MODULE,
811 };
812
813 int mds_mod_init(void)
814 {
815         int rc;
816
817         if (mdt_num_threads != 0 && mds_num_threads == 0) {
818                 LCONSOLE_INFO("mdt_num_threads module parameter is deprecated, "
819                               "use mds_num_threads instead or unset both for "
820                               "dynamic thread startup\n");
821                 mds_num_threads = mdt_num_threads;
822         }
823
824         rc = class_register_type(&mds_obd_device_ops, NULL,
825                                  lprocfs_mds_module_vars, LUSTRE_MDS_NAME,
826                                  &mds_device_type);
827         return rc;
828 }
829
830 void mds_mod_exit(void)
831 {
832         class_unregister_type(LUSTRE_MDS_NAME);
833 }