4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
47 #include "mdc_internal.h"
/*
 * Arguments of an asynchronous getattr intent request.  They are kept in
 * the request's async args (see mdc_intent_getattr_async()) and read back by
 * mdc_intent_getattr_async_interpret() when the reply is processed.
 */
49 struct mdc_getattr_args {
50 struct obd_export *ga_exp;	/* export the interpret callback works on */
51 struct md_enqueue_info *ga_minfo;	/* enqueue info: intent, lock handle, callback */
/*
 * Evaluate the status of an open intent relative to a given phase.
 *
 * The disposition bits of @it are tested from the latest phase
 * (DISP_OPEN_LEASE) down to the earliest one (DISP_IT_EXECD); the first bit
 * that is set is compared against @phase.  If none of the bits is set, the
 * disposition and the status are reported with CERROR().
 *
 * phase  DISP_* phase the caller is asking about
 * it     intent whose it_disposition / it_status are examined
 *
 * NOTE(review): presumably returns it->it_status once @phase has reached the
 * disposition that was found and 0 otherwise -- confirm against the return
 * statements of each branch.
 */
54 int it_open_error(int phase, struct lookup_intent *it)
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
90 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95 EXPORT_SYMBOL(it_open_error);
/*
 * Attach an inode to the resource of the lock referenced by @lockh.
 *
 * @data is stored as the resource's lr_lvb_inode while the resource and the
 * lock are locked (lock_res_and_lock()).  If the resource already points at
 * a different inode, that inode is asserted to be in I_FREEING state before
 * it is replaced.  The inodebits of the lock are reported through @bits.
 *
 * lockh  handle of the lock; tested with lustre_handle_is_used() first
 * data   inode to install (used as a struct inode *)
 * bits   receives lock->l_policy_data.l_inodebits.bits
 *        NOTE(review): presumably optional (may be NULL) -- confirm
 */
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99 void *data, __u64 *bits)
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
108 if (!lustre_handle_is_used(lockh))
111 lock = ldlm_handle2lock(lockh);
113 LASSERT(lock != NULL);
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/*
 * Look for an already held lock on the resource named by @fid.
 *
 * The resource name is built from @fid, the inodebits requested in @policy
 * are restricted to those supported by the export (exp_connect_ibits()), and
 * the search itself is delegated to ldlm_lock_match() on the namespace of
 * the export's obd device.  Note that @policy is modified in place by the
 * masking.
 *
 * NOTE(review): presumably returns the value of ldlm_lock_match() (the mode
 * of the matched lock) -- confirm against the return statement.
 */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named by @fid.
 *
 * The resource name is built from @fid and the work is delegated to
 * ldlm_cli_cancel_unused_resource() on the namespace of the export's obd
 * device; @policy, @mode, @flags and @opaque are passed through unchanged.
 *
 * NOTE(review): presumably returns the result of that call -- confirm.
 */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/*
 * Clear the inode pointer (lr_lvb_inode) kept on the LDLM resource that
 * corresponds to @fid in the namespace of @exp.
 *
 * The namespace is asserted to exist.  The resource is obtained with
 * ldlm_resource_get() and the reference is dropped again with
 * ldlm_resource_putref() once the pointer has been reset.
 *
 * NOTE(review): the handling of a failed resource lookup and the locking
 * around the store should be confirmed.
 */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
188 ldlm_resource_putref(res);
/*
 * Stop holding @req for replay.
 *
 * The update is made under req->rq_lock and only when rq_replay is
 * currently set.  A request that failed (@rc != 0) but nevertheless carries
 * a transno is reported through DEBUG_REQ(D_ERROR).
 *
 * NOTE(review): going by the function name the locked section clears
 * rq_replay -- confirm the assignment made there.
 */
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 /* Don't hold error requests for replay. */
195 if (req->rq_replay) {
196 spin_lock(&req->rq_lock);
198 spin_unlock(&req->rq_lock);
200 if (rc && req->rq_transno != 0) {
201 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206 /* Save a large LOV EA into the request buffer so that it is available
207 * for replay. We don't do this in the initial request because the
208 * original request doesn't need this buffer (at most it sends just the
209 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210 * buffer and may also be difficult to allocate and save a very large
211 * request buffer for each open. (bug 5707)
213 * OOM here may cause recovery failure if lmm is needed (only for the
214 * original open if the MDS crashed just when this client also OOM'd)
215 * but this is incredibly unlikely, and questionable whether the client
216 * could do MDS recovery under OOM anyways... */
/*
 * req    request whose client-side buffer @field receives the copy
 * field  capsule field to fill; its client-side size is set to @size
 * data   source buffer
 * size   number of bytes copied from @data
 *
 * When the field is currently smaller than @size the request buffer is
 * enlarged with sptlrpc_cli_enlarge_reqbuf(), and a failure of that call is
 * logged with the obd name.  req_capsule_shrink() is used on the other path
 * (NOTE(review): presumably when the field is at least @size bytes already
 * -- confirm the branch structure and the returned value).
 */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218 const struct req_msg_field *field,
219 void *data, u32 size)
221 struct req_capsule *pill = &req->rq_pill;
225 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229 req->rq_export->exp_obd->obd_name,
234 req_capsule_shrink(pill, field, size, RCL_CLIENT);
237 req_capsule_set_size(pill, field, RCL_CLIENT, size);
238 lmm = req_capsule_client_get(pill, field);
240 memcpy(lmm, data, size);
/*
 * Build an LDLM intent request for an open (RQF_LDLM_INTENT_OPEN).
 *
 * Before the request is allocated, unused locks the open would conflict
 * with are collected on a local cancel list: locks on the child (op_fid2)
 * when that FID is sane, and -- for IT_CREAT -- locks on the parent
 * (op_fid1) covering MDS_INODELOCK_UPDATE.  The list is handed over to
 * ldlm_prep_enqueue_req(), or released with ldlm_lock_list_put() when the
 * request cannot be allocated.
 *
 * The create mode stored in @it is forced to S_IFREG, and the request is
 * marked for replay when its import is replayable.
 *
 * Returns the packed request; ERR_PTR(-ENOMEM) if the allocation fails.
 * NOTE(review): the value returned when ldlm_prep_enqueue_req() fails should
 * be confirmed.
 */
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247 struct md_op_data *op_data)
249 struct ptlrpc_request *req;
250 struct obd_device *obddev = class_exp2obd(exp);
251 struct ldlm_intent *lit;
252 const void *lmm = op_data->op_data;
253 __u32 lmmsize = op_data->op_data_size;
254 struct list_head cancels = LIST_HEAD_INIT(cancels);
260 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262 /* XXX: openlock is not cancelled for cross-refs. */
263 /* If inode is known, cancel conflicting OPEN locks. */
264 if (fid_is_sane(&op_data->op_fid2)) {
265 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266 if (it->it_flags & FMODE_WRITE)
271 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
274 else if (it->it_flags & FMODE_EXEC)
280 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
285 /* If CREATE, cancel parent's UPDATE lock. */
286 if (it->it_op & IT_CREAT)
290 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
292 MDS_INODELOCK_UPDATE);
294 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295 &RQF_LDLM_INTENT_OPEN);
297 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298 RETURN(ERR_PTR(-ENOMEM));
	/* size the variable-length client buffers: name, EA data, security context */
301 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302 op_data->op_namelen + 1);
303 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
306 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308 strlen(op_data->op_file_secctx_name) + 1 : 0);
310 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311 op_data->op_file_secctx_size);
313 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
315 ptlrpc_request_free(req);
319 spin_lock(&req->rq_lock);
320 req->rq_replay = req->rq_import->imp_replayable;
321 spin_unlock(&req->rq_lock);
323 /* pack the intent */
324 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325 lit->opc = (__u64)it->it_op;
327 /* pack the intended request */
328 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
	/* reserve reply space for the MD (maximum EA size) and for the ACL */
331 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332 obddev->u.cli.cl_max_mds_easize);
333 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334 req->rq_import->imp_connect_data.ocd_max_easize);
335 ptlrpc_request_set_replen(req);
/*
 * Defaults used by mdc_intent_getxattr_pack() to size the reply buffers:
 * GA_DEFAULT_EA_NUM * GA_DEFAULT_EA_NAME_LEN bytes for the xattr names,
 * GA_DEFAULT_EA_NUM * GA_DEFAULT_EA_VAL_LEN bytes for the xattr values and
 * GA_DEFAULT_EA_NUM 32-bit entries for the value lengths.
 */
339 #define GA_DEFAULT_EA_NAME_LEN 20
340 #define GA_DEFAULT_EA_VAL_LEN 250
341 #define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM intent request for getxattr (RQF_LDLM_INTENT_GETXATTR).
 *
 * The intent opcode is set to IT_GETXATTR and the body is packed with
 * mdc_pack_body() for op_data->op_fid1 using op_data->op_valid.  The reply
 * buffers for the names (RMF_EADATA), the values (RMF_EAVALS) and the value
 * lengths (RMF_EAVALS_LENS) are sized from the GA_DEFAULT_EA_* constants;
 * no reply space is reserved for an ACL.
 *
 * Returns the packed request; ERR_PTR(-ENOMEM) if the allocation fails.
 * NOTE(review): the value returned when ldlm_prep_enqueue_req() fails should
 * be confirmed.
 */
343 static struct ptlrpc_request *
344 mdc_intent_getxattr_pack(struct obd_export *exp,
345 struct lookup_intent *it,
346 struct md_op_data *op_data)
348 struct ptlrpc_request *req;
349 struct ldlm_intent *lit;
351 struct list_head cancels = LIST_HEAD_INIT(cancels);
355 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
356 &RQF_LDLM_INTENT_GETXATTR);
358 RETURN(ERR_PTR(-ENOMEM));
360 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
362 ptlrpc_request_free(req);
366 /* pack the intent */
367 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
368 lit->opc = IT_GETXATTR;
370 /* pack the intended request */
371 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
372 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
375 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
376 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
378 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
379 GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
381 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
382 sizeof(__u32) * GA_DEFAULT_EA_NUM);
384 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
386 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent request for unlink (RQF_LDLM_INTENT_UNLINK).
 *
 * The name buffer is sized to op_namelen + 1, the intent opcode is taken
 * from it->it_op and the unlink body is packed with mdc_unlink_pack().  No
 * locks are passed for early cancellation.  Reply space for the MD is
 * reserved using the default MDS EA size.
 *
 * Returns the packed request; ERR_PTR(-ENOMEM) if the allocation fails.
 * NOTE(review): the value returned when ldlm_prep_enqueue_req() fails should
 * be confirmed.
 */
391 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
392 struct lookup_intent *it,
393 struct md_op_data *op_data)
395 struct ptlrpc_request *req;
396 struct obd_device *obddev = class_exp2obd(exp);
397 struct ldlm_intent *lit;
401 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
402 &RQF_LDLM_INTENT_UNLINK);
404 RETURN(ERR_PTR(-ENOMEM));
406 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
407 op_data->op_namelen + 1);
409 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
411 ptlrpc_request_free(req);
415 /* pack the intent */
416 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
417 lit->opc = (__u64)it->it_op;
419 /* pack the intended request */
420 mdc_unlink_pack(req, op_data);
422 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
423 obddev->u.cli.cl_default_mds_easize);
424 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent request for getattr/lookup (RQF_LDLM_INTENT_GETATTR).
 *
 * The request asks for the attributes selected by the local "valid" mask
 * (getattr, EA size, mode/EA size, directory EA, MEA and ACL).  The name
 * buffer is sized to op_namelen + 1 and the intent opcode is taken from
 * it->it_op.  The EA size passed to mdc_getattr_pack(), which is also the
 * reply space reserved for the MD, is the default MDS EA size when that is
 * set (NOTE(review): apparently the maximum EA size otherwise -- confirm).
 * Reply space for the ACL is ocd_max_easize from the import's connect data.
 *
 * Returns the packed request; ERR_PTR(-ENOMEM) if the allocation fails.
 * NOTE(review): the value returned when ldlm_prep_enqueue_req() fails should
 * be confirmed.
 */
428 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
429 struct lookup_intent *it,
430 struct md_op_data *op_data)
432 struct ptlrpc_request *req;
433 struct obd_device *obddev = class_exp2obd(exp);
434 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
435 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
436 OBD_MD_MEA | OBD_MD_FLACL;
437 struct ldlm_intent *lit;
442 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443 &RQF_LDLM_INTENT_GETATTR);
445 RETURN(ERR_PTR(-ENOMEM));
447 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
448 op_data->op_namelen + 1);
450 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452 ptlrpc_request_free(req);
456 /* pack the intent */
457 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
458 lit->opc = (__u64)it->it_op;
460 if (obddev->u.cli.cl_default_mds_easize > 0)
461 easize = obddev->u.cli.cl_default_mds_easize;
463 easize = obddev->u.cli.cl_max_mds_easize;
465 /* pack the intended request */
466 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
468 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
469 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
470 req->rq_import->imp_connect_data.ocd_max_easize);
471 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent request for a layout lock (RQF_LDLM_INTENT_LAYOUT).
 *
 * No EA data is sent with the request (client RMF_EADATA size 0).  The
 * layout intent itself is copied from op_data->op_data, which must be
 * non-NULL and exactly sizeof(struct layout_intent) bytes long (both are
 * asserted).  Reply space for the LVB is reserved using the default MDS EA
 * size.
 *
 * Returns the packed request; ERR_PTR(-ENOMEM) if the allocation fails.
 * NOTE(review): the value returned when ldlm_prep_enqueue_req() fails should
 * be confirmed.
 */
475 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
476 struct lookup_intent *it,
477 struct md_op_data *op_data)
479 struct obd_device *obd = class_exp2obd(exp);
480 struct ptlrpc_request *req;
481 struct ldlm_intent *lit;
482 struct layout_intent *layout;
486 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
487 &RQF_LDLM_INTENT_LAYOUT);
489 RETURN(ERR_PTR(-ENOMEM));
491 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
492 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
494 ptlrpc_request_free(req);
498 /* pack the intent */
499 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
500 lit->opc = (__u64)it->it_op;
502 /* pack the layout intent request */
503 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
504 LASSERT(op_data->op_data != NULL);
505 LASSERT(op_data->op_data_size == sizeof(*layout));
506 memcpy(layout, op_data->op_data, sizeof(*layout));
508 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
509 obd->u.cli.cl_default_mds_easize);
510 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM enqueue request (RQF_LDLM_ENQUEUE) without an intent.
 *
 * lvb_len  number of bytes reserved in the reply for RMF_DLM_LVB
 *
 * Returns the prepared request; ERR_PTR(-ENOMEM) if the allocation fails.
 * NOTE(review): the value returned when ldlm_prep_enqueue_req() fails should
 * be confirmed.
 */
514 static struct ptlrpc_request *
515 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
517 struct ptlrpc_request *req;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
523 RETURN(ERR_PTR(-ENOMEM));
525 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
527 ptlrpc_request_free(req);
531 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
532 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue.
 *
 * - For a request that has a transno or is kept for replay, the lock
 *   request is flagged LDLM_FL_INTENT_ONLY so that a replay only performs
 *   the intent.
 * - On ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise the lock
 *   references are moved to the granted mode if the server returned a mode
 *   different from einfo->ei_mode.
 * - Disposition, status, lock mode, lock handle and the request itself are
 *   recorded in @it.
 * - The replay flag is dropped for requests without a transno or with an
 *   error status, and for IT_OPEN requests that did not result in a
 *   successful open.
 * - When the intent has a reply body, the MD (EA) data is located in the
 *   reply and, for an open that will be replayed, saved into the request
 *   with mdc_save_lovea().  For IT_LAYOUT the layout is taken from
 *   RMF_DLM_LVB of the reply.
 * - If the lock is a layout lock, layout data is available and the reply
 *   does not carry any of the LDLM_FL_BLOCKED_MASK flags, a copy of the
 *   layout is installed as the lock's LVB data unless the lock already has
 *   LVB data.
 *
 * NOTE(review): error returns (missing body / EA data, allocation failure)
 * and the final return value should be confirmed.
 */
536 static int mdc_finish_enqueue(struct obd_export *exp,
537 struct ptlrpc_request *req,
538 struct ldlm_enqueue_info *einfo,
539 struct lookup_intent *it,
540 struct lustre_handle *lockh,
543 struct req_capsule *pill = &req->rq_pill;
544 struct ldlm_request *lockreq;
545 struct ldlm_reply *lockrep;
546 struct ldlm_lock *lock;
547 void *lvb_data = NULL;
552 /* Similarly, if we're going to replay this request, we don't want to
553 * actually get a lock, just perform the intent. */
554 if (req->rq_transno || req->rq_replay) {
555 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
556 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
559 if (rc == ELDLM_LOCK_ABORTED) {
561 memset(lockh, 0, sizeof(*lockh));
563 } else { /* rc = 0 */
564 lock = ldlm_handle2lock(lockh);
565 LASSERT(lock != NULL);
567 /* If the server gave us back a different lock mode, we should
568 * fix up our variables. */
569 if (lock->l_req_mode != einfo->ei_mode) {
570 ldlm_lock_addref(lockh, lock->l_req_mode);
571 ldlm_lock_decref(lockh, einfo->ei_mode);
572 einfo->ei_mode = lock->l_req_mode;
577 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
578 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
580 it->it_disposition = (int)lockrep->lock_policy_res1;
581 it->it_status = (int)lockrep->lock_policy_res2;
582 it->it_lock_mode = einfo->ei_mode;
583 it->it_lock_handle = lockh->cookie;
584 it->it_request = req;
586 /* Technically speaking rq_transno must already be zero if
587 * it_status is in error, so the check is a bit redundant */
588 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
589 mdc_clear_replay_flag(req, it->it_status);
591 /* If we're doing an IT_OPEN which did not result in an actual
592 * successful open, then we need to remove the bit which saves
593 * this request for unconditional replay.
595 * It's important that we do this first! Otherwise we might exit the
596 * function without doing so, and try to replay a failed create
598 if (it->it_op & IT_OPEN && req->rq_replay &&
599 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
600 mdc_clear_replay_flag(req, it->it_status);
602 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
603 it->it_op, it->it_disposition, it->it_status);
605 /* We know what to expect, so we do any byte flipping required here */
606 if (it_has_reply_body(it)) {
607 struct mdt_body *body;
609 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
611 CERROR ("Can't swab mdt_body\n");
615 if (it_disposition(it, DISP_OPEN_OPEN) &&
616 !it_open_error(DISP_OPEN_OPEN, it)) {
618 * If this is a successful OPEN request, we need to set
619 * replay handler and data early, so that if replay
620 * happens immediately after swabbing below, new reply
621 * is swabbed by that handler correctly.
623 mdc_set_open_replay_data(NULL, NULL, it);
626 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
629 mdc_update_max_ea_from_body(exp, body);
632 * The eadata is opaque; just check that it is there.
633 * Eventually, obd_unpackmd() will check the contents.
635 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
636 body->mbo_eadatasize);
640 /* save lvb data and length in case this is for layout
643 lvb_len = body->mbo_eadatasize;
646 * We save the reply LOV EA in case we have to replay a
647 * create for recovery. If we didn't allocate a large
648 * enough request buffer above we need to reallocate it
649 * here to hold the actual LOV EA.
651 * To not save LOV EA if request is not going to replay
652 * (for example error one).
654 if ((it->it_op & IT_OPEN) && req->rq_replay) {
655 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
656 body->mbo_eadatasize);
658 body->mbo_valid &= ~OBD_MD_FLEASIZE;
659 body->mbo_eadatasize = 0;
664 } else if (it->it_op & IT_LAYOUT) {
665 /* maybe the lock was granted right away and layout
666 * is packed into RMF_DLM_LVB of req */
667 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
669 lvb_data = req_capsule_server_sized_get(pill,
670 &RMF_DLM_LVB, lvb_len);
671 if (lvb_data == NULL)
675 * save replied layout data to the request buffer for
676 * recovery consideration (lest MDS reinitialize
677 * another set of OST objects).
680 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
685 /* fill in stripe data for layout lock.
686 * LU-6581: trust layout data only if layout lock is granted. The MDT
687 * has stopped sending layout unless the layout lock is granted. The
688 * client still does this checking in case it's talking with an old
689 * server. - Jinshan */
690 lock = ldlm_handle2lock(lockh);
691 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
692 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
695 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
696 ldlm_it2str(it->it_op), lvb_len);
698 OBD_ALLOC_LARGE(lmm, lvb_len);
703 memcpy(lmm, lvb_data, lvb_len);
705 /* install lvb_data */
706 lock_res_and_lock(lock);
707 if (lock->l_lvb_data == NULL) {
708 lock->l_lvb_type = LVB_T_LAYOUT;
709 lock->l_lvb_data = lmm;
710 lock->l_lvb_len = lvb_len;
713 unlock_res_and_lock(lock);
715 OBD_FREE_LARGE(lmm, lvb_len);
/*
 * Common enqueue path for MDC locks, with or without an intent.
 *
 * With an intent the lock type must be LDLM_IBITS and the caller must not
 * pass a @policy: one is chosen from the intent operation (UPDATE for
 * unlink/getattr/readdir, LAYOUT for layout, XATTR for get/setxattr, LOOKUP
 * otherwise) and the request matching the operation is packed.  Without an
 * intent only LDLM_FLOCK enqueues are expected.
 *
 * The request is tagged with the import generation sampled beforehand, a
 * modify RPC slot (mdc_get_mod_rpc_slot()) and a request slot are obtained,
 * and ldlm_cli_enqueue() is called.  When the server answers the intent
 * with -EINPROGRESS the request is released and, as long as the import
 * generation has not changed, the operation is resent.  Finally
 * mdc_finish_enqueue() processes the reply.
 *
 * NOTE(review): apparently, when mdc_finish_enqueue() fails, the lock
 * reference and the request are dropped and the lock fields of @it are
 * reset -- confirm the condition guarding that cleanup and the return
 * values of the individual error paths.
 */
723 /* We always reserve enough space in the reply packet for a stripe MD, because
724 * we don't know in advance the file type. */
725 static int mdc_enqueue_base(struct obd_export *exp,
726 struct ldlm_enqueue_info *einfo,
727 const union ldlm_policy_data *policy,
728 struct lookup_intent *it,
729 struct md_op_data *op_data,
730 struct lustre_handle *lockh,
731 __u64 extra_lock_flags)
733 struct obd_device *obddev = class_exp2obd(exp);
734 struct ptlrpc_request *req = NULL;
735 __u64 flags, saved_flags = extra_lock_flags;
736 struct ldlm_res_id res_id;
737 static const union ldlm_policy_data lookup_policy = {
738 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
739 static const union ldlm_policy_data update_policy = {
740 .l_inodebits = { MDS_INODELOCK_UPDATE } };
741 static const union ldlm_policy_data layout_policy = {
742 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
743 static const union ldlm_policy_data getxattr_policy = {
744 .l_inodebits = { MDS_INODELOCK_XATTR } };
745 int generation, resends = 0;
746 struct ldlm_reply *lockrep;
747 enum lvb_type lvb_type = 0;
751 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
753 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
756 LASSERT(policy == NULL);
758 saved_flags |= LDLM_FL_HAS_INTENT;
759 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
760 policy = &update_policy;
761 else if (it->it_op & IT_LAYOUT)
762 policy = &layout_policy;
763 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
764 policy = &getxattr_policy;
766 policy = &lookup_policy;
769 generation = obddev->u.cli.cl_import->imp_generation;
773 /* The only way right now is FLOCK. */
774 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
776 res_id.name[3] = LDLM_FLOCK;
777 } else if (it->it_op & IT_OPEN) {
778 req = mdc_intent_open_pack(exp, it, op_data);
779 } else if (it->it_op & IT_UNLINK) {
780 req = mdc_intent_unlink_pack(exp, it, op_data);
781 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
782 req = mdc_intent_getattr_pack(exp, it, op_data);
783 } else if (it->it_op & IT_READDIR) {
784 req = mdc_enqueue_pack(exp, 0);
785 } else if (it->it_op & IT_LAYOUT) {
786 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
788 req = mdc_intent_layout_pack(exp, it, op_data);
789 lvb_type = LVB_T_LAYOUT;
790 } else if (it->it_op & IT_GETXATTR) {
791 req = mdc_intent_getxattr_pack(exp, it, op_data);
798 RETURN(PTR_ERR(req));
801 req->rq_generation_set = 1;
802 req->rq_import_generation = generation;
803 req->rq_sent = ktime_get_real_seconds() + resends;
806 /* It is important to obtain modify RPC slot first (if applicable), so
807 * that threads that are waiting for a modify RPC slot are not polluting
808 * our rpcs in flight counter.
809 * We do not do flock request limiting, though */
811 mdc_get_mod_rpc_slot(req, it);
812 rc = obd_get_request_slot(&obddev->u.cli);
814 mdc_put_mod_rpc_slot(req, it);
815 mdc_clear_replay_flag(req, 0);
816 ptlrpc_req_finished(req);
821 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
822 0, lvb_type, lockh, 0);
824 /* For flock requests we immediately return without further
825 delay and let caller deal with the rest, since rest of
826 this function metadata processing makes no sense for flock
827 requests anyway. But in case of problem during comms with
828 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
829 can not rely on caller and this mainly for F_UNLCKs
830 (explicits or automatically generated by Kernel to clean
831 current FLocks upon exit) that can't be trashed */
832 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
833 (einfo->ei_type == LDLM_FLOCK) &&
834 (einfo->ei_mode == LCK_NL))
839 obd_put_request_slot(&obddev->u.cli);
840 mdc_put_mod_rpc_slot(req, it);
844 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
845 obddev->obd_name, PFID(&op_data->op_fid1),
846 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
848 mdc_clear_replay_flag(req, rc);
849 ptlrpc_req_finished(req);
853 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
854 LASSERT(lockrep != NULL);
856 lockrep->lock_policy_res2 =
857 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
859 /* Retry infinitely when the server returns -EINPROGRESS for the
860 * intent operation, when server returns -EINPROGRESS for acquiring
861 * intent lock, we'll retry in after_reply(). */
862 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
863 mdc_clear_replay_flag(req, rc);
864 ptlrpc_req_finished(req);
865 if (generation == obddev->u.cli.cl_import->imp_generation) {
866 if (signal_pending(current))
870 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
871 obddev->obd_name, resends, it->it_op,
872 PFID(&op_data->op_fid1),
873 PFID(&op_data->op_fid2));
876 CDEBUG(D_HA, "resend cross eviction\n");
881 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
883 if (lustre_handle_is_used(lockh)) {
884 ldlm_lock_decref(lockh, einfo->ei_mode);
885 memset(lockh, 0, sizeof(*lockh));
887 ptlrpc_req_finished(req);
889 it->it_lock_handle = 0;
890 it->it_lock_mode = 0;
891 it->it_request = NULL;
/*
 * Enqueue a lock without an intent: a thin wrapper that forwards all of its
 * arguments to mdc_enqueue_base() with a NULL lookup intent and returns its
 * result.
 */
897 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
898 const union ldlm_policy_data *policy,
899 struct md_op_data *op_data,
900 struct lustre_handle *lockh, __u64 extra_lock_flags)
902 return mdc_enqueue_base(exp, einfo, policy, NULL,
903 op_data, lockh, extra_lock_flags);
/*
 * Finish the processing of an intent lock once the reply has been handled
 * by mdc_finish_enqueue().
 *
 * - For IT_GETXATTR / IT_LAYOUT a non-zero it_status is returned as is.
 * - For the other intents the server must have executed the intent
 *   (DISP_IT_EXECD); the intent and lookup phases are checked with
 *   it_open_error().
 * - An extra reference on @request is taken for a successful create
 *   (DISP_ENQ_CREATE_REF) and for a successful open (DISP_ENQ_OPEN_REF);
 *   the existing comments name ll_create_node and ll_file_open as the places
 *   that release them.
 * - If the new lock is valid, the FID of the reply body is asserted to
 *   match the lock's resource, and an already held lock with the same
 *   policy is looked up; when one is found the new lock is cancelled and
 *   @lockh / it->it_lock_handle are switched to the existing lock.
 *
 * NOTE(review): handling of IT_READDIR and the value returned on the normal
 * path should be confirmed.
 */
906 static int mdc_finish_intent_lock(struct obd_export *exp,
907 struct ptlrpc_request *request,
908 struct md_op_data *op_data,
909 struct lookup_intent *it,
910 struct lustre_handle *lockh)
912 struct lustre_handle old_lock;
913 struct ldlm_lock *lock;
917 LASSERT(request != NULL);
918 LASSERT(request != LP_POISON);
919 LASSERT(request->rq_repmsg != LP_POISON);
921 if (it->it_op & IT_READDIR)
924 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
925 if (it->it_status != 0)
926 GOTO(out, rc = it->it_status);
928 if (!it_disposition(it, DISP_IT_EXECD)) {
929 /* The server failed before it even started executing
930 * the intent, i.e. because it couldn't unpack the
933 LASSERT(it->it_status != 0);
934 GOTO(out, rc = it->it_status);
936 rc = it_open_error(DISP_IT_EXECD, it);
940 rc = it_open_error(DISP_LOOKUP_EXECD, it);
944 /* keep requests around for the multiple phases of the call
945 * this shows the DISP_XX must guarantee we make it into the
948 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
949 it_disposition(it, DISP_OPEN_CREATE) &&
950 !it_open_error(DISP_OPEN_CREATE, it)) {
951 it_set_disposition(it, DISP_ENQ_CREATE_REF);
952 /* balanced in ll_create_node */
953 ptlrpc_request_addref(request);
955 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
956 it_disposition(it, DISP_OPEN_OPEN) &&
957 !it_open_error(DISP_OPEN_OPEN, it)) {
958 it_set_disposition(it, DISP_ENQ_OPEN_REF);
959 /* balanced in ll_file_open */
960 ptlrpc_request_addref(request);
961 /* BUG 11546 - eviction in the middle of open rpc
964 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
968 if (it->it_op & IT_CREAT) {
969 /* XXX this belongs in ll_create_it */
970 } else if (it->it_op == IT_OPEN) {
971 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
973 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
977 /* If we already have a matching lock, then cancel the new
978 * one. We have to set the data here instead of in
979 * mdc_enqueue, because we need to use the child's inode as
980 * the l_ast_data to match, and that's not available until
981 * intent_finish has performed the iget().) */
982 lock = ldlm_handle2lock(lockh);
984 union ldlm_policy_data policy = lock->l_policy_data;
985 LDLM_DEBUG(lock, "matching against this");
987 if (it_has_reply_body(it)) {
988 struct mdt_body *body;
990 body = req_capsule_server_get(&request->rq_pill,
992 /* mdc_enqueue checked */
993 LASSERT(body != NULL);
994 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
995 &lock->l_resource->lr_name),
996 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
997 PLDLMRES(lock->l_resource),
998 PFID(&body->mbo_fid1));
1000 LDLM_LOCK_PUT(lock);
1002 memcpy(&old_lock, lockh, sizeof(*lockh));
1003 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1004 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1005 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1006 memcpy(lockh, &old_lock, sizeof(old_lock));
1007 it->it_lock_handle = lockh->cookie;
1013 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1014 (int)op_data->op_namelen, op_data->op_name,
1015 ldlm_it2str(it->it_op), it->it_status,
1016 it->it_disposition, rc);
/*
 * Check whether a usable lock for the intent @it is already held.
 *
 * If @it already carries a lock handle, that handle is revalidated with
 * ldlm_revalidate_lock_handle() (which also receives @bits).  Otherwise a
 * lock on the resource of @fid is searched with mdc_lock_match() in any of
 * the modes CR, CW, PR or PW; the inodebits to match depend on it->it_op
 * (one case combines UPDATE and LOOKUP with further bits, the others use
 * UPDATE, LAYOUT or LOOKUP alone).
 *
 * The outcome is stored in @it: the handle and mode of the lock that was
 * found, or zero for both fields.
 *
 * NOTE(review): which it_op value selects which bit set, and the return
 * value (presumably non-zero when a lock was found), should be confirmed.
 */
1020 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1021 struct lu_fid *fid, __u64 *bits)
1023 /* We could just return 1 immediately, but since we should only
1024 * be called in revalidate_it if we already have a lock, let's
1026 struct ldlm_res_id res_id;
1027 struct lustre_handle lockh;
1028 union ldlm_policy_data policy;
1029 enum ldlm_mode mode;
1032 if (it->it_lock_handle) {
1033 lockh.cookie = it->it_lock_handle;
1034 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1036 fid_build_reg_res_name(fid, &res_id);
1037 switch (it->it_op) {
1039 /* File attributes are held under multiple bits:
1040 * nlink is under lookup lock, size and times are
1041 * under UPDATE lock and recently we've also got
1042 * a separate permissions lock for owner/group/acl that
1043 * were protected by lookup lock before.
1044 * Getattr must provide all of that information,
1045 * so we need to ensure we have all of those locks.
1046 * Unfortunately, if the bits are split across multiple
1047 * locks, there's no easy way to match all of them here,
1048 * so an extra RPC would be performed to fetch all
1049 * of those bits at once for now. */
1050 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1051 * but for old MDTs (< 2.4), permission is covered
1052 * by LOOKUP lock, so it needs to match all bits here.*/
1053 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1054 MDS_INODELOCK_LOOKUP |
1058 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1061 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1064 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1068 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1069 LDLM_IBITS, &policy,
1070 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1075 it->it_lock_handle = lockh.cookie;
1076 it->it_lock_mode = mode;
1078 it->it_lock_handle = 0;
1079 it->it_lock_mode = 0;
1086 * This long block is all about fixing up the lock and request state
1087 * so that it is correct as of the moment _before_ the operation was
1088 * applied; that way, the VFS will think that everything is normal and
1089 * call Lustre's regular VFS methods.
1091 * If we're performing a creation, that means that unless the creation
1092 * failed with EEXIST, we should fake up a negative dentry.
1094 * For everything else, we want to lookup to succeed.
1096 * One additional note: if CREATE or OPEN succeeded, we add an extra
1097 * reference to the request because we need to keep it around until
1098 * ll_create/ll_open gets called.
1100 * The server will return to us, in it_disposition, an indication of
1101 * exactly what it_status refers to.
1103 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1104 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1105 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1106 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1109 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * Entry point for intent locks.
 *
 * exp               export to send the request on
 * op_data           operation data: parent FID (op_fid1), child FID
 *                   (op_fid2) and name
 * it                the intent; its op selects the request type and, via
 *                   it_to_lock_mode(), the lock mode
 * reqp              receives it->it_request after the enqueue
 * cb_blocking       blocking callback installed for the lock
 * extra_lock_flags  NOTE(review): presumably forwarded to
 *                   mdc_enqueue_base() -- confirm
 *
 * For IT_LOOKUP / IT_GETATTR / IT_READDIR with a sane child FID an already
 * held lock is tried first through mdc_revalidate_lock().  For IT_CREAT
 * without a sane child FID a new FID is allocated with mdc_fid_alloc().
 * Otherwise the lock is enqueued as LDLM_IBITS with ldlm_completion_ast as
 * completion callback, and the reply is post-processed by
 * mdc_finish_intent_lock().
 */
1112 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1113 struct lookup_intent *it, struct ptlrpc_request **reqp,
1114 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1116 struct ldlm_enqueue_info einfo = {
1117 .ei_type = LDLM_IBITS,
1118 .ei_mode = it_to_lock_mode(it),
1119 .ei_cb_bl = cb_blocking,
1120 .ei_cb_cp = ldlm_completion_ast,
1122 struct lustre_handle lockh;
1127 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1128 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1129 op_data->op_name, PFID(&op_data->op_fid2),
1130 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1134 if (fid_is_sane(&op_data->op_fid2) &&
1135 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1136 /* We could just return 1 immediately, but since we should only
1137 * be called in revalidate_it if we already have a lock, let's
1139 it->it_lock_handle = 0;
1140 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1141 /* Only return failure if it was not GETATTR by cfid
1142 (from inode_revalidate) */
1143 if (rc || op_data->op_namelen != 0)
1147 /* For case if upper layer did not alloc fid, do it now. */
1148 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1149 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1151 CERROR("Can't alloc new fid, rc %d\n", rc);
1156 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1161 *reqp = it->it_request;
1162 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply handler of the asynchronous getattr intent request set up by
 * mdc_intent_getattr_async().
 *
 * @args is interpreted as a struct mdc_getattr_args.  The request slot
 * taken at send time is released, the enqueue is completed with
 * ldlm_cli_enqueue_fini() (a failure is logged and the replay flag is
 * cleared), the status in the lock reply is converted with
 * ptlrpc_status_ntoh(), and the reply is processed by mdc_finish_enqueue()
 * and mdc_finish_intent_lock().  In the end the caller's callback
 * minfo->mi_cb is invoked with the request, the enqueue info and the
 * resulting rc.
 *
 * NOTE(review): the effect of the OBD_FAIL_MDC_GETATTR_ENQUEUE fail check
 * and the return value should be confirmed.
 */
1166 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1167 struct ptlrpc_request *req,
1170 struct mdc_getattr_args *ga = args;
1171 struct obd_export *exp = ga->ga_exp;
1172 struct md_enqueue_info *minfo = ga->ga_minfo;
1173 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1174 struct lookup_intent *it;
1175 struct lustre_handle *lockh;
1176 struct obd_device *obddev;
1177 struct ldlm_reply *lockrep;
1178 __u64 flags = LDLM_FL_HAS_INTENT;
1182 lockh = &minfo->mi_lockh;
1184 obddev = class_exp2obd(exp);
1186 obd_put_request_slot(&obddev->u.cli);
1187 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1190 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1191 &flags, NULL, 0, lockh, rc);
1193 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1194 mdc_clear_replay_flag(req, rc);
1198 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1199 LASSERT(lockrep != NULL);
1201 lockrep->lock_policy_res2 =
1202 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1204 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1208 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1212 minfo->mi_cb(req, minfo, rc);
/*
 * Send a getattr intent request asynchronously.
 *
 * The request is packed with mdc_intent_getattr_pack() from the operation
 * data and the intent embedded in @minfo, and is enqueued for the LOOKUP
 * and UPDATE inodebits on the resource of op_fid1.  A request slot is
 * taken before the enqueue; if ldlm_cli_enqueue() fails, the slot and the
 * request are released again.  On success @minfo is stored in the
 * request's async args, mdc_intent_getattr_async_interpret() is installed
 * as reply handler and the request is handed to ptlrpcd.
 *
 * NOTE(review): the return values of the error paths and of the success
 * path should be confirmed.
 */
1216 int mdc_intent_getattr_async(struct obd_export *exp,
1217 struct md_enqueue_info *minfo)
1219 struct md_op_data *op_data = &minfo->mi_data;
1220 struct lookup_intent *it = &minfo->mi_it;
1221 struct ptlrpc_request *req;
1222 struct mdc_getattr_args *ga;
1223 struct obd_device *obddev = class_exp2obd(exp);
1224 struct ldlm_res_id res_id;
1225 union ldlm_policy_data policy = {
1226 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1227 MDS_INODELOCK_UPDATE } };
1229 __u64 flags = LDLM_FL_HAS_INTENT;
1232 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1233 (int)op_data->op_namelen, op_data->op_name,
1234 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1236 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1237 req = mdc_intent_getattr_pack(exp, it, op_data);
1239 RETURN(PTR_ERR(req));
1241 rc = obd_get_request_slot(&obddev->u.cli);
1243 ptlrpc_req_finished(req);
1247 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1248 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1250 obd_put_request_slot(&obddev->u.cli);
1251 ptlrpc_req_finished(req);
1255 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1256 ga = ptlrpc_req_async_args(req);
1258 ga->ga_minfo = minfo;
1260 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1261 ptlrpcd_add_req(req);