4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Context carried through an asynchronous getattr intent request:
 * the export the RPC was sent on plus the caller's md_enqueue_info.
 * NOTE(review): the closing brace of this struct is missing from this
 * extract — lines appear to have been dropped during extraction. */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;      /* export the getattr was issued on */
52 struct md_enqueue_info *ga_minfo; /* caller's enqueue info / completion data */
/* Map a lookup_intent's server-reported disposition to the error (if any)
 * relevant to the given processing @phase.  The DISP_* checks are ordered
 * from latest phase (lease/open) to earliest (intent executed); for each
 * disposition that was reached, callers at or past that phase get the
 * intent status.  Exported for use by llite.
 * NOTE(review): the return statements and closing braces for each branch
 * are missing from this extract; behavior described above is inferred
 * from the visible phase comparisons — confirm against the full source. */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* Reaching here means no recognized disposition matched: log it. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach inode @data to the DLM lock's resource (lr_lvb_inode) so later
 * cancellations/matches can find the VFS inode, and optionally report the
 * lock's inodebits via @bits.  If the resource already points at a
 * different inode, it must be one being freed (I_FREEING) — anything else
 * indicates two live inodes mapped to one resource, hence the LASSERTF. */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Unused handle: nothing to attach to (early-out path; return line not
 * visible in this extract). */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* A different existing inode is only legal if it is being freed. */
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
/* Caller asked for the granted inodebits (guard for bits != NULL is
 * presumably on a line not visible here — confirm in full source). */
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/* Look for an already-granted local lock on the resource derived from
 * @fid that matches @type/@policy/@mode.  Returns the matched mode (or 0)
 * as reported by ldlm_lock_match(); the matched handle is stored in
 * @lockh.  Inodebits the server does not support are masked out first
 * (LU-4405) so the match criteria agree with what was actually granted. */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused (no-reader/no-writer) locks on the resource named by
 * @fid that match @policy/@mode, honouring the given cancel @flags and
 * passing @opaque through to the cancellation machinery.  Thin wrapper
 * over ldlm_cli_cancel_unused_resource(). */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/* Clear the cached inode pointer (lr_lvb_inode) on the DLM resource for
 * @fid, typically when the inode is going away.  Safe if the resource
 * does not exist (the not-found early-out after ldlm_resource_get() is
 * on a line not visible in this extract — confirm in full source). */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create == 0): do not instantiate a resource just to
 * null its inode pointer. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/* Drop the replay flag on @req when the request failed with @rc, so a
 * failed request is not replayed after recovery.  A nonzero transno on
 * an error reply is unexpected and logged.  (The line that actually
 * clears rq_replay inside the spinlock is missing from this extract.) */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/* Copy @size bytes of @data into the client-side @field of @req's
 * capsule, growing the request buffer if the current field is smaller
 * than @size and shrinking it if larger.  Returns 0 or a negative errno
 * (from sptlrpc_cli_enlarge_reqbuf). */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
/* Field too small for the EA: enlarge the whole request buffer. */
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
/* Field larger than needed: shrink to the exact EA size
 * (the else keyword is on a line missing from this extract). */
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
/* Build and pack an LDLM intent-open request (RQF_LDLM_INTENT_OPEN):
 * collect conflicting local locks to cancel, size the request/reply
 * capsule fields (name, EA data, security context, ACL, DoM inline
 * buffer), pack the ldlm_intent and the open body, and pre-advertise the
 * predicted reply-buffer size to the server via lm_repsize (LU-11414).
 * Returns the prepared request or ERR_PTR(-ENOMEM).
 * NOTE(review): several lines (lock-mode selection, error returns,
 * RETURN(req)) are missing from this extract. */
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
259 int repsize, repsize_estimate;
/* Open always targets a regular file: force S_IFREG into create mode. */
263 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265 /* XXX: openlock is not cancelled for cross-refs. */
266 /* If inode is known, cancel conflicting OPEN locks. */
267 if (fid_is_sane(&op_data->op_fid2)) {
268 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269 if (it->it_flags & MDS_FMODE_WRITE)
274 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
277 else if (it->it_flags & FMODE_EXEC)
283 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288 /* If CREATE, cancel parent's UPDATE lock. */
289 if (it->it_op & IT_CREAT)
293 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
295 MDS_INODELOCK_UPDATE);
297 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before bailing
 * (the NULL check itself is on a line missing from this extract). */
300 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301 RETURN(ERR_PTR(-ENOMEM));
304 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305 op_data->op_namelen + 1);
306 if (cl_is_lov_delay_create(it->it_flags)) {
307 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308 LASSERT(lmmsize == 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
311 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
315 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317 strlen(op_data->op_file_secctx_name) + 1 : 0);
319 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320 op_data->op_file_secctx_size);
322 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
324 ptlrpc_request_free(req);
/* Open requests are replayable if the import is. */
328 spin_lock(&req->rq_lock);
329 req->rq_replay = req->rq_import->imp_replayable;
330 spin_unlock(&req->rq_lock);
332 /* pack the intent */
333 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334 lit->opc = (__u64)it->it_op;
336 /* pack the intended request */
337 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
340 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341 obddev->u.cli.cl_max_mds_easize);
342 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
345 * Inline buffer for possible data from Data-on-MDT files.
347 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
348 sizeof(struct niobuf_remote));
349 ptlrpc_request_set_replen(req);
351 /* Get real repbuf allocated size as rounded up power of 2 */
352 repsize = size_roundup_power2(req->rq_replen +
353 lustre_msg_early_size());
354 /* Estimate free space for DoM files in repbuf */
355 repsize_estimate = repsize - (req->rq_replen -
356 obddev->u.cli.cl_max_mds_easize +
357 sizeof(struct lov_comp_md_v1) +
358 sizeof(struct lov_comp_md_entry_v1) +
359 lov_mds_md_size(0, LOV_MAGIC_V3));
/* Not enough slack for inline DoM data: grow the inline niobuf so the
 * reply can carry at least cl_dom_min_inline_repsize bytes. */
361 if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
362 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
363 repsize_estimate + sizeof(struct niobuf_remote);
364 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
366 sizeof(struct niobuf_remote) + repsize);
367 ptlrpc_request_set_replen(req);
368 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
369 repsize, req->rq_replen);
370 repsize = size_roundup_power2(req->rq_replen +
371 lustre_msg_early_size());
373 /* The only way to report real allocated repbuf size to the server
374 * is the lm_repsize but it must be set prior buffer allocation itself
375 * due to security reasons - it is part of buffer used in signature
376 * calculation (see LU-11414). Therefore the saved size is predicted
377 * value as rq_replen rounded to the next higher power of 2.
378 * Such estimation is safe. Though the final allocated buffer might
379 * be even larger, it is not possible to know that at this point.
381 req->rq_reqmsg->lm_repsize = repsize;
/* Default sizing for the getxattr intent reply buffers: room for
 * GA_DEFAULT_EA_NUM xattrs with names up to GA_DEFAULT_EA_NAME_LEN and
 * values up to GA_DEFAULT_EA_VAL_LEN bytes each. */
385 #define GA_DEFAULT_EA_NAME_LEN 20
386 #define GA_DEFAULT_EA_VAL_LEN 250
387 #define GA_DEFAULT_EA_NUM 10
/* Build an LDLM intent-getxattr request (RQF_LDLM_INTENT_GETXATTR):
 * pack the IT_GETXATTR intent and body, and size the server-side reply
 * fields for EA names, values and value lengths using the GA_* defaults.
 * Returns the prepared request or ERR_PTR on failure (some error-path
 * lines are missing from this extract). */
389 static struct ptlrpc_request *
390 mdc_intent_getxattr_pack(struct obd_export *exp,
391 struct lookup_intent *it,
392 struct md_op_data *op_data)
394 struct ptlrpc_request *req;
395 struct ldlm_intent *lit;
397 struct list_head cancels = LIST_HEAD_INIT(cancels);
398 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
402 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
403 &RQF_LDLM_INTENT_GETXATTR);
405 RETURN(ERR_PTR(-ENOMEM));
407 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
409 ptlrpc_request_free(req);
413 /* pack the intent */
414 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
415 lit->opc = IT_GETXATTR;
417 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
418 /* If the supplied buffer is too small then the server will
419 * return -ERANGE and llite will fallback to using non cached
420 * xattr operations. On servers before 2.10.1 a (non-cached)
421 * listxattr RPC for an orphan or dead file causes an oops. So
422 * let's try to avoid sending too small a buffer to too old a
423 * server. This is effectively undoing the memory conservation
424 * of LU-9417 when it would be *more* likely to crash the
425 * server. See LU-9856. */
426 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
427 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
428 exp->exp_connect_data.ocd_max_easize);
431 /* pack the intended request */
432 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
433 ea_vals_buf_size, -1, 0);
435 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
436 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
438 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
441 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
442 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is requested for getxattr intents. */
444 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
446 ptlrpc_request_set_replen(req);
/* Build an LDLM intent-getattr request (RQF_LDLM_INTENT_GETATTR):
 * pack the intent opcode and a getattr body asking for attributes,
 * EA, ACL and MEA data, sizing the server-side MD reply field by the
 * default (preferred) or max MDS EA size.  Returns the prepared request
 * or ERR_PTR(-ENOMEM); some error-path lines are missing from this
 * extract. */
451 static struct ptlrpc_request *
452 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
453 struct md_op_data *op_data, __u32 acl_bufsize)
455 struct ptlrpc_request *req;
456 struct obd_device *obddev = class_exp2obd(exp);
457 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
458 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
459 OBD_MD_MEA | OBD_MD_FLACL;
460 struct ldlm_intent *lit;
465 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
466 &RQF_LDLM_INTENT_GETATTR);
468 RETURN(ERR_PTR(-ENOMEM));
470 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
471 op_data->op_namelen + 1);
473 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
475 ptlrpc_request_free(req);
479 /* pack the intent */
480 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
481 lit->opc = (__u64)it->it_op;
/* Prefer the default EA size when set; otherwise fall back to max. */
483 if (obddev->u.cli.cl_default_mds_easize > 0)
484 easize = obddev->u.cli.cl_default_mds_easize;
486 easize = obddev->u.cli.cl_max_mds_easize;
488 /* pack the intended request */
489 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
491 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
492 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
493 ptlrpc_request_set_replen(req);
/* Build an LDLM intent-layout request (RQF_LDLM_INTENT_LAYOUT): copy the
 * caller-provided struct layout_intent from op_data->op_data into the
 * request and size the server LVB field for the default MDS EA size.
 * Returns the prepared request or ERR_PTR on failure; some error-path
 * lines are missing from this extract. */
497 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
498 struct lookup_intent *it,
499 struct md_op_data *op_data)
501 struct obd_device *obd = class_exp2obd(exp);
502 struct ptlrpc_request *req;
503 struct ldlm_intent *lit;
504 struct layout_intent *layout;
508 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
509 &RQF_LDLM_INTENT_LAYOUT);
511 RETURN(ERR_PTR(-ENOMEM));
/* No client EA data is sent with a layout intent. */
513 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
514 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
516 ptlrpc_request_free(req);
520 /* pack the intent */
521 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
522 lit->opc = (__u64)it->it_op;
524 /* pack the layout intent request */
525 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
/* Caller must supply exactly one struct layout_intent in op_data. */
526 LASSERT(op_data->op_data != NULL);
527 LASSERT(op_data->op_data_size == sizeof(*layout));
528 memcpy(layout, op_data->op_data, sizeof(*layout));
530 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
531 obd->u.cli.cl_default_mds_easize);
532 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM enqueue request (RQF_LDLM_ENQUEUE)
 * with a server LVB reply field of @lvb_len bytes.  Returns the prepared
 * request or ERR_PTR on failure; some error-path lines are missing from
 * this extract. */
536 static struct ptlrpc_request *
537 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
539 struct ptlrpc_request *req;
543 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
545 RETURN(ERR_PTR(-ENOMEM));
547 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
549 ptlrpc_request_free(req);
553 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
554 ptlrpc_request_set_replen(req);
/* Post-process an intent enqueue reply: mark replayed/replayable
 * requests intent-only, fix up the granted lock mode, copy the server's
 * disposition/status into @it, clear the replay flag for failed opens,
 * validate and save reply EA/LVB data (for open replay and for layout
 * locks), install layout LVB data on the lock, and sanity-check DoM
 * locks.  NOTE(review): many lines (declarations, gotos, returns,
 * closing braces) are missing from this extract; comments below describe
 * only the visible statements. */
558 static int mdc_finish_enqueue(struct obd_export *exp,
559 struct ptlrpc_request *req,
560 struct ldlm_enqueue_info *einfo,
561 struct lookup_intent *it,
562 struct lustre_handle *lockh,
565 struct req_capsule *pill = &req->rq_pill;
566 struct ldlm_request *lockreq;
567 struct ldlm_reply *lockrep;
568 struct ldlm_lock *lock;
569 struct mdt_body *body = NULL;
570 void *lvb_data = NULL;
576 /* Similarly, if we're going to replay this request, we don't want to
577 * actually get a lock, just perform the intent. */
578 if (req->rq_transno || req->rq_replay) {
579 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
580 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Server aborted the lock: only the intent result matters; zero out
 * the handle so callers don't treat it as a granted lock. */
583 if (rc == ELDLM_LOCK_ABORTED) {
585 memset(lockh, 0, sizeof(*lockh));
587 } else { /* rc = 0 */
588 lock = ldlm_handle2lock(lockh);
589 LASSERT(lock != NULL);
591 /* If the server gave us back a different lock mode, we should
592 * fix up our variables. */
593 if (lock->l_req_mode != einfo->ei_mode) {
594 ldlm_lock_addref(lockh, lock->l_req_mode);
595 ldlm_lock_decref(lockh, einfo->ei_mode);
596 einfo->ei_mode = lock->l_req_mode;
601 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
602 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's answer into the lookup intent. */
604 it->it_disposition = (int)lockrep->lock_policy_res1;
605 it->it_status = (int)lockrep->lock_policy_res2;
606 it->it_lock_mode = einfo->ei_mode;
607 it->it_lock_handle = lockh->cookie;
608 it->it_request = req;
610 /* Technically speaking rq_transno must already be zero if
611 * it_status is in error, so the check is a bit redundant */
612 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
613 mdc_clear_replay_flag(req, it->it_status);
615 /* If we're doing an IT_OPEN which did not result in an actual
616 * successful open, then we need to remove the bit which saves
617 * this request for unconditional replay.
619 * It's important that we do this first! Otherwise we might exit the
620 * function without doing so, and try to replay a failed create
622 if (it->it_op & IT_OPEN && req->rq_replay &&
623 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
624 mdc_clear_replay_flag(req, it->it_status);
626 DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
627 it->it_op, it->it_disposition, it->it_status);
629 /* We know what to expect, so we do any byte flipping required here */
630 if (it_has_reply_body(it)) {
631 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
633 CERROR ("Can't swab mdt_body\n");
637 if (it_disposition(it, DISP_OPEN_OPEN) &&
638 !it_open_error(DISP_OPEN_OPEN, it)) {
640 * If this is a successful OPEN request, we need to set
641 * replay handler and data early, so that if replay
642 * happens immediately after swabbing below, new reply
643 * is swabbed by that handler correctly.
645 mdc_set_open_replay_data(NULL, NULL, it);
648 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
651 mdc_update_max_ea_from_body(exp, body);
654 * The eadata is opaque; just check that it is there.
655 * Eventually, obd_unpackmd() will check the contents.
657 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
658 body->mbo_eadatasize);
662 /* save lvb data and length in case this is for layout
665 lvb_len = body->mbo_eadatasize;
668 * We save the reply LOV EA in case we have to replay a
669 * create for recovery. If we didn't allocate a large
670 * enough request buffer above we need to reallocate it
671 * here to hold the actual LOV EA.
673 * To not save LOV EA if request is not going to replay
674 * (for example error one).
676 if ((it->it_op & IT_OPEN) && req->rq_replay) {
677 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
678 body->mbo_eadatasize);
/* Saving failed: drop the EA from the body so replay does not
 * depend on data we could not preserve. */
680 body->mbo_valid &= ~OBD_MD_FLEASIZE;
681 body->mbo_eadatasize = 0;
686 } else if (it->it_op & IT_LAYOUT) {
687 /* maybe the lock was granted right away and layout
688 * is packed into RMF_DLM_LVB of req */
689 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
690 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
691 class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
693 lvb_data = req_capsule_server_sized_get(pill,
694 &RMF_DLM_LVB, lvb_len);
695 if (lvb_data == NULL)
699 * save replied layout data to the request buffer for
700 * recovery consideration (lest MDS reinitialize
701 * another set of OST objects).
/* Best-effort save: the return value is deliberately ignored. */
704 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
709 /* fill in stripe data for layout lock.
710 * LU-6581: trust layout data only if layout lock is granted. The MDT
711 * has stopped sending layout unless the layout lock is granted. The
712 * client still does this checking in case it's talking with an old
713 * server. - Jinshan */
714 lock = ldlm_handle2lock(lockh);
718 if (ldlm_has_layout(lock) && lvb_data != NULL &&
719 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
722 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
723 ldlm_it2str(it->it_op), lvb_len);
725 OBD_ALLOC_LARGE(lmm, lvb_len);
727 GOTO(out_lock, rc = -ENOMEM);
729 memcpy(lmm, lvb_data, lvb_len);
731 /* install lvb_data */
732 lock_res_and_lock(lock);
733 if (lock->l_lvb_data == NULL) {
734 lock->l_lvb_type = LVB_T_LAYOUT;
735 lock->l_lvb_data = lmm;
736 lock->l_lvb_len = lvb_len;
739 unlock_res_and_lock(lock);
/* Someone else installed LVB data first: free our copy. */
741 OBD_FREE_LARGE(lmm, lvb_len);
744 if (ldlm_has_dom(lock)) {
745 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
747 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
748 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
749 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
750 exp->exp_obd->obd_name);
751 GOTO(out_lock, rc = -EPROTO);
754 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
755 ldlm_it2str(it->it_op), body->mbo_dom_size);
757 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
765 /* We always reserve enough space in the reply packet for a stripe MD, because
766 * we don't know in advance the file type. */
/* Core enqueue path for metadata locks, with or without an intent:
 * select the inodebits policy from the intent op, pack the matching
 * request type (open/getattr/readdir/layout/getxattr or plain flock),
 * throttle via modify-RPC and request slots, send via ldlm_cli_enqueue,
 * and handle retry cases (-EINPROGRESS resend loop, ACL buffer too
 * small -ERANGE resend) before finishing via mdc_finish_enqueue().
 * NOTE(review): numerous lines (labels such as resend:, declarations,
 * RETURNs, closing braces) are missing from this extract; comments
 * describe only visible statements. */
767 static int mdc_enqueue_base(struct obd_export *exp,
768 struct ldlm_enqueue_info *einfo,
769 const union ldlm_policy_data *policy,
770 struct lookup_intent *it,
771 struct md_op_data *op_data,
772 struct lustre_handle *lockh,
773 __u64 extra_lock_flags)
775 struct obd_device *obddev = class_exp2obd(exp);
776 struct ptlrpc_request *req = NULL;
777 __u64 flags, saved_flags = extra_lock_flags;
778 struct ldlm_res_id res_id;
779 static const union ldlm_policy_data lookup_policy = {
780 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
781 static const union ldlm_policy_data update_policy = {
782 .l_inodebits = { MDS_INODELOCK_UPDATE } };
783 static const union ldlm_policy_data layout_policy = {
784 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
785 static const union ldlm_policy_data getxattr_policy = {
786 .l_inodebits = { MDS_INODELOCK_XATTR } };
787 int generation, resends = 0;
788 struct ldlm_reply *lockrep;
789 struct obd_import *imp = class_exp2cliimp(exp);
791 enum lvb_type lvb_type = 0;
795 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
797 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent the policy is derived from it->it_op, so the caller
 * must not pass one explicitly. */
800 LASSERT(policy == NULL);
802 saved_flags |= LDLM_FL_HAS_INTENT;
803 if (it->it_op & (IT_GETATTR | IT_READDIR))
804 policy = &update_policy;
805 else if (it->it_op & IT_LAYOUT)
806 policy = &layout_policy;
807 else if (it->it_op & IT_GETXATTR)
808 policy = &getxattr_policy;
810 policy = &lookup_policy;
/* Remember the import generation so eviction during resend is noticed. */
813 generation = obddev->u.cli.cl_import->imp_generation;
814 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
815 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
818 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
823 /* The only way right now is FLOCK. */
824 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
826 res_id.name[3] = LDLM_FLOCK;
827 } else if (it->it_op & IT_OPEN) {
828 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
829 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
830 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
831 } else if (it->it_op & IT_READDIR) {
832 req = mdc_enqueue_pack(exp, 0);
833 } else if (it->it_op & IT_LAYOUT) {
834 if (!imp_connect_lvb_type(imp))
836 req = mdc_intent_layout_pack(exp, it, op_data);
837 lvb_type = LVB_T_LAYOUT;
838 } else if (it->it_op & IT_GETXATTR) {
839 req = mdc_intent_getxattr_pack(exp, it, op_data);
846 RETURN(PTR_ERR(req));
/* Resent requests keep the original import generation and are delayed
 * proportionally to the resend count. */
849 req->rq_generation_set = 1;
850 req->rq_import_generation = generation;
851 req->rq_sent = ktime_get_real_seconds() + resends;
854 /* It is important to obtain modify RPC slot first (if applicable), so
855 * that threads that are waiting for a modify RPC slot are not polluting
856 * our rpcs in flight counter.
857 * We do not do flock request limiting, though */
859 mdc_get_mod_rpc_slot(req, it);
860 rc = obd_get_request_slot(&obddev->u.cli);
/* Slot acquisition failed: undo everything taken so far. */
862 mdc_put_mod_rpc_slot(req, it);
863 mdc_clear_replay_flag(req, 0);
864 ptlrpc_req_finished(req);
869 /* With Data-on-MDT the glimpse callback is needed too.
870 * It is set here in advance but not in mdc_finish_enqueue()
871 * to avoid possible races. It is safe to have glimpse handler
872 * for non-DOM locks and costs nothing.*/
873 if (einfo->ei_cb_gl == NULL)
874 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
876 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
877 0, lvb_type, lockh, 0);
879 /* For flock requests we immediatelly return without further
880 delay and let caller deal with the rest, since rest of
881 this function metadata processing makes no sense for flock
882 requests anyway. But in case of problem during comms with
883 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
884 can not rely on caller and this mainly for F_UNLCKs
885 (explicits or automatically generated by Kernel to clean
886 current FLocks upon exit) that can't be trashed */
887 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
888 (einfo->ei_type == LDLM_FLOCK) &&
889 (einfo->ei_mode == LCK_NL))
/* Release throttling slots now that the RPC has completed. */
894 obd_put_request_slot(&obddev->u.cli);
895 mdc_put_mod_rpc_slot(req, it);
899 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
900 obddev->obd_name, PFID(&op_data->op_fid1),
901 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
903 mdc_clear_replay_flag(req, rc);
904 ptlrpc_req_finished(req);
908 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
909 LASSERT(lockrep != NULL);
/* Convert the wire status to host error convention before checking. */
911 lockrep->lock_policy_res2 =
912 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
914 /* Retry infinitely when the server returns -EINPROGRESS for the
915 * intent operation, when server returns -EINPROGRESS for acquiring
916 * intent lock, we'll retry in after_reply(). */
917 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
918 mdc_clear_replay_flag(req, rc);
919 ptlrpc_req_finished(req);
920 if (generation == obddev->u.cli.cl_import->imp_generation) {
921 if (signal_pending(current))
925 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
926 obddev->obd_name, resends, it->it_op,
927 PFID(&op_data->op_fid1),
928 PFID(&op_data->op_fid2));
/* Import generation changed: we were evicted mid-resend. */
931 CDEBUG(D_HA, "resend cross eviction\n");
/* Server reported the ACL reply buffer was too small while we used the
 * old (small) size: retry once with the larger buffer. */
936 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
937 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
938 acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
939 mdc_clear_replay_flag(req, -ERANGE);
940 ptlrpc_req_finished(req);
941 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
946 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On failure drop any granted reference and scrub the intent's lock
 * state so the caller does not see a stale handle. */
948 if (lustre_handle_is_used(lockh)) {
949 ldlm_lock_decref(lockh, einfo->ei_mode);
950 memset(lockh, 0, sizeof(*lockh));
952 ptlrpc_req_finished(req);
954 it->it_lock_handle = 0;
955 it->it_lock_mode = 0;
956 it->it_request = NULL;
/* Public intent-less enqueue entry point: forwards to mdc_enqueue_base()
 * with a NULL lookup_intent, so the caller-supplied @policy is used. */
962 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
963 const union ldlm_policy_data *policy,
964 struct md_op_data *op_data,
965 struct lustre_handle *lockh, __u64 extra_lock_flags)
967 return mdc_enqueue_base(exp, einfo, policy, NULL,
968 op_data, lockh, extra_lock_flags);
/* Interpret the completed intent @request for the VFS layer: translate
 * dispositions/status into a return code, take extra request references
 * for successful OPEN/CREATE (released later by llite), sanity-check the
 * lock's resource against the reply fid, and cancel our new lock if an
 * equivalent one already exists locally.  NOTE(review): many lines
 * (GOTO targets, returns, closing braces) are missing from this
 * extract; comments describe only visible statements. */
971 static int mdc_finish_intent_lock(struct obd_export *exp,
972 struct ptlrpc_request *request,
973 struct md_op_data *op_data,
974 struct lookup_intent *it,
975 struct lustre_handle *lockh)
977 struct lustre_handle old_lock;
978 struct ldlm_lock *lock;
982 LASSERT(request != NULL);
983 LASSERT(request != LP_POISON);
984 LASSERT(request->rq_repmsg != LP_POISON);
986 if (it->it_op & IT_READDIR)
989 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
990 if (it->it_status != 0)
991 GOTO(out, rc = it->it_status);
993 if (!it_disposition(it, DISP_IT_EXECD)) {
994 /* The server failed before it even started executing
995 * the intent, i.e. because it couldn't unpack the
998 LASSERT(it->it_status != 0);
999 GOTO(out, rc = it->it_status);
1001 rc = it_open_error(DISP_IT_EXECD, it);
1005 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1009 /* keep requests around for the multiple phases of the call
1010 * this shows the DISP_XX must guarantee we make it into the
/* Successful create: pin the request until ll_create_node drops it. */
1013 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1014 it_disposition(it, DISP_OPEN_CREATE) &&
1015 !it_open_error(DISP_OPEN_CREATE, it)) {
1016 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1017 /* balanced in ll_create_node */
1018 ptlrpc_request_addref(request);
/* Successful open: pin the request until ll_file_open drops it. */
1020 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1021 it_disposition(it, DISP_OPEN_OPEN) &&
1022 !it_open_error(DISP_OPEN_OPEN, it)) {
1023 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1024 /* balanced in ll_file_open */
1025 ptlrpc_request_addref(request);
1026 /* BUG 11546 - eviction in the middle of open rpc
1029 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1033 if (it->it_op & IT_CREAT) {
1034 /* XXX this belongs in ll_create_it */
1035 } else if (it->it_op == IT_OPEN) {
1036 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1038 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1042 /* If we already have a matching lock, then cancel the new
1043 * one. We have to set the data here instead of in
1044 * mdc_enqueue, because we need to use the child's inode as
1045 * the l_ast_data to match, and that's not available until
1046 * intent_finish has performed the iget().) */
1047 lock = ldlm_handle2lock(lockh);
1049 union ldlm_policy_data policy = lock->l_policy_data;
1050 LDLM_DEBUG(lock, "matching against this");
1052 if (it_has_reply_body(it)) {
1053 struct mdt_body *body;
1055 body = req_capsule_server_get(&request->rq_pill,
1057 /* mdc_enqueue checked */
1058 LASSERT(body != NULL);
/* The reply fid must name the same resource the lock covers. */
1059 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1060 &lock->l_resource->lr_name),
1061 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1062 PLDLMRES(lock->l_resource),
1063 PFID(&body->mbo_fid1));
1065 LDLM_LOCK_PUT(lock);
1067 memcpy(&old_lock, lockh, sizeof(*lockh));
/* A pre-existing equivalent lock was found: cancel the fresh one and
 * hand the caller the old handle instead. */
1068 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1069 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1070 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1071 memcpy(lockh, &old_lock, sizeof(old_lock));
1072 it->it_lock_handle = lockh->cookie;
1078 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1079 (int)op_data->op_namelen, op_data->op_name,
1080 ldlm_it2str(it->it_op), it->it_status,
1081 it->it_disposition, rc);
/* Check whether a lock covering @fid (with the inodebits implied by the
 * intent op) is already held locally.  If the intent carries a lock
 * handle, revalidate that handle directly; otherwise match by fid and a
 * policy chosen per it->it_op (GETATTR needs UPDATE|LOOKUP|PERM bits,
 * LAYOUT needs LAYOUT, default LOOKUP).  On success the intent's lock
 * handle/mode are filled in; otherwise they are cleared.
 * NOTE(review): the switch-case labels and return statements are on
 * lines missing from this extract. */
1085 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1086 struct lu_fid *fid, __u64 *bits)
1088 /* We could just return 1 immediately, but since we should only
1089 * be called in revalidate_it if we already have a lock, let's
1091 struct ldlm_res_id res_id;
1092 struct lustre_handle lockh;
1093 union ldlm_policy_data policy;
1094 enum ldlm_mode mode;
1097 if (it->it_lock_handle) {
1098 lockh.cookie = it->it_lock_handle;
1099 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1101 fid_build_reg_res_name(fid, &res_id);
1102 switch (it->it_op) {
1104 /* File attributes are held under multiple bits:
1105 * nlink is under lookup lock, size and times are
1106 * under UPDATE lock and recently we've also got
1107 * a separate permissions lock for owner/group/acl that
1108 * were protected by lookup lock before.
1109 * Getattr must provide all of that information,
1110 * so we need to ensure we have all of those locks.
1111 * Unfortunately, if the bits are split across multiple
1112 * locks, there's no easy way to match all of them here,
1113 * so an extra RPC would be performed to fetch all
1114 * of those bits at once for now. */
1115 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1116 * but for old MDTs (< 2.4), permission is covered
1117 * by LOOKUP lock, so it needs to match all bits here.*/
1118 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1119 MDS_INODELOCK_LOOKUP |
1123 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1126 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1129 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1133 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1134 LDLM_IBITS, &policy,
1135 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* Matched (or revalidated): record the lock in the intent. */
1140 it->it_lock_handle = lockh.cookie;
1141 it->it_lock_mode = mode;
/* No usable lock: clear any stale state in the intent. */
1143 it->it_lock_handle = 0;
1144 it->it_lock_mode = 0;
1151 * This long block is all about fixing up the lock and request state
1152 * so that it is correct as of the moment _before_ the operation was
1153 * applied; that way, the VFS will think that everything is normal and
1154 * call Lustre's regular VFS methods.
1156 * If we're performing a creation, that means that unless the creation
1157 * failed with EEXIST, we should fake up a negative dentry.
1159 * For everything else, we want to lookup to succeed.
1161 * One additional note: if CREATE or OPEN succeeded, we add an extra
1162 * reference to the request because we need to keep it around until
1163 * ll_create/ll_open gets called.
1165 * The server will return to us, in it_disposition, an indication of
1166 * exactly what it_status refers to.
1168 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1169 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1170 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1171 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1174 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * mdc_intent_lock() - resolve a metadata intent (lookup/getattr/open/...)
 * against the MDT: first try to satisfy it from an already-granted DLM
 * lock, otherwise enqueue a new intent lock RPC and fix up the resulting
 * lock/request state for the caller (see the block comment above for the
 * it_disposition/it_status contract).
 *
 * NOTE(review): several interior lines (braces, returns, labels) are not
 * visible in this view; comments are scoped to the visible lines.
 */
1177 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1178 struct lookup_intent *it, struct ptlrpc_request **reqp,
1179 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
/* Enqueue descriptor: inodebits lock, mode derived from the intent op,
 * caller-supplied blocking callback, standard completion/glimpse ASTs. */
1181 struct ldlm_enqueue_info einfo = {
1182 .ei_type = LDLM_IBITS,
1183 .ei_mode = it_to_lock_mode(it),
1184 .ei_cb_bl = cb_blocking,
1185 .ei_cb_cp = ldlm_completion_ast,
1186 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1188 struct lustre_handle lockh;
1193 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1194 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1195 op_data->op_name, PFID(&op_data->op_fid2),
1196 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidation path: when the child FID is already known and the op is
 * a plain lookup/getattr/readdir, try to reuse a cached lock instead of
 * issuing an RPC. */
1200 if (fid_is_sane(&op_data->op_fid2) &&
1201 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1202 /* We could just return 1 immediately, but since we should only
1203 * be called in revalidate_it if we already have a lock, let's
1205 it->it_lock_handle = 0;
1206 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1207 /* Only return failure if it was not GETATTR by cfid
1208 (from inode_revalidate) */
1209 if (rc || op_data->op_namelen != 0)
1213 /* For case if upper layer did not alloc fid, do it now. */
1214 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1215 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1217 CERROR("Can't alloc new fid, rc %d\n", rc);
/* No cached lock satisfied the intent: perform the real enqueue
 * (trailing arguments of this call are not visible in this view). */
1222 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply request back to the caller and translate the intent
 * result into final lock/request state. */
1227 *reqp = it->it_request;
1228 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for the async getattr enqueue submitted by
 * mdc_intent_getattr_async(): releases the request slot taken at submit
 * time, completes the DLM enqueue, converts the intent reply status, and
 * finally invokes the caller-supplied completion callback (mi_cb).
 *
 * NOTE(review): some interior lines (labels/returns/braces) are elided
 * in this view.
 */
1232 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1233 struct ptlrpc_request *req,
1236 struct mdc_getattr_args *ga = args;
1237 struct obd_export *exp = ga->ga_exp;
1238 struct md_enqueue_info *minfo = ga->ga_minfo;
1239 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1240 struct lookup_intent *it;
1241 struct lustre_handle *lockh;
1242 struct obd_device *obddev;
1243 struct ldlm_reply *lockrep;
1244 __u64 flags = LDLM_FL_HAS_INTENT;
1248 lockh = &minfo->mi_lockh;
1250 obddev = class_exp2obd(exp);
/* Pair with obd_get_request_slot() in mdc_intent_getattr_async(). */
1252 obd_put_request_slot(&obddev->u.cli);
1253 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Finish the enqueue; on failure, drop the request's replay state so a
 * failed speculative getattr is not replayed during recovery. */
1256 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1257 &flags, NULL, 0, lockh, rc);
1259 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1260 mdc_clear_replay_flag(req, rc);
1264 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1265 LASSERT(lockrep != NULL);
/* The intent status is carried in lock_policy_res2; convert it from its
 * on-wire representation before it is consumed below. */
1267 lockrep->lock_policy_res2 =
1268 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1270 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1274 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Deliver the final result to the submitter's completion callback. */
1278 minfo->mi_cb(req, minfo, rc);
/*
 * mdc_intent_getattr_async() - submit a getattr intent lock enqueue
 * without waiting for the reply.  Completion is handled by
 * mdc_intent_getattr_async_interpret(), which in turn calls the
 * caller-supplied minfo->mi_cb.
 *
 * NOTE(review): some interior lines (error branches, returns, braces)
 * are elided in this view.
 */
1282 int mdc_intent_getattr_async(struct obd_export *exp,
1283 struct md_enqueue_info *minfo)
1285 struct md_op_data *op_data = &minfo->mi_data;
1286 struct lookup_intent *it = &minfo->mi_it;
1287 struct ptlrpc_request *req;
1288 struct mdc_getattr_args *ga;
1289 struct obd_device *obddev = class_exp2obd(exp);
1290 struct ldlm_res_id res_id;
/* Getattr needs both LOOKUP and UPDATE inodebits (see the bit-split
 * discussion in mdc_revalidate_lock() above). */
1291 union ldlm_policy_data policy = {
1292 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1293 MDS_INODELOCK_UPDATE } };
1295 __u64 flags = LDLM_FL_HAS_INTENT;
1298 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1299 (int)op_data->op_namelen, op_data->op_name,
1300 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1302 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1303 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1304 * of the async getattr RPC will handle that by itself. */
1305 req = mdc_intent_getattr_pack(exp, it, op_data,
1306 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1308 RETURN(PTR_ERR(req));
/* Throttle RPCs in flight; the slot is released either in the interpret
 * callback or on the enqueue-failure path below. */
1310 rc = obd_get_request_slot(&obddev->u.cli);
1312 ptlrpc_req_finished(req);
1316 /* With Data-on-MDT the glimpse callback is needed too.
1317 * It is set here in advance but not in mdc_finish_enqueue()
1318 * to avoid possible races. It is safe to have glimpse handler
1319 * for non-DOM locks and costs nothing.*/
1320 if (minfo->mi_einfo.ei_cb_gl == NULL)
1321 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
/* Fire the enqueue; the trailing argument presumably requests async
 * (non-blocking) behaviour — confirm against ldlm_cli_enqueue(). */
1323 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1324 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1326 obd_put_request_slot(&obddev->u.cli);
1327 ptlrpc_req_finished(req);
/* Stash the state the interpret callback needs inside the request's
 * embedded async-args area (compile-time size check first). */
1331 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1332 ga = ptlrpc_req_async_args(req);
1334 ga->ga_minfo = minfo;
/* Queue the request on the ptlrpcd daemon; completion runs the
 * interpret callback registered here. */
1336 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1337 ptlrpcd_add_req(req);