4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Context carried across an asynchronous getattr-by-intent RPC; passed as
 * the interpret-callback argument (see mdc_intent_getattr_async_interpret).
 * Restored closing brace lost in extraction. */
struct mdc_getattr_args {
	struct obd_export	 *ga_exp;	/* export the RPC was sent on */
	struct md_enqueue_info	 *ga_minfo;	/* caller's enqueue info + callback */
	struct ldlm_enqueue_info *ga_einfo;	/* freed in the interpret callback */
};
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_LEASE)) {
83 if (phase >= DISP_OPEN_LEASE)
84 return it->d.lustre.it_status;
88 if (it_disposition(it, DISP_OPEN_OPEN)) {
89 if (phase >= DISP_OPEN_OPEN)
90 return it->d.lustre.it_status;
95 if (it_disposition(it, DISP_OPEN_CREATE)) {
96 if (phase >= DISP_OPEN_CREATE)
97 return it->d.lustre.it_status;
102 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
103 if (phase >= DISP_LOOKUP_EXECD)
104 return it->d.lustre.it_status;
109 if (it_disposition(it, DISP_IT_EXECD)) {
110 if (phase >= DISP_IT_EXECD)
111 return it->d.lustre.it_status;
115 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
116 it->d.lustre.it_status);
120 EXPORT_SYMBOL(it_open_error);
122 /* this must be called on a lockh that is known to have a referenced lock */
123 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
126 struct ldlm_lock *lock;
127 struct inode *new_inode = data;
136 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
138 LASSERT(lock != NULL);
139 lock_res_and_lock(lock);
141 if (lock->l_resource->lr_lvb_inode &&
142 lock->l_resource->lr_lvb_inode != data) {
143 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
144 LASSERTF(old_inode->i_state & I_FREEING,
145 "Found existing inode %p/%lu/%u state %lu in lock: "
146 "setting data to %p/%lu/%u\n", old_inode,
147 old_inode->i_ino, old_inode->i_generation,
149 new_inode, new_inode->i_ino, new_inode->i_generation);
152 lock->l_resource->lr_lvb_inode = new_inode;
154 *bits = lock->l_policy_data.l_inodebits.bits;
156 unlock_res_and_lock(lock);
162 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
163 const struct lu_fid *fid, ldlm_type_t type,
164 ldlm_policy_data_t *policy, ldlm_mode_t mode,
165 struct lustre_handle *lockh)
167 struct ldlm_res_id res_id;
171 fid_build_reg_res_name(fid, &res_id);
172 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
173 &res_id, type, policy, mode, lockh, 0);
177 int mdc_cancel_unused(struct obd_export *exp,
178 const struct lu_fid *fid,
179 ldlm_policy_data_t *policy,
181 ldlm_cancel_flags_t flags,
184 struct ldlm_res_id res_id;
185 struct obd_device *obd = class_exp2obd(exp);
190 fid_build_reg_res_name(fid, &res_id);
191 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
192 policy, mode, flags, opaque);
196 int mdc_null_inode(struct obd_export *exp,
197 const struct lu_fid *fid)
199 struct ldlm_res_id res_id;
200 struct ldlm_resource *res;
201 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
204 LASSERTF(ns != NULL, "no namespace passed\n");
206 fid_build_reg_res_name(fid, &res_id);
208 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
213 res->lr_lvb_inode = NULL;
216 ldlm_resource_putref(res);
220 /* find any ldlm lock of the inode in mdc
224 int mdc_find_cbdata(struct obd_export *exp,
225 const struct lu_fid *fid,
226 ldlm_iterator_t it, void *data)
228 struct ldlm_res_id res_id;
232 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
233 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
235 if (rc == LDLM_ITER_STOP)
237 else if (rc == LDLM_ITER_CONTINUE)
242 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
244 /* Don't hold error requests for replay. */
245 if (req->rq_replay) {
246 spin_lock(&req->rq_lock);
248 spin_unlock(&req->rq_lock);
250 if (rc && req->rq_transno != 0) {
251 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
256 /* Save a large LOV EA into the request buffer so that it is available
257 * for replay. We don't do this in the initial request because the
258 * original request doesn't need this buffer (at most it sends just the
259 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
260 * buffer and may also be difficult to allocate and save a very large
261 * request buffer for each open. (bug 5707)
263 * OOM here may cause recovery failure if lmm is needed (only for the
264 * original open if the MDS crashed just when this client also OOM'd)
265 * but this is incredibly unlikely, and questionable whether the client
266 * could do MDS recovery under OOM anyways... */
267 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
268 struct mdt_body *body)
272 /* FIXME: remove this explicit offset. */
273 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
276 CERROR("Can't enlarge segment %d size to %d\n",
277 DLM_INTENT_REC_OFF + 4, body->eadatasize);
278 body->valid &= ~OBD_MD_FLEASIZE;
279 body->eadatasize = 0;
283 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
284 struct lookup_intent *it,
285 struct md_op_data *op_data,
286 void *lmm, int lmmsize,
289 struct ptlrpc_request *req;
290 struct obd_device *obddev = class_exp2obd(exp);
291 struct ldlm_intent *lit;
292 CFS_LIST_HEAD(cancels);
298 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
300 /* XXX: openlock is not cancelled for cross-refs. */
301 /* If inode is known, cancel conflicting OPEN locks. */
302 if (fid_is_sane(&op_data->op_fid2)) {
303 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
304 if (it->it_flags & FMODE_WRITE)
309 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
312 else if (it->it_flags & FMODE_EXEC)
318 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
323 /* If CREATE, cancel parent's UPDATE lock. */
324 if (it->it_op & IT_CREAT)
328 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
330 MDS_INODELOCK_UPDATE);
332 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
333 &RQF_LDLM_INTENT_OPEN);
335 ldlm_lock_list_put(&cancels, l_bl_ast, count);
336 RETURN(ERR_PTR(-ENOMEM));
339 /* parent capability */
340 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
341 /* child capability, reserve the size according to parent capa, it will
342 * be filled after we get the reply */
343 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
345 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
346 op_data->op_namelen + 1);
347 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
348 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
350 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
352 ptlrpc_request_free(req);
356 spin_lock(&req->rq_lock);
357 req->rq_replay = req->rq_import->imp_replayable;
358 spin_unlock(&req->rq_lock);
360 /* pack the intent */
361 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
362 lit->opc = (__u64)it->it_op;
364 /* pack the intended request */
365 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
368 /* for remote client, fetch remote perm for current user */
369 if (client_is_remote(exp))
370 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
371 sizeof(struct mdt_remote_perm));
372 ptlrpc_request_set_replen(req);
376 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
377 struct lookup_intent *it,
378 struct md_op_data *op_data)
380 struct ptlrpc_request *req;
381 struct obd_device *obddev = class_exp2obd(exp);
382 struct ldlm_intent *lit;
386 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
387 &RQF_LDLM_INTENT_UNLINK);
389 RETURN(ERR_PTR(-ENOMEM));
391 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
392 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
393 op_data->op_namelen + 1);
395 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
397 ptlrpc_request_free(req);
401 /* pack the intent */
402 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403 lit->opc = (__u64)it->it_op;
405 /* pack the intended request */
406 mdc_unlink_pack(req, op_data);
408 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
409 obddev->u.cli.cl_max_mds_easize);
410 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
411 obddev->u.cli.cl_max_mds_cookiesize);
412 ptlrpc_request_set_replen(req);
416 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
417 struct lookup_intent *it,
418 struct md_op_data *op_data)
420 struct ptlrpc_request *req;
421 struct obd_device *obddev = class_exp2obd(exp);
422 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
423 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
424 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
425 (client_is_remote(exp) ?
426 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
427 struct ldlm_intent *lit;
431 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
432 &RQF_LDLM_INTENT_GETATTR);
434 RETURN(ERR_PTR(-ENOMEM));
436 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
437 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
438 op_data->op_namelen + 1);
440 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
442 ptlrpc_request_free(req);
446 /* pack the intent */
447 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
448 lit->opc = (__u64)it->it_op;
450 /* pack the intended request */
451 mdc_getattr_pack(req, valid, it->it_flags, op_data,
452 obddev->u.cli.cl_max_mds_easize);
454 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
455 obddev->u.cli.cl_max_mds_easize);
456 if (client_is_remote(exp))
457 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
458 sizeof(struct mdt_remote_perm));
459 ptlrpc_request_set_replen(req);
463 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
464 struct lookup_intent *it,
465 struct md_op_data *unused)
467 struct obd_device *obd = class_exp2obd(exp);
468 struct ptlrpc_request *req;
469 struct ldlm_intent *lit;
470 struct layout_intent *layout;
474 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
475 &RQF_LDLM_INTENT_LAYOUT);
477 RETURN(ERR_PTR(-ENOMEM));
479 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
480 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
482 ptlrpc_request_free(req);
486 /* pack the intent */
487 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
488 lit->opc = (__u64)it->it_op;
490 /* pack the layout intent request */
491 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
492 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
493 * set for replication */
494 layout->li_opc = LAYOUT_INTENT_ACCESS;
496 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
497 obd->u.cli.cl_max_mds_easize);
498 ptlrpc_request_set_replen(req);
502 static struct ptlrpc_request *
503 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
505 struct ptlrpc_request *req;
509 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
511 RETURN(ERR_PTR(-ENOMEM));
513 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
515 ptlrpc_request_free(req);
519 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
520 ptlrpc_request_set_replen(req);
/* Post-process an intent enqueue reply: fix up lock state, copy the
 * disposition/status into the intent, swab and stash reply bodies, save
 * the LOV EA for open replay, and install layout LVB data on the lock.
 *
 * NOTE(review): this chunk appears to have lost interior lines to the
 * extraction tool (several closing braces, declarations and gotos are
 * missing from view); code left byte-identical, comments only added. */
524 static int mdc_finish_enqueue(struct obd_export *exp,
525 struct ptlrpc_request *req,
526 struct ldlm_enqueue_info *einfo,
527 struct lookup_intent *it,
528 struct lustre_handle *lockh,
531 struct req_capsule *pill = &req->rq_pill;
532 struct ldlm_request *lockreq;
533 struct ldlm_reply *lockrep;
534 struct lustre_intent_data *intent = &it->d.lustre;
535 struct ldlm_lock *lock;
536 void *lvb_data = NULL;
541 /* Similarly, if we're going to replay this request, we don't want to
542 * actually get a lock, just perform the intent. */
543 if (req->rq_transno || req->rq_replay) {
544 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
545 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Server aborted the lock but still executed the intent: no lock handle. */
548 if (rc == ELDLM_LOCK_ABORTED) {
550 memset(lockh, 0, sizeof(*lockh));
552 } else { /* rc = 0 */
553 lock = ldlm_handle2lock(lockh);
554 LASSERT(lock != NULL);
556 /* If the server gave us back a different lock mode, we should
557 * fix up our variables. */
558 if (lock->l_req_mode != einfo->ei_mode) {
559 ldlm_lock_addref(lockh, lock->l_req_mode);
560 ldlm_lock_decref(lockh, einfo->ei_mode);
561 einfo->ei_mode = lock->l_req_mode;
566 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
567 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Propagate the server's verdict into the intent for upper layers. */
569 intent->it_disposition = (int)lockrep->lock_policy_res1;
570 intent->it_status = (int)lockrep->lock_policy_res2;
571 intent->it_lock_mode = einfo->ei_mode;
572 intent->it_lock_handle = lockh->cookie;
573 intent->it_data = req;
575 /* Technically speaking rq_transno must already be zero if
576 * it_status is in error, so the check is a bit redundant */
577 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
578 mdc_clear_replay_flag(req, intent->it_status);
580 /* If we're doing an IT_OPEN which did not result in an actual
581 * successful open, then we need to remove the bit which saves
582 * this request for unconditional replay.
584 * It's important that we do this first! Otherwise we might exit the
585 * function without doing so, and try to replay a failed create
587 if (it->it_op & IT_OPEN && req->rq_replay &&
588 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
589 mdc_clear_replay_flag(req, intent->it_status)
591 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
592 it->it_op, intent->it_disposition, intent->it_status);
594 /* We know what to expect, so we do any byte flipping required here */
595 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
596 struct mdt_body *body;
598 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
600 CERROR ("Can't swab mdt_body\n");
604 if (it_disposition(it, DISP_OPEN_OPEN) &&
605 !it_open_error(DISP_OPEN_OPEN, it)) {
607 * If this is a successful OPEN request, we need to set
608 * replay handler and data early, so that if replay
609 * happens immediately after swabbing below, new reply
610 * is swabbed by that handler correctly.
612 mdc_set_open_replay_data(NULL, NULL, req);
615 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
618 mdc_update_max_ea_from_body(exp, body);
621 * The eadata is opaque; just check that it is there.
622 * Eventually, obd_unpackmd() will check the contents.
624 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
629 /* save lvb data and length in case this is for layout
632 lvb_len = body->eadatasize;
635 * We save the reply LOV EA in case we have to replay a
636 * create for recovery. If we didn't allocate a large
637 * enough request buffer above we need to reallocate it
638 * here to hold the actual LOV EA.
640 * To not save LOV EA if request is not going to replay
641 * (for example error one).
643 if ((it->it_op & IT_OPEN) && req->rq_replay) {
645 if (req_capsule_get_size(pill, &RMF_EADATA,
648 mdc_realloc_openmsg(req, body);
650 req_capsule_shrink(pill, &RMF_EADATA,
654 req_capsule_set_size(pill, &RMF_EADATA,
658 lmm = req_capsule_client_get(pill, &RMF_EADATA);
660 memcpy(lmm, eadata, body->eadatasize);
/* Remote client: swab the remote permission entry in the reply. */
664 if (body->valid & OBD_MD_FLRMTPERM) {
665 struct mdt_remote_perm *perm;
667 LASSERT(client_is_remote(exp));
668 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
669 lustre_swab_mdt_remote_perm);
673 if (body->valid & OBD_MD_FLMDSCAPA) {
674 struct lustre_capa *capa, *p;
676 capa = req_capsule_server_get(pill, &RMF_CAPA1);
680 if (it->it_op & IT_OPEN) {
681 /* client fid capa will be checked in replay */
682 p = req_capsule_client_get(pill, &RMF_CAPA2);
687 if (body->valid & OBD_MD_FLOSSCAPA) {
688 struct lustre_capa *capa;
690 capa = req_capsule_server_get(pill, &RMF_CAPA2);
694 } else if (it->it_op & IT_LAYOUT) {
695 /* maybe the lock was granted right away and layout
696 * is packed into RMF_DLM_LVB of req */
697 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
699 lvb_data = req_capsule_server_sized_get(pill,
700 &RMF_DLM_LVB, lvb_len);
701 if (lvb_data == NULL)
706 /* fill in stripe data for layout lock */
707 lock = ldlm_handle2lock(lockh);
708 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
711 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
712 ldlm_it2str(it->it_op), lvb_len);
714 OBD_ALLOC_LARGE(lmm, lvb_len);
719 memcpy(lmm, lvb_data, lvb_len);
721 /* install lvb_data */
722 lock_res_and_lock(lock);
723 if (lock->l_lvb_data == NULL) {
724 lock->l_lvb_data = lmm;
725 lock->l_lvb_len = lvb_len;
728 unlock_res_and_lock(lock);
/* Another thread raced us installing an LVB: free our copy. */
730 OBD_FREE_LARGE(lmm, lvb_len);
738 /* We always reserve enough space in the reply packet for a stripe MD, because
739 * we don't know in advance the file type. */
/* Send an intent enqueue to the MDS: choose the inodebits policy from the
 * intent op, pack the matching intent request, throttle via the rpc_lock
 * and in-flight counter, enqueue, and retry forever on -EINPROGRESS for
 * creates (new quota design).
 *
 * NOTE(review): interior lines (braces, `resend:` label area, error gotos)
 * appear to be missing from this extraction; code left byte-identical,
 * comments only added. */
740 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
741 struct lookup_intent *it, struct md_op_data *op_data,
742 struct lustre_handle *lockh, void *lmm, int lmmsize,
743 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
745 struct obd_device *obddev = class_exp2obd(exp);
746 struct ptlrpc_request *req = NULL;
747 __u64 flags, saved_flags = extra_lock_flags;
749 struct ldlm_res_id res_id;
750 static const ldlm_policy_data_t lookup_policy =
751 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
752 static const ldlm_policy_data_t update_policy =
753 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
754 static const ldlm_policy_data_t layout_policy =
755 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
756 ldlm_policy_data_t const *policy = &lookup_policy;
757 int generation, resends = 0;
758 struct ldlm_reply *lockrep;
759 enum lvb_type lvb_type = 0;
762 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
765 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent present, pick the inodebits policy matching the op. */
768 saved_flags |= LDLM_FL_HAS_INTENT;
769 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
770 policy = &update_policy;
771 else if (it->it_op & IT_LAYOUT)
772 policy = &layout_policy;
775 LASSERT(reqp == NULL);
/* Remember the import generation so resends can detect eviction. */
777 generation = obddev->u.cli.cl_import->imp_generation;
781 /* The only way right now is FLOCK, in this case we hide flock
782 policy as lmm, but lmmsize is 0 */
783 LASSERT(lmm && lmmsize == 0);
784 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
786 policy = (ldlm_policy_data_t *)lmm;
787 res_id.name[3] = LDLM_FLOCK;
788 } else if (it->it_op & IT_OPEN) {
789 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
791 policy = &update_policy;
792 einfo->ei_cbdata = NULL;
794 } else if (it->it_op & IT_UNLINK) {
795 req = mdc_intent_unlink_pack(exp, it, op_data);
796 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
797 req = mdc_intent_getattr_pack(exp, it, op_data);
798 } else if (it->it_op & IT_READDIR) {
799 req = mdc_enqueue_pack(exp, 0);
800 } else if (it->it_op & IT_LAYOUT) {
801 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
804 req = mdc_intent_layout_pack(exp, it, op_data);
805 lvb_type = LVB_T_LAYOUT;
812 RETURN(PTR_ERR(req));
814 if (req != NULL && it && it->it_op & IT_CREAT)
815 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
817 req->rq_no_retry_einprogress = 1;
/* On a resend, pin the request to the recorded import generation. */
820 req->rq_generation_set = 1;
821 req->rq_import_generation = generation;
822 req->rq_sent = cfs_time_current_sec() + resends;
825 /* It is important to obtain rpc_lock first (if applicable), so that
826 * threads that are serialised with rpc_lock are not polluting our
827 * rpcs in flight counter. We do not do flock request limiting, though*/
829 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
830 rc = mdc_enter_request(&obddev->u.cli);
832 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
833 mdc_clear_replay_flag(req, 0);
834 ptlrpc_req_finished(req);
839 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
840 0, lvb_type, lockh, 0);
842 /* For flock requests we immediatelly return without further
843 delay and let caller deal with the rest, since rest of
844 this function metadata processing makes no sense for flock
845 requests anyway. But in case of problem during comms with
846 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
847 can not rely on caller and this mainly for F_UNLCKs
848 (explicits or automatically generated by Kernel to clean
849 current FLocks upon exit) that can't be trashed */
850 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
855 mdc_exit_request(&obddev->u.cli);
856 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
859 CERROR("ldlm_cli_enqueue: %d\n", rc);
860 mdc_clear_replay_flag(req, rc);
861 ptlrpc_req_finished(req);
865 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
866 LASSERT(lockrep != NULL);
/* Convert the server's wire status to host errno space. */
868 lockrep->lock_policy_res2 =
869 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
871 /* Retry the create infinitely when we get -EINPROGRESS from
872 * server. This is required by the new quota design. */
873 if (it && it->it_op & IT_CREAT &&
874 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
875 mdc_clear_replay_flag(req, rc);
876 ptlrpc_req_finished(req);
879 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
880 obddev->obd_name, resends, it->it_op,
881 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
883 if (generation == obddev->u.cli.cl_import->imp_generation) {
886 CDEBUG(D_HA, "resend cross eviction\n");
891 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Enqueue post-processing failed: drop any lock reference we hold. */
893 if (lustre_handle_is_used(lockh)) {
894 ldlm_lock_decref(lockh, einfo->ei_mode);
895 memset(lockh, 0, sizeof(*lockh));
897 ptlrpc_req_finished(req);
/* Translate the server's intent reply into client lock/intent state:
 * validate dispositions, detect stale revalidations, take extra request
 * references for open/create, and fold duplicate locks into one handle.
 *
 * NOTE(review): interior lines (braces, some RETURNs, parts of the
 * lock-matching tail) are missing from this extraction; code left
 * byte-identical, comments only added. */
902 static int mdc_finish_intent_lock(struct obd_export *exp,
903 struct ptlrpc_request *request,
904 struct md_op_data *op_data,
905 struct lookup_intent *it,
906 struct lustre_handle *lockh)
908 struct lustre_handle old_lock;
909 struct mdt_body *mdt_body;
910 struct ldlm_lock *lock;
914 LASSERT(request != NULL);
915 LASSERT(request != LP_POISON);
916 LASSERT(request->rq_repmsg != LP_POISON);
918 if (!it_disposition(it, DISP_IT_EXECD)) {
919 /* The server failed before it even started executing the
920 * intent, i.e. because it couldn't unpack the request. */
921 LASSERT(it->d.lustre.it_status != 0);
922 RETURN(it->d.lustre.it_status);
924 rc = it_open_error(DISP_IT_EXECD, it);
928 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
929 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
931 /* If we were revalidating a fid/name pair, mark the intent in
932 * case we fail and get called again from lookup */
933 if (fid_is_sane(&op_data->op_fid2) &&
934 it->it_create_mode & M_CHECK_STALE &&
935 it->it_op != IT_GETATTR) {
936 it_set_disposition(it, DISP_ENQ_COMPLETE);
938 /* Also: did we find the same inode? */
939 /* sever can return one of two fids:
940 * op_fid2 - new allocated fid - if file is created.
941 * op_fid3 - existent fid - if file only open.
942 * op_fid3 is saved in lmv_intent_open */
943 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
944 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
945 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
946 "\n", PFID(&op_data->op_fid2),
947 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
952 rc = it_open_error(DISP_LOOKUP_EXECD, it);
956 /* keep requests around for the multiple phases of the call
957 * this shows the DISP_XX must guarantee we make it into the call
959 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
960 it_disposition(it, DISP_OPEN_CREATE) &&
961 !it_open_error(DISP_OPEN_CREATE, it)) {
962 it_set_disposition(it, DISP_ENQ_CREATE_REF);
963 ptlrpc_request_addref(request); /* balanced in ll_create_node */
965 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
966 it_disposition(it, DISP_OPEN_OPEN) &&
967 !it_open_error(DISP_OPEN_OPEN, it)) {
968 it_set_disposition(it, DISP_ENQ_OPEN_REF);
969 ptlrpc_request_addref(request); /* balanced in ll_file_open */
970 /* BUG 11546 - eviction in the middle of open rpc processing */
971 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
974 if (it->it_op & IT_CREAT) {
975 /* XXX this belongs in ll_create_it */
976 } else if (it->it_op == IT_OPEN) {
977 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
979 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
982 /* If we already have a matching lock, then cancel the new
983 * one. We have to set the data here instead of in
984 * mdc_enqueue, because we need to use the child's inode as
985 * the l_ast_data to match, and that's not available until
986 * intent_finish has performed the iget().) */
987 lock = ldlm_handle2lock(lockh)
989 ldlm_policy_data_t policy = lock->l_policy_data;
990 LDLM_DEBUG(lock, "matching against this");
/* The reply fid and the lock's resource must name the same object. */
992 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
993 &lock->l_resource->lr_name),
994 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
995 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
998 memcpy(&old_lock, lockh, sizeof(*lockh));
999 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1000 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate lock found: drop the new one, keep the old handle. */
1001 ldlm_lock_decref_and_cancel(lockh,
1002 it->d.lustre.it_lock_mode);
1003 memcpy(lockh, &old_lock, sizeof(old_lock));
1004 it->d.lustre.it_lock_handle = lockh->cookie;
1007 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1008 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1009 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1013 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1014 struct lu_fid *fid, __u64 *bits)
1016 /* We could just return 1 immediately, but since we should only
1017 * be called in revalidate_it if we already have a lock, let's
1019 struct ldlm_res_id res_id;
1020 struct lustre_handle lockh;
1021 ldlm_policy_data_t policy;
1025 if (it->d.lustre.it_lock_handle) {
1026 lockh.cookie = it->d.lustre.it_lock_handle;
1027 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1029 fid_build_reg_res_name(fid, &res_id);
1030 switch (it->it_op) {
1032 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1035 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1038 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1041 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1042 LDLM_FL_BLOCK_GRANTED, &res_id,
1043 LDLM_IBITS, &policy,
1044 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1048 it->d.lustre.it_lock_handle = lockh.cookie;
1049 it->d.lustre.it_lock_mode = mode;
1051 it->d.lustre.it_lock_handle = 0;
1052 it->d.lustre.it_lock_mode = 0;
* This long block is all about fixing up the lock and request state
1060 * so that it is correct as of the moment _before_ the operation was
1061 * applied; that way, the VFS will think that everything is normal and
1062 * call Lustre's regular VFS methods.
1064 * If we're performing a creation, that means that unless the creation
1065 * failed with EEXIST, we should fake up a negative dentry.
1067 * For everything else, we want to lookup to succeed.
1069 * One additional note: if CREATE or OPEN succeeded, we add an extra
1070 * reference to the request because we need to keep it around until
1071 * ll_create/ll_open gets called.
1073 * The server will return to us, in it_disposition, an indication of
1074 * exactly what d.lustre.it_status refers to.
1076 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1077 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1078 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1079 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1082 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* NOTE(review): the opening `/ *` of the comment above and several interior
 * lines of this function (braces, RETURNs) appear lost to extraction;
 * code left byte-identical, comments only added. */
1085 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1086 void *lmm, int lmmsize, struct lookup_intent *it,
1087 int lookup_flags, struct ptlrpc_request **reqp,
1088 ldlm_blocking_callback cb_blocking,
1089 __u64 extra_lock_flags)
1091 struct lustre_handle lockh;
1096 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1097 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1098 op_data->op_name, PFID(&op_data->op_fid2),
1099 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidate path: a sane child fid plus LOOKUP/GETATTR means we may
 * already hold a covering lock and can skip the RPC entirely. */
1103 if (fid_is_sane(&op_data->op_fid2) &&
1104 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1105 /* We could just return 1 immediately, but since we should only
1106 * be called in revalidate_it if we already have a lock, let's
1108 it->d.lustre.it_lock_handle = 0;
1109 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1110 /* Only return failure if it was not GETATTR by cfid
1111 (from inode_revalidate) */
1112 if (rc || op_data->op_namelen != 0)
1116 /* lookup_it may be called only after revalidate_it has run, because
1117 * revalidate_it cannot return errors, only zero. Returning zero causes
1118 * this call to lookup, which *can* return an error.
1120 * We only want to execute the request associated with the intent one
1121 * time, however, so don't send the request again. Instead, skip past
1122 * this and use the request from revalidate. In this case, revalidate
1123 * never dropped its reference, so the refcounts are all OK */
1124 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1125 struct ldlm_enqueue_info einfo = {
1126 .ei_type = LDLM_IBITS,
1127 .ei_mode = it_to_lock_mode(it),
1128 .ei_cb_bl = cb_blocking,
1129 .ei_cb_cp = ldlm_completion_ast,
1132 /* For case if upper layer did not alloc fid, do it now. */
1133 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1134 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1136 CERROR("Can't alloc new fid, rc %d\n", rc);
1140 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1141 lmm, lmmsize, NULL, extra_lock_flags);
1144 } else if (!fid_is_sane(&op_data->op_fid2) ||
1145 !(it->it_create_mode & M_CHECK_STALE)) {
1146 /* DISP_ENQ_COMPLETE set means there is extra reference on
1147 * request referenced from this intent, saved for subsequent
1148 * lookup. This path is executed when we proceed to this
1149 * lookup, so we clear DISP_ENQ_COMPLETE */
1150 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1152 *reqp = it->d.lustre.it_data;
1153 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1157 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1158 struct ptlrpc_request *req,
1161 struct mdc_getattr_args *ga = args;
1162 struct obd_export *exp = ga->ga_exp;
1163 struct md_enqueue_info *minfo = ga->ga_minfo;
1164 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1165 struct lookup_intent *it;
1166 struct lustre_handle *lockh;
1167 struct obd_device *obddev;
1168 struct ldlm_reply *lockrep;
1169 __u64 flags = LDLM_FL_HAS_INTENT;
1173 lockh = &minfo->mi_lockh;
1175 obddev = class_exp2obd(exp);
1177 mdc_exit_request(&obddev->u.cli);
1178 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1181 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1182 &flags, NULL, 0, lockh, rc);
1184 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1185 mdc_clear_replay_flag(req, rc);
1189 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1190 LASSERT(lockrep != NULL);
1192 lockrep->lock_policy_res2 =
1193 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1195 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1199 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1203 OBD_FREE_PTR(einfo);
1204 minfo->mi_cb(req, minfo, rc);
1208 int mdc_intent_getattr_async(struct obd_export *exp,
1209 struct md_enqueue_info *minfo,
1210 struct ldlm_enqueue_info *einfo)
1212 struct md_op_data *op_data = &minfo->mi_data;
1213 struct lookup_intent *it = &minfo->mi_it;
1214 struct ptlrpc_request *req;
1215 struct mdc_getattr_args *ga;
1216 struct obd_device *obddev = class_exp2obd(exp);
1217 struct ldlm_res_id res_id;
1218 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1219 * for statahead currently. Consider CMD in future, such two bits
1220 * maybe managed by different MDS, should be adjusted then. */
1221 ldlm_policy_data_t policy = {
1222 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1223 MDS_INODELOCK_UPDATE }
1226 __u64 flags = LDLM_FL_HAS_INTENT;
1229 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1230 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1231 ldlm_it2str(it->it_op), it->it_flags);
1233 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1234 req = mdc_intent_getattr_pack(exp, it, op_data);
1238 rc = mdc_enter_request(&obddev->u.cli);
1240 ptlrpc_req_finished(req);
1244 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1245 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1247 mdc_exit_request(&obddev->u.cli);
1248 ptlrpc_req_finished(req);
1252 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1253 ga = ptlrpc_req_async_args(req);
1255 ga->ga_minfo = minfo;
1256 ga->ga_einfo = einfo;
1258 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1259 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);