4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
/* Per-request arguments for the asynchronous getattr intent. They are read
 * back in mdc_intent_getattr_async_interpret() as the export the request
 * belongs to and the caller's md_enqueue_info, and live in the request's
 * async-args area (size checked with CLASSERT in mdc_intent_getattr_async()). */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
/* Return the intent status that applies to the given open @phase.
 *
 * The dispositions are tested in a fixed order: DISP_OPEN_LEASE,
 * DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.
 * For the first one that is set on @it, d.lustre.it_status is returned
 * when @phase is at least that disposition.
 *
 * If none of the dispositions is set, the disposition and status are
 * logged with CERROR.
 *
 * NOTE(review): the value returned when @phase is below the matched
 * disposition, and what follows the CERROR, are on lines not visible
 * here - confirm before relying on them. */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
59 return it->d.lustre.it_status;
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
65 return it->d.lustre.it_status;
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
72 return it->d.lustre.it_status;
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
79 return it->d.lustre.it_status;
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
86 return it->d.lustre.it_status;
90 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
91 it->d.lustre.it_status);
95 EXPORT_SYMBOL(it_open_error);
/* Attach @data, used here as a struct inode pointer, to the resource of
 * the lock identified by @lockh (stored in lr_lvb_inode).
 *
 * The handle is resolved with ldlm_handle2lock() and asserted non-NULL;
 * the update is done between lock_res_and_lock()/unlock_res_and_lock().
 * If the resource already points at a different inode, that inode is
 * asserted to have I_FREEING set. The lock's inodebits are written
 * through @bits.
 *
 * NOTE(review): any NULL check on @bits and the release of the lock
 * reference taken by ldlm_handle2lock() are not visible here - confirm. */
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
111 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
113 LASSERT(lock != NULL);
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/* Look for an already-held lock on the resource named by @fid.
 *
 * The resource name is built with fid_build_reg_res_name() and the lookup
 * is done by ldlm_lock_match() in the export's obd namespace, passing
 * @flags, @type, @mode and @lockh through.
 *
 * Side effect: @policy is modified in place - its inodebits are masked
 * with exp_connect_ibits(exp) before matching.
 *
 * NOTE(review): the return statement is not visible; presumably the
 * result of ldlm_lock_match() is returned - confirm. */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource derived from @fid.
 *
 * Thin wrapper: builds the resource name from @fid and calls
 * ldlm_cli_cancel_unused_resource() on the export's obd namespace with
 * @policy, @mode, @flags and @opaque passed through unchanged.
 *
 * NOTE(review): the return statement is not visible; presumably the
 * callee's result is returned - confirm. */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/* Clear the inode pointer (lr_lvb_inode) stored on the ldlm resource
 * that corresponds to @fid.
 *
 * The export's namespace is asserted non-NULL, the resource is obtained
 * with ldlm_resource_get() and released again with
 * ldlm_resource_putref().
 *
 * NOTE(review): handling of a failed ldlm_resource_get(), any locking
 * around the store, and the return value are on lines not visible here -
 * confirm. */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
188 ldlm_resource_putref(res);
/* Iterate over the ldlm resource built from @fid in the export's obd
 * namespace using ldlm_resource_iterate(), then branch on whether the
 * iteration ended with LDLM_ITER_STOP or LDLM_ITER_CONTINUE.
 * NOTE(review): the iterator arguments (presumably @it and @data) and
 * the values returned for each outcome are not visible here - confirm. */
192 /* find any ldlm lock of the inode in mdc
196 int mdc_find_cbdata(struct obd_export *exp,
197 const struct lu_fid *fid,
198 ldlm_iterator_t it, void *data)
200 struct ldlm_res_id res_id;
204 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
205 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
207 if (rc == LDLM_ITER_STOP)
209 else if (rc == LDLM_ITER_CONTINUE)
214 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
216 /* Don't hold error requests for replay. */
/* Only requests currently flagged rq_replay are touched, and the update
 * is made while holding rq_lock.
 * NOTE(review): the statement executed under the lock is not visible
 * here; presumably it clears rq_replay - confirm. */
217 if (req->rq_replay) {
218 spin_lock(&req->rq_lock);
220 spin_unlock(&req->rq_lock);
/* A non-zero rc combined with a non-zero transno is reported at D_ERROR. */
222 if (rc && req->rq_transno != 0) {
223 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* Grow request segment DLM_INTENT_REC_OFF + 4 of @req to
 * body->mbo_eadatasize bytes via sptlrpc_cli_enlarge_reqbuf(). */
228 /* Save a large LOV EA into the request buffer so that it is available
229 * for replay. We don't do this in the initial request because the
230 * original request doesn't need this buffer (at most it sends just the
231 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
232 * buffer and may also be difficult to allocate and save a very large
233 * request buffer for each open. (bug 5707)
235 * OOM here may cause recovery failure if lmm is needed (only for the
236 * original open if the MDS crashed just when this client also OOM'd)
237 * but this is incredibly unlikely, and questionable whether the client
238 * could do MDS recovery under OOM anyways... */
239 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
240 struct mdt_body *body)
244 /* FIXME: remove this explicit offset. */
245 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
246 body->mbo_eadatasize);
/* Failure handling: log, then mark the reply body as carrying no EA
 * (OBD_MD_FLEASIZE cleared, size zeroed).
 * NOTE(review): the guarding condition is not visible; presumably this
 * runs only when rc is non-zero - confirm. */
248 CERROR("Can't enlarge segment %d size to %d\n",
249 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
250 body->mbo_valid &= ~OBD_MD_FLEASIZE;
251 body->mbo_eadatasize = 0;
/* Build an RQF_LDLM_INTENT_OPEN enqueue request for @it / @op_data.
 *
 * Visible steps:
 *  - the file-type bits of it_create_mode are forced to S_IFREG;
 *  - when op_fid2 is sane, unused locks on it are collected into a local
 *    cancel list via mdc_resource_get_unused(); the lock mode is selected
 *    from it_flags (MDS_OPEN_LEASE, FMODE_WRITE, MDS_OPEN_TRUNC,
 *    FMODE_EXEC are tested);
 *  - unused MDS_INODELOCK_UPDATE locks on op_fid1 are collected as well;
 *  - the cancel list and count are handed to ldlm_prep_enqueue_req();
 *  - rq_replay is set from the import's imp_replayable under rq_lock;
 *  - the intent opcode is it_op and the body is packed by mdc_open_pack().
 *
 * Buffer sizes: RMF_NAME (client) = op_namelen + 1; RMF_EADATA (client) =
 * max(op_data_size, cl_default_mds_easize); RMF_MDT_MD (server) =
 * cl_max_mds_easize; RMF_ACL (server) = sizeof(struct mdt_remote_perm)
 * for remote clients only.
 *
 * On request allocation failure the collected locks are released with
 * ldlm_lock_list_put() and ERR_PTR(-ENOMEM) is returned.
 *
 * NOTE(review): the concrete lock modes chosen in each branch, how
 * IT_CREAT influences the op_fid1 call, and the error/return paths after
 * ldlm_prep_enqueue_req() are on lines not visible here - confirm. */
255 static struct ptlrpc_request *
256 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
257 struct md_op_data *op_data)
259 struct ptlrpc_request *req;
260 struct obd_device *obddev = class_exp2obd(exp);
261 struct ldlm_intent *lit;
262 const void *lmm = op_data->op_data;
263 __u32 lmmsize = op_data->op_data_size;
264 struct list_head cancels = LIST_HEAD_INIT(cancels);
270 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
272 /* XXX: openlock is not cancelled for cross-refs. */
273 /* If inode is known, cancel conflicting OPEN locks. */
274 if (fid_is_sane(&op_data->op_fid2)) {
275 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
276 if (it->it_flags & FMODE_WRITE)
281 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
284 else if (it->it_flags & FMODE_EXEC)
290 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295 /* If CREATE, cancel parent's UPDATE lock. */
296 if (it->it_op & IT_CREAT)
300 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
302 MDS_INODELOCK_UPDATE);
304 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
305 &RQF_LDLM_INTENT_OPEN);
307 ldlm_lock_list_put(&cancels, l_bl_ast, count);
308 RETURN(ERR_PTR(-ENOMEM));
311 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
312 op_data->op_namelen + 1);
313 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
314 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
316 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
318 ptlrpc_request_free(req);
322 spin_lock(&req->rq_lock);
323 req->rq_replay = req->rq_import->imp_replayable;
324 spin_unlock(&req->rq_lock);
326 /* pack the intent */
327 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
328 lit->opc = (__u64)it->it_op;
330 /* pack the intended request */
331 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
334 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
335 obddev->u.cli.cl_max_mds_easize);
337 /* for remote client, fetch remote perm for current user */
338 if (client_is_remote(exp))
339 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
340 sizeof(struct mdt_remote_perm));
341 ptlrpc_request_set_replen(req);
/* Build an RQF_LDLM_INTENT_GETXATTR enqueue request.
 *
 * The intent opcode is fixed to IT_GETXATTR. The maximum EA size
 * advertised by the server at connect time (imp_connect_data
 * .ocd_max_easize) is passed to mdc_pack_body() and also used as the
 * server-side reply size for RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated; the
 * request is freed if ldlm_prep_enqueue_req() is followed by the visible
 * ptlrpc_request_free() path.
 *
 * NOTE(review): the declaration/value of 'count', the trailing
 * mdc_pack_body() arguments and the final return are not visible -
 * confirm. */
345 static struct ptlrpc_request *
346 mdc_intent_getxattr_pack(struct obd_export *exp,
347 struct lookup_intent *it,
348 struct md_op_data *op_data)
350 struct ptlrpc_request *req;
351 struct ldlm_intent *lit;
354 struct list_head cancels = LIST_HEAD_INIT(cancels);
358 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
359 &RQF_LDLM_INTENT_GETXATTR);
361 RETURN(ERR_PTR(-ENOMEM));
363 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
365 ptlrpc_request_free(req);
369 /* pack the intent */
370 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
371 lit->opc = IT_GETXATTR;
373 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
375 /* pack the intended request */
376 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
379 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
380 RCL_SERVER, maxdata);
382 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
383 RCL_SERVER, maxdata);
385 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
386 RCL_SERVER, maxdata);
388 ptlrpc_request_set_replen(req);
/* Build an RQF_LDLM_INTENT_UNLINK enqueue request.
 *
 * RMF_NAME (client) is sized to op_namelen + 1, no early-cancel list is
 * passed to ldlm_prep_enqueue_req(), the intent opcode is taken from
 * it_op, the body is packed by mdc_unlink_pack(), and the server-side
 * RMF_MDT_MD buffer is sized to cl_default_mds_easize.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 * NOTE(review): the error check after ldlm_prep_enqueue_req() and the
 * final return are not visible here - confirm. */
393 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
394 struct lookup_intent *it,
395 struct md_op_data *op_data)
397 struct ptlrpc_request *req;
398 struct obd_device *obddev = class_exp2obd(exp);
399 struct ldlm_intent *lit;
403 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
404 &RQF_LDLM_INTENT_UNLINK);
406 RETURN(ERR_PTR(-ENOMEM));
408 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
409 op_data->op_namelen + 1);
411 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
413 ptlrpc_request_free(req);
417 /* pack the intent */
418 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
419 lit->opc = (__u64)it->it_op;
421 /* pack the intended request */
422 mdc_unlink_pack(req, op_data);
424 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
425 obddev->u.cli.cl_default_mds_easize);
426 ptlrpc_request_set_replen(req);
/* Build an RQF_LDLM_INTENT_GETATTR enqueue request (used for both the
 * synchronous getattr/lookup intents and the asynchronous getattr).
 *
 * The 'valid' mask requests attributes, EA size information and
 * directory EA, plus either OBD_MD_FLRMTPERM (remote client) or
 * OBD_MD_FLACL.
 * The EA size is chosen from cl_default_mds_easize and
 * cl_max_mds_easize; it is passed to mdc_getattr_pack() and used as the
 * server-side RMF_MDT_MD size. Remote clients additionally reserve
 * sizeof(struct mdt_remote_perm) for RMF_ACL.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 * NOTE(review): one term of the 'valid' initialiser, the 'else' that
 * presumably separates the two easize assignments, and the error/return
 * paths are not visible here - confirm. */
430 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
431 struct lookup_intent *it,
432 struct md_op_data *op_data)
434 struct ptlrpc_request *req;
435 struct obd_device *obddev = class_exp2obd(exp);
436 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
437 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
439 (client_is_remote(exp) ?
440 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
441 struct ldlm_intent *lit;
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
447 &RQF_LDLM_INTENT_GETATTR);
449 RETURN(ERR_PTR(-ENOMEM));
451 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
452 op_data->op_namelen + 1);
454 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
456 ptlrpc_request_free(req);
460 /* pack the intent */
461 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
462 lit->opc = (__u64)it->it_op;
464 if (obddev->u.cli.cl_default_mds_easize > 0)
465 easize = obddev->u.cli.cl_default_mds_easize;
467 easize = obddev->u.cli.cl_max_mds_easize;
469 /* pack the intended request */
470 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
472 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
473 if (client_is_remote(exp))
474 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
475 sizeof(struct mdt_remote_perm));
476 ptlrpc_request_set_replen(req);
/* Build an RQF_LDLM_INTENT_LAYOUT enqueue request.
 *
 * The client-side RMF_EADATA buffer is sized to 0, the intent opcode is
 * taken from it_op, the layout intent opcode is always
 * LAYOUT_INTENT_ACCESS, and the server-side RMF_DLM_LVB buffer is sized
 * to cl_default_mds_easize. The md_op_data argument is not used.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 * NOTE(review): the error check after ldlm_prep_enqueue_req() and the
 * final return are not visible here - confirm. */
480 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
481 struct lookup_intent *it,
482 struct md_op_data *unused)
484 struct obd_device *obd = class_exp2obd(exp);
485 struct ptlrpc_request *req;
486 struct ldlm_intent *lit;
487 struct layout_intent *layout;
491 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
492 &RQF_LDLM_INTENT_LAYOUT);
494 RETURN(ERR_PTR(-ENOMEM));
496 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
497 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
499 ptlrpc_request_free(req);
503 /* pack the intent */
504 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
505 lit->opc = (__u64)it->it_op;
507 /* pack the layout intent request */
508 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
509 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
510 * set for replication */
511 layout->li_opc = LAYOUT_INTENT_ACCESS;
513 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
514 obd->u.cli.cl_default_mds_easize);
515 ptlrpc_request_set_replen(req);
/* Build a plain RQF_LDLM_ENQUEUE request (no intent payload) whose
 * server-side RMF_DLM_LVB buffer is @lvb_len bytes.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 * NOTE(review): the error check after ldlm_prep_enqueue_req() and the
 * final return are not visible here - confirm. */
519 static struct ptlrpc_request *
520 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
522 struct ptlrpc_request *req;
526 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
528 RETURN(ERR_PTR(-ENOMEM));
530 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
532 ptlrpc_request_free(req);
536 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
537 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue: reconcile the lock handle with
 * what the server granted, copy the reply into the intent, and fix up the
 * request for possible replay.
 *
 * Visible steps, in order:
 *  1. If the request has a transno or is flagged for replay, set
 *     LDLM_FL_INTENT_ONLY in the request's lock flags.
 *  2. If rc is ELDLM_LOCK_ABORTED, zero @lockh; otherwise, if the lock's
 *     l_req_mode differs from einfo->ei_mode, move the reference to the
 *     new mode and update einfo->ei_mode.
 *  3. Copy disposition, status, lock mode, lock handle cookie and the
 *     request pointer into it->d.lustre.
 *  4. Clear the replay flag when there is no transno or the intent
 *     status is negative, and for IT_OPEN that did not end in a
 *     successful open.
 *  5. For OPEN/UNLINK/LOOKUP/GETATTR: fetch the mdt_body, set open replay
 *     data for a successful open, validate the EA buffer, and for a
 *     replayable open save the reply EA into the request's RMF_EADATA.
 *     For remote clients the RMF_ACL remote-perm buffer is fetched.
 *  6. For IT_LAYOUT: take the layout from RMF_DLM_LVB.
 *  7. If the lock has a layout bit, LVB data is present and the reply
 *     is not blocked, copy the LVB into a fresh allocation and install it
 *     on the lock if none is installed yet.
 *
 * NOTE(review): the final parameter (used below as 'rc'), several
 * declarations, the error returns and the release of lock references are
 * on lines not visible here - confirm before changing this function. */
541 static int mdc_finish_enqueue(struct obd_export *exp,
542 struct ptlrpc_request *req,
543 struct ldlm_enqueue_info *einfo,
544 struct lookup_intent *it,
545 struct lustre_handle *lockh,
548 struct req_capsule *pill = &req->rq_pill;
549 struct ldlm_request *lockreq;
550 struct ldlm_reply *lockrep;
551 struct lustre_intent_data *intent = &it->d.lustre;
552 struct ldlm_lock *lock;
553 void *lvb_data = NULL;
558 /* Similarly, if we're going to replay this request, we don't want to
559 * actually get a lock, just perform the intent. */
560 if (req->rq_transno || req->rq_replay) {
561 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
562 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
565 if (rc == ELDLM_LOCK_ABORTED) {
567 memset(lockh, 0, sizeof(*lockh));
569 } else { /* rc = 0 */
570 lock = ldlm_handle2lock(lockh);
571 LASSERT(lock != NULL);
573 /* If the server gave us back a different lock mode, we should
574 * fix up our variables. */
575 if (lock->l_req_mode != einfo->ei_mode) {
576 ldlm_lock_addref(lockh, lock->l_req_mode);
577 ldlm_lock_decref(lockh, einfo->ei_mode);
578 einfo->ei_mode = lock->l_req_mode;
/* Publish the server's answer (disposition, status) together with the
 * lock mode, handle and request in the intent. */
583 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
584 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
586 intent->it_disposition = (int)lockrep->lock_policy_res1;
587 intent->it_status = (int)lockrep->lock_policy_res2;
588 intent->it_lock_mode = einfo->ei_mode;
589 intent->it_lock_handle = lockh->cookie;
590 intent->it_data = req;
592 /* Technically speaking rq_transno must already be zero if
593 * it_status is in error, so the check is a bit redundant */
594 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
595 mdc_clear_replay_flag(req, intent->it_status);
597 /* If we're doing an IT_OPEN which did not result in an actual
598 * successful open, then we need to remove the bit which saves
599 * this request for unconditional replay.
601 * It's important that we do this first! Otherwise we might exit the
602 * function without doing so, and try to replay a failed create
604 if (it->it_op & IT_OPEN && req->rq_replay &&
605 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
606 mdc_clear_replay_flag(req, intent->it_status);
608 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
609 it->it_op, intent->it_disposition, intent->it_status);
611 /* We know what to expect, so we do any byte flipping required here */
612 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
613 struct mdt_body *body;
615 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
617 CERROR ("Can't swab mdt_body\n");
621 if (it_disposition(it, DISP_OPEN_OPEN) &&
622 !it_open_error(DISP_OPEN_OPEN, it)) {
624 * If this is a successful OPEN request, we need to set
625 * replay handler and data early, so that if replay
626 * happens immediately after swabbing below, new reply
627 * is swabbed by that handler correctly.
629 mdc_set_open_replay_data(NULL, NULL, it);
632 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
635 mdc_update_max_ea_from_body(exp, body);
638 * The eadata is opaque; just check that it is there.
639 * Eventually, obd_unpackmd() will check the contents.
641 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
642 body->mbo_eadatasize);
646 /* save lvb data and length in case this is for layout
649 lvb_len = body->mbo_eadatasize;
652 * We save the reply LOV EA in case we have to replay a
653 * create for recovery. If we didn't allocate a large
654 * enough request buffer above we need to reallocate it
655 * here to hold the actual LOV EA.
657 * To not save LOV EA if request is not going to replay
658 * (for example error one).
660 if ((it->it_op & IT_OPEN) && req->rq_replay) {
662 if (req_capsule_get_size(pill, &RMF_EADATA,
664 body->mbo_eadatasize)
665 mdc_realloc_openmsg(req, body);
667 req_capsule_shrink(pill, &RMF_EADATA,
668 body->mbo_eadatasize,
671 req_capsule_set_size(pill, &RMF_EADATA,
673 body->mbo_eadatasize);
675 lmm = req_capsule_client_get(pill, &RMF_EADATA);
678 body->mbo_eadatasize);
682 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
683 struct mdt_remote_perm *perm;
685 LASSERT(client_is_remote(exp));
686 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
687 lustre_swab_mdt_remote_perm);
691 } else if (it->it_op & IT_LAYOUT) {
692 /* maybe the lock was granted right away and layout
693 * is packed into RMF_DLM_LVB of req */
694 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
696 lvb_data = req_capsule_server_sized_get(pill,
697 &RMF_DLM_LVB, lvb_len);
698 if (lvb_data == NULL)
703 /* fill in stripe data for layout lock.
704 * LU-6581: trust layout data only if layout lock is granted. The MDT
705 * has stopped sending layout unless the layout lock is granted. The
706 * client still does this checking in case it's talking with an old
707 * server. - Jinshan */
708 lock = ldlm_handle2lock(lockh);
709 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
710 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
713 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
714 ldlm_it2str(it->it_op), lvb_len);
716 OBD_ALLOC_LARGE(lmm, lvb_len);
721 memcpy(lmm, lvb_data, lvb_len);
723 /* install lvb_data */
724 lock_res_and_lock(lock);
725 if (lock->l_lvb_data == NULL) {
726 lock->l_lvb_type = LVB_T_LAYOUT;
727 lock->l_lvb_data = lmm;
728 lock->l_lvb_len = lvb_len;
731 unlock_res_and_lock(lock);
733 OBD_FREE_LARGE(lmm, lvb_len);
/* Enqueue a lock on op_fid1's resource, optionally carrying an intent.
 *
 * With an intent (@it non-NULL) the lock type must be LDLM_IBITS, the
 * caller's @policy must be NULL and a built-in inodebits policy is picked
 * from it_op (UPDATE, LAYOUT, XATTR or LOOKUP); LDLM_FL_HAS_INTENT is
 * added to the flags. The request is then built by the matching
 * mdc_intent_*_pack() helper, or by mdc_enqueue_pack() for IT_READDIR.
 * Without an intent only LDLM_FLOCK is accepted and res_id.name[3] is set
 * to LDLM_FLOCK.
 *
 * The request is sent with ldlm_cli_enqueue(). A modify-RPC slot and a
 * request slot are acquired beforehand and released afterwards. When the
 * server answers an intent with -EINPROGRESS the request is dropped and
 * the visible code compares the saved import generation with the current
 * one to decide about resending. On success the reply is handed to
 * mdc_finish_enqueue(); if that fails, a used lock handle is
 * decref'ed and zeroed and the intent's lock fields are cleared.
 *
 * NOTE(review): the resend label/loop, the guards around slot handling
 * for flock requests, the unsupported-op branch and all return
 * statements except RETURN(PTR_ERR(req)) are on lines not visible here -
 * confirm before changing control flow. */
741 /* We always reserve enough space in the reply packet for a stripe MD, because
742 * we don't know in advance the file type. */
743 int mdc_enqueue(struct obd_export *exp,
744 struct ldlm_enqueue_info *einfo,
745 const union ldlm_policy_data *policy,
746 struct lookup_intent *it, struct md_op_data *op_data,
747 struct lustre_handle *lockh, __u64 extra_lock_flags)
749 struct obd_device *obddev = class_exp2obd(exp);
750 struct ptlrpc_request *req = NULL;
751 __u64 flags, saved_flags = extra_lock_flags;
752 struct ldlm_res_id res_id;
753 static const union ldlm_policy_data lookup_policy = {
754 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
755 static const union ldlm_policy_data update_policy = {
756 .l_inodebits = { MDS_INODELOCK_UPDATE } };
757 static const union ldlm_policy_data layout_policy = {
758 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
759 static const union ldlm_policy_data getxattr_policy = {
760 .l_inodebits = { MDS_INODELOCK_XATTR } };
761 int generation, resends = 0;
762 struct ldlm_reply *lockrep;
763 enum lvb_type lvb_type = 0;
767 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
769 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
772 LASSERT(policy == NULL);
774 saved_flags |= LDLM_FL_HAS_INTENT;
775 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
776 policy = &update_policy;
777 else if (it->it_op & IT_LAYOUT)
778 policy = &layout_policy;
779 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
780 policy = &getxattr_policy;
782 policy = &lookup_policy;
/* Remember the import generation at the time the request is built; it is
 * stamped on the request and compared again on the -EINPROGRESS path. */
785 generation = obddev->u.cli.cl_import->imp_generation;
789 /* The only way right now is FLOCK. */
790 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
792 res_id.name[3] = LDLM_FLOCK;
793 } else if (it->it_op & IT_OPEN) {
794 req = mdc_intent_open_pack(exp, it, op_data);
795 } else if (it->it_op & IT_UNLINK) {
796 req = mdc_intent_unlink_pack(exp, it, op_data);
797 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
798 req = mdc_intent_getattr_pack(exp, it, op_data);
799 } else if (it->it_op & IT_READDIR) {
800 req = mdc_enqueue_pack(exp, 0);
801 } else if (it->it_op & IT_LAYOUT) {
802 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
804 req = mdc_intent_layout_pack(exp, it, op_data);
805 lvb_type = LVB_T_LAYOUT;
806 } else if (it->it_op & IT_GETXATTR) {
807 req = mdc_intent_getxattr_pack(exp, it, op_data);
814 RETURN(PTR_ERR(req));
817 req->rq_generation_set = 1;
818 req->rq_import_generation = generation;
819 req->rq_sent = cfs_time_current_sec() + resends;
822 /* It is important to obtain modify RPC slot first (if applicable), so
823 * that threads that are waiting for a modify RPC slot are not polluting
824 * our rpcs in flight counter.
825 * We do not do flock request limiting, though */
827 mdc_get_mod_rpc_slot(req, it);
828 rc = obd_get_request_slot(&obddev->u.cli);
830 mdc_put_mod_rpc_slot(req, it);
831 mdc_clear_replay_flag(req, 0);
832 ptlrpc_req_finished(req);
837 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
838 0, lvb_type, lockh, 0);
840 /* For flock requests we immediatelly return without further
841 delay and let caller deal with the rest, since rest of
842 this function metadata processing makes no sense for flock
843 requests anyway. But in case of problem during comms with
844 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
845 can not rely on caller and this mainly for F_UNLCKs
846 (explicits or automatically generated by Kernel to clean
847 current FLocks upon exit) that can't be trashed */
848 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
849 (einfo->ei_type == LDLM_FLOCK) &&
850 (einfo->ei_mode == LCK_NL))
855 obd_put_request_slot(&obddev->u.cli);
856 mdc_put_mod_rpc_slot(req, it);
859 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
860 obddev->obd_name, rc);
862 mdc_clear_replay_flag(req, rc);
863 ptlrpc_req_finished(req);
/* Convert the intent status carried in lock_policy_res2 with
 * ptlrpc_status_ntoh() before it is inspected. */
867 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
868 LASSERT(lockrep != NULL);
870 lockrep->lock_policy_res2 =
871 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
873 /* Retry infinitely when the server returns -EINPROGRESS for the
874 * intent operation, when server returns -EINPROGRESS for acquiring
875 * intent lock, we'll retry in after_reply(). */
876 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
877 mdc_clear_replay_flag(req, rc);
878 ptlrpc_req_finished(req);
881 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
882 obddev->obd_name, resends, it->it_op,
883 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
885 if (generation == obddev->u.cli.cl_import->imp_generation) {
888 CDEBUG(D_HA, "resend cross eviction\n");
893 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
895 if (lustre_handle_is_used(lockh)) {
896 ldlm_lock_decref(lockh, einfo->ei_mode);
897 memset(lockh, 0, sizeof(*lockh));
899 ptlrpc_req_finished(req);
901 it->d.lustre.it_lock_handle = 0;
902 it->d.lustre.it_lock_mode = 0;
903 it->d.lustre.it_data = NULL;
/* Final stage of an intent enqueue: interpret the intent disposition,
 * pin the request for later open/create processing, and avoid keeping a
 * duplicate lock.
 *
 * Visible behavior:
 *  - @request and its reply message are asserted not to be poisoned.
 *  - If DISP_IT_EXECD is not set the (asserted non-zero) it_status is
 *    returned. it_open_error() is consulted for DISP_IT_EXECD and
 *    DISP_LOOKUP_EXECD.
 *  - For a successful create and/or open, DISP_ENQ_CREATE_REF /
 *    DISP_ENQ_OPEN_REF is set and an extra reference is taken on
 *    @request; each reference is taken at most once.
 *  - The intent op is sanity-checked with assertions.
 *  - If @lockh resolves to a lock, the reply fid is asserted to match the
 *    lock's resource, then ldlm_lock_match() is called with a copy of the
 *    handle; on a match the new lock is decref'ed and cancelled and
 *    @lockh / it_lock_handle are replaced by the matched handle.
 *
 * NOTE(review): the IT_READDIR branch body, the checks on the
 * it_open_error() results, the release of the lock reference and the
 * final return are on lines not visible here - confirm. */
909 static int mdc_finish_intent_lock(struct obd_export *exp,
910 struct ptlrpc_request *request,
911 struct md_op_data *op_data,
912 struct lookup_intent *it,
913 struct lustre_handle *lockh)
915 struct lustre_handle old_lock;
916 struct mdt_body *mdt_body;
917 struct ldlm_lock *lock;
921 LASSERT(request != NULL);
922 LASSERT(request != LP_POISON);
923 LASSERT(request->rq_repmsg != LP_POISON);
925 if (it->it_op & IT_READDIR)
928 if (!it_disposition(it, DISP_IT_EXECD)) {
929 /* The server failed before it even started executing the
930 * intent, i.e. because it couldn't unpack the request. */
931 LASSERT(it->d.lustre.it_status != 0);
932 RETURN(it->d.lustre.it_status);
934 rc = it_open_error(DISP_IT_EXECD, it);
938 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
939 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
941 rc = it_open_error(DISP_LOOKUP_EXECD, it);
945 /* keep requests around for the multiple phases of the call
946 * this shows the DISP_XX must guarantee we make it into the call
948 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
949 it_disposition(it, DISP_OPEN_CREATE) &&
950 !it_open_error(DISP_OPEN_CREATE, it)) {
951 it_set_disposition(it, DISP_ENQ_CREATE_REF);
952 ptlrpc_request_addref(request); /* balanced in ll_create_node */
954 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
955 it_disposition(it, DISP_OPEN_OPEN) &&
956 !it_open_error(DISP_OPEN_OPEN, it)) {
957 it_set_disposition(it, DISP_ENQ_OPEN_REF);
958 ptlrpc_request_addref(request); /* balanced in ll_file_open */
959 /* BUG 11546 - eviction in the middle of open rpc processing */
960 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
963 if (it->it_op & IT_CREAT) {
964 /* XXX this belongs in ll_create_it */
965 } else if (it->it_op == IT_OPEN) {
966 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
968 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
971 /* If we already have a matching lock, then cancel the new
972 * one. We have to set the data here instead of in
973 * mdc_enqueue, because we need to use the child's inode as
974 * the l_ast_data to match, and that's not available until
975 * intent_finish has performed the iget().) */
976 lock = ldlm_handle2lock(lockh);
978 union ldlm_policy_data policy = lock->l_policy_data;
979 LDLM_DEBUG(lock, "matching against this");
981 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
982 &lock->l_resource->lr_name),
983 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
984 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
987 memcpy(&old_lock, lockh, sizeof(*lockh));
988 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
989 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
990 ldlm_lock_decref_and_cancel(lockh,
991 it->d.lustre.it_lock_mode);
992 memcpy(lockh, &old_lock, sizeof(old_lock));
993 it->d.lustre.it_lock_handle = lockh->cookie;
996 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
997 (int)op_data->op_namelen, op_data->op_name,
998 ldlm_it2str(it->it_op), it->d.lustre.it_status,
999 it->d.lustre.it_disposition, rc);
/* Check whether the client already holds a lock that covers @fid for the
 * intent @it, and record the result in the intent.
 *
 * If the intent already carries a lock handle, that handle is
 * revalidated with ldlm_revalidate_lock_handle() (which also receives
 * @bits). Otherwise the inodebits to look for are selected by it_op and
 * mdc_lock_match() is asked for a granted CR/CW/PR/PW lock on @fid.
 *
 * The intent's it_lock_handle and it_lock_mode are set to the matched
 * handle cookie and mode, or to 0 when nothing matched.
 *
 * NOTE(review): the switch case labels, the last term of the getattr
 * bit mask, the 'else' separating the two strategies and the return
 * value are on lines not visible here - confirm. */
1003 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1004 struct lu_fid *fid, __u64 *bits)
1006 /* We could just return 1 immediately, but since we should only
1007 * be called in revalidate_it if we already have a lock, let's
1009 struct ldlm_res_id res_id;
1010 struct lustre_handle lockh;
1011 union ldlm_policy_data policy;
1012 enum ldlm_mode mode;
1015 if (it->d.lustre.it_lock_handle) {
1016 lockh.cookie = it->d.lustre.it_lock_handle;
1017 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1019 fid_build_reg_res_name(fid, &res_id);
1020 switch (it->it_op) {
1022 /* File attributes are held under multiple bits:
1023 * nlink is under lookup lock, size and times are
1024 * under UPDATE lock and recently we've also got
1025 * a separate permissions lock for owner/group/acl that
1026 * were protected by lookup lock before.
1027 * Getattr must provide all of that information,
1028 * so we need to ensure we have all of those locks.
1029 * Unfortunately, if the bits are split across multiple
1030 * locks, there's no easy way to match all of them here,
1031 * so an extra RPC would be performed to fetch all
1032 * of those bits at once for now. */
1033 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1034 * but for old MDTs (< 2.4), permission is covered
1035 * by LOOKUP lock, so it needs to match all bits here.*/
1036 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1037 MDS_INODELOCK_LOOKUP |
1041 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1044 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1047 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1051 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1052 LDLM_IBITS, &policy,
1053 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1058 it->d.lustre.it_lock_handle = lockh.cookie;
1059 it->d.lustre.it_lock_mode = mode;
1061 it->d.lustre.it_lock_handle = 0;
1062 it->d.lustre.it_lock_mode = 0;
/* Entry point for intent locking: try to reuse a cached lock, otherwise
 * enqueue an IBITS intent lock and finish it.
 *
 * Steps visible in the code:
 *  - the enqueue info uses LDLM_IBITS, the mode from it_to_lock_mode(it),
 *    @cb_blocking as blocking callback and ldlm_completion_ast as
 *    completion callback;
 *  - for LOOKUP/GETATTR/READDIR on a sane op_fid2 the intent's lock
 *    handle is cleared and mdc_revalidate_lock() is tried first;
 *  - for IT_CREAT without a sane op_fid2 a fid is allocated with
 *    mdc_fid_alloc();
 *  - mdc_enqueue() is called, *reqp is set to the intent's it_data and
 *    mdc_finish_intent_lock() completes the operation.
 *
 * NOTE(review): the early returns after revalidation and after a failed
 * fid allocation or enqueue, and the final return, are on lines not
 * visible here - confirm. */
1069 * This long block is all about fixing up the lock and request state
1070 * so that it is correct as of the moment _before_ the operation was
1071 * applied; that way, the VFS will think that everything is normal and
1072 * call Lustre's regular VFS methods.
1074 * If we're performing a creation, that means that unless the creation
1075 * failed with EEXIST, we should fake up a negative dentry.
1077 * For everything else, we want to lookup to succeed.
1079 * One additional note: if CREATE or OPEN succeeded, we add an extra
1080 * reference to the request because we need to keep it around until
1081 * ll_create/ll_open gets called.
1083 * The server will return to us, in it_disposition, an indication of
1084 * exactly what d.lustre.it_status refers to.
1086 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1087 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1088 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1089 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1092 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1095 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1096 struct lookup_intent *it, struct ptlrpc_request **reqp,
1097 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1099 struct ldlm_enqueue_info einfo = {
1100 .ei_type = LDLM_IBITS,
1101 .ei_mode = it_to_lock_mode(it),
1102 .ei_cb_bl = cb_blocking,
1103 .ei_cb_cp = ldlm_completion_ast,
1105 struct lustre_handle lockh;
1110 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1111 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1112 op_data->op_name, PFID(&op_data->op_fid2),
1113 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1117 if (fid_is_sane(&op_data->op_fid2) &&
1118 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1119 /* We could just return 1 immediately, but since we should only
1120 * be called in revalidate_it if we already have a lock, let's
1122 it->d.lustre.it_lock_handle = 0;
1123 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1124 /* Only return failure if it was not GETATTR by cfid
1125 (from inode_revalidate) */
1126 if (rc || op_data->op_namelen != 0)
1130 /* For case if upper layer did not alloc fid, do it now. */
1131 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1132 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1134 CERROR("Can't alloc new fid, rc %d\n", rc);
1139 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1144 *reqp = it->d.lustre.it_data;
1145 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply handler for the asynchronous getattr intent.
 *
 * Recovers the export and md_enqueue_info from the request's
 * mdc_getattr_args, releases the request slot taken when the request was
 * sent, and then runs the same completion sequence as the synchronous
 * path: ldlm_cli_enqueue_fini(), conversion of lock_policy_res2 with
 * ptlrpc_status_ntoh(), mdc_finish_enqueue() and
 * mdc_finish_intent_lock(). Finally the caller's callback
 * minfo->mi_cb(req, minfo, rc) is invoked with the outcome.
 *
 * NOTE(review): the remaining parameters (used below as 'args' and
 * 'rc'), the assignment of 'it', the effect of the
 * OBD_FAIL_MDC_GETATTR_ENQUEUE check, the error jumps and the return
 * value are on lines not visible here - confirm. */
1149 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1150 struct ptlrpc_request *req,
1153 struct mdc_getattr_args *ga = args;
1154 struct obd_export *exp = ga->ga_exp;
1155 struct md_enqueue_info *minfo = ga->ga_minfo;
1156 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1157 struct lookup_intent *it;
1158 struct lustre_handle *lockh;
1159 struct obd_device *obddev;
1160 struct ldlm_reply *lockrep;
1161 __u64 flags = LDLM_FL_HAS_INTENT;
1165 lockh = &minfo->mi_lockh;
1167 obddev = class_exp2obd(exp);
1169 obd_put_request_slot(&obddev->u.cli);
1170 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1173 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1174 &flags, NULL, 0, lockh, rc);
1176 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1177 mdc_clear_replay_flag(req, rc);
1181 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1182 LASSERT(lockrep != NULL);
1184 lockrep->lock_policy_res2 =
1185 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1187 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1191 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1195 minfo->mi_cb(req, minfo, rc);
/* Start an asynchronous getattr intent described by @minfo.
 *
 * Builds the request with mdc_intent_getattr_pack(), takes a request
 * slot, and enqueues with ldlm_cli_enqueue() using a LOOKUP|UPDATE
 * inodebits policy, LVB_T_NONE and the enqueue info / lock handle stored
 * in @minfo. The per-request arguments are then stored in the request's
 * async-args area (its size is checked at compile time), the interpret
 * callback is set to mdc_intent_getattr_async_interpret() and the
 * request is handed to ptlrpcd_add_req().
 *
 * On enqueue failure the request slot is released and the request is
 * finished; a packing failure returns PTR_ERR(req).
 *
 * NOTE(review): the error checks guarding those paths, the assignment
 * of ga->ga_exp, the meaning of the final '1' argument to
 * ldlm_cli_enqueue() and the final return are not visible here - the end
 * of this function may lie beyond the visible text; confirm. */
1199 int mdc_intent_getattr_async(struct obd_export *exp,
1200 struct md_enqueue_info *minfo)
1202 struct md_op_data *op_data = &minfo->mi_data;
1203 struct lookup_intent *it = &minfo->mi_it;
1204 struct ptlrpc_request *req;
1205 struct mdc_getattr_args *ga;
1206 struct obd_device *obddev = class_exp2obd(exp);
1207 struct ldlm_res_id res_id;
1208 union ldlm_policy_data policy = {
1209 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1210 MDS_INODELOCK_UPDATE } };
1212 __u64 flags = LDLM_FL_HAS_INTENT;
1215 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1217 (int)op_data->op_namelen, op_data->op_name,
1218 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1220 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1221 req = mdc_intent_getattr_pack(exp, it, op_data);
1223 RETURN(PTR_ERR(req));
1225 rc = obd_get_request_slot(&obddev->u.cli);
1227 ptlrpc_req_finished(req);
1231 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1232 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1234 obd_put_request_slot(&obddev->u.cli);
1235 ptlrpc_req_finished(req);
1239 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1240 ga = ptlrpc_req_async_args(req);
1242 ga->ga_minfo = minfo;
1244 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1245 ptlrpcd_add_req(req);