4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ost/ost_handler.c
38 * Author: Peter J. Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_OST
44 #include <linux/module.h>
45 #include <obd_cksum.h>
47 #include <lustre_net.h>
48 #include <lustre_dlm.h>
49 #include <lustre_export.h>
50 #include <lustre_debug.h>
51 #include <linux/init.h>
52 #include <lprocfs_status.h>
53 #include <libcfs/list.h>
54 #include <lustre_quota.h>
55 #include "ost_internal.h"
/* Module parameters sizing the OSS/OST service thread pools.
 * Permission mask 0444: readable via sysfs but not writable at runtime. */
57 static int oss_num_threads;
58 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
59 "number of OSS service threads to start");
/* Legacy name kept for compatibility; the description itself marks it
 * deprecated in favour of oss_num_threads. */
61 static int ost_num_threads;
62 CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
63 "number of OST service threads to start (deprecated)");
/* Separate pool used for object-create handling. */
65 static int oss_num_create_threads;
66 CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
67 "number of OSS create threads to start");
70 * Do not return server-side uid/gid to remote client
/* For exports connected as remote clients, clear the uid/gid validity
 * bits so the server-side identity in @oa is not sent back on the wire.
 * NOTE(review): extraction drops surrounding brace/return lines; only the
 * visible masking of OBD_MD_FLUID|OBD_MD_FLGID is documented here. */
72 static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
74 if (exp_connect_rmtclient(exp)) {
77 oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
82 * Validate oa from client.
83 * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
85 * a. for single MDS seq = FID_SEQ_OST_MDT0,
86 * b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX
/* Validate and normalize the client-supplied obdo (and optional ioobj):
 * - if the client did not set OBD_MD_FLGROUP, default the sequence to
 *   FID_SEQ_OST_MDT0 on both @oa and @ioobj;
 * - otherwise reject sequences that are neither reserved nor MDT0. */
88 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
89 struct obd_ioobj *ioobj)
91 if (oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP)) {
92 oa->o_seq = FID_SEQ_OST_MDT0;
94 ioobj->ioo_seq = FID_SEQ_OST_MDT0;
95 /* remove fid_seq_is_rsvd() after FID-on-OST allows SEQ > 9 */
96 } else if (oa == NULL || !(fid_seq_is_rsvd(oa->o_seq) ||
97 fid_seq_is_mdt0(oa->o_seq))) {
98 CERROR("%s: client %s sent invalid object "POSTID"\n",
99 exp->exp_obd->obd_name, obd_export_nid2str(exp),
100 oa ? oa->o_id : -1, oa ? oa->o_seq : -1);
/* On success, refresh the object-id fields from the embedded ostid and
 * propagate them into the ioobj. */
103 obdo_from_ostid(oa, &oa->o_oi);
105 ioobj_from_obdo(ioobj, oa);
/* Copy transaction state from @oti into the outgoing reply: transno and
 * pre-operation version go into the reply message, and any DLM locks
 * recorded in oti_ack_locks are saved so they are released only after the
 * client acknowledges the reply. */
109 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
111 struct oti_req_ack_lock *ack_lock;
117 if (req->rq_repmsg) {
118 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
119 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
120 versions[0] = oti->oti_pre_version;
121 lustre_msg_set_versions(req->rq_repmsg, versions);
123 req->rq_transno = oti->oti_transno;
125 /* XXX 4 == entries in oti_ack_locks??? */
126 for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
129 /* XXX not even calling target_send_reply in some cases... */
130 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
/* Handle OST_DESTROY: unpack and validate the request body, cancel any
 * DLM locks named in the piggybacked DLM request, extract an optional
 * capability and llog cancel cookie, pack the reply, and invoke
 * obd_destroy(), storing its result in req->rq_status. */
134 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
135 struct obd_trans_info *oti)
137 struct ost_body *body, *repbody;
138 struct lustre_capa *capa = NULL;
142 /* Get the request body */
143 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* An object id of zero is not a valid destroy target. */
147 if (body->oa.o_id == 0)
150 rc = ost_validate_obdo(exp, &body->oa, NULL);
154 /* If there's a DLM request, cancel the locks mentioned in it*/
155 if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
156 struct ldlm_request *dlm;
158 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
161 ldlm_request_cancel(req, dlm, 0);
164 /* If there's a capability, get it */
165 if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
166 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
168 CERROR("Missing capability for OST DESTROY");
173 /* Prepare the reply */
174 rc = req_capsule_server_pack(&req->rq_pill);
178 /* Get the log cancellation cookie */
179 if (body->oa.o_valid & OBD_MD_FLCOOKIE)
180 oti->oti_logcookies = &body->oa.o_lcookie;
182 /* Finish the reply */
183 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
184 memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
186 /* Do the destroy and set the reply status accordingly */
187 req->rq_status = obd_destroy(req->rq_svc_thread->t_env, exp,
188 &repbody->oa, NULL, oti, NULL, capa);
193 * Helper function for getting server side [start, start+count] DLM lock
194 * if asked by client.
/* Take a server-side extent lock on [start, start+count] for the object
 * named by @oa, but only when the client asked for it (OBD_FL_SRVLOCK in
 * o_flags).  The extent is rounded out to page boundaries via
 * CFS_PAGE_MASK; a count of OBD_OBJECT_EOF (or an overflowing end) means
 * "lock to end of file".  The acquired lock handle is returned in @lh. */
196 static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
197 __u64 start, __u64 count, struct lustre_handle *lh,
200 struct ldlm_res_id res_id;
201 ldlm_policy_data_t policy;
202 __u64 end = start + count;
206 LASSERT(!lustre_handle_is_used(lh));
207 /* o_id and o_gr are used for localizing resource, if client miss to set
208 * them, do not trigger ASSERTION. */
209 if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
210 (OBD_MD_FLID | OBD_MD_FLGROUP)))
/* No server-side locking requested: nothing to do. */
213 if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
214 !(oa->o_flags & OBD_FL_SRVLOCK))
217 osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
218 CDEBUG(D_INODE, "OST-side extent lock.\n");
220 policy.l_extent.start = start & CFS_PAGE_MASK;
222 /* If ->o_blocks is EOF it means "lock till the end of the
223 * file". Otherwise, it's size of a hole being punched (in bytes) */
224 if (count == OBD_OBJECT_EOF || end < start)
225 policy.l_extent.end = OBD_OBJECT_EOF;
227 policy.l_extent.end = end | ~CFS_PAGE_MASK;
229 RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
230 LDLM_EXTENT, &policy, mode, &flags,
231 ldlm_blocking_ast, ldlm_completion_ast,
232 ldlm_glimpse_ast, NULL, 0, NULL, lh));
235 /* Helper function: release lock, if any. */
/* Counterpart of ost_lock_get(): drop the reference on @lh in @mode when
 * the handle is actually in use; a no-op otherwise. */
236 static void ost_lock_put(struct obd_export *exp,
237 struct lustre_handle *lh, int mode)
240 if (lustre_handle_is_used(lh))
241 ldlm_lock_decref(lh, mode);
/* Handle OST_GETATTR: validate the request obdo, fetch an optional
 * capability, pack the reply, take a server-side PR extent lock over the
 * whole object if the client asked for one, and call obd_getattr() into
 * the reply body.  Server-side uid/gid are stripped for remote clients
 * before the reply goes out. */
245 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
247 struct ost_body *body, *repbody;
248 struct obd_info *oinfo;
249 struct lustre_handle lh = { 0 };
250 struct lustre_capa *capa = NULL;
254 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
258 rc = ost_validate_obdo(exp, &body->oa, NULL);
262 if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
263 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
265 CERROR("Missing capability for OST GETATTR");
270 rc = req_capsule_server_pack(&req->rq_pill);
274 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
275 repbody->oa = body->oa;
/* Lock the full object (0..EOF) in PR mode if OBD_FL_SRVLOCK is set. */
277 rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
281 OBD_ALLOC_PTR(oinfo);
283 GOTO(unlock, rc = -ENOMEM);
284 oinfo->oi_oa = &repbody->oa;
285 oinfo->oi_capa = capa;
287 req->rq_status = obd_getattr(req->rq_svc_thread->t_env, exp, oinfo);
291 ost_drop_id(exp, &repbody->oa);
294 ost_lock_put(exp, &lh, LCK_PR);
/* Handle OST_STATFS: pack the reply, then fill the obd_statfs reply
 * buffer via obd_statfs().  Results older than OBD_STATFS_CACHE_SECONDS
 * may be served from cache (note the negative time shift passed down). */
298 static int ost_statfs(struct ptlrpc_request *req)
300 struct obd_statfs *osfs;
304 rc = req_capsule_server_pack(&req->rq_pill);
308 osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
310 req->rq_status = obd_statfs(req->rq_svc_thread->t_env, req->rq_export,
312 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
314 if (req->rq_status != 0)
315 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
/* Handle OST_CREATE: validate the obdo, pack the reply, point the
 * transaction info at the client's llog cookie, and call obd_create()
 * with the result stored in req->rq_status. */
320 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
321 struct obd_trans_info *oti)
323 struct ost_body *body, *repbody;
327 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
331 rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
335 rc = req_capsule_server_pack(&req->rq_pill);
339 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340 repbody->oa = body->oa;
341 oti->oti_logcookies = &body->oa.o_lcookie;
343 req->rq_status = obd_create(req->rq_svc_thread->t_env, exp,
344 &repbody->oa, NULL, oti);
345 //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
/* Handle OST_PUNCH (truncate/hole-punch): requires both o_size and
 * o_blocks to be valid ([start, end] of the punched range).  Takes a
 * server-side PW extent lock over that range when requested, passing
 * LDLM_AST_DISCARD_DATA for a truncate-to-zero so cached pages are
 * dropped rather than written back, then calls obd_punch(). */
349 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
350 struct obd_trans_info *oti)
352 struct ost_body *body, *repbody;
354 struct lustre_handle lh = {0,};
357 /* check that we do support OBD_CONNECT_TRUNCLOCK. */
358 CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
360 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
364 rc = ost_validate_obdo(exp, &body->oa, NULL);
/* Both size and blocks must be supplied: they delimit the punch range. */
368 if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
369 (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
372 rc = req_capsule_server_pack(&req->rq_pill);
376 /* standard truncate optimization: if file body is completely
377 * destroyed, don't send data back to the server. */
378 if (body->oa.o_size == 0)
379 flags |= LDLM_AST_DISCARD_DATA;
381 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
382 repbody->oa = body->oa;
384 rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
385 repbody->oa.o_blocks, &lh, LCK_PW, flags);
387 struct obd_info *oinfo;
388 struct lustre_capa *capa = NULL;
390 if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
391 repbody->oa.o_flags == OBD_FL_SRVLOCK)
393 * If OBD_FL_SRVLOCK is the only bit set in
394 * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
395 * through filter_setattr() to filter_iocontrol().
397 repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
399 if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
400 capa = req_capsule_client_get(&req->rq_pill,
403 CERROR("Missing capability for OST PUNCH");
404 GOTO(unlock, rc = -EFAULT);
408 OBD_ALLOC_PTR(oinfo);
410 GOTO(unlock, rc = -ENOMEM);
411 oinfo->oi_oa = &repbody->oa;
/* Punch range: [o_size, o_blocks] as sent by the client. */
412 oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
413 oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
414 oinfo->oi_capa = capa;
415 oinfo->oi_flags = OBD_FL_PUNCH;
417 req->rq_status = obd_punch(req->rq_svc_thread->t_env, exp,
421 ost_lock_put(exp, &lh, LCK_PW);
424 ost_drop_id(exp, &repbody->oa);
/* Handle OST_SYNC: validate the obdo, fetch an optional capability, pack
 * the reply, then call obd_sync() over the byte range given by o_size and
 * o_blocks.  The job id from @oti is threaded through for accounting. */
428 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req,
429 struct obd_trans_info *oti)
431 struct ost_body *body, *repbody;
432 struct obd_info *oinfo;
433 struct lustre_capa *capa = NULL;
437 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
441 rc = ost_validate_obdo(exp, &body->oa, NULL);
445 if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
446 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
448 CERROR("Missing capability for OST SYNC");
453 rc = req_capsule_server_pack(&req->rq_pill);
457 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
458 repbody->oa = body->oa;
460 OBD_ALLOC_PTR(oinfo);
464 oinfo->oi_oa = &repbody->oa;
465 oinfo->oi_capa = capa;
466 oinfo->oi_jobid = oti->oti_jobid;
467 req->rq_status = obd_sync(req->rq_svc_thread->t_env, exp, oinfo,
468 repbody->oa.o_size, repbody->oa.o_blocks,
472 ost_drop_id(exp, &repbody->oa);
/* Handle OST_SETATTR: validate the obdo, pack the reply, fetch an
 * optional capability, then call obd_setattr() with the attributes the
 * client supplied.  uid/gid are stripped from the reply for remote
 * clients. */
476 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
477 struct obd_trans_info *oti)
479 struct ost_body *body, *repbody;
480 struct obd_info *oinfo;
481 struct lustre_capa *capa = NULL;
485 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
489 rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
493 rc = req_capsule_server_pack(&req->rq_pill);
497 if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
498 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
500 CERROR("Missing capability for OST SETATTR");
505 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
506 repbody->oa = body->oa;
508 OBD_ALLOC_PTR(oinfo);
511 oinfo->oi_oa = &repbody->oa;
512 oinfo->oi_capa = capa;
514 req->rq_status = obd_setattr(req->rq_svc_thread->t_env, exp, oinfo,
519 ost_drop_id(exp, &repbody->oa);
/* Compute the checksum of a bulk descriptor's pages, kmapping each page
 * and folding page-offset/len slices into the running checksum of the
 * given @cksum_type.  Under the OBD_FAIL_OST_CHECKSUM_* fault-injection
 * points the first page is deliberately corrupted before (write) or after
 * (read) checksumming to exercise client-side checksum recovery. */
523 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
524 cksum_type_t cksum_type)
529 cksum = init_checksum(cksum_type);
530 for (i = 0; i < desc->bd_iov_count; i++) {
531 struct page *page = desc->bd_iov[i].kiov_page;
532 int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
533 char *ptr = kmap(page) + off;
534 int len = desc->bd_iov[i].kiov_len;
536 /* corrupt the data before we compute the checksum, to
537 * simulate a client->OST data error */
538 if (i == 0 && opc == OST_WRITE &&
539 OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE))
540 memcpy(ptr, "bad3", min(4, len));
541 cksum = compute_checksum(cksum, ptr, len, cksum_type);
542 /* corrupt the data after we compute the checksum, to
543 * simulate an OST->client data error */
544 if (i == 0 && opc == OST_READ &&
545 OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
546 memcpy(ptr, "bad4", min(4, len));
547 /* nobody should use corrupted page again */
548 ClearPageUptodate(page);
553 return fini_checksum(cksum, cksum_type);
/* Take a server-side extent lock covering all niobufs of a bulk RW, but
 * only when the client set OBD_BRW_SRVLOCK on the first niobuf.  All
 * niobufs must agree on the SRVLOCK flag; the locked extent spans from
 * the first niobuf's page-aligned offset to the end of the last one. */
556 static int ost_brw_lock_get(int mode, struct obd_export *exp,
557 struct obd_ioobj *obj, struct niobuf_remote *nb,
558 struct lustre_handle *lh)
561 int nrbufs = obj->ioo_bufcnt;
562 struct ldlm_res_id res_id;
563 ldlm_policy_data_t policy;
567 osc_build_res_name(obj->ioo_id, obj->ioo_seq, &res_id);
568 LASSERT(mode == LCK_PR || mode == LCK_PW);
569 LASSERT(!lustre_handle_is_used(lh));
/* No buffers, or the client did not ask for server-side locking. */
571 if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
/* Reject mixed SRVLOCK / non-SRVLOCK niobufs in one request. */
574 for (i = 1; i < nrbufs; i ++)
575 if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
576 (nb[i].flags & OBD_BRW_SRVLOCK))
579 policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
580 policy.l_extent.end = (nb[nrbufs - 1].offset +
581 nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
583 RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
584 LDLM_EXTENT, &policy, mode, &flags,
585 ldlm_blocking_ast, ldlm_completion_ast,
586 ldlm_glimpse_ast, NULL, 0, NULL, lh));
/* Release the bulk-RW server-side lock taken by ost_brw_lock_get(), if
 * one was taken.  The assertion cross-checks that a used lock handle
 * implies the first niobuf carried OBD_BRW_SRVLOCK and vice versa. */
589 static void ost_brw_lock_put(int mode,
590 struct obd_ioobj *obj, struct niobuf_remote *niob,
591 struct lustre_handle *lh)
594 LASSERT(mode == LCK_PR || mode == LCK_PW);
595 LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
596 lustre_handle_is_used(lh));
597 if (lustre_handle_is_used(lh))
598 ldlm_lock_decref(lh, mode);
602 /* Allocate thread local buffers if needed */
/* Return the per-service-thread local cache for request @r, allocating a
 * temporary one when the servicing thread has none (which, per the
 * assertion, only happens while the export is in recovery). */
603 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
605 struct ost_thread_local_cache *tls =
606 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
608 /* In normal mode of operation an I/O request is serviced only
609 * by ll_ost_io threads each of them has own tls buffers allocated by
611 * During recovery, an I/O request may be queued until any of the ost
612 * service threads process it. Not necessary it should be one of
613 * ll_ost_io threads. In that case we dynamically allocating tls
614 * buffers for the request service time. */
615 if (unlikely(tls == NULL)) {
616 LASSERT(r->rq_export->exp_in_recovery);
620 r->rq_svc_thread->t_data = tls;
626 /* Free thread local buffers if they were allocated only for servicing
627 * this one request */
/* Counterpart of ost_tls_get(): only temporary (per-request) caches are
 * torn down here; a thread's permanent cache is left in place. */
628 static void ost_tls_put(struct ptlrpc_request *r)
630 struct ost_thread_local_cache *tls =
631 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
633 if (unlikely(tls->temporary)) {
635 r->rq_svc_thread->t_data = NULL;
/* Handle OST_READ bulk I/O.  Outline of the visible flow:
 *  1. wait out any in-progress eviction, unpack/validate body, ioobj,
 *     remote niobufs and optional capability;
 *  2. pack the reply, grab thread-local buffers and (if requested) a
 *     server-side PR extent lock, dropping the request if locking blew
 *     the client's deadline (b=11330);
 *  3. obd_preprw() to fill local_nb, build the bulk descriptor, queue
 *     each non-empty page, stop early on error or short read;
 *  4. optionally checksum the outgoing pages, do the bulk transfer, then
 *     obd_commitrw() (mandatory after preprw, success or not);
 *  5. reply handling: on bulk comms failure the reply state is dropped so
 *     the client retries; an OBD_FAIL_PTLRPC_CLIENT_BULK_CB2 fault delays
 *     the bulk until after the reply to simulate network reordering.
 * NOTE(review): the extraction omits many lines (braces, RETURNs, some
 * error checks); comments describe only what is visible. */
639 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
641 struct ptlrpc_bulk_desc *desc = NULL;
642 struct obd_export *exp = req->rq_export;
643 struct niobuf_remote *remote_nb;
644 struct niobuf_local *local_nb;
645 struct obd_ioobj *ioo;
646 struct ost_body *body, *repbody;
647 struct lustre_capa *capa = NULL;
648 struct l_wait_info lwi;
649 struct lustre_handle lockh = { 0 };
650 int niocount, npages, nob = 0, rc, i;
652 struct ost_thread_local_cache *tls;
655 req->rq_bulk_read = 1;
657 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
658 GOTO(out, rc = -EIO);
660 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
662 /* Check if there is eviction in progress, and if so, wait for it to
664 if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
665 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
666 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
667 !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
671 GOTO(out, rc = -ENOTCONN);
673 /* ost_body, ioobj & noibuf_remote are verified and swabbed in
674 * ost_rw_hpreq_check(). */
675 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
677 GOTO(out, rc = -EFAULT);
680 * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
681 * would be useful here and wherever we get &RMF_OBD_IOOBJ and
682 * &RMF_NIOBUF_REMOTE.
684 ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
686 GOTO(out, rc = -EFAULT);
688 rc = ost_validate_obdo(exp, &body->oa, ioo);
692 niocount = ioo->ioo_bufcnt;
693 remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
694 if (remote_nb == NULL)
695 GOTO(out, rc = -EFAULT);
697 if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
698 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
700 CERROR("Missing capability for OST BRW READ");
701 GOTO(out, rc = -EFAULT);
705 rc = req_capsule_server_pack(&req->rq_pill);
709 tls = ost_tls_get(req);
711 GOTO(out_bulk, rc = -ENOMEM);
712 local_nb = tls->local;
714 rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
719 * If getting the lock took more time than
720 * client was willing to wait, drop it. b=11330
722 if (cfs_time_current_sec() > req->rq_deadline ||
723 OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
725 CERROR("Dropping timed-out read from %s because locking"
726 "object "LPX64" took %ld seconds (limit was %ld).\n",
727 libcfs_id2str(req->rq_peer), ioo->ioo_id,
728 cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
729 req->rq_deadline - req->rq_arrival_time.tv_sec);
730 GOTO(out_lock, rc = -ETIMEDOUT);
733 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
734 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* npages is in/out for obd_preprw: pool size in, actual page count out. */
736 npages = OST_THREAD_POOL_SIZE;
737 rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
738 &repbody->oa, 1, ioo, remote_nb, &npages, local_nb,
743 desc = ptlrpc_prep_bulk_exp(req, npages,
744 BULK_PUT_SOURCE, OST_BULK_PORTAL);
746 GOTO(out_commitrw, rc = -ENOMEM);
749 for (i = 0; i < npages; i++) {
750 int page_rc = local_nb[i].rc;
752 if (page_rc < 0) { /* error */
758 if (page_rc != 0) { /* some data! */
759 LASSERT (local_nb[i].page != NULL);
760 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
761 local_nb[i].offset & ~CFS_PAGE_MASK,
765 if (page_rc != local_nb[i].len) { /* short read */
766 /* All subsequent pages should be 0 */
768 LASSERT(local_nb[i].rc == 0);
/* Client asked for a checksum: compute it over the bulk pages and echo
 * the negotiated checksum type back in o_flags. */
773 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
774 cksum_type_t cksum_type =
775 cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
776 repbody->oa.o_flags : 0);
777 repbody->oa.o_flags = cksum_type_pack(cksum_type);
778 repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
779 repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
780 CDEBUG(D_PAGE, "checksum at read origin: %x\n",
781 repbody->oa.o_cksum);
783 repbody->oa.o_valid = 0;
785 /* We're finishing using body->oa as an input variable */
787 /* Check if client was evicted while we were doing i/o before touching
790 if (likely(!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)))
791 rc = target_bulk_io(exp, desc, &lwi);
796 /* Must commit after prep above in all cases */
797 rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
798 &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
802 ost_drop_id(exp, &repbody->oa);
805 ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
809 if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
810 ptlrpc_free_bulk(desc);
814 req->rq_status = nob;
815 ptlrpc_lprocfs_brw(req, nob);
816 target_committed_to_req(req);
818 } else if (!no_reply) {
819 /* Only reply if there was no comms problem with bulk */
820 target_committed_to_req(req);
824 /* reply out callback would free */
825 ptlrpc_req_drop_rs(req);
826 LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
827 "client will retry: rc %d\n",
828 exp->exp_obd->obd_name,
829 obd_uuid2str(&exp->exp_client_uuid),
830 obd_export_nid2str(exp), rc);
832 /* send a bulk after reply to simulate a network delay or reordering
834 if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
836 struct l_wait_info lwi1;
838 CDEBUG(D_INFO, "reorder BULK\n");
839 cfs_waitq_init(&waitq);
841 lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
842 l_wait_event(waitq, 0, &lwi1);
843 rc = target_bulk_io(exp, desc, &lwi);
844 ptlrpc_free_bulk(desc);
/* Handle OST_WRITE bulk I/O.  Outline of the visible flow:
 *  1. unpack/validate body, ioobjs, remote niobufs and optional
 *     capability; niocount is cross-checked against the niobuf buffer
 *     size; size the per-niobuf return-code (RCS) reply buffer;
 *  2. grab thread-local buffers and (if requested) a server-side PW
 *     extent lock, dropping the request on deadline overrun (b=11330);
 *  3. save the client checksum/type and remote uid/gid before
 *     obd_preprw() clobbers oa->o_valid; clear grant info on
 *     resent/replayed requests since grants were resynced at reconnect;
 *  4. obd_preprw(), build a BULK_GET_SINK descriptor, receive the data,
 *     verify the checksum server-side, then obd_commitrw() (mandatory);
 *  5. on a post-commit checksum mismatch, re-checksum to classify where
 *     corruption happened (before arrival / during / both) and log it;
 *  6. fill per-niobuf return codes, then reply — or drop the reply state
 *     on bulk comms failure so the client retries.
 * NOTE(review): the extraction omits many lines (braces, RETURNs, some
 * error checks); comments describe only what is visible. */
850 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
852 struct ptlrpc_bulk_desc *desc = NULL;
853 struct obd_export *exp = req->rq_export;
854 struct niobuf_remote *remote_nb;
855 struct niobuf_local *local_nb;
856 struct obd_ioobj *ioo;
857 struct ost_body *body, *repbody;
858 struct l_wait_info lwi;
859 struct lustre_handle lockh = {0};
860 struct lustre_capa *capa = NULL;
862 int objcount, niocount, npages;
864 obd_count client_cksum = 0, server_cksum = 0;
865 cksum_type_t cksum_type = OBD_CKSUM_CRC32;
866 int no_reply = 0, mmap = 0;
867 __u32 o_uid = 0, o_gid = 0;
868 struct ost_thread_local_cache *tls;
871 req->rq_bulk_write = 1;
873 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
874 GOTO(out, rc = -EIO);
875 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
876 GOTO(out, rc = -EFAULT);
878 /* pause before transaction has been started */
879 OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
881 /* ost_body, ioobj & noibuf_remote are verified and swabbed in
882 * ost_rw_hpreq_check(). */
883 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
885 GOTO(out, rc = -EFAULT);
/* Number of ioobj elements is derived from the capsule buffer size. */
887 objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
888 RCL_CLIENT) / sizeof(*ioo);
889 ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
891 GOTO(out, rc = -EFAULT);
893 rc = ost_validate_obdo(exp, &body->oa, ioo);
897 for (niocount = i = 0; i < objcount; i++)
898 niocount += ioo[i].ioo_bufcnt;
901 * It'd be nice to have a capsule function to indicate how many elements
902 * there were in a buffer for an RMF that's declared to be an array.
903 * It's easy enough to compute the number of elements here though.
905 remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
906 if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
907 &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
908 GOTO(out, rc = -EFAULT);
/* Loopback MEMALLOC writes: flag memory pressure for the allocator. */
910 if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
911 (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
912 cfs_memory_pressure_set();
914 if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
915 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
917 CERROR("Missing capability for OST BRW WRITE");
918 GOTO(out, rc = -EFAULT);
/* Reply carries one return code per niobuf; size the RCS buffer first. */
922 req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
923 niocount * sizeof(*rcs));
924 rc = req_capsule_server_pack(&req->rq_pill);
927 CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
928 rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
930 tls = ost_tls_get(req);
932 GOTO(out_bulk, rc = -ENOMEM);
933 local_nb = tls->local;
935 rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
940 * If getting the lock took more time than
941 * client was willing to wait, drop it. b=11330
943 if (cfs_time_current_sec() > req->rq_deadline ||
944 OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
946 CERROR("Dropping timed-out write from %s because locking "
947 "object "LPX64" took %ld seconds (limit was %ld).\n",
948 libcfs_id2str(req->rq_peer), ioo->ioo_id,
949 cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
950 req->rq_deadline - req->rq_arrival_time.tv_sec);
951 GOTO(out_lock, rc = -ETIMEDOUT);
954 /* obd_preprw clobbers oa->valid, so save what we need */
955 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
956 client_cksum = body->oa.o_cksum;
957 if (body->oa.o_valid & OBD_MD_FLFLAGS)
958 cksum_type = cksum_type_unpack(body->oa.o_flags);
/* mmap writes downgrade checksum-mismatch logging from error to info. */
960 if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
963 /* Because we already sync grant info with client when reconnect,
964 * grant info will be cleared for resent req, then fed_grant and
965 * total_grant will not be modified in following preprw_write */
966 if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
967 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
968 body->oa.o_valid &= ~OBD_MD_FLGRANT;
/* Remote clients: preserve the wire uid/gid, restored after commitrw. */
971 if (exp_connect_rmtclient(exp)) {
972 o_uid = body->oa.o_uid;
973 o_gid = body->oa.o_gid;
976 repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
977 memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
/* npages is in/out for obd_preprw: pool size in, actual page count out. */
979 npages = OST_THREAD_POOL_SIZE;
980 rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
981 &repbody->oa, objcount, ioo, remote_nb, &npages,
982 local_nb, oti, capa);
986 desc = ptlrpc_prep_bulk_exp(req, npages,
987 BULK_GET_SINK, OST_BULK_PORTAL);
989 GOTO(skip_transfer, rc = -ENOMEM);
991 /* NB Having prepped, we must commit... */
993 for (i = 0; i < npages; i++)
994 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
995 local_nb[i].offset & ~CFS_PAGE_MASK,
998 rc = sptlrpc_svc_prep_bulk(req, desc);
1002 rc = target_bulk_io(exp, desc, &lwi);
/* Verify the received data against the client's checksum, echoing the
 * server-computed value and checksum type back in the reply. */
1006 if (client_cksum != 0 && rc == 0) {
1007 static int cksum_counter;
1008 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1009 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1010 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1011 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1012 repbody->oa.o_cksum = server_cksum;
1014 if (unlikely(client_cksum != server_cksum)) {
1015 CDEBUG_LIMIT(mmap ? D_INFO : D_ERROR,
1016 "client csum %x, server csum %x\n",
1017 client_cksum, server_cksum);
/* Log "OK" only at power-of-two counts to rate-limit the message. */
1019 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1020 CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1021 cksum_counter, libcfs_id2str(req->rq_peer),
1026 /* Must commit after prep above in all cases */
1027 rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
1028 &repbody->oa, objcount, ioo, remote_nb, npages,
1030 if (rc == -ENOTCONN)
1031 /* quota acquire process has been given up because
1032 * either the client has been evicted or the client
1033 * has timed out the request already */
1036 if (exp_connect_rmtclient(exp)) {
1037 repbody->oa.o_uid = o_uid;
1038 repbody->oa.o_gid = o_gid;
1042 * Disable sending mtime back to the client. If the client locked the
1043 * whole object, then it has already updated the mtime on its side,
1044 * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
1046 repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
/* Checksum mismatch post-commit: re-checksum the landed data to work out
 * whether corruption occurred before arrival, after checksumming, or
 * both, and log a detailed diagnostic (skipped for mmap I/O, where the
 * client may legitimately modify pages mid-flight). */
1048 if (unlikely(client_cksum != server_cksum && rc == 0 && !mmap)) {
1049 int new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1054 if (new_cksum == server_cksum)
1055 msg = "changed in transit before arrival at OST";
1056 else if (new_cksum == client_cksum)
1057 msg = "initial checksum before message complete";
1059 msg = "changed in transit AND after initial checksum";
1061 if (req->rq_peer.nid == desc->bd_sender) {
1065 router = libcfs_nid2str(desc->bd_sender);
1068 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
1069 "%s%s%s inode "DFID" object "
1070 LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1071 exp->exp_obd->obd_name, msg,
1072 libcfs_id2str(req->rq_peer),
1074 body->oa.o_valid & OBD_MD_FLFID ?
1075 body->oa.o_parent_seq : (__u64)0,
1076 body->oa.o_valid & OBD_MD_FLFID ?
1077 body->oa.o_parent_oid : 0,
1078 body->oa.o_valid & OBD_MD_FLFID ?
1079 body->oa.o_parent_ver : 0,
1081 body->oa.o_valid & OBD_MD_FLGROUP ?
1082 body->oa.o_seq : (__u64)0,
1084 local_nb[npages-1].offset +
1085 local_nb[npages-1].len - 1 );
1086 CERROR("client csum %x, original server csum %x, "
1087 "server csum now %x\n",
1088 client_cksum, server_cksum, new_cksum);
1094 /* set per-requested niobuf return codes */
1095 for (i = j = 0; i < niocount; i++) {
1096 int len = remote_nb[i].len;
1101 LASSERT(j < npages);
1102 if (local_nb[j].rc < 0)
1103 rcs[i] = local_nb[j].rc;
1104 len -= local_nb[j].len;
1109 LASSERT(j == npages);
1110 ptlrpc_lprocfs_brw(req, nob);
1114 ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1119 ptlrpc_free_bulk(desc);
1122 oti_to_request(oti, req);
1123 target_committed_to_req(req);
1124 rc = ptlrpc_reply(req);
1125 } else if (!no_reply) {
1126 /* Only reply if there was no comms problem with bulk */
1127 target_committed_to_req(req);
1128 req->rq_status = rc;
1131 /* reply out callback would free */
1132 ptlrpc_req_drop_rs(req);
1133 LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
1134 "client will retry: rc %d\n",
1135 exp->exp_obd->obd_name,
1136 obd_uuid2str(&exp->exp_client_uuid),
1137 obd_export_nid2str(exp), rc);
1139 cfs_memory_pressure_clr();
1144 * Implementation of OST_SET_INFO.
1146 * OST_SET_INFO is like ioctl(): heavily overloaded. Specifically, it takes a
1147 * "key" and a value RPC buffers as arguments, with the value's contents
1148 * interpreted according to the key.
1150 * Value types that need swabbing have swabbing done explicitly, either here or
1151 * in functions called from here. This should be corrected: all swabbing should
1152 * be done in the capsule abstraction, as that will then allow us to move
1153 * swabbing exclusively to the client without having to modify server code
1154 * outside the capsule abstraction's implementation itself. To correct this
1155 * will require minor changes to the capsule abstraction; see the comments for
1156 * req_capsule_extend() in layout.c.
/* Dispatch on the request key: KEY_GRANT_SHRINK re-types the capsule so
 * the value is an ost_body (echoed into the reply and passed as @val);
 * KEY_EVICT_BY_NID evicts the named client; KEY_MDS_CONN gets its __u32
 * value swabbed if needed.  All keys ultimately fall through to
 * obd_set_info_async(). */
1158 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1160 struct ost_body *body = NULL, *repbody;
1161 char *key, *val = NULL;
1162 int keylen, vallen, rc = 0;
1163 int is_grant_shrink = 0;
1166 key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1168 DEBUG_REQ(D_HA, req, "no set_info key");
1171 keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1174 vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1177 if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
1178 /* In this case the value is actually an RMF_OST_BODY, so we
1179 * transmutate the type of this PTLRPC */
1180 req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
1182 rc = req_capsule_server_pack(&req->rq_pill);
1187 if (is_grant_shrink) {
1188 body = req_capsule_client_get(&req->rq_pill,
1193 repbody = req_capsule_server_get(&req->rq_pill,
1195 memcpy(repbody, body, sizeof(*body));
1196 val = (char*)repbody;
1198 val = req_capsule_client_get(&req->rq_pill,
1203 if (KEY_IS(KEY_EVICT_BY_NID)) {
1205 obd_export_evict_by_nid(exp->exp_obd, val);
1207 } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
1208 if (vallen < sizeof(__u32))
1210 __swab32s((__u32 *)val);
1213 /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
1214 * a struct ost_body * value */
1215 rc = obd_set_info_async(req->rq_svc_thread->t_env, exp, keylen,
1216 key, vallen, val, NULL);
1218 lustre_msg_set_status(req->rq_repmsg, 0);
/* Implementation of OST_GET_INFO: a two-pass obd_get_info() — the first
 * call determines the reply length (used to size RMF_GENERIC_DATA), the
 * second fills the packed reply buffer.  KEY_FIEMAP keys embed an obdo
 * that is validated first. */
1222 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1225 int keylen, replylen, rc = 0;
1226 struct req_capsule *pill = &req->rq_pill;
1229 /* this common part for get_info rpc */
1230 key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1232 DEBUG_REQ(D_HA, req, "no get_info key");
1235 keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1237 if (KEY_IS(KEY_FIEMAP)) {
1238 struct ll_fiemap_info_key *fm_key = key;
1241 rc = ost_validate_obdo(exp, &fm_key->oa, NULL);
/* First pass: obtain only the required reply length. */
1246 rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
1247 &replylen, NULL, NULL);
1251 req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1252 RCL_SERVER, replylen);
1254 rc = req_capsule_server_pack(pill);
1258 reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1262 /* call again to fill in the reply buffer */
1263 rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
1264 &replylen, reply, NULL);
1266 lustre_msg_set_status(req->rq_repmsg, 0);
1270 #ifdef HAVE_QUOTA_SUPPORT
/*
 * OST_QUOTACTL handler: copy the client's obd_quotactl into the reply
 * and run the quota operation; the operation result goes in rq_status.
 */
1271 static int ost_handle_quotactl(struct ptlrpc_request *req)
1273 struct obd_quotactl *oqctl, *repoqc;
1277 oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1279 GOTO(out, rc = -EPROTO);
1281 rc = req_capsule_server_pack(&req->rq_pill);
1285 repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1286 req->rq_status = obd_quotactl(req->rq_export, oqctl);
/*
 * OST_QUOTACHECK handler: unpack the obd_quotactl request, pack a reply
 * and kick off the quota check; its result is returned via rq_status.
 */
1293 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1295 struct obd_quotactl *oqctl;
1299 oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1303 rc = req_capsule_server_pack(&req->rq_pill);
1307 req->rq_status = obd_quotacheck(req->rq_export, oqctl);
/*
 * OST_QUOTA_ADJUST_QUNIT handler: adjust the quota unit size using the
 * export's quota context; the operation result goes in rq_status.
 */
1311 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1313 struct quota_adjust_qunit *oqaq, *repoqa;
1314 struct lustre_quota_ctxt *qctxt;
/* quota context lives in the obd_device's target data */
1318 qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1319 oqaq = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1321 GOTO(out, rc = -EPROTO);
1323 rc = req_capsule_server_pack(&req->rq_pill);
1327 repoqa = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1328 req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt, NULL);
/*
 * LLOG_ORIGIN_CONNECT handler: pass the llog connection body straight
 * through to obd_llog_connect().
 */
1336 static int ost_llog_handle_connect(struct obd_export *exp,
1337 struct ptlrpc_request *req)
1339 struct llogd_conn_body *body;
1343 body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
1344 rc = obd_llog_connect(exp, body);
/*
 * Downgrade a connection to "no security": strip the remote-client and
 * OSS-capability connect flags from the reply and mirror the result into
 * the export's connect flags under exp_lock.
 * (Comments intentionally kept outside the macro body: inserting lines
 * between backslash continuations would alter the macro.)
 */
1348 #define ost_init_sec_none(reply, exp) \
1350 reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT | \
1351 OBD_CONNECT_RMT_CLIENT_FORCE | \
1352 OBD_CONNECT_OSS_CAPA); \
1353 cfs_spin_lock(&exp->exp_lock); \
1354 exp->exp_connect_flags = reply->ocd_connect_flags; \
1355 cfs_spin_unlock(&exp->exp_lock); \
/*
 * Decide the security level of an incoming connection and adjust the
 * negotiated connect flags accordingly.
 *
 * Policy visible here: MDT connections are always trusted; non-GSS and
 * old-version clients are only allowed when fo_sec_level is NONE;
 * otherwise the decision is driven by OBD_CONNECT_RMT_CLIENT[_FORCE]
 * and the filter's fo_sec_level / fo_fl_oss_capa settings.
 *
 * Fix: the no-GSS warning read "does not user GSS" — corrected to
 * "does not use GSS".  No other token changed.
 * NOTE(review): some original lines are elided in this excerpt.
 */
1358 static int ost_init_sec_level(struct ptlrpc_request *req)
1360 struct obd_export *exp = req->rq_export;
1361 struct req_capsule *pill = &req->rq_pill;
1362 struct obd_device *obd = exp->exp_obd;
1363 struct filter_obd *filter = &obd->u.filter;
1364 char *client = libcfs_nid2str(req->rq_peer.nid);
1365 struct obd_connect_data *data, *reply;
1369 data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
1370 reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
1371 if (data == NULL || reply == NULL)
1374 /* connection from MDT is always trusted */
1375 if (req->rq_auth_usr_mdt) {
1376 ost_init_sec_none(reply, exp);
1380 /* no GSS support case */
1381 if (!req->rq_auth_gss) {
1382 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1383 CWARN("client %s -> target %s does not use GSS, "
1384 "can not run under security level %d.\n",
1385 client, obd->obd_name, filter->fo_sec_level);
1388 ost_init_sec_none(reply, exp);
1393 /* old version case */
1394 if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
1395 !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
1396 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1397 CWARN("client %s -> target %s uses old version, "
1398 "can not run under security level %d.\n",
1399 client, obd->obd_name, filter->fo_sec_level);
1402 CWARN("client %s -> target %s uses old version, "
1403 "run under security level %d.\n",
1404 client, obd->obd_name, filter->fo_sec_level);
1405 ost_init_sec_none(reply, exp);
/* client explicitly asked to be treated as remote */
1410 remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
1412 if (!req->rq_auth_remote)
1413 CDEBUG(D_SEC, "client (local realm) %s -> target %s "
1414 "asked to be remote.\n", client, obd->obd_name);
1415 } else if (req->rq_auth_remote) {
1417 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
1418 "as remote by default.\n", client, obd->obd_name);
/* remote clients need OSS capabilities enabled on this filter */
1422 if (!filter->fo_fl_oss_capa) {
1423 CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
1424 " but OSS capabilities are not enabled: %d.\n",
1425 client, obd->obd_name, filter->fo_fl_oss_capa);
1430 switch (filter->fo_sec_level) {
1431 case LUSTRE_SEC_NONE:
1433 ost_init_sec_none(reply, exp);
1436 CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
1437 "can not run under security level %d.\n",
1438 client, obd->obd_name, filter->fo_sec_level);
1441 case LUSTRE_SEC_REMOTE:
1443 ost_init_sec_none(reply, exp);
1445 case LUSTRE_SEC_ALL:
1447 reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
1448 OBD_CONNECT_RMT_CLIENT_FORCE);
1449 if (!filter->fo_fl_oss_capa)
1450 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
/* publish the final flags to the export under its lock */
1452 cfs_spin_lock(&exp->exp_lock);
1453 exp->exp_connect_flags = reply->ocd_connect_flags;
1454 cfs_spin_unlock(&exp->exp_lock);
1466 * this should be done in filter_connect()/filter_reconnect(), but
1467 * we can't obtain information like NID, which is stored in the incoming
1468 * request, thus can't decide what flavor to use. so we do it here.
1470 * This hack should be removed after the OST stack be rewritten, just
1471 * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
/*
 * Select and verify the sptlrpc flavor for a connecting export.  Echo
 * targets accept any flavor; otherwise, on first use the flavor is chosen
 * from the filter's rule set (under fo_sptlrpc_lock) and cached in the
 * export, and subsequent RPCs must match both the cached flavor and the
 * original peer part.  See the comment above: this belongs in
 * filter_(re)connect() but the NID is only available here.
 * NOTE(review): some original lines are elided in this excerpt.
 */
1473 static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
1475 struct obd_export *exp = req->rq_export;
1476 struct filter_obd *filter = &exp->exp_obd->u.filter;
1477 struct sptlrpc_flavor flvr;
/* echo client: no sptlrpc enforcement */
1480 if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
1481 LUSTRE_ECHO_NAME) == 0)) {
1482 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
/* first RPC on this export: choose and cache a flavor */
1486 if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
1487 cfs_read_lock(&filter->fo_sptlrpc_lock);
1488 sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
1492 cfs_read_unlock(&filter->fo_sptlrpc_lock);
1494 cfs_spin_lock(&exp->exp_lock);
1496 exp->exp_sp_peer = req->rq_sp_from;
1497 exp->exp_flvr = flvr;
1499 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
1500 exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1501 CERROR("unauthorized rpc flavor %x from %s, "
1502 "expect %x\n", req->rq_flvr.sf_rpc,
1503 libcfs_nid2str(req->rq_peer.nid),
1504 exp->exp_flvr.sf_rpc);
1508 cfs_spin_unlock(&exp->exp_lock);
/* the peer part must stay consistent across reconnects */
1510 if (exp->exp_sp_peer != req->rq_sp_from) {
1511 CERROR("RPC source %s doesn't match %s\n",
1512 sptlrpc_part2name(req->rq_sp_from),
1513 sptlrpc_part2name(exp->exp_sp_peer));
1516 rc = sptlrpc_target_export_check(exp, req);
1523 /* Ensure that data and metadata are synced to the disk when lock is cancelled
/*
 * Blocking AST for OST-side LDLM locks.  When a PW/GROUP lock is being
 * cancelled and the sync-on-cancel policy (queried per-export via
 * KEY_SYNC_LOCK_CANCEL) requires it, sync the lock's extent to disk
 * before delegating to the generic ldlm_server_blocking_ast().
 * NOTE(review): some original lines are elided in this excerpt (e.g. the
 * oinfo/oa wiring between allocation and the obd_sync() call).
 */
1525 int ost_blocking_ast(struct ldlm_lock *lock,
1526 struct ldlm_lock_desc *desc,
1527 void *data, int flag)
1529 __u32 sync_lock_cancel = 0;
1530 __u32 len = sizeof(sync_lock_cancel);
1534 rc = obd_get_info(NULL, lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
1535 KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
/* sync only write-capable locks, and only per the configured policy */
1537 if (!rc && flag == LDLM_CB_CANCELING &&
1538 (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1539 (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1540 (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1541 lock->l_flags & LDLM_FL_CBPENDING))) {
1542 struct obd_info *oinfo;
1546 OBD_ALLOC_PTR(oinfo);
1551 OBD_FREE_PTR(oinfo);
/* identify the object by the lock's resource name (id/seq) */
1554 oa->o_id = lock->l_resource->lr_name.name[0];
1555 oa->o_seq = lock->l_resource->lr_name.name[1];
1556 oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
1559 rc = obd_sync(NULL, lock->l_export, oinfo,
1560 lock->l_policy_data.l_extent.start,
1561 lock->l_policy_data.l_extent.end, NULL);
1563 CERROR("Error %d syncing data on lock cancel\n", rc);
1566 OBD_FREE_PTR(oinfo);
1569 rc = ldlm_server_blocking_ast(lock, desc, data, flag);
/*
 * While the OBD is recovering, decide whether a request may be queued
 * for recovery processing (*process set by target_queue_recovery_request)
 * or must be rejected with a warning.
 * NOTE(review): intervening case labels are elided in this excerpt.
 */
1573 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1574 struct obd_device *obd, int *process)
1576 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1577 case OST_CONNECT: /* This will never get here, but for completeness. */
1578 case OST_DISCONNECT:
1589 case OBD_LOG_CANCEL:
1591 *process = target_queue_recovery_request(req, obd);
1595 DEBUG_REQ(D_WARNING, req, "not permitted during recovery");
/*
 * Verify the protocol version of an incoming message against the version
 * family its opcode belongs to: OBD (connect/disconnect/sec-ctx), OST
 * (I/O and quota ops), DLM (lock callbacks) or LOG (llog ops).  Unknown
 * opcodes are reported as errors.
 * NOTE(review): several case labels are elided in this excerpt.
 */
1601 int ost_msg_check_version(struct lustre_msg *msg)
1605 switch(lustre_msg_get_opc(msg)) {
1607 case OST_DISCONNECT:
1610 case SEC_CTX_INIT_CONT:
1612 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1614 CERROR("bad opc %u version %08x, expecting %08x\n",
1615 lustre_msg_get_opc(msg),
1616 lustre_msg_get_version(msg),
1617 LUSTRE_OBD_VERSION);
1630 #ifdef HAVE_QUOTA_SUPPORT
1631 case OST_QUOTACHECK:
1633 case OST_QUOTA_ADJUST_QUNIT:
1635 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1637 CERROR("bad opc %u version %08x, expecting %08x\n",
1638 lustre_msg_get_opc(msg),
1639 lustre_msg_get_version(msg),
1640 LUSTRE_OST_VERSION);
1645 case LDLM_BL_CALLBACK:
1646 case LDLM_CP_CALLBACK:
1647 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1649 CERROR("bad opc %u version %08x, expecting %08x\n",
1650 lustre_msg_get_opc(msg),
1651 lustre_msg_get_version(msg),
1652 LUSTRE_DLM_VERSION);
1654 case LLOG_ORIGIN_CONNECT:
1655 case OBD_LOG_CANCEL:
1656 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1658 CERROR("bad opc %u version %08x, expecting %08x\n",
1659 lustre_msg_get_opc(msg),
1660 lustre_msg_get_version(msg),
1661 LUSTRE_LOG_VERSION);
1664 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
/*
 * Context passed through the lock-prolonging helpers: identifies the
 * request, export, object (resource id + extent + lock mode) and counts
 * how many locks were refreshed in opd_locks.
 * NOTE(review): an opd_timeout field is referenced by the helpers below
 * but its declaration is elided from this excerpt.
 */
1670 struct ost_prolong_data {
1671 struct ptlrpc_request *opd_req;
1672 struct obd_export *opd_exp;
1673 struct obdo *opd_oa;
1674 struct ldlm_res_id opd_resid;
1675 struct ldlm_extent opd_extent;
1676 ldlm_mode_t opd_mode;
1677 unsigned int opd_locks;
1681 /* prolong locks for the current service time of the corresponding
1682 * portal (= OST_IO_PORTAL)
/*
 * Timeout used when refreshing a lock's eviction timer: half of
 * obd_timeout, or (visible below) the adaptive-timeout estimate for the
 * service, floored at ldlm_timeout.
 */
1684 static inline int prolong_timeout(struct ptlrpc_request *req)
1686 struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
1689 return obd_timeout / 2;
1691 return max(at_est2timeout(at_get(&svc->srv_at_estimate)), ldlm_timeout);
/*
 * Refresh the eviction timer of a single lock that the current I/O may
 * be holding.  Skips destroyed locks and locks whose cancel AST has not
 * been sent.  Called with exp_bl_list_lock held (see XXX below), so the
 * resource lock must not be taken here.
 */
1694 static void ost_prolong_lock_one(struct ost_prolong_data *opd,
1695 struct ldlm_lock *lock)
1697 LASSERT(lock->l_export == opd->opd_exp);
1699 if (lock->l_destroyed) /* lock already cancelled */
1702 /* XXX: never try to grab resource lock here because we're inside
1703 * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
1704 * res lock and then exp_bl_list_lock. */
1706 if (!(lock->l_flags & LDLM_FL_AST_SENT))
1707 /* ignore locks not being cancelled */
1711 "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
1712 opd->opd_req->rq_xid, opd->opd_extent.start,
1713 opd->opd_extent.end, opd->opd_timeout);
1715 /* OK. this is a possible lock the user holds doing I/O
1716 * let's refresh eviction timer for it */
1717 ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
/*
 * Prolong every lock on the export that covers the I/O described by
 * *data.  Fast path: if the client sent a lock handle and that PW lock
 * fully contains the extent, refresh just that one.  Otherwise walk the
 * export's blocking-lock list and refresh each lock on the same resource
 * whose extent overlaps the I/O.
 */
1721 static void ost_prolong_locks(struct ost_prolong_data *data)
1723 struct obd_export *exp = data->opd_exp;
1724 struct obdo *oa = data->opd_oa;
1725 struct ldlm_lock *lock;
1728 if (oa->o_valid & OBD_MD_FLHANDLE) {
1729 /* mostly a request should be covered by only one lock, try
1731 lock = ldlm_handle2lock(&oa->o_handle);
1733 /* Fast path to check if the lock covers the whole IO
1734 * region exclusively. */
1735 if (lock->l_granted_mode == LCK_PW &&
1736 ldlm_extent_contain(&lock->l_policy_data.l_extent,
1737 &data->opd_extent)) {
1739 ost_prolong_lock_one(data, lock);
1740 LDLM_LOCK_PUT(lock);
1743 LDLM_LOCK_PUT(lock);
/* slow path: scan every lock being cancelled on this export */
1748 cfs_spin_lock_bh(&exp->exp_bl_list_lock);
1749 cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
1750 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1751 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
1753 if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
1756 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
1760 ost_prolong_lock_one(data, lock);
1762 cfs_spin_unlock_bh(&exp->exp_bl_list_lock);
1768 * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
/*
 * Match an OST_READ/OST_WRITE request against a lock: same resource name
 * (derived from the ioobj id/seq) and overlapping extent (spanning from
 * the first to the last remote niobuf), with a compatible lock mode.
 */
1771 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1772 struct ldlm_lock *lock)
1774 struct niobuf_remote *nb;
1775 struct obd_ioobj *ioo;
1777 struct ldlm_extent ext;
1780 opc = lustre_msg_get_opc(req->rq_reqmsg);
1781 LASSERT(opc == OST_READ || opc == OST_WRITE);
1783 ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1784 LASSERT(ioo != NULL);
1786 nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1787 LASSERT(nb != NULL);
/* extent spans from the first niobuf's offset to the last one's end */
1789 ext.start = nb->offset;
1790 nb += ioo->ioo_bufcnt - 1;
1791 ext.end = nb->offset + nb->len - 1;
1793 LASSERT(lock->l_resource != NULL);
1794 if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq,
1795 &lock->l_resource->lr_name))
1799 if (opc == OST_READ)
1801 if (!(lock->l_granted_mode & mode))
1804 RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
1808 * High-priority queue request check for whether the given PTLRPC request (\a
1809 * req) is blocking an LDLM lock cancel.
1811 * Returns 1 if the given PTLRPC request (\a req) is blocking an LDLM lock
1812 * cancel, 0 if it is not, and -EFAULT if the request is malformed.
1814 * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This
1815 * function looks only at OST_READs and OST_WRITEs.
1817 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1819 struct obd_device *obd = req->rq_export->exp_obd;
1820 struct ost_body *body;
1821 struct obd_ioobj *ioo;
1822 struct niobuf_remote *nb;
1823 struct ost_prolong_data opd = { 0 };
1828 * Use LASSERT to do sanity check because malformed RPCs should have
1829 * been filtered out in ost_hpreq_handler().
1831 opc = lustre_msg_get_opc(req->rq_reqmsg);
1832 LASSERT(opc == OST_READ || opc == OST_WRITE);
1834 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1835 LASSERT(body != NULL);
1837 ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1838 LASSERT(ioo != NULL);
1840 nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1841 LASSERT(nb != NULL);
1842 LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
/* build the prolong context: resource, mode, extent, timeout */
1844 osc_build_res_name(ioo->ioo_id, ioo->ioo_seq, &opd.opd_resid);
1848 if (opc == OST_READ)
1850 opd.opd_mode = mode;
1851 opd.opd_exp = req->rq_export;
1852 opd.opd_oa = &body->oa;
1853 opd.opd_extent.start = nb->offset;
1854 nb += ioo->ioo_bufcnt - 1;
1855 opd.opd_extent.end = nb->offset + nb->len - 1;
1856 opd.opd_timeout = prolong_timeout(req);
1858 DEBUG_REQ(D_RPCTRACE, req,
1859 "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1860 obd->obd_name, cfs_current()->comm,
1861 opd.opd_resid.name[0], opd.opd_resid.name[1],
1862 opd.opd_extent.start, opd.opd_extent.end);
1864 ost_prolong_locks(&opd);
1866 CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
1867 obd->obd_name, opd.opd_locks, req);
1869 RETURN(opd.opd_locks);
/* h-p queue fini hook: give covered rw locks one final timer refresh */
1872 static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
1874 (void)ost_rw_hpreq_check(req);
1878 * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
/* matches purely by lock handle cookie, when the client supplied one */
1880 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1881 struct ldlm_lock *lock)
1883 struct ost_body *body;
1886 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1887 LASSERT(body != NULL);
1889 if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1890 body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1897 * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
/*
 * Refresh PW locks covering the punch range.  The extent end is computed
 * from o_blocks, with OBD_OBJECT_EOF meaning "to end of object".
 * Returns non-zero iff any lock was refreshed.
 */
1899 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1901 struct obd_device *obd = req->rq_export->exp_obd;
1902 struct ost_body *body;
1904 struct ost_prolong_data opd = { 0 };
1908 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1909 LASSERT(body != NULL);
/* server-side-lock punches must not reach the h-p queue */
1912 LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
1913 !(oa->o_flags & OBD_FL_SRVLOCK));
1916 end = start + oa->o_blocks;
1919 opd.opd_mode = LCK_PW;
1920 opd.opd_exp = req->rq_export;
1922 opd.opd_extent.start = start;
1923 opd.opd_extent.end = end;
1924 if (oa->o_blocks == OBD_OBJECT_EOF)
1925 opd.opd_extent.end = OBD_OBJECT_EOF;
1926 opd.opd_timeout = prolong_timeout(req);
1928 osc_build_res_name(oa->o_id, oa->o_seq, &opd.opd_resid);
1931 "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1933 opd.opd_resid.name[0], opd.opd_resid.name[1],
1934 opd.opd_extent.start, opd.opd_extent.end);
1936 ost_prolong_locks(&opd);
1938 CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
1939 obd->obd_name, opd.opd_locks, req);
1941 RETURN(opd.opd_locks > 0);
/* h-p queue fini hook: give covered punch locks one final timer refresh */
1944 static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
1946 (void)ost_punch_hpreq_check(req);
/* high-priority request ops for OST_READ/OST_WRITE RPCs */
1949 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1950 .hpreq_lock_match = ost_rw_hpreq_lock_match,
1951 .hpreq_check = ost_rw_hpreq_check,
1952 .hpreq_fini = ost_rw_hpreq_fini
/* high-priority request ops for OST_PUNCH RPCs */
1955 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1956 .hpreq_lock_match = ost_punch_hpreq_lock_match,
1957 .hpreq_check = ost_punch_hpreq_check,
1958 .hpreq_fini = ost_punch_hpreq_fini
1961 /** Assign high priority operations to the request if needed. */
/*
 * Inspect an incoming request before normal handling and, for
 * OST_READ/OST_WRITE/OST_PUNCH that do not use server-side locking,
 * attach the matching hpreq ops.  Validates the request buffers here
 * (the later LASSERTs in the check functions rely on this filtering).
 * NOTE(review): some original lines (error returns, closing braces) are
 * elided in this excerpt.
 */
1962 static int ost_hpreq_handler(struct ptlrpc_request *req)
1965 if (req->rq_export) {
1966 int opc = lustre_msg_get_opc(req->rq_reqmsg);
1967 struct ost_body *body;
1969 if (opc == OST_READ || opc == OST_WRITE) {
1970 struct niobuf_remote *nb;
1971 struct obd_ioobj *ioo;
1972 int objcount, niocount;
1976 /* RPCs on the H-P queue can be inspected before
1977 * ost_handler() initializes their pills, so we
1978 * initialize that here. Capsule initialization is
1979 * idempotent, as is setting the pill's format (provided
1980 * it doesn't change).
1982 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1983 if (opc == OST_READ)
1984 req_capsule_set(&req->rq_pill,
1987 req_capsule_set(&req->rq_pill,
1988 &RQF_OST_BRW_WRITE);
1990 body = req_capsule_client_get(&req->rq_pill,
1993 CERROR("Missing/short ost_body\n");
1997 objcount = req_capsule_get_size(&req->rq_pill,
2001 if (objcount == 0) {
2002 CERROR("Missing/short ioobj\n");
2006 CERROR("too many ioobjs (%d)\n", objcount);
2010 ioo = req_capsule_client_get(&req->rq_pill,
2013 CERROR("Missing/short ioobj\n");
2017 rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
2019 CERROR("invalid object ids\n");
/* total niobuf count across all ioobjs, each must be non-empty */
2023 for (niocount = i = 0; i < objcount; i++) {
2024 if (ioo[i].ioo_bufcnt == 0) {
2025 CERROR("ioo[%d] has zero bufcnt\n", i);
2028 niocount += ioo[i].ioo_bufcnt;
2030 if (niocount > PTLRPC_MAX_BRW_PAGES) {
2031 DEBUG_REQ(D_RPCTRACE, req,
2032 "bulk has too many pages (%d)",
2037 nb = req_capsule_client_get(&req->rq_pill,
2038 &RMF_NIOBUF_REMOTE);
2040 CERROR("Missing/short niobuf\n");
/* only client-locked I/O gets h-p treatment */
2044 if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
2045 req->rq_ops = &ost_hpreq_rw;
2046 } else if (opc == OST_PUNCH) {
2047 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2048 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2050 body = req_capsule_client_get(&req->rq_pill,
2053 CERROR("Missing/short ost_body\n");
2057 if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
2058 !(body->oa.o_flags & OBD_FL_SRVLOCK))
2059 req->rq_ops = &ost_hpreq_punch;
2065 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
/*
 * Main OST request dispatcher: validates the export and protocol version,
 * filters requests during recovery, then switches on the opcode to the
 * per-operation handler.  Most handlers reply via target_send_reply() at
 * the end; OST_READ/OST_WRITE and the llog/log-cancel cases send their
 * own replies.
 * NOTE(review): many original lines (case labels, GOTOs, braces) are
 * elided in this excerpt; comments describe only the visible logic.
 */
2066 int ost_handle(struct ptlrpc_request *req)
2068 struct obd_trans_info trans_info = { 0, };
2069 struct obd_trans_info *oti = &trans_info;
2070 int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
2071 struct obd_device *obd = NULL;
2074 /* OST module is kept between remounts, but the last reference
2075 * to specific module (say, osd or ofd) kills all related keys
2076 * from the environment. so we have to refill it until the root
2077 * cause is fixed properly */
2078 lu_env_refill(req->rq_svc_thread->t_env);
2080 LASSERT(current->journal_info == NULL);
2082 /* primordial rpcs don't affect server recovery */
2083 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2085 case SEC_CTX_INIT_CONT:
2090 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
/* everything except OST_CONNECT requires a connected export */
2092 if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
2093 if (!class_connected_export(req->rq_export)) {
2094 CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
2095 lustre_msg_get_opc(req->rq_reqmsg),
2096 libcfs_id2str(req->rq_peer));
2097 req->rq_status = -ENOTCONN;
2098 GOTO(out, rc = -ENOTCONN);
2101 obd = req->rq_export->exp_obd;
2103 /* Check for aborted recovery. */
2104 if (obd->obd_recovering) {
2105 rc = ost_filter_recovery_request(req, obd,
2107 if (rc || !should_process)
2109 else if (should_process < 0) {
2110 req->rq_status = should_process;
2111 rc = ptlrpc_error(req);
2119 rc = ost_msg_check_version(req->rq_reqmsg);
/* propagate the client's jobid into the transaction info if offered */
2123 if (req && req->rq_reqmsg && req->rq_export &&
2124 (req->rq_export->exp_connect_flags & OBD_CONNECT_JOBSTATS))
2125 oti->oti_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
2127 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2129 CDEBUG(D_INODE, "connect\n");
2130 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
2131 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
2133 rc = target_handle_connect(req);
2134 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
2137 rc = ost_init_sec_level(req);
2139 rc = ost_connect_check_sptlrpc(req);
2143 case OST_DISCONNECT:
2144 CDEBUG(D_INODE, "disconnect\n");
2145 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
2146 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
2148 rc = target_handle_disconnect(req);
2151 CDEBUG(D_INODE, "create\n");
2152 req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
2153 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
2155 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2156 GOTO(out, rc = -EROFS);
2157 rc = ost_create(req->rq_export, req, oti);
2160 CDEBUG(D_INODE, "destroy\n");
2161 req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
2162 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
2164 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2165 GOTO(out, rc = -EROFS);
2166 rc = ost_destroy(req->rq_export, req, oti);
2169 CDEBUG(D_INODE, "getattr\n");
2170 req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
2171 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
2173 rc = ost_getattr(req->rq_export, req);
2176 CDEBUG(D_INODE, "setattr\n");
2177 req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
2178 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
2180 rc = ost_setattr(req->rq_export, req, oti);
2183 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
2184 CDEBUG(D_INODE, "write\n");
2185 /* req->rq_request_portal would be nice, if it was set */
/* bulk I/O is only accepted on the dedicated I/O portal */
2186 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2187 CERROR("%s: deny write request from %s to portal %u\n",
2188 req->rq_export->exp_obd->obd_name,
2189 obd_export_nid2str(req->rq_export),
2190 req->rq_rqbd->rqbd_service->srv_req_portal);
2191 GOTO(out, rc = -EPROTO);
2193 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2195 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2196 GOTO(out, rc = -ENOSPC);
2197 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2198 GOTO(out, rc = -EROFS);
2199 rc = ost_brw_write(req, oti);
2200 LASSERT(current->journal_info == NULL);
2201 /* ost_brw_write sends its own replies */
2204 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
2205 CDEBUG(D_INODE, "read\n");
2206 /* req->rq_request_portal would be nice, if it was set */
2207 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
2208 CERROR("%s: deny read request from %s to portal %u\n",
2209 req->rq_export->exp_obd->obd_name,
2210 obd_export_nid2str(req->rq_export),
2211 req->rq_rqbd->rqbd_service->srv_req_portal);
2212 GOTO(out, rc = -EPROTO);
2214 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2216 rc = ost_brw_read(req, oti);
2217 LASSERT(current->journal_info == NULL);
2218 /* ost_brw_read sends its own replies */
2221 CDEBUG(D_INODE, "punch\n");
2222 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2223 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
2225 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2226 GOTO(out, rc = -EROFS);
2227 rc = ost_punch(req->rq_export, req, oti);
2230 CDEBUG(D_INODE, "statfs\n");
2231 req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
2232 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
2234 rc = ost_statfs(req);
2237 CDEBUG(D_INODE, "sync\n");
2238 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
2239 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
2241 rc = ost_sync(req->rq_export, req, oti);
2244 DEBUG_REQ(D_INODE, req, "set_info");
2245 req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2246 rc = ost_set_info(req->rq_export, req);
2249 DEBUG_REQ(D_INODE, req, "get_info");
2250 req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
2251 rc = ost_get_info(req->rq_export, req);
2253 #ifdef HAVE_QUOTA_SUPPORT
2254 case OST_QUOTACHECK:
2255 CDEBUG(D_INODE, "quotacheck\n");
2256 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
2257 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
2259 rc = ost_handle_quotacheck(req);
2262 CDEBUG(D_INODE, "quotactl\n");
2263 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
2264 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
2266 rc = ost_handle_quotactl(req);
2268 case OST_QUOTA_ADJUST_QUNIT:
2269 CDEBUG(D_INODE, "quota_adjust_qunit\n");
2270 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
2271 rc = ost_handle_quota_adjust_qunit(req);
2275 DEBUG_REQ(D_INODE, req, "ping");
2276 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
2277 rc = target_handle_ping(req);
2279 /* FIXME - just reply status */
2280 case LLOG_ORIGIN_CONNECT:
2281 DEBUG_REQ(D_INODE, req, "log connect");
2282 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
2283 rc = ost_llog_handle_connect(req->rq_export, req);
2284 req->rq_status = rc;
2285 rc = req_capsule_server_pack(&req->rq_pill);
2288 RETURN(ptlrpc_reply(req));
2289 case OBD_LOG_CANCEL:
2290 CDEBUG(D_INODE, "log cancel\n");
2291 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2292 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2294 rc = llog_origin_handle_cancel(req);
2295 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2297 req->rq_status = rc;
2298 rc = req_capsule_server_pack(&req->rq_pill);
2301 RETURN(ptlrpc_reply(req));
2303 CDEBUG(D_INODE, "enqueue\n");
2304 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
2305 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
2307 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2309 ldlm_server_glimpse_ast);
2310 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
2313 CDEBUG(D_INODE, "convert\n");
2314 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2315 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
2317 rc = ldlm_handle_convert(req);
2320 CDEBUG(D_INODE, "cancel\n");
2321 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2322 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
2324 rc = ldlm_handle_cancel(req);
2326 case LDLM_BL_CALLBACK:
2327 case LDLM_CP_CALLBACK:
2328 CDEBUG(D_INODE, "callback\n");
2329 CERROR("callbacks should not happen on OST\n");
2332 CERROR("Unexpected opcode %d\n",
2333 lustre_msg_get_opc(req->rq_reqmsg));
2334 req->rq_status = -ENOTSUPP;
2335 rc = ptlrpc_error(req);
2339 LASSERT(current->journal_info == NULL);
2342 /* If we're DISCONNECTing, the export_data is already freed */
2343 if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
2344 target_committed_to_req(req);
2348 oti_to_request(oti, req);
2350 target_send_reply(req, rc, fail);
2353 EXPORT_SYMBOL(ost_handle);
2355 * free per-thread pool created by ost_thread_init().
/* also called from ost_thread_init() error paths, so t_data may be
 * partially initialized or NULL; clear it after freeing */
2357 static void ost_thread_done(struct ptlrpc_thread *thread)
2359 struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2364 LASSERT(thread != NULL);
2367 * be prepared to handle partially-initialized pools (because this is
2368 * called from ost_thread_init() for cleanup.
2370 tls = thread->t_data;
2373 thread->t_data = NULL;
2379 * initialize per-thread page pool (bug 5137).
/* allocate the thread-local cache and hang it off thread->t_data;
 * thread id must be within the configured OSS thread maximum */
2381 static int ost_thread_init(struct ptlrpc_thread *thread)
2383 struct ost_thread_local_cache *tls;
2387 LASSERT(thread != NULL);
2388 LASSERT(thread->t_data == NULL);
2389 LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
2394 thread->t_data = tls;
2398 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
2400 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
/*
 * OBD setup for the OSS: register the three ptlrpc services (general
 * "ost", "ost_create" on the create portal, and "ost_io" on the I/O
 * portal with per-thread page pools and the h-p request handler), then
 * start the ping evictor.  Thread counts come from module parameters
 * when set, otherwise are derived from CPU/memory.
 * NOTE(review): some original lines (error labels, RETURNs) are elided
 * in this excerpt.
 */
2401 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
2403 static struct ptlrpc_service_conf svc_conf;
2404 struct ost_obd *ost = &obd->u.ost;
2405 struct lprocfs_static_vars lvars;
2406 int oss_min_threads;
2407 int oss_max_threads;
2408 int oss_min_create_threads;
2409 int oss_max_create_threads;
2413 rc = cfs_cleanup_group_info();
2417 lprocfs_ost_init_vars(&lvars);
2418 lprocfs_obd_setup(obd, lvars.obd_vars);
2420 cfs_mutex_init(&ost->ost_health_mutex);
2422 if (oss_num_threads) {
2423 /* If oss_num_threads is set, it is the min and the max. */
2424 if (oss_num_threads > OSS_THREADS_MAX)
2425 oss_num_threads = OSS_THREADS_MAX;
2426 if (oss_num_threads < OSS_THREADS_MIN)
2427 oss_num_threads = OSS_THREADS_MIN;
2428 oss_max_threads = oss_min_threads = oss_num_threads;
2430 /* Base min threads on memory and cpus */
2432 cfs_num_online_cpus() * CFS_NUM_CACHEPAGES >>
2433 (27 - CFS_PAGE_SHIFT);
2434 if (oss_min_threads < OSS_THREADS_MIN)
2435 oss_min_threads = OSS_THREADS_MIN;
2436 /* Ensure a 4x range for dynamic threads */
2437 if (oss_min_threads > OSS_THREADS_MAX / 4)
2438 oss_min_threads = OSS_THREADS_MAX / 4;
2439 oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
/* service 1: general OST requests */
2442 svc_conf = (typeof(svc_conf)) {
2443 .psc_name = LUSTRE_OSS_NAME,
2444 .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
2446 .bc_nbufs = OST_NBUFS,
2447 .bc_buf_size = OST_BUFSIZE,
2448 .bc_req_max_size = OST_MAXREQSIZE,
2449 .bc_rep_max_size = OST_MAXREPSIZE,
2450 .bc_req_portal = OST_REQUEST_PORTAL,
2451 .bc_rep_portal = OSC_REPLY_PORTAL,
2454 .tc_thr_name = "ll_ost",
2455 .tc_nthrs_min = oss_min_threads,
2456 .tc_nthrs_max = oss_max_threads,
2457 .tc_ctx_tags = LCT_DT_THREAD,
2460 .so_req_handler = ost_handle,
2461 .so_req_printer = target_print_req,
2464 ost->ost_service = ptlrpc_register_service(&svc_conf,
2465 obd->obd_proc_entry);
2466 if (IS_ERR(ost->ost_service)) {
2467 rc = PTR_ERR(ost->ost_service);
2468 CERROR("failed to start service: %d\n", rc);
2469 GOTO(out_lprocfs, rc);
2472 if (oss_num_create_threads) {
2473 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
2474 oss_num_create_threads = OSS_MAX_CREATE_THREADS;
2475 if (oss_num_create_threads < OSS_MIN_CREATE_THREADS)
2476 oss_num_create_threads = OSS_MIN_CREATE_THREADS;
2477 oss_min_create_threads = oss_max_create_threads =
2478 oss_num_create_threads;
2480 oss_min_create_threads = OSS_MIN_CREATE_THREADS;
2481 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
/* service 2: object create/destroy on its own portal */
2484 memset(&svc_conf, 0, sizeof(svc_conf));
2485 svc_conf = (typeof(svc_conf)) {
2486 .psc_name = "ost_create",
2487 .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
2489 .bc_nbufs = OST_NBUFS,
2490 .bc_buf_size = OST_BUFSIZE,
2491 .bc_req_max_size = OST_MAXREQSIZE,
2492 .bc_rep_max_size = OST_MAXREPSIZE,
2493 .bc_req_portal = OST_CREATE_PORTAL,
2494 .bc_rep_portal = OSC_REPLY_PORTAL,
2497 .tc_thr_name = "ll_ost_create",
2498 .tc_nthrs_min = oss_min_create_threads,
2499 .tc_nthrs_max = oss_max_create_threads,
2500 .tc_ctx_tags = LCT_DT_THREAD,
2503 .so_req_handler = ost_handle,
2504 .so_req_printer = target_print_req,
2507 ost->ost_create_service = ptlrpc_register_service(&svc_conf,
2508 obd->obd_proc_entry);
2509 if (IS_ERR(ost->ost_create_service)) {
2510 rc = PTR_ERR(ost->ost_create_service);
2511 CERROR("failed to start OST create service: %d\n", rc);
2512 GOTO(out_service, rc);
/* service 3: bulk I/O, with per-thread page pools, CPU affinity and
 * the high-priority request handler */
2515 memset(&svc_conf, 0, sizeof(svc_conf));
2516 svc_conf = (typeof(svc_conf)) {
2517 .psc_name = "ost_io",
2518 .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
2520 .bc_nbufs = OST_NBUFS,
2521 .bc_buf_size = OST_BUFSIZE,
2522 .bc_req_max_size = OST_MAXREQSIZE,
2523 .bc_rep_max_size = OST_MAXREPSIZE,
2524 .bc_req_portal = OST_IO_PORTAL,
2525 .bc_rep_portal = OSC_REPLY_PORTAL,
2528 .tc_thr_name = "ll_ost_io",
2529 .tc_nthrs_min = oss_min_threads,
2530 .tc_nthrs_max = oss_max_threads,
2531 .tc_cpu_affinity = 1,
2532 .tc_ctx_tags = LCT_DT_THREAD,
2535 .so_thr_init = ost_thread_init,
2536 .so_thr_done = ost_thread_done,
2537 .so_req_handler = ost_handle,
2538 .so_hpreq_handler = ost_hpreq_handler,
2539 .so_req_printer = target_print_req,
2542 ost->ost_io_service = ptlrpc_register_service(&svc_conf,
2543 obd->obd_proc_entry);
2544 if (IS_ERR(ost->ost_io_service)) {
2545 rc = PTR_ERR(ost->ost_io_service);
2546 CERROR("failed to start OST I/O service: %d\n", rc);
2547 ost->ost_io_service = NULL;
2548 GOTO(out_create, rc);
2551 ping_evictor_start();
/* error unwinding: tear down services in reverse registration order */
2556 ptlrpc_unregister_service(ost->ost_create_service);
2557 ost->ost_create_service = NULL;
2559 ptlrpc_unregister_service(ost->ost_service);
2560 ost->ost_service = NULL;
2562 lprocfs_obd_cleanup(obd);
/*
 * ost_cleanup() - o_cleanup handler for the OSS obd type.
 *
 * Stops the ping evictor, then unregisters all three PTLRPC services
 * (ost, ost_create, ost_io) while holding ost_health_mutex so a
 * concurrent ost_health_check() never sees a half-torn-down service set,
 * and finally removes the obd's /proc entries.
 *
 * NOTE(review): the extraction dropped a few lines here (opening brace,
 * the rc declaration around line 2569, the tail of the comment opened on
 * line 2574, and the RETURN/closing brace) — see the pristine source.
 */
2566 static int ost_cleanup(struct obd_device *obd)
2568 struct ost_obd *ost = &obd->u.ost;
2572 ping_evictor_stop();
2574 /* there is no recovery for OST OBD, all recovery is controlled by
2576 LASSERT(obd->obd_recovering == 0);
2577 cfs_mutex_lock(&ost->ost_health_mutex);
2578 ptlrpc_unregister_service(ost->ost_service);
2579 ptlrpc_unregister_service(ost->ost_create_service);
2580 ptlrpc_unregister_service(ost->ost_io_service);
/* NULL the pointers before dropping the mutex so later callers cannot
 * reach the freed services (presumably ptlrpc_service_health_check
 * tolerates NULL — confirm in its definition). */
2581 ost->ost_service = NULL;
2582 ost->ost_create_service = NULL;
2583 ost->ost_io_service = NULL;
2585 cfs_mutex_unlock(&ost->ost_health_mutex);
2587 lprocfs_obd_cleanup(obd);
/*
 * ost_health_check() - o_health_check handler for the OSS obd type.
 *
 * ORs together the health status of the three OST PTLRPC services under
 * ost_health_mutex (serialized against ost_cleanup()'s teardown).
 *
 * NOTE(review): the extraction dropped the opening brace, the rc
 * declaration/initialization (~line 2595), the "/*" opener of the
 * trailing comment and the return statement — see the pristine source.
 */
2592 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
2594 struct ost_obd *ost = &obd->u.ost;
2597 cfs_mutex_lock(&ost->ost_health_mutex);
2598 rc |= ptlrpc_service_health_check(ost->ost_service);
2599 rc |= ptlrpc_service_health_check(ost->ost_create_service);
2600 rc |= ptlrpc_service_health_check(ost->ost_io_service);
2601 cfs_mutex_unlock(&ost->ost_health_mutex);
2604 * health_check to return 0 on healthy
2605 * and 1 on unhealthy.
/*
 * ost_tls() - fetch the per-thread ost_thread_local_cache for a request.
 *
 * Returns the cache stashed in the servicing thread's t_data slot —
 * presumably populated by ost_thread_init(), which is installed as the
 * ost_io service's so_thr_init hook above; confirm there.
 * (Extraction dropped the surrounding braces.)
 */
2613 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2615 return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
/*
 * Obd-operations table registered for the OSS device type: wires the
 * module owner plus the setup/cleanup/health-check entry points defined
 * above.  (The closing "};" line was dropped by the extraction.)
 */
2618 /* use obd ops to offer management infrastructure */
2619 static struct obd_ops ost_obd_ops = {
2620 .o_owner = THIS_MODULE,
2621 .o_setup = ost_setup,
2622 .o_cleanup = ost_cleanup,
2623 .o_health_check = ost_health_check,
/*
 * ost_init() - module entry point.
 *
 * Registers the LUSTRE_OSS_NAME obd type with ost_obd_ops and its
 * lprocfs variables, then maps the deprecated ost_num_threads module
 * parameter onto oss_num_threads (with a console warning) when only the
 * old name was set.
 *
 * NOTE(review): the extraction dropped the rc declaration, the closing
 * braces and the RETURN(rc) — see the pristine source for error
 * handling of class_register_type().
 */
2627 static int __init ost_init(void)
2629 struct lprocfs_static_vars lvars;
2633 lprocfs_ost_init_vars(&lvars);
2634 rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2635 LUSTRE_OSS_NAME, NULL);
/* Honour the old parameter name only if the new one was not given. */
2637 if (ost_num_threads != 0 && oss_num_threads == 0) {
2638 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2639 "use oss_num_threads instead or unset both for "
2640 "dynamic thread startup\n");
2641 oss_num_threads = ost_num_threads;
/*
 * ost_exit() - module exit point: unregister the OSS obd type.
 * (Extraction dropped the surrounding braces.)
 */
2647 static void /*__exit*/ ost_exit(void)
2649 class_unregister_type(LUSTRE_OSS_NAME);
/* Standard kernel-module metadata and init/exit hook registration. */
2652 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2653 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2654 MODULE_LICENSE("GPL");
2656 module_init(ost_init);
2657 module_exit(ost_exit);