4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/*
 * Argument bundle stashed in the request's async-args area
 * (ptlrpc_req_async_args) for mdc_intent_getattr_async(); it carries
 * the state that mdc_intent_getattr_async_interpret() needs to finish
 * an asynchronous intent-getattr enqueue.
 */
struct mdc_getattr_args {
	struct obd_export	 *ga_exp;	/* export the RPC was sent on */
	struct md_enqueue_info	 *ga_minfo;	/* caller's enqueue info (lock handle, callback) */
	struct ldlm_enqueue_info *ga_einfo;	/* LDLM enqueue parameters; freed by interpret cb */
};
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_OPEN)) {
83 if (phase >= DISP_OPEN_OPEN)
84 return it->d.lustre.it_status;
89 if (it_disposition(it, DISP_OPEN_CREATE)) {
90 if (phase >= DISP_OPEN_CREATE)
91 return it->d.lustre.it_status;
96 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97 if (phase >= DISP_LOOKUP_EXECD)
98 return it->d.lustre.it_status;
103 if (it_disposition(it, DISP_IT_EXECD)) {
104 if (phase >= DISP_IT_EXECD)
105 return it->d.lustre.it_status;
109 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110 it->d.lustre.it_status);
114 EXPORT_SYMBOL(it_open_error);
/*
 * mdc_set_lock_data(): attach the client inode (@data) to the LVB slot
 * of the DLM lock referenced by @lockh so later blocking/completion
 * callbacks can find the inode from the lock.
 *
 * NOTE(review): this extract has lines elided (the tail of the
 * parameter list, the return path and the closing brace are not
 * visible), so the comments below describe only the visible statements.
 */
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
120 struct ldlm_lock *lock;
121 struct inode *new_inode = data;
130 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
132 LASSERT(lock != NULL);
133 lock_res_and_lock(lock);
/* If a different inode is already cached on the resource it must be
 * on its way out of the icache, otherwise two live inodes would share
 * one lock. */
135 if (lock->l_resource->lr_lvb_inode &&
136 lock->l_resource->lr_lvb_inode != data) {
137 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
138 LASSERTF(old_inode->i_state & I_FREEING,
139 "Found existing inode %p/%lu/%u state %lu in lock: "
140 "setting data to %p/%lu/%u\n", old_inode,
141 old_inode->i_ino, old_inode->i_generation,
143 new_inode, new_inode->i_ino, new_inode->i_generation);
146 lock->l_resource->lr_lvb_inode = new_inode;
/* presumably "bits" is an optional out-parameter from the elided part
 * of the signature, reporting the granted inodebits -- TODO confirm */
148 *bits = lock->l_policy_data.l_inodebits.bits;
150 unlock_res_and_lock(lock);
156 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
157 const struct lu_fid *fid, ldlm_type_t type,
158 ldlm_policy_data_t *policy, ldlm_mode_t mode,
159 struct lustre_handle *lockh)
161 struct ldlm_res_id res_id;
165 fid_build_reg_res_name(fid, &res_id);
166 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
167 &res_id, type, policy, mode, lockh, 0);
171 int mdc_cancel_unused(struct obd_export *exp,
172 const struct lu_fid *fid,
173 ldlm_policy_data_t *policy,
175 ldlm_cancel_flags_t flags,
178 struct ldlm_res_id res_id;
179 struct obd_device *obd = class_exp2obd(exp);
184 fid_build_reg_res_name(fid, &res_id);
185 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
186 policy, mode, flags, opaque);
190 int mdc_null_inode(struct obd_export *exp,
191 const struct lu_fid *fid)
193 struct ldlm_res_id res_id;
194 struct ldlm_resource *res;
195 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
198 LASSERTF(ns != NULL, "no namespace passed\n");
200 fid_build_reg_res_name(fid, &res_id);
202 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
207 res->lr_lvb_inode = NULL;
210 ldlm_resource_putref(res);
214 /* find any ldlm lock of the inode in mdc
218 int mdc_find_cbdata(struct obd_export *exp,
219 const struct lu_fid *fid,
220 ldlm_iterator_t it, void *data)
222 struct ldlm_res_id res_id;
226 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
227 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
229 if (rc == LDLM_ITER_STOP)
231 else if (rc == LDLM_ITER_CONTINUE)
236 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
238 /* Don't hold error requests for replay. */
239 if (req->rq_replay) {
240 spin_lock(&req->rq_lock);
242 spin_unlock(&req->rq_lock);
244 if (rc && req->rq_transno != 0) {
245 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
250 /* Save a large LOV EA into the request buffer so that it is available
251 * for replay. We don't do this in the initial request because the
252 * original request doesn't need this buffer (at most it sends just the
253 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
254 * buffer and may also be difficult to allocate and save a very large
255 * request buffer for each open. (bug 5707)
257 * OOM here may cause recovery failure if lmm is needed (only for the
258 * original open if the MDS crashed just when this client also OOM'd)
259 * but this is incredibly unlikely, and questionable whether the client
260 * could do MDS recovery under OOM anyways... */
261 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
262 struct mdt_body *body)
266 /* FIXME: remove this explicit offset. */
267 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
270 CERROR("Can't enlarge segment %d size to %d\n",
271 DLM_INTENT_REC_OFF + 4, body->eadatasize);
272 body->valid &= ~OBD_MD_FLEASIZE;
273 body->eadatasize = 0;
/*
 * mdc_intent_open_pack(): allocate and pack an LDLM_INTENT_OPEN
 * request: cancel conflicting local locks, reserve reply buffers, and
 * embed the open (and possibly create) body in the intent.
 *
 * NOTE(review): lines are elided in this extract (error branches,
 * cancel-mode selection, return statement), so comments describe only
 * the visible statements.
 */
277 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
278 struct lookup_intent *it,
279 struct md_op_data *op_data,
280 void *lmm, int lmmsize,
283 struct ptlrpc_request *req;
284 struct obd_device *obddev = class_exp2obd(exp);
285 struct ldlm_intent *lit;
286 CFS_LIST_HEAD(cancels);
/* Open of a regular file: force S_IFREG into the create mode. */
292 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
294 /* XXX: openlock is not cancelled for cross-refs. */
295 /* If inode is known, cancel conflicting OPEN locks. */
296 if (fid_is_sane(&op_data->op_fid2)) {
297 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
300 else if (it->it_flags & FMODE_EXEC)
305 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
310 /* If CREATE, cancel parent's UPDATE lock. */
311 if (it->it_op & IT_CREAT)
315 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
317 MDS_INODELOCK_UPDATE);
319 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
320 &RQF_LDLM_INTENT_OPEN);
/* Allocation failure path: release gathered cancel locks first. */
322 ldlm_lock_list_put(&cancels, l_bl_ast, count);
323 RETURN(ERR_PTR(-ENOMEM));
326 /* parent capability */
327 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
328 /* child capability, reserve the size according to parent capa, it will
329 * be filled after we get the reply */
330 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
332 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
333 op_data->op_namelen + 1);
334 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
335 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
337 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
339 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import supports replay. */
343 spin_lock(&req->rq_lock);
344 req->rq_replay = req->rq_import->imp_replayable;
345 spin_unlock(&req->rq_lock);
347 /* pack the intent */
348 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
349 lit->opc = (__u64)it->it_op;
351 /* pack the intended request */
352 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
355 /* for remote client, fetch remote perm for current user */
356 if (client_is_remote(exp))
357 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
358 sizeof(struct mdt_remote_perm));
359 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_unlink_pack(): allocate and pack an LDLM_INTENT_UNLINK
 * request carrying the unlink body, and reserve reply buffers for the
 * returned MD and unlink cookies.
 *
 * NOTE(review): error-branch and return lines are elided in this
 * extract.
 */
363 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
364 struct lookup_intent *it,
365 struct md_op_data *op_data)
367 struct ptlrpc_request *req;
368 struct obd_device *obddev = class_exp2obd(exp);
369 struct ldlm_intent *lit;
373 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
374 &RQF_LDLM_INTENT_UNLINK);
376 RETURN(ERR_PTR(-ENOMEM));
378 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
379 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
380 op_data->op_namelen + 1);
382 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
384 ptlrpc_request_free(req);
388 /* pack the intent */
389 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
390 lit->opc = (__u64)it->it_op;
392 /* pack the intended request */
393 mdc_unlink_pack(req, op_data);
/* Reserve the largest possible reply EA and cookie buffers since the
 * striping of the victim is unknown in advance. */
395 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
396 obddev->u.cli.cl_max_mds_easize);
397 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
398 obddev->u.cli.cl_max_mds_cookiesize);
399 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_getattr_pack(): allocate and pack an LDLM_INTENT_GETATTR
 * request asking for attributes, EA, capabilities and (for remote
 * clients) remote permissions.
 *
 * NOTE(review): error-branch and return lines are elided in this
 * extract.
 */
403 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
404 struct lookup_intent *it,
405 struct md_op_data *op_data)
407 struct ptlrpc_request *req;
408 struct obd_device *obddev = class_exp2obd(exp);
/* Attribute mask requested from the MDS; remote clients get remote
 * perms instead of a POSIX ACL. */
409 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
410 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
411 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
412 (client_is_remote(exp) ?
413 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
414 struct ldlm_intent *lit;
418 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
419 &RQF_LDLM_INTENT_GETATTR);
421 RETURN(ERR_PTR(-ENOMEM));
423 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
424 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
425 op_data->op_namelen + 1);
427 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
429 ptlrpc_request_free(req);
433 /* pack the intent */
434 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
435 lit->opc = (__u64)it->it_op;
437 /* pack the intended request */
438 mdc_getattr_pack(req, valid, it->it_flags, op_data,
439 obddev->u.cli.cl_max_mds_easize);
441 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
442 obddev->u.cli.cl_max_mds_easize);
443 if (client_is_remote(exp))
444 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
445 sizeof(struct mdt_remote_perm));
446 ptlrpc_request_set_replen(req);
/*
 * mdc_intent_layout_pack(): allocate and pack an LDLM_INTENT_LAYOUT
 * request used to fetch a file's layout under a layout lock; the
 * layout itself comes back in the DLM LVB.
 *
 * NOTE(review): error-branch and return lines are elided in this
 * extract.
 */
450 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
451 struct lookup_intent *it,
452 struct md_op_data *unused)
454 struct obd_device *obd = class_exp2obd(exp);
455 struct ptlrpc_request *req;
456 struct ldlm_intent *lit;
457 struct layout_intent *layout;
461 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462 &RQF_LDLM_INTENT_LAYOUT);
464 RETURN(ERR_PTR(-ENOMEM));
466 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
467 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
469 ptlrpc_request_free(req);
473 /* pack the intent */
474 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
475 lit->opc = (__u64)it->it_op;
477 /* pack the layout intent request */
478 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
479 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
480 * set for replication */
481 layout->li_opc = LAYOUT_INTENT_ACCESS;
/* Reserve the LVB reply buffer large enough for the biggest layout. */
483 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
484 obd->u.cli.cl_max_mds_easize);
485 ptlrpc_request_set_replen(req);
489 static struct ptlrpc_request *
490 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
492 struct ptlrpc_request *req;
496 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
498 RETURN(ERR_PTR(-ENOMEM));
500 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
502 ptlrpc_request_free(req);
506 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
507 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue(): post-process an intent enqueue reply — fix up
 * the granted lock mode, copy the server's disposition/status into the
 * intent, manage the replay flag, swab/validate the reply body, save a
 * large LOV EA for open replay, and install layout LVB data on a
 * layout lock.
 *
 * NOTE(review): many lines are elided from this extract (error
 * branches, some assignments, the final return), so comments describe
 * only the visible statements; statement order within elided regions
 * cannot be confirmed here.
 */
511 static int mdc_finish_enqueue(struct obd_export *exp,
512 struct ptlrpc_request *req,
513 struct ldlm_enqueue_info *einfo,
514 struct lookup_intent *it,
515 struct lustre_handle *lockh,
518 struct req_capsule *pill = &req->rq_pill;
519 struct ldlm_request *lockreq;
520 struct ldlm_reply *lockrep;
521 struct lustre_intent_data *intent = &it->d.lustre;
522 struct ldlm_lock *lock;
523 void *lvb_data = NULL;
528 /* Similarly, if we're going to replay this request, we don't want to
529 * actually get a lock, just perform the intent. */
530 if (req->rq_transno || req->rq_replay) {
531 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
532 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Aborted enqueue: no lock was granted, clear the handle. */
535 if (rc == ELDLM_LOCK_ABORTED) {
537 memset(lockh, 0, sizeof(*lockh));
539 } else { /* rc = 0 */
540 lock = ldlm_handle2lock(lockh);
541 LASSERT(lock != NULL);
543 /* If the server gave us back a different lock mode, we should
544 * fix up our variables. */
545 if (lock->l_req_mode != einfo->ei_mode) {
546 ldlm_lock_addref(lockh, lock->l_req_mode);
547 ldlm_lock_decref(lockh, einfo->ei_mode);
548 einfo->ei_mode = lock->l_req_mode;
/* Copy server disposition/status into the intent for the callers. */
553 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
554 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
556 intent->it_disposition = (int)lockrep->lock_policy_res1;
557 intent->it_status = (int)lockrep->lock_policy_res2;
558 intent->it_lock_mode = einfo->ei_mode;
559 intent->it_lock_handle = lockh->cookie;
560 intent->it_data = req;
562 /* Technically speaking rq_transno must already be zero if
563 * it_status is in error, so the check is a bit redundant */
564 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
565 mdc_clear_replay_flag(req, intent->it_status);
567 /* If we're doing an IT_OPEN which did not result in an actual
568 * successful open, then we need to remove the bit which saves
569 * this request for unconditional replay.
571 * It's important that we do this first! Otherwise we might exit the
572 * function without doing so, and try to replay a failed create
574 if (it->it_op & IT_OPEN && req->rq_replay &&
575 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
576 mdc_clear_replay_flag(req, intent->it_status);
578 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
579 it->it_op, intent->it_disposition, intent->it_status);
581 /* We know what to expect, so we do any byte flipping required here */
582 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
583 struct mdt_body *body;
585 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
587 CERROR ("Can't swab mdt_body\n");
591 if (it_disposition(it, DISP_OPEN_OPEN) &&
592 !it_open_error(DISP_OPEN_OPEN, it)) {
594 * If this is a successful OPEN request, we need to set
595 * replay handler and data early, so that if replay
596 * happens immediately after swabbing below, new reply
597 * is swabbed by that handler correctly.
599 mdc_set_open_replay_data(NULL, NULL, req);
602 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
605 mdc_update_max_ea_from_body(exp, body);
608 * The eadata is opaque; just check that it is there.
609 * Eventually, obd_unpackmd() will check the contents.
611 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
616 /* save lvb data and length in case this is for layout
619 lvb_len = body->eadatasize;
622 * We save the reply LOV EA in case we have to replay a
623 * create for recovery. If we didn't allocate a large
624 * enough request buffer above we need to reallocate it
625 * here to hold the actual LOV EA.
627 * To not save LOV EA if request is not going to replay
628 * (for example error one).
630 if ((it->it_op & IT_OPEN) && req->rq_replay) {
632 if (req_capsule_get_size(pill, &RMF_EADATA,
635 mdc_realloc_openmsg(req, body);
637 req_capsule_shrink(pill, &RMF_EADATA,
641 req_capsule_set_size(pill, &RMF_EADATA,
645 lmm = req_capsule_client_get(pill, &RMF_EADATA);
647 memcpy(lmm, eadata, body->eadatasize);
/* Remote client: validate the returned remote-permission blob. */
651 if (body->valid & OBD_MD_FLRMTPERM) {
652 struct mdt_remote_perm *perm;
654 LASSERT(client_is_remote(exp));
655 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
656 lustre_swab_mdt_remote_perm);
660 if (body->valid & OBD_MD_FLMDSCAPA) {
661 struct lustre_capa *capa, *p;
663 capa = req_capsule_server_get(pill, &RMF_CAPA1);
667 if (it->it_op & IT_OPEN) {
668 /* client fid capa will be checked in replay */
669 p = req_capsule_client_get(pill, &RMF_CAPA2);
674 if (body->valid & OBD_MD_FLOSSCAPA) {
675 struct lustre_capa *capa;
677 capa = req_capsule_server_get(pill, &RMF_CAPA2);
681 } else if (it->it_op & IT_LAYOUT) {
682 /* maybe the lock was granted right away and layout
683 * is packed into RMF_DLM_LVB of req */
684 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
686 lvb_data = req_capsule_server_sized_get(pill,
687 &RMF_DLM_LVB, lvb_len);
688 if (lvb_data == NULL)
693 /* fill in stripe data for layout lock */
694 lock = ldlm_handle2lock(lockh);
695 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
698 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
699 ldlm_it2str(it->it_op), lvb_len);
/* Copy the layout out of the reply; install it on the lock only if no
 * LVB is present yet, otherwise free the copy. */
701 OBD_ALLOC_LARGE(lmm, lvb_len);
706 memcpy(lmm, lvb_data, lvb_len);
708 /* install lvb_data */
709 lock_res_and_lock(lock);
710 if (lock->l_lvb_data == NULL) {
711 lock->l_lvb_data = lmm;
712 lock->l_lvb_len = lvb_len;
715 unlock_res_and_lock(lock);
717 OBD_FREE_LARGE(lmm, lvb_len);
/*
 * mdc_enqueue(): build the intent-specific request (open / unlink /
 * getattr / readdir / layout / flock), take the MDC rpc_lock and
 * in-flight slot, enqueue the DLM lock via ldlm_cli_enqueue(), retry
 * -EINPROGRESS creates, and finish via mdc_finish_enqueue().
 *
 * NOTE(review): many lines are elided from this extract (branch
 * bodies, labels, the resend loop head and final return), so comments
 * describe only the visible statements.
 */
725 /* We always reserve enough space in the reply packet for a stripe MD, because
726 * we don't know in advance the file type. */
727 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
728 struct lookup_intent *it, struct md_op_data *op_data,
729 struct lustre_handle *lockh, void *lmm, int lmmsize,
730 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
732 struct obd_device *obddev = class_exp2obd(exp);
733 struct ptlrpc_request *req = NULL;
734 __u64 flags, saved_flags = extra_lock_flags;
736 struct ldlm_res_id res_id;
/* Pre-built inodebits policies selected by intent type below. */
737 static const ldlm_policy_data_t lookup_policy =
738 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
739 static const ldlm_policy_data_t update_policy =
740 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
741 static const ldlm_policy_data_t layout_policy =
742 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
743 ldlm_policy_data_t const *policy = &lookup_policy;
744 int generation, resends = 0;
745 struct ldlm_reply *lockrep;
746 enum lvb_type lvb_type = 0;
749 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
752 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
755 saved_flags |= LDLM_FL_HAS_INTENT;
756 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
757 policy = &update_policy;
758 else if (it->it_op & IT_LAYOUT)
759 policy = &layout_policy;
762 LASSERT(reqp == NULL);
/* Remember the import generation to detect eviction across resends. */
764 generation = obddev->u.cli.cl_import->imp_generation;
768 /* The only way right now is FLOCK, in this case we hide flock
769 policy as lmm, but lmmsize is 0 */
770 LASSERT(lmm && lmmsize == 0);
771 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
773 policy = (ldlm_policy_data_t *)lmm;
774 res_id.name[3] = LDLM_FLOCK;
775 } else if (it->it_op & IT_OPEN) {
776 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
778 policy = &update_policy;
779 einfo->ei_cbdata = NULL;
781 } else if (it->it_op & IT_UNLINK) {
782 req = mdc_intent_unlink_pack(exp, it, op_data);
783 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
784 req = mdc_intent_getattr_pack(exp, it, op_data);
785 } else if (it->it_op & IT_READDIR) {
786 req = mdc_enqueue_pack(exp, 0);
787 } else if (it->it_op & IT_LAYOUT) {
788 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
791 req = mdc_intent_layout_pack(exp, it, op_data);
792 lvb_type = LVB_T_LAYOUT;
799 RETURN(PTR_ERR(req));
801 if (req != NULL && it && it->it_op & IT_CREAT)
802 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
804 req->rq_no_retry_einprogress = 1;
/* On a resend, pin the request to the saved import generation. */
807 req->rq_generation_set = 1;
808 req->rq_import_generation = generation;
809 req->rq_sent = cfs_time_current_sec() + resends;
812 /* It is important to obtain rpc_lock first (if applicable), so that
813 * threads that are serialised with rpc_lock are not polluting our
814 * rpcs in flight counter. We do not do flock request limiting, though*/
816 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
817 rc = mdc_enter_request(&obddev->u.cli);
819 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
820 mdc_clear_replay_flag(req, 0);
821 ptlrpc_req_finished(req);
826 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
827 0, lvb_type, lockh, 0);
829 /* For flock requests we immediatelly return without further
830 delay and let caller deal with the rest, since rest of
831 this function metadata processing makes no sense for flock
832 requests anyway. But in case of problem during comms with
833 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
834 can not rely on caller and this mainly for F_UNLCKs
835 (explicits or automatically generated by Kernel to clean
836 current FLocks upon exit) that can't be trashed */
837 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
838 (einfo->ei_type == LDLM_FLOCK) &&
839 (einfo->ei_mode == LCK_NL))
844 mdc_exit_request(&obddev->u.cli);
845 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
848 CERROR("ldlm_cli_enqueue: %d\n", rc);
849 mdc_clear_replay_flag(req, rc);
850 ptlrpc_req_finished(req);
854 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
855 LASSERT(lockrep != NULL);
857 /* Retry the create infinitely when we get -EINPROGRESS from
858 * server. This is required by the new quota design. */
859 if (it && it->it_op & IT_CREAT &&
860 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
861 mdc_clear_replay_flag(req, rc);
862 ptlrpc_req_finished(req);
865 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
866 obddev->obd_name, resends, it->it_op,
867 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
869 if (generation == obddev->u.cli.cl_import->imp_generation) {
872 CDEBUG(D_HA, "resend cross eviction\n");
877 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Error cleanup: drop the granted lock (if any) and the request. */
879 if (lustre_handle_is_used(lockh)) {
880 ldlm_lock_decref(lockh, einfo->ei_mode);
881 memset(lockh, 0, sizeof(*lockh));
883 ptlrpc_req_finished(req);
/*
 * mdc_finish_intent_lock(): interpret the intent reply — propagate the
 * server's status, detect stale revalidation, take extra request
 * references for CREATE/OPEN so later ll_create/ll_file_open can use
 * the reply, and collapse a duplicate lock onto an existing match.
 *
 * NOTE(review): lines are elided from this extract (some branch
 * bodies and the final return), so comments describe only the visible
 * statements.
 */
888 static int mdc_finish_intent_lock(struct obd_export *exp,
889 struct ptlrpc_request *request,
890 struct md_op_data *op_data,
891 struct lookup_intent *it,
892 struct lustre_handle *lockh)
894 struct lustre_handle old_lock;
895 struct mdt_body *mdt_body;
896 struct ldlm_lock *lock;
900 LASSERT(request != NULL);
901 LASSERT(request != LP_POISON);
902 LASSERT(request->rq_repmsg != LP_POISON);
904 if (!it_disposition(it, DISP_IT_EXECD)) {
905 /* The server failed before it even started executing the
906 * intent, i.e. because it couldn't unpack the request. */
907 LASSERT(it->d.lustre.it_status != 0);
908 RETURN(it->d.lustre.it_status);
910 rc = it_open_error(DISP_IT_EXECD, it);
914 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
915 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
917 /* If we were revalidating a fid/name pair, mark the intent in
918 * case we fail and get called again from lookup */
919 if (fid_is_sane(&op_data->op_fid2) &&
920 it->it_create_mode & M_CHECK_STALE &&
921 it->it_op != IT_GETATTR) {
922 it_set_disposition(it, DISP_ENQ_COMPLETE);
924 /* Also: did we find the same inode? */
925 /* sever can return one of two fids:
926 * op_fid2 - new allocated fid - if file is created.
927 * op_fid3 - existent fid - if file only open.
928 * op_fid3 is saved in lmv_intent_open */
929 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
930 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
931 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
932 "\n", PFID(&op_data->op_fid2),
933 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
938 rc = it_open_error(DISP_LOOKUP_EXECD, it);
942 /* keep requests around for the multiple phases of the call
943 * this shows the DISP_XX must guarantee we make it into the call
945 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
946 it_disposition(it, DISP_OPEN_CREATE) &&
947 !it_open_error(DISP_OPEN_CREATE, it)) {
948 it_set_disposition(it, DISP_ENQ_CREATE_REF);
949 ptlrpc_request_addref(request); /* balanced in ll_create_node */
951 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
952 it_disposition(it, DISP_OPEN_OPEN) &&
953 !it_open_error(DISP_OPEN_OPEN, it)) {
954 it_set_disposition(it, DISP_ENQ_OPEN_REF);
955 ptlrpc_request_addref(request); /* balanced in ll_file_open */
956 /* BUG 11546 - eviction in the middle of open rpc processing */
957 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
960 if (it->it_op & IT_CREAT) {
961 /* XXX this belongs in ll_create_it */
962 } else if (it->it_op == IT_OPEN) {
963 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
965 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
968 /* If we already have a matching lock, then cancel the new
969 * one. We have to set the data here instead of in
970 * mdc_enqueue, because we need to use the child's inode as
971 * the l_ast_data to match, and that's not available until
972 * intent_finish has performed the iget().) */
973 lock = ldlm_handle2lock(lockh)
975 ldlm_policy_data_t policy = lock->l_policy_data;
976 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the granted lock must live on the resource of the FID the
 * server returned. */
978 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
979 &lock->l_resource->lr_name),
980 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
981 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
984 memcpy(&old_lock, lockh, sizeof(*lockh));
985 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
986 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
987 ldlm_lock_decref_and_cancel(lockh,
988 it->d.lustre.it_lock_mode);
989 memcpy(lockh, &old_lock, sizeof(old_lock));
990 it->d.lustre.it_lock_handle = lockh->cookie;
993 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
994 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
995 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * mdc_revalidate_lock(): check whether a usable DLM lock already
 * covers @fid for the given intent.  First tries the lock handle
 * cached in the intent, then falls back to a namespace match with the
 * inodebits appropriate to it_op.  Stores the (re)validated handle
 * and mode back into the intent.
 *
 * NOTE(review): case labels, braces and return lines are elided from
 * this extract.
 */
999 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1000 struct lu_fid *fid, __u64 *bits)
1002 /* We could just return 1 immediately, but since we should only
1003 * be called in revalidate_it if we already have a lock, let's
1005 struct ldlm_res_id res_id;
1006 struct lustre_handle lockh;
1007 ldlm_policy_data_t policy;
/* Fast path: the intent already carries a lock handle. */
1011 if (it->d.lustre.it_lock_handle) {
1012 lockh.cookie = it->d.lustre.it_lock_handle;
1013 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1015 fid_build_reg_res_name(fid, &res_id);
/* Slow path: pick the inodebit to match by intent type. */
1016 switch (it->it_op) {
1018 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1021 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1024 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1027 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1028 LDLM_FL_BLOCK_GRANTED, &res_id,
1029 LDLM_IBITS, &policy,
1030 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
/* Record the outcome in the intent: either the live handle+mode or
 * zeros when nothing matched. */
1034 it->d.lustre.it_lock_handle = lockh.cookie;
1035 it->d.lustre.it_lock_mode = mode;
1037 it->d.lustre.it_lock_handle = 0;
1038 it->d.lustre.it_lock_mode = 0;
/*
 * NOTE(review): the opening line of this header comment is elided from
 * this extract; the visible text below is kept verbatim.
 */
1045 * This long block is all about fixing up the lock and request state
1046 * so that it is correct as of the moment _before_ the operation was
1047 * applied; that way, the VFS will think that everything is normal and
1048 * call Lustre's regular VFS methods.
1050 * If we're performing a creation, that means that unless the creation
1051 * failed with EEXIST, we should fake up a negative dentry.
1053 * For everything else, we want to lookup to succeed.
1055 * One additional note: if CREATE or OPEN succeeded, we add an extra
1056 * reference to the request because we need to keep it around until
1057 * ll_create/ll_open gets called.
1059 * The server will return to us, in it_disposition, an indication of
1060 * exactly what d.lustre.it_status refers to.
1062 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1063 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1064 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1065 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1068 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1071 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1072 void *lmm, int lmmsize, struct lookup_intent *it,
1073 int lookup_flags, struct ptlrpc_request **reqp,
1074 ldlm_blocking_callback cb_blocking,
1075 __u64 extra_lock_flags)
1077 struct lustre_handle lockh;
1082 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1083 ", intent: %s flags %#o\n", op_data->op_namelen,
1084 op_data->op_name, PFID(&op_data->op_fid2),
1085 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidation path: a sane child FID with LOOKUP/GETATTR means the
 * caller already has an inode; try an existing lock first. */
1089 if (fid_is_sane(&op_data->op_fid2) &&
1090 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1091 /* We could just return 1 immediately, but since we should only
1092 * be called in revalidate_it if we already have a lock, let's
1094 it->d.lustre.it_lock_handle = 0;
1095 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1096 /* Only return failure if it was not GETATTR by cfid
1097 (from inode_revalidate) */
1098 if (rc || op_data->op_namelen != 0)
1102 /* lookup_it may be called only after revalidate_it has run, because
1103 * revalidate_it cannot return errors, only zero. Returning zero causes
1104 * this call to lookup, which *can* return an error.
1106 * We only want to execute the request associated with the intent one
1107 * time, however, so don't send the request again. Instead, skip past
1108 * this and use the request from revalidate. In this case, revalidate
1109 * never dropped its reference, so the refcounts are all OK */
1110 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1111 struct ldlm_enqueue_info einfo =
1112 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1113 ldlm_completion_ast, NULL, NULL, NULL };
1115 /* For case if upper layer did not alloc fid, do it now. */
1116 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1117 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1119 CERROR("Can't alloc new fid, rc %d\n", rc);
1123 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1124 lmm, lmmsize, NULL, extra_lock_flags);
1127 } else if (!fid_is_sane(&op_data->op_fid2) ||
1128 !(it->it_create_mode & M_CHECK_STALE)) {
1129 /* DISP_ENQ_COMPLETE set means there is extra reference on
1130 * request referenced from this intent, saved for subsequent
1131 * lookup. This path is executed when we proceed to this
1132 * lookup, so we clear DISP_ENQ_COMPLETE */
1133 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the saved reply back to the caller and finish the intent. */
1135 *reqp = it->d.lustre.it_data;
1136 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * mdc_intent_getattr_async_interpret(): ptlrpc interpret callback for
 * an async intent-getattr enqueue.  Releases the in-flight slot,
 * finishes the LDLM enqueue, post-processes the intent, then frees
 * the enqueue info and invokes the caller's completion callback.
 *
 * NOTE(review): lines are elided from this extract (the args
 * parameter, it assignment, error labels and return), so comments
 * describe only the visible statements.
 */
1140 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1141 struct ptlrpc_request *req,
1144 struct mdc_getattr_args *ga = args;
1145 struct obd_export *exp = ga->ga_exp;
1146 struct md_enqueue_info *minfo = ga->ga_minfo;
1147 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1148 struct lookup_intent *it;
1149 struct lustre_handle *lockh;
1150 struct obd_device *obddev;
1151 __u64 flags = LDLM_FL_HAS_INTENT;
1155 lockh = &minfo->mi_lockh;
1157 obddev = class_exp2obd(exp);
/* Release the rpcs-in-flight slot taken in mdc_intent_getattr_async. */
1159 mdc_exit_request(&obddev->u.cli);
1160 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1163 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1164 &flags, NULL, 0, lockh, rc);
1166 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1167 mdc_clear_replay_flag(req, rc);
1171 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1175 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* einfo was allocated by the caller of mdc_intent_getattr_async and
 * ownership passed to this callback. */
1179 OBD_FREE_PTR(einfo);
1180 minfo->mi_cb(req, minfo, rc);
1184 int mdc_intent_getattr_async(struct obd_export *exp,
1185 struct md_enqueue_info *minfo,
1186 struct ldlm_enqueue_info *einfo)
1188 struct md_op_data *op_data = &minfo->mi_data;
1189 struct lookup_intent *it = &minfo->mi_it;
1190 struct ptlrpc_request *req;
1191 struct mdc_getattr_args *ga;
1192 struct obd_device *obddev = class_exp2obd(exp);
1193 struct ldlm_res_id res_id;
1194 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1195 * for statahead currently. Consider CMD in future, such two bits
1196 * maybe managed by different MDS, should be adjusted then. */
1197 ldlm_policy_data_t policy = {
1198 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1199 MDS_INODELOCK_UPDATE }
1202 __u64 flags = LDLM_FL_HAS_INTENT;
1205 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1206 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1207 ldlm_it2str(it->it_op), it->it_flags);
1209 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1210 req = mdc_intent_getattr_pack(exp, it, op_data);
1214 rc = mdc_enter_request(&obddev->u.cli);
1216 ptlrpc_req_finished(req);
1220 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1221 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1223 mdc_exit_request(&obddev->u.cli);
1224 ptlrpc_req_finished(req);
1228 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1229 ga = ptlrpc_req_async_args(req);
1231 ga->ga_minfo = minfo;
1232 ga->ga_einfo = einfo;
1234 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1235 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);