4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_fid.h>
45 #include <lustre_intent.h>
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include <lustre_swab.h>
51 #include "mdc_internal.h"
/*
 * Context passed to the async getattr interpret callback
 * (mdc_intent_getattr_async_interpret) via rq_async_args.
 * NOTE(review): the closing brace of this struct is not visible in this
 * chunk of the file.
 */
53 struct mdc_getattr_args {
/* export the enqueue was issued on; read back in the interpret cb */
54 struct obd_export *ga_exp;
/* caller-supplied enqueue info; holds the lock handle, intent and
 * the mi_cb completion callback invoked by the interpreter */
55 struct md_enqueue_info *ga_minfo;
/*
 * Return the error recorded for a given phase of an intent operation.
 *
 * Dispositions are tested from the latest phase (OPEN_LEASE) back to the
 * earliest (IT_EXECD); the first disposition bit actually set in @it
 * selects the branch taken.  The branch bodies are not visible in this
 * chunk; presumably each returns it->it_status when @phase is at or past
 * that disposition, and 0 otherwise -- TODO confirm against full source.
 * If no disposition bit matches, the CERROR below reports the unexpected
 * state.
 */
58 int it_open_error(int phase, struct lookup_intent *it)
60 if (it_disposition(it, DISP_OPEN_LEASE)) {
61 if (phase >= DISP_OPEN_LEASE)
66 if (it_disposition(it, DISP_OPEN_OPEN)) {
67 if (phase >= DISP_OPEN_OPEN)
73 if (it_disposition(it, DISP_OPEN_CREATE)) {
74 if (phase >= DISP_OPEN_CREATE)
80 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
81 if (phase >= DISP_LOOKUP_EXECD)
87 if (it_disposition(it, DISP_IT_EXECD)) {
88 if (phase >= DISP_IT_EXECD)
/* no recognized disposition: log the raw state for debugging */
94 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
99 EXPORT_SYMBOL(it_open_error);
101 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach inode @data to the resource of the lock behind @lockh and,
 * if @bits is non-NULL, report the inodebits the lock covers.
 * Requires the handle to reference a live lock (caller holds a ref).
 */
102 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
103 void *data, __u64 *bits)
105 struct ldlm_lock *lock;
106 struct inode *new_inode = data;
/* unused handle: nothing to attach (early-out path; return value on
 * this path is not visible in this chunk) */
112 if (!lustre_handle_is_used(lockh))
115 lock = ldlm_handle2lock(lockh);
117 LASSERT(lock != NULL);
118 lock_res_and_lock(lock);
/* A different inode may already be cached on the resource; that is
 * only legal if the old inode is being freed (I_FREEING). */
119 if (lock->l_resource->lr_lvb_inode &&
120 lock->l_resource->lr_lvb_inode != data) {
121 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
122 LASSERTF(old_inode->i_state & I_FREEING,
123 "Found existing inode %p/%lu/%u state %lu in lock: "
124 "setting data to %p/%lu/%u\n", old_inode,
125 old_inode->i_ino, old_inode->i_generation,
127 new_inode, new_inode->i_ino, new_inode->i_generation);
/* install the new inode under the resource lock */
129 lock->l_resource->lr_lvb_inode = new_inode;
131 *bits = lock->l_policy_data.l_inodebits.bits;
133 unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on @fid matching @type/@policy/@mode.
 * Returns the matched mode (with @lockh filled) or 0 when no lock matches;
 * the tail of the function (the RETURN of rc) is not visible in this chunk.
 */
139 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
140 const struct lu_fid *fid, enum ldlm_type type,
141 union ldlm_policy_data *policy,
142 enum ldlm_mode mode, struct lustre_handle *lockh)
144 struct ldlm_res_id res_id;
148 fid_build_reg_res_name(fid, &res_id);
149 /* LU-4405: Clear bits not supported by server */
150 policy->l_inodebits.bits &= exp_connect_ibits(exp);
151 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
152 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks on the resource built from @fid that match
 * @policy/@mode, with cancel flags @flags and matching data @opaque.
 * Thin wrapper around ldlm_cli_cancel_unused_resource().
 */
156 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
157 union ldlm_policy_data *policy, enum ldlm_mode mode,
158 enum ldlm_cancel_flags flags, void *opaque)
160 struct obd_device *obd = class_exp2obd(exp);
161 struct ldlm_res_id res_id;
166 fid_build_reg_res_name(fid, &res_id);
167 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168 policy, mode, flags, opaque);
/*
 * Clear the inode cached on @fid's DLM resource (lr_lvb_inode = NULL),
 * e.g. when the inode is going away.  Looks the resource up without
 * creating it; the not-found early-out between the get and the store is
 * not visible in this chunk.
 */
172 int mdc_null_inode(struct obd_export *exp,
173 const struct lu_fid *fid)
175 struct ldlm_res_id res_id;
176 struct ldlm_resource *res;
177 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
180 LASSERTF(ns != NULL, "no namespace passed\n");
182 fid_build_reg_res_name(fid, &res_id);
184 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* drop the cached inode pointer; resource lock-taking lines are not
 * visible here -- presumably this runs under lock_res() */
189 res->lr_lvb_inode = NULL;
192 ldlm_resource_putref(res);
/*
 * Drop the replay flag from a request that completed with error @rc,
 * so failed requests are not held for recovery replay.  Also warns if
 * an error reply unexpectedly carried a transaction number.
 */
196 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
198 /* Don't hold error requests for replay. */
199 if (req->rq_replay) {
/* rq_replay is protected by rq_lock; the clearing store itself is in
 * a line not visible in this chunk */
200 spin_lock(&req->rq_lock);
202 spin_unlock(&req->rq_lock);
204 if (rc && req->rq_transno != 0) {
205 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
210 /* Save a large LOV EA into the request buffer so that it is available
211 * for replay. We don't do this in the initial request because the
212 * original request doesn't need this buffer (at most it sends just the
213 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
214 * buffer and may also be difficult to allocate and save a very large
215 * request buffer for each open. (bug 5707)
217 * OOM here may cause recovery failure if lmm is needed (only for the
218 * original open if the MDS crashed just when this client also OOM'd)
219 * but this is incredibly unlikely, and questionable whether the client
220 * could do MDS recovery under OOM anyways... */
221 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
222 struct mdt_body *body)
226 /* FIXME: remove this explicit offset. */
227 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
228 body->mbo_eadatasize);
/* On enlarge failure, degrade gracefully: forget the EA rather than
 * failing the open -- replay will simply lack the LOV EA. */
230 CERROR("Can't enlarge segment %d size to %d\n",
231 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
232 body->mbo_valid &= ~OBD_MD_FLEASIZE;
233 body->mbo_eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN request for @it/@op_data.
 *
 * Collects conflicting locks to cancel (OPEN locks on the child fid2 if
 * known, UPDATE lock on the parent fid1 for CREATE), packs the intent
 * opcode and the open body, and reserves reply space for the MD EA.
 * Returns the prepared request, or ERR_PTR(-ENOMEM) on allocation
 * failure; error paths between the visible lines are not all shown in
 * this chunk.
 */
237 static struct ptlrpc_request *
238 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
239 struct md_op_data *op_data)
241 struct ptlrpc_request *req;
242 struct obd_device *obddev = class_exp2obd(exp);
243 struct ldlm_intent *lit;
244 const void *lmm = op_data->op_data;
245 __u32 lmmsize = op_data->op_data_size;
246 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* force the create mode to a regular file, preserving permission bits */
252 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
254 /* XXX: openlock is not cancelled for cross-refs. */
255 /* If inode is known, cancel conflicting OPEN locks. */
256 if (fid_is_sane(&op_data->op_fid2)) {
257 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
258 if (it->it_flags & FMODE_WRITE)
263 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
266 else if (it->it_flags & FMODE_EXEC)
272 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
277 /* If CREATE, cancel parent's UPDATE lock. */
278 if (it->it_op & IT_CREAT)
282 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
284 MDS_INODELOCK_UPDATE);
286 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
287 &RQF_LDLM_INTENT_OPEN);
/* allocation failed: release the collected cancel list before erroring */
289 ldlm_lock_list_put(&cancels, l_bl_ast, count);
290 RETURN(ERR_PTR(-ENOMEM));
293 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
294 op_data->op_namelen + 1);
295 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
296 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
298 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
300 ptlrpc_request_free(req);
/* mark the open replayable if the import supports replay */
304 spin_lock(&req->rq_lock);
305 req->rq_replay = req->rq_import->imp_replayable;
306 spin_unlock(&req->rq_lock);
308 /* pack the intent */
309 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
310 lit->opc = (__u64)it->it_op;
312 /* pack the intended request */
313 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* reserve worst-case reply room for the striping EA */
316 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
317 obddev->u.cli.cl_max_mds_easize);
318 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETXATTR request: pack the IT_GETXATTR intent,
 * the getattr-style body for @op_data->op_fid1, and reserve reply room
 * (names, values, value lengths) up to the server's advertised max EA
 * size.  Returns the prepared request or ERR_PTR(-ENOMEM).
 */
322 static struct ptlrpc_request *
323 mdc_intent_getxattr_pack(struct obd_export *exp,
324 struct lookup_intent *it,
325 struct md_op_data *op_data)
327 struct ptlrpc_request *req;
328 struct ldlm_intent *lit;
331 struct list_head cancels = LIST_HEAD_INIT(cancels);
335 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
336 &RQF_LDLM_INTENT_GETXATTR);
338 RETURN(ERR_PTR(-ENOMEM));
340 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
342 ptlrpc_request_free(req);
346 /* pack the intent */
347 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
348 lit->opc = IT_GETXATTR;
/* cap reply buffers by the server's connect-time max easize */
350 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
352 /* pack the intended request */
353 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
356 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
357 RCL_SERVER, maxdata);
359 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
360 RCL_SERVER, maxdata);
362 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
363 RCL_SERVER, maxdata);
365 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: pack the intent opcode from @it,
 * the unlink body from @op_data, and reserve reply room for the MD EA.
 * Returns the prepared request or ERR_PTR(-ENOMEM).
 */
370 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
371 struct lookup_intent *it,
372 struct md_op_data *op_data)
374 struct ptlrpc_request *req;
375 struct obd_device *obddev = class_exp2obd(exp);
376 struct ldlm_intent *lit;
380 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
381 &RQF_LDLM_INTENT_UNLINK);
383 RETURN(ERR_PTR(-ENOMEM));
/* +1 for the NUL terminator of the name */
385 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
386 op_data->op_namelen + 1);
388 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
390 ptlrpc_request_free(req);
394 /* pack the intent */
395 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
396 lit->opc = (__u64)it->it_op;
398 /* pack the intended request */
399 mdc_unlink_pack(req, op_data);
401 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
402 obddev->u.cli.cl_default_mds_easize);
403 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request asking for attributes, EA sizes,
 * striping (dir or file) and ACLs.  Reply EA room is the client default
 * easize when set, otherwise the maximum.  Returns the prepared request
 * or ERR_PTR(-ENOMEM).
 */
407 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
408 struct lookup_intent *it,
409 struct md_op_data *op_data)
411 struct ptlrpc_request *req;
412 struct obd_device *obddev = class_exp2obd(exp);
413 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
414 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
415 OBD_MD_MEA | OBD_MD_FLACL;
416 struct ldlm_intent *lit;
421 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
422 &RQF_LDLM_INTENT_GETATTR);
424 RETURN(ERR_PTR(-ENOMEM));
426 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
427 op_data->op_namelen + 1);
429 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
431 ptlrpc_request_free(req);
435 /* pack the intent */
436 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
437 lit->opc = (__u64)it->it_op;
/* prefer the tuned default easize; fall back to the maximum */
439 if (obddev->u.cli.cl_default_mds_easize > 0)
440 easize = obddev->u.cli.cl_default_mds_easize;
442 easize = obddev->u.cli.cl_max_mds_easize;
444 /* pack the intended request */
445 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
447 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
448 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request.  The layout intent opcode is the
 * generic LAYOUT_INTENT_ACCESS; reply LVB room is sized by the default
 * MDS easize.  @unused is ignored.  Returns the prepared request or
 * ERR_PTR(-ENOMEM).
 */
452 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
453 struct lookup_intent *it,
454 struct md_op_data *unused)
456 struct obd_device *obd = class_exp2obd(exp);
457 struct ptlrpc_request *req;
458 struct ldlm_intent *lit;
459 struct layout_intent *layout;
463 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
464 &RQF_LDLM_INTENT_LAYOUT);
466 RETURN(ERR_PTR(-ENOMEM));
/* no client-side EA payload is sent for a layout intent */
468 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
469 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
471 ptlrpc_request_free(req);
475 /* pack the intent */
476 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
477 lit->opc = (__u64)it->it_op;
479 /* pack the layout intent request */
480 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
481 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
482 * set for replication */
483 layout->li_opc = LAYOUT_INTENT_ACCESS;
485 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
486 obd->u.cli.cl_default_mds_easize);
487 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM_ENQUEUE request (no intent) with @lvb_len bytes of
 * reply LVB room.  Returns the prepared request or ERR_PTR(-ENOMEM).
 */
491 static struct ptlrpc_request *
492 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
494 struct ptlrpc_request *req;
498 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
500 RETURN(ERR_PTR(-ENOMEM));
502 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
504 ptlrpc_request_free(req);
508 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
509 ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue: fix up lock state, copy the
 * server's disposition/status into @it, manage the replay flag, swab and
 * validate the reply body, preserve the LOV EA for open replay, and for
 * layout intents install the returned layout LVB on the lock.
 * Several error-path and return lines are not visible in this chunk.
 */
513 static int mdc_finish_enqueue(struct obd_export *exp,
514 struct ptlrpc_request *req,
515 struct ldlm_enqueue_info *einfo,
516 struct lookup_intent *it,
517 struct lustre_handle *lockh,
520 struct req_capsule *pill = &req->rq_pill;
521 struct ldlm_request *lockreq;
522 struct ldlm_reply *lockrep;
523 struct ldlm_lock *lock;
524 void *lvb_data = NULL;
529 /* Similarly, if we're going to replay this request, we don't want to
530 * actually get a lock, just perform the intent. */
531 if (req->rq_transno || req->rq_replay) {
532 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
533 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* server executed the intent but granted no lock: clear the handle */
536 if (rc == ELDLM_LOCK_ABORTED) {
538 memset(lockh, 0, sizeof(*lockh));
540 } else { /* rc = 0 */
541 lock = ldlm_handle2lock(lockh);
542 LASSERT(lock != NULL);
544 /* If the server gave us back a different lock mode, we should
545 * fix up our variables. */
546 if (lock->l_req_mode != einfo->ei_mode) {
/* addref new mode before decref of old mode keeps the lock referenced */
547 ldlm_lock_addref(lockh, lock->l_req_mode);
548 ldlm_lock_decref(lockh, einfo->ei_mode);
549 einfo->ei_mode = lock->l_req_mode;
554 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
555 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* propagate the server's intent result into the lookup intent */
557 it->it_disposition = (int)lockrep->lock_policy_res1;
558 it->it_status = (int)lockrep->lock_policy_res2;
559 it->it_lock_mode = einfo->ei_mode;
560 it->it_lock_handle = lockh->cookie;
561 it->it_request = req;
563 /* Technically speaking rq_transno must already be zero if
564 * it_status is in error, so the check is a bit redundant */
565 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
566 mdc_clear_replay_flag(req, it->it_status);
568 /* If we're doing an IT_OPEN which did not result in an actual
569 * successful open, then we need to remove the bit which saves
570 * this request for unconditional replay.
572 * It's important that we do this first! Otherwise we might exit the
573 * function without doing so, and try to replay a failed create
575 if (it->it_op & IT_OPEN && req->rq_replay &&
576 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
577 mdc_clear_replay_flag(req, it->it_status);
579 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
580 it->it_op, it->it_disposition, it->it_status);
582 /* We know what to expect, so we do any byte flipping required here */
583 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
584 struct mdt_body *body;
586 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
588 CERROR ("Can't swab mdt_body\n");
592 if (it_disposition(it, DISP_OPEN_OPEN) &&
593 !it_open_error(DISP_OPEN_OPEN, it)) {
595 * If this is a successful OPEN request, we need to set
596 * replay handler and data early, so that if replay
597 * happens immediately after swabbing below, new reply
598 * is swabbed by that handler correctly.
600 mdc_set_open_replay_data(NULL, NULL, it);
603 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
606 mdc_update_max_ea_from_body(exp, body);
609 * The eadata is opaque; just check that it is there.
610 * Eventually, obd_unpackmd() will check the contents.
612 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
613 body->mbo_eadatasize);
617 /* save lvb data and length in case this is for layout
620 lvb_len = body->mbo_eadatasize;
623 * We save the reply LOV EA in case we have to replay a
624 * create for recovery. If we didn't allocate a large
625 * enough request buffer above we need to reallocate it
626 * here to hold the actual LOV EA.
628 * To not save LOV EA if request is not going to replay
629 * (for example error one).
631 if ((it->it_op & IT_OPEN) && req->rq_replay) {
633 if (req_capsule_get_size(pill, &RMF_EADATA,
635 body->mbo_eadatasize)
636 mdc_realloc_openmsg(req, body);
638 req_capsule_shrink(pill, &RMF_EADATA,
639 body->mbo_eadatasize,
642 req_capsule_set_size(pill, &RMF_EADATA,
644 body->mbo_eadatasize);
/* copy of the EA into the client buffer happens in lines not visible
 * in this chunk */
646 lmm = req_capsule_client_get(pill, &RMF_EADATA);
649 body->mbo_eadatasize);
652 } else if (it->it_op & IT_LAYOUT) {
653 /* maybe the lock was granted right away and layout
654 * is packed into RMF_DLM_LVB of req */
655 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
657 lvb_data = req_capsule_server_sized_get(pill,
658 &RMF_DLM_LVB, lvb_len);
659 if (lvb_data == NULL)
664 /* fill in stripe data for layout lock.
665 * LU-6581: trust layout data only if layout lock is granted. The MDT
666 * has stopped sending layout unless the layout lock is granted. The
667 * client still does this checking in case it's talking with an old
668 * server. - Jinshan */
669 lock = ldlm_handle2lock(lockh);
670 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
671 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
674 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
675 ldlm_it2str(it->it_op), lvb_len);
677 OBD_ALLOC_LARGE(lmm, lvb_len);
682 memcpy(lmm, lvb_data, lvb_len);
684 /* install lvb_data */
685 lock_res_and_lock(lock);
/* only install if no LVB is present yet; otherwise the copy is freed
 * below (OBD_FREE_LARGE) */
686 if (lock->l_lvb_data == NULL) {
687 lock->l_lvb_type = LVB_T_LAYOUT;
688 lock->l_lvb_data = lmm;
689 lock->l_lvb_len = lvb_len;
692 unlock_res_and_lock(lock);
694 OBD_FREE_LARGE(lmm, lvb_len);
702 /* We always reserve enough space in the reply packet for a stripe MD, because
703 * we don't know in advance the file type. */
/*
 * Enqueue a metadata DLM lock, optionally carrying an intent @it.
 *
 * Chooses the inodebits policy from the intent op, packs the matching
 * intent request (open/unlink/getattr/readdir/layout/getxattr), takes
 * modify-RPC and request slots, performs ldlm_cli_enqueue(), and retries
 * transparently when the server answers -EINPROGRESS (same import
 * generation only).  On success hands the reply to mdc_finish_enqueue();
 * on failure drops any granted lock and resets the intent lock fields.
 * Several branch bodies and the resend back-edge are not visible in
 * this chunk.
 */
704 int mdc_enqueue(struct obd_export *exp,
705 struct ldlm_enqueue_info *einfo,
706 const union ldlm_policy_data *policy,
707 struct lookup_intent *it, struct md_op_data *op_data,
708 struct lustre_handle *lockh, __u64 extra_lock_flags)
710 struct obd_device *obddev = class_exp2obd(exp);
711 struct ptlrpc_request *req = NULL;
712 __u64 flags, saved_flags = extra_lock_flags;
713 struct ldlm_res_id res_id;
714 static const union ldlm_policy_data lookup_policy = {
715 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
716 static const union ldlm_policy_data update_policy = {
717 .l_inodebits = { MDS_INODELOCK_UPDATE } };
718 static const union ldlm_policy_data layout_policy = {
719 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
720 static const union ldlm_policy_data getxattr_policy = {
721 .l_inodebits = { MDS_INODELOCK_XATTR } };
722 int generation, resends = 0;
723 struct ldlm_reply *lockrep;
724 enum lvb_type lvb_type = 0;
728 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
730 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* with an intent, the policy is derived from the intent op; callers
 * must not pass one explicitly */
733 LASSERT(policy == NULL);
735 saved_flags |= LDLM_FL_HAS_INTENT;
736 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
737 policy = &update_policy;
738 else if (it->it_op & IT_LAYOUT)
739 policy = &layout_policy;
740 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
741 policy = &getxattr_policy;
743 policy = &lookup_policy;
/* remember the import generation to detect eviction across resends */
746 generation = obddev->u.cli.cl_import->imp_generation;
750 /* The only way right now is FLOCK. */
751 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
753 res_id.name[3] = LDLM_FLOCK;
754 } else if (it->it_op & IT_OPEN) {
755 req = mdc_intent_open_pack(exp, it, op_data);
756 } else if (it->it_op & IT_UNLINK) {
757 req = mdc_intent_unlink_pack(exp, it, op_data);
758 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
759 req = mdc_intent_getattr_pack(exp, it, op_data);
760 } else if (it->it_op & IT_READDIR) {
761 req = mdc_enqueue_pack(exp, 0);
762 } else if (it->it_op & IT_LAYOUT) {
763 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
765 req = mdc_intent_layout_pack(exp, it, op_data);
766 lvb_type = LVB_T_LAYOUT;
767 } else if (it->it_op & IT_GETXATTR) {
768 req = mdc_intent_getxattr_pack(exp, it, op_data);
775 RETURN(PTR_ERR(req));
/* pin the request to the current import generation for resend checks */
778 req->rq_generation_set = 1;
779 req->rq_import_generation = generation;
780 req->rq_sent = cfs_time_current_sec() + resends;
783 /* It is important to obtain modify RPC slot first (if applicable), so
784 * that threads that are waiting for a modify RPC slot are not polluting
785 * our rpcs in flight counter.
786 * We do not do flock request limiting, though */
788 mdc_get_mod_rpc_slot(req, it);
789 rc = obd_get_request_slot(&obddev->u.cli);
/* slot acquisition failed: undo the mod slot and drop the request */
791 mdc_put_mod_rpc_slot(req, it);
792 mdc_clear_replay_flag(req, 0);
793 ptlrpc_req_finished(req);
798 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
799 0, lvb_type, lockh, 0);
801 /* For flock requests we immediatelly return without further
802 delay and let caller deal with the rest, since rest of
803 this function metadata processing makes no sense for flock
804 requests anyway. But in case of problem during comms with
805 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
806 can not rely on caller and this mainly for F_UNLCKs
807 (explicits or automatically generated by Kernel to clean
808 current FLocks upon exit) that can't be trashed */
809 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
810 (einfo->ei_type == LDLM_FLOCK) &&
811 (einfo->ei_mode == LCK_NL))
/* release both slots in reverse order of acquisition */
816 obd_put_request_slot(&obddev->u.cli);
817 mdc_put_mod_rpc_slot(req, it);
820 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
821 obddev->obd_name, rc);
823 mdc_clear_replay_flag(req, rc);
824 ptlrpc_req_finished(req);
828 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
829 LASSERT(lockrep != NULL);
/* intent status travels in lock_policy_res2 in network byte order */
831 lockrep->lock_policy_res2 =
832 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
834 /* Retry infinitely when the server returns -EINPROGRESS for the
835 * intent operation, when server returns -EINPROGRESS for acquiring
836 * intent lock, we'll retry in after_reply(). */
837 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
838 mdc_clear_replay_flag(req, rc);
839 ptlrpc_req_finished(req);
842 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
843 obddev->obd_name, resends, it->it_op,
844 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
/* only resend within the same import generation; an eviction in
 * between means the resend cannot proceed */
846 if (generation == obddev->u.cli.cl_import->imp_generation) {
849 CDEBUG(D_HA, "resend cross eviction\n");
854 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* on error, drop any lock we were granted and scrub intent state */
856 if (lustre_handle_is_used(lockh)) {
857 ldlm_lock_decref(lockh, einfo->ei_mode);
858 memset(lockh, 0, sizeof(*lockh));
860 ptlrpc_req_finished(req);
862 it->it_lock_handle = 0;
863 it->it_lock_mode = 0;
864 it->it_request = NULL;
/*
 * Finalize an intent lock after the server reply has been processed.
 *
 * Verifies the intent executed, surfaces per-phase open errors, takes
 * extra request references for successful CREATE/OPEN (released later
 * by ll_create_node / ll_file_open), and if an equivalent lock already
 * exists locally, cancels the freshly granted one in its favor.
 * Some early-return lines are not visible in this chunk.
 */
870 static int mdc_finish_intent_lock(struct obd_export *exp,
871 struct ptlrpc_request *request,
872 struct md_op_data *op_data,
873 struct lookup_intent *it,
874 struct lustre_handle *lockh)
876 struct lustre_handle old_lock;
877 struct mdt_body *mdt_body;
878 struct ldlm_lock *lock;
882 LASSERT(request != NULL);
883 LASSERT(request != LP_POISON);
884 LASSERT(request->rq_repmsg != LP_POISON);
/* readdir enqueues carry no intent payload to finish */
886 if (it->it_op & IT_READDIR)
889 if (!it_disposition(it, DISP_IT_EXECD)) {
890 /* The server failed before it even started executing the
891 * intent, i.e. because it couldn't unpack the request. */
892 LASSERT(it->it_status != 0);
893 RETURN(it->it_status);
895 rc = it_open_error(DISP_IT_EXECD, it);
899 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
900 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
902 rc = it_open_error(DISP_LOOKUP_EXECD, it);
906 /* keep requests around for the multiple phases of the call
907 * this shows the DISP_XX must guarantee we make it into the call
909 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
910 it_disposition(it, DISP_OPEN_CREATE) &&
911 !it_open_error(DISP_OPEN_CREATE, it)) {
912 it_set_disposition(it, DISP_ENQ_CREATE_REF);
913 ptlrpc_request_addref(request); /* balanced in ll_create_node */
915 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
916 it_disposition(it, DISP_OPEN_OPEN) &&
917 !it_open_error(DISP_OPEN_OPEN, it)) {
918 it_set_disposition(it, DISP_ENQ_OPEN_REF);
919 ptlrpc_request_addref(request); /* balanced in ll_file_open */
920 /* BUG 11546 - eviction in the middle of open rpc processing */
921 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
924 if (it->it_op & IT_CREAT) {
925 /* XXX this belongs in ll_create_it */
926 } else if (it->it_op == IT_OPEN) {
927 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
929 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
932 /* If we already have a matching lock, then cancel the new
933 * one. We have to set the data here instead of in
934 * mdc_enqueue, because we need to use the child's inode as
935 * the l_ast_data to match, and that's not available until
936 * intent_finish has performed the iget().) */
937 lock = ldlm_handle2lock(lockh);
939 union ldlm_policy_data policy = lock->l_policy_data;
940 LDLM_DEBUG(lock, "matching against this");
/* sanity: the granted lock must cover the fid the reply names */
942 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
943 &lock->l_resource->lr_name),
944 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
945 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
948 memcpy(&old_lock, lockh, sizeof(*lockh));
949 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
950 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* prefer the pre-existing lock: cancel ours, adopt the old handle */
951 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
952 memcpy(lockh, &old_lock, sizeof(old_lock));
953 it->it_lock_handle = lockh->cookie;
957 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
958 (int)op_data->op_namelen, op_data->op_name,
959 ldlm_it2str(it->it_op), it->it_status,
960 it->it_disposition, rc);
/*
 * Check whether a lock already covers the bits an intent needs on @fid.
 *
 * If the intent carries a lock handle, revalidate that handle directly;
 * otherwise match against cached locks with an intent-op-specific
 * inodebits policy.  On success the intent's lock handle/mode are
 * refreshed; otherwise they are cleared.  The per-op policy selection
 * branches (and the final return) are partially outside this chunk.
 */
965 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
966 struct lu_fid *fid, __u64 *bits)
968 /* We could just return 1 immediately, but since we should only
969 * be called in revalidate_it if we already have a lock, let's
971 struct ldlm_res_id res_id;
972 struct lustre_handle lockh;
973 union ldlm_policy_data policy;
/* fast path: the intent already names a lock -- revalidate it */
977 if (it->it_lock_handle) {
978 lockh.cookie = it->it_lock_handle;
979 mode = ldlm_revalidate_lock_handle(&lockh, bits);
981 fid_build_reg_res_name(fid, &res_id);
984 /* File attributes are held under multiple bits:
985 * nlink is under lookup lock, size and times are
986 * under UPDATE lock and recently we've also got
987 * a separate permissions lock for owner/group/acl that
988 * were protected by lookup lock before.
989 * Getattr must provide all of that information,
990 * so we need to ensure we have all of those locks.
991 * Unfortunately, if the bits are split across multiple
992 * locks, there's no easy way to match all of them here,
993 * so an extra RPC would be performed to fetch all
994 * of those bits at once for now. */
995 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
996 * but for old MDTs (< 2.4), permission is covered
997 * by LOOKUP lock, so it needs to match all bits here.*/
998 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
999 MDS_INODELOCK_LOOKUP |
1003 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1006 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1009 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* accept any read/write mode already granted on the resource */
1013 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1014 LDLM_IBITS, &policy,
1015 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1020 it->it_lock_handle = lockh.cookie;
1021 it->it_lock_mode = mode;
/* no usable lock: clear the intent's cached lock state */
1023 it->it_lock_handle = 0;
1024 it->it_lock_mode = 0;
1031 * This long block is all about fixing up the lock and request state
1032 * so that it is correct as of the moment _before_ the operation was
1033 * applied; that way, the VFS will think that everything is normal and
1034 * call Lustre's regular VFS methods.
1036 * If we're performing a creation, that means that unless the creation
1037 * failed with EEXIST, we should fake up a negative dentry.
1039 * For everything else, we want to lookup to succeed.
1041 * One additional note: if CREATE or OPEN succeeded, we add an extra
1042 * reference to the request because we need to keep it around until
1043 * ll_create/ll_open gets called.
1045 * The server will return to us, in it_disposition, an indication of
1046 * exactly what it_status refers to.
1048 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1049 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1050 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1051 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1054 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1057 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1058 struct lookup_intent *it, struct ptlrpc_request **reqp,
1059 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1061 struct ldlm_enqueue_info einfo = {
1062 .ei_type = LDLM_IBITS,
1063 .ei_mode = it_to_lock_mode(it),
1064 .ei_cb_bl = cb_blocking,
1065 .ei_cb_cp = ldlm_completion_ast,
1067 struct lustre_handle lockh;
1072 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1073 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1074 op_data->op_name, PFID(&op_data->op_fid2),
1075 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* revalidate path: if the child fid is known, try to satisfy the
 * intent from an already cached lock before issuing an RPC */
1079 if (fid_is_sane(&op_data->op_fid2) &&
1080 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1081 /* We could just return 1 immediately, but since we should only
1082 * be called in revalidate_it if we already have a lock, let's
1084 it->it_lock_handle = 0;
1085 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1086 /* Only return failure if it was not GETATTR by cfid
1087 (from inode_revalidate) */
1088 if (rc || op_data->op_namelen != 0)
1092 /* For case if upper layer did not alloc fid, do it now. */
1093 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1094 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1096 CERROR("Can't alloc new fid, rc %d\n", rc);
/* slow path: synchronous intent enqueue, then finish processing */
1101 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1106 *reqp = it->it_request;
1107 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for the async getattr enqueue started by
 * mdc_intent_getattr_async(): release the request slot, complete the
 * ldlm enqueue, run the usual finish-enqueue/finish-intent-lock path,
 * and finally invoke the caller's mi_cb with the result.  Error-path
 * gotos between the visible lines are not shown in this chunk.
 */
1111 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1112 struct ptlrpc_request *req,
1115 struct mdc_getattr_args *ga = args;
1116 struct obd_export *exp = ga->ga_exp;
1117 struct md_enqueue_info *minfo = ga->ga_minfo;
1118 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1119 struct lookup_intent *it;
1120 struct lustre_handle *lockh;
1121 struct obd_device *obddev;
1122 struct ldlm_reply *lockrep;
1123 __u64 flags = LDLM_FL_HAS_INTENT;
1127 lockh = &minfo->mi_lockh;
1129 obddev = class_exp2obd(exp);
/* balance the slot taken in mdc_intent_getattr_async() */
1131 obd_put_request_slot(&obddev->u.cli);
1132 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1135 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1136 &flags, NULL, 0, lockh, rc);
1138 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1139 mdc_clear_replay_flag(req, rc);
1143 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1144 LASSERT(lockrep != NULL);
/* intent status arrives in network byte order */
1146 lockrep->lock_policy_res2 =
1147 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1149 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1153 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* hand the final result to the caller's completion callback */
1157 minfo->mi_cb(req, minfo, rc);
/*
 * Start an asynchronous intent-getattr enqueue.
 *
 * Packs a getattr intent request, takes a request slot, issues a
 * non-blocking ldlm_cli_enqueue (async=1), stashes the export and
 * @minfo in rq_async_args for the interpret callback, and queues the
 * request on ptlrpcd.  NOTE(review): the tail of this function (its
 * final return) lies beyond the end of this chunk.
 */
1161 int mdc_intent_getattr_async(struct obd_export *exp,
1162 struct md_enqueue_info *minfo)
1164 struct md_op_data *op_data = &minfo->mi_data;
1165 struct lookup_intent *it = &minfo->mi_it;
1166 struct ptlrpc_request *req;
1167 struct mdc_getattr_args *ga;
1168 struct obd_device *obddev = class_exp2obd(exp);
1169 struct ldlm_res_id res_id;
1170 union ldlm_policy_data policy = {
1171 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1172 MDS_INODELOCK_UPDATE } };
1174 __u64 flags = LDLM_FL_HAS_INTENT;
1177 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1179 (int)op_data->op_namelen, op_data->op_name,
1180 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1182 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1183 req = mdc_intent_getattr_pack(exp, it, op_data);
1185 RETURN(PTR_ERR(req));
/* slot is released in the interpret callback on the success path */
1187 rc = obd_get_request_slot(&obddev->u.cli);
1189 ptlrpc_req_finished(req);
/* async=1 (last argument): completion handled by the interpreter */
1193 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1194 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1196 obd_put_request_slot(&obddev->u.cli);
1197 ptlrpc_req_finished(req);
/* per-request args must fit the preallocated rq_async_args area */
1201 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1202 ga = ptlrpc_req_async_args(req);
1204 ga->ga_minfo = minfo;
1206 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1207 ptlrpcd_add_req(req);