Whamcloud - gitweb
- documentation updates for the collaborative cache
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define EXPORT_SYMTAB
24 #define DEBUG_SUBSYSTEM S_MDC
25
26 #include <linux/module.h>
27 #include <linux/miscdevice.h>
28 #include <linux/lustre_mds.h>
29 #include <linux/lustre_lite.h>
30 #include <linux/lustre_dlm.h>
31 #include <linux/init.h>
32 #include <linux/obd_lov.h>
33
34 #define REQUEST_MINOR 244
35
36 extern int mds_queue_req(struct ptlrpc_request *);
37
38 int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
39                   __u64 *last_committed, __u64 *last_xid,
40                   struct ptlrpc_request **request)
41 {
42         struct ptlrpc_request *req;
43         struct mds_body *body;
44         int rc, size = sizeof(*body);
45         ENTRY;
46
47         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETSTATUS, 1, &size,
48                               NULL);
49         if (!req)
50                 GOTO(out, rc = -ENOMEM);
51
52         body = lustre_msg_buf(req->rq_reqmsg, 0);
53         req->rq_level = LUSTRE_CONN_CON;
54         req->rq_replen = lustre_msg_size(1, &size);
55
56         mds_pack_req_body(req);
57         rc = ptlrpc_queue_wait(req);
58         rc = ptlrpc_check_status(req, rc);
59
60         if (!rc) {
61                 body = lustre_msg_buf(req->rq_repmsg, 0);
62                 mds_unpack_body(body);
63                 memcpy(rootfid, &body->fid1, sizeof(*rootfid));
64                 *last_committed = req->rq_repmsg->last_committed;
65                 *last_xid = req->rq_repmsg->last_xid;
66
67                 CDEBUG(D_NET,"root ino=%ld, last_committed=%Lu, last_xid=%Ld\n",
68                        (unsigned long)rootfid->id,
69                        (unsigned long long)*last_committed,
70                        (unsigned long long)*last_xid);
71         }
72
73         EXIT;
74  out:
75         ptlrpc_free_req(req);
76         return rc;
77 }
78
79 int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
80                    struct ptlrpc_request **request)
81 {
82         struct ptlrpc_request *req;
83         struct mds_status_req *streq;
84         int rc, size[2] = {sizeof(*streq)};
85         ENTRY;
86
87         req = ptlrpc_prep_req(class_conn2cliimp(mdc_connh), MDS_GETLOVINFO, 1,
88                               size, NULL);
89         if (!req)
90                 GOTO(out, rc = -ENOMEM);
91
92         *request = req;
93         streq = lustre_msg_buf(req->rq_reqmsg, 0);
94         streq->flags = HTON__u32(MDS_STATUS_LOV);
95         streq->repbuf = HTON__u32(8192);
96
97         /* prepare for reply */
98         req->rq_level = LUSTRE_CONN_CON;
99         size[0] = 512;
100         size[1] = 8192;
101         req->rq_replen = lustre_msg_size(2, size);
102
103         rc = ptlrpc_queue_wait(req);
104         rc = ptlrpc_check_status(req, rc);
105
106  out:
107         RETURN(rc);
108 }
109
110
111 int mdc_getattr(struct lustre_handle *conn,
112                 obd_id ino, int type, unsigned long valid, size_t ea_size,
113                 struct ptlrpc_request **request)
114 {
115         struct ptlrpc_request *req;
116         struct mds_body *body;
117         int rc, size[2] = {sizeof(*body), 0}, bufcount = 1;
118         ENTRY;
119
120         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETATTR, 1, size,
121                               NULL);
122         if (!req)
123                 GOTO(out, rc = -ENOMEM);
124
125         body = lustre_msg_buf(req->rq_reqmsg, 0);
126         ll_ino2fid(&body->fid1, ino, 0, type);
127         body->valid = valid;
128         mds_pack_req_body(req); 
129
130         if (S_ISREG(type)) {
131                 struct client_obd *mdc = &class_conn2obd(conn)->u.cli;
132                 bufcount = 2;
133                 size[1] = mdc->cl_max_mds_easize;
134         } else if (valid & OBD_MD_LINKNAME) {
135                 bufcount = 2;
136                 size[1] = ea_size;
137         }
138         req->rq_replen = lustre_msg_size(bufcount, size);
139
140         rc = ptlrpc_queue_wait(req);
141         rc = ptlrpc_check_status(req, rc);
142
143         if (!rc) {
144                 body = lustre_msg_buf(req->rq_repmsg, 0);
145                 mds_unpack_body(body);
146                 CDEBUG(D_NET, "mode: %o\n", body->mode);
147         }
148
149         EXIT;
150  out:
151         *request = req;
152         return rc;
153 }
154
155 static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
156                             void *data, __u32 data_len)
157 {
158         int rc;
159         struct inode *inode = data;
160         struct lustre_handle lockh;
161         ENTRY;
162
163         if (data_len != sizeof(*inode)) {
164                 CERROR("data_len should be %d, but is %d\n", sizeof(*inode),
165                        data_len);
166                 LBUG();
167                 RETURN(-EINVAL);
168         }
169
170         if (inode == NULL)
171                 LBUG();
172
173         /* FIXME: do something better than throwing away everything */
174         if (S_ISDIR(inode->i_mode)) {
175                 CDEBUG(D_INODE, "invalidating inode %ld\n",
176                        inode->i_ino);
177                 invalidate_inode_pages(inode);
178         }
179
180         ldlm_lock2handle(lock, &lockh);
181         rc = ldlm_cli_cancel(&lockh);
182         if (rc < 0) {
183                 CERROR("ldlm_cli_cancel: %d\n", rc);
184                 LBUG();
185         }
186         RETURN(0);
187 }
188
189 int mdc_enqueue(struct lustre_handle *conn, int lock_type,
190                 struct lookup_intent *it, int lock_mode, struct inode *dir,
191                 struct dentry *de, struct lustre_handle *lockh,
192                 char *tgt, int tgtlen, void *data, int datalen)
193 {
194         struct ptlrpc_request *req;
195         struct obd_device *obddev = class_conn2obd(conn);
196         __u64 res_id[RES_NAME_SIZE] = {dir->i_ino};
197         int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
198         int rc, flags;
199         int repsize[3] = {sizeof(struct ldlm_reply),
200                           sizeof(struct mds_body),
201                           obddev->u.cli.cl_max_mds_easize};
202         struct ldlm_reply *dlm_rep;
203         struct ldlm_intent *lit;
204         ENTRY;
205
206         LDLM_DEBUG_NOLOCK("mdsintent %s dir %ld", ldlm_it2str(it->it_op),
207                           dir->i_ino);
208
209         switch (it->it_op) {
210         case IT_MKDIR:
211                 it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask;
212                 break;
213         case (IT_CREAT|IT_OPEN):
214         case IT_CREAT:
215                 it->it_mode |= S_IFREG; /* no break */
216         case IT_MKNOD:
217                 it->it_mode &= ~current->fs->umask;
218                 break;
219         case IT_SYMLINK:
220                 it->it_mode = (it->it_mode | S_IFLNK) & ~current->fs->umask;
221                 break;
222         }
223
224         if (it->it_op & (IT_MKDIR | IT_CREAT | IT_SYMLINK | IT_MKNOD)) {
225                 size[2] = sizeof(struct mds_rec_create);
226                 size[3] = de->d_name.len + 1;
227                 size[4] = tgtlen + 1;
228                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 5,
229                                       size, NULL);
230                 if (!req)
231                         RETURN(-ENOMEM);
232
233                 /* pack the intent */
234                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
235                 lit->opc = NTOH__u64((__u64)it->it_op);
236
237                 /* pack the intended request */
238                 mds_create_pack(req, 2, dir, it->it_mode, 0, current->fsuid,
239                                 current->fsgid, CURRENT_TIME, de->d_name.name,
240                                 de->d_name.len, tgt, tgtlen);
241                 req->rq_replen = lustre_msg_size(3, repsize);
242         } else if (it->it_op == IT_RENAME2) {
243                 struct dentry *old_de = it->it_data;
244
245                 size[2] = sizeof(struct mds_rec_rename);
246                 size[3] = old_de->d_name.len + 1;
247                 size[4] = de->d_name.len + 1;
248                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 5,
249                                       size, NULL);
250                 if (!req)
251                         RETURN(-ENOMEM);
252
253                 /* pack the intent */
254                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
255                 lit->opc = NTOH__u64((__u64)it->it_op);
256
257                 /* pack the intended request */
258                 mds_rename_pack(req, 2, old_de->d_parent->d_inode, dir,
259                                 old_de->d_name.name, old_de->d_name.len,
260                                 de->d_name.name, de->d_name.len);
261                 req->rq_replen = lustre_msg_size(3, repsize);
262         } else if (it->it_op == IT_LINK2) {
263                 struct dentry *old_de = it->it_data;
264
265                 size[2] = sizeof(struct mds_rec_link);
266                 size[3] = de->d_name.len + 1;
267                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
268                                       size, NULL);
269                 if (!req)
270                         RETURN(-ENOMEM);
271
272                 /* pack the intent */
273                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
274                 lit->opc = NTOH__u64((__u64)it->it_op);
275
276                 /* pack the intended request */
277                 mds_link_pack(req, 2, old_de->d_inode, dir,
278                                 de->d_name.name, de->d_name.len);
279                 req->rq_replen = lustre_msg_size(3, repsize);
280         } else if (it->it_op == IT_UNLINK || it->it_op == IT_RMDIR) {
281                 size[2] = sizeof(struct mds_rec_unlink);
282                 size[3] = de->d_name.len + 1;
283                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
284                                       size, NULL);
285                 if (!req)
286                         RETURN(-ENOMEM);
287
288                 /* pack the intent */
289                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
290                 lit->opc = NTOH__u64((__u64)it->it_op);
291
292                 /* pack the intended request */
293                 mds_unlink_pack(req, 2, dir, NULL,
294                                 it->it_op == IT_UNLINK ? S_IFREG : S_IFDIR,
295                                 de->d_name.name, de->d_name.len);
296
297                 req->rq_replen = lustre_msg_size(3, repsize);
298         } else if (it->it_op  & (IT_GETATTR | IT_RENAME | IT_LINK | 
299                    IT_OPEN |  IT_SETATTR | IT_LOOKUP | IT_READLINK)) {
300                 size[2] = sizeof(struct mds_body);
301                 size[3] = de->d_name.len + 1;
302
303                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
304                                       size, NULL);
305                 if (!req)
306                         RETURN(-ENOMEM);
307
308                 /* pack the intent */
309                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
310                 lit->opc = NTOH__u64((__u64)it->it_op);
311
312                 /* pack the intended request */
313                 mds_getattr_pack(req, 2, dir, de->d_name.name, de->d_name.len);
314
315                 /* we need to replay opens */
316                 if (it->it_op == IT_OPEN)
317                         req->rq_flags |= PTL_RPC_FL_REPLAY;
318
319                 /* get ready for the reply */
320                 req->rq_replen = lustre_msg_size(3, repsize);
321         } else if (it->it_op == IT_READDIR) {
322                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 1,
323                                       size, NULL);
324                 if (!req)
325                         RETURN(-ENOMEM);
326
327                 /* get ready for the reply */
328                 req->rq_replen = lustre_msg_size(1, repsize);
329         } else {
330                 LBUG();
331                 RETURN(-EINVAL);
332         }
333 #warning FIXME: the data here needs to be different if a lock was granted for a different inode
334         rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id,
335                               lock_type, NULL, 0, lock_mode, &flags,
336                               ldlm_completion_ast, mdc_blocking_ast, data,
337                               datalen, lockh);
338         if (rc == -ENOENT) {
339                 /* This can go when we're sure that this can never happen */
340                 LBUG();
341         }
342         if (rc == ELDLM_LOCK_ABORTED) {
343                 lock_mode = 0;
344                 memset(lockh, 0, sizeof(*lockh));
345                 /* rc = 0 */
346         } else if (rc != 0) {
347                 CERROR("ldlm_cli_enqueue: %d\n", rc);
348                 RETURN(rc);
349         }
350
351         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
352         it->it_disposition = (int) dlm_rep->lock_policy_res1;
353         it->it_status = (int) dlm_rep->lock_policy_res2;
354         it->it_lock_mode = lock_mode;
355         it->it_data = req;
356
357         RETURN(0);
358 }
359
360 static void mdc_replay_open(struct ptlrpc_request *req, void *data)
361 {
362         __u64 *fh = data;
363         struct mds_body *body;
364         
365         body = lustre_msg_buf(req->rq_repmsg, 0);
366         mds_unpack_body(body);
367         *fh = body->extra;
368 }
369
370 int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
371              struct lov_stripe_md *lsm, __u64 cookie, __u64 *fh,
372              struct ptlrpc_request **request)
373 {
374         struct mds_body *body;
375         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
376         struct ptlrpc_request *req;
377         ENTRY;
378
379         if (lsm) {
380                 bufcount = 2;
381                 // size[1] = mdc->cl_max_mds_easize; soon...
382                 size[1] = lsm->lsm_mds_easize;
383         }
384
385         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_OPEN, bufcount, size,
386                               NULL);
387         if (!req)
388                 GOTO(out, rc = -ENOMEM);
389
390         req->rq_flags |= PTL_RPC_FL_REPLAY;
391         body = lustre_msg_buf(req->rq_reqmsg, 0);
392
393         ll_ino2fid(&body->fid1, ino, 0, type);
394         body->flags = HTON__u32(flags);
395         body->extra = cookie;
396
397         if (lsm)
398                 lov_packmd(lustre_msg_buf(req->rq_reqmsg, 1), lsm);
399
400         req->rq_replen = lustre_msg_size(1, size);
401
402         rc = ptlrpc_queue_wait(req);
403         rc = ptlrpc_check_status(req, rc);
404         if (!rc) {
405                 body = lustre_msg_buf(req->rq_repmsg, 0);
406                 mds_unpack_body(body);
407                 *fh = body->extra;
408         }
409
410         /* If open is replayed, we need to fix up the fh. */
411         req->rq_replay_cb = mdc_replay_open;
412         req->rq_replay_cb_data = fh;
413
414         EXIT;
415  out:
416         *request = req;
417         return rc;
418 }
419
420 int mdc_close(struct lustre_handle *conn,
421               obd_id ino, int type, __u64 fh, struct ptlrpc_request **request)
422 {
423         struct mds_body *body;
424         int rc, size = sizeof(*body);
425         struct ptlrpc_request *req;
426
427         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_CLOSE, 1, &size,
428                               NULL);
429         if (!req)
430                 GOTO(out, rc = -ENOMEM);
431
432         body = lustre_msg_buf(req->rq_reqmsg, 0);
433         ll_ino2fid(&body->fid1, ino, 0, type);
434         body->extra = fh;
435
436         req->rq_replen = lustre_msg_size(0, NULL);
437
438         rc = ptlrpc_queue_wait(req);
439         rc = ptlrpc_check_status(req, rc);
440
441         EXIT;
442  out:
443         *request = req;
444         return rc;
445 }
446
447 int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
448                  char *addr, struct ptlrpc_request **request)
449 {
450         struct ptlrpc_connection *connection = 
451                 client_conn2cli(conn)->cl_import.imp_connection;
452         struct ptlrpc_request *req = NULL;
453         struct ptlrpc_bulk_desc *desc = NULL;
454         struct ptlrpc_bulk_page *bulk = NULL;
455         struct mds_body *body;
456         int rc, size = sizeof(*body);
457         ENTRY;
458
459         CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
460
461         desc = ptlrpc_prep_bulk(connection);
462         if (desc == NULL)
463                 GOTO(out, rc = -ENOMEM);
464
465         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_READPAGE, 1, &size,
466                               NULL);
467         if (!req)
468                 GOTO(out2, rc = -ENOMEM);
469
470         bulk = ptlrpc_prep_bulk_page(desc);
471         bulk->bp_buflen = PAGE_SIZE;
472         bulk->bp_buf = addr;
473         bulk->bp_xid = req->rq_xid;
474         desc->bd_portal = MDS_BULK_PORTAL;
475
476         rc = ptlrpc_register_bulk(desc);
477         if (rc) {
478                 CERROR("couldn't setup bulk sink: error %d.\n", rc);
479                 GOTO(out2, rc);
480         }
481
482         body = lustre_msg_buf(req->rq_reqmsg, 0);
483         body->fid1.id = ino;
484         body->fid1.f_type = type;
485         body->size = offset;
486
487         req->rq_replen = lustre_msg_size(1, &size);
488         rc = ptlrpc_queue_wait(req);
489         rc = ptlrpc_check_status(req, rc);
490         if (rc) {
491                 ptlrpc_abort_bulk(desc);
492                 GOTO(out2, rc);
493         } else {
494                 body = lustre_msg_buf(req->rq_repmsg, 0);
495                 mds_unpack_body(body);
496         }
497
498         EXIT;
499  out2:
500         ptlrpc_free_bulk(desc);
501  out:
502         *request = req;
503         return rc;
504 }
505
506 int mdc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs,
507                struct ptlrpc_request **request)
508 {
509         struct ptlrpc_request *req;
510         int rc, size = sizeof(*osfs);
511         ENTRY;
512
513         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_STATFS, 0, NULL,
514                               NULL);
515         if (!req)
516                 GOTO(out, rc = -ENOMEM);
517         req->rq_replen = lustre_msg_size(1, &size);
518
519         rc = ptlrpc_queue_wait(req);
520         rc = ptlrpc_check_status(req, rc);
521
522         if (rc)
523                 GOTO(out, rc);
524
525         obd_statfs_unpack(osfs, lustre_msg_buf(req->rq_repmsg, 0));
526
527         EXIT;
528 out:
529         *request = req;
530
531         return rc;
532 }
533
534 struct obd_ops mdc_obd_ops = {
535         o_setup:   client_obd_setup,
536         o_cleanup: client_obd_cleanup,
537         o_connect: client_obd_connect,
538         o_disconnect: client_obd_disconnect,
539 };
540
541 static int __init ptlrpc_request_init(void)
542 {
543         return class_register_type(&mdc_obd_ops, LUSTRE_MDC_NAME);
544 }
545
546 static void __exit ptlrpc_request_exit(void)
547 {
548         class_unregister_type(LUSTRE_MDC_NAME);
549 }
550
551 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
552 MODULE_DESCRIPTION("Lustre Metadata Client v1.0");
553 MODULE_LICENSE("GPL");
554
555 EXPORT_SYMBOL(mdc_getstatus);
556 EXPORT_SYMBOL(mdc_getlovinfo);
557 EXPORT_SYMBOL(mdc_enqueue);
558 EXPORT_SYMBOL(mdc_getattr);
559 EXPORT_SYMBOL(mdc_statfs);
560 EXPORT_SYMBOL(mdc_create);
561 EXPORT_SYMBOL(mdc_unlink);
562 EXPORT_SYMBOL(mdc_rename);
563 EXPORT_SYMBOL(mdc_link);
564 EXPORT_SYMBOL(mdc_readpage);
565 EXPORT_SYMBOL(mdc_setattr);
566 EXPORT_SYMBOL(mdc_close);
567 EXPORT_SYMBOL(mdc_open);
568
569 module_init(ptlrpc_request_init);
570 module_exit(ptlrpc_request_exit);