Whamcloud - gitweb
LU-13004 ptlrpc: Allow BULK_BUF_KIOV to accept a kvec
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ost/ost_handler.c
33  *
34  * Author: Peter J. Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_OST
39
40 #include <linux/module.h>
41 #include <lustre_dlm.h>
42 #include <lprocfs_status.h>
43 #include <obd_class.h>
44 #include "ost_internal.h"
45
46 int oss_max_threads = 512;
47 module_param(oss_max_threads, int, 0444);
48 MODULE_PARM_DESC(oss_max_threads, "maximum number of OSS service threads");
49
50 static int oss_num_threads;
51 module_param(oss_num_threads, int, 0444);
52 MODULE_PARM_DESC(oss_num_threads, "number of OSS service threads to start");
53
54 static unsigned int oss_cpu_bind = 1;
55 module_param(oss_cpu_bind, uint, 0444);
56 MODULE_PARM_DESC(oss_cpu_bind,
57                  "bind OSS service threads to particular CPU partitions");
58
59 static int oss_num_create_threads;
60 module_param(oss_num_create_threads, int, 0444);
61 MODULE_PARM_DESC(oss_num_create_threads, "number of OSS create threads to start");
62
63 static unsigned int oss_create_cpu_bind = 1;
64 module_param(oss_create_cpu_bind, uint, 0444);
65 MODULE_PARM_DESC(oss_create_cpu_bind,
66                  "bind OSS create threads to particular CPU partitions");
67
68 static char *oss_cpts;
69 module_param(oss_cpts, charp, 0444);
70 MODULE_PARM_DESC(oss_cpts, "CPU partitions OSS threads should run on");
71
72 static char *oss_io_cpts;
73 module_param(oss_io_cpts, charp, 0444);
74 MODULE_PARM_DESC(oss_io_cpts, "CPU partitions OSS IO threads should run on");
75
76 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
77
78 static struct cfs_cpt_table     *ost_io_cptable;
79
80 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
81 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
82 {
83         static struct ptlrpc_service_conf       svc_conf;
84         struct ost_obd *ost = &obd->u.ost;
85         nodemask_t              *mask;
86         int rc;
87         ENTRY;
88
89         rc = lprocfs_obd_setup(obd, true);
90         if (rc)
91                 return rc;
92
93         mutex_init(&ost->ost_health_mutex);
94
95         svc_conf = (typeof(svc_conf)) {
96                 .psc_name               = LUSTRE_OSS_NAME,
97                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
98                 .psc_buf                = {
99                         .bc_nbufs               = OST_NBUFS,
100                         .bc_buf_size            = OST_BUFSIZE,
101                         .bc_req_max_size        = OST_MAXREQSIZE,
102                         .bc_rep_max_size        = OST_MAXREPSIZE,
103                         .bc_req_portal          = OST_REQUEST_PORTAL,
104                         .bc_rep_portal          = OSC_REPLY_PORTAL,
105                 },
106                 .psc_thr                = {
107                         .tc_thr_name            = "ll_ost",
108                         .tc_thr_factor          = OSS_THR_FACTOR,
109                         .tc_nthrs_init          = OSS_NTHRS_INIT,
110                         .tc_nthrs_base          = OSS_NTHRS_BASE,
111                         .tc_nthrs_max           = oss_max_threads,
112                         .tc_nthrs_user          = oss_num_threads,
113                         .tc_cpu_bind            = oss_cpu_bind,
114                         .tc_ctx_tags            = LCT_DT_THREAD,
115                 },
116                 .psc_cpt                = {
117                         .cc_pattern             = oss_cpts,
118                         .cc_affinity            = true,
119                 },
120                 .psc_ops                = {
121                         .so_req_handler         = tgt_request_handle,
122                         .so_req_printer         = target_print_req,
123                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
124                 },
125         };
126         ost->ost_service = ptlrpc_register_service(&svc_conf,
127                                                    &obd->obd_kset,
128                                                    obd->obd_debugfs_entry);
129         if (IS_ERR(ost->ost_service)) {
130                 rc = PTR_ERR(ost->ost_service);
131                 CERROR("failed to start service: %d\n", rc);
132                 GOTO(out_lprocfs, rc);
133         }
134
135         memset(&svc_conf, 0, sizeof(svc_conf));
136         svc_conf = (typeof(svc_conf)) {
137                 .psc_name               = "ost_create",
138                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
139                 .psc_buf                = {
140                         .bc_nbufs               = OST_NBUFS,
141                         .bc_buf_size            = OST_BUFSIZE,
142                         .bc_req_max_size        = OST_MAXREQSIZE,
143                         .bc_rep_max_size        = OST_MAXREPSIZE,
144                         .bc_req_portal          = OST_CREATE_PORTAL,
145                         .bc_rep_portal          = OSC_REPLY_PORTAL,
146                 },
147                 .psc_thr                = {
148                         .tc_thr_name            = "ll_ost_create",
149                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
150                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
151                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
152                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
153                         .tc_nthrs_user          = oss_num_create_threads,
154                         .tc_cpu_bind            = oss_create_cpu_bind,
155                         .tc_ctx_tags            = LCT_DT_THREAD,
156                 },
157                 .psc_cpt                = {
158                         .cc_pattern             = oss_cpts,
159                         .cc_affinity            = true,
160                 },
161                 .psc_ops                = {
162                         .so_req_handler         = tgt_request_handle,
163                         .so_req_printer         = target_print_req,
164                 },
165         };
166         ost->ost_create_service = ptlrpc_register_service(&svc_conf,
167                                                           &obd->obd_kset,
168                                                           obd->obd_debugfs_entry);
169         if (IS_ERR(ost->ost_create_service)) {
170                 rc = PTR_ERR(ost->ost_create_service);
171                 CERROR("failed to start OST create service: %d\n", rc);
172                 GOTO(out_service, rc);
173         }
174
175         mask = cfs_cpt_nodemask(cfs_cpt_table, CFS_CPT_ANY);
176         /* event CPT feature is disabled in libcfs level by set partition
177          * number to 1, we still want to set node affinity for io service */
178         if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
179                 int     cpt = 0;
180                 int     i;
181
182                 ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
183                 for_each_node_mask(i, *mask) {
184                         if (ost_io_cptable == NULL) {
185                                 CWARN("OSS failed to create CPT table\n");
186                                 break;
187                         }
188
189                         rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
190                         if (!rc) {
191                                 CWARN("OSS Failed to set node %d for"
192                                       "IO CPT table\n", i);
193                                 cfs_cpt_table_free(ost_io_cptable);
194                                 ost_io_cptable = NULL;
195                                 break;
196                         }
197                 }
198         }
199
200         memset(&svc_conf, 0, sizeof(svc_conf));
201         svc_conf = (typeof(svc_conf)) {
202                 .psc_name               = "ost_io",
203                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
204                 .psc_buf                = {
205                         .bc_nbufs               = OST_NBUFS,
206                         .bc_buf_size            = OST_IO_BUFSIZE,
207                         .bc_req_max_size        = OST_IO_MAXREQSIZE,
208                         .bc_rep_max_size        = OST_IO_MAXREPSIZE,
209                         .bc_req_portal          = OST_IO_PORTAL,
210                         .bc_rep_portal          = OSC_REPLY_PORTAL,
211                 },
212                 .psc_thr                = {
213                         .tc_thr_name            = "ll_ost_io",
214                         .tc_thr_factor          = OSS_THR_FACTOR,
215                         .tc_nthrs_init          = OSS_NTHRS_INIT,
216                         .tc_nthrs_base          = OSS_NTHRS_BASE,
217                         .tc_nthrs_max           = oss_max_threads,
218                         .tc_nthrs_user          = oss_num_threads,
219                         .tc_cpu_bind            = oss_cpu_bind,
220                         .tc_ctx_tags            = LCT_DT_THREAD,
221                 },
222                 .psc_cpt                = {
223                         .cc_cptable             = ost_io_cptable,
224                         .cc_pattern             = ost_io_cptable == NULL ?
225                                                   oss_io_cpts : NULL,
226                         .cc_affinity            = true,
227                 },
228                 .psc_ops                = {
229                         .so_thr_init            = tgt_io_thread_init,
230                         .so_thr_done            = tgt_io_thread_done,
231                         .so_req_handler         = tgt_request_handle,
232                         .so_hpreq_handler       = tgt_hpreq_handler,
233                         .so_req_printer         = target_print_req,
234                 },
235         };
236         ost->ost_io_service = ptlrpc_register_service(&svc_conf,
237                                                       &obd->obd_kset,
238                                                       obd->obd_debugfs_entry);
239         if (IS_ERR(ost->ost_io_service)) {
240                 rc = PTR_ERR(ost->ost_io_service);
241                 CERROR("failed to start OST I/O service: %d\n", rc);
242                 ost->ost_io_service = NULL;
243                 GOTO(out_create, rc);
244         }
245
246         memset(&svc_conf, 0, sizeof(svc_conf));
247         svc_conf = (typeof(svc_conf)) {
248                 .psc_name               = "ost_seq",
249                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
250                 .psc_buf                = {
251                         .bc_nbufs               = OST_NBUFS,
252                         .bc_buf_size            = OST_BUFSIZE,
253                         .bc_req_max_size        = OST_MAXREQSIZE,
254                         .bc_rep_max_size        = OST_MAXREPSIZE,
255                         .bc_req_portal          = SEQ_DATA_PORTAL,
256                         .bc_rep_portal          = OSC_REPLY_PORTAL,
257                 },
258                 .psc_thr                = {
259                         .tc_thr_name            = "ll_ost_seq",
260                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
261                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
262                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
263                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
264                         .tc_nthrs_user          = oss_num_create_threads,
265                         .tc_cpu_bind            = oss_create_cpu_bind,
266                         .tc_ctx_tags            = LCT_DT_THREAD,
267                 },
268
269                 .psc_cpt                = {
270                         .cc_pattern             = oss_cpts,
271                         .cc_affinity            = true,
272                 },
273                 .psc_ops                = {
274                         .so_req_handler         = tgt_request_handle,
275                         .so_req_printer         = target_print_req,
276                         .so_hpreq_handler       = NULL,
277                 },
278         };
279         ost->ost_seq_service = ptlrpc_register_service(&svc_conf,
280                                                        &obd->obd_kset,
281                                                        obd->obd_debugfs_entry);
282         if (IS_ERR(ost->ost_seq_service)) {
283                 rc = PTR_ERR(ost->ost_seq_service);
284                 CERROR("failed to start OST seq service: %d\n", rc);
285                 ost->ost_seq_service = NULL;
286                 GOTO(out_io, rc);
287         }
288
289         /* Object update service */
290         memset(&svc_conf, 0, sizeof(svc_conf));
291         svc_conf = (typeof(svc_conf)) {
292                 .psc_name               = "ost_out",
293                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
294                 .psc_buf                = {
295                         .bc_nbufs               = OST_NBUFS,
296                         .bc_buf_size            = OUT_BUFSIZE,
297                         .bc_req_max_size        = OUT_MAXREQSIZE,
298                         .bc_rep_max_size        = OUT_MAXREPSIZE,
299                         .bc_req_portal          = OUT_PORTAL,
300                         .bc_rep_portal          = OSC_REPLY_PORTAL,
301                 },
302                 /*
303                  * We'd like to have a mechanism to set this on a per-device
304                  * basis, but alas...
305                  */
306                 .psc_thr                = {
307                         .tc_thr_name            = "ll_ost_out",
308                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
309                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
310                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
311                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
312                         .tc_nthrs_user          = oss_num_create_threads,
313                         .tc_cpu_bind            = oss_create_cpu_bind,
314                         .tc_ctx_tags            = LCT_MD_THREAD |
315                                                   LCT_DT_THREAD,
316                 },
317                 .psc_cpt                = {
318                         .cc_pattern             = oss_cpts,
319                         .cc_affinity            = true,
320                 },
321                 .psc_ops                = {
322                         .so_req_handler         = tgt_request_handle,
323                         .so_req_printer         = target_print_req,
324                         .so_hpreq_handler       = NULL,
325                 },
326         };
327         ost->ost_out_service = ptlrpc_register_service(&svc_conf,
328                                                        &obd->obd_kset,
329                                                        obd->obd_debugfs_entry);
330         if (IS_ERR(ost->ost_out_service)) {
331                 rc = PTR_ERR(ost->ost_out_service);
332                 CERROR("failed to start out service: %d\n", rc);
333                 ost->ost_out_service = NULL;
334                 GOTO(out_seq, rc);
335         }
336
337         ping_evictor_start();
338
339         RETURN(0);
340
341 out_seq:
342         ptlrpc_unregister_service(ost->ost_seq_service);
343         ost->ost_seq_service = NULL;
344 out_io:
345         ptlrpc_unregister_service(ost->ost_io_service);
346         ost->ost_io_service = NULL;
347 out_create:
348         ptlrpc_unregister_service(ost->ost_create_service);
349         ost->ost_create_service = NULL;
350 out_service:
351         ptlrpc_unregister_service(ost->ost_service);
352         ost->ost_service = NULL;
353 out_lprocfs:
354         lprocfs_obd_cleanup(obd);
355         RETURN(rc);
356 }
357
358 static int ost_cleanup(struct obd_device *obd)
359 {
360         struct ost_obd *ost = &obd->u.ost;
361         int err = 0;
362         ENTRY;
363
364         ping_evictor_stop();
365
366         /* there is no recovery for OST OBD, all recovery is controlled by
367          * obdfilter OBD */
368         LASSERT(obd->obd_recovering == 0);
369         mutex_lock(&ost->ost_health_mutex);
370         ptlrpc_unregister_service(ost->ost_service);
371         ptlrpc_unregister_service(ost->ost_create_service);
372         ptlrpc_unregister_service(ost->ost_io_service);
373         ptlrpc_unregister_service(ost->ost_seq_service);
374         ptlrpc_unregister_service(ost->ost_out_service);
375
376         ost->ost_service = NULL;
377         ost->ost_create_service = NULL;
378         ost->ost_io_service = NULL;
379         ost->ost_seq_service = NULL;
380         ost->ost_out_service = NULL;
381
382         mutex_unlock(&ost->ost_health_mutex);
383
384         lprocfs_obd_cleanup(obd);
385
386         if (ost_io_cptable != NULL) {
387                 cfs_cpt_table_free(ost_io_cptable);
388                 ost_io_cptable = NULL;
389         }
390
391         RETURN(err);
392 }
393
394 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
395 {
396         struct ost_obd *ost = &obd->u.ost;
397         int rc = 0;
398
399         mutex_lock(&ost->ost_health_mutex);
400         rc |= ptlrpc_service_health_check(ost->ost_service);
401         rc |= ptlrpc_service_health_check(ost->ost_create_service);
402         rc |= ptlrpc_service_health_check(ost->ost_io_service);
403         rc |= ptlrpc_service_health_check(ost->ost_seq_service);
404         mutex_unlock(&ost->ost_health_mutex);
405
406         return rc != 0 ? 1 : 0;
407 }
408
409 /* use obd ops to offer management infrastructure */
410 static const struct obd_ops ost_obd_ops = {
411         .o_owner        = THIS_MODULE,
412         .o_setup        = ost_setup,
413         .o_cleanup      = ost_cleanup,
414         .o_health_check = ost_health_check,
415 };
416
417
418 static int __init ost_init(void)
419 {
420         int rc;
421
422         ENTRY;
423
424         rc = class_register_type(&ost_obd_ops, NULL, false, NULL,
425                                  LUSTRE_OSS_NAME, NULL);
426
427         RETURN(rc);
428 }
429
430 static void __exit ost_exit(void)
431 {
432         class_unregister_type(LUSTRE_OSS_NAME);
433 }
434
435 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
436 MODULE_DESCRIPTION("Lustre Object Storage Target (OST)");
437 MODULE_VERSION(LUSTRE_VERSION_STRING);
438 MODULE_LICENSE("GPL");
439
440 module_init(ost_init);
441 module_exit(ost_exit);