Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <linux/lustre_acl.h>
40 #include <obd_class.h>
41 #include <lustre_dlm.h>
42 #include <lustre_fid.h>
43 #include <md_object.h>
44 #include <lprocfs_status.h>
45 #include <lustre_param.h>
46 #include "mdc_internal.h"
47
48 static quota_interface_t *quota_interface;
49
50 #define REQUEST_MINOR 244
51
52 static int mdc_cleanup(struct obd_device *obd);
53
54 extern int mds_queue_req(struct ptlrpc_request *);
55 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
56 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
57 static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
58                           int level, int msg_flags)
59 {
60         struct ptlrpc_request *req;
61         struct mdt_body *body;
62         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
63         ENTRY;
64
65         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS, 2, size,
66                               NULL);
67         if (!req)
68                 GOTO(out, rc = -ENOMEM);
69
70         req->rq_send_state = level;
71         ptlrpc_req_set_repsize(req, 2, size);
72
73         mdc_pack_req_body(req, REQ_REC_OFF, 0, NULL, 0, 0);
74         lustre_msg_add_flags(req->rq_reqmsg, msg_flags);
75         rc = ptlrpc_queue_wait(req);
76
77         if (!rc) {
78                 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
79                                           lustre_swab_mdt_body);
80                 if (body == NULL) {
81                         CERROR ("Can't extract mdt_body\n");
82                         GOTO (out, rc = -EPROTO);
83                 }
84
85                 *rootfid = body->fid1;
86
87                 CDEBUG(D_NET, "root fid="DFID", last_committed="LPU64
88                        ", last_xid="LPU64"\n",
89                        PFID(rootfid),
90                        lustre_msg_get_last_committed(req->rq_repmsg),
91                        lustre_msg_get_last_xid(req->rq_repmsg));
92         }
93
94         EXIT;
95  out:
96         ptlrpc_req_finished(req);
97         return rc;
98 }
99
100 /* This should be mdc_get_info("rootfid") */
101 int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid)
102 {
103         return send_getstatus(class_exp2cliimp(exp), rootfid, LUSTRE_IMP_FULL,
104                               0);
105 }
106
107 static
108 int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
109                        unsigned int acl_size, struct ptlrpc_request *req)
110 {
111         struct mdt_body *body;
112         void *eadata;
113         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
114         int bufcount = 2, rc;
115         ENTRY;
116         
117         /* request message already built */
118         if (ea_size != 0) {
119                 size[bufcount++] = ea_size;
120                 CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
121                        ea_size);
122         }
123         if (acl_size) {
124                 size[bufcount++] = acl_size;
125                 CDEBUG(D_INODE, "reserved %u bytes for ACL\n", acl_size);
126         }
127
128         ptlrpc_req_set_repsize(req, bufcount, size);
129
130         rc = ptlrpc_queue_wait(req);
131         if (rc != 0)
132                 RETURN (rc);
133
134         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
135                                   lustre_swab_mdt_body);
136         if (body == NULL) {
137                 CERROR ("Can't unpack mdt_body\n");
138                 RETURN (-EPROTO);
139         }
140
141         CDEBUG(D_NET, "mode: %o\n", body->mode);
142
143         LASSERT_REPSWAB(req, REPLY_REC_OFF + 1);
144         if (body->eadatasize != 0) {
145                 /* reply indicates presence of eadata; check it's there... */
146                 eadata = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
147                                         body->eadatasize);
148                 if (eadata == NULL) {
149                         CERROR ("Missing/short eadata\n");
150                         RETURN (-EPROTO);
151                 }
152         }
153
154         if (body->valid & OBD_MD_FLMODEASIZE) {
155                 if (exp->exp_obd->u.cli.cl_max_mds_easize < body->max_mdsize)
156                         exp->exp_obd->u.cli.cl_max_mds_easize =
157                                                 body->max_mdsize;
158                 if (exp->exp_obd->u.cli.cl_max_mds_cookiesize <
159                                                 body->max_cookiesize)
160                         exp->exp_obd->u.cli.cl_max_mds_cookiesize =
161                                                 body->max_cookiesize;
162         }
163
164         RETURN (0);
165 }
166
167 int mdc_getattr(struct obd_export *exp, struct lu_fid *fid,
168                 obd_valid valid, int ea_size,
169                 struct ptlrpc_request **request)
170 {
171         struct ptlrpc_request *req;
172         int size[2] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
173         int acl_size = 0, rc;
174         ENTRY;
175
176         /* XXX do we need to make another request here?  We just did a getattr
177          *     to do the lookup in the first place.
178          */
179         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
180                               MDS_GETATTR, 2, size, NULL);
181         if (!req)
182                 GOTO(out, rc = -ENOMEM);
183
184         mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, ea_size,
185                           MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/);
186
187         /* currently only root inode will call us with FLACL */
188
189         /* FIXME:XXX:reserve enough space regardless the flag temporarily.
190          * server will do lustre_shrink_reply();
191          * 
192          *if (valid & OBD_MD_FLACL)
193          */
194         acl_size = LUSTRE_POSIX_ACL_MAX_SIZE;
195
196          
197         rc = mdc_getattr_common(exp, ea_size, acl_size, req);
198         if (rc != 0) {
199                 ptlrpc_req_finished (req);
200                 req = NULL;
201         }
202  out:
203         *request = req;
204         RETURN (rc);
205 }
206
207 int mdc_getattr_name(struct obd_export *exp, struct lu_fid *fid,
208                      const char *filename, int namelen, obd_valid valid,
209                      int ea_size, struct ptlrpc_request **request)
210 {
211         struct ptlrpc_request *req;
212         struct mdt_body *body;
213         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), namelen};
214         ENTRY;
215
216         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
217                               MDS_GETATTR_NAME, 3, size, NULL);
218         if (!req)
219                 GOTO(out, rc = -ENOMEM);
220
221         mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, ea_size,
222                           MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/);
223  
224         LASSERT(strnlen(filename, namelen) == namelen - 1);
225         memcpy(lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, namelen),
226                filename, namelen);
227
228         rc = mdc_getattr_common(exp, ea_size, 0, req);
229         if (rc != 0) {
230                 ptlrpc_req_finished (req);
231                 req = NULL;
232         }
233  out:
234         *request = req;
235         RETURN(rc);
236 }
237
238 static
239 int mdc_xattr_common(struct obd_export *exp, struct lu_fid *fid,
240                      int opcode, obd_valid valid, const char *xattr_name,
241                      const char *input, int input_size, int output_size,
242                      int flags, struct ptlrpc_request **request)
243 {
244         struct ptlrpc_request *req;
245         int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
246         // int size[3] = {sizeof(struct mdt_body)}, bufcnt = 1;
247         int rc, xattr_namelen = 0, bufcnt = 2, offset;
248         void *tmp;
249         ENTRY;
250
251         if (xattr_name) {
252                 xattr_namelen = strlen(xattr_name) + 1;
253                 size[bufcnt++] = xattr_namelen;
254         }
255         if (input_size) {
256                 LASSERT(input);
257                 size[bufcnt++] = input_size;
258         }
259
260         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
261                               opcode, bufcnt, size, NULL);
262         if (!req)
263                 GOTO(out, rc = -ENOMEM);
264
265         /* request data */
266         mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, output_size, flags);
267
268         offset = REQ_REC_OFF + 1;
269
270         if (xattr_name) {
271                 tmp = lustre_msg_buf(req->rq_reqmsg, offset++, xattr_namelen);
272                 memcpy(tmp, xattr_name, xattr_namelen);
273         }
274         if (input_size) {
275                 tmp = lustre_msg_buf(req->rq_reqmsg, offset++, input_size);
276                 memcpy(tmp, input, input_size);
277         }
278
279         /* reply buffers */
280         if (opcode == MDS_GETXATTR) {
281                 size[REPLY_REC_OFF] = sizeof(struct mdt_body);
282                 bufcnt = 2;
283         } else {
284                 bufcnt = 1;
285         }
286
287         /* we do this even output_size is 0, because server is doing that */
288         size[bufcnt++] = output_size;
289         ptlrpc_req_set_repsize(req, bufcnt, size);
290
291         /* make rpc */
292         if (opcode == MDS_SETXATTR)
293                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
294
295         rc = ptlrpc_queue_wait(req);
296
297         if (opcode == MDS_SETXATTR)
298                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
299
300         if (rc != 0)
301                 GOTO(err_out, rc);
302
303         if (opcode == MDS_GETXATTR) {
304                 struct mdt_body * body = lustre_swab_repbuf(req, REPLY_REC_OFF,
305                                           sizeof(*body), lustre_swab_mdt_body);
306                 if (body == NULL) {
307                         CERROR ("Can't unpack mdt_body\n");
308                         GOTO(err_out, rc = -EPROTO);
309                 }
310         }
311 out:
312         *request = req;
313         RETURN (rc);
314 err_out:
315         ptlrpc_req_finished(req);
316         req = NULL;
317         goto out;
318 }
319
320 int mdc_setxattr(struct obd_export *exp, struct lu_fid *fid,
321                  obd_valid valid, const char *xattr_name,
322                  const char *input, int input_size,
323                  int output_size, int flags,
324                  struct ptlrpc_request **request)
325 {
326         return mdc_xattr_common(exp, fid, MDS_SETXATTR, valid, xattr_name,
327                                 input, input_size, output_size, flags, request);
328 }
329
330 int mdc_getxattr(struct obd_export *exp, struct lu_fid *fid,
331                  obd_valid valid, const char *xattr_name,
332                  const char *input, int input_size,
333                  int output_size, int flags, struct ptlrpc_request **request)
334 {
335         return mdc_xattr_common(exp, fid, MDS_GETXATTR, valid, xattr_name,
336                                 input, input_size, output_size, flags, request);
337 }
338
339 #ifdef CONFIG_FS_POSIX_ACL
340 static
341 int mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req,
342                    struct lustre_md *md, unsigned int offset)
343 {
344         struct mdt_body  *body = md->body;
345         struct posix_acl *acl;
346         void             *buf;
347         int               rc;
348
349         if (!body->aclsize)
350                 return 0;
351
352         buf = lustre_msg_buf(req->rq_repmsg, offset, body->aclsize);
353         if (!buf) {
354                 CERROR("aclsize %u, bufcount %u, bufsize %u\n",
355                        body->aclsize, lustre_msg_bufcount(req->rq_repmsg),
356                        (lustre_msg_bufcount(req->rq_repmsg) <= offset) ?
357                                 -1 : lustre_msg_buflen(req->rq_repmsg, offset));
358                 return -EPROTO;
359         }
360
361         acl = posix_acl_from_xattr(buf, body->aclsize);
362         if (IS_ERR(acl)) {
363                 rc = PTR_ERR(acl);
364                 CERROR("convert xattr to acl: %d\n", rc);
365                 return rc;
366         }
367
368         rc = posix_acl_valid(acl);
369         if (rc) {
370                 CERROR("validate acl: %d\n", rc);
371                 posix_acl_release(acl);
372                 return rc;
373         }
374
375         md->posix_acl = acl;
376         return 0;
377 }
378 #else
379 #define mdc_unpack_acl(exp, req, md, offset) 0
380 #endif
381
382 int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
383                       int offset, struct obd_export *dt_exp, struct lustre_md *md)
384 {
385         int rc = 0;
386         ENTRY;
387
388         LASSERT(md);
389         memset(md, 0, sizeof(*md));
390
391         md->body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*md->body));
392         LASSERT (md->body != NULL);
393         LASSERT_REPSWABBED(req, offset);
394         offset++;
395
396         if (!(md->body->valid & OBD_MD_FLEASIZE) &&
397             !(md->body->valid & OBD_MD_FLDIREA))
398                 RETURN(0);
399
400         if (md->body->valid & OBD_MD_FLEASIZE) {
401                 int lmmsize;
402                 struct lov_mds_md *lmm;
403
404                 LASSERT(S_ISREG(md->body->mode));
405
406                 if (md->body->eadatasize == 0) {
407                         CERROR ("OBD_MD_FLEASIZE set, but eadatasize 0\n");
408                         RETURN(-EPROTO);
409                 }
410                 lmmsize = md->body->eadatasize;
411                 lmm = lustre_msg_buf(req->rq_repmsg, offset, lmmsize);
412                 LASSERT (lmm != NULL);
413                 LASSERT_REPSWABBED(req, offset);
414
415                 rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
416                 if (rc < 0)
417                         RETURN(rc);
418
419                 LASSERT (rc >= sizeof (*md->lsm));
420                 rc = 0;
421
422                 offset++;
423         } else if (md->body->valid & OBD_MD_FLDIREA) {
424                 /* TODO: umka, please handle this case */
425                 LASSERT(S_ISDIR(md->body->mode));
426                 offset++;
427         }
428
429         /* for ACL, it's possible that FLACL is set but aclsize is zero.  only
430          * when aclsize != 0 there's an actual segment for ACL in reply
431          * buffer. */
432         if ((md->body->valid & OBD_MD_FLACL) && md->body->aclsize) {
433                 rc = mdc_unpack_acl(dt_exp, req, md, offset);
434                 if (rc)
435                         GOTO(err_out, rc);
436                 offset++;
437         }
438 out:
439         RETURN(rc);
440
441 err_out:
442         if (md->lsm)
443                 obd_free_memmd(dt_exp, &md->lsm);
444         goto out;
445 }
446
447 int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
448 {
449         ENTRY;
450         if (md->lsm)
451                 obd_free_memmd(exp, &md->lsm);
452
453 #ifdef CONFIG_FS_POSIX_ACL
454         if (md->posix_acl) {
455                 posix_acl_release(md->posix_acl);
456                 md->posix_acl = NULL;
457         }
458 #endif
459         RETURN(0);
460 }
461
462 static void mdc_commit_open(struct ptlrpc_request *req)
463 {
464         struct mdc_open_data *mod = req->rq_cb_data;
465         if (mod == NULL)
466                 return;
467
468         if (mod->mod_close_req != NULL)
469                 mod->mod_close_req->rq_cb_data = NULL;
470
471         if (mod->mod_och != NULL)
472                 mod->mod_och->och_mod = NULL;
473
474         OBD_FREE(mod, sizeof(*mod));
475         req->rq_cb_data = NULL;
476 }
477
478 static void mdc_replay_open(struct ptlrpc_request *req)
479 {
480         struct mdc_open_data *mod = req->rq_cb_data;
481         struct obd_client_handle *och;
482         struct ptlrpc_request *close_req;
483         struct lustre_handle old;
484         struct mdt_body *body;
485         ENTRY;
486
487         body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
488                                   lustre_swab_mdt_body);
489         LASSERT (body != NULL);
490
491         if (mod == NULL) {
492                 DEBUG_REQ(D_ERROR, req,
493                           "can't properly replay without open data");
494                 EXIT;
495                 return;
496         }
497
498         och = mod->mod_och;
499         if (och != NULL) {
500                 struct lustre_handle *file_fh;
501                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
502                 file_fh = &och->och_fh;
503                 CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
504                        file_fh->cookie, body->handle.cookie);
505                 memcpy(&old, file_fh, sizeof(old));
506                 memcpy(file_fh, &body->handle, sizeof(*file_fh));
507         }
508
509         close_req = mod->mod_close_req;
510         if (close_req != NULL) {
511                 struct mdt_body *close_body;
512                 LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE);
513                 close_body = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF,
514                                             sizeof(*close_body));
515                 if (och != NULL)
516                         LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
517                 DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
518                 memcpy(&close_body->handle, &body->handle,
519                        sizeof(close_body->handle));
520         }
521
522         EXIT;
523 }
524
525 int mdc_set_open_replay_data(struct obd_export *exp,
526                              struct obd_client_handle *och,
527                              struct ptlrpc_request *open_req)
528 {
529         struct mdc_open_data *mod;
530         struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg,
531                                                     DLM_INTENT_REC_OFF,
532                                                     sizeof(*rec));
533         struct mdt_body *body = lustre_msg_buf(open_req->rq_repmsg,
534                                                DLM_REPLY_REC_OFF,
535                                                sizeof(*body));
536         ENTRY;
537
538         /* incoming message in my byte order (it's been swabbed) */
539         LASSERT(rec != NULL);
540         LASSERT_REPSWABBED(open_req, DLM_REPLY_REC_OFF);
541         /* outgoing messages always in my byte order */
542         LASSERT(body != NULL);
543
544         if (och) {
545                 OBD_ALLOC(mod, sizeof(*mod));
546                 if (mod == NULL) {
547                         DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data");
548                         RETURN(0);
549                 }
550
551                 och->och_mod = mod;
552                 mod->mod_och = och;
553                 mod->mod_open_req = open_req;
554                 open_req->rq_cb_data = mod;
555                 open_req->rq_commit_cb = mdc_commit_open;
556         }
557
558         rec->cr_fid2 = body->fid1;
559         open_req->rq_replay_cb = mdc_replay_open;
560         if (!fid_is_sane(&body->fid1)) {
561                 DEBUG_REQ(D_ERROR, open_req, "saving replay request with "
562                           "insane fid");
563                 LBUG();
564         }
565
566         DEBUG_REQ(D_HA, open_req, "set up replay data");
567         RETURN(0);
568 }
569
570 int mdc_clear_open_replay_data(struct obd_export *exp,
571                                struct obd_client_handle *och)
572 {
573         struct mdc_open_data *mod = och->och_mod;
574         ENTRY;
575
576         /* Don't free the structure now (it happens in mdc_commit_open, after
577          * we're sure we won't need to fix up the close request in the future),
578          * but make sure that replay doesn't poke at the och, which is about to
579          * be freed. */
580         LASSERT(mod != LP_POISON);
581         if (mod != NULL)
582                 mod->mod_och = NULL;
583         och->och_mod = NULL;
584         RETURN(0);
585 }
586
587 static void mdc_commit_close(struct ptlrpc_request *req)
588 {
589         struct mdc_open_data *mod = req->rq_cb_data;
590         struct ptlrpc_request *open_req;
591         struct obd_import *imp = req->rq_import;
592
593         DEBUG_REQ(D_HA, req, "close req committed");
594         if (mod == NULL)
595                 return;
596
597         mod->mod_close_req = NULL;
598         req->rq_cb_data = NULL;
599         req->rq_commit_cb = NULL;
600
601         open_req = mod->mod_open_req;
602         LASSERT(open_req != NULL);
603         LASSERT(open_req != LP_POISON);
604         LASSERT(open_req->rq_type != LI_POISON);
605
606         DEBUG_REQ(D_HA, open_req, "open req balanced");
607         LASSERT(open_req->rq_transno != 0);
608         LASSERT(open_req->rq_import == imp);
609
610         /* We no longer want to preserve this for transno-unconditional
611          * replay. */
612         spin_lock(&open_req->rq_lock);
613         open_req->rq_replay = 0;
614         spin_unlock(&open_req->rq_lock);
615 }
616
617 int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
618               struct obd_client_handle *och, struct ptlrpc_request **request)
619 {
620         struct obd_device *obd = class_exp2obd(exp);
621         int reqsize[2] = { sizeof(struct ptlrpc_body),
622                            sizeof(struct mdt_body) };
623         int rc, repsize[4] = { sizeof(struct ptlrpc_body),
624                                sizeof(struct mdt_body),
625                                obd->u.cli.cl_max_mds_easize,
626                                obd->u.cli.cl_max_mds_cookiesize };
627         struct ptlrpc_request *req;
628         struct mdc_open_data *mod;
629         ENTRY;
630
631         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
632                               MDS_CLOSE, 2, reqsize, NULL);
633         if (req == NULL)
634                 GOTO(out, rc = -ENOMEM);
635
636         /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
637          * portal whose threads are not taking any DLM locks and are therefore
638          * always progressing */
639         /* XXX FIXME bug 249 */
640         req->rq_request_portal = MDS_READPAGE_PORTAL;
641
642         /* Ensure that this close's handle is fixed up during replay. */
643         LASSERT(och != NULL);
644         LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
645         mod = och->och_mod;
646         if (likely(mod != NULL)) {
647                 mod->mod_close_req = req;
648                 if (mod->mod_open_req->rq_type == LI_POISON) {
649                         /* FIXME This should be an ASSERT, but until we
650                            figure out why it can be poisoned here, give
651                            a reasonable return. bug 6155 */
652                         CERROR("LBUG POISONED open %p!\n", mod->mod_open_req);
653                         ptlrpc_req_finished(req);
654                         req = NULL;
655                         GOTO(out, rc = -EIO);
656                 }
657                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
658         } else {
659                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
660         }
661
662         mdc_close_pack(req, REQ_REC_OFF, op_data, op_data->valid, och);
663
664         ptlrpc_req_set_repsize(req, 4, repsize);
665         req->rq_commit_cb = mdc_commit_close;
666         LASSERT(req->rq_cb_data == NULL);
667         req->rq_cb_data = mod;
668
669         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
670         rc = ptlrpc_queue_wait(req);
671         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
672
673         if (req->rq_repmsg == NULL) {
674                 CDEBUG(D_HA, "request failed to send: %p, %d\n", req,
675                        req->rq_status);
676                 if (rc == 0)
677                         rc = req->rq_status ? req->rq_status : -EIO;
678         } else if (rc == 0) {
679                 rc = lustre_msg_get_status(req->rq_repmsg);
680                 if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
681                         DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err "
682                                   "= %d", rc);
683                         if (rc > 0)
684                                 rc = -rc;
685                 } else if (mod == NULL) {
686                         CERROR("Unexpected: can't find mdc_open_data, but the "
687                                "close succeeded.  Please tell CFS.\n");
688                 }
689                 if (!lustre_swab_repbuf(req, REPLY_REC_OFF,
690                                         sizeof(struct mdt_body),
691                                         lustre_swab_mdt_body)) {
692                         CERROR("Error unpacking mdt_body\n");
693                         rc = -EPROTO;
694                 }
695         }
696
697         EXIT;
698         *request = req;
699  out:
700         if (rc != 0 && req && req->rq_commit_cb)
701                 req->rq_commit_cb(req);
702
703         return rc;
704 }
705
706 int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data)
707 {
708         struct ptlrpc_request *req;
709         struct mdt_body *body;
710         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
711         ENTRY;
712
713         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
714                               MDS_DONE_WRITING, 2, size, NULL);
715         if (req == NULL)
716                 RETURN(-ENOMEM);
717
718         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
719         body->fid1 = op_data->fid1;
720         body->size = op_data->size;
721         body->blocks = op_data->blocks;
722         body->flags = op_data->flags;
723         body->valid = op_data->valid;
724
725         ptlrpc_req_set_repsize(req, 2, size);
726
727         rc = ptlrpc_queue_wait(req);
728         ptlrpc_req_finished(req);
729         RETURN(rc);
730 }
731
732 int mdc_readpage(struct obd_export *exp, struct lu_fid *fid, __u64 offset,
733                  struct page *page, struct ptlrpc_request **request)
734 {
735         struct obd_import *imp = class_exp2cliimp(exp);
736         struct ptlrpc_request *req = NULL;
737         struct ptlrpc_bulk_desc *desc = NULL;
738         struct mdt_body *body;
739         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
740         ENTRY;
741
742         CDEBUG(D_INODE, "object: "DFID"\n", PFID(fid));
743
744         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE, 2, size,
745                               NULL);
746         if (req == NULL)
747                 GOTO(out, rc = -ENOMEM);
748
749         /* XXX FIXME bug 249 */
750         req->rq_request_portal = MDS_READPAGE_PORTAL;
751
752         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
753         if (desc == NULL)
754                 GOTO(out, rc = -ENOMEM);
755         /* NB req now owns desc and will free it when it gets freed */
756
757         ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
758
759         mdc_readdir_pack(req, REQ_REC_OFF, offset, PAGE_CACHE_SIZE, fid);
760
761         ptlrpc_req_set_repsize(req, 2, size);
762         rc = ptlrpc_queue_wait(req);
763
764         if (rc == 0) {
765                 body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
766                                           lustre_swab_mdt_body);
767                 if (body == NULL) {
768                         CERROR("Can't unpack mdt_body\n");
769                         GOTO(out, rc = -EPROTO);
770                 }
771
772                 if (req->rq_bulk->bd_nob_transferred != PAGE_CACHE_SIZE) {
773                         CERROR ("Unexpected # bytes transferred: %d"
774                                 " (%ld expected)\n",
775                                 req->rq_bulk->bd_nob_transferred,
776                                 PAGE_CACHE_SIZE);
777                         GOTO (out, rc = -EPROTO);
778                 }
779         }
780
781         EXIT;
782  out:
783         *request = req;
784         return rc;
785 }
786
787
788 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
789                          void *karg, void *uarg)
790 {
791         struct obd_device *obd = exp->exp_obd;
792         struct obd_ioctl_data *data = karg;
793         struct obd_import *imp = obd->u.cli.cl_import;
794         struct llog_ctxt *ctxt;
795         int rc;
796         ENTRY;
797
798 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
799         MOD_INC_USE_COUNT;
800 #else
801         if (!try_module_get(THIS_MODULE)) {
802                 CERROR("Can't get module. Is it alive?");
803                 return -EINVAL;
804         }
805 #endif
806         switch (cmd) {
807         case OBD_IOC_CLIENT_RECOVER:
808                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
809                 if (rc < 0)
810                         GOTO(out, rc);
811                 GOTO(out, rc = 0);
812         case IOC_OSC_SET_ACTIVE:
813                 rc = ptlrpc_set_import_active(imp, data->ioc_offset);
814                 GOTO(out, rc);
815         case OBD_IOC_PARSE: {
816                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
817                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
818                 GOTO(out, rc);
819         }
820 #ifdef __KERNEL__
821         case OBD_IOC_LLOG_INFO:
822         case OBD_IOC_LLOG_PRINT: {
823                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
824                 rc = llog_ioctl(ctxt, cmd, data);
825
826                 GOTO(out, rc);
827         }
828 #endif
829         case OBD_IOC_POLL_QUOTACHECK:
830                 rc = lquota_poll_check(quota_interface, exp,
831                                        (struct if_quotacheck *)karg);
832                 GOTO(out, rc);
833         default:
834                 CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
835                 GOTO(out, rc = -ENOTTY);
836         }
837 out:
838 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
839         MOD_DEC_USE_COUNT;
840 #else
841         module_put(THIS_MODULE);
842 #endif
843
844         return rc;
845 }
846
847 int mdc_set_info_async(struct obd_export *exp, obd_count keylen,
848                        void *key, obd_count vallen, void *val,
849                        struct ptlrpc_request_set *set)
850 {
851         struct obd_import *imp = class_exp2cliimp(exp);
852         int rc = -EINVAL;
853
854         if (KEY_IS(KEY_INIT_RECOV)) {
855                 if (vallen != sizeof(int))
856                         RETURN(-EINVAL);
857                 imp->imp_initial_recov = *(int *)val;
858                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
859                        exp->exp_obd->obd_name, imp->imp_initial_recov);
860                 RETURN(0);
861         }
862         /* Turn off initial_recov after we try all backup servers once */
863         if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
864                 if (vallen != sizeof(int))
865                         RETURN(-EINVAL);
866                 imp->imp_initial_recov_bk = *(int *)val;
867                 if (imp->imp_initial_recov_bk)
868                         imp->imp_initial_recov = 1;
869                 CDEBUG(D_HA, "%s: set imp_initial_recov_bk = %d\n",
870                        exp->exp_obd->obd_name, imp->imp_initial_recov_bk);
871                 RETURN(0);
872         }
873         if (KEY_IS("read-only")) {
874                 struct ptlrpc_request *req;
875                 int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
876                 char *bufs[3] = { NULL, key, val };
877
878                 if (vallen != sizeof(int))
879                         RETURN(-EINVAL);
880
881                 if (*((int *)val)) {
882                         imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
883                         imp->imp_connect_data.ocd_connect_flags |=
884                                 OBD_CONNECT_RDONLY;
885                 } else {
886                         imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
887                         imp->imp_connect_data.ocd_connect_flags &=
888                                 ~OBD_CONNECT_RDONLY;
889                 }
890
891                 req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_SET_INFO,
892                                       3, size, bufs);
893                 if (req == NULL)
894                         RETURN(-ENOMEM);
895
896                 ptlrpc_req_set_repsize(req, 1, NULL);
897                 if (set) {
898                         rc = 0;
899                         ptlrpc_set_add_req(set, req);
900                         ptlrpc_check_set(set);
901                 } else {
902                         rc = ptlrpc_queue_wait(req);
903                         ptlrpc_req_finished(req);
904                 }
905
906                 RETURN(rc);
907         }
908
909         RETURN(rc);
910 }
911
912 int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key,
913                  __u32 *vallen, void *val)
914 {
915         int rc = -EINVAL;
916
917         if (keylen == strlen("max_easize") &&
918             memcmp(key, "max_easize", strlen("max_easize")) == 0) {
919                 int mdsize, *max_easize;
920
921                 if (*vallen != sizeof(int))
922                         RETURN(-EINVAL);
923                 mdsize = *(int*)val;
924                 if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
925                         exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
926                 max_easize = val;
927                 *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
928                 RETURN(0);
929         }
930         RETURN(rc);
931 }
932
933 static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
934                       __u64 max_age)
935 {
936         struct ptlrpc_request *req;
937         struct obd_statfs *msfs;
938         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*msfs) };
939         ENTRY;
940
941         /* We could possibly pass max_age in the request (as an absolute
942          * timestamp or a "seconds.usec ago") so the target can avoid doing
943          * extra calls into the filesystem if that isn't necessary (e.g.
944          * during mount that would help a bit).  Having relative timestamps
945          * is not so great if request processing is slow, while absolute
946          * timestamps are not ideal because they need time synchronization. */
947         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_MDS_VERSION,
948                               MDS_STATFS, 1, NULL, NULL);
949         if (!req)
950                 RETURN(-ENOMEM);
951
952         ptlrpc_req_set_repsize(req, 2, size);
953
954         rc = ptlrpc_queue_wait(req);
955
956         if (rc)
957                 GOTO(out, rc);
958
959         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
960                                   lustre_swab_obd_statfs);
961         if (msfs == NULL) {
962                 CERROR("Can't unpack obd_statfs\n");
963                 GOTO(out, rc = -EPROTO);
964         }
965
966         memcpy(osfs, msfs, sizeof(*msfs));
967         EXIT;
968 out:
969         ptlrpc_req_finished(req);
970
971         return rc;
972 }
973
974 static int mdc_pin(struct obd_export *exp, struct lu_fid *fid,
975                    struct obd_client_handle *handle, int flag)
976 {
977         struct ptlrpc_request *req;
978         struct mdt_body *body;
979         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
980         ENTRY;
981
982         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
983                               MDS_PIN, 2, size, NULL);
984         if (req == NULL)
985                 RETURN(-ENOMEM);
986
987         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof (*body));
988         body->fid1 = *fid;
989         body->flags = flag;
990
991         ptlrpc_req_set_repsize(req, 2, size);
992
993         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
994         rc = ptlrpc_queue_wait(req);
995         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
996         if (rc) {
997                 CERROR("pin failed: %d\n", rc);
998                 ptlrpc_req_finished(req);
999                 RETURN(rc);
1000         }
1001
1002         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1003                                   lustre_swab_mdt_body);
1004         if (body == NULL) {
1005                 ptlrpc_req_finished(req);
1006                 RETURN(rc);
1007         }
1008
1009         memcpy(&handle->och_fh, &body->handle, sizeof(body->handle));
1010         handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
1011
1012         OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
1013         if (handle->och_mod == NULL) {
1014                 DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
1015                 RETURN(-ENOMEM);
1016         }
1017         handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
1018
1019         RETURN(rc);
1020 }
1021
1022 static int mdc_unpin(struct obd_export *exp,
1023                      struct obd_client_handle *handle, int flag)
1024 {
1025         struct ptlrpc_request *req;
1026         struct mdt_body *body;
1027         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1028         ENTRY;
1029
1030         if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
1031                 RETURN(0);
1032
1033         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1034                               MDS_CLOSE, 2, size, NULL);
1035         if (req == NULL)
1036                 RETURN(-ENOMEM);
1037
1038         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1039         memcpy(&body->handle, &handle->och_fh, sizeof(body->handle));
1040         body->flags = flag;
1041
1042         ptlrpc_req_set_repsize(req, 1, NULL);
1043         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1044         rc = ptlrpc_queue_wait(req);
1045         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1046
1047         if (rc != 0)
1048                 CERROR("unpin failed: %d\n", rc);
1049
1050         ptlrpc_req_finished(req);
1051         ptlrpc_req_finished(handle->och_mod->mod_open_req);
1052         OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
1053         RETURN(rc);
1054 }
1055
1056 int mdc_sync(struct obd_export *exp, struct lu_fid *fid,
1057              struct ptlrpc_request **request)
1058 {
1059         struct ptlrpc_request *req;
1060         int size[2] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) };
1061         int rc;
1062         ENTRY;
1063
1064         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1065                               MDS_SYNC, 2, size, NULL);
1066         if (!req)
1067                 RETURN(rc = -ENOMEM);
1068
1069         mdc_pack_req_body(req, REQ_REC_OFF, 0, fid, 0, 0);
1070
1071         ptlrpc_req_set_repsize(req, 2, size);
1072
1073         rc = ptlrpc_queue_wait(req);
1074         if (rc || request == NULL)
1075                 ptlrpc_req_finished(req);
1076         else
1077                 *request = req;
1078
1079         RETURN(rc);
1080 }
1081
1082 static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
1083                             enum obd_import_event event)
1084 {
1085         int rc = 0;
1086
1087         LASSERT(imp->imp_obd == obd);
1088
1089         switch (event) {
1090         case IMP_EVENT_DISCON: {
1091                 break;
1092         }
1093         case IMP_EVENT_INACTIVE: {
1094                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
1095                 break;
1096         }
1097         case IMP_EVENT_INVALIDATE: {
1098                 struct ldlm_namespace *ns = obd->obd_namespace;
1099
1100                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
1101
1102                 break;
1103         }
1104         case IMP_EVENT_ACTIVE: {
1105                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
1106                 break;
1107         }
1108         case IMP_EVENT_OCD:
1109                 break;
1110
1111         default:
1112                 CERROR("Unknown import event %x\n", event);
1113                 LBUG();
1114         }
1115         RETURN(rc);
1116 }
1117
1118 static int mdc_fid_init(struct obd_export *exp)
1119 {
1120         struct client_obd *cli = &exp->exp_obd->u.cli;
1121         int rc;
1122         ENTRY;
1123
1124         OBD_ALLOC_PTR(cli->cl_seq);
1125         if (cli->cl_seq == NULL)
1126                 RETURN(-ENOMEM);
1127
1128         /* init client side sequence-manager */
1129         rc = seq_client_init(cli->cl_seq,
1130                              exp->exp_obd->obd_name,
1131                              exp, LUSTRE_SEQ_METADATA);
1132         if (rc)
1133                 GOTO(out_free_seq, rc);
1134
1135         /* pre-allocate meta-sequence */
1136         rc = seq_client_alloc_meta(cli->cl_seq);
1137         if (rc) {
1138                 CERROR("can't allocate new mata-sequence, "
1139                        "rc %d\n", rc);
1140                 GOTO(out_free_seq, rc);
1141         }
1142         RETURN(rc);
1143
1144 out_free_seq:
1145         OBD_FREE_PTR(cli->cl_seq);
1146         cli->cl_seq = NULL;
1147         return rc;
1148 }
1149
1150 static int mdc_fid_fini(struct obd_export *exp)
1151 {
1152         struct client_obd *cli = &exp->exp_obd->u.cli;
1153         ENTRY;
1154
1155         if (cli->cl_seq != NULL) {
1156                 seq_client_fini(cli->cl_seq);
1157                 OBD_FREE_PTR(cli->cl_seq);
1158                 cli->cl_seq = NULL;
1159         }
1160         
1161         RETURN(0);
1162 }
1163
1164 static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1165                          struct lu_placement_hint *hint)
1166 {
1167         struct client_obd *cli = &exp->exp_obd->u.cli;
1168         struct lu_client_seq *seq = cli->cl_seq;
1169
1170         ENTRY;
1171         RETURN(seq_client_alloc_fid(seq, fid));
1172 }
1173
1174 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
1175 {
1176         struct client_obd *cli = &obd->u.cli;
1177         struct lprocfs_static_vars lvars;
1178         int rc;
1179         ENTRY;
1180
1181         OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1182         if (!cli->cl_rpc_lock)
1183                 RETURN(-ENOMEM);
1184         mdc_init_rpc_lock(cli->cl_rpc_lock);
1185
1186         ptlrpcd_addref();
1187
1188         OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1189         if (!cli->cl_setattr_lock)
1190                 GOTO(err_rpc_lock, rc = -ENOMEM);
1191         mdc_init_rpc_lock(cli->cl_setattr_lock);
1192
1193         OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1194         if (!cli->cl_close_lock)
1195                 GOTO(err_setattr_lock, rc = -ENOMEM);
1196         mdc_init_rpc_lock(cli->cl_close_lock);
1197
1198         rc = client_obd_setup(obd, cfg);
1199         if (rc)
1200                 GOTO(err_close_lock, rc);
1201         lprocfs_init_vars(mdc, &lvars);
1202         lprocfs_obd_setup(obd, lvars.obd_vars);
1203
1204         rc = obd_llog_init(obd, obd, 0, NULL);
1205         if (rc) {
1206                 mdc_cleanup(obd);
1207                 CERROR("failed to setup llogging subsystems\n");
1208         }
1209
1210         RETURN(rc);
1211
1212 err_close_lock:
1213         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1214 err_setattr_lock:
1215         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1216 err_rpc_lock:
1217         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1218         ptlrpcd_decref();
1219         RETURN(rc);
1220 }
1221
1222 /* Initialize the default and maximum LOV EA and cookie sizes.  This allows
1223  * us to make MDS RPCs with large enough reply buffers to hold the
1224  * maximum-sized (= maximum striped) EA and cookie without having to
1225  * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */
1226 int mdc_init_ea_size(struct obd_export *exp, int easize,
1227                      int def_easize, int cookiesize)
1228 {
1229         struct obd_device *obd = exp->exp_obd;
1230         struct client_obd *cli = &obd->u.cli;
1231         ENTRY;
1232
1233         if (cli->cl_max_mds_easize < easize)
1234                 cli->cl_max_mds_easize = easize;
1235
1236         if (cli->cl_default_mds_easize < def_easize)
1237                 cli->cl_default_mds_easize = def_easize;
1238
1239         if (cli->cl_max_mds_cookiesize < cookiesize)
1240                 cli->cl_max_mds_cookiesize = cookiesize;
1241
1242         RETURN(0);
1243 }
1244
1245 static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
1246 {
1247         int rc = 0;
1248         ENTRY;
1249
1250         switch (stage) {
1251         case OBD_CLEANUP_EARLY:
1252         case OBD_CLEANUP_EXPORTS:
1253                 break;
1254         case OBD_CLEANUP_SELF_EXP:
1255                 rc = obd_llog_finish(obd, 0);
1256                 if (rc != 0)
1257                         CERROR("failed to cleanup llogging subsystems\n");
1258         case OBD_CLEANUP_OBD:
1259                 break;
1260         }
1261         RETURN(rc);
1262 }
1263
1264 static int mdc_cleanup(struct obd_device *obd)
1265 {
1266         struct client_obd *cli = &obd->u.cli;
1267
1268         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1269         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1270         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1271
1272         lprocfs_obd_cleanup(obd);
1273         ptlrpcd_decref();
1274
1275         return client_obd_cleanup(obd);
1276 }
1277
1278
1279 static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
1280                          int count, struct llog_catid *logid)
1281 {
1282         struct llog_ctxt *ctxt;
1283         int rc;
1284         ENTRY;
1285
1286         rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1287                         &llog_client_ops);
1288         if (rc == 0) {
1289                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
1290                 ctxt->loc_imp = obd->u.cli.cl_import;
1291         }
1292
1293         rc = llog_setup(obd, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
1294                        &llog_client_ops);
1295         if (rc == 0) {
1296                 ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
1297                 ctxt->loc_imp = obd->u.cli.cl_import;
1298         }
1299
1300         RETURN(rc);
1301 }
1302
1303 static int mdc_llog_finish(struct obd_device *obd, int count)
1304 {
1305         int rc;
1306         ENTRY;
1307
1308         rc = llog_cleanup(llog_get_context(obd, LLOG_LOVEA_REPL_CTXT));
1309         if (rc) {
1310                 CERROR("can not cleanup LLOG_CONFIG_REPL_CTXT rc %d\n", rc);
1311         }
1312         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
1313         RETURN(rc);
1314 }
1315
1316 static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
1317 {
1318         struct lustre_cfg *lcfg = buf;
1319         struct lprocfs_static_vars lvars;
1320         int rc = 0;
1321
1322         lprocfs_init_vars(mdc, &lvars);
1323         
1324         rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars, lcfg, obd);
1325         return(rc);
1326 }
1327
1328 struct obd_ops mdc_obd_ops = {
1329         .o_owner            = THIS_MODULE,
1330         .o_setup            = mdc_setup,
1331         .o_precleanup       = mdc_precleanup,
1332         .o_cleanup          = mdc_cleanup,
1333         .o_add_conn         = client_import_add_conn,
1334         .o_del_conn         = client_import_del_conn,
1335         .o_connect          = client_connect_import,
1336         .o_disconnect       = client_disconnect_export,
1337         .o_iocontrol        = mdc_iocontrol,
1338         .o_set_info_async   = mdc_set_info_async,
1339         .o_statfs           = mdc_statfs,
1340         .o_pin              = mdc_pin,
1341         .o_unpin            = mdc_unpin,
1342         .o_fid_init         = mdc_fid_init,
1343         .o_fid_fini         = mdc_fid_fini,
1344         .o_fid_alloc        = mdc_fid_alloc,
1345         .o_import_event     = mdc_import_event,
1346         .o_llog_init        = mdc_llog_init,
1347         .o_llog_finish      = mdc_llog_finish,
1348         .o_get_info         = mdc_get_info,
1349         .o_process_config  = mdc_process_config,
1350 };
1351
1352 struct md_ops mdc_md_ops = {
1353         .m_getstatus        = mdc_getstatus,
1354         .m_change_cbdata    = mdc_change_cbdata,
1355         .m_close            = mdc_close,
1356         .m_create           = mdc_create,
1357         .m_done_writing     = mdc_done_writing,
1358         .m_enqueue          = mdc_enqueue,
1359         .m_getattr          = mdc_getattr,
1360         .m_getattr_name     = mdc_getattr_name,
1361         .m_intent_lock      = mdc_intent_lock,
1362         .m_link             = mdc_link,
1363         .m_rename           = mdc_rename,
1364         .m_setattr          = mdc_setattr,
1365         .m_setxattr         = mdc_setxattr,
1366         .m_getxattr         = mdc_getxattr,
1367         .m_sync             = mdc_sync,
1368         .m_readpage         = mdc_readpage,
1369         .m_unlink           = mdc_unlink,
1370         .m_cancel_unused    = mdc_cancel_unused,
1371         .m_init_ea_size     = mdc_init_ea_size,
1372         .m_set_lock_data    = mdc_set_lock_data,
1373         .m_lock_match       = mdc_lock_match,
1374         .m_get_lustre_md    = mdc_get_lustre_md,
1375         .m_free_lustre_md   = mdc_free_lustre_md,
1376         .m_set_open_replay_data = mdc_set_open_replay_data,
1377         .m_clear_open_replay_data = mdc_clear_open_replay_data
1378 };
1379
1380 extern quota_interface_t mdc_quota_interface;
1381
1382 int __init mdc_init(void)
1383 {
1384         int rc;
1385         struct lprocfs_static_vars lvars;
1386         lprocfs_init_vars(mdc, &lvars);
1387
1388         quota_interface = PORTAL_SYMBOL_GET(mdc_quota_interface);
1389         init_obd_quota_ops(quota_interface, &mdc_obd_ops);
1390
1391         rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars,
1392                                  LUSTRE_MDC_NAME, NULL);
1393         if (rc && quota_interface)
1394                 PORTAL_SYMBOL_PUT(mdc_quota_interface);
1395
1396         RETURN(rc);
1397 }
1398
1399 #ifdef __KERNEL__
1400 static void /*__exit*/ mdc_exit(void)
1401 {
1402         if (quota_interface)
1403                 PORTAL_SYMBOL_PUT(mdc_quota_interface);
1404
1405         class_unregister_type(LUSTRE_MDC_NAME);
1406 }
1407
1408 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1409 MODULE_DESCRIPTION("Lustre Metadata Client");
1410 MODULE_LICENSE("GPL");
1411
1412 module_init(mdc_init);
1413 module_exit(mdc_exit);
1414 #endif