Whamcloud - gitweb
Branch b_new_cmd
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <linux/lustre_acl.h>
40 #include <obd_class.h>
41 #include <lustre_dlm.h>
42 #include <lustre_fid.h>
43 #include <md_object.h>
44 #include <lprocfs_status.h>
45 #include <lustre_param.h>
46 #include "mdc_internal.h"
47
48 static quota_interface_t *quota_interface;
49
50 #define REQUEST_MINOR 244
51
52 static int mdc_cleanup(struct obd_device *obd);
53
54 extern int mds_queue_req(struct ptlrpc_request *);
55
56 static inline struct obd_capa *mdc_unpack_capa(struct ptlrpc_request *req,
57                                                unsigned int offset)
58 {
59         struct lustre_capa *capa;
60         struct obd_capa *oc;
61
62         /* swabbed already in mdc_enqueue */
63         capa = lustre_msg_buf(req->rq_repmsg, offset, sizeof(*capa));
64         if (capa == NULL) {
65                 CERROR("missing capa at offset %d failed!\n", offset);
66                 return ERR_PTR(-EFAULT);
67         }
68
69         oc = alloc_capa(CAPA_SITE_CLIENT);
70         if (!oc) {
71                 CERROR("alloc capa failed!\n");
72                 return ERR_PTR(-ENOMEM);
73         }
74         oc->c_capa = *capa;
75
76         return oc;
77 }
78
79 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
80 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
81 static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
82                           struct obd_capa **pc, int level, int msg_flags)
83 {
84         struct ptlrpc_request *req;
85         struct mdt_body *body;
86         int rc, size[3] = { sizeof(struct ptlrpc_body),
87                             sizeof(*body),
88                             sizeof(struct lustre_capa) };
89         ENTRY;
90
91         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS, 2, size,
92                               NULL);
93         if (!req)
94                 GOTO(out, rc = -ENOMEM);
95
96         req->rq_send_state = level;
97         ptlrpc_req_set_repsize(req, 3, size);
98
99         mdc_pack_req_body(req, REQ_REC_OFF, 0, NULL, NULL, 0, 0);
100         lustre_msg_add_flags(req->rq_reqmsg, msg_flags);
101         rc = ptlrpc_queue_wait(req);
102
103         if (!rc) {
104                 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
105                                           lustre_swab_mdt_body);
106                 if (body == NULL) {
107                         CERROR ("Can't extract mdt_body\n");
108                         GOTO (out, rc = -EPROTO);
109                 }
110
111                 *rootfid = body->fid1;
112
113                 if (body->valid & OBD_MD_FLMDSCAPA) {
114                         struct obd_capa *oc;
115
116                         oc = mdc_unpack_capa(req, REPLY_REC_OFF + 1);
117                         if (IS_ERR(oc))
118                                 GOTO(out, rc = PTR_ERR(oc));
119                         *pc = oc;
120                 }
121
122                 CDEBUG(D_NET, "root fid="DFID", last_committed="LPU64
123                        ", last_xid="LPU64"\n",
124                        PFID(rootfid),
125                        lustre_msg_get_last_committed(req->rq_repmsg),
126                        lustre_msg_get_last_xid(req->rq_repmsg));
127         }
128
129         EXIT;
130  out:
131         ptlrpc_req_finished(req);
132         return rc;
133 }
134
135 /* This should be mdc_get_info("rootfid") */
136 int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
137                   struct obd_capa **pc)
138 {
139         return send_getstatus(class_exp2cliimp(exp), rootfid, pc,
140                               LUSTRE_IMP_FULL, 0);
141 }
142
143 /*
144  * This function now is known to always saying that it will receive 4 buffers
145  * from server. Even for cases when acl_size and md_size is zero, RPC header
146  * willcontain 4 fields and RPC itself will contain zero size fields. This is
147  * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
148  * and thus zero, it shirinks it, making zero size. The same story about
149  * md_size. And this is course of problem when client waits for smaller number
150  * of fields. This issue will be fixed later when client gets awar of RPC
151  * layouts.  --umka
152  */
153 static int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
154                               unsigned int acl_size, int mdscapa,
155                               struct ptlrpc_request *req)
156 {
157         struct mdt_body *body;
158         void *eadata;
159         int size[5] = { sizeof(struct ptlrpc_body),
160                         sizeof(*body),
161                         ea_size,
162                         acl_size };
163         int offset, rc;
164         ENTRY;
165         
166         /* Request message already built. */
167         if (ea_size)
168                 CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
169                        ea_size);
170         if (acl_size)
171                 CDEBUG(D_INODE, "reserved %u bytes for ACL\n", acl_size);
172         if (mdscapa)
173                 size[REPLY_REC_OFF + 2] = sizeof(struct lustre_capa);
174
175         ptlrpc_req_set_repsize(req, 5, size);
176
177         rc = ptlrpc_queue_wait(req);
178         if (rc != 0)
179                 RETURN (rc);
180
181         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
182                                   lustre_swab_mdt_body);
183         if (body == NULL) {
184                 CERROR ("Can't unpack mdt_body\n");
185                 RETURN (-EPROTO);
186         }
187
188         CDEBUG(D_NET, "mode: %o\n", body->mode);
189
190         offset = REPLY_REC_OFF + 1;
191         LASSERT_REPSWAB(req, offset);
192         if (body->eadatasize != 0) {
193                 /* reply indicates presence of eadata; check it's there... */
194                 eadata = lustre_msg_buf(req->rq_repmsg, offset++,
195                                         body->eadatasize);
196                 if (eadata == NULL) {
197                         CERROR ("Missing/short eadata\n");
198                         RETURN (-EPROTO);
199                 }
200         }
201
202         if (body->valid & OBD_MD_FLMODEASIZE) {
203                 struct client_obd *cli = &exp->exp_obd->u.cli;
204
205                 if (cli->cl_max_mds_easize < body->max_mdsize)
206                         cli->cl_max_mds_easize = body->max_mdsize;
207                 if (cli->cl_max_mds_cookiesize < body->max_cookiesize)
208                         cli->cl_max_mds_cookiesize = body->max_cookiesize;
209         }
210
211         offset += !!body->aclsize;
212
213         if (body->valid & OBD_MD_FLMDSCAPA) {
214                 struct lustre_capa *capa;
215
216                 LASSERT(mdscapa);
217                 capa = lustre_unpack_capa(req->rq_repmsg, offset++);
218                 if (capa == NULL) {
219                         CERROR("Missing/short client MDS capability\n");
220                         RETURN(-EPROTO);
221                 }
222         }
223
224         RETURN (0);
225 }
226
227 int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid,
228                 struct obd_capa *oc, obd_valid valid, int ea_size,
229                 struct ptlrpc_request **request)
230 {
231         struct ptlrpc_request *req;
232         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
233         int acl_size = 0, rc;
234         ENTRY;
235
236         if (oc)
237                 size[REQ_REC_OFF + 1] = sizeof(struct lustre_capa);
238
239         /*
240          * XXX do we need to make another request here?  We just did a getattr
241          * to do the lookup in the first place.
242          */
243         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
244                               MDS_GETATTR, 3, size, NULL);
245         if (!req)
246                 GOTO(out, rc = -ENOMEM);
247
248         mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, ea_size,
249                           MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/);
250
251         /* currently only root inode will call us with FLACL */
252         if (valid & OBD_MD_FLACL)
253                 acl_size = LUSTRE_POSIX_ACL_MAX_SIZE;
254          
255         rc = mdc_getattr_common(exp, ea_size, acl_size,
256                                 !!(valid & OBD_MD_FLMDSCAPA), req);
257         if (rc != 0) {
258                 ptlrpc_req_finished (req);
259                 req = NULL;
260         }
261  out:
262         *request = req;
263         RETURN (rc);
264 }
265
266 int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
267                      struct obd_capa *oc, const char *filename, int namelen,
268                      obd_valid valid, int ea_size,
269                      struct ptlrpc_request **request)
270 {
271         struct ptlrpc_request *req;
272         struct mdt_body *body;
273         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body), 0, namelen};
274         int rc;
275         ENTRY;
276
277         if (oc)
278                 size[REQ_REC_OFF + 1] = sizeof(struct lustre_capa);
279
280         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
281                               MDS_GETATTR_NAME, 4, size, NULL);
282         if (!req)
283                 GOTO(out, rc = -ENOMEM);
284
285         mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, ea_size,
286                           MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/);
287  
288         LASSERT(strnlen(filename, namelen) == namelen - 1);
289         memcpy(lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, namelen),
290                filename, namelen);
291
292         rc = mdc_getattr_common(exp, ea_size, 0, !!(valid & OBD_MD_FLMDSCAPA),
293                                 req);
294         if (rc != 0) {
295                 ptlrpc_req_finished (req);
296                 req = NULL;
297         }
298  out:
299         *request = req;
300         RETURN(rc);
301 }
302
303 int mdc_is_subdir(struct obd_export *exp, const struct lu_fid *pfid,
304                   const struct lu_fid *cfid,
305                   struct obd_capa *pc, struct obd_capa *cc,
306                   struct ptlrpc_request **request)
307 {
308         int size[4] = { sizeof(struct ptlrpc_body),
309                         sizeof(struct mdt_body) };
310         struct ptlrpc_request *req;
311         struct mdt_body *body;
312         int rc;
313         ENTRY;
314
315         if (pc)
316                 size[REQ_REC_OFF + 1] = sizeof(struct lustre_capa);
317         if (cc)
318                 size[REQ_REC_OFF + 2] = sizeof(struct lustre_capa);
319
320         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
321                               MDS_IS_SUBDIR, 4, size, NULL);
322         if (!req)
323                 GOTO(out, rc = -ENOMEM);
324
325         mdc_is_subdir_pack(req, REQ_REC_OFF, pfid, cfid, pc, cc, 0);
326
327         ptlrpc_req_set_repsize(req, 2, size);
328         rc = ptlrpc_queue_wait(req);
329         if (rc != 0)
330                 GOTO(out, rc);
331
332         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
333                                   lustre_swab_mdt_body);
334         if (body == NULL) {
335                 CERROR ("Can't unpack mdt_body\n");
336                 GOTO(out, rc = -EPROTO);
337         }
338         EXIT;
339  out:
340         *request = req;
341         return rc;
342 }
343
344 static
345 int mdc_xattr_common(struct obd_export *exp, const struct lu_fid *fid,
346                      struct obd_capa *oc,
347                      int opcode, obd_valid valid, const char *xattr_name,
348                      const char *input, int input_size, int output_size,
349                      int flags, struct ptlrpc_request **request)
350 {
351         struct ptlrpc_request *req;
352         int size[5] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
353         int bufcnt = 3, offset = REQ_REC_OFF + 2;
354         int rc, xattr_namelen = 0, remote_acl = 0;
355         void *tmp;
356         ENTRY;
357
358         if (oc)
359                 size[REQ_REC_OFF + 1] = sizeof(struct lustre_capa);
360         if (xattr_name) {
361                 xattr_namelen = strlen(xattr_name) + 1;
362                 size[bufcnt++] = xattr_namelen;
363         }
364         if (input_size) {
365                 LASSERT(input);
366                 size[bufcnt++] = input_size;
367         }
368
369         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
370                               opcode, bufcnt, size, NULL);
371         if (!req)
372                 GOTO(out, rc = -ENOMEM);
373
374         /* request data */
375         mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, output_size, flags);
376
377
378         if (xattr_name) {
379                 tmp = lustre_msg_buf(req->rq_reqmsg, offset++, xattr_namelen);
380                 memcpy(tmp, xattr_name, xattr_namelen);
381                 if (!strcmp(xattr_name, XATTR_NAME_LUSTRE_ACL))
382                         remote_acl = 1;
383         }
384         if (input_size) {
385                 tmp = lustre_msg_buf(req->rq_reqmsg, offset++, input_size);
386                 memcpy(tmp, input, input_size);
387         }
388
389         /* reply buffers */
390         if (opcode == MDS_GETXATTR) {
391                 size[REPLY_REC_OFF] = sizeof(struct mdt_body);
392                 bufcnt = 2;
393         } else {
394                 bufcnt = 1;
395         }
396
397         /* we do this even output_size is 0, because server is doing that */
398         size[bufcnt++] = output_size;
399         ptlrpc_req_set_repsize(req, bufcnt, size);
400
401         /* make rpc */
402         /* NB: set remote acl doesn't need hold rpc lock, because it just
403          * send command to MDS, and when it's executed on mountpoint on MDS,
404          * another mdc_xattr_common() will be invoked there. */
405         if (opcode == MDS_SETXATTR && !remote_acl)
406                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
407
408         rc = ptlrpc_queue_wait(req);
409
410         if (opcode == MDS_SETXATTR && !remote_acl)
411                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
412
413         if (rc != 0)
414                 GOTO(err_out, rc);
415
416         if (opcode == MDS_GETXATTR) {
417                 struct mdt_body * body = lustre_swab_repbuf(req, REPLY_REC_OFF,
418                                           sizeof(*body), lustre_swab_mdt_body);
419                 if (body == NULL) {
420                         CERROR ("Can't unpack mdt_body\n");
421                         GOTO(err_out, rc = -EPROTO);
422                 }
423         }
424 out:
425         *request = req;
426         RETURN (rc);
427 err_out:
428         ptlrpc_req_finished(req);
429         req = NULL;
430         goto out;
431 }
432
433 int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
434                  struct obd_capa *oc, obd_valid valid, const char *xattr_name,
435                  const char *input, int input_size, int output_size, int flags,
436                  struct ptlrpc_request **request)
437 {
438         return mdc_xattr_common(exp, fid, oc, MDS_SETXATTR, valid, xattr_name,
439                                 input, input_size, output_size, flags, request);
440 }
441
442 int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
443                  struct obd_capa *oc, obd_valid valid, const char *xattr_name,
444                  const char *input, int input_size, int output_size, int flags,
445                  struct ptlrpc_request **request)
446 {
447         return mdc_xattr_common(exp, fid, oc, MDS_GETXATTR, valid, xattr_name,
448                                 input, input_size, output_size, flags, request);
449 }
450
#ifdef CONFIG_FS_POSIX_ACL
/*
 * Convert the ACL xattr held in reply buffer @offset of @req into a
 * posix_acl and store it in md->posix_acl.
 *
 * Nothing is done when md->body->aclsize is zero.  @exp is not used.
 *
 * Returns 0 on success (md->posix_acl stays untouched when the reply has no
 * ACL or the xattr converts to an empty ACL), -EPROTO when the buffer is
 * missing or too short, or the error reported by posix_acl_from_xattr() /
 * posix_acl_valid().  On error no ACL reference is left in @md.
 */
static
int mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req,
                   struct lustre_md *md, unsigned int offset)
{
        struct mdt_body  *body = md->body;
        struct posix_acl *acl;
        void             *buf;
        int               rc;

        if (!body->aclsize)
                return 0;

        buf = lustre_msg_buf(req->rq_repmsg, offset, body->aclsize);
        if (!buf) {
                CERROR("aclsize %u, bufcount %u, bufsize %u\n",
                       body->aclsize, lustre_msg_bufcount(req->rq_repmsg),
                       (lustre_msg_bufcount(req->rq_repmsg) <= offset) ?
                                -1 : lustre_msg_buflen(req->rq_repmsg, offset));
                return -EPROTO;
        }

        acl = posix_acl_from_xattr(buf, body->aclsize);
        if (IS_ERR(acl)) {
                rc = PTR_ERR(acl);
                CERROR("convert xattr to acl: %d\n", rc);
                return rc;
        }

        /* posix_acl_from_xattr() yields NULL for an xattr without entries;
         * there is nothing to validate or store then, and passing NULL on to
         * posix_acl_valid() would dereference it. */
        if (acl == NULL)
                return 0;

        rc = posix_acl_valid(acl);
        if (rc) {
                CERROR("validate acl: %d\n", rc);
                posix_acl_release(acl);
                return rc;
        }

        md->posix_acl = acl;
        return 0;
}
#else
/* Without POSIX ACL support unpacking is a no-op that always succeeds. */
#define mdc_unpack_acl(exp, req, md, offset) 0
#endif
493
494 int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
495                       int offset, struct obd_export *dt_exp, 
496                       struct obd_export *md_exp, 
497                       struct lustre_md *md)
498 {
499         int rc;
500         ENTRY;
501
502         LASSERT(md);
503         memset(md, 0, sizeof(*md));
504
505         md->body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*md->body));
506         LASSERT (md->body != NULL);
507         LASSERT_REPSWABBED(req, offset);
508         offset++;
509
510         if (md->body->valid & OBD_MD_FLEASIZE) {
511                 int lmmsize;
512                 struct lov_mds_md *lmm;
513
514                 LASSERT(S_ISREG(md->body->mode));
515
516                 if (md->body->eadatasize == 0) {
517                         CERROR("OBD_MD_FLEASIZE set, but eadatasize 0\n");
518                         RETURN(-EPROTO);
519                 }
520                 lmmsize = md->body->eadatasize;
521                 lmm = lustre_msg_buf(req->rq_repmsg, offset, lmmsize);
522                 LASSERT (lmm != NULL);
523                 LASSERT_REPSWABBED(req, offset);
524
525                 rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
526                 if (rc < 0)
527                         RETURN(rc);
528
529                 LASSERT (rc >= sizeof (*md->lsm));
530                 offset++;
531         } else if (md->body->valid & OBD_MD_FLDIREA) {
532                 int lmvsize;
533                 struct lov_mds_md *lmv;
534                 LASSERT(S_ISDIR(md->body->mode));
535         
536                 if (md->body->eadatasize == 0) {
537                         CERROR("OBD_MD_FLEASIZE set, but eadatasize 0\n");
538                         RETURN(-EPROTO);
539                 }
540                 if (md->body->valid & OBD_MD_MEA) {
541                         lmvsize = md->body->eadatasize;
542                         lmv = lustre_msg_buf(req->rq_repmsg, offset, lmvsize);
543                         LASSERT (lmv != NULL);
544                         LASSERT_REPSWABBED(req, offset);
545
546                         rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv, 
547                                           lmvsize);
548                         if (rc < 0)
549                                 RETURN(rc);
550
551                         LASSERT (rc >= sizeof (*md->mea));
552                 }
553                 offset++;
554         }
555         rc = 0;
556
557         /* for ACL, it's possible that FLACL is set but aclsize is zero.  only
558          * when aclsize != 0 there's an actual segment for ACL in reply
559          * buffer. */
560         if ((md->body->valid & OBD_MD_FLACL) && md->body->aclsize) {
561                 rc = mdc_unpack_acl(dt_exp, req, md, offset++);
562                 if (rc)
563                         GOTO(out, rc);
564         }
565
566         /* remote permission */
567         if (md->body->valid & OBD_MD_FLRMTPERM) {
568                 md->remote_perm = lustre_msg_buf(req->rq_repmsg, offset++,
569                                                 sizeof(struct mdt_remote_perm));
570                 LASSERT(md->remote_perm);
571         }
572
573         if (md->body->valid & OBD_MD_FLMDSCAPA) {
574                 struct obd_capa *oc = mdc_unpack_capa(req, offset++);
575
576                 if (IS_ERR(oc))
577                         GOTO(out, rc = PTR_ERR(oc));
578                 md->mds_capa = oc;
579         }
580
581         if (md->body->valid & OBD_MD_FLOSSCAPA) {
582                 struct obd_capa *oc = mdc_unpack_capa(req, offset++);
583
584                 if (IS_ERR(oc))
585                         GOTO(out, rc = PTR_ERR(oc));
586                 md->oss_capa = oc;
587         }
588
589         EXIT;
590 out:
591         if (rc) {
592                 if (md->oss_capa)
593                         free_capa(md->oss_capa);
594                 if (md->mds_capa)
595                         free_capa(md->mds_capa);
596 #ifdef CONFIG_FS_POSIX_ACL
597                 posix_acl_release(md->posix_acl);
598 #endif
599                 if (md->lsm)
600                         obd_free_memmd(dt_exp, &md->lsm);
601         }
602         return rc;
603 }
604
605 int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
606 {
607         ENTRY;
608         if (md->lsm)
609                 obd_free_memmd(exp, &md->lsm);
610
611 #ifdef CONFIG_FS_POSIX_ACL
612         if (md->posix_acl) {
613                 posix_acl_release(md->posix_acl);
614                 md->posix_acl = NULL;
615         }
616 #endif
617         RETURN(0);
618 }
619
620 static void mdc_commit_open(struct ptlrpc_request *req)
621 {
622         struct mdc_open_data *mod = req->rq_cb_data;
623         if (mod == NULL)
624                 return;
625
626         if (mod->mod_close_req != NULL)
627                 mod->mod_close_req->rq_cb_data = NULL;
628
629         if (mod->mod_och != NULL)
630                 mod->mod_och->och_mod = NULL;
631
632         OBD_FREE(mod, sizeof(*mod));
633         req->rq_cb_data = NULL;
634 }
635
636 static void mdc_replay_open(struct ptlrpc_request *req)
637 {
638         struct mdc_open_data *mod = req->rq_cb_data;
639         struct obd_client_handle *och;
640         struct ptlrpc_request *close_req;
641         struct lustre_handle old;
642         struct mdt_body *body;
643         ENTRY;
644
645         if (mod == NULL) {
646                 DEBUG_REQ(D_ERROR, req,
647                           "Can't properly replay without open data.");
648                 EXIT;
649                 return;
650         }
651
652         body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
653                                   lustre_swab_mdt_body);
654
655         och = mod->mod_och;
656         if (och != NULL) {
657                 struct lustre_handle *file_fh;
658
659                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
660                 LASSERT(body != NULL);
661
662                 file_fh = &och->och_fh;
663                 CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
664                        file_fh->cookie, body->handle.cookie);
665                 memcpy(&old, file_fh, sizeof(old));
666                 memcpy(file_fh, &body->handle, sizeof(*file_fh));
667         }
668
669         close_req = mod->mod_close_req;
670         if (close_req != NULL) {
671                 struct mdt_epoch *epoch;
672
673                 LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE);
674                 LASSERT(body != NULL);
675
676                 epoch = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF,
677                                        sizeof(*epoch));
678                 LASSERT(epoch);
679                 if (och != NULL)
680                         LASSERT(!memcmp(&old, &epoch->handle, sizeof old));
681                 DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
682                 memcpy(&epoch->handle, &body->handle,
683                        sizeof(epoch->handle));
684         }
685
686         EXIT;
687 }
688
689 int mdc_set_open_replay_data(struct obd_export *exp,
690                              struct obd_client_handle *och,
691                              struct ptlrpc_request *open_req)
692 {
693         struct mdc_open_data *mod;
694         struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
695                                                     DLM_INTENT_REC_OFF,
696                                                     sizeof(*rec));
697         struct mdt_body *body = lustre_msg_buf(open_req->rq_repmsg,
698                                                DLM_REPLY_REC_OFF,
699                                                sizeof(*body));
700         ENTRY;
701
702         /* incoming message in my byte order (it's been swabbed) */
703         LASSERT(rec != NULL);
704         LASSERT_REPSWABBED(open_req, DLM_REPLY_REC_OFF);
705         /* outgoing messages always in my byte order */
706         LASSERT(body != NULL);
707
708         if (och) {
709                 OBD_ALLOC(mod, sizeof(*mod));
710                 if (mod == NULL) {
711                         DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data");
712                         RETURN(0);
713                 }
714
715                 och->och_mod = mod;
716                 mod->mod_och = och;
717                 mod->mod_open_req = open_req;
718                 open_req->rq_cb_data = mod;
719                 open_req->rq_commit_cb = mdc_commit_open;
720         }
721
722         rec->cr_fid2 = body->fid1;
723         rec->cr_ioepoch = body->ioepoch;
724         open_req->rq_replay_cb = mdc_replay_open;
725         if (!fid_is_sane(&body->fid1)) {
726                 DEBUG_REQ(D_ERROR, open_req, "saving replay request with "
727                           "insane fid");
728                 LBUG();
729         }
730
731         DEBUG_REQ(D_HA, open_req, "set up replay data");
732         RETURN(0);
733 }
734
735 int mdc_clear_open_replay_data(struct obd_export *exp,
736                                struct obd_client_handle *och)
737 {
738         struct mdc_open_data *mod = och->och_mod;
739         ENTRY;
740
741         /* Don't free the structure now (it happens in mdc_commit_open, after
742          * we're sure we won't need to fix up the close request in the future),
743          * but make sure that replay doesn't poke at the och, which is about to
744          * be freed. */
745         LASSERT(mod != LP_POISON);
746         if (mod != NULL)
747                 mod->mod_och = NULL;
748         och->och_mod = NULL;
749         RETURN(0);
750 }
751
752 static void mdc_commit_close(struct ptlrpc_request *req)
753 {
754         struct mdc_open_data *mod = req->rq_cb_data;
755         struct ptlrpc_request *open_req;
756         struct obd_import *imp = req->rq_import;
757
758         DEBUG_REQ(D_HA, req, "close req committed");
759         if (mod == NULL)
760                 return;
761
762         mod->mod_close_req = NULL;
763         req->rq_cb_data = NULL;
764         req->rq_commit_cb = NULL;
765
766         open_req = mod->mod_open_req;
767         LASSERT(open_req != NULL);
768         LASSERT(open_req != LP_POISON);
769         LASSERT(open_req->rq_type != LI_POISON);
770
771         DEBUG_REQ(D_HA, open_req, "open req balanced");
772         LASSERT(open_req->rq_transno != 0);
773         LASSERT(open_req->rq_import == imp);
774
775         /* We no longer want to preserve this for transno-unconditional
776          * replay. */
777         spin_lock(&open_req->rq_lock);
778         open_req->rq_replay = 0;
779         spin_unlock(&open_req->rq_lock);
780 }
781
782 int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
783               struct obd_client_handle *och, struct ptlrpc_request **request)
784 {
785         struct obd_device *obd = class_exp2obd(exp);
786         int reqsize[4] = { sizeof(struct ptlrpc_body),
787                            sizeof(struct mdt_epoch),
788                            sizeof(struct mdt_rec_setattr)};
789         int repsize[4] = { sizeof(struct ptlrpc_body),
790                            sizeof(struct mdt_body),
791                            obd->u.cli.cl_max_mds_easize,
792                            obd->u.cli.cl_max_mds_cookiesize };
793         struct ptlrpc_request *req;
794         struct mdc_open_data *mod;
795         int rc;
796         ENTRY;
797
798         if (op_data->mod_capa1)
799                 reqsize[REQ_REC_OFF + 2] = sizeof(struct lustre_capa);
800         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
801                               MDS_CLOSE, 4, reqsize, NULL);
802         if (req == NULL)
803                 GOTO(out, rc = -ENOMEM);
804
805         /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
806          * portal whose threads are not taking any DLM locks and are therefore
807          * always progressing */
808         /* XXX FIXME bug 249 */
809         req->rq_request_portal = MDS_READPAGE_PORTAL;
810
811         /* Ensure that this close's handle is fixed up during replay. */
812         LASSERT(och != NULL);
813         LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
814         mod = och->och_mod;
815         if (likely(mod != NULL)) {
816                 mod->mod_close_req = req;
817                 if (mod->mod_open_req->rq_type == LI_POISON) {
818                         /* FIXME This should be an ASSERT, but until we
819                            figure out why it can be poisoned here, give
820                            a reasonable return. bug 6155 */
821                         CERROR("LBUG POISONED open %p!\n", mod->mod_open_req);
822                         ptlrpc_req_finished(req);
823                         req = NULL;
824                         GOTO(out, rc = -EIO);
825                 }
826                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
827         } else {
828                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
829         }
830
831         mdc_close_pack(req, REQ_REC_OFF, op_data);
832         ptlrpc_req_set_repsize(req, 4, repsize);
833         req->rq_commit_cb = mdc_commit_close;
834         LASSERT(req->rq_cb_data == NULL);
835         req->rq_cb_data = mod;
836
837         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
838         rc = ptlrpc_queue_wait(req);
839         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
840
841         if (req->rq_repmsg == NULL) {
842                 CDEBUG(D_HA, "request failed to send: %p, %d\n", req,
843                        req->rq_status);
844                 if (rc == 0)
845                         rc = req->rq_status ? req->rq_status : -EIO;
846         } else if (rc == 0) {
847                 rc = lustre_msg_get_status(req->rq_repmsg);
848                 if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
849                         DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err "
850                                   "= %d", rc);
851                         if (rc > 0)
852                                 rc = -rc;
853                 } else if (mod == NULL) {
854                         CERROR("Unexpected: can't find mdc_open_data, but the "
855                                "close succeeded.  Please tell CFS.\n");
856                 }
857                 if (!lustre_swab_repbuf(req, REPLY_REC_OFF,
858                                         sizeof(struct mdt_body),
859                                         lustre_swab_mdt_body)) {
860                         CERROR("Error unpacking mdt_body\n");
861                         rc = -EPROTO;
862                 }
863         }
864
865         EXIT;
866         *request = req;
867  out:
868         if (rc != 0 && req && req->rq_commit_cb)
869                 req->rq_commit_cb(req);
870
871         return rc;
872 }
873
874 int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
875                      struct obd_client_handle *och)
876 {
877         struct ptlrpc_request *req;
878         int size[4] = { sizeof(struct ptlrpc_body),
879                         sizeof(struct mdt_epoch),
880                         sizeof(struct mdt_rec_setattr)};
881         int repsize[2] = { sizeof(struct ptlrpc_body),
882                            sizeof(struct mdt_body)};
883         int rc;
884         ENTRY;
885
886         if (op_data->mod_capa1)
887                 size[REQ_REC_OFF + 2] = sizeof(struct lustre_capa);
888         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
889                               MDS_DONE_WRITING, 4, size, NULL);
890         if (req == NULL)
891                 RETURN(-ENOMEM);
892
893         /* XXX: add DONE_WRITING request to och -- when Size-on-MDS 
894          * recovery will be ready. */
895         mdc_close_pack(req, REQ_REC_OFF, op_data);
896         
897         ptlrpc_req_set_repsize(req, 2, repsize);
898         rc = ptlrpc_queue_wait(req);
899         ptlrpc_req_finished(req);
900         RETURN(rc);
901 }
902
903 #ifdef HAVE_SPLIT_SUPPORT
904 int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
905                  const struct page *page, int offset)
906 {
907         struct obd_import *imp = class_exp2cliimp(exp);
908         struct ptlrpc_request *req = NULL;
909         struct ptlrpc_bulk_desc *desc = NULL;
910         struct mdt_body *body;
911         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
912         ENTRY;
913
914         CDEBUG(D_INODE, "object: "DFID"\n", PFID(fid));
915
916         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_WRITEPAGE, 3, size,
917                               NULL);
918         if (req == NULL)
919                 GOTO(out, rc = -ENOMEM);
920
921         req->rq_request_portal = MDS_READPAGE_PORTAL;
922
923         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
924         if (desc == NULL)
925                 GOTO(out, rc = -ENOMEM);
926         /* NB req now owns desc and will free it when it gets freed */
927         ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, offset);
928
929         mdc_readdir_pack(req, REQ_REC_OFF, 0, offset, fid, NULL);
930
931         ptlrpc_req_set_repsize(req, 2, size);
932         rc = ptlrpc_queue_wait(req);
933 out:
934         ptlrpc_req_finished(req);
935         RETURN(rc);
936 }
937 EXPORT_SYMBOL(mdc_sendpage);
938 #endif
939
940 int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
941                  struct obd_capa *oc, __u64 offset, struct page *page,
942                  struct ptlrpc_request **request)
943 {
944         struct obd_import *imp = class_exp2cliimp(exp);
945         struct ptlrpc_request *req = NULL;
946         struct ptlrpc_bulk_desc *desc = NULL;
947         struct mdt_body *body;
948         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
949         ENTRY;
950
951         CDEBUG(D_INODE, "object: "DFID"\n", PFID(fid));
952
953         if (oc)
954                 size[REQ_REC_OFF + 1] = sizeof(struct lustre_capa);
955         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE, 3, size,
956                               NULL);
957         if (req == NULL)
958                 GOTO(out, rc = -ENOMEM);
959
960         /* XXX FIXME bug 249 */
961         req->rq_request_portal = MDS_READPAGE_PORTAL;
962
963         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
964         if (desc == NULL)
965                 GOTO(out, rc = -ENOMEM);
966         /* NB req now owns desc and will free it when it gets freed */
967
968         ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
969
970         mdc_readdir_pack(req, REQ_REC_OFF, offset, PAGE_CACHE_SIZE, fid, oc);
971
972         ptlrpc_req_set_repsize(req, 2, size);
973         rc = ptlrpc_queue_wait(req);
974
975         if (rc == 0 || rc == -ERANGE) {
976                 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
977                                           lustre_swab_mdt_body);
978                 if (body == NULL) {
979                         CERROR("Can't unpack mdt_body\n");
980                         GOTO(out, rc = -EPROTO);
981                 }
982
983                 if (req->rq_bulk->bd_nob_transferred != PAGE_CACHE_SIZE) {
984                         CERROR ("Unexpected # bytes transferred: %d"
985                                 " (%ld expected)\n",
986                                 req->rq_bulk->bd_nob_transferred,
987                                 PAGE_CACHE_SIZE);
988                         GOTO (out, rc = -EPROTO);
989                 }
990         }
991
992         EXIT;
993  out:
994         *request = req;
995         return rc;
996 }
997
998 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
999                          void *karg, void *uarg)
1000 {
1001         struct obd_device *obd = exp->exp_obd;
1002         struct obd_ioctl_data *data = karg;
1003         struct obd_import *imp = obd->u.cli.cl_import;
1004         struct llog_ctxt *ctxt;
1005         int rc;
1006         ENTRY;
1007
1008 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1009         MOD_INC_USE_COUNT;
1010 #else
1011         if (!try_module_get(THIS_MODULE)) {
1012                 CERROR("Can't get module. Is it alive?");
1013                 return -EINVAL;
1014         }
1015 #endif
1016         switch (cmd) {
1017         case OBD_IOC_CLIENT_RECOVER:
1018                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
1019                 if (rc < 0)
1020                         GOTO(out, rc);
1021                 GOTO(out, rc = 0);
1022         case IOC_OSC_SET_ACTIVE:
1023                 rc = ptlrpc_set_import_active(imp, data->ioc_offset);
1024                 GOTO(out, rc);
1025         case OBD_IOC_PARSE: {
1026                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
1027                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
1028                 GOTO(out, rc);
1029         }
1030 #ifdef __KERNEL__
1031         case OBD_IOC_LLOG_INFO:
1032         case OBD_IOC_LLOG_PRINT: {
1033                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
1034                 rc = llog_ioctl(ctxt, cmd, data);
1035
1036                 GOTO(out, rc);
1037         }
1038 #endif
1039         case OBD_IOC_POLL_QUOTACHECK:
1040                 rc = lquota_poll_check(quota_interface, exp,
1041                                        (struct if_quotacheck *)karg);
1042                 GOTO(out, rc);
1043         default:
1044                 CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
1045                 GOTO(out, rc = -ENOTTY);
1046         }
1047 out:
1048 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1049         MOD_DEC_USE_COUNT;
1050 #else
1051         module_put(THIS_MODULE);
1052 #endif
1053
1054         return rc;
1055 }
1056
1057 int mdc_set_info_async(struct obd_export *exp, obd_count keylen,
1058                        void *key, obd_count vallen, void *val,
1059                        struct ptlrpc_request_set *set)
1060 {
1061         struct obd_import *imp = class_exp2cliimp(exp);
1062         int rc = -EINVAL;
1063
1064         if (KEY_IS(KEY_INIT_RECOV)) {
1065                 if (vallen != sizeof(int))
1066                         RETURN(-EINVAL);
1067                 imp->imp_initial_recov = *(int *)val;
1068                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
1069                        exp->exp_obd->obd_name, imp->imp_initial_recov);
1070                 RETURN(0);
1071         }
1072         /* Turn off initial_recov after we try all backup servers once */
1073         if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
1074                 if (vallen != sizeof(int))
1075                         RETURN(-EINVAL);
1076                 imp->imp_initial_recov_bk = *(int *)val;
1077                 if (imp->imp_initial_recov_bk)
1078                         imp->imp_initial_recov = 1;
1079                 CDEBUG(D_HA, "%s: set imp_initial_recov_bk = %d\n",
1080                        exp->exp_obd->obd_name, imp->imp_initial_recov_bk);
1081                 RETURN(0);
1082         }
1083         if (KEY_IS("read-only")) {
1084                 struct ptlrpc_request *req;
1085                 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
1086                 char *bufs[3] = { NULL, key, val };
1087
1088                 if (vallen != sizeof(int))
1089                         RETURN(-EINVAL);
1090
1091                 if (*((int *)val)) {
1092                         imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
1093                         imp->imp_connect_data.ocd_connect_flags |=
1094                                 OBD_CONNECT_RDONLY;
1095                 } else {
1096                         imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
1097                         imp->imp_connect_data.ocd_connect_flags &=
1098                                 ~OBD_CONNECT_RDONLY;
1099                 }
1100
1101                 req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_SET_INFO,
1102                                       3, size, bufs);
1103                 if (req == NULL)
1104                         RETURN(-ENOMEM);
1105
1106                 ptlrpc_req_set_repsize(req, 1, NULL);
1107                 if (set) {
1108                         rc = 0;
1109                         ptlrpc_set_add_req(set, req);
1110                         ptlrpc_check_set(set);
1111                 } else {
1112                         rc = ptlrpc_queue_wait(req);
1113                         ptlrpc_req_finished(req);
1114                 }
1115
1116                 RETURN(rc);
1117         }
1118
1119         if (KEY_IS(KEY_FLUSH_CTX)) {
1120                 sptlrpc_import_flush_my_ctx(imp);
1121                 RETURN(0);
1122         }
1123
1124         RETURN(rc);
1125 }
1126
1127 int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key,
1128                  __u32 *vallen, void *val)
1129 {
1130         int rc = -EINVAL;
1131
1132         if (KEY_IS(KEY_MAX_EASIZE)) {
1133                 int mdsize, *max_easize;
1134
1135                 if (*vallen != sizeof(int))
1136                         RETURN(-EINVAL);
1137                 mdsize = *(int*)val;
1138                 if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
1139                         exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
1140                 max_easize = val;
1141                 *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
1142                 RETURN(0);
1143         }
1144         if (KEY_IS(KEY_CONN_DATA)) {
1145                 struct obd_import *imp = class_exp2cliimp(exp);
1146                 struct obd_connect_data *data = val;
1147
1148                 if (*vallen != sizeof(*data))
1149                         RETURN(-EINVAL);
1150
1151                 *data = imp->imp_connect_data;
1152                 RETURN(0);
1153         }
1154                 
1155         RETURN(rc);
1156 }
1157
1158 static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1159                       __u64 max_age)
1160 {
1161         struct ptlrpc_request *req;
1162         struct obd_statfs *msfs;
1163         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*msfs) };
1164         ENTRY;
1165
1166         /* We could possibly pass max_age in the request (as an absolute
1167          * timestamp or a "seconds.usec ago") so the target can avoid doing
1168          * extra calls into the filesystem if that isn't necessary (e.g.
1169          * during mount that would help a bit).  Having relative timestamps
1170          * is not so great if request processing is slow, while absolute
1171          * timestamps are not ideal because they need time synchronization. */
1172         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_MDS_VERSION,
1173                               MDS_STATFS, 1, NULL, NULL);
1174         if (!req)
1175                 RETURN(-ENOMEM);
1176
1177         ptlrpc_req_set_repsize(req, 2, size);
1178
1179         rc = ptlrpc_queue_wait(req);
1180
1181         if (rc)
1182                 GOTO(out, rc);
1183
1184         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
1185                                   lustre_swab_obd_statfs);
1186         if (msfs == NULL) {
1187                 CERROR("Can't unpack obd_statfs\n");
1188                 GOTO(out, rc = -EPROTO);
1189         }
1190
1191         memcpy(osfs, msfs, sizeof(*msfs));
1192         EXIT;
1193 out:
1194         ptlrpc_req_finished(req);
1195
1196         return rc;
1197 }
1198
1199 static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid,
1200                    struct obd_capa *oc,
1201                    struct obd_client_handle *handle, int flag)
1202 {
1203         struct ptlrpc_request *req;
1204         struct mdt_body *body;
1205         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1206         ENTRY;
1207
1208         if (oc)
1209                 size[REQ_REC_OFF + 1] = sizeof(struct lustre_capa);
1210         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1211                               MDS_PIN, 3, size, NULL);
1212         if (req == NULL)
1213                 RETURN(-ENOMEM);
1214
1215         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof (*body));
1216         body->fid1 = *fid;
1217         body->flags = flag;
1218         mdc_pack_capa(req, REQ_REC_OFF + 1, oc);
1219
1220         ptlrpc_req_set_repsize(req, 2, size);
1221
1222         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1223         rc = ptlrpc_queue_wait(req);
1224         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1225         if (rc) {
1226                 CERROR("pin failed: %d\n", rc);
1227                 ptlrpc_req_finished(req);
1228                 RETURN(rc);
1229         }
1230
1231         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1232                                   lustre_swab_mdt_body);
1233         if (body == NULL) {
1234                 ptlrpc_req_finished(req);
1235                 RETURN(rc);
1236         }
1237
1238         memcpy(&handle->och_fh, &body->handle, sizeof(body->handle));
1239         handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
1240
1241         OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
1242         if (handle->och_mod == NULL) {
1243                 DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
1244                 RETURN(-ENOMEM);
1245         }
1246         handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
1247
1248         RETURN(rc);
1249 }
1250
1251 static int mdc_unpin(struct obd_export *exp,
1252                      struct obd_client_handle *handle, int flag)
1253 {
1254         struct ptlrpc_request *req;
1255         struct mdt_body *body;
1256         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1257         ENTRY;
1258
1259         if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
1260                 RETURN(0);
1261
1262         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1263                               MDS_CLOSE, 2, size, NULL);
1264         if (req == NULL)
1265                 RETURN(-ENOMEM);
1266
1267         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1268         memcpy(&body->handle, &handle->och_fh, sizeof(body->handle));
1269         body->flags = flag;
1270
1271         ptlrpc_req_set_repsize(req, 1, NULL);
1272         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1273         rc = ptlrpc_queue_wait(req);
1274         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1275
1276         if (rc != 0)
1277                 CERROR("unpin failed: %d\n", rc);
1278
1279         ptlrpc_req_finished(req);
1280         ptlrpc_req_finished(handle->och_mod->mod_open_req);
1281         OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
1282         RETURN(rc);
1283 }
1284
1285 int mdc_sync(struct obd_export *exp, const struct lu_fid *fid,
1286              struct obd_capa *oc,
1287              struct ptlrpc_request **request)
1288 {
1289         struct ptlrpc_request *req;
1290         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
1291         int rc;
1292         ENTRY;
1293
1294         if (oc)
1295                 size[REQ_REC_OFF + 1] = sizeof(struct lustre_capa);
1296         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1297                               MDS_SYNC, 3, size, NULL);
1298         if (!req)
1299                 RETURN(rc = -ENOMEM);
1300
1301         mdc_pack_req_body(req, REQ_REC_OFF, 0, fid, oc, 0, 0);
1302
1303         ptlrpc_req_set_repsize(req, 2, size);
1304
1305         rc = ptlrpc_queue_wait(req);
1306         if (rc || request == NULL)
1307                 ptlrpc_req_finished(req);
1308         else
1309                 *request = req;
1310
1311         RETURN(rc);
1312 }
1313
1314 static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
1315                             enum obd_import_event event)
1316 {
1317         int rc = 0;
1318
1319         LASSERT(imp->imp_obd == obd);
1320
1321         switch (event) {
1322         case IMP_EVENT_DISCON: {
1323                 break;
1324         }
1325         case IMP_EVENT_INACTIVE: {
1326                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
1327                 break;
1328         }
1329         case IMP_EVENT_INVALIDATE: {
1330                 struct ldlm_namespace *ns = obd->obd_namespace;
1331
1332                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
1333
1334                 break;
1335         }
1336         case IMP_EVENT_ACTIVE: {
1337                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
1338                 break;
1339         }
1340         case IMP_EVENT_OCD:
1341                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
1342                 break;
1343
1344         default:
1345                 CERROR("Unknown import event %x\n", event);
1346                 LBUG();
1347         }
1348         RETURN(rc);
1349 }
1350
1351 static int mdc_fid_init(struct obd_export *exp)
1352 {
1353         struct client_obd *cli = &exp->exp_obd->u.cli;
1354         char *prefix;
1355         int rc;
1356         ENTRY;
1357
1358         OBD_ALLOC_PTR(cli->cl_seq);
1359         if (cli->cl_seq == NULL)
1360                 RETURN(-ENOMEM);
1361
1362         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
1363         if (prefix == NULL)
1364                 GOTO(out_free_seq, rc = -ENOMEM);
1365
1366         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s",
1367                  exp->exp_obd->obd_name);
1368
1369         /* init client side sequence-manager */
1370         rc = seq_client_init(cli->cl_seq, exp, 
1371                              LUSTRE_SEQ_METADATA,
1372                              prefix, NULL);
1373         OBD_FREE(prefix, MAX_OBD_NAME + 5);
1374         if (rc)
1375                 GOTO(out_free_seq, rc);
1376
1377         /* pre-allocate meta-sequence */
1378         rc = seq_client_alloc_meta(cli->cl_seq, NULL);
1379         if (rc) {
1380                 CERROR("can't allocate new mata-sequence, "
1381                        "rc %d\n", rc);
1382                 GOTO(out_free_seq, rc);
1383         }
1384         RETURN(rc);
1385
1386 out_free_seq:
1387         OBD_FREE_PTR(cli->cl_seq);
1388         cli->cl_seq = NULL;
1389         return rc;
1390 }
1391
1392 static int mdc_fid_fini(struct obd_export *exp)
1393 {
1394         struct client_obd *cli = &exp->exp_obd->u.cli;
1395         ENTRY;
1396
1397         if (cli->cl_seq != NULL) {
1398                 seq_client_fini(cli->cl_seq);
1399                 OBD_FREE_PTR(cli->cl_seq);
1400                 cli->cl_seq = NULL;
1401         }
1402         
1403         RETURN(0);
1404 }
1405
1406 static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1407                          struct lu_placement_hint *hint)
1408 {
1409         struct client_obd *cli = &exp->exp_obd->u.cli;
1410         struct lu_client_seq *seq = cli->cl_seq;
1411
1412         ENTRY;
1413         RETURN(seq_client_alloc_fid(seq, fid));
1414 }
1415
1416 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
1417 {
1418         struct client_obd *cli = &obd->u.cli;
1419         struct lprocfs_static_vars lvars;
1420         int rc;
1421         ENTRY;
1422
1423         OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1424         if (!cli->cl_rpc_lock)
1425                 RETURN(-ENOMEM);
1426         mdc_init_rpc_lock(cli->cl_rpc_lock);
1427
1428         ptlrpcd_addref();
1429
1430         OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1431         if (!cli->cl_setattr_lock)
1432                 GOTO(err_rpc_lock, rc = -ENOMEM);
1433         mdc_init_rpc_lock(cli->cl_setattr_lock);
1434
1435         OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1436         if (!cli->cl_close_lock)
1437                 GOTO(err_setattr_lock, rc = -ENOMEM);
1438         mdc_init_rpc_lock(cli->cl_close_lock);
1439
1440         rc = client_obd_setup(obd, cfg);
1441         if (rc)
1442                 GOTO(err_close_lock, rc);
1443         lprocfs_init_vars(mdc, &lvars);
1444         lprocfs_obd_setup(obd, lvars.obd_vars);
1445
1446         rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
1447         if (rc) {
1448                 mdc_cleanup(obd);
1449                 CERROR("failed to setup llogging subsystems\n");
1450         }
1451
1452         RETURN(rc);
1453
1454 err_close_lock:
1455         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1456 err_setattr_lock:
1457         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1458 err_rpc_lock:
1459         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1460         ptlrpcd_decref();
1461         RETURN(rc);
1462 }
1463
1464 /* Initialize the default and maximum LOV EA and cookie sizes.  This allows
1465  * us to make MDS RPCs with large enough reply buffers to hold the
1466  * maximum-sized (= maximum striped) EA and cookie without having to
1467  * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */
1468 int mdc_init_ea_size(struct obd_export *exp, int easize,
1469                      int def_easize, int cookiesize)
1470 {
1471         struct obd_device *obd = exp->exp_obd;
1472         struct client_obd *cli = &obd->u.cli;
1473         ENTRY;
1474
1475         if (cli->cl_max_mds_easize < easize)
1476                 cli->cl_max_mds_easize = easize;
1477
1478         if (cli->cl_default_mds_easize < def_easize)
1479                 cli->cl_default_mds_easize = def_easize;
1480
1481         if (cli->cl_max_mds_cookiesize < cookiesize)
1482                 cli->cl_max_mds_cookiesize = cookiesize;
1483
1484         RETURN(0);
1485 }
1486
1487 static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
1488 {
1489         int rc = 0;
1490         ENTRY;
1491
1492         switch (stage) {
1493         case OBD_CLEANUP_EARLY:
1494         case OBD_CLEANUP_EXPORTS:
1495                 break;
1496         case OBD_CLEANUP_SELF_EXP:
1497                 rc = obd_llog_finish(obd, 0);
1498                 if (rc != 0)
1499                         CERROR("failed to cleanup llogging subsystems\n");
1500         case OBD_CLEANUP_OBD:
1501                 break;
1502         }
1503         RETURN(rc);
1504 }
1505
1506 static int mdc_cleanup(struct obd_device *obd)
1507 {
1508         struct client_obd *cli = &obd->u.cli;
1509
1510         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1511         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1512         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1513
1514         lprocfs_obd_cleanup(obd);
1515         ptlrpcd_decref();
1516
1517         return client_obd_cleanup(obd);
1518 }
1519
1520
1521 static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
1522                          struct obd_device *tgt,
1523                          int count, struct llog_catid *logid, 
1524                          struct obd_uuid *uuid)
1525 {
1526         struct llog_ctxt *ctxt;
1527         int rc;
1528         ENTRY;
1529
1530         rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1531                         &llog_client_ops);
1532         if (rc == 0) {
1533                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
1534                 ctxt->loc_imp = obd->u.cli.cl_import;
1535         }
1536
1537         rc = llog_setup(obd, llogs, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
1538                        &llog_client_ops);
1539         if (rc == 0) {
1540                 ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
1541                 ctxt->loc_imp = obd->u.cli.cl_import;
1542         }
1543
1544         RETURN(rc);
1545 }
1546
1547 static int mdc_llog_finish(struct obd_device *obd, int count)
1548 {
1549         int rc;
1550         ENTRY;
1551
1552         rc = llog_cleanup(llog_get_context(obd, LLOG_LOVEA_REPL_CTXT));
1553         if (rc) {
1554                 CERROR("can not cleanup LLOG_CONFIG_REPL_CTXT rc %d\n", rc);
1555         }
1556         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
1557         RETURN(rc);
1558 }
1559
1560 static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
1561 {
1562         struct lustre_cfg *lcfg = buf;
1563         struct lprocfs_static_vars lvars;
1564         int rc = 0;
1565
1566         lprocfs_init_vars(mdc, &lvars);
1567         
1568         rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars, lcfg, obd);
1569         return(rc);
1570 }
1571
1572 /* get remote permission for current user on fid */
1573 int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
1574                         struct obd_capa *oc, struct ptlrpc_request **request)
1575 {
1576         struct ptlrpc_request *req;
1577         struct mdt_body *body;
1578         struct mdt_remote_perm *perm;
1579         int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1580         int rc;
1581         ENTRY;
1582
1583         if (oc)
1584                 size[REQ_REC_OFF + 1] = sizeof(struct lustre_capa);
1585
1586         *request = NULL;
1587         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1588                               MDS_GETATTR, 3, size, NULL);
1589         if (!req)
1590                 RETURN(-ENOMEM);
1591
1592         mdc_pack_req_body(req, REQ_REC_OFF, OBD_MD_FLRMTPERM, fid, oc, 0, 0);
1593
1594         size[REPLY_REC_OFF + 1] = sizeof(*perm);
1595         ptlrpc_req_set_repsize(req, 5, size);
1596         rc = ptlrpc_queue_wait(req);
1597         if (rc) {
1598                 ptlrpc_req_finished(req);
1599                 RETURN(rc);
1600         }
1601
1602         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1603                                   lustre_swab_mdt_body);
1604         LASSERT(body);
1605         LASSERT(body->valid & OBD_MD_FLRMTPERM);
1606
1607         perm = lustre_swab_repbuf(req, REPLY_REC_OFF + 1, sizeof(*perm),
1608                                   lustre_swab_mdt_remote_perm);
1609         LASSERT(perm);
1610
1611         *request = req;
1612         RETURN(0);
1613 }
1614
1615 static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
1616                           renew_capa_cb_t cb)
1617 {
1618         struct ptlrpc_request *req;
1619         int size[2] = { sizeof(struct ptlrpc_body),
1620                         sizeof(struct lustre_capa) };
1621         int repsize[3] = { sizeof(struct ptlrpc_body),
1622                            sizeof(struct mdt_body),
1623                            sizeof(struct lustre_capa) };
1624         ENTRY;
1625
1626         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1627                               MDS_RENEW_CAPA, 2, size, NULL);
1628         if (!req)
1629                 RETURN(-ENOMEM);
1630
1631         mdc_pack_capa(req, REQ_REC_OFF, oc);
1632
1633         ptlrpc_req_set_repsize(req, 3, repsize);
1634         req->rq_interpret_reply = cb;
1635         ptlrpcd_add_req(req);
1636
1637         RETURN(0);
1638 }
1639
1640 struct obd_ops mdc_obd_ops = {
1641         .o_owner            = THIS_MODULE,
1642         .o_setup            = mdc_setup,
1643         .o_precleanup       = mdc_precleanup,
1644         .o_cleanup          = mdc_cleanup,
1645         .o_add_conn         = client_import_add_conn,
1646         .o_del_conn         = client_import_del_conn,
1647         .o_connect          = client_connect_import,
1648         .o_disconnect       = client_disconnect_export,
1649         .o_iocontrol        = mdc_iocontrol,
1650         .o_set_info_async   = mdc_set_info_async,
1651         .o_statfs           = mdc_statfs,
1652         .o_pin              = mdc_pin,
1653         .o_unpin            = mdc_unpin,
1654         .o_fid_init         = mdc_fid_init,
1655         .o_fid_fini         = mdc_fid_fini,
1656         .o_fid_alloc        = mdc_fid_alloc,
1657         .o_import_event     = mdc_import_event,
1658         .o_llog_init        = mdc_llog_init,
1659         .o_llog_finish      = mdc_llog_finish,
1660         .o_get_info         = mdc_get_info,
1661         .o_process_config  = mdc_process_config,
1662 };
1663
1664 struct md_ops mdc_md_ops = {
1665         .m_getstatus        = mdc_getstatus,
1666         .m_change_cbdata    = mdc_change_cbdata,
1667         .m_close            = mdc_close,
1668         .m_create           = mdc_create,
1669         .m_done_writing     = mdc_done_writing,
1670         .m_enqueue          = mdc_enqueue,
1671         .m_getattr          = mdc_getattr,
1672         .m_getattr_name     = mdc_getattr_name,
1673         .m_intent_lock      = mdc_intent_lock,
1674         .m_link             = mdc_link,
1675         .m_is_subdir        = mdc_is_subdir,
1676         .m_rename           = mdc_rename,
1677         .m_setattr          = mdc_setattr,
1678         .m_setxattr         = mdc_setxattr,
1679         .m_getxattr         = mdc_getxattr,
1680         .m_sync             = mdc_sync,
1681         .m_readpage         = mdc_readpage,
1682         .m_unlink           = mdc_unlink,
1683         .m_cancel_unused    = mdc_cancel_unused,
1684         .m_init_ea_size     = mdc_init_ea_size,
1685         .m_set_lock_data    = mdc_set_lock_data,
1686         .m_lock_match       = mdc_lock_match,
1687         .m_get_lustre_md    = mdc_get_lustre_md,
1688         .m_free_lustre_md   = mdc_free_lustre_md,
1689         .m_set_open_replay_data = mdc_set_open_replay_data,
1690         .m_clear_open_replay_data = mdc_clear_open_replay_data,
1691         .m_get_remote_perm  = mdc_get_remote_perm,
1692         .m_renew_capa       = mdc_renew_capa
1693 };
1694
1695 extern quota_interface_t mdc_quota_interface;
1696
1697 int __init mdc_init(void)
1698 {
1699         int rc;
1700         struct lprocfs_static_vars lvars;
1701         lprocfs_init_vars(mdc, &lvars);
1702         
1703         request_module("lquota");
1704         quota_interface = PORTAL_SYMBOL_GET(mdc_quota_interface);
1705         init_obd_quota_ops(quota_interface, &mdc_obd_ops);
1706
1707         rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars,
1708                                  LUSTRE_MDC_NAME, NULL);
1709         if (rc && quota_interface)
1710                 PORTAL_SYMBOL_PUT(mdc_quota_interface);
1711
1712         RETURN(rc);
1713 }
1714
#ifdef __KERNEL__
/*
 * Module exit: release the mdc_quota_interface symbol reference when
 * quota_interface is set, then unregister the LUSTRE_MDC_NAME obd type.
 */
static void /*__exit*/ mdc_exit(void)
{
        if (quota_interface != NULL) {
                PORTAL_SYMBOL_PUT(mdc_quota_interface);
        }

        class_unregister_type(LUSTRE_MDC_NAME);
}

/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Metadata Client");
MODULE_LICENSE("GPL");

/* Load / unload entry points. */
module_init(mdc_init);
module_exit(mdc_exit);
#endif /* __KERNEL__ */