Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
43
44 int it_disposition(struct lookup_intent *it, int flag)
45 {
46         return it->d.lustre.it_disposition & flag;
47 }
48 EXPORT_SYMBOL(it_disposition);
49
50 void it_set_disposition(struct lookup_intent *it, int flag)
51 {
52         it->d.lustre.it_disposition |= flag;
53 }
54 EXPORT_SYMBOL(it_set_disposition);
55
56 void it_clear_disposition(struct lookup_intent *it, int flag)
57 {
58         it->d.lustre.it_disposition &= ~flag;
59 }
60 EXPORT_SYMBOL(it_clear_disposition);
61
62 int it_open_error(int phase, struct lookup_intent *it)
63 {
64         if (it_disposition(it, DISP_OPEN_OPEN)) {
65                 if (phase >= DISP_OPEN_OPEN)
66                         return it->d.lustre.it_status;
67                 else
68                         return 0;
69         }
70
71         if (it_disposition(it, DISP_OPEN_CREATE)) {
72                 if (phase >= DISP_OPEN_CREATE)
73                         return it->d.lustre.it_status;
74                 else
75                         return 0;
76         }
77
78         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79                 if (phase >= DISP_LOOKUP_EXECD)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_IT_EXECD)) {
86                 if (phase >= DISP_IT_EXECD)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92                it->d.lustre.it_status);
93         LBUG();
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 void mdc_set_lock_data(__u64 *l, void *data)
100 {
101         struct ldlm_lock *lock;
102         struct lustre_handle *lockh = (struct lustre_handle *)l;
103         ENTRY;
104
105         if (!*l) {
106                 EXIT;
107                 return;
108         }
109
110         lock = ldlm_handle2lock(lockh);
111
112         LASSERT(lock != NULL);
113         lock_res_and_lock(lock);
114 #ifdef __KERNEL__
115         if (lock->l_ast_data && lock->l_ast_data != data) {
116                 struct inode *new_inode = data;
117                 struct inode *old_inode = lock->l_ast_data;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125 #endif
126         lock->l_ast_data = data;
127         unlock_res_and_lock(lock);
128         LDLM_LOCK_PUT(lock);
129
130         EXIT;
131 }
132 EXPORT_SYMBOL(mdc_set_lock_data);
133
134 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
135                       ldlm_iterator_t it, void *data)
136 {
137         struct ldlm_res_id res_id = { .name = {0} };
138         ENTRY;
139
140         res_id.name[0] = fid->id;
141         res_id.name[1] = fid->generation;
142
143         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
144                               it, data);
145
146         EXIT;
147         return 0;
148 }
149
150 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
151 {
152         /* Don't hold error requests for replay. */
153         if (req->rq_replay) {
154                 spin_lock(&req->rq_lock);
155                 req->rq_replay = 0;
156                 spin_unlock(&req->rq_lock);
157         }
158         if (rc && req->rq_transno != 0) {
159                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
160                 LBUG();
161         }
162 }
163
/* Return the smallest power of two that is strictly greater than @val.
 *
 * Note that an exact power of two is therefore doubled (8 -> 16); the
 * caller in this file only invokes this for sizes that are not already a
 * power of two.
 *
 * Boundary behaviour:
 *  - @val <= 0 yields 1.  Shifting a negative value right is
 *    implementation-defined and would otherwise never reach zero.
 *  - if the next power of two does not fit in an int, the result saturates
 *    at the largest int instead of overflowing in the left shift. */
static int round_up(int val)
{
        /* Largest representable int, computed without extra headers. */
        const int int_max = (int)(~0U >> 1);
        int ret = 1;

        while (val > 0) {
                /* Doubling ret once more would overflow: saturate. */
                if (ret > int_max / 2)
                        return int_max;
                val >>= 1;
                ret <<= 1;
        }
        return ret;
}
173
174 /* Save a large LOV EA into the request buffer so that it is available
175  * for replay.  We don't do this in the initial request because the
176  * original request doesn't need this buffer (at most it sends just the
177  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
178  * buffer and may also be difficult to allocate and save a very large
179  * request buffer for each open. (bug 5707)
180  *
181  * OOM here may cause recovery failure if lmm is needed (only for the
182  * original open if the MDS crashed just when this client also OOM'd)
183  * but this is incredibly unlikely, and questionable whether the client
184  * could do MDS recovery under OOM anyways... */
185 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
186                                 struct mds_body *body)
187 {
188         int old_len, new_size, old_size;
189         struct lustre_msg *old_msg = req->rq_reqmsg;
190         struct lustre_msg *new_msg;
191
192         old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
193         old_size = lustre_packed_msg_size(old_msg);
194         lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
195                               body->eadatasize);
196         new_size = lustre_packed_msg_size(old_msg);
197
198         OBD_ALLOC(new_msg, new_size);
199         if (new_msg != NULL) {
200                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
201                           body->eadatasize);
202                 memcpy(new_msg, old_msg, old_size);
203
204                 spin_lock(&req->rq_lock);
205                 req->rq_reqmsg = new_msg;
206                 req->rq_reqlen = new_size;
207                 spin_unlock(&req->rq_lock);
208
209                 OBD_FREE(old_msg, old_size);
210         } else {
211                 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
212                 body->valid &= ~OBD_MD_FLEASIZE;
213                 body->eadatasize = 0;
214         }
215 }
216
217 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
218                                                    struct lookup_intent *it,
219                                                    struct mdc_op_data *data,
220                                                    void *lmm, int lmmsize)
221 {
222         struct ptlrpc_request *req;
223         struct ldlm_intent *lit;
224         struct obd_device *obddev = class_exp2obd(exp);
225         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
226                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
227                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
228                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
229                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
230                         /* As an optimization, we allocate an RPC request buffer
231                          * for at least a default-sized LOV EA even if we aren't
232                          * sending one.  We grow the whole request to the next
233                          * power-of-two size since we get that much from a slab
234                          * allocation anyways. This avoids an allocation below
235                          * in the common case where we need to save a
236                          * default-sized LOV EA for open replay. */
237                         [DLM_INTENT_REC_OFF+2]= max(lmmsize,
238                                          obddev->u.cli.cl_default_mds_easize) };
239         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
240                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
241                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
242                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
243                                                         cl_max_mds_easize,
244                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
245         CFS_LIST_HEAD(cancels);
246         int do_join = (it->it_flags & O_JOIN_FILE) && data->data;
247         int count = 0;
248         int mode;
249         int rc;
250
251         it->it_create_mode |= S_IFREG;
252
253         rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
254         if (rc & (rc - 1))
255                 size[DLM_INTENT_REC_OFF + 2] =
256                          min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
257                                      obddev->u.cli.cl_max_mds_easize);
258
259                 /* If inode is known, cancel conflicting OPEN locks. */
260         if (data->fid2.id) {
261                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
262                         mode = LCK_CW;
263 #ifdef FMODE_EXEC
264                 else if (it->it_flags & FMODE_EXEC)
265                         mode = LCK_PR;
266 #endif
267                 else
268                         mode = LCK_CR;
269                 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
270                                                 mode, MDS_INODELOCK_OPEN);
271         }
272
273         /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
274         if (it->it_op & IT_CREAT || do_join)
275                 mode = LCK_EX;
276         else
277                 mode = LCK_CR;
278         count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
279                                          MDS_INODELOCK_UPDATE);
280         if (do_join) {
281                 __u64 head_size = (*(__u64 *)data->data);
282                         /* join is like an unlink of the tail */
283                 size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
284                 req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
285                 if (req)
286                         mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, 
287                                       head_size);
288         } else {
289                 req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
290                 it->it_flags &= ~O_JOIN_FILE;
291         }
292
293         if (req) {
294                 spin_lock(&req->rq_lock);
295                 req->rq_replay = 1;
296                 spin_unlock(&req->rq_lock);
297
298                 /* pack the intent */
299                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
300                                      sizeof(*lit));
301                 lit->opc = (__u64)it->it_op;
302
303                 /* pack the intended request */
304                 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
305                               it->it_create_mode, 0, it->it_flags,
306                               lmm, lmmsize);
307
308                 ptlrpc_req_set_repsize(req, 5, repsize);
309         }
310         return req;
311 }
312
313 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
314                                                      struct lookup_intent *it,
315                                                      struct mdc_op_data *data)
316 {
317         struct ptlrpc_request *req;
318         struct ldlm_intent *lit;
319         struct obd_device *obddev = class_exp2obd(exp);
320         int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
321                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
322                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
323                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_unlink),
324                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
325         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
326                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
327                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
328                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
329                                                         cl_max_mds_easize,
330                            [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
331                                                         cl_max_mds_cookiesize };
332
333         req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
334         if (req) {
335                 /* pack the intent */
336                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
337                                      sizeof(*lit));
338                 lit->opc = (__u64)it->it_op;
339
340                 /* pack the intended request */
341                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
342
343                 ptlrpc_req_set_repsize(req, 5, repsize);
344         }
345         return req;
346 }
347
348 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
349                                                      struct lookup_intent *it,
350                                                      struct mdc_op_data *data)
351 {
352         struct ptlrpc_request *req;
353         struct ldlm_intent *lit;
354         struct obd_device *obddev = class_exp2obd(exp);
355         int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
356                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
357                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
358                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_body),
359                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
360         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
361                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
362                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
363                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
364                                                         cl_max_mds_easize,
365                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
366         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
367                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
368
369                 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
370         if (req) {
371                 /* pack the intent */
372                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
373                                      sizeof(*lit));
374                 lit->opc = (__u64)it->it_op;
375
376                 /* pack the intended request */
377                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
378                                  data);
379                 ptlrpc_req_set_repsize(req, 5, repsize);
380         }
381         return req;
382 }
383
384 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
385 {
386         struct ptlrpc_request *req;
387         int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
388                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
389         int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
390                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply) };
391
392         req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
393         if (req)
394                 ptlrpc_req_set_repsize(req, 2, repsize);
395         return req;
396 }
397
398 static int mdc_finish_enqueue(struct obd_export *exp,
399                               struct ptlrpc_request *req,
400                               struct ldlm_enqueue_info *einfo,
401                               struct lookup_intent *it,
402                               struct lustre_handle *lockh,
403                               int rc)
404 {
405         struct ldlm_request *lockreq;
406         struct ldlm_reply *lockrep;
407         ENTRY;
408
409         LASSERT(rc >= 0);
410         /* Similarly, if we're going to replay this request, we don't want to
411          * actually get a lock, just perform the intent. */
412         if (req->rq_transno || req->rq_replay) {
413                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
414                                          sizeof(*lockreq));
415                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
416         }
417
418         if (rc == ELDLM_LOCK_ABORTED) {
419                 einfo->ei_mode = 0;
420                 memset(lockh, 0, sizeof(*lockh));
421                 rc = 0;
422         } else { /* rc = 0 */
423                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
424                 LASSERT(lock);
425
426                 /* If the server gave us back a different lock mode, we should
427                  * fix up our variables. */
428                 if (lock->l_req_mode != einfo->ei_mode) {
429                         ldlm_lock_addref(lockh, lock->l_req_mode);
430                         ldlm_lock_decref(lockh, einfo->ei_mode);
431                         einfo->ei_mode = lock->l_req_mode;
432                 }
433                 LDLM_LOCK_PUT(lock);
434         }
435
436         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
437                                  sizeof(*lockrep));
438         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
439         /* swabbed by ldlm_cli_enqueue() */
440         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
441
442         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
443         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
444         it->d.lustre.it_lock_mode = einfo->ei_mode;
445         it->d.lustre.it_data = req;
446
447         if (it->d.lustre.it_status < 0 && req->rq_replay)
448                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
449
450         /* If we're doing an IT_OPEN which did not result in an actual
451          * successful open, then we need to remove the bit which saves
452          * this request for unconditional replay.
453          *
454          * It's important that we do this first!  Otherwise we might exit the
455          * function without doing so, and try to replay a failed create
456          * (bug 3440) */
457         if ((it->it_op & IT_OPEN) &&
458             req->rq_replay &&
459             (!it_disposition(it, DISP_OPEN_OPEN) ||
460              it->d.lustre.it_status != 0))
461                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
462
463         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
464                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
465
466         /* We know what to expect, so we do any byte flipping required here */
467         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
468                 struct mds_body *body;
469
470                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
471                                          lustre_swab_mds_body);
472                 if (body == NULL) {
473                         CERROR ("Can't swab mds_body\n");
474                         RETURN (-EPROTO);
475                 }
476
477                 /* If this is a successful OPEN request, we need to set
478                    replay handler and data early, so that if replay happens
479                    immediately after swabbing below, new reply is swabbed
480                    by that handler correctly */
481                 if (it_disposition(it, DISP_OPEN_OPEN) &&
482                     !it_open_error(DISP_OPEN_OPEN, it))
483                         mdc_set_open_replay_data(NULL, req);
484
485                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
486                         void *eadata;
487
488                         /* The eadata is opaque; just check that it is there.
489                          * Eventually, obd_unpackmd() will check the contents */
490                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
491                                                     body->eadatasize, NULL);
492                         if (eadata == NULL) {
493                                 CERROR ("Missing/short eadata\n");
494                                 RETURN (-EPROTO);
495                         }
496                         if (body->valid & OBD_MD_FLMODEASIZE) {
497                                 struct obd_device *obddev = class_exp2obd(exp);
498
499                                 if (obddev->u.cli.cl_max_mds_easize < 
500                                                         body->max_mdsize) {
501                                         obddev->u.cli.cl_max_mds_easize = 
502                                                 body->max_mdsize;
503                                         CDEBUG(D_INFO, "maxeasize become %d\n",
504                                                body->max_mdsize);
505                                 }
506                                 if (obddev->u.cli.cl_max_mds_cookiesize <
507                                                         body->max_cookiesize) {
508                                         obddev->u.cli.cl_max_mds_cookiesize =
509                                                 body->max_cookiesize;
510                                         CDEBUG(D_INFO, "cookiesize become %d\n",
511                                                body->max_cookiesize);
512                                 }
513                         }
514                         /* We save the reply LOV EA in case we have to replay
515                          * a create for recovery.  If we didn't allocate a
516                          * large enough request buffer above we need to
517                          * reallocate it here to hold the actual LOV EA. */
518                         if (it->it_op & IT_OPEN) {
519                                 int offset = DLM_INTENT_REC_OFF + 2;
520                                 void *lmm;
521
522                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
523                                     body->eadatasize)
524                                         mdc_realloc_openmsg(req, body);
525
526                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
527                                                      body->eadatasize);
528                                 if (lmm)
529                                         memcpy(lmm, eadata, body->eadatasize);
530                         }
531                 }
532         }
533
534         RETURN(rc);
535 }
536
537 /* We always reserve enough space in the reply packet for a stripe MD, because
538  * we don't know in advance the file type. */
539 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
540                 struct lookup_intent *it, struct mdc_op_data *data,
541                 struct lustre_handle *lockh, void *lmm, int lmmsize,
542                 int extra_lock_flags)
543 {
544         struct ptlrpc_request *req;
545         struct obd_device *obddev = class_exp2obd(exp);
546         struct ldlm_res_id res_id =
547                 { .name = {data->fid1.id, data->fid1.generation} };
548         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
549         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
550         int rc;
551         ENTRY;
552
553         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
554         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
555                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
556
557         if (it->it_op & IT_OPEN) {
558                 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
559                 if (it->it_flags & O_JOIN_FILE) {
560                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
561                 }
562         } else if (it->it_op & IT_UNLINK) {
563                 req = mdc_intent_unlink_pack(exp, it, data);
564         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
565                 req = mdc_intent_lookup_pack(exp, it, data);
566         } else if (it->it_op == IT_READDIR) {
567                 req = mdc_intent_readdir_pack(exp);
568         } else {
569                 CERROR("bad it_op %x\n", it->it_op);
570                 RETURN(-EINVAL);
571         }
572
573         if (!req)
574                 RETURN(-ENOMEM);
575
576          /* It is important to obtain rpc_lock first (if applicable), so that
577           * threads that are serialised with rpc_lock are not polluting our
578           * rpcs in flight counter */
579         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
580         mdc_enter_request(&obddev->u.cli);
581         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
582                               0, NULL, lockh, 0);
583         mdc_exit_request(&obddev->u.cli);
584         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
585         if (rc < 0) {
586                 CERROR("ldlm_cli_enqueue: %d\n", rc);
587                 mdc_clear_replay_flag(req, rc);
588                 ptlrpc_req_finished(req);
589                 RETURN(rc);
590         }
591         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
592
593         RETURN(rc);
594 }
595 EXPORT_SYMBOL(mdc_enqueue);
596
597 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
598                         struct ll_fid *fid)
599 {
600                 /* We could just return 1 immediately, but since we should only
601                  * be called in revalidate_it if we already have a lock, let's
602                  * verify that. */
603         struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
604         struct lustre_handle lockh;
605         ldlm_policy_data_t policy;
606         ldlm_mode_t mode;
607
608         /* As not all attributes are kept under update lock, e.g. 
609            owner/group/acls are under lookup lock, we need both 
610            ibits for GETATTR. */
611         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
612                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
613                 MDS_INODELOCK_LOOKUP;
614
615         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
616                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
617                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
618         if (mode) {
619                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
620                 it->d.lustre.it_lock_mode = mode;
621         }
622
623         return !!mode;
624 }
625 EXPORT_SYMBOL(mdc_revalidate_lock);
626
627 static int mdc_finish_intent_lock(struct obd_export *exp,
628                                   struct ptlrpc_request *req,
629                                   struct mdc_op_data *data,
630                                   struct lookup_intent *it,
631                                   struct lustre_handle *lockh)
632 {
633         struct mds_body *mds_body;
634         struct lustre_handle old_lock;
635         struct ldlm_lock *lock;
636         int rc;
637         ENTRY;
638
639         LASSERT(req != NULL);
640         LASSERT(req != LP_POISON);
641         LASSERT(req->rq_repmsg != LP_POISON);
642
643         if (!it_disposition(it, DISP_IT_EXECD)) {
644                 /* The server failed before it even started executing the
645                  * intent, i.e. because it couldn't unpack the request. */
646                 LASSERT(it->d.lustre.it_status != 0);
647                 RETURN(it->d.lustre.it_status);
648         }
649         rc = it_open_error(DISP_IT_EXECD, it);
650         if (rc)
651                 RETURN(rc);
652
653         mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
654                                   sizeof(*mds_body));
655         /* mdc_enqueue checked */
656         LASSERT(mds_body != NULL);
657         /* mdc_enqueue swabbed */
658         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
659
660         /* If we were revalidating a fid/name pair, mark the intent in
661          * case we fail and get called again from lookup */
662         if (data->fid2.id && (it->it_op != IT_GETATTR)) {
663                 it_set_disposition(it, DISP_ENQ_COMPLETE);
664                 /* Also: did we find the same inode? */
665                 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2))) 
666                         RETURN(-ESTALE);
667         }
668
669         rc = it_open_error(DISP_LOOKUP_EXECD, it);
670         if (rc)
671                 RETURN(rc);
672
673         /* keep requests around for the multiple phases of the call
674          * this shows the DISP_XX must guarantee we make it into the call
675          */
676         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
677             it_disposition(it, DISP_OPEN_CREATE) &&
678             !it_open_error(DISP_OPEN_CREATE, it)) {
679                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
680                 ptlrpc_request_addref(req); /* balanced in ll_create_node */
681         }
682         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
683             it_disposition(it, DISP_OPEN_OPEN) &&
684             !it_open_error(DISP_OPEN_OPEN, it)) {
685                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
686                 ptlrpc_request_addref(req); /* balanced in ll_file_open */
687                 /* BUG 11546 - eviction in the middle of open rpc processing */
688                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
689         }
690
691         if (it->it_op & IT_CREAT) {
692                 /* XXX this belongs in ll_create_it */
693         } else if (it->it_op == IT_OPEN) {
694                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
695         } else {
696                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
697         }
698
699         /* If we already have a matching lock, then cancel the new
700          * one.  We have to set the data here instead of in
701          * mdc_enqueue, because we need to use the child's inode as
702          * the l_ast_data to match, and that's not available until
703          * intent_finish has performed the iget().) */
704         lock = ldlm_handle2lock(lockh);
705         if (lock) {
706                 ldlm_policy_data_t policy = lock->l_policy_data;
707
708                 LDLM_DEBUG(lock, "matching against this");
709                 LDLM_LOCK_PUT(lock);
710                 memcpy(&old_lock, lockh, sizeof(*lockh));
711                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
712                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
713                         ldlm_lock_decref_and_cancel(lockh,
714                                                     it->d.lustre.it_lock_mode);
715                         memcpy(lockh, &old_lock, sizeof(old_lock));
716                         memcpy(&it->d.lustre.it_lock_handle, lockh,
717                                sizeof(*lockh));
718                 }
719         }
720
721         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
722                data->namelen, data->name, ldlm_it2str(it->it_op),
723                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
724         RETURN(rc);
725 }
726
727 /* 
728  * This long block is all about fixing up the lock and request state
729  * so that it is correct as of the moment _before_ the operation was
730  * applied; that way, the VFS will think that everything is normal and
731  * call Lustre's regular VFS methods.
732  *
733  * If we're performing a creation, that means that unless the creation
734  * failed with EEXIST, we should fake up a negative dentry.
735  *
736  * For everything else, we want to lookup to succeed.
737  *
738  * One additional note: if CREATE or OPEN succeeded, we add an extra
739  * reference to the request because we need to keep it around until
740  * ll_create/ll_open gets called.
741  *
742  * The server will return to us, in it_disposition, an indication of
743  * exactly what d.lustre.it_status refers to.
744  *
745  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
746  * otherwise if DISP_OPEN_CREATE is set, then it status is the
747  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
748  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
749  * was successful.
750  *
751  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
752  * child lookup.
753  */
754 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
755                     void *lmm, int lmmsize, struct lookup_intent *it,
756                     int lookup_flags, struct ptlrpc_request **reqp,
757                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
758 {
759         struct lustre_handle lockh;
760         int rc;
761         ENTRY;
762
763         LASSERT(it);
764
765         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
766                op_data->namelen, op_data->name, op_data->fid1.id,
767                ldlm_it2str(it->it_op), it->it_flags);
768
769         lockh.cookie = 0;
770         if (op_data->fid2.id &&
771             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
772                 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
773                 /* Only return failure if it was not GETATTR by cfid
774                    (from inode_revalidate) */
775                 if (rc || op_data->namelen != 0)
776                         RETURN(rc);
777         }
778
779         /* lookup_it may be called only after revalidate_it has run, because
780          * revalidate_it cannot return errors, only zero.  Returning zero causes
781          * this call to lookup, which *can* return an error.
782          *
783          * We only want to execute the request associated with the intent one
784          * time, however, so don't send the request again.  Instead, skip past
785          * this and use the request from revalidate.  In this case, revalidate
786          * never dropped its reference, so the refcounts are all OK */
787         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
788                 struct ldlm_enqueue_info einfo =
789                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
790                           ldlm_completion_ast, NULL, NULL };
791
792                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
793                                  lmm, lmmsize, extra_lock_flags);
794                 if (rc < 0)
795                         RETURN(rc);
796                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
797         } else if (!op_data->fid2.id) {
798                 /* DISP_ENQ_COMPLETE set means there is extra reference on
799                  * request referenced from this intent, saved for subsequent
800                  * lookup.  This path is executed when we proceed to this
801                  * lookup, so we clear DISP_ENQ_COMPLETE */
802                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
803         }
804
805         *reqp = it->d.lustre.it_data;
806         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
807
808         RETURN(rc);
809 }
810 EXPORT_SYMBOL(mdc_intent_lock);
811
812 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
813                                               void *unused, int rc)
814 {
815         struct mdc_enqueue_args  *ma;
816         struct md_enqueue_info   *minfo;
817         struct ldlm_enqueue_info *einfo;
818         struct obd_export        *exp;
819         struct lookup_intent     *it;
820         struct lustre_handle     *lockh;
821         struct obd_device        *obddev;
822         int                       flags = LDLM_FL_HAS_INTENT;
823         ENTRY;
824
825         ma = (struct mdc_enqueue_args *)&req->rq_async_args;
826         minfo = ma->ma_mi;
827         einfo = ma->ma_ei;
828
829         exp   = minfo->mi_exp;
830         it    = &minfo->mi_it;
831         lockh = &minfo->mi_lockh;
832
833         obddev = class_exp2obd(exp);
834
835         mdc_exit_request(&obddev->u.cli);
836         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
837                 rc = -ETIMEDOUT;
838
839         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
840                                    &flags, NULL, 0, NULL, lockh, rc);
841         if (rc < 0) {
842                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
843                 mdc_clear_replay_flag(req, rc);
844                 GOTO(out, rc);
845         }
846
847         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
848         if (rc)
849                 GOTO(out, rc);
850
851         memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
852
853         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
854         GOTO(out, rc);
855 out:
856         OBD_FREE_PTR(einfo);
857         minfo->mi_cb(exp, req, minfo, rc);
858
859         return 0;
860 }
861
862 int mdc_intent_getattr_async(struct obd_export *exp,
863                              struct md_enqueue_info *minfo,
864                              struct ldlm_enqueue_info *einfo)
865 {
866         struct mdc_op_data      *op_data = &minfo->mi_data;
867         struct lookup_intent    *it = &minfo->mi_it;
868         struct ptlrpc_request   *req;
869         struct obd_device       *obddev = class_exp2obd(exp);
870         struct ldlm_res_id       res_id = {
871                                         .name = {op_data->fid1.id,
872                                                  op_data->fid1.generation}
873                                  };
874         ldlm_policy_data_t       policy = {
875                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
876                                  };
877         struct mdc_enqueue_args *aa;
878         int                      rc;
879         int                      flags = LDLM_FL_HAS_INTENT;
880         ENTRY;
881
882         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
883                op_data->namelen, op_data->name, op_data->fid1.id,
884                ldlm_it2str(it->it_op), it->it_flags);
885
886         req = mdc_intent_lookup_pack(exp, it, op_data);
887         if (!req)
888                 RETURN(-ENOMEM);
889
890         mdc_enter_request(&obddev->u.cli);
891         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
892                               0, NULL, &minfo->mi_lockh, 1);
893         if (rc < 0) {
894                 mdc_exit_request(&obddev->u.cli);
895                 RETURN(rc);
896         }
897
898         CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
899         aa = (struct mdc_enqueue_args *)&req->rq_async_args;
900         aa->ma_mi = minfo;
901         aa->ma_ei = einfo;
902         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
903         ptlrpcd_add_req(req);
904
905         RETURN(0);
906 }
907 EXPORT_SYMBOL(mdc_intent_getattr_async);