Whamcloud - gitweb
correctly handle too big reply message.
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
58
59 int it_disposition(struct lookup_intent *it, int flag)
60 {
61         return it->d.lustre.it_disposition & flag;
62 }
63 EXPORT_SYMBOL(it_disposition);
64
65 void it_set_disposition(struct lookup_intent *it, int flag)
66 {
67         it->d.lustre.it_disposition |= flag;
68 }
69 EXPORT_SYMBOL(it_set_disposition);
70
71 void it_clear_disposition(struct lookup_intent *it, int flag)
72 {
73         it->d.lustre.it_disposition &= ~flag;
74 }
75 EXPORT_SYMBOL(it_clear_disposition);
76
77 int it_open_error(int phase, struct lookup_intent *it)
78 {
79         if (it_disposition(it, DISP_OPEN_OPEN)) {
80                 if (phase >= DISP_OPEN_OPEN)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85
86         if (it_disposition(it, DISP_OPEN_CREATE)) {
87                 if (phase >= DISP_OPEN_CREATE)
88                         return it->d.lustre.it_status;
89                 else
90                         return 0;
91         }
92
93         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94                 if (phase >= DISP_LOOKUP_EXECD)
95                         return it->d.lustre.it_status;
96                 else
97                         return 0;
98         }
99
100         if (it_disposition(it, DISP_IT_EXECD)) {
101                 if (phase >= DISP_IT_EXECD)
102                         return it->d.lustre.it_status;
103                 else
104                         return 0;
105         }
106         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107                it->d.lustre.it_status);
108         LBUG();
109         return 0;
110 }
111 EXPORT_SYMBOL(it_open_error);
112
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
115                       __u32 *bits)
116 {
117         struct ldlm_lock *lock;
118         ENTRY;
119
120         if(bits)
121                 *bits = 0;
122
123         if (!*lockh) {
124                 EXIT;
125                 RETURN(0);
126         }
127
128         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
129
130         LASSERT(lock != NULL);
131         lock_res_and_lock(lock);
132 #ifdef __KERNEL__
133         if (lock->l_ast_data && lock->l_ast_data != data) {
134                 struct inode *new_inode = data;
135                 struct inode *old_inode = lock->l_ast_data;
136                 LASSERTF(old_inode->i_state & I_FREEING,
137                          "Found existing inode %p/%lu/%u state %lu in lock: "
138                          "setting data to %p/%lu/%u\n", old_inode,
139                          old_inode->i_ino, old_inode->i_generation,
140                          old_inode->i_state,
141                          new_inode, new_inode->i_ino, new_inode->i_generation);
142         }
143 #endif
144         lock->l_ast_data = data;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         RETURN(0);
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165                              &res_id, type, policy, mode, lockh, 0);
166         RETURN(rc);
167 }
168
169 int mdc_cancel_unused(struct obd_export *exp,
170                       const struct lu_fid *fid,
171                       ldlm_policy_data_t *policy,
172                       ldlm_mode_t mode, int flags, void *opaque)
173 {
174         struct ldlm_res_id res_id;
175         struct obd_device *obd = class_exp2obd(exp);
176         int rc;
177
178         ENTRY;
179
180         fid_build_reg_res_name(fid, &res_id);
181         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
182                                              policy, mode, flags, opaque);
183         RETURN(rc);
184 }
185
186 int mdc_change_cbdata(struct obd_export *exp,
187                       const struct lu_fid *fid,
188                       ldlm_iterator_t it, void *data)
189 {
190         struct ldlm_res_id res_id;
191         ENTRY;
192
193         fid_build_reg_res_name(fid, &res_id);
194         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
195                               &res_id, it, data);
196
197         EXIT;
198         return 0;
199 }
200
201 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
202 {
203         /* Don't hold error requests for replay. */
204         if (req->rq_replay) {
205                 spin_lock(&req->rq_lock);
206                 req->rq_replay = 0;
207                 spin_unlock(&req->rq_lock);
208         }
209         if (rc && req->rq_transno != 0) {
210                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
211                 LBUG();
212         }
213 }
214
215 /* Save a large LOV EA into the request buffer so that it is available
216  * for replay.  We don't do this in the initial request because the
217  * original request doesn't need this buffer (at most it sends just the
218  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
219  * buffer and may also be difficult to allocate and save a very large
220  * request buffer for each open. (bug 5707)
221  *
222  * OOM here may cause recovery failure if lmm is needed (only for the
223  * original open if the MDS crashed just when this client also OOM'd)
224  * but this is incredibly unlikely, and questionable whether the client
225  * could do MDS recovery under OOM anyways... */
226 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
227                                 struct mdt_body *body)
228 {
229         int     rc;
230
231         /* FIXME: remove this explicit offset. */
232         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
233                                         body->eadatasize);
234         if (rc) {
235                 CERROR("Can't enlarge segment %d size to %d\n",
236                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
237                 body->valid &= ~OBD_MD_FLEASIZE;
238                 body->eadatasize = 0;
239         }
240 }
241
242 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
243                                                    struct lookup_intent *it,
244                                                    struct md_op_data *op_data,
245                                                    void *lmm, int lmmsize,
246                                                    void *cb_data)
247 {
248         struct ptlrpc_request *req;
249         struct obd_device     *obddev = class_exp2obd(exp);
250         struct ldlm_intent    *lit;
251         int           joinfile = !!((it->it_create_mode & M_JOIN_FILE) &&
252                                     op_data->op_data);
253         CFS_LIST_HEAD(cancels);
254         int                    count = 0;
255         int                    mode;
256         int                    rc;
257         ENTRY;
258
259         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
260
261         /* XXX: openlock is not cancelled for cross-refs. */
262         /* If inode is known, cancel conflicting OPEN locks. */
263         if (fid_is_sane(&op_data->op_fid2)) {
264                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
265                         mode = LCK_CW;
266 #ifdef FMODE_EXEC
267                 else if (it->it_flags & FMODE_EXEC)
268                         mode = LCK_PR;
269 #endif
270                 else
271                         mode = LCK_CR;
272                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
273                                                 &cancels, mode,
274                                                 MDS_INODELOCK_OPEN);
275         }
276
277         /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
278         if (it->it_op & IT_CREAT || joinfile)
279                 mode = LCK_EX;
280         else
281                 mode = LCK_CR;
282         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
283                                          &cancels, mode,
284                                          MDS_INODELOCK_UPDATE);
285
286         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
287                                    &RQF_LDLM_INTENT_OPEN);
288         if (req == NULL) {
289                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
290                 RETURN(ERR_PTR(-ENOMEM));
291         }
292
293         /* parent capability */
294         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
295         /* child capability, reserve the size according to parent capa, it will
296          * be filled after we get the reply */
297         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
298
299         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
300                              op_data->op_namelen + 1);
301         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
302                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
303         if (!joinfile) {
304                 req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
305                                      RCL_CLIENT, 0);
306         }
307
308         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
309         if (rc) {
310                 ptlrpc_request_free(req);
311                 return NULL;
312         }
313
314         if (joinfile) {
315                 __u64 head_size = *(__u64 *)op_data->op_data;
316                 mdc_join_pack(req, op_data, head_size);
317         }
318
319         spin_lock(&req->rq_lock);
320         req->rq_replay = req->rq_import->imp_replayable;
321         spin_unlock(&req->rq_lock);
322
323         /* pack the intent */
324         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325         lit->opc = (__u64)it->it_op;
326
327         /* pack the intended request */
328         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
329                       lmmsize);
330
331         /* for remote client, fetch remote perm for current user */
332         if (client_is_remote(exp))
333                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334                                      sizeof(struct mdt_remote_perm));
335         ptlrpc_request_set_replen(req);
336         return req;
337 }
338
339 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
340                                                      struct lookup_intent *it,
341                                                      struct md_op_data *op_data)
342 {
343         struct ptlrpc_request *req;
344         struct obd_device     *obddev = class_exp2obd(exp);
345         struct ldlm_intent    *lit;
346         int                    rc;
347         ENTRY;
348
349         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
350                                    &RQF_LDLM_INTENT_UNLINK);
351         if (req == NULL)
352                 RETURN(ERR_PTR(-ENOMEM));
353
354         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
355         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
356                              op_data->op_namelen + 1);
357
358         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
359         if (rc) {
360                 ptlrpc_request_free(req);
361                 RETURN(ERR_PTR(rc));
362         }
363
364         /* pack the intent */
365         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
366         lit->opc = (__u64)it->it_op;
367
368         /* pack the intended request */
369         mdc_unlink_pack(req, op_data);
370
371         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
372                              obddev->u.cli.cl_max_mds_easize);
373         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
374                              obddev->u.cli.cl_max_mds_cookiesize);
375         ptlrpc_request_set_replen(req);
376         RETURN(req);
377 }
378
379 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
380                                                       struct lookup_intent *it,
381                                                       struct md_op_data *op_data)
382 {
383         struct ptlrpc_request *req;
384         struct obd_device     *obddev = class_exp2obd(exp);
385         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
386                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
387                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
388                                        (client_is_remote(exp) ?
389                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
390         struct ldlm_intent    *lit;
391         int                    rc;
392         ENTRY;
393
394         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
395                                    &RQF_LDLM_INTENT_GETATTR);
396         if (req == NULL)
397                 RETURN(ERR_PTR(-ENOMEM));
398
399         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
400         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
401                              op_data->op_namelen + 1);
402
403         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
404         if (rc) {
405                 ptlrpc_request_free(req);
406                 RETURN(ERR_PTR(rc));
407         }
408
409         /* pack the intent */
410         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
411         lit->opc = (__u64)it->it_op;
412
413         /* pack the intended request */
414         mdc_getattr_pack(req, valid, it->it_flags, op_data);
415
416         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
417                              obddev->u.cli.cl_max_mds_easize);
418         if (client_is_remote(exp))
419                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
420                                      sizeof(struct mdt_remote_perm));
421         ptlrpc_request_set_replen(req);
422         RETURN(req);
423 }
424
425 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
426 {
427         struct ptlrpc_request *req;
428         int rc;
429         ENTRY;
430
431         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
432         if (req == NULL)
433                 RETURN(ERR_PTR(-ENOMEM));
434
435         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
436         if (rc) {
437                 ptlrpc_request_free(req);
438                 RETURN(ERR_PTR(rc));
439         }
440
441         ptlrpc_request_set_replen(req);
442         RETURN(req);
443 }
444
445 static int mdc_finish_enqueue(struct obd_export *exp,
446                               struct ptlrpc_request *req,
447                               struct ldlm_enqueue_info *einfo,
448                               struct lookup_intent *it,
449                               struct lustre_handle *lockh,
450                               int rc)
451 {
452         struct req_capsule  *pill = &req->rq_pill;
453         struct ldlm_request *lockreq;
454         struct ldlm_reply   *lockrep;
455         ENTRY;
456
457         LASSERT(rc >= 0);
458         /* Similarly, if we're going to replay this request, we don't want to
459          * actually get a lock, just perform the intent. */
460         if (req->rq_transno || req->rq_replay) {
461                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
462                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
463         }
464
465         if (rc == ELDLM_LOCK_ABORTED) {
466                 einfo->ei_mode = 0;
467                 memset(lockh, 0, sizeof(*lockh));
468                 rc = 0;
469         } else { /* rc = 0 */
470                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
471                 LASSERT(lock);
472
473                 /* If the server gave us back a different lock mode, we should
474                  * fix up our variables. */
475                 if (lock->l_req_mode != einfo->ei_mode) {
476                         ldlm_lock_addref(lockh, lock->l_req_mode);
477                         ldlm_lock_decref(lockh, einfo->ei_mode);
478                         einfo->ei_mode = lock->l_req_mode;
479                 }
480                 LDLM_LOCK_PUT(lock);
481         }
482
483         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
484         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
485
486         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
487         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
488         it->d.lustre.it_lock_mode = einfo->ei_mode;
489         it->d.lustre.it_lock_handle = lockh->cookie;
490         it->d.lustre.it_data = req;
491
492         if (it->d.lustre.it_status < 0 && req->rq_replay)
493                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
494
495         /* If we're doing an IT_OPEN which did not result in an actual
496          * successful open, then we need to remove the bit which saves
497          * this request for unconditional replay.
498          *
499          * It's important that we do this first!  Otherwise we might exit the
500          * function without doing so, and try to replay a failed create
501          * (bug 3440) */
502         if (it->it_op & IT_OPEN && req->rq_replay &&
503             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
504                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
505
506         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
507                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
508
509         /* We know what to expect, so we do any byte flipping required here */
510         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
511                 struct mdt_body *body;
512
513                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
514                 if (body == NULL) {
515                         CERROR ("Can't swab mdt_body\n");
516                         RETURN (-EPROTO);
517                 }
518
519                 if (it_disposition(it, DISP_OPEN_OPEN) &&
520                     !it_open_error(DISP_OPEN_OPEN, it)) {
521                         /*
522                          * If this is a successful OPEN request, we need to set
523                          * replay handler and data early, so that if replay
524                          * happens immediately after swabbing below, new reply
525                          * is swabbed by that handler correctly.
526                          */
527                         mdc_set_open_replay_data(NULL, NULL, req);
528                 }
529
530                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
531                         void *eadata;
532
533                          mdc_update_max_ea_from_body(exp, body);
534
535                         /*
536                          * The eadata is opaque; just check that it is there.
537                          * Eventually, obd_unpackmd() will check the contents.
538                          */
539                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
540                                                               body->eadatasize);
541                         if (eadata == NULL)
542                                 RETURN(-EPROTO);
543
544                         /*
545                          * We save the reply LOV EA in case we have to replay a
546                          * create for recovery.  If we didn't allocate a large
547                          * enough request buffer above we need to reallocate it
548                          * here to hold the actual LOV EA.
549                          *
550                          * To not save LOV EA if request is not going to replay
551                          * (for example error one).
552                          */
553                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
554                                 void *lmm;
555                                 if (req_capsule_get_size(pill, &RMF_EADATA,
556                                                          RCL_CLIENT) <
557                                     body->eadatasize)
558                                         mdc_realloc_openmsg(req, body);
559                                 else
560                                         req_capsule_shrink(pill, &RMF_EADATA,
561                                                            body->eadatasize,
562                                                            RCL_CLIENT);
563
564                                 req_capsule_set_size(pill, &RMF_EADATA,
565                                                      RCL_CLIENT,
566                                                      body->eadatasize);
567
568                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
569                                 if (lmm)
570                                         memcpy(lmm, eadata, body->eadatasize);
571                         }
572                 }
573
574                 if (body->valid & OBD_MD_FLRMTPERM) {
575                         struct mdt_remote_perm *perm;
576
577                         LASSERT(client_is_remote(exp));
578                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
579                                                 lustre_swab_mdt_remote_perm);
580                         if (perm == NULL)
581                                 RETURN(-EPROTO);
582                 }
583                 if (body->valid & OBD_MD_FLMDSCAPA) {
584                         struct lustre_capa *capa, *p;
585
586                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
587                         if (capa == NULL)
588                                 RETURN(-EPROTO);
589
590                         if (it->it_op & IT_OPEN) {
591                                 /* client fid capa will be checked in replay */
592                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
593                                 LASSERT(p);
594                                 *p = *capa;
595                         }
596                 }
597                 if (body->valid & OBD_MD_FLOSSCAPA) {
598                         struct lustre_capa *capa;
599
600                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
601                         if (capa == NULL)
602                                 RETURN(-EPROTO);
603                 }
604         }
605
606         RETURN(rc);
607 }
608
609 /* We always reserve enough space in the reply packet for a stripe MD, because
610  * we don't know in advance the file type. */
611 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
612                 struct lookup_intent *it, struct md_op_data *op_data,
613                 struct lustre_handle *lockh, void *lmm, int lmmsize,
614                 struct ptlrpc_request **reqp, int extra_lock_flags)
615 {
616         struct obd_device     *obddev = class_exp2obd(exp);
617         struct ptlrpc_request *req = NULL;
618         struct req_capsule    *pill;
619         int                    flags = extra_lock_flags;
620         int                    rc;
621         struct ldlm_res_id res_id;
622         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
623         ENTRY;
624
625         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
626                  einfo->ei_type);
627
628         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
629
630         if (it)
631                 flags |= LDLM_FL_HAS_INTENT;
632         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
633                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
634
635         if (reqp)
636                 req = *reqp;
637
638         if (!it) {
639                 /* The only way right now is FLOCK, in this case we hide flock
640                    policy as lmm, but lmmsize is 0 */
641                 LASSERT(lmm && lmmsize == 0);
642                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
643                          einfo->ei_type);
644                 policy = *(ldlm_policy_data_t *)lmm;
645                 res_id.name[3] = LDLM_FLOCK;
646         } else if (it->it_op & IT_OPEN) {
647                 int joinfile = !!((it->it_create_mode & M_JOIN_FILE) &&
648                                               op_data->op_data);
649
650                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
651                                            einfo->ei_cbdata);
652                 if (!joinfile) {
653                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
654                         einfo->ei_cbdata = NULL;
655                         lmm = NULL;
656                 } else
657                         it->it_create_mode &= ~M_JOIN_FILE;
658         } else if (it->it_op & IT_UNLINK)
659                 req = mdc_intent_unlink_pack(exp, it, op_data);
660         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
661                 req = mdc_intent_getattr_pack(exp, it, op_data);
662         else if (it->it_op == IT_READDIR)
663                 req = ldlm_enqueue_pack(exp);
664         else {
665                 LBUG();
666                 RETURN(-EINVAL);
667         }
668
669         if (IS_ERR(req))
670                 RETURN(PTR_ERR(req));
671         pill = &req->rq_pill;
672
673         /* It is important to obtain rpc_lock first (if applicable), so that
674          * threads that are serialised with rpc_lock are not polluting our
675          * rpcs in flight counter. We do not do flock request limiting, though*/
676         if (it) {
677                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
678                 mdc_enter_request(&obddev->u.cli);
679         }
680         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
681                               0, NULL, lockh, 0);
682         if (reqp)
683                 *reqp = req;
684
685         if (it) {
686                 mdc_exit_request(&obddev->u.cli);
687                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
688         }
689         if (!it) {
690                 /* For flock requests we immediatelly return without further
691                    delay and let caller deal with the rest, since rest of
692                    this function metadata processing makes no sense for flock
693                    requests anyway */
694                 RETURN(rc);
695         }
696
697         if (rc < 0) {
698                 CERROR("ldlm_cli_enqueue: %d\n", rc);
699                 mdc_clear_replay_flag(req, rc);
700                 ptlrpc_req_finished(req);
701                 RETURN(rc);
702         }
703         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
704
705         RETURN(rc);
706 }
707
708 static int mdc_finish_intent_lock(struct obd_export *exp,
709                                   struct ptlrpc_request *request,
710                                   struct md_op_data *op_data,
711                                   struct lookup_intent *it,
712                                   struct lustre_handle *lockh)
713 {
714         struct lustre_handle old_lock;
715         struct mdt_body *mdt_body;
716         struct ldlm_lock *lock;
717         int rc;
718
719
720         LASSERT(request != NULL);
721         LASSERT(request != LP_POISON);
722         LASSERT(request->rq_repmsg != LP_POISON);
723
724         if (!it_disposition(it, DISP_IT_EXECD)) {
725                 /* The server failed before it even started executing the
726                  * intent, i.e. because it couldn't unpack the request. */
727                 LASSERT(it->d.lustre.it_status != 0);
728                 RETURN(it->d.lustre.it_status);
729         }
730         rc = it_open_error(DISP_IT_EXECD, it);
731         if (rc)
732                 RETURN(rc);
733
734         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
735         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
736
737         /* If we were revalidating a fid/name pair, mark the intent in
738          * case we fail and get called again from lookup */
739         if (fid_is_sane(&op_data->op_fid2) &&
740             it->it_create_mode & M_CHECK_STALE &&
741             it->it_op != IT_GETATTR) {
742                 it_set_disposition(it, DISP_ENQ_COMPLETE);
743
744                 /* Also: did we find the same inode? */
745                 /* sever can return one of two fids:
746                  * op_fid2 - new allocated fid - if file is created.
747                  * op_fid3 - existent fid - if file only open.
748                  * op_fid3 is saved in lmv_intent_open */
749                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
750                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
751                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
752                                "\n", PFID(&op_data->op_fid2),
753                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
754                         RETURN(-ESTALE);
755                 }
756         }
757
758         rc = it_open_error(DISP_LOOKUP_EXECD, it);
759         if (rc)
760                 RETURN(rc);
761
762         /* keep requests around for the multiple phases of the call
763          * this shows the DISP_XX must guarantee we make it into the call
764          */
765         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
766             it_disposition(it, DISP_OPEN_CREATE) &&
767             !it_open_error(DISP_OPEN_CREATE, it)) {
768                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
769                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
770         }
771         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
772             it_disposition(it, DISP_OPEN_OPEN) &&
773             !it_open_error(DISP_OPEN_OPEN, it)) {
774                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
775                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
776                 /* BUG 11546 - eviction in the middle of open rpc processing */
777                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
778         }
779
780         if (it->it_op & IT_CREAT) {
781                 /* XXX this belongs in ll_create_it */
782         } else if (it->it_op == IT_OPEN) {
783                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
784         } else {
785                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
786         }
787
788         /* If we already have a matching lock, then cancel the new
789          * one.  We have to set the data here instead of in
790          * mdc_enqueue, because we need to use the child's inode as
791          * the l_ast_data to match, and that's not available until
792          * intent_finish has performed the iget().) */
793         lock = ldlm_handle2lock(lockh);
794         if (lock) {
795                 ldlm_policy_data_t policy = lock->l_policy_data;
796                 LDLM_DEBUG(lock, "matching against this");
797
798                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
799                                          &lock->l_resource->lr_name),
800                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
801                          (unsigned long)lock->l_resource->lr_name.name[0],
802                          (unsigned long)lock->l_resource->lr_name.name[1],
803                          (unsigned long)lock->l_resource->lr_name.name[2],
804                          (unsigned long)fid_seq(&mdt_body->fid1),
805                          (unsigned long)fid_oid(&mdt_body->fid1),
806                          (unsigned long)fid_ver(&mdt_body->fid1));
807                 LDLM_LOCK_PUT(lock);
808
809                 memcpy(&old_lock, lockh, sizeof(*lockh));
810                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
811                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
812                         ldlm_lock_decref_and_cancel(lockh,
813                                                     it->d.lustre.it_lock_mode);
814                         memcpy(lockh, &old_lock, sizeof(old_lock));
815                         it->d.lustre.it_lock_handle = lockh->cookie;
816                 }
817         }
818         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
819                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
820                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
821         RETURN(rc);
822 }
823
824 /*
825  * This long block is all about fixing up the lock and request state
826  * so that it is correct as of the moment _before_ the operation was
827  * applied; that way, the VFS will think that everything is normal and
828  * call Lustre's regular VFS methods.
829  *
830  * If we're performing a creation, that means that unless the creation
831  * failed with EEXIST, we should fake up a negative dentry.
832  *
833  * For everything else, we want the lookup to succeed.
834  *
835  * One additional note: if CREATE or OPEN succeeded, we add an extra
836  * reference to the request because we need to keep it around until
837  * ll_create/ll_open gets called.
838  *
839  * The server will return to us, in it_disposition, an indication of
840  * exactly what d.lustre.it_status refers to.
841  *
842  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
843  * otherwise if DISP_OPEN_CREATE is set, then d.lustre.it_status is the
844  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
845  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
846  * was successful.
847  *
848  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
849  * child lookup.
850  */
851 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
852                     void *lmm, int lmmsize, struct lookup_intent *it,
853                     int lookup_flags, struct ptlrpc_request **reqp,
854                     ldlm_blocking_callback cb_blocking,
855                     int extra_lock_flags)
856 {
857         struct lustre_handle lockh;
858         int rc = 0;
859         ENTRY;
860         LASSERT(it);
861
862         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
863                ", intent: %s flags %#o\n", op_data->op_namelen,
864                op_data->op_name, PFID(&op_data->op_fid2),
865                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
866                it->it_flags);
867
868         lockh.cookie = 0;
869         if (fid_is_sane(&op_data->op_fid2) &&
870             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
871                 /* We could just return 1 immediately, but since we should only
872                  * be called in revalidate_it if we already have a lock, let's
873                  * verify that. */
874                 ldlm_policy_data_t policy;
875                 ldlm_mode_t mode;
876
877                 /* As not all attributes are kept under update lock, e.g.
878                    owner/group/acls are under lookup lock, we need both
879                    ibits for GETATTR. */
880
881                 /* For CMD, UPDATE lock and LOOKUP lock can not be got
882                  * at the same for cross-object, so we can not match
883                  * the 2 lock at the same time FIXME: but how to handle
884                  * the above situation */
885                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
886                         MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
887
888                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
889                                       &op_data->op_fid2, LDLM_IBITS, &policy,
890                                       LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
891                 if (mode) {
892                         it->d.lustre.it_lock_handle = lockh.cookie;
893                         it->d.lustre.it_lock_mode = mode;
894                 }
895
896                 /* Only return failure if it was not GETATTR by cfid
897                    (from inode_revalidate) */
898                 if (mode || op_data->op_namelen != 0)
899                         RETURN(!!mode);
900         }
901
902         /* lookup_it may be called only after revalidate_it has run, because
903          * revalidate_it cannot return errors, only zero.  Returning zero causes
904          * this call to lookup, which *can* return an error.
905          *
906          * We only want to execute the request associated with the intent one
907          * time, however, so don't send the request again.  Instead, skip past
908          * this and use the request from revalidate.  In this case, revalidate
909          * never dropped its reference, so the refcounts are all OK */
910         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
911                 struct ldlm_enqueue_info einfo =
912                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
913                           ldlm_completion_ast, NULL, NULL, NULL };
914
915                 /* For case if upper layer did not alloc fid, do it now. */
916                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
917                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
918                         if (rc < 0) {
919                                 CERROR("Can't alloc new fid, rc %d\n", rc);
920                                 RETURN(rc);
921                         }
922                 }
923                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
924                                  lmm, lmmsize, NULL, extra_lock_flags);
925                 if (rc < 0)
926                         RETURN(rc);
927         } else if (!fid_is_sane(&op_data->op_fid2) ||
928                    !(it->it_create_mode & M_CHECK_STALE)) {
929                 /* DISP_ENQ_COMPLETE set means there is extra reference on
930                  * request referenced from this intent, saved for subsequent
931                  * lookup.  This path is executed when we proceed to this
932                  * lookup, so we clear DISP_ENQ_COMPLETE */
933                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
934         }
935         *reqp = it->d.lustre.it_data;
936         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
937         RETURN(rc);
938 }
939
940 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
941                                               struct ptlrpc_request *req,
942                                               void *unused, int rc)
943 {
944         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
945         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
946         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
947         struct lookup_intent     *it;
948         struct lustre_handle     *lockh;
949         struct obd_device        *obddev;
950         int                       flags = LDLM_FL_HAS_INTENT;
951         ENTRY;
952
953         it    = &minfo->mi_it;
954         lockh = &minfo->mi_lockh;
955
956         obddev = class_exp2obd(exp);
957
958         mdc_exit_request(&obddev->u.cli);
959         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
960                 rc = -ETIMEDOUT;
961
962         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
963                                    &flags, NULL, 0, NULL, lockh, rc);
964         if (rc < 0) {
965                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
966                 mdc_clear_replay_flag(req, rc);
967                 GOTO(out, rc);
968         }
969
970         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
971         if (rc)
972                 GOTO(out, rc);
973
974         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
975         EXIT;
976
977 out:
978         OBD_FREE_PTR(einfo);
979         minfo->mi_cb(req, minfo, rc);
980         return 0;
981 }
982
983 int mdc_intent_getattr_async(struct obd_export *exp,
984                              struct md_enqueue_info *minfo,
985                              struct ldlm_enqueue_info *einfo)
986 {
987         struct md_op_data       *op_data = &minfo->mi_data;
988         struct lookup_intent    *it = &minfo->mi_it;
989         struct ptlrpc_request   *req;
990         struct obd_device       *obddev = class_exp2obd(exp);
991         struct ldlm_res_id       res_id;
992         ldlm_policy_data_t       policy = {
993                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
994                                  };
995         int                      rc;
996         int                      flags = LDLM_FL_HAS_INTENT;
997         ENTRY;
998
999         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1000                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1001                ldlm_it2str(it->it_op), it->it_flags);
1002
1003         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1004         req = mdc_intent_getattr_pack(exp, it, op_data);
1005         if (!req)
1006                 RETURN(-ENOMEM);
1007
1008         mdc_enter_request(&obddev->u.cli);
1009         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1010                               0, NULL, &minfo->mi_lockh, 1);
1011         if (rc < 0) {
1012                 mdc_exit_request(&obddev->u.cli);
1013                 RETURN(rc);
1014         }
1015
1016         req->rq_async_args.pointer_arg[0] = exp;
1017         req->rq_async_args.pointer_arg[1] = minfo;
1018         req->rq_async_args.pointer_arg[2] = einfo;
1019         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1020         ptlrpcd_add_req(req, PSCOPE_OTHER);
1021
1022         RETURN(0);
1023 }
1024
1025 int mdc_revalidate_lock(struct obd_export *exp,
1026                         struct lookup_intent *it,
1027                         struct lu_fid *fid)
1028 {
1029         /* We could just return 1 immediately, but since we should only
1030          * be called in revalidate_it if we already have a lock, let's
1031          * verify that. */
1032         struct ldlm_res_id res_id;
1033         struct lustre_handle lockh;
1034         ldlm_policy_data_t policy;
1035         ldlm_mode_t mode;
1036         ENTRY;
1037
1038         fid_build_reg_res_name(fid, &res_id);
1039         /* As not all attributes are kept under update lock, e.g.
1040            owner/group/acls are under lookup lock, we need both
1041            ibits for GETATTR. */
1042         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1043                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1044                 MDS_INODELOCK_LOOKUP;
1045
1046         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1047                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1048                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1049         if (mode) {
1050                 it->d.lustre.it_lock_handle = lockh.cookie;
1051                 it->d.lustre.it_lock_mode = mode;
1052         }
1053
1054         RETURN(!!mode);
1055 }