4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
/* Argument bundle for the asynchronous getattr intent enqueue.
 * mdc_intent_getattr_async() stores these pointers in the area returned by
 * ptlrpc_req_async_args() and mdc_intent_getattr_async_interpret() reads
 * all three back when the reply is interpreted.
 *   ga_exp   - export the request was issued on
 *   ga_minfo - enqueue info carrying the intent, op data, lock handle
 *              and completion callback
 *   ga_einfo - ldlm enqueue parameters; freed by the interpret callback */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
53 struct ldlm_enqueue_info *ga_einfo;
/* Report the intent status that applies to a given phase of an open.
 * The dispositions are tested from the latest phase to the earliest
 * (DISP_OPEN_LEASE, DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD,
 * DISP_IT_EXECD). For a disposition that is set on the intent,
 * it->d.lustre.it_status is returned when phase is at or beyond that
 * disposition.
 * If none of the dispositions is set, the disposition mask and status are
 * logged through CERROR.
 * NOTE(review): the value returned when phase is below the matching
 * disposition, and whatever follows the CERROR, are not visible in this
 * excerpt - confirm against the full file. */
56 int it_open_error(int phase, struct lookup_intent *it)
58 if (it_disposition(it, DISP_OPEN_LEASE)) {
59 if (phase >= DISP_OPEN_LEASE)
60 return it->d.lustre.it_status;
64 if (it_disposition(it, DISP_OPEN_OPEN)) {
65 if (phase >= DISP_OPEN_OPEN)
66 return it->d.lustre.it_status;
71 if (it_disposition(it, DISP_OPEN_CREATE)) {
72 if (phase >= DISP_OPEN_CREATE)
73 return it->d.lustre.it_status;
78 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79 if (phase >= DISP_LOOKUP_EXECD)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_IT_EXECD)) {
86 if (phase >= DISP_IT_EXECD)
87 return it->d.lustre.it_status;
91 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92 it->d.lustre.it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach an inode (data) to the resource of the lock referenced by lockh.
 * The handle is resolved with ldlm_handle2lock() and asserted non-NULL.
 * Under lock_res_and_lock(), if the resource already points at a different
 * inode, that old inode is asserted to carry I_FREEING in i_state; the
 * resource lr_lvb_inode is then pointed at the new inode and the inodebits
 * of the lock are copied out through bits.
 * NOTE(review): the last parameter (bits), any guard around the store to
 * bits, the release of the reference taken by ldlm_handle2lock() and the
 * return value are not visible in this excerpt - confirm. */
99 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
112 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/* Look for an already-held lock on the resource named by fid.
 * The inodebits in policy are first masked IN PLACE with
 * exp_connect_ibits(exp) (LU-4405), so the caller observes the reduced bit
 * set afterwards. The match itself is delegated to ldlm_lock_match() on
 * the namespace of the obd device behind exp, with flags, type, mode and
 * lockh passed through.
 * NOTE(review): the return statement is not visible here; presumably the
 * mode reported by ldlm_lock_match() is returned - confirm. */
136 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, ldlm_type_t type,
138 ldlm_policy_data_t *policy, ldlm_mode_t mode,
139 struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource derived from fid.
 * The resource name is built with fid_build_reg_res_name() and the work is
 * delegated to ldlm_cli_cancel_unused_resource() on the namespace of the
 * obd device behind exp; policy, mode, flags and opaque are passed through
 * unchanged.
 * NOTE(review): the declarations of mode, opaque and rc and the return
 * statement are not visible in this excerpt. */
153 int mdc_cancel_unused(struct obd_export *exp,
154 const struct lu_fid *fid,
155 ldlm_policy_data_t *policy,
157 ldlm_cancel_flags_t flags,
160 struct ldlm_res_id res_id;
161 struct obd_device *obd = class_exp2obd(exp);
166 fid_build_reg_res_name(fid, &res_id);
167 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168 policy, mode, flags, opaque);
/* Drop the inode back-pointer kept on the ldlm resource for fid.
 * The namespace of the obd device behind exp is asserted non-NULL, the
 * resource is looked up with ldlm_resource_get() (last two arguments 0),
 * its lr_lvb_inode is set to NULL and the reference is released again with
 * ldlm_resource_putref().
 * NOTE(review): handling of a failed lookup, any locking around the store
 * and the return value are not visible in this excerpt - confirm. */
172 int mdc_null_inode(struct obd_export *exp,
173 const struct lu_fid *fid)
175 struct ldlm_res_id res_id;
176 struct ldlm_resource *res;
177 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
180 LASSERTF(ns != NULL, "no namespace passed\n");
182 fid_build_reg_res_name(fid, &res_id);
184 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
189 res->lr_lvb_inode = NULL;
192 ldlm_resource_putref(res);
/* Run an ldlm iterator over the resource derived from fid.
 * The resource name is built from fid (the const qualifier is cast away
 * for fid_build_reg_res_name()), ldlm_resource_iterate() is called on the
 * namespace of the obd device behind exp, and its result is compared
 * against LDLM_ITER_STOP and LDLM_ITER_CONTINUE.
 * NOTE(review): the remaining iterate arguments (presumably it and data)
 * and the values returned for each outcome are not visible here. */
196 /* find any ldlm lock of the inode in mdc
200 int mdc_find_cbdata(struct obd_export *exp,
201 const struct lu_fid *fid,
202 ldlm_iterator_t it, void *data)
204 struct ldlm_res_id res_id;
208 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
209 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
211 if (rc == LDLM_ITER_STOP)
213 else if (rc == LDLM_ITER_CONTINUE)
/* Stop a request from being kept for replay.
 * When req->rq_replay is set, the rq_lock spinlock is taken around the
 * update (the store itself is not visible in this excerpt; presumably it
 * resets rq_replay - confirm). Independently, a non-zero rc paired with a
 * non-zero rq_transno is reported through DEBUG_REQ at D_ERROR level. */
218 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
220 /* Don't hold error requests for replay. */
221 if (req->rq_replay) {
222 spin_lock(&req->rq_lock);
224 spin_unlock(&req->rq_lock);
226 if (rc && req->rq_transno != 0) {
227 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
232 /* Save a large LOV EA into the request buffer so that it is available
233 * for replay. We don't do this in the initial request because the
234 * original request doesn't need this buffer (at most it sends just the
235 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
236 * buffer and may also be difficult to allocate and save a very large
237 * request buffer for each open. (bug 5707)
239 * OOM here may cause recovery failure if lmm is needed (only for the
240 * original open if the MDS crashed just when this client also OOM'd)
241 * but this is incredibly unlikely, and questionable whether the client
242 * could do MDS recovery under OOM anyways... */
/* Grow the open request buffer so it can hold the EA described by body.
 * Segment DLM_INTENT_REC_OFF + 4 of req is enlarged to
 * body->mbo_eadatasize through sptlrpc_cli_enlarge_reqbuf(). The error
 * branch logs the failure, clears OBD_MD_FLEASIZE from body->mbo_valid and
 * zeroes body->mbo_eadatasize, so the body no longer advertises an EA.
 * NOTE(review): the condition guarding the error branch is not visible in
 * this excerpt; presumably it is a non-zero rc. */
243 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
244 struct mdt_body *body)
248 /* FIXME: remove this explicit offset. */
249 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
250 body->mbo_eadatasize);
252 CERROR("Can't enlarge segment %d size to %d\n",
253 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
254 body->mbo_valid &= ~OBD_MD_FLEASIZE;
255 body->mbo_eadatasize = 0;
/* Build the LDLM enqueue request carrying an OPEN intent.
 * Steps visible in this function:
 *  - it->it_create_mode is forced to a regular file type (S_IFREG).
 *  - If op_data->op_fid2 is sane, unused conflicting locks on it are
 *    gathered into the local cancels list via mdc_resource_get_unused();
 *    the lock mode depends on MDS_OPEN_LEASE, FMODE_WRITE, MDS_OPEN_TRUNC
 *    and FMODE_EXEC in it->it_flags.
 *  - Unused MDS_INODELOCK_UPDATE locks on the parent op_fid1 are added to
 *    the same list.
 *  - An RQF_LDLM_INTENT_OPEN request is allocated; the allocation failure
 *    path releases the cancels list and returns ERR_PTR(-ENOMEM).
 *  - Client buffer sizes are set: RMF_NAME to op_namelen + 1 and
 *    RMF_EADATA to the larger of the caller EA size and
 *    cl_default_mds_easize.
 *  - ldlm_prep_enqueue_req() consumes the cancels list; its error path
 *    frees the request.
 *  - rq_replay is copied from the import imp_replayable under rq_lock.
 *  - The intent opcode and the open body are packed, the server reply
 *    size for RMF_MDT_MD is set to cl_max_mds_easize, RMF_ACL is sized
 *    for remote clients, and the reply length is finalised.
 * NOTE(review): the mode assignments, several branch conditions and the
 * final return are not visible in this excerpt - confirm against the
 * full file. */
259 static struct ptlrpc_request *
260 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
261 struct md_op_data *op_data)
263 struct ptlrpc_request *req;
264 struct obd_device *obddev = class_exp2obd(exp);
265 struct ldlm_intent *lit;
266 const void *lmm = op_data->op_data;
267 __u32 lmmsize = op_data->op_data_size;
268 struct list_head cancels = LIST_HEAD_INIT(cancels);
274 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
276 /* XXX: openlock is not cancelled for cross-refs. */
277 /* If inode is known, cancel conflicting OPEN locks. */
278 if (fid_is_sane(&op_data->op_fid2)) {
279 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
280 if (it->it_flags & FMODE_WRITE)
285 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
288 else if (it->it_flags & FMODE_EXEC)
294 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
299 /* If CREATE, cancel parent's UPDATE lock. */
300 if (it->it_op & IT_CREAT)
304 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306 MDS_INODELOCK_UPDATE);
308 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309 &RQF_LDLM_INTENT_OPEN);
311 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312 RETURN(ERR_PTR(-ENOMEM));
315 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
316 op_data->op_namelen + 1);
317 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
318 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
320 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
322 ptlrpc_request_free(req);
326 spin_lock(&req->rq_lock);
327 req->rq_replay = req->rq_import->imp_replayable;
328 spin_unlock(&req->rq_lock);
330 /* pack the intent */
331 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
332 lit->opc = (__u64)it->it_op;
334 /* pack the intended request */
335 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
338 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
339 obddev->u.cli.cl_max_mds_easize);
341 /* for remote client, fetch remote perm for current user */
342 if (client_is_remote(exp))
343 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
344 sizeof(struct mdt_remote_perm));
345 ptlrpc_request_set_replen(req);
/* Build the LDLM enqueue request carrying a GETXATTR intent.
 * An RQF_LDLM_INTENT_GETXATTR request is allocated (ERR_PTR(-ENOMEM) on
 * the failure path) and prepared with ldlm_prep_enqueue_req() using the
 * local cancels list; the error path of that call frees the request.
 * The intent opcode is set to IT_GETXATTR, the body is packed with
 * mdc_pack_body() for op_data->op_fid1, and the server reply buffers
 * RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS are all sized to maxdata,
 * which is read from ocd_max_easize of the import connect data.
 * NOTE(review): the declarations and initial values of count, maxdata and
 * rc, the trailing mdc_pack_body() arguments and the final return are not
 * visible in this excerpt. */
349 static struct ptlrpc_request *
350 mdc_intent_getxattr_pack(struct obd_export *exp,
351 struct lookup_intent *it,
352 struct md_op_data *op_data)
354 struct ptlrpc_request *req;
355 struct ldlm_intent *lit;
358 struct list_head cancels = LIST_HEAD_INIT(cancels);
362 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363 &RQF_LDLM_INTENT_GETXATTR);
365 RETURN(ERR_PTR(-ENOMEM));
367 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
369 ptlrpc_request_free(req);
373 /* pack the intent */
374 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
375 lit->opc = IT_GETXATTR;
377 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
379 /* pack the intended request */
380 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
383 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
384 RCL_SERVER, maxdata);
386 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
387 RCL_SERVER, maxdata);
389 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
390 RCL_SERVER, maxdata);
392 ptlrpc_request_set_replen(req);
/* Build the LDLM enqueue request carrying an UNLINK intent.
 * An RQF_LDLM_INTENT_UNLINK request is allocated (ERR_PTR(-ENOMEM) on the
 * failure path), the client RMF_NAME buffer is sized to op_namelen + 1 and
 * the request is prepared with ldlm_prep_enqueue_req() without any cancel
 * list; the error path of that call frees the request.
 * The intent opcode is taken from it->it_op, the unlink body is packed
 * with mdc_unlink_pack(), and the server RMF_MDT_MD reply buffer is sized
 * to cl_default_mds_easize before the reply length is finalised.
 * NOTE(review): the failure checks and the final return are not visible
 * in this excerpt. */
397 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
398 struct lookup_intent *it,
399 struct md_op_data *op_data)
401 struct ptlrpc_request *req;
402 struct obd_device *obddev = class_exp2obd(exp);
403 struct ldlm_intent *lit;
407 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408 &RQF_LDLM_INTENT_UNLINK);
410 RETURN(ERR_PTR(-ENOMEM));
412 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
413 op_data->op_namelen + 1);
415 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417 ptlrpc_request_free(req);
421 /* pack the intent */
422 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
423 lit->opc = (__u64)it->it_op;
425 /* pack the intended request */
426 mdc_unlink_pack(req, op_data);
428 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
429 obddev->u.cli.cl_default_mds_easize);
430 ptlrpc_request_set_replen(req);
/* Build the LDLM enqueue request carrying a GETATTR or LOOKUP intent.
 * The valid mask asks for attributes, EA size information and directory
 * EA, plus either OBD_MD_FLRMTPERM (remote client) or OBD_MD_FLACL.
 * An RQF_LDLM_INTENT_GETATTR request is allocated (ERR_PTR(-ENOMEM) on the
 * failure path), RMF_NAME is sized to op_namelen + 1, and the request is
 * prepared with ldlm_prep_enqueue_req() without a cancel list; the error
 * path of that call frees the request.
 * The reply EA size is cl_default_mds_easize when that is positive and
 * cl_max_mds_easize otherwise; it is used both for mdc_getattr_pack() and
 * for the server RMF_MDT_MD buffer. Remote clients additionally reserve
 * RMF_ACL space for a struct mdt_remote_perm.
 * NOTE(review): one line of the valid initialiser, the declaration of
 * easize and rc, and the final return are not visible in this excerpt. */
434 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
435 struct lookup_intent *it,
436 struct md_op_data *op_data)
438 struct ptlrpc_request *req;
439 struct obd_device *obddev = class_exp2obd(exp);
440 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
441 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
443 (client_is_remote(exp) ?
444 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
445 struct ldlm_intent *lit;
450 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
451 &RQF_LDLM_INTENT_GETATTR);
453 RETURN(ERR_PTR(-ENOMEM));
455 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
456 op_data->op_namelen + 1);
458 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
460 ptlrpc_request_free(req);
464 /* pack the intent */
465 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
466 lit->opc = (__u64)it->it_op;
468 if (obddev->u.cli.cl_default_mds_easize > 0)
469 easize = obddev->u.cli.cl_default_mds_easize;
471 easize = obddev->u.cli.cl_max_mds_easize;
473 /* pack the intended request */
474 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
476 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
477 if (client_is_remote(exp))
478 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
479 sizeof(struct mdt_remote_perm));
480 ptlrpc_request_set_replen(req);
/* Build the LDLM enqueue request carrying a LAYOUT intent.
 * The md_op_data argument is not used. An RQF_LDLM_INTENT_LAYOUT request
 * is allocated (ERR_PTR(-ENOMEM) on the failure path), the client
 * RMF_EADATA buffer is sized to 0, and the request is prepared with
 * ldlm_prep_enqueue_req() without a cancel list; the error path of that
 * call frees the request.
 * The intent opcode is taken from it->it_op and the layout intent opcode
 * is fixed to LAYOUT_INTENT_ACCESS. The server RMF_DLM_LVB reply buffer is
 * sized to cl_default_mds_easize before the reply length is finalised.
 * NOTE(review): the failure checks and the final return are not visible
 * in this excerpt. */
484 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
485 struct lookup_intent *it,
486 struct md_op_data *unused)
488 struct obd_device *obd = class_exp2obd(exp);
489 struct ptlrpc_request *req;
490 struct ldlm_intent *lit;
491 struct layout_intent *layout;
495 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
496 &RQF_LDLM_INTENT_LAYOUT);
498 RETURN(ERR_PTR(-ENOMEM));
500 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
501 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
503 ptlrpc_request_free(req);
507 /* pack the intent */
508 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
509 lit->opc = (__u64)it->it_op;
511 /* pack the layout intent request */
512 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
513 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
514 * set for replication */
515 layout->li_opc = LAYOUT_INTENT_ACCESS;
517 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
518 obd->u.cli.cl_default_mds_easize);
519 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) RQF_LDLM_ENQUEUE request.
 * The request is allocated (ERR_PTR(-ENOMEM) on the failure path) and
 * prepared with ldlm_prep_enqueue_req() without a cancel list; the error
 * path of that call frees the request. The server RMF_DLM_LVB reply
 * buffer is sized to lvb_len before the reply length is finalised.
 * NOTE(review): the failure checks and the final return are not visible
 * in this excerpt. */
523 static struct ptlrpc_request *
524 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
526 struct ptlrpc_request *req;
530 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
532 RETURN(ERR_PTR(-ENOMEM));
534 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
536 ptlrpc_request_free(req);
540 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
541 ptlrpc_request_set_replen(req);
/* Post-process the reply of an intent enqueue and fill in the intent.
 * Steps visible in this function:
 *  - If the request has a transno or is marked for replay, the lock
 *    request inside it is flagged LDLM_FL_INTENT_ONLY so that a replay
 *    only performs the intent.
 *  - rc == ELDLM_LOCK_ABORTED zeroes lockh. Otherwise the lock is looked
 *    up and, if the server granted a different mode than
 *    einfo->ei_mode, the reference is moved to the granted mode and
 *    einfo->ei_mode is updated.
 *  - it_disposition and it_status are taken from lock_policy_res1 and
 *    lock_policy_res2 of the ldlm reply; lock mode, handle cookie and
 *    the request pointer are stored in the intent as well.
 *  - The replay flag is cleared when there is no transno or the status is
 *    negative, and again for an IT_OPEN that did not end in a successful
 *    open.
 *  - For OPEN, UNLINK, LOOKUP and GETATTR intents the mdt_body is fetched;
 *    open replay data is set up for a successful open; when the body
 *    advertises EA data the reply EA is fetched and remembered as
 *    candidate lvb data, and for a replayable open the request RMF_EADATA
 *    buffer is resized to hold it. OBD_MD_FLRMTPERM replies have their
 *    remote permission record swabbed.
 *  - For IT_LAYOUT intents the lvb is taken from RMF_DLM_LVB.
 *  - Finally, when the lock has a layout bit, lvb data is present and
 *    the reply flags carry no LDLM_FL_BLOCKED_MASK bit, a copy of the lvb
 *    is allocated and installed as l_lvb_data if the lock has none yet.
 * NOTE(review): the last parameter (rc), a number of declarations, error
 * returns and branch conditions are not visible in this excerpt, so this
 * summary must be checked against the full file. */
545 static int mdc_finish_enqueue(struct obd_export *exp,
546 struct ptlrpc_request *req,
547 struct ldlm_enqueue_info *einfo,
548 struct lookup_intent *it,
549 struct lustre_handle *lockh,
552 struct req_capsule *pill = &req->rq_pill;
553 struct ldlm_request *lockreq;
554 struct ldlm_reply *lockrep;
555 struct lustre_intent_data *intent = &it->d.lustre;
556 struct ldlm_lock *lock;
557 void *lvb_data = NULL;
562 /* Similarly, if we're going to replay this request, we don't want to
563 * actually get a lock, just perform the intent. */
564 if (req->rq_transno || req->rq_replay) {
565 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
566 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
569 if (rc == ELDLM_LOCK_ABORTED) {
571 memset(lockh, 0, sizeof(*lockh));
573 } else { /* rc = 0 */
574 lock = ldlm_handle2lock(lockh);
575 LASSERT(lock != NULL);
577 /* If the server gave us back a different lock mode, we should
578 * fix up our variables. */
579 if (lock->l_req_mode != einfo->ei_mode) {
580 ldlm_lock_addref(lockh, lock->l_req_mode);
581 ldlm_lock_decref(lockh, einfo->ei_mode);
582 einfo->ei_mode = lock->l_req_mode;
587 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
588 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
590 intent->it_disposition = (int)lockrep->lock_policy_res1;
591 intent->it_status = (int)lockrep->lock_policy_res2;
592 intent->it_lock_mode = einfo->ei_mode;
593 intent->it_lock_handle = lockh->cookie;
594 intent->it_data = req;
596 /* Technically speaking rq_transno must already be zero if
597 * it_status is in error, so the check is a bit redundant */
598 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
599 mdc_clear_replay_flag(req, intent->it_status);
601 /* If we're doing an IT_OPEN which did not result in an actual
602 * successful open, then we need to remove the bit which saves
603 * this request for unconditional replay.
605 * It's important that we do this first! Otherwise we might exit the
606 * function without doing so, and try to replay a failed create
608 if (it->it_op & IT_OPEN && req->rq_replay &&
609 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
610 mdc_clear_replay_flag(req, intent->it_status);
612 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
613 it->it_op, intent->it_disposition, intent->it_status);
615 /* We know what to expect, so we do any byte flipping required here */
616 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
617 struct mdt_body *body;
619 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
621 CERROR ("Can't swab mdt_body\n");
625 if (it_disposition(it, DISP_OPEN_OPEN) &&
626 !it_open_error(DISP_OPEN_OPEN, it)) {
628 * If this is a successful OPEN request, we need to set
629 * replay handler and data early, so that if replay
630 * happens immediately after swabbing below, new reply
631 * is swabbed by that handler correctly.
633 mdc_set_open_replay_data(NULL, NULL, it);
636 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
639 mdc_update_max_ea_from_body(exp, body);
642 * The eadata is opaque; just check that it is there.
643 * Eventually, obd_unpackmd() will check the contents.
645 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
646 body->mbo_eadatasize);
650 /* save lvb data and length in case this is for layout
653 lvb_len = body->mbo_eadatasize;
656 * We save the reply LOV EA in case we have to replay a
657 * create for recovery. If we didn't allocate a large
658 * enough request buffer above we need to reallocate it
659 * here to hold the actual LOV EA.
661 * To not save LOV EA if request is not going to replay
662 * (for example error one).
664 if ((it->it_op & IT_OPEN) && req->rq_replay) {
666 if (req_capsule_get_size(pill, &RMF_EADATA,
668 body->mbo_eadatasize)
669 mdc_realloc_openmsg(req, body);
671 req_capsule_shrink(pill, &RMF_EADATA,
672 body->mbo_eadatasize,
675 req_capsule_set_size(pill, &RMF_EADATA,
677 body->mbo_eadatasize);
679 lmm = req_capsule_client_get(pill, &RMF_EADATA);
682 body->mbo_eadatasize);
686 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
687 struct mdt_remote_perm *perm;
689 LASSERT(client_is_remote(exp));
690 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
691 lustre_swab_mdt_remote_perm);
695 } else if (it->it_op & IT_LAYOUT) {
696 /* maybe the lock was granted right away and layout
697 * is packed into RMF_DLM_LVB of req */
698 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
700 lvb_data = req_capsule_server_sized_get(pill,
701 &RMF_DLM_LVB, lvb_len);
702 if (lvb_data == NULL)
707 /* fill in stripe data for layout lock.
708 * LU-6581: trust layout data only if layout lock is granted. The MDT
709 * has stopped sending layout unless the layout lock is granted. The
710 * client still does this checking in case it's talking with an old
711 * server. - Jinshan */
712 lock = ldlm_handle2lock(lockh);
713 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
714 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
717 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
718 ldlm_it2str(it->it_op), lvb_len);
720 OBD_ALLOC_LARGE(lmm, lvb_len);
725 memcpy(lmm, lvb_data, lvb_len);
727 /* install lvb_data */
728 lock_res_and_lock(lock);
729 if (lock->l_lvb_data == NULL) {
730 lock->l_lvb_type = LVB_T_LAYOUT;
731 lock->l_lvb_data = lmm;
732 lock->l_lvb_len = lvb_len;
735 unlock_res_and_lock(lock);
737 OBD_FREE_LARGE(lmm, lvb_len);
745 /* We always reserve enough space in the reply packet for a stripe MD, because
746 * we don't know in advance the file type. */
/* Enqueue a metadata lock, optionally carrying an intent, on the resource
 * of op_data->op_fid1.
 * Steps visible in this function:
 *  - With an intent the lock type is asserted to be LDLM_IBITS and the
 *    incoming policy is asserted NULL; LDLM_FL_HAS_INTENT is added to
 *    the flags and the policy is chosen from the intent opcode (update,
 *    layout, xattr or lookup inodebits).
 *  - The import generation is sampled before the request is built.
 *  - The request is built by the mdc_intent_*_pack() helper matching the
 *    opcode (mdc_enqueue_pack() with lvb_len 0 for IT_READDIR). The
 *    intent-less path asserts LDLM_FLOCK and tags res_id.name[3].
 *  - IT_CREAT requests set rq_no_retry_einprogress because the retry is
 *    handled in this function.
 *  - The rpc lock and a request slot are taken around
 *    ldlm_cli_enqueue(); a failure to get the slot releases the rpc
 *    lock, clears the replay flag and finishes the request.
 *  - On enqueue failure the request is finished after clearing the
 *    replay flag. Otherwise lock_policy_res2 in the reply is converted
 *    with ptlrpc_status_ntoh().
 *  - An IT_CREAT intent answered with -EINPROGRESS drops the request and
 *    takes the resend path, which depends on the import generation still
 *    matching the sampled one.
 *  - Otherwise mdc_finish_enqueue() post-processes the reply. The
 *    cleanup that follows drops the lock reference if lockh is in use,
 *    zeroes lockh, finishes the request and resets it_lock_handle,
 *    it_lock_mode and it_data in the intent.
 * NOTE(review): many branch conditions, labels, the declaration of rc and
 * the return statements are not visible in this excerpt - read this
 * summary against the full file before relying on it. */
747 int mdc_enqueue(struct obd_export *exp,
748 struct ldlm_enqueue_info *einfo,
749 const union ldlm_policy_data *policy,
750 struct lookup_intent *it, struct md_op_data *op_data,
751 struct lustre_handle *lockh, __u64 extra_lock_flags)
753 struct obd_device *obddev = class_exp2obd(exp);
754 struct ptlrpc_request *req = NULL;
755 __u64 flags, saved_flags = extra_lock_flags;
757 struct ldlm_res_id res_id;
758 static const ldlm_policy_data_t lookup_policy =
759 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
760 static const ldlm_policy_data_t update_policy =
761 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
762 static const ldlm_policy_data_t layout_policy =
763 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
764 static const ldlm_policy_data_t getxattr_policy = {
765 .l_inodebits = { MDS_INODELOCK_XATTR } };
766 int generation, resends = 0;
767 struct ldlm_reply *lockrep;
768 enum lvb_type lvb_type = 0;
771 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
773 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
776 LASSERT(policy == NULL);
778 saved_flags |= LDLM_FL_HAS_INTENT;
779 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
780 policy = &update_policy;
781 else if (it->it_op & IT_LAYOUT)
782 policy = &layout_policy;
783 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
784 policy = &getxattr_policy;
786 policy = &lookup_policy;
789 generation = obddev->u.cli.cl_import->imp_generation;
793 /* The only way right now is FLOCK. */
794 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
796 res_id.name[3] = LDLM_FLOCK;
797 } else if (it->it_op & IT_OPEN) {
798 req = mdc_intent_open_pack(exp, it, op_data);
799 } else if (it->it_op & IT_UNLINK) {
800 req = mdc_intent_unlink_pack(exp, it, op_data);
801 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
802 req = mdc_intent_getattr_pack(exp, it, op_data);
803 } else if (it->it_op & IT_READDIR) {
804 req = mdc_enqueue_pack(exp, 0);
805 } else if (it->it_op & IT_LAYOUT) {
806 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
808 req = mdc_intent_layout_pack(exp, it, op_data);
809 lvb_type = LVB_T_LAYOUT;
810 } else if (it->it_op & IT_GETXATTR) {
811 req = mdc_intent_getxattr_pack(exp, it, op_data);
818 RETURN(PTR_ERR(req));
820 if (req != NULL && it && it->it_op & IT_CREAT)
821 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
823 req->rq_no_retry_einprogress = 1;
826 req->rq_generation_set = 1;
827 req->rq_import_generation = generation;
828 req->rq_sent = cfs_time_current_sec() + resends;
831 /* It is important to obtain rpc_lock first (if applicable), so that
832 * threads that are serialised with rpc_lock are not polluting our
833 * rpcs in flight counter. We do not do flock request limiting, though*/
835 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
836 rc = obd_get_request_slot(&obddev->u.cli);
838 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
839 mdc_clear_replay_flag(req, 0);
840 ptlrpc_req_finished(req);
845 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
846 0, lvb_type, lockh, 0);
848 /* For flock requests we immediately return without further
849 delay and let caller deal with the rest, since rest of
850 this function metadata processing makes no sense for flock
851 requests anyway. But in case of problem during comms with
852 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
853 can not rely on caller and this mainly for F_UNLCKs
854 (explicits or automatically generated by Kernel to clean
855 current FLocks upon exit) that can't be trashed */
856 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
857 (einfo->ei_type == LDLM_FLOCK) &&
858 (einfo->ei_mode == LCK_NL))
863 obd_put_request_slot(&obddev->u.cli);
864 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
867 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
868 obddev->obd_name, rc);
870 mdc_clear_replay_flag(req, rc);
871 ptlrpc_req_finished(req);
875 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
876 LASSERT(lockrep != NULL);
878 lockrep->lock_policy_res2 =
879 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
881 /* Retry the create infinitely when we get -EINPROGRESS from
882 * server. This is required by the new quota design. */
883 if (it && it->it_op & IT_CREAT &&
884 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
885 mdc_clear_replay_flag(req, rc);
886 ptlrpc_req_finished(req);
889 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
890 obddev->obd_name, resends, it->it_op,
891 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
893 if (generation == obddev->u.cli.cl_import->imp_generation) {
896 CDEBUG(D_HA, "resend cross eviction\n");
901 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
903 if (lustre_handle_is_used(lockh)) {
904 ldlm_lock_decref(lockh, einfo->ei_mode);
905 memset(lockh, 0, sizeof(*lockh));
907 ptlrpc_req_finished(req);
909 it->d.lustre.it_lock_handle = 0;
910 it->d.lustre.it_lock_mode = 0;
911 it->d.lustre.it_data = NULL;
/* Finish an intent lock once the enqueue reply has been processed.
 * Steps visible in this function:
 *  - request and its reply message are asserted not to be poisoned.
 *  - IT_READDIR intents are special-cased right at the start.
 *  - If the server never executed the intent (DISP_IT_EXECD not set),
 *    it_status is asserted non-zero and returned.
 *  - it_open_error() is consulted for DISP_IT_EXECD and, after the
 *    mdt_body has been fetched from the reply, for DISP_LOOKUP_EXECD.
 *  - A successful create and a successful open each take one extra
 *    reference on the request, guarded by DISP_ENQ_CREATE_REF and
 *    DISP_ENQ_OPEN_REF so the reference is taken only once.
 *  - The lock behind lockh is looked up and its resource name is
 *    asserted to match mbo_fid1 of the reply. If ldlm_lock_match()
 *    finds an already granted matching lock, the new lock is dropped
 *    with ldlm_lock_decref_and_cancel() and lockh and it_lock_handle are
 *    switched to the old lock.
 *  - A D_DENTRY debug line summarises name, intent, status, disposition
 *    and rc.
 * NOTE(review): the early-return conditions, the guard around the lock
 * matching section, the release of the lock reference and the final
 * return are not visible in this excerpt - confirm. */
917 static int mdc_finish_intent_lock(struct obd_export *exp,
918 struct ptlrpc_request *request,
919 struct md_op_data *op_data,
920 struct lookup_intent *it,
921 struct lustre_handle *lockh)
923 struct lustre_handle old_lock;
924 struct mdt_body *mdt_body;
925 struct ldlm_lock *lock;
929 LASSERT(request != NULL);
930 LASSERT(request != LP_POISON);
931 LASSERT(request->rq_repmsg != LP_POISON);
933 if (it->it_op & IT_READDIR)
936 if (!it_disposition(it, DISP_IT_EXECD)) {
937 /* The server failed before it even started executing the
938 * intent, i.e. because it couldn't unpack the request. */
939 LASSERT(it->d.lustre.it_status != 0);
940 RETURN(it->d.lustre.it_status);
942 rc = it_open_error(DISP_IT_EXECD, it);
946 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
947 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
949 rc = it_open_error(DISP_LOOKUP_EXECD, it);
953 /* keep requests around for the multiple phases of the call
954 * this shows the DISP_XX must guarantee we make it into the call
956 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
957 it_disposition(it, DISP_OPEN_CREATE) &&
958 !it_open_error(DISP_OPEN_CREATE, it)) {
959 it_set_disposition(it, DISP_ENQ_CREATE_REF);
960 ptlrpc_request_addref(request); /* balanced in ll_create_node */
962 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
963 it_disposition(it, DISP_OPEN_OPEN) &&
964 !it_open_error(DISP_OPEN_OPEN, it)) {
965 it_set_disposition(it, DISP_ENQ_OPEN_REF);
966 ptlrpc_request_addref(request); /* balanced in ll_file_open */
967 /* BUG 11546 - eviction in the middle of open rpc processing */
968 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
971 if (it->it_op & IT_CREAT) {
972 /* XXX this belongs in ll_create_it */
973 } else if (it->it_op == IT_OPEN) {
974 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
976 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
979 /* If we already have a matching lock, then cancel the new
980 * one. We have to set the data here instead of in
981 * mdc_enqueue, because we need to use the child's inode as
982 * the l_ast_data to match, and that's not available until
983 * intent_finish has performed the iget().) */
984 lock = ldlm_handle2lock(lockh);
986 ldlm_policy_data_t policy = lock->l_policy_data;
987 LDLM_DEBUG(lock, "matching against this");
989 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
990 &lock->l_resource->lr_name),
991 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
992 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
995 memcpy(&old_lock, lockh, sizeof(*lockh));
996 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
997 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
998 ldlm_lock_decref_and_cancel(lockh,
999 it->d.lustre.it_lock_mode);
1000 memcpy(lockh, &old_lock, sizeof(old_lock));
1001 it->d.lustre.it_lock_handle = lockh->cookie;
1004 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1005 (int)op_data->op_namelen, op_data->op_name,
1006 ldlm_it2str(it->it_op), it->d.lustre.it_status,
1007 it->d.lustre.it_disposition, rc);
/* Check whether a usable lock for fid is already held and record it in
 * the intent.
 * If the intent already carries a lock handle, that handle is
 * revalidated with ldlm_revalidate_lock_handle(), which also receives
 * bits. Otherwise the inodebits to match are selected by it->it_op
 * (UPDATE plus LOOKUP and further bits for the getattr case, UPDATE,
 * LAYOUT or LOOKUP for the others) and mdc_lock_match() is asked for a
 * granted lock in any of the CR, CW, PR or PW modes.
 * Afterwards it_lock_handle and it_lock_mode are either set from the
 * matched lock or reset to 0.
 * NOTE(review): the case labels of the switch, the condition choosing
 * between the two final stores and the return value are not visible in
 * this excerpt - confirm against the full file. */
1011 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1012 struct lu_fid *fid, __u64 *bits)
1014 /* We could just return 1 immediately, but since we should only
1015 * be called in revalidate_it if we already have a lock, let's
1017 struct ldlm_res_id res_id;
1018 struct lustre_handle lockh;
1019 ldlm_policy_data_t policy;
1023 if (it->d.lustre.it_lock_handle) {
1024 lockh.cookie = it->d.lustre.it_lock_handle;
1025 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1027 fid_build_reg_res_name(fid, &res_id);
1028 switch (it->it_op) {
1030 /* File attributes are held under multiple bits:
1031 * nlink is under lookup lock, size and times are
1032 * under UPDATE lock and recently we've also got
1033 * a separate permissions lock for owner/group/acl that
1034 * were protected by lookup lock before.
1035 * Getattr must provide all of that information,
1036 * so we need to ensure we have all of those locks.
1037 * Unfortunately, if the bits are split across multiple
1038 * locks, there's no easy way to match all of them here,
1039 * so an extra RPC would be performed to fetch all
1040 * of those bits at once for now. */
1041 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1042 * but for old MDTs (< 2.4), permission is covered
1043 * by LOOKUP lock, so it needs to match all bits here.*/
1044 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1045 MDS_INODELOCK_LOOKUP |
1049 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1052 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1055 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1059 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1060 LDLM_IBITS, &policy,
1061 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1066 it->d.lustre.it_lock_handle = lockh.cookie;
1067 it->d.lustre.it_lock_mode = mode;
1069 it->d.lustre.it_lock_handle = 0;
1070 it->d.lustre.it_lock_mode = 0;
1077 * This long block is all about fixing up the lock and request state
1078 * so that it is correct as of the moment _before_ the operation was
1079 * applied; that way, the VFS will think that everything is normal and
1080 * call Lustre's regular VFS methods.
1082 * If we're performing a creation, that means that unless the creation
1083 * failed with EEXIST, we should fake up a negative dentry.
1085 * For everything else, we want to lookup to succeed.
1087 * One additional note: if CREATE or OPEN succeeded, we add an extra
1088 * reference to the request because we need to keep it around until
1089 * ll_create/ll_open gets called.
1091 * The server will return to us, in it_disposition, an indication of
1092 * exactly what d.lustre.it_status refers to.
1094 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1095 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1096 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1097 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1100 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point for taking an intent lock.
 * An LDLM_IBITS enqueue descriptor is set up with the mode from
 * it_to_lock_mode(it), cb_blocking as blocking callback and
 * ldlm_completion_ast as completion callback.
 * If op_data->op_fid2 is sane and the intent is LOOKUP, GETATTR or
 * READDIR, an already held lock is tried first through
 * mdc_revalidate_lock() after resetting it_lock_handle; the outcome
 * together with op_namelen decides whether the function stops there.
 * For IT_CREAT without a sane op_fid2 a new fid is allocated with
 * mdc_fid_alloc(), logging on failure.
 * The lock is then enqueued through mdc_enqueue() with a NULL policy,
 * *reqp is set to the request stored in the intent, and
 * mdc_finish_intent_lock() completes the operation.
 * NOTE(review): the declaration of rc, the error checks after each call
 * and the return statements are not visible in this excerpt. */
1103 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1104 struct lookup_intent *it, struct ptlrpc_request **reqp,
1105 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1107 struct ldlm_enqueue_info einfo = {
1108 .ei_type = LDLM_IBITS,
1109 .ei_mode = it_to_lock_mode(it),
1110 .ei_cb_bl = cb_blocking,
1111 .ei_cb_cp = ldlm_completion_ast,
1113 struct lustre_handle lockh;
1118 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1119 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1120 op_data->op_name, PFID(&op_data->op_fid2),
1121 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1125 if (fid_is_sane(&op_data->op_fid2) &&
1126 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1127 /* We could just return 1 immediately, but since we should only
1128 * be called in revalidate_it if we already have a lock, let's
1130 it->d.lustre.it_lock_handle = 0;
1131 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1132 /* Only return failure if it was not GETATTR by cfid
1133 (from inode_revalidate) */
1134 if (rc || op_data->op_namelen != 0)
1138 /* For case if upper layer did not alloc fid, do it now. */
1139 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1140 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1142 CERROR("Can't alloc new fid, rc %d\n", rc);
1147 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1152 *reqp = it->d.lustre.it_data;
1153 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply interpreter for requests sent by mdc_intent_getattr_async().
 * args is treated as a struct mdc_getattr_args. The request slot taken at
 * send time is released first. The enqueue is completed with
 * ldlm_cli_enqueue_fini() using the type and mode from einfo; the failure
 * branch logs the code and clears the replay flag. On the normal path
 * lock_policy_res2 in the reply is converted with ptlrpc_status_ntoh(),
 * then mdc_finish_enqueue() and mdc_finish_intent_lock() are run with the
 * lock handle and op data stored in minfo.
 * einfo is freed here and the caller callback minfo->mi_cb is invoked
 * with the request, minfo and the resulting rc.
 * NOTE(review): the third parameter (args and rc appear to come from the
 * missing signature lines), the assignment of it, the effect of the
 * OBD_FAIL_MDC_GETATTR_ENQUEUE check, the error checks between the steps
 * and the return value are not visible in this excerpt. */
1157 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1158 struct ptlrpc_request *req,
1161 struct mdc_getattr_args *ga = args;
1162 struct obd_export *exp = ga->ga_exp;
1163 struct md_enqueue_info *minfo = ga->ga_minfo;
1164 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1165 struct lookup_intent *it;
1166 struct lustre_handle *lockh;
1167 struct obd_device *obddev;
1168 struct ldlm_reply *lockrep;
1169 __u64 flags = LDLM_FL_HAS_INTENT;
1173 lockh = &minfo->mi_lockh;
1175 obddev = class_exp2obd(exp);
1177 obd_put_request_slot(&obddev->u.cli);
1178 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1181 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1182 &flags, NULL, 0, lockh, rc);
1184 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1185 mdc_clear_replay_flag(req, rc);
1189 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1190 LASSERT(lockrep != NULL);
1192 lockrep->lock_policy_res2 =
1193 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1195 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1199 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1203 OBD_FREE_PTR(einfo);
1204 minfo->mi_cb(req, minfo, rc);
/* Send a getattr intent enqueue asynchronously.
 * The intent and op data are taken from minfo. The resource name is built
 * from op_fid1 and the request is packed by mdc_intent_getattr_pack();
 * the policy asks for both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE.
 * A request slot is obtained before ldlm_cli_enqueue() is called with its
 * final (async) argument set to 1 and minfo->mi_lockh as the handle.
 * Failure paths visible here: a failed slot request finishes the request;
 * a failed enqueue releases the slot and finishes the request.
 * On success minfo and einfo are stored in the request async args (a
 * compile-time assertion checks they fit),
 * mdc_intent_getattr_async_interpret() is installed as reply interpreter
 * and the request is handed to ptlrpcd_add_req().
 * NOTE(review): the declaration of rc, the failure conditions, the store
 * of ga_exp and the return statements are not visible in this excerpt. */
1208 int mdc_intent_getattr_async(struct obd_export *exp,
1209 struct md_enqueue_info *minfo,
1210 struct ldlm_enqueue_info *einfo)
1212 struct md_op_data *op_data = &minfo->mi_data;
1213 struct lookup_intent *it = &minfo->mi_it;
1214 struct ptlrpc_request *req;
1215 struct mdc_getattr_args *ga;
1216 struct obd_device *obddev = class_exp2obd(exp);
1217 struct ldlm_res_id res_id;
1218 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1219 * for statahead currently. Consider CMD in future, such two bits
1220 * maybe managed by different MDS, should be adjusted then. */
1221 ldlm_policy_data_t policy = {
1222 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1223 MDS_INODELOCK_UPDATE }
1226 __u64 flags = LDLM_FL_HAS_INTENT;
1229 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1231 (int)op_data->op_namelen, op_data->op_name,
1232 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1234 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1235 req = mdc_intent_getattr_pack(exp, it, op_data);
1237 RETURN(PTR_ERR(req));
1239 rc = obd_get_request_slot(&obddev->u.cli);
1241 ptlrpc_req_finished(req);
1245 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1246 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1248 obd_put_request_slot(&obddev->u.cli);
1249 ptlrpc_req_finished(req);
1253 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1254 ga = ptlrpc_req_async_args(req);
1256 ga->ga_minfo = minfo;
1257 ga->ga_einfo = einfo;
1259 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1260 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);