4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
48 #include "mdc_internal.h"
/* Context handed from the async getattr enqueue to its RPC interpret
 * callback (see mdc_intent_getattr_async_interpret below). */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
/*
 * Report the server-side error (it->it_status) associated with an intent,
 * for the open phase the caller asks about.  Dispositions are checked from
 * the latest phase (lease) back to the earliest (intent executed), so the
 * first disposition that is set determines which phase the saved status
 * belongs to; the CERROR at the end fires only if no disposition matched.
 */
55 int it_open_error(int phase, struct lookup_intent *it)
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
/* No known disposition bit matched: log the raw bits for debugging. */
91 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach the VFS inode (@data) to the DLM lock's resource as lr_lvb_inode,
 * and optionally report the lock's inodebits back through @bits.
 * If the resource already points at a different inode, it must be one that
 * is being freed (I_FREEING) — otherwise two live inodes would claim the
 * same resource, which the LASSERTF treats as fatal.
 */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100 void *data, __u64 *bits)
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
/* Nothing to do for an unused (zeroed) lock handle. */
109 if (!lustre_handle_is_used(lockh))
112 lock = ldlm_handle2lock(lockh);
114 LASSERT(lock != NULL);
/* lr_lvb_inode is protected by the resource lock. */
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on @fid matching @type/@policy/@mode.
 * On a match the handle is returned via @lockh and the granted mode is the
 * return value.  Bits the server did not advertise at connect time are
 * stripped from the request first (LU-4405).
 */
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, enum ldlm_type type,
138 union ldlm_policy_data *policy,
139 enum ldlm_mode mode, struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused client locks on the resource derived from @fid that
 * match @policy/@mode, delegating to ldlm_cli_cancel_unused_resource().
 * @opaque is passed through for the caller's iterator matching.
 */
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154 union ldlm_policy_data *policy, enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags, void *opaque)
157 struct obd_device *obd = class_exp2obd(exp);
158 struct ldlm_res_id res_id;
163 fid_build_reg_res_name(fid, &res_id);
164 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165 policy, mode, flags, opaque);
/*
 * Clear the cached lr_lvb_inode pointer on the DLM resource for @fid,
 * typically when the corresponding VFS inode is going away.  Looking the
 * resource up without creating it means a missing resource is a no-op.
 */
169 int mdc_null_inode(struct obd_export *exp,
170 const struct lu_fid *fid)
172 struct ldlm_res_id res_id;
173 struct ldlm_resource *res;
174 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177 LASSERTF(ns != NULL, "no namespace passed\n");
179 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create flag 0): absent resource means nothing cached. */
181 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
186 res->lr_lvb_inode = NULL;
189 ldlm_resource_putref(res);
/*
 * Drop the replay flag from a request that completed with an error, so a
 * failed operation is not replayed after recovery.  A nonzero transno on
 * an error reply is unexpected and is logged loudly.
 */
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
195 /* Don't hold error requests for replay. */
196 if (req->rq_replay) {
/* rq_replay is protected by rq_lock. */
197 spin_lock(&req->rq_lock);
199 spin_unlock(&req->rq_lock);
201 if (rc && req->rq_transno != 0) {
202 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
207 /* Save a large LOV EA into the request buffer so that it is available
208 * for replay. We don't do this in the initial request because the
209 * original request doesn't need this buffer (at most it sends just the
210 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211 * buffer and may also be difficult to allocate and save a very large
212 * request buffer for each open. (bug 5707)
214 * OOM here may cause recovery failure if lmm is needed (only for the
215 * original open if the MDS crashed just when this client also OOM'd)
216 * but this is incredibly unlikely, and questionable whether the client
217 * could do MDS recovery under OOM anyways... */
/*
 * Copy @size bytes of EA data (@data) into the client-side @field of the
 * request capsule, growing the request buffer first if the existing slot
 * is too small, or shrinking the slot when it is larger than needed.
 */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219 const struct req_msg_field *field,
220 void *data, u32 size)
222 struct req_capsule *pill = &req->rq_pill;
/* Slot too small: ask sptlrpc to reallocate the request buffer. */
226 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230 req->rq_export->exp_obd->obd_name,
/* Slot large enough (or larger): trim it down to the exact size. */
235 req_capsule_shrink(pill, field, size, RCL_CLIENT);
238 req_capsule_set_size(pill, field, RCL_CLIENT, size);
239 lmm = req_capsule_client_get(pill, field);
241 memcpy(lmm, data, size);
/*
 * Build an LDLM_INTENT_OPEN request for @it on @op_data: gather conflicting
 * local locks to cancel (child OPEN locks, and the parent UPDATE lock for
 * creates), size the client buffers (name, EA, security context), and pack
 * the ldlm intent plus the embedded MDS open request.  Reply buffers are
 * sized for the largest possible striping MD and @acl_bufsize of ACL data.
 * Returns the prepared request or an ERR_PTR on allocation failure.
 */
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248 struct md_op_data *op_data, __u32 acl_bufsize)
250 struct ptlrpc_request *req;
251 struct obd_device *obddev = class_exp2obd(exp);
252 struct ldlm_intent *lit;
253 const void *lmm = op_data->op_data;
254 __u32 lmmsize = op_data->op_data_size;
255 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* An intent open always targets a regular file. */
261 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
263 /* XXX: openlock is not cancelled for cross-refs. */
264 /* If inode is known, cancel conflicting OPEN locks. */
265 if (fid_is_sane(&op_data->op_fid2)) {
266 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
267 if (it->it_flags & MDS_FMODE_WRITE)
272 if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
275 else if (it->it_flags & FMODE_EXEC)
281 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
286 /* If CREATE, cancel parent's UPDATE lock. */
287 if (it->it_op & IT_CREAT)
291 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
293 MDS_INODELOCK_UPDATE);
295 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
296 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: give back the references taken for the cancel list. */
298 ldlm_lock_list_put(&cancels, l_bl_ast, count);
299 RETURN(ERR_PTR(-ENOMEM));
302 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
303 op_data->op_namelen + 1);
304 if (cl_is_lov_delay_create(it->it_flags)) {
305 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
306 LASSERT(lmmsize == 0);
307 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
309 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
310 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
/* Security-context name/value buffers are optional (0-sized if absent). */
313 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
314 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
315 strlen(op_data->op_file_secctx_name) + 1 : 0);
317 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
318 op_data->op_file_secctx_size);
320 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
322 ptlrpc_request_free(req);
/* Mark the open for replay if the import is replayable (see rq_replay). */
326 spin_lock(&req->rq_lock);
327 req->rq_replay = req->rq_import->imp_replayable;
328 spin_unlock(&req->rq_lock);
330 /* pack the intent */
331 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
332 lit->opc = (__u64)it->it_op;
334 /* pack the intended request */
335 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reply side: room for the largest MD the MDS may return, plus ACLs. */
338 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
339 obddev->u.cli.cl_max_mds_easize);
340 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
341 ptlrpc_request_set_replen(req);
345 #define GA_DEFAULT_EA_NAME_LEN 20
346 #define GA_DEFAULT_EA_VAL_LEN 250
347 #define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM_INTENT_GETXATTR request: pack the IT_GETXATTR intent and
 * size the reply buffers for cached-xattr listing (names, values, value
 * lengths).  For servers older than 2.10.1 the buffers are inflated to
 * ocd_max_easize to avoid a server-side oops on small buffers (LU-9856).
 * Returns the prepared request or an ERR_PTR on failure.
 */
349 static struct ptlrpc_request *
350 mdc_intent_getxattr_pack(struct obd_export *exp,
351 struct lookup_intent *it,
352 struct md_op_data *op_data)
354 struct ptlrpc_request *req;
355 struct ldlm_intent *lit;
357 struct list_head cancels = LIST_HEAD_INIT(cancels);
358 u32 min_buf_size = 0;
362 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363 &RQF_LDLM_INTENT_GETXATTR);
365 RETURN(ERR_PTR(-ENOMEM));
367 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
369 ptlrpc_request_free(req);
373 /* pack the intent */
374 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
375 lit->opc = IT_GETXATTR;
377 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
378 /* If the supplied buffer is too small then the server will
379 * return -ERANGE and llite will fallback to using non cached
380 * xattr operations. On servers before 2.10.1 a (non-cached)
381 * listxattr RPC for an orphan or dead file causes an oops. So
382 * let's try to avoid sending too small a buffer to too old a
383 * server. This is effectively undoing the memory conservation
384 * of LU-9417 when it would be *more* likely to crash the
385 * server. See LU-9856. */
386 if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
387 min_buf_size = exp->exp_connect_data.ocd_max_easize;
390 /* pack the intended request */
391 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
392 max_t(u32, min_buf_size,
393 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM),
/* Reply buffers: xattr name list, value blob, and per-value lengths. */
396 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
397 max_t(u32, min_buf_size,
398 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM));
400 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
401 max_t(u32, min_buf_size,
402 GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM));
404 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
405 max_t(u32, min_buf_size,
406 sizeof(__u32) * GA_DEFAULT_EA_NUM));
/* No ACL data is expected on a getxattr intent reply. */
408 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
410 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * embedded unlink body, sizing the reply MD buffer to the default EA size.
 * Returns the prepared request or an ERR_PTR on failure.
 */
415 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
416 struct lookup_intent *it,
417 struct md_op_data *op_data)
419 struct ptlrpc_request *req;
420 struct obd_device *obddev = class_exp2obd(exp);
421 struct ldlm_intent *lit;
425 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
426 &RQF_LDLM_INTENT_UNLINK);
428 RETURN(ERR_PTR(-ENOMEM));
430 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
431 op_data->op_namelen + 1);
433 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
435 ptlrpc_request_free(req);
439 /* pack the intent */
440 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
441 lit->opc = (__u64)it->it_op;
443 /* pack the intended request */
444 mdc_unlink_pack(req, op_data);
446 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
447 obddev->u.cli.cl_default_mds_easize);
448 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request: pack the intent and the getattr
 * body asking for attributes, EA/striping info, and ACLs.  The reply MD
 * buffer uses the default EA size when one is known, otherwise the
 * maximum; the ACL buffer is sized by @acl_bufsize.
 * Returns the prepared request or an ERR_PTR on failure.
 */
452 static struct ptlrpc_request *
453 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
454 struct md_op_data *op_data, __u32 acl_bufsize)
456 struct ptlrpc_request *req;
457 struct obd_device *obddev = class_exp2obd(exp);
458 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
459 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
460 OBD_MD_MEA | OBD_MD_FLACL;
461 struct ldlm_intent *lit;
466 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467 &RQF_LDLM_INTENT_GETATTR);
469 RETURN(ERR_PTR(-ENOMEM));
471 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
472 op_data->op_namelen + 1);
474 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
476 ptlrpc_request_free(req);
480 /* pack the intent */
481 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
482 lit->opc = (__u64)it->it_op;
/* Prefer the (smaller) default EA size when the client has learned one. */
484 if (obddev->u.cli.cl_default_mds_easize > 0)
485 easize = obddev->u.cli.cl_default_mds_easize;
487 easize = obddev->u.cli.cl_max_mds_easize;
489 /* pack the intended request */
490 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
492 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
493 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
494 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request.  The caller must supply a
 * struct layout_intent in op_data->op_data; it is copied verbatim into
 * the RMF_LAYOUT_INTENT field.  The reply LVB buffer is sized for the
 * default EA size so the layout can come back with the lock.
 * Returns the prepared request or an ERR_PTR on failure.
 */
498 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
499 struct lookup_intent *it,
500 struct md_op_data *op_data)
502 struct obd_device *obd = class_exp2obd(exp);
503 struct ptlrpc_request *req;
504 struct ldlm_intent *lit;
505 struct layout_intent *layout;
509 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
510 &RQF_LDLM_INTENT_LAYOUT);
512 RETURN(ERR_PTR(-ENOMEM));
/* No client-side EA payload on a layout intent. */
514 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
515 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
517 ptlrpc_request_free(req);
521 /* pack the intent */
522 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
523 lit->opc = (__u64)it->it_op;
525 /* pack the layout intent request */
526 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
527 LASSERT(op_data->op_data != NULL);
528 LASSERT(op_data->op_data_size == sizeof(*layout));
529 memcpy(layout, op_data->op_data, sizeof(*layout));
531 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
532 obd->u.cli.cl_default_mds_easize);
533 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM_ENQUEUE request with a server-side
 * LVB buffer of @lvb_len bytes.  Used e.g. for IT_READDIR enqueues.
 * Returns the prepared request or an ERR_PTR on failure.
 */
537 static struct ptlrpc_request *
538 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
540 struct ptlrpc_request *req;
544 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
546 RETURN(ERR_PTR(-ENOMEM));
548 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
550 ptlrpc_request_free(req);
554 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
555 ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue: fix up the lock/handle state,
 * copy the server's disposition and status into @it, clear the replay
 * flag for failed opens, swab the reply body, stash any returned LOV EA
 * for replay, and install layout/DoM LVB data on the granted lock.
 * Returns 0 or a negative errno.
 */
559 static int mdc_finish_enqueue(struct obd_export *exp,
560 struct ptlrpc_request *req,
561 struct ldlm_enqueue_info *einfo,
562 struct lookup_intent *it,
563 struct lustre_handle *lockh,
566 struct req_capsule *pill = &req->rq_pill;
567 struct ldlm_request *lockreq;
568 struct ldlm_reply *lockrep;
569 struct ldlm_lock *lock;
570 struct mdt_body *body = NULL;
571 void *lvb_data = NULL;
577 /* Similarly, if we're going to replay this request, we don't want to
578 * actually get a lock, just perform the intent. */
579 if (req->rq_transno || req->rq_replay) {
580 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
581 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* ELDLM_LOCK_ABORTED: intent executed but no lock granted. */
584 if (rc == ELDLM_LOCK_ABORTED) {
586 memset(lockh, 0, sizeof(*lockh));
588 } else { /* rc = 0 */
589 lock = ldlm_handle2lock(lockh);
590 LASSERT(lock != NULL);
592 /* If the server gave us back a different lock mode, we should
593 * fix up our variables. */
594 if (lock->l_req_mode != einfo->ei_mode) {
595 ldlm_lock_addref(lockh, lock->l_req_mode);
596 ldlm_lock_decref(lockh, einfo->ei_mode);
597 einfo->ei_mode = lock->l_req_mode;
602 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
603 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Propagate the server's intent result into the lookup intent. */
605 it->it_disposition = (int)lockrep->lock_policy_res1;
606 it->it_status = (int)lockrep->lock_policy_res2;
607 it->it_lock_mode = einfo->ei_mode;
608 it->it_lock_handle = lockh->cookie;
609 it->it_request = req;
611 /* Technically speaking rq_transno must already be zero if
612 * it_status is in error, so the check is a bit redundant */
613 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
614 mdc_clear_replay_flag(req, it->it_status)
616 /* If we're doing an IT_OPEN which did not result in an actual
617 * successful open, then we need to remove the bit which saves
618 * this request for unconditional replay.
620 * It's important that we do this first! Otherwise we might exit the
621 * function without doing so, and try to replay a failed create
623 if (it->it_op & IT_OPEN && req->rq_replay &&
624 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
625 mdc_clear_replay_flag(req, it->it_status);
627 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
628 it->it_op, it->it_disposition, it->it_status);
630 /* We know what to expect, so we do any byte flipping required here */
631 if (it_has_reply_body(it)) {
632 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
634 CERROR ("Can't swab mdt_body\n");
638 if (it_disposition(it, DISP_OPEN_OPEN) &&
639 !it_open_error(DISP_OPEN_OPEN, it)) {
641 * If this is a successful OPEN request, we need to set
642 * replay handler and data early, so that if replay
643 * happens immediately after swabbing below, new reply
644 * is swabbed by that handler correctly.
646 mdc_set_open_replay_data(NULL, NULL, it);
649 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
652 mdc_update_max_ea_from_body(exp, body);
655 * The eadata is opaque; just check that it is there.
656 * Eventually, obd_unpackmd() will check the contents.
658 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
659 body->mbo_eadatasize);
663 /* save lvb data and length in case this is for layout
666 lvb_len = body->mbo_eadatasize;
669 * We save the reply LOV EA in case we have to replay a
670 * create for recovery. If we didn't allocate a large
671 * enough request buffer above we need to reallocate it
672 * here to hold the actual LOV EA.
674 * To not save LOV EA if request is not going to replay
675 * (for example error one).
677 if ((it->it_op & IT_OPEN) && req->rq_replay) {
678 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
679 body->mbo_eadatasize);
/* Save failed: drop the EA so replay does not use a stale buffer. */
681 body->mbo_valid &= ~OBD_MD_FLEASIZE;
682 body->mbo_eadatasize = 0;
687 } else if (it->it_op & IT_LAYOUT) {
688 /* maybe the lock was granted right away and layout
689 * is packed into RMF_DLM_LVB of req */
690 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
692 lvb_data = req_capsule_server_sized_get(pill,
693 &RMF_DLM_LVB, lvb_len);
694 if (lvb_data == NULL)
698 * save replied layout data to the request buffer for
699 * recovery consideration (lest MDS reinitialize
700 * another set of OST objects).
/* Best effort: a failed save is ignored here. */
703 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
708 /* fill in stripe data for layout lock.
709 * LU-6581: trust layout data only if layout lock is granted. The MDT
710 * has stopped sending layout unless the layout lock is granted. The
711 * client still does this checking in case it's talking with an old
712 * server. - Jinshan */
713 lock = ldlm_handle2lock(lockh);
717 if (ldlm_has_layout(lock) && lvb_data != NULL &&
718 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
721 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
722 ldlm_it2str(it->it_op), lvb_len);
724 OBD_ALLOC_LARGE(lmm, lvb_len);
726 GOTO(out_lock, rc = -ENOMEM);
728 memcpy(lmm, lvb_data, lvb_len);
730 /* install lvb_data */
731 lock_res_and_lock(lock);
/* Only the first installer wins; a racing thread's copy is freed. */
732 if (lock->l_lvb_data == NULL) {
733 lock->l_lvb_type = LVB_T_LAYOUT;
734 lock->l_lvb_data = lmm;
735 lock->l_lvb_len = lvb_len;
738 unlock_res_and_lock(lock);
740 OBD_FREE_LARGE(lmm, lvb_len);
/* Data-on-MDT: the lock must carry the file size from the reply body. */
743 if (ldlm_has_dom(lock)) {
744 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
746 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
747 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
748 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
749 exp->exp_obd->obd_name);
750 GOTO(out_lock, rc = -EPROTO);
753 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
754 ldlm_it2str(it->it_op), body->mbo_dom_size);
756 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
764 /* We always reserve enough space in the reply packet for a stripe MD, because
765 * we don't know in advance the file type. */
/*
 * Core intent-enqueue path: select the inodebits policy from the intent
 * type, build the matching intent request, take RPC slots, send the
 * enqueue via ldlm_cli_enqueue(), and retry transparently on -EINPROGRESS
 * (server busy) or -ERANGE (ACL reply buffer too small).  On success the
 * result is finished via mdc_finish_enqueue(); on failure the lock
 * reference and intent lock fields are cleaned up.
 */
766 static int mdc_enqueue_base(struct obd_export *exp,
767 struct ldlm_enqueue_info *einfo,
768 const union ldlm_policy_data *policy,
769 struct lookup_intent *it,
770 struct md_op_data *op_data,
771 struct lustre_handle *lockh,
772 __u64 extra_lock_flags)
774 struct obd_device *obddev = class_exp2obd(exp);
775 struct ptlrpc_request *req = NULL;
776 __u64 flags, saved_flags = extra_lock_flags;
777 struct ldlm_res_id res_id;
/* Canned per-intent inodebits policies, chosen below. */
778 static const union ldlm_policy_data lookup_policy = {
779 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
780 static const union ldlm_policy_data update_policy = {
781 .l_inodebits = { MDS_INODELOCK_UPDATE } };
782 static const union ldlm_policy_data layout_policy = {
783 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
784 static const union ldlm_policy_data getxattr_policy = {
785 .l_inodebits = { MDS_INODELOCK_XATTR } };
786 int generation, resends = 0;
787 struct ldlm_reply *lockrep;
788 struct obd_import *imp = class_exp2cliimp(exp);
790 enum lvb_type lvb_type = 0;
794 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
796 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent the policy is implied by it_op, not caller-supplied. */
799 LASSERT(policy == NULL);
801 saved_flags |= LDLM_FL_HAS_INTENT;
802 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
803 policy = &update_policy;
804 else if (it->it_op & IT_LAYOUT)
805 policy = &layout_policy;
806 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
807 policy = &getxattr_policy;
809 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction. */
812 generation = obddev->u.cli.cl_import->imp_generation;
813 if (!it || (it->it_op & (IT_CREAT | IT_OPEN_CREAT)))
814 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
816 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
821 /* The only way right now is FLOCK. */
822 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
824 res_id.name[3] = LDLM_FLOCK;
825 } else if (it->it_op & IT_OPEN) {
826 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
827 } else if (it->it_op & IT_UNLINK) {
828 req = mdc_intent_unlink_pack(exp, it, op_data);
829 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
830 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
831 } else if (it->it_op & IT_READDIR) {
832 req = mdc_enqueue_pack(exp, 0);
833 } else if (it->it_op & IT_LAYOUT) {
834 if (!imp_connect_lvb_type(imp))
836 req = mdc_intent_layout_pack(exp, it, op_data);
837 lvb_type = LVB_T_LAYOUT;
838 } else if (it->it_op & IT_GETXATTR) {
839 req = mdc_intent_getxattr_pack(exp, it, op_data);
846 RETURN(PTR_ERR(req));
/* Pin the import generation on the request for resend bookkeeping. */
849 req->rq_generation_set = 1;
850 req->rq_import_generation = generation;
851 req->rq_sent = ktime_get_real_seconds() + resends;
854 /* It is important to obtain modify RPC slot first (if applicable), so
855 * that threads that are waiting for a modify RPC slot are not polluting
856 * our rpcs in flight counter.
857 * We do not do flock request limiting, though */
859 mdc_get_mod_rpc_slot(req, it);
860 rc = obd_get_request_slot(&obddev->u.cli);
862 mdc_put_mod_rpc_slot(req, it);
863 mdc_clear_replay_flag(req, 0);
864 ptlrpc_req_finished(req);
869 /* With Data-on-MDT the glimpse callback is needed too.
870 * It is set here in advance but not in mdc_finish_enqueue()
871 * to avoid possible races. It is safe to have glimpse handler
872 * for non-DOM locks and costs nothing.*/
873 if (einfo->ei_cb_gl == NULL)
874 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
876 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
877 0, lvb_type, lockh, 0);
879 /* For flock requests we immediatelly return without further
880 delay and let caller deal with the rest, since rest of
881 this function metadata processing makes no sense for flock
882 requests anyway. But in case of problem during comms with
883 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
884 can not rely on caller and this mainly for F_UNLCKs
885 (explicits or automatically generated by Kernel to clean
886 current FLocks upon exit) that can't be trashed */
887 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
888 (einfo->ei_type == LDLM_FLOCK) &&
889 (einfo->ei_mode == LCK_NL))
/* Release the slots taken above regardless of enqueue outcome. */
894 obd_put_request_slot(&obddev->u.cli);
895 mdc_put_mod_rpc_slot(req, it);
899 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
900 obddev->obd_name, PFID(&op_data->op_fid1),
901 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
903 mdc_clear_replay_flag(req, rc);
904 ptlrpc_req_finished(req);
908 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
909 LASSERT(lockrep != NULL);
/* Convert the wire status to a host errno before inspecting it. */
911 lockrep->lock_policy_res2 =
912 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
914 /* Retry infinitely when the server returns -EINPROGRESS for the
915 * intent operation, when server returns -EINPROGRESS for acquiring
916 * intent lock, we'll retry in after_reply(). */
917 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
918 mdc_clear_replay_flag(req, rc);
919 ptlrpc_req_finished(req);
/* Only resend within the same import generation (no eviction). */
920 if (generation == obddev->u.cli.cl_import->imp_generation) {
921 if (signal_pending(current))
925 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
926 obddev->obd_name, resends, it->it_op,
927 PFID(&op_data->op_fid1),
928 PFID(&op_data->op_fid2));
931 CDEBUG(D_HA, "resend cross eviction\n");
/* -ERANGE: ACL reply buffer too small — retry once with the max size. */
936 if ((int)lockrep->lock_policy_res2 == -ERANGE &&
937 it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
938 acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
939 mdc_clear_replay_flag(req, -ERANGE);
940 ptlrpc_req_finished(req);
941 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
945 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Error path: drop the granted lock reference and reset intent state. */
947 if (lustre_handle_is_used(lockh)) {
948 ldlm_lock_decref(lockh, einfo->ei_mode);
949 memset(lockh, 0, sizeof(*lockh));
951 ptlrpc_req_finished(req);
953 it->it_lock_handle = 0;
954 it->it_lock_mode = 0;
955 it->it_request = NULL;
/*
 * Public enqueue entry point for intent-less lock requests (e.g. flock):
 * a thin wrapper over mdc_enqueue_base() with a NULL lookup intent.
 */
961 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
962 const union ldlm_policy_data *policy,
963 struct md_op_data *op_data,
964 struct lustre_handle *lockh, __u64 extra_lock_flags)
966 return mdc_enqueue_base(exp, einfo, policy, NULL,
967 op_data, lockh, extra_lock_flags);
/*
 * Translate the server's intent dispositions/status into a return code for
 * the VFS layers: surface intent-execution and open/lookup errors, add
 * request references that ll_create_node()/ll_file_open() will drop, and
 * deduplicate against an already-held matching lock (keeping the old one).
 */
970 static int mdc_finish_intent_lock(struct obd_export *exp,
971 struct ptlrpc_request *request,
972 struct md_op_data *op_data,
973 struct lookup_intent *it,
974 struct lustre_handle *lockh)
976 struct lustre_handle old_lock;
977 struct ldlm_lock *lock;
981 LASSERT(request != NULL);
982 LASSERT(request != LP_POISON);
983 LASSERT(request->rq_repmsg != LP_POISON);
985 if (it->it_op & IT_READDIR)
/* getxattr/layout intents carry no dispositions beyond it_status. */
988 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
989 if (it->it_status != 0)
990 GOTO(out, rc = it->it_status);
992 if (!it_disposition(it, DISP_IT_EXECD)) {
993 /* The server failed before it even started executing
994 * the intent, i.e. because it couldn't unpack the
997 LASSERT(it->it_status != 0);
998 GOTO(out, rc = it->it_status);
1000 rc = it_open_error(DISP_IT_EXECD, it);
1004 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1008 /* keep requests around for the multiple phases of the call
1009 * this shows the DISP_XX must guarantee we make it into the
1012 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1013 it_disposition(it, DISP_OPEN_CREATE) &&
1014 !it_open_error(DISP_OPEN_CREATE, it)) {
1015 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1016 /* balanced in ll_create_node */
1017 ptlrpc_request_addref(request);
1019 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1020 it_disposition(it, DISP_OPEN_OPEN) &&
1021 !it_open_error(DISP_OPEN_OPEN, it)) {
1022 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1023 /* balanced in ll_file_open */
1024 ptlrpc_request_addref(request);
1025 /* BUG 11546 - eviction in the middle of open rpc
1028 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1032 if (it->it_op & IT_CREAT) {
1033 /* XXX this belongs in ll_create_it */
1034 } else if (it->it_op == IT_OPEN) {
1035 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1037 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1041 /* If we already have a matching lock, then cancel the new
1042 * one. We have to set the data here instead of in
1043 * mdc_enqueue, because we need to use the child's inode as
1044 * the l_ast_data to match, and that's not available until
1045 * intent_finish has performed the iget().) */
1046 lock = ldlm_handle2lock(lockh);
1048 union ldlm_policy_data policy = lock->l_policy_data;
1049 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the lock's resource must name the FID the reply body names. */
1051 if (it_has_reply_body(it)) {
1052 struct mdt_body *body;
1054 body = req_capsule_server_get(&request->rq_pill,
1056 /* mdc_enqueue checked */
1057 LASSERT(body != NULL);
1058 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1059 &lock->l_resource->lr_name),
1060 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1061 PLDLMRES(lock->l_resource),
1062 PFID(&body->mbo_fid1));
1064 LDLM_LOCK_PUT(lock);
1066 memcpy(&old_lock, lockh, sizeof(*lockh));
1067 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1068 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Keep the pre-existing lock; cancel the duplicate we just got. */
1069 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1070 memcpy(lockh, &old_lock, sizeof(old_lock));
1071 it->it_lock_handle = lockh->cookie;
1077 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1078 (int)op_data->op_namelen, op_data->op_name,
1079 ldlm_it2str(it->it_op), it->it_status,
1080 it->it_disposition, rc);
/*
 * Check whether we already hold a lock covering the bits an intent needs
 * on @fid.  If the intent carries a lock handle, revalidate that handle;
 * otherwise match against cached locks with a per-intent bit mask.  On
 * success, it_lock_handle/it_lock_mode are filled in; on failure, cleared.
 */
1084 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1085 struct lu_fid *fid, __u64 *bits)
1087 /* We could just return 1 immediately, but since we should only
1088 * be called in revalidate_it if we already have a lock, let's
1090 struct ldlm_res_id res_id;
1091 struct lustre_handle lockh;
1092 union ldlm_policy_data policy;
1093 enum ldlm_mode mode;
/* Fast path: the intent already names a lock — just revalidate it. */
1096 if (it->it_lock_handle) {
1097 lockh.cookie = it->it_lock_handle;
1098 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1100 fid_build_reg_res_name(fid, &res_id);
1101 switch (it->it_op) {
1103 /* File attributes are held under multiple bits:
1104 * nlink is under lookup lock, size and times are
1105 * under UPDATE lock and recently we've also got
1106 * a separate permissions lock for owner/group/acl that
1107 * were protected by lookup lock before.
1108 * Getattr must provide all of that information,
1109 * so we need to ensure we have all of those locks.
1110 * Unfortunately, if the bits are split across multiple
1111 * locks, there's no easy way to match all of them here,
1112 * so an extra RPC would be performed to fetch all
1113 * of those bits at once for now. */
1114 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1115 * but for old MDTs (< 2.4), permission is covered
1116 * by LOOKUP lock, so it needs to match all bits here.*/
1117 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1118 MDS_INODELOCK_LOOKUP |
1122 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1125 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1128 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any mode that can satisfy a read of the protected attributes. */
1132 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1133 LDLM_IBITS, &policy,
1134 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1139 it->it_lock_handle = lockh.cookie;
1140 it->it_lock_mode = mode;
1142 it->it_lock_handle = 0;
1143 it->it_lock_mode = 0;
1150 * This long block is all about fixing up the lock and request state
1151 * so that it is correct as of the moment _before_ the operation was
1152 * applied; that way, the VFS will think that everything is normal and
1153 * call Lustre's regular VFS methods.
1155 * If we're performing a creation, that means that unless the creation
1156 * failed with EEXIST, we should fake up a negative dentry.
1158 * For everything else, we want to lookup to succeed.
1160 * One additional note: if CREATE or OPEN succeeded, we add an extra
1161 * reference to the request because we need to keep it around until
1162 * ll_create/ll_open gets called.
1164 * The server will return to us, in it_disposition, an indication of
1165 * exactly what it_status refers to.
1167 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1168 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1169 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1170 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1173 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * Top-level intent-lock entry point: try to revalidate an existing lock
 * for lookup/getattr/readdir on a known fid2, allocate a fid for creates
 * if needed, run the enqueue, and finish via mdc_finish_intent_lock().
 * On success *reqp is set to the intent's request (referenced as
 * described in the comment above).
 */
1176 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1177 struct lookup_intent *it, struct ptlrpc_request **reqp,
1178 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1180 struct ldlm_enqueue_info einfo = {
1181 .ei_type = LDLM_IBITS,
1182 .ei_mode = it_to_lock_mode(it),
1183 .ei_cb_bl = cb_blocking,
1184 .ei_cb_cp = ldlm_completion_ast,
1185 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1187 struct lustre_handle lockh;
1192 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1193 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1194 op_data->op_name, PFID(&op_data->op_fid2),
1195 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidation path: the target fid is already known. */
1199 if (fid_is_sane(&op_data->op_fid2) &&
1200 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1201 /* We could just return 1 immediately, but since we should only
1202 * be called in revalidate_it if we already have a lock, let's
1204 it->it_lock_handle = 0;
1205 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1206 /* Only return failure if it was not GETATTR by cfid
1207 (from inode_revalidate) */
1208 if (rc || op_data->op_namelen != 0)
1212 /* For case if upper layer did not alloc fid, do it now. */
1213 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1214 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1216 CERROR("Can't alloc new fid, rc %d\n", rc);
1221 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1226 *reqp = it->it_request;
1227 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for an asynchronous intent-getattr enqueue,
 * run when the RPC issued by mdc_intent_getattr_async() completes.
 *
 * Releases the request slot taken at send time, finishes the DLM
 * enqueue, fixes up intent/lock state, and finally invokes the
 * caller-supplied completion callback minfo->mi_cb.
 *
 * NOTE(review): this excerpt is missing lines (the third parameter of
 * the signature, error labels, the final return); comments cover only
 * the visible code.
 */
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
	struct mdc_getattr_args *ga = args;	/* stashed in rq_async_args at send time */
	struct obd_export *exp = ga->ga_exp;
	struct md_enqueue_info *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
	struct lookup_intent *it;
	struct lustre_handle *lockh;
	struct obd_device *obddev;
	struct ldlm_reply *lockrep;
	__u64 flags = LDLM_FL_HAS_INTENT;

	lockh = &minfo->mi_lockh;

	obddev = class_exp2obd(exp);

	/* put back the request slot reserved in mdc_intent_getattr_async() */
	obd_put_request_slot(&obddev->u.cli);
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))

	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		mdc_clear_replay_flag(req, rc);

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	LASSERT(lockrep != NULL);

	/* intent status travels in lock_policy_res2; convert it from wire
	 * form to the host representation */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);

	rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);

	/* hand the result to the sponsor of the async getattr */
	minfo->mi_cb(req, minfo, rc);
/*
 * Issue an asynchronous intent-getattr: pack the intent request,
 * reserve a client request slot, start a DLM enqueue without waiting
 * for the reply, and queue the request to ptlrpcd. The reply is
 * processed by mdc_intent_getattr_async_interpret().
 *
 * \param exp	export to the MDC target
 * \param minfo	enqueue bookkeeping: op_data, intent, einfo, the lock
 *		handle (mi_lockh) and the completion callback (mi_cb)
 *
 * NOTE(review): error-path lines (RETURNs/labels) are missing from this
 * excerpt; comments cover only the visible statements.
 */
int mdc_intent_getattr_async(struct obd_export *exp,
			     struct md_enqueue_info *minfo)
	struct md_op_data *op_data = &minfo->mi_data;
	struct lookup_intent *it = &minfo->mi_it;
	struct ptlrpc_request *req;
	struct mdc_getattr_args *ga;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_res_id res_id;
	/* request LOOKUP|UPDATE bits; per the comments earlier in this
	 * file, permission may be covered by the LOOKUP bit on old MDTs */
	union ldlm_policy_data policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP |
				 MDS_INODELOCK_UPDATE } };
	__u64 flags = LDLM_FL_HAS_INTENT;

	CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	/* If the MDT return -ERANGE because of large ACL, then the sponsor
	 * of the async getattr RPC will handle that by itself. */
	req = mdc_intent_getattr_pack(exp, it, op_data,
				      LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
		RETURN(PTR_ERR(req));

	/* throttle in-flight RPCs; the slot is released in the interpret
	 * callback (obd_put_request_slot) */
	rc = obd_get_request_slot(&obddev->u.cli);
		ptlrpc_req_finished(req);

	/* With Data-on-MDT the glimpse callback is needed too.
	 * It is set here in advance but not in mdc_finish_enqueue()
	 * to avoid possible races. It is safe to have glimpse handler
	 * for non-DOM locks and costs nothing. */
	if (minfo->mi_einfo.ei_cb_gl == NULL)
		minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;

	/* final argument 1 presumably selects the asynchronous enqueue
	 * path (no wait here) - TODO confirm against ldlm_cli_enqueue() */
	rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
			      &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
		obd_put_request_slot(&obddev->u.cli);
		ptlrpc_req_finished(req);

	/* stash the context the interpret callback needs in the request */
	CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
	ga = ptlrpc_req_async_args(req);
	ga->ga_minfo = minfo;

	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
	ptlrpcd_add_req(req);