4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
47 #include "mdc_internal.h"
/* Context bundle stashed in rq_async_args for the async getattr RPC;
 * consumed by mdc_intent_getattr_async_interpret() below. */
49 struct mdc_getattr_args {
50 struct obd_export *ga_exp;	/* export the enqueue was issued on */
51 struct md_enqueue_info *ga_minfo;	/* caller's enqueue info (lock handle, callback, op data) */
/*
 * Return the server status for an open-style intent, relative to @phase.
 * The dispositions are checked from the most-advanced phase (LEASE) down
 * to the least (IT_EXECD); for the first disposition bit that is set,
 * the visible pattern compares @phase against that phase constant —
 * presumably returning it->it_status when the intent stopped at or
 * before the phase the caller asked about (the return lines themselves
 * are not visible in this listing; TODO confirm against full source).
 * Reaching the CERROR below means no execution disposition was set at
 * all, which the code treats as a reportable inconsistency.
 */
54 int it_open_error(int phase, struct lookup_intent *it)
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
/* No disposition matched: log the raw disposition mask and status. */
90 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95 EXPORT_SYMBOL(it_open_error);
97 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach @data (an inode pointer) to the DLM lock's resource as its LVB
 * inode, and optionally report the lock's inodebits via @bits.
 * If the resource already points at a different inode, that old inode
 * must be on its way out (I_FREEING) — otherwise two live inodes would
 * claim the same resource, which the LASSERTF treats as fatal.
 */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99 void *data, __u64 *bits)
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
/* An unused handle carries no lock; nothing to update. */
108 if (!lustre_handle_is_used(lockh))
111 lock = ldlm_handle2lock(lockh);
113 LASSERT(lock != NULL);
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* Replacing a still-live inode would be a refcounting bug. */
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
/* NOTE(review): *bits is presumably filled only when bits != NULL;
 * the guarding condition is not visible in this listing. */
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/*
 * Search the client namespace for an already-granted lock on @fid that
 * matches @type/@policy/@mode; on success the matched handle is returned
 * through @lockh and the granted mode is the return value.
 */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks on the resource named by @fid that match
 * @policy/@mode, delegating to the generic LDLM resource-wide cancel.
 * @opaque is passed through for the caller's own matching.
 */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/*
 * Detach the cached LVB inode pointer from @fid's DLM resource, if the
 * resource exists.  Called when the client-side inode is going away so
 * the resource does not keep a dangling pointer.
 */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create == 0): if no resource is cached there is nothing
 * to clear. */
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
188 ldlm_resource_putref(res);
/*
 * Drop the replay flag from a request that ended in error so it is not
 * kept for recovery replay.  A nonzero transno on an error reply is
 * unexpected and logged loudly.
 */
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 /* Don't hold error requests for replay. */
195 if (req->rq_replay) {
196 spin_lock(&req->rq_lock);
/* NOTE(review): rq_replay is presumably cleared here under rq_lock;
 * the assignment line is not visible in this listing. */
198 spin_unlock(&req->rq_lock);
200 if (rc && req->rq_transno != 0) {
201 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206 /* Save a large LOV EA into the request buffer so that it is available
207 * for replay. We don't do this in the initial request because the
208 * original request doesn't need this buffer (at most it sends just the
209 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210 * buffer and may also be difficult to allocate and save a very large
211 * request buffer for each open. (bug 5707)
213 * OOM here may cause recovery failure if lmm is needed (only for the
214 * original open if the MDS crashed just when this client also OOM'd)
215 * but this is incredibly unlikely, and questionable whether the client
216 * could do MDS recovery under OOM anyways... */
217 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
218 struct mdt_body *body)
222 /* FIXME: remove this explicit offset. */
223 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
224 body->mbo_eadatasize)
/* On enlargement failure: log, then strip the EA from the body so the
 * replay path does not reference data we could not make room for. */
226 CERROR("Can't enlarge segment %d size to %d\n",
227 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
228 body->mbo_valid &= ~OBD_MD_FLEASIZE;
229 body->mbo_eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN enqueue request: collect conflicting locks
 * to cancel (open locks on the child fid2, UPDATE lock on the parent
 * fid1 for CREATE), allocate and size the request capsule, mark the
 * request replayable, then pack the intent opcode and the open body.
 * Returns the prepared request or an ERR_PTR on allocation failure.
 */
233 static struct ptlrpc_request *
234 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
235 struct md_op_data *op_data)
237 struct ptlrpc_request *req;
238 struct obd_device *obddev = class_exp2obd(exp);
239 struct ldlm_intent *lit;
240 const void *lmm = op_data->op_data;
241 __u32 lmmsize = op_data->op_data_size;
242 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Opens always operate on regular-file mode bits. */
248 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
250 /* XXX: openlock is not cancelled for cross-refs. */
251 /* If inode is known, cancel conflicting OPEN locks. */
252 if (fid_is_sane(&op_data->op_fid2)) {
253 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
254 if (it->it_flags & FMODE_WRITE)
259 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
262 else if (it->it_flags & FMODE_EXEC)
268 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
273 /* If CREATE, cancel parent's UPDATE lock. */
274 if (it->it_op & IT_CREAT)
278 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
280 MDS_INODELOCK_UPDATE);
282 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
283 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before bailing. */
285 ldlm_lock_list_put(&cancels, l_bl_ast, count);
286 RETURN(ERR_PTR(-ENOMEM));
289 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
290 op_data->op_namelen + 1);
/* Reserve at least the default MDS EA size even if the caller's
 * lmm is smaller, so replay has room (see mdc_realloc_openmsg). */
291 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
292 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
294 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
295 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
296 strlen(op_data->op_file_secctx_name) + 1 : 0);
298 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
299 op_data->op_file_secctx_size);
301 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
303 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import supports replay. */
307 spin_lock(&req->rq_lock);
308 req->rq_replay = req->rq_import->imp_replayable;
309 spin_unlock(&req->rq_lock);
311 /* pack the intent */
312 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
313 lit->opc = (__u64)it->it_op;
315 /* pack the intended request */
316 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
319 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
320 obddev->u.cli.cl_max_mds_easize);
321 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETXATTR enqueue request.  The reply buffers for
 * xattr names, values and value lengths are each sized to the maximum
 * EA size negotiated at connect time (ocd_max_easize).
 */
325 static struct ptlrpc_request *
326 mdc_intent_getxattr_pack(struct obd_export *exp,
327 struct lookup_intent *it,
328 struct md_op_data *op_data)
330 struct ptlrpc_request *req;
331 struct ldlm_intent *lit;
334 struct list_head cancels = LIST_HEAD_INIT(cancels);
338 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
339 &RQF_LDLM_INTENT_GETXATTR);
341 RETURN(ERR_PTR(-ENOMEM));
343 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
345 ptlrpc_request_free(req);
349 /* pack the intent */
350 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
351 lit->opc = IT_GETXATTR;
/* Server-advertised max EA size bounds every reply buffer below. */
353 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
355 /* pack the intended request */
356 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
359 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
360 RCL_SERVER, maxdata);
362 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
363 RCL_SERVER, maxdata);
365 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
366 RCL_SERVER, maxdata);
368 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK enqueue request: size the name field,
 * pack the intent opcode and the unlink body, and reserve the default
 * MDS EA size in the reply for any returned striping metadata.
 */
373 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
374 struct lookup_intent *it,
375 struct md_op_data *op_data)
377 struct ptlrpc_request *req;
378 struct obd_device *obddev = class_exp2obd(exp);
379 struct ldlm_intent *lit;
383 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
384 &RQF_LDLM_INTENT_UNLINK);
386 RETURN(ERR_PTR(-ENOMEM));
388 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
389 op_data->op_namelen + 1);
391 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
393 ptlrpc_request_free(req);
397 /* pack the intent */
398 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
399 lit->opc = (__u64)it->it_op;
401 /* pack the intended request */
402 mdc_unlink_pack(req, op_data);
404 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
405 obddev->u.cli.cl_default_mds_easize);
406 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR enqueue request asking for the standard
 * attribute set (getattr, EA size, mode/EA, dir EA, MEA, ACL).  The
 * reply MD buffer is sized to the default MDS EA size when one is
 * configured, otherwise to the maximum.
 */
410 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
411 struct lookup_intent *it,
412 struct md_op_data *op_data)
414 struct ptlrpc_request *req;
415 struct obd_device *obddev = class_exp2obd(exp);
416 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
417 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
418 OBD_MD_MEA | OBD_MD_FLACL;
419 struct ldlm_intent *lit;
424 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
425 &RQF_LDLM_INTENT_GETATTR);
427 RETURN(ERR_PTR(-ENOMEM));
429 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
430 op_data->op_namelen + 1);
432 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
434 ptlrpc_request_free(req);
438 /* pack the intent */
439 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
440 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the hard maximum. */
442 if (obddev->u.cli.cl_default_mds_easize > 0)
443 easize = obddev->u.cli.cl_default_mds_easize;
445 easize = obddev->u.cli.cl_max_mds_easize;
447 /* pack the intended request */
448 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
450 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
451 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT enqueue request.  The layout intent is
 * packed with the generic ACCESS opcode; the reply LVB buffer is sized
 * to the default MDS EA size to hold the returned layout.
 */
455 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
456 struct lookup_intent *it,
457 struct md_op_data *unused)
459 struct obd_device *obd = class_exp2obd(exp);
460 struct ptlrpc_request *req;
461 struct ldlm_intent *lit;
462 struct layout_intent *layout;
466 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467 &RQF_LDLM_INTENT_LAYOUT);
469 RETURN(ERR_PTR(-ENOMEM));
/* No client-side EA payload is sent with a layout intent. */
471 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
472 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
474 ptlrpc_request_free(req);
478 /* pack the intent */
479 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
480 lit->opc = (__u64)it->it_op;
482 /* pack the layout intent request */
483 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
484 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
485 * set for replication */
486 layout->li_opc = LAYOUT_INTENT_ACCESS;
488 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
489 obd->u.cli.cl_default_mds_easize);
490 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM enqueue request with a server-side
 * LVB buffer of @lvb_len bytes.  Used e.g. for IT_READDIR, which needs
 * no intent record.
 */
494 static struct ptlrpc_request *
495 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
497 struct ptlrpc_request *req;
501 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
503 RETURN(ERR_PTR(-ENOMEM));
505 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
507 ptlrpc_request_free(req);
511 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
512 ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue: fix up the lock/handle state
 * (aborted vs. granted, server-adjusted mode), copy the intent
 * disposition and status out of the DLM reply into @it, manage the
 * replay flag for failed opens, validate/extract any returned EA data,
 * and for layout intents install the returned layout LVB on the lock.
 */
516 static int mdc_finish_enqueue(struct obd_export *exp,
517 struct ptlrpc_request *req,
518 struct ldlm_enqueue_info *einfo,
519 struct lookup_intent *it,
520 struct lustre_handle *lockh,
523 struct req_capsule *pill = &req->rq_pill;
524 struct ldlm_request *lockreq;
525 struct ldlm_reply *lockrep;
526 struct ldlm_lock *lock;
527 void *lvb_data = NULL;
532 /* Similarly, if we're going to replay this request, we don't want to
533 * actually get a lock, just perform the intent. */
534 if (req->rq_transno || req->rq_replay) {
535 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
536 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Aborted enqueue: intent executed but no lock was granted; clear the
 * handle so callers do not treat it as valid. */
539 if (rc == ELDLM_LOCK_ABORTED) {
541 memset(lockh, 0, sizeof(*lockh));
543 } else { /* rc = 0 */
544 lock = ldlm_handle2lock(lockh);
545 LASSERT(lock != NULL);
547 /* If the server gave us back a different lock mode, we should
548 * fix up our variables. */
549 if (lock->l_req_mode != einfo->ei_mode) {
550 ldlm_lock_addref(lockh, lock->l_req_mode);
551 ldlm_lock_decref(lockh, einfo->ei_mode);
552 einfo->ei_mode = lock->l_req_mode;
557 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
558 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Mirror the server's intent verdict into the lookup_intent. */
560 it->it_disposition = (int)lockrep->lock_policy_res1;
561 it->it_status = (int)lockrep->lock_policy_res2;
562 it->it_lock_mode = einfo->ei_mode;
563 it->it_lock_handle = lockh->cookie;
564 it->it_request = req;
566 /* Technically speaking rq_transno must already be zero if
567 * it_status is in error, so the check is a bit redundant */
568 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
569 mdc_clear_replay_flag(req, it->it_status);
571 /* If we're doing an IT_OPEN which did not result in an actual
572 * successful open, then we need to remove the bit which saves
573 * this request for unconditional replay.
575 * It's important that we do this first! Otherwise we might exit the
576 * function without doing so, and try to replay a failed create
578 if (it->it_op & IT_OPEN && req->rq_replay &&
579 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
580 mdc_clear_replay_flag(req, it->it_status);
582 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
583 it->it_op, it->it_disposition, it->it_status);
585 /* We know what to expect, so we do any byte flipping required here */
586 if (it_has_reply_body(it)) {
587 struct mdt_body *body;
589 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
591 CERROR ("Can't swab mdt_body\n");
595 if (it_disposition(it, DISP_OPEN_OPEN) &&
596 !it_open_error(DISP_OPEN_OPEN, it)) {
598 * If this is a successful OPEN request, we need to set
599 * replay handler and data early, so that if replay
600 * happens immediately after swabbing below, new reply
601 * is swabbed by that handler correctly.
603 mdc_set_open_replay_data(NULL, NULL, it);
606 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
609 mdc_update_max_ea_from_body(exp, body);
612 * The eadata is opaque; just check that it is there.
613 * Eventually, obd_unpackmd() will check the contents.
615 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
616 body->mbo_eadatasize);
620 /* save lvb data and length in case this is for layout
623 lvb_len = body->mbo_eadatasize;
626 * We save the reply LOV EA in case we have to replay a
627 * create for recovery. If we didn't allocate a large
628 * enough request buffer above we need to reallocate it
629 * here to hold the actual LOV EA.
631 * To not save LOV EA if request is not going to replay
632 * (for example error one).
634 if ((it->it_op & IT_OPEN) && req->rq_replay) {
636 if (req_capsule_get_size(pill, &RMF_EADATA,
638 body->mbo_eadatasize)
639 mdc_realloc_openmsg(req, body);
641 req_capsule_shrink(pill, &RMF_EADATA,
642 body->mbo_eadatasize,
645 req_capsule_set_size(pill, &RMF_EADATA,
647 body->mbo_eadatasize);
649 lmm = req_capsule_client_get(pill, &RMF_EADATA);
652 body->mbo_eadatasize);
655 } else if (it->it_op & IT_LAYOUT) {
656 /* maybe the lock was granted right away and layout
657 * is packed into RMF_DLM_LVB of req */
658 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
660 lvb_data = req_capsule_server_sized_get(pill,
661 &RMF_DLM_LVB, lvb_len);
662 if (lvb_data == NULL)
667 /* fill in stripe data for layout lock.
668 * LU-6581: trust layout data only if layout lock is granted. The MDT
669 * has stopped sending layout unless the layout lock is granted. The
670 * client still does this checking in case it's talking with an old
671 * server. - Jinshan */
672 lock = ldlm_handle2lock(lockh);
673 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
674 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
677 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
678 ldlm_it2str(it->it_op), lvb_len);
680 OBD_ALLOC_LARGE(lmm, lvb_len);
685 memcpy(lmm, lvb_data, lvb_len);
687 /* install lvb_data */
688 lock_res_and_lock(lock);
689 if (lock->l_lvb_data == NULL) {
690 lock->l_lvb_type = LVB_T_LAYOUT;
691 lock->l_lvb_data = lmm;
692 lock->l_lvb_len = lvb_len;
/* Someone else installed an LVB first; free our copy. */
695 unlock_res_and_lock(lock);
697 OBD_FREE_LARGE(lmm, lvb_len);
705 /* We always reserve enough space in the reply packet for a stripe MD, because
706 * we don't know in advance the file type. */
/*
 * Core intent-enqueue path: choose the inodebits policy implied by the
 * intent op, pack the matching request type (open/unlink/getattr/
 * readdir/layout/getxattr, or a plain FLOCK enqueue when @it is NULL),
 * take the modify-RPC and request slots, issue ldlm_cli_enqueue(), and
 * hand the reply to mdc_finish_enqueue().  -EINPROGRESS replies are
 * retried (presumably via a resend loop whose top is not visible in
 * this listing), abandoning the retry if the import generation changed
 * (eviction).  On failure any acquired lock reference is dropped and
 * the intent's lock fields are reset.
 */
707 static int mdc_enqueue_base(struct obd_export *exp,
708 struct ldlm_enqueue_info *einfo,
709 const union ldlm_policy_data *policy,
710 struct lookup_intent *it,
711 struct md_op_data *op_data,
712 struct lustre_handle *lockh,
713 __u64 extra_lock_flags)
715 struct obd_device *obddev = class_exp2obd(exp);
716 struct ptlrpc_request *req = NULL;
717 __u64 flags, saved_flags = extra_lock_flags;
718 struct ldlm_res_id res_id;
719 static const union ldlm_policy_data lookup_policy = {
720 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
721 static const union ldlm_policy_data update_policy = {
722 .l_inodebits = { MDS_INODELOCK_UPDATE } };
723 static const union ldlm_policy_data layout_policy = {
724 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
725 static const union ldlm_policy_data getxattr_policy = {
726 .l_inodebits = { MDS_INODELOCK_XATTR } };
727 int generation, resends = 0;
728 struct ldlm_reply *lockrep;
729 enum lvb_type lvb_type = 0;
/* Intent enqueues are only meaningful for inodebits locks. */
733 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
735 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived here — callers must not pass
 * their own. */
738 LASSERT(policy == NULL);
740 saved_flags |= LDLM_FL_HAS_INTENT;
741 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
742 policy = &update_policy;
743 else if (it->it_op & IT_LAYOUT)
744 policy = &layout_policy;
745 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
746 policy = &getxattr_policy;
748 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction. */
751 generation = obddev->u.cli.cl_import->imp_generation;
755 /* The only way right now is FLOCK. */
756 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
758 res_id.name[3] = LDLM_FLOCK;
759 } else if (it->it_op & IT_OPEN) {
760 req = mdc_intent_open_pack(exp, it, op_data);
761 } else if (it->it_op & IT_UNLINK) {
762 req = mdc_intent_unlink_pack(exp, it, op_data);
763 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
764 req = mdc_intent_getattr_pack(exp, it, op_data);
765 } else if (it->it_op & IT_READDIR) {
766 req = mdc_enqueue_pack(exp, 0);
767 } else if (it->it_op & IT_LAYOUT) {
768 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
770 req = mdc_intent_layout_pack(exp, it, op_data);
771 lvb_type = LVB_T_LAYOUT;
772 } else if (it->it_op & IT_GETXATTR) {
773 req = mdc_intent_getxattr_pack(exp, it, op_data);
780 RETURN(PTR_ERR(req));
/* Pin the request to this import generation for resend tracking. */
783 req->rq_generation_set = 1;
784 req->rq_import_generation = generation;
785 req->rq_sent = cfs_time_current_sec() + resends;
788 /* It is important to obtain modify RPC slot first (if applicable), so
789 * that threads that are waiting for a modify RPC slot are not polluting
790 * our rpcs in flight counter.
791 * We do not do flock request limiting, though */
793 mdc_get_mod_rpc_slot(req, it);
794 rc = obd_get_request_slot(&obddev->u.cli);
796 mdc_put_mod_rpc_slot(req, it);
797 mdc_clear_replay_flag(req, 0);
798 ptlrpc_req_finished(req);
803 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
804 0, lvb_type, lockh, 0);
806 /* For flock requests we immediatelly return without further
807 delay and let caller deal with the rest, since rest of
808 this function metadata processing makes no sense for flock
809 requests anyway. But in case of problem during comms with
810 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
811 can not rely on caller and this mainly for F_UNLCKs
812 (explicits or automatically generated by Kernel to clean
813 current FLocks upon exit) that can't be trashed */
814 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
815 (einfo->ei_type == LDLM_FLOCK) &&
816 (einfo->ei_mode == LCK_NL))
/* Release slots in the reverse order they were taken. */
821 obd_put_request_slot(&obddev->u.cli);
822 mdc_put_mod_rpc_slot(req, it);
825 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
826 obddev->obd_name, rc);
828 mdc_clear_replay_flag(req, rc);
829 ptlrpc_req_finished(req);
833 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
834 LASSERT(lockrep != NULL);
/* Wire status is network-byte-order errno; convert before comparing. */
836 lockrep->lock_policy_res2 =
837 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
839 /* Retry infinitely when the server returns -EINPROGRESS for the
840 * intent operation, when server returns -EINPROGRESS for acquiring
841 * intent lock, we'll retry in after_reply(). */
842 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
843 mdc_clear_replay_flag(req, rc);
844 ptlrpc_req_finished(req);
847 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
848 obddev->obd_name, resends, it->it_op,
849 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
851 if (generation == obddev->u.cli.cl_import->imp_generation) {
854 CDEBUG(D_HA, "resend cross eviction\n");
859 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed after a lock may have been granted: drop it and clear
 * every lock reference the intent holds. */
861 if (lustre_handle_is_used(lockh)) {
862 ldlm_lock_decref(lockh, einfo->ei_mode);
863 memset(lockh, 0, sizeof(*lockh));
865 ptlrpc_req_finished(req);
867 it->it_lock_handle = 0;
868 it->it_lock_mode = 0;
869 it->it_request = NULL;
/*
 * Public intent-less enqueue wrapper: forwards to mdc_enqueue_base()
 * with a NULL lookup_intent (used e.g. for flock enqueues).
 */
875 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
876 const union ldlm_policy_data *policy,
877 struct md_op_data *op_data,
878 struct lustre_handle *lockh, __u64 extra_lock_flags)
880 return mdc_enqueue_base(exp, einfo, policy, NULL,
881 op_data, lockh, extra_lock_flags);
/*
 * Translate a finished intent enqueue into VFS-visible state: map the
 * dispositions/status to an error code, add extra request references
 * for successful CREATE/OPEN phases (released later by llite), and if
 * an equivalent lock already exists, cancel the newly granted one and
 * reuse the old handle.
 */
884 static int mdc_finish_intent_lock(struct obd_export *exp,
885 struct ptlrpc_request *request,
886 struct md_op_data *op_data,
887 struct lookup_intent *it,
888 struct lustre_handle *lockh)
890 struct lustre_handle old_lock;
891 struct ldlm_lock *lock;
895 LASSERT(request != NULL);
896 LASSERT(request != LP_POISON);
897 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR intents carry no disposition processing here. */
899 if (it->it_op & IT_READDIR)
902 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
903 if (it->it_status != 0)
904 GOTO(out, rc = it->it_status);
906 if (!it_disposition(it, DISP_IT_EXECD)) {
907 /* The server failed before it even started executing
908 * the intent, i.e. because it couldn't unpack the
911 LASSERT(it->it_status != 0);
912 GOTO(out, rc = it->it_status);
914 rc = it_open_error(DISP_IT_EXECD, it);
918 rc = it_open_error(DISP_LOOKUP_EXECD, it);
922 /* keep requests around for the multiple phases of the call
923 * this shows the DISP_XX must guarantee we make it into the
926 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
927 it_disposition(it, DISP_OPEN_CREATE) &&
928 !it_open_error(DISP_OPEN_CREATE, it)) {
929 it_set_disposition(it, DISP_ENQ_CREATE_REF);
930 /* balanced in ll_create_node */
931 ptlrpc_request_addref(request);
933 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
934 it_disposition(it, DISP_OPEN_OPEN) &&
935 !it_open_error(DISP_OPEN_OPEN, it)) {
936 it_set_disposition(it, DISP_ENQ_OPEN_REF);
937 /* balanced in ll_file_open */
938 ptlrpc_request_addref(request);
939 /* BUG 11546 - eviction in the middle of open rpc
942 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
946 if (it->it_op & IT_CREAT) {
947 /* XXX this belongs in ll_create_it */
948 } else if (it->it_op == IT_OPEN) {
949 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
951 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
955 /* If we already have a matching lock, then cancel the new
956 * one. We have to set the data here instead of in
957 * mdc_enqueue, because we need to use the child's inode as
958 * the l_ast_data to match, and that's not available until
959 * intent_finish has performed the iget().) */
960 lock = ldlm_handle2lock(lockh)
962 union ldlm_policy_data policy = lock->l_policy_data;
963 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the reply body's fid must name the lock's resource. */
965 if (it_has_reply_body(it)) {
966 struct mdt_body *body;
968 body = req_capsule_server_get(&request->rq_pill,
970 /* mdc_enqueue checked */
971 LASSERT(body != NULL);
972 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
973 &lock->l_resource->lr_name),
974 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
975 PLDLMRES(lock->l_resource),
976 PFID(&body->mbo_fid1));
/* LCK_NL match finds any existing granted lock on the same resource;
 * prefer it and cancel the duplicate we just acquired. */
980 memcpy(&old_lock, lockh, sizeof(*lockh));
981 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
982 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
983 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
984 memcpy(lockh, &old_lock, sizeof(old_lock));
985 it->it_lock_handle = lockh->cookie;
991 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
992 (int)op_data->op_namelen, op_data->op_name,
993 ldlm_it2str(it->it_op), it->it_status,
994 it->it_disposition, rc);
/*
 * Check whether a lock we presumably already hold still covers @fid for
 * the given intent.  If @it carries a lock handle, revalidate it
 * directly; otherwise match against the namespace using the inodebits
 * the intent needs.  On success the intent's lock handle/mode are
 * refreshed; on failure they are cleared.
 */
998 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
999 struct lu_fid *fid, __u64 *bits)
1001 /* We could just return 1 immediately, but since we should only
1002 * be called in revalidate_it if we already have a lock, let's
1004 struct ldlm_res_id res_id;
1005 struct lustre_handle lockh;
1006 union ldlm_policy_data policy;
1007 enum ldlm_mode mode;
1010 if (it->it_lock_handle) {
1011 lockh.cookie = it->it_lock_handle;
1012 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1014 fid_build_reg_res_name(fid, &res_id);
1015 switch (it->it_op) {
1017 /* File attributes are held under multiple bits:
1018 * nlink is under lookup lock, size and times are
1019 * under UPDATE lock and recently we've also got
1020 * a separate permissions lock for owner/group/acl that
1021 * were protected by lookup lock before.
1022 * Getattr must provide all of that information,
1023 * so we need to ensure we have all of those locks.
1024 * Unfortunately, if the bits are split across multiple
1025 * locks, there's no easy way to match all of them here,
1026 * so an extra RPC would be performed to fetch all
1027 * of those bits at once for now. */
1028 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1029 * but for old MDTs (< 2.4), permission is covered
1030 * by LOOKUP lock, so it needs to match all bits here.*/
1031 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1032 MDS_INODELOCK_LOOKUP |
1036 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1039 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1042 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any read or write mode already granted. */
1046 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1047 LDLM_IBITS, &policy,
1048 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1053 it->it_lock_handle = lockh.cookie;
1054 it->it_lock_mode = mode;
1056 it->it_lock_handle = 0;
1057 it->it_lock_mode = 0;
1064 * This long block is all about fixing up the lock and request state
1065 * so that it is correct as of the moment _before_ the operation was
1066 * applied; that way, the VFS will think that everything is normal and
1067 * call Lustre's regular VFS methods.
1069 * If we're performing a creation, that means that unless the creation
1070 * failed with EEXIST, we should fake up a negative dentry.
1072 * For everything else, we want to lookup to succeed.
1074 * One additional note: if CREATE or OPEN succeeded, we add an extra
1075 * reference to the request because we need to keep it around until
1076 * ll_create/ll_open gets called.
1078 * The server will return to us, in it_disposition, an indication of
1079 * exactly what it_status refers to.
1081 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1082 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1083 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1084 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1087 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * Top-level intent-lock entry point (see the long comment above for the
 * full state-fixup contract): try to revalidate an existing lock for
 * LOOKUP/GETATTR/READDIR on a known fid, allocate fid2 for CREATE when
 * the caller did not, then run the enqueue and finish-intent steps.
 * The resulting request is returned via @reqp.
 */
1090 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1091 struct lookup_intent *it, struct ptlrpc_request **reqp,
1092 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1094 struct ldlm_enqueue_info einfo = {
1095 .ei_type = LDLM_IBITS,
1096 .ei_mode = it_to_lock_mode(it),
1097 .ei_cb_bl = cb_blocking,
1098 .ei_cb_cp = ldlm_completion_ast,
1100 struct lustre_handle lockh;
1105 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1106 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1107 op_data->op_name, PFID(&op_data->op_fid2),
1108 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Fast path: an existing lock on the known child fid may satisfy the
 * intent without any RPC. */
1112 if (fid_is_sane(&op_data->op_fid2) &&
1113 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1114 /* We could just return 1 immediately, but since we should only
1115 * be called in revalidate_it if we already have a lock, let's
1117 it->it_lock_handle = 0;
1118 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1119 /* Only return failure if it was not GETATTR by cfid
1120 (from inode_revalidate) */
1121 if (rc || op_data->op_namelen != 0)
1125 /* For case if upper layer did not alloc fid, do it now. */
1126 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1127 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1129 CERROR("Can't alloc new fid, rc %d\n", rc);
1134 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1139 *reqp = it->it_request;
1140 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Completion callback for the async getattr enqueue started by
 * mdc_intent_getattr_async(): release the request slot, finalize the
 * enqueue (ldlm_cli_enqueue_fini), convert the wire status, run the
 * usual finish-enqueue/finish-intent processing, and finally invoke the
 * caller's mi_cb with the result.
 */
1144 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1145 struct ptlrpc_request *req,
1148 struct mdc_getattr_args *ga = args;
1149 struct obd_export *exp = ga->ga_exp;
1150 struct md_enqueue_info *minfo = ga->ga_minfo;
1151 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1152 struct lookup_intent *it;
1153 struct lustre_handle *lockh;
1154 struct obd_device *obddev;
1155 struct ldlm_reply *lockrep;
1156 __u64 flags = LDLM_FL_HAS_INTENT;
1160 lockh = &minfo->mi_lockh;
1162 obddev = class_exp2obd(exp);
/* The slot was taken in mdc_intent_getattr_async(); release it now
 * that the RPC has completed. */
1164 obd_put_request_slot(&obddev->u.cli);
1165 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1168 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1169 &flags, NULL, 0, lockh, rc);
1171 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1172 mdc_clear_replay_flag(req, rc);
1176 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1177 LASSERT(lockrep != NULL);
1179 lockrep->lock_policy_res2 =
1180 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1182 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1186 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Deliver the final result to the async caller. */
1190 minfo->mi_cb(req, minfo, rc);
1194 int mdc_intent_getattr_async(struct obd_export *exp,
1195 struct md_enqueue_info *minfo)
1197 struct md_op_data *op_data = &minfo->mi_data;
1198 struct lookup_intent *it = &minfo->mi_it;
1199 struct ptlrpc_request *req;
1200 struct mdc_getattr_args *ga;
1201 struct obd_device *obddev = class_exp2obd(exp);
1202 struct ldlm_res_id res_id;
1203 union ldlm_policy_data policy = {
1204 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1205 MDS_INODELOCK_UPDATE } };
1207 __u64 flags = LDLM_FL_HAS_INTENT;
1210 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1211 (int)op_data->op_namelen, op_data->op_name,
1212 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1214 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1215 req = mdc_intent_getattr_pack(exp, it, op_data);
1217 RETURN(PTR_ERR(req));
1219 rc = obd_get_request_slot(&obddev->u.cli);
1221 ptlrpc_req_finished(req);
1225 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1226 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1228 obd_put_request_slot(&obddev->u.cli);
1229 ptlrpc_req_finished(req);
1233 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1234 ga = ptlrpc_req_async_args(req);
1236 ga->ga_minfo = minfo;
1238 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1239 ptlrpcd_add_req(req);