Whamcloud - gitweb
- Added an 'xid' field to struct niobuf
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34
35 #include <linux/version.h>
36 #include <linux/module.h>
37 #include <linux/fs.h>
38 #include <linux/stat.h>
39 #include <linux/locks.h>
40 #include <linux/ext2_fs.h>
41 #include <linux/quotaops.h>
42 #include <asm/unistd.h>
43
44 #define DEBUG_SUBSYSTEM S_OST
45
46 #include <linux/obd_support.h>
47 #include <linux/obd.h>
48 #include <linux/obd_class.h>
49 #include <linux/lustre_lib.h>
50 #include <linux/lustre_idl.h>
51 #include <linux/lustre_mds.h>
52 #include <linux/obd_class.h>
53
54
55
56 static int ost_destroy(struct ost_obd *ost, struct ptlrpc_request *req)
57 {
58         struct obd_conn conn; 
59         int rc;
60
61         ENTRY;
62         
63         conn.oc_id = req->rq_req.ost->connid;
64         conn.oc_dev = ost->ost_tgt;
65
66         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
67                           &req->rq_replen, &req->rq_repbuf); 
68         if (rc) { 
69                 CERROR("cannot pack reply\n"); 
70                 return rc;
71         }
72
73         req->rq_rep.ost->result = obd_destroy(&conn, &req->rq_req.ost->oa); 
74
75         EXIT;
76         return 0;
77 }
78
79 static int ost_getattr(struct ost_obd *ost, struct ptlrpc_request *req)
80 {
81         struct obd_conn conn; 
82         int rc;
83
84         ENTRY;
85         
86         conn.oc_id = req->rq_req.ost->connid;
87         conn.oc_dev = ost->ost_tgt;
88
89         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
90                           &req->rq_replen, &req->rq_repbuf); 
91         if (rc) { 
92                 CERROR("cannot pack reply\n"); 
93                 return rc;
94         }
95         req->rq_rep.ost->oa.o_id = req->rq_req.ost->oa.o_id;
96         req->rq_rep.ost->oa.o_valid = req->rq_req.ost->oa.o_valid;
97
98         req->rq_rep.ost->result =  obd_getattr(&conn, &req->rq_rep.ost->oa); 
99
100         EXIT;
101         return 0;
102 }
103
104 static int ost_create(struct ost_obd *ost, struct ptlrpc_request *req)
105 {
106         struct obd_conn conn; 
107         int rc;
108
109         ENTRY;
110         
111         conn.oc_id = req->rq_req.ost->connid;
112         conn.oc_dev = ost->ost_tgt;
113
114         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
115                           &req->rq_replen, &req->rq_repbuf); 
116         if (rc) { 
117                 CERROR("cannot pack reply\n"); 
118                 return rc;
119         }
120
121         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
122                sizeof(req->rq_req.ost->oa));
123
124         req->rq_rep.ost->result =obd_create(&conn, &req->rq_rep.ost->oa); 
125
126         EXIT;
127         return 0;
128 }
129
130 static int ost_punch(struct ost_obd *ost, struct ptlrpc_request *req)
131 {
132         struct obd_conn conn; 
133         int rc;
134
135         ENTRY;
136         
137         conn.oc_id = req->rq_req.ost->connid;
138         conn.oc_dev = ost->ost_tgt;
139
140         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
141                           &req->rq_replen, &req->rq_repbuf); 
142         if (rc) { 
143                 CERROR("cannot pack reply\n"); 
144                 return rc;
145         }
146
147         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
148                sizeof(req->rq_req.ost->oa));
149
150         req->rq_rep.ost->result = obd_punch(&conn, &req->rq_rep.ost->oa, 
151                                             req->rq_rep.ost->oa.o_size,
152                                             req->rq_rep.ost->oa.o_blocks); 
153
154         EXIT;
155         return 0;
156 }
157
158
159 static int ost_setattr(struct ost_obd *ost, struct ptlrpc_request *req)
160 {
161         struct obd_conn conn; 
162         int rc;
163
164         ENTRY;
165         
166         conn.oc_id = req->rq_req.ost->connid;
167         conn.oc_dev = ost->ost_tgt;
168
169         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
170                           &req->rq_replen, &req->rq_repbuf); 
171         if (rc) { 
172                 CERROR("cannot pack reply\n"); 
173                 return rc;
174         }
175
176         memcpy(&req->rq_rep.ost->oa, &req->rq_req.ost->oa,
177                sizeof(req->rq_req.ost->oa));
178
179         req->rq_rep.ost->result = obd_setattr(&conn, &req->rq_rep.ost->oa); 
180
181         EXIT;
182         return 0;
183 }
184
185 static int ost_connect(struct ost_obd *ost, struct ptlrpc_request *req)
186 {
187         struct obd_conn conn; 
188         int rc;
189
190         ENTRY;
191         
192         conn.oc_dev = ost->ost_tgt;
193
194         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
195                           &req->rq_replen, &req->rq_repbuf); 
196         if (rc) { 
197                 CERROR("cannot pack reply\n"); 
198                 return rc;
199         }
200
201         req->rq_rep.ost->result = obd_connect(&conn);
202
203         CDEBUG(D_IOCTL, "rep buffer %p, id %d\n", req->rq_repbuf, conn.oc_id);
204         req->rq_rep.ost->connid = conn.oc_id;
205         EXIT;
206         return 0;
207 }
208
209 static int ost_disconnect(struct ost_obd *ost, struct ptlrpc_request *req)
210 {
211         struct obd_conn conn; 
212         int rc;
213
214         ENTRY;
215         
216         conn.oc_dev = ost->ost_tgt;
217         conn.oc_id = req->rq_req.ost->connid;
218
219         rc = ost_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
220                           &req->rq_replen, &req->rq_repbuf); 
221         if (rc) { 
222                 CERROR("cannot pack reply\n"); 
223                 return rc;
224         }
225         CDEBUG(D_IOCTL, "Disconnecting %d\n", conn.oc_id);
226         req->rq_rep.ost->result = obd_disconnect(&conn);
227
228         EXIT;
229         return 0;
230 }
231
232 static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
233 {
234         struct obd_conn conn; 
235         int rc;
236         int vallen;
237         void *val;
238         char *ptr; 
239
240         ENTRY;
241         
242         conn.oc_id = req->rq_req.ost->connid;
243         conn.oc_dev = ost->ost_tgt;
244
245         ptr = ost_req_buf1(req->rq_req.ost);
246         req->rq_rep.ost->result = obd_get_info(&conn, 
247                                                req->rq_req.ost->buflen1, ptr, 
248                                                &vallen, &val); 
249
250         rc = ost_pack_rep(val, vallen, NULL, 0, &req->rq_rephdr,
251                           &req->rq_rep, &req->rq_replen, &req->rq_repbuf); 
252         if (rc) { 
253                 CERROR("cannot pack reply\n"); 
254                 return rc;
255         }
256
257         EXIT;
258         return 0;
259 }
260
261 int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
262 {
263         struct ptlrpc_bulk_desc **bulk_vec = NULL;
264         struct ptlrpc_bulk_desc *bulk = NULL;
265         struct obd_conn conn; 
266         int rc;
267         int i, j;
268         int objcount, niocount;
269         char *tmp1, *tmp2, *end2;
270         char *res;
271         int cmd;
272         struct niobuf *nb, *src, *dst;
273         struct obd_ioobj *ioo;
274         struct ost_req *r = req->rq_req.ost;
275
276         ENTRY;
277         
278         tmp1 = ost_req_buf1(r);
279         tmp2 = ost_req_buf2(r);
280         end2 = tmp2 + req->rq_req.ost->buflen2;
281         objcount = r->buflen1 / sizeof(*ioo); 
282         niocount = r->buflen2 / sizeof(*nb); 
283         cmd = r->cmd;
284
285         conn.oc_id = req->rq_req.ost->connid;
286         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
287
288         for (i = 0; i < objcount; i++) {
289                 ost_unpack_ioo((void *)&tmp1, &ioo);
290                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
291                         rc = -EFAULT;
292                         break; 
293                 }
294                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
295                         ost_unpack_niobuf((void *)&tmp2, &nb); 
296                 }
297         }
298
299         rc = ost_pack_rep(NULL, 0, NULL, niocount * sizeof(*nb),
300                           &req->rq_rephdr, &req->rq_rep,
301                           &req->rq_replen, &req->rq_repbuf);
302         if (rc) { 
303                 CERROR("cannot pack reply\n"); 
304                 return rc;
305         }
306         OBD_ALLOC(res, sizeof(struct niobuf) * niocount);
307         if (res == NULL) {
308                 EXIT;
309                 return -ENOMEM;
310         }
311
312         /* The unpackers move tmp1 and tmp2, so reset them before using */
313         tmp1 = ost_req_buf1(r);
314         tmp2 = ost_req_buf2(r);
315         req->rq_rep.ost->result = obd_preprw
316                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
317                  niocount, (struct niobuf *)tmp2, (struct niobuf *)res); 
318
319         if (req->rq_rep.ost->result) {
320                 EXIT;
321                 goto out;
322         }
323
324         if (cmd == OBD_BRW_WRITE) {
325                 /* Setup buffers for the incoming pages, then send the niobufs
326                  * describing those buffers to the OSC. */
327                 OBD_ALLOC(bulk_vec,
328                           niocount * sizeof(struct ptlrpc_bulk_desc *));
329                 if (bulk_vec == NULL) {
330                         CERROR("cannot alloc bulk desc vector\n");
331                         return -ENOMEM;
332                 }
333                 memset(bulk_vec, 0,
334                        niocount * sizeof(struct ptlrpc_bulk_desc *));
335
336                 for (i = 0; i < niocount; i++) {
337                         struct ptlrpc_service *srv =
338                                 req->rq_obd->u.ost.ost_service;
339
340                         bulk_vec[i] = ptlrpc_prep_bulk(&req->rq_peer);
341                         if (bulk_vec[i] == NULL) {
342                                 CERROR("cannot alloc bulk desc\n");
343                                 rc = -ENOMEM;
344                                 goto out;
345                         }
346
347                         spin_lock(&srv->srv_lock);
348                         bulk_vec[i]->b_xid = srv->srv_xid++;
349                         spin_unlock(&srv->srv_lock);
350
351                         dst = &((struct niobuf *)res)[i];
352                         /* FIXME: we overload ->page with the xid of this buffer
353                          * for the benefit of the remote client */
354                         dst->page =
355                                 (void *)(unsigned long)HTON__u64(bulk_vec[i]->b_xid);
356
357                         bulk_vec[i]->b_buf = (void *)(unsigned long)dst->addr;
358                         bulk_vec[i]->b_buflen = PAGE_SIZE;
359                         bulk_vec[i]->b_portal = OSC_BULK_PORTAL;
360                         rc = ptlrpc_register_bulk(bulk_vec[i]);
361                         if (rc)
362                                 goto out;
363
364 #if 0
365                         /* Local delivery */
366                         src = &((struct niobuf *)tmp2)[i];
367                         memcpy((void *)(unsigned long)dst->addr, 
368                                (void *)(unsigned long)src->addr, src->len);
369 #endif
370                 }
371                 barrier();
372         } else {
373                 for (i = 0; i < niocount; i++) {
374                         bulk = ptlrpc_prep_bulk(&req->rq_peer);
375                         if (bulk == NULL) {
376                                 CERROR("cannot alloc bulk desc\n");
377                                 rc = -ENOMEM;
378                                 goto out;
379                         }
380
381                         src = &((struct niobuf *)tmp2)[i];
382
383                         bulk->b_xid = src->xid;
384                         bulk->b_buf = (void *)(unsigned long)src->addr;
385                         bulk->b_buflen = PAGE_SIZE;
386                         rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
387                         if (rc) {
388                                 EXIT;
389                                 goto out;
390                         }
391                         wait_event_interruptible(bulk->b_waitq,
392                                                  ptlrpc_check_bulk_sent(bulk));
393
394                         if (bulk->b_flags == PTL_RPC_INTR) {
395                                 EXIT;
396                                 goto out;
397                         }
398
399                         OBD_FREE(bulk, sizeof(*bulk));
400                         bulk = NULL;
401                 }
402
403 #if 0
404                 /* Local delivery */
405                 dst = &((struct niobuf *)tmp2)[i];
406                 memcpy((void *)(unsigned long)dst->addr, 
407                        (void *)(unsigned long)src->addr, PAGE_SIZE);
408 #endif
409                 barrier();
410         }
411
412  out:
413         if (bulk != NULL)
414                 OBD_FREE(bulk, sizeof(*bulk));
415         if (bulk_vec != NULL) {
416                 for (i = 0; i < niocount; i++) {
417                         if (bulk_vec[i] != NULL)
418                                 OBD_FREE(bulk_vec[i], sizeof(*bulk));
419                 }
420                 OBD_FREE(bulk_vec,
421                          niocount * sizeof(struct ptlrpc_bulk_desc *));
422         }
423
424         EXIT;
425         return 0;
426 }
427
428 int ost_brw_complete(struct ost_obd *obddev, struct ptlrpc_request *req)
429 {
430         struct obd_conn conn; 
431         int rc, i, j, cmd;
432         int objcount, niocount;
433         char *tmp1, *tmp2, *end2;
434         struct niobuf *nb;
435         struct obd_ioobj *ioo;
436         struct ost_req *r = req->rq_req.ost;
437
438         ENTRY;
439         
440         tmp1 = ost_req_buf1(r);
441         tmp2 = ost_req_buf2(r);
442         end2 = tmp2 + req->rq_req.ost->buflen2;
443         objcount = r->buflen1 / sizeof(*ioo); 
444         niocount = r->buflen2 / sizeof(*nb); 
445         cmd = r->cmd;
446
447         conn.oc_id = req->rq_req.ost->connid;
448         conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
449
450         for (i = 0; i < objcount; i++) {
451                 ost_unpack_ioo((void *)&tmp1, &ioo);
452                 if (tmp2 + ioo->ioo_bufcnt > end2) { 
453                         rc = -EFAULT;
454                         break; 
455                 }
456                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
457                         ost_unpack_niobuf((void *)&tmp2, &nb); 
458                 }
459         }
460
461         rc = ost_pack_rep(NULL, 0, NULL, 0,
462                           &req->rq_rephdr, &req->rq_rep,
463                           &req->rq_replen, &req->rq_repbuf);
464         if (rc) { 
465                 CERROR("cannot pack reply\n"); 
466                 return rc;
467         }
468
469         /* The unpackers move tmp1 and tmp2, so reset them before using */
470         tmp1 = ost_req_buf1(r);
471         tmp2 = ost_req_buf2(r);
472         req->rq_rep.ost->result = obd_commitrw
473                 (cmd, &conn, objcount, (struct obd_ioobj *)tmp1, 
474                  niocount, (struct niobuf *)tmp2);
475
476         return 0;
477 }
478
479 static int ost_handle(struct obd_device *obddev, 
480                struct ptlrpc_service *svc, 
481                struct ptlrpc_request *req)
482 {
483         int rc;
484         struct ost_obd *ost = &obddev->u.ost;
485         struct ptlreq_hdr *hdr;
486
487         ENTRY;
488
489         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
490         if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
491                 CERROR("lustre_ost: wrong packet type sent %d\n",
492                        NTOH__u32(hdr->type));
493                 rc = -EINVAL;
494                 goto out;
495         }
496
497         rc = ost_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
498                             &req->rq_reqhdr, &req->rq_req);
499         if (rc) { 
500                 CERROR("lustre_ost: Invalid request\n");
501                 EXIT; 
502                 goto out;
503         }
504
505         switch (req->rq_reqhdr->opc) { 
506
507         case OST_CONNECT:
508                 CDEBUG(D_INODE, "connect\n");
509                 rc = ost_connect(ost, req);
510                 break;
511         case OST_DISCONNECT:
512                 CDEBUG(D_INODE, "disconnect\n");
513                 rc = ost_disconnect(ost, req);
514                 break;
515         case OST_GET_INFO:
516                 CDEBUG(D_INODE, "get_info\n");
517                 rc = ost_get_info(ost, req);
518                 break;
519         case OST_CREATE:
520                 CDEBUG(D_INODE, "create\n");
521                 rc = ost_create(ost, req);
522                 break;
523         case OST_DESTROY:
524                 CDEBUG(D_INODE, "destroy\n");
525                 rc = ost_destroy(ost, req);
526                 break;
527         case OST_GETATTR:
528                 CDEBUG(D_INODE, "getattr\n");
529                 rc = ost_getattr(ost, req);
530                 break;
531         case OST_SETATTR:
532                 CDEBUG(D_INODE, "setattr\n");
533                 rc = ost_setattr(ost, req);
534                 break;
535         case OST_BRW:
536                 CDEBUG(D_INODE, "brw\n");
537                 rc = ost_brw(ost, req);
538                 break;
539         case OST_BRW_COMPLETE:
540                 CDEBUG(D_INODE, "brw_complete\n");
541                 rc = ost_brw_complete(ost, req);
542                 break;
543         case OST_PUNCH:
544                 CDEBUG(D_INODE, "punch\n");
545                 rc = ost_punch(ost, req);
546                 break;
547         default:
548                 req->rq_status = -ENOTSUPP;
549                 return ptlrpc_error(obddev, svc, req);
550         }
551
552 out:
553         req->rq_status = rc;
554         if (rc) { 
555                 CERROR("ost: processing error %d\n", rc);
556                 ptlrpc_error(obddev, svc, req);
557         } else { 
558                 CDEBUG(D_INODE, "sending reply\n"); 
559                 ptlrpc_reply(obddev, svc, req); 
560         }
561
562         return 0;
563 }
564
565
566 /* mount the file system (secretly) */
567 static int ost_setup(struct obd_device *obddev, obd_count len,
568                         void *buf)
569                         
570 {
571         struct obd_ioctl_data* data = buf;
572         struct ost_obd *ost = &obddev->u.ost;
573         struct obd_device *tgt;
574         int err; 
575         ENTRY;
576
577         if (data->ioc_dev  < 0 || data->ioc_dev > MAX_OBD_DEVICES) { 
578                 EXIT;
579                 return -ENODEV;
580         }
581
582         tgt = &obd_dev[data->ioc_dev];
583         ost->ost_tgt = tgt;
584         if ( ! (tgt->obd_flags & OBD_ATTACHED) || 
585              ! (tgt->obd_flags & OBD_SET_UP) ){
586                 CERROR("device not attached or not set up (%d)\n", 
587                        data->ioc_dev);
588                 EXIT;
589                 return -EINVAL;
590         } 
591
592         ost->ost_conn.oc_dev = tgt;
593         err = obd_connect(&ost->ost_conn);
594         if (err) { 
595                 CERROR("fail to connect to device %d\n", data->ioc_dev); 
596                 return -EINVAL;
597         }
598
599         ost->ost_service = ptlrpc_init_svc( 64 * 1024, 
600                                             OST_REQUEST_PORTAL,
601                                             OSC_REPLY_PORTAL,
602                                             "self", 
603                                             ost_unpack_req,
604                                             ost_pack_rep,
605                                             ost_handle);
606         if (!ost->ost_service) { 
607                 obd_disconnect(&ost->ost_conn); 
608                 return -EINVAL;
609         }
610                                             
611         rpc_register_service(ost->ost_service, "self");
612
613         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); 
614         if (err) { 
615                 obd_disconnect(&ost->ost_conn); 
616                 return -EINVAL;
617         }
618                 
619         MOD_INC_USE_COUNT;
620         EXIT; 
621         return 0;
622
623
624 static int ost_cleanup(struct obd_device * obddev)
625 {
626         struct ost_obd *ost = &obddev->u.ost;
627         int err;
628
629         ENTRY;
630
631         if ( !list_empty(&obddev->obd_gen_clients) ) {
632                 CERROR("still has clients!\n");
633                 EXIT;
634                 return -EBUSY;
635         }
636
637         ptlrpc_stop_thread(ost->ost_service);
638         rpc_unregister_service(ost->ost_service);
639
640         if (!list_empty(&ost->ost_service->srv_reqs)) {
641                 // XXX reply with errors and clean up
642                 CERROR("Request list not empty!\n");
643         }
644         OBD_FREE(ost->ost_service, sizeof(*ost->ost_service));
645
646         err = obd_disconnect(&ost->ost_conn);
647         if (err) { 
648                 CERROR("lustre ost: fail to disconnect device\n");
649                 return -EINVAL;
650         }
651
652         MOD_DEC_USE_COUNT;
653         EXIT;
654         return 0;
655 }
656
657 /* use obd ops to offer management infrastructure */
658 static struct obd_ops ost_obd_ops = {
659         o_setup:       ost_setup,
660         o_cleanup:     ost_cleanup,
661 };
662
663 static int __init ost_init(void)
664 {
665         obd_register_type(&ost_obd_ops, LUSTRE_OST_NAME);
666         return 0;
667 }
668
/* Module exit point: calls obd_unregister_type() for LUSTRE_OST_NAME. */
669 static void __exit ost_exit(void)
670 {
671         obd_unregister_type(LUSTRE_OST_NAME);
672 }
673
674 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
675 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
676 MODULE_LICENSE("GPL");
677
678 module_init(ost_init);
679 module_exit(ost_exit);