4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
47 #include "mdc_internal.h"
/* Context carried across an async getattr intent enqueue; unpacked in
 * mdc_intent_getattr_async_interpret(). */
struct mdc_getattr_args {
	struct obd_export	*ga_exp;	/* export the RPC was sent on */
	struct md_enqueue_info	*ga_minfo;	/* caller's enqueue info + lock handle */
/*
 * Return the intent status relevant to the requested open @phase.
 * Dispositions are checked from the latest server-side phase (LEASE)
 * back to the earliest (IT_EXECD); the first phase the server reached
 * determines which status applies to the caller's @phase.
 * NOTE(review): the per-branch return statements are not visible in this
 * view — behavior of each branch inferred from the disposition checks.
 */
int it_open_error(int phase, struct lookup_intent *it)
	if (it_disposition(it, DISP_OPEN_LEASE)) {
		if (phase >= DISP_OPEN_LEASE)
	if (it_disposition(it, DISP_OPEN_OPEN)) {
		if (phase >= DISP_OPEN_OPEN)
	if (it_disposition(it, DISP_OPEN_CREATE)) {
		if (phase >= DISP_OPEN_CREATE)
	if (it_disposition(it, DISP_LOOKUP_EXECD)) {
		if (phase >= DISP_LOOKUP_EXECD)
	if (it_disposition(it, DISP_IT_EXECD)) {
		if (phase >= DISP_IT_EXECD)
	/* No recognized disposition bit — log the raw mask for debugging. */
	CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
EXPORT_SYMBOL(it_open_error);
/* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach inode @data to the lock's resource (lr_lvb_inode) and, if @bits
 * is non-NULL, report the inodebits this lock covers.  If a different
 * inode is already cached it must be in I_FREEING (being evicted).
 */
int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
		      void *data, __u64 *bits)
	struct ldlm_lock *lock;
	struct inode *new_inode = data;
	/* Unused handle: nothing to attach to. */
	if (!lustre_handle_is_used(lockh))
	lock = ldlm_handle2lock(lockh);
	LASSERT(lock != NULL);
	lock_res_and_lock(lock);
	/* A stale inode may still be cached on the resource; it is only
	 * legal if that inode is already on its way out (I_FREEING). */
	if (lock->l_resource->lr_lvb_inode &&
	    lock->l_resource->lr_lvb_inode != data) {
		struct inode *old_inode = lock->l_resource->lr_lvb_inode;
		LASSERTF(old_inode->i_state & I_FREEING,
			 "Found existing inode %p/%lu/%u state %lu in lock: "
			 "setting data to %p/%lu/%u\n", old_inode,
			 old_inode->i_ino, old_inode->i_generation,
			 new_inode, new_inode->i_ino, new_inode->i_generation);
	lock->l_resource->lr_lvb_inode = new_inode;
	*bits = lock->l_policy_data.l_inodebits.bits;
	unlock_res_and_lock(lock);
/*
 * Look for an already-granted local lock on @fid matching @type/@policy
 * and any mode in @mode; on success the handle is stored in @lockh and
 * the matched mode returned (0 if no match).
 */
enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
			      const struct lu_fid *fid, enum ldlm_type type,
			      union ldlm_policy_data *policy,
			      enum ldlm_mode mode, struct lustre_handle *lockh)
	struct ldlm_res_id res_id;
	fid_build_reg_res_name(fid, &res_id);
	/* LU-4405: Clear bits not supported by server */
	policy->l_inodebits.bits &= exp_connect_ibits(exp);
	rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
			     &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks on @fid's resource that match @policy/@mode,
 * delegating to ldlm_cli_cancel_unused_resource().
 */
int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
		      union ldlm_policy_data *policy, enum ldlm_mode mode,
		      enum ldlm_cancel_flags flags, void *opaque)
	struct obd_device *obd = class_exp2obd(exp);
	struct ldlm_res_id res_id;
	fid_build_reg_res_name(fid, &res_id);
	rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
					     policy, mode, flags, opaque);
/*
 * Detach any cached inode pointer from @fid's ldlm resource so the lock
 * no longer references it.  No-op if the resource does not exist.
 */
int mdc_null_inode(struct obd_export *exp,
		   const struct lu_fid *fid)
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
	LASSERTF(ns != NULL, "no namespace passed\n");
	fid_build_reg_res_name(fid, &res_id);
	/* Look up the resource without creating it. */
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	res->lr_lvb_inode = NULL;
	ldlm_resource_putref(res);
/*
 * Drop the replay flag from a request that completed with an error so
 * it is not replayed during recovery; warn if a transno was assigned
 * despite the error.
 */
static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
	/* Don't hold error requests for replay. */
	if (req->rq_replay) {
		spin_lock(&req->rq_lock);
		spin_unlock(&req->rq_lock);
	if (rc && req->rq_transno != 0) {
		DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* Save a large LOV EA into the request buffer so that it is available
 * for replay. We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
 * buffer and may also be difficult to allocate and save a very large
 * request buffer for each open. (bug 5707)
 * OOM here may cause recovery failure if lmm is needed (only for the
 * original open if the MDS crashed just when this client also OOM'd)
 * but this is incredibly unlikely, and questionable whether the client
 * could do MDS recovery under OOM anyways... */
/*
 * @field: the capsule field (e.g. RMF_EADATA) to store @data into
 * @data/@size: EA bytes returned by the server
 * Grows the request buffer if too small, shrinks it if too large, then
 * copies the EA in so replay carries it.
 */
int mdc_save_lovea(struct ptlrpc_request *req,
		   const struct req_msg_field *field,
		   void *data, u32 size)
	struct req_capsule *pill = &req->rq_pill;
	/* Buffer too small for the reply EA: enlarge it in place. */
	if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
		rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
		CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
		       req->rq_export->exp_obd->obd_name,
	/* Buffer larger than needed: trim to the actual EA size. */
	req_capsule_shrink(pill, field, size, RCL_CLIENT);
	req_capsule_set_size(pill, field, RCL_CLIENT, size);
	lmm = req_capsule_client_get(pill, field);
	memcpy(lmm, data, size);
/*
 * Build an LDLM_INTENT_OPEN request: collect conflicting locks to cancel
 * (child OPEN locks if the fid is known, parent UPDATE lock for create),
 * size the name/EA/security-context buffers, mark the request replayable,
 * and pack the ldlm intent plus the open body.
 * Returns the prepared request or ERR_PTR on allocation failure.
 */
static struct ptlrpc_request *
mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
		     struct md_op_data *op_data)
	struct ptlrpc_request *req;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_intent *lit;
	const void *lmm = op_data->op_data;
	__u32 lmmsize = op_data->op_data_size;
	struct list_head cancels = LIST_HEAD_INIT(cancels);
	/* Force a regular-file type into the create mode bits. */
	it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
	/* XXX: openlock is not cancelled for cross-refs. */
	/* If inode is known, cancel conflicting OPEN locks. */
	if (fid_is_sane(&op_data->op_fid2)) {
		if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
			if (it->it_flags & FMODE_WRITE)
		if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
		else if (it->it_flags & FMODE_EXEC)
		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
	/* If CREATE, cancel parent's UPDATE lock. */
	if (it->it_op & IT_CREAT)
		count += mdc_resource_get_unused(exp, &op_data->op_fid1,
						 MDS_INODELOCK_UPDATE);
	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_OPEN);
		/* Allocation failed: release the collected cancel list. */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(ERR_PTR(-ENOMEM));
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     max(lmmsize, obddev->u.cli.cl_default_mds_easize));
	req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
			     RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
			     strlen(op_data->op_file_secctx_name) + 1 : 0);
	req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
			     op_data->op_file_secctx_size);
	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
		ptlrpc_request_free(req);
	/* Opens are replayed unconditionally if the import is replayable. */
	spin_lock(&req->rq_lock);
	req->rq_replay = req->rq_import->imp_replayable;
	spin_unlock(&req->rq_lock);
	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;
	/* pack the intended request */
	mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_max_mds_easize);
	req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
			     req->rq_import->imp_connect_data.ocd_max_easize);
	ptlrpc_request_set_replen(req);
/* Default reply-buffer sizing hints for the getxattr intent: expected
 * xattr name length, value length, and number of xattrs per inode. */
#define GA_DEFAULT_EA_NAME_LEN 20
#define GA_DEFAULT_EA_VAL_LEN 250
#define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM_INTENT_GETXATTR request sized by the GA_DEFAULT_EA_*
 * heuristics; for pre-2.10.1 servers, inflate the buffers to
 * ocd_max_easize to avoid a server oops on small listxattr buffers
 * (LU-9856).  Returns the prepared request or ERR_PTR(-ENOMEM).
 */
static struct ptlrpc_request *
mdc_intent_getxattr_pack(struct obd_export *exp,
			 struct lookup_intent *it,
			 struct md_op_data *op_data)
	struct ptlrpc_request *req;
	struct ldlm_intent *lit;
	struct list_head cancels = LIST_HEAD_INIT(cancels);
	u32 min_buf_size = 0;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETXATTR);
		RETURN(ERR_PTR(-ENOMEM));
	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
		ptlrpc_request_free(req);
	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = IT_GETXATTR;
#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
	/* If the supplied buffer is too small then the server will
	 * return -ERANGE and llite will fallback to using non cached
	 * xattr operations. On servers before 2.10.1 a (non-cached)
	 * listxattr RPC for an orphan or dead file causes an oops. So
	 * let's try to avoid sending too small a buffer to too old a
	 * server. This is effectively undoing the memory conservation
	 * of LU-9417 when it would be *more* likely to crash the
	 * server. See LU-9856. */
	if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
		min_buf_size = exp->exp_connect_data.ocd_max_easize;
	/* pack the intended request */
	mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
		      max_t(u32, min_buf_size,
			    GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM),
	/* Reply buffers: names, values, and per-xattr value lengths. */
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
			     max_t(u32, min_buf_size,
				   GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM));
	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
			     max_t(u32, min_buf_size,
				   GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM));
	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
			     max_t(u32, min_buf_size,
				   sizeof(__u32) * GA_DEFAULT_EA_NUM));
	req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
	ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: size the name buffer, pack the
 * intent opcode and the unlink body, and reserve reply space for the
 * default MDS EA size.  Returns the request or ERR_PTR(-ENOMEM).
 */
static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *op_data)
	struct ptlrpc_request *req;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_intent *lit;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_UNLINK);
		RETURN(ERR_PTR(-ENOMEM));
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
		ptlrpc_request_free(req);
	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;
	/* pack the intended request */
	mdc_unlink_pack(req, op_data);
	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_default_mds_easize);
	ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request asking for attributes, EA/stripe
 * info and ACLs; reply EA space uses the default MDS easize when set,
 * otherwise the maximum.  Returns the request or ERR_PTR(-ENOMEM).
 */
static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
						      struct lookup_intent *it,
						      struct md_op_data *op_data)
	struct ptlrpc_request *req;
	struct obd_device *obddev = class_exp2obd(exp);
	/* Everything getattr needs: attrs, EA, dir EA, mea, ACL. */
	u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
		    OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
		    OBD_MD_MEA | OBD_MD_FLACL;
	struct ldlm_intent *lit;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETATTR);
		RETURN(ERR_PTR(-ENOMEM));
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
		ptlrpc_request_free(req);
	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;
	/* Prefer the default easize when configured; fall back to max. */
	if (obddev->u.cli.cl_default_mds_easize > 0)
		easize = obddev->u.cli.cl_default_mds_easize;
		easize = obddev->u.cli.cl_max_mds_easize;
	/* pack the intended request */
	mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
	req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
			     req->rq_import->imp_connect_data.ocd_max_easize);
	ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request.  The caller supplies the
 * layout_intent in op_data->op_data (size must match exactly); it is
 * copied verbatim into the request, and LVB reply space is reserved.
 */
static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *op_data)
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct ldlm_intent *lit;
	struct layout_intent *layout;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_LAYOUT);
		RETURN(ERR_PTR(-ENOMEM));
	/* No client EA data is sent with a layout intent. */
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
		ptlrpc_request_free(req);
	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;
	/* pack the layout intent request */
	layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
	LASSERT(op_data->op_data != NULL);
	LASSERT(op_data->op_data_size == sizeof(*layout));
	memcpy(layout, op_data->op_data, sizeof(*layout));
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     obd->u.cli.cl_default_mds_easize);
	ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM_ENQUEUE request (no intent) with @lvb_len bytes of
 * server LVB reply space.  Returns the request or ERR_PTR(-ENOMEM).
 */
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
	struct ptlrpc_request *req;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
		RETURN(ERR_PTR(-ENOMEM));
	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
		ptlrpc_request_free(req);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
	ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue reply:
 *  - mark replayed/transno'd requests INTENT_ONLY so replay does not
 *    re-acquire the lock;
 *  - fix up einfo->ei_mode if the server granted a different mode;
 *  - copy the server disposition/status into @it;
 *  - drop the replay flag for failed opens;
 *  - validate/swab the mdt_body and stash the LOV EA (open) or layout
 *    LVB (IT_LAYOUT) back into the request buffer for replay;
 *  - install the layout LVB on the lock when the layout lock was
 *    actually granted (LU-6581).
 */
static int mdc_finish_enqueue(struct obd_export *exp,
			      struct ptlrpc_request *req,
			      struct ldlm_enqueue_info *einfo,
			      struct lookup_intent *it,
			      struct lustre_handle *lockh,
	struct req_capsule *pill = &req->rq_pill;
	struct ldlm_request *lockreq;
	struct ldlm_reply *lockrep;
	struct ldlm_lock *lock;
	void *lvb_data = NULL;
	/* Similarly, if we're going to replay this request, we don't want to
	 * actually get a lock, just perform the intent. */
	if (req->rq_transno || req->rq_replay) {
		lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
		lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
	if (rc == ELDLM_LOCK_ABORTED) {
		/* Intent executed but no lock granted: clear the handle. */
		memset(lockh, 0, sizeof(*lockh));
	} else { /* rc = 0 */
		lock = ldlm_handle2lock(lockh);
		LASSERT(lock != NULL);
		/* If the server gave us back a different lock mode, we should
		 * fix up our variables. */
		if (lock->l_req_mode != einfo->ei_mode) {
			ldlm_lock_addref(lockh, lock->l_req_mode);
			ldlm_lock_decref(lockh, einfo->ei_mode);
			einfo->ei_mode = lock->l_req_mode;
	lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
	/* Propagate the server's view of the intent into @it. */
	it->it_disposition = (int)lockrep->lock_policy_res1;
	it->it_status = (int)lockrep->lock_policy_res2;
	it->it_lock_mode = einfo->ei_mode;
	it->it_lock_handle = lockh->cookie;
	it->it_request = req;
	/* Technically speaking rq_transno must already be zero if
	 * it_status is in error, so the check is a bit redundant */
	if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
		mdc_clear_replay_flag(req, it->it_status);
	/* If we're doing an IT_OPEN which did not result in an actual
	 * successful open, then we need to remove the bit which saves
	 * this request for unconditional replay.
	 * It's important that we do this first! Otherwise we might exit the
	 * function without doing so, and try to replay a failed create
	 */
	if (it->it_op & IT_OPEN && req->rq_replay &&
	    (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
		mdc_clear_replay_flag(req, it->it_status);
	DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
		  it->it_op, it->it_disposition, it->it_status);
	/* We know what to expect, so we do any byte flipping required here */
	if (it_has_reply_body(it)) {
		struct mdt_body *body;
		body = req_capsule_server_get(pill, &RMF_MDT_BODY);
			CERROR ("Can't swab mdt_body\n");
		if (it_disposition(it, DISP_OPEN_OPEN) &&
		    !it_open_error(DISP_OPEN_OPEN, it)) {
			/*
			 * If this is a successful OPEN request, we need to set
			 * replay handler and data early, so that if replay
			 * happens immediately after swabbing below, new reply
			 * is swabbed by that handler correctly.
			 */
			mdc_set_open_replay_data(NULL, NULL, it);
		if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
			mdc_update_max_ea_from_body(exp, body);
			/*
			 * The eadata is opaque; just check that it is there.
			 * Eventually, obd_unpackmd() will check the contents.
			 */
			eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
							      body->mbo_eadatasize);
			/* save lvb data and length in case this is for layout
			 */
			lvb_len = body->mbo_eadatasize;
			/*
			 * We save the reply LOV EA in case we have to replay a
			 * create for recovery. If we didn't allocate a large
			 * enough request buffer above we need to reallocate it
			 * here to hold the actual LOV EA.
			 * To not save LOV EA if request is not going to replay
			 * (for example error one).
			 */
			if ((it->it_op & IT_OPEN) && req->rq_replay) {
				rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
						    body->mbo_eadatasize);
				/* On save failure, drop the EA from the body. */
				body->mbo_valid &= ~OBD_MD_FLEASIZE;
				body->mbo_eadatasize = 0;
	} else if (it->it_op & IT_LAYOUT) {
		/* maybe the lock was granted right away and layout
		 * is packed into RMF_DLM_LVB of req */
		lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
			lvb_data = req_capsule_server_sized_get(pill,
								&RMF_DLM_LVB, lvb_len);
			if (lvb_data == NULL)
			/*
			 * save replied layout data to the request buffer for
			 * recovery consideration (lest MDS reinitialize
			 * another set of OST objects).
			 */
			(void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
	/* fill in stripe data for layout lock.
	 * LU-6581: trust layout data only if layout lock is granted. The MDT
	 * has stopped sending layout unless the layout lock is granted. The
	 * client still does this checking in case it's talking with an old
	 * server. - Jinshan */
	lock = ldlm_handle2lock(lockh);
	if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
	    !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
		LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
			   ldlm_it2str(it->it_op), lvb_len);
		OBD_ALLOC_LARGE(lmm, lvb_len);
		memcpy(lmm, lvb_data, lvb_len);
		/* install lvb_data */
		lock_res_and_lock(lock);
		/* Only install if no LVB is attached yet; otherwise free ours. */
		if (lock->l_lvb_data == NULL) {
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lmm;
			lock->l_lvb_len = lvb_len;
		unlock_res_and_lock(lock);
			OBD_FREE_LARGE(lmm, lvb_len);
/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
/*
 * Core intent-enqueue path: choose an inodebits policy from the intent
 * op, pack the matching intent request (open/unlink/getattr/readdir/
 * layout/getxattr), take RPC slots, call ldlm_cli_enqueue(), and retry
 * indefinitely while the server returns -EINPROGRESS within the same
 * import generation.  Finishes via mdc_finish_enqueue().
 */
static int mdc_enqueue_base(struct obd_export *exp,
			    struct ldlm_enqueue_info *einfo,
			    const union ldlm_policy_data *policy,
			    struct lookup_intent *it,
			    struct md_op_data *op_data,
			    struct lustre_handle *lockh,
			    __u64 extra_lock_flags)
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req = NULL;
	__u64 flags, saved_flags = extra_lock_flags;
	struct ldlm_res_id res_id;
	/* Canned per-intent inodebits policies. */
	static const union ldlm_policy_data lookup_policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP } };
	static const union ldlm_policy_data update_policy = {
		.l_inodebits = { MDS_INODELOCK_UPDATE } };
	static const union ldlm_policy_data layout_policy = {
		.l_inodebits = { MDS_INODELOCK_LAYOUT } };
	static const union ldlm_policy_data getxattr_policy = {
		.l_inodebits = { MDS_INODELOCK_XATTR } };
	int generation, resends = 0;
	struct ldlm_reply *lockrep;
	enum lvb_type lvb_type = 0;
	/* Intent-based enqueues must be inodebits locks. */
	LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	LASSERT(policy == NULL);
	saved_flags |= LDLM_FL_HAS_INTENT;
	/* Map the intent op to the inodebits policy it needs. */
	if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
		policy = &update_policy;
	else if (it->it_op & IT_LAYOUT)
		policy = &layout_policy;
	else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
		policy = &getxattr_policy;
		policy = &lookup_policy;
	/* Remember the import generation so resends can detect eviction. */
	generation = obddev->u.cli.cl_import->imp_generation;
	/* The only way right now is FLOCK. */
	LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
	res_id.name[3] = LDLM_FLOCK;
	} else if (it->it_op & IT_OPEN) {
		req = mdc_intent_open_pack(exp, it, op_data);
	} else if (it->it_op & IT_UNLINK) {
		req = mdc_intent_unlink_pack(exp, it, op_data);
	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
		req = mdc_intent_getattr_pack(exp, it, op_data);
	} else if (it->it_op & IT_READDIR) {
		req = mdc_enqueue_pack(exp, 0);
	} else if (it->it_op & IT_LAYOUT) {
		if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
		req = mdc_intent_layout_pack(exp, it, op_data);
		lvb_type = LVB_T_LAYOUT;
	} else if (it->it_op & IT_GETXATTR) {
		req = mdc_intent_getxattr_pack(exp, it, op_data);
		RETURN(PTR_ERR(req));
	/* Pin the request to this import generation for resend handling. */
	req->rq_generation_set = 1;
	req->rq_import_generation = generation;
	req->rq_sent = ktime_get_real_seconds() + resends;
	/* It is important to obtain modify RPC slot first (if applicable), so
	 * that threads that are waiting for a modify RPC slot are not polluting
	 * our rpcs in flight counter.
	 * We do not do flock request limiting, though */
	mdc_get_mod_rpc_slot(req, it);
	rc = obd_get_request_slot(&obddev->u.cli);
		/* Slot acquisition failed: release and bail out. */
		mdc_put_mod_rpc_slot(req, it);
		mdc_clear_replay_flag(req, 0);
		ptlrpc_req_finished(req);
	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
			      0, lvb_type, lockh, 0);
	/* For flock requests we immediatelly return without further
	   delay and let caller deal with the rest, since rest of
	   this function metadata processing makes no sense for flock
	   requests anyway. But in case of problem during comms with
	   Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
	   can not rely on caller and this mainly for F_UNLCKs
	   (explicits or automatically generated by Kernel to clean
	   current FLocks upon exit) that can't be trashed */
	if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
	    (einfo->ei_type == LDLM_FLOCK) &&
	    (einfo->ei_mode == LCK_NL))
	obd_put_request_slot(&obddev->u.cli);
	mdc_put_mod_rpc_slot(req, it);
	"%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
	obddev->obd_name, PFID(&op_data->op_fid1),
	PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
	mdc_clear_replay_flag(req, rc);
	ptlrpc_req_finished(req);
	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);
	/* Intent status travels in network byte order. */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);
	/* Retry infinitely when the server returns -EINPROGRESS for the
	 * intent operation, when server returns -EINPROGRESS for acquiring
	 * intent lock, we'll retry in after_reply(). */
	if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		/* Only resend within the same import generation and while
		 * no fatal signal is pending. */
		if (generation == obddev->u.cli.cl_import->imp_generation) {
			if (signal_pending(current))
			CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
			       obddev->obd_name, resends, it->it_op,
			       PFID(&op_data->op_fid1),
			       PFID(&op_data->op_fid2));
			CDEBUG(D_HA, "resend cross eviction\n");
	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
		/* On finish failure, drop any lock we took and the request. */
		if (lustre_handle_is_used(lockh)) {
			ldlm_lock_decref(lockh, einfo->ei_mode);
			memset(lockh, 0, sizeof(*lockh));
		ptlrpc_req_finished(req);
		it->it_lock_handle = 0;
		it->it_lock_mode = 0;
		it->it_request = NULL;
/*
 * Public non-intent enqueue wrapper: forwards to mdc_enqueue_base()
 * with a NULL lookup intent.
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
		const union ldlm_policy_data *policy,
		struct md_op_data *op_data,
		struct lustre_handle *lockh, __u64 extra_lock_flags)
	return mdc_enqueue_base(exp, einfo, policy, NULL,
				op_data, lockh, extra_lock_flags);
/*
 * Translate the server's intent dispositions into a VFS-visible result:
 * check per-phase open errors, take extra request references for
 * successful CREATE/OPEN (balanced in ll_create_node / ll_file_open),
 * and if a matching lock already exists locally, cancel the new one and
 * reuse the old handle.
 */
static int mdc_finish_intent_lock(struct obd_export *exp,
				  struct ptlrpc_request *request,
				  struct md_op_data *op_data,
				  struct lookup_intent *it,
				  struct lustre_handle *lockh)
	struct lustre_handle old_lock;
	struct ldlm_lock *lock;
	LASSERT(request != NULL);
	LASSERT(request != LP_POISON);
	LASSERT(request->rq_repmsg != LP_POISON);
	/* readdir intents carry no dispositions to process. */
	if (it->it_op & IT_READDIR)
	if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
		if (it->it_status != 0)
			GOTO(out, rc = it->it_status);
	if (!it_disposition(it, DISP_IT_EXECD)) {
		/* The server failed before it even started executing
		 * the intent, i.e. because it couldn't unpack the
		 */
		LASSERT(it->it_status != 0);
		GOTO(out, rc = it->it_status);
	rc = it_open_error(DISP_IT_EXECD, it);
	rc = it_open_error(DISP_LOOKUP_EXECD, it);
	/* keep requests around for the multiple phases of the call
	 * this shows the DISP_XX must guarantee we make it into the
	 */
	if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
	    it_disposition(it, DISP_OPEN_CREATE) &&
	    !it_open_error(DISP_OPEN_CREATE, it)) {
		it_set_disposition(it, DISP_ENQ_CREATE_REF);
		/* balanced in ll_create_node */
		ptlrpc_request_addref(request);
	if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
	    it_disposition(it, DISP_OPEN_OPEN) &&
	    !it_open_error(DISP_OPEN_OPEN, it)) {
		it_set_disposition(it, DISP_ENQ_OPEN_REF);
		/* balanced in ll_file_open */
		ptlrpc_request_addref(request);
		/* BUG 11546 - eviction in the middle of open rpc
		 */
		OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
	if (it->it_op & IT_CREAT) {
		/* XXX this belongs in ll_create_it */
	} else if (it->it_op == IT_OPEN) {
		LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
		LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
	/* If we already have a matching lock, then cancel the new
	 * one. We have to set the data here instead of in
	 * mdc_enqueue, because we need to use the child's inode as
	 * the l_ast_data to match, and that's not available until
	 * intent_finish has performed the iget().) */
	lock = ldlm_handle2lock(lockh);
		union ldlm_policy_data policy = lock->l_policy_data;
		LDLM_DEBUG(lock, "matching against this");
		if (it_has_reply_body(it)) {
			struct mdt_body *body;
			body = req_capsule_server_get(&request->rq_pill,
			/* mdc_enqueue checked */
			LASSERT(body != NULL);
			/* The fid in the reply must match the lock's resource. */
			LASSERTF(fid_res_name_eq(&body->mbo_fid1,
						 &lock->l_resource->lr_name),
				 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
				 PLDLMRES(lock->l_resource),
				 PFID(&body->mbo_fid1));
		LDLM_LOCK_PUT(lock);
		memcpy(&old_lock, lockh, sizeof(*lockh));
		/* Reuse the pre-existing lock and cancel the duplicate. */
		if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
				    LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
			ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
			memcpy(lockh, &old_lock, sizeof(old_lock));
			it->it_lock_handle = lockh->cookie;
	CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       ldlm_it2str(it->it_op), it->it_status,
	       it->it_disposition, rc);
/*
 * Check whether we still hold a lock covering the bits the intent needs
 * on @fid.  If the intent carries a lock handle, revalidate it directly;
 * otherwise match against the namespace with an op-specific inodebits
 * policy.  Fills it_lock_handle/it_lock_mode on success, zeroes them on
 * failure.
 */
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
			struct lu_fid *fid, __u64 *bits)
	/* We could just return 1 immediately, but since we should only
	 * be called in revalidate_it if we already have a lock, let's
	 */
	struct ldlm_res_id res_id;
	struct lustre_handle lockh;
	union ldlm_policy_data policy;
	enum ldlm_mode mode;
	if (it->it_lock_handle) {
		lockh.cookie = it->it_lock_handle;
		mode = ldlm_revalidate_lock_handle(&lockh, bits);
	fid_build_reg_res_name(fid, &res_id);
	switch (it->it_op) {
	/* File attributes are held under multiple bits:
	 * nlink is under lookup lock, size and times are
	 * under UPDATE lock and recently we've also got
	 * a separate permissions lock for owner/group/acl that
	 * were protected by lookup lock before.
	 * Getattr must provide all of that information,
	 * so we need to ensure we have all of those locks.
	 * Unfortunately, if the bits are split across multiple
	 * locks, there's no easy way to match all of them here,
	 * so an extra RPC would be performed to fetch all
	 * of those bits at once for now. */
	/* For new MDTs(> 2.4), UPDATE|PERM should be enough,
	 * but for old MDTs (< 2.4), permission is covered
	 * by LOOKUP lock, so it needs to match all bits here.*/
	policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
				  MDS_INODELOCK_LOOKUP |
	policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
	policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
	policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
	/* Accept any read/write mode already granted locally. */
	mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
			      LDLM_IBITS, &policy,
			      LCK_CR | LCK_CW | LCK_PR | LCK_PW,
	it->it_lock_handle = lockh.cookie;
	it->it_lock_mode = mode;
	/* No valid lock: clear the intent's lock state. */
	it->it_lock_handle = 0;
	it->it_lock_mode = 0;
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 * For everything else, we want to lookup to succeed.
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 * The server will return to us, in it_disposition, an indication of
 * exactly what it_status refers to.
 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it status is the
 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    struct lookup_intent *it, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
	struct ldlm_enqueue_info einfo = {
		.ei_type = LDLM_IBITS,
		.ei_mode = it_to_lock_mode(it),
		.ei_cb_bl = cb_blocking,
		.ei_cb_cp = ldlm_completion_ast,
	struct lustre_handle lockh;
	CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
	       ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
	       op_data->op_name, PFID(&op_data->op_fid2),
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
	/* Try revalidating an existing lock first for lookup/getattr/readdir
	 * when the child fid is already known. */
	if (fid_is_sane(&op_data->op_fid2) &&
	    (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
		/* We could just return 1 immediately, but since we should only
		 * be called in revalidate_it if we already have a lock, let's
		 */
		it->it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only return failure if it was not GETATTR by cfid
		   (from inode_revalidate) */
		if (rc || op_data->op_namelen != 0)
	/* For case if upper layer did not alloc fid, do it now. */
	if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
		rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
			CERROR("Can't alloc new fid, rc %d\n", rc);
	rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
	*reqp = it->it_request;
	rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for the async getattr intent: release the
 * request slot, finish the ldlm enqueue, run mdc_finish_enqueue()/
 * mdc_finish_intent_lock(), then invoke the caller's mi_cb with the
 * final rc.
 */
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
	struct mdc_getattr_args *ga = args;
	struct obd_export *exp = ga->ga_exp;
	struct md_enqueue_info *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
	struct lookup_intent *it;
	struct lustre_handle *lockh;
	struct obd_device *obddev;
	struct ldlm_reply *lockrep;
	__u64 flags = LDLM_FL_HAS_INTENT;
	lockh = &minfo->mi_lockh;
	obddev = class_exp2obd(exp);
	/* Return the rpcs-in-flight slot taken at send time. */
	obd_put_request_slot(&obddev->u.cli);
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		mdc_clear_replay_flag(req, rc);
	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);
	/* Intent status travels in network byte order. */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);
	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
	/* Hand the final result to the caller's completion callback. */
	minfo->mi_cb(req, minfo, rc);
/*
 * Fire an asynchronous getattr intent enqueue for statahead: pack the
 * getattr intent, take a request slot, start the enqueue with async=1,
 * stash the callback context in rq_async_args, and queue the request on
 * ptlrpcd.  Completion runs mdc_intent_getattr_async_interpret().
 */
int mdc_intent_getattr_async(struct obd_export *exp,
			     struct md_enqueue_info *minfo)
	struct md_op_data *op_data = &minfo->mi_data;
	struct lookup_intent *it = &minfo->mi_it;
	struct ptlrpc_request *req;
	struct mdc_getattr_args *ga;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_res_id res_id;
	/* Ask for both LOOKUP and UPDATE bits in one lock. */
	union ldlm_policy_data policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP |
				 MDS_INODELOCK_UPDATE } };
	__u64 flags = LDLM_FL_HAS_INTENT;
	CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	req = mdc_intent_getattr_pack(exp, it, op_data);
		RETURN(PTR_ERR(req));
	rc = obd_get_request_slot(&obddev->u.cli);
		ptlrpc_req_finished(req);
	/* async=1: enqueue returns immediately; interpret runs later. */
	rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
			      &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
		obd_put_request_slot(&obddev->u.cli);
		ptlrpc_req_finished(req);
	CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
	ga = ptlrpc_req_async_args(req);
	ga->ga_minfo = minfo;
	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
	ptlrpcd_add_req(req);