Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
43
44 inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
45 {
46         if (oti && req->rq_repmsg)
47                 req->rq_repmsg->transno = HTON__u64(oti->oti_transno);
48         EXIT;
49 }
50
51 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
52 {
53         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
54         struct ost_body *body;
55         int rc, size = sizeof(*body);
56         ENTRY;
57
58         body = lustre_msg_buf(req->rq_reqmsg, 0);
59
60         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
61         if (rc)
62                 RETURN(rc);
63
64         req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
65         RETURN(0);
66 }
67
68 static int ost_getattr(struct ptlrpc_request *req)
69 {
70         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
71         struct ost_body *body, *repbody;
72         int rc, size = sizeof(*body);
73         ENTRY;
74
75         body = lustre_msg_buf(req->rq_reqmsg, 0);
76
77         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
78         if (rc)
79                 RETURN(rc);
80
81         repbody = lustre_msg_buf(req->rq_repmsg, 0);
82         /* FIXME: unpack only valid fields instead of memcpy, endianness */
83         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
84         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
85         RETURN(0);
86 }
87
88 static int ost_statfs(struct ptlrpc_request *req)
89 {
90         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
91         struct obd_statfs *osfs;
92         int rc, size = sizeof(*osfs);
93         ENTRY;
94
95         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
96         if (rc)
97                 RETURN(rc);
98
99         osfs = lustre_msg_buf(req->rq_repmsg, 0);
100         memset(osfs, 0, size);
101
102         rc = obd_statfs(conn, osfs);
103         if (rc) {
104                 CERROR("ost: statfs failed: rc %d\n", rc);
105                 req->rq_status = rc;
106                 RETURN(rc);
107         }
108         obd_statfs_pack(osfs, osfs);
109
110         RETURN(0);
111 }
112
113 static int ost_syncfs(struct ptlrpc_request *req)
114 {
115         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
116         struct obd_statfs *osfs;
117         int rc, size = sizeof(*osfs);
118         ENTRY;
119
120         rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
121         if (rc)
122                 RETURN(rc);
123
124         rc = obd_syncfs(conn);
125         if (rc) {
126                 CERROR("ost: syncfs failed: rc %d\n", rc);
127                 req->rq_status = rc;
128                 RETURN(rc);
129         }
130
131         RETURN(0);
132 }
133
134 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
135 {
136         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
137         struct ost_body *body, *repbody;
138         int rc, size = sizeof(*body);
139         ENTRY;
140
141         body = lustre_msg_buf(req->rq_reqmsg, 0);
142
143         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
144         if (rc)
145                 RETURN(rc);
146
147         repbody = lustre_msg_buf(req->rq_repmsg, 0);
148         /* FIXME: unpack only valid fields instead of memcpy, endianness */
149         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
150         req->rq_status = obd_open(conn, &repbody->oa, NULL, oti);
151         RETURN(0);
152 }
153
154 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
155 {
156         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
157         struct ost_body *body, *repbody;
158         int rc, size = sizeof(*body);
159         ENTRY;
160
161         body = lustre_msg_buf(req->rq_reqmsg, 0);
162
163         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
164         if (rc)
165                 RETURN(rc);
166
167         repbody = lustre_msg_buf(req->rq_repmsg, 0);
168         /* FIXME: unpack only valid fields instead of memcpy, endianness */
169         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
170         req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
171         RETURN(0);
172 }
173
174 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
175 {
176         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
177         struct ost_body *body, *repbody;
178         int rc, size = sizeof(*body);
179         ENTRY;
180
181         body = lustre_msg_buf(req->rq_reqmsg, 0);
182
183         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
184         if (rc)
185                 RETURN(rc);
186
187         repbody = lustre_msg_buf(req->rq_repmsg, 0);
188         /* FIXME: unpack only valid fields instead of memcpy, endianness */
189         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
190         req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
191         RETURN(0);
192 }
193
194 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
195 {
196         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
197         struct ost_body *body, *repbody;
198         int rc, size = sizeof(*body);
199         ENTRY;
200
201         body = lustre_msg_buf(req->rq_reqmsg, 0);
202
203         if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
204             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
205                 RETURN(-EINVAL);
206
207         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
208         if (rc)
209                 RETURN(rc);
210
211         repbody = lustre_msg_buf(req->rq_repmsg, 0);
212         /* FIXME: unpack only valid fields instead of memcpy, endianness */
213         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
214         req->rq_status = obd_punch(conn, &repbody->oa, NULL,
215                                    repbody->oa.o_size, repbody->oa.o_blocks, oti);
216         RETURN(0);
217 }
218
219 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
220 {
221         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
222         struct ost_body *body, *repbody;
223         int rc, size = sizeof(*body);
224         ENTRY;
225
226         body = lustre_msg_buf(req->rq_reqmsg, 0);
227
228         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
229         if (rc)
230                 RETURN(rc);
231
232         repbody = lustre_msg_buf(req->rq_repmsg, 0);
233         /* FIXME: unpack only valid fields instead of memcpy, endianness */
234         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
235         req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
236         RETURN(0);
237 }
238
239 static int ost_bulk_timeout(void *data)
240 {
241         ENTRY;
242         /* We don't fail the connection here, because having the export
243          * killed makes the (vital) call to commitrw very sad.
244          */
245         RETURN(1);
246 }
247
248 static int ost_brw_read(struct ptlrpc_request *req)
249 {
250         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
251         struct ptlrpc_bulk_desc *desc;
252         struct niobuf_remote *remote_nb;
253         struct niobuf_local *local_nb = NULL;
254         struct obd_ioobj *ioo;
255         struct ost_body *body;
256         struct l_wait_info lwi;
257         void *desc_priv = NULL;
258         void *end2;
259         int cmd, i, j, objcount, niocount, size = sizeof(*body);
260         int rc = 0;
261 #if CHECKSUM_BULK
262         __u64 cksum = 0;
263 #endif
264         ENTRY;
265
266         body = lustre_msg_buf(req->rq_reqmsg, 0);
267         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
268         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
269         end2 = (char *)remote_nb + req->rq_reqmsg->buflens[2];
270         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
271         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
272         cmd = OBD_BRW_READ;
273
274         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
275                 GOTO(out, req->rq_status = -EIO);
276
277         /* Hmm, we don't return anything in this reply buffer?
278          * We should be returning per-page status codes and also
279          * per-object size, blocks count, mtime, ctime.  (bug 593) */
280         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
281         if (rc)
282                 GOTO(out, req->rq_status = rc);
283
284         for (i = 0; i < objcount; i++, ioo++) {
285                 ost_unpack_ioo(ioo, ioo);
286                 if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
287                         CERROR("BRW: objid "LPX64" count %u larger than %u\n",
288                                ioo->ioo_id, ioo->ioo_bufcnt,
289                                (int)(end2 - (void *)remote_nb));
290                         LBUG();
291                         GOTO(out, rc = -EINVAL);
292                 }
293                 for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) {
294                         ost_unpack_niobuf(remote_nb, remote_nb);
295                         if (remote_nb->len == 0) {
296                                 CERROR("zero len BRW: objid "LPX64" buf %u\n",
297                                        ioo->ioo_id, j);
298                                 GOTO(out, rc = -EINVAL);
299                         }
300                         if (j && remote_nb->offset <= (remote_nb - 1)->offset) {
301                                 CERROR("unordered BRW: objid "LPX64
302                                        " buf %u offset "LPX64" <= "LPX64"\n",
303                                        ioo->ioo_id, j, remote_nb->offset,
304                                        (remote_nb - 1)->offset);
305                                 GOTO(out, rc = -EINVAL);
306                         }
307                 }
308         }
309
310         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
311         if (local_nb == NULL)
312                 GOTO(out, rc = -ENOMEM);
313
314         /* The unpackers move ioo and remote_nb, so reset them before using */
315         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
316         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
317         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
318                                     remote_nb, local_nb, &desc_priv, NULL);
319
320         if (req->rq_status)
321                 GOTO(out, req->rq_status);
322
323         desc = ptlrpc_prep_bulk(req->rq_connection);
324         if (desc == NULL)
325                 GOTO(out_local, rc = -ENOMEM);
326         desc->bd_ptl_ev_hdlr = NULL;
327         desc->bd_portal = OST_BULK_PORTAL;
328
329         for (i = 0; i < niocount; i++) {
330                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
331
332                 if (bulk == NULL)
333                         GOTO(out_bulk, rc = -ENOMEM);
334                 bulk->bp_xid = remote_nb[i].xid;
335                 bulk->bp_buf = local_nb[i].addr;
336                 bulk->bp_buflen = remote_nb[i].len;
337                 if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))
338                         ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
339         }
340
341         rc = ptlrpc_bulk_put(desc);
342         if (rc)
343                 GOTO(out_bulk, rc);
344
345         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
346         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
347                           &lwi);
348         if (rc) {
349                 LASSERT(rc == -ETIMEDOUT);
350                 GOTO(out_bulk, rc);
351         }
352
353         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
354                                       local_nb, desc_priv, NULL);
355
356 out_bulk:
357         ptlrpc_bulk_decref(desc);
358 out_local:
359         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
360 out:
361         if (rc)
362                 ptlrpc_error(req->rq_svc, req);
363         else {
364 #if CHECKSUM_BULK
365                 body = lustre_msg_buf(req->rq_repmsg, 0);
366                 body->oa.o_rdev = HTON__u64(cksum);
367                 body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
368 #endif
369                 ptlrpc_reply(req->rq_svc, req);
370         }
371
372         RETURN(rc);
373 }
374
375 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
376 {
377         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
378         struct ptlrpc_bulk_desc *desc;
379         struct niobuf_remote *remote_nb;
380         void *end2;
381         struct niobuf_local *local_nb = NULL;
382         struct obd_ioobj *ioo;
383         struct ost_body *body;
384         struct l_wait_info lwi;
385         void *desc_priv = NULL;
386         int cmd, i, j, objcount, niocount, size = sizeof(*body);
387         int rc = 0;
388         ENTRY;
389
390         body = lustre_msg_buf(req->rq_reqmsg, 0);
391         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
392         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
393         end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2];
394         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
395         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
396         cmd = OBD_BRW_WRITE;
397
398         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
399                 GOTO(out, req->rq_status = -EIO);
400
401         for (i = 0; i < objcount; i++, ioo++) {
402                 ost_unpack_ioo(ioo, ioo);
403                 if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
404                         CERROR("BRW: objid "LPX64" count %u larger than %u\n",
405                                ioo->ioo_id, ioo->ioo_bufcnt,
406                                (int)(end2 - (void *)remote_nb));
407                         LBUG();
408                         GOTO(out, rc = -EINVAL);
409                 }
410                 for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++) {
411                         ost_unpack_niobuf(remote_nb, remote_nb);
412                         if (remote_nb->len == 0) {
413                                 CERROR("zero len BRW: objid "LPX64" buf %u\n",
414                                        ioo->ioo_id, j);
415                                 GOTO(out, rc = -EINVAL);
416                         }
417                         if (j && remote_nb->offset <= (remote_nb - 1)->offset) {
418                                 CERROR("unordered BRW: objid "LPX64
419                                        " buf %u offset "LPX64" <= "LPX64"\n",
420                                        ioo->ioo_id, j, remote_nb->offset,
421                                        (remote_nb - 1)->offset);
422                                 GOTO(out, rc = -EINVAL);
423                         }
424                 }
425         }
426
427         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
428         if (local_nb == NULL)
429                 GOTO(out, rc = -ENOMEM);
430
431         /* The unpackers move ioo and remote_nb, so reset them before using */
432         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
433         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
434
435         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
436                                     remote_nb, local_nb, &desc_priv, oti);
437
438         if (req->rq_status)
439                 GOTO(out_local, rc = 0);
440
441         desc = ptlrpc_prep_bulk(req->rq_connection);
442         if (desc == NULL)
443                 GOTO(out_local, rc = -ENOMEM);
444         desc->bd_ptl_ev_hdlr = NULL;
445         desc->bd_portal = OSC_BULK_PORTAL;
446
447         for (i = 0; i < niocount; i++) {
448                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
449
450                 if (bulk == NULL)
451                         GOTO(out_bulk, rc = -ENOMEM);
452                 bulk->bp_xid = remote_nb[i].xid;
453                 bulk->bp_buf = local_nb[i].addr;
454                 bulk->bp_buflen = remote_nb[i].len;
455         }
456
457         rc = ptlrpc_bulk_get(desc);
458         if (rc)
459                 GOTO(out_bulk, rc);
460
461         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
462         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
463                           &lwi);
464         if (rc) {
465                 LASSERT(rc == -ETIMEDOUT);
466                 ptlrpc_abort_bulk(desc);
467                 recovd_conn_fail(desc->bd_connection);
468                 obd_commitrw(cmd, conn, objcount, ioo, niocount, local_nb,
469                              desc_priv, oti);
470                 GOTO(out_bulk, rc);
471         }
472
473 #if CHECKSUM_BULK
474         if ((body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))) {
475                 static int cksum_counter;
476                 __u64 client_cksum = NTOH__u64(body->oa.o_rdev);
477                 __u64 cksum = 0;
478
479                 for (i = 0; i < niocount; i++) {
480                         char *ptr = kmap(local_nb[i].page);
481                         int   off = local_nb[i].offset & (PAGE_SIZE - 1);
482                         int   len = local_nb[i].len;
483
484                         LASSERT(off + len <= PAGE_SIZE);
485                         ost_checksum(&cksum, ptr + off, len);
486                         kunmap(local_nb[i].page);
487                 }
488
489                 if (client_cksum != cksum) {
490                         CERROR("Bad checksum: client "LPX64", server "LPX64
491                                ", client NID "LPX64"\n", client_cksum, cksum,
492                                req->rq_connection->c_peer.peer_nid);
493                         cksum_counter = 1;
494                 } else {
495                         cksum_counter++;
496                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
497                                 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
498                                         cksum_counter,
499                                         req->rq_connection->c_peer.peer_nid,
500                                         cksum);
501                 }
502         }
503 #endif
504
505         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
506                                       local_nb, desc_priv, oti);
507
508  out_bulk:
509         ptlrpc_bulk_decref(desc);
510  out_local:
511         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
512  out:
513         if (!rc)
514                 /* Hmm, we don't return anything in this reply buffer?
515                  * We should be returning per-page status codes and also
516                  * per-object size, blocks count, mtime, ctime.  (bug 593) */
517                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
518                                      &req->rq_repmsg);
519         if (rc)
520                 ptlrpc_error(req->rq_svc, req);
521         else {
522                 oti_to_request(oti, req);
523                 rc = ptlrpc_reply(req->rq_svc, req);
524         }
525         RETURN(rc);
526 }
527
528 static int ost_san_brw(struct ptlrpc_request *req, int alloc)
529 {
530         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
531         struct niobuf_remote *remote_nb, *res_nb;
532         struct obd_ioobj *ioo;
533         struct ost_body *body;
534         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
535         void *end2;
536         ENTRY;
537
538         body = lustre_msg_buf(req->rq_reqmsg, 0);
539         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
540         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
541         end2 = (void *)remote_nb + req->rq_reqmsg->buflens[2];
542         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
543         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
544
545         cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ;
546
547         for (i = 0; i < objcount; i++, ioo++) {
548                 ost_unpack_ioo(ioo, ioo);
549                 if ((void *)(remote_nb + ioo->ioo_bufcnt) > end2) {
550                         CERROR("BRW: objid "LPX64" count %u larger than %u\n",
551                                ioo->ioo_id, ioo->ioo_bufcnt,
552                                (int)(end2 - (void *)remote_nb));
553                         GOTO(out, rc = -EINVAL);
554                 }
555                 for (j = 0; j < ioo->ioo_bufcnt; j++, remote_nb++)
556                         ost_unpack_niobuf(remote_nb, remote_nb);
557         }
558
559         size[1] = niocount * sizeof(*remote_nb);
560         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
561         if (rc)
562                 GOTO(out, rc);
563
564         /* The unpackers move ioo and remote_nb, so reset them before using */
565         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
566         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
567
568         req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo,
569                                         niocount, remote_nb);
570
571         if (req->rq_status) {
572                 rc = 0;
573                 goto out;
574         }
575
576         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
577         res_nb = lustre_msg_buf(req->rq_reqmsg, 2);
578         for (i = 0; i < niocount; i++, remote_nb++, res_nb++)
579                 ost_pack_niobuf(remote_nb, res_nb->offset, res_nb->len,
580                                 res_nb->flags, res_nb->xid);
581
582         rc = 0;
583
584 out:
585         if (rc) {
586                 OBD_FREE(req->rq_repmsg, req->rq_replen);
587                 req->rq_repmsg = NULL;
588                 ptlrpc_error(req->rq_svc, req);
589         } else
590                 ptlrpc_reply(req->rq_svc, req);
591
592         return rc;
593 }
594
595 static int filter_recovery_request(struct ptlrpc_request *req,
596                                    struct obd_device *obd, int *process)
597 {
598         switch (req->rq_reqmsg->opc) {
599         case OST_CONNECT: /* This will never get here, but for completeness. */
600         case OST_DISCONNECT:
601                *process = 1;
602                RETURN(0);
603
604         case OST_CLOSE:
605         case OST_CREATE:
606         case OST_DESTROY:
607         case OST_OPEN:
608         case OST_PUNCH:
609         case OST_SETATTR: 
610         case OST_SYNCFS:
611         case OST_WRITE:
612         case LDLM_ENQUEUE:
613                 *process = target_queue_recovery_request(req, obd);
614                 RETURN(0);
615
616         default:
617                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
618                 *process = 0;
619                 /* XXX what should we set rq_status to here? */
620                 RETURN(ptlrpc_error(req->rq_svc, req));
621         }
622 }
623
624 static int ost_handle(struct ptlrpc_request *req)
625 {
626         struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
627         int should_process, rc;
628         ENTRY;
629
630         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
631         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
632                 CERROR("lustre_ost: Invalid request\n");
633                 GOTO(out, rc);
634         }
635
636         if (req->rq_reqmsg->opc != OST_CONNECT) {
637                 struct obd_device *obd;
638
639                 if (req->rq_export == NULL) {
640                         CERROR("lustre_ost: operation %d on unconnected OST\n",
641                                req->rq_reqmsg->opc);
642                         req->rq_status = -ENOTCONN;
643                         GOTO(out, rc = -ENOTCONN);
644                 }
645
646                 obd = req->rq_export->exp_obd;
647
648                 spin_lock_bh(&obd->obd_processing_task_lock);
649                 if (obd->obd_flags & OBD_ABORT_RECOVERY)
650                         target_abort_recovery(obd);
651                 spin_unlock_bh(&obd->obd_processing_task_lock);
652
653                 if (obd->obd_flags & OBD_RECOVERING) {
654                         rc = filter_recovery_request(req, obd, &should_process);
655                         if (rc || !should_process)
656                                 RETURN(rc);
657                 } else if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
658 #if 0
659 /* need to store this reply somewhere... */
660                         if (req->rq_xid == med->med_last_xid) {
661                                 DEBUG_REQ(D_HA, req, "resending reply");
662                                 OBD_ALLOC(req->rq_repmsg, med->med_last_replen);
663                                 req->rq_replen = med->med_last_replen;
664                                 memcpy(req->rq_repmsg, med->med_last_reply,
665                                        req->rq_replen);
666                                 ptlrpc_reply(req->rq_svc, req);
667                                 return 0;
668                         }
669                         DEBUG_REQ(D_HA, req, "no reply for resend, continuing");
670 #endif
671                 }
672
673         } 
674
675         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
676                 GOTO(out, rc = -EINVAL);
677
678         switch (req->rq_reqmsg->opc) {
679         case OST_CONNECT:
680                 CDEBUG(D_INODE, "connect\n");
681                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
682                 rc = target_handle_connect(req, ost_handle);
683                 break;
684         case OST_DISCONNECT:
685                 CDEBUG(D_INODE, "disconnect\n");
686                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
687                 rc = target_handle_disconnect(req);
688                 break;
689         case OST_CREATE:
690                 CDEBUG(D_INODE, "create\n");
691                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
692                 rc = ost_create(req, oti);
693                 break;
694         case OST_DESTROY:
695                 CDEBUG(D_INODE, "destroy\n");
696                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
697                 rc = ost_destroy(req, oti);
698                 break;
699         case OST_GETATTR:
700                 CDEBUG(D_INODE, "getattr\n");
701                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
702                 rc = ost_getattr(req);
703                 break;
704         case OST_SETATTR:
705                 CDEBUG(D_INODE, "setattr\n");
706                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
707                 rc = ost_setattr(req, oti);
708                 break;
709         case OST_OPEN:
710                 CDEBUG(D_INODE, "open\n");
711                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
712                 rc = ost_open(req, oti);
713                 break;
714         case OST_CLOSE:
715                 CDEBUG(D_INODE, "close\n");
716                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
717                 rc = ost_close(req, oti);
718                 break;
719         case OST_WRITE:
720                 CDEBUG(D_INODE, "write\n");
721                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
722                 rc = ost_brw_write(req, oti);
723                 /* ost_brw sends its own replies */
724                 RETURN(rc);
725         case OST_READ:
726                 CDEBUG(D_INODE, "read\n");
727                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
728                 rc = ost_brw_read(req);
729                 /* ost_brw sends its own replies */
730                 RETURN(rc);
731         case OST_SAN_READ:
732                 CDEBUG(D_INODE, "san read\n");
733                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
734                 rc = ost_san_brw(req, 0);
735                 /* ost_san_brw sends its own replies */
736                 RETURN(rc);
737         case OST_SAN_WRITE:
738                 CDEBUG(D_INODE, "san write\n");
739                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
740                 rc = ost_san_brw(req, 1);
741                 /* ost_san_brw sends its own replies */
742                 RETURN(rc);
743         case OST_PUNCH:
744                 CDEBUG(D_INODE, "punch\n");
745                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
746                 rc = ost_punch(req, oti);
747                 break;
748         case OST_STATFS:
749                 CDEBUG(D_INODE, "statfs\n");
750                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
751                 rc = ost_statfs(req);
752                 break;
753         case OST_SYNCFS:
754                 CDEBUG(D_INODE, "sync\n");
755                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
756                 rc = ost_syncfs(req);
757                 break;
758         case LDLM_ENQUEUE:
759                 CDEBUG(D_INODE, "enqueue\n");
760                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
761                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
762                                          ldlm_server_blocking_ast);
763                 break;
764         case LDLM_CONVERT:
765                 CDEBUG(D_INODE, "convert\n");
766                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
767                 rc = ldlm_handle_convert(req);
768                 break;
769         case LDLM_CANCEL:
770                 CDEBUG(D_INODE, "cancel\n");
771                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
772                 rc = ldlm_handle_cancel(req);
773                 break;
774         case LDLM_BL_CALLBACK:
775         case LDLM_CP_CALLBACK:
776                 CDEBUG(D_INODE, "callback\n");
777                 CERROR("callbacks should not happen on OST\n");
778                 LBUG();
779                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
780                 break;
781         default:
782                 req->rq_status = -ENOTSUPP;
783                 rc = ptlrpc_error(req->rq_svc, req);
784                 RETURN(rc);
785         }
786
787         EXIT;
788         /* If we're DISCONNECTing, the export_data is already freed */
789         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
790                 struct obd_device *obd  = req->rq_export->exp_obd;
791                 if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
792                         req->rq_repmsg->last_committed =
793                                 HTON__u64(obd->obd_last_committed);
794                 } else {
795                         DEBUG_REQ(D_IOCTL, req,
796                                   "not sending last_committed update");
797                 }
798                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
799                        obd->obd_last_committed, HTON__u64(req->rq_xid));
800         }
801
802 out:
803         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
804                 struct obd_device *obd = req->rq_export->exp_obd;
805
806                 if (obd && (obd->obd_flags & OBD_RECOVERING)) {
807                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
808                         return target_queue_final_reply(req, rc);
809                 }
810                 /* Lost a race with recovery; let the error path DTRT. */
811                 rc = req->rq_status = -ENOTCONN;
812         }
813
814         if (rc) {
815                 CERROR("ost: processing error (opcode=%d): %d\n",
816                        req->rq_reqmsg->opc, rc);
817                 ptlrpc_error(req->rq_svc, req);
818         } else {
819                 CDEBUG(D_INODE, "sending reply\n");
820                 if (req->rq_repmsg == NULL)
821                         CERROR("handler for opcode %d returned rc=0 without "
822                                "creating rq_repmsg; needs to return rc != 0!\n",
823                                req->rq_reqmsg->opc);
824                 else
825                         oti_to_request(oti, req);
826                 ptlrpc_reply(req->rq_svc, req);
827         }
828
829         return 0;
830 }
831
832 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
833 {
834         struct ost_obd *ost = &obddev->u.ost;
835         int err;
836         int i;
837         ENTRY;
838
839         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
840                                            OST_BUFSIZE, OST_MAXREQSIZE,
841                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
842                                            ost_handle, "ost");
843         if (!ost->ost_service) {
844                 CERROR("failed to start service\n");
845                 GOTO(error_disc, err = -ENOMEM);
846         }
847
848         for (i = 0; i < OST_NUM_THREADS; i++) {
849                 char name[32];
850                 sprintf(name, "ll_ost_%02d", i);
851                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
852                 if (err) {
853                         CERROR("error starting thread #%d: rc %d\n", i, err);
854                         GOTO(error_disc, err = -EINVAL);
855                 }
856         }
857
858         RETURN(0);
859
860 error_disc:
861         RETURN(err);
862 }
863
864 static int ost_cleanup(struct obd_device * obddev)
865 {
866         struct ost_obd *ost = &obddev->u.ost;
867         int err = 0;
868
869         ENTRY;
870
871         ptlrpc_stop_all_threads(ost->ost_service);
872         ptlrpc_unregister_service(ost->ost_service);
873
874         RETURN(err);
875 }
876
877 int ost_attach(struct obd_device *dev, obd_count len, void *data)
878 {
879         struct lprocfs_static_vars lvars;
880
881         lprocfs_init_vars(&lvars);
882         return lprocfs_obd_attach(dev, lvars.obd_vars);
883 }
884
/*
 * Detach hook: counterpart of ost_attach().  Hands the device to
 * lprocfs_obd_detach() and passes its result back unchanged.
 */
int ost_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
889
890 /* I don't think this function is ever used, since nothing 
891  * connects directly to this module.
892  */
893 static int ost_connect(struct lustre_handle *conn,
894                        struct obd_device *obd, struct obd_uuid *cluuid,
895                        struct recovd_obd *recovd,
896                        ptlrpc_recovery_cb_t recover)
897 {
898         struct obd_export *exp;
899         int rc;
900         ENTRY;
901
902         if (!conn || !obd || !cluuid)
903                 RETURN(-EINVAL);
904
905         rc = class_connect(conn, obd, cluuid);
906         if (rc)
907                 RETURN(rc);
908         exp = class_conn2export(conn);
909         LASSERT(exp);
910
911         RETURN(0);
912 }
913
914 /* use obd ops to offer management infrastructure */
915 static struct obd_ops ost_obd_ops = {
916         o_owner:        THIS_MODULE,
917         o_attach:       ost_attach,
918         o_detach:       ost_detach,
919         o_setup:        ost_setup,
920         o_cleanup:      ost_cleanup,
921         o_connect:      ost_connect,
922 };
923
924 static int __init ost_init(void)
925 {
926         struct lprocfs_static_vars lvars;
927         ENTRY;
928
929         lprocfs_init_vars(&lvars);
930         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
931                                    LUSTRE_OST_NAME));
932 }
933
934 static void __exit ost_exit(void)
935 {
936         class_unregister_type(LUSTRE_OST_NAME);
937 }
938
939 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
940 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
941 MODULE_LICENSE("GPL");
942
943 module_init(ost_init);
944 module_exit(ost_exit);