Whamcloud - gitweb
6f2da48b0506f8cff064c6f2d861f776b5b299fd
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
43
44 int it_disposition(struct lookup_intent *it, int flag)
45 {
46         return it->d.lustre.it_disposition & flag;
47 }
48 EXPORT_SYMBOL(it_disposition);
49
50 void it_set_disposition(struct lookup_intent *it, int flag)
51 {
52         it->d.lustre.it_disposition |= flag;
53 }
54 EXPORT_SYMBOL(it_set_disposition);
55
56 void it_clear_disposition(struct lookup_intent *it, int flag)
57 {
58         it->d.lustre.it_disposition &= ~flag;
59 }
60 EXPORT_SYMBOL(it_clear_disposition);
61
62 int it_open_error(int phase, struct lookup_intent *it)
63 {
64         if (it_disposition(it, DISP_OPEN_OPEN)) {
65                 if (phase >= DISP_OPEN_OPEN)
66                         return it->d.lustre.it_status;
67                 else
68                         return 0;
69         }
70
71         if (it_disposition(it, DISP_OPEN_CREATE)) {
72                 if (phase >= DISP_OPEN_CREATE)
73                         return it->d.lustre.it_status;
74                 else
75                         return 0;
76         }
77
78         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79                 if (phase >= DISP_LOOKUP_EXECD)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_IT_EXECD)) {
86                 if (phase >= DISP_IT_EXECD)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92                it->d.lustre.it_status);
93         LBUG();
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 void mdc_set_lock_data(__u64 *l, void *data)
100 {
101         struct ldlm_lock *lock;
102         struct lustre_handle *lockh = (struct lustre_handle *)l;
103         ENTRY;
104
105         if (!*l) {
106                 EXIT;
107                 return;
108         }
109
110         lock = ldlm_handle2lock(lockh);
111
112         LASSERT(lock != NULL);
113         lock_res_and_lock(lock);
114 #ifdef __KERNEL__
115         if (lock->l_ast_data && lock->l_ast_data != data) {
116                 struct inode *new_inode = data;
117                 struct inode *old_inode = lock->l_ast_data;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125 #endif
126         lock->l_ast_data = data;
127         unlock_res_and_lock(lock);
128         LDLM_LOCK_PUT(lock);
129
130         EXIT;
131 }
132 EXPORT_SYMBOL(mdc_set_lock_data);
133
134 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
135                       ldlm_iterator_t it, void *data)
136 {
137         struct ldlm_res_id res_id = { .name = {0} };
138         ENTRY;
139
140         res_id.name[0] = fid->id;
141         res_id.name[1] = fid->generation;
142
143         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
144                               it, data);
145
146         EXIT;
147         return 0;
148 }
149
150 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
151 {
152         /* Don't hold error requests for replay. */
153         if (req->rq_replay) {
154                 spin_lock(&req->rq_lock);
155                 req->rq_replay = 0;
156                 spin_unlock(&req->rq_lock);
157         }
158         if (rc && req->rq_transno != 0) {
159                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
160                 LBUG();
161         }
162 }
163
/* Return the smallest power of two strictly greater than @val
 * (0 -> 1, 5 -> 8, 8 -> 16).
 *
 * Zero and negative values yield 1.  The shifting is done on an unsigned
 * copy: right-shifting a negative int is implementation-defined and, with
 * an arithmetic shift, the value never reaches zero, so the old loop could
 * spin forever while overflowing the result.
 *
 * @val must be below 2^30, otherwise the result is not representable in an
 * int.  The only caller in this file passes a request message size. */
static int round_up(int val)
{
        unsigned int rest;
        int ret = 1;

        if (val <= 0)
                return ret;

        for (rest = (unsigned int)val; rest != 0; rest >>= 1)
                ret <<= 1;

        return ret;
}
173
174 /* Save a large LOV EA into the request buffer so that it is available
175  * for replay.  We don't do this in the initial request because the
176  * original request doesn't need this buffer (at most it sends just the
177  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
178  * buffer and may also be difficult to allocate and save a very large
179  * request buffer for each open. (bug 5707)
180  *
181  * OOM here may cause recovery failure if lmm is needed (only for the
182  * original open if the MDS crashed just when this client also OOM'd)
183  * but this is incredibly unlikely, and questionable whether the client
184  * could do MDS recovery under OOM anyways... */
185 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
186                                 struct mds_body *body)
187 {
188         int old_len, new_size, old_size;
189         struct lustre_msg *old_msg = req->rq_reqmsg;
190         struct lustre_msg *new_msg;
191
192         old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
193         /* save old size */
194         old_size = lustre_msg_size(lustre_request_magic(req),
195                                    req->rq_reqmsg->lm_bufcount,
196                                    req->rq_reqmsg->lm_buflens);
197
198         lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
199                               body->eadatasize);
200         new_size = lustre_msg_size(lustre_request_magic(req),
201                                    req->rq_reqmsg->lm_bufcount,
202                                    req->rq_reqmsg->lm_buflens);
203         OBD_ALLOC(new_msg, new_size);
204         if (new_msg != NULL) {
205                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
206                           body->eadatasize);
207                 memcpy(new_msg, old_msg, old_size);
208
209                 spin_lock(&req->rq_lock);
210                 req->rq_reqmsg = new_msg;
211                 req->rq_reqlen = new_size;
212                 spin_unlock(&req->rq_lock);
213
214                 OBD_FREE(old_msg, old_size);
215         } else {
216                 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
217                 body->valid &= ~OBD_MD_FLEASIZE;
218                 body->eadatasize = 0;
219         }
220 }
221
222 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
223                                                    struct lookup_intent *it,
224                                                    struct mdc_op_data *data,
225                                                    void *lmm, int lmmsize)
226 {
227         struct ptlrpc_request *req;
228         struct ldlm_intent *lit;
229         struct obd_device *obddev = class_exp2obd(exp);
230         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
231                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
232                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
233                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
234                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
235                         /* As an optimization, we allocate an RPC request buffer
236                          * for at least a default-sized LOV EA even if we aren't
237                          * sending one.  We grow the whole request to the next
238                          * power-of-two size since we get that much from a slab
239                          * allocation anyways. This avoids an allocation below
240                          * in the common case where we need to save a
241                          * default-sized LOV EA for open replay. */
242                         [DLM_INTENT_REC_OFF+2]= max(lmmsize,
243                                          obddev->u.cli.cl_default_mds_easize) };
244         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
245                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
246                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
247                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
248                                                         cl_max_mds_easize,
249                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
250                 CFS_LIST_HEAD(cancels);
251                 int count = 0;
252                 int mode;
253         int rc;
254                 
255                 it->it_create_mode |= S_IFREG;
256
257         rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
258                 if (rc & (rc - 1))
259                         size[DLM_INTENT_REC_OFF + 2] =
260                          min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
261                                      obddev->u.cli.cl_max_mds_easize);
262
263                 /* If inode is known, cancel conflicting OPEN locks. */
264         if (data->fid2.id) {
265                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
266                                 mode = LCK_CW;
267 #ifdef FMODE_EXEC
268                         else if (it->it_flags & FMODE_EXEC)
269                                 mode = LCK_PR;
270 #endif
271                         else 
272                                 mode = LCK_CR;
273                 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
274                                                 mode, MDS_INODELOCK_OPEN);
275                 }
276
277                 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
278                 if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
279                         mode = LCK_EX;
280                 else
281                         mode = LCK_CR;
282         count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
283                                          MDS_INODELOCK_UPDATE);
284                 if (it->it_flags & O_JOIN_FILE) {
285                 __u64 head_size = (*(__u64 *)data->data);
286                         /* join is like an unlink of the tail */
287                 size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
288                 req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
289                 mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, head_size);
290                 } else {
291                 req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
292                 }
293
294         if (req) {
295                 spin_lock(&req->rq_lock);
296                 req->rq_replay = 1;
297                 spin_unlock(&req->rq_lock);
298
299                 /* pack the intent */
300                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
301                                      sizeof(*lit));
302                 lit->opc = (__u64)it->it_op;
303
304                 /* pack the intended request */
305                 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
306                               it->it_create_mode, 0, it->it_flags,
307                               lmm, lmmsize);
308
309                 ptlrpc_req_set_repsize(req, 5, repsize);
310         }
311         return req;
312 }
313
314 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
315                                                      struct lookup_intent *it,
316                                                      struct mdc_op_data *data)
317 {
318         struct ptlrpc_request *req;
319         struct ldlm_intent *lit;
320         struct obd_device *obddev = class_exp2obd(exp);
321         int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
322                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
323                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
324                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_unlink),
325                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
326         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
327                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
328                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
329                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
330                                                         cl_max_mds_easize,
331                            [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
332                                                         cl_max_mds_cookiesize };
333
334         req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
335         if (req) {
336                 /* pack the intent */
337                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
338                                      sizeof(*lit));
339                 lit->opc = (__u64)it->it_op;
340
341                 /* pack the intended request */
342                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
343
344                 ptlrpc_req_set_repsize(req, 5, repsize);
345         }
346         return req;
347 }
348
349 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
350                                                      struct lookup_intent *it,
351                                                      struct mdc_op_data *data)
352 {
353         struct ptlrpc_request *req;
354         struct ldlm_intent *lit;
355         struct obd_device *obddev = class_exp2obd(exp);
356         int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
357                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
358                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
359                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_body),
360                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
361         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
362                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
363                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
364                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
365                                                         cl_max_mds_easize,
366                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
367         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
368                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
369
370                 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
371         if (req) {
372                 /* pack the intent */
373                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
374                                      sizeof(*lit));
375                 lit->opc = (__u64)it->it_op;
376
377                 /* pack the intended request */
378                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
379                                  data);
380                 ptlrpc_req_set_repsize(req, 5, repsize);
381         }
382         return req;
383 }
384
385 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
386 {
387         struct ptlrpc_request *req;
388         int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
389                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
390         int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
391                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply) };
392
393         req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
394         if (req)
395                 ptlrpc_req_set_repsize(req, 2, repsize);
396         return req;
397 }
398
399 static int mdc_finish_enqueue(struct obd_export *exp,
400                               struct ptlrpc_request *req,
401                               struct ldlm_enqueue_info *einfo,
402                               struct lookup_intent *it,
403                               struct lustre_handle *lockh,
404                               int rc)
405 {
406         struct ldlm_request *lockreq;
407         struct ldlm_reply *lockrep;
408         ENTRY;
409
410         /* Similarly, if we're going to replay this request, we don't want to
411          * actually get a lock, just perform the intent. */
412         if (req->rq_transno || req->rq_replay) {
413                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
414                                          sizeof(*lockreq));
415                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
416         }
417
418         if (rc == ELDLM_LOCK_ABORTED) {
419                 einfo->ei_mode = 0;
420                 memset(lockh, 0, sizeof(*lockh));
421                 rc = 0;
422         } else if (rc != 0) {
423                 CERROR("ldlm_cli_enqueue: %d\n", rc);
424                 LASSERTF(rc < 0, "rc %d\n", rc);
425                 mdc_clear_replay_flag(req, rc);
426                 ptlrpc_req_finished(req);
427                 RETURN(rc);
428         } else { /* rc = 0 */
429                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
430                 LASSERT(lock);
431
432                 /* If the server gave us back a different lock mode, we should
433                  * fix up our variables. */
434                 if (lock->l_req_mode != einfo->ei_mode) {
435                         ldlm_lock_addref(lockh, lock->l_req_mode);
436                         ldlm_lock_decref(lockh, einfo->ei_mode);
437                         einfo->ei_mode = lock->l_req_mode;
438                 }
439                 LDLM_LOCK_PUT(lock);
440         }
441
442         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
443                                  sizeof(*lockrep));
444         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
445         /* swabbed by ldlm_cli_enqueue() */
446         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
447
448         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
449         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
450         it->d.lustre.it_lock_mode = einfo->ei_mode;
451         it->d.lustre.it_data = req;
452
453         if (it->d.lustre.it_status < 0 && req->rq_replay)
454                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
455
456         /* If we're doing an IT_OPEN which did not result in an actual
457          * successful open, then we need to remove the bit which saves
458          * this request for unconditional replay.
459          *
460          * It's important that we do this first!  Otherwise we might exit the
461          * function without doing so, and try to replay a failed create
462          * (bug 3440) */
463         if ((it->it_op & IT_OPEN) &&
464             req->rq_replay &&
465             (!it_disposition(it, DISP_OPEN_OPEN) ||
466              it->d.lustre.it_status != 0))
467                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
468
469         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
470                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
471
472         /* We know what to expect, so we do any byte flipping required here */
473         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
474                 struct mds_body *body;
475
476                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
477                                          lustre_swab_mds_body);
478                 if (body == NULL) {
479                         CERROR ("Can't swab mds_body\n");
480                         RETURN (-EPROTO);
481                 }
482
483                 /* If this is a successful OPEN request, we need to set
484                    replay handler and data early, so that if replay happens
485                    immediately after swabbing below, new reply is swabbed
486                    by that handler correctly */
487                 if (it_disposition(it, DISP_OPEN_OPEN) &&
488                     !it_open_error(DISP_OPEN_OPEN, it))
489                         mdc_set_open_replay_data(NULL, req);
490
491                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
492                         void *eadata;
493
494                         /* The eadata is opaque; just check that it is there.
495                          * Eventually, obd_unpackmd() will check the contents */
496                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
497                                                     body->eadatasize, NULL);
498                         if (eadata == NULL) {
499                                 CERROR ("Missing/short eadata\n");
500                                 RETURN (-EPROTO);
501                         }
502                         if (body->valid & OBD_MD_FLMODEASIZE) {
503                                 struct obd_device *obddev = class_exp2obd(exp);
504
505                                 if (obddev->u.cli.cl_max_mds_easize < 
506                                                         body->max_mdsize) {
507                                         obddev->u.cli.cl_max_mds_easize = 
508                                                 body->max_mdsize;
509                                         CDEBUG(D_INFO, "maxeasize become %d\n",
510                                                body->max_mdsize);
511                                 }
512                                 if (obddev->u.cli.cl_max_mds_cookiesize <
513                                                         body->max_cookiesize) {
514                                         obddev->u.cli.cl_max_mds_cookiesize =
515                                                 body->max_cookiesize;
516                                         CDEBUG(D_INFO, "cookiesize become %d\n",
517                                                body->max_cookiesize);
518                                 }
519                         }
520                         /* We save the reply LOV EA in case we have to replay
521                          * a create for recovery.  If we didn't allocate a
522                          * large enough request buffer above we need to
523                          * reallocate it here to hold the actual LOV EA. */
524                         if (it->it_op & IT_OPEN) {
525                                 int offset = DLM_INTENT_REC_OFF + 2;
526                                 void *lmm;
527
528                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
529                                     body->eadatasize)
530                                         mdc_realloc_openmsg(req, body);
531
532                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
533                                                      body->eadatasize);
534                                 if (lmm)
535                                         memcpy(lmm, eadata, body->eadatasize);
536                         }
537                 }
538         }
539
540         RETURN(rc);
541 }
542
543 /* We always reserve enough space in the reply packet for a stripe MD, because
544  * we don't know in advance the file type. */
545 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
546                 struct lookup_intent *it, struct mdc_op_data *data,
547                 struct lustre_handle *lockh, void *lmm, int lmmsize,
548                 int extra_lock_flags)
549 {
550         struct ptlrpc_request *req;
551         struct obd_device *obddev = class_exp2obd(exp);
552         struct ldlm_res_id res_id =
553                 { .name = {data->fid1.id, data->fid1.generation} };
554         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
555         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
556         int rc;
557         ENTRY;
558
559         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
560         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
561                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
562
563         if (it->it_op & IT_OPEN) {
564                 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
565                 if (it->it_flags & O_JOIN_FILE) {
566                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
567                 }
568         } else if (it->it_op & IT_UNLINK) {
569                 req = mdc_intent_unlink_pack(exp, it, data);
570         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
571                 req = mdc_intent_lookup_pack(exp, it, data);
572         } else if (it->it_op == IT_READDIR) {
573                 req = mdc_intent_readdir_pack(exp);
574         } else {
575                 CERROR("bad it_op %x\n", it->it_op);
576                 RETURN(-EINVAL);
577         }
578
579         if (!req)
580                 RETURN(-ENOMEM);
581
582          /* It is important to obtain rpc_lock first (if applicable), so that
583           * threads that are serialised with rpc_lock are not polluting our
584           * rpcs in flight counter */
585         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
586         mdc_enter_request(&obddev->u.cli);
587         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
588                               0, NULL, lockh, 0);
589         mdc_exit_request(&obddev->u.cli);
590         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
591
592         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
593
594         RETURN(rc);
595 }
596 EXPORT_SYMBOL(mdc_enqueue);
597
598 int mdc_revalidate_lock(struct obd_export *exp,
599                         struct lookup_intent *it,
600                         struct ll_fid *fid)
601 {
602                 /* We could just return 1 immediately, but since we should only
603                  * be called in revalidate_it if we already have a lock, let's
604                  * verify that. */
605         struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
606                 struct lustre_handle lockh;
607                 ldlm_policy_data_t policy;
608                 int mode = LCK_CR;
609         int rc;
610
611                 /* As not all attributes are kept under update lock, e.g. 
612                    owner/group/acls are under lookup lock, we need both 
613                    ibits for GETATTR. */
614                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
615                         MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
616                         MDS_INODELOCK_LOOKUP;
617
618         rc = ldlm_lock_match(exp->exp_obd->obd_namespace, LDLM_FL_BLOCK_GRANTED,
619                              &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
620                 if (!rc) {
621                         mode = LCK_CW;
622                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
623                                      LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
624                                      &policy, LCK_CW, &lockh);
625                 }
626                 if (!rc) {
627                         mode = LCK_PR;
628                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
629                                      LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
630                                      &policy, LCK_PR, &lockh);
631                 }
632                 if (rc) {
633                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
634                         it->d.lustre.it_lock_mode = mode;
635                 }
636
637         return rc;
638 }
639 EXPORT_SYMBOL(mdc_revalidate_lock);
640
641 static int mdc_finish_intent_lock(struct obd_export *exp,
642                                   struct ptlrpc_request *req,
643                                   struct mdc_op_data *data,
644                                   struct lookup_intent *it,
645                                   struct lustre_handle *lockh)
646 {
647         struct mds_body *mds_body;
648         struct lustre_handle old_lock;
649         struct ldlm_lock *lock;
650         int rc;
651         ENTRY;
652
653         LASSERT(req != NULL);
654         LASSERT(req != LP_POISON);
655         LASSERT(req->rq_repmsg != LP_POISON);
656
657         if (!it_disposition(it, DISP_IT_EXECD)) {
658                 /* The server failed before it even started executing the
659                  * intent, i.e. because it couldn't unpack the request. */
660                 LASSERT(it->d.lustre.it_status != 0);
661                 RETURN(it->d.lustre.it_status);
662         }
663         rc = it_open_error(DISP_IT_EXECD, it);
664         if (rc)
665                 RETURN(rc);
666
667         mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
668                                   sizeof(*mds_body));
669         /* mdc_enqueue checked */
670         LASSERT(mds_body != NULL);
671         /* mdc_enqueue swabbed */
672         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
673
674         /* If we were revalidating a fid/name pair, mark the intent in
675          * case we fail and get called again from lookup */
676         if (data->fid2.id && (it->it_op != IT_GETATTR)) {
677                 it_set_disposition(it, DISP_ENQ_COMPLETE);
678                 /* Also: did we find the same inode? */
679                 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2))) 
680                         RETURN(-ESTALE);
681         }
682
683         rc = it_open_error(DISP_LOOKUP_EXECD, it);
684         if (rc)
685                 RETURN(rc);
686
687         /* keep requests around for the multiple phases of the call
688          * this shows the DISP_XX must guarantee we make it into the call
689          */
690         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
691             it_disposition(it, DISP_OPEN_CREATE) &&
692             !it_open_error(DISP_OPEN_CREATE, it)) {
693                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
694                 ptlrpc_request_addref(req); /* balanced in ll_create_node */
695         }
696         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
697             it_disposition(it, DISP_OPEN_OPEN) &&
698             !it_open_error(DISP_OPEN_OPEN, it)) {
699                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
700                 ptlrpc_request_addref(req); /* balanced in ll_file_open */
701                 /* BUG 11546 - eviction in the middle of open rpc processing */
702                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
703         }
704
705         if (it->it_op & IT_CREAT) {
706                 /* XXX this belongs in ll_create_it */
707         } else if (it->it_op == IT_OPEN) {
708                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
709         } else {
710                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
711         }
712
713         /* If we already have a matching lock, then cancel the new
714          * one.  We have to set the data here instead of in
715          * mdc_enqueue, because we need to use the child's inode as
716          * the l_ast_data to match, and that's not available until
717          * intent_finish has performed the iget().) */
718         lock = ldlm_handle2lock(lockh);
719         if (lock) {
720                 ldlm_policy_data_t policy = lock->l_policy_data;
721
722                 LDLM_DEBUG(lock, "matching against this");
723                 LDLM_LOCK_PUT(lock);
724                 memcpy(&old_lock, lockh, sizeof(*lockh));
725                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
726                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
727                         ldlm_lock_decref_and_cancel(lockh,
728                                                     it->d.lustre.it_lock_mode);
729                         memcpy(lockh, &old_lock, sizeof(old_lock));
730                         memcpy(&it->d.lustre.it_lock_handle, lockh,
731                                sizeof(*lockh));
732                 }
733         }
734
735         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
736                data->namelen, data->name, ldlm_it2str(it->it_op),
737                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
738         RETURN(rc);
739 }
740
741 /* 
742  * This long block is all about fixing up the lock and request state
743  * so that it is correct as of the moment _before_ the operation was
744  * applied; that way, the VFS will think that everything is normal and
745  * call Lustre's regular VFS methods.
746  *
747  * If we're performing a creation, that means that unless the creation
748  * failed with EEXIST, we should fake up a negative dentry.
749  *
750  * For everything else, we want to lookup to succeed.
751  *
752  * One additional note: if CREATE or OPEN succeeded, we add an extra
753  * reference to the request because we need to keep it around until
754  * ll_create/ll_open gets called.
755  *
756  * The server will return to us, in it_disposition, an indication of
757  * exactly what d.lustre.it_status refers to.
758  *
759  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
760  * otherwise if DISP_OPEN_CREATE is set, then it status is the
761  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
762  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
763  * was successful.
764  *
765  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
766  * child lookup.
767  */
768 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
769                     void *lmm, int lmmsize, struct lookup_intent *it,
770                     int lookup_flags, struct ptlrpc_request **reqp,
771                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
772 {
773         struct lustre_handle lockh;
774         int rc;
775         ENTRY;
776
777         LASSERT(it);
778
779         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
780                op_data->namelen, op_data->name, op_data->fid1.id,
781                ldlm_it2str(it->it_op), it->it_flags);
782
783         if (op_data->fid2.id &&
784             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
785                 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
786                 /* Only return failure if it was not GETATTR by cfid
787                    (from inode_revalidate) */
788                 if (rc || op_data->namelen != 0)
789                         RETURN(rc);
790         }
791
792         /* lookup_it may be called only after revalidate_it has run, because
793          * revalidate_it cannot return errors, only zero.  Returning zero causes
794          * this call to lookup, which *can* return an error.
795          *
796          * We only want to execute the request associated with the intent one
797          * time, however, so don't send the request again.  Instead, skip past
798          * this and use the request from revalidate.  In this case, revalidate
799          * never dropped its reference, so the refcounts are all OK */
800         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
801                 struct ldlm_enqueue_info einfo =
802                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
803                           ldlm_completion_ast, NULL, NULL };
804
805                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
806                                  lmm, lmmsize, extra_lock_flags);
807                 if (rc < 0)
808                         RETURN(rc);
809                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
810         } else if (!op_data->fid2.id) {
811                 /* DISP_ENQ_COMPLETE set means there is extra reference on
812                  * request referenced from this intent, saved for subsequent
813                  * lookup.  This path is executed when we proceed to this
814                  * lookup, so we clear DISP_ENQ_COMPLETE */
815                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
816         }
817
818         *reqp = it->d.lustre.it_data;
819         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
820
821         RETURN(rc);
822 }
823 EXPORT_SYMBOL(mdc_intent_lock);
824
825 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
826                                               void *unused, int rc)
827 {
828         struct mdc_enqueue_args  *ma;
829         struct md_enqueue_info   *minfo;
830         struct ldlm_enqueue_info *einfo;
831         struct obd_export        *exp;
832         struct lookup_intent     *it;
833         struct lustre_handle     *lockh;
834         struct obd_device        *obddev;
835         int                       flags = LDLM_FL_HAS_INTENT;
836         ENTRY;
837
838         ma = (struct mdc_enqueue_args *)&req->rq_async_args;
839         minfo = ma->ma_mi;
840         einfo = ma->ma_ei;
841
842         exp   = minfo->mi_exp;
843         it    = &minfo->mi_it;
844         lockh = &minfo->mi_lockh;
845
846         obddev = class_exp2obd(exp);
847
848         mdc_exit_request(&obddev->u.cli);
849
850         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
851                                    &flags, NULL, 0, NULL, lockh, rc);
852
853         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
854         if (rc)
855                 GOTO(out, rc);
856
857         memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
858
859         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
860         GOTO(out, rc);
861 out:
862         OBD_FREE_PTR(einfo);
863         minfo->mi_cb(exp, req, minfo, rc);
864
865         return 0;
866 }
867
868 int mdc_intent_getattr_async(struct obd_export *exp,
869                              struct md_enqueue_info *minfo,
870                              struct ldlm_enqueue_info *einfo)
871 {
872         struct mdc_op_data      *op_data = &minfo->mi_data;
873         struct lookup_intent    *it = &minfo->mi_it;
874         struct ptlrpc_request   *req;
875         struct obd_device       *obddev = class_exp2obd(exp);
876         struct ldlm_res_id       res_id = {
877                                         .name = {op_data->fid1.id,
878                                                  op_data->fid1.generation}
879                                  };
880         ldlm_policy_data_t       policy = {
881                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
882                                  };
883         struct mdc_enqueue_args *aa;
884         int                      rc;
885         int                      flags = LDLM_FL_HAS_INTENT;
886         ENTRY;
887
888         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
889                op_data->namelen, op_data->name, op_data->fid1.id,
890                ldlm_it2str(it->it_op), it->it_flags);
891
892         req = mdc_intent_lookup_pack(exp, it, op_data);
893         if (!req)
894                 RETURN(-ENOMEM);
895
896         mdc_enter_request(&obddev->u.cli);
897         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
898                               0, NULL, &minfo->mi_lockh, 1);
899         if (rc < 0) {
900                 mdc_exit_request(&obddev->u.cli);
901                 RETURN(rc);
902         }
903
904         CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
905         aa = (struct mdc_enqueue_args *)&req->rq_async_args;
906         aa->ma_mi = minfo;
907         aa->ma_ei = einfo;
908         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
909         ptlrpcd_add_req(req);
910
911         RETURN(0);
912 }
913 EXPORT_SYMBOL(mdc_intent_getattr_async);