4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ost/ost_handler.c
38 * Author: Peter J. Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_OST
44 #include <linux/module.h>
46 #include <lustre_dlm.h>
47 #include <lprocfs_status.h>
48 #include "ost_internal.h"
/* Module parameters tuning OSS/OST service thread counts and the CPU
 * partitions (CPTs) the threads are bound to.  All are read-only after
 * module load (mode 0444).  A value of 0 / NULL means "use the dynamic
 * defaults computed at service registration time". */
50 static int oss_num_threads;
51 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
52 "number of OSS service threads to start");
/* Deprecated alias for oss_num_threads; see ost_init() which copies it
 * over (with a console warning) when only the old name is set. */
54 static int ost_num_threads;
55 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
56 "number of OST service threads to start (deprecated)");
58 static int oss_num_create_threads;
59 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
60 "number of OSS create threads to start");
/* CPT pattern strings, e.g. which CPU partitions the service threads
 * may run on; parsed by the ptlrpc service registration code. */
62 static char *oss_cpts;
63 CFS_MODULE_PARM(oss_cpts, "s", charp, 0444,
64 "CPU partitions OSS threads should run on");
66 static char *oss_io_cpts;
67 CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
68 "CPU partitions OSS IO threads should run on");
71 * Validate oa from client.
72 * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
74 * a. objects in Single MDT FS seq = FID_SEQ_OST_MDT0, oi_id != 0
75 * b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
76 * pack ost_id. Because non-zero oi_seq will make it difficult to tell
77 * whether this is oi_fid or real ostid. So it will check
78 * OBD_CONNECT_FID, then convert the ostid to FID for old client.
79 * c. Old FID-disable osc will send IDIF.
80 * d. new FID-enable osc/osp will send normal FID.
82 * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
83 * be used for LAST_ID file, and only being accessed inside OST now.
85 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
86 struct obd_ioobj *ioobj)
/* NOTE(review): interior lines are missing from this view (declarations,
 * braces, the "out:" label); comments below annotate only visible code. */
/* BUG(review): oa->o_oi.oi.oi_seq is dereferenced in the first && term
 * below, BEFORE the "oa != NULL" term of the same condition.  If oa can
 * be NULL (the check two statements down suggests it can), this is a
 * NULL dereference; the NULL test must come first in the chain. */
90 if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
91 fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
92 /* Sigh 2.[123] client still sends echo req with oi_id = 0
93 * during create, and we will reset this to 1, since this
94 * oi_id is basically useless in the following create process,
95 * but oi_id == 0 will make it difficult to tell whether it is
96 * real FID or ost_id. */
97 oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
98 oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
99 oa->o_oi.oi_fid.f_ver = 0;
/* Reject a missing obdo or the reserved id 0 (LAST_ID is OST-internal). */
101 if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
102 GOTO(out, rc = -EPROTO);
104 /* Note: this check might be forced in 2.5 or 2.6, i.e.
105 * all of the requests are required to setup FLGROUP */
/* Old clients that never set FLGROUP are defaulted to the MDT0 sequence. */
106 if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
107 ostid_set_seq_mdt0(&oa->o_oi);
109 ostid_set_seq_mdt0(&ioobj->ioo_oid);
110 oa->o_valid |= OBD_MD_FLGROUP;
/* Only IDIF, MDT0, normal-FID and echo sequences are legal on the wire. */
113 if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
114 fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
115 fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
116 fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
117 GOTO(out, rc = -EPROTO);
121 unsigned max_brw = ioobj_max_brw_get(ioobj);
/* max_brw must be a power of two; (x & (x-1)) != 0 detects non-powers
 * (note this also accepts 0 — presumably excluded elsewhere; TODO confirm). */
123 if (unlikely((max_brw & (max_brw - 1)) != 0)) {
124 CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
125 ": rc = -EPROTO\n", exp->exp_obd->obd_name,
126 obd_export_nid2str(exp), max_brw,
128 GOTO(out, rc = -EPROTO);
/* Keep the ioobj's object id in sync with the (possibly fixed-up) obdo. */
130 ioobj->ioo_oid = oa->o_oi;
/* Error path: log the offending object; oa may be NULL here, hence the
 * defensive ternaries. */
135 CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
136 exp->exp_obd->obd_name, obd_export_nid2str(exp),
137 oa ? ostid_seq(&oa->o_oi) : -1,
138 oa ? ostid_id(&oa->o_oi) : -1, rc);
/* Context passed through the lock-prolongation walk: identifies the RPC
 * and export whose extent locks should have their cancel timeout
 * refreshed, and counts how many locks were prolonged.
 * NOTE(review): some fields used by callers (opd_oa, opd_timeout) are on
 * lines missing from this view. */
142 struct ost_prolong_data {
143 struct ptlrpc_request *opd_req;
144 struct obd_export *opd_exp;
/* Resource and extent of the I/O being serviced. */
146 struct ldlm_res_id opd_resid;
147 struct ldlm_extent opd_extent;
148 ldlm_mode_t opd_mode;
/* Number of locks actually refreshed by ost_prolong_locks(). */
149 unsigned int opd_locks;
153 /* prolong locks for the current service time of the corresponding
154 * portal (= OST_IO_PORTAL)
/* Compute how long (seconds) to extend a lock's cancel timer while the
 * request is being serviced.  NOTE(review): the branch condition between
 * the two returns is on a line missing from this view — presumably it
 * selects obd_timeout/2 when adaptive timeouts are disabled. */
156 static inline int prolong_timeout(struct ptlrpc_request *req)
158 struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
161 return obd_timeout / 2;
/* Otherwise base the timeout on the service partition's AT estimate. */
163 return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
/* Refresh the eviction (waiting-list) timer of a single lock held by the
 * export doing this I/O, so the client is not evicted while the server is
 * still servicing its request. */
167 static void ost_prolong_lock_one(struct ost_prolong_data *opd,
168 struct ldlm_lock *lock)
170 LASSERT(lock->l_export == opd->opd_exp);
172 if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
175 /* XXX: never try to grab resource lock here because we're inside
176 * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
177 * res lock and then exp_bl_list_lock. */
/* Only locks with a blocking AST already sent (i.e. being cancelled)
 * need their timeout refreshed. */
179 if (!(lock->l_flags & LDLM_FL_AST_SENT))
180 /* ignore locks not being cancelled */
184 "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
185 opd->opd_req->rq_xid, opd->opd_extent.start,
186 opd->opd_extent.end, opd->opd_timeout);
188 /* OK. this is a possible lock the user holds doing I/O
189 * let's refresh eviction timer for it */
190 ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
/* Prolong all of the export's blocked extent locks that cover the I/O
 * region in opd.  First tries the fast path via the lock handle the
 * client sent; otherwise walks the export's blocking-AST list. */
194 static void ost_prolong_locks(struct ost_prolong_data *data)
196 struct obd_export *exp = data->opd_exp;
197 struct obdo *oa = data->opd_oa;
198 struct ldlm_lock *lock;
201 if (oa->o_valid & OBD_MD_FLHANDLE) {
202 /* mostly a request should be covered by only one lock, try
204 lock = ldlm_handle2lock(&oa->o_handle);
206 /* Fast path to check if the lock covers the whole IO
207 * region exclusively. */
208 if (lock->l_granted_mode == LCK_PW &&
209 ldlm_extent_contain(&lock->l_policy_data.l_extent,
210 &data->opd_extent)) {
212 ost_prolong_lock_one(data, lock);
/* Slow path: scan every lock on the export with a blocking AST sent;
 * _bh spinlock because the list is also touched from softirq context
 * (presumably — TODO confirm against ldlm_lockd.c). */
221 spin_lock_bh(&exp->exp_bl_list_lock);
222 cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
223 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
224 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
/* Skip locks on other objects or outside the I/O extent. */
226 if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
229 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
233 ost_prolong_lock_one(data, lock);
235 spin_unlock_bh(&exp->exp_bl_list_lock);
241 * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
/* hpreq_lock_match callback for OST_READ/OST_WRITE: return nonzero iff
 * the given lock's resource and extent overlap the request's bulk I/O
 * region (i.e. the request is using the lock being cancelled). */
244 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
245 struct ldlm_lock *lock)
247 struct niobuf_remote *nb;
248 struct obd_ioobj *ioo;
250 struct ldlm_extent ext;
253 opc = lustre_msg_get_opc(req->rq_reqmsg);
254 LASSERT(opc == OST_READ || opc == OST_WRITE);
256 ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
257 LASSERT(ioo != NULL);
259 nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
/* I/O extent spans from the first niobuf's offset to the last byte of
 * the last niobuf. */
262 ext.start = nb->offset;
263 nb += ioo->ioo_bufcnt - 1;
264 ext.end = nb->offset + nb->len - 1;
266 LASSERT(lock->l_resource != NULL);
267 if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
/* Mode check: reads need PR-compatible, writes PW (mode is computed on
 * lines missing from this view — TODO confirm). */
273 if (!(lock->l_granted_mode & mode))
276 RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
280 * High-priority queue request check for whether the given PTLRPC request (\a
281 * req) is blocking an LDLM lock cancel.
283 * Returns 1 if the given PTLRPC request (\a req) is blocking an LDLM lock
284 * cancel, 0 if it is not, and -EFAULT if the request is malformed.
286 * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This
287 * function looks only at OST_READs and OST_WRITEs.
/* hpreq_check callback for OST_READ/OST_WRITE: prolong all locks that
 * cover this request's I/O extent and return 1 if any were refreshed
 * (request is blocking a cancel), 0 otherwise. */
289 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
291 struct obd_device *obd = req->rq_export->exp_obd;
292 struct ost_body *body;
293 struct obd_ioobj *ioo;
294 struct niobuf_remote *nb;
295 struct ost_prolong_data opd = { 0 };
300 * Use LASSERT to do sanity check because malformed RPCs should have
301 * been filtered out in ost_hpreq_handler().
303 opc = lustre_msg_get_opc(req->rq_reqmsg);
304 LASSERT(opc == OST_READ || opc == OST_WRITE);
306 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
307 LASSERT(body != NULL);
309 ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
310 LASSERT(ioo != NULL);
312 nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
/* Server-locked RPCs never reach the h-p queue (filtered earlier). */
314 LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
316 ostid_build_res_name(&ioo->ioo_oid, &opd.opd_resid);
/* Fill the prolong context: export, obdo (for the lock handle fast
 * path), and the [start, end] byte extent of the whole bulk. */
323 opd.opd_exp = req->rq_export;
324 opd.opd_oa = &body->oa;
325 opd.opd_extent.start = nb->offset;
326 nb += ioo->ioo_bufcnt - 1;
327 opd.opd_extent.end = nb->offset + nb->len - 1;
328 opd.opd_timeout = prolong_timeout(req);
330 DEBUG_REQ(D_RPCTRACE, req,
331 "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
332 obd->obd_name, current->comm,
333 opd.opd_resid.name[0], opd.opd_resid.name[1],
334 opd.opd_extent.start, opd.opd_extent.end);
336 ost_prolong_locks(&opd);
338 CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
339 obd->obd_name, opd.opd_locks, req);
341 RETURN(opd.opd_locks > 0);
/* hpreq_fini callback: do one final prolong pass when the request is
 * done being handled; the return value is intentionally ignored. */
344 static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
346 (void)ost_rw_hpreq_check(req);
350 * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
/* hpreq_lock_match for OST_PUNCH: match purely by the lock handle cookie
 * the client sent in the obdo, rather than by extent overlap. */
352 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
353 struct ldlm_lock *lock)
355 struct ost_body *body;
358 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
359 LASSERT(body != NULL);
361 if (body->oa.o_valid & OBD_MD_FLHANDLE &&
362 body->oa.o_handle.cookie == lock->l_handle.h_cookie)
369 * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
/* hpreq_check for OST_PUNCH: prolong PW locks covering the truncate
 * range; returns 1 if any lock was refreshed, 0 otherwise. */
371 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
373 struct obd_device *obd = req->rq_export->exp_obd;
374 struct ost_body *body;
376 struct ost_prolong_data opd = { 0 };
380 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
381 LASSERT(body != NULL);
/* Server-locked punches are filtered out before reaching the h-p queue. */
384 LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
385 !(oa->o_flags & OBD_FL_SRVLOCK));
/* For punch, o_size/o_blocks carry the range (start computed on a line
 * missing from this view — presumably start = oa->o_size). */
388 end = start + oa->o_blocks;
/* Truncate conflicts with writers, so only PW locks matter. */
391 opd.opd_mode = LCK_PW;
392 opd.opd_exp = req->rq_export;
394 opd.opd_extent.start = start;
395 opd.opd_extent.end = end;
/* OBD_OBJECT_EOF means "to end of object"; don't let the addition above
 * wrap — pin the extent end to EOF. */
396 if (oa->o_blocks == OBD_OBJECT_EOF)
397 opd.opd_extent.end = OBD_OBJECT_EOF;
398 opd.opd_timeout = prolong_timeout(req);
400 ostid_build_res_name(&oa->o_oi, &opd.opd_resid);
403 "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
405 opd.opd_resid.name[0], opd.opd_resid.name[1],
406 opd.opd_extent.start, opd.opd_extent.end);
408 ost_prolong_locks(&opd);
410 CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
411 obd->obd_name, opd.opd_locks, req);
413 RETURN(opd.opd_locks > 0);
/* hpreq_fini for OST_PUNCH: final prolong pass; result ignored. */
416 static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
418 (void)ost_punch_hpreq_check(req);
/* High-priority request ops installed on OST_READ/OST_WRITE RPCs. */
421 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
422 .hpreq_lock_match = ost_rw_hpreq_lock_match,
423 .hpreq_check = ost_rw_hpreq_check,
424 .hpreq_fini = ost_rw_hpreq_fini
/* High-priority request ops installed on OST_PUNCH RPCs. */
427 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
428 .hpreq_lock_match = ost_punch_hpreq_lock_match,
429 .hpreq_check = ost_punch_hpreq_check,
430 .hpreq_fini = ost_punch_hpreq_fini
433 /** Assign high priority operations to the request if needed. */
434 static int ost_io_hpreq_handler(struct ptlrpc_request *req)
/* Validate the incoming READ/WRITE/PUNCH RPC early and, when it is not
 * server-locked, attach the matching hpreq ops so the request can be
 * promoted to the high-priority queue.  NOTE(review): many error-return
 * lines are missing from this view; each CERROR below is presumably
 * followed by a RETURN(-EFAULT)-style bailout. */
437 if (req->rq_export) {
438 int opc = lustre_msg_get_opc(req->rq_reqmsg);
439 struct ost_body *body;
441 if (opc == OST_READ || opc == OST_WRITE) {
442 struct niobuf_remote *nb;
443 struct obd_ioobj *ioo;
444 int objcount, niocount;
448 /* RPCs on the H-P queue can be inspected before
449 * ost_handler() initializes their pills, so we
450 * initialize that here. Capsule initialization is
451 * idempotent, as is setting the pill's format (provided
452 * it doesn't change).
454 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
456 req_capsule_set(&req->rq_pill,
459 req_capsule_set(&req->rq_pill,
462 body = req_capsule_client_get(&req->rq_pill,
465 CERROR("Missing/short ost_body\n");
/* objcount = number of obd_ioobj entries in the request buffer. */
469 objcount = req_capsule_get_size(&req->rq_pill,
474 CERROR("Missing/short ioobj\n");
478 CERROR("too many ioobjs (%d)\n", objcount);
482 ioo = req_capsule_client_get(&req->rq_pill,
485 CERROR("Missing/short ioobj\n");
489 rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
491 CERROR("invalid object ids\n");
/* Sum the per-object buffer counts; each object must carry
 * at least one niobuf. */
495 for (niocount = i = 0; i < objcount; i++) {
496 if (ioo[i].ioo_bufcnt == 0) {
497 CERROR("ioo[%d] has zero bufcnt\n", i);
500 niocount += ioo[i].ioo_bufcnt;
502 if (niocount > PTLRPC_MAX_BRW_PAGES) {
503 DEBUG_REQ(D_RPCTRACE, req,
504 "bulk has too many pages (%d)",
509 nb = req_capsule_client_get(&req->rq_pill,
512 CERROR("Missing/short niobuf\n");
/* Only client-locked bulk I/O is h-p eligible. */
516 if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
517 req->rq_ops = &ost_hpreq_rw;
518 } else if (opc == OST_PUNCH) {
519 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
520 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
522 body = req_capsule_client_get(&req->rq_pill,
525 CERROR("Missing/short ost_body\n");
/* Likewise, only client-locked punches go high priority. */
529 if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
530 !(body->oa.o_flags & OBD_FL_SRVLOCK))
531 req->rq_ops = &ost_hpreq_punch;
/* Service watchdog fires after obd_timeout expressed in milliseconds. */
537 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
/* CPT table dedicated to the ost_io service when per-node affinity is
 * built in ost_setup(); NULL means "use the global cfs_cpt_table". */
539 static struct cfs_cpt_table *ost_io_cptable;
542 LPROC_SEQ_FOPS_RO_TYPE(ost, uuid);
/* procfs variables exported for the OSS obd device. */
544 static struct lprocfs_seq_vars lprocfs_ost_obd_vars[] = {
545 { "uuid", &ost_uuid_fops },
550 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
551 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
/* Bring up the OSS: register the five ptlrpc services (oss, ost_create,
 * ost_io, ost_seq, ost_out), set up procfs and the optional per-node I/O
 * CPT table, then start the ping evictor.  On failure, services already
 * registered are torn down in reverse order via the out_* labels.
 * NOTE(review): many lines (ENTRY/RETURN, some braces and goto targets)
 * are missing from this view. */
553 static struct ptlrpc_service_conf svc_conf;
554 struct ost_obd *ost = &obd->u.ost;
559 rc = cfs_cleanup_group_info();
564 obd->obd_vars = lprocfs_ost_obd_vars;
565 lprocfs_seq_obd_setup(obd);
567 mutex_init(&ost->ost_health_mutex);
/* Service 1/5: the main OSS request service. */
569 svc_conf = (typeof(svc_conf)) {
570 .psc_name = LUSTRE_OSS_NAME,
571 .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
573 .bc_nbufs = OST_NBUFS,
574 .bc_buf_size = OST_BUFSIZE,
575 .bc_req_max_size = OST_MAXREQSIZE,
576 .bc_rep_max_size = OST_MAXREPSIZE,
577 .bc_req_portal = OST_REQUEST_PORTAL,
578 .bc_rep_portal = OSC_REPLY_PORTAL,
581 .tc_thr_name = "ll_ost",
582 .tc_thr_factor = OSS_THR_FACTOR,
583 .tc_nthrs_init = OSS_NTHRS_INIT,
584 .tc_nthrs_base = OSS_NTHRS_BASE,
585 .tc_nthrs_max = OSS_NTHRS_MAX,
586 .tc_nthrs_user = oss_num_threads,
587 .tc_cpu_affinity = 1,
588 .tc_ctx_tags = LCT_DT_THREAD,
591 .cc_pattern = oss_cpts,
594 .so_req_handler = tgt_request_handle,
595 .so_req_printer = target_print_req,
596 .so_hpreq_handler = ptlrpc_hpreq_handler,
599 ost->ost_service = ptlrpc_register_service(&svc_conf,
600 obd->obd_proc_entry);
601 if (IS_ERR(ost->ost_service)) {
602 rc = PTR_ERR(ost->ost_service);
603 CERROR("failed to start service: %d\n", rc);
604 GOTO(out_lprocfs, rc);
/* Service 2/5: object create service on its own portal. */
607 memset(&svc_conf, 0, sizeof(svc_conf));
608 svc_conf = (typeof(svc_conf)) {
609 .psc_name = "ost_create",
610 .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
612 .bc_nbufs = OST_NBUFS,
613 .bc_buf_size = OST_BUFSIZE,
614 .bc_req_max_size = OST_MAXREQSIZE,
615 .bc_rep_max_size = OST_MAXREPSIZE,
616 .bc_req_portal = OST_CREATE_PORTAL,
617 .bc_rep_portal = OSC_REPLY_PORTAL,
620 .tc_thr_name = "ll_ost_create",
621 .tc_thr_factor = OSS_CR_THR_FACTOR,
622 .tc_nthrs_init = OSS_CR_NTHRS_INIT,
623 .tc_nthrs_base = OSS_CR_NTHRS_BASE,
624 .tc_nthrs_max = OSS_CR_NTHRS_MAX,
625 .tc_nthrs_user = oss_num_create_threads,
626 .tc_cpu_affinity = 1,
627 .tc_ctx_tags = LCT_DT_THREAD,
630 .cc_pattern = oss_cpts,
633 .so_req_handler = tgt_request_handle,
634 .so_req_printer = target_print_req,
637 ost->ost_create_service = ptlrpc_register_service(&svc_conf,
638 obd->obd_proc_entry);
639 if (IS_ERR(ost->ost_create_service)) {
640 rc = PTR_ERR(ost->ost_create_service);
641 CERROR("failed to start OST create service: %d\n", rc);
642 GOTO(out_service, rc);
645 mask = cfs_cpt_table->ctb_nodemask;
646 /* event CPT feature is disabled in libcfs level by set partition
647 * number to 1, we still want to set node affinity for io service */
648 if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
652 ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
/* NOTE(review): the NULL check for the allocation sits INSIDE the
 * loop; harmless but it re-tests every iteration — presumably a
 * break follows the CWARN on a missing line. */
653 for_each_node_mask(i, *mask) {
654 if (ost_io_cptable == NULL) {
655 CWARN("OSS failed to create CPT table\n");
659 rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
/* BUG(review): string concatenation below produces "forIO CPT
 * table" — a space is missing at the end of the first literal
 * ("... for" + "IO ..."). */
661 CWARN("OSS Failed to set node %d for"
662 "IO CPT table\n", i);
663 cfs_cpt_table_free(ost_io_cptable);
664 ost_io_cptable = NULL;
/* Service 3/5: bulk I/O service, with larger buffers and optional
 * dedicated per-node CPT table built above. */
670 memset(&svc_conf, 0, sizeof(svc_conf));
671 svc_conf = (typeof(svc_conf)) {
672 .psc_name = "ost_io",
673 .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
675 .bc_nbufs = OST_NBUFS,
676 .bc_buf_size = OST_IO_BUFSIZE,
677 .bc_req_max_size = OST_IO_MAXREQSIZE,
678 .bc_rep_max_size = OST_IO_MAXREPSIZE,
679 .bc_req_portal = OST_IO_PORTAL,
680 .bc_rep_portal = OSC_REPLY_PORTAL,
683 .tc_thr_name = "ll_ost_io",
684 .tc_thr_factor = OSS_THR_FACTOR,
685 .tc_nthrs_init = OSS_NTHRS_INIT,
686 .tc_nthrs_base = OSS_NTHRS_BASE,
687 .tc_nthrs_max = OSS_NTHRS_MAX,
688 .tc_nthrs_user = oss_num_threads,
689 .tc_cpu_affinity = 1,
690 .tc_ctx_tags = LCT_DT_THREAD,
693 .cc_cptable = ost_io_cptable,
694 .cc_pattern = ost_io_cptable == NULL ?
698 .so_thr_init = tgt_io_thread_init,
699 .so_thr_done = tgt_io_thread_done,
700 .so_req_handler = tgt_request_handle,
701 .so_hpreq_handler = ost_io_hpreq_handler,
702 .so_req_printer = target_print_req,
705 ost->ost_io_service = ptlrpc_register_service(&svc_conf,
706 obd->obd_proc_entry);
707 if (IS_ERR(ost->ost_io_service)) {
708 rc = PTR_ERR(ost->ost_io_service);
709 CERROR("failed to start OST I/O service: %d\n", rc);
710 ost->ost_io_service = NULL;
711 GOTO(out_create, rc);
/* Service 4/5: FID sequence allocation service. */
714 memset(&svc_conf, 0, sizeof(svc_conf));
715 svc_conf = (typeof(svc_conf)) {
716 .psc_name = "ost_seq",
717 .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
719 .bc_nbufs = OST_NBUFS,
720 .bc_buf_size = OST_BUFSIZE,
721 .bc_req_max_size = OST_MAXREQSIZE,
722 .bc_rep_max_size = OST_MAXREPSIZE,
723 .bc_req_portal = SEQ_DATA_PORTAL,
724 .bc_rep_portal = OSC_REPLY_PORTAL,
727 .tc_thr_name = "ll_ost_seq",
728 .tc_thr_factor = OSS_CR_THR_FACTOR,
729 .tc_nthrs_init = OSS_CR_NTHRS_INIT,
730 .tc_nthrs_base = OSS_CR_NTHRS_BASE,
731 .tc_nthrs_max = OSS_CR_NTHRS_MAX,
732 .tc_nthrs_user = oss_num_create_threads,
733 .tc_cpu_affinity = 1,
734 .tc_ctx_tags = LCT_DT_THREAD,
738 .cc_pattern = oss_cpts,
741 .so_req_handler = tgt_request_handle,
742 .so_req_printer = target_print_req,
743 .so_hpreq_handler = NULL,
746 ost->ost_seq_service = ptlrpc_register_service(&svc_conf,
747 obd->obd_proc_entry);
748 if (IS_ERR(ost->ost_seq_service)) {
749 rc = PTR_ERR(ost->ost_seq_service);
750 CERROR("failed to start OST seq service: %d\n", rc);
751 ost->ost_seq_service = NULL;
755 /* Object update service */
/* Service 5/5: object update (OUT) service. */
756 memset(&svc_conf, 0, sizeof(svc_conf));
757 svc_conf = (typeof(svc_conf)) {
758 .psc_name = "ost_out",
759 .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
761 .bc_nbufs = OST_NBUFS,
762 .bc_buf_size = OUT_BUFSIZE,
763 .bc_req_max_size = OUT_MAXREQSIZE,
764 .bc_rep_max_size = OUT_MAXREPSIZE,
765 .bc_req_portal = OUT_PORTAL,
766 .bc_rep_portal = OSC_REPLY_PORTAL,
769 * We'd like to have a mechanism to set this on a per-device
773 .tc_thr_name = "ll_ost_out",
774 .tc_thr_factor = OSS_CR_THR_FACTOR,
775 .tc_nthrs_init = OSS_CR_NTHRS_INIT,
776 .tc_nthrs_base = OSS_CR_NTHRS_BASE,
777 .tc_nthrs_max = OSS_CR_NTHRS_MAX,
778 .tc_nthrs_user = oss_num_create_threads,
779 .tc_cpu_affinity = 1,
780 .tc_ctx_tags = LCT_MD_THREAD |
784 .cc_pattern = oss_cpts,
787 .so_req_handler = tgt_request_handle,
788 .so_req_printer = target_print_req,
789 .so_hpreq_handler = NULL,
792 ost->ost_out_service = ptlrpc_register_service(&svc_conf,
793 obd->obd_proc_entry);
794 if (IS_ERR(ost->ost_out_service)) {
795 rc = PTR_ERR(ost->ost_out_service);
796 CERROR("failed to start out service: %d\n", rc);
797 ost->ost_out_service = NULL;
801 ping_evictor_start();
/* Unwind labels: tear down services in reverse order of registration. */
805 ptlrpc_unregister_service(ost->ost_seq_service);
806 ost->ost_seq_service = NULL;
808 ptlrpc_unregister_service(ost->ost_io_service);
809 ost->ost_io_service = NULL;
811 ptlrpc_unregister_service(ost->ost_create_service);
812 ost->ost_create_service = NULL;
814 ptlrpc_unregister_service(ost->ost_service);
815 ost->ost_service = NULL;
817 lprocfs_obd_cleanup(obd);
821 static int ost_cleanup(struct obd_device *obd)
/* Tear down everything ost_setup() created: unregister all five
 * services (under the health mutex so a concurrent health check never
 * sees a half-torn-down state), clean up procfs, and free the dedicated
 * I/O CPT table if one was built. */
823 struct ost_obd *ost = &obd->u.ost;
829 /* there is no recovery for OST OBD, all recovery is controlled by
831 LASSERT(obd->obd_recovering == 0);
832 mutex_lock(&ost->ost_health_mutex);
833 ptlrpc_unregister_service(ost->ost_service);
834 ptlrpc_unregister_service(ost->ost_create_service);
835 ptlrpc_unregister_service(ost->ost_io_service);
836 ptlrpc_unregister_service(ost->ost_seq_service);
837 ptlrpc_unregister_service(ost->ost_out_service);
839 ost->ost_service = NULL;
840 ost->ost_create_service = NULL;
841 ost->ost_io_service = NULL;
842 ost->ost_seq_service = NULL;
843 ost->ost_out_service = NULL;
845 mutex_unlock(&ost->ost_health_mutex);
847 lprocfs_obd_cleanup(obd);
849 if (ost_io_cptable != NULL) {
850 cfs_cpt_table_free(ost_io_cptable);
851 ost_io_cptable = NULL;
857 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
/* Aggregate health of the main, create and I/O services.
 * NOTE(review): ost_seq_service and ost_out_service are not checked in
 * the visible lines — possibly intentional, TODO confirm. */
859 struct ost_obd *ost = &obd->u.ost;
862 mutex_lock(&ost->ost_health_mutex);
863 rc |= ptlrpc_service_health_check(ost->ost_service);
864 rc |= ptlrpc_service_health_check(ost->ost_create_service);
865 rc |= ptlrpc_service_health_check(ost->ost_io_service);
866 mutex_unlock(&ost->ost_health_mutex);
869 * health_check to return 0 on healthy
870 * and 1 on unhealthy.
878 /* use obd ops to offer management infrastructure */
878 /* use obd ops to offer management infrastructure */
/* Minimal obd_ops vector: the OSS device only supports setup, cleanup
 * and health checking; all data-path handling goes through ptlrpc. */
879 static struct obd_ops ost_obd_ops = {
880 .o_owner = THIS_MODULE,
881 .o_setup = ost_setup,
882 .o_cleanup = ost_cleanup,
883 .o_health_check = ost_health_check,
/* Module entry point: register the OSS obd type and honor the
 * deprecated ost_num_threads parameter as a fallback. */
887 static int __init ost_init(void)
893 rc = class_register_type(&ost_obd_ops, NULL, NULL,
894 #ifndef HAVE_ONLY_PROCFS_SEQ
897 LUSTRE_OSS_NAME, NULL);
/* Back-compat: map the deprecated knob onto the new one, but only when
 * the new one was left unset. */
899 if (ost_num_threads != 0 && oss_num_threads == 0) {
900 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
901 "use oss_num_threads instead or unset both for "
902 "dynamic thread startup\n");
903 oss_num_threads = ost_num_threads;
/* Module exit: unregister the OSS obd type (per-device teardown already
 * happened via ost_cleanup on each device). */
909 static void /*__exit*/ ost_exit(void)
911 class_unregister_type(LUSTRE_OSS_NAME);
/* Standard kernel module metadata and entry/exit registration. */
914 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
915 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
916 MODULE_LICENSE("GPL");
918 module_init(ost_init);
919 module_exit(ost_exit);