4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Context carried across an asynchronous getattr intent enqueue; handed
 * to the interpret callback (see mdc_intent_getattr_async_interpret below).
 * NOTE(review): source view is subsampled -- the closing brace of this
 * struct is not visible here. */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
/*
 * it_open_error(): translate a lookup intent's server-set disposition bits
 * into an error code for the caller's phase of interest.
 *
 * Disposition bits are tested from the latest phase (OPEN_LEASE) back to
 * the earliest (IT_EXECD).  NOTE(review): the source view is subsampled,
 * so the bodies between the paired checks are not visible -- presumably
 * each matched phase returns it->it_status (or 0 for earlier phases);
 * confirm against the full file.  If no disposition bit matches at all,
 * the trailing CERROR() reports the unexpected state.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No disposition bit set at all: the intent never executed as expected. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * mdc_set_lock_data(): attach an inode pointer to the DLM lock's resource
 * (lr_lvb_inode) and optionally report the lock's inodebits to the caller.
 *
 * \param exp    client export (unused in the visible lines)
 * \param lockh  handle of a lock the caller holds a reference on
 * \param data   struct inode * to associate with the lock's resource
 * \param bits   if non-NULL (presumably -- guard not visible), receives
 *               lock->l_policy_data.l_inodebits.bits
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
/* A different inode may already be cached on the resource only while the
 * old one is being freed; anything else indicates a lifetime bug. */
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * mdc_lock_match(): look for an already-granted local lock on the FID's
 * resource that covers the requested type/policy/mode.
 *
 * Returns the matched lock mode (0 if none -- per ldlm_lock_match
 * convention) and fills *lockh on success.
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * mdc_cancel_unused(): cancel all unused locks on the resource named by
 * @fid that match @policy/@mode, honouring the given cancel @flags.
 * @opaque is passed through to the LDLM layer for caller-side filtering.
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * mdc_null_inode(): detach any cached inode pointer from the FID's DLM
 * resource (clears lr_lvb_inode), typically before the inode goes away.
 * NOTE(review): the lr_lvb_inode store presumably happens under the
 * resource lock -- the lock/unlock lines are elided in this view.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create flag 0): if no resource exists there is nothing
 * to clear. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/*
 * mdc_clear_replay_flag(): drop a request from the replay list when it
 * failed (rc != 0), so a failed operation is never replayed after
 * recovery.  The rq_replay clear itself (elided in this view) is done
 * under rq_lock.
 */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
/* A transno on a failed request is a server-side protocol anomaly. */
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/*
 * \param req    request whose client-side @field buffer receives the EA
 * \param field  capsule field to store into (e.g. RMF_EADATA)
 * \param data   EA bytes to copy
 * \param size   length of @data in bytes
 */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
/* Grow the already-sent request buffer if the saved EA won't fit;
 * shrink path below handles the opposite case. */
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
/*
 * mdc_intent_open_pack(): build an LDLM_INTENT_OPEN enqueue request.
 *
 * Collects conflicting local locks to cancel (child OPEN locks when the
 * target FID is known, parent UPDATE lock on create), sizes all client
 * and server capsule fields, packs the intent opcode and the open body,
 * and pre-reserves reply space for Data-on-MDT inline reads.
 *
 * \param acl_bufsize  server-side reply space to reserve for the ACL
 * Returns the prepared request, or ERR_PTR on allocation failure.
 * NOTE(review): subsampled view -- mode selection between the flag
 * checks and the final RETURN are elided.
 */
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Open always targets a regular file from the lock protocol's view. */
263 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265 /* XXX: openlock is not cancelled for cross-refs. */
266 /* If inode is known, cancel conflicting OPEN locks. */
267 if (fid_is_sane(&op_data->op_fid2)) {
268 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269 if (it->it_flags & MDS_FMODE_WRITE)
274 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
277 else if (it->it_flags & FMODE_EXEC)
283 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288 /* If CREATE, cancel parent's UPDATE lock. */
289 if (it->it_op & IT_CREAT)
293 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
295 MDS_INODELOCK_UPDATE);
297 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: return the collected cancels before bailing. */
300 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301 RETURN(ERR_PTR(-ENOMEM));
304 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305 op_data->op_namelen + 1);
306 if (cl_is_lov_delay_create(it->it_flags)) {
307 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308 LASSERT(lmmsize == 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
311 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
/* Optional security-context name/value for the created file. */
315 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317 strlen(op_data->op_file_secctx_name) + 1 : 0);
319 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320 op_data->op_file_secctx_size);
322 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
324 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import is replayable. */
328 spin_lock(&req->rq_lock);
329 req->rq_replay = req->rq_import->imp_replayable;
330 spin_unlock(&req->rq_lock);
332 /* pack the intent */
333 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334 lit->opc = (__u64)it->it_op;
336 /* pack the intended request */
337 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
340 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341 obddev->u.cli.cl_max_mds_easize);
342 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
345 * Inline buffer for possible data from Data-on-MDT files.
347 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
348 sizeof(struct niobuf_remote));
349 ptlrpc_request_set_replen(req);
351 /* Get real repbuf allocated size as rounded up power of 2 */
352 repsize = size_roundup_power2(req->rq_replen +
353 lustre_msg_early_size());
355 /* Estimate free space for DoM files in repbuf */
356 repsize -= req->rq_replen - obddev->u.cli.cl_max_mds_easize +
357 sizeof(struct lov_comp_md_v1) +
358 sizeof(struct lov_comp_md_entry_v1) +
359 lov_mds_md_size(0, LOV_MAGIC_V3);
/* Grow the inline niobuf so DoM replies can carry at least the
 * configured minimum of file data without a separate read RPC. */
361 if (repsize < obddev->u.cli.cl_dom_min_inline_repsize) {
362 repsize = obddev->u.cli.cl_dom_min_inline_repsize - repsize;
363 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
365 sizeof(struct niobuf_remote) + repsize);
366 ptlrpc_request_set_replen(req);
367 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
368 repsize, req->rq_replen);
/* Default sizing heuristics for cached-xattr (IT_GETXATTR) replies:
 * space for GA_DEFAULT_EA_NUM xattrs with names up to
 * GA_DEFAULT_EA_NAME_LEN and values up to GA_DEFAULT_EA_VAL_LEN bytes. */
373 #define GA_DEFAULT_EA_NAME_LEN 20
374 #define GA_DEFAULT_EA_VAL_LEN 250
375 #define GA_DEFAULT_EA_NUM 10
/*
 * mdc_intent_getxattr_pack(): build an LDLM_INTENT_GETXATTR enqueue
 * request sized by the GA_DEFAULT_* heuristics above, with a workaround
 * for pre-2.10.1 servers that oops on undersized listxattr buffers.
 * Returns the prepared request or ERR_PTR(-ENOMEM).
 */
377 static struct ptlrpc_request *
378 mdc_intent_getxattr_pack(struct obd_export *exp,
379 struct lookup_intent *it,
380 struct md_op_data *op_data)
382 struct ptlrpc_request *req;
383 struct ldlm_intent *lit;
385 struct list_head cancels = LIST_HEAD_INIT(cancels);
386 u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
390 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
391 &RQF_LDLM_INTENT_GETXATTR);
393 RETURN(ERR_PTR(-ENOMEM));
395 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
397 ptlrpc_request_free(req);
401 /* pack the intent */
402 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403 lit->opc = IT_GETXATTR;
405 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
406 /* If the supplied buffer is too small then the server will
407 * return -ERANGE and llite will fallback to using non cached
408 * xattr operations. On servers before 2.10.1 a (non-cached)
409 * listxattr RPC for an orphan or dead file causes an oops. So
410 * let's try to avoid sending too small a buffer to too old a
411 * server. This is effectively undoing the memory conservation
412 * of LU-9417 when it would be *more* likely to crash the
413 * server. See LU-9856. */
414 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
415 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
416 exp->exp_connect_data.ocd_max_easize)
419 /* pack the intended request */
420 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
421 ea_vals_buf_size, -1, 0);
/* Reserve reply room for xattr names, values, and per-value lengths. */
423 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
424 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
426 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
429 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
430 sizeof(u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is returned for a getxattr intent. */
432 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
434 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack(): build an LDLM_INTENT_GETATTR enqueue
 * request asking for attributes, EA, and ACL in one round trip.
 *
 * \param acl_bufsize  server-side reply space to reserve for the ACL
 * Returns the prepared request or ERR_PTR(-ENOMEM).
 */
439 static struct ptlrpc_request *
440 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
441 struct md_op_data *op_data, __u32 acl_bufsize)
443 struct ptlrpc_request *req;
444 struct obd_device *obddev = class_exp2obd(exp);
445 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
446 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
447 OBD_MD_MEA | OBD_MD_FLACL;
448 struct ldlm_intent *lit;
453 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
454 &RQF_LDLM_INTENT_GETATTR);
456 RETURN(ERR_PTR(-ENOMEM));
458 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
459 op_data->op_namelen + 1);
461 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
463 ptlrpc_request_free(req);
467 /* pack the intent */
468 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
469 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum. */
471 if (obddev->u.cli.cl_default_mds_easize > 0)
472 easize = obddev->u.cli.cl_default_mds_easize;
474 easize = obddev->u.cli.cl_max_mds_easize;
476 /* pack the intended request */
477 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
479 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
480 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
481 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack(): build an LDLM_INTENT_LAYOUT enqueue request.
 * The caller supplies a fully formed struct layout_intent in
 * op_data->op_data; it is copied verbatim into the request.
 * Returns the prepared request or ERR_PTR(-ENOMEM).
 */
485 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
486 struct lookup_intent *it,
487 struct md_op_data *op_data)
489 struct obd_device *obd = class_exp2obd(exp);
490 struct ptlrpc_request *req;
491 struct ldlm_intent *lit;
492 struct layout_intent *layout;
496 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
497 &RQF_LDLM_INTENT_LAYOUT);
499 RETURN(ERR_PTR(-ENOMEM));
501 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
502 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
504 ptlrpc_request_free(req);
508 /* pack the intent */
509 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
510 lit->opc = (__u64)it->it_op;
512 /* pack the layout intent request */
513 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
514 LASSERT(op_data->op_data != NULL);
515 LASSERT(op_data->op_data_size == sizeof(*layout));
516 memcpy(layout, op_data->op_data, sizeof(*layout));
/* Reply LVB may carry the layout; reserve default EA-sized space. */
518 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
519 obd->u.cli.cl_default_mds_easize);
520 ptlrpc_request_set_replen(req);
/*
 * mdc_enqueue_pack(): build a plain (intent-less) LDLM enqueue request,
 * reserving @lvb_len bytes of reply space for the lock value block.
 * Returns the prepared request or ERR_PTR(-ENOMEM).
 */
524 static struct ptlrpc_request *
525 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
527 struct ptlrpc_request *req;
531 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
533 RETURN(ERR_PTR(-ENOMEM));
535 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
537 ptlrpc_request_free(req);
541 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
542 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue(): post-process an intent enqueue reply.
 *
 * Responsibilities visible here:
 *  - mark replayable requests INTENT_ONLY so replay re-executes the
 *    intent without re-acquiring the lock;
 *  - reconcile the granted lock mode with einfo->ei_mode;
 *  - copy the server's disposition/status into the lookup intent;
 *  - drop the replay flag for failed operations and failed opens;
 *  - for opens, save the reply LOV EA for create replay;
 *  - for layout intents, install the replied layout as the lock's LVB;
 *  - for Data-on-MDT locks, validate and record the file size LVB.
 * Statement order matters throughout (see the in-line warnings below);
 * do not reorder.
 */
546 static int mdc_finish_enqueue(struct obd_export *exp,
547 struct ptlrpc_request *req,
548 struct ldlm_enqueue_info *einfo,
549 struct lookup_intent *it,
550 struct lustre_handle *lockh,
553 struct req_capsule *pill = &req->rq_pill;
554 struct ldlm_request *lockreq;
555 struct ldlm_reply *lockrep;
556 struct ldlm_lock *lock;
557 struct mdt_body *body = NULL;
558 void *lvb_data = NULL;
564 /* Similarly, if we're going to replay this request, we don't want to
565 * actually get a lock, just perform the intent. */
566 if (req->rq_transno || req->rq_replay) {
567 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
568 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Server executed the intent but granted no lock: hand back an
 * empty handle. */
571 if (rc == ELDLM_LOCK_ABORTED) {
573 memset(lockh, 0, sizeof(*lockh));
575 } else { /* rc = 0 */
576 lock = ldlm_handle2lock(lockh);
577 LASSERT(lock != NULL);
579 /* If the server gave us back a different lock mode, we should
580 * fix up our variables. */
581 if (lock->l_req_mode != einfo->ei_mode) {
582 ldlm_lock_addref(lockh, lock->l_req_mode);
583 ldlm_lock_decref(lockh, einfo->ei_mode);
584 einfo->ei_mode = lock->l_req_mode;
589 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
590 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's verdict into the intent for the callers. */
592 it->it_disposition = (int)lockrep->lock_policy_res1;
593 it->it_status = (int)lockrep->lock_policy_res2;
594 it->it_lock_mode = einfo->ei_mode;
595 it->it_lock_handle = lockh->cookie;
596 it->it_request = req;
598 /* Technically speaking rq_transno must already be zero if
599 * it_status is in error, so the check is a bit redundant */
600 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
601 mdc_clear_replay_flag(req, it->it_status);
603 /* If we're doing an IT_OPEN which did not result in an actual
604 * successful open, then we need to remove the bit which saves
605 * this request for unconditional replay.
607 * It's important that we do this first! Otherwise we might exit the
608 * function without doing so, and try to replay a failed create
610 if (it->it_op & IT_OPEN && req->rq_replay &&
611 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
612 mdc_clear_replay_flag(req, it->it_status);
614 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
615 it->it_op, it->it_disposition, it->it_status);
617 /* We know what to expect, so we do any byte flipping required here */
618 if (it_has_reply_body(it)) {
619 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
621 CERROR ("Can't swab mdt_body\n");
625 if (it_disposition(it, DISP_OPEN_OPEN) &&
626 !it_open_error(DISP_OPEN_OPEN, it)) {
628 * If this is a successful OPEN request, we need to set
629 * replay handler and data early, so that if replay
630 * happens immediately after swabbing below, new reply
631 * is swabbed by that handler correctly.
633 mdc_set_open_replay_data(NULL, NULL, it);
636 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
639 mdc_update_max_ea_from_body(exp, body);
642 * The eadata is opaque; just check that it is there.
643 * Eventually, obd_unpackmd() will check the contents.
645 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
646 body->mbo_eadatasize);
650 /* save lvb data and length in case this is for layout
653 lvb_len = body->mbo_eadatasize;
656 * We save the reply LOV EA in case we have to replay a
657 * create for recovery. If we didn't allocate a large
658 * enough request buffer above we need to reallocate it
659 * here to hold the actual LOV EA.
661 * To not save LOV EA if request is not going to replay
662 * (for example error one).
664 if ((it->it_op & IT_OPEN) && req->rq_replay) {
665 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
666 body->mbo_eadatasize);
/* Saving failed: strip the EA from the body so replay does
 * not reference data we could not preserve. */
668 body->mbo_valid &= ~OBD_MD_FLEASIZE;
669 body->mbo_eadatasize = 0;
674 } else if (it->it_op & IT_LAYOUT) {
675 /* maybe the lock was granted right away and layout
676 * is packed into RMF_DLM_LVB of req */
677 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
679 lvb_data = req_capsule_server_sized_get(pill,
680 &RMF_DLM_LVB, lvb_len);
681 if (lvb_data == NULL)
685 * save replied layout data to the request buffer for
686 * recovery consideration (lest MDS reinitialize
687 * another set of OST objects).
690 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
695 /* fill in stripe data for layout lock.
696 * LU-6581: trust layout data only if layout lock is granted. The MDT
697 * has stopped sending layout unless the layout lock is granted. The
698 * client still does this checking in case it's talking with an old
699 * server. - Jinshan */
700 lock = ldlm_handle2lock(lockh);
704 if (ldlm_has_layout(lock) && lvb_data != NULL &&
705 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
708 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
709 ldlm_it2str(it->it_op), lvb_len);
711 OBD_ALLOC_LARGE(lmm, lvb_len);
713 GOTO(out_lock, rc = -ENOMEM);
715 memcpy(lmm, lvb_data, lvb_len);
717 /* install lvb_data */
718 lock_res_and_lock(lock);
/* First installer wins; a concurrent installer's copy is freed
 * below (the losing-branch line is elided in this view). */
719 if (lock->l_lvb_data == NULL) {
720 lock->l_lvb_type = LVB_T_LAYOUT;
721 lock->l_lvb_data = lmm;
722 lock->l_lvb_len = lvb_len;
725 unlock_res_and_lock(lock);
727 OBD_FREE_LARGE(lmm, lvb_len);
/* Data-on-MDT lock: the reply must carry a valid file size. */
730 if (ldlm_has_dom(lock)) {
731 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
733 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
734 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
735 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
736 exp->exp_obd->obd_name);
737 GOTO(out_lock, rc = -EPROTO);
740 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
741 ldlm_it2str(it->it_op), body->mbo_dom_size);
743 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
751 /* We always reserve enough space in the reply packet for a stripe MD, because
752 * we don't know in advance the file type. */
/*
 * mdc_enqueue_base(): common enqueue engine for intent and flock locks.
 *
 * Picks the inodebits policy from the intent op, packs the matching
 * request type (open/getattr/readdir/layout/getxattr or plain flock),
 * throttles via the modify-RPC and request slots, issues
 * ldlm_cli_enqueue(), and handles the retry cases: -EINPROGRESS from
 * the server (resend while the import generation is unchanged) and
 * -ERANGE on a too-small ACL buffer (resend with the maximum size).
 * On success hands the reply to mdc_finish_enqueue(); on failure drops
 * any granted lock reference and resets the intent's lock fields.
 */
753 static int mdc_enqueue_base(struct obd_export *exp,
754 struct ldlm_enqueue_info *einfo,
755 const union ldlm_policy_data *policy,
756 struct lookup_intent *it,
757 struct md_op_data *op_data,
758 struct lustre_handle *lockh,
759 __u64 extra_lock_flags)
761 struct obd_device *obddev = class_exp2obd(exp);
762 struct ptlrpc_request *req = NULL;
763 __u64 flags, saved_flags = extra_lock_flags;
764 struct ldlm_res_id res_id;
765 static const union ldlm_policy_data lookup_policy = {
766 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
767 static const union ldlm_policy_data update_policy = {
768 .l_inodebits = { MDS_INODELOCK_UPDATE } };
769 static const union ldlm_policy_data layout_policy = {
770 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
771 static const union ldlm_policy_data getxattr_policy = {
772 .l_inodebits = { MDS_INODELOCK_XATTR } };
773 int generation, resends = 0;
774 struct ldlm_reply *lockrep;
775 struct obd_import *imp = class_exp2cliimp(exp);
777 enum lvb_type lvb_type = 0;
781 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
783 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent the policy is derived from it_op, never passed in. */
786 LASSERT(policy == NULL);
788 saved_flags |= LDLM_FL_HAS_INTENT;
789 if (it->it_op & (IT_GETATTR | IT_READDIR))
790 policy = &update_policy;
791 else if (it->it_op & IT_LAYOUT)
792 policy = &layout_policy;
793 else if (it->it_op & IT_GETXATTR)
794 policy = &getxattr_policy;
796 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction. */
799 generation = obddev->u.cli.cl_import->imp_generation;
/* Open/create replies may carry a full-size ACL; others start with
 * the small legacy buffer and retry on -ERANGE below. */
800 if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
801 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
803 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
808 /* The only way right now is FLOCK. */
809 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
811 res_id.name[3] = LDLM_FLOCK;
812 } else if (it->it_op & IT_OPEN) {
813 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
814 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
815 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
816 } else if (it->it_op & IT_READDIR) {
817 req = mdc_enqueue_pack(exp, 0);
818 } else if (it->it_op & IT_LAYOUT) {
819 if (!imp_connect_lvb_type(imp))
821 req = mdc_intent_layout_pack(exp, it, op_data);
822 lvb_type = LVB_T_LAYOUT;
823 } else if (it->it_op & IT_GETXATTR) {
824 req = mdc_intent_getxattr_pack(exp, it, op_data);
831 RETURN(PTR_ERR(req));
/* Pin the import generation on the request for resend tracking. */
834 req->rq_generation_set = 1;
835 req->rq_import_generation = generation;
836 req->rq_sent = ktime_get_real_seconds() + resends;
839 /* It is important to obtain modify RPC slot first (if applicable), so
840 * that threads that are waiting for a modify RPC slot are not polluting
841 * our rpcs in flight counter.
842 * We do not do flock request limiting, though */
844 mdc_get_mod_rpc_slot(req, it);
845 rc = obd_get_request_slot(&obddev->u.cli);
847 mdc_put_mod_rpc_slot(req, it);
848 mdc_clear_replay_flag(req, 0);
849 ptlrpc_req_finished(req);
854 /* With Data-on-MDT the glimpse callback is needed too.
855 * It is set here in advance but not in mdc_finish_enqueue()
856 * to avoid possible races. It is safe to have glimpse handler
857 * for non-DOM locks and costs nothing.*/
858 if (einfo->ei_cb_gl == NULL)
859 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
861 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
862 0, lvb_type, lockh, 0);
864 /* For flock requests we immediatelly return without further
865 delay and let caller deal with the rest, since rest of
866 this function metadata processing makes no sense for flock
867 requests anyway. But in case of problem during comms with
868 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
869 can not rely on caller and this mainly for F_UNLCKs
870 (explicits or automatically generated by Kernel to clean
871 current FLocks upon exit) that can't be trashed */
872 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
873 (einfo->ei_type == LDLM_FLOCK) &&
874 (einfo->ei_mode == LCK_NL))
/* Release throttling slots regardless of the enqueue outcome. */
879 obd_put_request_slot(&obddev->u.cli);
880 mdc_put_mod_rpc_slot(req, it);
884 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
885 obddev->obd_name, PFID(&op_data->op_fid1),
886 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
888 mdc_clear_replay_flag(req, rc);
889 ptlrpc_req_finished(req);
893 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
894 LASSERT(lockrep != NULL);
/* Intent status travels in network byte-order error space. */
896 lockrep->lock_policy_res2 =
897 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
899 /* Retry infinitely when the server returns -EINPROGRESS for the
900 * intent operation, when server returns -EINPROGRESS for acquiring
901 * intent lock, we'll retry in after_reply(). */
902 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
903 mdc_clear_replay_flag(req, rc);
904 ptlrpc_req_finished(req);
905 if (generation == obddev->u.cli.cl_import->imp_generation) {
906 if (signal_pending(current))
910 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
911 obddev->obd_name, resends, it->it_op,
912 PFID(&op_data->op_fid1),
913 PFID(&op_data->op_fid2));
916 CDEBUG(D_HA, "resend cross eviction\n");
/* ACL buffer was too small: retry once with the maximum size. */
921 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
922 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
923 acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
924 mdc_clear_replay_flag(req, -ERANGE);
925 ptlrpc_req_finished(req);
926 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
930 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Error after a lock was granted: release it and scrub the intent. */
932 if (lustre_handle_is_used(lockh)) {
933 ldlm_lock_decref(lockh, einfo->ei_mode);
934 memset(lockh, 0, sizeof(*lockh));
936 ptlrpc_req_finished(req);
938 it->it_lock_handle = 0;
939 it->it_lock_mode = 0;
940 it->it_request = NULL;
/* mdc_enqueue(): intent-less wrapper around mdc_enqueue_base() (used for
 * plain lock requests, e.g. flock -- passes a NULL lookup intent). */
946 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
947 const union ldlm_policy_data *policy,
948 struct md_op_data *op_data,
949 struct lustre_handle *lockh, __u64 extra_lock_flags)
951 return mdc_enqueue_base(exp, einfo, policy, NULL,
952 op_data, lockh, extra_lock_flags);
/*
 * mdc_finish_intent_lock(): interpret the completed intent request for
 * the VFS layers (see the long comment above mdc_intent_lock for the
 * disposition/status contract).
 *
 * Converts disposition bits to an rc, takes extra request references for
 * successful CREATE/OPEN (released later in ll_create_node/ll_file_open),
 * and, when the reply matches an already-held lock, cancels the duplicate
 * new lock in favour of the old one.
 */
955 static int mdc_finish_intent_lock(struct obd_export *exp,
956 struct ptlrpc_request *request,
957 struct md_op_data *op_data,
958 struct lookup_intent *it,
959 struct lustre_handle *lockh)
961 struct lustre_handle old_lock;
962 struct ldlm_lock *lock;
966 LASSERT(request != NULL);
967 LASSERT(request != LP_POISON);
968 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR carries no intent result to interpret. */
970 if (it->it_op & IT_READDIR)
973 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
974 if (it->it_status != 0)
975 GOTO(out, rc = it->it_status);
977 if (!it_disposition(it, DISP_IT_EXECD)) {
978 /* The server failed before it even started executing
979 * the intent, i.e. because it couldn't unpack the
982 LASSERT(it->it_status != 0);
983 GOTO(out, rc = it->it_status);
985 rc = it_open_error(DISP_IT_EXECD, it);
989 rc = it_open_error(DISP_LOOKUP_EXECD, it);
993 /* keep requests around for the multiple phases of the call
994 * this shows the DISP_XX must guarantee we make it into the
997 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
998 it_disposition(it, DISP_OPEN_CREATE) &&
999 !it_open_error(DISP_OPEN_CREATE, it)) {
1000 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1001 /* balanced in ll_create_node */
1002 ptlrpc_request_addref(request);
1004 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1005 it_disposition(it, DISP_OPEN_OPEN) &&
1006 !it_open_error(DISP_OPEN_OPEN, it)) {
1007 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1008 /* balanced in ll_file_open */
1009 ptlrpc_request_addref(request);
1010 /* BUG 11546 - eviction in the middle of open rpc
1013 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1017 if (it->it_op & IT_CREAT) {
1018 /* XXX this belongs in ll_create_it */
1019 } else if (it->it_op == IT_OPEN) {
1020 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1022 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1026 /* If we already have a matching lock, then cancel the new
1027 * one. We have to set the data here instead of in
1028 * mdc_enqueue, because we need to use the child's inode as
1029 * the l_ast_data to match, and that's not available until
1030 * intent_finish has performed the iget().) */
1031 lock = ldlm_handle2lock(lockh);
1033 union ldlm_policy_data policy = lock->l_policy_data;
1034 LDLM_DEBUG(lock, "matching against this");
1036 if (it_has_reply_body(it)) {
1037 struct mdt_body *body;
1039 body = req_capsule_server_get(&request->rq_pill,
1041 /* mdc_enqueue checked */
1042 LASSERT(body != NULL);
/* Sanity: the lock's resource must name the FID in the reply. */
1043 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1044 &lock->l_resource->lr_name),
1045 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1046 PLDLMRES(lock->l_resource),
1047 PFID(&body->mbo_fid1));
1049 LDLM_LOCK_PUT(lock);
1051 memcpy(&old_lock, lockh, sizeof(*lockh));
/* If an equivalent lock already exists locally, drop the new one
 * and point the intent at the pre-existing lock instead. */
1052 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1053 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1054 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1055 memcpy(lockh, &old_lock, sizeof(old_lock));
1056 it->it_lock_handle = lockh->cookie;
1062 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1063 (int)op_data->op_namelen, op_data->op_name,
1064 ldlm_it2str(it->it_op), it->it_status,
1065 it->it_disposition, rc);
/*
 * mdc_revalidate_lock(): check whether a usable lock already protects
 * @fid for this intent, without talking to the server.
 *
 * If the intent carries a lock handle, revalidate it directly; otherwise
 * build the inodebits policy the intent requires and search the local
 * namespace via mdc_lock_match().  On success the intent's lock handle
 * and mode are filled in; on failure both are zeroed.
 * NOTE(review): the switch's case labels are elided in this view; the
 * policy assignments map to getattr / setattr-like / layout / default
 * lookup respectively -- confirm against the full file.
 */
1069 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1070 struct lu_fid *fid, __u64 *bits)
1072 /* We could just return 1 immediately, but since we should only
1073 * be called in revalidate_it if we already have a lock, let's
1075 struct ldlm_res_id res_id;
1076 struct lustre_handle lockh;
1077 union ldlm_policy_data policy;
1078 enum ldlm_mode mode;
1081 if (it->it_lock_handle) {
1082 lockh.cookie = it->it_lock_handle;
1083 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1085 fid_build_reg_res_name(fid, &res_id);
1086 switch (it->it_op) {
1088 /* File attributes are held under multiple bits:
1089 * nlink is under lookup lock, size and times are
1090 * under UPDATE lock and recently we've also got
1091 * a separate permissions lock for owner/group/acl that
1092 * were protected by lookup lock before.
1093 * Getattr must provide all of that information,
1094 * so we need to ensure we have all of those locks.
1095 * Unfortunately, if the bits are split across multiple
1096 * locks, there's no easy way to match all of them here,
1097 * so an extra RPC would be performed to fetch all
1098 * of those bits at once for now. */
1099 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1100 * but for old MDTs (< 2.4), permission is covered
1101 * by LOOKUP lock, so it needs to match all bits here.*/
1102 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1103 MDS_INODELOCK_LOOKUP |
1107 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1110 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1113 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any mode that could satisfy a reader or writer is acceptable. */
1117 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1118 LDLM_IBITS, &policy,
1119 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1124 it->it_lock_handle = lockh.cookie;
1125 it->it_lock_mode = mode;
1127 it->it_lock_handle = 0;
1128 it->it_lock_mode = 0;
1135 * This long block is all about fixing up the lock and request state
1136 * so that it is correct as of the moment _before_ the operation was
1137 * applied; that way, the VFS will think that everything is normal and
1138 * call Lustre's regular VFS methods.
1140 * If we're performing a creation, that means that unless the creation
1141 * failed with EEXIST, we should fake up a negative dentry.
1143 * For everything else, we want to lookup to succeed.
1145 * One additional note: if CREATE or OPEN succeeded, we add an extra
1146 * reference to the request because we need to keep it around until
1147 * ll_create/ll_open gets called.
1149 * The server will return to us, in it_disposition, an indication of
1150 * exactly what it_status refers to.
1152 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1153 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1154 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1155 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1158 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * mdc_intent_lock(): top-level entry for intent-based lock acquisition.
 * Tries a local revalidation first for LOOKUP/GETATTR/READDIR on a sane
 * child FID, allocates a FID for creates if the caller did not, then
 * enqueues via mdc_enqueue_base() and interprets the result with
 * mdc_finish_intent_lock().  *reqp receives the intent request.
 */
1161 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1162 struct lookup_intent *it, struct ptlrpc_request **reqp,
1163 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1165 struct ldlm_enqueue_info einfo = {
1166 .ei_type = LDLM_IBITS,
1167 .ei_mode = it_to_lock_mode(it),
1168 .ei_cb_bl = cb_blocking,
1169 .ei_cb_cp = ldlm_completion_ast,
1170 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1172 struct lustre_handle lockh;
1177 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1178 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1179 op_data->op_name, PFID(&op_data->op_fid2),
1180 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1184 if (fid_is_sane(&op_data->op_fid2) &&
1185 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1186 /* We could just return 1 immediately, but since we should only
1187 * be called in revalidate_it if we already have a lock, let's
1189 it->it_lock_handle = 0;
1190 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1191 /* Only return failure if it was not GETATTR by cfid
1192 (from inode_revalidate) */
1193 if (rc || op_data->op_namelen != 0)
1197 /* For case if upper layer did not alloc fid, do it now. */
1198 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1199 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1201 CERROR("Can't alloc new fid, rc %d\n", rc);
1206 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the intent request to the caller, then interpret it. */
1211 *reqp = it->it_request;
1212 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for the asynchronous intent-getattr RPC
 * started in mdc_intent_getattr_async(): release the client request
 * slot, complete the DLM enqueue, fold the server reply into the
 * intent, and finally invoke the sponsor's mi_cb completion callback.
 */
1216 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1217 struct ptlrpc_request *req,
/* Unpack the context stashed in rq_async_args by the sender. */
1220 struct mdc_getattr_args *ga = args;
1221 struct obd_export *exp = ga->ga_exp;
1222 struct md_enqueue_info *minfo = ga->ga_minfo;
1223 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1224 struct lookup_intent *it;
1225 struct lustre_handle *lockh;
1226 struct obd_device *obddev;
1227 struct ldlm_reply *lockrep;
1228 __u64 flags = LDLM_FL_HAS_INTENT;
1232 lockh = &minfo->mi_lockh;
1234 obddev = class_exp2obd(exp);
/* Give back the slot taken by obd_get_request_slot() in
 * mdc_intent_getattr_async(). */
1236 obd_put_request_slot(&obddev->u.cli);
1237 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Finish the enqueue begun by ldlm_cli_enqueue(); this consumes the
 * reply and establishes the lock referenced by *lockh. */
1240 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1241 &flags, NULL, 0, lockh, rc);
1243 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
/* A failed enqueue must not be replayed after recovery. */
1244 mdc_clear_replay_flag(req, rc);
1248 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1249 LASSERT(lockrep != NULL);
/* lock_policy_res2 carries an embedded status; convert it from wire
 * to host representation before it is interpreted. */
1251 lockrep->lock_policy_res2 =
1252 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1254 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1258 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Notify the sponsor of the async getattr with the final result. */
1262 minfo->mi_cb(req, minfo, rc);
/*
 * Start an intent-getattr RPC asynchronously: pack the request, take a
 * client request slot, begin the DLM enqueue and hand the request to
 * ptlrpcd.  Completion (including releasing the slot and calling the
 * sponsor's mi_cb) happens in mdc_intent_getattr_async_interpret().
 *
 * \param exp    export to the MDC target
 * \param minfo  enqueue descriptor carrying op data, intent, lock handle
 *               and the completion callback
 */
1266 int mdc_intent_getattr_async(struct obd_export *exp,
1267 struct md_enqueue_info *minfo)
1269 struct md_op_data *op_data = &minfo->mi_data;
1270 struct lookup_intent *it = &minfo->mi_it;
1271 struct ptlrpc_request *req;
1272 struct mdc_getattr_args *ga;
1273 struct obd_device *obddev = class_exp2obd(exp);
1274 struct ldlm_res_id res_id;
/* Getattr wants both the LOOKUP and UPDATE inodebits. */
1275 union ldlm_policy_data policy = {
1276 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1277 MDS_INODELOCK_UPDATE } };
1279 __u64 flags = LDLM_FL_HAS_INTENT;
1282 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1283 (int)op_data->op_namelen, op_data->op_name,
1284 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
/* The DLM resource is derived from the parent FID. */
1286 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1287 /* If the MDT return -ERANGE because of large ACL, then the sponsor
1288 * of the async getattr RPC will handle that by itself. */
1289 req = mdc_intent_getattr_pack(exp, it, op_data,
1290 LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1292 RETURN(PTR_ERR(req));
/* Take a client request slot; it is released in the interpret
 * callback, or immediately below on failure. */
1294 rc = obd_get_request_slot(&obddev->u.cli);
1296 ptlrpc_req_finished(req);
1300 /* With Data-on-MDT the glimpse callback is needed too.
1301 * It is set here in advance but not in mdc_finish_enqueue()
1302 * to avoid possible races. It is safe to have glimpse handler
1303 * for non-DOM locks and costs nothing.*/
1304 if (minfo->mi_einfo.ei_cb_gl == NULL)
1305 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
/* Begin the enqueue; the final async=1 argument leaves completion to
 * the interpret callback. */
1307 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1308 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
/* Enqueue could not be started: give back the slot and the request. */
1310 obd_put_request_slot(&obddev->u.cli);
1311 ptlrpc_req_finished(req);
/* Stash the context for the interpret callback in rq_async_args;
 * CLASSERT guarantees it fits at compile time. */
1315 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1316 ga = ptlrpc_req_async_args(req);
1318 ga->ga_minfo = minfo;
/* Queue the request on ptlrpcd for asynchronous processing. */
1320 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1321 ptlrpcd_add_req(req);