Whamcloud - gitweb
Fix confusing MDC error message
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_MDC
26
27 #ifdef __KERNEL__
28 # include <linux/module.h>
29 # include <linux/pagemap.h>
30 # include <linux/miscdevice.h>
31 # include <linux/init.h>
32 #else
33 # include <liblustre.h>
34 #endif
35
36 #include <linux/obd_class.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/lprocfs_status.h>
40 #include "mdc_internal.h"
41
42 #define REQUEST_MINOR 244
43
44 extern int mds_queue_req(struct ptlrpc_request *);
45 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
46 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
47 static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
48                           int level, int msg_flags)
49 {
50         struct ptlrpc_request *req;
51         struct mds_body *body;
52         int rc, size = sizeof(*body);
53         ENTRY;
54
55         req = ptlrpc_prep_req(imp, MDS_GETSTATUS, 1, &size, NULL);
56         if (!req)
57                 GOTO(out, rc = -ENOMEM);
58
59         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
60         req->rq_send_state = level;
61         req->rq_replen = lustre_msg_size(1, &size);
62
63         mdc_pack_req_body(req);
64         req->rq_reqmsg->flags |= msg_flags;
65         rc = ptlrpc_queue_wait(req);
66
67         if (!rc) {
68                 body = lustre_swab_repbuf (req, 0, sizeof (*body),
69                                            lustre_swab_mds_body);
70                 if (body == NULL) {
71                         CERROR ("Can't extract mds_body\n");
72                         GOTO (out, rc = -EPROTO);
73                 }
74
75                 memcpy(rootfid, &body->fid1, sizeof(*rootfid));
76
77                 CDEBUG(D_NET, "root ino="LPU64", last_committed="LPU64
78                        ", last_xid="LPU64"\n",
79                        rootfid->id, req->rq_repmsg->last_committed,
80                        req->rq_repmsg->last_xid);
81         }
82
83         EXIT;
84  out:
85         ptlrpc_req_finished(req);
86         return rc;
87 }
88
89 /* This should be mdc_get_info("rootfid") */
90 int mdc_getstatus(struct obd_export *exp, struct ll_fid *rootfid)
91 {
92         return send_getstatus(class_exp2cliimp(exp), rootfid, LUSTRE_IMP_FULL,
93                               0);
94 }
95
96 int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, 
97                        struct ptlrpc_request *req)
98 {
99         struct mds_body *body;
100         void            *eadata;
101         int              rc;
102         int              size[2] = {sizeof(*body), 0};
103         int              bufcount = 1;
104         ENTRY;
105
106         /* request message already built */
107
108         if (ea_size != 0) {
109                 size[bufcount++] = ea_size;
110                 CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
111                        ea_size);
112         }
113         req->rq_replen = lustre_msg_size(bufcount, size);
114
115         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
116         rc = ptlrpc_queue_wait(req);
117         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
118         if (rc != 0)
119                 RETURN (rc);
120
121         body = lustre_swab_repbuf (req, 0, sizeof (*body),
122                                    lustre_swab_mds_body);
123         if (body == NULL) {
124                 CERROR ("Can't unpack mds_body\n");
125                 RETURN (-EPROTO);
126         }
127
128         CDEBUG(D_NET, "mode: %o\n", body->mode);
129
130         LASSERT_REPSWAB (req, 1);
131         if (body->eadatasize != 0) {
132                 /* reply indicates presence of eadata; check it's there... */
133                 eadata = lustre_msg_buf (req->rq_repmsg, 1, body->eadatasize);
134                 if (eadata == NULL) {
135                         CERROR ("Missing/short eadata\n");
136                         RETURN (-EPROTO);
137                 }
138         }
139
140         RETURN (0);
141 }
142
143 int mdc_getattr(struct obd_export *exp, struct ll_fid *fid,
144                 unsigned long valid, unsigned int ea_size,
145                 struct ptlrpc_request **request)
146 {
147         struct ptlrpc_request *req;
148         struct mds_body *body;
149         int size = sizeof(*body);
150         int rc;
151         ENTRY;
152
153         /* XXX do we need to make another request here?  We just did a getattr
154          *     to do the lookup in the first place.
155          */
156         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR, 1, &size,
157                               NULL);
158         if (!req)
159                 GOTO(out, rc = -ENOMEM);
160
161         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
162         memcpy(&body->fid1, fid, sizeof(*fid));
163         body->valid = valid;
164         body->eadatasize = ea_size;
165         mdc_pack_req_body(req);
166
167         rc = mdc_getattr_common(exp, ea_size, req);
168         if (rc != 0) {
169                 ptlrpc_req_finished (req);
170                 req = NULL;
171         }
172  out:
173         *request = req;
174         RETURN (rc);
175 }
176
177 int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid,
178                      char *filename, int namelen, unsigned long valid,
179                      unsigned int ea_size, struct ptlrpc_request **request)
180 {
181         struct ptlrpc_request *req;
182         struct mds_body *body;
183         int rc, size[2] = {sizeof(*body), namelen};
184         ENTRY;
185
186         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_GETATTR_NAME, 2,
187                               size, NULL);
188         if (!req)
189                 GOTO(out, rc = -ENOMEM);
190
191         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
192         memcpy(&body->fid1, fid, sizeof(*fid));
193         body->valid = valid;
194         body->eadatasize = ea_size;
195         mdc_pack_req_body(req);
196
197         LASSERT (strnlen (filename, namelen) == namelen - 1);
198         memcpy(lustre_msg_buf(req->rq_reqmsg, 1, namelen), filename, namelen);
199
200         rc = mdc_getattr_common(exp, ea_size, req);
201         if (rc != 0) {
202                 ptlrpc_req_finished (req);
203                 req = NULL;
204         }
205  out:
206         *request = req;
207         RETURN(rc);
208 }
209
210 /* This should be called with both the request and the reply still packed. */
211 void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
212                                 int repoff)
213 {
214         struct mds_rec_create *rec =
215                 lustre_msg_buf(req->rq_reqmsg, reqoff, sizeof(*rec));
216         struct mds_body *body =
217                 lustre_msg_buf(req->rq_repmsg, repoff, sizeof(*body));
218
219         LASSERT (rec != NULL);
220         LASSERT (body != NULL);
221
222         memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
223         DEBUG_REQ(D_HA, req, "storing generation %u for ino "LPU64,
224                   rec->cr_replayfid.generation, rec->cr_replayfid.id);
225 }
226
227 int mdc_req2lustre_md(struct ptlrpc_request *req, int offset,
228                       struct obd_export *exp,
229                       struct lustre_md *md)
230 {
231         int rc = 0;
232         ENTRY;
233
234         LASSERT(md);
235         memset(md, 0, sizeof(*md));
236
237         md->body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*md->body));
238         LASSERT (md->body != NULL);
239         LASSERT_REPSWABBED (req, offset);
240
241         if (md->body->valid & OBD_MD_FLEASIZE) {
242                 int lmmsize;
243                 struct lov_mds_md *lmm;
244
245                 LASSERT(S_ISREG(md->body->mode));
246
247                 if (md->body->eadatasize == 0) {
248                         CERROR ("OBD_MD_FLEASIZE set, but eadatasize 0\n");
249                         RETURN(-EPROTO);
250                 }
251                 lmmsize = md->body->eadatasize;
252                 lmm = lustre_msg_buf(req->rq_repmsg, offset + 1, lmmsize);
253                 LASSERT (lmm != NULL);
254                 LASSERT_REPSWABBED (req, offset + 1);
255
256                 rc = obd_unpackmd(exp, &md->lsm, lmm, lmmsize);
257                 if (rc >= 0) {
258                         LASSERT (rc >= sizeof (*md->lsm));
259                         rc = 0;
260                 }
261         }
262         RETURN(rc);
263 }
264
265 static void mdc_commit_open(struct ptlrpc_request *req)
266 {
267         struct mdc_open_data *mod = req->rq_cb_data;
268         if (mod == NULL)
269                 return;
270
271         if (mod->mod_close_req != NULL)
272                 mod->mod_close_req->rq_cb_data = NULL;
273
274         if (mod->mod_och != NULL)
275                 mod->mod_och->och_mod = NULL;
276
277         OBD_FREE(mod, sizeof(*mod));
278         req->rq_cb_data = NULL;
279 }
280
281 static void mdc_replay_open(struct ptlrpc_request *req)
282 {
283         struct mdc_open_data *mod = req->rq_cb_data;
284         struct obd_client_handle *och;
285         struct ptlrpc_request *close_req;
286         struct lustre_handle old; 
287         struct mds_body *body;
288         ENTRY;
289
290         body = lustre_swab_repbuf(req, 1, sizeof(*body), lustre_swab_mds_body);
291         LASSERT (body != NULL);
292
293         if (mod == NULL) {
294                 DEBUG_REQ(D_ERROR, req,
295                           "can't properly replay without open data");
296                 EXIT;
297                 return;
298         }
299
300         och = mod->mod_och;
301         if (och != NULL) {
302                 struct lustre_handle *file_fh; 
303                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
304                 file_fh = &och->och_fh;
305                 CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
306                        file_fh->cookie, body->handle.cookie);
307                 memcpy(&old, file_fh, sizeof(old));
308                 memcpy(file_fh, &body->handle, sizeof(*file_fh));
309         } 
310
311         close_req = mod->mod_close_req;
312         if (close_req != NULL) {
313                 struct mds_body *close_body;
314                 LASSERT(close_req->rq_reqmsg->opc == MDS_CLOSE);
315                 close_body = lustre_msg_buf(close_req->rq_reqmsg, 0,
316                                             sizeof(*close_body));
317                 if (och != NULL)
318                         LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
319                 DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
320                 memcpy(&close_body->handle, &body->handle,
321                        sizeof(close_body->handle));
322         }
323
324         EXIT;
325 }
326
327 void mdc_set_open_replay_data(struct obd_client_handle *och,
328                               struct ptlrpc_request *open_req)
329 {
330         struct mdc_open_data *mod;
331         struct mds_rec_create *rec =
332                 lustre_msg_buf(open_req->rq_reqmsg, 2, sizeof(*rec));
333         struct mds_body *body =
334                 lustre_msg_buf(open_req->rq_repmsg, 1, sizeof(*body));
335
336         LASSERT(rec != NULL);
337         /* outgoing messages always in my byte order */
338         LASSERT(body != NULL);
339         /* incoming message in my byte order (it's been swabbed) */
340         LASSERT_REPSWABBED(open_req, 1);
341
342         OBD_ALLOC(mod, sizeof(*mod));
343         if (mod == NULL) {
344                 DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data");
345                 return;
346         }
347
348         och->och_mod = mod;
349         mod->mod_och = och;
350         mod->mod_open_req = open_req;
351
352         memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
353         open_req->rq_replay_cb = mdc_replay_open;
354         open_req->rq_commit_cb = mdc_commit_open;
355         open_req->rq_cb_data = mod;
356         DEBUG_REQ(D_HA, open_req, "set up replay data");
357 }
358
359 void mdc_clear_open_replay_data(struct obd_client_handle *och)
360 {
361         struct mdc_open_data *mod = och->och_mod;
362
363         /* Don't free the structure now (it happens in mdc_commit_open, after
364          * we're sure we won't need to fix up the close request in the future),
365          * but make sure that replay doesn't poke at the och, which is about to
366          * be freed. */
367         LASSERT(mod != (void *)0x5a5a5a5a);
368         if (mod != NULL)
369                 mod->mod_och = NULL;
370         och->och_mod = NULL;
371 }
372
373 static void mdc_commit_close(struct ptlrpc_request *req)
374 {
375         struct mdc_open_data *mod = req->rq_cb_data;
376         struct ptlrpc_request *open_req;
377         struct obd_import *imp = req->rq_import;
378
379         DEBUG_REQ(D_HA, req, "close req committed");
380         if (mod == NULL)
381                 return;
382
383         mod->mod_close_req = NULL;
384         req->rq_cb_data = NULL;
385         req->rq_commit_cb = NULL;
386
387         open_req = mod->mod_open_req;
388         LASSERT(open_req != NULL);
389         LASSERT(open_req != (void *)0x5a5a5a5a);
390
391         DEBUG_REQ(D_HA, open_req, "open req balanced");
392         LASSERT(open_req->rq_transno != 0);
393         LASSERT(open_req->rq_import == imp);
394
395         /* We no longer want to preserve this for transno-unconditional
396          * replay. */
397         spin_lock(&open_req->rq_lock);
398         open_req->rq_replay = 0;
399         spin_unlock(&open_req->rq_lock);
400 }
401
402 static int mdc_close_interpret(struct ptlrpc_request *req, void *data, int rc)
403 {
404         union ptlrpc_async_args *aa = data;
405         struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0];
406         mdc_put_rpc_lock(rpc_lock, NULL);
407         wake_up(&req->rq_reply_waitq);
408         RETURN(rc);
409 }
410
411 /* We can't use ptlrpc_check_reply, because we don't want to wake up for 
412  * anything but a reply or an error. */
413 static int mdc_close_check_reply(struct ptlrpc_request *req)
414 {
415         int rc = 0;
416         unsigned long flags;
417
418         spin_lock_irqsave(&req->rq_lock, flags);
419         if (req->rq_replied || req->rq_err)
420                 rc = 1;
421         spin_unlock_irqrestore (&req->rq_lock, flags);
422         return rc;
423 }
424
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() in mdc_close().
 * Ignores its argument and always returns 0.
 * NOTE(review): presumably 0 tells l_wait_event() to keep waiting after a
 * timeout -- confirm against the l_wait_event() contract.
 */
static int go_back_to_sleep(void *unused)
{
        const int keep_waiting = 0;

        return keep_waiting;
}
429
430 int mdc_close(struct obd_export *exp, struct obdo *obdo,
431               struct obd_client_handle *och, struct ptlrpc_request **request)
432 {
433         struct mds_body *body;
434         struct obd_device *obd = class_exp2obd(exp);
435         int reqsize = sizeof(*body);
436         int rc, repsize[3] = {sizeof(*body),
437                               obd->u.cli.cl_max_mds_easize,
438                               obd->u.cli.cl_max_mds_cookiesize};
439         struct ptlrpc_request *req;
440         struct mdc_open_data *mod;
441         struct l_wait_info lwi;
442         struct mdc_rpc_lock *rpc_lock = obd->u.cli.cl_rpc_lock;
443         ENTRY;
444
445         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
446                               NULL);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         /* Ensure that this close's handle is fixed up during replay. */
451         LASSERT(och != NULL);
452         mod = och->och_mod;
453         if (likely(mod != NULL)) {
454                 mod->mod_close_req = req;
455                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched open req %p",
456                           mod->mod_open_req);
457         } else {
458                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
459         }
460
461         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
462         mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode);
463         memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
464         body->size = obdo->o_size;
465         body->blocks = obdo->o_blocks;
466         body->flags = obdo->o_flags;
467         body->valid = obdo->o_valid;
468
469         req->rq_replen = lustre_msg_size(3, repsize);
470         req->rq_commit_cb = mdc_commit_close;
471         LASSERT(req->rq_cb_data == NULL);
472         req->rq_cb_data = mod;
473
474         /* We hand a ref to the rpcd here, so we need another one of our own. */
475         ptlrpc_request_addref(req);
476
477         mdc_get_rpc_lock(rpc_lock, NULL);
478         req->rq_interpret_reply = mdc_close_interpret;
479         req->rq_async_args.pointer_arg[0] = rpc_lock;
480         ptlrpcd_add_req(req);
481         lwi = LWI_TIMEOUT_INTR(MAX(req->rq_timeout * HZ, 1), go_back_to_sleep,
482                                NULL, NULL);
483         rc = l_wait_event(req->rq_reply_waitq, mdc_close_check_reply(req),
484                           &lwi);
485         
486         if (mod == NULL && rc == 0)
487                 CERROR("Unexpected: can't find mdc_open_data, but the close "
488                        "succeeded.  Please tell CFS.\n");
489
490         if (rc == 0) {
491                 rc = req->rq_repmsg->status;
492                 if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
493                         DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR");
494                         if (rc > 0)
495                                 rc = -rc;
496                 }
497         }
498
499         EXIT;
500  out:
501         *request = req;
502         return rc;
503 }
504
505 int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
506 {
507         struct ptlrpc_request *req;
508         struct mds_body *body;
509         int rc, size = sizeof(*body);
510         ENTRY;
511
512         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_DONE_WRITING, 1,
513                               &size, NULL);
514         if (req == NULL)
515                 RETURN(-ENOMEM);
516
517         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
518         mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode);
519         body->size = obdo->o_size;
520         body->blocks = obdo->o_blocks;
521         body->flags = obdo->o_flags;
522         body->valid = obdo->o_valid;
523 //        memcpy(&body->handle, &och->och_fh, sizeof(body->handle));
524
525         req->rq_replen = lustre_msg_size(1, &size);
526
527         rc = ptlrpc_queue_wait(req);
528         ptlrpc_req_finished(req);
529         RETURN(rc);
530 }
531
532 int mdc_readpage(struct obd_export *exp, struct ll_fid *mdc_fid, __u64 offset,
533                  struct page *page, struct ptlrpc_request **request)
534 {
535         struct obd_import *imp = class_exp2cliimp(exp);
536         struct ptlrpc_request *req = NULL;
537         struct ptlrpc_bulk_desc *desc = NULL;
538         struct mds_body *body;
539         int rc, size = sizeof(*body);
540         ENTRY;
541
542         CDEBUG(D_INODE, "inode: %ld\n", (long)mdc_fid->id);
543
544         req = ptlrpc_prep_req(imp, MDS_READPAGE, 1, &size, NULL);
545         if (req == NULL)
546                 GOTO(out, rc = -ENOMEM);
547         /* XXX FIXME bug 249 */
548         req->rq_request_portal = MDS_READPAGE_PORTAL;
549
550         desc = ptlrpc_prep_bulk_imp(req, BULK_PUT_SINK, MDS_BULK_PORTAL);
551         if (desc == NULL)
552                 GOTO(out, rc = -ENOMEM);
553         /* NB req now owns desc and will free it when it gets freed */
554
555         rc = ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
556         if (rc != 0)
557                 GOTO(out, rc);
558
559         mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE, mdc_fid);
560
561         req->rq_replen = lustre_msg_size(1, &size);
562         rc = ptlrpc_queue_wait(req);
563
564         if (rc == 0) {
565                 LASSERT(desc->bd_page_count == 1);
566                 body = lustre_swab_repbuf(req, 0, sizeof(*body),
567                                           lustre_swab_mds_body);
568                 if (body == NULL) {
569                         CERROR("Can't unpack mds_body\n");
570                         GOTO(out, rc = -EPROTO);
571                 }
572         }
573
574         EXIT;
575  out:
576         *request = req;
577         return rc;
578 }
579
580 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
581                          void *karg, void *uarg)
582 {
583         struct obd_device *obd = exp->exp_obd;
584         struct obd_ioctl_data *data = karg;
585         struct obd_import *imp = obd->u.cli.cl_import;
586         struct llog_ctxt *ctxt;
587         int rc;
588         ENTRY;
589         
590         switch (cmd) {
591         case OBD_IOC_CLIENT_RECOVER:
592                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
593                 if (rc < 0)
594                         RETURN(rc);
595                 RETURN(0);
596         case IOC_OSC_SET_ACTIVE:
597                 RETURN(ptlrpc_set_import_active(imp, data->ioc_offset));
598         case OBD_IOC_PARSE: {
599                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
600                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
601                 RETURN(rc);
602         }
603 #ifdef __KERNEL__
604         case OBD_IOC_LLOG_INFO:
605         case OBD_IOC_LLOG_PRINT: {
606                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
607                 rc = llog_ioctl(ctxt, cmd, data);
608                 
609                 RETURN(rc);
610         }
611 #endif
612         default:
613                 CERROR("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
614                 RETURN(-ENOTTY);
615         }
616 }
617
618 static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
619                       unsigned long max_age)
620 {
621         struct ptlrpc_request *req;
622         struct obd_statfs *msfs;
623         int rc, size = sizeof(*msfs);
624         ENTRY;
625
626         /* We could possibly pass max_age in the request (as an absolute
627          * timestamp or a "seconds.usec ago") so the target can avoid doing
628          * extra calls into the filesystem if that isn't necessary (e.g.
629          * during mount that would help a bit).  Having relative timestamps
630          * is not so great if request processing is slow, while absolute
631          * timestamps are not ideal because they need time synchronization. */
632         req = ptlrpc_prep_req(obd->u.cli.cl_import, MDS_STATFS, 0, NULL, NULL);
633         if (!req)
634                 RETURN(-ENOMEM);
635
636         req->rq_replen = lustre_msg_size(1, &size);
637
638         mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
639         rc = ptlrpc_queue_wait(req);
640         mdc_put_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
641
642         if (rc)
643                 GOTO(out, rc);
644
645         msfs = lustre_swab_repbuf(req, 0, sizeof(*msfs),lustre_swab_obd_statfs);
646         if (msfs == NULL) {
647                 CERROR("Can't unpack obd_statfs\n");
648                 GOTO(out, rc = -EPROTO);
649         }
650
651         memcpy(osfs, msfs, sizeof (*msfs));
652         EXIT;
653 out:
654         ptlrpc_req_finished(req);
655
656         return rc;
657 }
658
659 static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type,
660                    struct obd_client_handle *handle, int flag)
661 {
662         struct ptlrpc_request *req;
663         struct mds_body *body;
664         int rc, size = sizeof(*body);
665         ENTRY;
666
667         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_PIN, 1, &size, NULL);
668         if (req == NULL)
669                 RETURN(-ENOMEM);
670
671         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
672         mdc_pack_fid(&body->fid1, ino, gen, type);
673         body->flags = flag;
674
675         req->rq_replen = lustre_msg_size(1, &size);
676
677         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
678         rc = ptlrpc_queue_wait(req);
679         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
680         if (rc) {
681                 CERROR("pin failed: %d\n", rc);
682                 ptlrpc_req_finished(req);
683                 RETURN(rc);
684         }
685
686         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
687         if (body == NULL) {
688                 ptlrpc_req_finished(req);
689                 RETURN(rc);
690         }
691
692         memcpy(&handle->och_fh, &body->handle, sizeof(body->handle));
693         handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
694
695         OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
696         if (handle->och_mod == NULL) {
697                 DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
698                 RETURN(-ENOMEM);
699         }
700         handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
701
702         RETURN(rc);
703 }
704
705 static int mdc_unpin(struct obd_export *exp,
706                      struct obd_client_handle *handle, int flag)
707 {
708         struct ptlrpc_request *req;
709         struct mds_body *body;
710         int rc, size = sizeof(*body);
711         ENTRY;
712
713         if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
714                 RETURN(0);
715
716         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &size, NULL);
717         if (req == NULL)
718                 RETURN(-ENOMEM);
719
720         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
721         memcpy(&body->handle, &handle->och_fh, sizeof(body->handle));
722         body->flags = flag;
723
724         req->rq_replen = lustre_msg_size(0, NULL);
725         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
726         rc = ptlrpc_queue_wait(req);
727         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
728
729         if (rc != 0)
730                 CERROR("unpin failed: %d\n", rc);
731
732         ptlrpc_req_finished(req);
733         ptlrpc_req_finished(handle->och_mod->mod_open_req);
734         OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
735         RETURN(rc);
736 }
737
738 int mdc_sync(struct obd_export *exp, struct ll_fid *fid,
739              struct ptlrpc_request **request)
740 {
741         struct ptlrpc_request *req;
742         struct mds_body *body;
743         int size = sizeof(*body);
744         int rc;
745         ENTRY;
746
747         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_SYNC, 1,&size,NULL);
748         if (!req)
749                 RETURN(rc = -ENOMEM);
750
751         if (fid) {
752                 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
753                 memcpy(&body->fid1, fid, sizeof(*fid));
754                 mdc_pack_req_body(req);
755         }
756
757         req->rq_replen = lustre_msg_size(1, &size);
758
759         rc = ptlrpc_queue_wait(req);
760         if (rc || request == NULL)
761                 ptlrpc_req_finished(req);
762         else
763                 *request = req;
764
765         RETURN(rc);
766 }
767
768 static int mdc_attach(struct obd_device *dev, obd_count len, void *data)
769 {
770         struct lprocfs_static_vars lvars;
771
772         lprocfs_init_vars(mdc, &lvars);
773         return lprocfs_obd_attach(dev, lvars.obd_vars);
774 }
775
/*
 * o_detach method: counterpart of mdc_attach().
 *
 * Returns the result of lprocfs_obd_detach().
 */
static int mdc_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
780
781 static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
782 {
783         struct client_obd *cli = &obd->u.cli;
784         int rc;
785         ENTRY;
786
787         OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
788         if (!cli->cl_rpc_lock)
789                 RETURN(-ENOMEM);
790         mdc_init_rpc_lock(cli->cl_rpc_lock);
791
792         ptlrpcd_addref();
793
794         OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
795         if (!cli->cl_setattr_lock)
796                 GOTO(out_free_rpc, rc = -ENOMEM);
797         mdc_init_rpc_lock(cli->cl_setattr_lock);
798
799         rc = client_obd_setup(obd, len, buf);
800         if (rc) {
801                 OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
802  out_free_rpc:
803                 OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
804         }
805
806         RETURN(rc);
807 }
808
809
810 int mdc_postsetup(struct obd_device *obd) 
811 {
812         int rc;
813         rc = obd_llog_init(obd, obd, 0, NULL);
814         if (rc) {
815                 CERROR("failed to setup llogging subsystems\n");
816         }
817         RETURN(rc);
818 }
819
820 int mdc_init_ea_size(struct obd_device *obd, char *lov_name)
821 {
822         struct client_obd *cli = &obd->u.cli;
823         struct obd_device *lov_obd;
824         struct obd_export *exp;
825         struct lustre_handle conn;
826         struct lov_desc desc;
827         int valsize;
828         int rc;
829
830         lov_obd = class_name2obd(lov_name);
831         if (!lov_obd) {
832                 CERROR("MDC cannot locate LOV %s!\n", lov_name);
833                 RETURN(-ENOTCONN);
834         }
835
836         rc = obd_connect(&conn, lov_obd, &obd->obd_uuid);
837         if (rc) {
838                 CERROR("MDC failed connect to LOV %s (%d)\n", lov_name, rc);
839                 RETURN(rc);
840         }
841         exp = class_conn2export(&conn);
842
843         valsize = sizeof(desc);
844         rc = obd_get_info(exp, strlen("lovdesc") + 1, "lovdesc",
845                           &valsize, &desc);
846         if (rc == 0) {
847                 cli->cl_max_mds_easize = obd_size_diskmd(exp, NULL);
848                 cli->cl_max_mds_cookiesize = desc.ld_tgt_count *
849                         sizeof(struct llog_cookie);
850         }
851         obd_disconnect(exp, 0);
852         RETURN(rc);
853 }
854
855 static int mdc_precleanup(struct obd_device *obd, int flags)
856 {
857         int rc = 0;
858         
859         rc = obd_llog_finish(obd, 0);
860         if (rc != 0)
861                 CERROR("failed to cleanup llogging subsystems\n");
862
863         RETURN(rc);
864 }
865
866 static int mdc_cleanup(struct obd_device *obd, int flags)
867 {
868         struct client_obd *cli = &obd->u.cli;
869  
870         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
871         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
872
873         ptlrpcd_decref();
874
875         return client_obd_cleanup(obd, flags);
876 }
877
878
879 static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
880                          int count, struct llog_logid *logid)
881 {
882         struct llog_ctxt *ctxt;
883         int rc;
884         ENTRY;
885
886         rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
887                         &llog_client_ops);
888         if (rc == 0) {
889                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
890                 ctxt->loc_imp = obd->u.cli.cl_import; 
891         }
892
893         RETURN(rc);
894 }
895
896 static int mdc_llog_finish(struct obd_device *obd, int count)
897 {
898         int rc;
899         ENTRY;
900
901         rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT));
902         RETURN(rc);
903 }
904
905 struct obd_ops mdc_obd_ops = {
906         o_owner:       THIS_MODULE,
907         o_attach:      mdc_attach,
908         o_detach:      mdc_detach,
909         o_setup:       mdc_setup,
910         o_postsetup:   mdc_postsetup,
911         o_precleanup:  mdc_precleanup,
912         o_cleanup:     mdc_cleanup,
913         o_connect:     client_connect_import,
914         o_disconnect:  client_disconnect_export,
915         o_iocontrol:   mdc_iocontrol,
916         o_statfs:      mdc_statfs,
917         o_pin:         mdc_pin,
918         o_unpin:       mdc_unpin,
919         o_llog_init:   mdc_llog_init,
920         o_llog_finish: mdc_llog_finish,
921 };
922
923 int __init mdc_init(void)
924 {
925         struct lprocfs_static_vars lvars;
926         lprocfs_init_vars(mdc, &lvars);
927         return class_register_type(&mdc_obd_ops, lvars.module_vars,
928                                    LUSTRE_MDC_NAME);
929 }
930
931 static void /*__exit*/ mdc_exit(void)
932 {
933         class_unregister_type(LUSTRE_MDC_NAME);
934 }
935
#ifdef __KERNEL__
/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Metadata Client");
MODULE_LICENSE("GPL");

/* Exported entry points that are defined in this file. */
EXPORT_SYMBOL(mdc_getstatus);
EXPORT_SYMBOL(mdc_getattr);
EXPORT_SYMBOL(mdc_getattr_name);
EXPORT_SYMBOL(mdc_req2lustre_md);
EXPORT_SYMBOL(mdc_store_inode_generation);
EXPORT_SYMBOL(mdc_set_open_replay_data);
EXPORT_SYMBOL(mdc_clear_open_replay_data);
EXPORT_SYMBOL(mdc_close);
EXPORT_SYMBOL(mdc_done_writing);
EXPORT_SYMBOL(mdc_readpage);
EXPORT_SYMBOL(mdc_sync);
EXPORT_SYMBOL(mdc_init_ea_size);

/* Exported entry points whose definitions are not in this file. */
EXPORT_SYMBOL(mdc_change_cbdata);
EXPORT_SYMBOL(mdc_create);
EXPORT_SYMBOL(mdc_unlink);
EXPORT_SYMBOL(mdc_rename);
EXPORT_SYMBOL(mdc_link);
EXPORT_SYMBOL(mdc_setattr);

/* Module load/unload hooks. */
module_init(mdc_init);
module_exit(mdc_exit);
#endif