1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/*
 * Return the intent status that applies to the given phase of an open.
 *
 * The disposition bits are tested from the latest phase to the earliest
 * (DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD).
 * For a bit that is set in the intent, it->d.lustre.it_status is returned
 * when phase is at least that disposition value.
 *
 * NOTE(review): the path taken when a bit is set but phase is lower, and
 * whatever follows the CERROR() below, are not part of this listing -
 * confirm them against the complete file.
 */
56 int it_open_error(int phase, struct lookup_intent *it)
58 if (it_disposition(it, DISP_OPEN_OPEN)) {
59 if (phase >= DISP_OPEN_OPEN)
60 return it->d.lustre.it_status;
65 if (it_disposition(it, DISP_OPEN_CREATE)) {
66 if (phase >= DISP_OPEN_CREATE)
67 return it->d.lustre.it_status;
72 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
73 if (phase >= DISP_LOOKUP_EXECD)
74 return it->d.lustre.it_status;
79 if (it_disposition(it, DISP_IT_EXECD)) {
80 if (phase >= DISP_IT_EXECD)
81 return it->d.lustre.it_status;
/* Log the raw disposition and status; presumably only reached when none
 * of the tests above produced a result - TODO confirm. */
85 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
86 it->d.lustre.it_status);
90 EXPORT_SYMBOL(it_open_error);
/*
 * Attach caller data to the DLM lock named by a handle and report the
 * inodebits covered by that lock.
 *
 * l    - pointer that is cast to a struct lustre_handle
 * data - value stored in lock->l_ast_data (treated as a struct inode
 *        pointer in the consistency check below)
 * bits - receives lock->l_policy_data.l_inodebits.bits
 *
 * NOTE(review): lines are elided from this listing; whether bits is
 * checked for NULL before the store and where the reference taken by
 * ldlm_handle2lock() is dropped cannot be seen here - confirm.
 */
92 /* this must be called on a lockh that is known to have a referenced lock */
93 void mdc_set_lock_data(__u64 *l, void *data, __u32 *bits)
95 struct ldlm_lock *lock;
96 struct lustre_handle *lockh = (struct lustre_handle *)l;
107 lock = ldlm_handle2lock(lockh);
109 LASSERT(lock != NULL);
110 lock_res_and_lock(lock);
/* A lock that already carries different data is only tolerated when the
 * old inode is marked I_FREEING; otherwise the assertion fires. */
112 if (lock->l_ast_data && lock->l_ast_data != data) {
113 struct inode *new_inode = data;
114 struct inode *old_inode = lock->l_ast_data;
115 LASSERTF(old_inode->i_state & I_FREEING,
116 "Found existing inode %p/%lu/%u state %lu in lock: "
117 "setting data to %p/%lu/%u\n", old_inode,
118 old_inode->i_ino, old_inode->i_generation,
120 new_inode, new_inode->i_ino, new_inode->i_generation);
123 lock->l_ast_data = data;
125 *bits = lock->l_policy_data.l_inodebits.bits;
126 unlock_res_and_lock(lock);
131 EXPORT_SYMBOL(mdc_set_lock_data);
/*
 * Run an iterator over the DLM locks of the resource that corresponds to
 * a fid.
 *
 * The resource name is built from fid with fid_build_reg_res_name() and
 * ldlm_resource_iterate() is called on the namespace of the obd device
 * behind exp.
 *
 * NOTE(review): the call is cut off in this listing; presumably it and
 * data are passed through as the iterator and its argument, and the
 * return value is not visible - confirm.
 */
133 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
134 ldlm_iterator_t it, void *data)
136 struct ldlm_res_id res_id;
139 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
140 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/*
 * Iterate over the DLM locks of the resource named by fid and translate
 * the iterator result.
 *
 * The result of ldlm_resource_iterate() is compared against
 * LDLM_ITER_STOP and LDLM_ITER_CONTINUE.
 *
 * NOTE(review): the value returned in each case, the declaration of rc
 * and the remaining call arguments are not part of this listing - confirm
 * them against the complete file.
 */
147 /* find any ldlm lock of the inode in mdc
151 int mdc_find_cbdata(struct obd_export *exp, struct ll_fid *fid,
152 ldlm_iterator_t it, void *data)
154 struct ldlm_res_id res_id;
158 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
159 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
161 if (rc == LDLM_ITER_STOP)
163 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Stop holding a failed request for replay.
 *
 * When req->rq_replay is set, req->rq_lock is taken and released around
 * an update that is not part of this listing (presumably clearing
 * rq_replay - TODO confirm). A request that failed (rc != 0) but still
 * carries a transno is reported through DEBUG_REQ at D_ERROR level.
 */
168 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
170 /* Don't hold error requests for replay. */
171 if (req->rq_replay) {
172 spin_lock(&req->rq_lock);
174 spin_unlock(&req->rq_lock);
/* An error reply is not expected to carry a transaction number. */
176 if (rc && req->rq_transno != 0) {
177 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/*
 * Round an integer up.
 *
 * NOTE(review): the body is not part of this listing. The caller in
 * mdc_intent_open_pack() uses round_up(rc) - rc to pad a request and its
 * comment speaks of the next power-of-two size, so this presumably rounds
 * val up to a power of two - confirm against the complete file.
 */
182 static int round_up(int val)
192 /* Save a large LOV EA into the request buffer so that it is available
193 * for replay. We don't do this in the initial request because the
194 * original request doesn't need this buffer (at most it sends just the
195 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
196 * buffer and may also be difficult to allocate and save a very large
197 * request buffer for each open. (bug 5707)
199 * OOM here may cause recovery failure if lmm is needed (only for the
200 * original open if the MDS crashed just when this client also OOM'd)
201 * but this is incredibly unlikely, and questionable whether the client
202 * could do MDS recovery under OOM anyways... */
/*
 * req  - open request whose request message may be replaced
 * body - reply body; eadatasize gives the EA length that must fit
 *
 * The EA buffer index is DLM_INTENT_REC_OFF + offset, where offset depends
 * on mdc_req_is_2_0_server() (the assignments are not in this listing).
 * NOTE(review): the declaration of offset, the new length passed to the
 * first lustre_msg_set_buflen() call, the early exit after the size test
 * and the else keyword before the fallback at the end are elided here -
 * confirm them against the complete file.
 */
203 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
204 struct mds_body *body)
206 int old_len, new_size, old_size;
207 struct lustre_msg *old_msg = req->rq_reqmsg;
208 struct lustre_msg *new_msg;
211 if (mdc_req_is_2_0_server(req))
/* Remember the current buffer length and packed size before the buffer
 * length is changed, so they can be used for the copy, the free and the
 * rollback below. */
216 old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
217 old_size = lustre_packed_msg_size(old_msg);
218 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
220 /* old buffer is larger than needed */
221 if (old_len > body->eadatasize)
/* Packed size of the message with the updated buffer length. */
224 new_size = lustre_packed_msg_size(old_msg);
226 OBD_ALLOC(new_msg, new_size);
227 if (new_msg != NULL) {
228 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
/* Only old_size bytes exist in the old message; copy those and switch
 * the request over to the new message under rq_lock. */
230 memcpy(new_msg, old_msg, old_size);
232 spin_lock(&req->rq_lock);
233 req->rq_reqmsg = new_msg;
234 req->rq_reqlen = new_size;
235 spin_unlock(&req->rq_lock);
237 OBD_FREE(old_msg, old_size);
/* Presumably the allocation-failure path: restore the old buffer length
 * and drop the EA from the reply body so that it is not copied. */
239 lustre_msg_set_buflen(old_msg,
240 DLM_INTENT_REC_OFF + offset, old_len);
241 body->valid &= ~OBD_MD_FLEASIZE;
242 body->eadatasize = 0;
/*
 * Build the DLM enqueue request that carries an open intent.
 *
 * exp     - export the request is prepared for
 * it      - lookup intent; it_create_mode is modified here (file type
 *           forced to S_IFREG, M_JOIN_FILE cleared on the non-join path)
 * data    - operation data (fid1, fid2, name length, optional join data)
 * lmm     - LOV EA; its use is not visible in this listing (presumably
 *           passed on to mdc_open_pack() - TODO confirm)
 * lmmsize - size of lmm, used to size the EA request buffer
 *
 * NOTE(review): the declarations of rc, bufcount, repbufcount, count and
 * mode, several conditions and the return statement are elided from this
 * listing - confirm them against the complete file.
 */
246 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
247 struct lookup_intent *it,
248 struct mdc_op_data *data,
249 void *lmm, __u32 lmmsize)
251 struct ptlrpc_request *req;
252 struct ldlm_intent *lit;
253 struct obd_device *obddev = class_exp2obd(exp);
254 __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
255 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
256 [DLM_INTENT_IT_OFF] = sizeof(*lit),
257 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
258 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
259 /* As an optimization, we allocate an RPC request buffer
260 * for at least a default-sized LOV EA even if we aren't
261 * sending one. We grow the whole request to the next
262 * power-of-two size since we get that much from a slab
263 * allocation anyways. This avoids an allocation below
264 * in the common case where we need to save a
265 * default-sized LOV EA for open replay. */
266 [DLM_INTENT_REC_OFF+2]= max(lmmsize,
267 obddev->u.cli.cl_default_mds_easize) };
268 __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
269 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
270 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
271 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
273 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
274 CFS_LIST_HEAD(cancels);
/* A join is only performed when the flag is set and join data is given. */
275 int do_join = (it->it_create_mode & M_JOIN_FILE) && data->data;
/* Replace whatever file type bits were requested with S_IFREG. */
283 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
/* For a 2.0 server the record is a struct mdt_rec_create and the name and
 * EA buffers move up by two slots, leaving slots +1 and +2 empty for
 * capabilities; the reply gets two extra capability buffers. */
284 if (mdc_exp_is_2_0_server(exp)) {
285 size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
286 size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
287 size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
288 size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
289 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
291 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
292 repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
/* Grow the last buffer by the slack between the message size and its
 * rounded-up size, but never beyond cl_max_mds_easize. */
295 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
298 size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
299 (__u32)obddev->u.cli.cl_max_mds_easize);
301 /* If inode is known, cancel conflicting OPEN locks. */
303 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
306 else if (it->it_flags & FMODE_EXEC)
311 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
312 mode, MDS_INODELOCK_OPEN);
315 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
316 if (it->it_op & IT_CREAT || do_join)
320 count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
321 MDS_INODELOCK_UPDATE);
/* Join path: data->data is read as a 64-bit head size and an extra join
 * record buffer is added before the request is prepared. */
323 __u64 head_size = (*(__u64 *)data->data);
324 /* join is like an unlink of the tail */
325 if (mdc_exp_is_2_0_server(exp)) {
326 size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
328 size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
332 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
334 mdc_join_pack(req, bufcount - 1, data, head_size);
/* Non-join path: same request preparation, and the join flag is removed
 * from the create mode. */
336 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
337 it->it_create_mode &= ~M_JOIN_FILE;
/* Mark the request for replay when the import is replayable. */
341 spin_lock(&req->rq_lock);
342 req->rq_replay = req->rq_import->imp_replayable;
343 spin_unlock(&req->rq_lock);
345 /* pack the intent */
346 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
348 lit->opc = (__u64)it->it_op;
350 /* pack the intended request */
351 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
352 it->it_create_mode, 0, it->it_flags,
355 ptlrpc_req_set_repsize(req, repbufcount, repsize);
/*
 * Build the DLM enqueue request that carries an unlink intent.
 *
 * The request has five buffers: ptlrpc body, lock request, intent, unlink
 * record (struct mdt_rec_unlink for a 2.0 server, struct mds_rec_unlink
 * otherwise) and the name including its terminating byte. The reply is
 * also sized for five buffers. No locks are passed for early cancel
 * (NULL list, count 0).
 *
 * NOTE(review): the NULL check on req, part of the reply size table and
 * the return statement are elided from this listing - confirm.
 */
360 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
361 struct lookup_intent *it,
362 struct mdc_op_data *data)
364 struct ptlrpc_request *req;
365 struct ldlm_intent *lit;
366 struct obd_device *obddev = class_exp2obd(exp);
367 __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
368 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
369 [DLM_INTENT_IT_OFF] = sizeof(*lit),
370 [DLM_INTENT_REC_OFF] = mdc_exp_is_2_0_server(exp) ?
371 sizeof(struct mdt_rec_unlink) :
372 sizeof(struct mds_rec_unlink),
373 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
374 __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
375 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
376 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
377 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
379 [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
380 cl_max_mds_cookiesize };
383 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
385 /* pack the intent */
386 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
388 lit->opc = (__u64)it->it_op;
390 /* pack the intended request */
391 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
393 ptlrpc_req_set_repsize(req, 5, repsize);
/*
 * Build the DLM enqueue request that carries a lookup or getattr intent.
 *
 * The intended request is a getattr asking for the attribute set in
 * valid (attributes, EA size, ACL, mode-based EA size, directory EA).
 * For a 2.0 server the name moves from slot DLM_INTENT_REC_OFF+1 to +2,
 * and slot +1 is left empty for a capability.
 *
 * NOTE(review): the declaration and value of bufcount, the NULL check on
 * req, the trailing arguments of mdc_getattr_pack() and the return
 * statement are elided from this listing - confirm.
 */
398 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
399 struct lookup_intent *it,
400 struct mdc_op_data *data)
402 struct ptlrpc_request *req;
403 struct ldlm_intent *lit;
404 struct obd_device *obddev = class_exp2obd(exp);
405 __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
406 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
407 [DLM_INTENT_IT_OFF] = sizeof(*lit),
408 [DLM_INTENT_REC_OFF] = sizeof(struct mdt_body),
409 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
410 [DLM_INTENT_REC_OFF+2]= 0 };
411 __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
412 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
413 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
414 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
416 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
417 [DLM_REPLY_REC_OFF+3] = 0 };
418 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
419 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
423 if (mdc_exp_is_2_0_server(exp)) {
424 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
425 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
/* No locks are handed over for early cancel (NULL list, count 0). */
428 req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
430 /* pack the intent */
431 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
433 lit->opc = (__u64)it->it_op;
435 /* pack the intended request */
436 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
438 ptlrpc_req_set_repsize(req, bufcount, repsize);
/*
 * Build the DLM enqueue request used for a readdir intent.
 *
 * The request holds only the ptlrpc body and the lock request (two
 * buffers, no intent record). The reply is sized for three buffers, the
 * last one being a struct ost_lvb.
 *
 * NOTE(review): the NULL check on req and the return statement are elided
 * from this listing - confirm.
 */
443 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
445 struct ptlrpc_request *req;
446 __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
447 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
448 __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
449 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
450 [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
453 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
455 ptlrpc_req_set_repsize(req, 3, repsize);
/*
 * Post-process the reply of an intent enqueue.
 *
 * exp   - export the request was sent on
 * req   - completed enqueue request
 * einfo - enqueue info; ei_mode is updated if the server granted a
 *         different mode
 * it    - intent that receives disposition, status, lock mode, lock
 *         handle and the request pointer
 * lockh - handle of the lock; zeroed when the enqueue was aborted
 *
 * The last parameter is not visible in this listing; the body uses a
 * variable rc as the enqueue result, so it is presumably int rc - confirm.
 *
 * NOTE(review): further lines (declarations of eadata and lmm, error
 * returns, lock reference release, the final return) are elided from this
 * listing - confirm them against the complete file.
 */
459 static int mdc_finish_enqueue(struct obd_export *exp,
460 struct ptlrpc_request *req,
461 struct ldlm_enqueue_info *einfo,
462 struct lookup_intent *it,
463 struct lustre_handle *lockh,
466 struct ldlm_request *lockreq;
467 struct ldlm_reply *lockrep;
471 /* Similarly, if we're going to replay this request, we don't want to
472 * actually get a lock, just perform the intent. */
473 if (req->rq_transno || req->rq_replay) {
474 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
476 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* An aborted enqueue means no lock was granted, so the handle is wiped. */
479 if (rc == ELDLM_LOCK_ABORTED) {
481 memset(lockh, 0, sizeof(*lockh));
483 } else { /* rc = 0 */
484 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
487 /* If the server gave us back a different lock mode, we should
488 * fix up our variables. */
489 if (lock->l_req_mode != einfo->ei_mode) {
490 ldlm_lock_addref(lockh, lock->l_req_mode);
491 ldlm_lock_decref(lockh, einfo->ei_mode);
492 einfo->ei_mode = lock->l_req_mode;
/* Copy the server verdict from the lock reply into the intent. */
497 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
499 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
500 /* swabbed by ldlm_cli_enqueue() */
501 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
503 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
504 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
505 it->d.lustre.it_lock_mode = einfo->ei_mode;
506 it->d.lustre.it_lock_handle = lockh->cookie;
507 it->d.lustre.it_data = req;
/* A negative intent status on a replayable request drops the replay flag. */
509 if (it->d.lustre.it_status < 0 && req->rq_replay)
510 mdc_clear_replay_flag(req, it->d.lustre.it_status);
512 /* If we're doing an IT_OPEN which did not result in an actual
513 * successful open, then we need to remove the bit which saves
514 * this request for unconditional replay.
516 * It's important that we do this first! Otherwise we might exit the
517 * function without doing so, and try to replay a failed create
519 if ((it->it_op & IT_OPEN) &&
521 (!it_disposition(it, DISP_OPEN_OPEN) ||
522 it->d.lustre.it_status != 0))
523 mdc_clear_replay_flag(req, it->d.lustre.it_status);
525 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
526 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
528 /* We know what to expect, so we do any byte flipping required here */
529 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
530 struct mds_body *body;
532 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
533 lustre_swab_mds_body);
535 CERROR ("Can't swab mds_body\n");
539 /* If this is a successful OPEN request, we need to set
540 replay handler and data early, so that if replay happens
541 immediately after swabbing below, new reply is swabbed
542 by that handler correctly */
543 if (it_disposition(it, DISP_OPEN_OPEN) &&
544 !it_open_error(DISP_OPEN_OPEN, it))
545 mdc_set_open_replay_data(NULL, req);
/* The reply carries EA data only when OBD_MD_FLEASIZE is set in valid. */
547 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
550 mdc_update_max_ea_from_body(exp, body);
552 /* The eadata is opaque; just check that it is there.
553 * Eventually, obd_unpackmd() will check the contents */
554 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
555 body->eadatasize, NULL);
556 if (eadata == NULL) {
557 CERROR ("Missing/short eadata\n");
560 /* We save the reply LOV EA in case we have to replay
561 * a create for recovery. If we didn't allocate a
562 * large enough request buffer above we need to
563 * reallocate it here to hold the actual LOV EA. */
564 if (it->it_op & IT_OPEN) {
565 int offset = DLM_INTENT_REC_OFF;
568 if (mdc_req_is_2_0_server(req))
/* Resize the request when its EA buffer length differs from what is
 * needed (the compared value is elided in this listing), then copy the
 * reply EA into the request buffer. */
573 if (lustre_msg_buflen(req->rq_reqmsg, offset) !=
575 mdc_realloc_openmsg(req, body);
577 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
580 memcpy(lmm, eadata, body->eadatasize);
588 /* We always reserve enough space in the reply packet for a stripe MD, because
589 * we don't know in advance the file type. */
/*
 * Send an intent enqueue to the MDS and post-process its reply.
 *
 * exp              - export to send on
 * einfo            - enqueue info; ei_type must be LDLM_IBITS
 * it               - intent; it_op selects which request is packed
 * data             - operation data; fid1 names the locked resource
 * lockh            - receives the lock handle
 * lmm, lmmsize     - LOV EA passed to the open packer
 * extra_lock_flags - or-ed with LDLM_FL_HAS_INTENT for the enqueue
 *
 * The lock policy starts as MDS_INODELOCK_LOOKUP and becomes
 * MDS_INODELOCK_UPDATE for unlink, getattr and readdir intents, and for
 * an open whose create mode still has M_JOIN_FILE set after packing.
 *
 * NOTE(review): the declaration of rc, the error returns, the NULL check
 * on req and the final return are elided from this listing - confirm.
 */
590 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
591 struct lookup_intent *it, struct mdc_op_data *data,
592 struct lustre_handle *lockh, void *lmm, int lmmsize,
593 int extra_lock_flags)
595 struct ptlrpc_request *req;
596 struct obd_device *obddev = class_exp2obd(exp);
597 struct ldlm_res_id res_id;
598 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
599 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
603 fid_build_reg_res_name((void *)&data->fid1, &res_id);
604 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
605 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
606 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
608 if (it->it_op & IT_OPEN) {
/* Creating on a 2.0 server: keep the old fid2 in fid3 and allocate a
 * fresh fid into fid2 from the client sequence. */
609 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
610 struct client_obd *cli = &obddev->u.cli;
611 data->fid3 = data->fid2;
612 rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
614 CERROR("fid allocation result: %d\n", rc);
618 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
619 if (it->it_create_mode & M_JOIN_FILE) {
620 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
622 } else if (it->it_op & IT_UNLINK) {
623 req = mdc_intent_unlink_pack(exp, it, data);
624 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
625 req = mdc_intent_lookup_pack(exp, it, data);
626 } else if (it->it_op == IT_READDIR) {
627 req = mdc_intent_readdir_pack(exp);
629 CERROR("bad it_op %x\n", it->it_op);
636 /* It is important to obtain rpc_lock first (if applicable), so that
637 * threads that are serialised with rpc_lock are not polluting our
638 * rpcs in flight counter */
639 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
640 mdc_enter_request(&obddev->u.cli);
641 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
643 mdc_exit_request(&obddev->u.cli);
644 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* Presumably the enqueue failure path (its condition is elided): log,
 * drop the replay flag and release the request. */
646 CERROR("ldlm_cli_enqueue: %d\n", rc);
647 mdc_clear_replay_flag(req, rc);
648 ptlrpc_req_finished(req);
651 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
655 EXPORT_SYMBOL(mdc_enqueue);
/*
 * Check whether a suitable inodebits lock on a fid is already held and,
 * if so, record it in the intent.
 *
 * The lookup uses ldlm_lock_match() on the namespace of the export with
 * LDLM_FL_BLOCK_GRANTED and accepts any of the modes CR, CW, PR, PW. A
 * GETATTR intent requires both the UPDATE and LOOKUP bits, every other
 * intent only the LOOKUP bit. The matched handle and mode are stored in
 * it->d.lustre.
 *
 * NOTE(review): the rest of the parameter list (the body uses a variable
 * fid), the declaration of mode, the condition guarding the stores and
 * the return value are elided from this listing - confirm.
 */
657 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
660 /* We could just return 1 immediately, but since we should only
661 * be called in revalidate_it if we already have a lock, let's
663 struct ldlm_res_id res_id;
664 struct lustre_handle lockh;
665 ldlm_policy_data_t policy;
669 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
670 /* As not all attributes are kept under update lock, e.g.
671 owner/group/acls are under lookup lock, we need both
672 ibits for GETATTR. */
673 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
674 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
675 MDS_INODELOCK_LOOKUP;
677 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
678 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
679 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
681 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
682 it->d.lustre.it_lock_mode = mode;
687 EXPORT_SYMBOL(mdc_revalidate_lock);
/*
 * Interpret the intent result after the enqueue reply has been processed.
 *
 * exp   - export the request was sent on
 * req   - the enqueue request; extra references are taken on it when a
 *         create or an open succeeded
 * data  - operation data (fid2 and fid3 are compared with the reply fid)
 * it    - intent carrying disposition and status
 * lockh - handle of the lock just obtained; replaced by an already held
 *         matching lock when one exists
 *
 * Returns it_status when the server never executed the intent.
 * NOTE(review): the declaration of rc, the tests on rc after each
 * it_open_error() call, the result of the fid comparison and the final
 * return are elided from this listing - confirm.
 */
689 static int mdc_finish_intent_lock(struct obd_export *exp,
690 struct ptlrpc_request *req,
691 struct mdc_op_data *data,
692 struct lookup_intent *it,
693 struct lustre_handle *lockh)
695 struct mds_body *mds_body;
696 struct lustre_handle old_lock;
697 struct ldlm_lock *lock;
701 LASSERT(req != NULL);
702 LASSERT(req != LP_POISON);
703 LASSERT(req->rq_repmsg != LP_POISON);
705 if (!it_disposition(it, DISP_IT_EXECD)) {
706 /* The server failed before it even started executing the
707 * intent, i.e. because it couldn't unpack the request. */
708 LASSERT(it->d.lustre.it_status != 0);
709 RETURN(it->d.lustre.it_status);
711 rc = it_open_error(DISP_IT_EXECD, it);
715 mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
717 /* mdc_enqueue checked */
718 LASSERT(mds_body != NULL);
719 /* mdc_enqueue swabbed */
720 LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
722 /* If we were revalidating a fid/name pair, mark the intent in
723 * case we fail and get called again from lookup */
725 if (data->fid2.id && (it->it_op != IT_GETATTR) &&
726 ( !mdc_exp_is_2_0_server(exp) ||
727 (mdc_exp_is_2_0_server(exp) && (it->it_create_mode & M_CHECK_STALE)))) {
728 it_set_disposition(it, DISP_ENQ_COMPLETE);
730 /* Also: did we find the same inode? */
731 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
732 memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
736 rc = it_open_error(DISP_LOOKUP_EXECD, it);
740 /* keep requests around for the multiple phases of the call
741 * this shows the DISP_XX must guarantee we make it into the call
743 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
744 it_disposition(it, DISP_OPEN_CREATE) &&
745 !it_open_error(DISP_OPEN_CREATE, it)) {
746 it_set_disposition(it, DISP_ENQ_CREATE_REF);
747 ptlrpc_request_addref(req); /* balanced in ll_create_node */
749 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
750 it_disposition(it, DISP_OPEN_OPEN) &&
751 !it_open_error(DISP_OPEN_OPEN, it)) {
752 it_set_disposition(it, DISP_ENQ_OPEN_REF);
753 ptlrpc_request_addref(req); /* balanced in ll_file_open */
754 /* BUG 11546 - eviction in the middle of open rpc processing */
755 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
/* Sanity checks on the combination of intent operation and disposition. */
758 if (it->it_op & IT_CREAT) {
759 /* XXX this belongs in ll_create_it */
760 } else if (it->it_op == IT_OPEN) {
761 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
763 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
766 /* If we already have a matching lock, then cancel the new
767 * one. We have to set the data here instead of in
768 * mdc_enqueue, because we need to use the child's inode as
769 * the l_ast_data to match, and that's not available until
770 * intent_finish has performed the iget().) */
771 lock = ldlm_handle2lock(lockh);
773 ldlm_policy_data_t policy = lock->l_policy_data;
775 LDLM_DEBUG(lock, "matching against this");
/* old_lock starts as a copy of the new handle and is passed to
 * ldlm_lock_match(); on a match the new lock is dropped and cancelled
 * and the handle from old_lock is used from then on. */
777 memcpy(&old_lock, lockh, sizeof(*lockh));
778 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
779 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
780 ldlm_lock_decref_and_cancel(lockh,
781 it->d.lustre.it_lock_mode);
782 memcpy(lockh, &old_lock, sizeof(old_lock));
783 memcpy(&it->d.lustre.it_lock_handle, lockh,
788 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
789 data->namelen, data->name, ldlm_it2str(it->it_op),
790 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
795 * This long block is all about fixing up the lock and request state
796 * so that it is correct as of the moment _before_ the operation was
797 * applied; that way, the VFS will think that everything is normal and
798 * call Lustre's regular VFS methods.
800 * If we're performing a creation, that means that unless the creation
801 * failed with EEXIST, we should fake up a negative dentry.
803 * For everything else, we want the lookup to succeed.
805 * One additional note: if CREATE or OPEN succeeded, we add an extra
806 * reference to the request because we need to keep it around until
807 * ll_create/ll_open gets called.
809 * The server will return to us, in it_disposition, an indication of
810 * exactly what d.lustre.it_status refers to.
812 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
813 * otherwise if DISP_OPEN_CREATE is set, then it status is the
814 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
815 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
818 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * Execute an intent: reuse an existing lock when possible, otherwise
 * enqueue the intent on the MDS, then interpret the result.
 *
 * exp              - export to use
 * op_data          - operation data (fid1 parent, fid2 child, name)
 * lmm, lmmsize     - LOV EA handed on to mdc_enqueue()
 * it               - the intent
 * lookup_flags     - not referenced in the lines visible here
 * reqp             - receives the request stored in the intent
 * cb_blocking      - blocking callback placed in the enqueue info
 * extra_lock_flags - handed on to mdc_enqueue()
 *
 * NOTE(review): the declaration of rc, the early returns after the
 * revalidation and after mdc_enqueue(), and the final return are elided
 * from this listing - confirm them against the complete file.
 */
821 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
822 void *lmm, int lmmsize, struct lookup_intent *it,
823 int lookup_flags, struct ptlrpc_request **reqp,
824 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
826 struct lustre_handle lockh;
832 CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
833 "intent: %s flags %#o\n",
834 op_data->namelen, op_data->name,
835 PFID(((void *)&op_data->fid2)),
836 PFID(((void *)&op_data->fid1)),
837 ldlm_it2str(it->it_op), it->it_flags);
/* With a known child fid, a lookup or getattr first tries to match a
 * lock that is already held. */
840 if (op_data->fid2.id &&
841 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
842 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
843 /* Only return failure if it was not GETATTR by cfid
844 (from inode_revalidate) */
845 if (rc || op_data->namelen != 0)
849 /* lookup_it may be called only after revalidate_it has run, because
850 * revalidate_it cannot return errors, only zero. Returning zero causes
851 * this call to lookup, which *can* return an error.
853 * We only want to execute the request associated with the intent one
854 * time, however, so don't send the request again. Instead, skip past
855 * this and use the request from revalidate. In this case, revalidate
856 * never dropped its reference, so the refcounts are all OK */
857 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
858 struct ldlm_enqueue_info einfo =
859 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
860 ldlm_completion_ast, NULL, NULL };
862 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
863 lmm, lmmsize, extra_lock_flags);
866 } else if (!op_data->fid2.id) {
867 /* DISP_ENQ_COMPLETE set means there is extra reference on
868 * request referenced from this intent, saved for subsequent
869 * lookup. This path is executed when we proceed to this
870 * lookup, so we clear DISP_ENQ_COMPLETE */
871 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the request kept in the intent back to the caller and let
 * mdc_finish_intent_lock() produce the final result. */
874 *reqp = it->d.lustre.it_data;
875 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
879 EXPORT_SYMBOL(mdc_intent_lock);
/*
 * Reply interpreter for the asynchronous getattr intent enqueue.
 *
 * req    - the completed request; its async args hold the export, the
 *          md_enqueue_info and the ldlm_enqueue_info, in that order
 * unused - not referenced in the lines visible here
 * rc     - completion status of the request
 *
 * The in-flight slot taken at send time is released, the enqueue is
 * finished with ldlm_cli_enqueue_fini(), then mdc_finish_enqueue() and
 * mdc_finish_intent_lock() run, and finally the callback minfo->mi_cb is
 * invoked with the resulting rc.
 *
 * NOTE(review): the assignment of it, the effect of the OBD_FAIL_CHECK,
 * the tests on rc between the steps and the return are elided from this
 * listing - confirm them against the complete file.
 */
881 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
882 void *unused, int rc)
884 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
885 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
886 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
887 struct lookup_intent *it;
888 struct lustre_handle *lockh;
889 struct obd_device *obddev;
890 int flags = LDLM_FL_HAS_INTENT;
894 lockh = &minfo->mi_lockh;
896 obddev = class_exp2obd(exp);
898 mdc_exit_request(&obddev->u.cli);
899 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
902 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
903 &flags, NULL, 0, NULL, lockh, rc);
/* Presumably the failure path of the call above (its condition is
 * elided): log and drop the replay flag. */
905 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
906 mdc_clear_replay_flag(req, rc);
910 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
914 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
918 minfo->mi_cb(exp, req, minfo, rc);
/*
 * Start an asynchronous getattr intent enqueue.
 *
 * exp   - export to send on
 * minfo - carries the operation data, the intent, the lock handle and
 *         the completion callback used by the interpreter
 * einfo - enqueue info handed to ldlm_cli_enqueue()
 *
 * The request is packed with mdc_intent_lookup_pack(), an in-flight slot
 * is taken with mdc_enter_request(), and ldlm_cli_enqueue() is called
 * with a LOOKUP inodebits policy and a final argument of 1. The request
 * is then given its async args and interpreter and queued with
 * ptlrpcd_add_req().
 *
 * NOTE(review): the declaration of rc, the NULL check on req, the
 * condition around the mdc_exit_request() call (presumably enqueue
 * failure) and the return are elided from this listing - confirm.
 */
923 int mdc_intent_getattr_async(struct obd_export *exp,
924 struct md_enqueue_info *minfo,
925 struct ldlm_enqueue_info *einfo)
927 struct mdc_op_data *op_data = &minfo->mi_data;
928 struct lookup_intent *it = &minfo->mi_it;
929 struct ptlrpc_request *req;
930 struct obd_device *obddev = class_exp2obd(exp);
931 struct ldlm_res_id res_id;
932 ldlm_policy_data_t policy = {
933 .l_inodebits = { MDS_INODELOCK_LOOKUP }
936 int flags = LDLM_FL_HAS_INTENT;
939 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
940 op_data->namelen, op_data->name, op_data->fid1.id,
941 ldlm_it2str(it->it_op), it->it_flags);
943 fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
944 req = mdc_intent_lookup_pack(exp, it, op_data);
948 mdc_enter_request(&obddev->u.cli);
949 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
950 0, NULL, &minfo->mi_lockh, 1);
952 mdc_exit_request(&obddev->u.cli);
/* The interpreter reads these three pointers back in the same order. */
956 req->rq_async_args.pointer_arg[0] = exp;
957 req->rq_async_args.pointer_arg[1] = minfo;
958 req->rq_async_args.pointer_arg[2] = einfo;
959 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
960 ptlrpcd_add_req(req);
964 EXPORT_SYMBOL(mdc_intent_getattr_async);