4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/*
 * Context bundle for an asynchronous getattr-by-intent RPC.  The fields
 * are read back by the interpret callback
 * (mdc_intent_getattr_async_interpret) to finish the enqueue with the
 * original export and enqueue info.
 */
56 struct mdc_getattr_args {
 57 struct obd_export *ga_exp;
 58 struct md_enqueue_info *ga_minfo;
 59 struct ldlm_enqueue_info *ga_einfo;
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
/*
 * it_open_error() - report the server status of an open-intent phase.
 *
 * The DISP_* bits are checked from the most specific phase (lease) down
 * to the least (intent executed).  For the first phase the server
 * reported as executed, it_status is returned once the caller has
 * progressed at least that far (@phase >= that DISP_* value).  If no
 * expected disposition bit is set the mask is logged via CERROR below.
 */
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_LEASE)) {
83 if (phase >= DISP_OPEN_LEASE)
84 return it->d.lustre.it_status;
88 if (it_disposition(it, DISP_OPEN_OPEN)) {
89 if (phase >= DISP_OPEN_OPEN)
90 return it->d.lustre.it_status;
95 if (it_disposition(it, DISP_OPEN_CREATE)) {
96 if (phase >= DISP_OPEN_CREATE)
97 return it->d.lustre.it_status;
102 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
103 if (phase >= DISP_LOOKUP_EXECD)
104 return it->d.lustre.it_status;
109 if (it_disposition(it, DISP_IT_EXECD)) {
110 if (phase >= DISP_IT_EXECD)
111 return it->d.lustre.it_status;
/* Unexpected disposition mask - complain loudly. */
115 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
116 it->d.lustre.it_status);
120 EXPORT_SYMBOL(it_open_error);
122 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Store the VFS inode @data in the lr_lvb_inode slot of the resource of
 * the lock named by @lockh, and report the lock's inodebits through
 * @bits.  Replacing a previously cached, different inode is only legal
 * when that inode is already being torn down (I_FREEING).
 */
123 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
126 struct ldlm_lock *lock;
127 struct inode *new_inode = data;
136 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
138 LASSERT(lock != NULL);
139 lock_res_and_lock(lock);
141 if (lock->l_resource->lr_lvb_inode &&
142 lock->l_resource->lr_lvb_inode != data) {
143 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
144 LASSERTF(old_inode->i_state & I_FREEING,
145 "Found existing inode %p/%lu/%u state %lu in lock: "
146 "setting data to %p/%lu/%u\n", old_inode,
147 old_inode->i_ino, old_inode->i_generation,
149 new_inode, new_inode->i_ino, new_inode->i_generation);
152 lock->l_resource->lr_lvb_inode = new_inode;
/* NOTE(review): an elided line presumably guards this with "if (bits)" -
 * confirm against the full source before treating @bits as optional. */
154 *bits = lock->l_policy_data.l_inodebits.bits;
156 unlock_res_and_lock(lock);
/*
 * mdc_lock_match() - look for an already-granted MDC lock on @fid.
 * Builds the fid-based resource name and defers to ldlm_lock_match();
 * on a match the lock handle is returned through @lockh and the granted
 * mode is the return value.
 */
162 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
163 const struct lu_fid *fid, ldlm_type_t type,
164 ldlm_policy_data_t *policy, ldlm_mode_t mode,
165 struct lustre_handle *lockh)
167 struct ldlm_res_id res_id;
171 fid_build_reg_res_name(fid, &res_id);
172 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
173 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks matching @policy/@mode on the resource that
 * corresponds to @fid, by delegating to
 * ldlm_cli_cancel_unused_resource().
 */
177 int mdc_cancel_unused(struct obd_export *exp,
178 const struct lu_fid *fid,
179 ldlm_policy_data_t *policy,
181 ldlm_cancel_flags_t flags,
184 struct ldlm_res_id res_id;
185 struct obd_device *obd = class_exp2obd(exp);
190 fid_build_reg_res_name(fid, &res_id);
191 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
192 policy, mode, flags, opaque);
/*
 * Detach any cached inode pointer from the LDLM resource of @fid by
 * clearing lr_lvb_inode.  Used when the inode is going away so stale
 * pointers are not left behind on the resource.
 */
196 int mdc_null_inode(struct obd_export *exp,
197 const struct lu_fid *fid)
199 struct ldlm_res_id res_id;
200 struct ldlm_resource *res;
201 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
204 LASSERTF(ns != NULL, "no namespace passed\n");
206 fid_build_reg_res_name(fid, &res_id);
208 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
213 res->lr_lvb_inode = NULL;
216 ldlm_resource_putref(res);
220 /* find any ldlm lock of the inode in mdc
/*
 * Iterate the locks on the resource of @fid with iterator @it/@data.
 * The LDLM iterator result is translated for the caller:
 * LDLM_ITER_STOP means a matching lock was found.
 */
224 int mdc_find_cbdata(struct obd_export *exp,
225 const struct lu_fid *fid,
226 ldlm_iterator_t it, void *data)
228 struct ldlm_res_id res_id;
232 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
233 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
235 if (rc == LDLM_ITER_STOP)
237 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Drop the replay flag from @req so a failed request is not replayed,
 * and warn if the server assigned a transno to a request that errored
 * out (@rc != 0) - that combination should not normally happen.
 */
242 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
244 /* Don't hold error requests for replay. */
245 if (req->rq_replay) {
246 spin_lock(&req->rq_lock);
248 spin_unlock(&req->rq_lock);
250 if (rc && req->rq_transno != 0) {
251 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
256 /* Save a large LOV EA into the request buffer so that it is available
257 * for replay. We don't do this in the initial request because the
258 * original request doesn't need this buffer (at most it sends just the
259 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
260 * buffer and may also be difficult to allocate and save a very large
261 * request buffer for each open. (bug 5707)
263 * OOM here may cause recovery failure if lmm is needed (only for the
264 * original open if the MDS crashed just when this client also OOM'd)
265 * but this is incredibly unlikely, and questionable whether the client
266 * could do MDS recovery under OOM anyways... */
/* On enlarge failure the EA is dropped from @body (valid mask cleared,
 * eadatasize zeroed) rather than failing the open. */
267 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
268 struct mdt_body *body)
272 /* FIXME: remove this explicit offset. */
273 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
276 CERROR("Can't enlarge segment %d size to %d\n",
277 DLM_INTENT_REC_OFF + 4, body->eadatasize);
278 body->valid &= ~OBD_MD_FLEASIZE;
279 body->eadatasize = 0;
/*
 * Build an LDLM intent-open request: collect conflicting child OPEN
 * locks (and the parent's UPDATE lock for CREATE) for early cancel,
 * reserve buffers for the name, EA and capabilities, then pack the
 * open intent and the intended open RPC.  Returns the prepared request
 * or an ERR_PTR on allocation failure.
 */
283 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
284 struct lookup_intent *it,
285 struct md_op_data *op_data,
286 void *lmm, int lmmsize,
289 struct ptlrpc_request *req;
290 struct obd_device *obddev = class_exp2obd(exp);
291 struct ldlm_intent *lit;
292 CFS_LIST_HEAD(cancels);
/* An open on the MDS is always for a regular file from this path. */
298 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
300 /* XXX: openlock is not cancelled for cross-refs. */
301 /* If inode is known, cancel conflicting OPEN locks. */
302 if (fid_is_sane(&op_data->op_fid2)) {
303 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
304 if (it->it_flags & FMODE_WRITE)
309 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
312 else if (it->it_flags & FMODE_EXEC)
318 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
323 /* If CREATE, cancel parent's UPDATE lock. */
324 if (it->it_op & IT_CREAT)
328 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
330 MDS_INODELOCK_UPDATE);
332 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
333 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the locks collected for cancel. */
335 ldlm_lock_list_put(&cancels, l_bl_ast, count);
336 RETURN(ERR_PTR(-ENOMEM));
339 /* parent capability */
340 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
341 /* child capability, reserve the size according to parent capa, it will
342 * be filled after we get the reply */
343 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
345 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
346 op_data->op_namelen + 1);
347 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
348 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
350 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
352 ptlrpc_request_free(req);
/* Open requests are kept for replay when the import is replayable. */
356 spin_lock(&req->rq_lock);
357 req->rq_replay = req->rq_import->imp_replayable;
358 spin_unlock(&req->rq_lock);
360 /* pack the intent */
361 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
362 lit->opc = (__u64)it->it_op;
364 /* pack the intended request */
365 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
368 /* for remote client, fetch remote perm for current user */
369 if (client_is_remote(exp))
370 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
371 sizeof(struct mdt_remote_perm));
372 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent-getxattr request.  Reply buffers for the xattr
 * names, values and value lengths are all sized to the server's
 * advertised maximum EA size (ocd_max_easize).  Returns the prepared
 * request or an ERR_PTR on allocation failure.
 */
376 static struct ptlrpc_request *
377 mdc_intent_getxattr_pack(struct obd_export *exp,
378 struct lookup_intent *it,
379 struct md_op_data *op_data)
381 struct ptlrpc_request *req;
382 struct ldlm_intent *lit;
383 int rc, count = 0, maxdata;
384 CFS_LIST_HEAD(cancels);
388 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
389 &RQF_LDLM_INTENT_GETXATTR);
391 RETURN(ERR_PTR(-ENOMEM));
393 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
395 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
397 ptlrpc_request_free(req);
401 /* pack the intent */
402 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403 lit->opc = IT_GETXATTR;
405 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
407 /* pack the intended request */
408 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
409 op_data->op_valid, maxdata, -1, 0);
411 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
412 RCL_SERVER, maxdata);
414 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
415 RCL_SERVER, maxdata);
417 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
418 RCL_SERVER, maxdata);
420 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent-unlink request: pack the unlink intent and the
 * intended unlink RPC, reserving reply space for the largest possible
 * MD and unlink cookies.  Returns the prepared request or an ERR_PTR
 * on allocation failure.
 */
425 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
426 struct lookup_intent *it,
427 struct md_op_data *op_data)
429 struct ptlrpc_request *req;
430 struct obd_device *obddev = class_exp2obd(exp);
431 struct ldlm_intent *lit;
435 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
436 &RQF_LDLM_INTENT_UNLINK);
438 RETURN(ERR_PTR(-ENOMEM));
440 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
441 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
442 op_data->op_namelen + 1);
444 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
446 ptlrpc_request_free(req);
450 /* pack the intent */
451 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
452 lit->opc = (__u64)it->it_op;
454 /* pack the intended request */
455 mdc_unlink_pack(req, op_data);
457 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
458 obddev->u.cli.cl_max_mds_easize);
459 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
460 obddev->u.cli.cl_max_mds_cookiesize);
461 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent-getattr request.  The valid mask asks for all
 * attribute data (EA, MDS capability, MEA), plus remote permissions for
 * remote clients or the ACL otherwise.  Returns the prepared request or
 * an ERR_PTR on allocation failure.
 */
465 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
466 struct lookup_intent *it,
467 struct md_op_data *op_data)
469 struct ptlrpc_request *req;
470 struct obd_device *obddev = class_exp2obd(exp);
471 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
472 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
473 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
474 (client_is_remote(exp) ?
475 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
476 struct ldlm_intent *lit;
480 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
481 &RQF_LDLM_INTENT_GETATTR);
483 RETURN(ERR_PTR(-ENOMEM));
485 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
486 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
487 op_data->op_namelen + 1);
489 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
491 ptlrpc_request_free(req);
495 /* pack the intent */
496 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
497 lit->opc = (__u64)it->it_op;
499 /* pack the intended request */
500 mdc_getattr_pack(req, valid, it->it_flags, op_data,
501 obddev->u.cli.cl_max_mds_easize);
503 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
504 obddev->u.cli.cl_max_mds_easize);
505 if (client_is_remote(exp))
506 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
507 sizeof(struct mdt_remote_perm));
508 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM intent-layout request.  The layout intent opcode is set
 * to the generic LAYOUT_INTENT_ACCESS, and the reply LVB buffer is sized
 * for the largest possible layout EA.  @unused keeps the signature in
 * line with the other *_pack helpers.  Returns the prepared request or
 * an ERR_PTR on allocation failure.
 */
512 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
513 struct lookup_intent *it,
514 struct md_op_data *unused)
516 struct obd_device *obd = class_exp2obd(exp);
517 struct ptlrpc_request *req;
518 struct ldlm_intent *lit;
519 struct layout_intent *layout;
523 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
524 &RQF_LDLM_INTENT_LAYOUT);
526 RETURN(ERR_PTR(-ENOMEM));
528 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
529 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
531 ptlrpc_request_free(req);
535 /* pack the intent */
536 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
537 lit->opc = (__u64)it->it_op;
539 /* pack the layout intent request */
540 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
541 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
542 * set for replication */
543 layout->li_opc = LAYOUT_INTENT_ACCESS;
545 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
546 obd->u.cli.cl_max_mds_easize);
547 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM enqueue request with a server-side
 * LVB buffer of @lvb_len bytes.  Returns the prepared request or an
 * ERR_PTR on allocation failure.
 */
551 static struct ptlrpc_request *
552 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
554 struct ptlrpc_request *req;
558 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
560 RETURN(ERR_PTR(-ENOMEM));
562 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
564 ptlrpc_request_free(req);
568 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
569 ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue: transfer the server's
 * disposition/status from the DLM reply into the lookup intent, fix up
 * the lock mode if the server granted a different one, drop the replay
 * flag for failed requests, sanity-check the reply body for
 * open/unlink/lookup/getattr intents (saving a large LOV EA for open
 * replay), and install layout LVB data on the lock for layout intents.
 */
573 static int mdc_finish_enqueue(struct obd_export *exp,
574 struct ptlrpc_request *req,
575 struct ldlm_enqueue_info *einfo,
576 struct lookup_intent *it,
577 struct lustre_handle *lockh,
580 struct req_capsule *pill = &req->rq_pill;
581 struct ldlm_request *lockreq;
582 struct ldlm_reply *lockrep;
583 struct lustre_intent_data *intent = &it->d.lustre;
584 struct ldlm_lock *lock;
585 void *lvb_data = NULL;
590 /* Similarly, if we're going to replay this request, we don't want to
591 * actually get a lock, just perform the intent. */
592 if (req->rq_transno || req->rq_replay) {
593 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
594 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
597 if (rc == ELDLM_LOCK_ABORTED) {
599 memset(lockh, 0, sizeof(*lockh));
601 } else { /* rc = 0 */
602 lock = ldlm_handle2lock(lockh);
603 LASSERT(lock != NULL);
605 /* If the server gave us back a different lock mode, we should
606 * fix up our variables. */
607 if (lock->l_req_mode != einfo->ei_mode) {
608 ldlm_lock_addref(lockh, lock->l_req_mode);
609 ldlm_lock_decref(lockh, einfo->ei_mode);
610 einfo->ei_mode = lock->l_req_mode;
615 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
616 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Copy the server's verdict into the intent for the upper layers. */
618 intent->it_disposition = (int)lockrep->lock_policy_res1;
619 intent->it_status = (int)lockrep->lock_policy_res2;
620 intent->it_lock_mode = einfo->ei_mode;
621 intent->it_lock_handle = lockh->cookie;
622 intent->it_data = req;
624 /* Technically speaking rq_transno must already be zero if
625 * it_status is in error, so the check is a bit redundant */
626 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
627 mdc_clear_replay_flag(req, intent->it_status);
629 /* If we're doing an IT_OPEN which did not result in an actual
630 * successful open, then we need to remove the bit which saves
631 * this request for unconditional replay.
633 * It's important that we do this first! Otherwise we might exit the
634 * function without doing so, and try to replay a failed create
636 if (it->it_op & IT_OPEN && req->rq_replay &&
637 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
638 mdc_clear_replay_flag(req, intent->it_status)
640 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
641 it->it_op, intent->it_disposition, intent->it_status);
643 /* We know what to expect, so we do any byte flipping required here */
644 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
645 struct mdt_body *body;
647 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
649 CERROR ("Can't swab mdt_body\n");
653 if (it_disposition(it, DISP_OPEN_OPEN) &&
654 !it_open_error(DISP_OPEN_OPEN, it)) {
656 * If this is a successful OPEN request, we need to set
657 * replay handler and data early, so that if replay
658 * happens immediately after swabbing below, new reply
659 * is swabbed by that handler correctly.
661 mdc_set_open_replay_data(NULL, NULL, req);
664 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
667 mdc_update_max_ea_from_body(exp, body);
670 * The eadata is opaque; just check that it is there.
671 * Eventually, obd_unpackmd() will check the contents.
673 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
678 /* save lvb data and length in case this is for layout
681 lvb_len = body->eadatasize;
684 * We save the reply LOV EA in case we have to replay a
685 * create for recovery. If we didn't allocate a large
686 * enough request buffer above we need to reallocate it
687 * here to hold the actual LOV EA.
689 * To not save LOV EA if request is not going to replay
690 * (for example error one).
692 if ((it->it_op & IT_OPEN) && req->rq_replay) {
694 if (req_capsule_get_size(pill, &RMF_EADATA,
697 mdc_realloc_openmsg(req, body);
699 req_capsule_shrink(pill, &RMF_EADATA,
703 req_capsule_set_size(pill, &RMF_EADATA,
707 lmm = req_capsule_client_get(pill, &RMF_EADATA);
709 memcpy(lmm, eadata, body->eadatasize);
713 if (body->valid & OBD_MD_FLRMTPERM) {
714 struct mdt_remote_perm *perm;
716 LASSERT(client_is_remote(exp));
717 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
718 lustre_swab_mdt_remote_perm);
722 if (body->valid & OBD_MD_FLMDSCAPA) {
723 struct lustre_capa *capa, *p;
725 capa = req_capsule_server_get(pill, &RMF_CAPA1);
729 if (it->it_op & IT_OPEN) {
730 /* client fid capa will be checked in replay */
731 p = req_capsule_client_get(pill, &RMF_CAPA2);
736 if (body->valid & OBD_MD_FLOSSCAPA) {
737 struct lustre_capa *capa;
739 capa = req_capsule_server_get(pill, &RMF_CAPA2);
743 } else if (it->it_op & IT_LAYOUT) {
744 /* maybe the lock was granted right away and layout
745 * is packed into RMF_DLM_LVB of req */
746 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
748 lvb_data = req_capsule_server_sized_get(pill,
749 &RMF_DLM_LVB, lvb_len);
750 if (lvb_data == NULL)
755 /* fill in stripe data for layout lock */
756 lock = ldlm_handle2lock(lockh);
757 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
760 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
761 ldlm_it2str(it->it_op), lvb_len);
763 OBD_ALLOC_LARGE(lmm, lvb_len);
768 memcpy(lmm, lvb_data, lvb_len);
770 /* install lvb_data */
771 lock_res_and_lock(lock);
772 if (lock->l_lvb_data == NULL) {
773 lock->l_lvb_data = lmm;
774 lock->l_lvb_len = lvb_len;
777 unlock_res_and_lock(lock);
/* Someone else installed LVB data first; free our copy. */
779 OBD_FREE_LARGE(lmm, lvb_len);
787 /* We always reserve enough space in the reply packet for a stripe MD, because
788 * we don't know in advance the file type. */
/*
 * mdc_enqueue() - send an (intent) lock enqueue to the MDT.
 *
 * Chooses the inodebits policy from the intent op, packs the matching
 * intent request (open/unlink/getattr/readdir/layout/getxattr, or a raw
 * FLOCK enqueue when @it is NULL), sends it under the MDC rpc_lock and
 * in-flight limit, retries IT_CREAT indefinitely on -EINPROGRESS, and
 * finishes via mdc_finish_enqueue().  The granted lock handle is
 * returned through @lockh.
 */
789 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
790 struct lookup_intent *it, struct md_op_data *op_data,
791 struct lustre_handle *lockh, void *lmm, int lmmsize,
792 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
794 struct obd_device *obddev = class_exp2obd(exp);
795 struct ptlrpc_request *req = NULL;
796 __u64 flags, saved_flags = extra_lock_flags;
798 struct ldlm_res_id res_id;
799 static const ldlm_policy_data_t lookup_policy =
800 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
801 static const ldlm_policy_data_t update_policy =
802 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
803 static const ldlm_policy_data_t layout_policy =
804 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
805 static const ldlm_policy_data_t getxattr_policy = {
806 .l_inodebits = { MDS_INODELOCK_XATTR } };
807 ldlm_policy_data_t const *policy = &lookup_policy;
808 int generation, resends = 0;
809 struct ldlm_reply *lockrep;
810 enum lvb_type lvb_type = 0;
813 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
816 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
819 saved_flags |= LDLM_FL_HAS_INTENT;
/* Pick the inodebits policy that matches the intent operation. */
820 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
821 policy = &update_policy;
822 else if (it->it_op & IT_LAYOUT)
823 policy = &layout_policy;
824 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
825 policy = &getxattr_policy;
828 LASSERT(reqp == NULL);
830 generation = obddev->u.cli.cl_import->imp_generation;
834 /* The only way right now is FLOCK, in this case we hide flock
835 policy as lmm, but lmmsize is 0 */
836 LASSERT(lmm && lmmsize == 0);
837 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
839 policy = (ldlm_policy_data_t *)lmm;
840 res_id.name[3] = LDLM_FLOCK;
841 } else if (it->it_op & IT_OPEN) {
842 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
844 policy = &update_policy;
845 einfo->ei_cbdata = NULL;
847 } else if (it->it_op & IT_UNLINK) {
848 req = mdc_intent_unlink_pack(exp, it, op_data);
849 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
850 req = mdc_intent_getattr_pack(exp, it, op_data);
851 } else if (it->it_op & IT_READDIR) {
852 req = mdc_enqueue_pack(exp, 0);
853 } else if (it->it_op & IT_LAYOUT) {
854 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
856 req = mdc_intent_layout_pack(exp, it, op_data);
857 lvb_type = LVB_T_LAYOUT;
858 } else if (it->it_op & IT_GETXATTR) {
859 req = mdc_intent_getxattr_pack(exp, it, op_data);
866 RETURN(PTR_ERR(req));
868 if (req != NULL && it && it->it_op & IT_CREAT)
869 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
871 req->rq_no_retry_einprogress = 1;
/* On a resend, pin the request to the original import generation. */
874 req->rq_generation_set = 1;
875 req->rq_import_generation = generation;
876 req->rq_sent = cfs_time_current_sec() + resends;
879 /* It is important to obtain rpc_lock first (if applicable), so that
880 * threads that are serialised with rpc_lock are not polluting our
881 * rpcs in flight counter. We do not do flock request limiting, though*/
883 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
884 rc = mdc_enter_request(&obddev->u.cli);
886 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
887 mdc_clear_replay_flag(req, 0);
888 ptlrpc_req_finished(req);
893 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
894 0, lvb_type, lockh, 0);
896 /* For flock requests we immediately return without further
897 delay and let caller deal with the rest, since rest of
898 this function metadata processing makes no sense for flock
899 requests anyway. But in case of problem during comms with
900 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
901 can not rely on caller and this mainly for F_UNLCKs
902 (explicits or automatically generated by Kernel to clean
903 current FLocks upon exit) that can't be trashed */
904 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
905 (einfo->ei_type == LDLM_FLOCK) &&
906 (einfo->ei_mode == LCK_NL))
911 mdc_exit_request(&obddev->u.cli);
912 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
915 CERROR("ldlm_cli_enqueue: %d\n", rc);
916 mdc_clear_replay_flag(req, rc);
917 ptlrpc_req_finished(req);
/* The intent status travels in lock_policy_res2; convert it from the
 * wire representation before inspecting it. */
921 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
922 LASSERT(lockrep != NULL);
924 lockrep->lock_policy_res2 =
925 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
927 /* Retry the create infinitely when we get -EINPROGRESS from
928 * server. This is required by the new quota design. */
929 if (it && it->it_op & IT_CREAT &&
930 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
931 mdc_clear_replay_flag(req, rc);
932 ptlrpc_req_finished(req);
935 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
936 obddev->obd_name, resends, it->it_op,
937 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
939 if (generation == obddev->u.cli.cl_import->imp_generation) {
942 CDEBUG(D_HA, "resend cross eviction\n");
947 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* mdc_finish_enqueue() failed: release the lock and the request. */
949 if (lustre_handle_is_used(lockh)) {
950 ldlm_lock_decref(lockh, einfo->ei_mode);
951 memset(lockh, 0, sizeof(*lockh));
953 ptlrpc_req_finished(req);
/*
 * Finish an intent lock after the enqueue reply has been processed:
 * propagate server-side open/lookup errors, mark revalidations complete
 * (detecting stale fid/name pairs), take extra request references for
 * the create/open phases (dropped later in ll_create_node/ll_file_open),
 * and collapse the new lock onto an already-held matching one.
 */
958 static int mdc_finish_intent_lock(struct obd_export *exp,
959 struct ptlrpc_request *request,
960 struct md_op_data *op_data,
961 struct lookup_intent *it,
962 struct lustre_handle *lockh)
964 struct lustre_handle old_lock;
965 struct mdt_body *mdt_body;
966 struct ldlm_lock *lock;
970 LASSERT(request != NULL);
971 LASSERT(request != LP_POISON);
972 LASSERT(request->rq_repmsg != LP_POISON);
974 if (!it_disposition(it, DISP_IT_EXECD)) {
975 /* The server failed before it even started executing the
976 * intent, i.e. because it couldn't unpack the request. */
977 LASSERT(it->d.lustre.it_status != 0);
978 RETURN(it->d.lustre.it_status);
980 rc = it_open_error(DISP_IT_EXECD, it);
984 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
985 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
987 /* If we were revalidating a fid/name pair, mark the intent in
988 * case we fail and get called again from lookup */
989 if (fid_is_sane(&op_data->op_fid2) &&
990 it->it_create_mode & M_CHECK_STALE &&
991 it->it_op != IT_GETATTR) {
992 it_set_disposition(it, DISP_ENQ_COMPLETE);
994 /* Also: did we find the same inode? */
995 /* server can return one of two fids:
996 * op_fid2 - new allocated fid - if file is created.
997 * op_fid3 - existent fid - if file only open.
998 * op_fid3 is saved in lmv_intent_open */
999 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1000 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1001 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1002 "\n", PFID(&op_data->op_fid2),
1003 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1008 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1012 /* keep requests around for the multiple phases of the call
1013 * this shows the DISP_XX must guarantee we make it into the call
1015 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1016 it_disposition(it, DISP_OPEN_CREATE) &&
1017 !it_open_error(DISP_OPEN_CREATE, it)) {
1018 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1019 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1021 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1022 it_disposition(it, DISP_OPEN_OPEN) &&
1023 !it_open_error(DISP_OPEN_OPEN, it)) {
1024 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1025 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1026 /* BUG 11546 - eviction in the middle of open rpc processing */
1027 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1030 if (it->it_op & IT_CREAT) {
1031 /* XXX this belongs in ll_create_it */
1032 } else if (it->it_op == IT_OPEN) {
1033 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1035 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1038 /* If we already have a matching lock, then cancel the new
1039 * one. We have to set the data here instead of in
1040 * mdc_enqueue, because we need to use the child's inode as
1041 * the l_ast_data to match, and that's not available until
1042 * intent_finish has performed the iget().) */
1043 lock = ldlm_handle2lock(lockh);
1045 ldlm_policy_data_t policy = lock->l_policy_data;
1046 LDLM_DEBUG(lock, "matching against this");
1048 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1049 &lock->l_resource->lr_name),
1050 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1051 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1052 LDLM_LOCK_PUT(lock);
1054 memcpy(&old_lock, lockh, sizeof(*lockh));
1055 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1056 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* A matching lock already exists - cancel the new one and
 * switch the intent over to the old handle. */
1057 ldlm_lock_decref_and_cancel(lockh,
1058 it->d.lustre.it_lock_mode);
1059 memcpy(lockh, &old_lock, sizeof(old_lock));
1060 it->d.lustre.it_lock_handle = lockh->cookie;
1063 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1064 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1065 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * mdc_revalidate_lock() - check whether a usable lock already covers
 * the intent on @fid.  First tries the handle cached in the intent,
 * then falls back to matching by resource with the inodebits policy
 * appropriate to the intent op.  On success the intent's lock handle
 * and mode are refreshed; on failure they are cleared.
 */
1069 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1070 struct lu_fid *fid, __u64 *bits)
1072 /* We could just return 1 immediately, but since we should only
1073 * be called in revalidate_it if we already have a lock, let's
1075 struct ldlm_res_id res_id;
1076 struct lustre_handle lockh;
1077 ldlm_policy_data_t policy;
1081 if (it->d.lustre.it_lock_handle) {
1082 lockh.cookie = it->d.lustre.it_lock_handle;
1083 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1085 fid_build_reg_res_name(fid, &res_id);
1086 switch (it->it_op) {
1088 /* File attributes are held under multiple bits:
1089 * nlink is under lookup lock, size and times are
1090 * under UPDATE lock and recently we've also got
1091 * a separate permissions lock for owner/group/acl that
1092 * were protected by lookup lock before.
1093 * Getattr must provide all of that information,
1094 * so we need to ensure we have all of those locks.
1095 * Unfortunately, if the bits are split across multiple
1096 * locks, there's no easy way to match all of them here,
1097 * so an extra RPC would be performed to fetch all
1098 * of those bits at once for now. */
1099 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1100 MDS_INODELOCK_LOOKUP |
1104 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1107 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1111 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1112 LDLM_FL_BLOCK_GRANTED, &res_id,
1113 LDLM_IBITS, &policy,
1114 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
/* Refresh or clear the intent's cached lock state. */
1118 it->d.lustre.it_lock_handle = lockh.cookie;
1119 it->d.lustre.it_lock_mode = mode;
1121 it->d.lustre.it_lock_handle = 0;
1122 it->d.lustre.it_lock_mode = 0;
1129 * This long block is all about fixing up the lock and request state
1130 * so that it is correct as of the moment _before_ the operation was
1131 * applied; that way, the VFS will think that everything is normal and
1132 * call Lustre's regular VFS methods.
1134 * If we're performing a creation, that means that unless the creation
1135 * failed with EEXIST, we should fake up a negative dentry.
1137 * For everything else, we want to lookup to succeed.
1139 * One additional note: if CREATE or OPEN succeeded, we add an extra
1140 * reference to the request because we need to keep it around until
1141 * ll_create/ll_open gets called.
1143 * The server will return to us, in it_disposition, an indication of
1144 * exactly what d.lustre.it_status refers to.
1146 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1147 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1148 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1149 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1152 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1155 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1156 void *lmm, int lmmsize, struct lookup_intent *it,
1157 int lookup_flags, struct ptlrpc_request **reqp,
1158 ldlm_blocking_callback cb_blocking,
1159 __u64 extra_lock_flags)
1161 struct lustre_handle lockh;
1166 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1167 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1168 op_data->op_name, PFID(&op_data->op_fid2),
1169 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Fast path: a sane child fid with LOOKUP/GETATTR means this is a
 * revalidation - try to reuse an existing lock before enqueueing. */
1173 if (fid_is_sane(&op_data->op_fid2) &&
1174 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1175 /* We could just return 1 immediately, but since we should only
1176 * be called in revalidate_it if we already have a lock, let's
1178 it->d.lustre.it_lock_handle = 0;
1179 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1180 /* Only return failure if it was not GETATTR by cfid
1181 (from inode_revalidate) */
1182 if (rc || op_data->op_namelen != 0)
1186 /* lookup_it may be called only after revalidate_it has run, because
1187 * revalidate_it cannot return errors, only zero. Returning zero causes
1188 * this call to lookup, which *can* return an error.
1190 * We only want to execute the request associated with the intent one
1191 * time, however, so don't send the request again. Instead, skip past
1192 * this and use the request from revalidate. In this case, revalidate
1193 * never dropped its reference, so the refcounts are all OK */
1194 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1195 struct ldlm_enqueue_info einfo = {
1196 .ei_type = LDLM_IBITS,
1197 .ei_mode = it_to_lock_mode(it),
1198 .ei_cb_bl = cb_blocking,
1199 .ei_cb_cp = ldlm_completion_ast,
1202 /* For case if upper layer did not alloc fid, do it now. */
1203 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1204 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1206 CERROR("Can't alloc new fid, rc %d\n", rc);
1210 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1211 lmm, lmmsize, NULL, extra_lock_flags);
1214 } else if (!fid_is_sane(&op_data->op_fid2) ||
1215 !(it->it_create_mode & M_CHECK_STALE)) {
1216 /* DISP_ENQ_COMPLETE set means there is extra reference on
1217 * request referenced from this intent, saved for subsequent
1218 * lookup. This path is executed when we proceed to this
1219 * lookup, so we clear DISP_ENQ_COMPLETE */
1220 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1222 *reqp = it->d.lustre.it_data;
1223 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for an asynchronous getattr intent enqueue
 * (presumably installed as req->rq_interpret_reply by
 * mdc_intent_getattr_async() — runs when the RPC completes).
 *
 * Finishes the LDLM enqueue, converts the server's reply status to host
 * order, completes the intent lock, and finally invokes the caller's
 * mi_cb() completion callback and frees the enqueue info.
 *
 * \param env   execution environment (unused here)
 * \param req   the completed enqueue request
 * args carries the struct mdc_getattr_args stashed in rq_async_args.
 */
1227 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1228 struct ptlrpc_request *req,
1231 struct mdc_getattr_args *ga = args;
1232 struct obd_export *exp = ga->ga_exp;
1233 struct md_enqueue_info *minfo = ga->ga_minfo;
1234 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1235 struct lookup_intent *it;
1236 struct lustre_handle *lockh;
1237 struct obd_device *obddev;
1238 struct ldlm_reply *lockrep;
1239 __u64 flags = LDLM_FL_HAS_INTENT;
1243 lockh = &minfo->mi_lockh;
1245 obddev = class_exp2obd(exp);
/* Release the rpc-in-flight slot taken by mdc_enter_request(). */
1247 mdc_exit_request(&obddev->u.cli);
1248 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Complete the client side of the enqueue; fills in lockh on success. */
1251 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1252 &flags, NULL, 0, lockh, rc);
1254 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
/* On failure, make sure this request is not replayed. */
1255 mdc_clear_replay_flag(req, rc);
1259 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1260 LASSERT(lockrep != NULL);
/* The intent status travels in lock_policy_res2; convert from wire to
 * host errno encoding before anyone looks at it. */
1262 lockrep->lock_policy_res2 =
1263 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1265 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1269 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Enqueue info was allocated by the submitter; free it here, then hand
 * the final result to the caller's completion callback. */
1273 OBD_FREE_PTR(einfo);
1274 minfo->mi_cb(req, minfo, rc);
1278 int mdc_intent_getattr_async(struct obd_export *exp,
1279 struct md_enqueue_info *minfo,
1280 struct ldlm_enqueue_info *einfo)
1282 struct md_op_data *op_data = &minfo->mi_data;
1283 struct lookup_intent *it = &minfo->mi_it;
1284 struct ptlrpc_request *req;
1285 struct mdc_getattr_args *ga;
1286 struct obd_device *obddev = class_exp2obd(exp);
1287 struct ldlm_res_id res_id;
1288 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1289 * for statahead currently. Consider CMD in future, such two bits
1290 * maybe managed by different MDS, should be adjusted then. */
1291 ldlm_policy_data_t policy = {
1292 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1293 MDS_INODELOCK_UPDATE }
1296 __u64 flags = LDLM_FL_HAS_INTENT;
1299 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1300 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1301 ldlm_it2str(it->it_op), it->it_flags);
1303 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1304 req = mdc_intent_getattr_pack(exp, it, op_data);
1308 rc = mdc_enter_request(&obddev->u.cli);
1310 ptlrpc_req_finished(req);
1314 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1315 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1317 mdc_exit_request(&obddev->u.cli);
1318 ptlrpc_req_finished(req);
1322 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1323 ga = ptlrpc_req_async_args(req);
1325 ga->ga_minfo = minfo;
1326 ga->ga_einfo = einfo;
1328 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1329 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);