Whamcloud - gitweb
Merged branch 'peter' with the tip. Pre-merge tag is 't_20020302_networking'.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34
35 #include <linux/version.h>
36 #include <linux/module.h>
37 #include <linux/fs.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
43
44 #define DEBUG_SUBSYSTEM S_OST
45
46 #include <linux/obd_support.h>
47 #include <linux/obd.h>
48 #include <linux/obd_class.h>
49 #include <linux/lustre_lib.h>
50 #include <linux/lustre_idl.h>
51 #include <linux/lustre_mds.h>
52 #include <linux/obd_class.h>
53
54
55
56 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
57 {
58         struct obd_conn conn; 
59         int rc;
60
61         ENTRY;
62         
63         conn.oc_id = req->rq_req.ost->connid;
64         conn.oc_dev = ost->ost_tgt;
65
66         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
67                           &req->rq_replen, &req->rq_repbuf); 
68         if (rc) { 
69                 CERROR("cannot pack reply\n"); 
70                 return rc;
71         }
72
73         req->rq_rep.ost->result = obd_destroy(&conn, &req->rq_req.ost->oa); 
74
75         EXIT;
76         return 0;
77 }
78
79 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
80 {
81         struct obd_conn conn; 
82         int rc;
83
84         ENTRY;
85         
86         conn.oc_id = req->rq_req.ost->connid;
87         conn.oc_dev = ost->ost_tgt;
88
89         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
90                           &req->rq_replen, &req->rq_repbuf); 
91         if (rc) { 
92                 CERROR("cannot pack reply\n"); 
93                 return rc;
94         }
95         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
96         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
97
98         req->rq_rep.ost->result =  obd_getattr(&conn, &req->rq_rep.ost->oa); 
99
100         EXIT;
101         return 0;
102 }
103
104 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
105 {
106         struct obd_conn conn; 
107         int rc;
108
109         ENTRY;
110         
111         conn.oc_id = req->rq_req.ost->connid;
112         conn.oc_dev = ost->ost_tgt;
113
114         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
115                           &req->rq_replen, &req->rq_repbuf); 
116         if (rc) { 
117                 CERROR("cannot pack reply\n"); 
118                 return rc;
119         }
120
121         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
122                sizeof(req->rq_req.ost->oa));
123
124         req->rq_rep.ost->result =obd_create(&conn, &req->rq_rep.ost->oa); 
125
126         EXIT;
127         return 0;
128 }
129
130 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
131 {
132         struct obd_conn conn; 
133         int rc;
134
135         ENTRY;
136         
137         conn.oc_id = req->rq_req.ost->connid;
138         conn.oc_dev = ost->ost_tgt;
139
140         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
141                           &req->rq_replen, &req->rq_repbuf); 
142         if (rc) { 
143                 CERROR("cannot pack reply\n"); 
144                 return rc;
145         }
146
147         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
148                sizeof(req->rq_req.ost->oa));
149
150         req->rq_rep.ost->result = obd_punch(&conn, &req->rq_rep.ost->oa, 
151                                             req->rq_rep.ost->oa.o_size,
152                                             req->rq_rep.ost->oa.o_blocks); 
153
154         EXIT;
155         return 0;
156 }
157
158
159 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
160 {
161         struct obd_conn conn; 
162         int rc;
163
164         ENTRY;
165         
166         conn.oc_id = req->rq_req.ost->connid;
167         conn.oc_dev = ost->ost_tgt;
168
169         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
170                           &req->rq_replen, &req->rq_repbuf); 
171         if (rc) { 
172                 CERROR("cannot pack reply\n"); 
173                 return rc;
174         }
175
176         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
177                sizeof(req->rq_req.ost->oa));
178
179         req->rq_rep.ost->result = obd_setattr(&conn, &req->rq_rep.ost->oa); 
180
181         EXIT;
182         return 0;
183 }
184
185 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
186 {
187         struct obd_conn conn; 
188         int rc;
189
190         ENTRY;
191         
192         conn.oc_dev = ost->ost_tgt;
193
194         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
195                           &req->rq_replen, &req->rq_repbuf); 
196         if (rc) { 
197                 CERROR("cannot pack reply\n"); 
198                 return rc;
199         }
200
201         req->rq_rep.ost->result = obd_connect(&conn);
202
203         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id);
204         req->rq_rep.ost->connid = conn.oc_id;
205         EXIT;
206         return 0;
207 }
208
209 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
210 {
211         struct obd_conn conn; 
212         int rc;
213
214         ENTRY;
215         
216         conn.oc_dev = ost->ost_tgt;
217         conn.oc_id = req->rq_req.ost->connid;
218
219         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
220                           &req->rq_replen, &req->rq_repbuf); 
221         if (rc) { 
222                 CERROR("cannot pack reply\n"); 
223                 return rc;
224         }
225         CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
226         req->rq_rep.ost->result = obd_disconnect(&conn);
227
228         EXIT;
229         return 0;
230 }
231
232 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
233 {
234         struct obd_conn conn; 
235         int rc;
236         int vallen;
237         void *val;
238         char *ptr; 
239
240         ENTRY;
241         
242         conn.oc_id = req->rq_req.ost->connid;
243         conn.oc_dev = ost->ost_tgt;
244
245         ptr = ost_req_buf1(req->rq_req.ost);
246         req->rq_rep.ost->result = obd_get_info(&conn, 
247                                                req->rq_req.ost->buflen1, ptr, 
248                                                &vallen, &val); 
249
250         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
251                           &req->rq_rep, &req->rq_replen, &req->rq_repbuf); 
252         if (rc) { 
253                 CERROR("cannot pack reply\n"); 
254                 return rc;
255         }
256
257         EXIT;
258         return 0;
259 }
260
261 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
262 {
263         struct ptlrpc_bulk_desc **bulk_vec = NULL;
264         struct ptlrpc_bulk_desc *bulk = NULL;
265         struct obd_conn conn; 
266         int rc;
267         int i, j;
268         int objcount, niocount;
269         char *tmp1, *tmp2, *end2;
270         char *res;
271         int cmd;
272         struct niobuf *nb, *src, *dst;
273         struct obd_ioobj *ioo;
274         struct ost_req *r = req->rq_req.ost;
275
276         ENTRY;
277         
278         tmp1 = ost_req_buf1(r);
279         tmp2 = ost_req_buf2(r);
280         end2 = tmp2 + req->rq_req.ost->buflen2;
281         objcount = r->buflen1 / sizeof(*ioo); 
282         niocount = r->buflen2 / sizeof(*nb); 
283         cmd = r->cmd;
284
285         conn.oc_id = req->rq_req.ost->connid;
286         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
287
288         for (i = 0; i < objcount; i++) {
289                 ost_unpack_ioo((void *)&tmp1, &ioo);
290                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
291                         rc = -EFAULT;
292                         break; 
293                 }
294                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
295                         ost_unpack_niobuf((void *)&tmp2, &nb); 
296                 }
297         }
298
299         rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
300                           &req->rq_rephdr, &req->rq_rep,
301                           &req->rq_replen, &req->rq_repbuf);
302         if (rc) { 
303                 CERROR("cannot pack reply\n"); 
304                 return rc;
305         }
306         OBD_ALLOC(res, sizeof(struct niobuf) * niocount);
307         if (res == NULL) {
308                 EXIT;
309                 return -ENOMEM;
310         }
311
312         /* The unpackers move tmp1 and tmp2, so reset them before using */
313         tmp1 = ost_req_buf1(r);
314         tmp2 = ost_req_buf2(r);
315         req->rq_rep.ost->result = obd_preprw
316                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
317                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
318
319         if (req->rq_rep.ost->result) {
320                 EXIT;
321                 goto out;
322         }
323
324         if (cmd == OBD_BRW_WRITE) {
325                 /* Setup buffers for the incoming pages, then send the niobufs
326                  * describing those buffers to the OSC. */
327                 OBD_ALLOC(bulk_vec,
328                           niocount * sizeof(struct ptlrpc_bulk_desc *));
329                 if (bulk_vec == NULL) {
330                         CERROR("cannot alloc bulk desc vector\n");
331                         return -ENOMEM;
332                 }
333                 memset(bulk_vec, 0,
334                        niocount * sizeof(struct ptlrpc_bulk_desc *));
335
336                 for (i = 0; i < niocount; i++) {
337                         struct ptlrpc_service *srv =
338                                 req->rq_obd->u.ost.ost_service;
339
340                         bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
341                         if (bulk_vec[i] == NULL) {
342                                 CERROR("cannot alloc bulk desc\n");
343                                 rc = -ENOMEM;
344                                 goto out;
345                         }
346
347                         spin_lock(&srv->srv_lock);
348                         bulk_vec[i]->b_xid = srv->srv_xid++;
349                         spin_unlock(&srv->srv_lock);
350
351                         dst = &((struct niobuf *)res)[i];
352                         /* FIXME: we overload ->page with the xid of this buffer
353                          * for the benefit of the remote client */
354                         dst->page =
355                                 (void *)(unsigned long)HTON__u64(bulk_vec[i]->b_xid);
356
357                         bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
358                         bulk_vec[i]->b_buflen = PAGE_SIZE;
359                         bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
360                         rc = ptlrpc_wait_bulk(bulk_vec[i]);
361                         if (rc)
362                                 goto out;
363
364 #if 0
365                         /* Local delivery */
366                         src = &((struct niobuf *)tmp2)[i];
367                         memcpy((void *)(unsigned long)dst->addr, 
368                                (void *)(unsigned long)src->addr, src->len);
369 #endif
370                 }
371                 barrier();
372         } else {
373                 for (i = 0; i < niocount; i++) {
374                         struct ptlrpc_service *srv =
375                                 req->rq_obd->u.ost.ost_service;
376
377                         bulk = ptlrpc_prep_bulk(&req->rq_peer);
378                         if (bulk == NULL) {
379                                 CERROR("cannot alloc bulk desc\n");
380                                 rc = -ENOMEM;
381                                 goto out;
382                         }
383
384                         spin_lock(&srv->srv_lock);
385                         bulk->b_xid = srv->srv_xid++;
386                         spin_unlock(&srv->srv_lock);
387
388                         src = &((struct niobuf *)tmp2)[i];
389
390                         bulk->b_buf = (void *)(unsigned long)src->addr;
391                         bulk->b_buflen = PAGE_SIZE;
392                         rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
393                         if (rc) {
394                                 EXIT;
395                                 goto out;
396                         }
397                         wait_event_interruptible(bulk->b_waitq,
398                                                  ptlrpc_check_bulk_sent(bulk));
399
400                         if (bulk->b_flags == PTL_RPC_INTR) {
401                                 EXIT;
402                                 goto out;
403                         }
404
405                         OBD_FREE(bulk, sizeof(*bulk));
406                 }
407
408 #if 0
409                 /* Local delivery */
410                 dst = &((struct niobuf *)tmp2)[i];
411                 memcpy((void *)(unsigned long)dst->addr, 
412                        (void *)(unsigned long)src->addr, PAGE_SIZE);
413 #endif
414                 barrier();
415         }
416
417  out:
418         if (bulk != NULL)
419                 OBD_FREE(bulk, sizeof(*bulk));
420         if (bulk_vec != NULL) {
421                 for (i = 0; i < niocount; i++) {
422                         if (bulk_vec[i] != NULL)
423                                 OBD_FREE(bulk_vec[i], sizeof(*bulk));
424                 }
425                 OBD_FREE(bulk_vec,
426                          niocount * sizeof(struct ptlrpc_bulk_desc *));
427         }
428
429         EXIT;
430         return 0;
431 }
432
433 int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req)
434 {
435         struct obd_conn conn; 
436         int rc, i, j, cmd;
437         int objcount, niocount;
438         char *tmp1, *tmp2, *end2;
439         struct niobuf *nb;
440         struct obd_ioobj *ioo;
441         struct ost_req *r = req->rq_req.ost;
442
443         ENTRY;
444         
445         tmp1 = ost_req_buf1(r);
446         tmp2 = ost_req_buf2(r);
447         end2 = tmp2 + req->rq_req.ost->buflen2;
448         objcount = r->buflen1 / sizeof(*ioo); 
449         niocount = r->buflen2 / sizeof(*nb); 
450         cmd = r->cmd;
451
452         conn.oc_id = req->rq_req.ost->connid;
453         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
454
455         for (i = 0; i < objcount; i++) {
456                 ost_unpack_ioo((void *)&tmp1, &ioo);
457                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
458                         rc = -EFAULT;
459                         break; 
460                 }
461                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
462                         ost_unpack_niobuf((void *)&tmp2, &nb); 
463                 }
464         }
465
466         rc = ost_pack_rep(NULL, 0, NULL, 0,
467                           &req->rq_rephdr, &req->rq_rep,
468                           &req->rq_replen, &req->rq_repbuf);
469         if (rc) { 
470                 CERROR("cannot pack reply\n"); 
471                 return rc;
472         }
473
474         /* The unpackers move tmp1 and tmp2, so reset them before using */
475         tmp1 = ost_req_buf1(r);
476         tmp2 = ost_req_buf2(r);
477         req->rq_rep.ost->result = obd_commitrw
478                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
479                  niocount, (struct niobuf *)tmp2);
480
481         return 0;
482 }
483
484 static int ost_handle(struct obd_device *obddev, 
485                struct ptlrpc_service *svc, 
486                struct ptlrpc_request *req)
487 {
488         int rc;
489         struct ost_obd *ost = &obddev->u.ost;
490         struct ptlreq_hdr *hdr;
491
492         ENTRY;
493
494         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
495         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
496                 CERROR("lustre_ost: wrong packet type sent %d\n",
497                        NTOH__u32(hdr->type));
498                 rc = -EINVAL;
499                 goto out;
500         }
501
502         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
503                             &req->rq_reqhdr, &req->rq_req);
504         if (rc) { 
505                 CERROR("lustre_ost: Invalid request\n");
506                 EXIT; 
507                 goto out;
508         }
509
510         switch (req->rq_reqhdr->opc) { 
511
512         case OST_CONNECT:
513                 CDEBUG(D_INODE, "connect\n");
514                 rc = ost_connect(ost, req);
515                 break;
516         case OST_DISCONNECT:
517                 CDEBUG(D_INODE, "disconnect\n");
518                 rc = ost_disconnect(ost, req);
519                 break;
520         case OST_GET_INFO:
521                 CDEBUG(D_INODE, "get_info\n");
522                 rc = ost_get_info(ost, req);
523                 break;
524         case OST_CREATE:
525                 CDEBUG(D_INODE, "create\n");
526                 rc = ost_create(ost, req);
527                 break;
528         case OST_DESTROY:
529                 CDEBUG(D_INODE, "destroy\n");
530                 rc = ost_destroy(ost, req);
531                 break;
532         case OST_GETATTR:
533                 CDEBUG(D_INODE, "getattr\n");
534                 rc = ost_getattr(ost, req);
535                 break;
536         case OST_SETATTR:
537                 CDEBUG(D_INODE, "setattr\n");
538                 rc = ost_setattr(ost, req);
539                 break;
540         case OST_BRW:
541                 CDEBUG(D_INODE, "brw\n");
542                 rc = ost_brw(ost, req);
543                 break;
544         case OST_BRW_COMPLETE:
545                 CDEBUG(D_INODE, "brw_complete\n");
546                 rc = ost_brw_complete(ost, req);
547                 break;
548         case OST_PUNCH:
549                 CDEBUG(D_INODE, "punch\n");
550                 rc = ost_punch(ost, req);
551                 break;
552         default:
553                 req->rq_status = -ENOTSUPP;
554                 return ptlrpc_error(obddev, svc, req);
555         }
556
557 out:
558         req->rq_status = rc;
559         if (rc) { 
560                 CERROR("ost: processing error %d\n", rc);
561                 ptlrpc_error(obddev, svc, req);
562         } else { 
563                 CDEBUG(D_INODE, "sending reply\n"); 
564                 ptlrpc_reply(obddev, svc, req); 
565         }
566
567         return 0;
568 }
569
570
571 /* mount the file system (secretly) */
572 static int ost_setup(struct obd_device *obddev, obd_count len,
573                         void *buf)
574                         
575 {
576         struct obd_ioctl_data* data = buf;
577         struct ost_obd *ost = &obddev->u.ost;
578         struct obd_device *tgt;
579         int err; 
580         ENTRY;
581
582         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
583                 EXIT;
584                 return -ENODEV;
585         }
586
587         tgt = &obd_dev[data->ioc_dev];
588         ost->ost_tgt = tgt;
589         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
590              ! (tgt->obd_flags & OBD_SET_UP) ){
591                 CERROR("device not attached or not set up (%d)\n", 
592                        data->ioc_dev);
593                 EXIT;
594                 return -EINVAL;
595         } 
596
597         ost->ost_conn.oc_dev = tgt;
598         err = obd_connect(&ost->ost_conn);
599         if (err) { 
600                 CERROR("fail to connect to device %d\n", data->ioc_dev); 
601                 return -EINVAL;
602         }
603
604         ost->ost_service = ptlrpc_init_svc( 64 * 1024, 
605                                             OST_REQUEST_PORTAL,
606                                             OSC_REPLY_PORTAL,
607                                             "self", 
608                                             ost_unpack_req,
609                                             ost_pack_rep,
610                                             ost_handle);
611         if (!ost->ost_service) { 
612                 obd_disconnect(&ost->ost_conn); 
613                 return -EINVAL;
614         }
615                                             
616         rpc_register_service(ost->ost_service, "self");
617
618         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); 
619         if (err) { 
620                 obd_disconnect(&ost->ost_conn); 
621                 return -EINVAL;
622         }
623                 
624         MOD_INC_USE_COUNT;
625         EXIT; 
626         return 0;
627
628
629 static int ost_cleanup(struct obd_device * obddev)
630 {
631         struct ost_obd *ost = &obddev->u.ost;
632         int err;
633
634         ENTRY;
635
636         if ( !list_empty(&obddev->obd_gen_clients) ) {
637                 CERROR("still has clients!\n");
638                 EXIT;
639                 return -EBUSY;
640         }
641
642         ptlrpc_stop_thread(ost->ost_service);
643         rpc_unregister_service(ost->ost_service);
644
645         if (!list_empty(&ost->ost_service->srv_reqs)) {
646                 // XXX reply with errors and clean up
647                 CERROR("Request list not empty!\n");
648         }
649         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
650
651         err = obd_disconnect(&ost->ost_conn);
652         if (err) { 
653                 CERROR("lustre ost: fail to disconnect device\n");
654                 return -EINVAL;
655         }
656
657         MOD_DEC_USE_COUNT;
658         EXIT;
659         return 0;
660 }
661
662 /* use obd ops to offer management infrastructure */
663 static struct obd_ops ost_obd_ops = {
664         o_setup:       ost_setup,
665         o_cleanup:     ost_cleanup,
666 };
667
668 static int __init ost_init(void)
669 {
670         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
671         return 0;
672 }
673
674 static void __exit ost_exit(void)
675 {
676         obd_unregister_type(LUSTRE_OST_NAME);
677 }
678
679 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
680 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
681 MODULE_LICENSE("GPL");
682
683 module_init(ost_init);
684 module_exit(ost_exit);