Whamcloud - gitweb
da3eed96a2d82aefb51db51a4ba7e3b2f7ff0902
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  *  ost/ost_handler.c
3  *  Storage Target Handling functions
4  *  
5  *  Lustre Object Server Module (OST)
6  * 
7  *  Copyright (C) 2001  Cluster File Systems, Inc.
8  *
9  *  This code is issued under the GNU General Public License.
10  *  See the file COPYING in this distribution
11  *
12  *  by Peter Braam <braam@clusterfs.com>
13  * 
14  *  This server is single threaded at present (but can easily be multi threaded). 
15  *  For testing and management it is treated as an obd_device, although it does
16  *  not export a full OBD method table (the requests are coming in over the wire, 
17  *  so object target modules do not have a full method table.)
18  * 
19  */
20
21
22 #define EXPORT_SYMTAB
23
24 #include <linux/version.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/stat.h>
28 #include <linux/locks.h>
29 #include <linux/ext2_fs.h>
30 #include <linux/quotaops.h>
31 #include <asm/unistd.h>
32 #include <linux/obd_support.h>
33 #include <linux/obd.h>
34 #include <linux/obd_class.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/obd_class.h>
39
40 // for testing
41 static int ost_queue_req(struct obd_device *obddev, struct ptlrpc_request *req)
42 {
43         struct ptlrpc_request *srv_req; 
44         struct ost_obd *ost = &obddev->u.ost;
45         
46         if (!ost) { 
47                 EXIT;
48                 return -1;
49         }
50
51         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL); 
52         if (!srv_req) { 
53                 EXIT;
54                 return -ENOMEM;
55         }
56
57         printk("---> OST at %d %p, incoming req %p, srv_req %p\n", 
58                __LINE__, ost, req, srv_req);
59
60         memset(srv_req, 0, sizeof(*req)); 
61
62         /* move the request buffer */
63         srv_req->rq_reqbuf = req->rq_reqbuf;
64         srv_req->rq_reqlen    = req->rq_reqlen;
65         srv_req->rq_ost = ost;
66
67         /* remember where it came from */
68         srv_req->rq_reply_handle = req;
69
70         list_add(&srv_req->rq_list, &ost->ost_reqs); 
71         wake_up(&ost->ost_waitq);
72         return 0;
73 }
74
75
76 /* XXX replace with networking code */
77 int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req)
78 {
79         struct ptlrpc_request *clnt_req = req->rq_reply_handle;
80
81         ENTRY;
82         printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req); 
83
84         /* free the request buffer */
85         kfree(req->rq_reqbuf);
86         req->rq_reqbuf = NULL; 
87         
88         /* move the reply to the client */ 
89         clnt_req->rq_replen = req->rq_replen;
90         clnt_req->rq_repbuf = req->rq_repbuf;
91
92         printk("---> client req %p repbuf %p len %d status %d\n", 
93                clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen, 
94                req->rq_rephdr->status); 
95
96         req->rq_repbuf = NULL;
97         req->rq_replen = 0;
98         
99         /* free the server request */
100         kfree(req); 
101         /* wake up the client */ 
102         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
103         EXIT;
104         return 0;
105 }
106
107 int ost_error(struct obd_device *obddev, struct ptlrpc_request *req)
108 {
109         struct ptlrep_hdr *hdr;
110
111         ENTRY;
112         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
113         if (!hdr) { 
114                 EXIT;
115                 return -ENOMEM;
116         }
117
118         memset(hdr, 0, sizeof(*hdr));
119         
120         hdr->seqno = req->rq_reqhdr->seqno;
121         hdr->status = req->rq_status; 
122         hdr->type = OST_TYPE_ERR;
123
124         req->rq_repbuf = (char *)hdr;
125         req->rq_replen = sizeof(*hdr); 
126
127         EXIT;
128         return ost_reply(obddev, req);
129 }
130
131 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
132 {
133         struct obd_conn conn; 
134         int rc;
135
136         ENTRY;
137         
138         conn.oc_id = req->rq_req.ost->connid;
139         conn.oc_dev = ost->ost_tgt;
140
141         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
142                           &req->rq_replen, &req->rq_repbuf); 
143         if (rc) { 
144                 printk("ost_destroy: cannot pack reply\n"); 
145                 return rc;
146         }
147
148         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
149                 (&conn, &req->rq_req.ost->oa); 
150
151         EXIT;
152         return 0;
153 }
154
155 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
156 {
157         struct obd_conn conn; 
158         int rc;
159
160         ENTRY;
161         printk("ost getattr entered\n"); 
162         
163         conn.oc_id = req->rq_req.ost->connid;
164         conn.oc_dev = ost->ost_tgt;
165
166         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
167                           &req->rq_replen, &req->rq_repbuf); 
168         if (rc) { 
169                 printk("ost_getattr: cannot pack reply\n"); 
170                 return rc;
171         }
172         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
173         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
174
175         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
176                 (&conn, &req->rq_rep.ost->oa); 
177
178         EXIT;
179         return 0;
180 }
181
182 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
183 {
184         struct obd_conn conn; 
185         int rc;
186
187         ENTRY;
188         
189         conn.oc_id = req->rq_req.ost->connid;
190         conn.oc_dev = ost->ost_tgt;
191
192         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
193                           &req->rq_replen, &req->rq_repbuf); 
194         if (rc) { 
195                 printk("ost_create: cannot pack reply\n"); 
196                 return rc;
197         }
198
199         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
200
201         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
202                 (&conn, &req->rq_rep.ost->oa); 
203
204         EXIT;
205         return 0;
206 }
207
208
209 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
210 {
211         struct obd_conn conn; 
212         int rc;
213
214         ENTRY;
215         
216         conn.oc_id = req->rq_req.ost->connid;
217         conn.oc_dev = ost->ost_tgt;
218
219         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
220                           &req->rq_replen, &req->rq_repbuf); 
221         if (rc) { 
222                 printk("ost_setattr: cannot pack reply\n"); 
223                 return rc;
224         }
225
226         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
227
228         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
229                 (&conn, &req->rq_rep.ost->oa); 
230
231         EXIT;
232         return 0;
233 }
234
235 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
236 {
237         struct obd_conn conn; 
238         int rc;
239
240         ENTRY;
241         
242         conn.oc_dev = ost->ost_tgt;
243
244         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
245                           &req->rq_replen, &req->rq_repbuf); 
246         if (rc) { 
247                 printk("ost_setattr: cannot pack reply\n"); 
248                 return rc;
249         }
250
251         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
252
253         printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, 
254                conn.oc_id);
255         req->rq_rep.ost->connid = conn.oc_id;
256         EXIT;
257         return 0;
258 }
259
260
261 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
262 {
263         struct obd_conn conn; 
264         int rc;
265
266         ENTRY;
267         
268         conn.oc_dev = ost->ost_tgt;
269         conn.oc_id = req->rq_req.ost->connid;
270
271         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
272                           &req->rq_replen, &req->rq_repbuf); 
273         if (rc) { 
274                 printk("ost_setattr: cannot pack reply\n"); 
275                 return rc;
276         }
277
278         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
279
280         EXIT;
281         return 0;
282 }
283
284 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
285 {
286         struct obd_conn conn; 
287         int rc;
288         int vallen;
289         void *val;
290         char *ptr; 
291
292         ENTRY;
293         
294         conn.oc_id = req->rq_req.ost->connid;
295         conn.oc_dev = ost->ost_tgt;
296
297         ptr = ost_req_buf1(req->rq_req.ost);
298         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
299                 (&conn, req->rq_req.ost->buflen1, ptr, &vallen, &val); 
300
301         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep.ost,
302                           &req->rq_replen, &req->rq_repbuf); 
303         if (rc) { 
304                 printk("ost_setattr: cannot pack reply\n"); 
305                 return rc;
306         }
307
308         EXIT;
309         return 0;
310 }
311
312
313 #if 0
314 static struct page * ext2_get_page(struct inode *dir, unsigned long n)
315 {
316         struct address_space *mapping = dir->i_mapping;
317         struct page *page = read_cache_page(mapping, n,
318                                 (filler_t*)mapping->a_ops->readpage, NULL);
319         if (!IS_ERR(page)) {
320                 wait_on_page(page);
321                 kmap(page);
322                 if (!Page_Uptodate(page))
323                         goto fail;
324                 if (!PageChecked(page))
325                         ext2_check_page(page);
326                 if (PageError(page))
327                         goto fail;
328         }
329         return page;
330
331 fail:
332         ext2_put_page(page);
333         return ERR_PTR(-EIO);
334 }
335
336 static inline void ext2_put_page(struct page *page)
337 {
338         kunmap(page);
339         page_cache_release(page);
340 }
341
342 /* Releases the page */
343 void ext2_set_link(struct inode *dir, struct ext2_dir_entry_2 *de,
344                         struct page *page, struct inode *inode)
345 {
346         unsigned from = (char *) de - (char *) page_address(page);
347         unsigned to = from + le16_to_cpu(de->rec_len);
348         int err;
349
350         lock_page(page);
351         err = page->mapping->a_ops->prepare_write(NULL, page, from, to);
352         if (err)
353                 BUG();
354         de->inode = cpu_to_le32(inode->i_ino);
355         ext2_set_de_type (de, inode);
356         dir->i_mtime = dir->i_ctime = CURRENT_TIME;
357         err = ext2_commit_chunk(page, from, to);
358         UnlockPage(page);
359         ext2_put_page(page);
360 }
361
362 static int ext2_commit_chunk(struct page *page, unsigned from, unsigned to)
363 {
364         struct inode *dir = page->mapping->host;
365         int err = 0;
366         dir->i_version = ++event;
367         SetPageUptodate(page);
368         set_page_clean(page);
369
370         //page->mapping->a_ops->commit_write(NULL, page, from, to);
371         //if (IS_SYNC(dir))
372         //      err = waitfor_one_page(page);
373         return err;
374 }
375
376 #endif
377
378 int ost_prepw(struct ost_obd *obddev, struct ptlrpc_request *req)
379 {
380 #if 0
381         struct obd_conn conn; 
382         int rc;
383         int i, j, n;
384         int objcount;
385         void *tmp;
386         struct niobuf **nb;
387         struct obd_ioo **ioo;
388
389         ENTRY;
390         
391         tmp1 = ost_req_buf1(req);
392         tmp2 = ost_req_buf2(req);
393         objcount = req->buflen1 / sizeof(**ioo); 
394
395         n = 0;
396         for (i=0 ; i<objcount ; i++) { 
397                 obd_unpack_ioo
398
399         conn.oc_id = req->rq_req.ost->connid;
400         conn.oc_dev = ost->ost_tgt;
401
402         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
403                           &req->rq_replen, &req->rq_repbuf); 
404         if (rc) { 
405                 printk("ost_create: cannot pack reply\n"); 
406                 return rc;
407         }
408
409         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa, sizeof(req->rq_req.ost->oa));
410
411         req->rq_rep.ost->result =ost->ost_tgt->obd_type->typ_ops->o_create
412                 (&conn, &req->rq_rep.ost->oa); 
413
414         EXIT;
415         return 0;
416 #endif
417         return -ENOTSUPP;
418
419 }
420
421
422 int ost_handle(struct obd_device *obddev, struct ptlrpc_request *req)
423 {
424         int rc;
425         struct ost_obd *ost = &obddev->u.ost;
426         struct ptlreq_hdr *hdr;
427
428         ENTRY;
429         printk("ost_handle: req at %p\n", req); 
430
431         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
432         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
433                 printk("lustre_ost: wrong packet type sent %d\n",
434                        NTOH__u32(hdr->type));
435                 rc = -EINVAL;
436                 goto out;
437         }
438
439         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
440                             &req->rq_reqhdr, &req->rq_req.ost);
441         if (rc) { 
442                 printk("lustre_ost: Invalid request\n");
443                 EXIT; 
444                 goto out;
445         }
446
447         switch (req->rq_reqhdr->opc) { 
448
449         case OST_CONNECT:
450                 CDEBUG(D_INODE, "connect\n");
451                 printk("----> connect \n"); 
452                 rc = ost_connect(ost, req);
453                 break;
454         case OST_DISCONNECT:
455                 CDEBUG(D_INODE, "disconnect\n");
456                 rc = ost_disconnect(ost, req);
457                 break;
458         case OST_GET_INFO:
459                 CDEBUG(D_INODE, "get_info\n");
460                 rc = ost_get_info(ost, req);
461                 break;
462         case OST_CREATE:
463                 CDEBUG(D_INODE, "create\n");
464                 rc = ost_create(ost, req);
465                 break;
466         case OST_DESTROY:
467                 CDEBUG(D_INODE, "destroy\n");
468                 rc = ost_destroy(ost, req);
469                 break;
470         case OST_GETATTR:
471                 CDEBUG(D_INODE, "getattr\n");
472                 rc = ost_getattr(ost, req);
473                 break;
474         case OST_SETATTR:
475                 CDEBUG(D_INODE, "setattr\n");
476                 rc = ost_setattr(ost, req);
477                 break;
478         case OST_PREPW:
479                 CDEBUG(D_INODE, "prepw\n");
480                 rc = ost_prepw(ost, req);
481                 break;
482         default:
483                 req->rq_status = -ENOTSUPP;
484                 return ost_error(obddev, req);
485         }
486
487 out:
488         req->rq_status = rc;
489         if (rc) { 
490                 printk("ost: processing error %d\n", rc);
491                 ost_error(obddev, req);
492         } else { 
493                 CDEBUG(D_INODE, "sending reply\n"); 
494                 ost_reply(obddev, req); 
495         }
496
497         return 0;
498 }
499
500 int ost_main(void *arg)
501 {
502         struct obd_device *obddev = (struct obd_device *) arg;
503         struct ost_obd *ost = &obddev->u.ost;
504         ENTRY;
505         printk("---> %d\n", __LINE__);
506
507
508         lock_kernel();
509         printk("---> %d\n", __LINE__);
510         daemonize();
511         printk("---> %d\n", __LINE__);
512         spin_lock_irq(&current->sigmask_lock);
513         printk("---> %d\n", __LINE__);
514         sigfillset(&current->blocked);
515         printk("---> %d\n", __LINE__);
516         recalc_sigpending(current);
517         printk("---> %d\n", __LINE__);
518         spin_unlock_irq(&current->sigmask_lock);
519         printk("---> %d\n", __LINE__);
520
521         printk("---> %d\n", __LINE__);
522         sprintf(current->comm, "lustre_ost");
523         printk("---> %d\n", __LINE__);
524
525         /* Record that the  thread is running */
526         ost->ost_thread = current;
527         printk("---> %d\n", __LINE__);
528         wake_up(&ost->ost_done_waitq); 
529         printk("---> %d\n", __LINE__);
530
531         /* XXX maintain a list of all managed devices: insert here */
532
533         /* And now, wait forever for commit wakeup events. */
534         while (1) {
535                 struct ptlrpc_request *request;
536                 int rc; 
537
538                 if (ost->ost_flags & OST_EXIT)
539                         break;
540
541
542                 wake_up(&ost->ost_done_waitq);
543                 interruptible_sleep_on(&ost->ost_waitq);
544
545                 CDEBUG(D_INODE, "lustre_ost wakes\n");
546                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
547
548                 if (list_empty(&ost->ost_reqs)) { 
549                         CDEBUG(D_INODE, "woke because of timer\n"); 
550                 } else { 
551                         printk("---> %d\n", __LINE__);
552                         request = list_entry(ost->ost_reqs.next, 
553                                              struct ptlrpc_request, rq_list);
554                         printk("---> %d\n", __LINE__);
555                         list_del(&request->rq_list);
556                         rc = ost_handle(obddev, request); 
557                 }
558         }
559
560         /* XXX maintain a list of all managed devices: cleanup here */
561         printk("---> %d\n", __LINE__);
562         ost->ost_thread = NULL;
563         printk("---> %d\n", __LINE__);
564         wake_up(&ost->ost_done_waitq);
565         printk("lustre_ost: exiting\n");
566         return 0;
567 }
568
569 static void ost_stop_srv_thread(struct ost_obd *ost)
570 {
571         ost->ost_flags |= OST_EXIT;
572
573         while (ost->ost_thread) {
574                 wake_up(&ost->ost_waitq);
575                 sleep_on(&ost->ost_done_waitq);
576         }
577 }
578
579 static void ost_start_srv_thread(struct obd_device *obd)
580 {
581         struct ost_obd *ost = &obd->u.ost;
582         ENTRY;
583
584         init_waitqueue_head(&ost->ost_waitq);
585         printk("---> %d\n", __LINE__);
586         init_waitqueue_head(&ost->ost_done_waitq);
587         printk("---> %d\n", __LINE__);
588         kernel_thread(ost_main, (void *)obd, 
589                       CLONE_VM | CLONE_FS | CLONE_FILES);
590         printk("---> %d\n", __LINE__);
591         while (!ost->ost_thread) 
592                 sleep_on(&ost->ost_done_waitq);
593         printk("---> %d\n", __LINE__);
594         EXIT;
595 }
596
597 /* mount the file system (secretly) */
598 static int ost_setup(struct obd_device *obddev, obd_count len,
599                         void *buf)
600                         
601 {
602         struct obd_ioctl_data* data = buf;
603         struct ost_obd *ost = &obddev->u.ost;
604         struct obd_device *tgt;
605         int err; 
606         ENTRY;
607
608         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
609                 EXIT;
610                 return -ENODEV;
611         }
612
613         tgt = &obd_dev[data->ioc_dev];
614         ost->ost_tgt = tgt;
615         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
616              ! (tgt->obd_flags & OBD_SET_UP) ){
617                 printk("device not attached or not set up (%d)\n", 
618                        data->ioc_dev);
619                 EXIT;
620                 return -EINVAL;
621         } 
622
623         ost->ost_conn.oc_dev = tgt;
624         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
625         if (err) { 
626                 printk("lustre ost: fail to connect to device %d\n", 
627                        data->ioc_dev); 
628                 return -EINVAL;
629         }
630
631         INIT_LIST_HEAD(&ost->ost_reqs);
632         ost->ost_thread = NULL;
633         ost->ost_flags = 0;
634
635         spin_lock_init(&obddev->u.ost.ost_lock);
636
637         ost_start_srv_thread(obddev);
638
639         MOD_INC_USE_COUNT;
640         EXIT; 
641         return 0;
642
643
644 static int ost_cleanup(struct obd_device * obddev)
645 {
646         struct ost_obd *ost = &obddev->u.ost;
647         struct obd_device *tgt;
648         int err;
649
650         ENTRY;
651
652         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
653                 EXIT;
654                 return 0;
655         }
656
657         if ( !list_empty(&obddev->obd_gen_clients) ) {
658                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
659                 EXIT;
660                 return -EBUSY;
661         }
662
663         ost_stop_srv_thread(ost);
664
665         if (!list_empty(&ost->ost_reqs)) {
666                 // XXX reply with errors and clean up
667                 CDEBUG(D_INODE, "Request list not empty!\n");
668         }
669
670         tgt = ost->ost_tgt;
671         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
672         if (err) { 
673                 printk("lustre ost: fail to disconnect device\n");
674                 return -EINVAL;
675         }
676         
677
678         MOD_DEC_USE_COUNT;
679         EXIT;
680         return 0;
681 }
682
683 /* use obd ops to offer management infrastructure */
684 static struct obd_ops ost_obd_ops = {
685         o_setup:       ost_setup,
686         o_cleanup:     ost_cleanup,
687 };
688
689 static int __init ost_init(void)
690 {
691         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
692         return 0;
693 }
694
695 static void __exit ost_exit(void)
696 {
697         obd_unregister_type(LUSTRE_OST_NAME);
698 }
699
700 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
701 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
702 MODULE_LICENSE("GPL");
703
704 // for testing (maybe this stays)
705 EXPORT_SYMBOL(ost_queue_req);
706
707 module_init(ost_init);
708 module_exit(ost_exit);