Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ost/ost_handler.c
32  *
33  * Author: Peter J. Braam <braam@clusterfs.com>
34  * Author: Phil Schwan <phil@clusterfs.com>
35  */
36
37 #define DEBUG_SUBSYSTEM S_OST
38
39 #include <linux/module.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include <lustre_nodemap.h>
43 #include <obd_class.h>
44 #include "ost_internal.h"
45
46 int oss_max_threads = 512;
47 module_param(oss_max_threads, int, 0444);
48 MODULE_PARM_DESC(oss_max_threads, "maximum number of OSS service threads");
49
50 static int oss_num_threads;
51 module_param(oss_num_threads, int, 0444);
52 MODULE_PARM_DESC(oss_num_threads, "number of OSS service threads to start");
53
54 static unsigned int oss_cpu_bind = 1;
55 module_param(oss_cpu_bind, uint, 0444);
56 MODULE_PARM_DESC(oss_cpu_bind,
57                  "bind OSS service threads to particular CPU partitions");
58
59 static int oss_num_create_threads;
60 module_param(oss_num_create_threads, int, 0444);
61 MODULE_PARM_DESC(oss_num_create_threads,
62                  "number of OSS create threads to start");
63
64 static unsigned int oss_create_cpu_bind = 1;
65 module_param(oss_create_cpu_bind, uint, 0444);
66 MODULE_PARM_DESC(oss_create_cpu_bind,
67                  "bind OSS create threads to particular CPU partitions");
68
69 static char *oss_cpts;
70 module_param(oss_cpts, charp, 0444);
71 MODULE_PARM_DESC(oss_cpts, "CPU partitions OSS threads should run on");
72
73 static char *oss_io_cpts;
74 module_param(oss_io_cpts, charp, 0444);
75 MODULE_PARM_DESC(oss_io_cpts, "CPU partitions OSS IO threads should run on");
76
77 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
78
79 static struct cfs_cpt_table *ost_io_cptable;
80
81 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
82 static int ost_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
83 {
84         static struct ptlrpc_service_conf svc_conf;
85         struct ost_obd *ost = obd2ost(obd);
86         nodemask_t *mask;
87         int rc;
88
89         ENTRY;
90
91         rc = lprocfs_obd_setup(obd, true);
92         if (rc)
93                 return rc;
94
95         mutex_init(&ost->ost_health_mutex);
96
97         svc_conf = (typeof(svc_conf)) {
98                 .psc_name               = LUSTRE_OSS_NAME,
99                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
100                 .psc_buf                = {
101                         .bc_nbufs               = OST_NBUFS,
102                         .bc_buf_size            = OST_BUFSIZE,
103                         .bc_req_max_size        = OST_MAXREQSIZE,
104                         .bc_rep_max_size        = OST_MAXREPSIZE,
105                         .bc_req_portal          = OST_REQUEST_PORTAL,
106                         .bc_rep_portal          = OSC_REPLY_PORTAL,
107                 },
108                 .psc_thr                = {
109                         .tc_thr_name            = "ll_ost",
110                         .tc_thr_factor          = OSS_THR_FACTOR,
111                         .tc_nthrs_init          = OSS_NTHRS_INIT,
112                         .tc_nthrs_base          = OSS_NTHRS_BASE,
113                         .tc_nthrs_max           = oss_max_threads,
114                         .tc_nthrs_user          = oss_num_threads,
115                         .tc_cpu_bind            = oss_cpu_bind,
116                         .tc_ctx_tags            = LCT_DT_THREAD,
117                 },
118                 .psc_cpt                = {
119                         .cc_pattern             = oss_cpts,
120                         .cc_affinity            = true,
121                 },
122                 .psc_ops                = {
123                         .so_req_handler         = tgt_request_handle,
124                         .so_req_printer         = target_print_req,
125                         .so_hpreq_handler       = ptlrpc_hpreq_handler,
126                 },
127         };
128         ost->ost_service = ptlrpc_register_service(&svc_conf,
129                                                    &obd->obd_kset,
130                                                    obd->obd_debugfs_entry);
131         if (IS_ERR(ost->ost_service)) {
132                 rc = PTR_ERR(ost->ost_service);
133                 CERROR("failed to start service: %d\n", rc);
134                 GOTO(out_lprocfs, rc);
135         }
136
137         memset(&svc_conf, 0, sizeof(svc_conf));
138         svc_conf = (typeof(svc_conf)) {
139                 .psc_name               = "ost_create",
140                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
141                 .psc_buf                = {
142                         .bc_nbufs               = OST_NBUFS,
143                         .bc_buf_size            = OST_BUFSIZE,
144                         .bc_req_max_size        = OST_MAXREQSIZE,
145                         .bc_rep_max_size        = OST_MAXREPSIZE,
146                         .bc_req_portal          = OST_CREATE_PORTAL,
147                         .bc_rep_portal          = OSC_REPLY_PORTAL,
148                 },
149                 .psc_thr                = {
150                         .tc_thr_name            = "ll_ost_create",
151                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
152                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
153                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
154                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
155                         .tc_nthrs_user          = oss_num_create_threads,
156                         .tc_cpu_bind            = oss_create_cpu_bind,
157                         .tc_ctx_tags            = LCT_DT_THREAD,
158                 },
159                 .psc_cpt                = {
160                         .cc_pattern             = oss_cpts,
161                         .cc_affinity            = true,
162                 },
163                 .psc_ops                = {
164                         .so_req_handler         = tgt_request_handle,
165                         .so_req_printer         = target_print_req,
166                 },
167         };
168         ost->ost_create_service = ptlrpc_register_service(&svc_conf,
169                                                           &obd->obd_kset,
170                                                           obd->obd_debugfs_entry
171                                                           );
172         if (IS_ERR(ost->ost_create_service)) {
173                 rc = PTR_ERR(ost->ost_create_service);
174                 CERROR("failed to start OST create service: %d\n", rc);
175                 GOTO(out_service, rc);
176         }
177
178         mask = cfs_cpt_nodemask(cfs_cpt_tab, CFS_CPT_ANY);
179         /* event CPT feature is disabled in libcfs level by set partition
180          * number to 1, we still want to set node affinity for io service
181          */
182         if (cfs_cpt_number(cfs_cpt_tab) == 1 && nodes_weight(*mask) > 1) {
183                 int     cpt = 0;
184                 int     i;
185
186                 ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
187                 for_each_node_mask(i, *mask) {
188                         if (!ost_io_cptable) {
189                                 CWARN("OSS failed to create CPT table\n");
190                                 break;
191                         }
192
193                         rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
194                         if (!rc) {
195                                 CWARN("OSS Failed to set node %d for IO CPT table\n",
196                                       i);
197                                 cfs_cpt_table_free(ost_io_cptable);
198                                 ost_io_cptable = NULL;
199                                 break;
200                         }
201                 }
202         }
203
204         memset(&svc_conf, 0, sizeof(svc_conf));
205         svc_conf = (typeof(svc_conf)) {
206                 .psc_name               = "ost_io",
207                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
208                 .psc_buf                = {
209                         .bc_nbufs               = OST_NBUFS,
210                         .bc_buf_size            = OST_IO_BUFSIZE,
211                         .bc_req_max_size        = OST_IO_MAXREQSIZE,
212                         .bc_rep_max_size        = OST_IO_MAXREPSIZE,
213                         .bc_req_portal          = OST_IO_PORTAL,
214                         .bc_rep_portal          = OSC_REPLY_PORTAL,
215                 },
216                 .psc_thr                = {
217                         .tc_thr_name            = "ll_ost_io",
218                         .tc_thr_factor          = OSS_THR_FACTOR,
219                         .tc_nthrs_init          = OSS_NTHRS_INIT,
220                         .tc_nthrs_base          = OSS_NTHRS_BASE,
221                         .tc_nthrs_max           = oss_max_threads,
222                         .tc_nthrs_user          = oss_num_threads,
223                         .tc_cpu_bind            = oss_cpu_bind,
224                         .tc_ctx_tags            = LCT_DT_THREAD,
225                 },
226                 .psc_cpt                = {
227                         .cc_cptable             = ost_io_cptable,
228                         .cc_pattern             = ost_io_cptable == NULL ?
229                                                   oss_io_cpts : NULL,
230                         .cc_affinity            = true,
231                 },
232                 .psc_ops                = {
233                         .so_thr_init            = tgt_io_thread_init,
234                         .so_thr_done            = tgt_io_thread_done,
235                         .so_req_handler         = tgt_request_handle,
236                         .so_hpreq_handler       = tgt_hpreq_handler,
237                         .so_req_printer         = target_print_req,
238                 },
239         };
240         ost->ost_io_service = ptlrpc_register_service(&svc_conf,
241                                                       &obd->obd_kset,
242                                                       obd->obd_debugfs_entry);
243         if (IS_ERR(ost->ost_io_service)) {
244                 rc = PTR_ERR(ost->ost_io_service);
245                 CERROR("failed to start OST I/O service: %d\n", rc);
246                 ost->ost_io_service = NULL;
247                 GOTO(out_create, rc);
248         }
249
250         memset(&svc_conf, 0, sizeof(svc_conf));
251         svc_conf = (typeof(svc_conf)) {
252                 .psc_name               = "ost_seq",
253                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
254                 .psc_buf                = {
255                         .bc_nbufs               = OST_NBUFS,
256                         .bc_buf_size            = OST_BUFSIZE,
257                         .bc_req_max_size        = OST_MAXREQSIZE,
258                         .bc_rep_max_size        = OST_MAXREPSIZE,
259                         .bc_req_portal          = SEQ_DATA_PORTAL,
260                         .bc_rep_portal          = OSC_REPLY_PORTAL,
261                 },
262                 .psc_thr                = {
263                         .tc_thr_name            = "ll_ost_seq",
264                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
265                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
266                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
267                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
268                         .tc_nthrs_user          = oss_num_create_threads,
269                         .tc_cpu_bind            = oss_create_cpu_bind,
270                         .tc_ctx_tags            = LCT_DT_THREAD,
271                 },
272
273                 .psc_cpt                = {
274                         .cc_pattern             = oss_cpts,
275                         .cc_affinity            = true,
276                 },
277                 .psc_ops                = {
278                         .so_req_handler         = tgt_request_handle,
279                         .so_req_printer         = target_print_req,
280                         .so_hpreq_handler       = NULL,
281                 },
282         };
283         ost->ost_seq_service = ptlrpc_register_service(&svc_conf,
284                                                        &obd->obd_kset,
285                                                        obd->obd_debugfs_entry);
286         if (IS_ERR(ost->ost_seq_service)) {
287                 rc = PTR_ERR(ost->ost_seq_service);
288                 CERROR("failed to start OST seq service: %d\n", rc);
289                 ost->ost_seq_service = NULL;
290                 GOTO(out_io, rc);
291         }
292
293         /* Object update service */
294         memset(&svc_conf, 0, sizeof(svc_conf));
295         svc_conf = (typeof(svc_conf)) {
296                 .psc_name               = "ost_out",
297                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
298                 .psc_buf                = {
299                         .bc_nbufs               = OST_NBUFS,
300                         .bc_buf_size            = OUT_BUFSIZE,
301                         .bc_req_max_size        = OUT_MAXREQSIZE,
302                         .bc_rep_max_size        = OUT_MAXREPSIZE,
303                         .bc_req_portal          = OUT_PORTAL,
304                         .bc_rep_portal          = OSC_REPLY_PORTAL,
305                 },
306                 /*
307                  * We'd like to have a mechanism to set this on a per-device
308                  * basis, but alas...
309                  */
310                 .psc_thr                = {
311                         .tc_thr_name            = "ll_ost_out",
312                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
313                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
314                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
315                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
316                         .tc_nthrs_user          = oss_num_create_threads,
317                         .tc_cpu_bind            = oss_create_cpu_bind,
318                         .tc_ctx_tags            = LCT_MD_THREAD |
319                                                   LCT_DT_THREAD,
320                 },
321                 .psc_cpt                = {
322                         .cc_pattern             = oss_cpts,
323                         .cc_affinity            = true,
324                 },
325                 .psc_ops                = {
326                         .so_req_handler         = tgt_request_handle,
327                         .so_req_printer         = target_print_req,
328                         .so_hpreq_handler       = NULL,
329                 },
330         };
331         ost->ost_out_service = ptlrpc_register_service(&svc_conf,
332                                                        &obd->obd_kset,
333                                                        obd->obd_debugfs_entry);
334         if (IS_ERR(ost->ost_out_service)) {
335                 rc = PTR_ERR(ost->ost_out_service);
336                 CERROR("failed to start out service: %d\n", rc);
337                 ost->ost_out_service = NULL;
338                 GOTO(out_seq, rc);
339         }
340
341         ping_evictor_start();
342
343         RETURN(0);
344
345 out_seq:
346         ptlrpc_unregister_service(ost->ost_seq_service);
347         ost->ost_seq_service = NULL;
348 out_io:
349         ptlrpc_unregister_service(ost->ost_io_service);
350         ost->ost_io_service = NULL;
351 out_create:
352         ptlrpc_unregister_service(ost->ost_create_service);
353         ost->ost_create_service = NULL;
354 out_service:
355         ptlrpc_unregister_service(ost->ost_service);
356         ost->ost_service = NULL;
357 out_lprocfs:
358         lprocfs_obd_cleanup(obd);
359         RETURN(rc);
360 }
361
362 static int ost_cleanup(struct obd_device *obd)
363 {
364         struct ost_obd *ost = obd2ost(obd);
365         int err = 0;
366
367         ENTRY;
368
369         ping_evictor_stop();
370
371         /* there is no recovery for OST OBD, all recovery is controlled by
372          * obdfilter OBD
373          */
374         LASSERT(obd->obd_recovering == 0);
375         mutex_lock(&ost->ost_health_mutex);
376         ptlrpc_unregister_service(ost->ost_service);
377         ptlrpc_unregister_service(ost->ost_create_service);
378         ptlrpc_unregister_service(ost->ost_io_service);
379         ptlrpc_unregister_service(ost->ost_seq_service);
380         ptlrpc_unregister_service(ost->ost_out_service);
381
382         ost->ost_service = NULL;
383         ost->ost_create_service = NULL;
384         ost->ost_io_service = NULL;
385         ost->ost_seq_service = NULL;
386         ost->ost_out_service = NULL;
387
388         mutex_unlock(&ost->ost_health_mutex);
389
390         lprocfs_obd_cleanup(obd);
391
392         if (ost_io_cptable) {
393                 cfs_cpt_table_free(ost_io_cptable);
394                 ost_io_cptable = NULL;
395         }
396
397         RETURN(err);
398 }
399
400 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
401 {
402         struct ost_obd *ost = obd2ost(obd);
403         int rc = 0;
404
405         mutex_lock(&ost->ost_health_mutex);
406         rc |= ptlrpc_service_health_check(ost->ost_service);
407         rc |= ptlrpc_service_health_check(ost->ost_create_service);
408         rc |= ptlrpc_service_health_check(ost->ost_io_service);
409         rc |= ptlrpc_service_health_check(ost->ost_seq_service);
410         mutex_unlock(&ost->ost_health_mutex);
411
412         return rc != 0 ? 1 : 0;
413 }
414
415 /* ioctls on obd dev */
416 static int oss_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
417                          void *karg, void __user *uarg)
418 {
419         struct obd_device *obd = exp->exp_obd;
420         struct obd_ioctl_data *data;
421         int rc = 0;
422
423         ENTRY;
424         CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
425                obd->obd_name, cmd, len, karg, uarg);
426
427         data = karg;
428         /* we only support nodemap ioctls, for now */
429         if (cmd != OBD_IOC_NODEMAP)
430                 GOTO(out, rc = -EINVAL);
431
432         rc = server_iocontrol_nodemap(obd, data, true);
433         if (rc)
434                 GOTO(out, rc);
435
436 out:
437         RETURN(rc);
438 }
439
440 /* use obd ops to offer management infrastructure */
441 static const struct obd_ops ost_obd_ops = {
442         .o_owner        = THIS_MODULE,
443         .o_setup        = ost_setup,
444         .o_cleanup      = ost_cleanup,
445         .o_health_check = ost_health_check,
446         .o_iocontrol    = oss_iocontrol,
447 };
448
449 static int __init ost_init(void)
450 {
451         int rc;
452
453         ENTRY;
454         rc = libcfs_setup();
455         if (rc)
456                 RETURN(rc);
457
458         rc = class_register_type(&ost_obd_ops, NULL, false,
459                                  LUSTRE_OSS_NAME, NULL);
460
461         RETURN(rc);
462 }
463
464 static void __exit ost_exit(void)
465 {
466         class_unregister_type(LUSTRE_OSS_NAME);
467 }
468
469 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
470 MODULE_DESCRIPTION("Lustre Object Storage Target (OST)");
471 MODULE_VERSION(LUSTRE_VERSION_STRING);
472 MODULE_LICENSE("GPL");
473
474 module_init(ost_init);
475 module_exit(ost_exit);