Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table.)
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #ifdef __KERNEL__
33 #include <linux/version.h>
34 #include <linux/module.h>
35 #include <linux/mm.h>
36 #include <linux/highmem.h>
37 #include <linux/lustre_dlm.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/workqueue.h>
40 #include <linux/smp_lock.h>
41 #else
42 #include <linux/locks.h>
43 #endif
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <linux/kp30.h>
49 #include <linux/lustre_mds.h> /* for mds_objid */
50 #include <linux/obd_ost.h>
51 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
52 #include <linux/ctype.h>
53 #include <linux/init.h>
54 #include <linux/lustre_ha.h>
55 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
56 #include <linux/lustre_lite.h> /* for ll_i2info */
57 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
58 #include <linux/lprocfs_status.h>
59
60 /* It is important that ood_fh remain the first item in this structure: that
61  * way, we don't have to re-pack the obdo's inline data before we send it to
62  * the server, we can just send the whole struct unaltered. */
63 #define OSC_OBDO_DATA_MAGIC 0xD15EA5ED
/* Per-open bookkeeping that osc_open() copies into an obdo's o_inline area
 * and that osc_close() reads back from it. */
64 struct osc_obdo_data {
            /* Handle copied from obdo_handle(oa) after a successful open. */
65         struct lustre_handle ood_fh;
            /* The open request; osc_open() takes an extra reference on it and
             * osc_close() drops that reference when the open has no transno. */
66         struct ptlrpc_request *ood_request;
            /* Set to OSC_OBDO_DATA_MAGIC by osc_open(), asserted by osc_close(). */
67         __u32 ood_magic;
68 };
69 #include <linux/obd_lov.h> /* just for the startup assertion; is that wrong? */
70
71 static int send_sync(struct obd_import *imp, struct ll_fid *rootfid,
72                           int level, int msg_flags)
73 {
74         struct ptlrpc_request *req;
75         struct mds_body *body;
76         int rc, size = sizeof(*body);
77         ENTRY;
78
79         req = ptlrpc_prep_req(imp, OST_SYNCFS, 1, &size, NULL);
80         if (!req)
81                 GOTO(out, rc = -ENOMEM);
82
83         body = lustre_msg_buf(req->rq_reqmsg, 0);
84         req->rq_level = level;
85         req->rq_replen = lustre_msg_size(1, &size);
86
87         req->rq_reqmsg->flags |= msg_flags;
88         rc = ptlrpc_queue_wait(req);
89
90         if (!rc) {
91                 CDEBUG(D_NET, "last_committed="LPU64
92                        ", last_xid="LPU64"\n",
93                        req->rq_repmsg->last_committed,
94                        req->rq_repmsg->last_xid);
95         }
96
97         EXIT;
98  out:
99         ptlrpc_req_finished(req);
100         return rc;
101 }
102
103 static int signal_completed_replay(struct obd_import *imp)
104 {
105         struct ll_fid fid;
106
107         return send_sync(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
108 }
109
110 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
111 {
112         struct lprocfs_static_vars lvars;
113
114         lprocfs_init_vars(&lvars);
115         return lprocfs_obd_attach(dev, lvars.obd_vars);
116 }
117
/* Detach hook: hands @dev to lprocfs_obd_detach() and returns its result. */
static int osc_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
122
123 /* Pack OSC object metadata for shipment to the MDS. */
124 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
125                       struct lov_stripe_md *lsm)
126 {
127         int lmm_size;
128         ENTRY;
129
130         lmm_size = sizeof(**lmmp);
131         if (!lmmp)
132                 RETURN(lmm_size);
133
134         if (*lmmp && !lsm) {
135                 OBD_FREE(*lmmp, lmm_size);
136                 *lmmp = NULL;
137                 RETURN(0);
138         }
139
140         if (!*lmmp) {
141                 OBD_ALLOC(*lmmp, lmm_size);
142                 if (!*lmmp)
143                         RETURN(-ENOMEM);
144         }
145         if (lsm) {
146                 LASSERT(lsm->lsm_object_id);
147                 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
148         }
149
150         RETURN(lmm_size);
151 }
152
153 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
154                         struct lov_mds_md *lmm)
155 {
156         int lsm_size;
157         ENTRY;
158
159         lsm_size = sizeof(**lsmp);
160         if (!lsmp)
161                 RETURN(lsm_size);
162
163         if (*lsmp && !lmm) {
164                 OBD_FREE(*lsmp, lsm_size);
165                 *lsmp = NULL;
166                 RETURN(0);
167         }
168
169         if (!*lsmp) {
170                 OBD_ALLOC(*lsmp, lsm_size);
171                 if (!*lsmp)
172                         RETURN(-ENOMEM);
173         }
174
175         /* XXX endianness */
176         if (lmm) {
177                 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
178                 LASSERT((*lsmp)->lsm_object_id);
179         }
180
181         RETURN(lsm_size);
182 }
183
184 inline void oti_from_request(struct obd_trans_info *oti,
185                              struct ptlrpc_request *req)
186 {
187         if (oti && req->rq_repmsg)
188                 oti->oti_transno = NTOH__u64(req->rq_repmsg->transno);
189         EXIT;
190 }
191
192 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
193                        struct lov_stripe_md *md)
194 {
195         struct ptlrpc_request *request;
196         struct ost_body *body;
197         int rc, size = sizeof(*body);
198         ENTRY;
199
200         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
201                                   &size, NULL);
202         if (!request)
203                 RETURN(-ENOMEM);
204
205         body = lustre_msg_buf(request->rq_reqmsg, 0);
206 #warning FIXME: pack only valid fields instead of memcpy, endianness
207         memcpy(&body->oa, oa, sizeof(*oa));
208
209         request->rq_replen = lustre_msg_size(1, &size);
210
211         rc = ptlrpc_queue_wait(request);
212         if (rc) {
213                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
214                 GOTO(out, rc);
215         }
216
217         body = lustre_msg_buf(request->rq_repmsg, 0);
218         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
219         memcpy(oa, &body->oa, sizeof(*oa));
220
221         EXIT;
222  out:
223         ptlrpc_req_finished(request);
224         return rc;
225 }
226
227 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
228                     struct lov_stripe_md *md, struct obd_trans_info *oti)
229 {
230         struct ptlrpc_request *request;
231         struct ost_body *body;
232         int rc, size = sizeof(*body);
233         ENTRY;
234
235         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
236                                   NULL);
237         if (!request)
238                 RETURN(-ENOMEM);
239
240         request->rq_flags |= PTL_RPC_FL_REPLAY;
241         body = lustre_msg_buf(request->rq_reqmsg, 0);
242 #warning FIXME: pack only valid fields instead of memcpy, endianness
243         memcpy(&body->oa, oa, sizeof(*oa));
244
245         request->rq_replen = lustre_msg_size(1, &size);
246
247         rc = ptlrpc_queue_wait(request);
248         if (rc)
249                 GOTO(out, rc);
250
251         if (oa) {
252                 struct osc_obdo_data ood;
253                 body = lustre_msg_buf(request->rq_repmsg, 0);
254                 memcpy(oa, &body->oa, sizeof(*oa));
255
256                 /* If the open succeeded, we better have a handle */
257                 /* BlueArc OSTs don't send back (o_valid | FLHANDLE).  sigh.
258                  * Temporary workaround until fixed. -phil 24 Feb 03 */
259                 //LASSERT(oa->o_valid & OBD_MD_FLHANDLE);
260                 oa->o_valid |= OBD_MD_FLHANDLE;
261
262                 memcpy(&ood.ood_fh, obdo_handle(oa), sizeof(ood.ood_fh));
263                 ood.ood_request = ptlrpc_request_addref(request);
264                 ood.ood_magic = OSC_OBDO_DATA_MAGIC;
265
266                 /* Save this data in the request; it will be passed back to us
267                  * in future obdos.  This memcpy is guaranteed to be safe,
268                  * because we check at compile-time that sizeof(ood) is smaller
269                  * than oa->o_inline. */
270                 memcpy(&oa->o_inline, &ood, sizeof(ood));
271         }
272
273         EXIT;
274  out:
275         ptlrpc_req_finished(request);
276         return rc;
277 }
278
279 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
280                      struct lov_stripe_md *md, struct obd_trans_info *oti)
281 {
282         struct obd_import *import = class_conn2cliimp(conn);
283         struct ptlrpc_request *request;
284         struct ost_body *body;
285         struct osc_obdo_data *ood;
286         unsigned long flags;
287         int rc, size = sizeof(*body);
288         ENTRY;
289
290         LASSERT(oa != NULL);
291         ood = (struct osc_obdo_data *)&oa->o_inline;
292         LASSERT(ood->ood_magic == OSC_OBDO_DATA_MAGIC);
293
294         request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
295         if (!request)
296                 RETURN(-ENOMEM);
297
298         body = lustre_msg_buf(request->rq_reqmsg, 0);
299 #warning FIXME: pack only valid fields instead of memcpy, endianness
300         memcpy(&body->oa, oa, sizeof(*oa));
301
302         request->rq_replen = lustre_msg_size(1, &size);
303
304         rc = ptlrpc_queue_wait(request);
305         if (rc) {
306                 /* FIXME: Does this mean that the file is still open locally?
307                  * If not, and I somehow suspect not, we need to cleanup
308                  * below */
309                 GOTO(out, rc);
310         }
311
312         spin_lock_irqsave(&import->imp_lock, flags);
313         ood->ood_request->rq_flags &= ~PTL_RPC_FL_REPLAY;
314         /* see comments in llite/file.c:ll_mdc_close() */
315         if (ood->ood_request->rq_transno) {
316                 LBUG(); /* this can't happen yet */
317                 if (!request->rq_transno) {
318                         request->rq_transno = ood->ood_request->rq_transno;
319                         ptlrpc_retain_replayable_request(request, import);
320                 }
321                 spin_unlock_irqrestore(&import->imp_lock, flags);
322         } else {
323                 spin_unlock_irqrestore(&import->imp_lock, flags);
324                 ptlrpc_req_finished(ood->ood_request);
325         }
326
327         body = lustre_msg_buf(request->rq_repmsg, 0);
328         memcpy(oa, &body->oa, sizeof(*oa));
329
330         EXIT;
331  out:
332         ptlrpc_req_finished(request);
333         return rc;
334 }
335
336 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
337                        struct lov_stripe_md *md, struct obd_trans_info *oti)
338 {
339         struct ptlrpc_request *request;
340         struct ost_body *body;
341         int rc, size = sizeof(*body);
342         ENTRY;
343
344         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
345                                   &size, NULL);
346         if (!request)
347                 RETURN(-ENOMEM);
348
349         body = lustre_msg_buf(request->rq_reqmsg, 0);
350         memcpy(&body->oa, oa, sizeof(*oa));
351
352         request->rq_replen = lustre_msg_size(1, &size);
353
354         rc = ptlrpc_queue_wait(request);
355
356         ptlrpc_req_finished(request);
357         return rc;
358 }
359
360 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
361                       struct lov_stripe_md **ea, struct obd_trans_info *oti_in)
362 {
363         struct ptlrpc_request *request;
364         struct ost_body *body;
365         struct lov_stripe_md *lsm;
366         struct obd_trans_info *oti, trans_info;
367         int rc, size = sizeof(*body);
368         ENTRY;
369
370         LASSERT(oa);
371         LASSERT(ea);
372
373         lsm = *ea;
374         if (!lsm) {
375                 rc = obd_alloc_memmd(conn, &lsm);
376                 if (rc < 0)
377                         RETURN(rc);
378         }
379
380         if (oti_in)
381                 oti = oti_in;
382         else
383                 oti = &trans_info;
384
385         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
386                                   NULL);
387         if (!request)
388                 GOTO(out, rc = -ENOMEM);
389
390         body = lustre_msg_buf(request->rq_reqmsg, 0);
391         memcpy(&body->oa, oa, sizeof(*oa));
392
393         request->rq_replen = lustre_msg_size(1, &size);
394
395         rc = ptlrpc_queue_wait(request);
396         if (rc)
397                 GOTO(out_req, rc);
398
399         body = lustre_msg_buf(request->rq_repmsg, 0);
400         memcpy(oa, &body->oa, sizeof(*oa));
401
402         lsm->lsm_object_id = oa->o_id;
403         lsm->lsm_stripe_count = 0;
404         *ea = lsm;
405
406         oti_from_request(oti, request);
407         CDEBUG(D_HA, "transno: "LPD64"\n", oti->oti_transno);
408         EXIT;
409 out_req:
410         ptlrpc_req_finished(request);
411 out:
412         if (rc && !*ea)
413                 obd_free_memmd(conn, &lsm);
414         return rc;
415 }
416
417 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
418                      struct lov_stripe_md *md, obd_size start,
419                      obd_size end, struct obd_trans_info *oti)
420 {
421         struct ptlrpc_request *request;
422         struct ost_body *body;
423         int rc, size = sizeof(*body);
424         ENTRY;
425
426         if (!oa) {
427                 CERROR("oa NULL\n");
428                 RETURN(-EINVAL);
429         }
430
431         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
432                                   NULL);
433         if (!request)
434                 RETURN(-ENOMEM);
435
436         body = lustre_msg_buf(request->rq_reqmsg, 0);
437 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
438         memcpy(&body->oa, oa, sizeof(*oa));
439
440         /* overload the size and blocks fields in the oa with start/end */
441         body->oa.o_size = HTON__u64(start);
442         body->oa.o_blocks = HTON__u64(end);
443         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
444
445         request->rq_replen = lustre_msg_size(1, &size);
446
447         rc = ptlrpc_queue_wait(request);
448         if (rc)
449                 GOTO(out, rc);
450
451         body = lustre_msg_buf(request->rq_repmsg, 0);
452         memcpy(oa, &body->oa, sizeof(*oa));
453
454         EXIT;
455  out:
456         ptlrpc_req_finished(request);
457         return rc;
458 }
459
460 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
461                        struct lov_stripe_md *ea, struct obd_trans_info *oti)
462 {
463         struct ptlrpc_request *request;
464         struct ost_body *body;
465         int rc, size = sizeof(*body);
466         ENTRY;
467
468         if (!oa) {
469                 CERROR("oa NULL\n");
470                 RETURN(-EINVAL);
471         }
472         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
473                                   &size, NULL);
474         if (!request)
475                 RETURN(-ENOMEM);
476
477         body = lustre_msg_buf(request->rq_reqmsg, 0);
478 #warning FIXME: pack only valid fields instead of memcpy, endianness
479         memcpy(&body->oa, oa, sizeof(*oa));
480
481         request->rq_replen = lustre_msg_size(1, &size);
482
483         rc = ptlrpc_queue_wait(request);
484         if (rc)
485                 GOTO(out, rc);
486
487         body = lustre_msg_buf(request->rq_repmsg, 0);
488         memcpy(oa, &body->oa, sizeof(*oa));
489
490         EXIT;
491  out:
492         ptlrpc_req_finished(request);
493         return rc;
494 }
495
496 /* Our bulk-unmapping bottom half. */
497 static void unmap_and_decref_bulk_desc(void *data)
498 {
499         struct ptlrpc_bulk_desc *desc = data;
500         struct list_head *tmp;
501         ENTRY;
502
503         list_for_each(tmp, &desc->bd_page_list) {
504                 struct ptlrpc_bulk_page *bulk;
505                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
506
507                 kunmap(bulk->bp_page);
508                 obd_kmap_put(1);
509         }
510
511         ptlrpc_bulk_decref(desc);
512         EXIT;
513 }
514
515
516 /*  this is the callback function which is invoked by the Portals
517  *  event handler associated with the bulk_sink queue and bulk_source queue.
518  */
519 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
520 {
521         ENTRY;
522
523         LASSERT(desc->bd_brw_set != NULL);
524         LASSERT(desc->bd_brw_set->brw_callback != NULL);
525
526         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
527
528         /* We can't kunmap the desc from interrupt context, so we do it from
529          * the bottom half above. */
530         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
531         schedule_work(&desc->bd_queue);
532
533         EXIT;
534 }
535
536 /*
537  * This is called when there was a bulk error return.  However, we don't know
538  * whether the bulk completed or not.  We cancel the portals bulk descriptors,
539  * so that if the OST decides to send them later we don't double free.  Then
540  * remove this descriptor from the set so that the set callback doesn't wait
541  * forever for the last CB_PHASE_FINISH to be called, and finally dump all of
542  * the bulk descriptor references.
543  */
544 static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc)
545 {
546         ENTRY;
547
548         LASSERT(desc->bd_brw_set != NULL);
549
550         ptlrpc_abort_bulk(desc);
551         obd_brw_set_del(desc);
552         unmap_and_decref_bulk_desc(desc);
553
554         EXIT;
555 }
556
/*
 * Read page_count pages described by pga for the object in lsm.
 *
 * Prepares an OST_READ request with three buffers (an ost_body, an obd_ioobj
 * and page_count niobuf_remote entries), kmaps every page into a bulk page on
 * a single bulk descriptor, registers that descriptor with
 * ptlrpc_register_bulk_put() and adds it to set (both skipped when the
 * OBD_FAIL_OSC_BRW_READ_BULK fail check fires), then queues the request and
 * waits for the reply.
 *
 * A -ETIMEDOUT result with PTL_RPC_FL_RESEND set on the request makes the
 * function finish that request and start over from restart_bulk.
 *
 * When CHECKSUM_BULK is non-zero the reply is additionally checked against a
 * checksum computed locally over the pages; the outcome is only logged.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, or the error from
 * ptlrpc_register_bulk_put() / ptlrpc_queue_wait().
 */
557 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
558                         obd_count page_count, struct brw_page *pga,
559                         struct obd_brw_set *set)
560 {
561         struct obd_import *imp = class_conn2cliimp(conn);
562         struct ptlrpc_connection *connection = imp->imp_connection;
563         struct ptlrpc_request *request = NULL;
564         struct ptlrpc_bulk_desc *desc = NULL;
565         struct ost_body *body;
566         int rc, size[3] = {sizeof(*body)}, mapped = 0;
567         struct obd_ioobj *iooptr;
568         void *nioptr;
569         __u32 xid;
570         ENTRY;
571
572 restart_bulk:
573         size[1] = sizeof(struct obd_ioobj);
574         size[2] = page_count * sizeof(struct niobuf_remote);
575
576         request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
577         if (!request)
578                 RETURN(-ENOMEM);
579
580         body = lustre_msg_buf(request->rq_reqmsg, 0);
            /* o_valid is OBD_MD_FLCKSUM when CHECKSUM_BULK is 1, and 0 when
             * CHECKSUM_BULK is 0. */
581         body->oa.o_valid = HTON__u32(OBD_MD_FLCKSUM * CHECKSUM_BULK);
582
583         desc = ptlrpc_prep_bulk(connection);
584         if (!desc)
585                 GOTO(out_req, rc = -ENOMEM);
586         desc->bd_portal = OST_BULK_PORTAL;
587         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
588         CDEBUG(D_PAGE, "desc = %p\n", desc);
589
590         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
591         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
592         ost_pack_ioo(&iooptr, lsm, page_count);
593         /* end almost identical to brw_write case */
594
595         xid = ptlrpc_next_xid();       /* single xid for all pages */
596
597         obd_kmap_get(page_count, 0);
598
            /* One bulk page and one remote niobuf per entry of pga. */
599         for (mapped = 0; mapped < page_count; mapped++) {
600                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
601                 if (bulk == NULL) {
602                         unmap_and_decref_bulk_desc(desc);
603                         GOTO(out_req, rc = -ENOMEM);
604                 }
605
606                 bulk->bp_xid = xid;           /* single xid for all pages */
607
608                 bulk->bp_buf = kmap(pga[mapped].pg);
609                 bulk->bp_page = pga[mapped].pg;
610                 bulk->bp_buflen = PAGE_SIZE;
611                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
612                                 pga[mapped].flag, bulk->bp_xid);
613         }
614
615         /*
616          * Register the bulk first, because the reply could arrive out of order,
617          * and we want to be ready for the bulk data.
618          *
619          * One reference is released when osc_ptl_ev_hdlr() is called by
620          * portals, the other when the caller removes us from the "set" list.
621          *
622          * On error, we never do the brw_finish, so we handle all decrefs.
623          */
624         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
625                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
626                        OBD_FAIL_OSC_BRW_READ_BULK);
627         } else {
628                 rc = ptlrpc_register_bulk_put(desc);
629                 if (rc) {
630                         unmap_and_decref_bulk_desc(desc);
631                         GOTO(out_req, rc);
632                 }
633                 obd_brw_set_add(set, desc);
634         }
635
636         request->rq_flags |= PTL_RPC_FL_NO_RESEND;
            /* Only size[0] (the ost_body) is counted for the expected reply. */
637         request->rq_replen = lustre_msg_size(1, size);
638         rc = ptlrpc_queue_wait(request);
639
640         /* XXX bug 937 here */
            /* NOTE(review): desc from the timed-out attempt is not released
             * before jumping back, and restart_bulk prepares a new one --
             * possibly what the bug 937 remark refers to; confirm. */
641         if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
642                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
643                 ptlrpc_req_finished(request);
644                 goto restart_bulk;
645         }
646
647         if (rc) {
648                 osc_ptl_ev_abort(desc);
649                 GOTO(out_req, rc);
650         }
651
652 #if CHECKSUM_BULK
653         body = lustre_msg_buf(request->rq_repmsg, 0);
654         if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM)) {
655                 static int cksum_counter;
                    /* The server's checksum is carried in the reply's o_rdev. */
656                 __u64 server_cksum = NTOH__u64(body->oa.o_rdev);
657                 __u64 cksum = 0;
658
659                 for (mapped = 0; mapped < page_count; mapped++) {
660                         char *ptr = kmap(pga[mapped].pg);
661                         int   off = pga[mapped].off & (PAGE_SIZE - 1);
662                         int   len = pga[mapped].count;
663
664                         LASSERT(off + len <= PAGE_SIZE);
665                         ost_checksum(&cksum, ptr + off, len);
666                         kunmap(pga[mapped].pg);
667                 }
668
669                 cksum_counter++;
670                 if (server_cksum != cksum) {
671                         CERROR("Bad checksum: server "LPX64", client "LPX64
672                                ", server NID "LPX64"\n", server_cksum, cksum,
673                                imp->imp_connection->c_peer.peer_nid);
674                         cksum_counter = 0;
                    /* x & -x == x holds only when x has at most one bit set,
                     * so successes are logged at power-of-two counts. */
675                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
676                         CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
677                                cksum_counter,
678                                imp->imp_connection->c_peer.peer_nid, cksum);
679         } else {
680                 static int cksum_missed;
681                 cksum_missed++;
                    /* Same power-of-two throttling as above. */
682                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
683                         CERROR("Request checksum %u from "LPX64", no reply\n",
684                                cksum_missed,
685                                imp->imp_connection->c_peer.peer_nid);
686         }
687 #endif
688
689         EXIT;
690  out_req:
691         ptlrpc_req_finished(request);
692         return rc;
693 }
694
/*
 * Write page_count pages described by pga to the object in lsm.
 *
 * Prepares an OST_WRITE request with three buffers (an ost_body, an obd_ioobj
 * and page_count niobuf_remote entries), kmaps every page into a bulk page on
 * a single bulk descriptor, registers that descriptor with
 * ptlrpc_register_bulk_get() and adds it to set (both skipped when the
 * OBD_FAIL_OSC_BRW_WRITE_BULK fail check fires), then queues the request and
 * waits for the reply.
 *
 * A -ETIMEDOUT result with PTL_RPC_FL_RESEND set on the request makes the
 * function finish that request and start over from restart_bulk.
 *
 * When CHECKSUM_BULK is non-zero, a checksum accumulated over the mapped
 * buffers is sent in the request's o_rdev with OBD_MD_FLCKSUM set.
 *
 * oti is not referenced.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, or the error from
 * ptlrpc_register_bulk_get() / ptlrpc_queue_wait().
 */
695 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
696                          obd_count page_count, struct brw_page *pga,
697                          struct obd_brw_set *set, struct obd_trans_info *oti)
698 {
699         struct obd_import *imp = class_conn2cliimp(conn);
700         struct ptlrpc_connection *connection = imp->imp_connection;
701         struct ptlrpc_request *request = NULL;
702         struct ptlrpc_bulk_desc *desc = NULL;
703         struct ost_body *body;
704         int rc, size[3] = {sizeof(*body)}, mapped = 0;
705         struct obd_ioobj *iooptr;
706         void *nioptr;
707         __u32 xid;
708 #if CHECKSUM_BULK
709         __u64 cksum = 0;
710 #endif
711         ENTRY;
712
713 restart_bulk:
714         size[1] = sizeof(struct obd_ioobj);
715         size[2] = page_count * sizeof(struct niobuf_remote);
716
717         request = ptlrpc_prep_req(imp, OST_WRITE, 3, size, NULL);
718         if (!request)
719                 RETURN(-ENOMEM);
720
721         body = lustre_msg_buf(request->rq_reqmsg, 0);
722
723         desc = ptlrpc_prep_bulk(connection);
724         if (!desc)
725                 GOTO(out_req, rc = -ENOMEM);
726         desc->bd_portal = OSC_BULK_PORTAL;
727         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
728         CDEBUG(D_PAGE, "desc = %p\n", desc);
729
730         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
731         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
732         ost_pack_ioo(&iooptr, lsm, page_count);
733         /* end almost identical to brw_read case */
734
735         xid = ptlrpc_next_xid();       /* single xid for all pages */
736
737         obd_kmap_get(page_count, 0);
738
            /* One bulk page and one remote niobuf per entry of pga. */
739         for (mapped = 0; mapped < page_count; mapped++) {
740                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
741                 if (bulk == NULL) {
742                         unmap_and_decref_bulk_desc(desc);
743                         GOTO(out_req, rc = -ENOMEM);
744                 }
745
746                 bulk->bp_xid = xid;           /* single xid for all pages */
747
748                 bulk->bp_buf = kmap(pga[mapped].pg);
749                 bulk->bp_page = pga[mapped].pg;
                    /* Unlike the read path, the buffer length is the page's
                     * byte count rather than PAGE_SIZE. */
750                 bulk->bp_buflen = pga[mapped].count;
751                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
752                                 pga[mapped].flag, bulk->bp_xid);
                    /* NOTE(review): cksum is declared only under
                     * #if CHECKSUM_BULK, so this line relies on ost_checksum()
                     * not referencing its first argument when CHECKSUM_BULK
                     * is 0 -- confirm against its definition. */
753                 ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
754         }
755
756 #if CHECKSUM_BULK
757         body->oa.o_rdev = HTON__u64(cksum);
758         body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
759 #endif
760         /*
761          * Register the bulk first, because the reply could arrive out of
762          * order, and we want to be ready for the bulk data.
763          *
764          * One reference is released when brw_finish is complete, the other
765          * when the caller removes us from the "set" list.
766          *
767          * On error, we never do the brw_finish, so we handle all decrefs.
768          */
769         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) {
770                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
771                        OBD_FAIL_OSC_BRW_WRITE_BULK);
772         } else {
773                 rc = ptlrpc_register_bulk_get(desc);
774                 if (rc) {
775                         unmap_and_decref_bulk_desc(desc);
776                         GOTO(out_req, rc);
777                 }
778                 obd_brw_set_add(set, desc);
779         }
780
781         request->rq_flags |= PTL_RPC_FL_NO_RESEND;
            /* Only size[0] (the ost_body) is counted for the expected reply. */
782         request->rq_replen = lustre_msg_size(1, size);
783         rc = ptlrpc_queue_wait(request);
784
785         /* XXX bug 937 here */
            /* NOTE(review): desc from the timed-out attempt is not released
             * before jumping back, and restart_bulk prepares a new one --
             * possibly what the bug 937 remark refers to; confirm. */
786         if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
787                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
788                 ptlrpc_req_finished(request);
789                 goto restart_bulk;
790         }
791
792         if (rc) {
793                 osc_ptl_ev_abort(desc);
794                 GOTO(out_req, rc);
795         }
796
797         EXIT;
798  out_req:
799         ptlrpc_req_finished(request);
800         return rc;
801 }
802
#ifndef min_t
/*
 * Fallback for environments that do not already provide min_t(): yields the
 * smaller of x and y after converting both to `type`.
 *
 * The whole expansion and every argument are parenthesized so that the macro
 * can be embedded in a larger expression such as `n > min_t(int, a, b)`;
 * without the outer parentheses that comparison would bind to the condition
 * of the ternary instead of to its result.
 *
 * Each argument is evaluated more than once, so do not pass expressions with
 * side effects.
 */
#define min_t(type, x, y) \
        ((type)(x) < (type)(y) ? (type)(x) : (type)(y))
#endif
806
807 #warning "FIXME: make values dynamic based on get_info at setup (bug 665)"
/* Size limit for one bulk request; it is divided by PAGE_SIZE below, so it is
 * presumably a byte count -- TODO confirm. */
808 #define OSC_BRW_MAX_SIZE 65536
/* Cap on the number of pages osc_brw() hands to a single read/write call: the
 * smaller of PTL_MD_MAX_IOV and OSC_BRW_MAX_SIZE/PAGE_SIZE. */
809 #define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
810
811 static int osc_brw(int cmd, struct lustre_handle *conn,
812                    struct lov_stripe_md *md, obd_count page_count,
813                    struct brw_page *pga, struct obd_brw_set *set,
814                    struct obd_trans_info *oti)
815 {
816         ENTRY;
817
818         while (page_count) {
819                 obd_count pages_per_brw;
820                 int rc;
821
822                 if (page_count > OSC_BRW_MAX_IOV)
823                         pages_per_brw = OSC_BRW_MAX_IOV;
824                 else
825                         pages_per_brw = page_count;
826
827                 if (cmd & OBD_BRW_WRITE)
828                         rc = osc_brw_write(conn, md, pages_per_brw, pga,
829                                            set, oti);
830                 else
831                         rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
832
833                 if (rc != 0)
834                         RETURN(rc);
835
836                 page_count -= pages_per_brw;
837                 pga += pages_per_brw;
838         }
839         RETURN(0);
840 }
841
842 #ifdef __KERNEL__
843 /* Note: caller will lock/unlock, and set uptodate on the pages */
844 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
845 static int sanosc_brw_read(struct lustre_handle *conn,
846                            struct lov_stripe_md *md,
847                            obd_count page_count,
848                            struct brw_page *pga,
849                            struct obd_brw_set *set)
850 {
851         struct ptlrpc_request *request = NULL;
852         struct ost_body *body;
853         struct niobuf_remote *remote, *nio_rep;
854         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
855         struct obd_ioobj *iooptr;
856         void *nioptr;
857         ENTRY;
858
859         size[1] = sizeof(struct obd_ioobj);
860         size[2] = page_count * sizeof(*remote);
861
862         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
863                                   size, NULL);
864         if (!request)
865                 RETURN(-ENOMEM);
866
867         body = lustre_msg_buf(request->rq_reqmsg, 0);
868         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
869         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
870         ost_pack_ioo(&iooptr, md, page_count);
871
872         obd_kmap_get(page_count, 0);
873
874         for (mapped = 0; mapped < page_count; mapped++) {
875                 LASSERT(PageLocked(pga[mapped].pg));
876
877                 kmap(pga[mapped].pg);
878                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
879                                 pga[mapped].flag, 0);
880         }
881
882         size[1] = page_count * sizeof(*remote);
883         request->rq_replen = lustre_msg_size(2, size);
884
885         rc = ptlrpc_queue_wait(request);
886         if (rc)
887                 GOTO(out_unmap, rc);
888
889         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
890         if (!nioptr)
891                 GOTO(out_unmap, rc = -EINVAL);
892
893         if (request->rq_repmsg->buflens[1] != size[1]) {
894                 CERROR("buffer length wrong (%d vs. %d)\n",
895                        request->rq_repmsg->buflens[1], size[1]);
896                 GOTO(out_unmap, rc = -EINVAL);
897         }
898
899         for (j = 0; j < page_count; j++) {
900                 ost_unpack_niobuf(&nioptr, &remote);
901         }
902
903         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
904         nio_rep = (struct niobuf_remote*)nioptr;
905
906         /* actual read */
907         for (j = 0; j < page_count; j++) {
908                 struct page *page = pga[j].pg;
909                 struct buffer_head *bh;
910                 kdev_t dev;
911
912                 /* got san device associated */
913                 LASSERT(class_conn2obd(conn));
914                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
915
916                 /* hole */
917                 if (!nio_rep[j].offset) {
918                         CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
919                                         page->mapping->host->i_ino,
920                                         page->index);
921                         memset(page_address(page), 0, PAGE_SIZE);
922                         continue;
923                 }
924
925                 if (!page->buffers) {
926                         create_empty_buffers(page, dev, PAGE_SIZE);
927                         bh = page->buffers;
928
929                         clear_bit(BH_New, &bh->b_state);
930                         set_bit(BH_Mapped, &bh->b_state);
931                         bh->b_blocknr = (unsigned long)nio_rep[j].offset;
932
933                         clear_bit(BH_Uptodate, &bh->b_state);
934
935                         ll_rw_block(READ, 1, &bh);
936                 } else {
937                         bh = page->buffers;
938
939                         /* if buffer already existed, it must be the
940                          * one we mapped before, check it */
941                         LASSERT(!test_bit(BH_New, &bh->b_state));
942                         LASSERT(test_bit(BH_Mapped, &bh->b_state));
943                         LASSERT(bh->b_blocknr ==
944                                 (unsigned long)nio_rep[j].offset);
945
946                         /* wait it's io completion */
947                         if (test_bit(BH_Lock, &bh->b_state))
948                                 wait_on_buffer(bh);
949
950                         if (!test_bit(BH_Uptodate, &bh->b_state))
951                                 ll_rw_block(READ, 1, &bh);
952                 }
953
954
955                 /* must do syncronous write here */
956                 wait_on_buffer(bh);
957                 if (!buffer_uptodate(bh)) {
958                         /* I/O error */
959                         rc = -EIO;
960                         goto out_unmap;
961                 }
962         }
963
964 out_req:
965         ptlrpc_req_finished(request);
966         RETURN(rc);
967
968 out_unmap:
969         /* Clean up on error. */
970         while (mapped-- > 0)
971                 kunmap(pga[mapped].pg);
972
973         obd_kmap_put(page_count);
974
975         goto out_req;
976 }
977
978 static int sanosc_brw_write(struct lustre_handle *conn,
979                             struct lov_stripe_md *md,
980                             obd_count page_count,
981                             struct brw_page *pga,
982                             struct obd_brw_set *set)
983 {
984         struct ptlrpc_request *request = NULL;
985         struct ost_body *body;
986         struct niobuf_remote *remote, *nio_rep;
987         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
988         struct obd_ioobj *iooptr;
989         void *nioptr;
990         ENTRY;
991
992         size[1] = sizeof(struct obd_ioobj);
993         size[2] = page_count * sizeof(*remote);
994
995         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
996                                   3, size, NULL);
997         if (!request)
998                 RETURN(-ENOMEM);
999
1000         body = lustre_msg_buf(request->rq_reqmsg, 0);
1001         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
1002         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
1003         ost_pack_ioo(&iooptr, md, page_count);
1004
1005         /* map pages, and pack request */
1006         obd_kmap_get(page_count, 0);
1007         for (mapped = 0; mapped < page_count; mapped++) {
1008                 LASSERT(PageLocked(pga[mapped].pg));
1009
1010                 kmap(pga[mapped].pg);
1011                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
1012                                 pga[mapped].flag, 0);
1013         }
1014
1015         size[1] = page_count * sizeof(*remote);
1016         request->rq_replen = lustre_msg_size(2, size);
1017
1018         rc = ptlrpc_queue_wait(request);
1019         if (rc)
1020                 GOTO(out_unmap, rc);
1021
1022         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
1023         if (!nioptr)
1024                 GOTO(out_unmap, rc = -EINVAL);
1025
1026         if (request->rq_repmsg->buflens[1] != size[1]) {
1027                 CERROR("buffer length wrong (%d vs. %d)\n",
1028                        request->rq_repmsg->buflens[1], size[1]);
1029                 GOTO(out_unmap, rc = -EINVAL);
1030         }
1031
1032         for (j = 0; j < page_count; j++) {
1033                 ost_unpack_niobuf(&nioptr, &remote);
1034         }
1035
1036         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
1037         nio_rep = (struct niobuf_remote*)nioptr;
1038
1039         /* actual write */
1040         for (j = 0; j < page_count; j++) {
1041                 struct page *page = pga[j].pg;
1042                 struct buffer_head *bh;
1043                 kdev_t dev;
1044
1045                 /* got san device associated */
1046                 LASSERT(class_conn2obd(conn));
1047                 dev = class_conn2obd(conn)->u.cli.cl_sandev;
1048
1049                 if (!page->buffers) {
1050                         create_empty_buffers(page, dev, PAGE_SIZE);
1051                 } else {
1052                         /* checking */
1053                         LASSERT(!test_bit(BH_New, &page->buffers->b_state));
1054                         LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
1055                         LASSERT(page->buffers->b_blocknr ==
1056                                 (unsigned long)nio_rep[j].offset);
1057                 }
1058                 bh = page->buffers;
1059
1060                 LASSERT(bh);
1061
1062                 /* if buffer locked, wait it's io completion */
1063                 if (test_bit(BH_Lock, &bh->b_state))
1064                         wait_on_buffer(bh);
1065
1066                 clear_bit(BH_New, &bh->b_state);
1067                 set_bit(BH_Mapped, &bh->b_state);
1068
1069                 /* override the block nr */
1070                 bh->b_blocknr = (unsigned long)nio_rep[j].offset;
1071
1072                 /* we are about to write it, so set it
1073                  * uptodate/dirty
1074                  * page lock should garentee no race condition here */
1075                 set_bit(BH_Uptodate, &bh->b_state);
1076                 set_bit(BH_Dirty, &bh->b_state);
1077
1078                 ll_rw_block(WRITE, 1, &bh);
1079
1080                 /* must do syncronous write here */
1081                 wait_on_buffer(bh);
1082                 if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
1083                         /* I/O error */
1084                         rc = -EIO;
1085                         goto out_unmap;
1086                 }
1087         }
1088
1089 out_req:
1090         ptlrpc_req_finished(request);
1091         RETURN(rc);
1092
1093 out_unmap:
1094         /* Clean up on error. */
1095         while (mapped-- > 0)
1096                 kunmap(pga[mapped].pg);
1097
1098         obd_kmap_put(page_count);
1099
1100         goto out_req;
1101 }
1102 #else
1103 static int sanosc_brw_read(struct lustre_handle *conn,
1104                            struct lov_stripe_md *md,
1105                            obd_count page_count,
1106                            struct brw_page *pga,
1107                            struct obd_brw_set *set)
1108 {
1109         LBUG();
1110         return 0;
1111 }
1112
1113 static int sanosc_brw_write(struct lustre_handle *conn,
1114                             struct lov_stripe_md *md,
1115                             obd_count page_count,
1116                             struct brw_page *pga,
1117                             struct obd_brw_set *set)
1118 {
1119         LBUG();
1120         return 0;
1121 }
1122 #endif
1123
1124 static int sanosc_brw(int cmd, struct lustre_handle *conn,
1125                       struct lov_stripe_md *md, obd_count page_count,
1126                       struct brw_page *pga, struct obd_brw_set *set,
1127                       struct obd_trans_info *oti)
1128 {
1129         ENTRY;
1130
1131         while (page_count) {
1132                 obd_count pages_per_brw;
1133                 int rc;
1134
1135                 if (page_count > OSC_BRW_MAX_IOV)
1136                         pages_per_brw = OSC_BRW_MAX_IOV;
1137                 else
1138                         pages_per_brw = page_count;
1139
1140                 if (cmd & OBD_BRW_WRITE)
1141                         rc = sanosc_brw_write(conn, md, pages_per_brw,
1142                                               pga, set);
1143                 else
1144                         rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set);
1145
1146                 if (rc != 0)
1147                         RETURN(rc);
1148
1149                 page_count -= pages_per_brw;
1150                 pga += pages_per_brw;
1151         }
1152         RETURN(0);
1153 }
1154 #endif
1155
1156 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
1157                        struct lustre_handle *parent_lock,
1158                        __u32 type, void *extentp, int extent_len, __u32 mode,
1159                        int *flags, void *callback, void *data, int datalen,
1160                        struct lustre_handle *lockh)
1161 {
1162         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1163         struct obd_device *obddev = class_conn2obd(connh);
1164         struct ldlm_extent *extent = extentp;
1165         int rc;
1166         ENTRY;
1167
1168         /* Filesystem locks are given a bit of special treatment: if
1169          * this is not a file size lock (which has end == -1), we
1170          * fixup the lock to start and end on page boundaries. */
1171         if (extent->end != OBD_OBJECT_EOF) {
1172                 extent->start &= PAGE_MASK;
1173                 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
1174         }
1175
1176         /* Next, search for already existing extent locks that will cover us */
1177         rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type, extent,
1178                              sizeof(extent), mode, lockh);
1179         if (rc == 1)
1180                 /* We already have a lock, and it's referenced */
1181                 RETURN(ELDLM_OK);
1182
1183         /* If we're trying to read, we also search for an existing PW lock.  The
1184          * VFS and page cache already protect us locally, so lots of readers/
1185          * writers can share a single PW lock.
1186          *
1187          * There are problems with conversion deadlocks, so instead of
1188          * converting a read lock to a write lock, we'll just enqueue a new
1189          * one.
1190          *
1191          * At some point we should cancel the read lock instead of making them
1192          * send us a blocking callback, but there are problems with canceling
1193          * locks out from other users right now, too. */
1194
1195         if (mode == LCK_PR) {
1196                 rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type,
1197                                      extent, sizeof(extent), LCK_PW, lockh);
1198                 if (rc == 1) {
1199                         /* FIXME: This is not incredibly elegant, but it might
1200                          * be more elegant than adding another parameter to
1201                          * lock_match.  I want a second opinion. */
1202                         ldlm_lock_addref(lockh, LCK_PR);
1203                         ldlm_lock_decref(lockh, LCK_PW);
1204
1205                         RETURN(ELDLM_OK);
1206                 }
1207         }
1208
1209         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
1210                               res_id, type, extent, sizeof(extent), mode, flags,
1211                               ldlm_completion_ast, callback, data, NULL,
1212                               lockh);
1213         RETURN(rc);
1214 }
1215
1216 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
1217                       __u32 mode, struct lustre_handle *lockh)
1218 {
1219         ENTRY;
1220
1221         ldlm_lock_decref(lockh, mode);
1222
1223         RETURN(0);
1224 }
1225
1226 static int osc_cancel_unused(struct lustre_handle *connh,
1227                              struct lov_stripe_md *lsm, int flags)
1228 {
1229         struct obd_device *obddev = class_conn2obd(connh);
1230         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
1231
1232         return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags);
1233 }
1234
1235 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1236 {
1237         struct ptlrpc_request *request;
1238         int rc, size = sizeof(*osfs);
1239         ENTRY;
1240
1241         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
1242                                   NULL);
1243         if (!request)
1244                 RETURN(-ENOMEM);
1245
1246         request->rq_replen = lustre_msg_size(1, &size);
1247
1248         rc = ptlrpc_queue_wait(request);
1249         if (rc) {
1250                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
1251                 GOTO(out, rc);
1252         }
1253
1254         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
1255
1256         EXIT;
1257  out:
1258         ptlrpc_req_finished(request);
1259         return rc;
1260 }
1261
1262 /* Retrieve object striping information.
1263  *
1264  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
1265  * the maximum number of OST indices which will fit in the user buffer.
1266  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
1267  */
1268 static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1269                          struct lov_mds_md *lmmu)
1270 {
1271         struct lov_mds_md lmm, *lmmk;
1272         int rc, lmm_size;
1273         ENTRY;
1274
1275         if (!lsm)
1276                 RETURN(-ENODATA);
1277
1278         rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
1279         if (rc)
1280                 RETURN(-EFAULT);
1281
1282         if (lmm.lmm_magic != LOV_MAGIC)
1283                 RETURN(-EINVAL);
1284
1285         if (lmm.lmm_ost_count < 1)
1286                 RETURN(-EOVERFLOW);
1287
1288         lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
1289         OBD_ALLOC(lmmk, lmm_size);
1290         if (rc < 0)
1291                 RETURN(rc);
1292
1293         lmmk->lmm_stripe_count = 1;
1294         lmmk->lmm_ost_count = 1;
1295         lmmk->lmm_object_id = lsm->lsm_object_id;
1296         lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
1297
1298         if (copy_to_user(lmmu, lmmk, lmm_size))
1299                 rc = -EFAULT;
1300
1301         OBD_FREE(lmmk, lmm_size);
1302
1303         RETURN(rc);
1304 }
1305
1306 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1307                          void *karg, void *uarg)
1308 {
1309         struct obd_device *obddev = class_conn2obd(conn);
1310         struct obd_ioctl_data *data = karg;
1311         int err = 0;
1312         ENTRY;
1313
1314         switch (cmd) {
1315 #if 0
1316         case IOC_LDLM_TEST: {
1317                 err = ldlm_test(obddev, conn);
1318                 CERROR("-- done err %d\n", err);
1319                 GOTO(out, err);
1320         }
1321         case IOC_LDLM_REGRESS_START: {
1322                 unsigned int numthreads = 1;
1323                 unsigned int numheld = 10;
1324                 unsigned int numres = 10;
1325                 unsigned int numext = 10;
1326                 char *parse;
1327
1328                 if (data->ioc_inllen1) {
1329                         parse = data->ioc_inlbuf1;
1330                         if (*parse != '\0') {
1331                                 while(isspace(*parse)) parse++;
1332                                 numthreads = simple_strtoul(parse, &parse, 0);
1333                                 while(isspace(*parse)) parse++;
1334                         }
1335                         if (*parse != '\0') {
1336                                 while(isspace(*parse)) parse++;
1337                                 numheld = simple_strtoul(parse, &parse, 0);
1338                                 while(isspace(*parse)) parse++;
1339                         }
1340                         if (*parse != '\0') {
1341                                 while(isspace(*parse)) parse++;
1342                                 numres = simple_strtoul(parse, &parse, 0);
1343                                 while(isspace(*parse)) parse++;
1344                         }
1345                         if (*parse != '\0') {
1346                                 while(isspace(*parse)) parse++;
1347                                 numext = simple_strtoul(parse, &parse, 0);
1348                                 while(isspace(*parse)) parse++;
1349                         }
1350                 }
1351
1352                 err = ldlm_regression_start(obddev, conn, numthreads,
1353                                 numheld, numres, numext);
1354
1355                 CERROR("-- done err %d\n", err);
1356                 GOTO(out, err);
1357         }
1358         case IOC_LDLM_REGRESS_STOP: {
1359                 err = ldlm_regression_stop();
1360                 CERROR("-- done err %d\n", err);
1361                 GOTO(out, err);
1362         }
1363 #endif
1364         case IOC_OSC_REGISTER_LOV: {
1365                 if (obddev->u.cli.cl_containing_lov)
1366                         GOTO(out, err = -EALREADY);
1367                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
1368                 GOTO(out, err);
1369         }
1370         case OBD_IOC_LOV_GET_CONFIG: {
1371                 char *buf;
1372                 struct lov_desc *desc;
1373                 struct obd_uuid uuid;
1374
1375                 buf = NULL;
1376                 len = 0;
1377                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1378                         GOTO(out, err = -EINVAL);
1379
1380                 data = (struct obd_ioctl_data *)buf;
1381
1382                 if (sizeof(*desc) > data->ioc_inllen1) {
1383                         OBD_FREE(buf, len);
1384                         GOTO(out, err = -EINVAL);
1385                 }
1386
1387                 if (data->ioc_inllen2 < sizeof(uuid)) {
1388                         OBD_FREE(buf, len);
1389                         GOTO(out, err = -EINVAL);
1390                 }
1391
1392                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1393                 desc->ld_tgt_count = 1;
1394                 desc->ld_active_tgt_count = 1;
1395                 desc->ld_default_stripe_count = 1;
1396                 desc->ld_default_stripe_size = 0;
1397                 desc->ld_default_stripe_offset = 0;
1398                 desc->ld_pattern = 0;
1399                 memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
1400
1401                 memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
1402
1403                 err = copy_to_user((void *)uarg, buf, len);
1404                 if (err)
1405                         err = -EFAULT;
1406                 OBD_FREE(buf, len);
1407                 GOTO(out, err);
1408         }
1409         case LL_IOC_LOV_SETSTRIPE:
1410                 err = obd_alloc_memmd(conn, karg);
1411                 if (err > 0)
1412                         err = 0;
1413                 GOTO(out, err);
1414         case LL_IOC_LOV_GETSTRIPE:
1415                 err = osc_getstripe(conn, karg, uarg);
1416                 GOTO(out, err);
1417         default:
1418                 CERROR ("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
1419                 GOTO(out, err = -ENOTTY);
1420         }
1421 out:
1422         return err;
1423 }
1424
1425 static void set_osc_active(struct obd_import *imp, int active)
1426 {
1427         struct obd_device *notify_obd;
1428
1429         LASSERT(imp->imp_obd);
1430
1431         notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
1432
1433         if (notify_obd == NULL)
1434                 return;
1435
1436         /* How gross is _this_? */
1437         if (!list_empty(&notify_obd->obd_exports)) {
1438                 int rc;
1439                 struct lustre_handle fakeconn;
1440                 struct obd_ioctl_data ioc_data = { 0 };
1441                 struct obd_export *exp =
1442                         list_entry(notify_obd->obd_exports.next,
1443                                    struct obd_export, exp_obd_chain);
1444
1445                 fakeconn.addr = (__u64)(unsigned long)exp;
1446                 fakeconn.cookie = exp->exp_cookie;
1447                 ioc_data.ioc_inlbuf1 =
1448                         (char *)&imp->imp_obd->u.cli.cl_target_uuid;
1449                 ioc_data.ioc_offset = active;
1450                 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
1451                                    sizeof ioc_data, &ioc_data, NULL);
1452                 if (rc)
1453                         CERROR("error disabling %s on LOV %p/%s: %d\n",
1454                                imp->imp_obd->u.cli.cl_target_uuid.uuid,
1455                                notify_obd, notify_obd->obd_uuid.uuid, rc);
1456         } else {
1457                 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
1458                        "%p\n", notify_obd, notify_obd->obd_uuid.uuid,
1459                        imp->imp_obd->obd_uuid.uuid);
1460         }
1461 }
1462
1463 static int osc_recover(struct obd_import *imp, int phase)
1464 {
1465         int rc;
1466         unsigned long flags;
1467         int msg_flags;
1468         struct ptlrpc_request *req;
1469         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
1470         ENTRY;
1471
1472         CDEBUG(D_HA, "%s: entering phase: %d\n",
1473                imp->imp_obd->obd_name, phase);
1474         switch(phase) {
1475
1476             case PTLRPC_RECOVD_PHASE_PREPARE: {
1477                 if (imp->imp_flags & IMP_REPLAYABLE) {
1478                         CDEBUG(D_HA, "failover OST\n");
1479                         /* If we're a failover OSC/OST, just cancel unused
1480                          * locks to simplify lock replay.
1481                          */
1482                         ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
1483                 } else {
1484                         CDEBUG(D_HA, "non-failover OST\n");
1485                         /* Non-failover OSTs (LLNL scenario) disable the OSC
1486                          * and invalidate local state.
1487                          */
1488                         ldlm_namespace_cleanup(ns, 1 /* no network ops */);
1489                         ptlrpc_abort_inflight(imp, 0);
1490                         set_osc_active(imp, 0 /* inactive */);
1491                 }
1492                 RETURN(0);
1493             }
1494
1495         case PTLRPC_RECOVD_PHASE_RECOVER: {
1496         reconnect:
1497                 imp->imp_flags &= ~IMP_INVALID;
1498                 rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
1499
1500                 msg_flags = req->rq_repmsg
1501                         ? lustre_msg_get_op_flags(req->rq_repmsg)
1502                         : 0;
1503
1504                 if (rc == -EBUSY && (msg_flags & MSG_CONNECT_RECOVERING))
1505                         CERROR("reconnect denied by recovery; should retry\n");
1506
1507                 if (rc) {
1508                         if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
1509                                 CERROR("can't reconnect, invalidating\n");
1510                                 ldlm_namespace_cleanup(ns, 1);
1511                                 ptlrpc_abort_inflight(imp, 0);
1512                         }
1513                         imp->imp_flags |= IMP_INVALID;
1514                         ptlrpc_req_finished(req);
1515                         RETURN(rc);
1516                 }
1517
1518                 if (msg_flags & MSG_CONNECT_RECOVERING) {
1519                         /* Replay if they want it. */
1520                         DEBUG_REQ(D_HA, req, "OST wants replay");
1521                         rc = ptlrpc_replay(imp);
1522                         if (rc)
1523                                 GOTO(check_rc, rc);
1524
1525                         rc = ldlm_replay_locks(imp);
1526                         if (rc)
1527                                 GOTO(check_rc, rc);
1528
1529                         rc = signal_completed_replay(imp);
1530                         if (rc)
1531                                 GOTO(check_rc, rc);
1532                 } else if (msg_flags & MSG_CONNECT_RECONNECT) {
1533                         DEBUG_REQ(D_HA, req, "reconnecting to MDS\n");
1534                         /* Nothing else to do here. */
1535                 } else {
1536                         DEBUG_REQ(D_HA, req, "evicted: invalidating\n");
1537                         /* Otherwise, clean everything up. */
1538                         ldlm_namespace_cleanup(ns, 1);
1539                         ptlrpc_abort_inflight(imp, 0);
1540                 }
1541
1542                 ptlrpc_req_finished(req);
1543
1544                 spin_lock_irqsave(&imp->imp_lock, flags);
1545                 imp->imp_level = LUSTRE_CONN_FULL;
1546                 imp->imp_flags &= ~IMP_INVALID;
1547                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1548
1549                 /* Is this the right place?  Should we do this in _PREPARE
1550                  * as well?  What about raising the level right away?
1551                  */
1552                 ptlrpc_wake_delayed(imp);
1553
1554                 rc = ptlrpc_resend(imp);
1555                 if (rc)
1556                         GOTO(check_rc, rc);
1557
1558                 set_osc_active(imp, 1 /* active */);
1559                 RETURN(0);
1560
1561         check_rc:
1562                 /* If we get disconnected in the middle, recovery has probably
1563                  * failed.  Reconnect and find out.
1564                  */
1565                 if (rc == -ENOTCONN)
1566                         goto reconnect;
1567                 RETURN(rc);
1568         }
1569             case PTLRPC_RECOVD_PHASE_NOTCONN:
1570                 osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
1571                 RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));
1572
1573             default:
1574                 RETURN(-EINVAL);
1575         }
1576 }
1577
1578 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
1579                        struct obd_uuid *cluuid, struct recovd_obd *recovd,
1580                        ptlrpc_recovery_cb_t recover)
1581 {
1582         struct obd_import *imp = &obd->u.cli.cl_import;
1583         imp->imp_recover = osc_recover;
1584         return client_obd_connect(conn, obd, cluuid, recovd, recover);
1585 }
1586
1587 struct obd_ops osc_obd_ops = {
1588         o_owner:        THIS_MODULE,
1589         o_attach:       osc_attach,
1590         o_detach:       osc_detach,
1591         o_setup:        client_obd_setup,
1592         o_cleanup:      client_obd_cleanup,
1593         o_connect:      osc_connect,
1594         o_disconnect:   client_obd_disconnect,
1595         o_statfs:       osc_statfs,
1596         o_packmd:       osc_packmd,
1597         o_unpackmd:     osc_unpackmd,
1598         o_create:       osc_create,
1599         o_destroy:      osc_destroy,
1600         o_getattr:      osc_getattr,
1601         o_setattr:      osc_setattr,
1602         o_open:         osc_open,
1603         o_close:        osc_close,
1604         o_brw:          osc_brw,
1605         o_punch:        osc_punch,
1606         o_enqueue:      osc_enqueue,
1607         o_cancel:       osc_cancel,
1608         o_cancel_unused: osc_cancel_unused,
1609         o_iocontrol:    osc_iocontrol
1610 };
1611
1612 struct obd_ops sanosc_obd_ops = {
1613         o_owner:        THIS_MODULE,
1614         o_attach:       osc_attach,
1615         o_detach:       osc_detach,
1616         o_cleanup:      client_obd_cleanup,
1617         o_connect:      osc_connect,
1618         o_disconnect:   client_obd_disconnect,
1619         o_statfs:       osc_statfs,
1620         o_packmd:       osc_packmd,
1621         o_unpackmd:     osc_unpackmd,
1622         o_create:       osc_create,
1623         o_destroy:      osc_destroy,
1624         o_getattr:      osc_getattr,
1625         o_setattr:      osc_setattr,
1626         o_open:         osc_open,
1627         o_close:        osc_close,
1628 #ifdef __KERNEL__
1629         o_setup:        client_sanobd_setup,
1630         o_brw:          sanosc_brw,
1631 #endif
1632         o_punch:        osc_punch,
1633         o_enqueue:      osc_enqueue,
1634         o_cancel:       osc_cancel,
1635         o_cancel_unused: osc_cancel_unused,
1636         o_iocontrol:    osc_iocontrol,
1637 };
1638
1639 int __init osc_init(void)
1640 {
1641         struct lprocfs_static_vars lvars;
1642         int rc;
1643         ENTRY;
1644
1645         LASSERT(sizeof(struct osc_obdo_data) <= FD_OSTDATA_SIZE);
1646
1647         lprocfs_init_vars(&lvars);
1648
1649         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
1650                                  LUSTRE_OSC_NAME);
1651         if (rc)
1652                 RETURN(rc);
1653
1654         rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
1655                                  LUSTRE_SANOSC_NAME);
1656         if (rc)
1657                 class_unregister_type(LUSTRE_OSC_NAME);
1658
1659         RETURN(rc);
1660 }
1661
1662 static void __exit osc_exit(void)
1663 {
1664         class_unregister_type(LUSTRE_SANOSC_NAME);
1665         class_unregister_type(LUSTRE_OSC_NAME);
1666 }
1667
#ifdef __KERNEL__
/* Kernel module metadata, plus the hooks that run osc_init() on load and
 * osc_exit() on unload.  None of this is compiled outside the kernel. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);
#endif