Whamcloud - gitweb
b=15346
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <linux/lustre_acl.h>
40 #include <obd_class.h>
41 #include <lustre_dlm.h>
42 /* fid_res_name_eq() */
43 #include <lustre_fid.h>
44 #include <lprocfs_status.h>
45 #include "mdc_internal.h"
46
47 int it_disposition(struct lookup_intent *it, int flag)
48 {
49         return it->d.lustre.it_disposition & flag;
50 }
51 EXPORT_SYMBOL(it_disposition);
52
53 void it_set_disposition(struct lookup_intent *it, int flag)
54 {
55         it->d.lustre.it_disposition |= flag;
56 }
57 EXPORT_SYMBOL(it_set_disposition);
58
59 void it_clear_disposition(struct lookup_intent *it, int flag)
60 {
61         it->d.lustre.it_disposition &= ~flag;
62 }
63 EXPORT_SYMBOL(it_clear_disposition);
64
65 int it_open_error(int phase, struct lookup_intent *it)
66 {
67         if (it_disposition(it, DISP_OPEN_OPEN)) {
68                 if (phase >= DISP_OPEN_OPEN)
69                         return it->d.lustre.it_status;
70                 else
71                         return 0;
72         }
73
74         if (it_disposition(it, DISP_OPEN_CREATE)) {
75                 if (phase >= DISP_OPEN_CREATE)
76                         return it->d.lustre.it_status;
77                 else
78                         return 0;
79         }
80
81         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
82                 if (phase >= DISP_LOOKUP_EXECD)
83                         return it->d.lustre.it_status;
84                 else
85                         return 0;
86         }
87
88         if (it_disposition(it, DISP_IT_EXECD)) {
89                 if (phase >= DISP_IT_EXECD)
90                         return it->d.lustre.it_status;
91                 else
92                         return 0;
93         }
94         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
95                it->d.lustre.it_status);
96         LBUG();
97         return 0;
98 }
99 EXPORT_SYMBOL(it_open_error);
100
101 /* this must be called on a lockh that is known to have a referenced lock */
102 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
103 {
104         struct ldlm_lock *lock;
105         ENTRY;
106
107         if (!*lockh) {
108                 EXIT;
109                 RETURN(0);
110         }
111
112         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116 #ifdef __KERNEL__
117         if (lock->l_ast_data && lock->l_ast_data != data) {
118                 struct inode *new_inode = data;
119                 struct inode *old_inode = lock->l_ast_data;
120                 LASSERTF(old_inode->i_state & I_FREEING,
121                          "Found existing inode %p/%lu/%u state %lu in lock: "
122                          "setting data to %p/%lu/%u\n", old_inode,
123                          old_inode->i_ino, old_inode->i_generation,
124                          old_inode->i_state,
125                          new_inode, new_inode->i_ino, new_inode->i_generation);
126         }
127 #endif
128         lock->l_ast_data = data;
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
136                            const struct lu_fid *fid, ldlm_type_t type,
137                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
138                            struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         ldlm_mode_t rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
146                              &res_id, type, policy, mode, lockh);
147         RETURN(rc);
148 }
149
150 int mdc_cancel_unused(struct obd_export *exp,
151                       const struct lu_fid *fid,
152                       ldlm_policy_data_t *policy,
153                       ldlm_mode_t mode, int flags, void *opaque)
154 {
155         struct ldlm_res_id res_id;
156         struct obd_device *obd = class_exp2obd(exp);
157         int rc;
158
159         ENTRY;
160
161         fid_build_reg_res_name(fid, &res_id);
162         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
163                                              policy, mode, flags, opaque);
164         RETURN(rc);
165 }
166
167 int mdc_change_cbdata(struct obd_export *exp,
168                       const struct lu_fid *fid,
169                       ldlm_iterator_t it, void *data)
170 {
171         struct ldlm_res_id res_id;
172         ENTRY;
173
174         fid_build_reg_res_name(fid, &res_id);
175         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
176                               &res_id, it, data);
177
178         EXIT;
179         return 0;
180 }
181
182 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
183 {
184         /* Don't hold error requests for replay. */
185         if (req->rq_replay) {
186                 spin_lock(&req->rq_lock);
187                 req->rq_replay = 0;
188                 spin_unlock(&req->rq_lock);
189         }
190         if (rc && req->rq_transno != 0) {
191                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
192                 LBUG();
193         }
194 }
195
196 /* Save a large LOV EA into the request buffer so that it is available
197  * for replay.  We don't do this in the initial request because the
198  * original request doesn't need this buffer (at most it sends just the
199  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
200  * buffer and may also be difficult to allocate and save a very large
201  * request buffer for each open. (bug 5707)
202  *
203  * OOM here may cause recovery failure if lmm is needed (only for the
204  * original open if the MDS crashed just when this client also OOM'd)
205  * but this is incredibly unlikely, and questionable whether the client
206  * could do MDS recovery under OOM anyways... */
207 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
208                                 struct mdt_body *body)
209 {
210         int     rc;
211
212         /* FIXME: remove this explicit offset. */
213         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
214                                         body->eadatasize);
215         if (rc) {
216                 CERROR("Can't enlarge segment %d size to %d\n",
217                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
218                 body->valid &= ~OBD_MD_FLEASIZE;
219                 body->eadatasize = 0;
220         }
221 }
222
223 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
224                                                    struct lookup_intent *it,
225                                                    struct md_op_data *op_data,
226                                                    void *lmm, int lmmsize,
227                                                    void *cb_data)
228 {
229         struct ptlrpc_request *req;
230         struct obd_device     *obddev = class_exp2obd(exp);
231         struct ldlm_intent    *lit;
232         int                    joinfile = !!((it->it_flags & O_JOIN_FILE) && 
233                                               op_data->op_data);
234         CFS_LIST_HEAD(cancels);
235         int                    count = 0;
236         int                    mode;
237         int                    rc;
238         ENTRY;
239
240         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
241
242         /* XXX: openlock is not cancelled for cross-refs. */
243         /* If inode is known, cancel conflicting OPEN locks. */
244         if (fid_is_sane(&op_data->op_fid2)) {
245                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
246                         mode = LCK_CW;
247 #ifdef FMODE_EXEC
248                 else if (it->it_flags & FMODE_EXEC)
249                         mode = LCK_PR;
250 #endif
251                 else
252                         mode = LCK_CR;
253                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
254                                                 &cancels, mode,
255                                                 MDS_INODELOCK_OPEN);
256         }
257
258         /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
259         if (it->it_op & IT_CREAT || joinfile)
260                 mode = LCK_EX;
261         else
262                 mode = LCK_CR;
263         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
264                                          &cancels, mode,
265                                          MDS_INODELOCK_UPDATE);
266
267         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
268                                    &RQF_LDLM_INTENT_OPEN);
269         if (req == NULL) {
270                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
271                 RETURN(ERR_PTR(-ENOMEM));
272         }
273
274         /* parent capability */
275         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
276         /* child capability, reserve the size according to parent capa, it will
277          * be filled after we get the reply */
278         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
279
280         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
281                              op_data->op_namelen + 1);
282         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
283                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
284         if (!joinfile) {
285                 req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
286                                      RCL_CLIENT, 0);
287         }
288
289         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
290         if (rc) {
291                 ptlrpc_request_free(req);
292                 return NULL;
293         }
294
295         if (joinfile) {
296                 __u64 head_size = *(__u64 *)op_data->op_data;
297                 mdc_join_pack(req, op_data, head_size);
298         }
299
300         spin_lock(&req->rq_lock);
301         req->rq_replay = 1;
302         spin_unlock(&req->rq_lock);
303
304         /* pack the intent */
305         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
306         lit->opc = (__u64)it->it_op;
307
308         /* pack the intended request */
309         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
310                       lmmsize);
311
312         /* for remote client, fetch remote perm for current user */
313         if (client_is_remote(exp))
314                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
315                                      sizeof(struct mdt_remote_perm));
316         ptlrpc_request_set_replen(req);
317         return req;
318 }
319
320 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
321                                                      struct lookup_intent *it,
322                                                      struct md_op_data *op_data)
323 {
324         struct ptlrpc_request *req;
325         struct obd_device     *obddev = class_exp2obd(exp);
326         struct ldlm_intent    *lit;
327         int                    rc;
328         ENTRY;
329
330         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
331                                    &RQF_LDLM_INTENT_UNLINK);
332         if (req == NULL)
333                 RETURN(ERR_PTR(-ENOMEM));
334
335         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
336         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
337                              op_data->op_namelen + 1);
338
339         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
340         if (rc) {
341                 ptlrpc_request_free(req);
342                 RETURN(ERR_PTR(rc));
343         }
344
345         /* pack the intent */
346         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
347         lit->opc = (__u64)it->it_op;
348
349         /* pack the intended request */
350         mdc_unlink_pack(req, op_data);
351
352         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
353                              obddev->u.cli.cl_max_mds_easize);
354         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
355                              obddev->u.cli.cl_max_mds_cookiesize);
356         ptlrpc_request_set_replen(req);
357         RETURN(req);
358 }
359
360 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
361                                                       struct lookup_intent *it,
362                                                       struct md_op_data *op_data)
363 {
364         struct ptlrpc_request *req;
365         struct obd_device     *obddev = class_exp2obd(exp);
366         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
367                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
368                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
369                                        (client_is_remote(exp) ?
370                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
371         struct ldlm_intent    *lit;
372         int                    rc;
373         ENTRY;
374
375         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
376                                    &RQF_LDLM_INTENT_GETATTR);
377         if (req == NULL)
378                 RETURN(ERR_PTR(-ENOMEM));
379
380         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
381         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
382                              op_data->op_namelen + 1);
383
384         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
385         if (rc) {
386                 ptlrpc_request_free(req);
387                 RETURN(ERR_PTR(rc));
388         }
389
390         /* pack the intent */
391         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
392         lit->opc = (__u64)it->it_op;
393
394         /* pack the intended request */
395         mdc_getattr_pack(req, valid, it->it_flags, op_data);
396
397         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
398                              obddev->u.cli.cl_max_mds_easize);
399         if (client_is_remote(exp))
400                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
401                                      sizeof(struct mdt_remote_perm));
402         ptlrpc_request_set_replen(req);
403         RETURN(req);
404 }
405
406 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
407 {
408         struct ptlrpc_request *req;
409         int rc;
410         ENTRY;
411
412         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
413         if (req == NULL)
414                 RETURN(ERR_PTR(-ENOMEM));
415
416         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417         if (rc) {
418                 ptlrpc_request_free(req);
419                 RETURN(ERR_PTR(rc));
420         }
421
422         ptlrpc_request_set_replen(req);
423         RETURN(req);
424 }
425
426 static int mdc_finish_enqueue(struct obd_export *exp,
427                               struct ptlrpc_request *req,
428                               struct ldlm_enqueue_info *einfo,
429                               struct lookup_intent *it,
430                               struct lustre_handle *lockh,
431                               int rc)
432 {
433         struct req_capsule  *pill = &req->rq_pill;
434         struct ldlm_request *lockreq;
435         struct ldlm_reply   *lockrep;
436         ENTRY;
437
438         LASSERT(rc >= 0);
439         /* Similarly, if we're going to replay this request, we don't want to
440          * actually get a lock, just perform the intent. */
441         if (req->rq_transno || req->rq_replay) {
442                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
443                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
444         }
445
446         if (rc == ELDLM_LOCK_ABORTED) {
447                 einfo->ei_mode = 0;
448                 memset(lockh, 0, sizeof(*lockh));
449                 rc = 0;
450         } else { /* rc = 0 */
451                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
452                 LASSERT(lock);
453
454                 /* If the server gave us back a different lock mode, we should
455                  * fix up our variables. */
456                 if (lock->l_req_mode != einfo->ei_mode) {
457                         ldlm_lock_addref(lockh, lock->l_req_mode);
458                         ldlm_lock_decref(lockh, einfo->ei_mode);
459                         einfo->ei_mode = lock->l_req_mode;
460                 }
461                 LDLM_LOCK_PUT(lock);
462         }
463
464         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
465         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
466
467         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
468         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
469         it->d.lustre.it_lock_mode = einfo->ei_mode;
470         it->d.lustre.it_data = req;
471
472         if (it->d.lustre.it_status < 0 && req->rq_replay)
473                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
474
475         /* If we're doing an IT_OPEN which did not result in an actual
476          * successful open, then we need to remove the bit which saves
477          * this request for unconditional replay.
478          *
479          * It's important that we do this first!  Otherwise we might exit the
480          * function without doing so, and try to replay a failed create
481          * (bug 3440) */
482         if (it->it_op & IT_OPEN && req->rq_replay &&
483             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
484                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
485
486         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
487                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
488
489         /* We know what to expect, so we do any byte flipping required here */
490         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
491                 struct mdt_body *body;
492
493                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
494                 if (body == NULL) {
495                         CERROR ("Can't swab mdt_body\n");
496                         RETURN (-EPROTO);
497                 }
498
499                 if (it_disposition(it, DISP_OPEN_OPEN) &&
500                     !it_open_error(DISP_OPEN_OPEN, it)) {
501                         /*
502                          * If this is a successful OPEN request, we need to set
503                          * replay handler and data early, so that if replay
504                          * happens immediately after swabbing below, new reply
505                          * is swabbed by that handler correctly.
506                          */
507                         mdc_set_open_replay_data(NULL, NULL, req);
508                 }
509
510                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
511                         void *eadata;
512
513                         /*
514                          * The eadata is opaque; just check that it is there.
515                          * Eventually, obd_unpackmd() will check the contents.
516                          */
517                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
518                                                               body->eadatasize);
519                         if (eadata == NULL)
520                                 RETURN(-EPROTO);
521
522                         if (body->valid & OBD_MD_FLMODEASIZE) {
523                                 struct obd_device *obddev = class_exp2obd(exp);
524
525                                 if (obddev->u.cli.cl_max_mds_easize <
526                                     body->max_mdsize) {
527                                         obddev->u.cli.cl_max_mds_easize =
528                                                 body->max_mdsize;
529                                         CDEBUG(D_INFO, "maxeasize become %d\n",
530                                                body->max_mdsize);
531                                 }
532                                 if (obddev->u.cli.cl_max_mds_cookiesize <
533                                     body->max_cookiesize) {
534                                         obddev->u.cli.cl_max_mds_cookiesize =
535                                                 body->max_cookiesize;
536                                         CDEBUG(D_INFO, "cookiesize become %d\n",
537                                                body->max_cookiesize);
538                                 }
539                         }
540
541                         /*
542                          * We save the reply LOV EA in case we have to replay a
543                          * create for recovery.  If we didn't allocate a large
544                          * enough request buffer above we need to reallocate it
545                          * here to hold the actual LOV EA.
546                          *
547                          * To not save LOV EA if request is not going to replay
548                          * (for example error one).
549                          */
550                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
551                                 void *lmm;
552                                 if (req_capsule_get_size(pill, &RMF_EADATA,
553                                                          RCL_CLIENT) <
554                                     body->eadatasize) {
555                                         mdc_realloc_openmsg(req, body);
556                                         req_capsule_set_size(pill, &RMF_EADATA,
557                                                              RCL_CLIENT,
558                                                              body->eadatasize);
559                                 }
560                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
561                                 if (lmm)
562                                         memcpy(lmm, eadata, body->eadatasize);
563                         }
564                 }
565
566                 if (body->valid & OBD_MD_FLRMTPERM) {
567                         struct mdt_remote_perm *perm;
568
569                         LASSERT(client_is_remote(exp));
570                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
571                                                 lustre_swab_mdt_remote_perm);
572                         if (perm == NULL)
573                                 RETURN(-EPROTO);
574                 }
575                 if (body->valid & OBD_MD_FLMDSCAPA) {
576                         struct lustre_capa *capa, *p;
577
578                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
579                         if (capa == NULL)
580                                 RETURN(-EPROTO);
581
582                         if (it->it_op & IT_OPEN) {
583                                 /* client fid capa will be checked in replay */
584                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
585                                 LASSERT(p);
586                                 *p = *capa;
587                         }
588                 }
589                 if (body->valid & OBD_MD_FLOSSCAPA) {
590                         struct lustre_capa *capa;
591
592                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
593                         if (capa == NULL)
594                                 RETURN(-EPROTO);
595                 }
596         }
597
598         RETURN(rc);
599 }
600
601 /* We always reserve enough space in the reply packet for a stripe MD, because
602  * we don't know in advance the file type. */
603 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
604                 struct lookup_intent *it, struct md_op_data *op_data,
605                 struct lustre_handle *lockh, void *lmm, int lmmsize,
606                 int extra_lock_flags)
607 {
608         struct obd_device     *obddev = class_exp2obd(exp);
609         struct ptlrpc_request *req;
610         struct req_capsule    *pill;
611         int                    flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
612         int                    rc;
613         struct ldlm_res_id res_id;
614         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
615         ENTRY;
616
617         LASSERTF(einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type);
618
619         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
620
621         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
622                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
623
624         if (it->it_op & IT_OPEN) {
625                 int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
626                                               op_data->op_data);
627
628                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
629                                            einfo->ei_cbdata);
630                 if (!joinfile) {
631                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
632                         einfo->ei_cbdata = NULL;
633                         lmm = NULL;
634                 } else
635                         it->it_flags &= ~O_JOIN_FILE;
636         } else if (it->it_op & IT_UNLINK)
637                 req = mdc_intent_unlink_pack(exp, it, op_data);
638         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
639                 req = mdc_intent_getattr_pack(exp, it, op_data);
640         else if (it->it_op == IT_READDIR)
641                 req = ldlm_enqueue_pack(exp);
642         else {
643                 LBUG();
644                 RETURN(-EINVAL);
645         }
646
647         if (IS_ERR(req))
648                 RETURN(PTR_ERR(req));
649         pill = &req->rq_pill;
650
651         /* It is important to obtain rpc_lock first (if applicable), so that
652          * threads that are serialised with rpc_lock are not polluting our
653          * rpcs in flight counter */
654         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
655         mdc_enter_request(&obddev->u.cli);
656         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
657                               0, NULL, lockh, 0);
658         mdc_exit_request(&obddev->u.cli);
659         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
660         if (rc < 0) {
661                 CERROR("ldlm_cli_enqueue: %d\n", rc);
662                 mdc_clear_replay_flag(req, rc);
663                 ptlrpc_req_finished(req);
664                 RETURN(rc);
665         }
666         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
667
668         RETURN(rc);
669 }
670
671 static int mdc_finish_intent_lock(struct obd_export *exp,
672                                   struct ptlrpc_request *request,
673                                   struct md_op_data *op_data,
674                                   struct lookup_intent *it,
675                                   struct lustre_handle *lockh)
676 {
677         struct lustre_handle old_lock;
678         struct mdt_body *mdt_body;
679         struct ldlm_lock *lock;
680         int rc;
681
682
683         LASSERT(request != NULL);
684         LASSERT(request != LP_POISON);
685         LASSERT(request->rq_repmsg != LP_POISON);
686
687         if (!it_disposition(it, DISP_IT_EXECD)) {
688                 /* The server failed before it even started executing the
689                  * intent, i.e. because it couldn't unpack the request. */
690                 LASSERT(it->d.lustre.it_status != 0);
691                 RETURN(it->d.lustre.it_status);
692         }
693         rc = it_open_error(DISP_IT_EXECD, it);
694         if (rc)
695                 RETURN(rc);
696
697         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
698         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
699
700         /* If we were revalidating a fid/name pair, mark the intent in
701          * case we fail and get called again from lookup */
702         if (fid_is_sane(&op_data->op_fid2) &&
703             it->it_flags & O_CHECK_STALE &&
704             it->it_op != IT_GETATTR) {
705                 it_set_disposition(it, DISP_ENQ_COMPLETE);
706
707                 /* Also: did we find the same inode? */
708                 /* sever can return one of two fids:
709                  * op_fid2 - new allocated fid - if file is created.
710                  * op_fid3 - existent fid - if file only open.
711                  * op_fid3 is saved in lmv_intent_open */
712                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
713                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
714                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
715                                "\n", PFID(&op_data->op_fid2),
716                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
717                         RETURN(-ESTALE);
718                 }
719         }
720
721         rc = it_open_error(DISP_LOOKUP_EXECD, it);
722         if (rc)
723                 RETURN(rc);
724
725         /* keep requests around for the multiple phases of the call
726          * this shows the DISP_XX must guarantee we make it into the call
727          */
728         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
729             it_disposition(it, DISP_OPEN_CREATE) &&
730             !it_open_error(DISP_OPEN_CREATE, it)) {
731                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
732                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
733         }
734         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
735             it_disposition(it, DISP_OPEN_OPEN) &&
736             !it_open_error(DISP_OPEN_OPEN, it)) {
737                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
738                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
739                 /* BUG 11546 - eviction in the middle of open rpc processing */
740                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
741         }
742
743         if (it->it_op & IT_CREAT) {
744                 /* XXX this belongs in ll_create_it */
745         } else if (it->it_op == IT_OPEN) {
746                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
747         } else {
748                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
749         }
750
751         /* If we already have a matching lock, then cancel the new
752          * one.  We have to set the data here instead of in
753          * mdc_enqueue, because we need to use the child's inode as
754          * the l_ast_data to match, and that's not available until
755          * intent_finish has performed the iget().) */
756         lock = ldlm_handle2lock(lockh);
757         if (lock) {
758                 ldlm_policy_data_t policy = lock->l_policy_data;
759                 LDLM_DEBUG(lock, "matching against this");
760
761                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
762                                          &lock->l_resource->lr_name),
763                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
764                          (unsigned long)lock->l_resource->lr_name.name[0],
765                          (unsigned long)lock->l_resource->lr_name.name[1],
766                          (unsigned long)lock->l_resource->lr_name.name[2],
767                          (unsigned long)fid_seq(&mdt_body->fid1),
768                          (unsigned long)fid_oid(&mdt_body->fid1),
769                          (unsigned long)fid_ver(&mdt_body->fid1));
770                 LDLM_LOCK_PUT(lock);
771
772                 memcpy(&old_lock, lockh, sizeof(*lockh));
773                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
774                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
775                         ldlm_lock_decref_and_cancel(lockh,
776                                                     it->d.lustre.it_lock_mode);
777                         memcpy(lockh, &old_lock, sizeof(old_lock));
778                         it->d.lustre.it_lock_handle = lockh->cookie;
779                 }
780         }
781         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
782                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
783                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
784         RETURN(rc);
785 }
786
787 /*
788  * This long block is all about fixing up the lock and request state
789  * so that it is correct as of the moment _before_ the operation was
790  * applied; that way, the VFS will think that everything is normal and
791  * call Lustre's regular VFS methods.
792  *
793  * If we're performing a creation, that means that unless the creation
794  * failed with EEXIST, we should fake up a negative dentry.
795  *
796  * For everything else, we want to lookup to succeed.
797  *
798  * One additional note: if CREATE or OPEN succeeded, we add an extra
799  * reference to the request because we need to keep it around until
800  * ll_create/ll_open gets called.
801  *
802  * The server will return to us, in it_disposition, an indication of
803  * exactly what d.lustre.it_status refers to.
804  *
805  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
806  * otherwise if DISP_OPEN_CREATE is set, then it status is the
807  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
808  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
809  * was successful.
810  *
811  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
812  * child lookup.
813  */
814 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
815                     void *lmm, int lmmsize, struct lookup_intent *it,
816                     int lookup_flags, struct ptlrpc_request **reqp,
817                     ldlm_blocking_callback cb_blocking,
818                     int extra_lock_flags)
819 {
820         struct lustre_handle lockh;
821         int rc = 0;
822         ENTRY;
823         LASSERT(it);
824
825         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
826                ", intent: %s flags %#o\n", op_data->op_namelen,
827                op_data->op_name, PFID(&op_data->op_fid2),
828                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
829                it->it_flags);
830
831         lockh.cookie = 0;
832         if (fid_is_sane(&op_data->op_fid2) &&
833             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
834                 /* We could just return 1 immediately, but since we should only
835                  * be called in revalidate_it if we already have a lock, let's
836                  * verify that. */
837                 ldlm_policy_data_t policy;
838                 ldlm_mode_t mode;
839
840                 /* As not all attributes are kept under update lock, e.g.
841                    owner/group/acls are under lookup lock, we need both
842                    ibits for GETATTR. */
843
844                 /* For CMD, UPDATE lock and LOOKUP lock can not be got
845                  * at the same for cross-object, so we can not match
846                  * the 2 lock at the same time FIXME: but how to handle
847                  * the above situation */
848                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
849                         MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
850
851                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
852                                       &op_data->op_fid2, LDLM_IBITS, &policy,
853                                       LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
854                 if (mode) {
855                         it->d.lustre.it_lock_handle = lockh.cookie;
856                         it->d.lustre.it_lock_mode = mode;
857                 }
858
859                 /* Only return failure if it was not GETATTR by cfid
860                    (from inode_revalidate) */
861                 if (mode || op_data->op_namelen != 0)
862                         RETURN(!!mode);
863         }
864
865         /* lookup_it may be called only after revalidate_it has run, because
866          * revalidate_it cannot return errors, only zero.  Returning zero causes
867          * this call to lookup, which *can* return an error.
868          *
869          * We only want to execute the request associated with the intent one
870          * time, however, so don't send the request again.  Instead, skip past
871          * this and use the request from revalidate.  In this case, revalidate
872          * never dropped its reference, so the refcounts are all OK */
873         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
874                 struct ldlm_enqueue_info einfo =
875                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
876                           ldlm_completion_ast, NULL, NULL };
877
878                 /* For case if upper layer did not alloc fid, do it now. */
879                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
880                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
881                         if (rc < 0) {
882                                 CERROR("Can't alloc new fid, rc %d\n", rc);
883                                 RETURN(rc);
884                         }
885                 }
886                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
887                                  lmm, lmmsize, extra_lock_flags);
888                 if (rc < 0)
889                         RETURN(rc);
890                 it->d.lustre.it_lock_handle = lockh.cookie;
891         } else if (!fid_is_sane(&op_data->op_fid2) ||
892                    !(it->it_flags & O_CHECK_STALE)) {
893                 /* DISP_ENQ_COMPLETE set means there is extra reference on
894                  * request referenced from this intent, saved for subsequent
895                  * lookup.  This path is executed when we proceed to this
896                  * lookup, so we clear DISP_ENQ_COMPLETE */
897                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
898         }
899         *reqp = it->d.lustre.it_data;
900         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
901         RETURN(rc);
902 }
903
904 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
905                                               void *unused, int rc)
906 {
907         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
908         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
909         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
910         struct lookup_intent     *it;
911         struct lustre_handle     *lockh;
912         struct obd_device        *obddev;
913         int                       flags = LDLM_FL_HAS_INTENT;
914         ENTRY;
915
916         it    = &minfo->mi_it;
917         lockh = &minfo->mi_lockh;
918
919         obddev = class_exp2obd(exp);
920
921         mdc_exit_request(&obddev->u.cli);
922         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
923                 rc = -ETIMEDOUT;
924
925         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
926                                    &flags, NULL, 0, NULL, lockh, rc);
927         if (rc < 0) {
928                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
929                 mdc_clear_replay_flag(req, rc);
930                 GOTO(out, rc);
931         }
932
933         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
934         if (rc)
935                 GOTO(out, rc);
936
937         it->d.lustre.it_lock_handle = lockh->cookie;
938
939         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
940         EXIT;
941
942 out:
943         OBD_FREE_PTR(einfo);
944         minfo->mi_cb(req, minfo, rc);
945         return 0;
946 }
947
948 int mdc_intent_getattr_async(struct obd_export *exp,
949                              struct md_enqueue_info *minfo,
950                              struct ldlm_enqueue_info *einfo)
951 {
952         struct md_op_data       *op_data = &minfo->mi_data;
953         struct lookup_intent    *it = &minfo->mi_it;
954         struct ptlrpc_request   *req;
955         struct obd_device       *obddev = class_exp2obd(exp);
956         struct ldlm_res_id       res_id;
957         ldlm_policy_data_t       policy = {
958                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
959                                  };
960         int                      rc;
961         int                      flags = LDLM_FL_HAS_INTENT;
962         ENTRY;
963
964         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
965                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
966                ldlm_it2str(it->it_op), it->it_flags);
967
968         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
969         req = mdc_intent_getattr_pack(exp, it, op_data);
970         if (!req)
971                 RETURN(-ENOMEM);
972
973         mdc_enter_request(&obddev->u.cli);
974         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
975                               0, NULL, &minfo->mi_lockh, 1);
976         if (rc < 0) {
977                 mdc_exit_request(&obddev->u.cli);
978                 RETURN(rc);
979         }
980
981         req->rq_async_args.pointer_arg[0] = exp;
982         req->rq_async_args.pointer_arg[1] = minfo;
983         req->rq_async_args.pointer_arg[2] = einfo;
984         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
985         ptlrpcd_add_req(req);
986
987         RETURN(0);
988 }
989
990 int mdc_revalidate_lock(struct obd_export *exp,
991                         struct lookup_intent *it,
992                         struct lu_fid *fid)
993 {
994         /* We could just return 1 immediately, but since we should only
995          * be called in revalidate_it if we already have a lock, let's
996          * verify that. */
997         struct ldlm_res_id res_id;
998         struct lustre_handle lockh;
999         ldlm_policy_data_t policy;
1000         ldlm_mode_t mode;
1001         ENTRY;
1002
1003         fid_build_reg_res_name(fid, &res_id);
1004         /* As not all attributes are kept under update lock, e.g. 
1005            owner/group/acls are under lookup lock, we need both 
1006            ibits for GETATTR. */
1007         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1008                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1009                 MDS_INODELOCK_LOOKUP;
1010
1011         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1012                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1013                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
1014         if (mode) {
1015                 it->d.lustre.it_lock_handle = lockh.cookie;
1016                 it->d.lustre.it_lock_mode = mode;
1017         }
1018
1019         RETURN(!!mode);
1020 }