4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Context passed from an async getattr enqueue to its RPC interpret
 * callback: the export the request was sent on and the caller-supplied
 * md_enqueue_info (see mdc_intent_getattr_async_interpret()). */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
/* Report whether the server-side intent processing reached @phase.
 * Dispositions are checked from most- to least-advanced; if the intent
 * got at least as far as @phase the saved it->it_status is the relevant
 * error (0 on success).  NOTE(review): the return statements between
 * the disposition checks are not visible in this elided listing —
 * presumably each branch returns it->it_status or 0. */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No disposition bit matched at all: unexpected server reply. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
98 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach inode @data to the resource backing @lockh and optionally
 * return the lock's inodebits in @bits.  If the resource already points
 * at a different inode, that inode must be on its way out (I_FREEING)
 * before it is replaced — enforced by the LASSERTF below. */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused (zero-cookie) handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* Replacing a live inode would indicate aliased locks; only a
 * freeing inode may be displaced. */
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/* Look for an already-granted MDC lock on @fid matching @type/@policy/
 * @mode.  Returns the matched mode (0 if none) and fills @lockh.
 * Policy bits the server does not support are masked off first. */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused DLM locks on the resource derived from @fid that
 * match @policy/@mode; thin wrapper over
 * ldlm_cli_cancel_unused_resource(). */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/* Detach any cached inode pointer from the DLM resource for @fid, so
 * stale locks no longer reference a dying inode.  A missing resource
 * is not an error (handled in elided lines). */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Lookup only: do not create the resource if it does not exist. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/* Drop a request from the replay list when the operation failed.
 * NOTE(review): the statement clearing req->rq_replay between the
 * spin_lock/spin_unlock pair is elided in this listing. */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
/* A transno on a failed request is suspicious — the server committed
 * something we are about to forget; log it loudly. */
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
/* Grow the client-side buffer for @field if it is too small... */
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
/* ...or shrink it if it is larger than needed. */
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
/* Build an LDLM_INTENT_OPEN enqueue request: cancel conflicting child
 * OPEN locks and (for create) the parent's UPDATE lock, size the
 * request/reply capsule fields (name, EA, security context, ACL,
 * Data-on-MDT inline buffer), and pack the open intent.  Returns the
 * allocated request or ERR_PTR on failure. */
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Opens are always for regular files from the lock standpoint. */
263 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265 /* XXX: openlock is not cancelled for cross-refs. */
266 /* If inode is known, cancel conflicting OPEN locks. */
267 if (fid_is_sane(&op_data->op_fid2)) {
268 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269 if (it->it_flags & MDS_FMODE_WRITE)
274 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
277 else if (it->it_flags & FMODE_EXEC)
283 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288 /* If CREATE, cancel parent's UPDATE lock. */
289 if (it->it_op & IT_CREAT)
293 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
295 MDS_INODELOCK_UPDATE);
297 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the cancel list we collected above. */
300 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301 RETURN(ERR_PTR(-ENOMEM));
304 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305 op_data->op_namelen + 1);
306 if (cl_is_lov_delay_create(it->it_flags)) {
307 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308 LASSERT(lmmsize == 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
311 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
315 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317 strlen(op_data->op_file_secctx_name) + 1 : 0);
319 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320 op_data->op_file_secctx_size);
322 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
324 ptlrpc_request_free(req);
/* Opens are replayed if the import supports it. */
328 spin_lock(&req->rq_lock);
329 req->rq_replay = req->rq_import->imp_replayable;
330 spin_unlock(&req->rq_lock);
332 /* pack the intent */
333 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334 lit->opc = (__u64)it->it_op;
336 /* pack the intended request */
337 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
340 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341 obddev->u.cli.cl_max_mds_easize);
342 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
345 * Inline buffer for possible data from Data-on-MDT files.
347 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
348 sizeof(struct niobuf_remote));
349 ptlrpc_request_set_replen(req);
351 /* Get real repbuf allocated size as rounded up power of 2 */
352 repsize = size_roundup_power2(req->rq_replen +
353 lustre_msg_early_size());
355 /* Estimate free space for DoM files in repbuf */
356 repsize -= req->rq_replen - obddev->u.cli.cl_max_mds_easize +
357 sizeof(struct lov_comp_md_v1) +
358 sizeof(struct lov_comp_md_entry_v1) +
359 lov_mds_md_size(0, LOV_MAGIC_V3);
/* If the slack is below the configured DoM minimum, grow the inline
 * niobuf so small DoM file data can ride back in this reply. */
361 if (repsize < obddev->u.cli.cl_dom_min_inline_repsize) {
362 repsize = obddev->u.cli.cl_dom_min_inline_repsize - repsize;
363 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
365 sizeof(struct niobuf_remote) + repsize);
366 ptlrpc_request_set_replen(req);
367 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
368 repsize, req->rq_replen);
/* Default sizing for cached-xattr (IT_GETXATTR) reply buffers:
 * up to GA_DEFAULT_EA_NUM xattrs of the given name/value lengths. */
373 #define GA_DEFAULT_EA_NAME_LEN 20
374 #define GA_DEFAULT_EA_VAL_LEN 250
375 #define GA_DEFAULT_EA_NUM 10
/* Build an LDLM_INTENT_GETXATTR enqueue request, sizing the server-side
 * EADATA/EAVALS/EAVALS_LENS reply buffers for a batch of xattrs. */
377 static struct ptlrpc_request *
378 mdc_intent_getxattr_pack(struct obd_export *exp,
379 struct lookup_intent *it,
380 struct md_op_data *op_data)
382 struct ptlrpc_request *req;
383 struct ldlm_intent *lit;
385 struct list_head cancels = LIST_HEAD_INIT(cancels);
386 u32 min_buf_size = 0;
390 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
391 &RQF_LDLM_INTENT_GETXATTR);
393 RETURN(ERR_PTR(-ENOMEM));
395 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
397 ptlrpc_request_free(req);
401 /* pack the intent */
402 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403 lit->opc = IT_GETXATTR;
405 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
406 /* If the supplied buffer is too small then the server will
407 * return -ERANGE and llite will fallback to using non cached
408 * xattr operations. On servers before 2.10.1 a (non-cached)
409 * listxattr RPC for an orphan or dead file causes an oops. So
410 * let's try to avoid sending too small a buffer to too old a
411 * server. This is effectively undoing the memory conservation
412 * of LU-9417 when it would be *more* likely to crash the
413 * server. See LU-9856. */
414 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
415 min_buf_size = exp->exp_connect_data.ocd_max_easize;
418 /* pack the intended request */
419 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
420 max_t(u32, min_buf_size,
421 GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM),
/* Reply buffers: xattr names, values, and per-value lengths. */
424 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
425 max_t(u32, min_buf_size,
426 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM));
428 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
429 max_t(u32, min_buf_size,
430 GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM));
432 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
433 max_t(u32, min_buf_size,
434 sizeof(__u32) * GA_DEFAULT_EA_NUM));
/* No ACL data is expected for a getxattr intent. */
436 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
438 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR enqueue request asking for attributes,
 * EA, and ACL; reply MD buffer is sized from the client's default (or
 * max) MDS EA size. */
443 static struct ptlrpc_request *
444 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
445 struct md_op_data *op_data, __u32 acl_bufsize)
447 struct ptlrpc_request *req;
448 struct obd_device *obddev = class_exp2obd(exp);
449 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
450 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
451 OBD_MD_MEA | OBD_MD_FLACL;
452 struct ldlm_intent *lit;
457 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
458 &RQF_LDLM_INTENT_GETATTR);
460 RETURN(ERR_PTR(-ENOMEM));
462 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
463 op_data->op_namelen + 1);
465 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
467 ptlrpc_request_free(req);
471 /* pack the intent */
472 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
473 lit->opc = (__u64)it->it_op;
/* Prefer the default EA size if set, else fall back to the max. */
475 if (obddev->u.cli.cl_default_mds_easize > 0)
476 easize = obddev->u.cli.cl_default_mds_easize;
478 easize = obddev->u.cli.cl_max_mds_easize;
480 /* pack the intended request */
481 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
483 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
484 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
485 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_LAYOUT enqueue request.  The caller supplies a
 * fully-formed struct layout_intent in op_data->op_data, which is
 * copied verbatim into the request; the layout LVB reply buffer is
 * sized from the default MDS EA size. */
489 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
490 struct lookup_intent *it,
491 struct md_op_data *op_data)
493 struct obd_device *obd = class_exp2obd(exp);
494 struct ptlrpc_request *req;
495 struct ldlm_intent *lit;
496 struct layout_intent *layout;
500 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
501 &RQF_LDLM_INTENT_LAYOUT);
503 RETURN(ERR_PTR(-ENOMEM));
505 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
506 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
508 ptlrpc_request_free(req);
512 /* pack the intent */
513 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
514 lit->opc = (__u64)it->it_op;
516 /* pack the layout intent request */
517 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
518 LASSERT(op_data->op_data != NULL);
519 LASSERT(op_data->op_data_size == sizeof(*layout));
520 memcpy(layout, op_data->op_data, sizeof(*layout));
522 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
523 obd->u.cli.cl_default_mds_easize);
524 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM_ENQUEUE request with an @lvb_len
 * byte server-side LVB reply buffer.  Used e.g. for IT_READDIR. */
528 static struct ptlrpc_request *
529 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
531 struct ptlrpc_request *req;
535 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
537 RETURN(ERR_PTR(-ENOMEM));
539 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
541 ptlrpc_request_free(req);
545 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
546 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue: fix up lock mode/handle,
 * copy the server's disposition/status into @it, manage the replay
 * flag, validate and (for open/layout) save the reply EA/LVB for
 * replay, and install layout/DoM LVB data into the granted lock. */
550 static int mdc_finish_enqueue(struct obd_export *exp,
551 struct ptlrpc_request *req,
552 struct ldlm_enqueue_info *einfo,
553 struct lookup_intent *it,
554 struct lustre_handle *lockh,
557 struct req_capsule *pill = &req->rq_pill;
558 struct ldlm_request *lockreq;
559 struct ldlm_reply *lockrep;
560 struct ldlm_lock *lock;
561 struct mdt_body *body = NULL;
562 void *lvb_data = NULL;
568 /* Similarly, if we're going to replay this request, we don't want to
569 * actually get a lock, just perform the intent. */
570 if (req->rq_transno || req->rq_replay) {
571 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
572 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Server executed the intent but granted no lock. */
575 if (rc == ELDLM_LOCK_ABORTED) {
577 memset(lockh, 0, sizeof(*lockh));
579 } else { /* rc = 0 */
580 lock = ldlm_handle2lock(lockh);
581 LASSERT(lock != NULL);
583 /* If the server gave us back a different lock mode, we should
584 * fix up our variables. */
585 if (lock->l_req_mode != einfo->ei_mode) {
586 ldlm_lock_addref(lockh, lock->l_req_mode);
587 ldlm_lock_decref(lockh, einfo->ei_mode);
588 einfo->ei_mode = lock->l_req_mode;
593 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
594 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's intent result to the caller via @it. */
596 it->it_disposition = (int)lockrep->lock_policy_res1;
597 it->it_status = (int)lockrep->lock_policy_res2;
598 it->it_lock_mode = einfo->ei_mode;
599 it->it_lock_handle = lockh->cookie;
600 it->it_request = req;
602 /* Technically speaking rq_transno must already be zero if
603 * it_status is in error, so the check is a bit redundant */
604 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
605 mdc_clear_replay_flag(req, it->it_status);
607 /* If we're doing an IT_OPEN which did not result in an actual
608 * successful open, then we need to remove the bit which saves
609 * this request for unconditional replay.
611 * It's important that we do this first! Otherwise we might exit the
612 * function without doing so, and try to replay a failed create
614 if (it->it_op & IT_OPEN && req->rq_replay &&
615 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
616 mdc_clear_replay_flag(req, it->it_status);
618 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
619 it->it_op, it->it_disposition, it->it_status);
621 /* We know what to expect, so we do any byte flipping required here */
622 if (it_has_reply_body(it)) {
623 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
625 CERROR ("Can't swab mdt_body\n");
629 if (it_disposition(it, DISP_OPEN_OPEN) &&
630 !it_open_error(DISP_OPEN_OPEN, it)) {
632 * If this is a successful OPEN request, we need to set
633 * replay handler and data early, so that if replay
634 * happens immediately after swabbing below, new reply
635 * is swabbed by that handler correctly.
637 mdc_set_open_replay_data(NULL, NULL, it);
640 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
643 mdc_update_max_ea_from_body(exp, body);
646 * The eadata is opaque; just check that it is there.
647 * Eventually, obd_unpackmd() will check the contents.
649 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
650 body->mbo_eadatasize);
654 /* save lvb data and length in case this is for layout
657 lvb_len = body->mbo_eadatasize;
660 * We save the reply LOV EA in case we have to replay a
661 * create for recovery. If we didn't allocate a large
662 * enough request buffer above we need to reallocate it
663 * here to hold the actual LOV EA.
665 * To not save LOV EA if request is not going to replay
666 * (for example error one).
668 if ((it->it_op & IT_OPEN) && req->rq_replay) {
669 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
670 body->mbo_eadatasize);
/* On save failure, drop the EA from the body rather than
 * replaying with a truncated one. */
672 body->mbo_valid &= ~OBD_MD_FLEASIZE;
673 body->mbo_eadatasize = 0;
678 } else if (it->it_op & IT_LAYOUT) {
679 /* maybe the lock was granted right away and layout
680 * is packed into RMF_DLM_LVB of req */
681 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
683 lvb_data = req_capsule_server_sized_get(pill,
684 &RMF_DLM_LVB, lvb_len);
685 if (lvb_data == NULL)
689 * save replied layout data to the request buffer for
690 * recovery consideration (lest MDS reinitialize
691 * another set of OST objects).
694 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
699 /* fill in stripe data for layout lock.
700 * LU-6581: trust layout data only if layout lock is granted. The MDT
701 * has stopped sending layout unless the layout lock is granted. The
702 * client still does this checking in case it's talking with an old
703 * server. - Jinshan */
704 lock = ldlm_handle2lock(lockh);
708 if (ldlm_has_layout(lock) && lvb_data != NULL &&
709 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
712 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
713 ldlm_it2str(it->it_op), lvb_len);
715 OBD_ALLOC_LARGE(lmm, lvb_len);
717 GOTO(out_lock, rc = -ENOMEM);
719 memcpy(lmm, lvb_data, lvb_len);
721 /* install lvb_data */
722 lock_res_and_lock(lock);
723 if (lock->l_lvb_data == NULL) {
724 lock->l_lvb_type = LVB_T_LAYOUT;
725 lock->l_lvb_data = lmm;
726 lock->l_lvb_len = lvb_len;
729 unlock_res_and_lock(lock);
/* Another thread already installed an LVB; free our copy. */
731 OBD_FREE_LARGE(lmm, lvb_len);
734 if (ldlm_has_dom(lock)) {
735 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
737 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
738 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
739 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
740 exp->exp_obd->obd_name);
741 GOTO(out_lock, rc = -EPROTO);
744 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
745 ldlm_it2str(it->it_op), body->mbo_dom_size);
747 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
755 /* We always reserve enough space in the reply packet for a stripe MD, because
756 * we don't know in advance the file type. */
/* Core enqueue path: choose an inodebits policy from the intent op,
 * pack the matching intent request, take RPC slots, send via
 * ldlm_cli_enqueue(), and handle -EINPROGRESS/-ERANGE resends before
 * delegating reply processing to mdc_finish_enqueue(). */
757 static int mdc_enqueue_base(struct obd_export *exp,
758 struct ldlm_enqueue_info *einfo,
759 const union ldlm_policy_data *policy,
760 struct lookup_intent *it,
761 struct md_op_data *op_data,
762 struct lustre_handle *lockh,
763 __u64 extra_lock_flags)
765 struct obd_device *obddev = class_exp2obd(exp);
766 struct ptlrpc_request *req = NULL;
767 __u64 flags, saved_flags = extra_lock_flags;
768 struct ldlm_res_id res_id;
769 static const union ldlm_policy_data lookup_policy = {
770 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
771 static const union ldlm_policy_data update_policy = {
772 .l_inodebits = { MDS_INODELOCK_UPDATE } };
773 static const union ldlm_policy_data layout_policy = {
774 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
775 static const union ldlm_policy_data getxattr_policy = {
776 .l_inodebits = { MDS_INODELOCK_XATTR } };
777 int generation, resends = 0;
778 struct ldlm_reply *lockrep;
779 struct obd_import *imp = class_exp2cliimp(exp);
781 enum lvb_type lvb_type = 0;
785 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
787 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived from the intent op. */
790 LASSERT(policy == NULL);
792 saved_flags |= LDLM_FL_HAS_INTENT;
793 if (it->it_op & (IT_GETATTR | IT_READDIR))
794 policy = &update_policy;
795 else if (it->it_op & IT_LAYOUT)
796 policy = &layout_policy;
797 else if (it->it_op & IT_GETXATTR)
798 policy = &getxattr_policy;
800 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction. */
803 generation = obddev->u.cli.cl_import->imp_generation;
804 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
805 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
807 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
/* Pack the request appropriate to the intent type. */
812 /* The only way right now is FLOCK. */
813 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
815 res_id.name[3] = LDLM_FLOCK;
816 } else if (it->it_op & IT_OPEN) {
817 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
818 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
819 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
820 } else if (it->it_op & IT_READDIR) {
821 req = mdc_enqueue_pack(exp, 0);
822 } else if (it->it_op & IT_LAYOUT) {
823 if (!imp_connect_lvb_type(imp))
825 req = mdc_intent_layout_pack(exp, it, op_data);
826 lvb_type = LVB_T_LAYOUT;
827 } else if (it->it_op & IT_GETXATTR) {
828 req = mdc_intent_getxattr_pack(exp, it, op_data);
835 RETURN(PTR_ERR(req));
838 req->rq_generation_set = 1;
839 req->rq_import_generation = generation;
840 req->rq_sent = ktime_get_real_seconds() + resends;
843 /* It is important to obtain modify RPC slot first (if applicable), so
844 * that threads that are waiting for a modify RPC slot are not polluting
845 * our rpcs in flight counter.
846 * We do not do flock request limiting, though */
848 mdc_get_mod_rpc_slot(req, it);
849 rc = obd_get_request_slot(&obddev->u.cli);
851 mdc_put_mod_rpc_slot(req, it);
852 mdc_clear_replay_flag(req, 0);
853 ptlrpc_req_finished(req);
858 /* With Data-on-MDT the glimpse callback is needed too.
859 * It is set here in advance but not in mdc_finish_enqueue()
860 * to avoid possible races. It is safe to have glimpse handler
861 * for non-DOM locks and costs nothing.*/
862 if (einfo->ei_cb_gl == NULL)
863 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
865 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
866 0, lvb_type, lockh, 0);
868 /* For flock requests we immediatelly return without further
869 delay and let caller deal with the rest, since rest of
870 this function metadata processing makes no sense for flock
871 requests anyway. But in case of problem during comms with
872 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
873 can not rely on caller and this mainly for F_UNLCKs
874 (explicits or automatically generated by Kernel to clean
875 current FLocks upon exit) that can't be trashed */
876 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
877 (einfo->ei_type == LDLM_FLOCK) &&
878 (einfo->ei_mode == LCK_NL))
/* Release the slots taken above, in reverse order. */
883 obd_put_request_slot(&obddev->u.cli);
884 mdc_put_mod_rpc_slot(req, it);
888 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
889 obddev->obd_name, PFID(&op_data->op_fid1),
890 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
892 mdc_clear_replay_flag(req, rc);
893 ptlrpc_req_finished(req);
897 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
898 LASSERT(lockrep != NULL);
900 lockrep->lock_policy_res2 =
901 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
903 /* Retry infinitely when the server returns -EINPROGRESS for the
904 * intent operation, when server returns -EINPROGRESS for acquiring
905 * intent lock, we'll retry in after_reply(). */
906 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
907 mdc_clear_replay_flag(req, rc);
908 ptlrpc_req_finished(req);
/* Only resend while the import generation is unchanged (no
 * eviction) and no signal is pending. */
909 if (generation == obddev->u.cli.cl_import->imp_generation) {
910 if (signal_pending(current))
914 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
915 obddev->obd_name, resends, it->it_op,
916 PFID(&op_data->op_fid1),
917 PFID(&op_data->op_fid2));
920 CDEBUG(D_HA, "resend cross eviction\n");
/* ACL reply buffer was too small: retry once with the max size. */
925 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
926 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
927 acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
928 mdc_clear_replay_flag(req, -ERANGE);
929 ptlrpc_req_finished(req);
930 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
934 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On failure, drop any lock reference we hold and reset @it. */
936 if (lustre_handle_is_used(lockh)) {
937 ldlm_lock_decref(lockh, einfo->ei_mode);
938 memset(lockh, 0, sizeof(*lockh));
940 ptlrpc_req_finished(req);
942 it->it_lock_handle = 0;
943 it->it_lock_mode = 0;
944 it->it_request = NULL;
/* Public intent-less enqueue entry point: forwards to
 * mdc_enqueue_base() with a NULL lookup_intent. */
950 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
951 const union ldlm_policy_data *policy,
952 struct md_op_data *op_data,
953 struct lustre_handle *lockh, __u64 extra_lock_flags)
955 return mdc_enqueue_base(exp, einfo, policy, NULL,
956 op_data, lockh, extra_lock_flags);
/* Translate the completed intent reply into VFS-visible state: check
 * intent execution/open errors, pin the request for later ll_create/
 * ll_file_open phases, and collapse a duplicate lock against an
 * already-held matching one. */
959 static int mdc_finish_intent_lock(struct obd_export *exp,
960 struct ptlrpc_request *request,
961 struct md_op_data *op_data,
962 struct lookup_intent *it,
963 struct lustre_handle *lockh)
965 struct lustre_handle old_lock;
966 struct ldlm_lock *lock;
970 LASSERT(request != NULL);
971 LASSERT(request != LP_POISON);
972 LASSERT(request->rq_repmsg != LP_POISON);
974 if (it->it_op & IT_READDIR)
977 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
978 if (it->it_status != 0)
979 GOTO(out, rc = it->it_status);
981 if (!it_disposition(it, DISP_IT_EXECD)) {
982 /* The server failed before it even started executing
983 * the intent, i.e. because it couldn't unpack the
986 LASSERT(it->it_status != 0);
987 GOTO(out, rc = it->it_status);
989 rc = it_open_error(DISP_IT_EXECD, it);
993 rc = it_open_error(DISP_LOOKUP_EXECD, it);
997 /* keep requests around for the multiple phases of the call
998 * this shows the DISP_XX must guarantee we make it into the
1001 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1002 it_disposition(it, DISP_OPEN_CREATE) &&
1003 !it_open_error(DISP_OPEN_CREATE, it)) {
1004 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1005 /* balanced in ll_create_node */
1006 ptlrpc_request_addref(request);
1008 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1009 it_disposition(it, DISP_OPEN_OPEN) &&
1010 !it_open_error(DISP_OPEN_OPEN, it)) {
1011 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1012 /* balanced in ll_file_open */
1013 ptlrpc_request_addref(request);
1014 /* BUG 11546 - eviction in the middle of open rpc
1017 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1021 if (it->it_op & IT_CREAT) {
1022 /* XXX this belongs in ll_create_it */
1023 } else if (it->it_op == IT_OPEN) {
1024 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1026 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1030 /* If we already have a matching lock, then cancel the new
1031 * one. We have to set the data here instead of in
1032 * mdc_enqueue, because we need to use the child's inode as
1033 * the l_ast_data to match, and that's not available until
1034 * intent_finish has performed the iget().) */
1035 lock = ldlm_handle2lock(lockh)
1037 union ldlm_policy_data policy = lock->l_policy_data;
1038 LDLM_DEBUG(lock, "matching against this");
/* Sanity-check that the lock's resource matches the fid the
 * server returned in the reply body. */
1040 if (it_has_reply_body(it)) {
1041 struct mdt_body *body;
1043 body = req_capsule_server_get(&request->rq_pill,
1045 /* mdc_enqueue checked */
1046 LASSERT(body != NULL);
1047 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1048 &lock->l_resource->lr_name),
1049 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1050 PLDLMRES(lock->l_resource),
1051 PFID(&body->mbo_fid1));
1053 LDLM_LOCK_PUT(lock);
1055 memcpy(&old_lock, lockh, sizeof(*lockh));
1056 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1057 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate: keep the pre-existing lock and cancel ours. */
1058 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1059 memcpy(lockh, &old_lock, sizeof(old_lock));
1060 it->it_lock_handle = lockh->cookie;
1066 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1067 (int)op_data->op_namelen, op_data->op_name,
1068 ldlm_it2str(it->it_op), it->it_status,
1069 it->it_disposition, rc);
/* Check whether a cached lock still covers the intent on @fid.  If the
 * intent carries a lock handle, revalidate it directly; otherwise match
 * by fid with inodebits chosen per intent op.  On success the matched
 * handle/mode are stored back into @it. */
1073 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1074 struct lu_fid *fid, __u64 *bits)
1076 /* We could just return 1 immediately, but since we should only
1077 * be called in revalidate_it if we already have a lock, let's
1079 struct ldlm_res_id res_id;
1080 struct lustre_handle lockh;
1081 union ldlm_policy_data policy;
1082 enum ldlm_mode mode;
1085 if (it->it_lock_handle) {
1086 lockh.cookie = it->it_lock_handle;
1087 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1089 fid_build_reg_res_name(fid, &res_id);
1090 switch (it->it_op) {
1092 /* File attributes are held under multiple bits:
1093 * nlink is under lookup lock, size and times are
1094 * under UPDATE lock and recently we've also got
1095 * a separate permissions lock for owner/group/acl that
1096 * were protected by lookup lock before.
1097 * Getattr must provide all of that information,
1098 * so we need to ensure we have all of those locks.
1099 * Unfortunately, if the bits are split across multiple
1100 * locks, there's no easy way to match all of them here,
1101 * so an extra RPC would be performed to fetch all
1102 * of those bits at once for now. */
1103 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1104 * but for old MDTs (< 2.4), permission is covered
1105 * by LOOKUP lock, so it needs to match all bits here.*/
1106 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1107 MDS_INODELOCK_LOOKUP |
1111 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1114 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1117 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any read/write mode already granted. */
1121 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1122 LDLM_IBITS, &policy,
1123 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* Record the result in the intent (or clear it on mismatch). */
1128 it->it_lock_handle = lockh.cookie;
1129 it->it_lock_mode = mode;
1131 it->it_lock_handle = 0;
1132 it->it_lock_mode = 0;
1139 * This long block is all about fixing up the lock and request state
1140 * so that it is correct as of the moment _before_ the operation was
1141 * applied; that way, the VFS will think that everything is normal and
1142 * call Lustre's regular VFS methods.
1144 * If we're performing a creation, that means that unless the creation
1145 * failed with EEXIST, we should fake up a negative dentry.
1147 * For everything else, we want to lookup to succeed.
1149 * One additional note: if CREATE or OPEN succeeded, we add an extra
1150 * reference to the request because we need to keep it around until
1151 * ll_create/ll_open gets called.
1153 * The server will return to us, in it_disposition, an indication of
1154 * exactly what it_status refers to.
1156 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1157 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1158 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1159 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1162 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1165 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1166 struct lookup_intent *it, struct ptlrpc_request **reqp,
1167 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1169 struct ldlm_enqueue_info einfo = {
1170 .ei_type = LDLM_IBITS,
1171 .ei_mode = it_to_lock_mode(it),
1172 .ei_cb_bl = cb_blocking,
1173 .ei_cb_cp = ldlm_completion_ast,
1174 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1176 struct lustre_handle lockh;
1181 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1182 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1183 op_data->op_name, PFID(&op_data->op_fid2),
1184 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Fast path: try to revalidate an existing lock by child fid. */
1188 if (fid_is_sane(&op_data->op_fid2) &&
1189 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1190 /* We could just return 1 immediately, but since we should only
1191 * be called in revalidate_it if we already have a lock, let's
1193 it->it_lock_handle = 0;
1194 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1195 /* Only return failure if it was not GETATTR by cfid
1196 (from inode_revalidate) */
1197 if (rc || op_data->op_namelen != 0)
1201 /* For case if upper layer did not alloc fid, do it now. */
1202 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1203 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1205 CERROR("Can't alloc new fid, rc %d\n", rc);
1210 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply back to the caller and finish intent bookkeeping. */
1215 *reqp = it->it_request;
1216 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Interpret callback for the async intent-getattr RPC issued by
 * mdc_intent_getattr_async(): finishes the LDLM enqueue, fixes up the
 * intent/lock state, and finally invokes the sponsor's mi_cb callback. */
1220 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1221 struct ptlrpc_request *req,
1224 struct mdc_getattr_args *ga = args;
1225 struct obd_export *exp = ga->ga_exp;
1226 struct md_enqueue_info *minfo = ga->ga_minfo;
1227 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1228 struct lookup_intent *it;
1229 struct lustre_handle *lockh;
1230 struct obd_device *obddev;
1231 struct ldlm_reply *lockrep;
1232 __u64 flags = LDLM_FL_HAS_INTENT;
1236 lockh = &minfo->mi_lockh;
1238 obddev = class_exp2obd(exp);
/* Release the client request slot taken when the RPC was sent
 * (obd_get_request_slot() in mdc_intent_getattr_async()). */
1240 obd_put_request_slot(&obddev->u.cli);
/* Fault-injection point for testing getattr enqueue failures. */
1241 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Complete the client-side enqueue for the single (1) lock requested;
 * no LVB data is expected (NULL/0). */
1244 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1245 &flags, NULL, 0, lockh, rc);
1247 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
/* On error, make sure the request is not replayed. */
1248 mdc_clear_replay_flag(req, rc);
1252 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1253 LASSERT(lockrep != NULL);
/* Convert the server's wire-format status into a host errno. */
1255 lockrep->lock_policy_res2 =
1256 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1258 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1262 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Hand the final result to the sponsor of the async getattr. */
1266 minfo->mi_cb(req, minfo, rc);
1270 int mdc_intent_getattr_async(struct obd_export *exp,
1271 struct md_enqueue_info *minfo)
1273 struct md_op_data *op_data = &minfo->mi_data;
1274 struct lookup_intent *it = &minfo->mi_it;
1275 struct ptlrpc_request *req;
1276 struct mdc_getattr_args *ga;
1277 struct obd_device *obddev = class_exp2obd(exp);
1278 struct ldlm_res_id res_id;
1279 union ldlm_policy_data policy = {
1280 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1281 MDS_INODELOCK_UPDATE } };
1283 __u64 flags = LDLM_FL_HAS_INTENT;
1286 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1287 (int)op_data->op_namelen, op_data->op_name,
1288 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1290 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1291 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1292 * of the async getattr RPC will handle that by itself. */
1293 req = mdc_intent_getattr_pack(exp, it, op_data,
1294 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1296 RETURN(PTR_ERR(req));
1298 rc = obd_get_request_slot(&obddev->u.cli);
1300 ptlrpc_req_finished(req);
1304 /* With Data-on-MDT the glimpse callback is needed too.
1305 * It is set here in advance but not in mdc_finish_enqueue()
1306 * to avoid possible races. It is safe to have glimpse handler
1307 * for non-DOM locks and costs nothing.*/
1308 if (minfo->mi_einfo.ei_cb_gl == NULL)
1309 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1311 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1312 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1314 obd_put_request_slot(&obddev->u.cli);
1315 ptlrpc_req_finished(req);
1319 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1320 ga = ptlrpc_req_async_args(req);
1322 ga->ga_minfo = minfo;
1324 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1325 ptlrpcd_add_req(req);