4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
/* Async-getattr context stashed in req->rq_async_args by
 * mdc_intent_getattr_async() and recovered by
 * mdc_intent_getattr_async_interpret().
 * NOTE(review): the closing "};" is elided from this excerpt. */
50 struct mdc_getattr_args {
/* export the enqueue was issued on */
51 struct obd_export *ga_exp;
/* caller's enqueue info; mi_cb is invoked on completion */
52 struct md_enqueue_info *ga_minfo;
/* heap-allocated einfo, freed by the interpret callback */
53 struct ldlm_enqueue_info *ga_einfo;
/*
 * it_open_error() - return the server status for the given intent phase.
 *
 * Dispositions are tested from most-specific (OPEN_LEASE) to least-specific
 * (IT_EXECD); within the first disposition that is set, it_status is returned
 * only if the caller's @phase is at or beyond that disposition.
 * NOTE(review): closing braces and the intermediate "return 0" paths between
 * the if-blocks are elided from this excerpt.
 */
56 int it_open_error(int phase, struct lookup_intent *it)
58 if (it_disposition(it, DISP_OPEN_LEASE)) {
59 if (phase >= DISP_OPEN_LEASE)
60 return it->d.lustre.it_status;
64 if (it_disposition(it, DISP_OPEN_OPEN)) {
65 if (phase >= DISP_OPEN_OPEN)
66 return it->d.lustre.it_status;
71 if (it_disposition(it, DISP_OPEN_CREATE)) {
72 if (phase >= DISP_OPEN_CREATE)
73 return it->d.lustre.it_status;
78 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79 if (phase >= DISP_LOOKUP_EXECD)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_IT_EXECD)) {
86 if (phase >= DISP_IT_EXECD)
87 return it->d.lustre.it_status;
/* No disposition matched: log the raw disposition/status for debugging. */
91 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92 it->d.lustre.it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data() - attach the inode to a held DLM lock's resource and
 * report the lock's inodebits back through @bits.
 * The caller guarantees the handle references a live lock (LASSERT below).
 * NOTE(review): some declarations (e.g. of "bits") and the closing
 * brace/return are elided from this excerpt.
 */
99 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
112 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
/* If the resource already points at a different inode, that inode must be
 * on its way out (I_FREEING) — otherwise two live inodes share one FID. */
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
/* Report which inodebits this lock protects to the caller. */
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * mdc_lock_match() - look for an already-granted DLM lock on @fid matching
 * @type/@policy/@mode; on success the handle is stored in @lockh.
 * NOTE(review): the "rc" declaration and trailing RETURN are elided from
 * this excerpt.
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * mdc_cancel_unused() - cancel all unused DLM locks on @fid's resource that
 * match @policy/@mode, passing @flags/@opaque through to the LDLM layer.
 * NOTE(review): the "rc" declaration and trailing RETURN are elided from
 * this excerpt.
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * mdc_null_inode() - detach any cached inode pointer from @fid's DLM
 * resource (lr_lvb_inode = NULL), e.g. so a dying inode is no longer
 * reachable through the lock.
 * NOTE(review): the NULL-resource early return and resource locking lines
 * appear elided from this excerpt.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create == 0): if no resource exists there is nothing to do. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
193 /* find any ldlm lock of the inode in mdc
/*
 * mdc_find_cbdata() - iterate the locks on @fid's resource with @it/@data;
 * maps LDLM_ITER_STOP / LDLM_ITER_CONTINUE to this function's return value
 * (the returned constants themselves are elided from this excerpt).
 */
197 int mdc_find_cbdata(struct obd_export *exp,
198 const struct lu_fid *fid,
199 ldlm_iterator_t it, void *data)
201 struct ldlm_res_id res_id;
/* Cast drops const for the legacy fid_build_reg_res_name() prototype. */
205 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
206 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
208 if (rc == LDLM_ITER_STOP)
210 else if (rc == LDLM_ITER_CONTINUE)
/*
 * mdc_clear_replay_flag() - drop a failed request from the replay list.
 * Called once a request is known not to need unconditional replay (error
 * status, aborted intent, ...).  The rq_replay clearing inside the spinlock
 * is elided from this excerpt.
 */
215 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
217 /* Don't hold error requests for replay. */
218 if (req->rq_replay) {
219 spin_lock(&req->rq_lock);
221 spin_unlock(&req->rq_lock);
/* A transno on an error reply is unexpected — log it loudly. */
223 if (rc && req->rq_transno != 0) {
224 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
229 /* Save a large LOV EA into the request buffer so that it is available
230 * for replay. We don't do this in the initial request because the
231 * original request doesn't need this buffer (at most it sends just the
232 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
233 * buffer and may also be difficult to allocate and save a very large
234 * request buffer for each open. (bug 5707)
236 * OOM here may cause recovery failure if lmm is needed (only for the
237 * original open if the MDS crashed just when this client also OOM'd)
238 * but this is incredibly unlikely, and questionable whether the client
239 * could do MDS recovery under OOM anyways... */
240 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
241 struct mdt_body *body)
245 /* FIXME: remove this explicit offset. */
246 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
247 body->mbo_eadatasize);
/* On enlarge failure, degrade gracefully: pretend the reply carried no EA
 * rather than failing the open (matches the OOM rationale above). */
249 CERROR("Can't enlarge segment %d size to %d\n",
250 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
251 body->mbo_valid &= ~OBD_MD_FLEASIZE;
252 body->mbo_eadatasize = 0;
/*
 * mdc_intent_open_pack() - build an LDLM_INTENT_OPEN request: collect
 * conflicting locks to cancel, allocate the request, pack the intent and the
 * open body, and size the reply buffers.
 * Returns the prepared request, or ERR_PTR(-ENOMEM) on allocation failure.
 * NOTE(review): several lines (mode/count computation branches, error
 * handling, final RETURN) are elided from this excerpt.
 */
256 static struct ptlrpc_request *
257 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
258 struct md_op_data *op_data)
260 struct ptlrpc_request *req;
261 struct obd_device *obddev = class_exp2obd(exp);
262 struct ldlm_intent *lit;
263 const void *lmm = op_data->op_data;
264 __u32 lmmsize = op_data->op_data_size;
265 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Force a regular-file type into the create mode bits. */
271 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
273 /* XXX: openlock is not cancelled for cross-refs. */
274 /* If inode is known, cancel conflicting OPEN locks. */
275 if (fid_is_sane(&op_data->op_fid2)) {
276 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
277 if (it->it_flags & FMODE_WRITE)
282 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
285 else if (it->it_flags & FMODE_EXEC)
291 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
296 /* If CREATE, cancel parent's UPDATE lock. */
297 if (it->it_op & IT_CREAT)
301 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
303 MDS_INODELOCK_UPDATE);
305 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
306 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before bailing. */
308 ldlm_lock_list_put(&cancels, l_bl_ast, count);
309 RETURN(ERR_PTR(-ENOMEM));
312 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
313 op_data->op_namelen + 1);
314 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
315 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
317 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
319 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import is replayable. */
323 spin_lock(&req->rq_lock);
324 req->rq_replay = req->rq_import->imp_replayable;
325 spin_unlock(&req->rq_lock);
327 /* pack the intent */
328 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
329 lit->opc = (__u64)it->it_op;
331 /* pack the intended request */
332 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reserve max EA size in the reply; file type isn't known in advance. */
335 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
336 obddev->u.cli.cl_max_mds_easize);
338 /* for remote client, fetch remote perm for current user */
339 if (client_is_remote(exp))
340 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
341 sizeof(struct mdt_remote_perm));
342 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getxattr_pack() - build an LDLM_INTENT_GETXATTR request and
 * size the three xattr reply buffers (names, values, value lengths) to the
 * server's advertised max EA size.
 * NOTE(review): the "count"/"maxdata" declarations, NULL-req check and final
 * RETURN are elided from this excerpt.
 */
346 static struct ptlrpc_request *
347 mdc_intent_getxattr_pack(struct obd_export *exp,
348 struct lookup_intent *it,
349 struct md_op_data *op_data)
351 struct ptlrpc_request *req;
352 struct ldlm_intent *lit;
355 struct list_head cancels = LIST_HEAD_INIT(cancels);
359 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
360 &RQF_LDLM_INTENT_GETXATTR);
362 RETURN(ERR_PTR(-ENOMEM));
364 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
366 ptlrpc_request_free(req);
370 /* pack the intent */
371 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
372 lit->opc = IT_GETXATTR;
/* Use the server's connect-time max EA size for all reply buffers. */
374 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
376 /* pack the intended request */
377 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
380 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
381 RCL_SERVER, maxdata);
383 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
384 RCL_SERVER, maxdata);
386 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
387 RCL_SERVER, maxdata);
389 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_unlink_pack() - build an LDLM_INTENT_UNLINK request: pack the
 * intent opcode and the unlink body, and reserve default EA space in the
 * reply.
 * NOTE(review): the NULL-req check, error paths and final RETURN are elided
 * from this excerpt.
 */
394 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
395 struct lookup_intent *it,
396 struct md_op_data *op_data)
398 struct ptlrpc_request *req;
399 struct obd_device *obddev = class_exp2obd(exp);
400 struct ldlm_intent *lit;
404 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
405 &RQF_LDLM_INTENT_UNLINK);
407 RETURN(ERR_PTR(-ENOMEM));
409 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
410 op_data->op_namelen + 1);
/* No cancels for unlink: enqueue prep with an empty cancel list. */
412 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
414 ptlrpc_request_free(req);
418 /* pack the intent */
419 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
420 lit->opc = (__u64)it->it_op;
422 /* pack the intended request */
423 mdc_unlink_pack(req, op_data);
425 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
426 obddev->u.cli.cl_default_mds_easize);
427 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack() - build an LDLM_INTENT_GETATTR request asking
 * for attributes, EA and ACL (or remote perms for a remote client), with the
 * reply EA buffer sized from the client's default/max MDS EA size.
 * NOTE(review): the "easize" declaration, NULL-req check and final RETURN
 * are elided from this excerpt.
 */
431 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
432 struct lookup_intent *it,
433 struct md_op_data *op_data)
435 struct ptlrpc_request *req;
436 struct obd_device *obddev = class_exp2obd(exp);
/* Remote clients get remote-perm data instead of a local ACL. */
437 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
438 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
440 (client_is_remote(exp) ?
441 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
442 struct ldlm_intent *lit;
447 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
448 &RQF_LDLM_INTENT_GETATTR);
450 RETURN(ERR_PTR(-ENOMEM));
452 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
453 op_data->op_namelen + 1);
455 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
457 ptlrpc_request_free(req);
461 /* pack the intent */
462 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
463 lit->opc = (__u64)it->it_op;
/* Prefer the default EA size when configured, else fall back to the max. */
465 if (obddev->u.cli.cl_default_mds_easize > 0)
466 easize = obddev->u.cli.cl_default_mds_easize;
468 easize = obddev->u.cli.cl_max_mds_easize;
470 /* pack the intended request */
471 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
473 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
474 if (client_is_remote(exp))
475 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
476 sizeof(struct mdt_remote_perm));
477 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack() - build an LDLM_INTENT_LAYOUT request with a
 * LAYOUT_INTENT_ACCESS layout intent; the layout itself comes back in the
 * DLM LVB, sized to the client's default MDS EA size.
 * NOTE(review): the NULL-req check, error paths and final RETURN are elided
 * from this excerpt; @unused (op_data) is indeed not referenced here.
 */
481 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
482 struct lookup_intent *it,
483 struct md_op_data *unused)
485 struct obd_device *obd = class_exp2obd(exp);
486 struct ptlrpc_request *req;
487 struct ldlm_intent *lit;
488 struct layout_intent *layout;
492 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
493 &RQF_LDLM_INTENT_LAYOUT);
495 RETURN(ERR_PTR(-ENOMEM));
/* No EA data is sent with a layout intent. */
497 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
498 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
500 ptlrpc_request_free(req);
504 /* pack the intent */
505 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
506 lit->opc = (__u64)it->it_op;
508 /* pack the layout intent request */
509 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
510 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
511 * set for replication */
512 layout->li_opc = LAYOUT_INTENT_ACCESS;
514 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
515 obd->u.cli.cl_default_mds_easize);
516 ptlrpc_request_set_replen(req);
/*
 * mdc_enqueue_pack() - build a plain (intent-less) LDLM_ENQUEUE request with
 * an @lvb_len-byte server LVB buffer; used e.g. for IT_READDIR.
 * NOTE(review): the NULL-req check, error paths and final RETURN are elided
 * from this excerpt.
 */
520 static struct ptlrpc_request *
521 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
523 struct ptlrpc_request *req;
527 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
529 RETURN(ERR_PTR(-ENOMEM));
531 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
533 ptlrpc_request_free(req);
537 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
538 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue() - post-process an intent enqueue reply: fix up the
 * lock state, copy the server's disposition/status into the intent, manage
 * the replay flag, swab/validate the mdt_body and EA data, and install
 * layout LVB data on the lock when applicable.
 * NOTE(review): many lines (the "rc" parameter, error returns, closing
 * braces, some capsule-size arguments) are elided from this excerpt, so the
 * control flow below is a partial view.
 */
542 static int mdc_finish_enqueue(struct obd_export *exp,
543 struct ptlrpc_request *req,
544 struct ldlm_enqueue_info *einfo,
545 struct lookup_intent *it,
546 struct lustre_handle *lockh,
549 struct req_capsule *pill = &req->rq_pill;
550 struct ldlm_request *lockreq;
551 struct ldlm_reply *lockrep;
552 struct lustre_intent_data *intent = &it->d.lustre;
553 struct ldlm_lock *lock;
554 void *lvb_data = NULL;
559 /* Similarly, if we're going to replay this request, we don't want to
560 * actually get a lock, just perform the intent. */
561 if (req->rq_transno || req->rq_replay) {
562 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
563 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Lock aborted: intent executed but no lock granted — clear the handle. */
566 if (rc == ELDLM_LOCK_ABORTED) {
568 memset(lockh, 0, sizeof(*lockh));
570 } else { /* rc = 0 */
571 lock = ldlm_handle2lock(lockh);
572 LASSERT(lock != NULL);
574 /* If the server gave us back a different lock mode, we should
575 * fix up our variables. */
576 if (lock->l_req_mode != einfo->ei_mode) {
577 ldlm_lock_addref(lockh, lock->l_req_mode);
578 ldlm_lock_decref(lockh, einfo->ei_mode);
579 einfo->ei_mode = lock->l_req_mode;
584 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
585 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Propagate the server's intent result into the caller's intent. */
587 intent->it_disposition = (int)lockrep->lock_policy_res1;
588 intent->it_status = (int)lockrep->lock_policy_res2;
589 intent->it_lock_mode = einfo->ei_mode;
590 intent->it_lock_handle = lockh->cookie;
591 intent->it_data = req;
593 /* Technically speaking rq_transno must already be zero if
594 * it_status is in error, so the check is a bit redundant */
595 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
596 mdc_clear_replay_flag(req, intent->it_status);
598 /* If we're doing an IT_OPEN which did not result in an actual
599 * successful open, then we need to remove the bit which saves
600 * this request for unconditional replay.
602 * It's important that we do this first! Otherwise we might exit the
603 * function without doing so, and try to replay a failed create
605 if (it->it_op & IT_OPEN && req->rq_replay &&
606 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
607 mdc_clear_replay_flag(req, intent->it_status);
609 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
610 it->it_op, intent->it_disposition, intent->it_status);
612 /* We know what to expect, so we do any byte flipping required here */
613 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
614 struct mdt_body *body;
616 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
618 CERROR ("Can't swab mdt_body\n");
622 if (it_disposition(it, DISP_OPEN_OPEN) &&
623 !it_open_error(DISP_OPEN_OPEN, it)) {
625 * If this is a successful OPEN request, we need to set
626 * replay handler and data early, so that if replay
627 * happens immediately after swabbing below, new reply
628 * is swabbed by that handler correctly.
630 mdc_set_open_replay_data(NULL, NULL, it);
633 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
636 mdc_update_max_ea_from_body(exp, body);
639 * The eadata is opaque; just check that it is there.
640 * Eventually, obd_unpackmd() will check the contents.
642 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
643 body->mbo_eadatasize);
647 /* save lvb data and length in case this is for layout
650 lvb_len = body->mbo_eadatasize;
653 * We save the reply LOV EA in case we have to replay a
654 * create for recovery. If we didn't allocate a large
655 * enough request buffer above we need to reallocate it
656 * here to hold the actual LOV EA.
658 * To not save LOV EA if request is not going to replay
659 * (for example error one).
661 if ((it->it_op & IT_OPEN) && req->rq_replay) {
663 if (req_capsule_get_size(pill, &RMF_EADATA,
665 body->mbo_eadatasize)
666 mdc_realloc_openmsg(req, body);
668 req_capsule_shrink(pill, &RMF_EADATA,
669 body->mbo_eadatasize,
672 req_capsule_set_size(pill, &RMF_EADATA,
674 body->mbo_eadatasize);
676 lmm = req_capsule_client_get(pill, &RMF_EADATA);
679 body->mbo_eadatasize);
683 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
684 struct mdt_remote_perm *perm;
686 LASSERT(client_is_remote(exp));
687 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
688 lustre_swab_mdt_remote_perm);
692 } else if (it->it_op & IT_LAYOUT) {
693 /* maybe the lock was granted right away and layout
694 * is packed into RMF_DLM_LVB of req */
695 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
697 lvb_data = req_capsule_server_sized_get(pill,
698 &RMF_DLM_LVB, lvb_len);
699 if (lvb_data == NULL)
704 /* fill in stripe data for layout lock.
705 * LU-6581: trust layout data only if layout lock is granted. The MDT
706 * has stopped sending layout unless the layout lock is granted. The
707 * client still does this checking in case it's talking with an old
708 * server. - Jinshan */
709 lock = ldlm_handle2lock(lockh);
710 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
711 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
714 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
715 ldlm_it2str(it->it_op), lvb_len);
717 OBD_ALLOC_LARGE(lmm, lvb_len);
722 memcpy(lmm, lvb_data, lvb_len);
724 /* install lvb_data */
725 lock_res_and_lock(lock);
726 if (lock->l_lvb_data == NULL) {
727 lock->l_lvb_type = LVB_T_LAYOUT;
728 lock->l_lvb_data = lmm;
729 lock->l_lvb_len = lvb_len;
/* Another thread installed the LVB first — free our copy (else branch
 * appears elided from this excerpt). */
732 unlock_res_and_lock(lock);
734 OBD_FREE_LARGE(lmm, lvb_len);
742 /* We always reserve enough space in the reply packet for a stripe MD, because
743 * we don't know in advance the file type. */
/*
 * mdc_enqueue() - core intent-enqueue path: choose a policy from the intent
 * op, pack the matching request type, take rpc/modify slots, send the
 * enqueue, retry on -EINPROGRESS, and finish via mdc_finish_enqueue().
 * Flock enqueues bypass most of the intent machinery.
 * NOTE(review): numerous lines (resend loop labels, error returns, closing
 * braces) are elided from this excerpt — treat the flow below as partial.
 */
744 int mdc_enqueue(struct obd_export *exp,
745 struct ldlm_enqueue_info *einfo,
746 const union ldlm_policy_data *policy,
747 struct lookup_intent *it, struct md_op_data *op_data,
748 struct lustre_handle *lockh, __u64 extra_lock_flags)
750 struct obd_device *obddev = class_exp2obd(exp);
751 struct ptlrpc_request *req = NULL;
752 __u64 flags, saved_flags = extra_lock_flags;
753 struct ldlm_res_id res_id;
/* One static policy per inodebit class an intent may need. */
754 static const union ldlm_policy_data lookup_policy = {
755 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
756 static const union ldlm_policy_data update_policy = {
757 .l_inodebits = { MDS_INODELOCK_UPDATE } };
758 static const union ldlm_policy_data layout_policy = {
759 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
760 static const union ldlm_policy_data getxattr_policy = {
761 .l_inodebits = { MDS_INODELOCK_XATTR } };
762 int generation, resends = 0;
763 struct ldlm_reply *lockrep;
764 enum lvb_type lvb_type = 0;
768 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
770 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived here — callers must not pass one. */
773 LASSERT(policy == NULL);
775 saved_flags |= LDLM_FL_HAS_INTENT;
776 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
777 policy = &update_policy;
778 else if (it->it_op & IT_LAYOUT)
779 policy = &layout_policy;
780 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
781 policy = &getxattr_policy;
783 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction. */
786 generation = obddev->u.cli.cl_import->imp_generation;
790 /* The only way right now is FLOCK. */
791 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
793 res_id.name[3] = LDLM_FLOCK;
794 } else if (it->it_op & IT_OPEN) {
795 req = mdc_intent_open_pack(exp, it, op_data);
796 } else if (it->it_op & IT_UNLINK) {
797 req = mdc_intent_unlink_pack(exp, it, op_data);
798 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
799 req = mdc_intent_getattr_pack(exp, it, op_data);
800 } else if (it->it_op & IT_READDIR) {
801 req = mdc_enqueue_pack(exp, 0);
802 } else if (it->it_op & IT_LAYOUT) {
803 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
805 req = mdc_intent_layout_pack(exp, it, op_data);
806 lvb_type = LVB_T_LAYOUT;
807 } else if (it->it_op & IT_GETXATTR) {
808 req = mdc_intent_getxattr_pack(exp, it, op_data);
815 RETURN(PTR_ERR(req));
/* Pin the request to this import generation and stagger resends. */
818 req->rq_generation_set = 1;
819 req->rq_import_generation = generation;
820 req->rq_sent = cfs_time_current_sec() + resends;
823 /* It is important to obtain modify RPC slot first (if applicable), so
824 * that threads that are waiting for a modify RPC slot are not polluting
825 * our rpcs in flight counter.
826 * We do not do flock request limiting, though */
828 mdc_get_mod_rpc_slot(req, it);
829 rc = obd_get_request_slot(&obddev->u.cli);
831 mdc_put_mod_rpc_slot(req, it);
832 mdc_clear_replay_flag(req, 0);
833 ptlrpc_req_finished(req);
838 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
839 0, lvb_type, lockh, 0);
841 /* For flock requests we immediatelly return without further
842 delay and let caller deal with the rest, since rest of
843 this function metadata processing makes no sense for flock
844 requests anyway. But in case of problem during comms with
845 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
846 can not rely on caller and this mainly for F_UNLCKs
847 (explicits or automatically generated by Kernel to clean
848 current FLocks upon exit) that can't be trashed */
849 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
850 (einfo->ei_type == LDLM_FLOCK) &&
851 (einfo->ei_mode == LCK_NL))
/* Release the slots taken above, in reverse order. */
856 obd_put_request_slot(&obddev->u.cli);
857 mdc_put_mod_rpc_slot(req, it);
860 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
861 obddev->obd_name, rc);
863 mdc_clear_replay_flag(req, rc);
864 ptlrpc_req_finished(req);
868 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
869 LASSERT(lockrep != NULL);
/* Convert the wire status to the host errno space before inspecting it. */
871 lockrep->lock_policy_res2 =
872 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
874 /* Retry infinitely when the server returns -EINPROGRESS for the
875 * intent operation, when server returns -EINPROGRESS for acquiring
876 * intent lock, we'll retry in after_reply(). */
877 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
878 mdc_clear_replay_flag(req, rc)
879 ptlrpc_req_finished(req);
882 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
883 obddev->obd_name, resends, it->it_op,
884 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
886 if (generation == obddev->u.cli.cl_import->imp_generation) {
889 CDEBUG(D_HA, "resend cross eviction\n");
894 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* finish_enqueue failed: drop our lock reference and the request. */
896 if (lustre_handle_is_used(lockh)) {
897 ldlm_lock_decref(lockh, einfo->ei_mode);
898 memset(lockh, 0, sizeof(*lockh));
900 ptlrpc_req_finished(req);
902 it->d.lustre.it_lock_handle = 0;
903 it->d.lustre.it_lock_mode = 0;
904 it->d.lustre.it_data = NULL;
/*
 * mdc_finish_intent_lock() - interpret the intent result after the enqueue
 * completed: surface per-phase open errors, take extra request references
 * for successful CREATE/OPEN (balanced by llite), and collapse a duplicate
 * lock onto an already-held matching one.
 * NOTE(review): several lines (rc initialization, returns, closing braces)
 * are elided from this excerpt.
 */
910 static int mdc_finish_intent_lock(struct obd_export *exp,
911 struct ptlrpc_request *request,
912 struct md_op_data *op_data,
913 struct lookup_intent *it,
914 struct lustre_handle *lockh)
916 struct lustre_handle old_lock;
917 struct mdt_body *mdt_body;
918 struct ldlm_lock *lock;
922 LASSERT(request != NULL);
923 LASSERT(request != LP_POISON);
924 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR carries no intent result to interpret. */
926 if (it->it_op & IT_READDIR)
929 if (!it_disposition(it, DISP_IT_EXECD)) {
930 /* The server failed before it even started executing the
931 * intent, i.e. because it couldn't unpack the request. */
932 LASSERT(it->d.lustre.it_status != 0);
933 RETURN(it->d.lustre.it_status);
935 rc = it_open_error(DISP_IT_EXECD, it);
939 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
940 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
942 rc = it_open_error(DISP_LOOKUP_EXECD, it);
946 /* keep requests around for the multiple phases of the call
947 * this shows the DISP_XX must guarantee we make it into the call
949 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
950 it_disposition(it, DISP_OPEN_CREATE) &&
951 !it_open_error(DISP_OPEN_CREATE, it)) {
952 it_set_disposition(it, DISP_ENQ_CREATE_REF);
953 ptlrpc_request_addref(request); /* balanced in ll_create_node */
955 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
956 it_disposition(it, DISP_OPEN_OPEN) &&
957 !it_open_error(DISP_OPEN_OPEN, it)) {
958 it_set_disposition(it, DISP_ENQ_OPEN_REF);
959 ptlrpc_request_addref(request); /* balanced in ll_file_open */
960 /* BUG 11546 - eviction in the middle of open rpc processing */
961 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
964 if (it->it_op & IT_CREAT) {
965 /* XXX this belongs in ll_create_it */
966 } else if (it->it_op == IT_OPEN) {
967 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
969 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
972 /* If we already have a matching lock, then cancel the new
973 * one. We have to set the data here instead of in
974 * mdc_enqueue, because we need to use the child's inode as
975 * the l_ast_data to match, and that's not available until
976 * intent_finish has performed the iget().) */
977 lock = ldlm_handle2lock(lockh);
979 union ldlm_policy_data policy = lock->l_policy_data;
980 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the granted lock's resource must match the FID in the reply. */
982 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
983 &lock->l_resource->lr_name),
984 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
985 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
988 memcpy(&old_lock, lockh, sizeof(*lockh));
/* LCK_NL match here finds *any* granted lock on the resource; if one
 * exists, keep it and cancel the newly-acquired duplicate. */
989 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
990 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
991 ldlm_lock_decref_and_cancel(lockh,
992 it->d.lustre.it_lock_mode);
993 memcpy(lockh, &old_lock, sizeof(old_lock));
994 it->d.lustre.it_lock_handle = lockh->cookie;
997 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
998 (int)op_data->op_namelen, op_data->op_name,
999 ldlm_it2str(it->it_op), it->d.lustre.it_status,
1000 it->d.lustre.it_disposition, rc);
/*
 * mdc_revalidate_lock() - check whether a usable DLM lock already covers the
 * intent on @fid; fills the intent's lock handle/mode on success.
 * Prefers the handle stashed in the intent, else matches by inodebits policy
 * chosen from it_op.
 * NOTE(review): the switch-case labels (IT_GETATTR etc.), some policy bits
 * and the returns are elided from this excerpt.
 */
1004 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1005 struct lu_fid *fid, __u64 *bits)
1007 /* We could just return 1 immediately, but since we should only
1008 * be called in revalidate_it if we already have a lock, let's
1010 struct ldlm_res_id res_id;
1011 struct lustre_handle lockh;
1012 union ldlm_policy_data policy;
1013 enum ldlm_mode mode;
/* Fast path: the intent already carries a lock handle — revalidate it. */
1016 if (it->d.lustre.it_lock_handle) {
1017 lockh.cookie = it->d.lustre.it_lock_handle;
1018 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1020 fid_build_reg_res_name(fid, &res_id);
1021 switch (it->it_op) {
1023 /* File attributes are held under multiple bits:
1024 * nlink is under lookup lock, size and times are
1025 * under UPDATE lock and recently we've also got
1026 * a separate permissions lock for owner/group/acl that
1027 * were protected by lookup lock before.
1028 * Getattr must provide all of that information,
1029 * so we need to ensure we have all of those locks.
1030 * Unfortunately, if the bits are split across multiple
1031 * locks, there's no easy way to match all of them here,
1032 * so an extra RPC would be performed to fetch all
1033 * of those bits at once for now. */
1034 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1035 * but for old MDTs (< 2.4), permission is covered
1036 * by LOOKUP lock, so it needs to match all bits here.*/
1037 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1038 MDS_INODELOCK_LOOKUP |
1042 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1045 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1048 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any read/write mode already granted on the resource. */
1052 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1053 LDLM_IBITS, &policy,
1054 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* Found a lock: record it in the intent; otherwise clear the fields. */
1059 it->d.lustre.it_lock_handle = lockh.cookie;
1060 it->d.lustre.it_lock_mode = mode;
1062 it->d.lustre.it_lock_handle = 0;
1063 it->d.lustre.it_lock_mode = 0;
/* (Opening "/*" of this header comment is elided from this excerpt.)
1070 * This long block is all about fixing up the lock and request state
1071 * so that it is correct as of the moment _before_ the operation was
1072 * applied; that way, the VFS will think that everything is normal and
1073 * call Lustre's regular VFS methods.
1075 * If we're performing a creation, that means that unless the creation
1076 * failed with EEXIST, we should fake up a negative dentry.
1078 * For everything else, we want to lookup to succeed.
1080 * One additional note: if CREATE or OPEN succeeded, we add an extra
1081 * reference to the request because we need to keep it around until
1082 * ll_create/ll_open gets called.
1084 * The server will return to us, in it_disposition, an indication of
1085 * exactly what d.lustre.it_status refers to.
1087 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1088 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1089 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1090 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1093 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1096 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1097 struct lookup_intent *it, struct ptlrpc_request **reqp,
1098 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1100 struct ldlm_enqueue_info einfo = {
1101 .ei_type = LDLM_IBITS,
1102 .ei_mode = it_to_lock_mode(it),
1103 .ei_cb_bl = cb_blocking,
1104 .ei_cb_cp = ldlm_completion_ast,
1106 struct lustre_handle lockh;
1111 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1112 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1113 op_data->op_name, PFID(&op_data->op_fid2),
1114 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidate path: a sane child FID for LOOKUP/GETATTR/READDIR may be
 * satisfiable from an existing lock without a new RPC. */
1118 if (fid_is_sane(&op_data->op_fid2) &&
1119 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1120 /* We could just return 1 immediately, but since we should only
1121 * be called in revalidate_it if we already have a lock, let's
1123 it->d.lustre.it_lock_handle = 0;
1124 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1125 /* Only return failure if it was not GETATTR by cfid
1126 (from inode_revalidate) */
1127 if (rc || op_data->op_namelen != 0)
1131 /* For case if upper layer did not alloc fid, do it now. */
1132 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1133 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1135 CERROR("Can't alloc new fid, rc %d\n", rc);
/* Slow path: full enqueue, then interpret the intent result. */
1140 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1145 *reqp = it->d.lustre.it_data;
1146 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * mdc_intent_getattr_async_interpret() - completion callback for the async
 * getattr enqueue: release the rpc slot, finish the DLM enqueue, run the
 * same post-processing as the sync path, then invoke the caller's mi_cb and
 * free the heap-allocated einfo.
 * NOTE(review): the "it" assignment, error labels and final RETURN are
 * elided from this excerpt.
 */
1150 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1151 struct ptlrpc_request *req,
1154 struct mdc_getattr_args *ga = args;
1155 struct obd_export *exp = ga->ga_exp;
1156 struct md_enqueue_info *minfo = ga->ga_minfo;
1157 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1158 struct lookup_intent *it;
1159 struct lustre_handle *lockh;
1160 struct obd_device *obddev;
1161 struct ldlm_reply *lockrep;
1162 __u64 flags = LDLM_FL_HAS_INTENT;
1166 lockh = &minfo->mi_lockh;
1168 obddev = class_exp2obd(exp);
/* Balance the slot taken in mdc_intent_getattr_async(). */
1170 obd_put_request_slot(&obddev->u.cli);
1171 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1174 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1175 &flags, NULL, 0, lockh, rc);
1177 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1178 mdc_clear_replay_flag(req, rc);
1182 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1183 LASSERT(lockrep != NULL);
/* Wire status -> host errno, mirroring the sync enqueue path. */
1185 lockrep->lock_policy_res2 =
1186 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1188 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1192 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1196 OBD_FREE_PTR(einfo);
1197 minfo->mi_cb(req, minfo, rc);
/*
 * mdc_intent_getattr_async() - fire a getattr intent enqueue asynchronously
 * (used by statahead): pack the request, take an rpc slot, start the enqueue
 * with async==1, stash the context in rq_async_args and hand the request to
 * ptlrpcd.  Completion is handled by mdc_intent_getattr_async_interpret().
 * NOTE(review): the final RETURN/closing brace fall past the end of this
 * excerpt.
 */
1201 int mdc_intent_getattr_async(struct obd_export *exp,
1202 struct md_enqueue_info *minfo,
1203 struct ldlm_enqueue_info *einfo)
1205 struct md_op_data *op_data = &minfo->mi_data;
1206 struct lookup_intent *it = &minfo->mi_it;
1207 struct ptlrpc_request *req;
1208 struct mdc_getattr_args *ga;
1209 struct obd_device *obddev = class_exp2obd(exp);
1210 struct ldlm_res_id res_id;
1211 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1212 * for statahead currently. Consider CMD in future, such two bits
1213 * maybe managed by different MDS, should be adjusted then. */
1214 union ldlm_policy_data policy = {
1215 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1216 MDS_INODELOCK_UPDATE } };
1218 __u64 flags = LDLM_FL_HAS_INTENT;
1221 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1223 (int)op_data->op_namelen, op_data->op_name,
1224 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1226 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1227 req = mdc_intent_getattr_pack(exp, it, op_data);
1229 RETURN(PTR_ERR(req));
/* Slot is released by the interpret callback (or below on failure). */
1231 rc = obd_get_request_slot(&obddev->u.cli);
1233 ptlrpc_req_finished(req);
/* async == 1: ldlm_cli_enqueue returns without waiting for the reply. */
1237 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1238 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1240 obd_put_request_slot(&obddev->u.cli);
1241 ptlrpc_req_finished(req);
1245 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1246 ga = ptlrpc_req_async_args(req);
1248 ga->ga_minfo = minfo;
1249 ga->ga_einfo = einfo;
1251 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1252 ptlrpcd_add_req(req);