4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_fid.h>
45 #include <lustre_intent.h>
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include <lustre_swab.h>
51 #include "mdc_internal.h"
/* Context carried from mdc_intent_getattr_async() into its RPC
 * interpret callback (mdc_intent_getattr_async_interpret). */
53 struct mdc_getattr_args {
54 struct obd_export *ga_exp;	/* export the getattr RPC was sent on */
55 struct md_enqueue_info *ga_minfo;	/* caller's enqueue info: intent, lock handle, completion cb */
/* Return the intent status for the requested open phase.
 *
 * Dispositions are checked from most to least specific
 * (LEASE -> OPEN -> CREATE -> LOOKUP_EXECD -> IT_EXECD); as soon as the
 * server is known to have executed a phase at or beyond @phase, the
 * intent's it_status is returned.  A disposition set for an earlier
 * phase than requested means the server never reached @phase.
 * NOTE(review): elided lines presumably return 0 in the "not yet at
 * this phase" branches — confirm against the full source. */
58 int it_open_error(int phase, struct lookup_intent *it)
60 if (it_disposition(it, DISP_OPEN_LEASE)) {
61 if (phase >= DISP_OPEN_LEASE)
62 return it->d.lustre.it_status;
66 if (it_disposition(it, DISP_OPEN_OPEN)) {
67 if (phase >= DISP_OPEN_OPEN)
68 return it->d.lustre.it_status;
73 if (it_disposition(it, DISP_OPEN_CREATE)) {
74 if (phase >= DISP_OPEN_CREATE)
75 return it->d.lustre.it_status;
80 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
81 if (phase >= DISP_LOOKUP_EXECD)
82 return it->d.lustre.it_status;
87 if (it_disposition(it, DISP_IT_EXECD)) {
88 if (phase >= DISP_IT_EXECD)
89 return it->d.lustre.it_status;
/* No recognized disposition bit was set: log for debugging. */
93 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
94 it->d.lustre.it_status);
98 EXPORT_SYMBOL(it_open_error);
100 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach the inode @data to the DLM lock's resource (lr_lvb_inode) so
 * later callbacks can find it, and report the lock's inodebits via
 * @bits.  An existing, different inode may only be replaced if it is
 * already being freed (I_FREEING) — anything else indicates a stale or
 * conflicting association and trips the LASSERTF. */
101 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
104 struct ldlm_lock *lock;
105 struct inode *new_inode = data;
/* Handle is known-referenced (see comment above), so this cannot fail. */
114 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
116 LASSERT(lock != NULL);
117 lock_res_and_lock(lock);
118 if (lock->l_resource->lr_lvb_inode &&
119 lock->l_resource->lr_lvb_inode != data) {
120 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
121 LASSERTF(old_inode->i_state & I_FREEING,
122 "Found existing inode %p/%lu/%u state %lu in lock: "
123 "setting data to %p/%lu/%u\n", old_inode,
124 old_inode->i_ino, old_inode->i_generation,
126 new_inode, new_inode->i_ino, new_inode->i_generation);
128 lock->l_resource->lr_lvb_inode = new_inode;
/* Report which inodebits this lock covers to the caller. */
130 *bits = lock->l_policy_data.l_inodebits.bits;
132 unlock_res_and_lock(lock);
/* Look for an already-granted local DLM lock matching @fid/@type/@policy/
 * @mode.  Returns the matched mode (or 0 for no match), filling @lockh.
 * Inodebits the server does not support are masked out of @policy first
 * so the match reflects what could actually have been granted. */
138 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
139 const struct lu_fid *fid, enum ldlm_type type,
140 union ldlm_policy_data *policy,
141 enum ldlm_mode mode, struct lustre_handle *lockh)
143 struct ldlm_res_id res_id;
147 fid_build_reg_res_name(fid, &res_id);
148 /* LU-4405: Clear bits not supported by server */
149 policy->l_inodebits.bits &= exp_connect_ibits(exp);
150 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
151 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused (no local readers/writers) DLM locks on the
 * resource named by @fid that match @policy/@mode.  @opaque is passed
 * through to the cancel machinery for caller-side filtering. */
155 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
156 union ldlm_policy_data *policy, enum ldlm_mode mode,
157 enum ldlm_cancel_flags flags, void *opaque)
159 struct obd_device *obd = class_exp2obd(exp);
160 struct ldlm_res_id res_id;
165 fid_build_reg_res_name(fid, &res_id);
166 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
167 policy, mode, flags, opaque);
/* Detach any inode pointer cached on @fid's DLM resource
 * (lr_lvb_inode = NULL), e.g. when the inode is going away.  Looks up
 * the resource without creating it; elided lines presumably return
 * early when the resource does not exist — confirm against full source. */
171 int mdc_null_inode(struct obd_export *exp,
172 const struct lu_fid *fid)
174 struct ldlm_res_id res_id;
175 struct ldlm_resource *res;
176 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
179 LASSERTF(ns != NULL, "no namespace passed\n");
181 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create flag 0): absent resource means nothing to clear. */
183 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
188 res->lr_lvb_inode = NULL;
/* Drop the reference taken by ldlm_resource_get(). */
191 ldlm_resource_putref(res);
/* Drop the replay flag from a request that failed (or was aborted) so
 * it is not re-sent during recovery.  A nonzero transno on an error
 * reply is unexpected and logged loudly. */
195 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
197 /* Don't hold error requests for replay. */
198 if (req->rq_replay) {
/* rq_replay is protected by rq_lock; elided line clears it here. */
199 spin_lock(&req->rq_lock);
201 spin_unlock(&req->rq_lock);
203 if (rc && req->rq_transno != 0) {
204 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
209 /* Save a large LOV EA into the request buffer so that it is available
210 * for replay. We don't do this in the initial request because the
211 * original request doesn't need this buffer (at most it sends just the
212 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
213 * buffer and may also be difficult to allocate and save a very large
214 * request buffer for each open. (bug 5707)
216 * OOM here may cause recovery failure if lmm is needed (only for the
217 * original open if the MDS crashed just when this client also OOM'd)
218 * but this is incredibly unlikely, and questionable whether the client
219 * could do MDS recovery under OOM anyways... */
220 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
221 struct mdt_body *body)
225 /* FIXME: remove this explicit offset. */
226 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
227 body->mbo_eadatasize);
/* On enlarge failure, degrade gracefully: pretend the reply carried no
 * EA so replay proceeds without it rather than failing the open. */
229 CERROR("Can't enlarge segment %d size to %d\n",
230 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
231 body->mbo_valid &= ~OBD_MD_FLEASIZE;
232 body->mbo_eadatasize = 0;
/* Build an LDLM_INTENT_OPEN enqueue request for an open (possibly
 * create/lease) intent: collect conflicting locks to cancel in the same
 * RPC, allocate and size the request, mark it replayable, and pack the
 * intent opcode plus the embedded open request.
 * Returns the prepared request or an ERR_PTR on allocation failure. */
236 static struct ptlrpc_request *
237 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
238 struct md_op_data *op_data)
240 struct ptlrpc_request *req;
241 struct obd_device *obddev = class_exp2obd(exp);
242 struct ldlm_intent *lit;
243 const void *lmm = op_data->op_data;
244 __u32 lmmsize = op_data->op_data_size;
245 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Open always operates on a regular file from the lock perspective. */
251 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
253 /* XXX: openlock is not cancelled for cross-refs. */
254 /* If inode is known, cancel conflicting OPEN locks. */
255 if (fid_is_sane(&op_data->op_fid2)) {
256 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
257 if (it->it_flags & FMODE_WRITE)
262 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
265 else if (it->it_flags & FMODE_EXEC)
271 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
276 /* If CREATE, cancel parent's UPDATE lock. */
277 if (it->it_op & IT_CREAT)
281 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
283 MDS_INODELOCK_UPDATE);
285 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
286 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the gathered cancel list before erroring. */
288 ldlm_lock_list_put(&cancels, l_bl_ast, count);
289 RETURN(ERR_PTR(-ENOMEM));
292 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
293 op_data->op_namelen + 1);
294 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
295 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
297 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
299 ptlrpc_request_free(req);
/* Opens are replayed after recovery if the import supports replay. */
303 spin_lock(&req->rq_lock);
304 req->rq_replay = req->rq_import->imp_replayable;
305 spin_unlock(&req->rq_lock);
307 /* pack the intent */
308 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
309 lit->opc = (__u64)it->it_op;
311 /* pack the intended request */
312 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reserve worst-case reply room for the striping EA. */
315 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
316 obddev->u.cli.cl_max_mds_easize);
318 /* for remote client, fetch remote perm for current user */
319 if (client_is_remote(exp))
320 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
321 sizeof(struct mdt_remote_perm));
322 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETXATTR enqueue request: pack the IT_GETXATTR
 * intent and a getxattr body, and reserve reply room (names, values,
 * and value lengths) sized by the server's advertised max EA size.
 * Returns the prepared request or an ERR_PTR on failure. */
326 static struct ptlrpc_request *
327 mdc_intent_getxattr_pack(struct obd_export *exp,
328 struct lookup_intent *it,
329 struct md_op_data *op_data)
331 struct ptlrpc_request *req;
332 struct ldlm_intent *lit;
335 struct list_head cancels = LIST_HEAD_INIT(cancels);
339 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
340 &RQF_LDLM_INTENT_GETXATTR);
342 RETURN(ERR_PTR(-ENOMEM));
344 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
346 ptlrpc_request_free(req);
350 /* pack the intent */
351 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
352 lit->opc = IT_GETXATTR;
/* Size reply buffers by what the server said it can return. */
354 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
356 /* pack the intended request */
357 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
360 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
361 RCL_SERVER, maxdata);
363 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
364 RCL_SERVER, maxdata);
366 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
367 RCL_SERVER, maxdata);
369 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_UNLINK enqueue request: pack the intent opcode
 * and the embedded unlink request, reserving reply room for the victim's
 * striping EA (needed so the client can destroy OST objects).
 * Returns the prepared request or an ERR_PTR on failure. */
374 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
375 struct lookup_intent *it,
376 struct md_op_data *op_data)
378 struct ptlrpc_request *req;
379 struct obd_device *obddev = class_exp2obd(exp);
380 struct ldlm_intent *lit;
384 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
385 &RQF_LDLM_INTENT_UNLINK);
387 RETURN(ERR_PTR(-ENOMEM));
389 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
390 op_data->op_namelen + 1);
392 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
394 ptlrpc_request_free(req);
398 /* pack the intent */
399 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
400 lit->opc = (__u64)it->it_op;
402 /* pack the intended request */
403 mdc_unlink_pack(req, op_data);
405 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
406 obddev->u.cli.cl_default_mds_easize);
407 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR enqueue request (also used for lookup):
 * request full attributes plus EA/ACL (or remote perms for a remote
 * client), and reserve reply room for the striping EA.
 * Returns the prepared request or an ERR_PTR on failure. */
411 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
412 struct lookup_intent *it,
413 struct md_op_data *op_data)
415 struct ptlrpc_request *req;
416 struct obd_device *obddev = class_exp2obd(exp);
417 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
418 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
420 (client_is_remote(exp) ?
421 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
422 struct ldlm_intent *lit;
427 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
428 &RQF_LDLM_INTENT_GETATTR);
430 RETURN(ERR_PTR(-ENOMEM));
432 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
433 op_data->op_namelen + 1);
435 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
437 ptlrpc_request_free(req);
441 /* pack the intent */
442 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
443 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum. */
445 if (obddev->u.cli.cl_default_mds_easize > 0)
446 easize = obddev->u.cli.cl_default_mds_easize;
448 easize = obddev->u.cli.cl_max_mds_easize;
450 /* pack the intended request */
451 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
453 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
454 if (client_is_remote(exp))
455 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
456 sizeof(struct mdt_remote_perm));
457 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_LAYOUT enqueue request for a layout lock: pack
 * the intent opcode and a generic LAYOUT_INTENT_ACCESS layout intent,
 * reserving LVB reply room for the layout itself.  @unused: op_data is
 * not needed for layout intents.  Returns the request or an ERR_PTR. */
461 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
462 struct lookup_intent *it,
463 struct md_op_data *unused)
465 struct obd_device *obd = class_exp2obd(exp);
466 struct ptlrpc_request *req;
467 struct ldlm_intent *lit;
468 struct layout_intent *layout;
472 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
473 &RQF_LDLM_INTENT_LAYOUT);
475 RETURN(ERR_PTR(-ENOMEM));
477 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
478 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
480 ptlrpc_request_free(req);
484 /* pack the intent */
485 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
486 lit->opc = (__u64)it->it_op;
488 /* pack the layout intent request */
489 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
490 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
491 * set for replication */
492 layout->li_opc = LAYOUT_INTENT_ACCESS;
494 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
495 obd->u.cli.cl_default_mds_easize);
496 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM_ENQUEUE request with @lvb_len bytes
 * of LVB reply room.  Used e.g. for IT_READDIR where no intent body is
 * needed.  Returns the request or an ERR_PTR on failure. */
500 static struct ptlrpc_request *
501 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
503 struct ptlrpc_request *req;
507 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
509 RETURN(ERR_PTR(-ENOMEM));
511 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
513 ptlrpc_request_free(req);
517 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
518 ptlrpc_request_set_replen(req);
/* Post-process an intent enqueue reply: fix up lock mode/handle, copy
 * the server's disposition/status into the intent, manage the replay
 * flag, swab and validate the reply body, preserve the LOV EA for open
 * replay, and install layout LVB data on a granted layout lock.
 * @rc is the enqueue result (ELDLM_LOCK_ABORTED means intent-only). */
522 static int mdc_finish_enqueue(struct obd_export *exp,
523 struct ptlrpc_request *req,
524 struct ldlm_enqueue_info *einfo,
525 struct lookup_intent *it,
526 struct lustre_handle *lockh,
529 struct req_capsule *pill = &req->rq_pill;
530 struct ldlm_request *lockreq;
531 struct ldlm_reply *lockrep;
532 struct lustre_intent_data *intent = &it->d.lustre;
533 struct ldlm_lock *lock;
534 void *lvb_data = NULL;
539 /* Similarly, if we're going to replay this request, we don't want to
540 * actually get a lock, just perform the intent. */
541 if (req->rq_transno || req->rq_replay) {
542 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
543 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Aborted: intent was executed but no lock was granted. */
546 if (rc == ELDLM_LOCK_ABORTED) {
548 memset(lockh, 0, sizeof(*lockh));
550 } else { /* rc = 0 */
551 lock = ldlm_handle2lock(lockh);
552 LASSERT(lock != NULL);
554 /* If the server gave us back a different lock mode, we should
555 * fix up our variables. */
556 if (lock->l_req_mode != einfo->ei_mode) {
557 ldlm_lock_addref(lockh, lock->l_req_mode);
558 ldlm_lock_decref(lockh, einfo->ei_mode);
559 einfo->ei_mode = lock->l_req_mode;
564 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
565 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's verdict into the intent for upper layers. */
567 intent->it_disposition = (int)lockrep->lock_policy_res1;
568 intent->it_status = (int)lockrep->lock_policy_res2;
569 intent->it_lock_mode = einfo->ei_mode;
570 intent->it_lock_handle = lockh->cookie;
571 intent->it_data = req;
573 /* Technically speaking rq_transno must already be zero if
574 * it_status is in error, so the check is a bit redundant */
575 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
576 mdc_clear_replay_flag(req, intent->it_status);
578 /* If we're doing an IT_OPEN which did not result in an actual
579 * successful open, then we need to remove the bit which saves
580 * this request for unconditional replay.
582 * It's important that we do this first! Otherwise we might exit the
583 * function without doing so, and try to replay a failed create
585 if (it->it_op & IT_OPEN && req->rq_replay &&
586 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
587 mdc_clear_replay_flag(req, intent->it_status);
589 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
590 it->it_op, intent->it_disposition, intent->it_status);
592 /* We know what to expect, so we do any byte flipping required here */
593 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
594 struct mdt_body *body;
596 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
598 CERROR ("Can't swab mdt_body\n");
602 if (it_disposition(it, DISP_OPEN_OPEN) &&
603 !it_open_error(DISP_OPEN_OPEN, it)) {
605 * If this is a successful OPEN request, we need to set
606 * replay handler and data early, so that if replay
607 * happens immediately after swabbing below, new reply
608 * is swabbed by that handler correctly.
610 mdc_set_open_replay_data(NULL, NULL, it);
613 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
616 mdc_update_max_ea_from_body(exp, body);
619 * The eadata is opaque; just check that it is there.
620 * Eventually, obd_unpackmd() will check the contents.
622 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
623 body->mbo_eadatasize);
627 /* save lvb data and length in case this is for layout
630 lvb_len = body->mbo_eadatasize;
633 * We save the reply LOV EA in case we have to replay a
634 * create for recovery. If we didn't allocate a large
635 * enough request buffer above we need to reallocate it
636 * here to hold the actual LOV EA.
638 * To not save LOV EA if request is not going to replay
639 * (for example error one).
641 if ((it->it_op & IT_OPEN) && req->rq_replay) {
643 if (req_capsule_get_size(pill, &RMF_EADATA,
645 body->mbo_eadatasize)
646 mdc_realloc_openmsg(req, body);
648 req_capsule_shrink(pill, &RMF_EADATA,
649 body->mbo_eadatasize,
652 req_capsule_set_size(pill, &RMF_EADATA,
654 body->mbo_eadatasize);
656 lmm = req_capsule_client_get(pill, &RMF_EADATA);
659 body->mbo_eadatasize);
663 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
664 struct mdt_remote_perm *perm;
666 LASSERT(client_is_remote(exp));
667 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
668 lustre_swab_mdt_remote_perm);
672 } else if (it->it_op & IT_LAYOUT) {
673 /* maybe the lock was granted right away and layout
674 * is packed into RMF_DLM_LVB of req */
675 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
677 lvb_data = req_capsule_server_sized_get(pill,
678 &RMF_DLM_LVB, lvb_len);
679 if (lvb_data == NULL)
684 /* fill in stripe data for layout lock.
685 * LU-6581: trust layout data only if layout lock is granted. The MDT
686 * has stopped sending layout unless the layout lock is granted. The
687 * client still does this checking in case it's talking with an old
688 * server. - Jinshan */
689 lock = ldlm_handle2lock(lockh);
690 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
691 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
694 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
695 ldlm_it2str(it->it_op), lvb_len);
697 OBD_ALLOC_LARGE(lmm, lvb_len);
702 memcpy(lmm, lvb_data, lvb_len);
704 /* install lvb_data */
705 lock_res_and_lock(lock);
/* Install only if no LVB is attached yet; otherwise free our copy. */
706 if (lock->l_lvb_data == NULL) {
707 lock->l_lvb_type = LVB_T_LAYOUT;
708 lock->l_lvb_data = lmm;
709 lock->l_lvb_len = lvb_len;
712 unlock_res_and_lock(lock);
714 OBD_FREE_LARGE(lmm, lvb_len);
722 /* We always reserve enough space in the reply packet for a stripe MD, because
723 * we don't know in advance the file type. */
/* Enqueue an (intent) DLM lock on the MDT: choose the inodebits policy
 * from the intent op, pack the matching intent request, take RPC slots,
 * send via ldlm_cli_enqueue(), retry forever on -EINPROGRESS within the
 * same import generation, and finish via mdc_finish_enqueue().
 * On failure the lock reference is dropped and the intent lock fields
 * are zeroed. */
724 int mdc_enqueue(struct obd_export *exp,
725 struct ldlm_enqueue_info *einfo,
726 const union ldlm_policy_data *policy,
727 struct lookup_intent *it, struct md_op_data *op_data,
728 struct lustre_handle *lockh, __u64 extra_lock_flags)
730 struct obd_device *obddev = class_exp2obd(exp);
731 struct ptlrpc_request *req = NULL;
732 __u64 flags, saved_flags = extra_lock_flags;
733 struct ldlm_res_id res_id;
734 static const union ldlm_policy_data lookup_policy = {
735 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
736 static const union ldlm_policy_data update_policy = {
737 .l_inodebits = { MDS_INODELOCK_UPDATE } };
738 static const union ldlm_policy_data layout_policy = {
739 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
740 static const union ldlm_policy_data getxattr_policy = {
741 .l_inodebits = { MDS_INODELOCK_XATTR } };
742 int generation, resends = 0;
743 struct ldlm_reply *lockrep;
744 enum lvb_type lvb_type = 0;
748 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
750 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived from the intent op. */
753 LASSERT(policy == NULL);
755 saved_flags |= LDLM_FL_HAS_INTENT;
756 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
757 policy = &update_policy;
758 else if (it->it_op & IT_LAYOUT)
759 policy = &layout_policy;
760 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
761 policy = &getxattr_policy;
763 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction. */
766 generation = obddev->u.cli.cl_import->imp_generation;
770 /* The only way right now is FLOCK. */
771 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
773 res_id.name[3] = LDLM_FLOCK;
774 } else if (it->it_op & IT_OPEN) {
775 req = mdc_intent_open_pack(exp, it, op_data);
776 } else if (it->it_op & IT_UNLINK) {
777 req = mdc_intent_unlink_pack(exp, it, op_data);
778 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
779 req = mdc_intent_getattr_pack(exp, it, op_data);
780 } else if (it->it_op & IT_READDIR) {
781 req = mdc_enqueue_pack(exp, 0);
782 } else if (it->it_op & IT_LAYOUT) {
783 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
785 req = mdc_intent_layout_pack(exp, it, op_data);
786 lvb_type = LVB_T_LAYOUT;
787 } else if (it->it_op & IT_GETXATTR) {
788 req = mdc_intent_getxattr_pack(exp, it, op_data);
795 RETURN(PTR_ERR(req));
/* On resend, keep the original import generation and back off the
 * send time by the number of resends so far. */
798 req->rq_generation_set = 1;
799 req->rq_import_generation = generation;
800 req->rq_sent = cfs_time_current_sec() + resends;
803 /* It is important to obtain modify RPC slot first (if applicable), so
804 * that threads that are waiting for a modify RPC slot are not polluting
805 * our rpcs in flight counter.
806 * We do not do flock request limiting, though */
808 mdc_get_mod_rpc_slot(req, it);
809 rc = obd_get_request_slot(&obddev->u.cli);
811 mdc_put_mod_rpc_slot(req, it);
812 mdc_clear_replay_flag(req, 0);
813 ptlrpc_req_finished(req);
818 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
819 0, lvb_type, lockh, 0);
821 /* For flock requests we immediatelly return without further
822 delay and let caller deal with the rest, since rest of
823 this function metadata processing makes no sense for flock
824 requests anyway. But in case of problem during comms with
825 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
826 can not rely on caller and this mainly for F_UNLCKs
827 (explicits or automatically generated by Kernel to clean
828 current FLocks upon exit) that can't be trashed */
829 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
830 (einfo->ei_type == LDLM_FLOCK) &&
831 (einfo->ei_mode == LCK_NL))
/* Release slots in reverse order of acquisition. */
836 obd_put_request_slot(&obddev->u.cli);
837 mdc_put_mod_rpc_slot(req, it);
840 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
841 obddev->obd_name, rc);
843 mdc_clear_replay_flag(req, rc);
844 ptlrpc_req_finished(req);
848 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
849 LASSERT(lockrep != NULL);
851 lockrep->lock_policy_res2 =
852 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
854 /* Retry infinitely when the server returns -EINPROGRESS for the
855 * intent operation, when server returns -EINPROGRESS for acquiring
856 * intent lock, we'll retry in after_reply(). */
857 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
858 mdc_clear_replay_flag(req, rc);
859 ptlrpc_req_finished(req);
862 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
863 obddev->obd_name, resends, it->it_op,
864 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
/* Only resend within the same import generation; an eviction in
 * between means the resend loop must be abandoned. */
866 if (generation == obddev->u.cli.cl_import->imp_generation) {
869 CDEBUG(D_HA, "resend cross eviction\n");
874 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed: release any granted lock and reset the intent. */
876 if (lustre_handle_is_used(lockh)) {
877 ldlm_lock_decref(lockh, einfo->ei_mode);
878 memset(lockh, 0, sizeof(*lockh));
880 ptlrpc_req_finished(req);
882 it->d.lustre.it_lock_handle = 0;
883 it->d.lustre.it_lock_mode = 0;
884 it->d.lustre.it_data = NULL;
/* Finish an intent lock after the enqueue completed: validate the
 * server executed the intent, surface open/lookup errors, pin the
 * request for the create/open phases of the call, and if an equivalent
 * lock already exists locally, cancel the new one and reuse the old. */
890 static int mdc_finish_intent_lock(struct obd_export *exp,
891 struct ptlrpc_request *request,
892 struct md_op_data *op_data,
893 struct lookup_intent *it,
894 struct lustre_handle *lockh)
896 struct lustre_handle old_lock;
897 struct mdt_body *mdt_body;
898 struct ldlm_lock *lock;
902 LASSERT(request != NULL);
903 LASSERT(request != LP_POISON);
904 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR uses a plain enqueue; no intent reply to process. */
906 if (it->it_op & IT_READDIR)
909 if (!it_disposition(it, DISP_IT_EXECD)) {
910 /* The server failed before it even started executing the
911 * intent, i.e. because it couldn't unpack the request. */
912 LASSERT(it->d.lustre.it_status != 0);
913 RETURN(it->d.lustre.it_status);
915 rc = it_open_error(DISP_IT_EXECD, it);
919 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
920 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
922 rc = it_open_error(DISP_LOOKUP_EXECD, it);
926 /* keep requests around for the multiple phases of the call
927 * this shows the DISP_XX must guarantee we make it into the call
929 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
930 it_disposition(it, DISP_OPEN_CREATE) &&
931 !it_open_error(DISP_OPEN_CREATE, it)) {
932 it_set_disposition(it, DISP_ENQ_CREATE_REF);
933 ptlrpc_request_addref(request); /* balanced in ll_create_node */
935 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
936 it_disposition(it, DISP_OPEN_OPEN) &&
937 !it_open_error(DISP_OPEN_OPEN, it)) {
938 it_set_disposition(it, DISP_ENQ_OPEN_REF);
939 ptlrpc_request_addref(request); /* balanced in ll_file_open */
940 /* BUG 11546 - eviction in the middle of open rpc processing */
941 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
944 if (it->it_op & IT_CREAT) {
945 /* XXX this belongs in ll_create_it */
946 } else if (it->it_op == IT_OPEN) {
947 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
949 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
952 /* If we already have a matching lock, then cancel the new
953 * one. We have to set the data here instead of in
954 * mdc_enqueue, because we need to use the child's inode as
955 * the l_ast_data to match, and that's not available until
956 * intent_finish has performed the iget().) */
957 lock = ldlm_handle2lock(lockh);
959 union ldlm_policy_data policy = lock->l_policy_data;
960 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the granted lock must be on the FID the server replied for. */
962 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
963 &lock->l_resource->lr_name),
964 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
965 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
968 memcpy(&old_lock, lockh, sizeof(*lockh));
969 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
970 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* A pre-existing equivalent lock matched: drop the fresh one
 * and hand back the old handle instead. */
971 ldlm_lock_decref_and_cancel(lockh,
972 it->d.lustre.it_lock_mode);
973 memcpy(lockh, &old_lock, sizeof(old_lock));
974 it->d.lustre.it_lock_handle = lockh->cookie;
977 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
978 (int)op_data->op_namelen, op_data->op_name,
979 ldlm_it2str(it->it_op), it->d.lustre.it_status,
980 it->d.lustre.it_disposition, rc);
/* Check whether a still-valid local lock covers the intent on @fid.
 * First tries the handle cached in the intent; otherwise matches by
 * fid with inodebits chosen per intent op.  On success the intent's
 * lock handle/mode are filled in; on failure they are zeroed.
 * @bits, if non-NULL, receives the bits of the revalidated lock. */
984 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
985 struct lu_fid *fid, __u64 *bits)
987 /* We could just return 1 immediately, but since we should only
988 * be called in revalidate_it if we already have a lock, let's
990 struct ldlm_res_id res_id;
991 struct lustre_handle lockh;
992 union ldlm_policy_data policy;
/* Fast path: the intent already carries a lock handle to revalidate. */
996 if (it->d.lustre.it_lock_handle) {
997 lockh.cookie = it->d.lustre.it_lock_handle;
998 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1000 fid_build_reg_res_name(fid, &res_id);
1001 switch (it->it_op) {
1003 /* File attributes are held under multiple bits:
1004 * nlink is under lookup lock, size and times are
1005 * under UPDATE lock and recently we've also got
1006 * a separate permissions lock for owner/group/acl that
1007 * were protected by lookup lock before.
1008 * Getattr must provide all of that information,
1009 * so we need to ensure we have all of those locks.
1010 * Unfortunately, if the bits are split across multiple
1011 * locks, there's no easy way to match all of them here,
1012 * so an extra RPC would be performed to fetch all
1013 * of those bits at once for now. */
1014 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1015 * but for old MDTs (< 2.4), permission is covered
1016 * by LOOKUP lock, so it needs to match all bits here.*/
1017 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1018 MDS_INODELOCK_LOOKUP |
1022 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1025 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1028 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any held mode counts for revalidation purposes. */
1032 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1033 LDLM_IBITS, &policy,
1034 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1039 it->d.lustre.it_lock_handle = lockh.cookie;
1040 it->d.lustre.it_lock_mode = mode;
1042 it->d.lustre.it_lock_handle = 0;
1043 it->d.lustre.it_lock_mode = 0;
1050 * This long block is all about fixing up the lock and request state
1051 * so that it is correct as of the moment _before_ the operation was
1052 * applied; that way, the VFS will think that everything is normal and
1053 * call Lustre's regular VFS methods.
1055 * If we're performing a creation, that means that unless the creation
1056 * failed with EEXIST, we should fake up a negative dentry.
1058 * For everything else, we want to lookup to succeed.
1060 * One additional note: if CREATE or OPEN succeeded, we add an extra
1061 * reference to the request because we need to keep it around until
1062 * ll_create/ll_open gets called.
1064 * The server will return to us, in it_disposition, an indication of
1065 * exactly what d.lustre.it_status refers to.
1067 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1068 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1069 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1070 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1073 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point for intent-based metadata operations: try to revalidate
 * an existing lock for lookup/getattr-by-fid, allocate a fid for
 * creates when the caller did not, then enqueue the intent lock and
 * finish it.  On success *reqp is set to the reply request (pinned per
 * the dispositions set in mdc_finish_intent_lock). */
1076 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1077 struct lookup_intent *it, struct ptlrpc_request **reqp,
1078 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1080 struct ldlm_enqueue_info einfo = {
1081 .ei_type = LDLM_IBITS,
1082 .ei_mode = it_to_lock_mode(it),
1083 .ei_cb_bl = cb_blocking,
1084 .ei_cb_cp = ldlm_completion_ast,
1086 struct lustre_handle lockh;
1091 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1092 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1093 op_data->op_name, PFID(&op_data->op_fid2),
1094 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidation path: the target fid is known and the op only reads. */
1098 if (fid_is_sane(&op_data->op_fid2) &&
1099 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1100 /* We could just return 1 immediately, but since we should only
1101 * be called in revalidate_it if we already have a lock, let's
1103 it->d.lustre.it_lock_handle = 0;
1104 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1105 /* Only return failure if it was not GETATTR by cfid
1106 (from inode_revalidate) */
1107 if (rc || op_data->op_namelen != 0)
1111 /* For case if upper layer did not alloc fid, do it now. */
1112 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1113 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1115 CERROR("Can't alloc new fid, rc %d\n", rc);
1120 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply request (stored in the intent) back to the caller. */
1125 *reqp = it->d.lustre.it_data;
1126 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Interpret callback for the async getattr enqueue sent by
 * mdc_intent_getattr_async(): release the request slot, finalize the
 * enqueue, post-process the intent reply, and invoke the caller's
 * completion callback with the final result. */
1130 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1131 struct ptlrpc_request *req,
1134 struct mdc_getattr_args *ga = args;
1135 struct obd_export *exp = ga->ga_exp;
1136 struct md_enqueue_info *minfo = ga->ga_minfo;
1137 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1138 struct lookup_intent *it;
1139 struct lustre_handle *lockh;
1140 struct obd_device *obddev;
1141 struct ldlm_reply *lockrep;
1142 __u64 flags = LDLM_FL_HAS_INTENT;
1146 lockh = &minfo->mi_lockh;
1148 obddev = class_exp2obd(exp);
/* Return the in-flight slot taken by mdc_intent_getattr_async(). */
1150 obd_put_request_slot(&obddev->u.cli);
1151 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1154 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1155 &flags, NULL, 0, lockh, rc);
1157 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1158 mdc_clear_replay_flag(req, rc);
1162 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1163 LASSERT(lockrep != NULL);
/* Intent status travels in lock_policy_res2; convert from wire. */
1165 lockrep->lock_policy_res2 =
1166 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1168 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1172 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Deliver the final result to the caller's callback. */
1176 minfo->mi_cb(req, minfo, rc);
1180 int mdc_intent_getattr_async(struct obd_export *exp,
1181 struct md_enqueue_info *minfo)
1183 struct md_op_data *op_data = &minfo->mi_data;
1184 struct lookup_intent *it = &minfo->mi_it;
1185 struct ptlrpc_request *req;
1186 struct mdc_getattr_args *ga;
1187 struct obd_device *obddev = class_exp2obd(exp);
1188 struct ldlm_res_id res_id;
1189 union ldlm_policy_data policy = {
1190 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1191 MDS_INODELOCK_UPDATE } };
1193 __u64 flags = LDLM_FL_HAS_INTENT;
1196 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1198 (int)op_data->op_namelen, op_data->op_name,
1199 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1201 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1202 req = mdc_intent_getattr_pack(exp, it, op_data);
1204 RETURN(PTR_ERR(req));
1206 rc = obd_get_request_slot(&obddev->u.cli);
1208 ptlrpc_req_finished(req);
1212 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1213 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1215 obd_put_request_slot(&obddev->u.cli);
1216 ptlrpc_req_finished(req);
1220 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1221 ga = ptlrpc_req_async_args(req);
1223 ga->ga_minfo = minfo;
1225 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1226 ptlrpcd_add_req(req);