4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Per-request context for an asynchronous getattr intent RPC; stashed in
 * rq_async_args so the interpret callback can recover its state. */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp; /* export the RPC was issued on */
52 struct md_enqueue_info *ga_minfo; /* caller's enqueue info (lock handle, callback) */
/* Report the error status of an open intent for the given execution @phase.
 * The server records how far it got in it->it_disposition; the checks walk
 * the dispositions from latest (lease) to earliest (intent execd), and each
 * branch presumably reports it->it_status once the server's progress reaches
 * @phase (the per-branch return lines are elided in this excerpt — TODO
 * confirm against the full source). */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* Reaching here means no known disposition matched: log for diagnosis. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach an inode (@data) to the DLM resource behind @lockh and optionally
 * report the lock's inodebits via @bits.  If the resource already points at
 * a different inode, it must be one being freed (I_FREEING) — anything else
 * indicates two live inodes claiming the same resource, hence the LASSERTF. */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused (zero-cookie) handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
/* Resource + lock spinlocks held while swapping lr_lvb_inode. */
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/* Look for an already-granted local DLM lock on @fid matching @type/@policy/
 * @mode.  On a match the handle is stored in @lockh and the granted mode is
 * returned (via rc; return line elided in this excerpt). */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused (no active references) locks on @fid's resource that
 * match @policy/@mode, e.g. before reusing or invalidating the object.
 * @opaque is passed through to the cancel iterator for caller filtering. */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/* Detach the cached inode pointer from @fid's DLM resource (typically when
 * the inode is being torn down) so no lock keeps a stale lr_lvb_inode. */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create = 0); the no-resource early-return is elided here. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/* Drop the replay flag from a request that failed (@rc != 0 or caller says
 * so) so it is not queued for recovery replay.  A non-zero transno on an
 * error reply is unexpected and is logged loudly. */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
/* rq_replay is protected by rq_lock; clearing line elided in excerpt. */
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/* Copy @size bytes of EA data (@data) into the @field buffer of @req's
 * request capsule, growing the buffer via sptlrpc if the client-side slot
 * is too small, or shrinking it if larger than needed, so replay carries
 * the exact EA.  See the block comment above for the replay rationale. */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
/* Enlarging may reallocate the whole request buffer; can fail (OOM). */
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
/* Buffer was larger than needed: trim the client-side field. */
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
/* Build an LDLM_INTENT_OPEN request: cancel locks that conflict with the
 * open (child open locks, parent UPDATE lock on create), allocate the RPC,
 * and pack the intent plus the open body, name, EA and security context.
 * Returns the prepared request or ERR_PTR() on failure. */
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Open always operates on a regular file from the lock standpoint. */
261 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
263 /* XXX: openlock is not cancelled for cross-refs. */
264 /* If inode is known, cancel conflicting OPEN locks. */
265 if (fid_is_sane(&op_data->op_fid2)) {
266 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
267 if (it->it_flags & MDS_FMODE_WRITE)
272 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
275 else if (it->it_flags & FMODE_EXEC)
/* Collect the child's cancellable conflicting locks. */
281 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
286 /* If CREATE, cancel parent's UPDATE lock. */
287 if (it->it_op & IT_CREAT)
291 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
293 MDS_INODELOCK_UPDATE);
295 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
296 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before erroring. */
298 ldlm_lock_list_put(&cancels, l_bl_ast, count);
299 RETURN(ERR_PTR(-ENOMEM));
302 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
303 op_data->op_namelen + 1);
304 if (cl_is_lov_delay_create(it->it_flags)) {
305 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
306 LASSERT(lmmsize == 0);
307 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
/* Otherwise reserve room for the caller's LOV EA or the default size. */
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
310 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
313 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
314 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
315 strlen(op_data->op_file_secctx_name) + 1 : 0);
317 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
318 op_data->op_file_secctx_size);
/* Piggy-back the collected cancels on the enqueue request. */
320 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
322 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import is replayable. */
326 spin_lock(&req->rq_lock);
327 req->rq_replay = req->rq_import->imp_replayable;
328 spin_unlock(&req->rq_lock);
330 /* pack the intent */
331 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
332 lit->opc = (__u64)it->it_op;
334 /* pack the intended request */
335 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reply buffers: worst-case EA plus caller-chosen ACL buffer. */
338 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
339 obddev->u.cli.cl_max_mds_easize);
340 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
341 ptlrpc_request_set_replen(req);
345 #define GA_DEFAULT_EA_NAME_LEN 20
346 #define GA_DEFAULT_EA_VAL_LEN 250
347 #define GA_DEFAULT_EA_NUM 10
/* Build an LDLM_INTENT_GETXATTR request sized with heuristic defaults
 * (GA_DEFAULT_EA_*) for names, values and length array in the reply.
 * Returns the prepared request or ERR_PTR(-ENOMEM). */
349 static struct ptlrpc_request *
350 mdc_intent_getxattr_pack(struct obd_export *exp,
351 struct lookup_intent *it,
352 struct md_op_data *op_data)
354 struct ptlrpc_request *req;
355 struct ldlm_intent *lit;
357 struct list_head cancels = LIST_HEAD_INIT(cancels);
361 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
362 &RQF_LDLM_INTENT_GETXATTR);
364 RETURN(ERR_PTR(-ENOMEM));
366 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
368 ptlrpc_request_free(req);
372 /* pack the intent */
373 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
374 lit->opc = IT_GETXATTR;
376 /* pack the intended request */
377 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
378 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
/* Reply buffers: xattr names, values, and one __u32 length per entry. */
381 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
382 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
384 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
385 GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
387 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
388 sizeof(__u32) * GA_DEFAULT_EA_NUM);
/* No ACL data expected for getxattr. */
390 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
392 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_UNLINK request carrying the victim's name and the
 * unlink body; reserves a default-size EA reply buffer.  Returns the
 * prepared request or ERR_PTR(-ENOMEM). */
397 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
398 struct lookup_intent *it,
399 struct md_op_data *op_data)
401 struct ptlrpc_request *req;
402 struct obd_device *obddev = class_exp2obd(exp);
403 struct ldlm_intent *lit;
407 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408 &RQF_LDLM_INTENT_UNLINK);
410 RETURN(ERR_PTR(-ENOMEM));
412 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
413 op_data->op_namelen + 1);
/* No locks to cancel alongside an unlink intent. */
415 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417 ptlrpc_request_free(req);
421 /* pack the intent */
422 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
423 lit->opc = (__u64)it->it_op;
425 /* pack the intended request */
426 mdc_unlink_pack(req, op_data);
428 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
429 obddev->u.cli.cl_default_mds_easize);
430 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR request asking for attributes, EA, ACL and
 * striping info; sizes the EA reply by the default (preferred) or maximum
 * MDS easize.  @acl_bufsize controls the ACL reply buffer (may be retried
 * larger on -ERANGE by the caller).  Returns request or ERR_PTR(-ENOMEM). */
434 static struct ptlrpc_request *
435 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
436 struct md_op_data *op_data, __u32 acl_bufsize)
438 struct ptlrpc_request *req;
439 struct obd_device *obddev = class_exp2obd(exp);
440 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
441 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
442 OBD_MD_MEA | OBD_MD_FLACL;
443 struct ldlm_intent *lit;
448 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
449 &RQF_LDLM_INTENT_GETATTR);
451 RETURN(ERR_PTR(-ENOMEM));
453 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
454 op_data->op_namelen + 1);
456 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
458 ptlrpc_request_free(req);
462 /* pack the intent */
463 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
464 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default easize; fall back to the maximum. */
466 if (obddev->u.cli.cl_default_mds_easize > 0)
467 easize = obddev->u.cli.cl_default_mds_easize;
469 easize = obddev->u.cli.cl_max_mds_easize;
471 /* pack the intended request */
472 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
474 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
475 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
476 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_LAYOUT request.  The caller supplies a fully formed
 * struct layout_intent in op_data->op_data, which is copied verbatim into
 * the request; the LVB reply buffer is sized by the default MDS easize.
 * Returns the prepared request or ERR_PTR(-ENOMEM). */
480 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
481 struct lookup_intent *it,
482 struct md_op_data *op_data)
484 struct obd_device *obd = class_exp2obd(exp);
485 struct ptlrpc_request *req;
486 struct ldlm_intent *lit;
487 struct layout_intent *layout;
491 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
492 &RQF_LDLM_INTENT_LAYOUT);
494 RETURN(ERR_PTR(-ENOMEM));
496 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
497 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
499 ptlrpc_request_free(req);
503 /* pack the intent */
504 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
505 lit->opc = (__u64)it->it_op;
507 /* pack the layout intent request */
508 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
509 LASSERT(op_data->op_data != NULL);
510 LASSERT(op_data->op_data_size == sizeof(*layout));
511 memcpy(layout, op_data->op_data, sizeof(*layout));
513 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
514 obd->u.cli.cl_default_mds_easize);
515 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM enqueue request with an LVB reply
 * buffer of @lvb_len bytes.  Returns request or ERR_PTR(-ENOMEM). */
519 static struct ptlrpc_request *
520 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
522 struct ptlrpc_request *req;
526 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
528 RETURN(ERR_PTR(-ENOMEM));
530 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
532 ptlrpc_request_free(req);
536 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
537 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue: fix up lock state/mode, copy the
 * server's disposition and status into @it, manage replay flags, swab and
 * validate the reply body, stash LOV/layout EA for replay, and install
 * layout LVB / Data-on-MDT size data on the granted lock.  @rc is the
 * enqueue result (ELDLM_LOCK_ABORTED means intent-only, no lock granted). */
541 static int mdc_finish_enqueue(struct obd_export *exp,
542 struct ptlrpc_request *req,
543 struct ldlm_enqueue_info *einfo,
544 struct lookup_intent *it,
545 struct lustre_handle *lockh,
548 struct req_capsule *pill = &req->rq_pill;
549 struct ldlm_request *lockreq;
550 struct ldlm_reply *lockrep;
551 struct ldlm_lock *lock;
552 struct mdt_body *body = NULL;
553 void *lvb_data = NULL;
559 /* Similarly, if we're going to replay this request, we don't want to
560 * actually get a lock, just perform the intent. */
561 if (req->rq_transno || req->rq_replay) {
562 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
563 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
566 if (rc == ELDLM_LOCK_ABORTED) {
/* Intent executed but no lock granted: hand back an empty handle. */
568 memset(lockh, 0, sizeof(*lockh));
570 } else { /* rc = 0 */
571 lock = ldlm_handle2lock(lockh);
572 LASSERT(lock != NULL);
574 /* If the server gave us back a different lock mode, we should
575 * fix up our variables. */
576 if (lock->l_req_mode != einfo->ei_mode) {
577 ldlm_lock_addref(lockh, lock->l_req_mode);
578 ldlm_lock_decref(lockh, einfo->ei_mode);
579 einfo->ei_mode = lock->l_req_mode;
584 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
585 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Server encodes intent outcome in the lock reply's policy results. */
587 it->it_disposition = (int)lockrep->lock_policy_res1;
588 it->it_status = (int)lockrep->lock_policy_res2;
589 it->it_lock_mode = einfo->ei_mode;
590 it->it_lock_handle = lockh->cookie;
591 it->it_request = req;
593 /* Technically speaking rq_transno must already be zero if
594 * it_status is in error, so the check is a bit redundant */
595 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
596 mdc_clear_replay_flag(req, it->it_status);
598 /* If we're doing an IT_OPEN which did not result in an actual
599 * successful open, then we need to remove the bit which saves
600 * this request for unconditional replay.
602 * It's important that we do this first! Otherwise we might exit the
603 * function without doing so, and try to replay a failed create
605 if (it->it_op & IT_OPEN && req->rq_replay &&
606 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
607 mdc_clear_replay_flag(req, it->it_status);
609 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
610 it->it_op, it->it_disposition, it->it_status);
612 /* We know what to expect, so we do any byte flipping required here */
613 if (it_has_reply_body(it)) {
614 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
616 CERROR ("Can't swab mdt_body\n");
620 if (it_disposition(it, DISP_OPEN_OPEN) &&
621 !it_open_error(DISP_OPEN_OPEN, it)) {
623 * If this is a successful OPEN request, we need to set
624 * replay handler and data early, so that if replay
625 * happens immediately after swabbing below, new reply
626 * is swabbed by that handler correctly.
628 mdc_set_open_replay_data(NULL, NULL, it);
631 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
634 mdc_update_max_ea_from_body(exp, body);
637 * The eadata is opaque; just check that it is there.
638 * Eventually, obd_unpackmd() will check the contents.
640 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
641 body->mbo_eadatasize);
645 /* save lvb data and length in case this is for layout
648 lvb_len = body->mbo_eadatasize;
651 * We save the reply LOV EA in case we have to replay a
652 * create for recovery. If we didn't allocate a large
653 * enough request buffer above we need to reallocate it
654 * here to hold the actual LOV EA.
656 * To not save LOV EA if request is not going to replay
657 * (for example error one).
659 if ((it->it_op & IT_OPEN) && req->rq_replay) {
660 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
661 body->mbo_eadatasize);
/* On save failure, drop the EA from the body so replay
 * does not reference data we could not preserve. */
663 body->mbo_valid &= ~OBD_MD_FLEASIZE;
664 body->mbo_eadatasize = 0;
669 } else if (it->it_op & IT_LAYOUT) {
670 /* maybe the lock was granted right away and layout
671 * is packed into RMF_DLM_LVB of req */
672 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
674 lvb_data = req_capsule_server_sized_get(pill,
675 &RMF_DLM_LVB, lvb_len);
676 if (lvb_data == NULL)
680 * save replied layout data to the request buffer for
681 * recovery consideration (lest MDS reinitialize
682 * another set of OST objects).
/* Best-effort: the return code is deliberately ignored here. */
685 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
690 /* fill in stripe data for layout lock.
691 * LU-6581: trust layout data only if layout lock is granted. The MDT
692 * has stopped sending layout unless the layout lock is granted. The
693 * client still does this checking in case it's talking with an old
694 * server. - Jinshan */
695 lock = ldlm_handle2lock(lockh);
699 if (ldlm_has_layout(lock) && lvb_data != NULL &&
700 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
703 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
704 ldlm_it2str(it->it_op), lvb_len);
706 OBD_ALLOC_LARGE(lmm, lvb_len);
708 GOTO(out_lock, rc = -ENOMEM);
710 memcpy(lmm, lvb_data, lvb_len);
712 /* install lvb_data */
713 lock_res_and_lock(lock);
714 if (lock->l_lvb_data == NULL) {
715 lock->l_lvb_type = LVB_T_LAYOUT;
716 lock->l_lvb_data = lmm;
717 lock->l_lvb_len = lvb_len;
720 unlock_res_and_lock(lock);
/* Lost the race: someone installed an LVB first; free our copy. */
722 OBD_FREE_LARGE(lmm, lvb_len);
725 if (ldlm_has_dom(lock)) {
726 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
728 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
729 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
730 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
731 exp->exp_obd->obd_name);
732 GOTO(out_lock, rc = -EPROTO);
735 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
736 ldlm_it2str(it->it_op), body->mbo_dom_size);
738 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
746 /* We always reserve enough space in the reply packet for a stripe MD, because
747 * we don't know in advance the file type. */
/* Core enqueue path: choose the inodebits policy for the intent, pack the
 * matching intent request (open/unlink/getattr/readdir/layout/getxattr or
 * plain flock), acquire RPC slots, run ldlm_cli_enqueue(), and handle the
 * retry cases (-EINPROGRESS resend loop, -ERANGE larger-ACL retry) before
 * finishing via mdc_finish_enqueue().  On failure the lock reference is
 * dropped and @it's lock fields are cleared. */
748 static int mdc_enqueue_base(struct obd_export *exp,
749 struct ldlm_enqueue_info *einfo,
750 const union ldlm_policy_data *policy,
751 struct lookup_intent *it,
752 struct md_op_data *op_data,
753 struct lustre_handle *lockh,
754 __u64 extra_lock_flags)
756 struct obd_device *obddev = class_exp2obd(exp);
757 struct ptlrpc_request *req = NULL;
758 __u64 flags, saved_flags = extra_lock_flags;
759 struct ldlm_res_id res_id;
760 static const union ldlm_policy_data lookup_policy = {
761 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
762 static const union ldlm_policy_data update_policy = {
763 .l_inodebits = { MDS_INODELOCK_UPDATE } };
764 static const union ldlm_policy_data layout_policy = {
765 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
766 static const union ldlm_policy_data getxattr_policy = {
767 .l_inodebits = { MDS_INODELOCK_XATTR } };
768 int generation, resends = 0;
769 struct ldlm_reply *lockrep;
770 struct obd_import *imp = class_exp2cliimp(exp);
772 enum lvb_type lvb_type = 0;
776 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
778 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived from it_op, never caller-given. */
781 LASSERT(policy == NULL);
783 saved_flags |= LDLM_FL_HAS_INTENT;
784 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
785 policy = &update_policy;
786 else if (it->it_op & IT_LAYOUT)
787 policy = &layout_policy;
788 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
789 policy = &getxattr_policy;
791 policy = &lookup_policy;
/* Remember import generation to detect eviction across resends. */
794 generation = obddev->u.cli.cl_import->imp_generation;
/* Creates may return a large default ACL; others start small and retry. */
795 if (!it || (it->it_op & (IT_CREAT | IT_OPEN_CREAT)))
796 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
798 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
803 /* The only way right now is FLOCK. */
804 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
806 res_id.name[3] = LDLM_FLOCK;
807 } else if (it->it_op & IT_OPEN) {
808 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
809 } else if (it->it_op & IT_UNLINK) {
810 req = mdc_intent_unlink_pack(exp, it, op_data);
811 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
812 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
813 } else if (it->it_op & IT_READDIR) {
814 req = mdc_enqueue_pack(exp, 0);
815 } else if (it->it_op & IT_LAYOUT) {
816 if (!imp_connect_lvb_type(imp))
818 req = mdc_intent_layout_pack(exp, it, op_data);
819 lvb_type = LVB_T_LAYOUT;
820 } else if (it->it_op & IT_GETXATTR) {
821 req = mdc_intent_getxattr_pack(exp, it, op_data);
828 RETURN(PTR_ERR(req));
/* Delay resends so the server sees monotonically later send times. */
831 req->rq_generation_set = 1;
832 req->rq_import_generation = generation;
833 req->rq_sent = ktime_get_real_seconds() + resends;
836 /* It is important to obtain modify RPC slot first (if applicable), so
837 * that threads that are waiting for a modify RPC slot are not polluting
838 * our rpcs in flight counter.
839 * We do not do flock request limiting, though */
841 mdc_get_mod_rpc_slot(req, it);
842 rc = obd_get_request_slot(&obddev->u.cli);
844 mdc_put_mod_rpc_slot(req, it);
845 mdc_clear_replay_flag(req, 0);
846 ptlrpc_req_finished(req);
851 /* With Data-on-MDT the glimpse callback is needed too.
852 * It is set here in advance but not in mdc_finish_enqueue()
853 * to avoid possible races. It is safe to have glimpse handler
854 * for non-DOM locks and costs nothing.*/
855 if (einfo->ei_cb_gl == NULL)
856 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
858 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
859 0, lvb_type, lockh, 0);
861 /* For flock requests we immediatelly return without further
862 delay and let caller deal with the rest, since rest of
863 this function metadata processing makes no sense for flock
864 requests anyway. But in case of problem during comms with
865 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
866 can not rely on caller and this mainly for F_UNLCKs
867 (explicits or automatically generated by Kernel to clean
868 current FLocks upon exit) that can't be trashed */
869 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
870 (einfo->ei_type == LDLM_FLOCK) &&
871 (einfo->ei_mode == LCK_NL))
/* Release both slots now that the enqueue has completed. */
876 obd_put_request_slot(&obddev->u.cli);
877 mdc_put_mod_rpc_slot(req, it);
881 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
882 obddev->obd_name, PFID(&op_data->op_fid1),
883 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
885 mdc_clear_replay_flag(req, rc);
886 ptlrpc_req_finished(req);
890 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
891 LASSERT(lockrep != NULL);
/* Intent status travels in network byte order in policy_res2. */
893 lockrep->lock_policy_res2 =
894 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
896 /* Retry infinitely when the server returns -EINPROGRESS for the
897 * intent operation, when server returns -EINPROGRESS for acquiring
898 * intent lock, we'll retry in after_reply(). */
899 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
900 mdc_clear_replay_flag(req, rc);
901 ptlrpc_req_finished(req);
902 if (generation == obddev->u.cli.cl_import->imp_generation) {
903 if (signal_pending(current))
907 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
908 obddev->obd_name, resends, it->it_op,
909 PFID(&op_data->op_fid1),
910 PFID(&op_data->op_fid2));
913 CDEBUG(D_HA, "resend cross eviction\n");
/* Server ACL was bigger than our buffer: retry once with max size. */
918 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
919 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
920 acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
921 mdc_clear_replay_flag(req, -ERANGE);
922 ptlrpc_req_finished(req);
923 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
927 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Error cleanup: drop the lock ref (if any) and reset intent state. */
929 if (lustre_handle_is_used(lockh)) {
930 ldlm_lock_decref(lockh, einfo->ei_mode);
931 memset(lockh, 0, sizeof(*lockh));
933 ptlrpc_req_finished(req);
935 it->it_lock_handle = 0;
936 it->it_lock_mode = 0;
937 it->it_request = NULL;
/* Public enqueue entry point without an intent (it == NULL): thin wrapper
 * around mdc_enqueue_base(), e.g. for plain flock-type locks. */
943 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
944 const union ldlm_policy_data *policy,
945 struct md_op_data *op_data,
946 struct lustre_handle *lockh, __u64 extra_lock_flags)
948 return mdc_enqueue_base(exp, einfo, policy, NULL,
949 op_data, lockh, extra_lock_flags);
/* Translate the server's intent reply into VFS-visible state: propagate
 * intent errors, pin the request for later ll_create_node()/ll_file_open()
 * phases (DISP_ENQ_*_REF), and if an equivalent lock already exists locally
 * cancel the newly granted one in favour of the old. */
952 static int mdc_finish_intent_lock(struct obd_export *exp,
953 struct ptlrpc_request *request,
954 struct md_op_data *op_data,
955 struct lookup_intent *it,
956 struct lustre_handle *lockh)
958 struct lustre_handle old_lock;
959 struct ldlm_lock *lock;
963 LASSERT(request != NULL);
964 LASSERT(request != LP_POISON);
965 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR enqueues carry no intent result to process. */
967 if (it->it_op & IT_READDIR)
970 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
971 if (it->it_status != 0)
972 GOTO(out, rc = it->it_status);
974 if (!it_disposition(it, DISP_IT_EXECD)) {
975 /* The server failed before it even started executing
976 * the intent, i.e. because it couldn't unpack the
979 LASSERT(it->it_status != 0);
980 GOTO(out, rc = it->it_status);
982 rc = it_open_error(DISP_IT_EXECD, it);
986 rc = it_open_error(DISP_LOOKUP_EXECD, it);
990 /* keep requests around for the multiple phases of the call
991 this shows the DISP_XX must guarantee we make it into the
994 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
995 it_disposition(it, DISP_OPEN_CREATE) &&
996 !it_open_error(DISP_OPEN_CREATE, it)) {
997 it_set_disposition(it, DISP_ENQ_CREATE_REF);
998 /* balanced in ll_create_node */
999 ptlrpc_request_addref(request);
1001 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1002 it_disposition(it, DISP_OPEN_OPEN) &&
1003 !it_open_error(DISP_OPEN_OPEN, it)) {
1004 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1005 /* balanced in ll_file_open */
1006 ptlrpc_request_addref(request);
1007 /* BUG 11546 - eviction in the middle of open rpc
1010 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1014 if (it->it_op & IT_CREAT) {
1015 /* XXX this belongs in ll_create_it */
1016 } else if (it->it_op == IT_OPEN) {
1017 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1019 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1023 /* If we already have a matching lock, then cancel the new
1024 * one. We have to set the data here instead of in
1025 * mdc_enqueue, because we need to use the child's inode as
1026 * the l_ast_data to match, and that's not available until
1027 * intent_finish has performed the iget().) */
1028 lock = ldlm_handle2lock(lockh);
1030 union ldlm_policy_data policy = lock->l_policy_data;
1031 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the reply body's fid must match the lock's resource. */
1033 if (it_has_reply_body(it)) {
1034 struct mdt_body *body;
1036 body = req_capsule_server_get(&request->rq_pill,
1038 /* mdc_enqueue checked */
1039 LASSERT(body != NULL);
1040 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1041 &lock->l_resource->lr_name),
1042 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1043 PLDLMRES(lock->l_resource),
1044 PFID(&body->mbo_fid1));
1046 LDLM_LOCK_PUT(lock);
1048 memcpy(&old_lock, lockh, sizeof(*lockh));
1049 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1050 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Prefer the pre-existing lock: cancel the new duplicate. */
1051 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1052 memcpy(lockh, &old_lock, sizeof(old_lock));
1053 it->it_lock_handle = lockh->cookie;
1059 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1060 (int)op_data->op_namelen, op_data->op_name,
1061 ldlm_it2str(it->it_op), it->it_status,
1062 it->it_disposition, rc);
/* Check whether we still hold a usable lock for @it on @fid.  Either
 * revalidate the handle already stored in the intent, or match by policy
 * bits derived from it_op.  On success the intent's lock handle/mode are
 * refreshed; on failure they are zeroed. */
1066 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1067 struct lu_fid *fid, __u64 *bits)
1069 /* We could just return 1 immediately, but since we should only
1070 * be called in revalidate_it if we already have a lock, let's
1072 struct ldlm_res_id res_id;
1073 struct lustre_handle lockh;
1074 union ldlm_policy_data policy;
1075 enum ldlm_mode mode;
/* Fast path: intent already remembers a lock handle — revalidate it. */
1078 if (it->it_lock_handle) {
1079 lockh.cookie = it->it_lock_handle;
1080 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1082 fid_build_reg_res_name(fid, &res_id);
1083 switch (it->it_op) {
1085 /* File attributes are held under multiple bits:
1086 * nlink is under lookup lock, size and times are
1087 * under UPDATE lock and recently we've also got
1088 * a separate permissions lock for owner/group/acl that
1089 * were protected by lookup lock before.
1090 * Getattr must provide all of that information,
1091 * so we need to ensure we have all of those locks.
1092 * Unfortunately, if the bits are split across multiple
1093 * locks, there's no easy way to match all of them here,
1094 * so an extra RPC would be performed to fetch all
1095 * of those bits at once for now. */
1096 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1097 * but for old MDTs (< 2.4), permission is covered
1098 * by LOOKUP lock, so it needs to match all bits here.*/
1099 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1100 MDS_INODELOCK_LOOKUP |
1104 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1107 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1110 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any read/write mode is acceptable for revalidation. */
1114 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1115 LDLM_IBITS, &policy,
1116 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1121 it->it_lock_handle = lockh.cookie;
1122 it->it_lock_mode = mode;
1124 it->it_lock_handle = 0;
1125 it->it_lock_mode = 0;
1132 * This long block is all about fixing up the lock and request state
1133 * so that it is correct as of the moment _before_ the operation was
1134 * applied; that way, the VFS will think that everything is normal and
1135 * call Lustre's regular VFS methods.
1137 * If we're performing a creation, that means that unless the creation
1138 * failed with EEXIST, we should fake up a negative dentry.
1140 * For everything else, we want to lookup to succeed.
1142 * One additional note: if CREATE or OPEN succeeded, we add an extra
1143 * reference to the request because we need to keep it around until
1144 * ll_create/ll_open gets called.
1146 * The server will return to us, in it_disposition, an indication of
1147 * exactly what it_status refers to.
1149 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1150 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1151 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1152 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1155 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/* Top-level intent-lock entry used by llite: revalidate an existing lock
 * when possible, allocate a fid for creates if the caller did not, run the
 * full enqueue, and finish by fixing up intent/request state.  See the
 * block comment above for the disposition/status contract. */
1158 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1159 struct lookup_intent *it, struct ptlrpc_request **reqp,
1160 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1162 struct ldlm_enqueue_info einfo = {
1163 .ei_type = LDLM_IBITS,
1164 .ei_mode = it_to_lock_mode(it),
1165 .ei_cb_bl = cb_blocking,
1166 .ei_cb_cp = ldlm_completion_ast,
1167 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1169 struct lustre_handle lockh;
1174 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1175 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1176 op_data->op_name, PFID(&op_data->op_fid2),
1177 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidation shortcut when the target fid is already known. */
1181 if (fid_is_sane(&op_data->op_fid2) &&
1182 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1183 /* We could just return 1 immediately, but since we should only
1184 * be called in revalidate_it if we already have a lock, let's
1186 it->it_lock_handle = 0;
1187 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1188 /* Only return failure if it was not GETATTR by cfid
1189 (from inode_revalidate) */
1190 if (rc || op_data->op_namelen != 0)
1194 /* For case if upper layer did not alloc fid, do it now. */
1195 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1196 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1198 CERROR("Can't alloc new fid, rc %d\n", rc);
1203 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply request back to the caller before post-processing. */
1208 *reqp = it->it_request;
1209 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* ptlrpcd interpret callback for an async getattr enqueue: release the RPC
 * slot, complete the enqueue via ldlm_cli_enqueue_fini(), post-process the
 * intent, and finally invoke the caller's mi_cb with the result. */
1213 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1214 struct ptlrpc_request *req,
1217 struct mdc_getattr_args *ga = args;
1218 struct obd_export *exp = ga->ga_exp;
1219 struct md_enqueue_info *minfo = ga->ga_minfo;
1220 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1221 struct lookup_intent *it;
1222 struct lustre_handle *lockh;
1223 struct obd_device *obddev;
1224 struct ldlm_reply *lockrep;
1225 __u64 flags = LDLM_FL_HAS_INTENT;
1229 lockh = &minfo->mi_lockh;
1231 obddev = class_exp2obd(exp);
/* Balance the slot taken in mdc_intent_getattr_async(). */
1233 obd_put_request_slot(&obddev->u.cli);
1234 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1237 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1238 &flags, NULL, 0, lockh, rc);
1240 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1241 mdc_clear_replay_flag(req, rc);
1245 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1246 LASSERT(lockrep != NULL);
/* Convert the wire-format intent status to host representation. */
1248 lockrep->lock_policy_res2 =
1249 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1251 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1255 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Deliver the final result to the async caller. */
1259 minfo->mi_cb(req, minfo, rc);
1263 int mdc_intent_getattr_async(struct obd_export *exp,
1264 struct md_enqueue_info *minfo)
1266 struct md_op_data *op_data = &minfo->mi_data;
1267 struct lookup_intent *it = &minfo->mi_it;
1268 struct ptlrpc_request *req;
1269 struct mdc_getattr_args *ga;
1270 struct obd_device *obddev = class_exp2obd(exp);
1271 struct ldlm_res_id res_id;
1272 union ldlm_policy_data policy = {
1273 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1274 MDS_INODELOCK_UPDATE } };
1276 __u64 flags = LDLM_FL_HAS_INTENT;
1279 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1280 (int)op_data->op_namelen, op_data->op_name,
1281 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1283 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1284 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1285 * of the async getattr RPC will handle that by itself. */
1286 req = mdc_intent_getattr_pack(exp, it, op_data,
1287 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1289 RETURN(PTR_ERR(req));
1291 rc = obd_get_request_slot(&obddev->u.cli);
1293 ptlrpc_req_finished(req);
1297 /* With Data-on-MDT the glimpse callback is needed too.
1298 * It is set here in advance but not in mdc_finish_enqueue()
1299 * to avoid possible races. It is safe to have glimpse handler
1300 * for non-DOM locks and costs nothing.*/
1301 if (minfo->mi_einfo.ei_cb_gl == NULL)
1302 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1304 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1305 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1307 obd_put_request_slot(&obddev->u.cli);
1308 ptlrpc_req_finished(req);
1312 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1313 ga = ptlrpc_req_async_args(req);
1315 ga->ga_minfo = minfo;
1317 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1318 ptlrpcd_add_req(req);