4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
43 # include <liblustre.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
/*
 * Argument bundle stashed in a ptlrpc request's async args and handed to
 * mdc_intent_getattr_async_interpret() when the async getattr enqueue
 * completes (see mdc_intent_getattr_async below).
 * NOTE(review): the closing "};" of this struct is not visible in this
 * extract — source appears partially elided.
 */
55 struct mdc_getattr_args {
56 struct obd_export *ga_exp;
57 struct md_enqueue_info *ga_minfo;
58 struct ldlm_enqueue_info *ga_einfo;
/*
 * Test whether the disposition flag(s) in @flag are set in the intent's
 * server-returned disposition mask. Returns the masked bits (non-zero
 * if any requested flag is set).
 */
61 int it_disposition(struct lookup_intent *it, int flag)
63 return it->d.lustre.it_disposition & flag;
65 EXPORT_SYMBOL(it_disposition);
/* Set the disposition flag(s) in @flag on the intent. */
67 void it_set_disposition(struct lookup_intent *it, int flag)
69 it->d.lustre.it_disposition |= flag;
71 EXPORT_SYMBOL(it_set_disposition);
/* Clear the disposition flag(s) in @flag on the intent. */
73 void it_clear_disposition(struct lookup_intent *it, int flag)
75 it->d.lustre.it_disposition &= ~flag;
77 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Return the intent status for the first executed phase at or before
 * @phase, walking the open phases in order: LEASE, OPEN, CREATE,
 * LOOKUP_EXECD, IT_EXECD. For each phase that the server reported as
 * executed (disposition bit set), the stored it_status is returned only
 * when the caller's @phase has reached that point; otherwise 0 ("no
 * error at this phase yet").
 * NOTE(review): the "return 0" fall-throughs between the if-blocks are
 * elided from this extract.
 */
79 int it_open_error(int phase, struct lookup_intent *it)
81 if (it_disposition(it, DISP_OPEN_LEASE)) {
82 if (phase >= DISP_OPEN_LEASE)
83 return it->d.lustre.it_status;
87 if (it_disposition(it, DISP_OPEN_OPEN)) {
88 if (phase >= DISP_OPEN_OPEN)
89 return it->d.lustre.it_status;
94 if (it_disposition(it, DISP_OPEN_CREATE)) {
95 if (phase >= DISP_OPEN_CREATE)
96 return it->d.lustre.it_status;
101 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102 if (phase >= DISP_LOOKUP_EXECD)
103 return it->d.lustre.it_status;
108 if (it_disposition(it, DISP_IT_EXECD)) {
109 if (phase >= DISP_IT_EXECD)
110 return it->d.lustre.it_status;
/* Reaching here means no phase matched: log disposition for debugging. */
114 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115 it->d.lustre.it_status);
119 EXPORT_SYMBOL(it_open_error);
121 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach the VFS inode (@data) to the DLM lock's resource as its LVB
 * inode, and optionally report the lock's inodebits through @bits.
 * If a different inode is already attached, it must be on its way out
 * (I_FREEING) — asserted below — before being replaced.
 */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
125 struct ldlm_lock *lock;
126 struct inode *new_inode = data;
135 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
137 LASSERT(lock != NULL);
/* Resource + lock spinlocks held while swapping lr_lvb_inode. */
138 lock_res_and_lock(lock);
140 if (lock->l_resource->lr_lvb_inode &&
141 lock->l_resource->lr_lvb_inode != data) {
142 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* Replacing a live inode would be a refcount/aliasing bug. */
143 LASSERTF(old_inode->i_state & I_FREEING,
144 "Found existing inode %p/%lu/%u state %lu in lock: "
145 "setting data to %p/%lu/%u\n", old_inode,
146 old_inode->i_ino, old_inode->i_generation,
148 new_inode, new_inode->i_ino, new_inode->i_generation);
151 lock->l_resource->lr_lvb_inode = new_inode;
/* NOTE(review): the "if (bits)" guard around this store is elided here. */
153 *bits = lock->l_policy_data.l_inodebits.bits;
155 unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on the file identified by @fid
 * matching @type/@policy/@mode. Inodebits the server does not support
 * are masked out of the policy first (LU-4405). Returns the matched
 * lock mode (handle returned via @lockh) or 0 if no match.
 */
161 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
162 const struct lu_fid *fid, ldlm_type_t type,
163 ldlm_policy_data_t *policy, ldlm_mode_t mode,
164 struct lustre_handle *lockh)
166 struct ldlm_res_id res_id;
170 fid_build_reg_res_name(fid, &res_id);
171 /* LU-4405: Clear bits not supported by server */
172 policy->l_inodebits.bits &= exp_connect_ibits(exp);
173 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
174 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused DLM locks on @fid's resource that match @policy
 * (and mode/flags — NOTE(review): the mode/opaque parameter lines are
 * elided from this extract; they are used in the call below).
 */
178 int mdc_cancel_unused(struct obd_export *exp,
179 const struct lu_fid *fid,
180 ldlm_policy_data_t *policy,
182 ldlm_cancel_flags_t flags,
185 struct ldlm_res_id res_id;
186 struct obd_device *obd = class_exp2obd(exp);
191 fid_build_reg_res_name(fid, &res_id);
192 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
193 policy, mode, flags, opaque);
/*
 * Detach any inode pointer from @fid's DLM resource (sets lr_lvb_inode
 * to NULL). Used when the inode is going away so stale pointers are not
 * left behind on the resource.
 */
197 int mdc_null_inode(struct obd_export *exp,
198 const struct lu_fid *fid)
200 struct ldlm_res_id res_id;
201 struct ldlm_resource *res;
202 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
205 LASSERTF(ns != NULL, "no namespace passed\n");
207 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create flag 0); NOTE(review): the NULL-res early return
 * and the lock_res()/unlock_res() pair are elided from this extract. */
209 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
214 res->lr_lvb_inode = NULL;
217 ldlm_resource_putref(res);
221 /* find any ldlm lock of the inode in mdc
 * Iterate the locks on @fid's resource with callback @it/@data.
 * Maps LDLM_ITER_STOP ("found") and LDLM_ITER_CONTINUE ("not found")
 * to the caller-visible return values (the returns themselves are
 * elided from this extract).
 */
225 int mdc_find_cbdata(struct obd_export *exp,
226 const struct lu_fid *fid,
227 ldlm_iterator_t it, void *data)
229 struct ldlm_res_id res_id;
233 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
234 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
236 if (rc == LDLM_ITER_STOP)
238 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Drop the replay flag on a request that ended in error so it is not
 * kept around for recovery replay. Logs if a transno was already
 * assigned despite the error (that would indicate a committed-but-
 * failed operation, which should not happen).
 */
243 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
245 /* Don't hold error requests for replay. */
246 if (req->rq_replay) {
/* rq_replay is modified under rq_lock. */
247 spin_lock(&req->rq_lock);
249 spin_unlock(&req->rq_lock);
251 if (rc && req->rq_transno != 0) {
252 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
257 /* Save a large LOV EA into the request buffer so that it is available
258 * for replay. We don't do this in the initial request because the
259 * original request doesn't need this buffer (at most it sends just the
260 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
261 * buffer and may also be difficult to allocate and save a very large
262 * request buffer for each open. (bug 5707)
264 * OOM here may cause recovery failure if lmm is needed (only for the
265 * original open if the MDS crashed just when this client also OOM'd)
266 * but this is incredibly unlikely, and questionable whether the client
267 * could do MDS recovery under OOM anyways... */
/*
 * Grow the open request's EA segment to hold the server-returned EA of
 * body->eadatasize bytes. On enlarge failure, strip OBD_MD_FLEASIZE
 * from the body so replay simply omits the EA instead of failing.
 */
268 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
269 struct mdt_body *body)
273 /* FIXME: remove this explicit offset. */
274 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* NOTE(review): the size argument and the "if (rc)" line are elided
 * from this extract; the CERROR below is the failure branch. */
277 CERROR("Can't enlarge segment %d size to %d\n",
278 DLM_INTENT_REC_OFF + 4, body->eadatasize);
279 body->valid &= ~OBD_MD_FLEASIZE;
280 body->eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN request: collect conflicting local locks to
 * cancel (child OPEN locks, parent UPDATE lock on create), allocate the
 * request, pack the ldlm intent and the open body, and size the reply
 * buffers. Returns the prepared request or ERR_PTR on failure.
 * Caller passes the EA to send via @lmm/@lmmsize.
 */
284 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
285 struct lookup_intent *it,
286 struct md_op_data *op_data,
287 void *lmm, int lmmsize,
290 struct ptlrpc_request *req;
291 struct obd_device *obddev = class_exp2obd(exp);
292 struct ldlm_intent *lit;
293 CFS_LIST_HEAD(cancels);
/* Open is always for a regular file from the MDS's point of view. */
299 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
301 /* XXX: openlock is not cancelled for cross-refs. */
302 /* If inode is known, cancel conflicting OPEN locks. */
303 if (fid_is_sane(&op_data->op_fid2)) {
304 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
/* Lease and plain open derive different conflicting-lock modes from
 * the open flags; the mode assignments themselves are elided here. */
305 if (it->it_flags & FMODE_WRITE)
310 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
313 else if (it->it_flags & FMODE_EXEC)
319 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
324 /* If CREATE, cancel parent's UPDATE lock. */
325 if (it->it_op & IT_CREAT)
329 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
331 MDS_INODELOCK_UPDATE);
333 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
334 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before bailing. */
336 ldlm_lock_list_put(&cancels, l_bl_ast, count);
337 RETURN(ERR_PTR(-ENOMEM));
340 /* parent capability */
341 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
342 /* child capability, reserve the size according to parent capa, it will
343 * be filled after we get the reply */
344 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
346 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
347 op_data->op_namelen + 1);
348 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
349 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
351 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
353 ptlrpc_request_free(req);
/* Open requests are replayable if the import supports replay. */
357 spin_lock(&req->rq_lock);
358 req->rq_replay = req->rq_import->imp_replayable;
359 spin_unlock(&req->rq_lock);
361 /* pack the intent */
362 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
363 lit->opc = (__u64)it->it_op;
365 /* pack the intended request */
366 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
369 /* for remote client, fetch remote perm for current user */
370 if (client_is_remote(exp))
371 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
372 sizeof(struct mdt_remote_perm));
373 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETXATTR request: pack the intent opcode and a
 * getattr-style body, and size the three reply buffers (EADATA, EAVALS,
 * EAVALS_LENS) to the server's advertised max EA size so any xattr set
 * fits in one reply.
 */
377 static struct ptlrpc_request *
378 mdc_intent_getxattr_pack(struct obd_export *exp,
379 struct lookup_intent *it,
380 struct md_op_data *op_data)
382 struct ptlrpc_request *req;
383 struct ldlm_intent *lit;
384 int rc, count = 0, maxdata;
385 CFS_LIST_HEAD(cancels);
389 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
390 &RQF_LDLM_INTENT_GETXATTR);
392 RETURN(ERR_PTR(-ENOMEM));
394 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
/* count is 0: no local locks need cancelling for getxattr. */
396 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
398 ptlrpc_request_free(req);
402 /* pack the intent */
403 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
404 lit->opc = IT_GETXATTR;
406 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
408 /* pack the intended request */
409 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
410 op_data->op_valid, maxdata, -1, 0);
412 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
413 RCL_SERVER, maxdata);
415 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
416 RCL_SERVER, maxdata);
418 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
419 RCL_SERVER, maxdata);
421 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * unlink body, and reserve reply space for the victim's MD (striping)
 * and unlink cookies so the client can process the removal.
 */
426 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
427 struct lookup_intent *it,
428 struct md_op_data *op_data)
430 struct ptlrpc_request *req;
431 struct obd_device *obddev = class_exp2obd(exp);
432 struct ldlm_intent *lit;
436 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
437 &RQF_LDLM_INTENT_UNLINK);
439 RETURN(ERR_PTR(-ENOMEM));
441 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
442 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
443 op_data->op_namelen + 1);
445 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
447 ptlrpc_request_free(req);
451 /* pack the intent */
452 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
453 lit->opc = (__u64)it->it_op;
455 /* pack the intended request */
456 mdc_unlink_pack(req, op_data);
458 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
459 obddev->u.cli.cl_max_mds_easize);
460 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
461 obddev->u.cli.cl_max_mds_cookiesize);
462 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR (also used for LOOKUP) request: request
 * the full attribute set (EA, dir EA, capabilities, and either remote
 * perms or ACLs depending on client type) and size the reply buffers
 * accordingly.
 */
466 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
467 struct lookup_intent *it,
468 struct md_op_data *op_data)
470 struct ptlrpc_request *req;
471 struct obd_device *obddev = class_exp2obd(exp);
472 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
473 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
474 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
475 (client_is_remote(exp) ?
476 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
477 struct ldlm_intent *lit;
481 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
482 &RQF_LDLM_INTENT_GETATTR);
484 RETURN(ERR_PTR(-ENOMEM));
486 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
487 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
488 op_data->op_namelen + 1);
490 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
492 ptlrpc_request_free(req);
496 /* pack the intent */
497 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
498 lit->opc = (__u64)it->it_op;
500 /* pack the intended request */
501 mdc_getattr_pack(req, valid, it->it_flags, op_data,
502 obddev->u.cli.cl_max_mds_easize);
504 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
505 obddev->u.cli.cl_max_mds_easize);
506 if (client_is_remote(exp))
507 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
508 sizeof(struct mdt_remote_perm));
509 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request used to fetch a file's layout
 * under a layout lock. The layout intent itself is LAYOUT_INTENT_ACCESS
 * (read-style access); the reply LVB buffer is sized for the largest
 * possible layout EA.
 */
513 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
514 struct lookup_intent *it,
515 struct md_op_data *unused)
517 struct obd_device *obd = class_exp2obd(exp);
518 struct ptlrpc_request *req;
519 struct ldlm_intent *lit;
520 struct layout_intent *layout;
524 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
525 &RQF_LDLM_INTENT_LAYOUT);
527 RETURN(ERR_PTR(-ENOMEM));
/* No EA is sent with a layout intent; client-side EADATA is empty. */
529 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
530 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
532 ptlrpc_request_free(req);
536 /* pack the intent */
537 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
538 lit->opc = (__u64)it->it_op;
540 /* pack the layout intent request */
541 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
542 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
543 * set for replication */
544 layout->li_opc = LAYOUT_INTENT_ACCESS;
546 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
547 obd->u.cli.cl_max_mds_easize);
548 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM_ENQUEUE request with a server-side
 * LVB buffer of @lvb_len bytes. Used e.g. for readdir locks where no
 * metadata intent is needed.
 */
552 static struct ptlrpc_request *
553 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
555 struct ptlrpc_request *req;
559 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
561 RETURN(ERR_PTR(-ENOMEM));
563 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
565 ptlrpc_request_free(req);
569 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
570 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue: copy the server's
 * disposition/status into the intent, fix up the lock mode if the
 * server granted a different one, swab and validate the reply body,
 * stash the LOV EA for open replay, and for layout intents install the
 * returned layout as the lock's LVB. Called with @rc from
 * ldlm_cli_enqueue(); ELDLM_LOCK_ABORTED means intent-only (no lock).
 */
574 static int mdc_finish_enqueue(struct obd_export *exp,
575 struct ptlrpc_request *req,
576 struct ldlm_enqueue_info *einfo,
577 struct lookup_intent *it,
578 struct lustre_handle *lockh,
581 struct req_capsule *pill = &req->rq_pill;
582 struct ldlm_request *lockreq;
583 struct ldlm_reply *lockrep;
584 struct lustre_intent_data *intent = &it->d.lustre;
585 struct ldlm_lock *lock;
586 void *lvb_data = NULL;
591 /* Similarly, if we're going to replay this request, we don't want to
592 * actually get a lock, just perform the intent. */
593 if (req->rq_transno || req->rq_replay) {
594 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
595 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
598 if (rc == ELDLM_LOCK_ABORTED) {
/* Intent executed but no lock granted: clear the handle. */
600 memset(lockh, 0, sizeof(*lockh));
602 } else { /* rc = 0 */
603 lock = ldlm_handle2lock(lockh);
604 LASSERT(lock != NULL);
606 /* If the server gave us back a different lock mode, we should
607 * fix up our variables. */
608 if (lock->l_req_mode != einfo->ei_mode) {
609 ldlm_lock_addref(lockh, lock->l_req_mode);
610 ldlm_lock_decref(lockh, einfo->ei_mode);
611 einfo->ei_mode = lock->l_req_mode;
616 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
617 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Transfer the server's verdict into the intent for upper layers. */
619 intent->it_disposition = (int)lockrep->lock_policy_res1;
620 intent->it_status = (int)lockrep->lock_policy_res2;
621 intent->it_lock_mode = einfo->ei_mode;
622 intent->it_lock_handle = lockh->cookie;
623 intent->it_data = req;
625 /* Technically speaking rq_transno must already be zero if
626 * it_status is in error, so the check is a bit redundant */
627 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
628 mdc_clear_replay_flag(req, intent->it_status);
630 /* If we're doing an IT_OPEN which did not result in an actual
631 * successful open, then we need to remove the bit which saves
632 * this request for unconditional replay.
634 * It's important that we do this first! Otherwise we might exit the
635 * function without doing so, and try to replay a failed create
637 if (it->it_op & IT_OPEN && req->rq_replay &&
638 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
639 mdc_clear_replay_flag(req, intent->it_status);
641 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
642 it->it_op, intent->it_disposition, intent->it_status);
644 /* We know what to expect, so we do any byte flipping required here */
645 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
646 struct mdt_body *body;
648 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
650 CERROR ("Can't swab mdt_body\n");
654 if (it_disposition(it, DISP_OPEN_OPEN) &&
655 !it_open_error(DISP_OPEN_OPEN, it)) {
657 * If this is a successful OPEN request, we need to set
658 * replay handler and data early, so that if replay
659 * happens immediately after swabbing below, new reply
660 * is swabbed by that handler correctly.
662 mdc_set_open_replay_data(NULL, NULL, it);
665 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
668 mdc_update_max_ea_from_body(exp, body);
671 * The eadata is opaque; just check that it is there.
672 * Eventually, obd_unpackmd() will check the contents.
674 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
679 /* save lvb data and length in case this is for layout
682 lvb_len = body->eadatasize;
685 * We save the reply LOV EA in case we have to replay a
686 * create for recovery. If we didn't allocate a large
687 * enough request buffer above we need to reallocate it
688 * here to hold the actual LOV EA.
690 * To not save LOV EA if request is not going to replay
691 * (for example error one).
693 if ((it->it_op & IT_OPEN) && req->rq_replay) {
695 if (req_capsule_get_size(pill, &RMF_EADATA,
698 mdc_realloc_openmsg(req, body);
700 req_capsule_shrink(pill, &RMF_EADATA,
704 req_capsule_set_size(pill, &RMF_EADATA,
708 lmm = req_capsule_client_get(pill, &RMF_EADATA);
710 memcpy(lmm, eadata, body->eadatasize);
714 if (body->valid & OBD_MD_FLRMTPERM) {
715 struct mdt_remote_perm *perm;
717 LASSERT(client_is_remote(exp));
718 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
719 lustre_swab_mdt_remote_perm);
723 if (body->valid & OBD_MD_FLMDSCAPA) {
724 struct lustre_capa *capa, *p;
726 capa = req_capsule_server_get(pill, &RMF_CAPA1);
730 if (it->it_op & IT_OPEN) {
731 /* client fid capa will be checked in replay */
732 p = req_capsule_client_get(pill, &RMF_CAPA2);
737 if (body->valid & OBD_MD_FLOSSCAPA) {
738 struct lustre_capa *capa;
740 capa = req_capsule_server_get(pill, &RMF_CAPA2);
744 } else if (it->it_op & IT_LAYOUT) {
745 /* maybe the lock was granted right away and layout
746 * is packed into RMF_DLM_LVB of req */
747 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
749 lvb_data = req_capsule_server_sized_get(pill,
750 &RMF_DLM_LVB, lvb_len);
751 if (lvb_data == NULL)
756 /* fill in stripe data for layout lock */
757 lock = ldlm_handle2lock(lockh);
758 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
761 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
762 ldlm_it2str(it->it_op), lvb_len);
/* Copy into a fresh buffer; ownership passes to the lock if it has
 * no LVB yet, otherwise the copy is freed below. */
764 OBD_ALLOC_LARGE(lmm, lvb_len);
769 memcpy(lmm, lvb_data, lvb_len);
771 /* install lvb_data */
772 lock_res_and_lock(lock);
773 if (lock->l_lvb_data == NULL) {
774 lock->l_lvb_type = LVB_T_LAYOUT;
775 lock->l_lvb_data = lmm;
776 lock->l_lvb_len = lvb_len;
779 unlock_res_and_lock(lock);
781 OBD_FREE_LARGE(lmm, lvb_len);
789 /* We always reserve enough space in the reply packet for a stripe MD, because
790 * we don't know in advance the file type. */
/*
 * Main MDC enqueue path: select the inodebits policy from the intent
 * op, build the matching intent request, throttle via the rpc_lock and
 * in-flight counter, send the enqueue, and hand the reply to
 * mdc_finish_enqueue(). IT_CREAT is retried (resent) indefinitely on
 * -EINPROGRESS per the quota design, as long as the import generation
 * is unchanged. A NULL @it means a flock enqueue with the policy hidden
 * in @lmm (lmmsize == 0).
 */
791 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
792 struct lookup_intent *it, struct md_op_data *op_data,
793 struct lustre_handle *lockh, void *lmm, int lmmsize,
794 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
796 struct obd_device *obddev = class_exp2obd(exp);
797 struct ptlrpc_request *req = NULL;
798 __u64 flags, saved_flags = extra_lock_flags;
800 struct ldlm_res_id res_id;
801 static const ldlm_policy_data_t lookup_policy =
802 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
803 static const ldlm_policy_data_t update_policy =
804 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
805 static const ldlm_policy_data_t layout_policy =
806 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
807 static const ldlm_policy_data_t getxattr_policy = {
808 .l_inodebits = { MDS_INODELOCK_XATTR } };
809 ldlm_policy_data_t const *policy = &lookup_policy;
810 int generation, resends = 0;
811 struct ldlm_reply *lockrep;
812 enum lvb_type lvb_type = 0;
815 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
818 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* Pick the inodebits policy that matches the intent operation. */
821 saved_flags |= LDLM_FL_HAS_INTENT;
822 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
823 policy = &update_policy;
824 else if (it->it_op & IT_LAYOUT)
825 policy = &layout_policy;
826 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
827 policy = &getxattr_policy;
830 LASSERT(reqp == NULL);
/* Remember import generation to detect eviction across resends. */
832 generation = obddev->u.cli.cl_import->imp_generation;
836 /* The only way right now is FLOCK, in this case we hide flock
837 policy as lmm, but lmmsize is 0 */
838 LASSERT(lmm && lmmsize == 0);
839 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
841 policy = (ldlm_policy_data_t *)lmm;
842 res_id.name[3] = LDLM_FLOCK;
843 } else if (it->it_op & IT_OPEN) {
844 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
846 policy = &update_policy;
847 einfo->ei_cbdata = NULL;
849 } else if (it->it_op & IT_UNLINK) {
850 req = mdc_intent_unlink_pack(exp, it, op_data);
851 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
852 req = mdc_intent_getattr_pack(exp, it, op_data);
853 } else if (it->it_op & IT_READDIR) {
854 req = mdc_enqueue_pack(exp, 0);
855 } else if (it->it_op & IT_LAYOUT) {
856 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
858 req = mdc_intent_layout_pack(exp, it, op_data);
859 lvb_type = LVB_T_LAYOUT;
860 } else if (it->it_op & IT_GETXATTR) {
861 req = mdc_intent_getxattr_pack(exp, it, op_data);
868 RETURN(PTR_ERR(req));
870 if (req != NULL && it && it->it_op & IT_CREAT)
871 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
873 req->rq_no_retry_einprogress = 1;
/* On a resend, keep the same import generation and delay the send. */
876 req->rq_generation_set = 1;
877 req->rq_import_generation = generation;
878 req->rq_sent = cfs_time_current_sec() + resends;
881 /* It is important to obtain rpc_lock first (if applicable), so that
882 * threads that are serialised with rpc_lock are not polluting our
883 * rpcs in flight counter. We do not do flock request limiting, though*/
885 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
886 rc = mdc_enter_request(&obddev->u.cli);
888 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
889 mdc_clear_replay_flag(req, 0);
890 ptlrpc_req_finished(req);
895 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
896 0, lvb_type, lockh, 0);
898 /* For flock requests we immediatelly return without further
899 delay and let caller deal with the rest, since rest of
900 this function metadata processing makes no sense for flock
901 requests anyway. But in case of problem during comms with
902 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
903 can not rely on caller and this mainly for F_UNLCKs
904 (explicits or automatically generated by Kernel to clean
905 current FLocks upon exit) that can't be trashed */
906 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
907 (einfo->ei_type == LDLM_FLOCK) &&
908 (einfo->ei_mode == LCK_NL))
913 mdc_exit_request(&obddev->u.cli);
914 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
917 CERROR("ldlm_cli_enqueue: %d\n", rc);
918 mdc_clear_replay_flag(req, rc);
919 ptlrpc_req_finished(req);
923 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
924 LASSERT(lockrep != NULL);
/* policy_res2 carries the server's status; convert to host errno. */
926 lockrep->lock_policy_res2 =
927 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
929 /* Retry the create infinitely when we get -EINPROGRESS from
930 * server. This is required by the new quota design. */
931 if (it && it->it_op & IT_CREAT &&
932 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
933 mdc_clear_replay_flag(req, rc);
934 ptlrpc_req_finished(req);
937 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
938 obddev->obd_name, resends, it->it_op,
939 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
941 if (generation == obddev->u.cli.cl_import->imp_generation) {
/* Import was evicted/reconnected: abandon the resend loop. */
944 CDEBUG(D_HA, "resend cross eviction\n");
949 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On finish failure, release any lock reference we still hold. */
951 if (lustre_handle_is_used(lockh)) {
952 ldlm_lock_decref(lockh, einfo->ei_mode);
953 memset(lockh, 0, sizeof(*lockh));
955 ptlrpc_req_finished(req);
/*
 * Finalize an intent lock after the enqueue completed: propagate intent
 * errors, detect stale fid/name revalidation, pin the request for the
 * later create/open phases (extra refs balanced in llite), and if an
 * equivalent lock already exists, cancel the new one and reuse the old
 * handle. See the long comment above mdc_intent_lock for the contract.
 */
960 static int mdc_finish_intent_lock(struct obd_export *exp,
961 struct ptlrpc_request *request,
962 struct md_op_data *op_data,
963 struct lookup_intent *it,
964 struct lustre_handle *lockh)
966 struct lustre_handle old_lock;
967 struct mdt_body *mdt_body;
968 struct ldlm_lock *lock;
972 LASSERT(request != NULL);
973 LASSERT(request != LP_POISON);
974 LASSERT(request->rq_repmsg != LP_POISON);
976 if (it->it_op & IT_READDIR)
979 if (!it_disposition(it, DISP_IT_EXECD)) {
980 /* The server failed before it even started executing the
981 * intent, i.e. because it couldn't unpack the request. */
982 LASSERT(it->d.lustre.it_status != 0);
983 RETURN(it->d.lustre.it_status);
985 rc = it_open_error(DISP_IT_EXECD, it);
989 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
990 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
992 /* If we were revalidating a fid/name pair, mark the intent in
993 * case we fail and get called again from lookup */
994 if (fid_is_sane(&op_data->op_fid2) &&
995 it->it_create_mode & M_CHECK_STALE &&
996 it->it_op != IT_GETATTR) {
997 /* Also: did we find the same inode? */
998 /* sever can return one of two fids:
999 * op_fid2 - new allocated fid - if file is created.
1000 * op_fid3 - existent fid - if file only open.
1001 * op_fid3 is saved in lmv_intent_open */
1002 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1003 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1004 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1005 "\n", PFID(&op_data->op_fid2),
1006 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1011 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1015 /* keep requests around for the multiple phases of the call
1016 * this shows the DISP_XX must guarantee we make it into the call
1018 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1019 it_disposition(it, DISP_OPEN_CREATE) &&
1020 !it_open_error(DISP_OPEN_CREATE, it)) {
1021 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1022 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1024 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1025 it_disposition(it, DISP_OPEN_OPEN) &&
1026 !it_open_error(DISP_OPEN_OPEN, it)) {
1027 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1028 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1029 /* BUG 11546 - eviction in the middle of open rpc processing */
1030 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1033 if (it->it_op & IT_CREAT) {
1034 /* XXX this belongs in ll_create_it */
1035 } else if (it->it_op == IT_OPEN) {
1036 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1038 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1041 /* If we already have a matching lock, then cancel the new
1042 * one. We have to set the data here instead of in
1043 * mdc_enqueue, because we need to use the child's inode as
1044 * the l_ast_data to match, and that's not available until
1045 * intent_finish has performed the iget().) */
1046 lock = ldlm_handle2lock(lockh);
1048 ldlm_policy_data_t policy = lock->l_policy_data;
1049 LDLM_DEBUG(lock, "matching against this");
/* Sanity: granted lock's resource must match the fid the server
 * reported in the reply body. */
1051 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1052 &lock->l_resource->lr_name),
1053 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1054 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1055 LDLM_LOCK_PUT(lock);
1057 memcpy(&old_lock, lockh, sizeof(*lockh));
1058 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1059 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* An older equivalent lock exists: drop the new one and keep
 * the old handle in the intent. */
1060 ldlm_lock_decref_and_cancel(lockh,
1061 it->d.lustre.it_lock_mode);
1062 memcpy(lockh, &old_lock, sizeof(old_lock));
1063 it->d.lustre.it_lock_handle = lockh->cookie;
1066 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1067 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1068 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether a usable DLM lock already covers @fid for this intent.
 * First try the handle already stored in the intent; otherwise match by
 * fid with inodebits chosen per intent op (the switch-case labels are
 * elided from this extract). On success the matched handle/mode are
 * stored back into the intent; on failure they are zeroed.
 */
1072 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1073 struct lu_fid *fid, __u64 *bits)
1075 /* We could just return 1 immediately, but since we should only
1076 * be called in revalidate_it if we already have a lock, let's
1078 struct ldlm_res_id res_id;
1079 struct lustre_handle lockh;
1080 ldlm_policy_data_t policy;
1084 if (it->d.lustre.it_lock_handle) {
1085 lockh.cookie = it->d.lustre.it_lock_handle;
1086 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1088 fid_build_reg_res_name(fid, &res_id);
1089 switch (it->it_op) {
1091 /* File attributes are held under multiple bits:
1092 * nlink is under lookup lock, size and times are
1093 * under UPDATE lock and recently we've also got
1094 * a separate permissions lock for owner/group/acl that
1095 * were protected by lookup lock before.
1096 * Getattr must provide all of that information,
1097 * so we need to ensure we have all of those locks.
1098 * Unfortunately, if the bits are split across multiple
1099 * locks, there's no easy way to match all of them here,
1100 * so an extra RPC would be performed to fetch all
1101 * of those bits at once for now. */
1102 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1103 * but for old MDTs (< 2.4), permission is covered
1104 * by LOOKUP lock, so it needs to match all bits here.*/
1105 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1106 MDS_INODELOCK_LOOKUP |
1110 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1113 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1116 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any of the read/write modes already granted locally. */
1120 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1121 LDLM_IBITS, &policy,
1122 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1127 it->d.lustre.it_lock_handle = lockh.cookie;
1128 it->d.lustre.it_lock_mode = mode;
1130 it->d.lustre.it_lock_handle = 0;
1131 it->d.lustre.it_lock_mode = 0;
1138 * This long block is all about fixing up the lock and request state
1139 * so that it is correct as of the moment _before_ the operation was
1140 * applied; that way, the VFS will think that everything is normal and
1141 * call Lustre's regular VFS methods.
1143 * If we're performing a creation, that means that unless the creation
1144 * failed with EEXIST, we should fake up a negative dentry.
1146 * For everything else, we want to lookup to succeed.
1148 * One additional note: if CREATE or OPEN succeeded, we add an extra
1149 * reference to the request because we need to keep it around until
1150 * ll_create/ll_open gets called.
1152 * The server will return to us, in it_disposition, an indication of
1153 * exactly what d.lustre.it_status refers to.
1155 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1156 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1157 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1158 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1161 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Entry point from the LMV/llite layers: try to revalidate an existing
 * lock for lookup/getattr-style intents; otherwise allocate a fid for
 * create if needed, enqueue the intent lock, and finish via
 * mdc_finish_intent_lock(). The resulting request is returned through
 * @reqp (from it_data).
 */
1164 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1165 void *lmm, int lmmsize, struct lookup_intent *it,
1166 int lookup_flags, struct ptlrpc_request **reqp,
1167 ldlm_blocking_callback cb_blocking,
1168 __u64 extra_lock_flags)
1170 struct ldlm_enqueue_info einfo = {
1171 .ei_type = LDLM_IBITS,
1172 .ei_mode = it_to_lock_mode(it),
1173 .ei_cb_bl = cb_blocking,
1174 .ei_cb_cp = ldlm_completion_ast,
1176 struct lustre_handle lockh;
1181 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1182 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1183 op_data->op_name, PFID(&op_data->op_fid2),
1184 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1188 if (fid_is_sane(&op_data->op_fid2) &&
1189 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1190 /* We could just return 1 immediately, but since we should only
1191 * be called in revalidate_it if we already have a lock, let's
1193 it->d.lustre.it_lock_handle = 0;
1194 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1195 /* Only return failure if it was not GETATTR by cfid
1196 (from inode_revalidate) */
1197 if (rc || op_data->op_namelen != 0)
1201 /* For case if upper layer did not alloc fid, do it now. */
1202 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1203 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1205 CERROR("Can't alloc new fid, rc %d\n", rc);
1209 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
/* The enqueue stored the reply request in it_data; hand it back. */
1214 *reqp = it->d.lustre.it_data;
1215 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for the async getattr enqueue issued by
 * mdc_intent_getattr_async(): release the in-flight slot, finish the
 * client-side enqueue, run the common finish paths, then free the
 * enqueue info and invoke the caller's completion callback (mi_cb).
 */
1219 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1220 struct ptlrpc_request *req,
1223 struct mdc_getattr_args *ga = args;
1224 struct obd_export *exp = ga->ga_exp;
1225 struct md_enqueue_info *minfo = ga->ga_minfo;
1226 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1227 struct lookup_intent *it;
1228 struct lustre_handle *lockh;
1229 struct obd_device *obddev;
1230 struct ldlm_reply *lockrep;
1231 __u64 flags = LDLM_FL_HAS_INTENT;
1235 lockh = &minfo->mi_lockh;
1237 obddev = class_exp2obd(exp);
/* Balance the mdc_enter_request() done before the request was sent. */
1239 mdc_exit_request(&obddev->u.cli);
1240 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1243 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1244 &flags, NULL, 0, lockh, rc);
1246 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1247 mdc_clear_replay_flag(req, rc);
1251 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1252 LASSERT(lockrep != NULL);
/* Convert the server's wire status to a host errno. */
1254 lockrep->lock_policy_res2 =
1255 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1257 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1261 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* einfo was allocated by the async submitter; freed here in all cases. */
1265 OBD_FREE_PTR(einfo);
1266 minfo->mi_cb(req, minfo, rc);
1270 int mdc_intent_getattr_async(struct obd_export *exp,
1271 struct md_enqueue_info *minfo,
1272 struct ldlm_enqueue_info *einfo)
1274 struct md_op_data *op_data = &minfo->mi_data;
1275 struct lookup_intent *it = &minfo->mi_it;
1276 struct ptlrpc_request *req;
1277 struct mdc_getattr_args *ga;
1278 struct obd_device *obddev = class_exp2obd(exp);
1279 struct ldlm_res_id res_id;
1280 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1281 * for statahead currently. Consider CMD in future, such two bits
1282 * maybe managed by different MDS, should be adjusted then. */
1283 ldlm_policy_data_t policy = {
1284 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1285 MDS_INODELOCK_UPDATE }
1288 __u64 flags = LDLM_FL_HAS_INTENT;
1291 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1292 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1293 ldlm_it2str(it->it_op), it->it_flags);
1295 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1296 req = mdc_intent_getattr_pack(exp, it, op_data);
1298 RETURN(PTR_ERR(req));
1300 rc = mdc_enter_request(&obddev->u.cli);
1302 ptlrpc_req_finished(req);
1306 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1307 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1309 mdc_exit_request(&obddev->u.cli);
1310 ptlrpc_req_finished(req);
1314 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1315 ga = ptlrpc_req_async_args(req);
1317 ga->ga_minfo = minfo;
1318 ga->ga_einfo = einfo;
1320 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1321 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);