1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Context saved in req->rq_async_args for an asynchronous getattr
 * enqueue, so the interpret callback can recover the export and the
 * caller-supplied enqueue/metadata info when the reply arrives.
 * NOTE(review): the closing "};" is elided from this listing. */
56 struct mdc_getattr_args {
57 struct obd_export *ga_exp;
58 struct md_enqueue_info *ga_minfo;
59 struct ldlm_enqueue_info *ga_einfo;
/* Return the server-reported status for the given intent phase, walking
 * the dispositions from most-specific (OPEN_OPEN) to least-specific
 * (IT_EXECD).  For each disposition set on the intent, the stored
 * it_status is returned once the requested phase is reached; callers use
 * a phase argument of e.g. DISP_OPEN_OPEN to ask "did the open itself
 * fail?".  If no disposition matches, the state is logged via CERROR.
 * NOTE(review): braces, intermediate returns and the final RETURN/LBUG
 * lines are elided from this listing — behavior between the visible
 * checks cannot be confirmed here. */
62 int it_open_error(int phase, struct lookup_intent *it)
64 if (it_disposition(it, DISP_OPEN_OPEN)) {
65 if (phase >= DISP_OPEN_OPEN)
66 return it->d.lustre.it_status;
71 if (it_disposition(it, DISP_OPEN_CREATE)) {
72 if (phase >= DISP_OPEN_CREATE)
73 return it->d.lustre.it_status;
78 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79 if (phase >= DISP_LOOKUP_EXECD)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_IT_EXECD)) {
86 if (phase >= DISP_IT_EXECD)
87 return it->d.lustre.it_status;
/* No recognized disposition: report the raw disposition/status. */
91 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92 it->d.lustre.it_status);
96 EXPORT_SYMBOL(it_open_error);
98 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach inode pointer 'data' as the lock's l_ast_data under the lock's
 * resource lock, and report the lock's inodebits back through *bits.
 * If the lock already carries a different inode, assert that the old
 * inode is being freed (I_FREEING) before replacing it — a non-freeing
 * mismatch would indicate two live inodes sharing one lock. */
99 void mdc_set_lock_data(__u64 *l, void *data, __u32 *bits)
101 struct ldlm_lock *lock;
102 struct lustre_handle *lockh = (struct lustre_handle *)l;
113 lock = ldlm_handle2lock(lockh);
/* Caller guarantees the handle references a live lock (see comment above). */
115 LASSERT(lock != NULL);
116 lock_res_and_lock(lock);
118 if (lock->l_ast_data && lock->l_ast_data != data) {
119 struct inode *new_inode = data;
120 struct inode *old_inode = lock->l_ast_data;
121 LASSERTF(old_inode->i_state & I_FREEING,
122 "Found existing inode %p/%lu/%u state %lu in lock: "
123 "setting data to %p/%lu/%u\n", old_inode,
124 old_inode->i_ino, old_inode->i_generation,
126 new_inode, new_inode->i_ino, new_inode->i_generation);
129 lock->l_ast_data = data;
/* NOTE(review): a NULL check on 'bits' is not visible in this listing;
 * presumably callers always pass a valid pointer — confirm upstream. */
131 *bits = lock->l_policy_data.l_inodebits.bits;
132 unlock_res_and_lock(lock);
137 EXPORT_SYMBOL(mdc_set_lock_data);
/* Iterate all DLM locks on the resource derived from 'fid', applying
 * iterator 'it' with 'data' to each (used to swap/clear l_ast_data).
 * NOTE(review): the iterator argument passed to ldlm_resource_iterate
 * and the function's return are elided from this listing. */
139 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
140 ldlm_iterator_t it, void *data)
142 struct ldlm_res_id res_id;
145 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
146 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
153 /* find any ldlm lock of the inode in mdc
/* Returns a distinct value depending on whether the iterator stopped
 * (a matching lock was found) or ran to completion; the concrete return
 * values on each branch are elided from this listing. */
157 int mdc_find_cbdata(struct obd_export *exp, struct ll_fid *fid,
158 ldlm_iterator_t it, void *data)
160 struct ldlm_res_id res_id;
164 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
165 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/* LDLM_ITER_STOP => iterator found what it was looking for. */
167 if (rc == LDLM_ITER_STOP)
169 else if (rc == LDLM_ITER_CONTINUE)
/* Drop the replay flag from a request that completed with an error so it
 * is not replayed after recovery; complains if the server assigned a
 * transno to a failed request (transno on error is unexpected). */
174 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
176 /* Don't hold error requests for replay. */
177 if (req->rq_replay) {
/* rq_replay is cleared under rq_lock (the clearing store itself is
 * elided from this listing, between the lock/unlock pair). */
178 spin_lock(&req->rq_lock);
180 spin_unlock(&req->rq_lock);
182 if (rc && req->rq_transno != 0) {
183 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* Round 'val' up — presumably to the next power of two, for slab-friendly
 * request-buffer sizing (see the allocation comment in
 * mdc_intent_open_pack); the body is elided from this listing, so the
 * exact rounding rule cannot be confirmed here. */
188 static int l_round_up(int val)
198 /* Save a large LOV EA into the request buffer so that it is available
199 * for replay. We don't do this in the initial request because the
200 * original request doesn't need this buffer (at most it sends just the
201 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
202 * buffer and may also be difficult to allocate and save a very large
203 * request buffer for each open. (bug 5707)
205 * OOM here may cause recovery failure if lmm is needed (only for the
206 * original open if the MDS crashed just when this client also OOM'd)
207 * but this is incredibly unlikely, and questionable whether the client
208 * could do MDS recovery under OOM anyways... */
209 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
210 struct mds_body *body)
212 int old_len, new_size, old_size;
213 struct lustre_msg *old_msg = req->rq_reqmsg;
214 struct lustre_msg *new_msg;
/* 2.0 servers use a different record offset; the offset assignments on
 * both branches are elided from this listing. */
217 if (mdc_req_is_2_0_server(req))
/* Temporarily grow the EA buffer length in the old message so that
 * lustre_packed_msg_size() below yields the required new size. */
222 old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
223 old_size = lustre_packed_msg_size(old_msg);
224 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
226 /* old buffer is more then need */
227 if (old_len > body->eadatasize)
230 new_size = lustre_packed_msg_size(old_msg);
232 OBD_ALLOC(new_msg, new_size);
233 if (new_msg != NULL) {
234 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
/* Copy the old packed message, then swap rq_reqmsg/rq_reqlen to the
 * larger buffer under rq_lock before freeing the old one. */
236 memcpy(new_msg, old_msg, old_size);
238 spin_lock(&req->rq_lock);
239 req->rq_reqmsg = new_msg;
240 req->rq_reqlen = new_size;
241 spin_unlock(&req->rq_lock);
243 OBD_FREE(old_msg, old_size);
/* Allocation failed: restore the original buflen and drop the EA from
 * the reply body so replay does not expect data we cannot hold. */
245 lustre_msg_set_buflen(old_msg,
246 DLM_INTENT_REC_OFF + offset, old_len);
247 body->valid &= ~OBD_MD_FLEASIZE;
248 body->eadatasize = 0;
/* Build and pack an IT_OPEN intent-enqueue request: sizes the request
 * and reply buffers (including space for a default-sized LOV EA for
 * open replay), collects conflicting locks to cancel, handles the
 * JOIN_FILE variant, and packs the ldlm_intent plus the open record.
 * Returns the prepared request (error/NULL paths are elided from this
 * listing). */
252 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
253 struct lookup_intent *it,
254 struct mdc_op_data *data,
255 void *lmm, __u32 lmmsize)
257 struct ptlrpc_request *req;
258 struct ldlm_intent *lit;
259 struct obd_device *obddev = class_exp2obd(exp);
260 __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
261 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
262 [DLM_INTENT_IT_OFF] = sizeof(*lit),
263 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
264 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
265 /* As an optimization, we allocate an RPC request buffer
266 * for at least a default-sized LOV EA even if we aren't
267 * sending one. We grow the whole request to the next
268 * power-of-two size since we get that much from a slab
269 * allocation anyways. This avoids an allocation below
270 * in the common case where we need to save a
271 * default-sized LOV EA for open replay. */
272 [DLM_INTENT_REC_OFF+2]= max(lmmsize,
273 obddev->u.cli.cl_default_mds_easize) };
274 __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
275 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
276 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
277 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
279 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
280 CFS_LIST_HEAD(cancels);
281 int do_join = (it->it_create_mode & M_JOIN_FILE) && data->data;
/* Force regular-file type bits on the create mode. */
289 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
/* 2.0 servers use mdt_rec_create and insert two (empty) capa buffers,
 * shifting the name and EA buffers up by two slots. */
290 if (mdc_exp_is_2_0_server(exp)) {
291 size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
292 size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
293 size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
294 size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
295 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
297 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
298 repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
/* Grow the EA buffer (last slot) toward a slab-friendly size, capped at
 * cl_max_mds_easize; the rounding expression is elided from this view. */
301 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
304 size[bufcount - 1] = min(size[bufcount - 1] +
306 (__u32)obddev->u.cli.cl_max_mds_easize);
308 /* If inode is known, cancel conflicting OPEN locks. */
310 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
313 else if (it->it_flags & FMODE_EXEC)
318 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
319 mode, MDS_INODELOCK_OPEN);
322 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
323 if (it->it_op & IT_CREAT || do_join)
327 count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
328 MDS_INODELOCK_UPDATE);
/* JOIN_FILE path: data->data carries the head file size. */
330 __u64 head_size = (*(__u64 *)data->data);
331 /* join is like an unlink of the tail */
332 if (mdc_exp_is_2_0_server(exp)) {
333 size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
335 size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
339 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
341 mdc_join_pack(req, bufcount - 1, data, head_size);
/* Non-join path: prepare the enqueue request and clear the join flag. */
343 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
344 it->it_create_mode &= ~M_JOIN_FILE;
/* Opens are replayed unconditionally while the import is replayable. */
348 spin_lock(&req->rq_lock);
349 req->rq_replay = req->rq_import->imp_replayable;
350 spin_unlock(&req->rq_lock);
352 /* pack the intent */
353 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
355 lit->opc = (__u64)it->it_op;
357 /* pack the intended request */
358 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
359 it->it_create_mode, 0, it->it_flags,
362 ptlrpc_req_set_repsize(req, repbufcount, repsize);
/* Build and pack an IT_UNLINK intent-enqueue request: five request
 * buffers (ptlrpc body, lock request, intent, unlink record, name) and
 * five reply buffers sized for the returned body, EA and cookies. */
367 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
368 struct lookup_intent *it,
369 struct mdc_op_data *data)
371 struct ptlrpc_request *req;
372 struct ldlm_intent *lit;
373 struct obd_device *obddev = class_exp2obd(exp);
374 __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
375 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
376 [DLM_INTENT_IT_OFF] = sizeof(*lit),
/* Record format differs between 1.x and 2.0 servers. */
377 [DLM_INTENT_REC_OFF] = mdc_exp_is_2_0_server(exp) ?
378 sizeof(struct mdt_rec_unlink) :
379 sizeof(struct mds_rec_unlink),
380 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
381 __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
382 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
383 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
384 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
386 [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
387 cl_max_mds_cookiesize };
/* No locks to cancel up front for unlink intents (NULL/0 cancel list). */
390 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
392 /* pack the intent */
393 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
395 lit->opc = (__u64)it->it_op;
397 /* pack the intended request */
398 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
400 ptlrpc_req_set_repsize(req, 5, repsize);
/* Build and pack an IT_LOOKUP/IT_GETATTR intent-enqueue request,
 * asking for attributes, EA size, and ACLs in the reply.  2.0 servers
 * get an extra (empty) capa slot with the name shifted one slot up. */
405 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
406 struct lookup_intent *it,
407 struct mdc_op_data *data)
409 struct ptlrpc_request *req;
410 struct ldlm_intent *lit;
411 struct obd_device *obddev = class_exp2obd(exp);
412 __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
413 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
414 [DLM_INTENT_IT_OFF] = sizeof(*lit),
415 [DLM_INTENT_REC_OFF] = sizeof(struct mdt_body),
416 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
417 [DLM_INTENT_REC_OFF+2]= 0 };
418 __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
419 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
420 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
421 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
423 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
424 [DLM_REPLY_REC_OFF+3] = 0 };
/* Attribute mask requested from the server for the getattr part. */
425 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
426 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
430 if (mdc_exp_is_2_0_server(exp)) {
431 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
432 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
435 req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
437 /* pack the intent */
438 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
440 lit->opc = (__u64)it->it_op;
442 /* pack the intended request */
443 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
444 data, obddev->u.cli.cl_max_mds_easize);
445 ptlrpc_req_set_repsize(req, bufcount, repsize);
/* Build a plain (no-intent-record) enqueue request for IT_READDIR:
 * only the ptlrpc body and lock request go out; the reply carries the
 * lock reply plus an ost_lvb. */
450 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
452 struct ptlrpc_request *req;
453 __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
454 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
455 __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
456 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
457 [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
460 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
462 ptlrpc_req_set_repsize(req, 3, repsize);
/* Post-process a completed intent enqueue: mark replayable requests as
 * intent-only, fix up the lock mode if the server changed it, copy the
 * server's disposition/status into the intent, clear replay on failed
 * opens, swab the reply body, and for successful opens stash the reply
 * LOV EA back into the request buffer for replay (growing the request
 * via mdc_realloc_openmsg when needed). */
466 static int mdc_finish_enqueue(struct obd_export *exp,
467 struct ptlrpc_request *req,
468 struct ldlm_enqueue_info *einfo,
469 struct lookup_intent *it,
470 struct lustre_handle *lockh,
473 struct ldlm_request *lockreq;
474 struct ldlm_reply *lockrep;
478 /* Similarly, if we're going to replay this request, we don't want to
479 * actually get a lock, just perform the intent. */
480 if (req->rq_transno || req->rq_replay) {
481 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
483 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* ELDLM_LOCK_ABORTED: intent executed but no lock granted — zero the
 * handle so callers don't treat it as valid. */
486 if (rc == ELDLM_LOCK_ABORTED) {
488 memset(lockh, 0, sizeof(*lockh));
490 } else { /* rc = 0 */
491 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
494 /* If the server gave us back a different lock mode, we should
495 * fix up our variables. */
496 if (lock->l_req_mode != einfo->ei_mode) {
497 ldlm_lock_addref(lockh, lock->l_req_mode);
498 ldlm_lock_decref(lockh, einfo->ei_mode);
499 einfo->ei_mode = lock->l_req_mode;
504 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
506 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
507 /* swabbed by ldlm_cli_enqueue() */
508 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
/* Copy the server's answer into the intent for later phases. */
510 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
511 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
512 it->d.lustre.it_lock_mode = einfo->ei_mode;
513 it->d.lustre.it_lock_handle = lockh->cookie;
514 it->d.lustre.it_data = req;
516 if (it->d.lustre.it_status < 0 && req->rq_replay)
517 mdc_clear_replay_flag(req, it->d.lustre.it_status);
519 /* If we're doing an IT_OPEN which did not result in an actual
520 * successful open, then we need to remove the bit which saves
521 * this request for unconditional replay.
523 * It's important that we do this first! Otherwise we might exit the
524 * function without doing so, and try to replay a failed create
526 if ((it->it_op & IT_OPEN) &&
528 (!it_disposition(it, DISP_OPEN_OPEN) ||
529 it->d.lustre.it_status != 0))
530 mdc_clear_replay_flag(req, it->d.lustre.it_status)
532 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
533 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
535 /* We know what to expect, so we do any byte flipping required here */
536 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
537 struct mds_body *body;
539 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
540 lustre_swab_mds_body);
542 CERROR ("Can't swab mds_body\n");
546 /* If this is a successful OPEN request, we need to set
547 replay handler and data early, so that if replay happens
548 immediately after swabbing below, new reply is swabbed
549 by that handler correctly */
550 if (it_disposition(it, DISP_OPEN_OPEN) &&
551 !it_open_error(DISP_OPEN_OPEN, it))
552 mdc_set_open_replay_data(NULL, req);
554 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
557 mdc_update_max_ea_from_body(exp, body);
559 /* The eadata is opaque; just check that it is there.
560 * Eventually, obd_unpackmd() will check the contents */
561 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
562 body->eadatasize, NULL);
563 if (eadata == NULL) {
564 CERROR ("Missing/short eadata\n");
567 /* We save the reply LOV EA in case we have to replay
568 * a create for recovery. If we didn't allocate a
569 * large enough request buffer above we need to
570 * reallocate it here to hold the actual LOV EA. */
571 if (it->it_op & IT_OPEN) {
572 int offset = DLM_INTENT_REC_OFF;
575 if (mdc_req_is_2_0_server(req))
580 if (lustre_msg_buflen(req->rq_reqmsg, offset) !=
582 mdc_realloc_openmsg(req, body);
584 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
587 memcpy(lmm, eadata, body->eadatasize);
595 /* We always reserve enough space in the reply packet for a stripe MD, because
596 * we don't know in advance the file type. */
/* Perform a metadata intent enqueue: pick the inodebits policy from the
 * intent op, pack the appropriate intent request (open/unlink/lookup/
 * readdir), take the rpc_lock and the rpcs-in-flight slot, call
 * ldlm_cli_enqueue(), and finish via mdc_finish_enqueue().
 * On the 2.0 create path a new fid is allocated into data->fid2 with the
 * original saved in data->fid3. */
597 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
598 struct lookup_intent *it, struct mdc_op_data *data,
599 struct lustre_handle *lockh, void *lmm, int lmmsize,
600 int extra_lock_flags)
602 struct ptlrpc_request *req;
603 struct obd_device *obddev = class_exp2obd(exp);
604 struct ldlm_res_id res_id;
605 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
606 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
610 fid_build_reg_res_name((void *)&data->fid1, &res_id);
611 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
/* UNLINK/GETATTR/READDIR want the UPDATE bit rather than LOOKUP. */
612 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
613 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
615 if (it->it_op & IT_OPEN) {
616 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
617 struct client_obd *cli = &obddev->u.cli;
618 data->fid3 = data->fid2;
619 rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
621 CERROR("fid allocation result: %d\n", rc);
625 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
626 if (it->it_create_mode & M_JOIN_FILE) {
627 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
629 } else if (it->it_op & IT_UNLINK) {
630 req = mdc_intent_unlink_pack(exp, it, data);
631 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
632 req = mdc_intent_lookup_pack(exp, it, data);
633 } else if (it->it_op == IT_READDIR) {
634 req = mdc_intent_readdir_pack(exp);
636 CERROR("bad it_op %x\n", it->it_op);
643 /* It is important to obtain rpc_lock first (if applicable), so that
644 * threads that are serialised with rpc_lock are not polluting our
645 * rpcs in flight counter */
646 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
647 rc = mdc_enter_request(&obddev->u.cli);
649 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags,
650 NULL, 0, NULL, lockh, 0);
651 mdc_exit_request(&obddev->u.cli);
653 CERROR("ldlm_cli_enqueue error: %d\n", rc);
655 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* Error path: drop replay flag and the request reference.
 * NOTE(review): the branch structure around these lines is elided. */
657 mdc_clear_replay_flag(req, rc);
658 ptlrpc_req_finished(req);
661 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
665 EXPORT_SYMBOL(mdc_enqueue);
/* Check whether the client already holds a compatible ibits lock for
 * the given fid; on a match, record the handle and mode in the intent.
 * Returns the match result (the concrete return expression is elided
 * from this listing). */
667 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
670 /* We could just return 1 immediately, but since we should only
671 * be called in revalidate_it if we already have a lock, let's
673 struct ldlm_res_id res_id;
674 struct lustre_handle lockh;
675 ldlm_policy_data_t policy;
679 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
680 /* As not all attributes are kept under update lock, e.g.
681 owner/group/acls are under lookup lock, we need both
682 ibits for GETATTR. */
683 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
684 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
685 MDS_INODELOCK_LOOKUP;
/* Any granted mode (CR/CW/PR/PW) satisfies revalidation. */
687 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
688 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
689 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
691 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
692 it->d.lustre.it_lock_mode = mode;
697 EXPORT_SYMBOL(mdc_revalidate_lock);
/* Finish an intent lock after the enqueue reply has been processed:
 * verify the intent executed, extract the reply mds_body, mark
 * revalidation state, take extra request references for the CREATE and
 * OPEN phases (balanced in ll_create_node / ll_file_open), and cancel
 * the new lock if an equivalent one is already cached. */
699 static int mdc_finish_intent_lock(struct obd_export *exp,
700 struct ptlrpc_request *req,
701 struct mdc_op_data *data,
702 struct lookup_intent *it,
703 struct lustre_handle *lockh)
705 struct mds_body *mds_body;
706 struct lustre_handle old_lock;
707 struct ldlm_lock *lock;
711 LASSERT(req != NULL);
712 LASSERT(req != LP_POISON);
713 LASSERT(req->rq_repmsg != LP_POISON);
715 if (!it_disposition(it, DISP_IT_EXECD)) {
716 /* The server failed before it even started executing the
717 * intent, i.e. because it couldn't unpack the request. */
718 LASSERT(it->d.lustre.it_status != 0);
719 RETURN(it->d.lustre.it_status);
721 rc = it_open_error(DISP_IT_EXECD, it);
725 mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
727 /* mdc_enqueue checked */
728 LASSERT(mds_body != NULL);
729 /* mdc_enqueue swabbed */
730 LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
732 /* If we were revalidating a fid/name pair, mark the intent in
733 * case we fail and get called again from lookup */
735 if (data->fid2.id && (it->it_op != IT_GETATTR) &&
736 ( !mdc_exp_is_2_0_server(exp) ||
737 (mdc_exp_is_2_0_server(exp) && (it->it_create_mode & M_CHECK_STALE)))) {
738 it_set_disposition(it, DISP_ENQ_COMPLETE);
740 /* Also: did we find the same inode? */
/* fid3 holds the pre-2.0-create original fid (see mdc_enqueue); a
 * mismatch against both means the name now points elsewhere. */
741 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
742 memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
746 rc = it_open_error(DISP_LOOKUP_EXECD, it);
750 /* keep requests around for the multiple phases of the call
751 * this shows the DISP_XX must guarantee we make it into the call
753 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
754 it_disposition(it, DISP_OPEN_CREATE) &&
755 !it_open_error(DISP_OPEN_CREATE, it)) {
756 it_set_disposition(it, DISP_ENQ_CREATE_REF);
757 ptlrpc_request_addref(req); /* balanced in ll_create_node */
759 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
760 it_disposition(it, DISP_OPEN_OPEN) &&
761 !it_open_error(DISP_OPEN_OPEN, it)) {
762 it_set_disposition(it, DISP_ENQ_OPEN_REF);
763 ptlrpc_request_addref(req); /* balanced in ll_file_open */
764 /* BUG 11546 - eviction in the middle of open rpc processing */
765 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
768 if (it->it_op & IT_CREAT) {
769 /* XXX this belongs in ll_create_it */
770 } else if (it->it_op == IT_OPEN) {
771 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
773 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
776 /* If we already have a matching lock, then cancel the new
777 * one. We have to set the data here instead of in
778 * mdc_enqueue, because we need to use the child's inode as
779 * the l_ast_data to match, and that's not available until
780 * intent_finish has performed the iget().) */
781 lock = ldlm_handle2lock(lockh);
783 ldlm_policy_data_t policy = lock->l_policy_data;
785 LDLM_DEBUG(lock, "matching against this");
787 memcpy(&old_lock, lockh, sizeof(*lockh));
/* LCK_NL match with old_lock pre-seeded finds any lock on the same
 * resource; if found, drop+cancel the new one and keep the old. */
788 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
789 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
790 ldlm_lock_decref_and_cancel(lockh,
791 it->d.lustre.it_lock_mode);
792 memcpy(lockh, &old_lock, sizeof(old_lock));
793 memcpy(&it->d.lustre.it_lock_handle, lockh,
798 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
799 data->namelen, data->name, ldlm_it2str(it->it_op),
800 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* (Leading "/*" of this block comment is elided from this listing.) */
805 * This long block is all about fixing up the lock and request state
806 * so that it is correct as of the moment _before_ the operation was
807 * applied; that way, the VFS will think that everything is normal and
808 * call Lustre's regular VFS methods.
810 * If we're performing a creation, that means that unless the creation
811 * failed with EEXIST, we should fake up a negative dentry.
813 * For everything else, we want to lookup to succeed.
815 * One additional note: if CREATE or OPEN succeeded, we add an extra
816 * reference to the request because we need to keep it around until
817 * ll_create/ll_open gets called.
819 * The server will return to us, in it_disposition, an indication of
820 * exactly what d.lustre.it_status refers to.
822 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
823 * otherwise if DISP_OPEN_CREATE is set, then it status is the
824 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
825 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
828 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
831 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
832 void *lmm, int lmmsize, struct lookup_intent *it,
833 int lookup_flags, struct ptlrpc_request **reqp,
834 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
836 struct lustre_handle lockh;
842 CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
843 "intent: %s flags %#o\n",
844 op_data->namelen, op_data->name,
845 PFID(((void *)&op_data->fid2)),
846 PFID(((void *)&op_data->fid1)),
847 ldlm_it2str(it->it_op), it->it_flags);
/* Revalidation fast path: if the child fid is known and this is a
 * LOOKUP/GETATTR, try to match an already-held lock first. */
850 if (op_data->fid2.id &&
851 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
852 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
853 /* Only return failure if it was not GETATTR by cfid
854 (from inode_revalidate) */
855 if (rc || op_data->namelen != 0)
859 /* lookup_it may be called only after revalidate_it has run, because
860 * revalidate_it cannot return errors, only zero. Returning zero causes
861 * this call to lookup, which *can* return an error.
863 * We only want to execute the request associated with the intent one
864 * time, however, so don't send the request again. Instead, skip past
865 * this and use the request from revalidate. In this case, revalidate
866 * never dropped its reference, so the refcounts are all OK */
867 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
868 struct ldlm_enqueue_info einfo =
869 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
870 ldlm_completion_ast, NULL, NULL };
872 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
873 lmm, lmmsize, extra_lock_flags);
876 } else if (!op_data->fid2.id) {
877 /* DISP_ENQ_COMPLETE set means there is extra reference on
878 * request referenced from this intent, saved for subsequent
879 * lookup. This path is executed when we proceed to this
880 * lookup, so we clear DISP_ENQ_COMPLETE */
881 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the request back to the caller and finish lock fixup. */
884 *reqp = it->d.lustre.it_data;
885 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
889 EXPORT_SYMBOL(mdc_intent_lock);
/* ptlrpc interpret callback for the async getattr enqueue started by
 * mdc_intent_getattr_async(): releases the rpcs-in-flight slot,
 * completes the enqueue via ldlm_cli_enqueue_fini(), then runs
 * mdc_finish_enqueue()/mdc_finish_intent_lock() and finally invokes the
 * caller's mi_cb with the result.  Cleanup of einfo/minfo and the
 * return statement are elided from this listing. */
891 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
894 struct mdc_getattr_args *ga = args;
895 struct obd_export *exp = ga->ga_exp;
896 struct md_enqueue_info *minfo = ga->ga_minfo;
897 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
898 struct lookup_intent *it;
899 struct lustre_handle *lockh;
900 struct obd_device *obddev;
901 int flags = LDLM_FL_HAS_INTENT;
905 lockh = &minfo->mi_lockh;
907 obddev = class_exp2obd(exp);
/* Release the in-flight slot taken in mdc_intent_getattr_async(). */
909 mdc_exit_request(&obddev->u.cli);
910 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
913 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
914 &flags, NULL, 0, NULL, lockh, rc);
916 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
917 mdc_clear_replay_flag(req, rc);
921 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
925 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
929 minfo->mi_cb(exp, req, minfo, rc);
/* Start an asynchronous getattr intent enqueue (used for statahead):
 * packs a lookup intent, takes an rpcs-in-flight slot, issues the
 * enqueue with async=1, stashes the context in rq_async_args, and hands
 * the request to ptlrpcd.  The interpret callback above completes it. */
934 int mdc_intent_getattr_async(struct obd_export *exp,
935 struct md_enqueue_info *minfo,
936 struct ldlm_enqueue_info *einfo)
938 struct mdc_op_data *op_data = &minfo->mi_data;
939 struct lookup_intent *it = &minfo->mi_it;
940 struct ptlrpc_request *req;
941 struct mdc_getattr_args *ga;
942 struct obd_device *obddev = class_exp2obd(exp);
943 struct ldlm_res_id res_id;
944 ldlm_policy_data_t policy = {
945 .l_inodebits = { MDS_INODELOCK_LOOKUP }
948 int flags = LDLM_FL_HAS_INTENT;
951 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
952 op_data->namelen, op_data->name, op_data->fid1.id,
953 ldlm_it2str(it->it_op), it->it_flags);
955 fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
956 req = mdc_intent_lookup_pack(exp, it, op_data);
/* Slot acquisition failure: drop the request and bail (the RETURN on
 * this path is elided from the listing). */
960 rc = mdc_enter_request(&obddev->u.cli);
962 ptlrpc_req_finished(req);
/* async=1: completion is handled in the interpret callback. */
965 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
966 0, NULL, &minfo->mi_lockh, 1);
968 mdc_exit_request(&obddev->u.cli);
969 ptlrpc_req_finished(req);
/* Context for mdc_intent_getattr_async_interpret; must fit in
 * rq_async_args (checked at compile time). */
973 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
974 ga = ptlrpc_req_async_args(req);
976 ga->ga_minfo = minfo;
977 ga->ga_einfo = einfo;
979 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
980 ptlrpcd_add_req(req);
984 EXPORT_SYMBOL(mdc_intent_getattr_async);