4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_fid.h>
45 #include <lustre_intent.h>
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include <lustre_swab.h>
51 #include "mdc_internal.h"
/* Arguments kept in a request's async-args area for the asynchronous
 * getattr intent; fetched with ptlrpc_req_async_args() in
 * mdc_intent_getattr_async() and read back in
 * mdc_intent_getattr_async_interpret(). */
53 struct mdc_getattr_args {
/* export from which the interpret callback derives the obd device */
54 struct obd_export *ga_exp;
/* enqueue info supplying mi_einfo, mi_lockh, mi_data and the mi_cb callback */
55 struct md_enqueue_info *ga_minfo;
/* Return the status recorded in the intent for the requested phase.
 *
 * The dispositions are tested in the order DISP_OPEN_LEASE, DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a disposition
 * that is set in the intent, it->d.lustre.it_status is returned when phase
 * is at or beyond that disposition.  The disposition mask and status are
 * reported through CERROR() at the end of the function.
 *
 * NOTE(review): the path taken when phase is below a set disposition, and
 * whatever follows the CERROR(), are not visible in this listing - confirm
 * against the full source. */
58 int it_open_error(int phase, struct lookup_intent *it)
60 if (it_disposition(it, DISP_OPEN_LEASE)) {
61 if (phase >= DISP_OPEN_LEASE)
62 return it->d.lustre.it_status;
66 if (it_disposition(it, DISP_OPEN_OPEN)) {
67 if (phase >= DISP_OPEN_OPEN)
68 return it->d.lustre.it_status;
73 if (it_disposition(it, DISP_OPEN_CREATE)) {
74 if (phase >= DISP_OPEN_CREATE)
75 return it->d.lustre.it_status;
80 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
81 if (phase >= DISP_LOOKUP_EXECD)
82 return it->d.lustre.it_status;
87 if (it_disposition(it, DISP_IT_EXECD)) {
88 if (phase >= DISP_IT_EXECD)
89 return it->d.lustre.it_status;
93 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
94 it->d.lustre.it_status);
98 EXPORT_SYMBOL(it_open_error);
/* Attach an inode to the resource of an already-referenced lock.
 *
 * data is treated as the new struct inode.  With the resource locked via
 * lock_res_and_lock(), any different inode already stored in lr_lvb_inode
 * is asserted to have I_FREEING set in i_state, then lr_lvb_inode is
 * pointed at the new inode.  The lock's inodebits are stored through bits.
 *
 * NOTE(review): the remainder of the parameter list, any NULL check on
 * bits and the release of the lock reference are not visible in this
 * listing. */
100 /* this must be called on a lockh that is known to have a referenced lock */
101 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
104 struct ldlm_lock *lock;
105 struct inode *new_inode = data;
114 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
116 LASSERT(lock != NULL);
117 lock_res_and_lock(lock);
118 if (lock->l_resource->lr_lvb_inode &&
119 lock->l_resource->lr_lvb_inode != data) {
120 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
121 LASSERTF(old_inode->i_state & I_FREEING,
122 "Found existing inode %p/%lu/%u state %lu in lock: "
123 "setting data to %p/%lu/%u\n", old_inode,
124 old_inode->i_ino, old_inode->i_generation,
126 new_inode, new_inode->i_ino, new_inode->i_generation);
128 lock->l_resource->lr_lvb_inode = new_inode;
130 *bits = lock->l_policy_data.l_inodebits.bits;
132 unlock_res_and_lock(lock);
/* Look for an existing lock on the resource named by fid.
 *
 * The resource name is built from fid, the inodebits in policy are masked
 * with exp_connect_ibits(exp) (this modifies the caller's policy), and
 * ldlm_lock_match() is run against the export's obd namespace with the
 * given flags, type, mode and lockh.
 *
 * NOTE(review): presumably the result of ldlm_lock_match() is what gets
 * returned; the return statement is not visible in this listing. */
138 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
139 const struct lu_fid *fid, enum ldlm_type type,
140 union ldlm_policy_data *policy,
141 enum ldlm_mode mode, struct lustre_handle *lockh)
143 struct ldlm_res_id res_id;
147 fid_build_reg_res_name(fid, &res_id);
148 /* LU-4405: Clear bits not supported by server */
149 policy->l_inodebits.bits &= exp_connect_ibits(exp);
150 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
151 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource named by fid.
 *
 * Builds the resource name from fid and forwards policy, mode, flags and
 * opaque unchanged to ldlm_cli_cancel_unused_resource() on the namespace of
 * the export's obd device. */
155 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
156 union ldlm_policy_data *policy, enum ldlm_mode mode,
157 enum ldlm_cancel_flags flags, void *opaque)
159 struct obd_device *obd = class_exp2obd(exp);
160 struct ldlm_res_id res_id;
165 fid_build_reg_res_name(fid, &res_id);
166 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
167 policy, mode, flags, opaque);
/* Clear the inode pointer (lr_lvb_inode) stored on the resource for fid.
 *
 * The namespace of the export's obd device is asserted to be non-NULL, the
 * resource is obtained with ldlm_resource_get(), its lr_lvb_inode is set to
 * NULL and the reference is dropped with ldlm_resource_putref().
 *
 * NOTE(review): handling of a failed ldlm_resource_get() and any locking
 * around the store are not visible in this listing. */
171 int mdc_null_inode(struct obd_export *exp,
172 const struct lu_fid *fid)
174 struct ldlm_res_id res_id;
175 struct ldlm_resource *res;
176 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
179 LASSERTF(ns != NULL, "no namespace passed\n");
181 fid_build_reg_res_name(fid, &res_id);
183 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
188 res->lr_lvb_inode = NULL;
191 ldlm_resource_putref(res);
/* Run the iterator it, with data as its argument, over the resource named
 * by fid through ldlm_resource_iterate() on the export's obd namespace.
 * The iterator result is then compared with LDLM_ITER_STOP and
 * LDLM_ITER_CONTINUE to derive the return value.
 *
 * NOTE(review): the values assigned in those two branches, and the end of
 * the original comment below, are not visible in this listing. */
195 /* find any ldlm lock of the inode in mdc
199 int mdc_find_cbdata(struct obd_export *exp,
200 const struct lu_fid *fid,
201 ldlm_iterator_t it, void *data)
203 struct ldlm_res_id res_id;
207 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
208 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
210 if (rc == LDLM_ITER_STOP)
212 else if (rc == LDLM_ITER_CONTINUE)
217 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
219 /* Don't hold error requests for replay. */
/* When rq_replay is set, the request is updated with rq_lock held (the
 * statement between lock and unlock is not visible in this listing).
 * A non-zero rq_transno together with a non-zero rc is reported at D_ERROR
 * level. */
220 if (req->rq_replay) {
221 spin_lock(&req->rq_lock);
223 spin_unlock(&req->rq_lock);
225 if (rc && req->rq_transno != 0) {
226 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* Grow request segment DLM_INTENT_REC_OFF + 4 to body->mbo_eadatasize with
 * sptlrpc_cli_enlarge_reqbuf().  The CERROR() below reports a failed
 * enlargement; OBD_MD_FLEASIZE is then cleared from body->mbo_valid and
 * body->mbo_eadatasize is reset to 0.
 *
 * NOTE(review): the condition guarding the CERROR()/reset is not visible in
 * this listing; presumably it is the failure of the enlarge call. */
231 /* Save a large LOV EA into the request buffer so that it is available
232 * for replay. We don't do this in the initial request because the
233 * original request doesn't need this buffer (at most it sends just the
234 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
235 * buffer and may also be difficult to allocate and save a very large
236 * request buffer for each open. (bug 5707)
238 * OOM here may cause recovery failure if lmm is needed (only for the
239 * original open if the MDS crashed just when this client also OOM'd)
240 * but this is incredibly unlikely, and questionable whether the client
241 * could do MDS recovery under OOM anyways... */
242 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
243 struct mdt_body *body)
247 /* FIXME: remove this explicit offset. */
248 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
249 body->mbo_eadatasize);
251 CERROR("Can't enlarge segment %d size to %d\n",
252 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
253 body->mbo_valid &= ~OBD_MD_FLEASIZE;
254 body->mbo_eadatasize = 0;
/* Build the RQF_LDLM_INTENT_OPEN enqueue request for an open intent.
 *
 * Visible steps:
 *  - it->it_create_mode has its S_IFMT bits replaced by S_IFREG;
 *  - when op_fid2 is sane, unused locks on it are collected with
 *    mdc_resource_get_unused(); further locks on op_fid1 are collected with
 *    MDS_INODELOCK_UPDATE (see the IT_CREAT comment below).  The collected
 *    locks are handed to ldlm_prep_enqueue_req() through the cancels list;
 *  - RMF_NAME is sized to op_namelen + 1 and RMF_EADATA to the larger of the
 *    caller's lmm size and cl_default_mds_easize;
 *  - rq_replay is taken from the import's imp_replayable under rq_lock;
 *  - the intent opcode and the open body are packed;
 *  - reply space is reserved for RMF_MDT_MD (cl_max_mds_easize) and, for a
 *    remote client, for RMF_ACL (struct mdt_remote_perm).
 *
 * If the request cannot be allocated the collected locks are released with
 * ldlm_lock_list_put() and ERR_PTR(-ENOMEM) is returned.
 *
 * NOTE(review): the lock modes chosen from it_flags and the non-error
 * return are not visible in this listing. */
258 static struct ptlrpc_request *
259 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
260 struct md_op_data *op_data)
262 struct ptlrpc_request *req;
263 struct obd_device *obddev = class_exp2obd(exp);
264 struct ldlm_intent *lit;
265 const void *lmm = op_data->op_data;
266 __u32 lmmsize = op_data->op_data_size;
267 struct list_head cancels = LIST_HEAD_INIT(cancels);
273 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
275 /* XXX: openlock is not cancelled for cross-refs. */
276 /* If inode is known, cancel conflicting OPEN locks. */
277 if (fid_is_sane(&op_data->op_fid2)) {
278 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
279 if (it->it_flags & FMODE_WRITE)
284 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
287 else if (it->it_flags & FMODE_EXEC)
293 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
298 /* If CREATE, cancel parent's UPDATE lock. */
299 if (it->it_op & IT_CREAT)
303 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
305 MDS_INODELOCK_UPDATE);
307 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
308 &RQF_LDLM_INTENT_OPEN);
310 ldlm_lock_list_put(&cancels, l_bl_ast, count);
311 RETURN(ERR_PTR(-ENOMEM));
314 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
315 op_data->op_namelen + 1);
316 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
317 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
319 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
321 ptlrpc_request_free(req);
325 spin_lock(&req->rq_lock);
326 req->rq_replay = req->rq_import->imp_replayable;
327 spin_unlock(&req->rq_lock);
329 /* pack the intent */
330 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
331 lit->opc = (__u64)it->it_op;
333 /* pack the intended request */
334 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
337 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
338 obddev->u.cli.cl_max_mds_easize);
340 /* for remote client, fetch remote perm for current user */
341 if (client_is_remote(exp))
342 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
343 sizeof(struct mdt_remote_perm));
344 ptlrpc_request_set_replen(req);
/* Build the RQF_LDLM_INTENT_GETXATTR enqueue request.
 *
 * The intent opcode is set to IT_GETXATTR and the body is packed for
 * op_fid1 with op_valid.  The reply buffers RMF_EADATA, RMF_EAVALS and
 * RMF_EAVALS_LENS are each sized to the import's ocd_max_easize.
 * ERR_PTR(-ENOMEM) is returned when the request cannot be allocated; the
 * request is freed when ldlm_prep_enqueue_req() is followed by the
 * ptlrpc_request_free() path below.
 *
 * NOTE(review): the initial value of count and the final return are not
 * visible in this listing. */
348 static struct ptlrpc_request *
349 mdc_intent_getxattr_pack(struct obd_export *exp,
350 struct lookup_intent *it,
351 struct md_op_data *op_data)
353 struct ptlrpc_request *req;
354 struct ldlm_intent *lit;
357 struct list_head cancels = LIST_HEAD_INIT(cancels);
361 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
362 &RQF_LDLM_INTENT_GETXATTR);
364 RETURN(ERR_PTR(-ENOMEM));
366 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
368 ptlrpc_request_free(req);
372 /* pack the intent */
373 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
374 lit->opc = IT_GETXATTR;
/* one size, taken from the connect data, is used for all three reply
 * buffers below */
376 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
378 /* pack the intended request */
379 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
382 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
383 RCL_SERVER, maxdata);
385 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
386 RCL_SERVER, maxdata);
388 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
389 RCL_SERVER, maxdata);
391 ptlrpc_request_set_replen(req);
/* Build the RQF_LDLM_INTENT_UNLINK enqueue request.
 *
 * RMF_NAME is sized to op_namelen + 1, the enqueue request is prepared with
 * no locks to cancel, the intent opcode is taken from it->it_op and the
 * unlink body is packed with mdc_unlink_pack().  Reply space for RMF_MDT_MD
 * is reserved using cl_default_mds_easize.  ERR_PTR(-ENOMEM) is returned
 * when the request cannot be allocated. */
396 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
397 struct lookup_intent *it,
398 struct md_op_data *op_data)
400 struct ptlrpc_request *req;
401 struct obd_device *obddev = class_exp2obd(exp);
402 struct ldlm_intent *lit;
406 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
407 &RQF_LDLM_INTENT_UNLINK);
409 RETURN(ERR_PTR(-ENOMEM));
411 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
412 op_data->op_namelen + 1);
414 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
416 ptlrpc_request_free(req);
420 /* pack the intent */
421 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
422 lit->opc = (__u64)it->it_op;
424 /* pack the intended request */
425 mdc_unlink_pack(req, op_data);
427 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
428 obddev->u.cli.cl_default_mds_easize);
429 ptlrpc_request_set_replen(req);
/* Build the RQF_LDLM_INTENT_GETATTR enqueue request (used for getattr and
 * lookup intents, see mdc_enqueue()).
 *
 * The valid mask requests getattr data, EA sizes and directory EA, plus
 * either remote permissions (remote client) or ACLs.  RMF_NAME is sized to
 * op_namelen + 1.  The EA size is cl_default_mds_easize when that is
 * positive; cl_max_mds_easize is the other value assigned below.  The same
 * size is passed to mdc_getattr_pack() and reserved for RMF_MDT_MD in the
 * reply; a remote client additionally reserves RMF_ACL space for a
 * struct mdt_remote_perm.  ERR_PTR(-ENOMEM) is returned when the request
 * cannot be allocated. */
433 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
434 struct lookup_intent *it,
435 struct md_op_data *op_data)
437 struct ptlrpc_request *req;
438 struct obd_device *obddev = class_exp2obd(exp);
439 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
440 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
442 (client_is_remote(exp) ?
443 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
444 struct ldlm_intent *lit;
449 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
450 &RQF_LDLM_INTENT_GETATTR);
452 RETURN(ERR_PTR(-ENOMEM));
454 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
455 op_data->op_namelen + 1);
457 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
459 ptlrpc_request_free(req);
463 /* pack the intent */
464 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
465 lit->opc = (__u64)it->it_op;
467 if (obddev->u.cli.cl_default_mds_easize > 0)
468 easize = obddev->u.cli.cl_default_mds_easize;
470 easize = obddev->u.cli.cl_max_mds_easize;
472 /* pack the intended request */
473 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
475 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
476 if (client_is_remote(exp))
477 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
478 sizeof(struct mdt_remote_perm));
479 ptlrpc_request_set_replen(req);
/* Build the RQF_LDLM_INTENT_LAYOUT enqueue request.
 *
 * The client-side RMF_EADATA buffer is sized to 0, the intent opcode is
 * taken from it->it_op and the layout intent opcode is set to
 * LAYOUT_INTENT_ACCESS.  Reply space for RMF_DLM_LVB is reserved using
 * cl_default_mds_easize.  The md_op_data argument is not used.
 * ERR_PTR(-ENOMEM) is returned when the request cannot be allocated. */
483 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
484 struct lookup_intent *it,
485 struct md_op_data *unused)
487 struct obd_device *obd = class_exp2obd(exp);
488 struct ptlrpc_request *req;
489 struct ldlm_intent *lit;
490 struct layout_intent *layout;
494 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
495 &RQF_LDLM_INTENT_LAYOUT);
497 RETURN(ERR_PTR(-ENOMEM));
499 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
500 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
502 ptlrpc_request_free(req);
506 /* pack the intent */
507 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
508 lit->opc = (__u64)it->it_op;
510 /* pack the layout intent request */
511 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
512 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
513 * set for replication */
514 layout->li_opc = LAYOUT_INTENT_ACCESS;
516 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
517 obd->u.cli.cl_default_mds_easize);
518 ptlrpc_request_set_replen(req);
/* Build a plain RQF_LDLM_ENQUEUE request (no intent body) and reserve
 * lvb_len bytes for RMF_DLM_LVB in the reply.  ERR_PTR(-ENOMEM) is returned
 * when the request cannot be allocated.  mdc_enqueue() uses this for
 * IT_READDIR with an lvb_len of 0. */
522 static struct ptlrpc_request *
523 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
525 struct ptlrpc_request *req;
529 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
531 RETURN(ERR_PTR(-ENOMEM));
533 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
535 ptlrpc_request_free(req);
539 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
540 ptlrpc_request_set_replen(req);
/* Post-process the reply of an enqueue; called from mdc_enqueue() and from
 * mdc_intent_getattr_async_interpret() with the enqueue result in rc.
 *
 * Visible steps, in order:
 *  - if the request has a transno or is kept for replay, LDLM_FL_INTENT_ONLY
 *    is set in the request's lock_flags so that a replay only performs the
 *    intent;
 *  - on ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise, if the
 *    lock's l_req_mode differs from einfo->ei_mode, the reference is moved
 *    to l_req_mode and einfo->ei_mode is updated;
 *  - disposition, status, lock mode, lock cookie and the request pointer
 *    are stored in it->d.lustre;
 *  - the replay flag is cleared for requests that must not be replayed (no
 *    transno, negative status, or an IT_OPEN without a successful open);
 *  - for OPEN/UNLINK/LOOKUP/GETATTR the mdt_body, the EA data and, when
 *    OBD_MD_FLRMTPERM is set, the remote permission buffer are fetched from
 *    the reply; for a replayable open the request's RMF_EADATA buffer is
 *    adjusted to mbo_eadatasize;
 *  - for IT_LAYOUT the layout is read from RMF_DLM_LVB;
 *  - when the lock has a layout, layout data was found and the reply's
 *    lock_flags carry no LDLM_FL_BLOCKED_MASK bit, a copy of the data is
 *    installed as the lock's l_lvb_data if none is set yet.
 *
 * NOTE(review): many short lines (error returns, else branches, the last
 * parameter) are not visible in this listing, so the exact error paths and
 * the condition under which the copied layout is freed must be confirmed
 * against the full source. */
544 static int mdc_finish_enqueue(struct obd_export *exp,
545 struct ptlrpc_request *req,
546 struct ldlm_enqueue_info *einfo,
547 struct lookup_intent *it,
548 struct lustre_handle *lockh,
551 struct req_capsule *pill = &req->rq_pill;
552 struct ldlm_request *lockreq;
553 struct ldlm_reply *lockrep;
554 struct lustre_intent_data *intent = &it->d.lustre;
555 struct ldlm_lock *lock;
556 void *lvb_data = NULL;
561 /* Similarly, if we're going to replay this request, we don't want to
562 * actually get a lock, just perform the intent. */
563 if (req->rq_transno || req->rq_replay) {
564 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
565 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
568 if (rc == ELDLM_LOCK_ABORTED) {
570 memset(lockh, 0, sizeof(*lockh));
572 } else { /* rc = 0 */
573 lock = ldlm_handle2lock(lockh);
574 LASSERT(lock != NULL);
576 /* If the server gave us back a different lock mode, we should
577 * fix up our variables. */
578 if (lock->l_req_mode != einfo->ei_mode) {
579 ldlm_lock_addref(lockh, lock->l_req_mode);
580 ldlm_lock_decref(lockh, einfo->ei_mode);
581 einfo->ei_mode = lock->l_req_mode;
586 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
587 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
589 intent->it_disposition = (int)lockrep->lock_policy_res1;
590 intent->it_status = (int)lockrep->lock_policy_res2;
591 intent->it_lock_mode = einfo->ei_mode;
592 intent->it_lock_handle = lockh->cookie;
593 intent->it_data = req;
595 /* Technically speaking rq_transno must already be zero if
596 * it_status is in error, so the check is a bit redundant */
597 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
598 mdc_clear_replay_flag(req, intent->it_status);
600 /* If we're doing an IT_OPEN which did not result in an actual
601 * successful open, then we need to remove the bit which saves
602 * this request for unconditional replay.
604 * It's important that we do this first! Otherwise we might exit the
605 * function without doing so, and try to replay a failed create
607 if (it->it_op & IT_OPEN && req->rq_replay &&
608 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
609 mdc_clear_replay_flag(req, intent->it_status);
611 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
612 it->it_op, intent->it_disposition, intent->it_status);
614 /* We know what to expect, so we do any byte flipping required here */
615 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
616 struct mdt_body *body;
618 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
620 CERROR ("Can't swab mdt_body\n");
624 if (it_disposition(it, DISP_OPEN_OPEN) &&
625 !it_open_error(DISP_OPEN_OPEN, it)) {
627 * If this is a successful OPEN request, we need to set
628 * replay handler and data early, so that if replay
629 * happens immediately after swabbing below, new reply
630 * is swabbed by that handler correctly.
632 mdc_set_open_replay_data(NULL, NULL, it);
635 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
638 mdc_update_max_ea_from_body(exp, body);
641 * The eadata is opaque; just check that it is there.
642 * Eventually, obd_unpackmd() will check the contents.
644 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
645 body->mbo_eadatasize);
649 /* save lvb data and length in case this is for layout
652 lvb_len = body->mbo_eadatasize;
655 * We save the reply LOV EA in case we have to replay a
656 * create for recovery. If we didn't allocate a large
657 * enough request buffer above we need to reallocate it
658 * here to hold the actual LOV EA.
660 * To not save LOV EA if request is not going to replay
661 * (for example error one).
663 if ((it->it_op & IT_OPEN) && req->rq_replay) {
665 if (req_capsule_get_size(pill, &RMF_EADATA,
667 body->mbo_eadatasize)
668 mdc_realloc_openmsg(req, body);
670 req_capsule_shrink(pill, &RMF_EADATA,
671 body->mbo_eadatasize,
674 req_capsule_set_size(pill, &RMF_EADATA,
676 body->mbo_eadatasize);
678 lmm = req_capsule_client_get(pill, &RMF_EADATA);
681 body->mbo_eadatasize);
685 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
686 struct mdt_remote_perm *perm;
688 LASSERT(client_is_remote(exp));
689 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
690 lustre_swab_mdt_remote_perm);
694 } else if (it->it_op & IT_LAYOUT) {
695 /* maybe the lock was granted right away and layout
696 * is packed into RMF_DLM_LVB of req */
697 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
699 lvb_data = req_capsule_server_sized_get(pill,
700 &RMF_DLM_LVB, lvb_len);
701 if (lvb_data == NULL)
706 /* fill in stripe data for layout lock.
707 * LU-6581: trust layout data only if layout lock is granted. The MDT
708 * has stopped sending layout unless the layout lock is granted. The
709 * client still does this checking in case it's talking with an old
710 * server. - Jinshan */
711 lock = ldlm_handle2lock(lockh);
712 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
713 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
716 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
717 ldlm_it2str(it->it_op), lvb_len);
719 OBD_ALLOC_LARGE(lmm, lvb_len);
724 memcpy(lmm, lvb_data, lvb_len);
726 /* install lvb_data */
727 lock_res_and_lock(lock);
728 if (lock->l_lvb_data == NULL) {
729 lock->l_lvb_type = LVB_T_LAYOUT;
730 lock->l_lvb_data = lmm;
731 lock->l_lvb_len = lvb_len;
734 unlock_res_and_lock(lock);
736 OBD_FREE_LARGE(lmm, lvb_len);
/* Enqueue a lock on the resource named by op_data->op_fid1, optionally
 * carrying an intent.
 *
 * Visible behaviour:
 *  - with an intent, einfo->ei_type is asserted to be LDLM_IBITS, the
 *    caller's policy is asserted to be NULL, LDLM_FL_HAS_INTENT is added to
 *    the flags and an inodebits policy is chosen from it->it_op (UPDATE for
 *    OPEN/UNLINK/GETATTR/READDIR, LAYOUT for IT_LAYOUT, XATTR for
 *    GETXATTR/SETXATTR; LOOKUP is the remaining assignment);
 *  - the import generation is sampled before the request is built;
 *  - where einfo->ei_type is asserted to be LDLM_FLOCK, res_id.name[3] is
 *    set to LDLM_FLOCK; otherwise the request is built by the pack helper
 *    matching it->it_op;
 *  - a modify-RPC slot and a request slot are taken before
 *    ldlm_cli_enqueue() and released after it;
 *  - the intent status in the reply (lock_policy_res2) is converted with
 *    ptlrpc_status_ntoh(); an -EINPROGRESS intent status drops the request
 *    and, going by the "resend" messages, retries while the import
 *    generation is unchanged;
 *  - the reply is handed to mdc_finish_enqueue(); the cleanup that follows
 *    drops the lock reference, zeroes the handle, finishes the request and
 *    resets the intent's lock handle, lock mode and data.
 *
 * NOTE(review): the conditions guarding the resend bookkeeping, the error
 * branches and the returns are not visible in this listing. */
744 /* We always reserve enough space in the reply packet for a stripe MD, because
745 * we don't know in advance the file type. */
746 int mdc_enqueue(struct obd_export *exp,
747 struct ldlm_enqueue_info *einfo,
748 const union ldlm_policy_data *policy,
749 struct lookup_intent *it, struct md_op_data *op_data,
750 struct lustre_handle *lockh, __u64 extra_lock_flags)
752 struct obd_device *obddev = class_exp2obd(exp);
753 struct ptlrpc_request *req = NULL;
754 __u64 flags, saved_flags = extra_lock_flags;
755 struct ldlm_res_id res_id;
756 static const union ldlm_policy_data lookup_policy = {
757 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
758 static const union ldlm_policy_data update_policy = {
759 .l_inodebits = { MDS_INODELOCK_UPDATE } };
760 static const union ldlm_policy_data layout_policy = {
761 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
762 static const union ldlm_policy_data getxattr_policy = {
763 .l_inodebits = { MDS_INODELOCK_XATTR } };
764 int generation, resends = 0;
765 struct ldlm_reply *lockrep;
766 enum lvb_type lvb_type = 0;
770 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
772 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
775 LASSERT(policy == NULL);
777 saved_flags |= LDLM_FL_HAS_INTENT;
778 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
779 policy = &update_policy;
780 else if (it->it_op & IT_LAYOUT)
781 policy = &layout_policy;
782 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
783 policy = &getxattr_policy;
785 policy = &lookup_policy;
/* sampled up front so a later -EINPROGRESS retry can compare it with the
 * import's current generation */
788 generation = obddev->u.cli.cl_import->imp_generation;
792 /* The only way right now is FLOCK. */
793 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
795 res_id.name[3] = LDLM_FLOCK;
796 } else if (it->it_op & IT_OPEN) {
797 req = mdc_intent_open_pack(exp, it, op_data);
798 } else if (it->it_op & IT_UNLINK) {
799 req = mdc_intent_unlink_pack(exp, it, op_data);
800 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
801 req = mdc_intent_getattr_pack(exp, it, op_data);
802 } else if (it->it_op & IT_READDIR) {
803 req = mdc_enqueue_pack(exp, 0);
804 } else if (it->it_op & IT_LAYOUT) {
805 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
807 req = mdc_intent_layout_pack(exp, it, op_data);
808 lvb_type = LVB_T_LAYOUT;
809 } else if (it->it_op & IT_GETXATTR) {
810 req = mdc_intent_getxattr_pack(exp, it, op_data);
817 RETURN(PTR_ERR(req));
820 req->rq_generation_set = 1;
821 req->rq_import_generation = generation;
822 req->rq_sent = cfs_time_current_sec() + resends;
825 /* It is important to obtain modify RPC slot first (if applicable), so
826 * that threads that are waiting for a modify RPC slot are not polluting
827 * our rpcs in flight counter.
828 * We do not do flock request limiting, though */
830 mdc_get_mod_rpc_slot(req, it);
831 rc = obd_get_request_slot(&obddev->u.cli);
833 mdc_put_mod_rpc_slot(req, it);
834 mdc_clear_replay_flag(req, 0);
835 ptlrpc_req_finished(req);
840 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
841 0, lvb_type, lockh, 0);
843 /* For flock requests we immediately return without further
844 delay and let caller deal with the rest, since rest of
845 this function metadata processing makes no sense for flock
846 requests anyway. But in case of problem during comms with
847 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
848 can not rely on caller and this mainly for F_UNLCKs
849 (explicits or automatically generated by Kernel to clean
850 current FLocks upon exit) that can't be trashed */
851 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
852 (einfo->ei_type == LDLM_FLOCK) &&
853 (einfo->ei_mode == LCK_NL))
858 obd_put_request_slot(&obddev->u.cli);
859 mdc_put_mod_rpc_slot(req, it);
862 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
863 obddev->obd_name, rc);
865 mdc_clear_replay_flag(req, rc);
866 ptlrpc_req_finished(req);
870 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
871 LASSERT(lockrep != NULL);
873 lockrep->lock_policy_res2 =
874 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
876 /* Retry infinitely when the server returns -EINPROGRESS for the
877 * intent operation, when server returns -EINPROGRESS for acquiring
878 * intent lock, we'll retry in after_reply(). */
879 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
880 mdc_clear_replay_flag(req, rc);
881 ptlrpc_req_finished(req);
884 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
885 obddev->obd_name, resends, it->it_op,
886 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
888 if (generation == obddev->u.cli.cl_import->imp_generation) {
891 CDEBUG(D_HA, "resend cross eviction\n");
896 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
898 if (lustre_handle_is_used(lockh)) {
899 ldlm_lock_decref(lockh, einfo->ei_mode);
900 memset(lockh, 0, sizeof(*lockh));
902 ptlrpc_req_finished(req);
904 it->d.lustre.it_lock_handle = 0;
905 it->d.lustre.it_lock_mode = 0;
906 it->d.lustre.it_data = NULL;
/* Finish an intent lock after the enqueue reply has been processed.
 *
 * Visible behaviour:
 *  - the request pointer and its reply message are sanity-asserted;
 *  - IT_READDIR is tested first (its handling is not visible here);
 *  - if DISP_IT_EXECD is not set, it_status (asserted non-zero) is returned;
 *  - it_open_error() is consulted for DISP_IT_EXECD and DISP_LOOKUP_EXECD;
 *  - for a successful create and/or open an extra request reference is
 *    taken once, tracked by DISP_ENQ_CREATE_REF / DISP_ENQ_OPEN_REF;
 *  - the intent opcode is sanity-asserted for the non-create cases;
 *  - the new lock's resource name is asserted to match mbo_fid1, then
 *    ldlm_lock_match() is called with a copy of the handle in old_lock; if
 *    it matches, the new lock is dropped with ldlm_lock_decref_and_cancel()
 *    and lockh / it_lock_handle are switched to the matched lock.
 *
 * NOTE(review): the early-return conditions after each it_open_error()
 * call and the NULL check on the looked-up lock are not visible in this
 * listing. */
912 static int mdc_finish_intent_lock(struct obd_export *exp,
913 struct ptlrpc_request *request,
914 struct md_op_data *op_data,
915 struct lookup_intent *it,
916 struct lustre_handle *lockh)
918 struct lustre_handle old_lock;
919 struct mdt_body *mdt_body;
920 struct ldlm_lock *lock;
924 LASSERT(request != NULL);
925 LASSERT(request != LP_POISON);
926 LASSERT(request->rq_repmsg != LP_POISON);
928 if (it->it_op & IT_READDIR)
931 if (!it_disposition(it, DISP_IT_EXECD)) {
932 /* The server failed before it even started executing the
933 * intent, i.e. because it couldn't unpack the request. */
934 LASSERT(it->d.lustre.it_status != 0);
935 RETURN(it->d.lustre.it_status);
937 rc = it_open_error(DISP_IT_EXECD, it);
941 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
942 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
944 rc = it_open_error(DISP_LOOKUP_EXECD, it);
948 /* keep requests around for the multiple phases of the call
949 * this shows the DISP_XX must guarantee we make it into the call
951 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
952 it_disposition(it, DISP_OPEN_CREATE) &&
953 !it_open_error(DISP_OPEN_CREATE, it)) {
954 it_set_disposition(it, DISP_ENQ_CREATE_REF);
955 ptlrpc_request_addref(request); /* balanced in ll_create_node */
957 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
958 it_disposition(it, DISP_OPEN_OPEN) &&
959 !it_open_error(DISP_OPEN_OPEN, it)) {
960 it_set_disposition(it, DISP_ENQ_OPEN_REF);
961 ptlrpc_request_addref(request); /* balanced in ll_file_open */
962 /* BUG 11546 - eviction in the middle of open rpc processing */
963 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
966 if (it->it_op & IT_CREAT) {
967 /* XXX this belongs in ll_create_it */
968 } else if (it->it_op == IT_OPEN) {
969 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
971 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
974 /* If we already have a matching lock, then cancel the new
975 * one. We have to set the data here instead of in
976 * mdc_enqueue, because we need to use the child's inode as
977 * the l_ast_data to match, and that's not available until
978 * intent_finish has performed the iget().) */
979 lock = ldlm_handle2lock(lockh);
981 union ldlm_policy_data policy = lock->l_policy_data;
982 LDLM_DEBUG(lock, "matching against this");
984 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
985 &lock->l_resource->lr_name),
986 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
987 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
/* old_lock starts as a copy of the new handle and is what
 * ldlm_lock_match() is given; on a match it replaces lockh below */
990 memcpy(&old_lock, lockh, sizeof(*lockh));
991 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
992 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
993 ldlm_lock_decref_and_cancel(lockh,
994 it->d.lustre.it_lock_mode);
995 memcpy(lockh, &old_lock, sizeof(old_lock));
996 it->d.lustre.it_lock_handle = lockh->cookie;
999 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1000 (int)op_data->op_namelen, op_data->op_name,
1001 ldlm_it2str(it->it_op), it->d.lustre.it_status,
1002 it->d.lustre.it_disposition, rc);
/* Check whether a usable lock for fid is already held for this intent.
 *
 * If the intent carries a lock handle, that handle is revalidated with
 * ldlm_revalidate_lock_handle().  On the other path an inodebits policy is
 * selected by a switch on it->it_op (UPDATE|LOOKUP|..., UPDATE, LAYOUT or
 * LOOKUP) and mdc_lock_match() is asked for a granted lock in any of the
 * CR, CW, PR or PW modes.  The intent's it_lock_handle and it_lock_mode are
 * then either set from the result or reset to 0.
 *
 * NOTE(review): the case labels, the test on the resulting mode and the
 * return value are not visible in this listing. */
1006 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1007 struct lu_fid *fid, __u64 *bits)
1009 /* We could just return 1 immediately, but since we should only
1010 * be called in revalidate_it if we already have a lock, let's
1012 struct ldlm_res_id res_id;
1013 struct lustre_handle lockh;
1014 union ldlm_policy_data policy;
1015 enum ldlm_mode mode;
1018 if (it->d.lustre.it_lock_handle) {
1019 lockh.cookie = it->d.lustre.it_lock_handle;
1020 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1022 fid_build_reg_res_name(fid, &res_id);
1023 switch (it->it_op) {
1025 /* File attributes are held under multiple bits:
1026 * nlink is under lookup lock, size and times are
1027 * under UPDATE lock and recently we've also got
1028 * a separate permissions lock for owner/group/acl that
1029 * were protected by lookup lock before.
1030 * Getattr must provide all of that information,
1031 * so we need to ensure we have all of those locks.
1032 * Unfortunately, if the bits are split across multiple
1033 * locks, there's no easy way to match all of them here,
1034 * so an extra RPC would be performed to fetch all
1035 * of those bits at once for now. */
1036 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1037 * but for old MDTs (< 2.4), permission is covered
1038 * by LOOKUP lock, so it needs to match all bits here.*/
1039 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1040 MDS_INODELOCK_LOOKUP |
1044 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1047 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1050 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1054 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1055 LDLM_IBITS, &policy,
1056 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1061 it->d.lustre.it_lock_handle = lockh.cookie;
1062 it->d.lustre.it_lock_mode = mode;
1064 it->d.lustre.it_lock_handle = 0;
1065 it->d.lustre.it_lock_mode = 0;
1072 * This long block is all about fixing up the lock and request state
1073 * so that it is correct as of the moment _before_ the operation was
1074 * applied; that way, the VFS will think that everything is normal and
1075 * call Lustre's regular VFS methods.
1077 * If we're performing a creation, that means that unless the creation
1078 * failed with EEXIST, we should fake up a negative dentry.
1080 * For everything else, we want to lookup to succeed.
1082 * One additional note: if CREATE or OPEN succeeded, we add an extra
1083 * reference to the request because we need to keep it around until
1084 * ll_create/ll_open gets called.
1086 * The server will return to us, in it_disposition, an indication of
1087 * exactly what d.lustre.it_status refers to.
1089 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1090 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1091 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1092 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1095 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point for intent locking.
 *
 * An LDLM_IBITS enqueue-info is prepared with the mode from
 * it_to_lock_mode(), cb_blocking as the blocking callback and
 * ldlm_completion_ast as the completion callback.  For LOOKUP, GETATTR and
 * READDIR intents on a sane op_fid2, it_lock_handle is cleared and
 * mdc_revalidate_lock() is tried first.  For IT_CREAT without a sane
 * op_fid2, a fid is allocated with mdc_fid_alloc().  The lock is then
 * enqueued with mdc_enqueue(); *reqp is set from it->d.lustre.it_data and
 * the result is completed by mdc_finish_intent_lock().
 *
 * NOTE(review): the early returns between these steps are not visible in
 * this listing. */
1098 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1099 struct lookup_intent *it, struct ptlrpc_request **reqp,
1100 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1102 struct ldlm_enqueue_info einfo = {
1103 .ei_type = LDLM_IBITS,
1104 .ei_mode = it_to_lock_mode(it),
1105 .ei_cb_bl = cb_blocking,
1106 .ei_cb_cp = ldlm_completion_ast,
1108 struct lustre_handle lockh;
1113 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1114 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1115 op_data->op_name, PFID(&op_data->op_fid2),
1116 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1120 if (fid_is_sane(&op_data->op_fid2) &&
1121 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1122 /* We could just return 1 immediately, but since we should only
1123 * be called in revalidate_it if we already have a lock, let's
1125 it->d.lustre.it_lock_handle = 0;
1126 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1127 /* Only return failure if it was not GETATTR by cfid
1128 (from inode_revalidate) */
1129 if (rc || op_data->op_namelen != 0)
1133 /* For case if upper layer did not alloc fid, do it now. */
1134 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1135 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1137 CERROR("Can't alloc new fid, rc %d\n", rc);
1142 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1147 *reqp = it->d.lustre.it_data;
1148 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply interpreter for the asynchronous getattr intent, installed as
 * rq_interpret_reply by mdc_intent_getattr_async().
 *
 * args is treated as a struct mdc_getattr_args.  The request slot is
 * released, the enqueue is completed with ldlm_cli_enqueue_fini(), the
 * intent status in the reply is converted with ptlrpc_status_ntoh(), and
 * the reply is then processed by mdc_finish_enqueue() and
 * mdc_finish_intent_lock().  The caller's mi_cb callback is invoked with
 * the request, the enqueue info and rc.
 *
 * NOTE(review): the third parameter list entries, the jumps taken on
 * failure and the effect of OBD_FAIL_MDC_GETATTR_ENQUEUE are not visible in
 * this listing. */
1152 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1153 struct ptlrpc_request *req,
1156 struct mdc_getattr_args *ga = args;
1157 struct obd_export *exp = ga->ga_exp;
1158 struct md_enqueue_info *minfo = ga->ga_minfo;
1159 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1160 struct lookup_intent *it;
1161 struct lustre_handle *lockh;
1162 struct obd_device *obddev;
1163 struct ldlm_reply *lockrep;
1164 __u64 flags = LDLM_FL_HAS_INTENT;
1168 lockh = &minfo->mi_lockh;
1170 obddev = class_exp2obd(exp);
/* pairs with the obd_get_request_slot() in mdc_intent_getattr_async() */
1172 obd_put_request_slot(&obddev->u.cli);
1173 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1176 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1177 &flags, NULL, 0, lockh, rc);
1179 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1180 mdc_clear_replay_flag(req, rc);
1184 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1185 LASSERT(lockrep != NULL);
1187 lockrep->lock_policy_res2 =
1188 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1190 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1194 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1198 minfo->mi_cb(req, minfo, rc);
/* Start an asynchronous getattr intent enqueue described by minfo.
 *
 * The request is built with mdc_intent_getattr_pack() for the resource
 * named by op_fid1, a request slot is taken, and ldlm_cli_enqueue() is
 * called with a LOOKUP|UPDATE inodebits policy, LVB_T_NONE and
 * minfo->mi_lockh.  The request's async args (checked at compile time to
 * be large enough) receive minfo, mdc_intent_getattr_async_interpret() is
 * installed as the reply interpreter and the request is queued with
 * ptlrpcd_add_req().
 *
 * The request is finished when the slot or the enqueue step is followed by
 * the ptlrpc_req_finished() paths below; PTR_ERR(req) is returned when
 * packing fails.
 *
 * NOTE(review): the failure conditions, the store to ga_exp and the final
 * return are not visible in this listing. */
1202 int mdc_intent_getattr_async(struct obd_export *exp,
1203 struct md_enqueue_info *minfo)
1205 struct md_op_data *op_data = &minfo->mi_data;
1206 struct lookup_intent *it = &minfo->mi_it;
1207 struct ptlrpc_request *req;
1208 struct mdc_getattr_args *ga;
1209 struct obd_device *obddev = class_exp2obd(exp);
1210 struct ldlm_res_id res_id;
1211 union ldlm_policy_data policy = {
1212 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1213 MDS_INODELOCK_UPDATE } };
1215 __u64 flags = LDLM_FL_HAS_INTENT;
1218 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1220 (int)op_data->op_namelen, op_data->op_name,
1221 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1223 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1224 req = mdc_intent_getattr_pack(exp, it, op_data);
1226 RETURN(PTR_ERR(req));
1228 rc = obd_get_request_slot(&obddev->u.cli);
1230 ptlrpc_req_finished(req);
1234 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1235 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1237 obd_put_request_slot(&obddev->u.cli);
1238 ptlrpc_req_finished(req);
1242 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1243 ga = ptlrpc_req_async_args(req);
1245 ga->ga_minfo = minfo;
1247 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1248 ptlrpcd_add_req(req);