4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
47 #include "mdc_internal.h"
/* Per-request context for the asynchronous getattr intent.  It is stored in
 * the request's async-args area by mdc_intent_getattr_async() and read back
 * in mdc_intent_getattr_async_interpret(). */
49 struct mdc_getattr_args {
50 struct obd_export *ga_exp;
51 struct md_enqueue_info *ga_minfo;
/* Examine the disposition bits of the intent "it" relative to "phase".
 *
 * The bits are tested from the latest stage (DISP_OPEN_LEASE) back to the
 * earliest (DISP_IT_EXECD); for each bit that is set, "phase" is compared
 * against that stage.  The CERROR after the last test reports the raw
 * disposition and status of the intent.
 *
 * NOTE(review): the value returned for each stage is decided by statements
 * that follow the "phase >=" tests - confirm against the full function body.
 *
 * The symbol is exported for use by other modules. */
54 int it_open_error(int phase, struct lookup_intent *it)
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
90 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95 EXPORT_SYMBOL(it_open_error);
/* Record "data" (used here as a struct inode pointer) in the resource of the
 * lock named by "lockh", and report the lock's inodebits through "bits".
 *
 * The handle is first tested with lustre_handle_is_used().  The lock is then
 * looked up (asserted to exist) and the update is done between
 * lock_res_and_lock() and unlock_res_and_lock().  If the resource already
 * points at a different inode, that old inode is asserted to have I_FREEING
 * set before lr_lvb_inode is replaced.
 *
 * NOTE(review): whether "bits" may be NULL depends on a guard that is not
 * visible next to the "*bits =" store - confirm before passing NULL. */
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99 void *data, __u64 *bits)
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
108 if (!lustre_handle_is_used(lockh))
111 lock = ldlm_handle2lock(lockh);
113 LASSERT(lock != NULL);
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/* Look for an existing lock on the resource derived from "fid".
 *
 * The resource name is built with fid_build_reg_res_name() and the search is
 * delegated to ldlm_lock_match() on the export's namespace, passing flags,
 * type, mode and lockh through.
 *
 * Side effect: the inodebits in "policy" are masked in place with
 * exp_connect_ibits(exp), so the caller's policy may have bits cleared on
 * return. */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh, 0);
/* Build the resource name for "fid" and hand it, together with the caller's
 * policy, mode, flags and opaque pointer (all passed through unchanged), to
 * ldlm_cli_cancel_unused_resource() on the export's namespace. */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/* Reset the lr_lvb_inode pointer of the resource that corresponds to "fid".
 *
 * The export's namespace is asserted to be non-NULL.  The resource is
 * obtained with ldlm_resource_get(), its lr_lvb_inode is set to NULL, and
 * the resource is released again with ldlm_resource_putref().
 *
 * NOTE(review): the handling of a failed ldlm_resource_get() and any locking
 * around the store are not shown next to these statements - confirm. */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
188 ldlm_resource_putref(res);
/* Helper used on error paths so that a failed request is not kept for replay.
 *
 * If rq_replay is set, the request is updated while holding rq_lock
 * (NOTE(review): presumably rq_replay is cleared there, as the function name
 * says - confirm).  Independently, a non-zero "rc" combined with a non-zero
 * rq_transno is reported through DEBUG_REQ(D_ERROR). */
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 /* Don't hold error requests for replay. */
195 if (req->rq_replay) {
196 spin_lock(&req->rq_lock);
198 spin_unlock(&req->rq_lock);
200 if (rc && req->rq_transno != 0) {
201 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206 /* Save a large LOV EA into the request buffer so that it is available
207 * for replay. We don't do this in the initial request because the
208 * original request doesn't need this buffer (at most it sends just the
209 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210 * buffer and may also be difficult to allocate and save a very large
211 * request buffer for each open. (bug 5707)
213 * OOM here may cause recovery failure if lmm is needed (only for the
214 * original open if the MDS crashed just when this client also OOM'd)
215 * but this is incredibly unlikely, and questionable whether the client
216 * could do MDS recovery under OOM anyways... */
/* Parameters:
 *   req   - request whose client-side buffer receives the copy
 *   field - message field to resize and fill
 *   data  - source bytes
 *   size  - number of bytes to copy
 *
 * When the field currently holds fewer than "size" bytes the request buffer
 * is grown with sptlrpc_cli_enlarge_reqbuf() and a failure is logged with
 * CERROR; req_capsule_shrink() handles the other direction.  The field size
 * is then set to "size" and the data is copied in with memcpy(). */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218 const struct req_msg_field *field,
219 void *data, u32 size)
221 struct req_capsule *pill = &req->rq_pill;
225 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229 req->rq_export->exp_obd->obd_name,
234 req_capsule_shrink(pill, field, size, RCL_CLIENT);
237 req_capsule_set_size(pill, field, RCL_CLIENT, size);
238 lmm = req_capsule_client_get(pill, field);
240 memcpy(lmm, data, size);
/* Build the enqueue request for an open intent (RQF_LDLM_INTENT_OPEN).
 *
 * Steps visible here:
 *  - the create mode in the intent is forced to a regular file (S_IFREG);
 *  - if op_fid2 is sane, locks on the child are collected into "cancels"
 *    with mdc_resource_get_unused(), the choice depending on the lease,
 *    write/truncate and exec flags of the intent; for IT_CREAT, locks on the
 *    parent (op_fid1) are collected as well, using MDS_INODELOCK_UPDATE;
 *  - the request is allocated; on failure the collected locks are released
 *    with ldlm_lock_list_put() and ERR_PTR(-ENOMEM) is returned;
 *  - client-side sizes are set for the name (op_namelen + 1), the EA
 *    (the larger of the caller's data size and cl_default_mds_easize) and
 *    the security context name/value;
 *  - rq_replay is set from the import's imp_replayable under rq_lock;
 *  - the intent opcode and the open body are packed;
 *  - server-side reply sizes are reserved for RMF_MDT_MD (cl_max_mds_easize)
 *    and RMF_ACL (the import's ocd_max_easize). */
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247 struct md_op_data *op_data)
249 struct ptlrpc_request *req;
250 struct obd_device *obddev = class_exp2obd(exp);
251 struct ldlm_intent *lit;
252 const void *lmm = op_data->op_data;
253 __u32 lmmsize = op_data->op_data_size;
254 struct list_head cancels = LIST_HEAD_INIT(cancels);
260 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262 /* XXX: openlock is not cancelled for cross-refs. */
263 /* If inode is known, cancel conflicting OPEN locks. */
264 if (fid_is_sane(&op_data->op_fid2)) {
265 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266 if (it->it_flags & FMODE_WRITE)
271 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
274 else if (it->it_flags & FMODE_EXEC)
280 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
285 /* If CREATE, cancel parent's UPDATE lock. */
286 if (it->it_op & IT_CREAT)
290 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
292 MDS_INODELOCK_UPDATE);
294 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295 &RQF_LDLM_INTENT_OPEN);
297 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298 RETURN(ERR_PTR(-ENOMEM));
301 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302 op_data->op_namelen + 1);
303 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
306 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308 strlen(op_data->op_file_secctx_name) + 1 : 0);
310 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311 op_data->op_file_secctx_size);
313 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
315 ptlrpc_request_free(req);
319 spin_lock(&req->rq_lock);
320 req->rq_replay = req->rq_import->imp_replayable;
321 spin_unlock(&req->rq_lock);
323 /* pack the intent */
324 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325 lit->opc = (__u64)it->it_op;
327 /* pack the intended request */
328 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
331 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332 obddev->u.cli.cl_max_mds_easize);
333 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334 req->rq_import->imp_connect_data.ocd_max_easize);
335 ptlrpc_request_set_replen(req);
/* Build the enqueue request for a getxattr intent (RQF_LDLM_INTENT_GETXATTR).
 *
 * The intent opcode is always IT_GETXATTR.  The body is packed for op_fid1
 * with the import's ocd_max_easize as the size limit, and that same maximum
 * is reserved for each of the RMF_EADATA, RMF_EAVALS, RMF_EAVALS_LENS and
 * RMF_ACL fields of the reply.
 *
 * Returns ERR_PTR(-ENOMEM) when the request cannot be allocated; a failure
 * of ldlm_prep_enqueue_req() frees the request. */
339 static struct ptlrpc_request *
340 mdc_intent_getxattr_pack(struct obd_export *exp,
341 struct lookup_intent *it,
342 struct md_op_data *op_data)
344 struct ptlrpc_request *req;
345 struct ldlm_intent *lit;
348 struct list_head cancels = LIST_HEAD_INIT(cancels);
352 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
353 &RQF_LDLM_INTENT_GETXATTR);
355 RETURN(ERR_PTR(-ENOMEM));
357 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
359 ptlrpc_request_free(req);
363 /* pack the intent */
364 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
365 lit->opc = IT_GETXATTR;
367 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
369 /* pack the intended request */
370 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
373 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
374 RCL_SERVER, maxdata);
376 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
377 RCL_SERVER, maxdata);
379 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
380 RCL_SERVER, maxdata);
382 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, maxdata);
384 ptlrpc_request_set_replen(req);
/* Build the enqueue request for an unlink intent (RQF_LDLM_INTENT_UNLINK).
 *
 * The client-side name field is sized to op_namelen + 1, the intent opcode
 * is taken from it->it_op, the unlink body is packed with mdc_unlink_pack(),
 * and cl_default_mds_easize bytes are reserved for RMF_MDT_MD in the reply.
 *
 * Returns ERR_PTR(-ENOMEM) when the request cannot be allocated; a failure
 * of ldlm_prep_enqueue_req() frees the request. */
389 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
390 struct lookup_intent *it,
391 struct md_op_data *op_data)
393 struct ptlrpc_request *req;
394 struct obd_device *obddev = class_exp2obd(exp);
395 struct ldlm_intent *lit;
399 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
400 &RQF_LDLM_INTENT_UNLINK);
402 RETURN(ERR_PTR(-ENOMEM));
404 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
405 op_data->op_namelen + 1);
407 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
409 ptlrpc_request_free(req);
413 /* pack the intent */
414 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
415 lit->opc = (__u64)it->it_op;
417 /* pack the intended request */
418 mdc_unlink_pack(req, op_data);
420 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
421 obddev->u.cli.cl_default_mds_easize);
422 ptlrpc_request_set_replen(req);
/* Build the enqueue request for a getattr/lookup intent
 * (RQF_LDLM_INTENT_GETATTR).
 *
 * The request asks for the attribute groups listed in "valid" (attributes,
 * EA size, mode/EA size, directory EA, MEA and ACL).  The EA size reserved
 * for RMF_MDT_MD in the reply is cl_default_mds_easize when that is
 * positive; cl_max_mds_easize is the alternative.  RMF_ACL is sized to the
 * import's ocd_max_easize.
 *
 * Returns ERR_PTR(-ENOMEM) when the request cannot be allocated; a failure
 * of ldlm_prep_enqueue_req() frees the request. */
426 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
427 struct lookup_intent *it,
428 struct md_op_data *op_data)
430 struct ptlrpc_request *req;
431 struct obd_device *obddev = class_exp2obd(exp);
432 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
433 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
434 OBD_MD_MEA | OBD_MD_FLACL;
435 struct ldlm_intent *lit;
440 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
441 &RQF_LDLM_INTENT_GETATTR);
443 RETURN(ERR_PTR(-ENOMEM));
445 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
446 op_data->op_namelen + 1);
448 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
450 ptlrpc_request_free(req);
454 /* pack the intent */
455 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
456 lit->opc = (__u64)it->it_op;
458 if (obddev->u.cli.cl_default_mds_easize > 0)
459 easize = obddev->u.cli.cl_default_mds_easize;
461 easize = obddev->u.cli.cl_max_mds_easize;
463 /* pack the intended request */
464 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
466 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
467 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
468 req->rq_import->imp_connect_data.ocd_max_easize);
469 ptlrpc_request_set_replen(req);
/* Build the enqueue request for a layout intent (RQF_LDLM_INTENT_LAYOUT).
 *
 * The client-side RMF_EADATA field starts out empty (size 0).  The caller
 * must supply a struct layout_intent in op_data->op_data: the pointer and
 * the exact size are asserted, and the structure is copied verbatim into
 * the request.  cl_default_mds_easize bytes are reserved for RMF_DLM_LVB in
 * the reply.
 *
 * Returns ERR_PTR(-ENOMEM) when the request cannot be allocated; a failure
 * of ldlm_prep_enqueue_req() frees the request. */
473 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
474 struct lookup_intent *it,
475 struct md_op_data *op_data)
477 struct obd_device *obd = class_exp2obd(exp);
478 struct ptlrpc_request *req;
479 struct ldlm_intent *lit;
480 struct layout_intent *layout;
484 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
485 &RQF_LDLM_INTENT_LAYOUT);
487 RETURN(ERR_PTR(-ENOMEM));
489 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
490 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
492 ptlrpc_request_free(req);
496 /* pack the intent */
497 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
498 lit->opc = (__u64)it->it_op;
500 /* pack the layout intent request */
501 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
502 LASSERT(op_data->op_data != NULL);
503 LASSERT(op_data->op_data_size == sizeof(*layout));
504 memcpy(layout, op_data->op_data, sizeof(*layout));
506 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
507 obd->u.cli.cl_default_mds_easize);
508 ptlrpc_request_set_replen(req);
/* Build a plain RQF_LDLM_ENQUEUE request without an intent body, reserving
 * "lvb_len" bytes for RMF_DLM_LVB in the reply.
 *
 * Returns ERR_PTR(-ENOMEM) when the request cannot be allocated; a failure
 * of ldlm_prep_enqueue_req() frees the request. */
512 static struct ptlrpc_request *
513 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
515 struct ptlrpc_request *req;
519 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
521 RETURN(ERR_PTR(-ENOMEM));
523 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
525 ptlrpc_request_free(req);
529 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
530 ptlrpc_request_set_replen(req);
/* Post-process an enqueue reply and publish the result in the intent "it".
 *
 * Steps visible in the body, in order:
 *  - a request that carries a transno or is marked for replay gets
 *    LDLM_FL_INTENT_ONLY or-ed into the lock flags of its saved DLM request,
 *    so that a replay performs the intent without taking a lock;
 *  - for ELDLM_LOCK_ABORTED the handle in "lockh" is zeroed; otherwise, when
 *    the lock's l_req_mode differs from einfo->ei_mode, a reference is taken
 *    in the new mode, the old-mode reference is dropped and ei_mode is
 *    updated;
 *  - disposition and status from the DLM reply, the lock mode, the lock
 *    cookie and the request pointer are stored in the intent;
 *  - mdc_clear_replay_flag() is called for replayable requests that have no
 *    transno or a negative status, and for IT_OPEN requests that did not end
 *    in a successful open;
 *  - when the intent has a reply body: a successful open registers replay
 *    data via mdc_set_open_replay_data(); if the body flags an EA as valid,
 *    the EA is located in RMF_MDT_MD and, for a replayable IT_OPEN, copied
 *    back into the request with mdc_save_lovea();
 *  - for IT_LAYOUT without a reply body the data comes from RMF_DLM_LVB and
 *    is likewise saved into RMF_EADATA of the request;
 *  - finally, if the lock is a layout lock (ldlm_has_layout()), layout data
 *    was found and the reply carries no LDLM_FL_BLOCKED_MASK flag, a private
 *    copy of the data is installed as l_lvb_data under the resource lock,
 *    provided the lock does not already have one.
 *
 * NOTE(review): the return values and the release of the lock reference
 * obtained through ldlm_handle2lock() are decided by statements that are not
 * adjacent to the ones listed above - confirm against the full body. */
534 static int mdc_finish_enqueue(struct obd_export *exp,
535 struct ptlrpc_request *req,
536 struct ldlm_enqueue_info *einfo,
537 struct lookup_intent *it,
538 struct lustre_handle *lockh,
541 struct req_capsule *pill = &req->rq_pill;
542 struct ldlm_request *lockreq;
543 struct ldlm_reply *lockrep;
544 struct ldlm_lock *lock;
545 void *lvb_data = NULL;
550 /* Similarly, if we're going to replay this request, we don't want to
551 * actually get a lock, just perform the intent. */
552 if (req->rq_transno || req->rq_replay) {
553 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
554 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
557 if (rc == ELDLM_LOCK_ABORTED) {
559 memset(lockh, 0, sizeof(*lockh));
561 } else { /* rc = 0 */
562 lock = ldlm_handle2lock(lockh);
563 LASSERT(lock != NULL);
565 /* If the server gave us back a different lock mode, we should
566 * fix up our variables. */
567 if (lock->l_req_mode != einfo->ei_mode) {
568 ldlm_lock_addref(lockh, lock->l_req_mode);
569 ldlm_lock_decref(lockh, einfo->ei_mode);
570 einfo->ei_mode = lock->l_req_mode;
575 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
576 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
578 it->it_disposition = (int)lockrep->lock_policy_res1;
579 it->it_status = (int)lockrep->lock_policy_res2;
580 it->it_lock_mode = einfo->ei_mode;
581 it->it_lock_handle = lockh->cookie;
582 it->it_request = req;
584 /* Technically speaking rq_transno must already be zero if
585 * it_status is in error, so the check is a bit redundant */
586 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
587 mdc_clear_replay_flag(req, it->it_status);
589 /* If we're doing an IT_OPEN which did not result in an actual
590 * successful open, then we need to remove the bit which saves
591 * this request for unconditional replay.
593 * It's important that we do this first! Otherwise we might exit the
594 * function without doing so, and try to replay a failed create
596 if (it->it_op & IT_OPEN && req->rq_replay &&
597 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
598 mdc_clear_replay_flag(req, it->it_status);
600 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
601 it->it_op, it->it_disposition, it->it_status);
603 /* We know what to expect, so we do any byte flipping required here */
604 if (it_has_reply_body(it)) {
605 struct mdt_body *body;
607 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
609 CERROR ("Can't swab mdt_body\n");
613 if (it_disposition(it, DISP_OPEN_OPEN) &&
614 !it_open_error(DISP_OPEN_OPEN, it)) {
616 * If this is a successful OPEN request, we need to set
617 * replay handler and data early, so that if replay
618 * happens immediately after swabbing below, new reply
619 * is swabbed by that handler correctly.
621 mdc_set_open_replay_data(NULL, NULL, it);
624 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
627 mdc_update_max_ea_from_body(exp, body);
630 * The eadata is opaque; just check that it is there.
631 * Eventually, obd_unpackmd() will check the contents.
633 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
634 body->mbo_eadatasize);
638 /* save lvb data and length in case this is for layout
641 lvb_len = body->mbo_eadatasize;
644 * We save the reply LOV EA in case we have to replay a
645 * create for recovery. If we didn't allocate a large
646 * enough request buffer above we need to reallocate it
647 * here to hold the actual LOV EA.
649 * To not save LOV EA if request is not going to replay
650 * (for example error one).
652 if ((it->it_op & IT_OPEN) && req->rq_replay) {
653 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
654 body->mbo_eadatasize);
656 body->mbo_valid &= ~OBD_MD_FLEASIZE;
657 body->mbo_eadatasize = 0;
662 } else if (it->it_op & IT_LAYOUT) {
663 /* maybe the lock was granted right away and layout
664 * is packed into RMF_DLM_LVB of req */
665 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
667 lvb_data = req_capsule_server_sized_get(pill,
668 &RMF_DLM_LVB, lvb_len);
669 if (lvb_data == NULL)
673 * save replied layout data to the request buffer for
674 * recovery consideration (lest MDS reinitialize
675 * another set of OST objects).
678 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
683 /* fill in stripe data for layout lock.
684 * LU-6581: trust layout data only if layout lock is granted. The MDT
685 * has stopped sending layout unless the layout lock is granted. The
686 * client still does this checking in case it's talking with an old
687 * server. - Jinshan */
688 lock = ldlm_handle2lock(lockh);
689 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
690 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
693 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
694 ldlm_it2str(it->it_op), lvb_len);
696 OBD_ALLOC_LARGE(lmm, lvb_len);
701 memcpy(lmm, lvb_data, lvb_len);
703 /* install lvb_data */
704 lock_res_and_lock(lock);
705 if (lock->l_lvb_data == NULL) {
706 lock->l_lvb_type = LVB_T_LAYOUT;
707 lock->l_lvb_data = lmm;
708 lock->l_lvb_len = lvb_len;
711 unlock_res_and_lock(lock);
713 OBD_FREE_LARGE(lmm, lvb_len);
/* Common enqueue path for intent and non-intent (flock) lock requests.
 *
 * Outline of the visible flow:
 *  - the resource name is built from op_data->op_fid1;
 *  - with an intent, the caller must not pass a policy (asserted); the
 *    inodebits policy is chosen from the intent opcode (UPDATE for
 *    unlink/getattr/readdir, LAYOUT, XATTR, otherwise LOOKUP) and
 *    LDLM_FL_HAS_INTENT is added to the flags;
 *  - the request is packed by the helper matching the intent opcode; the
 *    non-intent case is asserted to be LDLM_FLOCK and tags res_id.name[3]
 *    with LDLM_FLOCK;
 *  - the import generation sampled before packing is stamped on the request
 *    and rq_sent is set to the current time plus "resends" seconds;
 *  - a modify-RPC slot and a request slot are taken before
 *    ldlm_cli_enqueue() and released afterwards; if the request slot cannot
 *    be obtained the request is dropped;
 *  - a failed enqueue is logged, its replay flag cleared and the request
 *    finished;
 *  - the intent status in the DLM reply is converted with
 *    ptlrpc_status_ntoh(); an -EINPROGRESS status causes the request to be
 *    dropped and resent as long as the import generation has not changed;
 *  - mdc_finish_enqueue() post-processes the reply; if it fails, the lock
 *    reference is dropped, the handle zeroed, the request finished and the
 *    lock/request fields of the intent reset. */
721 /* We always reserve enough space in the reply packet for a stripe MD, because
722 * we don't know in advance the file type. */
723 static int mdc_enqueue_base(struct obd_export *exp,
724 struct ldlm_enqueue_info *einfo,
725 const union ldlm_policy_data *policy,
726 struct lookup_intent *it,
727 struct md_op_data *op_data,
728 struct lustre_handle *lockh,
729 __u64 extra_lock_flags)
731 struct obd_device *obddev = class_exp2obd(exp);
732 struct ptlrpc_request *req = NULL;
733 __u64 flags, saved_flags = extra_lock_flags;
734 struct ldlm_res_id res_id;
735 static const union ldlm_policy_data lookup_policy = {
736 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
737 static const union ldlm_policy_data update_policy = {
738 .l_inodebits = { MDS_INODELOCK_UPDATE } };
739 static const union ldlm_policy_data layout_policy = {
740 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
741 static const union ldlm_policy_data getxattr_policy = {
742 .l_inodebits = { MDS_INODELOCK_XATTR } };
743 int generation, resends = 0;
744 struct ldlm_reply *lockrep;
745 enum lvb_type lvb_type = 0;
749 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
751 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
754 LASSERT(policy == NULL);
756 saved_flags |= LDLM_FL_HAS_INTENT;
757 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
758 policy = &update_policy;
759 else if (it->it_op & IT_LAYOUT)
760 policy = &layout_policy;
761 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
762 policy = &getxattr_policy;
764 policy = &lookup_policy;
767 generation = obddev->u.cli.cl_import->imp_generation;
771 /* The only way right now is FLOCK. */
772 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
774 res_id.name[3] = LDLM_FLOCK;
775 } else if (it->it_op & IT_OPEN) {
776 req = mdc_intent_open_pack(exp, it, op_data);
777 } else if (it->it_op & IT_UNLINK) {
778 req = mdc_intent_unlink_pack(exp, it, op_data);
779 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
780 req = mdc_intent_getattr_pack(exp, it, op_data);
781 } else if (it->it_op & IT_READDIR) {
782 req = mdc_enqueue_pack(exp, 0);
783 } else if (it->it_op & IT_LAYOUT) {
784 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
786 req = mdc_intent_layout_pack(exp, it, op_data);
787 lvb_type = LVB_T_LAYOUT;
788 } else if (it->it_op & IT_GETXATTR) {
789 req = mdc_intent_getxattr_pack(exp, it, op_data);
796 RETURN(PTR_ERR(req));
799 req->rq_generation_set = 1;
800 req->rq_import_generation = generation;
801 req->rq_sent = ktime_get_real_seconds() + resends;
804 /* It is important to obtain modify RPC slot first (if applicable), so
805 * that threads that are waiting for a modify RPC slot are not polluting
806 * our rpcs in flight counter.
807 * We do not do flock request limiting, though */
809 mdc_get_mod_rpc_slot(req, it);
810 rc = obd_get_request_slot(&obddev->u.cli);
812 mdc_put_mod_rpc_slot(req, it);
813 mdc_clear_replay_flag(req, 0);
814 ptlrpc_req_finished(req);
819 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
820 0, lvb_type, lockh, 0);
822 /* For flock requests we immediately return without further
823 delay and let caller deal with the rest, since rest of
824 this function metadata processing makes no sense for flock
825 requests anyway. But in case of problem during comms with
826 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
827 can not rely on caller and this mainly for F_UNLCKs
828 (explicits or automatically generated by Kernel to clean
829 current FLocks upon exit) that can't be trashed */
830 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
831 (einfo->ei_type == LDLM_FLOCK) &&
832 (einfo->ei_mode == LCK_NL))
837 obd_put_request_slot(&obddev->u.cli);
838 mdc_put_mod_rpc_slot(req, it);
841 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
842 obddev->obd_name, rc);
844 mdc_clear_replay_flag(req, rc);
845 ptlrpc_req_finished(req);
849 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
850 LASSERT(lockrep != NULL);
852 lockrep->lock_policy_res2 =
853 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
855 /* Retry infinitely when the server returns -EINPROGRESS for the
856 * intent operation, when server returns -EINPROGRESS for acquiring
857 * intent lock, we'll retry in after_reply(). */
858 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
859 mdc_clear_replay_flag(req, rc);
860 ptlrpc_req_finished(req);
863 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
864 obddev->obd_name, resends, it->it_op,
865 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
867 if (generation == obddev->u.cli.cl_import->imp_generation) {
870 CDEBUG(D_HA, "resend cross eviction\n");
875 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
877 if (lustre_handle_is_used(lockh)) {
878 ldlm_lock_decref(lockh, einfo->ei_mode);
879 memset(lockh, 0, sizeof(*lockh));
881 ptlrpc_req_finished(req);
883 it->it_lock_handle = 0;
884 it->it_lock_mode = 0;
885 it->it_request = NULL;
/* Public entry point for enqueues without an intent: forwards every argument
 * to mdc_enqueue_base() and passes NULL for the intent. */
891 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
892 const union ldlm_policy_data *policy,
893 struct md_op_data *op_data,
894 struct lustre_handle *lockh, __u64 extra_lock_flags)
896 return mdc_enqueue_base(exp, einfo, policy, NULL,
897 op_data, lockh, extra_lock_flags);
/* Finish an intent lock once the reply has been unpacked into "it".
 *
 * Visible behaviour:
 *  - the request pointer and its reply message are sanity-checked against
 *    NULL / LP_POISON;
 *  - IT_READDIR, and IT_GETXATTR / IT_LAYOUT, are handled up front; for the
 *    latter two a non-zero it_status becomes the result;
 *  - when DISP_IT_EXECD is not set the intent was never executed, it_status
 *    is asserted to be non-zero and becomes the result;
 *  - it_open_error() is consulted for the DISP_IT_EXECD and
 *    DISP_LOOKUP_EXECD stages;
 *  - one extra request reference is taken for a successful create and one
 *    for a successful open; DISP_ENQ_CREATE_REF / DISP_ENQ_OPEN_REF record
 *    that the reference has been taken so it is not taken twice;
 *  - the lock behind "lockh" is looked up and, when the intent has a reply
 *    body, the fid in the body is asserted to match the lock's resource
 *    name.  ldlm_lock_match() is then used to search for another lock with
 *    the same policy; if one is found the new lock is released with
 *    ldlm_lock_decref_and_cancel() and "lockh" / it->it_lock_handle are
 *    switched to the lock that was found;
 *  - a D_DENTRY debug line summarises name, intent, status, disposition and
 *    rc. */
900 static int mdc_finish_intent_lock(struct obd_export *exp,
901 struct ptlrpc_request *request,
902 struct md_op_data *op_data,
903 struct lookup_intent *it,
904 struct lustre_handle *lockh)
906 struct lustre_handle old_lock;
907 struct ldlm_lock *lock;
911 LASSERT(request != NULL);
912 LASSERT(request != LP_POISON);
913 LASSERT(request->rq_repmsg != LP_POISON);
915 if (it->it_op & IT_READDIR)
918 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
919 if (it->it_status != 0)
920 GOTO(out, rc = it->it_status);
922 if (!it_disposition(it, DISP_IT_EXECD)) {
923 /* The server failed before it even started executing
924 * the intent, i.e. because it couldn't unpack the
927 LASSERT(it->it_status != 0);
928 GOTO(out, rc = it->it_status);
930 rc = it_open_error(DISP_IT_EXECD, it);
934 rc = it_open_error(DISP_LOOKUP_EXECD, it);
938 /* keep requests around for the multiple phases of the call
939 * this shows the DISP_XX must guarantee we make it into the
942 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
943 it_disposition(it, DISP_OPEN_CREATE) &&
944 !it_open_error(DISP_OPEN_CREATE, it)) {
945 it_set_disposition(it, DISP_ENQ_CREATE_REF);
946 /* balanced in ll_create_node */
947 ptlrpc_request_addref(request);
949 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
950 it_disposition(it, DISP_OPEN_OPEN) &&
951 !it_open_error(DISP_OPEN_OPEN, it)) {
952 it_set_disposition(it, DISP_ENQ_OPEN_REF);
953 /* balanced in ll_file_open */
954 ptlrpc_request_addref(request);
955 /* BUG 11546 - eviction in the middle of open rpc
958 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
962 if (it->it_op & IT_CREAT) {
963 /* XXX this belongs in ll_create_it */
964 } else if (it->it_op == IT_OPEN) {
965 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
967 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
971 /* If we already have a matching lock, then cancel the new
972 * one. We have to set the data here instead of in
973 * mdc_enqueue, because we need to use the child's inode as
974 * the l_ast_data to match, and that's not available until
975 * intent_finish has performed the iget().) */
976 lock = ldlm_handle2lock(lockh);
978 union ldlm_policy_data policy = lock->l_policy_data;
979 LDLM_DEBUG(lock, "matching against this");
981 if (it_has_reply_body(it)) {
982 struct mdt_body *body;
984 body = req_capsule_server_get(&request->rq_pill,
986 /* mdc_enqueue checked */
987 LASSERT(body != NULL);
988 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
989 &lock->l_resource->lr_name),
990 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
991 PLDLMRES(lock->l_resource),
992 PFID(&body->mbo_fid1));
996 memcpy(&old_lock, lockh, sizeof(*lockh));
997 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
998 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
999 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1000 memcpy(lockh, &old_lock, sizeof(old_lock));
1001 it->it_lock_handle = lockh->cookie;
1007 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1008 (int)op_data->op_namelen, op_data->op_name,
1009 ldlm_it2str(it->it_op), it->it_status,
1010 it->it_disposition, rc);
/* Check whether a usable lock already exists for "fid" on behalf of "it".
 *
 * If the intent already carries a lock handle, that handle is revalidated
 * with ldlm_revalidate_lock_handle(), which also receives "bits".
 * Otherwise the resource name is built from "fid", the inodebits to look for
 * are selected according to it->it_op, and mdc_lock_match() searches for a
 * granted lock in any of the CR, CW, PR or PW modes.
 *
 * The outcome is stored in the intent: it_lock_handle / it_lock_mode are set
 * to the lock that was found, or both reset to 0.
 *
 * NOTE(review): the mapping from it_op values to bit sets and the function's
 * return value are determined by case labels and a return statement that
 * are not adjacent to the assignments below - confirm against the full
 * body. */
1014 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1015 struct lu_fid *fid, __u64 *bits)
1017 /* We could just return 1 immediately, but since we should only
1018 * be called in revalidate_it if we already have a lock, let's
1020 struct ldlm_res_id res_id;
1021 struct lustre_handle lockh;
1022 union ldlm_policy_data policy;
1023 enum ldlm_mode mode;
1026 if (it->it_lock_handle) {
1027 lockh.cookie = it->it_lock_handle;
1028 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1030 fid_build_reg_res_name(fid, &res_id);
1031 switch (it->it_op) {
1033 /* File attributes are held under multiple bits:
1034 * nlink is under lookup lock, size and times are
1035 * under UPDATE lock and recently we've also got
1036 * a separate permissions lock for owner/group/acl that
1037 * were protected by lookup lock before.
1038 * Getattr must provide all of that information,
1039 * so we need to ensure we have all of those locks.
1040 * Unfortunately, if the bits are split across multiple
1041 * locks, there's no easy way to match all of them here,
1042 * so an extra RPC would be performed to fetch all
1043 * of those bits at once for now. */
1044 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1045 * but for old MDTs (< 2.4), permission is covered
1046 * by LOOKUP lock, so it needs to match all bits here.*/
1047 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1048 MDS_INODELOCK_LOOKUP |
1052 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1055 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1058 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1062 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1063 LDLM_IBITS, &policy,
1064 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1069 it->it_lock_handle = lockh.cookie;
1070 it->it_lock_mode = mode;
1072 it->it_lock_handle = 0;
1073 it->it_lock_mode = 0;
1080 * This long block is all about fixing up the lock and request state
1081 * so that it is correct as of the moment _before_ the operation was
1082 * applied; that way, the VFS will think that everything is normal and
1083 * call Lustre's regular VFS methods.
1085 * If we're performing a creation, that means that unless the creation
1086 * failed with EEXIST, we should fake up a negative dentry.
1088 * For everything else, we want to lookup to succeed.
1090 * One additional note: if CREATE or OPEN succeeded, we add an extra
1091 * reference to the request because we need to keep it around until
1092 * ll_create/ll_open gets called.
1094 * The server will return to us, in it_disposition, an indication of
1095 * exactly what it_status refers to.
1097 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1098 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1099 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1100 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
1103 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
 * child lookup.
/* Entry point for taking an intent lock.
 *
 * Parameters:
 *   exp              - export to send the request on
 *   op_data          - operation data; op_fid1/op_fid2 and the name are used
 *   it               - the intent; updated with the result
 *   reqp             - receives it->it_request after the enqueue
 *   cb_blocking      - blocking callback installed in the enqueue info
 *   extra_lock_flags - NOTE(review): presumably forwarded to
 *                      mdc_enqueue_base() as the extra lock flags - confirm
 *
 * Flow visible here: the enqueue info is set up for an LDLM_IBITS lock in
 * the mode given by it_to_lock_mode(), with ldlm_completion_ast as the
 * completion callback.  For LOOKUP/GETATTR/READDIR intents on a sane
 * op_fid2, an existing lock is looked for first with mdc_revalidate_lock().
 * For IT_CREAT without a sane op_fid2 a new fid is allocated with
 * mdc_fid_alloc() (failure is logged).  The lock is then requested through
 * mdc_enqueue_base() and the result finished by mdc_finish_intent_lock(). */
1106 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1107 struct lookup_intent *it, struct ptlrpc_request **reqp,
1108 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1110 struct ldlm_enqueue_info einfo = {
1111 .ei_type = LDLM_IBITS,
1112 .ei_mode = it_to_lock_mode(it),
1113 .ei_cb_bl = cb_blocking,
1114 .ei_cb_cp = ldlm_completion_ast,
1116 struct lustre_handle lockh;
1121 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1122 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1123 op_data->op_name, PFID(&op_data->op_fid2),
1124 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1128 if (fid_is_sane(&op_data->op_fid2) &&
1129 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1130 /* We could just return 1 immediately, but since we should only
1131 * be called in revalidate_it if we already have a lock, let's
1133 it->it_lock_handle = 0;
1134 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1135 /* Only return failure if it was not GETATTR by cfid
1136 (from inode_revalidate) */
1137 if (rc || op_data->op_namelen != 0)
1141 /* For case if upper layer did not alloc fid, do it now. */
1142 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1143 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1145 CERROR("Can't alloc new fid, rc %d\n", rc);
1150 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1155 *reqp = it->it_request;
1156 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply callback for requests queued by mdc_intent_getattr_async().
 *
 * "args" is the struct mdc_getattr_args stored with the request.  The
 * callback releases the request slot, completes the enqueue with
 * ldlm_cli_enqueue_fini() (a failure is logged and the replay flag
 * cleared), converts the intent status in the DLM reply with
 * ptlrpc_status_ntoh(), and then runs mdc_finish_enqueue() followed by
 * mdc_finish_intent_lock().  The caller's mi_cb callback is invoked with
 * the request, the enqueue info and the resulting rc.
 *
 * OBD_FAIL_MDC_GETATTR_ENQUEUE is a fault-injection point checked before
 * the enqueue is completed. */
1160 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1161 struct ptlrpc_request *req,
1164 struct mdc_getattr_args *ga = args;
1165 struct obd_export *exp = ga->ga_exp;
1166 struct md_enqueue_info *minfo = ga->ga_minfo;
1167 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1168 struct lookup_intent *it;
1169 struct lustre_handle *lockh;
1170 struct obd_device *obddev;
1171 struct ldlm_reply *lockrep;
1172 __u64 flags = LDLM_FL_HAS_INTENT;
1176 lockh = &minfo->mi_lockh;
1178 obddev = class_exp2obd(exp);
1180 obd_put_request_slot(&obddev->u.cli);
1181 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1184 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1185 &flags, NULL, 0, lockh, rc);
1187 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1188 mdc_clear_replay_flag(req, rc);
1192 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1193 LASSERT(lockrep != NULL);
1195 lockrep->lock_policy_res2 =
1196 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1198 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1202 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1206 minfo->mi_cb(req, minfo, rc);
/* Queue a getattr intent whose reply is handled by a callback.
 *
 * The operation data and intent are taken from "minfo".  The request is
 * packed with mdc_intent_getattr_pack(), a request slot is taken, and the
 * lock is requested with ldlm_cli_enqueue() using a LOOKUP | UPDATE
 * inodebits policy (NOTE(review): the final argument is 1 here and 0 in
 * mdc_enqueue_base(); presumably it selects asynchronous operation -
 * confirm).  If either the slot or the enqueue fails, the request is
 * finished (and, for the enqueue, the slot released).
 *
 * On success the enqueue info is stored in the request's async-args area
 * (its size is checked at compile time), the interpret callback is set to
 * mdc_intent_getattr_async_interpret() and the request is handed to
 * ptlrpcd_add_req(). */
1210 int mdc_intent_getattr_async(struct obd_export *exp,
1211 struct md_enqueue_info *minfo)
1213 struct md_op_data *op_data = &minfo->mi_data;
1214 struct lookup_intent *it = &minfo->mi_it;
1215 struct ptlrpc_request *req;
1216 struct mdc_getattr_args *ga;
1217 struct obd_device *obddev = class_exp2obd(exp);
1218 struct ldlm_res_id res_id;
1219 union ldlm_policy_data policy = {
1220 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1221 MDS_INODELOCK_UPDATE } };
1223 __u64 flags = LDLM_FL_HAS_INTENT;
1226 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1227 (int)op_data->op_namelen, op_data->op_name,
1228 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1230 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1231 req = mdc_intent_getattr_pack(exp, it, op_data);
1233 RETURN(PTR_ERR(req));
1235 rc = obd_get_request_slot(&obddev->u.cli);
1237 ptlrpc_req_finished(req);
1241 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1242 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1244 obd_put_request_slot(&obddev->u.cli);
1245 ptlrpc_req_finished(req);
1249 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1250 ga = ptlrpc_req_async_args(req);
1252 ga->ga_minfo = minfo;
1254 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1255 ptlrpcd_add_req(req);