4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Context carried across an asynchronous getattr intent enqueue:
 * keeps the export the RPC was sent on and the caller's enqueue
 * info so the completion handler can finish the operation. */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;	/* export the intent was sent on */
52 struct md_enqueue_info *ga_minfo;	/* caller's enqueue info/callback state */
/* Return the error status of the intent @it for the given execution
 * @phase.  Dispositions are checked from most specific (LEASE/OPEN)
 * to least specific (IT_EXECD); once the phase of interest is reached
 * the intent's status for that phase is reported.
 * NOTE(review): the per-branch return statements are elided in this
 * view — presumably each branch returns it->it_status or 0; confirm
 * against the full source. */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* None of the expected dispositions matched: log the raw state. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach inode @data to the resource backing the lock in @lockh and,
 * if @bits is non-NULL, report the lock's granted inodebits.
 * If the resource already points at a different inode, that inode must
 * be on its way out (I_FREEING) — otherwise two live inodes would map
 * to one resource, which the LASSERTF below treats as fatal. */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused (zero) handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/* Look for an already-granted DLM lock on @fid matching @type/@policy/
 * @mode.  Returns the matched mode (handle in @lockh) or 0 if none.
 * Policy bits the server does not support are masked off first so a
 * match is not missed merely because the client asked for extra bits. */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused locks on the resource derived from @fid that match
 * @policy/@mode; @opaque is passed through to the cancel iterator so a
 * caller can restrict cancellation to its own locks. */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/* Detach the cached inode pointer from the resource for @fid, if the
 * resource exists.  Used when the inode is going away so the LDLM
 * resource no longer references freed memory. */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Do not create the resource if it does not already exist. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/* Drop the replay flag from @req after a failure (@rc) so an errored
 * request is not re-sent during recovery.  A non-zero transno on an
 * error reply is unexpected and logged. */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/* Copies @size bytes from @data into the client-side @field of @req,
 * growing the request buffer first if the current capsule slot is too
 * small, or shrinking the slot to the exact size otherwise. */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
/* Need a bigger buffer: enlarge through sptlrpc so security
 * framing stays consistent. */
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
/* Build an LDLM_INTENT_OPEN request for @it/@op_data.
 *
 * Before allocation, conflicting OPEN locks on the child (op_fid2) and,
 * for creates, the parent's UPDATE lock (op_fid1) are collected for
 * early cancellation.  The request carries the open intent, optional
 * LOV EA, security context, and SELinux policy; the reply buffer is
 * sized for the largest possible MD striping plus an inline DoM data
 * buffer, and its estimated size is advertised to the server via
 * lm_repsize.  Returns the prepared request or ERR_PTR on failure. */
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
259 int repsize, repsize_estimate;
/* Force regular-file type bits in the requested create mode. */
263 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265 /* XXX: openlock is not cancelled for cross-refs. */
266 /* If inode is known, cancel conflicting OPEN locks. */
267 if (fid_is_sane(&op_data->op_fid2)) {
268 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269 if (it->it_flags & MDS_FMODE_WRITE)
274 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
277 else if (it->it_flags & FMODE_EXEC)
283 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288 /* If CREATE, cancel parent's UPDATE lock. */
289 if (it->it_op & IT_CREAT)
293 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
295 MDS_INODELOCK_UPDATE);
297 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list. */
300 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301 RETURN(ERR_PTR(-ENOMEM));
304 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305 op_data->op_namelen + 1);
306 if (cl_is_lov_delay_create(it->it_flags)) {
307 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308 LASSERT(lmmsize == 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
311 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
315 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317 strlen(op_data->op_file_secctx_name) + 1 : 0);
319 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320 op_data->op_file_secctx_size);
322 /* get SELinux policy info if any */
323 rc = sptlrpc_get_sepol(req);
325 ptlrpc_request_free(req);
328 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
329 strlen(req->rq_sepol) ?
330 strlen(req->rq_sepol) + 1 : 0);
332 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
334 ptlrpc_request_free(req);
/* Mark the request replayable if the import supports recovery. */
338 spin_lock(&req->rq_lock);
339 req->rq_replay = req->rq_import->imp_replayable;
340 spin_unlock(&req->rq_lock);
342 /* pack the intent */
343 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
344 lit->opc = (__u64)it->it_op;
346 /* pack the intended request */
347 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
350 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
351 obddev->u.cli.cl_max_mds_easize);
352 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
355 * Inline buffer for possible data from Data-on-MDT files.
357 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
358 sizeof(struct niobuf_remote));
359 ptlrpc_request_set_replen(req);
361 /* Get real repbuf allocated size as rounded up power of 2 */
362 repsize = size_roundup_power2(req->rq_replen +
363 lustre_msg_early_size());
364 /* Estimate free space for DoM files in repbuf */
365 repsize_estimate = repsize - (req->rq_replen -
366 obddev->u.cli.cl_max_mds_easize +
367 sizeof(struct lov_comp_md_v1) +
368 sizeof(struct lov_comp_md_entry_v1) +
369 lov_mds_md_size(0, LOV_MAGIC_V3));
/* Grow the inline DoM buffer if the estimated free space is below
 * the configured minimum inline reply size. */
371 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
372 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
373 repsize_estimate + sizeof(struct niobuf_remote);
374 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
376 sizeof(struct niobuf_remote) + repsize);
377 ptlrpc_request_set_replen(req);
378 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
379 repsize, req->rq_replen);
380 repsize = size_roundup_power2(req->rq_replen +
381 lustre_msg_early_size());
383 /* The only way to report real allocated repbuf size to the server
384 * is the lm_repsize but it must be set prior buffer allocation itself
385 * due to security reasons - it is part of buffer used in signature
386 * calculation (see LU-11414). Therefore the saved size is predicted
387 * value as rq_replen rounded to the next higher power of 2.
388 * Such estimation is safe. Though the final allocated buffer might
389 * be even larger, it is not possible to know that at this point.
391 req->rq_reqmsg->lm_repsize = repsize;
395 #define GA_DEFAULT_EA_NAME_LEN 20
396 #define GA_DEFAULT_EA_VAL_LEN 250
397 #define GA_DEFAULT_EA_NUM 10
/* Build an LDLM_INTENT_GETXATTR request for listing/fetching all
 * xattrs of op_fid1 in one round trip.  Reply buffers are sized using
 * the GA_DEFAULT_EA_* heuristics; for servers older than 2.10.1 the
 * value buffer is bumped to ocd_max_easize to avoid a known server
 * oops on small buffers (LU-9856).  Returns the request or ERR_PTR. */
399 static struct ptlrpc_request *
400 mdc_intent_getxattr_pack(struct obd_export *exp,
401 struct lookup_intent *it,
402 struct md_op_data *op_data)
404 struct ptlrpc_request *req;
405 struct ldlm_intent *lit;
407 struct list_head cancels = LIST_HEAD_INIT(cancels);
408 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
412 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
413 &RQF_LDLM_INTENT_GETXATTR);
415 RETURN(ERR_PTR(-ENOMEM));
417 /* get SELinux policy info if any */
418 rc = sptlrpc_get_sepol(req);
420 ptlrpc_request_free(req);
423 req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
424 strlen(req->rq_sepol) ?
425 strlen(req->rq_sepol) + 1 : 0);
427 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
429 ptlrpc_request_free(req);
433 /* pack the intent */
434 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
435 lit->opc = IT_GETXATTR;
437 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
438 /* If the supplied buffer is too small then the server will
439 * return -ERANGE and llite will fallback to using non cached
440 * xattr operations. On servers before 2.10.1 a (non-cached)
441 * listxattr RPC for an orphan or dead file causes an oops. So
442 * let's try to avoid sending too small a buffer to too old a
443 * server. This is effectively undoing the memory conservation
444 * of LU-9417 when it would be *more* likely to crash the
445 * server. See LU-9856. */
446 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
447 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
448 exp->exp_connect_data.ocd_max_easize);
451 /* pack the intended request */
452 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
453 ea_vals_buf_size, -1, 0);
455 /* get SELinux policy info if any */
456 mdc_file_sepol_pack(req);
/* Reply slots: xattr names, values, and per-value lengths. */
458 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
459 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
461 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
464 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
465 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is expected in a getxattr intent reply. */
467 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
469 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR request asking for full attributes,
 * striping EA, and ACLs of the target named in @op_data.  The reply MD
 * buffer is sized from the default (or, failing that, the maximum) MDS
 * EA size.  Returns the prepared request or ERR_PTR on failure. */
474 static struct ptlrpc_request *
475 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
476 struct md_op_data *op_data, __u32 acl_bufsize)
478 struct ptlrpc_request *req;
479 struct obd_device *obddev = class_exp2obd(exp);
480 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
481 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
482 OBD_MD_MEA | OBD_MD_FLACL;
483 struct ldlm_intent *lit;
488 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
489 &RQF_LDLM_INTENT_GETATTR);
491 RETURN(ERR_PTR(-ENOMEM));
493 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
494 op_data->op_namelen + 1);
496 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
498 ptlrpc_request_free(req);
502 /* pack the intent */
503 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
504 lit->opc = (__u64)it->it_op;
/* Prefer the default EA size when configured; fall back to max. */
506 if (obddev->u.cli.cl_default_mds_easize > 0)
507 easize = obddev->u.cli.cl_default_mds_easize;
509 easize = obddev->u.cli.cl_max_mds_easize;
511 /* pack the intended request */
512 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
514 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
515 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
516 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_LAYOUT request.  The caller supplies a fully
 * formed struct layout_intent in op_data->op_data; it is copied into
 * the request verbatim.  The reply LVB slot is sized for the default
 * MDS EA size.  Returns the prepared request or ERR_PTR on failure. */
520 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
521 struct lookup_intent *it,
522 struct md_op_data *op_data)
524 struct obd_device *obd = class_exp2obd(exp);
525 struct ptlrpc_request *req;
526 struct ldlm_intent *lit;
527 struct layout_intent *layout;
531 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
532 &RQF_LDLM_INTENT_LAYOUT);
534 RETURN(ERR_PTR(-ENOMEM));
536 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
537 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
539 ptlrpc_request_free(req);
543 /* pack the intent */
544 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
545 lit->opc = (__u64)it->it_op;
547 /* pack the layout intent request */
548 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
549 LASSERT(op_data->op_data != NULL);
550 LASSERT(op_data->op_data_size == sizeof(*layout));
551 memcpy(layout, op_data->op_data, sizeof(*layout));
553 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
554 obd->u.cli.cl_default_mds_easize);
555 ptlrpc_request_set_replen(req);
/* Build a plain (non-intent) LDLM_ENQUEUE request with a server-side
 * LVB reply buffer of @lvb_len bytes.  Returns the request or
 * ERR_PTR on allocation/prep failure. */
559 static struct ptlrpc_request *
560 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
562 struct ptlrpc_request *req;
566 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
568 RETURN(ERR_PTR(-ENOMEM));
570 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
572 ptlrpc_request_free(req);
576 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
577 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue: copy the server's
 * disposition/status into @it, fix up the granted lock mode, manage
 * the replay flag, swab and validate the reply body, stash the LOV EA
 * or layout LVB for replay, and install layout/DoM LVB data on the
 * granted lock.  @rc is the enqueue result (0 or ELDLM_LOCK_ABORTED
 * on an intent-only reply). */
581 static int mdc_finish_enqueue(struct obd_export *exp,
582 struct ptlrpc_request *req,
583 struct ldlm_enqueue_info *einfo,
584 struct lookup_intent *it,
585 struct lustre_handle *lockh,
588 struct req_capsule *pill = &req->rq_pill;
589 struct ldlm_request *lockreq;
590 struct ldlm_reply *lockrep;
591 struct ldlm_lock *lock;
592 struct mdt_body *body = NULL;
593 void *lvb_data = NULL;
599 /* Similarly, if we're going to replay this request, we don't want to
600 * actually get a lock, just perform the intent. */
601 if (req->rq_transno || req->rq_replay) {
602 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
603 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
606 if (rc == ELDLM_LOCK_ABORTED) {
/* Intent executed but no lock granted: clear the handle. */
608 memset(lockh, 0, sizeof(*lockh));
610 } else { /* rc = 0 */
611 lock = ldlm_handle2lock(lockh);
612 LASSERT(lock != NULL);
614 /* If the server gave us back a different lock mode, we should
615 * fix up our variables. */
616 if (lock->l_req_mode != einfo->ei_mode) {
617 ldlm_lock_addref(lockh, lock->l_req_mode);
618 ldlm_lock_decref(lockh, einfo->ei_mode);
619 einfo->ei_mode = lock->l_req_mode;
624 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
625 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's verdict into the intent for the caller. */
627 it->it_disposition = (int)lockrep->lock_policy_res1;
628 it->it_status = (int)lockrep->lock_policy_res2;
629 it->it_lock_mode = einfo->ei_mode;
630 it->it_lock_handle = lockh->cookie;
631 it->it_request = req;
633 /* Technically speaking rq_transno must already be zero if
634 * it_status is in error, so the check is a bit redundant */
635 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
636 mdc_clear_replay_flag(req, it->it_status);
638 /* If we're doing an IT_OPEN which did not result in an actual
639 * successful open, then we need to remove the bit which saves
640 * this request for unconditional replay.
642 * It's important that we do this first! Otherwise we might exit the
643 * function without doing so, and try to replay a failed create
645 if (it->it_op & IT_OPEN && req->rq_replay &&
646 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
647 mdc_clear_replay_flag(req, it->it_status);
649 DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
650 it->it_op, it->it_disposition, it->it_status);
652 /* We know what to expect, so we do any byte flipping required here */
653 if (it_has_reply_body(it)) {
654 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
656 CERROR ("Can't swab mdt_body\n");
660 if (it_disposition(it, DISP_OPEN_OPEN) &&
661 !it_open_error(DISP_OPEN_OPEN, it)) {
663 * If this is a successful OPEN request, we need to set
664 * replay handler and data early, so that if replay
665 * happens immediately after swabbing below, new reply
666 * is swabbed by that handler correctly.
668 mdc_set_open_replay_data(NULL, NULL, it);
671 if (it_disposition(it, DISP_OPEN_CREATE) &&
672 !it_open_error(DISP_OPEN_CREATE, it)) {
673 lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
677 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
680 mdc_update_max_ea_from_body(exp, body);
683 * The eadata is opaque; just check that it is there.
684 * Eventually, obd_unpackmd() will check the contents.
686 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
687 body->mbo_eadatasize);
691 /* save lvb data and length in case this is for layout
694 lvb_len = body->mbo_eadatasize;
697 * We save the reply LOV EA in case we have to replay a
698 * create for recovery. If we didn't allocate a large
699 * enough request buffer above we need to reallocate it
700 * here to hold the actual LOV EA.
702 * To not save LOV EA if request is not going to replay
703 * (for example error one).
705 if ((it->it_op & IT_OPEN) && req->rq_replay) {
706 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
707 body->mbo_eadatasize);
/* Save failed: drop the EA so replay does not use it. */
709 body->mbo_valid &= ~OBD_MD_FLEASIZE;
710 body->mbo_eadatasize = 0;
715 } else if (it->it_op & IT_LAYOUT) {
716 /* maybe the lock was granted right away and layout
717 * is packed into RMF_DLM_LVB of req */
718 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
719 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
720 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
722 lvb_data = req_capsule_server_sized_get(pill,
723 &RMF_DLM_LVB, lvb_len);
724 if (lvb_data == NULL)
728 * save replied layout data to the request buffer for
729 * recovery consideration (lest MDS reinitialize
730 * another set of OST objects).
/* Best-effort: a failure here only affects recovery. */
733 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
738 /* fill in stripe data for layout lock.
739 * LU-6581: trust layout data only if layout lock is granted. The MDT
740 * has stopped sending layout unless the layout lock is granted. The
741 * client still does this checking in case it's talking with an old
742 * server. - Jinshan */
743 lock = ldlm_handle2lock(lockh);
747 if (ldlm_has_layout(lock) && lvb_data != NULL &&
748 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
751 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
752 ldlm_it2str(it->it_op), lvb_len);
754 OBD_ALLOC_LARGE(lmm, lvb_len);
756 GOTO(out_lock, rc = -ENOMEM);
758 memcpy(lmm, lvb_data, lvb_len);
760 /* install lvb_data */
761 lock_res_and_lock(lock);
762 if (lock->l_lvb_data == NULL) {
763 lock->l_lvb_type = LVB_T_LAYOUT;
764 lock->l_lvb_data = lmm;
765 lock->l_lvb_len = lvb_len;
768 unlock_res_and_lock(lock);
/* Another thread installed LVB data first; free our copy. */
770 OBD_FREE_LARGE(lmm, lvb_len);
773 if (ldlm_has_dom(lock)) {
774 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
776 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
777 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
778 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
779 exp->exp_obd->obd_name);
780 GOTO(out_lock, rc = -EPROTO);
783 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
784 ldlm_it2str(it->it_op), body->mbo_dom_size);
786 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
794 /* We always reserve enough space in the reply packet for a stripe MD, because
795 * we don't know in advance the file type. */
/* Core enqueue path shared by intent and plain lock requests.
 *
 * Selects the inodebits policy from the intent opcode, packs the
 * matching request type (open/getattr/readdir/layout/getxattr/flock),
 * acquires the modify-RPC and request slots, sends the enqueue, and
 * retries transparently on -EINPROGRESS and on -ERANGE ACL-buffer
 * overflow (resending with ocd_max_easize).  On success the result is
 * handed to mdc_finish_enqueue(); on failure the lock reference and
 * intent state are cleaned up. */
796 static int mdc_enqueue_base(struct obd_export *exp,
797 struct ldlm_enqueue_info *einfo,
798 const union ldlm_policy_data *policy,
799 struct lookup_intent *it,
800 struct md_op_data *op_data,
801 struct lustre_handle *lockh,
802 __u64 extra_lock_flags)
804 struct obd_device *obddev = class_exp2obd(exp);
805 struct ptlrpc_request *req = NULL;
806 __u64 flags, saved_flags = extra_lock_flags;
807 struct ldlm_res_id res_id;
808 static const union ldlm_policy_data lookup_policy = {
809 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
810 static const union ldlm_policy_data update_policy = {
811 .l_inodebits = { MDS_INODELOCK_UPDATE } };
812 static const union ldlm_policy_data layout_policy = {
813 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
814 static const union ldlm_policy_data getxattr_policy = {
815 .l_inodebits = { MDS_INODELOCK_XATTR } };
816 int generation, resends = 0;
817 struct ldlm_reply *lockrep;
818 struct obd_import *imp = class_exp2cliimp(exp);
820 enum lvb_type lvb_type = 0;
824 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
826 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* Intents pick their own policy; the caller must not supply one. */
829 LASSERT(policy == NULL);
831 saved_flags |= LDLM_FL_HAS_INTENT;
832 if (it->it_op & (IT_GETATTR | IT_READDIR))
833 policy = &update_policy;
834 else if (it->it_op & IT_LAYOUT)
835 policy = &layout_policy;
836 else if (it->it_op & IT_GETXATTR)
837 policy = &getxattr_policy;
839 policy = &lookup_policy;
/* Remember import generation so eviction during resend is noticed. */
842 generation = obddev->u.cli.cl_import->imp_generation;
843 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
844 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
846 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
851 /* The only way right now is FLOCK. */
852 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
854 res_id.name[3] = LDLM_FLOCK;
855 } else if (it->it_op & IT_OPEN) {
856 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
857 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
858 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
859 } else if (it->it_op & IT_READDIR) {
860 req = mdc_enqueue_pack(exp, 0);
861 } else if (it->it_op & IT_LAYOUT) {
862 if (!imp_connect_lvb_type(imp))
864 req = mdc_intent_layout_pack(exp, it, op_data);
865 lvb_type = LVB_T_LAYOUT;
866 } else if (it->it_op & IT_GETXATTR) {
867 req = mdc_intent_getxattr_pack(exp, it, op_data);
874 RETURN(PTR_ERR(req));
/* Pin the request to this import generation for resend tracking. */
877 req->rq_generation_set = 1;
878 req->rq_import_generation = generation;
879 req->rq_sent = ktime_get_real_seconds() + resends;
882 /* It is important to obtain modify RPC slot first (if applicable), so
883 * that threads that are waiting for a modify RPC slot are not polluting
884 * our rpcs in flight counter.
885 * We do not do flock request limiting, though */
887 mdc_get_mod_rpc_slot(req, it);
888 rc = obd_get_request_slot(&obddev->u.cli);
890 mdc_put_mod_rpc_slot(req, it);
891 mdc_clear_replay_flag(req, 0);
892 ptlrpc_req_finished(req);
897 /* With Data-on-MDT the glimpse callback is needed too.
898 * It is set here in advance but not in mdc_finish_enqueue()
899 * to avoid possible races. It is safe to have glimpse handler
900 * for non-DOM locks and costs nothing.*/
901 if (einfo->ei_cb_gl == NULL)
902 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
904 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
905 0, lvb_type, lockh, 0);
907 /* For flock requests we immediatelly return without further
908 delay and let caller deal with the rest, since rest of
909 this function metadata processing makes no sense for flock
910 requests anyway. But in case of problem during comms with
911 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
912 can not rely on caller and this mainly for F_UNLCKs
913 (explicits or automatically generated by Kernel to clean
914 current FLocks upon exit) that can't be trashed */
915 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
916 (einfo->ei_type == LDLM_FLOCK) &&
917 (einfo->ei_mode == LCK_NL))
/* Release slots now that the RPC has completed. */
922 obd_put_request_slot(&obddev->u.cli);
923 mdc_put_mod_rpc_slot(req, it);
927 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
928 obddev->obd_name, PFID(&op_data->op_fid1),
929 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
931 mdc_clear_replay_flag(req, rc);
932 ptlrpc_req_finished(req);
936 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
937 LASSERT(lockrep != NULL);
/* Convert wire status to host errno before inspecting it. */
939 lockrep->lock_policy_res2 =
940 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
942 /* Retry infinitely when the server returns -EINPROGRESS for the
943 * intent operation, when server returns -EINPROGRESS for acquiring
944 * intent lock, we'll retry in after_reply(). */
945 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
946 mdc_clear_replay_flag(req, rc);
947 ptlrpc_req_finished(req);
948 if (generation == obddev->u.cli.cl_import->imp_generation) {
949 if (signal_pending(current))
953 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
954 obddev->obd_name, resends, it->it_op,
955 PFID(&op_data->op_fid1),
956 PFID(&op_data->op_fid2));
959 CDEBUG(D_HA, "resend cross eviction\n");
/* Server ACL data overflowed our buffer: retry once with the
 * maximum EA size the server advertised at connect time. */
964 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
965 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
966 acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
967 mdc_clear_replay_flag(req, -ERANGE);
968 ptlrpc_req_finished(req);
969 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
973 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On error, drop our lock reference and scrub intent state so the
 * caller does not see a stale handle. */
975 if (lustre_handle_is_used(lockh)) {
976 ldlm_lock_decref(lockh, einfo->ei_mode);
977 memset(lockh, 0, sizeof(*lockh));
979 ptlrpc_req_finished(req);
981 it->it_lock_handle = 0;
982 it->it_lock_mode = 0;
983 it->it_request = NULL;
/* Public non-intent enqueue entry point: delegates to
 * mdc_enqueue_base() with a NULL lookup intent. */
989 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
990 const union ldlm_policy_data *policy,
991 struct md_op_data *op_data,
992 struct lustre_handle *lockh, __u64 extra_lock_flags)
994 return mdc_enqueue_base(exp, einfo, policy, NULL,
995 op_data, lockh, extra_lock_flags);
/* Finalize an intent lock after the enqueue reply has been processed:
 * propagate per-phase intent errors to the caller, add request
 * references that ll_create_node()/ll_file_open() will later release,
 * and, for GETATTR/LOOKUP, cancel the freshly granted lock if an
 * equivalent one already exists locally (keeping the old handle). */
998 static int mdc_finish_intent_lock(struct obd_export *exp,
999 struct ptlrpc_request *request,
1000 struct md_op_data *op_data,
1001 struct lookup_intent *it,
1002 struct lustre_handle *lockh)
1004 struct lustre_handle old_lock;
1005 struct ldlm_lock *lock;
1009 LASSERT(request != NULL);
1010 LASSERT(request != LP_POISON);
1011 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR intents need no further processing here. */
1013 if (it->it_op & IT_READDIR)
1016 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1017 if (it->it_status != 0)
1018 GOTO(out, rc = it->it_status);
1020 if (!it_disposition(it, DISP_IT_EXECD)) {
1021 /* The server failed before it even started executing
1022 * the intent, i.e. because it couldn't unpack the
1025 LASSERT(it->it_status != 0);
1026 GOTO(out, rc = it->it_status);
1028 rc = it_open_error(DISP_IT_EXECD, it);
1032 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1036 /* keep requests around for the multiple phases of the call
1037 * this shows the DISP_XX must guarantee we make it into the
1040 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1041 it_disposition(it, DISP_OPEN_CREATE) &&
1042 !it_open_error(DISP_OPEN_CREATE, it)) {
1043 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1044 /* balanced in ll_create_node */
1045 ptlrpc_request_addref(request);
1047 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1048 it_disposition(it, DISP_OPEN_OPEN) &&
1049 !it_open_error(DISP_OPEN_OPEN, it)) {
1050 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1051 /* balanced in ll_file_open */
1052 ptlrpc_request_addref(request);
1053 /* BUG 11546 - eviction in the middle of open rpc
1056 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1060 if (it->it_op & IT_CREAT) {
1061 /* XXX this belongs in ll_create_it */
1062 } else if (it->it_op == IT_OPEN) {
1063 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1065 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1069 /* If we already have a matching lock, then cancel the new
1070 * one. We have to set the data here instead of in
1071 * mdc_enqueue, because we need to use the child's inode as
1072 * the l_ast_data to match, and that's not available until
1073 * intent_finish has performed the iget().) */
1074 lock = ldlm_handle2lock(lockh);
1076 union ldlm_policy_data policy = lock->l_policy_data;
1077 LDLM_DEBUG(lock, "matching against this");
1079 if (it_has_reply_body(it)) {
1080 struct mdt_body *body;
1082 body = req_capsule_server_get(&request->rq_pill,
1084 /* mdc_enqueue checked */
1085 LASSERT(body != NULL);
/* Sanity: the lock's resource must match the reply's FID. */
1086 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1087 &lock->l_resource->lr_name),
1088 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1089 PLDLMRES(lock->l_resource),
1090 PFID(&body->mbo_fid1));
1092 LDLM_LOCK_PUT(lock);
1094 memcpy(&old_lock, lockh, sizeof(*lockh));
1095 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1096 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate found: cancel the new lock, keep the old one
 * and point the intent at it. */
1097 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1098 memcpy(lockh, &old_lock, sizeof(old_lock));
1099 it->it_lock_handle = lockh->cookie;
1105 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1106 (int)op_data->op_namelen, op_data->op_name,
1107 ldlm_it2str(it->it_op), it->it_status,
1108 it->it_disposition, rc);
/* Check whether a lock covering @it on @fid is still held locally.
 * If the intent carries a lock handle, revalidate it directly;
 * otherwise match by the inodebits appropriate for the intent opcode.
 * On success the intent's handle/mode are refreshed; on failure they
 * are zeroed.  @bits, if non-NULL, receives the granted inodebits. */
1112 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1113 struct lu_fid *fid, __u64 *bits)
1115 /* We could just return 1 immediately, but since we should only
1116 * be called in revalidate_it if we already have a lock, let's
1118 struct ldlm_res_id res_id;
1119 struct lustre_handle lockh;
1120 union ldlm_policy_data policy;
1121 enum ldlm_mode mode;
1124 if (it->it_lock_handle) {
1125 lockh.cookie = it->it_lock_handle;
1126 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1128 fid_build_reg_res_name(fid, &res_id);
1129 switch (it->it_op) {
1131 /* File attributes are held under multiple bits:
1132 * nlink is under lookup lock, size and times are
1133 * under UPDATE lock and recently we've also got
1134 * a separate permissions lock for owner/group/acl that
1135 * were protected by lookup lock before.
1136 * Getattr must provide all of that information,
1137 * so we need to ensure we have all of those locks.
1138 * Unfortunately, if the bits are split across multiple
1139 * locks, there's no easy way to match all of them here,
1140 * so an extra RPC would be performed to fetch all
1141 * of those bits at once for now. */
1142 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1143 * but for old MDTs (< 2.4), permission is covered
1144 * by LOOKUP lock, so it needs to match all bits here.*/
1145 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1146 MDS_INODELOCK_LOOKUP |
1150 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1153 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1156 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any already-granted mode satisfies revalidation. */
1160 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1161 LDLM_IBITS, &policy,
1162 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1167 it->it_lock_handle = lockh.cookie;
1168 it->it_lock_mode = mode;
1170 it->it_lock_handle = 0;
1171 it->it_lock_mode = 0;
1178 * This long block is all about fixing up the lock and request state
1179 * so that it is correct as of the moment _before_ the operation was
1180 * applied; that way, the VFS will think that everything is normal and
1181 * call Lustre's regular VFS methods.
1183 * If we're performing a creation, that means that unless the creation
1184 * failed with EEXIST, we should fake up a negative dentry.
1186 * For everything else, we want the lookup to succeed.
1188 * One additional note: if CREATE or OPEN succeeded, we add an extra
1189 * reference to the request because we need to keep it around until
1190 * ll_create/ll_open gets called.
1192 * The server will return to us, in it_disposition, an indication of
1193 * exactly what it_status refers to.
1195 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1196 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1197 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1198 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1201 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * Obtain the DLM lock required to execute intent @it on the object(s)
 * described by @op_data: first try to reuse an already-granted lock via
 * mdc_revalidate_lock(), and only fall back to an intent enqueue RPC
 * (mdc_enqueue_base()) when nothing cached matches.  On completion the
 * intent reply request is returned to the caller through @reqp and the
 * lock/request state is fixed up by mdc_finish_intent_lock().
 */
1204 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1205 struct lookup_intent *it, struct ptlrpc_request **reqp,
1206 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1208 struct ldlm_enqueue_info einfo = {
1209 .ei_type = LDLM_IBITS,
1210 .ei_mode = it_to_lock_mode(it),
1211 .ei_cb_bl = cb_blocking,
1212 .ei_cb_cp = ldlm_completion_ast,
1213 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1215 struct lustre_handle lockh;
1220 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1221 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1222 op_data->op_name, PFID(&op_data->op_fid2),
1223 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
 /* For lookup-type intents on an already-known child fid, try to
  * satisfy the intent from cached locks and skip the RPC entirely. */
1227 if (fid_is_sane(&op_data->op_fid2) &&
1228 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1229 /* We could just return 1 immediately, but since we should only
1230 * be called in revalidate_it if we already have a lock, let's
 * verify that it is still valid. */
1232 it->it_lock_handle = 0;
1233 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1234 /* Only return failure if it was not GETATTR by cfid
1235 (from inode_revalidate) */
1236 if (rc || op_data->op_namelen != 0)
1240 /* For case if upper layer did not alloc fid, do it now. */
1241 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1242 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1244 CERROR("Can't alloc new fid, rc %d\n", rc);
 /* No cached lock was usable: perform a real intent enqueue RPC. */
1249 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
 /* Hand the reply request to the caller; mdc_finish_intent_lock()
  * fixes up lock and request state as described in the block comment
  * above this function. */
1254 *reqp = it->it_request;
1255 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for the async getattr enqueue issued by
 * mdc_intent_getattr_async(): releases the client request slot taken
 * at send time, finishes the ldlm enqueue, fixes up intent/lock state,
 * and finally hands the result to the caller's mi_cb callback.
 */
1259 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1260 struct ptlrpc_request *req,
 /* NOTE(review): trailing parameters (void *args, int rc) are elided
  * from this excerpt — the body below reads both. */
1263 struct mdc_getattr_args *ga = args;
1264 struct obd_export *exp = ga->ga_exp;
1265 struct md_enqueue_info *minfo = ga->ga_minfo;
1266 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1267 struct lookup_intent *it;
1268 struct lustre_handle *lockh;
1269 struct obd_device *obddev;
1270 struct ldlm_reply *lockrep;
1271 __u64 flags = LDLM_FL_HAS_INTENT;
1275 lockh = &minfo->mi_lockh;
1277 obddev = class_exp2obd(exp);
 /* Balance the obd_get_request_slot() done when the RPC was sent. */
1279 obd_put_request_slot(&obddev->u.cli);
1280 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1283 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1284 &flags, NULL, 0, lockh, rc);
1286 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
 /* Enqueue failed: make sure this request is not kept for replay. */
1287 mdc_clear_replay_flag(req, rc);
1291 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1292 LASSERT(lockrep != NULL);
 /* Convert the server's intent status from wire representation to the
  * host errno convention. */
1294 lockrep->lock_policy_res2 =
1295 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1297 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1301 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
 /* Notify the sponsor of the async getattr of the final result. */
1305 minfo->mi_cb(req, minfo, rc);
1309 int mdc_intent_getattr_async(struct obd_export *exp,
1310 struct md_enqueue_info *minfo)
1312 struct md_op_data *op_data = &minfo->mi_data;
1313 struct lookup_intent *it = &minfo->mi_it;
1314 struct ptlrpc_request *req;
1315 struct mdc_getattr_args *ga;
1316 struct obd_device *obddev = class_exp2obd(exp);
1317 struct ldlm_res_id res_id;
1318 union ldlm_policy_data policy = {
1319 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1320 MDS_INODELOCK_UPDATE } };
1322 __u64 flags = LDLM_FL_HAS_INTENT;
1325 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1326 (int)op_data->op_namelen, op_data->op_name,
1327 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1329 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1330 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1331 * of the async getattr RPC will handle that by itself. */
1332 req = mdc_intent_getattr_pack(exp, it, op_data,
1333 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1335 RETURN(PTR_ERR(req));
1337 rc = obd_get_request_slot(&obddev->u.cli);
1339 ptlrpc_req_finished(req);
1343 /* With Data-on-MDT the glimpse callback is needed too.
1344 * It is set here in advance but not in mdc_finish_enqueue()
1345 * to avoid possible races. It is safe to have glimpse handler
1346 * for non-DOM locks and costs nothing.*/
1347 if (minfo->mi_einfo.ei_cb_gl == NULL)
1348 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1350 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1351 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1353 obd_put_request_slot(&obddev->u.cli);
1354 ptlrpc_req_finished(req);
1358 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1359 ga = ptlrpc_req_async_args(req);
1361 ga->ga_minfo = minfo;
1363 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1364 ptlrpcd_add_req(req);