4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
47 #include "mdc_internal.h"
/*
 * Per-request arguments of the asynchronous getattr path.
 * mdc_intent_getattr_async() obtains this structure from the async args of
 * the request and mdc_intent_getattr_async_interpret() reads it back.
 */
49 struct mdc_getattr_args {
/* export read back by the interpret callback */
50 struct obd_export *ga_exp;
/* enqueue info of the caller (intent, lock handle, completion callback) */
51 struct md_enqueue_info *ga_minfo;
/*
 * Inspect the dispositions recorded in @it, in the order DISP_OPEN_LEASE,
 * DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD, and
 * compare @phase against each disposition that is set.  When execution
 * reaches the end of the chain, the raw disposition and status of @it are
 * logged with CERROR.  The symbol is exported to other modules.
 *
 * NOTE(review): the statements guarded by each phase comparison and the
 * return statements are not visible in this listing; presumably they yield
 * it->it_status or 0 -- confirm against the complete source.
 */
54 int it_open_error(int phase, struct lookup_intent *it)
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
90 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95 EXPORT_SYMBOL(it_open_error);
97 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Record @data (used here as a struct inode pointer) in lr_lvb_inode of the
 * resource behind @lockh, and report the inodebits of the lock via @bits.
 *
 * The handle is first tested with lustre_handle_is_used().  The lock is
 * then looked up and asserted to be non-NULL, and the resource is updated
 * between lock_res_and_lock() and unlock_res_and_lock().  If the resource
 * already refers to a different inode, that inode is asserted to carry
 * I_FREEING in i_state.
 *
 * NOTE(review): the handling of an unused handle, any guard around the
 * store to *bits, the release of the lock reference and the return value
 * are not visible in this listing -- confirm against the complete source.
 */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99 void *data, __u64 *bits)
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
108 if (!lustre_handle_is_used(lockh))
111 lock = ldlm_handle2lock(lockh);
113 LASSERT(lock != NULL);
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/*
 * Match an existing lock on the resource named by @fid.  The resource name
 * is built with fid_build_reg_res_name() and the request is forwarded to
 * ldlm_lock_match() on the namespace of the obd device behind @exp, passing
 * @flags, @type, @policy, @mode and @lockh through.
 *
 * Side effect: the inodebits in @policy are masked in place with
 * exp_connect_ibits(exp) before matching.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the result of ldlm_lock_match() is returned -- confirm.
 */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh, 0);
/*
 * Forward a cancel-unused request for the resource named by @fid to
 * ldlm_cli_cancel_unused_resource() on the namespace of the obd device
 * behind @exp.  @policy, @mode, @flags and @opaque are passed through
 * unchanged.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably rc from the ldlm call is returned -- confirm.
 */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/*
 * Clear the lr_lvb_inode pointer of the resource named by @fid in the
 * namespace of the obd device behind @exp.  The namespace is asserted to be
 * non-NULL, the resource is obtained with ldlm_resource_get() and its
 * reference is dropped again with ldlm_resource_putref().
 *
 * NOTE(review): any check of the ldlm_resource_get() result, any locking
 * around the store and the return value are not visible in this listing.
 */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
188 ldlm_resource_putref(res);
/*
 * Helper used on error and resend paths of this file for requests that
 * should not be kept for replay.  When rq_replay is set, rq_lock is taken
 * and released around an update of the request.  In addition, a non-zero
 * @rc combined with a non-zero rq_transno is reported through DEBUG_REQ at
 * D_ERROR level.
 *
 * NOTE(review): the statement between spin_lock() and spin_unlock() is not
 * visible in this listing; presumably it clears rq_replay -- confirm.
 */
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 /* Don't hold error requests for replay. */
195 if (req->rq_replay) {
196 spin_lock(&req->rq_lock);
198 spin_unlock(&req->rq_lock);
200 if (rc && req->rq_transno != 0) {
201 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206 /* Save a large LOV EA into the request buffer so that it is available
207 * for replay. We don't do this in the initial request because the
208 * original request doesn't need this buffer (at most it sends just the
209 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210 * buffer and may also be difficult to allocate and save a very large
211 * request buffer for each open. (bug 5707)
213 * OOM here may cause recovery failure if lmm is needed (only for the
214 * original open if the MDS crashed just when this client also OOM'd)
215 * but this is incredibly unlikely, and questionable whether the client
216 * could do MDS recovery under OOM anyways... */
/*
 * @req:   request whose client-side buffer receives the copy
 * @field: capsule field that holds the EA
 * @data:  source bytes
 * @size:  number of bytes to copy
 *
 * If the current client-side size of @field is smaller than @size, the
 * request buffer is enlarged with sptlrpc_cli_enlarge_reqbuf() and a
 * failure is logged with CERROR.  req_capsule_shrink() and
 * req_capsule_set_size() adjust the field to @size, and @data is then
 * copied into the field buffer.
 *
 * NOTE(review): the branch structure around the enlarge and shrink calls,
 * the check of the buffer pointer and the return value are not visible in
 * this listing -- confirm against the complete source.
 */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218 const struct req_msg_field *field,
219 void *data, u32 size)
221 struct req_capsule *pill = &req->rq_pill;
225 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229 req->rq_export->exp_obd->obd_name,
234 req_capsule_shrink(pill, field, size, RCL_CLIENT);
237 req_capsule_set_size(pill, field, RCL_CLIENT, size);
238 lmm = req_capsule_client_get(pill, field);
240 memcpy(lmm, data, size);
/*
 * Allocate and pack an RQF_LDLM_INTENT_OPEN request for @it.
 *
 * Visible steps:
 *  - force the file type bits of it_create_mode to S_IFREG;
 *  - when op_fid2 is sane, collect unused locks on it into a local cancel
 *    list (the selection depends on MDS_OPEN_LEASE, FMODE_WRITE,
 *    MDS_OPEN_TRUNC and FMODE_EXEC in it_flags);
 *  - for IT_CREAT, also collect unused MDS_INODELOCK_UPDATE locks on
 *    op_fid1;
 *  - size the client fields RMF_NAME, RMF_EADATA (the larger of the supplied
 *    EA size and cl_default_mds_easize), RMF_FILE_SECCTX_NAME and
 *    RMF_FILE_SECCTX;
 *  - hand the cancel list to ldlm_prep_enqueue_req();
 *  - set rq_replay from imp_replayable under rq_lock;
 *  - pack the intent opcode and the open body, and reserve
 *    cl_max_mds_easize bytes for RMF_MDT_MD in the reply.
 *
 * ERR_PTR(-ENOMEM) is returned on the allocation failure path, after the
 * collected locks are released with ldlm_lock_list_put().
 *
 * NOTE(review): the lock modes and bits chosen for cancellation, the
 * conditions guarding the error paths and the final return are not visible
 * in this listing -- confirm against the complete source.
 */
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247 struct md_op_data *op_data)
249 struct ptlrpc_request *req;
250 struct obd_device *obddev = class_exp2obd(exp);
251 struct ldlm_intent *lit;
252 const void *lmm = op_data->op_data;
253 __u32 lmmsize = op_data->op_data_size;
254 struct list_head cancels = LIST_HEAD_INIT(cancels);
260 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262 /* XXX: openlock is not cancelled for cross-refs. */
263 /* If inode is known, cancel conflicting OPEN locks. */
264 if (fid_is_sane(&op_data->op_fid2)) {
265 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266 if (it->it_flags & FMODE_WRITE)
271 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
274 else if (it->it_flags & FMODE_EXEC)
280 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
285 /* If CREATE, cancel parent's UPDATE lock. */
286 if (it->it_op & IT_CREAT)
290 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
292 MDS_INODELOCK_UPDATE);
294 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295 &RQF_LDLM_INTENT_OPEN);
297 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298 RETURN(ERR_PTR(-ENOMEM));
301 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302 op_data->op_namelen + 1);
303 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
306 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308 strlen(op_data->op_file_secctx_name) + 1 : 0);
310 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311 op_data->op_file_secctx_size);
313 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
315 ptlrpc_request_free(req);
319 spin_lock(&req->rq_lock);
320 req->rq_replay = req->rq_import->imp_replayable;
321 spin_unlock(&req->rq_lock);
323 /* pack the intent */
324 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325 lit->opc = (__u64)it->it_op;
327 /* pack the intended request */
328 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
331 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332 obddev->u.cli.cl_max_mds_easize);
333 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an RQF_LDLM_INTENT_GETXATTR request.
 *
 * The intent opcode is set to IT_GETXATTR.  The maximum EA size advertised
 * in the connect data of the import (ocd_max_easize) is passed to
 * mdc_pack_body() and is also reserved for each of the reply fields
 * RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS.
 *
 * ERR_PTR(-ENOMEM) is returned on the allocation failure path; the request
 * is freed on the ldlm_prep_enqueue_req() failure path.
 *
 * NOTE(review): the declarations of rc, count and maxdata, the conditions
 * guarding the error paths and the final return are not visible in this
 * listing -- confirm against the complete source.
 */
337 static struct ptlrpc_request *
338 mdc_intent_getxattr_pack(struct obd_export *exp,
339 struct lookup_intent *it,
340 struct md_op_data *op_data)
342 struct ptlrpc_request *req;
343 struct ldlm_intent *lit;
346 struct list_head cancels = LIST_HEAD_INIT(cancels);
350 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
351 &RQF_LDLM_INTENT_GETXATTR);
353 RETURN(ERR_PTR(-ENOMEM));
355 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
357 ptlrpc_request_free(req);
361 /* pack the intent */
362 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
363 lit->opc = IT_GETXATTR;
365 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
367 /* pack the intended request */
368 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
371 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
372 RCL_SERVER, maxdata);
374 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
375 RCL_SERVER, maxdata);
377 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
378 RCL_SERVER, maxdata);
380 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an RQF_LDLM_INTENT_UNLINK request.
 *
 * RMF_NAME is sized to op_namelen + 1 on the client side, the intent opcode
 * is taken from it->it_op, the body is packed by mdc_unlink_pack() and
 * cl_default_mds_easize bytes are reserved for RMF_MDT_MD in the reply.
 * No cancel list is passed to ldlm_prep_enqueue_req().
 *
 * ERR_PTR(-ENOMEM) is returned on the allocation failure path; the request
 * is freed on the ldlm_prep_enqueue_req() failure path.
 *
 * NOTE(review): the conditions guarding the error paths and the final
 * return are not visible in this listing.
 */
385 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
386 struct lookup_intent *it,
387 struct md_op_data *op_data)
389 struct ptlrpc_request *req;
390 struct obd_device *obddev = class_exp2obd(exp);
391 struct ldlm_intent *lit;
395 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
396 &RQF_LDLM_INTENT_UNLINK);
398 RETURN(ERR_PTR(-ENOMEM));
400 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
401 op_data->op_namelen + 1);
403 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
405 ptlrpc_request_free(req);
409 /* pack the intent */
410 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
411 lit->opc = (__u64)it->it_op;
413 /* pack the intended request */
414 mdc_unlink_pack(req, op_data);
416 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
417 obddev->u.cli.cl_default_mds_easize);
418 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an RQF_LDLM_INTENT_GETATTR request.  Used by
 * mdc_enqueue_base() for IT_GETATTR and IT_LOOKUP intents and by
 * mdc_intent_getattr_async().
 *
 * The body asks for the attribute set collected in the local valid mask
 * (getattr, EA size, mode EA size, directory EA, MEA and ACL flags).  The
 * reply EA size is cl_default_mds_easize when that is greater than zero;
 * cl_max_mds_easize is the other value assigned.  The same size is passed
 * to mdc_getattr_pack() and reserved for RMF_MDT_MD in the reply.
 *
 * ERR_PTR(-ENOMEM) is returned on the allocation failure path; the request
 * is freed on the ldlm_prep_enqueue_req() failure path.
 *
 * NOTE(review): the else line between the two easize assignments, the
 * conditions guarding the error paths and the final return are not visible
 * in this listing -- confirm against the complete source.
 */
422 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
423 struct lookup_intent *it,
424 struct md_op_data *op_data)
426 struct ptlrpc_request *req;
427 struct obd_device *obddev = class_exp2obd(exp);
428 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
429 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
430 OBD_MD_MEA | OBD_MD_FLACL;
431 struct ldlm_intent *lit;
436 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
437 &RQF_LDLM_INTENT_GETATTR);
439 RETURN(ERR_PTR(-ENOMEM));
441 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
442 op_data->op_namelen + 1);
444 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
446 ptlrpc_request_free(req);
450 /* pack the intent */
451 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
452 lit->opc = (__u64)it->it_op;
454 if (obddev->u.cli.cl_default_mds_easize > 0)
455 easize = obddev->u.cli.cl_default_mds_easize;
457 easize = obddev->u.cli.cl_max_mds_easize;
459 /* pack the intended request */
460 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
462 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
463 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an RQF_LDLM_INTENT_LAYOUT request.
 *
 * The client-side RMF_EADATA field is sized to zero.  The caller must
 * supply a struct layout_intent in op_data->op_data: both the pointer and
 * the size are asserted, and the structure is copied into the
 * RMF_LAYOUT_INTENT field.  cl_default_mds_easize bytes are reserved for
 * RMF_DLM_LVB in the reply.
 *
 * ERR_PTR(-ENOMEM) is returned on the allocation failure path; the request
 * is freed on the ldlm_prep_enqueue_req() failure path.
 *
 * NOTE(review): the conditions guarding the error paths and the final
 * return are not visible in this listing.
 */
467 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
468 struct lookup_intent *it,
469 struct md_op_data *op_data)
471 struct obd_device *obd = class_exp2obd(exp);
472 struct ptlrpc_request *req;
473 struct ldlm_intent *lit;
474 struct layout_intent *layout;
478 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
479 &RQF_LDLM_INTENT_LAYOUT);
481 RETURN(ERR_PTR(-ENOMEM));
483 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
484 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
486 ptlrpc_request_free(req);
490 /* pack the intent */
491 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
492 lit->opc = (__u64)it->it_op;
494 /* pack the layout intent request */
495 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
496 LASSERT(op_data->op_data != NULL);
497 LASSERT(op_data->op_data_size == sizeof(*layout));
498 memcpy(layout, op_data->op_data, sizeof(*layout));
500 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
501 obd->u.cli.cl_default_mds_easize);
502 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack a plain RQF_LDLM_ENQUEUE request (no intent buffer) and
 * reserve @lvb_len bytes for RMF_DLM_LVB in the reply.  mdc_enqueue_base()
 * calls this for IT_READDIR with an lvb_len of 0.
 *
 * ERR_PTR(-ENOMEM) is returned on the allocation failure path; the request
 * is freed on the ldlm_prep_enqueue_req() failure path.
 *
 * NOTE(review): the conditions guarding the error paths and the final
 * return are not visible in this listing.
 */
506 static struct ptlrpc_request *
507 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
509 struct ptlrpc_request *req;
513 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
515 RETURN(ERR_PTR(-ENOMEM));
517 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
519 ptlrpc_request_free(req);
523 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
524 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an enqueue RPC and publish the results in @it.
 * Called from mdc_enqueue_base() and from the asynchronous getattr
 * interpret callback.
 *
 * Visible steps, in order:
 *  - if the request has a transno or is marked for replay, set
 *    LDLM_FL_INTENT_ONLY in the flags of the lock request;
 *  - for ELDLM_LOCK_ABORTED, zero @lockh; otherwise look up the lock and,
 *    if its l_req_mode differs from einfo->ei_mode, move the reference to
 *    that mode and update einfo->ei_mode;
 *  - copy lock_policy_res1 and lock_policy_res2 of the reply into
 *    it_disposition and it_status, and store the lock mode, the handle
 *    cookie and the request itself in @it;
 *  - call mdc_clear_replay_flag() for requests that are marked for replay
 *    but have no transno or a negative status, and for IT_OPEN requests
 *    that did not end in a successful open;
 *  - for intents with a reply body, fetch RMF_MDT_BODY, register open
 *    replay data for a successful open, and when the body carries
 *    OBD_MD_FLDIREA or OBD_MD_FLEASIZE fetch the EA from RMF_MDT_MD and,
 *    for replayable IT_OPEN requests, save it into the request with
 *    mdc_save_lovea();
 *  - for IT_LAYOUT, take the layout from RMF_DLM_LVB and save it into the
 *    request as well (the result of that save is deliberately ignored);
 *  - if the lock has a layout, lvb data is present and the reply flags
 *    carry no LDLM_FL_BLOCKED_MASK bit, copy the lvb data into a newly
 *    allocated buffer and install it as l_lvb_data under the resource lock
 *    when the lock has no lvb data yet.
 *
 * NOTE(review): the trailing rc parameter, several declarations, most error
 * returns, the release of lock references and the condition guarding the
 * final OBD_FREE_LARGE() are not visible in this listing -- confirm against
 * the complete source before relying on the description above.
 */
528 static int mdc_finish_enqueue(struct obd_export *exp,
529 struct ptlrpc_request *req,
530 struct ldlm_enqueue_info *einfo,
531 struct lookup_intent *it,
532 struct lustre_handle *lockh,
535 struct req_capsule *pill = &req->rq_pill;
536 struct ldlm_request *lockreq;
537 struct ldlm_reply *lockrep;
538 struct ldlm_lock *lock;
539 void *lvb_data = NULL;
544 /* Similarly, if we're going to replay this request, we don't want to
545 * actually get a lock, just perform the intent. */
546 if (req->rq_transno || req->rq_replay) {
547 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
548 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
551 if (rc == ELDLM_LOCK_ABORTED) {
553 memset(lockh, 0, sizeof(*lockh));
555 } else { /* rc = 0 */
556 lock = ldlm_handle2lock(lockh);
557 LASSERT(lock != NULL);
559 /* If the server gave us back a different lock mode, we should
560 * fix up our variables. */
561 if (lock->l_req_mode != einfo->ei_mode) {
562 ldlm_lock_addref(lockh, lock->l_req_mode);
563 ldlm_lock_decref(lockh, einfo->ei_mode);
564 einfo->ei_mode = lock->l_req_mode;
/* publish the intent results carried by the lock reply in @it */
569 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
570 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
572 it->it_disposition = (int)lockrep->lock_policy_res1;
573 it->it_status = (int)lockrep->lock_policy_res2;
574 it->it_lock_mode = einfo->ei_mode;
575 it->it_lock_handle = lockh->cookie;
576 it->it_request = req;
578 /* Technically speaking rq_transno must already be zero if
579 * it_status is in error, so the check is a bit redundant */
580 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
581 mdc_clear_replay_flag(req, it->it_status);
583 /* If we're doing an IT_OPEN which did not result in an actual
584 * successful open, then we need to remove the bit which saves
585 * this request for unconditional replay.
587 * It's important that we do this first! Otherwise we might exit the
588 * function without doing so, and try to replay a failed create
590 if (it->it_op & IT_OPEN && req->rq_replay &&
591 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
592 mdc_clear_replay_flag(req, it->it_status);
594 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
595 it->it_op, it->it_disposition, it->it_status);
597 /* We know what to expect, so we do any byte flipping required here */
598 if (it_has_reply_body(it)) {
599 struct mdt_body *body;
601 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
603 CERROR ("Can't swab mdt_body\n");
607 if (it_disposition(it, DISP_OPEN_OPEN) &&
608 !it_open_error(DISP_OPEN_OPEN, it)) {
610 * If this is a successful OPEN request, we need to set
611 * replay handler and data early, so that if replay
612 * happens immediately after swabbing below, new reply
613 * is swabbed by that handler correctly.
615 mdc_set_open_replay_data(NULL, NULL, it);
618 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
621 mdc_update_max_ea_from_body(exp, body);
624 * The eadata is opaque; just check that it is there.
625 * Eventually, obd_unpackmd() will check the contents.
627 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
628 body->mbo_eadatasize);
632 /* save lvb data and length in case this is for layout
635 lvb_len = body->mbo_eadatasize;
638 * We save the reply LOV EA in case we have to replay a
639 * create for recovery. If we didn't allocate a large
640 * enough request buffer above we need to reallocate it
641 * here to hold the actual LOV EA.
643 * To not save LOV EA if request is not going to replay
644 * (for example error one).
646 if ((it->it_op & IT_OPEN) && req->rq_replay) {
647 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
648 body->mbo_eadatasize);
650 body->mbo_valid &= ~OBD_MD_FLEASIZE;
651 body->mbo_eadatasize = 0;
656 } else if (it->it_op & IT_LAYOUT) {
657 /* maybe the lock was granted right away and layout
658 * is packed into RMF_DLM_LVB of req */
659 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
661 lvb_data = req_capsule_server_sized_get(pill,
662 &RMF_DLM_LVB, lvb_len);
663 if (lvb_data == NULL)
667 * save replied layout data to the request buffer for
668 * recovery consideration (lest MDS reinitialize
669 * another set of OST objects).
672 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
677 /* fill in stripe data for layout lock.
678 * LU-6581: trust layout data only if layout lock is granted. The MDT
679 * has stopped sending layout unless the layout lock is granted. The
680 * client still does this checking in case it's talking with an old
681 * server. - Jinshan */
682 lock = ldlm_handle2lock(lockh);
683 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
684 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
687 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
688 ldlm_it2str(it->it_op), lvb_len);
690 OBD_ALLOC_LARGE(lmm, lvb_len);
695 memcpy(lmm, lvb_data, lvb_len);
697 /* install lvb_data */
698 lock_res_and_lock(lock);
699 if (lock->l_lvb_data == NULL) {
700 lock->l_lvb_type = LVB_T_LAYOUT;
701 lock->l_lvb_data = lmm;
702 lock->l_lvb_len = lvb_len;
705 unlock_res_and_lock(lock);
707 OBD_FREE_LARGE(lmm, lvb_len);
715 /* We always reserve enough space in the reply packet for a stripe MD, because
716 * we don't know in advance the file type. */
/*
 * Common enqueue path behind mdc_enqueue() (called with a NULL intent) and
 * mdc_intent_lock().
 *
 * Visible steps, in order:
 *  - assert that an intent implies an LDLM_IBITS lock and build the
 *    resource name from op_fid1;
 *  - with an intent, the caller must pass a NULL @policy; one of the
 *    static update, layout, getxattr or lookup policies is selected from
 *    it_op and LDLM_FL_HAS_INTENT is added to the saved flags;
 *  - remember the import generation, then build the request: LDLM_FLOCK is
 *    asserted on the path that sets res_id.name[3], otherwise one of the
 *    pack helpers of this file is chosen from it_op;
 *  - stamp the request with the remembered generation and delay rq_sent by
 *    the number of resends so far;
 *  - obtain a modify RPC slot and a request slot, then call
 *    ldlm_cli_enqueue();
 *  - release both slots; on enqueue failure log, clear the replay flag and
 *    drop the request;
 *  - convert lock_policy_res2 of the reply with ptlrpc_status_ntoh(); when
 *    an intent got -EINPROGRESS the request is dropped and a resend is
 *    logged, the import generation being compared against the remembered
 *    one;
 *  - hand the reply to mdc_finish_enqueue(); the cleanup that follows
 *    drops the lock reference, zeroes @lockh, drops the request and resets
 *    the lock and request fields of @it.
 *
 * NOTE(review): the conditions guarding most branches (including the
 * conditions for taking the slots and for the final cleanup), the resend
 * jump and the return statements are not visible in this listing --
 * confirm against the complete source.
 */
717 static int mdc_enqueue_base(struct obd_export *exp,
718 struct ldlm_enqueue_info *einfo,
719 const union ldlm_policy_data *policy,
720 struct lookup_intent *it,
721 struct md_op_data *op_data,
722 struct lustre_handle *lockh,
723 __u64 extra_lock_flags)
725 struct obd_device *obddev = class_exp2obd(exp);
726 struct ptlrpc_request *req = NULL;
727 __u64 flags, saved_flags = extra_lock_flags;
728 struct ldlm_res_id res_id;
729 static const union ldlm_policy_data lookup_policy = {
730 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
731 static const union ldlm_policy_data update_policy = {
732 .l_inodebits = { MDS_INODELOCK_UPDATE } };
733 static const union ldlm_policy_data layout_policy = {
734 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
735 static const union ldlm_policy_data getxattr_policy = {
736 .l_inodebits = { MDS_INODELOCK_XATTR } };
737 int generation, resends = 0;
738 struct ldlm_reply *lockrep;
739 enum lvb_type lvb_type = 0;
743 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
745 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
748 LASSERT(policy == NULL);
750 saved_flags |= LDLM_FL_HAS_INTENT;
751 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
752 policy = &update_policy;
753 else if (it->it_op & IT_LAYOUT)
754 policy = &layout_policy;
755 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
756 policy = &getxattr_policy;
758 policy = &lookup_policy;
761 generation = obddev->u.cli.cl_import->imp_generation;
765 /* The only way right now is FLOCK. */
766 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
768 res_id.name[3] = LDLM_FLOCK;
769 } else if (it->it_op & IT_OPEN) {
770 req = mdc_intent_open_pack(exp, it, op_data);
771 } else if (it->it_op & IT_UNLINK) {
772 req = mdc_intent_unlink_pack(exp, it, op_data);
773 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
774 req = mdc_intent_getattr_pack(exp, it, op_data);
775 } else if (it->it_op & IT_READDIR) {
776 req = mdc_enqueue_pack(exp, 0);
777 } else if (it->it_op & IT_LAYOUT) {
778 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
780 req = mdc_intent_layout_pack(exp, it, op_data);
781 lvb_type = LVB_T_LAYOUT;
782 } else if (it->it_op & IT_GETXATTR) {
783 req = mdc_intent_getxattr_pack(exp, it, op_data);
790 RETURN(PTR_ERR(req));
/* tie the request to the import generation sampled above */
793 req->rq_generation_set = 1;
794 req->rq_import_generation = generation;
795 req->rq_sent = cfs_time_current_sec() + resends;
798 /* It is important to obtain modify RPC slot first (if applicable), so
799 * that threads that are waiting for a modify RPC slot are not polluting
800 * our rpcs in flight counter.
801 * We do not do flock request limiting, though */
803 mdc_get_mod_rpc_slot(req, it);
804 rc = obd_get_request_slot(&obddev->u.cli);
806 mdc_put_mod_rpc_slot(req, it);
807 mdc_clear_replay_flag(req, 0);
808 ptlrpc_req_finished(req);
813 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
814 0, lvb_type, lockh, 0);
816 /* For flock requests we immediately return without further
817 delay and let caller deal with the rest, since rest of
818 this function metadata processing makes no sense for flock
819 requests anyway. But in case of problem during comms with
820 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
821 can not rely on caller and this mainly for F_UNLCKs
822 (explicits or automatically generated by Kernel to clean
823 current FLocks upon exit) that can't be trashed */
824 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
825 (einfo->ei_type == LDLM_FLOCK) &&
826 (einfo->ei_mode == LCK_NL))
831 obd_put_request_slot(&obddev->u.cli);
832 mdc_put_mod_rpc_slot(req, it);
835 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
836 obddev->obd_name, rc);
838 mdc_clear_replay_flag(req, rc);
839 ptlrpc_req_finished(req);
843 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
844 LASSERT(lockrep != NULL);
846 lockrep->lock_policy_res2 =
847 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
849 /* Retry infinitely when the server returns -EINPROGRESS for the
850 * intent operation, when server returns -EINPROGRESS for acquiring
851 * intent lock, we'll retry in after_reply(). */
852 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
853 mdc_clear_replay_flag(req, rc);
854 ptlrpc_req_finished(req);
857 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
858 obddev->obd_name, resends, it->it_op,
859 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
861 if (generation == obddev->u.cli.cl_import->imp_generation) {
864 CDEBUG(D_HA, "resend cross eviction\n");
869 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
871 if (lustre_handle_is_used(lockh)) {
872 ldlm_lock_decref(lockh, einfo->ei_mode);
873 memset(lockh, 0, sizeof(*lockh));
875 ptlrpc_req_finished(req);
877 it->it_lock_handle = 0;
878 it->it_lock_mode = 0;
879 it->it_request = NULL;
/*
 * Enqueue a lock without an intent: a thin wrapper that forwards all of its
 * arguments to mdc_enqueue_base() with a NULL lookup intent and returns its
 * result.
 */
885 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
886 const union ldlm_policy_data *policy,
887 struct md_op_data *op_data,
888 struct lustre_handle *lockh, __u64 extra_lock_flags)
890 return mdc_enqueue_base(exp, einfo, policy, NULL,
891 op_data, lockh, extra_lock_flags);
/*
 * Second stage of intent processing, run after mdc_finish_enqueue() by both
 * mdc_intent_lock() and the asynchronous getattr interpret callback.
 *
 * Visible steps, in order:
 *  - sanity-check @request against NULL and the LP_POISON pattern;
 *  - IT_READDIR is tested first; IT_GETXATTR and IT_LAYOUT jump to the out
 *    label with it_status when it is non-zero;
 *  - evaluate it_open_error() for DISP_IT_EXECD and DISP_LOOKUP_EXECD;
 *  - take an extra request reference, once each, for a successful create
 *    (DISP_ENQ_CREATE_REF) and a successful open (DISP_ENQ_OPEN_REF);
 *  - assert the expected combination of it_op and dispositions;
 *  - look up the lock behind @lockh, assert that the fid in the reply body
 *    names the resource of the lock, and search for another lock with the
 *    same policy via ldlm_lock_match(); on a match the lock behind @lockh
 *    is released with ldlm_lock_decref_and_cancel() and @lockh and
 *    it_lock_handle are switched to the matched lock;
 *  - log the outcome at D_DENTRY level.
 *
 * NOTE(review): the conditions guarding several branches, the out label,
 * the release of the lock reference and the return statements are not
 * visible in this listing -- confirm against the complete source.
 */
894 static int mdc_finish_intent_lock(struct obd_export *exp,
895 struct ptlrpc_request *request,
896 struct md_op_data *op_data,
897 struct lookup_intent *it,
898 struct lustre_handle *lockh)
900 struct lustre_handle old_lock;
901 struct ldlm_lock *lock;
905 LASSERT(request != NULL);
906 LASSERT(request != LP_POISON);
907 LASSERT(request->rq_repmsg != LP_POISON);
909 if (it->it_op & IT_READDIR)
912 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
913 if (it->it_status != 0)
914 GOTO(out, rc = it->it_status);
916 if (!it_disposition(it, DISP_IT_EXECD)) {
917 /* The server failed before it even started executing
918 * the intent, i.e. because it couldn't unpack the
921 LASSERT(it->it_status != 0);
922 GOTO(out, rc = it->it_status);
924 rc = it_open_error(DISP_IT_EXECD, it);
928 rc = it_open_error(DISP_LOOKUP_EXECD, it);
932 /* keep requests around for the multiple phases of the call
933 * this shows the DISP_XX must guarantee we make it into the
936 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
937 it_disposition(it, DISP_OPEN_CREATE) &&
938 !it_open_error(DISP_OPEN_CREATE, it)) {
939 it_set_disposition(it, DISP_ENQ_CREATE_REF);
940 /* balanced in ll_create_node */
941 ptlrpc_request_addref(request);
943 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
944 it_disposition(it, DISP_OPEN_OPEN) &&
945 !it_open_error(DISP_OPEN_OPEN, it)) {
946 it_set_disposition(it, DISP_ENQ_OPEN_REF);
947 /* balanced in ll_file_open */
948 ptlrpc_request_addref(request);
949 /* BUG 11546 - eviction in the middle of open rpc
952 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
956 if (it->it_op & IT_CREAT) {
957 /* XXX this belongs in ll_create_it */
958 } else if (it->it_op == IT_OPEN) {
959 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
961 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
965 /* If we already have a matching lock, then cancel the new
966 * one. We have to set the data here instead of in
967 * mdc_enqueue, because we need to use the child's inode as
968 * the l_ast_data to match, and that's not available until
969 * intent_finish has performed the iget().) */
970 lock = ldlm_handle2lock(lockh);
972 union ldlm_policy_data policy = lock->l_policy_data;
973 LDLM_DEBUG(lock, "matching against this");
975 if (it_has_reply_body(it)) {
976 struct mdt_body *body;
978 body = req_capsule_server_get(&request->rq_pill,
980 /* mdc_enqueue checked */
981 LASSERT(body != NULL);
982 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
983 &lock->l_resource->lr_name),
984 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
985 PLDLMRES(lock->l_resource),
986 PFID(&body->mbo_fid1));
990 memcpy(&old_lock, lockh, sizeof(*lockh));
991 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
992 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
993 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
994 memcpy(lockh, &old_lock, sizeof(old_lock));
995 it->it_lock_handle = lockh->cookie;
1001 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1002 (int)op_data->op_namelen, op_data->op_name,
1003 ldlm_it2str(it->it_op), it->it_status,
1004 it->it_disposition, rc);
/*
 * Check whether a usable lock exists for @it and @fid.
 *
 * When @it already carries a lock handle, that handle is revalidated with
 * ldlm_revalidate_lock_handle(), which also receives @bits.  The other
 * visible path builds the resource name from @fid, selects the inodebits
 * to match from it_op and calls mdc_lock_match() with LDLM_FL_BLOCK_GRANTED
 * and any of the CR, CW, PR or PW modes.
 *
 * The handle and mode of the lock are stored in it_lock_handle and
 * it_lock_mode; both fields are zeroed on the other outcome.
 *
 * NOTE(review): the case labels of the switch, the else lines, the
 * condition that distinguishes a found lock from a missing one and the
 * return statement are not visible in this listing -- confirm against the
 * complete source.
 */
1008 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1009 struct lu_fid *fid, __u64 *bits)
1011 /* We could just return 1 immediately, but since we should only
1012 * be called in revalidate_it if we already have a lock, let's
1014 struct ldlm_res_id res_id;
1015 struct lustre_handle lockh;
1016 union ldlm_policy_data policy;
1017 enum ldlm_mode mode;
1020 if (it->it_lock_handle) {
1021 lockh.cookie = it->it_lock_handle;
1022 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1024 fid_build_reg_res_name(fid, &res_id);
1025 switch (it->it_op) {
1027 /* File attributes are held under multiple bits:
1028 * nlink is under lookup lock, size and times are
1029 * under UPDATE lock and recently we've also got
1030 * a separate permissions lock for owner/group/acl that
1031 * were protected by lookup lock before.
1032 * Getattr must provide all of that information,
1033 * so we need to ensure we have all of those locks.
1034 * Unfortunately, if the bits are split across multiple
1035 * locks, there's no easy way to match all of them here,
1036 * so an extra RPC would be performed to fetch all
1037 * of those bits at once for now. */
1038 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1039 * but for old MDTs (< 2.4), permission is covered
1040 * by LOOKUP lock, so it needs to match all bits here.*/
1041 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1042 MDS_INODELOCK_LOOKUP |
1046 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1049 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1052 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1056 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1057 LDLM_IBITS, &policy,
1058 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1063 it->it_lock_handle = lockh.cookie;
1064 it->it_lock_mode = mode;
1066 it->it_lock_handle = 0;
1067 it->it_lock_mode = 0;
/*
 * Summary of mdc_intent_lock() as visible in this listing:
 *
 *  - the enqueue info is set up for an LDLM_IBITS lock in the mode given by
 *    it_to_lock_mode(it), with @cb_blocking as blocking callback and
 *    ldlm_completion_ast as completion callback;
 *  - when op_fid2 is sane and the intent is IT_LOOKUP, IT_GETATTR or
 *    IT_READDIR, it_lock_handle is cleared and an existing lock is looked
 *    for with mdc_revalidate_lock();
 *  - when op_fid2 is not sane and the intent includes IT_CREAT, a fid is
 *    allocated with mdc_fid_alloc() and a failure is logged;
 *  - the lock is enqueued through mdc_enqueue_base() with a NULL policy,
 *    the request stored in @it is handed back through @reqp, and
 *    mdc_finish_intent_lock() completes the processing.
 *
 * NOTE(review): the statements taken after the revalidate check, the error
 * returns and the final return are not visible in this listing -- confirm
 * against the complete source.
 */
1074 * This long block is all about fixing up the lock and request state
1075 * so that it is correct as of the moment _before_ the operation was
1076 * applied; that way, the VFS will think that everything is normal and
1077 * call Lustre's regular VFS methods.
1079 * If we're performing a creation, that means that unless the creation
1080 * failed with EEXIST, we should fake up a negative dentry.
1082 * For everything else, we want to lookup to succeed.
1084 * One additional note: if CREATE or OPEN succeeded, we add an extra
1085 * reference to the request because we need to keep it around until
1086 * ll_create/ll_open gets called.
1088 * The server will return to us, in it_disposition, an indication of
1089 * exactly what it_status refers to.
1091 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1092 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1093 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1094 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1097 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1100 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1101 struct lookup_intent *it, struct ptlrpc_request **reqp,
1102 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1104 struct ldlm_enqueue_info einfo = {
1105 .ei_type = LDLM_IBITS,
1106 .ei_mode = it_to_lock_mode(it),
1107 .ei_cb_bl = cb_blocking,
1108 .ei_cb_cp = ldlm_completion_ast,
1110 struct lustre_handle lockh;
1115 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1116 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1117 op_data->op_name, PFID(&op_data->op_fid2),
1118 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1122 if (fid_is_sane(&op_data->op_fid2) &&
1123 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1124 /* We could just return 1 immediately, but since we should only
1125 * be called in revalidate_it if we already have a lock, let's
1127 it->it_lock_handle = 0;
1128 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1129 /* Only return failure if it was not GETATTR by cfid
1130 (from inode_revalidate) */
1131 if (rc || op_data->op_namelen != 0)
1135 /* For case if upper layer did not alloc fid, do it now. */
1136 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1137 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1139 CERROR("Can't alloc new fid, rc %d\n", rc);
1144 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1149 *reqp = it->it_request;
1150 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpret callback installed by mdc_intent_getattr_async().
 *
 * @args is read as a struct mdc_getattr_args, which yields the export and
 * the md_enqueue_info of the caller.  The request slot is released, the
 * enqueue is completed with ldlm_cli_enqueue_fini(), the status in
 * lock_policy_res2 of the reply is converted with ptlrpc_status_ntoh(), and
 * the reply is processed by mdc_finish_enqueue() followed by
 * mdc_finish_intent_lock().  Finally the mi_cb callback of the caller is
 * invoked with the request, the enqueue info and the resulting rc.
 *
 * A failure of ldlm_cli_enqueue_fini() is logged and the replay flag of the
 * request is cleared.
 *
 * NOTE(review): the remaining parameters, the assignment of the local it
 * pointer, the effect of the OBD_FAIL_MDC_GETATTR_ENQUEUE check, the jumps
 * between the steps and the return value are not visible in this listing --
 * confirm against the complete source.
 */
1154 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1155 struct ptlrpc_request *req,
1158 struct mdc_getattr_args *ga = args;
1159 struct obd_export *exp = ga->ga_exp;
1160 struct md_enqueue_info *minfo = ga->ga_minfo;
1161 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1162 struct lookup_intent *it;
1163 struct lustre_handle *lockh;
1164 struct obd_device *obddev;
1165 struct ldlm_reply *lockrep;
1166 __u64 flags = LDLM_FL_HAS_INTENT;
1170 lockh = &minfo->mi_lockh;
1172 obddev = class_exp2obd(exp);
1174 obd_put_request_slot(&obddev->u.cli);
1175 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1178 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1179 &flags, NULL, 0, lockh, rc);
1181 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1182 mdc_clear_replay_flag(req, rc);
1186 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1187 LASSERT(lockrep != NULL);
1189 lockrep->lock_policy_res2 =
1190 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1192 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1196 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1200 minfo->mi_cb(req, minfo, rc);
/*
 * Start a getattr intent enqueue whose reply is handled by
 * mdc_intent_getattr_async_interpret().
 *
 * The intent and operation data are taken from @minfo (mi_it and mi_data).
 * The request is packed with mdc_intent_getattr_pack(), a request slot is
 * obtained, and ldlm_cli_enqueue() is called with a policy covering the
 * LOOKUP and UPDATE inodebits and with its final argument set to 1
 * (mdc_enqueue_base() passes 0 there).  The async args of the request then
 * receive @minfo, the interpret callback is installed and the request is
 * handed to ptlrpcd_add_req().
 *
 * On the visible failure paths the request is dropped with
 * ptlrpc_req_finished(); after a failed enqueue the request slot is
 * released first.
 *
 * NOTE(review): presumably the final ldlm_cli_enqueue() argument selects
 * asynchronous operation -- confirm.  The store to ga_exp, the conditions
 * guarding the error paths and the end of the function are not visible in
 * this listing.
 */
1204 int mdc_intent_getattr_async(struct obd_export *exp,
1205 struct md_enqueue_info *minfo)
1207 struct md_op_data *op_data = &minfo->mi_data;
1208 struct lookup_intent *it = &minfo->mi_it;
1209 struct ptlrpc_request *req;
1210 struct mdc_getattr_args *ga;
1211 struct obd_device *obddev = class_exp2obd(exp);
1212 struct ldlm_res_id res_id;
1213 union ldlm_policy_data policy = {
1214 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1215 MDS_INODELOCK_UPDATE } };
1217 __u64 flags = LDLM_FL_HAS_INTENT;
1220 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1221 (int)op_data->op_namelen, op_data->op_name,
1222 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1224 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1225 req = mdc_intent_getattr_pack(exp, it, op_data);
1227 RETURN(PTR_ERR(req));
1229 rc = obd_get_request_slot(&obddev->u.cli);
1231 ptlrpc_req_finished(req);
1235 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1236 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1238 obd_put_request_slot(&obddev->u.cli);
1239 ptlrpc_req_finished(req);
1243 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1244 ga = ptlrpc_req_async_args(req);
1246 ga->ga_minfo = minfo;
1248 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1249 ptlrpcd_add_req(req);