4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Per-request state of an asynchronous getattr intent: kept in the request's
 * async-args area by mdc_intent_getattr_async() and read back by
 * mdc_intent_getattr_async_interpret(). */
56 struct mdc_getattr_args {
57 struct obd_export *ga_exp;
58 struct md_enqueue_info *ga_minfo;
59 struct ldlm_enqueue_info *ga_einfo;
/* Test disposition bits: returns the subset of @flag that is currently set in
 * the intent's it_disposition mask (zero when none of them is set). */
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
/* Set the bits of @flag in the intent's it_disposition mask; bits that are
 * already set are left untouched. */
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits of @flag from the intent's it_disposition mask; all other
 * bits are preserved. */
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
/* Map the intent's status onto a phase of the open.
 *
 * The dispositions are tested from the latest phase to the earliest
 * (OPEN_OPEN, OPEN_CREATE, LOOKUP_EXECD, IT_EXECD).  For a disposition that
 * is set, it_status is returned when @phase is at or beyond that phase.
 * The trailing CERROR reports the raw disposition and status.
 * NOTE(review): the result when @phase precedes the matched disposition is
 * presumably 0, and the CERROR is presumably reached only when none of the
 * four dispositions is set - confirm both. */
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_OPEN)) {
83 if (phase >= DISP_OPEN_OPEN)
84 return it->d.lustre.it_status;
89 if (it_disposition(it, DISP_OPEN_CREATE)) {
90 if (phase >= DISP_OPEN_CREATE)
91 return it->d.lustre.it_status;
96 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97 if (phase >= DISP_LOOKUP_EXECD)
98 return it->d.lustre.it_status;
103 if (it_disposition(it, DISP_IT_EXECD)) {
104 if (phase >= DISP_IT_EXECD)
105 return it->d.lustre.it_status;
109 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110 it->d.lustre.it_status);
114 EXPORT_SYMBOL(it_open_error);
116 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach @data to the lock behind @lockh as its l_ast_data and report the
 * lock's inodebits through @bits.
 *
 * The handle is resolved with ldlm_handle2lock() and must yield a lock
 * (asserted).  Under the resource lock, a different l_ast_data that is
 * already installed is only tolerated when it is an inode in I_FREEING
 * state; the assertion message prints both the old and the new inode. */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
120 struct ldlm_lock *lock;
129 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131 LASSERT(lock != NULL);
132 lock_res_and_lock(lock);
134 if (lock->l_ast_data && lock->l_ast_data != data) {
135 struct inode *new_inode = data;
136 struct inode *old_inode = lock->l_ast_data;
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
/* install the new callback data */
145 lock->l_ast_data = data;
147 *bits = lock->l_policy_data.l_inodebits.bits;
149 unlock_res_and_lock(lock);
/* Look for a lock on the resource named by @fid.
 *
 * The FID is converted into an LDLM resource name and passed, together with
 * @flags, @type, @policy, @mode and @lockh, to ldlm_lock_match() on the
 * namespace of the export's obd device.  The result of that call is kept in
 * rc (presumably the matched lock mode that is returned - confirm). */
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
156 const struct lu_fid *fid, ldlm_type_t type,
157 ldlm_policy_data_t *policy, ldlm_mode_t mode,
158 struct lustre_handle *lockh)
160 struct ldlm_res_id res_id;
164 fid_build_reg_res_name(fid, &res_id);
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
/* Cancel unused locks on the resource named by @fid.
 *
 * Builds the LDLM resource name from the FID and forwards @policy, the lock
 * mode, @flags and the opaque pointer to ldlm_cli_cancel_unused_resource()
 * on the namespace of the export's obd device. */
170 int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
174 ldlm_cancel_flags_t flags,
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
183 fid_build_reg_res_name(fid, &res_id);
184 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185 policy, mode, flags, opaque);
/* Run an iterator over the locks of the resource named by @fid.
 *
 * The FID is converted into an LDLM resource name and
 * ldlm_resource_iterate() is invoked on the namespace of the export's obd
 * device (presumably with @it as the iterator and @data as its argument -
 * confirm). */
189 int mdc_change_cbdata(struct obd_export *exp,
190 const struct lu_fid *fid,
191 ldlm_iterator_t it, void *data)
193 struct ldlm_res_id res_id;
196 fid_build_reg_res_name(fid, &res_id);
197 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
204 /* find any ldlm lock of the inode in mdc
/* Iterate over the locks of the resource named by @fid and translate the
 * iterator verdict: LDLM_ITER_STOP and LDLM_ITER_CONTINUE are handled
 * separately from any other result of ldlm_resource_iterate().
 *
 * NOTE(review): the cast on @fid drops its const qualifier; the sibling
 * helpers in this file pass a const fid to fid_build_reg_res_name() without
 * a cast, so the cast looks unnecessary - confirm and remove. */
208 int mdc_find_cbdata(struct obd_export *exp,
209 const struct lu_fid *fid,
210 ldlm_iterator_t it, void *data)
212 struct ldlm_res_id res_id;
216 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
219 if (rc == LDLM_ITER_STOP)
221 else if (rc == LDLM_ITER_CONTINUE)
/* Stop keeping @req for replay.
 *
 * When rq_replay is set, the flag is updated under rq_lock.  Independently,
 * an error @rc on a request that nevertheless carries a transaction number
 * is reported with DEBUG_REQ(D_ERROR), since a transno is not expected on a
 * failed request. */
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
228 /* Don't hold error requests for replay. */
229 if (req->rq_replay) {
230 spin_lock(&req->rq_lock);
232 spin_unlock(&req->rq_lock);
234 if (rc && req->rq_transno != 0) {
235 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
240 /* Save a large LOV EA into the request buffer so that it is available
241 * for replay. We don't do this in the initial request because the
242 * original request doesn't need this buffer (at most it sends just the
243 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244 * buffer and may also be difficult to allocate and save a very large
245 * request buffer for each open. (bug 5707)
247 * OOM here may cause recovery failure if lmm is needed (only for the
248 * original open if the MDS crashed just when this client also OOM'd)
249 * but this is incredibly unlikely, and questionable whether the client
250 * could do MDS recovery under OOM anyways... */
/* Grow the EA segment of an open request so that the LOV EA returned by the
 * server can be stored in it for replay.
 *
 * The segment is addressed by the explicit offset DLM_INTENT_REC_OFF + 4.
 * If enlarging fails, the failure is logged and the EA is dropped from
 * @body (OBD_MD_FLEASIZE cleared, eadatasize zeroed) so that nothing is
 * copied into the too-small buffer. */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252 struct mdt_body *body)
256 /* FIXME: remove this explicit offset. */
257 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
260 CERROR("Can't enlarge segment %d size to %d\n",
261 DLM_INTENT_REC_OFF + 4, body->eadatasize);
262 body->valid &= ~OBD_MD_FLEASIZE;
263 body->eadatasize = 0;
/* Build an LDLM_INTENT_OPEN enqueue request for an open/create intent.
 *
 * Steps visible here:
 *  - force the create mode to a regular file (S_IFREG);
 *  - when the child FID (op_fid2) is sane, collect unused conflicting locks
 *    on it, and for IT_CREAT also the parent's (op_fid1) UPDATE lock, on
 *    the local "cancels" list;
 *  - allocate the request, size the capability, name and EA buffers, and
 *    hand the request and the cancel list to ldlm_prep_enqueue_req();
 *  - keep the request for replay when the import is replayable;
 *  - pack the intent opcode and the open body, then size the reply.
 *
 * If the request cannot be allocated the collected cancel list is released
 * and ERR_PTR(-ENOMEM) is returned. */
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268 struct lookup_intent *it,
269 struct md_op_data *op_data,
270 void *lmm, int lmmsize,
273 struct ptlrpc_request *req;
274 struct obd_device *obddev = class_exp2obd(exp);
275 struct ldlm_intent *lit;
276 CFS_LIST_HEAD(cancels);
282 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
284 /* XXX: openlock is not cancelled for cross-refs. */
285 /* If inode is known, cancel conflicting OPEN locks. */
286 if (fid_is_sane(&op_data->op_fid2)) {
287 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
290 else if (it->it_flags & FMODE_EXEC)
295 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
300 /* If CREATE, cancel parent's UPDATE lock. */
301 if (it->it_op & IT_CREAT)
305 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
307 MDS_INODELOCK_UPDATE);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310 &RQF_LDLM_INTENT_OPEN);
312 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313 RETURN(ERR_PTR(-ENOMEM));
316 /* parent capability */
317 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318 /* child capability, reserve the size according to parent capa, it will
319 * be filled after we get the reply */
320 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
322 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323 op_data->op_namelen + 1);
324 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
327 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
329 ptlrpc_request_free(req);
/* keep the request for replay only if the import supports replay */
333 spin_lock(&req->rq_lock);
334 req->rq_replay = req->rq_import->imp_replayable;
335 spin_unlock(&req->rq_lock);
337 /* pack the intent */
338 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339 lit->opc = (__u64)it->it_op;
341 /* pack the intended request */
342 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
345 /* for remote client, fetch remote perm for current user */
346 if (client_is_remote(exp))
347 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348 sizeof(struct mdt_remote_perm));
349 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_UNLINK enqueue request.
 *
 * Allocates the request, sizes the parent capability and name buffers, lets
 * ldlm_prep_enqueue_req() prepare it (no cancel list), then fills in the
 * intent opcode and the unlink body.  The reply is sized for
 * cl_max_mds_easize bytes in RMF_MDT_MD and for cl_max_mds_cookiesize bytes
 * in the RMF_ACL slot.
 * ERR_PTR(-ENOMEM) is returned if the request cannot be allocated. */
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354 struct lookup_intent *it,
355 struct md_op_data *op_data)
357 struct ptlrpc_request *req;
358 struct obd_device *obddev = class_exp2obd(exp);
359 struct ldlm_intent *lit;
363 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364 &RQF_LDLM_INTENT_UNLINK);
366 RETURN(ERR_PTR(-ENOMEM));
368 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
369 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370 op_data->op_namelen + 1);
372 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
374 ptlrpc_request_free(req);
378 /* pack the intent */
379 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380 lit->opc = (__u64)it->it_op;
382 /* pack the intended request */
383 mdc_unlink_pack(req, op_data);
385 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386 obddev->u.cli.cl_max_mds_easize);
387 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388 obddev->u.cli.cl_max_mds_cookiesize);
389 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR enqueue request.
 *
 * The attributes asked for are fixed in "valid"; a remote client asks for
 * remote permissions (OBD_MD_FLRMTPERM) where a local one asks for the ACL
 * (OBD_MD_FLACL).  The reply is sized for cl_max_mds_easize bytes in
 * RMF_MDT_MD and, for remote clients, for a struct mdt_remote_perm in the
 * RMF_ACL slot.
 * ERR_PTR(-ENOMEM) is returned if the request cannot be allocated. */
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394 struct lookup_intent *it,
395 struct md_op_data *op_data)
397 struct ptlrpc_request *req;
398 struct obd_device *obddev = class_exp2obd(exp);
399 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402 (client_is_remote(exp) ?
403 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404 struct ldlm_intent *lit;
408 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409 &RQF_LDLM_INTENT_GETATTR);
411 RETURN(ERR_PTR(-ENOMEM));
413 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415 op_data->op_namelen + 1);
417 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
419 ptlrpc_request_free(req);
423 /* pack the intent */
424 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425 lit->opc = (__u64)it->it_op;
427 /* pack the intended request */
428 mdc_getattr_pack(req, valid, it->it_flags, op_data,
429 obddev->u.cli.cl_max_mds_easize);
431 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432 obddev->u.cli.cl_max_mds_easize);
433 if (client_is_remote(exp))
434 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435 sizeof(struct mdt_remote_perm));
436 ptlrpc_request_set_replen(req);
/* Build a plain LDLM_ENQUEUE request (no intent is packed) whose reply
 * reserves @lvb_len bytes in RMF_DLM_LVB.
 * ERR_PTR(-ENOMEM) is returned if the request cannot be allocated. */
440 static struct ptlrpc_request *
441 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
443 struct ptlrpc_request *req;
447 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
449 RETURN(ERR_PTR(-ENOMEM));
451 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
453 ptlrpc_request_free(req);
457 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
458 ptlrpc_request_set_replen(req);
/* Post-process the reply of an intent enqueue.
 *
 * Visible steps:
 *  - if the request carries a transno or is kept for replay, flag the saved
 *    lock request LDLM_FL_INTENT_ONLY so that a replay performs only the
 *    intent;
 *  - on ELDLM_LOCK_ABORTED clear @lockh; otherwise adopt the lock's
 *    l_req_mode when it differs from einfo->ei_mode (moving the reference
 *    from the old mode to the new one);
 *  - copy disposition, status, lock mode, lock handle and the request itself
 *    into the intent (it->d.lustre);
 *  - drop the replay flag for requests that must not be replayed (no
 *    transno, error status, or an IT_OPEN that did not end in a successful
 *    open);
 *  - for OPEN/UNLINK/LOOKUP/GETATTR intents fetch the mdt_body, the EA, the
 *    remote permissions and the capabilities from the reply, saving the LOV
 *    EA into the request buffer when an open is going to be replayed;
 *  - for IT_LAYOUT intents copy the layout from RMF_DLM_LVB into the lock's
 *    l_lvb_data if the lock has none yet.
 *
 * @rc is the result of the enqueue itself. */
462 static int mdc_finish_enqueue(struct obd_export *exp,
463 struct ptlrpc_request *req,
464 struct ldlm_enqueue_info *einfo,
465 struct lookup_intent *it,
466 struct lustre_handle *lockh,
469 struct req_capsule *pill = &req->rq_pill;
470 struct ldlm_request *lockreq;
471 struct ldlm_reply *lockrep;
472 struct lustre_intent_data *intent = &it->d.lustre;
476 /* Similarly, if we're going to replay this request, we don't want to
477 * actually get a lock, just perform the intent. */
478 if (req->rq_transno || req->rq_replay) {
479 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
480 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
483 if (rc == ELDLM_LOCK_ABORTED) {
485 memset(lockh, 0, sizeof(*lockh));
487 } else { /* rc = 0 */
488 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
491 /* If the server gave us back a different lock mode, we should
492 * fix up our variables. */
493 if (lock->l_req_mode != einfo->ei_mode) {
494 ldlm_lock_addref(lockh, lock->l_req_mode);
495 ldlm_lock_decref(lockh, einfo->ei_mode);
496 einfo->ei_mode = lock->l_req_mode;
501 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
502 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
504 intent->it_disposition = (int)lockrep->lock_policy_res1;
505 intent->it_status = (int)lockrep->lock_policy_res2;
506 intent->it_lock_mode = einfo->ei_mode;
507 intent->it_lock_handle = lockh->cookie;
508 intent->it_data = req;
510 /* Technically speaking rq_transno must already be zero if
511 * it_status is in error, so the check is a bit redundant */
512 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
513 mdc_clear_replay_flag(req, intent->it_status);
515 /* If we're doing an IT_OPEN which did not result in an actual
516 * successful open, then we need to remove the bit which saves
517 * this request for unconditional replay.
519 * It's important that we do this first! Otherwise we might exit the
520 * function without doing so, and try to replay a failed create
522 if (it->it_op & IT_OPEN && req->rq_replay &&
523 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
524 mdc_clear_replay_flag(req, intent->it_status);
526 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
527 it->it_op, intent->it_disposition, intent->it_status);
529 /* We know what to expect, so we do any byte flipping required here */
530 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
531 struct mdt_body *body;
533 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
535 CERROR ("Can't swab mdt_body\n");
539 if (it_disposition(it, DISP_OPEN_OPEN) &&
540 !it_open_error(DISP_OPEN_OPEN, it)) {
542 * If this is a successful OPEN request, we need to set
543 * replay handler and data early, so that if replay
544 * happens immediately after swabbing below, new reply
545 * is swabbed by that handler correctly.
547 mdc_set_open_replay_data(NULL, NULL, req);
550 /* TODO: make sure LAYOUT lock must be granted along with EA */
552 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
555 mdc_update_max_ea_from_body(exp, body);
558 * The eadata is opaque; just check that it is there.
559 * Eventually, obd_unpackmd() will check the contents.
561 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
567 * We save the reply LOV EA in case we have to replay a
568 * create for recovery. If we didn't allocate a large
569 * enough request buffer above we need to reallocate it
570 * here to hold the actual LOV EA.
572 * To not save LOV EA if request is not going to replay
573 * (for example error one).
575 if ((it->it_op & IT_OPEN) && req->rq_replay) {
577 if (req_capsule_get_size(pill, &RMF_EADATA,
580 mdc_realloc_openmsg(req, body);
582 req_capsule_shrink(pill, &RMF_EADATA,
586 req_capsule_set_size(pill, &RMF_EADATA,
590 lmm = req_capsule_client_get(pill, &RMF_EADATA);
592 memcpy(lmm, eadata, body->eadatasize);
596 if (body->valid & OBD_MD_FLRMTPERM) {
597 struct mdt_remote_perm *perm;
599 LASSERT(client_is_remote(exp));
600 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
601 lustre_swab_mdt_remote_perm);
605 if (body->valid & OBD_MD_FLMDSCAPA) {
606 struct lustre_capa *capa, *p;
608 capa = req_capsule_server_get(pill, &RMF_CAPA1);
612 if (it->it_op & IT_OPEN) {
613 /* client fid capa will be checked in replay */
614 p = req_capsule_client_get(pill, &RMF_CAPA2);
619 if (body->valid & OBD_MD_FLOSSCAPA) {
620 struct lustre_capa *capa;
622 capa = req_capsule_server_get(pill, &RMF_CAPA2);
626 } else if (it->it_op & IT_LAYOUT) {
627 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
629 if (lock != NULL && lock->l_lvb_data == NULL) {
632 /* maybe the lock was granted right away and layout
633 * is packed into RMF_DLM_LVB of req */
634 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
640 lvb = req_capsule_server_sized_get(pill,
641 &RMF_DLM_LVB, lvb_len);
647 OBD_ALLOC_LARGE(lmm, lvb_len);
652 memcpy(lmm, lvb, lvb_len);
654 /* install lvb_data */
655 lock_res_and_lock(lock);
656 LASSERT(lock->l_lvb_data == NULL);
657 lock->l_lvb_data = lmm;
658 lock->l_lvb_len = lvb_len;
659 unlock_res_and_lock(lock);
669 /* We always reserve enough space in the reply packet for a stripe MD, because
670 * we don't know in advance the file type. */
/* Enqueue a lock on the MDS, optionally carrying an intent.
 *
 * The resource is named after op_data->op_fid1.  With an intent (@it) the
 * inodebits policy and the request format are chosen from it->it_op;
 * the only intent-less case handled is an LDLM_FLOCK enqueue whose policy is
 * passed in through @lmm (with @lmmsize == 0).
 *
 * Before sending, the client rpc_lock is taken and the request is accounted
 * with mdc_enter_request(); both are released again after
 * ldlm_cli_enqueue().  A create that the server answers with -EINPROGRESS
 * is dropped and retried (rq_sent is pushed back by the resend counter)
 * while the import generation is unchanged.  On success the reply is handed
 * to mdc_finish_enqueue(). */
671 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
672 struct lookup_intent *it, struct md_op_data *op_data,
673 struct lustre_handle *lockh, void *lmm, int lmmsize,
674 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
676 struct obd_device *obddev = class_exp2obd(exp);
677 struct ptlrpc_request *req = NULL;
678 __u64 flags, saved_flags = extra_lock_flags;
680 struct ldlm_res_id res_id;
681 static const ldlm_policy_data_t lookup_policy =
682 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
683 static const ldlm_policy_data_t update_policy =
684 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
685 static const ldlm_policy_data_t layout_policy =
686 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
687 ldlm_policy_data_t const *policy = &lookup_policy;
688 int generation, resends = 0;
689 struct ldlm_reply *lockrep;
690 enum lvb_type lvb_type = 0;
693 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
696 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
699 saved_flags |= LDLM_FL_HAS_INTENT;
700 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
701 policy = &update_policy;
702 else if (it->it_op & IT_LAYOUT)
703 policy = &layout_policy;
706 LASSERT(reqp == NULL);
708 generation = obddev->u.cli.cl_import->imp_generation;
712 /* The only way right now is FLOCK, in this case we hide flock
713 policy as lmm, but lmmsize is 0 */
714 LASSERT(lmm && lmmsize == 0);
715 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
717 policy = (ldlm_policy_data_t *)lmm;
718 res_id.name[3] = LDLM_FLOCK;
719 } else if (it->it_op & IT_OPEN) {
720 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
722 policy = &update_policy;
723 einfo->ei_cbdata = NULL;
725 } else if (it->it_op & IT_UNLINK) {
726 req = mdc_intent_unlink_pack(exp, it, op_data);
727 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
728 req = mdc_intent_getattr_pack(exp, it, op_data);
729 } else if (it->it_op & IT_READDIR) {
730 req = mdc_enqueue_pack(exp, 0);
731 } else if (it->it_op & IT_LAYOUT) {
732 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
735 req = mdc_enqueue_pack(exp, obddev->u.cli.cl_max_mds_easize);
736 lvb_type = LVB_T_LAYOUT;
743 RETURN(PTR_ERR(req));
745 if (req != NULL && it && it->it_op & IT_CREAT)
746 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
748 req->rq_no_retry_einprogress = 1;
751 req->rq_generation_set = 1;
752 req->rq_import_generation = generation;
753 req->rq_sent = cfs_time_current_sec() + resends;
756 /* It is important to obtain rpc_lock first (if applicable), so that
757 * threads that are serialised with rpc_lock are not polluting our
758 * rpcs in flight counter. We do not do flock request limiting, though*/
760 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
761 rc = mdc_enter_request(&obddev->u.cli);
763 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
764 mdc_clear_replay_flag(req, 0);
765 ptlrpc_req_finished(req);
770 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
771 0, lvb_type, lockh, 0);
773 /* For flock requests we immediately return without further
774 delay and let caller deal with the rest, since rest of
775 this function metadata processing makes no sense for flock
780 mdc_exit_request(&obddev->u.cli);
781 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
784 CERROR("ldlm_cli_enqueue: %d\n", rc);
785 mdc_clear_replay_flag(req, rc);
786 ptlrpc_req_finished(req);
790 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
791 LASSERT(lockrep != NULL);
793 /* Retry the create infinitely when we get -EINPROGRESS from
794 * server. This is required by the new quota design. */
795 if (it && it->it_op & IT_CREAT &&
796 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
797 mdc_clear_replay_flag(req, rc);
798 ptlrpc_req_finished(req);
801 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
802 obddev->obd_name, resends, it->it_op,
803 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
805 if (generation == obddev->u.cli.cl_import->imp_generation) {
808 CDEBUG(D_HA, "resend cross eviction\n");
813 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Complete an intent lock once the enqueue reply is available.
 *
 *  - Return it_status when the server never executed the intent
 *    (DISP_IT_EXECD unset); otherwise evaluate the phase errors with
 *    it_open_error().
 *  - When revalidating a fid/name pair (M_CHECK_STALE set, not IT_GETATTR)
 *    mark the intent DISP_ENQ_COMPLETE and check that the reply refers to
 *    either op_fid2 or op_fid3; the stale-data message prints both
 *    candidates followed by the FID the server returned.
 *  - Take an extra request reference for a successful create and for a
 *    successful open; the dispositions DISP_ENQ_CREATE_REF and
 *    DISP_ENQ_OPEN_REF record that this has been done.
 *  - If another lock already matches the policy of the new one, cancel the
 *    new lock and switch @lockh and the intent's lock handle to the
 *    existing lock. */
818 static int mdc_finish_intent_lock(struct obd_export *exp,
819 struct ptlrpc_request *request,
820 struct md_op_data *op_data,
821 struct lookup_intent *it,
822 struct lustre_handle *lockh)
824 struct lustre_handle old_lock;
825 struct mdt_body *mdt_body;
826 struct ldlm_lock *lock;
830 LASSERT(request != NULL);
831 LASSERT(request != LP_POISON);
832 LASSERT(request->rq_repmsg != LP_POISON);
834 if (!it_disposition(it, DISP_IT_EXECD)) {
835 /* The server failed before it even started executing the
836 * intent, i.e. because it couldn't unpack the request. */
837 LASSERT(it->d.lustre.it_status != 0);
838 RETURN(it->d.lustre.it_status);
840 rc = it_open_error(DISP_IT_EXECD, it);
844 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
845 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
847 /* If we were revalidating a fid/name pair, mark the intent in
848 * case we fail and get called again from lookup */
849 if (fid_is_sane(&op_data->op_fid2) &&
850 it->it_create_mode & M_CHECK_STALE &&
851 it->it_op != IT_GETATTR) {
852 it_set_disposition(it, DISP_ENQ_COMPLETE);
854 /* Also: did we find the same inode? */
855 /* server can return one of two fids:
856 * op_fid2 - new allocated fid - if file is created.
857 * op_fid3 - existent fid - if file only open.
858 * op_fid3 is saved in lmv_intent_open */
859 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
860 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
861 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
862 "\n", PFID(&op_data->op_fid2),
863 PFID(&op_data->op_fid3), PFID(&mdt_body->fid1));
868 rc = it_open_error(DISP_LOOKUP_EXECD, it);
872 /* keep requests around for the multiple phases of the call
873 * this shows the DISP_XX must guarantee we make it into the call
875 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
876 it_disposition(it, DISP_OPEN_CREATE) &&
877 !it_open_error(DISP_OPEN_CREATE, it)) {
878 it_set_disposition(it, DISP_ENQ_CREATE_REF);
879 ptlrpc_request_addref(request); /* balanced in ll_create_node */
881 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
882 it_disposition(it, DISP_OPEN_OPEN) &&
883 !it_open_error(DISP_OPEN_OPEN, it)) {
884 it_set_disposition(it, DISP_ENQ_OPEN_REF);
885 ptlrpc_request_addref(request); /* balanced in ll_file_open */
886 /* BUG 11546 - eviction in the middle of open rpc processing */
887 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
890 if (it->it_op & IT_CREAT) {
891 /* XXX this belongs in ll_create_it */
892 } else if (it->it_op == IT_OPEN) {
893 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
895 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
898 /* If we already have a matching lock, then cancel the new
899 * one. We have to set the data here instead of in
900 * mdc_enqueue, because we need to use the child's inode as
901 * the l_ast_data to match, and that's not available until
902 * intent_finish has performed the iget().) */
903 lock = ldlm_handle2lock(lockh);
905 ldlm_policy_data_t policy = lock->l_policy_data;
906 LDLM_DEBUG(lock, "matching against this");
908 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
909 &lock->l_resource->lr_name),
910 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
911 (unsigned long)lock->l_resource->lr_name.name[0],
912 (unsigned long)lock->l_resource->lr_name.name[1],
913 (unsigned long)lock->l_resource->lr_name.name[2],
914 (unsigned long)fid_seq(&mdt_body->fid1),
915 (unsigned long)fid_oid(&mdt_body->fid1),
916 (unsigned long)fid_ver(&mdt_body->fid1));
919 memcpy(&old_lock, lockh, sizeof(*lockh));
920 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
921 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
922 ldlm_lock_decref_and_cancel(lockh,
923 it->d.lustre.it_lock_mode);
924 memcpy(lockh, &old_lock, sizeof(old_lock));
925 it->d.lustre.it_lock_handle = lockh->cookie;
928 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
929 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
930 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether the client still holds a usable lock for @fid.
 *
 * If the intent already names a lock (it_lock_handle), that handle is
 * revalidated with ldlm_revalidate_lock_handle().  Otherwise a lock is
 * searched for with ldlm_lock_match() in any of the CR/CW/PR/PW modes, using
 * an inodebits policy of UPDATE, LAYOUT or LOOKUP (NOTE(review): presumably
 * selected from it->it_op - confirm).  The intent's lock handle and mode
 * are set to the lock found, or zeroed when there is none. */
934 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
935 struct lu_fid *fid, __u64 *bits)
937 /* We could just return 1 immediately, but since we should only
938 * be called in revalidate_it if we already have a lock, let's
940 struct ldlm_res_id res_id;
941 struct lustre_handle lockh;
942 ldlm_policy_data_t policy;
946 if (it->d.lustre.it_lock_handle) {
947 lockh.cookie = it->d.lustre.it_lock_handle;
948 mode = ldlm_revalidate_lock_handle(&lockh, bits);
950 fid_build_reg_res_name(fid, &res_id);
953 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
956 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
959 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
962 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
963 LDLM_FL_BLOCK_GRANTED, &res_id,
965 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
969 it->d.lustre.it_lock_handle = lockh.cookie;
970 it->d.lustre.it_lock_mode = mode;
972 it->d.lustre.it_lock_handle = 0;
973 it->d.lustre.it_lock_mode = 0;
980 * This long block is all about fixing up the lock and request state
981 * so that it is correct as of the moment _before_ the operation was
982 * applied; that way, the VFS will think that everything is normal and
983 * call Lustre's regular VFS methods.
985 * If we're performing a creation, that means that unless the creation
986 * failed with EEXIST, we should fake up a negative dentry.
988 * For everything else, we want to lookup to succeed.
990 * One additional note: if CREATE or OPEN succeeded, we add an extra
991 * reference to the request because we need to keep it around until
992 * ll_create/ll_open gets called.
994 * The server will return to us, in it_disposition, an indication of
995 * exactly what d.lustre.it_status refers to.
997 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
998 * otherwise if DISP_OPEN_CREATE is set, then d.lustre.it_status is the
999 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1000 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1003 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Entry point for intent locking.
 *
 * For LOOKUP/GETATTR/LAYOUT intents on a known child (sane op_fid2) an
 * existing lock is tried first through mdc_revalidate_lock().  Unless the
 * intent is already marked DISP_ENQ_COMPLETE, a new enqueue is issued with
 * mdc_enqueue(), allocating the child FID first for IT_CREAT when op_fid2
 * is not sane.  The request stored in the intent is then returned through
 * @reqp and the result is completed by mdc_finish_intent_lock(). */
1006 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1007 void *lmm, int lmmsize, struct lookup_intent *it,
1008 int lookup_flags, struct ptlrpc_request **reqp,
1009 ldlm_blocking_callback cb_blocking,
1010 __u64 extra_lock_flags)
1012 struct lustre_handle lockh;
1017 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1018 ", intent: %s flags %#o\n", op_data->op_namelen,
1019 op_data->op_name, PFID(&op_data->op_fid2),
1020 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1024 if (fid_is_sane(&op_data->op_fid2) &&
1025 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
1026 /* We could just return 1 immediately, but since we should only
1027 * be called in revalidate_it if we already have a lock, let's
1029 it->d.lustre.it_lock_handle = 0;
1030 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1031 /* Only return failure if it was not GETATTR by cfid
1032 (from inode_revalidate) */
1033 if (rc || op_data->op_namelen != 0)
1037 /* lookup_it may be called only after revalidate_it has run, because
1038 * revalidate_it cannot return errors, only zero. Returning zero causes
1039 * this call to lookup, which *can* return an error.
1041 * We only want to execute the request associated with the intent one
1042 * time, however, so don't send the request again. Instead, skip past
1043 * this and use the request from revalidate. In this case, revalidate
1044 * never dropped its reference, so the refcounts are all OK */
1045 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1046 struct ldlm_enqueue_info einfo =
1047 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1048 ldlm_completion_ast, NULL, NULL, NULL };
1050 /* For case if upper layer did not alloc fid, do it now. */
1051 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1052 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1054 CERROR("Can't alloc new fid, rc %d\n", rc);
1058 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1059 lmm, lmmsize, NULL, extra_lock_flags);
1062 } else if (!fid_is_sane(&op_data->op_fid2) ||
1063 !(it->it_create_mode & M_CHECK_STALE)) {
1064 /* DISP_ENQ_COMPLETE set means there is extra reference on
1065 * request referenced from this intent, saved for subsequent
1066 * lookup. This path is executed when we proceed to this
1067 * lookup, so we clear DISP_ENQ_COMPLETE */
1068 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1070 *reqp = it->d.lustre.it_data;
1071 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* Reply callback of an asynchronous getattr intent (installed as
 * rq_interpret_reply by mdc_intent_getattr_async()).
 *
 * Leaves the in-flight accounting with mdc_exit_request(), finishes the
 * enqueue with ldlm_cli_enqueue_fini(), then runs mdc_finish_enqueue() and
 * mdc_finish_intent_lock().  Finally it frees @einfo and reports the result
 * through the mi_cb callback of the enqueue info. */
1075 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1076 struct ptlrpc_request *req,
1079 struct mdc_getattr_args *ga = args;
1080 struct obd_export *exp = ga->ga_exp;
1081 struct md_enqueue_info *minfo = ga->ga_minfo;
1082 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1083 struct lookup_intent *it;
1084 struct lustre_handle *lockh;
1085 struct obd_device *obddev;
1086 __u64 flags = LDLM_FL_HAS_INTENT;
1090 lockh = &minfo->mi_lockh;
1092 obddev = class_exp2obd(exp);
1094 mdc_exit_request(&obddev->u.cli);
1095 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1098 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1099 &flags, NULL, 0, lockh, rc);
1101 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1102 mdc_clear_replay_flag(req, rc);
1106 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1110 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1114 OBD_FREE_PTR(einfo);
1115 minfo->mi_cb(req, minfo, rc);
/* Send a getattr intent enqueue without waiting for the reply.
 *
 * The lock is requested with both the LOOKUP and UPDATE inodebits on the
 * resource of op_fid1.  After mdc_enter_request() the request is passed to
 * ldlm_cli_enqueue() with its final argument set to 1 (presumably the async
 * flag - confirm), the getattr arguments are saved in the request's async
 * args, and the request is queued to ptlrpcd with
 * mdc_intent_getattr_async_interpret() as its reply handler. */
1119 int mdc_intent_getattr_async(struct obd_export *exp,
1120 struct md_enqueue_info *minfo,
1121 struct ldlm_enqueue_info *einfo)
1123 struct md_op_data *op_data = &minfo->mi_data;
1124 struct lookup_intent *it = &minfo->mi_it;
1125 struct ptlrpc_request *req;
1126 struct mdc_getattr_args *ga;
1127 struct obd_device *obddev = class_exp2obd(exp);
1128 struct ldlm_res_id res_id;
1129 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1130 * for statahead currently. Consider CMD in future, such two bits
1131 * maybe managed by different MDS, should be adjusted then. */
1132 ldlm_policy_data_t policy = {
1133 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1134 MDS_INODELOCK_UPDATE }
1137 __u64 flags = LDLM_FL_HAS_INTENT;
1140 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1141 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1142 ldlm_it2str(it->it_op), it->it_flags);
1144 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1145 req = mdc_intent_getattr_pack(exp, it, op_data);
1149 rc = mdc_enter_request(&obddev->u.cli);
1151 ptlrpc_req_finished(req);
1155 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1156 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1158 mdc_exit_request(&obddev->u.cli);
1159 ptlrpc_req_finished(req);
/* compile-time check that the getattr arguments fit in rq_async_args */
1163 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1164 ga = ptlrpc_req_async_args(req);
1166 ga->ga_minfo = minfo;
1167 ga->ga_einfo = einfo;
1169 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1170 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);