Whamcloud - gitweb
4116df15765118fe1b097e01bf5d9d6bf8797042
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define EXPORT_SYMTAB
24 #define DEBUG_SUBSYSTEM S_MDC
25
26 #include <linux/module.h>
27 #include <linux/miscdevice.h>
28 #include <linux/lustre_mds.h>
29 #include <linux/lustre_lite.h>
30 #include <linux/lustre_dlm.h>
31 #include <linux/init.h>
32
33 #define REQUEST_MINOR 244
34
35 extern int mds_queue_req(struct ptlrpc_request *);
36
37 int mdc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
38                struct ptlrpc_connection **connection,
39                struct lustre_handle **rconn)
40 {
41         struct obd_export *export;
42         struct mdc_obd *mdc;
43
44         export = class_conn2export(conn);
45         if (!export)
46                 return -ENOTCONN;
47
48         mdc = &export->exp_obd->u.mdc;
49
50         *cl = mdc->mdc_client;
51         *connection = mdc->mdc_conn;
52         *rconn = &export->exp_rconnh;
53
54         return 0;
55 }
56
57 static int mdc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
58                          struct ptlrpc_connection **connection,
59                          struct lustre_handle **rconn)
60 {
61         struct obd_export *export;
62         struct mdc_obd *mdc;
63
64         export = class_conn2export(conn);
65         if (!export)
66                 return -ENOTCONN;
67
68         mdc = &export->exp_obd->u.mdc;
69
70         *cl = mdc->mdc_ldlm_client;
71         *connection = mdc->mdc_conn;
72         *rconn = &export->exp_rconnh;
73
74         return 0;
75 }
76
77
78 int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
79                   __u64 *last_committed, __u64 *last_rcvd,
80                   __u32 *last_xid, struct ptlrpc_request **request)
81 {
82         struct ptlrpc_request *req;
83         struct mds_body *body;
84         struct ptlrpc_client *cl;
85         struct ptlrpc_connection *connection;
86         struct lustre_handle *rconn;
87         int rc, size = sizeof(*body);
88         ENTRY;
89
90         mdc_con2cl(conn, &cl, &connection, &rconn);
91         req = ptlrpc_prep_req2(cl, connection, rconn,
92                                MDS_GETSTATUS, 1, &size, NULL);
93         if (!req)
94                 GOTO(out, rc = -ENOMEM);
95
96         body = lustre_msg_buf(req->rq_reqmsg, 0);
97         req->rq_level = LUSTRE_CONN_CON;
98         req->rq_replen = lustre_msg_size(1, &size);
99
100         mds_pack_req_body(req);
101         rc = ptlrpc_queue_wait(req);
102         rc = ptlrpc_check_status(req, rc);
103
104         if (!rc) {
105                 body = lustre_msg_buf(req->rq_repmsg, 0);
106                 mds_unpack_body(body);
107                 memcpy(rootfid, &body->fid1, sizeof(*rootfid));
108                 *last_committed = req->rq_repmsg->last_committed;
109                 *last_rcvd = req->rq_repmsg->last_rcvd;
110                 *last_xid = body->last_xid;
111
112                 CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_rcvd=%Lu,"
113                        " last_xid=%d\n",
114                        (unsigned long)rootfid->id,
115                        (unsigned long long)*last_committed,
116                        (unsigned long long)*last_rcvd,
117                        body->last_xid);
118         }
119
120         EXIT;
121  out:
122         ptlrpc_free_req(req); 
123         return rc;
124 }
125
126 int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
127                    uuid_t **uuids, struct ptlrpc_request **request)
128 {
129         struct ptlrpc_request *req;
130         struct mds_status_req *streq;
131         struct lov_obd *lov = &obd->u.lov;
132         struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
133         struct ptlrpc_client *cl;
134         struct ptlrpc_connection *connection;
135         struct lustre_handle *rconn;
136         struct lov_desc *desc = &lov->desc;
137         int rc, size[2] = {sizeof(*streq)};
138         ENTRY;
139
140         mdc_con2cl(mdc_connh, &cl, &connection, &rconn);
141         req = ptlrpc_prep_req2(cl, connection, rconn,
142                                MDS_GETLOVINFO, 1, size, NULL);
143         if (!req)
144                 GOTO(out, rc = -ENOMEM);
145
146         *request = req;
147         streq = lustre_msg_buf(req->rq_reqmsg, 0);
148         streq->flags = HTON__u32(MDS_STATUS_LOV);
149         streq->repbuf = HTON__u32(8000);
150
151         /* prepare for reply */
152         req->rq_level = LUSTRE_CONN_CON;
153         size[0] = sizeof(*desc);
154         size[1] = 8000;
155         req->rq_replen = lustre_msg_size(2, size);
156
157         rc = ptlrpc_queue_wait(req);
158         rc = ptlrpc_check_status(req, rc);
159
160         if (!rc) {
161                 memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
162                 *uuids = lustre_msg_buf(req->rq_repmsg, 1);
163                 lov_unpackdesc(desc);
164         }
165         mdc->mdc_max_mdsize = sizeof(*desc) +
166                 desc->ld_tgt_count * sizeof(uuid_t);
167
168         EXIT;
169  out:
170         return rc;
171 }
172
173
174 int mdc_getattr(struct lustre_handle *conn,
175                 obd_id ino, int type, unsigned long valid, size_t ea_size,
176                 struct ptlrpc_request **request)
177 {
178         struct ptlrpc_client *cl;
179         struct ptlrpc_connection *connection;
180         struct lustre_handle *rconn;
181         struct ptlrpc_request *req;
182         struct mds_body *body;
183         int rc, size[2] = {sizeof(*body), 0}, bufcount = 1;
184         ENTRY;
185
186         mdc_con2cl(conn, &cl, &connection, &rconn);
187         req = ptlrpc_prep_req2(cl, connection, rconn,
188                                MDS_GETATTR, 1, size, NULL);
189         if (!req)
190                 GOTO(out, rc = -ENOMEM);
191
192         body = lustre_msg_buf(req->rq_reqmsg, 0);
193         ll_ino2fid(&body->fid1, ino, 0, type);
194         body->valid = valid;
195
196         if (S_ISREG(type)) {
197                 struct mdc_obd *mdc = &class_conn2obd(conn)->u.mdc;
198                 bufcount = 2;
199                 size[1] = mdc->mdc_max_mdsize;
200         } else if (valid & OBD_MD_LINKNAME) {
201                 bufcount = 2;
202                 size[1] = ea_size;
203         }
204         req->rq_replen = lustre_msg_size(bufcount, size);
205
206         rc = ptlrpc_queue_wait(req);
207         rc = ptlrpc_check_status(req, rc);
208
209         if (!rc) {
210                 body = lustre_msg_buf(req->rq_repmsg, 0);
211                 mds_unpack_body(body);
212                 CDEBUG(D_NET, "mode: %o\n", body->mode);
213         }
214
215         EXIT;
216  out:
217         *request = req;
218         return rc;
219 }
220
221 static int mdc_lock_callback(struct lustre_handle *lockh,
222                              struct ldlm_lock_desc *desc, void *data,
223                              int data_len, struct ptlrpc_request **req)
224 {
225         int rc;
226         struct inode *inode = data;
227         ENTRY;
228
229         if (desc == NULL) {
230                 /* Completion AST.  Do nothing. */
231                 RETURN(0);
232         }
233
234         if (data_len != sizeof(*inode)) {
235                 CERROR("data_len should be %d, but is %d\n", sizeof(*inode),
236                        data_len);
237                 LBUG();
238         }
239
240         /* FIXME: do something better than throwing away everything */
241         if (inode == NULL)
242                 LBUG();
243         if (S_ISDIR(inode->i_mode)) {
244                 CDEBUG(D_INODE, "invalidating inode %ld\n", inode->i_ino);
245                 invalidate_inode_pages(inode);
246         }
247
248         rc = ldlm_cli_cancel(lockh);
249         if (rc < 0) {
250                 CERROR("ldlm_cli_cancel: %d\n", rc);
251                 LBUG();
252         }
253         RETURN(0);
254 }
255
256 int mdc_enqueue(struct lustre_handle *conn, int lock_type,
257                 struct lookup_intent *it, int lock_mode, struct inode *dir,
258                 struct dentry *de, struct lustre_handle *lockh, __u64 id,
259                 char *tgt, int tgtlen, void *data, int datalen)
260 {
261         struct ptlrpc_request *req;
262         struct obd_device *obddev = class_conn2obd(conn);
263         struct ptlrpc_client *cl;
264         struct ptlrpc_connection *connection;
265         struct lustre_handle *rconn;
266         __u64 res_id[RES_NAME_SIZE] = {dir->i_ino};
267         int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
268         int rc, flags;
269         int repsize[3] = {sizeof(struct ldlm_reply), 
270                           sizeof(struct mds_body),
271                           obddev->u.mdc.mdc_max_mdsize};
272         struct ldlm_reply *dlm_rep;
273         struct ldlm_intent *lit;
274         ENTRY;
275
276         LDLM_DEBUG_NOLOCK("mdsintent %d dir %ld", it->it_op, dir->i_ino);
277
278         switch (it->it_op) { 
279         case IT_MKDIR:
280                 it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask; 
281                 break;
282         case (IT_CREAT|IT_OPEN):
283         case IT_CREAT:
284                 it->it_mode |= S_IFREG; /* no break */
285         case IT_MKNOD:
286                 it->it_mode &= ~current->fs->umask;
287                 break;
288         case IT_SYMLINK:
289                 it->it_mode = (it->it_mode | S_IFLNK) & ~current->fs->umask; 
290                 break;
291         }
292
293         mdc_con2dlmcl(conn, &cl, &connection, &rconn);
294         if (it->it_op & (IT_MKDIR | IT_CREAT | IT_SYMLINK | IT_MKNOD)) {
295                 size[2] = sizeof(struct mds_rec_create);
296                 size[3] = de->d_name.len + 1;
297                 size[4] = tgtlen + 1;
298                 req = ptlrpc_prep_req2(cl, connection, rconn,
299                                        LDLM_ENQUEUE, 5, size, NULL);
300                 if (!req)
301                         RETURN(-ENOMEM);
302
303                 /* pack the intent */
304                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
305                 lit->opc = NTOH__u64((__u64)it->it_op);
306
307                 /* pack the intended request */
308                 mds_create_pack(req, 2, dir, it->it_mode, id, current->fsuid,
309                                 current->fsgid, CURRENT_TIME, de->d_name.name,
310                                 de->d_name.len, tgt, tgtlen);
311                 req->rq_replen = lustre_msg_size(3, repsize);
312         } else if (it->it_op == IT_RENAME2) {
313                 struct dentry *old_de = it->it_data;
314
315                 size[2] = sizeof(struct mds_rec_rename);
316                 size[3] = old_de->d_name.len + 1;
317                 size[4] = de->d_name.len + 1;
318                 req = ptlrpc_prep_req2(cl, connection, rconn,
319                                        LDLM_ENQUEUE, 5, size, NULL);
320                 if (!req)
321                         RETURN(-ENOMEM);
322
323                 /* pack the intent */
324                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
325                 lit->opc = NTOH__u64((__u64)it->it_op);
326
327                 /* pack the intended request */
328                 mds_rename_pack(req, 2, old_de->d_parent->d_inode, dir,
329                                 old_de->d_name.name, old_de->d_name.len,
330                                 de->d_name.name, de->d_name.len);
331                 req->rq_replen = lustre_msg_size(1, repsize);
332         } else if (it->it_op == IT_UNLINK || it->it_op == IT_RMDIR) {
333                 size[2] = sizeof(struct mds_rec_unlink);
334                 size[3] = de->d_name.len + 1;
335                 req = ptlrpc_prep_req2(cl, connection, rconn,
336                                        LDLM_ENQUEUE, 4, size, NULL);
337                 if (!req)
338                         RETURN(-ENOMEM);
339
340                 /* pack the intent */
341                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
342                 lit->opc = NTOH__u64((__u64)it->it_op);
343
344                 /* pack the intended request */
345                 mds_unlink_pack(req, 2, dir, NULL, de->d_name.name, 
346                                 de->d_name.len);
347
348                 req->rq_replen = lustre_msg_size(3, repsize);
349         } else if (it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
350                    it->it_op == IT_OPEN || it->it_op == IT_SETATTR ||
351                    it->it_op == IT_LOOKUP || it->it_op == IT_READLINK) {
352                 size[2] = sizeof(struct mds_body);
353                 size[3] = de->d_name.len + 1;
354
355                 req = ptlrpc_prep_req2(cl, connection, rconn,
356                                        LDLM_ENQUEUE, 4, size, NULL);
357                 if (!req)
358                         RETURN(-ENOMEM);
359
360                 /* pack the intent */
361                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
362                 lit->opc = NTOH__u64((__u64)it->it_op);
363
364                 /* pack the intended request */
365                 mds_getattr_pack(req, 2, dir, de->d_name.name, de->d_name.len);
366
367                 /* get ready for the reply */
368                 req->rq_replen = lustre_msg_size(3, repsize);
369         } else if (it->it_op == IT_READDIR) {
370                 req = ptlrpc_prep_req2(cl, connection, rconn,
371                                        LDLM_ENQUEUE, 1, size, NULL);
372                 if (!req)
373                         RETURN(-ENOMEM);
374
375                 /* get ready for the reply */
376                 req->rq_replen = lustre_msg_size(1, repsize);
377         } else {
378                 LBUG();
379                 RETURN(-1);
380         }
381 #warning FIXME: the data here needs to be different if a lock was granted for a different inode
382         rc = ldlm_cli_enqueue(cl, connection, rconn, req,
383                               obddev->obd_namespace, NULL, res_id, lock_type,
384                               NULL, 0, lock_mode, &flags,
385                               (void *)mdc_lock_callback, data, datalen, lockh);
386         if (rc == -ENOENT || rc == ELDLM_LOCK_ABORTED) {
387                 lock_mode = 0;
388                 memset(lockh, 0, sizeof(*lockh));
389         } else if (rc != 0) {
390                 CERROR("ldlm_cli_enqueue: %d\n", rc);
391                 RETURN(rc);
392         }
393
394         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0); 
395         it->it_disposition = (int) dlm_rep->lock_policy_res1;
396         it->it_status = (int) dlm_rep->lock_policy_res2;
397         it->it_lock_mode = lock_mode;
398         it->it_data = req;
399
400         RETURN(0);
401 }
402
403 int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
404              struct lov_stripe_md *smd, __u64 cookie, __u64 *fh,
405              struct ptlrpc_request **request)
406 {
407         struct ptlrpc_client *cl;
408         struct ptlrpc_connection *connection;
409         struct lustre_handle *rconn;
410         struct mds_body *body;
411         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
412         struct ptlrpc_request *req;
413         ENTRY;
414
415         if (smd != NULL) {
416                 bufcount = 2;
417                 size[1] = smd->lmd_size;
418         }
419
420         mdc_con2cl(conn, &cl, &connection, &rconn);
421         req = ptlrpc_prep_req2(cl, connection, rconn,
422                                MDS_OPEN, bufcount, size, NULL);
423         if (!req)
424                 GOTO(out, rc = -ENOMEM);
425
426         req->rq_flags |= PTL_RPC_FL_REPLAY;
427         body = lustre_msg_buf(req->rq_reqmsg, 0);
428
429         ll_ino2fid(&body->fid1, ino, 0, type);
430         body->flags = HTON__u32(flags);
431         body->extra = cookie;
432
433         if (smd != NULL)
434                 memcpy(lustre_msg_buf(req->rq_reqmsg, 1), smd, smd->lmd_size);
435
436         req->rq_replen = lustre_msg_size(1, size);
437
438         rc = ptlrpc_queue_wait(req);
439         rc = ptlrpc_check_status(req, rc);
440         if (!rc) {
441                 body = lustre_msg_buf(req->rq_repmsg, 0);
442                 mds_unpack_body(body);
443                 *fh = body->extra;
444         }
445
446         EXIT;
447  out:
448         *request = req;
449         return rc;
450 }
451
452 int mdc_close(struct lustre_handle *conn, 
453               obd_id ino, int type, __u64 fh, struct ptlrpc_request **request)
454 {
455         struct ptlrpc_client *cl;
456         struct ptlrpc_connection *connection;
457         struct lustre_handle *rconn;
458         struct mds_body *body;
459         int rc, size = sizeof(*body);
460         struct ptlrpc_request *req;
461
462         mdc_con2cl(conn, &cl, &connection, &rconn);
463         req = ptlrpc_prep_req2(cl, connection, rconn,
464                                MDS_CLOSE, 1, &size, NULL);
465         if (!req)
466                 GOTO(out, rc = -ENOMEM);
467
468         body = lustre_msg_buf(req->rq_reqmsg, 0);
469         ll_ino2fid(&body->fid1, ino, 0, type);
470         body->extra = fh;
471
472         req->rq_replen = lustre_msg_size(0, NULL);
473
474         rc = ptlrpc_queue_wait(req);
475         rc = ptlrpc_check_status(req, rc);
476
477         EXIT;
478  out:
479         *request = req;
480         return rc;
481 }
482
483 int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
484                  char *addr, struct ptlrpc_request **request)
485 {
486         struct ptlrpc_client *cl;
487         struct ptlrpc_connection *connection;
488         struct lustre_handle *rconn;
489         struct ptlrpc_request *req = NULL;
490         struct ptlrpc_bulk_desc *desc = NULL;
491         struct ptlrpc_bulk_page *bulk = NULL;
492         struct mds_body *body;
493         int rc, size = sizeof(*body);
494         ENTRY;
495
496         CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
497
498         mdc_con2cl(conn, &cl, &connection, &rconn);
499         desc = ptlrpc_prep_bulk(connection);
500         if (desc == NULL)
501                 GOTO(out, rc = -ENOMEM);
502
503         req = ptlrpc_prep_req2(cl, connection, rconn,
504                                MDS_READPAGE, 1, &size, NULL);
505         if (!req)
506                 GOTO(out2, rc = -ENOMEM);
507
508         bulk = ptlrpc_prep_bulk_page(desc);
509         bulk->b_buflen = PAGE_SIZE;
510         bulk->b_buf = addr;
511         bulk->b_xid = req->rq_xid;
512         desc->b_portal = MDS_BULK_PORTAL;
513
514         rc = ptlrpc_register_bulk(desc);
515         if (rc) {
516                 CERROR("couldn't setup bulk sink: error %d.\n", rc);
517                 GOTO(out2, rc);
518         }
519
520         body = lustre_msg_buf(req->rq_reqmsg, 0);
521         body->fid1.id = ino;
522         body->fid1.f_type = type;
523         body->size = offset;
524
525         req->rq_replen = lustre_msg_size(1, &size);
526         rc = ptlrpc_queue_wait(req);
527         rc = ptlrpc_check_status(req, rc);
528         if (rc) {
529                 ptlrpc_abort_bulk(desc);
530                 GOTO(out2, rc);
531         } else { 
532                 body = lustre_msg_buf(req->rq_repmsg, 0);
533                 mds_unpack_body(body);
534         }
535
536         EXIT;
537  out2:
538         ptlrpc_free_bulk(desc);
539  out:
540         *request = req;
541         return rc;
542 }
543
544 int mdc_statfs(struct lustre_handle *conn, struct statfs *sfs,
545                struct ptlrpc_request **request)
546 {
547         struct ptlrpc_client *cl;
548         struct ptlrpc_connection *connection;
549         struct lustre_handle *rconn;
550         struct obd_statfs *osfs;
551         struct ptlrpc_request *req;
552         int rc, size = sizeof(*osfs);
553         ENTRY;
554
555         mdc_con2cl(conn, &cl, &connection, &rconn);
556         req = ptlrpc_prep_req2(cl, connection, rconn,
557                                MDS_STATFS, 0, NULL, NULL);
558         if (!req)
559                 GOTO(out, rc = -ENOMEM);
560         req->rq_replen = lustre_msg_size(1, &size);
561
562         rc = ptlrpc_queue_wait(req);
563         rc = ptlrpc_check_status(req, rc);
564
565         if (rc)
566                 GOTO(out, rc);
567
568         osfs = lustre_msg_buf(req->rq_repmsg, 0);
569         obd_statfs_unpack(osfs, sfs);
570
571         EXIT;
572 out:
573         *request = req;
574
575         return rc;
576 }
577
578 static int mdc_ioctl(long cmd, struct lustre_handle *conn, int len, void *karg,
579                      void *uarg)
580 {
581 #if 0
582         /* FIXME XXX : This should use the new ioc_data to pass args in */
583         int err = 0;
584         struct ptlrpc_client *cl;
585         struct ptlrpc_connection *connection;
586         struct lustre_handle *rconn;
587         struct ptlrpc_request *request;
588
589         ENTRY;
590
591         if (_IOC_TYPE(cmd) != IOC_REQUEST_TYPE ||
592             _IOC_NR(cmd) < IOC_REQUEST_MIN_NR  ||
593             _IOC_NR(cmd) > IOC_REQUEST_MAX_NR ) {
594                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
595                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
596                 RETURN(-EINVAL);
597         }
598
599         ptlrpc_init_client(NULL, NULL, 
600                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
601
602         mdc_con2cl(conn, &cl, &connection, &rconn);
603         switch (cmd) {
604         case IOC_REQUEST_GETATTR: {
605                 CERROR("-- getting attr for ino %lu\n", arg);
606                 err = mdc_getattr(&cl, connection, (obd_id)arg, S_IFDIR, ~0, 0,
607                                   &request);
608                 CERROR("-- done err %d\n", err);
609
610                 GOTO(out, err);
611         }
612
613         case IOC_REQUEST_READPAGE: {
614                 char *buf;
615                 OBD_ALLOC(buf, PAGE_SIZE);
616                 if (!buf) {
617                         err = -ENOMEM;
618                         GOTO(out, err);
619                 }
620                 CERROR("-- readpage 0 for ino %lu\n", arg);
621                 err = mdc_readpage(&cl, connection, arg, S_IFDIR, 0, buf,
622                                    &request);
623                 CERROR("-- done err %d\n", err);
624                 OBD_FREE(buf, PAGE_SIZE);
625
626                 GOTO(out, err);
627         }
628
629         case IOC_REQUEST_SETATTR: {
630                 struct inode inode;
631                 struct iattr iattr;
632
633                 inode.i_ino = arg;
634                 inode.i_generation = 0;
635                 iattr.ia_mode = 040777;
636                 iattr.ia_atime = 0;
637                 iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
638
639                 err = mdc_setattr(&cl, connection, &inode, &iattr, &request);
640                 CERROR("-- done err %d\n", err);
641
642                 GOTO(out, err);
643         }
644
645         case IOC_REQUEST_CREATE: {
646                 struct inode inode;
647                 struct iattr iattr;
648
649                 inode.i_ino = arg;
650                 inode.i_generation = 0;
651                 iattr.ia_mode = 040777;
652                 iattr.ia_atime = 0;
653                 iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
654
655                 err = mdc_create(&cl, connection, &inode,
656                                  "foofile", strlen("foofile"),
657                                  NULL, 0, 0100707, 47114711,
658                                  11, 47, 0, NULL, &request);
659                 CERROR("-- done err %d\n", err);
660
661                 GOTO(out, err);
662         }
663
664         case IOC_REQUEST_OPEN: {
665                 __u64 fh, ino;
666                 copy_from_user(&ino, (__u64 *)arg, sizeof(ino));
667                 CERROR("-- opening ino %llu\n", (unsigned long long)ino);
668                 err = mdc_open(&cl, connection, ino, S_IFDIR, O_RDONLY, 4711,
669                                &fh, &request);
670                 copy_to_user((__u64 *)arg, &fh, sizeof(fh));
671                 CERROR("-- done err %d (fh=%Lu)\n", err,
672                        (unsigned long long)fh);
673
674                 GOTO(out, err);
675         }
676
677         case IOC_REQUEST_CLOSE: {
678                 CERROR("-- closing ino 2, filehandle %lu\n", arg);
679                 err = mdc_close(&cl, connection, 2, S_IFDIR, arg, &request);
680                 CERROR("-- done err %d\n", err);
681
682                 GOTO(out, err);
683         }
684
685         default:
686                 GOTO(out, err = -EINVAL);
687         }
688
689  out:
690         ptlrpc_free_req(request);
691         ptlrpc_put_connection(connection);
692         ptlrpc_cleanup_client(&cl);
693
694         RETURN(err);
695 #endif
696         return 0;
697 }
698
699 static int mdc_setup(struct obd_device *obddev, obd_count len, void *buf)
700 {
701         struct obd_ioctl_data* data = buf;
702         struct mdc_obd *mdc = &obddev->u.mdc;
703         char server_uuid[37];
704         int rc;
705         ENTRY;
706
707         if (data->ioc_inllen1 < 1) {
708                 CERROR("osc setup requires a TARGET UUID\n");
709                 RETURN(-EINVAL);
710         }
711
712         if (data->ioc_inllen1 > 37) {
713                 CERROR("mdc UUID must be less than 38 characters\n");
714                 RETURN(-EINVAL);
715         }
716
717         if (data->ioc_inllen2 < 1) {
718                 CERROR("mdc setup requires a SERVER UUID\n");
719                RETURN(-EINVAL);
720         }
721
722         if (data->ioc_inllen2 > 37) {
723                 CERROR("mdc UUID must be less than 38 characters\n");
724                 RETURN(-EINVAL);
725         }
726
727         memcpy(mdc->mdc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
728         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
729                                                    sizeof(server_uuid)));
730
731         mdc->mdc_conn = ptlrpc_uuid_to_connection(server_uuid);
732         if (!mdc->mdc_conn)
733                 RETURN(-ENOENT); 
734
735         OBD_ALLOC(mdc->mdc_client, sizeof(*mdc->mdc_client));
736         if (mdc->mdc_client == NULL)
737                 GOTO(out_conn, rc = -ENOMEM);
738
739         OBD_ALLOC(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
740         if (mdc->mdc_ldlm_client == NULL)
741                 GOTO(out_client, rc = -ENOMEM);
742
743         ptlrpc_init_client(NULL, NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
744                            mdc->mdc_client);
745         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
746                            mdc->mdc_ldlm_client);
747         mdc->mdc_client->cli_name = "mdc";
748         mdc->mdc_ldlm_client->cli_name = "ldlm";
749         mdc->mdc_max_mdsize = sizeof(struct lov_stripe_md);
750         /* XXX get recovery hooked in here again */
751         //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
752
753         ptlrpc_init_client(ptlrpc_connmgr, NULL,
754                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
755                            mdc->mdc_client);
756
757         MOD_INC_USE_COUNT;
758         RETURN(0);
759
760  out_client:
761         OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
762  out_conn:
763         ptlrpc_put_connection(mdc->mdc_conn);
764         return rc;
765 }
766
767 static int mdc_cleanup(struct obd_device * obddev)
768 {
769         struct mdc_obd *mdc = &obddev->u.mdc;
770
771         ptlrpc_cleanup_client(mdc->mdc_client);
772         OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
773         ptlrpc_cleanup_client(mdc->mdc_ldlm_client);
774         OBD_FREE(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
775         ptlrpc_put_connection(mdc->mdc_conn);
776
777         MOD_DEC_USE_COUNT;
778         return 0;
779 }
780
781 static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd)
782 {
783         struct mdc_obd *mdc = &obd->u.mdc;
784         struct ptlrpc_request *request;
785         int rc, size = sizeof(mdc->mdc_target_uuid);
786         char *tmp = mdc->mdc_target_uuid;
787
788         ENTRY;
789
790         obd->obd_namespace =
791                 ldlm_namespace_new("mdc", LDLM_NAMESPACE_CLIENT);
792         if (obd->obd_namespace == NULL)
793                 RETURN(-ENOMEM);
794
795         MOD_INC_USE_COUNT;
796         rc = class_connect(conn, obd);
797         if (rc) 
798                 RETURN(rc); 
799
800         request = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
801                                   MDS_CONNECT, 1, &size, &tmp);
802         if (!request)
803                 GOTO(out_disco, -ENOMEM);
804
805         request->rq_level = LUSTRE_CONN_NEW;
806         request->rq_replen = lustre_msg_size(0, NULL);
807         /* Sending our local connection info breaks for local connections
808         request->rq_reqmsg->addr = conn->addr;
809         request->rq_reqmsg->cookie = conn->cookie;
810          */
811
812         rc = ptlrpc_queue_wait(request);
813         rc = ptlrpc_check_status(request, rc);
814         if (rc)
815                 GOTO(out, rc);
816
817         class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg);
818
819         EXIT;
820  out:
821         ptlrpc_free_req(request);
822  out_disco:
823         if (rc) {
824                 class_disconnect(conn);
825                 MOD_DEC_USE_COUNT;
826         }
827         return rc;
828 }
829
830 static int mdc_disconnect(struct lustre_handle *conn)
831 {
832         struct ptlrpc_client *cl;
833         struct ptlrpc_connection *connection;
834         struct lustre_handle *rconn;
835         struct obd_device *obd = class_conn2obd(conn);
836         struct ptlrpc_request *request;
837         int rc;
838         ENTRY;
839
840         ldlm_namespace_free(obd->obd_namespace);
841         mdc_con2cl(conn, &cl, &connection, &rconn);
842         request = ptlrpc_prep_req2(cl, connection, rconn,
843                                    MDS_DISCONNECT, 0, NULL, NULL);
844         if (!request)
845                 RETURN(-ENOMEM);
846
847         request->rq_replen = lustre_msg_size(0, NULL);
848
849         rc = ptlrpc_queue_wait(request);
850         if (rc) 
851                 GOTO(out, rc);
852         rc = class_disconnect(conn);
853         if (!rc)
854                 MOD_DEC_USE_COUNT;
855  out:
856         ptlrpc_free_req(request);
857         return rc;
858 }
859
860 struct obd_ops mdc_obd_ops = {
861         o_setup:   mdc_setup,
862         o_cleanup: mdc_cleanup,
863         o_connect: mdc_connect,
864         o_disconnect: mdc_disconnect,
865         o_iocontrol: mdc_ioctl
866 };
867
868 static int __init ptlrpc_request_init(void)
869 {
870         return class_register_type(&mdc_obd_ops, LUSTRE_MDC_NAME);
871 }
872
873 static void __exit ptlrpc_request_exit(void)
874 {
875         class_unregister_type(LUSTRE_MDC_NAME);
876 }
877
878 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
879 MODULE_DESCRIPTION("Lustre Metadata Client v1.0");
880 MODULE_LICENSE("GPL");
881
882 EXPORT_SYMBOL(mdc_getstatus);
883 EXPORT_SYMBOL(mdc_getlovinfo);
884 EXPORT_SYMBOL(mdc_enqueue);
885 EXPORT_SYMBOL(mdc_getattr);
886 EXPORT_SYMBOL(mdc_statfs);
887 EXPORT_SYMBOL(mdc_create);
888 EXPORT_SYMBOL(mdc_unlink);
889 EXPORT_SYMBOL(mdc_rename);
890 EXPORT_SYMBOL(mdc_link);
891 EXPORT_SYMBOL(mdc_readpage);
892 EXPORT_SYMBOL(mdc_setattr);
893 EXPORT_SYMBOL(mdc_close);
894 EXPORT_SYMBOL(mdc_open);
895
896 module_init(ptlrpc_request_init);
897 module_exit(ptlrpc_request_exit);