Whamcloud - gitweb
Land b_recovery (other than WIP lock-replay work).
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although it does not export a full OBD method table (the
24  *  requests are coming in over the wire, so object target modules
25  *  do not have a full method table.)
26  *
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_OSC
31
32 #include <linux/version.h>
33 #include <linux/module.h>
34 #include <linux/mm.h>
35 #include <linux/highmem.h>
36 #include <linux/lustre_dlm.h>
37 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #include <linux/workqueue.h>
39 #endif
40 #include <linux/kp30.h>
41 #include <linux/lustre_mds.h> /* for mds_objid */
42 #include <linux/obd_ost.h>
43 #include <linux/obd_lov.h> /* for IOC_LOV_SET_OSC_ACTIVE */
44 #include <linux/ctype.h>
45 #include <linux/init.h>
46 #include <linux/lustre_ha.h>
47 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
48 #include <linux/lustre_lite.h> /* for ll_i2info */
49 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
50 #include <linux/lprocfs_status.h>
51
52 extern struct lprocfs_vars status_var_nm_1[];
53 extern struct lprocfs_vars status_class_var[];
54
55 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
56 {
57         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
58 }
59
/* Detach method: undo osc_attach() by passing the device to
 * lprocfs_dereg_obd() and returning its result. */
static int osc_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_dereg_obd(dev);
        return rc;
}
64
65 /* Pack OSC object metadata for shipment to the MDS. */
66 static int osc_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
67                       struct lov_stripe_md *lsm)
68 {
69         int lmm_size;
70         ENTRY;
71
72         lmm_size = sizeof(**lmmp);
73         if (!lmmp)
74                 RETURN(lmm_size);
75
76         if (*lmmp && !lsm) {
77                 OBD_FREE(*lmmp, lmm_size);
78                 *lmmp = NULL;
79                 RETURN(0);
80         }
81
82         if (!*lmmp) {
83                 OBD_ALLOC(*lmmp, lmm_size);
84                 if (!*lmmp)
85                         RETURN(-ENOMEM);
86         }
87         if (lsm) {
88                 LASSERT(lsm->lsm_object_id);
89                 (*lmmp)->lmm_object_id = (lsm->lsm_object_id);
90         }
91
92         RETURN(lmm_size);
93 }
94
95 static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
96                         struct lov_mds_md *lmm)
97 {
98         int lsm_size;
99         ENTRY;
100
101         lsm_size = sizeof(**lsmp);
102         if (!lsmp)
103                 RETURN(lsm_size);
104
105         if (*lsmp && !lmm) {
106                 OBD_FREE(*lsmp, lsm_size);
107                 *lsmp = NULL;
108                 RETURN(0);
109         }
110
111         if (!*lsmp) {
112                 OBD_ALLOC(*lsmp, lsm_size);
113                 if (!*lsmp)
114                         RETURN(-ENOMEM);
115         }
116
117         /* XXX endianness */
118         if (lmm) {
119                 (*lsmp)->lsm_object_id = (lmm->lmm_object_id);
120                 LASSERT((*lsmp)->lsm_object_id);
121         }
122
123         RETURN(lsm_size);
124 }
125
126 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
127                        struct lov_stripe_md *md)
128 {
129         struct ptlrpc_request *request;
130         struct ost_body *body;
131         int rc, size = sizeof(*body);
132         ENTRY;
133
134         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
135                                   &size, NULL);
136         if (!request)
137                 RETURN(-ENOMEM);
138
139         body = lustre_msg_buf(request->rq_reqmsg, 0);
140 #warning FIXME: pack only valid fields instead of memcpy, endianness
141         memcpy(&body->oa, oa, sizeof(*oa));
142
143         request->rq_replen = lustre_msg_size(1, &size);
144
145         rc = ptlrpc_queue_wait(request);
146         if (rc) {
147                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
148                 GOTO(out, rc);
149         }
150
151         body = lustre_msg_buf(request->rq_repmsg, 0);
152         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
153         if (oa)
154                 memcpy(oa, &body->oa, sizeof(*oa));
155
156         EXIT;
157  out:
158         ptlrpc_req_finished(request);
159         return rc;
160 }
161
162 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
163                     struct lov_stripe_md *md)
164 {
165         struct ptlrpc_request *request;
166         struct ost_body *body;
167         int rc, size = sizeof(*body);
168         ENTRY;
169
170         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
171                                   NULL);
172         if (!request)
173                 RETURN(-ENOMEM);
174
175         body = lustre_msg_buf(request->rq_reqmsg, 0);
176 #warning FIXME: pack only valid fields instead of memcpy, endianness
177         memcpy(&body->oa, oa, sizeof(*oa));
178
179         request->rq_replen = lustre_msg_size(1, &size);
180
181         rc = ptlrpc_queue_wait(request);
182         if (rc)
183                 GOTO(out, rc);
184
185         body = lustre_msg_buf(request->rq_repmsg, 0);
186         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
187         if (oa)
188                 memcpy(oa, &body->oa, sizeof(*oa));
189
190         EXIT;
191  out:
192         ptlrpc_req_finished(request);
193         return rc;
194 }
195
196 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
197                      struct lov_stripe_md *md)
198 {
199         struct ptlrpc_request *request;
200         struct ost_body *body;
201         int rc, size = sizeof(*body);
202         ENTRY;
203
204         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
205                                   NULL);
206         if (!request)
207                 RETURN(-ENOMEM);
208
209         body = lustre_msg_buf(request->rq_reqmsg, 0);
210 #warning FIXME: pack only valid fields instead of memcpy, endianness
211         memcpy(&body->oa, oa, sizeof(*oa));
212
213         request->rq_replen = lustre_msg_size(1, &size);
214
215         rc = ptlrpc_queue_wait(request);
216         if (rc)
217                 GOTO(out, rc);
218
219         body = lustre_msg_buf(request->rq_repmsg, 0);
220         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
221         if (oa)
222                 memcpy(oa, &body->oa, sizeof(*oa));
223
224         EXIT;
225  out:
226         ptlrpc_req_finished(request);
227         return rc;
228 }
229
230 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
231                        struct lov_stripe_md *md)
232 {
233         struct ptlrpc_request *request;
234         struct ost_body *body;
235         int rc, size = sizeof(*body);
236         ENTRY;
237
238         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
239                                   &size, NULL);
240         if (!request)
241                 RETURN(-ENOMEM);
242
243         body = lustre_msg_buf(request->rq_reqmsg, 0);
244         memcpy(&body->oa, oa, sizeof(*oa));
245
246         request->rq_replen = lustre_msg_size(1, &size);
247
248         rc = ptlrpc_queue_wait(request);
249
250         ptlrpc_req_finished(request);
251         return rc;
252 }
253
254 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
255                       struct lov_stripe_md **ea)
256 {
257         struct ptlrpc_request *request;
258         struct ost_body *body;
259         struct lov_stripe_md *lsm;
260         int rc, size = sizeof(*body);
261         ENTRY;
262
263         LASSERT(oa);
264         LASSERT(ea);
265
266         lsm = *ea;
267         if (!lsm) {
268                 rc = obd_alloc_memmd(conn, &lsm);
269                 if (rc < 0)
270                         RETURN(rc);
271         }
272
273         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
274                                   NULL);
275         if (!request)
276                 GOTO(out, rc = -ENOMEM);
277
278         body = lustre_msg_buf(request->rq_reqmsg, 0);
279         memcpy(&body->oa, oa, sizeof(*oa));
280
281         request->rq_replen = lustre_msg_size(1, &size);
282
283         rc = ptlrpc_queue_wait(request);
284         if (rc)
285                 GOTO(out_req, rc);
286
287         body = lustre_msg_buf(request->rq_repmsg, 0);
288         memcpy(oa, &body->oa, sizeof(*oa));
289
290         lsm->lsm_object_id = oa->o_id;
291         lsm->lsm_stripe_count = 0;
292         *ea = lsm;
293         EXIT;
294 out_req:
295         ptlrpc_req_finished(request);
296 out:
297         if (rc && !*ea)
298                 obd_free_memmd(conn, &lsm);
299         return rc;
300 }
301
302 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
303                      struct lov_stripe_md *md, obd_size start,
304                      obd_size end)
305 {
306         struct ptlrpc_request *request;
307         struct ost_body *body;
308         int rc, size = sizeof(*body);
309         ENTRY;
310
311         if (!oa) {
312                 CERROR("oa NULL\n");
313                 RETURN(-EINVAL);
314         }
315
316         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
317                                   NULL);
318         if (!request)
319                 RETURN(-ENOMEM);
320
321         body = lustre_msg_buf(request->rq_reqmsg, 0);
322 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
323         memcpy(&body->oa, oa, sizeof(*oa));
324
325         /* overload the size and blocks fields in the oa with start/end */
326         body->oa.o_size = HTON__u64(start);
327         body->oa.o_blocks = HTON__u64(end);
328         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
329
330         request->rq_replen = lustre_msg_size(1, &size);
331
332         rc = ptlrpc_queue_wait(request);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = lustre_msg_buf(request->rq_repmsg, 0);
337         memcpy(oa, &body->oa, sizeof(*oa));
338
339         EXIT;
340  out:
341         ptlrpc_req_finished(request);
342         return rc;
343 }
344
345 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
346                        struct lov_stripe_md *ea)
347 {
348         struct ptlrpc_request *request;
349         struct ost_body *body;
350         int rc, size = sizeof(*body);
351         ENTRY;
352
353         if (!oa) {
354                 CERROR("oa NULL\n");
355                 RETURN(-EINVAL);
356         }
357         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
358                                   &size, NULL);
359         if (!request)
360                 RETURN(-ENOMEM);
361
362         body = lustre_msg_buf(request->rq_reqmsg, 0);
363 #warning FIXME: pack only valid fields instead of memcpy, endianness
364         memcpy(&body->oa, oa, sizeof(*oa));
365
366         request->rq_replen = lustre_msg_size(1, &size);
367
368         rc = ptlrpc_queue_wait(request);
369         if (rc)
370                 GOTO(out, rc);
371
372         body = lustre_msg_buf(request->rq_repmsg, 0);
373         memcpy(oa, &body->oa, sizeof(*oa));
374
375         EXIT;
376  out:
377         ptlrpc_req_finished(request);
378         return rc;
379 }
380
381 /* Our bulk-unmapping bottom half. */
382 static void unmap_and_decref_bulk_desc(void *data)
383 {
384         struct ptlrpc_bulk_desc *desc = data;
385         struct list_head *tmp;
386         ENTRY;
387
388         /* This feels wrong to me. */
389         list_for_each(tmp, &desc->bd_page_list) {
390                 struct ptlrpc_bulk_page *bulk;
391                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
392
393                 kunmap(bulk->bp_page);
394                 obd_kmap_put(1);
395         }
396
397         ptlrpc_bulk_decref(desc);
398         EXIT;
399 }
400
401 /*  this is the callback function which is invoked by the Portals
402  *  event handler associated with the bulk_sink queue and bulk_source queue. 
403  */
404 static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
405 {
406         ENTRY;
407
408         LASSERT(desc->bd_brw_set != NULL);
409         LASSERT(desc->bd_brw_set->brw_callback != NULL);
410
411         desc->bd_brw_set->brw_callback(desc->bd_brw_set, CB_PHASE_FINISH);
412
413         /* We can't kunmap the desc from interrupt context, so we do it from
414          * the bottom half above. */
415         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
416         schedule_work(&desc->bd_queue);
417
418         EXIT;
419 }
420
421 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
422                         obd_count page_count, struct brw_page *pga,
423                         struct obd_brw_set *set)
424 {
425         struct obd_import *imp = class_conn2cliimp(conn);
426         struct ptlrpc_connection *connection = imp->imp_connection;
427         struct ptlrpc_request *request = NULL;
428         struct ptlrpc_bulk_desc *desc = NULL;
429         struct ost_body *body;
430         int rc, size[3] = {sizeof(*body)}, mapped = 0;
431         void *iooptr, *nioptr;
432         __u32 xid;
433         ENTRY;
434
435         size[1] = sizeof(struct obd_ioobj);
436         size[2] = page_count * sizeof(struct niobuf_remote);
437
438         request = ptlrpc_prep_req(imp, OST_READ, 3, size, NULL);
439         if (!request)
440                 RETURN(-ENOMEM);
441
442         body = lustre_msg_buf(request->rq_reqmsg, 0);
443
444         desc = ptlrpc_prep_bulk(connection);
445         if (!desc)
446                 GOTO(out_req, rc = -ENOMEM);
447         desc->bd_portal = OST_BULK_PORTAL;
448         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
449         CDEBUG(D_PAGE, "desc = %p\n", desc);
450
451         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
452         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
453         ost_pack_ioo(&iooptr, lsm, page_count);
454         /* end almost identical to brw_write case */
455
456         spin_lock(&imp->imp_lock);
457         xid = ++imp->imp_last_xid;       /* single xid for all pages */
458         spin_unlock(&imp->imp_lock);
459
460         obd_kmap_get(page_count, 0);
461
462         for (mapped = 0; mapped < page_count; mapped++) {
463                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
464                 if (bulk == NULL)
465                         GOTO(out_unmap, rc = -ENOMEM);
466
467                 bulk->bp_xid = xid;           /* single xid for all pages */
468
469                 bulk->bp_buf = kmap(pga[mapped].pg);
470                 bulk->bp_page = pga[mapped].pg;
471                 bulk->bp_buflen = PAGE_SIZE;
472                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
473                                 pga[mapped].flag, bulk->bp_xid);
474         }
475
476         /*
477          * Register the bulk first, because the reply could arrive out of order,
478          * and we want to be ready for the bulk data.
479          *
480          * One reference is released when brw_finish is complete, the other when
481          * the caller removes us from the "set" list.
482          *
483          * On error, we never do the brw_finish, so we handle all decrefs.
484          */
485         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
486                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
487                        OBD_FAIL_OSC_BRW_READ_BULK);
488         } else {
489                 rc = ptlrpc_register_bulk(desc);
490                 if (rc)
491                         GOTO(out_unmap, rc);
492                 obd_brw_set_add(set, desc);
493         }
494
495         request->rq_replen = lustre_msg_size(1, size);
496         rc = ptlrpc_queue_wait(request);
497
498         /*
499          * XXX: If there is an error during the processing of the callback,
500          *      such as a timeout in a sleep that it performs, brw_finish
501          *      will never get called, and we'll leak the desc, fail to kunmap
502          *      things, cats will live with dogs.  One solution would be to
503          *      export brw_finish as osc_brw_finish, so that the timeout case
504          *      and its kin could call it for proper cleanup.  An alternative
505          *      would be for an error return from the callback to cause us to
506          *      clean up, but that doesn't help the truly async cases (like
507          *      LOV), which will immediately return from their PHASE_START
508          *      callback, before any such cleanup-requiring error condition can
509          *      be detected.
510          */
511  out_req:
512         ptlrpc_req_finished(request);
513         RETURN(rc);
514
515         /* Clean up on error. */
516 out_unmap:
517         while (mapped-- > 0)
518                 kunmap(pga[mapped].pg);
519         obd_kmap_put(page_count);
520         ptlrpc_bulk_decref(desc);
521         goto out_req;
522 }
523
524 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
525                          obd_count page_count, struct brw_page *pga,
526                          struct obd_brw_set *set)
527 {
528         struct ptlrpc_connection *connection =
529                 client_conn2cli(conn)->cl_import.imp_connection;
530         struct ptlrpc_request *request = NULL;
531         struct ptlrpc_bulk_desc *desc = NULL;
532         struct ost_body *body;
533         struct niobuf_local *local = NULL;
534         struct niobuf_remote *remote;
535         int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
536         void *iooptr, *nioptr;
537         ENTRY;
538
539         size[1] = sizeof(struct obd_ioobj);
540         size[2] = page_count * sizeof(*remote);
541
542         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
543                                   NULL);
544         if (!request)
545                 RETURN(-ENOMEM);
546
547         body = lustre_msg_buf(request->rq_reqmsg, 0);
548
549         desc = ptlrpc_prep_bulk(connection);
550         if (!desc)
551                GOTO(out_req, rc = -ENOMEM);
552         desc->bd_portal = OSC_BULK_PORTAL;
553         desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
554         CDEBUG(D_PAGE, "desc = %p\n", desc);
555
556         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
557         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
558         ost_pack_ioo(&iooptr, md, page_count);
559         /* end almost identical to brw_read case */
560
561         OBD_ALLOC(local, page_count * sizeof(*local));
562         if (!local)
563                 GOTO(out_desc, rc = -ENOMEM);
564
565         obd_kmap_get(page_count, 0);
566
567         for (mapped = 0; mapped < page_count; mapped++) {
568                 local[mapped].addr = kmap(pga[mapped].pg);
569
570                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
571                        "%d ; page %d of %d\n",
572                        local[mapped].addr, pga[mapped].pg->flags,
573                        page_count(pga[mapped].pg),
574                        mapped, page_count - 1);
575
576                 local[mapped].offset = pga[mapped].off;
577                 local[mapped].len = pga[mapped].count;
578                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
579                                 pga[mapped].flag, 0);
580         }
581
582         size[1] = page_count * sizeof(*remote);
583         request->rq_replen = lustre_msg_size(2, size);
584         rc = ptlrpc_queue_wait(request);
585         if (rc)
586                 GOTO(out_unmap, rc);
587
588         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
589         if (!nioptr)
590                 GOTO(out_unmap, rc = -EINVAL);
591
592         if (request->rq_repmsg->buflens[1] != size[1]) {
593                 CERROR("buffer length wrong (%d vs. %d)\n",
594                        request->rq_repmsg->buflens[1], size[1]);
595                 GOTO(out_unmap, rc = -EINVAL);
596         }
597
598         for (j = 0; j < page_count; j++) {
599                 struct ptlrpc_bulk_page *bulk;
600
601                 ost_unpack_niobuf(&nioptr, &remote);
602
603                 bulk = ptlrpc_prep_bulk_page(desc);
604                 if (!bulk)
605                         GOTO(out_unmap, rc = -ENOMEM);
606
607                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
608                 bulk->bp_buflen = local[j].len;
609                 bulk->bp_xid = remote->xid;
610                 bulk->bp_page = pga[j].pg;
611         }
612
613         if (desc->bd_page_count != page_count)
614                 LBUG();
615
616         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
617                 GOTO(out_unmap, rc = 0);
618
619         OBD_FREE(local, page_count * sizeof(*local));
620
621         /* One reference is released when brw_finish is complete, the other
622          * when the caller removes it from the "set" list. */
623         obd_brw_set_add(set, desc);
624         rc = ptlrpc_send_bulk(desc);
625
626         /* XXX: Mike, same question as in osc_brw_read. */
627 out_req:
628         ptlrpc_req_finished(request);
629         RETURN(rc);
630
631         /* Clean up on error. */
632 out_unmap:
633         while (mapped-- > 0)
634                 kunmap(pga[mapped].pg);
635
636         obd_kmap_put(page_count);
637
638         OBD_FREE(local, page_count * sizeof(*local));
639 out_desc:
640         ptlrpc_bulk_decref(desc);
641         goto out_req;
642 }
643
644 static int osc_brw(int cmd, struct lustre_handle *conn,
645                    struct lov_stripe_md *md, obd_count page_count,
646                    struct brw_page *pga, struct obd_brw_set *set)
647 {
648         ENTRY;
649
650         while (page_count) {
651                 obd_count pages_per_brw;
652                 int rc;
653
654                 if (page_count > PTL_MD_MAX_IOV)
655                         pages_per_brw = PTL_MD_MAX_IOV;
656                 else
657                         pages_per_brw = page_count;
658
659                 if (cmd & OBD_BRW_WRITE)
660                         rc = osc_brw_write(conn, md, pages_per_brw, pga, set);
661                 else
662                         rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
663
664                 if (rc != 0)
665                         RETURN(rc);
666
667                 page_count -= pages_per_brw;
668                 pga += pages_per_brw;
669         }
670         RETURN(0);
671 }
672
673 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
674                        struct lustre_handle *parent_lock,
675                        __u32 type, void *extentp, int extent_len, __u32 mode,
676                        int *flags, void *callback, void *data, int datalen,
677                        struct lustre_handle *lockh)
678 {
679         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
680         struct obd_device *obddev = class_conn2obd(connh);
681         struct ldlm_extent *extent = extentp;
682         int rc;
683         ENTRY;
684
685         /* Filesystem locks are given a bit of special treatment: if
686          * this is not a file size lock (which has end == -1), we
687          * fixup the lock to start and end on page boundaries. */
688         if (extent->end != OBD_OBJECT_EOF) {
689                 extent->start &= PAGE_MASK;
690                 extent->end = (extent->end & PAGE_MASK) + PAGE_SIZE - 1;
691         }
692
693         /* Next, search for already existing extent locks that will cover us */
694         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
695                              sizeof(extent), mode, lockh);
696         if (rc == 1)
697                 /* We already have a lock, and it's referenced */
698                 RETURN(ELDLM_OK);
699
700         /* If we're trying to read, we also search for an existing PW lock.  The
701          * VFS and page cache already protect us locally, so lots of readers/
702          * writers can share a single PW lock.
703          *
704          * There are problems with conversion deadlocks, so instead of
705          * converting a read lock to a write lock, we'll just enqueue a new
706          * one.
707          *
708          * At some point we should cancel the read lock instead of making them
709          * send us a blocking callback, but there are problems with canceling
710          * locks out from other users right now, too. */
711
712         if (mode == LCK_PR) {
713                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
714                                      extent, sizeof(extent), LCK_PW, lockh);
715                 if (rc == 1) {
716                         /* FIXME: This is not incredibly elegant, but it might
717                          * be more elegant than adding another parameter to
718                          * lock_match.  I want a second opinion. */
719                         ldlm_lock_addref(lockh, LCK_PR);
720                         ldlm_lock_decref(lockh, LCK_PW);
721
722                         RETURN(ELDLM_OK);
723                 }
724         }
725
726         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
727                               res_id, type, extent, sizeof(extent), mode, flags,
728                               ldlm_completion_ast, callback, data, datalen,
729                               lockh);
730         RETURN(rc);
731 }
732
733 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
734                       __u32 mode, struct lustre_handle *lockh)
735 {
736         ENTRY;
737
738         ldlm_lock_decref(lockh, mode);
739
740         RETURN(0);
741 }
742
743 static int osc_cancel_unused(struct lustre_handle *connh,
744                              struct lov_stripe_md *lsm, int flags)
745 {
746         struct obd_device *obddev = class_conn2obd(connh);
747         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
748
749         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
750 }
751
752 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
753 {
754         struct ptlrpc_request *request;
755         int rc, size = sizeof(*osfs);
756         ENTRY;
757
758         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
759                                   NULL);
760         if (!request)
761                 RETURN(-ENOMEM);
762
763         request->rq_replen = lustre_msg_size(1, &size);
764
765         rc = ptlrpc_queue_wait(request);
766         if (rc) {
767                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
768                 GOTO(out, rc);
769         }
770
771         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
772
773         EXIT;
774  out:
775         ptlrpc_req_finished(request);
776         return rc;
777 }
778
779 static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
780                          void *karg, void *uarg)
781 {
782         struct obd_device *obddev = class_conn2obd(conn);
783         struct obd_ioctl_data *data = karg;
784         int err = 0;
785         ENTRY;
786
787         switch (cmd) {
788         case IOC_LDLM_TEST: {
789                 err = ldlm_test(obddev, conn);
790                 CERROR("-- done err %d\n", err);
791                 GOTO(out, err);
792         }
793         case IOC_LDLM_REGRESS_START: {
794                 unsigned int numthreads = 1;
795                 unsigned int numheld = 10;
796                 unsigned int numres = 10;
797                 unsigned int numext = 10;
798                 char *parse;
799
800                 if (data->ioc_inllen1) {
801                         parse = data->ioc_inlbuf1;
802                         if (*parse != '\0') {
803                                 while(isspace(*parse)) parse++;
804                                 numthreads = simple_strtoul(parse, &parse, 0);
805                                 while(isspace(*parse)) parse++;
806                         }
807                         if (*parse != '\0') {
808                                 while(isspace(*parse)) parse++;
809                                 numheld = simple_strtoul(parse, &parse, 0);
810                                 while(isspace(*parse)) parse++;
811                         }
812                         if (*parse != '\0') {
813                                 while(isspace(*parse)) parse++;
814                                 numres = simple_strtoul(parse, &parse, 0);
815                                 while(isspace(*parse)) parse++;
816                         }
817                         if (*parse != '\0') {
818                                 while(isspace(*parse)) parse++;
819                                 numext = simple_strtoul(parse, &parse, 0);
820                                 while(isspace(*parse)) parse++;
821                         }
822                 }
823
824                 err = ldlm_regression_start(obddev, conn, numthreads,
825                                 numheld, numres, numext);
826
827                 CERROR("-- done err %d\n", err);
828                 GOTO(out, err);
829         }
830         case IOC_LDLM_REGRESS_STOP: {
831                 err = ldlm_regression_stop();
832                 CERROR("-- done err %d\n", err);
833                 GOTO(out, err);
834         }
835         case IOC_OSC_REGISTER_LOV: {
836                 if (obddev->u.cli.cl_containing_lov)
837                         GOTO(out, err = -EALREADY);
838                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
839                 GOTO(out, err);
840         }
841         case OBD_IOC_LOV_GET_CONFIG: {
842                 char *buf;
843                 struct lov_desc *desc;
844                 obd_uuid_t *uuidp;
845
846                 buf = NULL;
847                 len = 0;
848                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
849                         GOTO(out, err = -EINVAL);
850
851                 data = (struct obd_ioctl_data *)buf;
852
853                 if (sizeof(*desc) > data->ioc_inllen1) {
854                         OBD_FREE(buf, len);
855                         GOTO(out, err = -EINVAL);
856                 }
857
858                 if (data->ioc_inllen2 < sizeof(*uuidp)) {
859                         OBD_FREE(buf, len);
860                         GOTO(out, err = -EINVAL);
861                 }
862
863                 desc = (struct lov_desc *)data->ioc_inlbuf1;
864                 desc->ld_tgt_count = 1;
865                 desc->ld_active_tgt_count = 1;
866                 desc->ld_default_stripe_count = 1;
867                 desc->ld_default_stripe_size = 0;
868                 desc->ld_default_stripe_offset = 0;
869                 desc->ld_pattern = 0;
870                 memcpy(desc->ld_uuid,  obddev->obd_uuid, sizeof(*uuidp));
871
872                 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
873                 memcpy(uuidp,  obddev->obd_uuid, sizeof(*uuidp));
874
875                 err = copy_to_user((void *)uarg, buf, len);
876                 if (err)
877                         err = -EFAULT;
878                 OBD_FREE(buf, len);
879                 GOTO(out, err);
880         }
881         default:
882                 CERROR ("osc_ioctl(): unrecognised ioctl %#lx\n", cmd);
883                 GOTO(out, err = -ENOTTY);
884         }
885 out:
886         return err;
887 }
888
889 static void set_osc_active(struct obd_import *imp, int active)
890 {
891         struct obd_device *notify_obd = imp->imp_obd->u.cli.cl_containing_lov;
892
893         if (notify_obd == NULL)
894                 return;
895
896         /* How gross is _this_? */
897         if (!list_empty(&notify_obd->obd_exports)) {
898                 int rc;
899                 struct lustre_handle fakeconn;
900                 struct obd_ioctl_data ioc_data;
901                 struct obd_export *exp =
902                         list_entry(notify_obd->obd_exports.next,
903                                    struct obd_export, exp_obd_chain);
904
905                 fakeconn.addr = (__u64)(unsigned long)exp;
906                 fakeconn.cookie = exp->exp_cookie;
907                 ioc_data.ioc_inlbuf1 = imp->imp_obd->obd_uuid;
908                 ioc_data.ioc_offset = active;
909                 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
910                                    sizeof ioc_data, &ioc_data, NULL);
911                 if (rc)
912                         CERROR("disabling %s on LOV %p/%s: %d\n",
913                                imp->imp_obd->obd_uuid, notify_obd,
914                                notify_obd->obd_uuid, rc);
915         } else {
916                 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
917                        "%p\n", notify_obd, notify_obd->obd_uuid,
918                        imp->imp_obd->obd_uuid);
919         }
920 }
921
922
923 /* XXX looks a lot like super.c:invalidate_request_list, don't it? */
924 static void abort_inflight_for_import(struct obd_import *imp)
925 {
926         struct list_head *tmp, *n;
927
928         /* Make sure that no new requests get processed for this import.
929          * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
930          * flag and then putting requests on sending_list or delayed_list.
931          */
932         spin_lock(&imp->imp_lock);
933         imp->imp_flags |= IMP_INVALID;
934         spin_unlock(&imp->imp_lock);
935
936         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
937                 struct ptlrpc_request *req =
938                         list_entry(tmp, struct ptlrpc_request, rq_list);
939
940                 DEBUG_REQ(D_HA, req, "inflight");
941                 req->rq_flags |= PTL_RPC_FL_ERR;
942                 wake_up(&req->rq_wait_for_rep);
943         }
944
945         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
946                 struct ptlrpc_request *req =
947                         list_entry(tmp, struct ptlrpc_request, rq_list);
948
949                 DEBUG_REQ(D_HA, req, "aborting waiting req");
950                 req->rq_flags |= PTL_RPC_FL_ERR;
951                 wake_up(&req->rq_wait_for_rep);
952         }
953 }
954
955 static int osc_recover(struct obd_import *imp, int phase)
956 {
957         int rc;
958         ENTRY;
959
960         switch(phase) {
961             case PTLRPC_RECOVD_PHASE_PREPARE: {
962                 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
963                 ldlm_namespace_cleanup(ns, 1 /* no network ops */);
964                 abort_inflight_for_import(imp);
965                 set_osc_active(imp, 0 /* inactive */);
966                 RETURN(0);
967             }
968
969             case PTLRPC_RECOVD_PHASE_RECOVER:
970                 imp->imp_flags &= ~IMP_INVALID;
971                 rc = ptlrpc_reconnect_import(imp, OST_CONNECT);
972                 if (rc) {
973                         imp->imp_flags |= IMP_INVALID;
974                         RETURN(rc);
975                 }
976
977                 spin_lock(&imp->imp_lock);
978                 imp->imp_level = LUSTRE_CONN_FULL;
979                 spin_unlock(&imp->imp_lock);
980
981                 set_osc_active(imp, 1 /* active */);
982                 RETURN(0);
983
984             default:
985                 RETURN(-EINVAL);
986         }
987 }
988
989 static int osc_connect(struct lustre_handle *conn, struct obd_device *obd,
990                        obd_uuid_t cluuid, struct recovd_obd *recovd,
991                        ptlrpc_recovery_cb_t recover)
992 {
993         struct obd_import *imp = &obd->u.cli.cl_import;
994         imp->imp_recover = osc_recover;
995         return client_obd_connect(conn, obd, cluuid, recovd, recover);
996 }
997
998 struct obd_ops osc_obd_ops = {
999         o_attach:       osc_attach,
1000         o_detach:       osc_detach,
1001         o_setup:        client_obd_setup,
1002         o_cleanup:      client_obd_cleanup,
1003         o_connect:      osc_connect,
1004         o_disconnect:   client_obd_disconnect,
1005         o_statfs:       osc_statfs,
1006         o_packmd:       osc_packmd,
1007         o_unpackmd:     osc_unpackmd,
1008         o_create:       osc_create,
1009         o_destroy:      osc_destroy,
1010         o_getattr:      osc_getattr,
1011         o_setattr:      osc_setattr,
1012         o_open:         osc_open,
1013         o_close:        osc_close,
1014         o_brw:          osc_brw,
1015         o_punch:        osc_punch,
1016         o_enqueue:      osc_enqueue,
1017         o_cancel:       osc_cancel,
1018         o_cancel_unused: osc_cancel_unused,
1019         o_iocontrol:    osc_iocontrol
1020 };
1021
1022 static int __init osc_init(void)
1023 {
1024         RETURN(class_register_type(&osc_obd_ops, status_class_var,
1025                                    LUSTRE_OSC_NAME));
1026 }
1027
1028 static void __exit osc_exit(void)
1029 {
1030         class_unregister_type(LUSTRE_OSC_NAME);
1031 }
1032
1033 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1034 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
1035 MODULE_LICENSE("GPL");
1036
1037 module_init(osc_init);
1038 module_exit(osc_exit);