4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_fid.h>
45 #include <lustre_intent.h>
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include <lustre_swab.h>
51 #include "mdc_internal.h"
/*
 * Per-request arguments of the asynchronous getattr intent.
 *
 * The structure lives in the request's async-args area: it is obtained with
 * ptlrpc_req_async_args() in mdc_intent_getattr_async() and read back in
 * mdc_intent_getattr_async_interpret().
 *
 * ga_exp   - export used to complete the enqueue in the interpreter
 * ga_minfo - enqueue info holding the intent, lock handle and callback
 */
53 struct mdc_getattr_args {
54 struct obd_export *ga_exp;
55 struct md_enqueue_info *ga_minfo;
/*
 * Evaluate the disposition bits recorded on the intent against a phase.
 *
 * The stages are tested from the latest to the earliest: DISP_OPEN_LEASE,
 * DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD and DISP_IT_EXECD.
 * For each stage that is set on the intent, phase is compared with that
 * stage.  The trailing CERROR prints the raw disposition and status.
 *
 * NOTE(review): presumably returns it->it_status once phase has reached an
 * executed stage and 0 otherwise, with the CERROR reached only when no
 * stage is set -- confirm against the return statements.
 */
58 int it_open_error(int phase, struct lookup_intent *it)
60 if (it_disposition(it, DISP_OPEN_LEASE)) {
61 if (phase >= DISP_OPEN_LEASE)
66 if (it_disposition(it, DISP_OPEN_OPEN)) {
67 if (phase >= DISP_OPEN_OPEN)
73 if (it_disposition(it, DISP_OPEN_CREATE)) {
74 if (phase >= DISP_OPEN_CREATE)
80 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
81 if (phase >= DISP_LOOKUP_EXECD)
87 if (it_disposition(it, DISP_IT_EXECD)) {
88 if (phase >= DISP_IT_EXECD)
94 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
99 EXPORT_SYMBOL(it_open_error);
/*
 * Attach an inode to the resource of the lock identified by lockh.
 *
 * data is interpreted as a struct inode pointer and stored in the
 * resource's lr_lvb_inode while the resource lock is held.  If the resource
 * already points at a different inode, that inode is asserted to have
 * I_FREEING set in i_state.  The lock's inodebits are written to *bits.
 *
 * The handle is first checked with lustre_handle_is_used(); a used handle is
 * asserted to resolve to a lock.
 */
101 /* this must be called on a lockh that is known to have a referenced lock */
102 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
103 void *data, __u64 *bits)
105 struct ldlm_lock *lock;
106 struct inode *new_inode = data;
112 if (!lustre_handle_is_used(lockh))
115 lock = ldlm_handle2lock(lockh);
117 LASSERT(lock != NULL);
118 lock_res_and_lock(lock);
119 if (lock->l_resource->lr_lvb_inode &&
120 lock->l_resource->lr_lvb_inode != data) {
121 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
122 LASSERTF(old_inode->i_state & I_FREEING,
123 "Found existing inode %p/%lu/%u state %lu in lock: "
124 "setting data to %p/%lu/%u\n", old_inode,
125 old_inode->i_ino, old_inode->i_generation,
127 new_inode, new_inode->i_ino, new_inode->i_generation);
129 lock->l_resource->lr_lvb_inode = new_inode;
131 *bits = lock->l_policy_data.l_inodebits.bits;
133 unlock_res_and_lock(lock);
/*
 * Look for an already held lock on the resource named after fid.
 *
 * The resource name is built from fid, the requested inodebits in policy
 * are masked in place with the bits reported by exp_connect_ibits(), and the
 * search is delegated to ldlm_lock_match() on the namespace of the export's
 * obd device.  Note that policy is modified by this call.
 */
139 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
140 const struct lu_fid *fid, enum ldlm_type type,
141 union ldlm_policy_data *policy,
142 enum ldlm_mode mode, struct lustre_handle *lockh)
144 struct ldlm_res_id res_id;
148 fid_build_reg_res_name(fid, &res_id);
149 /* LU-4405: Clear bits not supported by server */
150 policy->l_inodebits.bits &= exp_connect_ibits(exp);
151 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
152 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named after fid.
 *
 * Builds the resource name from fid and forwards policy, mode, flags and
 * opaque to ldlm_cli_cancel_unused_resource() on the namespace of the
 * export's obd device.
 */
156 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
157 union ldlm_policy_data *policy, enum ldlm_mode mode,
158 enum ldlm_cancel_flags flags, void *opaque)
160 struct obd_device *obd = class_exp2obd(exp);
161 struct ldlm_res_id res_id;
166 fid_build_reg_res_name(fid, &res_id);
167 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168 policy, mode, flags, opaque);
/*
 * Detach the inode from the resource named after fid.
 *
 * The namespace of the export's obd device is asserted to exist, the
 * resource is looked up with ldlm_resource_get(), its lr_lvb_inode pointer
 * is reset to NULL and the reference taken by the lookup is dropped again.
 */
172 int mdc_null_inode(struct obd_export *exp,
173 const struct lu_fid *fid)
175 struct ldlm_res_id res_id;
176 struct ldlm_resource *res;
177 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
180 LASSERTF(ns != NULL, "no namespace passed\n");
182 fid_build_reg_res_name(fid, &res_id);
184 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
189 res->lr_lvb_inode = NULL;
192 ldlm_resource_putref(res);
/*
 * Stop holding a request for replay.
 *
 * When req->rq_replay is set, the request's rq_lock is taken
 * (NOTE(review): presumably to clear rq_replay under the lock -- confirm).
 * A non-zero rc combined with a non-zero rq_transno is reported through
 * DEBUG_REQ(D_ERROR).
 */
196 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
198 /* Don't hold error requests for replay. */
199 if (req->rq_replay) {
200 spin_lock(&req->rq_lock);
202 spin_unlock(&req->rq_lock);
204 if (rc && req->rq_transno != 0) {
205 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/*
 * Grow the open request so that it can hold the EA returned in body.
 *
 * Segment DLM_INTENT_REC_OFF + 4 of the request is enlarged to
 * body->mbo_eadatasize with sptlrpc_cli_enlarge_reqbuf().  The failure
 * handling logs the segment and size, clears OBD_MD_FLEASIZE from
 * body->mbo_valid and zeroes body->mbo_eadatasize.
 */
210 /* Save a large LOV EA into the request buffer so that it is available
211 * for replay. We don't do this in the initial request because the
212 * original request doesn't need this buffer (at most it sends just the
213 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
214 * buffer and may also be difficult to allocate and save a very large
215 * request buffer for each open. (bug 5707)
217 * OOM here may cause recovery failure if lmm is needed (only for the
218 * original open if the MDS crashed just when this client also OOM'd)
219 * but this is incredibly unlikely, and questionable whether the client
220 * could do MDS recovery under OOM anyways... */
221 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
222 struct mdt_body *body)
226 /* FIXME: remove this explicit offset. */
227 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
228 body->mbo_eadatasize);
230 CERROR("Can't enlarge segment %d size to %d\n",
231 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
232 body->mbo_valid &= ~OBD_MD_FLEASIZE;
233 body->mbo_eadatasize = 0;
/*
 * Build the LDLM_INTENT_OPEN request for an open intent.
 *
 * Steps visible in the body:
 *  - it->it_create_mode is forced to describe a regular file (S_IFREG);
 *  - if op_fid2 is sane, unused locks on it are collected with
 *    mdc_resource_get_unused(), the mode depending on the lease, write,
 *    truncate and exec flags of the intent;
 *  - for IT_CREAT the parent's (op_fid1) UPDATE lock is collected as well;
 *  - the request is allocated; on allocation failure the collected cancel
 *    list is released and ERR_PTR(-ENOMEM) is returned;
 *  - client-side sizes are set for the name, the EA (the larger of the
 *    caller's lmm size and cl_default_mds_easize) and the security context
 *    name and value;
 *  - ldlm_prep_enqueue_req() packs the early cancels; the request is freed
 *    if it fails;
 *  - rq_replay is taken from the import's imp_replayable under rq_lock;
 *  - the intent opcode and the open body are packed, the server-side
 *    RMF_MDT_MD size is set to cl_max_mds_easize and the reply length is
 *    computed.
 */
237 static struct ptlrpc_request *
238 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
239 struct md_op_data *op_data)
241 struct ptlrpc_request *req;
242 struct obd_device *obddev = class_exp2obd(exp);
243 struct ldlm_intent *lit;
244 const void *lmm = op_data->op_data;
245 __u32 lmmsize = op_data->op_data_size;
246 struct list_head cancels = LIST_HEAD_INIT(cancels);
252 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
254 /* XXX: openlock is not cancelled for cross-refs. */
255 /* If inode is known, cancel conflicting OPEN locks. */
256 if (fid_is_sane(&op_data->op_fid2)) {
257 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
258 if (it->it_flags & FMODE_WRITE)
263 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
266 else if (it->it_flags & FMODE_EXEC)
272 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
277 /* If CREATE, cancel parent's UPDATE lock. */
278 if (it->it_op & IT_CREAT)
282 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
284 MDS_INODELOCK_UPDATE);
286 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
287 &RQF_LDLM_INTENT_OPEN);
289 ldlm_lock_list_put(&cancels, l_bl_ast, count);
290 RETURN(ERR_PTR(-ENOMEM));
293 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
294 op_data->op_namelen + 1);
295 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
296 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
298 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
299 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
300 strlen(op_data->op_file_secctx_name) + 1 : 0);
302 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
303 op_data->op_file_secctx_size);
305 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
307 ptlrpc_request_free(req);
311 spin_lock(&req->rq_lock);
312 req->rq_replay = req->rq_import->imp_replayable;
313 spin_unlock(&req->rq_lock);
315 /* pack the intent */
316 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
317 lit->opc = (__u64)it->it_op;
319 /* pack the intended request */
320 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
323 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
324 obddev->u.cli.cl_max_mds_easize);
325 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM_INTENT_GETXATTR request.
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), prepares the enqueue
 * with the local cancel list, sets the intent opcode to IT_GETXATTR and
 * packs the body for op_fid1.  The reply buffers RMF_EADATA, RMF_EAVALS and
 * RMF_EAVALS_LENS are all sized to the import's ocd_max_easize before the
 * reply length is computed.
 */
329 static struct ptlrpc_request *
330 mdc_intent_getxattr_pack(struct obd_export *exp,
331 struct lookup_intent *it,
332 struct md_op_data *op_data)
334 struct ptlrpc_request *req;
335 struct ldlm_intent *lit;
338 struct list_head cancels = LIST_HEAD_INIT(cancels);
342 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
343 &RQF_LDLM_INTENT_GETXATTR);
345 RETURN(ERR_PTR(-ENOMEM));
347 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
349 ptlrpc_request_free(req);
353 /* pack the intent */
354 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
355 lit->opc = IT_GETXATTR;
357 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
359 /* pack the intended request */
360 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
363 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
364 RCL_SERVER, maxdata);
366 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
367 RCL_SERVER, maxdata);
369 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
370 RCL_SERVER, maxdata);
372 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM_INTENT_UNLINK request.
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), sizes the name
 * buffer to op_namelen + 1, prepares the enqueue without early cancels,
 * packs the intent opcode and the unlink body, sets the server-side
 * RMF_MDT_MD size to cl_default_mds_easize and computes the reply length.
 */
377 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
378 struct lookup_intent *it,
379 struct md_op_data *op_data)
381 struct ptlrpc_request *req;
382 struct obd_device *obddev = class_exp2obd(exp);
383 struct ldlm_intent *lit;
387 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388 &RQF_LDLM_INTENT_UNLINK);
390 RETURN(ERR_PTR(-ENOMEM));
392 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
393 op_data->op_namelen + 1);
395 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
397 ptlrpc_request_free(req);
401 /* pack the intent */
402 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403 lit->opc = (__u64)it->it_op;
405 /* pack the intended request */
406 mdc_unlink_pack(req, op_data);
408 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
409 obddev->u.cli.cl_default_mds_easize);
410 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM_INTENT_GETATTR request.
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), sizes the name
 * buffer to op_namelen + 1, prepares the enqueue without early cancels and
 * packs the intent opcode.  The getattr body asks for the attribute set in
 * the local valid mask (GETATTR, EA size, mode EA size, directory EA, MEA
 * and ACL).  The EA size used for the body and for the server-side
 * RMF_MDT_MD buffer is cl_default_mds_easize when that is positive,
 * cl_max_mds_easize otherwise.
 */
414 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
415 struct lookup_intent *it,
416 struct md_op_data *op_data)
418 struct ptlrpc_request *req;
419 struct obd_device *obddev = class_exp2obd(exp);
420 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
421 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
422 OBD_MD_MEA | OBD_MD_FLACL;
423 struct ldlm_intent *lit;
428 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
429 &RQF_LDLM_INTENT_GETATTR);
431 RETURN(ERR_PTR(-ENOMEM));
433 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
434 op_data->op_namelen + 1);
436 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
438 ptlrpc_request_free(req);
442 /* pack the intent */
443 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
444 lit->opc = (__u64)it->it_op;
446 if (obddev->u.cli.cl_default_mds_easize > 0)
447 easize = obddev->u.cli.cl_default_mds_easize;
449 easize = obddev->u.cli.cl_max_mds_easize;
451 /* pack the intended request */
452 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
454 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
455 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM_INTENT_LAYOUT request.
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), sets the client-side
 * RMF_EADATA size to 0, prepares the enqueue without early cancels, packs
 * the intent opcode and a layout intent with li_opc LAYOUT_INTENT_ACCESS,
 * sizes the server-side RMF_DLM_LVB buffer to cl_default_mds_easize and
 * computes the reply length.  The md_op_data argument is not used.
 */
459 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
460 struct lookup_intent *it,
461 struct md_op_data *unused)
463 struct obd_device *obd = class_exp2obd(exp);
464 struct ptlrpc_request *req;
465 struct ldlm_intent *lit;
466 struct layout_intent *layout;
470 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
471 &RQF_LDLM_INTENT_LAYOUT);
473 RETURN(ERR_PTR(-ENOMEM));
475 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
476 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
478 ptlrpc_request_free(req);
482 /* pack the intent */
483 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
484 lit->opc = (__u64)it->it_op;
486 /* pack the layout intent request */
487 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
488 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
489 * set for replication */
490 layout->li_opc = LAYOUT_INTENT_ACCESS;
492 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
493 obd->u.cli.cl_default_mds_easize);
494 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM_ENQUEUE request (no intent body).
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), prepares the enqueue
 * without early cancels, sizes the server-side RMF_DLM_LVB buffer to
 * lvb_len and computes the reply length.
 */
498 static struct ptlrpc_request *
499 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
501 struct ptlrpc_request *req;
505 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
507 RETURN(ERR_PTR(-ENOMEM));
509 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
511 ptlrpc_request_free(req);
515 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
516 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue and fill in the intent.
 *
 * Steps visible in the body, in order:
 *  - when the request has a transno or is flagged for replay,
 *    LDLM_FL_INTENT_ONLY is added to the lock flags stored in the request;
 *  - on ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise, if the
 *    lock's l_req_mode differs from einfo->ei_mode, the reference is moved
 *    to the new mode and einfo->ei_mode is updated;
 *  - the disposition and status from the DLM reply, the lock mode, the lock
 *    cookie and the request itself are stored in the intent;
 *  - the replay flag is cleared for a replayable request that has no
 *    transno or a negative status, and for an IT_OPEN that did not end in
 *    a successful open;
 *  - for intents with a reply body, the mdt_body is fetched, open replay
 *    data is set up for a successful open, and if the body carries EA data
 *    it is looked up in RMF_MDT_MD; for a replayable IT_OPEN the request's
 *    RMF_EADATA buffer is resized to the EA size (enlarged through
 *    mdc_realloc_openmsg() or shrunk);
 *  - for IT_LAYOUT the layout is taken from RMF_DLM_LVB instead;
 *  - finally, when the lock has layout bits, lvb data was found and the
 *    reply's lock flags have no LDLM_FL_BLOCKED_MASK bit set, a copy of
 *    the layout is installed as the lock's l_lvb_data (type LVB_T_LAYOUT)
 *    under the resource lock if the lock has none yet.
 */
520 static int mdc_finish_enqueue(struct obd_export *exp,
521 struct ptlrpc_request *req,
522 struct ldlm_enqueue_info *einfo,
523 struct lookup_intent *it,
524 struct lustre_handle *lockh,
527 struct req_capsule *pill = &req->rq_pill;
528 struct ldlm_request *lockreq;
529 struct ldlm_reply *lockrep;
530 struct ldlm_lock *lock;
531 void *lvb_data = NULL;
536 /* Similarly, if we're going to replay this request, we don't want to
537 * actually get a lock, just perform the intent. */
538 if (req->rq_transno || req->rq_replay) {
539 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
540 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
543 if (rc == ELDLM_LOCK_ABORTED) {
545 memset(lockh, 0, sizeof(*lockh));
547 } else { /* rc = 0 */
548 lock = ldlm_handle2lock(lockh);
549 LASSERT(lock != NULL);
551 /* If the server gave us back a different lock mode, we should
552 * fix up our variables. */
553 if (lock->l_req_mode != einfo->ei_mode) {
554 ldlm_lock_addref(lockh, lock->l_req_mode);
555 ldlm_lock_decref(lockh, einfo->ei_mode);
556 einfo->ei_mode = lock->l_req_mode;
561 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
562 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
564 it->it_disposition = (int)lockrep->lock_policy_res1;
565 it->it_status = (int)lockrep->lock_policy_res2;
566 it->it_lock_mode = einfo->ei_mode;
567 it->it_lock_handle = lockh->cookie;
568 it->it_request = req;
570 /* Technically speaking rq_transno must already be zero if
571 * it_status is in error, so the check is a bit redundant */
572 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
573 mdc_clear_replay_flag(req, it->it_status);
575 /* If we're doing an IT_OPEN which did not result in an actual
576 * successful open, then we need to remove the bit which saves
577 * this request for unconditional replay.
579 * It's important that we do this first! Otherwise we might exit the
580 * function without doing so, and try to replay a failed create
582 if (it->it_op & IT_OPEN && req->rq_replay &&
583 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
584 mdc_clear_replay_flag(req, it->it_status);
586 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
587 it->it_op, it->it_disposition, it->it_status);
589 /* We know what to expect, so we do any byte flipping required here */
590 if (it_has_reply_body(it)) {
591 struct mdt_body *body;
593 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
595 CERROR ("Can't swab mdt_body\n");
599 if (it_disposition(it, DISP_OPEN_OPEN) &&
600 !it_open_error(DISP_OPEN_OPEN, it)) {
602 * If this is a successful OPEN request, we need to set
603 * replay handler and data early, so that if replay
604 * happens immediately after swabbing below, new reply
605 * is swabbed by that handler correctly.
607 mdc_set_open_replay_data(NULL, NULL, it);
610 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
613 mdc_update_max_ea_from_body(exp, body);
616 * The eadata is opaque; just check that it is there.
617 * Eventually, obd_unpackmd() will check the contents.
619 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
620 body->mbo_eadatasize);
624 /* save lvb data and length in case this is for layout
627 lvb_len = body->mbo_eadatasize;
630 * We save the reply LOV EA in case we have to replay a
631 * create for recovery. If we didn't allocate a large
632 * enough request buffer above we need to reallocate it
633 * here to hold the actual LOV EA.
635 * To not save LOV EA if request is not going to replay
636 * (for example error one).
638 if ((it->it_op & IT_OPEN) && req->rq_replay) {
640 if (req_capsule_get_size(pill, &RMF_EADATA,
642 body->mbo_eadatasize)
643 mdc_realloc_openmsg(req, body);
645 req_capsule_shrink(pill, &RMF_EADATA,
646 body->mbo_eadatasize,
649 req_capsule_set_size(pill, &RMF_EADATA,
651 body->mbo_eadatasize);
653 lmm = req_capsule_client_get(pill, &RMF_EADATA);
656 body->mbo_eadatasize);
659 } else if (it->it_op & IT_LAYOUT) {
660 /* maybe the lock was granted right away and layout
661 * is packed into RMF_DLM_LVB of req */
662 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
664 lvb_data = req_capsule_server_sized_get(pill,
665 &RMF_DLM_LVB, lvb_len);
666 if (lvb_data == NULL)
671 /* fill in stripe data for layout lock.
672 * LU-6581: trust layout data only if layout lock is granted. The MDT
673 * has stopped sending layout unless the layout lock is granted. The
674 * client still does this checking in case it's talking with an old
675 * server. - Jinshan */
676 lock = ldlm_handle2lock(lockh);
677 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
678 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
681 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
682 ldlm_it2str(it->it_op), lvb_len);
684 OBD_ALLOC_LARGE(lmm, lvb_len);
689 memcpy(lmm, lvb_data, lvb_len);
691 /* install lvb_data */
692 lock_res_and_lock(lock);
693 if (lock->l_lvb_data == NULL) {
694 lock->l_lvb_type = LVB_T_LAYOUT;
695 lock->l_lvb_data = lmm;
696 lock->l_lvb_len = lvb_len;
699 unlock_res_and_lock(lock);
701 OBD_FREE_LARGE(lmm, lvb_len);
/*
 * Common enqueue path shared by intent locks and plain (non-intent) locks.
 *
 * For an intent, policy is asserted to be NULL on entry and is then chosen
 * from the intent operation: UPDATE bits for OPEN/UNLINK/GETATTR/READDIR,
 * LAYOUT bits for IT_LAYOUT, XATTR bits for GETXATTR/SETXATTR and LOOKUP
 * bits as the remaining choice.  The request is packed by the matching
 * mdc_intent_*_pack() helper (mdc_enqueue_pack() for IT_READDIR).  The
 * non-intent case is asserted to be an LDLM_FLOCK enqueue.
 *
 * A modify RPC slot and a request slot are obtained before
 * ldlm_cli_enqueue() and released afterwards; the existing comment notes
 * that flock requests are not limited this way.  The intent status in the
 * reply is converted with ptlrpc_status_ntoh(); a status of -EINPROGRESS
 * makes the request be dropped and the operation resent, subject to the
 * import generation check.  Otherwise the reply is passed to
 * mdc_finish_enqueue().
 *
 * NOTE(review): the cleanup following mdc_finish_enqueue() (lock decref,
 * request release, resetting the intent's lock handle, mode and request)
 * presumably runs only when it returns an error -- confirm.
 */
709 /* We always reserve enough space in the reply packet for a stripe MD, because
710 * we don't know in advance the file type. */
711 static int mdc_enqueue_base(struct obd_export *exp,
712 struct ldlm_enqueue_info *einfo,
713 const union ldlm_policy_data *policy,
714 struct lookup_intent *it,
715 struct md_op_data *op_data,
716 struct lustre_handle *lockh,
717 __u64 extra_lock_flags)
719 struct obd_device *obddev = class_exp2obd(exp);
720 struct ptlrpc_request *req = NULL;
721 __u64 flags, saved_flags = extra_lock_flags;
722 struct ldlm_res_id res_id;
723 static const union ldlm_policy_data lookup_policy = {
724 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
725 static const union ldlm_policy_data update_policy = {
726 .l_inodebits = { MDS_INODELOCK_UPDATE } };
727 static const union ldlm_policy_data layout_policy = {
728 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
729 static const union ldlm_policy_data getxattr_policy = {
730 .l_inodebits = { MDS_INODELOCK_XATTR } };
731 int generation, resends = 0;
732 struct ldlm_reply *lockrep;
733 enum lvb_type lvb_type = 0;
737 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
739 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
742 LASSERT(policy == NULL);
744 saved_flags |= LDLM_FL_HAS_INTENT;
745 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
746 policy = &update_policy;
747 else if (it->it_op & IT_LAYOUT)
748 policy = &layout_policy;
749 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
750 policy = &getxattr_policy;
752 policy = &lookup_policy;
755 generation = obddev->u.cli.cl_import->imp_generation;
759 /* The only way right now is FLOCK. */
760 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
762 res_id.name[3] = LDLM_FLOCK;
763 } else if (it->it_op & IT_OPEN) {
764 req = mdc_intent_open_pack(exp, it, op_data);
765 } else if (it->it_op & IT_UNLINK) {
766 req = mdc_intent_unlink_pack(exp, it, op_data);
767 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
768 req = mdc_intent_getattr_pack(exp, it, op_data);
769 } else if (it->it_op & IT_READDIR) {
770 req = mdc_enqueue_pack(exp, 0);
771 } else if (it->it_op & IT_LAYOUT) {
772 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
774 req = mdc_intent_layout_pack(exp, it, op_data);
775 lvb_type = LVB_T_LAYOUT;
776 } else if (it->it_op & IT_GETXATTR) {
777 req = mdc_intent_getxattr_pack(exp, it, op_data);
784 RETURN(PTR_ERR(req));
787 req->rq_generation_set = 1;
788 req->rq_import_generation = generation;
789 req->rq_sent = cfs_time_current_sec() + resends;
792 /* It is important to obtain modify RPC slot first (if applicable), so
793 * that threads that are waiting for a modify RPC slot are not polluting
794 * our rpcs in flight counter.
795 * We do not do flock request limiting, though */
797 mdc_get_mod_rpc_slot(req, it);
798 rc = obd_get_request_slot(&obddev->u.cli);
800 mdc_put_mod_rpc_slot(req, it);
801 mdc_clear_replay_flag(req, 0);
802 ptlrpc_req_finished(req);
807 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
808 0, lvb_type, lockh, 0);
810 /* For flock requests we immediatelly return without further
811 delay and let caller deal with the rest, since rest of
812 this function metadata processing makes no sense for flock
813 requests anyway. But in case of problem during comms with
814 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
815 can not rely on caller and this mainly for F_UNLCKs
816 (explicits or automatically generated by Kernel to clean
817 current FLocks upon exit) that can't be trashed */
818 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
819 (einfo->ei_type == LDLM_FLOCK) &&
820 (einfo->ei_mode == LCK_NL))
825 obd_put_request_slot(&obddev->u.cli);
826 mdc_put_mod_rpc_slot(req, it);
829 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
830 obddev->obd_name, rc);
832 mdc_clear_replay_flag(req, rc);
833 ptlrpc_req_finished(req);
837 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
838 LASSERT(lockrep != NULL);
840 lockrep->lock_policy_res2 =
841 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
843 /* Retry infinitely when the server returns -EINPROGRESS for the
844 * intent operation, when server returns -EINPROGRESS for acquiring
845 * intent lock, we'll retry in after_reply(). */
846 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
847 mdc_clear_replay_flag(req, rc);
848 ptlrpc_req_finished(req);
851 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
852 obddev->obd_name, resends, it->it_op,
853 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
855 if (generation == obddev->u.cli.cl_import->imp_generation) {
858 CDEBUG(D_HA, "resend cross eviction\n");
863 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
865 if (lustre_handle_is_used(lockh)) {
866 ldlm_lock_decref(lockh, einfo->ei_mode);
867 memset(lockh, 0, sizeof(*lockh));
869 ptlrpc_req_finished(req);
871 it->it_lock_handle = 0;
872 it->it_lock_mode = 0;
873 it->it_request = NULL;
/*
 * Enqueue a lock without an intent.
 *
 * Thin wrapper that forwards all arguments to mdc_enqueue_base() with a
 * NULL lookup intent and returns its result.
 */
879 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
880 const union ldlm_policy_data *policy,
881 struct md_op_data *op_data,
882 struct lustre_handle *lockh, __u64 extra_lock_flags)
884 return mdc_enqueue_base(exp, einfo, policy, NULL,
885 op_data, lockh, extra_lock_flags);
/*
 * Complete an intent once its reply has been unpacked into the intent.
 *
 * The request pointer is sanity-checked first.  For GETXATTR and LAYOUT
 * intents a non-zero it_status is propagated directly.  If the server never
 * set DISP_IT_EXECD the status is asserted to be non-zero and propagated;
 * otherwise it_open_error() is consulted for DISP_IT_EXECD and
 * DISP_LOOKUP_EXECD.
 *
 * An extra reference is taken on the request for a successful create
 * (tracked with DISP_ENQ_CREATE_REF) and for a successful open (tracked with
 * DISP_ENQ_OPEN_REF); the existing comments state they are balanced in
 * ll_create_node and ll_file_open.
 *
 * If lockh resolves to a lock, ldlm_lock_match() is used to look for a
 * lock with the same policy; when one is found the new lock is released
 * with ldlm_lock_decref_and_cancel() and both lockh and it->it_lock_handle
 * are switched to the matched lock.  The outcome is traced with CDEBUG.
 */
888 static int mdc_finish_intent_lock(struct obd_export *exp,
889 struct ptlrpc_request *request,
890 struct md_op_data *op_data,
891 struct lookup_intent *it,
892 struct lustre_handle *lockh)
894 struct lustre_handle old_lock;
895 struct ldlm_lock *lock;
899 LASSERT(request != NULL);
900 LASSERT(request != LP_POISON);
901 LASSERT(request->rq_repmsg != LP_POISON);
903 if (it->it_op & IT_READDIR)
906 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
907 if (it->it_status != 0)
908 GOTO(out, rc = it->it_status);
910 if (!it_disposition(it, DISP_IT_EXECD)) {
911 /* The server failed before it even started executing
912 * the intent, i.e. because it couldn't unpack the
915 LASSERT(it->it_status != 0);
916 GOTO(out, rc = it->it_status);
918 rc = it_open_error(DISP_IT_EXECD, it);
922 rc = it_open_error(DISP_LOOKUP_EXECD, it);
926 /* keep requests around for the multiple phases of the call
927 * this shows the DISP_XX must guarantee we make it into the
930 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
931 it_disposition(it, DISP_OPEN_CREATE) &&
932 !it_open_error(DISP_OPEN_CREATE, it)) {
933 it_set_disposition(it, DISP_ENQ_CREATE_REF);
934 /* balanced in ll_create_node */
935 ptlrpc_request_addref(request);
937 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
938 it_disposition(it, DISP_OPEN_OPEN) &&
939 !it_open_error(DISP_OPEN_OPEN, it)) {
940 it_set_disposition(it, DISP_ENQ_OPEN_REF);
941 /* balanced in ll_file_open */
942 ptlrpc_request_addref(request);
943 /* BUG 11546 - eviction in the middle of open rpc
946 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
950 if (it->it_op & IT_CREAT) {
951 /* XXX this belongs in ll_create_it */
952 } else if (it->it_op == IT_OPEN) {
953 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
955 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
959 /* If we already have a matching lock, then cancel the new
960 * one. We have to set the data here instead of in
961 * mdc_enqueue, because we need to use the child's inode as
962 * the l_ast_data to match, and that's not available until
963 * intent_finish has performed the iget().) */
964 lock = ldlm_handle2lock(lockh);
966 union ldlm_policy_data policy = lock->l_policy_data;
967 LDLM_DEBUG(lock, "matching against this");
969 if (it_has_reply_body(it)) {
970 struct mdt_body *body;
972 body = req_capsule_server_get(&request->rq_pill,
974 /* mdc_enqueue checked */
975 LASSERT(body != NULL);
976 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
977 &lock->l_resource->lr_name),
978 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
979 PLDLMRES(lock->l_resource),
980 PFID(&body->mbo_fid1));
984 memcpy(&old_lock, lockh, sizeof(*lockh));
985 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
986 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
987 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
988 memcpy(lockh, &old_lock, sizeof(old_lock));
989 it->it_lock_handle = lockh->cookie;
995 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
996 (int)op_data->op_namelen, op_data->op_name,
997 ldlm_it2str(it->it_op), it->it_status,
998 it->it_disposition, rc);
/*
 * Check whether a lock suitable for the intent is held on fid.
 *
 * If the intent already carries a lock handle, that handle is revalidated
 * with ldlm_revalidate_lock_handle().  Otherwise the resource name is built
 * from fid, the inodebits to match are selected by it->it_op (a combined
 * UPDATE|LOOKUP|... set, UPDATE alone, LAYOUT or LOOKUP) and a CR, CW, PR or
 * PW lock is searched with mdc_lock_match().  The intent's lock handle and
 * lock mode are then either set from the result or both reset to 0.
 *
 * NOTE(review): presumably the intent is filled in only when a lock mode
 * was found, and the return value reflects that -- confirm.
 */
1002 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1003 struct lu_fid *fid, __u64 *bits)
1005 /* We could just return 1 immediately, but since we should only
1006 * be called in revalidate_it if we already have a lock, let's
1008 struct ldlm_res_id res_id;
1009 struct lustre_handle lockh;
1010 union ldlm_policy_data policy;
1011 enum ldlm_mode mode;
1014 if (it->it_lock_handle) {
1015 lockh.cookie = it->it_lock_handle;
1016 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1018 fid_build_reg_res_name(fid, &res_id);
1019 switch (it->it_op) {
1021 /* File attributes are held under multiple bits:
1022 * nlink is under lookup lock, size and times are
1023 * under UPDATE lock and recently we've also got
1024 * a separate permissions lock for owner/group/acl that
1025 * were protected by lookup lock before.
1026 * Getattr must provide all of that information,
1027 * so we need to ensure we have all of those locks.
1028 * Unfortunately, if the bits are split across multiple
1029 * locks, there's no easy way to match all of them here,
1030 * so an extra RPC would be performed to fetch all
1031 * of those bits at once for now. */
1032 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1033 * but for old MDTs (< 2.4), permission is covered
1034 * by LOOKUP lock, so it needs to match all bits here.*/
1035 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1036 MDS_INODELOCK_LOOKUP |
1040 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1043 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1046 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1050 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1051 LDLM_IBITS, &policy,
1052 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1057 it->it_lock_handle = lockh.cookie;
1058 it->it_lock_mode = mode;
1060 it->it_lock_handle = 0;
1061 it->it_lock_mode = 0;
/*
 * Entry point for enqueueing an intent lock.
 *
 * The enqueue info is set up for an LDLM_IBITS lock whose mode comes from
 * it_to_lock_mode(), with cb_blocking as blocking callback and
 * ldlm_completion_ast as completion callback.
 *
 * When op_fid2 is sane and the intent is LOOKUP, GETATTR or READDIR, an
 * existing lock is first looked for with mdc_revalidate_lock().  When
 * op_fid2 is not sane and the intent includes IT_CREAT, a FID is allocated
 * with mdc_fid_alloc().  The request is then sent through
 * mdc_enqueue_base(); the intent's request is stored in *reqp and the
 * result is completed by mdc_finish_intent_lock().
 */
1068 * This long block is all about fixing up the lock and request state
1069 * so that it is correct as of the moment _before_ the operation was
1070 * applied; that way, the VFS will think that everything is normal and
1071 * call Lustre's regular VFS methods.
1073 * If we're performing a creation, that means that unless the creation
1074 * failed with EEXIST, we should fake up a negative dentry.
1076 * For everything else, we want to lookup to succeed.
1078 * One additional note: if CREATE or OPEN succeeded, we add an extra
1079 * reference to the request because we need to keep it around until
1080 * ll_create/ll_open gets called.
1082 * The server will return to us, in it_disposition, an indication of
1083 * exactly what it_status refers to.
1085 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1086 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1087 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1088 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1091 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1094 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1095 struct lookup_intent *it, struct ptlrpc_request **reqp,
1096 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1098 struct ldlm_enqueue_info einfo = {
1099 .ei_type = LDLM_IBITS,
1100 .ei_mode = it_to_lock_mode(it),
1101 .ei_cb_bl = cb_blocking,
1102 .ei_cb_cp = ldlm_completion_ast,
1104 struct lustre_handle lockh;
1109 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1110 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1111 op_data->op_name, PFID(&op_data->op_fid2),
1112 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1116 if (fid_is_sane(&op_data->op_fid2) &&
1117 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1118 /* We could just return 1 immediately, but since we should only
1119 * be called in revalidate_it if we already have a lock, let's
1121 it->it_lock_handle = 0;
1122 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1123 /* Only return failure if it was not GETATTR by cfid
1124 (from inode_revalidate) */
1125 if (rc || op_data->op_namelen != 0)
1129 /* For case if upper layer did not alloc fid, do it now. */
1130 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1131 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1133 CERROR("Can't alloc new fid, rc %d\n", rc);
1138 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1143 *reqp = it->it_request;
1144 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter for the asynchronous getattr intent.
 *
 * Recovers the export and the md_enqueue_info from args, releases the
 * request slot, and completes the enqueue with ldlm_cli_enqueue_fini(); a
 * failure there is logged and the replay flag is cleared.  The intent
 * status in the DLM reply is converted with ptlrpc_status_ntoh(), after
 * which mdc_finish_enqueue() and mdc_finish_intent_lock() are run.  The
 * result is handed to the minfo->mi_cb callback together with the request.
 */
1148 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1149 struct ptlrpc_request *req,
1152 struct mdc_getattr_args *ga = args;
1153 struct obd_export *exp = ga->ga_exp;
1154 struct md_enqueue_info *minfo = ga->ga_minfo;
1155 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1156 struct lookup_intent *it;
1157 struct lustre_handle *lockh;
1158 struct obd_device *obddev;
1159 struct ldlm_reply *lockrep;
1160 __u64 flags = LDLM_FL_HAS_INTENT;
1164 lockh = &minfo->mi_lockh;
1166 obddev = class_exp2obd(exp);
1168 obd_put_request_slot(&obddev->u.cli);
1169 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1172 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1173 &flags, NULL, 0, lockh, rc);
1175 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1176 mdc_clear_replay_flag(req, rc);
1180 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1181 LASSERT(lockrep != NULL);
1183 lockrep->lock_policy_res2 =
1184 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1186 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1190 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1194 minfo->mi_cb(req, minfo, rc);
/*
 * Send a getattr intent enqueue asynchronously.
 *
 * The request is packed with mdc_intent_getattr_pack() for the intent and
 * op data embedded in minfo, and a request slot is taken.  The enqueue is
 * started with ldlm_cli_enqueue() using LOOKUP|UPDATE inodebits, LVB_T_NONE
 * and minfo->mi_lockh as the lock handle; if it fails the slot is released
 * and the request is finished.  Otherwise minfo is stored in the request's
 * async args, mdc_intent_getattr_async_interpret() is installed as the
 * reply interpreter and the request is queued with ptlrpcd_add_req().
 */
1198 int mdc_intent_getattr_async(struct obd_export *exp,
1199 struct md_enqueue_info *minfo)
1201 struct md_op_data *op_data = &minfo->mi_data;
1202 struct lookup_intent *it = &minfo->mi_it;
1203 struct ptlrpc_request *req;
1204 struct mdc_getattr_args *ga;
1205 struct obd_device *obddev = class_exp2obd(exp);
1206 struct ldlm_res_id res_id;
1207 union ldlm_policy_data policy = {
1208 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1209 MDS_INODELOCK_UPDATE } };
1211 __u64 flags = LDLM_FL_HAS_INTENT;
1214 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1215 (int)op_data->op_namelen, op_data->op_name,
1216 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1218 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1219 req = mdc_intent_getattr_pack(exp, it, op_data);
1221 RETURN(PTR_ERR(req));
1223 rc = obd_get_request_slot(&obddev->u.cli);
1225 ptlrpc_req_finished(req);
1229 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1230 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1232 obd_put_request_slot(&obddev->u.cli);
1233 ptlrpc_req_finished(req);
1237 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1238 ga = ptlrpc_req_async_args(req);
1240 ga->ga_minfo = minfo;
1242 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1243 ptlrpcd_add_req(req);