Whamcloud - gitweb
21dd24501728fadf6c985f609985c6f290645625
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2004 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_MDC
26
27 #ifdef __KERNEL__
28 # include <linux/module.h>
29 # include <linux/pagemap.h>
30 # include <linux/miscdevice.h>
31 # include <linux/init.h>
32 #else
33 # include <liblustre.h>
34 #endif
35
36 #include <linux/obd_class.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/lustre_sec.h>
40 #include <linux/lprocfs_status.h>
41 #include <linux/lustre_acl.h>
42 #include "mdc_internal.h"
43
44 #define REQUEST_MINOR 244
45
46 static int mdc_cleanup(struct obd_device *obd, int flags);
47
48 extern int mds_queue_req(struct ptlrpc_request *);
49 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
50 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
51 static int send_getstatus(struct obd_import *imp, struct lustre_id *rootid,
52                           int level, int msg_flags)
53 {
54         struct ptlrpc_request *req;
55         struct mds_body *body;
56         int rc, size[2] = {0, sizeof(*body)};
57         ENTRY;
58
59         //size[0] = lustre_secdesc_size();
60
61         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS,
62                               2, size, NULL);
63         if (!req)
64                 GOTO(out, rc = -ENOMEM);
65
66         //lustre_pack_secdesc(req, size[0]);
67
68         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
69         req->rq_send_state = level;
70         req->rq_replen = lustre_msg_size(1, &size[1]);
71
72         req->rq_reqmsg->flags |= msg_flags;
73         rc = ptlrpc_queue_wait(req);
74
75         if (!rc) {
76                 body = lustre_swab_repbuf (req, 0, sizeof (*body),
77                                            lustre_swab_mds_body);
78                 if (body == NULL) {
79                         CERROR ("Can't extract mds_body\n");
80                         GOTO (out, rc = -EPROTO);
81                 }
82
83                 memcpy(rootid, &body->id1, sizeof(*rootid));
84
85                 CDEBUG(D_NET, "root ino="LPU64", last_committed="LPU64
86                        ", last_xid="LPU64"\n", rootid->li_stc.u.e3s.l3s_ino,
87                        req->rq_repmsg->last_committed, req->rq_repmsg->last_xid);
88         }
89
90         EXIT;
91  out:
92         ptlrpc_req_finished(req);
93         return rc;
94 }
95
96 /* This should be mdc_get_info("rootid") */
97 int mdc_getstatus(struct obd_export *exp, struct lustre_id *rootid)
98 {
99         return send_getstatus(class_exp2cliimp(exp), rootid,
100                               LUSTRE_IMP_FULL, 0);
101 }
102
103 int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
104                        struct ptlrpc_request *req)
105 {
106         struct mds_body *body, *reqbody;
107         void            *eadata;
108         int              rc;
109         int              repsize[2] = {sizeof(*body)};
110         int              bufcount = 1;
111         ENTRY;
112
113         /* request message already built */
114
115         if (ea_size != 0) {
116                 repsize[bufcount++] = ea_size;
117                 CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
118                        ea_size);
119         }
120
121         reqbody = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*reqbody));
122         LASSERT(!(reqbody->valid & OBD_MD_FLACL));
123
124         req->rq_replen = lustre_msg_size(bufcount, repsize);
125
126         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
127         rc = ptlrpc_queue_wait(req);
128         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
129         if (rc != 0)
130                 RETURN (rc);
131
132         body = lustre_swab_repbuf (req, 0, sizeof (*body),
133                                    lustre_swab_mds_body);
134         if (body == NULL) {
135                 CERROR ("Can't unpack mds_body\n");
136                 RETURN (-EPROTO);
137         }
138
139         CDEBUG(D_NET, "mode: %o\n", body->mode);
140
141         LASSERT_REPSWAB (req, 1);
142
143         /* Skip the check if getxattr/listxattr are called with no buffers */
144         if ((reqbody->eadatasize != 0) &&
145             !(reqbody->valid & (OBD_MD_FLXATTR | OBD_MD_FLXATTRLIST))) {
146                 /* reply indicates presence of eadata; check it's there... */
147                 eadata = lustre_msg_buf (req->rq_repmsg, 1,
148                                          body->eadatasize);
149                 if (eadata == NULL) {
150                         CERROR ("Missing/short eadata\n");
151                         RETURN (-EPROTO);
152                 }
153         }
154
155         RETURN (0);
156 }
157
158 static int mdc_cancel_unused(struct obd_export *exp,
159                              struct lov_stripe_md *lsm, 
160                              int flags, void *opaque)
161 {
162         struct obd_device *obd = class_exp2obd(exp);
163
164         ENTRY;
165         RETURN(ldlm_cli_cancel_unused(obd->obd_namespace,
166                                       NULL, flags, opaque));
167 }
168
169 int mdc_getattr(struct obd_export *exp, struct lustre_id *id,
170                 __u64 valid, const char *xattr_name,
171                 const void *xattr_data, unsigned int xattr_datalen,
172                 unsigned int ea_size, struct ptlrpc_request **request)
173 {
174         struct ptlrpc_request *req;
175         struct mds_body *body;
176         int xattr_namelen = xattr_name ? strlen(xattr_name) + 1 : 0;
177         int size[4] = {0, sizeof(*body)};
178         int bufcount = 2;
179         int rc;
180         ENTRY;
181
182         size[0] = lustre_secdesc_size();
183
184         if (valid & OBD_MD_FLXATTR) {
185                 size[bufcount++] = xattr_namelen;
186
187                 if (xattr_datalen > 0) {
188                         LASSERT(xattr_data);
189                         size[bufcount++] = xattr_datalen;
190                 }
191         } else
192                 LASSERT(!xattr_data && !xattr_datalen);
193
194         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
195                               MDS_GETATTR, bufcount, size, NULL);
196         if (!req)
197                 GOTO(out, rc = -ENOMEM);
198
199         lustre_pack_secdesc(req, size[0]);
200
201         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
202         memcpy(&body->id1, id, sizeof(*id));
203         body->valid = valid;
204         body->eadatasize = ea_size;
205
206         if (valid & OBD_MD_FLXATTR) {
207                 memcpy(lustre_msg_buf(req->rq_reqmsg, 2, xattr_namelen),
208                        xattr_name, xattr_namelen);
209                 if (xattr_datalen)
210                         memcpy(lustre_msg_buf(req->rq_reqmsg, 3, xattr_datalen),
211                                xattr_data, xattr_datalen);
212         }
213
214         rc = mdc_getattr_common(exp, ea_size, req);
215         if (rc != 0) {
216                 ptlrpc_req_finished (req);
217                 req = NULL;
218         }
219  out:
220         *request = req;
221         RETURN (rc);
222 }
223
224 int mdc_access_check(struct obd_export *exp, struct lustre_id *id,
225                      struct ptlrpc_request **request)
226
227 {
228         struct ptlrpc_request *req;
229         struct mds_body *body;
230         int size[2] = {0, sizeof(*body)};
231         int rc;
232         ENTRY;
233
234         size[0] = lustre_secdesc_size();
235         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
236                               MDS_ACCESS_CHECK, 2, size, NULL);
237         if (!req)
238                 GOTO(out, rc = -ENOMEM);
239
240         lustre_pack_secdesc(req, size[0]);
241         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
242         memcpy(&body->id1, id, sizeof(*id));
243
244         req->rq_replen = lustre_msg_size(2, size);
245         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
246         rc = ptlrpc_queue_wait(req);
247         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
248         if (rc != 0) {
249                 ptlrpc_req_finished (req);
250                 req = NULL;
251         } else {
252                 body = lustre_swab_repbuf (req, 0, sizeof (*body),
253                                            lustre_swab_mds_body);
254                 if (body == NULL) {
255                         CERROR ("Can't unpack mds_body\n");
256                         RETURN (-EPROTO);
257                 }
258         }
259
260  out:
261         *request = req;
262         RETURN (rc);
263 }
264
265 int mdc_getattr_lock(struct obd_export *exp, struct lustre_id *id,
266                      char *filename, int namelen, __u64 valid,
267                      unsigned int ea_size, struct ptlrpc_request **request)
268 {
269         struct ptlrpc_request *req;
270         struct mds_body *body;
271         int rc, size[3] = {0, sizeof(*body), namelen};
272         ENTRY;
273
274         size[0] = lustre_secdesc_size();
275
276         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
277                               MDS_GETATTR_LOCK, 3, size, NULL);
278         if (!req)
279                 GOTO(out, rc = -ENOMEM);
280
281         lustre_pack_secdesc(req, size[0]);
282
283         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body));
284         memcpy(&body->id1, id, sizeof(*id));
285         body->valid = valid;
286         body->eadatasize = ea_size;
287
288         if (filename != NULL) {
289                 LASSERT (strnlen (filename, namelen) == namelen - 1);
290                 memcpy(lustre_msg_buf(req->rq_reqmsg, 2, namelen),
291                        filename, namelen);
292         } else {
293                 LASSERT(namelen == 1);
294         }
295
296         rc = mdc_getattr_common(exp, ea_size, req);
297         if (rc != 0) {
298                 ptlrpc_req_finished (req);
299                 req = NULL;
300         }
301  out:
302         *request = req;
303         RETURN(rc);
304 }
305
306 /* This should be called with both the request and the reply still packed. */
307 int mdc_store_inode_generation(struct obd_export *exp,
308                                struct ptlrpc_request *req,
309                                int reqoff, int repoff)
310 {
311         struct mds_rec_create *rec =
312                 lustre_msg_buf(req->rq_reqmsg, reqoff, sizeof(*rec));
313         struct mds_body *body =
314                 lustre_msg_buf(req->rq_repmsg, repoff, sizeof(*body));
315
316         LASSERT (rec != NULL);
317         LASSERT (body != NULL);
318
319         memcpy(&rec->cr_replayid, &body->id1, sizeof(rec->cr_replayid));
320         DEBUG_REQ(D_HA, req, "storing generation for ino "DLID4,
321                   OLID4(&rec->cr_replayid));
322         return 0;
323 }
324
325 int mdc_req2lustre_md(struct obd_export *exp_lmv, struct ptlrpc_request *req, 
326                       unsigned int offset, struct obd_export *exp_lov, 
327                       struct lustre_md *md)
328 {
329         struct lov_mds_md *lmm;
330         struct posix_acl *acl;
331         struct mds_remote_perm *perm;
332         void *buf;
333         int size, acl_off;
334         int rc = 0;
335         ENTRY;
336
337         LASSERT(md != NULL);
338         memset(md, 0, sizeof(*md));
339
340         md->body = lustre_msg_buf(req->rq_repmsg, offset,
341                                   sizeof(*md->body));
342         if (!md->body)
343                 RETURN(-ENOMEM);
344
345         LASSERT_REPSWABBED(req, offset);
346
347         if (!(md->body->valid & OBD_MD_FLEASIZE) &&
348             !(md->body->valid & OBD_MD_FLDIREA))
349                 RETURN(0);
350
351         if (S_ISREG(md->body->mode)) {
352                 if (md->body->eadatasize == 0) {
353                         CERROR("invalid EA size (0) is detected\n");
354                         RETURN(-EPROTO);
355                 }
356
357                 lmm = lustre_msg_buf(req->rq_repmsg, offset + 1,
358                                      md->body->eadatasize);
359                 if (!lmm)
360                         RETURN(-EINVAL);
361
362                 LASSERT(exp_lov != NULL);
363                 
364                 rc = obd_unpackmd(exp_lov, &md->lsm, lmm,
365                                   md->body->eadatasize);
366                 if (rc > 0) {
367                         LASSERT(rc >= sizeof(*md->lsm));
368                         rc = 0;
369                 }
370         } else if (S_ISDIR(md->body->mode)) {
371                 /* dir can be non-splitted */
372                 if (md->body->eadatasize == 0)
373                         RETURN(0);
374
375                 lmm = lustre_msg_buf(req->rq_repmsg, offset + 1,
376                                      md->body->eadatasize);
377                 if (!lmm)
378                         RETURN(-EINVAL);
379
380                 if (md->body->valid & OBD_MD_MEA) {
381                         LASSERT(exp_lmv != NULL);
382                 
383                         rc = obd_unpackmd(exp_lmv, (void *)&md->mea,
384                                           lmm, md->body->eadatasize);
385                         if (rc > 0) {
386                                 LASSERT(rc >= sizeof(*md->mea));
387                                 rc = 0;
388                         }
389                 }
390         } else {
391                 LASSERT(S_ISCHR(md->body->mode) ||
392                         S_ISBLK(md->body->mode) ||
393                         S_ISFIFO(md->body->mode)||
394                         S_ISLNK(md->body->mode) ||
395                         S_ISSOCK(md->body->mode));
396         }
397
398         /* if anything wrong when unpacking md, we don't check acl
399          * stuff, for simplicity
400          */
401         if (rc)
402                 RETURN(rc);
403
404         if (md->body->valid & OBD_MD_FLACL) {
405                 acl_off = (md->body->valid & OBD_MD_FLEASIZE) ?
406                                 (offset + 2) : (offset + 1);
407
408                 if (md->body->valid & OBD_MD_FLRMTACL) {
409
410                         buf = lustre_swab_repbuf(req, acl_off, sizeof(*perm),
411                                                  lustre_swab_remote_perm);
412                         if (buf == NULL) {
413                                 CERROR("Can't unpack remote perm\n");
414                                 RETURN(0);
415                         }
416
417                         OBD_ALLOC(perm, sizeof(*perm));
418                         if (!perm)
419                                 RETURN(0);
420                         memcpy(perm, buf, sizeof(*perm));
421
422                         md->remote_perm = perm;
423                 } else {
424                         size = le32_to_cpu(*(__u32 *) lustre_msg_buf(
425                                            req->rq_repmsg, acl_off, 4));
426                         buf = lustre_msg_buf(req->rq_repmsg, acl_off + 1, size);
427
428                         acl = posix_acl_from_xattr(buf, size);
429                         if (IS_ERR(acl)) {
430                                 rc = PTR_ERR(acl);
431                                 CERROR("convert xattr to acl failed: %d\n", rc);
432                                 RETURN(0);
433                         } else if (acl) {
434                                 rc = posix_acl_valid(acl);
435                                 if (rc) {
436                                         CERROR("acl valid error: %d\n", rc);
437                                         posix_acl_release(acl);
438                                         RETURN(0);
439                                 }
440                         }
441
442                         md->posix_acl = acl;
443                 }
444         }
445
446         RETURN(rc);
447 }
448
449 static void mdc_commit_open(struct ptlrpc_request *req)
450 {
451         struct mdc_open_data *mod = req->rq_cb_data;
452         if (mod == NULL)
453                 return;
454
455         if (mod->mod_close_req != NULL)
456                 mod->mod_close_req->rq_cb_data = NULL;
457
458         if (mod->mod_och != NULL)
459                 mod->mod_och->och_mod = NULL;
460
461         OBD_FREE(mod, sizeof(*mod));
462         req->rq_cb_data = NULL;
463         LASSERT(atomic_read(&req->rq_refcount) > 1);
464         ptlrpc_req_finished(req);
465 }
466
467 static void mdc_replay_open(struct ptlrpc_request *req)
468 {
469         struct mdc_open_data *mod = req->rq_cb_data;
470         struct obd_client_handle *och;
471         struct ptlrpc_request *close_req;
472         struct lustre_handle old;
473         struct mds_body *body;
474         ENTRY;
475
476         body = lustre_swab_repbuf(req, 1, sizeof(*body), lustre_swab_mds_body);
477         LASSERT (body != NULL);
478
479         if (mod == NULL) {
480                 DEBUG_REQ(D_ERROR, req,
481                           "can't properly replay without open data");
482                 EXIT;
483                 return;
484         }
485
486         och = mod->mod_och;
487         if (och != NULL) {
488                 struct lustre_handle *file_fh;
489                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
490                 file_fh = &och->och_fh;
491                 CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
492                        file_fh->cookie, body->handle.cookie);
493                 memcpy(&old, file_fh, sizeof(old));
494                 memcpy(file_fh, &body->handle, sizeof(*file_fh));
495         }
496
497         close_req = mod->mod_close_req;
498         if (close_req != NULL) {
499                 struct mds_body *close_body;
500                 LASSERT(close_req->rq_reqmsg->opc == MDS_CLOSE);
501                 close_body = lustre_msg_buf(close_req->rq_reqmsg,
502                                             MDS_REQ_REC_OFF,
503                                             sizeof(*close_body));
504                 if (och != NULL)
505                         LASSERT(!memcmp(&old, &close_body->handle, sizeof old));
506                 DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
507                 memcpy(&close_body->handle, &body->handle,
508                        sizeof(close_body->handle));
509         }
510
511         EXIT;
512 }
513
514 int mdc_set_open_replay_data(struct obd_export *exp,
515                              struct obd_client_handle *och,
516                              struct ptlrpc_request *open_req)
517 {
518         struct mdc_open_data *mod;
519         struct mds_rec_create *rec;
520         struct mds_body *body;
521
522         rec = lustre_msg_buf(open_req->rq_reqmsg, MDS_REQ_INTENT_REC_OFF,
523                              sizeof(*rec));
524         body = lustre_msg_buf(open_req->rq_repmsg, 1, sizeof(*body));
525
526         LASSERT(rec != NULL);
527         /* outgoing messages always in my byte order */
528         LASSERT(body != NULL);
529         /* incoming message in my byte order (it's been swabbed) */
530         LASSERT_REPSWABBED(open_req, 1);
531
532         OBD_ALLOC(mod, sizeof(*mod));
533         if (mod == NULL) {
534                 DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data");
535                 return 0;
536         }
537
538         och->och_mod = mod;
539         mod->mod_och = och;
540         mod->mod_open_req = ptlrpc_request_addref(open_req);
541
542         memcpy(&rec->cr_replayid, &body->id1, sizeof rec->cr_replayid);
543         open_req->rq_replay_cb = mdc_replay_open;
544         open_req->rq_commit_cb = mdc_commit_open;
545         open_req->rq_cb_data = mod;
546         DEBUG_REQ(D_HA, open_req, "set up replay data");
547         return 0;
548 }
549
550 int mdc_clear_open_replay_data(struct obd_export *exp,
551                                struct obd_client_handle *och)
552 {
553         struct mdc_open_data *mod = och->och_mod;
554
555         /* Don't free the structure now (it happens in mdc_commit_open, after
556          * we're sure we won't need to fix up the close request in the future),
557          * but make sure that replay doesn't poke at the och, which is about to
558          * be freed. */
559         LASSERT(mod != LP_POISON);
560         if (mod != NULL)
561                 mod->mod_och = NULL;
562         och->och_mod = NULL;
563         return 0;
564 }
565
566 static void mdc_commit_close(struct ptlrpc_request *req)
567 {
568         struct mdc_open_data *mod = req->rq_cb_data;
569         struct ptlrpc_request *open_req;
570         struct obd_import *imp = req->rq_import;
571
572         DEBUG_REQ(D_HA, req, "close req committed");
573         if (mod == NULL)
574                 return;
575
576         mod->mod_close_req = NULL;
577         req->rq_cb_data = NULL;
578         req->rq_commit_cb = NULL;
579
580         open_req = mod->mod_open_req;
581         LASSERT(open_req != NULL);
582         LASSERT(open_req != LP_POISON);
583         LASSERT(open_req->rq_type != LI_POISON);
584
585         DEBUG_REQ(D_HA, open_req, "open req balanced");
586         if (open_req->rq_transno == 0) {
587                 DEBUG_REQ(D_ERROR, open_req, "BUG 3892  open");
588                 DEBUG_REQ(D_ERROR, req, "BUG 3892 close");
589                 LASSERTF(open_req->rq_transno != 0, "BUG 3892\n");
590         }
591         LASSERT(open_req->rq_import == imp);
592
593         /* We no longer want to preserve this for transno-unconditional
594          * replay. */
595         spin_lock(&open_req->rq_lock);
596         open_req->rq_replay = 0;
597         spin_unlock(&open_req->rq_lock);
598 }
599
600 int mdc_close(struct obd_export *exp, struct obdo *oa,
601               struct obd_client_handle *och,
602               struct ptlrpc_request **request)
603 {
604         struct obd_device *obd = class_exp2obd(exp);
605         struct obd_import *imp = class_exp2cliimp(exp);
606         int reqsize[3] = {0, sizeof(struct mds_body),
607                           obd->u.cli.cl_max_mds_cookiesize};
608         int rc, repsize[3] = {sizeof(struct mds_body),
609                               obd->u.cli.cl_max_mds_easize,
610                               obd->u.cli.cl_max_mds_cookiesize};
611         struct ptlrpc_request *req;
612         struct mdc_open_data *mod;
613         ENTRY;
614
615         if (imp->imp_connection == NULL) {
616                 CERROR("request on not connected import %s\n",
617                         imp->imp_obd->obd_name);
618                 RETURN(-EIO);
619         }
620
621         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
622                               MDS_CLOSE, 3, reqsize, NULL);
623         if (req == NULL)
624                 GOTO(out, rc = -ENOMEM);
625
626         //reqsize[0] = lustre_secdesc_size();
627         //lustre_pack_secdesc(req, reqsize[0]);
628
629         /* Ensure that this close's handle is fixed up during replay. */
630         LASSERT(och != NULL);
631         mod = och->och_mod;
632         if (likely(mod != NULL)) {
633                 mod->mod_close_req = req;
634                 LASSERT(mod->mod_open_req->rq_type != LI_POISON);
635                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
636         } else {
637                 CDEBUG(D_HA, "couldn't find open req; "
638                        "expecting close error\n");
639         }
640
641         mdc_close_pack(req, 1, oa, oa->o_valid, och);
642
643         req->rq_replen = lustre_msg_size(3, repsize);
644         req->rq_commit_cb = mdc_commit_close;
645         LASSERT(req->rq_cb_data == NULL);
646         req->rq_cb_data = mod;
647
648         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
649         rc = ptlrpc_queue_wait(req);
650         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
651
652         if (req->rq_repmsg == NULL) {
653                 CDEBUG(D_HA, "request failed to send: %p, %d\n", req,
654                        req->rq_status);
655                 if (rc == 0)
656                         rc = req->rq_status ? req->rq_status : -EIO;
657         } else if (rc == 0) {
658                 rc = req->rq_repmsg->status;
659                 if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
660                         DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, "
661                                   "err = %d", rc);
662                         if (rc > 0)
663                                 rc = -rc;
664                 } else {
665                         if (mod == NULL)
666                                 CERROR("Unexpected: can't find mdc_open_data, but "
667                                        "close succeeded. Please tell CFS.\n");
668                         if (!lustre_swab_repbuf(req, 0, sizeof(struct mds_body),
669                                                 lustre_swab_mds_body))
670                         {
671                                 CERROR("Error unpacking mds_body\n");
672                                 rc = -EPROTO;
673                         }
674                 }
675         }
676
677         EXIT;
678  out:
679         *request = req;
680         return rc;
681 }
682
683 int mdc_done_writing(struct obd_export *exp, struct obdo *obdo)
684 {
685         struct ptlrpc_request *req;
686         struct mds_body *body;
687         int rc, size[2] = {0, sizeof(*body)};
688         ENTRY;
689
690         size[0] = lustre_secdesc_size();
691
692         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
693                               MDS_DONE_WRITING, 2, size, NULL);
694         if (req == NULL)
695                 RETURN(-ENOMEM);
696
697         lustre_pack_secdesc(req, size[0]);
698
699         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, 
700                               sizeof(*body));
701         
702         mdc_pack_id(&body->id1, obdo->o_id, 0, obdo->o_mode, 
703                     obdo->o_mds, obdo->o_fid);
704         
705         body->size = obdo->o_size;
706         body->blocks = obdo->o_blocks;
707         body->flags = obdo->o_flags;
708         body->valid = obdo->o_valid;
709
710         req->rq_replen = lustre_msg_size(1, &size[1]);
711
712         rc = ptlrpc_queue_wait(req);
713         ptlrpc_req_finished(req);
714         RETURN(rc);
715 }
716
717 int mdc_readpage(struct obd_export *exp,
718                  struct lustre_id *id,
719                  __u64 offset, struct page *page,
720                  struct ptlrpc_request **request)
721 {
722         struct obd_import *imp = class_exp2cliimp(exp);
723         struct ptlrpc_request *req = NULL;
724         struct ptlrpc_bulk_desc *desc = NULL;
725         struct mds_body *body;
726         int rc, size[2] = {0, sizeof(*body)};
727         ENTRY;
728
729         CDEBUG(D_INODE, "inode: %ld\n", (long)id->li_stc.u.e3s.l3s_ino);
730
731         size[0] = lustre_secdesc_size();
732
733         req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE,
734                               2, size, NULL);
735         if (req == NULL)
736                 GOTO(out, rc = -ENOMEM);
737         /* XXX FIXME bug 249 */
738         req->rq_request_portal = MDS_READPAGE_PORTAL;
739
740         lustre_pack_secdesc(req, size[0]);
741
742         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
743         if (desc == NULL)
744                 GOTO(out, rc = -ENOMEM);
745         /* NB req now owns desc and will free it when it gets freed */
746
747         ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
748         mdc_readdir_pack(req, 1, offset, PAGE_CACHE_SIZE, id);
749
750         req->rq_replen = lustre_msg_size(1, &size[1]);
751         rc = ptlrpc_queue_wait(req);
752
753         if (rc == 0) {
754                 body = lustre_swab_repbuf(req, 0, sizeof (*body),
755                                           lustre_swab_mds_body);
756                 if (body == NULL) {
757                         CERROR("Can't unpack mds_body\n");
758                         GOTO(out, rc = -EPROTO);
759                 }
760
761                 if (req->rq_bulk->bd_nob_transferred != PAGE_CACHE_SIZE) {
762                         CERROR ("Unexpected # bytes transferred: %d"
763                                 " (%ld expected)\n",
764                                 req->rq_bulk->bd_nob_transferred,
765                                 PAGE_CACHE_SIZE);
766                         GOTO (out, rc = -EPROTO);
767                 }
768         }
769
770         EXIT;
771  out:
772         *request = req;
773         return rc;
774 }
775
776 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
777                          void *karg, void *uarg)
778 {
779         struct obd_device *obd = exp->exp_obd;
780         struct obd_ioctl_data *data = karg;
781         struct obd_import *imp = obd->u.cli.cl_import;
782         struct llog_ctxt *ctxt;
783         int rc;
784         ENTRY;
785
786 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
787         MOD_INC_USE_COUNT;
788 #else
789         if (!try_module_get(THIS_MODULE)) {
790                 CERROR("Can't get module. Is it alive?");
791                 return -EINVAL;
792         }
793 #endif
794         switch (cmd) {
795         case OBD_IOC_CLIENT_RECOVER:
796                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
797                 if (rc < 0)
798                         GOTO(out, rc);
799                 GOTO(out, rc = 0);
800         case IOC_OSC_SET_ACTIVE:
801                 rc = ptlrpc_set_import_active(imp, data->ioc_offset);
802                 GOTO(out, rc);
803         case IOC_OSC_CTL_RECOVERY:
804                 rc = ptlrpc_import_control_recovery(imp, data->ioc_offset);
805                 GOTO(out, rc);
806         case OBD_IOC_PARSE: {
807                 ctxt = llog_get_context(&exp->exp_obd->obd_llogs,
808                                         LLOG_CONFIG_REPL_CTXT);
809                 rc = class_config_process_llog(ctxt, data->ioc_inlbuf1, NULL);
810                 GOTO(out, rc);
811         }
812 #ifdef __KERNEL__
813         case OBD_IOC_LLOG_INFO:
814         case OBD_IOC_LLOG_PRINT: {
815                 ctxt = llog_get_context(&obd->obd_llogs, LLOG_CONFIG_REPL_CTXT);
816                 rc = llog_ioctl(ctxt, cmd, data);
817
818                 GOTO(out, rc);
819         }
820 #endif
821         default:
822                 CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
823                 GOTO(out, rc = -ENOTTY);
824         }
825 out:
826 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
827         MOD_DEC_USE_COUNT;
828 #else
829         module_put(THIS_MODULE);
830 #endif
831
832         return rc;
833 }
834
835 int mdc_set_info(struct obd_export *exp, obd_count keylen,
836                  void *key, obd_count vallen, void *val)
837 {
838         int rc = -EINVAL;
839
840         if (keylen == strlen("initial_recov") &&
841             memcmp(key, "initial_recov", strlen("initial_recov")) == 0) {
842                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
843                 if (vallen != sizeof(int))
844                         RETURN(-EINVAL);
845                 imp->imp_initial_recov = *(int *)val;
846                 CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n",
847                        exp->exp_obd->obd_name,
848                        imp->imp_initial_recov);
849                 RETURN(0);
850         } else if (keylen >= strlen("mds_type") &&
851                    strcmp(key, "mds_type") == 0) {
852                 struct ptlrpc_request *req;
853                 char *bufs[2] = {key, val};
854                 int rc, size[2] = {keylen, vallen};
855
856                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
857                                       OST_SET_INFO, 2, size, bufs);
858                 if (req == NULL)
859                         RETURN(-ENOMEM);
860
861                 req->rq_replen = lustre_msg_size(0, NULL);
862                 rc = ptlrpc_queue_wait(req);
863                 ptlrpc_req_finished(req);
864                 RETURN(rc);
865         } else if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
866                 struct obd_import *imp = class_exp2cliimp(exp);
867                 imp->imp_server_timeout = 1;
868                 CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name);
869                 RETURN(0);
870         } else if (keylen == strlen("sec") &&
871                    memcmp(key, "sec", keylen) == 0) {
872                 struct client_obd *cli = &exp->exp_obd->u.cli;
873
874                 cli->cl_sec_flavor = ptlrpcs_name2flavor(val);
875                 if (cli->cl_sec_flavor == PTLRPCS_FLVR_INVALID) {
876                         CERROR("unrecognized security type %s\n", (char*) val);
877                         RETURN(-EINVAL);
878                 }
879
880                 RETURN(0);
881         } else if (keylen == strlen("sec_flags") &&
882                    memcmp(key, "sec_flags", keylen) == 0) {
883                 struct client_obd *cli = &exp->exp_obd->u.cli;
884
885                 cli->cl_sec_flags = *((unsigned long *) val);
886                 RETURN(0);
887         } else if (keylen == strlen("flush_cred") &&
888                    memcmp(key, "flush_cred", keylen) == 0) {
889                 struct client_obd *cli = &exp->exp_obd->u.cli;
890
891                 if (cli->cl_import)
892                         ptlrpcs_import_flush_current_creds(cli->cl_import);
893                 RETURN(0);
894         } else if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) {
895                 struct client_obd *cl = &exp->exp_obd->u.cli;
896                 if (vallen != sizeof(int))
897                         RETURN(-EINVAL);
898                 cl->cl_async = *(int *)val;
899                 CDEBUG(D_HA, "%s: set async = %d\n",
900                        exp->exp_obd->obd_name, cl->cl_async);
901                 RETURN(0);
902         }
903         RETURN(rc);
904 }
905
906 static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
907                       unsigned long max_age)
908 {
909         struct obd_statfs *msfs;
910         struct ptlrpc_request *req;
911         int rc, size = sizeof(*msfs);
912         ENTRY;
913
914         /* We could possibly pass max_age in the request (as an absolute
915          * timestamp or a "seconds.usec ago") so the target can avoid doing
916          * extra calls into the filesystem if that isn't necessary (e.g.
917          * during mount that would help a bit).  Having relative timestamps
918          * is not so great if request processing is slow, while absolute
919          * timestamps are not ideal because they need time synchronization. */
920         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_MDS_VERSION,
921                               MDS_STATFS, 0, NULL, NULL);
922         if (!req)
923                 RETURN(-ENOMEM);
924
925         req->rq_replen = lustre_msg_size(1, &size);
926
927         mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
928         rc = ptlrpc_queue_wait(req);
929         mdc_put_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
930
931         if (rc) {
932                 /* this can be LMV fake import, whcih is not connected. */
933                 if (!req->rq_import->imp_connection)
934                         memset(osfs, 0, sizeof(*osfs));
935                 GOTO(out, rc);
936         }
937
938         msfs = lustre_swab_repbuf(req, 0, sizeof(*msfs),
939                                   lustre_swab_obd_statfs);
940         if (msfs == NULL) {
941                 CERROR("Can't unpack obd_statfs\n");
942                 GOTO(out, rc = -EPROTO);
943         }
944
945         memcpy(osfs, msfs, sizeof (*msfs));
946         EXIT;
947 out:
948         ptlrpc_req_finished(req);
949         return rc;
950 }
951
952 static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type,
953                    struct obd_client_handle *handle, int flag)
954 {
955         struct ptlrpc_request *req;
956         struct mds_body *body;
957         int rc, size[2] = {0, sizeof(*body)};
958         ENTRY;
959
960         //size[0] = lustre_secdesc_size();
961
962         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
963                               MDS_PIN, 2, size, NULL);
964         if (req == NULL)
965                 RETURN(-ENOMEM);
966
967         //lustre_pack_secdesc(req, size[0]);
968
969         body = lustre_msg_buf(req->rq_reqmsg, 
970                               MDS_REQ_REC_OFF, sizeof(*body));
971
972         /* FIXME-UMKA: here should be also mdsnum and fid. */
973         mdc_pack_id(&body->id1, ino, gen, type, 0, 0);
974         body->flags = flag;
975
976         req->rq_replen = lustre_msg_size(1, &size[1]);
977
978         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
979         rc = ptlrpc_queue_wait(req);
980         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
981         if (rc) {
982                 CERROR("pin failed: %d\n", rc);
983                 ptlrpc_req_finished(req);
984                 RETURN(rc);
985         }
986
987         body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
988         if (body == NULL) {
989                 ptlrpc_req_finished(req);
990                 RETURN(rc);
991         }
992
993         memcpy(&handle->och_fh, &body->handle, sizeof(body->handle));
994         handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
995
996         OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod));
997         if (handle->och_mod == NULL) {
998                 DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data");
999                 RETURN(-ENOMEM);
1000         }
1001         handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
1002
1003         RETURN(rc);
1004 }
1005
1006 static int mdc_unpin(struct obd_export *exp,
1007                      struct obd_client_handle *handle, int flag)
1008 {
1009         struct ptlrpc_request *req;
1010         struct mds_body *body;
1011         int rc, size[2] = {0, sizeof(*body)};
1012         ENTRY;
1013
1014         if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC)
1015                 RETURN(0);
1016
1017         //size[0] = lustre_secdesc_size();
1018
1019         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1020                               MDS_CLOSE, 2, size, NULL);
1021         if (req == NULL)
1022                 RETURN(-ENOMEM);
1023
1024         //lustre_pack_secdesc(req, size[0]);
1025
1026         body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body));
1027         memcpy(&body->handle, &handle->och_fh, sizeof(body->handle));
1028         body->flags = flag;
1029
1030         req->rq_replen = lustre_msg_size(0, NULL);
1031         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1032         rc = ptlrpc_queue_wait(req);
1033         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1034
1035         if (rc != 0)
1036                 CERROR("unpin failed: %d\n", rc);
1037
1038         ptlrpc_req_finished(req);
1039         ptlrpc_req_finished(handle->och_mod->mod_open_req);
1040         OBD_FREE(handle->och_mod, sizeof(*handle->och_mod));
1041         RETURN(rc);
1042 }
1043
1044 int mdc_sync(struct obd_export *exp, struct lustre_id *id,
1045              struct ptlrpc_request **request)
1046 {
1047         struct ptlrpc_request *req;
1048         struct mds_body *body;
1049         int size[2] = {0, sizeof(*body)};
1050         int rc;
1051         ENTRY;
1052
1053         //size[0] = lustre_secdesc_size();
1054
1055         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
1056                               MDS_SYNC, 2, size, NULL);
1057         if (!req)
1058                 RETURN(rc = -ENOMEM);
1059
1060         //lustre_pack_secdesc(req, size[0]);
1061
1062         if (id) {
1063                 body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
1064                                       sizeof (*body));
1065                 memcpy(&body->id1, id, sizeof(*id));
1066         }
1067
1068         req->rq_replen = lustre_msg_size(1, &size[1]);
1069
1070         rc = ptlrpc_queue_wait(req);
1071         if (rc || request == NULL)
1072                 ptlrpc_req_finished(req);
1073         else
1074                 *request = req;
1075
1076         RETURN(rc);
1077 }
1078
1079 static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
1080                             enum obd_import_event event)
1081 {
1082         int rc = 0;
1083
1084         LASSERT(imp->imp_obd == obd);
1085
1086         switch (event) {
1087         case IMP_EVENT_DISCON: {
1088                 break;
1089         }
1090         case IMP_EVENT_INACTIVE: {
1091                 if (obd->obd_observer)
1092                         rc = obd_notify(obd->obd_observer, obd, 0, 0);
1093                 break;
1094         }
1095         case IMP_EVENT_INVALIDATE: {
1096                 struct ldlm_namespace *ns = obd->obd_namespace;
1097
1098                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
1099
1100                 break;
1101         }
1102         case IMP_EVENT_ACTIVE: {
1103                 if (obd->obd_observer)
1104                         rc = obd_notify(obd->obd_observer, obd, 1, 0);
1105                 break;
1106         }
1107         default:
1108                 CERROR("Unknown import event %d\n", event);
1109                 LBUG();
1110         }
1111         RETURN(rc);
1112 }
1113
1114 static int mdc_attach(struct obd_device *dev, obd_count len, void *data)
1115 {
1116         struct lprocfs_static_vars lvars;
1117
1118         lprocfs_init_vars(mdc, &lvars);
1119         return lprocfs_obd_attach(dev, lvars.obd_vars);
1120 }
1121
/*
 * Detach method: hand \a dev to lprocfs_obd_detach() and return its result.
 */
static int mdc_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);
        return rc;
}
1126
1127 static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
1128 {
1129         struct client_obd *cli = &obd->u.cli;
1130         int rc;
1131         ENTRY;
1132
1133         OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1134         if (!cli->cl_rpc_lock)
1135                 RETURN(-ENOMEM);
1136         mdc_init_rpc_lock(cli->cl_rpc_lock);
1137
1138         ptlrpcd_addref();
1139
1140         OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1141         if (!cli->cl_setattr_lock)
1142                 GOTO(err_rpc_lock, rc = -ENOMEM);
1143         mdc_init_rpc_lock(cli->cl_setattr_lock);
1144
1145         OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1146         if (!cli->cl_close_lock)
1147                 GOTO(err_setattr_lock, rc = -ENOMEM);
1148         mdc_init_rpc_lock(cli->cl_close_lock);
1149
1150         rc = client_obd_setup(obd, len, buf);
1151         if (rc)
1152                 GOTO(err_close_lock, rc);
1153
1154         rc = obd_llog_init(obd, &obd->obd_llogs, obd, 0, NULL);
1155         if (rc) {
1156                 mdc_cleanup(obd, 0);
1157                 CERROR("failed to setup llogging subsystems\n");
1158         }
1159
1160         RETURN(rc);
1161
1162 err_close_lock:
1163         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1164 err_setattr_lock:
1165         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1166 err_rpc_lock:
1167         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1168         ptlrpcd_decref();
1169         RETURN(rc);
1170 }
1171
1172 static int mdc_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
1173 {
1174         struct obd_device *obd = exp->exp_obd;
1175         struct client_obd *cli = &obd->u.cli;
1176         ENTRY;
1177
1178         if (cli->cl_max_mds_easize < easize)
1179                 cli->cl_max_mds_easize = easize;
1180         if (cli->cl_max_mds_cookiesize < cookiesize)
1181                 cli->cl_max_mds_cookiesize = cookiesize;
1182         RETURN(0);
1183 }
1184
1185 static int mdc_precleanup(struct obd_device *obd, int flags)
1186 {
1187         int rc = 0;
1188         
1189         rc = obd_llog_finish(obd, &obd->obd_llogs, 0);
1190         if (rc != 0)
1191                 CERROR("failed to cleanup llogging subsystems\n");
1192
1193         RETURN(rc);
1194 }
1195
1196 static int mdc_cleanup(struct obd_device *obd, int flags)
1197 {
1198         struct client_obd *cli = &obd->u.cli;
1199
1200         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1201         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1202         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1203
1204         ptlrpcd_decref();
1205
1206         return client_obd_cleanup(obd, flags);
1207 }
1208
1209
1210 static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
1211                          struct obd_device *tgt, int count,
1212                          struct llog_catid *logid)
1213 {
1214         struct llog_ctxt *ctxt;
1215         int rc;
1216         ENTRY;
1217
1218         rc = obd_llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1219                             &llog_client_ops);
1220         if (rc == 0) {
1221                 ctxt = llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT);
1222                 ctxt->loc_imp = obd->u.cli.cl_import;
1223         }
1224
1225         RETURN(rc);
1226 }
1227
1228 static int mdc_llog_finish(struct obd_device *obd,
1229                            struct obd_llogs *llogs, int count)
1230 {
1231         int rc;
1232         ENTRY;
1233
1234         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT));
1235         RETURN(rc);
1236 }
1237 static struct obd_device *mdc_get_real_obd(struct obd_export *exp,
1238                                            struct lustre_id *id)
1239 {
1240        ENTRY;
1241        RETURN(exp->exp_obd);
1242 }
1243
1244 static int mdc_get_info(struct obd_export *exp, __u32 keylen,
1245                         void *key, __u32 *valsize, void *val)
1246 {
1247         struct ptlrpc_request *req;
1248         char *bufs[1] = {key};
1249         int rc = 0;
1250         ENTRY;
1251         
1252         if (!valsize || !val)
1253                 RETURN(-EFAULT);
1254
1255         if (keylen == strlen("remote_flag") && !strcmp(key, "remote_flag")) {
1256                 struct obd_import *imp;
1257                 struct obd_connect_data *data;
1258
1259                 imp = class_exp2cliimp(exp);
1260                 if (!imp) {
1261                         LBUG();
1262                         RETURN(-EINVAL);
1263                 }
1264
1265                 if (imp->imp_state != LUSTRE_IMP_FULL) {
1266                         CERROR("import state not full\n");
1267                         RETURN(-EINVAL);
1268                 }
1269
1270                 data = &imp->imp_connect_data;
1271                 if (data->ocd_connect_flags & OBD_CONNECT_REMOTE) {
1272                         *((int *)val) = 1;
1273                         RETURN(0);
1274                 } else if (data->ocd_connect_flags & OBD_CONNECT_LOCAL) {
1275                         *((int *)val) = 0;
1276                         RETURN(0);
1277                 }
1278                 CERROR("no remote flag set?\n");
1279                 RETURN(-EINVAL);
1280         }
1281
1282         if ((keylen < strlen("mdsize") || strcmp(key, "mdsize") != 0) &&
1283             (keylen < strlen("mdsnum") || strcmp(key, "mdsnum") != 0) &&
1284             (keylen < strlen("rootid") || strcmp(key, "rootid") != 0))
1285                 RETURN(-EPROTO);
1286                 
1287         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
1288                               OST_GET_INFO, 1, (int *)&keylen, bufs);
1289         if (req == NULL)
1290                 RETURN(-ENOMEM);
1291
1292         req->rq_replen = lustre_msg_size(1, (int *)valsize);
1293         rc = ptlrpc_queue_wait(req);
1294         if (rc)
1295                 GOTO(out_req, rc);
1296
1297         if (keylen >= strlen("rootid") && !strcmp(key, "rootid")) {
1298                 struct lustre_id *reply;
1299                 
1300                 reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
1301                                            lustre_swab_lustre_id);
1302                 if (reply == NULL) {
1303                         CERROR("Can't unpack %s\n", (char *)key);
1304                         GOTO(out_req, rc = -EPROTO);
1305                 }
1306
1307                 *(struct lustre_id *)val = *reply;
1308         } else if (keylen >= strlen("lovdesc") && !strcmp(key, "lovdesc")) {
1309                 struct lov_desc *reply;
1310                 
1311                 reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
1312                                            lustre_swab_lov_desc);
1313                 if (reply == NULL) {
1314                         CERROR("Can't unpack %s\n", (char *)key);
1315                         GOTO(out_req, rc = -EPROTO);
1316                 }
1317
1318                 *(struct lov_desc *)val = *reply;
1319                 RETURN(0);
1320         } else {
1321                 __u32 *reply;
1322                 
1323                 reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
1324                                            lustre_swab_generic_32s);
1325                 if (reply == NULL) {
1326                         CERROR("Can't unpack %s\n", (char *)key);
1327                         GOTO(out_req, rc = -EPROTO);
1328                 }
1329                 *((__u32 *)val) = *reply;
1330         }
1331 out_req:
1332         ptlrpc_req_finished(req);
1333         RETURN(rc);
1334 }
1335
1336 int mdc_obj_create(struct obd_export *exp, struct obdo *oa,
1337                    void *acl, int acl_size,
1338                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
1339 {
1340         struct ptlrpc_request *request;
1341         struct ost_body *body;
1342         char *acl_buf;
1343         int rc, size[2] = { sizeof(*body), acl_size };
1344         ENTRY;
1345
1346         LASSERT(oa);
1347
1348         request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION,
1349                                   OST_CREATE, 2, size, NULL);
1350         if (!request)
1351                 GOTO(out_req, rc = -ENOMEM);
1352
1353         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1354         memcpy(&body->oa, oa, sizeof(body->oa));
1355
1356         if (acl_size) {
1357                 acl_buf = lustre_msg_buf(request->rq_reqmsg, 1, acl_size);
1358                 memcpy(acl_buf, acl, acl_size);
1359         }
1360
1361         request->rq_replen = lustre_msg_size(1, size);
1362         rc = ptlrpc_queue_wait(request);
1363         if (rc)
1364                 GOTO(out_req, rc);
1365
1366         body = lustre_swab_repbuf(request, 0, sizeof(*body),
1367                                   lustre_swab_ost_body);
1368         if (body == NULL) {
1369                 CERROR ("can't unpack ost_body\n");
1370                 GOTO (out_req, rc = -EPROTO);
1371         }
1372
1373         memcpy(oa, &body->oa, sizeof(*oa));
1374
1375         /* store ino/generation for recovery */
1376         body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body));
1377         body->oa.o_id = oa->o_id;
1378         body->oa.o_generation = oa->o_generation;
1379         body->oa.o_fid = oa->o_fid;
1380         body->oa.o_mds = oa->o_mds;
1381
1382         CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno);
1383         EXIT;
1384 out_req:
1385         ptlrpc_req_finished(request);
1386         return rc;
1387 }
1388
1389 int mdc_brw(int rw, struct obd_export *exp, struct obdo *oa,
1390             struct lov_stripe_md *ea, obd_count oa_bufs,
1391             struct brw_page *pgarr, struct obd_trans_info *oti)
1392 {
1393         struct ptlrpc_bulk_desc *desc;
1394         struct niobuf_remote *niobuf;
1395         struct ptlrpc_request *req;
1396         struct obd_ioobj *ioobj;
1397         struct ost_body *body;
1398         int err, opc, i;
1399         int size[3];
1400
1401         opc = ((rw & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1402         
1403         size[0] = sizeof(*body);
1404         size[1] = sizeof(*ioobj);
1405         size[2] = oa_bufs * sizeof(*niobuf);
1406
1407         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, opc,
1408                               3, size, NULL);
1409         LASSERT(req != NULL);
1410
1411         if (opc == OST_WRITE)
1412                 desc = ptlrpc_prep_bulk_imp(req, oa_bufs, BULK_GET_SOURCE,
1413                                             OST_BULK_PORTAL);
1414         else
1415                 desc = ptlrpc_prep_bulk_imp(req, oa_bufs, BULK_PUT_SINK,
1416                                             OST_BULK_PORTAL);
1417         LASSERT(desc != NULL);
1418
1419         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
1420         ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj));
1421         niobuf = lustre_msg_buf(req->rq_reqmsg, 2, oa_bufs * sizeof(*niobuf));
1422
1423         memcpy(&body->oa, oa, sizeof(*oa));
1424         obdo_to_ioobj(oa, ioobj);
1425         ioobj->ioo_bufcnt = oa_bufs;
1426
1427         for (i = 0; i < oa_bufs; i++, niobuf++) {
1428                 struct brw_page *pg = &pgarr[i];
1429
1430                 LASSERT(pg->count > 0);
1431                 LASSERT((pg->disk_offset & ~PAGE_MASK) + pg->count <= PAGE_SIZE);
1432
1433                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->disk_offset & ~PAGE_MASK,
1434                                       pg->count);
1435
1436                 niobuf->offset = pg->disk_offset;
1437                 niobuf->len = pg->count;
1438                 niobuf->flags = pg->flag;
1439         }
1440
1441         /* size[0] still sizeof (*body) */
1442         if (opc == OST_WRITE) {
1443                 /* 1 RC per niobuf */
1444                 size[1] = sizeof(__u32) * oa_bufs;
1445                 req->rq_replen = lustre_msg_size(2, size);
1446         } else {
1447                 /* 1 RC for the whole I/O */
1448                 req->rq_replen = lustre_msg_size(1, size);
1449         }
1450         err = ptlrpc_queue_wait(req);
1451         LASSERT(err == 0);
1452
1453         ptlrpc_req_finished(req);
1454         return 0;
1455 }
1456
1457 static int mdc_valid_attrs(struct obd_export *exp,
1458                            struct lustre_id *id)
1459 {
1460         struct ldlm_res_id res_id = { .name = {0} };
1461         struct obd_device *obd = exp->exp_obd;
1462         struct lustre_handle lockh;
1463         ldlm_policy_data_t policy;
1464         int flags;
1465         ENTRY;
1466
1467         res_id.name[0] = id_fid(id);
1468         res_id.name[1] = id_group(id);
1469         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1470
1471         CDEBUG(D_INFO, "trying to match res "LPU64"\n",
1472                res_id.name[0]);
1473
1474         /* FIXME use LDLM_FL_TEST_LOCK instead */
1475         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
1476         if (ldlm_lock_match(obd->obd_namespace, flags, &res_id,
1477                             LDLM_IBITS, &policy, LCK_PR, &lockh)) {
1478                 ldlm_lock_decref(&lockh, LCK_PR);
1479                 RETURN(1);
1480         }
1481
1482         if (ldlm_lock_match(obd->obd_namespace, flags, &res_id,
1483                             LDLM_IBITS, &policy, LCK_PW, &lockh)) {
1484                 ldlm_lock_decref(&lockh, LCK_PW);
1485                 RETURN(1);
1486         }
1487         RETURN(0);
1488 }
1489
1490 static int mdc_change_cbdata_name(struct obd_export *exp,
1491                                   struct lustre_id *pid,
1492                                   char *name, int len,
1493                                   struct lustre_id *cid,
1494                                   ldlm_iterator_t it, void *data)
1495 {
1496         int rc;
1497         rc = mdc_change_cbdata(exp, cid, it, data);
1498         RETURN(rc);
1499 }
1500
1501 struct obd_ops mdc_obd_ops = {
1502         .o_owner         = THIS_MODULE,
1503         .o_attach        = mdc_attach,
1504         .o_detach        = mdc_detach,
1505         .o_setup         = mdc_setup,
1506         .o_precleanup    = mdc_precleanup,
1507         .o_cleanup       = mdc_cleanup,
1508         .o_add_conn      = client_import_add_conn,
1509         .o_del_conn      = client_import_del_conn,
1510         .o_connect       = client_connect_import,
1511         .o_disconnect    = client_disconnect_export,
1512         .o_iocontrol     = mdc_iocontrol,
1513         .o_packmd        = mdc_packmd,
1514         .o_unpackmd      = mdc_unpackmd,
1515         .o_statfs        = mdc_statfs,
1516         .o_pin           = mdc_pin,
1517         .o_unpin         = mdc_unpin,
1518         .o_import_event  = mdc_import_event,
1519         .o_llog_init     = mdc_llog_init,
1520         .o_llog_finish   = mdc_llog_finish,
1521         .o_create        = mdc_obj_create,
1522         .o_set_info      = mdc_set_info,
1523         .o_get_info      = mdc_get_info,
1524         .o_brw           = mdc_brw,
1525         .o_cancel_unused = mdc_cancel_unused,
1526         .o_init_ea_size  = mdc_init_ea_size,
1527 };
1528
1529 struct md_ops mdc_md_ops = {
1530         .m_getstatus     = mdc_getstatus,
1531         .m_getattr       = mdc_getattr,
1532         .m_close         = mdc_close,
1533         .m_create        = mdc_create,
1534         .m_done_writing  = mdc_done_writing,
1535         .m_enqueue       = mdc_enqueue,
1536         .m_getattr_lock  = mdc_getattr_lock,
1537         .m_intent_lock   = mdc_intent_lock,
1538         .m_link          = mdc_link,
1539         .m_rename        = mdc_rename,
1540         .m_setattr       = mdc_setattr,
1541         .m_sync          = mdc_sync,
1542         .m_readpage      = mdc_readpage,
1543         .m_unlink        = mdc_unlink,
1544         .m_valid_attrs   = mdc_valid_attrs,
1545         .m_req2lustre_md = mdc_req2lustre_md,
1546         .m_set_open_replay_data   = mdc_set_open_replay_data,
1547         .m_clear_open_replay_data = mdc_clear_open_replay_data,
1548         .m_store_inode_generation = mdc_store_inode_generation,
1549         .m_set_lock_data = mdc_set_lock_data,
1550         .m_get_real_obd  = mdc_get_real_obd,
1551         .m_change_cbdata_name = mdc_change_cbdata_name,
1552         .m_change_cbdata = mdc_change_cbdata,
1553         .m_access_check  = mdc_access_check,
1554 };
1555
1556 int __init mdc_init(void)
1557 {
1558         struct lprocfs_static_vars lvars;
1559         
1560         lprocfs_init_vars(mdc, &lvars);
1561         return class_register_type(&mdc_obd_ops, &mdc_md_ops,
1562                                    lvars.module_vars, OBD_MDC_DEVICENAME);
1563 }
1564
1565 #ifdef __KERNEL__
1566 static void /*__exit*/ mdc_exit(void)
1567 {
1568         class_unregister_type(OBD_MDC_DEVICENAME);
1569 }
1570
1571 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1572 MODULE_DESCRIPTION("Lustre Metadata Client");
1573 MODULE_LICENSE("GPL");
1574
1575 EXPORT_SYMBOL(mdc_req2lustre_md);
1576 EXPORT_SYMBOL(mdc_change_cbdata);
1577 EXPORT_SYMBOL(mdc_getstatus);
1578 EXPORT_SYMBOL(mdc_getattr);
1579 EXPORT_SYMBOL(mdc_getattr_lock);
1580 EXPORT_SYMBOL(mdc_create);
1581 EXPORT_SYMBOL(mdc_unlink);
1582 EXPORT_SYMBOL(mdc_rename);
1583 EXPORT_SYMBOL(mdc_link);
1584 EXPORT_SYMBOL(mdc_readpage);
1585 EXPORT_SYMBOL(mdc_setattr);
1586 EXPORT_SYMBOL(mdc_close);
1587 EXPORT_SYMBOL(mdc_done_writing);
1588 EXPORT_SYMBOL(mdc_sync);
1589 EXPORT_SYMBOL(mdc_set_open_replay_data);
1590 EXPORT_SYMBOL(mdc_clear_open_replay_data);
1591 EXPORT_SYMBOL(mdc_store_inode_generation);
1592
1593 module_init(mdc_init);
1594 module_exit(mdc_exit);
1595 #endif