Whamcloud - gitweb
Don't load /etc/lustre/lustre.cfg if config files were passed on the command line
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define EXPORT_SYMTAB
24 #define DEBUG_SUBSYSTEM S_MDC
25
26 #include <linux/module.h>
27 #include <linux/miscdevice.h>
28 #include <linux/lustre_mds.h>
29 #include <linux/lustre_lite.h>
30 #include <linux/lustre_dlm.h>
31
32 #define REQUEST_MINOR 244
33
34 extern int mds_queue_req(struct ptlrpc_request *);
35
36
37 int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
38                   __u64 *last_committed, __u64 *last_rcvd,
39                   __u32 *last_xid, struct ptlrpc_request **request)
40 {
41         struct ptlrpc_request *req;
42         struct mds_body *body;
43         struct mdc_obd *mdc = mdc_conn2mdc(conn);
44         int rc, size = sizeof(*body);
45         ENTRY;
46
47         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
48                               MDS_GETSTATUS, 1, &size, NULL);
49         if (!req)
50                 GOTO(out, rc = -ENOMEM);
51
52         body = lustre_msg_buf(req->rq_reqmsg, 0);
53         req->rq_level = LUSTRE_CONN_CON;
54         req->rq_replen = lustre_msg_size(1, &size);
55
56         mds_pack_req_body(req);
57         rc = ptlrpc_queue_wait(req);
58         rc = ptlrpc_check_status(req, rc);
59
60         if (!rc) {
61                 body = lustre_msg_buf(req->rq_repmsg, 0);
62                 mds_unpack_body(body);
63                 memcpy(rootfid, &body->fid1, sizeof(*rootfid));
64                 *last_committed = req->rq_repmsg->last_committed;
65                 *last_rcvd = req->rq_repmsg->last_rcvd;
66                 *last_xid = body->last_xid;
67
68                 CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_rcvd=%Lu,"
69                        " last_xid=%d\n",
70                        (unsigned long)rootfid->id,
71                        (unsigned long long)*last_committed,
72                        (unsigned long long)*last_rcvd,
73                        body->last_xid);
74         }
75
76         EXIT;
77  out:
78         ptlrpc_free_req(req); 
79         return rc;
80 }
81
82
83 int mdc_getattr(struct lustre_handle *conn,
84                 obd_id ino, int type, unsigned long valid, size_t ea_size,
85                 struct ptlrpc_request **request)
86 {
87         struct mdc_obd *mdc = mdc_conn2mdc(conn);
88         struct ptlrpc_request *req;
89         struct mds_body *body;
90         int rc, size[2] = {sizeof(*body), 0}, bufcount = 1;
91         ENTRY;
92
93         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
94                                MDS_GETATTR, 1, size, NULL);
95         if (!req)
96                 GOTO(out, rc = -ENOMEM);
97
98         body = lustre_msg_buf(req->rq_reqmsg, 0);
99         ll_ino2fid(&body->fid1, ino, 0, type);
100         body->valid = valid;
101
102         if (S_ISREG(type)) {
103                 bufcount = 2;
104                 size[1] = sizeof(struct obdo);
105         } else if (valid & OBD_MD_LINKNAME) {
106                 bufcount = 2;
107                 size[1] = ea_size;
108         }
109         req->rq_replen = lustre_msg_size(bufcount, size);
110
111         rc = ptlrpc_queue_wait(req);
112         rc = ptlrpc_check_status(req, rc);
113
114         if (!rc) {
115                 body = lustre_msg_buf(req->rq_repmsg, 0);
116                 mds_unpack_body(body);
117                 CDEBUG(D_NET, "mode: %o\n", body->mode);
118         }
119
120         EXIT;
121  out:
122         *request = req;
123         return rc;
124 }
125
126 static int mdc_lock_callback(struct lustre_handle *lockh,
127                              struct ldlm_lock_desc *desc, void *data,
128                              int data_len, struct ptlrpc_request **req)
129 {
130         int rc;
131         struct inode *inode = data;
132         ENTRY;
133
134         if (desc == NULL) {
135                 /* Completion AST.  Do nothing. */
136                 RETURN(0);
137         }
138
139         if (data_len != sizeof(*inode)) {
140                 CERROR("data_len should be %d, but is %d\n", sizeof(*inode),
141                        data_len);
142                 LBUG();
143         }
144
145         /* FIXME: do something better than throwing away everything */
146         if (inode == NULL)
147                 LBUG();
148         if (S_ISDIR(inode->i_mode)) {
149                 CDEBUG(D_INODE, "invalidating inode %ld\n", inode->i_ino);
150                 invalidate_inode_pages(inode);
151         }
152
153         rc = ldlm_cli_cancel(lockh);
154         if (rc < 0) {
155                 CERROR("ldlm_cli_cancel: %d\n", rc);
156                 LBUG();
157         }
158         RETURN(0);
159 }
160
161 int mdc_enqueue(struct lustre_handle *conn, int lock_type,
162                 struct lookup_intent *it, int lock_mode, struct inode *dir,
163                 struct dentry *de, struct lustre_handle *lockh, __u64 id,
164                 char *tgt, int tgtlen, void *data, int datalen)
165 {
166         struct ptlrpc_request *req;
167         struct obd_device *obddev = class_conn2obd(conn);
168         struct mdc_obd *mdc = mdc_conn2mdc(conn);
169         __u64 res_id[RES_NAME_SIZE] = {dir->i_ino};
170         int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
171         int rc, flags;
172         struct ldlm_reply *dlm_rep;
173         struct ldlm_intent *lit;
174         ENTRY;
175
176         LDLM_DEBUG_NOLOCK("mdsintent %d dir %ld", it->it_op, dir->i_ino);
177
178         switch (it->it_op) { 
179         case IT_MKDIR:
180                 it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask; 
181                 break;
182         case (IT_CREAT|IT_OPEN):
183         case IT_CREAT:
184         case IT_MKNOD:
185                 it->it_mode = (it->it_mode | S_IFREG) & ~current->fs->umask; 
186                 break;
187         case IT_SYMLINK:
188                 it->it_mode = (it->it_mode | S_IFLNK) & ~current->fs->umask; 
189                 break;
190         }
191
192         if (it->it_op & (IT_MKDIR | IT_CREAT | IT_SYMLINK | IT_MKNOD)) {
193                 size[2] = sizeof(struct mds_rec_create);
194                 size[3] = de->d_name.len + 1;
195                 size[4] = tgtlen + 1;
196                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
197                                        &mdc->mdc_connh, LDLM_ENQUEUE, 5, size,
198                                        NULL);
199                 if (!req)
200                         RETURN(-ENOMEM);
201
202                 /* pack the intent */
203                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
204                 lit->opc = NTOH__u64((__u64)it->it_op);
205
206                 /* pack the intended request */
207                 mds_create_pack(req, 2, dir, it->it_mode, id, current->fsuid,
208                                 current->fsgid, CURRENT_TIME, de->d_name.name,
209                                 de->d_name.len, tgt, tgtlen);
210
211                 size[0] = sizeof(struct ldlm_reply);
212                 size[1] = sizeof(struct mds_body);
213                 size[2] = sizeof(struct obdo);
214                 req->rq_replen = lustre_msg_size(3, size);
215         } else if (it->it_op == IT_RENAME2) {
216                 struct dentry *old_de = it->it_data;
217
218                 size[2] = sizeof(struct mds_rec_rename);
219                 size[3] = old_de->d_name.len + 1;
220                 size[4] = de->d_name.len + 1;
221                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
222                                        &mdc->mdc_connh, LDLM_ENQUEUE, 5, size,
223                                        NULL);
224                 if (!req)
225                         RETURN(-ENOMEM);
226
227                 /* pack the intent */
228                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
229                 lit->opc = NTOH__u64((__u64)it->it_op);
230
231                 /* pack the intended request */
232                 mds_rename_pack(req, 2, old_de->d_parent->d_inode, dir,
233                                 old_de->d_name.name, old_de->d_name.len,
234                                 de->d_name.name, de->d_name.len);
235
236                 size[0] = sizeof(struct ldlm_reply);
237                 req->rq_replen = lustre_msg_size(1, size);
238         } else if (it->it_op == IT_UNLINK) {
239                 size[2] = sizeof(struct mds_rec_unlink);
240                 size[3] = de->d_name.len + 1;
241                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
242                                 &mdc->mdc_connh, LDLM_ENQUEUE, 4, size, NULL);
243                 if (!req)
244                         RETURN(-ENOMEM);
245
246                 /* pack the intent */
247                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
248                 lit->opc = NTOH__u64((__u64)it->it_op);
249
250                 /* pack the intended request */
251                 mds_unlink_pack(req, 2, dir, NULL, de->d_name.name, 
252                                 de->d_name.len);
253                 size[0] = sizeof(struct ldlm_reply);
254                 size[1] = sizeof(struct obdo);
255                 req->rq_replen = lustre_msg_size(2, size);
256         } else if (it->it_op == IT_RMDIR) {
257                 size[2] = sizeof(struct mds_rec_unlink);
258                 size[3] = de->d_name.len + 1;
259                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
260                                        &mdc->mdc_connh, LDLM_ENQUEUE, 4, size,
261                                        NULL);
262                 if (!req)
263                         RETURN(-ENOMEM);
264
265                 /* pack the intent */
266                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
267                 lit->opc = NTOH__u64((__u64)it->it_op);
268
269                 /* pack the intended request */
270                 mds_unlink_pack(req, 2, dir, NULL, de->d_name.name, 
271                                 de->d_name.len);
272                 size[0] = sizeof(struct ldlm_reply);
273                 req->rq_replen = lustre_msg_size(1, size);
274         } else if (it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
275                    it->it_op == IT_OPEN || it->it_op == IT_SETATTR ||
276                    it->it_op == IT_LOOKUP) {
277                 size[2] = sizeof(struct mds_body);
278                 size[3] = de->d_name.len + 1;
279
280                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
281                                        &mdc->mdc_connh, LDLM_ENQUEUE, 4, size,
282                                        NULL);
283                 if (!req)
284                         RETURN(-ENOMEM);
285
286                 /* pack the intent */
287                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
288                 lit->opc = NTOH__u64((__u64)it->it_op);
289
290                 /* pack the intended request */
291                 mds_getattr_pack(req, 2, dir, de->d_name.name, de->d_name.len);
292
293                 /* get ready for the reply */
294                 size[0] = sizeof(struct ldlm_reply);
295                 size[1] = sizeof(struct mds_body);
296                 size[2] = sizeof(struct obdo);
297                 req->rq_replen = lustre_msg_size(3, size);
298         } else if (it->it_op == IT_READDIR) {
299                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
300                                 &mdc->mdc_connh, LDLM_ENQUEUE, 1, size, NULL);
301                 if (!req)
302                         RETURN(-ENOMEM);
303
304                 /* get ready for the reply */
305                 size[0] = sizeof(struct ldlm_reply);
306                 req->rq_replen = lustre_msg_size(1, size);
307         } else {
308                 LBUG();
309                 RETURN(-1);
310         }
311 #warning FIXME: the data here needs to be different if a lock was granted for a different inode
312         rc = ldlm_cli_enqueue(mdc->mdc_ldlm_client, mdc->mdc_conn, NULL, req,
313                               obddev->obd_namespace, NULL, res_id, lock_type,
314                               NULL, 0, lock_mode, &flags,
315                               (void *)mdc_lock_callback, data, datalen, lockh);
316         if (rc == -ENOENT || rc == ELDLM_LOCK_ABORTED) {
317                 lock_mode = 0;
318                 memset(lockh, 0, sizeof(*lockh));
319         } else if (rc != 0) {
320                 CERROR("ldlm_cli_enqueue: %d\n", rc);
321                 RETURN(rc);
322         }
323
324         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0); 
325         it->it_disposition = (int) dlm_rep->lock_policy_res1;
326         it->it_status = (int) dlm_rep->lock_policy_res2;
327         it->it_lock_mode = lock_mode;
328         it->it_data = req;
329
330         RETURN(0);
331 }
332
333 int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
334              struct obdo *obdo,
335              __u64 cookie, __u64 *fh, struct ptlrpc_request **request)
336 {
337         struct mdc_obd *mdc = mdc_conn2mdc(conn);
338         struct mds_body *body;
339         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
340         struct ptlrpc_request *req;
341         ENTRY;
342
343         if (obdo != NULL) {
344                 bufcount = 2;
345                 size[1] = sizeof(*obdo);
346         }
347
348         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
349                                MDS_OPEN, bufcount, size, NULL);
350         if (!req)
351                 GOTO(out, rc = -ENOMEM);
352
353         req->rq_flags |= PTL_RPC_FL_REPLAY;
354         body = lustre_msg_buf(req->rq_reqmsg, 0);
355
356         ll_ino2fid(&body->fid1, ino, 0, type);
357         body->flags = HTON__u32(flags);
358         body->extra = cookie;
359
360         if (obdo != NULL)
361                 memcpy(lustre_msg_buf(req->rq_reqmsg, 1), obdo, sizeof(*obdo));
362
363         req->rq_replen = lustre_msg_size(1, size);
364
365         rc = ptlrpc_queue_wait(req);
366         rc = ptlrpc_check_status(req, rc);
367
368         if (!rc) {
369                 body = lustre_msg_buf(req->rq_repmsg, 0);
370                 mds_unpack_body(body);
371                 *fh = body->extra;
372         }
373
374         EXIT;
375  out:
376         *request = req;
377         return rc;
378 }
379
380 int mdc_close(struct lustre_handle *conn, 
381               obd_id ino, int type, __u64 fh, struct ptlrpc_request **request)
382 {
383         struct mdc_obd *mdc = mdc_conn2mdc(conn);
384         struct mds_body *body;
385         int rc, size = sizeof(*body);
386         struct ptlrpc_request *req;
387
388         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
389                                MDS_CLOSE, 1, &size, NULL);
390         if (!req)
391                 GOTO(out, rc = -ENOMEM);
392
393         body = lustre_msg_buf(req->rq_reqmsg, 0);
394         ll_ino2fid(&body->fid1, ino, 0, type);
395         body->extra = fh;
396
397         req->rq_replen = lustre_msg_size(0, NULL);
398
399         rc = ptlrpc_queue_wait(req);
400         rc = ptlrpc_check_status(req, rc);
401
402         EXIT;
403  out:
404         *request = req;
405         return rc;
406 }
407
408 int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
409                  char *addr, struct ptlrpc_request **request)
410 {
411         struct mdc_obd *mdc = mdc_conn2mdc(conn);
412         struct ptlrpc_request *req = NULL;
413         struct ptlrpc_bulk_desc *desc = NULL;
414         struct ptlrpc_bulk_page *bulk = NULL;
415         struct mds_body *body;
416         int rc, size = sizeof(*body);
417         ENTRY;
418
419         CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
420
421         desc = ptlrpc_prep_bulk(mdc->mdc_conn);
422         if (desc == NULL)
423                 GOTO(out, rc = -ENOMEM);
424
425         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
426                                MDS_READPAGE, 1, &size, NULL);
427         if (!req)
428                 GOTO(out2, rc = -ENOMEM);
429
430         bulk = ptlrpc_prep_bulk_page(desc);
431         bulk->b_buflen = PAGE_SIZE;
432         bulk->b_buf = addr;
433         bulk->b_xid = req->rq_xid;
434         desc->b_portal = MDS_BULK_PORTAL;
435
436         rc = ptlrpc_register_bulk(desc);
437         if (rc) {
438                 CERROR("couldn't setup bulk sink: error %d.\n", rc);
439                 GOTO(out2, rc);
440         }
441
442         body = lustre_msg_buf(req->rq_reqmsg, 0);
443         body->fid1.id = ino;
444         body->fid1.f_type = type;
445         body->size = offset;
446
447         req->rq_replen = lustre_msg_size(1, &size);
448         rc = ptlrpc_queue_wait(req);
449         rc = ptlrpc_check_status(req, rc);
450         if (rc) {
451                 ptlrpc_abort_bulk(desc);
452                 GOTO(out2, rc);
453         } else { 
454                 body = lustre_msg_buf(req->rq_repmsg, 0);
455                 mds_unpack_body(body);
456         }
457
458         EXIT;
459  out2:
460         ptlrpc_free_bulk(desc);
461  out:
462         *request = req;
463         return rc;
464 }
465
466 int mdc_statfs(struct lustre_handle *conn, struct statfs *sfs,
467                struct ptlrpc_request **request)
468 {
469         struct mdc_obd *mdc = mdc_conn2mdc(conn);
470         struct obd_statfs *osfs;
471         struct ptlrpc_request *req;
472         int rc, size = sizeof(*osfs);
473         ENTRY;
474
475         req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
476                                MDS_STATFS, 0, NULL, NULL);
477         if (!req)
478                 GOTO(out, rc = -ENOMEM);
479         req->rq_replen = lustre_msg_size(1, &size);
480
481         rc = ptlrpc_queue_wait(req);
482         rc = ptlrpc_check_status(req, rc);
483
484         if (rc)
485                 GOTO(out, rc);
486
487         osfs = lustre_msg_buf(req->rq_repmsg, 0);
488         obd_statfs_unpack(osfs, sfs);
489
490         EXIT;
491 out:
492         *request = req;
493
494         return rc;
495 }
496
/*
 * ioctl entry point of the MDC device (installed as o_iocontrol in
 * mdc_obd_ops).
 *
 * Currently a stub: every argument is ignored and 0 is returned.
 *
 * A former test harness lived here under "#if 0".  It dispatched
 * IOC_REQUEST_GETATTR, IOC_REQUEST_READPAGE, IOC_REQUEST_SETATTR,
 * IOC_REQUEST_CREATE, IOC_REQUEST_OPEN and IOC_REQUEST_CLOSE to the
 * matching mdc_*() call.  It has been removed because it could no longer be
 * enabled: it used the undeclared variables "connection" and "arg", and it
 * called mdc_getattr(), mdc_readpage(), mdc_open() and mdc_close() with
 * argument lists that do not match their definitions in this file.
 */
static int mdc_ioctl(long cmd, struct lustre_handle *conn, int len, void *karg,
                     void *uarg)
{
        /* FIXME XXX : This should use the new ioc_data to pass args in */
        return 0;
}
620
621 static int mdc_setup(struct obd_device *obddev, obd_count len, void *buf)
622 {
623         struct obd_ioctl_data* data = buf;
624         struct mdc_obd *mdc = &obddev->u.mdc;
625         char server_uuid[37];
626         int rc;
627         ENTRY;
628
629         if (data->ioc_inllen1 < 1) {
630                 CERROR("osc setup requires a TARGET UUID\n");
631                 RETURN(-EINVAL);
632         }
633
634         if (data->ioc_inllen1 > 37) {
635                 CERROR("mdc UUID must be less than 38 characters\n");
636                 RETURN(-EINVAL);
637         }
638
639         if (data->ioc_inllen2 < 1) {
640                 CERROR("mdc setup requires a SERVER UUID\n");
641                RETURN(-EINVAL);
642         }
643
644         if (data->ioc_inllen2 > 37) {
645                 CERROR("mdc UUID must be less than 38 characters\n");
646                 RETURN(-EINVAL);
647         }
648
649         memcpy(mdc->mdc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
650         memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
651                                                    sizeof(server_uuid)));
652
653         mdc->mdc_conn = ptlrpc_uuid_to_connection(server_uuid);
654         if (!mdc->mdc_conn)
655                 RETURN(-ENOENT); 
656
657         OBD_ALLOC(mdc->mdc_client, sizeof(*mdc->mdc_client));
658         if (mdc->mdc_client == NULL)
659                 GOTO(out_conn, rc = -ENOMEM);
660
661         OBD_ALLOC(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
662         if (mdc->mdc_ldlm_client == NULL)
663                 GOTO(out_client, rc = -ENOMEM);
664
665         ptlrpc_init_client(NULL, NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
666                            mdc->mdc_client);
667         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
668                            mdc->mdc_ldlm_client);
669         mdc->mdc_client->cli_name = "mdc";
670         mdc->mdc_ldlm_client->cli_name = "ldlm";
671         /* XXX get recovery hooked in here again */
672         //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
673
674         ptlrpc_init_client(ptlrpc_connmgr, NULL,
675                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
676                            mdc->mdc_client);
677
678         MOD_INC_USE_COUNT;
679         RETURN(0);
680
681  out_client:
682         OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
683  out_conn:
684         ptlrpc_put_connection(mdc->mdc_conn);
685         return rc;
686 }
687
688 static int mdc_cleanup(struct obd_device * obddev)
689 {
690         struct mdc_obd *mdc = &obddev->u.mdc;
691
692         ptlrpc_cleanup_client(mdc->mdc_client);
693         OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
694         ptlrpc_cleanup_client(mdc->mdc_ldlm_client);
695         OBD_FREE(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
696         ptlrpc_put_connection(mdc->mdc_conn);
697
698         MOD_DEC_USE_COUNT;
699         return 0;
700 }
701
702 static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd)
703 {
704         struct mdc_obd *mdc = &obd->u.mdc;
705         struct ptlrpc_request *request;
706         int rc, size = sizeof(mdc->mdc_target_uuid);
707         char *tmp = mdc->mdc_target_uuid;
708
709         ENTRY;
710
711         obd->obd_namespace =
712                 ldlm_namespace_new("mdc", LDLM_NAMESPACE_CLIENT);
713         if (obd->obd_namespace == NULL)
714                 RETURN(-ENOMEM);
715
716         MOD_INC_USE_COUNT;
717         rc = class_connect(conn, obd);
718         if (rc) 
719                 RETURN(rc); 
720
721         request = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
722                                   MDS_CONNECT, 1, &size, &tmp);
723         if (!request)
724                 GOTO(out_disco, -ENOMEM);
725
726         request->rq_level = LUSTRE_CONN_NEW;
727         request->rq_replen = lustre_msg_size(0, NULL);
728
729         rc = ptlrpc_queue_wait(request);
730         rc = ptlrpc_check_status(request, rc);
731         if (rc)
732                 GOTO(out, rc);
733
734         mdc->mdc_connh.addr = request->rq_repmsg->addr;
735         mdc->mdc_connh.cookie = request->rq_repmsg->cookie;
736
737         EXIT;
738  out:
739         ptlrpc_free_req(request);
740  out_disco:
741         if (rc) {
742                 class_disconnect(conn);
743                 MOD_DEC_USE_COUNT;
744         }
745         return rc;
746 }
747
748 static int mdc_disconnect(struct lustre_handle *conn)
749 {
750         struct mdc_obd *mdc = mdc_conn2mdc(conn);
751         struct obd_device *obd = class_conn2obd(conn);
752         struct ptlrpc_request *request;
753         int rc;
754         ENTRY;
755
756         ldlm_namespace_free(obd->obd_namespace);
757         request = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, 
758                                    &mdc->mdc_connh, MDS_DISCONNECT, 0, NULL, NULL);
759         if (!request)
760                 RETURN(-ENOMEM);
761
762         request->rq_replen = lustre_msg_size(0, NULL);
763
764         rc = ptlrpc_queue_wait(request);
765         if (rc) 
766                 GOTO(out, rc);
767         rc = class_disconnect(conn);
768         if (!rc)
769                 MOD_DEC_USE_COUNT;
770  out:
771         ptlrpc_free_req(request);
772         return rc;
773 }
774
775 struct obd_ops mdc_obd_ops = {
776         o_setup:   mdc_setup,
777         o_cleanup: mdc_cleanup,
778         o_connect: mdc_connect,
779         o_disconnect: mdc_disconnect,
780         o_iocontrol: mdc_ioctl
781 };
782
783 static int __init ptlrpc_request_init(void)
784 {
785         return class_register_type(&mdc_obd_ops, LUSTRE_MDC_NAME);
786 }
787
788 static void __exit ptlrpc_request_exit(void)
789 {
790         class_unregister_type(LUSTRE_MDC_NAME);
791 }
792
793 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
794 MODULE_DESCRIPTION("Lustre Metadata Client v1.0");
795 MODULE_LICENSE("GPL");
796
797 EXPORT_SYMBOL(mdc_getstatus);
798 EXPORT_SYMBOL(mdc_enqueue);
799 EXPORT_SYMBOL(mdc_getattr);
800 EXPORT_SYMBOL(mdc_statfs);
801 EXPORT_SYMBOL(mdc_create);
802 EXPORT_SYMBOL(mdc_unlink);
803 EXPORT_SYMBOL(mdc_rename);
804 EXPORT_SYMBOL(mdc_link);
805 EXPORT_SYMBOL(mdc_readpage);
806 EXPORT_SYMBOL(mdc_setattr);
807 EXPORT_SYMBOL(mdc_close);
808 EXPORT_SYMBOL(mdc_open);
809
810 module_init(ptlrpc_request_init);
811 module_exit(ptlrpc_request_exit);