Whamcloud - gitweb
this minimal patch might actually work.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  *  lustre/ost/ost_handler.c
3  *  
4  *  Lustre Object Server Module (OST)
5  * 
6  *  Copyright (C) 2001  Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  *  For testing and management it is treated as an obd_device, although it does
15  *  not export a full OBD method table (the requests are coming in over the wire, 
16  *  so object target modules do not have a full method table.)
17  * 
18  */
19
20
21 #define EXPORT_SYMTAB
22
23 #include <linux/version.h>
24 #include <linux/module.h>
25 #include <linux/fs.h>
26 #include <linux/stat.h>
27 #include <linux/locks.h>
28 #include <linux/ext2_fs.h>
29 #include <linux/quotaops.h>
30 #include <asm/unistd.h>
31 #include <linux/obd_support.h>
32 #include <linux/obd.h>
33 #include <linux/obd_class.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/obd_class.h>
38
39 // for testing
40 static int ost_queue_req(struct obd_device *obddev, struct ost_request *req)
41 {
42         struct ost_request *srv_req; 
43         struct ost_obd *ost = &obddev->u.ost;
44         
45         if (!ost) { 
46                 EXIT;
47                 return -1;
48         }
49
50         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL); 
51         if (!srv_req) { 
52                 EXIT;
53                 return -ENOMEM;
54         }
55
56         printk("---> OST at %d %p, incoming req %p, srv_req %p\n", 
57                __LINE__, ost, req, srv_req);
58
59         memset(srv_req, 0, sizeof(*req)); 
60
61         /* move the request buffer */
62         srv_req->rq_reqbuf = req->rq_reqbuf;
63         srv_req->rq_reqlen    = req->rq_reqlen;
64         srv_req->rq_obd = ost;
65
66         /* remember where it came from */
67         srv_req->rq_reply_handle = req;
68
69         list_add(&srv_req->rq_list, &ost->ost_reqs); 
70         wake_up(&ost->ost_waitq);
71         return 0;
72 }
73
74
75 /* XXX replace with networking code */
76 int ost_reply(struct obd_device *obddev, struct ost_request *req)
77 {
78         struct ost_request *clnt_req = req->rq_reply_handle;
79
80         ENTRY;
81         printk("ost_reply: req %p clnt_req at %p\n", req, clnt_req); 
82
83         /* free the request buffer */
84         kfree(req->rq_reqbuf);
85         req->rq_reqbuf = NULL; 
86         
87         /* move the reply to the client */ 
88         clnt_req->rq_replen = req->rq_replen;
89         clnt_req->rq_repbuf = req->rq_repbuf;
90
91         printk("---> client req %p repbuf %p len %d status %d\n", 
92                clnt_req, clnt_req->rq_repbuf, clnt_req->rq_replen, 
93                req->rq_rephdr->status); 
94
95         req->rq_repbuf = NULL;
96         req->rq_replen = 0;
97         
98         /* free the server request */
99         kfree(req); 
100         /* wake up the client */ 
101         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
102         EXIT;
103         return 0;
104 }
105
106 int ost_error(struct obd_device *obddev, struct ost_request *req)
107 {
108         struct ost_rep_hdr *hdr;
109
110         ENTRY;
111         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
112         if (!hdr) { 
113                 EXIT;
114                 return -ENOMEM;
115         }
116
117         memset(hdr, 0, sizeof(*hdr));
118         
119         hdr->seqno = req->rq_reqhdr->seqno;
120         hdr->status = req->rq_status; 
121         hdr->type = OST_TYPE_ERR;
122
123         req->rq_repbuf = (char *)hdr;
124         req->rq_replen = sizeof(*hdr); 
125
126         EXIT;
127         return ost_reply(obddev, req);
128 }
129
130 static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
131 {
132         struct obd_conn conn; 
133         int rc;
134
135         ENTRY;
136         
137         conn.oc_id = req->rq_req->connid;
138         conn.oc_dev = ost->ost_tgt;
139
140         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
141                           &req->rq_replen, &req->rq_repbuf); 
142         if (rc) { 
143                 printk("ost_destroy: cannot pack reply\n"); 
144                 return rc;
145         }
146
147         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
148                 (&conn, &req->rq_req->oa); 
149
150         EXIT;
151         return 0;
152 }
153
154 static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
155 {
156         struct obd_conn conn; 
157         int rc;
158
159         ENTRY;
160         printk("ost getattr entered\n"); 
161         
162         conn.oc_id = req->rq_req->connid;
163         conn.oc_dev = ost->ost_tgt;
164
165         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
166                           &req->rq_replen, &req->rq_repbuf); 
167         if (rc) { 
168                 printk("ost_getattr: cannot pack reply\n"); 
169                 return rc;
170         }
171         req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
172         req->rq_rep->oa.o_valid = req->rq_req->oa.o_valid;
173
174         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
175                 (&conn, &req->rq_rep->oa); 
176
177         EXIT;
178         return 0;
179 }
180
181 static int ost_create(struct ost_obd *ost, struct ost_request *req)
182 {
183         struct obd_conn conn; 
184         int rc;
185
186         ENTRY;
187         
188         conn.oc_id = req->rq_req->connid;
189         conn.oc_dev = ost->ost_tgt;
190
191         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
192                           &req->rq_replen, &req->rq_repbuf); 
193         if (rc) { 
194                 printk("ost_create: cannot pack reply\n"); 
195                 return rc;
196         }
197
198         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
199
200         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
201                 (&conn, &req->rq_rep->oa); 
202
203         EXIT;
204         return 0;
205 }
206
207
208 static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
209 {
210         struct obd_conn conn; 
211         int rc;
212
213         ENTRY;
214         
215         conn.oc_id = req->rq_req->connid;
216         conn.oc_dev = ost->ost_tgt;
217
218         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
219                           &req->rq_replen, &req->rq_repbuf); 
220         if (rc) { 
221                 printk("ost_setattr: cannot pack reply\n"); 
222                 return rc;
223         }
224
225         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
226
227         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
228                 (&conn, &req->rq_rep->oa); 
229
230         EXIT;
231         return 0;
232 }
233
234 static int ost_connect(struct ost_obd *ost, struct ost_request *req)
235 {
236         struct obd_conn conn; 
237         int rc;
238
239         ENTRY;
240         
241         conn.oc_dev = ost->ost_tgt;
242
243         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
244                           &req->rq_replen, &req->rq_repbuf); 
245         if (rc) { 
246                 printk("ost_setattr: cannot pack reply\n"); 
247                 return rc;
248         }
249
250         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
251
252         printk("ost_connect: rep buffer %p, id %d\n", req->rq_repbuf, 
253                conn.oc_id);
254         req->rq_rep->connid = conn.oc_id;
255         EXIT;
256         return 0;
257 }
258
259
260 static int ost_disconnect(struct ost_obd *ost, struct ost_request *req)
261 {
262         struct obd_conn conn; 
263         int rc;
264
265         ENTRY;
266         
267         conn.oc_dev = ost->ost_tgt;
268         conn.oc_id = req->rq_req->connid;
269
270         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
271                           &req->rq_replen, &req->rq_repbuf); 
272         if (rc) { 
273                 printk("ost_setattr: cannot pack reply\n"); 
274                 return rc;
275         }
276
277         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
278
279         EXIT;
280         return 0;
281 }
282
283 static int ost_get_info(struct ost_obd *ost, struct ost_request *req)
284 {
285         struct obd_conn conn; 
286         int rc;
287         int vallen;
288         void *val;
289
290         ENTRY;
291         
292         conn.oc_id = req->rq_req->connid;
293         conn.oc_dev = ost->ost_tgt;
294
295         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
296                 (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val); 
297
298
299         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep,
300                           &req->rq_replen, &req->rq_repbuf); 
301         if (rc) { 
302                 printk("ost_setattr: cannot pack reply\n"); 
303                 return rc;
304         }
305
306         EXIT;
307         return 0;
308 }
309
310
311
312 //int ost_handle(struct ost_conn *conn, int len, char *buf)
313 int ost_handle(struct obd_device *obddev, struct ost_request *req)
314 {
315         int rc;
316         struct ost_obd *ost = &obddev->u.ost;
317         struct ost_req_hdr *hdr;
318
319         ENTRY;
320         printk("ost_handle: req at %p\n", req); 
321
322         hdr = (struct ost_req_hdr *)req->rq_reqbuf;
323         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
324                 printk("lustre_ost: wrong packet type sent %d\n",
325                        NTOH__u32(hdr->type));
326                 rc = -EINVAL;
327                 goto out;
328         }
329
330         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
331                             &req->rq_reqhdr, &req->rq_req);
332         if (rc) { 
333                 printk("lustre_ost: Invalid request\n");
334                 EXIT; 
335                 goto out;
336         }
337
338         switch (req->rq_reqhdr->opc) { 
339
340         case OST_CONNECT:
341                 CDEBUG(D_INODE, "connect\n");
342                 printk("----> connect \n"); 
343                 rc = ost_connect(ost, req);
344                 break;
345         case OST_DISCONNECT:
346                 CDEBUG(D_INODE, "disconnect\n");
347                 rc = ost_disconnect(ost, req);
348                 break;
349         case OST_GET_INFO:
350                 CDEBUG(D_INODE, "get_info\n");
351                 rc = ost_get_info(ost, req);
352                 break;
353         case OST_CREATE:
354                 CDEBUG(D_INODE, "create\n");
355                 rc = ost_create(ost, req);
356                 break;
357         case OST_DESTROY:
358                 CDEBUG(D_INODE, "destroy\n");
359                 rc = ost_destroy(ost, req);
360                 break;
361         case OST_GETATTR:
362                 CDEBUG(D_INODE, "getattr\n");
363                 rc = ost_getattr(ost, req);
364                 break;
365         case OST_SETATTR:
366                 CDEBUG(D_INODE, "setattr\n");
367                 rc = ost_setattr(ost, req);
368                 break;
369
370         default:
371                 return ost_error(obddev, req);
372         }
373
374 out:
375         req->rq_rephdr->status = rc;
376         if (rc) { 
377                 printk("ost: processing error %d\n", rc);
378                 ost_error(obddev, req);
379         } else { 
380                 CDEBUG(D_INODE, "sending reply\n"); 
381                 ost_reply(obddev, req); 
382         }
383
384         return 0;
385 }
386
387 int ost_main(void *arg)
388 {
389         struct obd_device *obddev = (struct obd_device *) arg;
390         struct ost_obd *ost = &obddev->u.ost;
391         ENTRY;
392         printk("---> %d\n", __LINE__);
393
394
395         lock_kernel();
396         printk("---> %d\n", __LINE__);
397         daemonize();
398         printk("---> %d\n", __LINE__);
399         spin_lock_irq(&current->sigmask_lock);
400         printk("---> %d\n", __LINE__);
401         sigfillset(&current->blocked);
402         printk("---> %d\n", __LINE__);
403         recalc_sigpending(current);
404         printk("---> %d\n", __LINE__);
405         spin_unlock_irq(&current->sigmask_lock);
406         printk("---> %d\n", __LINE__);
407
408         printk("---> %d\n", __LINE__);
409         sprintf(current->comm, "lustre_ost");
410         printk("---> %d\n", __LINE__);
411
412         /* Record that the  thread is running */
413         ost->ost_thread = current;
414         printk("---> %d\n", __LINE__);
415         wake_up(&ost->ost_done_waitq); 
416         printk("---> %d\n", __LINE__);
417
418         /* XXX maintain a list of all managed devices: insert here */
419
420         /* And now, wait forever for commit wakeup events. */
421         while (1) {
422                 struct ost_request *request;
423                 int rc; 
424
425                 if (ost->ost_flags & OST_EXIT)
426                         break;
427
428
429                 wake_up(&ost->ost_done_waitq);
430                 interruptible_sleep_on(&ost->ost_waitq);
431
432                 CDEBUG(D_INODE, "lustre_ost wakes\n");
433                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
434
435                 if (list_empty(&ost->ost_reqs)) { 
436                         CDEBUG(D_INODE, "woke because of timer\n"); 
437                 } else { 
438                         printk("---> %d\n", __LINE__);
439                         request = list_entry(ost->ost_reqs.next, 
440                                              struct ost_request, rq_list);
441                         printk("---> %d\n", __LINE__);
442                         list_del(&request->rq_list);
443                         rc = ost_handle(obddev, request); 
444                 }
445         }
446
447         /* XXX maintain a list of all managed devices: cleanup here */
448         printk("---> %d\n", __LINE__);
449         ost->ost_thread = NULL;
450         printk("---> %d\n", __LINE__);
451         wake_up(&ost->ost_done_waitq);
452         printk("lustre_ost: exiting\n");
453         return 0;
454 }
455
456 static void ost_stop_srv_thread(struct ost_obd *ost)
457 {
458         ost->ost_flags |= OST_EXIT;
459
460         while (ost->ost_thread) {
461                 wake_up(&ost->ost_waitq);
462                 sleep_on(&ost->ost_done_waitq);
463         }
464 }
465
466 static void ost_start_srv_thread(struct obd_device *obd)
467 {
468         struct ost_obd *ost = &obd->u.ost;
469         ENTRY;
470
471         init_waitqueue_head(&ost->ost_waitq);
472         printk("---> %d\n", __LINE__);
473         init_waitqueue_head(&ost->ost_done_waitq);
474         printk("---> %d\n", __LINE__);
475         kernel_thread(ost_main, (void *)obd, 
476                       CLONE_VM | CLONE_FS | CLONE_FILES);
477         printk("---> %d\n", __LINE__);
478         while (!ost->ost_thread) 
479                 sleep_on(&ost->ost_done_waitq);
480         printk("---> %d\n", __LINE__);
481         EXIT;
482 }
483
484 /* mount the file system (secretly) */
485 static int ost_setup(struct obd_device *obddev, obd_count len,
486                         void *buf)
487                         
488 {
489         struct obd_ioctl_data* data = buf;
490         struct ost_obd *ost = &obddev->u.ost;
491         struct obd_device *tgt;
492         int err; 
493         ENTRY;
494
495         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
496                 EXIT;
497                 return -ENODEV;
498         }
499
500         tgt = &obd_dev[data->ioc_dev];
501         ost->ost_tgt = tgt;
502         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
503              ! (tgt->obd_flags & OBD_SET_UP) ){
504                 printk("device not attached or not set up (%d)\n", 
505                        data->ioc_dev);
506                 EXIT;
507                 return -EINVAL;
508         } 
509
510         ost->ost_conn.oc_dev = tgt;
511         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
512         if (err) { 
513                 printk("lustre ost: fail to connect to device %d\n", 
514                        data->ioc_dev); 
515                 return -EINVAL;
516         }
517
518         INIT_LIST_HEAD(&ost->ost_reqs);
519         ost->ost_thread = NULL;
520         ost->ost_flags = 0;
521
522         spin_lock_init(&obddev->u.ost.ost_lock);
523
524         ost_start_srv_thread(obddev);
525
526         MOD_INC_USE_COUNT;
527         EXIT; 
528         return 0;
529
530
531 static int ost_cleanup(struct obd_device * obddev)
532 {
533         struct ost_obd *ost = &obddev->u.ost;
534         struct obd_device *tgt;
535         int err;
536
537         ENTRY;
538
539         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
540                 EXIT;
541                 return 0;
542         }
543
544         if ( !list_empty(&obddev->obd_gen_clients) ) {
545                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
546                 EXIT;
547                 return -EBUSY;
548         }
549
550         ost_stop_srv_thread(ost);
551
552         if (!list_empty(&ost->ost_reqs)) {
553                 // XXX reply with errors and clean up
554                 CDEBUG(D_INODE, "Request list not empty!\n");
555         }
556
557         tgt = ost->ost_tgt;
558         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
559         if (err) { 
560                 printk("lustre ost: fail to disconnect device\n");
561                 return -EINVAL;
562         }
563         
564
565         MOD_DEC_USE_COUNT;
566         EXIT;
567         return 0;
568 }
569
570 /* use obd ops to offer management infrastructure */
571 static struct obd_ops ost_obd_ops = {
572         o_setup:       ost_setup,
573         o_cleanup:     ost_cleanup,
574 };
575
576 static int __init ost_init(void)
577 {
578         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
579         return 0;
580 }
581
582 static void __exit ost_exit(void)
583 {
584         obd_unregister_type(LUSTRE_OST_NAME);
585 }
586
587 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
588 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
589 MODULE_LICENSE("GPL");
590
591 // for testing (maybe this stays)
592 EXPORT_SYMBOL(ost_queue_req);
593
594 module_init(ost_init);
595 module_exit(ost_exit);