Whamcloud - gitweb
b=19200
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
58
59 int it_disposition(struct lookup_intent *it, int flag)
60 {
61         return it->d.lustre.it_disposition & flag;
62 }
63 EXPORT_SYMBOL(it_disposition);
64
65 void it_set_disposition(struct lookup_intent *it, int flag)
66 {
67         it->d.lustre.it_disposition |= flag;
68 }
69 EXPORT_SYMBOL(it_set_disposition);
70
71 void it_clear_disposition(struct lookup_intent *it, int flag)
72 {
73         it->d.lustre.it_disposition &= ~flag;
74 }
75 EXPORT_SYMBOL(it_clear_disposition);
76
77 int it_open_error(int phase, struct lookup_intent *it)
78 {
79         if (it_disposition(it, DISP_OPEN_OPEN)) {
80                 if (phase >= DISP_OPEN_OPEN)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85
86         if (it_disposition(it, DISP_OPEN_CREATE)) {
87                 if (phase >= DISP_OPEN_CREATE)
88                         return it->d.lustre.it_status;
89                 else
90                         return 0;
91         }
92
93         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94                 if (phase >= DISP_LOOKUP_EXECD)
95                         return it->d.lustre.it_status;
96                 else
97                         return 0;
98         }
99
100         if (it_disposition(it, DISP_IT_EXECD)) {
101                 if (phase >= DISP_IT_EXECD)
102                         return it->d.lustre.it_status;
103                 else
104                         return 0;
105         }
106         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107                it->d.lustre.it_status);
108         LBUG();
109         return 0;
110 }
111 EXPORT_SYMBOL(it_open_error);
112
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
115                       __u32 *bits)
116 {
117         struct ldlm_lock *lock;
118         ENTRY;
119
120         if(bits)
121                 *bits = 0;
122
123         if (!*lockh) {
124                 EXIT;
125                 RETURN(0);
126         }
127
128         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
129
130         LASSERT(lock != NULL);
131         lock_res_and_lock(lock);
132 #ifdef __KERNEL__
133         if (lock->l_ast_data && lock->l_ast_data != data) {
134                 struct inode *new_inode = data;
135                 struct inode *old_inode = lock->l_ast_data;
136                 LASSERTF(old_inode->i_state & I_FREEING,
137                          "Found existing inode %p/%lu/%u state %lu in lock: "
138                          "setting data to %p/%lu/%u\n", old_inode,
139                          old_inode->i_ino, old_inode->i_generation,
140                          old_inode->i_state,
141                          new_inode, new_inode->i_ino, new_inode->i_generation);
142         }
143 #endif
144         lock->l_ast_data = data;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         RETURN(0);
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165                              &res_id, type, policy, mode, lockh, 0);
166         RETURN(rc);
167 }
168
169 int mdc_cancel_unused(struct obd_export *exp,
170                       const struct lu_fid *fid,
171                       ldlm_policy_data_t *policy,
172                       ldlm_mode_t mode, int flags, void *opaque)
173 {
174         struct ldlm_res_id res_id;
175         struct obd_device *obd = class_exp2obd(exp);
176         int rc;
177
178         ENTRY;
179
180         fid_build_reg_res_name(fid, &res_id);
181         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
182                                              policy, mode, flags, opaque);
183         RETURN(rc);
184 }
185
186 int mdc_change_cbdata(struct obd_export *exp,
187                       const struct lu_fid *fid,
188                       ldlm_iterator_t it, void *data)
189 {
190         struct ldlm_res_id res_id;
191         ENTRY;
192
193         fid_build_reg_res_name(fid, &res_id);
194         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
195                               &res_id, it, data);
196
197         EXIT;
198         return 0;
199 }
200
201 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
202 {
203         /* Don't hold error requests for replay. */
204         if (req->rq_replay) {
205                 spin_lock(&req->rq_lock);
206                 req->rq_replay = 0;
207                 spin_unlock(&req->rq_lock);
208         }
209         if (rc && req->rq_transno != 0) {
210                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
211                 LBUG();
212         }
213 }
214
215 /* Save a large LOV EA into the request buffer so that it is available
216  * for replay.  We don't do this in the initial request because the
217  * original request doesn't need this buffer (at most it sends just the
218  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
219  * buffer and may also be difficult to allocate and save a very large
220  * request buffer for each open. (bug 5707)
221  *
222  * OOM here may cause recovery failure if lmm is needed (only for the
223  * original open if the MDS crashed just when this client also OOM'd)
224  * but this is incredibly unlikely, and questionable whether the client
225  * could do MDS recovery under OOM anyways... */
226 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
227                                 struct mdt_body *body)
228 {
229         int     rc;
230
231         /* FIXME: remove this explicit offset. */
232         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
233                                         body->eadatasize);
234         if (rc) {
235                 CERROR("Can't enlarge segment %d size to %d\n",
236                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
237                 body->valid &= ~OBD_MD_FLEASIZE;
238                 body->eadatasize = 0;
239         }
240 }
241
242 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
243                                                    struct lookup_intent *it,
244                                                    struct md_op_data *op_data,
245                                                    void *lmm, int lmmsize,
246                                                    void *cb_data)
247 {
248         struct ptlrpc_request *req;
249         struct obd_device     *obddev = class_exp2obd(exp);
250         struct ldlm_intent    *lit;
251         CFS_LIST_HEAD(cancels);
252         int                    count = 0;
253         int                    mode;
254         int                    rc;
255         ENTRY;
256
257         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
258
259         /* XXX: openlock is not cancelled for cross-refs. */
260         /* If inode is known, cancel conflicting OPEN locks. */
261         if (fid_is_sane(&op_data->op_fid2)) {
262                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
263                         mode = LCK_CW;
264 #ifdef FMODE_EXEC
265                 else if (it->it_flags & FMODE_EXEC)
266                         mode = LCK_PR;
267 #endif
268                 else
269                         mode = LCK_CR;
270                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
271                                                 &cancels, mode,
272                                                 MDS_INODELOCK_OPEN);
273         }
274
275         /* If CREATE, cancel parent's UPDATE lock. */
276         if (it->it_op & IT_CREAT)
277                 mode = LCK_EX;
278         else
279                 mode = LCK_CR;
280         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
281                                          &cancels, mode,
282                                          MDS_INODELOCK_UPDATE);
283
284         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
285                                    &RQF_LDLM_INTENT_OPEN);
286         if (req == NULL) {
287                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
288                 RETURN(ERR_PTR(-ENOMEM));
289         }
290
291         /* parent capability */
292         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
293         /* child capability, reserve the size according to parent capa, it will
294          * be filled after we get the reply */
295         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
296
297         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
298                              op_data->op_namelen + 1);
299         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
300                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
301
302         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
303         if (rc) {
304                 ptlrpc_request_free(req);
305                 return NULL;
306         }
307
308         spin_lock(&req->rq_lock);
309         req->rq_replay = req->rq_import->imp_replayable;
310         spin_unlock(&req->rq_lock);
311
312         /* pack the intent */
313         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
314         lit->opc = (__u64)it->it_op;
315
316         /* pack the intended request */
317         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
318                       lmmsize);
319
320         /* for remote client, fetch remote perm for current user */
321         if (client_is_remote(exp))
322                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
323                                      sizeof(struct mdt_remote_perm));
324         ptlrpc_request_set_replen(req);
325         return req;
326 }
327
328 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
329                                                      struct lookup_intent *it,
330                                                      struct md_op_data *op_data)
331 {
332         struct ptlrpc_request *req;
333         struct obd_device     *obddev = class_exp2obd(exp);
334         struct ldlm_intent    *lit;
335         int                    rc;
336         ENTRY;
337
338         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
339                                    &RQF_LDLM_INTENT_UNLINK);
340         if (req == NULL)
341                 RETURN(ERR_PTR(-ENOMEM));
342
343         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
344         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
345                              op_data->op_namelen + 1);
346
347         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
348         if (rc) {
349                 ptlrpc_request_free(req);
350                 RETURN(ERR_PTR(rc));
351         }
352
353         /* pack the intent */
354         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
355         lit->opc = (__u64)it->it_op;
356
357         /* pack the intended request */
358         mdc_unlink_pack(req, op_data);
359
360         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
361                              obddev->u.cli.cl_max_mds_easize);
362         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
363                              obddev->u.cli.cl_max_mds_cookiesize);
364         ptlrpc_request_set_replen(req);
365         RETURN(req);
366 }
367
368 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
369                                                       struct lookup_intent *it,
370                                                       struct md_op_data *op_data)
371 {
372         struct ptlrpc_request *req;
373         struct obd_device     *obddev = class_exp2obd(exp);
374         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
375                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
376                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
377                                        (client_is_remote(exp) ?
378                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
379         struct ldlm_intent    *lit;
380         int                    rc;
381         ENTRY;
382
383         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
384                                    &RQF_LDLM_INTENT_GETATTR);
385         if (req == NULL)
386                 RETURN(ERR_PTR(-ENOMEM));
387
388         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
389         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
390                              op_data->op_namelen + 1);
391
392         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
393         if (rc) {
394                 ptlrpc_request_free(req);
395                 RETURN(ERR_PTR(rc));
396         }
397
398         /* pack the intent */
399         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
400         lit->opc = (__u64)it->it_op;
401
402         /* pack the intended request */
403         mdc_getattr_pack(req, valid, it->it_flags, op_data);
404
405         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
406                              obddev->u.cli.cl_max_mds_easize);
407         if (client_is_remote(exp))
408                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
409                                      sizeof(struct mdt_remote_perm));
410         ptlrpc_request_set_replen(req);
411         RETURN(req);
412 }
413
414 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
415 {
416         struct ptlrpc_request *req;
417         int rc;
418         ENTRY;
419
420         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
421         if (req == NULL)
422                 RETURN(ERR_PTR(-ENOMEM));
423
424         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
425         if (rc) {
426                 ptlrpc_request_free(req);
427                 RETURN(ERR_PTR(rc));
428         }
429
430         ptlrpc_request_set_replen(req);
431         RETURN(req);
432 }
433
434 static int mdc_finish_enqueue(struct obd_export *exp,
435                               struct ptlrpc_request *req,
436                               struct ldlm_enqueue_info *einfo,
437                               struct lookup_intent *it,
438                               struct lustre_handle *lockh,
439                               int rc)
440 {
441         struct req_capsule  *pill = &req->rq_pill;
442         struct ldlm_request *lockreq;
443         struct ldlm_reply   *lockrep;
444         ENTRY;
445
446         LASSERT(rc >= 0);
447         /* Similarly, if we're going to replay this request, we don't want to
448          * actually get a lock, just perform the intent. */
449         if (req->rq_transno || req->rq_replay) {
450                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
451                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
452         }
453
454         if (rc == ELDLM_LOCK_ABORTED) {
455                 einfo->ei_mode = 0;
456                 memset(lockh, 0, sizeof(*lockh));
457                 rc = 0;
458         } else { /* rc = 0 */
459                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
460                 LASSERT(lock);
461
462                 /* If the server gave us back a different lock mode, we should
463                  * fix up our variables. */
464                 if (lock->l_req_mode != einfo->ei_mode) {
465                         ldlm_lock_addref(lockh, lock->l_req_mode);
466                         ldlm_lock_decref(lockh, einfo->ei_mode);
467                         einfo->ei_mode = lock->l_req_mode;
468                 }
469                 LDLM_LOCK_PUT(lock);
470         }
471
472         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
473         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
474
475         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
476         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
477         it->d.lustre.it_lock_mode = einfo->ei_mode;
478         it->d.lustre.it_lock_handle = lockh->cookie;
479         it->d.lustre.it_data = req;
480
481         if (it->d.lustre.it_status < 0 && req->rq_replay)
482                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
483
484         /* If we're doing an IT_OPEN which did not result in an actual
485          * successful open, then we need to remove the bit which saves
486          * this request for unconditional replay.
487          *
488          * It's important that we do this first!  Otherwise we might exit the
489          * function without doing so, and try to replay a failed create
490          * (bug 3440) */
491         if (it->it_op & IT_OPEN && req->rq_replay &&
492             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
493                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
494
495         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
496                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
497
498         /* We know what to expect, so we do any byte flipping required here */
499         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
500                 struct mdt_body *body;
501
502                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
503                 if (body == NULL) {
504                         CERROR ("Can't swab mdt_body\n");
505                         RETURN (-EPROTO);
506                 }
507
508                 if (it_disposition(it, DISP_OPEN_OPEN) &&
509                     !it_open_error(DISP_OPEN_OPEN, it)) {
510                         /*
511                          * If this is a successful OPEN request, we need to set
512                          * replay handler and data early, so that if replay
513                          * happens immediately after swabbing below, new reply
514                          * is swabbed by that handler correctly.
515                          */
516                         mdc_set_open_replay_data(NULL, NULL, req);
517                 }
518
519                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
520                         void *eadata;
521
522                          mdc_update_max_ea_from_body(exp, body);
523
524                         /*
525                          * The eadata is opaque; just check that it is there.
526                          * Eventually, obd_unpackmd() will check the contents.
527                          */
528                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
529                                                               body->eadatasize);
530                         if (eadata == NULL)
531                                 RETURN(-EPROTO);
532
533                         /*
534                          * We save the reply LOV EA in case we have to replay a
535                          * create for recovery.  If we didn't allocate a large
536                          * enough request buffer above we need to reallocate it
537                          * here to hold the actual LOV EA.
538                          *
539                          * To not save LOV EA if request is not going to replay
540                          * (for example error one).
541                          */
542                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
543                                 void *lmm;
544                                 if (req_capsule_get_size(pill, &RMF_EADATA,
545                                                          RCL_CLIENT) <
546                                     body->eadatasize)
547                                         mdc_realloc_openmsg(req, body);
548                                 else
549                                         req_capsule_shrink(pill, &RMF_EADATA,
550                                                            body->eadatasize,
551                                                            RCL_CLIENT);
552
553                                 req_capsule_set_size(pill, &RMF_EADATA,
554                                                      RCL_CLIENT,
555                                                      body->eadatasize);
556
557                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
558                                 if (lmm)
559                                         memcpy(lmm, eadata, body->eadatasize);
560                         }
561                 }
562
563                 if (body->valid & OBD_MD_FLRMTPERM) {
564                         struct mdt_remote_perm *perm;
565
566                         LASSERT(client_is_remote(exp));
567                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
568                                                 lustre_swab_mdt_remote_perm);
569                         if (perm == NULL)
570                                 RETURN(-EPROTO);
571                 }
572                 if (body->valid & OBD_MD_FLMDSCAPA) {
573                         struct lustre_capa *capa, *p;
574
575                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
576                         if (capa == NULL)
577                                 RETURN(-EPROTO);
578
579                         if (it->it_op & IT_OPEN) {
580                                 /* client fid capa will be checked in replay */
581                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
582                                 LASSERT(p);
583                                 *p = *capa;
584                         }
585                 }
586                 if (body->valid & OBD_MD_FLOSSCAPA) {
587                         struct lustre_capa *capa;
588
589                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
590                         if (capa == NULL)
591                                 RETURN(-EPROTO);
592                 }
593         }
594
595         RETURN(rc);
596 }
597
598 /* We always reserve enough space in the reply packet for a stripe MD, because
599  * we don't know in advance the file type. */
600 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
601                 struct lookup_intent *it, struct md_op_data *op_data,
602                 struct lustre_handle *lockh, void *lmm, int lmmsize,
603                 struct ptlrpc_request **reqp, int extra_lock_flags)
604 {
605         struct obd_device     *obddev = class_exp2obd(exp);
606         struct ptlrpc_request *req = NULL;
607         struct req_capsule    *pill;
608         int                    flags = extra_lock_flags;
609         int                    rc;
610         struct ldlm_res_id res_id;
611         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
612         ENTRY;
613
614         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
615                  einfo->ei_type);
616
617         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
618
619         if (it)
620                 flags |= LDLM_FL_HAS_INTENT;
621         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
622                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
623
624         if (reqp)
625                 req = *reqp;
626
627         if (!it) {
628                 /* The only way right now is FLOCK, in this case we hide flock
629                    policy as lmm, but lmmsize is 0 */
630                 LASSERT(lmm && lmmsize == 0);
631                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
632                          einfo->ei_type);
633                 policy = *(ldlm_policy_data_t *)lmm;
634                 res_id.name[3] = LDLM_FLOCK;
635         } else if (it->it_op & IT_OPEN) {
636                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
637                                            einfo->ei_cbdata);
638                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
639                 einfo->ei_cbdata = NULL;
640                 lmm = NULL;
641         } else if (it->it_op & IT_UNLINK)
642                 req = mdc_intent_unlink_pack(exp, it, op_data);
643         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
644                 req = mdc_intent_getattr_pack(exp, it, op_data);
645         else if (it->it_op == IT_READDIR)
646                 req = ldlm_enqueue_pack(exp);
647         else {
648                 LBUG();
649                 RETURN(-EINVAL);
650         }
651
652         if (IS_ERR(req))
653                 RETURN(PTR_ERR(req));
654         pill = &req->rq_pill;
655
656         /* It is important to obtain rpc_lock first (if applicable), so that
657          * threads that are serialised with rpc_lock are not polluting our
658          * rpcs in flight counter. We do not do flock request limiting, though*/
659         if (it) {
660                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
661                 mdc_enter_request(&obddev->u.cli);
662         }
663         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
664                               0, NULL, lockh, 0);
665         if (reqp)
666                 *reqp = req;
667
668         if (it) {
669                 mdc_exit_request(&obddev->u.cli);
670                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
671         }
672         if (!it) {
673                 /* For flock requests we immediatelly return without further
674                    delay and let caller deal with the rest, since rest of
675                    this function metadata processing makes no sense for flock
676                    requests anyway */
677                 RETURN(rc);
678         }
679
680         if (rc < 0) {
681                 CERROR("ldlm_cli_enqueue: %d\n", rc);
682                 mdc_clear_replay_flag(req, rc);
683                 ptlrpc_req_finished(req);
684                 RETURN(rc);
685         }
686         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
687
688         RETURN(rc);
689 }
690
691 static int mdc_finish_intent_lock(struct obd_export *exp,
692                                   struct ptlrpc_request *request,
693                                   struct md_op_data *op_data,
694                                   struct lookup_intent *it,
695                                   struct lustre_handle *lockh)
696 {
697         struct lustre_handle old_lock;
698         struct mdt_body *mdt_body;
699         struct ldlm_lock *lock;
700         int rc;
701
702
703         LASSERT(request != NULL);
704         LASSERT(request != LP_POISON);
705         LASSERT(request->rq_repmsg != LP_POISON);
706
707         if (!it_disposition(it, DISP_IT_EXECD)) {
708                 /* The server failed before it even started executing the
709                  * intent, i.e. because it couldn't unpack the request. */
710                 LASSERT(it->d.lustre.it_status != 0);
711                 RETURN(it->d.lustre.it_status);
712         }
713         rc = it_open_error(DISP_IT_EXECD, it);
714         if (rc)
715                 RETURN(rc);
716
717         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
718         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
719
720         /* If we were revalidating a fid/name pair, mark the intent in
721          * case we fail and get called again from lookup */
722         if (fid_is_sane(&op_data->op_fid2) &&
723             it->it_create_mode & M_CHECK_STALE &&
724             it->it_op != IT_GETATTR) {
725                 it_set_disposition(it, DISP_ENQ_COMPLETE);
726
727                 /* Also: did we find the same inode? */
728                 /* sever can return one of two fids:
729                  * op_fid2 - new allocated fid - if file is created.
730                  * op_fid3 - existent fid - if file only open.
731                  * op_fid3 is saved in lmv_intent_open */
732                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
733                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
734                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
735                                "\n", PFID(&op_data->op_fid2),
736                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
737                         RETURN(-ESTALE);
738                 }
739         }
740
741         rc = it_open_error(DISP_LOOKUP_EXECD, it);
742         if (rc)
743                 RETURN(rc);
744
745         /* keep requests around for the multiple phases of the call
746          * this shows the DISP_XX must guarantee we make it into the call
747          */
748         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
749             it_disposition(it, DISP_OPEN_CREATE) &&
750             !it_open_error(DISP_OPEN_CREATE, it)) {
751                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
752                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
753         }
754         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
755             it_disposition(it, DISP_OPEN_OPEN) &&
756             !it_open_error(DISP_OPEN_OPEN, it)) {
757                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
758                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
759                 /* BUG 11546 - eviction in the middle of open rpc processing */
760                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
761         }
762
763         if (it->it_op & IT_CREAT) {
764                 /* XXX this belongs in ll_create_it */
765         } else if (it->it_op == IT_OPEN) {
766                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
767         } else {
768                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
769         }
770
771         /* If we already have a matching lock, then cancel the new
772          * one.  We have to set the data here instead of in
773          * mdc_enqueue, because we need to use the child's inode as
774          * the l_ast_data to match, and that's not available until
775          * intent_finish has performed the iget().) */
776         lock = ldlm_handle2lock(lockh);
777         if (lock) {
778                 ldlm_policy_data_t policy = lock->l_policy_data;
779                 LDLM_DEBUG(lock, "matching against this");
780
781                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
782                                          &lock->l_resource->lr_name),
783                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
784                          (unsigned long)lock->l_resource->lr_name.name[0],
785                          (unsigned long)lock->l_resource->lr_name.name[1],
786                          (unsigned long)lock->l_resource->lr_name.name[2],
787                          (unsigned long)fid_seq(&mdt_body->fid1),
788                          (unsigned long)fid_oid(&mdt_body->fid1),
789                          (unsigned long)fid_ver(&mdt_body->fid1));
790                 LDLM_LOCK_PUT(lock);
791
792                 memcpy(&old_lock, lockh, sizeof(*lockh));
793                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
794                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
795                         ldlm_lock_decref_and_cancel(lockh,
796                                                     it->d.lustre.it_lock_mode);
797                         memcpy(lockh, &old_lock, sizeof(old_lock));
798                         it->d.lustre.it_lock_handle = lockh->cookie;
799                 }
800         }
801         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
802                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
803                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
804         RETURN(rc);
805 }
806
807 /*
808  * This long block is all about fixing up the lock and request state
809  * so that it is correct as of the moment _before_ the operation was
810  * applied; that way, the VFS will think that everything is normal and
811  * call Lustre's regular VFS methods.
812  *
813  * If we're performing a creation, that means that unless the creation
814  * failed with EEXIST, we should fake up a negative dentry.
815  *
816  * For everything else, we want to lookup to succeed.
817  *
818  * One additional note: if CREATE or OPEN succeeded, we add an extra
819  * reference to the request because we need to keep it around until
820  * ll_create/ll_open gets called.
821  *
822  * The server will return to us, in it_disposition, an indication of
823  * exactly what d.lustre.it_status refers to.
824  *
825  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
826  * otherwise if DISP_OPEN_CREATE is set, then it status is the
827  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
828  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
829  * was successful.
830  *
831  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
832  * child lookup.
833  */
834 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
835                     void *lmm, int lmmsize, struct lookup_intent *it,
836                     int lookup_flags, struct ptlrpc_request **reqp,
837                     ldlm_blocking_callback cb_blocking,
838                     int extra_lock_flags)
839 {
840         struct lustre_handle lockh;
841         int rc = 0;
842         ENTRY;
843         LASSERT(it);
844
845         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
846                ", intent: %s flags %#o\n", op_data->op_namelen,
847                op_data->op_name, PFID(&op_data->op_fid2),
848                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
849                it->it_flags);
850
851         lockh.cookie = 0;
852         if (fid_is_sane(&op_data->op_fid2) &&
853             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
854                 /* We could just return 1 immediately, but since we should only
855                  * be called in revalidate_it if we already have a lock, let's
856                  * verify that. */
857                 ldlm_policy_data_t policy;
858                 ldlm_mode_t mode;
859
860                 /* As not all attributes are kept under update lock, e.g.
861                    owner/group/acls are under lookup lock, we need both
862                    ibits for GETATTR. */
863
864                 /* For CMD, UPDATE lock and LOOKUP lock can not be got
865                  * at the same for cross-object, so we can not match
866                  * the 2 lock at the same time FIXME: but how to handle
867                  * the above situation */
868                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
869                         MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
870
871                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
872                                       &op_data->op_fid2, LDLM_IBITS, &policy,
873                                       LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
874                 if (mode) {
875                         it->d.lustre.it_lock_handle = lockh.cookie;
876                         it->d.lustre.it_lock_mode = mode;
877                 }
878
879                 /* Only return failure if it was not GETATTR by cfid
880                    (from inode_revalidate) */
881                 if (mode || op_data->op_namelen != 0)
882                         RETURN(!!mode);
883         }
884
885         /* lookup_it may be called only after revalidate_it has run, because
886          * revalidate_it cannot return errors, only zero.  Returning zero causes
887          * this call to lookup, which *can* return an error.
888          *
889          * We only want to execute the request associated with the intent one
890          * time, however, so don't send the request again.  Instead, skip past
891          * this and use the request from revalidate.  In this case, revalidate
892          * never dropped its reference, so the refcounts are all OK */
893         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
894                 struct ldlm_enqueue_info einfo =
895                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
896                           ldlm_completion_ast, NULL, NULL, NULL };
897
898                 /* For case if upper layer did not alloc fid, do it now. */
899                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
900                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
901                         if (rc < 0) {
902                                 CERROR("Can't alloc new fid, rc %d\n", rc);
903                                 RETURN(rc);
904                         }
905                 }
906                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
907                                  lmm, lmmsize, NULL, extra_lock_flags);
908                 if (rc < 0)
909                         RETURN(rc);
910         } else if (!fid_is_sane(&op_data->op_fid2) ||
911                    !(it->it_create_mode & M_CHECK_STALE)) {
912                 /* DISP_ENQ_COMPLETE set means there is extra reference on
913                  * request referenced from this intent, saved for subsequent
914                  * lookup.  This path is executed when we proceed to this
915                  * lookup, so we clear DISP_ENQ_COMPLETE */
916                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
917         }
918         *reqp = it->d.lustre.it_data;
919         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
920         RETURN(rc);
921 }
922
923 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
924                                               struct ptlrpc_request *req,
925                                               void *unused, int rc)
926 {
927         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
928         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
929         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
930         struct lookup_intent     *it;
931         struct lustre_handle     *lockh;
932         struct obd_device        *obddev;
933         int                       flags = LDLM_FL_HAS_INTENT;
934         ENTRY;
935
936         it    = &minfo->mi_it;
937         lockh = &minfo->mi_lockh;
938
939         obddev = class_exp2obd(exp);
940
941         mdc_exit_request(&obddev->u.cli);
942         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
943                 rc = -ETIMEDOUT;
944
945         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
946                                    &flags, NULL, 0, NULL, lockh, rc);
947         if (rc < 0) {
948                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
949                 mdc_clear_replay_flag(req, rc);
950                 GOTO(out, rc);
951         }
952
953         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
954         if (rc)
955                 GOTO(out, rc);
956
957         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
958         EXIT;
959
960 out:
961         OBD_FREE_PTR(einfo);
962         minfo->mi_cb(req, minfo, rc);
963         return 0;
964 }
965
966 int mdc_intent_getattr_async(struct obd_export *exp,
967                              struct md_enqueue_info *minfo,
968                              struct ldlm_enqueue_info *einfo)
969 {
970         struct md_op_data       *op_data = &minfo->mi_data;
971         struct lookup_intent    *it = &minfo->mi_it;
972         struct ptlrpc_request   *req;
973         struct obd_device       *obddev = class_exp2obd(exp);
974         struct ldlm_res_id       res_id;
975         ldlm_policy_data_t       policy = {
976                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
977                                  };
978         int                      rc;
979         int                      flags = LDLM_FL_HAS_INTENT;
980         ENTRY;
981
982         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
983                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
984                ldlm_it2str(it->it_op), it->it_flags);
985
986         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
987         req = mdc_intent_getattr_pack(exp, it, op_data);
988         if (!req)
989                 RETURN(-ENOMEM);
990
991         mdc_enter_request(&obddev->u.cli);
992         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
993                               0, NULL, &minfo->mi_lockh, 1);
994         if (rc < 0) {
995                 mdc_exit_request(&obddev->u.cli);
996                 RETURN(rc);
997         }
998
999         req->rq_async_args.pointer_arg[0] = exp;
1000         req->rq_async_args.pointer_arg[1] = minfo;
1001         req->rq_async_args.pointer_arg[2] = einfo;
1002         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1003         ptlrpcd_add_req(req, PSCOPE_OTHER);
1004
1005         RETURN(0);
1006 }
1007
1008 int mdc_revalidate_lock(struct obd_export *exp,
1009                         struct lookup_intent *it,
1010                         struct lu_fid *fid)
1011 {
1012         /* We could just return 1 immediately, but since we should only
1013          * be called in revalidate_it if we already have a lock, let's
1014          * verify that. */
1015         struct ldlm_res_id res_id;
1016         struct lustre_handle lockh;
1017         ldlm_policy_data_t policy;
1018         ldlm_mode_t mode;
1019         ENTRY;
1020
1021         fid_build_reg_res_name(fid, &res_id);
1022         /* As not all attributes are kept under update lock, e.g.
1023            owner/group/acls are under lookup lock, we need both
1024            ibits for GETATTR. */
1025         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1026                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1027                 MDS_INODELOCK_LOOKUP;
1028
1029         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1030                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1031                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1032         if (mode) {
1033                 it->d.lustre.it_lock_handle = lockh.cookie;
1034                 it->d.lustre.it_lock_mode = mode;
1035         }
1036
1037         RETURN(!!mode);
1038 }