Whamcloud - gitweb
6e87dd046e7b35b8839592960f090ca4809540a8
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Object Server Module (OST)
5  * 
6  *  Copyright (C) 2001  Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  *  For testing and management it is treated as an obd_device, although it does
15  *  not export a full OBD method table (the requests are coming in over the wire, 
16  *  so object target modules do not have a full method table.)
17  * 
18  */
19
20
21 #define EXPORT_SYMTAB
22
23 #include <linux/version.h>
24 #include <linux/module.h>
25 #include <linux/fs.h>
26 #include <linux/stat.h>
27 #include <linux/locks.h>
28 #include <linux/ext2_fs.h>
29 #include <linux/quotaops.h>
30 #include <asm/unistd.h>
31 #include <linux/obd_support.h>
32 #include <linux/obd.h>
33 #include <linux/obd_class.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/obd_class.h>
38
39 // for testing
40 static int ost_queue_req(struct obd_device *obddev, struct ost_request *req)
41 {
42         struct ost_request *srv_req; 
43         struct ost_obd *ost = &obddev->u.ost;
44         
45         if (!ost) { 
46                 EXIT;
47                 return -1;
48         }
49
50         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL); 
51         if (!srv_req) { 
52                 EXIT;
53                 return -ENOMEM;
54         }
55
56         printk("---> OST at %d %p, incoming req %p, srv_req %p\n", 
57                __LINE__, ost, req, srv_req);
58
59         memset(srv_req, 0, sizeof(*req)); 
60         srv_req->rq_reqbuf = req->rq_reqbuf;
61         srv_req->rq_reqlen    = req->rq_reqlen;
62         srv_req->rq_obd = ost;
63         srv_req->rq_reply_handle = req;
64
65         list_add(&srv_req->rq_list, &ost->ost_reqs); 
66         wake_up(&ost->ost_waitq);
67         return 0;
68 }
69
70
71 /* XXX replace with networking code */
72 int ost_reply(struct obd_device *obddev, struct ost_request *req)
73 {
74         struct ost_request *clnt_req = req->rq_reply_handle;
75
76         ENTRY;
77         printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req); 
78
79         /* free the request buffer */
80         kfree(req->rq_reqbuf);
81         req->rq_reqbuf = NULL; 
82         
83         /* move the reply to the client */ 
84         clnt_req->rq_replen = req->rq_replen;
85         clnt_req->rq_repbuf = req->rq_repbuf;
86
87         printk("---> client req %p repbuf %p len %d status %d\n", 
88                clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen, 
89                req->rq_rephdr->status); 
90
91         req->rq_repbuf = NULL;
92         req->rq_replen = 0;
93         
94         /* free the server request */
95         kfree(req); 
96         /* wake up the client */ 
97         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
98         EXIT;
99         return 0;
100 }
101
102 int ost_error(struct obd_device *obddev, struct ost_request *req)
103 {
104         struct ost_rep_hdr *hdr;
105
106         ENTRY;
107         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
108         if (!hdr) { 
109                 EXIT;
110                 return -ENOMEM;
111         }
112
113         memset(hdr, 0, sizeof(*hdr));
114         
115         hdr->seqno = req->rq_reqhdr->seqno;
116         hdr->status = req->rq_status; 
117         hdr->type = OST_TYPE_ERR;
118         req->rq_repbuf = (char *)hdr;
119
120         EXIT;
121         return ost_reply(obddev, req);
122 }
123
124 static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
125 {
126         struct obd_conn conn; 
127         int rc;
128
129         ENTRY;
130         
131         conn.oc_id = req->rq_req->connid;
132         conn.oc_dev = ost->ost_tgt;
133
134         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
135                           &req->rq_replen, &req->rq_repbuf); 
136         if (rc) { 
137                 printk("ost_destroy: cannot pack reply\n"); 
138                 return rc;
139         }
140
141         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
142                 (&conn, &req->rq_req->oa); 
143
144         EXIT;
145         return 0;
146 }
147
148 static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
149 {
150         struct obd_conn conn; 
151         int rc;
152
153         ENTRY;
154         printk("ost getattr entered\n"); 
155         
156         conn.oc_id = req->rq_req->connid;
157         conn.oc_dev = ost->ost_tgt;
158
159         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
160                           &req->rq_replen, &req->rq_repbuf); 
161         if (rc) { 
162                 printk("ost_getattr: cannot pack reply\n"); 
163                 return rc;
164         }
165         req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
166         req->rq_rep->oa.o_valid = req->rq_req->oa.o_valid;
167
168         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
169                 (&conn, &req->rq_rep->oa); 
170
171         EXIT;
172         return 0;
173 }
174
175 static int ost_create(struct ost_obd *ost, struct ost_request *req)
176 {
177         struct obd_conn conn; 
178         int rc;
179
180         ENTRY;
181         
182         conn.oc_id = req->rq_req->connid;
183         conn.oc_dev = ost->ost_tgt;
184
185         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
186                           &req->rq_replen, &req->rq_repbuf); 
187         if (rc) { 
188                 printk("ost_create: cannot pack reply\n"); 
189                 return rc;
190         }
191
192         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
193
194         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
195                 (&conn, &req->rq_rep->oa); 
196
197         EXIT;
198         return 0;
199 }
200
201
202 static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
203 {
204         struct obd_conn conn; 
205         int rc;
206
207         ENTRY;
208         
209         conn.oc_id = req->rq_req->connid;
210         conn.oc_dev = ost->ost_tgt;
211
212         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
213                           &req->rq_replen, &req->rq_repbuf); 
214         if (rc) { 
215                 printk("ost_setattr: cannot pack reply\n"); 
216                 return rc;
217         }
218
219         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
220
221         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
222                 (&conn, &req->rq_rep->oa); 
223
224         EXIT;
225         return 0;
226 }
227
228 static int ost_connect(struct ost_obd *ost, struct ost_request *req)
229 {
230         struct obd_conn conn; 
231         int rc;
232
233         ENTRY;
234         
235         conn.oc_dev = ost->ost_tgt;
236
237         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
238                           &req->rq_replen, &req->rq_repbuf); 
239         if (rc) { 
240                 printk("ost_setattr: cannot pack reply\n"); 
241                 return rc;
242         }
243
244         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
245
246         printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, 
247                conn.oc_id);
248         req->rq_rep->connid = conn.oc_id;
249         EXIT;
250         return 0;
251 }
252
253
254 static int ost_disconnect(struct ost_obd *ost, struct ost_request *req)
255 {
256         struct obd_conn conn; 
257         int rc;
258
259         ENTRY;
260         
261         conn.oc_dev = ost->ost_tgt;
262         conn.oc_id = req->rq_req->connid;
263
264         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
265                           &req->rq_replen, &req->rq_repbuf); 
266         if (rc) { 
267                 printk("ost_setattr: cannot pack reply\n"); 
268                 return rc;
269         }
270
271         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
272
273         EXIT;
274         return 0;
275 }
276
277 static int ost_get_info(struct ost_obd *ost, struct ost_request *req)
278 {
279         struct obd_conn conn; 
280         int rc;
281         int vallen;
282         void *val;
283
284         ENTRY;
285         
286         conn.oc_id = req->rq_req->connid;
287         conn.oc_dev = ost->ost_tgt;
288
289         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
290                 (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val); 
291
292
293         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep,
294                           &req->rq_replen, &req->rq_repbuf); 
295         if (rc) { 
296                 printk("ost_setattr: cannot pack reply\n"); 
297                 return rc;
298         }
299
300         EXIT;
301         return 0;
302 }
303
304
305
306 //int ost_handle(struct ost_conn *conn, int len, char *buf)
307 int ost_handle(struct obd_device *obddev, struct ost_request *req)
308 {
309         int rc;
310         struct ost_obd *ost = &obddev->u.ost;
311         struct ost_req_hdr *hdr;
312
313         ENTRY;
314         printk("ost_handle: req at %p\n", req); 
315
316         hdr = (struct ost_req_hdr *)req->rq_reqbuf;
317         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
318                 printk("lustre_ost: wrong packet type sent %d\n",
319                        NTOH__u32(hdr->type));
320                 rc = -EINVAL;
321                 goto out;
322         }
323
324         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
325                             &req->rq_reqhdr, &req->rq_req);
326         if (rc) { 
327                 printk("lustre_ost: Invalid request\n");
328                 EXIT; 
329                 goto out;
330         }
331
332         switch (req->rq_reqhdr->opc) { 
333
334         case OST_CONNECT:
335                 CDEBUG(D_INODE, "connect\n");
336                 printk("----> connect \n"); 
337                 rc = ost_connect(ost, req);
338                 break;
339         case OST_DISCONNECT:
340                 CDEBUG(D_INODE, "disconnect\n");
341                 rc = ost_disconnect(ost, req);
342                 break;
343         case OST_GET_INFO:
344                 CDEBUG(D_INODE, "get_info\n");
345                 rc = ost_get_info(ost, req);
346                 break;
347         case OST_CREATE:
348                 CDEBUG(D_INODE, "create\n");
349                 rc = ost_create(ost, req);
350                 break;
351         case OST_DESTROY:
352                 CDEBUG(D_INODE, "destroy\n");
353                 rc = ost_destroy(ost, req);
354                 break;
355         case OST_GETATTR:
356                 CDEBUG(D_INODE, "getattr\n");
357                 rc = ost_getattr(ost, req);
358                 break;
359         case OST_SETATTR:
360                 CDEBUG(D_INODE, "setattr\n");
361                 rc = ost_setattr(ost, req);
362                 break;
363
364         default:
365                 return ost_error(obddev, req);
366         }
367
368 out:
369         req->rq_rephdr->status = rc;
370         if (rc) { 
371                 printk("ost: processing error %d\n", rc);
372                 ost_error(obddev, req);
373         } else { 
374                 CDEBUG(D_INODE, "sending reply\n"); 
375                 ost_reply(obddev, req); 
376         }
377
378         return 0;
379 }
380
381 int ost_main(void *arg)
382 {
383         struct obd_device *obddev = (struct obd_device *) arg;
384         struct ost_obd *ost = &obddev->u.ost;
385         ENTRY;
386         printk("---> %d\n", __LINE__);
387
388
389         lock_kernel();
390         printk("---> %d\n", __LINE__);
391         daemonize();
392         printk("---> %d\n", __LINE__);
393         spin_lock_irq(&current->sigmask_lock);
394         printk("---> %d\n", __LINE__);
395         sigfillset(&current->blocked);
396         printk("---> %d\n", __LINE__);
397         recalc_sigpending(current);
398         printk("---> %d\n", __LINE__);
399         spin_unlock_irq(&current->sigmask_lock);
400         printk("---> %d\n", __LINE__);
401
402         printk("---> %d\n", __LINE__);
403         sprintf(current->comm, "lustre_ost");
404         printk("---> %d\n", __LINE__);
405
406         /* Record that the  thread is running */
407         ost->ost_thread = current;
408         printk("---> %d\n", __LINE__);
409         wake_up(&ost->ost_done_waitq); 
410         printk("---> %d\n", __LINE__);
411
412         /* XXX maintain a list of all managed devices: insert here */
413
414         /* And now, wait forever for commit wakeup events. */
415         while (1) {
416                 struct ost_request *request;
417                 int rc; 
418
419                 if (ost->ost_flags & OST_EXIT)
420                         break;
421
422
423                 wake_up(&ost->ost_done_waitq);
424                 interruptible_sleep_on(&ost->ost_waitq);
425
426                 CDEBUG(D_INODE, "lustre_ost wakes\n");
427                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
428
429                 if (list_empty(&ost->ost_reqs)) { 
430                         CDEBUG(D_INODE, "woke because of timer\n"); 
431                 } else { 
432                         printk("---> %d\n", __LINE__);
433                         request = list_entry(ost->ost_reqs.next, 
434                                              struct ost_request, rq_list);
435                         printk("---> %d\n", __LINE__);
436                         list_del(&request->rq_list);
437                         rc = ost_handle(obddev, request); 
438                 }
439         }
440
441         /* XXX maintain a list of all managed devices: cleanup here */
442         printk("---> %d\n", __LINE__);
443         ost->ost_thread = NULL;
444         printk("---> %d\n", __LINE__);
445         wake_up(&ost->ost_done_waitq);
446         printk("lustre_ost: exiting\n");
447         return 0;
448 }
449
450 static void ost_stop_srv_thread(struct ost_obd *ost)
451 {
452         ost->ost_flags |= OST_EXIT;
453
454         while (ost->ost_thread) {
455                 wake_up(&ost->ost_waitq);
456                 sleep_on(&ost->ost_done_waitq);
457         }
458 }
459
460 static void ost_start_srv_thread(struct obd_device *obd)
461 {
462         struct ost_obd *ost = &obd->u.ost;
463         ENTRY;
464
465         init_waitqueue_head(&ost->ost_waitq);
466         printk("---> %d\n", __LINE__);
467         init_waitqueue_head(&ost->ost_done_waitq);
468         printk("---> %d\n", __LINE__);
469         kernel_thread(ost_main, (void *)obd, 
470                       CLONE_VM | CLONE_FS | CLONE_FILES);
471         printk("---> %d\n", __LINE__);
472         while (!ost->ost_thread) 
473                 sleep_on(&ost->ost_done_waitq);
474         printk("---> %d\n", __LINE__);
475         EXIT;
476 }
477
478 /* mount the file system (secretly) */
479 static int ost_setup(struct obd_device *obddev, obd_count len,
480                         void *buf)
481                         
482 {
483         struct obd_ioctl_data* data = buf;
484         struct ost_obd *ost = &obddev->u.ost;
485         struct obd_device *tgt;
486         int err; 
487         ENTRY;
488
489         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
490                 EXIT;
491                 return -ENODEV;
492         }
493
494         tgt = &obd_dev[data->ioc_dev];
495         ost->ost_tgt = tgt;
496         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
497              ! (tgt->obd_flags & OBD_SET_UP) ){
498                 printk("device not attached or not set up (%d)\n", 
499                        data->ioc_dev);
500                 EXIT;
501                 return -EINVAL;
502         } 
503
504         ost->ost_conn.oc_dev = tgt;
505         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
506         if (err) { 
507                 printk("lustre ost: fail to connect to device %d\n", 
508                        data->ioc_dev); 
509                 return -EINVAL;
510         }
511
512         INIT_LIST_HEAD(&ost->ost_reqs);
513         ost->ost_thread = NULL;
514         ost->ost_flags = 0;
515
516         spin_lock_init(&obddev->u.ost.fo_lock);
517
518         ost_start_srv_thread(obddev);
519
520         MOD_INC_USE_COUNT;
521         EXIT; 
522         return 0;
523
524
525 static int ost_cleanup(struct obd_device * obddev)
526 {
527         struct ost_obd *ost = &obddev->u.ost;
528         struct obd_device *tgt;
529         int err;
530
531         ENTRY;
532
533         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
534                 EXIT;
535                 return 0;
536         }
537
538         if ( !list_empty(&obddev->obd_gen_clients) ) {
539                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
540                 EXIT;
541                 return -EBUSY;
542         }
543
544         ost_stop_srv_thread(ost);
545
546         if (!list_empty(&ost->ost_reqs)) {
547                 // XXX reply with errors and clean up
548                 CDEBUG(D_INODE, "Request list not empty!\n");
549         }
550
551         tgt = ost->ost_tgt;
552         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
553         if (err) { 
554                 printk("lustre ost: fail to disconnect device\n");
555                 return -EINVAL;
556         }
557         
558
559         MOD_DEC_USE_COUNT;
560         EXIT;
561         return 0;
562 }
563
564 /* use obd ops to offer management infrastructure */
565 static struct obd_ops ost_obd_ops = {
566         o_setup:       ost_setup,
567         o_cleanup:     ost_cleanup,
568 };
569
570 static int __init ost_init(void)
571 {
572         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
573         return 0;
574 }
575
576 static void __exit ost_exit(void)
577 {
578         obd_unregister_type(LUSTRE_OST_NAME);
579 }
580
581 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
582 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
583 MODULE_LICENSE("GPL");
584
585 // for testing (maybe this stays)
586 EXPORT_SYMBOL(ost_queue_req);
587
588 module_init(ost_init);
589 module_exit(ost_exit);