Whamcloud - gitweb
b36beca4ac5c734b2b251913cfe711457c9f2167
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  *  linux/mds/handler.c
3  *  
4  *  Lustre Object Server Module (OST)
5  * 
6  *  Copyright (C) 2001  Cluster File Systems, Inc.
7  *
8  *  This code is issued under the GNU General Public License.
9  *  See the file COPYING in this distribution
10  *
11  *  by Peter Braam <braam@clusterfs.com>
12  * 
13  *  This server is single threaded at present (but can easily be multi threaded). 
14  * 
15  */
16
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/ext2_fs.h>
26 #include <linux/quotaops.h>
27 #include <asm/unistd.h>
28 #include <linux/obd_support.h>
29 #include <linux/obd.h>
30 #include <linux/obd_class.h>
31 #include <linux/lustre_lib.h>
32 #include <linux/lustre_idl.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/obd_class.h>
35
36
37 // for testing
38 static struct ost_obd *OST;
39
40 // for testing
41 static int ost_queue_req(struct ost_request *req)
42 {
43         
44         if (!OST) { 
45                 EXIT;
46                 return -1;
47         }
48
49         list_add(&req->rq_list, &OST->ost_reqs); 
50         init_waitqueue_head(&req->rq_wait_for_ost_rep);
51         req->rq_obd = OST;
52         wake_up(&OST->ost_waitq);
53         printk("-- sleeping\n");
54         interruptible_sleep_on(&req->rq_wait_for_ost_rep);
55         printk("-- done\n");
56         return 0;
57 }
58
59 int ost_reply(struct ost_request *req)
60 {
61         ENTRY;
62         kfree(req->rq_reqbuf);
63         req->rq_reqbuf = NULL; 
64         wake_up_interruptible(&req->rq_wait_for_ost_rep); 
65         EXIT;
66         return 0;
67 }
68
69 int ost_error(struct ost_request *req)
70 {
71         struct ost_rep_hdr *hdr;
72
73         ENTRY;
74         hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
75         if (!hdr) { 
76                 EXIT;
77                 return -ENOMEM;
78         }
79
80         memset(hdr, 0, sizeof(*hdr));
81         
82         hdr->seqno = req->rq_reqhdr->seqno;
83         hdr->status = req->rq_status; 
84         hdr->type = OST_TYPE_ERR;
85         req->rq_repbuf = (char *)hdr;
86
87         EXIT;
88         return ost_reply(req);
89 }
90
91 static int ost_destroy(struct ost_obd *ost, struct ost_request *req)
92 {
93         struct obd_conn conn; 
94         int rc;
95
96         ENTRY;
97         
98         conn.oc_id = req->rq_req->connid;
99         conn.oc_dev = ost->ost_tgt;
100
101         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
102                           &req->rq_reqlen, &req->rq_reqbuf); 
103         if (rc) { 
104                 printk("ost_destroy: cannot pack reply\n"); 
105                 return rc;
106         }
107
108         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_destroy
109                 (&conn, &req->rq_req->oa); 
110
111         EXIT;
112         return 0;
113 }
114
115 static int ost_getattr(struct ost_obd *ost, struct ost_request *req)
116 {
117         struct obd_conn conn; 
118         int rc;
119
120         ENTRY;
121         
122         conn.oc_id = req->rq_req->connid;
123         conn.oc_dev = ost->ost_tgt;
124
125         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
126                           &req->rq_reqlen, &req->rq_reqbuf); 
127         if (rc) { 
128                 printk("ost_getattr: cannot pack reply\n"); 
129                 return rc;
130         }
131         req->rq_rep->oa.o_id = req->rq_req->oa.o_id;
132
133         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_getattr
134                 (&conn, &req->rq_rep->oa); 
135
136         EXIT;
137         return 0;
138 }
139
140 static int ost_create(struct ost_obd *ost, struct ost_request *req)
141 {
142         struct obd_conn conn; 
143         int rc;
144
145         ENTRY;
146         
147         conn.oc_id = req->rq_req->connid;
148         conn.oc_dev = ost->ost_tgt;
149
150         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
151                           &req->rq_reqlen, &req->rq_reqbuf); 
152         if (rc) { 
153                 printk("ost_create: cannot pack reply\n"); 
154                 return rc;
155         }
156
157         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
158
159         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_create
160                 (&conn, &req->rq_rep->oa); 
161
162         EXIT;
163         return 0;
164 }
165
166
167 static int ost_setattr(struct ost_obd *ost, struct ost_request *req)
168 {
169         struct obd_conn conn; 
170         int rc;
171
172         ENTRY;
173         
174         conn.oc_id = req->rq_req->connid;
175         conn.oc_dev = ost->ost_tgt;
176
177         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
178                           &req->rq_reqlen, &req->rq_reqbuf); 
179         if (rc) { 
180                 printk("ost_setattr: cannot pack reply\n"); 
181                 return rc;
182         }
183
184         memcpy(&req->rq_rep->oa, &req->rq_req->oa, sizeof(req->rq_req->oa));
185
186         req->rq_rep->result =ost->ost_tgt->obd_type->typ_ops->o_setattr
187                 (&conn, &req->rq_rep->oa); 
188
189         EXIT;
190         return 0;
191 }
192
193
194 //int ost_handle(struct ost_conn *conn, int len, char *buf)
195 int ost_handle(struct ost_obd *ost, struct ost_request *req)
196 {
197         int rc;
198         struct ost_req_hdr *hdr;
199
200         ENTRY;
201
202         hdr = (struct ost_req_hdr *)req->rq_reqbuf;
203
204         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
205                 printk("lustre_ost: wrong packet type sent %d\n",
206                        NTOH__u32(hdr->type));
207                 rc = -EINVAL;
208                 goto out;
209         }
210
211         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
212                             &req->rq_reqhdr, &req->rq_req);
213         if (rc) { 
214                 printk("lustre_ost: Invalid request\n");
215                 EXIT; 
216                 goto out;
217         }
218
219         switch (req->rq_reqhdr->opc) { 
220
221         case OST_CREATE:
222                 CDEBUG(D_INODE, "create\n");
223                 rc = ost_create(ost, req);
224                 break;
225         case OST_DESTROY:
226                 CDEBUG(D_INODE, "destroy\n");
227                 rc = ost_destroy(ost, req);
228                 break;
229         case OST_GETATTR:
230                 CDEBUG(D_INODE, "getattr\n");
231                 rc = ost_getattr(ost, req);
232                 break;
233         case OST_SETATTR:
234                 CDEBUG(D_INODE, "setattr\n");
235                 rc = ost_setattr(ost, req);
236                 break;
237
238         default:
239                 return ost_error(req);
240         }
241
242 out:
243         if (rc) { 
244                 printk("ost: processing error %d\n", rc);
245                 ost_error(req);
246         } else { 
247                 CDEBUG(D_INODE, "sending reply\n"); 
248                 ost_reply(req); 
249         }
250
251         return 0;
252 }
253
254 int ost_main(void *arg)
255 {
256         struct ost_obd *ost = (struct ost_obd *) arg;
257
258         lock_kernel();
259         daemonize();
260         spin_lock_irq(&current->sigmask_lock);
261         sigfillset(&current->blocked);
262         recalc_sigpending(current);
263         spin_unlock_irq(&current->sigmask_lock);
264
265         sprintf(current->comm, "lustre_ost");
266
267         /* Record that the  thread is running */
268         ost->ost_thread = current;
269         wake_up(&ost->ost_done_waitq); 
270
271         /* XXX maintain a list of all managed devices: insert here */
272
273         /* And now, wait forever for commit wakeup events. */
274         while (1) {
275                 struct ost_request *request;
276                 int rc; 
277
278                 if (ost->ost_flags & OST_EXIT)
279                         break;
280
281
282                 wake_up(&ost->ost_done_waitq);
283                 interruptible_sleep_on(&ost->ost_waitq);
284
285                 CDEBUG(D_INODE, "lustre_ost wakes\n");
286                 CDEBUG(D_INODE, "pick up req here and continue\n"); 
287
288                 if (list_empty(&ost->ost_reqs)) { 
289                         CDEBUG(D_INODE, "woke because of timer\n"); 
290                 } else { 
291                         request = list_entry(ost->ost_reqs.next, 
292                                              struct ost_request, rq_list);
293                         list_del(&request->rq_list);
294                         rc = ost_handle(ost, request); 
295                 }
296         }
297
298         /* XXX maintain a list of all managed devices: cleanup here */
299
300         ost->ost_thread = NULL;
301         wake_up(&ost->ost_done_waitq);
302         printk("lustre_ost: exiting\n");
303         return 0;
304 }
305
306 static void ost_stop_srv_thread(struct ost_obd *ost)
307 {
308         ost->ost_flags |= OST_EXIT;
309
310         while (ost->ost_thread) {
311                 wake_up(&ost->ost_waitq);
312                 sleep_on(&ost->ost_done_waitq);
313         }
314 }
315
316 static void ost_start_srv_thread(struct ost_obd *ost)
317 {
318         init_waitqueue_head(&ost->ost_waitq);
319         init_waitqueue_head(&ost->ost_done_waitq);
320         kernel_thread(ost_main, (void *)ost, 
321                       CLONE_VM | CLONE_FS | CLONE_FILES);
322         while (!ost->ost_thread) 
323                 sleep_on(&ost->ost_done_waitq);
324 }
325
326 /* mount the file system (secretly) */
327 static int ost_setup(struct obd_device *obddev, obd_count len,
328                         void *buf)
329                         
330 {
331         struct obd_ioctl_data* data = buf;
332         struct ost_obd *ost = &obddev->u.ost;
333         struct obd_device *tgt;
334         int err; 
335         ENTRY;
336
337         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
338                 EXIT;
339                 return -ENODEV;
340         }
341
342         tgt = &obd_dev[data->ioc_dev];
343         
344         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
345              ! (tgt->obd_flags & OBD_SET_UP) ){
346                 printk("device not attached or not set up (%d)\n", 
347                        data->ioc_dev);
348                 EXIT;
349                 return -EINVAL;
350         } 
351
352         err = tgt->obd_type->typ_ops->o_connect(&ost->ost_conn);
353         if (err) { 
354                 printk("lustre ost: fail to connect to device %d\n", 
355                        data->ioc_dev); 
356                 return -EINVAL;
357         }
358
359         INIT_LIST_HEAD(&ost->ost_reqs);
360         ost->ost_thread = NULL;
361         ost->ost_flags = 0;
362         OST = ost;
363
364         spin_lock_init(&obddev->u.ost.fo_lock);
365
366         ost_start_srv_thread(ost);
367
368         MOD_INC_USE_COUNT;
369         EXIT; 
370         return 0;
371
372
373 static int ost_cleanup(struct obd_device * obddev)
374 {
375         struct ost_obd *ost = &obddev->u.ost;
376         struct obd_device *tgt;
377         int err;
378
379         ENTRY;
380
381         if ( !(obddev->obd_flags & OBD_SET_UP) ) {
382                 EXIT;
383                 return 0;
384         }
385
386         if ( !list_empty(&obddev->obd_gen_clients) ) {
387                 printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
388                 EXIT;
389                 return -EBUSY;
390         }
391
392         OST = NULL;
393         ost_stop_srv_thread(ost);
394
395         if (!list_empty(&ost->ost_reqs)) {
396                 // XXX reply with errors and clean up
397                 CDEBUG(D_INODE, "Request list not empty!\n");
398         }
399
400         tgt = ost->ost_tgt;
401         err = tgt->obd_type->typ_ops->o_disconnect(&ost->ost_conn);
402         if (err) { 
403                 printk("lustre ost: fail to disconnect device\n");
404                 return -EINVAL;
405         }
406         
407
408         MOD_DEC_USE_COUNT;
409         EXIT;
410         return 0;
411 }
412
413 /* use obd ops to offer management infrastructure */
414 static struct obd_ops ost_obd_ops = {
415         o_setup:       ost_setup,
416         o_cleanup:     ost_cleanup,
417 };
418
419 static int __init ost_init(void)
420 {
421         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
422         return 0;
423 }
424
425 static void __exit ost_exit(void)
426 {
427         obd_unregister_type(LUSTRE_OST_NAME);
428 }
429
430 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
431 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
432 MODULE_LICENSE("GPL");
433
434
435 // for testing (maybe this stays)
436 EXPORT_SYMBOL(ost_queue_req);
437
438 module_init(ost_init);
439 module_exit(ost_exit);