Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  *  Storage Target Handling functions
24  *  Lustre Object Server Module (OST)
25  *
26  *  This server is single threaded at present (but can easily be multi
27  *  threaded). For testing and management it is treated as an
28  *  obd_device, although it does not export a full OBD method table
29  *  (the requests are coming in over the wire, so object target
30  *  modules do not have a full method table.)
31  */
32
33 #define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_OST
35
36 #include <linux/module.h>
37 #include <linux/obd_ost.h>
38 #include <linux/lustre_net.h>
39 #include <linux/lustre_dlm.h>
40 #include <linux/lustre_export.h>
41 #include <linux/init.h>
42 #include <linux/lprocfs_status.h>
43
44 inline void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
45 {
46         if (oti && req->rq_repmsg)
47                 req->rq_repmsg->transno = HTON__u64(oti->oti_transno);
48         EXIT;
49 }
50
51 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
52 {
53         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
54         struct ost_body *body;
55         int rc, size = sizeof(*body);
56         ENTRY;
57
58         body = lustre_msg_buf(req->rq_reqmsg, 0);
59
60         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
61         if (rc)
62                 RETURN(rc);
63
64         req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
65         RETURN(0);
66 }
67
68 static int ost_getattr(struct ptlrpc_request *req)
69 {
70         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
71         struct ost_body *body, *repbody;
72         int rc, size = sizeof(*body);
73         ENTRY;
74
75         body = lustre_msg_buf(req->rq_reqmsg, 0);
76
77         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
78         if (rc)
79                 RETURN(rc);
80
81         repbody = lustre_msg_buf(req->rq_repmsg, 0);
82         /* FIXME: unpack only valid fields instead of memcpy, endianness */
83         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
84         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
85         RETURN(0);
86 }
87
88 static int ost_statfs(struct ptlrpc_request *req)
89 {
90         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
91         struct obd_statfs *osfs;
92         int rc, size = sizeof(*osfs);
93         ENTRY;
94
95         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
96         if (rc)
97                 RETURN(rc);
98
99         osfs = lustre_msg_buf(req->rq_repmsg, 0);
100         memset(osfs, 0, size);
101
102         rc = obd_statfs(conn, osfs);
103         if (rc) {
104                 CERROR("ost: statfs failed: rc %d\n", rc);
105                 req->rq_status = rc;
106                 RETURN(rc);
107         }
108         obd_statfs_pack(osfs, osfs);
109
110         RETURN(0);
111 }
112
113 static int ost_syncfs(struct ptlrpc_request *req)
114 {
115         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
116         struct obd_statfs *osfs;
117         int rc, size = sizeof(*osfs);
118         ENTRY;
119
120         rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
121         if (rc)
122                 RETURN(rc);
123
124         rc = obd_syncfs(conn);
125         if (rc) {
126                 CERROR("ost: syncfs failed: rc %d\n", rc);
127                 req->rq_status = rc;
128                 RETURN(rc);
129         }
130
131         RETURN(0);
132 }
133
134 static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
135 {
136         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
137         struct ost_body *body, *repbody;
138         int rc, size = sizeof(*body);
139         ENTRY;
140
141         body = lustre_msg_buf(req->rq_reqmsg, 0);
142
143         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
144         if (rc)
145                 RETURN(rc);
146
147         repbody = lustre_msg_buf(req->rq_repmsg, 0);
148         /* FIXME: unpack only valid fields instead of memcpy, endianness */
149         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
150         req->rq_status = obd_open(conn, &repbody->oa, NULL, oti);
151         RETURN(0);
152 }
153
154 static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
155 {
156         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
157         struct ost_body *body, *repbody;
158         int rc, size = sizeof(*body);
159         ENTRY;
160
161         body = lustre_msg_buf(req->rq_reqmsg, 0);
162
163         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
164         if (rc)
165                 RETURN(rc);
166
167         repbody = lustre_msg_buf(req->rq_repmsg, 0);
168         /* FIXME: unpack only valid fields instead of memcpy, endianness */
169         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
170         req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
171         RETURN(0);
172 }
173
174 static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
175 {
176         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
177         struct ost_body *body, *repbody;
178         int rc, size = sizeof(*body);
179         ENTRY;
180
181         body = lustre_msg_buf(req->rq_reqmsg, 0);
182
183         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
184         if (rc)
185                 RETURN(rc);
186
187         repbody = lustre_msg_buf(req->rq_repmsg, 0);
188         /* FIXME: unpack only valid fields instead of memcpy, endianness */
189         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
190         req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
191         RETURN(0);
192 }
193
194 static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
195 {
196         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
197         struct ost_body *body, *repbody;
198         int rc, size = sizeof(*body);
199         ENTRY;
200
201         body = lustre_msg_buf(req->rq_reqmsg, 0);
202
203         if ((NTOH__u32(body->oa.o_valid) & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))!=
204             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
205                 RETURN(-EINVAL);
206
207         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
208         if (rc)
209                 RETURN(rc);
210
211         repbody = lustre_msg_buf(req->rq_repmsg, 0);
212         /* FIXME: unpack only valid fields instead of memcpy, endianness */
213         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
214         req->rq_status = obd_punch(conn, &repbody->oa, NULL,
215                                    repbody->oa.o_size, repbody->oa.o_blocks, oti);
216         RETURN(0);
217 }
218
219 static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
220 {
221         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
222         struct ost_body *body, *repbody;
223         int rc, size = sizeof(*body);
224         ENTRY;
225
226         body = lustre_msg_buf(req->rq_reqmsg, 0);
227
228         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
229         if (rc)
230                 RETURN(rc);
231
232         repbody = lustre_msg_buf(req->rq_repmsg, 0);
233         /* FIXME: unpack only valid fields instead of memcpy, endianness */
234         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
235         req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
236         RETURN(0);
237 }
238
239 static int ost_bulk_timeout(void *data)
240 {
241         ENTRY;
242         /* We don't fail the connection here, because having the export
243          * killed makes the (vital) call to commitrw very sad.
244          */
245         RETURN(1);
246 }
247
248 static int ost_brw_read(struct ptlrpc_request *req)
249 {
250         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
251         struct ptlrpc_bulk_desc *desc;
252         struct obd_ioobj *tmp1;
253         void *tmp2, *end2;
254         struct niobuf_remote *remote_nb;
255         struct niobuf_local *local_nb = NULL;
256         struct obd_ioobj *ioo;
257         struct ost_body *body;
258         struct l_wait_info lwi;
259         void *desc_priv = NULL;
260         int cmd, i, j, objcount, niocount, size = sizeof(*body);
261         int rc = 0;
262 #if CHECKSUM_BULK
263         __u64 cksum = 0;
264 #endif
265         ENTRY;
266
267         body = lustre_msg_buf(req->rq_reqmsg, 0);
268         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
269         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
270         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
271         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
272         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
273         cmd = OBD_BRW_READ;
274
275         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
276                 GOTO(out, req->rq_status = -EIO);
277
278         /* Hmm, we don't return anything in this reply buffer?
279          * We should be returning per-page status codes and also
280          * per-object size, blocks count, mtime, ctime.  (bug 593) */
281         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
282         if (rc)
283                 GOTO(out, req->rq_status = rc);
284
285         for (i = 0; i < objcount; i++) {
286                 ost_unpack_ioo(&tmp1, &ioo);
287                 if (tmp2 + ioo->ioo_bufcnt > end2) {
288                         LBUG();
289                         GOTO(out, rc = -EFAULT);
290                 }
291                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
292                         /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
293                         ost_unpack_niobuf(&tmp2, &remote_nb);
294                 }
295         }
296
297         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
298         if (local_nb == NULL)
299                 GOTO(out, rc = -ENOMEM);
300
301         /* The unpackers move tmp1 and tmp2, so reset them before using */
302         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
303         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
304         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
305                                     remote_nb, local_nb, &desc_priv, NULL);
306
307         if (req->rq_status)
308                 GOTO(out, req->rq_status);
309
310         desc = ptlrpc_prep_bulk(req->rq_connection);
311         if (desc == NULL)
312                 GOTO(out_local, rc = -ENOMEM);
313         desc->bd_ptl_ev_hdlr = NULL;
314         desc->bd_portal = OST_BULK_PORTAL;
315
316         for (i = 0; i < niocount; i++) {
317                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
318
319                 if (bulk == NULL)
320                         GOTO(out_bulk, rc = -ENOMEM);
321                 bulk->bp_xid = remote_nb[i].xid;
322                 bulk->bp_buf = local_nb[i].addr;
323                 bulk->bp_buflen = remote_nb[i].len;
324                 if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))
325                         ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
326         }
327
328         rc = ptlrpc_bulk_put(desc);
329         if (rc)
330                 GOTO(out_bulk, rc);
331
332         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
333         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
334                           &lwi);
335         if (rc) {
336                 LASSERT(rc == -ETIMEDOUT);
337                 GOTO(out_bulk, rc);
338         }
339
340         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
341                                       local_nb, desc_priv, NULL);
342
343 out_bulk:
344         ptlrpc_bulk_decref(desc);
345 out_local:
346         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
347 out:
348         if (rc)
349                 ptlrpc_error(req->rq_svc, req);
350         else {
351 #if CHECKSUM_BULK
352                 body = lustre_msg_buf(req->rq_repmsg, 0);
353                 body->oa.o_rdev = HTON__u64(cksum);
354                 body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
355 #endif
356                 ptlrpc_reply(req->rq_svc, req);
357         }
358
359         RETURN(rc);
360 }
361
362 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
363 {
364         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
365         struct ptlrpc_bulk_desc *desc;
366         struct obd_ioobj *tmp1;
367         void *tmp2, *end2;
368         struct niobuf_remote *remote_nb;
369         struct niobuf_local *local_nb = NULL;
370         struct obd_ioobj *ioo;
371         struct ost_body *body;
372         struct l_wait_info lwi;
373         void *desc_priv = NULL;
374         int cmd, i, j, objcount, niocount, size = sizeof(*body);
375         int rc = 0;
376         ENTRY;
377
378         body = lustre_msg_buf(req->rq_reqmsg, 0);
379         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
380         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
381         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
382         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
383         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
384         cmd = OBD_BRW_WRITE;
385
386         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
387                 GOTO(out, req->rq_status = -EIO);
388
389         for (i = 0; i < objcount; i++) {
390                 ost_unpack_ioo(&tmp1, &ioo);
391                 if (tmp2 + ioo->ioo_bufcnt > end2) {
392                         LBUG();
393                         GOTO(out, rc = -EFAULT);
394                 }
395                 for (j = 0; j < ioo->ioo_bufcnt; j++) {
396                         /* XXX verify niobuf[j].offset > niobuf[j-1].offset */
397                         ost_unpack_niobuf(&tmp2, &remote_nb);
398                 }
399         }
400
401         OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
402         if (local_nb == NULL)
403                 GOTO(out, rc = -ENOMEM);
404
405         /* The unpackers move tmp1 and tmp2, so reset them before using */
406         ioo = lustre_msg_buf(req->rq_reqmsg, 1);
407         remote_nb = lustre_msg_buf(req->rq_reqmsg, 2);
408         req->rq_status = obd_preprw(cmd, conn, objcount, ioo, niocount,
409                                     remote_nb, local_nb, &desc_priv, oti);
410
411         if (req->rq_status)
412                 GOTO(out_local, rc = 0);
413
414         desc = ptlrpc_prep_bulk(req->rq_connection);
415         if (desc == NULL)
416                 GOTO(out_local, rc = -ENOMEM);
417         desc->bd_ptl_ev_hdlr = NULL;
418         desc->bd_portal = OSC_BULK_PORTAL;
419
420         for (i = 0; i < niocount; i++) {
421                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
422
423                 if (bulk == NULL)
424                         GOTO(out_bulk, rc = -ENOMEM);
425                 bulk->bp_xid = remote_nb[i].xid;
426                 bulk->bp_buf = local_nb[i].addr;
427                 bulk->bp_buflen = remote_nb[i].len;
428         }
429
430         rc = ptlrpc_bulk_get(desc);
431         if (rc)
432                 GOTO(out_bulk, rc);
433
434         lwi = LWI_TIMEOUT(obd_timeout * HZ, ost_bulk_timeout, desc);
435         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_RCVD,
436                           &lwi);
437         if (rc) {
438                 LASSERT(rc == -ETIMEDOUT);
439                 ptlrpc_abort_bulk(desc);
440                 recovd_conn_fail(desc->bd_connection);
441                 obd_commitrw(cmd, conn, objcount, ioo, niocount, local_nb,
442                              desc_priv, oti);
443                 GOTO(out_bulk, rc);
444         }
445
446 #if CHECKSUM_BULK
447         if ((body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM))) {
448                 static int cksum_counter;
449                 __u64 client_cksum = NTOH__u64(body->oa.o_rdev);
450                 __u64 cksum = 0;
451
452                 for (i = 0; i < niocount; i++) {
453                         char *ptr = kmap(local_nb[i].page);
454                         int   off = local_nb[i].offset & (PAGE_SIZE - 1);
455                         int   len = local_nb[i].len;
456
457                         LASSERT(off + len <= PAGE_SIZE);
458                         ost_checksum(&cksum, ptr + off, len);
459                         kunmap(local_nb[i].page);
460                 }
461
462                 if (client_cksum != cksum) {
463                         CERROR("Bad checksum: client "LPX64", server "LPX64
464                                ", client NID "LPX64"\n", client_cksum, cksum,
465                                req->rq_connection->c_peer.peer_nid);
466                         cksum_counter = 1;
467                 } else {
468                         cksum_counter++;
469                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
470                                 CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
471                                         cksum_counter,
472                                         req->rq_connection->c_peer.peer_nid,
473                                         cksum);
474                 }
475         }
476 #endif
477
478         req->rq_status = obd_commitrw(cmd, conn, objcount, ioo, niocount,
479                                       local_nb, desc_priv, oti);
480
481  out_bulk:
482         ptlrpc_bulk_decref(desc);
483  out_local:
484         OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
485  out:
486         if (!rc)
487                 /* Hmm, we don't return anything in this reply buffer?
488                  * We should be returning per-page status codes and also
489                  * per-object size, blocks count, mtime, ctime.  (bug 593) */
490                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
491                                      &req->rq_repmsg);
492         if (rc)
493                 ptlrpc_error(req->rq_svc, req);
494         else {
495                 oti_to_request(oti, req);
496                 rc = ptlrpc_reply(req->rq_svc, req);
497         }
498         RETURN(rc);
499 }
500
501 static int ost_san_brw(struct ptlrpc_request *req, int alloc)
502 {
503         struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
504         struct niobuf_remote *remote_nb, *res_nb;
505         struct obd_ioobj *ioo;
506         struct ost_body *body;
507         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
508         void *tmp1, *tmp2, *end2;
509         ENTRY;
510
511         body = lustre_msg_buf(req->rq_reqmsg, 0);
512         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
513         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
514         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
515         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
516         niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
517         
518         cmd = alloc ? OBD_BRW_WRITE : OBD_BRW_READ;
519
520         for (i = 0; i < objcount; i++) {
521                 ost_unpack_ioo((void *)&tmp1, &ioo);
522                 if (tmp2 + ioo->ioo_bufcnt > end2) {
523                         rc = -EFAULT;
524                         break;
525                 }
526                 for (j = 0; j < ioo->ioo_bufcnt; j++)
527                         ost_unpack_niobuf((void *)&tmp2, &remote_nb);
528         }
529
530         size[1] = niocount * sizeof(*remote_nb);
531         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
532         if (rc)
533                 GOTO(out, rc);
534
535         /* The unpackers move tmp1 and tmp2, so reset them before using */
536         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
537         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
538
539         req->rq_status = obd_san_preprw(cmd, conn, objcount, tmp1,
540                                         niocount, tmp2);
541
542         if (req->rq_status) {
543                 rc = 0;
544                 goto out;
545         }
546
547         remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
548         res_nb = lustre_msg_buf(req->rq_reqmsg, 2);
549         for (i = 0; i < niocount; i++) {
550                 /* this advances remote_nb */
551                 ost_pack_niobuf((void **)&remote_nb,
552                                 res_nb[i].offset,
553                                 res_nb[i].len, /* 0 */
554                                 res_nb[i].flags, /* 0 */
555                                 res_nb[i].xid
556                                 );
557         }
558
559         rc = 0;
560
561 out:
562         if (rc) {
563                 OBD_FREE(req->rq_repmsg, req->rq_replen);
564                 req->rq_repmsg = NULL;
565                 ptlrpc_error(req->rq_svc, req);
566         } else
567                 ptlrpc_reply(req->rq_svc, req);
568
569         return rc;
570 }
571
572 static int filter_recovery_request(struct ptlrpc_request *req,
573                                    struct obd_device *obd, int *process)
574 {
575         switch (req->rq_reqmsg->opc) {
576         case OST_CONNECT: /* This will never get here, but for completeness. */
577         case OST_DISCONNECT:
578                *process = 1;
579                RETURN(0);
580
581         case OST_CLOSE:
582         case OST_CREATE:
583         case OST_DESTROY:
584         case OST_OPEN:
585         case OST_PUNCH:
586         case OST_SETATTR: 
587         case OST_SYNCFS:
588         case OST_WRITE:
589         case LDLM_ENQUEUE:
590                 *process = target_queue_recovery_request(req, obd);
591                 RETURN(0);
592
593         default:
594                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
595                 *process = 0;
596                 /* XXX what should we set rq_status to here? */
597                 RETURN(ptlrpc_error(req->rq_svc, req));
598         }
599 }
600
601 static int ost_handle(struct ptlrpc_request *req)
602 {
603         struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
604         int should_process, rc;
605         ENTRY;
606
607         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
608         if (rc || OBD_FAIL_CHECK(OBD_FAIL_OST_HANDLE_UNPACK)) {
609                 CERROR("lustre_ost: Invalid request\n");
610                 GOTO(out, rc);
611         }
612
613         if (req->rq_reqmsg->opc != OST_CONNECT) {
614                 struct obd_device *obd;
615
616                 if (req->rq_export == NULL) {
617                         CERROR("lustre_ost: operation %d on unconnected OST\n",
618                                req->rq_reqmsg->opc);
619                         req->rq_status = -ENOTCONN;
620                         GOTO(out, rc = -ENOTCONN);
621                 }
622
623                 obd = req->rq_export->exp_obd;
624
625                 spin_lock_bh(&obd->obd_processing_task_lock);
626                 if (obd->obd_flags & OBD_ABORT_RECOVERY)
627                         target_abort_recovery(obd);
628                 spin_unlock_bh(&obd->obd_processing_task_lock);
629
630                 if (obd->obd_flags & OBD_RECOVERING) {
631                         rc = filter_recovery_request(req, obd, &should_process);
632                         if (rc || !should_process)
633                                 RETURN(rc);
634                 } else if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
635 #if 0
636 /* need to store this reply somewhere... */
637                         if (req->rq_xid == med->med_last_xid) {
638                                 DEBUG_REQ(D_HA, req, "resending reply");
639                                 OBD_ALLOC(req->rq_repmsg, med->med_last_replen);
640                                 req->rq_replen = med->med_last_replen;
641                                 memcpy(req->rq_repmsg, med->med_last_reply,
642                                        req->rq_replen);
643                                 ptlrpc_reply(req->rq_svc, req);
644                                 return 0;
645                         }
646                         DEBUG_REQ(D_HA, req, "no reply for resend, continuing");
647 #endif
648                 }
649
650         } 
651
652         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
653                 GOTO(out, rc = -EINVAL);
654
655         switch (req->rq_reqmsg->opc) {
656         case OST_CONNECT:
657                 CDEBUG(D_INODE, "connect\n");
658                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
659                 rc = target_handle_connect(req, ost_handle);
660                 break;
661         case OST_DISCONNECT:
662                 CDEBUG(D_INODE, "disconnect\n");
663                 OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0);
664                 rc = target_handle_disconnect(req);
665                 break;
666         case OST_CREATE:
667                 CDEBUG(D_INODE, "create\n");
668                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
669                 rc = ost_create(req, oti);
670                 break;
671         case OST_DESTROY:
672                 CDEBUG(D_INODE, "destroy\n");
673                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
674                 rc = ost_destroy(req, oti);
675                 break;
676         case OST_GETATTR:
677                 CDEBUG(D_INODE, "getattr\n");
678                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
679                 rc = ost_getattr(req);
680                 break;
681         case OST_SETATTR:
682                 CDEBUG(D_INODE, "setattr\n");
683                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
684                 rc = ost_setattr(req, oti);
685                 break;
686         case OST_OPEN:
687                 CDEBUG(D_INODE, "open\n");
688                 OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
689                 rc = ost_open(req, oti);
690                 break;
691         case OST_CLOSE:
692                 CDEBUG(D_INODE, "close\n");
693                 OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
694                 rc = ost_close(req, oti);
695                 break;
696         case OST_WRITE:
697                 CDEBUG(D_INODE, "write\n");
698                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
699                 rc = ost_brw_write(req, oti);
700                 /* ost_brw sends its own replies */
701                 RETURN(rc);
702         case OST_READ:
703                 CDEBUG(D_INODE, "read\n");
704                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
705                 rc = ost_brw_read(req);
706                 /* ost_brw sends its own replies */
707                 RETURN(rc);
708         case OST_SAN_READ:
709                 CDEBUG(D_INODE, "san read\n");
710                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
711                 rc = ost_san_brw(req, 0);
712                 /* ost_san_brw sends its own replies */
713                 RETURN(rc);
714         case OST_SAN_WRITE:
715                 CDEBUG(D_INODE, "san write\n");
716                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
717                 rc = ost_san_brw(req, 1);
718                 /* ost_san_brw sends its own replies */
719                 RETURN(rc);
720         case OST_PUNCH:
721                 CDEBUG(D_INODE, "punch\n");
722                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
723                 rc = ost_punch(req, oti);
724                 break;
725         case OST_STATFS:
726                 CDEBUG(D_INODE, "statfs\n");
727                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
728                 rc = ost_statfs(req);
729                 break;
730         case OST_SYNCFS:
731                 CDEBUG(D_INODE, "sync\n");
732                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
733                 rc = ost_syncfs(req);
734                 break;
735         case LDLM_ENQUEUE:
736                 CDEBUG(D_INODE, "enqueue\n");
737                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
738                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
739                                          ldlm_server_blocking_ast);
740                 break;
741         case LDLM_CONVERT:
742                 CDEBUG(D_INODE, "convert\n");
743                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
744                 rc = ldlm_handle_convert(req);
745                 break;
746         case LDLM_CANCEL:
747                 CDEBUG(D_INODE, "cancel\n");
748                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
749                 rc = ldlm_handle_cancel(req);
750                 break;
751         case LDLM_BL_CALLBACK:
752         case LDLM_CP_CALLBACK:
753                 CDEBUG(D_INODE, "callback\n");
754                 CERROR("callbacks should not happen on OST\n");
755                 LBUG();
756                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
757                 break;
758         default:
759                 req->rq_status = -ENOTSUPP;
760                 rc = ptlrpc_error(req->rq_svc, req);
761                 RETURN(rc);
762         }
763
764         EXIT;
765         /* If we're DISCONNECTing, the export_data is already freed */
766         if (!rc && req->rq_reqmsg->opc != OST_DISCONNECT) {
767                 struct obd_device *obd  = req->rq_export->exp_obd;
768                 if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
769                         req->rq_repmsg->last_committed =
770                                 HTON__u64(obd->obd_last_committed);
771                 } else {
772                         DEBUG_REQ(D_IOCTL, req,
773                                   "not sending last_committed update");
774                 }
775                 CDEBUG(D_INFO, "last_committed "LPU64", xid "LPX64"\n",
776                        obd->obd_last_committed, HTON__u64(req->rq_xid));
777         }
778
779 out:
780         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
781                 struct obd_device *obd = req->rq_export->exp_obd;
782
783                 if (obd && (obd->obd_flags & OBD_RECOVERING)) {
784                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
785                         return target_queue_final_reply(req, rc);
786                 }
787                 /* Lost a race with recovery; let the error path DTRT. */
788                 rc = req->rq_status = -ENOTCONN;
789         }
790
791         if (rc) {
792                 CERROR("ost: processing error (opcode=%d): %d\n",
793                        req->rq_reqmsg->opc, rc);
794                 ptlrpc_error(req->rq_svc, req);
795         } else {
796                 CDEBUG(D_INODE, "sending reply\n");
797                 if (req->rq_repmsg == NULL)
798                         CERROR("handler for opcode %d returned rc=0 without "
799                                "creating rq_repmsg; needs to return rc != 0!\n",
800                                req->rq_reqmsg->opc);
801                 else
802                         oti_to_request(oti, req);
803                 ptlrpc_reply(req->rq_svc, req);
804         }
805
806         return 0;
807 }
808
809 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
810 {
811         struct ost_obd *ost = &obddev->u.ost;
812         int err;
813         int i;
814         ENTRY;
815
816         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
817                                            OST_BUFSIZE, OST_MAXREQSIZE,
818                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
819                                            ost_handle, "ost");
820         if (!ost->ost_service) {
821                 CERROR("failed to start service\n");
822                 GOTO(error_disc, err = -ENOMEM);
823         }
824
825         for (i = 0; i < OST_NUM_THREADS; i++) {
826                 char name[32];
827                 sprintf(name, "ll_ost_%02d", i);
828                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
829                 if (err) {
830                         CERROR("error starting thread #%d: rc %d\n", i, err);
831                         GOTO(error_disc, err = -EINVAL);
832                 }
833         }
834
835         RETURN(0);
836
837 error_disc:
838         RETURN(err);
839 }
840
841 static int ost_cleanup(struct obd_device * obddev)
842 {
843         struct ost_obd *ost = &obddev->u.ost;
844         int err = 0;
845
846         ENTRY;
847
848         ptlrpc_stop_all_threads(ost->ost_service);
849         ptlrpc_unregister_service(ost->ost_service);
850
851         RETURN(err);
852 }
853
854 int ost_attach(struct obd_device *dev, obd_count len, void *data)
855 {
856         struct lprocfs_static_vars lvars;
857
858         lprocfs_init_vars(&lvars);
859         return lprocfs_obd_attach(dev, lvars.obd_vars);
860 }
861
/*
 * Detach hook: counterpart of ost_attach().  Hands dev to
 * lprocfs_obd_detach() and returns its result.
 */
int ost_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
866
867 /* I don't think this function is ever used, since nothing 
868  * connects directly to this module.
869  */
870 static int ost_connect(struct lustre_handle *conn,
871                        struct obd_device *obd, struct obd_uuid *cluuid,
872                        struct recovd_obd *recovd,
873                        ptlrpc_recovery_cb_t recover)
874 {
875         struct obd_export *exp;
876         int rc;
877         ENTRY;
878
879         if (!conn || !obd || !cluuid)
880                 RETURN(-EINVAL);
881
882         rc = class_connect(conn, obd, cluuid);
883         if (rc)
884                 RETURN(rc);
885         exp = class_conn2export(conn);
886         LASSERT(exp);
887
888         RETURN(0);
889 }
890
891 /* use obd ops to offer management infrastructure */
892 static struct obd_ops ost_obd_ops = {
893         o_owner:        THIS_MODULE,
894         o_attach:       ost_attach,
895         o_detach:       ost_detach,
896         o_setup:        ost_setup,
897         o_cleanup:      ost_cleanup,
898         o_connect:      ost_connect,
899 };
900
901 static int __init ost_init(void)
902 {
903         struct lprocfs_static_vars lvars;
904         ENTRY;
905
906         lprocfs_init_vars(&lvars);
907         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
908                                    LUSTRE_OST_NAME));
909 }
910
911 static void __exit ost_exit(void)
912 {
913         class_unregister_type(LUSTRE_OST_NAME);
914 }
915
916 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
917 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
918 MODULE_LICENSE("GPL");
919
920 module_init(ost_init);
921 module_exit(ost_exit);