4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
43 # include <liblustre.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
/* Context passed to the async getattr interpret callback
 * (mdc_intent_getattr_async_interpret) via req->rq_async_args. */
55 struct mdc_getattr_args {
/* export the enqueue was issued on */
56 struct obd_export *ga_exp;
/* caller's enqueue info; mi_cb is invoked on completion */
57 struct md_enqueue_info *ga_minfo;
/* heap-allocated einfo, freed by the interpret callback */
58 struct ldlm_enqueue_info *ga_einfo;
/*
 * Return the server status saved in the intent for the deepest open-path
 * phase that the server actually executed.  Dispositions are checked from
 * most specific (LEASE) to least specific (IT_EXECD); for each disposition
 * the saved it_status is only returned when the caller asks about that
 * phase or a later one, so errors from later phases are not reported to
 * callers querying an earlier phase.
 */
61 int it_open_error(int phase, struct lookup_intent *it)
63 if (it_disposition(it, DISP_OPEN_LEASE)) {
64 if (phase >= DISP_OPEN_LEASE)
65 return it->d.lustre.it_status;
69 if (it_disposition(it, DISP_OPEN_OPEN)) {
70 if (phase >= DISP_OPEN_OPEN)
71 return it->d.lustre.it_status;
76 if (it_disposition(it, DISP_OPEN_CREATE)) {
77 if (phase >= DISP_OPEN_CREATE)
78 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
84 if (phase >= DISP_LOOKUP_EXECD)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_IT_EXECD)) {
91 if (phase >= DISP_IT_EXECD)
92 return it->d.lustre.it_status;
/* No disposition bit matched: dump the raw state for debugging. */
96 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
97 it->d.lustre.it_status);
101 EXPORT_SYMBOL(it_open_error);
103 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach the inode @data to the LDLM lock behind @lockh (stored in the
 * resource's lr_lvb_inode) and report the lock's inodebits back through
 * the bits out-parameter.  Performed under the resource lock.
 */
104 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
107 struct ldlm_lock *lock;
108 struct inode *new_inode = data;
117 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
119 LASSERT(lock != NULL);
120 lock_res_and_lock(lock);
/* A different inode may already be cached on the resource; that is only
 * legal if the old inode is being freed (I_FREEING), i.e. we are replacing
 * a dying inode with its successor. */
122 if (lock->l_resource->lr_lvb_inode &&
123 lock->l_resource->lr_lvb_inode != data) {
124 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
125 LASSERTF(old_inode->i_state & I_FREEING,
126 "Found existing inode %p/%lu/%u state %lu in lock: "
127 "setting data to %p/%lu/%u\n", old_inode,
128 old_inode->i_ino, old_inode->i_generation,
130 new_inode, new_inode->i_ino, new_inode->i_generation);
133 lock->l_resource->lr_lvb_inode = new_inode;
/* NOTE(review): bits appears to be an out-parameter declared on a wrapped
 * signature line not visible here — confirm it is checked for NULL upstream. */
135 *bits = lock->l_policy_data.l_inodebits.bits;
137 unlock_res_and_lock(lock);
/*
 * Look for an already-granted lock on the resource derived from @fid that
 * is compatible with @type/@policy/@mode.  Returns the matched lock mode
 * (0 if none) and fills @lockh on success, per ldlm_lock_match().
 * Note: mutates policy->l_inodebits.bits to drop server-unsupported bits.
 */
143 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
144 const struct lu_fid *fid, ldlm_type_t type,
145 ldlm_policy_data_t *policy, ldlm_mode_t mode,
146 struct lustre_handle *lockh)
148 struct ldlm_res_id res_id;
152 fid_build_reg_res_name(fid, &res_id);
153 /* LU-4405: Clear bits not supported by server */
154 policy->l_inodebits.bits &= exp_connect_ibits(exp);
155 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
156 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks matching @policy on the resource built from
 * @fid.  Thin wrapper over ldlm_cli_cancel_unused_resource(); mode and
 * opaque parameters are declared on wrapped lines not visible here.
 */
160 int mdc_cancel_unused(struct obd_export *exp,
161 const struct lu_fid *fid,
162 ldlm_policy_data_t *policy,
164 ldlm_cancel_flags_t flags,
167 struct ldlm_res_id res_id;
168 struct obd_device *obd = class_exp2obd(exp);
173 fid_build_reg_res_name(fid, &res_id);
174 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175 policy, mode, flags, opaque);
/*
 * Detach any cached inode pointer from the LDLM resource for @fid by
 * clearing lr_lvb_inode.  Used when the inode is going away so stale
 * pointers are not left on the resource.
 */
179 int mdc_null_inode(struct obd_export *exp,
180 const struct lu_fid *fid)
182 struct ldlm_res_id res_id;
183 struct ldlm_resource *res;
184 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
187 LASSERTF(ns != NULL, "no namespace passed\n");
189 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create = 0): if no resource exists there is nothing to clear. */
191 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
196 res->lr_lvb_inode = NULL;
199 ldlm_resource_putref(res);
203 /* find any ldlm lock of the inode in mdc
/*
 * Iterate locks on @fid's resource with caller-supplied iterator @it and
 * closure @data; translate the iterator outcome (LDLM_ITER_STOP means a
 * matching lock was found) into this function's return code.
 */
207 int mdc_find_cbdata(struct obd_export *exp,
208 const struct lu_fid *fid,
209 ldlm_iterator_t it, void *data)
211 struct ldlm_res_id res_id;
215 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218 if (rc == LDLM_ITER_STOP)
220 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Drop the replay flag from @req so a failed request is not replayed
 * after recovery; warn if the server nevertheless assigned a transno to
 * an errored request (rc != 0).
 */
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 /* Don't hold error requests for replay. */
228 if (req->rq_replay) {
/* rq_replay is cleared under rq_lock on a line not visible here. */
229 spin_lock(&req->rq_lock);
231 spin_unlock(&req->rq_lock);
233 if (rc && req->rq_transno != 0) {
234 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
239 /* Save a large LOV EA into the request buffer so that it is available
240 * for replay. We don't do this in the initial request because the
241 * original request doesn't need this buffer (at most it sends just the
242 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243 * buffer and may also be difficult to allocate and save a very large
244 * request buffer for each open. (bug 5707)
246 * OOM here may cause recovery failure if lmm is needed (only for the
247 * original open if the MDS crashed just when this client also OOM'd)
248 * but this is incredibly unlikely, and questionable whether the client
249 * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251 struct mdt_body *body)
255 /* FIXME: remove this explicit offset. */
256 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
257 body->mbo_eadatasize);
/* On enlarge failure, degrade gracefully: strip the EA from the reply body
 * so replay proceeds without the (unsaveable) LOV EA. */
259 CERROR("Can't enlarge segment %d size to %d\n",
260 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
261 body->mbo_valid &= ~OBD_MD_FLEASIZE;
262 body->mbo_eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN request: collect conflicting locks to cancel
 * (child OPEN locks when the target fid is known, parent UPDATE lock on
 * create), allocate the request, size its capsule fields, and pack the
 * ldlm_intent plus the open body.  Returns the request or an ERR_PTR.
 */
266 static struct ptlrpc_request *
267 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
268 struct md_op_data *op_data)
270 struct ptlrpc_request *req;
271 struct obd_device *obddev = class_exp2obd(exp);
272 struct ldlm_intent *lit;
273 const void *lmm = op_data->op_data;
274 int lmmsize = op_data->op_data_size;
275 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Only regular-file opens come through here: force S_IFREG in the mode. */
281 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283 /* XXX: openlock is not cancelled for cross-refs. */
284 /* If inode is known, cancel conflicting OPEN locks. */
285 if (fid_is_sane(&op_data->op_fid2)) {
286 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
287 if (it->it_flags & FMODE_WRITE)
/* The lock mode chosen per flags is assigned on lines not visible here. */
292 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
295 else if (it->it_flags & FMODE_EXEC)
301 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
306 /* If CREATE, cancel parent's UPDATE lock. */
307 if (it->it_op & IT_CREAT)
311 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
313 MDS_INODELOCK_UPDATE);
315 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
316 &RQF_LDLM_INTENT_OPEN);
/* Allocation failure: release the cancel-list references before erroring. */
318 ldlm_lock_list_put(&cancels, l_bl_ast, count);
319 RETURN(ERR_PTR(-ENOMEM));
322 /* parent capability */
323 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
324 /* child capability, reserve the size according to parent capa, it will
325 * be filled after we get the reply */
326 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
328 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
329 op_data->op_namelen + 1);
330 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
331 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
333 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
335 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import supports replay. */
339 spin_lock(&req->rq_lock);
340 req->rq_replay = req->rq_import->imp_replayable;
341 spin_unlock(&req->rq_lock);
343 /* pack the intent */
344 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
345 lit->opc = (__u64)it->it_op;
347 /* pack the intended request */
348 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reserve reply space for the largest possible striping EA. */
351 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
352 obddev->u.cli.cl_max_mds_easize);
354 /* for remote client, fetch remote perm for current user */
355 if (client_is_remote(exp))
356 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
357 sizeof(struct mdt_remote_perm));
358 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETXATTR request.  Reply buffers for xattr names,
 * values and value lengths are all sized to the server's advertised
 * maximum EA size (ocd_max_easize).  Returns the request or an ERR_PTR.
 */
362 static struct ptlrpc_request *
363 mdc_intent_getxattr_pack(struct obd_export *exp,
364 struct lookup_intent *it,
365 struct md_op_data *op_data)
367 struct ptlrpc_request *req;
368 struct ldlm_intent *lit;
369 int rc, count = 0, maxdata;
370 struct list_head cancels = LIST_HEAD_INIT(cancels);
374 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
375 &RQF_LDLM_INTENT_GETXATTR);
377 RETURN(ERR_PTR(-ENOMEM));
379 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
/* count is 0 here: no locks are queued for cancellation on this path. */
381 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
383 ptlrpc_request_free(req);
387 /* pack the intent */
388 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
389 lit->opc = IT_GETXATTR;
391 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
393 /* pack the intended request */
394 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
395 op_data->op_valid, maxdata, -1, 0);
397 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
398 RCL_SERVER, maxdata);
400 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
401 RCL_SERVER, maxdata);
403 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
404 RCL_SERVER, maxdata);
406 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * unlink body, and reserve reply space for the victim's EA and unlink
 * cookies.  Returns the request or an ERR_PTR.
 */
411 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
412 struct lookup_intent *it,
413 struct md_op_data *op_data)
415 struct ptlrpc_request *req;
416 struct obd_device *obddev = class_exp2obd(exp);
417 struct ldlm_intent *lit;
421 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
422 &RQF_LDLM_INTENT_UNLINK);
424 RETURN(ERR_PTR(-ENOMEM));
426 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
427 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
428 op_data->op_namelen + 1);
430 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
432 ptlrpc_request_free(req);
436 /* pack the intent */
437 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
438 lit->opc = (__u64)it->it_op;
440 /* pack the intended request */
441 mdc_unlink_pack(req, op_data);
443 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
444 obddev->u.cli.cl_default_mds_easize);
445 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
446 obddev->u.cli.cl_default_mds_cookiesize);
447 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request used for both IT_GETATTR and
 * IT_LOOKUP.  Requests attributes, EA, capabilities and either remote
 * permissions (remote client) or ACLs (local client), and sizes the
 * reply EA buffer from the default (preferred) or maximum MDS EA size.
 */
451 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
452 struct lookup_intent *it,
453 struct md_op_data *op_data)
455 struct ptlrpc_request *req;
456 struct obd_device *obddev = class_exp2obd(exp);
457 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
458 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
459 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
460 (client_is_remote(exp) ?
461 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
462 struct ldlm_intent *lit;
467 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
468 &RQF_LDLM_INTENT_GETATTR);
470 RETURN(ERR_PTR(-ENOMEM));
472 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
473 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
474 op_data->op_namelen + 1);
476 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
478 ptlrpc_request_free(req);
482 /* pack the intent */
483 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
484 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum. */
486 if (obddev->u.cli.cl_default_mds_easize > 0)
487 easize = obddev->u.cli.cl_default_mds_easize;
489 easize = obddev->u.cli.cl_max_mds_easize;
491 /* pack the intended request */
492 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
494 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
495 if (client_is_remote(exp))
496 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
497 sizeof(struct mdt_remote_perm));
498 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request to fetch a file's layout under a
 * layout lock.  The op_data argument is unused; the layout itself comes
 * back in the DLM LVB, sized to the default MDS EA size.
 */
502 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
503 struct lookup_intent *it,
504 struct md_op_data *unused)
506 struct obd_device *obd = class_exp2obd(exp);
507 struct ptlrpc_request *req;
508 struct ldlm_intent *lit;
509 struct layout_intent *layout;
513 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
514 &RQF_LDLM_INTENT_LAYOUT);
516 RETURN(ERR_PTR(-ENOMEM));
/* No client-side EA payload is sent with a layout intent. */
518 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
519 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
521 ptlrpc_request_free(req);
525 /* pack the intent */
526 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
527 lit->opc = (__u64)it->it_op;
529 /* pack the layout intent request */
530 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
531 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
532 * set for replication */
533 layout->li_opc = LAYOUT_INTENT_ACCESS;
535 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
536 obd->u.cli.cl_default_mds_easize);
537 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM_ENQUEUE request with an LVB reply
 * buffer of @lvb_len bytes.  Used e.g. for IT_READDIR where no intent
 * body is needed.  Returns the request or an ERR_PTR.
 */
541 static struct ptlrpc_request *
542 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
544 struct ptlrpc_request *req;
548 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
550 RETURN(ERR_PTR(-ENOMEM));
552 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
554 ptlrpc_request_free(req);
558 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
559 ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue: fix up lock state, copy the
 * server's disposition/status into the intent, manage the replay flag,
 * swab and validate the reply body per intent type, save the reply LOV EA
 * for open replay, and install layout LVB data on layout locks.
 */
563 static int mdc_finish_enqueue(struct obd_export *exp,
564 struct ptlrpc_request *req,
565 struct ldlm_enqueue_info *einfo,
566 struct lookup_intent *it,
567 struct lustre_handle *lockh,
570 struct req_capsule *pill = &req->rq_pill;
571 struct ldlm_request *lockreq;
572 struct ldlm_reply *lockrep;
573 struct lustre_intent_data *intent = &it->d.lustre;
574 struct ldlm_lock *lock;
575 void *lvb_data = NULL;
580 /* Similarly, if we're going to replay this request, we don't want to
581 * actually get a lock, just perform the intent. */
582 if (req->rq_transno || req->rq_replay) {
583 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
584 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Intent executed but no lock was granted: zero the handle. */
587 if (rc == ELDLM_LOCK_ABORTED) {
589 memset(lockh, 0, sizeof(*lockh));
591 } else { /* rc = 0 */
592 lock = ldlm_handle2lock(lockh);
593 LASSERT(lock != NULL);
595 /* If the server gave us back a different lock mode, we should
596 * fix up our variables. */
597 if (lock->l_req_mode != einfo->ei_mode) {
598 ldlm_lock_addref(lockh, lock->l_req_mode);
599 ldlm_lock_decref(lockh, einfo->ei_mode);
600 einfo->ei_mode = lock->l_req_mode;
605 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
606 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
608 intent->it_disposition = (int)lockrep->lock_policy_res1;
609 intent->it_status = (int)lockrep->lock_policy_res2;
610 intent->it_lock_mode = einfo->ei_mode;
611 intent->it_lock_handle = lockh->cookie;
612 intent->it_data = req;
614 /* Technically speaking rq_transno must already be zero if
615 * it_status is in error, so the check is a bit redundant */
616 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
617 mdc_clear_replay_flag(req, intent->it_status);
619 /* If we're doing an IT_OPEN which did not result in an actual
620 * successful open, then we need to remove the bit which saves
621 * this request for unconditional replay.
623 * It's important that we do this first! Otherwise we might exit the
624 * function without doing so, and try to replay a failed create
626 if (it->it_op & IT_OPEN && req->rq_replay &&
627 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
628 mdc_clear_replay_flag(req, intent->it_status)
630 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
631 it->it_op, intent->it_disposition, intent->it_status);
633 /* We know what to expect, so we do any byte flipping required here */
634 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
635 struct mdt_body *body;
637 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
639 CERROR ("Can't swab mdt_body\n");
643 if (it_disposition(it, DISP_OPEN_OPEN) &&
644 !it_open_error(DISP_OPEN_OPEN, it)) {
646 * If this is a successful OPEN request, we need to set
647 * replay handler and data early, so that if replay
648 * happens immediately after swabbing below, new reply
649 * is swabbed by that handler correctly.
651 mdc_set_open_replay_data(NULL, NULL, it);
654 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
/* Server may report a larger EA size; track it on the export. */
657 mdc_update_max_ea_from_body(exp, body);
660 * The eadata is opaque; just check that it is there.
661 * Eventually, obd_unpackmd() will check the contents.
663 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
664 body->mbo_eadatasize);
668 /* save lvb data and length in case this is for layout
671 lvb_len = body->mbo_eadatasize;
674 * We save the reply LOV EA in case we have to replay a
675 * create for recovery. If we didn't allocate a large
676 * enough request buffer above we need to reallocate it
677 * here to hold the actual LOV EA.
679 * To not save LOV EA if request is not going to replay
680 * (for example error one).
682 if ((it->it_op & IT_OPEN) && req->rq_replay) {
684 if (req_capsule_get_size(pill, &RMF_EADATA,
686 body->mbo_eadatasize)
687 mdc_realloc_openmsg(req, body);
689 req_capsule_shrink(pill, &RMF_EADATA,
690 body->mbo_eadatasize,
693 req_capsule_set_size(pill, &RMF_EADATA,
695 body->mbo_eadatasize);
/* Copy the reply EA into the client-side buffer for replay. */
697 lmm = req_capsule_client_get(pill, &RMF_EADATA);
700 body->mbo_eadatasize);
704 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
705 struct mdt_remote_perm *perm;
707 LASSERT(client_is_remote(exp));
708 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
709 lustre_swab_mdt_remote_perm);
713 if (body->mbo_valid & OBD_MD_FLMDSCAPA) {
714 struct lustre_capa *capa, *p;
716 capa = req_capsule_server_get(pill, &RMF_CAPA1);
720 if (it->it_op & IT_OPEN) {
721 /* client fid capa will be checked in replay */
722 p = req_capsule_client_get(pill, &RMF_CAPA2);
727 if (body->mbo_valid & OBD_MD_FLOSSCAPA) {
728 struct lustre_capa *capa;
730 capa = req_capsule_server_get(pill, &RMF_CAPA2);
734 } else if (it->it_op & IT_LAYOUT) {
735 /* maybe the lock was granted right away and layout
736 * is packed into RMF_DLM_LVB of req */
737 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
739 lvb_data = req_capsule_server_sized_get(pill,
740 &RMF_DLM_LVB, lvb_len);
741 if (lvb_data == NULL)
746 /* fill in stripe data for layout lock */
747 lock = ldlm_handle2lock(lockh);
748 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
751 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
752 ldlm_it2str(it->it_op), lvb_len);
/* Copy the LVB into a private allocation before installing it on the
 * lock, since the request buffer will be freed. */
754 OBD_ALLOC_LARGE(lmm, lvb_len);
759 memcpy(lmm, lvb_data, lvb_len);
761 /* install lvb_data */
762 lock_res_and_lock(lock);
763 if (lock->l_lvb_data == NULL) {
764 lock->l_lvb_type = LVB_T_LAYOUT;
765 lock->l_lvb_data = lmm;
766 lock->l_lvb_len = lvb_len;
/* Someone else installed an LVB first: free our copy. */
769 unlock_res_and_lock(lock);
771 OBD_FREE_LARGE(lmm, lvb_len);
779 /* We always reserve enough space in the reply packet for a stripe MD, because
780 * we don't know in advance the file type. */
/*
 * Enqueue a metadata lock, optionally carrying an intent (@it != NULL).
 * Chooses the inodebits policy from the intent type, packs the matching
 * intent request, throttles via the rpc lock and request-slot counter,
 * performs the enqueue, and retries create operations indefinitely on
 * -EINPROGRESS (quota).  On success finishes via mdc_finish_enqueue().
 */
781 int mdc_enqueue(struct obd_export *exp,
782 struct ldlm_enqueue_info *einfo,
783 const union ldlm_policy_data *policy,
784 struct lookup_intent *it, struct md_op_data *op_data,
785 struct lustre_handle *lockh, __u64 extra_lock_flags)
787 struct obd_device *obddev = class_exp2obd(exp);
788 struct ptlrpc_request *req = NULL;
789 __u64 flags, saved_flags = extra_lock_flags;
791 struct ldlm_res_id res_id;
792 static const ldlm_policy_data_t lookup_policy =
793 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
794 static const ldlm_policy_data_t update_policy =
795 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
796 static const ldlm_policy_data_t layout_policy =
797 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
798 static const ldlm_policy_data_t getxattr_policy = {
799 .l_inodebits = { MDS_INODELOCK_XATTR } };
800 int generation, resends = 0;
801 struct ldlm_reply *lockrep;
802 enum lvb_type lvb_type = 0;
805 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
807 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the caller must not supply a policy: it is derived
 * from the intent operation below. */
810 LASSERT(policy == NULL);
812 saved_flags |= LDLM_FL_HAS_INTENT;
813 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
814 policy = &update_policy;
815 else if (it->it_op & IT_LAYOUT)
816 policy = &layout_policy;
817 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
818 policy = &getxattr_policy;
820 policy = &lookup_policy;
/* Remember the import generation so resends can detect eviction. */
823 generation = obddev->u.cli.cl_import->imp_generation;
827 /* The only way right now is FLOCK. */
828 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
830 res_id.name[3] = LDLM_FLOCK;
831 } else if (it->it_op & IT_OPEN) {
832 req = mdc_intent_open_pack(exp, it, op_data);
833 } else if (it->it_op & IT_UNLINK) {
834 req = mdc_intent_unlink_pack(exp, it, op_data);
835 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
836 req = mdc_intent_getattr_pack(exp, it, op_data);
837 } else if (it->it_op & IT_READDIR) {
838 req = mdc_enqueue_pack(exp, 0);
839 } else if (it->it_op & IT_LAYOUT) {
840 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
842 req = mdc_intent_layout_pack(exp, it, op_data);
843 lvb_type = LVB_T_LAYOUT;
844 } else if (it->it_op & IT_GETXATTR) {
845 req = mdc_intent_getxattr_pack(exp, it, op_data);
852 RETURN(PTR_ERR(req));
854 if (req != NULL && it && it->it_op & IT_CREAT)
855 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
857 req->rq_no_retry_einprogress = 1;
/* On a resend, pin the request to the recorded import generation and
 * back off the send time by the resend count. */
860 req->rq_generation_set = 1;
861 req->rq_import_generation = generation;
862 req->rq_sent = cfs_time_current_sec() + resends;
865 /* It is important to obtain rpc_lock first (if applicable), so that
866 * threads that are serialised with rpc_lock are not polluting our
867 * rpcs in flight counter. We do not do flock request limiting, though*/
869 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
870 rc = obd_get_request_slot(&obddev->u.cli);
872 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
873 mdc_clear_replay_flag(req, 0);
874 ptlrpc_req_finished(req);
879 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
880 0, lvb_type, lockh, 0);
882 /* For flock requests we immediatelly return without further
883 delay and let caller deal with the rest, since rest of
884 this function metadata processing makes no sense for flock
885 requests anyway. But in case of problem during comms with
886 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
887 can not rely on caller and this mainly for F_UNLCKs
888 (explicits or automatically generated by Kernel to clean
889 current FLocks upon exit) that can't be trashed */
890 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
891 (einfo->ei_type == LDLM_FLOCK) &&
892 (einfo->ei_mode == LCK_NL))
/* Release throttling resources taken before the enqueue. */
897 obd_put_request_slot(&obddev->u.cli);
898 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
901 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
902 "%s: ldlm_cli_enqueue failed: rc = %d\n",
903 obddev->obd_name, rc);
905 mdc_clear_replay_flag(req, rc);
906 ptlrpc_req_finished(req);
910 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
911 LASSERT(lockrep != NULL);
913 lockrep->lock_policy_res2 =
914 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
916 /* Retry the create infinitely when we get -EINPROGRESS from
917 * server. This is required by the new quota design. */
918 if (it && it->it_op & IT_CREAT &&
919 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
920 mdc_clear_replay_flag(req, rc);
921 ptlrpc_req_finished(req);
924 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
925 obddev->obd_name, resends, it->it_op,
926 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
928 if (generation == obddev->u.cli.cl_import->imp_generation) {
/* Import generation changed under us: we were evicted mid-resend. */
931 CDEBUG(D_HA, "resend cross eviction\n");
936 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed: drop the granted lock (if any) and the request. */
938 if (lustre_handle_is_used(lockh)) {
939 ldlm_lock_decref(lockh, einfo->ei_mode);
940 memset(lockh, 0, sizeof(*lockh));
942 ptlrpc_req_finished(req);
/*
 * Interpret the finished intent request for the VFS layer: report
 * server-side execution errors, detect stale fid/name revalidations,
 * take extra request references for successful CREATE/OPEN (balanced in
 * ll_create_node / ll_file_open), and collapse a duplicate lock against
 * an already-held matching lock.
 */
947 static int mdc_finish_intent_lock(struct obd_export *exp,
948 struct ptlrpc_request *request,
949 struct md_op_data *op_data,
950 struct lookup_intent *it,
951 struct lustre_handle *lockh)
953 struct lustre_handle old_lock;
954 struct mdt_body *mdt_body;
955 struct ldlm_lock *lock;
959 LASSERT(request != NULL);
960 LASSERT(request != LP_POISON);
961 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR enqueues carry no intent body to interpret. */
963 if (it->it_op & IT_READDIR)
966 if (!it_disposition(it, DISP_IT_EXECD)) {
967 /* The server failed before it even started executing the
968 * intent, i.e. because it couldn't unpack the request. */
969 LASSERT(it->d.lustre.it_status != 0);
970 RETURN(it->d.lustre.it_status);
972 rc = it_open_error(DISP_IT_EXECD, it);
976 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
977 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
979 /* If we were revalidating a fid/name pair, mark the intent in
980 * case we fail and get called again from lookup */
981 if (fid_is_sane(&op_data->op_fid2) &&
982 it->it_create_mode & M_CHECK_STALE &&
983 it->it_op != IT_GETATTR) {
984 /* Also: did we find the same inode? */
985 /* sever can return one of two fids:
986 * op_fid2 - new allocated fid - if file is created.
987 * op_fid3 - existent fid - if file only open.
988 * op_fid3 is saved in lmv_intent_open */
989 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->mbo_fid1)) &&
990 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->mbo_fid1))) {
991 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
992 "\n", PFID(&op_data->op_fid2),
993 PFID(&op_data->op_fid2),
994 PFID(&mdt_body->mbo_fid1));
999 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1003 /* keep requests around for the multiple phases of the call
1004 * this shows the DISP_XX must guarantee we make it into the call
1006 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1007 it_disposition(it, DISP_OPEN_CREATE) &&
1008 !it_open_error(DISP_OPEN_CREATE, it)) {
1009 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1010 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1012 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1013 it_disposition(it, DISP_OPEN_OPEN) &&
1014 !it_open_error(DISP_OPEN_OPEN, it)) {
1015 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1016 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1017 /* BUG 11546 - eviction in the middle of open rpc processing */
1018 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1021 if (it->it_op & IT_CREAT) {
1022 /* XXX this belongs in ll_create_it */
1023 } else if (it->it_op == IT_OPEN) {
1024 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1026 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1029 /* If we already have a matching lock, then cancel the new
1030 * one. We have to set the data here instead of in
1031 * mdc_enqueue, because we need to use the child's inode as
1032 * the l_ast_data to match, and that's not available until
1033 * intent_finish has performed the iget().) */
1034 lock = ldlm_handle2lock(lockh);
1036 ldlm_policy_data_t policy = lock->l_policy_data;
1037 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the granted lock's resource must match the fid we asked for. */
1039 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
1040 &lock->l_resource->lr_name),
1041 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1042 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
1043 LDLM_LOCK_PUT(lock);
1045 memcpy(&old_lock, lockh, sizeof(*lockh));
1046 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1047 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate found: drop the new lock, keep the old handle. */
1048 ldlm_lock_decref_and_cancel(lockh,
1049 it->d.lustre.it_lock_mode);
1050 memcpy(lockh, &old_lock, sizeof(old_lock));
1051 it->d.lustre.it_lock_handle = lockh->cookie;
1054 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1055 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1056 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether we already hold a lock suitable for the intent @it on
 * @fid.  First tries the lock handle cached in the intent; otherwise
 * matches by inodebits policy chosen from the intent operation.  On a
 * match the intent's lock handle/mode are filled in; otherwise cleared.
 * Returns the matched mode (non-zero) or 0.
 */
1060 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1061 struct lu_fid *fid, __u64 *bits)
1063 /* We could just return 1 immediately, but since we should only
1064 * be called in revalidate_it if we already have a lock, let's
1066 struct ldlm_res_id res_id;
1067 struct lustre_handle lockh;
1068 ldlm_policy_data_t policy;
1072 if (it->d.lustre.it_lock_handle) {
1073 lockh.cookie = it->d.lustre.it_lock_handle;
1074 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1076 fid_build_reg_res_name(fid, &res_id);
1077 switch (it->it_op) {
1079 /* File attributes are held under multiple bits:
1080 * nlink is under lookup lock, size and times are
1081 * under UPDATE lock and recently we've also got
1082 * a separate permissions lock for owner/group/acl that
1083 * were protected by lookup lock before.
1084 * Getattr must provide all of that information,
1085 * so we need to ensure we have all of those locks.
1086 * Unfortunately, if the bits are split across multiple
1087 * locks, there's no easy way to match all of them here,
1088 * so an extra RPC would be performed to fetch all
1089 * of those bits at once for now. */
1090 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1091 * but for old MDTs (< 2.4), permission is covered
1092 * by LOOKUP lock, so it needs to match all bits here.*/
1093 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1094 MDS_INODELOCK_LOOKUP |
/* READDIR case (case label on a line not visible here). */
1098 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1101 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
/* Default: lookup-by-name only needs the LOOKUP bit. */
1104 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1108 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1109 LDLM_IBITS, &policy,
1110 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1115 it->d.lustre.it_lock_handle = lockh.cookie;
1116 it->d.lustre.it_lock_mode = mode;
/* No usable lock: clear any stale handle cached in the intent. */
1118 it->d.lustre.it_lock_handle = 0;
1119 it->d.lustre.it_lock_mode = 0;
1126 * This long block is all about fixing up the lock and request state
1127 * so that it is correct as of the moment _before_ the operation was
1128 * applied; that way, the VFS will think that everything is normal and
1129 * call Lustre's regular VFS methods.
1131 * If we're performing a creation, that means that unless the creation
1132 * failed with EEXIST, we should fake up a negative dentry.
1134 * For everything else, we want to lookup to succeed.
1136 * One additional note: if CREATE or OPEN succeeded, we add an extra
1137 * reference to the request because we need to keep it around until
1138 * ll_create/ll_open gets called.
1140 * The server will return to us, in it_disposition, an indication of
1141 * exactly what d.lustre.it_status refers to.
1143 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1144 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1145 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1146 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1149 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Entry point for intent-based metadata locking: try to revalidate an
 * existing lock for lookup/getattr/readdir on a known fid, allocate a
 * fid for creates if the caller did not, then enqueue via mdc_enqueue()
 * and finish through mdc_finish_intent_lock().
 */
1152 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1153 struct lookup_intent *it, struct ptlrpc_request **reqp,
1154 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1156 struct ldlm_enqueue_info einfo = {
1157 .ei_type = LDLM_IBITS,
1158 .ei_mode = it_to_lock_mode(it),
1159 .ei_cb_bl = cb_blocking,
1160 .ei_cb_cp = ldlm_completion_ast,
1162 struct lustre_handle lockh;
1167 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1168 ", intent: %s flags %#"LPF64"o\n", op_data->op_namelen,
1169 op_data->op_name, PFID(&op_data->op_fid2),
1170 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1174 if (fid_is_sane(&op_data->op_fid2) &&
1175 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1176 /* We could just return 1 immediately, but since we should only
1177 * be called in revalidate_it if we already have a lock, let's
1179 it->d.lustre.it_lock_handle = 0;
1180 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1181 /* Only return failure if it was not GETATTR by cfid
1182 (from inode_revalidate) */
1183 if (rc || op_data->op_namelen != 0)
1187 /* For case if upper layer did not alloc fid, do it now. */
1188 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1189 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1191 CERROR("Can't alloc new fid, rc %d\n", rc);
1196 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
/* The enqueue stored the request in the intent; hand it to the caller. */
1201 *reqp = it->d.lustre.it_data;
1202 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for mdc_intent_getattr_async(): release the
 * request slot, finalize the enqueue (ldlm_cli_enqueue_fini), convert the
 * wire status, run mdc_finish_enqueue()/mdc_finish_intent_lock(), free
 * the heap-allocated einfo, and invoke the caller's completion callback.
 */
1206 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1207 struct ptlrpc_request *req,
1210 struct mdc_getattr_args *ga = args;
1211 struct obd_export *exp = ga->ga_exp;
1212 struct md_enqueue_info *minfo = ga->ga_minfo;
1213 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1214 struct lookup_intent *it;
1215 struct lustre_handle *lockh;
1216 struct obd_device *obddev;
1217 struct ldlm_reply *lockrep;
1218 __u64 flags = LDLM_FL_HAS_INTENT;
1222 lockh = &minfo->mi_lockh;
1224 obddev = class_exp2obd(exp);
/* Balance the slot taken in mdc_intent_getattr_async(). */
1226 obd_put_request_slot(&obddev->u.cli);
1227 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1230 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1231 &flags, NULL, 0, lockh, rc);
1233 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1234 mdc_clear_replay_flag(req, rc);
1238 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1239 LASSERT(lockrep != NULL);
/* Convert the server status from wire byte order / errno space. */
1241 lockrep->lock_policy_res2 =
1242 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1244 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1248 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1252 OBD_FREE_PTR(einfo);
1253 minfo->mi_cb(req, minfo, rc);
1257 int mdc_intent_getattr_async(struct obd_export *exp,
1258 struct md_enqueue_info *minfo,
1259 struct ldlm_enqueue_info *einfo)
1261 struct md_op_data *op_data = &minfo->mi_data;
1262 struct lookup_intent *it = &minfo->mi_it;
1263 struct ptlrpc_request *req;
1264 struct mdc_getattr_args *ga;
1265 struct obd_device *obddev = class_exp2obd(exp);
1266 struct ldlm_res_id res_id;
1267 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1268 * for statahead currently. Consider CMD in future, such two bits
1269 * maybe managed by different MDS, should be adjusted then. */
1270 ldlm_policy_data_t policy = {
1271 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1272 MDS_INODELOCK_UPDATE }
1275 __u64 flags = LDLM_FL_HAS_INTENT;
1278 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1280 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1281 ldlm_it2str(it->it_op), it->it_flags);
1283 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1284 req = mdc_intent_getattr_pack(exp, it, op_data);
1286 RETURN(PTR_ERR(req));
1288 rc = obd_get_request_slot(&obddev->u.cli);
1290 ptlrpc_req_finished(req);
1294 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1295 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1297 obd_put_request_slot(&obddev->u.cli);
1298 ptlrpc_req_finished(req);
1302 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1303 ga = ptlrpc_req_async_args(req);
1305 ga->ga_minfo = minfo;
1306 ga->ga_einfo = einfo;
1308 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1309 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);