4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
47 #include "mdc_internal.h"
/*
 * Context carried through an asynchronous getattr intent request; stored in
 * req->rq_async_args and consumed by mdc_intent_getattr_async_interpret().
 */
49 struct mdc_getattr_args {
/* export the async getattr was issued on */
50 struct obd_export *ga_exp;
/* caller's enqueue info: intent, lock handle and completion callback */
51 struct md_enqueue_info *ga_minfo;
/*
 * Return the error recorded for open-intent "phase" (a DISP_* value).
 * Dispositions are checked from most specific (LEASE, OPEN, CREATE) down to
 * the generic execution bits (LOOKUP_EXECD, IT_EXECD); the first disposition
 * that is set and at or past the requested phase determines the result.
 * NOTE(review): the return statements inside each branch are elided in this
 * view — presumably each returns it->it_status or 0 depending on phase.
 */
54 int it_open_error(int phase, struct lookup_intent *it)
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
/* No disposition matched at all: this is unexpected, log the state. */
90 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95 EXPORT_SYMBOL(it_open_error);
97 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach the VFS inode ("data") to the DLM lock's resource as lr_lvb_inode,
 * under the resource lock.  If an inode was already attached and differs, it
 * must be in I_FREEING state (being torn down) — anything else is a bug.
 * If "bits" is non-NULL the lock's granted inodebits are returned through it.
 */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99 void *data, __u64 *bits)
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
/* Nothing to do for a zeroed/unused handle. */
108 if (!lustre_handle_is_used(lockh))
111 lock = ldlm_handle2lock(lockh);
113 LASSERT(lock != NULL);
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* A different inode may only be replaced while it is being freed. */
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on the resource derived from "fid"
 * that covers "policy" (inodebits) in one of the requested modes.  Bits the
 * server does not support are masked out of the policy first (LU-4405).
 * Returns the matched mode (and fills *lockh) via ldlm_lock_match().
 */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused DLM locks matching "policy"/"mode" on the resource
 * derived from "fid".  Thin wrapper around ldlm_cli_cancel_unused_resource();
 * "opaque" is passed through for caller-side lock filtering.
 */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/*
 * Detach any inode pointer cached on the DLM resource for "fid" by clearing
 * lr_lvb_inode (counterpart of mdc_set_lock_data()).  A resource lookup
 * miss is not an error — there is simply nothing to clear.
 */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
/* Drop the reference taken by ldlm_resource_get(). */
188 ldlm_resource_putref(res);
/*
 * Clear the replay flag on a failed request so it is not held for recovery
 * replay.  "rc" is the error that triggered the clearing; a non-zero transno
 * together with an error is unexpected and is logged loudly.
 */
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 /* Don't hold error requests for replay. */
195 if (req->rq_replay) {
/* rq_replay is modified under rq_lock (cleared in the elided line). */
196 spin_lock(&req->rq_lock);
198 spin_unlock(&req->rq_lock);
200 if (rc && req->rq_transno != 0) {
201 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206 /* Save a large LOV EA into the request buffer so that it is available
207 * for replay. We don't do this in the initial request because the
208 * original request doesn't need this buffer (at most it sends just the
209 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210 * buffer and may also be difficult to allocate and save a very large
211 * request buffer for each open. (bug 5707)
213 * OOM here may cause recovery failure if lmm is needed (only for the
214 * original open if the MDS crashed just when this client also OOM'd)
215 * but this is incredibly unlikely, and questionable whether the client
216 * could do MDS recovery under OOM anyways... */
/*
 * Copy "size" bytes of EA data into the client-side "field" buffer of the
 * request, enlarging the request buffer if the currently reserved space is
 * too small, or shrinking the field if it is larger than needed.
 */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218 const struct req_msg_field *field,
219 void *data, u32 size)
221 struct req_capsule *pill = &req->rq_pill;
225 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
/* Reserved space too small: grow the request buffer in place. */
226 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229 req->rq_export->exp_obd->obd_name,
/* Reserved space larger than needed: trim the field. */
234 req_capsule_shrink(pill, field, size, RCL_CLIENT);
237 req_capsule_set_size(pill, field, RCL_CLIENT, size);
238 lmm = req_capsule_client_get(pill, field);
240 memcpy(lmm, data, size);
/*
 * Build an LDLM_INTENT_OPEN request for "it"/"op_data".
 *
 * Before allocating the request, conflicting client-side locks are collected
 * onto "cancels" for early cancel: OPEN-bit locks on the child (op_fid2) when
 * it is known, and the parent's (op_fid1) UPDATE lock when creating.  The
 * request is marked replayable so a successful open can be replayed after
 * recovery.  Returns the prepared request or an ERR_PTR on failure.
 */
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247 struct md_op_data *op_data)
249 struct ptlrpc_request *req;
250 struct obd_device *obddev = class_exp2obd(exp);
251 struct ldlm_intent *lit;
252 const void *lmm = op_data->op_data;
253 __u32 lmmsize = op_data->op_data_size;
254 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Regular-file create: force S_IFREG into the requested mode. */
260 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262 /* XXX: openlock is not cancelled for cross-refs. */
263 /* If inode is known, cancel conflicting OPEN locks. */
264 if (fid_is_sane(&op_data->op_fid2)) {
265 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266 if (it->it_flags & FMODE_WRITE)
271 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
274 else if (it->it_flags & FMODE_EXEC)
280 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
285 /* If CREATE, cancel parent's UPDATE lock. */
286 if (it->it_op & IT_CREAT)
290 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
292 MDS_INODELOCK_UPDATE);
294 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before erroring. */
297 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298 RETURN(ERR_PTR(-ENOMEM));
301 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302 op_data->op_namelen + 1);
303 if (cl_is_lov_delay_create(it->it_flags)) {
304 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
305 LASSERT(lmmsize == 0);
306 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
308 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
309 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
/* Reserve room for the optional file security context (name + value). */
312 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
313 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
314 strlen(op_data->op_file_secctx_name) + 1 : 0);
316 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
317 op_data->op_file_secctx_size);
319 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
321 ptlrpc_request_free(req);
/* Opens are replayed if the import supports replay. */
325 spin_lock(&req->rq_lock);
326 req->rq_replay = req->rq_import->imp_replayable;
327 spin_unlock(&req->rq_lock);
329 /* pack the intent */
330 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
331 lit->opc = (__u64)it->it_op;
333 /* pack the intended request */
334 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reply must have room for the largest possible striping EA and ACL. */
337 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
338 obddev->u.cli.cl_max_mds_easize);
339 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
340 req->rq_import->imp_connect_data.ocd_max_easize);
341 ptlrpc_request_set_replen(req);
/* Sizing heuristics for a getxattr intent reply: room for GA_DEFAULT_EA_NUM
 * xattrs with names up to GA_DEFAULT_EA_NAME_LEN and values up to
 * GA_DEFAULT_EA_VAL_LEN bytes each. */
345 #define GA_DEFAULT_EA_NAME_LEN 20
346 #define GA_DEFAULT_EA_VAL_LEN 250
347 #define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM_INTENT_GETXATTR request for op_fid1, reserving server-side
 * reply space for a default-sized batch of xattr names, values and value
 * lengths.  Returns the prepared request or an ERR_PTR on failure.
 */
349 static struct ptlrpc_request *
350 mdc_intent_getxattr_pack(struct obd_export *exp,
351 struct lookup_intent *it,
352 struct md_op_data *op_data)
354 struct ptlrpc_request *req;
355 struct ldlm_intent *lit;
357 struct list_head cancels = LIST_HEAD_INIT(cancels);
361 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
362 &RQF_LDLM_INTENT_GETXATTR);
364 RETURN(ERR_PTR(-ENOMEM));
366 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
368 ptlrpc_request_free(req);
372 /* pack the intent */
373 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
374 lit->opc = IT_GETXATTR;
376 /* pack the intended request */
377 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
378 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
/* Reply buffers: xattr names, values and a per-xattr length array. */
381 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
382 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
384 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
385 GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
387 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
388 sizeof(__u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is expected back for a getxattr intent. */
390 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
392 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: packs the intent opcode and the
 * unlink body (parent fid + name from op_data), and reserves default-sized
 * reply space for the victim's striping EA.  Returns the prepared request
 * or an ERR_PTR on failure.
 */
397 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
398 struct lookup_intent *it,
399 struct md_op_data *op_data)
401 struct ptlrpc_request *req;
402 struct obd_device *obddev = class_exp2obd(exp);
403 struct ldlm_intent *lit;
407 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408 &RQF_LDLM_INTENT_UNLINK);
410 RETURN(ERR_PTR(-ENOMEM));
412 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
413 op_data->op_namelen + 1);
415 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417 ptlrpc_request_free(req);
421 /* pack the intent */
422 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
423 lit->opc = (__u64)it->it_op;
425 /* pack the intended request */
426 mdc_unlink_pack(req, op_data);
428 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
429 obddev->u.cli.cl_default_mds_easize);
430 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request asking for the full attribute set
 * (attrs, EA size, dir EA, MEA, ACL).  Reply EA space is sized from the
 * client's default MDS easize when set, otherwise the maximum.  Returns the
 * prepared request or an ERR_PTR on failure.
 */
434 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
435 struct lookup_intent *it,
436 struct md_op_data *op_data)
438 struct ptlrpc_request *req;
439 struct obd_device *obddev = class_exp2obd(exp);
440 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
441 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
442 OBD_MD_MEA | OBD_MD_FLACL;
443 struct ldlm_intent *lit;
448 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
449 &RQF_LDLM_INTENT_GETATTR);
451 RETURN(ERR_PTR(-ENOMEM));
453 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
454 op_data->op_namelen + 1);
456 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
458 ptlrpc_request_free(req);
462 /* pack the intent */
463 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
464 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum. */
466 if (obddev->u.cli.cl_default_mds_easize > 0)
467 easize = obddev->u.cli.cl_default_mds_easize;
469 easize = obddev->u.cli.cl_max_mds_easize;
471 /* pack the intended request */
472 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
474 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
475 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
476 req->rq_import->imp_connect_data.ocd_max_easize);
477 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request.  The caller supplies a fully-formed
 * struct layout_intent in op_data->op_data, which is copied verbatim into
 * the request; reply LVB space for the layout is sized from the client's
 * default MDS easize.  Returns the prepared request or an ERR_PTR on failure.
 */
481 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
482 struct lookup_intent *it,
483 struct md_op_data *op_data)
485 struct obd_device *obd = class_exp2obd(exp);
486 struct ptlrpc_request *req;
487 struct ldlm_intent *lit;
488 struct layout_intent *layout;
492 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
493 &RQF_LDLM_INTENT_LAYOUT);
495 RETURN(ERR_PTR(-ENOMEM));
/* No client EA data is sent with a layout intent. */
497 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
498 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
500 ptlrpc_request_free(req);
504 /* pack the intent */
505 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
506 lit->opc = (__u64)it->it_op;
508 /* pack the layout intent request */
509 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
510 LASSERT(op_data->op_data != NULL);
511 LASSERT(op_data->op_data_size == sizeof(*layout));
512 memcpy(layout, op_data->op_data, sizeof(*layout));
514 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
515 obd->u.cli.cl_default_mds_easize);
516 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM_ENQUEUE request (no intent) with "lvb_len" bytes of
 * server-side LVB reply space.  Returns the prepared request or an ERR_PTR
 * on failure.
 */
520 static struct ptlrpc_request *
521 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
523 struct ptlrpc_request *req;
527 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
529 RETURN(ERR_PTR(-ENOMEM));
531 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
533 ptlrpc_request_free(req);
537 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
538 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply to an intent enqueue (sync or async path):
 *  - mark replayable/already-committed requests INTENT_ONLY so replay
 *    performs the intent without re-acquiring the lock;
 *  - fix up einfo->ei_mode if the server granted a different lock mode;
 *  - copy the server's disposition/status and the lock handle into "it";
 *  - drop the replay flag for failed intents and failed opens;
 *  - swab/validate the mdt_body, save the LOV EA (open) or layout LVB
 *    (IT_LAYOUT) into the request for recovery replay;
 *  - install layout LVB data on the lock, and validate/fill the OST LVB
 *    for Data-on-MDT locks.
 */
542 static int mdc_finish_enqueue(struct obd_export *exp,
543 struct ptlrpc_request *req,
544 struct ldlm_enqueue_info *einfo,
545 struct lookup_intent *it,
546 struct lustre_handle *lockh,
549 struct req_capsule *pill = &req->rq_pill;
550 struct ldlm_request *lockreq;
551 struct ldlm_reply *lockrep;
552 struct ldlm_lock *lock;
553 struct mdt_body *body = NULL;
554 void *lvb_data = NULL;
560 /* Similarly, if we're going to replay this request, we don't want to
561 * actually get a lock, just perform the intent. */
562 if (req->rq_transno || req->rq_replay) {
563 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
564 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Intent executed but no lock granted: clear the handle. */
567 if (rc == ELDLM_LOCK_ABORTED) {
569 memset(lockh, 0, sizeof(*lockh));
571 } else { /* rc = 0 */
572 lock = ldlm_handle2lock(lockh);
573 LASSERT(lock != NULL);
575 /* If the server gave us back a different lock mode, we should
576 * fix up our variables. */
577 if (lock->l_req_mode != einfo->ei_mode) {
578 ldlm_lock_addref(lockh, lock->l_req_mode);
579 ldlm_lock_decref(lockh, einfo->ei_mode);
580 einfo->ei_mode = lock->l_req_mode;
585 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
586 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's verdict on the intent into "it". */
588 it->it_disposition = (int)lockrep->lock_policy_res1;
589 it->it_status = (int)lockrep->lock_policy_res2;
590 it->it_lock_mode = einfo->ei_mode;
591 it->it_lock_handle = lockh->cookie;
592 it->it_request = req;
594 /* Technically speaking rq_transno must already be zero if
595 * it_status is in error, so the check is a bit redundant */
596 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
597 mdc_clear_replay_flag(req, it->it_status);
599 /* If we're doing an IT_OPEN which did not result in an actual
600 * successful open, then we need to remove the bit which saves
601 * this request for unconditional replay.
603 * It's important that we do this first! Otherwise we might exit the
604 * function without doing so, and try to replay a failed create
606 if (it->it_op & IT_OPEN && req->rq_replay &&
607 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
608 mdc_clear_replay_flag(req, it->it_status);
610 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
611 it->it_op, it->it_disposition, it->it_status);
613 /* We know what to expect, so we do any byte flipping required here */
614 if (it_has_reply_body(it)) {
615 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
617 CERROR ("Can't swab mdt_body\n");
621 if (it_disposition(it, DISP_OPEN_OPEN) &&
622 !it_open_error(DISP_OPEN_OPEN, it)) {
624 * If this is a successful OPEN request, we need to set
625 * replay handler and data early, so that if replay
626 * happens immediately after swabbing below, new reply
627 * is swabbed by that handler correctly.
629 mdc_set_open_replay_data(NULL, NULL, it);
632 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
/* Server may have reported a larger EA size; track the maximum. */
635 mdc_update_max_ea_from_body(exp, body);
638 * The eadata is opaque; just check that it is there.
639 * Eventually, obd_unpackmd() will check the contents.
641 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
642 body->mbo_eadatasize);
646 /* save lvb data and length in case this is for layout
649 lvb_len = body->mbo_eadatasize;
652 * We save the reply LOV EA in case we have to replay a
653 * create for recovery. If we didn't allocate a large
654 * enough request buffer above we need to reallocate it
655 * here to hold the actual LOV EA.
657 * To not save LOV EA if request is not going to replay
658 * (for example error one).
660 if ((it->it_op & IT_OPEN) && req->rq_replay) {
661 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
662 body->mbo_eadatasize);
/* Save failed: drop the EA from the body rather than replay junk. */
664 body->mbo_valid &= ~OBD_MD_FLEASIZE;
665 body->mbo_eadatasize = 0;
670 } else if (it->it_op & IT_LAYOUT) {
671 /* maybe the lock was granted right away and layout
672 * is packed into RMF_DLM_LVB of req */
673 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
675 lvb_data = req_capsule_server_sized_get(pill,
676 &RMF_DLM_LVB, lvb_len);
677 if (lvb_data == NULL)
681 * save replied layout data to the request buffer for
682 * recovery consideration (lest MDS reinitialize
683 * another set of OST objects).
686 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
691 /* fill in stripe data for layout lock.
692 * LU-6581: trust layout data only if layout lock is granted. The MDT
693 * has stopped sending layout unless the layout lock is granted. The
694 * client still does this checking in case it's talking with an old
695 * server. - Jinshan */
696 lock = ldlm_handle2lock(lockh);
700 if (ldlm_has_layout(lock) && lvb_data != NULL &&
701 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
704 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
705 ldlm_it2str(it->it_op), lvb_len);
/* Copy the layout out of the reply into a lock-owned buffer. */
707 OBD_ALLOC_LARGE(lmm, lvb_len);
709 GOTO(out_lock, rc = -ENOMEM);
711 memcpy(lmm, lvb_data, lvb_len);
713 /* install lvb_data */
714 lock_res_and_lock(lock);
715 if (lock->l_lvb_data == NULL) {
716 lock->l_lvb_type = LVB_T_LAYOUT;
717 lock->l_lvb_data = lmm;
718 lock->l_lvb_len = lvb_len;
721 unlock_res_and_lock(lock);
/* Another thread installed LVB data first: free our copy. */
723 OBD_FREE_LARGE(lmm, lvb_len);
726 if (ldlm_has_dom(lock)) {
727 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
729 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
730 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
/* Data-on-MDT lock must always carry the file size. */
731 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
732 exp->exp_obd->obd_name);
733 GOTO(out_lock, rc = -EPROTO);
736 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
737 ldlm_it2str(it->it_op), body->mbo_dom_size);
739 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
747 /* We always reserve enough space in the reply packet for a stripe MD, because
748 * we don't know in advance the file type. */
/*
 * Core enqueue path for MDC DLM locks, with or without an intent.
 *
 * When "it" is set the caller must not pass a policy: the inodebits policy
 * is chosen from the intent opcode, LDLM_FL_HAS_INTENT is added, and the
 * matching intent request packer builds "req".  Without an intent, only
 * LDLM_FLOCK enqueues are supported.  Acquires the modify-RPC slot and the
 * generic request slot before sending, retries on -EINPROGRESS from the
 * server (within the same import generation), and hands the reply to
 * mdc_finish_enqueue().  On overall failure any granted lock is dropped and
 * the intent's lock fields are reset.
 */
749 static int mdc_enqueue_base(struct obd_export *exp,
750 struct ldlm_enqueue_info *einfo,
751 const union ldlm_policy_data *policy,
752 struct lookup_intent *it,
753 struct md_op_data *op_data,
754 struct lustre_handle *lockh,
755 __u64 extra_lock_flags)
757 struct obd_device *obddev = class_exp2obd(exp);
758 struct ptlrpc_request *req = NULL;
759 __u64 flags, saved_flags = extra_lock_flags;
760 struct ldlm_res_id res_id;
761 static const union ldlm_policy_data lookup_policy = {
762 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
763 static const union ldlm_policy_data update_policy = {
764 .l_inodebits = { MDS_INODELOCK_UPDATE } };
765 static const union ldlm_policy_data layout_policy = {
766 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
767 static const union ldlm_policy_data getxattr_policy = {
768 .l_inodebits = { MDS_INODELOCK_XATTR } };
769 int generation, resends = 0;
770 struct ldlm_reply *lockrep;
771 enum lvb_type lvb_type = 0;
775 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
777 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent the policy is derived here, never caller-supplied. */
780 LASSERT(policy == NULL);
782 saved_flags |= LDLM_FL_HAS_INTENT;
783 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
784 policy = &update_policy;
785 else if (it->it_op & IT_LAYOUT)
786 policy = &layout_policy;
787 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
788 policy = &getxattr_policy;
790 policy = &lookup_policy;
/* Remember the import generation to detect eviction across resends. */
793 generation = obddev->u.cli.cl_import->imp_generation;
797 /* The only way right now is FLOCK. */
798 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
800 res_id.name[3] = LDLM_FLOCK;
801 } else if (it->it_op & IT_OPEN) {
802 req = mdc_intent_open_pack(exp, it, op_data);
803 } else if (it->it_op & IT_UNLINK) {
804 req = mdc_intent_unlink_pack(exp, it, op_data);
805 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
806 req = mdc_intent_getattr_pack(exp, it, op_data);
807 } else if (it->it_op & IT_READDIR) {
808 req = mdc_enqueue_pack(exp, 0);
809 } else if (it->it_op & IT_LAYOUT) {
810 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
812 req = mdc_intent_layout_pack(exp, it, op_data);
813 lvb_type = LVB_T_LAYOUT;
814 } else if (it->it_op & IT_GETXATTR) {
815 req = mdc_intent_getxattr_pack(exp, it, op_data);
822 RETURN(PTR_ERR(req));
/* Pin the request to this import generation for resend bookkeeping. */
825 req->rq_generation_set = 1;
826 req->rq_import_generation = generation;
827 req->rq_sent = ktime_get_real_seconds() + resends;
830 /* It is important to obtain modify RPC slot first (if applicable), so
831 * that threads that are waiting for a modify RPC slot are not polluting
832 * our rpcs in flight counter.
833 * We do not do flock request limiting, though */
835 mdc_get_mod_rpc_slot(req, it);
836 rc = obd_get_request_slot(&obddev->u.cli);
/* Slot acquisition failed: release what we hold and bail out. */
838 mdc_put_mod_rpc_slot(req, it);
839 mdc_clear_replay_flag(req, 0);
840 ptlrpc_req_finished(req);
845 /* With Data-on-MDT the glimpse callback is needed too.
846 * It is set here in advance but not in mdc_finish_enqueue()
847 * to avoid possible races. It is safe to have glimpse handler
848 * for non-DOM locks and costs nothing.*/
849 if (einfo->ei_cb_gl == NULL)
850 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
852 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
853 0, lvb_type, lockh, 0);
855 /* For flock requests we immediatelly return without further
856 delay and let caller deal with the rest, since rest of
857 this function metadata processing makes no sense for flock
858 requests anyway. But in case of problem during comms with
859 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
860 can not rely on caller and this mainly for F_UNLCKs
861 (explicits or automatically generated by Kernel to clean
862 current FLocks upon exit) that can't be trashed */
863 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
864 (einfo->ei_type == LDLM_FLOCK) &&
865 (einfo->ei_mode == LCK_NL))
/* Enqueue done (success or error): release both RPC slots. */
870 obd_put_request_slot(&obddev->u.cli);
871 mdc_put_mod_rpc_slot(req, it);
875 "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
876 obddev->obd_name, PFID(&op_data->op_fid1),
877 PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
879 mdc_clear_replay_flag(req, rc);
880 ptlrpc_req_finished(req);
884 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
885 LASSERT(lockrep != NULL);
/* Intent status travels in lock_policy_res2 in network byte order. */
887 lockrep->lock_policy_res2 =
888 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
890 /* Retry infinitely when the server returns -EINPROGRESS for the
891 * intent operation, when server returns -EINPROGRESS for acquiring
892 * intent lock, we'll retry in after_reply(). */
893 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
894 mdc_clear_replay_flag(req, rc);
895 ptlrpc_req_finished(req);
896 if (generation == obddev->u.cli.cl_import->imp_generation) {
897 if (signal_pending(current))
901 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
902 obddev->obd_name, resends, it->it_op,
903 PFID(&op_data->op_fid1),
904 PFID(&op_data->op_fid2));
/* Import generation changed (eviction): do not resend. */
907 CDEBUG(D_HA, "resend cross eviction\n");
912 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed: drop any lock we got and reset the intent state. */
914 if (lustre_handle_is_used(lockh)) {
915 ldlm_lock_decref(lockh, einfo->ei_mode);
916 memset(lockh, 0, sizeof(*lockh));
918 ptlrpc_req_finished(req);
920 it->it_lock_handle = 0;
921 it->it_lock_mode = 0;
922 it->it_request = NULL;
/*
 * Public enqueue entry point without an intent: delegates to
 * mdc_enqueue_base() with it == NULL (used for non-intent locks such as
 * flock — see the LDLM_FLOCK branch there).
 */
928 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
929 const union ldlm_policy_data *policy,
930 struct md_op_data *op_data,
931 struct lustre_handle *lockh, __u64 extra_lock_flags)
933 return mdc_enqueue_base(exp, einfo, policy, NULL,
934 op_data, lockh, extra_lock_flags);
/*
 * Translate an executed intent into the state upper layers (llite/VFS)
 * expect: surface the per-phase error via it_open_error(), take extra
 * request references for successful CREATE/OPEN (released later in
 * ll_create_node/ll_file_open), sanity-check the granted lock against the
 * fid in the reply body, and if an equivalent older lock already exists,
 * cancel the new one and reuse the old handle.
 */
937 static int mdc_finish_intent_lock(struct obd_export *exp,
938 struct ptlrpc_request *request,
939 struct md_op_data *op_data,
940 struct lookup_intent *it,
941 struct lustre_handle *lockh)
943 struct lustre_handle old_lock;
944 struct ldlm_lock *lock;
948 LASSERT(request != NULL);
949 LASSERT(request != LP_POISON);
950 LASSERT(request->rq_repmsg != LP_POISON);
/* Readdir intents need none of the open/lookup fixups below. */
952 if (it->it_op & IT_READDIR)
955 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
956 if (it->it_status != 0)
957 GOTO(out, rc = it->it_status);
959 if (!it_disposition(it, DISP_IT_EXECD)) {
960 /* The server failed before it even started executing
961 * the intent, i.e. because it couldn't unpack the
964 LASSERT(it->it_status != 0);
965 GOTO(out, rc = it->it_status);
967 rc = it_open_error(DISP_IT_EXECD, it);
971 rc = it_open_error(DISP_LOOKUP_EXECD, it);
975 /* keep requests around for the multiple phases of the call
976 * this shows the DISP_XX must guarantee we make it into the
979 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
980 it_disposition(it, DISP_OPEN_CREATE) &&
981 !it_open_error(DISP_OPEN_CREATE, it)) {
982 it_set_disposition(it, DISP_ENQ_CREATE_REF);
983 /* balanced in ll_create_node */
984 ptlrpc_request_addref(request);
986 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
987 it_disposition(it, DISP_OPEN_OPEN) &&
988 !it_open_error(DISP_OPEN_OPEN, it)) {
989 it_set_disposition(it, DISP_ENQ_OPEN_REF);
990 /* balanced in ll_file_open */
991 ptlrpc_request_addref(request);
992 /* BUG 11546 - eviction in the middle of open rpc
995 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
999 if (it->it_op & IT_CREAT) {
1000 /* XXX this belongs in ll_create_it */
1001 } else if (it->it_op == IT_OPEN) {
1002 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1004 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1008 /* If we already have a matching lock, then cancel the new
1009 * one. We have to set the data here instead of in
1010 * mdc_enqueue, because we need to use the child's inode as
1011 * the l_ast_data to match, and that's not available until
1012 * intent_finish has performed the iget().) */
1013 lock = ldlm_handle2lock(lockh);
1015 union ldlm_policy_data policy = lock->l_policy_data;
1016 LDLM_DEBUG(lock, "matching against this");
1018 if (it_has_reply_body(it)) {
1019 struct mdt_body *body;
1021 body = req_capsule_server_get(&request->rq_pill,
1023 /* mdc_enqueue checked */
1024 LASSERT(body != NULL);
/* The granted lock's resource must match the fid in the reply. */
1025 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1026 &lock->l_resource->lr_name),
1027 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1028 PLDLMRES(lock->l_resource),
1029 PFID(&body->mbo_fid1));
1031 LDLM_LOCK_PUT(lock);
1033 memcpy(&old_lock, lockh, sizeof(*lockh));
1034 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1035 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* An equivalent lock exists: cancel the new one, keep the old. */
1036 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1037 memcpy(lockh, &old_lock, sizeof(old_lock));
1038 it->it_lock_handle = lockh->cookie;
1044 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1045 (int)op_data->op_namelen, op_data->op_name,
1046 ldlm_it2str(it->it_op), it->it_status,
1047 it->it_disposition, rc);
/*
 * Check whether a usable DLM lock already covers "fid" for the given intent.
 * If the intent already records a lock handle, revalidate that handle;
 * otherwise build an inodebits policy appropriate for the intent opcode and
 * search granted locks via mdc_lock_match().  On a hit the intent's lock
 * handle/mode are filled in; on a miss they are zeroed.
 */
1051 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1052 struct lu_fid *fid, __u64 *bits)
1054 /* We could just return 1 immediately, but since we should only
1055 * be called in revalidate_it if we already have a lock, let's
1057 struct ldlm_res_id res_id;
1058 struct lustre_handle lockh;
1059 union ldlm_policy_data policy;
1060 enum ldlm_mode mode;
1063 if (it->it_lock_handle) {
1064 lockh.cookie = it->it_lock_handle;
1065 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1067 fid_build_reg_res_name(fid, &res_id);
1068 switch (it->it_op) {
1070 /* File attributes are held under multiple bits:
1071 * nlink is under lookup lock, size and times are
1072 * under UPDATE lock and recently we've also got
1073 * a separate permissions lock for owner/group/acl that
1074 * were protected by lookup lock before.
1075 * Getattr must provide all of that information,
1076 * so we need to ensure we have all of those locks.
1077 * Unfortunately, if the bits are split across multiple
1078 * locks, there's no easy way to match all of them here,
1079 * so an extra RPC would be performed to fetch all
1080 * of those bits at once for now. */
1081 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1082 * but for old MDTs (< 2.4), permission is covered
1083 * by LOOKUP lock, so it needs to match all bits here.*/
1084 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1085 MDS_INODELOCK_LOOKUP |
1089 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1092 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1095 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any mode that can satisfy the intent. */
1099 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1100 LDLM_IBITS, &policy,
1101 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1106 it->it_lock_handle = lockh.cookie;
1107 it->it_lock_mode = mode;
1109 it->it_lock_handle = 0;
1110 it->it_lock_mode = 0;
1117 * This long block is all about fixing up the lock and request state
1118 * so that it is correct as of the moment _before_ the operation was
1119 * applied; that way, the VFS will think that everything is normal and
1120 * call Lustre's regular VFS methods.
1122 * If we're performing a creation, that means that unless the creation
1123 * failed with EEXIST, we should fake up a negative dentry.
1125 * For everything else, we want to lookup to succeed.
1127 * One additional note: if CREATE or OPEN succeeded, we add an extra
1128 * reference to the request because we need to keep it around until
1129 * ll_create/ll_open gets called.
1131 * The server will return to us, in it_disposition, an indication of
1132 * exactly what it_status refers to.
1134 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1135 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1136 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1137 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1140 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * Top-level intent lock entry point: first try to satisfy LOOKUP/GETATTR/
 * READDIR intents from an existing lock via mdc_revalidate_lock(); allocate
 * a new fid for creates when the caller did not; otherwise enqueue the
 * intent via mdc_enqueue_base() and finish with mdc_finish_intent_lock().
 * On success *reqp is set to the intent's request for the caller to release.
 */
1143 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1144 struct lookup_intent *it, struct ptlrpc_request **reqp,
1145 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1147 struct ldlm_enqueue_info einfo = {
1148 .ei_type = LDLM_IBITS,
1149 .ei_mode = it_to_lock_mode(it),
1150 .ei_cb_bl = cb_blocking,
1151 .ei_cb_cp = ldlm_completion_ast,
1152 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1154 struct lustre_handle lockh;
1159 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1160 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1161 op_data->op_name, PFID(&op_data->op_fid2),
1162 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1166 if (fid_is_sane(&op_data->op_fid2) &&
1167 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1168 /* We could just return 1 immediately, but since we should only
1169 * be called in revalidate_it if we already have a lock, let's
1171 it->it_lock_handle = 0;
1172 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1173 /* Only return failure if it was not GETATTR by cfid
1174 (from inode_revalidate) */
1175 if (rc || op_data->op_namelen != 0)
1179 /* For case if upper layer did not alloc fid, do it now. */
1180 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1181 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1183 CERROR("Can't alloc new fid, rc %d\n", rc);
1188 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1193 *reqp = it->it_request;
1194 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for an async getattr intent (set up by
 * mdc_intent_getattr_async): releases the request slot, finalizes the
 * enqueue via ldlm_cli_enqueue_fini(), post-processes the reply with
 * mdc_finish_enqueue()/mdc_finish_intent_lock(), then invokes the caller's
 * completion callback minfo->mi_cb with the final rc.
 */
1198 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1199 struct ptlrpc_request *req,
1202 struct mdc_getattr_args *ga = args;
1203 struct obd_export *exp = ga->ga_exp;
1204 struct md_enqueue_info *minfo = ga->ga_minfo;
1205 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1206 struct lookup_intent *it;
1207 struct lustre_handle *lockh;
1208 struct obd_device *obddev;
1209 struct ldlm_reply *lockrep;
1210 __u64 flags = LDLM_FL_HAS_INTENT;
1214 lockh = &minfo->mi_lockh;
1216 obddev = class_exp2obd(exp);
/* Balance the slot taken in mdc_intent_getattr_async(). */
1218 obd_put_request_slot(&obddev->u.cli);
1219 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1222 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1223 &flags, NULL, 0, lockh, rc);
1225 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1226 mdc_clear_replay_flag(req, rc);
1230 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1231 LASSERT(lockrep != NULL);
/* Intent status arrives in network byte order. */
1233 lockrep->lock_policy_res2 =
1234 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1236 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1240 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Hand the final result to the caller's completion callback. */
1244 minfo->mi_cb(req, minfo, rc);
/*
 * Fire-and-forget getattr intent: pack a getattr intent request, take a
 * request slot, enqueue asynchronously (async flag = 1 to ldlm_cli_enqueue)
 * under a LOOKUP|UPDATE inodebits policy, stash the callback context in
 * rq_async_args, and queue the request on ptlrpcd.  The reply is handled by
 * mdc_intent_getattr_async_interpret().
 */
1248 int mdc_intent_getattr_async(struct obd_export *exp,
1249 struct md_enqueue_info *minfo)
1251 struct md_op_data *op_data = &minfo->mi_data;
1252 struct lookup_intent *it = &minfo->mi_it;
1253 struct ptlrpc_request *req;
1254 struct mdc_getattr_args *ga;
1255 struct obd_device *obddev = class_exp2obd(exp);
1256 struct ldlm_res_id res_id;
1257 union ldlm_policy_data policy = {
1258 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1259 MDS_INODELOCK_UPDATE } };
1261 __u64 flags = LDLM_FL_HAS_INTENT;
1264 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1265 (int)op_data->op_namelen, op_data->op_name,
1266 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1268 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1269 req = mdc_intent_getattr_pack(exp, it, op_data);
1271 RETURN(PTR_ERR(req));
/* Slot is released in the interpret callback (or below on error). */
1273 rc = obd_get_request_slot(&obddev->u.cli);
1275 ptlrpc_req_finished(req);
1279 /* With Data-on-MDT the glimpse callback is needed too.
1280 * It is set here in advance but not in mdc_finish_enqueue()
1281 * to avoid possible races. It is safe to have glimpse handler
1282 * for non-DOM locks and costs nothing.*/
1283 if (minfo->mi_einfo.ei_cb_gl == NULL)
1284 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1286 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1287 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1289 obd_put_request_slot(&obddev->u.cli);
1290 ptlrpc_req_finished(req);
/* Stash the callback context in the request's async-args area. */
1294 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1295 ga = ptlrpc_req_async_args(req);
1297 ga->ga_minfo = minfo;
1299 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1300 ptlrpcd_add_req(req);