4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/*
 * Context bundle stored in the ptlrpc request's async args so that
 * mdc_intent_getattr_async_interpret() can finish an asynchronous
 * getattr enqueue started by mdc_intent_getattr_async().
 */
56 struct mdc_getattr_args {
        /* export the getattr request was sent on */
57 struct obd_export *ga_exp;
        /* caller-supplied enqueue info; mi_cb is invoked on completion */
58 struct md_enqueue_info *ga_minfo;
        /* LDLM enqueue info; freed by the interpret callback */
59 struct ldlm_enqueue_info *ga_einfo;
/*
 * Return non-zero if any of the disposition bits in @flag were set by the
 * server in the intent reply (see mdc_finish_enqueue(), which copies
 * lock_policy_res1 into it_disposition).
 */
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
/* Set the disposition bit(s) in @flag on the intent. */
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
/* Clear the disposition bit(s) in @flag on the intent. */
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Map an open-intent reply to the error status of a given phase.
 *
 * The dispositions are tested from most- to least-specific
 * (OPEN_OPEN, OPEN_CREATE, LOOKUP_EXECD, IT_EXECD); the server status
 * is returned for the first disposition that was executed, provided the
 * caller's @phase is at or beyond it.  If no disposition matches, the
 * combination is logged via CERROR (the final return is not visible in
 * this extract).
 */
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_OPEN)) {
83 if (phase >= DISP_OPEN_OPEN)
84 return it->d.lustre.it_status;
89 if (it_disposition(it, DISP_OPEN_CREATE)) {
90 if (phase >= DISP_OPEN_CREATE)
91 return it->d.lustre.it_status;
96 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97 if (phase >= DISP_LOOKUP_EXECD)
98 return it->d.lustre.it_status;
103 if (it_disposition(it, DISP_IT_EXECD)) {
104 if (phase >= DISP_IT_EXECD)
105 return it->d.lustre.it_status;
109 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110 it->d.lustre.it_status);
114 EXPORT_SYMBOL(it_open_error);
116 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach @data (an inode pointer) as the LVB inode of the resource behind
 * @lockh, and report the lock's inodebits through the (not visible here)
 * __u64 *bits out-parameter.  If the resource already caches a different
 * inode, that inode must be being freed (I_FREEING) — anything else is a
 * fatal inconsistency (LASSERTF).
 */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
120 struct ldlm_lock *lock;
121 struct inode *new_inode = data;
130 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
132 LASSERT(lock != NULL);
133 lock_res_and_lock(lock);
135 if (lock->l_resource->lr_lvb_inode &&
136 lock->l_resource->lr_lvb_inode != data) {
137 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
138 LASSERTF(old_inode->i_state & I_FREEING,
139 "Found existing inode %p/%lu/%u state %lu in lock: "
140 "setting data to %p/%lu/%u\n", old_inode,
141 old_inode->i_ino, old_inode->i_generation,
143 new_inode, new_inode->i_ino, new_inode->i_generation);
146 lock->l_resource->lr_lvb_inode = new_inode;
148 *bits = lock->l_policy_data.l_inodebits.bits;
150 unlock_res_and_lock(lock);
/*
 * Search this export's namespace for an already-granted lock on @fid that
 * matches @type/@policy/@mode.  On a match the handle is stored in @lockh
 * and the matched mode is returned (via the ldlm_lock_match() result).
 */
156 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
157 const struct lu_fid *fid, ldlm_type_t type,
158 ldlm_policy_data_t *policy, ldlm_mode_t mode,
159 struct lustre_handle *lockh)
161 struct ldlm_res_id res_id;
165 fid_build_reg_res_name(fid, &res_id);
166 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
167 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named by @fid that match @policy,
 * forwarding to ldlm_cli_cancel_unused_resource().  (The `mode` and
 * `opaque` arguments used below come from parameter lines not visible in
 * this extract.)
 */
171 int mdc_cancel_unused(struct obd_export *exp,
172 const struct lu_fid *fid,
173 ldlm_policy_data_t *policy,
175 ldlm_cancel_flags_t flags,
178 struct ldlm_res_id res_id;
179 struct obd_device *obd = class_exp2obd(exp);
184 fid_build_reg_res_name(fid, &res_id);
185 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
186 policy, mode, flags, opaque);
/*
 * Drop the cached LVB inode pointer on @fid's resource, if that resource
 * exists in this export's namespace.  Used so a stale inode is no longer
 * referenced from the lock resource.
 */
190 int mdc_null_inode(struct obd_export *exp,
191 const struct lu_fid *fid)
193 struct ldlm_res_id res_id;
194 struct ldlm_resource *res;
195 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
198 LASSERTF(ns != NULL, "no namespace passed\n");
200 fid_build_reg_res_name(fid, &res_id);
        /* lookup only — do not create the resource if absent */
202 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
207 res->lr_lvb_inode = NULL;
210 ldlm_resource_putref(res);
214 /* find any ldlm lock of the inode in mdc
/*
 * Iterate the locks on @fid's resource with iterator @it/@data.  The
 * LDLM_ITER_STOP / LDLM_ITER_CONTINUE results are translated into the
 * function's return value (the actual returns fall on lines not visible
 * in this extract).
 */
218 int mdc_find_cbdata(struct obd_export *exp,
219 const struct lu_fid *fid,
220 ldlm_iterator_t it, void *data)
222 struct ldlm_res_id res_id;
226 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
227 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
229 if (rc == LDLM_ITER_STOP)
231 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Clear rq_replay on @req (under rq_lock) so a failed request is not
 * kept for replay; complain if the server assigned a transno to a
 * request that nonetheless failed with @rc.
 */
236 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
238 /* Don't hold error requests for replay. */
239 if (req->rq_replay) {
240 spin_lock(&req->rq_lock);
242 spin_unlock(&req->rq_lock);
244 if (rc && req->rq_transno != 0) {
245 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
250 /* Save a large LOV EA into the request buffer so that it is available
251 * for replay. We don't do this in the initial request because the
252 * original request doesn't need this buffer (at most it sends just the
253 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
254 * buffer and may also be difficult to allocate and save a very large
255 * request buffer for each open. (bug 5707)
257 * OOM here may cause recovery failure if lmm is needed (only for the
258 * original open if the MDS crashed just when this client also OOM'd)
259 * but this is incredibly unlikely, and questionable whether the client
260 * could do MDS recovery under OOM anyways... */
261 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
262 struct mdt_body *body)
266 /* FIXME: remove this explicit offset. */
        /* grow the EA segment to body->eadatasize; on failure, give up on
         * saving the EA: clear OBD_MD_FLEASIZE and zero eadatasize so the
         * caller does not try to copy into a buffer we don't have */
267 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
270 CERROR("Can't enlarge segment %d size to %d\n",
271 DLM_INTENT_REC_OFF + 4, body->eadatasize);
272 body->valid &= ~OBD_MD_FLEASIZE;
273 body->eadatasize = 0;
/*
 * Build an RQF_LDLM_INTENT_OPEN request for an open intent:
 *  - cancel conflicting OPEN locks on the child (op_fid2) when its fid is
 *    already known, and the parent's UPDATE lock when creating;
 *  - pack the ldlm intent opcode and the embedded open request;
 *  - reserve reply buffers (including remote perms for remote clients).
 * Returns the prepared request, or ERR_PTR(-ENOMEM) on allocation failure.
 */
277 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
278 struct lookup_intent *it,
279 struct md_op_data *op_data,
280 void *lmm, int lmmsize,
283 struct ptlrpc_request *req;
284 struct obd_device *obddev = class_exp2obd(exp);
285 struct ldlm_intent *lit;
286 CFS_LIST_HEAD(cancels);
        /* open always operates on a regular file as far as mode is concerned */
292 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
294 /* XXX: openlock is not cancelled for cross-refs. */
295 /* If inode is known, cancel conflicting OPEN locks. */
296 if (fid_is_sane(&op_data->op_fid2)) {
297 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
300 else if (it->it_flags & FMODE_EXEC)
305 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
310 /* If CREATE, cancel parent's UPDATE lock. */
311 if (it->it_op & IT_CREAT)
315 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
317 MDS_INODELOCK_UPDATE);
319 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
320 &RQF_LDLM_INTENT_OPEN);
        /* allocation failed: put back the locks collected for cancellation */
322 ldlm_lock_list_put(&cancels, l_bl_ast, count);
323 RETURN(ERR_PTR(-ENOMEM));
326 /* parent capability */
327 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
328 /* child capability, reserve the size according to parent capa, it will
329 * be filled after we get the reply */
330 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
332 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
333 op_data->op_namelen + 1);
334 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
335 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
337 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
339 ptlrpc_request_free(req);
        /* open requests are kept for replay when the import is replayable */
343 spin_lock(&req->rq_lock);
344 req->rq_replay = req->rq_import->imp_replayable;
345 spin_unlock(&req->rq_lock);
347 /* pack the intent */
348 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
349 lit->opc = (__u64)it->it_op;
351 /* pack the intended request */
352 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
355 /* for remote client, fetch remote perm for current user */
356 if (client_is_remote(exp))
357 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
358 sizeof(struct mdt_remote_perm));
359 ptlrpc_request_set_replen(req);
/*
 * Build an RQF_LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * embedded unlink request, and reserve reply room for the largest possible
 * MD and unlink cookies.  Returns ERR_PTR(-ENOMEM) on allocation failure.
 */
363 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
364 struct lookup_intent *it,
365 struct md_op_data *op_data)
367 struct ptlrpc_request *req;
368 struct obd_device *obddev = class_exp2obd(exp);
369 struct ldlm_intent *lit;
373 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
374 &RQF_LDLM_INTENT_UNLINK);
376 RETURN(ERR_PTR(-ENOMEM));
378 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
379 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
380 op_data->op_namelen + 1);
382 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
384 ptlrpc_request_free(req);
388 /* pack the intent */
389 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
390 lit->opc = (__u64)it->it_op;
392 /* pack the intended request */
393 mdc_unlink_pack(req, op_data);
395 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
396 obddev->u.cli.cl_max_mds_easize);
397 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
398 obddev->u.cli.cl_max_mds_cookiesize);
399 ptlrpc_request_set_replen(req);
/*
 * Build an RQF_LDLM_INTENT_GETATTR request asking for attributes, EA,
 * capabilities, and either remote permissions (remote client) or ACLs.
 * Returns ERR_PTR(-ENOMEM) on allocation failure.
 */
403 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
404 struct lookup_intent *it,
405 struct md_op_data *op_data)
407 struct ptlrpc_request *req;
408 struct obd_device *obddev = class_exp2obd(exp);
409 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
410 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
411 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
412 (client_is_remote(exp) ?
413 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
414 struct ldlm_intent *lit;
418 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
419 &RQF_LDLM_INTENT_GETATTR);
421 RETURN(ERR_PTR(-ENOMEM));
423 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
424 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
425 op_data->op_namelen + 1);
427 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
429 ptlrpc_request_free(req);
433 /* pack the intent */
434 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
435 lit->opc = (__u64)it->it_op;
437 /* pack the intended request */
438 mdc_getattr_pack(req, valid, it->it_flags, op_data,
439 obddev->u.cli.cl_max_mds_easize);
441 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
442 obddev->u.cli.cl_max_mds_easize);
443 if (client_is_remote(exp))
444 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
445 sizeof(struct mdt_remote_perm));
446 ptlrpc_request_set_replen(req);
/*
 * Build an RQF_LDLM_INTENT_LAYOUT request for a layout lock.  The layout
 * intent opcode is always LAYOUT_INTENT_ACCESS here; reply room for the
 * layout LVB is sized by cl_max_mds_easize.  @unused is not referenced.
 */
450 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
451 struct lookup_intent *it,
452 struct md_op_data *unused)
454 struct obd_device *obd = class_exp2obd(exp);
455 struct ptlrpc_request *req;
456 struct ldlm_intent *lit;
457 struct layout_intent *layout;
461 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462 &RQF_LDLM_INTENT_LAYOUT);
464 RETURN(ERR_PTR(-ENOMEM));
466 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
467 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
469 ptlrpc_request_free(req);
473 /* pack the intent */
474 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
475 lit->opc = (__u64)it->it_op;
477 /* pack the layout intent request */
478 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
479 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
480 * set for replication */
481 layout->li_opc = LAYOUT_INTENT_ACCESS;
483 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
484 obd->u.cli.cl_max_mds_easize);
485 ptlrpc_request_set_replen(req);
/*
 * Build a plain RQF_LDLM_ENQUEUE request (no intent) with @lvb_len bytes
 * reserved in the reply for the LVB.  Used for IT_READDIR in mdc_enqueue().
 */
489 static struct ptlrpc_request *
490 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
492 struct ptlrpc_request *req;
496 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
498 RETURN(ERR_PTR(-ENOMEM));
500 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
502 ptlrpc_request_free(req);
506 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
507 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue:
 *  - mark replayed/replayable requests LDLM_FL_INTENT_ONLY so replay does
 *    not re-acquire the lock;
 *  - fix up einfo->ei_mode if the server granted a different mode;
 *  - copy disposition/status/handle from the ldlm_reply into the intent;
 *  - drop the replay flag for opens that did not actually succeed;
 *  - swab the mdt_body and, for replayable opens, save the returned LOV EA
 *    back into the request buffer (enlarging/shrinking it as needed);
 *  - handle remote permissions and MDS/OSS capabilities in the reply;
 *  - for IT_LAYOUT, copy the layout LVB out of the reply and install it as
 *    the lock's l_lvb_data (first writer wins; loser frees its copy).
 */
511 static int mdc_finish_enqueue(struct obd_export *exp,
512 struct ptlrpc_request *req,
513 struct ldlm_enqueue_info *einfo,
514 struct lookup_intent *it,
515 struct lustre_handle *lockh,
518 struct req_capsule *pill = &req->rq_pill;
519 struct ldlm_request *lockreq;
520 struct ldlm_reply *lockrep;
521 struct lustre_intent_data *intent = &it->d.lustre;
522 struct ldlm_lock *lock;
523 void *lvb_data = NULL;
528 /* Similarly, if we're going to replay this request, we don't want to
529 * actually get a lock, just perform the intent. */
530 if (req->rq_transno || req->rq_replay) {
531 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
532 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
535 if (rc == ELDLM_LOCK_ABORTED) {
        /* intent executed but no lock was granted — clear the handle */
537 memset(lockh, 0, sizeof(*lockh));
539 } else { /* rc = 0 */
540 lock = ldlm_handle2lock(lockh);
541 LASSERT(lock != NULL);
543 /* If the server gave us back a different lock mode, we should
544 * fix up our variables. */
545 if (lock->l_req_mode != einfo->ei_mode) {
546 ldlm_lock_addref(lockh, lock->l_req_mode);
547 ldlm_lock_decref(lockh, einfo->ei_mode);
548 einfo->ei_mode = lock->l_req_mode;
553 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
554 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
556 intent->it_disposition = (int)lockrep->lock_policy_res1;
557 intent->it_status = (int)lockrep->lock_policy_res2;
558 intent->it_lock_mode = einfo->ei_mode;
559 intent->it_lock_handle = lockh->cookie;
560 intent->it_data = req;
562 /* Technically speaking rq_transno must already be zero if
563 * it_status is in error, so the check is a bit redundant */
564 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
565 mdc_clear_replay_flag(req, intent->it_status);
567 /* If we're doing an IT_OPEN which did not result in an actual
568 * successful open, then we need to remove the bit which saves
569 * this request for unconditional replay.
571 * It's important that we do this first! Otherwise we might exit the
572 * function without doing so, and try to replay a failed create
574 if (it->it_op & IT_OPEN && req->rq_replay &&
575 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
576 mdc_clear_replay_flag(req, intent->it_status);
578 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
579 it->it_op, intent->it_disposition, intent->it_status);
581 /* We know what to expect, so we do any byte flipping required here */
582 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
583 struct mdt_body *body;
585 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
587 CERROR ("Can't swab mdt_body\n");
591 if (it_disposition(it, DISP_OPEN_OPEN) &&
592 !it_open_error(DISP_OPEN_OPEN, it)) {
594 * If this is a successful OPEN request, we need to set
595 * replay handler and data early, so that if replay
596 * happens immediately after swabbing below, new reply
597 * is swabbed by that handler correctly.
599 mdc_set_open_replay_data(NULL, NULL, req);
602 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
605 mdc_update_max_ea_from_body(exp, body);
608 * The eadata is opaque; just check that it is there.
609 * Eventually, obd_unpackmd() will check the contents.
611 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
616 /* save lvb data and length in case this is for layout
619 lvb_len = body->eadatasize;
622 * We save the reply LOV EA in case we have to replay a
623 * create for recovery. If we didn't allocate a large
624 * enough request buffer above we need to reallocate it
625 * here to hold the actual LOV EA.
627 * To not save LOV EA if request is not going to replay
628 * (for example error one).
630 if ((it->it_op & IT_OPEN) && req->rq_replay) {
632 if (req_capsule_get_size(pill, &RMF_EADATA,
635 mdc_realloc_openmsg(req, body);
637 req_capsule_shrink(pill, &RMF_EADATA,
641 req_capsule_set_size(pill, &RMF_EADATA,
645 lmm = req_capsule_client_get(pill, &RMF_EADATA);
647 memcpy(lmm, eadata, body->eadatasize);
651 if (body->valid & OBD_MD_FLRMTPERM) {
652 struct mdt_remote_perm *perm;
654 LASSERT(client_is_remote(exp));
655 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
656 lustre_swab_mdt_remote_perm);
660 if (body->valid & OBD_MD_FLMDSCAPA) {
661 struct lustre_capa *capa, *p;
663 capa = req_capsule_server_get(pill, &RMF_CAPA1);
667 if (it->it_op & IT_OPEN) {
668 /* client fid capa will be checked in replay */
669 p = req_capsule_client_get(pill, &RMF_CAPA2);
674 if (body->valid & OBD_MD_FLOSSCAPA) {
675 struct lustre_capa *capa;
677 capa = req_capsule_server_get(pill, &RMF_CAPA2);
681 } else if (it->it_op & IT_LAYOUT) {
682 /* maybe the lock was granted right away and layout
683 * is packed into RMF_DLM_LVB of req */
684 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
686 lvb_data = req_capsule_server_sized_get(pill,
687 &RMF_DLM_LVB, lvb_len);
688 if (lvb_data == NULL)
693 /* fill in stripe data for layout lock */
694 lock = ldlm_handle2lock(lockh);
695 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
698 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
699 ldlm_it2str(it->it_op), lvb_len);
701 OBD_ALLOC_LARGE(lmm, lvb_len);
706 memcpy(lmm, lvb_data, lvb_len);
708 /* install lvb_data */
709 lock_res_and_lock(lock);
710 if (lock->l_lvb_data == NULL) {
711 lock->l_lvb_data = lmm;
712 lock->l_lvb_len = lvb_len;
715 unlock_res_and_lock(lock);
        /* another thread already installed an LVB — free our copy */
717 OBD_FREE_LARGE(lmm, lvb_len);
725 /* We always reserve enough space in the reply packet for a stripe MD, because
726 * we don't know in advance the file type. */
/*
 * Enqueue an LDLM (usually intent) lock on op_fid1:
 *  - choose the inodebits policy from the intent op (LOOKUP by default,
 *    UPDATE for UNLINK/GETATTR/READDIR and OPEN, LAYOUT for IT_LAYOUT);
 *  - pack the appropriate intent request (open/unlink/getattr/layout or a
 *    plain enqueue for readdir); a NULL intent is the flock case, where
 *    @lmm secretly carries the flock policy and lmmsize == 0;
 *  - serialize through the rpc_lock and rpcs-in-flight counter, call
 *    ldlm_cli_enqueue(), and retry forever on -EINPROGRESS for creates
 *    (as long as the import generation is unchanged);
 *  - finish via mdc_finish_enqueue(); on its failure, drop any lock
 *    reference taken and free the request.
 */
727 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
728 struct lookup_intent *it, struct md_op_data *op_data,
729 struct lustre_handle *lockh, void *lmm, int lmmsize,
730 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
732 struct obd_device *obddev = class_exp2obd(exp);
733 struct ptlrpc_request *req = NULL;
734 __u64 flags, saved_flags = extra_lock_flags;
736 struct ldlm_res_id res_id;
737 static const ldlm_policy_data_t lookup_policy =
738 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
739 static const ldlm_policy_data_t update_policy =
740 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
741 static const ldlm_policy_data_t layout_policy =
742 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
743 ldlm_policy_data_t const *policy = &lookup_policy;
744 int generation, resends = 0;
745 struct ldlm_reply *lockrep;
746 enum lvb_type lvb_type = 0;
749 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
752 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
755 saved_flags |= LDLM_FL_HAS_INTENT;
756 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
757 policy = &update_policy;
758 else if (it->it_op & IT_LAYOUT)
759 policy = &layout_policy;
762 LASSERT(reqp == NULL);
        /* remember the import generation so resends can detect eviction */
764 generation = obddev->u.cli.cl_import->imp_generation;
768 /* The only way right now is FLOCK, in this case we hide flock
769 policy as lmm, but lmmsize is 0 */
770 LASSERT(lmm && lmmsize == 0);
771 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
773 policy = (ldlm_policy_data_t *)lmm;
774 res_id.name[3] = LDLM_FLOCK;
775 } else if (it->it_op & IT_OPEN) {
776 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
778 policy = &update_policy;
779 einfo->ei_cbdata = NULL;
781 } else if (it->it_op & IT_UNLINK) {
782 req = mdc_intent_unlink_pack(exp, it, op_data);
783 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
784 req = mdc_intent_getattr_pack(exp, it, op_data);
785 } else if (it->it_op & IT_READDIR) {
786 req = mdc_enqueue_pack(exp, 0);
787 } else if (it->it_op & IT_LAYOUT) {
788 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
791 req = mdc_intent_layout_pack(exp, it, op_data);
792 lvb_type = LVB_T_LAYOUT;
799 RETURN(PTR_ERR(req));
801 if (req != NULL && it && it->it_op & IT_CREAT)
802 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
804 req->rq_no_retry_einprogress = 1;
807 req->rq_generation_set = 1;
808 req->rq_import_generation = generation;
809 req->rq_sent = cfs_time_current_sec() + resends;
812 /* It is important to obtain rpc_lock first (if applicable), so that
813 * threads that are serialised with rpc_lock are not polluting our
814 * rpcs in flight counter. We do not do flock request limiting, though*/
816 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
817 rc = mdc_enter_request(&obddev->u.cli);
819 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
820 mdc_clear_replay_flag(req, 0);
821 ptlrpc_req_finished(req);
826 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
827 0, lvb_type, lockh, 0);
829 /* For flock requests we immediatelly return without further
830 delay and let caller deal with the rest, since rest of
831 this function metadata processing makes no sense for flock
836 mdc_exit_request(&obddev->u.cli);
837 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
840 CERROR("ldlm_cli_enqueue: %d\n", rc);
841 mdc_clear_replay_flag(req, rc);
842 ptlrpc_req_finished(req);
846 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
847 LASSERT(lockrep != NULL);
849 /* Retry the create infinitely when we get -EINPROGRESS from
850 * server. This is required by the new quota design. */
851 if (it && it->it_op & IT_CREAT &&
852 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
853 mdc_clear_replay_flag(req, rc);
854 ptlrpc_req_finished(req);
857 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
858 obddev->obd_name, resends, it->it_op,
859 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
861 if (generation == obddev->u.cli.cl_import->imp_generation) {
864 CDEBUG(D_HA, "resend cross eviction\n");
869 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        /* on finish failure, release the lock reference (if any) and the req */
871 if (lustre_handle_is_used(lockh)) {
872 ldlm_lock_decref(lockh, einfo->ei_mode);
873 memset(lockh, 0, sizeof(*lockh));
875 ptlrpc_req_finished(req);
/*
 * Translate the state left by mdc_enqueue() into what the VFS layers
 * expect:
 *  - fail immediately if the server never executed the intent;
 *  - mark revalidations DISP_ENQ_COMPLETE so a subsequent lookup can
 *    reuse this request; detect a stale fid/name pair by comparing the
 *    fids the server returned;
 *  - take extra request references (DISP_ENQ_CREATE_REF / OPEN_REF) that
 *    are balanced later in ll_create_node / ll_file_open;
 *  - if a matching lock already exists (ldlm_lock_match with LCK_NL),
 *    cancel the new one and keep the old handle in the intent.
 */
880 static int mdc_finish_intent_lock(struct obd_export *exp,
881 struct ptlrpc_request *request,
882 struct md_op_data *op_data,
883 struct lookup_intent *it,
884 struct lustre_handle *lockh)
886 struct lustre_handle old_lock;
887 struct mdt_body *mdt_body;
888 struct ldlm_lock *lock;
892 LASSERT(request != NULL);
893 LASSERT(request != LP_POISON);
894 LASSERT(request->rq_repmsg != LP_POISON);
896 if (!it_disposition(it, DISP_IT_EXECD)) {
897 /* The server failed before it even started executing the
898 * intent, i.e. because it couldn't unpack the request. */
899 LASSERT(it->d.lustre.it_status != 0);
900 RETURN(it->d.lustre.it_status);
902 rc = it_open_error(DISP_IT_EXECD, it);
906 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
907 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
909 /* If we were revalidating a fid/name pair, mark the intent in
910 * case we fail and get called again from lookup */
911 if (fid_is_sane(&op_data->op_fid2) &&
912 it->it_create_mode & M_CHECK_STALE &&
913 it->it_op != IT_GETATTR) {
914 it_set_disposition(it, DISP_ENQ_COMPLETE);
916 /* Also: did we find the same inode? */
917 /* sever can return one of two fids:
918 * op_fid2 - new allocated fid - if file is created.
919 * op_fid3 - existent fid - if file only open.
920 * op_fid3 is saved in lmv_intent_open */
921 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
922 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
923 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
924 "\n", PFID(&op_data->op_fid2),
925 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
930 rc = it_open_error(DISP_LOOKUP_EXECD, it);
934 /* keep requests around for the multiple phases of the call
935 * this shows the DISP_XX must guarantee we make it into the call
937 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
938 it_disposition(it, DISP_OPEN_CREATE) &&
939 !it_open_error(DISP_OPEN_CREATE, it)) {
940 it_set_disposition(it, DISP_ENQ_CREATE_REF);
941 ptlrpc_request_addref(request); /* balanced in ll_create_node */
943 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
944 it_disposition(it, DISP_OPEN_OPEN) &&
945 !it_open_error(DISP_OPEN_OPEN, it)) {
946 it_set_disposition(it, DISP_ENQ_OPEN_REF);
947 ptlrpc_request_addref(request); /* balanced in ll_file_open */
948 /* BUG 11546 - eviction in the middle of open rpc processing */
949 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
952 if (it->it_op & IT_CREAT) {
953 /* XXX this belongs in ll_create_it */
954 } else if (it->it_op == IT_OPEN) {
955 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
957 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
960 /* If we already have a matching lock, then cancel the new
961 * one. We have to set the data here instead of in
962 * mdc_enqueue, because we need to use the child's inode as
963 * the l_ast_data to match, and that's not available until
964 * intent_finish has performed the iget().) */
965 lock = ldlm_handle2lock(lockh);
967 ldlm_policy_data_t policy = lock->l_policy_data;
968 LDLM_DEBUG(lock, "matching against this");
970 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
971 &lock->l_resource->lr_name),
972 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
973 (unsigned long)lock->l_resource->lr_name.name[0],
974 (unsigned long)lock->l_resource->lr_name.name[1],
975 (unsigned long)lock->l_resource->lr_name.name[2],
976 (unsigned long)fid_seq(&mdt_body->fid1),
977 (unsigned long)fid_oid(&mdt_body->fid1),
978 (unsigned long)fid_ver(&mdt_body->fid1));
981 memcpy(&old_lock, lockh, sizeof(*lockh));
982 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
983 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
984 ldlm_lock_decref_and_cancel(lockh,
985 it->d.lustre.it_lock_mode);
986 memcpy(lockh, &old_lock, sizeof(old_lock));
987 it->d.lustre.it_lock_handle = lockh->cookie;
990 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
991 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
992 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether a usable lock already exists for @fid.  If the intent
 * carries a lock handle, revalidate it directly; otherwise match against
 * the namespace using inodebits chosen by it_op (UPDATE / LAYOUT / LOOKUP
 * as the cases below show).  On success the handle and mode are stored in
 * the intent; on failure they are zeroed.
 */
996 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
997 struct lu_fid *fid, __u64 *bits)
999 /* We could just return 1 immediately, but since we should only
1000 * be called in revalidate_it if we already have a lock, let's
1002 struct ldlm_res_id res_id;
1003 struct lustre_handle lockh;
1004 ldlm_policy_data_t policy;
1008 if (it->d.lustre.it_lock_handle) {
1009 lockh.cookie = it->d.lustre.it_lock_handle;
1010 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1012 fid_build_reg_res_name(fid, &res_id);
1013 switch (it->it_op) {
1015 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1018 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1021 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1024 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1025 LDLM_FL_BLOCK_GRANTED, &res_id,
1026 LDLM_IBITS, &policy,
1027 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1031 it->d.lustre.it_lock_handle = lockh.cookie;
1032 it->d.lustre.it_lock_mode = mode;
1034 it->d.lustre.it_lock_handle = 0;
1035 it->d.lustre.it_lock_mode = 0;
1042 * This long block is all about fixing up the lock and request state
1043 * so that it is correct as of the moment _before_ the operation was
1044 * applied; that way, the VFS will think that everything is normal and
1045 * call Lustre's regular VFS methods.
1047 * If we're performing a creation, that means that unless the creation
1048 * failed with EEXIST, we should fake up a negative dentry.
1050 * For everything else, we want to lookup to succeed.
1052 * One additional note: if CREATE or OPEN succeeded, we add an extra
1053 * reference to the request because we need to keep it around until
1054 * ll_create/ll_open gets called.
1056 * The server will return to us, in it_disposition, an indication of
1057 * exactly what d.lustre.it_status refers to.
1059 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1060 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1061 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1062 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1065 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1068 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1069 void *lmm, int lmmsize, struct lookup_intent *it,
1070 int lookup_flags, struct ptlrpc_request **reqp,
1071 ldlm_blocking_callback cb_blocking,
1072 __u64 extra_lock_flags)
1074 struct lustre_handle lockh;
1079 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1080 ", intent: %s flags %#o\n", op_data->op_namelen,
1081 op_data->op_name, PFID(&op_data->op_fid2),
1082 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
        /* fast path: for LOOKUP/GETATTR with a known fid, try to
         * revalidate an existing lock before enqueueing a new one */
1086 if (fid_is_sane(&op_data->op_fid2) &&
1087 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1088 /* We could just return 1 immediately, but since we should only
1089 * be called in revalidate_it if we already have a lock, let's
1091 it->d.lustre.it_lock_handle = 0;
1092 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1093 /* Only return failure if it was not GETATTR by cfid
1094 (from inode_revalidate) */
1095 if (rc || op_data->op_namelen != 0)
1099 /* lookup_it may be called only after revalidate_it has run, because
1100 * revalidate_it cannot return errors, only zero. Returning zero causes
1101 * this call to lookup, which *can* return an error.
1103 * We only want to execute the request associated with the intent one
1104 * time, however, so don't send the request again. Instead, skip past
1105 * this and use the request from revalidate. In this case, revalidate
1106 * never dropped its reference, so the refcounts are all OK */
1107 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1108 struct ldlm_enqueue_info einfo =
1109 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1110 ldlm_completion_ast, NULL, NULL, NULL };
1112 /* For case if upper layer did not alloc fid, do it now. */
1113 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1114 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1116 CERROR("Can't alloc new fid, rc %d\n", rc);
1120 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1121 lmm, lmmsize, NULL, extra_lock_flags);
1124 } else if (!fid_is_sane(&op_data->op_fid2) ||
1125 !(it->it_create_mode & M_CHECK_STALE)) {
1126 /* DISP_ENQ_COMPLETE set means there is extra reference on
1127 * request referenced from this intent, saved for subsequent
1128 * lookup. This path is executed when we proceed to this
1129 * lookup, so we clear DISP_ENQ_COMPLETE */
1130 it_clear_disposition(it, DISP_ENQ_COMPLETE);
        /* hand the (possibly reused) request back to the caller and
         * finish fixing up lock/intent state */
1132 *reqp = it->d.lustre.it_data;
1133 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * ptlrpc interpret callback for mdc_intent_getattr_async(): release the
 * rpcs-in-flight slot, complete the enqueue via ldlm_cli_enqueue_fini()
 * and mdc_finish_enqueue()/mdc_finish_intent_lock(), then free the
 * enqueue info and invoke the caller's mi_cb with the final rc.
 */
1137 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1138 struct ptlrpc_request *req,
1141 struct mdc_getattr_args *ga = args;
1142 struct obd_export *exp = ga->ga_exp;
1143 struct md_enqueue_info *minfo = ga->ga_minfo;
1144 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1145 struct lookup_intent *it;
1146 struct lustre_handle *lockh;
1147 struct obd_device *obddev;
1148 __u64 flags = LDLM_FL_HAS_INTENT;
1152 lockh = &minfo->mi_lockh;
1154 obddev = class_exp2obd(exp);
1156 mdc_exit_request(&obddev->u.cli);
1157 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1160 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1161 &flags, NULL, 0, lockh, rc);
1163 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1164 mdc_clear_replay_flag(req, rc);
1168 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1172 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1176 OBD_FREE_PTR(einfo);
1177 minfo->mi_cb(req, minfo, rc);
/*
 * Fire off a getattr intent enqueue asynchronously (used by statahead):
 * pack the getattr intent request, reserve an rpcs-in-flight slot, start
 * the enqueue with async=1, stash the context in the request's async
 * args, and queue it on ptlrpcd.  Completion is handled by
 * mdc_intent_getattr_async_interpret().
 */
1181 int mdc_intent_getattr_async(struct obd_export *exp,
1182 struct md_enqueue_info *minfo,
1183 struct ldlm_enqueue_info *einfo)
1185 struct md_op_data *op_data = &minfo->mi_data;
1186 struct lookup_intent *it = &minfo->mi_it;
1187 struct ptlrpc_request *req;
1188 struct mdc_getattr_args *ga;
1189 struct obd_device *obddev = class_exp2obd(exp);
1190 struct ldlm_res_id res_id;
1191 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1192 * for statahead currently. Consider CMD in future, such two bits
1193 * maybe managed by different MDS, should be adjusted then. */
1194 ldlm_policy_data_t policy = {
1195 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1196 MDS_INODELOCK_UPDATE }
1199 __u64 flags = LDLM_FL_HAS_INTENT;
1202 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1203 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1204 ldlm_it2str(it->it_op), it->it_flags);
1206 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1207 req = mdc_intent_getattr_pack(exp, it, op_data);
1211 rc = mdc_enter_request(&obddev->u.cli);
1213 ptlrpc_req_finished(req);
        /* async=1: the RPC completes in the interpret callback */
1217 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1218 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1220 mdc_exit_request(&obddev->u.cli);
1221 ptlrpc_req_finished(req);
1225 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1226 ga = ptlrpc_req_async_args(req);
1228 ga->ga_minfo = minfo;
1229 ga->ga_einfo = einfo;
1231 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1232 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);