Whamcloud - gitweb
land b1_5 onto HEAD
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
43
44 int it_disposition(struct lookup_intent *it, int flag)
45 {
46         return it->d.lustre.it_disposition & flag;
47 }
48 EXPORT_SYMBOL(it_disposition);
49
50 void it_set_disposition(struct lookup_intent *it, int flag)
51 {
52         it->d.lustre.it_disposition |= flag;
53 }
54 EXPORT_SYMBOL(it_set_disposition);
55
56 void it_clear_disposition(struct lookup_intent *it, int flag)
57 {
58         it->d.lustre.it_disposition &= ~flag;
59 }
60 EXPORT_SYMBOL(it_clear_disposition);
61
62 static int it_to_lock_mode(struct lookup_intent *it)
63 {
64         /* CREAT needs to be tested before open (both could be set) */
65         if (it->it_op & IT_CREAT)
66                 return LCK_CW;
67         else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
68                 return LCK_CR;
69
70         LBUG();
71         RETURN(-EINVAL);
72 }
73
74 int it_open_error(int phase, struct lookup_intent *it)
75 {
76         if (it_disposition(it, DISP_OPEN_OPEN)) {
77                 if (phase >= DISP_OPEN_OPEN)
78                         return it->d.lustre.it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_OPEN_CREATE)) {
84                 if (phase >= DISP_OPEN_CREATE)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91                 if (phase >= DISP_LOOKUP_EXECD)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96
97         if (it_disposition(it, DISP_IT_EXECD)) {
98                 if (phase >= DISP_IT_EXECD)
99                         return it->d.lustre.it_status;
100                 else
101                         return 0;
102         }
103         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104                it->d.lustre.it_status);
105         LBUG();
106         return 0;
107 }
108 EXPORT_SYMBOL(it_open_error);
109
110 /* this must be called on a lockh that is known to have a referenced lock */
111 void mdc_set_lock_data(__u64 *l, void *data)
112 {
113         struct ldlm_lock *lock;
114         struct lustre_handle *lockh = (struct lustre_handle *)l;
115         ENTRY;
116
117         if (!*l) {
118                 EXIT;
119                 return;
120         }
121
122         lock = ldlm_handle2lock(lockh);
123
124         LASSERT(lock != NULL);
125         lock_res_and_lock(lock);
126 #ifdef __KERNEL__
127         if (lock->l_ast_data && lock->l_ast_data != data) {
128                 struct inode *new_inode = data;
129                 struct inode *old_inode = lock->l_ast_data;
130                 LASSERTF(old_inode->i_state & I_FREEING,
131                          "Found existing inode %p/%lu/%u state %lu in lock: "
132                          "setting data to %p/%lu/%u\n", old_inode,
133                          old_inode->i_ino, old_inode->i_generation,
134                          old_inode->i_state,
135                          new_inode, new_inode->i_ino, new_inode->i_generation);
136         }
137 #endif
138         lock->l_ast_data = data;
139         unlock_res_and_lock(lock);
140         LDLM_LOCK_PUT(lock);
141
142         EXIT;
143 }
144 EXPORT_SYMBOL(mdc_set_lock_data);
145
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
147                       ldlm_iterator_t it, void *data)
148 {
149         struct ldlm_res_id res_id = { .name = {0} };
150         ENTRY;
151
152         res_id.name[0] = fid->id;
153         res_id.name[1] = fid->generation;
154
155         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
156                               it, data);
157
158         EXIT;
159         return 0;
160 }
161
162 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
163 {
164         /* Don't hold error requests for replay. */
165         if (req->rq_replay) {
166                 spin_lock(&req->rq_lock);
167                 req->rq_replay = 0;
168                 spin_unlock(&req->rq_lock);
169         }
170         if (rc && req->rq_transno != 0) {
171                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
172                 LBUG();
173         }
174 }
175
/* Return the smallest power of two strictly greater than @val.
 *
 * Note that an exact power of two is therefore doubled (8 -> 16); the
 * caller in this file only uses the result for sizes that are not already
 * a power of two.  Zero and negative values yield 1.
 *
 * The loop is bounded to positive values: right-shifting a negative int
 * never reaches zero, so testing "val != 0" would spin forever (and
 * overflow ret) on negative input.  @val must be below 2^30 so that the
 * result still fits in an int. */
static int round_up(int val)
{
        int ret = 1;

        while (val > 0) {
                val >>= 1;
                ret <<= 1;
        }
        return ret;
}
185
186 /* Save a large LOV EA into the request buffer so that it is available
187  * for replay.  We don't do this in the initial request because the
188  * original request doesn't need this buffer (at most it sends just the
189  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
190  * buffer and may also be difficult to allocate and save a very large
191  * request buffer for each open. (bug 5707)
192  *
193  * OOM here may cause recovery failure if lmm is needed (only for the
194  * original open if the MDS crashed just when this client also OOM'd)
195  * but this is incredibly unlikely, and questionable whether the client
196  * could do MDS recovery under OOM anyways... */
197 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
198                                 struct mds_body *body, int size[6])
199 {
200         int new_size, old_size;
201         struct lustre_msg *new_msg;
202
203         /* save old size */
204         old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
205
206         size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
207         new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
208         OBD_ALLOC(new_msg, new_size);
209         if (new_msg != NULL) {
210                 struct lustre_msg *old_msg = req->rq_reqmsg;
211
212                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
213                           body->eadatasize);
214                 memcpy(new_msg, old_msg, old_size);
215                 lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
216                                       body->eadatasize);
217
218                 spin_lock(&req->rq_lock);
219                 req->rq_reqmsg = new_msg;
220                 req->rq_reqlen = new_size;
221                 spin_unlock(&req->rq_lock);
222
223                 OBD_FREE(old_msg, old_size);
224         } else {
225                 body->valid &= ~OBD_MD_FLEASIZE;
226                 body->eadatasize = 0;
227         }
228 }
229
230 /* We always reserve enough space in the reply packet for a stripe MD, because
231  * we don't know in advance the file type. */
232 int mdc_enqueue(struct obd_export *exp,
233                 int lock_type,
234                 struct lookup_intent *it,
235                 int lock_mode,
236                 struct mdc_op_data *data,
237                 struct lustre_handle *lockh,
238                 void *lmm,
239                 int lmmsize,
240                 ldlm_completion_callback cb_completion,
241                 ldlm_blocking_callback cb_blocking,
242                 void *cb_data, int extra_lock_flags)
243 {
244         struct ptlrpc_request *req;
245         struct obd_device *obddev = class_exp2obd(exp);
246         struct ldlm_res_id res_id =
247                 { .name = {data->fid1.id, data->fid1.generation} };
248         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
249         struct ldlm_request *lockreq;
250         struct ldlm_intent *lit;
251         struct ldlm_reply *lockrep;
252         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
253                         [DLM_LOCKREQ_OFF]     = sizeof(*lockreq),
254                         [DLM_INTENT_IT_OFF]   = sizeof(*lit) };
255         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
256                            [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
257                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
258                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
259                                                    cl_max_mds_easize };
260         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
261         int repbufcnt = 4, rc;
262         void *eadata;
263         ENTRY;
264
265         LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type);
266 //        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
267 //                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
268
269         if (it->it_op & IT_OPEN) {
270                 it->it_create_mode |= S_IFREG;
271
272                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
273                 size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
274                 /* As an optimization, we allocate an RPC request buffer for
275                  * at least a default-sized LOV EA even if we aren't sending
276                  * one.  We grow the whole request to the next power-of-two
277                  * size since we get that much from a slab allocation anyways.
278                  * This avoids an allocation below in the common case where
279                  * we need to save a default-sized LOV EA for open replay. */
280                 size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
281                                           obddev->u.cli.cl_default_mds_easize);
282                 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
283                                      size);
284                 if (rc & (rc - 1))
285                         size[DLM_INTENT_REC_OFF + 2] =
286                                  min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
287                                      obddev->u.cli.cl_max_mds_easize);
288
289                 if (it->it_flags & O_JOIN_FILE) {
290                         __u64 head_size = *(__u32*)cb_data;
291                         __u32 tsize = *(__u32*)lmm;
292
293                         /* join is like an unlink of the tail */
294                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
295                         size[DLM_INTENT_REC_OFF + 3] =
296                                                  sizeof(struct mds_rec_join);
297                         req = ptlrpc_prep_req(class_exp2cliimp(exp),
298                                               LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
299                                               7, size, NULL);
300                         /* when joining file, cb_data and lmm args together
301                          * indicate the head file size*/
302                         mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data,
303                                       (head_size << 32) | tsize);
304                         cb_data = NULL;
305                         lmm = NULL;
306                 } else {
307                         req = ptlrpc_prep_req(class_exp2cliimp(exp),
308                                               LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
309                                               6, size, NULL);
310                 }
311
312                 if (!req)
313                         RETURN(-ENOMEM);
314
315                 spin_lock(&req->rq_lock);
316                 req->rq_replay = 1;
317                 spin_unlock(&req->rq_lock);
318
319                 /* pack the intent */
320                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
321                                      sizeof(*lit));
322                 lit->opc = (__u64)it->it_op;
323
324                 /* pack the intended request */
325                 mdc_open_pack(req, DLM_INTENT_REC_OFF, data, it->it_create_mode,
326                               0, it->it_flags, lmm, lmmsize);
327
328                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
329         } else if (it->it_op & IT_UNLINK) {
330                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
331                 size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
332                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
333                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
334                                       LDLM_ENQUEUE, 5, size, NULL);
335                 if (!req)
336                         RETURN(-ENOMEM);
337
338                 /* pack the intent */
339                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
340                                      sizeof(*lit));
341                 lit->opc = (__u64)it->it_op;
342
343                 /* pack the intended request */
344                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
345
346                 repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
347         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
348                 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
349                                   OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
350                                   OBD_MD_FLDIREA;
351                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
352                 size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
353
354                 if (it->it_op & IT_GETATTR)
355                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
356
357                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
358                                       LDLM_ENQUEUE, 5, size, NULL);
359                 if (!req)
360                         RETURN(-ENOMEM);
361
362                 /* pack the intent */
363                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
364                                      sizeof(*lit));
365                 lit->opc = (__u64)it->it_op;
366
367                 /* pack the intended request */
368                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
369                                  it->it_flags, data);
370
371                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
372         } else if (it->it_op == IT_READDIR) {
373                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
374                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
375                                       LDLM_ENQUEUE, 2, size, NULL);
376                 if (!req)
377                         RETURN(-ENOMEM);
378
379                 repbufcnt = 2;
380         } else {
381                 LBUG();
382                 RETURN(-EINVAL);
383         }
384
385         /* get ready for the reply */
386         ptlrpc_req_set_repsize(req, repbufcnt, repsize);
387
388          /* It is important to obtain rpc_lock first (if applicable), so that
389           * threads that are serialised with rpc_lock are not polluting our
390           * rpcs in flight counter */
391         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
392         mdc_enter_request(&obddev->u.cli);
393         rc = ldlm_cli_enqueue(exp, &req, res_id, lock_type, &policy,
394                               lock_mode, &flags, cb_blocking, cb_completion,
395                               NULL, cb_data, NULL, 0, NULL, lockh, 0);
396         mdc_exit_request(&obddev->u.cli);
397         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
398
399         /* Similarly, if we're going to replay this request, we don't want to
400          * actually get a lock, just perform the intent. */
401         if (req->rq_transno || req->rq_replay) {
402                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
403                                          sizeof(*lockreq));
404                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
405         }
406
407         /* This can go when we're sure that this can never happen */
408         LASSERT(rc != -ENOENT);
409         if (rc == ELDLM_LOCK_ABORTED) {
410                 lock_mode = 0;
411                 memset(lockh, 0, sizeof(*lockh));
412                 rc = 0;
413         } else if (rc != 0) {
414                 CERROR("ldlm_cli_enqueue: %d\n", rc);
415                 LASSERTF(rc < 0, "rc %d\n", rc);
416                 mdc_clear_replay_flag(req, rc);
417                 ptlrpc_req_finished(req);
418                 RETURN(rc);
419         } else { /* rc = 0 */
420                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
421                 LASSERT(lock);
422
423                 /* If the server gave us back a different lock mode, we should
424                  * fix up our variables. */
425                 if (lock->l_req_mode != lock_mode) {
426                         ldlm_lock_addref(lockh, lock->l_req_mode);
427                         ldlm_lock_decref(lockh, lock_mode);
428                         lock_mode = lock->l_req_mode;
429                 }
430                 LDLM_LOCK_PUT(lock);
431         }
432
433         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
434                                  sizeof(*lockrep));
435         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
436         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); /* swabbed by ldlm_cli_enqueue() */
437
438         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
439         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
440         it->d.lustre.it_lock_mode = lock_mode;
441         it->d.lustre.it_data = req;
442
443         if (it->d.lustre.it_status < 0 && req->rq_replay)
444                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
445
446         /* If we're doing an IT_OPEN which did not result in an actual
447          * successful open, then we need to remove the bit which saves
448          * this request for unconditional replay.
449          *
450          * It's important that we do this first!  Otherwise we might exit the
451          * function without doing so, and try to replay a failed create
452          * (bug 3440) */
453         if (it->it_op & IT_OPEN && req->rq_replay &&
454             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
455                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
456
457         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
458                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
459
460         /* We know what to expect, so we do any byte flipping required here */
461         LASSERT(repbufcnt == 5 || repbufcnt == 2);
462         if (repbufcnt == 5) {
463                 struct mds_body *body;
464
465                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
466                                          lustre_swab_mds_body);
467                 if (body == NULL) {
468                         CERROR ("Can't swab mds_body\n");
469                         RETURN (-EPROTO);
470                 }
471
472                 /* If this is a successful OPEN request, we need to set
473                    replay handler and data early, so that if replay happens
474                    immediately after swabbing below, new reply is swabbed
475                    by that handler correctly */
476                 if (it_disposition(it, DISP_OPEN_OPEN) &&
477                     !it_open_error(DISP_OPEN_OPEN, it))
478                         mdc_set_open_replay_data(NULL, req);
479
480                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
481                         /* The eadata is opaque; just check that it is there.
482                          * Eventually, obd_unpackmd() will check the contents */
483                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
484                                                     body->eadatasize, NULL);
485                         if (eadata == NULL) {
486                                 CERROR ("Missing/short eadata\n");
487                                 RETURN (-EPROTO);
488                         }
489                         if (body->valid & OBD_MD_FLMODEASIZE) {
490                                 if (obddev->u.cli.cl_max_mds_easize < 
491                                                         body->max_mdsize) {
492                                         obddev->u.cli.cl_max_mds_easize = 
493                                                 body->max_mdsize;
494                                         CDEBUG(D_INFO, "maxeasize become %d\n",
495                                                body->max_mdsize);
496                                 }
497                                 if (obddev->u.cli.cl_max_mds_cookiesize <
498                                                         body->max_cookiesize) {
499                                         obddev->u.cli.cl_max_mds_cookiesize =
500                                                 body->max_cookiesize;
501                                         CDEBUG(D_INFO, "cookiesize become %d\n",
502                                                body->max_cookiesize);
503                                 }
504                         }
505                         /* We save the reply LOV EA in case we have to replay
506                          * a create for recovery.  If we didn't allocate a
507                          * large enough request buffer above we need to
508                          * reallocate it here to hold the actual LOV EA. */
509                         if (it->it_op & IT_OPEN) {
510                                 int offset = DLM_INTENT_REC_OFF + 2;
511
512                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
513                                     body->eadatasize)
514                                         mdc_realloc_openmsg(req, body, size);
515
516                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
517                                                      body->eadatasize);
518                                 if (lmm)
519                                         memcpy(lmm, eadata, body->eadatasize);
520                         }
521                 }
522         }
523
524         RETURN(rc);
525 }
526 EXPORT_SYMBOL(mdc_enqueue);
527
528 /* 
529  * This long block is all about fixing up the lock and request state
530  * so that it is correct as of the moment _before_ the operation was
531  * applied; that way, the VFS will think that everything is normal and
532  * call Lustre's regular VFS methods.
533  *
534  * If we're performing a creation, that means that unless the creation
535  * failed with EEXIST, we should fake up a negative dentry.
536  *
537  * For everything else, we want to lookup to succeed.
538  *
539  * One additional note: if CREATE or OPEN succeeded, we add an extra
540  * reference to the request because we need to keep it around until
541  * ll_create/ll_open gets called.
542  *
543  * The server will return to us, in it_disposition, an indication of
544  * exactly what d.lustre.it_status refers to.
545  *
546  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
547  * otherwise if DISP_OPEN_CREATE is set, then it status is the
548  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
549  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
550  * was successful.
551  *
552  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
553  * child lookup.
554  */
555 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
556                     void *lmm, int lmmsize, struct lookup_intent *it,
557                     int lookup_flags, struct ptlrpc_request **reqp,
558                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
559 {
560         struct lustre_handle lockh;
561         struct ptlrpc_request *request;
562         int rc = 0;
563         struct mds_body *mds_body;
564         struct lustre_handle old_lock;
565         struct ldlm_lock *lock;
566         ENTRY;
567         LASSERT(it);
568
569         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
570                op_data->namelen, op_data->name, op_data->fid1.id,
571                ldlm_it2str(it->it_op), it->it_flags);
572
573         if (op_data->fid2.id &&
574             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
575                 /* We could just return 1 immediately, but since we should only
576                  * be called in revalidate_it if we already have a lock, let's
577                  * verify that. */
578                 struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
579                                                      op_data->fid2.generation}};
580                 struct lustre_handle lockh;
581                 ldlm_policy_data_t policy;
582                 int mode = LCK_CR;
583
584                 /* As not all attributes are kept under update lock, e.g. 
585                    owner/group/acls are under lookup lock, we need both 
586                    ibits for GETATTR. */
587                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
588                         MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
589                         MDS_INODELOCK_LOOKUP;
590
591                 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
592                                      LDLM_FL_BLOCK_GRANTED, &res_id,
593                                      LDLM_IBITS, &policy, LCK_CR, &lockh);
594                 if (!rc) {
595                         mode = LCK_CW;
596                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
597                                              LDLM_FL_BLOCK_GRANTED, &res_id,
598                                              LDLM_IBITS, &policy,LCK_CW,&lockh);
599                 }
600                 if (!rc) {
601                         mode = LCK_PR;
602                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
603                                              LDLM_FL_BLOCK_GRANTED, &res_id,
604                                              LDLM_IBITS, &policy,LCK_PR,&lockh);
605                 }
606                 if (rc) {
607                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
608                                sizeof(lockh));
609                         it->d.lustre.it_lock_mode = mode;
610                 }
611
612                 /* Only return failure if it was not GETATTR by cfid
613                    (from inode_revalidate) */
614                 if (rc || op_data->namelen != 0)
615                         RETURN(rc);
616         }
617
618         /* lookup_it may be called only after revalidate_it has run, because
619          * revalidate_it cannot return errors, only zero.  Returning zero causes
620          * this call to lookup, which *can* return an error.
621          *
622          * We only want to execute the request associated with the intent one
623          * time, however, so don't send the request again.  Instead, skip past
624          * this and use the request from revalidate.  In this case, revalidate
625          * never dropped its reference, so the refcounts are all OK */
626         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
627
628                 rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
629                                  op_data, &lockh, lmm, lmmsize,
630                                  ldlm_completion_ast, cb_blocking, NULL,
631                                  extra_lock_flags);
632                 if (rc < 0)
633                         RETURN(rc);
634                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
635         } else if (!op_data->fid2.id) {
636                 /* DISP_ENQ_COMPLETE set means there is extra reference on
637                  * request referenced from this intent, saved for subsequent
638                  * lookup.  This path is executed when we proceed to this
639                  * lookup, so we clear DISP_ENQ_COMPLETE */
640                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
641         }
642         request = *reqp = it->d.lustre.it_data;
643         LASSERT(request != NULL);
644         LASSERT(request != LP_POISON);
645         LASSERT(request->rq_repmsg != LP_POISON);
646
647         if (!it_disposition(it, DISP_IT_EXECD)) {
648                 /* The server failed before it even started executing the
649                  * intent, i.e. because it couldn't unpack the request. */
650                 LASSERT(it->d.lustre.it_status != 0);
651                 RETURN(it->d.lustre.it_status);
652         }
653         rc = it_open_error(DISP_IT_EXECD, it);
654         if (rc)
655                 RETURN(rc);
656
657         mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
658                                   sizeof(*mds_body));
659         LASSERT(mds_body != NULL);           /* mdc_enqueue checked */
660         LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
661
662         /* If we were revalidating a fid/name pair, mark the intent in
663          * case we fail and get called again from lookup */
664         if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
665                 it_set_disposition(it, DISP_ENQ_COMPLETE);
666                 /* Also: did we find the same inode? */
667                 if (memcmp(&op_data->fid2, &mds_body->fid1,
668                            sizeof(op_data->fid2)))
669                         RETURN (-ESTALE);
670         }
671
672         rc = it_open_error(DISP_LOOKUP_EXECD, it);
673         if (rc)
674                 RETURN(rc);
675
676         /* keep requests around for the multiple phases of the call
677          * this shows the DISP_XX must guarantee we make it into the call
678          */
679         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
680             it_disposition(it, DISP_OPEN_CREATE) &&
681             !it_open_error(DISP_OPEN_CREATE, it)) {
682                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
683                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
684         }
685         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
686             it_disposition(it, DISP_OPEN_OPEN) &&
687             !it_open_error(DISP_OPEN_OPEN, it)) {
688                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
689                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
690         }
691
692         if (it->it_op & IT_CREAT) {
693                 /* XXX this belongs in ll_create_it */
694         } else if (it->it_op == IT_OPEN) {
695                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
696         } else {
697                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
698         }
699
700         /* If we already have a matching lock, then cancel the new
701          * one.  We have to set the data here instead of in
702          * mdc_enqueue, because we need to use the child's inode as
703          * the l_ast_data to match, and that's not available until
704          * intent_finish has performed the iget().) */
705         lock = ldlm_handle2lock(&lockh);
706         if (lock) {
707                 ldlm_policy_data_t policy = lock->l_policy_data;
708                 LDLM_DEBUG(lock, "matching against this");
709                 LDLM_LOCK_PUT(lock);
710                 memcpy(&old_lock, &lockh, sizeof(lockh));
711                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
712                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
713                         ldlm_lock_decref_and_cancel(&lockh,
714                                                     it->d.lustre.it_lock_mode);
715                         memcpy(&lockh, &old_lock, sizeof(old_lock));
716                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
717                                sizeof(lockh));
718                 }
719         }
720         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
721                op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
722                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
723
724         RETURN(rc);
725 }
726 EXPORT_SYMBOL(mdc_intent_lock);