Whamcloud - gitweb
Changes for file creation and small fixes.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  *  ost/ost_handler.c
3  *  Storage Target Handling functions
4  *  
5  *  Lustre Object Server Module (OST)
6  * 
7  *  Copyright (C) 2001  Cluster File Systems, Inc.
8  *
9  *  This code is issued under the GNU General Public License.
10  *  See the file COPYING in this distribution
11  *
12  *  by Peter Braam <braam@clusterfs.com>
13  * 
14  *  This server is single threaded at present (but can easily be multi threaded). 
15  *  For testing and management it is treated as an obd_device, although it does
16  *  not export a full OBD method table (the requests are coming in over the wire, 
17  *  so object target modules do not have a full method table.)
18  * 
19  */
20
21
22 #define EXPORT_SYMTAB
23
24 #include <linux/version.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/stat.h>
28 #include <linux/locks.h>
29 #include <linux/ext2_fs.h>
30 #include <linux/quotaops.h>
31 #include <asm/unistd.h>
32 #include <linux/obd_support.h>
33 #include <linux/obd.h>
34 #include <linux/obd_class.h>
35 #include <linux/lustre_lib.h>
36 #include <linux/lustre_idl.h>
37 #include <linux/lustre_mds.h>
38 #include <linux/obd_class.h>
39
40 // for testing
41 static int ost_queue_req(struct obd_device *obddev, struct ost_request *req)
42 {
43         struct ost_request *srv_req; 
44         struct ost_obd *ost = &obddev->u.ost;
45         
46         if (!ost) { 
47                 EXIT;
48                 return -1;
49         }
50
51         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL); 
52         if (!srv_req) { 
53                 EXIT;
54                 return -ENOMEM;
55         }
56
57         printk("---> OST at %d %p, incoming req %p, srv_req %p\n", 
58                __LINE__, ost, req, srv_req);
59
60         memset(srv_req, 0, sizeof(*req)); 
61
62         /* move the request buffer */
63         srv_req->rq_reqbuf = req->rq_reqbuf;
64         srv_req->rq_reqlen    = req->rq_reqlen;
65         srv_req->rq_obd = ost;
66
67         /* remember where it came from */
68         srv_req->rq_reply_handle = req;
69
70         list_add(&srv_req->rq_list, &ost->ost_reqs); 
71         wake_up(&ost->ost_waitq);
72         return 0;
73 }
74
75
76 /* XXX replace with networking code */
77 int ost_reply(struct obd_device *obddev, struct ost_request *req)
78 {
79         struct ost_request *clnt_req = req->rq_reply_handle;
80
81         ENTRY;
82         printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req); 
83
84         /* free the request buffer */
85         kfree(req->rq_reqbuf);
86         req->rq_reqbuf = NULL; 
87         
88         /* move the reply to the client */ 
89         clnt_req->rq_replen = req->rq_replen;
90         clnt_req->rq_repbuf = req->rq_repbuf;
91
92         printk("---> client req %p repbuf %p len %d status %d\n", 
93                clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen, 
94                req->rq_rephdr->status); 
95
96         req->rq_repbuf = NULL;
97         req->rq_replen = 0;
98         
99         /* free the server request */
100         kfree(req); 
101         /* wake up the client */ 
102         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
103         EXIT;
104         return 0;
105 }
106
107 int ost_error(struct obd_device *obddev, struct ost_request *req)
108 {
109         struct ost_rep_hdr *hdr;
110
111         ENTRY;
112         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
113         if (!hdr) { 
114                 EXIT;
115                 return -ENOMEM;
116         }
117
118         memset(hdr, 0, sizeof(*hdr));
119         
120         hdr->seqno = req->rq_reqhdr->seqno;
121         hdr->status = req->rq_status; 
122         hdr->type = OST_TYPE_ERR;
123
124         req->rq_repbuf = (char *)hdr;
125         req->rq_replen = sizeof(*hdr); 
126
127         EXIT;
128         return ost_reply(obddev, req);
129 }
130
131 static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
132 {
133         struct obd_conn conn; 
134         int rc;
135
136         ENTRY;
137         
138         conn.oc_id = req->rq_req->connid;
139         conn.oc_dev = ost->ost_tgt;
140
141         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
142                           &req->rq_replen, &req->rq_repbuf); 
143         if (rc) { 
144                 printk("ost_destroy: cannot pack reply\n"); 
145                 return rc;
146         }
147
148         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
149                 (&conn, &req->rq_req->oa); 
150
151         EXIT;
152         return 0;
153 }
154
155 static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
156 {
157         struct obd_conn conn; 
158         int rc;
159
160         ENTRY;
161         printk("ost getattr entered\n"); 
162         
163         conn.oc_id = req->rq_req->connid;
164         conn.oc_dev = ost->ost_tgt;
165
166         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
167                           &req->rq_replen, &req->rq_repbuf); 
168         if (rc) { 
169                 printk("ost_getattr: cannot pack reply\n"); 
170                 return rc;
171         }
172         req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
173         req->rq_rep->oa.o_valid = req->rq_req->oa.o_valid;
174
175         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
176                 (&conn, &req->rq_rep->oa); 
177
178         EXIT;
179         return 0;
180 }
181
182 static int ost_create(struct ost_obd *ost, struct ost_request *req)
183 {
184         struct obd_conn conn; 
185         int rc;
186
187         ENTRY;
188         
189         conn.oc_id = req->rq_req->connid;
190         conn.oc_dev = ost->ost_tgt;
191
192         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
193                           &req->rq_replen, &req->rq_repbuf); 
194         if (rc) { 
195                 printk("ost_create: cannot pack reply\n"); 
196                 return rc;
197         }
198
199         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
200
201         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
202                 (&conn, &req->rq_rep->oa); 
203
204         EXIT;
205         return 0;
206 }
207
208
209 static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
210 {
211         struct obd_conn conn; 
212         int rc;
213
214         ENTRY;
215         
216         conn.oc_id = req->rq_req->connid;
217         conn.oc_dev = ost->ost_tgt;
218
219         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
220                           &req->rq_replen, &req->rq_repbuf); 
221         if (rc) { 
222                 printk("ost_setattr: cannot pack reply\n"); 
223                 return rc;
224         }
225
226         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
227
228         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
229                 (&conn, &req->rq_rep->oa); 
230
231         EXIT;
232         return 0;
233 }
234
235 static int ost_connect(struct ost_obd *ost, struct ost_request *req)
236 {
237         struct obd_conn conn; 
238         int rc;
239
240         ENTRY;
241         
242         conn.oc_dev = ost->ost_tgt;
243
244         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
245                           &req->rq_replen, &req->rq_repbuf); 
246         if (rc) { 
247                 printk("ost_setattr: cannot pack reply\n"); 
248                 return rc;
249         }
250
251         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
252
253         printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, 
254                conn.oc_id);
255         req->rq_rep->connid = conn.oc_id;
256         EXIT;
257         return 0;
258 }
259
260
261 static int ost_disconnect(struct ost_obd *ost, struct ost_request *req)
262 {
263         struct obd_conn conn; 
264         int rc;
265
266         ENTRY;
267         
268         conn.oc_dev = ost->ost_tgt;
269         conn.oc_id = req->rq_req->connid;
270
271         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
272                           &req->rq_replen, &req->rq_repbuf); 
273         if (rc) { 
274                 printk("ost_setattr: cannot pack reply\n"); 
275                 return rc;
276         }
277
278         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
279
280         EXIT;
281         return 0;
282 }
283
284 static int ost_get_info(struct ost_obd *ost, struct ost_request *req)
285 {
286         struct obd_conn conn; 
287         int rc;
288         int vallen;
289         void *val;
290
291         ENTRY;
292         
293         conn.oc_id = req->rq_req->connid;
294         conn.oc_dev = ost->ost_tgt;
295
296         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
297                 (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val); 
298
299
300         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep,
301                           &req->rq_replen, &req->rq_repbuf); 
302         if (rc) { 
303                 printk("ost_setattr: cannot pack reply\n"); 
304                 return rc;
305         }
306
307         EXIT;
308         return 0;
309 }
310
311
312
313 //int ost_handle(struct ost_conn *conn, int len, char *buf)
314 int ost_handle(struct obd_device *obddev, struct ost_request *req)
315 {
316         int rc;
317         struct ost_obd *ost = &obddev->u.ost;
318         struct ost_req_hdr *hdr;
319
320         ENTRY;
321         printk("ost_handle: req at %p\n", req); 
322
323         hdr = (struct ost_req_hdr *)req->rq_reqbuf;
324         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
325                 printk("lustre_ost: wrong packet type sent %d\n",
326                        NTOH__u32(hdr->type));
327                 rc = -EINVAL;
328                 goto out;
329         }
330
331         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
332                             &req->rq_reqhdr, &req->rq_req);
333         if (rc) { 
334                 printk("lustre_ost: Invalid request\n");
335                 EXIT; 
336                 goto out;
337         }
338
339         switch (req->rq_reqhdr->opc) { 
340
341         case OST_CONNECT:
342                 CDEBUG(D_INODE, "connect\n");
343                 printk("----> connect \n"); 
344                 rc = ost_connect(ost, req);
345                 break;
346         case OST_DISCONNECT:
347                 CDEBUG(D_INODE, "disconnect\n");
348                 rc = ost_disconnect(ost, req);
349                 break;
350         case OST_GET_INFO:
351                 CDEBUG(D_INODE, "get_info\n");
352                 rc = ost_get_info(ost, req);
353                 break;
354         case OST_CREATE:
355                 CDEBUG(D_INODE, "create\n");
356                 rc = ost_create(ost, req);
357                 break;
358         case OST_DESTROY:
359                 CDEBUG(D_INODE, "destroy\n");
360                 rc = ost_destroy(ost, req);
361                 break;
362         case OST_GETATTR:
363                 CDEBUG(D_INODE, "getattr\n");
364                 rc = ost_getattr(ost, req);
365                 break;
366         case OST_SETATTR:
367                 CDEBUG(D_INODE, "setattr\n");
368                 rc = ost_setattr(ost, req);
369                 break;
370
371         default:
372                 return ost_error(obddev, req);
373         }
374
375 out:
376         req->rq_rephdr->status = rc;
377         if (rc) { 
378                 printk("ost: processing error %d\n", rc);
379                 ost_error(obddev, req);
380         } else { 
381                 CDEBUG(D_INODE, "sending reply\n"); 
382                 ost_reply(obddev, req); 
383         }
384
385         return 0;
386 }
387
388 int ost_main(void *arg)
389 {
390         struct obd_device *obddev = (struct obd_device *) arg;
391         struct ost_obd *ost = &obddev->u.ost;
392         ENTRY;
393         printk("---> %d\n", __LINE__);
394
395
396         lock_kernel();
397         printk("---> %d\n", __LINE__);
398         daemonize();
399         printk("---> %d\n", __LINE__);
400         spin_lock_irq(&current->sigmask_lock);
401         printk("---> %d\n", __LINE__);
402         sigfillset(&current->blocked);
403         printk("---> %d\n", __LINE__);
404         recalc_sigpending(current);
405         printk("---> %d\n", __LINE__);
406         spin_unlock_irq(&current->sigmask_lock);
407         printk("---> %d\n", __LINE__);
408
409         printk("---> %d\n", __LINE__);
410         sprintf(current->comm, "lustre_ost");
411         printk("---> %d\n", __LINE__);
412
413         /* Record that the  thread is running */
414         ost->ost_thread = current;
415         printk("---> %d\n", __LINE__);
416         wake_up(&ost->ost_done_waitq); 
417         printk("---> %d\n", __LINE__);
418
419         /* XXX maintain a list of all managed devices: insert here */
420
421         /* And now, wait forever for commit wakeup events. */
422         while (1) {
423                 struct ost_request *request;
424                 int rc; 
425
426                 if (ost->ost_flags & OST_EXIT)
427                         break;
428
429
430                 wake_up(&ost->ost_done_waitq);
431                 interruptible_sleep_on(&ost->ost_waitq);
432
433                 CDEBUG(D_INODE, "lustre_ost wakes\n");
434                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
435
436                 if (list_empty(&ost->ost_reqs)) { 
437                         CDEBUG(D_INODE, "woke because of timer\n"); 
438                 } else { 
439                         printk("---> %d\n", __LINE__);
440                         request = list_entry(ost->ost_reqs.next, 
441                                              struct ost_request, rq_list);
442                         printk("---> %d\n", __LINE__);
443                         list_del(&request->rq_list);
444                         rc = ost_handle(obddev, request); 
445                 }
446         }
447
448         /* XXX maintain a list of all managed devices: cleanup here */
449         printk("---> %d\n", __LINE__);
450         ost->ost_thread = NULL;
451         printk("---> %d\n", __LINE__);
452         wake_up(&ost->ost_done_waitq);
453         printk("lustre_ost: exiting\n");
454         return 0;
455 }
456
457 static void ost_stop_srv_thread(struct ost_obd *ost)
458 {
459         ost->ost_flags |= OST_EXIT;
460
461         while (ost->ost_thread) {
462                 wake_up(&ost->ost_waitq);
463                 sleep_on(&ost->ost_done_waitq);
464         }
465 }
466
467 static void ost_start_srv_thread(struct obd_device *obd)
468 {
469         struct ost_obd *ost = &obd->u.ost;
470         ENTRY;
471
472         init_waitqueue_head(&ost->ost_waitq);
473         printk("---> %d\n", __LINE__);
474         init_waitqueue_head(&ost->ost_done_waitq);
475         printk("---> %d\n", __LINE__);
476         kernel_thread(ost_main, (void *)obd, 
477                       CLONE_VM | CLONE_FS | CLONE_FILES);
478         printk("---> %d\n", __LINE__);
479         while (!ost->ost_thread) 
480                 sleep_on(&ost->ost_done_waitq);
481         printk("---> %d\n", __LINE__);
482         EXIT;
483 }
484
485 /* mount the file system (secretly) */
486 static int ost_setup(struct obd_device *obddev, obd_count len,
487                         void *buf)
488                         
489 {
490         struct obd_ioctl_data* data = buf;
491         struct ost_obd *ost = &obddev->u.ost;
492         struct obd_device *tgt;
493         int err; 
494         ENTRY;
495
496         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
497                 EXIT;
498                 return -ENODEV;
499         }
500
501         tgt = &obd_dev[data->ioc_dev];
502         ost->ost_tgt = tgt;
503         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
504              ! (tgt->obd_flags & OBD_SET_UP) ){
505                 printk("device not attached or not set up (%d)\n", 
506                        data->ioc_dev);
507                 EXIT;
508                 return -EINVAL;
509         } 
510
511         ost->ost_conn.oc_dev = tgt;
512         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
513         if (err) { 
514                 printk("lustre ost: fail to connect to device %d\n", 
515                        data->ioc_dev); 
516                 return -EINVAL;
517         }
518
519         INIT_LIST_HEAD(&ost->ost_reqs);
520         ost->ost_thread = NULL;
521         ost->ost_flags = 0;
522
523         spin_lock_init(&obddev->u.ost.ost_lock);
524
525         ost_start_srv_thread(obddev);
526
527         MOD_INC_USE_COUNT;
528         EXIT; 
529         return 0;
530
531
532 static int ost_cleanup(struct obd_device * obddev)
533 {
534         struct ost_obd *ost = &obddev->u.ost;
535         struct obd_device *tgt;
536         int err;
537
538         ENTRY;
539
540         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
541                 EXIT;
542                 return 0;
543         }
544
545         if ( !list_empty(&obddev->obd_gen_clients) ) {
546                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
547                 EXIT;
548                 return -EBUSY;
549         }
550
551         ost_stop_srv_thread(ost);
552
553         if (!list_empty(&ost->ost_reqs)) {
554                 // XXX reply with errors and clean up
555                 CDEBUG(D_INODE, "Request list not empty!\n");
556         }
557
558         tgt = ost->ost_tgt;
559         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
560         if (err) { 
561                 printk("lustre ost: fail to disconnect device\n");
562                 return -EINVAL;
563         }
564         
565
566         MOD_DEC_USE_COUNT;
567         EXIT;
568         return 0;
569 }
570
571 /* use obd ops to offer management infrastructure */
572 static struct obd_ops ost_obd_ops = {
573         o_setup:       ost_setup,
574         o_cleanup:     ost_cleanup,
575 };
576
577 static int __init ost_init(void)
578 {
579         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
580         return 0;
581 }
582
583 static void __exit ost_exit(void)
584 {
585         obd_unregister_type(LUSTRE_OST_NAME);
586 }
587
588 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
589 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
590 MODULE_LICENSE("GPL");
591
592 // for testing (maybe this stays)
593 EXPORT_SYMBOL(ost_queue_req);
594
595 module_init(ost_init);
596 module_exit(ost_exit);