1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.sf.net/projects/lustre/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 #define DEBUG_SUBSYSTEM S_MDC
26 #include <linux/module.h>
27 #include <linux/miscdevice.h>
28 #include <linux/lustre_mds.h>
29 #include <linux/lustre_lite.h>
30 #include <linux/lustre_dlm.h>
/* NOTE(review): neither REQUEST_MINOR nor mds_queue_req() is referenced in
 * the visible part of this file; confirm they are still needed. */
32 #define REQUEST_MINOR 244
34 extern int mds_queue_req(struct ptlrpc_request *);
/*
 * Send an MDS_GETSTATUS request over the MDC connection and copy the root
 * fid and the last_committed / last_rcvd / last_xid values found in the
 * reply into the caller-supplied output pointers.
 *
 * NOTE(review): several lines of this function are not visible in this
 * listing (e.g. the condition guarding the GOTO below and everything at the
 * `out` label), so how *request is filled in and what is returned cannot be
 * confirmed from here.
 */
37 int mdc_getstatus(struct obd_conn *conn, struct ll_fid *rootfid,
38 __u64 *last_committed, __u64 *last_rcvd,
39 __u32 *last_xid, struct ptlrpc_request **request)
41 struct ptlrpc_request *req;
42 struct mds_body *body;
43 struct mdc_obd *mdc = mdc_conn2mdc(conn);
44 int rc, size = sizeof(*body);
/* The request carries a single buffer sized for one struct mds_body. */
47 req = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
48 MDS_GETSTATUS, 1, &size, NULL);
50 GOTO(out, rc = -ENOMEM);
52 body = lustre_msg_buf(req->rq_reqmsg, 0);
53 req->rq_level = LUSTRE_CONN_CON;
/* The reply is sized the same way: one struct mds_body. */
54 req->rq_replen = lustre_msg_size(1, &size);
56 mds_pack_req_body(req);
/* The queue_wait result is handed to ptlrpc_check_status(), whose return
 * value then replaces rc. */
57 rc = ptlrpc_queue_wait(req);
58 rc = ptlrpc_check_status(req, rc);
/* From here on `body` points into the reply message, not the request. */
61 body = lustre_msg_buf(req->rq_repmsg, 0);
62 mds_unpack_body(body);
63 memcpy(rootfid, &body->fid1, sizeof(*rootfid));
/* last_committed and last_rcvd come from the reply message header, while
 * last_xid is read from the unpacked body. */
64 *last_committed = req->rq_repmsg->last_committed;
65 *last_rcvd = req->rq_repmsg->last_rcvd;
66 *last_xid = body->last_xid;
/* NOTE(review): rootfid->id is cast to unsigned long but formatted with
 * %ld; %lu would match the cast. */
68 CDEBUG(D_NET, "root ino=%ld, last_committed=%Lu, last_rcvd=%Lu,"
70 (unsigned long)rootfid->id,
71 (unsigned long long)*last_committed,
72 (unsigned long long)*last_rcvd,
/*
 * Send an MDS_GETATTR request for inode `ino` (file type `type`) and unpack
 * the struct mds_body found in reply buffer 0.
 *
 * The request always carries one buffer (a struct mds_body).  The reply
 * length is computed from `bufcount` entries of size[]; size[1] is set to
 * sizeof(struct obdo) in one branch and an OBD_MD_LINKNAME branch also
 * exists.
 *
 * NOTE(review): the branch conditions/bodies that adjust bufcount and
 * size[1], the use of `ea_size`, the error guards, and the `out` label are
 * not visible in this listing - confirm against the full file.
 */
83 int mdc_getattr(struct obd_conn *conn,
84 ino_t ino, int type, unsigned long valid, size_t ea_size,
85 struct ptlrpc_request **request)
87 struct mdc_obd *mdc = mdc_conn2mdc(conn);
88 struct ptlrpc_request *req;
89 struct mds_body *body;
/* size[0] is the mds_body; size[1] is an optional second reply buffer. */
90 int rc, size[2] = {sizeof(*body), 0}, bufcount = 1;
93 req = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
94 MDS_GETATTR, 1, size, NULL);
96 GOTO(out, rc = -ENOMEM);
98 body = lustre_msg_buf(req->rq_reqmsg, 0);
/* Identify the target inode in the request body's fid1. */
99 ll_ino2fid(&body->fid1, ino, 0, type);
104 size[1] = sizeof(struct obdo);
105 } else if (valid & OBD_MD_LINKNAME) {
109 req->rq_replen = lustre_msg_size(bufcount, size);
110 req->rq_level = LUSTRE_CONN_FULL;
112 rc = ptlrpc_queue_wait(req);
113 rc = ptlrpc_check_status(req, rc);
/* `body` is re-pointed at the reply before unpacking. */
116 body = lustre_msg_buf(req->rq_repmsg, 0);
117 mds_unpack_body(body);
118 CDEBUG(D_NET, "mode: %o\n", body->mode);
/*
 * Lock callback; mdc_enqueue() passes this function to ldlm_cli_enqueue().
 *
 * `data` is interpreted as a struct inode pointer and `data_len` is checked
 * against sizeof(struct inode).  The visible code invalidates the inode's
 * pages and cancels the lock with ldlm_cli_cancel(), logging the result
 * code via CERROR.
 *
 * NOTE(review): the test that distinguishes the "completion AST" case
 * (which per the comment below does nothing) and the return statements are
 * not visible in this listing.
 */
127 static int mdc_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
128 void *data, int data_len)
131 struct inode *inode = data;
135 /* Completion AST. Do nothing. */
/* NOTE(review): sizeof yields a size_t but is printed with %d here. */
139 if (data_len != sizeof(*inode)) {
140 CERROR("data_len should be %d, but is %d\n", sizeof(*inode),
145 /* FIXME: do something better than throwing away everything */
148 invalidate_inode_pages(inode);
150 rc = ldlm_cli_cancel(lock->l_client, lock);
152 CERROR("ldlm_cli_cancel: %d\n", rc);
/*
 * Build an LDLM_ENQUEUE request that carries a lookup intent and enqueue it
 * with ldlm_cli_enqueue() on the resource named by dir->i_ino.
 *
 * Request layout used by every intent branch except IT_READDIR:
 *   buffer 0: struct ldlm_request
 *   buffer 1: struct ldlm_intent (opc taken from it->it_op)
 *   buffer 2+: the "intended" MDS operation, packed by an mds_*_pack helper
 * The IT_READDIR branch prepares a request with buffer 0 only.
 *
 * After packing, size[] is reused to describe the expected reply buffers
 * and rq_replen is computed from it.  When the enqueue returns, the intent
 * is updated from the ldlm_reply: it_disposition from lock_policy_res1,
 * it_status from lock_policy_res2, and it_lock_mode from `lock_mode`.
 *
 * `data`/`datalen` are forwarded to ldlm_cli_enqueue() together with
 * mdc_lock_callback.
 *
 * NOTE(review): many lines of this function are not visible in this listing
 * (the declaration of `flags`/`rc`, the switch that selects the it_mode
 * adjustments, the allocation-failure checks after each ptlrpc_prep_req(),
 * the final else branch and the return paths) - confirm against the full
 * file before relying on this description.
 */
158 int mdc_enqueue(struct obd_conn *conn, int lock_type, struct lookup_intent *it,
159 int lock_mode, struct inode *dir, struct dentry *de,
160 struct lustre_handle *lockh, __u64 id, char *tgt, int tgtlen,
161 void *data, int datalen)
163 struct ptlrpc_request *req;
164 struct obd_device *obddev = conn->oc_dev;
165 struct mdc_obd *mdc = mdc_conn2mdc(conn);
/* Only element 0 of res_id is given; the rest are zero-initialized. */
166 __u64 res_id[RES_NAME_SIZE] = {dir->i_ino};
/* size[2..4] start at zero and are filled in per intent below. */
167 int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
169 struct ldlm_reply *dlm_rep;
170 struct ldlm_intent *lit;
/* Each it_mode adjustment ORs in a file-type bit and masks with the
 * process umask. */
175 it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask;
178 it->it_op = IT_GETATTR;
180 case (IT_CREAT|IT_OPEN):
182 it->it_mode = (it->it_mode | S_IFREG) & ~current->fs->umask;
185 it->it_mode = (it->it_mode | S_IFLNK) & ~current->fs->umask;
189 if (it->it_op & (IT_MKDIR | IT_CREAT | IT_SYMLINK | IT_MKNOD)) {
/* Create-style intents: record + new name + target string, each string
 * sized with one extra byte. */
190 size[2] = sizeof(struct mds_rec_create);
191 size[3] = de->d_name.len + 1;
192 size[4] = tgtlen + 1;
193 req = ptlrpc_prep_req(mdc->mdc_ldlm_client, mdc->mdc_conn,
194 LDLM_ENQUEUE, 5, size, NULL);
198 /* pack the intent */
199 lit = lustre_msg_buf(req->rq_reqmsg, 1);
/* NOTE(review): NTOH is used while packing outgoing data here and in the
 * branches below; HTON would be the expected direction - confirm. */
200 lit->opc = NTOH__u64((__u64)it->it_op);
202 /* pack the intended request */
203 mds_create_pack(req, 2, dir, it->it_mode, id, current->fsuid,
204 current->fsgid, CURRENT_TIME, de->d_name.name,
205 de->d_name.len, tgt, tgtlen);
/* Expected reply: ldlm_reply, mds_body, obdo. */
207 size[0] = sizeof(struct ldlm_reply);
208 size[1] = sizeof(struct mds_body);
209 size[2] = sizeof(struct obdo);
210 req->rq_replen = lustre_msg_size(3, size);
211 } else if ( it->it_op == IT_RENAME2 ) {
/* For IT_RENAME2 the source dentry is taken from it->it_data. */
212 struct dentry *old_de = it->it_data;
214 size[2] = sizeof(struct mds_rec_rename);
215 size[3] = old_de->d_name.len + 1;
216 size[4] = de->d_name.len + 1;
217 req = ptlrpc_prep_req(mdc->mdc_ldlm_client, mdc->mdc_conn,
218 LDLM_ENQUEUE, 5, size, NULL);
222 /* pack the intent */
223 lit = lustre_msg_buf(req->rq_reqmsg, 1);
224 lit->opc = NTOH__u64((__u64)it->it_op);
226 /* pack the intended request */
/* NOTE(review): size[3] above is derived from old_de->d_name.len, but the
 * name packed here is old_de->d_parent->d_name; if the parent's name is
 * longer this may not fit the buffer - verify which name is intended. */
227 mds_rename_pack(req, 2, old_de->d_inode, dir,
228 old_de->d_parent->d_name.name,
229 old_de->d_parent->d_name.len,
230 de->d_name.name, de->d_name.len);
/* Expected reply: ldlm_reply, mds_body. */
232 size[0] = sizeof(struct ldlm_reply);
233 size[1] = sizeof(struct mds_body);
234 req->rq_replen = lustre_msg_size(2, size);
235 } else if ( it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
236 it->it_op == IT_OPEN ) {
/* Lookup-style intents: an mds_body plus the name being looked up. */
237 size[2] = sizeof(struct mds_body);
238 size[3] = de->d_name.len + 1;
240 req = ptlrpc_prep_req(mdc->mdc_ldlm_client, mdc->mdc_conn,
241 LDLM_ENQUEUE, 4, size, NULL);
245 /* pack the intent */
246 lit = lustre_msg_buf(req->rq_reqmsg, 1);
247 lit->opc = NTOH__u64((__u64)it->it_op);
249 /* pack the intended request */
250 mds_getattr_pack(req, 2, dir, de->d_name.name, de->d_name.len);
252 /* get ready for the reply */
253 size[0] = sizeof(struct ldlm_reply);
254 size[1] = sizeof(struct mds_body);
255 size[2] = sizeof(struct obdo);
256 req->rq_replen = lustre_msg_size(3, size);
257 } else if ( it->it_op == IT_SETATTR) {
258 size[2] = sizeof(struct mds_rec_setattr);
259 size[3] = de->d_name.len + 1;
/* NOTE(review): 5 buffers are requested although only size[0..3] are set
 * in this branch (size[4] is still 0 from the initializer); the
 * GETATTR branch with the same layout asks for 4 - confirm. */
260 req = ptlrpc_prep_req(mdc->mdc_ldlm_client, mdc->mdc_conn,
261 LDLM_ENQUEUE, 5, size, NULL);
265 lit = lustre_msg_buf(req->rq_reqmsg, 1);
266 lit->opc = NTOH__u64((__u64)it->it_op);
271 mds_setattr_pack(req, 2, dir, it->it_iattr,
272 de->d_name.name, de->d_name.len);
/* Expected reply: ldlm_reply, mds_body. */
273 size[0] = sizeof(struct ldlm_reply);
274 size[1] = sizeof(struct mds_body);
275 req->rq_replen = lustre_msg_size(2, size);
276 } else if ( it->it_op == IT_READDIR ) {
/* Only the ldlm_request buffer is sent; no intent buffer is packed in
 * the visible lines of this branch. */
277 req = ptlrpc_prep_req(mdc->mdc_ldlm_client, mdc->mdc_conn,
278 LDLM_ENQUEUE, 1, size, NULL);
282 /* get ready for the reply */
283 size[0] = sizeof(struct ldlm_reply);
284 req->rq_replen = lustre_msg_size(1, size);
/* Hand the prepared request to the lock manager; mdc_lock_callback and
 * the caller's data/datalen are passed along with it. */
289 rc = ldlm_cli_enqueue(mdc->mdc_ldlm_client, mdc->mdc_conn, req,
290 obddev->obd_namespace, NULL, res_id, lock_type,
291 NULL, 0, lock_mode, &flags,
292 (void *)mdc_lock_callback, data, datalen, lockh);
295 CERROR("ldlm_cli_enqueue: %d\n", rc);
/* Copy the server's intent results out of the ldlm_reply. */
299 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
300 it->it_disposition = (int) dlm_rep->lock_policy_res1;
301 it->it_status = (int) dlm_rep->lock_policy_res2;
302 it->it_lock_mode = lock_mode;
/*
 * Send an MDS_OPEN request for inode `ino`.  The request body carries the
 * fid, the open `flags` and `cookie` (stored in body->extra); the request
 * is flagged PTL_RPC_FL_REPLAY.  The reply is expected to hold one struct
 * mds_body, which is unpacked.
 *
 * NOTE(review): `objid` and `fh` are not used in the lines visible here,
 * and the error guards / `out` label are also not visible - presumably *fh
 * is filled from the reply in the missing lines; confirm.
 */
308 int mdc_open(struct obd_conn *conn, ino_t ino, int type, int flags, __u64 objid,
309 __u64 cookie, __u64 *fh, struct ptlrpc_request **request)
311 struct mdc_obd *mdc = mdc_conn2mdc(conn);
312 struct mds_body *body;
313 int rc, size = sizeof(*body);
314 struct ptlrpc_request *req;
316 req = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
317 MDS_OPEN, 1, &size, NULL);
319 GOTO(out, rc = -ENOMEM);
321 req->rq_flags |= PTL_RPC_FL_REPLAY;
322 req->rq_level = LUSTRE_CONN_FULL;
323 body = lustre_msg_buf(req->rq_reqmsg, 0);
325 ll_ino2fid(&body->fid1, ino, 0, type);
/* NOTE(review): flags is byte-swapped with HTON__u32 but cookie is stored
 * as-is - confirm whether that asymmetry is intended. */
326 body->flags = HTON__u32(flags);
327 body->extra = cookie;
329 req->rq_replen = lustre_msg_size(1, &size);
331 rc = ptlrpc_queue_wait(req);
332 rc = ptlrpc_check_status(req, rc);
/* `body` now refers to the reply buffer. */
335 body = lustre_msg_buf(req->rq_repmsg, 0);
336 mds_unpack_body(body);
/*
 * Send an MDS_CLOSE request for inode `ino`.  The request carries one
 * struct mds_body with fid1 set; the reply length is computed for zero
 * buffers.
 *
 * NOTE(review): `fh` is not referenced in the lines visible here (lines
 * appear to be missing between the ll_ino2fid() call and the rq_level
 * assignment), and the error guards / `out` label are not visible either.
 */
346 int mdc_close(struct obd_conn *conn,
347 ino_t ino, int type, __u64 fh, struct ptlrpc_request **request)
349 struct mdc_obd *mdc = mdc_conn2mdc(conn);
350 struct mds_body *body;
351 int rc, size = sizeof(*body);
352 struct ptlrpc_request *req;
354 req = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
355 MDS_CLOSE, 1, &size, NULL);
357 GOTO(out, rc = -ENOMEM);
359 body = lustre_msg_buf(req->rq_reqmsg, 0);
360 ll_ino2fid(&body->fid1, ino, 0, type);
363 req->rq_level = LUSTRE_CONN_FULL;
/* No reply buffers are expected: the length covers the header only. */
364 req->rq_replen = lustre_msg_size(0, NULL);
366 rc = ptlrpc_queue_wait(req);
367 rc = ptlrpc_check_status(req, rc);
/*
 * Request one page of data for inode `ino` via MDS_READPAGE.
 *
 * A bulk descriptor with a single PAGE_SIZE bulk page is registered on
 * MDS_BULK_PORTAL before the request is sent; the bulk page's xid is set to
 * the request's xid.  After the RPC completes the reply's struct mds_body
 * is unpacked and the bulk descriptor is freed.
 *
 * NOTE(review): the lines that attach `addr` to the bulk page, store
 * `offset`/`ino` in the request body, guard the GOTOs, and define the
 * `out`/`out2` labels are not visible in this listing.
 */
375 int mdc_readpage(struct obd_conn *conn, ino_t ino, int type, __u64 offset,
376 char *addr, struct ptlrpc_request **request)
378 struct mdc_obd *mdc = mdc_conn2mdc(conn);
379 struct ptlrpc_request *req = NULL;
380 struct ptlrpc_bulk_desc *desc = NULL;
381 struct ptlrpc_bulk_page *bulk = NULL;
382 struct mds_body *body;
383 int rc, size = sizeof(*body);
386 CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
388 desc = ptlrpc_prep_bulk(mdc->mdc_conn);
390 GOTO(out, rc = -ENOMEM);
392 req = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
393 MDS_READPAGE, 1, &size, NULL);
/* A different label (out2) is used once the bulk descriptor exists. */
395 GOTO(out2, rc = -ENOMEM);
/* NOTE(review): no NULL check on `bulk` is visible before it is
 * dereferenced on the next line - confirm against the full file. */
397 bulk = ptlrpc_prep_bulk_page(desc);
398 bulk->b_buflen = PAGE_SIZE;
/* Tie the bulk page to this request's xid. */
400 bulk->b_xid = req->rq_xid;
401 desc->b_portal = MDS_BULK_PORTAL;
403 rc = ptlrpc_register_bulk(desc);
405 CERROR("couldn't setup bulk sink: error %d.\n", rc);
409 body = lustre_msg_buf(req->rq_reqmsg, 0);
411 body->fid1.f_type = type;
414 req->rq_replen = lustre_msg_size(1, &size);
415 req->rq_level = LUSTRE_CONN_FULL;
416 rc = ptlrpc_queue_wait(req);
417 rc = ptlrpc_check_status(req, rc);
/* Presumably reached only when the RPC failed (guard not visible). */
419 ptlrpc_abort_bulk(desc);
422 body = lustre_msg_buf(req->rq_repmsg, 0);
423 mds_unpack_body(body);
428 ptlrpc_free_bulk(desc);
/*
 * Prepare an MDS_STATFS request on the given client/connection and submit
 * it through mdc_reint().
 *
 * NOTE(review): this body looks like leftover code - it references `inode`,
 * `iattr` and mdc_reint(), none of which are declared in the visible part
 * of this file, and it calls mds_setattr_pack() with three arguments while
 * mdc_enqueue() calls it with six.  It never touches the `statfs` output
 * parameter in the visible lines.  Presumably it is disabled by a
 * preprocessor conditional that is not visible here; confirm.
 */
435 int mdc_statfs(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
436 struct statfs *statfs,
437 struct ptlrpc_request **request)
439 struct mds_rec_setattr *rec;
440 struct ptlrpc_request *req;
441 int rc, size = sizeof(*rec);
444 req = ptlrpc_prep_req(cl, conn, MDS_STATFS, 1, &size, NULL);
448 rec = lustre_msg_buf(req->rq_reqmsg, 0);
449 mds_setattr_pack(rec, inode, iattr);
/* `size` is reused for the reply, which is sized as one struct mds_body. */
451 size = sizeof(struct mds_body);
452 req->rq_replen = lustre_msg_size(1, &size);
454 rc = mdc_reint(cl, req, LUSTRE_CONN_FULL);
456 if (rc == -ERESTARTSYS )
/*
 * ioctl entry point for the MDC device (installed as o_iocontrol in
 * mdc_obd_ops).  After validating the ioctl type/number range it sets up a
 * temporary ptlrpc client and a connection to the "mds" UUID, then issues a
 * hard-coded test request selected by `cmd` (getattr, readpage, setattr,
 * create, open, close) and logs the result with CERROR.
 *
 * NOTE(review): the visible body does not match the rest of this file: the
 * local `conn` declaration reuses the parameter name, `connection`, `arg`,
 * `err`, `buf`, `inode`, `iattr`, `ino` and `fh` have no visible
 * declarations, and the mdc_getattr()/mdc_readpage()/mdc_open()/mdc_close()
 * calls pass (&cl, connection, ...) rather than the struct obd_conn *
 * those functions take above.  Presumably most of this is compiled out by a
 * preprocessor conditional not visible in this listing; confirm.
 */
463 static int mdc_ioctl(long cmd, struct obd_conn *conn, int len, void *karg,
467 /* FIXME XXX : This should use the new ioc_data to pass args in */
469 struct ptlrpc_client cl;
470 struct ptlrpc_connection *conn;
471 struct ptlrpc_request *request;
/* Reject commands outside the IOC_REQUEST type and number range. */
475 if (_IOC_TYPE(cmd) != IOC_REQUEST_TYPE ||
476 _IOC_NR(cmd) < IOC_REQUEST_MIN_NR ||
477 _IOC_NR(cmd) > IOC_REQUEST_MAX_NR ) {
478 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
479 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
483 ptlrpc_init_client(NULL, NULL,
484 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
485 connection = ptlrpc_uuid_to_connection("mds");
487 CERROR("cannot create client\n");
492 case IOC_REQUEST_GETATTR: {
493 CERROR("-- getting attr for ino %lu\n", arg);
494 err = mdc_getattr(&cl, connection, arg, S_IFDIR, ~0, 0,
496 CERROR("-- done err %d\n", err);
501 case IOC_REQUEST_READPAGE: {
/* A PAGE_SIZE scratch buffer is allocated for the call and freed after. */
503 OBD_ALLOC(buf, PAGE_SIZE);
508 CERROR("-- readpage 0 for ino %lu\n", arg);
509 err = mdc_readpage(&cl, connection, arg, S_IFDIR, 0, buf,
511 CERROR("-- done err %d\n", err);
512 OBD_FREE(buf, PAGE_SIZE);
517 case IOC_REQUEST_SETATTR: {
522 inode.i_generation = 0;
523 iattr.ia_mode = 040777;
525 iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
527 err = mdc_setattr(&cl, connection, &inode, &iattr, &request);
528 CERROR("-- done err %d\n", err);
533 case IOC_REQUEST_CREATE: {
538 inode.i_generation = 0;
539 iattr.ia_mode = 040777;
541 iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
543 err = mdc_create(&cl, connection, &inode,
544 "foofile", strlen("foofile"),
545 NULL, 0, 0100707, 47114711,
546 11, 47, 0, NULL, &request);
547 CERROR("-- done err %d\n", err);
552 case IOC_REQUEST_OPEN: {
/* NOTE(review): the return values of copy_from_user()/copy_to_user() are
 * not checked in the visible lines. */
554 copy_from_user(&ino, (__u64 *)arg, sizeof(ino));
555 CERROR("-- opening ino %llu\n", (unsigned long long)ino);
556 err = mdc_open(&cl, connection, ino, S_IFDIR, O_RDONLY, 4711,
558 copy_to_user((__u64 *)arg, &fh, sizeof(fh));
559 CERROR("-- done err %d (fh=%Lu)\n", err,
560 (unsigned long long)fh);
565 case IOC_REQUEST_CLOSE: {
566 CERROR("-- closing ino 2, filehandle %lu\n", arg);
567 err = mdc_close(&cl, connection, 2, S_IFDIR, arg, &request);
568 CERROR("-- done err %d\n", err);
574 GOTO(out, err = -EINVAL);
/* Common teardown: release the request, the connection and the client. */
578 ptlrpc_free_req(request);
579 ptlrpc_put_connection(connection);
580 ptlrpc_cleanup_client(&cl);
/*
 * Device setup: `buf` is treated as a struct obd_ioctl_data whose first
 * inline buffer holds the target UUID and whose second holds the server
 * UUID.  Both lengths must be between 1 and 37 bytes.  The target UUID is
 * copied into mdc->mdc_target_uuid, a connection is obtained for the server
 * UUID, and two ptlrpc clients (named "mdc" and "ldlm") are allocated and
 * initialized.
 *
 * On allocation failure the code jumps to out_client / out_conn; the
 * visible tail frees mdc_client and drops the connection.
 *
 * NOTE(review): the error returns after each CERROR, the NULL check on
 * mdc_conn, the label lines and the final return are not visible in this
 * listing.
 */
587 static int mdc_setup(struct obd_device *obddev, obd_count len, void *buf)
589 struct obd_ioctl_data* data = buf;
590 struct mdc_obd *mdc = &obddev->u.mdc;
591 char server_uuid[37];
/* NOTE(review): the message below says "osc" although this is the MDC
 * setup path - looks like a copy/paste slip in the log text. */
595 if (data->ioc_inllen1 < 1) {
596 CERROR("osc setup requires a TARGET UUID\n");
600 if (data->ioc_inllen1 > 37) {
601 CERROR("mdc UUID must be less than 38 characters\n");
605 if (data->ioc_inllen2 < 1) {
606 CERROR("mdc setup requires a SERVER UUID\n");
610 if (data->ioc_inllen2 > 37) {
611 CERROR("mdc UUID must be less than 38 characters\n");
/* NOTE(review): the size of mdc_target_uuid is not visible here; confirm it
 * holds at least 37 bytes.  Also, a 37-byte server UUID fills server_uuid
 * completely, so NUL termination then depends on the caller's data. */
615 memcpy(mdc->mdc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
616 memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
617 sizeof(server_uuid)));
619 mdc->mdc_conn = ptlrpc_uuid_to_connection(server_uuid);
623 OBD_ALLOC(mdc->mdc_client, sizeof(*mdc->mdc_client));
624 if (mdc->mdc_client == NULL)
625 GOTO(out_conn, rc = -ENOMEM);
627 OBD_ALLOC(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
628 if (mdc->mdc_ldlm_client == NULL)
629 GOTO(out_client, rc = -ENOMEM);
/* One client uses the MDS request/reply portals, the other the LDLM ones. */
631 ptlrpc_init_client(NULL, NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
633 ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
634 mdc->mdc_ldlm_client);
635 mdc->mdc_client->cli_name = "mdc";
636 mdc->mdc_ldlm_client->cli_name = "ldlm";
637 /* XXX get recovery hooked in here again */
638 //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
/* NOTE(review): this is a third ptlrpc_init_client() call on the MDS
 * portals; its final argument is not visible - if it targets mdc_client it
 * re-initializes it after cli_name was set above.  Confirm. */
640 ptlrpc_init_client(ptlrpc_connmgr, NULL,
641 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
/* Error unwinding: free the first client, then drop the connection. */
648 OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
650 ptlrpc_put_connection(mdc->mdc_conn);
/*
 * Device teardown: cleans up and frees both ptlrpc clients allocated by
 * mdc_setup() (mdc_client and mdc_ldlm_client) and drops the reference on
 * the server connection.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
654 static int mdc_cleanup(struct obd_device * obddev)
656 struct mdc_obd *mdc = &obddev->u.mdc;
658 ptlrpc_cleanup_client(mdc->mdc_client);
659 OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
660 ptlrpc_cleanup_client(mdc->mdc_ldlm_client);
661 OBD_FREE(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
662 ptlrpc_put_connection(mdc->mdc_conn);
/*
 * Connect to the MDS: creates a client LDLM namespace for the device, sends
 * an MDS_CONNECT request whose single buffer is the target UUID, and stores
 * the target_id from the reply as cli_target_devno on both the MDC and the
 * LDLM client.  The request is freed before returning.
 *
 * NOTE(review): unlike the other RPC helpers in this file, no
 * ptlrpc_check_status() call is visible here; the error guards and return
 * paths are also not visible, so it cannot be confirmed whether the new
 * namespace is released when the request fails.
 */
668 static int mdc_connect(struct obd_conn *conn)
670 struct mdc_obd *mdc = mdc_conn2mdc(conn);
671 struct ptlrpc_request *request;
/* The request buffer is sized to the whole mdc_target_uuid array. */
672 int rc, size = sizeof(mdc->mdc_target_uuid);
673 char *tmp = mdc->mdc_target_uuid;
677 conn->oc_dev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT);
678 if (conn->oc_dev->obd_namespace == NULL)
/* &tmp is passed as the final argument so the UUID is supplied as the
 * contents of buffer 0. */
681 request = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
682 MDS_CONNECT, 1, &size, &tmp);
686 request->rq_replen = lustre_msg_size(0, NULL);
688 rc = ptlrpc_queue_wait(request);
/* Both clients are pointed at the same target device number. */
692 mdc->mdc_client->cli_target_devno = request->rq_repmsg->target_id;
693 mdc->mdc_ldlm_client->cli_target_devno =
694 mdc->mdc_client->cli_target_devno;
697 ptlrpc_free_req(request);
/*
 * Disconnect from the MDS: frees the device's LDLM namespace, then sends an
 * MDS_DISCONNECT request whose body carries conn->oc_id in the `valid`
 * field, and finally frees the request.
 *
 * NOTE(review): the namespace is freed before the disconnect RPC is sent;
 * the last argument of ptlrpc_prep_req(), the error guards and the return
 * are not visible in this listing.
 */
701 static int mdc_disconnect(struct obd_conn *conn)
703 struct mdc_obd *mdc = mdc_conn2mdc(conn);
704 struct ptlrpc_request *request;
705 struct mds_body *body;
706 int rc, size = sizeof(*body);
709 ldlm_namespace_free(conn->oc_dev->obd_namespace);
710 request = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
711 MDS_DISCONNECT, 1, &size,
716 body = lustre_msg_buf(request->rq_reqmsg, 0);
/* The connection id travels in the body's `valid` field. */
717 body->valid = conn->oc_id;
719 request->rq_replen = lustre_msg_size(0, NULL);
721 rc = ptlrpc_queue_wait(request);
724 ptlrpc_free_req(request);
/*
 * OBD method table for the MDC device type; registered by
 * ptlrpc_request_init().  It uses the old GCC `field: value` initializer
 * syntax.
 *
 * NOTE(review): a line appears to be missing after the opening brace
 * (presumably the o_setup entry pointing at mdc_setup), and the closing
 * brace is not visible in this listing - confirm.
 */
728 struct obd_ops mdc_obd_ops = {
730 o_cleanup: mdc_cleanup,
731 o_connect: mdc_connect,
732 o_disconnect: mdc_disconnect,
733 o_iocontrol: mdc_ioctl
/*
 * Module init hook: registers mdc_obd_ops under the name LUSTRE_MDC_NAME
 * and returns obd_register_type()'s result.
 * NOTE(review): the function is named ptlrpc_request_* although it
 * initializes the MDC module.
 */
736 static int __init ptlrpc_request_init(void)
738 return obd_register_type(&mdc_obd_ops, LUSTRE_MDC_NAME);
/* Module exit hook: unregisters the LUSTRE_MDC_NAME OBD type that
 * ptlrpc_request_init() registered. */
741 static void __exit ptlrpc_request_exit(void)
743 obd_unregister_type(LUSTRE_MDC_NAME);
/* Module metadata. */
746 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
747 MODULE_DESCRIPTION("Lustre Metadata Client v1.0");
748 MODULE_LICENSE("GPL");
/* Symbols exported for use by other modules.
 * NOTE(review): mdc_create, mdc_unlink, mdc_rename, mdc_link and
 * mdc_setattr are exported here but have no definition in the visible part
 * of this file - presumably they live in another source file of the module;
 * confirm. */
750 EXPORT_SYMBOL(mdc_getstatus);
751 EXPORT_SYMBOL(mdc_enqueue);
752 EXPORT_SYMBOL(mdc_getattr);
753 EXPORT_SYMBOL(mdc_create);
754 EXPORT_SYMBOL(mdc_unlink);
755 EXPORT_SYMBOL(mdc_rename);
756 EXPORT_SYMBOL(mdc_link);
757 EXPORT_SYMBOL(mdc_readpage);
758 EXPORT_SYMBOL(mdc_setattr);
759 EXPORT_SYMBOL(mdc_close);
760 EXPORT_SYMBOL(mdc_open);
/* Register the init/exit hooks with the kernel module loader. */
762 module_init(ptlrpc_request_init);
763 module_exit(ptlrpc_request_exit);