4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
47 #include "mdc_internal.h"
/*
 * Context carried through rq_async_args from mdc_intent_getattr_async()
 * to its interpret callback mdc_intent_getattr_async_interpret().
 * NOTE(review): the closing brace is not visible in this chunk; source
 * lines appear to be elided here.
 */
49 struct mdc_getattr_args {
50 struct obd_export *ga_exp;	/* export the getattr RPC was sent on */
51 struct md_enqueue_info *ga_minfo;	/* caller's enqueue info (it, lockh, cb) */
/*
 * it_open_error() - report the server status for a given intent phase.
 *
 * Checks the intent's dispositions from the most specific (OPEN_LEASE)
 * down to the least (IT_EXECD); per visible code, each branch compares
 * @phase against the disposition that was executed.
 * NOTE(review): the return statements inside each branch are elided in
 * this chunk -- presumably "return it->it_status" when @phase reached
 * the executed disposition, 0 otherwise; confirm against full source.
 */
54 int it_open_error(int phase, struct lookup_intent *it)
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
/* No disposition matched at all -- log the raw state for debugging. */
90 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95 EXPORT_SYMBOL(it_open_error);
97 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data() - attach the VFS inode @data to the DLM lock's
 * resource (lr_lvb_inode) and optionally report the lock's inodebits
 * through @bits.  If the resource already points at a different inode,
 * that inode must be on its way out (I_FREEING) before it is replaced.
 */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99 void *data, __u64 *bits)
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
/* Nothing to do for an unused (zeroed) handle. */
108 if (!lustre_handle_is_used(lockh))
111 lock = ldlm_handle2lock(lockh);
113 LASSERT(lock != NULL);
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* Replacing a live inode would indicate aliasing; only a dying
 * (I_FREEING) inode may legitimately be swapped out here. */
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/*
 * mdc_lock_match() - look for an already-granted lock on @fid matching
 * @type/@policy/@mode in this export's namespace; on success the matched
 * lock handle is stored in @lockh and its mode is returned.
 */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh, 0);
/*
 * mdc_cancel_unused() - cancel all unused locks on the resource derived
 * from @fid that match @policy/@mode, via the generic LDLM helper.
 * @opaque is passed through for the caller's lock-iteration matching.
 */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/*
 * mdc_null_inode() - detach any inode pointer cached on the LDLM
 * resource for @fid (lr_lvb_inode = NULL), so a dying inode is no
 * longer reachable from the lock resource.
 * NOTE(review): locking around the lr_lvb_inode store is elided in this
 * chunk -- confirm the resource lock is held in the full source.
 */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create flag 0); absent resource means nothing cached. */
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
188 ldlm_resource_putref(res);
/*
 * mdc_clear_replay_flag() - drop a request from the replay list when the
 * operation failed with @rc; failed requests must not be replayed.
 * Also warns if the server assigned a transno to an error reply.
 */
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 /* Don't hold error requests for replay. */
195 if (req->rq_replay) {
196 spin_lock(&req->rq_lock);
/* NOTE(review): the rq_replay = 0 store between lock/unlock is
 * elided in this chunk. */
198 spin_unlock(&req->rq_lock);
200 if (rc && req->rq_transno != 0) {
201 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206 /* Save a large LOV EA into the request buffer so that it is available
207 * for replay. We don't do this in the initial request because the
208 * original request doesn't need this buffer (at most it sends just the
209 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210 * buffer and may also be difficult to allocate and save a very large
211 * request buffer for each open. (bug 5707)
213 * OOM here may cause recovery failure if lmm is needed (only for the
214 * original open if the MDS crashed just when this client also OOM'd)
215 * but this is incredibly unlikely, and questionable whether the client
216 * could do MDS recovery under OOM anyways... */
/*
 * Grows (or shrinks) the client-side @field buffer of @req to @size and
 * copies @data into it; returns 0 or a negative errno on enlarge failure.
 */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218 const struct req_msg_field *field,
219 void *data, u32 size)
221 struct req_capsule *pill = &req->rq_pill;
/* Enlarge the already-packed request buffer if the reply EA is
 * bigger than what we reserved at pack time. */
225 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229 req->rq_export->exp_obd->obd_name,
/* Buffer was over-reserved: shrink to the actual EA size. */
234 req_capsule_shrink(pill, field, size, RCL_CLIENT);
237 req_capsule_set_size(pill, field, RCL_CLIENT, size);
238 lmm = req_capsule_client_get(pill, field);
240 memcpy(lmm, data, size);
/*
 * mdc_intent_open_pack() - build an LDLM_INTENT_OPEN request for @it on
 * @op_data: cancels conflicting local locks first (open locks on the
 * child fid, UPDATE lock on the parent for create), then packs the
 * intent opcode and the open body, and reserves reply buffers sized for
 * the largest possible MD/ACL the server may return.
 * Returns the prepared request, or ERR_PTR(-ENOMEM) on allocation
 * failure.
 */
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247 struct md_op_data *op_data)
249 struct ptlrpc_request *req;
250 struct obd_device *obddev = class_exp2obd(exp);
251 struct ldlm_intent *lit;
252 const void *lmm = op_data->op_data;
253 __u32 lmmsize = op_data->op_data_size;
254 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Force a regular-file type in the create mode for open. */
260 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262 /* XXX: openlock is not cancelled for cross-refs. */
263 /* If inode is known, cancel conflicting OPEN locks. */
264 if (fid_is_sane(&op_data->op_fid2)) {
265 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266 if (it->it_flags & FMODE_WRITE)
/* NOTE(review): the mode selections assigned in these
 * branches are elided in this chunk. */
271 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
274 else if (it->it_flags & FMODE_EXEC)
280 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
285 /* If CREATE, cancel parent's UPDATE lock. */
286 if (it->it_op & IT_CREAT)
290 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
292 MDS_INODELOCK_UPDATE);
294 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the cancel list we collected above. */
297 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298 RETURN(ERR_PTR(-ENOMEM));
301 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302 op_data->op_namelen + 1);
303 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
306 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308 strlen(op_data->op_file_secctx_name) + 1 : 0);
310 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311 op_data->op_file_secctx_size);
313 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
315 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import supports it. */
319 spin_lock(&req->rq_lock);
320 req->rq_replay = req->rq_import->imp_replayable;
321 spin_unlock(&req->rq_lock);
323 /* pack the intent */
324 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325 lit->opc = (__u64)it->it_op;
327 /* pack the intended request */
328 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reply buffers: worst-case MD size plus server-advertised ACL max. */
331 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332 obddev->u.cli.cl_max_mds_easize);
333 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334 req->rq_import->imp_connect_data.ocd_max_easize);
335 ptlrpc_request_set_replen(req);
/* Defaults used to size getxattr-intent reply buffers: expected xattr
 * name length, value length, and number of xattrs per inode. */
339 #define GA_DEFAULT_EA_NAME_LEN 20
340 #define GA_DEFAULT_EA_VAL_LEN 250
341 #define GA_DEFAULT_EA_NUM 10
/*
 * mdc_intent_getxattr_pack() - build an LDLM_INTENT_GETXATTR request
 * that fetches all xattrs of op_fid1 in one round trip; reply buffers
 * are sized with the GA_DEFAULT_* heuristics above.
 * Returns the prepared request, or ERR_PTR(-ENOMEM).
 */
343 static struct ptlrpc_request *
344 mdc_intent_getxattr_pack(struct obd_export *exp,
345 struct lookup_intent *it,
346 struct md_op_data *op_data)
348 struct ptlrpc_request *req;
349 struct ldlm_intent *lit;
351 struct list_head cancels = LIST_HEAD_INIT(cancels);
355 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
356 &RQF_LDLM_INTENT_GETXATTR);
358 RETURN(ERR_PTR(-ENOMEM));
360 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
362 ptlrpc_request_free(req);
366 /* pack the intent */
367 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
368 lit->opc = IT_GETXATTR;
370 /* pack the intended request */
371 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
372 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
/* Reserve reply room for names, values and per-value lengths. */
375 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
376 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
378 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
379 GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
381 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
382 sizeof(__u32) * GA_DEFAULT_EA_NUM);
/* No ACL data expected in a getxattr reply. */
384 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
386 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_unlink_pack() - build an LDLM_INTENT_UNLINK request for
 * removing op_data->op_name from the parent; reply MD buffer is sized
 * to the default MDS EA size.  Returns the request or ERR_PTR(-ENOMEM).
 */
391 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
392 struct lookup_intent *it,
393 struct md_op_data *op_data)
395 struct ptlrpc_request *req;
396 struct obd_device *obddev = class_exp2obd(exp);
397 struct ldlm_intent *lit;
401 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
402 &RQF_LDLM_INTENT_UNLINK);
404 RETURN(ERR_PTR(-ENOMEM));
406 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
407 op_data->op_namelen + 1);
409 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
411 ptlrpc_request_free(req);
415 /* pack the intent */
416 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
417 lit->opc = (__u64)it->it_op;
419 /* pack the intended request */
420 mdc_unlink_pack(req, op_data);
422 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
423 obddev->u.cli.cl_default_mds_easize);
424 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack() - build an LDLM_INTENT_GETATTR request
 * asking for attributes, EA, ACL and MEA of the named entry; the reply
 * MD buffer uses the default EA size when set, else the maximum.
 * Returns the request or ERR_PTR(-ENOMEM).
 */
428 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
429 struct lookup_intent *it,
430 struct md_op_data *op_data)
432 struct ptlrpc_request *req;
433 struct obd_device *obddev = class_exp2obd(exp);
434 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
435 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
436 OBD_MD_MEA | OBD_MD_FLACL;
437 struct ldlm_intent *lit;
442 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443 &RQF_LDLM_INTENT_GETATTR);
445 RETURN(ERR_PTR(-ENOMEM));
447 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
448 op_data->op_namelen + 1);
450 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452 ptlrpc_request_free(req);
456 /* pack the intent */
457 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
458 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum. */
460 if (obddev->u.cli.cl_default_mds_easize > 0)
461 easize = obddev->u.cli.cl_default_mds_easize;
463 easize = obddev->u.cli.cl_max_mds_easize;
465 /* pack the intended request */
466 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
468 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
469 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
470 req->rq_import->imp_connect_data.ocd_max_easize);
471 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack() - build an LDLM_INTENT_LAYOUT request.  The
 * caller must supply a struct layout_intent in op_data->op_data; it is
 * copied verbatim into the request.  The DLM LVB reply buffer is sized
 * to the default MDS EA size for the returned layout.
 * Returns the request or ERR_PTR(-ENOMEM).
 */
475 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
476 struct lookup_intent *it,
477 struct md_op_data *op_data)
479 struct obd_device *obd = class_exp2obd(exp);
480 struct ptlrpc_request *req;
481 struct ldlm_intent *lit;
482 struct layout_intent *layout;
486 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
487 &RQF_LDLM_INTENT_LAYOUT);
489 RETURN(ERR_PTR(-ENOMEM));
/* No client EA data is sent with a layout intent. */
491 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
492 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
494 ptlrpc_request_free(req);
498 /* pack the intent */
499 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
500 lit->opc = (__u64)it->it_op;
502 /* pack the layout intent request */
503 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
504 LASSERT(op_data->op_data != NULL);
505 LASSERT(op_data->op_data_size == sizeof(*layout));
506 memcpy(layout, op_data->op_data, sizeof(*layout));
508 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
509 obd->u.cli.cl_default_mds_easize);
510 ptlrpc_request_set_replen(req);
/*
 * mdc_enqueue_pack() - build a plain (intent-less) LDLM_ENQUEUE request
 * with a server LVB buffer of @lvb_len bytes.
 * Returns the request or ERR_PTR(-ENOMEM).
 */
514 static struct ptlrpc_request *
515 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
517 struct ptlrpc_request *req;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
523 RETURN(ERR_PTR(-ENOMEM));
525 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
527 ptlrpc_request_free(req);
531 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
532 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue() - post-process an intent enqueue reply:
 *  - mark replayable/committed requests INTENT_ONLY so replay performs
 *    only the intent, not the lock grant;
 *  - fix up the local lock mode if the server granted a different one;
 *  - copy disposition/status/lock info from the DLM reply into @it;
 *  - drop the replay flag for failed operations and failed opens;
 *  - swab and validate the mdt_body, saving the LOV EA (open replay)
 *    or the layout LVB (IT_LAYOUT) back into the request buffer;
 *  - install layout LVB data on the lock and handle Data-on-MDT size.
 * NOTE(review): several interior lines (error returns, else branches,
 * closing braces) are elided in this chunk; the annotations below only
 * describe the visible statements.
 */
536 static int mdc_finish_enqueue(struct obd_export *exp,
537 struct ptlrpc_request *req,
538 struct ldlm_enqueue_info *einfo,
539 struct lookup_intent *it,
540 struct lustre_handle *lockh,
543 struct req_capsule *pill = &req->rq_pill;
544 struct ldlm_request *lockreq;
545 struct ldlm_reply *lockrep;
546 struct ldlm_lock *lock;
547 struct mdt_body *body = NULL;
548 void *lvb_data = NULL;
554 /* Similarly, if we're going to replay this request, we don't want to
555 * actually get a lock, just perform the intent. */
556 if (req->rq_transno || req->rq_replay) {
557 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
558 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
561 if (rc == ELDLM_LOCK_ABORTED) {
/* Intent was executed but no lock granted: clear the handle. */
563 memset(lockh, 0, sizeof(*lockh));
565 } else { /* rc = 0 */
566 lock = ldlm_handle2lock(lockh);
567 LASSERT(lock != NULL);
569 /* If the server gave us back a different lock mode, we should
570 * fix up our variables. */
571 if (lock->l_req_mode != einfo->ei_mode) {
572 ldlm_lock_addref(lockh, lock->l_req_mode);
573 ldlm_lock_decref(lockh, einfo->ei_mode);
574 einfo->ei_mode = lock->l_req_mode;
579 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
580 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's verdict to the intent for the upper layers. */
582 it->it_disposition = (int)lockrep->lock_policy_res1;
583 it->it_status = (int)lockrep->lock_policy_res2;
584 it->it_lock_mode = einfo->ei_mode;
585 it->it_lock_handle = lockh->cookie;
586 it->it_request = req;
588 /* Technically speaking rq_transno must already be zero if
589 * it_status is in error, so the check is a bit redundant */
590 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
591 mdc_clear_replay_flag(req, it->it_status);
593 /* If we're doing an IT_OPEN which did not result in an actual
594 * successful open, then we need to remove the bit which saves
595 * this request for unconditional replay.
597 * It's important that we do this first! Otherwise we might exit the
598 * function without doing so, and try to replay a failed create
600 if (it->it_op & IT_OPEN && req->rq_replay &&
601 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
602 mdc_clear_replay_flag(req, it->it_status);
604 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
605 it->it_op, it->it_disposition, it->it_status);
607 /* We know what to expect, so we do any byte flipping required here */
608 if (it_has_reply_body(it)) {
609 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
611 CERROR ("Can't swab mdt_body\n");
615 if (it_disposition(it, DISP_OPEN_OPEN) &&
616 !it_open_error(DISP_OPEN_OPEN, it)) {
618 * If this is a successful OPEN request, we need to set
619 * replay handler and data early, so that if replay
620 * happens immediately after swabbing below, new reply
621 * is swabbed by that handler correctly.
623 mdc_set_open_replay_data(NULL, NULL, it);
626 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
629 mdc_update_max_ea_from_body(exp, body);
632 * The eadata is opaque; just check that it is there.
633 * Eventually, obd_unpackmd() will check the contents.
635 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
636 body->mbo_eadatasize);
640 /* save lvb data and length in case this is for layout
643 lvb_len = body->mbo_eadatasize;
646 * We save the reply LOV EA in case we have to replay a
647 * create for recovery. If we didn't allocate a large
648 * enough request buffer above we need to reallocate it
649 * here to hold the actual LOV EA.
651 * To not save LOV EA if request is not going to replay
652 * (for example error one).
654 if ((it->it_op & IT_OPEN) && req->rq_replay) {
655 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
656 body->mbo_eadatasize);
/* Saving failed: pretend no EA so replay stays sane. */
658 body->mbo_valid &= ~OBD_MD_FLEASIZE;
659 body->mbo_eadatasize = 0;
664 } else if (it->it_op & IT_LAYOUT) {
665 /* maybe the lock was granted right away and layout
666 * is packed into RMF_DLM_LVB of req */
667 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
669 lvb_data = req_capsule_server_sized_get(pill,
670 &RMF_DLM_LVB, lvb_len);
671 if (lvb_data == NULL)
675 * save replied layout data to the request buffer for
676 * recovery consideration (lest MDS reinitialize
677 * another set of OST objects).
/* Best-effort: a failure to save is deliberately ignored. */
680 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
685 /* fill in stripe data for layout lock.
686 * LU-6581: trust layout data only if layout lock is granted. The MDT
687 * has stopped sending layout unless the layout lock is granted. The
688 * client still does this checking in case it's talking with an old
689 * server. - Jinshan */
690 lock = ldlm_handle2lock(lockh);
694 if (ldlm_has_layout(lock) && lvb_data != NULL &&
695 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
698 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
699 ldlm_it2str(it->it_op), lvb_len);
701 OBD_ALLOC_LARGE(lmm, lvb_len);
703 GOTO(out_lock, rc = -ENOMEM);
705 memcpy(lmm, lvb_data, lvb_len);
707 /* install lvb_data */
708 lock_res_and_lock(lock);
709 if (lock->l_lvb_data == NULL) {
710 lock->l_lvb_type = LVB_T_LAYOUT;
711 lock->l_lvb_data = lmm;
712 lock->l_lvb_len = lvb_len;
715 unlock_res_and_lock(lock);
/* Lock already had LVB data: free our unused copy. */
717 OBD_FREE_LARGE(lmm, lvb_len);
720 if (ldlm_has_dom(lock)) {
721 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
723 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
724 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
725 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
726 exp->exp_obd->obd_name);
727 GOTO(out_lock, rc = -EPROTO);
730 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
731 ldlm_it2str(it->it_op), body->mbo_dom_size);
733 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
741 /* We always reserve enough space in the reply packet for a stripe MD, because
742 * we don't know in advance the file type. */
/*
 * mdc_enqueue_base() - send an (optionally intent-carrying) enqueue to
 * the MDT and finish it via mdc_finish_enqueue().
 *
 * Picks an inodebits policy from the intent op, packs the matching
 * intent request (open/unlink/getattr/readdir/layout/getxattr), takes
 * the modify-RPC and rpcs-in-flight slots, performs ldlm_cli_enqueue(),
 * and retries forever on -EINPROGRESS while the import generation is
 * unchanged.  On failure any granted lock ref is dropped and the
 * request is freed.
 * NOTE(review): retry-loop labels, some else-branches and RETURNs are
 * elided in this chunk; comments describe visible statements only.
 */
743 static int mdc_enqueue_base(struct obd_export *exp,
744 struct ldlm_enqueue_info *einfo,
745 const union ldlm_policy_data *policy,
746 struct lookup_intent *it,
747 struct md_op_data *op_data,
748 struct lustre_handle *lockh,
749 __u64 extra_lock_flags)
751 struct obd_device *obddev = class_exp2obd(exp);
752 struct ptlrpc_request *req = NULL;
753 __u64 flags, saved_flags = extra_lock_flags;
754 struct ldlm_res_id res_id;
755 static const union ldlm_policy_data lookup_policy = {
756 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
757 static const union ldlm_policy_data update_policy = {
758 .l_inodebits = { MDS_INODELOCK_UPDATE } };
759 static const union ldlm_policy_data layout_policy = {
760 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
761 static const union ldlm_policy_data getxattr_policy = {
762 .l_inodebits = { MDS_INODELOCK_XATTR } };
763 int generation, resends = 0;
764 struct ldlm_reply *lockrep;
765 enum lvb_type lvb_type = 0;
/* Intent enqueues are always inodebits locks. */
769 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
771 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent the policy is derived from it->it_op, so the
 * caller must not supply one. */
774 LASSERT(policy == NULL);
776 saved_flags |= LDLM_FL_HAS_INTENT;
777 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
778 policy = &update_policy;
779 else if (it->it_op & IT_LAYOUT)
780 policy = &layout_policy;
781 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
782 policy = &getxattr_policy;
784 policy = &lookup_policy;
/* Remember the import generation to detect eviction across resends. */
787 generation = obddev->u.cli.cl_import->imp_generation;
791 /* The only way right now is FLOCK. */
792 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
794 res_id.name[3] = LDLM_FLOCK;
795 } else if (it->it_op & IT_OPEN) {
796 req = mdc_intent_open_pack(exp, it, op_data);
797 } else if (it->it_op & IT_UNLINK) {
798 req = mdc_intent_unlink_pack(exp, it, op_data);
799 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
800 req = mdc_intent_getattr_pack(exp, it, op_data);
801 } else if (it->it_op & IT_READDIR) {
802 req = mdc_enqueue_pack(exp, 0);
803 } else if (it->it_op & IT_LAYOUT) {
804 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
806 req = mdc_intent_layout_pack(exp, it, op_data);
807 lvb_type = LVB_T_LAYOUT;
808 } else if (it->it_op & IT_GETXATTR) {
809 req = mdc_intent_getxattr_pack(exp, it, op_data);
816 RETURN(PTR_ERR(req));
/* Pin the generation so a resent request is rejected after eviction. */
819 req->rq_generation_set = 1;
820 req->rq_import_generation = generation;
821 req->rq_sent = ktime_get_real_seconds() + resends;
824 /* It is important to obtain modify RPC slot first (if applicable), so
825 * that threads that are waiting for a modify RPC slot are not polluting
826 * our rpcs in flight counter.
827 * We do not do flock request limiting, though */
829 mdc_get_mod_rpc_slot(req, it);
830 rc = obd_get_request_slot(&obddev->u.cli);
832 mdc_put_mod_rpc_slot(req, it);
833 mdc_clear_replay_flag(req, 0);
834 ptlrpc_req_finished(req);
839 /* With Data-on-MDT the glimpse callback is needed too.
840 * It is set here in advance but not in mdc_finish_enqueue()
841 * to avoid possible races. It is safe to have glimpse handler
842 * for non-DOM locks and costs nothing.*/
843 if (einfo->ei_cb_gl == NULL)
844 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
846 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
847 0, lvb_type, lockh, 0);
849 /* For flock requests we immediatelly return without further
850 delay and let caller deal with the rest, since rest of
851 this function metadata processing makes no sense for flock
852 requests anyway. But in case of problem during comms with
853 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
854 can not rely on caller and this mainly for F_UNLCKs
855 (explicits or automatically generated by Kernel to clean
856 current FLocks upon exit) that can't be trashed */
857 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
858 (einfo->ei_type == LDLM_FLOCK) &&
859 (einfo->ei_mode == LCK_NL))
/* Release both throttling slots now that the RPC completed. */
864 obd_put_request_slot(&obddev->u.cli);
865 mdc_put_mod_rpc_slot(req, it);
869 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
870 obddev->obd_name, PFID(&op_data->op_fid1),
871 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
873 mdc_clear_replay_flag(req, rc);
874 ptlrpc_req_finished(req);
878 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
879 LASSERT(lockrep != NULL);
/* Convert the intent status from wire (network) byte order/encoding. */
881 lockrep->lock_policy_res2 =
882 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
884 /* Retry infinitely when the server returns -EINPROGRESS for the
885 * intent operation, when server returns -EINPROGRESS for acquiring
886 * intent lock, we'll retry in after_reply(). */
887 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
888 mdc_clear_replay_flag(req, rc);
889 ptlrpc_req_finished(req);
892 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
893 obddev->obd_name, resends, it->it_op,
894 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
896 if (generation == obddev->u.cli.cl_import->imp_generation) {
899 CDEBUG(D_HA, "resend cross eviction\n");
904 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed: drop any granted lock and the request. */
906 if (lustre_handle_is_used(lockh)) {
907 ldlm_lock_decref(lockh, einfo->ei_mode);
908 memset(lockh, 0, sizeof(*lockh));
910 ptlrpc_req_finished(req);
/* Clear intent state so the caller does not touch stale info. */
912 it->it_lock_handle = 0;
913 it->it_lock_mode = 0;
914 it->it_request = NULL;
/*
 * mdc_enqueue() - public intent-less wrapper around mdc_enqueue_base()
 * (passes a NULL lookup_intent; the caller supplies the policy).
 */
920 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
921 const union ldlm_policy_data *policy,
922 struct md_op_data *op_data,
923 struct lustre_handle *lockh, __u64 extra_lock_flags)
925 return mdc_enqueue_base(exp, einfo, policy, NULL,
926 op_data, lockh, extra_lock_flags);
/*
 * mdc_finish_intent_lock() - interpret the intent result for the upper
 * (llite) layer: propagate intent errors, pin the request for the
 * create/open phases (extra refs balanced in ll_create_node /
 * ll_file_open), sanity-check the reply fid against the lock resource,
 * and collapse duplicate locks by matching against an existing one.
 * NOTE(review): several interior lines (returns, else branches) are
 * elided in this chunk; comments describe visible statements only.
 */
929 static int mdc_finish_intent_lock(struct obd_export *exp,
930 struct ptlrpc_request *request,
931 struct md_op_data *op_data,
932 struct lookup_intent *it,
933 struct lustre_handle *lockh)
935 struct lustre_handle old_lock;
936 struct ldlm_lock *lock;
940 LASSERT(request != NULL);
941 LASSERT(request != LP_POISON);
942 LASSERT(request->rq_repmsg != LP_POISON);
/* Readdir has no intent post-processing to do. */
944 if (it->it_op & IT_READDIR)
947 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
948 if (it->it_status != 0)
949 GOTO(out, rc = it->it_status);
951 if (!it_disposition(it, DISP_IT_EXECD)) {
952 /* The server failed before it even started executing
953 * the intent, i.e. because it couldn't unpack the
956 LASSERT(it->it_status != 0);
957 GOTO(out, rc = it->it_status);
959 rc = it_open_error(DISP_IT_EXECD, it);
963 rc = it_open_error(DISP_LOOKUP_EXECD, it);
967 /* keep requests around for the multiple phases of the call
968 * this shows the DISP_XX must guarantee we make it into the
971 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
972 it_disposition(it, DISP_OPEN_CREATE) &&
973 !it_open_error(DISP_OPEN_CREATE, it)) {
974 it_set_disposition(it, DISP_ENQ_CREATE_REF);
975 /* balanced in ll_create_node */
976 ptlrpc_request_addref(request);
978 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
979 it_disposition(it, DISP_OPEN_OPEN) &&
980 !it_open_error(DISP_OPEN_OPEN, it)) {
981 it_set_disposition(it, DISP_ENQ_OPEN_REF);
982 /* balanced in ll_file_open */
983 ptlrpc_request_addref(request);
984 /* BUG 11546 - eviction in the middle of open rpc
987 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
991 if (it->it_op & IT_CREAT) {
992 /* XXX this belongs in ll_create_it */
993 } else if (it->it_op == IT_OPEN) {
994 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
996 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1000 /* If we already have a matching lock, then cancel the new
1001 * one. We have to set the data here instead of in
1002 * mdc_enqueue, because we need to use the child's inode as
1003 * the l_ast_data to match, and that's not available until
1004 * intent_finish has performed the iget().) */
1005 lock = ldlm_handle2lock(lockh);
1007 union ldlm_policy_data policy = lock->l_policy_data;
1008 LDLM_DEBUG(lock, "matching against this");
1010 if (it_has_reply_body(it)) {
1011 struct mdt_body *body;
1013 body = req_capsule_server_get(&request->rq_pill,
1015 /* mdc_enqueue checked */
1016 LASSERT(body != NULL);
/* The fid in the reply body must name the same resource
 * the lock was granted on. */
1017 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1018 &lock->l_resource->lr_name),
1019 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1020 PLDLMRES(lock->l_resource),
1021 PFID(&body->mbo_fid1));
1023 LDLM_LOCK_PUT(lock);
1025 memcpy(&old_lock, lockh, sizeof(*lockh));
/* An equivalent granted lock already exists: cancel the new one
 * and hand back the old handle instead. */
1026 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1027 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1028 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1029 memcpy(lockh, &old_lock, sizeof(old_lock));
1030 it->it_lock_handle = lockh->cookie;
1036 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1037 (int)op_data->op_namelen, op_data->op_name,
1038 ldlm_it2str(it->it_op), it->it_status,
1039 it->it_disposition, rc);
/*
 * mdc_revalidate_lock() - check whether we still hold a lock covering
 * the bits the intent needs on @fid.  If the intent carries a lock
 * handle it is revalidated directly; otherwise an inodebits policy is
 * chosen per it->it_op and matched against granted locks.  On success
 * the intent's lock handle/mode are refreshed; on failure they are
 * cleared.
 */
1043 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1044 struct lu_fid *fid, __u64 *bits)
1046 /* We could just return 1 immediately, but since we should only
1047 * be called in revalidate_it if we already have a lock, let's
1049 struct ldlm_res_id res_id;
1050 struct lustre_handle lockh;
1051 union ldlm_policy_data policy;
1052 enum ldlm_mode mode;
1055 if (it->it_lock_handle) {
1056 lockh.cookie = it->it_lock_handle;
1057 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1059 fid_build_reg_res_name(fid, &res_id);
1060 switch (it->it_op) {
1062 /* File attributes are held under multiple bits:
1063 * nlink is under lookup lock, size and times are
1064 * under UPDATE lock and recently we've also got
1065 * a separate permissions lock for owner/group/acl that
1066 * were protected by lookup lock before.
1067 * Getattr must provide all of that information,
1068 * so we need to ensure we have all of those locks.
1069 * Unfortunately, if the bits are split across multiple
1070 * locks, there's no easy way to match all of them here,
1071 * so an extra RPC would be performed to fetch all
1072 * of those bits at once for now. */
1073 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1074 * but for old MDTs (< 2.4), permission is covered
1075 * by LOOKUP lock, so it needs to match all bits here.*/
1076 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1077 MDS_INODELOCK_LOOKUP |
/* NOTE(review): case labels and break statements in this switch
 * are elided in this chunk. */
1081 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1084 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1087 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any read/write mode already granted on the resource. */
1091 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1092 LDLM_IBITS, &policy,
1093 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1098 it->it_lock_handle = lockh.cookie;
1099 it->it_lock_mode = mode;
/* No matching lock: invalidate the intent's cached lock state. */
1101 it->it_lock_handle = 0;
1102 it->it_lock_mode = 0;
1109 * This long block is all about fixing up the lock and request state
1110 * so that it is correct as of the moment _before_ the operation was
1111 * applied; that way, the VFS will think that everything is normal and
1112 * call Lustre's regular VFS methods.
1114 * If we're performing a creation, that means that unless the creation
1115 * failed with EEXIST, we should fake up a negative dentry.
1117 * For everything else, we want to lookup to succeed.
1119 * One additional note: if CREATE or OPEN succeeded, we add an extra
1120 * reference to the request because we need to keep it around until
1121 * ll_create/ll_open gets called.
1123 * The server will return to us, in it_disposition, an indication of
1124 * exactly what it_status refers to.
1126 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1127 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1128 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1129 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1132 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1135 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1136 struct lookup_intent *it, struct ptlrpc_request **reqp,
1137 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1139 struct ldlm_enqueue_info einfo = {
1140 .ei_type = LDLM_IBITS,
1141 .ei_mode = it_to_lock_mode(it),
1142 .ei_cb_bl = cb_blocking,
1143 .ei_cb_cp = ldlm_completion_ast,
1144 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1146 struct lustre_handle lockh;
1151 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1152 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1153 op_data->op_name, PFID(&op_data->op_fid2),
1154 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidate path: if we know the child fid, try to satisfy the
 * intent from an already-granted lock without an RPC. */
1158 if (fid_is_sane(&op_data->op_fid2) &&
1159 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1160 /* We could just return 1 immediately, but since we should only
1161 * be called in revalidate_it if we already have a lock, let's
1163 it->it_lock_handle = 0;
1164 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1165 /* Only return failure if it was not GETATTR by cfid
1166 (from inode_revalidate) */
1167 if (rc || op_data->op_namelen != 0)
1171 /* For case if upper layer did not alloc fid, do it now. */
1172 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1173 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1175 CERROR("Can't alloc new fid, rc %d\n", rc);
1180 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply request to the caller, then post-process it. */
1185 *reqp = it->it_request;
1186 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * mdc_intent_getattr_async_interpret() - ptlrpc interpret callback for
 * the async getattr enqueue: releases the request slot, finishes the
 * LDLM enqueue, runs the usual enqueue/intent post-processing, and
 * finally invokes the caller's mi_cb with the result.
 * NOTE(review): 'it' assignment and some early-exit paths are elided in
 * this chunk.
 */
1190 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1191 struct ptlrpc_request *req,
1194 struct mdc_getattr_args *ga = args;
1195 struct obd_export *exp = ga->ga_exp;
1196 struct md_enqueue_info *minfo = ga->ga_minfo;
1197 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1198 struct lookup_intent *it;
1199 struct lustre_handle *lockh;
1200 struct obd_device *obddev;
1201 struct ldlm_reply *lockrep;
1202 __u64 flags = LDLM_FL_HAS_INTENT;
1206 lockh = &minfo->mi_lockh;
1208 obddev = class_exp2obd(exp);
/* Balance the slot taken in mdc_intent_getattr_async(). */
1210 obd_put_request_slot(&obddev->u.cli);
1211 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1214 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1215 &flags, NULL, 0, lockh, rc);
1217 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1218 mdc_clear_replay_flag(req, rc);
1222 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1223 LASSERT(lockrep != NULL);
/* Convert the intent status from wire encoding. */
1225 lockrep->lock_policy_res2 =
1226 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1228 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1232 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Deliver the final result to the async caller. */
1236 minfo->mi_cb(req, minfo, rc);
1240 int mdc_intent_getattr_async(struct obd_export *exp,
1241 struct md_enqueue_info *minfo)
1243 struct md_op_data *op_data = &minfo->mi_data;
1244 struct lookup_intent *it = &minfo->mi_it;
1245 struct ptlrpc_request *req;
1246 struct mdc_getattr_args *ga;
1247 struct obd_device *obddev = class_exp2obd(exp);
1248 struct ldlm_res_id res_id;
1249 union ldlm_policy_data policy = {
1250 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1251 MDS_INODELOCK_UPDATE } };
1253 __u64 flags = LDLM_FL_HAS_INTENT;
1256 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1257 (int)op_data->op_namelen, op_data->op_name,
1258 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1260 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1261 req = mdc_intent_getattr_pack(exp, it, op_data);
1263 RETURN(PTR_ERR(req));
1265 rc = obd_get_request_slot(&obddev->u.cli);
1267 ptlrpc_req_finished(req);
1271 /* With Data-on-MDT the glimpse callback is needed too.
1272 * It is set here in advance but not in mdc_finish_enqueue()
1273 * to avoid possible races. It is safe to have glimpse handler
1274 * for non-DOM locks and costs nothing.*/
1275 if (minfo->mi_einfo.ei_cb_gl == NULL)
1276 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1278 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1279 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1281 obd_put_request_slot(&obddev->u.cli);
1282 ptlrpc_req_finished(req);
1286 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1287 ga = ptlrpc_req_async_args(req);
1289 ga->ga_minfo = minfo;
1291 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1292 ptlrpcd_add_req(req);