4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
/*
 * Arguments handed from mdc_intent_getattr_async() to its reply interpreter,
 * mdc_intent_getattr_async_interpret(), through the request's async-args
 * area: the export, the caller's enqueue info and the LDLM enqueue info.
 */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp;
52 struct md_enqueue_info *ga_minfo;
53 struct ldlm_enqueue_info *ga_einfo;
/*
 * Return the intent status that applies to a given phase of an open.
 *
 * The dispositions are tested from DISP_OPEN_LEASE down to DISP_IT_EXECD.
 * When the intent carries a disposition and \a phase is at least that
 * disposition, it->d.lustre.it_status is returned.
 *
 * The CERROR at the end reports the raw disposition and status.
 * NOTE(review): presumably it is only reached when none of the tested
 * dispositions is set, and the result for a phase below the matched
 * disposition is presumably 0 -- confirm.
 *
 * \param phase  one of the DISP_* values being asked about
 * \param it     lookup intent whose disposition/status are examined
 */
56 int it_open_error(int phase, struct lookup_intent *it)
58 if (it_disposition(it, DISP_OPEN_LEASE)) {
59 if (phase >= DISP_OPEN_LEASE)
60 return it->d.lustre.it_status;
64 if (it_disposition(it, DISP_OPEN_OPEN)) {
65 if (phase >= DISP_OPEN_OPEN)
66 return it->d.lustre.it_status;
71 if (it_disposition(it, DISP_OPEN_CREATE)) {
72 if (phase >= DISP_OPEN_CREATE)
73 return it->d.lustre.it_status;
78 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79 if (phase >= DISP_LOOKUP_EXECD)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_IT_EXECD)) {
86 if (phase >= DISP_IT_EXECD)
87 return it->d.lustre.it_status;
91 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92 it->d.lustre.it_status);
96 EXPORT_SYMBOL(it_open_error);
/*
 * Attach an inode to the resource of the lock identified by \a lockh.
 *
 * The handle is converted to a lock (asserted non-NULL) and, with the
 * resource and lock locked, \a data is stored as the resource's
 * lr_lvb_inode.  If a different inode is already recorded there, it is
 * asserted to be in the I_FREEING state.  The lock's inodebits are copied
 * out through \a bits.
 *
 * NOTE(review): whether the store to *bits is guarded by a NULL check, and
 * where the lock reference taken by ldlm_handle2lock() is dropped, cannot be
 * told from the lines shown here -- confirm.
 */
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
112 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * Look for an existing lock on the resource named by \a fid.
 *
 * The resource name is built from the FID, the requested inodebits in
 * \a policy are masked with the bits returned by exp_connect_ibits(), and
 * the search is delegated to ldlm_lock_match() on the export's namespace.
 * Note that \a policy is modified in place by the masking.
 *
 * NOTE(review): the function presumably returns the ldlm_lock_match()
 * result; the return statement is not visible here -- confirm.
 */
136 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, ldlm_type_t type,
138 ldlm_policy_data_t *policy, ldlm_mode_t mode,
139 struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named by \a fid.
 *
 * Builds the resource name from the FID and forwards policy, mode, flags and
 * the opaque pointer to ldlm_cli_cancel_unused_resource() on the namespace
 * of the export's obd device.
 */
153 int mdc_cancel_unused(struct obd_export *exp,
154 const struct lu_fid *fid,
155 ldlm_policy_data_t *policy,
157 ldlm_cancel_flags_t flags,
160 struct ldlm_res_id res_id;
161 struct obd_device *obd = class_exp2obd(exp);
166 fid_build_reg_res_name(fid, &res_id);
167 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168 policy, mode, flags, opaque);
/*
 * Clear the inode recorded on the LDLM resource of \a fid.
 *
 * The resource is looked up in the export's namespace (asserted non-NULL),
 * its lr_lvb_inode is set to NULL, and the reference obtained by the lookup
 * is dropped again.
 *
 * NOTE(review): handling of a failed ldlm_resource_get() and any locking
 * around the store are not visible in the lines shown here -- confirm.
 */
172 int mdc_null_inode(struct obd_export *exp,
173 const struct lu_fid *fid)
175 struct ldlm_res_id res_id;
176 struct ldlm_resource *res;
177 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
180 LASSERTF(ns != NULL, "no namespace passed\n");
182 fid_build_reg_res_name(fid, &res_id);
184 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
189 res->lr_lvb_inode = NULL;
192 ldlm_resource_putref(res);
/*
 * Run the iterator \a it over the locks of the resource named by \a fid.
 *
 * The resource name is built from the FID and passed, together with the
 * iterator and \a data, to ldlm_resource_iterate() on the export's
 * namespace.  The LDLM_ITER_STOP and LDLM_ITER_CONTINUE results are then
 * mapped to this function's own return values.
 *
 * NOTE(review): the values those two results are mapped to are not visible
 * in the lines shown here -- confirm.
 */
196 /* find any ldlm lock of the inode in mdc
200 int mdc_find_cbdata(struct obd_export *exp,
201 const struct lu_fid *fid,
202 ldlm_iterator_t it, void *data)
204 struct ldlm_res_id res_id;
208 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
209 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
211 if (rc == LDLM_ITER_STOP)
213 else if (rc == LDLM_ITER_CONTINUE)
218 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
220 /* Don't hold error requests for replay. */
/*
 * When the request is currently marked for replay, rq_lock is taken around
 * the update of that state.  Independently, a non-zero rc combined with a
 * non-zero transno is reported through DEBUG_REQ(D_ERROR, ...), since an
 * error reply is not expected to carry a transaction number.
 *
 * NOTE(review): the statement executed under rq_lock (presumably clearing
 * rq_replay) and whatever follows the DEBUG_REQ are not visible here --
 * confirm.
 */
221 if (req->rq_replay) {
222 spin_lock(&req->rq_lock);
224 spin_unlock(&req->rq_lock);
226 if (rc && req->rq_transno != 0) {
227 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/*
 * Grow the open request's buffer segment DLM_INTENT_REC_OFF + 4 to
 * body->mbo_eadatasize bytes via sptlrpc_cli_enlarge_reqbuf().
 *
 * The error branch logs the failure, clears OBD_MD_FLEASIZE in
 * body->mbo_valid and resets body->mbo_eadatasize to 0.
 *
 * NOTE(review): the condition guarding the error branch (presumably a
 * non-zero rc) is not visible in the lines shown here -- confirm.
 */
232 /* Save a large LOV EA into the request buffer so that it is available
233 * for replay. We don't do this in the initial request because the
234 * original request doesn't need this buffer (at most it sends just the
235 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
236 * buffer and may also be difficult to allocate and save a very large
237 * request buffer for each open. (bug 5707)
239 * OOM here may cause recovery failure if lmm is needed (only for the
240 * original open if the MDS crashed just when this client also OOM'd)
241 * but this is incredibly unlikely, and questionable whether the client
242 * could do MDS recovery under OOM anyways... */
243 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
244 struct mdt_body *body)
248 /* FIXME: remove this explicit offset. */
249 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
250 body->mbo_eadatasize);
252 CERROR("Can't enlarge segment %d size to %d\n",
253 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
254 body->mbo_valid &= ~OBD_MD_FLEASIZE;
255 body->mbo_eadatasize = 0;
/*
 * Build the LDLM enqueue request carrying an OPEN intent.
 *
 * Steps visible here:
 *  - the create mode is forced to a regular file (S_IFREG);
 *  - if the child FID (op_fid2) is sane, unused conflicting locks on it are
 *    collected for cancellation, the lock mode being chosen from the
 *    lease/write/truncate/exec flags of the intent;
 *  - for IT_CREAT, unused MDS_INODELOCK_UPDATE locks on the parent (op_fid1)
 *    are collected as well;
 *  - an RQF_LDLM_INTENT_OPEN request is allocated; on the failure path the
 *    collected locks are released and ERR_PTR(-ENOMEM) is returned;
 *  - capability, name and EA buffer sizes are set, the EA buffer being the
 *    larger of the caller's data size and cl_default_mds_easize;
 *  - the request is marked for replay according to the import's
 *    imp_replayable flag;
 *  - the intent opcode and the open body are packed, and the reply buffer
 *    sizes are set (cl_max_mds_easize for the MD, a mdt_remote_perm for
 *    remote clients).
 *
 * NOTE(review): the lock modes selected in the flag tests, the error return
 * after ldlm_prep_enqueue_req() and the final return of the request are not
 * visible in the lines shown here -- confirm.
 */
259 static struct ptlrpc_request *
260 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
261 struct md_op_data *op_data)
263 struct ptlrpc_request *req;
264 struct obd_device *obddev = class_exp2obd(exp);
265 struct ldlm_intent *lit;
266 const void *lmm = op_data->op_data;
267 int lmmsize = op_data->op_data_size;
268 struct list_head cancels = LIST_HEAD_INIT(cancels);
274 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
276 /* XXX: openlock is not cancelled for cross-refs. */
277 /* If inode is known, cancel conflicting OPEN locks. */
278 if (fid_is_sane(&op_data->op_fid2)) {
279 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
280 if (it->it_flags & FMODE_WRITE)
285 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
288 else if (it->it_flags & FMODE_EXEC)
294 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
299 /* If CREATE, cancel parent's UPDATE lock. */
300 if (it->it_op & IT_CREAT)
304 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306 MDS_INODELOCK_UPDATE);
308 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309 &RQF_LDLM_INTENT_OPEN);
311 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312 RETURN(ERR_PTR(-ENOMEM));
315 /* parent capability */
316 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317 /* child capability, reserve the size according to parent capa, it will
318 * be filled after we get the reply */
319 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
321 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322 op_data->op_namelen + 1);
323 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
326 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
328 ptlrpc_request_free(req);
332 spin_lock(&req->rq_lock);
333 req->rq_replay = req->rq_import->imp_replayable;
334 spin_unlock(&req->rq_lock);
336 /* pack the intent */
337 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338 lit->opc = (__u64)it->it_op;
340 /* pack the intended request */
341 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
344 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
345 obddev->u.cli.cl_max_mds_easize);
347 /* for remote client, fetch remote perm for current user */
348 if (client_is_remote(exp))
349 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
350 sizeof(struct mdt_remote_perm));
351 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM enqueue request carrying a GETXATTR intent.
 *
 * An RQF_LDLM_INTENT_GETXATTR request is allocated (ERR_PTR(-ENOMEM) on the
 * allocation failure path), the capability size is set, and the enqueue
 * part is prepared with an empty cancel list (count is 0).  The intent
 * opcode is IT_GETXATTR.  The body is packed for op_fid1 with op_valid, and
 * the three reply buffers RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS are
 * each sized to the import's ocd_max_easize.
 *
 * NOTE(review): the error return after ldlm_prep_enqueue_req() and the
 * final return of the request are not visible in the lines shown here --
 * confirm.
 */
355 static struct ptlrpc_request *
356 mdc_intent_getxattr_pack(struct obd_export *exp,
357 struct lookup_intent *it,
358 struct md_op_data *op_data)
360 struct ptlrpc_request *req;
361 struct ldlm_intent *lit;
362 int rc, count = 0, maxdata;
363 struct list_head cancels = LIST_HEAD_INIT(cancels);
367 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
368 &RQF_LDLM_INTENT_GETXATTR);
370 RETURN(ERR_PTR(-ENOMEM));
372 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
374 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
376 ptlrpc_request_free(req);
380 /* pack the intent */
381 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
382 lit->opc = IT_GETXATTR;
384 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
386 /* pack the intended request */
387 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
388 op_data->op_valid, maxdata, -1, 0);
390 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
391 RCL_SERVER, maxdata);
393 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
394 RCL_SERVER, maxdata);
396 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
397 RCL_SERVER, maxdata);
399 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM enqueue request carrying an UNLINK intent.
 *
 * An RQF_LDLM_INTENT_UNLINK request is allocated (ERR_PTR(-ENOMEM) on the
 * allocation failure path), capability and name sizes are set, and the
 * enqueue part is prepared without any cancel list.  The intent opcode is
 * taken from it->it_op and the unlink body is packed from \a op_data.  The
 * reply buffers RMF_MDT_MD and RMF_ACL are sized from cl_default_mds_easize
 * and cl_default_mds_cookiesize respectively.
 *
 * NOTE(review): the error return after ldlm_prep_enqueue_req() and the
 * final return of the request are not visible in the lines shown here --
 * confirm.
 */
404 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
405 struct lookup_intent *it,
406 struct md_op_data *op_data)
408 struct ptlrpc_request *req;
409 struct obd_device *obddev = class_exp2obd(exp);
410 struct ldlm_intent *lit;
414 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
415 &RQF_LDLM_INTENT_UNLINK);
417 RETURN(ERR_PTR(-ENOMEM));
419 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
420 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
421 op_data->op_namelen + 1);
423 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
425 ptlrpc_request_free(req);
429 /* pack the intent */
430 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
431 lit->opc = (__u64)it->it_op;
433 /* pack the intended request */
434 mdc_unlink_pack(req, op_data);
436 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
437 obddev->u.cli.cl_default_mds_easize);
438 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
439 obddev->u.cli.cl_default_mds_cookiesize);
440 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM enqueue request carrying a GETATTR/LOOKUP intent.
 *
 * The set of attributes requested (\a valid) always includes getattr data,
 * EA sizes, directory EA, MDS capability and MEA; it adds the remote
 * permission flag for remote clients and the ACL flag otherwise.
 *
 * An RQF_LDLM_INTENT_GETATTR request is allocated (ERR_PTR(-ENOMEM) on the
 * allocation failure path), capability and name sizes are set, the enqueue
 * part is prepared without a cancel list, and the intent opcode is taken
 * from it->it_op.  The getattr body is packed with the chosen EA size, which
 * is also used for the RMF_MDT_MD reply buffer; remote clients additionally
 * reserve room for a mdt_remote_perm in RMF_ACL.
 *
 * NOTE(review): the error return after ldlm_prep_enqueue_req() and the
 * final return of the request are not visible in the lines shown here --
 * confirm.
 */
444 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
445 struct lookup_intent *it,
446 struct md_op_data *op_data)
448 struct ptlrpc_request *req;
449 struct obd_device *obddev = class_exp2obd(exp);
450 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
451 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
452 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
453 (client_is_remote(exp) ?
454 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
455 struct ldlm_intent *lit;
460 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
461 &RQF_LDLM_INTENT_GETATTR);
463 RETURN(ERR_PTR(-ENOMEM));
465 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
466 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
467 op_data->op_namelen + 1);
469 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
471 ptlrpc_request_free(req);
475 /* pack the intent */
476 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
477 lit->opc = (__u64)it->it_op;
/* Use the default EA size when one is configured; the maximum EA size is
 * the alternative value assigned below. */
479 if (obddev->u.cli.cl_default_mds_easize > 0)
480 easize = obddev->u.cli.cl_default_mds_easize;
482 easize = obddev->u.cli.cl_max_mds_easize;
484 /* pack the intended request */
485 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
487 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
488 if (client_is_remote(exp))
489 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
490 sizeof(struct mdt_remote_perm));
491 ptlrpc_request_set_replen(req);
/*
 * Build the LDLM enqueue request carrying a LAYOUT intent.
 *
 * An RQF_LDLM_INTENT_LAYOUT request is allocated (ERR_PTR(-ENOMEM) on the
 * allocation failure path) with an empty client-side RMF_EADATA buffer, and
 * the enqueue part is prepared without a cancel list.  The intent opcode is
 * taken from it->it_op and the layout intent opcode is set to
 * LAYOUT_INTENT_ACCESS.  The reply LVB buffer (RMF_DLM_LVB) is sized to
 * cl_default_mds_easize.  The md_op_data argument is not used.
 *
 * NOTE(review): the error return after ldlm_prep_enqueue_req() and the
 * final return of the request are not visible in the lines shown here --
 * confirm.
 */
495 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
496 struct lookup_intent *it,
497 struct md_op_data *unused)
499 struct obd_device *obd = class_exp2obd(exp);
500 struct ptlrpc_request *req;
501 struct ldlm_intent *lit;
502 struct layout_intent *layout;
506 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
507 &RQF_LDLM_INTENT_LAYOUT);
509 RETURN(ERR_PTR(-ENOMEM));
511 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
512 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
514 ptlrpc_request_free(req);
518 /* pack the intent */
519 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
520 lit->opc = (__u64)it->it_op;
522 /* pack the layout intent request */
523 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
524 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
525 * set for replication */
526 layout->li_opc = LAYOUT_INTENT_ACCESS;
528 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
529 obd->u.cli.cl_default_mds_easize);
530 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) RQF_LDLM_ENQUEUE request.
 *
 * The request is allocated (ERR_PTR(-ENOMEM) on the allocation failure
 * path), prepared without a cancel list, and its reply LVB buffer
 * (RMF_DLM_LVB) is sized to \a lvb_len bytes.
 *
 * NOTE(review): the error return after ldlm_prep_enqueue_req() and the
 * final return of the request are not visible in the lines shown here --
 * confirm.
 */
534 static struct ptlrpc_request *
535 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
537 struct ptlrpc_request *req;
541 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
543 RETURN(ERR_PTR(-ENOMEM));
545 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
547 ptlrpc_request_free(req);
551 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
552 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an (intent) enqueue.
 *
 * Visible steps, in order:
 *  - if the request has a transno or is kept for replay, the saved lock
 *    request is flagged LDLM_FL_INTENT_ONLY;
 *  - for ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise the lock
 *    is looked up and, if its l_req_mode differs from einfo->ei_mode, the
 *    reference is moved to the lock's mode and einfo->ei_mode is updated;
 *  - disposition and status from the reply, the lock mode, the lock handle
 *    cookie and the request itself are stored in it->d.lustre;
 *  - the replay flag is cleared when there is no transno or the status is
 *    an error, and for an IT_OPEN that did not end in a successful open;
 *  - for OPEN/UNLINK/LOOKUP/GETATTR intents the mdt_body is fetched and the
 *    EA, remote-permission and capability buffers announced by mbo_valid
 *    are fetched from the reply; for a replayable open the EA is saved in
 *    the request buffer, which is enlarged by mdc_realloc_openmsg() or
 *    shrunk as needed;
 *  - for IT_LAYOUT the LVB buffer of the reply is fetched instead;
 *  - if the lock is a layout lock and layout data was found, a copy of it
 *    is installed as the lock's l_lvb_data (type LVB_T_LAYOUT) unless the
 *    lock already has LVB data.
 *
 * NOTE(review): the error returns taken when a reply buffer is missing, the
 * condition under which the copied layout is freed again, and the release
 * of the lock references taken by ldlm_handle2lock() are not visible in the
 * lines shown here -- confirm.
 */
556 static int mdc_finish_enqueue(struct obd_export *exp,
557 struct ptlrpc_request *req,
558 struct ldlm_enqueue_info *einfo,
559 struct lookup_intent *it,
560 struct lustre_handle *lockh,
563 struct req_capsule *pill = &req->rq_pill;
564 struct ldlm_request *lockreq;
565 struct ldlm_reply *lockrep;
566 struct lustre_intent_data *intent = &it->d.lustre;
567 struct ldlm_lock *lock;
568 void *lvb_data = NULL;
573 /* Similarly, if we're going to replay this request, we don't want to
574 * actually get a lock, just perform the intent. */
575 if (req->rq_transno || req->rq_replay) {
576 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
577 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
580 if (rc == ELDLM_LOCK_ABORTED) {
582 memset(lockh, 0, sizeof(*lockh));
584 } else { /* rc = 0 */
585 lock = ldlm_handle2lock(lockh);
586 LASSERT(lock != NULL);
588 /* If the server gave us back a different lock mode, we should
589 * fix up our variables. */
590 if (lock->l_req_mode != einfo->ei_mode) {
591 ldlm_lock_addref(lockh, lock->l_req_mode);
592 ldlm_lock_decref(lockh, einfo->ei_mode);
593 einfo->ei_mode = lock->l_req_mode;
598 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
599 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
601 intent->it_disposition = (int)lockrep->lock_policy_res1;
602 intent->it_status = (int)lockrep->lock_policy_res2;
603 intent->it_lock_mode = einfo->ei_mode;
604 intent->it_lock_handle = lockh->cookie;
605 intent->it_data = req;
607 /* Technically speaking rq_transno must already be zero if
608 * it_status is in error, so the check is a bit redundant */
609 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
610 mdc_clear_replay_flag(req, intent->it_status);
612 /* If we're doing an IT_OPEN which did not result in an actual
613 * successful open, then we need to remove the bit which saves
614 * this request for unconditional replay.
616 * It's important that we do this first! Otherwise we might exit the
617 * function without doing so, and try to replay a failed create
619 if (it->it_op & IT_OPEN && req->rq_replay &&
620 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
621 mdc_clear_replay_flag(req, intent->it_status);
623 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
624 it->it_op, intent->it_disposition, intent->it_status);
626 /* We know what to expect, so we do any byte flipping required here */
627 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
628 struct mdt_body *body;
630 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
632 CERROR ("Can't swab mdt_body\n");
636 if (it_disposition(it, DISP_OPEN_OPEN) &&
637 !it_open_error(DISP_OPEN_OPEN, it)) {
639 * If this is a successful OPEN request, we need to set
640 * replay handler and data early, so that if replay
641 * happens immediately after swabbing below, new reply
642 * is swabbed by that handler correctly.
644 mdc_set_open_replay_data(NULL, NULL, it);
647 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
650 mdc_update_max_ea_from_body(exp, body);
653 * The eadata is opaque; just check that it is there.
654 * Eventually, obd_unpackmd() will check the contents.
656 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
657 body->mbo_eadatasize);
661 /* save lvb data and length in case this is for layout
664 lvb_len = body->mbo_eadatasize;
667 * We save the reply LOV EA in case we have to replay a
668 * create for recovery. If we didn't allocate a large
669 * enough request buffer above we need to reallocate it
670 * here to hold the actual LOV EA.
672 * To not save LOV EA if request is not going to replay
673 * (for example error one).
675 if ((it->it_op & IT_OPEN) && req->rq_replay) {
677 if (req_capsule_get_size(pill, &RMF_EADATA,
679 body->mbo_eadatasize)
680 mdc_realloc_openmsg(req, body);
682 req_capsule_shrink(pill, &RMF_EADATA,
683 body->mbo_eadatasize,
686 req_capsule_set_size(pill, &RMF_EADATA,
688 body->mbo_eadatasize);
690 lmm = req_capsule_client_get(pill, &RMF_EADATA);
693 body->mbo_eadatasize);
697 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
698 struct mdt_remote_perm *perm;
700 LASSERT(client_is_remote(exp));
701 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
702 lustre_swab_mdt_remote_perm);
706 if (body->mbo_valid & OBD_MD_FLMDSCAPA) {
707 struct lustre_capa *capa, *p;
709 capa = req_capsule_server_get(pill, &RMF_CAPA1);
713 if (it->it_op & IT_OPEN) {
714 /* client fid capa will be checked in replay */
715 p = req_capsule_client_get(pill, &RMF_CAPA2);
720 if (body->mbo_valid & OBD_MD_FLOSSCAPA) {
721 struct lustre_capa *capa;
723 capa = req_capsule_server_get(pill, &RMF_CAPA2);
727 } else if (it->it_op & IT_LAYOUT) {
728 /* maybe the lock was granted right away and layout
729 * is packed into RMF_DLM_LVB of req */
730 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
732 lvb_data = req_capsule_server_sized_get(pill,
733 &RMF_DLM_LVB, lvb_len);
734 if (lvb_data == NULL)
739 /* fill in stripe data for layout lock */
740 lock = ldlm_handle2lock(lockh);
741 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
744 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
745 ldlm_it2str(it->it_op), lvb_len);
747 OBD_ALLOC_LARGE(lmm, lvb_len);
752 memcpy(lmm, lvb_data, lvb_len);
754 /* install lvb_data */
755 lock_res_and_lock(lock);
756 if (lock->l_lvb_data == NULL) {
757 lock->l_lvb_type = LVB_T_LAYOUT;
758 lock->l_lvb_data = lmm;
759 lock->l_lvb_len = lvb_len;
762 unlock_res_and_lock(lock);
764 OBD_FREE_LARGE(lmm, lvb_len);
/*
 * Enqueue an LDLM lock on the resource of op_data->op_fid1, optionally
 * carrying an intent.
 *
 * With an intent (\a it non-NULL) the lock type must be LDLM_IBITS and the
 * caller must not pass a policy; the inodebits policy is chosen from the
 * intent operation (UPDATE for open/unlink/getattr/readdir, LAYOUT for
 * layout, XATTR for get/setxattr, LOOKUP as the remaining choice) and the
 * matching request is built by one of the mdc_intent_*_pack() helpers or
 * mdc_enqueue_pack().  Without an intent only LDLM_FLOCK is accepted.
 *
 * The request is then sent with ldlm_cli_enqueue() while holding the rpc
 * lock and a request slot.  When the server answers a create intent with
 * -EINPROGRESS the request is dropped and, as long as the import generation
 * has not changed, the enqueue is retried.  On success the reply is handed
 * to mdc_finish_enqueue(); if that fails the lock reference and the request
 * are released and the intent's lock handle, lock mode and request pointer
 * are reset.
 *
 * NOTE(review): several branch conditions, labels and return statements of
 * this function are not visible in the lines shown here; the control flow
 * described above is inferred from the visible statements and comments --
 * confirm.
 */
772 /* We always reserve enough space in the reply packet for a stripe MD, because
773 * we don't know in advance the file type. */
774 int mdc_enqueue(struct obd_export *exp,
775 struct ldlm_enqueue_info *einfo,
776 const union ldlm_policy_data *policy,
777 struct lookup_intent *it, struct md_op_data *op_data,
778 struct lustre_handle *lockh, __u64 extra_lock_flags)
780 struct obd_device *obddev = class_exp2obd(exp);
781 struct ptlrpc_request *req = NULL;
782 __u64 flags, saved_flags = extra_lock_flags;
784 struct ldlm_res_id res_id;
785 static const ldlm_policy_data_t lookup_policy =
786 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
787 static const ldlm_policy_data_t update_policy =
788 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
789 static const ldlm_policy_data_t layout_policy =
790 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
791 static const ldlm_policy_data_t getxattr_policy = {
792 .l_inodebits = { MDS_INODELOCK_XATTR } };
793 int generation, resends = 0;
794 struct ldlm_reply *lockrep;
795 enum lvb_type lvb_type = 0;
798 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
800 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
803 LASSERT(policy == NULL);
805 saved_flags |= LDLM_FL_HAS_INTENT;
806 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
807 policy = &update_policy;
808 else if (it->it_op & IT_LAYOUT)
809 policy = &layout_policy;
810 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
811 policy = &getxattr_policy;
813 policy = &lookup_policy;
/* Remember the import generation; it is compared again before a resend of
 * an -EINPROGRESS create (see the "resend cross eviction" message). */
816 generation = obddev->u.cli.cl_import->imp_generation;
820 /* The only way right now is FLOCK. */
821 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
823 res_id.name[3] = LDLM_FLOCK;
824 } else if (it->it_op & IT_OPEN) {
825 req = mdc_intent_open_pack(exp, it, op_data);
826 } else if (it->it_op & IT_UNLINK) {
827 req = mdc_intent_unlink_pack(exp, it, op_data);
828 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
829 req = mdc_intent_getattr_pack(exp, it, op_data);
830 } else if (it->it_op & IT_READDIR) {
831 req = mdc_enqueue_pack(exp, 0);
832 } else if (it->it_op & IT_LAYOUT) {
833 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
835 req = mdc_intent_layout_pack(exp, it, op_data);
836 lvb_type = LVB_T_LAYOUT;
837 } else if (it->it_op & IT_GETXATTR) {
838 req = mdc_intent_getxattr_pack(exp, it, op_data);
845 RETURN(PTR_ERR(req));
847 if (req != NULL && it && it->it_op & IT_CREAT)
848 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
850 req->rq_no_retry_einprogress = 1;
853 req->rq_generation_set = 1;
854 req->rq_import_generation = generation;
855 req->rq_sent = cfs_time_current_sec() + resends;
858 /* It is important to obtain rpc_lock first (if applicable), so that
859 * threads that are serialised with rpc_lock are not polluting our
860 * rpcs in flight counter. We do not do flock request limiting, though*/
862 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
863 rc = obd_get_request_slot(&obddev->u.cli);
865 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
866 mdc_clear_replay_flag(req, 0);
867 ptlrpc_req_finished(req);
872 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
873 0, lvb_type, lockh, 0);
875 /* For flock requests we immediately return without further
876 delay and let caller deal with the rest, since rest of
877 this function metadata processing makes no sense for flock
878 requests anyway. But in case of problem during comms with
879 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
880 can not rely on caller and this mainly for F_UNLCKs
881 (explicits or automatically generated by Kernel to clean
882 current FLocks upon exit) that can't be trashed */
883 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
884 (einfo->ei_type == LDLM_FLOCK) &&
885 (einfo->ei_mode == LCK_NL))
890 obd_put_request_slot(&obddev->u.cli);
891 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
894 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
895 obddev->obd_name, rc);
897 mdc_clear_replay_flag(req, rc);
898 ptlrpc_req_finished(req);
902 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
903 LASSERT(lockrep != NULL);
/* Convert the status carried in the reply with ptlrpc_status_ntoh() before
 * it is compared against -EINPROGRESS below. */
905 lockrep->lock_policy_res2 =
906 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
908 /* Retry the create infinitely when we get -EINPROGRESS from
909 * server. This is required by the new quota design. */
910 if (it && it->it_op & IT_CREAT &&
911 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
912 mdc_clear_replay_flag(req, rc);
913 ptlrpc_req_finished(req);
916 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
917 obddev->obd_name, resends, it->it_op,
918 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
920 if (generation == obddev->u.cli.cl_import->imp_generation) {
923 CDEBUG(D_HA, "resend cross eviction\n");
928 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
930 if (lustre_handle_is_used(lockh)) {
931 ldlm_lock_decref(lockh, einfo->ei_mode);
932 memset(lockh, 0, sizeof(*lockh));
934 ptlrpc_req_finished(req);
936 it->d.lustre.it_lock_handle = 0;
937 it->d.lustre.it_lock_mode = 0;
938 it->d.lustre.it_data = NULL;
/*
 * Finish an intent lock once mdc_finish_enqueue() has filled in the intent.
 *
 * Visible steps, in order:
 *  - sanity assertions on the request pointer and its reply message;
 *  - if the server never started executing the intent (DISP_IT_EXECD not
 *    set), the non-zero it_status is returned;
 *  - the IT_EXECD and LOOKUP_EXECD phases are checked with it_open_error();
 *  - one extra request reference is taken for a successful create
 *    (DISP_ENQ_CREATE_REF) and one for a successful open
 *    (DISP_ENQ_OPEN_REF), each only once;
 *  - if the handle resolves to a lock, its resource name is asserted to
 *    match the FID in the reply body, and ldlm_lock_match() is used to look
 *    for another granted lock with the same policy; when one is found the
 *    new lock is dropped and cancelled and the intent is switched to the
 *    matched lock's handle.
 *
 * NOTE(review): the early-return statements after the it_open_error()
 * calls, the IT_READDIR shortcut's return value and the final return are
 * not visible in the lines shown here -- confirm.
 */
944 static int mdc_finish_intent_lock(struct obd_export *exp,
945 struct ptlrpc_request *request,
946 struct md_op_data *op_data,
947 struct lookup_intent *it,
948 struct lustre_handle *lockh)
950 struct lustre_handle old_lock;
951 struct mdt_body *mdt_body;
952 struct ldlm_lock *lock;
956 LASSERT(request != NULL);
957 LASSERT(request != LP_POISON);
958 LASSERT(request->rq_repmsg != LP_POISON);
960 if (it->it_op & IT_READDIR)
963 if (!it_disposition(it, DISP_IT_EXECD)) {
964 /* The server failed before it even started executing the
965 * intent, i.e. because it couldn't unpack the request. */
966 LASSERT(it->d.lustre.it_status != 0);
967 RETURN(it->d.lustre.it_status);
969 rc = it_open_error(DISP_IT_EXECD, it);
973 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
974 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
976 rc = it_open_error(DISP_LOOKUP_EXECD, it);
980 /* keep requests around for the multiple phases of the call
981 * this shows the DISP_XX must guarantee we make it into the call
983 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
984 it_disposition(it, DISP_OPEN_CREATE) &&
985 !it_open_error(DISP_OPEN_CREATE, it)) {
986 it_set_disposition(it, DISP_ENQ_CREATE_REF);
987 ptlrpc_request_addref(request); /* balanced in ll_create_node */
989 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
990 it_disposition(it, DISP_OPEN_OPEN) &&
991 !it_open_error(DISP_OPEN_OPEN, it)) {
992 it_set_disposition(it, DISP_ENQ_OPEN_REF);
993 ptlrpc_request_addref(request); /* balanced in ll_file_open */
994 /* BUG 11546 - eviction in the middle of open rpc processing */
995 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
998 if (it->it_op & IT_CREAT) {
999 /* XXX this belongs in ll_create_it */
1000 } else if (it->it_op == IT_OPEN) {
1001 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1003 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1006 /* If we already have a matching lock, then cancel the new
1007 * one. We have to set the data here instead of in
1008 * mdc_enqueue, because we need to use the child's inode as
1009 * the l_ast_data to match, and that's not available until
1010 * intent_finish has performed the iget().) */
1011 lock = ldlm_handle2lock(lockh);
1013 ldlm_policy_data_t policy = lock->l_policy_data;
1014 LDLM_DEBUG(lock, "matching against this");
1016 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
1017 &lock->l_resource->lr_name),
1018 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1019 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
1020 LDLM_LOCK_PUT(lock);
1022 memcpy(&old_lock, lockh, sizeof(*lockh));
1023 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1024 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1025 ldlm_lock_decref_and_cancel(lockh,
1026 it->d.lustre.it_lock_mode);
1027 memcpy(lockh, &old_lock, sizeof(old_lock));
1028 it->d.lustre.it_lock_handle = lockh->cookie;
1031 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1032 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1033 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether a usable lock for intent \a it on \a fid is already held.
 *
 * If the intent already carries a lock handle, that handle is revalidated
 * with ldlm_revalidate_lock_handle().  Otherwise the inodebits to look for
 * are chosen from the intent operation (UPDATE | LOOKUP | ... for the
 * getattr case described in the comment below, UPDATE, LAYOUT or LOOKUP for
 * the other cases) and mdc_lock_match() is asked for a granted CR, CW, PR
 * or PW lock.  The intent's lock handle and mode are then either set from
 * the result or reset to 0.
 *
 * NOTE(review): the case labels of the switch, the condition selecting
 * between storing and resetting the handle, and the return value are not
 * visible in the lines shown here -- confirm.
 */
1037 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1038 struct lu_fid *fid, __u64 *bits)
1040 /* We could just return 1 immediately, but since we should only
1041 * be called in revalidate_it if we already have a lock, let's
1043 struct ldlm_res_id res_id;
1044 struct lustre_handle lockh;
1045 ldlm_policy_data_t policy;
1049 if (it->d.lustre.it_lock_handle) {
1050 lockh.cookie = it->d.lustre.it_lock_handle;
1051 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1053 fid_build_reg_res_name(fid, &res_id);
1054 switch (it->it_op) {
1056 /* File attributes are held under multiple bits:
1057 * nlink is under lookup lock, size and times are
1058 * under UPDATE lock and recently we've also got
1059 * a separate permissions lock for owner/group/acl that
1060 * were protected by lookup lock before.
1061 * Getattr must provide all of that information,
1062 * so we need to ensure we have all of those locks.
1063 * Unfortunately, if the bits are split across multiple
1064 * locks, there's no easy way to match all of them here,
1065 * so an extra RPC would be performed to fetch all
1066 * of those bits at once for now. */
1067 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1068 * but for old MDTs (< 2.4), permission is covered
1069 * by LOOKUP lock, so it needs to match all bits here.*/
1070 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1071 MDS_INODELOCK_LOOKUP |
1075 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1078 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1081 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1085 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1086 LDLM_IBITS, &policy,
1087 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1092 it->d.lustre.it_lock_handle = lockh.cookie;
1093 it->d.lustre.it_lock_mode = mode;
1095 it->d.lustre.it_lock_handle = 0;
1096 it->d.lustre.it_lock_mode = 0;
/*
 * Entry point for taking an intent lock.
 *
 * The enqueue info is an LDLM_IBITS lock whose mode comes from
 * it_to_lock_mode(), with \a cb_blocking as blocking callback and
 * ldlm_completion_ast as completion callback.
 *
 * For LOOKUP/GETATTR/READDIR intents on a known child FID (op_fid2), an
 * already held lock is first looked for with mdc_revalidate_lock().  For a
 * create intent without a sane child FID, a new FID is allocated with
 * mdc_fid_alloc().  The intent is then enqueued with mdc_enqueue(), the
 * request stored in the intent is handed back through \a reqp, and
 * mdc_finish_intent_lock() completes the operation.
 *
 * NOTE(review): the early returns after the revalidation, the FID
 * allocation and the enqueue, as well as the final return, are not visible
 * in the lines shown here -- confirm.
 */
1103 * This long block is all about fixing up the lock and request state
1104 * so that it is correct as of the moment _before_ the operation was
1105 * applied; that way, the VFS will think that everything is normal and
1106 * call Lustre's regular VFS methods.
1108 * If we're performing a creation, that means that unless the creation
1109 * failed with EEXIST, we should fake up a negative dentry.
1111 * For everything else, we want to lookup to succeed.
1113 * One additional note: if CREATE or OPEN succeeded, we add an extra
1114 * reference to the request because we need to keep it around until
1115 * ll_create/ll_open gets called.
1117 * The server will return to us, in it_disposition, an indication of
1118 * exactly what d.lustre.it_status refers to.
1120 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1121 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1122 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1123 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1126 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1129 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1130 struct lookup_intent *it, struct ptlrpc_request **reqp,
1131 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1133 struct ldlm_enqueue_info einfo = {
1134 .ei_type = LDLM_IBITS,
1135 .ei_mode = it_to_lock_mode(it),
1136 .ei_cb_bl = cb_blocking,
1137 .ei_cb_cp = ldlm_completion_ast,
1139 struct lustre_handle lockh;
1144 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1145 ", intent: %s flags %#"LPF64"o\n", op_data->op_namelen,
1146 op_data->op_name, PFID(&op_data->op_fid2),
1147 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1151 if (fid_is_sane(&op_data->op_fid2) &&
1152 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1153 /* We could just return 1 immediately, but since we should only
1154 * be called in revalidate_it if we already have a lock, let's
1156 it->d.lustre.it_lock_handle = 0;
1157 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1158 /* Only return failure if it was not GETATTR by cfid
1159 (from inode_revalidate) */
1160 if (rc || op_data->op_namelen != 0)
1164 /* For case if upper layer did not alloc fid, do it now. */
1165 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1166 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1168 CERROR("Can't alloc new fid, rc %d\n", rc);
1173 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1178 *reqp = it->d.lustre.it_data;
1179 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter for requests sent by mdc_intent_getattr_async().
 *
 * \a args is the mdc_getattr_args stored in the request.  The request slot
 * taken at send time is released, the enqueue is completed with
 * ldlm_cli_enqueue_fini(), the status carried in the reply is converted
 * with ptlrpc_status_ntoh(), and the result is processed by
 * mdc_finish_enqueue() followed by mdc_finish_intent_lock().  Finally the
 * enqueue info is freed and the caller's mi_cb callback is invoked with the
 * request, the enqueue info and the resulting rc.
 *
 * NOTE(review): the assignment of \a it, the effect of the
 * OBD_FAIL_MDC_GETATTR_ENQUEUE check and the jumps taken on error are not
 * visible in the lines shown here -- confirm.
 */
1183 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1184 struct ptlrpc_request *req,
1187 struct mdc_getattr_args *ga = args;
1188 struct obd_export *exp = ga->ga_exp;
1189 struct md_enqueue_info *minfo = ga->ga_minfo;
1190 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1191 struct lookup_intent *it;
1192 struct lustre_handle *lockh;
1193 struct obd_device *obddev;
1194 struct ldlm_reply *lockrep;
1195 __u64 flags = LDLM_FL_HAS_INTENT;
1199 lockh = &minfo->mi_lockh;
1201 obddev = class_exp2obd(exp);
1203 obd_put_request_slot(&obddev->u.cli);
1204 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1207 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1208 &flags, NULL, 0, lockh, rc);
1210 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1211 mdc_clear_replay_flag(req, rc);
1215 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1216 LASSERT(lockrep != NULL);
1218 lockrep->lock_policy_res2 =
1219 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1221 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1225 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1229 OBD_FREE_PTR(einfo);
1230 minfo->mi_cb(req, minfo, rc);
/*
 * Send a getattr intent enqueue asynchronously.
 *
 * The request is built with mdc_intent_getattr_pack() from the operation
 * data and intent embedded in \a minfo, using a LOOKUP | UPDATE inodebits
 * policy on the resource of op_fid1.  A request slot is taken, the enqueue
 * is started with ldlm_cli_enqueue() (the lock handle being
 * minfo->mi_lockh), and on the error paths the slot and/or the request are
 * released.  Otherwise \a minfo and \a einfo are stored in the request's
 * async args, mdc_intent_getattr_async_interpret() is installed as the
 * reply interpreter, and the request is handed to ptlrpcd.
 *
 * NOTE(review): the conditions guarding the error paths, the store of the
 * export into the async args and the return value are not visible in the
 * lines shown here -- confirm.
 */
1234 int mdc_intent_getattr_async(struct obd_export *exp,
1235 struct md_enqueue_info *minfo,
1236 struct ldlm_enqueue_info *einfo)
1238 struct md_op_data *op_data = &minfo->mi_data;
1239 struct lookup_intent *it = &minfo->mi_it;
1240 struct ptlrpc_request *req;
1241 struct mdc_getattr_args *ga;
1242 struct obd_device *obddev = class_exp2obd(exp);
1243 struct ldlm_res_id res_id;
1244 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1245 * for statahead currently. Consider CMD in future, such two bits
1246 * maybe managed by different MDS, should be adjusted then. */
1247 ldlm_policy_data_t policy = {
1248 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1249 MDS_INODELOCK_UPDATE }
1252 __u64 flags = LDLM_FL_HAS_INTENT;
1255 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1257 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1258 ldlm_it2str(it->it_op), it->it_flags);
1260 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1261 req = mdc_intent_getattr_pack(exp, it, op_data);
1263 RETURN(PTR_ERR(req));
1265 rc = obd_get_request_slot(&obddev->u.cli);
1267 ptlrpc_req_finished(req);
1271 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1272 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1274 obd_put_request_slot(&obddev->u.cli);
1275 ptlrpc_req_finished(req);
/* Compile-time check that the argument struct fits in the request's
 * async-args area before it is used as such. */
1279 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1280 ga = ptlrpc_req_async_args(req);
1282 ga->ga_minfo = minfo;
1283 ga->ga_einfo = einfo;
1285 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1286 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);