4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Argument bundle threaded through the async getattr path: keeps the
 * export, the caller's md_enqueue_info and the ldlm_enqueue_info alive
 * until the RPC interpret callback runs. */
struct mdc_getattr_args {
        struct obd_export       *ga_exp;        /* export the enqueue was sent on */
        struct md_enqueue_info  *ga_minfo;      /* caller's completion info (holds lockh) */
        struct ldlm_enqueue_info *ga_einfo;     /* enqueue info, freed by interpret cb */
};
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_LEASE)) {
83 if (phase >= DISP_OPEN_LEASE)
84 return it->d.lustre.it_status;
88 if (it_disposition(it, DISP_OPEN_OPEN)) {
89 if (phase >= DISP_OPEN_OPEN)
90 return it->d.lustre.it_status;
95 if (it_disposition(it, DISP_OPEN_CREATE)) {
96 if (phase >= DISP_OPEN_CREATE)
97 return it->d.lustre.it_status;
102 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
103 if (phase >= DISP_LOOKUP_EXECD)
104 return it->d.lustre.it_status;
109 if (it_disposition(it, DISP_IT_EXECD)) {
110 if (phase >= DISP_IT_EXECD)
111 return it->d.lustre.it_status;
115 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
116 it->d.lustre.it_status);
120 EXPORT_SYMBOL(it_open_error);
122 /* this must be called on a lockh that is known to have a referenced lock */
123 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
126 struct ldlm_lock *lock;
127 struct inode *new_inode = data;
136 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
138 LASSERT(lock != NULL);
139 lock_res_and_lock(lock);
141 if (lock->l_resource->lr_lvb_inode &&
142 lock->l_resource->lr_lvb_inode != data) {
143 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
144 LASSERTF(old_inode->i_state & I_FREEING,
145 "Found existing inode %p/%lu/%u state %lu in lock: "
146 "setting data to %p/%lu/%u\n", old_inode,
147 old_inode->i_ino, old_inode->i_generation,
149 new_inode, new_inode->i_ino, new_inode->i_generation);
152 lock->l_resource->lr_lvb_inode = new_inode;
154 *bits = lock->l_policy_data.l_inodebits.bits;
156 unlock_res_and_lock(lock);
162 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
163 const struct lu_fid *fid, ldlm_type_t type,
164 ldlm_policy_data_t *policy, ldlm_mode_t mode,
165 struct lustre_handle *lockh)
167 struct ldlm_res_id res_id;
171 fid_build_reg_res_name(fid, &res_id);
172 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
173 &res_id, type, policy, mode, lockh, 0);
177 int mdc_cancel_unused(struct obd_export *exp,
178 const struct lu_fid *fid,
179 ldlm_policy_data_t *policy,
181 ldlm_cancel_flags_t flags,
184 struct ldlm_res_id res_id;
185 struct obd_device *obd = class_exp2obd(exp);
190 fid_build_reg_res_name(fid, &res_id);
191 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
192 policy, mode, flags, opaque);
196 int mdc_null_inode(struct obd_export *exp,
197 const struct lu_fid *fid)
199 struct ldlm_res_id res_id;
200 struct ldlm_resource *res;
201 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
204 LASSERTF(ns != NULL, "no namespace passed\n");
206 fid_build_reg_res_name(fid, &res_id);
208 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
213 res->lr_lvb_inode = NULL;
216 ldlm_resource_putref(res);
220 /* find any ldlm lock of the inode in mdc
224 int mdc_find_cbdata(struct obd_export *exp,
225 const struct lu_fid *fid,
226 ldlm_iterator_t it, void *data)
228 struct ldlm_res_id res_id;
232 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
233 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
235 if (rc == LDLM_ITER_STOP)
237 else if (rc == LDLM_ITER_CONTINUE)
242 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
244 /* Don't hold error requests for replay. */
245 if (req->rq_replay) {
246 spin_lock(&req->rq_lock);
248 spin_unlock(&req->rq_lock);
250 if (rc && req->rq_transno != 0) {
251 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
256 /* Save a large LOV EA into the request buffer so that it is available
257 * for replay. We don't do this in the initial request because the
258 * original request doesn't need this buffer (at most it sends just the
259 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
260 * buffer and may also be difficult to allocate and save a very large
261 * request buffer for each open. (bug 5707)
263 * OOM here may cause recovery failure if lmm is needed (only for the
264 * original open if the MDS crashed just when this client also OOM'd)
265 * but this is incredibly unlikely, and questionable whether the client
266 * could do MDS recovery under OOM anyways... */
267 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
268 struct mdt_body *body)
272 /* FIXME: remove this explicit offset. */
273 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
276 CERROR("Can't enlarge segment %d size to %d\n",
277 DLM_INTENT_REC_OFF + 4, body->eadatasize);
278 body->valid &= ~OBD_MD_FLEASIZE;
279 body->eadatasize = 0;
283 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
284 struct lookup_intent *it,
285 struct md_op_data *op_data,
286 void *lmm, int lmmsize,
289 struct ptlrpc_request *req;
290 struct obd_device *obddev = class_exp2obd(exp);
291 struct ldlm_intent *lit;
292 CFS_LIST_HEAD(cancels);
298 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
300 /* XXX: openlock is not cancelled for cross-refs. */
301 /* If inode is known, cancel conflicting OPEN locks. */
302 if (fid_is_sane(&op_data->op_fid2)) {
303 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
304 if (it->it_flags & FMODE_WRITE)
309 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
312 else if (it->it_flags & FMODE_EXEC)
318 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
323 /* If CREATE, cancel parent's UPDATE lock. */
324 if (it->it_op & IT_CREAT)
328 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
330 MDS_INODELOCK_UPDATE);
332 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
333 &RQF_LDLM_INTENT_OPEN);
335 ldlm_lock_list_put(&cancels, l_bl_ast, count);
336 RETURN(ERR_PTR(-ENOMEM));
339 /* parent capability */
340 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
341 /* child capability, reserve the size according to parent capa, it will
342 * be filled after we get the reply */
343 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
345 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
346 op_data->op_namelen + 1);
347 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
348 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
350 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
352 ptlrpc_request_free(req);
356 spin_lock(&req->rq_lock);
357 req->rq_replay = req->rq_import->imp_replayable;
358 spin_unlock(&req->rq_lock);
360 /* pack the intent */
361 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
362 lit->opc = (__u64)it->it_op;
364 /* pack the intended request */
365 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
368 /* for remote client, fetch remote perm for current user */
369 if (client_is_remote(exp))
370 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
371 sizeof(struct mdt_remote_perm));
372 ptlrpc_request_set_replen(req);
376 static struct ptlrpc_request *
377 mdc_intent_getxattr_pack(struct obd_export *exp,
378 struct lookup_intent *it,
379 struct md_op_data *op_data)
381 struct ptlrpc_request *req;
382 struct ldlm_intent *lit;
383 int rc, count = 0, maxdata;
384 CFS_LIST_HEAD(cancels);
388 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
389 &RQF_LDLM_INTENT_GETXATTR);
391 RETURN(ERR_PTR(-ENOMEM));
393 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
395 if (it->it_op == IT_SETXATTR)
396 /* If we want to upgrade to LCK_PW, let's cancel LCK_PR
397 * locks now. This avoids unnecessary ASTs. */
398 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
400 MDS_INODELOCK_XATTR);
402 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
404 ptlrpc_request_free(req);
408 /* pack the intent */
409 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
410 lit->opc = IT_GETXATTR;
412 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
414 /* pack the intended request */
415 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
416 op_data->op_valid, maxdata, -1, 0);
418 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
419 RCL_SERVER, maxdata);
421 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
422 RCL_SERVER, maxdata);
424 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
425 RCL_SERVER, maxdata);
427 ptlrpc_request_set_replen(req);
432 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
433 struct lookup_intent *it,
434 struct md_op_data *op_data)
436 struct ptlrpc_request *req;
437 struct obd_device *obddev = class_exp2obd(exp);
438 struct ldlm_intent *lit;
442 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443 &RQF_LDLM_INTENT_UNLINK);
445 RETURN(ERR_PTR(-ENOMEM));
447 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
448 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
449 op_data->op_namelen + 1);
451 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
453 ptlrpc_request_free(req);
457 /* pack the intent */
458 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
459 lit->opc = (__u64)it->it_op;
461 /* pack the intended request */
462 mdc_unlink_pack(req, op_data);
464 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
465 obddev->u.cli.cl_max_mds_easize);
466 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
467 obddev->u.cli.cl_max_mds_cookiesize);
468 ptlrpc_request_set_replen(req);
472 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
473 struct lookup_intent *it,
474 struct md_op_data *op_data)
476 struct ptlrpc_request *req;
477 struct obd_device *obddev = class_exp2obd(exp);
478 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
479 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
480 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
481 (client_is_remote(exp) ?
482 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
483 struct ldlm_intent *lit;
487 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
488 &RQF_LDLM_INTENT_GETATTR);
490 RETURN(ERR_PTR(-ENOMEM));
492 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
493 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
494 op_data->op_namelen + 1);
496 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
498 ptlrpc_request_free(req);
502 /* pack the intent */
503 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
504 lit->opc = (__u64)it->it_op;
506 /* pack the intended request */
507 mdc_getattr_pack(req, valid, it->it_flags, op_data,
508 obddev->u.cli.cl_max_mds_easize);
510 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
511 obddev->u.cli.cl_max_mds_easize);
512 if (client_is_remote(exp))
513 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
514 sizeof(struct mdt_remote_perm));
515 ptlrpc_request_set_replen(req);
519 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
520 struct lookup_intent *it,
521 struct md_op_data *unused)
523 struct obd_device *obd = class_exp2obd(exp);
524 struct ptlrpc_request *req;
525 struct ldlm_intent *lit;
526 struct layout_intent *layout;
530 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
531 &RQF_LDLM_INTENT_LAYOUT);
533 RETURN(ERR_PTR(-ENOMEM));
535 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
536 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
538 ptlrpc_request_free(req);
542 /* pack the intent */
543 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
544 lit->opc = (__u64)it->it_op;
546 /* pack the layout intent request */
547 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
548 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
549 * set for replication */
550 layout->li_opc = LAYOUT_INTENT_ACCESS;
552 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
553 obd->u.cli.cl_max_mds_easize);
554 ptlrpc_request_set_replen(req);
558 static struct ptlrpc_request *
559 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
561 struct ptlrpc_request *req;
565 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
567 RETURN(ERR_PTR(-ENOMEM));
569 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
571 ptlrpc_request_free(req);
575 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
576 ptlrpc_request_set_replen(req);
580 static int mdc_finish_enqueue(struct obd_export *exp,
581 struct ptlrpc_request *req,
582 struct ldlm_enqueue_info *einfo,
583 struct lookup_intent *it,
584 struct lustre_handle *lockh,
587 struct req_capsule *pill = &req->rq_pill;
588 struct ldlm_request *lockreq;
589 struct ldlm_reply *lockrep;
590 struct lustre_intent_data *intent = &it->d.lustre;
591 struct ldlm_lock *lock;
592 void *lvb_data = NULL;
597 /* Similarly, if we're going to replay this request, we don't want to
598 * actually get a lock, just perform the intent. */
599 if (req->rq_transno || req->rq_replay) {
600 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
601 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
604 if (rc == ELDLM_LOCK_ABORTED) {
606 memset(lockh, 0, sizeof(*lockh));
608 } else { /* rc = 0 */
609 lock = ldlm_handle2lock(lockh);
610 LASSERT(lock != NULL);
612 /* If the server gave us back a different lock mode, we should
613 * fix up our variables. */
614 if (lock->l_req_mode != einfo->ei_mode) {
615 ldlm_lock_addref(lockh, lock->l_req_mode);
616 ldlm_lock_decref(lockh, einfo->ei_mode);
617 einfo->ei_mode = lock->l_req_mode;
622 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
623 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
625 intent->it_disposition = (int)lockrep->lock_policy_res1;
626 intent->it_status = (int)lockrep->lock_policy_res2;
627 intent->it_lock_mode = einfo->ei_mode;
628 intent->it_lock_handle = lockh->cookie;
629 intent->it_data = req;
631 /* Technically speaking rq_transno must already be zero if
632 * it_status is in error, so the check is a bit redundant */
633 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
634 mdc_clear_replay_flag(req, intent->it_status);
636 /* If we're doing an IT_OPEN which did not result in an actual
637 * successful open, then we need to remove the bit which saves
638 * this request for unconditional replay.
640 * It's important that we do this first! Otherwise we might exit the
641 * function without doing so, and try to replay a failed create
643 if (it->it_op & IT_OPEN && req->rq_replay &&
644 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
645 mdc_clear_replay_flag(req, intent->it_status);
647 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
648 it->it_op, intent->it_disposition, intent->it_status);
650 /* We know what to expect, so we do any byte flipping required here */
651 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
652 struct mdt_body *body;
654 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
656 CERROR ("Can't swab mdt_body\n");
660 if (it_disposition(it, DISP_OPEN_OPEN) &&
661 !it_open_error(DISP_OPEN_OPEN, it)) {
663 * If this is a successful OPEN request, we need to set
664 * replay handler and data early, so that if replay
665 * happens immediately after swabbing below, new reply
666 * is swabbed by that handler correctly.
668 mdc_set_open_replay_data(NULL, NULL, req);
671 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
674 mdc_update_max_ea_from_body(exp, body);
677 * The eadata is opaque; just check that it is there.
678 * Eventually, obd_unpackmd() will check the contents.
680 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
685 /* save lvb data and length in case this is for layout
688 lvb_len = body->eadatasize;
691 * We save the reply LOV EA in case we have to replay a
692 * create for recovery. If we didn't allocate a large
693 * enough request buffer above we need to reallocate it
694 * here to hold the actual LOV EA.
696 * To not save LOV EA if request is not going to replay
697 * (for example error one).
699 if ((it->it_op & IT_OPEN) && req->rq_replay) {
701 if (req_capsule_get_size(pill, &RMF_EADATA,
704 mdc_realloc_openmsg(req, body);
706 req_capsule_shrink(pill, &RMF_EADATA,
710 req_capsule_set_size(pill, &RMF_EADATA,
714 lmm = req_capsule_client_get(pill, &RMF_EADATA);
716 memcpy(lmm, eadata, body->eadatasize);
720 if (body->valid & OBD_MD_FLRMTPERM) {
721 struct mdt_remote_perm *perm;
723 LASSERT(client_is_remote(exp));
724 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
725 lustre_swab_mdt_remote_perm);
729 if (body->valid & OBD_MD_FLMDSCAPA) {
730 struct lustre_capa *capa, *p;
732 capa = req_capsule_server_get(pill, &RMF_CAPA1);
736 if (it->it_op & IT_OPEN) {
737 /* client fid capa will be checked in replay */
738 p = req_capsule_client_get(pill, &RMF_CAPA2);
743 if (body->valid & OBD_MD_FLOSSCAPA) {
744 struct lustre_capa *capa;
746 capa = req_capsule_server_get(pill, &RMF_CAPA2);
750 } else if (it->it_op & IT_LAYOUT) {
751 /* maybe the lock was granted right away and layout
752 * is packed into RMF_DLM_LVB of req */
753 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
755 lvb_data = req_capsule_server_sized_get(pill,
756 &RMF_DLM_LVB, lvb_len);
757 if (lvb_data == NULL)
762 /* fill in stripe data for layout lock */
763 lock = ldlm_handle2lock(lockh);
764 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
767 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
768 ldlm_it2str(it->it_op), lvb_len);
770 OBD_ALLOC_LARGE(lmm, lvb_len);
775 memcpy(lmm, lvb_data, lvb_len);
777 /* install lvb_data */
778 lock_res_and_lock(lock);
779 if (lock->l_lvb_data == NULL) {
780 lock->l_lvb_data = lmm;
781 lock->l_lvb_len = lvb_len;
784 unlock_res_and_lock(lock);
786 OBD_FREE_LARGE(lmm, lvb_len);
794 /* We always reserve enough space in the reply packet for a stripe MD, because
795 * we don't know in advance the file type. */
796 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
797 struct lookup_intent *it, struct md_op_data *op_data,
798 struct lustre_handle *lockh, void *lmm, int lmmsize,
799 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
801 struct obd_device *obddev = class_exp2obd(exp);
802 struct ptlrpc_request *req = NULL;
803 __u64 flags, saved_flags = extra_lock_flags;
805 struct ldlm_res_id res_id;
806 static const ldlm_policy_data_t lookup_policy =
807 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
808 static const ldlm_policy_data_t update_policy =
809 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
810 static const ldlm_policy_data_t layout_policy =
811 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
812 static const ldlm_policy_data_t getxattr_policy = {
813 .l_inodebits = { MDS_INODELOCK_XATTR } };
814 ldlm_policy_data_t const *policy = &lookup_policy;
815 int generation, resends = 0;
816 struct ldlm_reply *lockrep;
817 enum lvb_type lvb_type = 0;
820 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
823 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
826 saved_flags |= LDLM_FL_HAS_INTENT;
827 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
828 policy = &update_policy;
829 else if (it->it_op & IT_LAYOUT)
830 policy = &layout_policy;
831 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
832 policy = &getxattr_policy;
835 LASSERT(reqp == NULL);
837 generation = obddev->u.cli.cl_import->imp_generation;
841 /* The only way right now is FLOCK, in this case we hide flock
842 policy as lmm, but lmmsize is 0 */
843 LASSERT(lmm && lmmsize == 0);
844 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
846 policy = (ldlm_policy_data_t *)lmm;
847 res_id.name[3] = LDLM_FLOCK;
848 } else if (it->it_op & IT_OPEN) {
849 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
851 policy = &update_policy;
852 einfo->ei_cbdata = NULL;
854 } else if (it->it_op & IT_UNLINK) {
855 req = mdc_intent_unlink_pack(exp, it, op_data);
856 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
857 req = mdc_intent_getattr_pack(exp, it, op_data);
858 } else if (it->it_op & IT_READDIR) {
859 req = mdc_enqueue_pack(exp, 0);
860 } else if (it->it_op & IT_LAYOUT) {
861 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
863 req = mdc_intent_layout_pack(exp, it, op_data);
864 lvb_type = LVB_T_LAYOUT;
865 } else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) {
866 req = mdc_intent_getxattr_pack(exp, it, op_data);
873 RETURN(PTR_ERR(req));
875 if (req != NULL && it && it->it_op & IT_CREAT)
876 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
878 req->rq_no_retry_einprogress = 1;
881 req->rq_generation_set = 1;
882 req->rq_import_generation = generation;
883 req->rq_sent = cfs_time_current_sec() + resends;
886 /* It is important to obtain rpc_lock first (if applicable), so that
887 * threads that are serialised with rpc_lock are not polluting our
888 * rpcs in flight counter. We do not do flock request limiting, though*/
890 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
891 rc = mdc_enter_request(&obddev->u.cli);
893 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
894 mdc_clear_replay_flag(req, 0);
895 ptlrpc_req_finished(req);
900 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
901 0, lvb_type, lockh, 0);
903 /* For flock requests we immediatelly return without further
904 delay and let caller deal with the rest, since rest of
905 this function metadata processing makes no sense for flock
906 requests anyway. But in case of problem during comms with
907 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
908 can not rely on caller and this mainly for F_UNLCKs
909 (explicits or automatically generated by Kernel to clean
910 current FLocks upon exit) that can't be trashed */
911 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
912 (einfo->ei_type == LDLM_FLOCK) &&
913 (einfo->ei_mode == LCK_NL))
918 mdc_exit_request(&obddev->u.cli);
919 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
922 CERROR("ldlm_cli_enqueue: %d\n", rc);
923 mdc_clear_replay_flag(req, rc);
924 ptlrpc_req_finished(req);
928 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
929 LASSERT(lockrep != NULL);
931 lockrep->lock_policy_res2 =
932 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
934 /* Retry the create infinitely when we get -EINPROGRESS from
935 * server. This is required by the new quota design. */
936 if (it && it->it_op & IT_CREAT &&
937 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
938 mdc_clear_replay_flag(req, rc);
939 ptlrpc_req_finished(req);
942 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
943 obddev->obd_name, resends, it->it_op,
944 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
946 if (generation == obddev->u.cli.cl_import->imp_generation) {
949 CDEBUG(D_HA, "resend cross eviction\n");
954 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
956 if (lustre_handle_is_used(lockh)) {
957 ldlm_lock_decref(lockh, einfo->ei_mode);
958 memset(lockh, 0, sizeof(*lockh));
960 ptlrpc_req_finished(req);
965 static int mdc_finish_intent_lock(struct obd_export *exp,
966 struct ptlrpc_request *request,
967 struct md_op_data *op_data,
968 struct lookup_intent *it,
969 struct lustre_handle *lockh)
971 struct lustre_handle old_lock;
972 struct mdt_body *mdt_body;
973 struct ldlm_lock *lock;
977 LASSERT(request != NULL);
978 LASSERT(request != LP_POISON);
979 LASSERT(request->rq_repmsg != LP_POISON);
981 if (!it_disposition(it, DISP_IT_EXECD)) {
982 /* The server failed before it even started executing the
983 * intent, i.e. because it couldn't unpack the request. */
984 LASSERT(it->d.lustre.it_status != 0);
985 RETURN(it->d.lustre.it_status);
987 rc = it_open_error(DISP_IT_EXECD, it);
991 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
992 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
994 /* If we were revalidating a fid/name pair, mark the intent in
995 * case we fail and get called again from lookup */
996 if (fid_is_sane(&op_data->op_fid2) &&
997 it->it_create_mode & M_CHECK_STALE &&
998 it->it_op != IT_GETATTR) {
999 it_set_disposition(it, DISP_ENQ_COMPLETE);
1001 /* Also: did we find the same inode? */
1002 /* sever can return one of two fids:
1003 * op_fid2 - new allocated fid - if file is created.
1004 * op_fid3 - existent fid - if file only open.
1005 * op_fid3 is saved in lmv_intent_open */
1006 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1007 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1008 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1009 "\n", PFID(&op_data->op_fid2),
1010 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1015 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1019 /* keep requests around for the multiple phases of the call
1020 * this shows the DISP_XX must guarantee we make it into the call
1022 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1023 it_disposition(it, DISP_OPEN_CREATE) &&
1024 !it_open_error(DISP_OPEN_CREATE, it)) {
1025 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1026 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1028 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1029 it_disposition(it, DISP_OPEN_OPEN) &&
1030 !it_open_error(DISP_OPEN_OPEN, it)) {
1031 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1032 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1033 /* BUG 11546 - eviction in the middle of open rpc processing */
1034 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1037 if (it->it_op & IT_CREAT) {
1038 /* XXX this belongs in ll_create_it */
1039 } else if (it->it_op == IT_OPEN) {
1040 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1042 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1045 /* If we already have a matching lock, then cancel the new
1046 * one. We have to set the data here instead of in
1047 * mdc_enqueue, because we need to use the child's inode as
1048 * the l_ast_data to match, and that's not available until
1049 * intent_finish has performed the iget().) */
1050 lock = ldlm_handle2lock(lockh);
1052 ldlm_policy_data_t policy = lock->l_policy_data;
1053 LDLM_DEBUG(lock, "matching against this");
1055 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1056 &lock->l_resource->lr_name),
1057 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1058 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1059 LDLM_LOCK_PUT(lock);
1061 memcpy(&old_lock, lockh, sizeof(*lockh));
1062 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1063 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1064 ldlm_lock_decref_and_cancel(lockh,
1065 it->d.lustre.it_lock_mode);
1066 memcpy(lockh, &old_lock, sizeof(old_lock));
1067 it->d.lustre.it_lock_handle = lockh->cookie;
1070 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1071 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1072 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1076 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1077 struct lu_fid *fid, __u64 *bits)
1079 /* We could just return 1 immediately, but since we should only
1080 * be called in revalidate_it if we already have a lock, let's
1082 struct ldlm_res_id res_id;
1083 struct lustre_handle lockh;
1084 ldlm_policy_data_t policy;
1088 if (it->d.lustre.it_lock_handle) {
1089 lockh.cookie = it->d.lustre.it_lock_handle;
1090 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1092 fid_build_reg_res_name(fid, &res_id);
1093 switch (it->it_op) {
1095 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1098 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1101 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1104 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1105 LDLM_FL_BLOCK_GRANTED, &res_id,
1106 LDLM_IBITS, &policy,
1107 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1111 it->d.lustre.it_lock_handle = lockh.cookie;
1112 it->d.lustre.it_lock_mode = mode;
1114 it->d.lustre.it_lock_handle = 0;
1115 it->d.lustre.it_lock_mode = 0;
1122 * This long block is all about fixing up the lock and request state
1123 * so that it is correct as of the moment _before_ the operation was
1124 * applied; that way, the VFS will think that everything is normal and
1125 * call Lustre's regular VFS methods.
1127 * If we're performing a creation, that means that unless the creation
1128 * failed with EEXIST, we should fake up a negative dentry.
1130 * For everything else, we want to lookup to succeed.
1132 * One additional note: if CREATE or OPEN succeeded, we add an extra
1133 * reference to the request because we need to keep it around until
1134 * ll_create/ll_open gets called.
1136 * The server will return to us, in it_disposition, an indication of
1137 * exactly what d.lustre.it_status refers to.
1139 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1140 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1141 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1142 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1145 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1148 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1149 void *lmm, int lmmsize, struct lookup_intent *it,
1150 int lookup_flags, struct ptlrpc_request **reqp,
1151 ldlm_blocking_callback cb_blocking,
1152 __u64 extra_lock_flags)
1154 struct lustre_handle lockh;
/* NOTE(review): several original lines appear elided in this view (other
 * local declarations, error branches, RETURN paths).  The comments below
 * describe only the code that is visible here. */
1159 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1160 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1161 op_data->op_name, PFID(&op_data->op_fid2),
1162 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidate path: when the child fid (op_fid2) is already known and the
 * intent is LOOKUP or GETATTR, try to match a cached IBITS lock via
 * mdc_revalidate_lock() instead of sending a fresh enqueue to the MDS. */
1166 if (fid_is_sane(&op_data->op_fid2) &&
1167 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1168 /* We could just return 1 immediately, but since we should only
1169 * be called in revalidate_it if we already have a lock, let's
/* Clear any stale handle before the revalidate attempt fills it in. */
1171 it->d.lustre.it_lock_handle = 0;
1172 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1173 /* Only return failure if it was not GETATTR by cfid
1174 (from inode_revalidate) */
1175 if (rc || op_data->op_namelen != 0)
1179 /* lookup_it may be called only after revalidate_it has run, because
1180 * revalidate_it cannot return errors, only zero. Returning zero causes
1181 * this call to lookup, which *can* return an error.
1183 * We only want to execute the request associated with the intent one
1184 * time, however, so don't send the request again. Instead, skip past
1185 * this and use the request from revalidate. In this case, revalidate
1186 * never dropped its reference, so the refcounts are all OK */
1187 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
/* No completed enqueue saved on this intent: build a fresh IBITS
 * enqueue with the caller's blocking callback. */
1188 struct ldlm_enqueue_info einfo = {
1189 .ei_type = LDLM_IBITS,
1190 .ei_mode = it_to_lock_mode(it),
1191 .ei_cb_bl = cb_blocking,
1192 .ei_cb_cp = ldlm_completion_ast,
1195 /* For case if upper layer did not alloc fid, do it now. */
1196 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1197 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1199 CERROR("Can't alloc new fid, rc %d\n", rc);
/* Send the intent enqueue RPC; on success lockh holds the granted
 * lock handle (presumably consumed below — error path elided here). */
1203 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1204 lmm, lmmsize, NULL, extra_lock_flags);
1207 } else if (!fid_is_sane(&op_data->op_fid2) ||
1208 !(it->it_create_mode & M_CHECK_STALE)) {
1209 /* DISP_ENQ_COMPLETE set means there is extra reference on
1210 * request referenced from this intent, saved for subsequent
1211 * lookup. This path is executed when we proceed to this
1212 * lookup, so we clear DISP_ENQ_COMPLETE */
1213 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the intent's saved request back to the caller, then finish
 * attaching the lock/intent state for this operation. */
1215 *reqp = it->d.lustre.it_data;
1216 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply-interpret callback for an asynchronous getattr intent enqueue
 * (see mdc_intent_getattr_async(), which installs this on the request).
 * Finishes the client-side LDLM enqueue from the reply, converts the
 * intent status, and invokes the caller's mi_cb completion callback.
 * Frees the einfo saved in the request's async args.
 *
 * NOTE(review): the `args`/`rc` parameters and some error-path lines are
 * elided from this view — comments describe only the visible code.
 */
1220 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1221 struct ptlrpc_request *req,
1224 struct mdc_getattr_args *ga = args;
1225 struct obd_export *exp = ga->ga_exp;
1226 struct md_enqueue_info *minfo = ga->ga_minfo;
1227 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1228 struct lookup_intent *it;
1229 struct lustre_handle *lockh;
1230 struct obd_device *obddev;
1231 struct ldlm_reply *lockrep;
/* This was an intent enqueue; the same flag is used on the fini call. */
1232 __u64 flags = LDLM_FL_HAS_INTENT;
1236 lockh = &minfo->mi_lockh;
1238 obddev = class_exp2obd(exp);
/* Release the in-flight request slot taken by mdc_enter_request()
 * before the enqueue was issued. */
1240 mdc_exit_request(&obddev->u.cli);
1241 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Complete the enqueue now that the reply has arrived; lockh is filled
 * in from the reply on success. */
1244 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1245 &flags, NULL, 0, lockh, rc);
1247 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
/* On failure make sure the request is not replayed with stale state. */
1248 mdc_clear_replay_flag(req, rc);
1252 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1253 LASSERT(lockrep != NULL);
/* The server returns the intent status in lock_policy_res2; translate
 * it from wire to host error representation. */
1255 lockrep->lock_policy_res2 =
1256 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1258 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1262 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* einfo was heap-allocated for this async enqueue; release it and
 * notify the originator (statahead) with the final result. */
1266 OBD_FREE_PTR(einfo);
1267 minfo->mi_cb(req, minfo, rc);
/*
 * Issue a getattr intent enqueue asynchronously (used by statahead).
 * Packs the intent request, enqueues it without waiting, and arranges
 * for mdc_intent_getattr_async_interpret() to run from ptlrpcd when the
 * reply arrives; that callback invokes minfo->mi_cb with the result.
 *
 * NOTE(review): some lines (error branches, ga->ga_exp assignment, the
 * final RETURN/closing brace) are elided from this view.
 */
1271 int mdc_intent_getattr_async(struct obd_export *exp,
1272 struct md_enqueue_info *minfo,
1273 struct ldlm_enqueue_info *einfo)
1275 struct md_op_data *op_data = &minfo->mi_data;
1276 struct lookup_intent *it = &minfo->mi_it;
1277 struct ptlrpc_request *req;
1278 struct mdc_getattr_args *ga;
1279 struct obd_device *obddev = class_exp2obd(exp);
1280 struct ldlm_res_id res_id;
1281 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1282 * for statahead currently. Consider CMD in future, such two bits
1283 * maybe managed by different MDS, should be adjusted then. */
1284 ldlm_policy_data_t policy = {
1285 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1286 MDS_INODELOCK_UPDATE }
1289 __u64 flags = LDLM_FL_HAS_INTENT;
1292 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1293 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1294 ldlm_it2str(it->it_op), it->it_flags);
/* The lock resource is named after the parent fid (op_fid1). */
1296 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1297 req = mdc_intent_getattr_pack(exp, it, op_data);
/* Throttle: take an in-flight request slot; released either on the
 * error paths below or by the interpret callback on completion. */
1301 rc = mdc_enter_request(&obddev->u.cli);
1303 ptlrpc_req_finished(req);
/* Async enqueue (last arg 1): returns without waiting for the reply. */
1307 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1308 0, LVB_T_NONE, &minfo->mi_lockh, 1);
/* Enqueue failed to start: give back the slot and drop the request. */
1310 mdc_exit_request(&obddev->u.cli);
1311 ptlrpc_req_finished(req);
/* Stash the completion context in the request's async-args area for
 * the interpret callback (compile-time size check guards the cast). */
1315 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1316 ga = ptlrpc_req_async_args(req);
1318 ga->ga_minfo = minfo;
1319 ga->ga_einfo = einfo;
1321 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
/* Hand the request to a ptlrpcd daemon thread for async processing. */
1322 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);