Whamcloud - gitweb
Land b1_6_elc onto b1_6 (20070621_0218)
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
43
44 int it_disposition(struct lookup_intent *it, int flag)
45 {
46         return it->d.lustre.it_disposition & flag;
47 }
48 EXPORT_SYMBOL(it_disposition);
49
50 void it_set_disposition(struct lookup_intent *it, int flag)
51 {
52         it->d.lustre.it_disposition |= flag;
53 }
54 EXPORT_SYMBOL(it_set_disposition);
55
56 void it_clear_disposition(struct lookup_intent *it, int flag)
57 {
58         it->d.lustre.it_disposition &= ~flag;
59 }
60 EXPORT_SYMBOL(it_clear_disposition);
61
62 static int it_to_lock_mode(struct lookup_intent *it)
63 {
64         /* CREAT needs to be tested before open (both could be set) */
65         if (it->it_op & IT_CREAT)
66                 return LCK_CW;
67         else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
68                 return LCK_CR;
69
70         LBUG();
71         RETURN(-EINVAL);
72 }
73
74 int it_open_error(int phase, struct lookup_intent *it)
75 {
76         if (it_disposition(it, DISP_OPEN_OPEN)) {
77                 if (phase >= DISP_OPEN_OPEN)
78                         return it->d.lustre.it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_OPEN_CREATE)) {
84                 if (phase >= DISP_OPEN_CREATE)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91                 if (phase >= DISP_LOOKUP_EXECD)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96
97         if (it_disposition(it, DISP_IT_EXECD)) {
98                 if (phase >= DISP_IT_EXECD)
99                         return it->d.lustre.it_status;
100                 else
101                         return 0;
102         }
103         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104                it->d.lustre.it_status);
105         LBUG();
106         return 0;
107 }
108 EXPORT_SYMBOL(it_open_error);
109
110 /* this must be called on a lockh that is known to have a referenced lock */
111 void mdc_set_lock_data(__u64 *l, void *data)
112 {
113         struct ldlm_lock *lock;
114         struct lustre_handle *lockh = (struct lustre_handle *)l;
115         ENTRY;
116
117         if (!*l) {
118                 EXIT;
119                 return;
120         }
121
122         lock = ldlm_handle2lock(lockh);
123
124         LASSERT(lock != NULL);
125         lock_res_and_lock(lock);
126 #ifdef __KERNEL__
127         if (lock->l_ast_data && lock->l_ast_data != data) {
128                 struct inode *new_inode = data;
129                 struct inode *old_inode = lock->l_ast_data;
130                 LASSERTF(old_inode->i_state & I_FREEING,
131                          "Found existing inode %p/%lu/%u state %lu in lock: "
132                          "setting data to %p/%lu/%u\n", old_inode,
133                          old_inode->i_ino, old_inode->i_generation,
134                          old_inode->i_state,
135                          new_inode, new_inode->i_ino, new_inode->i_generation);
136         }
137 #endif
138         lock->l_ast_data = data;
139         unlock_res_and_lock(lock);
140         LDLM_LOCK_PUT(lock);
141
142         EXIT;
143 }
144 EXPORT_SYMBOL(mdc_set_lock_data);
145
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
147                       ldlm_iterator_t it, void *data)
148 {
149         struct ldlm_res_id res_id = { .name = {0} };
150         ENTRY;
151
152         res_id.name[0] = fid->id;
153         res_id.name[1] = fid->generation;
154
155         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
156                               it, data);
157
158         EXIT;
159         return 0;
160 }
161
162 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
163 {
164         /* Don't hold error requests for replay. */
165         if (req->rq_replay) {
166                 spin_lock(&req->rq_lock);
167                 req->rq_replay = 0;
168                 spin_unlock(&req->rq_lock);
169         }
170         if (rc && req->rq_transno != 0) {
171                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
172                 LBUG();
173         }
174 }
175
/* Return the smallest power of two strictly greater than @val
 * (so 0 -> 1, 5 -> 8, 8 -> 16).
 *
 * Non-positive input yields 1.  If the result would not fit in an int
 * (val >= 2^30 with 32-bit int) the largest representable power of two
 * is returned instead of overflowing. */
static int round_up(int val)
{
        unsigned int rest;
        unsigned int pow2 = 1;

        /* a negative value would never shift down to zero */
        if (val <= 0)
                return 1;

        /* unsigned arithmetic: shifting into the top bit is well defined */
        for (rest = val; rest != 0; rest >>= 1)
                pow2 <<= 1;

        /* clamp a result that exceeds the positive range of int */
        if (pow2 > (~0U >> 1))
                pow2 >>= 1;

        return pow2;
}
185
186 /* Save a large LOV EA into the request buffer so that it is available
187  * for replay.  We don't do this in the initial request because the
188  * original request doesn't need this buffer (at most it sends just the
189  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
190  * buffer and may also be difficult to allocate and save a very large
191  * request buffer for each open. (bug 5707)
192  *
193  * OOM here may cause recovery failure if lmm is needed (only for the
194  * original open if the MDS crashed just when this client also OOM'd)
195  * but this is incredibly unlikely, and questionable whether the client
196  * could do MDS recovery under OOM anyways... */
197 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
198                                 struct mds_body *body, int size[6])
199 {
200         int new_size, old_size;
201         struct lustre_msg *new_msg;
202
203         /* save old size */
204         old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
205
206         size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
207         new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
208         OBD_ALLOC(new_msg, new_size);
209         if (new_msg != NULL) {
210                 struct lustre_msg *old_msg = req->rq_reqmsg;
211
212                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
213                           body->eadatasize);
214                 memcpy(new_msg, old_msg, old_size);
215                 lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
216                                       body->eadatasize);
217
218                 spin_lock(&req->rq_lock);
219                 req->rq_reqmsg = new_msg;
220                 req->rq_reqlen = new_size;
221                 spin_unlock(&req->rq_lock);
222
223                 OBD_FREE(old_msg, old_size);
224         } else {
225                 body->valid &= ~OBD_MD_FLEASIZE;
226                 body->eadatasize = 0;
227         }
228 }
229
230 /* We always reserve enough space in the reply packet for a stripe MD, because
231  * we don't know in advance the file type. */
232 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
233                 struct lookup_intent *it, struct mdc_op_data *op_data,
234                 struct lustre_handle *lockh, void *lmm, int lmmsize,
235                 int extra_lock_flags)
236 {
237         struct ptlrpc_request *req;
238         struct obd_device *obddev = class_exp2obd(exp);
239         struct ldlm_res_id res_id =
240                 { .name = {op_data->fid1.id, op_data->fid1.generation} };
241         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
242         struct ldlm_request *lockreq;
243         struct ldlm_intent *lit;
244         struct ldlm_reply *lockrep;
245         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
246                         [DLM_LOCKREQ_OFF]     = sizeof(*lockreq),
247                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
248                         0, 0, 0, 0 };
249         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
250                            [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
251                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
252                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
253                                                    cl_max_mds_easize, 0 };
254         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
255         int repbufcnt = 4, rc;
256         void *eadata;
257         ENTRY;
258
259         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
260 //        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
261 //                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
262
263         if (it->it_op & IT_OPEN) {
264                 CFS_LIST_HEAD(cancels);
265                 int count = 0;
266                 int mode;
267                 
268                 it->it_create_mode |= S_IFREG;
269
270                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
271                 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
272                 /* As an optimization, we allocate an RPC request buffer for
273                  * at least a default-sized LOV EA even if we aren't sending
274                  * one.  We grow the whole request to the next power-of-two
275                  * size since we get that much from a slab allocation anyways.
276                  * This avoids an allocation below in the common case where
277                  * we need to save a default-sized LOV EA for open replay. */
278                 size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
279                                           obddev->u.cli.cl_default_mds_easize);
280                 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
281                                      size);
282                 if (rc & (rc - 1))
283                         size[DLM_INTENT_REC_OFF + 2] =
284                                  min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
285                                      obddev->u.cli.cl_max_mds_easize);
286
287                 /* If inode is known, cancel conflicting OPEN locks. */
288                 if (op_data->fid2.id) {
289                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
290                                 mode = LCK_CW;
291 #ifdef FMODE_EXEC
292                         else if (it->it_flags & FMODE_EXEC)
293                                 mode = LCK_PR;
294 #endif
295                         else 
296                                 mode = LCK_CR;
297                         count = mdc_resource_get_unused(exp, &op_data->fid2,
298                                                         &cancels, mode,
299                                                         MDS_INODELOCK_OPEN);
300                 }
301
302                 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
303                 if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
304                         mode = LCK_EX;
305                 else
306                         mode = LCK_CR;
307                 count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
308                                                  mode, MDS_INODELOCK_UPDATE);
309                 if (it->it_flags & O_JOIN_FILE) {
310                         /* join is like an unlink of the tail */
311                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
312                         size[DLM_INTENT_REC_OFF + 3] =
313                                                  sizeof(struct mds_rec_join);
314                         req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
315                                                     count);
316                         mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
317                                       (*(__u64 *)op_data->data));
318                 } else {
319                         req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
320                                                     count);
321                 }
322
323                 if (!req)
324                         RETURN(-ENOMEM);
325
326                 spin_lock(&req->rq_lock);
327                 req->rq_replay = 1;
328                 spin_unlock(&req->rq_lock);
329
330                 /* pack the intent */
331                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
332                                      sizeof(*lit));
333                 lit->opc = (__u64)it->it_op;
334
335                 /* pack the intended request */
336                 mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
337                               it->it_create_mode, 0, it->it_flags,
338                               lmm, lmmsize);
339
340                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
341         } else if (it->it_op & IT_UNLINK) {
342                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
343                 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
344                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
345                 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
346                 if (!req)
347                         RETURN(-ENOMEM);
348
349                 /* pack the intent */
350                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
351                                      sizeof(*lit));
352                 lit->opc = (__u64)it->it_op;
353
354                 /* pack the intended request */
355                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
356
357                 repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
358         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
359                 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
360                                   OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
361                                   OBD_MD_FLDIREA;
362                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
363                 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
364
365                 if (it->it_op & IT_GETATTR)
366                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
367
368                 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
369                 if (!req)
370                         RETURN(-ENOMEM);
371
372                 /* pack the intent */
373                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
374                                      sizeof(*lit));
375                 lit->opc = (__u64)it->it_op;
376
377                 /* pack the intended request */
378                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
379                                  it->it_flags, op_data);
380
381                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
382         } else if (it->it_op == IT_READDIR) {
383                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
384                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
385                 if (!req)
386                         RETURN(-ENOMEM);
387
388                 repbufcnt = 2;
389         } else {
390                 LBUG();
391                 RETURN(-EINVAL);
392         }
393
394         /* get ready for the reply */
395         ptlrpc_req_set_repsize(req, repbufcnt, repsize);
396
397          /* It is important to obtain rpc_lock first (if applicable), so that
398           * threads that are serialised with rpc_lock are not polluting our
399           * rpcs in flight counter */
400         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
401         mdc_enter_request(&obddev->u.cli);
402         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
403                               0, NULL, lockh, 0);
404         mdc_exit_request(&obddev->u.cli);
405         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
406
407         /* Similarly, if we're going to replay this request, we don't want to
408          * actually get a lock, just perform the intent. */
409         if (req->rq_transno || req->rq_replay) {
410                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
411                                          sizeof(*lockreq));
412                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
413         }
414
415         /* This can go when we're sure that this can never happen */
416         LASSERT(rc != -ENOENT);
417         if (rc == ELDLM_LOCK_ABORTED) {
418                 einfo->ei_mode = 0;
419                 memset(lockh, 0, sizeof(*lockh));
420                 rc = 0;
421         } else if (rc != 0) {
422                 CERROR("ldlm_cli_enqueue: %d\n", rc);
423                 LASSERTF(rc < 0, "rc %d\n", rc);
424                 mdc_clear_replay_flag(req, rc);
425                 ptlrpc_req_finished(req);
426                 RETURN(rc);
427         } else { /* rc = 0 */
428                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
429                 LASSERT(lock);
430
431                 /* If the server gave us back a different lock mode, we should
432                  * fix up our variables. */
433                 if (lock->l_req_mode != einfo->ei_mode) {
434                         ldlm_lock_addref(lockh, lock->l_req_mode);
435                         ldlm_lock_decref(lockh, einfo->ei_mode);
436                         einfo->ei_mode = lock->l_req_mode;
437                 }
438                 LDLM_LOCK_PUT(lock);
439         }
440
441         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
442                                  sizeof(*lockrep));
443         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
444         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); /* swabbed by ldlm_cli_enqueue() */
445
446         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
447         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
448         it->d.lustre.it_lock_mode = einfo->ei_mode;
449         it->d.lustre.it_data = req;
450
451         if (it->d.lustre.it_status < 0 && req->rq_replay)
452                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
453
454         /* If we're doing an IT_OPEN which did not result in an actual
455          * successful open, then we need to remove the bit which saves
456          * this request for unconditional replay.
457          *
458          * It's important that we do this first!  Otherwise we might exit the
459          * function without doing so, and try to replay a failed create
460          * (bug 3440) */
461         if (it->it_op & IT_OPEN && req->rq_replay &&
462             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
463                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
464
465         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
466                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
467
468         /* We know what to expect, so we do any byte flipping required here */
469         LASSERT(repbufcnt == 5 || repbufcnt == 2);
470         if (repbufcnt == 5) {
471                 struct mds_body *body;
472
473                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
474                                          lustre_swab_mds_body);
475                 if (body == NULL) {
476                         CERROR ("Can't swab mds_body\n");
477                         RETURN (-EPROTO);
478                 }
479
480                 /* If this is a successful OPEN request, we need to set
481                    replay handler and data early, so that if replay happens
482                    immediately after swabbing below, new reply is swabbed
483                    by that handler correctly */
484                 if (it_disposition(it, DISP_OPEN_OPEN) &&
485                     !it_open_error(DISP_OPEN_OPEN, it))
486                         mdc_set_open_replay_data(NULL, req);
487
488                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
489                         /* The eadata is opaque; just check that it is there.
490                          * Eventually, obd_unpackmd() will check the contents */
491                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
492                                                     body->eadatasize, NULL);
493                         if (eadata == NULL) {
494                                 CERROR ("Missing/short eadata\n");
495                                 RETURN (-EPROTO);
496                         }
497                         if (body->valid & OBD_MD_FLMODEASIZE) {
498                                 if (obddev->u.cli.cl_max_mds_easize < 
499                                                         body->max_mdsize) {
500                                         obddev->u.cli.cl_max_mds_easize = 
501                                                 body->max_mdsize;
502                                         CDEBUG(D_INFO, "maxeasize become %d\n",
503                                                body->max_mdsize);
504                                 }
505                                 if (obddev->u.cli.cl_max_mds_cookiesize <
506                                                         body->max_cookiesize) {
507                                         obddev->u.cli.cl_max_mds_cookiesize =
508                                                 body->max_cookiesize;
509                                         CDEBUG(D_INFO, "cookiesize become %d\n",
510                                                body->max_cookiesize);
511                                 }
512                         }
513                         /* We save the reply LOV EA in case we have to replay
514                          * a create for recovery.  If we didn't allocate a
515                          * large enough request buffer above we need to
516                          * reallocate it here to hold the actual LOV EA. */
517                         if (it->it_op & IT_OPEN) {
518                                 int offset = DLM_INTENT_REC_OFF + 2;
519
520                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
521                                     body->eadatasize)
522                                         mdc_realloc_openmsg(req, body, size);
523
524                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
525                                                      body->eadatasize);
526                                 if (lmm)
527                                         memcpy(lmm, eadata, body->eadatasize);
528                         }
529                 }
530         }
531
532         RETURN(rc);
533 }
534 EXPORT_SYMBOL(mdc_enqueue);
535
536 /* 
537  * This long block is all about fixing up the lock and request state
538  * so that it is correct as of the moment _before_ the operation was
539  * applied; that way, the VFS will think that everything is normal and
540  * call Lustre's regular VFS methods.
541  *
542  * If we're performing a creation, that means that unless the creation
543  * failed with EEXIST, we should fake up a negative dentry.
544  *
545  * For everything else, we want to lookup to succeed.
546  *
547  * One additional note: if CREATE or OPEN succeeded, we add an extra
548  * reference to the request because we need to keep it around until
549  * ll_create/ll_open gets called.
550  *
551  * The server will return to us, in it_disposition, an indication of
552  * exactly what d.lustre.it_status refers to.
553  *
554  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
555  * otherwise if DISP_OPEN_CREATE is set, then it status is the
556  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
557  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
558  * was successful.
559  *
560  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
561  * child lookup.
562  */
563 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
564                     void *lmm, int lmmsize, struct lookup_intent *it,
565                     int lookup_flags, struct ptlrpc_request **reqp,
566                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
567 {
568         struct lustre_handle lockh;
569         struct ptlrpc_request *request;
570         int rc = 0;
571         struct mds_body *mds_body;
572         struct lustre_handle old_lock;
573         struct ldlm_lock *lock;
574         ENTRY;
575         LASSERT(it);
576
577         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
578                op_data->namelen, op_data->name, op_data->fid1.id,
579                ldlm_it2str(it->it_op), it->it_flags);
580
581         if (op_data->fid2.id &&
582             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
583                 /* We could just return 1 immediately, but since we should only
584                  * be called in revalidate_it if we already have a lock, let's
585                  * verify that. */
586                 struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
587                                                      op_data->fid2.generation}};
588                 struct lustre_handle lockh;
589                 ldlm_policy_data_t policy;
590                 int mode = LCK_CR;
591
592                 /* As not all attributes are kept under update lock, e.g. 
593                    owner/group/acls are under lookup lock, we need both 
594                    ibits for GETATTR. */
595                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
596                         MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
597                         MDS_INODELOCK_LOOKUP;
598
599                 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
600                                      LDLM_FL_BLOCK_GRANTED, &res_id,
601                                      LDLM_IBITS, &policy, LCK_CR, &lockh);
602                 if (!rc) {
603                         mode = LCK_CW;
604                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
605                                              LDLM_FL_BLOCK_GRANTED, &res_id,
606                                              LDLM_IBITS, &policy,LCK_CW,&lockh);
607                 }
608                 if (!rc) {
609                         mode = LCK_PR;
610                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
611                                              LDLM_FL_BLOCK_GRANTED, &res_id,
612                                              LDLM_IBITS, &policy,LCK_PR,&lockh);
613                 }
614                 if (rc) {
615                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
616                                sizeof(lockh));
617                         it->d.lustre.it_lock_mode = mode;
618                 }
619
620                 /* Only return failure if it was not GETATTR by cfid
621                    (from inode_revalidate) */
622                 if (rc || op_data->namelen != 0)
623                         RETURN(rc);
624         }
625
626         /* lookup_it may be called only after revalidate_it has run, because
627          * revalidate_it cannot return errors, only zero.  Returning zero causes
628          * this call to lookup, which *can* return an error.
629          *
630          * We only want to execute the request associated with the intent one
631          * time, however, so don't send the request again.  Instead, skip past
632          * this and use the request from revalidate.  In this case, revalidate
633          * never dropped its reference, so the refcounts are all OK */
634         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
635                 struct ldlm_enqueue_info einfo =
636                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
637                           ldlm_completion_ast, NULL, NULL };
638
639                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
640                                  lmm, lmmsize, extra_lock_flags);
641                 if (rc < 0)
642                         RETURN(rc);
643                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
644         } else if (!op_data->fid2.id) {
645                 /* DISP_ENQ_COMPLETE set means there is extra reference on
646                  * request referenced from this intent, saved for subsequent
647                  * lookup.  This path is executed when we proceed to this
648                  * lookup, so we clear DISP_ENQ_COMPLETE */
649                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
650         }
651         request = *reqp = it->d.lustre.it_data;
652         LASSERT(request != NULL);
653         LASSERT(request != LP_POISON);
654         LASSERT(request->rq_repmsg != LP_POISON);
655
656         if (!it_disposition(it, DISP_IT_EXECD)) {
657                 /* The server failed before it even started executing the
658                  * intent, i.e. because it couldn't unpack the request. */
659                 LASSERT(it->d.lustre.it_status != 0);
660                 RETURN(it->d.lustre.it_status);
661         }
662         rc = it_open_error(DISP_IT_EXECD, it);
663         if (rc)
664                 RETURN(rc);
665
666         mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
667                                   sizeof(*mds_body));
668         LASSERT(mds_body != NULL);           /* mdc_enqueue checked */
669         LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
670
671         /* If we were revalidating a fid/name pair, mark the intent in
672          * case we fail and get called again from lookup */
673         if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
674                 it_set_disposition(it, DISP_ENQ_COMPLETE);
675                 /* Also: did we find the same inode? */
676                 if (memcmp(&op_data->fid2, &mds_body->fid1,
677                            sizeof(op_data->fid2)))
678                         RETURN (-ESTALE);
679         }
680
681         rc = it_open_error(DISP_LOOKUP_EXECD, it);
682         if (rc)
683                 RETURN(rc);
684
685         /* keep requests around for the multiple phases of the call
686          * this shows the DISP_XX must guarantee we make it into the call
687          */
688         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
689             it_disposition(it, DISP_OPEN_CREATE) &&
690             !it_open_error(DISP_OPEN_CREATE, it)) {
691                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
692                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
693         }
694         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
695             it_disposition(it, DISP_OPEN_OPEN) &&
696             !it_open_error(DISP_OPEN_OPEN, it)) {
697                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
698                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
699                 /* BUG 11546 - eviction in the middle of open rpc processing */
700                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
701         }
702
703         if (it->it_op & IT_CREAT) {
704                 /* XXX this belongs in ll_create_it */
705         } else if (it->it_op == IT_OPEN) {
706                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
707         } else {
708                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
709         }
710
711         /* If we already have a matching lock, then cancel the new
712          * one.  We have to set the data here instead of in
713          * mdc_enqueue, because we need to use the child's inode as
714          * the l_ast_data to match, and that's not available until
715          * intent_finish has performed the iget().) */
716         lock = ldlm_handle2lock(&lockh);
717         if (lock) {
718                 ldlm_policy_data_t policy = lock->l_policy_data;
719                 LDLM_DEBUG(lock, "matching against this");
720                 LDLM_LOCK_PUT(lock);
721                 memcpy(&old_lock, &lockh, sizeof(lockh));
722                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
723                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
724                         ldlm_lock_decref_and_cancel(&lockh,
725                                                     it->d.lustre.it_lock_mode);
726                         memcpy(&lockh, &old_lock, sizeof(old_lock));
727                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
728                                sizeof(lockh));
729                 }
730         }
731         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
732                op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
733                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
734
735         RETURN(rc);
736 }
737 EXPORT_SYMBOL(mdc_intent_lock);