Whamcloud - gitweb
122e71c74fc113fae4f3c2dc3718b9306307db67
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  *  lustre/ost/ost_handler.c
3  *  
4  *  Lustre Object Server Module (OST)
5  * 
6  *  Copyright (C) 2001  Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  *  For testing and management it is treated as an obd_device, although it does
15  *  not export a full OBD method table (the requests are coming in over the wire, 
16  *  so object target modules do not have a full method table.)
17  * 
18  */
19
20
21 #define EXPORT_SYMTAB
22
23 #include <linux/version.h>
24 #include <linux/module.h>
25 #include <linux/fs.h>
26 #include <linux/stat.h>
27 #include <linux/locks.h>
28 #include <linux/ext2_fs.h>
29 #include <linux/quotaops.h>
30 #include <asm/unistd.h>
31 #include <linux/obd_support.h>
32 #include <linux/obd.h>
33 #include <linux/obd_class.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/obd_class.h>
38
39 // for testing
40 static int ost_queue_req(struct obd_device *obddev, struct ost_request *req)
41 {
42         struct ost_request *srv_req; 
43         struct ost_obd *ost = &obddev->u.ost;
44         
45         printk("---> OST at %d %p\n", __LINE__, ost);
46         if (!ost) { 
47                 EXIT;
48                 return -1;
49         }
50
51         srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL); 
52         if (!srv_req) { 
53                 EXIT;
54                 return -ENOMEM;
55         }
56         memcpy(srv_req, req, sizeof(*req)); 
57         srv_req->rq_reply_handle = req;
58         srv_req->rq_obd = ost;
59         list_add(&srv_req->rq_list, &ost->ost_reqs); 
60
61         wake_up(&ost->ost_waitq);
62         return 0;
63 }
64
65
66 /* XXX replace with networking code */
67 int ost_reply(struct obd_device *obddev, struct ost_request *req)
68 {
69         struct ost_request *clnt_req = req->rq_reply_handle;
70
71         ENTRY;
72
73         /* free the request buffer */
74         kfree(req->rq_reqbuf);
75         req->rq_reqbuf = NULL; 
76         
77         /* move the reply to the client */ 
78         clnt_req->rq_replen = req->rq_replen;
79         clnt_req->rq_repbuf = req->rq_repbuf;
80         req->rq_repbuf = NULL;
81         req->rq_replen = 0;
82         
83         /* free the server request */
84         kfree(req); 
85         /* wake up the client */ 
86         wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
87         EXIT;
88         return 0;
89 }
90
91 int ost_error(struct obd_device *obddev, struct ost_request *req)
92 {
93         struct ost_rep_hdr *hdr;
94
95         ENTRY;
96         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
97         if (!hdr) { 
98                 EXIT;
99                 return -ENOMEM;
100         }
101
102         memset(hdr, 0, sizeof(*hdr));
103         
104         hdr->seqno = req->rq_reqhdr->seqno;
105         hdr->status = req->rq_status; 
106         hdr->type = OST_TYPE_ERR;
107         req->rq_repbuf = (char *)hdr;
108
109         EXIT;
110         return ost_reply(obddev, req);
111 }
112
113 static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
114 {
115         struct obd_conn conn; 
116         int rc;
117
118         ENTRY;
119         
120         conn.oc_id = req->rq_req->connid;
121         conn.oc_dev = ost->ost_tgt;
122
123         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
124                           &req->rq_replen, &req->rq_repbuf); 
125         if (rc) { 
126                 printk("ost_destroy: cannot pack reply\n"); 
127                 return rc;
128         }
129
130         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
131                 (&conn, &req->rq_req->oa); 
132
133         EXIT;
134         return 0;
135 }
136
137 static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
138 {
139         struct obd_conn conn; 
140         int rc;
141
142         ENTRY;
143         
144         conn.oc_id = req->rq_req->connid;
145         conn.oc_dev = ost->ost_tgt;
146
147         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
148                           &req->rq_replen, &req->rq_repbuf); 
149         if (rc) { 
150                 printk("ost_getattr: cannot pack reply\n"); 
151                 return rc;
152         }
153         req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
154
155         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
156                 (&conn, &req->rq_rep->oa); 
157
158         EXIT;
159         return 0;
160 }
161
162 static int ost_create(struct ost_obd *ost, struct ost_request *req)
163 {
164         struct obd_conn conn; 
165         int rc;
166
167         ENTRY;
168         
169         conn.oc_id = req->rq_req->connid;
170         conn.oc_dev = ost->ost_tgt;
171
172         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
173                           &req->rq_replen, &req->rq_repbuf); 
174         if (rc) { 
175                 printk("ost_create: cannot pack reply\n"); 
176                 return rc;
177         }
178
179         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
180
181         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
182                 (&conn, &req->rq_rep->oa); 
183
184         EXIT;
185         return 0;
186 }
187
188
189 static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
190 {
191         struct obd_conn conn; 
192         int rc;
193
194         ENTRY;
195         
196         conn.oc_id = req->rq_req->connid;
197         conn.oc_dev = ost->ost_tgt;
198
199         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
200                           &req->rq_replen, &req->rq_repbuf); 
201         if (rc) { 
202                 printk("ost_setattr: cannot pack reply\n"); 
203                 return rc;
204         }
205
206         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
207
208         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
209                 (&conn, &req->rq_rep->oa); 
210
211         EXIT;
212         return 0;
213 }
214
215 static int ost_connect(struct ost_obd *ost, struct ost_request *req)
216 {
217         struct obd_conn conn; 
218         int rc;
219
220         ENTRY;
221         
222         conn.oc_dev = ost->ost_tgt;
223
224         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
225                           &req->rq_replen, &req->rq_repbuf); 
226         if (rc) { 
227                 printk("ost_setattr: cannot pack reply\n"); 
228                 return rc;
229         }
230
231         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_connect(&conn);
232         req->rq_rep->connid = conn.oc_id;
233
234         EXIT;
235         return 0;
236 }
237
238
239 static int ost_disconnect(struct ost_obd *ost, struct ost_request *req)
240 {
241         struct obd_conn conn; 
242         int rc;
243
244         ENTRY;
245         
246         conn.oc_dev = ost->ost_tgt;
247         conn.oc_id = req->rq_req->connid;
248
249         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
250                           &req->rq_replen, &req->rq_repbuf); 
251         if (rc) { 
252                 printk("ost_setattr: cannot pack reply\n"); 
253                 return rc;
254         }
255
256         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_disconnect(&conn);
257
258         EXIT;
259         return 0;
260 }
261
262 static int ost_get_info(struct ost_obd *ost, struct ost_request *req)
263 {
264         struct obd_conn conn; 
265         int rc;
266         int vallen;
267         void *val;
268
269         ENTRY;
270         
271         conn.oc_id = req->rq_req->connid;
272         conn.oc_dev = ost->ost_tgt;
273
274         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_get_info
275                 (&conn, req->rq_req->buflen1, req->rq_req->buf1, &vallen, &val); 
276
277
278         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr, &req->rq_rep,
279                           &req->rq_replen, &req->rq_repbuf); 
280         if (rc) { 
281                 printk("ost_setattr: cannot pack reply\n"); 
282                 return rc;
283         }
284
285         EXIT;
286         return 0;
287 }
288
289
290
291 //int ost_handle(struct ost_conn *conn, int len, char *buf)
292 int ost_handle(struct obd_device *obddev, struct ost_request *req)
293 {
294         int rc;
295         struct ost_obd *ost = &obddev->u.ost;
296         struct ost_req_hdr *hdr;
297
298         ENTRY;
299
300         hdr = (struct ost_req_hdr *)req->rq_reqbuf;
301
302         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
303                 printk("lustre_ost: wrong packet type sent %d\n",
304                        NTOH__u32(hdr->type));
305                 rc = -EINVAL;
306                 goto out;
307         }
308
309         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
310                             &req->rq_reqhdr, &req->rq_req);
311         if (rc) { 
312                 printk("lustre_ost: Invalid request\n");
313                 EXIT; 
314                 goto out;
315         }
316
317         switch (req->rq_reqhdr->opc) { 
318
319         case OST_CONNECT:
320                 CDEBUG(D_INODE, "connect\n");
321                 rc = ost_connect(ost, req);
322                 break;
323         case OST_DISCONNECT:
324                 CDEBUG(D_INODE, "disconnect\n");
325                 rc = ost_disconnect(ost, req);
326                 break;
327         case OST_GET_INFO:
328                 CDEBUG(D_INODE, "get_info\n");
329                 rc = ost_get_info(ost, req);
330                 break;
331         case OST_CREATE:
332                 CDEBUG(D_INODE, "create\n");
333                 rc = ost_create(ost, req);
334                 break;
335         case OST_DESTROY:
336                 CDEBUG(D_INODE, "destroy\n");
337                 rc = ost_destroy(ost, req);
338                 break;
339         case OST_GETATTR:
340                 CDEBUG(D_INODE, "getattr\n");
341                 rc = ost_getattr(ost, req);
342                 break;
343         case OST_SETATTR:
344                 CDEBUG(D_INODE, "setattr\n");
345                 rc = ost_setattr(ost, req);
346                 break;
347
348         default:
349                 return ost_error(obddev, req);
350         }
351
352 out:
353         if (rc) { 
354                 printk("ost: processing error %d\n", rc);
355                 ost_error(obddev, req);
356         } else { 
357                 CDEBUG(D_INODE, "sending reply\n"); 
358                 ost_reply(obddev, req); 
359         }
360
361         return 0;
362 }
363
364 int ost_main(void *arg)
365 {
366         struct obd_device *obddev = (struct obd_device *) arg;
367         struct ost_obd *ost = &obddev->u.ost;
368         ENTRY;
369         printk("---> %d\n", __LINE__);
370
371
372         lock_kernel();
373         printk("---> %d\n", __LINE__);
374         daemonize();
375         printk("---> %d\n", __LINE__);
376         spin_lock_irq(&current->sigmask_lock);
377         printk("---> %d\n", __LINE__);
378         sigfillset(&current->blocked);
379         printk("---> %d\n", __LINE__);
380         recalc_sigpending(current);
381         printk("---> %d\n", __LINE__);
382         spin_unlock_irq(&current->sigmask_lock);
383         printk("---> %d\n", __LINE__);
384
385         printk("---> %d\n", __LINE__);
386         sprintf(current->comm, "lustre_ost");
387         printk("---> %d\n", __LINE__);
388
389         /* Record that the  thread is running */
390         ost->ost_thread = current;
391         printk("---> %d\n", __LINE__);
392         wake_up(&ost->ost_done_waitq); 
393         printk("---> %d\n", __LINE__);
394
395         /* XXX maintain a list of all managed devices: insert here */
396
397         /* And now, wait forever for commit wakeup events. */
398         while (1) {
399                 struct ost_request *request;
400                 int rc; 
401
402                 if (ost->ost_flags & OST_EXIT)
403                         break;
404
405
406                 wake_up(&ost->ost_done_waitq);
407                 interruptible_sleep_on(&ost->ost_waitq);
408
409                 CDEBUG(D_INODE, "lustre_ost wakes\n");
410                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
411
412                 if (list_empty(&ost->ost_reqs)) { 
413                         CDEBUG(D_INODE, "woke because of timer\n"); 
414                 } else { 
415                         printk("---> %d\n", __LINE__);
416                         request = list_entry(ost->ost_reqs.next, 
417                                              struct ost_request, rq_list);
418                         printk("---> %d\n", __LINE__);
419                         list_del(&request->rq_list);
420                         rc = ost_handle(obddev, request); 
421                 }
422         }
423
424         /* XXX maintain a list of all managed devices: cleanup here */
425         printk("---> %d\n", __LINE__);
426         ost->ost_thread = NULL;
427         printk("---> %d\n", __LINE__);
428         wake_up(&ost->ost_done_waitq);
429         printk("lustre_ost: exiting\n");
430         return 0;
431 }
432
433 static void ost_stop_srv_thread(struct ost_obd *ost)
434 {
435         ost->ost_flags |= OST_EXIT;
436
437         while (ost->ost_thread) {
438                 wake_up(&ost->ost_waitq);
439                 sleep_on(&ost->ost_done_waitq);
440         }
441 }
442
443 static void ost_start_srv_thread(struct obd_device *obd)
444 {
445         struct ost_obd *ost = &obd->u.ost;
446         ENTRY;
447
448         init_waitqueue_head(&ost->ost_waitq);
449         printk("---> %d\n", __LINE__);
450         init_waitqueue_head(&ost->ost_done_waitq);
451         printk("---> %d\n", __LINE__);
452         kernel_thread(ost_main, (void *)obd, 
453                       CLONE_VM | CLONE_FS | CLONE_FILES);
454         printk("---> %d\n", __LINE__);
455         while (!ost->ost_thread) 
456                 sleep_on(&ost->ost_done_waitq);
457         printk("---> %d\n", __LINE__);
458         EXIT;
459 }
460
461 /* connect the OST to its target OBD device and start the service thread */
462 static int ost_setup(struct obd_device *obddev, obd_count len,
463                         void *buf)
464                         
465 {
466         struct obd_ioctl_data* data = buf;
467         struct ost_obd *ost = &obddev->u.ost;
468         struct obd_device *tgt;
469         int err; 
470         ENTRY;
471
472         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
473                 EXIT;
474                 return -ENODEV;
475         }
476
477         tgt = &obd_dev[data->ioc_dev];
478         
479         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
480              ! (tgt->obd_flags & OBD_SET_UP) ){
481                 printk("device not attached or not set up (%d)\n", 
482                        data->ioc_dev);
483                 EXIT;
484                 return -EINVAL;
485         } 
486
487         ost->ost_conn.oc_dev = tgt;
488         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
489         if (err) { 
490                 printk("lustre ost: fail to connect to device %d\n", 
491                        data->ioc_dev); 
492                 return -EINVAL;
493         }
494
495         printk("---> OST at %d %p\n", __LINE__, ost);
496         INIT_LIST_HEAD(&ost->ost_reqs);
497         ost->ost_thread = NULL;
498         ost->ost_flags = 0;
499
500         printk("---> %d\n", __LINE__);
501         spin_lock_init(&obddev->u.ost.fo_lock);
502
503         printk("---> %d\n", __LINE__);
504         ost_start_srv_thread(obddev);
505         printk("---> %d\n", __LINE__);
506
507         MOD_INC_USE_COUNT;
508         EXIT; 
509         return 0;
510
511
512 static int ost_cleanup(struct obd_device * obddev)
513 {
514         struct ost_obd *ost = &obddev->u.ost;
515         struct obd_device *tgt;
516         int err;
517
518         ENTRY;
519
520         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
521                 EXIT;
522                 return 0;
523         }
524
525         if ( !list_empty(&obddev->obd_gen_clients) ) {
526                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
527                 EXIT;
528                 return -EBUSY;
529         }
530
531         ost_stop_srv_thread(ost);
532
533         if (!list_empty(&ost->ost_reqs)) {
534                 // XXX reply with errors and clean up
535                 CDEBUG(D_INODE, "Request list not empty!\n");
536         }
537
538         tgt = ost->ost_tgt;
539         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
540         if (err) { 
541                 printk("lustre ost: fail to disconnect device\n");
542                 return -EINVAL;
543         }
544         
545
546         MOD_DEC_USE_COUNT;
547         EXIT;
548         return 0;
549 }
550
551 /* use obd ops to offer management infrastructure */
552 static struct obd_ops ost_obd_ops = {
553         o_setup:       ost_setup,
554         o_cleanup:     ost_cleanup,
555 };
556
557 static int __init ost_init(void)
558 {
559         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
560         return 0;
561 }
562
563 static void __exit ost_exit(void)
564 {
565         obd_unregister_type(LUSTRE_OST_NAME);
566 }
567
568 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
569 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
570 MODULE_LICENSE("GPL");
571
572 // for testing (maybe this stays)
573 EXPORT_SYMBOL(ost_queue_req);
574
575 module_init(ost_init);
576 module_exit(ost_exit);