4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 #include <linux/module.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
/*
 * Per-request arguments for the asynchronous getattr intent path.
 * mdc_intent_getattr_async() stores this structure in the request's
 * async-args area and mdc_intent_getattr_async_interpret() reads it back
 * when the reply is interpreted.
 */
50 struct mdc_getattr_args {
51 struct obd_export *ga_exp; /* export used to finish the enqueue */
52 struct md_enqueue_info *ga_minfo; /* intent, op data, lock handle and callback */
53 struct ldlm_enqueue_info *ga_einfo; /* lock type/mode; freed by the interpreter */
/*
 * Report the intent status relevant to a given phase of an open.
 *
 * The disposition bits are tested in the order DISP_OPEN_LEASE,
 * DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.
 * For a bit that is set, it_status is returned when phase is greater than
 * or equal to that disposition value.
 *
 * phase: DISP_* value the caller is asking about
 * it:    intent whose disposition and status are examined
 *
 * NOTE(review): the outcome when phase is smaller than the matched
 * disposition is presumably "no error" (0) - confirm.
 */
56 int it_open_error(int phase, struct lookup_intent *it)
58 if (it_disposition(it, DISP_OPEN_LEASE)) {
59 if (phase >= DISP_OPEN_LEASE)
60 return it->d.lustre.it_status;
64 if (it_disposition(it, DISP_OPEN_OPEN)) {
65 if (phase >= DISP_OPEN_OPEN)
66 return it->d.lustre.it_status;
71 if (it_disposition(it, DISP_OPEN_CREATE)) {
72 if (phase >= DISP_OPEN_CREATE)
73 return it->d.lustre.it_status;
78 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79 if (phase >= DISP_LOOKUP_EXECD)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_IT_EXECD)) {
86 if (phase >= DISP_IT_EXECD)
87 return it->d.lustre.it_status;
/* Logs the raw disposition mask and status; presumably reached only when
 * none of the dispositions above is set - confirm. */
91 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92 it->d.lustre.it_status);
96 EXPORT_SYMBOL(it_open_error);
/*
 * Attach an inode to the resource of the lock referenced by lockh.
 *
 * The lock is looked up from the handle and, with the resource lock held,
 * data is stored in the resource's lr_lvb_inode. If the resource already
 * points at a different inode, that inode is asserted to be in I_FREEING
 * state. The lock's inodebits are reported through *bits.
 *
 * exp:   export (not referenced by the statements shown here)
 * lockh: handle cookie, cast to a struct lustre_handle for the lookup
 * data:  the new inode
 */
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
102 struct ldlm_lock *lock;
103 struct inode *new_inode = data;
112 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
114 LASSERT(lock != NULL);
115 lock_res_and_lock(lock);
/* A different inode may only be replaced while it is being freed. */
116 if (lock->l_resource->lr_lvb_inode &&
117 lock->l_resource->lr_lvb_inode != data) {
118 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119 LASSERTF(old_inode->i_state & I_FREEING,
120 "Found existing inode %p/%lu/%u state %lu in lock: "
121 "setting data to %p/%lu/%u\n", old_inode,
122 old_inode->i_ino, old_inode->i_generation,
124 new_inode, new_inode->i_ino, new_inode->i_generation);
126 lock->l_resource->lr_lvb_inode = new_inode;
128 *bits = lock->l_policy_data.l_inodebits.bits;
130 unlock_res_and_lock(lock);
/*
 * Look for an existing lock on the resource named by fid in the namespace
 * of this export's obd device.
 *
 * The requested inodebits in policy are masked in place with the bits the
 * connection supports (exp_connect_ibits()) before ldlm_lock_match() is
 * called with the given flags, type and mode; a matched lock is reported
 * through lockh.
 */
136 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
137 const struct lu_fid *fid, ldlm_type_t type,
138 ldlm_policy_data_t *policy, ldlm_mode_t mode,
139 struct lustre_handle *lockh)
141 struct ldlm_res_id res_id;
145 fid_build_reg_res_name(fid, &res_id);
146 /* LU-4405: Clear bits not supported by server */
147 policy->l_inodebits.bits &= exp_connect_ibits(exp);
148 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named by fid.
 *
 * Builds the resource name from fid and hands it, together with policy,
 * mode, flags and opaque, to ldlm_cli_cancel_unused_resource() on the
 * namespace of the export's obd device.
 */
153 int mdc_cancel_unused(struct obd_export *exp,
154 const struct lu_fid *fid,
155 ldlm_policy_data_t *policy,
157 ldlm_cancel_flags_t flags,
160 struct ldlm_res_id res_id;
161 struct obd_device *obd = class_exp2obd(exp);
166 fid_build_reg_res_name(fid, &res_id);
167 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
168 policy, mode, flags, opaque);
/*
 * Clear the inode pointer (lr_lvb_inode) stored on the LDLM resource that
 * corresponds to fid in the export's namespace.
 *
 * The resource is obtained with ldlm_resource_get() and the reference is
 * dropped again with ldlm_resource_putref() once the pointer is reset.
 */
172 int mdc_null_inode(struct obd_export *exp,
173 const struct lu_fid *fid)
175 struct ldlm_res_id res_id;
176 struct ldlm_resource *res;
177 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
180 LASSERTF(ns != NULL, "no namespace passed\n");
182 fid_build_reg_res_name(fid, &res_id);
184 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
189 res->lr_lvb_inode = NULL;
192 ldlm_resource_putref(res);
/*
 * mdc_find_cbdata(): walk the locks of the resource named by fid with
 * ldlm_resource_iterate(), passing the iterator it and its data argument.
 * The LDLM_ITER_STOP and LDLM_ITER_CONTINUE results of the iteration are
 * distinguished when the return value is computed.
 */
196 /* find any ldlm lock of the inode in mdc
200 int mdc_find_cbdata(struct obd_export *exp,
201 const struct lu_fid *fid,
202 ldlm_iterator_t it, void *data)
204 struct ldlm_res_id res_id;
208 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
209 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
211 if (rc == LDLM_ITER_STOP)
213 else if (rc == LDLM_ITER_CONTINUE)
218 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
220 /* Don't hold error requests for replay.
 *
 * req: request whose replay state is adjusted; rq_lock is taken when
 *      rq_replay is set.
 * rc:  result of the operation; a non-zero rc together with a non-zero
 *      rq_transno is reported through DEBUG_REQ() at D_ERROR level. */
221 if (req->rq_replay) {
222 spin_lock(&req->rq_lock);
224 spin_unlock(&req->rq_lock);
226 if (rc && req->rq_transno != 0) {
227 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
232 /* Save a large LOV EA into the request buffer so that it is available
233 * for replay. We don't do this in the initial request because the
234 * original request doesn't need this buffer (at most it sends just the
235 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
236 * buffer and may also be difficult to allocate and save a very large
237 * request buffer for each open. (bug 5707)
239 * OOM here may cause recovery failure if lmm is needed (only for the
240 * original open if the MDS crashed just when this client also OOM'd)
241 * but this is incredibly unlikely, and questionable whether the client
242 * could do MDS recovery under OOM anyways... */
/*
 * req:  open request whose buffer segment DLM_INTENT_REC_OFF + 4 is
 *       enlarged to body->mbo_eadatasize bytes
 * body: reply body; on the failure path the error is logged, the
 *       OBD_MD_FLEASIZE bit is cleared from mbo_valid and mbo_eadatasize
 *       is reset to 0
 */
243 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
244 struct mdt_body *body)
248 /* FIXME: remove this explicit offset. */
249 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
250 body->mbo_eadatasize);
252 CERROR("Can't enlarge segment %d size to %d\n",
253 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
254 body->mbo_valid &= ~OBD_MD_FLEASIZE;
255 body->mbo_eadatasize = 0;
/*
 * Build an LDLM enqueue request carrying an OPEN intent.
 *
 * Unused locks on the child (op_fid2, when it is a sane fid) and, for
 * IT_CREAT, on the parent (op_fid1, MDS_INODELOCK_UPDATE) are gathered
 * into a cancel list that is passed to ldlm_prep_enqueue_req().
 * The request is allocated with the RQF_LDLM_INTENT_OPEN format, its
 * client-side fields are sized, the intent opcode and open body are
 * packed, and the reply buffer sizes are set.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated; in that
 * case the collected cancel list is released first.
 */
259 static struct ptlrpc_request *
260 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
261 struct md_op_data *op_data)
263 struct ptlrpc_request *req;
264 struct obd_device *obddev = class_exp2obd(exp);
265 struct ldlm_intent *lit;
266 const void *lmm = op_data->op_data;
267 __u32 lmmsize = op_data->op_data_size;
268 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Replace the file-type bits of the create mode with S_IFREG. */
274 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
276 /* XXX: openlock is not cancelled for cross-refs. */
277 /* If inode is known, cancel conflicting OPEN locks. */
278 if (fid_is_sane(&op_data->op_fid2)) {
279 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
280 if (it->it_flags & FMODE_WRITE)
285 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
288 else if (it->it_flags & FMODE_EXEC)
294 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
299 /* If CREATE, cancel parent's UPDATE lock. */
300 if (it->it_op & IT_CREAT)
304 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306 MDS_INODELOCK_UPDATE);
308 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309 &RQF_LDLM_INTENT_OPEN);
311 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312 RETURN(ERR_PTR(-ENOMEM));
315 /* parent capability */
316 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317 /* child capability, reserve the size according to parent capa, it will
318 * be filled after we get the reply */
319 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
/* Name field holds op_namelen + 1 bytes; the EA field is sized to the
 * larger of the supplied EA and the client's default MDS EA size. */
321 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322 op_data->op_namelen + 1);
323 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
326 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
328 ptlrpc_request_free(req);
/* The request is marked for replay iff the import is replayable. */
332 spin_lock(&req->rq_lock);
333 req->rq_replay = req->rq_import->imp_replayable;
334 spin_unlock(&req->rq_lock);
336 /* pack the intent */
337 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338 lit->opc = (__u64)it->it_op;
340 /* pack the intended request */
341 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reserve reply space for the largest MDS EA the client knows about. */
344 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
345 obddev->u.cli.cl_max_mds_easize);
347 /* for remote client, fetch remote perm for current user */
348 if (client_is_remote(exp))
349 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
350 sizeof(struct mdt_remote_perm));
351 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM enqueue request carrying a GETXATTR intent.
 *
 * The request uses the RQF_LDLM_INTENT_GETXATTR format. The intent opcode
 * is fixed to IT_GETXATTR, the body is packed for op_fid1 with op_valid,
 * and the reply fields RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS are each
 * sized to the import's ocd_max_easize.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 */
355 static struct ptlrpc_request *
356 mdc_intent_getxattr_pack(struct obd_export *exp,
357 struct lookup_intent *it,
358 struct md_op_data *op_data)
360 struct ptlrpc_request *req;
361 struct ldlm_intent *lit;
364 struct list_head cancels = LIST_HEAD_INIT(cancels);
368 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
369 &RQF_LDLM_INTENT_GETXATTR);
371 RETURN(ERR_PTR(-ENOMEM));
373 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
375 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
377 ptlrpc_request_free(req);
381 /* pack the intent */
382 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
383 lit->opc = IT_GETXATTR;
/* Maximum EA size taken from the import's connect data. */
385 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
387 /* pack the intended request */
388 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
389 op_data->op_valid, maxdata, -1, 0);
391 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
392 RCL_SERVER, maxdata);
394 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
395 RCL_SERVER, maxdata);
397 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
398 RCL_SERVER, maxdata);
400 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM enqueue request carrying an UNLINK intent.
 *
 * The request uses the RQF_LDLM_INTENT_UNLINK format, is prepared without
 * a cancel list, carries the intent opcode from it->it_op, and has the
 * unlink body packed by mdc_unlink_pack().
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 */
405 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
406 struct lookup_intent *it,
407 struct md_op_data *op_data)
409 struct ptlrpc_request *req;
410 struct obd_device *obddev = class_exp2obd(exp);
411 struct ldlm_intent *lit;
415 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
416 &RQF_LDLM_INTENT_UNLINK);
418 RETURN(ERR_PTR(-ENOMEM));
420 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
421 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
422 op_data->op_namelen + 1);
424 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
426 ptlrpc_request_free(req);
430 /* pack the intent */
431 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
432 lit->opc = (__u64)it->it_op;
434 /* pack the intended request */
435 mdc_unlink_pack(req, op_data);
/* Reply buffers use the client's default sizes.
 * NOTE(review): the RMF_ACL field is sized with the default cookie size
 * here - confirm that this pairing is intended. */
437 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
438 obddev->u.cli.cl_default_mds_easize);
439 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
440 obddev->u.cli.cl_default_mds_cookiesize);
441 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM enqueue request carrying a getattr-style intent (used by
 * mdc_enqueue() for IT_GETATTR and IT_LOOKUP, and by the asynchronous
 * getattr path).
 *
 * The set of attributes requested is fixed in 'valid'; the permission bit
 * is OBD_MD_FLRMTPERM for remote clients and OBD_MD_FLACL otherwise.
 * The EA size used for the body and for the RMF_MDT_MD reply field comes
 * from the client's cl_default_mds_easize or cl_max_mds_easize.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 */
445 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
446 struct lookup_intent *it,
447 struct md_op_data *op_data)
449 struct ptlrpc_request *req;
450 struct obd_device *obddev = class_exp2obd(exp);
451 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
452 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
453 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
454 (client_is_remote(exp) ?
455 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
456 struct ldlm_intent *lit;
461 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462 &RQF_LDLM_INTENT_GETATTR);
464 RETURN(ERR_PTR(-ENOMEM));
466 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
467 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
468 op_data->op_namelen + 1);
470 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
472 ptlrpc_request_free(req);
476 /* pack the intent */
477 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
478 lit->opc = (__u64)it->it_op;
/* The default EA size is used when it is positive. */
480 if (obddev->u.cli.cl_default_mds_easize > 0)
481 easize = obddev->u.cli.cl_default_mds_easize;
483 easize = obddev->u.cli.cl_max_mds_easize;
485 /* pack the intended request */
486 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
488 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
/* Remote clients get room for a struct mdt_remote_perm in the reply. */
489 if (client_is_remote(exp))
490 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
491 sizeof(struct mdt_remote_perm));
492 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM enqueue request carrying a LAYOUT intent.
 *
 * The request uses the RQF_LDLM_INTENT_LAYOUT format with an empty
 * client-side RMF_EADATA field. The layout intent opcode is set to
 * LAYOUT_INTENT_ACCESS and the RMF_DLM_LVB reply field is sized to the
 * client's default MDS EA size. The md_op_data argument is not used.
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 */
496 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
497 struct lookup_intent *it,
498 struct md_op_data *unused)
500 struct obd_device *obd = class_exp2obd(exp);
501 struct ptlrpc_request *req;
502 struct ldlm_intent *lit;
503 struct layout_intent *layout;
507 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
508 &RQF_LDLM_INTENT_LAYOUT);
510 RETURN(ERR_PTR(-ENOMEM));
512 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
513 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
515 ptlrpc_request_free(req);
519 /* pack the intent */
520 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
521 lit->opc = (__u64)it->it_op;
523 /* pack the layout intent request */
524 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
525 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
526 * set for replication */
527 layout->li_opc = LAYOUT_INTENT_ACCESS;
529 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
530 obd->u.cli.cl_default_mds_easize);
531 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM enqueue request (RQF_LDLM_ENQUEUE format, no intent
 * payload and no cancel list).
 *
 * lvb_len: size reserved for the RMF_DLM_LVB field of the reply
 *
 * Returns ERR_PTR(-ENOMEM) if the request cannot be allocated.
 */
535 static struct ptlrpc_request *
536 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
538 struct ptlrpc_request *req;
542 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
544 RETURN(ERR_PTR(-ENOMEM));
546 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
548 ptlrpc_request_free(req);
552 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
553 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue.
 *
 * Visible steps, in order:
 *  - flag the request as intent-only when it has a transno or is marked
 *    for replay;
 *  - on ELDLM_LOCK_ABORTED clear the lock handle, otherwise adopt the lock
 *    mode actually granted by the server;
 *  - copy disposition, status, lock mode, lock handle and the request
 *    pointer into the intent;
 *  - clear the replay flag for requests that must not be replayed;
 *  - for OPEN/UNLINK/LOOKUP/GETATTR intents, fetch the reply body and the
 *    EA, remote-permission and capability buffers it announces;
 *  - for LAYOUT intents, fetch the layout from RMF_DLM_LVB;
 *  - when the lock is a layout lock and layout data is present, copy it
 *    and install the copy as the lock's LVB data if none is set yet.
 *
 * exp:   export the request was sent on
 * req:   request together with its reply
 * einfo: enqueue info; ei_mode may be updated to the granted mode
 * it:    intent that receives the results
 * lockh: handle of the enqueued lock
 */
557 static int mdc_finish_enqueue(struct obd_export *exp,
558 struct ptlrpc_request *req,
559 struct ldlm_enqueue_info *einfo,
560 struct lookup_intent *it,
561 struct lustre_handle *lockh,
564 struct req_capsule *pill = &req->rq_pill;
565 struct ldlm_request *lockreq;
566 struct ldlm_reply *lockrep;
567 struct lustre_intent_data *intent = &it->d.lustre;
568 struct ldlm_lock *lock;
569 void *lvb_data = NULL;
574 /* Similarly, if we're going to replay this request, we don't want to
575 * actually get a lock, just perform the intent. */
576 if (req->rq_transno || req->rq_replay) {
577 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
578 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* An aborted enqueue leaves no usable lock: the handle is zeroed. */
581 if (rc == ELDLM_LOCK_ABORTED) {
583 memset(lockh, 0, sizeof(*lockh));
585 } else { /* rc = 0 */
586 lock = ldlm_handle2lock(lockh);
587 LASSERT(lock != NULL);
589 /* If the server gave us back a different lock mode, we should
590 * fix up our variables. */
591 if (lock->l_req_mode != einfo->ei_mode) {
592 ldlm_lock_addref(lockh, lock->l_req_mode);
593 ldlm_lock_decref(lockh, einfo->ei_mode);
594 einfo->ei_mode = lock->l_req_mode;
599 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
600 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's verdict and the lock identity in the intent. */
602 intent->it_disposition = (int)lockrep->lock_policy_res1;
603 intent->it_status = (int)lockrep->lock_policy_res2;
604 intent->it_lock_mode = einfo->ei_mode;
605 intent->it_lock_handle = lockh->cookie;
606 intent->it_data = req;
608 /* Technically speaking rq_transno must already be zero if
609 * it_status is in error, so the check is a bit redundant */
610 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
611 mdc_clear_replay_flag(req, intent->it_status);
613 /* If we're doing an IT_OPEN which did not result in an actual
614 * successful open, then we need to remove the bit which saves
615 * this request for unconditional replay.
617 * It's important that we do this first! Otherwise we might exit the
618 * function without doing so, and try to replay a failed create
620 if (it->it_op & IT_OPEN && req->rq_replay &&
621 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
622 mdc_clear_replay_flag(req, intent->it_status);
624 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
625 it->it_op, intent->it_disposition, intent->it_status);
627 /* We know what to expect, so we do any byte flipping required here */
628 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
629 struct mdt_body *body;
631 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
633 CERROR ("Can't swab mdt_body\n");
637 if (it_disposition(it, DISP_OPEN_OPEN) &&
638 !it_open_error(DISP_OPEN_OPEN, it)) {
640 * If this is a successful OPEN request, we need to set
641 * replay handler and data early, so that if replay
642 * happens immediately after swabbing below, new reply
643 * is swabbed by that handler correctly.
645 mdc_set_open_replay_data(NULL, NULL, it);
648 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
651 mdc_update_max_ea_from_body(exp, body);
654 * The eadata is opaque; just check that it is there.
655 * Eventually, obd_unpackmd() will check the contents.
657 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
658 body->mbo_eadatasize);
662 /* save lvb data and length in case this is for layout
665 lvb_len = body->mbo_eadatasize;
668 * We save the reply LOV EA in case we have to replay a
669 * create for recovery. If we didn't allocate a large
670 * enough request buffer above we need to reallocate it
671 * here to hold the actual LOV EA.
673 * To not save LOV EA if request is not going to replay
674 * (for example error one).
676 if ((it->it_op & IT_OPEN) && req->rq_replay) {
678 if (req_capsule_get_size(pill, &RMF_EADATA,
680 body->mbo_eadatasize)
681 mdc_realloc_openmsg(req, body);
683 req_capsule_shrink(pill, &RMF_EADATA,
684 body->mbo_eadatasize,
687 req_capsule_set_size(pill, &RMF_EADATA,
689 body->mbo_eadatasize);
691 lmm = req_capsule_client_get(pill, &RMF_EADATA);
694 body->mbo_eadatasize);
698 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
699 struct mdt_remote_perm *perm;
701 LASSERT(client_is_remote(exp));
702 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
703 lustre_swab_mdt_remote_perm);
707 if (body->mbo_valid & OBD_MD_FLMDSCAPA) {
708 struct lustre_capa *capa, *p;
710 capa = req_capsule_server_get(pill, &RMF_CAPA1);
714 if (it->it_op & IT_OPEN) {
715 /* client fid capa will be checked in replay */
716 p = req_capsule_client_get(pill, &RMF_CAPA2);
/* An OSS capability, when announced, is carried in RMF_CAPA2. */
721 if (body->mbo_valid & OBD_MD_FLOSSCAPA) {
722 struct lustre_capa *capa;
724 capa = req_capsule_server_get(pill, &RMF_CAPA2);
728 } else if (it->it_op & IT_LAYOUT) {
729 /* maybe the lock was granted right away and layout
730 * is packed into RMF_DLM_LVB of req */
731 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
733 lvb_data = req_capsule_server_sized_get(pill,
734 &RMF_DLM_LVB, lvb_len);
735 if (lvb_data == NULL)
740 /* fill in stripe data for layout lock */
741 lock = ldlm_handle2lock(lockh);
742 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
745 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
746 ldlm_it2str(it->it_op), lvb_len);
/* The layout is copied into a separate buffer rather than referenced
 * in place in the reply. */
748 OBD_ALLOC_LARGE(lmm, lvb_len);
753 memcpy(lmm, lvb_data, lvb_len);
755 /* install lvb_data */
756 lock_res_and_lock(lock);
757 if (lock->l_lvb_data == NULL) {
758 lock->l_lvb_type = LVB_T_LAYOUT;
759 lock->l_lvb_data = lmm;
760 lock->l_lvb_len = lvb_len;
763 unlock_res_and_lock(lock);
765 OBD_FREE_LARGE(lmm, lvb_len);
/*
 * Enqueue a lock on the resource of op_data->op_fid1, optionally carrying
 * an intent.
 *
 * With an intent, the lock type must be LDLM_IBITS, the caller's policy
 * must be NULL, and the inodebits policy is chosen from it->it_op:
 * UPDATE for OPEN/UNLINK/GETATTR/READDIR, LAYOUT for IT_LAYOUT, XATTR for
 * GETXATTR/SETXATTR, LOOKUP otherwise. The request is built by the
 * matching mdc_intent_*_pack() helper (mdc_enqueue_pack() for READDIR).
 * Without an intent only LDLM_FLOCK is accepted.
 *
 * The RPC lock and a request slot are taken before ldlm_cli_enqueue() and
 * released after it. A create that the server answers with -EINPROGRESS
 * is retried while the import generation is unchanged. The reply is then
 * handed to mdc_finish_enqueue(); if that fails, the lock reference is
 * dropped and the intent's lock handle, lock mode and request pointer are
 * reset.
 *
 * Returns PTR_ERR() of the packing helper's result when packing fails.
 */
773 /* We always reserve enough space in the reply packet for a stripe MD, because
774 * we don't know in advance the file type. */
775 int mdc_enqueue(struct obd_export *exp,
776 struct ldlm_enqueue_info *einfo,
777 const union ldlm_policy_data *policy,
778 struct lookup_intent *it, struct md_op_data *op_data,
779 struct lustre_handle *lockh, __u64 extra_lock_flags)
781 struct obd_device *obddev = class_exp2obd(exp);
782 struct ptlrpc_request *req = NULL;
783 __u64 flags, saved_flags = extra_lock_flags;
785 struct ldlm_res_id res_id;
786 static const ldlm_policy_data_t lookup_policy =
787 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
788 static const ldlm_policy_data_t update_policy =
789 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
790 static const ldlm_policy_data_t layout_policy =
791 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
792 static const ldlm_policy_data_t getxattr_policy = {
793 .l_inodebits = { MDS_INODELOCK_XATTR } };
794 int generation, resends = 0;
795 struct ldlm_reply *lockrep;
796 enum lvb_type lvb_type = 0;
799 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
801 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
804 LASSERT(policy == NULL);
806 saved_flags |= LDLM_FL_HAS_INTENT;
807 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
808 policy = &update_policy;
809 else if (it->it_op & IT_LAYOUT)
810 policy = &layout_policy;
811 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
812 policy = &getxattr_policy;
814 policy = &lookup_policy;
/* Remember the import generation so that a later resend can detect an
 * intervening eviction. */
817 generation = obddev->u.cli.cl_import->imp_generation;
821 /* The only way right now is FLOCK. */
822 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
824 res_id.name[3] = LDLM_FLOCK;
825 } else if (it->it_op & IT_OPEN) {
826 req = mdc_intent_open_pack(exp, it, op_data);
827 } else if (it->it_op & IT_UNLINK) {
828 req = mdc_intent_unlink_pack(exp, it, op_data);
829 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
830 req = mdc_intent_getattr_pack(exp, it, op_data);
831 } else if (it->it_op & IT_READDIR) {
832 req = mdc_enqueue_pack(exp, 0);
833 } else if (it->it_op & IT_LAYOUT) {
834 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
836 req = mdc_intent_layout_pack(exp, it, op_data);
837 lvb_type = LVB_T_LAYOUT;
838 } else if (it->it_op & IT_GETXATTR) {
839 req = mdc_intent_getxattr_pack(exp, it, op_data);
846 RETURN(PTR_ERR(req));
848 if (req != NULL && it && it->it_op & IT_CREAT)
849 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
851 req->rq_no_retry_einprogress = 1;
854 req->rq_generation_set = 1;
855 req->rq_import_generation = generation;
856 req->rq_sent = cfs_time_current_sec() + resends;
859 /* It is important to obtain rpc_lock first (if applicable), so that
860 * threads that are serialised with rpc_lock are not polluting our
861 * rpcs in flight counter. We do not do flock request limiting, though*/
863 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
864 rc = obd_get_request_slot(&obddev->u.cli);
866 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
867 mdc_clear_replay_flag(req, 0);
868 ptlrpc_req_finished(req);
873 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
874 0, lvb_type, lockh, 0);
876 /* For flock requests we immediately return without further
877 delay and let caller deal with the rest, since rest of
878 this function metadata processing makes no sense for flock
879 requests anyway. But in case of problem during comms with
880 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
881 can not rely on caller and this mainly for F_UNLCKs
882 (explicits or automatically generated by Kernel to clean
883 current FLocks upon exit) that can't be trashed */
884 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
885 (einfo->ei_type == LDLM_FLOCK) &&
886 (einfo->ei_mode == LCK_NL))
/* Release the request slot and the RPC lock taken before the enqueue. */
891 obd_put_request_slot(&obddev->u.cli);
892 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
895 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
896 obddev->obd_name, rc);
898 mdc_clear_replay_flag(req, rc);
899 ptlrpc_req_finished(req);
903 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
904 LASSERT(lockrep != NULL);
/* Convert the intent status carried in the reply with
 * ptlrpc_status_ntoh() before it is interpreted below. */
906 lockrep->lock_policy_res2 =
907 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
909 /* Retry the create infinitely when we get -EINPROGRESS from
910 * server. This is required by the new quota design. */
911 if (it && it->it_op & IT_CREAT &&
912 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
913 mdc_clear_replay_flag(req, rc);
914 ptlrpc_req_finished(req);
917 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
918 obddev->obd_name, resends, it->it_op,
919 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
/* An unchanged generation means no eviction happened in between. */
921 if (generation == obddev->u.cli.cl_import->imp_generation) {
924 CDEBUG(D_HA, "resend cross eviction\n");
929 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Failure cleanup: drop the lock reference and detach the request and
 * lock from the intent. */
931 if (lustre_handle_is_used(lockh)) {
932 ldlm_lock_decref(lockh, einfo->ei_mode);
933 memset(lockh, 0, sizeof(*lockh));
935 ptlrpc_req_finished(req);
937 it->d.lustre.it_lock_handle = 0;
938 it->d.lustre.it_lock_mode = 0;
939 it->d.lustre.it_data = NULL;
/*
 * Complete an intent lock once the enqueue reply has been processed.
 *
 * Visible steps, in order:
 *  - if the server never executed the intent (DISP_IT_EXECD not set),
 *    return the non-zero it_status;
 *  - check the intent status for the IT_EXECD and LOOKUP_EXECD phases;
 *  - take an extra request reference for a successful create and for a
 *    successful open, recording each in the disposition
 *    (DISP_ENQ_CREATE_REF / DISP_ENQ_OPEN_REF) so it is taken only once;
 *  - if another lock matching the new lock's policy is found, cancel the
 *    new lock and switch lockh and the intent to the matched lock.
 *
 * request: enqueue request with its reply; must not be NULL
 * op_data: operation data, used here for the debug message
 * it:      intent being completed
 * lockh:   handle of the newly enqueued lock; may be replaced
 */
945 static int mdc_finish_intent_lock(struct obd_export *exp,
946 struct ptlrpc_request *request,
947 struct md_op_data *op_data,
948 struct lookup_intent *it,
949 struct lustre_handle *lockh)
951 struct lustre_handle old_lock;
952 struct mdt_body *mdt_body;
953 struct ldlm_lock *lock;
957 LASSERT(request != NULL);
958 LASSERT(request != LP_POISON);
959 LASSERT(request->rq_repmsg != LP_POISON);
961 if (it->it_op & IT_READDIR)
964 if (!it_disposition(it, DISP_IT_EXECD)) {
965 /* The server failed before it even started executing the
966 * intent, i.e. because it couldn't unpack the request. */
967 LASSERT(it->d.lustre.it_status != 0);
968 RETURN(it->d.lustre.it_status);
970 rc = it_open_error(DISP_IT_EXECD, it);
974 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
975 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
977 rc = it_open_error(DISP_LOOKUP_EXECD, it);
981 /* keep requests around for the multiple phases of the call
982 * this shows the DISP_XX must guarantee we make it into the call
984 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
985 it_disposition(it, DISP_OPEN_CREATE) &&
986 !it_open_error(DISP_OPEN_CREATE, it)) {
987 it_set_disposition(it, DISP_ENQ_CREATE_REF);
988 ptlrpc_request_addref(request); /* balanced in ll_create_node */
990 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
991 it_disposition(it, DISP_OPEN_OPEN) &&
992 !it_open_error(DISP_OPEN_OPEN, it)) {
993 it_set_disposition(it, DISP_ENQ_OPEN_REF);
994 ptlrpc_request_addref(request); /* balanced in ll_file_open */
995 /* BUG 11546 - eviction in the middle of open rpc processing */
996 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
/* Sanity checks on the intent type; IT_CREAT needs no work here. */
999 if (it->it_op & IT_CREAT) {
1000 /* XXX this belongs in ll_create_it */
1001 } else if (it->it_op == IT_OPEN) {
1002 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1004 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1007 /* If we already have a matching lock, then cancel the new
1008 * one. We have to set the data here instead of in
1009 * mdc_enqueue, because we need to use the child's inode as
1010 * the l_ast_data to match, and that's not available until
1011 * intent_finish has performed the iget().) */
1012 lock = ldlm_handle2lock(lockh);
1014 ldlm_policy_data_t policy = lock->l_policy_data;
1015 LDLM_DEBUG(lock, "matching against this");
1017 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
1018 &lock->l_resource->lr_name),
1019 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1020 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
1021 LDLM_LOCK_PUT(lock);
/* old_lock starts as a copy of the new handle and is passed to
 * ldlm_lock_match(); on a match the new lock is cancelled and lockh
 * takes over old_lock's contents. */
1023 memcpy(&old_lock, lockh, sizeof(*lockh));
1024 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1025 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1026 ldlm_lock_decref_and_cancel(lockh,
1027 it->d.lustre.it_lock_mode);
1028 memcpy(lockh, &old_lock, sizeof(old_lock));
1029 it->d.lustre.it_lock_handle = lockh->cookie;
1032 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1033 (int)op_data->op_namelen, op_data->op_name,
1034 ldlm_it2str(it->it_op), it->d.lustre.it_status,
1035 it->d.lustre.it_disposition, rc);
/*
 * Check whether the client already holds a lock suitable for the intent.
 *
 * If the intent already carries a lock handle, that handle is revalidated
 * with ldlm_revalidate_lock_handle(). Otherwise the inodebits required
 * for it->it_op are selected and mdc_lock_match() looks for a granted
 * CR, CW, PR or PW lock on fid. The intent's it_lock_handle and
 * it_lock_mode are then either filled from the matched lock or reset
 * to 0.
 *
 * exp:  export whose namespace is searched
 * it:   intent to revalidate; its lock handle and mode are updated
 * fid:  fid of the object the lock must cover
 * bits: passed through to ldlm_revalidate_lock_handle()
 */
1039 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1040 struct lu_fid *fid, __u64 *bits)
1042 /* We could just return 1 immediately, but since we should only
1043 * be called in revalidate_it if we already have a lock, let's
1045 struct ldlm_res_id res_id;
1046 struct lustre_handle lockh;
1047 ldlm_policy_data_t policy;
1051 if (it->d.lustre.it_lock_handle) {
1052 lockh.cookie = it->d.lustre.it_lock_handle;
1053 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1055 fid_build_reg_res_name(fid, &res_id);
1056 switch (it->it_op) {
1058 /* File attributes are held under multiple bits:
1059 * nlink is under lookup lock, size and times are
1060 * under UPDATE lock and recently we've also got
1061 * a separate permissions lock for owner/group/acl that
1062 * were protected by lookup lock before.
1063 * Getattr must provide all of that information,
1064 * so we need to ensure we have all of those locks.
1065 * Unfortunately, if the bits are split across multiple
1066 * locks, there's no easy way to match all of them here,
1067 * so an extra RPC would be performed to fetch all
1068 * of those bits at once for now. */
1069 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1070 * but for old MDTs (< 2.4), permission is covered
1071 * by LOOKUP lock, so it needs to match all bits here.*/
1072 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1073 MDS_INODELOCK_LOOKUP |
1077 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1080 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1083 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Any of the four listed lock modes is accepted for the match. */
1087 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1088 LDLM_IBITS, &policy,
1089 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1094 it->d.lustre.it_lock_handle = lockh.cookie;
1095 it->d.lustre.it_lock_mode = mode;
1097 it->d.lustre.it_lock_handle = 0;
1098 it->d.lustre.it_lock_mode = 0;
1105 * This long block is all about fixing up the lock and request state
1106 * so that it is correct as of the moment _before_ the operation was
1107 * applied; that way, the VFS will think that everything is normal and
1108 * call Lustre's regular VFS methods.
1110 * If we're performing a creation, that means that unless the creation
1111 * failed with EEXIST, we should fake up a negative dentry.
1113 * For everything else, we want to lookup to succeed.
1115 * One additional note: if CREATE or OPEN succeeded, we add an extra
1116 * reference to the request because we need to keep it around until
1117 * ll_create/ll_open gets called.
1119 * The server will return to us, in it_disposition, an indication of
1120 * exactly what d.lustre.it_status refers to.
1122 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1123 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1124 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1125 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1128 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Obtain a lock for an intent operation.
 *
 * Visible steps, in order:
 *  - for LOOKUP/GETATTR/READDIR on a sane child fid (op_fid2), clear the
 *    intent's lock handle and try mdc_revalidate_lock() first;
 *  - for IT_CREAT without a sane child fid, allocate one with
 *    mdc_fid_alloc();
 *  - enqueue through mdc_enqueue() with an LDLM_IBITS lock whose mode
 *    comes from it_to_lock_mode() and whose blocking callback is
 *    cb_blocking;
 *  - store the request kept in the intent in *reqp and finish with
 *    mdc_finish_intent_lock().
 *
 * exp:              export to send the request on
 * op_data:          operation data (parent fid op_fid1, child fid op_fid2,
 *                   name)
 * it:               intent to execute
 * reqp:             receives the request stored in the intent
 * cb_blocking:      blocking AST installed on the lock
 * extra_lock_flags: NOTE(review): presumably forwarded to mdc_enqueue() -
 *                   confirm
 */
1131 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1132 struct lookup_intent *it, struct ptlrpc_request **reqp,
1133 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1135 struct ldlm_enqueue_info einfo = {
1136 .ei_type = LDLM_IBITS,
1137 .ei_mode = it_to_lock_mode(it),
1138 .ei_cb_bl = cb_blocking,
1139 .ei_cb_cp = ldlm_completion_ast,
1141 struct lustre_handle lockh;
1146 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1147 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1148 op_data->op_name, PFID(&op_data->op_fid2),
1149 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1153 if (fid_is_sane(&op_data->op_fid2) &&
1154 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1155 /* We could just return 1 immediately, but since we should only
1156 * be called in revalidate_it if we already have a lock, let's
1158 it->d.lustre.it_lock_handle = 0;
1159 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1160 /* Only return failure if it was not GETATTR by cfid
1161 (from inode_revalidate) */
1162 if (rc || op_data->op_namelen != 0)
1166 /* For case if upper layer did not alloc fid, do it now. */
1167 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1168 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1170 CERROR("Can't alloc new fid, rc %d\n", rc);
/* No explicit policy is passed: mdc_enqueue() selects one from the
 * intent operation. */
1175 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1180 *reqp = it->d.lustre.it_data;
1181 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter for requests queued by mdc_intent_getattr_async().
 *
 * Visible steps, in order: release the request slot, finish the enqueue
 * with ldlm_cli_enqueue_fini(), convert the intent status in the reply
 * with ptlrpc_status_ntoh(), run mdc_finish_enqueue() and
 * mdc_finish_intent_lock(), free einfo and invoke the caller's mi_cb
 * callback with the request, minfo and the resulting rc.
 *
 * args: async-args area of the request, holding a struct mdc_getattr_args
 */
1185 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1186 struct ptlrpc_request *req,
1189 struct mdc_getattr_args *ga = args;
1190 struct obd_export *exp = ga->ga_exp;
1191 struct md_enqueue_info *minfo = ga->ga_minfo;
1192 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1193 struct lookup_intent *it;
1194 struct lustre_handle *lockh;
1195 struct obd_device *obddev;
1196 struct ldlm_reply *lockrep;
1197 __u64 flags = LDLM_FL_HAS_INTENT;
1201 lockh = &minfo->mi_lockh;
1203 obddev = class_exp2obd(exp);
/* Returns the slot taken in mdc_intent_getattr_async(). */
1205 obd_put_request_slot(&obddev->u.cli);
1206 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1209 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1210 &flags, NULL, 0, lockh, rc);
1212 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1213 mdc_clear_replay_flag(req, rc);
1217 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1218 LASSERT(lockrep != NULL);
1220 lockrep->lock_policy_res2 =
1221 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1223 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1227 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* einfo is released here; the callback receives the final rc. */
1231 OBD_FREE_PTR(einfo);
1232 minfo->mi_cb(req, minfo, rc);
/*
 * Send a getattr intent enqueue asynchronously.
 *
 * The request is built by mdc_intent_getattr_pack() from minfo->mi_it and
 * minfo->mi_data, a request slot is taken, and ldlm_cli_enqueue() is
 * called with its last argument set to 1 and minfo->mi_lockh as the lock
 * handle. The request is then given
 * mdc_intent_getattr_async_interpret() as its reply interpreter and
 * handed to ptlrpcd with ptlrpcd_add_req().
 *
 * exp:   export to send the request on
 * minfo: intent, op data, lock handle and completion callback
 * einfo: lock type/mode; stored in the request's async args for the
 *        interpreter
 *
 * Returns PTR_ERR() of the pack result when the request cannot be built.
 */
1236 int mdc_intent_getattr_async(struct obd_export *exp,
1237 struct md_enqueue_info *minfo,
1238 struct ldlm_enqueue_info *einfo)
1240 struct md_op_data *op_data = &minfo->mi_data;
1241 struct lookup_intent *it = &minfo->mi_it;
1242 struct ptlrpc_request *req;
1243 struct mdc_getattr_args *ga;
1244 struct obd_device *obddev = class_exp2obd(exp);
1245 struct ldlm_res_id res_id;
1246 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1247 * for statahead currently. Consider CMD in future, such two bits
1248 * maybe managed by different MDS, should be adjusted then. */
1249 ldlm_policy_data_t policy = {
1250 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1251 MDS_INODELOCK_UPDATE }
1254 __u64 flags = LDLM_FL_HAS_INTENT;
1257 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1259 (int)op_data->op_namelen, op_data->op_name,
1260 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1262 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1263 req = mdc_intent_getattr_pack(exp, it, op_data);
1265 RETURN(PTR_ERR(req));
1267 rc = obd_get_request_slot(&obddev->u.cli);
1269 ptlrpc_req_finished(req);
1273 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1274 0, LVB_T_NONE, &minfo->mi_lockh, 1);
/* Enqueue failure path: give the slot back and drop the request. */
1276 obd_put_request_slot(&obddev->u.cli);
1277 ptlrpc_req_finished(req);
/* The argument structure must fit in the request's async-args area. */
1281 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1282 ga = ptlrpc_req_async_args(req);
1284 ga->ga_minfo = minfo;
1285 ga->ga_einfo = einfo;
1287 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1288 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);