/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
 *   Author Peter Braam <braam@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
 *
 */

#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_OSC

#include <linux/version.h>
#include <linux/module.h>
#include <linux/mm.h>
#include <linux/highmem.h>
#include <linux/lustre_dlm.h>
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
#include <linux/workqueue.h>
#endif
#include <linux/kp30.h>
#include <linux/lustre_mds.h> /* for mds_objid */
#include <linux/obd_ost.h>
#include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
#include <linux/ctype.h>
#include <linux/init.h>
#include <linux/lustre_ha.h>
#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
#include <linux/lustre_lite.h> /* for ll_i2info */
#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
#include <linux/lprocfs_status.h>

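/* lprocfs hooks: publish this OSC's /proc variables on attach and remove
 * them again on detach. */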
static int osc_attach(struct obd_device *dev, obd_count len, void *data)
{
        struct lprocfs_static_vars lvars;

        lprocfs_init_vars(&lvars);
        return lprocfs_obd_attach(dev, lvars.obd_vars);
}

static int osc_detach(struct obd_device *dev)
{
        return lprocfs_obd_detach(dev);
}

/* Pack OSC object metadata for shipment to the MDS. */
static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }
        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
        }

        RETURN(lmm_size);
}

static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm)
{
        int lsm_size;
        ENTRY;

        lsm_size = sizeof(**lsmp);
        if (!lsmp)
                RETURN(lsm_size);

        if (*lsmp && !lmm) {
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (!*lsmp) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (!*lsmp)
                        RETURN(-ENOMEM);
        }

        /* XXX endianness */
        if (lmm) {
                (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        RETURN(lsm_size);
}

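/* Copy the server-assigned transaction number from a reply message (if any)
 * into the caller's obd_trans_info. */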
inline void oti_from_request(struct obd_trans_info *oti,
                             struct ptlrpc_request *req)
{
        if (oti && req->rq_repmsg)
                oti->oti_transno = NTOH__u64(req->rq_repmsg->transno);
        EXIT;
}

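/* The simple attribute RPCs below (getattr/open/close/setattr) all follow the
 * same pattern: pack the obdo into an ost_body, queue the request
 * synchronously, and copy the returned obdo back to the caller. */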
static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *md)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
                                  &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

static int osc_open(struct lustre_handle *conn, struct obdo *oa,
                    struct lov_stripe_md *md, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

#warning FIXME: request->rq_flags |= PTL_RPC_FL_REPLAY;
        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        if (oa)
                memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

static int osc_close(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        if (oa)
                memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *md, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
                                  &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);

        ptlrpc_req_finished(request);
        return rc;
}

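/* Create an object on the OST.  If the caller did not supply a stripe MD we
 * allocate one here; the object id returned by the server is recorded in the
 * MD along with the reply's transaction number.  On failure any MD we
 * allocated locally is freed again. */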
static int osc_create(struct lustre_handle *conn, struct obdo *oa,
                      struct lov_stripe_md **ea, struct obd_trans_info *oti_in)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        struct obd_trans_info *oti, trans_info;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(conn, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        if (oti_in)
                oti = oti_in;
        else
                oti = &trans_info;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
                                  NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_stripe_count = 0;
        *ea = lsm;

        oti_from_request(oti, request);
        CDEBUG(D_HA, "transno: "LPD64"\n", oti->oti_transno);
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        if (rc && !*ea)
                obd_free_memmd(conn, &lsm);
        return rc;
}

static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md, obd_size start,
                     obd_size end, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = HTON__u64(start);
        body->oa.o_blocks = HTON__u64(end);
        body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
                                  &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc)
                GOTO(out, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

/* Our bulk-unmapping bottom half. */
static void unmap_and_decref_bulk_desc(void *data)
{
        struct ptlrpc_bulk_desc *desc = data;
        struct list_head *tmp;
        ENTRY;

        /* This feels wrong to me. */
        list_for_each(tmp, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                kunmap(bulk->bp_page);
                obd_kmap_put(1);
        }

        ptlrpc_bulk_decref(desc);
        EXIT;
}


/*  this is the callback function which is invoked by the Portals
 *  event handler associated with the bulk_sink queue and bulk_source queue.
 */
static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
{
        ENTRY;

        LASSERT(desc->bd_brw_set != NULL);
        LASSERT(desc->bd_brw_set->brw_callback != NULL);

        desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);

        /* We can't kunmap the desc from interrupt context, so we do it from
         * the bottom half above. */
        prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
        schedule_work(&desc->bd_queue);

        EXIT;
}

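/* Read path: pack an OST_READ request describing page_count pages, kmap each
 * destination page into a bulk descriptor, register the bulk with Portals
 * before the request goes out (the reply can race with the bulk data), then
 * queue the request and wait.  The pages are unmapped later by the bottom
 * half above once the bulk completes. */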
static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                        obd_count page_count, struct brw_page *pga,
                        struct obd_brw_set *set)
{
        struct obd_import *imp = class_conn2cliimp(conn);
        struct ptlrpc_connection *connection = imp->imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        unsigned long flags;
        struct obd_ioobj *iooptr;
        void *nioptr;
        __u32 xid;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(struct niobuf_remote);

        request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);

        desc = ptlrpc_prep_bulk(connection);
        if (!desc)
                GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OST_BULK_PORTAL;
        desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
        CDEBUG(D_PAGE, "desc = %p\n", desc);

        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, lsm, page_count);
        /* end almost identical to brw_write case */

        spin_lock_irqsave(&imp->imp_lock, flags);
        xid = ++imp->imp_last_xid;       /* single xid for all pages */
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        obd_kmap_get(page_count, 0);

        for (mapped = 0; mapped < page_count; mapped++) {
                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                if (bulk == NULL)
                        GOTO(out_unmap, rc = -ENOMEM);

                bulk->bp_xid = xid;           /* single xid for all pages */

                bulk->bp_buf = kmap(pga[mapped].pg);
                bulk->bp_page = pga[mapped].pg;
                bulk->bp_buflen = PAGE_SIZE;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, bulk->bp_xid);
        }

        /*
         * Register the bulk first, because the reply could arrive out of order,
         * and we want to be ready for the bulk data.
         *
         * One reference is released when brw_finish is complete, the other when
         * the caller removes us from the "set" list.
         *
         * On error, we never do the brw_finish, so we handle all decrefs.
         */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
                CERROR("obd_fail_loc=%x, skipping register_bulk\n",
                       OBD_FAIL_OSC_BRW_READ_BULK);
        } else {
                rc = ptlrpc_register_bulk_put(desc);
                if (rc)
                        GOTO(out_unmap, rc);
                obd_brw_set_add(set, desc);
        }

        request->rq_replen = lustre_msg_size(1, size);
        rc = ptlrpc_queue_wait(request);

        /*
         * XXX: If there is an error during the processing of the callback,
         *      such as a timeout in a sleep that it performs, brw_finish
         *      will never get called, and we'll leak the desc, fail to kunmap
         *      things, cats will live with dogs.  One solution would be to
         *      export brw_finish as osc_brw_finish, so that the timeout case
         *      and its kin could call it for proper cleanup.  An alternative
         *      would be for an error return from the callback to cause us to
         *      clean up, but that doesn't help the truly async cases (like
         *      LOV), which will immediately return from their PHASE_START
         *      callback, before any such cleanup-requiring error condition can
         *      be detected.
         */
 out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);

        /* Clean up on error. */
out_unmap:
        while (mapped-- > 0)
                kunmap(pga[mapped].pg);
        obd_kmap_put(page_count);
        ptlrpc_bulk_decref(desc);
        goto out_req;
}

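/* Write path: identical in structure to osc_brw_read above, except that the
 * request is OST_WRITE, the bulk uses OSC_BULK_PORTAL, and the descriptor is
 * registered for the server to fetch the pages (get) rather than deliver
 * them (put). */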
static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                         obd_count page_count, struct brw_page *pga,
                         struct obd_brw_set *set, struct obd_trans_info *oti)
{
        struct obd_import *imp = class_conn2cliimp(conn);
        struct ptlrpc_connection *connection = imp->imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        int rc, size[3] = {sizeof(*body)}, mapped = 0;
        unsigned long flags;
        struct obd_ioobj *iooptr;
        void *nioptr;
        __u32 xid;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(struct niobuf_remote);

        request = ptlrpc_prep_req(imp, OST_WRITE, 3, size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);

        desc = ptlrpc_prep_bulk(connection);
        if (!desc)
                GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OSC_BULK_PORTAL;
        desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
        CDEBUG(D_PAGE, "desc = %p\n", desc);

        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, lsm, page_count);
        /* end almost identical to brw_read case */

        spin_lock_irqsave(&imp->imp_lock, flags);
        xid = ++imp->imp_last_xid;       /* single xid for all pages */
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        obd_kmap_get(page_count, 0);

        for (mapped = 0; mapped < page_count; mapped++) {
                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                if (bulk == NULL)
                        GOTO(out_unmap, rc = -ENOMEM);

                bulk->bp_xid = xid;           /* single xid for all pages */

                bulk->bp_buf = kmap(pga[mapped].pg);
                bulk->bp_page = pga[mapped].pg;
                bulk->bp_buflen = PAGE_SIZE;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, bulk->bp_xid);
        }

        /*
         * Register the bulk first, because the reply could arrive out of
         * order, and we want to be ready for the bulk data.
         *
         * One reference is released when brw_finish is complete, the other
         * when the caller removes us from the "set" list.
         *
         * On error, we never do the brw_finish, so we handle all decrefs.
         */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) {
                CERROR("obd_fail_loc=%x, skipping register_bulk\n",
                       OBD_FAIL_OSC_BRW_WRITE_BULK);
        } else {
                rc = ptlrpc_register_bulk_get(desc);
                if (rc)
                        GOTO(out_unmap, rc);
                obd_brw_set_add(set, desc);
        }

        request->rq_replen = lustre_msg_size(1, size);
        rc = ptlrpc_queue_wait(request);

        /*
         * XXX: If there is an error during the processing of the callback,
         *      such as a timeout in a sleep that it performs, brw_finish
         *      will never get called, and we'll leak the desc, fail to kunmap
         *      things, cats will live with dogs.  One solution would be to
         *      export brw_finish as osc_brw_finish, so that the timeout case
         *      and its kin could call it for proper cleanup.  An alternative
         *      would be for an error return from the callback to cause us to
         *      clean up, but that doesn't help the truly async cases (like
         *      LOV), which will immediately return from their PHASE_START
         *      callback, before any such cleanup-requiring error condition can
         *      be detected.
         */
 out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);

        /* Clean up on error. */
out_unmap:
        while (mapped-- > 0)
                kunmap(pga[mapped].pg);
        obd_kmap_put(page_count);
        ptlrpc_bulk_decref(desc);
        goto out_req;
}

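/* Split a brw request into chunks of at most PTL_MD_MAX_IOV pages and issue
 * each chunk as a separate read or write RPC, stopping at the first error. */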
static int osc_brw(int cmd, struct lustre_handle *conn,
                   struct lov_stripe_md *md, obd_count page_count,
                   struct brw_page *pga, struct obd_brw_set *set,
                   struct obd_trans_info *oti)
{
        ENTRY;

        while (page_count) {
                obd_count pages_per_brw;
                int rc;

                if (page_count > PTL_MD_MAX_IOV)
                        pages_per_brw = PTL_MD_MAX_IOV;
                else
                        pages_per_brw = page_count;

                if (cmd & OBD_BRW_WRITE)
                        rc = osc_brw_write(conn, md, pages_per_brw, pga,
                                           set, oti);
                else
                        rc = osc_brw_read(conn, md, pages_per_brw, pga, set);

                if (rc != 0)
                        RETURN(rc);

                page_count -= pages_per_brw;
                pga += pages_per_brw;
        }
        RETURN(0);
}

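/* Enqueue an extent lock on the object.  Before going to the server we round
 * the extent out to page boundaries and try to match an existing granted lock
 * (including a PW lock when only PR is needed) so cached locks are reused. */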
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                       struct lustre_handle *parent_lock,
                       __u32 type, void *extentp, int extent_len, __u32 mode,
                       int *flags, void *callback, void *data, int datalen,
                       struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
        struct obd_device *obddev = class_conn2obd(connh);
        struct ldlm_extent *extent = extentp;
        int rc;
        ENTRY;

        /* Filesystem locks are given a bit of special treatment: if
         * this is not a file size lock (which has end == -1), we
         * fixup the lock to start and end on page boundaries. */
        if (extent->end != OBD_OBJECT_EOF) {
                extent->start &= PAGE_MASK;
                extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
        }

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type, extent,
                             sizeof(*extent), mode, lockh);
        if (rc == 1)
                /* We already have a lock, and it's referenced */
                RETURN(ELDLM_OK);

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obddev->obd_namespace, 0, &res_id, type,
                                     extent, sizeof(*extent), LCK_PW, lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);

                        RETURN(ELDLM_OK);
                }
        }

        rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
                              res_id, type, extent, sizeof(*extent), mode,
                              flags, ldlm_completion_ast, callback, data, NULL,
                              lockh);
        RETURN(rc);
}

static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh)
{
        ENTRY;

        ldlm_lock_decref(lockh, mode);

        RETURN(0);
}

static int osc_cancel_unused(struct lustre_handle *connh,
                             struct lov_stripe_md *lsm, int flags)
{
        struct obd_device *obddev = class_conn2obd(connh);
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };

        return ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags);
}

static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
{
        struct ptlrpc_request *request;
        int rc, size = sizeof(*osfs);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                         struct lov_mds_md *lmmu)
{
        struct lov_mds_md lmm, *lmmk;
        int rc, lmm_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
        if (rc)
                RETURN(-EFAULT);

        if (lmm.lmm_magic != LOV_MAGIC)
                RETURN(-EINVAL);

        if (lmm.lmm_ost_count < 1)
                RETURN(-EOVERFLOW);

        lmm_size = sizeof(lmm) + sizeof(lmm.lmm_objects[0]);
        OBD_ALLOC(lmmk, lmm_size);
        if (!lmmk)
                RETURN(-ENOMEM);

        lmmk->lmm_stripe_count = 1;
        lmmk->lmm_ost_count = 1;
        lmmk->lmm_object_id = lsm->lsm_object_id;
        lmmk->lmm_objects[0].l_object_id = lsm->lsm_object_id;

        if (copy_to_user(lmmu, lmmk, lmm_size))
                rc = -EFAULT;

        OBD_FREE(lmmk, lmm_size);

        RETURN(rc);
}

static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obddev = class_conn2obd(conn);
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        switch (cmd) {
#if 0
        case IOC_LDLM_TEST: {
                err = ldlm_test(obddev, conn);
                CERROR("-- done err %d\n", err);
                GOTO(out, err);
        }
        case IOC_LDLM_REGRESS_START: {
                unsigned int numthreads = 1;
                unsigned int numheld = 10;
                unsigned int numres = 10;
                unsigned int numext = 10;
                char *parse;

                if (data->ioc_inllen1) {
                        parse = data->ioc_inlbuf1;
                        if (*parse != '\0') {
                                while (isspace(*parse)) parse++;
                                numthreads = simple_strtoul(parse, &parse, 0);
                                while (isspace(*parse)) parse++;
                        }
                        if (*parse != '\0') {
                                while (isspace(*parse)) parse++;
                                numheld = simple_strtoul(parse, &parse, 0);
                                while (isspace(*parse)) parse++;
                        }
                        if (*parse != '\0') {
                                while (isspace(*parse)) parse++;
                                numres = simple_strtoul(parse, &parse, 0);
                                while (isspace(*parse)) parse++;
                        }
                        if (*parse != '\0') {
                                while (isspace(*parse)) parse++;
                                numext = simple_strtoul(parse, &parse, 0);
                                while (isspace(*parse)) parse++;
                        }
                }

                err = ldlm_regression_start(obddev, conn, numthreads,
                                numheld, numres, numext);

                CERROR("-- done err %d\n", err);
                GOTO(out, err);
        }
        case IOC_LDLM_REGRESS_STOP: {
                err = ldlm_regression_stop();
                CERROR("-- done err %d\n", err);
                GOTO(out, err);
        }
#endif
        case IOC_OSC_REGISTER_LOV: {
                if (obddev->u.cli.cl_containing_lov)
                        GOTO(out, err = -EALREADY);
                obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
                GOTO(out, err);
        }
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                if (sizeof(*desc) > data->ioc_inllen1) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid.uuid)) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(desc->ld_uuid.uuid, obddev->obd_uuid.uuid,
                       sizeof(uuid.uuid));

                memcpy(data->ioc_inlbuf2, obddev->obd_uuid.uuid,
                       sizeof(uuid.uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                OBD_FREE(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(conn, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(conn, karg, uarg);
                GOTO(out, err);
        default:
                CERROR("osc_ioctl(): unrecognised ioctl %#x\n", cmd);
                GOTO(out, err = -ENOTTY);
        }
out:
        return err;
}

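/* Tell the containing LOV (if one registered itself via IOC_OSC_REGISTER_LOV)
 * that this OSC's target has become active or inactive, by sending it an
 * IOC_LOV_SET_OSC_ACTIVE ioctl through a faked-up export handle. */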
static void set_osc_active(struct obd_import *imp, int active)
{
        struct obd_device *notify_obd;

        LASSERT(imp->imp_obd);

        notify_obd = imp->imp_obd->u.cli.cl_containing_lov;

        if (notify_obd == NULL)
                return;

        /* How gross is _this_? */
        if (!list_empty(&notify_obd->obd_exports)) {
                int rc;
                struct lustre_handle fakeconn;
                struct obd_ioctl_data ioc_data = { 0 };
                struct obd_export *exp =
                        list_entry(notify_obd->obd_exports.next,
                                   struct obd_export, exp_obd_chain);

                fakeconn.addr = (__u64)(unsigned long)exp;
                fakeconn.cookie = exp->exp_cookie;
                ioc_data.ioc_inlbuf1 = &imp->imp_obd->u.cli.cl_target_uuid;
                ioc_data.ioc_offset = active;
                rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
                                   sizeof ioc_data, &ioc_data, NULL);
                if (rc) {
                        CERROR("disabling %s on LOV %p/%s: %d\n",
                               imp->imp_obd->u.cli.cl_target_uuid.uuid,
                               notify_obd, notify_obd->obd_uuid.uuid, rc);
                }
        } else {
                CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
                       "%p\n", notify_obd, notify_obd->obd_uuid.uuid,
                       imp->imp_obd->obd_uuid.uuid);
        }
}

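/* Recovery hook called by the ptlrpc recovery daemon.  PREPARE invalidates
 * cached locks and in-flight requests and marks the OSC inactive; RECOVER
 * reconnects the import, raises it back to LUSTRE_CONN_FULL, wakes delayed
 * requests and marks the OSC active again; NOTCONN simply runs both phases. */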
static int osc_recover(struct obd_import *imp, int phase)
{
        int rc;
        unsigned long flags;
        struct ptlrpc_request *req;
        ENTRY;

        switch (phase) {

            case PTLRPC_RECOVD_PHASE_PREPARE: {
                struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
                ldlm_namespace_cleanup(ns, 1 /* no network ops */);
                ptlrpc_abort_inflight(imp, 0);
                set_osc_active(imp, 0 /* inactive */);
                RETURN(0);
            }

            case PTLRPC_RECOVD_PHASE_RECOVER:
                imp->imp_flags &= ~IMP_INVALID;
                rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
                ptlrpc_req_finished(req);
                if (rc) {
                        imp->imp_flags |= IMP_INVALID;
                        RETURN(rc);
                }

                spin_lock_irqsave(&imp->imp_lock, flags);
                imp->imp_level = LUSTRE_CONN_FULL;
                spin_unlock_irqrestore(&imp->imp_lock, flags);

                /* Is this the right place?  Should we do this in _PREPARE
                 * as well?  What about raising the level right away?
                 */
                ptlrpc_wake_delayed(imp);

                set_osc_active(imp, 1 /* active */);
                RETURN(0);

            case PTLRPC_RECOVD_PHASE_NOTCONN:
                osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
                RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));

            default:
                RETURN(-EINVAL);
        }
}

static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
                       struct obd_uuid *cluuid, struct recovd_obd *recovd,
                       ptlrpc_recovery_cb_t recover)
{
        struct obd_import *imp = &obd->u.cli.cl_import;
        imp->imp_recover = osc_recover;
        return client_obd_connect(conn, obd, cluuid, recovd, recover);
}

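/* OBD method table for the OSC.  Connection setup and teardown are delegated
 * to the generic client_obd_* helpers; everything else maps onto the osc_*
 * implementations above. */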
struct obd_ops osc_obd_ops = {
        o_owner:        THIS_MODULE,
        o_attach:       osc_attach,
        o_detach:       osc_detach,
        o_setup:        client_obd_setup,
        o_cleanup:      client_obd_cleanup,
        o_connect:      osc_connect,
        o_disconnect:   client_obd_disconnect,
        o_statfs:       osc_statfs,
        o_packmd:       osc_packmd,
        o_unpackmd:     osc_unpackmd,
        o_create:       osc_create,
        o_destroy:      osc_destroy,
        o_getattr:      osc_getattr,
        o_setattr:      osc_setattr,
        o_open:         osc_open,
        o_close:        osc_close,
        o_brw:          osc_brw,
        o_punch:        osc_punch,
        o_enqueue:      osc_enqueue,
        o_cancel:       osc_cancel,
        o_cancel_unused: osc_cancel_unused,
        o_iocontrol:    osc_iocontrol
};

static int __init osc_init(void)
{
        struct lprocfs_static_vars lvars;

        lprocfs_init_vars(&lvars);
        RETURN(class_register_type(&osc_obd_ops, lvars.module_vars,
                                   LUSTRE_OSC_NAME));
}

static void __exit osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);